1use std::collections::{HashMap, HashSet};
4
5use crate::models::{
6 memory_job_status, AgentNamespaceRow, ClaimedMemoryJob, EnqueueJobParams, MemoryEvidenceRow,
7 MemoryJobRow, MemoryLineageEntry, MemoryRow, ProcessedFileRow, SessionDigestRow,
8 SystemMetricRow,
9};
10use crate::{db_error, Result};
11use chrono::{DateTime, Utc};
12use nexus_core::{
13 AgentNamespace, CognitiveLevel, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey,
14};
15use sqlx::SqlitePool;
16
// Local shorthand for the memory category enum from `nexus_core`.
type Category = MemoryCategory;
19
/// Inputs for inserting a single memory row (see `MemoryRepository::store`).
pub struct StoreMemoryParams<'a> {
    /// Owning namespace (FK into the namespaces table).
    pub namespace_id: i64,
    /// Memory body text; also used for case/whitespace-insensitive duplicate lookup.
    pub content: &'a str,
    /// Memory category, stored via its `to_string()` form.
    pub category: &'a Category,
    /// Optional lane type, stored via its `to_string()` form when present.
    pub memory_lane_type: Option<&'a MemoryLaneType>,
    /// Free-form tags; serialized to a JSON array on insert.
    pub labels: &'a [String],
    /// Arbitrary JSON metadata; serialized on insert.
    pub metadata: &'a serde_json::Value,
    /// Optional embedding vector (serialized to JSON) and the model that produced it.
    pub embedding: Option<&'a [f32]>,
    pub embedding_model: Option<&'a str>,
}
31
/// Inputs for `MemoryRepository::store_with_lineage`: a memory insert plus the
/// evidence links tying it back to its source memories.
pub struct StoreMemoryWithLineageParams<'a> {
    /// The memory to insert.
    pub store: StoreMemoryParams<'a>,
    /// IDs of the memories this one was derived from.
    pub source_memory_ids: &'a [i64],
    /// Role recorded on each evidence edge (same role for all sources).
    pub evidence_role: &'a str,
}
38
/// Inputs for `MemoryRepository::store_digest` (session_digests upsert).
pub struct StoreDigestParams<'a> {
    pub namespace_id: i64,
    /// Session the digest summarizes.
    pub session_key: &'a str,
    /// Digest flavor, e.g. "short" / "long" (see `latest_digest_for_session` callers).
    pub digest_kind: &'a str,
    /// Memory row that holds the digest text.
    pub memory_id: i64,
    /// Inclusive memory-id range the digest covers, when known.
    pub start_memory_id: Option<i64>,
    pub end_memory_id: Option<i64>,
    /// Token count of the digest; stored as i64.
    pub token_count: usize,
}
49
/// Result of `MemoryRepository::session_digest_rollover`: how much new material
/// has accumulated for a session since its last digest.
pub struct SessionDigestRollover {
    /// MAX(end_memory_id) across existing digests; None when no digest exists yet.
    pub last_digest_end_memory_id: Option<i64>,
    /// Count of digest-worthy memories newer than that id.
    pub new_memory_count: i64,
    /// Rough token estimate for those memories (chars/4 heuristic in SQL).
    pub estimated_new_tokens: i64,
}
55
/// Optional filters for memory listing queries.
pub struct ListMemoryFilters<'a> {
    /// Exact category string match, when set.
    pub category: Option<&'a str>,
    /// Created-at lower/upper bounds, when set.
    pub since: Option<DateTime<Utc>>,
    pub until: Option<DateTime<Utc>>,
    /// LIKE pattern applied to content, when set.
    pub content_like: Option<&'a str>,
    /// When false, raw-activity noise rows are excluded.
    pub include_raw: bool,
    /// Pagination.
    pub limit: i64,
    pub offset: i64,
}
65
/// Inputs for `MemoryRepository::search_working_set`.
pub struct WorkingSetParams<'a> {
    pub namespace_id: i64,
    /// Optional perspective scoping; when None, namespace-wide queries are used.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Hard cap on the deduplicated result size.
    pub max_items: usize,
    /// When false, raw-activity noise rows are excluded from each bucket.
    pub include_raw: bool,
}
79
/// Inputs for semantic-candidate selection queries.
pub struct SemanticCandidateParams<'a> {
    pub namespace_id: i64,
    /// Optional perspective scoping.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Maximum candidates to return.
    pub limit: i64,
    /// When false, raw-activity noise rows are excluded.
    pub include_raw: bool,
}
91
/// One system-metric observation destined for the system_metrics table.
#[derive(Debug, Clone)]
pub struct MetricSample {
    pub metric_name: String,
    pub metric_value: f64,
    /// Dimension labels as a JSON object.
    pub labels: serde_json::Value,
}
99
// Attempts after which fail_job marks a job terminally failed instead of requeueing.
const MAX_JOB_ATTEMPTS: i64 = 5;
// Predicate excluding "raw activity" noise rows (by label or metadata flag).
const RAW_ACTIVITY_FILTER_SQL: &str =
    "labels NOT LIKE '%raw-activity%' AND json_extract(COALESCE(metadata, '{}'), '$.raw_activity') IS NULL";

// NULL-safe metadata expression. The filter constants below embed the literal
// token "METADATA", which query builders replace with this expression
// (see perspective_where_clause / list_by_session_key).
const METADATA: &str = "COALESCE(metadata, '{}')";

// Matches rows whose primary session_key equals the bound value, or whose
// cognitive.session_keys JSON array contains it. Contains TWO `?` placeholders,
// so the session key must be bound twice (see bind_perspective).
const SESSION_KEY_FILTER: &str =
    "(json_extract(METADATA, '$.cognitive.session_key') = ? \
    OR EXISTS (SELECT 1 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]')) WHERE value = ?))";

// Matches rows whose cognitive observer/subject equal the bound perspective.
const PERSPECTIVE_IDENTITY_FILTER: &str = "json_extract(METADATA, '$.cognitive.observer') = ? \
    AND json_extract(METADATA, '$.cognitive.subject') = ?";

// Expression extracting the cognitive level; NULL when the key is absent.
const COGNITIVE_LEVEL_EXPR: &str = "json_extract(METADATA, '$.cognitive.level')";
138
139fn perspective_where_clause(perspective: &PerspectiveKey) -> String {
146 if perspective.session_key.is_some() {
147 format!(
148 "{} AND {}",
149 PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA),
150 SESSION_KEY_FILTER.replace("METADATA", METADATA),
151 )
152 } else {
153 PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA)
154 }
155}
156
157fn bind_perspective(perspective: &PerspectiveKey) -> Vec<&str> {
160 let mut vals = vec![perspective.observer.as_str(), perspective.subject.as_str()];
161 if let Some(ref sk) = perspective.session_key {
162 vals.push(sk.as_str());
163 vals.push(sk.as_str());
164 }
165 vals
166}
167
/// SQLite-backed repository for memories, jobs, digests, and lineage.
pub struct MemoryRepository {
    // Shared connection pool; cloned cheaply by sqlx internally.
    pool: SqlitePool,
}
172
173impl MemoryRepository {
    /// Creates a repository backed by the given pool.
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
177
    /// Borrow the underlying pool (e.g. for ad-hoc queries elsewhere).
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
181
    /// Inserts a new memory row and returns the stored `Memory`.
    ///
    /// If `last_insert_rowid()` comes back 0 the insert is treated as having
    /// been suppressed by a duplicate (NOTE(review): this presumes the table
    /// has a trigger/constraint elsewhere that de-duplicates inserts — confirm
    /// against the schema). In that case the existing row with the same
    /// trimmed, case-insensitive content is located, its labels/metadata are
    /// merged with the new ones, and the merged row is returned. If no
    /// duplicate is found either, the most recent matching-content row is
    /// returned as a best effort.
    pub async fn store(&self, params: StoreMemoryParams<'_>) -> Result<Memory> {
        let labels_json = serde_json::to_string(params.labels)?;
        let metadata_json = serde_json::to_string(params.metadata)?;
        // Embedding is optional; serialize only when present.
        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;

        let result = sqlx::query(
            r#"
            INSERT INTO memories (
                namespace_id, content, category, memory_lane_type, labels, metadata,
                content_embedding, embedding_model, created_at, is_active, access_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.content)
        .bind(params.category.to_string())
        .bind(params.memory_lane_type.map(|t| t.to_string()))
        .bind(&labels_json)
        .bind(&metadata_json)
        .bind(&embedding_json)
        .bind(params.embedding_model)
        .bind(Utc::now())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        let id = result.last_insert_rowid();

        // rowid 0 => no row was actually inserted; recover via duplicate merge.
        if id == 0 {
            let row: Option<MemoryRow> = sqlx::query_as(
                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
            )
            .bind(params.namespace_id)
            .bind(params.content)
            .fetch_optional(&self.pool)
            .await
            .map_err(db_error)?;

            if let Some(existing) = row {
                // Fold the new labels/metadata into the surviving row.
                self.merge_duplicate_memory_context(existing.id, params)
                    .await?;
                return self.get_by_id(existing.id).await?.ok_or_else(|| {
                    nexus_core::NexusError::Storage(format!(
                        "Duplicate merged row {} could not be reloaded",
                        existing.id
                    ))
                });
            }

            // Unexpected: id 0 with no duplicate found. Log and fall back to a
            // content lookup rather than failing the caller.
            tracing::warn!(
                namespace_id = params.namespace_id,
                content_length = params.content.len(),
                "Insert returned id 0 but no matching duplicate found - treating as successful insert"
            );

            return self
                .get_by_content(params.namespace_id, params.content)
                .await;
        }

        self.get_by_id(id).await?.ok_or_else(|| {
            nexus_core::NexusError::Storage(format!("Failed to retrieve memory with id {}", id))
        })
    }
256
257 async fn merge_duplicate_memory_context(
258 &self,
259 existing_id: i64,
260 params: StoreMemoryParams<'_>,
261 ) -> Result<()> {
262 let current = self.get_by_id(existing_id).await?.ok_or_else(|| {
263 nexus_core::NexusError::Storage(format!(
264 "Failed to load duplicate-merged memory {}",
265 existing_id
266 ))
267 })?;
268
269 let merged_labels = merge_labels(¤t.labels, params.labels);
270 let merged_metadata = merge_duplicate_metadata(¤t.metadata, params.metadata);
271 let labels_json = serde_json::to_string(&merged_labels)?;
272 let metadata_json = serde_json::to_string(&merged_metadata)?;
273
274 sqlx::query(
275 r#"
276 UPDATE memories
277 SET labels = ?, metadata = ?, updated_at = ?
278 WHERE id = ?
279 "#,
280 )
281 .bind(&labels_json)
282 .bind(&metadata_json)
283 .bind(Utc::now())
284 .bind(existing_id)
285 .execute(&self.pool)
286 .await
287 .map_err(db_error)?;
288
289 Ok(())
290 }
291
292 pub async fn store_with_lineage(
294 &self,
295 params: StoreMemoryWithLineageParams<'_>,
296 ) -> Result<Memory> {
297 let mut tx = self.pool.begin().await.map_err(db_error)?;
298 let memory_id = insert_memory_tx(&mut tx, ¶ms.store).await?;
299 for &source_id in params.source_memory_ids {
300 insert_evidence_tx(&mut tx, memory_id, source_id, params.evidence_role).await?;
301 }
302 tx.commit().await.map_err(db_error)?;
303
304 self.get_by_id(memory_id).await?.ok_or_else(|| {
305 nexus_core::NexusError::Storage(format!(
306 "Failed to retrieve memory with id {} after lineage store",
307 memory_id
308 ))
309 })
310 }
311
    /// Enqueues a background job in `pending` state and returns its id.
    ///
    /// `perspective` and `payload` are serialized to JSON columns; priority
    /// ordering is applied at claim time (higher first).
    pub async fn enqueue_job(&self, params: EnqueueJobParams<'_>) -> Result<i64> {
        let perspective_json = params.perspective.map(serde_json::to_string).transpose()?;
        let payload_json = serde_json::to_string(params.payload)?;

        let id: i64 = sqlx::query_scalar(
            r#"
            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
            VALUES (?, ?, 'pending', ?, ?, ?, datetime('now'), datetime('now'))
            RETURNING id
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.job_type)
        .bind(params.priority)
        .bind(&perspective_json)
        .bind(&payload_json)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(id)
    }
335
    /// Atomically claims up to `limit` jobs of `job_type` for this worker.
    ///
    /// Eligible jobs are `pending`, or `running` with an expired lease
    /// (crash recovery). Claiming sets a fresh claim token and lease expiry,
    /// and increments `attempts`. Jobs whose stored perspective/payload JSON
    /// fails to parse are permanently failed and skipped rather than returned.
    pub async fn claim_jobs(
        &self,
        namespace_id: i64,
        job_type: &str,
        lease_owner: &str,
        lease_ttl_secs: u64,
        limit: i64,
    ) -> Result<Vec<ClaimedMemoryJob>> {
        // One token per claim batch; used later to prove lease ownership.
        let claim_token = new_claim_token(lease_owner);
        // Placeholders bind in statement-text order: the five in the CTE
        // first, then the four in the UPDATE's SET clause.
        let rows: Vec<MemoryJobRow> = sqlx::query_as::<_, MemoryJobRow>(
            r#"
            WITH candidates AS (
                SELECT id
                FROM memory_jobs
                WHERE namespace_id = ? AND job_type = ? AND (
                    status = ?
                    OR (status = ? AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))
                )
                ORDER BY priority DESC, created_at ASC
                LIMIT ?
            )
            UPDATE memory_jobs
            SET status = ?,
                lease_owner = ?,
                claim_token = ?,
                lease_expires_at = datetime('now', '+' || ? || ' seconds'),
                attempts = attempts + 1,
                updated_at = datetime('now')
            WHERE id IN (SELECT id FROM candidates)
            RETURNING *
            "#,
        )
        .bind(namespace_id)
        .bind(job_type)
        .bind(memory_job_status::PENDING)
        .bind(memory_job_status::RUNNING)
        .bind(limit)
        .bind(memory_job_status::RUNNING)
        .bind(lease_owner)
        .bind(&claim_token)
        .bind(lease_ttl_secs as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        // RETURNING does not guarantee row order, so re-apply the claim order
        // (priority DESC, created_at ASC) client-side.
        let mut rows = rows;
        rows.sort_by(|left, right| {
            right
                .priority
                .cmp(&left.priority)
                .then_with(|| left.created_at.cmp(&right.created_at))
        });

        let mut claimed = Vec::with_capacity(rows.len());
        for row in rows {
            // Corrupted JSON is unrecoverable for this job: fail it terminally
            // (best effort — the error from the fail update is ignored) and skip.
            let perspective = match row.perspective_json.as_deref() {
                Some(s) => match serde_json::from_str(s) {
                    Ok(p) => Some(p),
                    Err(e) => {
                        tracing::warn!(
                            job_id = row.id,
                            error = %e,
                            "corrupted perspective JSON, permanently failing job"
                        );
                        let _ = self
                            .permanently_fail_job(
                                row.id,
                                &row.lease_owner,
                                &row.claim_token,
                                &format!("corrupted perspective JSON: {e}"),
                            )
                            .await;
                        continue;
                    }
                },
                None => None,
            };
            let payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
                Ok(p) => p,
                Err(e) => {
                    tracing::warn!(
                        job_id = row.id,
                        error = %e,
                        "corrupted payload JSON, permanently failing job"
                    );
                    let _ = self
                        .permanently_fail_job(
                            row.id,
                            &row.lease_owner,
                            &row.claim_token,
                            &format!("corrupted payload JSON: {e}"),
                        )
                        .await;
                    continue;
                }
            };
            claimed.push(ClaimedMemoryJob {
                row,
                perspective,
                payload,
            });
        }

        Ok(claimed)
    }
448
449 pub async fn complete_job(&self, job: &ClaimedMemoryJob) -> Result<()> {
451 let result = sqlx::query(
452 r#"
453 UPDATE memory_jobs
454 SET status = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
455 WHERE id = ?
456 AND lease_owner = ?
457 AND claim_token = ?
458 "#,
459 )
460 .bind(memory_job_status::COMPLETED)
461 .bind(job.row.id)
462 .bind(job.row.lease_owner.as_deref())
463 .bind(job.row.claim_token.as_deref())
464 .execute(&self.pool)
465 .await
466 .map_err(db_error)?;
467
468 if result.rows_affected() == 0 {
469 return Err(nexus_core::NexusError::Storage(format!(
470 "Memory job {} completion lost lease ownership",
471 job.row.id
472 )));
473 }
474
475 Ok(())
476 }
477
478 pub async fn fail_job(&self, job: &ClaimedMemoryJob, error: &str) -> Result<()> {
481 let row: Option<MemoryJobRow> = sqlx::query_as("SELECT * FROM memory_jobs WHERE id = ?")
482 .bind(job.row.id)
483 .fetch_optional(&self.pool)
484 .await
485 .map_err(db_error)?;
486
487 let row = row.ok_or_else(|| {
488 nexus_core::NexusError::Storage(format!("Memory job {} not found", job.row.id))
489 })?;
490
491 let lease_matches =
492 row.lease_owner == job.row.lease_owner && row.claim_token == job.row.claim_token;
493 if !lease_matches {
494 return Err(nexus_core::NexusError::Storage(format!(
495 "Memory job {} failure lost lease ownership",
496 job.row.id
497 )));
498 }
499
500 if row.attempts >= MAX_JOB_ATTEMPTS {
501 let result = sqlx::query(
503 r#"
504 UPDATE memory_jobs
505 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
506 WHERE id = ? AND lease_owner = ? AND claim_token = ?
507 "#,
508 )
509 .bind(memory_job_status::FAILED)
510 .bind(error)
511 .bind(job.row.id)
512 .bind(job.row.lease_owner.as_deref())
513 .bind(job.row.claim_token.as_deref())
514 .execute(&self.pool)
515 .await
516 .map_err(db_error)?;
517 if result.rows_affected() == 0 {
518 return Err(nexus_core::NexusError::Storage(format!(
519 "Memory job {} failure lost lease ownership",
520 job.row.id
521 )));
522 }
523 } else {
524 let result = sqlx::query(
526 r#"
527 UPDATE memory_jobs
528 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
529 WHERE id = ? AND lease_owner = ? AND claim_token = ?
530 "#,
531 )
532 .bind(memory_job_status::PENDING)
533 .bind(error)
534 .bind(job.row.id)
535 .bind(job.row.lease_owner.as_deref())
536 .bind(job.row.claim_token.as_deref())
537 .execute(&self.pool)
538 .await
539 .map_err(db_error)?;
540 if result.rows_affected() == 0 {
541 return Err(nexus_core::NexusError::Storage(format!(
542 "Memory job {} failure lost lease ownership",
543 job.row.id
544 )));
545 }
546 }
547
548 Ok(())
549 }
550
    /// Marks a job terminally `failed` (used for unrecoverable corruption),
    /// guarded by the same lease check as complete_job/fail_job.
    ///
    /// Unlike `fail_job`, a lost lease is not an error here: rows_affected is
    /// not checked because callers invoke this best-effort from claim_jobs.
    async fn permanently_fail_job(
        &self,
        job_id: i64,
        lease_owner: &Option<String>,
        claim_token: &Option<String>,
        error: &str,
    ) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE memory_jobs
            SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL,
                lease_expires_at = NULL, updated_at = datetime('now')
            WHERE id = ? AND lease_owner = ? AND claim_token = ?
            "#,
        )
        .bind(memory_job_status::FAILED)
        .bind(error)
        .bind(job_id)
        .bind(lease_owner.as_deref())
        .bind(claim_token.as_deref())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;
        Ok(())
    }
