use super::{
chancomms::{ControlChanMsg, DataChanMsg},
tls::FtpsConfig,
};
use crate::server::session::SharedSession;
use crate::{
auth::UserDetail,
storage::{Error, ErrorKind, Metadata, StorageBackend},
};
use crate::server::chancomms::DataChanCmd;
use std::{path::PathBuf, sync::Arc};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_rustls::TlsAcceptor;
/// Holds everything needed to carry out a single data channel command (RETR, STOR, LIST or NLST)
/// on an accepted data connection.
///
/// An instance is built by `spawn_processing` from the state of the session and is consumed by
/// `execute`.
#[derive(Debug)]
struct DataCommandExecutor<Storage, User>
where
Storage: StorageBackend<User>,
Storage::Metadata: Metadata,
User: UserDetail,
{
/// The user passed to the storage back-end calls. The command handlers unwrap the inner
/// `Option`, so it must be `Some` by the time a command is executed.
pub user: Arc<Option<User>>,
/// The TCP stream of the data connection. It is used as is or wrapped in TLS, depending on
/// `ftps_mode`.
pub socket: tokio::net::TcpStream,
/// Sender used to report the outcome of a data command to the control channel.
pub control_msg_tx: Sender<ControlChanMsg>,
/// The storage back-end that the commands operate on.
pub storage: Arc<Storage>,
/// The directory that command paths are joined onto.
pub cwd: PathBuf,
/// Passed to the storage back-end as the start position for `get_into` (RETR) and `put` (STOR).
pub start_pos: u64,
/// Decides whether the data stream is plain TCP (`FtpsConfig::Off`) or TLS (`FtpsConfig::On`).
pub ftps_mode: FtpsConfig,
/// Logger used for all messages emitted while handling the data command.
pub logger: slog::Logger,
/// Receiver for the data command to run. Wrapped in `Option` so `execute` can `take()` it out
/// of `self`.
pub data_cmd_rx: Option<Receiver<DataChanCmd>>,
/// Receiver for abort signals. Wrapped in `Option` so `execute` can `take()` it out of `self`.
pub data_abort_rx: Option<Receiver<()>>,
}
impl<Storage, User> DataCommandExecutor<Storage, User>
where
Storage: StorageBackend<User> + 'static,
Storage::Metadata: Metadata,
User: UserDetail + 'static,
{
async fn execute(mut self, session_arc: SharedSession<Storage, User>) {
let mut data_cmd_rx = self.data_cmd_rx.take().unwrap();
let mut data_abort_rx = self.data_abort_rx.take().unwrap();
let mut timeout_delay = Box::pin(tokio::time::sleep(std::time::Duration::from_secs(5 * 60)));
tokio::select! {
Some(command) = data_cmd_rx.recv() => {
self.handle_incoming(DataChanMsg::ExternalCommand(command)).await;
},
Some(_) = data_abort_rx.recv() => {
self.handle_incoming(DataChanMsg::Abort).await;
},
_ = &mut timeout_delay => {
slog::warn!(self.logger, "Data channel connection timed out");
}
};
let mut session = session_arc.lock().await;
session.data_busy = false;
}
#[tracing_attributes::instrument]
async fn handle_incoming(self, incoming: DataChanMsg) {
match incoming {
DataChanMsg::Abort => {
slog::info!(self.logger, "Data channel abort received");
}
DataChanMsg::ExternalCommand(command) => {
let p = command.path().unwrap_or_default();
slog::info!(self.logger, "Data channel command received: {:?}", command; "path" => p);
self.execute_command(command).await;
}
}
}
#[tracing_attributes::instrument]
async fn execute_command(self, cmd: DataChanCmd) {
match cmd {
DataChanCmd::Retr { path } => {
self.exec_retr(path).await;
}
DataChanCmd::Stor { path } => {
self.exec_stor(path).await;
}
DataChanCmd::List { path, .. } => {
self.exec_list(path).await;
}
DataChanCmd::Nlst { path } => {
self.exec_nlst(path).await;
}
}
}
#[tracing_attributes::instrument]
async fn exec_retr(self, path: String) {
let path_copy = path.clone();
let path = self.cwd.join(path);
let tx_sending: Sender<ControlChanMsg> = self.control_msg_tx.clone();
let tx_error: Sender<ControlChanMsg> = self.control_msg_tx.clone();
let mut output = Self::writer(self.socket, self.ftps_mode).await;
let get_result = self.storage.get_into((*self.user).as_ref().unwrap(), path, self.start_pos, &mut output).await;
match get_result {
Ok(bytes_copied) => {
if let Err(err) = output.shutdown().await {
slog::warn!(self.logger, "Could not shutdown output stream after RETR: {}", err);
}
if let Err(err) = tx_sending
.send(ControlChanMsg::SentData {
bytes: bytes_copied,
path: path_copy,
})
.await
{
slog::error!(self.logger, "Could not notify control channel of successful RETR: {}", err);
}
}
Err(err) => {
slog::warn!(self.logger, "Error copying streams during RETR: {}", err);
if let Err(err) = tx_error.send(ControlChanMsg::StorageError(err)).await {
slog::warn!(self.logger, "Could not notify control channel of error with RETR: {}", err);
}
}
}
}
#[tracing_attributes::instrument]
async fn exec_stor(self, path: String) {
let path_copy = path.clone();
let path = self.cwd.join(path);
let tx_ok = self.control_msg_tx.clone();
let tx_error = self.control_msg_tx.clone();
let put_result = self
.storage
.put(
(*self.user).as_ref().unwrap(),
Self::reader(self.socket, self.ftps_mode).await,
path,
self.start_pos,
)
.await;
match put_result {
Ok(bytes) => {
if let Err(err) = tx_ok.send(ControlChanMsg::WrittenData { bytes, path: path_copy }).await {
slog::error!(self.logger, "Could not notify control channel of successful STOR: {}", err);
}
}
Err(err) => {
if let Err(err) = tx_error.send(ControlChanMsg::StorageError(err)).await {
slog::error!(self.logger, "Could not notify control channel of error with STOR: {}", err);
}
}
}
}
#[tracing_attributes::instrument]
async fn exec_list(self, path: Option<String>) {
let path = match path {
Some(path) => {
if path == "." {
self.cwd.clone()
} else {
self.cwd.join(path)
}
}
None => self.cwd.clone(),
};
let tx_ok = self.control_msg_tx.clone();
let mut output = Self::writer(self.socket, self.ftps_mode).await;
let result = match self.storage.list_fmt((*self.user).as_ref().unwrap(), path).await {
Ok(cursor) => {
slog::debug!(self.logger, "Copying future for List");
let mut input = cursor;
match tokio::io::copy(&mut input, &mut output).await {
Ok(_) => Ok(ControlChanMsg::DirectorySuccessfullyListed),
Err(e) => Err(e),
}
}
Err(err) => {
slog::warn!(self.logger, "Failed to send directory list: {:?}", err);
match output.write_all(&format!("{}\r\n", err).into_bytes()).await {
Ok(_) => Ok(ControlChanMsg::DirectoryListFailure),
Err(e) => Err(e),
}
}
};
match result {
Ok(msg) => {
if let Err(err) = output.shutdown().await {
slog::warn!(self.logger, "Could not shutdown output stream during LIST: {}", err);
}
if let Err(err) = tx_ok.send(msg).await {
slog::error!(self.logger, "Could not notify control channel of LIST result: {}", err);
}
}
Err(err) => slog::warn!(self.logger, "Failed to send reply to LIST: {}", err),
}
}
#[tracing_attributes::instrument]
async fn exec_nlst(self, path: Option<String>) {
let path = match path {
Some(path) => self.cwd.join(path),
None => self.cwd.clone(),
};
let tx_ok = self.control_msg_tx.clone();
let tx_error = self.control_msg_tx.clone();
match self.storage.nlst((*self.user).as_ref().unwrap(), path).await {
Ok(mut input) => {
let mut output = Self::writer(self.socket, self.ftps_mode).await;
match tokio::io::copy(&mut input, &mut output).await {
Ok(_) => {
if let Err(err) = output.shutdown().await {
slog::warn!(self.logger, "Could not shutdown output stream during NLIST: {}", err);
}
if let Err(err) = tx_ok.send(ControlChanMsg::DirectorySuccessfullyListed).await {
slog::error!(self.logger, "Could not notify control channel of successful NLIST: {}", err);
}
}
Err(err) => slog::warn!(self.logger, "Could not copy from storage implementation during NLST: {}", err),
}
}
Err(e) => {
if let Err(err) = tx_error.send(ControlChanMsg::StorageError(Error::new(ErrorKind::LocalError, e))).await {
slog::warn!(self.logger, "Could not notify control channel of error with NLIST: {}", err);
}
}
}
}
#[tracing_attributes::instrument]
async fn writer(socket: tokio::net::TcpStream, ftps_mode: FtpsConfig) -> Box<dyn tokio::io::AsyncWrite + Send + Unpin + Sync> {
match ftps_mode {
FtpsConfig::Off => Box::new(socket) as Box<dyn tokio::io::AsyncWrite + Send + Unpin + Sync>,
FtpsConfig::Building { .. } => panic!("Illegal state"),
FtpsConfig::On { tls_config } => {
let io = async move {
let acceptor: TlsAcceptor = tls_config.into();
acceptor.accept(socket).await.unwrap()
}
.await;
Box::new(io) as Box<dyn tokio::io::AsyncWrite + Send + Unpin + Sync>
}
}
}
#[tracing_attributes::instrument]
async fn reader(socket: tokio::net::TcpStream, ftps_mode: FtpsConfig) -> Box<dyn tokio::io::AsyncRead + Send + Unpin + Sync> {
match ftps_mode {
FtpsConfig::Off => Box::new(socket) as Box<dyn tokio::io::AsyncRead + Send + Unpin + Sync>,
FtpsConfig::Building { .. } => panic!("Illegal state"),
FtpsConfig::On { tls_config } => {
let io = async move {
let acceptor: TlsAcceptor = tls_config.into();
acceptor.accept(socket).await.unwrap()
}
.await;
Box::new(io) as Box<dyn tokio::io::AsyncRead + Send + Unpin + Sync>
}
}
}
}
#[tracing_attributes::instrument]
pub async fn spawn_processing<Storage, User>(logger: slog::Logger, session_arc: SharedSession<Storage, User>, mut socket: tokio::net::TcpStream)
where
Storage: StorageBackend<User> + 'static,
Storage::Metadata: Metadata,
User: UserDetail + 'static,
{
let command_executor = {
let mut session = session_arc.lock().await;
match socket.peer_addr() {
Ok(datachan_addr) => {
let controlcahn_ip = session.source.ip();
if controlcahn_ip != datachan_addr.ip() {
if let Err(err) = socket.shutdown().await {
slog::error!(
logger,
"Couldn't close datachannel for IP ({}) that does not match the IP({}) of the control channel.\n{:?}",
datachan_addr.ip(),
controlcahn_ip,
err
)
} else {
slog::warn!(
logger,
"Closing datachannel for IP ({}) that does not match the IP({}) of the control channel.",
datachan_addr.ip(),
controlcahn_ip
)
}
return;
}
}
Err(err) => {
slog::error!(logger, "Couldn't determine data channel address.\n{:?}", err);
return;
}
}
let username = session.username.as_ref().cloned().unwrap_or_else(|| String::from("unknown"));
let logger = logger.new(slog::o!("username" => username));
let control_msg_tx: Sender<ControlChanMsg> = match session.control_msg_tx {
Some(ref tx) => tx.clone(),
None => {
slog::error!(logger, "Control loop message sender expected to be set up. Aborting data loop.");
return;
}
};
let data_cmd_rx = match session.data_cmd_rx.take() {
Some(rx) => rx,
None => {
slog::error!(logger, "Data loop command receiver expected to be set up. Aborting data loop.");
return;
}
};
let data_abort_rx = match session.data_abort_rx.take() {
Some(rx) => rx,
None => {
slog::error!(logger, "Data loop abort receiver expected to be set up. Aborting data loop.");
return;
}
};
let ftps_mode = if session.data_tls { session.ftps_config.clone() } else { FtpsConfig::Off };
let command_executor = DataCommandExecutor {
user: session.user.clone(),
socket,
control_msg_tx,
storage: Arc::clone(&session.storage),
cwd: session.cwd.clone(),
start_pos: session.start_pos,
ftps_mode,
logger,
data_abort_rx: Some(data_abort_rx),
data_cmd_rx: Some(data_cmd_rx),
};
session.data_busy = true;
command_executor
};
tokio::spawn(command_executor.execute(session_arc));
}