makiko/client/session.rs
1use bytes::Bytes;
2use futures_core::ready;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::sync::oneshot;
7use crate::codec::{PacketDecode, PacketEncode};
8use crate::error::{Result, Error};
9use super::channel::{
10 Channel, ChannelReceiver, ChannelEvent, ChannelConfig,
11 ChannelReq, ChannelReply, DATA_STANDARD, DATA_STDERR
12};
13use super::client::Client;
14
15/// Handle to an SSH session.
16///
17/// SSH session (RFC 4254, section 6) corresponds to the execution of a single process. The
18/// [`Session`] is used to send requests and data to the server, and [`SessionReceiver`] will
19/// receive the requests and data from the server. To open the session, use
20/// [`Client::open_session()`].
21///
22/// Once the session is open, you will typically go through three stages:
23/// - prepare the execution environment: [`env()`][Self::env()],
24/// [`request_pty()`][Self::request_pty()],
25/// - start the execution: [`shell()`][Self::shell()], [`exec()`][Self::exec()],
26/// [`subsystem()`][Self::subsystem()],
27/// - interact with the process: [`send_stdin()`][Self::send_stdin()],
28/// [`send_eof()`][Self::send_eof()], [`signal()`][Self::signal()].
29///
30/// In parallel, you will handle the events produced by [`SessionReceiver`].
31///
32/// An SSH session is a particular type of an SSH channel. However, this object provides higher
33/// level API than a raw [`Channel`].
34///
35/// You can cheaply clone this object and safely share the clones between tasks.
36#[derive(Clone)]
37pub struct Session {
38 channel: Channel,
39}
40
41impl Session {
42 pub(super) async fn open(client: &Client, config: ChannelConfig) -> Result<(Session, SessionReceiver)> {
43 let (channel, channel_rx, _) = client.open_channel("session".into(), config, Bytes::new()).await?;
44 Ok((Session { channel }, SessionReceiver { channel_rx }))
45 }
46
47 /// Get the [`Client`] that this session belongs to.
48 pub fn client(&self) -> Client {
49 self.channel.client()
50 }
51
52 /// Close the session.
53 ///
54 /// We won't send any further requests or data to the server and the session will be closed
55 /// once the server acknowledges our request.
56 ///
57 /// This method is idempotent: if the session is already closed or closing, we do nothing.
58 pub fn close(&self) -> Result<()> {
59 self.channel.close()
60 }
61}
62
63/// # Preparing the execution environment
64///
65/// Use these methods to configure the session before starting the process.
66impl Session {
67 /// Pass an environment variable to the future process.
68 ///
69 /// This will set an environment variable for the process that will be started later.
70 ///
71 /// This method returns immediately without any blocking, but you may use the returned
72 /// [`SessionResp`] to wait for the server response.
73 pub fn env(&self, name: &[u8], value: &[u8]) -> Result<SessionResp> {
74 let (reply_tx, reply_rx) = oneshot::channel();
75 let mut payload = PacketEncode::new();
76 payload.put_bytes(name);
77 payload.put_bytes(value);
78 self.channel.send_request(ChannelReq {
79 request_type: "env".into(),
80 payload: payload.finish(),
81 reply_tx: Some(reply_tx),
82 })?;
83 Ok(SessionResp { reply_rx })
84 }
85
86 /// Request a pseudo-terminal (pty) for the future process.
87 ///
88 /// This will allocate a pseudo-terminal according to the `request`.
89 ///
90 /// This method returns immediately without any blocking, but you may use the returned
91 /// [`SessionResp`] to wait for the server response.
92 pub fn request_pty(&self, request: &PtyRequest) -> Result<SessionResp> {
93 let (reply_tx, reply_rx) = oneshot::channel();
94 let mut payload = PacketEncode::new();
95 payload.put_str(&request.term);
96 payload.put_u32(request.width);
97 payload.put_u32(request.height);
98 payload.put_u32(request.width_px);
99 payload.put_u32(request.height_px);
100
101 let mut encoded_modes = PacketEncode::new();
102 for &(op, arg) in request.modes.opcodes.iter() {
103 encoded_modes.put_u8(op);
104 encoded_modes.put_u32(arg);
105 }
106 encoded_modes.put_u8(0);
107 payload.put_bytes(&encoded_modes.finish());
108
109 self.channel.send_request(ChannelReq {
110 request_type: "pty-req".into(),
111 payload: payload.finish(),
112 reply_tx: Some(reply_tx),
113 })?;
114 Ok(SessionResp { reply_rx })
115 }
116}
117
118/// # Starting the process
119///
120/// Use one of these methods to start the remote process. Only one of them can succeed, you cannot
121/// start multiple processes with a single session (but you may open multiple sessions).
122impl Session {
123 /// Start the user's default shell on the server.
124 ///
125 /// This method returns immediately without any blocking, but you may use the returned
126 /// [`SessionResp`] to wait for the server response.
127 pub fn shell(&self) -> Result<SessionResp> {
128 let (reply_tx, reply_rx) = oneshot::channel();
129 self.channel.send_request(ChannelReq {
130 request_type: "shell".into(),
131 payload: Bytes::new(),
132 reply_tx: Some(reply_tx),
133 })?;
134 Ok(SessionResp { reply_rx })
135 }
136
137 /// Start a command on the server.
138 ///
139 /// This method returns immediately without any blocking, but you may use the returned
140 /// [`SessionResp`] to wait for the server response.
141 pub fn exec(&self, command: &[u8]) -> Result<SessionResp> {
142 let (reply_tx, reply_rx) = oneshot::channel();
143 let mut payload = PacketEncode::new();
144 payload.put_bytes(command);
145 self.channel.send_request(ChannelReq {
146 request_type: "exec".into(),
147 payload: payload.finish(),
148 reply_tx: Some(reply_tx),
149 })?;
150 Ok(SessionResp { reply_rx })
151 }
152
153 /// Start an SSH subsystem on the server.
154 ///
155 /// Subsystems are described in RFC 4254, section 6.5.
156 ///
157 /// This method returns immediately without any blocking, but you may use the returned
158 /// [`SessionResp`] to wait for the server response.
159 pub fn subsystem(&self, subsystem_name: &str) -> Result<SessionResp> {
160 let (reply_tx, reply_rx) = oneshot::channel();
161 let mut payload = PacketEncode::new();
162 payload.put_str(subsystem_name);
163 self.channel.send_request(ChannelReq {
164 request_type: "subsystem".into(),
165 payload: payload.finish(),
166 reply_tx: Some(reply_tx),
167 })?;
168 Ok(SessionResp { reply_rx })
169 }
170}
171
172/// # Interacting with a running process
173impl Session {
174 /// Send data to the standard input of the running process.
175 ///
176 /// This method returns after all bytes have been accepted by the flow control mechanism and
177 /// written to the internal send buffer, but before we send them to the socket (or other I/O
178 /// stream that backs this SSH connection).
179 pub async fn send_stdin(&self, data: Bytes) -> Result<()> {
180 self.channel.send_data(data, DATA_STANDARD).await
181 }
182
183 /// Close the standard input of the running process.
184 ///
185 /// This method returns after all bytes previously sent to this session have been accepted by
186 /// the flow control mechanism, but before we write the message to the socket (or other I/O
187 /// stream that backs this SSH connection).
188 ///
189 /// If the session is closed before you call this method, or if it closes before this method
190 /// returns, we quietly ignore this error and return `Ok`.
191 pub async fn send_eof(&self) -> Result<()> {
192 self.channel.send_eof().await
193 }
194
195 /// Deliver a signal to the running process.
196 ///
197 /// Signal names are described in RFC 4254, section 6.10.
198 /// [`codes::signal`][crate::codes::signal] lists the signal names defined by SSH.
199 ///
200 /// This method returns immediately without any blocking, it is not possible to get a reply
201 /// from the server.
202 pub fn signal(&self, signal_name: &str) -> Result<()> {
203 let mut payload = PacketEncode::new();
204 payload.put_str(signal_name);
205 self.channel.send_request(ChannelReq {
206 request_type: "signal".into(),
207 payload: payload.finish(),
208 reply_tx: None,
209 })?;
210 Ok(())
211 }
212
213 /// Notify the process that the terminal window size has changed.
214 ///
215 /// This method returns immediately without any blocking, it is not possible to get a reply
216 /// from the server.
217 pub fn window_change(&self, change: &WindowChange) -> Result<()> {
218 let mut payload = PacketEncode::new();
219 payload.put_u32(change.width);
220 payload.put_u32(change.height);
221 payload.put_u32(change.width_px);
222 payload.put_u32(change.height_px);
223 self.channel.send_request(ChannelReq {
224 request_type: "window-change".into(),
225 payload: payload.finish(),
226 reply_tx: None,
227 })?;
228 Ok(())
229 }
230}
231
232
233
234/// Future server response to a [`Session`] request.
235///
236/// You may either wait for the response using [`.wait()`][Self::wait()], or ignore the response
237/// using [`.ignore()`][Self::ignore()].
238#[derive(Debug)]
239#[must_use = "please use .wait().await to await the response, or .ignore() to ignore it"]
240pub struct SessionResp {
241 reply_rx: oneshot::Receiver<ChannelReply>,
242}
243
244impl SessionResp {
245 /// Wait for the response from the server.
246 ///
247 /// If the request failed, this returns an error ([`Error::ChannelReq`]).
248 pub async fn wait(self) -> Result<()> {
249 match self.reply_rx.await {
250 Ok(ChannelReply::Success) => Ok(()),
251 Ok(ChannelReply::Failure) => Err(Error::ChannelReq),
252 Err(_) => Err(Error::ChannelClosed),
253 }
254 }
255
256 /// Ignore the response.
257 ///
258 /// This just drops the [`SessionResp`], but it is a good practice to do this explicitly
259 /// with this method.
260 pub fn ignore(self) {}
261}
262
263
264
265/// An event returned from [`SessionReceiver`].
266///
267/// These are events related to a particular SSH session, they correspond to the requests and data
268/// sent by the server.
269///
270/// This enum is marked as `#[non_exhaustive]`, so that we can add new variants without breaking
271/// backwards compatibility. It should always be safe to ignore any events that you don't intend to
272/// handle.
273#[derive(Debug)]
274#[non_exhaustive]
275pub enum SessionEvent {
276 /// Data from the standard output of the running process.
277 ///
278 /// You should handle this data as a byte stream, the boundaries between consecutive
279 /// `StdoutData` events might be arbitrary.
280 StdoutData(Bytes),
281
282 /// Data from the standard error of the running process.
283 ///
284 /// You should handle this data as a byte stream, the boundaries between consecutive
285 /// `StderrData` events might be arbitrary.
286 StderrData(Bytes),
287
288 /// End-of-file marker from the running process.
289 ///
290 /// After this, the server should not send more data (both stdout and stderr).
291 Eof,
292
293 /// The process terminated with given exit status.
294 ExitStatus(u32),
295
296 /// The process terminated violently due to a signal.
297 ExitSignal(ExitSignal),
298}
299
/// Details about a process that was terminated by a signal.
#[derive(Debug)]
pub struct ExitSignal {
    /// Name of the signal that terminated the process.
    ///
    /// Signal names are described in RFC 4254, section 6.10, and the names defined by SSH are
    /// listed in [`codes::signal`][crate::codes::signal].
    pub signal_name: String,