579
580 pub async fn get_most_reinforced_by_namespace(
581 &self,
582 namespace_id: i64,
583 limit: i64,
584 include_raw: bool,
585 ) -> Result<Vec<Memory>> {
586 let noise_sql = if include_raw {
587 String::new()
588 } else {
589 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
590 };
591 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
592 r#"
593 SELECT * FROM memories
594 WHERE namespace_id = ?
595 AND is_active = 1
596 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
597 {noise_sql}
598 ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_reinforced'), 0) DESC,
599 created_at DESC
600 LIMIT ?
601 "#
602 ))
603 .bind(namespace_id)
604 .bind(CognitiveLevel::Derived.as_str())
605 .bind(limit)
606 .fetch_all(&self.pool)
607 .await
608 .map_err(db_error)?;
609
610 rows.into_iter()
611 .map(|row| self.row_to_memory(row))
612 .collect()
613 }
614
615 pub async fn get_contradictions_by_namespace(
616 &self,
617 namespace_id: i64,
618 limit: i64,
619 include_raw: bool,
620 ) -> Result<Vec<Memory>> {
621 let noise_sql = if include_raw {
622 String::new()
623 } else {
624 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
625 };
626 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
627 r#"
628 SELECT * FROM memories
629 WHERE namespace_id = ?
630 AND is_active = 1
631 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
632 {noise_sql}
633 ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_contradicted'), 0) DESC,
634 created_at DESC
635 LIMIT ?
636 "#
637 ))
638 .bind(namespace_id)
639 .bind(CognitiveLevel::Contradiction.as_str())
640 .bind(limit)
641 .fetch_all(&self.pool)
642 .await
643 .map_err(db_error)?;
644
645 rows.into_iter()
646 .map(|row| self.row_to_memory(row))
647 .collect()
648 }
649
650 pub async fn list_by_session_key(
651 &self,
652 namespace_id: i64,
653 session_key: &str,
654 limit: i64,
655 include_raw: bool,
656 ) -> Result<Vec<Memory>> {
657 let noise_sql = if include_raw {
658 String::new()
659 } else {
660 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
661 };
662 let session_filter = SESSION_KEY_FILTER.replace("METADATA", METADATA);
663 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
664 r#"
665 SELECT * FROM memories
666 WHERE namespace_id = ?
667 AND is_active = 1
668 AND {session_filter}
669 {noise_sql}
670 ORDER BY created_at DESC
671 LIMIT ?
672 "#
673 ))
674 .bind(namespace_id)
675 .bind(session_key)
676 .bind(session_key)
677 .bind(limit)
678 .fetch_all(&self.pool)
679 .await
680 .map_err(db_error)?;
681
682 rows.into_iter()
683 .map(|row| self.row_to_memory(row))
684 .collect()
685 }
686
    /// Upserts a session digest and returns its row id.
    ///
    /// The conflict target is (namespace_id, session_key, digest_kind,
    /// end_memory_id): re-digesting the same range replaces the memory_id and
    /// token_count instead of inserting a duplicate.
    pub async fn store_digest(&self, params: StoreDigestParams<'_>) -> Result<i64> {
        let id: i64 = sqlx::query_scalar(
            r#"
            INSERT INTO session_digests (namespace_id, session_key, digest_kind, memory_id, start_memory_id, end_memory_id, token_count, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
            ON CONFLICT(namespace_id, session_key, digest_kind, end_memory_id) DO UPDATE SET
                memory_id = excluded.memory_id,
                token_count = excluded.token_count
            RETURNING id
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.session_key)
        .bind(params.digest_kind)
        .bind(params.memory_id)
        .bind(params.start_memory_id)
        .bind(params.end_memory_id)
        .bind(params.token_count as i64)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(id)
    }
716
717 pub async fn latest_digest_for_session(
720 &self,
721 namespace_id: i64,
722 session_key: &str,
723 digest_kind: &str,
724 ) -> Result<Option<Memory>> {
725 let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
726 "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
727 )
728 .bind(namespace_id)
729 .bind(session_key)
730 .bind(digest_kind)
731 .fetch_optional(&self.pool)
732 .await
733 .map_err(db_error)?;
734
735 match digest {
736 Some(d) => self.get_by_id(d.memory_id).await,
737 None => Ok(None),
738 }
739 }
740
741 pub async fn latest_digest_for_namespace(
745 &self,
746 namespace_id: i64,
747 digest_kind: &str,
748 ) -> Result<Option<Memory>> {
749 let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
750 "SELECT * FROM session_digests WHERE namespace_id = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
751 )
752 .bind(namespace_id)
753 .bind(digest_kind)
754 .fetch_optional(&self.pool)
755 .await
756 .map_err(db_error)?;
757
758 match digest {
759 Some(d) => self.get_by_id(d.memory_id).await,
760 None => Ok(None),
761 }
762 }
763
    /// Measures how much new, digest-worthy material a session has accumulated
    /// since its last stored digest.
    ///
    /// Only memories newer than MAX(end_memory_id) of existing digests count,
    /// and only when they belong to the session, are not raw-activity noise,
    /// and are not themselves raw/summary rows. Token estimate is a cheap
    /// ceil(chars / 4) heuristic computed in SQL.
    pub async fn session_digest_rollover(
        &self,
        namespace_id: i64,
        session_key: &str,
    ) -> Result<SessionDigestRollover> {
        // MAX() over zero rows yields NULL -> None when no digest exists yet.
        let last_digest_end_memory_id: Option<i64> = sqlx::query_scalar(
            "SELECT MAX(end_memory_id) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
        )
        .bind(namespace_id)
        .bind(session_key)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        let (new_memory_count, estimated_new_tokens): (i64, i64) = sqlx::query_as(&format!(
            r#"
            SELECT
                COUNT(*) as new_memory_count,
                COALESCE(SUM((LENGTH(content) + 3) / 4), 0) as estimated_new_tokens
            FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND id > ?
              AND (
                  json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
                  OR EXISTS (
                      SELECT 1
                      FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
                      WHERE value = ?
                  )
              )
              AND {RAW_ACTIVITY_FILTER_SQL}
              AND COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level'), '') NOT IN ('raw', 'summary_short', 'summary_long')
            "#,
        ))
        .bind(namespace_id)
        // No digest yet -> compare against 0, i.e. consider every memory new.
        .bind(last_digest_end_memory_id.unwrap_or(0))
        .bind(session_key)
        .bind(session_key)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(SessionDigestRollover {
            last_digest_end_memory_id,
            new_memory_count,
            estimated_new_tokens,
        })
    }
815
    /// Recent memories for a perspective, excluding raw-activity noise.
    /// Convenience wrapper over `get_recent_by_perspective_opts` with
    /// `include_raw = false`.
    pub async fn get_recent_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_recent_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
829
830 pub async fn get_recent_by_perspective_opts(
833 &self,
834 namespace_id: i64,
835 perspective: &PerspectiveKey,
836 limit: i64,
837 include_raw: bool,
838 ) -> Result<Vec<Memory>> {
839 let noise_sql = if include_raw {
840 String::new()
841 } else {
842 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
843 };
844
845 let perspective_sql = perspective_where_clause(perspective);
846 let sql = format!(
847 r#"
848 SELECT * FROM memories
849 WHERE namespace_id = ?
850 AND is_active = 1
851 AND {perspective_sql}
852 {noise_sql}
853 ORDER BY created_at DESC
854 LIMIT ?
855 "#
856 );
857 let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
858
859 for val in bind_perspective(perspective) {
860 query = query.bind(val);
861 }
862
863 let rows = query
864 .bind(limit)
865 .fetch_all(&self.pool)
866 .await
867 .map_err(db_error)?;
868
869 rows.into_iter()
870 .map(|row| self.row_to_memory(row))
871 .collect()
872 }
873
874 pub async fn get_by_cognitive_level(
876 &self,
877 namespace_id: i64,
878 level: CognitiveLevel,
879 limit: i64,
880 ) -> Result<Vec<Memory>> {
881 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
882 r#"
883 SELECT * FROM memories
884 WHERE namespace_id = ?
885 AND is_active = 1
886 AND {} = ?
887 ORDER BY created_at DESC
888 LIMIT ?
889 "#,
890 COGNITIVE_LEVEL_EXPR,
891 ))
892 .bind(namespace_id)
893 .bind(level.as_str())
894 .bind(limit)
895 .fetch_all(&self.pool)
896 .await
897 .map_err(db_error)?;
898
899 rows.into_iter()
900 .map(|row| self.row_to_memory(row))
901 .collect()
902 }
903
904 pub async fn get_by_cognitive_level_with_perspective(
910 &self,
911 namespace_id: i64,
912 level: CognitiveLevel,
913 perspective: &PerspectiveKey,
914 limit: i64,
915 ) -> Result<Vec<Memory>> {
916 let perspective_sql = perspective_where_clause(perspective);
917 let sql = format!(
918 r#"
919 SELECT * FROM memories
920 WHERE namespace_id = ?
921 AND is_active = 1
922 AND {COGNITIVE_LEVEL_EXPR} = ?
923 AND {perspective_sql}
924 ORDER BY created_at DESC
925 LIMIT ?
926 "#
927 );
928 let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
929 .bind(namespace_id)
930 .bind(level.as_str());
931
932 for val in bind_perspective(perspective) {
933 query = query.bind(val);
934 }
935
936 let rows = query
937 .bind(limit)
938 .fetch_all(&self.pool)
939 .await
940 .map_err(db_error)?;
941
942 rows.into_iter()
943 .map(|row| self.row_to_memory(row))
944 .collect()
945 }
946
    /// Most-reinforced memories for a perspective, excluding raw-activity
    /// noise. Convenience wrapper with `include_raw = false`.
    pub async fn get_most_reinforced_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_most_reinforced_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
959
    /// Memories for a perspective ordered by times_reinforced (then recency),
    /// excluding contradiction-level rows.
    ///
    /// NOTE(review): `{COGNITIVE_LEVEL_EXPR} != ?` evaluates to NULL (not true)
    /// for rows without a cognitive.level key, so such rows are silently
    /// excluded here — confirm that is intended.
    pub async fn get_most_reinforced_by_perspective_opts(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let perspective_sql = perspective_where_clause(perspective);
        let sql = format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {perspective_sql}
              AND {COGNITIVE_LEVEL_EXPR} != ?
              {noise_sql}
            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_reinforced'), 0) DESC,
                     created_at DESC
            LIMIT ?
            "#
        );
        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        // Perspective values bind after namespace_id, before the level/limit.
        for val in bind_perspective(perspective) {
            query = query.bind(val);
        }

        let rows = query
            .bind(CognitiveLevel::Contradiction.as_str())
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1006
    /// Contradiction memories for a perspective, excluding raw-activity
    /// noise. Convenience wrapper with `include_raw = false`.
    pub async fn get_contradictions_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_contradictions_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
1019
    /// Contradiction-level memories for a perspective, ordered by
    /// times_contradicted (then recency).
    pub async fn get_contradictions_by_perspective_opts(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let perspective_sql = perspective_where_clause(perspective);
        let sql = format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {perspective_sql}
              AND {COGNITIVE_LEVEL_EXPR} = ?
              {noise_sql}
            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_contradicted'), 0) DESC,
                     created_at DESC
            LIMIT ?
            "#
        );
        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        // Perspective values bind after namespace_id, before the level/limit.
        for val in bind_perspective(perspective) {
            query = query.bind(val);
        }

        let rows = query
            .bind(CognitiveLevel::Contradiction.as_str())
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1066
1067 pub async fn search_working_set(&self, params: WorkingSetParams<'_>) -> Result<Vec<Memory>> {
1086 let WorkingSetParams {
1087 namespace_id,
1088 perspective,
1089 max_items,
1090 include_raw,
1091 } = params;
1092
1093 let per_bucket = (max_items as i64).max(8);
1096
1097 let session = perspective
1099 .and_then(|p| p.session_key.as_deref())
1100 .unwrap_or("");
1101 let mut digests = Vec::new();
1102 if let Some(short) = self
1103 .latest_digest_for_session(namespace_id, session, "short")
1104 .await?
1105 {
1106 digests.push(short);
1107 }
1108 if let Some(long) = self
1109 .latest_digest_for_session(namespace_id, session, "long")
1110 .await?
1111 {
1112 digests.push(long);
1113 }
1114
1115 let reinforced = if let Some(persp) = perspective {
1117 self.get_most_reinforced_by_perspective_opts(
1118 namespace_id,
1119 persp,
1120 per_bucket,
1121 include_raw,
1122 )
1123 .await?
1124 } else {
1125 self.get_most_reinforced_by_namespace(namespace_id, per_bucket, include_raw)
1126 .await?
1127 };
1128
1129 let recent = if let Some(persp) = perspective {
1130 self.get_recent_by_perspective_opts(namespace_id, persp, per_bucket, include_raw)
1131 .await?
1132 } else {
1133 let sql = if include_raw {
1134 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ?"
1135 .to_string()
1136 } else {
1137 format!(
1138 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 AND {} ORDER BY created_at DESC LIMIT ?",
1139 RAW_ACTIVITY_FILTER_SQL,
1140 )
1141 };
1142 let rows: Vec<MemoryRow> = sqlx::query_as(&sql)
1143 .bind(namespace_id)
1144 .bind(per_bucket)
1145 .fetch_all(&self.pool)
1146 .await
1147 .map_err(db_error)?;
1148 rows.into_iter()
1149 .map(|r| self.row_to_memory(r))
1150 .collect::<Result<Vec<_>>>()?
1151 };
1152
1153 let contradictions = if let Some(persp) = perspective {
1154 self.get_contradictions_by_perspective_opts(
1155 namespace_id,
1156 persp,
1157 per_bucket,
1158 include_raw,
1159 )
1160 .await?
1161 } else {
1162 self.get_contradictions_by_namespace(namespace_id, per_bucket, include_raw)
1163 .await?
1164 };
1165
1166 let mut seen = std::collections::HashSet::new();
1168 let mut result = Vec::with_capacity(max_items);
1169
1170 for memory in digests
1171 .into_iter()
1172 .chain(reinforced)
1173 .chain(recent)
1174 .chain(contradictions)
1175 {
1176 if seen.insert(memory.id) {
1177 result.push(memory);
1178 if result.len() >= max_items {
1179 break;
1180 }
1181 }
1182 }
1183
1184 Ok(result)
1185 }
1186
1187 pub async fn load_lineage(&self, memory_id: i64) -> Result<Vec<MemoryLineageEntry>> {
1189 let rows: Vec<MemoryEvidenceRow> = sqlx::query_as::<_, MemoryEvidenceRow>(
1190 r#"
1191 SELECT * FROM memory_evidence
1192 WHERE derived_memory_id = ? OR source_memory_id = ?
1193 ORDER BY created_at ASC
1194 "#,
1195 )
1196 .bind(memory_id)
1197 .bind(memory_id)
1198 .fetch_all(&self.pool)
1199 .await
1200 .map_err(db_error)?;
1201
1202 Ok(rows
1203 .into_iter()
1204 .map(|r| MemoryLineageEntry {
1205 derived_memory_id: r.derived_memory_id,
1206 source_memory_id: r.source_memory_id,
1207 evidence_role: r.evidence_role,
1208 })
1209 .collect())
1210 }
1211
    /// Loads evidence edges for many memories at once, grouped by memory id.
    ///
    /// An edge appears under every requested id it touches, so an edge between
    /// two requested memories is present in both groups (cloned for the first).
    pub async fn load_lineage_batch(
        &self,
        memory_ids: &[i64],
    ) -> Result<HashMap<i64, Vec<MemoryLineageEntry>>> {
        if memory_ids.is_empty() {
            return Ok(HashMap::new());
        }

        // Same placeholder list is used twice (derived IN ... OR source IN ...),
        // hence the ids are bound twice below, in the same order.
        let placeholders = memory_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let sql = format!(
            r#"
            SELECT * FROM memory_evidence
            WHERE derived_memory_id IN ({placeholders})
               OR source_memory_id IN ({placeholders})
            ORDER BY created_at ASC
            "#
        );

        let mut query = sqlx::query_as::<_, MemoryEvidenceRow>(&sql);
        for id in memory_ids {
            query = query.bind(*id);
        }
        for id in memory_ids {
            query = query.bind(*id);
        }

        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
        let id_set: HashSet<i64> = memory_ids.iter().copied().collect();
        let mut grouped: HashMap<i64, Vec<MemoryLineageEntry>> = HashMap::new();

        for row in rows {
            let entry = MemoryLineageEntry {
                derived_memory_id: row.derived_memory_id,
                source_memory_id: row.source_memory_id,
                evidence_role: row.evidence_role,
            };

            if id_set.contains(&entry.derived_memory_id) {
                grouped
                    .entry(entry.derived_memory_id)
                    .or_default()
                    .push(entry.clone());
            }
            if id_set.contains(&entry.source_memory_id) {
                grouped
                    .entry(entry.source_memory_id)
                    .or_default()
                    .push(entry);
            }
        }

        Ok(grouped)
    }
