use std::{collections::HashMap, sync::Arc};
use parking_lot::RwLock;
use tokio::sync::watch;
use crate::job::JobLog;
#[derive(Debug)]
pub(crate) struct RunListEntry<K, V>
where
K: Eq + std::hash::Hash,
{
owner: Arc<Inner<K, V>>,
key: K,
value: V,
}
impl<K, V> std::ops::Deref for RunListEntry<K, V>
where
K: Eq + std::hash::Hash,
{
type Target = V;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<K, V> Drop for RunListEntry<K, V>
where
K: Eq + std::hash::Hash,
{
fn drop(&mut self) {
self.owner.remove(&self.key);
}
}
#[derive(Debug)]
struct Inner<K, V> {
rundata: RwLock<HashMap<K, V>>,
changed: watch::Sender<usize>,
}
impl<K, V> Inner<K, V>
where
K: Eq + std::hash::Hash,
{
fn remove(&self, key: &K) {
let mut r = self.rundata.write();
r.remove(key);
self.changed.send_replace(r.len());
}
}
#[derive(Clone, Debug)]
pub struct RunList<K, V> {
inner: Arc<Inner<K, V>>,
size: watch::Receiver<usize>,
}
impl<K, V> RunList<K, V>
where
K: Eq + std::hash::Hash + Clone,
V: Clone,
{
pub(crate) fn new() -> Self {
let (tx, rx) = watch::channel(0);
RunList {
inner: Arc::new(Inner {
rundata: RwLock::new(HashMap::new()),
changed: tx,
}),
size: rx,
}
}
pub(crate) async fn wait_for_space(&mut self, max: usize) {
loop {
if *self.size.borrow_and_update() < max {
break;
}
let _ = self.size.changed().await;
}
}
pub(crate) fn size(&self) -> usize {
*self.size.borrow()
}
pub(crate) fn insert(&mut self, id: K, data: V) -> RunListEntry<K, V> {
let mut r = self.inner.rundata.write();
r.insert(id.clone(), data.clone());
self.inner.changed.send_replace(r.len());
RunListEntry {
owner: self.inner.clone(),
key: id,
value: data,
}
}
#[allow(dead_code)]
pub(crate) fn lookup(&self, id: &K) -> Option<V> {
let r = self.inner.rundata.read();
r.get(id).cloned()
}
}
pub struct JobRunList(RunList<u64, JobLog>);
impl From<RunList<u64, JobLog>> for JobRunList {
fn from(value: RunList<u64, JobLog>) -> Self {
JobRunList(value)
}
}
impl JobRunList {
pub(crate) fn inner(self) -> RunList<u64, JobLog> {
self.0
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use tokio::time::sleep;
use super::*;
#[tokio::test]
async fn runlist() {
let mut runlist = RunList::new();
let v = runlist.insert(1, 2);
runlist.wait_for_space(2).await;
let mut r_clone = runlist.clone();
let join = tokio::spawn(async move {
r_clone.wait_for_space(1).await;
assert_eq!(r_clone.size(), 0)
});
assert_eq!(runlist.size(), 1);
assert_eq!(*v, 2);
assert_eq!(runlist.lookup(&1), Some(2));
sleep(Duration::from_millis(100)).await;
drop(v);
assert_eq!(runlist.size(), 0);
assert_eq!(runlist.lookup(&1), None);
join.await.unwrap();
}
}