libsignal_rust/
queue_job.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
8
9lazy_static::lazy_static! {
10    static ref QUEUE_ASYNC_BUCKETS: Arc<Mutex<HashMap<String, Vec<BoxFuture<()>>>>> = 
11        Arc::new(Mutex::new(HashMap::new()));
12}
13
14const GC_LIMIT: usize = 10000;
15
16async fn async_queue_executor(bucket: String) {
17    let offset = 0;
18    
19    loop {
20        let mut buckets = QUEUE_ASYNC_BUCKETS.lock().await;
21        
22        if let Some(queue) = buckets.get_mut(&bucket) {
23            let limit = std::cmp::min(queue.len(), GC_LIMIT);
24            
25            if limit == 0 {
26                buckets.remove(&bucket);
27                break;
28            }
29            
30            let mut tasks_to_run = Vec::new();
31            for _ in offset..limit {
32                if let Some(task) = queue.pop() {
33                    tasks_to_run.push(task);
34                }
35            }
36            
37            drop(buckets);
38            
39            for task in tasks_to_run {
40                task.await;
41            }
42            
43            let buckets = QUEUE_ASYNC_BUCKETS.lock().await;
44            if let Some(queue) = buckets.get(&bucket) {
45                if queue.is_empty() {
46                    break;
47                }
48            }
49        } else {
50            break;
51        }
52    }
53}
54
55pub async fn queue_job<F, R>(bucket: String, awaitable: F) -> R
56where
57    F: Future<Output = R> + Send + 'static,
58    R: Send + 'static,
59{
60    let (sender, receiver) = tokio::sync::oneshot::channel();
61    
62    let wrapped_task = Box::pin(async move {
63        let result = awaitable.await;
64        let _ = sender.send(result);
65    });
66    
67    let mut buckets = QUEUE_ASYNC_BUCKETS.lock().await;
68    let is_inactive = !buckets.contains_key(&bucket);
69    
70    buckets.entry(bucket.clone()).or_insert_with(Vec::new).push(wrapped_task);
71    
72    if is_inactive {
73        let bucket_clone = bucket.clone();
74        tokio::spawn(async_queue_executor(bucket_clone));
75    }
76    
77    drop(buckets);
78    
79    receiver.await.unwrap()
80}