Skip to main content

swarm_engine_core/telemetry/
layer.rs

1//! SwarmLayer - tracing Layer による観測層
2//!
3//! Orchestrator が発行する tracing イベントを SwarmEvent に変換し、
4//! broadcast channel で Consumer に配信する。
5
6use tokio::sync::broadcast;
7use tracing::field::{Field, Visit};
8use tracing::Subscriber;
9use tracing_subscriber::layer::Context;
10use tracing_subscriber::Layer;
11
12use super::event::{ManagerState, SwarmEvent, TickMetrics};
13use crate::types::TaskId;
14
15/// Swarm イベントを配信する tracing Layer
16pub struct SwarmLayer {
17    tx: broadcast::Sender<SwarmEvent>,
18}
19
20impl SwarmLayer {
21    /// 新規作成
22    ///
23    /// # Arguments
24    /// * `capacity` - broadcast channel のバッファサイズ
25    ///
26    /// # Returns
27    /// * `SwarmLayer` - tracing Layer
28    /// * `broadcast::Receiver<SwarmEvent>` - 最初の Receiver
29    pub fn new(capacity: usize) -> (Self, broadcast::Receiver<SwarmEvent>) {
30        let (tx, rx) = broadcast::channel(capacity);
31        (Self { tx }, rx)
32    }
33
34    /// 追加の Receiver を取得
35    pub fn subscribe(&self) -> broadcast::Receiver<SwarmEvent> {
36        self.tx.subscribe()
37    }
38
39    /// Sender を Arc でラップして返す(Layer 登録後にも subscribe 可能にする)
40    pub fn sender(&self) -> broadcast::Sender<SwarmEvent> {
41        self.tx.clone()
42    }
43}
44
45/// tracing イベントから値を抽出する Visitor
46struct SwarmEventVisitor {
47    // 共通フィールド
48    message: Option<String>,
49
50    // tick 関連
51    tick: Option<u64>,
52    duration_ns: Option<u64>,
53
54    // metrics 関連
55    total_actions: Option<u64>,
56    successful_actions: Option<u64>,
57    failed_actions: Option<u64>,
58    active_workers: Option<usize>,
59
60    // manager 関連
61    manager_id: Option<usize>,
62    manager_state: Option<String>,
63
64    // worker 関連
65    worker_id: Option<usize>,
66    worker_count: Option<usize>,
67    action: Option<String>,
68    success: Option<bool>,
69
70    // async task 関連
71    task_id: Option<u64>,
72    duration_ms: Option<u64>,
73
74    // system 関連
75    total_ticks: Option<u64>,
76    total_duration_ms: Option<u64>,
77}
78
79impl SwarmEventVisitor {
80    fn new() -> Self {
81        Self {
82            message: None,
83            tick: None,
84            duration_ns: None,
85            total_actions: None,
86            successful_actions: None,
87            failed_actions: None,
88            active_workers: None,
89            manager_id: None,
90            manager_state: None,
91            worker_id: None,
92            worker_count: None,
93            action: None,
94            success: None,
95            task_id: None,
96            duration_ms: None,
97            total_ticks: None,
98            total_duration_ms: None,
99        }
100    }
101
102    /// 収集した値から SwarmEvent を生成
103    fn into_event(self) -> Option<SwarmEvent> {
104        let msg = self.message.as_deref()?;
105
106        match msg {
107            "tick_start" => Some(SwarmEvent::TickStart { tick: self.tick? }),
108
109            "tick_complete" => Some(SwarmEvent::TickComplete {
110                tick: self.tick?,
111                duration_ns: self.duration_ns.unwrap_or(0),
112                metrics: TickMetrics {
113                    total_actions: self.total_actions.unwrap_or(0),
114                    successful_actions: self.successful_actions.unwrap_or(0),
115                    failed_actions: self.failed_actions.unwrap_or(0),
116                    active_workers: self.active_workers.unwrap_or(0),
117                },
118            }),
119
120            "manager_state_change" => Some(SwarmEvent::ManagerStateChange {
121                manager_id: self.manager_id?,
122                new_state: parse_manager_state(self.manager_state.as_deref()),
123            }),
124
125            "worker_action" => Some(SwarmEvent::WorkerAction {
126                worker_id: self.worker_id?,
127                action: self.action.unwrap_or_default(),
128                success: self.success.unwrap_or(true),
129            }),
130
131            "async_task_complete" => Some(SwarmEvent::AsyncTaskComplete {
132                task_id: TaskId(self.task_id?),
133                duration_ms: self.duration_ms.unwrap_or(0),
134            }),
135
136            "system_start" => Some(SwarmEvent::SystemStart {
137                worker_count: self.worker_count.unwrap_or(0),
138            }),
139
140            "system_stop" => Some(SwarmEvent::SystemStop {
141                total_ticks: self.total_ticks.unwrap_or(0),
142                total_duration_ms: self.total_duration_ms.unwrap_or(0),
143            }),
144
145            _ => None,
146        }
147    }
148}
149
150impl Visit for SwarmEventVisitor {
151    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
152        if field.name() == "message" {
153            self.message = Some(format!("{:?}", value).trim_matches('"').to_string());
154        }
155    }
156
157    fn record_str(&mut self, field: &Field, value: &str) {
158        match field.name() {
159            "message" => self.message = Some(value.to_string()),
160            "manager_state" => self.manager_state = Some(value.to_string()),
161            "action" => self.action = Some(value.to_string()),
162            _ => {}
163        }
164    }
165
166    fn record_u64(&mut self, field: &Field, value: u64) {
167        match field.name() {
168            "tick" => self.tick = Some(value),
169            "duration_ns" => self.duration_ns = Some(value),
170            "total_actions" => self.total_actions = Some(value),
171            "successful_actions" => self.successful_actions = Some(value),
172            "failed_actions" => self.failed_actions = Some(value),
173            "active_workers" => self.active_workers = Some(value as usize),
174            "manager_id" => self.manager_id = Some(value as usize),
175            "worker_id" => self.worker_id = Some(value as usize),
176            "worker_count" => self.worker_count = Some(value as usize),
177            "task_id" => self.task_id = Some(value),
178            "duration_ms" => self.duration_ms = Some(value),
179            "total_ticks" => self.total_ticks = Some(value),
180            "total_duration_ms" => self.total_duration_ms = Some(value),
181            _ => {}
182        }
183    }
184
185    fn record_i64(&mut self, field: &Field, value: i64) {
186        // u64 に変換して処理
187        if value >= 0 {
188            self.record_u64(field, value as u64);
189        }
190    }
191
192    fn record_bool(&mut self, field: &Field, value: bool) {
193        if field.name() == "success" {
194            self.success = Some(value);
195        }
196    }
197}
198
199fn parse_manager_state(s: Option<&str>) -> ManagerState {
200    match s {
201        Some("idle") => ManagerState::Idle,
202        Some("processing") => ManagerState::Processing,
203        Some("delegated") => ManagerState::Delegated,
204        Some("escalated") => ManagerState::Escalated,
205        Some("error") => ManagerState::Error,
206        _ => ManagerState::Idle,
207    }
208}
209
210impl<S> Layer<S> for SwarmLayer
211where
212    S: Subscriber,
213{
214    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
215        let mut visitor = SwarmEventVisitor::new();
216        event.record(&mut visitor);
217
218        if let Some(swarm_event) = visitor.into_event() {
219            // ブロックしない。満杯なら古いイベントが drop される
220            let _ = self.tx.send(swarm_event);
221        }
222    }
223}
224
225/// SwarmLayer を作成して Sender を返すヘルパー
226///
227/// Layer を tracing に登録した後でも subscribe() できるように、
228/// Sender を返す。
229pub fn create_swarm_layer(
230    capacity: usize,
231) -> (
232    SwarmLayer,
233    broadcast::Sender<SwarmEvent>,
234    broadcast::Receiver<SwarmEvent>,
235) {
236    let (layer, rx) = SwarmLayer::new(capacity);
237    let tx = layer.sender();
238    (layer, tx, rx)
239}