1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
mod fn_one; use aqueue_trait::async_trait; use std::sync::atomic::{AtomicU8, Ordering}; use async_oneshot::{ Receiver}; use std::future::Future; use concurrent_queue::ConcurrentQueue; pub use fn_one::AQueueItem; use crate::AResult; #[async_trait] pub trait QueueItem{ async fn run(&self)->AResult<()>; } const IDLE:u8=0; const OPEN:u8=1; pub struct AQueue{ deque:ConcurrentQueue<Box<dyn QueueItem+Send+Sync>>, status:AtomicU8 } unsafe impl Send for AQueue{} unsafe impl Sync for AQueue{} impl AQueue{ pub fn new()->AQueue{ AQueue{ deque:ConcurrentQueue::unbounded(), status:AtomicU8::new(IDLE) } } #[inline] pub async fn run<A,T,S>(&self, call:impl FnOnce(A)->T+ Send+Sync+'static, arg:A) ->AResult<S> where T:Future<Output = AResult<S>> + Send+ Sync+'static, S:'static, A: Send+Sync+'static { self.push(AQueueItem::new(call,arg)).await } #[inline] pub async fn push<T>(&self,(rx,item):(Receiver<AResult<T>>,Box<dyn QueueItem+Send+Sync>)) ->AResult<T>{ if let Err(er)= self.deque.push(item){ return Err(er.to_string().into()) } self.run_ing().await?; match rx.await { Ok(x)=>Ok(x?), Err(_)=> Err("CLOSE".into()) } } #[inline] pub async fn run_ing(&self)->AResult<()>{ if self.status.compare_and_swap(IDLE,OPEN,Ordering::Release)==IDLE { 'recv:loop { let item = { match self.deque.pop() { Ok(p)=>{ p } _ => { if self.status.compare_and_swap(OPEN, IDLE, Ordering::Release) == OPEN { break 'recv; } else { panic!("error status") } } } }; item.run().await?; } } Ok(()) } }