1use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12
/// Level label carried by `RecordOperationInput::level` (presumably a log severity — confirm).
pub type ObservabilityLevel = &'static str;
/// Operation outcome label (e.g. `"success"`; `ProjectObserver::record_operation` counts any
/// other value as a failure).
pub type ObservabilityOutcome = &'static str;
/// Health status label; `"error"` outranks `"warn"`, which outranks every other value.
pub type ObservabilityHealthStatus = &'static str;

/// Location settings for the file-based observability store.
#[derive(Clone, Debug)]
pub struct TsObservabilityConfig {
    /// Path of the config file. Process snapshots are kept under
    /// `<parent of this path>/.ao-observability/processes` (or `./.ao-observability/processes`
    /// when the path has no parent).
    pub config_path: PathBuf,
}
21
22#[derive(Debug, Clone, Serialize, Deserialize, Default)]
23pub struct ObservabilityMetricCounter {
24 pub total: u64,
25 pub success: u64,
26 pub failure: u64,
27 #[serde(skip_serializing_if = "Option::is_none")]
28 pub last_failure_reason: Option<String>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ObservabilityTraceRecord {
33 pub operation: String,
34 pub outcome: String,
35 pub correlation_id: String,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub project_id: Option<String>,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub session_id: Option<String>,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub reason: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ObservabilitySessionStatus {
46 pub session_id: String,
47 pub correlation_id: String,
48 pub operation: String,
49 pub outcome: String,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub reason: Option<String>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ObservabilityHealthSurface {
56 pub surface: String,
57 pub status: String,
58 pub component: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub reason: Option<String>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize, Default)]
64struct ProcessSnapshot {
65 pub component: String,
66 pub metrics: HashMap<String, ObservabilityMetricCounter>,
67 pub traces: Vec<ObservabilityTraceRecord>,
68 pub sessions: HashMap<String, ObservabilitySessionStatus>,
69 pub health: HashMap<String, ObservabilityHealthSurface>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, Default)]
73pub struct ObservabilityProjectSnapshot {
74 pub metrics: HashMap<String, ObservabilityMetricCounter>,
75 pub recent_traces: Vec<ObservabilityTraceRecord>,
76 pub sessions: HashMap<String, ObservabilitySessionStatus>,
77 pub health: HashMap<String, ObservabilityHealthSurface>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize, Default)]
81pub struct ObservabilitySummary {
82 pub overall_status: String,
83 pub projects: HashMap<String, ObservabilityProjectSnapshot>,
84}
85
86pub struct RecordOperationInput {
87 pub metric: String,
88 pub operation: String,
89 pub outcome: String,
90 pub correlation_id: String,
91 pub project_id: Option<String>,
92 pub session_id: Option<String>,
93 pub reason: Option<String>,
94 pub level: Option<ObservabilityLevel>,
95}
96
/// Input to [`ProjectObserver::set_health`].
///
/// Derives `Default` so callers can fill only the fields they need with
/// `..Default::default()`, plus `Debug`/`Clone` for consistency with the other public types.
#[derive(Debug, Clone, Default)]
pub struct SetHealthInput {
    /// Surface name; replaces any earlier entry for the same surface.
    pub surface: String,
    /// Status label (`"error"` and `"warn"` raise the summary's overall status).
    pub status: String,
    /// Accepted but not persisted by `set_health`.
    pub project_id: Option<String>,
    /// Accepted but not persisted by `set_health`.
    pub correlation_id: Option<String>,
    /// Optional explanation stored with the health entry.
    pub reason: Option<String>,
}
104
105pub struct ProjectObserver {
106 config: TsObservabilityConfig,
107 component: String,
108}
109
110pub fn create_project_observer(config: TsObservabilityConfig, component: &str) -> ProjectObserver {
111 ProjectObserver {
112 config,
113 component: component.to_string(),
114 }
115}
116
117impl ProjectObserver {
118 pub fn record_operation(&self, input: RecordOperationInput) {
119 let mut snap = self.read_snapshot();
120 let ctr = snap.metrics.entry(input.metric.clone()).or_default();
121 ctr.total += 1;
122 if input.outcome == "success" {
123 ctr.success += 1;
124 } else {
125 ctr.failure += 1;
126 ctr.last_failure_reason = input.reason.clone();
127 }
128 snap.traces.push(ObservabilityTraceRecord {
129 operation: input.operation.clone(),
130 outcome: input.outcome.clone(),
131 correlation_id: input.correlation_id.clone(),
132 project_id: input.project_id.clone(),
133 session_id: input.session_id.clone(),
134 reason: input.reason.clone(),
135 });
136 if let Some(session_id) = &input.session_id {
137 snap.sessions.insert(
138 session_id.clone(),
139 ObservabilitySessionStatus {
140 session_id: session_id.clone(),
141 correlation_id: input.correlation_id,
142 operation: input.operation,
143 outcome: input.outcome,
144 reason: input.reason,
145 },
146 );
147 }
148 self.write_snapshot(&snap);
149 }
150
151 pub fn set_health(&self, input: SetHealthInput) {
152 let mut snap = self.read_snapshot();
153 snap.health.insert(
154 input.surface.clone(),
155 ObservabilityHealthSurface {
156 surface: input.surface,
157 status: input.status,
158 component: self.component.clone(),
159 reason: input.reason,
160 },
161 );
162 self.write_snapshot(&snap);
163 }
164
165 fn snapshot_path(&self) -> PathBuf {
166 let base = self
167 .config
168 .config_path
169 .parent()
170 .unwrap_or(Path::new("."))
171 .join(".ao-observability")
172 .join("processes");
173 let _ = std::fs::create_dir_all(&base);
174 base.join(format!(
175 "{}-{}.json",
176 sanitize_component(&self.component),
177 std::process::id()
178 ))
179 }
180
181 fn read_snapshot(&self) -> ProcessSnapshot {
182 let path = self.snapshot_path();
183 let content = std::fs::read_to_string(&path).ok();
184 if let Some(content) = content {
185 serde_json::from_str(&content).unwrap_or_else(|_| ProcessSnapshot {
186 component: self.component.clone(),
187 ..Default::default()
188 })
189 } else {
190 ProcessSnapshot {
191 component: self.component.clone(),
192 ..Default::default()
193 }
194 }
195 }
196
197 fn write_snapshot(&self, snap: &ProcessSnapshot) {
198 let path = self.snapshot_path();
199 let payload = serde_json::to_string_pretty(snap).unwrap_or_else(|e| {
200 tracing::warn!("observability snapshot serialization failed: {e}");
201 "{}".into()
202 }) + "\n";
203 let _ = crate::parity_metadata::atomic_write_file(&path, &payload);
204 }
205}
206
207pub fn read_observability_summary(config: TsObservabilityConfig) -> ObservabilitySummary {
208 let processes_dir = config
209 .config_path
210 .parent()
211 .unwrap_or(Path::new("."))
212 .join(".ao-observability")
213 .join("processes");
214 let mut summary = ObservabilitySummary {
215 overall_status: "ok".into(),
216 ..Default::default()
217 };
218 let Ok(rd) = std::fs::read_dir(processes_dir) else {
219 return summary;
220 };
221 for ent in rd.flatten() {
222 let Ok(content) = std::fs::read_to_string(ent.path()) else {
223 continue;
224 };
225 let Ok(snap) = serde_json::from_str::<ProcessSnapshot>(&content) else {
226 continue;
227 };
228 for trace in &snap.traces {
229 if let Some(project_id) = &trace.project_id {
230 let p = summary.projects.entry(project_id.clone()).or_default();
231 p.recent_traces.push(trace.clone());
232 }
233 }
234 for (metric, ctr) in &snap.metrics {
235 for project_id in snap
236 .traces
237 .iter()
238 .filter_map(|t| t.project_id.clone())
239 .collect::<std::collections::HashSet<_>>()
240 {
241 let p = summary.projects.entry(project_id).or_default();
242 let c = p.metrics.entry(metric.clone()).or_default();
243 c.total += ctr.total;
244 c.success += ctr.success;
245 c.failure += ctr.failure;
246 if c.last_failure_reason.is_none() {
247 c.last_failure_reason = ctr.last_failure_reason.clone();
248 }
249 }
250 }
251 for (sid, s) in &snap.sessions {
252 if let Some(project_id) = snap
253 .traces
254 .iter()
255 .rev()
256 .find(|t| t.session_id.as_deref() == Some(sid))
257 .and_then(|t| t.project_id.clone())
258 {
259 let p = summary.projects.entry(project_id).or_default();
260 p.sessions.insert(sid.clone(), s.clone());
261 }
262 }
263 for (surface, h) in &snap.health {
264 let status = h.status.as_str();
265 if status_priority(status) > status_priority(&summary.overall_status) {
266 summary.overall_status = status.to_string();
267 }
268 if let Some(project_id) = snap.traces.iter().rev().find_map(|t| t.project_id.clone()) {
269 let p = summary.projects.entry(project_id).or_default();
270 p.health.insert(surface.clone(), h.clone());
271 }
272 }
273 }
274 for p in summary.projects.values() {
275 for h in p.health.values() {
276 if status_priority(&h.status) > status_priority(&summary.overall_status) {
277 summary.overall_status = h.status.clone();
278 }
279 }
280 }
281 summary
282}
283
/// Ranks a health status for "worst wins" aggregation: `"error"` (3) outranks `"warn"` (2),
/// and every other value, including `"ok"`, ranks lowest (1). Matching is case-sensitive.
fn status_priority(s: &str) -> u8 {
    if s == "error" {
        3
    } else if s == "warn" {
        2
    } else {
        1
    }
}
291
/// Makes `component` safe to embed in a file name.
///
/// Every character other than an ASCII letter, ASCII digit, `_`, or `-` becomes `-`; then
/// leading and trailing `-` are stripped. The result may be empty.
fn sanitize_component(component: &str) -> String {
    let mut out = String::with_capacity(component.len());
    for ch in component.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-');
        out.push(if keep { ch } else { '-' });
    }
    out.trim_matches('-').to_owned()
}