makiko/client/
session.rs

1use bytes::Bytes;
2use futures_core::ready;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::sync::oneshot;
7use crate::codec::{PacketDecode, PacketEncode};
8use crate::error::{Result, Error};
9use super::channel::{
10    Channel, ChannelReceiver, ChannelEvent, ChannelConfig,
11    ChannelReq, ChannelReply, DATA_STANDARD, DATA_STDERR
12};
13use super::client::Client;
14
/// Handle to an SSH session.
///
/// SSH session (RFC 4254, section 6) corresponds to the execution of a single process. The
/// [`Session`] is used to send requests and data to the server, and [`SessionReceiver`] will
/// receive the requests and data from the server. To open the session, use
/// [`Client::open_session()`].
///
/// Once the session is open, you will typically go through three stages:
/// - prepare the execution environment: [`env()`][Self::env()],
///   [`request_pty()`][Self::request_pty()],
/// - start the execution: [`shell()`][Self::shell()], [`exec()`][Self::exec()],
///   [`subsystem()`][Self::subsystem()],
/// - interact with the process: [`send_stdin()`][Self::send_stdin()],
///   [`send_eof()`][Self::send_eof()], [`signal()`][Self::signal()],
///   [`window_change()`][Self::window_change()].
///
/// In parallel, you will handle the events produced by [`SessionReceiver`].
///
/// An SSH session is a particular type of an SSH channel. However, this object provides a
/// higher-level API than a raw [`Channel`].
///
/// You can cheaply clone this object and safely share the clones between tasks.
#[derive(Clone)]
pub struct Session {
    // The underlying SSH channel; every method of `Session` delegates to it.
    channel: Channel,
}
40
41impl Session {
42    pub(super) async fn open(client: &Client, config: ChannelConfig) -> Result<(Session, SessionReceiver)> {
43        let (channel, channel_rx, _) = client.open_channel("session".into(), config, Bytes::new()).await?;
44        Ok((Session { channel }, SessionReceiver { channel_rx }))
45    }
46
47    /// Get the [`Client`] that this session belongs to.
48    pub fn client(&self) -> Client {
49        self.channel.client()
50    }
51
52    /// Close the session.
53    ///
54    /// We won't send any further requests or data to the server and the session will be closed
55    /// once the server acknowledges our request.
56    ///
57    /// This method is idempotent: if the session is already closed or closing, we do nothing.
58    pub fn close(&self) -> Result<()> {
59        self.channel.close()
60    }
61}
62
63/// # Preparing the execution environment
64///
65/// Use these methods to configure the session before starting the process.
66impl Session {
67    /// Pass an environment variable to the future process.
68    ///
69    /// This will set an environment variable for the process that will be started later.
70    ///
71    /// This method returns immediately without any blocking, but you may use the returned
72    /// [`SessionResp`] to wait for the server response.
73    pub fn env(&self, name: &[u8], value: &[u8]) -> Result<SessionResp> {
74        let (reply_tx, reply_rx) = oneshot::channel();
75        let mut payload = PacketEncode::new();
76        payload.put_bytes(name);
77        payload.put_bytes(value);
78        self.channel.send_request(ChannelReq {
79            request_type: "env".into(),
80            payload: payload.finish(),
81            reply_tx: Some(reply_tx),
82        })?;
83        Ok(SessionResp { reply_rx })
84    }
85
86    /// Request a pseudo-terminal (pty) for the future process.
87    ///
88    /// This will allocate a pseudo-terminal according to the `request`.
89    ///
90    /// This method returns immediately without any blocking, but you may use the returned
91    /// [`SessionResp`] to wait for the server response.
92    pub fn request_pty(&self, request: &PtyRequest) -> Result<SessionResp> {
93        let (reply_tx, reply_rx) = oneshot::channel();
94        let mut payload = PacketEncode::new();
95        payload.put_str(&request.term);
96        payload.put_u32(request.width);
97        payload.put_u32(request.height);
98        payload.put_u32(request.width_px);
99        payload.put_u32(request.height_px);
100
101        let mut encoded_modes = PacketEncode::new();
102        for &(op, arg) in request.modes.opcodes.iter() {
103            encoded_modes.put_u8(op);
104            encoded_modes.put_u32(arg);
105        }
106        encoded_modes.put_u8(0);
107        payload.put_bytes(&encoded_modes.finish());
108
109        self.channel.send_request(ChannelReq {
110            request_type: "pty-req".into(),
111            payload: payload.finish(),
112            reply_tx: Some(reply_tx),
113        })?;
114        Ok(SessionResp { reply_rx })
115    }
116}
117
118/// # Starting the process
119///
120/// Use one of these methods to start the remote process. Only one of them can succeed, you cannot
121/// start multiple processes with a single session (but you may open multiple sessions).
122impl Session {
123    /// Start the user's default shell on the server.
124    ///
125    /// This method returns immediately without any blocking, but you may use the returned
126    /// [`SessionResp`] to wait for the server response.
127    pub fn shell(&self) -> Result<SessionResp> {
128        let (reply_tx, reply_rx) = oneshot::channel();
129        self.channel.send_request(ChannelReq {
130            request_type: "shell".into(),
131            payload: Bytes::new(),
132            reply_tx: Some(reply_tx),
133        })?;
134        Ok(SessionResp { reply_rx })
135    }
136
137    /// Start a command on the server.
138    ///
139    /// This method returns immediately without any blocking, but you may use the returned
140    /// [`SessionResp`] to wait for the server response.
141    pub fn exec(&self, command: &[u8]) -> Result<SessionResp> {
142        let (reply_tx, reply_rx) = oneshot::channel();
143        let mut payload = PacketEncode::new();
144        payload.put_bytes(command);
145        self.channel.send_request(ChannelReq {
146            request_type: "exec".into(),
147            payload: payload.finish(),
148            reply_tx: Some(reply_tx),
149        })?;
150        Ok(SessionResp { reply_rx })
151    }
152
153    /// Start an SSH subsystem on the server.
154    ///
155    /// Subsystems are described in RFC 4254, section 6.5.
156    ///
157    /// This method returns immediately without any blocking, but you may use the returned
158    /// [`SessionResp`] to wait for the server response.
159    pub fn subsystem(&self, subsystem_name: &str) -> Result<SessionResp> {
160        let (reply_tx, reply_rx) = oneshot::channel();
161        let mut payload = PacketEncode::new();
162        payload.put_str(subsystem_name);
163        self.channel.send_request(ChannelReq {
164            request_type: "subsystem".into(),
165            payload: payload.finish(),
166            reply_tx: Some(reply_tx),
167        })?;
168        Ok(SessionResp { reply_rx })
169    }
170}
171
172/// # Interacting with a running process
173impl Session {
174    /// Send data to the standard input of the running process.
175    ///
176    /// This method returns after all bytes have been accepted by the flow control mechanism and
177    /// written to the internal send buffer, but before we send them to the socket (or other I/O
178    /// stream that backs this SSH connection).
179    pub async fn send_stdin(&self, data: Bytes) -> Result<()> {
180        self.channel.send_data(data, DATA_STANDARD).await
181    }
182
183    /// Close the standard input of the running process.
184    ///
185    /// This method returns after all bytes previously sent to this session have been accepted by
186    /// the flow control mechanism, but before we write the message to the socket (or other I/O
187    /// stream that backs this SSH connection).
188    ///
189    /// If the session is closed before you call this method, or if it closes before this method
190    /// returns, we quietly ignore this error and return `Ok`.
191    pub async fn send_eof(&self) -> Result<()> {
192        self.channel.send_eof().await
193    }
194
195    /// Deliver a signal to the running process.
196    ///
197    /// Signal names are described in RFC 4254, section 6.10.
198    /// [`codes::signal`][crate::codes::signal] lists the signal names defined by SSH.
199    ///
200    /// This method returns immediately without any blocking, it is not possible to get a reply
201    /// from the server.
202    pub fn signal(&self, signal_name: &str) -> Result<()> {
203        let mut payload = PacketEncode::new();
204        payload.put_str(signal_name);
205        self.channel.send_request(ChannelReq {
206            request_type: "signal".into(),
207            payload: payload.finish(),
208            reply_tx: None,
209        })?;
210        Ok(())
211    }
212
213    /// Notify the process that the terminal window size has changed.
214    ///
215    /// This method returns immediately without any blocking, it is not possible to get a reply
216    /// from the server.
217    pub fn window_change(&self, change: &WindowChange) -> Result<()> {
218        let mut payload = PacketEncode::new();
219        payload.put_u32(change.width);
220        payload.put_u32(change.height);
221        payload.put_u32(change.width_px);
222        payload.put_u32(change.height_px);
223        self.channel.send_request(ChannelReq {
224            request_type: "window-change".into(),
225            payload: payload.finish(),
226            reply_tx: None,
227        })?;
228        Ok(())
229    }
230}
231
232
233
/// Future server response to a [`Session`] request.
///
/// You may either wait for the response using [`.wait()`][Self::wait()], or ignore the response
/// using [`.ignore()`][Self::ignore()].
///
/// The type is marked `#[must_use]`, so the compiler warns when a response is neither awaited nor
/// explicitly ignored.
#[derive(Debug)]
#[must_use = "please use .wait().await to await the response, or .ignore() to ignore it"]
pub struct SessionResp {
    // Receiving end of the one-shot channel on which the reply to the request is delivered.
    reply_rx: oneshot::Receiver<ChannelReply>,
}
243
244impl SessionResp {
245    /// Wait for the response from the server.
246    ///
247    /// If the request failed, this returns an error ([`Error::ChannelReq`]).
248    pub async fn wait(self) -> Result<()> {
249        match self.reply_rx.await {
250            Ok(ChannelReply::Success) => Ok(()),
251            Ok(ChannelReply::Failure) => Err(Error::ChannelReq),
252            Err(_) => Err(Error::ChannelClosed),
253        }
254    }
255
256    /// Ignore the response.
257    ///
258    /// This just drops the [`SessionResp`], but it is a good practice to do this explicitly
259    /// with this method.
260    pub fn ignore(self) {}
261}
262
263
264
/// An event returned from [`SessionReceiver`].
///
/// These are events related to a particular SSH session, they correspond to the requests and data
/// sent by the server.
///
/// This enum is marked as `#[non_exhaustive]`, so that we can add new variants without breaking
/// backwards compatibility. It should always be safe to ignore any events that you don't intend to
/// handle.
#[derive(Debug)]
#[non_exhaustive]
pub enum SessionEvent {
    /// Data from the standard output of the running process.
    ///
    /// You should handle this data as a byte stream, the boundaries between consecutive
    /// `StdoutData` events might be arbitrary.
    StdoutData(Bytes),