    /// Whether the process produced a core dump.
    pub core_dumped: bool,

    /// Error message.
    pub message: String,

    /// Language tag of `message` (as defined by RFC 3066).
    pub message_lang: String,
}
318
319/// Receiving half of a [`Session`].
320///
321/// [`SessionReceiver`] produces [`SessionEvent`]s, which correspond to the requests and data sent
322/// by the server on the channel. You can ignore these events if you don't need them, but you
323/// **must** receive them, otherwise the client will stall when the internal buffer of events fills
324/// up.
325#[derive(Debug)]
326pub struct SessionReceiver {
327 channel_rx: ChannelReceiver,
328}
329
330impl SessionReceiver {
331 /// Wait for the next event.
332 ///
333 /// Returns `None` if the session was closed.
334 pub async fn recv(&mut self) -> Result<Option<SessionEvent>> {
335 struct Recv<'a> { rx: &'a mut SessionReceiver }
336 impl<'a> Future for Recv<'a> {
337 type Output = Result<Option<SessionEvent>>;
338 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
339 self.rx.poll_recv(cx)
340 }
341 }
342 Recv { rx: self }.await
343 }
344
345 /// Poll-friendly variant of [`.recv()`][Self::recv()].
346 pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<Result<Option<SessionEvent>>> {
347 loop {
348 match ready!(self.channel_rx.poll_recv(cx)) {
349 Some(channel_event) => match translate_event(channel_event)? {
350 Some(event) => return Poll::Ready(Ok(Some(event))),
351 None => continue,
352 },
353 None => return Poll::Ready(Ok(None)),
354 }
355 }
356 }
357}
358
359fn translate_event(event: ChannelEvent) -> Result<Option<SessionEvent>> {
360 Ok(match event {
361 ChannelEvent::Data(data, DATA_STANDARD) =>
362 Some(SessionEvent::StdoutData(data)),
363 ChannelEvent::Data(data, DATA_STDERR) =>
364 Some(SessionEvent::StderrData(data)),
365 ChannelEvent::Data(_data, _) =>
366 None,
367 ChannelEvent::Eof =>
368 Some(SessionEvent::Eof),
369 ChannelEvent::Request(req) =>
370 translate_request(req)?,
371 })
372}
373
374fn translate_request(request: ChannelReq) -> Result<Option<SessionEvent>> {
375 let mut payload = PacketDecode::new(request.payload);
376 let event = match request.request_type.as_str() {
377 "exit-status" => {
378 let status = payload.get_u32()?;
379 SessionEvent::ExitStatus(status)
380 },
381 "exit-signal" => {
382 let signal_name = payload.get_string()?;
383 let core_dumped = payload.get_bool()?;
384 let message = payload.get_string()?;
385 let message_lang = payload.get_string()?;
386 let signal = ExitSignal { signal_name, core_dumped, message, message_lang };
387 SessionEvent::ExitSignal(signal)
388 },
389 _ =>
390 return Ok(None)
391 };
392
393 if let Some(reply_tx) = request.reply_tx {
394 let _: Result<_, _> = reply_tx.send(ChannelReply::Success);
395 }
396 Ok(Some(event))
397}
398
399
400
401/// Pseudo-terminal request.
402///
403/// Request for allocation of a pseudo-terminal (pty), used with [`Session::request_pty()`], as
404/// described in RFC 4254, section 6.2.
405#[derive(Debug, Clone, Default)]
406pub struct PtyRequest {
407 /// Value of the `TERM` environment variable (e.g. `"vt100"`).
408 pub term: String,
409 /// Terminal width in characters (e.g. 80).
410 pub width: u32,
411 /// Terminal height in rows (e.g. 24).
412 pub height: u32,
413 /// Terminal width in pixels (e.g. 640).
414 pub width_px: u32,
415 /// Terminal height in pixels (e.g. 480).
416 pub height_px: u32,
417 /// Terminal modes.
418 pub modes: PtyTerminalModes,
419}
420
/// Terminal modes of a [`PtyRequest`].
///
/// The modes are encoded as a sequence of opcodes, each with a single `u32` argument. See
/// RFC 4254, section 8 for details.
#[derive(Debug, Clone, Default)]
pub struct PtyTerminalModes {
    /// `(opcode, argument)` pairs in the order in which they were added.
    opcodes: Vec<(u8, u32)>,
}

