ssh/channel/local/
channel.rs

1use std::io::{Read, Write};
2
3use crate::{
4    algorithm::Digest,
5    client::Client,
6    config::algorithm::AlgList,
7    constant::ssh_connection_code,
8    error::{SshError, SshResult},
9    model::{Data, FlowControl, Packet, RcMut, SecPacket},
10};
11use crate::{constant::ssh_transport_code, model::TerminalSize};
12use tracing::*;
13
14#[cfg(feature = "scp")]
15use super::ChannelScp;
16use super::{ChannelExec, ChannelShell};
17
/// Outcome of handling a single incoming message on a channel.
pub(super) enum ChannelRead {
    /// Payload bytes extracted from a data message addressed to this channel.
    Data(Vec<u8>),
    /// Message code of a message that produced no payload for the caller.
    Code(u8),
}
22
23pub struct Channel<S>
24where
25    S: Read + Write,
26{
27    pub(crate) server_channel_no: u32,
28    pub(crate) client_channel_no: u32,
29    pub(crate) remote_close: bool,
30    pub(crate) local_close: bool,
31    pub(crate) flow_control: FlowControl,
32    pub(crate) client: RcMut<Client>,
33    pub(crate) stream: RcMut<S>,
34    pub(crate) exit_status: u32,
35    pub(crate) terminate_msg: String,
36}
37
38impl<S> Channel<S>
39where
40    S: Read + Write,
41{
42    pub(crate) fn new(
43        server_channel_no: u32,
44        client_channel_no: u32,
45        remote_window: u32,
46        client: RcMut<Client>,
47        stream: RcMut<S>,
48    ) -> Self {
49        Self {
50            server_channel_no,
51            client_channel_no,
52            remote_close: false,
53            local_close: false,
54            flow_control: FlowControl::new(remote_window),
55            client,
56            stream,
57            exit_status: 0,
58            terminate_msg: "".to_owned(),
59        }
60    }
61
62    /// convert the raw channel to an [self::ChannelExec]
63    ///
64    pub fn exec(self) -> SshResult<ChannelExec<S>> {
65        info!("exec opened.");
66        Ok(ChannelExec::open(self))
67    }
68
69    /// convert the raw channel to an [self::ChannelScp]
70    ///
71    #[cfg(feature = "scp")]
72    pub fn scp(self) -> SshResult<ChannelScp<S>> {
73        info!("scp opened.");
74        Ok(ChannelScp::open(self))
75    }
76
77    /// convert the raw channel to an [self::ChannelShell]
78    ///
79    /// with `row` lines & `column` characters per one line
80    ///
81    pub fn shell(self, tv: TerminalSize) -> SshResult<ChannelShell<S>> {
82        info!("shell opened.");
83        ChannelShell::open(self, tv)
84    }
85
86    /// close the channel gracefully, but do not consume it
87    ///
88    pub fn close(&mut self) -> SshResult<()> {
89        info!("channel close.");
90        self.send_close()?;
91        self.receive_close()
92    }
93
94    /// <https://datatracker.ietf.org/doc/html/rfc4254#section-6.10>
95    ///
96    /// Return the command execute status
97    ///
98    pub fn exit_status(&self) -> SshResult<u32> {
99        Ok(self.exit_status)
100    }
101
102    /// <https://datatracker.ietf.org/doc/html/rfc4254#section-6.10>
103    ///
104    /// Return the terminate message if the command excution was 'killed' by a signal
105    ///
106    pub fn terminate_msg(&self) -> SshResult<String> {
107        Ok(self.terminate_msg.clone())
108    }
109
110    fn send_close(&mut self) -> SshResult<()> {
111        if self.local_close {
112            return Ok(());
113        }
114        let mut data = Data::new();
115        data.put_u8(ssh_connection_code::CHANNEL_CLOSE)
116            .put_u32(self.server_channel_no);
117        self.local_close = true;
118        self.send(data)
119    }
120
121    fn receive_close(&mut self) -> SshResult<()> {
122        if self.remote_close {
123            return Ok(());
124        }
125        let _ = self.recv_to_end()?;
126        Ok(())
127    }
128
129    pub(super) fn send(&mut self, data: Data) -> SshResult<()> {
130        data.pack(&mut self.client.borrow_mut())
131            .write_stream(&mut *self.stream.borrow_mut())
132    }
133
134    // only send SSH_MSG_CHANNEL_DATA will call this,
135    // for auto adjust the window size
136    pub(super) fn send_data(&mut self, mut buf: Vec<u8>) -> SshResult<Vec<u8>> {
137        let mut maybe_response = vec![];
138
139        loop {
140            // first adjust the data to the max size we can send
141            let maybe_remain = self.flow_control.tune_on_send(&mut buf);
142
143            // send it
144            let mut data = Data::new();
145            data.put_u8(ssh_connection_code::CHANNEL_DATA)
146                .put_u32(self.server_channel_no)
147                .put_u8s(&buf);
148            self.send(data)?;
149
150            if maybe_remain.is_empty() {
151                // if all send, return
152                break;
153            } else {
154                buf = maybe_remain
155            }
156
157            // otherwise wait the server to adjust its window
158            while !self.flow_control.can_send() {
159                let buf = self.recv_once()?;
160
161                if let ChannelRead::Data(mut data) = buf {
162                    maybe_response.append(&mut data);
163                }
164            }
165        }
166
167        Ok(maybe_response)
168    }
169
170    /// this method will receive at least one data packet
171    ///
172    pub(super) fn recv(&mut self) -> SshResult<Vec<u8>> {
173        while !self.closed() {
174            let maybe_recv = self.recv_once()?;
175
176            if let ChannelRead::Data(data) = maybe_recv {
177                return Ok(data);
178            }
179        }
180        Ok(vec![])
181    }
182
183    pub(super) fn recv_to_end(&mut self) -> SshResult<Vec<u8>> {
184        let mut resp = vec![];
185        while !self.closed() {
186            let mut read_this_time = self.recv()?;
187            resp.append(&mut read_this_time);
188        }
189        Ok(resp)
190    }
191
192    pub(super) fn try_recv(&mut self) -> SshResult<Option<Vec<u8>>> {
193        let data = {
194            match SecPacket::try_from_stream(
195                &mut *self.stream.borrow_mut(),
196                &mut self.client.borrow_mut(),
197            )? {
198                Some(pkt) => Data::unpack(pkt)?,
199                None => return Ok(None),
200            }
201        };
202        if let ChannelRead::Data(d) = self.handle_msg(data)? {
203            Ok(Some(d))
204        } else {
205            Ok(None)
206        }
207    }
208
209    fn recv_once(&mut self) -> SshResult<ChannelRead> {
210        let data = Data::unpack(SecPacket::from_stream(
211            &mut *self.stream.borrow_mut(),
212            &mut self.client.borrow_mut(),
213        )?)?;
214        self.handle_msg(data)
215    }
216
217    fn handle_msg(&mut self, mut data: Data) -> SshResult<ChannelRead> {
218        let message_code = data.get_u8();
219        match message_code {
220            x @ ssh_transport_code::KEXINIT => {
221                data.insert(0, message_code);
222                let mut digest = Digest::new();
223                digest.hash_ctx.set_i_s(&data);
224                let server_algs = AlgList::unpack((data, &mut *self.client.borrow_mut()).into())?;
225                self.client.borrow_mut().key_agreement(
226                    &mut *self.stream.borrow_mut(),
227                    server_algs,
228                    &mut digest,
229                )?;
230                Ok(ChannelRead::Code(x))
231            }
232            x @ ssh_connection_code::CHANNEL_DATA => {
233                let cc = data.get_u32();
234                if cc == self.client_channel_no {
235                    let mut data = data.get_u8s();
236
237                    // flow_control
238                    self.flow_control.tune_on_recv(&mut data);
239                    self.send_window_adjust(data.len() as u32)?;
240
241                    return Ok(ChannelRead::Data(data));
242                }
243                Ok(ChannelRead::Code(x))
244            }
245            x @ ssh_connection_code::CHANNEL_EXTENDED_DATA => {
246                let cc = data.get_u32();
247                if cc == self.client_channel_no {
248                    let data_type_code = data.get_u32();
249                    let mut data = data.get_u8s();
250
251                    debug!("Recv extended data with type {data_type_code}");
252
253                    // flow_contrl
254                    self.flow_control.tune_on_recv(&mut data);
255                    self.send_window_adjust(data.len() as u32)?;
256
257                    return Ok(ChannelRead::Data(data));
258                }
259                Ok(ChannelRead::Code(x))
260            }
261            x @ ssh_connection_code::GLOBAL_REQUEST => {
262                let mut data = Data::new();
263                data.put_u8(ssh_connection_code::REQUEST_FAILURE);
264                self.send(data)?;
265                Ok(ChannelRead::Code(x))
266            }
267            x @ ssh_connection_code::CHANNEL_WINDOW_ADJUST => {
268                data.get_u32();
269                // to add
270                let rws = data.get_u32();
271                self.recv_window_adjust(rws)?;
272                Ok(ChannelRead::Code(x))
273            }
274            x @ ssh_connection_code::CHANNEL_EOF => {
275                debug!("Currently ignore message {}", x);
276                Ok(ChannelRead::Code(x))
277            }
278            x @ ssh_connection_code::CHANNEL_REQUEST => {
279                let cc = data.get_u32();
280                if cc == self.client_channel_no {
281                    let status: Vec<u8> = data.get_u8s();
282                    if let Ok(status_string) = String::from_utf8(status.clone()) {
283                        match status_string.as_str() {
284                            "exit-status" => {
285                                let _ = self.handle_exit_status(&mut data);
286                            }
287                            "exit-signal" => {
288                                let _ = self.handle_exit_signal(&mut data);
289                            }
290                            s => {
291                                debug!("Currently ignore request {}", s);
292                            }
293                        }
294                    }
295                }
296                Ok(ChannelRead::Code(x))
297            }
298            x @ ssh_connection_code::CHANNEL_SUCCESS => {
299                debug!("Currently ignore message {}", x);
300                Ok(ChannelRead::Code(x))
301            }
302            ssh_connection_code::CHANNEL_FAILURE => {
303                Err(SshError::GeneralError("channel failure.".to_owned()))
304            }
305            x @ ssh_connection_code::CHANNEL_CLOSE => {
306                let cc = data.get_u32();
307                if cc == self.client_channel_no {
308                    self.remote_close = true;
309                    self.send_close()?;
310                }
311                Ok(ChannelRead::Code(x))
312            }
313            x => {
314                debug!("Currently ignore message {}", x);
315                Ok(ChannelRead::Code(x))
316            }
317        }
318    }
319
320    fn handle_exit_status(&mut self, data: &mut Data) -> SshResult<()> {
321        let maybe_false = data.get_u8();
322        if maybe_false == 0 {
323            self.exit_status = data.get_u32()
324        }
325        Ok(())
326    }
327
328    fn handle_exit_signal(&mut self, data: &mut Data) -> SshResult<()> {
329        let maybe_false = data.get_u8();
330        if maybe_false == 0 {
331            let sig_name = String::from_utf8(data.get_u8s())?;
332            self.terminate_msg += &format!("Current request is terminated by signal: {sig_name}\n");
333            let coredumped = data.get_u8();
334            self.terminate_msg += &format!("Coredumped: {}\n", {
335                if coredumped == 0 {
336                    "False"
337                } else {
338                    "True"
339                }
340            });
341            let err_msg = String::from_utf8(data.get_u8s())?;
342            self.terminate_msg += &format!("Error message:\n{err_msg}\n");
343        }
344        Ok(())
345    }
346
347    fn send_window_adjust(&mut self, to_add: u32) -> SshResult<()> {
348        let mut data = Data::new();
349        data.put_u8(ssh_connection_code::CHANNEL_WINDOW_ADJUST)
350            .put_u32(self.server_channel_no)
351            .put_u32(to_add);
352        self.flow_control.on_send(to_add);
353        self.send(data)
354    }
355
356    fn recv_window_adjust(&mut self, to_add: u32) -> SshResult<()> {
357        self.flow_control.on_recv(to_add);
358        Ok(())
359    }
360
361    /// Return if the channel is closed
362    ///
363    pub fn closed(&self) -> bool {
364        self.local_close && self.remote_close
365    }
366}