ssh/channel/backend/
channel.rs

1use std::{
2    io::{Read, Write},
3    sync::mpsc::{Receiver, Sender},
4    vec,
5};
6
7use crate::{
8    client::Client,
9    constant::ssh_connection_code,
10    error::{SshError, SshResult},
11    model::{BackendResp, BackendRqst, Data, FlowControl, Packet},
12    TerminalSize,
13};
14use tracing::*;
15
16#[cfg(feature = "scp")]
17use super::channel_scp::ScpBroker;
18use super::{channel_exec::ExecBroker, channel_shell::ShellBrocker};
19
20pub(crate) struct Channel {
21    snd: Sender<BackendResp>,
22    server_channel_no: u32,
23    client_channel_no: u32,
24    remote_close: bool,
25    local_close: bool,
26    flow_control: FlowControl,
27    pending_send: Vec<u8>,
28}
29
30impl Channel {
31    pub fn new(
32        server_channel_no: u32,
33        client_channel_no: u32,
34        remote_window: u32,
35        snd: Sender<BackendResp>,
36    ) -> SshResult<Self> {
37        snd.send(BackendResp::Ok(server_channel_no))?;
38
39        Ok(Self {
40            snd,
41            server_channel_no,
42            client_channel_no,
43            remote_close: false,
44            local_close: false,
45            flow_control: FlowControl::new(remote_window),
46            pending_send: vec![],
47        })
48    }
49
50    pub fn send_data<S>(&mut self, data: Data, client: &mut Client, stream: &mut S) -> SshResult<()>
51    where
52        S: Read + Write,
53    {
54        self.pending_send.append(&mut data.into_inner());
55        self.try_send_data(client, stream)
56    }
57
58    fn try_send_data<S>(&mut self, client: &mut Client, stream: &mut S) -> SshResult<()>
59    where
60        S: Read + Write,
61    {
62        // try to send as much as we can
63        while !self.pending_send.is_empty() {
64            if self.flow_control.can_send() {
65                let maybe_remain = self.flow_control.tune_on_send(&mut self.pending_send);
66
67                // send it
68                let mut data = Data::new();
69                data.put_u8(ssh_connection_code::CHANNEL_DATA)
70                    .put_u32(self.server_channel_no)
71                    .put_u8s(&self.pending_send);
72
73                // update remain
74                self.pending_send = maybe_remain;
75
76                self.send(data, client, stream)?;
77            } else {
78                break;
79            }
80        }
81        Ok(())
82    }
83
84    pub fn send<S>(&mut self, data: Data, client: &mut Client, stream: &mut S) -> SshResult<()>
85    where
86        S: Read + Write,
87    {
88        if !self.closed() {
89            data.pack(client).write_stream(stream)
90        } else {
91            Err(SshError::GeneralError(
92                "Send data on a closed channel".to_owned(),
93            ))
94        }
95    }
96
97    pub fn recv<S>(&mut self, mut data: Data, client: &mut Client, stream: &mut S) -> SshResult<()>
98    where
99        S: Read + Write,
100    {
101        let mut buf = data.get_u8s();
102        // flow_control
103        self.flow_control.tune_on_recv(&mut buf);
104        self.send_window_adjust(buf.len() as u32, client, stream)?;
105        self.snd.send(BackendResp::Data(buf.into()))?;
106        Ok(())
107    }
108
109    fn send_window_adjust<S>(
110        &mut self,
111        to_add: u32,
112        client: &mut Client,
113        stream: &mut S,
114    ) -> SshResult<()>
115    where
116        S: Read + Write,
117    {
118        let mut data = Data::new();
119        data.put_u8(ssh_connection_code::CHANNEL_WINDOW_ADJUST)
120            .put_u32(self.server_channel_no)
121            .put_u32(to_add);
122        self.flow_control.on_send(to_add);
123        self.send(data, client, stream)
124    }
125
126    pub fn recv_window_adjust<S>(
127        &mut self,
128        to_add: u32,
129        client: &mut Client,
130        stream: &mut S,
131    ) -> SshResult<()>
132    where
133        S: Read + Write,
134    {
135        self.flow_control.on_recv(to_add);
136        if !self.pending_send.is_empty() {
137            self.try_send_data(client, stream)
138        } else {
139            Ok(())
140        }
141    }
142
143    pub fn local_close(&mut self) -> SshResult<()> {
144        trace!("Channel {} send local close", self.client_channel_no);
145        self.local_close = true;
146        Ok(())
147    }
148
149    pub fn remote_close(&mut self) -> SshResult<()> {
150        trace!("Channel {} recv remote close", self.client_channel_no);
151        self.remote_close = true;
152        if !self.local_close {
153            self.snd.send(BackendResp::Close)?;
154        }
155        Ok(())
156    }
157
158    pub fn success(&mut self) -> SshResult<()> {
159        self.snd.send(BackendResp::Ok(self.client_channel_no))?;
160        Ok(())
161    }
162
163    pub fn failed(&mut self) -> SshResult<()> {
164        self.snd.send(BackendResp::Fail("".to_owned()))?;
165        Ok(())
166    }
167
168    pub fn recv_rqst(&mut self, mut data: Data) -> SshResult<()> {
169        let status: Vec<u8> = data.get_u8s();
170        if let Ok(status_string) = String::from_utf8(status.clone()) {
171            match status_string.as_str() {
172                "exit-status" => {
173                    let _ = self.handle_exit_status(&mut data);
174                }
175                "exit-signal" => {
176                    let _ = self.handle_exit_signal(&mut data);
177                }
178                s => {
179                    debug!("Currently ignore request {}", s);
180                }
181            }
182        }
183        Ok(())
184    }
185
186    fn handle_exit_status(&mut self, data: &mut Data) -> SshResult<()> {
187        let maybe_false = data.get_u8();
188        if maybe_false == 0 {
189            self.snd.send(BackendResp::ExitStatus(data.get_u32()))?
190        }
191        Ok(())
192    }
193
194    fn handle_exit_signal(&mut self, data: &mut Data) -> SshResult<()> {
195        let maybe_false = data.get_u8();
196        let mut msg = "".to_owned();
197        if maybe_false == 0 {
198            if let Ok(sig_name) = String::from_utf8(data.get_u8s()) {
199                msg += &format!("Current request is terminated by signal: {sig_name}\n");
200            }
201            let coredumped = data.get_u8();
202            msg += &format!("Coredumped: {}\n", {
203                if coredumped == 0 {
204                    "False"
205                } else {
206                    "True"
207                }
208            });
209            if let Ok(err_msg) = String::from_utf8(data.get_u8s()) {
210                msg += &format!("Error message:\n{err_msg}\n");
211            }
212        }
213        self.snd.send(BackendResp::TermMsg(msg))?;
214        Ok(())
215    }
216
217    pub fn closed(&self) -> bool {
218        self.local_close && self.remote_close
219    }
220}
221
222impl Drop for Channel {
223    fn drop(&mut self) {
224        info!("Channel {} closed", self.client_channel_no);
225    }
226}
227
228pub struct ChannelBroker {
229    pub(crate) client_channel_no: u32,
230    pub(crate) server_channel_no: u32,
231    pub(crate) rcv: Receiver<BackendResp>,
232    pub(crate) snd: Sender<BackendRqst>,
233    pub(crate) close: bool,
234    pub(crate) exit_status: u32,
235    pub(crate) terminate_msg: String,
236}
237
238impl ChannelBroker {
239    pub(crate) fn new(
240        client_id: u32,
241        server_id: u32,
242        rcv: Receiver<BackendResp>,
243        snd: Sender<BackendRqst>,
244    ) -> Self {
245        Self {
246            client_channel_no: client_id,
247            server_channel_no: server_id,
248            rcv,
249            snd,
250            close: false,
251            exit_status: 0,
252            terminate_msg: "".to_owned(),
253        }
254    }
255
256    /// open a [ExecBroker] channel which can excute commands
257    ///
258    pub fn exec(self) -> SshResult<ExecBroker> {
259        Ok(ExecBroker::open(self))
260    }
261
262    /// open a [ScpBroker] channel which can download/upload files/directories
263    ///
264    #[cfg(feature = "scp")]
265    pub fn scp(self) -> SshResult<ScpBroker> {
266        Ok(ScpBroker::open(self))
267    }
268
269    /// open a [ShellBrocker] channel which  can be used as a pseudo terminal (AKA PTY)
270    ///
271    pub fn shell(self, tv: TerminalSize) -> SshResult<ShellBrocker> {
272        ShellBrocker::open(self, tv)
273    }
274
275    /// <https://datatracker.ietf.org/doc/html/rfc4254#section-6.10>
276    ///
277    /// Return the command execute status
278    ///
279    pub fn exit_status(&self) -> SshResult<u32> {
280        Ok(self.exit_status)
281    }
282
283    /// <https://datatracker.ietf.org/doc/html/rfc4254#section-6.10>
284    ///
285    /// Return the terminate message if the command excution was 'killed' by a signal
286    ///
287    pub fn terminate_msg(&self) -> SshResult<String> {
288        Ok(self.terminate_msg.clone())
289    }
290
291    /// close the backend channel but do not consume
292    ///
293    pub fn close(&mut self) -> SshResult<()> {
294        if !self.close {
295            let mut data = Data::new();
296            data.put_u8(ssh_connection_code::CHANNEL_CLOSE)
297                .put_u32(self.server_channel_no);
298            self.close = true;
299            self.snd
300                .send(BackendRqst::CloseChannel(self.client_channel_no, data))?;
301        }
302        Ok(())
303    }
304
305    pub(super) fn send_data(&self, data: Data) -> SshResult<()> {
306        self.snd
307            .send(BackendRqst::Data(self.client_channel_no, data))?;
308        Ok(())
309    }
310
311    pub(super) fn send(&self, data: Data) -> SshResult<()> {
312        self.snd
313            .send(BackendRqst::Command(self.client_channel_no, data))?;
314        if !self.close {
315            match self.rcv.recv()? {
316                BackendResp::Ok(_) => trace!("{}: control command ok", self.client_channel_no),
317                BackendResp::Fail(msg) => error!(
318                    "{}: channel error with reason {}",
319                    self.client_channel_no, msg
320                ),
321                _ => unreachable!(),
322            }
323        }
324        Ok(())
325    }
326
327    pub(super) fn recv(&mut self) -> SshResult<Vec<u8>> {
328        if self.close {
329            Ok(vec![])
330        } else {
331            match self.rcv.recv()? {
332                BackendResp::Close => {
333                    // the remote actively close their end
334                    // but we can send close later when the broker get dropped
335                    // just set a flag here
336                    self.close = true;
337                    Ok(vec![])
338                }
339                BackendResp::ExitStatus(status) => {
340                    self.exit_status = status;
341                    Ok(vec![])
342                }
343                BackendResp::TermMsg(msg) => {
344                    self.terminate_msg = msg;
345                    Ok(vec![])
346                }
347                BackendResp::Data(data) => Ok(data.into_inner()),
348                _ => unreachable!(),
349            }
350        }
351    }
352
353    pub(super) fn try_recv(&mut self) -> SshResult<Option<Vec<u8>>> {
354        if !self.close {
355            if let Ok(resp) = self.rcv.try_recv() {
356                match resp {
357                    BackendResp::Close => {
358                        // the remote actively close their end
359                        // but we can send close later when the broker get dropped
360                        // just set a flag here
361                        self.close = true;
362                        Ok(None)
363                    }
364                    BackendResp::Data(data) => Ok(Some(data.into_inner())),
365                    BackendResp::ExitStatus(status) => {
366                        self.exit_status = status;
367                        Ok(None)
368                    }
369                    BackendResp::TermMsg(msg) => {
370                        self.terminate_msg = msg;
371                        Ok(None)
372                    }
373                    _ => unreachable!(),
374                }
375            } else {
376                Ok(None)
377            }
378        } else {
379            Err(SshError::GeneralError(
380                "Read data on a closed channel".to_owned(),
381            ))
382        }
383    }
384
385    pub(super) fn recv_to_end(&mut self) -> SshResult<Vec<u8>> {
386        let mut buf = vec![];
387        while !self.close {
388            buf.append(&mut self.recv()?);
389        }
390        Ok(buf)
391    }
392}
393
394impl Drop for ChannelBroker {
395    fn drop(&mut self) {
396        let _ = self.close();
397    }
398}