use ringbuf::traits::{Consumer, Observer, Producer};
use ringbuf::StaticRb;
use tokio::task::AbortHandle;
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
/// Bounded collection of abort handles for spawned request tasks.
///
/// Handles live in a fixed-capacity ring buffer with 50 slots. The `Default`
/// implementation yields an empty buffer.
#[derive(Default)]
pub struct ActiveRequests {
    /// Abort handles of the tracked tasks, oldest first.
    rb: StaticRb<AbortHandle, 50>,
}
impl std::fmt::Debug for ActiveRequests {
    /// Renders the ring buffer as a single `rb` field holding a summary
    /// string: the capacity followed by the two slices that make up the
    /// buffer's occupied region.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let capacity = self.rb.capacity();
        let (left, right) = self.rb.as_slices();
        let summary = format!("capacity: {capacity}, left: {left:?}, right: {right:?}");

        let mut out = f.debug_struct("ActiveRequests");
        out.field("rb", &summary);
        out.finish()
    }
}
impl Drop for ActiveRequests {
    /// Empties the buffer and aborts every task that has not finished yet,
    /// so no tracked task outlives this value.
    fn drop(&mut self) {
        self.rb
            .pop_iter()
            .filter(|handle| !handle.is_finished())
            .for_each(|handle| handle.abort());
    }
}
impl ActiveRequests {
    /// Inserts `new_task` into the buffer, making room for it if the buffer
    /// is full.
    ///
    /// When the buffer has a free slot the handle is simply stored. When it
    /// is full, the buffer is scanned from the oldest entry:
    ///
    /// * handles of finished tasks are discarded;
    /// * an unfinished task gets one `yield_now` as a chance to complete and
    ///   is then re-checked;
    /// * if the very first entry is still unfinished (no finished task has
    ///   been found yet), that task is aborted to free a single slot and the
    ///   scan stops;
    /// * otherwise every still-unfinished task is kept and re-inserted, in
    ///   its original order, once the scan is over.
    ///
    /// Finally `new_task` is stored, unless it has already finished.
    ///
    /// `local_addr` is only used as a field in the emitted log events.
    ///
    /// # Panics
    ///
    /// Panics if a handle cannot be re-inserted after the scan. That would
    /// be a broken invariant: the scan always frees at least one slot before
    /// anything is pushed back.
    pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) {
        match self.rb.try_push(new_task) {
            Ok(()) => {
                // There was a free slot; nothing else to do.
            }
            Err(new_task) => {
                // Buffer is full: try to make room.
                let mut finished: u64 = 0;
                // Every unfinished task removed during the scan. All of them
                // must go back into the buffer; keeping only one would leave
                // the others untracked (never aborted on drop and no longer
                // counted against the capacity).
                let mut unfinished_tasks: Vec<AbortHandle> = Vec::new();

                for old_task in self.rb.pop_iter() {
                    if old_task.is_finished() {
                        finished += 1;
                        continue;
                    }

                    // Give the task a second chance to finish.
                    tokio::task::yield_now().await;

                    if old_task.is_finished() {
                        finished += 1;
                        continue;
                    }

                    tracing::debug!(
                        target: UDP_TRACKER_LOG_TARGET,
                        local_addr,
                        removed_count = finished,
                        "Udp::run_udp_server::loop (got unfinished task)"
                    );

                    // Nothing has finished so far (this can only be the
                    // oldest entry): abort it to free exactly one slot.
                    if finished == 0 {
                        old_task.abort();
                        tracing::warn!(
                            target: UDP_TRACKER_LOG_TARGET,
                            local_addr,
                            "Udp::run_udp_server::loop aborting request: (no finished tasks)"
                        );
                        break;
                    }

                    // At least one finished task was already discarded, so
                    // this one can stay; remember it for re-insertion.
                    unfinished_tasks.push(old_task);
                }

                // Put the surviving tasks back in their original order. They
                // fit because at least one finished handle was discarded
                // before any task was saved here.
                for task in unfinished_tasks {
                    self.rb.try_push(task).expect("it was previously inserted");
                }

                // A slot is free: either finished handles were discarded or
                // the oldest task was aborted.
                if !new_task.is_finished() {
                    self.rb.try_push(new_task).expect("it should have space for this new task.");
                }
            }
        };
    }
}