Skip to main content

tailscale/ssh/
channel_server.rs

1use std::{collections::HashMap, marker::PhantomData, net::SocketAddr, sync::Arc};
2
3use russh::{
4    Channel, ChannelId, Pty, Sig,
5    server::{Auth, Handle, Msg, Session},
6};
7use tokio::{
8    sync::{mpsc, mpsc::UnboundedSender},
9    task::JoinSet,
10};
11
12use crate::{
13    Device,
14    ssh::{SshAccept, TailnetServer},
15};
16
/// Message delivered to a channel's handler task: the id of the channel the event arrived on,
/// paired with the event itself.
type Request = (ChannelId, ChannelEvent);
18
/// Handler for a channel session.
///
/// [`ChannelServer`] constructs one handler per opened session channel (inside a task it spawns
/// for that channel) and then feeds it every [`ChannelEvent`] received for that channel, one at a
/// time and in the order they were queued.
pub trait ChannelHandler: Sized {
    /// Error this handler produces.
    ///
    /// Must be convertible into [`std::io::Error`]; [`ChannelServer`] performs that conversion
    /// before logging a failure.
    type Error: Into<std::io::Error> + std::error::Error;

    /// Construct a new per-channel handler.
    ///
    /// `accept` is the [`SshAccept`] produced by the single fail-closed authorization decision in
    /// [`auth_none`][russh::server::Handler::auth_none]; in particular its
    /// [`local_user`][SshAccept::local_user] is the policy-mapped identity the session must run as.
    /// Handlers MUST NOT re-evaluate policy or substitute a different user — the accepted identity
    /// is the sole authorization source.
    ///
    /// Other arguments, as supplied by [`ChannelServer`]:
    ///
    /// * `handle` — handle to the tokio runtime the channel's task is running on.
    /// * `channel_id` — id of the channel this handler serves.
    /// * `session` — russh session handle; [`ChannelServer`] itself uses a clone of it to close
    ///   the channel when the handler fails.
    /// * `dev` — the [`Device`] the connection was accepted on.
    ///
    /// # Errors
    ///
    /// If construction fails, [`ChannelServer`] logs the error, closes the channel, and delivers
    /// no events.
    fn new(
        handle: tokio::runtime::Handle,
        channel_id: ChannelId,
        session: Handle,
        dev: Arc<Device>,
        accept: &SshAccept,
    ) -> Result<Self, Self::Error>;

