1use std::time::{Duration, SystemTime};
10
11use serde::Serialize;
12
13use crate::{Draft, DraftState, DraftStore, LibrarianError};
14
15pub const DEFAULT_PROCESSING_STALE_SECS: u64 = 15 * 60;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum DraftProcessingDecision {
23 Accepted,
29 Skipped,
31 Failed,
33 Quarantined,
35 Deferred,
37}
38
39impl DraftProcessingDecision {
40 fn target_state(self) -> DraftState {
41 match self {
42 Self::Accepted => DraftState::Accepted,
43 Self::Skipped => DraftState::Skipped,
44 Self::Failed => DraftState::Failed,
45 Self::Quarantined => DraftState::Quarantined,
46 Self::Deferred => DraftState::Pending,
47 }
48 }
49
50 pub(crate) fn as_str(self) -> &'static str {
51 match self {
52 Self::Accepted => "accepted",
53 Self::Skipped => "skipped",
54 Self::Failed => "failed",
55 Self::Quarantined => "quarantined",
56 Self::Deferred => "deferred",
57 }
58 }
59}
60
61pub trait DraftProcessor {
63 fn process(&mut self, draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError>;
75}
76
77#[derive(Debug, Default)]
80pub struct DeferredDraftProcessor;
81
82impl DraftProcessor for DeferredDraftProcessor {
83 fn process(&mut self, _draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError> {
84 Ok(DraftProcessingDecision::Deferred)
85 }
86}
87
88#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
90pub struct DraftRunItem {
91 pub id: String,
93 pub decision: DraftProcessingDecision,
95 pub final_state: String,
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
101pub struct DraftRunSummary {
102 pub recovered_processing: usize,
104 pub pending_seen: usize,
106 pub claimed: usize,
108 pub accepted: usize,
110 pub skipped: usize,
112 pub failed: usize,
114 pub quarantined: usize,
116 pub deferred: usize,
118 pub claim_misses: usize,
121 pub items: Vec<DraftRunItem>,
123}
124
125impl DraftRunSummary {
126 fn new(recovered_processing: usize, pending_seen: usize) -> Self {
127 Self {
128 recovered_processing,
129 pending_seen,
130 claimed: 0,
131 accepted: 0,
132 skipped: 0,
133 failed: 0,
134 quarantined: 0,
135 deferred: 0,
136 claim_misses: 0,
137 items: Vec::new(),
138 }
139 }
140
141 fn record(&mut self, id: String, decision: DraftProcessingDecision, final_state: DraftState) {
142 match decision {
143 DraftProcessingDecision::Accepted => self.accepted += 1,
144 DraftProcessingDecision::Skipped => self.skipped += 1,
145 DraftProcessingDecision::Failed => self.failed += 1,
146 DraftProcessingDecision::Quarantined => self.quarantined += 1,
147 DraftProcessingDecision::Deferred => self.deferred += 1,
148 }
149 self.items.push(DraftRunItem {
150 id,
151 decision,
152 final_state: final_state.dir_name().to_string(),
153 });
154 }
155}
156
157pub fn run_once<P: DraftProcessor>(
166 store: &DraftStore,
167 processor: &mut P,
168 now: SystemTime,
169 stale_after: Duration,
170) -> Result<DraftRunSummary, LibrarianError> {
171 let span = tracing::info_span!(
172 target: "mimir.librarian.run",
173 "mimir.librarian.run",
174 recovered_processing = tracing::field::Empty,
175 pending_seen = tracing::field::Empty,
176 claimed = tracing::field::Empty,
177 accepted = tracing::field::Empty,
178 skipped = tracing::field::Empty,
179 failed = tracing::field::Empty,
180 quarantined = tracing::field::Empty,
181 deferred = tracing::field::Empty,
182 claim_misses = tracing::field::Empty,
183 );
184 let _guard = span.enter();
185
186 let stale_before = now
187 .checked_sub(stale_after)
188 .unwrap_or(SystemTime::UNIX_EPOCH);
189 let recovered = store.recover_stale_processing(stale_before)?;
190 let pending = store.list(DraftState::Pending)?;
191 let mut summary = DraftRunSummary::new(recovered.len(), pending.len());
192 record_summary_fields(&span, &summary);
193
194 for draft in pending {
195 let id = draft.id();
196 let id_hex = id.to_hex();
197 match store.transition(id, DraftState::Pending, DraftState::Processing) {
198 Ok(_) => {
199 summary.claimed += 1;
200 span.record("claimed", count_u64(summary.claimed));
201 }
202 Err(LibrarianError::DraftNotFound {
203 state: DraftState::Pending,
204 id: missing,
205 }) if missing == id => {
206 summary.claim_misses += 1;
207 span.record("claim_misses", count_u64(summary.claim_misses));
208 continue;
209 }
210 Err(err) => return Err(err),
211 }
212
213 let decision = processor.process(&draft)?;
214 let final_state = decision.target_state();
215 store.transition(id, DraftState::Processing, final_state)?;
216 summary.record(id_hex, decision, final_state);
217 record_summary_fields(&span, &summary);
218 tracing::info!(
219 target: "mimir.librarian.draft_processed",
220 draft_id = %id,
221 decision = decision.as_str(),
222 final_state = final_state.dir_name(),
223 "draft processed"
224 );
225 }
226
227 Ok(summary)
228}
229
230fn record_summary_fields(span: &tracing::Span, summary: &DraftRunSummary) {
231 span.record(
232 "recovered_processing",
233 count_u64(summary.recovered_processing),
234 );
235 span.record("pending_seen", count_u64(summary.pending_seen));
236 span.record("claimed", count_u64(summary.claimed));
237 span.record("accepted", count_u64(summary.accepted));
238 span.record("skipped", count_u64(summary.skipped));
239 span.record("failed", count_u64(summary.failed));
240 span.record("quarantined", count_u64(summary.quarantined));
241 span.record("deferred", count_u64(summary.deferred));
242 span.record("claim_misses", count_u64(summary.claim_misses));
243}
244
245fn count_u64(value: usize) -> u64 {
246 u64::try_from(value).unwrap_or(u64::MAX)
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252
253 use crate::test_tracing::{capture, FieldValue};
254 use crate::{DraftMetadata, DraftSourceSurface};
255
256 #[derive(Debug)]
257 struct TextMatchingProcessor;
258
259 impl DraftProcessor for TextMatchingProcessor {
260 fn process(&mut self, draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError> {
261 if draft.raw_text().contains("accept") {
262 Ok(DraftProcessingDecision::Accepted)
263 } else if draft.raw_text().contains("skip") {
264 Ok(DraftProcessingDecision::Skipped)
265 } else if draft.raw_text().contains("fail") {
266 Ok(DraftProcessingDecision::Failed)
267 } else if draft.raw_text().contains("quarantine") {
268 Ok(DraftProcessingDecision::Quarantined)
269 } else {
270 Err(LibrarianError::NotYetImplemented {
271 component: "test processor missing decision",
272 })
273 }
274 }
275 }
276
277 #[derive(Debug)]
278 struct SkipProcessor;
279
280 impl DraftProcessor for SkipProcessor {
281 fn process(&mut self, _draft: &Draft) -> Result<DraftProcessingDecision, LibrarianError> {
282 Ok(DraftProcessingDecision::Skipped)
283 }
284 }
285
286 fn draft(text: &str) -> Draft {
287 Draft::with_metadata(
288 text.to_string(),
289 DraftMetadata::new(DraftSourceSurface::Cli, SystemTime::UNIX_EPOCH),
290 )
291 }
292
293 #[test]
294 fn run_once_moves_pending_drafts_to_processor_terminal_states(
295 ) -> Result<(), Box<dyn std::error::Error>> {
296 let tmp = tempfile::tempdir()?;
297 let store = DraftStore::new(tmp.path());
298 let accepted = draft("accept this draft");
299 let skipped = draft("skip this draft");
300 let failed = draft("fail this draft");
301 let quarantined = draft("quarantine this draft");
302 for draft in [&accepted, &skipped, &failed, &quarantined] {
303 store.submit(draft)?;
304 }
305
306 let mut processor = TextMatchingProcessor;
307 let summary = run_once(
308 &store,
309 &mut processor,
310 SystemTime::UNIX_EPOCH + Duration::from_secs(30),
311 Duration::from_secs(10),
312 )?;
313
314 assert_eq!(summary.recovered_processing, 0);
315 assert_eq!(summary.pending_seen, 4);
316 assert_eq!(summary.claimed, 4);
317 assert_eq!(summary.accepted, 1);
318 assert_eq!(summary.skipped, 1);
319 assert_eq!(summary.failed, 1);
320 assert_eq!(summary.quarantined, 1);
321 assert_eq!(summary.deferred, 0);
322 assert_eq!(store.list(DraftState::Pending)?.len(), 0);
323 assert_eq!(
324 store.load(DraftState::Accepted, accepted.id())?.id(),
325 accepted.id()
326 );
327 assert_eq!(
328 store.load(DraftState::Skipped, skipped.id())?.id(),
329 skipped.id()
330 );
331 assert_eq!(
332 store.load(DraftState::Failed, failed.id())?.id(),
333 failed.id()
334 );
335 assert_eq!(
336 store.load(DraftState::Quarantined, quarantined.id())?.id(),
337 quarantined.id()
338 );
339 Ok(())
340 }
341
342 #[test]
343 fn run_once_defers_without_terminal_state() -> Result<(), Box<dyn std::error::Error>> {
344 let tmp = tempfile::tempdir()?;
345 let store = DraftStore::new(tmp.path());
346 let draft = draft("processor is not ready yet");
347 store.submit(&draft)?;
348
349 let mut processor = DeferredDraftProcessor;
350 let summary = run_once(
351 &store,
352 &mut processor,
353 SystemTime::UNIX_EPOCH + Duration::from_secs(30),
354 Duration::from_secs(10),
355 )?;
356
357 assert_eq!(summary.claimed, 1);
358 assert_eq!(summary.deferred, 1);
359 assert_eq!(summary.items[0].final_state, "pending");
360 assert_eq!(store.list(DraftState::Pending)?.len(), 1);
361 assert_eq!(store.list(DraftState::Processing)?.len(), 0);
362 Ok(())
363 }
364
365 #[test]
366 fn run_once_recovers_stale_processing_before_pending_scan(
367 ) -> Result<(), Box<dyn std::error::Error>> {
368 let tmp = tempfile::tempdir()?;
369 let store = DraftStore::new(tmp.path());
370 let draft = draft("recover me first");
371 store.submit(&draft)?;
372 store.transition(draft.id(), DraftState::Pending, DraftState::Processing)?;
373
374 let mut processor = SkipProcessor;
375 let summary = run_once(
376 &store,
377 &mut processor,
378 SystemTime::now() + Duration::from_secs(60),
379 Duration::from_secs(1),
380 )?;
381
382 assert_eq!(summary.recovered_processing, 1);
383 assert_eq!(summary.pending_seen, 1);
384 assert_eq!(summary.skipped, 1);
385 assert_eq!(store.list(DraftState::Processing)?.len(), 0);
386 assert_eq!(store.list(DraftState::Skipped)?.len(), 1);
387 Ok(())
388 }
389
390 #[test]
391 fn run_once_emits_summary_span() -> Result<(), Box<dyn std::error::Error>> {
392 let tmp = tempfile::tempdir()?;
393 let store = DraftStore::new(tmp.path());
394 let accepted = draft("accept this draft");
395 let skipped = draft("skip this draft");
396 store.submit(&accepted)?;
397 store.submit(&skipped)?;
398
399 let mut run_result = None;
400 let shared = capture(|| {
401 let mut processor = TextMatchingProcessor;
402 run_result = Some(run_once(
403 &store,
404 &mut processor,
405 SystemTime::UNIX_EPOCH + Duration::from_secs(30),
406 Duration::from_secs(10),
407 ));
408 });
409 let summary = match run_result {
410 Some(Ok(summary)) => summary,
411 Some(Err(err)) => return Err(Box::new(err)),
412 None => return Err("run did not execute".into()),
413 };
414 assert_eq!(summary.claimed, 2);
415
416 let spans = shared
417 .spans
418 .lock()
419 .map_err(|err| format!("spans lock poisoned: {err}"))?;
420 let Some(span) = spans.iter().find(|span| {
421 span.name == "mimir.librarian.run"
422 && span.fields.get("pending_seen").and_then(FieldValue::as_u64) == Some(2)
423 && span.fields.get("accepted").and_then(FieldValue::as_u64) == Some(1)
424 && span.fields.get("skipped").and_then(FieldValue::as_u64) == Some(1)
425 }) else {
426 return Err("run span missing".into());
427 };
428 assert_eq!(
429 span.fields.get("pending_seen").and_then(FieldValue::as_u64),
430 Some(2),
431 );
432 assert_eq!(
433 span.fields.get("accepted").and_then(FieldValue::as_u64),
434 Some(1),
435 );
436 assert_eq!(
437 span.fields.get("skipped").and_then(FieldValue::as_u64),
438 Some(1),
439 );
440 Ok(())
441 }
442}