1use super::*;
4
5pub(super) fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
20 let conn = Connection::open(path)?;
21 conn.pragma_update(None, "journal_mode", "wal")?;
22 conn.pragma_update(None, "busy_timeout", crate::constants::BUSY_TIMEOUT_MILLIS)?;
28 conn.execute_batch(
29 "CREATE TABLE IF NOT EXISTS queue (
30 id INTEGER PRIMARY KEY AUTOINCREMENT,
31 item_key TEXT NOT NULL UNIQUE,
32 item_type TEXT NOT NULL DEFAULT 'memory',
33 status TEXT NOT NULL DEFAULT 'pending',
34 memory_id INTEGER,
35 entity_id INTEGER,
36 entities INTEGER DEFAULT 0,
37 rels INTEGER DEFAULT 0,
38 error TEXT,
39 cost_usd REAL DEFAULT 0.0,
40 attempt INTEGER DEFAULT 0,
41 elapsed_ms INTEGER,
42 created_at TEXT DEFAULT (datetime('now')),
43 done_at TEXT
44 );
45 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
46 )?;
47 let mut has_error_class = false;
51 let mut has_next_retry_at = false;
52 let mut has_operation = false;
57 let mut has_finish_reason = false;
63 let mut has_input_tokens = false;
64 let mut has_output_tokens = false;
65 {
66 let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
67 let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
68 for name in names {
69 match name?.as_str() {
70 "error_class" => has_error_class = true,
71 "next_retry_at" => has_next_retry_at = true,
72 "operation" => has_operation = true,
73 "finish_reason" => has_finish_reason = true,
74 "input_tokens" => has_input_tokens = true,
75 "output_tokens" => has_output_tokens = true,
76 _ => {}
77 }
78 }
79 }
80 if !has_error_class {
81 conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
82 }
83 if !has_next_retry_at {
84 conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
85 }
86 if !has_operation {
87 conn.execute_batch("ALTER TABLE queue ADD COLUMN operation TEXT")?;
88 }
89 if !has_finish_reason {
90 conn.execute_batch("ALTER TABLE queue ADD COLUMN finish_reason TEXT")?;
91 }
92 if !has_input_tokens {
93 conn.execute_batch("ALTER TABLE queue ADD COLUMN input_tokens INTEGER")?;
94 }
95 if !has_output_tokens {
96 conn.execute_batch("ALTER TABLE queue ADD COLUMN output_tokens INTEGER")?;
97 }
98 conn.execute_batch(
99 "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at);
100 CREATE INDEX IF NOT EXISTS idx_enrich_queue_operation ON queue(operation, status);
101 CREATE INDEX IF NOT EXISTS idx_enrich_queue_memory ON queue(memory_id)",
102 )?;
103 Ok(conn)
104}
105
106pub(super) fn enqueue_candidate(
114 queue_conn: &Connection,
115 main_conn: &Connection,
116 namespace: &str,
117 key: &str,
118 item_type: &str,
119 operation: &str,
120) {
121 let memory_id: Option<i64> = if item_type == "memory" {
122 main_conn
123 .query_row(
124 "SELECT id FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
125 rusqlite::params![namespace, key],
126 |r| r.get(0),
127 )
128 .ok()
129 } else {
130 None
131 };
132 if let Err(e) = queue_conn.execute(
133 "INSERT OR IGNORE INTO queue (item_key, item_type, status, operation, memory_id) \
134 VALUES (?1, ?2, 'pending', ?3, ?4)",
135 rusqlite::params![key, item_type, operation, memory_id],
136 ) {
137 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
138 }
139}
140
141pub(super) fn skipped_item_keys(
149 conn: &Connection,
150 operation: &str,
151) -> Result<std::collections::HashSet<String>, AppError> {
152 let mut stmt = conn.prepare(
153 "SELECT item_key FROM queue WHERE status='skipped' AND (operation = ?1 OR operation IS NULL)",
154 )?;
155 let keys = stmt
156 .query_map(rusqlite::params![operation], |r| r.get::<_, String>(0))?
157 .collect::<Result<std::collections::HashSet<String>, _>>()?;
158 Ok(keys)
159}
160
161pub(super) fn item_type_for(operation: &EnrichOperation) -> &'static str {
164 match operation {
165 EnrichOperation::EntityDescriptions => "entity",
166 _ => "memory",
167 }
168}
169
170pub fn cleanup_queue_entry(db_path: &std::path::Path, memory_id: i64, name: &str) {
179 let queue_path = crate::paths::sidecar_path(db_path, ".enrich-queue.sqlite");
180 if !queue_path.exists() {
181 return;
182 }
183 match open_queue_db(&queue_path) {
184 Ok(conn) => {
185 if let Err(e) = conn.execute(
186 "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
187 rusqlite::params![memory_id, name],
188 ) {
189 tracing::warn!(target: "enrich", error = %e, memory_id, "enrich-queue cleanup failed");
190 }
191 }
192 Err(e) => {
193 tracing::warn!(target: "enrich", error = %e, "enrich-queue cleanup skipped (open failed)");
194 }
195 }
196}
197
198pub(super) fn prune_dead_orphans(
209 queue_conn: &Connection,
210 main_conn: &Connection,
211 operation: &str,
212 namespace: &str,
213) -> Result<i64, AppError> {
214 let dead: Vec<(i64, String)> = {
215 let mut stmt = queue_conn.prepare(
216 "SELECT id, item_key FROM queue \
217 WHERE status='dead' AND item_type='memory' \
218 AND (operation = ?1 OR operation IS NULL) ORDER BY id",
219 )?;
220 let rows = stmt
221 .query_map(rusqlite::params![operation], |r| Ok((r.get(0)?, r.get(1)?)))?
222 .collect::<Result<Vec<_>, _>>()?;
223 rows
224 };
225 let mut pruned = 0_i64;
226 for (id, name) in dead {
227 let exists = main_conn
228 .query_row(
229 "SELECT 1 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
230 rusqlite::params![namespace, name],
231 |_| Ok(()),
232 )
233 .is_ok();
234 if !exists {
235 queue_conn.execute("DELETE FROM queue WHERE id=?1", rusqlite::params![id])?;
236 pruned += 1;
237 }
238 }
239 if pruned > 0 {
240 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
241 }
242 Ok(pruned)
243}
244
245#[derive(Debug, Serialize, schemars::JsonSchema)]
255pub struct EnrichStatus {
256 pub(super) status_report: bool,
257 pub(super) operation: String,
258 pub(super) namespace: String,
259 pub(super) unbound_backlog: usize,
260 pub(super) scan_backlog: i64,
267 pub(super) queue_pending: i64,
268 pub(super) queue_processing: i64,
269 pub(super) queue_done: i64,
270 pub(super) queue_failed: i64,
271 pub(super) queue_skipped: i64,
272 pub(super) queue_dead: i64,
273 pub(super) eligible_now: i64,
274 pub(super) waiting: i64,
275 pub(super) state: &'static str,
281 pub(super) waiting_items: Vec<WaitingItem>,
284}
285
286#[derive(Debug, Serialize, schemars::JsonSchema)]
288pub struct WaitingItem {
289 pub(super) item_key: String,
290 pub(super) attempt: i64,
291 pub(super) next_retry_at: Option<String>,
292 pub(super) error_class: Option<String>,
293}
294
295#[derive(Debug, Serialize, schemars::JsonSchema)]
297pub struct DeadItem {
298 pub(super) dead_item: bool,
299 pub(super) item_key: String,
300 pub(super) item_type: String,
301 pub(super) attempt: i64,
302 pub(super) error_class: Option<String>,
303 pub(super) error: Option<String>,
304 pub(super) finish_reason: Option<String>,
309 pub(super) input_tokens: Option<i64>,
311 pub(super) output_tokens: Option<i64>,
313}
314
315#[derive(Debug, Serialize, schemars::JsonSchema)]
317pub struct DeadSummary {
318 pub(super) summary: bool,
319 pub(super) operation: String,
320 pub(super) namespace: String,
321 pub(super) action: &'static str,
323 pub(super) dead_total: i64,
324 pub(super) requeued: i64,
325 pub(super) pruned: i64,
329}
330
331pub(super) fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
342 use crate::retry::AttemptOutcome;
343 match e {
344 AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
345 AttemptOutcome::Transient
346 }
347 AppError::EntityNotYetMaterialized { .. } => AttemptOutcome::Transient,
355 AppError::ProviderError { .. }
361 | AppError::NotFound(_)
362 | AppError::MemoryNotFound { .. }
363 | AppError::MemoryNotFoundById { .. } => AttemptOutcome::HardFailure,
364 AppError::Database(_) => {
368 if crate::storage::utils::is_sqlite_busy(e) {
369 AttemptOutcome::Transient
370 } else {
371 AttemptOutcome::HardFailure
372 }
373 }
374 AppError::Embedding(_) => AttemptOutcome::Transient,
385 _ => AttemptOutcome::HardFailure,
396 }
397}
398
399pub(super) fn record_item_failure(
412 queue_conn: &rusqlite::Connection,
413 queue_id: i64,
414 attempt: i64,
415 max_attempts: u32,
416 err: &AppError,
417) -> crate::retry::AttemptOutcome {
418 let outcome = classify_enrich_outcome(err);
419 let err_str = format!("{err}");
420 record_item_failure_typed(
421 queue_conn,
422 queue_id,
423 attempt,
424 max_attempts,
425 outcome,
426 &err_str,
427 None,
428 None,
429 None,
430 )
431}
432
433#[allow(clippy::too_many_arguments)]
442pub(super) fn record_item_failure_typed(
443 queue_conn: &rusqlite::Connection,
444 queue_id: i64,
445 attempt: i64,
446 max_attempts: u32,
447 outcome: crate::retry::AttemptOutcome,
448 err_str: &str,
449 finish_reason: Option<&str>,
450 input_tokens: Option<i64>,
451 output_tokens: Option<i64>,
452) -> crate::retry::AttemptOutcome {
453 use crate::retry::AttemptOutcome;
454 let error_class = match outcome {
455 AttemptOutcome::Transient => "transient",
456 AttemptOutcome::HardFailure => "permanent",
457 AttemptOutcome::Success => "success",
458 };
459
460 let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
461 if terminal {
462 let _ = queue_conn.execute(
463 "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now'), \
464 finish_reason=?3, input_tokens=?4, output_tokens=?5 WHERE id=?6",
465 rusqlite::params![
466 err_str,
467 error_class,
468 finish_reason,
469 input_tokens,
470 output_tokens,
471 queue_id
472 ],
473 );
474 } else {
475 let delay = crate::retry::compute_delay(
476 &crate::retry::RetryConfig::llm_rate_limit(),
477 attempt.max(0) as u32,
478 );
479 let secs = delay.as_secs().max(1);
480 let modifier = format!("+{secs} seconds");
481 let _ = queue_conn.execute(
482 "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3), \
483 finish_reason=?4, input_tokens=?5, output_tokens=?6 WHERE id=?7",
484 rusqlite::params![
485 err_str,
486 error_class,
487 modifier,
488 finish_reason,
489 input_tokens,
490 output_tokens,
491 queue_id
492 ],
493 );
494 }
495 outcome
496}
497
498pub(super) enum DequeueOutcome {
506 Claimed((i64, String, String, i64)),
507 Empty,
508}
509
510pub(super) fn dequeue_next_pending(
511 queue_conn: &rusqlite::Connection,
512 backoff_clause: &str,
513) -> Result<DequeueOutcome, AppError> {
514 let dequeue_sql = format!(
515 "UPDATE queue SET status='processing', attempt=attempt+1 \
516 WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
517 ORDER BY id LIMIT 1) \
518 RETURNING id, item_key, item_type, attempt"
519 );
520 match queue_conn.query_row(&dequeue_sql, [], |row| {
521 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
522 }) {
523 Ok(claimed) => Ok(DequeueOutcome::Claimed(claimed)),
524 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(DequeueOutcome::Empty),
525 Err(e) => Err(AppError::Database(e)),
526 }
527}
528
529#[cfg(test)]
534mod tests {
535 use super::*;
536
537 fn open_test_db() -> Connection {
538 let conn = Connection::open_in_memory().expect("in-memory db");
539 conn.execute_batch(
540 "CREATE TABLE memories (
541 id INTEGER PRIMARY KEY AUTOINCREMENT,
542 namespace TEXT NOT NULL DEFAULT 'global',
543 name TEXT NOT NULL,
544 type TEXT NOT NULL DEFAULT 'note',
545 description TEXT NOT NULL DEFAULT '',
546 body TEXT NOT NULL DEFAULT '',
547 body_hash TEXT NOT NULL DEFAULT '',
548 session_id TEXT,
549 source TEXT NOT NULL DEFAULT 'agent',
550 metadata TEXT NOT NULL DEFAULT '{}',
551 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
552 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
553 deleted_at INTEGER,
554 UNIQUE(namespace, name)
555 );",
556 )
557 .expect("schema creation must succeed");
558 conn
559 }
560
561 fn open_temp_queue() -> (Connection, String) {
562 let path = format!(
563 "/tmp/test-enrich-dl-{}-{}.sqlite",
564 std::process::id(),
565 fastrand::u64(..)
566 );
567 let conn = open_queue_db(&path).expect("queue db must open");
568 (conn, path)
569 }
570
571 fn insert_pending(conn: &Connection, key: &str) -> i64 {
572 conn.execute(
573 "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
574 rusqlite::params![key],
575 )
576 .unwrap();
577 conn.last_insert_rowid()
578 }
579
580 #[test]
581 fn queue_db_schema_creates_correctly() {
582 let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
583 let conn = open_queue_db(&tmp_path).expect("queue db must open");
584 let count: i64 = conn
585 .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
586 .unwrap();
587 assert_eq!(count, 0);
588 let _ = std::fs::remove_file(&tmp_path);
589 }
590
591 #[test]
592 fn classify_rate_limit_is_transient() {
593 let e = AppError::RateLimited {
594 detail: "429".into(),
595 };
596 assert_eq!(
597 classify_enrich_outcome(&e),
598 crate::retry::AttemptOutcome::Transient
599 );
600 }
601
602 #[test]
603 fn classify_timeout_and_dbbusy_are_transient() {
604 let t = AppError::Timeout {
605 operation: "judge".into(),
606 duration_secs: 30,
607 };
608 let b = AppError::DbBusy("locked".into());
609 assert_eq!(
610 classify_enrich_outcome(&t),
611 crate::retry::AttemptOutcome::Transient
612 );
613 assert_eq!(
614 classify_enrich_outcome(&b),
615 crate::retry::AttemptOutcome::Transient
616 );
617 }
618
619 #[test]
620 fn classify_validation_and_parse_are_hard_failure() {
621 let v = AppError::Validation("failed to parse entities array: bad".into());
622 assert_eq!(
623 classify_enrich_outcome(&v),
624 crate::retry::AttemptOutcome::HardFailure
625 );
626 }
627
628 #[test]
629 fn open_queue_db_alter_is_idempotent() {
630 let path = format!(
631 "/tmp/test-enrich-idem-{}-{}.sqlite",
632 std::process::id(),
633 fastrand::u64(..)
634 );
635 let _ = open_queue_db(&path).expect("first open");
636 let conn = open_queue_db(&path).expect("second open is idempotent");
637 let cols: Vec<String> = {
638 let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
639 stmt.query_map([], |r| r.get::<_, String>(1))
640 .unwrap()
641 .collect::<Result<Vec<_>, _>>()
642 .unwrap()
643 };
644 assert!(cols.iter().any(|c| c == "error_class"));
645 assert!(cols.iter().any(|c| c == "next_retry_at"));
646 let _ = std::fs::remove_file(&path);
647 }
648
649 #[test]
650 fn record_item_failure_hard_marks_dead() {
651 let (conn, path) = open_temp_queue();
652 let id = insert_pending(&conn, "mem-hard");
653 let outcome = record_item_failure(
654 &conn,
655 id,
656 1,
657 5,
658 &AppError::Validation("invalid body".into()),
659 );
660 assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
661 let status: String = conn
662 .query_row(
663 "SELECT status FROM queue WHERE id=?1",
664 rusqlite::params![id],
665 |r| r.get(0),
666 )
667 .unwrap();
668 assert_eq!(status, "dead");
669 let _ = std::fs::remove_file(&path);
670 }
671
672 #[test]
673 fn record_item_failure_transient_reschedules_pending() {
674 let (conn, path) = open_temp_queue();
675 let id = insert_pending(&conn, "mem-transient");
676 let outcome = record_item_failure(
677 &conn,
678 id,
679 1,
680 5,
681 &AppError::RateLimited {
682 detail: "429".into(),
683 },
684 );
685 assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
686 let (status, future): (String, i64) = conn
687 .query_row(
688 "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
689 rusqlite::params![id],
690 |r| Ok((r.get(0)?, r.get(1)?)),
691 )
692 .unwrap();
693 assert_eq!(status, "pending");
694 assert_eq!(future, 1, "next_retry_at must be in the future");
695 let _ = std::fs::remove_file(&path);
696 }
697
698 #[test]
699 fn record_item_failure_transient_at_cap_marks_dead() {
700 let (conn, path) = open_temp_queue();
701 let id = insert_pending(&conn, "mem-cap");
702 let outcome = record_item_failure(
703 &conn,
704 id,
705 5,
706 5,
707 &AppError::RateLimited {
708 detail: "429".into(),
709 },
710 );
711 assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
712 let status: String = conn
713 .query_row(
714 "SELECT status FROM queue WHERE id=?1",
715 rusqlite::params![id],
716 |r| r.get(0),
717 )
718 .unwrap();
719 assert_eq!(status, "dead");
720 let _ = std::fs::remove_file(&path);
721 }
722
723 #[test]
724 fn dequeue_skips_future_retry_and_dead() {
725 let (conn, path) = open_temp_queue();
726 let eligible = insert_pending(&conn, "mem-eligible");
727 let waiting = insert_pending(&conn, "mem-waiting");
728 conn.execute(
729 "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
730 rusqlite::params![waiting],
731 )
732 .unwrap();
733 let dead = insert_pending(&conn, "mem-dead");
734 conn.execute(
735 "UPDATE queue SET status='dead' WHERE id=?1",
736 rusqlite::params![dead],
737 )
738 .unwrap();
739
740 let claimed: Option<i64> = conn
741 .query_row(
742 "UPDATE queue SET status='processing', attempt=attempt+1 \
743 WHERE id = (SELECT id FROM queue WHERE status='pending' \
744 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
745 ORDER BY id LIMIT 1) \
746 RETURNING id",
747 [],
748 |r| r.get(0),
749 )
750 .ok();
751 assert_eq!(claimed, Some(eligible));
752
753 let second: Option<i64> = conn
754 .query_row(
755 "UPDATE queue SET status='processing', attempt=attempt+1 \
756 WHERE id = (SELECT id FROM queue WHERE status='pending' \
757 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
758 ORDER BY id LIMIT 1) \
759 RETURNING id",
760 [],
761 |r| r.get(0),
762 )
763 .ok();
764 assert_eq!(second, None);
765 let _ = std::fs::remove_file(&path);
766 }
767
768 #[test]
769 fn classify_validation_never_infers_transience_from_message() {
770 for msg in [
778 "model 'x' returned non-object JSON after repair (got string)",
779 "model 'x' returned content that could not be parsed even after JSON repair",
780 "model 'x' returned no structured content",
781 "LLM result missing 'description' field",
782 "LLM result missing 'enriched_body' field",
783 ] {
784 assert_eq!(
785 classify_enrich_outcome(&AppError::Validation(msg.into())),
786 crate::retry::AttemptOutcome::HardFailure,
787 "expected hard failure for: {msg}"
788 );
789 }
790 }
791
792 #[test]
793 fn classify_embedding_error_is_transient_floor() {
794 assert_eq!(
795 classify_enrich_outcome(&AppError::Embedding("dimension mismatch".into())),
796 crate::retry::AttemptOutcome::Transient
797 );
798 }
799
800 #[test]
803 fn classify_entity_not_yet_materialized_is_transient() {
804 assert_eq!(
805 classify_enrich_outcome(&AppError::EntityNotYetMaterialized {
806 name: "acme".into(),
807 namespace: "global".into(),
808 }),
809 crate::retry::AttemptOutcome::Transient
810 );
811 }
812
813 #[test]
814 fn classify_memory_absence_stays_hard_failure() {
815 assert_eq!(
816 classify_enrich_outcome(&AppError::MemoryNotFound {
817 name: "mem-x".into(),
818 namespace: "global".into(),
819 }),
820 crate::retry::AttemptOutcome::HardFailure
821 );
822 assert_eq!(
823 classify_enrich_outcome(&AppError::MemoryNotFoundById { id: 42 }),
824 crate::retry::AttemptOutcome::HardFailure
825 );
826 assert_eq!(
827 classify_enrich_outcome(&AppError::NotFound("gone".into())),
828 crate::retry::AttemptOutcome::HardFailure
829 );
830 }
831
832 #[test]
833 fn classify_database_busy_is_transient_non_busy_is_hard() {
834 let busy = AppError::Database(rusqlite::Error::SqliteFailure(
835 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
836 Some("database is locked".into()),
837 ));
838 assert_eq!(
839 classify_enrich_outcome(&busy),
840 crate::retry::AttemptOutcome::Transient
841 );
842 let constraint = AppError::Database(rusqlite::Error::SqliteFailure(
843 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
844 Some("UNIQUE constraint failed".into()),
845 ));
846 assert_eq!(
847 classify_enrich_outcome(&constraint),
848 crate::retry::AttemptOutcome::HardFailure
849 );
850 }
851
852 #[test]
853 fn record_item_failure_typed_persists_diagnostics_on_dead_letter() {
854 let (conn, path) = open_temp_queue();
855 let id = insert_pending(&conn, "mem-diag");
856 let outcome = record_item_failure_typed(
857 &conn,
858 id,
859 1,
860 5,
861 crate::retry::AttemptOutcome::HardFailure,
862 "truncated response",
863 Some("length"),
864 Some(120),
865 Some(4096),
866 );
867 assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
868 let (status, finish_reason, input_tokens, output_tokens): (
869 String,
870 Option<String>,
871 Option<i64>,
872 Option<i64>,
873 ) = conn
874 .query_row(
875 "SELECT status, finish_reason, input_tokens, output_tokens FROM queue WHERE id=?1",
876 rusqlite::params![id],
877 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
878 )
879 .unwrap();
880 assert_eq!(status, "dead");
881 assert_eq!(finish_reason.as_deref(), Some("length"));
882 assert_eq!(input_tokens, Some(120));
883 assert_eq!(output_tokens, Some(4096));
884 let _ = std::fs::remove_file(&path);
885 }
886
887 #[test]
888 fn record_item_failure_typed_reschedules_transient_below_max_attempts() {
889 let (conn, path) = open_temp_queue();
895 let id = insert_pending(&conn, "mem-retry");
896 let outcome = record_item_failure_typed(
897 &conn,
898 id,
899 1,
900 5,
901 crate::retry::AttemptOutcome::Transient,
902 "truncated response",
903 Some("length"),
904 Some(120),
905 Some(64),
906 );
907 assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
908 let (status, error_class, finish_reason, next_retry_at): (
909 String,
910 String,
911 Option<String>,
912 Option<String>,
913 ) = conn
914 .query_row(
915 "SELECT status, error_class, finish_reason, next_retry_at FROM queue WHERE id=?1",
916 rusqlite::params![id],
917 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
918 )
919 .unwrap();
920 assert_eq!(status, "pending");
921 assert_eq!(error_class, "transient");
922 assert_eq!(finish_reason.as_deref(), Some("length"));
923 assert!(
924 next_retry_at.is_some(),
925 "a rescheduled item must carry a next_retry_at"
926 );
927 let _ = std::fs::remove_file(&path);
928 }
929
930 #[test]
940 fn with_busy_retry_bounds_dequeue_under_sustained_contention() {
941 let (conn, path) = open_temp_queue();
942 insert_pending(&conn, "mem-busy");
943 conn.pragma_update(None, "busy_timeout", 0i64)
944 .expect("busy_timeout override must succeed");
945
946 let blocker = Connection::open(&path).expect("blocker connection must open");
950 blocker
951 .execute_batch("BEGIN EXCLUSIVE;")
952 .expect("exclusive lock must be acquired");
953
954 let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
955 let calls_clone = std::sync::Arc::clone(&calls);
956 let result: Result<DequeueOutcome, AppError> =
957 crate::storage::utils::with_busy_retry(|| {
958 calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
959 dequeue_next_pending(&conn, "")
960 });
961
962 assert!(
963 matches!(result, Err(AppError::DbBusy(_))),
964 "sustained SQLITE_BUSY must convert to DbBusy, not hang or silently report Empty"
965 );
966 assert_eq!(
967 calls.load(std::sync::atomic::Ordering::SeqCst),
968 crate::constants::MAX_SQLITE_BUSY_RETRIES,
969 "must attempt exactly MAX_SQLITE_BUSY_RETRIES times, never retry unbounded"
970 );
971
972 blocker
973 .execute_batch("ROLLBACK;")
974 .expect("releasing the exclusive lock must succeed");
975 let _ = std::fs::remove_file(&path);
976 }
977
978 #[test]
979 fn dequeue_next_pending_distinguishes_empty_from_claimed() {
980 let (conn, path) = open_temp_queue();
981 let id = insert_pending(&conn, "mem-dequeue");
982 let claimed = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
983 match claimed {
984 DequeueOutcome::Claimed((claimed_id, key, _, _)) => {
985 assert_eq!(claimed_id, id);
986 assert_eq!(key, "mem-dequeue");
987 }
988 DequeueOutcome::Empty => panic!("expected a claimed row"),
989 }
990 let empty = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
991 assert!(matches!(empty, DequeueOutcome::Empty));
992 let _ = std::fs::remove_file(&path);
993 }
994
995 #[test]
996 fn classify_provider_error_and_not_found_are_hard() {
997 assert_eq!(
998 classify_enrich_outcome(&AppError::ProviderError {
999 code: "400".into(),
1000 message: "context length exceeded".into(),
1001 }),
1002 crate::retry::AttemptOutcome::HardFailure
1003 );
1004 assert_eq!(
1005 classify_enrich_outcome(&AppError::NotFound("memory 'gone' not found".into())),
1006 crate::retry::AttemptOutcome::HardFailure
1007 );
1008 }
1009
1010 #[test]
1011 fn open_queue_db_migrates_operation_column() {
1012 let (conn, path) = open_temp_queue();
1013 drop(conn);
1014 let conn = open_queue_db(&path).expect("second open is idempotent");
1015 let cols: Vec<String> = {
1016 let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
1017 stmt.query_map([], |r| r.get::<_, String>(1))
1018 .unwrap()
1019 .collect::<Result<Vec<_>, _>>()
1020 .unwrap()
1021 };
1022 assert!(cols.iter().any(|c| c == "operation"));
1023 assert!(cols.iter().any(|c| c == "memory_id"));
1024 let _ = std::fs::remove_file(&path);
1025 }
1026
1027 #[test]
1028 fn enqueue_candidate_tags_operation_and_memory_id() {
1029 let main = open_test_db();
1030 main.execute(
1031 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-x', 'body')",
1032 [],
1033 )
1034 .unwrap();
1035 let mem_id: i64 = main
1036 .query_row("SELECT id FROM memories WHERE name='mem-x'", [], |r| {
1037 r.get(0)
1038 })
1039 .unwrap();
1040 let (queue, path) = open_temp_queue();
1041 enqueue_candidate(&queue, &main, "global", "mem-x", "memory", "MemoryBindings");
1042 let (op, mid): (String, i64) = queue
1043 .query_row(
1044 "SELECT operation, memory_id FROM queue WHERE item_key='mem-x'",
1045 [],
1046 |r| Ok((r.get(0)?, r.get(1)?)),
1047 )
1048 .unwrap();
1049 assert_eq!(op, "MemoryBindings");
1050 assert_eq!(mid, mem_id);
1051 let _ = std::fs::remove_file(&path);
1052 }
1053
1054 #[test]
1055 fn requeue_dead_resurrects_dead_rows() {
1056 let (conn, path) = open_temp_queue();
1057 conn.execute(
1058 "INSERT INTO queue (item_key, item_type, status, operation, attempt, error, error_class, next_retry_at) \
1059 VALUES ('mem-dead', 'memory', 'dead', 'MemoryBindings', 8, 'boom', 'permanent', datetime('now'))",
1060 [],
1061 )
1062 .unwrap();
1063 let n = conn
1064 .execute(
1065 "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1066 error=NULL, error_class=NULL \
1067 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1068 rusqlite::params!["MemoryBindings"],
1069 )
1070 .unwrap();
1071 assert_eq!(n, 1);
1072 let (status, attempt, nra): (String, i64, Option<String>) = conn
1073 .query_row(
1074 "SELECT status, attempt, next_retry_at FROM queue WHERE item_key='mem-dead'",
1075 [],
1076 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1077 )
1078 .unwrap();
1079 assert_eq!(status, "pending");
1080 assert_eq!(attempt, 0);
1081 assert!(nra.is_none());
1082 let _ = std::fs::remove_file(&path);
1083 }
1084
1085 #[test]
1086 fn skipped_item_keys_excludes_only_skipped_for_operation() {
1087 let (conn, path) = open_temp_queue();
1092 conn.execute(
1093 "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-vetoed', 'memory', 'skipped', 'BodyEnrich')",
1094 [],
1095 )
1096 .unwrap();
1097 conn.execute(
1098 "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-pending', 'memory', 'pending', 'BodyEnrich')",
1099 [],
1100 )
1101 .unwrap();
1102 conn.execute(
1103 "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-other-op', 'memory', 'skipped', 'MemoryBindings')",
1104 [],
1105 )
1106 .unwrap();
1107 let keys = skipped_item_keys(&conn, "BodyEnrich").unwrap();
1108 assert!(
1109 keys.contains("mem-vetoed"),
1110 "vetoed BodyEnrich item must be excluded from scan"
1111 );
1112 assert!(
1113 !keys.contains("mem-pending"),
1114 "pending item is still actionable"
1115 );
1116 assert!(
1117 !keys.contains("mem-other-op"),
1118 "skipped item from another operation must not leak"
1119 );
1120 assert_eq!(keys.len(), 1);
1121 let _ = std::fs::remove_file(&path);
1122 }
1123
1124 #[test]
1125 fn cascade_cleanup_delete_targets_memory_id_and_name() {
1126 let (conn, path) = open_temp_queue();
1127 conn.execute(
1128 "INSERT INTO queue (item_key, item_type, status, memory_id) VALUES ('by-id', 'memory', 'done', 42)",
1129 [],
1130 )
1131 .unwrap();
1132 conn.execute(
1133 "INSERT INTO queue (item_key, item_type, status) VALUES ('by-name', 'memory', 'pending')",
1134 [],
1135 )
1136 .unwrap();
1137 let removed = conn
1138 .execute(
1139 "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
1140 rusqlite::params![42_i64, "by-name"],
1141 )
1142 .unwrap();
1143 assert_eq!(removed, 2);
1144 let remaining: i64 = conn
1145 .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
1146 .unwrap();
1147 assert_eq!(remaining, 0);
1148 let _ = std::fs::remove_file(&path);
1149 }
1150
1151 #[test]
1152 fn item_type_for_maps_entity_and_memory() {
1153 assert_eq!(
1154 item_type_for(&EnrichOperation::EntityDescriptions),
1155 "entity"
1156 );
1157 assert_eq!(item_type_for(&EnrichOperation::MemoryBindings), "memory");
1158 assert_eq!(item_type_for(&EnrichOperation::AugmentBindings), "memory");
1159 assert_eq!(item_type_for(&EnrichOperation::BodyExtract), "memory");
1160 }
1161
1162 #[test]
1163 fn prune_dead_orphans_removes_only_orphan_memory_rows() {
1164 let main = open_test_db();
1165 main.execute(
1167 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'alive', 'b')",
1168 [],
1169 )
1170 .unwrap();
1171 let (queue, path) = open_temp_queue();
1172 queue
1174 .execute(
1175 "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
1176 VALUES ('gone', 'memory', 'dead', 'MemoryBindings', 'permanent')",
1177 [],
1178 )
1179 .unwrap();
1180 queue
1182 .execute(
1183 "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
1184 VALUES ('alive', 'memory', 'dead', 'MemoryBindings', 'permanent')",
1185 [],
1186 )
1187 .unwrap();
1188 queue
1190 .execute(
1191 "INSERT INTO queue (item_key, item_type, status, operation) \
1192 VALUES ('some-entity', 'entity', 'dead', 'EntityDescriptions')",
1193 [],
1194 )
1195 .unwrap();
1196
1197 let pruned = prune_dead_orphans(&queue, &main, "MemoryBindings", "global").unwrap();
1198 assert_eq!(pruned, 1, "only the orphan memory row is pruned");
1199
1200 let remaining: Vec<String> = {
1201 let mut stmt = queue
1202 .prepare("SELECT item_key FROM queue ORDER BY item_key")
1203 .unwrap();
1204 stmt.query_map([], |r| r.get::<_, String>(0))
1205 .unwrap()
1206 .collect::<Result<Vec<_>, _>>()
1207 .unwrap()
1208 };
1209 assert_eq!(remaining, vec!["alive", "some-entity"]);
1210 let _ = std::fs::remove_file(&path);
1211 }
1212}