use rust_supervisor::id::types::ChildId;
use rust_supervisor::policy::task_role_defaults::TaskRole;
use rust_supervisor::spec::child::{ChildSpec, Criticality, ShutdownPolicy, TaskKind};
use rust_supervisor::task::context::TaskContext;
use rust_supervisor::task::factory::{TaskResult, service_fn};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
/// Lifecycle notifications emitted by the supervisor-role task over its
/// event channel.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SupervisorEvent {
    /// Sent once by `run_supervisor_role_unit` after the task has marked
    /// itself ready.
    Initialized {
        /// Identifier value of the child that produced the event.
        child_id: String,
        /// String form of the task context's `path`.
        path: String,
    },
    /// Sent by `run_supervisor_business` on every interval tick.
    Running {
        /// Identifier value of the child that produced the event.
        child_id: String,
        /// Tick counter; the first reported tick is `1`.
        tick: u64,
    },
    /// Sent when the task observes cancellation, just before it returns.
    Stopping {
        /// Identifier value of the child that produced the event.
        child_id: String,
    },
}
/// Builds the [`ChildSpec`] for the nested supervisor-role worker.
///
/// The returned spec is an async worker with id `"nested-supervisor-unit"`,
/// tagged with the `Supervisor` task role, marked `Critical`, and labelled
/// with the `supervisor` and `nested` tags. Each time the factory is invoked
/// it clones `events` and runs `run_supervisor_role_unit` with that clone.
///
/// # Parameters
/// * `events` - sender on which the spawned task reports
///   [`SupervisorEvent`]s.
pub fn supervisor_role_child(events: mpsc::UnboundedSender<SupervisorEvent>) -> ChildSpec {
    // Every invocation of the factory gets its own sender handle so the
    // closure itself stays reusable.
    let task_factory = service_fn(move |ctx: TaskContext| {
        let sink = events.clone();
        async move { run_supervisor_role_unit(ctx, sink).await }
    });

    let mut spec = ChildSpec::worker(
        ChildId::new("nested-supervisor-unit"),
        "Nested Supervisor Unit",
        TaskKind::AsyncWorker,
        Arc::new(task_factory),
    );

    // Classification of the child.
    spec.task_role = Some(TaskRole::Supervisor);
    spec.criticality = Criticality::Critical;
    spec.tags = vec![String::from("supervisor"), String::from("nested")];

    // NOTE(review): the two durations are presumably the graceful-stop and
    // abort timeouts; confirm against `ShutdownPolicy::new`.
    spec.shutdown_policy =
        ShutdownPolicy::new(Duration::from_millis(150), Duration::from_millis(50));

    spec
}
async fn run_supervisor_role_unit(
ctx: TaskContext,
events: mpsc::UnboundedSender<SupervisorEvent>,
) -> TaskResult {
ctx.mark_ready();
ctx.heartbeat();
let _ignored = events.send(SupervisorEvent::Initialized {
child_id: ctx.child_id.value.clone(),
path: ctx.path.to_string(),
});
let mut interval = tokio::time::interval(Duration::from_secs(1));
let cancellation_token = ctx.cancellation_token();
let mut tick = 0_u64;
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
let _ignored = events.send(SupervisorEvent::Stopping {
child_id: ctx.child_id.value.clone(),
});
return TaskResult::Cancelled;
}
_ = interval.tick() => {
tick += 1;
run_supervisor_business(&ctx, &events, tick);
}
}
}
}
/// Performs one unit of supervisor work for the given tick.
///
/// Prints a line with the child id, the tick number, and the current Unix
/// time (seconds plus zero-padded nanoseconds). It then sends a heartbeat
/// and reports [`SupervisorEvent::Running`] on `events`. A failed send is
/// ignored.
///
/// # Parameters
/// * `ctx` - task context supplying the child id and the heartbeat hook.
/// * `events` - channel on which the `Running` event is published.
/// * `tick` - tick counter value to report.
fn run_supervisor_business(
    ctx: &TaskContext,
    events: &mpsc::UnboundedSender<SupervisorEvent>,
    tick: u64,
) {
    // A system clock set before the Unix epoch falls back to a zero duration.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let (unix_secs, unix_nanos) = (since_epoch.as_secs(), since_epoch.subsec_nanos());

    println!(
        "supervisor business: child={} tick={} now_unix={}.{:09}",
        ctx.child_id, tick, unix_secs, unix_nanos
    );

    ctx.heartbeat();

    let progress = SupervisorEvent::Running {
        child_id: ctx.child_id.value.clone(),
        tick,
    };
    let _delivery = events.send(progress);
}