russh/server/
mod.rs

1// Copyright 2016 Pierre-Étienne Meunier
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! # Writing servers
17//!
18//! There are two ways of accepting connections:
19//! * implement the [Server](server::Server) trait and let [run_on_socket](server::Server::run_on_socket)/[run_on_address](server::Server::run_on_address) handle everything
20//! * accept connections yourself and pass them to [run_stream](server::run_stream)
21//!
22//! In both cases, you'll first need to implement the [Handler](server::Handler) trait -
23//! this is where you'll handle various events.
24//!
25//! Check out the following examples:
26//!
27//! * [Server that forwards your input to all connected clients](https://github.com/warp-tech/russh/blob/main/russh/examples/echoserver.rs)
28//! * [Server handing channel processing off to a library (here, `russh-sftp`)](https://github.com/warp-tech/russh/blob/main/russh/examples/sftp_server.rs)
29//! * Serving `ratatui` based TUI app to clients: [per-client](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_app.rs), [shared](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_shared_app.rs)
30
31use std;
32use std::collections::{HashMap, VecDeque};
33use std::num::Wrapping;
34use std::pin::Pin;
35use std::sync::Arc;
36use std::task::{Context, Poll};
37
38use bytes::Bytes;
39use client::GexParams;
40use futures::future::Future;
41use log::{debug, error, info, warn};
42use msg::{is_kex_msg, validate_client_msg_strict_kex};
43use russh_util::runtime::JoinHandle;
44use russh_util::time::Instant;
45use ssh_key::{Certificate, PrivateKey};
46use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
47use tokio::net::{TcpListener, ToSocketAddrs};
48use tokio::pin;
49use tokio::sync::{broadcast, mpsc};
50
51use crate::cipher::{clear, OpeningKey};
52use crate::kex::dh::groups::{DhGroup, BUILTIN_SAFE_DH_GROUPS, DH_GROUP14};
53use crate::kex::{KexProgress, SessionKexState};
54use crate::session::*;
55use crate::ssh_read::*;
56use crate::sshbuffer::*;
57use crate::{*};
58
59mod kex;
60mod session;
61pub use self::session::*;
62mod encrypted;
63
/// Configuration of a server.
///
/// [`Config::default`] supplies a baseline with an empty `keys` list, so at least one
/// host key has to be added before the configuration is useful. The `Debug`
/// implementation masks `keys`.
pub struct Config {
    /// The server ID string sent at the beginning of the protocol.
    /// Defaults to `SSH-2.0-<CARGO_PKG_NAME>_<CARGO_PKG_VERSION>`.
    pub server_id: SshId,
    /// Authentication methods proposed to the client. Defaults to all methods.
    pub methods: auth::MethodSet,
    /// Authentication rejections must happen in constant time for
    /// security reasons. Russh does not handle this by default.
    /// Defaults to 1 second.
    pub auth_rejection_time: std::time::Duration,
    /// Authentication rejection time override for the initial "none" auth attempt.
    /// OpenSSH clients will send an initial "none" auth to probe for authentication methods.
    /// Defaults to `None` (no override).
    pub auth_rejection_time_initial: Option<std::time::Duration>,
    /// The server's keys. The first key pair in the client's preference order will be chosen.
    /// Defaults to an empty list.
    pub keys: Vec<PrivateKey>,
    /// The bytes and time limits before key re-exchange. Defaults to `Limits::default()`.
    pub limits: Limits,
    /// The initial size of a channel (used for flow control). Defaults to 2097152.
    pub window_size: u32,
    /// The maximal size of a single packet. Defaults to 32768.
    pub maximum_packet_size: u32,
    /// Buffer size for each channel (a number of unprocessed messages to store before
    /// propagating backpressure to the TCP stream). Defaults to 100.
    pub channel_buffer_size: usize,
    /// Internal event buffer size. Defaults to 10.
    pub event_buffer_size: usize,
    /// Lists of preferred algorithms.
    pub preferred: Preferred,
    /// Maximal number of allowed authentication attempts. Defaults to 10.
    pub max_auth_attempts: usize,
    /// Time after which the connection is garbage-collected. Defaults to 600 seconds.
    pub inactivity_timeout: Option<std::time::Duration>,
    /// If nothing is received from the client for this amount of time, send a keepalive message.
    /// Defaults to `None`.
    pub keepalive_interval: Option<std::time::Duration>,
    /// If this many keepalives have been sent without reply, close the connection.
    /// Defaults to 3.
    pub keepalive_max: usize,
    /// If active, invoke `set_nodelay(true)` on client sockets; disabled by default
    /// (i.e. Nagle's algorithm is active).
    pub nodelay: bool,
}
101
102impl Default for Config {
103    fn default() -> Config {
104        Config {
105            server_id: SshId::Standard(format!(
106                "SSH-2.0-{}_{}",
107                env!("CARGO_PKG_NAME"),
108                env!("CARGO_PKG_VERSION")
109            )),
110            methods: auth::MethodSet::all(),
111            auth_rejection_time: std::time::Duration::from_secs(1),
112            auth_rejection_time_initial: None,
113            keys: Vec::new(),
114            window_size: 2097152,
115            maximum_packet_size: 32768,
116            channel_buffer_size: 100,
117            event_buffer_size: 10,
118            limits: Limits::default(),
119            preferred: Default::default(),
120            max_auth_attempts: 10,
121            inactivity_timeout: Some(std::time::Duration::from_secs(600)),
122            keepalive_interval: None,
123            keepalive_max: 3,
124            nodelay: false,
125        }
126    }
127}
128
129impl Debug for Config {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        // display everything except the private keys
132        f.debug_struct("Config")
133            .field("server_id", &self.server_id)
134            .field("methods", &self.methods)
135            .field("auth_rejection_time", &self.auth_rejection_time)
136            .field(
137                "auth_rejection_time_initial",
138                &self.auth_rejection_time_initial,
139            )
140            .field("keys", &"***")
141            .field("window_size", &self.window_size)
142            .field("maximum_packet_size", &self.maximum_packet_size)
143            .field("channel_buffer_size", &self.channel_buffer_size)
144            .field("event_buffer_size", &self.event_buffer_size)
145            .field("limits", &self.limits)
146            .field("preferred", &self.preferred)
147            .field("max_auth_attempts", &self.max_auth_attempts)
148            .field("inactivity_timeout", &self.inactivity_timeout)
149            .field("keepalive_interval", &self.keepalive_interval)
150            .field("keepalive_max", &self.keepalive_max)
151            .finish()
152    }
153}
154
/// A client's response in a challenge-response authentication.
///
/// Iterate over it to obtain the individual responses as [`Bytes`] values.
/// It wraps a mutably borrowed iterator yielding `Option<Bytes>` entries.
pub struct Response<'a>(&'a mut (dyn Iterator<Item = Option<Bytes>> + Send));
159
160impl Iterator for Response<'_> {
161    type Item = Bytes;
162    fn next(&mut self) -> Option<Self::Item> {
163        self.0.next().flatten()
164    }
165}
166
167use std::borrow::Cow;
/// An authentication result, in a challenge-response authentication.
#[derive(Debug, PartialEq, Eq)]
pub enum Auth {
    /// Reject the authentication request.
    Reject {
        /// Optional set of methods associated with the rejection.
        /// NOTE(review): presumably the methods the client may continue with, with
        /// `None` meaning "no change" — confirm against the authentication code.
        proceed_with_methods: Option<MethodSet>,
        /// NOTE(review): presumably maps to the "partial success" flag of the SSH
        /// authentication failure message — confirm against the authentication code.
        partial_success: bool,
    },
    /// Accept the authentication request.
    Accept,

