1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
//! Contains messages of the `Actor`'s lifecycle.

use crate::actor_runtime::Actor;
use crate::handlers::{InstantAction, InstantActionHandler, Operation, Parcel};
use crate::ids::{Id, IdOf};
use crate::linkage::Address;
use crate::lite_runtime::{LiteTask, Tag, TaskAddress, TaskError};
use anyhow::Error;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;

/// Termination state of a single group of tracked actors and tasks.
///
/// The default value is a stage that is not terminating and has no members.
#[derive(Default)]
struct Stage {
    /// Set to `true` when the tracker starts terminating this group.
    terminating: bool,
    /// Ids of the members of this group that are still registered.
    ids: HashSet<Id>,
}

impl Stage {
    /// Returns `true` when the stage is terminating and no members remain.
    ///
    /// A stage that was never asked to terminate is not finished, even if it
    /// has no members.
    fn is_finished(&self) -> bool {
        self.terminating && self.ids.is_empty()
    }
}

/// Entry stored for every tracked actor or task.
struct Record<A: Actor> {
    /// Group under which the entry was registered.
    group: A::GroupBy,
    /// Callback used to deliver the interruption signal to the entry's owner.
    notifier: Box<dyn LifecycleNotifier<Interrupt<A>>>,
}

impl<A: Actor> Record<A> {
    /// Passes a fresh `Interrupt` to the notifier and returns the notifier's result.
    fn interrupt(&mut self) -> Result<(), Error> {
        let signal = Interrupt::new();
        self.notifier.notify(signal)
    }
}

// TODO: Rename to Terminator again
/// Tracks registered actors and tasks by group and drives their staged termination.
pub(crate) struct LifetimeTracker<A: Actor> {
    /// `true` once termination has been started (set by `try_terminate_next`).
    terminating: bool,
    /// Groups to terminate first, in this order (set by `termination_sequence`).
    prioritized: Vec<A::GroupBy>,
    /// Termination state and member ids of every group that had a member registered.
    stages: HashMap<A::GroupBy, Stage>,
    /// Record (group and interrupt notifier) of every registered member, keyed by id.
    records: HashMap<Id, Record<A>>,
}

// TODO: Change T to A
impl<A: Actor> LifetimeTracker<A> {
    // TODO: Make the constructor private
    /// Creates a tracker that is not terminating and has no termination
    /// order, stages or records.
    pub fn new() -> Self {
        // TODO: with_capacity 0 ?
        let prioritized = Vec::new();
        let stages = HashMap::new();
        let records = HashMap::new();
        Self {
            terminating: false,
            prioritized,
            stages,
            records,
        }
    }

    /// Returns the `terminating` flag, which becomes `true` once
    /// `try_terminate_next` has run (e.g. via `start_termination`).
    pub fn is_terminating(&self) -> bool {
        self.terminating
    }

    // TODO: Rename to `insert_actor`
    /// Registers the actor behind `address` as a member of `group`.
    ///
    /// The actor's id is added to the group's stage (the stage is created on
    /// first use) and a record with a notifier built by
    /// `LifecycleNotifier::once` is stored under that id. If the stage is
    /// already terminating, the actor is interrupted right away; a failure to
    /// do so is only logged.
    pub fn insert<T>(&mut self, address: Address<T>, group: A::GroupBy)
    where
        T: Actor + InstantActionHandler<Interrupt<A>>,
    {
        let stage = self.stages.entry(group.clone()).or_default();
        let id: Id = address.id().into();
        stage.ids.insert(id.clone());
        let late_arrival = stage.terminating;
        // TODO: Use the same `stopper` like `LiteTasks` does. The problem it's not cloneable.
        // TODO: Use `schedule` queue with oneshot to avoid blocking of queue drain handlers
        let mut record = Record {
            group,
            notifier: <dyn LifecycleNotifier<_>>::once(address, Operation::Forward),
        };
        if late_arrival {
            log::warn!(
                "Actor added into the terminating state (interrupt it immediately): {}",
                id
            );
            match record.interrupt() {
                Ok(()) => {}
                Err(err) => {
                    log::error!("Can't interrupt actor {:?} immediately: {}", id, err);
                }
            }
        }
        self.records.insert(id, record);
    }

