nym_task/
connections.rs

1// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::channel::mpsc;
5use std::collections::HashMap;
6
7// const LANE_CONSIDERED_CLEAR: usize = 10;
8
9pub type ConnectionId = u64;
10
11#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
12pub enum TransmissionLane {
13    General,
14    // we need to treat surb-related requests and responses at higher priority
15    // so that the rest of underlying communication could actually continue
16    ReplySurbRequest,
17    AdditionalReplySurbs,
18    Retransmission,
19    ConnectionId(ConnectionId),
20}
21
22/// Used by the connection controller to report current state for client connections.
23pub type ConnectionCommandSender = mpsc::UnboundedSender<ConnectionCommand>;
24pub type ConnectionCommandReceiver = mpsc::UnboundedReceiver<ConnectionCommand>;
25
26pub enum ConnectionCommand {
27    // Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
28    // transmission lanes.
29    Close(ConnectionId),
30}
31
32// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
33// if needed.
34#[derive(Clone, Debug)]
35pub struct LaneQueueLengths(std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>);
36
37impl LaneQueueLengths {
38    pub fn new() -> Self {
39        LaneQueueLengths(std::sync::Arc::new(std::sync::Mutex::new(
40            LaneQueueLengthsInner {
41                map: HashMap::new(),
42            },
43        )))
44    }
45
46    pub fn set(&mut self, lane: &TransmissionLane, lane_length: Option<usize>) {
47        match self.0.lock() {
48            Ok(mut inner) => {
49                if let Some(length) = lane_length {
50                    inner
51                        .map
52                        .entry(*lane)
53                        .and_modify(|e| *e = length)
54                        .or_insert(length);
55                } else {
56                    inner.map.remove(lane);
57                }
58            }
59            Err(err) => log::warn!("Failed to set lane queue length: {err}"),
60        }
61    }
62
63    pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
64        match self.0.lock() {
65            Ok(inner) => inner.get(lane),
66            Err(err) => {
67                log::warn!("Failed to get lane queue length: {err}");
68                None
69            }
70        }
71    }
72
73    pub fn total(&self) -> usize {
74        match self.0.lock() {
75            Ok(inner) => inner.values().sum(),
76            Err(err) => {
77                log::warn!("Failed to get total queue length: {err}");
78                0
79            }
80        }
81    }
82
83    // pub async fn wait_until_clear(&self, lane: &TransmissionLane, timeout: Option<Duration>) {
84    //     let total_time_waited = Instant::now();
85    //     loop {
86    //         let lane_length = self.get(lane).unwrap_or_default();
87    //         if lane_length < LANE_CONSIDERED_CLEAR {
88    //             break;
89    //         }
90    //         if timeout.is_some_and(|timeout| total_time_waited.elapsed() > timeout) {
91    //             log::warn!("Timeout reached while waiting for queue to clear");
92    //             break;
93    //         }
94    //         log::trace!("Waiting for queue to clear ({lane_length} items left)");
95    //         tokio::time::sleep(Duration::from_millis(100)).await;
96    //     }
97    // }
98}
99
100impl Default for LaneQueueLengths {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl std::ops::Deref for LaneQueueLengths {
107    type Target = std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>;
108
109    fn deref(&self) -> &Self::Target {
110        &self.0
111    }
112}
113
114#[derive(Debug)]
115pub struct LaneQueueLengthsInner {
116    pub map: HashMap<TransmissionLane, usize>,
117}
118
119impl LaneQueueLengthsInner {
120    pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
121        self.map.get(lane).copied()
122    }
123
124    pub fn values(&self) -> impl Iterator<Item = &usize> {
125        self.map.values()
126    }
127
128    pub fn modify<F>(&mut self, lane: &TransmissionLane, f: F)
129    where
130        F: FnOnce(&mut usize),
131    {
132        self.map.entry(*lane).and_modify(f);
133    }
134
135    pub fn total(&self) -> usize {
136        self.map.values().sum()
137    }
138}