use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::convert::TryInto;
use std::num::Wrapping;
use std::pin::Pin;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Duration;
use futures::Future;
use futures::task::{Context, Poll};
use kex::ClientKex;
use log::{debug, error, trace, warn};
use russh_util::time::Instant;
use ssh_encoding::Decode;
use ssh_key::{Algorithm, Certificate, HashAlg, PrivateKey, PublicKey};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::pin;
use tokio::sync::mpsc::{
Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
};
use tokio::sync::oneshot;
pub use crate::auth::AuthResult;
use crate::channels::{
Channel, ChannelMsg, ChannelReadHalf, ChannelRef, ChannelWriteHalf, WindowSizeRef,
};
use crate::cipher::{self, OpeningKey, clear};
use crate::kex::{KexAlgorithmImplementor, KexCause, KexProgress, SessionKexState};
use crate::keys::PrivateKeyWithHashAlg;
use crate::msg::{is_kex_msg, validate_server_msg_strict_kex};
use crate::session::{CommonSession, EncryptedState, GlobalRequestResponse, NewKeys};
use crate::ssh_read::SshRead;
use crate::sshbuffer::{IncomingSshPacket, PacketWriter, SSHBuffer, SshId};
use crate::{
ChannelId, ChannelOpenFailure, Disconnect, Error, Limits, MethodSet, Sig, auth, map_err, msg,
negotiation,
};
mod encrypted;
mod kex;
mod session;
#[cfg(test)]
mod test;
#[derive(Debug)]
pub struct Session {
kex: SessionKexState<ClientKex>,
common: CommonSession<Arc<Config>>,
receiver: Receiver<Msg>,
sender: UnboundedSender<Reply>,
channels: HashMap<ChannelId, ChannelRef>,
target_window_size: u32,
pending_reads: Vec<Vec<u8>>,
pending_len: u32,
inbound_channel_sender: Sender<Msg>,
inbound_channel_receiver: Receiver<Msg>,
open_global_requests: VecDeque<GlobalRequestResponse>,
server_sig_algs: Option<Vec<Algorithm>>,
}
impl Drop for Session {
fn drop(&mut self) {
debug!("drop session")
}
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Reply {
AuthSuccess,
AuthFailure {
proceed_with_methods: MethodSet,
partial_success: bool,
},
ChannelOpenFailure,
SignRequest {
key: ssh_key::PublicKey,
data: Vec<u8>,
},
SignRequestCert {
cert: Certificate,
hash_alg: Option<HashAlg>,
data: Vec<u8>,
},
AuthInfoRequest {
name: String,
instructions: String,
prompts: Vec<Prompt>,
},
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Msg {
Authenticate {
user: String,
method: auth::Method,
},
AuthInfoResponse {
responses: Vec<String>,
},
Signed {
data: Vec<u8>,
},
ChannelOpenSession {
channel_ref: ChannelRef,
},
ChannelOpenX11 {
originator_address: String,
originator_port: u32,
channel_ref: ChannelRef,
},
ChannelOpenDirectTcpIp {
host_to_connect: String,
port_to_connect: u32,
originator_address: String,
originator_port: u32,
channel_ref: ChannelRef,
},
ChannelOpenDirectStreamLocal {
socket_path: String,
channel_ref: ChannelRef,
},
TcpIpForward {
reply_channel: Option<oneshot::Sender<Option<u32>>>,
address: String,
port: u32,
},
CancelTcpIpForward {
reply_channel: Option<oneshot::Sender<bool>>,
address: String,
port: u32,
},
StreamLocalForward {
reply_channel: Option<oneshot::Sender<bool>>,
socket_path: String,
},
CancelStreamLocalForward {
reply_channel: Option<oneshot::Sender<bool>>,
socket_path: String,
},
Close {
id: ChannelId,
},
Disconnect {
reason: Disconnect,
description: String,
language_tag: String,
},
Channel(ChannelId, ChannelMsg),
Rekey,
AwaitExtensionInfo {
extension_name: String,
reply_channel: oneshot::Sender<()>,
},
GetServerSigAlgs {
reply_channel: oneshot::Sender<Option<Vec<Algorithm>>>,
},
Keepalive {
want_reply: bool,
},
Ping {
reply_channel: oneshot::Sender<()>,
},
NoMoreSessions {
want_reply: bool,
},
}
impl From<(ChannelId, ChannelMsg)> for Msg {
fn from((id, msg): (ChannelId, ChannelMsg)) -> Self {
Msg::Channel(id, msg)
}
}
#[derive(Debug)]
pub enum KeyboardInteractiveAuthResponse {
Success,
Failure {
remaining_methods: MethodSet,
partial_success: bool,
},
InfoRequest {
name: String,
instructions: String,
prompts: Vec<Prompt>,
},
}
#[derive(Debug)]
pub struct Prompt {
pub prompt: String,
pub echo: bool,
}
#[derive(Debug)]
pub struct RemoteDisconnectInfo {
pub reason_code: crate::Disconnect,
pub message: String,
pub lang_tag: String,
}
#[derive(Debug)]
pub enum DisconnectReason<E: From<crate::Error> + Send> {
ReceivedDisconnect(RemoteDisconnectInfo),
Error(E),
}
pub struct Handle<H: Handler> {
sender: Sender<Msg>,
receiver: UnboundedReceiver<Reply>,
join: russh_util::runtime::JoinHandle<Result<(), H::Error>>,
channel_buffer_size: usize,
}
impl<H: Handler> Drop for Handle<H> {
fn drop(&mut self) {
debug!("drop handle")
}
}
impl<H: Handler> Handle<H> {
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
pub async fn authenticate_none<U: Into<String>>(
&mut self,
user: U,
) -> Result<AuthResult, crate::Error> {
let user = user.into();
self.sender
.send(Msg::Authenticate {
user,
method: auth::Method::None,
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_recv_reply().await
}
pub async fn authenticate_password<U: Into<String>, P: Into<String>>(
&mut self,
user: U,
password: P,
) -> Result<AuthResult, crate::Error> {
let user = user.into();
self.sender
.send(Msg::Authenticate {
user,
method: auth::Method::Password {
password: password.into(),
},
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_recv_reply().await
}
pub async fn authenticate_keyboard_interactive_start<
U: Into<String>,
S: Into<Option<String>>,
>(
&mut self,
user: U,
submethods: S,
) -> Result<KeyboardInteractiveAuthResponse, crate::Error> {
self.sender
.send(Msg::Authenticate {
user: user.into(),
method: auth::Method::KeyboardInteractive {
submethods: submethods.into().unwrap_or_else(|| "".to_owned()),
},
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_recv_keyboard_interactive_reply().await
}
pub async fn authenticate_keyboard_interactive_respond(
&mut self,
responses: Vec<String>,
) -> Result<KeyboardInteractiveAuthResponse, crate::Error> {
self.sender
.send(Msg::AuthInfoResponse { responses })
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_recv_keyboard_interactive_reply().await
}
async fn wait_recv_keyboard_interactive_reply(
&mut self,
) -> Result<KeyboardInteractiveAuthResponse, crate::Error> {
loop {
match self.receiver.recv().await {
Some(Reply::AuthSuccess) => return Ok(KeyboardInteractiveAuthResponse::Success),
Some(Reply::AuthFailure {
proceed_with_methods: remaining_methods,
partial_success,
}) => {
return Ok(KeyboardInteractiveAuthResponse::Failure {
remaining_methods,
partial_success,
});
}
Some(Reply::AuthInfoRequest {
name,
instructions,
prompts,
}) => {
return Ok(KeyboardInteractiveAuthResponse::InfoRequest {
name,
instructions,
prompts,
});
}
None => return Err(crate::Error::RecvError),
_ => {}
}
}
}
async fn wait_recv_reply(&mut self) -> Result<AuthResult, crate::Error> {
loop {
match self.receiver.recv().await {
Some(Reply::AuthSuccess) => return Ok(AuthResult::Success),
Some(Reply::AuthFailure {
proceed_with_methods: remaining_methods,
partial_success,
}) => {
return Ok(AuthResult::Failure {
remaining_methods,
partial_success,
});
}
None => {
return Ok(AuthResult::Failure {
remaining_methods: MethodSet::empty(),
partial_success: false,
});
}
_ => {}
}
}
}
pub async fn authenticate_publickey<U: Into<String>>(
&mut self,
user: U,
key: PrivateKeyWithHashAlg,
) -> Result<AuthResult, crate::Error> {
let user = user.into();
self.sender
.send(Msg::Authenticate {
user,
method: auth::Method::PublicKey { key },
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_recv_reply().await
}
pub async fn authenticate_openssh_cert<U: Into<String>>(
&mut self,
user: U,
key: Arc<PrivateKey>,
cert: Certificate,
) -> Result<AuthResult, crate::Error> {
let user = user.into();
self.sender
.send(Msg::Authenticate {
user,
method: auth::Method::OpenSshCertificate { key, cert },
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_recv_reply().await
}
pub async fn authenticate_publickey_with<U: Into<String>, S: auth::Signer>(
&mut self,
user: U,
key: ssh_key::PublicKey,
hash_alg: Option<HashAlg>,
signer: &mut S,
) -> Result<AuthResult, S::Error> {
let user = user.into();
if self
.sender
.send(Msg::Authenticate {
user,
method: auth::Method::FuturePublicKey { key, hash_alg },
})
.await
.is_err()
{
return Err((crate::SendError {}).into());
}
loop {
let reply = self.receiver.recv().await;
match reply {
Some(Reply::AuthSuccess) => return Ok(AuthResult::Success),
Some(Reply::AuthFailure {
proceed_with_methods: remaining_methods,
partial_success,
}) => {
return Ok(AuthResult::Failure {
remaining_methods,
partial_success,
});
}
Some(Reply::SignRequest { key, data }) => {
let data = signer.auth_sign(&key.into(), hash_alg, data).await;
let data = match data {
Ok(data) => data,
Err(e) => return Err(e),
};
if self.sender.send(Msg::Signed { data }).await.is_err() {
return Err((crate::SendError {}).into());
}
}
None => {
return Ok(AuthResult::Failure {
remaining_methods: MethodSet::empty(),
partial_success: false,
});
}
_ => {}
}
}
}
pub async fn authenticate_certificate_with<U: Into<String>, S: auth::Signer>(
&mut self,
user: U,
cert: Certificate,
hash_alg: Option<HashAlg>,
signer: &mut S,
) -> Result<AuthResult, S::Error> {
let user = user.into();
if self
.sender
.send(Msg::Authenticate {
user,
method: auth::Method::FutureCertificate { cert, hash_alg },
})
.await
.is_err()
{
return Err((crate::SendError {}).into());
}
loop {
let reply = self.receiver.recv().await;
match reply {
Some(Reply::AuthSuccess) => return Ok(AuthResult::Success),
Some(Reply::AuthFailure {
proceed_with_methods: remaining_methods,
partial_success,
}) => {
return Ok(AuthResult::Failure {
remaining_methods,
partial_success,
});
}
Some(Reply::SignRequestCert {
cert,
hash_alg,
data,
}) => {
let data = signer.auth_sign(&cert.into(), hash_alg, data).await;
let data = match data {
Ok(data) => data,
Err(e) => return Err(e),
};
if self.sender.send(Msg::Signed { data }).await.is_err() {
return Err((crate::SendError {}).into());
}
}
None => {
return Ok(AuthResult::Failure {
remaining_methods: MethodSet::empty(),
partial_success: false,
});
}
_ => {}
}
}
}
async fn wait_channel_confirmation(
&self,
mut receiver: Receiver<ChannelMsg>,
window_size_ref: WindowSizeRef,
) -> Result<Channel<Msg>, crate::Error> {
loop {
match receiver.recv().await {
Some(ChannelMsg::Open {
id,
max_packet_size,
window_size,
}) => {
window_size_ref.update(window_size).await;
return Ok(Channel {
write_half: ChannelWriteHalf {
id,
sender: self.sender.clone(),
max_packet_size,
window_size: window_size_ref,
},
read_half: ChannelReadHalf { receiver },
});
}
Some(ChannelMsg::OpenFailure(reason)) => {
return Err(crate::Error::ChannelOpenFailure(reason));
}
None => {
debug!("channel confirmation sender was dropped");
return Err(crate::Error::Disconnect);
}
msg => {
debug!("msg = {msg:?}");
}
}
}
}
#[cfg(not(target_arch = "wasm32"))]
async fn await_extension_info(&self, extension_name: String) -> Result<(), crate::Error> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Msg::AwaitExtensionInfo {
extension_name,
reply_channel: sender,
})
.await
.map_err(|_| crate::Error::SendError)?;
let _ = tokio::time::timeout(Duration::from_secs(1), receiver).await;
Ok(())
}
pub async fn best_supported_rsa_hash(&self) -> Result<Option<Option<HashAlg>>, Error> {
#[cfg(not(target_arch = "wasm32"))]
self.await_extension_info("server-sig-algs".into()).await?;
let (sender, receiver) = oneshot::channel();
self.sender
.send(Msg::GetServerSigAlgs {
reply_channel: sender,
})
.await
.map_err(|_| crate::Error::SendError)?;
if let Some(ssa) = receiver.await.map_err(|_| Error::Inconsistent)? {
let possible_algs = [
Some(ssh_key::HashAlg::Sha512),
Some(ssh_key::HashAlg::Sha256),
None,
];
for alg in possible_algs.into_iter() {
if ssa.contains(&Algorithm::Rsa { hash: alg }) {
return Ok(Some(alg));
}
}
}
Ok(None)
}
pub async fn channel_open_session(&self) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenSession { channel_ref })
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_x11<A: Into<String>>(
&self,
originator_address: A,
originator_port: u32,
) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenX11 {
originator_address: originator_address.into(),
originator_port,
channel_ref,
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_direct_tcpip<A: Into<String>, B: Into<String>>(
&self,
host_to_connect: A,
port_to_connect: u32,
originator_address: B,
originator_port: u32,
) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenDirectTcpIp {
host_to_connect: host_to_connect.into(),
port_to_connect,
originator_address: originator_address.into(),
originator_port,
channel_ref,
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_direct_streamlocal<S: Into<String>>(
&self,
socket_path: S,
) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenDirectStreamLocal {
socket_path: socket_path.into(),
channel_ref,
})
.await
.map_err(|_| crate::Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn tcpip_forward<A: Into<String>>(
&self,
address: A,
port: u32,
) -> Result<u32, crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::TcpIpForward {
reply_channel: Some(reply_send),
address: address.into(),
port,
})
.await
.map_err(|_| crate::Error::SendError)?;
match reply_recv.await {
Ok(Some(port)) => Ok(port),
Ok(None) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive TcpIpForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}
pub async fn cancel_tcpip_forward<A: Into<String>>(
&self,
address: A,
port: u32,
) -> Result<(), crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::CancelTcpIpForward {
reply_channel: Some(reply_send),
address: address.into(),
port,
})
.await
.map_err(|_| crate::Error::SendError)?;
match reply_recv.await {
Ok(true) => Ok(()),
Ok(false) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive CancelTcpIpForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}
pub async fn streamlocal_forward<A: Into<String>>(
&self,
socket_path: A,
) -> Result<(), crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::StreamLocalForward {
reply_channel: Some(reply_send),
socket_path: socket_path.into(),
})
.await
.map_err(|_| crate::Error::SendError)?;
match reply_recv.await {
Ok(true) => Ok(()),
Ok(false) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive StreamLocalForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}
pub async fn cancel_streamlocal_forward<A: Into<String>>(
&self,
socket_path: A,
) -> Result<(), crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::CancelStreamLocalForward {
reply_channel: Some(reply_send),
socket_path: socket_path.into(),
})
.await
.map_err(|_| crate::Error::SendError)?;
match reply_recv.await {
Ok(true) => Ok(()),
Ok(false) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive CancelStreamLocalForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}
pub async fn disconnect(
&self,
reason: Disconnect,
description: &str,
language_tag: &str,
) -> Result<(), crate::Error> {
self.sender
.send(Msg::Disconnect {
reason,
description: description.into(),
language_tag: language_tag.into(),
})
.await
.map_err(|_| crate::Error::SendError)?;
Ok(())
}
pub async fn data(
&self,
id: ChannelId,
data: impl Into<bytes::Bytes>,
) -> Result<(), bytes::Bytes> {
let data = data.into();
self.sender
.send(Msg::Channel(id, ChannelMsg::Data { data: data.clone() }))
.await
.map_err(|e| match e.0 {
Msg::Channel(_, ChannelMsg::Data { data, .. }) => data,
_ => unreachable!(),
})
}
pub async fn rekey_soon(&self) -> Result<(), Error> {
self.sender
.send(Msg::Rekey)
.await
.map_err(|_| Error::SendError)?;
Ok(())
}
pub async fn send_keepalive(&self, want_reply: bool) -> Result<(), Error> {
self.sender
.send(Msg::Keepalive { want_reply })
.await
.map_err(|_| Error::SendError)
}
pub async fn send_ping(&self) -> Result<(), Error> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Msg::Ping {
reply_channel: sender,
})
.await
.map_err(|_| Error::SendError)?;
let _ = receiver.await;
Ok(())
}
pub async fn no_more_sessions(&self, want_reply: bool) -> Result<(), Error> {
self.sender
.send(Msg::NoMoreSessions { want_reply })
.await
.map_err(|_| Error::SendError)
}
}
impl<H: Handler> Future for Handle<H> {
type Output = Result<(), H::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.join), cx) {
Poll::Ready(r) => Poll::Ready(match r {
Ok(Ok(x)) => Ok(x),
Err(e) => Err(crate::Error::from(e).into()),
Ok(Err(e)) => Err(e),
}),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn connect<H: Handler + Send + 'static, A: tokio::net::ToSocketAddrs>(
config: Arc<Config>,
addrs: A,
handler: H,
) -> Result<Handle<H>, H::Error> {
let socket = map_err!(tokio::net::TcpStream::connect(addrs).await)?;
if config.as_ref().nodelay {
if let Err(e) = socket.set_nodelay(true) {
warn!("set_nodelay() failed: {e:?}");
}
}
connect_stream(config, socket, handler).await
}
pub async fn connect_stream<H, R>(
config: Arc<Config>,
mut stream: R,
handler: H,
) -> Result<Handle<H>, H::Error>
where
H: Handler + Send + 'static,
R: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let mut write_buffer = SSHBuffer::new();
debug!("ssh id = {:?}", config.as_ref().client_id);
write_buffer.send_ssh_id(&config.as_ref().client_id);
map_err!(stream.write_all(&write_buffer.buffer).await)?;
let mut stream = SshRead::new(stream);
let sshid = stream.read_ssh_id().await?;
let (handle_sender, session_receiver) = channel(10);
let (session_sender, handle_receiver) = unbounded_channel();
if config.maximum_packet_size > 65535 {
error!(
"Maximum packet size ({:?}) should not larger than a TCP packet (65535)",
config.maximum_packet_size
);
}
let channel_buffer_size = config.channel_buffer_size;
let mut session = Session::new(
config.window_size,
CommonSession {
packet_writer: PacketWriter::clear(),
auth_user: String::new(),
auth_attempts: 0,
auth_method: None, remote_to_local: Box::new(clear::Key),
encrypted: None,
config,
wants_reply: false,
disconnected: false,
buffer: Vec::new(),
strict_kex: false,
alive_timeouts: 0,
received_data: false,
remote_sshid: sshid.into(),
},
session_receiver,
session_sender,
);
session.begin_rekey()?;
let (kex_done_signal, kex_done_signal_rx) = oneshot::channel();
let join = russh_util::runtime::spawn(session.run(stream, handler, Some(kex_done_signal)));
if let Err(err) = kex_done_signal_rx.await {
debug!("kex_done_signal sender was dropped {err:?}");
join.await.map_err(crate::Error::Join)??;
return Err(H::Error::from(crate::Error::Disconnect));
}
Ok(Handle {
sender: handle_sender,
receiver: handle_receiver,
join,
channel_buffer_size,
})
}
async fn start_reading<R: AsyncRead + Unpin>(
mut stream_read: R,
mut buffer: SSHBuffer,
mut cipher: Box<dyn OpeningKey + Send>,
) -> Result<(usize, R, SSHBuffer, Box<dyn OpeningKey + Send>), crate::Error> {
buffer.buffer.clear();
let n = cipher::read(&mut stream_read, &mut buffer, &mut *cipher).await?;
Ok((n, stream_read, buffer, cipher))
}
impl Session {
fn maybe_decompress(&mut self, buffer: &SSHBuffer) -> Result<IncomingSshPacket, Error> {
if let Some(ref mut enc) = self.common.encrypted {
let mut decomp = Vec::new();
Ok(IncomingSshPacket {
#[allow(clippy::indexing_slicing)] buffer: enc.decompress.decompress(
&buffer.buffer[5..],
&mut decomp,
)?.into(),
seqn: buffer.seqn,
})
} else {
Ok(IncomingSshPacket {
#[allow(clippy::indexing_slicing)] buffer: buffer.buffer[5..].into(),
seqn: buffer.seqn,
})
}
}
fn new(
target_window_size: u32,
common: CommonSession<Arc<Config>>,
receiver: Receiver<Msg>,
sender: UnboundedSender<Reply>,
) -> Self {
let (inbound_channel_sender, inbound_channel_receiver) = channel(10);
Self {
common,
receiver,
sender,
kex: SessionKexState::Idle,
target_window_size,
inbound_channel_sender,
inbound_channel_receiver,
channels: HashMap::new(),
pending_reads: Vec::new(),
pending_len: 0,
open_global_requests: VecDeque::new(),
server_sig_algs: None,
}
}
async fn run<H: Handler + Send, R: AsyncRead + AsyncWrite + Unpin + Send>(
mut self,
stream: SshRead<R>,
mut handler: H,
mut kex_done_signal: Option<oneshot::Sender<()>>,
) -> Result<(), H::Error> {
let (stream_read, mut stream_write) = stream.split();
let result = self
.run_inner(
stream_read,
&mut stream_write,
&mut handler,
&mut kex_done_signal,
)
.await;
trace!("disconnected");
self.receiver.close();
self.inbound_channel_receiver.close();
map_err!(stream_write.shutdown().await)?;
match result {
Ok(v) => {
handler
.disconnected(DisconnectReason::ReceivedDisconnect(v))
.await?;
Ok(())
}
Err(e) => {
if kex_done_signal.is_some() {
Err(e)
} else {
debug!("disconnected {e:?}");
handler.disconnected(DisconnectReason::Error(e)).await?;
Err(H::Error::from(crate::Error::Disconnect))
}
}
}
}
async fn run_inner<H: Handler + Send, R: AsyncRead + AsyncWrite + Unpin + Send>(
&mut self,
stream_read: SshRead<ReadHalf<R>>,
stream_write: &mut WriteHalf<R>,
handler: &mut H,
kex_done_signal: &mut Option<tokio::sync::oneshot::Sender<()>>,
) -> Result<RemoteDisconnectInfo, H::Error> {
let mut result: Result<RemoteDisconnectInfo, H::Error> = Err(Error::Disconnect.into());
self.flush()?;
map_err!(self.common.packet_writer.flush_into(stream_write).await)?;
let buffer = SSHBuffer::new();
let mut opening_cipher = Box::new(clear::Key) as Box<dyn OpeningKey + Send>;
std::mem::swap(&mut opening_cipher, &mut self.common.remote_to_local);
let keepalive_timer =
crate::future_or_pending(self.common.config.keepalive_interval, tokio::time::sleep);
pin!(keepalive_timer);
let inactivity_timer =
crate::future_or_pending(self.common.config.inactivity_timeout, tokio::time::sleep);
pin!(inactivity_timer);
let reading = start_reading(stream_read, buffer, opening_cipher);
pin!(reading);
#[allow(clippy::panic)] while !self.common.disconnected {
self.common.received_data = false;
let mut sent_keepalive = false;
tokio::select! {
r = &mut reading => {
let (stream_read, mut buffer, mut opening_cipher) = match r {
Ok((_, stream_read, buffer, opening_cipher)) => (stream_read, buffer, opening_cipher),
Err(e) => return Err(e.into())
};
std::mem::swap(&mut opening_cipher, &mut self.common.remote_to_local);
if buffer.buffer.len() < 5 {
break
}
let mut pkt = self.maybe_decompress(&buffer)?;
if !pkt.buffer.is_empty() {
#[allow(clippy::indexing_slicing)] if pkt.buffer[0] == crate::msg::DISCONNECT {
debug!("received disconnect");
result = self.process_disconnect(&pkt).map_err(H::Error::from);
} else {
self.common.received_data = true;
reply(self, handler, kex_done_signal, &mut pkt).await?;
buffer.seqn = pkt.seqn; }
}
std::mem::swap(&mut opening_cipher, &mut self.common.remote_to_local);
reading.set(start_reading(stream_read, buffer, opening_cipher));
}
() = &mut keepalive_timer => {
if let Some(ref mut enc) = self.common.encrypted {
if matches!(enc.state, EncryptedState::Authenticated) {
self.common.alive_timeouts = self.common.alive_timeouts.saturating_add(1);
if self.common.config.keepalive_max != 0 && self.common.alive_timeouts > self.common.config.keepalive_max {
debug!("Timeout, server not responding to keepalives");
return Err(crate::Error::KeepaliveTimeout.into());
}
sent_keepalive = true;
self.send_keepalive(true)?;
}
}
}
() = &mut inactivity_timer => {
debug!("timeout");
return Err(crate::Error::InactivityTimeout.into());
}
msg = self.receiver.recv(), if !self.kex.active() => {
match msg {
Some(msg) => self.handle_msg(msg)?,
None => {
self.common.disconnected = true;
break
}
};
while !self.kex.active() {
match self.receiver.try_recv() {
Ok(next) => self.handle_msg(next)?,
Err(_) => break
}
}
}
msg = self.inbound_channel_receiver.recv(), if !self.kex.active() => {
match msg {
Some(msg) => self.handle_msg(msg)?,
None => (),
}
while !self.kex.active() {
match self.inbound_channel_receiver.try_recv() {
Ok(next) => self.handle_msg(next)?,
Err(_) => break
}
}
}
};
self.flush()?;
map_err!(self.common.packet_writer.flush_into(stream_write).await)?;
if let Some(ref mut enc) = self.common.encrypted {
if let EncryptedState::InitCompression = enc.state {
if enc.client_compression.is_deferred() {
enc.client_compression
.init_compress(self.common.packet_writer.compress());
}
enc.state = EncryptedState::Authenticated;
}
}
if self.common.received_data {
self.common.alive_timeouts = 0;
}
if self.common.received_data || sent_keepalive {
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
keepalive_timer.as_mut().as_pin_mut(),
self.common.config.keepalive_interval,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
}
if !sent_keepalive {
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
inactivity_timer.as_mut().as_pin_mut(),
self.common.config.inactivity_timeout,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
}
}
result
}
fn process_disconnect(
&mut self,
pkt: &IncomingSshPacket,
) -> Result<RemoteDisconnectInfo, Error> {
let mut r = &pkt.buffer[..];
u8::decode(&mut r)?; self.common.disconnected = true;
let reason_code = u32::decode(&mut r)?.try_into()?;
let message = String::decode(&mut r)?;
let lang_tag = String::decode(&mut r)?;
Ok(RemoteDisconnectInfo {
reason_code,
message,
lang_tag,
})
}
fn handle_msg(&mut self, msg: Msg) -> Result<(), crate::Error> {
match msg {
Msg::Authenticate { user, method } => {
self.write_auth_request_if_needed(&user, method)?;
}
Msg::Signed { .. } => {}
Msg::AuthInfoResponse { .. } => {}
Msg::ChannelOpenSession { channel_ref } => {
let id = self.channel_open_session()?;
self.channels.insert(id, channel_ref);
}
Msg::ChannelOpenX11 {
originator_address,
originator_port,
channel_ref,
} => {
let id = self.channel_open_x11(&originator_address, originator_port)?;
self.channels.insert(id, channel_ref);
}
Msg::ChannelOpenDirectTcpIp {
host_to_connect,
port_to_connect,
originator_address,
originator_port,
channel_ref,
} => {
let id = self.channel_open_direct_tcpip(
&host_to_connect,
port_to_connect,
&originator_address,
originator_port,
)?;
self.channels.insert(id, channel_ref);
}
Msg::ChannelOpenDirectStreamLocal {
socket_path,
channel_ref,
} => {
let id = self.channel_open_direct_streamlocal(&socket_path)?;
self.channels.insert(id, channel_ref);
}
Msg::TcpIpForward {
reply_channel,
address,
port,
} => self.tcpip_forward(reply_channel, &address, port)?,
Msg::CancelTcpIpForward {
reply_channel,
address,
port,
} => self.cancel_tcpip_forward(reply_channel, &address, port)?,
Msg::StreamLocalForward {
reply_channel,
socket_path,
} => self.streamlocal_forward(reply_channel, &socket_path)?,
Msg::CancelStreamLocalForward {
reply_channel,
socket_path,
} => self.cancel_streamlocal_forward(reply_channel, &socket_path)?,
Msg::Disconnect {
reason,
description,
language_tag,
} => self.disconnect(reason, &description, &language_tag)?,
Msg::Channel(id, ChannelMsg::Data { data }) => self.data(id, data)?,
Msg::Channel(id, ChannelMsg::Eof) => {
self.eof(id)?;
}
Msg::Channel(id, ChannelMsg::ExtendedData { data, ext }) => {
self.extended_data(id, ext, data)?;
}
Msg::Channel(
id,
ChannelMsg::RequestPty {
want_reply,
term,
col_width,
row_height,
pix_width,
pix_height,
terminal_modes,
},
) => self.request_pty(
id,
want_reply,
&term,
col_width,
row_height,
pix_width,
pix_height,
&terminal_modes,
)?,
Msg::Channel(
id,
ChannelMsg::WindowChange {
col_width,
row_height,
pix_width,
pix_height,
},
) => self.window_change(id, col_width, row_height, pix_width, pix_height)?,
Msg::Channel(
id,
ChannelMsg::RequestX11 {
want_reply,
single_connection,
x11_authentication_protocol,
x11_authentication_cookie,
x11_screen_number,
},
) => self.request_x11(
id,
want_reply,
single_connection,
&x11_authentication_protocol,
&x11_authentication_cookie,
x11_screen_number,
)?,
Msg::Channel(
id,
ChannelMsg::SetEnv {
want_reply,
variable_name,
variable_value,
},
) => self.set_env(id, want_reply, &variable_name, &variable_value)?,
Msg::Channel(id, ChannelMsg::RequestShell { want_reply }) => {
self.request_shell(want_reply, id)?
}
Msg::Channel(
id,
ChannelMsg::Exec {
want_reply,
command,
},
) => self.exec(id, want_reply, &command)?,
Msg::Channel(id, ChannelMsg::Signal { signal }) => self.signal(id, signal)?,
Msg::Channel(id, ChannelMsg::RequestSubsystem { want_reply, name }) => {
self.request_subsystem(want_reply, id, &name)?
}
Msg::Channel(id, ChannelMsg::AgentForward { want_reply }) => {
self.agent_forward(id, want_reply)?
}
Msg::Channel(id, ChannelMsg::Close) => self.close(id)?,
Msg::Rekey => self.initiate_rekey()?,
Msg::AwaitExtensionInfo {
extension_name,
reply_channel,
} => {
if let Some(ref mut enc) = self.common.encrypted {
if !enc.received_extensions.contains(&extension_name) {
if !matches!(enc.state, EncryptedState::Authenticated) {
enc.extension_info_awaiters
.entry(extension_name)
.or_insert(vec![])
.push(reply_channel);
}
}
}
}
Msg::GetServerSigAlgs { reply_channel } => {
let _ = reply_channel.send(self.server_sig_algs.clone());
}
Msg::Keepalive { want_reply } => {
let _ = self.send_keepalive(want_reply);
}
Msg::Ping { reply_channel } => {
let _ = self.send_ping(reply_channel);
}
Msg::NoMoreSessions { want_reply } => {
let _ = self.no_more_sessions(want_reply);
}
msg => {
unimplemented!("unimplemented (server-only?) message: {:?}", msg)
}
}
Ok(())
}
fn begin_rekey(&mut self) -> Result<(), crate::Error> {
debug!("beginning re-key");
let mut kex = ClientKex::new(
self.common.config.clone(),
&self.common.config.client_id,
&self.common.remote_sshid,
match &self.common.encrypted {
None => KexCause::Initial,
Some(enc) => KexCause::Rekey {
strict: self.common.strict_kex,
session_id: enc.session_id.clone(),
},
},
);
kex.kexinit(&mut self.common.packet_writer)?;
self.kex = SessionKexState::InProgress(kex);
Ok(())
}
fn flush(&mut self) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if enc.flush(
&self.common.config.as_ref().limits,
&mut self.common.packet_writer,
)? && !self.kex.active()
{
self.begin_rekey()?;
}
}
Ok(())
}
pub fn initiate_rekey(&mut self) -> Result<(), Error> {
if let Some(ref mut enc) = self.common.encrypted {
enc.rekey_wanted = true;
self.flush()?
}
Ok(())
}
}
async fn reply<H: Handler>(
session: &mut Session,
handler: &mut H,
kex_done_signal: &mut Option<tokio::sync::oneshot::Sender<()>>,
pkt: &mut IncomingSshPacket,
) -> Result<(), H::Error> {
if let Some(message_type) = pkt.buffer.first() {
debug!(
"< msg type {message_type:?}, seqn {:?}, len {}",
pkt.seqn.0,
pkt.buffer.len()
);
if session.common.strict_kex && session.common.encrypted.is_none() {
let seqno = pkt.seqn.0 - 1; validate_server_msg_strict_kex(*message_type, seqno as usize)?;
}
if [msg::IGNORE, msg::UNIMPLEMENTED, msg::DEBUG].contains(message_type) {
return Ok(());
}
}
if pkt.buffer.first() == Some(&msg::KEXINIT) && session.kex == SessionKexState::Idle {
debug!("server has initiated re-key");
session.begin_rekey()?;
}
let is_kex_msg = pkt.buffer.first().cloned().map(is_kex_msg).unwrap_or(false);
if is_kex_msg {
if let SessionKexState::InProgress(kex) = session.kex.take() {
let progress = kex.step(Some(pkt), &mut session.common.packet_writer)?;
match progress {
KexProgress::NeedsReply { kex, reset_seqn } => {
debug!("kex impl continues: {kex:?}");
session.kex = SessionKexState::InProgress(kex);
if reset_seqn {
debug!("kex impl requests seqno reset");
session.common.reset_seqn();
}
}
KexProgress::Done {
server_host_key,
newkeys,
} => {
debug!("kex impl has completed");
session.common.strict_kex =
session.common.strict_kex || newkeys.names.strict_kex();
let shared_secret = newkeys.kex.shared_secret_bytes();
handler
.kex_done(shared_secret, &newkeys.names, session)
.await?;
if let Some(ref mut enc) = session.common.encrypted {
enc.last_rekey = Instant::now();
session.common.packet_writer.buffer().bytes = 0;
enc.flush_all_pending()?;
let mut pending = std::mem::take(&mut session.pending_reads);
for p in pending.drain(..) {
session.process_packet(handler, &p).await?;
}
session.pending_reads = pending;
session.pending_len = 0;
session.common.newkeys(newkeys);
} else {
if let Some(server_host_key) = &server_host_key {
let check = handler.check_server_key(server_host_key).await?;
if !check {
return Err(crate::Error::UnknownKey.into());
}
}
session
.common
.encrypted(initial_encrypted_state(session), newkeys);
if let Some(sender) = kex_done_signal.take() {
sender.send(()).unwrap_or(());
}
}
session.kex = SessionKexState::Idle;
if session.common.strict_kex {
pkt.seqn = Wrapping(0);
}
debug!("kex done");
}
}
session.flush()?;
return Ok(());
}
}
session.client_read_encrypted(handler, pkt).await
}
fn initial_encrypted_state(session: &Session) -> EncryptedState {
if session.common.config.anonymous {
EncryptedState::Authenticated
} else {
EncryptedState::WaitingAuthServiceRequest {
accepted: false,
sent: false,
}
}
}
#[derive(Debug, Clone)]
pub struct GexParams {
min_group_size: usize,
preferred_group_size: usize,
max_group_size: usize,
}
impl GexParams {
pub fn new(
min_group_size: usize,
preferred_group_size: usize,
max_group_size: usize,
) -> Result<Self, Error> {
Self::for_client_config(min_group_size, preferred_group_size, max_group_size)
}
pub fn for_client_config(
min_group_size: usize,
preferred_group_size: usize,
max_group_size: usize,
) -> Result<Self, Error> {
Self::build(
min_group_size,
preferred_group_size,
max_group_size,
ValidationKind::ClientConfig,
)
}
fn validate(&self, kind: ValidationKind) -> Result<(), Error> {
match kind {
ValidationKind::ClientConfig => {
if self.min_group_size < 2048 {
return Err(Error::InvalidConfig(format!(
"min_group_size must be at least 2048 bits. We got {} bits",
self.min_group_size
)));
}
}
ValidationKind::PeerRequest => {
if self.max_group_size < 2048 {
return Err(Error::InvalidConfig(format!(
"max_group_size must be at least 2048 bits. We got {} bits",
self.max_group_size
)));
}
}
}
if self.preferred_group_size < self.min_group_size {
return Err(Error::InvalidConfig(format!(
"preferred_group_size must be at least as large as min_group_size. We have preferred_group_size = {} < min_group_size = {}",
self.preferred_group_size, self.min_group_size
)));
}
if self.max_group_size < self.preferred_group_size {
return Err(Error::InvalidConfig(format!(
"max_group_size must be at least as large as preferred_group_size. We have max_group_size = {} < preferred_group_size = {}",
self.max_group_size, self.preferred_group_size
)));
}
Ok(())
}
pub(crate) fn from_peer_request(
min_group_size: usize,
preferred_group_size: usize,
max_group_size: usize,
) -> Result<Self, Error> {
Self::build(
min_group_size,
preferred_group_size,
max_group_size,
ValidationKind::PeerRequest,
)
}
fn build(
min_group_size: usize,
preferred_group_size: usize,
max_group_size: usize,
kind: ValidationKind,
) -> Result<Self, Error> {
let this = Self {
min_group_size,
preferred_group_size,
max_group_size,
};
this.validate(kind)?;
Ok(this)
}
pub fn min_group_size(&self) -> usize {
self.min_group_size
}
pub fn preferred_group_size(&self) -> usize {
self.preferred_group_size
}
pub fn max_group_size(&self) -> usize {
self.max_group_size
}
}
#[derive(Clone, Copy, Eq, PartialEq)]
enum ValidationKind {
ClientConfig,
PeerRequest,
}
impl Default for GexParams {
fn default() -> GexParams {
GexParams {
min_group_size: 3072,
preferred_group_size: 8192,
max_group_size: 8192,
}
}
}
#[derive(Debug)]
pub struct Config {
pub client_id: SshId,
pub limits: Limits,
pub window_size: u32,
pub maximum_packet_size: u32,
pub channel_buffer_size: usize,
pub preferred: negotiation::Preferred,
pub inactivity_timeout: Option<std::time::Duration>,
pub keepalive_interval: Option<std::time::Duration>,
pub keepalive_max: usize,
pub anonymous: bool,
pub gex: GexParams,
pub nodelay: bool,
}
impl Default for Config {
fn default() -> Config {
Config {
client_id: SshId::Standard(Cow::Borrowed(concat!(
"SSH-2.0-",
env!("CARGO_PKG_NAME"),
"_",
env!("CARGO_PKG_VERSION")
))),
limits: Limits::default(),
window_size: 2097152,
maximum_packet_size: 32768,
channel_buffer_size: 100,
preferred: Default::default(),
inactivity_timeout: None,
keepalive_interval: None,
keepalive_max: 3,
anonymous: false,
gex: Default::default(),
nodelay: false,
}
}
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait Handler: Sized + Send {
type Error: From<crate::Error> + Send + core::fmt::Debug;
#[allow(unused_variables)]
fn auth_banner(
&mut self,
banner: &str,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn check_server_key(
&mut self,
server_public_key: &ssh_key::PublicKey,
) -> impl Future<Output = Result<bool, Self::Error>> + Send {
async { Ok(false) }
}
#[allow(unused_variables)]
fn kex_done(
&mut self,
shared_secret: Option<&[u8]>,
names: &negotiation::Names,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn channel_open_confirmation(
&mut self,
id: ChannelId,
max_packet_size: u32,
window_size: u32,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn channel_success(
&mut self,
channel: ChannelId,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn channel_failure(
&mut self,
channel: ChannelId,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn channel_close(
&mut self,
channel: ChannelId,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn channel_eof(
&mut self,
channel: ChannelId,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn channel_open_failure(
&mut self,
channel: ChannelId,
reason: ChannelOpenFailure,
description: &str,
language: &str,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_forwarded_tcpip(
&mut self,
channel: Channel<Msg>,
connected_address: &str,
connected_port: u32,
originator_address: &str,
originator_port: u32,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_forwarded_streamlocal(
&mut self,
channel: Channel<Msg>,
socket_path: &str,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_agent_forward(
&mut self,
channel: Channel<Msg>,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn should_accept_unknown_server_channel(
&mut self,
id: ChannelId,
channel_type: &str,
) -> impl Future<Output = bool> + Send {
async { false }
}
#[allow(unused_variables)]
fn server_channel_open_unknown(
&mut self,
channel: Channel<Msg>,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_session(
&mut self,
channel: Channel<Msg>,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_direct_tcpip(
&mut self,
channel: Channel<Msg>,
host_to_connect: &str,
port_to_connect: u32,
originator_address: &str,
originator_port: u32,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_direct_streamlocal(
&mut self,
channel: Channel<Msg>,
socket_path: &str,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn server_channel_open_x11(
&mut self,
channel: Channel<Msg>,
originator_address: &str,
originator_port: u32,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn data(
&mut self,
channel: ChannelId,
data: &[u8],
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn extended_data(
&mut self,
channel: ChannelId,
ext: u32,
data: &[u8],
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn xon_xoff(
&mut self,
channel: ChannelId,
client_can_do: bool,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn exit_status(
&mut self,
channel: ChannelId,
exit_status: u32,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn exit_signal(
&mut self,
channel: ChannelId,
signal_name: Sig,
core_dumped: bool,
error_message: &str,
lang_tag: &str,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn window_adjusted(
&mut self,
channel: ChannelId,
new_size: u32,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn adjust_window(&mut self, channel: ChannelId, window: u32) -> u32 {
window
}
#[allow(unused_variables)]
fn openssh_ext_host_keys_announced(
&mut self,
keys: Vec<PublicKey>,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async move {
debug!("openssh_ext_hostkeys_announced: {keys:?}");
Ok(())
}
}
#[allow(unused_variables)]
fn disconnected(
&mut self,
reason: DisconnectReason<Self::Error>,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async {
debug!("disconnected: {reason:?}");
match reason {
DisconnectReason::ReceivedDisconnect(_) => Ok(()),
DisconnectReason::Error(e) => Err(e),
}
}
}
}