    /// Method was not accepted, but no other check was performed.
    UnsupportedMethod,

    /// Partially accept the challenge-response authentication
    /// request, providing more instructions for the client to follow.
    Partial {
        /// Name of this challenge.
        name: Cow<'static, str>,
        /// Instructions for this challenge.
        instructions: Cow<'static, str>,
        /// A number of prompts to the user. Each prompt has a `bool`
        /// indicating whether the terminal must echo the characters
        /// typed by the user.
        prompts: Cow<'static, [(Cow<'static, str>, bool)]>,
    },
}
195
196impl Auth {
197    pub fn reject() -> Self {
198        Auth::Reject {
199            proceed_with_methods: None,
200            partial_success: false,
201        }
202    }
203}
204
205/// Server handler. Each client will have their own handler.
206///
207/// Note: this is an async trait. The trait functions return `impl Future`,
208/// and you can simply define them as `async fn` instead.
209#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
210pub trait Handler: Sized {
211    type Error: From<crate::Error> + Send;
212
213    /// Check authentication using the "none" method. Russh makes
214    /// sure rejection happens in time `config.auth_rejection_time`,
215    /// except if this method takes more than that.
216    #[allow(unused_variables)]
217    fn auth_none(&mut self, user: &str) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
218        async { Ok(Auth::reject()) }
219    }
220
221    /// Check authentication using the "password" method. Russh
222    /// makes sure rejection happens in time
223    /// `config.auth_rejection_time`, except if this method takes more
224    /// than that.
225    #[allow(unused_variables)]
226    fn auth_password(
227        &mut self,
228        user: &str,
229        password: &str,
230    ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
231        async { Ok(Auth::reject()) }
232    }
233
234    /// Check authentication using the "publickey" method. This method
235    /// should just check whether the public key matches the
236    /// authorized ones. Russh then checks the signature. If the key
237    /// is unknown, or the signature is invalid, Russh guarantees
238    /// that rejection happens in constant time
239    /// `config.auth_rejection_time`, except if this method takes more
240    /// time than that.
241    #[allow(unused_variables)]
242    fn auth_publickey_offered(
243        &mut self,
244        user: &str,
245        public_key: &ssh_key::PublicKey,
246    ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
247        async { Ok(Auth::Accept) }
248    }
249
250    /// Check authentication using the "publickey" method. This method
251    /// is called after the signature has been verified and key
252    /// ownership has been confirmed.
253    /// Russh guarantees that rejection happens in constant time
254    /// `config.auth_rejection_time`, except if this method takes more
255    /// time than that.
256    #[allow(unused_variables)]
257    fn auth_publickey(
258        &mut self,
259        user: &str,
260        public_key: &ssh_key::PublicKey,
261    ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
262        async { Ok(Auth::reject()) }
263    }
264
265    /// Check authentication using an OpenSSH certificate. This method
266    /// is called after the signature has been verified and key
267    /// ownership has been confirmed.
268    /// Russh guarantees that rejection happens in constant time
269    /// `config.auth_rejection_time`, except if this method takes more
270    /// time than that.
271    #[allow(unused_variables)]
272    fn auth_openssh_certificate(
273        &mut self,
274        user: &str,
275        certificate: &Certificate,
276    ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
277        async { Ok(Auth::reject()) }
278    }
279
280    /// Check authentication using the "keyboard-interactive"
281    /// method. Russh makes sure rejection happens in time
282    /// `config.auth_rejection_time`, except if this method takes more
283    /// than that.
284    #[allow(unused_variables)]
285    fn auth_keyboard_interactive<'a>(
286        &'a mut self,
287        user: &str,
288        submethods: &str,
289        response: Option<Response<'a>>,
290    ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
291        async { Ok(Auth::reject()) }
292    }
293
294    /// Called when authentication succeeds for a session.
295    #[allow(unused_variables)]
296    fn auth_succeeded(
297        &mut self,
298        session: &mut Session,
299    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
300        async { Ok(()) }
301    }
302
303    /// Called when authentication starts but before it is successful.
304    /// Return value is an authentication banner, usually a warning message shown to the client.
305    #[allow(unused_variables)]
306    fn authentication_banner(
307        &mut self,
308    ) -> impl Future<Output = Result<Option<String>, Self::Error>> + Send {
309        async { Ok(None) }
310    }
311
312    /// Called when the client closes a channel.
313    #[allow(unused_variables)]
314    fn channel_close(
315        &mut self,
316        channel: ChannelId,
317        session: &mut Session,
318    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
319        async { Ok(()) }
320    }
321
322    /// Called when the client sends EOF to a channel.
323    #[allow(unused_variables)]
324    fn channel_eof(
325        &mut self,
326        channel: ChannelId,
327        session: &mut Session,
328    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
329        async { Ok(()) }
330    }
331
332    /// Called when a new session channel is created.
333    /// Return value indicates whether the channel request should be granted.
334    #[allow(unused_variables)]
335    fn channel_open_session(
336        &mut self,
337        channel: Channel<Msg>,
338        session: &mut Session,
339    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
340        async { Ok(false) }
341    }
342
343    /// Called when a new X11 channel is created.
344    /// Return value indicates whether the channel request should be granted.
345    #[allow(unused_variables)]
346    fn channel_open_x11(
347        &mut self,
348        channel: Channel<Msg>,
349        originator_address: &str,
350        originator_port: u32,
351        session: &mut Session,
352    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
353        async { Ok(false) }
354    }
355
356    /// Called when a new direct TCP/IP ("local TCP forwarding") channel is opened.
357    /// Return value indicates whether the channel request should be granted.
358    #[allow(unused_variables)]
359    fn channel_open_direct_tcpip(
360        &mut self,
361        channel: Channel<Msg>,
362        host_to_connect: &str,
363        port_to_connect: u32,
364        originator_address: &str,
365        originator_port: u32,
366        session: &mut Session,
367    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
368        async { Ok(false) }
369    }
370
371    /// Called when a new remote forwarded TCP connection comes in.
372    /// <https://www.rfc-editor.org/rfc/rfc4254#section-7>
373    #[allow(unused_variables)]
374    fn channel_open_forwarded_tcpip(
375        &mut self,
376        channel: Channel<Msg>,
377        host_to_connect: &str,
378        port_to_connect: u32,
379        originator_address: &str,
380        originator_port: u32,
381        session: &mut Session,
382    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
383        async { Ok(false) }
384    }
385
386    /// Called when a new direct-streamlocal ("local UNIX socket forwarding") channel is created.
387    /// Return value indicates whether the channel request should be granted.
388    #[allow(unused_variables)]
389    fn channel_open_direct_streamlocal(
390        &mut self,
391        channel: Channel<Msg>,
392        socket_path: &str,
393        session: &mut Session,
394    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
395        async { Ok(false) }
396    }
397
398    /// Called when the client confirmed our request to open a
399    /// channel. A channel can only be written to after receiving this
400    /// message (this library panics otherwise).
401    #[allow(unused_variables)]
402    fn channel_open_confirmation(
403        &mut self,
404        id: ChannelId,
405        max_packet_size: u32,
406        window_size: u32,
407        session: &mut Session,
408    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
409        async { Ok(()) }
410    }
411
412    /// Called when a data packet is received. A response can be
413    /// written to the `response` argument.
414    #[allow(unused_variables)]
415    fn data(
416        &mut self,
417        channel: ChannelId,
418        data: &[u8],
419        session: &mut Session,
420    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
421        async { Ok(()) }
422    }
423
424    /// Called when an extended data packet is received. Code 1 means
425    /// that this packet comes from stderr, other codes are not
426    /// defined (see
427    /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-5.2)).
428    #[allow(unused_variables)]
429    fn extended_data(
430        &mut self,
431        channel: ChannelId,
432        code: u32,
433        data: &[u8],
434        session: &mut Session,
435    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
436        async { Ok(()) }
437    }
438
439    /// Called when the network window is adjusted, meaning that we
440    /// can send more bytes.
441    #[allow(unused_variables)]
442    fn window_adjusted(
443        &mut self,
444        channel: ChannelId,
445        new_size: u32,
446        session: &mut Session,
447    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
448        async { Ok(()) }
449    }
450
451    /// Called when this server adjusts the network window. Return the
452    /// next target window.
453    #[allow(unused_variables)]
454    fn adjust_window(&mut self, channel: ChannelId, current: u32) -> u32 {
455        current
456    }
457
458    /// The client requests a pseudo-terminal with the given
459    /// specifications.
460    ///
461    /// **Note:** Success or failure should be communicated to the client by calling
462    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
463    /// instance:
464    ///
465    /// ```ignore
466    /// async fn pty_request(
467    ///     &mut self,
468    ///     channel: ChannelId,
469    ///     term: &str,
470    ///     col_width: u32,
471    ///     row_height: u32,
472    ///     pix_width: u32,
473    ///     pix_height: u32,
474    ///     modes: &[(Pty, u32)],
475    ///     session: &mut Session,
476    /// ) -> Result<(), Self::Error> {
477    ///     session.channel_success(channel);
478    ///     Ok(())
479    /// }
480    /// ```
481    #[allow(unused_variables, clippy::too_many_arguments)]
482    fn pty_request(
483        &mut self,
484        channel: ChannelId,
485        term: &str,
486        col_width: u32,
487        row_height: u32,
488        pix_width: u32,
489        pix_height: u32,
490        modes: &[(Pty, u32)],
491        session: &mut Session,
492    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
493        async { Ok(()) }
494    }
495
496    /// The client requests an X11 connection.
497    ///
498    /// **Note:** Success or failure should be communicated to the client by calling
499    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
500    /// instance:
501    ///
502    /// ```ignore
503    /// async fn x11_request(
504    ///     &mut self,
505    ///     channel: ChannelId,
506    ///     single_connection: bool,
507    ///     x11_auth_protocol: &str,
508    ///     x11_auth_cookie: &str,
509    ///     x11_screen_number: u32,
510    ///     session: &mut Session,
511    /// ) -> Result<(), Self::Error> {
512    ///     session.channel_success(channel);
513    ///     Ok(())
514    /// }
515    /// ```
516    #[allow(unused_variables)]
517    fn x11_request(
518        &mut self,
519        channel: ChannelId,
520        single_connection: bool,
521        x11_auth_protocol: &str,
522        x11_auth_cookie: &str,
523        x11_screen_number: u32,
524        session: &mut Session,
525    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
526        async { Ok(()) }
527    }
528
529    /// The client wants to set the given environment variable. Check
530    /// these carefully, as it is dangerous to allow any variable
531    /// environment to be set.
532    ///
533    /// **Note:** Success or failure should be communicated to the client by calling
534    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
535    /// instance:
536    ///
537    /// ```ignore
538    /// async fn env_request(
539    ///     &mut self,
540    ///     channel: ChannelId,
541    ///     variable_name: &str,
542    ///     variable_value: &str,
543    ///     session: &mut Session,
544    /// ) -> Result<(), Self::Error> {
545    ///     session.channel_success(channel);
546    ///     Ok(())
547    /// }
548    /// ```
549    #[allow(unused_variables)]
550    fn env_request(
551        &mut self,
552        channel: ChannelId,
553        variable_name: &str,
554        variable_value: &str,
555        session: &mut Session,
556    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
557        async { Ok(()) }
558    }
559
560    /// The client requests a shell.
561    ///
562    /// **Note:** Success or failure should be communicated to the client by calling
563    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
564    /// instance:
565    ///
566    /// ```ignore
567    /// async fn shell_request(
568    ///     &mut self,
569    ///     channel: ChannelId,
570    ///     session: &mut Session,
571    /// ) -> Result<(), Self::Error> {
572    ///     session.channel_success(channel);
573    ///     Ok(())
574    /// }
575    /// ```
576    #[allow(unused_variables)]
577    fn shell_request(
578        &mut self,
579        channel: ChannelId,
580        session: &mut Session,
581    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
582        async { Ok(()) }
583    }
584
585    /// The client sends a command to execute, to be passed to a
586    /// shell. Make sure to check the command before doing so.
587    ///
588    /// **Note:** Success or failure should be communicated to the client by calling
589    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
590    /// instance:
591    ///
592    /// ```ignore
593    /// async fn exec_request(
594    ///     &mut self,
595    ///     channel: ChannelId,
596    ///     data: &[u8],
597    ///     session: &mut Session,
598    /// ) -> Result<(), Self::Error> {
599    ///     session.channel_success(channel);
600    ///     Ok(())
601    /// }
602    /// ```
603    #[allow(unused_variables)]
604    fn exec_request(
605        &mut self,
606        channel: ChannelId,
607        data: &[u8],
608        session: &mut Session,
609    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
610        async { Ok(()) }
611    }
612
613    /// The client asks to start the subsystem with the given name
614    /// (such as sftp).
615    ///
616    /// **Note:** Success or failure should be communicated to the client by calling
617    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
618    /// instance:
619    ///
620    /// ```ignore
621    /// async fn subsystem_request(
622    ///     &mut self,
623    ///     channel: ChannelId,
624    ///     name: &str,
625    ///     session: &mut Session,
626    /// ) -> Result<(), Self::Error> {
627    ///     session.channel_success(channel);
628    ///     Ok(())
629    /// }
630    /// ```
631    #[allow(unused_variables)]
632    fn subsystem_request(
633        &mut self,
634        channel: ChannelId,
635        name: &str,
636        session: &mut Session,
637    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
638        async { Ok(()) }
639    }
640
641    /// The client's pseudo-terminal window size has changed.
642    ///
643    /// **Note:** Success or failure should be communicated to the client by calling
644    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
645    /// instance:
646    ///
647    /// ```ignore
648    /// async fn window_change_request(
649    ///     &mut self,
650    ///     channel: ChannelId,
651    ///     col_width: u32,
652    ///     row_height: u32,
653    ///     pix_width: u32,
654    ///     pix_height: u32,
655    ///     session: &mut Session,
656    /// ) -> Result<(), Self::Error> {
657    ///     session.channel_success(channel);
658    ///     Ok(())
659    /// }
660    /// ```
661    #[allow(unused_variables)]
662    fn window_change_request(
663        &mut self,
664        channel: ChannelId,
665        col_width: u32,
666        row_height: u32,
667        pix_width: u32,
668        pix_height: u32,
669        session: &mut Session,
670    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
671        async { Ok(()) }
672    }
673
674    /// The client requests OpenSSH agent forwarding
675    ///
676    /// **Note:** Success or failure should be communicated to the client by calling
677    /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
678    /// instance:
679    ///
680    /// ```ignore
681    /// async fn agent_request(
682    ///     &mut self,
683    ///     channel: ChannelId,
684    ///     session: &mut Session,
685    /// ) -> Result<bool, Self::Error> {
686    ///     session.channel_success(channel);
687    ///     Ok(())
688    /// }
689    /// ```
690    #[allow(unused_variables)]
691    fn agent_request(
692        &mut self,
693        channel: ChannelId,
694        session: &mut Session,
695    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
696        async { Ok(false) }
697    }
698
699    /// The client is sending a signal (usually to pass to the
700    /// currently running process).
701    #[allow(unused_variables)]
702    fn signal(
703        &mut self,
704        channel: ChannelId,
705        signal: Sig,
706        session: &mut Session,
707    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
708        async { Ok(()) }
709    }
710
711    /// Used for reverse-forwarding ports, see
712    /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
713    /// If `port` is 0, you should set it to the allocated port number.
714    #[allow(unused_variables)]
715    fn tcpip_forward(
716        &mut self,
717        address: &str,
718        port: &mut u32,
719        session: &mut Session,
720    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
721        async { Ok(false) }
722    }
723
724    /// Used to stop the reverse-forwarding of a port, see
725    /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
726    #[allow(unused_variables)]
727    fn cancel_tcpip_forward(
728        &mut self,
729        address: &str,
730        port: u32,
731        session: &mut Session,
732    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
733        async { Ok(false) }
734    }
735
736    #[allow(unused_variables)]
737    fn streamlocal_forward(
738        &mut self,
739        socket_path: &str,
740        session: &mut Session,
741    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
742        async { Ok(false) }
743    }
744
745    #[allow(unused_variables)]
746    fn cancel_streamlocal_forward(
747        &mut self,
748        socket_path: &str,
749        session: &mut Session,
750    ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
751        async { Ok(false) }
752    }
753
754    /// Override when enabling the `diffie-hellman-group-exchange-*` key exchange methods.
755    /// Should return a Diffie-Hellman group with a safe prime whose length is
756    /// between `gex_params.min_group_size` and `gex_params.max_group_size` and
757    /// (if possible) over and as close as possible to `gex_params.preferred_group_size`.
758    ///
759    /// OpenSSH uses a pre-generated database of safe primes stored in `/etc/ssh/moduli`
760    ///
761    /// The default implementation picks a group from a very short static list
762    /// of built-in standard groups and is not really taking advantage of the security
763    /// offered by these kex methods.
764    ///
765    /// See https://datatracker.ietf.org/doc/html/rfc4419#section-3
766    #[allow(unused_variables)]
767    fn lookup_dh_gex_group(
768        &mut self,
769        gex_params: &GexParams,
770    ) -> impl Future<Output = Result<Option<DhGroup>, Self::Error>> + Send {
771        async {
772            let mut best_group = &DH_GROUP14;
773
774            // Find _some_ matching group
775            for group in BUILTIN_SAFE_DH_GROUPS.iter() {
776                if group.bit_size() >= gex_params.min_group_size()
777                    && group.bit_size() <= gex_params.max_group_size()
778                {
779                    best_group = *group;
780                    break;
781                }
782            }
783
784            // Find _closest_ matching group
785            for group in BUILTIN_SAFE_DH_GROUPS.iter() {
786                if group.bit_size() > gex_params.preferred_group_size() {
787                    best_group = *group;
788                    break;
789                }
790            }
791
792            Ok(Some(best_group.clone()))
793        }
794    }
795}
796
/// Handle used to request shutdown of a running server.
///
/// Obtained from [`RunningServer::handle`]; it holds a clone of the server's
/// shutdown broadcast sender.
pub struct RunningServerHandle {
    /// Sender side of the shutdown broadcast channel; the `String` payload is the shutdown reason.
    shutdown_tx: broadcast::Sender<String>,
}
800
801impl RunningServerHandle {
802    /// Request graceful server shutdown.
803    /// Starts the shutdown and immediately returns.
804    /// To wait for all the clients to disconnect, await `RunningServer` .
805    pub fn shutdown(&self, reason: String) {
806        let _ = self.shutdown_tx.send(reason);
807    }
808}
809
/// A running server, represented as a future.
///
/// Awaiting it drives the wrapped future `inner` to completion; use
/// [`RunningServer::handle`] to obtain a handle that can request shutdown.
pub struct RunningServer<F: Future<Output = std::io::Result<()>> + Unpin + Send> {
    /// The future this wrapper polls.
    inner: F,
    /// Sender side of the shutdown broadcast channel, cloned into every handle.
    shutdown_tx: broadcast::Sender<String>,
}
814
815impl<F: Future<Output = std::io::Result<()>> + Unpin + Send> RunningServer<F> {
816    pub fn handle(&self) -> RunningServerHandle {
817        RunningServerHandle {
818            shutdown_tx: self.shutdown_tx.clone(),
819        }
820    }
821}
822
823impl<F: Future<Output = std::io::Result<()>> + Unpin + Send> Future for RunningServer<F> {
824    type Output = std::io::Result<()>;
825
826    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
827        Future::poll(Pin::new(&mut self.inner), cx)
828    }
829}
830
831#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
832/// Trait used to create new handlers when clients connect.
833pub trait Server {
834    /// The type of handlers.
835    type Handler: Handler + Send + 'static;
836    /// Called when a new client connects.
837    fn new_client(&mut self, peer_addr: Option<std::net::SocketAddr>) -> Self::Handler;
838    /// Called when an active connection fails.
839    fn handle_session_error(&mut self, _error: <Self::Handler as Handler>::Error) {}
840
841    /// Run a server on a specified `tokio::net::TcpListener`. Useful when dropping
842    /// privileges immediately after socket binding, for example.
843    fn run_on_socket(
844        &mut self,
845        config: Arc<Config>,
846        socket: &TcpListener,
847    ) -> RunningServer<impl Future<Output = std::io::Result<()>> + Unpin + Send>
848    where
849        Self: Send,
850    {
851        let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1);
852        let shutdown_tx2 = shutdown_tx.clone();
853
854        let fut = async move {
855            if config.maximum_packet_size > 65535 {
856                error!(
857                    "Maximum packet size ({:?}) should not larger than a TCP packet (65535)",
858                    config.maximum_packet_size
859                );
860            }
861
862            let (error_tx, mut error_rx) = mpsc::unbounded_channel();
863
864            loop {
865                tokio::select! {
866                    _ = shutdown_rx.recv() => {
867                        debug!("Server shutdown requested");
868                        return Ok(());
869                    },
870                    accept_result = socket.accept() => {
871                        match accept_result {
872                            Ok((socket, peer_addr)) => {
873                                let mut shutdown_rx = shutdown_tx2.subscribe();
874
875                                let config = config.clone();
876                                // NOTE: For backwards compatibility, we keep the Option signature as changing it would be a breaking change.
877                                let handler = self.new_client(Some(peer_addr));
878                                let error_tx = error_tx.clone();
879
880                                russh_util::runtime::spawn(async move {
881                                    if config.nodelay {
882                                        if let Err(e) = socket.set_nodelay(true) {
883                                            warn!("set_nodelay() failed: {e:?}");
884                                        }
885                                    }
886
887                                    let session = match run_stream(config, socket, handler).await {
888                                        Ok(s) => s,
889                                        Err(e) => {
890                                            debug!("Connection setup failed");
891                                            let _ = error_tx.send(e);
892                                            return
893                                        }
894                                    };
895
896                                    let handle = session.handle();
897
898                                    tokio::select! {
899                                        reason = shutdown_rx.recv() => {
900                                            if handle.disconnect(
901                                                Disconnect::ByApplication,
902                                                reason.unwrap_or_else(|_| "".into()),
903                                                "".into()
904                                            ).await.is_err() {
905                                                debug!("Failed to send disconnect message");
906                                            }
907                                        },
908                                        result = session => {
909                                            if let Err(e) = result {
910                                                debug!("Connection closed with error");
911                                                let _ = error_tx.send(e);
912                                            } else {
913                                                debug!("Connection closed");
914                                            }
915                                        }
916                                    }
917                                });
918                            }
919                            Err(e) => {
920                                return Err(e);
921                            }
922                        }
923                    },
924
925                    Some(error) = error_rx.recv() => {
926                        self.handle_session_error(error);
927                    }
928                }
929            }
930        };
931
932        RunningServer {
933            inner: Box::pin(fut),
934            shutdown_tx,
935        }
936    }
937
938    /// Run a server.
939    /// This is a convenience function; consider using `run_on_socket` for more control.
940    fn run_on_address<A: ToSocketAddrs + Send>(
941        &mut self,
942        config: Arc<Config>,
943        addrs: A,
944    ) -> impl Future<Output = std::io::Result<()>> + Send
945    where
946        Self: Send,
947    {
948        async {
949            let socket = TcpListener::bind(addrs).await?;
950            self.run_on_socket(config, &socket).await?;
951            Ok(())
952        }
953    }
954}
955
956use std::cell::RefCell;
957thread_local! {
958    static B1: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
959    static B2: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
960}
961
962async fn start_reading<R: AsyncRead + Unpin>(
963    mut stream_read: R,
964    mut buffer: SSHBuffer,
965    mut cipher: Box<dyn OpeningKey + Send>,
966) -> Result<(usize, R, SSHBuffer, Box<dyn OpeningKey + Send>), Error> {
967    buffer.buffer.clear();
968    let n = cipher::read(&mut stream_read, &mut buffer, &mut *cipher).await?;
969    Ok((n, stream_read, buffer, cipher))
970}
971
972/// An active server session returned by [run_stream].
973///
974/// Implements [Future] and can be awaited to wait for the session to finish.
975pub struct RunningSession<H: Handler> {
976    handle: Handle,
977    join: JoinHandle<Result<(), H::Error>>,
978}
979
980impl<H: Handler> RunningSession<H> {
981    /// Returns a new handle for the session.
982    pub fn handle(&self) -> Handle {
983        self.handle.clone()
984    }
985}
986
987impl<H: Handler> Future for RunningSession<H> {
988    type Output = Result<(), H::Error>;
989    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
990        match Future::poll(Pin::new(&mut self.join), cx) {
991            Poll::Ready(r) => Poll::Ready(match r {
992                Ok(Ok(x)) => Ok(x),
993                Err(e) => Err(crate::Error::from(e).into()),
994                Ok(Err(e)) => Err(e),
995            }),
996            Poll::Pending => Poll::Pending,
997        }
998    }
999}
1000
1001/// Start a single connection in the background.
1002pub async fn run_stream<H, R>(
1003    config: Arc<Config>,
1004    mut stream: R,
1005    handler: H,
1006) -> Result<RunningSession<H>, H::Error>
1007where
1008    H: Handler + Send + 'static,
1009    R: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1010{
1011    // Writing SSH id.
1012    let mut write_buffer = SSHBuffer::new();
1013    write_buffer.send_ssh_id(&config.as_ref().server_id);
1014    map_err!(stream.write_all(&write_buffer.buffer[..]).await)?;
1015
1016    // Reading SSH id and allocating a session.
1017    let mut stream = SshRead::new(stream);
1018    let (sender, receiver) = tokio::sync::mpsc::channel(config.event_buffer_size);
1019    let handle = server::session::Handle {
1020        sender,
1021        channel_buffer_size: config.channel_buffer_size,
1022    };
1023
1024    let common = read_ssh_id(config, &mut stream).await?;
1025    let mut session = Session {
1026        target_window_size: common.config.window_size,
1027        common,
1028        receiver,
1029        sender: handle.clone(),
1030        pending_reads: Vec::new(),
1031        pending_len: 0,
1032        channels: HashMap::new(),
1033        open_global_requests: VecDeque::new(),
1034        kex: SessionKexState::Idle,
1035    };
1036
1037    session.begin_rekey()?;
1038
1039    let join = russh_util::runtime::spawn(session.run(stream, handler));
1040
1041    Ok(RunningSession { handle, join })
1042}
1043
1044async fn read_ssh_id<R: AsyncRead + Unpin>(
1045    config: Arc<Config>,
1046    read: &mut SshRead<R>,
1047) -> Result<CommonSession<Arc<Config>>, Error> {
1048    let sshid = if let Some(t) = config.inactivity_timeout {
1049        tokio::time::timeout(t, read.read_ssh_id()).await??
1050    } else {
1051        read.read_ssh_id().await?
1052    };
1053
1054    let session = CommonSession {
1055        packet_writer: PacketWriter::clear(),
1056        // kex: Some(Kex::Init(kexinit)),
1057        auth_user: String::new(),
1058        auth_method: None, // Client only.
1059        auth_attempts: 0,
1060        remote_to_local: Box::new(clear::Key),
1061        encrypted: None,
1062        config,
1063        wants_reply: false,
1064        disconnected: false,
1065        buffer: CryptoVec::new(),
1066        strict_kex: false,
1067        alive_timeouts: 0,
1068        received_data: false,
1069        remote_sshid: sshid.into(),
1070    };
1071    Ok(session)
1072}
1073
1074async fn reply<H: Handler + Send>(
1075    session: &mut Session,
1076    handler: &mut H,
1077    pkt: &mut IncomingSshPacket,
1078) -> Result<(), H::Error> {
1079    if let Some(message_type) = pkt.buffer.first() {
1080        debug!(
1081            "< msg type {message_type:?}, seqn {:?}, len {}",
1082            pkt.seqn.0,
1083            pkt.buffer.len()
1084        );
1085        if session.common.strict_kex && session.common.encrypted.is_none() {
1086            let seqno = pkt.seqn.0 - 1; // was incremented after read()
1087            validate_client_msg_strict_kex(*message_type, seqno as usize)?;
1088        }
1089
1090        if [msg::IGNORE, msg::UNIMPLEMENTED, msg::DEBUG].contains(message_type) {
1091            return Ok(());
1092        }
1093    }
1094
1095    if pkt.buffer.first() == Some(&msg::KEXINIT) && session.kex == SessionKexState::Idle {
1096        // Not currently in a rekey but received KEXINIT
1097        info!("Client has initiated re-key");
1098        session.begin_rekey()?;
1099        // Kex will consume the packet right away
1100    }
1101
1102    let is_kex_msg = pkt.buffer.first().cloned().map(is_kex_msg).unwrap_or(false);
1103
1104    if is_kex_msg {
1105        if let SessionKexState::InProgress(kex) = session.kex.take() {
1106            let progress = kex
1107                .step(Some(pkt), &mut session.common.packet_writer, handler)
1108                .await?;
1109
1110            match progress {
1111                KexProgress::NeedsReply { kex, reset_seqn } => {
1112                    debug!("kex impl continues: {kex:?}");
1113                    session.kex = SessionKexState::InProgress(kex);
1114                    if reset_seqn {
1115                        debug!("kex impl requests seqno reset");
1116                        session.common.reset_seqn();
1117                    }
1118                }
1119                KexProgress::Done { newkeys, .. } => {
1120                    debug!("kex impl has completed");
1121                    session.common.strict_kex =
1122                        session.common.strict_kex || newkeys.names.strict_kex();
1123
1124                    if let Some(ref mut enc) = session.common.encrypted {
1125                        // This is a rekey
1126                        enc.last_rekey = Instant::now();
1127                        session.common.packet_writer.buffer().bytes = 0;
1128                        enc.flush_all_pending()?;
1129
1130                        let mut pending = std::mem::take(&mut session.pending_reads);
1131                        for p in pending.drain(..) {
1132                            session.process_packet(handler, &p).await?;
1133                        }
1134                        session.pending_reads = pending;
1135                        session.pending_len = 0;
1136                        session.common.newkeys(newkeys);
1137                        session.flush()?;
1138                    } else {
1139                        // This is the initial kex
1140
1141                        session.common.encrypted(
1142                            EncryptedState::WaitingAuthServiceRequest {
1143                                sent: false,
1144                                accepted: false,
1145                            },
1146                            newkeys,
1147                        );
1148
1149                        session.maybe_send_ext_info()?;
1150                    }
1151
1152                    session.kex = SessionKexState::Idle;
1153
1154                    if session.common.strict_kex {
1155                        pkt.seqn = Wrapping(0);
1156                    }
1157
1158                    debug!("kex done");
1159                }
1160            }
1161
1162            session.flush()?;
1163
1164            return Ok(());
1165        }
1166    }
1167
1168    // Handle key exchange/re-exchange.
1169    session.server_read_encrypted(handler, pkt).await
1170}