use rust_supervisor::id::types::ChildId;
use rust_supervisor::policy::task_role_defaults::TaskRole;
use rust_supervisor::spec::child::{ChildSpec, Criticality, ShutdownPolicy, TaskKind};
use rust_supervisor::task::context::TaskContext;
use rust_supervisor::task::factory::{TaskResult, service_fn};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerEvent {
Initialized {
child_id: String,
},
Running {
child_id: String,
batch: u64,
},
Completed {
child_id: String,
},
}
pub fn worker_child(events: mpsc::UnboundedSender<WorkerEvent>) -> ChildSpec {
let factory = service_fn(move |ctx: TaskContext| {
let events = events.clone();
async move { run_worker(ctx, events).await }
});
let mut child = ChildSpec::worker(
ChildId::new("invoice-worker"),
"Invoice Worker",
TaskKind::AsyncWorker,
Arc::new(factory),
);
child.task_role = Some(TaskRole::Worker);
child.criticality = Criticality::Critical;
child.tags = vec!["worker".to_owned(), "invoice".to_owned()];
child.shutdown_policy =
ShutdownPolicy::new(Duration::from_millis(150), Duration::from_millis(50));
child
}
async fn run_worker(ctx: TaskContext, events: mpsc::UnboundedSender<WorkerEvent>) -> TaskResult {
ctx.mark_ready();
ctx.heartbeat();
let _ignored = events.send(WorkerEvent::Initialized {
child_id: ctx.child_id.value.clone(),
});
for batch in 1..=3 {
tokio::time::sleep(Duration::from_millis(200)).await;
run_invoice_worker_business(&ctx, &events, batch);
}
let _ignored = events.send(WorkerEvent::Completed {
child_id: ctx.child_id.value.clone(),
});
TaskResult::Succeeded
}
fn run_invoice_worker_business(
ctx: &TaskContext,
events: &mpsc::UnboundedSender<WorkerEvent>,
batch: u64,
) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
println!(
"worker business: child={} batch={} now_unix={}.{:09}",
ctx.child_id,
batch,
now.as_secs(),
now.subsec_nanos()
);
ctx.heartbeat();
let _ignored = events.send(WorkerEvent::Running {
child_id: ctx.child_id.value.clone(),
batch,
});
}