use crate::ServerError;
use crate::server::chancomms::{SwitchboardReceiver, SwitchboardSender};
use crate::server::controlchan;
use crate::server::ftpserver::listen_prebound::PreboundListener;
use crate::server::proxy_protocol::{ProxyHeaderReceived, spawn_proxy_header_parsing};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::{Receiver, Sender, channel};
use unftp_core::auth::UserDetail;
use unftp_core::storage::StorageBackend;
impl<Storage, User> PreboundListener<Storage, User>
where
Storage: StorageBackend<User> + 'static,
User: UserDetail + 'static,
{
pub async fn listen_proxy_protocol(mut self) -> std::result::Result<(), ServerError> {
let listener = tokio::net::TcpListener::bind(self.bind_address).await?;
let (switchboard_msg_tx, mut switchboard_msg_rx): (SwitchboardSender<Storage, User>, SwitchboardReceiver<Storage, User>) = channel(1);
let (proxy_msg_tx, mut proxy_msg_rx): (Sender<ProxyHeaderReceived>, Receiver<ProxyHeaderReceived>) = channel(1);
loop {
tokio::select! {
Ok((tcp_stream, _socket_addr)) = listener.accept() => {
let socket_addr = tcp_stream.peer_addr();
slog::info!(self.logger, "Incoming proxy connection from {:?}", socket_addr);
spawn_proxy_header_parsing(self.logger.clone(), tcp_stream, proxy_msg_tx.clone());
},
Some(msg) = proxy_msg_rx.recv() => match msg {
ProxyHeaderReceived (connection, mut tcp_stream) => {
let socket_addr = tcp_stream.peer_addr();
let destination_port = connection.destination.port();
if Some(destination_port) == self.external_control_port {
slog::info!(self.logger, "Incoming control connection: {:?} ({:?})(control port: {:?})", connection, socket_addr, self.external_control_port);
let params: controlchan::LoopConfig<Storage,User> = (&self.options).into();
let result = controlchan::spawn_loop::<Storage,User>(params, tcp_stream, Some(connection), Some(switchboard_msg_tx.clone()), self.shutdown_topic.subscribe().await, self.failed_logins.clone()).await;
if let Err(e) = result {
slog::warn!(self.logger, "Could not spawn control channel loop for connection: {:?}", e);
}
} else {
slog::info!(self.logger, "Incoming data connection: {:?} ({:?}) (range: {:?})", connection, socket_addr, self.options.passive_ports);
if !self.options.passive_ports.contains(&destination_port) {
slog::warn!(self.logger, "Incoming proxy connection going to unconfigured port! This port is not configured as a passive listening port: port {} not in passive port range {:?}", destination_port, self.options.passive_ports);
tcp_stream.shutdown().await?;
continue;
}
self.dispatch_data_connection(tcp_stream, connection).await;
}
},
},
Some(msg) = switchboard_msg_rx.recv() => {
self.handle_switchboard_message(msg).await;
},
}
}
}
}