1266
1267 pub async fn get_by_id(&self, id: i64) -> Result<Option<Memory>> {
1269 let row: Option<MemoryRow> = sqlx::query_as("SELECT * FROM memories WHERE id = ?")
1270 .bind(id)
1271 .fetch_optional(&self.pool)
1272 .await
1273 .map_err(db_error)?;
1274
1275 row.map(|r| self.row_to_memory(r)).transpose()
1276 }
1277
1278 pub async fn get_by_content(&self, namespace_id: i64, content: &str) -> Result<Memory> {
1280 let row: Option<MemoryRow> = sqlx::query_as(
1281 "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
1282 )
1283 .bind(namespace_id)
1284 .bind(content)
1285 .fetch_optional(&self.pool)
1286 .await
1287 .map_err(db_error)?;
1288
1289 row.map(|r| self.row_to_memory(r))
1290 .transpose()?
1291 .ok_or_else(|| {
1292 nexus_core::NexusError::Storage(
1293 "No memories found in namespace after insert".to_string(),
1294 )
1295 })
1296 }
1297
1298 pub async fn search_by_namespace(
1300 &self,
1301 namespace_id: i64,
1302 limit: usize,
1303 offset: usize,
1304 ) -> Result<Vec<Memory>> {
1305 let rows: Vec<MemoryRow> = sqlx::query_as(
1306 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ?"
1307 )
1308 .bind(namespace_id)
1309 .bind(limit as i64)
1310 .bind(offset as i64)
1311 .fetch_all(&self.pool)
1312 .await
1313 .map_err(db_error)?;
1314
1315 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1316 }
1317
1318 pub async fn count_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1320 let count: (i64,) = sqlx::query_as(
1321 "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_active = 1",
1322 )
1323 .bind(namespace_id)
1324 .fetch_one(&self.pool)
1325 .await
1326 .map_err(db_error)?;
1327
1328 Ok(count.0)
1329 }
1330
1331 pub async fn count_all_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1333 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM memories WHERE namespace_id = ?")
1334 .bind(namespace_id)
1335 .fetch_one(&self.pool)
1336 .await
1337 .map_err(db_error)?;
1338
1339 Ok(count.0)
1340 }
1341
1342 pub async fn count_archived_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1344 let count: (i64,) = sqlx::query_as(
1345 "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_archived = 1",
1346 )
1347 .bind(namespace_id)
1348 .fetch_one(&self.pool)
1349 .await
1350 .map_err(db_error)?;
1351
1352 Ok(count.0)
1353 }
1354
1355 pub async fn delete(&self, id: i64) -> Result<bool> {
1357 let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1358 .bind(id)
1359 .execute(&self.pool)
1360 .await
1361 .map_err(db_error)?;
1362
1363 Ok(result.rows_affected() > 0)
1364 }
1365
1366 pub async fn touch(&self, id: i64) -> Result<()> {
1368 sqlx::query(
1369 "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
1370 )
1371 .bind(Utc::now())
1372 .bind(id)
1373 .execute(&self.pool)
1374 .await
1375 .map_err(db_error)?;
1376
1377 Ok(())
1378 }
1379
    /// Fetch up to `limit` active memories that have not yet been through
    /// consolidation, oldest first so the backlog drains in insertion order.
    ///
    /// "Unconsolidated" means `$.agent.consolidated` is absent from the
    /// metadata JSON; a NULL metadata column is treated as absent too.
    pub async fn get_unconsolidated(
        &self,
        namespace_id: i64,
        limit: i32,
    ) -> Result<Vec<MemoryRow>> {
        let rows = sqlx::query_as::<_, MemoryRow>(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
            AND is_active = 1
            AND (metadata IS NULL OR json_extract(metadata, '$.agent.consolidated') IS NULL)
            ORDER BY created_at ASC
            LIMIT ?
            "#,
        )
        .bind(namespace_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(rows)
    }
1404
    /// Stamp a memory as consolidated: sets `$.agent.consolidated = true` and
    /// `$.agent.consolidated_at` in the metadata JSON and bumps `updated_at`.
    ///
    /// NULL metadata is coalesced to '{}' first so `json_set` can create the
    /// nested `agent` object on demand.
    pub async fn mark_consolidated(&self, id: i64) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE memories
            SET metadata = json_set(
                COALESCE(metadata, '{}'),
                '$.agent.consolidated',
                true,
                '$.agent.consolidated_at',
                datetime('now')
            ),
            updated_at = datetime('now')
            WHERE id = ?
            "#,
        )
        .bind(id)
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(())
    }
