1use super::*;
4
5pub(super) fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
20 let conn = Connection::open(path)?;
21 conn.pragma_update(None, "journal_mode", "wal")?;
22 conn.execute_batch(
23 "CREATE TABLE IF NOT EXISTS queue (
24 id INTEGER PRIMARY KEY AUTOINCREMENT,
25 item_key TEXT NOT NULL UNIQUE,
26 item_type TEXT NOT NULL DEFAULT 'memory',
27 status TEXT NOT NULL DEFAULT 'pending',
28 memory_id INTEGER,
29 entity_id INTEGER,
30 entities INTEGER DEFAULT 0,
31 rels INTEGER DEFAULT 0,
32 error TEXT,
33 cost_usd REAL DEFAULT 0.0,
34 attempt INTEGER DEFAULT 0,
35 elapsed_ms INTEGER,
36 created_at TEXT DEFAULT (datetime('now')),
37 done_at TEXT
38 );
39 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
40 )?;
41 let mut has_error_class = false;
45 let mut has_next_retry_at = false;
46 let mut has_operation = false;
51 {
52 let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
53 let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
54 for name in names {
55 match name?.as_str() {
56 "error_class" => has_error_class = true,
57 "next_retry_at" => has_next_retry_at = true,
58 "operation" => has_operation = true,
59 _ => {}
60 }
61 }
62 }
63 if !has_error_class {
64 conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
65 }
66 if !has_next_retry_at {
67 conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
68 }
69 if !has_operation {
70 conn.execute_batch("ALTER TABLE queue ADD COLUMN operation TEXT")?;
71 }
72 conn.execute_batch(
73 "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at);
74 CREATE INDEX IF NOT EXISTS idx_enrich_queue_operation ON queue(operation, status);
75 CREATE INDEX IF NOT EXISTS idx_enrich_queue_memory ON queue(memory_id)",
76 )?;
77 Ok(conn)
78}
79
80pub(super) fn enqueue_candidate(
88 queue_conn: &Connection,
89 main_conn: &Connection,
90 namespace: &str,
91 key: &str,
92 item_type: &str,
93 operation: &str,
94) {
95 let memory_id: Option<i64> = if item_type == "memory" {
96 main_conn
97 .query_row(
98 "SELECT id FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
99 rusqlite::params![namespace, key],
100 |r| r.get(0),
101 )
102 .ok()
103 } else {
104 None
105 };
106 if let Err(e) = queue_conn.execute(
107 "INSERT OR IGNORE INTO queue (item_key, item_type, status, operation, memory_id) \
108 VALUES (?1, ?2, 'pending', ?3, ?4)",
109 rusqlite::params![key, item_type, operation, memory_id],
110 ) {
111 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
112 }
113}
114
115pub(super) fn skipped_item_keys(
123 conn: &Connection,
124 operation: &str,
125) -> Result<std::collections::HashSet<String>, AppError> {
126 let mut stmt = conn.prepare(
127 "SELECT item_key FROM queue WHERE status='skipped' AND (operation = ?1 OR operation IS NULL)",
128 )?;
129 let keys = stmt
130 .query_map(rusqlite::params![operation], |r| r.get::<_, String>(0))?
131 .collect::<Result<std::collections::HashSet<String>, _>>()?;
132 Ok(keys)
133}
134
135pub(super) fn item_type_for(operation: &EnrichOperation) -> &'static str {
138 match operation {
139 EnrichOperation::EntityDescriptions => "entity",
140 _ => "memory",
141 }
142}
143
144pub fn cleanup_queue_entry(db_path: &std::path::Path, memory_id: i64, name: &str) {
153 let queue_path = crate::paths::sidecar_path(db_path, ".enrich-queue.sqlite");
154 if !queue_path.exists() {
155 return;
156 }
157 match open_queue_db(&queue_path) {
158 Ok(conn) => {
159 if let Err(e) = conn.execute(
160 "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
161 rusqlite::params![memory_id, name],
162 ) {
163 tracing::warn!(target: "enrich", error = %e, memory_id, "enrich-queue cleanup failed");
164 }
165 }
166 Err(e) => {
167 tracing::warn!(target: "enrich", error = %e, "enrich-queue cleanup skipped (open failed)");
168 }
169 }
170}
171
172pub(super) fn prune_dead_orphans(
183 queue_conn: &Connection,
184 main_conn: &Connection,
185 operation: &str,
186 namespace: &str,
187) -> Result<i64, AppError> {
188 let dead: Vec<(i64, String)> = {
189 let mut stmt = queue_conn.prepare(
190 "SELECT id, item_key FROM queue \
191 WHERE status='dead' AND item_type='memory' \
192 AND (operation = ?1 OR operation IS NULL) ORDER BY id",
193 )?;
194 let rows = stmt
195 .query_map(rusqlite::params![operation], |r| Ok((r.get(0)?, r.get(1)?)))?
196 .collect::<Result<Vec<_>, _>>()?;
197 rows
198 };
199 let mut pruned = 0_i64;
200 for (id, name) in dead {
201 let exists = main_conn
202 .query_row(
203 "SELECT 1 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
204 rusqlite::params![namespace, name],
205 |_| Ok(()),
206 )
207 .is_ok();
208 if !exists {
209 queue_conn.execute("DELETE FROM queue WHERE id=?1", rusqlite::params![id])?;
210 pruned += 1;
211 }
212 }
213 if pruned > 0 {
214 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
215 }
216 Ok(pruned)
217}
218
// Serializable status report for one enrich operation in one namespace.
//
// NOTE(review): kept as plain `//` comments on purpose; `///` doc comments may
// be surfaced by the `JsonSchema` derive as schema descriptions — confirm
// before converting.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct EnrichStatus {
    // Presumably a constant marker identifying this record kind in the
    // serialized output — TODO confirm against the code that builds it.
    pub(super) status_report: bool,
    // Operation this report is about.
    pub(super) operation: String,
    // Namespace this report is about.
    pub(super) namespace: String,
    // NOTE(review): looks like the number of items not yet handled by the
    // operation; the exact definition lives where this struct is built.
    pub(super) unbound_backlog: usize,
    // The `queue_*` fields presumably count queue rows per `status` value
    // (`pending`, `processing`, `done`, `failed`, `skipped`, `dead`) — confirm.
    pub(super) queue_pending: i64,
    pub(super) queue_processing: i64,
    pub(super) queue_done: i64,
    pub(super) queue_failed: i64,
    pub(super) queue_skipped: i64,
    pub(super) queue_dead: i64,
    // Presumably pending rows whose `next_retry_at` is unset or already in
    // the past — confirm.
    pub(super) eligible_now: i64,
    // Presumably pending rows whose `next_retry_at` is still in the future —
    // confirm.
    pub(super) waiting: i64,
    // Static label summarising the overall state; the set of values is
    // decided by the code that builds this struct.
    pub(super) state: &'static str,
    // Per-item details; presumably a sample of the rows counted in `waiting`.
    pub(super) waiting_items: Vec<WaitingItem>,
}
252
// Serializable description of one queue item that is waiting for a retry.
//
// NOTE(review): kept as plain `//` comments on purpose; `///` doc comments may
// be surfaced by the `JsonSchema` derive as schema descriptions — confirm
// before converting.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct WaitingItem {
    // Key of the queue item (the queue table's `item_key` column, presumably).
    pub(super) item_key: String,
    // Attempt counter of the item.
    pub(super) attempt: i64,
    // Time of the next retry as text, if one is scheduled; presumably the
    // queue table's `next_retry_at` value verbatim — confirm.
    pub(super) next_retry_at: Option<String>,
    // Classification of the last error, if any; presumably the queue table's
    // `error_class` value verbatim — confirm.
    pub(super) error_class: Option<String>,
}
261
// Serializable description of one dead-letter queue item.
//
// NOTE(review): kept as plain `//` comments on purpose; `///` doc comments may
// be surfaced by the `JsonSchema` derive as schema descriptions — confirm
// before converting.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadItem {
    // Presumably a constant marker identifying this record kind in the
    // serialized output — TODO confirm against the code that builds it.
    pub(super) dead_item: bool,
    // Key of the queue item.
    pub(super) item_key: String,
    // Item type label of the queue item (e.g. `memory` or `entity`, the two
    // labels produced by `item_type_for`).
    pub(super) item_type: String,
    // Attempt counter of the item.
    pub(super) attempt: i64,
    // Classification of the recorded error, if any.
    pub(super) error_class: Option<String>,
    // Text of the recorded error, if any.
    pub(super) error: Option<String>,
}
272
// Serializable summary of an action taken on dead-letter queue items.
//
// NOTE(review): kept as plain `//` comments on purpose; `///` doc comments may
// be surfaced by the `JsonSchema` derive as schema descriptions — confirm
// before converting.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadSummary {
    // Presumably a constant marker identifying this record kind in the
    // serialized output — TODO confirm against the code that builds it.
    pub(super) summary: bool,
    // Operation the summary is about.
    pub(super) operation: String,
    // Namespace the summary is about.
    pub(super) namespace: String,
    // Static label naming the action that was performed; the set of values
    // is decided by the code that builds this struct.
    pub(super) action: &'static str,
    // Number of dead rows; whether counted before or after the action is not
    // visible here — confirm.
    pub(super) dead_total: i64,
    // Number of rows put back into the queue by the action.
    pub(super) requeued: i64,
    // Number of rows removed by the action (cf. `prune_dead_orphans`, which
    // returns such a count).
    pub(super) pruned: i64,
}
288
289pub(super) fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
296 use crate::retry::AttemptOutcome;
297 match e {
298 AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
299 AttemptOutcome::Transient
300 }
301 AppError::ProviderError { .. }
307 | AppError::NotFound(_)
308 | AppError::MemoryNotFound { .. }
309 | AppError::MemoryNotFoundById { .. } => AttemptOutcome::HardFailure,
310 _ => {
311 let msg = format!("{e}").to_lowercase();
312 if msg.contains("server error")
313 || msg.contains("timed out")
314 || msg.contains("timeout")
315 || msg.contains("connection")
316 || msg.contains("5xx")
317 || msg.contains("502")
318 || msg.contains("503")
319 || msg.contains("504")
320 {
321 AttemptOutcome::Transient
322 } else if msg.contains("json")
323 || msg.contains("no structured content")
324 || msg.contains("non-object")
325 || msg.contains("missing '")
326 {
327 AttemptOutcome::Transient
334 } else {
335 AttemptOutcome::HardFailure
336 }
337 }
338 }
339}
340
341pub(super) fn record_item_failure(
349 queue_conn: &rusqlite::Connection,
350 queue_id: i64,
351 attempt: i64,
352 max_attempts: u32,
353 err: &AppError,
354) -> crate::retry::AttemptOutcome {
355 use crate::retry::AttemptOutcome;
356 let outcome = classify_enrich_outcome(err);
357 let err_str = format!("{err}");
358 let error_class = match outcome {
359 AttemptOutcome::Transient => "transient",
360 AttemptOutcome::HardFailure => "permanent",
361 AttemptOutcome::Success => "success",
362 };
363
364 let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
365 if terminal {
366 let _ = queue_conn.execute(
367 "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now') WHERE id=?3",
368 rusqlite::params![err_str, error_class, queue_id],
369 );
370 } else {
371 let delay = crate::retry::compute_delay(
372 &crate::retry::RetryConfig::llm_rate_limit(),
373 attempt.max(0) as u32,
374 );
375 let secs = delay.as_secs().max(1);
376 let modifier = format!("+{secs} seconds");
377 let _ = queue_conn.execute(
378 "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3) WHERE id=?4",
379 rusqlite::params![err_str, error_class, modifier, queue_id],
380 );
381 }
382 outcome
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Owns a unique queue-database path under the system temp directory.
    ///
    /// On drop it removes the database file together with its `-wal` and
    /// `-shm` sidecars, so nothing is left behind even when an assertion
    /// panics. Bind the guard *before* the connection (`let (_tmp, conn) = ..`)
    /// so that the connection is closed first.
    struct TempDb(std::path::PathBuf);

    impl TempDb {
        /// Reserves a fresh path; `tag` only makes the file name recognisable.
        fn new(tag: &str) -> Self {
            let file_name = format!(
                "test-enrich-{tag}-{}-{}.sqlite",
                std::process::id(),
                fastrand::u64(..)
            );
            Self(std::env::temp_dir().join(file_name))
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            for suffix in ["", "-wal", "-shm"] {
                let mut file = self.0.clone().into_os_string();
                file.push(suffix);
                // Best-effort: the sidecars may legitimately not exist.
                let _ = std::fs::remove_file(file);
            }
        }
    }

    /// In-memory stand-in for the main database, containing only `memories`.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// Opens a fresh queue database in a temp file.
    fn open_temp_queue() -> (TempDb, Connection) {
        let tmp = TempDb::new("dl");
        let conn = open_queue_db(tmp.path()).expect("queue db must open");
        (tmp, conn)
    }

    /// Inserts a pending memory row and returns its id.
    fn insert_pending(conn: &Connection, key: &str) -> i64 {
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
            rusqlite::params![key],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    /// Names of the columns currently present on the `queue` table.
    fn column_names(conn: &Connection) -> Vec<String> {
        let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
        let names = stmt
            .query_map([], |r| r.get::<_, String>(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        names
    }

    /// Current `status` of the queue row `id`.
    fn status_of(conn: &Connection, id: i64) -> String {
        conn.query_row(
            "SELECT status FROM queue WHERE id=?1",
            rusqlite::params![id],
            |r| r.get(0),
        )
        .unwrap()
    }

    /// Claims the next eligible pending row, mirroring the worker's dequeue
    /// statement. Returns `None` only when no row is eligible; any other
    /// database error fails the test instead of looking like an empty queue.
    fn claim_next(conn: &Connection) -> Option<i64> {
        let claimed = conn.query_row(
            "UPDATE queue SET status='processing', attempt=attempt+1 \
             WHERE id = (SELECT id FROM queue WHERE status='pending' \
             AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
             ORDER BY id LIMIT 1) \
             RETURNING id",
            [],
            |r| r.get(0),
        );
        match claimed {
            Ok(id) => Some(id),
            Err(rusqlite::Error::QueryReturnedNoRows) => None,
            Err(e) => panic!("claim query failed: {e}"),
        }
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        let tmp = TempDb::new("queue");
        let conn = open_queue_db(tmp.path()).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn classify_rate_limit_is_transient() {
        let e = AppError::RateLimited {
            detail: "429".into(),
        };
        assert_eq!(
            classify_enrich_outcome(&e),
            crate::retry::AttemptOutcome::Transient
        );
    }

    #[test]
    fn classify_timeout_and_dbbusy_are_transient() {
        let t = AppError::Timeout {
            operation: "judge".into(),
            duration_secs: 30,
        };
        let b = AppError::DbBusy("locked".into());
        assert_eq!(
            classify_enrich_outcome(&t),
            crate::retry::AttemptOutcome::Transient
        );
        assert_eq!(
            classify_enrich_outcome(&b),
            crate::retry::AttemptOutcome::Transient
        );
    }

    #[test]
    fn classify_validation_and_parse_are_hard_failure() {
        let v = AppError::Validation("failed to parse entities array: bad".into());
        assert_eq!(
            classify_enrich_outcome(&v),
            crate::retry::AttemptOutcome::HardFailure
        );
    }

    #[test]
    fn open_queue_db_alter_is_idempotent() {
        let tmp = TempDb::new("idem");
        drop(open_queue_db(tmp.path()).expect("first open"));
        let conn = open_queue_db(tmp.path()).expect("second open is idempotent");
        let cols = column_names(&conn);
        assert!(cols.iter().any(|c| c == "error_class"));
        assert!(cols.iter().any(|c| c == "next_retry_at"));
    }

    #[test]
    fn record_item_failure_hard_marks_dead() {
        let (_tmp, conn) = open_temp_queue();
        let id = insert_pending(&conn, "mem-hard");
        let outcome = record_item_failure(
            &conn,
            id,
            1,
            5,
            &AppError::Validation("invalid body".into()),
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
        assert_eq!(status_of(&conn, id), "dead");
    }

    #[test]
    fn record_item_failure_transient_reschedules_pending() {
        let (_tmp, conn) = open_temp_queue();
        let id = insert_pending(&conn, "mem-transient");
        let outcome = record_item_failure(
            &conn,
            id,
            1,
            5,
            &AppError::RateLimited {
                detail: "429".into(),
            },
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
        let (status, future): (String, i64) = conn
            .query_row(
                "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
                rusqlite::params![id],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(status, "pending");
        assert_eq!(future, 1, "next_retry_at must be in the future");
    }

    #[test]
    fn record_item_failure_transient_at_cap_marks_dead() {
        let (_tmp, conn) = open_temp_queue();
        let id = insert_pending(&conn, "mem-cap");
        let outcome = record_item_failure(
            &conn,
            id,
            5,
            5,
            &AppError::RateLimited {
                detail: "429".into(),
            },
        );
        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
        assert_eq!(status_of(&conn, id), "dead");
    }

    #[test]
    fn dequeue_skips_future_retry_and_dead() {
        let (_tmp, conn) = open_temp_queue();
        let eligible = insert_pending(&conn, "mem-eligible");
        let waiting = insert_pending(&conn, "mem-waiting");
        conn.execute(
            "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
            rusqlite::params![waiting],
        )
        .unwrap();
        let dead = insert_pending(&conn, "mem-dead");
        conn.execute(
            "UPDATE queue SET status='dead' WHERE id=?1",
            rusqlite::params![dead],
        )
        .unwrap();

        assert_eq!(claim_next(&conn), Some(eligible));
        // The remaining rows are either waiting for a future retry or dead.
        assert_eq!(claim_next(&conn), None);
    }

    #[test]
    fn classify_non_json_and_shape_errors_are_transient() {
        for msg in [
            "model 'x' returned non-object JSON after repair (got string)",
            "model 'x' returned content that could not be parsed even after JSON repair",
            "model 'x' returned no structured content",
            "LLM result missing 'description' field",
            "LLM result missing 'enriched_body' field",
        ] {
            assert_eq!(
                classify_enrich_outcome(&AppError::Validation(msg.into())),
                crate::retry::AttemptOutcome::Transient,
                "expected transient for: {msg}"
            );
        }
    }

    #[test]
    fn classify_provider_error_and_not_found_are_hard() {
        assert_eq!(
            classify_enrich_outcome(&AppError::ProviderError {
                code: "400".into(),
                message: "context length exceeded".into(),
            }),
            crate::retry::AttemptOutcome::HardFailure
        );
        assert_eq!(
            classify_enrich_outcome(&AppError::NotFound("memory 'gone' not found".into())),
            crate::retry::AttemptOutcome::HardFailure
        );
    }

    /// Starts from a table that lacks the late columns so the `ALTER TABLE`
    /// path of `open_queue_db` is actually exercised.
    #[test]
    fn open_queue_db_migrates_operation_column() {
        let tmp = TempDb::new("migrate");
        {
            let legacy = Connection::open(tmp.path()).expect("legacy db must open");
            legacy
                .execute_batch(
                    "CREATE TABLE queue (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        item_key TEXT NOT NULL UNIQUE,
                        item_type TEXT NOT NULL DEFAULT 'memory',
                        status TEXT NOT NULL DEFAULT 'pending',
                        memory_id INTEGER
                    );
                    INSERT INTO queue (item_key) VALUES ('legacy-row');",
                )
                .expect("legacy schema creation must succeed");
            let before = column_names(&legacy);
            assert!(!before.iter().any(|c| c == "operation"));
        }

        let conn = open_queue_db(tmp.path()).expect("legacy db must migrate");
        let cols = column_names(&conn);
        for expected in ["operation", "memory_id", "error_class", "next_retry_at"] {
            assert!(
                cols.iter().any(|c| c == expected),
                "column {expected} must exist after migration"
            );
        }
        // Existing rows survive and get NULL in the added column.
        let op: Option<String> = conn
            .query_row(
                "SELECT operation FROM queue WHERE item_key='legacy-row'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert!(op.is_none());
    }

    #[test]
    fn enqueue_candidate_tags_operation_and_memory_id() {
        let main = open_test_db();
        main.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-x', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = main
            .query_row("SELECT id FROM memories WHERE name='mem-x'", [], |r| {
                r.get(0)
            })
            .unwrap();
        let (_tmp, queue) = open_temp_queue();
        enqueue_candidate(&queue, &main, "global", "mem-x", "memory", "MemoryBindings");
        let (op, mid): (String, i64) = queue
            .query_row(
                "SELECT operation, memory_id FROM queue WHERE item_key='mem-x'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(op, "MemoryBindings");
        assert_eq!(mid, mem_id);
    }

    #[test]
    fn requeue_dead_resurrects_dead_rows() {
        let (_tmp, conn) = open_temp_queue();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation, attempt, error, error_class, next_retry_at) \
             VALUES ('mem-dead', 'memory', 'dead', 'MemoryBindings', 8, 'boom', 'permanent', datetime('now'))",
            [],
        )
        .unwrap();
        let n = conn
            .execute(
                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
                 error=NULL, error_class=NULL \
                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
                rusqlite::params!["MemoryBindings"],
            )
            .unwrap();
        assert_eq!(n, 1);
        let (status, attempt, nra): (String, i64, Option<String>) = conn
            .query_row(
                "SELECT status, attempt, next_retry_at FROM queue WHERE item_key='mem-dead'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(status, "pending");
        assert_eq!(attempt, 0);
        assert!(nra.is_none());
    }

    #[test]
    fn skipped_item_keys_excludes_only_skipped_for_operation() {
        let (_tmp, conn) = open_temp_queue();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-vetoed', 'memory', 'skipped', 'BodyEnrich')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-pending', 'memory', 'pending', 'BodyEnrich')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-other-op', 'memory', 'skipped', 'MemoryBindings')",
            [],
        )
        .unwrap();
        let keys = skipped_item_keys(&conn, "BodyEnrich").unwrap();
        assert!(
            keys.contains("mem-vetoed"),
            "vetoed BodyEnrich item must be excluded from scan"
        );
        assert!(
            !keys.contains("mem-pending"),
            "pending item is still actionable"
        );
        assert!(
            !keys.contains("mem-other-op"),
            "skipped item from another operation must not leak"
        );
        assert_eq!(keys.len(), 1);
    }

    #[test]
    fn cascade_cleanup_delete_targets_memory_id_and_name() {
        let (_tmp, conn) = open_temp_queue();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status, memory_id) VALUES ('by-id', 'memory', 'done', 42)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO queue (item_key, item_type, status) VALUES ('by-name', 'memory', 'pending')",
            [],
        )
        .unwrap();
        let removed = conn
            .execute(
                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
                rusqlite::params![42_i64, "by-name"],
            )
            .unwrap();
        assert_eq!(removed, 2);
        let remaining: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(remaining, 0);
    }

    #[test]
    fn item_type_for_maps_entity_and_memory() {
        assert_eq!(
            item_type_for(&EnrichOperation::EntityDescriptions),
            "entity"
        );
        assert_eq!(item_type_for(&EnrichOperation::MemoryBindings), "memory");
        assert_eq!(item_type_for(&EnrichOperation::AugmentBindings), "memory");
        assert_eq!(item_type_for(&EnrichOperation::BodyExtract), "memory");
    }

    #[test]
    fn prune_dead_orphans_removes_only_orphan_memory_rows() {
        let main = open_test_db();
        main.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'alive', 'b')",
            [],
        )
        .unwrap();
        let (_tmp, queue) = open_temp_queue();
        // Dead memory row whose memory no longer exists: must be pruned.
        queue
            .execute(
                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
                 VALUES ('gone', 'memory', 'dead', 'MemoryBindings', 'permanent')",
                [],
            )
            .unwrap();
        // Dead memory row whose memory still exists: must be kept.
        queue
            .execute(
                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
                 VALUES ('alive', 'memory', 'dead', 'MemoryBindings', 'permanent')",
                [],
            )
            .unwrap();
        // Dead entity row: out of scope for the memory prune, must be kept.
        queue
            .execute(
                "INSERT INTO queue (item_key, item_type, status, operation) \
                 VALUES ('some-entity', 'entity', 'dead', 'EntityDescriptions')",
                [],
            )
            .unwrap();

        let pruned = prune_dead_orphans(&queue, &main, "MemoryBindings", "global").unwrap();
        assert_eq!(pruned, 1, "only the orphan memory row is pruned");

        let remaining: Vec<String> = {
            let mut stmt = queue
                .prepare("SELECT item_key FROM queue ORDER BY item_key")
                .unwrap();
            let keys = stmt
                .query_map([], |r| r.get::<_, String>(0))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            keys
        };
        assert_eq!(remaining, vec!["alive", "some-entity"]);
    }
}