Skip to main content

ao_core/
parity_observability.rs

1//! TS observability helpers (ported from `packages/core/src/observability.ts`).
2//!
3//! Parity status: test-only.
4//!
5//! No runtime consumer. Depends on `parity_metadata::atomic_write_file` for
6//! snapshot persistence during tests. See
7//! `docs/ts-core-parity-report.md` → "Parity-only modules".
8
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12
/// Severity level of a recorded operation; expected values are `"debug"`, `"info"`,
/// `"warn"` or `"error"` (not enforced by the type).
pub type ObservabilityLevel = &'static str;
/// Outcome of an operation; expected values are `"success"` or `"failure"`.
pub type ObservabilityOutcome = &'static str;
/// Health status of a surface; expected values are `"ok"`, `"warn"` or `"error"`.
pub type ObservabilityHealthStatus = &'static str;
16
/// Location settings shared by [`ProjectObserver`] and [`read_observability_summary`].
#[derive(Clone, Debug)]
pub struct TsObservabilityConfig {
    /// Path of the config file. Snapshots are stored under
    /// `<parent dir>/.ao-observability/processes/`.
    pub config_path: PathBuf,
}
21
22#[derive(Debug, Clone, Serialize, Deserialize, Default)]
23pub struct ObservabilityMetricCounter {
24    pub total: u64,
25    pub success: u64,
26    pub failure: u64,
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub last_failure_reason: Option<String>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ObservabilityTraceRecord {
33    pub operation: String,
34    pub outcome: String,
35    pub correlation_id: String,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub project_id: Option<String>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub session_id: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub reason: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ObservabilitySessionStatus {
46    pub session_id: String,
47    pub correlation_id: String,
48    pub operation: String,
49    pub outcome: String,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub reason: Option<String>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ObservabilityHealthSurface {
56    pub surface: String,
57    pub status: String,
58    pub component: String,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub reason: Option<String>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize, Default)]
64struct ProcessSnapshot {
65    pub component: String,
66    pub metrics: HashMap<String, ObservabilityMetricCounter>,
67    pub traces: Vec<ObservabilityTraceRecord>,
68    pub sessions: HashMap<String, ObservabilitySessionStatus>,
69    pub health: HashMap<String, ObservabilityHealthSurface>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, Default)]
73pub struct ObservabilityProjectSnapshot {
74    pub metrics: HashMap<String, ObservabilityMetricCounter>,
75    pub recent_traces: Vec<ObservabilityTraceRecord>,
76    pub sessions: HashMap<String, ObservabilitySessionStatus>,
77    pub health: HashMap<String, ObservabilityHealthSurface>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize, Default)]
81pub struct ObservabilitySummary {
82    pub overall_status: String,
83    pub projects: HashMap<String, ObservabilityProjectSnapshot>,
84}
85
86pub struct RecordOperationInput {
87    pub metric: String,
88    pub operation: String,
89    pub outcome: String,
90    pub correlation_id: String,
91    pub project_id: Option<String>,
92    pub session_id: Option<String>,
93    pub reason: Option<String>,
94    pub level: Option<ObservabilityLevel>,
95}
96
/// Arguments for [`ProjectObserver::set_health`].
pub struct SetHealthInput {
    /// Name of the surface being reported.
    pub surface: String,
    /// Status string (e.g. `"ok"`, `"warn"`, `"error"`).
    pub status: String,
    /// Project the report concerns. NOTE(review): not persisted by `set_health`.
    pub project_id: Option<String>,
    /// Correlation id of the report. NOTE(review): not persisted by `set_health`.
    pub correlation_id: Option<String>,
    /// Optional explanation stored with the report.
    pub reason: Option<String>,
}
104
105pub struct ProjectObserver {
106    config: TsObservabilityConfig,
107    component: String,
108}
109
110pub fn create_project_observer(config: TsObservabilityConfig, component: &str) -> ProjectObserver {
111    ProjectObserver {
112        config,
113        component: component.to_string(),
114    }
115}
116
117impl ProjectObserver {
118    pub fn record_operation(&self, input: RecordOperationInput) {
119        let mut snap = self.read_snapshot();
120        let ctr = snap.metrics.entry(input.metric.clone()).or_default();
121        ctr.total += 1;
122        if input.outcome == "success" {
123            ctr.success += 1;
124        } else {
125            ctr.failure += 1;
126            ctr.last_failure_reason = input.reason.clone();
127        }
128        snap.traces.push(ObservabilityTraceRecord {
129            operation: input.operation.clone(),
130            outcome: input.outcome.clone(),
131            correlation_id: input.correlation_id.clone(),
132            project_id: input.project_id.clone(),
133            session_id: input.session_id.clone(),
134            reason: input.reason.clone(),
135        });
136        if let Some(session_id) = &input.session_id {
137            snap.sessions.insert(
138                session_id.clone(),
139                ObservabilitySessionStatus {
140                    session_id: session_id.clone(),
141                    correlation_id: input.correlation_id,
142                    operation: input.operation,
143                    outcome: input.outcome,
144                    reason: input.reason,
145                },
146            );
147        }
148        self.write_snapshot(&snap);
149    }
150
151    pub fn set_health(&self, input: SetHealthInput) {
152        let mut snap = self.read_snapshot();
153        snap.health.insert(
154            input.surface.clone(),
155            ObservabilityHealthSurface {
156                surface: input.surface,
157                status: input.status,
158                component: self.component.clone(),
159                reason: input.reason,
160            },
161        );
162        self.write_snapshot(&snap);
163    }
164
165    fn snapshot_path(&self) -> PathBuf {
166        let base = self
167            .config
168            .config_path
169            .parent()
170            .unwrap_or(Path::new("."))
171            .join(".ao-observability")
172            .join("processes");
173        let _ = std::fs::create_dir_all(&base);
174        base.join(format!(
175            "{}-{}.json",
176            sanitize_component(&self.component),
177            std::process::id()
178        ))
179    }
180
181    fn read_snapshot(&self) -> ProcessSnapshot {
182        let path = self.snapshot_path();
183        let content = std::fs::read_to_string(&path).ok();
184        if let Some(content) = content {
185            serde_json::from_str(&content).unwrap_or_else(|_| ProcessSnapshot {
186                component: self.component.clone(),
187                ..Default::default()
188            })
189        } else {
190            ProcessSnapshot {
191                component: self.component.clone(),
192                ..Default::default()
193            }
194        }
195    }
196
197    fn write_snapshot(&self, snap: &ProcessSnapshot) {
198        let path = self.snapshot_path();
199        let payload = serde_json::to_string_pretty(snap).unwrap_or_else(|e| {
200            tracing::warn!("observability snapshot serialization failed: {e}");
201            "{}".into()
202        }) + "\n";
203        let _ = crate::parity_metadata::atomic_write_file(&path, &payload);
204    }
205}
206
207pub fn read_observability_summary(config: TsObservabilityConfig) -> ObservabilitySummary {
208    let processes_dir = config
209        .config_path
210        .parent()
211        .unwrap_or(Path::new("."))
212        .join(".ao-observability")
213        .join("processes");
214    let mut summary = ObservabilitySummary {
215        overall_status: "ok".into(),
216        ..Default::default()
217    };
218    let Ok(rd) = std::fs::read_dir(processes_dir) else {
219        return summary;
220    };
221    for ent in rd.flatten() {
222        let Ok(content) = std::fs::read_to_string(ent.path()) else {
223            continue;
224        };
225        let Ok(snap) = serde_json::from_str::<ProcessSnapshot>(&content) else {
226            continue;
227        };
228        for trace in &snap.traces {
229            if let Some(project_id) = &trace.project_id {
230                let p = summary.projects.entry(project_id.clone()).or_default();
231                p.recent_traces.push(trace.clone());
232            }
233        }
234        for (metric, ctr) in &snap.metrics {
235            for project_id in snap
236                .traces
237                .iter()
238                .filter_map(|t| t.project_id.clone())
239                .collect::<std::collections::HashSet<_>>()
240            {
241                let p = summary.projects.entry(project_id).or_default();
242                let c = p.metrics.entry(metric.clone()).or_default();
243                c.total += ctr.total;
244                c.success += ctr.success;
245                c.failure += ctr.failure;
246                if c.last_failure_reason.is_none() {
247                    c.last_failure_reason = ctr.last_failure_reason.clone();
248                }
249            }
250        }
251        for (sid, s) in &snap.sessions {
252            if let Some(project_id) = snap
253                .traces
254                .iter()
255                .rev()
256                .find(|t| t.session_id.as_deref() == Some(sid))
257                .and_then(|t| t.project_id.clone())
258            {
259                let p = summary.projects.entry(project_id).or_default();
260                p.sessions.insert(sid.clone(), s.clone());
261            }
262        }
263        for (surface, h) in &snap.health {
264            let status = h.status.as_str();
265            if status_priority(status) > status_priority(&summary.overall_status) {
266                summary.overall_status = status.to_string();
267            }
268            if let Some(project_id) = snap.traces.iter().rev().find_map(|t| t.project_id.clone()) {
269                let p = summary.projects.entry(project_id).or_default();
270                p.health.insert(surface.clone(), h.clone());
271            }
272        }
273    }
274    for p in summary.projects.values() {
275        for h in p.health.values() {
276            if status_priority(&h.status) > status_priority(&summary.overall_status) {
277                summary.overall_status = h.status.clone();
278            }
279        }
280    }
281    summary
282}
283
/// Ranks a health status for "worst status wins" comparisons: `"error"` (3) beats
/// `"warn"` (2), which beats everything else, including `"ok"` and unknown strings (1).
fn status_priority(s: &str) -> u8 {
    if s == "error" {
        3
    } else if s == "warn" {
        2
    } else {
        1
    }
}
291
/// Makes a component name safe for use in a file name.
///
/// Every character other than ASCII letters, digits, `_` and `-` becomes `-` (one per
/// `char`, so a multi-byte character yields a single `-`). Leading and trailing `-` are
/// then removed. The result may be empty.
fn sanitize_component(component: &str) -> String {
    let mut out = String::with_capacity(component.len());
    for ch in component.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-');
        out.push(if allowed { ch } else { '-' });
    }
    out.trim_matches('-').to_owned()
}