aqueue 0.1.9

A fast, thread-safe asynchronous execution queue.
Documentation

A fast, thread-safe asynchronous execution queue.

Examples

use aqueue::AQueue;
static mut VALUE:i32=0;

#[tokio::main]
async fn main()->Result<(),Box<dyn Error+Sync+Send>> {
    let queue = AQueue::new();
    let mut v=0i32;
    for i in 0..2000000 {
        v= queue.run(async move |x| unsafe {
            // thread safe execute
            VALUE += x;
            Ok(VALUE)
        }, i).await?;
    }

    assert_eq!(v,-1455759936);
   
}

Example: Actor

#![feature(async_closure)]
use aqueue::AQueue;
use std::sync::Arc;
use std::cell::{RefCell};
use std::error::Error;
use std::time::Instant;

struct Foo{
    count:u64,
    i:i128
}

impl Foo{
    pub fn add(&mut self,x:i32)->i128{
        self.count+=1;
        self.i+=x as i128;
        self.i
    }

    pub fn get(&self)->i128{
        self.i
    }
    pub fn get_count(&self)->u64{
        self.count
    }
}

struct Store<T>(RefCell<T>);
unsafe impl<T> Sync for Store<T>{}
unsafe impl<T> Send for Store<T>{}

impl<T> Store<T>{
    pub fn new(x:T)->Store<T>{
        Store(RefCell::new(x))
    }
}

struct FooRunner {
    inner:Arc<Store<Foo>>,
    queue:AQueue
}

impl FooRunner {
    pub fn new()-> FooRunner {
        FooRunner {
            inner:Arc::new(Store::new(Foo{ count:0, i:0})),
            queue:AQueue::new()
        }
    }
    pub async fn add(&self,x:i32)->Result<i128,Box<dyn Error+ Send + Sync>>{
        self.queue.run(async move |inner| {
            Ok(inner.0.borrow_mut().add(x))
        },self.inner.clone()).await
    }

    pub async fn get(&self)->Result<i128,Box<dyn Error+ Send + Sync>>{
        self.queue.run(async move |inner| {
            Ok(inner.0.borrow().get())
        },self.inner.clone()).await
    }

    pub async fn get_count(&self)->Result<u64,Box<dyn Error+ Send + Sync>>{
        self.queue.run(async move |inner| {
            Ok(inner.0.borrow().get_count())
        },self.inner.clone()).await
    }
}


#[tokio::main]
async fn main() {
    {
        // Single thread test
        let tf = Arc::new(FooRunner::new());
        tf.add(100).await.unwrap();
        assert_eq!(100, tf.get().await.unwrap());
        tf.add(-100).await.unwrap();
        assert_eq!(0, tf.get().await.unwrap());

        let start = Instant::now();
        for i in 0..2000000 {
            if let Err(er) = tf.add(i).await {
                println!("{}", er);
            };
        }

        println!("test a count:{} value:{} time:{} qps:{}",
                 tf.get_count().await.unwrap(),
                 tf.get().await.unwrap(),
                 start.elapsed().as_secs_f32(),
                 tf.get_count().await.unwrap() / start.elapsed().as_millis() as u64 * 1000);
    }

    {
        //Multithreading test
        let tf = Arc::new(FooRunner::new());
        let start = Instant::now();
        let a_tf = tf.clone();
        let a = tokio::spawn(async move {
            for i in 0..1000000 {
                if let Err(er) = a_tf.add(i).await {
                    println!("{}", er);
                };
            }
        });

        let b_tf = tf.clone();
        let b = tokio::spawn(async move {
            for i in 1000000..2000000 {
                if let Err(er) = b_tf.add(i).await {
                    println!("{}", er);
                };
            }
        });

        let c_tf = tf.clone();
        let c = tokio::spawn(async move {
            for i in 2000000..3000000 {
                if let Err(er) = c_tf.add(i).await {
                    println!("{}", er);
                };
            }
        });

        c.await.unwrap();
        a.await.unwrap();
        b.await.unwrap();

        println!("test b count:{} value:{} time:{} qps:{}",
                 tf.get_count().await.unwrap(),
                 tf.get().await.unwrap(),
                 start.elapsed().as_secs_f32(),
                 tf.get_count().await.unwrap() / start.elapsed().as_millis() as u64 * 1000);
    }
}