use crate::utils;
use crate::Client;
use std::collections::{HashMap, LinkedList};
use tokio::task::JoinHandle as Handle;
pub(crate) struct AppFut {
pub data: HashMap<u64, Handle<()>>,
pub index: LinkedList<(u64, f64)>,
}
impl AppFut {
pub(crate) fn new() -> Self {
Self {
data: HashMap::new(),
index: LinkedList::new(),
}
}
pub(crate) async fn direct_join(&mut self, mut ids: Vec<u64>) {
let mut raw_results = Vec::with_capacity(ids.len());
ids.iter().for_each(|id| {
if let Some(item) = self.data.remove(id) {
raw_results.push(item);
}
});
let mut item_cached = LinkedList::new();
while let Some(item) = self.index.pop_front() {
if ids.contains(&item.0) {
ids.retain(|&en| en != item.0);
} else {
item_cached.push_back(item);
}
if ids.is_empty() {
break;
}
}
item_cached.append(&mut self.index);
self.index = item_cached;
Client::join_all(raw_results).await;
}
pub(crate) fn cancell(&mut self, gap: f64, capacity: usize) {
let idels = self.get_idel(gap, capacity);
if !idels.is_empty() {
log::info!(
"cancelling {} / {} for Response.",
idels.len(),
self.index.len() + idels.len(),
);
idels.into_iter().for_each(|idel| (&idel.2).abort());
}
}
pub(crate) async fn all(&mut self, gap: f64, capacity: usize) {
let idels = self.get_idel(gap, capacity);
if !idels.is_empty() {
log::info!(
"cancelling {} / {} for Response.",
idels.len(),
self.index.len() + idels.len(),
);
let tasks = idels
.into_iter()
.map(|idel| idel.2)
.collect::<Vec<Handle<()>>>();
Client::join_all(tasks).await;
}
}
pub(crate) fn insert(&mut self, item: Handle<()>, hash: u64, stamp: f64) {
self.data.insert(hash, item);
let now = utils::now();
self.index.push_back((hash, stamp));
assert!(self.index.front().unwrap_or(&(0, 0.0)).1 < now);
}
pub(crate) fn get_idel(&mut self, gap: f64, capacity: usize) -> Vec<(u64, f64, Handle<()>)> {
let now = utils::now();
let mut items = Vec::with_capacity(capacity);
while let Some(item) = self.index.pop_front() {
if item.1 + gap < now && items.len() < capacity {
let ele = (item.0, item.1, self.data.remove(&item.0).unwrap());
items.push(ele);
} else {
self.index.push_front(item);
break;
}
}
if !items.is_empty() {
log::debug!("Availible response to parse: {}", items.len());
}
items
}
}