makiko 0.2.5

Asynchronous SSH client library in pure Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
use bytes::Bytes;
use futures_core::ready;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::oneshot;
use crate::codec::{PacketDecode, PacketEncode};
use crate::error::{Result, Error};
use super::channel::{
    Channel, ChannelReceiver, ChannelEvent, ChannelConfig,
    ChannelReq, ChannelReply, DATA_STANDARD, DATA_STDERR
};
use super::client::Client;

/// Handle to an SSH session.
///
/// SSH session (RFC 4254, section 6) corresponds to the execution of a single process. The
/// [`Session`] is used to send requests and data to the server, and [`SessionReceiver`] will
/// receive the requests and data from the server. To open the session, use
/// [`Client::open_session()`].
///
/// Once the session is open, you will typically go through three stages:
/// - prepare the execution environment: [`env()`][Self::env()],
/// [`request_pty()`][Self::request_pty()],
/// - start the execution: [`shell()`][Self::shell()], [`exec()`][Self::exec()],
/// [`subsystem()`][Self::subsystem()],
/// - interact with the process: [`send_stdin()`][Self::send_stdin()],
/// [`send_eof()`][Self::send_eof()], [`signal()`][Self::signal()].
///
/// In parallel, you will handle the events produced by [`SessionReceiver`].
///
/// An SSH session is a particular type of an SSH channel. However, this object provides higher
/// level API than a raw [`Channel`].
///
/// You can cheaply clone this object and safely share the clones between tasks.
#[derive(Clone)]
pub struct Session {
    channel: Channel,
}

impl Session {
    pub(super) async fn open(client: &Client, config: ChannelConfig) -> Result<(Session, SessionReceiver)> {
        let (channel, channel_rx, _) = client.open_channel("session".into(), config, Bytes::new()).await?;
        Ok((Session { channel }, SessionReceiver { channel_rx }))
    }

    /// Get the [`Client`] that this session belongs to.
    pub fn client(&self) -> Client {
        self.channel.client()
    }

    /// Close the session.
    ///
    /// We won't send any further requests or data to the server and the session will be closed
    /// once the server acknowledges our request.
    ///
    /// This method is idempotent: if the session is already closed or closing, we do nothing.
    pub fn close(&self) -> Result<()> {
        self.channel.close()
    }
}

/// # Preparing the execution environment
///
/// Use these methods to configure the session before starting the process.
impl Session {
    /// Pass an environment variable to the future process.
    ///
    /// This will set an environment variable for the process that will be started later.
    ///
    /// This method returns immediately without any blocking, but you may use the returned
    /// [`SessionResp`] to wait for the server response.
    pub fn env(&self, name: &[u8], value: &[u8]) -> Result<SessionResp> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let mut payload = PacketEncode::new();
        payload.put_bytes(name);
        payload.put_bytes(value);
        self.channel.send_request(ChannelReq {
            request_type: "env".into(),
            payload: payload.finish(),
            reply_tx: Some(reply_tx),
        })?;
        Ok(SessionResp { reply_rx })
    }

    /// Request a pseudo-terminal (pty) for the future process.
    ///
    /// This will allocate a pseudo-terminal according to the `request`.
    ///
    /// This method returns immediately without any blocking, but you may use the returned
    /// [`SessionResp`] to wait for the server response.
    pub fn request_pty(&self, request: &PtyRequest) -> Result<SessionResp> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let mut payload = PacketEncode::new();
        payload.put_str(&request.term);
        payload.put_u32(request.width);
        payload.put_u32(request.height);
        payload.put_u32(request.width_px);
        payload.put_u32(request.height_px);

        let mut encoded_modes = PacketEncode::new();
        for &(op, arg) in request.modes.opcodes.iter() {
            encoded_modes.put_u8(op);
            encoded_modes.put_u32(arg);
        }
        encoded_modes.put_u8(0);
        payload.put_bytes(&encoded_modes.finish());

        self.channel.send_request(ChannelReq {
            request_type: "pty-req".into(),
            payload: payload.finish(),
            reply_tx: Some(reply_tx),
        })?;
        Ok(SessionResp { reply_rx })
    }
}

