// Message protocol types: mailbox envelopes, actor/system message enums and their conversions into `ActorMsg`.
use std::fmt::Debug;
use std::io::ErrorKind;

use bytes::Bytes;

use crate::actor::{ActorRef, BoxActorProd};
use crate::actors::{Evt, Topic};
use crate::system::{LogEntry, IoType, IoAddress};

/// Item stored in an actor's mailbox: either a regular actor message or a
/// system message, each wrapped in its own envelope type.
#[derive(Clone, Debug)]
#[doc(hidden)]
pub enum Enqueued<T: Message> {
    /// A regular actor message wrapped in an `Envelope`.
    ActorMsg(Envelope<T>),
    /// A system message wrapped in a `SystemEnvelope`.
    SystemMsg(SystemEnvelope<T>),
}

/// Wraps message and sender
///
/// Pairs an `ActorMsg` with the optional `ActorRef` of the actor that sent it.
#[derive(Debug, Clone)]
pub struct Envelope<T: Message> {
    /// Sender of the message, if there is one.
    pub sender: Option<ActorRef<T>>,
    /// The wrapped actor message.
    pub msg: ActorMsg<T>,
}

/// Wraps system message and sender
///
/// Pairs a `SystemMsg` with the optional `ActorRef` of the actor that sent it.
#[derive(Clone, Debug)]
pub struct SystemEnvelope<T: Message> {
    /// Sender of the system message, if there is one.
    pub sender: Option<ActorRef<T>>,
    /// The wrapped system message.
    pub msg: SystemMsg<T>,
}

/// Standard message type. All actor messages are `ActorMsg`
///
/// User-defined messages travel in the `User` variant; the remaining variants
/// carry the built-in channel, IO, event sourcing, CQRS, identification,
/// dead letter and tick messages.
#[derive(Debug, Clone)]
pub enum ActorMsg<Msg: Message> {
    /// User message type
    User(Msg),

    /// Channel messages
    Channel(ChannelMsg<Msg>),

    /// IO messages (IoManager)
    IO(IOMsg<Msg>),

    /// Event sourcing messages
    ES(ESMsg<Msg>),

    /// CQRS messages
    CQ(CQMsg<Msg>),

    /// Request actor info
    Identify,

    /// Response to Identify
    Info(Info),

    /// Dead letter messages (the `DeadLetter` payload is boxed)
    DeadLetter(Box<DeadLetter<Msg>>),

    /// A utility message for user to schedule actor execution
    Tick,

    // ...
}

/// Channel messages: publishing to a channel and managing topic subscriptions.
///
/// Converts into `ActorMsg::Channel`.
#[derive(Debug, Clone)]
pub enum ChannelMsg<Msg: Message> {
    /// Publish message
    Publish(Topic, Msg),

    /// Publish system event
    PublishEvent(SystemEvent<Msg>),

    /// Publish dead letter
    PublishDeadLetter(Box<DeadLetter<Msg>>),

    /// Subscribe given `ActorRef` to a topic on a channel
    Subscribe(Topic, ActorRef<Msg>),

    /// Unsubscribe the given `ActorRef` from a topic on a channel
    Unsubscribe(Topic, ActorRef<Msg>),

    /// Unsubscribe the given `ActorRef` from all topics on a channel
    UnsubscribeAll(ActorRef<Msg>),
}

/// Wraps a `ChannelMsg` in `ActorMsg::Channel`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `ChannelMsg<Msg>`, and
/// `ActorMsg::from(..)` becomes available as well.
impl<Msg: Message> From<ChannelMsg<Msg>> for ActorMsg<Msg> {
    fn from(channel_msg: ChannelMsg<Msg>) -> Self {
        ActorMsg::Channel(channel_msg)
    }
}

/// IO messages (IoManager).
///
/// Covers both requests (e.g. `Bind`, `Connect`, `Write`) and the
/// notifications received in response (e.g. `Bound`, `Connected`,
/// `Received`, `Failed`). Converts into `ActorMsg::IO`.
#[derive(Debug, Clone)]
pub enum IOMsg<Msg: Message> {
    /// Register a connection manager for the given `IoType`
    Manage(IoType, BoxActorProd<Msg>),

    /// Bind on an IO type, e.g. TCP Socket
    Bind(IoType, IoAddress, ActorRef<Msg>),

    /// Received when an IO type is bound, e.g. TCP Socket
    Bound(IoAddress),

    /// Unbind an IO type, e.g. TCP Socket
    Unbind,