1428
1429 pub async fn mark_consolidated_batch(&self, ids: &[i64]) -> Result<()> {
1431 if ids.is_empty() {
1432 return Ok(());
1433 }
1434 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1435 let query = format!(
1436 r#"
1437 UPDATE memories
1438 SET metadata = json_set(
1439 COALESCE(metadata, '{{}}'),
1440 '$.agent.consolidated',
1441 true,
1442 '$.agent.consolidated_at',
1443 datetime('now')
1444 ),
1445 updated_at = datetime('now')
1446 WHERE id IN ({})
1447 "#,
1448 placeholders
1449 );
1450 let mut q = sqlx::query(&query);
1451 for id in ids {
1452 q = q.bind(*id);
1453 }
1454 q.execute(&self.pool).await.map_err(db_error)?;
1455 Ok(())
1456 }
1457
1458 pub async fn search_by_text(
1460 &self,
1461 namespace_id: i64,
1462 query: &str,
1463 limit: i32,
1464 include_raw: bool,
1465 ) -> Result<Vec<MemoryRow>> {
1466 let pattern = format!("%{}%", query);
1467 let raw_clause = if include_raw {
1468 String::new()
1469 } else {
1470 format!("AND {RAW_ACTIVITY_FILTER_SQL}")
1471 };
1472 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
1473 r#"
1474 SELECT * FROM memories
1475 WHERE namespace_id = ?
1476 AND is_active = 1
1477 AND content LIKE ?
1478 {}
1479 ORDER BY updated_at DESC
1480 LIMIT ?
1481 "#,
1482 raw_clause
1483 ))
1484 .bind(namespace_id)
1485 .bind(&pattern)
1486 .bind(limit)
1487 .fetch_all(&self.pool)
1488 .await
1489 .map_err(db_error)?;
1490
1491 Ok(rows)
1492 }
1493
1494 pub async fn search_by_text_memories(
1496 &self,
1497 namespace_id: i64,
1498 query: &str,
1499 limit: i32,
1500 include_raw: bool,
1501 ) -> Result<Vec<Memory>> {
1502 let rows = self
1503 .search_by_text(namespace_id, query, limit, include_raw)
1504 .await?;
1505 rows.into_iter()
1506 .map(|row| self.row_to_memory(row))
1507 .collect()
1508 }
1509
    /// Fetch memories eligible as semantic-search candidates: active rows
    /// that already carry a content embedding, most recently updated first.
    ///
    /// Filtering depends on the params:
    /// - With a `perspective`, rows must match the observer/subject recorded
    ///   under `$.cognitive` in metadata. If the perspective also carries a
    ///   session key, the row must match it either as the primary
    ///   `session_key` or inside the merged `session_keys` array.
    /// - Unless `include_raw` is set, raw-activity rows are excluded via
    ///   `RAW_ACTIVITY_FILTER_SQL`.
    ///
    /// NOTE: bind order must track placeholder order in each SQL variant;
    /// the session key is bound twice because it appears twice in the query.
    pub async fn get_semantic_candidates(
        &self,
        params: SemanticCandidateParams<'_>,
    ) -> Result<Vec<Memory>> {
        let SemanticCandidateParams {
            namespace_id,
            perspective,
            limit,
            include_raw,
        } = params;

        // Optional noise filter, spliced into the perspective variants below.
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let rows = if let Some(perspective) = perspective {
            // Two SQL shapes: with or without the session-key predicate.
            let sql = if perspective.session_key.is_some() {
                format!(
                    r#"
                    SELECT * FROM memories
                    WHERE namespace_id = ?
                    AND is_active = 1
                    AND content_embedding IS NOT NULL
                    AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
                    AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
                    AND (
                        json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
                        OR EXISTS (
                            SELECT 1
                            FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
                            WHERE value = ?
                        )
                    )
                    {noise_sql}
                    ORDER BY updated_at DESC, created_at DESC
                    LIMIT ?
                    "#
                )
            } else {
                format!(
                    r#"
                    SELECT * FROM memories
                    WHERE namespace_id = ?
                    AND is_active = 1
                    AND content_embedding IS NOT NULL
                    AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
                    AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
                    {noise_sql}
                    ORDER BY updated_at DESC, created_at DESC
                    LIMIT ?
                    "#
                )
            };

            let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
                .bind(namespace_id)
                .bind(&perspective.observer)
                .bind(&perspective.subject);

            // Bound twice: once for the direct session_key comparison, once
            // for the json_each membership check.
            if let Some(session_key) = &perspective.session_key {
                query = query.bind(session_key);
                query = query.bind(session_key);
            }

            query
                .bind(limit)
                .fetch_all(&self.pool)
                .await
                .map_err(db_error)?
        } else {
            // No perspective: only the embedding/activity/noise constraints.
            let sql = if include_raw {
                r#"
                SELECT * FROM memories
                WHERE namespace_id = ?
                AND is_active = 1
                AND content_embedding IS NOT NULL
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
                "#
                .to_string()
            } else {
                format!(
                    r#"
                    SELECT * FROM memories
                    WHERE namespace_id = ?
                    AND is_active = 1
                    AND content_embedding IS NOT NULL
                    AND {}
                    ORDER BY updated_at DESC, created_at DESC
                    LIMIT ?
                    "#,
                    RAW_ACTIVITY_FILTER_SQL,
                )
            };

            sqlx::query_as::<_, MemoryRow>(&sql)
                .bind(namespace_id)
                .bind(limit)
                .fetch_all(&self.pool)
                .await
                .map_err(db_error)?
        };

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1620
    /// List active memories in a namespace matching the optional filters in
    /// `ListMemoryFilters`, newest first, paged by `limit`/`offset`.
    ///
    /// The WHERE clause uses explicit positional placeholders (`?1`, `?2`,
    /// ...). Indices are allocated in the same order values are bound below
    /// (category, since, until, content_like, then limit/offset), so the two
    /// sections must stay in lockstep when edited.
    pub async fn list_filtered(
        &self,
        namespace_id: i64,
        filters: ListMemoryFilters<'_>,
    ) -> Result<Vec<Memory>> {
        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
        let mut param_idx = 2u32;

        if filters.category.is_some() {
            conditions.push(format!("category = ?{}", param_idx));
            param_idx += 1;
        }
        if filters.since.is_some() {
            conditions.push(format!("created_at >= ?{}", param_idx));
            param_idx += 1;
        }
        if filters.until.is_some() {
            conditions.push(format!("created_at <= ?{}", param_idx));
            param_idx += 1;
        }
        if filters.content_like.is_some() {
            conditions.push(format!("content LIKE ?{}", param_idx));
            param_idx += 1;
        }
        // Raw-activity noise filter carries no bind parameter of its own.
        if !filters.include_raw {
            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
        }

        let sql = format!(
            "SELECT * FROM memories WHERE {} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
            conditions.join(" AND "),
            param_idx,
            param_idx + 1,
        );

        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        // Binds must mirror the placeholder allocation order above.
        if let Some(cat) = filters.category {
            query = query.bind(cat.to_string());
        }
        if let Some(s) = filters.since {
            query = query.bind(s);
        }
        if let Some(u) = filters.until {
            query = query.bind(u);
        }
        if let Some(search) = filters.content_like {
            query = query.bind(format!("%{}%", search));
        }

        let rows: Vec<MemoryRow> = query
            .bind(filters.limit)
            .bind(filters.offset)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
    }
1682
1683 pub async fn list_missing_cognitive_metadata(
1685 &self,
1686 namespace_id: i64,
1687 limit: i64,
1688 offset: i64,
1689 ) -> Result<Vec<Memory>> {
1690 let rows: Vec<MemoryRow> = sqlx::query_as(
1691 r#"
1692 SELECT * FROM memories
1693 WHERE namespace_id = ?
1694 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1695 ORDER BY id ASC
1696 LIMIT ? OFFSET ?
1697 "#,
1698 )
1699 .bind(namespace_id)
1700 .bind(limit)
1701 .bind(offset)
1702 .fetch_all(&self.pool)
1703 .await
1704 .map_err(db_error)?;
1705
1706 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1707 }
1708
1709 pub async fn count_missing_cognitive_metadata(&self, namespace_id: i64) -> Result<i64> {
1711 let count: i64 = sqlx::query_scalar(
1712 r#"
1713 SELECT COUNT(*) FROM memories
1714 WHERE namespace_id = ?
1715 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1716 "#,
1717 )
1718 .bind(namespace_id)
1719 .fetch_one(&self.pool)
1720 .await
1721 .map_err(db_error)?;
1722
1723 Ok(count)
1724 }
1725
1726 pub async fn update_memory_metadata(
1728 &self,
1729 memory_id: i64,
1730 metadata: &serde_json::Value,
1731 ) -> Result<()> {
1732 let metadata_json = serde_json::to_string(metadata)?;
1733 sqlx::query(
1734 r#"
1735 UPDATE memories
1736 SET metadata = ?, updated_at = ?
1737 WHERE id = ?
1738 "#,
1739 )
1740 .bind(metadata_json)
1741 .bind(Utc::now())
1742 .bind(memory_id)
1743 .execute(&self.pool)
1744 .await
1745 .map_err(db_error)?;
1746
1747 Ok(())
1748 }
1749
    /// List up to `limit` distinct non-empty session keys (taken from
    /// `$.cognitive.session_key` in memory metadata) that do not yet have any
    /// row in `session_digests` for this namespace, in ascending key order.
    ///
    /// Note: rows are scanned regardless of `is_active` here — archived
    /// memories can still surface a session key needing a digest.
    pub async fn list_session_keys_without_digests(
        &self,
        namespace_id: i64,
        limit: i64,
    ) -> Result<Vec<String>> {
        let rows: Vec<(String,)> = sqlx::query_as(
            r#"
            SELECT DISTINCT json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') AS session_key
            FROM memories m
            WHERE m.namespace_id = ?
            AND json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
            AND TRIM(json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')) <> ''
            AND NOT EXISTS (
                SELECT 1 FROM session_digests sd
                WHERE sd.namespace_id = m.namespace_id
                AND sd.session_key = json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')
            )
            ORDER BY session_key ASC
            LIMIT ?
            "#,
        )
        .bind(namespace_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(rows.into_iter().map(|(session_key,)| session_key).collect())
    }
1780
1781 pub async fn count_distinct_session_keys_with_cognition(
1783 &self,
1784 namespace_id: i64,
1785 ) -> Result<i64> {
1786 let count: i64 = sqlx::query_scalar(
1787 r#"
1788 SELECT COUNT(DISTINCT json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key'))
1789 FROM memories
1790 WHERE namespace_id = ?
1791 AND is_active = 1
1792 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1793 AND TRIM(json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key')) <> ''
1794 "#,
1795 )
1796 .bind(namespace_id)
1797 .fetch_one(&self.pool)
1798 .await
1799 .map_err(db_error)?;
1800
1801 Ok(count)
1802 }
1803
    /// List archived raw-activity memories that are safe to purge: inactive,
    /// archived, tagged as raw activity (via labels or the `$.raw_activity`
    /// metadata flag), already distilled into a summary
    /// (`$.distillation.summary_memory_id` present), and older than
    /// `older_than`. Returned oldest first, capped at `limit`.
    pub async fn list_archived_raw_cleanup_candidates(
        &self,
        namespace_id: i64,
        older_than: DateTime<Utc>,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        let rows: Vec<MemoryRow> = sqlx::query_as(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
            AND is_active = 0
            AND is_archived = 1
            AND (
                labels LIKE '%raw-activity%'
                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
            )
            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
            AND created_at <= ?
            ORDER BY created_at ASC
            LIMIT ?
            "#,
        )
        .bind(namespace_id)
        .bind(older_than)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
    }
1836
    /// Count the rows `list_archived_raw_cleanup_candidates` would return for
    /// the same namespace and cutoff (same predicate, no LIMIT).
    pub async fn count_archived_raw_cleanup_candidates(
        &self,
        namespace_id: i64,
        older_than: DateTime<Utc>,
    ) -> Result<i64> {
        let count: i64 = sqlx::query_scalar(
            r#"
            SELECT COUNT(*) FROM memories
            WHERE namespace_id = ?
            AND is_active = 0
            AND is_archived = 1
            AND (
                labels LIKE '%raw-activity%'
                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
            )
            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
            AND created_at <= ?
            "#,
        )
        .bind(namespace_id)
        .bind(older_than)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(count)
    }
1865
1866 pub async fn delete_batch(&self, ids: &[i64]) -> Result<u64> {
1868 if ids.is_empty() {
1869 return Ok(0);
1870 }
1871
1872 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1873 let sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
1874 let mut query = sqlx::query(&sql);
1875 for id in ids {
1876 query = query.bind(*id);
1877 }
1878
1879 let result = query.execute(&self.pool).await.map_err(db_error)?;
1880 Ok(result.rows_affected())
1881 }
1882
1883 pub async fn delete_by_content_pattern(&self, namespace_id: i64, pattern: &str) -> Result<u64> {
1885 let result = sqlx::query("DELETE FROM memories WHERE namespace_id = ? AND content LIKE ?")
1886 .bind(namespace_id)
1887 .bind(pattern)
1888 .execute(&self.pool)
1889 .await
1890 .map_err(db_error)?;
1891
1892 Ok(result.rows_affected())
1893 }
1894
    /// Count active memories matching the same optional filters that
    /// `list_filtered` supports (minus content search and paging).
    ///
    /// The WHERE clause uses explicit positional placeholders whose indices
    /// are allocated in the same order values are bound below.
    pub async fn count_filtered(
        &self,
        namespace_id: i64,
        category: Option<&str>,
        since: Option<DateTime<Utc>>,
        until: Option<DateTime<Utc>>,
        include_raw: bool,
    ) -> Result<i64> {
        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
        let mut param_idx = 2u32;

        if category.is_some() {
            conditions.push(format!("category = ?{}", param_idx));
            param_idx += 1;
        }
        if since.is_some() {
            conditions.push(format!("created_at >= ?{}", param_idx));
            param_idx += 1;
        }
        if until.is_some() {
            conditions.push(format!("created_at <= ?{}", param_idx));
            // No further increment: `until` is the last indexed parameter in
            // this query (no LIMIT/OFFSET placeholders, unlike list_filtered).
            // Add the increment back if another indexed condition is appended.
        }
        if !include_raw {
            // Noise filter has no bind parameter of its own.
            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
        }

        let sql = format!(
            "SELECT COUNT(*) FROM memories WHERE {}",
            conditions.join(" AND "),
        );

        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);

        // Binds mirror the placeholder allocation order above.
        if let Some(cat) = category {
            query = query.bind(cat.to_string());
        }
        if let Some(s) = since {
            query = query.bind(s);
        }
        if let Some(u) = until {
            query = query.bind(u);
        }

        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
        Ok(count)
    }
1942
    /// Store a distilled summary memory and archive its sources, atomically.
    ///
    /// In one transaction:
    /// 1. Insert the summary row from `params`.
    /// 2. If the insert reports rowid 0 — which this code treats as "a
    ///    dedup trigger merged the row instead of inserting" — recover the
    ///    merged row's id by content lookup.
    /// 3. Link every id in `source_ids` to the summary via `memory_evidence`
    ///    (role 'source'), then mark the sources inactive + archived and
    ///    record the distillation bookkeeping in their metadata.
    ///
    /// Returns the summary as a domain `Memory`, re-read after commit.
    pub async fn store_distilled_summary(
        &self,
        params: StoreMemoryParams<'_>,
        source_ids: &[i64],
    ) -> Result<Memory> {
        let labels_json = serde_json::to_string(params.labels)?;
        let metadata_json = serde_json::to_string(params.metadata)?;
        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
        let mut tx = self.pool.begin().await.map_err(db_error)?;

        let result = sqlx::query(
            r#"
            INSERT INTO memories (
                namespace_id, content, category, memory_lane_type, labels, metadata,
                content_embedding, embedding_model, created_at, is_active, access_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.content)
        .bind(params.category.to_string())
        .bind(params.memory_lane_type.map(|t| t.to_string()))
        .bind(&labels_json)
        .bind(&metadata_json)
        .bind(&embedding_json)
        .bind(params.embedding_model)
        .bind(Utc::now())
        .execute(&mut *tx)
        .await
        .map_err(db_error)?;

        // rowid 0 => the row was merged elsewhere; recover the merged id by
        // normalized-content lookup within the same transaction.
        // NOTE(review): unlike insert_memory_tx, this lookup does not filter
        // on is_active = 1, so it could match an archived duplicate — confirm
        // whether that is intentional.
        let summary_id = if result.last_insert_rowid() == 0 {
            let row: Option<MemoryRow> = sqlx::query_as(
                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) ORDER BY created_at DESC LIMIT 1",
            )
            .bind(params.namespace_id)
            .bind(params.content)
            .fetch_optional(&mut *tx)
            .await
            .map_err(db_error)?;
            row.map(|memory| memory.id).ok_or_else(|| {
                nexus_core::NexusError::Storage(
                    "Duplicate distilled summary merged but matching row not found".to_string(),
                )
            })?
        } else {
            result.last_insert_rowid()
        };

        if !source_ids.is_empty() {
            // Evidence links: INSERT OR IGNORE tolerates re-running for pairs
            // that already exist.
            for source_id in source_ids {
                sqlx::query(
                    r#"
                    INSERT OR IGNORE INTO memory_evidence (
                        derived_memory_id,
                        source_memory_id,
                        evidence_role,
                        created_at
                    ) VALUES (?, ?, 'source', datetime('now'))
                    "#,
                )
                .bind(summary_id)
                .bind(*source_id)
                .execute(&mut *tx)
                .await
                .map_err(db_error)?;
            }

            // Archive all sources in one UPDATE, recording which summary
            // replaced them and when.
            let placeholders = source_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
            let sql = format!(
                r#"
                UPDATE memories
                SET
                    is_active = 0,
                    is_archived = 1,
                    updated_at = ?,
                    metadata = json_set(
                        COALESCE(metadata, '{{}}'),
                        '$.distillation.status', 'archived',
                        '$.distillation.summary_memory_id', ?,
                        '$.distillation.archived_at', ?
                    )
                WHERE id IN ({})
                "#,
                placeholders
            );
            let archived_at = Utc::now().to_rfc3339();
            let mut query = sqlx::query(&sql)
                .bind(Utc::now())
                .bind(summary_id)
                .bind(&archived_at);
            for source_id in source_ids {
                query = query.bind(*source_id);
            }
            query.execute(&mut *tx).await.map_err(db_error)?;
        }

        tx.commit().await.map_err(db_error)?;
        self.get_by_id(summary_id).await?.ok_or_else(|| {
            nexus_core::NexusError::Storage(format!(
                "Failed to retrieve distilled summary with id {}",
                summary_id
            ))
        })
    }
2049
    /// Convert a raw `MemoryRow` into the domain `Memory`.
    ///
    /// Deserializes the JSON-encoded `labels`, `metadata`, and optional
    /// `content_embedding` columns, and parses the category / lane-type
    /// strings. Any malformed column is surfaced as a `Storage` error that
    /// names the offending memory id.
    fn row_to_memory(&self, row: MemoryRow) -> Result<Memory> {
        let labels: Vec<String> = serde_json::from_str(&row.labels).map_err(|e| {
            nexus_core::NexusError::Storage(format!(
                "corrupted labels JSON for memory {}: {e}",
                row.id
            ))
        })?;
        let metadata: serde_json::Value = serde_json::from_str(&row.metadata).map_err(|e| {
            nexus_core::NexusError::Storage(format!(
                "corrupted metadata JSON for memory {}: {e}",
                row.id
            ))
        })?;
        // Embedding column is optional; decode only when present.
        let embedding: Option<Vec<f32>> = row
            .content_embedding
            .map(|e| {
                serde_json::from_str(&e).map_err(|err| {
                    nexus_core::NexusError::Storage(format!(
                        "corrupted embedding JSON for memory {}: {err}",
                        row.id
                    ))
                })
            })
            .transpose()?;

        Ok(Memory {
            id: row.id,
            namespace_id: row.namespace_id,
            content: row.content,
            category: parse_category(&row.category)?,
            memory_lane_type: match &row.memory_lane_type {
                Some(s) => parse_memory_lane_type(s)?,
                None => None,
            },
            labels,
            metadata,
            similarity_score: row.similarity_score,
            relevance_score: row.relevance_score,
            content_embedding: embedding,
            embedding_model: row.embedding_model,
            created_at: row.created_at,
            updated_at: row.updated_at,
            last_accessed: row.last_accessed,
            is_active: row.is_active,
            is_archived: row.is_archived,
            access_count: row.access_count,
        })
    }
2098
2099 pub async fn list_jobs(
2103 &self,
2104 namespace_id: i64,
2105 job_type: Option<&str>,
2106 status: Option<&str>,
2107 limit: i64,
2108 offset: i64,
2109 ) -> Result<Vec<MemoryJobRow>> {
2110 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2111 if job_type.is_some() {
2112 where_clauses.push("job_type = ?".to_string());
2113 }
2114 if status.is_some() {
2115 where_clauses.push("status = ?".to_string());
2116 }
2117 let where_sql = where_clauses.join(" AND ");
2118
2119 let sql = format!(
2120 "SELECT * FROM memory_jobs WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
2121 where_sql
2122 );
2123
2124 let mut query = sqlx::query_as::<_, MemoryJobRow>(&sql).bind(namespace_id);
2125 if let Some(jt) = job_type {
2126 query = query.bind(jt);
2127 }
2128 if let Some(st) = status {
2129 query = query.bind(st);
2130 }
2131 query = query.bind(limit).bind(offset);
2132
2133 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2134 Ok(rows)
2135 }
2136
2137 pub async fn count_jobs(
2139 &self,
2140 namespace_id: i64,
2141 job_type: Option<&str>,
2142 status: Option<&str>,
2143 ) -> Result<i64> {
2144 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2145 if job_type.is_some() {
2146 where_clauses.push("job_type = ?".to_string());
2147 }
2148 if status.is_some() {
2149 where_clauses.push("status = ?".to_string());
2150 }
2151 let where_sql = where_clauses.join(" AND ");
2152
2153 let sql = format!("SELECT COUNT(*) FROM memory_jobs WHERE {}", where_sql);
2154
2155 let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
2156 if let Some(jt) = job_type {
2157 query = query.bind(jt);
2158 }
2159 if let Some(st) = status {
2160 query = query.bind(st);
2161 }
2162
2163 let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
2164 Ok(count)
2165 }
2166
2167 pub async fn count_jobs_by_status(
2169 &self,
2170 namespace_id: i64,
2171 job_type: Option<&str>,
2172 ) -> Result<Vec<(String, i64)>> {
2173 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2174 if job_type.is_some() {
2175 where_clauses.push("job_type = ?".to_string());
2176 }
2177 let where_sql = where_clauses.join(" AND ");
2178
2179 let sql = format!(
2180 "SELECT status, COUNT(*) as cnt FROM memory_jobs WHERE {} GROUP BY status",
2181 where_sql
2182 );
2183
2184 let mut query = sqlx::query_as::<_, (String, i64)>(&sql).bind(namespace_id);
2185 if let Some(jt) = job_type {
2186 query = query.bind(jt);
2187 }
2188
2189 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2190 Ok(rows)
2191 }
2192
    /// Delete completed jobs not updated since `older_than`; returns rows
    /// removed.
    ///
    /// The cutoff is bound as a "%Y-%m-%d %H:%M:%S" string rather than a
    /// DateTime because `memory_jobs.updated_at` is written with SQLite's
    /// `datetime('now')`, which stores that text format — a direct DateTime
    /// bind would not compare lexicographically against it.
    pub async fn purge_completed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
        let result = sqlx::query(
            r#"
            DELETE FROM memory_jobs
            WHERE status = ? AND updated_at < ?
            "#,
        )
        .bind(memory_job_status::COMPLETED)
        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(result.rows_affected())
    }
2211
    /// Delete failed jobs that exhausted their retries (`attempts >=
    /// MAX_JOB_ATTEMPTS`) and were not updated since `older_than`; returns
    /// rows removed.
    ///
    /// The cutoff is bound as a "%Y-%m-%d %H:%M:%S" string for the same
    /// reason as `purge_completed_jobs`: `updated_at` holds SQLite
    /// `datetime('now')` text.
    pub async fn purge_permanently_failed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
        let result = sqlx::query(
            r#"
            DELETE FROM memory_jobs
            WHERE status = ? AND attempts >= ? AND updated_at < ?
            "#,
        )
        .bind(memory_job_status::FAILED)
        .bind(MAX_JOB_ATTEMPTS)
        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(result.rows_affected())
    }
2232
2233 pub async fn list_digests(
2235 &self,
2236 namespace_id: i64,
2237 session_key: Option<&str>,
2238 limit: i64,
2239 offset: i64,
2240 ) -> Result<Vec<SessionDigestRow>> {
2241 let mut query = if let Some(sk) = session_key {
2242 sqlx::query_as::<_, SessionDigestRow>(
2243 "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2244 )
2245 .bind(namespace_id)
2246 .bind(sk)
2247 } else {
2248 sqlx::query_as::<_, SessionDigestRow>(
2249 "SELECT * FROM session_digests WHERE namespace_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2250 )
2251 .bind(namespace_id)
2252 };
2253
2254 query = query.bind(limit).bind(offset);
2255 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2256 Ok(rows)
2257 }
2258
2259 pub async fn count_digests(&self, namespace_id: i64, session_key: Option<&str>) -> Result<i64> {
2261 let query = if let Some(sk) = session_key {
2262 sqlx::query_scalar(
2263 "SELECT COUNT(*) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
2264 )
2265 .bind(namespace_id)
2266 .bind(sk)
2267 } else {
2268 sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE namespace_id = ?")
2269 .bind(namespace_id)
2270 };
2271
2272 let count: i64 = query.fetch_one(&self.pool).await.map_err(db_error)?;
2273 Ok(count)
2274 }
2275
2276 pub async fn count_evidence(&self, namespace_id: i64) -> Result<i64> {
2278 let count: i64 = sqlx::query_scalar(
2279 "SELECT COUNT(*) FROM memory_evidence WHERE derived_memory_id IN (SELECT id FROM memories WHERE namespace_id = ?)"
2280 )
2281 .bind(namespace_id)
2282 .fetch_one(&self.pool)
2283 .await
2284 .map_err(db_error)?;
2285 Ok(count)
2286 }
2287
2288 pub async fn record_metric(
2290 &self,
2291 metric_name: &str,
2292 metric_value: f64,
2293 labels: &serde_json::Value,
2294 ) -> Result<i64> {
2295 let labels_json = serde_json::to_string(labels)?;
2296 let id: i64 = sqlx::query_scalar(
2297 r#"
2298 INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2299 VALUES (?, ?, ?, ?)
2300 RETURNING id
2301 "#,
2302 )
2303 .bind(metric_name)
2304 .bind(metric_value)
2305 .bind(labels_json)
2306 .bind(Utc::now())
2307 .fetch_one(&self.pool)
2308 .await
2309 .map_err(db_error)?;
2310 Ok(id)
2311 }
2312
2313 pub async fn record_metrics_batch(&self, samples: &[MetricSample]) -> Result<()> {
2315 if samples.is_empty() {
2316 return Ok(());
2317 }
2318
2319 let mut tx = self.pool.begin().await.map_err(db_error)?;
2320 for sample in samples {
2321 let labels_json = serde_json::to_string(&sample.labels)?;
2322 sqlx::query(
2323 r#"
2324 INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2325 VALUES (?, ?, ?, ?)
2326 "#,
2327 )
2328 .bind(&sample.metric_name)
2329 .bind(sample.metric_value)
2330 .bind(labels_json)
2331 .bind(Utc::now())
2332 .execute(&mut *tx)
2333 .await
2334 .map_err(db_error)?;
2335 }
2336 tx.commit().await.map_err(db_error)?;
2337 Ok(())
2338 }
2339
    /// Fetch the most recent metric rows whose labels JSON carries
    /// `$.namespace_id == namespace_id`, newest first (ties broken by id),
    /// optionally restricted to metric names starting with `metric_prefix`.
    /// `limit` is clamped to at least 1.
    pub async fn latest_metrics_for_namespace(
        &self,
        namespace_id: i64,
        metric_prefix: Option<&str>,
        limit: i64,
    ) -> Result<Vec<SystemMetricRow>> {
        let limit = limit.max(1);
        let rows = if let Some(prefix) = metric_prefix {
            sqlx::query_as::<_, SystemMetricRow>(
                r#"
                SELECT *
                FROM system_metrics
                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
                AND metric_name LIKE ?
                ORDER BY recorded_at DESC, id DESC
                LIMIT ?
                "#,
            )
            .bind(namespace_id)
            // Prefix match: the wildcard is appended here, not caller-supplied.
            .bind(format!("{prefix}%"))
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?
        } else {
            sqlx::query_as::<_, SystemMetricRow>(
                r#"
                SELECT *
                FROM system_metrics
                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
                ORDER BY recorded_at DESC, id DESC
                LIMIT ?
                "#,
            )
            .bind(namespace_id)
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?
        };
        Ok(rows)
    }
2383
2384 pub async fn count_by_cognitive_level(
2386 &self,
2387 namespace_id: i64,
2388 level: CognitiveLevel,
2389 ) -> Result<i64> {
2390 let count: i64 = sqlx::query_scalar(
2391 r#"
2392 SELECT COUNT(*) FROM memories
2393 WHERE namespace_id = ?
2394 AND is_active = 1
2395 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') = ?
2396 "#,
2397 )
2398 .bind(namespace_id)
2399 .bind(level.as_str())
2400 .fetch_one(&self.pool)
2401 .await
2402 .map_err(db_error)?;
2403 Ok(count)
2404 }
2405}
2406
/// Insert a memory row inside an open transaction and return its id.
///
/// A reported rowid of 0 is treated as "a database-side dedup trigger merged
/// the row instead of inserting a new one"; in that case the merged row's id
/// is recovered by a case/whitespace-insensitive content lookup restricted to
/// active rows in the same namespace.
async fn insert_memory_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    params: &StoreMemoryParams<'_>,
) -> Result<i64> {
    // Serialize JSON-typed columns up front so serde errors surface before
    // any SQL runs.
    let labels_json = serde_json::to_string(params.labels)?;
    let metadata_json = serde_json::to_string(params.metadata)?;
    let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;

    let result = sqlx::query(
        r#"
        INSERT INTO memories (
            namespace_id, content, category, memory_lane_type, labels, metadata,
            content_embedding, embedding_model, created_at, is_active, access_count
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
        "#,
    )
    .bind(params.namespace_id)
    .bind(params.content)
    .bind(params.category.to_string())
    .bind(params.memory_lane_type.map(|t| t.to_string()))
    .bind(&labels_json)
    .bind(&metadata_json)
    .bind(&embedding_json)
    .bind(params.embedding_model)
    .bind(Utc::now())
    .execute(&mut **tx)
    .await
    .map_err(db_error)?;

    let inserted_id = result.last_insert_rowid();
    if inserted_id != 0 {
        return Ok(inserted_id);
    }

    // Trigger-merge fallback: locate the surviving duplicate within the same
    // transaction so the caller still gets a usable id.
    let row: Option<MemoryRow> = sqlx::query_as(
        "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1",
    )
    .bind(params.namespace_id)
    .bind(params.content)
    .fetch_optional(&mut **tx)
    .await
    .map_err(db_error)?;

    row.map(|memory| memory.id).ok_or_else(|| {
        nexus_core::NexusError::Storage(
            "Duplicate merged by trigger but matching row not found".to_string(),
        )
    })
}
2456
2457async fn insert_evidence_tx(
2458 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2459 derived_memory_id: i64,
2460 source_memory_id: i64,
2461 evidence_role: &str,
2462) -> Result<()> {
2463 sqlx::query(
2464 r#"
2465 INSERT OR IGNORE INTO memory_evidence (derived_memory_id, source_memory_id, evidence_role, created_at)
2466 VALUES (?, ?, ?, datetime('now'))
2467 "#,
2468 )
2469 .bind(derived_memory_id)
2470 .bind(source_memory_id)
2471 .bind(evidence_role)
2472 .execute(&mut **tx)
2473 .await
2474 .map_err(db_error)?;
2475
2476 Ok(())
2477}
2478
2479fn new_claim_token(lease_owner: &str) -> String {
2480 let nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2481 format!("{lease_owner}-{nanos}-{}", std::process::id())
2482}
2483
/// Union `existing` and `incoming` label lists, treating labels that differ
/// only in ASCII case as duplicates. Output order: all existing labels first,
/// then new incoming labels in their original order.
fn merge_labels(existing: &[String], incoming: &[String]) -> Vec<String> {
    let mut result: Vec<String> = existing.to_vec();
    for candidate in incoming {
        let already_present = result
            .iter()
            .any(|label| label.eq_ignore_ascii_case(candidate));
        if !already_present {
            result.push(candidate.clone());
        }
    }
    result
}
2496
/// Merge metadata from an incoming duplicate memory into the existing row's
/// metadata, preserving session-key history.
///
/// Starting from a clone of `existing`, two parallel merges are applied:
/// - `$.cognitive.session_key` / `$.cognitive.session_keys`
/// - `$.source.derived_session_key` / `$.source.derived_session_keys`
///
/// In each case, if the incoming value has a single key, the existing single
/// key (if any) and the incoming key are folded into the plural array
/// (case-insensitively deduplicated), the incoming key becomes the new
/// primary value, and the array is written back. Sections absent from the
/// incoming value are left untouched.
fn merge_duplicate_metadata(
    existing: &serde_json::Value,
    incoming: &serde_json::Value,
) -> serde_json::Value {
    let mut merged = existing.clone();

    if let Some(session_key) = incoming
        .pointer("/cognitive/session_key")
        .and_then(serde_json::Value::as_str)
    {
        // Accumulate history: existing array, then the existing primary key,
        // then the incoming key — each added at most once.
        let mut session_keys = existing
            .pointer("/cognitive/session_keys")
            .and_then(serde_json::Value::as_array)
            .cloned()
            .unwrap_or_default();
        if let Some(existing_key) = existing
            .pointer("/cognitive/session_key")
            .and_then(serde_json::Value::as_str)
        {
            push_unique_json_string(&mut session_keys, existing_key);
        }
        push_unique_json_string(&mut session_keys, session_key);
        // The incoming key wins as the primary value.
        ensure_object_path(&mut merged, "cognitive").insert(
            "session_key".to_string(),
            serde_json::Value::String(session_key.to_string()),
        );
        ensure_object_path(&mut merged, "cognitive").insert(
            "session_keys".to_string(),
            serde_json::Value::Array(session_keys),
        );
    }

    // Same pattern for the source-derived session keys.
    if let Some(derived_session_key) = incoming
        .pointer("/source/derived_session_key")
        .and_then(serde_json::Value::as_str)
    {
        let mut derived_keys = existing
            .pointer("/source/derived_session_keys")
            .and_then(serde_json::Value::as_array)
            .cloned()
            .unwrap_or_default();
        if let Some(existing_key) = existing
            .pointer("/source/derived_session_key")
            .and_then(serde_json::Value::as_str)
        {
            push_unique_json_string(&mut derived_keys, existing_key);
        }
        push_unique_json_string(&mut derived_keys, derived_session_key);
        ensure_object_path(&mut merged, "source").insert(
            "derived_session_key".to_string(),
            serde_json::Value::String(derived_session_key.to_string()),
        );
        ensure_object_path(&mut merged, "source").insert(
            "derived_session_keys".to_string(),
            serde_json::Value::Array(derived_keys),
        );
    }

    merged
}
2557
2558fn push_unique_json_string(values: &mut Vec<serde_json::Value>, candidate: &str) {
2559 if values
2560 .iter()
2561 .filter_map(serde_json::Value::as_str)
2562 .any(|current| current.eq_ignore_ascii_case(candidate))
2563 {
2564 return;
2565 }
2566 values.push(serde_json::Value::String(candidate.to_string()));
2567}
2568
2569fn ensure_object_path<'a>(
2570 root: &'a mut serde_json::Value,
2571 key: &str,
2572) -> &'a mut serde_json::Map<String, serde_json::Value> {
2573 if !root.is_object() {
2574 *root = serde_json::Value::Object(serde_json::Map::new());
2575 }
2576
2577 let object = root.as_object_mut().expect("root object ensured");
2578 let entry = object
2579 .entry(key.to_string())
2580 .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
2581 if !entry.is_object() {
2582 *entry = serde_json::Value::Object(serde_json::Map::new());
2583 }
2584
2585 entry.as_object_mut().expect("child object ensured")
2586}
2587
/// Repository for rows in the `agent_namespaces` table, mapping them to the
/// `AgentNamespace` domain type.
pub struct NamespaceRepository {
    // Owned connection pool; all queries in this repository run against it.
    pool: SqlitePool,
}
2592
2593impl NamespaceRepository {
2594 pub fn new(pool: SqlitePool) -> Self {
2595 Self { pool }
2596 }
2597
2598 pub async fn get_or_create(&self, name: &str, agent_type: &str) -> Result<AgentNamespace> {
2600 if let Some(ns) = self.get_by_name(name).await? {
2601 return Ok(ns);
2602 }
2603
2604 let result = sqlx::query(
2605 "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
2606 )
2607 .bind(name)
2608 .bind(agent_type)
2609 .bind(Utc::now())
2610 .execute(&self.pool)
2611 .await
2612 .map_err(db_error)?;
2613
2614 let id = result.last_insert_rowid();
2615 Ok(AgentNamespace {
2616 id,
2617 name: name.to_string(),
2618 description: None,
2619 agent_type: agent_type.to_string(),
2620 created_at: Utc::now(),
2621 updated_at: None,
2622 })
2623 }
2624
2625 pub async fn get_by_name(&self, name: &str) -> Result<Option<AgentNamespace>> {
2627 let row: Option<AgentNamespaceRow> =
2628 sqlx::query_as("SELECT * FROM agent_namespaces WHERE name = ?")
2629 .bind(name)
2630 .fetch_optional(&self.pool)
2631 .await
2632 .map_err(db_error)?;
2633
2634 Ok(row.map(|r| AgentNamespace {
2635 id: r.id,
2636 name: r.name,
2637 description: r.description,
2638 agent_type: r.agent_type,
2639 created_at: r.created_at,
2640 updated_at: r.updated_at,
2641 }))
2642 }
2643
2644 pub async fn list_all(&self) -> Result<Vec<AgentNamespace>> {
2646 let rows: Vec<AgentNamespaceRow> =
2647 sqlx::query_as("SELECT * FROM agent_namespaces ORDER BY name")
2648 .fetch_all(&self.pool)
2649 .await
2650 .map_err(db_error)?;
2651
2652 Ok(rows
2653 .into_iter()
2654 .map(|r| AgentNamespace {
2655 id: r.id,
2656 name: r.name,
2657 description: r.description,
2658 agent_type: r.agent_type,
2659 created_at: r.created_at,
2660 updated_at: r.updated_at,
2661 })
2662 .collect())
2663 }
2664}
2665
/// Repository tracking file-ingestion state in the `processed_files` table
/// (pending → processing → completed/failed), scoped per namespace.
pub struct ProcessedFileRepository<'a> {
    // Borrowed pool: this repository does not own the connection pool.
    pub pool: &'a SqlitePool,
}
2670
impl<'a> ProcessedFileRepository<'a> {
    pub fn new(pool: &'a SqlitePool) -> Self {
        Self { pool }
    }

