1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
use super::*;
use crate::{
    component::Handled,
    messaging::{DispatchEnvelope, MsgEnvelope, NetMessage, UnpackError},
    prelude::NetworkStatusPort,
};
use std::{
    fmt,
    sync::{Arc, Weak},
};

mod paths;
mod refs;
pub use paths::*;
pub use refs::*;

/// Shorthand for the bounds every message type has to satisfy
///
/// This is a "trait alias" workaround: writing `M: MessageBounds` stands in for
/// `M: Debug + Send + Sized + 'static`, so these bounds don't have to be
/// repeated at every use site.
pub trait MessageBounds: fmt::Debug + Send + Sized + 'static {
    // Intentionally empty: the trait only bundles its supertraits
}
/// Blanket implementation: every `Debug + Send + 'static` type is a valid message
impl<M: fmt::Debug + Send + 'static> MessageBounds for M {
    // The trait has no items, so there is nothing to provide here
}

/// The base trait for all actors
///
/// This trait handles raw message envelopes, without any unpacking
/// or other convenient syntactic sugars.
///
/// Usually it's better to use the unwrapped functions in [Actor](Actor),
/// but this can be more efficient at times, for example
/// when the content of the envelope isn't actually accessed in the component.
///
/// Every type that implements [Actor](Actor) gets an implementation of this trait
/// for free, which forwards local messages to `receive_local` and
/// network messages to `receive_network`.
///
/// # Example
///
/// ```
/// use kompact::prelude::*;
///
/// #[derive(ComponentDefinition)]
/// struct RawActor {
///    ctx: ComponentContext<Self>
/// }
/// ignore_lifecycle!(RawActor);
/// impl ActorRaw for RawActor {
///     type Message = ();
///
///     fn receive(&mut self, env: MsgEnvelope<Self::Message>) -> Handled {
///         match env {
///            MsgEnvelope::Typed(m) => info!(self.log(), "Got a local message: {:?}", m),
///            MsgEnvelope::Net(nm) => info!(self.log(), "Got a network message: {:?}", nm),
///         }
///         Handled::Ok
///     }
/// }
/// ```
pub trait ActorRaw {
    /// The type of local messages the actor accepts
    type Message: MessageBounds;

    /// Handle an incoming message
    ///
    /// Incoming messages can either be local, in which case they are of type
    /// `Self::Message` (wrapped into a [MsgEnvelope](MsgEnvelope::Typed)),
    /// or they are coming from the network, in which case they are of type
    /// [NetMessage](NetMessage) (again wrapped into a [MsgEnvelope](MsgEnvelope::Net)).
    ///
    /// # Note
    ///
    /// Remember that components usually run on a shared thread pool,
    /// so, just like for [handle](Provide::handle) implementations,
    /// you shouldn't ever block in this method unless you know what you are doing.
    fn receive(&mut self, env: MsgEnvelope<Self::Message>) -> Handled;
}

/// A slightly higher level Actor API that handles both local and networked messages
///
/// This trait should generally be preferred over [ActorRaw](ActorRaw), as it abstracts
/// away the message envelope enum used internally.
///
/// Implementing this trait also provides [ActorRaw](ActorRaw) via a blanket implementation,
/// and types implementing [NetworkActor](NetworkActor) receive an implementation of
/// this trait automatically.
///
/// Actor can also be derived via `#[derive(Actor)]` for components which don't require
/// this messaging mechanism.
///
/// # Example
///
/// ```
/// use kompact::prelude::*;
///
/// #[derive(ComponentDefinition)]
/// struct NormalActor {
///    ctx: ComponentContext<Self>
/// }
/// ignore_lifecycle!(NormalActor);
/// impl Actor for NormalActor {
///     type Message = ();
///
///     fn receive_local(&mut self, msg: Self::Message) -> Handled {
///         info!(self.log(), "Got a local message: {:?}", msg);
///         Handled::Ok
///     }
///
///     fn receive_network(&mut self, msg: NetMessage) -> Handled {
///         info!(self.log(), "Got a network message: {:?}", msg);
///         Handled::Ok
///     }
/// }
/// ```
pub trait Actor {
    /// The type of local messages the actor accepts
    type Message: MessageBounds;

    /// Handle an incoming local message
    ///
    /// Local messages are of type `Self::Message`.
    ///
    /// # Note
    ///
    /// Remember that components usually run on a shared thread pool,
    /// so, just like for [handle](Provide::handle) implementations,
    /// you shouldn't ever block in this method unless you know what you are doing.
    fn receive_local(&mut self, msg: Self::Message) -> Handled;

    /// Handle an incoming network message
    ///
    /// Network messages are of type [NetMessage](NetMessage) and
    /// can either be serialised data, or a heap-allocated "reflected"
    /// message. Messages are "reflected" instead of serialised whenever possible
    /// for messages sent to an [ActorPath](ActorPath) that turned out to be in the same
    /// [KompactSystem](KompactSystem).
    ///
    /// # Note
    ///
    /// Remember that components usually run on a shared thread pool,
    /// so, just like for [handle](Provide::handle) implementations,
    /// you shouldn't ever block in this method unless you know what you are doing.
    fn receive_network(&mut self, msg: NetMessage) -> Handled;
}

