1use std::{
2 io::{Read, Write},
3 sync::mpsc::{Receiver, Sender},
4 vec,
5};
6
7use crate::{
8 client::Client,
9 constant::ssh_connection_code,
10 error::{SshError, SshResult},
11 model::{BackendResp, BackendRqst, Data, FlowControl, Packet},
12 TerminalSize,
13};
14use tracing::*;
15
16#[cfg(feature = "scp")]
17use super::channel_scp::ScpBroker;
18use super::{channel_exec::ExecBroker, channel_shell::ShellBrocker};
19
20pub(crate) struct Channel {
21 snd: Sender<BackendResp>,
22 server_channel_no: u32,
23 client_channel_no: u32,
24 remote_close: bool,
25 local_close: bool,
26 flow_control: FlowControl,
27 pending_send: Vec<u8>,
28}
29
30impl Channel {
31 pub fn new(
32 server_channel_no: u32,
33 client_channel_no: u32,
34 remote_window: u32,
35 snd: Sender<BackendResp>,
36 ) -> SshResult<Self> {
37 snd.send(BackendResp::Ok(server_channel_no))?;
38
39 Ok(Self {
40 snd,
41 server_channel_no,
42 client_channel_no,
43 remote_close: false,
44 local_close: false,
45 flow_control: FlowControl::new(remote_window),
46 pending_send: vec![],
47 })
48 }
49
50 pub fn send_data<S>(&mut self, data: Data, client: &mut Client, stream: &mut S) -> SshResult<()>
51 where
52 S: Read + Write,
53 {
54 self.pending_send.append(&mut data.into_inner());
55 self.try_send_data(client, stream)
56 }
57
58 fn try_send_data<S>(&mut self, client: &mut Client, stream: &mut S) -> SshResult<()>
59 where
60 S: Read + Write,
61 {
62 while !self.pending_send.is_empty() {
64 if self.flow_control.can_send() {
65 let maybe_remain = self.flow_control.tune_on_send(&mut self.pending_send);
66
67 let mut data = Data::new();
69 data.put_u8(ssh_connection_code::CHANNEL_DATA)
70 .put_u32(self.server_channel_no)
71 .put_u8s(&self.pending_send);
72
73 self.pending_send = maybe_remain;
75
76 self.send(data, client, stream)?;
77 } else {
78 break;
79 }
80 }
81 Ok(())
82 }
83
84 pub fn send<S>(&mut self, data: Data, client: &mut Client, stream: &mut S) -> SshResult<()>
85 where
86 S: Read + Write,
87 {
88 if !self.closed() {
89 data.pack(client).write_stream(stream)
90 } else {
91 Err(SshError::GeneralError(
92 "Send data on a closed channel".to_owned(),
93 ))
94 }
95 }
96
97 pub fn recv<S>(&mut self, mut data: Data, client: &mut Client, stream: &mut S) -> SshResult<()>
98 where
99 S: Read + Write,
100 {
101 let mut buf = data.get_u8s();
102 self.flow_control.tune_on_recv(&mut buf);
104 self.send_window_adjust(buf.len() as u32, client, stream)?;
105 self.snd.send(BackendResp::Data(buf.into()))?;
106 Ok(())
107 }
108
109 fn send_window_adjust<S>(
110 &mut self,
111 to_add: u32,
112 client: &mut Client,
113 stream: &mut S,
114 ) -> SshResult<()>
115 where
116 S: Read + Write,
117 {
118 let mut data = Data::new();
119 data.put_u8(ssh_connection_code::CHANNEL_WINDOW_ADJUST)
120 .put_u32(self.server_channel_no)
121 .put_u32(to_add);
122 self.flow_control.on_send(to_add);
123 self.send(data, client, stream)
124 }
125
126 pub fn recv_window_adjust<S>(
127 &mut self,
128 to_add: u32,
129 client: &mut Client,
130 stream: &mut S,
131 ) -> SshResult<()>
132 where
133 S: Read + Write,
134 {
135 self.flow_control.on_recv(to_add);
136 if !self.pending_send.is_empty() {
137 self.try_send_data(client, stream)
138 } else {
139 Ok(())
140 }
141 }
142
143 pub fn local_close(&mut self) -> SshResult<()> {
144 trace!("Channel {} send local close", self.client_channel_no);
145 self.local_close = true;
146 Ok(())
147 }
148
149 pub fn remote_close(&mut self) -> SshResult<()> {
150 trace!("Channel {} recv remote close", self.client_channel_no);
151 self.remote_close = true;
152 if !self.local_close {
153 self.snd.send(BackendResp::Close)?;
154 }
155 Ok(())
156 }
157
158 pub fn success(&mut self) -> SshResult<()> {
159 self.snd.send(BackendResp::Ok(self.client_channel_no))?;
160 Ok(())
161 }
162
163 pub fn failed(&mut self) -> SshResult<()> {
164 self.snd.send(BackendResp::Fail("".to_owned()))?;
165 Ok(())
166 }
167
168 pub fn recv_rqst(&mut self, mut data: Data) -> SshResult<()> {
169 let status: Vec<u8> = data.get_u8s();
170 if let Ok(status_string) = String::from_utf8(status.clone()) {
171 match status_string.as_str() {
172 "exit-status" => {
173 let _ = self.handle_exit_status(&mut data);
174 }
175 "exit-signal" => {
176 let _ = self.handle_exit_signal(&mut data);
177 }
178 s => {
179 debug!("Currently ignore request {}", s);
180 }
181 }
182 }
183 Ok(())
184 }
185
186 fn handle_exit_status(&mut self, data: &mut Data) -> SshResult<()> {
187 let maybe_false = data.get_u8();
188 if maybe_false == 0 {
189 self.snd.send(BackendResp::ExitStatus(data.get_u32()))?
190 }
191 Ok(())
192 }
193
194 fn handle_exit_signal(&mut self, data: &mut Data) -> SshResult<()> {
195 let maybe_false = data.get_u8();
196 let mut msg = "".to_owned();
197 if maybe_false == 0 {
198 if let Ok(sig_name) = String::from_utf8(data.get_u8s()) {
199 msg += &format!("Current request is terminated by signal: {sig_name}\n");
200 }
201 let coredumped = data.get_u8();
202 msg += &format!("Coredumped: {}\n", {
203 if coredumped == 0 {
204 "False"
205 } else {
206 "True"
207 }
208 });
209 if let Ok(err_msg) = String::from_utf8(data.get_u8s()) {
210 msg += &format!("Error message:\n{err_msg}\n");
211 }
212 }
213 self.snd.send(BackendResp::TermMsg(msg))?;
214 Ok(())
215 }
216
217 pub fn closed(&self) -> bool {
218 self.local_close && self.remote_close
219 }
220}
221
222impl Drop for Channel {
223 fn drop(&mut self) {
224 info!("Channel {} closed", self.client_channel_no);
225 }
226}
227
228pub struct ChannelBroker {
229 pub(crate) client_channel_no: u32,
230 pub(crate) server_channel_no: u32,
231 pub(crate) rcv: Receiver<BackendResp>,
232 pub(crate) snd: Sender<BackendRqst>,
233 pub(crate) close: bool,
234 pub(crate) exit_status: u32,
235 pub(crate) terminate_msg: String,
236}
237
238impl ChannelBroker {
239 pub(crate) fn new(
240 client_id: u32,
241 server_id: u32,
242 rcv: Receiver<BackendResp>,
243 snd: Sender<BackendRqst>,
244 ) -> Self {
245 Self {
246 client_channel_no: client_id,
247 server_channel_no: server_id,
248 rcv,
249 snd,
250 close: false,
251 exit_status: 0,
252 terminate_msg: "".to_owned(),
253 }
254 }
255
256 pub fn exec(self) -> SshResult<ExecBroker> {
259 Ok(ExecBroker::open(self))
260 }
261
262 #[cfg(feature = "scp")]
265 pub fn scp(self) -> SshResult<ScpBroker> {
266 Ok(ScpBroker::open(self))
267 }
268
269 pub fn shell(self, tv: TerminalSize) -> SshResult<ShellBrocker> {
272 ShellBrocker::open(self, tv)
273 }
274
275 pub fn exit_status(&self) -> SshResult<u32> {
280 Ok(self.exit_status)
281 }
282
283 pub fn terminate_msg(&self) -> SshResult<String> {
288 Ok(self.terminate_msg.clone())
289 }
290
291 pub fn close(&mut self) -> SshResult<()> {
294 if !self.close {
295 let mut data = Data::new();
296 data.put_u8(ssh_connection_code::CHANNEL_CLOSE)
297 .put_u32(self.server_channel_no);
298 self.close = true;
299 self.snd
300 .send(BackendRqst::CloseChannel(self.client_channel_no, data))?;
301 }
302 Ok(())
303 }
304
305 pub(super) fn send_data(&self, data: Data) -> SshResult<()> {
306 self.snd
307 .send(BackendRqst::Data(self.client_channel_no, data))?;
308 Ok(())
309 }
310
311 pub(super) fn send(&self, data: Data) -> SshResult<()> {
312 self.snd
313 .send(BackendRqst::Command(self.client_channel_no, data))?;
314 if !self.close {
315 match self.rcv.recv()? {
316 BackendResp::Ok(_) => trace!("{}: control command ok", self.client_channel_no),
317 BackendResp::Fail(msg) => error!(
318 "{}: channel error with reason {}",
319 self.client_channel_no, msg
320 ),
321 _ => unreachable!(),
322 }
323 }
324 Ok(())
325 }
326
327 pub(super) fn recv(&mut self) -> SshResult<Vec<u8>> {
328 if self.close {
329 Ok(vec![])
330 } else {
331 match self.rcv.recv()? {
332 BackendResp::Close => {
333 self.close = true;
337 Ok(vec![])
338 }
339 BackendResp::ExitStatus(status) => {
340 self.exit_status = status;
341 Ok(vec![])
342 }
343 BackendResp::TermMsg(msg) => {
344 self.terminate_msg = msg;
345 Ok(vec![])
346 }
347 BackendResp::Data(data) => Ok(data.into_inner()),
348 _ => unreachable!(),
349 }
350 }
351 }
352
353 pub(super) fn try_recv(&mut self) -> SshResult<Option<Vec<u8>>> {
354 if !self.close {
355 if let Ok(resp) = self.rcv.try_recv() {
356 match resp {
357 BackendResp::Close => {
358 self.close = true;
362 Ok(None)
363 }
364 BackendResp::Data(data) => Ok(Some(data.into_inner())),
365 BackendResp::ExitStatus(status) => {
366 self.exit_status = status;
367 Ok(None)
368 }
369 BackendResp::TermMsg(msg) => {
370 self.terminate_msg = msg;
371 Ok(None)
372 }
373 _ => unreachable!(),
374 }
375 } else {
376 Ok(None)
377 }
378 } else {
379 Err(SshError::GeneralError(
380 "Read data on a closed channel".to_owned(),
381 ))
382 }
383 }
384
385 pub(super) fn recv_to_end(&mut self) -> SshResult<Vec<u8>> {
386 let mut buf = vec![];
387 while !self.close {
388 buf.append(&mut self.recv()?);
389 }
390 Ok(buf)
391 }
392}
393
394impl Drop for ChannelBroker {
395 fn drop(&mut self) {
396 let _ = self.close();
397 }
398}