1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
mod fn_one; use aqueue_trait::async_trait; #[async_trait] pub trait QueueItem{ async fn run(&self)->Result<(), Box<dyn Error+Send+Sync>>; } use std::error::Error; pub use fn_one::AQueueItem; use std::sync::atomic::{AtomicU8, Ordering}; use async_oneshot::{ Receiver}; use std::future::Future; use concurrent_queue::ConcurrentQueue; const IDLE:u8=0; const OPEN:u8=1; pub struct AQueue{ deque:ConcurrentQueue<Box<dyn QueueItem+Send+Sync>>, status:AtomicU8 } unsafe impl Send for AQueue{} unsafe impl Sync for AQueue{} impl AQueue{ pub fn new()->AQueue{ AQueue{ deque:ConcurrentQueue::unbounded(), status:AtomicU8::new(IDLE) } } #[inline] pub async fn run<A,T,S>(&self, call:impl FnOnce(A)->T+ Send+Sync+'static, arg:A) ->Result<S, Box<dyn Error+Send+Sync>> where T:Future<Output = Result<S, Box<dyn Error+Send+Sync>>> + Send+ Sync+'static, S:'static, A: Send+Sync+'static { let x= AQueueItem::new(call,arg); self.push(x).await } #[inline] pub async fn push<T>(&self,(rx,item):(Receiver<Result<T, Box<dyn Error+Send+Sync>>>,Box<dyn QueueItem+Send+Sync>))->Result<T, Box<dyn Error+Send+Sync>>{ if let Err(er)= self.deque.push(item){ return Err(er.to_string().into()) } self.run_ing().await?; match rx.await { Ok(x)=>Ok(x?), Err(_)=> Err("CLOSE".into()) } } #[inline] pub async fn run_ing(&self)->Result<(), Box<dyn Error+Send+Sync>>{ if self.status.compare_and_swap(IDLE,OPEN,Ordering::Release)==IDLE { loop { let item = { match self.deque.pop() { Ok(p)=>{ p } _ => { if self.status.compare_and_swap(OPEN, IDLE, Ordering::Release) == OPEN { break; } else { panic!("error status") } } } }; item.run().await?; } } Ok(()) } }