ssh/channel/local/
channel.rs1use std::io::{Read, Write};
2
3use crate::{
4 algorithm::Digest,
5 client::Client,
6 config::algorithm::AlgList,
7 constant::ssh_connection_code,
8 error::{SshError, SshResult},
9 model::{Data, FlowControl, Packet, RcMut, SecPacket},
10};
11use crate::{constant::ssh_transport_code, model::TerminalSize};
12use tracing::*;
13
14#[cfg(feature = "scp")]
15use super::ChannelScp;
16use super::{ChannelExec, ChannelShell};
17
18pub(super) enum ChannelRead {
19 Data(Vec<u8>),
20 Code(u8),
21}
22
23pub struct Channel<S>
24where
25 S: Read + Write,
26{
27 pub(crate) server_channel_no: u32,
28 pub(crate) client_channel_no: u32,
29 pub(crate) remote_close: bool,
30 pub(crate) local_close: bool,
31 pub(crate) flow_control: FlowControl,
32 pub(crate) client: RcMut<Client>,
33 pub(crate) stream: RcMut<S>,
34 pub(crate) exit_status: u32,
35 pub(crate) terminate_msg: String,
36}
37
38impl<S> Channel<S>
39where
40 S: Read + Write,
41{
42 pub(crate) fn new(
43 server_channel_no: u32,
44 client_channel_no: u32,
45 remote_window: u32,
46 client: RcMut<Client>,
47 stream: RcMut<S>,
48 ) -> Self {
49 Self {
50 server_channel_no,
51 client_channel_no,
52 remote_close: false,
53 local_close: false,
54 flow_control: FlowControl::new(remote_window),
55 client,
56 stream,
57 exit_status: 0,
58 terminate_msg: "".to_owned(),
59 }
60 }
61
62 pub fn exec(self) -> SshResult<ChannelExec<S>> {
65 info!("exec opened.");
66 Ok(ChannelExec::open(self))
67 }
68
69 #[cfg(feature = "scp")]
72 pub fn scp(self) -> SshResult<ChannelScp<S>> {
73 info!("scp opened.");
74 Ok(ChannelScp::open(self))
75 }
76
77 pub fn shell(self, tv: TerminalSize) -> SshResult<ChannelShell<S>> {
82 info!("shell opened.");
83 ChannelShell::open(self, tv)
84 }
85
86 pub fn close(&mut self) -> SshResult<()> {
89 info!("channel close.");
90 self.send_close()?;
91 self.receive_close()
92 }
93
94 pub fn exit_status(&self) -> SshResult<u32> {
99 Ok(self.exit_status)
100 }
101
102 pub fn terminate_msg(&self) -> SshResult<String> {
107 Ok(self.terminate_msg.clone())
108 }
109
110 fn send_close(&mut self) -> SshResult<()> {
111 if self.local_close {
112 return Ok(());
113 }
114 let mut data = Data::new();
115 data.put_u8(ssh_connection_code::CHANNEL_CLOSE)
116 .put_u32(self.server_channel_no);
117 self.local_close = true;
118 self.send(data)
119 }
120
121 fn receive_close(&mut self) -> SshResult<()> {
122 if self.remote_close {
123 return Ok(());
124 }
125 let _ = self.recv_to_end()?;
126 Ok(())
127 }
128
129 pub(super) fn send(&mut self, data: Data) -> SshResult<()> {
130 data.pack(&mut self.client.borrow_mut())
131 .write_stream(&mut *self.stream.borrow_mut())
132 }
133
134 pub(super) fn send_data(&mut self, mut buf: Vec<u8>) -> SshResult<Vec<u8>> {
137 let mut maybe_response = vec![];
138
139 loop {
140 let maybe_remain = self.flow_control.tune_on_send(&mut buf);
142
143 let mut data = Data::new();
145 data.put_u8(ssh_connection_code::CHANNEL_DATA)
146 .put_u32(self.server_channel_no)
147 .put_u8s(&buf);
148 self.send(data)?;
149
150 if maybe_remain.is_empty() {
151 break;
153 } else {
154 buf = maybe_remain
155 }
156
157 while !self.flow_control.can_send() {
159 let buf = self.recv_once()?;
160
161 if let ChannelRead::Data(mut data) = buf {
162 maybe_response.append(&mut data);
163 }
164 }
165 }
166
167 Ok(maybe_response)
168 }
169
170 pub(super) fn recv(&mut self) -> SshResult<Vec<u8>> {
173 while !self.closed() {
174 let maybe_recv = self.recv_once()?;
175
176 if let ChannelRead::Data(data) = maybe_recv {
177 return Ok(data);
178 }
179 }
180 Ok(vec![])
181 }
182
183 pub(super) fn recv_to_end(&mut self) -> SshResult<Vec<u8>> {
184 let mut resp = vec![];
185 while !self.closed() {
186 let mut read_this_time = self.recv()?;
187 resp.append(&mut read_this_time);
188 }
189 Ok(resp)
190 }
191
192 pub(super) fn try_recv(&mut self) -> SshResult<Option<Vec<u8>>> {
193 let data = {
194 match SecPacket::try_from_stream(
195 &mut *self.stream.borrow_mut(),
196 &mut self.client.borrow_mut(),
197 )? {
198 Some(pkt) => Data::unpack(pkt)?,
199 None => return Ok(None),
200 }
201 };
202 if let ChannelRead::Data(d) = self.handle_msg(data)? {
203 Ok(Some(d))
204 } else {
205 Ok(None)
206 }
207 }
208
209 fn recv_once(&mut self) -> SshResult<ChannelRead> {
210 let data = Data::unpack(SecPacket::from_stream(
211 &mut *self.stream.borrow_mut(),
212 &mut self.client.borrow_mut(),
213 )?)?;
214 self.handle_msg(data)
215 }
216
217 fn handle_msg(&mut self, mut data: Data) -> SshResult<ChannelRead> {
218 let message_code = data.get_u8();
219 match message_code {
220 x @ ssh_transport_code::KEXINIT => {
221 data.insert(0, message_code);
222 let mut digest = Digest::new();
223 digest.hash_ctx.set_i_s(&data);
224 let server_algs = AlgList::unpack((data, &mut *self.client.borrow_mut()).into())?;
225 self.client.borrow_mut().key_agreement(
226 &mut *self.stream.borrow_mut(),
227 server_algs,
228 &mut digest,
229 )?;
230 Ok(ChannelRead::Code(x))
231 }
232 x @ ssh_connection_code::CHANNEL_DATA => {
233 let cc = data.get_u32();
234 if cc == self.client_channel_no {
235 let mut data = data.get_u8s();
236
237 self.flow_control.tune_on_recv(&mut data);
239 self.send_window_adjust(data.len() as u32)?;
240
241 return Ok(ChannelRead::Data(data));
242 }
243 Ok(ChannelRead::Code(x))
244 }
245 x @ ssh_connection_code::CHANNEL_EXTENDED_DATA => {
246 let cc = data.get_u32();
247 if cc == self.client_channel_no {
248 let data_type_code = data.get_u32();
249 let mut data = data.get_u8s();
250
251 debug!("Recv extended data with type {data_type_code}");
252
253 self.flow_control.tune_on_recv(&mut data);
255 self.send_window_adjust(data.len() as u32)?;
256
257 return Ok(ChannelRead::Data(data));
258 }
259 Ok(ChannelRead::Code(x))
260 }
261 x @ ssh_connection_code::GLOBAL_REQUEST => {
262 let mut data = Data::new();
263 data.put_u8(ssh_connection_code::REQUEST_FAILURE);
264 self.send(data)?;
265 Ok(ChannelRead::Code(x))
266 }
267 x @ ssh_connection_code::CHANNEL_WINDOW_ADJUST => {
268 data.get_u32();
269 let rws = data.get_u32();
271 self.recv_window_adjust(rws)?;
272 Ok(ChannelRead::Code(x))
273 }
274 x @ ssh_connection_code::CHANNEL_EOF => {
275 debug!("Currently ignore message {}", x);
276 Ok(ChannelRead::Code(x))
277 }
278 x @ ssh_connection_code::CHANNEL_REQUEST => {
279 let cc = data.get_u32();
280 if cc == self.client_channel_no {
281 let status: Vec<u8> = data.get_u8s();
282 if let Ok(status_string) = String::from_utf8(status.clone()) {
283 match status_string.as_str() {
284 "exit-status" => {
285 let _ = self.handle_exit_status(&mut data);
286 }
287 "exit-signal" => {
288 let _ = self.handle_exit_signal(&mut data);
289 }
290 s => {
291 debug!("Currently ignore request {}", s);
292 }
293 }
294 }
295 }
296 Ok(ChannelRead::Code(x))
297 }
298 x @ ssh_connection_code::CHANNEL_SUCCESS => {
299 debug!("Currently ignore message {}", x);
300 Ok(ChannelRead::Code(x))
301 }
302 ssh_connection_code::CHANNEL_FAILURE => {
303 Err(SshError::GeneralError("channel failure.".to_owned()))
304 }
305 x @ ssh_connection_code::CHANNEL_CLOSE => {
306 let cc = data.get_u32();
307 if cc == self.client_channel_no {
308 self.remote_close = true;
309 self.send_close()?;
310 }
311 Ok(ChannelRead::Code(x))
312 }
313 x => {
314 debug!("Currently ignore message {}", x);
315 Ok(ChannelRead::Code(x))
316 }
317 }
318 }
319
320 fn handle_exit_status(&mut self, data: &mut Data) -> SshResult<()> {
321 let maybe_false = data.get_u8();
322 if maybe_false == 0 {
323 self.exit_status = data.get_u32()
324 }
325 Ok(())
326 }
327
328 fn handle_exit_signal(&mut self, data: &mut Data) -> SshResult<()> {
329 let maybe_false = data.get_u8();
330 if maybe_false == 0 {
331 let sig_name = String::from_utf8(data.get_u8s())?;
332 self.terminate_msg += &format!("Current request is terminated by signal: {sig_name}\n");
333 let coredumped = data.get_u8();
334 self.terminate_msg += &format!("Coredumped: {}\n", {
335 if coredumped == 0 {
336 "False"
337 } else {
338 "True"
339 }
340 });
341 let err_msg = String::from_utf8(data.get_u8s())?;
342 self.terminate_msg += &format!("Error message:\n{err_msg}\n");
343 }
344 Ok(())
345 }
346
347 fn send_window_adjust(&mut self, to_add: u32) -> SshResult<()> {
348 let mut data = Data::new();
349 data.put_u8(ssh_connection_code::CHANNEL_WINDOW_ADJUST)
350 .put_u32(self.server_channel_no)
351 .put_u32(to_add);
352 self.flow_control.on_send(to_add);
353 self.send(data)
354 }
355
356 fn recv_window_adjust(&mut self, to_add: u32) -> SshResult<()> {
357 self.flow_control.on_recv(to_add);
358 Ok(())
359 }
360
361 pub fn closed(&self) -> bool {
364 self.local_close && self.remote_close
365 }
366}