libsignal_rust/
queue_job.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
/// A heap-allocated, pinned, type-erased future producing `T` that may be moved
/// to another thread (`Send`). Used so that jobs of different concrete future
/// types can be stored in the same queue.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
8
9lazy_static::lazy_static! {
10 static ref QUEUE_ASYNC_BUCKETS: Arc<Mutex<HashMap<String, Vec<BoxFuture<()>>>>> =
11 Arc::new(Mutex::new(HashMap::new()));
12}
13
/// Upper bound on how many jobs the executor takes out of a bucket's queue in
/// one pass (i.e. per acquisition of the global lock).
const GC_LIMIT: usize = 10_000;
15
16async fn async_queue_executor(bucket: String) {
17 let offset = 0;
18
19 loop {
20 let mut buckets = QUEUE_ASYNC_BUCKETS.lock().await;
21
22 if let Some(queue) = buckets.get_mut(&bucket) {
23 let limit = std::cmp::min(queue.len(), GC_LIMIT);
24
25 if limit == 0 {
26 buckets.remove(&bucket);
27 break;
28 }
29
30 let mut tasks_to_run = Vec::new();
31 for _ in offset..limit {
32 if let Some(task) = queue.pop() {
33 tasks_to_run.push(task);
34 }
35 }
36
37 drop(buckets);
38
39 for task in tasks_to_run {
40 task.await;
41 }
42
43 let buckets = QUEUE_ASYNC_BUCKETS.lock().await;
44 if let Some(queue) = buckets.get(&bucket) {
45 if queue.is_empty() {
46 break;
47 }
48 }
49 } else {
50 break;
51 }
52 }
53}
54
55pub async fn queue_job<F, R>(bucket: String, awaitable: F) -> R
56where
57 F: Future<Output = R> + Send + 'static,
58 R: Send + 'static,
59{
60 let (sender, receiver) = tokio::sync::oneshot::channel();
61
62 let wrapped_task = Box::pin(async move {
63 let result = awaitable.await;
64 let _ = sender.send(result);
65 });
66
67 let mut buckets = QUEUE_ASYNC_BUCKETS.lock().await;
68 let is_inactive = !buckets.contains_key(&bucket);
69
70 buckets.entry(bucket.clone()).or_insert_with(Vec::new).push(wrapped_task);
71
72 if is_inactive {
73 let bucket_clone = bucket.clone();
74 tokio::spawn(async_queue_executor(bucket_clone));
75 }
76
77 drop(buckets);
78
79 receiver.await.unwrap()
80}