1use std::any::Any;
2use std::future::Future;
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Weak};
5
6use agner_utils::std_error_pp::StdErrorPP;
7use futures::{stream, Stream, StreamExt};
8use tokio::sync::{mpsc, oneshot, RwLock};
9use tracing::Instrument;
10
11use crate::actor::Actor;
12use crate::actor_id::ActorID;
13use crate::actor_runner::sys_msg::{ActorInfo, SysMsg};
14use crate::actor_runner::ActorRunner;
15use crate::exit::Exit;
16use crate::exit_handler::ExitHandler;
17use crate::spawn_opts::SpawnOpts;
18use crate::system_config::SystemConfig;
19
20mod actor_entry;
21mod sys_actor_entry;
22use actor_entry::ActorEntry;
23
24mod actor_id_pool;
25use actor_id_pool::ActorIDPool;
26
27mod errors;
28pub use errors::{SysChannelError, SysSpawnError};
29
30pub type ActorChannel<M> = mpsc::UnboundedSender<M>;
31
32#[derive(Debug, Clone)]
34pub struct System(Arc<Inner>);
35
36impl System {
37 pub fn rc_downgrade(&self) -> SystemWeakRef {
38 SystemWeakRef(Arc::downgrade(&self.0))
39 }
40}
41
42#[derive(Debug, Clone)]
43pub struct SystemWeakRef(Weak<Inner>);
44impl SystemWeakRef {
45 pub fn rc_upgrade(&self) -> Option<System> {
46 self.0.upgrade().map(System)
47 }
48}
49
50impl System {
51 pub fn new(config: SystemConfig) -> Self {
53 static NEXT_SYSTEM_ID: AtomicUsize = AtomicUsize::new(1);
54
55 let system_id = NEXT_SYSTEM_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
56
57 let actor_id_pool = ActorIDPool::new(system_id, config.max_actors);
58 let actor_entries =
59 (0..config.max_actors).map(|_| RwLock::new(Default::default())).collect();
60
61 let exit_handler = config.exit_handler.to_owned();
62
63 let inner = Inner { config, system_id, actor_id_pool, actor_entries, exit_handler };
64 Self(Arc::new(inner))
65 }
66
67 pub fn config(&self) -> &SystemConfig {
69 &self.0.config
70 }
71}
72
73impl System {
74 #[tracing::instrument(skip_all, fields(
95 sys_id = self.0.system_id,
96 behaviour = std::any::type_name::<Behaviour>(),
97 ))]
98 pub async fn spawn<Behaviour, Args, Message>(
99 &self,
100 behaviour: Behaviour,
101 args: Args,
102 mut spawn_opts: SpawnOpts,
103 ) -> Result<ActorID, SysSpawnError>
104 where
105 Args: Send + 'static,
106 Message: Unpin + Send + 'static,
107 for<'a> Behaviour: Actor<'a, Args, Message>,
108 {
109 let exit_handler =
110 spawn_opts.take_exit_handler().unwrap_or_else(|| self.0.exit_handler.to_owned());
111
112 let system = self.to_owned();
113 let actor_id_lease =
114 system.0.actor_id_pool.acquire_id().ok_or(SysSpawnError::MaxActorsLimit)?;
115 let actor_id = *actor_id_lease;
116
117 let (messages_tx, messages_rx) = mpsc::unbounded_channel::<Message>();
118 let (sys_msg_tx, sys_msg_rx) = mpsc::unbounded_channel();
119
120 let actor = ActorRunner {
121 actor_id,
122 system_opt: system.rc_downgrade(),
123 messages_rx,
124 sys_msg_rx,
125 sys_msg_tx: sys_msg_tx.to_owned(),
126 exit_handler,
127 spawn_opts,
128 };
129 tokio::spawn(actor.run(behaviour, args));
130
131 let entry = ActorEntry::new(actor_id_lease, messages_tx, sys_msg_tx);
132 self.actor_entry_put(entry).await;
136
137 Ok(actor_id)
138 }
139
140 #[tracing::instrument(skip_all, fields(
142 sys_id = self.0.system_id,
143 actor_id = display(actor_id),
144 exit_reason = display(exit_reason.pp())
145 ))]
146 pub async fn exit(&self, actor_id: ActorID, exit_reason: Exit) {
147 self.send_sys_msg(actor_id, SysMsg::SigExit(actor_id, exit_reason)).await;
148 }
149
150 pub fn wait(&self, actor_id: ActorID) -> impl Future<Output = Exit> {
154 let sys = self.clone();
155 async move {
156 let (tx, rx) = oneshot::channel();
157
158 if let Some(mut entry) = sys.actor_entry_write(actor_id).await {
159 entry.add_watch(tx);
160 } else {
161 tracing::warn!("attempt to install a watch before the ActorEntry is initialized [actor_id: {}]", actor_id);
162 }
163 rx.await.unwrap_or_else(|_| Exit::no_actor())
164 }.instrument(
165 tracing::span!(
166 tracing::Level::TRACE,
167 "System::wait",
168 sys_id = self.0.system_id,
169 actor_id = display(actor_id)
170 )
171 )
172 }
173
174 #[tracing::instrument(skip_all, fields(
180 sys_id = self.0.system_id,
181 to = display(to)
182 ))]
183 pub(crate) async fn send_sys_msg(&self, to: ActorID, sys_msg: SysMsg) -> bool {
184 tracing::trace!(
185 "[sys:{}] trying to send sys-msg [to: {}, sys-msg: {:?}]",
186 self.0.system_id,
187 to,
188 sys_msg
189 );
190
191 if let Some(entry) = self.actor_entry_read(to).await {
192 if entry.running_actor_id() == Some(to) {
193 if let Some(tx) = entry.sys_msg_tx() {
194 return tx.send(sys_msg).is_ok()
195 } else {
196 tracing::warn!("actor_entry is not occupied")
197 }
198 } else {
199 tracing::warn!("actor_id mismatch")
200 }
201 }
202 false
203 }
204
205 #[tracing::instrument(skip_all, fields(
207 sys_id = self.0.system_id,
208 to = display(to),
209 msg_type = std::any::type_name::<M>()
210 ))]
211 pub async fn send<M>(&self, to: ActorID, message: M)
212 where
213 M: Send + 'static,
214 {
215 tracing::trace!("trying to send message",);
216 if let Some(entry) = self.actor_entry_read(to).await {
217 if entry.running_actor_id() == Some(to) {
218 if let Some(tx) = entry.messages_tx::<M>() {
219 let _ = tx.send(message);
220 } else {
221 tracing::warn!("message-type mismatch or actor_entry is not occupied");
222 }
223 } else {
224 tracing::warn!("actor_id mismatch")
225 }
226 } else {
227 tracing::trace!("no actor_entry")
228 }
229 }
230
231 #[tracing::instrument(skip_all, fields(
237 sys_id = self.0.system_id,
238 to = display(to)
239 ))]
240 pub async fn channel<M>(&self, to: ActorID) -> Result<ActorChannel<M>, SysChannelError>
241 where
242 M: Send + 'static,
243 {
244 self.actor_entry_read(to)
245 .await
246 .ok_or(SysChannelError::NoActor)?
247 .messages_tx()
248 .cloned()
249 .ok_or(SysChannelError::InvalidMessageType)
250 }
251
252 #[tracing::instrument(skip_all, fields(
254 sys_id = self.0.system_id,
255 left = display(left),
256 right = display(right)
257 ))]
258 pub async fn link(&self, left: ActorID, right: ActorID) {
259 let left_accepted_sys_msg = self.send_sys_msg(left, SysMsg::Link(right)).await;
260 let right_accepted_sys_msg = self.send_sys_msg(right, SysMsg::Link(left)).await;
261
262 if !right_accepted_sys_msg {
263 self.send_sys_msg(left, SysMsg::SigExit(right, Exit::no_actor())).await;
264 }
265 if !left_accepted_sys_msg {
266 self.send_sys_msg(right, SysMsg::SigExit(left, Exit::no_actor())).await;
267 }
268 }
269
270 #[tracing::instrument(skip_all, fields(
274 sys_id = self.0.system_id,
275 actor_id = display(actor_id),
276 data_type = std::any::type_name::<D>()
277 ))]
278 pub async fn put_data<D: Any + Send + Sync + 'static>(&self, actor_id: ActorID, data: D) {
279 if let Some(mut actor_entry) = self.actor_entry_write(actor_id).await {
280 actor_entry.put_data(data);
281 }
282 }
283
284 #[tracing::instrument(skip_all, fields(
285 sys_id = self.0.system_id,
286 actor_id = display(actor_id),
287 data_type = std::any::type_name::<D>()
288 ))]
289 pub async fn get_data<D: Any + Clone>(&self, actor_id: ActorID) -> Option<D> {
290 self.actor_entry_read(actor_id)
291 .await
292 .and_then(|actor_entry| actor_entry.get_data().cloned())
293 }
294
295 #[tracing::instrument(skip_all, fields(
296 sys_id = self.0.system_id,
297 actor_id = display(actor_id),
298 data_type = std::any::type_name::<D>()
299 ))]
300 pub async fn take_data<D: Any>(&self, actor_id: ActorID) -> Option<D> {
301 self.actor_entry_write(actor_id)
302 .await
303 .and_then(|mut actor_entry| actor_entry.take_data())
304 }
305
306 pub fn all_actors(&self) -> impl Stream<Item = ActorID> + '_ {
307 stream::iter(&self.0.actor_entries[..])
308 .filter_map(|slot| async move { slot.read().await.running_actor_id() })
309 }
310
311 #[tracing::instrument(skip_all, fields(
312 sys_id = self.0.system_id,
313 actor_id = display(actor_id)
314 ))]
315 pub async fn actor_info(&self, actor_id: ActorID) -> Option<ActorInfo> {
316 let (tx, rx) = oneshot::channel();
317 self.send_sys_msg(actor_id, SysMsg::GetInfo(tx)).await;
318 rx.await.ok()
319 }
320}
321
322#[derive(Debug)]
323struct Inner {
324 config: SystemConfig,
325 system_id: usize,
326 actor_id_pool: ActorIDPool,
327 actor_entries: Box<[RwLock<ActorEntry>]>,
328 exit_handler: Arc<dyn ExitHandler>,
329}