acter_queue/queue/
fn_one.rs1use std::future::Future;
2use std::error::Error;
3use async_trait::async_trait;
4use std::cell::RefCell;
5use async_oneshot::{oneshot,Receiver,Sender};
6use super::QueueItem;
7
8
9pub struct AQueueItem<A,T,S>{
10 arg:RefCell<Option<A>>,
11 call:RefCell<Option<Box<dyn FnOnce(A)->T+ Send>>>,
12 result_sender:RefCell<Option<Sender<Result<S, Box<dyn Error+Send+Sync>>>>>
13}
14
15unsafe impl<A,T,S> Send for AQueueItem<A,T,S>{}
16unsafe impl<A,T,S> Sync for AQueueItem<A,T,S>{}
17
18
19
20#[async_trait]
21impl<A,T,S> QueueItem for AQueueItem<A,T,S>
22 where T:Future<Output = Result<S, Box<dyn Error+Send+Sync>>> + Send+ Sync, A: Send{
23
24 #[inline]
25 async fn run(&self) -> Result<(), Box<dyn Error+Send+Sync>> {
26 let call = self.call.replace(None);
27
28 if let Some(call) = call {
29 let arg= self.arg.replace(None).unwrap();
30 let res= (call)(arg).await;
31
32 if let Some(x)= self.result_sender.replace(None){
33 if x.send(res).is_err() {
34 Err("close".into())
35 }
36 else{
37 Ok(())
38 }
39 }
40 else{
41 Err("not call oneshot is none".into())
42 }
43 }
44 else {
45 Err("not call fn is none".into())
46 }
47 }
48
49}
50
51impl <A,T,S> AQueueItem<A,T,S>
52 where T:Future<Output = Result<S, Box<dyn Error+Send+Sync>>> + Send+ Sync+'static,
53 S:'static, A: Send +'static {
54 #[inline]
55 pub fn new(call:impl FnOnce(A)->T+ Send+'static,arg:A)->(Receiver<Result<S, Box<dyn Error+Send+Sync>>>,Box<dyn QueueItem+Send+Sync>){
56 let (tx,rx)=oneshot();
57 (rx, Box::new(AQueueItem{
58 arg:RefCell::new(Some(arg)),
59 call:RefCell::new(Some(Box::new(call))),
60 result_sender:RefCell::new(Some(tx))
61 }))
62 }
63
64}
65