    /// Registers the task controlled by `stopper` as a member of `group`.
    ///
    /// The task's id is added to the group's stage (the stage is created on
    /// first use) and a record with a notifier built by
    /// `LifecycleNotifier::stop` is stored under that id. If the stage is
    /// already terminating, the task is stopped right away; a failure to do
    /// so is only logged.
    pub fn insert_task<T>(&mut self, stopper: TaskAddress<T>, group: A::GroupBy)
    where
        T: LiteTask,
    {
        let stage = self.stages.entry(group.clone()).or_default();
        let id: Id = stopper.id().into();
        stage.ids.insert(id.clone());
        let late_arrival = stage.terminating;
        let mut record = Record {
            group,
            notifier: <dyn LifecycleNotifier<_>>::stop(stopper),
        };
        if late_arrival {
            log::warn!(
                "Task added into the terminating state (interrupt it immediately): {}",
                id
            );
            // The task never receives the `Interrupt` value itself: the notifier
            // created by `stop` ignores it and calls `stopper.stop()` instead.
            let outcome = record.interrupt();
            if let Err(err) = outcome {
                log::error!("Can't interrupt task {:?} immediately: {}", id, err);
            }
        }
        self.records.insert(id, record);
    }

    pub fn remove(&mut self, id: &Id) {
        if let Some(record) = self.records.remove(id) {
            if let Some(stage) = self.stages.get_mut(&record.group) {
                stage.ids.remove(id);
            }
        }
        if self.terminating {
            self.try_terminate_next();
        }
    }

    // TODO: Change `Vec` to `OrderedSet`
    /// Replaces the list of groups that are terminated first, in the given order.
    ///
    /// Groups that are not listed are appended after the listed ones by
    /// `stage_sequence`.
    pub fn termination_sequence(&mut self, groups: Vec<A::GroupBy>) {
        self.prioritized = groups;
    }

    pub fn is_finished(&self) -> bool {
        self.stages.values().all(Stage::is_finished)
    }

    /// Builds the order in which groups are terminated.
    ///
    /// The prioritized groups come first, exactly as they were passed to
    /// `termination_sequence`. Every other group that has a stage follows;
    /// the relative order of those remaining groups is unspecified because
    /// it comes from hash-map iteration.
    fn stage_sequence(&self) -> Vec<A::GroupBy> {
        // Borrowed lookup set: no group is cloned just to test membership.
        let prioritized: HashSet<_> = self.prioritized.iter().collect();
        let remained = self
            .stages
            .keys()
            .filter(|group| !prioritized.contains(group))
            .cloned();
        let mut sequence = self.prioritized.clone();
        sequence.extend(remained);
        sequence
    }

    /// Sends the interruption signal to every member of `group`.
    ///
    /// Does nothing when the group has no stage. The stage's `terminating`
    /// flag is left untouched. Delivery failures are only logged.
    pub fn terminate_group(&mut self, group: A::GroupBy) {
        let stage = match self.stages.get(&group) {
            Some(stage) => stage,
            None => return,
        };
        for id in &stage.ids {
            let record = match self.records.get_mut(id) {
                Some(record) => record,
                None => continue,
            };
            if let Err(err) = record.interrupt() {
                // TODO: Add `Group` name to logs?
                log::error!(
                    "Can't send interruption signal to {:?} for a group termination: {}",
                    id,
                    err,
                );
            }
        }
    }

    fn try_terminate_next(&mut self) {
        self.terminating = true;
        for stage_name in self.stage_sequence() {
            if let Some(stage) = self.stages.get_mut(&stage_name) {
                if !stage.terminating {
                    stage.terminating = true;
                    for id in stage.ids.iter() {
                        if let Some(record) = self.records.get_mut(id) {
                            if let Err(err) = record.interrupt() {
                                log::error!(
                                    "Can't notify the supervisor about actor with {:?} termination: {}",
                                    id,
                                    err
                                );
                            }
                        }
                    }
                }
                if !stage.is_finished() {
                    break;
                }
            }
        }
    }

    /// Starts the staged termination by delegating to `try_terminate_next`,
    /// which marks the tracker as terminating and begins interrupting stages.
    pub fn start_termination(&mut self) {
        self.try_terminate_next();
    }
}

