torrust_tracker/servers/udp/server/
request_buffer.rs

1use ringbuf::traits::{Consumer, Observer, Producer};
2use ringbuf::StaticRb;
3use tokio::task::AbortHandle;
4
5use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
6
7/// A ring buffer for managing active UDP request abort handles.
8///
9/// The `ActiveRequests` struct maintains a fixed-size ring buffer of abort
10/// handles for UDP request processor tasks. It ensures that at most 50 requests
11/// are handled concurrently, and provides mechanisms to handle buffer overflow
12/// by removing finished or oldest unfinished tasks.
13#[derive(Default)]
14pub struct ActiveRequests {
15    rb: StaticRb<AbortHandle, 50>, // The number of requests handled simultaneously.
16}
17
18impl std::fmt::Debug for ActiveRequests {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        let (left, right) = &self.rb.as_slices();
21        let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity());
22        f.debug_struct("ActiveRequests").field("rb", &dbg).finish()
23    }
24}
25
26impl Drop for ActiveRequests {
27    fn drop(&mut self) {
28        for h in self.rb.pop_iter() {
29            if !h.is_finished() {
30                h.abort();
31            }
32        }
33    }
34}
35
36impl ActiveRequests {
37    /// Inserts an abort handle for a UDP request processor task.
38    ///
39    /// If the buffer is full, this method attempts to make space by:
40    ///
41    /// 1. Removing finished tasks.
42    /// 2. Removing the oldest unfinished task if no finished tasks are found.
43    ///
44    /// # Panics
45    ///
46    /// This method will panic if it cannot make space for adding a new handle.
47    ///
48    /// # Arguments
49    ///
50    /// * `abort_handle` - The `AbortHandle` for the UDP request processor task.
51    /// * `local_addr` - A string slice representing the local address for logging.
52    pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) {
53        // Attempt to add the new handle to the buffer.
54        match self.rb.try_push(new_task) {
55            Ok(()) => {
56                // Successfully added the task, no further action needed.
57            }
58            Err(new_task) => {
59                // Buffer is full, attempt to make space.
60
61                let mut finished: u64 = 0;
62                let mut unfinished_task = None;
63
64                for old_task in self.rb.pop_iter() {
65                    // We found a finished tasks ... increase the counter and
66                    // continue searching for more and ...
67                    if old_task.is_finished() {
68                        finished += 1;
69                        continue;
70                    }
71
72                    // The current removed tasks is not finished.
73
74                    // Give it a second chance to finish.
75                    tokio::task::yield_now().await;
76
77                    // Recheck if it finished ... increase the counter and
78                    // continue searching for more and ...
79                    if old_task.is_finished() {
80                        finished += 1;
81                        continue;
82                    }
83
84                    // At this point we found a "definitive" unfinished task.
85
86                    // Log unfinished task.
87                    tracing::debug!(
88                        target: UDP_TRACKER_LOG_TARGET,
89                        local_addr,
90                        removed_count = finished,
91                        "Udp::run_udp_server::loop (got unfinished task)"
92                    );
93
94                    // If no finished tasks were found, abort the current
95                    // unfinished task.
96                    if finished == 0 {
97                        // We make place aborting this task.
98                        old_task.abort();
99
100                        tracing::warn!(
101                            target: UDP_TRACKER_LOG_TARGET,
102                            local_addr,
103                            "Udp::run_udp_server::loop aborting request: (no finished tasks)"
104                        );
105
106                        break;
107                    }
108
109                    // At this point we found at least one finished task, but the
110                    // current one is not finished and it was removed from the
111                    // buffer, so we need to re-insert in in the buffer.
112
113                    // Save the unfinished task for re-entry.
114                    unfinished_task = Some(old_task);
115                }
116
117                // After this point there can't be a race condition because only
118                // one thread owns the active buffer. There is no way for the
119                // buffer to be full again. That means the "expects" should
120                // never happen.
121
122                // Reinsert the unfinished task if any.
123                if let Some(h) = unfinished_task {
124                    self.rb.try_push(h).expect("it was previously inserted");
125                }
126
127                // Insert the new task.
128                //
129                // Notice that space has already been made for this new task in
130                // the buffer. One or many old task have already been finished
131                // or yielded, freeing space in the buffer. Or a single
132                // unfinished task has been aborted to make space for this new
133                // task.
134                if !new_task.is_finished() {
135                    self.rb.try_push(new_task).expect("it should have space for this new task.");
136                }
137            }
138        };
139    }
140}