Skip to main content

rust_memex/tui/indexer/
contracts.rs

1//! Indexer event contracts shared by the scheduler, TUI, and future consumers.
2
3use std::collections::VecDeque;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use tokio::sync::watch;
9
10/// Events emitted by the indexing scheduler.
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12#[serde(tag = "type", rename_all = "snake_case")]
13pub enum IndexEvent {
14    RunStarted {
15        total_files: usize,
16        namespace: String,
17        source_dir: String,
18        parallelism: usize,
19        started_at: DateTime<Utc>,
20    },
21    FileStarted {
22        file_index: usize,
23        path: String,
24        size_bytes: u64,
25    },
26    FileIndexed {
27        file_index: usize,
28        path: String,
29        chunks_indexed: usize,
30        content_hash: String,
31        duration_ms: u64,
32        #[serde(skip_serializing_if = "Option::is_none")]
33        embedder_ms: Option<u64>,
34        #[serde(skip_serializing_if = "Option::is_none")]
35        tokens_estimated: Option<usize>,
36    },
37    FileSkipped {
38        file_index: usize,
39        path: String,
40        reason: String,
41        #[serde(skip_serializing_if = "Option::is_none")]
42        content_hash: Option<String>,
43    },
44    FileFailed {
45        file_index: usize,
46        path: String,
47        error: String,
48    },
49    StatsTick {
50        processed: usize,
51        indexed: usize,
52        skipped: usize,
53        failed: usize,
54        total: usize,
55        files_per_sec: f64,
56        eta_secs: Option<f64>,
57        total_chunks: usize,
58        in_flight: usize,
59    },
60    RunCompleted {
61        processed: usize,
62        indexed: usize,
63        skipped: usize,
64        failed: usize,
65        total_chunks: usize,
66        elapsed: Duration,
67        stopped_early: bool,
68    },
69    RunFailed {
70        error: String,
71        processed_before_failure: usize,
72    },
73    Paused,
74    Resumed,
75    ParallelismChanged {
76        previous: usize,
77        current: usize,
78    },
79    StopRequested,
80    Warning {
81        code: String,
82        message: String,
83    },
84}
85
86/// Event sinks must stay synchronous and infallible.
87pub trait IndexEventSink: Send + Sync {
88    fn on_event(&self, event: &IndexEvent);
89}
90
/// Upper bound on the number of warnings the live telemetry snapshot keeps in
/// its `recent_warnings` queue.
///
/// NOTE(review): nothing in this module enforces the cap; presumably the code
/// that pushes warnings trims the queue to this length — confirm at the call site.
pub const MAX_RECENT_WARNINGS: usize = 20;
93
94/// Pull-friendly indexing telemetry snapshot for the TUI and future tooling.
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96pub struct IndexTelemetrySnapshot {
97    pub namespace: String,
98    pub source_dir: String,
99    pub started_at: Option<DateTime<Utc>>,
100    pub total: usize,
101    pub processed: usize,
102    pub indexed: usize,
103    pub skipped: usize,
104    pub failed: usize,
105    pub total_chunks: usize,
106    pub current_file: Option<String>,
107    pub in_flight: usize,
108    pub parallelism: usize,
109    pub paused: bool,
110    pub stopping: bool,
111    pub files_per_sec: f64,
112    pub eta_secs: Option<f64>,
113    pub elapsed: Duration,
114    pub avg_embedder_ms: Option<f64>,
115    pub total_tokens_estimated: usize,
116    pub complete: bool,
117    pub stopped_early: bool,
118    pub fatal_error: Option<String>,
119    pub recent_warnings: VecDeque<WarningEntry>,
120}
121
122impl Default for IndexTelemetrySnapshot {
123    fn default() -> Self {
124        Self {
125            namespace: String::new(),
126            source_dir: String::new(),
127            started_at: None,
128            total: 0,
129            processed: 0,
130            indexed: 0,
131            skipped: 0,
132            failed: 0,
133            total_chunks: 0,
134            current_file: None,
135            in_flight: 0,
136            parallelism: 1,
137            paused: false,
138            stopping: false,
139            files_per_sec: 0.0,
140            eta_secs: None,
141            elapsed: Duration::ZERO,
142            avg_embedder_ms: None,
143            total_tokens_estimated: 0,
144            complete: false,
145            stopped_early: false,
146            fatal_error: None,
147            recent_warnings: VecDeque::new(),
148        }
149    }
150}
151
152/// Warning displayed in the dashboard.
153#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
154pub struct WarningEntry {
155    pub code: String,
156    pub message: String,
157    pub at: DateTime<Utc>,
158}
159
160/// Shared watch sender for the latest telemetry snapshot.
161///
162/// This mirrors the rust-mux `publish_status` pattern: writers can emit
163/// frequently while consumers always observe the newest coalesced snapshot.
164pub type SharedIndexTelemetry = watch::Sender<IndexTelemetrySnapshot>;
165
166/// Build the watch channel used by the dashboard.
167pub fn new_index_telemetry() -> (
168    SharedIndexTelemetry,
169    watch::Receiver<IndexTelemetrySnapshot>,
170) {
171    watch::channel(IndexTelemetrySnapshot::default())
172}
173
174/// Control messages sent from the TUI to the scheduler.
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176#[serde(tag = "type", rename_all = "snake_case")]
177pub enum IndexControl {
178    Pause,
179    Resume,
180    SetParallelism(usize),
181    Stop,
182}
183
184/// Bounded control channel size.
185pub const INDEX_CONTROL_CHANNEL_CAPACITY: usize = 16;
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190
191    #[test]
192    fn index_event_serde_roundtrip_representative_variants() {
193        let events = vec![
194            IndexEvent::RunStarted {
195                total_files: 12,
196                namespace: "kb:test".to_string(),
197                source_dir: "/tmp/input".to_string(),
198                parallelism: 4,
199                started_at: Utc::now(),
200            },
201            IndexEvent::FileStarted {
202                file_index: 2,
203                path: "notes.md".to_string(),
204                size_bytes: 512,
205            },
206            IndexEvent::FileIndexed {
207                file_index: 2,
208                path: "notes.md".to_string(),
209                chunks_indexed: 7,
210                content_hash: "abc123".to_string(),
211                duration_ms: 231,
212                embedder_ms: Some(187),
213                tokens_estimated: Some(128),
214            },
215            IndexEvent::StatsTick {
216                processed: 8,
217                indexed: 6,
218                skipped: 1,
219                failed: 1,
220                total: 12,
221                files_per_sec: 1.5,
222                eta_secs: Some(2.6),
223                total_chunks: 18,
224                in_flight: 2,
225            },
226            IndexEvent::RunCompleted {
227                processed: 12,
228                indexed: 9,
229                skipped: 2,
230                failed: 1,
231                total_chunks: 28,
232                elapsed: Duration::from_secs(12),
233                stopped_early: false,
234            },
235        ];
236
237        for event in events {
238            let json = serde_json::to_string(&event).expect("serialize event");
239            let roundtrip: IndexEvent = serde_json::from_str(&json).expect("deserialize event");
240            assert_eq!(roundtrip, event);
241        }
242    }
243}