    /// Data from the standard error of the running process.
    ///
    /// You should handle this data as a byte stream, the boundaries between consecutive
    /// `StderrData` events might be arbitrary.
    StderrData(Bytes),

    /// End-of-file marker from the running process.
    ///
    /// After this, the server should not send more data (both stdout and stderr).
    Eof,

    /// The process terminated with given exit status.
    ///
    /// Decoded from the `u32` payload of an `"exit-status"` request.
    ExitStatus(u32),

    /// The process terminated violently due to a signal.
    ///
    /// Decoded from the payload of an `"exit-signal"` request.
    ExitSignal(ExitSignal),
}
299
/// Information about a process that terminated due to a signal.
///
/// This is the decoded payload of an `"exit-signal"` request and is delivered to the user as
/// [`SessionEvent::ExitSignal`].
///
/// The struct is plain data, so it can be cloned and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExitSignal {
    /// Name of the signal that terminated the process.
    ///
    /// Signal names are described in RFC 4254, section 6.10.
    /// [`codes::signal`][crate::codes::signal] lists the signal names defined by SSH.
    pub signal_name: String,

    /// True if the process produced a core dump.
    pub core_dumped: bool,

    /// Error message.
    pub message: String,

    /// Language tag of `message` (per RFC 3066).
    pub message_lang: String,
}
318
/// Receiving half of a [`Session`].
///
/// [`SessionReceiver`] produces [`SessionEvent`]s, which correspond to the requests and data sent
/// by the server on the channel. You can ignore these events if you don't need them, but you
/// **must** receive them, otherwise the client will stall when the internal buffer of events fills
/// up.
#[derive(Debug)]
pub struct SessionReceiver {
    // Receiving half of the underlying channel; its events are translated to `SessionEvent`s.
    channel_rx: ChannelReceiver,
}
329
330impl SessionReceiver {
331    /// Wait for the next event.
332    ///
333    /// Returns `None` if the session was closed.
334    pub async fn recv(&mut self) -> Result<Option<SessionEvent>> {
335        struct Recv<'a> { rx: &'a mut SessionReceiver }
336        impl<'a> Future for Recv<'a> {
337            type Output = Result<Option<SessionEvent>>;
338            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
339                self.rx.poll_recv(cx)
340            }
341        }
342        Recv { rx: self }.await
343    }
344
345    /// Poll-friendly variant of [`.recv()`][Self::recv()].
346    pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<Result<Option<SessionEvent>>> {
347        loop {
348            match ready!(self.channel_rx.poll_recv(cx)) {
349                Some(channel_event) => match translate_event(channel_event)? {
350                    Some(event) => return Poll::Ready(Ok(Some(event))),
351                    None => continue,
352                },
353                None => return Poll::Ready(Ok(None)),
354            }
355        }
356    }
357}
358
359fn translate_event(event: ChannelEvent) -> Result<Option<SessionEvent>> {
360    Ok(match event {
361        ChannelEvent::Data(data, DATA_STANDARD) =>
362            Some(SessionEvent::StdoutData(data)),
363        ChannelEvent::Data(data, DATA_STDERR) =>
364            Some(SessionEvent::StderrData(data)),
365        ChannelEvent::Data(_data, _) =>
366            None,
367        ChannelEvent::Eof =>
368            Some(SessionEvent::Eof),
369        ChannelEvent::Request(req) =>
370            translate_request(req)?,
371    })
372}
373
374fn translate_request(request: ChannelReq) -> Result<Option<SessionEvent>> {
375    let mut payload = PacketDecode::new(request.payload);
376    let event = match request.request_type.as_str() {
377        "exit-status" => {
378            let status = payload.get_u32()?;
379            SessionEvent::ExitStatus(status)
380        },
381        "exit-signal" => {
382            let signal_name = payload.get_string()?;
383            let core_dumped = payload.get_bool()?;
384            let message = payload.get_string()?;
385            let message_lang = payload.get_string()?;
386            let signal = ExitSignal { signal_name, core_dumped, message, message_lang };
387            SessionEvent::ExitSignal(signal)
388        },
389        _ =>
390            return Ok(None)
391    };
392
393    if let Some(reply_tx) = request.reply_tx {
394        let _: Result<_, _> = reply_tx.send(ChannelReply::Success);
395    }
396    Ok(Some(event))
397}
398
399
400
/// Pseudo-terminal request.
///
/// Request for allocation of a pseudo-terminal (pty), used with [`Session::request_pty()`], as
/// described in RFC 4254, section 6.2.
///
/// The type implements [`Default`], so you can set only the fields that you care about and use
/// `..Default::default()` for the rest.
#[derive(Debug, Clone, Default)]
pub struct PtyRequest {
    /// Value of the `TERM` environment variable (e.g. `"vt100"`).
    pub term: String,
    /// Terminal width in characters (e.g. 80).
    pub width: u32,
    /// Terminal height in rows (e.g. 24).
    pub height: u32,
    /// Terminal width in pixels (e.g. 640).
    pub width_px: u32,
    /// Terminal height in pixels (e.g. 480).
    pub height_px: u32,
    /// Terminal modes.
    ///
    /// An empty [`PtyTerminalModes`] (the default) requests no particular modes.
    pub modes: PtyTerminalModes,
}
420
/// Terminal modes for a [`PtyRequest`].
///
/// The terminal modes are encoded as a sequence of opcodes that take a single `u32` argument. For
/// more details, please see RFC 4254, section 8.
#[derive(Debug, Clone, Default)]
pub struct PtyTerminalModes {
    // Pairs of (opcode, argument), in the order in which they were added.
    opcodes: Vec<(u8, u32)>,
}