/// A sendable callback that is notified with a value of type `P`.
pub(crate) trait LifecycleNotifier<P>: Send {
    /// Delivers `parameter` to the notifier and reports whether that succeeded.
    fn notify(&mut self, parameter: P) -> Result<(), Error>;
}

impl<T, P> LifecycleNotifier<P> for T
where
    T: FnMut(P) -> Result<(), Error>,
    T: Send,
{
    fn notify(&mut self, parameter: P) -> Result<(), Error> {
        (self)(parameter)
    }
}

/// Constructors for boxed notifiers.
impl<P> dyn LifecycleNotifier<P> {
    /// Returns a notifier that discards every notification and always succeeds.
    pub fn ignore() -> Box<Self> {
        let discard = |_| Ok(());
        Box::new(discard)
    }

    /// Returns a notifier that wraps each notification into a `Parcel` with
    /// the given `operation` and hands it to `address` via `unpack_parcel`.
    // TODO: Make it `async` and take priorities into account
    pub fn once<A>(address: Address<A>, operation: Operation) -> Box<Self>
    where
        A: Actor + InstantActionHandler<P>,
        P: InstantAction,
    {
        Box::new(move |message| {
            // TODO: Take the priority into account (don't put all in hp)
            let parcel = Parcel::new(operation.clone(), message);
            address.unpack_parcel(parcel)
        })
    }

    /// Returns a notifier that ignores the notification value and calls
    /// `stopper.stop()` instead.
    pub fn stop<T: LiteTask>(stopper: TaskAddress<T>) -> Box<Self> {
        Box::new(move |_| stopper.stop())
    }
}

/// The message a `Supervisor` sends to a child actor it has spawned.
#[derive(Debug)]
pub(crate) struct Awake<T: Actor> {
    /// Zero-sized marker that ties the message to the actor type `T`.
    _origin: PhantomData<T>,
}

impl<T: Actor> Awake<T> {
    /// Creates the message; it carries no data.
    pub(crate) fn new() -> Self {
        Awake {
            _origin: PhantomData,
        }
    }
}

// Marked as high priority so that it is handled first.
impl<T: Actor> InstantAction for Awake<T> {}

/// The event that asks an `Actor` to interrupt its activity.
#[derive(Debug)]
pub(crate) struct Interrupt<T: Actor> {
    /// Zero-sized marker that ties the event to the actor type `T`.
    _origin: PhantomData<T>,
}

impl<T: Actor> Interrupt<T> {
    /// Creates the event; it carries no data.
    pub(crate) fn new() -> Self {
        Interrupt {
            _origin: PhantomData,
        }
    }
}

// The `Interrupt` event is high priority because every actor has to react to
// it as fast as possible, even when all queues are full.
impl<T: Actor> InstantAction for Interrupt<T> {}

/// Notifies that an `Actor`'s activity is completed.
#[derive(Debug)]
pub(crate) struct Done<T: Actor> {
    /// Id of the actor the notification is about.
    pub id: IdOf<T>,
}

impl<T: Actor> Done<T> {
    /// Creates the notification for the actor with the given `id`.
    pub(crate) fn new(id: IdOf<T>) -> Self {
        Done { id }
    }
}

// Messages of this type may only be sent through the high-priority queue:
// if the normal channel were full, sending the message could block the
// thread that has to notify the actor.
impl<T: Actor> InstantAction for Done<T> {}

/// Carries the outcome of a `LiteTask`: its id, the tag passed along with it
/// and the task's result.
#[derive(Debug)]
pub(crate) struct TaskDone<T: LiteTask, M> {
    /// Id of the task the message is about.
    pub id: IdOf<T>,
    /// Tag value given to `TaskDone::new`.
    pub tag: M,
    /// Output of the task, or the `TaskError` it ended with.
    pub result: Result<T::Output, TaskError>,
}

impl<T: LiteTask, M> TaskDone<T, M> {
    /// Bundles the task id, the tag and the task result into one message.
    pub(crate) fn new(id: IdOf<T>, tag: M, result: Result<T::Output, TaskError>) -> Self {
        TaskDone { id, tag, result }
    }
}

// High priority, because a channel with a limited size cannot be used for
// this type of message.
impl<T: LiteTask, M: Tag> InstantAction for TaskDone<T, M> {}