    /// True when `path` has a `processed_files` row with status 'completed'
    /// in this namespace, i.e. it was already ingested successfully.
    pub async fn is_processed(&self, namespace_id: i64, path: &str) -> Result<bool> {
        let row: Option<(i64,)> =
            sqlx::query_as("SELECT id FROM processed_files WHERE namespace_id = ? AND path = ? AND status = 'completed'")
                .bind(namespace_id)
                .bind(path)
                .fetch_optional(self.pool)
                .await
                .map_err(db_error)?;

        Ok(row.is_some())
    }

    /// All paths already marked 'completed' for the namespace, as a set for
    /// cheap membership checks by callers scanning directories.
    pub async fn get_completed_paths(
        &self,
        namespace_id: i64,
    ) -> Result<std::collections::HashSet<String>> {
        let rows: Vec<(String,)> = sqlx::query_as(
            "SELECT path FROM processed_files WHERE namespace_id = ? AND status = 'completed'",
        )
        .bind(namespace_id)
        .fetch_all(self.pool)
        .await
        .map_err(db_error)?;

        Ok(rows.into_iter().map(|r| r.0).collect())
    }

    /// Upsert the row for `(namespace_id, path)` into status 'processing',
    /// refreshing the content hash, and return the row id. Re-marking an
    /// existing row (e.g. a retry or changed file) reuses the same row.
    pub async fn mark_processing(
        &self,
        namespace_id: i64,
        path: &str,
        content_hash: Option<&str>,
    ) -> Result<i64> {
        let id: i64 = sqlx::query_scalar(
            r#"
            INSERT INTO processed_files (namespace_id, path, content_hash, status, updated_at)
            VALUES (?, ?, ?, 'processing', datetime('now'))
            ON CONFLICT(namespace_id, path) DO UPDATE SET
                content_hash = excluded.content_hash,
                status = 'processing',
                updated_at = datetime('now')
            RETURNING id
            "#,
        )
        .bind(namespace_id)
        .bind(path)
        .bind(content_hash)
        .fetch_one(self.pool)
        .await
        .map_err(db_error)?;

        Ok(id)
    }

    /// Transition row `id` to 'completed', recording the memory the file
    /// produced and the processing timestamp.
    pub async fn mark_processed(&self, id: i64, memory_id: i64) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE processed_files
            SET status = 'completed', memory_id = ?, processed_at = datetime('now'), updated_at = datetime('now')
            WHERE id = ?
            "#
        )
        .bind(memory_id)
        .bind(id)
        .execute(self.pool)
        .await
        .map_err(db_error)?;

        Ok(())
    }

    /// Transition row `id` to 'failed', storing the error message for later
    /// inspection; the row stays and can be re-marked 'processing' to retry.
    pub async fn mark_failed(&self, id: i64, error: &str) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE processed_files
            SET status = 'failed', last_error = ?, updated_at = datetime('now')
            WHERE id = ?
            "#,
        )
        .bind(error)
        .bind(id)
        .execute(self.pool)
        .await
        .map_err(db_error)?;

        Ok(())
    }

    /// Oldest-first batch of rows still in status 'pending' for the
    /// namespace, capped at `limit`.
    pub async fn get_pending(
        &self,
        namespace_id: i64,
        limit: i32,
    ) -> Result<Vec<ProcessedFileRow>> {
        let rows = sqlx::query_as::<_, ProcessedFileRow>(
            r#"
            SELECT * FROM processed_files
            WHERE namespace_id = ? AND status = 'pending'
            ORDER BY created_at ASC
            LIMIT ?
            "#,
        )
        .bind(namespace_id)
        .bind(limit)
        .fetch_all(self.pool)
        .await
        .map_err(db_error)?;

        Ok(rows)
    }

    /// Delete every processed-file record for the namespace (regardless of
    /// status); returns the number of rows removed.
    pub async fn clear_namespace(&self, namespace_id: i64) -> Result<u64> {
        let result = sqlx::query("DELETE FROM processed_files WHERE namespace_id = ?")
            .bind(namespace_id)
            .execute(self.pool)
            .await
            .map_err(db_error)?;

        Ok(result.rows_affected())
    }
}
2803
/// Repository for typed, weighted edges between memories stored in the
/// `memory_relations` table.
pub struct MemoryRelationRepository<'a> {
    // Borrowed pool: this repository does not own the connection pool.
    pub pool: &'a SqlitePool,
}
2808
2809impl<'a> MemoryRelationRepository<'a> {
2810 pub fn new(pool: &'a SqlitePool) -> Self {
2811 Self { pool }
2812 }
2813
2814 pub async fn store(
2816 &self,
2817 source_id: i64,
2818 target_id: i64,
2819 relation_type: &str,
2820 strength: f32,
2821 ) -> Result<i64> {
2822 let id: i64 = sqlx::query_scalar(
2823 r#"
2824 INSERT INTO memory_relations (source_memory_id, target_memory_id, relation_type, strength, created_at)
2825 VALUES (?, ?, ?, ?, datetime('now'))
2826 ON CONFLICT(source_memory_id, target_memory_id, relation_type) DO UPDATE SET
2827 strength = excluded.strength,
2828 created_at = datetime('now')
2829 RETURNING id
2830 "#
2831 )
2832 .bind(source_id)
2833 .bind(target_id)
2834 .bind(relation_type)
2835 .bind(strength)
2836 .fetch_one(self.pool)
2837 .await
2838 .map_err(db_error)?;
2839
2840 Ok(id)
2841 }
2842
2843 pub async fn get_related(&self, memory_id: i64) -> Result<Vec<(i64, String, f32)>> {
2845 let rows: Vec<(i64, String, f32)> = sqlx::query_as(
2846 r#"
2847 SELECT target_memory_id as memory_id, relation_type, strength
2848 FROM memory_relations
2849 WHERE source_memory_id = ?
2850 UNION
2851 SELECT source_memory_id as memory_id, relation_type, strength
2852 FROM memory_relations
2853 WHERE target_memory_id = ?
2854 ORDER BY strength DESC
2855 "#,
2856 )
2857 .bind(memory_id)
2858 .bind(memory_id)
2859 .fetch_all(self.pool)
2860 .await
2861 .map_err(db_error)?;
2862
2863 Ok(rows)
2864 }
2865
2866 pub async fn delete_for_memory(&self, memory_id: i64) -> Result<u64> {
2868 let result = sqlx::query(
2869 "DELETE FROM memory_relations WHERE source_memory_id = ? OR target_memory_id = ?",
2870 )
2871 .bind(memory_id)
2872 .bind(memory_id)
2873 .execute(self.pool)
2874 .await
2875 .map_err(db_error)?;
2876
2877 Ok(result.rows_affected())
2878 }
2879}
2880
2881fn parse_category(s: &str) -> Result<Category> {
2882 match MemoryCategory::parse(s) {
2883 Some(cat) => Ok(cat),
2884 None => Err(nexus_core::NexusError::Storage(format!(
2885 "Unknown memory category '{s}' persisted in database; row may be corrupted"
2886 ))),
2887 }
2888}
2889
2890fn parse_memory_lane_type(s: &str) -> Result<Option<MemoryLaneType>> {
2891 match MemoryLaneType::parse(s) {
2892 Some(t) => Ok(Some(t)),
2893 None => Err(nexus_core::NexusError::Storage(format!(
2894 "Unknown memory_lane_type '{s}' persisted in database; row may be corrupted"
2895 ))),
2896 }
2897}
2898
2899#[cfg(test)]
2900mod tests {
2901 use super::*;
2902 use nexus_core::MemoryLanePriorityType;
2903 use sqlx::sqlite::SqlitePoolOptions;
2904
    /// Build the minimal `cognitive` metadata object the repository queries
    /// expect, for the given level/perspective and reinforcement counters.
    fn cognitive_metadata(
        level: CognitiveLevel,
        perspective: &PerspectiveKey,
        times_reinforced: i64,
        times_contradicted: i64,
    ) -> serde_json::Value {
        serde_json::json!({
            "cognitive": {
                "level": level.as_str(),
                "observer": perspective.observer,
                "subject": perspective.subject,
                "session_key": perspective.session_key,
                "source_memory_ids": [],
                "confidence": 0.9,
                "times_reinforced": times_reinforced,
                "times_contradicted": times_contradicted,
                "generated_by": "test",
            }
        })
    }
2925
    /// Known category strings parse to their variants; unknown strings error.
    #[test]
    fn test_parse_category() {
        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
        assert!(matches!(
            parse_category("preferences"),
            Ok(Category::Preferences)
        ));
        assert!(parse_category("unknown").is_err());
    }
2935
    /// Priority lane-type strings parse into their `Priority(..)` variants;
    /// unknown strings error instead of silently mapping to `None`.
    #[test]
    fn test_parse_memory_lane_type() {
        let correction = parse_memory_lane_type("correction");
        assert!(matches!(
            correction,
            Ok(Some(MemoryLaneType::Priority(
                MemoryLanePriorityType::Correction
            )))
        ));

        let pattern_seed = parse_memory_lane_type("pattern_seed");
        assert!(matches!(
            pattern_seed,
            Ok(Some(MemoryLaneType::Priority(
                MemoryLanePriorityType::PatternSeed
            )))
        ));

        assert!(parse_memory_lane_type("unknown").is_err());
    }
2956
    /// Exhaustively cover every known category string, plus the error path
    /// for bogus and empty input.
    #[test]
    fn test_parse_category_all_variants() {
        assert!(matches!(parse_category("general"), Ok(Category::General)));
        assert!(matches!(parse_category("session"), Ok(Category::Session)));
        assert!(matches!(parse_category("context"), Ok(Category::Context)));
        assert!(matches!(
            parse_category("specifications"),
            Ok(Category::Specifications)
        ));
        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
        assert!(matches!(
            parse_category("preferences"),
            Ok(Category::Preferences)
        ));
        assert!(parse_category("bogus").is_err());
        assert!(parse_category("").is_err());
    }
2975
    /// Sanity check that `StoreMemoryParams` can be constructed with minimal
    /// values and exposes its fields as given.
    #[test]
    fn test_store_memory_params_fields() {
        let params = StoreMemoryParams {
            namespace_id: 1,
            content: "test content",
            category: &Category::General,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::Value::Null,
            embedding: None,
            embedding_model: None,
        };
        assert_eq!(params.namespace_id, 1);
        assert_eq!(params.content, "test content");
        assert!(params.labels.is_empty());
    }
2993
    /// Merging duplicate metadata should make the incoming session key the
    /// current one while accumulating both keys in the history arrays.
    #[test]
    fn test_merge_duplicate_metadata_preserves_multiple_session_keys() {
        let existing = serde_json::json!({
            "cognitive": {
                "session_key": "session-a"
            },
            "source": {
                "derived_session_key": "session-a"
            }
        });
        let incoming = serde_json::json!({
            "cognitive": {
                "session_key": "session-b"
            },
            "source": {
                "derived_session_key": "session-b"
            }
        });

        let merged = merge_duplicate_metadata(&existing, &incoming);
        // Incoming key wins as the current value...
        assert_eq!(merged["cognitive"]["session_key"], "session-b");
        // ...and both keys are retained in order in the history arrays.
        assert_eq!(
            merged["cognitive"]["session_keys"],
            serde_json::json!(["session-a", "session-b"])
        );
        assert_eq!(
            merged["source"]["derived_session_keys"],
            serde_json::json!(["session-a", "session-b"])
        );
    }
3024
    /// Build a migrated in-memory SQLite pool. `max_connections(1)` keeps a
    /// single connection alive so the `sqlite::memory:` database persists
    /// across queries for the whole test.
    async fn setup_test_db() -> SqlitePool {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::migrations::run_migrations(&pool).await.unwrap();
        pool
    }
3036
    /// Create (or fetch) a namespace named `name` and return its row id.
    async fn create_namespace(pool: &SqlitePool, name: &str) -> i64 {
        let ns = NamespaceRepository::new(pool.clone());
        ns.get_or_create(name, "test").await.unwrap();
        ns.get_by_name(name).await.unwrap().unwrap().id
    }
3042
    /// `get_by_content` must return the row whose content matches exactly,
    /// and error when nothing matches.
    #[tokio::test]
    async fn test_get_by_content_matches_actual_content() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let mem_a = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "first memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let mem_b = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "second memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        assert_ne!(mem_a.id, mem_b.id);

        // Each lookup must resolve to the row with that exact content.
        let found_a = repo
            .get_by_content(ns_id, "first memory content")
            .await
            .unwrap();
        assert_eq!(found_a.id, mem_a.id);
        assert_eq!(found_a.content, "first memory content");

        let found_b = repo
            .get_by_content(ns_id, "second memory content")
            .await
            .unwrap();
        assert_eq!(found_b.id, mem_b.id);
        assert_eq!(found_b.content, "second memory content");

        // A miss is an error, not a silent fallback to some other row.
        let result = repo.get_by_content(ns_id, "nonexistent").await;
        assert!(result.is_err());
    }
