1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::ops::Drop;
use std::{collections::HashMap, fmt::Debug};
use tokio::sync::{mpsc, oneshot};
use crate::{
batch_function::BatchFunction,
loader_op::{LoadRequest, LoaderOp},
loader_worker::LoaderWorker,
};
/// Handle to a background loader worker.
///
/// Every operation on the handle is turned into a [`LoaderOp`] and pushed onto
/// an unbounded channel; the task spawned alongside the handle receives those
/// operations. The task is aborted when the handle is dropped.
///
/// `K` is the key type and `V` the value type carried by the operations.
pub struct Loader<K, V>
where
    K: Copy + Debug + Eq + Send + 'static,
    V: Clone + Debug + Send + 'static,
{
    /// Sending half of the operation channel feeding the worker task.
    request_tx: mpsc::UnboundedSender<LoaderOp<K, V>>,
    /// Join handle of the spawned worker task; used to abort it on drop.
    load_task_handle: tokio::task::JoinHandle<()>,
}
/// Tears down the background worker together with its handle.
impl<K, V> Drop for Loader<K, V>
where
    K: Copy + Debug + Eq + Send + 'static,
    V: Clone + Debug + Send + 'static,
{
    /// Requests cancellation of the spawned worker task so it does not keep
    /// running once the `Loader` has gone away.
    fn drop(&mut self) {
        let Self {
            load_task_handle: worker_task,
            ..
        } = self;
        worker_task.abort();
    }
}
impl<K, V> Loader<K, V>
where
    K: Copy + Debug + Eq + Ord + std::hash::Hash + Send + Sync + 'static,
    V: Clone + Debug + Send + 'static,
{
    /// Creates a loader and spawns its worker task on the current Tokio runtime.
    ///
    /// The first argument is never read: only its type `F` matters, which
    /// selects the [`BatchFunction`] implementation the worker is
    /// parameterised with. `context` is moved into the worker, as is an
    /// initially empty `HashMap<K, V>` (presumably the worker's cache — confirm
    /// against `LoaderWorker`).
    ///
    /// # Panics
    ///
    /// `tokio::task::spawn` panics when called outside of a Tokio runtime.
    pub fn new<F, ContextT>(_: F, context: ContextT) -> Self
    where
        F: BatchFunction<K, V, Context = ContextT> + Send + 'static,
        ContextT: Send + Sync + 'static,
    {
        let (request_tx, request_rx) = mpsc::unbounded_channel();

        // Build the worker around the receiving half and obtain the future
        // that drives it; the future is only polled once spawned.
        let worker_future = LoaderWorker::<K, V, F, HashMap<K, V>, ContextT>::new(
            HashMap::new(),
            request_rx,
            context,
        )
        .start();

        let load_task_handle = tokio::task::spawn(worker_future);

        Self {
            request_tx,
            load_task_handle,
        }
    }
}
impl<K, V> Loader<K, V>
where
K: 'static + Eq + Debug + Ord + Copy + Send + Sync,
V: 'static + Send + Debug + Clone,
{
pub async fn load(&self, key: K) -> Option<V> {
let (response_tx, response_rx) = oneshot::channel();
self.request_tx
.send(LoaderOp::Load(LoadRequest::One(key, response_tx)))
.unwrap();
response_rx.await.unwrap()
}
pub async fn load_many(&self, keys: Vec<K>) -> Vec<Option<V>> {
let (response_tx, response_rx) = oneshot::channel();
self.request_tx
.send(LoaderOp::Load(LoadRequest::Many(keys, response_tx)))
.unwrap();
response_rx.await.unwrap()
}
pub async fn prime(&self, key: K, value: V) {
self.request_tx.send(LoaderOp::Prime(key, value)).unwrap();
}
pub async fn prime_many(&self, key_vals: Vec<(K, V)>) {
self.request_tx.send(LoaderOp::PrimeMany(key_vals)).unwrap();
}
pub async fn clear(&self, key: K) {
self.request_tx.send(LoaderOp::Clear(key)).unwrap();
}
pub async fn clear_many(&self, keys: Vec<K>) {
self.request_tx.send(LoaderOp::ClearMany(keys)).unwrap();
}
}