use super::SHUTDOWN_TIMEOUT;
use crate::connection_controller::{ConnectionMessage, ConnectionReceiver};
use futures::FutureExt;
use futures::StreamExt;
use log::*;
use nym_socks5_requests::ConnectionId;
use nym_task::ShutdownToken;
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWriteExt;
use tokio::select;
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
/// How long `run_outbound` waits without receiving any message on the mix receiver
/// before it stops the outbound proxy: 300 seconds (5 minutes).
const MIX_TTL: Duration = Duration::from_secs(300);
async fn deal_with_message(
connection_message: ConnectionMessage,
writer: &mut OwnedWriteHalf,
local_destination_address: &str,
remote_source_address: &str,
connection_id: ConnectionId,
) -> bool {
debug!(
target: &*format!("({connection_id}) socks5 outbound"),
"[{} bytes]\t{} → remote → mixnet → local → {} Remote closed: {}",
connection_message.payload.len(),
remote_source_address,
local_destination_address,
connection_message.socket_closed
);
if let Err(err) = writer.write_all(&connection_message.payload).await {
error!(target: &*format!("({connection_id}) socks5 outbound"), "failed to write response back to the socket - {err}");
return true;
}
if connection_message.socket_closed {
debug!(target: &*format!("({connection_id}) socks5 outbound"),
"Remote socket got closed - closing the local socket too");
return true;
}
false
}
/// Runs the outbound half of the proxy: messages pulled from `mix_receiver` are
/// written to `writer` via `deal_with_message` until a stop condition is hit.
///
/// The loop ends on whichever of these happens first (checked in this order,
/// because the `select!` is `biased`):
/// 1. `shutdown_listener` is cancelled,
/// 2. `deal_with_message` returns `true`, or `mix_receiver` yields `None`,
/// 3. no message has been handled for `MIX_TTL`,
/// 4. `shutdown_notify` was notified and `SHUTDOWN_TIMEOUT` has elapsed since then
///    (presumably signalled by the inbound half, going by the log message).
///
/// The two address strings are only forwarded to `deal_with_message` for logging.
///
/// On exit, `shutdown_notify` is notified once and the writer and receiver are
/// handed back to the caller.
pub(super) async fn run_outbound(
    mut writer: OwnedWriteHalf,
    local_destination_address: String,
    remote_source_address: String,
    mut mix_receiver: ConnectionReceiver,
    connection_id: ConnectionId,
    shutdown_notify: Arc<Notify>,
    shutdown_listener: ShutdownToken,
) -> (OwnedWriteHalf, ConnectionReceiver) {
    // Resolves `SHUTDOWN_TIMEOUT` after `shutdown_notify` has been notified.
    let grace_after_notify = shutdown_notify
        .notified()
        .then(|_| sleep(SHUTDOWN_TIMEOUT));
    tokio::pin!(grace_after_notify);

    // Inactivity timer; pushed back every time a message is handled successfully.
    let mut idle_deadline = Box::pin(sleep(MIX_TTL));

    loop {
        select! {
            biased;
            _ = shutdown_listener.cancelled() => {
                trace!("ProxyRunner outbound: Received shutdown");
                break;
            }
            maybe_message = mix_receiver.next() => {
                match maybe_message {
                    Some(message) => {
                        let should_close = deal_with_message(
                            message,
                            &mut writer,
                            &local_destination_address,
                            &remote_source_address,
                            connection_id,
                        )
                        .await;
                        if should_close {
                            break;
                        }
                        idle_deadline.as_mut().reset(Instant::now() + MIX_TTL);
                    }
                    None => {
                        warn!("mix receiver is none so we already got removed somewhere. This isn't really a warning, but shouldn't happen to begin with, so please say if you see this message");
                        break;
                    }
                }
            }
            _ = &mut idle_deadline => {
                warn!("didn't get anything from the client on {connection_id} mixnet in {MIX_TTL:?}. Shutting down the proxy.");
                break;
            }
            _ = &mut grace_after_notify => {
                debug!("closing outbound proxy after inbound was closed {SHUTDOWN_TIMEOUT:?} ago");
                break;
            }
        }
    }

    trace!("{connection_id} - outbound closed");
    shutdown_notify.notify_one();

    (writer, mix_receiver)
}