asyncra/
lib.rs

1use std::any::Any;
2use std::fmt::Debug;
3use std::future::Future;
4use std::sync::Arc;
5use downcast_rs::{impl_downcast, Downcast};
6use dyn_clone::{clone_trait_object, DynClone};
7use tokio::runtime::Builder;
8use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
9use tokio::sync::{oneshot, Mutex, Notify};
10use tokio::sync::oneshot::{Receiver, Sender};
11
12pub use asyncra_macros::main;
13pub use asyncra_macros::test;
14pub use asyncra_macros::bench;
15pub use criterion::{criterion_group, criterion_main, Criterion};
16use futures::stream::FuturesUnordered;
17use futures::StreamExt;
18use lazy_static::lazy_static;
19use tokio::join;
20use tokio::task::JoinHandle;
21
22lazy_static! {
23    static ref NODE_MANAGER_CHANNEL: (NodeSender<NodeManagerMessage>, NodeReceiver<NodeManagerMessage>) = {
24        let (tx, rx) = unbounded_channel();
25        (Arc::new(tx), Arc::new(Mutex::new(rx)))
26    };
27}
28
29#[macro_export] macro_rules! extract_benches {
30    ($( $target:path ),+ $(,)*) => {
31        asyncra::criterion_group!(benches, $($target),*);
32        asyncra::criterion_main!(benches);
33    };
34}
35
36pub type NodeSender<T> = Arc<UnboundedSender<T>>;
37pub type NodeReceiver<T> = Arc<Mutex<UnboundedReceiver<T>>>;
38pub type DataSender<T> = Sender<T>;
39pub type DataReceiver<T> = Receiver<T>;
40
41pub type Result<T> = anyhow::Result<T>;
42
43pub trait EventLoopResult<T> {
44    fn unwrap(self) -> T;
45}
46impl EventLoopResult<()> for Result<()> {
47    fn unwrap(self) -> () {
48        self.unwrap()
49    }
50}
51
52pub trait CloneableAny: Any + Send + Sync + Downcast + Debug + DynClone {
53    fn clone_box(&self) -> Box<dyn CloneableAny + 'static>;
54}
55
56impl_downcast!(CloneableAny);
57clone_trait_object!(CloneableAny);
58
59impl<T> CloneableAny for T
60where
61    T: Any + Clone + Send + Sync + Debug,
62{
63    fn clone_box(&self) -> Box<dyn CloneableAny + 'static> {
64        Box::new(self.clone())
65    }
66}
67
68pub enum SharedValueMessage<T> {
69    Write {
70        data: T,
71    },
72    WriteLock {
73      data: T,
74    },
75    Read{ tx: DataSender<T>},
76    ReadLock { tx: DataSender<T> }
77}
78
79pub enum NodeManagerMessage {
80    Reg(JoinHandle<anyhow::Result<()>>),
81    Close
82}
83
84#[derive(Clone)]
85pub struct SharedValue {
86    sender: Arc<UnboundedSender<SharedValueMessage<Box<dyn CloneableAny>>>>,
87    #[allow(dead_code)]
88    notify: Arc<Notify>,
89}
90
91#[allow(unused_assignments)]
92impl SharedValue {
93    pub fn new<T: CloneableAny>(data: T) -> Self {
94        let (tx, mut rx) = unbounded_channel();
95        let notify = Arc::new(Notify::new());
96        let notify_clone = notify.clone();
97        let s_val = Self {
98            sender: Arc::new(tx),
99            notify: notify_clone,
100        };
101
102        tokio::spawn(async move {
103            let mut storage: Option<Box<dyn CloneableAny>> = Some(Box::new(data));
104            let mut read_locks = 0;
105            let mut write_lock = false;
106
107            while let Some(msg) = rx.recv().await {
108                match msg {
109                    SharedValueMessage::Write { data } => {
110                        if !write_lock {
111                            storage = Some(data);
112                            notify.notify_waiters();
113                        }
114                    }
115                    SharedValueMessage::Read { tx } => {
116                        if let Some(ref value) = storage {
117                            let _ = tx.send(value.clone());
118                        }
119                    }
120                    SharedValueMessage::WriteLock { data } => {
121                        while write_lock || read_locks > 0 {
122                            notify.notified().await;
123                        }
124                        write_lock = true;
125                        storage = Some(data);
126                        write_lock = false;
127                        notify.notify_waiters();
128                    }
129                    SharedValueMessage::ReadLock { tx } => {
130                        while write_lock {
131                            notify.notified().await;
132                        }
133                        read_locks += 1;
134                        if let Some(ref value) = storage {
135                            let _ = tx.send(value.clone());
136                        }
137                        read_locks -= 1;
138                        notify.notify_waiters();
139                    }
140                }
141            }
142        });
143
144        s_val
145    }
146
147    pub async fn read<T: CloneableAny>(&self) -> T {
148        let (tx, rx) = oneshot::channel();
149        self.sender.send(SharedValueMessage::Read { tx }).unwrap();
150        *rx.await.unwrap().downcast::<T>().unwrap()
151    }
152
153    pub async fn read_lock<T: CloneableAny>(&self) -> T {
154        let (tx, rx) = oneshot::channel();
155        self.sender.send(SharedValueMessage::ReadLock { tx }).unwrap();
156        *rx.await.unwrap().downcast::<T>().unwrap()
157    }
158
159    pub fn write<T: CloneableAny>(&self, data: T) {
160        self.sender
161            .send(SharedValueMessage::Write {
162                data: Box::new(data),
163            })
164            .unwrap();
165    }
166
167    pub fn write_lock<T: CloneableAny>(&self, data: T) {
168        self.sender
169            .send(SharedValueMessage::WriteLock {
170                data: Box::new(data),
171            })
172            .unwrap();
173    }
174}
175
176pub fn wake_runtime<
177    M: Future<Output = Result<()>> + Sized,
178>(
179    fabric: fn() -> M,
180) -> anyhow::Result<()> {
181    let rt = Builder::new_multi_thread()
182        .worker_threads(num_cpus::get())
183        .enable_all()
184        .build()?;
185    rt.block_on(async move {
186        let node_manager = tokio::spawn(async move {
187            let mut nodes = FuturesUnordered::new();
188            loop {
189                if let Some(msg) = NODE_MANAGER_CHANNEL.clone().1.lock().await.recv().await {
190                    match msg {
191                        NodeManagerMessage::Reg(node) => {
192                            nodes.push(node);
193                        }
194                        NodeManagerMessage::Close => {
195                            while let Some(_node) = nodes.next().await {}
196                            return ();
197                        }
198                    }
199                }
200            }
201        });
202        let handle = async move {
203            let res = fabric().await;
204            let _ = NODE_MANAGER_CHANNEL.clone().0.send(NodeManagerMessage::Close);
205            res
206        };
207        let res = join!(handle, node_manager);
208        res.0
209    })?;
210    Ok(())
211}
212
213pub fn spawn_node<F: Future<Output=anyhow::Result<()>> + Send + Sync + 'static>(fabric: F) -> anyhow::Result<()> {
214    NODE_MANAGER_CHANNEL.clone().0.send(NodeManagerMessage::Reg(tokio::task::spawn(fabric)))?;
215    Ok(())
216}