/// # Starting the process
///
/// Use one of these methods to start the remote process. Only one of them can succeed, you cannot
/// start multiple processes with a single session (but you may open multiple sessions).
impl Session {
    /// Start the user's default shell on the server.
    ///
    /// This method returns immediately without any blocking, but you may use the returned
    /// [`SessionResp`] to wait for the server response.
    pub fn shell(&self) -> Result<SessionResp> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.channel.send_request(ChannelReq {
            request_type: "shell".into(),
            payload: Bytes::new(),
            reply_tx: Some(reply_tx),
        })?;
        Ok(SessionResp { reply_rx })
    }

    /// Start a command on the server.
    ///
    /// This method returns immediately without any blocking, but you may use the returned
    /// [`SessionResp`] to wait for the server response.
    pub fn exec(&self, command: &[u8]) -> Result<SessionResp> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let mut payload = PacketEncode::new();
        payload.put_bytes(command);
        self.channel.send_request(ChannelReq {
            request_type: "exec".into(),
            payload: payload.finish(),
            reply_tx: Some(reply_tx),
        })?;
        Ok(SessionResp { reply_rx })
    }

    /// Start an SSH subsystem on the server.
    ///
    /// Subsystems are described in RFC 4254, section 6.5.
    ///
    /// This method returns immediately without any blocking, but you may use the returned
    /// [`SessionResp`] to wait for the server response.
    pub fn subsystem(&self, subsystem_name: &str) -> Result<SessionResp> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let mut payload = PacketEncode::new();
        payload.put_str(subsystem_name);
        self.channel.send_request(ChannelReq {
            request_type: "subsystem".into(),
            payload: payload.finish(),
            reply_tx: Some(reply_tx),
        })?;
        Ok(SessionResp { reply_rx })
    }
}

/// # Interacting with a running process
impl Session {
    /// Send data to the standard input of the running process.
    ///
    /// This method returns after all bytes have been accepted by the flow control mechanism and
    /// written to the internal send buffer, but before we send them to the socket (or other I/O
    /// stream that backs this SSH connection).
    pub async fn send_stdin(&self, data: Bytes) -> Result<()> {
        self.channel.send_data(data, DATA_STANDARD).await
    }

    /// Close the standard input of the running process.
    ///
    /// This method returns after all bytes previously sent to this session have been accepted by
    /// the flow control mechanism, but before we write the message to the socket (or other I/O
    /// stream that backs this SSH connection).
    ///
    /// If the session is closed before you call this method, or if it closes before this method
    /// returns, we quietly ignore this error and return `Ok`.
    pub async fn send_eof(&self) -> Result<()> {
        self.channel.send_eof().await
    }

    /// Deliver a signal to the running process.
    ///
    /// Signal names are described in RFC 4254, section 6.10.
    /// [`codes::signal`][crate::codes::signal] lists the signal names defined by SSH.
    ///
    /// This method returns immediately without any blocking, it is not possible to get a reply
    /// from the server.
    pub fn signal(&self, signal_name: &str) -> Result<()> {
        let mut payload = PacketEncode::new();
        payload.put_str(signal_name);
        self.channel.send_request(ChannelReq {
            request_type: "signal".into(),
            payload: payload.finish(),
            reply_tx: None,
        })?;
        Ok(())
    }

    /// Notify the process that the terminal window size has changed.
    ///
    /// This method returns immediately without any blocking, it is not possible to get a reply
    /// from the server.
    pub fn window_change(&self, change: &WindowChange) -> Result<()> {
        let mut payload = PacketEncode::new();
        payload.put_u32(change.width);
        payload.put_u32(change.height);
        payload.put_u32(change.width_px);
        payload.put_u32(change.height_px);
        self.channel.send_request(ChannelReq {
            request_type: "window-change".into(),
            payload: payload.finish(),
            reply_tx: None,
        })?;
        Ok(())
    }
}



