use crate::{
auth::UserDetail,
server::{
ControlChanMsg,
chancomms::DataChanCmd,
controlchan::{
Reply, ReplyCode,
error::ControlChanError,
handler::{CommandContext, CommandHandler},
},
datachan,
session::SharedSession,
},
storage::{Metadata, StorageBackend},
};
use async_trait::async_trait;
use std::io;
use std::net::{Ipv4Addr, SocketAddrV4};
use tokio::net::TcpStream;
use tokio::sync::mpsc::{Receiver, Sender, channel};
/// Command handler for the FTP `PORT` command (active mode).
///
/// When handled, the stored argument is parsed into an IPv4 socket address,
/// a TCP connection is opened to that address, and data channel processing
/// is spawned on the resulting stream.
#[derive(Debug)]
pub struct Port {
/// Raw command argument: six comma-separated decimal byte values. The first
/// four are the IPv4 octets, the last two are the high and low byte of the
/// port number.
addr: String,
}
impl Port {
    /// Builds a handler around the raw, still unparsed command argument.
    pub fn new(addr: String) -> Self {
        Self { addr }
    }

    /// Installs fresh channels on the session.
    ///
    /// Two channels of capacity 1 are created, one carrying [`DataChanCmd`]
    /// values and one carrying `()` abort signals. Both halves of each are
    /// stored on the locked session, together with `control_loop_tx`.
    #[tracing_attributes::instrument]
    async fn setup_inter_loop_comms<S, U>(&self, session: SharedSession<S, U>, control_loop_tx: Sender<ControlChanMsg>)
    where
        U: UserDetail + 'static,
        S: StorageBackend<U> + 'static,
        S::Metadata: Metadata,
    {
        let (command_sender, command_receiver): (Sender<DataChanCmd>, Receiver<DataChanCmd>) = channel(1);
        let (abort_sender, abort_receiver): (Sender<()>, Receiver<()>) = channel(1);

        // All fields are written under a single lock acquisition.
        let mut locked = session.lock().await;
        locked.control_msg_tx = Some(control_loop_tx);
        locked.data_cmd_tx = Some(command_sender);
        locked.data_cmd_rx = Some(command_receiver);
        locked.data_abort_tx = Some(abort_sender);
        locked.data_abort_rx = Some(abort_receiver);
    }
}
/// Parses a `PORT` argument of the form `h1,h2,h3,h4,p1,p2` into a socket address.
///
/// Returns `None` unless the argument consists of exactly six comma-separated
/// fields that each parse as a `u8`. A single malformed field rejects the whole
/// argument; fields are never skipped.
fn parse_port_argument(arg: &str) -> Option<SocketAddrV4> {
    // Collecting into a `Result` stops at the first field that is not a valid byte.
    let fields = arg.split(',').map(|field| field.parse::<u8>()).collect::<Result<Vec<u8>, _>>().ok()?;
    match fields.as_slice() {
        // p1 is the high byte and p2 the low byte of the port number.
        &[h1, h2, h3, h4, p1, p2] => Some(SocketAddrV4::new(Ipv4Addr::new(h1, h2, h3, h4), u16::from_be_bytes([p1, p2]))),
        _ => None,
    }
}

#[async_trait]
impl<Storage, User> CommandHandler<Storage, User> for Port
where
    User: UserDetail + 'static,
    Storage: StorageBackend<User> + 'static,
    Storage::Metadata: Metadata,
{
    /// Handles the `PORT` command by connecting to the address given by the client.
    ///
    /// Replies with:
    /// - `ParameterSyntaxError` when the argument is not six comma-separated byte values,
    /// - `CantOpenDataConnection` when the TCP connection cannot be established,
    /// - `CommandOkay` once the session channels are installed and data channel
    ///   processing has been spawned on the connected stream.
    ///
    /// This implementation never returns `Err`; failures are reported as replies.
    #[tracing_attributes::instrument]
    async fn handle(&self, args: CommandContext<Storage, User>) -> Result<Reply, ControlChanError> {
        let CommandContext {
            logger,
            tx_control_chan: tx,
            session,
            ..
        } = args;

        let addr = match parse_port_argument(&self.addr) {
            Some(addr) => addr,
            None => return Ok(Reply::new(ReplyCode::ParameterSyntaxError, "Invalid address format")),
        };

        // NOTE(review): the target address is taken from the client as-is and is not
        // compared against the control connection's peer address here. Confirm that
        // callers restrict it elsewhere if connecting to third-party hosts is unwanted.
        let connected: io::Result<TcpStream> = TcpStream::connect(addr).await;
        let stream = match connected {
            Ok(stream) => stream,
            Err(e) => {
                slog::error!(logger, "Could not connect to client for active mode: {}", e);
                return Ok(Reply::new(ReplyCode::CantOpenDataConnection, "Could not establish data connection"));
            }
        };

        // The session channels are installed before data channel processing is spawned.
        self.setup_inter_loop_comms(session.clone(), tx).await;
        datachan::spawn_processing(logger, session, stream).await;
        Ok(Reply::new(ReplyCode::CommandOkay, "PORT command successful"))
    }
}