swarm_engine_core/telemetry/
layer.rs1use tokio::sync::broadcast;
7use tracing::field::{Field, Visit};
8use tracing::Subscriber;
9use tracing_subscriber::layer::Context;
10use tracing_subscriber::Layer;
11
12use super::event::{ManagerState, SwarmEvent, TickMetrics};
13use crate::types::TaskId;
14
15pub struct SwarmLayer {
17 tx: broadcast::Sender<SwarmEvent>,
18}
19
20impl SwarmLayer {
21 pub fn new(capacity: usize) -> (Self, broadcast::Receiver<SwarmEvent>) {
30 let (tx, rx) = broadcast::channel(capacity);
31 (Self { tx }, rx)
32 }
33
34 pub fn subscribe(&self) -> broadcast::Receiver<SwarmEvent> {
36 self.tx.subscribe()
37 }
38
39 pub fn sender(&self) -> broadcast::Sender<SwarmEvent> {
41 self.tx.clone()
42 }
43}
44
45struct SwarmEventVisitor {
47 message: Option<String>,
49
50 tick: Option<u64>,
52 duration_ns: Option<u64>,
53
54 total_actions: Option<u64>,
56 successful_actions: Option<u64>,
57 failed_actions: Option<u64>,
58 active_workers: Option<usize>,
59
60 manager_id: Option<usize>,
62 manager_state: Option<String>,
63
64 worker_id: Option<usize>,
66 worker_count: Option<usize>,
67 action: Option<String>,
68 success: Option<bool>,
69
70 task_id: Option<u64>,
72 duration_ms: Option<u64>,
73
74 total_ticks: Option<u64>,
76 total_duration_ms: Option<u64>,
77}
78
79impl SwarmEventVisitor {
80 fn new() -> Self {
81 Self {
82 message: None,
83 tick: None,
84 duration_ns: None,
85 total_actions: None,
86 successful_actions: None,
87 failed_actions: None,
88 active_workers: None,
89 manager_id: None,
90 manager_state: None,
91 worker_id: None,
92 worker_count: None,
93 action: None,
94 success: None,
95 task_id: None,
96 duration_ms: None,
97 total_ticks: None,
98 total_duration_ms: None,
99 }
100 }
101
102 fn into_event(self) -> Option<SwarmEvent> {
104 let msg = self.message.as_deref()?;
105
106 match msg {
107 "tick_start" => Some(SwarmEvent::TickStart { tick: self.tick? }),
108
109 "tick_complete" => Some(SwarmEvent::TickComplete {
110 tick: self.tick?,
111 duration_ns: self.duration_ns.unwrap_or(0),
112 metrics: TickMetrics {
113 total_actions: self.total_actions.unwrap_or(0),
114 successful_actions: self.successful_actions.unwrap_or(0),
115 failed_actions: self.failed_actions.unwrap_or(0),
116 active_workers: self.active_workers.unwrap_or(0),
117 },
118 }),
119
120 "manager_state_change" => Some(SwarmEvent::ManagerStateChange {
121 manager_id: self.manager_id?,
122 new_state: parse_manager_state(self.manager_state.as_deref()),
123 }),
124
125 "worker_action" => Some(SwarmEvent::WorkerAction {
126 worker_id: self.worker_id?,
127 action: self.action.unwrap_or_default(),
128 success: self.success.unwrap_or(true),
129 }),
130
131 "async_task_complete" => Some(SwarmEvent::AsyncTaskComplete {
132 task_id: TaskId(self.task_id?),
133 duration_ms: self.duration_ms.unwrap_or(0),
134 }),
135
136 "system_start" => Some(SwarmEvent::SystemStart {
137 worker_count: self.worker_count.unwrap_or(0),
138 }),
139
140 "system_stop" => Some(SwarmEvent::SystemStop {
141 total_ticks: self.total_ticks.unwrap_or(0),
142 total_duration_ms: self.total_duration_ms.unwrap_or(0),
143 }),
144
145 _ => None,
146 }
147 }
148}
149
150impl Visit for SwarmEventVisitor {
151 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
152 if field.name() == "message" {
153 self.message = Some(format!("{:?}", value).trim_matches('"').to_string());
154 }
155 }
156
157 fn record_str(&mut self, field: &Field, value: &str) {
158 match field.name() {
159 "message" => self.message = Some(value.to_string()),
160 "manager_state" => self.manager_state = Some(value.to_string()),
161 "action" => self.action = Some(value.to_string()),
162 _ => {}
163 }
164 }
165
166 fn record_u64(&mut self, field: &Field, value: u64) {
167 match field.name() {
168 "tick" => self.tick = Some(value),
169 "duration_ns" => self.duration_ns = Some(value),
170 "total_actions" => self.total_actions = Some(value),
171 "successful_actions" => self.successful_actions = Some(value),
172 "failed_actions" => self.failed_actions = Some(value),
173 "active_workers" => self.active_workers = Some(value as usize),
174 "manager_id" => self.manager_id = Some(value as usize),
175 "worker_id" => self.worker_id = Some(value as usize),
176 "worker_count" => self.worker_count = Some(value as usize),
177 "task_id" => self.task_id = Some(value),
178 "duration_ms" => self.duration_ms = Some(value),
179 "total_ticks" => self.total_ticks = Some(value),
180 "total_duration_ms" => self.total_duration_ms = Some(value),
181 _ => {}
182 }
183 }
184
185 fn record_i64(&mut self, field: &Field, value: i64) {
186 if value >= 0 {
188 self.record_u64(field, value as u64);
189 }
190 }
191
192 fn record_bool(&mut self, field: &Field, value: bool) {
193 if field.name() == "success" {
194 self.success = Some(value);
195 }
196 }
197}
198
199fn parse_manager_state(s: Option<&str>) -> ManagerState {
200 match s {
201 Some("idle") => ManagerState::Idle,
202 Some("processing") => ManagerState::Processing,
203 Some("delegated") => ManagerState::Delegated,
204 Some("escalated") => ManagerState::Escalated,
205 Some("error") => ManagerState::Error,
206 _ => ManagerState::Idle,
207 }
208}
209
210impl<S> Layer<S> for SwarmLayer
211where
212 S: Subscriber,
213{
214 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
215 let mut visitor = SwarmEventVisitor::new();
216 event.record(&mut visitor);
217
218 if let Some(swarm_event) = visitor.into_event() {
219 let _ = self.tx.send(swarm_event);
221 }
222 }
223}
224
225pub fn create_swarm_layer(
230 capacity: usize,
231) -> (
232 SwarmLayer,
233 broadcast::Sender<SwarmEvent>,
234 broadcast::Receiver<SwarmEvent>,
235) {
236 let (layer, rx) = SwarmLayer::new(capacity);
237 let tx = layer.sender();
238 (layer, tx, rx)
239}