impl PtyTerminalModes {
    /// Creates an instance without any opcodes.
    pub fn new() -> Self {
        Self { opcodes: Vec::new() }
    }

    /// Appends an opcode together with its single `u32` argument.
    ///
    /// The opcodes are defined in [`codes::terminal_mode`][crate::codes::terminal_mode].
    ///
    /// # Panics
    ///
    /// Only opcodes from 1 to 159 are supported; the method panics for any other opcode. In
    /// particular, do not pass the `TTY_OP_END` (0) opcode.
    pub fn add(&mut self, opcode: u8, arg: u32) {
        assert!((1..160).contains(&opcode));
        let entry = (opcode, arg);
        self.opcodes.push(entry);
    }
}
447
/// Change of the dimensions of the terminal window.
///
/// Describes the new window dimensions passed to [`Session::window_change()`], following
/// RFC 4254, section 6.7.
///
/// Like [`PtyRequest`], which has the same four dimension fields, this type implements
/// `Default` (all dimensions zero), so you can spell out only the fields that you need and use
/// `..Default::default()` for the rest. Values can also be compared for equality, for example
/// to find out whether the dimensions actually changed.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub struct WindowChange {
    /// Width of the terminal in characters (for example 80).
    pub width: u32,
    /// Height of the terminal in rows (for example 24).
    pub height: u32,
    /// Width of the terminal in pixels (for example 640).
    pub width_px: u32,
    /// Height of the terminal in pixels (for example 480).
    pub height_px: u32,
}
462}