/// A dispatcher is a system component that knows how to route messages and create system paths
///
/// If you need a custom networking implementation, it must implement `Dispatcher`
/// to allow messages to be routed correctly to channels, for example.
///
/// Every dispatcher is also an [ActorRaw](ActorRaw) whose local message type is
/// [DispatchEnvelope](DispatchEnvelope).
///
/// See [NetworkDispatcher](crate::prelude::NetworkDispatcher) for the provided networking dispatcher solution.
pub trait Dispatcher: ActorRaw<Message = DispatchEnvelope> {
    /// Returns the system path for this dispatcher
    fn system_path(&mut self) -> SystemPath;
    /// Returns a mutable reference to the dispatcher's provided [NetworkStatusPort](NetworkStatusPort)
    ///
    /// Can be left unimplemented in systems where the `NetworkStatusPort` is not used.
    fn network_status_port(&mut self) -> &mut ProvidedPort<NetworkStatusPort>;
}

impl<A, M: MessageBounds> ActorRaw for A
where
    A: Actor<Message = M>,
{
    type Message = M;

    fn receive(&mut self, env: MsgEnvelope<M>) -> Handled {
        match env {
            MsgEnvelope::Typed(m) => self.receive_local(m),
            MsgEnvelope::Net(nm) => self.receive_network(nm),
        }
    }
}

/// A trait for things that have associated [actor references](ActorRef)
pub trait ActorRefFactory {
    /// The type of messages carried by references produced by this factory
    type Message: MessageBounds;

    /// Returns the associated actor reference
    ///
    /// The returned reference is typed with this factory's `Self::Message`.
    fn actor_ref(&self) -> ActorRef<Self::Message>;
}

/// A trait for things that can deal with [network messages](NetMessage)
///
/// Unlike [ActorRefFactory](ActorRefFactory), the produced reference carries
/// no message type parameter.
pub trait DynActorRefFactory {
    /// Returns a version of an actor ref that can only be used for [network messages](NetMessage),
    /// but not for typed messages
    fn dyn_ref(&self) -> DynActorRef;
}

/// A trait for accessing [dispatcher references](DispatcherRef)
///
/// Implemented by things that can hand out a reference to a dispatcher.
pub trait Dispatching {
    /// Returns the associated dispatcher reference
    fn dispatcher_ref(&self) -> DispatcherRef;
}

/// An actor API for components that treat local and remote messages alike
///
/// Implementors handle a single message type, no matter whether it was sent locally
/// or arrived over the network. This roughly matches the default assumptions of most
/// actor systems with a distributed runtime (Erlang, Akka, Orleans, etc.).
///
/// Any type implementing this trait automatically implements [Actor](Actor):
/// local messages are passed on directly, while network messages are first unpacked
/// with `Self::Deserialiser`.
///
/// # Example
///
/// This implementation uses the trivial default [Deserialiser](Deserialiser)
/// for the unit type `()` from the `serialisation::default_serialisers` module
/// (which is imported by default with the prelude).
///
/// ```
/// use kompact::prelude::*;
///
/// #[derive(ComponentDefinition)]
/// struct NetActor {
///    ctx: ComponentContext<Self>
/// }
/// ignore_lifecycle!(NetActor);
/// impl NetworkActor for NetActor {
///     type Message = ();
///     type Deserialiser = ();
///
///     fn receive(&mut self, sender: Option<ActorPath>, msg: Self::Message) -> Handled {
///         info!(self.log(), "Got a local or deserialised remote message: {:?}", msg);
///         Handled::Ok
///     }
/// }
/// ```
pub trait NetworkActor: ComponentLogging {
    /// The single message type this actor handles, locally and remotely
    type Message: MessageBounds;
    /// The deserialiser that turns [network messages](NetMessage)
    /// into `Self::Message`
    type Deserialiser: Deserialiser<Self::Message>;

    /// Handles every message, after any necessary deserialisation
    ///
    /// `sender` is `Some` only if the message originally arrived as a
    /// [NetMessage](crate::messaging::NetMessage); for local messages it is `None`.
    ///
    /// `msg` is always of type `Self::Message`.
    ///
    /// # Note
    ///
    /// Remember that components usually run on a shared thread pool,
    /// so, just like for [handle](Provide::handle) implementations,
    /// you shouldn't ever block in this method unless you know what you are doing.
    fn receive(&mut self, sender: Option<ActorPath>, msg: Self::Message) -> Handled;

    /// Handles a network message that could not be unpacked
    ///
    /// Unless overridden, the failure is logged as a warning, together with
    /// the id of `Self::Deserialiser`, and the message is considered handled.
    fn on_error(&mut self, error: UnpackError<NetMessage>) -> Handled {
        // Identify the deserialiser that rejected the message in the log output
        let deserialiser_id = Self::Deserialiser::SER_ID;
        warn!(
            self.log(),
            "Could not deserialise a message with Deserialiser with id={}. Error was: {:?}",
            deserialiser_id,
            error
        );
        Handled::Ok
    }
}

impl<A, M, D> Actor for A
where
    M: MessageBounds,
    D: Deserialiser<M>,
    A: NetworkActor<Message = M, Deserialiser = D>,
{
    type Message = M;

    #[inline(always)]
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        self.receive(None, msg)
    }

    #[inline(always)]
    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        match msg.try_into_deserialised::<_, <Self as NetworkActor>::Deserialiser>() {
            Ok(m) => self.receive(Some(m.sender), m.content),
            Err(e) => self.on_error(e),
        }
    }
}