    /// Received when an IO type is unbound
    Unbound(IoAddress),

    /// Connect to an `IoAddress`, e.g. TCP/IP Address
    Connect(IoType, IoAddress),

    /// Received when an IO type is connected, e.g. TCP/IP Address
    Connected(IoAddress, IoAddress),

    /// Register given actor to receive data on a connected `IoAddress`
    Register(ActorRef<Msg>),

    /// Close the IO resource, e.g. disconnect from TCP/IP Address
    Close,

    /// Received when an IO resource is closed
    Closed,

    /// IO resource is ready to read or write
    Ready,

    /// Write given bytes to IO resource
    Write(Bytes),

    /// Currently not used
    Send(IoType),

    /// NOTE(review): purpose is undocumented; presumably asks for a read
    /// attempt on the IO resource — confirm against the IO manager.
    TryRead,

    /// Received when IO resource reads bytes
    Received(Bytes),

    /// Flush any cached data
    Flush,

    /// Received when an IO operation failed, e.g. `Bind` and `Connect`
    Failed(ErrorKind),
}

/// Wraps an `IOMsg` in `ActorMsg::IO`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `IOMsg<Msg>`.
impl<Msg: Message> From<IOMsg<Msg>> for ActorMsg<Msg> {
    fn from(io_msg: IOMsg<Msg>) -> Self {
        ActorMsg::IO(io_msg)
    }
}

/// Event sourcing messages: persisting events to, and loading them from, the
/// event store. Converts into `ActorMsg::ES`.
#[derive(Debug, Clone)]
pub enum ESMsg<Msg: Message> {
    /// Persist given Evt to the event store. (Event to store, Unique ID, Keyspace, Optional Sender)
    Persist(Evt<Msg>, String, String, Option<ActorRef<Msg>>),

    /// Load all events from the event store. (Unique ID, Keyspace)
    Load(String, String),

    /// Received when loading events
    LoadResult(Vec<Msg>),
}

/// Wraps an `ESMsg` in `ActorMsg::ES`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `ESMsg<Msg>`.
impl<Msg: Message> From<ESMsg<Msg>> for ActorMsg<Msg> {
    fn from(es_msg: ESMsg<Msg>) -> Self {
        ActorMsg::ES(es_msg)
    }
}

/// CQRS messages. Converts into `ActorMsg::CQ`.
#[derive(Clone, Debug)]
pub enum CQMsg<Msg: Message> {
    /// CQRS command message
    ///
    /// NOTE(review): the meaning of the `String` is not documented here;
    /// presumably it identifies the target entity — confirm against the
    /// CQRS handler.
    Cmd(String, Msg),
}

/// Wraps a `CQMsg` in `ActorMsg::CQ`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `CQMsg<Msg>`.
impl<Msg: Message> From<CQMsg<Msg>> for ActorMsg<Msg> {
    fn from(cq_msg: CQMsg<Msg>) -> Self {
        ActorMsg::CQ(cq_msg)
    }
}

/// Message type to request actor info
///
/// A field-less marker that converts into `ActorMsg::Identify`. Derives
/// `Debug` and `Clone` like the other message types in this module so it can
/// be logged and duplicated.
#[derive(Debug, Clone)]
pub struct Identify;

/// Converts the `Identify` marker into `ActorMsg::Identify`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `Identify`.
impl<Msg: Message> From<Identify> for ActorMsg<Msg> {
    fn from(_: Identify) -> Self {
        ActorMsg::Identify
    }
}

/// Message type received in response to `Identify`
///
/// Currently a field-less placeholder: this will be expanded to a full struct,
/// later, containing stats on the actor.
#[derive(Clone, Debug)]
pub struct Info;

/// Wraps an `Info` in `ActorMsg::Info`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `Info`.
impl<Msg: Message> From<Info> for ActorMsg<Msg> {
    fn from(info: Info) -> Self {
        ActorMsg::Info(info)
    }
}

