libsignal-rust 0.1.0

Complete and Powerful Rust implementation of libsignal protocol
Documentation
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

lazy_static::lazy_static! {
    static ref QUEUE_ASYNC_BUCKETS: Arc<Mutex<HashMap<String, Vec<BoxFuture<()>>>>> = 
        Arc::new(Mutex::new(HashMap::new()));
}

const GC_LIMIT: usize = 10000;

async fn async_queue_executor(bucket: String) {
    let offset = 0;
    
    loop {
        let mut buckets = QUEUE_ASYNC_BUCKETS.lock().await;
        
        if let Some(queue) = buckets.get_mut(&bucket) {
            let limit = std::cmp::min(queue.len(), GC_LIMIT);
            
            if limit == 0 {
                buckets.remove(&bucket);
                break;
            }
            
            let mut tasks_to_run = Vec::new();
            for _ in offset..limit {
                if let Some(task) = queue.pop() {
                    tasks_to_run.push(task);
                }
            }
            
            drop(buckets);
            
            for task in tasks_to_run {
                task.await;
            }
            
            let buckets = QUEUE_ASYNC_BUCKETS.lock().await;
            if let Some(queue) = buckets.get(&bucket) {
                if queue.is_empty() {
                    break;
                }
            }
        } else {
            break;
        }
    }
}

pub async fn queue_job<F, R>(bucket: String, awaitable: F) -> R
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (sender, receiver) = tokio::sync::oneshot::channel();
    
    let wrapped_task = Box::pin(async move {
        let result = awaitable.await;
        let _ = sender.send(result);
    });
    
    let mut buckets = QUEUE_ASYNC_BUCKETS.lock().await;
    let is_inactive = !buckets.contains_key(&bucket);
    
    buckets.entry(bucket.clone()).or_insert_with(Vec::new).push(wrapped_task);
    
    if is_inactive {
        let bucket_clone = bucket.clone();
        tokio::spawn(async_queue_executor(bucket_clone));
    }
    
    drop(buckets);
    
    receiver.await.unwrap()
}