Skip to main content

mimir_librarian/
runner.rs

//! Draft processing runner.
//!
//! This module owns the mechanics around a one-shot librarian run:
//! recover stale `processing/` drafts, claim pending drafts, invoke a
//! processor, then move each draft to its resulting lifecycle state.
//! The actual LLM / validation / commit logic is injected through
//! [`DraftProcessor`] and lands in later Category 1 slices.
8
9use std::time::{Duration, SystemTime};
10
11use serde::Serialize;
12
13use crate::{Draft, DraftState, DraftStore, LibrarianError};
14
/// Default staleness threshold, in seconds (fifteen minutes).
///
/// A draft that has sat in `processing/` for longer than this is
/// treated as abandoned and is recovered back to `pending/`.
pub const DEFAULT_PROCESSING_STALE_SECS: u64 = 15 * 60;
18
19/// Decision returned by a draft processor.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum DraftProcessingDecision {
23    /// Draft was accepted by the processor.
24    ///
25    /// In the current Category 1 slice this means the LLM output
26    /// passed bounded pre-emit validation and committed durably to the
27    /// canonical store.
28    Accepted,
29    /// Draft was intentionally skipped.
30    Skipped,
31    /// Draft failed processing and should be retained for review.
32    Failed,
33    /// Draft is unsafe, conflicting, or unresolved.
34    Quarantined,
35    /// Processor is not ready to make progress; draft returns to pending.
36    Deferred,
37}
38
39impl DraftProcessingDecision {
40    fn target_state(self) -> DraftState {
41        match self {
42            Self::Accepted => DraftState::Accepted,
43            Self::Skipped => DraftState::Skipped,
44            Self::Failed => DraftState::Failed,
45            Self::Quarantined => DraftState::Quarantined,
46            Self::Deferred => DraftState::Pending,
47        }
48    }
49
50    pub(crate) fn as_str(self) -> &'static str {
51        match self {
52            Self::Accepted => "accepted",
53            Self::Skipped => "skipped",
54            Self::Failed => "failed",
55            Self::Quarantined => "quarantined",
56            Self::Deferred => "deferred",
57        }
58    }
59}
60
61/// Processor invoked for each claimed draft.
62pub trait DraftProcessor {
63    /// Process one draft and decide where it should move next.
64    ///
65    /// The runner guarantees the draft has already been claimed into
66    /// `processing/`. It also owns the post-decision state move.
67    ///
68    /// # Errors
69    ///
70    /// Returns a librarian error when the processor cannot make a
71    /// safe lifecycle decision. The runner leaves the draft in
72    /// `processing/` in this case so a later stale-processing recovery
73    /// can retry it rather than silently marking it terminal.
74    fn process(&mut self, draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError>;
75}
76
/// Stand-in processor for use until LLM structuring, validation, and
/// commit processing are wired in.
#[derive(Debug, Default)]
pub struct DeferredDraftProcessor;
81
82impl DraftProcessor for DeferredDraftProcessor {
83    fn process(&mut self, _draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError> {
84        Ok(DraftProcessingDecision::Deferred)
85    }
86}
87
88/// One processed draft row for operator-visible run summaries.
89#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
90pub struct DraftRunItem {
91    /// Draft ID.
92    pub id: String,
93    /// Processor decision.
94    pub decision: DraftProcessingDecision,
95    /// Final lifecycle state after the runner moved the file.
96    pub final_state: String,
97}
98
99/// Summary of one run.
100#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
101pub struct DraftRunSummary {
102    /// Stale `processing/` drafts recovered before claiming pending work.
103    pub recovered_processing: usize,
104    /// Pending drafts listed after recovery.
105    pub pending_seen: usize,
106    /// Pending drafts successfully claimed into `processing/`.
107    pub claimed: usize,
108    /// Drafts accepted by the processor.
109    pub accepted: usize,
110    /// Drafts skipped by the processor.
111    pub skipped: usize,
112    /// Drafts failed by the processor.
113    pub failed: usize,
114    /// Drafts quarantined by the processor.
115    pub quarantined: usize,
116    /// Drafts returned to `pending/` without terminal handling.
117    pub deferred: usize,
118    /// Drafts that disappeared between list and claim, usually due to
119    /// a concurrent runner. This is not fatal.
120    pub claim_misses: usize,
121    /// Per-draft movement report.
122    pub items: Vec<DraftRunItem>,
123}
124
125impl DraftRunSummary {
126    fn new(recovered_processing: usize, pending_seen: usize) -> Self {
127        Self {
128            recovered_processing,
129            pending_seen,
130            claimed: 0,
131            accepted: 0,
132            skipped: 0,
133            failed: 0,
134            quarantined: 0,
135            deferred: 0,
136            claim_misses: 0,
137            items: Vec::new(),
138        }
139    }
140
141    fn record(&mut self, id: String, decision: DraftProcessingDecision, final_state: DraftState) {
142        match decision {
143            DraftProcessingDecision::Accepted => self.accepted += 1,
144            DraftProcessingDecision::Skipped => self.skipped += 1,
145            DraftProcessingDecision::Failed => self.failed += 1,
146            DraftProcessingDecision::Quarantined => self.quarantined += 1,
147            DraftProcessingDecision::Deferred => self.deferred += 1,
148        }
149        self.items.push(DraftRunItem {
150            id,
151            decision,
152            final_state: final_state.dir_name().to_string(),
153        });
154    }
155}
156
157/// Run pending draft processing once.
158///
159/// `now` and `stale_after` are explicit so tests can be deterministic.
160///
161/// # Errors
162///
163/// Returns any draft store or processor error that prevents the run
164/// from preserving a clear lifecycle state.
165pub fn run_once<P: DraftProcessor>(
166    store: &DraftStore,
167    processor: &mut P,
168    now: SystemTime,
169    stale_after: Duration,
170) -> Result<DraftRunSummary, LibrarianError> {
171    let span = tracing::info_span!(
172        target: "mimir.librarian.run",
173        "mimir.librarian.run",
174        recovered_processing = tracing::field::Empty,
175        pending_seen = tracing::field::Empty,
176        claimed = tracing::field::Empty,
177        accepted = tracing::field::Empty,
178        skipped = tracing::field::Empty,
179        failed = tracing::field::Empty,
180        quarantined = tracing::field::Empty,
181        deferred = tracing::field::Empty,
182        claim_misses = tracing::field::Empty,
183    );
184    let _guard = span.enter();
185
186    let stale_before = now
187        .checked_sub(stale_after)
188        .unwrap_or(SystemTime::UNIX_EPOCH);
189    let recovered = store.recover_stale_processing(stale_before)?;
190    let pending = store.list(DraftState::Pending)?;
191    let mut summary = DraftRunSummary::new(recovered.len(), pending.len());
192    record_summary_fields(&span, &summary);
193
194    for draft in pending {
195        let id = draft.id();
196        let id_hex = id.to_hex();
197        match store.transition(id, DraftState::Pending, DraftState::Processing) {
198            Ok(_) => {
199                summary.claimed += 1;
200                span.record("claimed", count_u64(summary.claimed));
201            }
202            Err(LibrarianError::DraftNotFound {
203                state: DraftState::Pending,
204                id: missing,
205            }) if missing == id => {
206                summary.claim_misses += 1;
207                span.record("claim_misses", count_u64(summary.claim_misses));
208                continue;
209            }
210            Err(err) => return Err(err),
211        }
212
213        let decision = processor.process(&draft)?;
214        let final_state = decision.target_state();
215        store.transition(id, DraftState::Processing, final_state)?;
216        summary.record(id_hex, decision, final_state);
217        record_summary_fields(&span, &summary);
218        tracing::info!(
219            target: "mimir.librarian.draft_processed",
220            draft_id = %id,
221            decision = decision.as_str(),
222            final_state = final_state.dir_name(),
223            "draft processed"
224        );
225    }
226
227    Ok(summary)
228}
229
230fn record_summary_fields(span: &tracing::Span, summary: &DraftRunSummary) {
231    span.record(
232        "recovered_processing",
233        count_u64(summary.recovered_processing),
234    );
235    span.record("pending_seen", count_u64(summary.pending_seen));
236    span.record("claimed", count_u64(summary.claimed));
237    span.record("accepted", count_u64(summary.accepted));
238    span.record("skipped", count_u64(summary.skipped));
239    span.record("failed", count_u64(summary.failed));
240    span.record("quarantined", count_u64(summary.quarantined));
241    span.record("deferred", count_u64(summary.deferred));
242    span.record("claim_misses", count_u64(summary.claim_misses));
243}
244
/// Widen a `usize` count to `u64`, saturating at `u64::MAX` when the
/// value does not fit.
fn count_u64(value: usize) -> u64 {
    let widened = u64::try_from(value);
    widened.unwrap_or(u64::MAX)
}
248
#[cfg(test)]
mod tests {
    use super::*;

    use crate::test_tracing::{capture, FieldValue};
    use crate::{DraftMetadata, DraftSourceSurface};

    /// Test processor that derives its decision from a keyword found in
    /// the draft text, and errors when no keyword is present.
    #[derive(Debug)]
    struct TextMatchingProcessor;

    impl DraftProcessor for TextMatchingProcessor {
        fn process(&mut self, draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError> {
            // Checked in order; the first keyword contained in the text wins.
            let keyword_decisions = [
                ("accept", DraftProcessingDecision::Accepted),
                ("skip", DraftProcessingDecision::Skipped),
                ("fail", DraftProcessingDecision::Failed),
                ("quarantine", DraftProcessingDecision::Quarantined),
            ];
            let text = draft.raw_text();
            keyword_decisions
                .iter()
                .find(|(keyword, _)| text.contains(*keyword))
                .map(|&(_, decision)| decision)
                .ok_or(LibrarianError::NotYetImplemented {
                    component: "test processor missing decision",
                })
        }
    }

    /// Test processor that skips every draft.
    #[derive(Debug)]
    struct SkipProcessor;

    impl DraftProcessor for SkipProcessor {
        fn process(&mut self, _draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError> {
            Ok(DraftProcessingDecision::Skipped)
        }
    }

    /// Build a CLI-sourced draft with the given text, timestamped at the
    /// Unix epoch.
    fn draft(text: &str) -> Draft {
        let metadata = DraftMetadata::new(DraftSourceSurface::Cli, SystemTime::UNIX_EPOCH);
        Draft::with_metadata(text.to_string(), metadata)
    }

    /// Fixed "current time" used by the deterministic tests.
    fn fixed_now() -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(30)
    }

    /// Staleness window used by the deterministic tests.
    fn fixed_stale_after() -> Duration {
        Duration::from_secs(10)
    }

    #[test]
    fn run_once_moves_pending_drafts_to_processor_terminal_states(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let tmp = tempfile::tempdir()?;
        let store = DraftStore::new(tmp.path());
        let accepted = draft("accept this draft");
        let skipped = draft("skip this draft");
        let failed = draft("fail this draft");
        let quarantined = draft("quarantine this draft");
        for submitted in [&accepted, &skipped, &failed, &quarantined] {
            store.submit(submitted)?;
        }

        let mut processor = TextMatchingProcessor;
        let summary = run_once(&store, &mut processor, fixed_now(), fixed_stale_after())?;

        assert_eq!(summary.recovered_processing, 0);
        assert_eq!(summary.pending_seen, 4);
        assert_eq!(summary.claimed, 4);
        assert_eq!(summary.accepted, 1);
        assert_eq!(summary.skipped, 1);
        assert_eq!(summary.failed, 1);
        assert_eq!(summary.quarantined, 1);
        assert_eq!(summary.deferred, 0);
        assert_eq!(store.list(DraftState::Pending)?.len(), 0);

        // Each draft must now be loadable from its terminal state.
        let expected_locations = [
            (DraftState::Accepted, &accepted),
            (DraftState::Skipped, &skipped),
            (DraftState::Failed, &failed),
            (DraftState::Quarantined, &quarantined),
        ];
        for (state, expected) in expected_locations {
            assert_eq!(store.load(state, expected.id())?.id(), expected.id());
        }
        Ok(())
    }

    #[test]
    fn run_once_defers_without_terminal_state() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = tempfile::tempdir()?;
        let store = DraftStore::new(tmp.path());
        let deferred = draft("processor is not ready yet");
        store.submit(&deferred)?;

        let mut processor = DeferredDraftProcessor;
        let summary = run_once(&store, &mut processor, fixed_now(), fixed_stale_after())?;

        assert_eq!(summary.claimed, 1);
        assert_eq!(summary.deferred, 1);
        assert_eq!(
            summary.items.first().map(|item| item.final_state.as_str()),
            Some("pending")
        );
        assert_eq!(store.list(DraftState::Pending)?.len(), 1);
        assert_eq!(store.list(DraftState::Processing)?.len(), 0);
        Ok(())
    }

    #[test]
    fn run_once_recovers_stale_processing_before_pending_scan(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let tmp = tempfile::tempdir()?;
        let store = DraftStore::new(tmp.path());
        let stranded = draft("recover me first");
        store.submit(&stranded)?;
        store.transition(stranded.id(), DraftState::Pending, DraftState::Processing)?;

        // A "now" one minute in the future with a one-second window.
        let future_now = SystemTime::now() + Duration::from_secs(60);
        let mut processor = SkipProcessor;
        let summary = run_once(&store, &mut processor, future_now, Duration::from_secs(1))?;

        assert_eq!(summary.recovered_processing, 1);
        assert_eq!(summary.pending_seen, 1);
        assert_eq!(summary.skipped, 1);
        assert_eq!(store.list(DraftState::Processing)?.len(), 0);
        assert_eq!(store.list(DraftState::Skipped)?.len(), 1);
        Ok(())
    }

    #[test]
    fn run_once_emits_summary_span() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = tempfile::tempdir()?;
        let store = DraftStore::new(tmp.path());
        let accepted = draft("accept this draft");
        let skipped = draft("skip this draft");
        store.submit(&accepted)?;
        store.submit(&skipped)?;

        let mut run_result = None;
        let shared = capture(|| {
            let mut processor = TextMatchingProcessor;
            run_result = Some(run_once(
                &store,
                &mut processor,
                fixed_now(),
                fixed_stale_after(),
            ));
        });
        let summary = run_result.ok_or("run did not execute")??;
        assert_eq!(summary.claimed, 2);

        let spans = shared
            .spans
            .lock()
            .map_err(|err| format!("spans lock poisoned: {err}"))?;
        let expected_fields = [("pending_seen", 2), ("accepted", 1), ("skipped", 1)];
        let span = spans
            .iter()
            .find(|span| {
                span.name == "mimir.librarian.run"
                    && expected_fields.iter().all(|&(field, want)| {
                        span.fields.get(field).and_then(FieldValue::as_u64) == Some(want)
                    })
            })
            .ok_or("run span missing")?;
        for (field, want) in expected_fields {
            assert_eq!(
                span.fields.get(field).and_then(FieldValue::as_u64),
                Some(want),
            );
        }
        Ok(())
    }
}