1pub use near_async_derive::{MultiSend, MultiSenderFrom};
2
3mod functional;
4pub mod futures;
5pub mod instrumentation;
6pub mod messaging;
7pub mod multithread;
8pub mod test_loop;
9pub mod tokio;
10
11use crate::futures::FutureSpawner;
12use crate::messaging::Actor;
13use crate::multithread::runtime_handle::{MultithreadRuntimeHandle, spawn_multithread_actor};
14use crate::tokio::TokioRuntimeHandle;
15use crate::tokio::runtime_handle::{TokioRuntimeBuilder, spawn_tokio_actor};
16pub use near_time as time;
17use parking_lot::Mutex;
18use std::any::type_name;
19use std::sync::Arc;
20use std::sync::atomic::AtomicU64;
21use tokio_util::sync::CancellationToken;
22
/// Process-wide counter backing [`next_message_sequence_num`]; starts at zero.
static MESSAGE_SEQUENCE_NUM: AtomicU64 = AtomicU64::new(0);

/// Hands out the next message sequence number.
///
/// Returns the counter's value from before the increment, so the first call
/// yields `0`. The increment uses relaxed ordering: each caller gets a
/// distinct number, but no other memory accesses are ordered by it.
pub(crate) fn next_message_sequence_num() -> u64 {
    use std::sync::atomic::Ordering;

    MESSAGE_SEQUENCE_NUM.fetch_add(1, Ordering::Relaxed)
}
29
/// Returns a short, human-readable name for `T`: the type's own identifier
/// without its module path and without its generic arguments.
///
/// For example `a::b::Foo` becomes `Foo` and `a::Foo<c::Bar>` becomes `Foo`.
///
/// This is a lightweight heuristic over `std::any::type_name`. Types that do
/// not look like a plain path (tuples, references, slices, ...) are not
/// specially handled and may come back only partially shortened.
fn pretty_type_name<T>() -> &'static str {
    let full = type_name::<T>();
    // Cut off the generic argument list first. Otherwise the last `::`
    // segment belongs to the innermost type argument rather than to `T`.
    let without_generics = full.split('<').next().unwrap_or(full);
    without_generics.rsplit("::").next().unwrap_or(without_generics)
}
37
/// Placeholder actor with no state and no overrides of the `Actor` trait.
///
/// In this file it is only used to start a tokio actor runtime whose sole
/// purpose is to hand out a future spawner.
struct EmptyActor;
impl Actor for EmptyActor {}
42
/// Handle used to spawn actors and to stop all of them together.
///
/// Cloning is cheap and every clone refers to the same cancellation state:
/// the sender slot lives behind an `Arc`, and the token and receiver are
/// themselves cloneable handles.
#[derive(Clone)]
pub struct ActorSystem {
    /// Token handed (as a clone) to every tokio actor spawned from this
    /// system; cancelled by `stop`.
    tokio_cancellation_signal: CancellationToken,
    /// Sender half of the multithread cancellation channel. It is kept in an
    /// `Option` so that `stop` can drop it with `take()`; within this file
    /// nothing is ever sent through it.
    multithread_cancellation_signal: Arc<Mutex<Option<crossbeam_channel::Sender<()>>>>,
    /// Receiver half of the same channel; a clone is passed to every
    /// multithread actor spawned from this system.
    // NOTE(review): presumably actors treat the channel disconnecting (sender
    // dropped) as the shutdown signal — confirm in `multithread::runtime_handle`.
    multithread_cancellation_receiver: crossbeam_channel::Receiver<()>,
}
54
55impl ActorSystem {
56 pub fn new() -> Self {
57 let mut systems = ACTOR_SYSTEMS.lock();
58 let (multithread_cancellation_sender, multithread_cancellation_receiver) =
59 crossbeam_channel::bounded(0);
60 let ret = Self {
61 tokio_cancellation_signal: CancellationToken::new(),
62 multithread_cancellation_signal: Arc::new(Mutex::new(Some(
63 multithread_cancellation_sender,
64 ))),
65 multithread_cancellation_receiver,
66 };
67 systems.push(ret.clone());
68 ret
69 }
70
71 pub fn stop(&self) {
72 tracing::info!("Stopping all actors in ActorSystem");
73 self.tokio_cancellation_signal.cancel();
74 self.multithread_cancellation_signal.lock().take();
75 }
76
77 pub fn spawn_tokio_actor<A: messaging::Actor + Send + 'static>(
100 &self,
101 actor: A,
102 ) -> TokioRuntimeHandle<A> {
103 spawn_tokio_actor(
104 actor,
105 std::any::type_name::<A>().to_string(),
106 self.tokio_cancellation_signal.clone(),
107 )
108 }
109
110 pub fn new_tokio_builder<A: messaging::Actor + Send + 'static>(
114 &self,
115 ) -> TokioRuntimeBuilder<A> {
116 TokioRuntimeBuilder::new(
117 pretty_type_name::<A>().to_string(),
118 self.tokio_cancellation_signal.clone(),
119 )
120 }
121
122 pub fn spawn_multithread_actor<A: messaging::Actor + Send + 'static>(
126 &self,
127 num_threads: usize,
128 make_actor_fn: impl Fn() -> A + Sync + Send + 'static,
129 ) -> MultithreadRuntimeHandle<A> {
130 spawn_multithread_actor(
131 num_threads,
132 make_actor_fn,
133 self.multithread_cancellation_receiver.clone(),
134 None,
135 )
136 }
137
138 pub fn new_future_spawner(&self, description: &str) -> Box<dyn FutureSpawner> {
145 let handle = spawn_tokio_actor(
146 EmptyActor,
147 description.to_string(),
148 self.tokio_cancellation_signal.clone(),
149 );
150 handle.future_spawner()
151 }
152}
153
154pub fn new_owned_future_spawner(description: &str) -> Box<dyn FutureSpawner> {
157 Box::new(OwnedFutureSpawner {
158 handle: spawn_tokio_actor(EmptyActor, description.to_string(), CancellationToken::new()),
159 })
160}
161
162pub fn new_owned_multithread_actor<A: Actor + Send + 'static>(
165 num_threads: usize,
166 make_actor_fn: impl Fn() -> A + Sync + Send + 'static,
167) -> MultithreadRuntimeHandle<A> {
168 let (cancellation_signal, cancellation_receiver) = crossbeam_channel::bounded::<()>(0);
169 spawn_multithread_actor(
170 num_threads,
171 make_actor_fn,
172 cancellation_receiver,
173 Some(cancellation_signal), )
175}
176
/// Future spawner that owns the tokio actor runtime it spawns onto.
struct OwnedFutureSpawner {
    /// Handle of the backing `EmptyActor`; used to obtain the real spawner
    /// and to stop the actor.
    handle: TokioRuntimeHandle<EmptyActor>,
}
180
181impl FutureSpawner for OwnedFutureSpawner {
182 fn spawn_boxed(&self, description: &'static str, f: crate::futures::BoxFuture<'static, ()>) {
183 self.handle.future_spawner().spawn_boxed(description, f);
184 }
185}
186
187impl Drop for OwnedFutureSpawner {
188 fn drop(&mut self) {
189 self.handle.stop();
190 }
191}
192
/// Global list of every `ActorSystem` created through `ActorSystem::new`.
/// `shutdown_all_actors` reads it; within this file entries are never removed.
static ACTOR_SYSTEMS: Mutex<Vec<ActorSystem>> = Mutex::new(Vec::new());
197
198pub fn shutdown_all_actors() {
201 {
202 let systems = ACTOR_SYSTEMS.lock();
203 if systems.len() > 1 {
204 panic!("shutdown_all_actors should not be used when there are multiple ActorSystems");
205 }
206 if let Some(system) = systems.first() {
207 system.stop();
208 }
209 }
210}