russh/server/mod.rs
1// Copyright 2016 Pierre-Étienne Meunier
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! # Writing servers
17//!
18//! There are two ways of accepting connections:
19//! * implement the [Server](server::Server) trait and let [run_on_socket](server::Server::run_on_socket)/[run_on_address](server::Server::run_on_address) handle everything
20//! * accept connections yourself and pass them to [run_stream](server::run_stream)
21//!
22//! In both cases, you'll first need to implement the [Handler](server::Handler) trait -
23//! this is where you'll handle various events.
24//!
25//! Check out the following examples:
26//!
27//! * [Server that forwards your input to all connected clients](https://github.com/warp-tech/russh/blob/main/russh/examples/echoserver.rs)
28//! * [Server handing channel processing off to a library (here, `russh-sftp`)](https://github.com/warp-tech/russh/blob/main/russh/examples/sftp_server.rs)
29//! * Serving `ratatui` based TUI app to clients: [per-client](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_app.rs), [shared](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_shared_app.rs)
30
31use std;
32use std::collections::{HashMap, VecDeque};
33use std::num::Wrapping;
34use std::pin::Pin;
35use std::sync::Arc;
36use std::task::{Context, Poll};
37
38use bytes::Bytes;
39use client::GexParams;
40use futures::future::Future;
41use log::{debug, error, info, warn};
42use msg::{is_kex_msg, validate_client_msg_strict_kex};
43use russh_util::runtime::JoinHandle;
44use russh_util::time::Instant;
45use ssh_key::{Certificate, PrivateKey};
46use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
47use tokio::net::{TcpListener, ToSocketAddrs};
48use tokio::pin;
49use tokio::sync::{broadcast, mpsc};
50
51use crate::cipher::{clear, OpeningKey};
52use crate::kex::dh::groups::{DhGroup, BUILTIN_SAFE_DH_GROUPS, DH_GROUP14};
53use crate::kex::{KexProgress, SessionKexState};
54use crate::session::*;
55use crate::ssh_read::*;
56use crate::sshbuffer::*;
57use crate::{*};
58
59mod kex;
60mod session;
61pub use self::session::*;
62mod encrypted;
63
64/// Configuration of a server.
65pub struct Config {
66 /// The server ID string sent at the beginning of the protocol.
67 pub server_id: SshId,
68 /// Authentication methods proposed to the client.
69 pub methods: auth::MethodSet,
70 /// Authentication rejections must happen in constant time for
71 /// security reasons. Russh does not handle this by default.
72 pub auth_rejection_time: std::time::Duration,
73 /// Authentication rejection time override for the initial "none" auth attempt.
74 /// OpenSSH clients will send an initial "none" auth to probe for authentication methods.
75 pub auth_rejection_time_initial: Option<std::time::Duration>,
76 /// The server's keys. The first key pair in the client's preference order will be chosen.
77 pub keys: Vec<PrivateKey>,
78 /// The bytes and time limits before key re-exchange.
79 pub limits: Limits,
80 /// The initial size of a channel (used for flow control).
81 pub window_size: u32,
82 /// The maximal size of a single packet.
83 pub maximum_packet_size: u32,
84 /// Buffer size for each channel (a number of unprocessed messages to store before propagating backpressure to the TCP stream)
85 pub channel_buffer_size: usize,
86 /// Internal event buffer size
87 pub event_buffer_size: usize,
88 /// Lists of preferred algorithms.
89 pub preferred: Preferred,
90 /// Maximal number of allowed authentication attempts.
91 pub max_auth_attempts: usize,
92 /// Time after which the connection is garbage-collected.
93 pub inactivity_timeout: Option<std::time::Duration>,
94 /// If nothing is received from the client for this amount of time, send a keepalive message.
95 pub keepalive_interval: Option<std::time::Duration>,
96 /// If this many keepalives have been sent without reply, close the connection.
97 pub keepalive_max: usize,
98 /// If active, invoke `set_nodelay(true)` on client sockets; disabled by default (i.e. Nagle's algorithm is active).
99 pub nodelay: bool,
100}
101
102impl Default for Config {
103 fn default() -> Config {
104 Config {
105 server_id: SshId::Standard(format!(
106 "SSH-2.0-{}_{}",
107 env!("CARGO_PKG_NAME"),
108 env!("CARGO_PKG_VERSION")
109 )),
110 methods: auth::MethodSet::all(),
111 auth_rejection_time: std::time::Duration::from_secs(1),
112 auth_rejection_time_initial: None,
113 keys: Vec::new(),
114 window_size: 2097152,
115 maximum_packet_size: 32768,
116 channel_buffer_size: 100,
117 event_buffer_size: 10,
118 limits: Limits::default(),
119 preferred: Default::default(),
120 max_auth_attempts: 10,
121 inactivity_timeout: Some(std::time::Duration::from_secs(600)),
122 keepalive_interval: None,
123 keepalive_max: 3,
124 nodelay: false,
125 }
126 }
127}
128
129impl Debug for Config {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 // display everything except the private keys
132 f.debug_struct("Config")
133 .field("server_id", &self.server_id)
134 .field("methods", &self.methods)
135 .field("auth_rejection_time", &self.auth_rejection_time)
136 .field(
137 "auth_rejection_time_initial",
138 &self.auth_rejection_time_initial,
139 )
140 .field("keys", &"***")
141 .field("window_size", &self.window_size)
142 .field("maximum_packet_size", &self.maximum_packet_size)
143 .field("channel_buffer_size", &self.channel_buffer_size)
144 .field("event_buffer_size", &self.event_buffer_size)
145 .field("limits", &self.limits)
146 .field("preferred", &self.preferred)
147 .field("max_auth_attempts", &self.max_auth_attempts)
148 .field("inactivity_timeout", &self.inactivity_timeout)
149 .field("keepalive_interval", &self.keepalive_interval)
150 .field("keepalive_max", &self.keepalive_max)
151 .finish()
152 }
153}
154
155/// A client's response in a challenge-response authentication.
156///
157/// You should iterate it to get `&[u8]` response slices.
158pub struct Response<'a>(&'a mut (dyn Iterator<Item = Option<Bytes>> + Send));
159
160impl Iterator for Response<'_> {
161 type Item = Bytes;
162 fn next(&mut self) -> Option<Self::Item> {
163 self.0.next().flatten()
164 }
165}
166
167use std::borrow::Cow;
168/// An authentication result, in a challenge-response authentication.
169#[derive(Debug, PartialEq, Eq)]
170pub enum Auth {
171 /// Reject the authentication request.
172 Reject {
173 proceed_with_methods: Option<MethodSet>,
174 partial_success: bool,
175 },
176 /// Accept the authentication request.
177 Accept,
178
179 /// Method was not accepted, but no other check was performed.
180 UnsupportedMethod,
181
182 /// Partially accept the challenge-response authentication
183 /// request, providing more instructions for the client to follow.
184 Partial {
185 /// Name of this challenge.
186 name: Cow<'static, str>,
187 /// Instructions for this challenge.
188 instructions: Cow<'static, str>,
189 /// A number of prompts to the user. Each prompt has a `bool`
190 /// indicating whether the terminal must echo the characters
191 /// typed by the user.
192 prompts: Cow<'static, [(Cow<'static, str>, bool)]>,
193 },
194}
195
196impl Auth {
197 pub fn reject() -> Self {
198 Auth::Reject {
199 proceed_with_methods: None,
200 partial_success: false,
201 }
202 }
203}
204
205/// Server handler. Each client will have their own handler.
206///
207/// Note: this is an async trait. The trait functions return `impl Future`,
208/// and you can simply define them as `async fn` instead.
209#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
210pub trait Handler: Sized {
211 type Error: From<crate::Error> + Send;
212
213 /// Check authentication using the "none" method. Russh makes
214 /// sure rejection happens in time `config.auth_rejection_time`,
215 /// except if this method takes more than that.
216 #[allow(unused_variables)]
217 fn auth_none(&mut self, user: &str) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
218 async { Ok(Auth::reject()) }
219 }
220
221 /// Check authentication using the "password" method. Russh
222 /// makes sure rejection happens in time
223 /// `config.auth_rejection_time`, except if this method takes more
224 /// than that.
225 #[allow(unused_variables)]
226 fn auth_password(
227 &mut self,
228 user: &str,
229 password: &str,
230 ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
231 async { Ok(Auth::reject()) }
232 }
233
234 /// Check authentication using the "publickey" method. This method
235 /// should just check whether the public key matches the
236 /// authorized ones. Russh then checks the signature. If the key
237 /// is unknown, or the signature is invalid, Russh guarantees
238 /// that rejection happens in constant time
239 /// `config.auth_rejection_time`, except if this method takes more
240 /// time than that.
241 #[allow(unused_variables)]
242 fn auth_publickey_offered(
243 &mut self,
244 user: &str,
245 public_key: &ssh_key::PublicKey,
246 ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
247 async { Ok(Auth::Accept) }
248 }
249
250 /// Check authentication using the "publickey" method. This method
251 /// is called after the signature has been verified and key
252 /// ownership has been confirmed.
253 /// Russh guarantees that rejection happens in constant time
254 /// `config.auth_rejection_time`, except if this method takes more
255 /// time than that.
256 #[allow(unused_variables)]
257 fn auth_publickey(
258 &mut self,
259 user: &str,
260 public_key: &ssh_key::PublicKey,
261 ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
262 async { Ok(Auth::reject()) }
263 }
264
265 /// Check authentication using an OpenSSH certificate. This method
266 /// is called after the signature has been verified and key
267 /// ownership has been confirmed.
268 /// Russh guarantees that rejection happens in constant time
269 /// `config.auth_rejection_time`, except if this method takes more
270 /// time than that.
271 #[allow(unused_variables)]
272 fn auth_openssh_certificate(
273 &mut self,
274 user: &str,
275 certificate: &Certificate,
276 ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
277 async { Ok(Auth::reject()) }
278 }
279
280 /// Check authentication using the "keyboard-interactive"
281 /// method. Russh makes sure rejection happens in time
282 /// `config.auth_rejection_time`, except if this method takes more
283 /// than that.
284 #[allow(unused_variables)]
285 fn auth_keyboard_interactive<'a>(
286 &'a mut self,
287 user: &str,
288 submethods: &str,
289 response: Option<Response<'a>>,
290 ) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
291 async { Ok(Auth::reject()) }
292 }
293
294 /// Called when authentication succeeds for a session.
295 #[allow(unused_variables)]
296 fn auth_succeeded(
297 &mut self,
298 session: &mut Session,
299 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
300 async { Ok(()) }
301 }
302
303 /// Called when authentication starts but before it is successful.
304 /// Return value is an authentication banner, usually a warning message shown to the client.
305 #[allow(unused_variables)]
306 fn authentication_banner(
307 &mut self,
308 ) -> impl Future<Output = Result<Option<String>, Self::Error>> + Send {
309 async { Ok(None) }
310 }
311
312 /// Called when the client closes a channel.
313 #[allow(unused_variables)]
314 fn channel_close(
315 &mut self,
316 channel: ChannelId,
317 session: &mut Session,
318 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
319 async { Ok(()) }
320 }
321
322 /// Called when the client sends EOF to a channel.
323 #[allow(unused_variables)]
324 fn channel_eof(
325 &mut self,
326 channel: ChannelId,
327 session: &mut Session,
328 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
329 async { Ok(()) }
330 }
331
332 /// Called when a new session channel is created.
333 /// Return value indicates whether the channel request should be granted.
334 #[allow(unused_variables)]
335 fn channel_open_session(
336 &mut self,
337 channel: Channel<Msg>,
338 session: &mut Session,
339 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
340 async { Ok(false) }
341 }
342
343 /// Called when a new X11 channel is created.
344 /// Return value indicates whether the channel request should be granted.
345 #[allow(unused_variables)]
346 fn channel_open_x11(
347 &mut self,
348 channel: Channel<Msg>,
349 originator_address: &str,
350 originator_port: u32,
351 session: &mut Session,
352 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
353 async { Ok(false) }
354 }
355
356 /// Called when a new direct TCP/IP ("local TCP forwarding") channel is opened.
357 /// Return value indicates whether the channel request should be granted.
358 #[allow(unused_variables)]
359 fn channel_open_direct_tcpip(
360 &mut self,
361 channel: Channel<Msg>,
362 host_to_connect: &str,
363 port_to_connect: u32,
364 originator_address: &str,
365 originator_port: u32,
366 session: &mut Session,
367 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
368 async { Ok(false) }
369 }
370
371 /// Called when a new remote forwarded TCP connection comes in.
372 /// <https://www.rfc-editor.org/rfc/rfc4254#section-7>
373 #[allow(unused_variables)]
374 fn channel_open_forwarded_tcpip(
375 &mut self,
376 channel: Channel<Msg>,
377 host_to_connect: &str,
378 port_to_connect: u32,
379 originator_address: &str,
380 originator_port: u32,
381 session: &mut Session,
382 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
383 async { Ok(false) }
384 }
385
386 /// Called when a new direct-streamlocal ("local UNIX socket forwarding") channel is created.
387 /// Return value indicates whether the channel request should be granted.
388 #[allow(unused_variables)]
389 fn channel_open_direct_streamlocal(
390 &mut self,
391 channel: Channel<Msg>,
392 socket_path: &str,
393 session: &mut Session,
394 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
395 async { Ok(false) }
396 }
397
398 /// Called when the client confirmed our request to open a
399 /// channel. A channel can only be written to after receiving this
400 /// message (this library panics otherwise).
401 #[allow(unused_variables)]
402 fn channel_open_confirmation(
403 &mut self,
404 id: ChannelId,
405 max_packet_size: u32,
406 window_size: u32,
407 session: &mut Session,
408 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
409 async { Ok(()) }
410 }
411
412 /// Called when a data packet is received. A response can be
413 /// written to the `response` argument.
414 #[allow(unused_variables)]
415 fn data(
416 &mut self,
417 channel: ChannelId,
418 data: &[u8],
419 session: &mut Session,
420 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
421 async { Ok(()) }
422 }
423
424 /// Called when an extended data packet is received. Code 1 means
425 /// that this packet comes from stderr, other codes are not
426 /// defined (see
427 /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-5.2)).
428 #[allow(unused_variables)]
429 fn extended_data(
430 &mut self,
431 channel: ChannelId,
432 code: u32,
433 data: &[u8],
434 session: &mut Session,
435 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
436 async { Ok(()) }
437 }
438
439 /// Called when the network window is adjusted, meaning that we
440 /// can send more bytes.
441 #[allow(unused_variables)]
442 fn window_adjusted(
443 &mut self,
444 channel: ChannelId,
445 new_size: u32,
446 session: &mut Session,
447 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
448 async { Ok(()) }
449 }
450
451 /// Called when this server adjusts the network window. Return the
452 /// next target window.
453 #[allow(unused_variables)]
454 fn adjust_window(&mut self, channel: ChannelId, current: u32) -> u32 {
455 current
456 }
457
458 /// The client requests a pseudo-terminal with the given
459 /// specifications.
460 ///
461 /// **Note:** Success or failure should be communicated to the client by calling
462 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
463 /// instance:
464 ///
465 /// ```ignore
466 /// async fn pty_request(
467 /// &mut self,
468 /// channel: ChannelId,
469 /// term: &str,
470 /// col_width: u32,
471 /// row_height: u32,
472 /// pix_width: u32,
473 /// pix_height: u32,
474 /// modes: &[(Pty, u32)],
475 /// session: &mut Session,
476 /// ) -> Result<(), Self::Error> {
477 /// session.channel_success(channel);
478 /// Ok(())
479 /// }
480 /// ```
481 #[allow(unused_variables, clippy::too_many_arguments)]
482 fn pty_request(
483 &mut self,
484 channel: ChannelId,
485 term: &str,
486 col_width: u32,
487 row_height: u32,
488 pix_width: u32,
489 pix_height: u32,
490 modes: &[(Pty, u32)],
491 session: &mut Session,
492 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
493 async { Ok(()) }
494 }
495
496 /// The client requests an X11 connection.
497 ///
498 /// **Note:** Success or failure should be communicated to the client by calling
499 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
500 /// instance:
501 ///
502 /// ```ignore
503 /// async fn x11_request(
504 /// &mut self,
505 /// channel: ChannelId,
506 /// single_connection: bool,
507 /// x11_auth_protocol: &str,
508 /// x11_auth_cookie: &str,
509 /// x11_screen_number: u32,
510 /// session: &mut Session,
511 /// ) -> Result<(), Self::Error> {
512 /// session.channel_success(channel);
513 /// Ok(())
514 /// }
515 /// ```
516 #[allow(unused_variables)]
517 fn x11_request(
518 &mut self,
519 channel: ChannelId,
520 single_connection: bool,
521 x11_auth_protocol: &str,
522 x11_auth_cookie: &str,
523 x11_screen_number: u32,
524 session: &mut Session,
525 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
526 async { Ok(()) }
527 }
528
529 /// The client wants to set the given environment variable. Check
530 /// these carefully, as it is dangerous to allow any variable
531 /// environment to be set.
532 ///
533 /// **Note:** Success or failure should be communicated to the client by calling
534 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
535 /// instance:
536 ///
537 /// ```ignore
538 /// async fn env_request(
539 /// &mut self,
540 /// channel: ChannelId,
541 /// variable_name: &str,
542 /// variable_value: &str,
543 /// session: &mut Session,
544 /// ) -> Result<(), Self::Error> {
545 /// session.channel_success(channel);
546 /// Ok(())
547 /// }
548 /// ```
549 #[allow(unused_variables)]
550 fn env_request(
551 &mut self,
552 channel: ChannelId,
553 variable_name: &str,
554 variable_value: &str,
555 session: &mut Session,
556 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
557 async { Ok(()) }
558 }
559
560 /// The client requests a shell.
561 ///
562 /// **Note:** Success or failure should be communicated to the client by calling
563 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
564 /// instance:
565 ///
566 /// ```ignore
567 /// async fn shell_request(
568 /// &mut self,
569 /// channel: ChannelId,
570 /// session: &mut Session,
571 /// ) -> Result<(), Self::Error> {
572 /// session.channel_success(channel);
573 /// Ok(())
574 /// }
575 /// ```
576 #[allow(unused_variables)]
577 fn shell_request(
578 &mut self,
579 channel: ChannelId,
580 session: &mut Session,
581 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
582 async { Ok(()) }
583 }
584
585 /// The client sends a command to execute, to be passed to a
586 /// shell. Make sure to check the command before doing so.
587 ///
588 /// **Note:** Success or failure should be communicated to the client by calling
589 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
590 /// instance:
591 ///
592 /// ```ignore
593 /// async fn exec_request(
594 /// &mut self,
595 /// channel: ChannelId,
596 /// data: &[u8],
597 /// session: &mut Session,
598 /// ) -> Result<(), Self::Error> {
599 /// session.channel_success(channel);
600 /// Ok(())
601 /// }
602 /// ```
603 #[allow(unused_variables)]
604 fn exec_request(
605 &mut self,
606 channel: ChannelId,
607 data: &[u8],
608 session: &mut Session,
609 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
610 async { Ok(()) }
611 }
612
613 /// The client asks to start the subsystem with the given name
614 /// (such as sftp).
615 ///
616 /// **Note:** Success or failure should be communicated to the client by calling
617 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
618 /// instance:
619 ///
620 /// ```ignore
621 /// async fn subsystem_request(
622 /// &mut self,
623 /// channel: ChannelId,
624 /// name: &str,
625 /// session: &mut Session,
626 /// ) -> Result<(), Self::Error> {
627 /// session.channel_success(channel);
628 /// Ok(())
629 /// }
630 /// ```
631 #[allow(unused_variables)]
632 fn subsystem_request(
633 &mut self,
634 channel: ChannelId,
635 name: &str,
636 session: &mut Session,
637 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
638 async { Ok(()) }
639 }
640
641 /// The client's pseudo-terminal window size has changed.
642 ///
643 /// **Note:** Success or failure should be communicated to the client by calling
644 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
645 /// instance:
646 ///
647 /// ```ignore
648 /// async fn window_change_request(
649 /// &mut self,
650 /// channel: ChannelId,
651 /// col_width: u32,
652 /// row_height: u32,
653 /// pix_width: u32,
654 /// pix_height: u32,
655 /// session: &mut Session,
656 /// ) -> Result<(), Self::Error> {
657 /// session.channel_success(channel);
658 /// Ok(())
659 /// }
660 /// ```
661 #[allow(unused_variables)]
662 fn window_change_request(
663 &mut self,
664 channel: ChannelId,
665 col_width: u32,
666 row_height: u32,
667 pix_width: u32,
668 pix_height: u32,
669 session: &mut Session,
670 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
671 async { Ok(()) }
672 }
673
674 /// The client requests OpenSSH agent forwarding
675 ///
676 /// **Note:** Success or failure should be communicated to the client by calling
677 /// `session.channel_success(channel)` or `session.channel_failure(channel)` respectively. For
678 /// instance:
679 ///
680 /// ```ignore
681 /// async fn agent_request(
682 /// &mut self,
683 /// channel: ChannelId,
684 /// session: &mut Session,
685 /// ) -> Result<bool, Self::Error> {
686 /// session.channel_success(channel);
687 /// Ok(())
688 /// }
689 /// ```
690 #[allow(unused_variables)]
691 fn agent_request(
692 &mut self,
693 channel: ChannelId,
694 session: &mut Session,
695 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
696 async { Ok(false) }
697 }
698
699 /// The client is sending a signal (usually to pass to the
700 /// currently running process).
701 #[allow(unused_variables)]
702 fn signal(
703 &mut self,
704 channel: ChannelId,
705 signal: Sig,
706 session: &mut Session,
707 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
708 async { Ok(()) }
709 }
710
711 /// Used for reverse-forwarding ports, see
712 /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
713 /// If `port` is 0, you should set it to the allocated port number.
714 #[allow(unused_variables)]
715 fn tcpip_forward(
716 &mut self,
717 address: &str,
718 port: &mut u32,
719 session: &mut Session,
720 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
721 async { Ok(false) }
722 }
723
724 /// Used to stop the reverse-forwarding of a port, see
725 /// [RFC4254](https://tools.ietf.org/html/rfc4254#section-7).
726 #[allow(unused_variables)]
727 fn cancel_tcpip_forward(
728 &mut self,
729 address: &str,
730 port: u32,
731 session: &mut Session,
732 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
733 async { Ok(false) }
734 }
735
736 #[allow(unused_variables)]
737 fn streamlocal_forward(
738 &mut self,
739 socket_path: &str,
740 session: &mut Session,
741 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
742 async { Ok(false) }
743 }
744
745 #[allow(unused_variables)]
746 fn cancel_streamlocal_forward(
747 &mut self,
748 socket_path: &str,
749 session: &mut Session,
750 ) -> impl Future<Output = Result<bool, Self::Error>> + Send {
751 async { Ok(false) }
752 }
753
754 /// Override when enabling the `diffie-hellman-group-exchange-*` key exchange methods.
755 /// Should return a Diffie-Hellman group with a safe prime whose length is
756 /// between `gex_params.min_group_size` and `gex_params.max_group_size` and
757 /// (if possible) over and as close as possible to `gex_params.preferred_group_size`.
758 ///
759 /// OpenSSH uses a pre-generated database of safe primes stored in `/etc/ssh/moduli`
760 ///
761 /// The default implementation picks a group from a very short static list
762 /// of built-in standard groups and is not really taking advantage of the security
763 /// offered by these kex methods.
764 ///
765 /// See https://datatracker.ietf.org/doc/html/rfc4419#section-3
766 #[allow(unused_variables)]
767 fn lookup_dh_gex_group(
768 &mut self,
769 gex_params: &GexParams,
770 ) -> impl Future<Output = Result<Option<DhGroup>, Self::Error>> + Send {
771 async {
772 let mut best_group = &DH_GROUP14;
773
774 // Find _some_ matching group
775 for group in BUILTIN_SAFE_DH_GROUPS.iter() {
776 if group.bit_size() >= gex_params.min_group_size()
777 && group.bit_size() <= gex_params.max_group_size()
778 {
779 best_group = *group;
780 break;
781 }
782 }
783
784 // Find _closest_ matching group
785 for group in BUILTIN_SAFE_DH_GROUPS.iter() {
786 if group.bit_size() > gex_params.preferred_group_size() {
787 best_group = *group;
788 break;
789 }
790 }
791
792 Ok(Some(best_group.clone()))
793 }
794 }
795}
796
797pub struct RunningServerHandle {
798 shutdown_tx: broadcast::Sender<String>,
799}
800
801impl RunningServerHandle {
802 /// Request graceful server shutdown.
803 /// Starts the shutdown and immediately returns.
804 /// To wait for all the clients to disconnect, await `RunningServer` .
805 pub fn shutdown(&self, reason: String) {
806 let _ = self.shutdown_tx.send(reason);
807 }
808}
809
810pub struct RunningServer<F: Future<Output = std::io::Result<()>> + Unpin + Send> {
811 inner: F,
812 shutdown_tx: broadcast::Sender<String>,
813}
814
815impl<F: Future<Output = std::io::Result<()>> + Unpin + Send> RunningServer<F> {
816 pub fn handle(&self) -> RunningServerHandle {
817 RunningServerHandle {
818 shutdown_tx: self.shutdown_tx.clone(),
819 }
820 }
821}
822
823impl<F: Future<Output = std::io::Result<()>> + Unpin + Send> Future for RunningServer<F> {
824 type Output = std::io::Result<()>;
825
826 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
827 Future::poll(Pin::new(&mut self.inner), cx)
828 }
829}
830
831#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
832/// Trait used to create new handlers when clients connect.
833pub trait Server {
834 /// The type of handlers.
835 type Handler: Handler + Send + 'static;
836 /// Called when a new client connects.
837 fn new_client(&mut self, peer_addr: Option<std::net::SocketAddr>) -> Self::Handler;
838 /// Called when an active connection fails.
839 fn handle_session_error(&mut self, _error: <Self::Handler as Handler>::Error) {}
840
841 /// Run a server on a specified `tokio::net::TcpListener`. Useful when dropping
842 /// privileges immediately after socket binding, for example.
843 fn run_on_socket(
844 &mut self,
845 config: Arc<Config>,
846 socket: &TcpListener,
847 ) -> RunningServer<impl Future<Output = std::io::Result<()>> + Unpin + Send>
848 where
849 Self: Send,
850 {
851 let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1);
852 let shutdown_tx2 = shutdown_tx.clone();
853
854 let fut = async move {
855 if config.maximum_packet_size > 65535 {
856 error!(
857 "Maximum packet size ({:?}) should not larger than a TCP packet (65535)",
858 config.maximum_packet_size
859 );
860 }
861
862 let (error_tx, mut error_rx) = mpsc::unbounded_channel();
863
864 loop {
865 tokio::select! {
866 _ = shutdown_rx.recv() => {
867 debug!("Server shutdown requested");
868 return Ok(());
869 },
870 accept_result = socket.accept() => {
871 match accept_result {
872 Ok((socket, peer_addr)) => {
873 let mut shutdown_rx = shutdown_tx2.subscribe();
874
875 let config = config.clone();
876 // NOTE: For backwards compatibility, we keep the Option signature as changing it would be a breaking change.
877 let handler = self.new_client(Some(peer_addr));
878 let error_tx = error_tx.clone();
879
880 russh_util::runtime::spawn(async move {
881 if config.nodelay {
882 if let Err(e) = socket.set_nodelay(true) {
883 warn!("set_nodelay() failed: {e:?}");
884 }
885 }
886
887 let session = match run_stream(config, socket, handler).await {
888 Ok(s) => s,
889 Err(e) => {
890 debug!("Connection setup failed");
891 let _ = error_tx.send(e);
892 return
893 }
894 };
895
896 let handle = session.handle();
897
898 tokio::select! {
899 reason = shutdown_rx.recv() => {
900 if handle.disconnect(
901 Disconnect::ByApplication,
902 reason.unwrap_or_else(|_| "".into()),
903 "".into()
904 ).await.is_err() {
905 debug!("Failed to send disconnect message");
906 }
907 },
908 result = session => {
909 if let Err(e) = result {
910 debug!("Connection closed with error");
911 let _ = error_tx.send(e);
912 } else {
913 debug!("Connection closed");
914 }
915 }
916 }
917 });
918 }
919 Err(e) => {
920 return Err(e);
921 }
922 }
923 },
924
925 Some(error) = error_rx.recv() => {
926 self.handle_session_error(error);
927 }
928 }
929 }
930 };
931
932 RunningServer {
933 inner: Box::pin(fut),
934 shutdown_tx,
935 }
936 }
937
938 /// Run a server.
939 /// This is a convenience function; consider using `run_on_socket` for more control.
940 fn run_on_address<A: ToSocketAddrs + Send>(
941 &mut self,
942 config: Arc<Config>,
943 addrs: A,
944 ) -> impl Future<Output = std::io::Result<()>> + Send
945 where
946 Self: Send,
947 {
948 async {
949 let socket = TcpListener::bind(addrs).await?;
950 self.run_on_socket(config, &socket).await?;
951 Ok(())
952 }
953 }
954}
955
956use std::cell::RefCell;
957thread_local! {
958 static B1: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
959 static B2: RefCell<CryptoVec> = RefCell::new(CryptoVec::new());
960}
961
962async fn start_reading<R: AsyncRead + Unpin>(
963 mut stream_read: R,
964 mut buffer: SSHBuffer,
965 mut cipher: Box<dyn OpeningKey + Send>,
966) -> Result<(usize, R, SSHBuffer, Box<dyn OpeningKey + Send>), Error> {
967 buffer.buffer.clear();
968 let n = cipher::read(&mut stream_read, &mut buffer, &mut *cipher).await?;
969 Ok((n, stream_read, buffer, cipher))
970}
971
972/// An active server session returned by [run_stream].
973///
974/// Implements [Future] and can be awaited to wait for the session to finish.
975pub struct RunningSession<H: Handler> {
976 handle: Handle,
977 join: JoinHandle<Result<(), H::Error>>,
978}
979
980impl<H: Handler> RunningSession<H> {
981 /// Returns a new handle for the session.
982 pub fn handle(&self) -> Handle {
983 self.handle.clone()
984 }
985}
986
987impl<H: Handler> Future for RunningSession<H> {
988 type Output = Result<(), H::Error>;
989 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
990 match Future::poll(Pin::new(&mut self.join), cx) {
991 Poll::Ready(r) => Poll::Ready(match r {
992 Ok(Ok(x)) => Ok(x),
993 Err(e) => Err(crate::Error::from(e).into()),
994 Ok(Err(e)) => Err(e),
995 }),
996 Poll::Pending => Poll::Pending,
997 }
998 }
999}
1000
1001/// Start a single connection in the background.
1002pub async fn run_stream<H, R>(
1003 config: Arc<Config>,
1004 mut stream: R,
1005 handler: H,
1006) -> Result<RunningSession<H>, H::Error>
1007where
1008 H: Handler + Send + 'static,
1009 R: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1010{
1011 // Writing SSH id.
1012 let mut write_buffer = SSHBuffer::new();
1013 write_buffer.send_ssh_id(&config.as_ref().server_id);
1014 map_err!(stream.write_all(&write_buffer.buffer[..]).await)?;
1015
1016 // Reading SSH id and allocating a session.
1017 let mut stream = SshRead::new(stream);
1018 let (sender, receiver) = tokio::sync::mpsc::channel(config.event_buffer_size);
1019 let handle = server::session::Handle {
1020 sender,
1021 channel_buffer_size: config.channel_buffer_size,
1022 };
1023
1024 let common = read_ssh_id(config, &mut stream).await?;
1025 let mut session = Session {
1026 target_window_size: common.config.window_size,
1027 common,
1028 receiver,
1029 sender: handle.clone(),
1030 pending_reads: Vec::new(),
1031 pending_len: 0,
1032 channels: HashMap::new(),
1033 open_global_requests: VecDeque::new(),
1034 kex: SessionKexState::Idle,
1035 };
1036
1037 session.begin_rekey()?;
1038
1039 let join = russh_util::runtime::spawn(session.run(stream, handler));
1040
1041 Ok(RunningSession { handle, join })
1042}
1043
1044async fn read_ssh_id<R: AsyncRead + Unpin>(
1045 config: Arc<Config>,
1046 read: &mut SshRead<R>,
1047) -> Result<CommonSession<Arc<Config>>, Error> {
1048 let sshid = if let Some(t) = config.inactivity_timeout {
1049 tokio::time::timeout(t, read.read_ssh_id()).await??
1050 } else {
1051 read.read_ssh_id().await?
1052 };
1053
1054 let session = CommonSession {
1055 packet_writer: PacketWriter::clear(),
1056 // kex: Some(Kex::Init(kexinit)),
1057 auth_user: String::new(),
1058 auth_method: None, // Client only.
1059 auth_attempts: 0,
1060 remote_to_local: Box::new(clear::Key),
1061 encrypted: None,
1062 config,
1063 wants_reply: false,
1064 disconnected: false,
1065 buffer: CryptoVec::new(),
1066 strict_kex: false,
1067 alive_timeouts: 0,
1068 received_data: false,
1069 remote_sshid: sshid.into(),
1070 };
1071 Ok(session)
1072}
1073
1074async fn reply<H: Handler + Send>(
1075 session: &mut Session,
1076 handler: &mut H,
1077 pkt: &mut IncomingSshPacket,
1078) -> Result<(), H::Error> {
1079 if let Some(message_type) = pkt.buffer.first() {
1080 debug!(
1081 "< msg type {message_type:?}, seqn {:?}, len {}",
1082 pkt.seqn.0,
1083 pkt.buffer.len()
1084 );
1085 if session.common.strict_kex && session.common.encrypted.is_none() {
1086 let seqno = pkt.seqn.0 - 1; // was incremented after read()
1087 validate_client_msg_strict_kex(*message_type, seqno as usize)?;
1088 }
1089
1090 if [msg::IGNORE, msg::UNIMPLEMENTED, msg::DEBUG].contains(message_type) {
1091 return Ok(());
1092 }
1093 }
1094
1095 if pkt.buffer.first() == Some(&msg::KEXINIT) && session.kex == SessionKexState::Idle {
1096 // Not currently in a rekey but received KEXINIT
1097 info!("Client has initiated re-key");
1098 session.begin_rekey()?;
1099 // Kex will consume the packet right away
1100 }
1101
1102 let is_kex_msg = pkt.buffer.first().cloned().map(is_kex_msg).unwrap_or(false);
1103
1104 if is_kex_msg {
1105 if let SessionKexState::InProgress(kex) = session.kex.take() {
1106 let progress = kex
1107 .step(Some(pkt), &mut session.common.packet_writer, handler)
1108 .await?;
1109
1110 match progress {
1111 KexProgress::NeedsReply { kex, reset_seqn } => {
1112 debug!("kex impl continues: {kex:?}");
1113 session.kex = SessionKexState::InProgress(kex);
1114 if reset_seqn {
1115 debug!("kex impl requests seqno reset");
1116 session.common.reset_seqn();
1117 }
1118 }
1119 KexProgress::Done { newkeys, .. } => {
1120 debug!("kex impl has completed");
1121 session.common.strict_kex =
1122 session.common.strict_kex || newkeys.names.strict_kex();
1123
1124 if let Some(ref mut enc) = session.common.encrypted {
1125 // This is a rekey
1126 enc.last_rekey = Instant::now();
1127 session.common.packet_writer.buffer().bytes = 0;
1128 enc.flush_all_pending()?;
1129
1130 let mut pending = std::mem::take(&mut session.pending_reads);
1131 for p in pending.drain(..) {
1132 session.process_packet(handler, &p).await?;
1133 }
1134 session.pending_reads = pending;
1135 session.pending_len = 0;
1136 session.common.newkeys(newkeys);
1137 session.flush()?;
1138 } else {
1139 // This is the initial kex
1140
1141 session.common.encrypted(
1142 EncryptedState::WaitingAuthServiceRequest {
1143 sent: false,
1144 accepted: false,
1145 },
1146 newkeys,
1147 );
1148
1149 session.maybe_send_ext_info()?;
1150 }
1151
1152 session.kex = SessionKexState::Idle;
1153
1154 if session.common.strict_kex {
1155 pkt.seqn = Wrapping(0);
1156 }
1157
1158 debug!("kex done");
1159 }
1160 }
1161
1162 session.flush()?;
1163
1164 return Ok(());
1165 }
1166 }
1167
1168 // Handle key exchange/re-exchange.
1169 session.server_read_encrypted(handler, pkt).await
1170}