Skip to main content

difflore_core/observability/
activity_stream.rs

1//! Real-time "memory pipeline" event stream powering the TUI's Activity tab.
2//!
3//! Mirrors jcode's `MemoryEvent` design: each retrieval / injection /
4//! reinforcement emits a small typed record to a JSONL file at
5//! `$DIFFLORE_HOME/activity.jsonl`. The TUI tail-reads this file and
6//! renders the last N events so users SEE rules being recalled and
7//! reinforced as their agent runs.
8//!
9//! Why a JSONL file rather than an in-process channel: the MCP server,
10//! CLI fix command, and TUI run as separate processes. A file is the
11//! cheapest cross-process bus that survives a TUI restart and needs no
12//! daemon. Capped at 1000 lines via tail-rotation so it never grows
13//! without bound.
14
15use std::fs;
16use std::io::Write;
17use std::path::PathBuf;
18
19use serde::{Deserialize, Serialize};
20
/// Upper bound on the number of lines kept in `activity.jsonl`.
///
/// Once the file already holds this many lines, the next write keeps
/// only the newest `MAX_EVENTS - 1` of them and appends the new event,
/// so the log stays `cat`-friendly and bounded on disk.
pub const MAX_EVENTS: usize = 1_000;
25
26/// One line in `activity.jsonl`. `kind` is the discriminant; payload
27/// fields live alongside it (flat layout — easier for ad-hoc `jq`
28/// inspection than a tagged enum).
29#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
30#[serde(rename_all = "camelCase")]
31pub struct ActivityEvent {
32    pub ts_ms: i64,
33    #[serde(flatten)]
34    pub payload: ActivityPayload,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
38#[serde(tag = "kind", rename_all = "snake_case")]
39pub enum ActivityPayload {
40    /// One rule surfaced by retrieval. Emitted once per top-K rule per
41    /// MCP call so the TUI can show a stream of "what just got pulled".
42    RuleRecalled {
43        rule_id: String,
44        rule_title: String,
45        score: f32,
46        took_ms: u64,
47    },
48    /// Aggregate signal for an MCP call: how many rules were placed in
49    /// the agent's context window, with an intent summary the user can
50    /// scan ("review src/foo.rs · general").
51    RuleInjected {
52        rule_count: u32,
53        prompt_chars: u32,
54        intent_summary: String,
55    },
56    /// Confidence shift on a single rule. `reason` is one of
57    /// {"`recalled","cited","fix_accepted","fix_rejected`"}. Stored
58    /// as `String` so the round-tripped event from JSONL still
59    /// deserialises (serde flatten + 'static refs are incompatible).
60    RuleReinforced {
61        rule_id: String,
62        rule_title: String,
63        prev_strength: f32,
64        new_strength: f32,
65        reason: String,
66    },
67    /// ANN / hybrid retrieval pass result. Lets the user see the engine
68    /// is actually running and how fast.
69    RetrievalEmbedding { hits: u32, took_ms: u64 },
70    /// Cloud-managed embedding cap hit. Emitted whenever the cloud
71    /// returns `409 embed_cap_reached` so `difflore doctor` can surface
72    /// "you've been hitting the Free-tier embed cap N times this week —
73    /// upgrade or switch to BYOK". `cap` is the tier ceiling; `used` is
74    /// the value the cloud reported.
75    EmbedCapReached { cap: u32, used: u32 },
76    /// Semantic embedding provider fell back to local SHA1 after retry.
77    /// `reason` is a short sanitized bucket such as "network", "scope",
78    /// "cap", or "empty"; raw provider errors are intentionally not
79    /// persisted because they can contain URLs or request context.
80    EmbeddingFallback { reason: String },
81}
82
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch, and
/// saturates at `i64::MAX` if the millisecond count does not fit.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
89
90fn log_path() -> Option<PathBuf> {
91    crate::paths::data_home()
92        .ok()
93        .map(|dir| dir.join("activity.jsonl"))
94}
95
96/// Append `payload` to the activity log. Best-effort: any IO failure is
97/// swallowed so telemetry never breaks the caller. When the file would
98/// exceed `MAX_EVENTS` lines, rotates by tail-keeping the most recent.
99pub fn record(payload: ActivityPayload) {
100    let Some(path) = log_path() else {
101        return;
102    };
103    let event = ActivityEvent {
104        ts_ms: now_ms(),
105        payload,
106    };
107    let Ok(line) = serde_json::to_string(&event) else {
108        return;
109    };
110    let _ = append_with_cap(&path, &line);
111}
112
113fn append_with_cap(path: &std::path::Path, line: &str) -> std::io::Result<()> {
114    if let Some(parent) = path.parent() {
115        fs::create_dir_all(parent)?;
116    }
117
118    // Cheap path: file is small, just append. Read once and pass the
119    // string into the rotate branch — the previous shape read twice,
120    // which on the rotate path could see the second read fail (file
121    // deleted/renamed between calls) and silently truncate the log to
122    // just the new entry. One read = single source of truth.
123    let existing = fs::read_to_string(path).unwrap_or_default();
124    if existing.lines().count() >= MAX_EVENTS {
125        // Tail-rotate: keep the last MAX_EVENTS-1 lines, then append.
126        let mut kept: Vec<&str> = existing.lines().collect();
127        let drop = kept.len().saturating_sub(MAX_EVENTS - 1);
128        kept.drain(..drop);
129        let mut out = kept.join("\n");
130        if !out.is_empty() {
131            out.push('\n');
132        }
133        out.push_str(line);
134        out.push('\n');
135        fs::write(path, out)?;
136        return Ok(());
137    }
138
139    let mut f = fs::OpenOptions::new()
140        .create(true)
141        .append(true)
142        .open(path)?;
143    writeln!(f, "{line}")?;
144    Ok(())
145}
146
147/// Read the last `n` events from disk, newest-first. Best-effort: a
148/// missing or unreadable file yields an empty Vec so the TUI can render
149/// an empty state without erroring.
150pub fn tail(n: usize) -> Vec<ActivityEvent> {
151    let Some(path) = log_path() else {
152        return Vec::new();
153    };
154    let Ok(raw) = fs::read_to_string(&path) else {
155        return Vec::new();
156    };
157    raw.lines()
158        .rev()
159        .take(n)
160        .filter_map(|l| serde_json::from_str(l).ok())
161        .collect()
162}
163
164/// Test-friendly variant of `record` that writes to an explicit path
165/// instead of `$DIFFLORE_HOME/activity.jsonl`. Production callers should
166/// use `record`; tests use this to avoid a shared-env-var race against
167/// the workspace-wide `shared_test_home()`.
168pub fn record_to(path: &std::path::Path, payload: ActivityPayload) -> std::io::Result<()> {
169    let event = ActivityEvent {
170        ts_ms: now_ms(),
171        payload,
172    };
173    let line = serde_json::to_string(&event)
174        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
175    append_with_cap(path, &line)
176}
177
178/// Test-friendly variant of `tail`.
179pub fn tail_from(path: &std::path::Path, n: usize) -> Vec<ActivityEvent> {
180    let Ok(raw) = fs::read_to_string(path) else {
181        return Vec::new();
182    };
183    raw.lines()
184        .rev()
185        .take(n)
186        .filter_map(|l| serde_json::from_str(l).ok())
187        .collect()
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a fresh temp directory and the log path inside it. The
    /// directory guard is returned so it stays alive for the whole test.
    fn scratch_log() -> (tempfile::TempDir, std::path::PathBuf) {
        let guard = tempfile::tempdir().unwrap();
        let log = guard.path().join("activity.jsonl");
        (guard, log)
    }

    /// Writing one event more than the cap must leave exactly
    /// `MAX_EVENTS` entries, with the newest one on top.
    #[test]
    fn writer_caps_at_max_events() {
        let (_guard, log) = scratch_log();
        for seq in 0..=MAX_EVENTS {
            let payload = ActivityPayload::RuleRecalled {
                rule_id: format!("r{seq}"),
                rule_title: "t".into(),
                score: 0.1,
                took_ms: 1,
            };
            record_to(&log, payload).unwrap();
        }

        let events = tail_from(&log, MAX_EVENTS + 50);
        assert_eq!(
            events.len(),
            MAX_EVENTS,
            "file should be capped at {MAX_EVENTS} entries"
        );
        match &events[0].payload {
            ActivityPayload::RuleRecalled { rule_id, .. } => {
                assert_eq!(rule_id, &format!("r{MAX_EVENTS}"));
            }
            _ => panic!("unexpected payload kind on top"),
        }
    }

    /// The most recently written event is returned first.
    #[test]
    fn tail_returns_newest_first() {
        let (_guard, log) = scratch_log();
        for (rule_count, prompt_chars, summary) in [(1, 10, "first"), (2, 20, "second")] {
            let payload = ActivityPayload::RuleInjected {
                rule_count,
                prompt_chars,
                intent_summary: summary.into(),
            };
            record_to(&log, payload).unwrap();
        }

        let events = tail_from(&log, 10);
        assert_eq!(events.len(), 2);
        match &events[0].payload {
            ActivityPayload::RuleInjected { intent_summary, .. } => {
                assert_eq!(intent_summary, "second");
            }
            _ => panic!("expected RuleInjected on top"),
        }
    }

    /// An `EmbeddingFallback` survives a write/read cycle with its
    /// sanitized reason intact.
    #[test]
    fn embedding_fallback_round_trips_sanitized_reason() {
        let (_guard, log) = scratch_log();
        let payload = ActivityPayload::EmbeddingFallback {
            reason: "network".into(),
        };
        record_to(&log, payload).unwrap();

        let events = tail_from(&log, 10);
        assert_eq!(events.len(), 1);
        match &events[0].payload {
            ActivityPayload::EmbeddingFallback { reason } => assert_eq!(reason, "network"),
            _ => panic!("expected EmbeddingFallback on top"),
        }
    }
}