3105
    /// Jobs are claimed highest-priority first, one claimant at a time:
    /// the second worker must receive the remaining (lower-priority) job.
    #[tokio::test]
    async fn test_enqueue_and_claim_jobs() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let id1 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 1}),
            })
            .await
            .unwrap();

        let id2 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 50,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 2}),
            })
            .await
            .unwrap();

        assert!(id1 > 0);
        assert!(id2 > 0);
        assert_ne!(id1, id2);

        // First claim gets the priority-100 job and marks it running.
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].row.id, id1);
        assert_eq!(claimed[0].row.status, "running");
        assert_eq!(claimed[0].payload["memory_id"], 1);

        // The already-claimed job is invisible to the second worker.
        let claimed2 = repo
            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed2.len(), 1);
        assert_eq!(claimed2[0].row.id, id2);
    }
3159
    /// A completed job must never be claimable again.
    #[tokio::test]
    async fn test_complete_and_fail_job() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "digest_session",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"session": "s1"}),
            })
            .await
            .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);

        repo.complete_job(&claimed[0]).await.unwrap();

        // Completion removes the job from the claimable pool.
        let claimed_again = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert!(claimed_again.is_empty());
    }
3193
    /// Failing a job before its attempt limit re-queues it; the re-claim
    /// shows the incremented attempt counter.
    #[tokio::test]
    async fn test_fail_job_requeues_before_limit() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"test": true}),
            })
            .await
            .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        repo.fail_job(&claimed[0], "transient error").await.unwrap();

        // The failure was transient: the job comes back on attempt 2.
        let reclaimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(reclaimed.len(), 1);
        assert_eq!(reclaimed[0].row.attempts, 2);
    }
3225
    /// Completing a job with a claim token that does not match the lease
    /// must fail: only the current lease holder may complete it.
    #[tokio::test]
    async fn test_complete_job_requires_matching_claim_token() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 100,
            perspective: None,
            payload: &serde_json::json!({"memory_id": 7}),
        })
        .await
        .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
            .await
            .unwrap();
        // Forge a different token to simulate a stale/foreign lease holder.
        let mut forged = claimed[0].clone();
        forged.row.claim_token = Some("forged-token".to_string());

        let error = repo.complete_job(&forged).await.unwrap_err();
        assert!(error.to_string().contains("lost lease ownership"));
    }
3252
    /// Storing a digest for the same (namespace, session, kind) replaces the
    /// previous digest row in place, and both the per-session and
    /// per-namespace "latest" lookups see the replacement memory.
    #[tokio::test]
    async fn test_store_digest_and_latest_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: digest_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 42,
            })
            .await
            .unwrap();

        assert!(digest_id > 0);

        let result = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap();

        assert!(result.is_some());
        assert_eq!(result.as_ref().unwrap().id, digest_memory.id);

        // Store a second digest for the same session/kind...
        let replacement_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short updated",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let replacement_digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: replacement_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 64,
            })
            .await
            .unwrap();

        // ...which must reuse the same digest row (upsert, not insert).
        assert_eq!(replacement_digest_id, digest_id);

        let updated = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(updated.id, replacement_memory.id);

        let latest_for_namespace = repo
            .latest_digest_for_namespace(ns_id, "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_for_namespace.id, replacement_memory.id);
    }
3341
    /// The rollover report must count only memories created after the last
    /// digest's coverage window: non-zero before any digest, zero right
    /// after digesting, and non-zero again once new memories arrive.
    #[tokio::test]
    async fn test_session_digest_rollover_reports_new_signal_since_last_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Implemented bounded digest rollover policy.",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "explicit",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // No digest yet: the single stored memory counts as new signal.
        let first = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(first.last_digest_end_memory_id, None);
        assert_eq!(first.new_memory_count, 1);
        assert!(first.estimated_new_tokens > 0);

        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Short digest",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "summary_short",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        repo.store_digest(StoreDigestParams {
            namespace_id: ns_id,
            session_key: "session-rollover",
            digest_kind: "short",
            memory_id: digest_memory.id,
            start_memory_id: Some(source.id),
            end_memory_id: Some(source.id),
            token_count: 16,
        })
        .await
        .unwrap();

        // Everything up to `source.id` is covered: no new signal reported.
        let covered = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(covered.last_digest_end_memory_id, Some(source.id));
        assert_eq!(covered.new_memory_count, 0);
        assert_eq!(covered.estimated_new_tokens, 0);

        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "Added one more explicit memory after the digest coverage window.",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "session-rollover"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // The memory stored after the digest window is new signal again.
        let second = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(second.last_digest_end_memory_id, Some(source.id));
        assert_eq!(second.new_memory_count, 1);
        assert!(second.estimated_new_tokens > 0);
    }
3446
    /// `store_with_lineage` must record one evidence row per source memory,
    /// retrievable via `load_lineage`.
    #[tokio::test]
    async fn test_store_with_lineage() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source1 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw observation one",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let source2 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw observation two",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let derived = repo
            .store_with_lineage(StoreMemoryWithLineageParams {
                store: StoreMemoryParams {
                    namespace_id: ns_id,
                    content: "derived insight",
                    category: &Category::Facts,
                    memory_lane_type: None,
                    labels: &[],
                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
                    embedding: None,
                    embedding_model: None,
                },
                source_memory_ids: &[source1.id, source2.id],
                evidence_role: "derived_from",
            })
            .await
            .unwrap();

        assert_eq!(derived.content, "derived insight");

        // Both sources appear in the derived memory's lineage.
        let lineage = repo.load_lineage(derived.id).await.unwrap();
        assert_eq!(lineage.len(), 2);
        assert!(lineage.iter().any(|e| e.source_memory_id == source1.id));
        assert!(lineage.iter().any(|e| e.source_memory_id == source2.id));
    }
3509
    /// The cognitive query helpers must filter by level and perspective:
    /// explicit-only lookup, recency, most-reinforced (excluding
    /// contradictions), and contradiction-only lookup.
    #[tokio::test]
    async fn test_cognitive_queries_by_level_and_perspective() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective =
            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".into()));

        let _raw = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "explicit note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Highest reinforcement count among the stored memories.
        let derived = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "reinforced insight",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 7, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let contradiction = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "contradiction note",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 1, 5),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let explicit_rows = repo
            .get_by_cognitive_level(ns_id, CognitiveLevel::Explicit, 10)
            .await
            .unwrap();
        assert_eq!(explicit_rows.len(), 1);
        assert_eq!(explicit_rows[0].id, explicit.id);

        let recent = repo
            .get_recent_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(recent.len(), 4);

        // Most-reinforced ranks `derived` first and excludes contradictions.
        let reinforced = repo
            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(reinforced[0].id, derived.id);
        assert!(reinforced
            .iter()
            .all(|memory| memory.id != contradiction.id));

        let contradictions = repo
            .get_contradictions_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(contradictions[0].id, contradiction.id);
    }
3603
    /// Distilling raw activity into a summary must archive/deactivate the
    /// source rows and link them to the summary via "source" lineage.
    #[tokio::test]
    async fn test_store_distilled_summary_archives_sources_and_records_lineage() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source1 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw event 1",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &["raw-activity".to_string()],
                metadata: &serde_json::json!({"raw_activity": true}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let source2 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw event 2",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &["raw-activity".to_string()],
                metadata: &serde_json::json!({"raw_activity": true}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let summary = repo
            .store_distilled_summary(
                StoreMemoryParams {
                    namespace_id: ns_id,
                    content: "distilled summary",
                    category: &Category::Session,
                    memory_lane_type: None,
                    labels: &["activity-summary".to_string()],
                    metadata: &serde_json::json!({"pipeline": "distill-v1"}),
                    embedding: None,
                    embedding_model: None,
                },
                &[source1.id, source2.id],
            )
            .await
            .unwrap();

        // Sources are archived and deactivated once distilled.
        let source1_after = repo.get_by_id(source1.id).await.unwrap().unwrap();
        let source2_after = repo.get_by_id(source2.id).await.unwrap().unwrap();
        assert!(!source1_after.is_active);
        assert!(source1_after.is_archived);
        assert!(!source2_after.is_active);
        assert!(source2_after.is_archived);

        let lineage = repo.load_lineage(summary.id).await.unwrap();
        assert_eq!(lineage.len(), 2);
        assert!(lineage.iter().all(|entry| entry.evidence_role == "source"));
    }
3666
    /// Loading lineage for an id with no evidence rows yields an empty list,
    /// not an error.
    #[tokio::test]
    async fn test_load_lineage_empty() {
        let pool = setup_test_db().await;
        let _ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let lineage = repo.load_lineage(9999).await.unwrap();
        assert!(lineage.is_empty());
    }
3676
    /// Raw-activity noise is hidden from the default recent-perspective
    /// query but visible when the `include_raw`-style opts flag is set.
    #[tokio::test]
    async fn test_recent_perspective_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "clean observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity memory at the same perspective: should be filtered.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise payload",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let recent = repo
            .get_recent_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].content, "clean observation");

        // Opting into raw rows surfaces the noise memory as well.
        let recent_all = repo
            .get_recent_by_perspective_opts(ns_id, &perspective, 10, true)
            .await
            .unwrap();
        assert_eq!(recent_all.len(), 2);
    }
3742
    // Semantic candidate retrieval must scope results to the requested
    // perspective and drop raw-activity noise when `include_raw` is false.
    #[tokio::test]
    async fn test_semantic_candidates_respect_perspective_and_raw_noise_filtering() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Embedded, explicit-level memory that matches the query perspective.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "clean semantic observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: Some(&[0.1_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        // Same perspective but marked as raw activity: must be filtered out.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw semantic noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "s1",
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.2_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        // Clean memory observed by a different agent ("codex"): must be
        // excluded by the perspective filter.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "other perspective semantic",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "codex",
                    "subject": "codex",
                    "session_key": "s1",
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.3_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        let candidates = repo
            .get_semantic_candidates(SemanticCandidateParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                limit: 10,
                include_raw: false,
            })
            .await
            .unwrap();

        // Only the clean, perspective-matching memory survives both filters.
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].content, "clean semantic observation");
    }
3819
3820 #[tokio::test]
3821 async fn test_semantic_candidates_match_session_keys_array() {
3822 let pool = setup_test_db().await;
3823 let ns_id = create_namespace(&pool, "test-agent").await;
3824 let repo = MemoryRepository::new(pool);
3825 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s-array".into()));
3826
3827 repo.store(StoreMemoryParams {
3828 namespace_id: ns_id,
3829 content: "session array semantic observation",
3830 category: &Category::Facts,
3831 memory_lane_type: None,
3832 labels: &[],
3833 metadata: &serde_json::json!({
3834 "cognitive": {
3835 "level": "explicit",
3836 "observer": "claude-code",
3837 "subject": "claude-code",
3838 "session_keys": ["s-array", "s-other"],
3839 "generated_by": "test"
3840 }
3841 }),
3842 embedding: Some(&[0.4_f32; 384]),
3843 embedding_model: Some("mock"),
3844 })
3845 .await
3846 .unwrap();
3847
3848 let candidates = repo
3849 .get_semantic_candidates(SemanticCandidateParams {
3850 namespace_id: ns_id,
3851 perspective: Some(&perspective),
3852 limit: 10,
3853 include_raw: false,
3854 })
3855 .await
3856 .unwrap();
3857
3858 assert_eq!(candidates.len(), 1);
3859 assert_eq!(candidates[0].content, "session array semantic observation");
3860 }
3861
    // "Most reinforced" perspective reads must skip raw-activity rows even
    // when those rows share the same observer/subject/session.
    #[tokio::test]
    async fn test_reinforced_perspective_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Derived memory with 5 reinforcements: the expected survivor.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "reinforced insight",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 5, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity noise under the same perspective.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let reinforced = repo
            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(reinforced.len(), 1);
        assert_eq!(reinforced[0].content, "reinforced insight");
    }
3915
    // Contradiction queries for a perspective must return real
    // contradiction-level memories only, never raw-activity rows.
    #[tokio::test]
    async fn test_contradictions_perspective_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Contradiction-level memory with 3 recorded contradictions.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "a real contradiction",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 3),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity noise under the same perspective.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let contradictions = repo
            .get_contradictions_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(contradictions[0].content, "a real contradiction");
    }
3969
    // The working set should include explicit, derived, and contradiction
    // memories for a perspective while omitting raw-level entries when
    // `include_raw` is false.
    #[tokio::test]
    async fn test_search_working_set_basic() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Raw-level memory; should not be part of the working set below.
        let _raw = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "explicit fact",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 3, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let derived = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "derived insight",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 8, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let contradiction = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "contradiction",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 2),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let result = repo
            .search_working_set(WorkingSetParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                max_items: 20,
                include_raw: false,
            })
            .await
            .unwrap();

        // At least the three non-raw memories must be present.
        assert!(result.len() >= 3);
        let ids: Vec<i64> = result.iter().map(|m| m.id).collect();
        assert!(ids.contains(&explicit.id));
        assert!(ids.contains(&derived.id));
        assert!(ids.contains(&contradiction.id));
    }