/// Future server response to a [`Session`] request.
///
/// You may either wait for the response using [`.wait()`][Self::wait()], or ignore the response
/// using [`.ignore()`][Self::ignore()].
#[derive(Debug)]
#[must_use = "please use .wait().await to await the response, or .ignore() to ignore it"]
pub struct SessionResp {
    reply_rx: oneshot::Receiver<ChannelReply>,
}

impl SessionResp {
    /// Wait for the response from the server.
    ///
    /// If the request failed, this returns an error ([`Error::ChannelReq`]).
    pub async fn wait(self) -> Result<()> {
        match self.reply_rx.await {
            Ok(ChannelReply::Success) => Ok(()),
            Ok(ChannelReply::Failure) => Err(Error::ChannelReq),
            Err(_) => Err(Error::ChannelClosed),
        }
    }

    /// Ignore the response.
    ///
    /// This just drops the [`SessionResp`], but it is a good practice to do this explicitly
    /// with this method.
    pub fn ignore(self) {}
}



/// An event returned from [`SessionReceiver`].
///
/// These are events related to a particular SSH session, they correspond to the requests and data
/// sent by the server.
///
/// This enum is marked as `#[non_exhaustive]`, so that we can add new variants without breaking
/// backwards compatibility. It should always be safe to ignore any events that you don't intend to
/// handle.
#[derive(Debug)]
#[non_exhaustive]
pub enum SessionEvent {
    /// Data from the standard output of the running process.
    ///
    /// You should handle this data as a byte stream, the boundaries between consecutive
    /// `StdoutData` events might be arbitrary.
    StdoutData(Bytes),

    /// Data from the standard error of the running process.
    ///
    /// You should handle this data as a byte stream, the boundaries between consecutive
    /// `StderrData` events might be arbitrary.
    StderrData(Bytes),

    /// End-of-file marker from the running process.
    ///
    /// After this, the server should not send more data (both stdout and stderr).
    Eof,

    /// The process terminated with given exit status.
    ExitStatus(u32),

    /// The process terminated violently due to a signal.
    ExitSignal(ExitSignal),
}

/// Information about a process that terminated due to a signal.
#[derive(Debug)]
pub struct ExitSignal {
    /// Name of the signal that terminated the process.
    ///
    /// Signal names are described in RFC 4254, section 6.10.
    /// [`codes::signal`][crate::codes::signal] lists the signal names defined by SSH.
    pub signal_name: String,

    /// True if the process produced a core dump.
    pub core_dumped: bool,

    /// Error message.
    pub message: String,

    /// Language tag of `message` (per RFC 3066).
    pub message_lang: String,
}

/// Receiving half of a [`Session`].
///
/// [`SessionReceiver`] produces [`SessionEvent`]s, which correspond to the requests and data sent
/// by the server on the channel. You can ignore these events if you don't need them, but you
/// **must** receive them, otherwise the client will stall when the internal buffer of events fills
/// up.
#[derive(Debug)]
pub struct SessionReceiver {
    channel_rx: ChannelReceiver,
}

impl SessionReceiver {
    /// Wait for the next event.
    ///
    /// Returns `None` if the session was closed.
    pub async fn recv(&mut self) -> Result<Option<SessionEvent>> {
        struct Recv<'a> { rx: &'a mut SessionReceiver }
        impl<'a> Future for Recv<'a> {
            type Output = Result<Option<SessionEvent>>;
            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                self.rx.poll_recv(cx)
            }
        }
        Recv { rx: self }.await
    }

    /// Poll-friendly variant of [`.recv()`][Self::recv()].
    pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<Result<Option<SessionEvent>>> {
        loop {
            match ready!(self.channel_rx.poll_recv(cx)) {
                Some(channel_event) => match translate_event(channel_event)? {
                    Some(event) => return Poll::Ready(Ok(Some(event))),
                    None => continue,
                },
                None => return Poll::Ready(Ok(None)),
            }
        }
    }
}

