use crate::ServerError;
use crate::server::chancomms::{SwitchboardReceiver, SwitchboardSender};
use crate::server::controlchan;
use crate::server::ftpserver::error::ListenerError;
use crate::server::ftpserver::listen_prebound::PreboundListener;
use crate::server::switchboard::SocketAddrPair;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{Sender, channel};
use unftp_core::auth::UserDetail;
use unftp_core::storage::StorageBackend;
fn spawn_data_acceptors(listeners: Vec<TcpListener>, tx: Sender<Result<(TcpStream, SocketAddrPair), ServerError>>) {
for listener in listeners.into_iter() {
let tx = tx.clone();
tokio::spawn(async move {
let destination = match listener.local_addr() {
Ok(a) => a,
Err(e) => {
let _ = tx.send(Err(e.into())).await;
return;
}
};
loop {
match listener.accept().await {
Ok((stream, source)) => {
let msg = (stream, SocketAddrPair { source, destination });
if tx.send(Ok(msg)).await.is_err() {
break;
}
}
Err(e) => {
if tx.send(Err(e.into())).await.is_err() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
}
});
}
}
impl<Storage, User> PreboundListener<Storage, User>
where
Storage: StorageBackend<User> + 'static,
User: UserDetail + 'static,
{
pub async fn listen_pooled(mut self) -> std::result::Result<(), ServerError> {
let control_listener = tokio::net::TcpListener::bind(self.bind_address).await?;
let mut passive_listeners: Vec<tokio::net::TcpListener> = Vec::new();
let listener_ip = control_listener.local_addr()?.ip();
for port in self.options.passive_ports.clone() {
let addr = SocketAddr::new(listener_ip, port);
passive_listeners.push(tokio::net::TcpListener::bind(addr).await?);
}
let (data_tx, mut data_rx) = channel::<Result<(TcpStream, SocketAddrPair), ServerError>>(128);
spawn_data_acceptors(passive_listeners, data_tx);
let (switchboard_msg_tx, mut switchboard_msg_rx): (SwitchboardSender<Storage, User>, SwitchboardReceiver<Storage, User>) = channel(1);
loop {
tokio::select! {
Ok((tcp_stream, socket_addr)) = control_listener.accept() => {
slog::info!(self.logger, "Incoming control connection from {:?}", socket_addr);
let params: controlchan::LoopConfig<Storage,User> = (&self.options).into();
let conn = SocketAddrPair { source: socket_addr, destination: self.bind_address };
let result = controlchan::spawn_loop::<Storage,User>(params, tcp_stream, Some(conn), Some(switchboard_msg_tx.clone()), self.shutdown_topic.subscribe().await, self.failed_logins.clone()).await;
if let Err(e) = result {
slog::warn!(self.logger, "Could not spawn control channel loop for connection: {:?}", e);
}
},
incoming = data_rx.recv() => {
match incoming {
Some(Ok((stream, addr_pair))) => {
self.dispatch_data_connection(stream, addr_pair).await;
}
Some(Err(e)) => {
slog::warn!(self.logger, "Could not accept data connection: {:?}", e)
}
None => {
slog::warn!(self.logger, "data acceptor channel closed");
let listener_err = ListenerError { msg: "Data acceptor listener channels closed unexpectedly".to_string() };
return Err(listener_err.into());
}
}
}
Some(msg) = switchboard_msg_rx.recv() => {
self.handle_switchboard_message(msg).await;
},
}
}
}
}