1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use agner_actors::{BoxError, Context, Event, Signal};
use agner_utils::std_error_pp::StdErrorPP;

use crate::fixed::hlist::HList;
use crate::fixed::restart_strategy::{Action, Instant};
use crate::fixed::{Decider, SupSpec};

use crate::fixed::sup_spec::SupSpecStartChild;

use super::sup_spec::SupSpecStopChild;
use super::RestartStrategy;

#[derive(Debug)]
pub enum Message {}

pub async fn fixed_sup<R, CS>(
    context: &mut Context<Message>,
    mut sup_spec: SupSpec<R, CS>,
) -> Result<(), BoxError>
where
    CS: HList,
    R: RestartStrategy,
    SupSpec<R, CS>: SupSpecStartChild<Message>,
    SupSpec<R, CS>: SupSpecStopChild<Message>,
{
    context.trap_exit(true).await;
    context.init_ack(Default::default());

    log::debug!("[{}] starting fixed sup with {} children", context.actor_id(), CS::LEN);

    let mut children = Vec::with_capacity(CS::LEN);

    for child_idx in 0..CS::LEN {
        log::trace!("[{}] initially starting child #{}...", context.actor_id(), child_idx);
        let child_id = sup_spec.start_child(context, child_idx).await?;
        children.push(child_id);
        log::trace!("[{}]   child #{}: {}", context.actor_id(), child_idx, child_id);
    }
    let children = children;
    assert_eq!(children.len(), CS::LEN);

    log::trace!(
        "[{}] initializing restart decider for {}",
        context.actor_id(),
        sup_spec.restart_strategy
    );
    let mut restart_decider =
        sup_spec.restart_strategy.new_decider(context.actor_id(), &children[..]);
    std::mem::drop(children);

    loop {
        if let Some(action) = restart_decider.next_action() {
            match action {
                Action::Exit(exit_reason) => {
                    log::debug!("[{}] exitting {}", context.actor_id(), exit_reason.pp());
                    context.exit(exit_reason).await;
                    unreachable!()
                },
                Action::Start(child_idx) => {
                    log::trace!("[{}] starting child #{}...", context.actor_id(), child_idx);
                    let child_id = sup_spec.start_child(context, child_idx).await?;
                    log::trace!("[{}]   child #{}: {}", context.actor_id(), child_idx, child_id);
                },
                Action::Stop(child_idx, actor_id, exit_reason) => {
                    log::trace!(
                        "[{}] stopping child #{} ({})...",
                        context.actor_id(),
                        child_idx,
                        actor_id
                    );
                    sup_spec.stop_child(context, child_idx, actor_id, exit_reason).await?;
                    log::trace!(
                        "[{}]   child #{} ({}) stopped",
                        context.actor_id(),
                        child_idx,
                        actor_id
                    );
                },
            }
        }

        let event = context.next_event().await;

        match event {
            Event::Message(message) => unimplemented!("message: {:?}", message),
            Event::Signal(Signal::Exit(actor_id, exit_reason)) =>
                restart_decider.actor_down(Instant::now(), actor_id, exit_reason),
        }
    }
}