1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
//! This module contains the `Envelope` that allows
//! calling methods of actors related to a specific
//! incoming message.

use crate::actor_runtime::{Actor, Context};
use crate::ids::{Id, IdOf};
use crate::lifecycle;
use crate::lite_runtime::LiteTask;
use anyhow::Error;
use async_trait::async_trait;
use futures::channel::oneshot;
use std::time::Instant;

/// A type-erased message wrapper: it stores a boxed `Handler` that
/// is run against the actor `A` when the envelope is handled.
pub(crate) struct Envelope<A: Actor> {
    /// The boxed handler that `Envelope::handle` delegates to.
    handler: Box<dyn Handler<A>>,
}

impl<A: Actor> Envelope<A> {
    pub(crate) async fn handle(
        &mut self,
        actor: &mut A,
        ctx: &mut Context<A>,
    ) -> Result<(), Error> {
        self.handler.handle(actor, ctx).await
    }

    // TODO: Is it posiible to use `handle` method directly and drop this one?
    /// Creates an `Envelope` for `Action`.
    pub(crate) fn new<I>(input: I) -> Self
    where
        A: ActionHandler<I>,
        I: Action,
    {
        let handler = ActionHandlerImpl { input: Some(input) };
        Self {
            handler: Box::new(handler),
        }
    }
}

// TODO: Consider renaming to attached action
/// The operation that accompanies an `Envelope` inside an `HpEnvelope`.
#[derive(Clone)]
pub(crate) enum Operation {
    // TODO: Awake, Interrupt, also can be added here!
    /// Carries an `Id`.
    ///
    /// NOTE(review): presumably reports that the entity identified by `id`
    /// has finished — confirm against the code that consumes `Operation`.
    Done {
        /// The identifier this operation refers to.
        id: Id,
    },
    /// Just process it with high priority.
    Forward,
    /// The operation to schedule an action handling at a specific time.
    ///
    /// An `Instant` is used to avoid delays caused by sending and processing
    /// this `Operation` message.
    ///
    /// It can't be sent with normal priority, because the message has to be
    /// scheduled as soon as possible to reduce the influence of the ordinary
    /// processing queue on the execution time.
    Schedule {
        /// The moment at which the attached action has to be handled.
        deadline: Instant,
    },
}

/// An `Envelope` paired with the `Operation` that accompanies it.
///
/// NOTE(review): "Hp" presumably stands for "high-priority" (the `Operation`
/// docs talk about high-priority processing) — confirm against the sender side.
pub(crate) struct HpEnvelope<A: Actor> {
    /// What has to be done with the attached `envelope`.
    pub operation: Operation,
    /// The message wrapper to be handled by the actor `A`.
    pub envelope: Envelope<A>,
}

/// Internal `Handler` type that is used by the `Actor`'s routine to execute
/// an `ActionHandler` or an `InteractionHandler`.
///
/// It is the object-safe trait stored (boxed) inside an `Envelope`.
#[async_trait]
trait Handler<A: Actor>: Send {
    /// Main method that expects a mutable reference to the `Actor` that
    /// will be used by implementations to handle messages.
    ///
    /// Returns the result produced by the underlying message handler.
    async fn handle(&mut self, actor: &mut A, _ctx: &mut Context<A>) -> Result<(), Error>;
}

/// An `Action` can be sent to an `Actor` that implements
/// `ActionHandler` for that message type.
pub trait Action: Send + 'static {
    /// Indicates that this message has to be sent with high priority.
    ///
    /// Returns `false` by default.
    ///
    /// IMPORTANT! It's not recommended to set high priority for end
    /// notifications, because they would then be delivered faster than
    /// the ordinary messages generated by the `Actor` during its lifetime.
    fn is_high_priority(&self) -> bool {
        false
    }
}

/// Type of `Handler` to process incoming messages in one-shot style.
///
/// Implemented by an `Actor` for every `Action` type `I` it accepts.
#[async_trait]
pub trait ActionHandler<I: Action>: Actor {
    /// Asynchronous method that receives the incoming message `input`
    /// together with the actor's `Context`.
    async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<(), Error>;
}

/// Adapter that stores an `Action` value until it is handed to the
/// actor's `ActionHandler` through the `Handler` trait.
struct ActionHandlerImpl<I> {
    /// The pending message; taken (leaving `None`) when the handler runs.
    input: Option<I>,
}

