1use super::*;
4
5pub(super) fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
20 let conn = Connection::open(path)?;
21 conn.pragma_update(None, "journal_mode", "wal")?;
22 conn.pragma_update(None, "busy_timeout", crate::constants::BUSY_TIMEOUT_MILLIS)?;
28 conn.execute_batch(
29 "CREATE TABLE IF NOT EXISTS queue (
30 id INTEGER PRIMARY KEY AUTOINCREMENT,
31 item_key TEXT NOT NULL UNIQUE,
32 item_type TEXT NOT NULL DEFAULT 'memory',
33 status TEXT NOT NULL DEFAULT 'pending',
34 memory_id INTEGER,
35 entity_id INTEGER,
36 entities INTEGER DEFAULT 0,
37 rels INTEGER DEFAULT 0,
38 error TEXT,
39 cost_usd REAL DEFAULT 0.0,
40 attempt INTEGER DEFAULT 0,
41 elapsed_ms INTEGER,
42 created_at TEXT DEFAULT (datetime('now')),
43 done_at TEXT
44 );
45 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
46 )?;
47 let mut has_error_class = false;
51 let mut has_next_retry_at = false;
52 let mut has_operation = false;
57 let mut has_finish_reason = false;
63 let mut has_input_tokens = false;
64 let mut has_output_tokens = false;
65 {
66 let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
67 let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
68 for name in names {
69 match name?.as_str() {
70 "error_class" => has_error_class = true,
71 "next_retry_at" => has_next_retry_at = true,
72 "operation" => has_operation = true,
73 "finish_reason" => has_finish_reason = true,
74 "input_tokens" => has_input_tokens = true,
75 "output_tokens" => has_output_tokens = true,
76 _ => {}
77 }
78 }
79 }
80 if !has_error_class {
81 conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
82 }
83 if !has_next_retry_at {
84 conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
85 }
86 if !has_operation {
87 conn.execute_batch("ALTER TABLE queue ADD COLUMN operation TEXT")?;
88 }
89 if !has_finish_reason {
90 conn.execute_batch("ALTER TABLE queue ADD COLUMN finish_reason TEXT")?;
91 }
92 if !has_input_tokens {
93 conn.execute_batch("ALTER TABLE queue ADD COLUMN input_tokens INTEGER")?;
94 }
95 if !has_output_tokens {
96 conn.execute_batch("ALTER TABLE queue ADD COLUMN output_tokens INTEGER")?;
97 }
98 conn.execute_batch(
99 "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at);
100 CREATE INDEX IF NOT EXISTS idx_enrich_queue_operation ON queue(operation, status);
101 CREATE INDEX IF NOT EXISTS idx_enrich_queue_memory ON queue(memory_id)",
102 )?;
103 Ok(conn)
104}
105
106pub(super) fn enqueue_candidate(
114 queue_conn: &Connection,
115 main_conn: &Connection,
116 namespace: &str,
117 key: &str,
118 item_type: &str,
119 operation: &str,
120) {
121 let memory_id: Option<i64> = if item_type == "memory" {
122 main_conn
123 .query_row(
124 "SELECT id FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
125 rusqlite::params![namespace, key],
126 |r| r.get(0),
127 )
128 .ok()
129 } else {
130 None
131 };
132 if let Err(e) = queue_conn.execute(
133 "INSERT OR IGNORE INTO queue (item_key, item_type, status, operation, memory_id) \
134 VALUES (?1, ?2, 'pending', ?3, ?4)",
135 rusqlite::params![key, item_type, operation, memory_id],
136 ) {
137 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
138 }
139}
140
141pub(super) fn skipped_item_keys(
149 conn: &Connection,
150 operation: &str,
151) -> Result<std::collections::HashSet<String>, AppError> {
152 let mut stmt = conn.prepare(
153 "SELECT item_key FROM queue WHERE status='skipped' AND (operation = ?1 OR operation IS NULL)",
154 )?;
155 let keys = stmt
156 .query_map(rusqlite::params![operation], |r| r.get::<_, String>(0))?
157 .collect::<Result<std::collections::HashSet<String>, _>>()?;
158 Ok(keys)
159}
160
161pub(super) fn item_type_for(operation: &EnrichOperation) -> &'static str {
164 match operation {
165 EnrichOperation::EntityDescriptions => "entity",
166 _ => "memory",
167 }
168}
169
170pub(super) fn item_type_for_key(key: &str, default: &'static str) -> &'static str {
178 if key.starts_with("entity:") {
179 "entity"
180 } else if key.starts_with("chunk:") {
181 "chunk"
182 } else {
183 default
184 }
185}
186
187pub fn cleanup_queue_entry(db_path: &std::path::Path, memory_id: i64, name: &str) {
196 let queue_path = crate::paths::sidecar_path(db_path, ".enrich-queue.sqlite");
197 if !queue_path.exists() {
198 return;
199 }
200 match open_queue_db(&queue_path) {
201 Ok(conn) => {
202 if let Err(e) = conn.execute(
203 "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
204 rusqlite::params![memory_id, name],
205 ) {
206 tracing::warn!(target: "enrich", error = %e, memory_id, "enrich-queue cleanup failed");
207 }
208 }
209 Err(e) => {
210 tracing::warn!(target: "enrich", error = %e, "enrich-queue cleanup skipped (open failed)");
211 }
212 }
213}
214
215pub(super) fn prune_dead_orphans(
226 queue_conn: &Connection,
227 main_conn: &Connection,
228 operation: &str,
229 namespace: &str,
230) -> Result<i64, AppError> {
231 let dead: Vec<(i64, String)> = {
232 let mut stmt = queue_conn.prepare(
233 "SELECT id, item_key FROM queue \
234 WHERE status='dead' AND item_type='memory' \
235 AND (operation = ?1 OR operation IS NULL) ORDER BY id",
236 )?;
237 let rows = stmt
238 .query_map(rusqlite::params![operation], |r| Ok((r.get(0)?, r.get(1)?)))?
239 .collect::<Result<Vec<_>, _>>()?;
240 rows
241 };
242 let mut pruned = 0_i64;
243 for (id, name) in dead {
244 let exists = main_conn
245 .query_row(
246 "SELECT 1 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
247 rusqlite::params![namespace, name],
248 |_| Ok(()),
249 )
250 .is_ok();
251 if !exists {
252 queue_conn.execute("DELETE FROM queue WHERE id=?1", rusqlite::params![id])?;
253 pruned += 1;
254 }
255 }
256 if pruned > 0 {
257 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
258 }
259 Ok(pruned)
260}
261
/// Serializable status report for one enrich operation in one namespace.
///
/// NOTE(review): this struct is only declared here; the field descriptions below are inferred
/// from the field names and from the `queue` schema in this module and should be confirmed
/// against the code that fills them in.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct EnrichStatus {
    /// Marker flag; presumably set to `true` to identify the record kind in serialized output.
    pub(super) status_report: bool,
    /// Name of the enrich operation the report is about.
    pub(super) operation: String,
    /// Namespace the report is about.
    pub(super) namespace: String,
    /// NOTE(review): meaning not visible in this file — presumably a count of items still
    /// lacking bindings.
    pub(super) unbound_backlog: usize,
    /// NOTE(review): meaning not visible in this file — presumably the number of candidates a
    /// scan would still pick up.
    pub(super) scan_backlog: i64,
    /// Presumably the number of queue rows with `status='pending'`.
    pub(super) queue_pending: i64,
    /// Presumably the number of queue rows with `status='processing'`.
    pub(super) queue_processing: i64,
    /// Presumably the number of queue rows with `status='done'`.
    pub(super) queue_done: i64,
    /// Presumably the number of queue rows with `status='failed'` — TODO confirm that status
    /// is still written anywhere.
    pub(super) queue_failed: i64,
    /// Presumably the number of queue rows with `status='skipped'`.
    pub(super) queue_skipped: i64,
    /// Presumably the number of queue rows with `status='dead'`.
    pub(super) queue_dead: i64,
    /// Presumably the pending rows whose `next_retry_at` is NULL or already past.
    pub(super) eligible_now: i64,
    /// Presumably the pending rows whose `next_retry_at` lies in the future.
    pub(super) waiting: i64,
    /// Short summary label; the set of possible values is not visible in this file.
    pub(super) state: &'static str,
    /// Details of items that are waiting for a retry (see [`WaitingItem`]).
    pub(super) waiting_items: Vec<WaitingItem>,
}
302
/// One queue item reported inside [`EnrichStatus::waiting_items`].
///
/// Field names match columns of the `queue` table; presumably each value is read from the
/// column of the same name — confirm against the code that builds it.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct WaitingItem {
    /// Key of the queued item (`queue.item_key`).
    pub(super) item_key: String,
    /// Attempt counter of the item (`queue.attempt`).
    pub(super) attempt: i64,
    /// Timestamp text of the next retry, if one is scheduled (`queue.next_retry_at`).
    pub(super) next_retry_at: Option<String>,
    /// Class of the last recorded error, if any (`queue.error_class`).
    pub(super) error_class: Option<String>,
}
311
/// Serializable description of one dead-lettered queue item.
///
/// Apart from the marker flag, field names match columns of the `queue` table; presumably
/// each value is read from the column of the same name — confirm against the code that
/// builds it.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadItem {
    /// Marker flag; presumably set to `true` to identify the record kind in serialized output.
    pub(super) dead_item: bool,
    /// Key of the queued item (`queue.item_key`).
    pub(super) item_key: String,
    /// Kind of item (`queue.item_type`); this module uses `memory`, `entity` and `chunk`.
    pub(super) item_type: String,
    /// Attempt counter of the item (`queue.attempt`).
    pub(super) attempt: i64,
    /// Class of the recorded error (`queue.error_class`), if any.
    pub(super) error_class: Option<String>,
    /// Recorded error text (`queue.error`), if any.
    pub(super) error: Option<String>,
    /// Recorded finish reason (`queue.finish_reason`), if any.
    pub(super) finish_reason: Option<String>,
    /// Recorded input token count (`queue.input_tokens`), if any.
    pub(super) input_tokens: Option<i64>,
    /// Recorded output token count (`queue.output_tokens`), if any.
    pub(super) output_tokens: Option<i64>,
}
331
/// Serializable summary of an action taken on the dead-letter rows of one operation.
///
/// NOTE(review): this struct is only declared here; the field descriptions are inferred from
/// the field names and should be confirmed against the code that fills them in.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadSummary {
    /// Marker flag; presumably set to `true` to identify the record kind in serialized output.
    pub(super) summary: bool,
    /// Name of the enrich operation the summary is about.
    pub(super) operation: String,
    /// Namespace the summary is about.
    pub(super) namespace: String,
    /// Label of the action that was performed; possible values are not visible in this file.
    pub(super) action: &'static str,
    /// Presumably the number of dead rows found.
    pub(super) dead_total: i64,
    /// Presumably the number of dead rows put back to `pending`.
    pub(super) requeued: i64,
    /// Presumably the number of dead rows deleted as orphans.
    pub(super) pruned: i64,
}
347
348pub(super) fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
359 use crate::retry::AttemptOutcome;
360 match e {
361 AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
362 AttemptOutcome::Transient
363 }
364 AppError::EntityNotYetMaterialized { .. } => AttemptOutcome::Transient,
372 AppError::ProviderError { .. }
378 | AppError::NotFound(_)
379 | AppError::MemoryNotFound { .. }
380 | AppError::MemoryNotFoundById { .. } => AttemptOutcome::HardFailure,
381 AppError::Database(_) => {
385 if crate::storage::utils::is_sqlite_busy(e) {
386 AttemptOutcome::Transient
387 } else {
388 AttemptOutcome::HardFailure
389 }
390 }
391 AppError::Embedding(_) => AttemptOutcome::Transient,
402 _ => AttemptOutcome::HardFailure,
413 }
414}
415
416pub(super) fn record_item_failure(
429 queue_conn: &rusqlite::Connection,
430 queue_id: i64,
431 attempt: i64,
432 max_attempts: u32,
433 err: &AppError,
434) -> crate::retry::AttemptOutcome {
435 let outcome = classify_enrich_outcome(err);
436 let err_str = format!("{err}");
437 record_item_failure_typed(
438 queue_conn,
439 queue_id,
440 attempt,
441 max_attempts,
442 outcome,
443 &err_str,
444 None,
445 None,
446 None,
447 )
448}
449
450#[allow(clippy::too_many_arguments)]
459pub(super) fn record_item_failure_typed(
460 queue_conn: &rusqlite::Connection,
461 queue_id: i64,
462 attempt: i64,
463 max_attempts: u32,
464 outcome: crate::retry::AttemptOutcome,
465 err_str: &str,
466 finish_reason: Option<&str>,
467 input_tokens: Option<i64>,
468 output_tokens: Option<i64>,
469) -> crate::retry::AttemptOutcome {
470 use crate::retry::AttemptOutcome;
471 let error_class = match outcome {
472 AttemptOutcome::Transient => "transient",
473 AttemptOutcome::HardFailure => "permanent",
474 AttemptOutcome::Success => "success",
475 };
476
477 let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
478 if terminal {
479 let _ = queue_conn.execute(
480 "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now'), \
481 finish_reason=?3, input_tokens=?4, output_tokens=?5 WHERE id=?6",
482 rusqlite::params![
483 err_str,
484 error_class,
485 finish_reason,
486 input_tokens,
487 output_tokens,
488 queue_id
489 ],
490 );
491 } else {
492 let delay = crate::retry::compute_delay(
493 &crate::retry::RetryConfig::llm_rate_limit(),
494 attempt.max(0) as u32,
495 );
496 let secs = delay.as_secs().max(1);
497 let modifier = format!("+{secs} seconds");
498 let _ = queue_conn.execute(
499 "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3), \
500 finish_reason=?4, input_tokens=?5, output_tokens=?6 WHERE id=?7",
501 rusqlite::params![
502 err_str,
503 error_class,
504 modifier,
505 finish_reason,
506 input_tokens,
507 output_tokens,
508 queue_id
509 ],
510 );
511 }
512 outcome
513}
514
/// Result of trying to claim the next queue item (see `dequeue_next_pending`).
pub(super) enum DequeueOutcome {
    /// A row was claimed. The tuple holds, in order, the row's `id`, `item_key`, `item_type`
    /// and `attempt` as returned by the claiming `UPDATE ... RETURNING` statement.
    Claimed((i64, String, String, i64)),
    /// No row matched, so nothing was claimed.
    Empty,
}
526
527pub(super) fn dequeue_next_pending(
528 queue_conn: &rusqlite::Connection,
529 backoff_clause: &str,
530) -> Result<DequeueOutcome, AppError> {
531 let dequeue_sql = format!(
532 "UPDATE queue SET status='processing', attempt=attempt+1 \
533 WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
534 ORDER BY id LIMIT 1) \
535 RETURNING id, item_key, item_type, attempt"
536 );
537 match queue_conn.query_row(&dequeue_sql, [], |row| {
538 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
539 }) {
540 Ok(claimed) => Ok(DequeueOutcome::Claimed(claimed)),
541 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(DequeueOutcome::Empty),
542 Err(e) => Err(AppError::Database(e)),
543 }
544}
545
#[cfg(test)]
mod tests {
    use super::*;