4054
4055 #[tokio::test]
4056 async fn test_search_working_set_dedupes() {
4057 let pool = setup_test_db().await;
4058 let ns_id = create_namespace(&pool, "test-agent").await;
4059 let repo = MemoryRepository::new(pool);
4060 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4061
4062 let shared = repo
4064 .store(StoreMemoryParams {
4065 namespace_id: ns_id,
4066 content: "shared memory",
4067 category: &Category::Facts,
4068 memory_lane_type: None,
4069 labels: &[],
4070 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 10, 0),
4071 embedding: None,
4072 embedding_model: None,
4073 })
4074 .await
4075 .unwrap();
4076
4077 let result = repo
4078 .search_working_set(WorkingSetParams {
4079 namespace_id: ns_id,
4080 perspective: Some(&perspective),
4081 max_items: 20,
4082 include_raw: false,
4083 })
4084 .await
4085 .unwrap();
4086
4087 let count = result.iter().filter(|m| m.id == shared.id).count();
4088 assert_eq!(count, 1, "shared memory should appear exactly once");
4089 }
4090
4091 #[tokio::test]
4092 async fn test_search_working_set_respects_max_items() {
4093 let pool = setup_test_db().await;
4094 let ns_id = create_namespace(&pool, "test-agent").await;
4095 let repo = MemoryRepository::new(pool);
4096 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4097
4098 for i in 0..10 {
4099 let content = format!("memory {}", i);
4100 repo.store(StoreMemoryParams {
4101 namespace_id: ns_id,
4102 content: &content,
4103 category: &Category::Facts,
4104 memory_lane_type: None,
4105 labels: &[],
4106 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, i as i64, 0),
4107 embedding: None,
4108 embedding_model: None,
4109 })
4110 .await
4111 .unwrap();
4112 }
4113
4114 let result = repo
4115 .search_working_set(WorkingSetParams {
4116 namespace_id: ns_id,
4117 perspective: Some(&perspective),
4118 max_items: 3,
4119 include_raw: false,
4120 })
4121 .await
4122 .unwrap();
4123
4124 assert_eq!(result.len(), 3);
4125 }
4126
4127 #[tokio::test]
4128 async fn test_search_working_set_excludes_raw_noise() {
4129 let pool = setup_test_db().await;
4130 let ns_id = create_namespace(&pool, "test-agent").await;
4131 let repo = MemoryRepository::new(pool);
4132 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4133
4134 repo.store(StoreMemoryParams {
4135 namespace_id: ns_id,
4136 content: "real observation",
4137 category: &Category::Facts,
4138 memory_lane_type: None,
4139 labels: &[],
4140 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
4141 embedding: None,
4142 embedding_model: None,
4143 })
4144 .await
4145 .unwrap();
4146
4147 repo.store(StoreMemoryParams {
4148 namespace_id: ns_id,
4149 content: "raw noise",
4150 category: &Category::Session,
4151 memory_lane_type: None,
4152 labels: &["raw-activity".to_string()],
4153 metadata: &serde_json::json!({"raw_activity": true, "cognitive": {"level": "raw"}}),
4154 embedding: None,
4155 embedding_model: None,
4156 })
4157 .await
4158 .unwrap();
4159
4160 let result = repo
4161 .search_working_set(WorkingSetParams {
4162 namespace_id: ns_id,
4163 perspective: Some(&perspective),
4164 max_items: 20,
4165 include_raw: false,
4166 })
4167 .await
4168 .unwrap();
4169
4170 assert!(result.iter().all(|m| m.content != "raw noise"));
4171 assert!(result.iter().any(|m| m.content == "real observation"));
4172 }
4173
4174 #[tokio::test]
4175 async fn test_search_working_set_without_perspective() {
4176 let pool = setup_test_db().await;
4177 let ns_id = create_namespace(&pool, "test-agent").await;
4178 let repo = MemoryRepository::new(pool);
4179
4180 repo.store(StoreMemoryParams {
4181 namespace_id: ns_id,
4182 content: "namespace memory one",
4183 category: &Category::Facts,
4184 memory_lane_type: None,
4185 labels: &[],
4186 metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4187 embedding: None,
4188 embedding_model: None,
4189 })
4190 .await
4191 .unwrap();
4192
4193 repo.store(StoreMemoryParams {
4194 namespace_id: ns_id,
4195 content: "namespace memory two",
4196 category: &Category::Facts,
4197 memory_lane_type: None,
4198 labels: &[],
4199 metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4200 embedding: None,
4201 embedding_model: None,
4202 })
4203 .await
4204 .unwrap();
4205
4206 let result = repo
4207 .search_working_set(WorkingSetParams {
4208 namespace_id: ns_id,
4209 perspective: None,
4210 max_items: 20,
4211 include_raw: false,
4212 })
4213 .await
4214 .unwrap();
4215
4216 assert!(result.len() >= 2);
4217 }
4218
4219 #[tokio::test]
4220 async fn test_list_by_session_key_matches_session_keys_array() {
4221 let pool = setup_test_db().await;
4222 let ns_id = create_namespace(&pool, "test-agent").await;
4223 let repo = MemoryRepository::new(pool);
4224
4225 repo.store(StoreMemoryParams {
4226 namespace_id: ns_id,
4227 content: "shared explicit memory",
4228 category: &Category::Facts,
4229 memory_lane_type: None,
4230 labels: &[],
4231 metadata: &serde_json::json!({
4232 "cognitive": {
4233 "level": "explicit",
4234 "session_key": "session-b",
4235 "session_keys": ["session-a", "session-b"]
4236 }
4237 }),
4238 embedding: None,
4239 embedding_model: None,
4240 })
4241 .await
4242 .unwrap();
4243
4244 let session_a = repo
4245 .list_by_session_key(ns_id, "session-a", 10, false)
4246 .await
4247 .unwrap();
4248 let session_b = repo
4249 .list_by_session_key(ns_id, "session-b", 10, false)
4250 .await
4251 .unwrap();
4252
4253 assert_eq!(session_a.len(), 1);
4254 assert_eq!(session_b.len(), 1);
4255 }
4256
4257 #[tokio::test]
4258 async fn test_count_evidence_returns_zero_for_empty_namespace() {
4259 let pool = setup_test_db().await;
4260 let ns_id = create_namespace(&pool, "test-agent").await;
4261 let repo = MemoryRepository::new(pool);
4262
4263 let count = repo.count_evidence(ns_id).await.unwrap();
4264 assert_eq!(count, 0);
4265 }
4266
4267 #[tokio::test]
4268 async fn test_count_evidence_counts_lineage_edges() {
4269 let pool = setup_test_db().await;
4270 let ns_id = create_namespace(&pool, "test-agent").await;
4271 let repo = MemoryRepository::new(pool);
4272
4273 let source = repo
4274 .store(StoreMemoryParams {
4275 namespace_id: ns_id,
4276 content: "source memory",
4277 category: &Category::Session,
4278 memory_lane_type: None,
4279 labels: &[],
4280 metadata: &serde_json::json!({}),
4281 embedding: None,
4282 embedding_model: None,
4283 })
4284 .await
4285 .unwrap();
4286
4287 let _derived = repo
4288 .store_with_lineage(StoreMemoryWithLineageParams {
4289 store: StoreMemoryParams {
4290 namespace_id: ns_id,
4291 content: "derived with evidence",
4292 category: &Category::Facts,
4293 memory_lane_type: None,
4294 labels: &[],
4295 metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
4296 embedding: None,
4297 embedding_model: None,
4298 },
4299 source_memory_ids: &[source.id],
4300 evidence_role: "source",
4301 })
4302 .await
4303 .unwrap();
4304
4305 let count = repo.count_evidence(ns_id).await.unwrap();
4306 assert_eq!(count, 1);
4307 }
4308
4309 #[tokio::test]
4310 async fn test_count_evidence_does_not_count_other_namespace() {
4311 let pool = setup_test_db().await;
4312 let ns_a = create_namespace(&pool, "agent-a").await;
4313 let ns_b = create_namespace(&pool, "agent-b").await;
4314 let repo = MemoryRepository::new(pool);
4315
4316 let source = repo
4317 .store(StoreMemoryParams {
4318 namespace_id: ns_a,
4319 content: "source in ns-a",
4320 category: &Category::Session,
4321 memory_lane_type: None,
4322 labels: &[],
4323 metadata: &serde_json::json!({}),
4324 embedding: None,
4325 embedding_model: None,
4326 })
4327 .await
4328 .unwrap();
4329
4330 let _derived = repo
4331 .store_with_lineage(StoreMemoryWithLineageParams {
4332 store: StoreMemoryParams {
4333 namespace_id: ns_a,
4334 content: "derived in ns-a",
4335 category: &Category::Facts,
4336 memory_lane_type: None,
4337 labels: &[],
4338 metadata: &serde_json::json!({}),
4339 embedding: None,
4340 embedding_model: None,
4341 },
4342 source_memory_ids: &[source.id],
4343 evidence_role: "source",
4344 })
4345 .await
4346 .unwrap();
4347
4348 assert_eq!(repo.count_evidence(ns_a).await.unwrap(), 1);
4349 assert_eq!(repo.count_evidence(ns_b).await.unwrap(), 0);
4350 }
4351
4352 #[tokio::test]
4353 async fn test_count_by_cognitive_level_returns_matching_total() {
4354 let pool = setup_test_db().await;
4355 let ns_id = create_namespace(&pool, "level-counts").await;
4356 let repo = MemoryRepository::new(pool);
4357
4358 for (content, level) in [
4359 ("raw event", CognitiveLevel::Raw),
4360 ("derived insight", CognitiveLevel::Derived),
4361 ("derived insight 2", CognitiveLevel::Derived),
4362 ("contradiction note", CognitiveLevel::Contradiction),
4363 ] {
4364 repo.store(StoreMemoryParams {
4365 namespace_id: ns_id,
4366 content,
4367 category: &Category::Session,
4368 memory_lane_type: None,
4369 labels: &[],
4370 metadata: &serde_json::json!({
4371 "cognitive": {
4372 "level": level.as_str(),
4373 "observer": "claude-code",
4374 "subject": "claude-code",
4375 "generated_by": "test"
4376 }
4377 }),
4378 embedding: None,
4379 embedding_model: None,
4380 })
4381 .await
4382 .unwrap();
4383 }
4384
4385 assert_eq!(
4386 repo.count_by_cognitive_level(ns_id, CognitiveLevel::Derived)
4387 .await
4388 .unwrap(),
4389 2
4390 );
4391 assert_eq!(
4392 repo.count_by_cognitive_level(ns_id, CognitiveLevel::Contradiction)
4393 .await
4394 .unwrap(),
4395 1
4396 );
4397 }
4398
    // The perspective filter must be applied in the query itself (before the
    // LIMIT), so a small limit still returns only rows from the requested
    // perspective rather than a truncated mixed set.
    #[tokio::test]
    async fn test_get_by_cognitive_level_with_perspective_filters_before_limit() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "perspective-limit").await;
        let repo = MemoryRepository::new(pool);

        let perspective_a = PerspectiveKey::new("alice", "project-x", None);
        let perspective_b = PerspectiveKey::new("bob", "project-y", None);

        // Five explicit memories attributed to alice/project-x.
        for i in 0..5 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("alice memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_a, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        // Five more attributed to bob/project-y in the same namespace.
        for i in 0..5 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("bob memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_b, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        // Limit 3 < alice's 5 rows: all three results must still be alice's.
        let alice_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective_a,
                3,
            )
            .await
            .unwrap();
        assert_eq!(alice_results.len(), 3);
        assert!(alice_results.iter().all(|m| {
            let meta = &m.metadata;
            let obs = meta
                .get("cognitive")
                .and_then(|c| c.get("observer"))
                .and_then(|v| v.as_str());
            let sub = meta
                .get("cognitive")
                .and_then(|c| c.get("subject"))
                .and_then(|v| v.as_str());
            obs == Some("alice") && sub == Some("project-x")
        }));

        // A generous limit returns exactly alice's 5 rows, never bob's.
        let alice_many = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective_a,
                10,
            )
            .await
            .unwrap();
        assert_eq!(alice_many.len(), 5);

        // Symmetric check for bob's perspective.
        let bob_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective_b,
                3,
            )
            .await
            .unwrap();
        assert_eq!(bob_results.len(), 3);
        assert!(bob_results.iter().all(|m| {
            let meta = &m.metadata;
            let obs = meta
                .get("cognitive")
                .and_then(|c| c.get("observer"))
                .and_then(|v| v.as_str());
            let sub = meta
                .get("cognitive")
                .and_then(|c| c.get("subject"))
                .and_then(|v| v.as_str());
            obs == Some("bob") && sub == Some("project-y")
        }));
    }
4503
    // Two perspectives differing only in their scalar session key must each
    // see only their own session's rows.
    #[tokio::test]
    async fn test_get_by_cognitive_level_with_perspective_respects_session_key() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "session-key-scalar").await;
        let repo = MemoryRepository::new(pool);

        let perspective_s1 =
            PerspectiveKey::new("alice", "project-x", Some("session-1".to_string()));
        let perspective_s2 =
            PerspectiveKey::new("alice", "project-x", Some("session-2".to_string()));

        // Three derived memories in session-1.
        for i in 0..3 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("s1 memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s1, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }
        // Three more in session-2 under the same observer/subject.
        for i in 0..3 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("s2 memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s2, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        let s1_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Derived,
                &perspective_s1,
                10,
            )
            .await
            .unwrap();
        assert_eq!(s1_results.len(), 3);
        assert!(s1_results.iter().all(|m| m.content.starts_with("s1")));

        let s2_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Derived,
                &perspective_s2,
                10,
            )
            .await
            .unwrap();
        assert_eq!(s2_results.len(), 3);
        assert!(s2_results.iter().all(|m| m.content.starts_with("s2")));
    }
4569
    // Session matching must accept either the scalar `session_key` or any
    // element of the `session_keys` array; a row matching neither is excluded.
    #[tokio::test]
    async fn test_get_by_cognitive_level_with_perspective_matches_session_keys_array() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "session-keys-array").await;
        let repo = MemoryRepository::new(pool);

        let perspective = PerspectiveKey::new("alice", "project-x", Some("session-a".to_string()));

        // Matches via the scalar session_key written by cognitive_metadata.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "scalar match",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Scalar key differs, but "session-a" is present in session_keys.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "array match",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "alice",
                    "subject": "project-x",
                    "session_key": "session-other",
                    "session_keys": ["session-a", "session-b"],
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Neither the scalar key nor the array contains "session-a".
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "no match",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "alice",
                    "subject": "project-x",
                    "session_key": "session-other",
                    "session_keys": ["session-z"],
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective,
                10,
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        let contents: Vec<_> = results.iter().map(|m| m.content.as_str()).collect();
        assert!(contents.contains(&"scalar match"));
        assert!(contents.contains(&"array match"));
    }