#[async_trait]
impl<A, I> Handler<A> for ActionHandlerImpl<I>
where
    A: ActionHandler<I>,
    I: Action,
{
    async fn handle(&mut self, actor: &mut A, ctx: &mut Context<A>) -> Result<(), Error> {
        let input = self.input.take().expect("action handler called twice");
        actor.handle(input, ctx).await
    }
}

/// Implements an interaction with an `Actor`: a request of type `I`
/// that is answered with a value of type `I::Output`.
#[async_trait]
pub trait InteractionHandler<I: Interaction>: Actor {
    /// Asynchronous method that receives the incoming request and
    /// returns the response (or an error) for it.
    async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<I::Output, Error>;
}

#[async_trait]
impl<T, I> ActionHandler<Interact<I>> for T
where
    T: InteractionHandler<I>,
    I: Interaction,
{
    async fn handle(&mut self, input: Interact<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
        let res = InteractionHandler::handle(self, input.request, ctx).await;
        let send_res = input.responder.send(res);
        // TODO: How to improve that???
        match send_res {
            Ok(()) => Ok(()),
            Err(Ok(_)) => Err(Error::msg(
                "Can't send the successful result of interaction",
            )),
            Err(Err(err)) => Err(err),
        }
    }
}

/// The `Action` that carries an `Interaction` request together with
/// the channel used to deliver the response.
pub(crate) struct Interact<T: Interaction> {
    /// The request passed to `InteractionHandler::handle`.
    pub request: T,
    /// One-shot sender that receives the result returned by the handler.
    pub responder: oneshot::Sender<Result<T::Output, Error>>,
}

impl<T: Interaction> Action for Interact<T> {
    /// Reports the priority declared by the wrapped request.
    fn is_high_priority(&self) -> bool {
        let request: &T = &self.request;
        <T as Interaction>::is_high_priority(request)
    }
}

/// Interaction message to an `Actor`: a request that expects a response.
pub trait Interaction: Send + 'static {
    /// The result of the `Interaction` that will be returned by `InteractionHandler`.
    type Output: Send + 'static;

    /// The priority of the `Interaction`.
    ///
    /// It is passed on directly to the `Action` that is used to implement
    /// the interaction process. Returns `false` by default.
    fn is_high_priority(&self) -> bool {
        false
    }
}

/// Represents the initialization routine of an `Actor`.
///
/// The type parameter `A` selects which `lifecycle::Awake<A>` action
/// triggers this routine.
#[async_trait]
pub trait StartedBy<A: Actor>: Actor {
    /// It's the initialization method of the `Actor`; it is called when
    /// the `lifecycle::Awake<A>` action is handled.
    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}

#[async_trait]
impl<T, S> ActionHandler<lifecycle::Awake<S>> for T
where
    T: StartedBy<S>,
    S: Actor,
{
    /// Routes the `Awake` action to `StartedBy::handle`; the action
    /// value itself is not used.
    async fn handle(
        &mut self,
        _input: lifecycle::Awake<S>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        <T as StartedBy<S>>::handle(self, ctx).await
    }
}

/// The listener for an interruption signal.
///
/// The type parameter `A` selects which `lifecycle::Interrupt<A>` action
/// triggers this listener.
#[async_trait]
pub trait InterruptedBy<A: Actor>: Actor {
    /// Called when the `Actor` is terminated by another actor, i.e. when
    /// the `lifecycle::Interrupt<A>` action is handled.
    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}

#[async_trait]
impl<T, S> ActionHandler<lifecycle::Interrupt<S>> for T
where
    T: InterruptedBy<S>,
    S: Actor,
{
    /// Routes the `Interrupt` action to `InterruptedBy::handle`; the
    /// action value itself is not used.
    async fn handle(
        &mut self,
        _input: lifecycle::Interrupt<S>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        <T as InterruptedBy<S>>::handle(self, ctx).await
    }
}

/// Listens for the completion of spawned actors.
#[async_trait]
pub trait Eliminated<A: Actor>: Actor {
    /// Called when the `Actor` has finished.
    ///
    /// `id` is the identifier carried by the `lifecycle::Done<A>` action.
    async fn handle(&mut self, id: IdOf<A>, ctx: &mut Context<Self>) -> Result<(), Error>;
}

#[async_trait]
impl<T, C> ActionHandler<lifecycle::Done<C>> for T
where
    T: Eliminated<C>,
    C: Actor,
{
    /// Routes the `Done` action to `Eliminated::handle`, passing the
    /// identifier taken from the action.
    async fn handle(
        &mut self,
        done: lifecycle::Done<C>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        let finished_id = done.id;
        <T as Eliminated<C>>::handle(self, finished_id, ctx).await
    }
}