/// System-level messages, carried inside a `SystemEnvelope`.
#[derive(Clone, Debug)]
pub enum SystemMsg<Msg: Message> {
    /// NOTE(review): presumably delivered to initialize a newly created
    /// actor — confirm.
    ActorInit,
    /// A command for the actor (`ActorCmd::Stop` or `ActorCmd::Restart`).
    ActorCmd(ActorCmd),
    /// A system event (actor terminated, restarted or created).
    Event(SystemEvent<Msg>),
    /// Carries an `ActorRef`.
    /// NOTE(review): presumably reports that the referenced actor failed —
    /// confirm.
    Failed(ActorRef<Msg>),
    /// Carries a message and an optional `ActorRef`.
    /// NOTE(review): presumably the confirmation for `ESMsg::Persist`, with
    /// the persisted message and the original sender — confirm.
    Persisted(Msg, Option<ActorRef<Msg>>),
    /// Carries a list of messages.
    /// NOTE(review): presumably previously persisted events to be
    /// re-applied — confirm.
    Replay(Vec<Msg>),
    /// Carries a `LogEntry`.
    Log(LogEntry),
}

/// Command carried by `SystemMsg::ActorCmd`.
#[derive(Debug, Clone)]
pub enum ActorCmd {
    /// Stop command.
    Stop,
    /// Restart command.
    Restart,
}

/// Actor lifecycle events; each variant carries the `ActorRef` of the actor
/// the event is about.
#[derive(Clone, Debug)]
pub enum SystemEvent<Msg: Message> {
    /// An actor was terminated
    ActorTerminated(ActorRef<Msg>),

    /// An actor was restarted
    ActorRestarted(ActorRef<Msg>),

    /// An actor was started
    ActorCreated(ActorRef<Msg>),
}

/// Field-less counterpart of `SystemEvent`: names the kind of event without
/// carrying an `ActorRef`.
#[derive(Debug, Clone)]
pub enum SystemEventType {
    /// Kind corresponding to `SystemEvent::ActorTerminated`.
    ActorTerminated,
    /// Kind corresponding to `SystemEvent::ActorRestarted`.
    ActorRestarted,
    /// Kind corresponding to `SystemEvent::ActorCreated`.
    ActorCreated,
}

/// A dead letter: an `ActorMsg` bundled with string identifiers of its sender
/// and recipient. Converts into `ActorMsg::DeadLetter` (boxed).
#[derive(Clone, Debug)]
pub struct DeadLetter<Msg: Message> {
    /// The message itself.
    pub msg: ActorMsg<Msg>,
    /// Identifies the sender. NOTE(review): presumably an actor path/URI — confirm.
    pub sender: String,
    /// Identifies the recipient. NOTE(review): presumably an actor path/URI — confirm.
    pub recipient: String,
}

/// Boxes a `DeadLetter` and wraps it in `ActorMsg::DeadLetter`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<Msg>>` for `DeadLetter<Msg>`.
impl<Msg: Message> From<DeadLetter<Msg>> for ActorMsg<Msg> {
    fn from(dead_letter: DeadLetter<Msg>) -> Self {
        ActorMsg::DeadLetter(Box::new(dead_letter))
    }
}

// Conversions into `ActorMsg` for common types (String, &str, u32).

/// Wraps a `String` in `ActorMsg::User`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<String>>` for `String`, which is what
/// the `Message` bound requires.
impl From<String> for ActorMsg<String> {
    fn from(text: String) -> Self {
        ActorMsg::User(text)
    }
}

/// Copies a string slice into an owned `String` and wraps it in
/// `ActorMsg::User`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<String>>` for `&str`.
impl<'a> From<&'a str> for ActorMsg<String> {
    fn from(text: &'a str) -> Self {
        ActorMsg::User(text.to_owned())
    }
}

/// Wraps a `u32` in `ActorMsg::User`.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<ActorMsg<u32>>` for `u32`, which is what the
/// `Message` bound requires.
impl From<u32> for ActorMsg<u32> {
    fn from(value: u32) -> Self {
        ActorMsg::User(value)
    }
}

// SAFETY: NOTE(review): no justification is recorded for this impl. `T: Message`
// already requires `T: Send`, so this presumably exists because `ActorRef<T>`
// or some `ActorMsg<T>` payload is not automatically `Send` — confirm that
// every field reachable from `Envelope` is in fact safe to move across threads.
unsafe impl<T: Message> Send for Envelope<T> {}

/// Bound alias for types usable as actor messages.
///
/// Has no methods of its own; it bundles the requirements
/// `Debug + Clone + Send + Into<ActorMsg<Self>> + 'static`. It is implemented
/// automatically for every type that satisfies them.
pub trait Message: Debug + Clone + Send + Into<ActorMsg<Self>> + 'static {}
/// Blanket implementation: every type that meets the `Message` bounds is a
/// `Message`.
impl<T> Message for T
where
    T: Debug + Clone + Send + Into<ActorMsg<T>> + 'static,
{
}