1use std::any::Any;
2use std::fmt::Debug;
3use std::future::Future;
4use std::sync::Arc;
5use downcast_rs::{impl_downcast, Downcast};
6use dyn_clone::{clone_trait_object, DynClone};
7use tokio::runtime::Builder;
8use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
9use tokio::sync::{oneshot, Mutex, Notify};
10use tokio::sync::oneshot::{Receiver, Sender};
11
12pub use asyncra_macros::main;
13pub use asyncra_macros::test;
14pub use asyncra_macros::bench;
15pub use criterion::{criterion_group, criterion_main, Criterion};
16use futures::stream::FuturesUnordered;
17use futures::StreamExt;
18use lazy_static::lazy_static;
19use tokio::join;
20use tokio::task::JoinHandle;
21
22lazy_static! {
23 static ref NODE_MANAGER_CHANNEL: (NodeSender<NodeManagerMessage>, NodeReceiver<NodeManagerMessage>) = {
24 let (tx, rx) = unbounded_channel();
25 (Arc::new(tx), Arc::new(Mutex::new(rx)))
26 };
27}
28
/// Generates the Criterion entry points for the listed benchmark targets.
///
/// Expands to a `criterion_group!` named `benches` that contains every given
/// path, followed by `criterion_main!(benches)`. At least one target is
/// required; trailing commas are accepted.
#[macro_export]
macro_rules! extract_benches {
    ($( $target:path ),+ $(,)*) => {
        // `$crate` always names the crate that defines this macro, so the
        // expansion keeps working when a dependent crate imports it under a
        // different name than `asyncra`.
        $crate::criterion_group!(benches, $($target),*);
        $crate::criterion_main!(benches);
    };
}
35
36pub type NodeSender<T> = Arc<UnboundedSender<T>>;
37pub type NodeReceiver<T> = Arc<Mutex<UnboundedReceiver<T>>>;
38pub type DataSender<T> = Sender<T>;
39pub type DataReceiver<T> = Receiver<T>;
40
41pub type Result<T> = anyhow::Result<T>;
42
43pub trait EventLoopResult<T> {
44 fn unwrap(self) -> T;
45}
46impl EventLoopResult<()> for Result<()> {
47 fn unwrap(self) -> () {
48 self.unwrap()
49 }
50}
51
52pub trait CloneableAny: Any + Send + Sync + Downcast + Debug + DynClone {
53 fn clone_box(&self) -> Box<dyn CloneableAny + 'static>;
54}
55
56impl_downcast!(CloneableAny);
57clone_trait_object!(CloneableAny);
58
59impl<T> CloneableAny for T
60where
61 T: Any + Clone + Send + Sync + Debug,
62{
63 fn clone_box(&self) -> Box<dyn CloneableAny + 'static> {
64 Box::new(self.clone())
65 }
66}
67
68pub enum SharedValueMessage<T> {
69 Write {
70 data: T,
71 },
72 WriteLock {
73 data: T,
74 },
75 Read{ tx: DataSender<T>},
76 ReadLock { tx: DataSender<T> }
77}
78
79pub enum NodeManagerMessage {
80 Reg(JoinHandle<anyhow::Result<()>>),
81 Close
82}
83
84#[derive(Clone)]
85pub struct SharedValue {
86 sender: Arc<UnboundedSender<SharedValueMessage<Box<dyn CloneableAny>>>>,
87 #[allow(dead_code)]
88 notify: Arc<Notify>,
89}
90
91#[allow(unused_assignments)]
92impl SharedValue {
93 pub fn new<T: CloneableAny>(data: T) -> Self {
94 let (tx, mut rx) = unbounded_channel();
95 let notify = Arc::new(Notify::new());
96 let notify_clone = notify.clone();
97 let s_val = Self {
98 sender: Arc::new(tx),
99 notify: notify_clone,
100 };
101
102 tokio::spawn(async move {
103 let mut storage: Option<Box<dyn CloneableAny>> = Some(Box::new(data));
104 let mut read_locks = 0;
105 let mut write_lock = false;
106
107 while let Some(msg) = rx.recv().await {
108 match msg {
109 SharedValueMessage::Write { data } => {
110 if !write_lock {
111 storage = Some(data);
112 notify.notify_waiters();
113 }
114 }
115 SharedValueMessage::Read { tx } => {
116 if let Some(ref value) = storage {
117 let _ = tx.send(value.clone());
118 }
119 }
120 SharedValueMessage::WriteLock { data } => {
121 while write_lock || read_locks > 0 {
122 notify.notified().await;
123 }
124 write_lock = true;
125 storage = Some(data);
126 write_lock = false;
127 notify.notify_waiters();
128 }
129 SharedValueMessage::ReadLock { tx } => {
130 while write_lock {
131 notify.notified().await;
132 }
133 read_locks += 1;
134 if let Some(ref value) = storage {
135 let _ = tx.send(value.clone());
136 }
137 read_locks -= 1;
138 notify.notify_waiters();
139 }
140 }
141 }
142 });
143
144 s_val
145 }
146
147 pub async fn read<T: CloneableAny>(&self) -> T {
148 let (tx, rx) = oneshot::channel();
149 self.sender.send(SharedValueMessage::Read { tx }).unwrap();
150 *rx.await.unwrap().downcast::<T>().unwrap()
151 }
152
153 pub async fn read_lock<T: CloneableAny>(&self) -> T {
154 let (tx, rx) = oneshot::channel();
155 self.sender.send(SharedValueMessage::ReadLock { tx }).unwrap();
156 *rx.await.unwrap().downcast::<T>().unwrap()
157 }
158
159 pub fn write<T: CloneableAny>(&self, data: T) {
160 self.sender
161 .send(SharedValueMessage::Write {
162 data: Box::new(data),
163 })
164 .unwrap();
165 }
166
167 pub fn write_lock<T: CloneableAny>(&self, data: T) {
168 self.sender
169 .send(SharedValueMessage::WriteLock {
170 data: Box::new(data),
171 })
172 .unwrap();
173 }
174}
175
176pub fn wake_runtime<
177 M: Future<Output = Result<()>> + Sized,
178>(
179 fabric: fn() -> M,
180) -> anyhow::Result<()> {
181 let rt = Builder::new_multi_thread()
182 .worker_threads(num_cpus::get())
183 .enable_all()
184 .build()?;
185 rt.block_on(async move {
186 let node_manager = tokio::spawn(async move {
187 let mut nodes = FuturesUnordered::new();
188 loop {
189 if let Some(msg) = NODE_MANAGER_CHANNEL.clone().1.lock().await.recv().await {
190 match msg {
191 NodeManagerMessage::Reg(node) => {
192 nodes.push(node);
193 }
194 NodeManagerMessage::Close => {
195 while let Some(_node) = nodes.next().await {}
196 return ();
197 }
198 }
199 }
200 }
201 });
202 let handle = async move {
203 let res = fabric().await;
204 let _ = NODE_MANAGER_CHANNEL.clone().0.send(NodeManagerMessage::Close);
205 res
206 };
207 let res = join!(handle, node_manager);
208 res.0
209 })?;
210 Ok(())
211}
212
213pub fn spawn_node<F: Future<Output=anyhow::Result<()>> + Send + Sync + 'static>(fabric: F) -> anyhow::Result<()> {
214 NODE_MANAGER_CHANNEL.clone().0.send(NodeManagerMessage::Reg(tokio::task::spawn(fabric)))?;
215 Ok(())
216}