tailscale/ssh/channel_server.rs
1use std::{collections::HashMap, marker::PhantomData, net::SocketAddr, sync::Arc};
2
3use russh::{
4 Channel, ChannelId, Pty, Sig,
5 server::{Auth, Handle, Msg, Session},
6};
7use tokio::{
8 sync::{mpsc, mpsc::UnboundedSender},
9 task::JoinSet,
10};
11
12use crate::{
13 Device,
14 ssh::{SshAccept, TailnetServer},
15};
16
17type Request = (ChannelId, ChannelEvent);
18
19/// Handler for a channel session.
20pub trait ChannelHandler: Sized {
21 /// Error this handler produces.
22 type Error: Into<std::io::Error> + std::error::Error;
23
24 /// Construct a new per-channel handler.
25 ///
26 /// `accept` is the [`SshAccept`] produced by the single fail-closed authorization decision in
27 /// [`auth_none`][russh::server::Handler::auth_none]; in particular its
28 /// [`local_user`][SshAccept::local_user] is the policy-mapped identity the session must run as.
29 /// Handlers MUST NOT re-evaluate policy or substitute a different user — the accepted identity
30 /// is the sole authorization source.
31 fn new(
32 handle: tokio::runtime::Handle,
33 channel_id: ChannelId,
34 session: Handle,
35 dev: Arc<Device>,
36 accept: &SshAccept,
37 ) -> Result<Self, Self::Error>;
38
39 /// Handle an event from the channel.
40 fn handle_event(
41 &mut self,
42 event: &ChannelEvent,
43 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
44}
45
46/// Implementation of [`russh::server::Handler`] which provides per-channel session
47/// handlers using a parametric [`ChannelHandler`].
48///
49/// Primary motivation is to support custom console or TUI sessions over tailnet SSH
50/// connections.
51///
52/// # Authentication and authorization
53///
54/// Incoming connections are gated by the control-pushed Tailscale SSH policy: [`auth_none`]
55/// resolves the source IP to a known tailnet peer and evaluates the policy via
56/// [`Device::authorize_ssh`][crate::Device::authorize_ssh] (fail-closed — an unknown peer, an
57/// absent policy, or a non-matching policy all reject). The `ssh` policy block's accept/reject
58/// rules, principal matching, and SSH-user mapping are honored. A rule that **demands** session
59/// recording (non-empty `recorders`) or `holdAndDelegate` is enforced **fail-closed**: since this
60/// fork has no recorder transport / delegate round-trip yet, such a session is refused rather than
61/// silently accepted un-recorded (see [`auth_none`]). Building those transports is deferred.
62///
63/// [`auth_none`]: russh::server::Handler::auth_none
64pub struct ChannelServer<H> {
65 channel_state: HashMap<ChannelId, ChannelState>,
66 remote: SocketAddr,
67 dev: Arc<Device>,
68 /// The accepted identity from the single [`auth_none`][russh::server::Handler::auth_none]
69 /// authorization decision, stashed so per-channel handlers run as the policy-mapped user.
70 /// `None` until a successful `auth_none`; a channel open with `None` here fails closed.
71 accepted: Option<SshAccept>,
72 _handler: PhantomSend<H>,
73}
74
75struct PhantomSend<H>(PhantomData<fn() -> H>);
76
/// Upper bound on simultaneously open channels for one SSH connection.
///
/// Every channel gets its own session handler (for example a login shell), so this limits how
/// many handlers — and therefore processes — an authorized but hostile peer can spin up on a
/// single connection. Real clients open one or a handful of sessions per connection, so the
/// limit leaves ample room for legitimate use.
const MAX_CHANNELS_PER_CONN: usize = 16;

/// Reports whether a connection that already has `open_channels` channels open must refuse
/// another one, i.e. whether it has reached [`MAX_CHANNELS_PER_CONN`].
///
/// Kept as a standalone pure function (rather than inlined into
/// [`ChannelServer::channel_open_session`]) so the fork-bomb guard's boundary can be unit-tested
/// without a live russh [`Session`].
fn at_channel_cap(open_channels: usize) -> bool {
    MAX_CHANNELS_PER_CONN <= open_channels
}
90
91/// Fallback message logged when a `recording_required` session is refused and the policy supplied
92/// no message of its own.
93const DEFAULT_RECORDING_REFUSAL: &str =
94 "policy requires session recording but recording is not available";
95
96/// The fail-closed recording gate (tsr-0h2), extracted as a pure predicate so it can be unit-tested
97/// without a live russh [`Session`]/[`Device`] (mirrors [`at_channel_cap`]).
98///
99/// Returns `Some(message)` when the accepted session must be **refused** because the matched rule
100/// demands a capability this fork cannot provide — session recording (non-empty `recorders`) or a
101/// `holdAndDelegate` decision (both surfaced as [`SshAccept::recording_required`]) — and there is no
102/// recorder/delegate transport yet. The message is the policy's
103/// [`recording_refusal_message`][crate::ssh::SshAccept::recording_refusal_message] when non-empty,
104/// else [`DEFAULT_RECORDING_REFUSAL`]. Returns `None` for the common case (no recorders, no
105/// delegate), so those sessions accept unchanged.
106///
107/// TODO(tsr-0h2 follow-up): once the recorder stream transport exists (dial `recorders`, asciinema/
108/// CastV2 stream, tee PTY I/O at `shell.rs`) — and a Noise control round-trip backs `holdAndDelegate`
109/// — relax this to Go `tailssh`'s true default: fail-OPEN on a recorder-connect failure UNLESS
110/// `on_recording_failure.reject_session_with_message` is set. Until then, refuse rather than record
111/// nothing.
112fn recording_refusal(accept: &SshAccept) -> Option<String> {
113 if !accept.recording_required {
114 return None;
115 }
116 if accept.recording_refusal_message.is_empty() {
117 Some(DEFAULT_RECORDING_REFUSAL.to_string())
118 } else {
119 Some(accept.recording_refusal_message.clone())
120 }
121}
122
123#[derive(thiserror::Error, Debug, Copy, Clone, PartialEq, Eq)]
124#[error("no such channel")]
125struct NoChannel;
126
127/// State of a channel in [`ChannelServer`].
128struct ChannelState {
129 channel: ChannelId,
130 tx: UnboundedSender<Request>,
131 _joinset: JoinSet<()>,
132}
133
134impl ChannelState {
135 fn send(&self, event: ChannelEvent) {
136 if self.tx.send((self.channel, event)).is_err() {
137 tracing::error!(channel = %self.channel, "failed to send event");
138 }
139 }
140}
141
142impl<H> ChannelServer<H> {
143 fn get_channel(
144 &mut self,
145 id: ChannelId,
146 ) -> Result<&mut ChannelState, Box<dyn std::error::Error + Send + Sync + 'static>> {
147 self.channel_state.get_mut(&id).ok_or(Box::new(NoChannel))
148 }
149}
150
151impl<H> TailnetServer for ChannelServer<H> {
152 fn new_client(dev: Arc<Device>, addr: SocketAddr) -> Self {
153 Self {
154 channel_state: Default::default(),
155 dev,
156 remote: addr,
157 accepted: None,
158 _handler: PhantomSend(PhantomData),
159 }
160 }
161}
162
163/// An event that may be generated by a channel connected to a [`ChannelServer`].
164#[derive(Debug, Clone)]
165pub enum ChannelEvent {
166 /// Data was received over the channel.
167 Data(Vec<u8>),
168 /// A resize event occurred.
169 Resize {
170 /// The new width of the tty.
171 width: u16,
172 /// The new height of the tty.
173 height: u16,
174 },
175 /// A signal was sent over the channel.
176 Signal(Sig),
177 /// The channel was closed.
178 Close,
179 /// The channel received EOF.
180 Eof,
181}
182
183impl<H> russh::server::Handler for ChannelServer<H>
184where
185 H: ChannelHandler + Send,
186 H::Error: Send,
187{
188 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
189
190 #[tracing::instrument(skip_all, fields(user = %user, remote = ?self.remote))]
191 async fn auth_none(&mut self, user: &str) -> Result<Auth, Self::Error> {
192 // Enforce the control-pushed Tailscale SSH policy. Fail-closed: an unknown source, an
193 // absent policy, a non-matching policy, or any lookup error all reject the connection.
194 match self.dev.authorize_ssh(self.remote, user).await {
195 Ok(crate::ssh::SshDecision::Accept(accept)) => {
196 // SECURITY (tsr-0h2): a matched rule that DEMANDS session recording (non-empty
197 // `recorders`) — or a `holdAndDelegate` decision — cannot be honored because this
198 // fork has no recorder transport / delegate round-trip yet. Refuse the session
199 // (fail-closed) rather than silently downgrade it to a plain accept. This mirrors
200 // Go `tailssh`'s posture when `OnRecordingFailure.RejectSessionWithMessage` is set.
201 // `Auth::reject()` (the SSH `none`-method rejection) carries no client-visible
202 // message, so the policy's refusal message is surfaced in the warning log.
203 if let Some(msg) = recording_refusal(&accept) {
204 tracing::warn!(
205 local_user = %accept.local_user,
206 recorders = ?accept.recorders,
207 message = %msg,
208 "ssh: session refused: policy requires session recording but recording is not available"
209 );
210 return Ok(Auth::reject());
211 }
212 tracing::debug!(
213 local_user = %accept.local_user,
214 "ssh: policy accepted connection"
215 );
216 // Stash the accepted identity so the per-channel handler runs as the
217 // policy-mapped local user. This is the single fail-closed authorization point;
218 // the handler never re-evaluates policy.
219 self.accepted = Some(accept);
220 Ok(Auth::Accept)
221 }
222 Ok(crate::ssh::SshDecision::Deny(reason)) => {
223 tracing::warn!(?reason, "ssh: policy denied connection");
224 Ok(Auth::reject())
225 }
226 Err(e) => {
227 tracing::error!(error = %e, "ssh: authorization failed; rejecting");
228 Ok(Auth::reject())
229 }
230 }
231 }
232
233 async fn channel_open_session(
234 &mut self,
235 channel: Channel<Msg>,
236 session: &mut Session,
237 ) -> Result<bool, Self::Error> {
238 tracing::debug!(channel = ?channel.id(), "new session");
239
240 // Fail closed: a channel open must be preceded by a successful `auth_none` that stashed
241 // the accepted identity. If it is somehow absent, refuse to open the channel rather than
242 // run a handler with no authorized user.
243 let Some(accept) = self.accepted.clone() else {
244 tracing::error!(
245 channel = ?channel.id(),
246 "ssh: channel open with no accepted identity; refusing"
247 );
248 return Ok(false);
249 };
250
251 // Bound the number of concurrent channels (each opens a session/handler — e.g. a login
252 // shell). Without this an authorized-but-hostile peer could open unbounded channels on one
253 // connection and fork-bomb the host with session handlers. Past the cap, refuse new channels.
254 if at_channel_cap(self.channel_state.len()) {
255 tracing::warn!(
256 channel = ?channel.id(),
257 cap = MAX_CHANNELS_PER_CONN,
258 "ssh: per-connection channel cap reached; refusing new channel"
259 );
260 return Ok(false);
261 }
262
263 let (tx, mut rx) = mpsc::unbounded_channel::<Request>();
264 let mut joinset = JoinSet::new();
265
266 let (channel_id, session_handle) = (channel.id(), session.handle());
267 let dev = self.dev.clone();
268
269 joinset.spawn(async move {
270 let rt = tokio::runtime::Handle::current();
271
272 let mut handler = match H::new(rt, channel_id, session_handle.clone(), dev, &accept) {
273 Ok(handler) => handler,
274 Err(e) => {
275 let e = e.into();
276 tracing::error!(error = %e, %channel_id, "spawning channel handler");
277
278 if session_handle.close(channel_id).await.is_err() {
279 tracing::error!("failed closing channel after handler init error");
280 };
281
282 return;
283 }
284 };
285
286 while let Some((_channel, evt)) = rx.recv().await {
287 let result = handler.handle_event(&evt).await;
288
289 if let Err(e) = result {
290 let e = e.into();
291 tracing::error!(error = %e, %channel_id, ?evt, "handling event");
292
293 if session_handle.close(channel_id).await.is_err() {
294 tracing::error!("failed closing channel after event handler error");
295 };
296
297 break;
298 }
299 }
300
301 tracing::debug!(?channel_id, "closed");
302 });
303
304 self.channel_state.insert(
305 channel.id(),
306 ChannelState {
307 channel: channel.id(),
308 tx,
309 _joinset: joinset,
310 },
311 );
312
313 session.channel_success(channel.id())?;
314
315 Ok(true)
316 }
317
318 async fn channel_close(
319 &mut self,
320 channel: ChannelId,
321 session: &mut Session,
322 ) -> Result<(), Self::Error> {
323 tracing::trace!(?channel, "session closed");
324
325 self.get_channel(channel)?.send(ChannelEvent::Close);
326 self.channel_state.remove(&channel);
327
328 session.channel_success(channel)?;
329
330 Ok(())
331 }
332
333 async fn signal(
334 &mut self,
335 channel: ChannelId,
336 signal: Sig,
337 session: &mut Session,
338 ) -> Result<(), Self::Error> {
339 self.get_channel(channel)?
340 .send(ChannelEvent::Signal(signal));
341 session.channel_success(channel)?;
342
343 Ok(())
344 }
345
346 async fn data(
347 &mut self,
348 channel: ChannelId,
349 data: &[u8],
350 session: &mut Session,
351 ) -> Result<(), Self::Error> {
352 self.get_channel(channel)?
353 .send(ChannelEvent::Data(data.into()));
354
355 session.channel_success(channel)?;
356
357 Ok(())
358 }
359
360 async fn channel_eof(
361 &mut self,
362 channel: ChannelId,
363 session: &mut Session,
364 ) -> Result<(), Self::Error> {
365 self.get_channel(channel)?.send(ChannelEvent::Eof);
366 session.channel_success(channel)?;
367
368 Ok(())
369 }
370
371 async fn window_change_request(
372 &mut self,
373 channel: ChannelId,
374 col_width: u32,
375 row_height: u32,
376 _: u32,
377 _: u32,
378 session: &mut Session,
379 ) -> Result<(), Self::Error> {
380 self.get_channel(channel)?.send(ChannelEvent::Resize {
381 width: col_width as _,
382 height: row_height as _,
383 });
384
385 session.channel_success(channel)?;
386
387 Ok(())
388 }
389
390 async fn pty_request(
391 &mut self,
392 channel: ChannelId,
393 _: &str,
394 col_width: u32,
395 row_height: u32,
396 _: u32,
397 _: u32,
398 _: &[(Pty, u32)],
399 session: &mut Session,
400 ) -> Result<(), Self::Error> {
401 self.get_channel(channel)?.send(ChannelEvent::Resize {
402 width: col_width as _,
403 height: row_height as _,
404 });
405
406 session.channel_success(channel)?;
407
408 Ok(())
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::{
        DEFAULT_RECORDING_REFUSAL, MAX_CHANNELS_PER_CONN, at_channel_cap, recording_refusal,
    };
    use crate::ssh::SshAccept;

    /// Pins the fork-bomb guard's boundary. Counts below `MAX_CHANNELS_PER_CONN` still allow
    /// another open; the cap itself and anything above it refuse. Flipping `>=` to `>` in
    /// `at_channel_cap` would admit one extra channel and fail the `== cap` case.
    #[test]
    fn channel_cap_boundary_is_inclusive() {
        // The constant is the documented value, so the literal counts below line up with it.
        assert_eq!(MAX_CHANNELS_PER_CONN, 16);

        for below in [MAX_CHANNELS_PER_CONN - 1, 15] {
            assert!(!at_channel_cap(below), "{below} open channels must still allow another");
        }
        // At the cap the next open (which would make it 17) is refused; above it, too.
        for at_or_above in [MAX_CHANNELS_PER_CONN, 16, 17] {
            assert!(
                at_channel_cap(at_or_above),
                "{at_or_above} open channels must refuse the next open"
            );
        }
    }

    /// Build an accept for `root` with no recorders and every forwarding capability off.
    /// Only the recording-gate inputs vary.
    fn make_accept(recording_required: bool, refusal_message: &str) -> SshAccept {
        SshAccept {
            local_user: String::from("root"),
            accept_env: vec![],
            session_duration_nanos: None,
            allow_agent_forwarding: false,
            allow_local_port_forwarding: false,
            allow_remote_port_forwarding: false,
            recorders: vec![],
            recording_required,
            recording_refusal_message: String::from(refusal_message),
        }
    }

    /// tsr-0h2: an accept that demands recording is REFUSED (the bypass stays closed). A
    /// policy-supplied message is used verbatim; without one, the default text is used.
    #[test]
    fn recording_required_accept_is_refused() {
        let with_message = make_accept(true, "recording required by policy");
        assert_eq!(
            recording_refusal(&with_message).as_deref(),
            Some("recording required by policy"),
        );

        let without_message = make_accept(true, "");
        assert_eq!(
            recording_refusal(&without_message).as_deref(),
            Some(DEFAULT_RECORDING_REFUSAL),
        );
    }

    /// Regression guard for the common path: when recording is not demanded the gate is a
    /// no-op, even if the policy happens to carry a refusal message.
    #[test]
    fn normal_accept_is_not_refused() {
        for message in ["", "ignored"] {
            assert_eq!(recording_refusal(&make_accept(false, message)), None);
        }
    }
}