use super::SHUTDOWN_TIMEOUT;
use crate::available_reader::AvailableReader;
use crate::ordered_sender::OrderedMessageSender;
use crate::proxy_runner::KEEPALIVE_INTERVAL;
use futures::FutureExt;
use futures::StreamExt;
use log::*;
use nym_socks5_requests::{ConnectionId, SocketData};
use nym_task::connections::LaneQueueLengths;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
/// Waits for the transmission lane belonging to `connection_id` to drain completely.
///
/// Returns immediately when `lane_queue_lengths` is `None`. Otherwise the lane is polled
/// every 500ms (via `wait_for_lane` with a threshold of `0`) and the whole wait is capped
/// at four minutes; hitting that cap is only logged as a warning, not reported to the caller.
async fn wait_until_lane_empty(lane_queue_lengths: &Option<LaneQueueLengths>, connection_id: u64) {
    // Nothing to wait for when lane tracking is not available.
    let Some(queue_lengths) = lane_queue_lengths else {
        return;
    };

    let lane_drained = wait_for_lane(queue_lengths, connection_id, 0, Duration::from_millis(500));
    let outcome = tokio::time::timeout(Duration::from_secs(4 * 60), lane_drained).await;

    if outcome.is_err() {
        log::warn!("Wait until lane empty timed out");
    }
}
/// Waits for the transmission lane belonging to `connection_id` to shrink to a queue length
/// of at most 30.
///
/// A `None` for `lane_queue_lengths` makes this a no-op. Otherwise the lane is polled every
/// 100ms through `wait_for_lane`, for at most four minutes in total; running into that limit
/// is logged at debug level and otherwise ignored.
async fn wait_until_lane_almost_empty(
    lane_queue_lengths: &Option<LaneQueueLengths>,
    connection_id: u64,
) {
    const ALMOST_EMPTY_THRESHOLD: usize = 30;
    const POLL_PERIOD: Duration = Duration::from_millis(100);
    const MAX_WAIT: Duration = Duration::from_secs(4 * 60);

    match lane_queue_lengths {
        None => {}
        Some(queue_lengths) => {
            let lane_low = wait_for_lane(
                queue_lengths,
                connection_id,
                ALMOST_EMPTY_THRESHOLD,
                POLL_PERIOD,
            );
            let timed_out = tokio::time::timeout(MAX_WAIT, lane_low).await.is_err();
            if timed_out {
                log::debug!("Wait until lane almost empty timed out");
            }
        }
    }
}
/// Polls the queue length of the lane for `connection_id` until it is no longer above
/// `queue_length_threshold`.
///
/// Between checks the task sleeps for `sleep_duration`. The function also returns as soon as
/// `lane_queue_lengths` has no entry for the lane. There is no upper bound on the waiting
/// time here; callers that need one must wrap this future in a timeout.
async fn wait_for_lane(
    lane_queue_lengths: &LaneQueueLengths,
    connection_id: u64,
    queue_length_threshold: usize,
    sleep_duration: Duration,
) {
    loop {
        match lane_queue_lengths.get(&TransmissionLane::ConnectionId(connection_id)) {
            // Still too much queued up: back off and look again.
            Some(pending) if pending > queue_length_threshold => sleep(sleep_duration).await,
            // Either the queue is short enough or the lane is unknown.
            _ => return,
        }
    }
}
/// Drives the inbound half of a proxied connection: repeatedly reads from `reader` and hands
/// whatever was read to `message_sender`, until the connection is finished or shut down.
///
/// The loop terminates when one of the following becomes ready (checked in this priority
/// order, since the `select!` is `biased`):
/// 1. `SHUTDOWN_TIMEOUT` has elapsed after `shutdown_notify` was notified; an empty close
///    message is sent before leaving.
/// 2. `shutdown_listener` is cancelled.
/// 3. The processed data reported `is_done`, two further seconds have passed and
///    `wait_until_lane_empty` has returned.
///
/// While the loop runs, an empty keepalive is sent on every `keepalive_timer` tick; the timer
/// is reset each time data has been sent.
///
/// # Arguments
/// * `reader` - read half of the TCP stream to read from; handed back on return.
/// * `message_sender` - used to process and send read data, keepalives and the empty close.
/// * `connection_id` - identifies the lane to watch and is included in log output.
/// * `available_plaintext_per_mix_packet` - four times this value is passed to
///   `AvailableReader::new`.
/// * `shutdown_notify` - awaited to learn about a shutdown request and notified once on exit.
/// * `lane_queue_lengths` - when `None`, the lane-length waits are skipped.
/// * `shutdown_listener` - cancellation token that ends the loop without sending a close.
///
/// # Returns
/// The `reader` that was passed in.
pub(super) async fn run_inbound<F, S>(
mut reader: OwnedReadHalf,
mut message_sender: OrderedMessageSender<F, S>,
connection_id: ConnectionId,
available_plaintext_per_mix_packet: usize,
shutdown_notify: Arc<Notify>,
lane_queue_lengths: Option<LaneQueueLengths>,
shutdown_listener: ShutdownToken,
) -> OwnedReadHalf
where
F: Fn(SocketData) -> S + Send + 'static,
{
// NOTE(review): the second argument is presumably an upper bound on how much a single read
// may return (four mix packets' worth of plaintext) - confirm against `AvailableReader`.
let mut available_reader =
AvailableReader::new(&mut reader, Some(available_plaintext_per_mix_packet * 4));
// Resolves `SHUTDOWN_TIMEOUT` after `shutdown_notify` has been notified. Judging by the log
// message in the matching branch, that notification presumably comes from the outbound side.
let shutdown_future = shutdown_notify.notified().then(|_| sleep(SHUTDOWN_TIMEOUT));
tokio::pin!(shutdown_future);
// Note: per the tokio documentation the first tick of an interval completes immediately.
let mut keepalive_timer = tokio::time::interval(KEEPALIVE_INTERVAL);
// Armed from the read branch below once the processed data is flagged as done.
let closing_notify = Arc::new(Notify::new());
let closing_future = closing_notify
.notified()
.then(|_| {
// Fixed two second pause before starting to watch the lane.
sleep(Duration::from_secs(2))
})
// Then wait for this connection's lane to be reported empty (or for that wait to time out).
.then(|_| wait_until_lane_empty(&lane_queue_lengths, connection_id));
tokio::pin!(closing_future);
// Once set, the read branch is disabled so the reader is not polled again.
let mut we_are_closed = false;
loop {
select! {
// Poll the branches top to bottom rather than in random order, so the exit conditions
// take precedence over keepalives and reads.
biased;
_ = &mut shutdown_future => {
debug!(
"closing inbound proxy after outbound was closed {SHUTDOWN_TIMEOUT:?} ago"
);
message_sender.send_empty_close().await;
break;
}
// Cancellation: leave immediately, no close message is sent on this path.
_ = shutdown_listener.cancelled() => {
log::trace!("ProxyRunner inbound: Received shutdown");
break;
}
// NOTE(review): nothing is sent in this branch despite the log text; presumably the
// earlier `send_data` call with `is_done` set is what informs the remote - confirm.
_ = &mut closing_future => {
debug!(
target: &*format!("({connection_id}) socks5 inbound"),
"The local socket is closed - won't receive any more data. Informing remote about that..."
);
break;
}
_ = keepalive_timer.tick() => {
message_sender.send_empty_keepalive().await;
}
// Only read once `wait_until_lane_almost_empty` has returned. This future is created
// anew on every loop iteration and dropped if another branch completes first.
read_data = wait_until_lane_almost_empty(&lane_queue_lengths, connection_id)
.then(|_| available_reader.next()), if !we_are_closed =>
{
let processed = message_sender.process_data(read_data);
// Remember the flag before `processed` is moved into `send_data`.
let is_done = processed.is_done;
message_sender.send_data(processed).await;
if is_done {
// Start the closing sequence and stop reading.
closing_notify.notify_one();
we_are_closed = true;
}
// Data was just sent, so restart the keepalive period.
keepalive_timer.reset();
}
}
}
trace!("{connection_id} - inbound closed");
// Signal a waiter on `shutdown_notify` (presumably the outbound half - confirm) that inbound
// has finished.
shutdown_notify.notify_one();
reader
}