impl PtyTerminalModes {
    /// Create an empty instance.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds an opcode with a single `u32` argument.
    ///
    /// Only opcodes between 1 and 159 are supported. In particular, do not use the `TTY_OP_END`
    /// (0) opcode.
    ///
    /// The opcodes are defined in [`codes::terminal_mode`][crate::codes::terminal_mode].
    ///
    /// # Panics
    ///
    /// Panics if `opcode` is not between 1 and 159 (inclusive).
    pub fn add(&mut self, opcode: u8, arg: u32) {
        assert!(
            (1..160).contains(&opcode),
            "unsupported terminal mode opcode {} (expected an opcode between 1 and 159)",
            opcode,
        );
        self.opcodes.push((opcode, arg));
    }
}
447
/// Change of terminal window dimensions.
///
/// Notification that window dimensions have changed, used with [`Session::window_change()`], as
/// described in RFC 4254, section 6.7.
///
/// Like [`PtyRequest`], this type implements [`Default`] (all dimensions are zero). It is plain
/// data, so it can also be copied and compared for equality.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub struct WindowChange {
    /// Terminal width in characters (e.g. 80).
    pub width: u32,
    /// Terminal height in rows (e.g. 24).
    pub height: u32,
    /// Terminal width in pixels (e.g. 640).
    pub width_px: u32,
    /// Terminal height in pixels (e.g. 480).
    pub height_px: u32,
}