/// Listens for the completion of spawned tasks.
#[async_trait]
pub trait TaskEliminated<T: LiteTask>: Actor {
    /// Called when the `Task` has finished.
    ///
    /// `id` is the identifier carried by the `lifecycle::TaskDone<T>` action.
    async fn handle(&mut self, id: IdOf<T>, ctx: &mut Context<Self>) -> Result<(), Error>;
}

#[async_trait]
impl<T, C> ActionHandler<lifecycle::TaskDone<C>> for T
where
    T: TaskEliminated<C>,
    C: LiteTask,
{
    /// Routes the `TaskDone` action to `TaskEliminated::handle`, passing
    /// the identifier taken from the action.
    async fn handle(
        &mut self,
        done: lifecycle::TaskDone<C>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        let finished_id = done.id;
        <T as TaskEliminated<C>>::handle(self, finished_id, ctx).await
    }
}

/// Wraps a single item so that it can be delivered to an actor as an
/// `Action` and then passed to `Consumer::handle`.
pub(crate) struct StreamItem<T> {
    /// The wrapped item.
    pub item: T,
}

/// Any sendable `'static` item wrapped into `StreamItem` is an `Action`
/// with the default (not high) priority.
impl<T: Send + 'static> Action for StreamItem<T> {}

/// Represents a capability to receive messages from a `Stream`.
#[async_trait]
pub trait Consumer<T>: Actor {
    /// The method called when the next item is received from a `Stream`.
    async fn handle(&mut self, item: T, ctx: &mut Context<Self>) -> Result<(), Error>;
}

#[async_trait]
impl<T, I> ActionHandler<StreamItem<I>> for T
where
    T: Consumer<I>,
    I: Send + 'static,
{
    async fn handle(&mut self, msg: StreamItem<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
        Consumer::handle(self, msg.item, ctx).await
    }
}

/// Represents a capability to receive messages from a `TryStream`.
///
/// Every `TryConsumer<T>` is also a `Consumer<Result<T, Self::Error>>`:
/// `Ok` items go to `handle` and `Err` values go to `error`.
#[async_trait]
pub trait TryConsumer<T>: Actor {
    /// `Error` value that can happen in a stream.
    type Error;
    /// The method called when the next successful (`Ok`) item is received
    /// from the stream.
    async fn handle(&mut self, item: T, ctx: &mut Context<Self>) -> Result<(), Error>;
    /// The method called when the stream yields an `Error`.
    async fn error(&mut self, error: Self::Error, ctx: &mut Context<Self>) -> Result<(), Error>;
}

#[async_trait]
impl<T, I> Consumer<Result<I, T::Error>> for T
where
    T: TryConsumer<I>,
    T::Error: Send,
    I: Send + 'static,
{
    async fn handle(
        &mut self,
        result: Result<I, T::Error>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        match result {
            Ok(item) => TryConsumer::handle(self, item, ctx).await,
            Err(err) => TryConsumer::error(self, err, ctx).await,
        }
    }
}

/// Used to wrap a scheduled event so that it can be delivered as an `Action`.
pub(crate) struct ScheduledItem<T> {
    /// The `Instant` passed to `Scheduled::handle` along with the item.
    ///
    /// NOTE(review): presumably the deadline the item was scheduled for —
    /// confirm against the code that creates `ScheduledItem`.
    pub timestamp: Instant,
    /// The scheduled payload.
    pub item: T,
}

impl<T: Send + 'static> Action for ScheduledItem<T> {
    /// Priority is never taken into account for a `Scheduled` message,
    /// but it reports high priority to show that it will be called as
    /// soon as the deadline has been reached.
    fn is_high_priority(&self) -> bool {
        true
    }
}

/// Represents a reaction to a scheduled activity.
#[async_trait]
pub trait Scheduled<T>: Actor {
    /// The method called when the deadline has been reached.
    ///
    /// `timestamp` and `item` are the values carried by the `ScheduledItem`
    /// that triggered the call.
    async fn handle(
        &mut self,
        timestamp: Instant,
        item: T,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error>;
}

#[async_trait]
impl<T, I> ActionHandler<ScheduledItem<I>> for T
where
    T: Scheduled<I>,
    I: Send + 'static,
{
    async fn handle(
        &mut self,
        msg: ScheduledItem<I>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        Scheduled::handle(self, msg.timestamp, msg.item, ctx).await
    }
}