use crate::{mmr::Location, qmdb::sync::engine::IndexedFetchResult};
use commonware_cryptography::Digest;
use commonware_utils::channel::oneshot;
use futures::stream::FuturesUnordered;
use std::{
collections::{BTreeMap, HashMap},
future::Future,
pin::Pin,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct Id(u64);
pub(super) struct Requests<Op, D: Digest, E> {
#[allow(clippy::type_complexity)]
futures: FuturesUnordered<Pin<Box<dyn Future<Output = IndexedFetchResult<Op, D, E>> + Send>>>,
next_id: u64,
tracked: HashMap<Id, (Location, oneshot::Sender<()>)>,
by_location: BTreeMap<Location, Id>,
}
impl<Op, D: Digest, E> Requests<Op, D, E> {
pub fn new() -> Self {
Self {
futures: FuturesUnordered::new(),
next_id: 0,
tracked: HashMap::new(),
by_location: BTreeMap::new(),
}
}
pub const fn next_id(&mut self) -> Id {
let id = Id(self.next_id);
self.next_id += 1;
id
}
pub fn insert(
&mut self,
id: Id,
start_loc: Location,
cancel_tx: oneshot::Sender<()>,
future: Pin<Box<dyn Future<Output = IndexedFetchResult<Op, D, E>> + Send>>,
) {
if let Some(old_id) = self.by_location.insert(start_loc, id) {
self.tracked.remove(&old_id);
}
self.tracked.insert(id, (start_loc, cancel_tx));
self.futures.push(future);
}
pub fn remove(&mut self, id: Id) -> bool {
if let Some((loc, _cancel_tx)) = self.tracked.remove(&id) {
if self.by_location.get(&loc) == Some(&id) {
self.by_location.remove(&loc);
}
true
} else {
false
}
}
pub fn remove_before(&mut self, loc: Location) {
let keep = self.by_location.split_off(&loc);
for id in self.by_location.values() {
self.tracked.remove(id);
}
self.by_location = keep;
}
pub fn locations(&self) -> impl Iterator<Item = &Location> {
self.by_location.keys()
}
pub fn contains(&self, loc: &Location) -> bool {
self.by_location.contains_key(loc)
}
#[allow(clippy::type_complexity)]
pub fn futures_mut(
&mut self,
) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = IndexedFetchResult<Op, D, E>> + Send>>>
{
&mut self.futures
}
pub fn len(&self) -> usize {
self.tracked.len()
}
}
impl<Op, D: Digest, E> Default for Requests<Op, D, E> {
fn default() -> Self {
Self::new()
}
}