fn translate_event(event: ChannelEvent) -> Result<Option<SessionEvent>> {
    Ok(match event {
        ChannelEvent::Data(data, DATA_STANDARD) =>
            Some(SessionEvent::StdoutData(data)),
        ChannelEvent::Data(data, DATA_STDERR) =>
            Some(SessionEvent::StderrData(data)),
        ChannelEvent::Data(_data, _) =>
            None,
        ChannelEvent::Eof =>
            Some(SessionEvent::Eof),
        ChannelEvent::Request(req) =>
            translate_request(req)?,
    })
}

fn translate_request(request: ChannelReq) -> Result<Option<SessionEvent>> {
    let mut payload = PacketDecode::new(request.payload);
    let event = match request.request_type.as_str() {
        "exit-status" => {
            let status = payload.get_u32()?;
            SessionEvent::ExitStatus(status)
        },
        "exit-signal" => {
            let signal_name = payload.get_string()?;
            let core_dumped = payload.get_bool()?;
            let message = payload.get_string()?;
            let message_lang = payload.get_string()?;
            let signal = ExitSignal { signal_name, core_dumped, message, message_lang };
            SessionEvent::ExitSignal(signal)
        },
        _ =>
            return Ok(None)
    };

    if let Some(reply_tx) = request.reply_tx {
        let _: Result<_, _> = reply_tx.send(ChannelReply::Success);
    }
    Ok(Some(event))
}



/// Pseudo-terminal request.
///
/// Request for allocation of a pseudo-terminal (pty), used with [`Session::request_pty()`], as
/// described in RFC 4254, section 6.2.
#[derive(Debug, Clone, Default)]
pub struct PtyRequest {
    /// Value of the `TERM` environment variable (e.g. `"vt100"`).
    pub term: String,
    /// Terminal width in characters (e.g. 80).
    pub width: u32,
    /// Terminal height in rows (e.g. 24).
    pub height: u32,
    /// Terminal width in pixels (e.g. 640).
    pub width_px: u32,
    /// Terminal height in pixels (e.g. 480).
    pub height_px: u32,
    /// Terminal modes.
    pub modes: PtyTerminalModes,
}

/// Terminal modes for a [`PtyRequest`].
///
/// The terminal modes are encoded as a sequence of opcodes that take a single `u32` argument. For
/// more details, please see RFC 4254, section 8. 
#[derive(Debug, Clone, Default)]
pub struct PtyTerminalModes {
    opcodes: Vec<(u8, u32)>,
}

impl PtyTerminalModes {
    /// Create an empty instance.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds an opcode with a single `u32` argument.
    ///
    /// Only opcodes between 1 and 159 are supported. This method will panic if you try to use it
    /// with other opcodes. In particular, do not use the `TTY_OP_END` (0) opcode.
    ///
    /// The opcodes are defined in [`codes::terminal_mode`][crate::codes::terminal_mode].
    pub fn add(&mut self, opcode: u8, arg: u32) {
        assert!((1..160).contains(&opcode));
        self.opcodes.push((opcode, arg));
    }
}

/// Change of terminal window dimensions.
///
/// Notification that window dimensions have changed, used with [`Session::window_change()`], as
/// described in RFC 4254, section 6.7.
#[derive(Debug, Copy, Clone)]
pub struct WindowChange {
    /// Terminal width in characters (e.g. 80).
    pub width: u32,
    /// Terminal height in rows (e.g. 24).
    pub height: u32,
    /// Terminal width in pixels (e.g. 640).
    pub width_px: u32,
    /// Terminal height in pixels (e.g. 480).
    pub height_px: u32,
}