torrust_tracker/servers/udp/server/request_buffer.rs
1use ringbuf::traits::{Consumer, Observer, Producer};
2use ringbuf::StaticRb;
3use tokio::task::AbortHandle;
4
5use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
6
7/// A ring buffer for managing active UDP request abort handles.
8///
9/// The `ActiveRequests` struct maintains a fixed-size ring buffer of abort
10/// handles for UDP request processor tasks. It ensures that at most 50 requests
11/// are handled concurrently, and provides mechanisms to handle buffer overflow
12/// by removing finished or oldest unfinished tasks.
13#[derive(Default)]
14pub struct ActiveRequests {
15 rb: StaticRb<AbortHandle, 50>, // The number of requests handled simultaneously.
16}
17
18impl std::fmt::Debug for ActiveRequests {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 let (left, right) = &self.rb.as_slices();
21 let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity());
22 f.debug_struct("ActiveRequests").field("rb", &dbg).finish()
23 }
24}
25
26impl Drop for ActiveRequests {
27 fn drop(&mut self) {
28 for h in self.rb.pop_iter() {
29 if !h.is_finished() {
30 h.abort();
31 }
32 }
33 }
34}
35
36impl ActiveRequests {
37 /// Inserts an abort handle for a UDP request processor task.
38 ///
39 /// If the buffer is full, this method attempts to make space by:
40 ///
41 /// 1. Removing finished tasks.
42 /// 2. Removing the oldest unfinished task if no finished tasks are found.
43 ///
44 /// # Panics
45 ///
46 /// This method will panic if it cannot make space for adding a new handle.
47 ///
48 /// # Arguments
49 ///
50 /// * `abort_handle` - The `AbortHandle` for the UDP request processor task.
51 /// * `local_addr` - A string slice representing the local address for logging.
52 pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) {
53 // Attempt to add the new handle to the buffer.
54 match self.rb.try_push(new_task) {
55 Ok(()) => {
56 // Successfully added the task, no further action needed.
57 }
58 Err(new_task) => {
59 // Buffer is full, attempt to make space.
60
61 let mut finished: u64 = 0;
62 let mut unfinished_task = None;
63
64 for old_task in self.rb.pop_iter() {
65 // We found a finished tasks ... increase the counter and
66 // continue searching for more and ...
67 if old_task.is_finished() {
68 finished += 1;
69 continue;
70 }
71
72 // The current removed tasks is not finished.
73
74 // Give it a second chance to finish.
75 tokio::task::yield_now().await;
76
77 // Recheck if it finished ... increase the counter and
78 // continue searching for more and ...
79 if old_task.is_finished() {
80 finished += 1;
81 continue;
82 }
83
84 // At this point we found a "definitive" unfinished task.
85
86 // Log unfinished task.
87 tracing::debug!(
88 target: UDP_TRACKER_LOG_TARGET,
89 local_addr,
90 removed_count = finished,
91 "Udp::run_udp_server::loop (got unfinished task)"
92 );
93
94 // If no finished tasks were found, abort the current
95 // unfinished task.
96 if finished == 0 {
97 // We make place aborting this task.
98 old_task.abort();
99
100 tracing::warn!(
101 target: UDP_TRACKER_LOG_TARGET,
102 local_addr,
103 "Udp::run_udp_server::loop aborting request: (no finished tasks)"
104 );
105
106 break;
107 }
108
109 // At this point we found at least one finished task, but the
110 // current one is not finished and it was removed from the
111 // buffer, so we need to re-insert in in the buffer.
112
113 // Save the unfinished task for re-entry.
114 unfinished_task = Some(old_task);
115 }
116
117 // After this point there can't be a race condition because only
118 // one thread owns the active buffer. There is no way for the
119 // buffer to be full again. That means the "expects" should
120 // never happen.
121
122 // Reinsert the unfinished task if any.
123 if let Some(h) = unfinished_task {
124 self.rb.try_push(h).expect("it was previously inserted");
125 }
126
127 // Insert the new task.
128 //
129 // Notice that space has already been made for this new task in
130 // the buffer. One or many old task have already been finished
131 // or yielded, freeing space in the buffer. Or a single
132 // unfinished task has been aborted to make space for this new
133 // task.
134 if !new_task.is_finished() {
135 self.rb.try_push(new_task).expect("it should have space for this new task.");
136 }
137 }
138 };
139 }
140}