near_async/
lib.rs

1pub use near_async_derive::{MultiSend, MultiSenderFrom};
2
3mod functional;
4pub mod futures;
5pub mod instrumentation;
6pub mod messaging;
7pub mod multithread;
8pub mod test_loop;
9pub mod tokio;
10
11use crate::futures::FutureSpawner;
12use crate::messaging::Actor;
13use crate::multithread::runtime_handle::{MultithreadRuntimeHandle, spawn_multithread_actor};
14use crate::tokio::TokioRuntimeHandle;
15use crate::tokio::runtime_handle::{TokioRuntimeBuilder, spawn_tokio_actor};
16pub use near_time as time;
17use parking_lot::Mutex;
18use std::any::type_name;
19use std::sync::Arc;
20use std::sync::atomic::AtomicU64;
21use tokio_util::sync::CancellationToken;
22
/// Counter shared by all messages; each message gets its own sequence number so that
/// individual messages can be told apart in logs.
static MESSAGE_SEQUENCE_NUM: AtomicU64 = AtomicU64::new(0);

/// Hands out the next message sequence number.
///
/// Returns the counter's value from before the increment, so the first call yields `0`.
pub(crate) fn next_message_sequence_num() -> u64 {
    use std::sync::atomic::Ordering;

    // A single atomic read-modify-write with `Relaxed` ordering.
    MESSAGE_SEQUENCE_NUM.fetch_add(1, Ordering::Relaxed)
}
29
// Cheap way of obtaining a type's name with its module path removed, for example
// near_chunks::shards_manager_actor::ShardsManagerActor -> ShardsManagerActor.
// Only the text after the last "::" is kept, so more complex types such as
// std::sync::Arc<std::sync::atomic::AtomicBool<...>> are not rendered faithfully.
// Trailing '>' characters are stripped so that "SpanWrapped<>" style wrappers yield the
// name of the wrapped type.
fn pretty_type_name<T>() -> &'static str {
    let full_name = type_name::<T>();
    let last_segment = match full_name.rsplit_once("::") {
        Some((_module_path, tail)) => tail,
        // No path separator at all: the name is already bare.
        None => full_name,
    };
    last_segment.trim_end_matches('>')
}
37
38/// Actor that doesn't handle any messages and does nothing. It's used to host a runtime that can
39/// run futures only.
40struct EmptyActor;
41impl Actor for EmptyActor {}
42
43/// Represents a collection of actors, so that they can be shutdown together.
44#[derive(Clone)]
45pub struct ActorSystem {
46    /// Cancellation token used to signal shutdown of Tokio runtimes spawned with this actor system.
47    tokio_cancellation_signal: CancellationToken,
48    /// Cancellation signal used to signal shutdown of multithread actors spawned with this actor
49    /// system. To send the cancellation signal, the sender is dropped, which causes the receivers
50    /// to error.
51    multithread_cancellation_signal: Arc<Mutex<Option<crossbeam_channel::Sender<()>>>>,
52    multithread_cancellation_receiver: crossbeam_channel::Receiver<()>,
53}
54
55impl ActorSystem {
56    pub fn new() -> Self {
57        let mut systems = ACTOR_SYSTEMS.lock();
58        let (multithread_cancellation_sender, multithread_cancellation_receiver) =
59            crossbeam_channel::bounded(0);
60        let ret = Self {
61            tokio_cancellation_signal: CancellationToken::new(),
62            multithread_cancellation_signal: Arc::new(Mutex::new(Some(
63                multithread_cancellation_sender,
64            ))),
65            multithread_cancellation_receiver,
66        };
67        systems.push(ret.clone());
68        ret
69    }
70
71    pub fn stop(&self) {
72        tracing::info!("Stopping all actors in ActorSystem");
73        self.tokio_cancellation_signal.cancel();
74        self.multithread_cancellation_signal.lock().take();
75    }
76
77    /// Spawns an actor in a single threaded Tokio runtime and returns a handle to it.
78    /// The handle can be used to get the sender and future spawner for the actor.
79    ///
80    /// ```rust, ignore
81    ///
82    /// struct MyActor;
83    ///
84    /// impl Actor for MyActor {}
85    ///
86    /// impl Handler<MyMessage> for MyActor {
87    ///     fn handle(&mut self, msg: MyMessage) {}
88    /// }
89    ///
90    /// // We can use the actor_handle to create senders and future spawners.
91    /// let actor_handle = actor_system.spawn_tokio_actor(MyActor);
92    ///
93    /// let sender: MyAdapter = actor_handle.sender();
94    /// let future_spawner = actor_handle.future_spawner();
95    /// ```
96    ///
97    /// The sender and future spawner can then be passed onto other components that need to send messages
98    /// to the actor or spawn futures in the runtime of the actor.
99    pub fn spawn_tokio_actor<A: messaging::Actor + Send + 'static>(
100        &self,
101        actor: A,
102    ) -> TokioRuntimeHandle<A> {
103        spawn_tokio_actor(
104            actor,
105            std::any::type_name::<A>().to_string(),
106            self.tokio_cancellation_signal.clone(),
107        )
108    }
109
110    /// A more granular way to build a tokio runtime. It allows spawning futures and getting a handle
111    /// before the actor is constructed (so that the actor can be constructed with the handle,
112    /// for sending messages to itself).
113    pub fn new_tokio_builder<A: messaging::Actor + Send + 'static>(
114        &self,
115    ) -> TokioRuntimeBuilder<A> {
116        TokioRuntimeBuilder::new(
117            pretty_type_name::<A>().to_string(),
118            self.tokio_cancellation_signal.clone(),
119        )
120    }
121
122    /// Spawns a multi-threaded actor which handles messages in a synchronous thread pool.
123    /// Used similarly to `spawn_tokio_actor`, but this actor is intended for CPU-bound tasks,
124    /// can run multiple threads, and does not support futures, timers, or delayed messages.
125    pub fn spawn_multithread_actor<A: messaging::Actor + Send + 'static>(
126        &self,
127        num_threads: usize,
128        make_actor_fn: impl Fn() -> A + Sync + Send + 'static,
129    ) -> MultithreadRuntimeHandle<A> {
130        spawn_multithread_actor(
131            num_threads,
132            make_actor_fn,
133            self.multithread_cancellation_receiver.clone(),
134            None,
135        )
136    }
137
138    /// Returns a future spawner for the actor system on an independent Tokio runtime.
139    /// Note: For typical actors, it is recommended we use the future spawner of the
140    /// actor instead.
141    ///
142    /// This is useful for keeping track of spawned futures and their lifetimes.
143    /// Behind the scenes, this builds a new EmptyActor each time.
144    pub fn new_future_spawner(&self, description: &str) -> Box<dyn FutureSpawner> {
145        let handle = spawn_tokio_actor(
146            EmptyActor,
147            description.to_string(),
148            self.tokio_cancellation_signal.clone(),
149        );
150        handle.future_spawner()
151    }
152}
153
154/// Spawns a future spawner that is NOT owned by any ActorSystem.
155/// Rather, the returned FutureSpawner, when dropped, will stop the runtime.
156pub fn new_owned_future_spawner(description: &str) -> Box<dyn FutureSpawner> {
157    Box::new(OwnedFutureSpawner {
158        handle: spawn_tokio_actor(EmptyActor, description.to_string(), CancellationToken::new()),
159    })
160}
161
162/// Spawns a multithreaded actor which is NOT owned by any ActorSystem.
163/// Rather, the returned handle, when dropped, will stop the actor and its runtime.
164pub fn new_owned_multithread_actor<A: Actor + Send + 'static>(
165    num_threads: usize,
166    make_actor_fn: impl Fn() -> A + Sync + Send + 'static,
167) -> MultithreadRuntimeHandle<A> {
168    let (cancellation_signal, cancellation_receiver) = crossbeam_channel::bounded::<()>(0);
169    spawn_multithread_actor(
170        num_threads,
171        make_actor_fn,
172        cancellation_receiver,
173        Some(cancellation_signal), // never cancelled
174    )
175}
176
177struct OwnedFutureSpawner {
178    handle: TokioRuntimeHandle<EmptyActor>,
179}
180
181impl FutureSpawner for OwnedFutureSpawner {
182    fn spawn_boxed(&self, description: &'static str, f: crate::futures::BoxFuture<'static, ()>) {
183        self.handle.future_spawner().spawn_boxed(description, f);
184    }
185}
186
187impl Drop for OwnedFutureSpawner {
188    fn drop(&mut self) {
189        self.handle.stop();
190    }
191}
192
193/// Used to determine whether shutdown_all_actors is being used properly. If there are multiple
194/// ActorSystems, shutdown_all_actors shall not be used, but instead the test needs to manage
195/// the shutdown of each ActorSystem individually.
196static ACTOR_SYSTEMS: Mutex<Vec<ActorSystem>> = Mutex::new(Vec::new());
197
198/// Shutdown all actors, assuming at most one ActorSystem.
199/// TODO(#14005): Ideally, shutting down actors should not be done by calling a global function.
200pub fn shutdown_all_actors() {
201    {
202        let systems = ACTOR_SYSTEMS.lock();
203        if systems.len() > 1 {
204            panic!("shutdown_all_actors should not be used when there are multiple ActorSystems");
205        }
206        if let Some(system) = systems.first() {
207            system.stop();
208        }
209    }
210}