4653
4654 #[tokio::test]
4655 async fn test_record_metric_and_latest_metrics_for_namespace() {
4656 let pool = setup_test_db().await;
4657 let ns_id = create_namespace(&pool, "metric-ns").await;
4658 let other_ns = create_namespace(&pool, "metric-other").await;
4659 let repo = MemoryRepository::new(pool);
4660
4661 repo.record_metric(
4662 "cognition.query.total_ms",
4663 12.5,
4664 &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4665 )
4666 .await
4667 .unwrap();
4668 repo.record_metric(
4669 "cognition.query.total_ms",
4670 18.0,
4671 &serde_json::json!({"namespace_id": other_ns, "stage": "total", "unit": "ms"}),
4672 )
4673 .await
4674 .unwrap();
4675 repo.record_metric(
4676 "cognition.representation.total_ms",
4677 4.0,
4678 &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4679 )
4680 .await
4681 .unwrap();
4682
4683 let metrics = repo
4684 .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4685 .await
4686 .unwrap();
4687
4688 assert_eq!(metrics.len(), 2);
4689 assert!(metrics
4690 .iter()
4691 .all(|metric| metric.labels.contains(&ns_id.to_string())));
4692 assert!(metrics
4693 .iter()
4694 .any(|metric| metric.metric_name == "cognition.query.total_ms"));
4695 assert!(metrics
4696 .iter()
4697 .any(|metric| metric.metric_name == "cognition.representation.total_ms"));
4698 assert!(metrics
4699 .iter()
4700 .all(|metric| { metric.metric_name.starts_with("cognition.") }));
4701 }
4702
4703 #[tokio::test]
4704 async fn test_record_metrics_batch_persists_all_samples() {
4705 let pool = setup_test_db().await;
4706 let ns_id = create_namespace(&pool, "metric-batch").await;
4707 let repo = MemoryRepository::new(pool);
4708
4709 repo.record_metrics_batch(&[
4710 MetricSample {
4711 metric_name: "cognition.query.total_ms".to_string(),
4712 metric_value: 9.5,
4713 labels: serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4714 },
4715 MetricSample {
4716 metric_name: "cognition.query.answer.total_tokens".to_string(),
4717 metric_value: 128.0,
4718 labels: serde_json::json!({"namespace_id": ns_id, "stage": "answer", "unit": "tokens"}),
4719 },
4720 ])
4721 .await
4722 .unwrap();
4723
4724 let metrics = repo
4725 .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4726 .await
4727 .unwrap();
4728
4729 assert_eq!(metrics.len(), 2);
4730 }
4731
4732 #[tokio::test]
4735 async fn test_list_jobs_returns_enqueued_jobs() {
4736 let pool = setup_test_db().await;
4737 let ns_id = create_namespace(&pool, "obs-jobs").await;
4738 let repo = MemoryRepository::new(pool);
4739
4740 repo.enqueue_job(EnqueueJobParams {
4741 namespace_id: ns_id,
4742 job_type: "derive",
4743 priority: 10,
4744 perspective: None,
4745 payload: &serde_json::json!({"a": 1}),
4746 })
4747 .await
4748 .unwrap();
4749
4750 repo.enqueue_job(EnqueueJobParams {
4751 namespace_id: ns_id,
4752 job_type: "digest",
4753 priority: 5,
4754 perspective: None,
4755 payload: &serde_json::json!({"b": 2}),
4756 })
4757 .await
4758 .unwrap();
4759
4760 let all = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
4762 assert_eq!(all.len(), 2);
4763
4764 let derive_only = repo
4766 .list_jobs(ns_id, Some("derive"), None, 50, 0)
4767 .await
4768 .unwrap();
4769 assert_eq!(derive_only.len(), 1);
4770 assert_eq!(derive_only[0].job_type, "derive");
4771
4772 let pending = repo
4774 .list_jobs(ns_id, None, Some("pending"), 50, 0)
4775 .await
4776 .unwrap();
4777 assert_eq!(pending.len(), 2);
4778
4779 let digest_pending = repo
4781 .list_jobs(ns_id, Some("digest"), Some("pending"), 50, 0)
4782 .await
4783 .unwrap();
4784 assert_eq!(digest_pending.len(), 1);
4785 }
4786
4787 #[tokio::test]
4788 async fn test_list_jobs_respects_limit_offset() {
4789 let pool = setup_test_db().await;
4790 let ns_id = create_namespace(&pool, "obs-limit").await;
4791 let repo = MemoryRepository::new(pool);
4792
4793 for i in 0..5 {
4794 repo.enqueue_job(EnqueueJobParams {
4795 namespace_id: ns_id,
4796 job_type: "derive",
4797 priority: i,
4798 perspective: None,
4799 payload: &serde_json::json!({"i": i}),
4800 })
4801 .await
4802 .unwrap();
4803 }
4804
4805 let page1 = repo.list_jobs(ns_id, None, None, 2, 0).await.unwrap();
4806 assert_eq!(page1.len(), 2);
4807
4808 let page2 = repo.list_jobs(ns_id, None, None, 2, 2).await.unwrap();
4809 assert_eq!(page2.len(), 2);
4810
4811 let page3 = repo.list_jobs(ns_id, None, None, 2, 4).await.unwrap();
4812 assert_eq!(page3.len(), 1);
4813 }
4814
4815 #[tokio::test]
4816 async fn test_count_jobs_by_status() {
4817 let pool = setup_test_db().await;
4818 let ns_id = create_namespace(&pool, "obs-count").await;
4819 let repo = MemoryRepository::new(pool);
4820
4821 repo.enqueue_job(EnqueueJobParams {
4822 namespace_id: ns_id,
4823 job_type: "derive",
4824 priority: 10,
4825 perspective: None,
4826 payload: &serde_json::json!({}),
4827 })
4828 .await
4829 .unwrap();
4830
4831 repo.enqueue_job(EnqueueJobParams {
4832 namespace_id: ns_id,
4833 job_type: "derive",
4834 priority: 5,
4835 perspective: None,
4836 payload: &serde_json::json!({}),
4837 })
4838 .await
4839 .unwrap();
4840
4841 repo.enqueue_job(EnqueueJobParams {
4842 namespace_id: ns_id,
4843 job_type: "digest",
4844 priority: 10,
4845 perspective: None,
4846 payload: &serde_json::json!({}),
4847 })
4848 .await
4849 .unwrap();
4850
4851 let all_counts = repo.count_jobs_by_status(ns_id, None).await.unwrap();
4853 let total: i64 = all_counts.iter().map(|(_, c)| c).sum();
4854 assert_eq!(total, 3);
4855
4856 let derive_counts = repo
4858 .count_jobs_by_status(ns_id, Some("derive"))
4859 .await
4860 .unwrap();
4861 let derive_total: i64 = derive_counts.iter().map(|(_, c)| c).sum();
4862 assert_eq!(derive_total, 2);
4863 }
4864
4865 #[tokio::test]
4866 async fn test_count_jobs_respects_filters() {
4867 let pool = setup_test_db().await;
4868 let ns_id = create_namespace(&pool, "obs-job-total").await;
4869 let repo = MemoryRepository::new(pool);
4870
4871 repo.enqueue_job(EnqueueJobParams {
4872 namespace_id: ns_id,
4873 job_type: "derive",
4874 priority: 10,
4875 perspective: None,
4876 payload: &serde_json::json!({"index": 1}),
4877 })
4878 .await
4879 .unwrap();
4880 repo.enqueue_job(EnqueueJobParams {
4881 namespace_id: ns_id,
4882 job_type: "derive",
4883 priority: 5,
4884 perspective: None,
4885 payload: &serde_json::json!({"index": 2}),
4886 })
4887 .await
4888 .unwrap();
4889 repo.enqueue_job(EnqueueJobParams {
4890 namespace_id: ns_id,
4891 job_type: "digest",
4892 priority: 1,
4893 perspective: None,
4894 payload: &serde_json::json!({"index": 3}),
4895 })
4896 .await
4897 .unwrap();
4898
4899 assert_eq!(repo.count_jobs(ns_id, None, None).await.unwrap(), 3);
4900 assert_eq!(
4901 repo.count_jobs(ns_id, Some("derive"), None).await.unwrap(),
4902 2
4903 );
4904 assert_eq!(
4905 repo.count_jobs(ns_id, Some("derive"), Some("pending"))
4906 .await
4907 .unwrap(),
4908 2
4909 );
4910 assert_eq!(
4911 repo.count_jobs(ns_id, Some("reflect"), Some("pending"))
4912 .await
4913 .unwrap(),
4914 0
4915 );
4916 }
4917
4918 #[tokio::test]
4919 async fn test_list_digests_and_count() {
4920 let pool = setup_test_db().await;
4921 let ns_id = create_namespace(&pool, "obs-digests").await;
4922 let repo = MemoryRepository::new(pool);
4923
4924 let mem = repo
4926 .store(StoreMemoryParams {
4927 namespace_id: ns_id,
4928 content: "digest content",
4929 category: &Category::Session,
4930 memory_lane_type: None,
4931 labels: &[],
4932 metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
4933 embedding: None,
4934 embedding_model: None,
4935 })
4936 .await
4937 .unwrap();
4938
4939 repo.store_digest(StoreDigestParams {
4940 namespace_id: ns_id,
4941 session_key: "session-1",
4942 digest_kind: "short",
4943 memory_id: mem.id,
4944 start_memory_id: Some(1),
4945 end_memory_id: Some(10),
4946 token_count: 50,
4947 })
4948 .await
4949 .unwrap();
4950
4951 repo.store_digest(StoreDigestParams {
4952 namespace_id: ns_id,
4953 session_key: "session-2",
4954 digest_kind: "long",
4955 memory_id: mem.id,
4956 start_memory_id: Some(11),
4957 end_memory_id: Some(20),
4958 token_count: 100,
4959 })
4960 .await
4961 .unwrap();
4962
4963 let all = repo.list_digests(ns_id, None, 50, 0).await.unwrap();
4965 assert_eq!(all.len(), 2);
4966
4967 let total = repo.count_digests(ns_id, None).await.unwrap();
4968 assert_eq!(total, 2);
4969
4970 let sess1 = repo
4972 .list_digests(ns_id, Some("session-1"), 50, 0)
4973 .await
4974 .unwrap();
4975 assert_eq!(sess1.len(), 1);
4976 assert_eq!(sess1[0].session_key, "session-1");
4977
4978 let sess1_count = repo.count_digests(ns_id, Some("session-1")).await.unwrap();
4979 assert_eq!(sess1_count, 1);
4980
4981 let none = repo
4983 .list_digests(ns_id, Some("session-none"), 50, 0)
4984 .await
4985 .unwrap();
4986 assert!(none.is_empty());
4987
4988 let none_count = repo
4989 .count_digests(ns_id, Some("session-none"))
4990 .await
4991 .unwrap();
4992 assert_eq!(none_count, 0);
4993 }
4994
4995 #[tokio::test]
5000 async fn test_row_to_memory_rejects_malformed_labels() {
5001 let pool = setup_test_db().await;
5002 let ns_id = create_namespace(&pool, "test-agent").await;
5003 let repo = MemoryRepository::new(pool);
5004
5005 let memory = repo
5007 .store(StoreMemoryParams {
5008 namespace_id: ns_id,
5009 content: "corruption test labels",
5010 category: &Category::General,
5011 memory_lane_type: None,
5012 labels: &["valid-label".to_string()],
5013 metadata: &serde_json::Value::Null,
5014 embedding: None,
5015 embedding_model: None,
5016 })
5017 .await
5018 .unwrap();
5019
5020 sqlx::query("UPDATE memories SET labels = 'NOT VALID JSON{{{' WHERE id = ?")
5022 .bind(memory.id)
5023 .execute(repo.pool())
5024 .await
5025 .unwrap();
5026
5027 let err = repo.get_by_id(memory.id).await.unwrap_err();
5028 let msg = err.to_string();
5029 assert!(
5030 msg.contains("corrupted labels JSON"),
5031 "expected labels corruption error, got: {msg}"
5032 );
5033 assert!(msg.contains(&memory.id.to_string()));
5034 }
5035
5036 #[tokio::test]
5039 async fn test_row_to_memory_rejects_malformed_metadata() {
5040 let pool = setup_test_db().await;
5041 let repo = MemoryRepository::new(pool);
5042
5043 let row = MemoryRow {
5046 id: 999,
5047 namespace_id: 1,
5048 content: "test".to_string(),
5049 category: "general".to_string(),
5050 memory_lane_type: None,
5051 labels: "[]".to_string(),
5052 metadata: "[truncated".to_string(), similarity_score: None,
5054 relevance_score: None,
5055 content_embedding: None,
5056 embedding_model: None,
5057 created_at: Utc::now(),
5058 updated_at: None,
5059 last_accessed: None,
5060 is_active: true,
5061 is_archived: false,
5062 access_count: 0,
5063 };
5064
5065 let err = repo.row_to_memory(row).unwrap_err();
5066 let msg = err.to_string();
5067 assert!(
5068 msg.contains("corrupted metadata JSON"),
5069 "expected metadata corruption error, got: {msg}"
5070 );
5071 }
5072
5073 #[tokio::test]
5076 async fn test_row_to_memory_rejects_malformed_embedding() {
5077 let pool = setup_test_db().await;
5078 let ns_id = create_namespace(&pool, "test-agent").await;
5079 let repo = MemoryRepository::new(pool);
5080
5081 let memory = repo
5082 .store(StoreMemoryParams {
5083 namespace_id: ns_id,
5084 content: "corruption test embedding",
5085 category: &Category::General,
5086 memory_lane_type: None,
5087 labels: &[],
5088 metadata: &serde_json::Value::Null,
5089 embedding: Some(&[0.1, 0.2, 0.3]),
5090 embedding_model: Some("test-model"),
5091 })
5092 .await
5093 .unwrap();
5094
5095 sqlx::query("UPDATE memories SET content_embedding = 'not-an-array' WHERE id = ?")
5096 .bind(memory.id)
5097 .execute(repo.pool())
5098 .await
5099 .unwrap();
5100
5101 let err = repo.get_by_id(memory.id).await.unwrap_err();
5102 let msg = err.to_string();
5103 assert!(
5104 msg.contains("corrupted embedding JSON"),
5105 "expected embedding corruption error, got: {msg}"
5106 );
5107 }
5108
5109 #[tokio::test]
5112 async fn test_claim_jobs_rejects_malformed_payload() {
5113 let pool = setup_test_db().await;
5114 let ns_id = create_namespace(&pool, "test-agent").await;
5115 let repo = MemoryRepository::new(pool);
5116
5117 sqlx::query(
5119 r#"
5120 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5121 VALUES (?, 'derive_memory', 'pending', 100, '{INVALID_JSON}', datetime('now'), datetime('now'))
5122 "#,
5123 )
5124 .bind(ns_id)
5125 .execute(repo.pool())
5126 .await
5127 .unwrap();
5128
5129 let claimed = repo
5131 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5132 .await
5133 .unwrap();
5134 assert!(
5135 claimed.is_empty(),
5136 "corrupt payload job should not be returned"
5137 );
5138
5139 let status: String =
5141 sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5142 .bind(ns_id)
5143 .fetch_one(repo.pool())
5144 .await
5145 .unwrap();
5146 assert_eq!(status, "failed", "corrupt job should be permanently failed");
5147
5148 let last_error: Option<String> =
5149 sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5150 .bind(ns_id)
5151 .fetch_one(repo.pool())
5152 .await
5153 .unwrap();
5154 assert!(
5155 last_error
5156 .unwrap_or_default()
5157 .contains("corrupted payload JSON"),
5158 "last_error should mention payload corruption"
5159 );
5160 }
5161
    #[tokio::test]
    async fn test_claim_jobs_rejects_malformed_perspective() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Insert a pending job directly via SQL, bypassing enqueue_job, with
        // invalid perspective_json but a valid payload_json — isolating the
        // perspective column as the corruption source.
        sqlx::query(
            r#"
            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
            VALUES (?, 'derive_memory', 'pending', 100, '{BOGUS}', '{"ok": true}', datetime('now'), datetime('now'))
            "#,
        )
        .bind(ns_id)
        .execute(repo.pool())
        .await
        .unwrap();

        // The corrupt job must not be surfaced to the claiming worker.
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
            .await
            .unwrap();
        assert!(
            claimed.is_empty(),
            "corrupt perspective job should not be returned"
        );

        // It should be marked permanently failed, not left pending to retry forever.
        let status: String =
            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
                .bind(ns_id)
                .fetch_one(repo.pool())
                .await
                .unwrap();
        assert_eq!(status, "failed", "corrupt job should be permanently failed");

        // last_error records the specific corruption so operators can diagnose it.
        let last_error: Option<String> =
            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
                .bind(ns_id)
                .fetch_one(repo.pool())
                .await
                .unwrap();
        assert!(
            last_error
                .unwrap_or_default()
                .contains("corrupted perspective JSON"),
            "last_error should mention perspective corruption"
        );
    }
5214
5215 #[tokio::test]
5218 async fn test_claim_jobs_skips_corrupt_returns_valid() {
5219 let pool = setup_test_db().await;
5220 let ns_id = create_namespace(&pool, "test-agent").await;
5221 let repo = MemoryRepository::new(pool);
5222
5223 let p1 = serde_json::json!({"memory_id": 1});
5225 repo.enqueue_job(EnqueueJobParams {
5226 namespace_id: ns_id,
5227 job_type: "derive_memory",
5228 priority: 100,
5229 perspective: None,
5230 payload: &p1,
5231 })
5232 .await
5233 .unwrap();
5234 let p2 = serde_json::json!({"memory_id": 2});
5235 repo.enqueue_job(EnqueueJobParams {
5236 namespace_id: ns_id,
5237 job_type: "derive_memory",
5238 priority: 50,
5239 perspective: None,
5240 payload: &p2,
5241 })
5242 .await
5243 .unwrap();
5244
5245 sqlx::query(
5247 r#"
5248 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5249 VALUES (?, 'derive_memory', 'pending', 200, '{BROKEN}', datetime('now'), datetime('now'))
5250 "#,
5251 )
5252 .bind(ns_id)
5253 .execute(repo.pool())
5254 .await
5255 .unwrap();
5256
5257 let claimed = repo
5259 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 10)
5260 .await
5261 .unwrap();
5262 assert_eq!(
5263 claimed.len(),
5264 2,
5265 "should return 2 valid jobs, skipping the corrupt one"
5266 );
5267
5268 let failed_count: i64 = sqlx::query_scalar(
5270 "SELECT COUNT(*) FROM memory_jobs WHERE namespace_id = ? AND status = 'failed'",
5271 )
5272 .bind(ns_id)
5273 .fetch_one(repo.pool())
5274 .await
5275 .unwrap();
5276 assert_eq!(failed_count, 1, "corrupt job should be permanently failed");
5277 }
5278
    #[tokio::test]
    async fn test_purge_completed_jobs_removes_old_keeps_recent() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "purge-test").await;
        let repo = MemoryRepository::new(pool);

        // First job: enqueue, claim, and complete it.
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({"old": true}),
        })
        .await
        .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);
        let old_job_id = claimed[0].row.id;

        repo.complete_job(&claimed[0]).await.unwrap();

        // Back-date the completed job's updated_at so it falls outside the
        // retention window used below.
        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
            .bind(old_job_id)
            .execute(repo.pool())
            .await
            .unwrap();

        // Second job: completed just now, so it must survive the purge.
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({"new": true}),
        })
        .await
        .unwrap();

        let claimed2 = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(claimed2.len(), 1);
        repo.complete_job(&claimed2[0]).await.unwrap();

        // Purge completed jobs older than a week: only the back-dated one is removed.
        let cutoff = Utc::now() - chrono::Duration::days(7);
        let deleted = repo.purge_completed_jobs(cutoff).await.unwrap();
        assert_eq!(deleted, 1);

        // The recently completed job is the sole survivor.
        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, claimed2[0].row.id);
    }
5343
    #[tokio::test]
    async fn test_purge_permanently_failed_jobs_removes_old_keeps_recent() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "purge-failed").await;
        let repo = MemoryRepository::new(pool);

        // First job: fail it repeatedly until it becomes permanently failed.
        // NOTE(review): assumes 5 fail_job calls exhaust the retry budget —
        // confirm against the repository's max-attempts constant.
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({"fail_me": true}),
        })
        .await
        .unwrap();

        for _ in 0..5 {
            let claimed = repo
                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
                .await
                .unwrap();
            assert_eq!(claimed.len(), 1);
            repo.fail_job(&claimed[0], "persistent error")
                .await
                .unwrap();
        }

        // Back-date the permanently failed job past the retention cutoff.
        sqlx::query(
            "UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE status = ?",
        )
        .bind(memory_job_status::FAILED)
        .execute(repo.pool())
        .await
        .unwrap();

        // Second job: fail it only twice, leaving retries — it must be kept.
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({"retry_me": true}),
        })
        .await
        .unwrap();

        for _ in 0..2 {
            let claimed = repo
                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
                .await
                .unwrap();
            assert_eq!(claimed.len(), 1);
            repo.fail_job(&claimed[0], "transient error").await.unwrap();
        }

        // Purge removes only the old, permanently failed job.
        let cutoff = Utc::now() - chrono::Duration::days(7);
        let deleted = repo.purge_permanently_failed_jobs(cutoff).await.unwrap();
        assert_eq!(deleted, 1);

        // The retryable job remains, back in pending state awaiting its next attempt.
        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].status, memory_job_status::PENDING);
    }
5412
5413 #[tokio::test]
5415 async fn test_active_leasing_works_after_purge() {
5416 let pool = setup_test_db().await;
5417 let ns_id = create_namespace(&pool, "purge-lease").await;
5418 let repo = MemoryRepository::new(pool);
5419
5420 repo.enqueue_job(EnqueueJobParams {
5422 namespace_id: ns_id,
5423 job_type: "derive_memory",
5424 priority: 10,
5425 perspective: None,
5426 payload: &serde_json::json!({"old": true}),
5427 })
5428 .await
5429 .unwrap();
5430
5431 let claimed = repo
5432 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5433 .await
5434 .unwrap();
5435 repo.complete_job(&claimed[0]).await.unwrap();
5436
5437 sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5438 .bind(claimed[0].row.id)
5439 .execute(repo.pool())
5440 .await
5441 .unwrap();
5442
5443 let cutoff = Utc::now() - chrono::Duration::days(7);
5445 repo.purge_completed_jobs(cutoff).await.unwrap();
5446
5447 repo.enqueue_job(EnqueueJobParams {
5449 namespace_id: ns_id,
5450 job_type: "derive_memory",
5451 priority: 20,
5452 perspective: None,
5453 payload: &serde_json::json!({"fresh": true}),
5454 })
5455 .await
5456 .unwrap();
5457
5458 let fresh_claimed = repo
5459 .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 10)
5460 .await
5461 .unwrap();
5462 assert_eq!(fresh_claimed.len(), 1);
5463 assert_eq!(fresh_claimed[0].row.status, "running");
5464 assert_eq!(fresh_claimed[0].payload["fresh"], true);
5465
5466 repo.complete_job(&fresh_claimed[0]).await.unwrap();
5468
5469 let empty = repo
5471 .claim_jobs(ns_id, "derive_memory", "worker-3", 60, 10)
5472 .await
5473 .unwrap();
5474 assert!(empty.is_empty());
5475 }
5476}