difflore_core/observability/
activity_stream.rs1use std::fs;
16use std::io::Write;
17use std::path::PathBuf;
18
19use serde::{Deserialize, Serialize};
20
/// Upper bound on the number of lines kept in the activity log file.
///
/// Once the file already holds this many lines, `append_with_cap` drops the
/// oldest ones so that the file holds exactly this many after the new line
/// is written.
pub const MAX_EVENTS: usize = 1_000;
25
26#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
30#[serde(rename_all = "camelCase")]
31pub struct ActivityEvent {
32 pub ts_ms: i64,
33 #[serde(flatten)]
34 pub payload: ActivityPayload,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
38#[serde(tag = "kind", rename_all = "snake_case")]
39pub enum ActivityPayload {
40 RuleRecalled {
43 rule_id: String,
44 rule_title: String,
45 score: f32,
46 took_ms: u64,
47 },
48 RuleInjected {
52 rule_count: u32,
53 prompt_chars: u32,
54 intent_summary: String,
55 },
56 RuleReinforced {
61 rule_id: String,
62 rule_title: String,
63 prev_strength: f32,
64 new_strength: f32,
65 reason: String,
66 },
67 RetrievalEmbedding { hits: u32, took_ms: u64 },
70 EmbedCapReached { cap: u32, used: u32 },
76 EmbeddingFallback { reason: String },
81}
82
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` if the millisecond count does not fit in an `i64`.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
89
90fn log_path() -> Option<PathBuf> {
91 crate::paths::data_home()
92 .ok()
93 .map(|dir| dir.join("activity.jsonl"))
94}
95
96pub fn record(payload: ActivityPayload) {
100 let Some(path) = log_path() else {
101 return;
102 };
103 let event = ActivityEvent {
104 ts_ms: now_ms(),
105 payload,
106 };
107 let Ok(line) = serde_json::to_string(&event) else {
108 return;
109 };
110 let _ = append_with_cap(&path, &line);
111}
112
113fn append_with_cap(path: &std::path::Path, line: &str) -> std::io::Result<()> {
114 if let Some(parent) = path.parent() {
115 fs::create_dir_all(parent)?;
116 }
117
118 let existing = fs::read_to_string(path).unwrap_or_default();
124 if existing.lines().count() >= MAX_EVENTS {
125 let mut kept: Vec<&str> = existing.lines().collect();
127 let drop = kept.len().saturating_sub(MAX_EVENTS - 1);
128 kept.drain(..drop);
129 let mut out = kept.join("\n");
130 if !out.is_empty() {
131 out.push('\n');
132 }
133 out.push_str(line);
134 out.push('\n');
135 fs::write(path, out)?;
136 return Ok(());
137 }
138
139 let mut f = fs::OpenOptions::new()
140 .create(true)
141 .append(true)
142 .open(path)?;
143 writeln!(f, "{line}")?;
144 Ok(())
145}
146
147pub fn tail(n: usize) -> Vec<ActivityEvent> {
151 let Some(path) = log_path() else {
152 return Vec::new();
153 };
154 let Ok(raw) = fs::read_to_string(&path) else {
155 return Vec::new();
156 };
157 raw.lines()
158 .rev()
159 .take(n)
160 .filter_map(|l| serde_json::from_str(l).ok())
161 .collect()
162}
163
164pub fn record_to(path: &std::path::Path, payload: ActivityPayload) -> std::io::Result<()> {
169 let event = ActivityEvent {
170 ts_ms: now_ms(),
171 payload,
172 };
173 let line = serde_json::to_string(&event)
174 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
175 append_with_cap(path, &line)
176}
177
178pub fn tail_from(path: &std::path::Path, n: usize) -> Vec<ActivityEvent> {
180 let Ok(raw) = fs::read_to_string(path) else {
181 return Vec::new();
182 };
183 raw.lines()
184 .rev()
185 .take(n)
186 .filter_map(|l| serde_json::from_str(l).ok())
187 .collect()
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh temporary directory and returns it together with the
    /// log path inside it. The directory handle is returned so it stays alive
    /// for the whole test body.
    fn scratch_log() -> (tempfile::TempDir, std::path::PathBuf) {
        let guard = tempfile::tempdir().unwrap();
        let log = guard.path().join("activity.jsonl");
        (guard, log)
    }

    /// Writing one event more than the cap must leave exactly `MAX_EVENTS`
    /// entries, with the last one written reported first.
    #[test]
    fn writer_caps_at_max_events() {
        let (_guard, log) = scratch_log();
        for idx in 0..=MAX_EVENTS {
            let payload = ActivityPayload::RuleRecalled {
                rule_id: format!("r{idx}"),
                rule_title: "t".into(),
                score: 0.1,
                took_ms: 1,
            };
            record_to(&log, payload).unwrap();
        }

        let newest_first = tail_from(&log, MAX_EVENTS + 50);
        assert_eq!(
            newest_first.len(),
            MAX_EVENTS,
            "file should be capped at {MAX_EVENTS} entries"
        );
        let ActivityPayload::RuleRecalled { rule_id, .. } = &newest_first[0].payload else {
            panic!("unexpected payload kind on top");
        };
        assert_eq!(rule_id, &format!("r{MAX_EVENTS}"));
    }

    /// The most recently recorded event must come back at index 0.
    #[test]
    fn tail_returns_newest_first() {
        let (_guard, log) = scratch_log();
        let older = ActivityPayload::RuleInjected {
            rule_count: 1,
            prompt_chars: 10,
            intent_summary: "first".into(),
        };
        let newer = ActivityPayload::RuleInjected {
            rule_count: 2,
            prompt_chars: 20,
            intent_summary: "second".into(),
        };
        record_to(&log, older).unwrap();
        record_to(&log, newer).unwrap();

        let newest_first = tail_from(&log, 10);
        assert_eq!(newest_first.len(), 2);
        let ActivityPayload::RuleInjected { intent_summary, .. } = &newest_first[0].payload else {
            panic!("expected RuleInjected on top");
        };
        assert_eq!(intent_summary, "second");
    }

    /// An `EmbeddingFallback` event must survive a write/read cycle with its
    /// reason string intact.
    #[test]
    fn embedding_fallback_round_trips_sanitized_reason() {
        let (_guard, log) = scratch_log();
        let fallback = ActivityPayload::EmbeddingFallback {
            reason: "network".into(),
        };
        record_to(&log, fallback).unwrap();

        let newest_first = tail_from(&log, 10);
        assert_eq!(newest_first.len(), 1);
        let ActivityPayload::EmbeddingFallback { reason } = &newest_first[0].payload else {
            panic!("expected EmbeddingFallback on top");
        };
        assert_eq!(reason, "network");
    }
}