use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use channels::WindowSizeRef;
use log::debug;
use negotiation::parse_kex_algo_list;
use russh_keys::helpers::NameList;
use russh_keys::map_err;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver};
use tokio::sync::oneshot;
use super::*;
use crate::channels::{Channel, ChannelMsg, ChannelRef};
use crate::kex::EXTENSION_SUPPORT_AS_CLIENT;
use crate::msg;
/// State for one server-side connection; consumed and driven to completion by `Session::run`.
#[derive(Debug)]
pub struct Session {
/// Connection state held in `CommonSession` (this file reads its cipher, write buffer,
/// config, encrypted state and keepalive counters).
pub(crate) common: CommonSession<Arc<Config>>,
/// Handle returned (cloned) by `Session::handle`.
pub(crate) sender: Handle,
/// Incoming `Msg` queue polled by `run` whenever the session is not rekeying.
pub(crate) receiver: Receiver<Msg>,
/// NOTE(review): not referenced in the visible part of this file; meaning to be confirmed.
pub(crate) target_window_size: u32,
/// NOTE(review): not referenced in the visible part of this file; meaning to be confirmed.
pub(crate) pending_reads: Vec<CryptoVec>,
/// NOTE(review): not referenced in the visible part of this file; presumably the total
/// size of `pending_reads` — confirm.
pub(crate) pending_len: u32,
/// `ChannelRef`s registered by `run` for channels opened through a `Handle`, keyed by id.
pub(crate) channels: HashMap<ChannelId, ChannelRef>,
/// Queue of expected replies to global requests this side has sent
/// (keepalive, tcpip-forward, cancel-tcpip-forward), in sending order.
pub(crate) open_global_requests: VecDeque<GlobalRequestResponse>,
}
/// Messages sent through a `Handle` and processed by the `Session::run` loop.
#[derive(Debug)]
pub enum Msg {
/// Open an agent channel; `run` registers `channel_ref` under the new channel id.
ChannelOpenAgent {
channel_ref: ChannelRef,
},
/// Open a session channel; `run` registers `channel_ref` under the new channel id.
ChannelOpenSession {
channel_ref: ChannelRef,
},
/// Open a `direct-tcpip` channel with the given target and originator.
ChannelOpenDirectTcpIp {
host_to_connect: String,
port_to_connect: u32,
originator_address: String,
originator_port: u32,
channel_ref: ChannelRef,
},
/// Open a `forwarded-tcpip` channel with the given connected address and originator.
ChannelOpenForwardedTcpIp {
connected_address: String,
connected_port: u32,
originator_address: String,
originator_port: u32,
channel_ref: ChannelRef,
},
/// Open a `forwarded-streamlocal@openssh.com` channel for the given socket path.
ChannelOpenForwardedStreamLocal {
server_socket_path: String,
channel_ref: ChannelRef,
},
/// Open an `x11` channel with the given originator.
ChannelOpenX11 {
originator_address: String,
originator_port: u32,
channel_ref: ChannelRef,
},
/// Send a `tcpip-forward` global request; when `reply_channel` is set, a reply is
/// requested and the sender is queued in `open_global_requests`.
TcpIpForward {
reply_channel: Option<oneshot::Sender<Option<u32>>>,
address: String,
port: u32,
},
/// Send a `cancel-tcpip-forward` global request; `reply_channel` works as for `TcpIpForward`.
CancelTcpIpForward {
reply_channel: Option<oneshot::Sender<bool>>,
address: String,
port: u32,
},
/// Disconnect the session with the given reason, description and language tag.
Disconnect {
reason: crate::Disconnect,
description: String,
language_tag: String,
},
/// A message scoped to one channel. `run` handles only a subset of `ChannelMsg`
/// variants and panics (`unimplemented!`) on the others.
Channel(ChannelId, ChannelMsg),
}
impl From<(ChannelId, ChannelMsg)> for Msg {
fn from((id, msg): (ChannelId, ChannelMsg)) -> Self {
Msg::Channel(id, msg)
}
}
/// Cloneable handle used to submit `Msg` values for processing; every method on it
/// works by sending a message through `sender`.
#[derive(Clone, Debug)]
pub struct Handle {
/// Sending half of the `Msg` queue.
pub(crate) sender: Sender<Msg>,
}
impl Handle {
/// Submits `data` for channel `id`.
///
/// # Errors
/// If the message cannot be sent, the undelivered buffer is handed back as the error value.
pub async fn data(&self, id: ChannelId, data: CryptoVec) -> Result<(), CryptoVec> {
    let outgoing = Msg::Channel(id, ChannelMsg::Data { data });
    match self.sender.send(outgoing).await {
        Ok(()) => Ok(()),
        // The send error carries the rejected message; unwrap it to recover the payload.
        Err(rejected) => match rejected.0 {
            Msg::Channel(_, ChannelMsg::Data { data }) => Err(data),
            _ => unreachable!(),
        },
    }
}
/// Submits extended data (`ext` is the extended data type code) for channel `id`.
///
/// # Errors
/// If the message cannot be sent, the undelivered buffer is handed back as the error value.
pub async fn extended_data(
    &self,
    id: ChannelId,
    ext: u32,
    data: CryptoVec,
) -> Result<(), CryptoVec> {
    let outgoing = Msg::Channel(id, ChannelMsg::ExtendedData { ext, data });
    match self.sender.send(outgoing).await {
        Ok(()) => Ok(()),
        // The send error carries the rejected message; unwrap it to recover the payload.
        Err(rejected) => match rejected.0 {
            Msg::Channel(_, ChannelMsg::ExtendedData { data, .. }) => Err(data),
            _ => unreachable!(),
        },
    }
}
/// Submits `ChannelMsg::Eof` for channel `id`; returns `Err(())` if it cannot be sent.
pub async fn eof(&self, id: ChannelId) -> Result<(), ()> {
    let outgoing = Msg::Channel(id, ChannelMsg::Eof);
    match self.sender.send(outgoing).await {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
/// Submits `ChannelMsg::Success` for channel `id`; returns `Err(())` if it cannot be sent.
pub async fn channel_success(&self, id: ChannelId) -> Result<(), ()> {
    let outgoing = Msg::Channel(id, ChannelMsg::Success);
    match self.sender.send(outgoing).await {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
/// Submits `ChannelMsg::Failure` for channel `id`; returns `Err(())` if it cannot be sent.
pub async fn channel_failure(&self, id: ChannelId) -> Result<(), ()> {
    let outgoing = Msg::Channel(id, ChannelMsg::Failure);
    match self.sender.send(outgoing).await {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
/// Submits `ChannelMsg::Close` for channel `id`; returns `Err(())` if it cannot be sent.
pub async fn close(&self, id: ChannelId) -> Result<(), ()> {
    let outgoing = Msg::Channel(id, ChannelMsg::Close);
    match self.sender.send(outgoing).await {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
/// Submits an xon/xoff message carrying `client_can_do` for channel `id`;
/// returns `Err(())` if it cannot be sent.
pub async fn xon_xoff_request(&self, id: ChannelId, client_can_do: bool) -> Result<(), ()> {
    let request = ChannelMsg::XonXoff { client_can_do };
    if self.sender.send(Msg::Channel(id, request)).await.is_err() {
        return Err(());
    }
    Ok(())
}
/// Submits an exit-status message carrying `exit_status` for channel `id`;
/// returns `Err(())` if it cannot be sent.
pub async fn exit_status_request(&self, id: ChannelId, exit_status: u32) -> Result<(), ()> {
    let request = ChannelMsg::ExitStatus { exit_status };
    if self.sender.send(Msg::Channel(id, request)).await.is_err() {
        return Err(());
    }
    Ok(())
}
/// Submits a TCP/IP forwarding request for `address`:`port` and waits for the answer.
///
/// Returns the port delivered on the reply channel. Returns `Err(())` if the
/// request cannot be sent, if the reply is `None`, or if the reply channel is
/// dropped before answering (the latter is also logged).
pub async fn forward_tcpip(&self, address: String, port: u32) -> Result<u32, ()> {
    let (reply_send, reply_recv) = oneshot::channel();
    let request = Msg::TcpIpForward {
        reply_channel: Some(reply_send),
        address,
        port,
    };
    if self.sender.send(request).await.is_err() {
        return Err(());
    }
    match reply_recv.await {
        Ok(answer) => answer.ok_or(()),
        Err(e) => {
            error!("Unable to receive TcpIpForward result: {e:?}");
            Err(())
        }
    }
}
/// Submits a request to cancel TCP/IP forwarding for `address`:`port` and waits for the answer.
///
/// Returns `Ok(())` only when the reply is `true`. Returns `Err(())` if the
/// request cannot be sent, if the reply is `false`, or if the reply channel is
/// dropped before answering (the latter is also logged).
pub async fn cancel_forward_tcpip(&self, address: String, port: u32) -> Result<(), ()> {
    let (reply_send, reply_recv) = oneshot::channel();
    let request = Msg::CancelTcpIpForward {
        reply_channel: Some(reply_send),
        address,
        port,
    };
    if self.sender.send(request).await.is_err() {
        return Err(());
    }
    match reply_recv.await {
        Ok(cancelled) => {
            if cancelled {
                Ok(())
            } else {
                Err(())
            }
        }
        Err(e) => {
            error!("Unable to receive CancelTcpIpForward result: {e:?}");
            Err(())
        }
    }
}
pub async fn channel_open_agent(&self) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = unbounded_channel();
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenAgent { channel_ref })
.await
.map_err(|_| Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_session(&self) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = unbounded_channel();
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenSession { channel_ref })
.await
.map_err(|_| Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_direct_tcpip<A: Into<String>, B: Into<String>>(
&self,
host_to_connect: A,
port_to_connect: u32,
originator_address: B,
originator_port: u32,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = unbounded_channel();
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenDirectTcpIp {
host_to_connect: host_to_connect.into(),
port_to_connect,
originator_address: originator_address.into(),
originator_port,
channel_ref,
})
.await
.map_err(|_| Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_forwarded_tcpip<A: Into<String>, B: Into<String>>(
&self,
connected_address: A,
connected_port: u32,
originator_address: B,
originator_port: u32,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = unbounded_channel();
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenForwardedTcpIp {
connected_address: connected_address.into(),
connected_port,
originator_address: originator_address.into(),
originator_port,
channel_ref,
})
.await
.map_err(|_| Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_forwarded_streamlocal<A: Into<String>>(
&self,
server_socket_path: A,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = unbounded_channel();
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenForwardedStreamLocal {
server_socket_path: server_socket_path.into(),
channel_ref,
})
.await
.map_err(|_| Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
pub async fn channel_open_x11<A: Into<String>>(
&self,
originator_address: A,
originator_port: u32,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = unbounded_channel();
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();
self.sender
.send(Msg::ChannelOpenX11 {
originator_address: originator_address.into(),
originator_port,
channel_ref,
})
.await
.map_err(|_| Error::SendError)?;
self.wait_channel_confirmation(receiver, window_size_ref)
.await
}
async fn wait_channel_confirmation(
&self,
mut receiver: UnboundedReceiver<ChannelMsg>,
window_size_ref: WindowSizeRef,
) -> Result<Channel<Msg>, Error> {
loop {
match receiver.recv().await {
Some(ChannelMsg::Open {
id,
max_packet_size,
window_size,
}) => {
window_size_ref.update(window_size).await;
return Ok(Channel {
id,
sender: self.sender.clone(),
receiver,
max_packet_size,
window_size: window_size_ref,
});
}
Some(ChannelMsg::OpenFailure(reason)) => {
return Err(Error::ChannelOpenFailure(reason))
}
None => {
return Err(Error::Disconnect);
}
msg => {
debug!("msg = {:?}", msg);
}
}
}
}
/// Submits an exit-signal message for channel `id`; returns `Err(())` if it cannot be sent.
pub async fn exit_signal_request(
    &self,
    id: ChannelId,
    signal_name: Sig,
    core_dumped: bool,
    error_message: String,
    lang_tag: String,
) -> Result<(), ()> {
    let exit_signal = ChannelMsg::ExitSignal {
        signal_name,
        core_dumped,
        error_message,
        lang_tag,
    };
    match self.sender.send(Msg::Channel(id, exit_signal)).await {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
pub async fn disconnect(
&self,
reason: Disconnect,
description: String,
language_tag: String,
) -> Result<(), Error> {
self.sender
.send(Msg::Disconnect {
reason,
description,
language_tag,
})
.await
.map_err(|_| Error::SendError)
}
}
impl Session {
pub(crate) fn is_rekeying(&self) -> bool {
if let Some(ref enc) = self.common.encrypted {
enc.rekey.is_some()
} else {
true
}
}
/// Drives the session until `self.common.disconnected` is set, the peer's traffic
/// ends the loop, or an error occurs.
///
/// Each loop iteration waits for whichever happens first: a pending read completing,
/// the keepalive timer, the inactivity timer, or (only while not rekeying) a `Msg`
/// from `self.receiver`. After the selected branch ran, pending output is flushed and
/// written to the stream and the timers are re-armed.
///
/// After the loop the write half is shut down and the read half is polled until a
/// read reports `n == 0`.
///
/// # Errors
/// Propagates handler, I/O, flush and packet-building errors, and returns
/// `Error::KeepaliveTimeout` / `Error::InactivityTimeout` when the respective timer trips.
///
/// # Panics
/// Panics through `unimplemented!` when a `Msg` arrives that no match arm handles.
pub(crate) async fn run<H, R>(
mut self,
mut stream: SshRead<R>,
mut handler: H,
) -> Result<(), H::Error>
where
H: Handler + Send + 'static,
R: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
// Write out whatever is already buffered before entering the main loop.
self.flush()?;
map_err!(stream.write_all(&self.common.write_buffer.buffer).await)?;
self.common.write_buffer.buffer.clear();
let (stream_read, mut stream_write) = stream.split();
let buffer = SSHBuffer::new();
// Take the session's opening key (leaving a cleartext placeholder behind) so that it
// can be moved into the read future.
let mut opening_cipher = Box::new(clear::Key) as Box<dyn OpeningKey + Send>;
std::mem::swap(&mut opening_cipher, &mut self.common.cipher.remote_to_local);
// NOTE(review): `future_or_pending` presumably never completes when the configured
// duration is `None` — confirm against its definition.
let keepalive_timer =
future_or_pending(self.common.config.keepalive_interval, tokio::time::sleep);
pin!(keepalive_timer);
let inactivity_timer =
future_or_pending(self.common.config.inactivity_timeout, tokio::time::sleep);
pin!(inactivity_timer);
let reading = start_reading(stream_read, buffer, opening_cipher);
pin!(reading);
// Holds the read half, buffer and key whenever the loop is left from the read branch,
// so that the drain loop at the end can restart reading.
let mut is_reading = None;
let mut decomp = CryptoVec::new();
#[allow(clippy::panic)] while !self.common.disconnected {
self.common.received_data = false;
let mut sent_keepalive = false;
tokio::select! {
// A read completed.
r = &mut reading => {
let (stream_read, mut buffer, mut opening_cipher) = match r {
Ok((_, stream_read, buffer, opening_cipher)) => (stream_read, buffer, opening_cipher),
Err(e) => return Err(e.into())
};
// NOTE(review): fewer than 5 bytes — presumably too short for a packet header
// (confirm); leave the main loop.
if buffer.buffer.len() < 5 {
is_reading = Some((stream_read, buffer, opening_cipher));
break
}
// The payload starts after the first 5 bytes; decompress it when encrypted state exists.
#[allow(clippy::indexing_slicing)] let buf = if let Some(ref mut enc) = self.common.encrypted {
let d = enc.decompress.decompress(
&buffer.buffer[5..],
&mut decomp,
);
if let Ok(buf) = d {
buf
} else {
// Decompression failed: log it and leave the main loop.
debug!("err = {:?}", d);
is_reading = Some((stream_read, buffer, opening_cipher));
break
}
} else {
&buffer.buffer[5..]
};
if !buf.is_empty() {
// A DISCONNECT message from the peer ends the main loop.
#[allow(clippy::indexing_slicing)] if buf[0] == crate::msg::DISCONNECT {
debug!("break");
is_reading = Some((stream_read, buffer, opening_cipher));
break;
} else {
self.common.received_data = true;
// Put the opening key back into the session while `reply` runs, then take it out
// again for the next read.
std::mem::swap(&mut opening_cipher, &mut self.common.cipher.remote_to_local);
match reply(&mut self, &mut handler, &mut buffer.seqn, buf).await {
Ok(_) => {},
Err(e) => return Err(e),
}
std::mem::swap(&mut opening_cipher, &mut self.common.cipher.remote_to_local);
}
}
// Start the next read.
reading.set(start_reading(stream_read, buffer, opening_cipher));
}
// Keepalive timer fired.
() = &mut keepalive_timer => {
// `keepalive_max == 0` disables the limit; otherwise give up once more keepalives
// than `keepalive_max` went by without data being received.
if self.common.config.keepalive_max != 0 && self.common.alive_timeouts > self.common.config.keepalive_max {
debug!("Timeout, client not responding to keepalives");
return Err(crate::Error::KeepaliveTimeout.into());
}
self.common.alive_timeouts = self.common.alive_timeouts.saturating_add(1);
sent_keepalive = true;
self.keepalive_request()?;
}
// Inactivity timer fired.
() = &mut inactivity_timer => {
debug!("timeout");
return Err(crate::Error::InactivityTimeout.into());
}
// Messages from handles are only taken while no rekey is in progress.
msg = self.receiver.recv(), if !self.is_rekeying() => {
match msg {
Some(Msg::Channel(id, ChannelMsg::Data { data })) => {
self.data(id, data)?;
}
Some(Msg::Channel(id, ChannelMsg::ExtendedData { ext, data })) => {
self.extended_data(id, ext, data)?;
}
Some(Msg::Channel(id, ChannelMsg::Eof)) => {
self.eof(id)?;
}
Some(Msg::Channel(id, ChannelMsg::Close)) => {
self.close(id)?;
}
Some(Msg::Channel(id, ChannelMsg::Success)) => {
self.channel_success(id)?;
}
Some(Msg::Channel(id, ChannelMsg::Failure)) => {
self.channel_failure(id)?;
}
Some(Msg::Channel(id, ChannelMsg::XonXoff { client_can_do })) => {
self.xon_xoff_request(id, client_can_do)?;
}
Some(Msg::Channel(id, ChannelMsg::ExitStatus { exit_status })) => {
self.exit_status_request(id, exit_status)?;
}
Some(Msg::Channel(id, ChannelMsg::ExitSignal { signal_name, core_dumped, error_message, lang_tag })) => {
self.exit_signal_request(id, signal_name, core_dumped, &error_message, &lang_tag)?;
}
Some(Msg::Channel(id, ChannelMsg::WindowAdjusted { new_size })) => {
debug!("window adjusted to {:?} for channel {:?}", new_size, id);
}
// Channel-open requests: build the open packet, then remember the caller's
// `ChannelRef` under the freshly allocated channel id.
Some(Msg::ChannelOpenAgent { channel_ref }) => {
let id = self.channel_open_agent()?;
self.channels.insert(id, channel_ref);
}
Some(Msg::ChannelOpenSession { channel_ref }) => {
let id = self.channel_open_session()?;
self.channels.insert(id, channel_ref);
}
Some(Msg::ChannelOpenDirectTcpIp { host_to_connect, port_to_connect, originator_address, originator_port, channel_ref }) => {
let id = self.channel_open_direct_tcpip(&host_to_connect, port_to_connect, &originator_address, originator_port)?;
self.channels.insert(id, channel_ref);
}
Some(Msg::ChannelOpenForwardedTcpIp { connected_address, connected_port, originator_address, originator_port, channel_ref }) => {
let id = self.channel_open_forwarded_tcpip(&connected_address, connected_port, &originator_address, originator_port)?;
self.channels.insert(id, channel_ref);
}
Some(Msg::ChannelOpenForwardedStreamLocal { server_socket_path, channel_ref }) => {
let id = self.channel_open_forwarded_streamlocal(&server_socket_path)?;
self.channels.insert(id, channel_ref);
}
Some(Msg::ChannelOpenX11 { originator_address, originator_port, channel_ref }) => {
let id = self.channel_open_x11(&originator_address, originator_port)?;
self.channels.insert(id, channel_ref);
}
Some(Msg::TcpIpForward { address, port, reply_channel }) => {
self.tcpip_forward(&address, port, reply_channel)?;
}
Some(Msg::CancelTcpIpForward { address, port, reply_channel }) => {
self.cancel_tcpip_forward(&address, port, reply_channel)?;
}
Some(Msg::Disconnect {reason, description, language_tag}) => {
self.common.disconnect(reason, &description, &language_tag)?;
}
// NOTE(review): any other message (e.g. a `ChannelMsg` variant not listed above)
// panics here.
Some(_) => {
unimplemented!("unimplemented (client-only?) message: {:?}", msg)
}
None => {
debug!("self.receiver: received None");
}
}
}
}
// Write out everything the selected branch queued.
self.flush()?;
map_err!(
stream_write
.write_all(&self.common.write_buffer.buffer)
.await
)?;
self.common.write_buffer.buffer.clear();
// Receiving data resets the unanswered-keepalive counter.
if self.common.received_data {
self.common.alive_timeouts = 0;
}
// Re-arm the keepalive timer after receiving data or sending a keepalive
// (only when an interval is configured).
if self.common.received_data || sent_keepalive {
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
keepalive_timer.as_mut().as_pin_mut(),
self.common.config.keepalive_interval,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
}
// Re-arm the inactivity timer on every iteration that did not send a keepalive.
if !sent_keepalive {
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
inactivity_timer.as_mut().as_pin_mut(),
self.common.config.inactivity_timeout,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
}
}
debug!("disconnected");
map_err!(stream_write.shutdown().await)?;
// Keep reading until a read reports `n == 0` (presumably end of stream — confirm
// against `start_reading`).
loop {
if let Some((stream_read, buffer, opening_cipher)) = is_reading.take() {
reading.set(start_reading(stream_read, buffer, opening_cipher));
}
let (n, r, b, opening_cipher) = (&mut reading).await?;
is_reading = Some((r, b, opening_cipher));
if n == 0 {
break;
}
}
Ok(())
}
pub fn handle(&self) -> Handle {
self.sender.clone()
}
pub fn writable_packet_size(&self, channel: &ChannelId) -> u32 {
if let Some(ref enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(channel) {
return channel
.sender_window_size
.min(channel.sender_maximum_packet_size);
}
}
0
}
pub fn window_size(&self, channel: &ChannelId) -> u32 {
if let Some(ref enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(channel) {
return channel.sender_window_size;
}
}
0
}
pub fn max_packet_size(&self, channel: &ChannelId) -> u32 {
if let Some(ref enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(channel) {
return channel.sender_maximum_packet_size;
}
}
0
}
/// Flushes the encrypted state's pending output into the session write buffer and,
/// when that flush returns `true` while no rekey is in progress, starts a rekey.
///
/// Does nothing when there is no encrypted state.
pub fn flush(&mut self) -> Result<(), Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    // The flush always runs first; its result is only then combined with the rekey state.
    let flush_result = enc.flush(
        &self.common.config.as_ref().limits,
        &mut *self.common.cipher.local_to_remote,
        &mut self.common.write_buffer,
    )?;
    if !flush_result || enc.rekey.is_some() {
        return Ok(());
    }
    debug!("starting rekeying");
    if let Some(exchange) = enc.exchange.take() {
        let mut kexinit = KexInit::initiate_rekey(exchange, &enc.session_id);
        kexinit.server_write(
            self.common.config.as_ref(),
            &mut *self.common.cipher.local_to_remote,
            &mut self.common.write_buffer,
        )?;
        enc.rekey = Some(Kex::Init(kexinit));
    }
    Ok(())
}
pub fn flush_pending(&mut self, channel: ChannelId) -> Result<usize, Error> {
if let Some(ref mut enc) = self.common.encrypted {
enc.flush_pending(channel)
} else {
Ok(0)
}
}
pub fn sender_window_size(&self, channel: ChannelId) -> usize {
if let Some(ref enc) = self.common.encrypted {
enc.sender_window_size(channel)
} else {
0
}
}
pub fn has_pending_data(&self, channel: ChannelId) -> bool {
if let Some(ref enc) = self.common.encrypted {
enc.has_pending_data(channel)
} else {
false
}
}
/// Returns the configuration this session was created with.
pub fn config(&self) -> &Config {
    self.common.config.as_ref()
}
/// Delegates to `CommonSession::disconnect` with the given reason, description and
/// language tag.
pub fn disconnect(
    &mut self,
    reason: Disconnect,
    description: &str,
    language_tag: &str,
) -> Result<(), Error> {
    let common = &mut self.common;
    common.disconnect(reason, description, language_tag)
}
/// Queues a `REQUEST_SUCCESS` packet, but only if a reply is currently wanted
/// (`wants_reply`) and encrypted state exists; the flag is cleared when the packet is queued.
pub fn request_success(&mut self) {
    if !self.common.wants_reply {
        return;
    }
    if let Some(ref mut enc) = self.common.encrypted {
        self.common.wants_reply = false;
        push_packet!(enc.write, enc.write.push(msg::REQUEST_SUCCESS))
    }
}
/// Queues a `REQUEST_FAILURE` packet, but only if a reply is currently wanted
/// (`wants_reply`) and encrypted state exists; the flag is cleared when the packet is queued.
///
/// Like `request_success`, this is a no-op when the peer did not ask for a reply:
/// a global request sent with `want reply` = false must not be answered, and an
/// unsolicited failure could be mistaken for the reply to a different request.
pub fn request_failure(&mut self) {
    if !self.common.wants_reply {
        return;
    }
    if let Some(ref mut enc) = self.common.encrypted {
        self.common.wants_reply = false;
        push_packet!(enc.write, enc.write.push(msg::REQUEST_FAILURE))
    }
}
/// Queues a `CHANNEL_SUCCESS` packet for `channel` if that channel has `wants_reply`
/// set, clearing the flag. Does nothing when there is no encrypted state or no such channel.
///
/// # Panics
/// Panics if the channel exists but is not confirmed.
pub fn channel_success(&mut self, channel: ChannelId) -> Result<(), crate::Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let Some(params) = enc.channels.get_mut(&channel) else {
        return Ok(());
    };
    assert!(params.confirmed);
    if !params.wants_reply {
        return Ok(());
    }
    params.wants_reply = false;
    debug!("channel_success {:?}", params);
    push_packet!(enc.write, {
        msg::CHANNEL_SUCCESS.encode(&mut enc.write)?;
        params.recipient_channel.encode(&mut enc.write)?;
    });
    Ok(())
}
/// Queues a `CHANNEL_FAILURE` packet for `channel` if that channel has `wants_reply`
/// set, clearing the flag. Does nothing when there is no encrypted state or no such channel.
///
/// # Panics
/// Panics if the channel exists but is not confirmed.
pub fn channel_failure(&mut self, channel: ChannelId) -> Result<(), crate::Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let Some(params) = enc.channels.get_mut(&channel) else {
        return Ok(());
    };
    assert!(params.confirmed);
    if !params.wants_reply {
        return Ok(());
    }
    params.wants_reply = false;
    push_packet!(enc.write, {
        enc.write.push(msg::CHANNEL_FAILURE);
        params.recipient_channel.encode(&mut enc.write)?;
    });
    Ok(())
}
/// Queues a `CHANNEL_OPEN_FAILURE` packet carrying the channel id, the reason code,
/// a description and a language tag. Does nothing when there is no encrypted state.
pub fn channel_open_failure(
    &mut self,
    channel: ChannelId,
    reason: ChannelOpenFailure,
    description: &str,
    language: &str,
) -> Result<(), crate::Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let reason_code = reason as u32;
    push_packet!(enc.write, {
        enc.write.push(msg::CHANNEL_OPEN_FAILURE);
        channel.encode(&mut enc.write)?;
        reason_code.encode(&mut enc.write)?;
        description.encode(&mut enc.write)?;
        language.encode(&mut enc.write)?;
    });
    Ok(())
}
/// Delegates to `CommonSession::byte` with the `CHANNEL_CLOSE` message type for `channel`.
pub fn close(&mut self, channel: ChannelId) -> Result<(), Error> {
    let message_type = msg::CHANNEL_CLOSE;
    self.common.byte(channel, message_type)
}
/// Delegates to `CommonSession::byte` with the `CHANNEL_EOF` message type for `channel`.
pub fn eof(&mut self, channel: ChannelId) -> Result<(), Error> {
    let message_type = msg::CHANNEL_EOF;
    self.common.byte(channel, message_type)
}
pub fn data(&mut self, channel: ChannelId, data: CryptoVec) -> Result<(), Error> {
if let Some(ref mut enc) = self.common.encrypted {
enc.data(channel, data)
} else {
unreachable!()
}
}
pub fn extended_data(
&mut self,
channel: ChannelId,
extended: u32,
data: CryptoVec,
) -> Result<(), Error> {
if let Some(ref mut enc) = self.common.encrypted {
enc.extended_data(channel, extended, data)
} else {
unreachable!()
}
}
/// Queues an `xon-xoff` channel request (no reply wanted) carrying `client_can_do`.
/// Does nothing when there is no encrypted state or no such channel.
///
/// # Panics
/// Panics if the channel exists but is not confirmed.
pub fn xon_xoff_request(
    &mut self,
    channel: ChannelId,
    client_can_do: bool,
) -> Result<(), Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let Some(params) = enc.channels.get(&channel) else {
        return Ok(());
    };
    assert!(params.confirmed);
    push_packet!(enc.write, {
        msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
        params.recipient_channel.encode(&mut enc.write)?;
        "xon-xoff".encode(&mut enc.write)?;
        // want_reply = false
        0u8.encode(&mut enc.write)?;
        u8::from(client_can_do).encode(&mut enc.write)?;
    });
    Ok(())
}
/// Queues a `keepalive@openssh.com` global request with a reply wanted and records
/// the expected reply in `open_global_requests`. Does nothing when there is no
/// encrypted state.
pub fn keepalive_request(&mut self) -> Result<(), Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let want_reply = u8::from(true);
    self.open_global_requests
        .push_back(GlobalRequestResponse::Keepalive);
    push_packet!(enc.write, {
        msg::GLOBAL_REQUEST.encode(&mut enc.write)?;
        "keepalive@openssh.com".encode(&mut enc.write)?;
        want_reply.encode(&mut enc.write)?;
    });
    Ok(())
}
/// Queues an `exit-status` channel request (no reply wanted) carrying `exit_status`.
/// Does nothing when there is no encrypted state or no such channel.
///
/// # Panics
/// Panics if the channel exists but is not confirmed.
pub fn exit_status_request(
    &mut self,
    channel: ChannelId,
    exit_status: u32,
) -> Result<(), Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let Some(params) = enc.channels.get(&channel) else {
        return Ok(());
    };
    assert!(params.confirmed);
    push_packet!(enc.write, {
        msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
        params.recipient_channel.encode(&mut enc.write)?;
        "exit-status".encode(&mut enc.write)?;
        // want_reply = false
        0u8.encode(&mut enc.write)?;
        exit_status.encode(&mut enc.write)?;
    });
    Ok(())
}
/// Queues an `exit-signal` channel request (no reply wanted) carrying the signal name,
/// the core-dumped flag, an error message and a language tag.
/// Does nothing when there is no encrypted state or no such channel.
///
/// # Panics
/// Panics if the channel exists but is not confirmed.
pub fn exit_signal_request(
    &mut self,
    channel: ChannelId,
    signal: Sig,
    core_dumped: bool,
    error_message: &str,
    language_tag: &str,
) -> Result<(), Error> {
    let Some(ref mut enc) = self.common.encrypted else {
        return Ok(());
    };
    let Some(params) = enc.channels.get(&channel) else {
        return Ok(());
    };
    assert!(params.confirmed);
    push_packet!(enc.write, {
        msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
        params.recipient_channel.encode(&mut enc.write)?;
        "exit-signal".encode(&mut enc.write)?;
        // want_reply = false
        0u8.encode(&mut enc.write)?;
        signal.name().encode(&mut enc.write)?;
        u8::from(core_dumped).encode(&mut enc.write)?;
        error_message.encode(&mut enc.write)?;
        language_tag.encode(&mut enc.write)?;
    });
    Ok(())
}
pub fn channel_open_session(&mut self) -> Result<ChannelId, Error> {
self.channel_open_generic(b"session", |_| Ok(()))
}
/// Queues a `direct-tcpip` channel-open request and returns the new channel's id.
///
/// The type-specific payload is, in order: host to connect, port to connect,
/// originator address, originator port.
pub fn channel_open_direct_tcpip(
    &mut self,
    host_to_connect: &str,
    port_to_connect: u32,
    originator_address: &str,
    originator_port: u32,
) -> Result<ChannelId, Error> {
    const KIND: &[u8] = b"direct-tcpip";
    self.channel_open_generic(KIND, |payload| {
        host_to_connect.encode(payload)?;
        port_to_connect.encode(payload)?;
        originator_address.encode(payload)?;
        originator_port.encode(payload)?;
        Ok(())
    })
}
/// Queues a `forwarded-tcpip` channel-open request and returns the new channel's id.
///
/// The type-specific payload is, in order: connected address, connected port,
/// originator address, originator port.
pub fn channel_open_forwarded_tcpip(
    &mut self,
    connected_address: &str,
    connected_port: u32,
    originator_address: &str,
    originator_port: u32,
) -> Result<ChannelId, Error> {
    const KIND: &[u8] = b"forwarded-tcpip";
    self.channel_open_generic(KIND, |payload| {
        connected_address.encode(payload)?;
        connected_port.encode(payload)?;
        originator_address.encode(payload)?;
        originator_port.encode(payload)?;
        Ok(())
    })
}
/// Queues a `forwarded-streamlocal@openssh.com` channel-open request and returns the
/// new channel's id.
///
/// The type-specific payload is the socket path followed by an empty string.
pub fn channel_open_forwarded_streamlocal(
    &mut self,
    socket_path: &str,
) -> Result<ChannelId, Error> {
    const KIND: &[u8] = b"forwarded-streamlocal@openssh.com";
    self.channel_open_generic(KIND, |payload| {
        socket_path.encode(payload)?;
        "".encode(payload)?;
        Ok(())
    })
}
/// Queues an `x11` channel-open request and returns the new channel's id.
///
/// The type-specific payload is the originator address followed by the originator port.
pub fn channel_open_x11(
    &mut self,
    originator_address: &str,
    originator_port: u32,
) -> Result<ChannelId, Error> {
    const KIND: &[u8] = b"x11";
    self.channel_open_generic(KIND, |payload| {
        originator_address.encode(payload)?;
        originator_port.encode(payload)?;
        Ok(())
    })
}
pub fn channel_open_agent(&mut self) -> Result<ChannelId, Error> {
self.channel_open_generic(b"auth-agent@openssh.com", |_| Ok(()))
}
fn channel_open_generic<F>(&mut self, kind: &[u8], write_suffix: F) -> Result<ChannelId, Error>
where
F: FnOnce(&mut CryptoVec) -> Result<(), Error>,
{
let result = if let Some(ref mut enc) = self.common.encrypted {
if !matches!(
enc.state,
EncryptedState::Authenticated | EncryptedState::InitCompression
) {
return Err(Error::Inconsistent);
}
let sender_channel = enc.new_channel(
self.common.config.window_size,
self.common.config.maximum_packet_size,
);
push_packet!(enc.write, {
enc.write.push(msg::CHANNEL_OPEN);
kind.encode(&mut enc.write)?;
sender_channel.encode(&mut enc.write)?;
self.common
.config
.as_ref()
.window_size
.encode(&mut enc.write)?;
self.common
.config
.as_ref()
.maximum_packet_size
.encode(&mut enc.write)?;
write_suffix(&mut enc.write)?;
});
sender_channel
} else {
return Err(Error::Inconsistent);
};
Ok(result)
}
pub fn tcpip_forward(
&mut self,
address: &str,
port: u32,
reply_channel: Option<oneshot::Sender<Option<u32>>>,
) -> Result<(), Error> {
if let Some(ref mut enc) = self.common.encrypted {
let want_reply = reply_channel.is_some();
if let Some(reply_channel) = reply_channel {
self.open_global_requests.push_back(
crate::session::GlobalRequestResponse::TcpIpForward(reply_channel),
);
}
push_packet!(enc.write, {
enc.write.push(msg::GLOBAL_REQUEST);
"tcpip-forward".encode(&mut enc.write)?;
(want_reply as u8).encode(&mut enc.write)?;
address.encode(&mut enc.write)?;
port.encode(&mut enc.write)?;
});
}
Ok(())
}
pub fn cancel_tcpip_forward(
&mut self,
address: &str,
port: u32,
reply_channel: Option<oneshot::Sender<bool>>,
) -> Result<(), Error> {
if let Some(ref mut enc) = self.common.encrypted {
let want_reply = reply_channel.is_some();
if let Some(reply_channel) = reply_channel {
self.open_global_requests.push_back(
crate::session::GlobalRequestResponse::CancelTcpIpForward(reply_channel),
);
}
push_packet!(enc.write, {
msg::GLOBAL_REQUEST.encode(&mut enc.write)?;
"cancel-tcpip-forward".encode(&mut enc.write)?;
(want_reply as u8).encode(&mut enc.write)?;
address.encode(&mut enc.write)?;
port.encode(&mut enc.write)?;
});
}
Ok(())
}
/// Returns the stored `remote_sshid` bytes of the peer.
pub fn remote_sshid(&self) -> &[u8] {
    let sshid = &self.common.remote_sshid;
    sshid
}
pub(crate) fn maybe_send_ext_info(&mut self) -> Result<(), Error> {
if let Some(ref mut enc) = self.common.encrypted {
let mut key_extension_client = false;
if let Some(e) = &enc.exchange {
let Some(mut r) = &e.client_kex_init.as_ref().get(17..) else {
return Ok(());
};
if let Ok(kex_string) = String::decode(&mut r) {
use super::negotiation::Select;
key_extension_client = super::negotiation::Server::select(
&[EXTENSION_SUPPORT_AS_CLIENT],
&parse_kex_algo_list(&kex_string),
AlgorithmKind::Kex,
)
.is_ok();
}
}
if !key_extension_client {
debug!("RFC 8308 Extension Negotiation not supported by client");
return Ok(());
}
push_packet!(enc.write, {
msg::EXT_INFO.encode(&mut enc.write)?;
1u32.encode(&mut enc.write)?;
"server-sig-algs".encode(&mut enc.write)?;
NameList(
self.common
.config
.preferred
.key
.iter()
.map(|x| x.to_string())
.collect(),
)
.encode(&mut enc.write)?;
});
}
Ok(())
}
}