1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
mod fn_one;
use aqueue_trait::async_trait;
use std::sync::atomic::{AtomicU8, Ordering};
use async_oneshot::{ Receiver};
use std::future::Future;
use concurrent_queue::ConcurrentQueue;
pub use fn_one::AQueueItem;
use crate::AResult;

/// A unit of work that can be stored in an [`AQueue`] and executed later.
///
/// Items are kept in the queue as `Box<dyn QueueItem + Send + Sync>` and are
/// run through a shared reference.
#[async_trait]
pub trait QueueItem {
    /// Executes this item.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports through `AResult`.
    async fn run(&self) -> AResult<()>;
}

/// Value of `AQueue::status` while no caller is draining the queue.
const IDLE: u8 = 0;
/// Value of `AQueue::status` while one caller is draining the queue.
const OPEN: u8 = 1;

/// A queue of boxed [`QueueItem`]s paired with a status flag.
///
/// The flag holds either `IDLE` or `OPEN` and is used by `run_ing` so that
/// only one caller pops and runs items at any given time.
pub struct AQueue {
    /// Pending work items, in the order they were pushed.
    deque: ConcurrentQueue<Box<dyn QueueItem + Send + Sync>>,
    /// `IDLE` when nobody is draining `deque`, `OPEN` while somebody is.
    status: AtomicU8,
}

// SAFETY: `AQueue` only contains a `ConcurrentQueue` of `Send + Sync` boxed
// items and an `AtomicU8`; all access to them goes through `&self`.
// NOTE(review): both field types look thread-safe on their own, so the auto
// traits may already apply and these manual impls may be redundant — confirm
// against `concurrent_queue`'s `Send`/`Sync` impls before removing them.
unsafe impl Sync for AQueue {}
unsafe impl Send for AQueue {}

impl AQueue {
    /// Creates an empty, unbounded queue whose status is `IDLE`.
    pub fn new() -> AQueue {
        AQueue {
            deque: ConcurrentQueue::unbounded(),
            status: AtomicU8::new(IDLE),
        }
    }

    /// Wraps `call` and `arg` in an [`AQueueItem`], queues it and waits for
    /// its result.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`AQueue::push`].
    #[inline]
    pub async fn run<A, T, S>(
        &self,
        call: impl FnOnce(A) -> T + Send + Sync + 'static,
        arg: A,
    ) -> AResult<S>
    where
        T: Future<Output = AResult<S>> + Send + Sync + 'static,
        S: 'static,
        A: Send + Sync + 'static,
    {
        self.push(AQueueItem::new(call, arg)).await
    }

    /// Queues `item`, helps drain the queue, then waits on `rx` for the
    /// item's result.
    ///
    /// # Errors
    ///
    /// * the stringified push error if the underlying queue rejects `item`;
    /// * any error returned by [`AQueue::run_ing`];
    /// * the error carried by the value received on `rx`;
    /// * `"CLOSE"` if `rx` fails without delivering a value.
    #[inline]
    pub async fn push<T>(
        &self,
        (rx, item): (Receiver<AResult<T>>, Box<dyn QueueItem + Send + Sync>),
    ) -> AResult<T> {
        if let Err(er) = self.deque.push(item) {
            return Err(er.to_string().into());
        }
        self.run_ing().await?;
        match rx.await {
            Ok(x) => Ok(x?),
            Err(_) => Err("CLOSE".into()),
        }
    }

    /// Drains the queue, running queued items one after another, unless
    /// another caller is already doing so.
    ///
    /// The `status` flag decides who drains: the caller that flips it from
    /// `IDLE` to `OPEN` pops and runs items until the queue is empty; every
    /// other caller returns `Ok(())` immediately.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by a queued item. The status flag is
    /// released before returning, so a later call can pick up the items that
    /// are still queued.
    #[inline]
    pub async fn run_ing(&self) -> AResult<()> {
        // Resets the flag to `IDLE` on every way out of the drain loop:
        // normal completion, `?` on an item error, a panic inside an item, or
        // this future being dropped mid-await. Without it an early exit would
        // leave the flag at `OPEN` and the queue could never be drained again.
        struct Running<'a>(&'a AtomicU8);

        impl Drop for Running<'_> {
            fn drop(&mut self) {
                self.0.store(IDLE, Ordering::SeqCst);
            }
        }

        loop {
            // `SeqCst` throughout: the "release the flag, then look at the
            // queue" step below races with "push, then try to take the flag"
            // in `push`, and both sides must agree on a single order.
            if self
                .status
                .compare_exchange(IDLE, OPEN, Ordering::SeqCst, Ordering::SeqCst)
                .is_err()
            {
                // Somebody else is draining; they will run our item.
                return Ok(());
            }

            {
                let _running = Running(&self.status);
                while let Ok(item) = self.deque.pop() {
                    item.run().await?;
                }
            }

            // An item may have been pushed after the last `pop` failed but
            // before the flag went back to `IDLE`; its pusher saw `OPEN` and
            // left. Re-check so that item is not stranded.
            if self.deque.is_empty() {
                return Ok(());
            }
        }
    }
}