use tokio::sync::broadcast;
use tracing::field::{Field, Visit};
use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use super::event::{ManagerState, SwarmEvent, TickMetrics};
use crate::types::TaskId;
/// `tracing` layer that republishes recognised events as [`SwarmEvent`]s on a Tokio
/// broadcast channel.
pub struct SwarmLayer {
    /// Sending half of the broadcast channel; receivers are created from it on demand.
    tx: broadcast::Sender<SwarmEvent>,
}
impl SwarmLayer {
    /// Creates a layer backed by a fresh broadcast channel and returns it together with the
    /// channel's initial receiver.
    ///
    /// `capacity` is passed to `tokio::sync::broadcast::channel`. That function panics when
    /// given a capacity of `0`, so a zero capacity is raised to `1` here instead of aborting
    /// the caller during subscriber set-up.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is larger than the maximum accepted by
    /// `tokio::sync::broadcast::channel` (`usize::MAX / 2`).
    pub fn new(capacity: usize) -> (Self, broadcast::Receiver<SwarmEvent>) {
        let (tx, rx) = broadcast::channel(capacity.max(1));
        (Self { tx }, rx)
    }

    /// Returns a new receiver attached to this layer's channel.
    pub fn subscribe(&self) -> broadcast::Receiver<SwarmEvent> {
        self.tx.subscribe()
    }

    /// Returns a clone of the channel's sending half.
    pub fn sender(&self) -> broadcast::Sender<SwarmEvent> {
        self.tx.clone()
    }
}
/// Scratch space filled while visiting the fields of one `tracing` event.
///
/// Every slot starts out as `None` and is set when a field with the matching name is
/// recorded; `into_event` then turns the collected values into a [`SwarmEvent`].
struct SwarmEventVisitor {
    /// Event message; selects which [`SwarmEvent`] variant is built.
    message: Option<String>,

    // Fields read for the tick events.
    /// Value of the `tick` field.
    tick: Option<u64>,
    /// Value of the `duration_ns` field.
    duration_ns: Option<u64>,
    /// Value of the `total_actions` field.
    total_actions: Option<u64>,
    /// Value of the `successful_actions` field.
    successful_actions: Option<u64>,
    /// Value of the `failed_actions` field.
    failed_actions: Option<u64>,
    /// Value of the `active_workers` field.
    active_workers: Option<usize>,

    // Fields read for manager and worker events.
    /// Value of the `manager_id` field.
    manager_id: Option<usize>,
    /// Raw text of the `manager_state` field, parsed later by `parse_manager_state`.
    manager_state: Option<String>,
    /// Value of the `worker_id` field.
    worker_id: Option<usize>,
    /// Value of the `worker_count` field.
    worker_count: Option<usize>,
    /// Text of the `action` field.
    action: Option<String>,
    /// Value of the `success` field.
    success: Option<bool>,

    // Fields read for async-task and system lifecycle events.
    /// Value of the `task_id` field.
    task_id: Option<u64>,
    /// Value of the `duration_ms` field.
    duration_ms: Option<u64>,
    /// Value of the `total_ticks` field.
    total_ticks: Option<u64>,
    /// Value of the `total_duration_ms` field.
    total_duration_ms: Option<u64>,
}
impl SwarmEventVisitor {
    /// Builds a visitor in which no field has been recorded yet.
    fn new() -> Self {
        Self {
            // Text fields.
            message: None,
            manager_state: None,
            action: None,
            // Flags.
            success: None,
            // Identifiers and counts stored as `usize`.
            active_workers: None,
            manager_id: None,
            worker_id: None,
            worker_count: None,
            // Plain `u64` counters and durations.
            tick: None,
            duration_ns: None,
            total_actions: None,
            successful_actions: None,
            failed_actions: None,
            task_id: None,
            duration_ms: None,
            total_ticks: None,
            total_duration_ms: None,
        }
    }