    /// Handle an event from the channel.
    ///
    /// The returned future must be `Send` because it is awaited inside a spawned task.
    ///
    /// # Errors
    ///
    /// On `Err`, [`ChannelServer`] logs the error, closes the channel, and stops delivering
    /// further events to this handler.
    fn handle_event(
        &mut self,
        event: &ChannelEvent,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
45
/// Implementation of [`russh::server::Handler`] which provides per-channel session
/// handlers using a parametric [`ChannelHandler`].
///
/// Primary motivation is to support custom console or TUI sessions over tailnet SSH
/// connections.
///
/// # Authentication and authorization
///
/// Incoming connections are gated by the control-pushed Tailscale SSH policy: [`auth_none`]
/// resolves the source IP to a known tailnet peer and evaluates the policy via
/// [`Device::authorize_ssh`][crate::Device::authorize_ssh] (fail-closed — an unknown peer, an
/// absent policy, or a non-matching policy all reject). The `ssh` policy block's accept/reject
/// rules, principal matching, and SSH-user mapping are honored. A rule that **demands** session
/// recording (non-empty `recorders`) or `holdAndDelegate` is enforced **fail-closed**: since this
/// fork has no recorder transport / delegate round-trip yet, such a session is refused rather than
/// silently accepted un-recorded (see [`auth_none`]). Building those transports is deferred.
///
/// [`auth_none`]: russh::server::Handler::auth_none
pub struct ChannelServer<H> {
    /// Per-channel state for every currently open channel, keyed by channel id. An entry is
    /// inserted when a session channel is opened and removed when that channel is closed; the
    /// number of entries is what the per-connection channel cap is checked against.
    channel_state: HashMap<ChannelId, ChannelState>,
    /// Remote socket address of the connection; this is the source passed to
    /// [`Device::authorize_ssh`][crate::Device::authorize_ssh].
    remote: SocketAddr,
    /// The device this connection was accepted on. Used for the authorization lookup and cloned
    /// into every per-channel handler.
    dev: Arc<Device>,
    /// The accepted identity from the single [`auth_none`][russh::server::Handler::auth_none]
    /// authorization decision, stashed so per-channel handlers run as the policy-mapped user.
    /// `None` until a successful `auth_none`; a channel open with `None` here fails closed.
    accepted: Option<SshAccept>,
    /// Zero-sized marker recording the handler type `H`; no `H` value is stored here (handlers
    /// are created inside the per-channel tasks).
    _handler: PhantomSend<H>,
}
74
/// Zero-sized marker that ties [`ChannelServer`] to its handler type `H` without holding an `H`.
///
/// The inner marker is `PhantomData<fn() -> H>` rather than `PhantomData<H>`: function pointers
/// are `Send` and `Sync` whatever their return type, so this marker never makes the containing
/// struct's auto traits depend on `H`.
struct PhantomSend<H>(PhantomData<fn() -> H>);
76
/// Upper bound on the number of channels one SSH connection may hold open at the same time.
///
/// Every channel spawns its own session handler (e.g. a login shell), so this limits the
/// resource/process fan-out an authorized-but-hostile peer can cause over a single connection.
/// Real SSH clients open one session (or a handful) per connection, which leaves ample headroom
/// for legitimate use.
const MAX_CHANNELS_PER_CONN: usize = 16;

/// Reports whether a connection that currently has `open_channels` channels open is already at
/// the per-connection limit, i.e. whether the next channel open must be refused.
///
/// Kept as a free, pure predicate (split out of [`ChannelServer::channel_open_session`]) so the
/// boundary of the fork-bomb guard can be unit-tested without a live russh [`Session`]. The limit
/// is inclusive: exactly [`MAX_CHANNELS_PER_CONN`] open channels already counts as "at the cap".
fn at_channel_cap(open_channels: usize) -> bool {
    MAX_CHANNELS_PER_CONN <= open_channels
}
90
/// Fallback refusal message used when a `recording_required` session is refused and the policy
/// supplied no message of its own (its `recording_refusal_message` is empty). The chosen message
/// is what ends up in the refusal warning log.
const DEFAULT_RECORDING_REFUSAL: &str =
    "policy requires session recording but recording is not available";
95
96/// The fail-closed recording gate (tsr-0h2), extracted as a pure predicate so it can be unit-tested
97/// without a live russh [`Session`]/[`Device`] (mirrors [`at_channel_cap`]).
98///
99/// Returns `Some(message)` when the accepted session must be **refused** because the matched rule
100/// demands a capability this fork cannot provide — session recording (non-empty `recorders`) or a
101/// `holdAndDelegate` decision (both surfaced as [`SshAccept::recording_required`]) — and there is no
102/// recorder/delegate transport yet. The message is the policy's
103/// [`recording_refusal_message`][crate::ssh::SshAccept::recording_refusal_message] when non-empty,
104/// else [`DEFAULT_RECORDING_REFUSAL`]. Returns `None` for the common case (no recorders, no
105/// delegate), so those sessions accept unchanged.
106///
107/// TODO(tsr-0h2 follow-up): once the recorder stream transport exists (dial `recorders`, asciinema/
108/// CastV2 stream, tee PTY I/O at `shell.rs`) — and a Noise control round-trip backs `holdAndDelegate`
109/// — relax this to Go `tailssh`'s true default: fail-OPEN on a recorder-connect failure UNLESS
110/// `on_recording_failure.reject_session_with_message` is set. Until then, refuse rather than record
111/// nothing.
112fn recording_refusal(accept: &SshAccept) -> Option<String> {
113    if !accept.recording_required {
114        return None;
115    }
116    if accept.recording_refusal_message.is_empty() {
117        Some(DEFAULT_RECORDING_REFUSAL.to_string())
118    } else {
119        Some(accept.recording_refusal_message.clone())
120    }
121}
122
/// Error produced when an event names a [`ChannelId`] that has no entry in
/// [`ChannelServer`]'s channel table.
#[derive(thiserror::Error, Debug, Copy, Clone, PartialEq, Eq)]
#[error("no such channel")]
struct NoChannel;
126
/// State of a channel in [`ChannelServer`].
struct ChannelState {
    /// Id of the channel this state belongs to; attached to every event that is sent.
    channel: ChannelId,
    /// Sending half of the unbounded queue drained by the channel's handler task.
    tx: UnboundedSender<Request>,
    /// Owns the channel's handler task. It is never read; it is held only for its lifetime,
    /// because a tokio `JoinSet` aborts the tasks it contains when it is dropped — so dropping
    /// this state stops the handler task.
    _joinset: JoinSet<()>,
}
133
134impl ChannelState {
135    fn send(&self, event: ChannelEvent) {
136        if self.tx.send((self.channel, event)).is_err() {
137            tracing::error!(channel = %self.channel, "failed to send event");
138        }
139    }
140}
141
142impl<H> ChannelServer<H> {
143    fn get_channel(
144        &mut self,
145        id: ChannelId,
146    ) -> Result<&mut ChannelState, Box<dyn std::error::Error + Send + Sync + 'static>> {
147        self.channel_state.get_mut(&id).ok_or(Box::new(NoChannel))
148    }
149}
150
151impl<H> TailnetServer for ChannelServer<H> {
152    fn new_client(dev: Arc<Device>, addr: SocketAddr) -> Self {
153        Self {
154            channel_state: Default::default(),
155            dev,
156            remote: addr,
157            accepted: None,
158            _handler: PhantomSend(PhantomData),
159        }
160    }
161}
162
/// An event that may be generated by a channel connected to a [`ChannelServer`].
///
/// Events are delivered to the channel's [`ChannelHandler`] through
/// [`ChannelHandler::handle_event`].
#[derive(Debug, Clone)]
pub enum ChannelEvent {
    /// Data was received over the channel. The bytes are an owned copy of the received buffer.
    Data(Vec<u8>),
    /// A resize event occurred.
    ///
    /// Emitted for window-change requests and also for PTY requests, which carry the initial
    /// terminal size.
    Resize {
        /// The new width of the tty.
        width: u16,
        /// The new height of the tty.
        height: u16,
    },
    /// A signal was sent over the channel.
    Signal(Sig),
    /// The channel was closed.
    Close,
    /// The channel received EOF.
    Eof,
}
182
183impl<H> russh::server::Handler for ChannelServer<H>
184where
185    H: ChannelHandler + Send,
186    H::Error: Send,
187{
188    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
189
190    #[tracing::instrument(skip_all, fields(user = %user, remote = ?self.remote))]
191    async fn auth_none(&mut self, user: &str) -> Result<Auth, Self::Error> {
192        // Enforce the control-pushed Tailscale SSH policy. Fail-closed: an unknown source, an
193        // absent policy, a non-matching policy, or any lookup error all reject the connection.
194        match self.dev.authorize_ssh(self.remote, user).await {
195            Ok(crate::ssh::SshDecision::Accept(accept)) => {
196                // SECURITY (tsr-0h2): a matched rule that DEMANDS session recording (non-empty
197                // `recorders`) — or a `holdAndDelegate` decision — cannot be honored because this
198                // fork has no recorder transport / delegate round-trip yet. Refuse the session
199                // (fail-closed) rather than silently downgrade it to a plain accept. This mirrors
200                // Go `tailssh`'s posture when `OnRecordingFailure.RejectSessionWithMessage` is set.
201                // `Auth::reject()` (the SSH `none`-method rejection) carries no client-visible
202                // message, so the policy's refusal message is surfaced in the warning log.
203                if let Some(msg) = recording_refusal(&accept) {
204                    tracing::warn!(
205                        local_user = %accept.local_user,
206                        recorders = ?accept.recorders,
207                        message = %msg,
208                        "ssh: session refused: policy requires session recording but recording is not available"
209                    );
210                    return Ok(Auth::reject());
211                }
212                tracing::debug!(
213                    local_user = %accept.local_user,
214                    "ssh: policy accepted connection"
215                );
216                // Stash the accepted identity so the per-channel handler runs as the
217                // policy-mapped local user. This is the single fail-closed authorization point;
218                // the handler never re-evaluates policy.
219                self.accepted = Some(accept);
220                Ok(Auth::Accept)
221            }
222            Ok(crate::ssh::SshDecision::Deny(reason)) => {
223                tracing::warn!(?reason, "ssh: policy denied connection");
224                Ok(Auth::reject())
225            }
226            Err(e) => {
227                tracing::error!(error = %e, "ssh: authorization failed; rejecting");
228                Ok(Auth::reject())
229            }
230        }
231    }
232
233    async fn channel_open_session(
234        &mut self,
235        channel: Channel<Msg>,
236        session: &mut Session,
237    ) -> Result<bool, Self::Error> {
238        tracing::debug!(channel = ?channel.id(), "new session");
239
240        // Fail closed: a channel open must be preceded by a successful `auth_none` that stashed
241        // the accepted identity. If it is somehow absent, refuse to open the channel rather than
242        // run a handler with no authorized user.
243        let Some(accept) = self.accepted.clone() else {
244            tracing::error!(
245                channel = ?channel.id(),
246                "ssh: channel open with no accepted identity; refusing"
247            );
248            return Ok(false);
249        };
250
251        // Bound the number of concurrent channels (each opens a session/handler — e.g. a login
252        // shell). Without this an authorized-but-hostile peer could open unbounded channels on one
253        // connection and fork-bomb the host with session handlers. Past the cap, refuse new channels.
254        if at_channel_cap(self.channel_state.len()) {
255            tracing::warn!(
256                channel = ?channel.id(),
257                cap = MAX_CHANNELS_PER_CONN,
258                "ssh: per-connection channel cap reached; refusing new channel"
259            );
260            return Ok(false);
261        }
262
263        let (tx, mut rx) = mpsc::unbounded_channel::<Request>();
264        let mut joinset = JoinSet::new();
265
266        let (channel_id, session_handle) = (channel.id(), session.handle());
267        let dev = self.dev.clone();
268
269        joinset.spawn(async move {
270            let rt = tokio::runtime::Handle::current();
271
272            let mut handler = match H::new(rt, channel_id, session_handle.clone(), dev, &accept) {
273                Ok(handler) => handler,
274                Err(e) => {
275                    let e = e.into();
276                    tracing::error!(error = %e, %channel_id, "spawning channel handler");
277
278                    if session_handle.close(channel_id).await.is_err() {
279                        tracing::error!("failed closing channel after handler init error");
280                    };
281
282                    return;
283                }
284            };
285
286            while let Some((_channel, evt)) = rx.recv().await {
287                let result = handler.handle_event(&evt).await;
288
289                if let Err(e) = result {
290                    let e = e.into();
291                    tracing::error!(error = %e, %channel_id, ?evt, "handling event");
292
293                    if session_handle.close(channel_id).await.is_err() {
294                        tracing::error!("failed closing channel after event handler error");
295                    };
296
297                    break;
298                }
299            }
300
301            tracing::debug!(?channel_id, "closed");
302        });
303
304        self.channel_state.insert(
305            channel.id(),
306            ChannelState {
307                channel: channel.id(),
308                tx,
309                _joinset: joinset,
310            },
311        );
312
313        session.channel_success(channel.id())?;
314
315        Ok(true)
316    }
317
318    async fn channel_close(
319        &mut self,
320        channel: ChannelId,
321        session: &mut Session,
322    ) -> Result<(), Self::Error> {
323        tracing::trace!(?channel, "session closed");
324
325        self.get_channel(channel)?.send(ChannelEvent::Close);
326        self.channel_state.remove(&channel);
327
328        session.channel_success(channel)?;
329
330        Ok(())
331    }
332
333    async fn signal(
334        &mut self,
335        channel: ChannelId,
336        signal: Sig,
337        session: &mut Session,
338    ) -> Result<(), Self::Error> {
339        self.get_channel(channel)?
340            .send(ChannelEvent::Signal(signal));
341        session.channel_success(channel)?;
342
343        Ok(())
344    }
345
346    async fn data(
347        &mut self,
348        channel: ChannelId,
349        data: &[u8],
350        session: &mut Session,
351    ) -> Result<(), Self::Error> {
352        self.get_channel(channel)?
353            .send(ChannelEvent::Data(data.into()));
354
355        session.channel_success(channel)?;
356
357        Ok(())
358    }
359
360    async fn channel_eof(
361        &mut self,
362        channel: ChannelId,
363        session: &mut Session,
364    ) -> Result<(), Self::Error> {
365        self.get_channel(channel)?.send(ChannelEvent::Eof);
366        session.channel_success(channel)?;
367
368        Ok(())
369    }
370
371    async fn window_change_request(
372        &mut self,
373        channel: ChannelId,
374        col_width: u32,
375        row_height: u32,
376        _: u32,
377        _: u32,
378        session: &mut Session,
379    ) -> Result<(), Self::Error> {
380        self.get_channel(channel)?.send(ChannelEvent::Resize {
381            width: col_width as _,
382            height: row_height as _,
383        });
384
385        session.channel_success(channel)?;
386
387        Ok(())
388    }
389
390    async fn pty_request(
391        &mut self,
392        channel: ChannelId,
393        _: &str,
394        col_width: u32,
395        row_height: u32,
396        _: u32,
397        _: u32,
398        _: &[(Pty, u32)],
399        session: &mut Session,
400    ) -> Result<(), Self::Error> {
401        self.get_channel(channel)?.send(ChannelEvent::Resize {
402            width: col_width as _,
403            height: row_height as _,
404        });
405
406        session.channel_success(channel)?;
407
408        Ok(())
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::{
        DEFAULT_RECORDING_REFUSAL, MAX_CHANNELS_PER_CONN, at_channel_cap, recording_refusal,
    };
    use crate::ssh::SshAccept;

    /// The fork-bomb guard allows every count below `MAX_CHANNELS_PER_CONN` and refuses at and
    /// above it. The boundary is pinned exactly: if `>=` were flipped to `>`, a connection with
    /// `MAX_CHANNELS_PER_CONN` open channels could open one more, and the at-the-cap assertions
    /// below would fail.
    #[test]
    fn channel_cap_boundary_is_inclusive() {
        // The literal checks in this test rely on the documented value of the constant.
        assert_eq!(MAX_CHANNELS_PER_CONN, 16);

        // One short of the cap: the next open is still permitted.
        for below in [MAX_CHANNELS_PER_CONN - 1, 15] {
            assert!(!at_channel_cap(below));
        }

        // Exactly at the cap (the next open would be channel 17), and defensively beyond it:
        // refused.
        for at_or_above in [MAX_CHANNELS_PER_CONN, 16, 17] {
            assert!(at_channel_cap(at_or_above));
        }
    }

    /// Build an [`SshAccept`] for `root` with nothing enabled except the two recording-gate
    /// inputs under test.
    fn accept_with(recording_required: bool, refusal_message: &str) -> SshAccept {
        SshAccept {
            local_user: String::from("root"),
            accept_env: vec![],
            session_duration_nanos: None,
            allow_agent_forwarding: false,
            allow_local_port_forwarding: false,
            allow_remote_port_forwarding: false,
            recorders: vec![],
            recording_required,
            recording_refusal_message: refusal_message.to_owned(),
        }
    }

    /// tsr-0h2: an accept that demands recording must be REFUSED (the bypass is closed). A
    /// policy-supplied message is used verbatim; without one the default text is used.
    #[test]
    fn recording_required_accept_is_refused() {
        // The policy's own refusal message takes precedence.
        let with_message = recording_refusal(&accept_with(true, "recording required by policy"));
        assert_eq!(with_message.as_deref(), Some("recording required by policy"));

        // No message: still a refusal (`Some`), carrying the default text.
        let without_message = recording_refusal(&accept_with(true, ""));
        assert_eq!(without_message.as_deref(), Some(DEFAULT_RECORDING_REFUSAL));
    }

    /// Regression guard for the common path: an accept that does not demand recording is NOT
    /// refused, so the gate is a no-op and the session proceeds.
    #[test]
    fn normal_accept_is_not_refused() {
        assert!(recording_refusal(&accept_with(false, "")).is_none());
        // A stray non-empty message must not force a refusal when recording isn't required.
        assert!(recording_refusal(&accept_with(false, "ignored")).is_none());
    }
}