    /// Condition used by the dequeue tests: an item is eligible when it has no retry time or
    /// its retry time has passed.
    const ELIGIBLE_NOW: &str = "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))";

    /// Builds a unique database path inside the platform's temporary directory, so the
    /// tests do not depend on a `/tmp` directory existing.
    fn temp_db_path(tag: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!(
            "test-enrich-{tag}-{}-{}.sqlite",
            std::process::id(),
            fastrand::u64(..)
        ))
    }

    /// In-memory stand-in for the main database with just the `memories` table.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// Opens a fresh on-disk queue database; the caller removes the file when done.
    fn open_temp_queue() -> (Connection, std::path::PathBuf) {
        let path = temp_db_path("dl");
        let conn = open_queue_db(&path).expect("queue db must open");
        (conn, path)
    }

    /// Inserts a pending memory item and returns its row id.
    fn insert_pending(conn: &Connection, key: &str) -> i64 {
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
            rusqlite::params![key],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        let tmp_path = temp_db_path("queue");
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        let _ = std::fs::remove_file(&tmp_path);
    }

    #[test]
    fn classify_rate_limit_is_transient() {
        let e = AppError::RateLimited {
            detail: "429".into(),
        };
        assert_eq!(
            classify_enrich_outcome(&e),
            crate::retry::AttemptOutcome::Transient
        );
    }

    #[test]
    fn classify_timeout_and_dbbusy_are_transient() {
        let t = AppError::Timeout {
            operation: "judge".into(),
            duration_secs: 30,
        };
        let b = AppError::DbBusy("locked".into());
        assert_eq!(
            classify_enrich_outcome(&t),
            crate::retry::AttemptOutcome::Transient
        );
        assert_eq!(
            classify_enrich_outcome(&b),
            crate::retry::AttemptOutcome::Transient
        );
    }

    #[test]
    fn classify_validation_and_parse_are_hard_failure() {
        let v = AppError::Validation("failed to parse entities array: bad".into());
        assert_eq!(
            classify_enrich_outcome(&v),
            crate::retry::AttemptOutcome::HardFailure
        );
    }

    #[test]
    fn open_queue_db_alter_is_idempotent() {
        let path = temp_db_path("idem");
        let _ = open_queue_db(&path).expect("first open");
        let conn = open_queue_db(&path).expect("second open is idempotent");
        let cols: Vec<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
            stmt.query_map([], |r| r.get::<_, String>(1))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        };
        assert!(cols.iter().any(|c| c == "error_class"));
        assert!(cols.iter().any(|c| c == "next_retry_at"));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn record_item_failure_hard_marks_dead() {
        let (conn, path) = open_temp_queue();
        let id = insert_pending(&conn, "mem-hard");
        let outcome = record_item_failure(
            &conn,
            id,
            1,
            5,
            &AppError::Validation("invalid body".into()),
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
        let status: String = conn
            .query_row(
                "SELECT status FROM queue WHERE id=?1",
                rusqlite::params![id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(status, "dead");
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn record_item_failure_transient_reschedules_pending() {
        let (conn, path) = open_temp_queue();
        let id = insert_pending(&conn, "mem-transient");
        let outcome = record_item_failure(
            &conn,
            id,
            1,
            5,
            &AppError::RateLimited {
                detail: "429".into(),
            },
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
        let (status, future): (String, i64) = conn
            .query_row(
                "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
                rusqlite::params![id],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(status, "pending");
        assert_eq!(future, 1, "next_retry_at must be in the future");
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn record_item_failure_transient_at_cap_marks_dead() {
        let (conn, path) = open_temp_queue();
        let id = insert_pending(&conn, "mem-cap");
        let outcome = record_item_failure(
            &conn,
            id,
            5,
            5,
            &AppError::RateLimited {
                detail: "429".into(),
            },
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
        let status: String = conn
            .query_row(
                "SELECT status FROM queue WHERE id=?1",
                rusqlite::params![id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(status, "dead");
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn dequeue_skips_future_retry_and_dead() {
        let (conn, path) = open_temp_queue();
        let eligible = insert_pending(&conn, "mem-eligible");
        let waiting = insert_pending(&conn, "mem-waiting");
        conn.execute(
            "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
            rusqlite::params![waiting],
        )
        .unwrap();
        let dead = insert_pending(&conn, "mem-dead");
        conn.execute(
            "UPDATE queue SET status='dead' WHERE id=?1",
            rusqlite::params![dead],
        )
        .unwrap();

        // Goes through the production dequeue so the test cannot drift from the real SQL.
        let first = dequeue_next_pending(&conn, ELIGIBLE_NOW).expect("dequeue must succeed");
        assert!(
            matches!(first, DequeueOutcome::Claimed((id, _, _, _)) if id == eligible),
            "only the eligible row may be claimed"
        );

        let second = dequeue_next_pending(&conn, ELIGIBLE_NOW).expect("dequeue must succeed");
        assert!(
            matches!(second, DequeueOutcome::Empty),
            "waiting and dead rows must not be claimed"
        );
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn classify_validation_never_infers_transience_from_message() {
        for msg in [
            "model 'x' returned non-object JSON after repair (got string)",
            "model 'x' returned content that could not be parsed even after JSON repair",
            "model 'x' returned no structured content",
            "LLM result missing 'description' field",
            "LLM result missing 'enriched_body' field",
        ] {
            assert_eq!(
                classify_enrich_outcome(&AppError::Validation(msg.into())),
                crate::retry::AttemptOutcome::HardFailure,
                "expected hard failure for: {msg}"
            );
        }
    }

    #[test]
    fn classify_embedding_error_is_transient_floor() {
        assert_eq!(
            classify_enrich_outcome(&AppError::Embedding("dimension mismatch".into())),
            crate::retry::AttemptOutcome::Transient
        );
    }

    #[test]
    fn classify_entity_not_yet_materialized_is_transient() {
        assert_eq!(
            classify_enrich_outcome(&AppError::EntityNotYetMaterialized {
                name: "acme".into(),
                namespace: "global".into(),
            }),
            crate::retry::AttemptOutcome::Transient
        );
    }

    #[test]
    fn classify_memory_absence_stays_hard_failure() {
        assert_eq!(
            classify_enrich_outcome(&AppError::MemoryNotFound {
                name: "mem-x".into(),
                namespace: "global".into(),
            }),
            crate::retry::AttemptOutcome::HardFailure
        );
        assert_eq!(
            classify_enrich_outcome(&AppError::MemoryNotFoundById { id: 42 }),
            crate::retry::AttemptOutcome::HardFailure
        );
        assert_eq!(
            classify_enrich_outcome(&AppError::NotFound("gone".into())),
            crate::retry::AttemptOutcome::HardFailure
        );
    }

    #[test]
    fn classify_database_busy_is_transient_non_busy_is_hard() {
        let busy = AppError::Database(rusqlite::Error::SqliteFailure(
            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
            Some("database is locked".into()),
        ));
        assert_eq!(
            classify_enrich_outcome(&busy),
            crate::retry::AttemptOutcome::Transient
        );
        let constraint = AppError::Database(rusqlite::Error::SqliteFailure(
            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
            Some("UNIQUE constraint failed".into()),
        ));
        assert_eq!(
            classify_enrich_outcome(&constraint),
            crate::retry::AttemptOutcome::HardFailure
        );
    }

    #[test]
    fn record_item_failure_typed_persists_diagnostics_on_dead_letter() {
        let (conn, path) = open_temp_queue();
        let id = insert_pending(&conn, "mem-diag");
        let outcome = record_item_failure_typed(
            &conn,
            id,
            1,
            5,
            crate::retry::AttemptOutcome::HardFailure,
            "truncated response",
            Some("length"),
            Some(120),
            Some(4096),
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
        let (status, finish_reason, input_tokens, output_tokens): (
            String,
            Option<String>,
            Option<i64>,
            Option<i64>,
        ) = conn
            .query_row(
                "SELECT status, finish_reason, input_tokens, output_tokens FROM queue WHERE id=?1",
                rusqlite::params![id],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
            )
            .unwrap();
        assert_eq!(status, "dead");
        assert_eq!(finish_reason.as_deref(), Some("length"));
        assert_eq!(input_tokens, Some(120));
        assert_eq!(output_tokens, Some(4096));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn record_item_failure_typed_reschedules_transient_below_max_attempts() {
        let (conn, path) = open_temp_queue();
        let id = insert_pending(&conn, "mem-retry");
        let outcome = record_item_failure_typed(
            &conn,
            id,
            1,
            5,
            crate::retry::AttemptOutcome::Transient,
            "truncated response",
            Some("length"),
            Some(120),
            Some(64),
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
        let (status, error_class, finish_reason, next_retry_at): (
            String,
            String,
            Option<String>,
            Option<String>,
        ) = conn
            .query_row(
                "SELECT status, error_class, finish_reason, next_retry_at FROM queue WHERE id=?1",
                rusqlite::params![id],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
            )
            .unwrap();
        assert_eq!(status, "pending");
        assert_eq!(error_class, "transient");
        assert_eq!(finish_reason.as_deref(), Some("length"));
        assert!(
            next_retry_at.is_some(),
            "a rescheduled item must carry a next_retry_at"
        );
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn with_busy_retry_bounds_dequeue_under_sustained_contention() {
        let (conn, path) = open_temp_queue();
        insert_pending(&conn, "mem-busy");
        // Fail immediately instead of waiting, so every attempt hits SQLITE_BUSY at once.
        conn.pragma_update(None, "busy_timeout", 0i64)
            .expect("busy_timeout override must succeed");

        let blocker = Connection::open(&path).expect("blocker connection must open");
        blocker
            .execute_batch("BEGIN EXCLUSIVE;")
            .expect("exclusive lock must be acquired");

        let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let calls_clone = std::sync::Arc::clone(&calls);
        let result: Result<DequeueOutcome, AppError> =
            crate::storage::utils::with_busy_retry(|| {
                calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                dequeue_next_pending(&conn, "")
            });

        assert!(
            matches!(result, Err(AppError::DbBusy(_))),
            "sustained SQLITE_BUSY must convert to DbBusy, not hang or silently report Empty"
        );
        assert_eq!(
            calls.load(std::sync::atomic::Ordering::SeqCst),
            crate::constants::MAX_SQLITE_BUSY_RETRIES,
            "must attempt exactly MAX_SQLITE_BUSY_RETRIES times, never retry unbounded"
        );

        blocker
            .execute_batch("ROLLBACK;")
            .expect("releasing the exclusive lock must succeed");
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn dequeue_next_pending_distinguishes_empty_from_claimed() {
        let (conn, path) = open_temp_queue();
        let id = insert_pending(&conn, "mem-dequeue");
        let claimed = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
        match claimed {
            DequeueOutcome::Claimed((claimed_id, key, _, _)) => {
                assert_eq!(claimed_id, id);
                assert_eq!(key, "mem-dequeue");
            }
            DequeueOutcome::Empty => panic!("expected a claimed row"),
        }
        let empty = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
        assert!(matches!(empty, DequeueOutcome::Empty));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn classify_provider_error_and_not_found_are_hard() {
        assert_eq!(
            classify_enrich_outcome(&AppError::ProviderError {
                code: "400".into(),
                message: "context length exceeded".into(),
            }),
            crate::retry::AttemptOutcome::HardFailure
        );
        assert_eq!(
            classify_enrich_outcome(&AppError::NotFound("memory 'gone' not found".into())),
            crate::retry::AttemptOutcome::HardFailure
        );
    }

    #[test]
    fn open_queue_db_migrates_operation_column() {
        let (conn, path) = open_temp_queue();
        drop(conn);
        let conn = open_queue_db(&path).expect("second open is idempotent");
        let cols: Vec<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
            stmt.query_map([], |r| r.get::<_, String>(1))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        };
        assert!(cols.iter().any(|c| c == "operation"));
        assert!(cols.iter().any(|c| c == "memory_id"));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn enqueue_candidate_tags_operation_and_memory_id() {
        let main = open_test_db();
        main.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-x', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = main
            .query_row("SELECT id FROM memories WHERE name='mem-x'", [], |r| {
                r.get(0)
            })
            .unwrap();
        let (queue, path) = open_temp_queue();
        enqueue_candidate(&queue, &main, "global", "mem-x", "memory", "MemoryBindings");
        let (op, mid): (String, i64) = queue
            .query_row(
                "SELECT operation, memory_id FROM queue WHERE item_key='mem-x'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(op, "MemoryBindings");
        assert_eq!(mid, mem_id);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn requeue_dead_resurrects_dead_rows() {
        let (conn, path) = open_temp_queue();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation, attempt, error, error_class, next_retry_at) \
             VALUES ('mem-dead', 'memory', 'dead', 'MemoryBindings', 8, 'boom', 'permanent', datetime('now'))",
            [],
        )
        .unwrap();
        let n = conn
            .execute(
                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
                 error=NULL, error_class=NULL \
                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
                rusqlite::params!["MemoryBindings"],
            )
            .unwrap();
        assert_eq!(n, 1);
        let (status, attempt, nra): (String, i64, Option<String>) = conn
            .query_row(
                "SELECT status, attempt, next_retry_at FROM queue WHERE item_key='mem-dead'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(status, "pending");
        assert_eq!(attempt, 0);
        assert!(nra.is_none());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn skipped_item_keys_excludes_only_skipped_for_operation() {
        let (conn, path) = open_temp_queue();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-vetoed', 'memory', 'skipped', 'BodyEnrich')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-pending', 'memory', 'pending', 'BodyEnrich')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-other-op', 'memory', 'skipped', 'MemoryBindings')",
            [],
        )
        .unwrap();
        let keys = skipped_item_keys(&conn, "BodyEnrich").unwrap();
        assert!(
            keys.contains("mem-vetoed"),
            "vetoed BodyEnrich item must be excluded from scan"
        );
        assert!(
            !keys.contains("mem-pending"),
            "pending item is still actionable"
        );
        assert!(
            !keys.contains("mem-other-op"),
            "skipped item from another operation must not leak"
        );
        assert_eq!(keys.len(), 1);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn cascade_cleanup_delete_targets_memory_id_and_name() {
        let (conn, path) = open_temp_queue();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, memory_id) VALUES ('by-id', 'memory', 'done', 42)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status) VALUES ('by-name', 'memory', 'pending')",
            [],
        )
        .unwrap();
        let removed = conn
            .execute(
                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
                rusqlite::params![42_i64, "by-name"],
            )
            .unwrap();
        assert_eq!(removed, 2);
        let remaining: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(remaining, 0);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn item_type_for_maps_entity_and_memory() {
        assert_eq!(
            item_type_for(&EnrichOperation::EntityDescriptions),
            "entity"
        );
        assert_eq!(item_type_for(&EnrichOperation::MemoryBindings), "memory");
        assert_eq!(item_type_for(&EnrichOperation::AugmentBindings), "memory");
        assert_eq!(item_type_for(&EnrichOperation::BodyExtract), "memory");
    }

    #[test]
    fn item_type_for_key_honours_reembed_prefixes() {
        assert_eq!(item_type_for_key("plain-memory-name", "memory"), "memory");
        assert_eq!(
            item_type_for_key("entity:tokio-runtime", "memory"),
            "entity"
        );
        assert_eq!(item_type_for_key("chunk:42", "memory"), "chunk");
        assert_eq!(item_type_for_key("some-entity", "entity"), "entity");
    }

    #[test]
    fn prune_dead_orphans_removes_only_orphan_memory_rows() {
        let main = open_test_db();
        main.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'alive', 'b')",
            [],
        )
        .unwrap();
        let (queue, path) = open_temp_queue();
        // Dead row whose memory does not exist: the only one that may be pruned.
        queue
            .execute(
                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
                 VALUES ('gone', 'memory', 'dead', 'MemoryBindings', 'permanent')",
                [],
            )
            .unwrap();
        // Dead row whose memory still exists: must survive.
        queue
            .execute(
                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
                 VALUES ('alive', 'memory', 'dead', 'MemoryBindings', 'permanent')",
                [],
            )
            .unwrap();
        // Dead entity row: outside the memory-only scope of the prune.
        queue
            .execute(
                "INSERT INTO queue (item_key, item_type, status, operation) \
                 VALUES ('some-entity', 'entity', 'dead', 'EntityDescriptions')",
                [],
            )
            .unwrap();

        let pruned = prune_dead_orphans(&queue, &main, "MemoryBindings", "global").unwrap();
        assert_eq!(pruned, 1, "only the orphan memory row is pruned");

        let remaining: Vec<String> = {
            let mut stmt = queue
                .prepare("SELECT item_key FROM queue ORDER BY item_key")
                .unwrap();
            stmt.query_map([], |r| r.get::<_, String>(0))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        };
        assert_eq!(remaining, vec!["alive", "some-entity"]);
        let _ = std::fs::remove_file(&path);
    }
}