    /// Converts the recorded fields into a [`SwarmEvent`], keyed on the event message.
    ///
    /// Returns `None` when no message was recorded, when the message is not one of the
    /// known event names, or when a field the variant cannot do without is missing
    /// (`tick`, `manager_id`, `worker_id` or `task_id`, depending on the event).
    /// All other missing fields fall back to a default: `0` for numbers, an empty string
    /// for `action`, `true` for `success`, and the result of `parse_manager_state(None)`
    /// for the manager state.
    fn into_event(self) -> Option<SwarmEvent> {
        let kind = self.message.as_deref()?;

        let event = match kind {
            "tick_start" => SwarmEvent::TickStart { tick: self.tick? },
            "tick_complete" => {
                let tick = self.tick?;
                let metrics = TickMetrics {
                    total_actions: self.total_actions.unwrap_or(0),
                    successful_actions: self.successful_actions.unwrap_or(0),
                    failed_actions: self.failed_actions.unwrap_or(0),
                    active_workers: self.active_workers.unwrap_or(0),
                };
                SwarmEvent::TickComplete {
                    tick,
                    duration_ns: self.duration_ns.unwrap_or(0),
                    metrics,
                }
            }
            "manager_state_change" => {
                let manager_id = self.manager_id?;
                let new_state = parse_manager_state(self.manager_state.as_deref());
                SwarmEvent::ManagerStateChange {
                    manager_id,
                    new_state,
                }
            }
            "worker_action" => {
                let worker_id = self.worker_id?;
                SwarmEvent::WorkerAction {
                    worker_id,
                    action: self.action.unwrap_or_default(),
                    success: self.success.unwrap_or(true),
                }
            }
            "async_task_complete" => {
                let task_id = TaskId(self.task_id?);
                SwarmEvent::AsyncTaskComplete {
                    task_id,
                    duration_ms: self.duration_ms.unwrap_or(0),
                }
            }
            "system_start" => SwarmEvent::SystemStart {
                worker_count: self.worker_count.unwrap_or(0),
            },
            "system_stop" => SwarmEvent::SystemStop {
                total_ticks: self.total_ticks.unwrap_or(0),
                total_duration_ms: self.total_duration_ms.unwrap_or(0),
            },
            // Any other message is not a swarm event.
            _ => return None,
        };

        Some(event)
    }
}
/// Collects the fields of one `tracing` event into the visitor's slots.
///
/// Only field names the visitor tracks are stored; every other field is ignored.
impl Visit for SwarmEventVisitor {
    /// Handles values that arrive through their `Debug` representation.
    ///
    /// `tracing` delivers the event message, and any field written with the `%` or `?`
    /// sigils, through this method rather than `record_str`. The text-valued fields
    /// (`message`, `manager_state`, `action`) are therefore captured here as well, so that
    /// they are not silently dropped depending on how the call site spelled the field.
    /// Double quotes surrounding the rendered value (added by `Debug` for plain strings)
    /// are stripped.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        let slot = match field.name() {
            "message" => &mut self.message,
            "manager_state" => &mut self.manager_state,
            "action" => &mut self.action,
            // Avoid the formatting cost for fields nobody reads.
            _ => return,
        };
        let rendered = format!("{:?}", value);
        *slot = Some(rendered.trim_matches('"').to_string());
    }

    /// Stores the text-valued fields when they are recorded as plain strings.
    fn record_str(&mut self, field: &Field, value: &str) {
        match field.name() {
            "message" => self.message = Some(value.to_string()),
            "manager_state" => self.manager_state = Some(value.to_string()),
            "action" => self.action = Some(value.to_string()),
            _ => {}
        }
    }

    /// Stores the numeric fields.
    ///
    /// Fields kept as `usize` go through a checked conversion: a value that does not fit
    /// the target's `usize` is treated as not recorded instead of being truncated to a
    /// different, wrong number. On 64-bit targets every `u64` fits.
    fn record_u64(&mut self, field: &Field, value: u64) {
        // Spelled with the full trait path so it does not depend on `TryFrom` being in
        // the prelude of the crate's edition.
        fn narrow(value: u64) -> Option<usize> {
            <usize as std::convert::TryFrom<u64>>::try_from(value).ok()
        }

        match field.name() {
            "tick" => self.tick = Some(value),
            "duration_ns" => self.duration_ns = Some(value),
            "total_actions" => self.total_actions = Some(value),
            "successful_actions" => self.successful_actions = Some(value),
            "failed_actions" => self.failed_actions = Some(value),
            "active_workers" => self.active_workers = narrow(value),
            "manager_id" => self.manager_id = narrow(value),
            "worker_id" => self.worker_id = narrow(value),
            "worker_count" => self.worker_count = narrow(value),
            "task_id" => self.task_id = Some(value),
            "duration_ms" => self.duration_ms = Some(value),
            "total_ticks" => self.total_ticks = Some(value),
            "total_duration_ms" => self.total_duration_ms = Some(value),
            _ => {}
        }
    }

    /// Forwards non-negative signed integers to `record_u64`; negative values are ignored
    /// because every tracked numeric field is unsigned.
    fn record_i64(&mut self, field: &Field, value: i64) {
        if value >= 0 {
            // Lossless: the guard above rules out negative values.
            self.record_u64(field, value as u64);
        }
    }

    /// Stores the `success` flag.
    fn record_bool(&mut self, field: &Field, value: bool) {
        if field.name() == "success" {
            self.success = Some(value);
        }
    }
}
fn parse_manager_state(s: Option<&str>) -> ManagerState {
match s {
Some("idle") => ManagerState::Idle,
Some("processing") => ManagerState::Processing,
Some("delegated") => ManagerState::Delegated,
Some("escalated") => ManagerState::Escalated,
Some("error") => ManagerState::Error,
_ => ManagerState::Idle,
}
}
/// Hooks [`SwarmLayer`] into a `tracing_subscriber` stack.
impl<S> Layer<S> for SwarmLayer
where
    S: Subscriber,
{
    /// Visits the event's fields and, when they describe a swarm event, broadcasts it.
    ///
    /// Events that do not convert into a [`SwarmEvent`] are ignored.
    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
        let mut fields = SwarmEventVisitor::new();
        event.record(&mut fields);

        let parsed = match fields.into_event() {
            Some(parsed) => parsed,
            None => return,
        };

        // Best effort: the send result is deliberately discarded so that a failed
        // broadcast never disturbs the code that emitted the event.
        let _ = self.tx.send(parsed);
    }
}
/// Convenience constructor returning a [`SwarmLayer`] along with both ends of its channel.
///
/// The tuple holds, in order: the layer, a clone of the layer's sender, and the initial
/// receiver produced by `SwarmLayer::new(capacity)`.
pub fn create_swarm_layer(
    capacity: usize,
) -> (
    SwarmLayer,
    broadcast::Sender<SwarmEvent>,
    broadcast::Receiver<SwarmEvent>,
) {
    let (layer, receiver) = SwarmLayer::new(capacity);
    let sender = layer.sender();
    (layer, sender, receiver)
}