wezterm_ssh/
session.rs

1use crate::auth::*;
2use crate::config::ConfigMap;
3use crate::host::*;
4use crate::pty::*;
5use crate::sessioninner::*;
6use crate::sftp::{Sftp, SftpRequest};
7use filedescriptor::{socketpair, FileDescriptor};
8use portable_pty::PtySize;
9use smol::channel::{bounded, Receiver, Sender};
10use std::collections::HashMap;
11use std::io::Write;
12use std::sync::{Arc, Mutex};
13
14#[derive(Debug)]
15pub enum SessionEvent {
16    Banner(Option<String>),
17    HostVerify(HostVerificationEvent),
18    Authenticate(AuthenticationEvent),
19    Error(String),
20    Authenticated,
21}
22
23#[derive(Debug, Clone)]
24pub(crate) struct SessionSender {
25    pub tx: Sender<SessionRequest>,
26    pub pipe: Arc<Mutex<FileDescriptor>>,
27}
28
29impl SessionSender {
30    fn post_send(&self) {
31        let mut pipe = self.pipe.lock().unwrap();
32        let _ = pipe.write(b"x");
33    }
34
35    pub fn try_send(&self, event: SessionRequest) -> anyhow::Result<()> {
36        self.tx.try_send(event)?;
37        self.post_send();
38        Ok(())
39    }
40
41    pub async fn send(&self, event: SessionRequest) -> anyhow::Result<()> {
42        self.tx.send(event).await?;
43        self.post_send();
44        Ok(())
45    }
46}
47
48#[derive(Debug)]
49pub(crate) enum SessionRequest {
50    NewPty(NewPty, Sender<anyhow::Result<(SshPty, SshChildProcess)>>),
51    ResizePty(ResizePty, Option<Sender<anyhow::Result<()>>>),
52    Exec(Exec, Sender<anyhow::Result<ExecResult>>),
53    Sftp(SftpRequest),
54    SignalChannel(SignalChannel),
55}
56
57#[derive(Debug)]
58pub(crate) struct SignalChannel {
59    pub channel: ChannelId,
60    pub signame: &'static str,
61}
62
/// Payload of `SessionRequest::Exec`, as built by `Session::exec`.
#[derive(Debug)]
pub(crate) struct Exec {
    /// The command line supplied by the caller.
    pub command_line: String,
    /// Optional environment variable map supplied by the caller.
    pub env: Option<HashMap<String, String>>,
}
68
69#[derive(Clone)]
70pub struct Session {
71    tx: SessionSender,
72}
73
74impl Drop for Session {
75    fn drop(&mut self) {
76        log::trace!("Drop Session");
77    }
78}
79
80impl Session {
81    pub fn connect(config: ConfigMap) -> anyhow::Result<(Self, Receiver<SessionEvent>)> {
82        let (tx_event, rx_event) = bounded(8);
83        let (tx_req, rx_req) = bounded(8);
84        let (mut sender_write, mut sender_read) = socketpair()?;
85        sender_write.set_non_blocking(true)?;
86        sender_read.set_non_blocking(true)?;
87
88        let session_sender = SessionSender {
89            tx: tx_req,
90            pipe: Arc::new(Mutex::new(sender_write)),
91        };
92
93        let mut inner = SessionInner {
94            config,
95            tx_event,
96            rx_req,
97            channels: HashMap::new(),
98            files: HashMap::new(),
99            dirs: HashMap::new(),
100            next_channel_id: 1,
101            next_file_id: 1,
102            sender_read,
103        };
104        std::thread::spawn(move || inner.run());
105        Ok((Self { tx: session_sender }, rx_event))
106    }
107
108    pub async fn request_pty(
109        &self,
110        term: &str,
111        size: PtySize,
112        command_line: Option<&str>,
113        env: Option<HashMap<String, String>>,
114    ) -> anyhow::Result<(SshPty, SshChildProcess)> {
115        let (reply, rx) = bounded(1);
116        self.tx
117            .send(SessionRequest::NewPty(
118                NewPty {
119                    term: term.to_string(),
120                    size,
121                    command_line: command_line.map(|s| s.to_string()),
122                    env,
123                },
124                reply,
125            ))
126            .await?;
127        let (mut ssh_pty, mut child) = rx.recv().await??;
128        ssh_pty.tx.replace(self.tx.clone());
129        child.tx.replace(self.tx.clone());
130        Ok((ssh_pty, child))
131    }
132
133    pub async fn exec(
134        &self,
135        command_line: &str,
136        env: Option<HashMap<String, String>>,
137    ) -> anyhow::Result<ExecResult> {
138        let (reply, rx) = bounded(1);
139        self.tx
140            .send(SessionRequest::Exec(
141                Exec {
142                    command_line: command_line.to_string(),
143                    env,
144                },
145                reply,
146            ))
147            .await?;
148        let mut exec = rx.recv().await??;
149        exec.child.tx.replace(self.tx.clone());
150        Ok(exec)
151    }
152
153    /// Creates a new reference to the sftp channel for filesystem operations
154    ///
155    /// ### Note
156    ///
157    /// This does not actually initialize the sftp subsystem and only provides
158    /// a reference to a means to perform sftp operations. Upon requesting the
159    /// first sftp operation, the sftp subsystem will be initialized.
160    pub fn sftp(&self) -> Sftp {
161        Sftp {
162            tx: self.tx.clone(),
163        }
164    }
165}
166
167#[derive(Debug)]
168pub struct ExecResult {
169    pub stdin: FileDescriptor,
170    pub stdout: FileDescriptor,
171    pub stderr: FileDescriptor,
172    pub child: SshChildProcess,
173}