1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
use alloc::sync::Arc;
use maitake_sync::RwLock;
use slacktor::Slacktor;
use crate::{Actor, ActorContext, ActorWrapper, Delegate, Handler, Identifier, IndeterminateMessage, LocalRef, MessageSender};
use alloc::string::String;
use alloc::collections::BTreeMap;
/// # [`Fluxion`]
/// Contains the core actor management functionality of fluxion.
///
/// Every field is stored behind an [`Arc`], so the individual pieces of state
/// can be shared between several handles to the same system.
/// `D` is the type of the foreign delegate held by the system.
pub struct Fluxion<D> {
/// The underlying slacktor instance.
/// This is wrapped in an [`Arc`] and [`RwLock`] to allow concurrent access from different tasks.
/// The [`RwLock`] is used instead of a mutex because it can be assumed that actor references
/// will be retrieved more often than actors are created.
slacktor: Arc<RwLock<Slacktor>>,
/// A mapping of string actor names to their slacktor ids.
/// Guarded by its own [`RwLock`], independent of the one around the slacktor instance.
actor_ids: Arc<RwLock<BTreeMap<String, u64>>>,
/// The identifier of this system as a string.
system_id: Arc<str>,
/// The foreign delegate of this system.
delegate: Arc<D>,
}
impl<D> Clone for Fluxion<D> {
fn clone(&self) -> Self {
Self { slacktor: self.slacktor.clone(), system_id: self.system_id.clone(), delegate: self.delegate.clone(), actor_ids: self.actor_ids.clone() }
}
}
impl<D: Delegate> Fluxion<D> {
/// # [`Fluxion::new`]
/// Builds a fresh [`Fluxion`] system identified by `id` and backed by `delegate`.
///
/// The system starts with a new slacktor instance and an empty name map.
#[must_use]
pub fn new(id: &str, delegate: D) -> Self {
    // Each piece of state gets its own `Arc` so that clones of the system share it.
    let slacktor = Arc::new(RwLock::new(Slacktor::new()));
    let actor_ids = Arc::default();
    let system_id = Arc::from(id);
    let delegate = Arc::new(delegate);
    Self { slacktor, actor_ids, system_id, delegate }
}
/// # [`Fluxion::get_delegate`]
/// Borrows the delegate stored in this system.
#[must_use]
pub fn get_delegate(&self) -> &D {
    // Explicitly look through the `Arc` to hand out a plain `&D`.
    &*self.delegate
}
/// # [`Fluxion::get_id`]
/// Borrows the identifier this system was created with.
#[must_use]
pub fn get_id(&self) -> &str {
    // Explicitly look through the `Arc<str>` to hand out a plain `&str`.
    &*self.system_id
}
/// # [`Fluxion::get_actor_id`]
/// Retrieves an actor's id by its name.
///
/// Takes the name map's lock as read and returns `None` when no id is
/// registered under `name`.
#[must_use]
pub async fn get_actor_id(&self, name: &str) -> Option<u64> {
    let names = self.actor_ids.read().await;
    names.get(name).copied()
}
/// # [`Fluxion::add_named`]
/// Adds an actor to the local instance and registers `name` for it, so that the
/// returned id can later be looked up with [`Fluxion::get_actor_id`].
/// This is handy when using actors with static names on a foreign system.
/// <div class = "info">
/// Locks the underlying RwLock as write. This will block "management" functionalities such as adding, removing, and retrieving actors, but
/// will not block any messages.
/// </div>
/// <div class = "warn">
/// If an actor with a duplicate name is added, it will overwrite the original actor's name.
/// The original actor won't be killed, but it may become inaccessible.
/// </div>
///
/// # Errors
/// Returns an error if the actor failed to initialize.
/// On an error, the actor will not be spawned, and the name will not be assigned.
pub async fn add_named<A: Actor>(&self, name: &str, actor: A) -> Result<u64, A::Error> {
    // Spawn first: a failed initialization returns here, before the name map is touched.
    let id = self.add(actor).await?;
    // Record (or overwrite) the name -> id association under the name map's write lock.
    self.actor_ids.write().await.insert(name.into(), id);
    Ok(id)
}
/// # [`Fluxion::add`]
/// Initializes `actor` and spawns it on the local instance, returning the id it was given.
/// <div class = "info">
/// Locks the underlying RwLock as write. This will block "management" functionalities such as adding, removing, and retrieving actors, but
/// will not block any messages.
/// </div>
///
/// # Errors
/// Returns an error if the actor failed to initialize.
/// On an error, the actor will not be spawned.
pub async fn add<A: Actor>(&self, mut actor: A) -> Result<u64, A::Error> {
    // Initialization runs before the write lock is requested; a failure returns
    // here without touching the slacktor instance.
    actor.initialize().await?;

    let mut slacktor = self.slacktor.write().await;

    // The context gives the actor a handle to this system and the id
    // reported by `next_id` at this point.
    let context = Arc::new(ActorContext {
        system: self.clone(),
        id: slacktor.next_id(),
    });

    // Spawn the wrapped actor and report the id slacktor returned for it.
    let slot = slacktor.spawn(ActorWrapper(actor, context));
    Ok(slot as u64)
}
/// # [`Fluxion::kill`]
/// Given an actor's id, kills the actor and forgets every name registered for that id.
///
/// Names are removed so that a later [`Fluxion::get_actor_id`] lookup cannot return
/// an id that no longer refers to the actor the name was registered for.
///
/// <div class = "info">
/// Locks the underlying RwLock as write. This will block "management" functionalities such as adding, removing, and retrieving actors, but
/// will not block any messages. The name map is also locked as write while stale names are removed.
/// </div>
pub async fn kill<A: Actor>(&self, id: u64) {
    // Realistically, it should not be possible for this conversion to ever fail.
    // If the input id is more than usize::MAX, it is most likely an error on the caller's part,
    // as it should be impossible to allocate over usize::MAX actors at all, because
    // each actor has an overhead of more than one byte.
    // We just fail silently here, as it is the same case as the actor not existing.
    let Ok(slot) = id.try_into() else {
        return;
    };

    // Take the write lock once and keep it for both the kill and the shrink, so no
    // other management operation can run between the two steps.
    let mut system = self.slacktor.write().await;
    system.kill::<ActorWrapper<A, D>>(slot).await;
    system.shrink();

    // Drop every name that still points at this id. This is done while the slacktor
    // guard is still held, so the id cannot be handed to a newly added actor before
    // the stale names are gone.
    // NOTE(review): presumes slacktor may reuse freed ids, and removes the names even
    // if slacktor did not actually kill anything (e.g. `A` is the wrong type) — confirm.
    self.actor_ids
        .write()
        .await
        .retain(|_, named_id| *named_id != id);
}
/// # [`Fluxion::get_local`]
/// Looks up an actor that is known to reside on the local system.
/// This allows messages that are not serializable to still be used even if Fluxion is compiled with foreign message support.
/// The returned [`LocalRef`] is also a handle capable of sending multiple different messages.
///
/// Returns `None` if `id` cannot be converted to the index type slacktor expects,
/// or if slacktor yields no handle of type `A` for it.
pub async fn get_local<A: Actor>(&self, id: u64) -> Option<LocalRef<A, D>> {
    // Management lock taken as read: lookups do not exclude each other.
    let system = self.slacktor.read().await;
    // A failed conversion means the id overflowed, so the actor does not exist.
    let handle = system
        .get::<ActorWrapper<A, D>>(id.try_into().ok()?)
        .cloned()?;
    // Pair the cloned handle with the id it was retrieved under.
    Some(LocalRef(handle, id))
}
/// # [`Fluxion::get`]
/// Retrieves an actor reference capable of communicating using the given message via the given ID.
///
/// Local ids are resolved directly and local names are first translated to an id
/// with [`Fluxion::get_actor_id`]. With the `foreign` feature enabled, any other
/// identifier is forwarded to the delegate.
/// Returns `None` if the name is unknown or no matching local actor is found.
#[cfg(feature = "serde")]
pub async fn get<'a, A: Handler<M>, M: IndeterminateMessage>(&self,
    #[cfg(feature="foreign")] id: impl Into<Identifier<'a>>,
    #[cfg(not(feature="foreign"))] id: impl Into<Identifier>
) -> Option<Arc<dyn MessageSender<M>>>
where M::Result: serde::Serialize + for<'d> serde::Deserialize<'d> {
    // Reduce both local forms of the identifier to a plain local id.
    let local_id = match id.into() {
        Identifier::Local(local_id) => local_id,
        Identifier::LocalNamed(name) => self.get_actor_id(name).await?,
        #[cfg(feature = "foreign")]
        foreign => {
            // Not a local identifier: send the request on to the delegate.
            return self.delegate.get_actor::<A, M>(foreign).await;
        },
    };

    // Fetch the local ref and erase it behind the message-sender trait object.
    let local = self.get_local::<A>(local_id).await?;
    Some(Arc::new(local) as Arc<dyn MessageSender<M>>)
}
/// # [`Fluxion::get`]
/// Retrieves an actor reference capable of communicating using the given message via the given ID.
///
/// Local ids are resolved directly and local names are first translated to an id
/// with [`Fluxion::get_actor_id`]. With the `foreign` feature enabled, any other
/// identifier is forwarded to the delegate.
/// Returns `None` if the name is unknown or no matching local actor is found.
#[cfg(not(feature = "serde"))]
pub async fn get<'a, A: Handler<M>, M: IndeterminateMessage>(&self,
    id: impl Into<Identifier<'a>>,
) -> Option<Arc<dyn MessageSender<M>>> {
    // Reduce both local forms of the identifier to a plain local id.
    let local_id = match id.into() {
        Identifier::Local(local_id) => local_id,
        Identifier::LocalNamed(name) => self.get_actor_id(name).await?,
        #[cfg(feature = "foreign")]
        foreign => {
            // Not a local identifier: send the request on to the delegate.
            return self.delegate.get_actor::<A, M>(foreign).await;
        },
    };

    // Fetch the local ref and erase it behind the message-sender trait object.
    let local = self.get_local::<A>(local_id).await?;
    Some(Arc::new(local) as Arc<dyn MessageSender<M>>)
}
/// # [`Fluxion::shutdown`]
/// Removes all actors from the system and deallocates the underlying slab.
/// Every registered actor name is forgotten as well, so that
/// [`Fluxion::get_actor_id`] does not keep returning ids of actors that were shut down.
///
/// <div class = "info">
/// Locks the underlying RwLock as write. This will block "management" functionalities such as adding, removing, and retrieving actors, but
/// will not block any messages. The name map is also locked as write while it is cleared.
/// </div>
pub async fn shutdown(&self) {
    // Hold the management lock for the whole operation so that no actor can be
    // added between shutting the actors down and clearing their names.
    let mut system = self.slacktor.write().await;
    system.shutdown().await;

    // All actors are gone, so every name -> id association is now stale.
    self.actor_ids.write().await.clear();
}
}