Skip to main content

acter_queue/queue/
fn_one.rs

1use std::future::Future;
2use std::error::Error;
3use async_trait::async_trait;
4use std::cell::RefCell;
5use async_oneshot::{oneshot,Receiver,Sender};
6use super::QueueItem;
7
8
9pub struct AQueueItem<A,T,S>{
10    arg:RefCell<Option<A>>,
11    call:RefCell<Option<Box<dyn FnOnce(A)->T+ Send>>>,
12    result_sender:RefCell<Option<Sender<Result<S, Box<dyn Error+Send+Sync>>>>>
13}
14
15unsafe impl<A,T,S> Send for AQueueItem<A,T,S>{}
16unsafe impl<A,T,S> Sync for AQueueItem<A,T,S>{}
17
18
19
20#[async_trait]
21impl<A,T,S> QueueItem for AQueueItem<A,T,S>
22    where T:Future<Output = Result<S, Box<dyn Error+Send+Sync>>> + Send+ Sync, A: Send{
23
24    #[inline]
25    async fn run(&self) -> Result<(), Box<dyn Error+Send+Sync>> {
26        let call = self.call.replace(None);
27
28        if let Some(call) = call {
29            let arg= self.arg.replace(None).unwrap();
30            let res=  (call)(arg).await;
31
32            if let Some(x)= self.result_sender.replace(None){
33                if x.send(res).is_err() {
34                    Err("close".into())
35                }
36                else{
37                    Ok(())
38                }
39            }
40            else{
41                Err("not call oneshot is none".into())
42            }
43        }
44        else {
45            Err("not call fn is none".into())
46        }
47    }
48
49}
50
51impl <A,T,S> AQueueItem<A,T,S>
52    where T:Future<Output = Result<S, Box<dyn Error+Send+Sync>>> + Send+ Sync+'static,
53          S:'static, A: Send +'static {
54    #[inline]
55    pub fn new(call:impl FnOnce(A)->T+ Send+'static,arg:A)->(Receiver<Result<S, Box<dyn Error+Send+Sync>>>,Box<dyn QueueItem+Send+Sync>){
56        let (tx,rx)=oneshot();
57        (rx, Box::new(AQueueItem{
58            arg:RefCell::new(Some(arg)),
59            call:RefCell::new(Some(Box::new(call))),
60            result_sender:RefCell::new(Some(tx))
61        }))
62    }
63
64}
65