1use std::collections::{HashMap, HashSet};
4
5use crate::models::{
6 memory_job_status, AgentNamespaceRow, ClaimedMemoryJob, EnqueueJobParams, MemoryEvidenceRow,
7 MemoryJobRow, MemoryLineageEntry, MemoryRow, ProcessedFileRow, SessionDigestRow,
8 SystemMetricRow,
9};
10use crate::{db_error, Result};
11use chrono::{DateTime, Utc};
12use nexus_core::{
13 AgentNamespace, CognitiveLevel, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey,
14};
15use sqlx::SqlitePool;
16
/// Local shorthand for the core memory-category enum.
type Category = MemoryCategory;
19
/// Inputs for inserting a single memory row (see [`MemoryRepository::store`]).
pub struct StoreMemoryParams<'a> {
    // Owning namespace row id.
    pub namespace_id: i64,
    // Memory text; duplicate detection compares it case-/whitespace-insensitively.
    pub content: &'a str,
    pub category: &'a Category,
    // Optional lane classification; stored as its string form.
    pub memory_lane_type: Option<&'a MemoryLaneType>,
    // Serialized to JSON on insert.
    pub labels: &'a [String],
    // Serialized to JSON on insert.
    pub metadata: &'a serde_json::Value,
    // Optional embedding vector, serialized to JSON when present.
    pub embedding: Option<&'a [f32]>,
    pub embedding_model: Option<&'a str>,
}
31
/// Inputs for storing a memory together with its evidence lineage edges.
pub struct StoreMemoryWithLineageParams<'a> {
    // Parameters for the memory insert itself.
    pub store: StoreMemoryParams<'a>,
    // Source memories the new memory is derived from; one evidence row each.
    pub source_memory_ids: &'a [i64],
    // Role recorded on every evidence edge (same role for all sources).
    pub evidence_role: &'a str,
}
38
/// Inputs for upserting a session-digest row (see [`MemoryRepository::store_digest`]).
pub struct StoreDigestParams<'a> {
    pub namespace_id: i64,
    pub session_key: &'a str,
    // e.g. "short" / "long" — part of the upsert conflict key.
    pub digest_kind: &'a str,
    // Memory row holding the digest text.
    pub memory_id: i64,
    // Inclusive range of memory ids the digest covers, when known.
    pub start_memory_id: Option<i64>,
    pub end_memory_id: Option<i64>,
    // Estimated token count of the digest; cast to i64 for storage.
    pub token_count: usize,
}
49
/// Result of [`MemoryRepository::session_digest_rollover`]: how much new,
/// digest-worthy material has accumulated since the last digest.
pub struct SessionDigestRollover {
    // Highest end_memory_id of any existing digest for the session, if any.
    pub last_digest_end_memory_id: Option<i64>,
    // Count of qualifying memories created after that id.
    pub new_memory_count: i64,
    // Rough token estimate: sum of ceil(content length / 4) over those rows.
    pub estimated_new_tokens: i64,
}
55
/// Optional filters for memory listing queries.
pub struct ListMemoryFilters<'a> {
    pub category: Option<&'a str>,
    // Created-at time window bounds.
    pub since: Option<DateTime<Utc>>,
    pub until: Option<DateTime<Utc>>,
    // Substring match on content (SQL LIKE pattern fragment — verify at call site).
    pub content_like: Option<&'a str>,
    // When false, raw-activity rows are filtered out (see RAW_ACTIVITY_FILTER_SQL).
    pub include_raw: bool,
    pub limit: i64,
    pub offset: i64,
}
65
/// Inputs for [`MemoryRepository::search_working_set`].
pub struct WorkingSetParams<'a> {
    pub namespace_id: i64,
    // When set, buckets are scoped to this observer/subject (and session).
    pub perspective: Option<&'a PerspectiveKey>,
    // Cap on the merged, deduplicated result (per-bucket fetch is at least 8).
    pub max_items: usize,
    // When false, raw-activity rows are excluded from every bucket.
    pub include_raw: bool,
}
79
/// Inputs for semantic-candidate selection (consumer not visible in this chunk).
pub struct SemanticCandidateParams<'a> {
    pub namespace_id: i64,
    pub perspective: Option<&'a PerspectiveKey>,
    pub limit: i64,
    // When false, raw-activity rows are excluded.
    pub include_raw: bool,
}
91
/// One system-metric observation (name, value, and free-form JSON labels).
#[derive(Debug, Clone)]
pub struct MetricSample {
    pub metric_name: String,
    pub metric_value: f64,
    pub labels: serde_json::Value,
}
99
/// Attempts after which a job is permanently failed instead of requeued.
const MAX_JOB_ATTEMPTS: i64 = 5;

/// Excludes "raw activity" noise rows: neither labeled `raw-activity` nor
/// flagged via `$.raw_activity` in metadata.
const RAW_ACTIVITY_FILTER_SQL: &str =
    "labels NOT LIKE '%raw-activity%' AND json_extract(COALESCE(metadata, '{}'), '$.raw_activity') IS NULL";

/// NULL-safe metadata expression. The filter constants below embed the
/// literal token `METADATA` as a placeholder that callers expand to this
/// expression via `.replace("METADATA", METADATA)`.
const METADATA: &str = "COALESCE(metadata, '{}')";

/// Matches a row whose scalar `$.cognitive.session_key` equals the bound
/// value OR whose `$.cognitive.session_keys` array contains it (2 binds).
const SESSION_KEY_FILTER: &str =
    "(json_extract(METADATA, '$.cognitive.session_key') = ? \
     OR EXISTS (SELECT 1 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]')) WHERE value = ?))";

/// Matches observer + subject of a perspective (2 binds).
const PERSPECTIVE_IDENTITY_FILTER: &str = "json_extract(METADATA, '$.cognitive.observer') = ? \
     AND json_extract(METADATA, '$.cognitive.subject') = ?";

/// Cognitive-level expression. NOTE(review): some call sites use this
/// without expanding the `METADATA` placeholder; SQLite then resolves the
/// bare identifier case-insensitively to the `metadata` column, so the
/// queries still work — but without the COALESCE guard. Confirm intended.
const COGNITIVE_LEVEL_EXPR: &str = "json_extract(METADATA, '$.cognitive.level')";
138
139fn perspective_where_clause(perspective: &PerspectiveKey) -> String {
146 if perspective.session_key.is_some() {
147 format!(
148 "{} AND {}",
149 PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA),
150 SESSION_KEY_FILTER.replace("METADATA", METADATA),
151 )
152 } else {
153 PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA)
154 }
155}
156
157fn bind_perspective(perspective: &PerspectiveKey) -> Vec<&str> {
160 let mut vals = vec![perspective.observer.as_str(), perspective.subject.as_str()];
161 if let Some(ref sk) = perspective.session_key {
162 vals.push(sk.as_str());
163 vals.push(sk.as_str());
164 }
165 vals
166}
167
/// SQLite-backed repository for memories, jobs, digests, and lineage.
pub struct MemoryRepository {
    pool: SqlitePool,
}
172
173impl MemoryRepository {
    /// Create a repository over an existing connection pool.
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
177
    /// Borrow the underlying pool (for callers that need raw queries).
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
181
    /// Insert a memory and return the stored row.
    ///
    /// If the insert reports rowid 0 (presumably an INSERT OR IGNORE /
    /// conflict path — TODO confirm against schema), this falls back to
    /// locating an existing active row with the same normalized content,
    /// merges the new labels/metadata into it, and returns the merged row.
    pub async fn store(&self, params: StoreMemoryParams<'_>) -> Result<Memory> {
        // Serialize collection-valued fields to JSON text columns.
        let labels_json = serde_json::to_string(params.labels)?;
        let metadata_json = serde_json::to_string(params.metadata)?;
        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;

        let result = sqlx::query(
            r#"
            INSERT INTO memories (
                namespace_id, content, category, memory_lane_type, labels, metadata,
                content_embedding, embedding_model, created_at, is_active, access_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.content)
        .bind(params.category.to_string())
        .bind(params.memory_lane_type.map(|t| t.to_string()))
        .bind(&labels_json)
        .bind(&metadata_json)
        .bind(&embedding_json)
        .bind(params.embedding_model)
        .bind(Utc::now())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        let id = result.last_insert_rowid();

        // rowid 0 means no new row was inserted — treat as a likely duplicate.
        if id == 0 {
            // Case-/whitespace-insensitive content match within the namespace.
            let row: Option<MemoryRow> = sqlx::query_as(
                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
            )
            .bind(params.namespace_id)
            .bind(params.content)
            .fetch_optional(&self.pool)
            .await
            .map_err(db_error)?;

            if let Some(existing) = row {
                // Fold the would-be duplicate's labels/metadata into the
                // existing row, then return the refreshed row.
                self.merge_duplicate_memory_context(existing.id, params)
                    .await?;
                return self.get_by_id(existing.id).await?.ok_or_else(|| {
                    nexus_core::NexusError::Storage(format!(
                        "Duplicate merged row {} could not be reloaded",
                        existing.id
                    ))
                });
            }

            // rowid 0 with no duplicate found: unexpected, but recoverable —
            // re-read the row by content instead of failing the caller.
            tracing::warn!(
                namespace_id = params.namespace_id,
                content_length = params.content.len(),
                "Insert returned id 0 but no matching duplicate found - treating as successful insert"
            );

            return self
                .get_by_content(params.namespace_id, params.content)
                .await;
        }

        self.get_by_id(id).await?.ok_or_else(|| {
            nexus_core::NexusError::Storage(format!("Failed to retrieve memory with id {}", id))
        })
    }
256
257 async fn merge_duplicate_memory_context(
258 &self,
259 existing_id: i64,
260 params: StoreMemoryParams<'_>,
261 ) -> Result<()> {
262 let current = self.get_by_id(existing_id).await?.ok_or_else(|| {
263 nexus_core::NexusError::Storage(format!(
264 "Failed to load duplicate-merged memory {}",
265 existing_id
266 ))
267 })?;
268
269 let merged_labels = merge_labels(¤t.labels, params.labels);
270 let merged_metadata = merge_duplicate_metadata(¤t.metadata, params.metadata);
271 let labels_json = serde_json::to_string(&merged_labels)?;
272 let metadata_json = serde_json::to_string(&merged_metadata)?;
273
274 sqlx::query(
275 r#"
276 UPDATE memories
277 SET labels = ?, metadata = ?, updated_at = ?
278 WHERE id = ?
279 "#,
280 )
281 .bind(&labels_json)
282 .bind(&metadata_json)
283 .bind(Utc::now())
284 .bind(existing_id)
285 .execute(&self.pool)
286 .await
287 .map_err(db_error)?;
288
289 Ok(())
290 }
291
    /// Insert a memory and its evidence edges in one transaction.
    ///
    /// Either the memory and all `source_memory_ids` evidence rows commit
    /// together, or nothing does. Returns the stored memory.
    pub async fn store_with_lineage(
        &self,
        params: StoreMemoryWithLineageParams<'_>,
    ) -> Result<Memory> {
        let mut tx = self.pool.begin().await.map_err(db_error)?;
        let memory_id = insert_memory_tx(&mut tx, &params.store).await?;
        // One evidence edge per source, all with the same role.
        for &source_id in params.source_memory_ids {
            insert_evidence_tx(&mut tx, memory_id, source_id, params.evidence_role).await?;
        }
        tx.commit().await.map_err(db_error)?;

        self.get_by_id(memory_id).await?.ok_or_else(|| {
            nexus_core::NexusError::Storage(format!(
                "Failed to retrieve memory with id {} after lineage store",
                memory_id
            ))
        })
    }
311
    /// Enqueue a background memory job in `pending` state; returns the job id.
    pub async fn enqueue_job(&self, params: EnqueueJobParams<'_>) -> Result<i64> {
        // Perspective is optional; payload is always serialized.
        let perspective_json = params.perspective.map(serde_json::to_string).transpose()?;
        let payload_json = serde_json::to_string(params.payload)?;

        let id: i64 = sqlx::query_scalar(
            r#"
            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
            VALUES (?, ?, 'pending', ?, ?, ?, datetime('now'), datetime('now'))
            RETURNING id
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.job_type)
        .bind(params.priority)
        .bind(&perspective_json)
        .bind(&payload_json)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(id)
    }
335
    /// Atomically claim up to `limit` jobs of `job_type` for `lease_owner`.
    ///
    /// Eligible jobs are `pending`, or `running` with an expired lease.
    /// A single CTE-guarded `UPDATE ... RETURNING` claims them, so
    /// concurrent workers cannot claim the same job twice; each claim bumps
    /// `attempts` and stamps a fresh lease expiring after `lease_ttl_secs`.
    /// Jobs whose stored perspective/payload JSON no longer parses are
    /// permanently failed and skipped rather than returned.
    pub async fn claim_jobs(
        &self,
        namespace_id: i64,
        job_type: &str,
        lease_owner: &str,
        lease_ttl_secs: u64,
        limit: i64,
    ) -> Result<Vec<ClaimedMemoryJob>> {
        let claim_token = new_claim_token(lease_owner);
        // Bind order follows placeholder order in the SQL text: the CTE's
        // five placeholders first, then the UPDATE's four.
        let rows: Vec<MemoryJobRow> = sqlx::query_as::<_, MemoryJobRow>(
            r#"
            WITH candidates AS (
                SELECT id
                FROM memory_jobs
                WHERE namespace_id = ? AND job_type = ? AND (
                    status = ?
                    OR (status = ? AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))
                )
                ORDER BY priority DESC, created_at ASC
                LIMIT ?
            )
            UPDATE memory_jobs
            SET status = ?,
                lease_owner = ?,
                claim_token = ?,
                lease_expires_at = datetime('now', '+' || ? || ' seconds'),
                attempts = attempts + 1,
                updated_at = datetime('now')
            WHERE id IN (SELECT id FROM candidates)
            RETURNING *
            "#,
        )
        .bind(namespace_id)
        .bind(job_type)
        .bind(memory_job_status::PENDING)
        .bind(memory_job_status::RUNNING)
        .bind(limit)
        .bind(memory_job_status::RUNNING)
        .bind(lease_owner)
        .bind(&claim_token)
        .bind(lease_ttl_secs as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        // RETURNING does not guarantee row order; restore the claim order
        // (priority DESC, then created_at ASC).
        let mut rows = rows;
        rows.sort_by(|left, right| {
            right
                .priority
                .cmp(&left.priority)
                .then_with(|| left.created_at.cmp(&right.created_at))
        });

        let mut claimed = Vec::with_capacity(rows.len());
        for row in rows {
            // Corrupted JSON columns poison the job permanently (best-effort:
            // the fail's own error is deliberately ignored) and skip it.
            let perspective = match row.perspective_json.as_deref() {
                Some(s) => match serde_json::from_str(s) {
                    Ok(p) => Some(p),
                    Err(e) => {
                        tracing::warn!(
                            job_id = row.id,
                            error = %e,
                            "corrupted perspective JSON, permanently failing job"
                        );
                        let _ = self
                            .permanently_fail_job(
                                row.id,
                                &row.lease_owner,
                                &row.claim_token,
                                &format!("corrupted perspective JSON: {e}"),
                            )
                            .await;
                        continue;
                    }
                },
                None => None,
            };
            let payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
                Ok(p) => p,
                Err(e) => {
                    tracing::warn!(
                        job_id = row.id,
                        error = %e,
                        "corrupted payload JSON, permanently failing job"
                    );
                    let _ = self
                        .permanently_fail_job(
                            row.id,
                            &row.lease_owner,
                            &row.claim_token,
                            &format!("corrupted payload JSON: {e}"),
                        )
                        .await;
                    continue;
                }
            };
            claimed.push(ClaimedMemoryJob {
                row,
                perspective,
                payload,
            });
        }

        Ok(claimed)
    }
448
    /// Mark a claimed job completed and release its lease.
    ///
    /// The UPDATE is guarded by lease_owner + claim_token; if another worker
    /// has since re-claimed the job, zero rows match and an error is returned.
    pub async fn complete_job(&self, job: &ClaimedMemoryJob) -> Result<()> {
        let result = sqlx::query(
            r#"
            UPDATE memory_jobs
            SET status = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
            WHERE id = ?
              AND lease_owner = ?
              AND claim_token = ?
            "#,
        )
        .bind(memory_job_status::COMPLETED)
        .bind(job.row.id)
        .bind(job.row.lease_owner.as_deref())
        .bind(job.row.claim_token.as_deref())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        if result.rows_affected() == 0 {
            return Err(nexus_core::NexusError::Storage(format!(
                "Memory job {} completion lost lease ownership",
                job.row.id
            )));
        }

        Ok(())
    }
477
478 pub async fn fail_job(&self, job: &ClaimedMemoryJob, error: &str) -> Result<()> {
481 let row: Option<MemoryJobRow> = sqlx::query_as("SELECT * FROM memory_jobs WHERE id = ?")
482 .bind(job.row.id)
483 .fetch_optional(&self.pool)
484 .await
485 .map_err(db_error)?;
486
487 let row = row.ok_or_else(|| {
488 nexus_core::NexusError::Storage(format!("Memory job {} not found", job.row.id))
489 })?;
490
491 let lease_matches =
492 row.lease_owner == job.row.lease_owner && row.claim_token == job.row.claim_token;
493 if !lease_matches {
494 return Err(nexus_core::NexusError::Storage(format!(
495 "Memory job {} failure lost lease ownership",
496 job.row.id
497 )));
498 }
499
500 if row.attempts >= MAX_JOB_ATTEMPTS {
501 let result = sqlx::query(
503 r#"
504 UPDATE memory_jobs
505 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
506 WHERE id = ? AND lease_owner = ? AND claim_token = ?
507 "#,
508 )
509 .bind(memory_job_status::FAILED)
510 .bind(error)
511 .bind(job.row.id)
512 .bind(job.row.lease_owner.as_deref())
513 .bind(job.row.claim_token.as_deref())
514 .execute(&self.pool)
515 .await
516 .map_err(db_error)?;
517 if result.rows_affected() == 0 {
518 return Err(nexus_core::NexusError::Storage(format!(
519 "Memory job {} failure lost lease ownership",
520 job.row.id
521 )));
522 }
523 } else {
524 let result = sqlx::query(
526 r#"
527 UPDATE memory_jobs
528 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
529 WHERE id = ? AND lease_owner = ? AND claim_token = ?
530 "#,
531 )
532 .bind(memory_job_status::PENDING)
533 .bind(error)
534 .bind(job.row.id)
535 .bind(job.row.lease_owner.as_deref())
536 .bind(job.row.claim_token.as_deref())
537 .execute(&self.pool)
538 .await
539 .map_err(db_error)?;
540 if result.rows_affected() == 0 {
541 return Err(nexus_core::NexusError::Storage(format!(
542 "Memory job {} failure lost lease ownership",
543 job.row.id
544 )));
545 }
546 }
547
548 Ok(())
549 }
550
    /// Force a job into the `failed` state, guarded by the caller's lease.
    ///
    /// Used when a job's stored JSON is unrecoverably corrupt; rows-affected
    /// is deliberately not checked (best-effort, see `claim_jobs`).
    async fn permanently_fail_job(
        &self,
        job_id: i64,
        lease_owner: &Option<String>,
        claim_token: &Option<String>,
        error: &str,
    ) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE memory_jobs
            SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL,
                lease_expires_at = NULL, updated_at = datetime('now')
            WHERE id = ? AND lease_owner = ? AND claim_token = ?
            "#,
        )
        .bind(memory_job_status::FAILED)
        .bind(error)
        .bind(job_id)
        .bind(lease_owner.as_deref())
        .bind(claim_token.as_deref())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;
        Ok(())
    }
579
    /// Derived-level memories in a namespace, most-reinforced first
    /// (ties broken by recency). Raw-activity rows excluded unless
    /// `include_raw` is set.
    pub async fn get_most_reinforced_by_namespace(
        &self,
        namespace_id: i64,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };
        // `{{}}` escapes format! braces to produce a literal '{}' in SQL.
        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
              {noise_sql}
            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_reinforced'), 0) DESC,
                     created_at DESC
            LIMIT ?
            "#
        ))
        .bind(namespace_id)
        .bind(CognitiveLevel::Derived.as_str())
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
614
    /// Contradiction-level memories in a namespace, most-contradicted first
    /// (ties broken by recency). Raw-activity rows excluded unless
    /// `include_raw` is set.
    pub async fn get_contradictions_by_namespace(
        &self,
        namespace_id: i64,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };
        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
              {noise_sql}
            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_contradicted'), 0) DESC,
                     created_at DESC
            LIMIT ?
            "#
        ))
        .bind(namespace_id)
        .bind(CognitiveLevel::Contradiction.as_str())
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
649
    /// Most recent active memories tagged with `session_key` (either the
    /// scalar `$.cognitive.session_key` or the `session_keys` array).
    pub async fn list_by_session_key(
        &self,
        namespace_id: i64,
        session_key: &str,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };
        // Expand the METADATA placeholder to the NULL-safe expression.
        let session_filter = SESSION_KEY_FILTER.replace("METADATA", METADATA);
        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {session_filter}
              {noise_sql}
            ORDER BY created_at DESC
            LIMIT ?
            "#
        ))
        .bind(namespace_id)
        // The session filter takes the key twice (scalar + array probe).
        .bind(session_key)
        .bind(session_key)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
686
    /// Upsert a session digest; returns the digest row id.
    ///
    /// Conflict key is (namespace_id, session_key, digest_kind,
    /// end_memory_id): re-digesting the same range replaces the pointer and
    /// token count rather than inserting a second row.
    pub async fn store_digest(&self, params: StoreDigestParams<'_>) -> Result<i64> {
        let id: i64 = sqlx::query_scalar(
            r#"
            INSERT INTO session_digests (namespace_id, session_key, digest_kind, memory_id, start_memory_id, end_memory_id, token_count, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
            ON CONFLICT(namespace_id, session_key, digest_kind, end_memory_id) DO UPDATE SET
                memory_id = excluded.memory_id,
                token_count = excluded.token_count
            RETURNING id
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.session_key)
        .bind(params.digest_kind)
        .bind(params.memory_id)
        .bind(params.start_memory_id)
        .bind(params.end_memory_id)
        .bind(params.token_count as i64)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(id)
    }
716
717 pub async fn latest_digest_for_session(
720 &self,
721 namespace_id: i64,
722 session_key: &str,
723 digest_kind: &str,
724 ) -> Result<Option<Memory>> {
725 let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
726 "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
727 )
728 .bind(namespace_id)
729 .bind(session_key)
730 .bind(digest_kind)
731 .fetch_optional(&self.pool)
732 .await
733 .map_err(db_error)?;
734
735 match digest {
736 Some(d) => self.get_by_id(d.memory_id).await,
737 None => Ok(None),
738 }
739 }
740
741 pub async fn latest_digest_for_namespace(
745 &self,
746 namespace_id: i64,
747 digest_kind: &str,
748 ) -> Result<Option<Memory>> {
749 let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
750 "SELECT * FROM session_digests WHERE namespace_id = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
751 )
752 .bind(namespace_id)
753 .bind(digest_kind)
754 .fetch_optional(&self.pool)
755 .await
756 .map_err(db_error)?;
757
758 match digest {
759 Some(d) => self.get_by_id(d.memory_id).await,
760 None => Ok(None),
761 }
762 }
763
    /// Measure how much new, digest-worthy material a session has
    /// accumulated since its last digest.
    ///
    /// Counts active memories with id beyond the last digest's
    /// end_memory_id that carry the session key, excluding raw-activity
    /// rows and already-summarized levels. Token estimate is
    /// ceil(content length / 4) per row.
    pub async fn session_digest_rollover(
        &self,
        namespace_id: i64,
        session_key: &str,
    ) -> Result<SessionDigestRollover> {
        // MAX() over an empty set yields NULL => None.
        let last_digest_end_memory_id: Option<i64> = sqlx::query_scalar(
            "SELECT MAX(end_memory_id) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
        )
        .bind(namespace_id)
        .bind(session_key)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        let (new_memory_count, estimated_new_tokens): (i64, i64) = sqlx::query_as(&format!(
            r#"
            SELECT
                COUNT(*) as new_memory_count,
                COALESCE(SUM((LENGTH(content) + 3) / 4), 0) as estimated_new_tokens
            FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND id > ?
              AND (
                  json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
                  OR EXISTS (
                      SELECT 1
                      FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
                      WHERE value = ?
                  )
              )
              AND {RAW_ACTIVITY_FILTER_SQL}
              AND COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level'), '') NOT IN ('raw', 'summary_short', 'summary_long')
            "#,
        ))
        .bind(namespace_id)
        // No prior digest => count everything (ids are > 0).
        .bind(last_digest_end_memory_id.unwrap_or(0))
        .bind(session_key)
        .bind(session_key)
        .fetch_one(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(SessionDigestRollover {
            last_digest_end_memory_id,
            new_memory_count,
            estimated_new_tokens,
        })
    }
815
    /// Most recent memories for a perspective, raw-activity rows excluded.
    /// Convenience wrapper over [`Self::get_recent_by_perspective_opts`].
    pub async fn get_recent_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_recent_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
829
    /// Most recent active memories matching a perspective (observer/subject
    /// and, when present, session key), newest first.
    pub async fn get_recent_by_perspective_opts(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let perspective_sql = perspective_where_clause(perspective);
        let sql = format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {perspective_sql}
              {noise_sql}
            ORDER BY created_at DESC
            LIMIT ?
            "#
        );
        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        // bind_perspective's value order mirrors perspective_where_clause.
        for val in bind_perspective(perspective) {
            query = query.bind(val);
        }

        let rows = query
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
873
    /// Active memories at a given cognitive level, newest first.
    ///
    /// NOTE(review): COGNITIVE_LEVEL_EXPR is used here without expanding
    /// its METADATA placeholder — SQLite resolves the bare identifier
    /// case-insensitively to the `metadata` column, so this works, but
    /// without the COALESCE guard other call sites apply. Confirm intended.
    pub async fn get_by_cognitive_level(
        &self,
        namespace_id: i64,
        level: CognitiveLevel,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {} = ?
            ORDER BY created_at DESC
            LIMIT ?
            "#,
            COGNITIVE_LEVEL_EXPR,
        ))
        .bind(namespace_id)
        .bind(level.as_str())
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
903
    /// Active memories at a given cognitive level, restricted to a
    /// perspective, newest first.
    pub async fn get_by_cognitive_level_with_perspective(
        &self,
        namespace_id: i64,
        level: CognitiveLevel,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        let perspective_sql = perspective_where_clause(perspective);
        let sql = format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {COGNITIVE_LEVEL_EXPR} = ?
              AND {perspective_sql}
            ORDER BY created_at DESC
            LIMIT ?
            "#
        );
        let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
            .bind(namespace_id)
            .bind(level.as_str());

        for val in bind_perspective(perspective) {
            query = query.bind(val);
        }

        let rows = query
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
946
    /// Most-reinforced memories for a perspective, raw-activity excluded.
    /// Convenience wrapper over [`Self::get_most_reinforced_by_perspective_opts`].
    pub async fn get_most_reinforced_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_most_reinforced_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
959
    /// Perspective-scoped memories ordered by reinforcement count (then
    /// recency), excluding contradiction-level rows.
    pub async fn get_most_reinforced_by_perspective_opts(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let perspective_sql = perspective_where_clause(perspective);
        let sql = format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {perspective_sql}
              AND {COGNITIVE_LEVEL_EXPR} != ?
              {noise_sql}
            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_reinforced'), 0) DESC,
                     created_at DESC
            LIMIT ?
            "#
        );
        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        for val in bind_perspective(perspective) {
            query = query.bind(val);
        }

        let rows = query
            // Excludes contradiction-level rows via the != comparison.
            .bind(CognitiveLevel::Contradiction.as_str())
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1006
    /// Contradiction memories for a perspective, raw-activity excluded.
    /// Convenience wrapper over [`Self::get_contradictions_by_perspective_opts`].
    pub async fn get_contradictions_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_contradictions_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
1019
    /// Perspective-scoped contradiction-level memories, most-contradicted
    /// first (then recency).
    pub async fn get_contradictions_by_perspective_opts(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
        include_raw: bool,
    ) -> Result<Vec<Memory>> {
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let perspective_sql = perspective_where_clause(perspective);
        let sql = format!(
            r#"
            SELECT * FROM memories
            WHERE namespace_id = ?
              AND is_active = 1
              AND {perspective_sql}
              AND {COGNITIVE_LEVEL_EXPR} = ?
              {noise_sql}
            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_contradicted'), 0) DESC,
                     created_at DESC
            LIMIT ?
            "#
        );
        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        for val in bind_perspective(perspective) {
            query = query.bind(val);
        }

        let rows = query
            .bind(CognitiveLevel::Contradiction.as_str())
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1066
1067 pub async fn search_working_set(&self, params: WorkingSetParams<'_>) -> Result<Vec<Memory>> {
1086 let WorkingSetParams {
1087 namespace_id,
1088 perspective,
1089 max_items,
1090 include_raw,
1091 } = params;
1092
1093 let per_bucket = (max_items as i64).max(8);
1096
1097 let session = perspective
1099 .and_then(|p| p.session_key.as_deref())
1100 .unwrap_or("");
1101 let mut digests = Vec::new();
1102 if let Some(short) = self
1103 .latest_digest_for_session(namespace_id, session, "short")
1104 .await?
1105 {
1106 digests.push(short);
1107 }
1108 if let Some(long) = self
1109 .latest_digest_for_session(namespace_id, session, "long")
1110 .await?
1111 {
1112 digests.push(long);
1113 }
1114
1115 let reinforced = if let Some(persp) = perspective {
1117 self.get_most_reinforced_by_perspective_opts(
1118 namespace_id,
1119 persp,
1120 per_bucket,
1121 include_raw,
1122 )
1123 .await?
1124 } else {
1125 self.get_most_reinforced_by_namespace(namespace_id, per_bucket, include_raw)
1126 .await?
1127 };
1128
1129 let recent = if let Some(persp) = perspective {
1130 self.get_recent_by_perspective_opts(namespace_id, persp, per_bucket, include_raw)
1131 .await?
1132 } else {
1133 let sql = if include_raw {
1134 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ?"
1135 .to_string()
1136 } else {
1137 format!(
1138 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 AND {} ORDER BY created_at DESC LIMIT ?",
1139 RAW_ACTIVITY_FILTER_SQL,
1140 )
1141 };
1142 let rows: Vec<MemoryRow> = sqlx::query_as(&sql)
1143 .bind(namespace_id)
1144 .bind(per_bucket)
1145 .fetch_all(&self.pool)
1146 .await
1147 .map_err(db_error)?;
1148 rows.into_iter()
1149 .map(|r| self.row_to_memory(r))
1150 .collect::<Result<Vec<_>>>()?
1151 };
1152
1153 let contradictions = if let Some(persp) = perspective {
1154 self.get_contradictions_by_perspective_opts(
1155 namespace_id,
1156 persp,
1157 per_bucket,
1158 include_raw,
1159 )
1160 .await?
1161 } else {
1162 self.get_contradictions_by_namespace(namespace_id, per_bucket, include_raw)
1163 .await?
1164 };
1165
1166 let mut seen = std::collections::HashSet::new();
1168 let mut result = Vec::with_capacity(max_items);
1169
1170 for memory in digests
1171 .into_iter()
1172 .chain(reinforced)
1173 .chain(recent)
1174 .chain(contradictions)
1175 {
1176 if seen.insert(memory.id) {
1177 result.push(memory);
1178 if result.len() >= max_items {
1179 break;
1180 }
1181 }
1182 }
1183
1184 Ok(result)
1185 }
1186
1187 pub async fn load_lineage(&self, memory_id: i64) -> Result<Vec<MemoryLineageEntry>> {
1189 let rows: Vec<MemoryEvidenceRow> = sqlx::query_as::<_, MemoryEvidenceRow>(
1190 r#"
1191 SELECT * FROM memory_evidence
1192 WHERE derived_memory_id = ? OR source_memory_id = ?
1193 ORDER BY created_at ASC
1194 "#,
1195 )
1196 .bind(memory_id)
1197 .bind(memory_id)
1198 .fetch_all(&self.pool)
1199 .await
1200 .map_err(db_error)?;
1201
1202 Ok(rows
1203 .into_iter()
1204 .map(|r| MemoryLineageEntry {
1205 derived_memory_id: r.derived_memory_id,
1206 source_memory_id: r.source_memory_id,
1207 evidence_role: r.evidence_role,
1208 })
1209 .collect())
1210 }
1211
    /// Load evidence edges for many memories in one query, grouped by the
    /// requested memory id.
    ///
    /// An edge appears under each requested id it touches, so an edge
    /// between two requested memories is listed under both (cloned once).
    pub async fn load_lineage_batch(
        &self,
        memory_ids: &[i64],
    ) -> Result<HashMap<i64, Vec<MemoryLineageEntry>>> {
        if memory_ids.is_empty() {
            return Ok(HashMap::new());
        }

        // The same placeholder list is used for both IN clauses, so every
        // id is bound twice below.
        let placeholders = memory_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let sql = format!(
            r#"
            SELECT * FROM memory_evidence
            WHERE derived_memory_id IN ({placeholders})
               OR source_memory_id IN ({placeholders})
            ORDER BY created_at ASC
            "#
        );

        let mut query = sqlx::query_as::<_, MemoryEvidenceRow>(&sql);
        for id in memory_ids {
            query = query.bind(*id);
        }
        for id in memory_ids {
            query = query.bind(*id);
        }

        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
        let id_set: HashSet<i64> = memory_ids.iter().copied().collect();
        let mut grouped: HashMap<i64, Vec<MemoryLineageEntry>> = HashMap::new();

        for row in rows {
            let entry = MemoryLineageEntry {
                derived_memory_id: row.derived_memory_id,
                source_memory_id: row.source_memory_id,
                evidence_role: row.evidence_role,
            };

            if id_set.contains(&entry.derived_memory_id) {
                grouped
                    .entry(entry.derived_memory_id)
                    .or_default()
                    .push(entry.clone());
            }
            if id_set.contains(&entry.source_memory_id) {
                grouped
                    .entry(entry.source_memory_id)
                    .or_default()
                    .push(entry);
            }
        }

        Ok(grouped)
    }
1266
    /// Fetch a memory by primary key (active or not); `None` when missing.
    pub async fn get_by_id(&self, id: i64) -> Result<Option<Memory>> {
        let row: Option<MemoryRow> = sqlx::query_as("SELECT * FROM memories WHERE id = ?")
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(db_error)?;

        row.map(|r| self.row_to_memory(r)).transpose()
    }
1277
1278 pub async fn get_by_ids(&self, ids: &[i64]) -> Result<Vec<Memory>> {
1283 if ids.is_empty() {
1284 return Ok(Vec::new());
1285 }
1286
1287 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1289
1290 let sql = format!("SELECT * FROM memories WHERE id IN ({placeholders})");
1291
1292 let mut query = sqlx::query_as::<_, MemoryRow>(&sql);
1293 for id in ids {
1294 query = query.bind(*id);
1295 }
1296
1297 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1298
1299 let mut memories: Vec<Memory> = Vec::with_capacity(rows.len());
1300 for row in rows {
1301 memories.push(self.row_to_memory(row)?);
1302 }
1303
1304 Ok(memories)
1305 }
1306
    /// Fetch the newest active memory whose content matches `content`
    /// case-/whitespace-insensitively; errors when none exists (used as a
    /// post-insert recovery path by `store`).
    pub async fn get_by_content(&self, namespace_id: i64, content: &str) -> Result<Memory> {
        let row: Option<MemoryRow> = sqlx::query_as(
            "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
        )
        .bind(namespace_id)
        .bind(content)
        .fetch_optional(&self.pool)
        .await
        .map_err(db_error)?;

        row.map(|r| self.row_to_memory(r))
            .transpose()?
            .ok_or_else(|| {
                nexus_core::NexusError::Storage(
                    "No memories found in namespace after insert".to_string(),
                )
            })
    }
1326
    /// Page through a namespace's active memories, newest first.
    pub async fn search_by_namespace(
        &self,
        namespace_id: i64,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<Memory>> {
        let rows: Vec<MemoryRow> = sqlx::query_as(
            "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ?"
        )
        .bind(namespace_id)
        .bind(limit as i64)
        .bind(offset as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
    }
1346
1347 pub async fn count_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1349 let count: (i64,) = sqlx::query_as(
1350 "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_active = 1",
1351 )
1352 .bind(namespace_id)
1353 .fetch_one(&self.pool)
1354 .await
1355 .map_err(db_error)?;
1356
1357 Ok(count.0)
1358 }
1359
1360 pub async fn count_all_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1362 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM memories WHERE namespace_id = ?")
1363 .bind(namespace_id)
1364 .fetch_one(&self.pool)
1365 .await
1366 .map_err(db_error)?;
1367
1368 Ok(count.0)
1369 }
1370
1371 pub async fn count_archived_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1373 let count: (i64,) = sqlx::query_as(
1374 "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_archived = 1",
1375 )
1376 .bind(namespace_id)
1377 .fetch_one(&self.pool)
1378 .await
1379 .map_err(db_error)?;
1380
1381 Ok(count.0)
1382 }
1383
1384 pub async fn delete(&self, id: i64) -> Result<bool> {
1386 let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1387 .bind(id)
1388 .execute(&self.pool)
1389 .await
1390 .map_err(db_error)?;
1391
1392 Ok(result.rows_affected() > 0)
1393 }
1394
1395 pub async fn touch(&self, id: i64) -> Result<()> {
1397 sqlx::query(
1398 "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
1399 )
1400 .bind(Utc::now())
1401 .bind(id)
1402 .execute(&self.pool)
1403 .await
1404 .map_err(db_error)?;
1405
1406 Ok(())
1407 }
1408
1409 pub async fn get_unconsolidated(
1411 &self,
1412 namespace_id: i64,
1413 limit: i32,
1414 ) -> Result<Vec<MemoryRow>> {
1415 let rows = sqlx::query_as::<_, MemoryRow>(
1416 r#"
1417 SELECT * FROM memories
1418 WHERE namespace_id = ?
1419 AND is_active = 1
1420 AND (metadata IS NULL OR json_extract(metadata, '$.agent.consolidated') IS NULL)
1421 ORDER BY created_at ASC
1422 LIMIT ?
1423 "#,
1424 )
1425 .bind(namespace_id)
1426 .bind(limit)
1427 .fetch_all(&self.pool)
1428 .await
1429 .map_err(db_error)?;
1430
1431 Ok(rows)
1432 }
1433
1434 pub async fn mark_consolidated(&self, id: i64) -> Result<()> {
1436 sqlx::query(
1437 r#"
1438 UPDATE memories
1439 SET metadata = json_set(
1440 COALESCE(metadata, '{}'),
1441 '$.agent.consolidated',
1442 true,
1443 '$.agent.consolidated_at',
1444 datetime('now')
1445 ),
1446 updated_at = datetime('now')
1447 WHERE id = ?
1448 "#,
1449 )
1450 .bind(id)
1451 .execute(&self.pool)
1452 .await
1453 .map_err(db_error)?;
1454
1455 Ok(())
1456 }
1457
1458 pub async fn mark_consolidated_batch(&self, ids: &[i64]) -> Result<()> {
1460 if ids.is_empty() {
1461 return Ok(());
1462 }
1463 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1464 let query = format!(
1465 r#"
1466 UPDATE memories
1467 SET metadata = json_set(
1468 COALESCE(metadata, '{{}}'),
1469 '$.agent.consolidated',
1470 true,
1471 '$.agent.consolidated_at',
1472 datetime('now')
1473 ),
1474 updated_at = datetime('now')
1475 WHERE id IN ({})
1476 "#,
1477 placeholders
1478 );
1479 let mut q = sqlx::query(&query);
1480 for id in ids {
1481 q = q.bind(*id);
1482 }
1483 q.execute(&self.pool).await.map_err(db_error)?;
1484 Ok(())
1485 }
1486
1487 pub async fn search_by_text(
1489 &self,
1490 namespace_id: i64,
1491 query: &str,
1492 limit: i32,
1493 include_raw: bool,
1494 ) -> Result<Vec<MemoryRow>> {
1495 let pattern = format!("%{}%", query);
1496 let raw_clause = if include_raw {
1497 String::new()
1498 } else {
1499 format!("AND {RAW_ACTIVITY_FILTER_SQL}")
1500 };
1501 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
1502 r#"
1503 SELECT * FROM memories
1504 WHERE namespace_id = ?
1505 AND is_active = 1
1506 AND content LIKE ?
1507 {}
1508 ORDER BY updated_at DESC
1509 LIMIT ?
1510 "#,
1511 raw_clause
1512 ))
1513 .bind(namespace_id)
1514 .bind(&pattern)
1515 .bind(limit)
1516 .fetch_all(&self.pool)
1517 .await
1518 .map_err(db_error)?;
1519
1520 Ok(rows)
1521 }
1522
1523 pub async fn search_by_text_memories(
1525 &self,
1526 namespace_id: i64,
1527 query: &str,
1528 limit: i32,
1529 include_raw: bool,
1530 ) -> Result<Vec<Memory>> {
1531 let rows = self
1532 .search_by_text(namespace_id, query, limit, include_raw)
1533 .await?;
1534 rows.into_iter()
1535 .map(|row| self.row_to_memory(row))
1536 .collect()
1537 }
1538
    /// Fetch candidate rows for semantic (embedding-based) ranking.
    ///
    /// Candidates are active memories that already carry a `content_embedding`.
    /// When a perspective is supplied, rows must match its
    /// `$.cognitive.observer` / `$.cognitive.subject` metadata; if the
    /// perspective also has a session key, the row must carry it either as
    /// `$.cognitive.session_key` or inside its `$.cognitive.session_keys`
    /// array. Unless `include_raw` is set, raw-activity rows are excluded.
    /// Results are newest first, capped at `limit`.
    pub async fn get_semantic_candidates(
        &self,
        params: SemanticCandidateParams<'_>,
    ) -> Result<Vec<Memory>> {
        let SemanticCandidateParams {
            namespace_id,
            perspective,
            limit,
            include_raw,
        } = params;

        // Extra WHERE fragment interpolated into each SQL variant below;
        // it adds no bind placeholder.
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let rows = if let Some(perspective) = perspective {
            // Session-scoped variant carries two extra placeholders for the
            // key: exact scalar match OR membership in the session_keys array.
            let sql = if perspective.session_key.is_some() {
                format!(
                    r#"
                SELECT * FROM memories
                WHERE namespace_id = ?
                AND is_active = 1
                AND content_embedding IS NOT NULL
                AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
                AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
                AND (
                    json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
                    OR EXISTS (
                        SELECT 1
                        FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
                        WHERE value = ?
                    )
                )
                {noise_sql}
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
                "#
                )
            } else {
                format!(
                    r#"
                SELECT * FROM memories
                WHERE namespace_id = ?
                AND is_active = 1
                AND content_embedding IS NOT NULL
                AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
                AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
                {noise_sql}
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
                "#
                )
            };

            let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
                .bind(namespace_id)
                .bind(&perspective.observer)
                .bind(&perspective.subject);

            // Bound twice to fill both session-key placeholders above; bind
            // order must mirror placeholder order in the SQL.
            if let Some(session_key) = &perspective.session_key {
                query = query.bind(session_key);
                query = query.bind(session_key);
            }

            query
                .bind(limit)
                .fetch_all(&self.pool)
                .await
                .map_err(db_error)?
        } else {
            // No perspective: only namespace/active/embedding filters apply.
            let sql = if include_raw {
                r#"
                SELECT * FROM memories
                WHERE namespace_id = ?
                AND is_active = 1
                AND content_embedding IS NOT NULL
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
                "#
                .to_string()
            } else {
                format!(
                    r#"
                SELECT * FROM memories
                WHERE namespace_id = ?
                AND is_active = 1
                AND content_embedding IS NOT NULL
                AND {}
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
                "#,
                    RAW_ACTIVITY_FILTER_SQL,
                )
            };

            sqlx::query_as::<_, MemoryRow>(&sql)
                .bind(namespace_id)
                .bind(limit)
                .fetch_all(&self.pool)
                .await
                .map_err(db_error)?
        };

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1649
    /// List active memories matching the optional filters, newest first.
    ///
    /// The WHERE clause is assembled dynamically with numbered placeholders
    /// (`?1` is always the namespace); `param_idx` tracks the next free slot
    /// so only present filters consume one. The raw-activity filter is a
    /// static predicate and consumes no slot. The bind calls below must stay
    /// in the same order as the pushed conditions, with LIMIT/OFFSET taking
    /// the final two slots.
    pub async fn list_filtered(
        &self,
        namespace_id: i64,
        filters: ListMemoryFilters<'_>,
    ) -> Result<Vec<Memory>> {
        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
        let mut param_idx = 2u32;

        if filters.category.is_some() {
            conditions.push(format!("category = ?{}", param_idx));
            param_idx += 1;
        }
        if filters.since.is_some() {
            conditions.push(format!("created_at >= ?{}", param_idx));
            param_idx += 1;
        }
        if filters.until.is_some() {
            conditions.push(format!("created_at <= ?{}", param_idx));
            param_idx += 1;
        }
        if filters.content_like.is_some() {
            conditions.push(format!("content LIKE ?{}", param_idx));
            param_idx += 1;
        }
        if !filters.include_raw {
            // No placeholder consumed here.
            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
        }

        let sql = format!(
            "SELECT * FROM memories WHERE {} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
            conditions.join(" AND "),
            param_idx,
            param_idx + 1,
        );

        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);

        if let Some(cat) = filters.category {
            query = query.bind(cat.to_string());
        }
        if let Some(s) = filters.since {
            query = query.bind(s);
        }
        if let Some(u) = filters.until {
            query = query.bind(u);
        }
        if let Some(search) = filters.content_like {
            // Substring match: wrap the needle in SQL wildcards.
            query = query.bind(format!("%{}%", search));
        }

        let rows: Vec<MemoryRow> = query
            .bind(filters.limit)
            .bind(filters.offset)
            .fetch_all(&self.pool)
            .await
            .map_err(db_error)?;

        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
    }
1711
1712 pub async fn list_missing_cognitive_metadata(
1714 &self,
1715 namespace_id: i64,
1716 limit: i64,
1717 offset: i64,
1718 ) -> Result<Vec<Memory>> {
1719 let rows: Vec<MemoryRow> = sqlx::query_as(
1720 r#"
1721 SELECT * FROM memories
1722 WHERE namespace_id = ?
1723 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1724 ORDER BY id ASC
1725 LIMIT ? OFFSET ?
1726 "#,
1727 )
1728 .bind(namespace_id)
1729 .bind(limit)
1730 .bind(offset)
1731 .fetch_all(&self.pool)
1732 .await
1733 .map_err(db_error)?;
1734
1735 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1736 }
1737
1738 pub async fn count_missing_cognitive_metadata(&self, namespace_id: i64) -> Result<i64> {
1740 let count: i64 = sqlx::query_scalar(
1741 r#"
1742 SELECT COUNT(*) FROM memories
1743 WHERE namespace_id = ?
1744 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1745 "#,
1746 )
1747 .bind(namespace_id)
1748 .fetch_one(&self.pool)
1749 .await
1750 .map_err(db_error)?;
1751
1752 Ok(count)
1753 }
1754
1755 pub async fn update_memory_metadata(
1757 &self,
1758 memory_id: i64,
1759 metadata: &serde_json::Value,
1760 ) -> Result<()> {
1761 let metadata_json = serde_json::to_string(metadata)?;
1762 sqlx::query(
1763 r#"
1764 UPDATE memories
1765 SET metadata = ?, updated_at = ?
1766 WHERE id = ?
1767 "#,
1768 )
1769 .bind(metadata_json)
1770 .bind(Utc::now())
1771 .bind(memory_id)
1772 .execute(&self.pool)
1773 .await
1774 .map_err(db_error)?;
1775
1776 Ok(())
1777 }
1778
    /// Distinct, non-empty `$.cognitive.session_key` values in a namespace
    /// that have no row at all in `session_digests` — i.e. sessions still
    /// awaiting their first digest. Ascending, capped at `limit`.
    pub async fn list_session_keys_without_digests(
        &self,
        namespace_id: i64,
        limit: i64,
    ) -> Result<Vec<String>> {
        let rows: Vec<(String,)> = sqlx::query_as(
            r#"
            SELECT DISTINCT json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') AS session_key
            FROM memories m
            WHERE m.namespace_id = ?
            AND json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
            AND TRIM(json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')) <> ''
            AND NOT EXISTS (
                SELECT 1 FROM session_digests sd
                WHERE sd.namespace_id = m.namespace_id
                AND sd.session_key = json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')
            )
            ORDER BY session_key ASC
            LIMIT ?
            "#,
        )
        .bind(namespace_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(rows.into_iter().map(|(session_key,)| session_key).collect())
    }
1809
1810 pub async fn count_distinct_session_keys_with_cognition(
1812 &self,
1813 namespace_id: i64,
1814 ) -> Result<i64> {
1815 let count: i64 = sqlx::query_scalar(
1816 r#"
1817 SELECT COUNT(DISTINCT json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key'))
1818 FROM memories
1819 WHERE namespace_id = ?
1820 AND is_active = 1
1821 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1822 AND TRIM(json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key')) <> ''
1823 "#,
1824 )
1825 .bind(namespace_id)
1826 .fetch_one(&self.pool)
1827 .await
1828 .map_err(db_error)?;
1829
1830 Ok(count)
1831 }
1832
1833 pub async fn list_archived_raw_cleanup_candidates(
1835 &self,
1836 namespace_id: i64,
1837 older_than: DateTime<Utc>,
1838 limit: i64,
1839 ) -> Result<Vec<Memory>> {
1840 let rows: Vec<MemoryRow> = sqlx::query_as(
1841 r#"
1842 SELECT * FROM memories
1843 WHERE namespace_id = ?
1844 AND is_active = 0
1845 AND is_archived = 1
1846 AND (
1847 labels LIKE '%raw-activity%'
1848 OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1849 )
1850 AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1851 AND created_at <= ?
1852 ORDER BY created_at ASC
1853 LIMIT ?
1854 "#,
1855 )
1856 .bind(namespace_id)
1857 .bind(older_than)
1858 .bind(limit)
1859 .fetch_all(&self.pool)
1860 .await
1861 .map_err(db_error)?;
1862
1863 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1864 }
1865
1866 pub async fn count_archived_raw_cleanup_candidates(
1868 &self,
1869 namespace_id: i64,
1870 older_than: DateTime<Utc>,
1871 ) -> Result<i64> {
1872 let count: i64 = sqlx::query_scalar(
1873 r#"
1874 SELECT COUNT(*) FROM memories
1875 WHERE namespace_id = ?
1876 AND is_active = 0
1877 AND is_archived = 1
1878 AND (
1879 labels LIKE '%raw-activity%'
1880 OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1881 )
1882 AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1883 AND created_at <= ?
1884 "#,
1885 )
1886 .bind(namespace_id)
1887 .bind(older_than)
1888 .fetch_one(&self.pool)
1889 .await
1890 .map_err(db_error)?;
1891
1892 Ok(count)
1893 }
1894
1895 pub async fn delete_batch(&self, ids: &[i64]) -> Result<u64> {
1897 if ids.is_empty() {
1898 return Ok(0);
1899 }
1900
1901 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1902 let sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
1903 let mut query = sqlx::query(&sql);
1904 for id in ids {
1905 query = query.bind(*id);
1906 }
1907
1908 let result = query.execute(&self.pool).await.map_err(db_error)?;
1909 Ok(result.rows_affected())
1910 }
1911
1912 pub async fn delete_by_content_pattern(&self, namespace_id: i64, pattern: &str) -> Result<u64> {
1914 let result = sqlx::query("DELETE FROM memories WHERE namespace_id = ? AND content LIKE ?")
1915 .bind(namespace_id)
1916 .bind(pattern)
1917 .execute(&self.pool)
1918 .await
1919 .map_err(db_error)?;
1920
1921 Ok(result.rows_affected())
1922 }
1923
    /// Count active memories matching the optional filters — the COUNT
    /// companion of `list_filtered`, using the same numbered-placeholder
    /// scheme (`?1` is the namespace; each present filter takes the next
    /// slot, raw-activity filter takes none).
    pub async fn count_filtered(
        &self,
        namespace_id: i64,
        category: Option<&str>,
        since: Option<DateTime<Utc>>,
        until: Option<DateTime<Utc>>,
        include_raw: bool,
    ) -> Result<i64> {
        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
        let mut param_idx = 2u32;

        if category.is_some() {
            conditions.push(format!("category = ?{}", param_idx));
            param_idx += 1;
        }
        if since.is_some() {
            conditions.push(format!("created_at >= ?{}", param_idx));
            param_idx += 1;
        }
        if until.is_some() {
            conditions.push(format!("created_at <= ?{}", param_idx));
            // Deliberately no increment: this is the last placeholder a COUNT
            // query can take (no LIMIT/OFFSET), so param_idx is never read
            // again.
        }
        if !include_raw {
            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
        }

        let sql = format!(
            "SELECT COUNT(*) FROM memories WHERE {}",
            conditions.join(" AND "),
        );

        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);

        // Bind order must mirror the order conditions were pushed above.
        if let Some(cat) = category {
            query = query.bind(cat.to_string());
        }
        if let Some(s) = since {
            query = query.bind(s);
        }
        if let Some(u) = until {
            query = query.bind(u);
        }

        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
        Ok(count)
    }
1971
1972 pub async fn store_distilled_summary(
1974 &self,
1975 params: StoreMemoryParams<'_>,
1976 source_ids: &[i64],
1977 ) -> Result<Memory> {
1978 let labels_json = serde_json::to_string(params.labels)?;
1979 let metadata_json = serde_json::to_string(params.metadata)?;
1980 let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
1981 let mut tx = self.pool.begin().await.map_err(db_error)?;
1982
1983 let result = sqlx::query(
1984 r#"
1985 INSERT INTO memories (
1986 namespace_id, content, category, memory_lane_type, labels, metadata,
1987 content_embedding, embedding_model, created_at, is_active, access_count
1988 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
1989 "#,
1990 )
1991 .bind(params.namespace_id)
1992 .bind(params.content)
1993 .bind(params.category.to_string())
1994 .bind(params.memory_lane_type.map(|t| t.to_string()))
1995 .bind(&labels_json)
1996 .bind(&metadata_json)
1997 .bind(&embedding_json)
1998 .bind(params.embedding_model)
1999 .bind(Utc::now())
2000 .execute(&mut *tx)
2001 .await
2002 .map_err(db_error)?;
2003
2004 let summary_id = if result.last_insert_rowid() == 0 {
2005 let row: Option<MemoryRow> = sqlx::query_as(
2006 "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) ORDER BY created_at DESC LIMIT 1",
2007 )
2008 .bind(params.namespace_id)
2009 .bind(params.content)
2010 .fetch_optional(&mut *tx)
2011 .await
2012 .map_err(db_error)?;
2013 row.map(|memory| memory.id).ok_or_else(|| {
2014 nexus_core::NexusError::Storage(
2015 "Duplicate distilled summary merged but matching row not found".to_string(),
2016 )
2017 })?
2018 } else {
2019 result.last_insert_rowid()
2020 };
2021
2022 if !source_ids.is_empty() {
2023 for source_id in source_ids {
2024 sqlx::query(
2025 r#"
2026 INSERT OR IGNORE INTO memory_evidence (
2027 derived_memory_id,
2028 source_memory_id,
2029 evidence_role,
2030 created_at
2031 ) VALUES (?, ?, 'source', datetime('now'))
2032 "#,
2033 )
2034 .bind(summary_id)
2035 .bind(*source_id)
2036 .execute(&mut *tx)
2037 .await
2038 .map_err(db_error)?;
2039 }
2040
2041 let placeholders = source_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2042 let sql = format!(
2043 r#"
2044 UPDATE memories
2045 SET
2046 is_active = 0,
2047 is_archived = 1,
2048 updated_at = ?,
2049 metadata = json_set(
2050 COALESCE(metadata, '{{}}'),
2051 '$.distillation.status', 'archived',
2052 '$.distillation.summary_memory_id', ?,
2053 '$.distillation.archived_at', ?
2054 )
2055 WHERE id IN ({})
2056 "#,
2057 placeholders
2058 );
2059 let archived_at = Utc::now().to_rfc3339();
2060 let mut query = sqlx::query(&sql)
2061 .bind(Utc::now())
2062 .bind(summary_id)
2063 .bind(&archived_at);
2064 for source_id in source_ids {
2065 query = query.bind(*source_id);
2066 }
2067 query.execute(&mut *tx).await.map_err(db_error)?;
2068 }
2069
2070 tx.commit().await.map_err(db_error)?;
2071 self.get_by_id(summary_id).await?.ok_or_else(|| {
2072 nexus_core::NexusError::Storage(format!(
2073 "Failed to retrieve distilled summary with id {}",
2074 summary_id
2075 ))
2076 })
2077 }
2078
2079 fn row_to_memory(&self, row: MemoryRow) -> Result<Memory> {
2080 let labels: Vec<String> = serde_json::from_str(&row.labels).map_err(|e| {
2081 nexus_core::NexusError::Storage(format!(
2082 "corrupted labels JSON for memory {}: {e}",
2083 row.id
2084 ))
2085 })?;
2086 let metadata: serde_json::Value = serde_json::from_str(&row.metadata).map_err(|e| {
2087 nexus_core::NexusError::Storage(format!(
2088 "corrupted metadata JSON for memory {}: {e}",
2089 row.id
2090 ))
2091 })?;
2092 let embedding: Option<Vec<f32>> = row
2093 .content_embedding
2094 .map(|e| {
2095 serde_json::from_str(&e).map_err(|err| {
2096 nexus_core::NexusError::Storage(format!(
2097 "corrupted embedding JSON for memory {}: {err}",
2098 row.id
2099 ))
2100 })
2101 })
2102 .transpose()?;
2103
2104 Ok(Memory {
2105 id: row.id,
2106 namespace_id: row.namespace_id,
2107 content: row.content,
2108 category: parse_category(&row.category)?,
2109 memory_lane_type: match &row.memory_lane_type {
2110 Some(s) => parse_memory_lane_type(s)?,
2111 None => None,
2112 },
2113 labels,
2114 metadata,
2115 similarity_score: row.similarity_score,
2116 relevance_score: row.relevance_score,
2117 content_embedding: embedding,
2118 embedding_model: row.embedding_model,
2119 created_at: row.created_at,
2120 updated_at: row.updated_at,
2121 last_accessed: row.last_accessed,
2122 is_active: row.is_active,
2123 is_archived: row.is_archived,
2124 access_count: row.access_count,
2125 })
2126 }
2127
2128 pub async fn list_jobs(
2132 &self,
2133 namespace_id: i64,
2134 job_type: Option<&str>,
2135 status: Option<&str>,
2136 limit: i64,
2137 offset: i64,
2138 ) -> Result<Vec<MemoryJobRow>> {
2139 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2140 if job_type.is_some() {
2141 where_clauses.push("job_type = ?".to_string());
2142 }
2143 if status.is_some() {
2144 where_clauses.push("status = ?".to_string());
2145 }
2146 let where_sql = where_clauses.join(" AND ");
2147
2148 let sql = format!(
2149 "SELECT * FROM memory_jobs WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
2150 where_sql
2151 );
2152
2153 let mut query = sqlx::query_as::<_, MemoryJobRow>(&sql).bind(namespace_id);
2154 if let Some(jt) = job_type {
2155 query = query.bind(jt);
2156 }
2157 if let Some(st) = status {
2158 query = query.bind(st);
2159 }
2160 query = query.bind(limit).bind(offset);
2161
2162 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2163 Ok(rows)
2164 }
2165
2166 pub async fn count_jobs(
2168 &self,
2169 namespace_id: i64,
2170 job_type: Option<&str>,
2171 status: Option<&str>,
2172 ) -> Result<i64> {
2173 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2174 if job_type.is_some() {
2175 where_clauses.push("job_type = ?".to_string());
2176 }
2177 if status.is_some() {
2178 where_clauses.push("status = ?".to_string());
2179 }
2180 let where_sql = where_clauses.join(" AND ");
2181
2182 let sql = format!("SELECT COUNT(*) FROM memory_jobs WHERE {}", where_sql);
2183
2184 let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
2185 if let Some(jt) = job_type {
2186 query = query.bind(jt);
2187 }
2188 if let Some(st) = status {
2189 query = query.bind(st);
2190 }
2191
2192 let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
2193 Ok(count)
2194 }
2195
2196 pub async fn count_jobs_by_status(
2198 &self,
2199 namespace_id: i64,
2200 job_type: Option<&str>,
2201 ) -> Result<Vec<(String, i64)>> {
2202 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2203 if job_type.is_some() {
2204 where_clauses.push("job_type = ?".to_string());
2205 }
2206 let where_sql = where_clauses.join(" AND ");
2207
2208 let sql = format!(
2209 "SELECT status, COUNT(*) as cnt FROM memory_jobs WHERE {} GROUP BY status",
2210 where_sql
2211 );
2212
2213 let mut query = sqlx::query_as::<_, (String, i64)>(&sql).bind(namespace_id);
2214 if let Some(jt) = job_type {
2215 query = query.bind(jt);
2216 }
2217
2218 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2219 Ok(rows)
2220 }
2221
2222 pub async fn purge_completed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2226 let result = sqlx::query(
2227 r#"
2228 DELETE FROM memory_jobs
2229 WHERE status = ? AND updated_at < ?
2230 "#,
2231 )
2232 .bind(memory_job_status::COMPLETED)
2233 .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2234 .execute(&self.pool)
2235 .await
2236 .map_err(db_error)?;
2237
2238 Ok(result.rows_affected())
2239 }
2240
2241 pub async fn purge_permanently_failed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2246 let result = sqlx::query(
2247 r#"
2248 DELETE FROM memory_jobs
2249 WHERE status = ? AND attempts >= ? AND updated_at < ?
2250 "#,
2251 )
2252 .bind(memory_job_status::FAILED)
2253 .bind(MAX_JOB_ATTEMPTS)
2254 .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2255 .execute(&self.pool)
2256 .await
2257 .map_err(db_error)?;
2258
2259 Ok(result.rows_affected())
2260 }
2261
2262 pub async fn list_digests(
2264 &self,
2265 namespace_id: i64,
2266 session_key: Option<&str>,
2267 limit: i64,
2268 offset: i64,
2269 ) -> Result<Vec<SessionDigestRow>> {
2270 let mut query = if let Some(sk) = session_key {
2271 sqlx::query_as::<_, SessionDigestRow>(
2272 "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2273 )
2274 .bind(namespace_id)
2275 .bind(sk)
2276 } else {
2277 sqlx::query_as::<_, SessionDigestRow>(
2278 "SELECT * FROM session_digests WHERE namespace_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2279 )
2280 .bind(namespace_id)
2281 };
2282
2283 query = query.bind(limit).bind(offset);
2284 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2285 Ok(rows)
2286 }
2287
2288 pub async fn count_digests(&self, namespace_id: i64, session_key: Option<&str>) -> Result<i64> {
2290 let query = if let Some(sk) = session_key {
2291 sqlx::query_scalar(
2292 "SELECT COUNT(*) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
2293 )
2294 .bind(namespace_id)
2295 .bind(sk)
2296 } else {
2297 sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE namespace_id = ?")
2298 .bind(namespace_id)
2299 };
2300
2301 let count: i64 = query.fetch_one(&self.pool).await.map_err(db_error)?;
2302 Ok(count)
2303 }
2304
2305 pub async fn count_evidence(&self, namespace_id: i64) -> Result<i64> {
2307 let count: i64 = sqlx::query_scalar(
2308 "SELECT COUNT(*) FROM memory_evidence WHERE derived_memory_id IN (SELECT id FROM memories WHERE namespace_id = ?)"
2309 )
2310 .bind(namespace_id)
2311 .fetch_one(&self.pool)
2312 .await
2313 .map_err(db_error)?;
2314 Ok(count)
2315 }
2316
2317 pub async fn record_metric(
2319 &self,
2320 metric_name: &str,
2321 metric_value: f64,
2322 labels: &serde_json::Value,
2323 ) -> Result<i64> {
2324 let labels_json = serde_json::to_string(labels)?;
2325 let id: i64 = sqlx::query_scalar(
2326 r#"
2327 INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2328 VALUES (?, ?, ?, ?)
2329 RETURNING id
2330 "#,
2331 )
2332 .bind(metric_name)
2333 .bind(metric_value)
2334 .bind(labels_json)
2335 .bind(Utc::now())
2336 .fetch_one(&self.pool)
2337 .await
2338 .map_err(db_error)?;
2339 Ok(id)
2340 }
2341
2342 pub async fn record_metrics_batch(&self, samples: &[MetricSample]) -> Result<()> {
2344 if samples.is_empty() {
2345 return Ok(());
2346 }
2347
2348 let mut tx = self.pool.begin().await.map_err(db_error)?;
2349 for sample in samples {
2350 let labels_json = serde_json::to_string(&sample.labels)?;
2351 sqlx::query(
2352 r#"
2353 INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2354 VALUES (?, ?, ?, ?)
2355 "#,
2356 )
2357 .bind(&sample.metric_name)
2358 .bind(sample.metric_value)
2359 .bind(labels_json)
2360 .bind(Utc::now())
2361 .execute(&mut *tx)
2362 .await
2363 .map_err(db_error)?;
2364 }
2365 tx.commit().await.map_err(db_error)?;
2366 Ok(())
2367 }
2368
2369 pub async fn latest_metrics_for_namespace(
2371 &self,
2372 namespace_id: i64,
2373 metric_prefix: Option<&str>,
2374 limit: i64,
2375 ) -> Result<Vec<SystemMetricRow>> {
2376 let limit = limit.max(1);
2377 let rows = if let Some(prefix) = metric_prefix {
2378 sqlx::query_as::<_, SystemMetricRow>(
2379 r#"
2380 SELECT *
2381 FROM system_metrics
2382 WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2383 AND metric_name LIKE ?
2384 ORDER BY recorded_at DESC, id DESC
2385 LIMIT ?
2386 "#,
2387 )
2388 .bind(namespace_id)
2389 .bind(format!("{prefix}%"))
2390 .bind(limit)
2391 .fetch_all(&self.pool)
2392 .await
2393 .map_err(db_error)?
2394 } else {
2395 sqlx::query_as::<_, SystemMetricRow>(
2396 r#"
2397 SELECT *
2398 FROM system_metrics
2399 WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2400 ORDER BY recorded_at DESC, id DESC
2401 LIMIT ?
2402 "#,
2403 )
2404 .bind(namespace_id)
2405 .bind(limit)
2406 .fetch_all(&self.pool)
2407 .await
2408 .map_err(db_error)?
2409 };
2410 Ok(rows)
2411 }
2412
2413 pub async fn count_by_cognitive_level(
2415 &self,
2416 namespace_id: i64,
2417 level: CognitiveLevel,
2418 ) -> Result<i64> {
2419 let count: i64 = sqlx::query_scalar(
2420 r#"
2421 SELECT COUNT(*) FROM memories
2422 WHERE namespace_id = ?
2423 AND is_active = 1
2424 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') = ?
2425 "#,
2426 )
2427 .bind(namespace_id)
2428 .bind(level.as_str())
2429 .fetch_one(&self.pool)
2430 .await
2431 .map_err(db_error)?;
2432 Ok(count)
2433 }
2434}
2435
/// Insert a memory row inside an open transaction and return its row id.
///
/// When `last_insert_rowid()` is 0 the insert is assumed to have been merged
/// away by a dedup trigger; the surviving active row is then recovered with a
/// case/whitespace-insensitive content lookup. Errors when neither path
/// yields a row.
async fn insert_memory_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    params: &StoreMemoryParams<'_>,
) -> Result<i64> {
    // Serialize JSON-typed columns up front so serde errors surface before
    // any SQL runs inside the transaction.
    let labels_json = serde_json::to_string(params.labels)?;
    let metadata_json = serde_json::to_string(params.metadata)?;
    let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;

    let result = sqlx::query(
        r#"
        INSERT INTO memories (
            namespace_id, content, category, memory_lane_type, labels, metadata,
            content_embedding, embedding_model, created_at, is_active, access_count
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
        "#,
    )
    .bind(params.namespace_id)
    .bind(params.content)
    .bind(params.category.to_string())
    .bind(params.memory_lane_type.map(|t| t.to_string()))
    .bind(&labels_json)
    .bind(&metadata_json)
    .bind(&embedding_json)
    .bind(params.embedding_model)
    .bind(Utc::now())
    .execute(&mut **tx)
    .await
    .map_err(db_error)?;

    // Normal path: the insert produced a fresh rowid.
    let inserted_id = result.last_insert_rowid();
    if inserted_id != 0 {
        return Ok(inserted_id);
    }

    // Dedup path: locate the active row the duplicate was merged into.
    let row: Option<MemoryRow> = sqlx::query_as(
        "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1",
    )
    .bind(params.namespace_id)
    .bind(params.content)
    .fetch_optional(&mut **tx)
    .await
    .map_err(db_error)?;

    row.map(|memory| memory.id).ok_or_else(|| {
        nexus_core::NexusError::Storage(
            "Duplicate merged by trigger but matching row not found".to_string(),
        )
    })
}
2485
2486async fn insert_evidence_tx(
2487 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2488 derived_memory_id: i64,
2489 source_memory_id: i64,
2490 evidence_role: &str,
2491) -> Result<()> {
2492 sqlx::query(
2493 r#"
2494 INSERT OR IGNORE INTO memory_evidence (derived_memory_id, source_memory_id, evidence_role, created_at)
2495 VALUES (?, ?, ?, datetime('now'))
2496 "#,
2497 )
2498 .bind(derived_memory_id)
2499 .bind(source_memory_id)
2500 .bind(evidence_role)
2501 .execute(&mut **tx)
2502 .await
2503 .map_err(db_error)?;
2504
2505 Ok(())
2506}
2507
2508fn new_claim_token(lease_owner: &str) -> String {
2509 let nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2510 format!("{lease_owner}-{nanos}-{}", std::process::id())
2511}
2512
/// Append labels from `incoming` that are not already present in `existing`,
/// comparing ASCII case-insensitively. Existing order and casing win; new
/// labels keep incoming order.
fn merge_labels(existing: &[String], incoming: &[String]) -> Vec<String> {
    let mut merged: Vec<String> = existing.to_vec();
    for candidate in incoming {
        let already_present = merged
            .iter()
            .any(|current| current.eq_ignore_ascii_case(candidate));
        if !already_present {
            merged.push(candidate.clone());
        }
    }
    merged
}
2525
/// Merge metadata from an incoming duplicate memory into the existing
/// memory's metadata.
///
/// For `$.cognitive.session_key` and `$.source.derived_session_key`: the
/// incoming key overwrites the scalar field, while the previous key (if any)
/// and the incoming key are both accumulated into the corresponding plural
/// `…_keys` array with case-insensitive de-duplication. All other existing
/// metadata is preserved unchanged.
fn merge_duplicate_metadata(
    existing: &serde_json::Value,
    incoming: &serde_json::Value,
) -> serde_json::Value {
    let mut merged = existing.clone();

    // Cognitive session key: scalar takes the incoming value, array keeps
    // every key ever seen.
    if let Some(session_key) = incoming
        .pointer("/cognitive/session_key")
        .and_then(serde_json::Value::as_str)
    {
        let mut session_keys = existing
            .pointer("/cognitive/session_keys")
            .and_then(serde_json::Value::as_array)
            .cloned()
            .unwrap_or_default();
        // Preserve the previous scalar key in the array before overwriting.
        if let Some(existing_key) = existing
            .pointer("/cognitive/session_key")
            .and_then(serde_json::Value::as_str)
        {
            push_unique_json_string(&mut session_keys, existing_key);
        }
        push_unique_json_string(&mut session_keys, session_key);
        ensure_object_path(&mut merged, "cognitive").insert(
            "session_key".to_string(),
            serde_json::Value::String(session_key.to_string()),
        );
        ensure_object_path(&mut merged, "cognitive").insert(
            "session_keys".to_string(),
            serde_json::Value::Array(session_keys),
        );
    }

    // Same accumulation scheme for the source-derived session key.
    if let Some(derived_session_key) = incoming
        .pointer("/source/derived_session_key")
        .and_then(serde_json::Value::as_str)
    {
        let mut derived_keys = existing
            .pointer("/source/derived_session_keys")
            .and_then(serde_json::Value::as_array)
            .cloned()
            .unwrap_or_default();
        if let Some(existing_key) = existing
            .pointer("/source/derived_session_key")
            .and_then(serde_json::Value::as_str)
        {
            push_unique_json_string(&mut derived_keys, existing_key);
        }
        push_unique_json_string(&mut derived_keys, derived_session_key);
        ensure_object_path(&mut merged, "source").insert(
            "derived_session_key".to_string(),
            serde_json::Value::String(derived_session_key.to_string()),
        );
        ensure_object_path(&mut merged, "source").insert(
            "derived_session_keys".to_string(),
            serde_json::Value::Array(derived_keys),
        );
    }

    merged
}
2586
2587fn push_unique_json_string(values: &mut Vec<serde_json::Value>, candidate: &str) {
2588 if values
2589 .iter()
2590 .filter_map(serde_json::Value::as_str)
2591 .any(|current| current.eq_ignore_ascii_case(candidate))
2592 {
2593 return;
2594 }
2595 values.push(serde_json::Value::String(candidate.to_string()));
2596}
2597
2598fn ensure_object_path<'a>(
2599 root: &'a mut serde_json::Value,
2600 key: &str,
2601) -> &'a mut serde_json::Map<String, serde_json::Value> {
2602 if !root.is_object() {
2603 *root = serde_json::Value::Object(serde_json::Map::new());
2604 }
2605
2606 let object = root.as_object_mut().expect("root object ensured");
2607 let entry = object
2608 .entry(key.to_string())
2609 .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
2610 if !entry.is_object() {
2611 *entry = serde_json::Value::Object(serde_json::Map::new());
2612 }
2613
2614 entry.as_object_mut().expect("child object ensured")
2615}
2616
/// Repository providing access to the `agent_namespaces` table.
pub struct NamespaceRepository {
    // Owned pool handle; `SqlitePool` clones share the same underlying pool.
    pool: SqlitePool,
}
2621
2622impl NamespaceRepository {
2623 pub fn new(pool: SqlitePool) -> Self {
2624 Self { pool }
2625 }
2626
2627 pub async fn get_or_create(&self, name: &str, agent_type: &str) -> Result<AgentNamespace> {
2629 if let Some(ns) = self.get_by_name(name).await? {
2630 return Ok(ns);
2631 }
2632
2633 let result = sqlx::query(
2634 "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
2635 )
2636 .bind(name)
2637 .bind(agent_type)
2638 .bind(Utc::now())
2639 .execute(&self.pool)
2640 .await
2641 .map_err(db_error)?;
2642
2643 let id = result.last_insert_rowid();
2644 Ok(AgentNamespace {
2645 id,
2646 name: name.to_string(),
2647 description: None,
2648 agent_type: agent_type.to_string(),
2649 created_at: Utc::now(),
2650 updated_at: None,
2651 })
2652 }
2653
2654 pub async fn get_by_name(&self, name: &str) -> Result<Option<AgentNamespace>> {
2656 let row: Option<AgentNamespaceRow> =
2657 sqlx::query_as("SELECT * FROM agent_namespaces WHERE name = ?")
2658 .bind(name)
2659 .fetch_optional(&self.pool)
2660 .await
2661 .map_err(db_error)?;
2662
2663 Ok(row.map(|r| AgentNamespace {
2664 id: r.id,
2665 name: r.name,
2666 description: r.description,
2667 agent_type: r.agent_type,
2668 created_at: r.created_at,
2669 updated_at: r.updated_at,
2670 }))
2671 }
2672
2673 pub async fn list_all(&self) -> Result<Vec<AgentNamespace>> {
2675 let rows: Vec<AgentNamespaceRow> =
2676 sqlx::query_as("SELECT * FROM agent_namespaces ORDER BY name")
2677 .fetch_all(&self.pool)
2678 .await
2679 .map_err(db_error)?;
2680
2681 Ok(rows
2682 .into_iter()
2683 .map(|r| AgentNamespace {
2684 id: r.id,
2685 name: r.name,
2686 description: r.description,
2687 agent_type: r.agent_type,
2688 created_at: r.created_at,
2689 updated_at: r.updated_at,
2690 })
2691 .collect())
2692 }
2693}
2694
/// Repository tracking which files have been ingested, keyed by
/// `(namespace_id, path)` in the `processed_files` table.
pub struct ProcessedFileRepository<'a> {
    // Borrowed pool; the repository does not outlive the pool it queries.
    pub pool: &'a SqlitePool,
}
2699
2700impl<'a> ProcessedFileRepository<'a> {
2701 pub fn new(pool: &'a SqlitePool) -> Self {
2702 Self { pool }
2703 }
2704
2705 pub async fn is_processed(&self, namespace_id: i64, path: &str) -> Result<bool> {
2707 let row: Option<(i64,)> =
2708 sqlx::query_as("SELECT id FROM processed_files WHERE namespace_id = ? AND path = ? AND status = 'completed'")
2709 .bind(namespace_id)
2710 .bind(path)
2711 .fetch_optional(self.pool)
2712 .await
2713 .map_err(db_error)?;
2714
2715 Ok(row.is_some())
2716 }
2717
2718 pub async fn get_completed_paths(
2720 &self,
2721 namespace_id: i64,
2722 ) -> Result<std::collections::HashSet<String>> {
2723 let rows: Vec<(String,)> = sqlx::query_as(
2724 "SELECT path FROM processed_files WHERE namespace_id = ? AND status = 'completed'",
2725 )
2726 .bind(namespace_id)
2727 .fetch_all(self.pool)
2728 .await
2729 .map_err(db_error)?;
2730
2731 Ok(rows.into_iter().map(|r| r.0).collect())
2732 }
2733
2734 pub async fn mark_processing(
2736 &self,
2737 namespace_id: i64,
2738 path: &str,
2739 content_hash: Option<&str>,
2740 ) -> Result<i64> {
2741 let id: i64 = sqlx::query_scalar(
2742 r#"
2743 INSERT INTO processed_files (namespace_id, path, content_hash, status, updated_at)
2744 VALUES (?, ?, ?, 'processing', datetime('now'))
2745 ON CONFLICT(namespace_id, path) DO UPDATE SET
2746 content_hash = excluded.content_hash,
2747 status = 'processing',
2748 updated_at = datetime('now')
2749 RETURNING id
2750 "#,
2751 )
2752 .bind(namespace_id)
2753 .bind(path)
2754 .bind(content_hash)
2755 .fetch_one(self.pool)
2756 .await
2757 .map_err(db_error)?;
2758
2759 Ok(id)
2760 }
2761
2762 pub async fn mark_processed(&self, id: i64, memory_id: i64) -> Result<()> {
2764 sqlx::query(
2765 r#"
2766 UPDATE processed_files
2767 SET status = 'completed', memory_id = ?, processed_at = datetime('now'), updated_at = datetime('now')
2768 WHERE id = ?
2769 "#
2770 )
2771 .bind(memory_id)
2772 .bind(id)
2773 .execute(self.pool)
2774 .await
2775 .map_err(db_error)?;
2776
2777 Ok(())
2778 }
2779
2780 pub async fn mark_failed(&self, id: i64, error: &str) -> Result<()> {
2782 sqlx::query(
2783 r#"
2784 UPDATE processed_files
2785 SET status = 'failed', last_error = ?, updated_at = datetime('now')
2786 WHERE id = ?
2787 "#,
2788 )
2789 .bind(error)
2790 .bind(id)
2791 .execute(self.pool)
2792 .await
2793 .map_err(db_error)?;
2794
2795 Ok(())
2796 }
2797
2798 pub async fn get_pending(
2800 &self,
2801 namespace_id: i64,
2802 limit: i32,
2803 ) -> Result<Vec<ProcessedFileRow>> {
2804 let rows = sqlx::query_as::<_, ProcessedFileRow>(
2805 r#"
2806 SELECT * FROM processed_files
2807 WHERE namespace_id = ? AND status = 'pending'
2808 ORDER BY created_at ASC
2809 LIMIT ?
2810 "#,
2811 )
2812 .bind(namespace_id)
2813 .bind(limit)
2814 .fetch_all(self.pool)
2815 .await
2816 .map_err(db_error)?;
2817
2818 Ok(rows)
2819 }
2820
2821 pub async fn clear_namespace(&self, namespace_id: i64) -> Result<u64> {
2823 let result = sqlx::query("DELETE FROM processed_files WHERE namespace_id = ?")
2824 .bind(namespace_id)
2825 .execute(self.pool)
2826 .await
2827 .map_err(db_error)?;
2828
2829 Ok(result.rows_affected())
2830 }
2831}
2832
/// Repository for typed, weighted edges between memories
/// (`memory_relations` table).
pub struct MemoryRelationRepository<'a> {
    // Borrowed pool; the repository does not outlive the pool it queries.
    pub pool: &'a SqlitePool,
}
2837
2838impl<'a> MemoryRelationRepository<'a> {
2839 pub fn new(pool: &'a SqlitePool) -> Self {
2840 Self { pool }
2841 }
2842
2843 pub async fn store(
2845 &self,
2846 source_id: i64,
2847 target_id: i64,
2848 relation_type: &str,
2849 strength: f32,
2850 ) -> Result<i64> {
2851 let id: i64 = sqlx::query_scalar(
2852 r#"
2853 INSERT INTO memory_relations (source_memory_id, target_memory_id, relation_type, strength, created_at)
2854 VALUES (?, ?, ?, ?, datetime('now'))
2855 ON CONFLICT(source_memory_id, target_memory_id, relation_type) DO UPDATE SET
2856 strength = excluded.strength,
2857 created_at = datetime('now')
2858 RETURNING id
2859 "#
2860 )
2861 .bind(source_id)
2862 .bind(target_id)
2863 .bind(relation_type)
2864 .bind(strength)
2865 .fetch_one(self.pool)
2866 .await
2867 .map_err(db_error)?;
2868
2869 Ok(id)
2870 }
2871
2872 pub async fn get_related(&self, memory_id: i64) -> Result<Vec<(i64, String, f32)>> {
2874 let rows: Vec<(i64, String, f32)> = sqlx::query_as(
2875 r#"
2876 SELECT target_memory_id as memory_id, relation_type, strength
2877 FROM memory_relations
2878 WHERE source_memory_id = ?
2879 UNION
2880 SELECT source_memory_id as memory_id, relation_type, strength
2881 FROM memory_relations
2882 WHERE target_memory_id = ?
2883 ORDER BY strength DESC
2884 "#,
2885 )
2886 .bind(memory_id)
2887 .bind(memory_id)
2888 .fetch_all(self.pool)
2889 .await
2890 .map_err(db_error)?;
2891
2892 Ok(rows)
2893 }
2894
2895 pub async fn delete_for_memory(&self, memory_id: i64) -> Result<u64> {
2897 let result = sqlx::query(
2898 "DELETE FROM memory_relations WHERE source_memory_id = ? OR target_memory_id = ?",
2899 )
2900 .bind(memory_id)
2901 .bind(memory_id)
2902 .execute(self.pool)
2903 .await
2904 .map_err(db_error)?;
2905
2906 Ok(result.rows_affected())
2907 }
2908}
2909
2910fn parse_category(s: &str) -> Result<Category> {
2911 match MemoryCategory::parse(s) {
2912 Some(cat) => Ok(cat),
2913 None => Err(nexus_core::NexusError::Storage(format!(
2914 "Unknown memory category '{s}' persisted in database; row may be corrupted"
2915 ))),
2916 }
2917}
2918
2919fn parse_memory_lane_type(s: &str) -> Result<Option<MemoryLaneType>> {
2920 match MemoryLaneType::parse(s) {
2921 Some(t) => Ok(Some(t)),
2922 None => Err(nexus_core::NexusError::Storage(format!(
2923 "Unknown memory_lane_type '{s}' persisted in database; row may be corrupted"
2924 ))),
2925 }
2926}
2927
2928#[cfg(test)]
2929mod tests {
2930 use super::*;
2931 use nexus_core::MemoryLanePriorityType;
2932 use sqlx::sqlite::SqlitePoolOptions;
2933
    /// Builds the `metadata.cognitive` JSON envelope used by the cognitive
    /// query tests for the given level, perspective, and reinforcement /
    /// contradiction counters. Confidence is fixed at 0.9.
    fn cognitive_metadata(
        level: CognitiveLevel,
        perspective: &PerspectiveKey,
        times_reinforced: i64,
        times_contradicted: i64,
    ) -> serde_json::Value {
        serde_json::json!({
            "cognitive": {
                "level": level.as_str(),
                "observer": perspective.observer,
                "subject": perspective.subject,
                "session_key": perspective.session_key,
                "source_memory_ids": [],
                "confidence": 0.9,
                "times_reinforced": times_reinforced,
                "times_contradicted": times_contradicted,
                "generated_by": "test",
            }
        })
    }
2954
2955 #[test]
2956 fn test_parse_category() {
2957 assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
2958 assert!(matches!(
2959 parse_category("preferences"),
2960 Ok(Category::Preferences)
2961 ));
2962 assert!(parse_category("unknown").is_err());
2963 }
2964
2965 #[test]
2966 fn test_parse_memory_lane_type() {
2967 let correction = parse_memory_lane_type("correction");
2968 assert!(matches!(
2969 correction,
2970 Ok(Some(MemoryLaneType::Priority(
2971 MemoryLanePriorityType::Correction
2972 )))
2973 ));
2974
2975 let pattern_seed = parse_memory_lane_type("pattern_seed");
2976 assert!(matches!(
2977 pattern_seed,
2978 Ok(Some(MemoryLaneType::Priority(
2979 MemoryLanePriorityType::PatternSeed
2980 )))
2981 ));
2982
2983 assert!(parse_memory_lane_type("unknown").is_err());
2984 }
2985
2986 #[test]
2987 fn test_parse_category_all_variants() {
2988 assert!(matches!(parse_category("general"), Ok(Category::General)));
2989 assert!(matches!(parse_category("session"), Ok(Category::Session)));
2990 assert!(matches!(parse_category("context"), Ok(Category::Context)));
2991 assert!(matches!(
2992 parse_category("specifications"),
2993 Ok(Category::Specifications)
2994 ));
2995 assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
2996 assert!(matches!(
2997 parse_category("preferences"),
2998 Ok(Category::Preferences)
2999 ));
3000 assert!(parse_category("bogus").is_err());
3002 assert!(parse_category("").is_err());
3003 }
3004
3005 #[test]
3006 fn test_store_memory_params_fields() {
3007 let params = StoreMemoryParams {
3009 namespace_id: 1,
3010 content: "test content",
3011 category: &Category::General,
3012 memory_lane_type: None,
3013 labels: &[],
3014 metadata: &serde_json::Value::Null,
3015 embedding: None,
3016 embedding_model: None,
3017 };
3018 assert_eq!(params.namespace_id, 1);
3019 assert_eq!(params.content, "test content");
3020 assert!(params.labels.is_empty());
3021 }
3022
3023 #[test]
3024 fn test_merge_duplicate_metadata_preserves_multiple_session_keys() {
3025 let existing = serde_json::json!({
3026 "cognitive": {
3027 "session_key": "session-a"
3028 },
3029 "source": {
3030 "derived_session_key": "session-a"
3031 }
3032 });
3033 let incoming = serde_json::json!({
3034 "cognitive": {
3035 "session_key": "session-b"
3036 },
3037 "source": {
3038 "derived_session_key": "session-b"
3039 }
3040 });
3041
3042 let merged = merge_duplicate_metadata(&existing, &incoming);
3043 assert_eq!(merged["cognitive"]["session_key"], "session-b");
3044 assert_eq!(
3045 merged["cognitive"]["session_keys"],
3046 serde_json::json!(["session-a", "session-b"])
3047 );
3048 assert_eq!(
3049 merged["source"]["derived_session_keys"],
3050 serde_json::json!(["session-a", "session-b"])
3051 );
3052 }
3053
3054 async fn setup_test_db() -> SqlitePool {
3057 let pool = SqlitePoolOptions::new()
3058 .max_connections(1)
3059 .connect("sqlite::memory:")
3060 .await
3061 .unwrap();
3062 crate::migrations::run_migrations(&pool).await.unwrap();
3063 pool
3064 }
3065
    /// Ensures a namespace called `name` exists (agent type "test") and
    /// returns its row id.
    async fn create_namespace(pool: &SqlitePool, name: &str) -> i64 {
        let ns = NamespaceRepository::new(pool.clone());
        ns.get_or_create(name, "test").await.unwrap();
        ns.get_by_name(name).await.unwrap().unwrap().id
    }
3071
    // get_by_content must return the row whose content matches exactly, and
    // error for content that was never stored.
    #[tokio::test]
    async fn test_get_by_content_matches_actual_content() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let mem_a = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "first memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let mem_b = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "second memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        assert_ne!(mem_a.id, mem_b.id);

        // Each lookup must resolve to its own row, not just the latest one.
        let found_a = repo
            .get_by_content(ns_id, "first memory content")
            .await
            .unwrap();
        assert_eq!(found_a.id, mem_a.id);
        assert_eq!(found_a.content, "first memory content");

        let found_b = repo
            .get_by_content(ns_id, "second memory content")
            .await
            .unwrap();
        assert_eq!(found_b.id, mem_b.id);
        assert_eq!(found_b.content, "second memory content");

        // Unstored content is an error rather than an empty result.
        let result = repo.get_by_content(ns_id, "nonexistent").await;
        assert!(result.is_err());
    }
3134
    // Enqueued jobs are claimed one at a time; the priority-100 job is
    // handed out before the priority-50 job, and a claimed job is marked
    // "running" so a second worker cannot grab it.
    #[tokio::test]
    async fn test_enqueue_and_claim_jobs() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let id1 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 1}),
            })
            .await
            .unwrap();

        let id2 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 50,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 2}),
            })
            .await
            .unwrap();

        assert!(id1 > 0);
        assert!(id2 > 0);
        assert_ne!(id1, id2);

        // First claim: the higher-priority job (id1) wins.
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].row.id, id1);
        assert_eq!(claimed[0].row.status, "running");
        assert_eq!(claimed[0].payload["memory_id"], 1);

        // Second claim by a different worker gets the remaining job.
        let claimed2 = repo
            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed2.len(), 1);
        assert_eq!(claimed2[0].row.id, id2);
    }
3188
    // A completed job must never be claimable again.
    #[tokio::test]
    async fn test_complete_and_fail_job() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "digest_session",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"session": "s1"}),
            })
            .await
            .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);

        repo.complete_job(&claimed[0]).await.unwrap();

        // The finished job must not reappear in the queue.
        let claimed_again = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert!(claimed_again.is_empty());
    }
3222
    // Failing a job below the retry limit requeues it with an incremented
    // attempt counter.
    #[tokio::test]
    async fn test_fail_job_requeues_before_limit() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"test": true}),
            })
            .await
            .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        repo.fail_job(&claimed[0], "transient error").await.unwrap();

        // The failed job is claimable again, now on its second attempt.
        let reclaimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(reclaimed.len(), 1);
        assert_eq!(reclaimed[0].row.attempts, 2);
    }
3254
    // Completing a job with a stale/forged claim token must be rejected so a
    // worker that lost its lease cannot finish a job re-claimed elsewhere.
    #[tokio::test]
    async fn test_complete_job_requires_matching_claim_token() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 100,
            perspective: None,
            payload: &serde_json::json!({"memory_id": 7}),
        })
        .await
        .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
            .await
            .unwrap();
        // Tamper with the claim token to simulate a lost lease.
        let mut forged = claimed[0].clone();
        forged.row.claim_token = Some("forged-token".to_string());

        let error = repo.complete_job(&forged).await.unwrap_err();
        assert!(error.to_string().contains("lost lease ownership"));
    }
3281
    // Storing a digest for the same (namespace, session, kind) upserts the
    // existing digest row, and both the per-session and per-namespace
    // "latest" lookups follow the replacement.
    #[tokio::test]
    async fn test_store_digest_and_latest_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: digest_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 42,
            })
            .await
            .unwrap();

        assert!(digest_id > 0);

        // The digest points at the memory row we stored for it.
        let result = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap();

        assert!(result.is_some());
        assert_eq!(result.as_ref().unwrap().id, digest_memory.id);

        let replacement_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short updated",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let replacement_digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: replacement_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 64,
            })
            .await
            .unwrap();

        // Same digest row is reused (upsert), not a new one.
        assert_eq!(replacement_digest_id, digest_id);

        let updated = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(updated.id, replacement_memory.id);

        let latest_for_namespace = repo
            .latest_digest_for_namespace(ns_id, "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_for_namespace.id, replacement_memory.id);
    }
3370
    // The rollover report must count only memories created after the last
    // digest's coverage window: 1 before any digest, 0 immediately after a
    // digest covers everything, then 1 again once new signal arrives.
    #[tokio::test]
    async fn test_session_digest_rollover_reports_new_signal_since_last_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Implemented bounded digest rollover policy.",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "explicit",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // No digest yet: the single stored memory is uncovered signal.
        let first = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(first.last_digest_end_memory_id, None);
        assert_eq!(first.new_memory_count, 1);
        assert!(first.estimated_new_tokens > 0);

        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Short digest",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "summary_short",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Digest covering exactly the source memory.
        repo.store_digest(StoreDigestParams {
            namespace_id: ns_id,
            session_key: "session-rollover",
            digest_kind: "short",
            memory_id: digest_memory.id,
            start_memory_id: Some(source.id),
            end_memory_id: Some(source.id),
            token_count: 16,
        })
        .await
        .unwrap();

        // Everything is covered: no new signal to report.
        let covered = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(covered.last_digest_end_memory_id, Some(source.id));
        assert_eq!(covered.new_memory_count, 0);
        assert_eq!(covered.estimated_new_tokens, 0);

        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "Added one more explicit memory after the digest coverage window.",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "session-rollover"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // One memory past the covered window: exactly one new item reported.
        let second = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(second.last_digest_end_memory_id, Some(source.id));
        assert_eq!(second.new_memory_count, 1);
        assert!(second.estimated_new_tokens > 0);
    }
3475
    // Storing a derived memory with lineage must record one evidence edge
    // per source memory.
    #[tokio::test]
    async fn test_store_with_lineage() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source1 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw observation one",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let source2 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw observation two",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let derived = repo
            .store_with_lineage(StoreMemoryWithLineageParams {
                store: StoreMemoryParams {
                    namespace_id: ns_id,
                    content: "derived insight",
                    category: &Category::Facts,
                    memory_lane_type: None,
                    labels: &[],
                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
                    embedding: None,
                    embedding_model: None,
                },
                source_memory_ids: &[source1.id, source2.id],
                evidence_role: "derived_from",
            })
            .await
            .unwrap();

        assert_eq!(derived.content, "derived insight");

        // Both sources must appear in the derived memory's lineage.
        let lineage = repo.load_lineage(derived.id).await.unwrap();
        assert_eq!(lineage.len(), 2);
        assert!(lineage.iter().any(|e| e.source_memory_id == source1.id));
        assert!(lineage.iter().any(|e| e.source_memory_id == source2.id));
    }
3538
    // Exercises the cognitive query surface: filter by level, list recents
    // for a perspective, rank by reinforcement (excluding contradictions),
    // and fetch contradictions on their own.
    #[tokio::test]
    async fn test_cognitive_queries_by_level_and_perspective() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective =
            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".into()));

        let _raw = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "explicit note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Heavily reinforced memory: should rank first in the query below.
        let derived = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "reinforced insight",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 7, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let contradiction = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "contradiction note",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 1, 5),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Level filter returns exactly the one explicit-level memory.
        let explicit_rows = repo
            .get_by_cognitive_level(ns_id, CognitiveLevel::Explicit, 10)
            .await
            .unwrap();
        assert_eq!(explicit_rows.len(), 1);
        assert_eq!(explicit_rows[0].id, explicit.id);

        // All four rows belong to this perspective.
        let recent = repo
            .get_recent_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(recent.len(), 4);

        // Reinforcement ranking puts the 7x-reinforced memory first and
        // excludes contradiction-level rows entirely.
        let reinforced = repo
            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(reinforced[0].id, derived.id);
        assert!(reinforced
            .iter()
            .all(|memory| memory.id != contradiction.id));

        let contradictions = repo
            .get_contradictions_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(contradictions[0].id, contradiction.id);
    }
3632
    // Distilling raw memories into a summary must archive/deactivate the
    // sources and link them to the summary with "source" evidence roles.
    #[tokio::test]
    async fn test_store_distilled_summary_archives_sources_and_records_lineage() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source1 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw event 1",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &["raw-activity".to_string()],
                metadata: &serde_json::json!({"raw_activity": true}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let source2 = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw event 2",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &["raw-activity".to_string()],
                metadata: &serde_json::json!({"raw_activity": true}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let summary = repo
            .store_distilled_summary(
                StoreMemoryParams {
                    namespace_id: ns_id,
                    content: "distilled summary",
                    category: &Category::Session,
                    memory_lane_type: None,
                    labels: &["activity-summary".to_string()],
                    metadata: &serde_json::json!({"pipeline": "distill-v1"}),
                    embedding: None,
                    embedding_model: None,
                },
                &[source1.id, source2.id],
            )
            .await
            .unwrap();

        // Both sources are now inactive and archived.
        let source1_after = repo.get_by_id(source1.id).await.unwrap().unwrap();
        let source2_after = repo.get_by_id(source2.id).await.unwrap().unwrap();
        assert!(!source1_after.is_active);
        assert!(source1_after.is_archived);
        assert!(!source2_after.is_active);
        assert!(source2_after.is_archived);

        // Lineage records one "source" edge per distilled input.
        let lineage = repo.load_lineage(summary.id).await.unwrap();
        assert_eq!(lineage.len(), 2);
        assert!(lineage.iter().all(|entry| entry.evidence_role == "source"));
    }
3695
    // Lineage for an unknown memory id is an empty list, not an error.
    #[tokio::test]
    async fn test_load_lineage_empty() {
        let pool = setup_test_db().await;
        let _ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let lineage = repo.load_lineage(9999).await.unwrap();
        assert!(lineage.is_empty());
    }
3705
    // Raw-activity noise is hidden from the default recent-by-perspective
    // view but included when raw rows are explicitly requested.
    #[tokio::test]
    async fn test_recent_perspective_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "clean observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity row for the same perspective: flagged via both the
        // label and the raw_activity metadata marker.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise payload",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Default view: only the clean observation survives filtering.
        let recent = repo
            .get_recent_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].content, "clean observation");

        // include_raw = true: both rows are visible.
        let recent_all = repo
            .get_recent_by_perspective_opts(ns_id, &perspective, 10, true)
            .await
            .unwrap();
        assert_eq!(recent_all.len(), 2);
    }
3771
    // Semantic candidates must be scoped to the requesting perspective and,
    // with include_raw = false, must drop raw-activity noise even when it has
    // an embedding.
    #[tokio::test]
    async fn test_semantic_candidates_respect_perspective_and_raw_noise_filtering() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Embedded explicit memory in the target perspective: the one expected match.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "clean semantic observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: Some(&[0.1_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        // Raw-activity noise in the same perspective: embedded, but should be excluded.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw semantic noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "s1",
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.2_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        // Clean memory, but attributed to a different observer/subject: also excluded.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "other perspective semantic",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "codex",
                    "subject": "codex",
                    "session_key": "s1",
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.3_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        let candidates = repo
            .get_semantic_candidates(SemanticCandidateParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                limit: 10,
                include_raw: false,
            })
            .await
            .unwrap();

        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].content, "clean semantic observation");
    }
3848
    // A memory tagged with a "session_keys" array (instead of a scalar
    // "session_key") must still match a perspective whose session key is one
    // of the array's elements.
    #[tokio::test]
    async fn test_semantic_candidates_match_session_keys_array() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s-array".into()));

        // Only an array-valued "session_keys" is present; "s-array" appears in it.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "session array semantic observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_keys": ["s-array", "s-other"],
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.4_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        let candidates = repo
            .get_semantic_candidates(SemanticCandidateParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                limit: 10,
                include_raw: false,
            })
            .await
            .unwrap();

        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].content, "session array semantic observation");
    }
3890
    // get_most_reinforced_by_perspective must return only real (non-raw)
    // memories, even when raw-activity noise exists for the same perspective.
    #[tokio::test]
    async fn test_reinforced_perspective_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Derived memory reinforced 5 times: the expected sole result.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "reinforced insight",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 5, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity noise for the same perspective; should never surface here.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let reinforced = repo
            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(reinforced.len(), 1);
        assert_eq!(reinforced[0].content, "reinforced insight");
    }
3944
    // get_contradictions_by_perspective must return only contradiction-level
    // memories and skip raw-activity noise in the same perspective.
    #[tokio::test]
    async fn test_contradictions_perspective_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Contradiction-level memory (contradicted 3 times): the expected result.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "a real contradiction",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 3),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity noise for the same perspective; should be excluded.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let contradictions = repo
            .get_contradictions_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(contradictions[0].content, "a real contradiction");
    }
3998
    // Working-set search with include_raw = false must return the explicit,
    // derived, and contradiction memories for the perspective.
    //
    // NOTE(review): the raw-level memory is stored but never asserted absent,
    // so whether include_raw = false also excludes level "raw" (as opposed to
    // only raw_activity-flagged noise) is not pinned by this test — confirm
    // against search_working_set's implementation.
    #[tokio::test]
    async fn test_search_working_set_basic() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // One memory per cognitive level of interest.
        let _raw = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "explicit fact",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 3, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let derived = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "derived insight",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 8, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let contradiction = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "contradiction",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 2),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let result = repo
            .search_working_set(WorkingSetParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                max_items: 20,
                include_raw: false,
            })
            .await
            .unwrap();

        // All three non-raw memories are present.
        assert!(result.len() >= 3);
        let ids: Vec<i64> = result.iter().map(|m| m.id).collect();
        assert!(ids.contains(&explicit.id));
        assert!(ids.contains(&derived.id));
        assert!(ids.contains(&contradiction.id));
    }
4083
4084 #[tokio::test]
4085 async fn test_search_working_set_dedupes() {
4086 let pool = setup_test_db().await;
4087 let ns_id = create_namespace(&pool, "test-agent").await;
4088 let repo = MemoryRepository::new(pool);
4089 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4090
4091 let shared = repo
4093 .store(StoreMemoryParams {
4094 namespace_id: ns_id,
4095 content: "shared memory",
4096 category: &Category::Facts,
4097 memory_lane_type: None,
4098 labels: &[],
4099 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 10, 0),
4100 embedding: None,
4101 embedding_model: None,
4102 })
4103 .await
4104 .unwrap();
4105
4106 let result = repo
4107 .search_working_set(WorkingSetParams {
4108 namespace_id: ns_id,
4109 perspective: Some(&perspective),
4110 max_items: 20,
4111 include_raw: false,
4112 })
4113 .await
4114 .unwrap();
4115
4116 let count = result.iter().filter(|m| m.id == shared.id).count();
4117 assert_eq!(count, 1, "shared memory should appear exactly once");
4118 }
4119
    // With more qualifying memories than max_items, the working set is
    // truncated to exactly max_items entries.
    #[tokio::test]
    async fn test_search_working_set_respects_max_items() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Ten explicit memories with increasing reinforcement counts.
        for i in 0..10 {
            let content = format!("memory {}", i);
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &content,
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, i as i64, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        let result = repo
            .search_working_set(WorkingSetParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                max_items: 3,
                include_raw: false,
            })
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
    }
4155
    // With include_raw = false, raw_activity-flagged memories are excluded
    // from the working set while real observations are kept.
    #[tokio::test]
    async fn test_search_working_set_excludes_raw_noise() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // A real explicit-level observation: should be returned.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "real observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // raw_activity-flagged noise: should be filtered out.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({"raw_activity": true, "cognitive": {"level": "raw"}}),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let result = repo
            .search_working_set(WorkingSetParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                max_items: 20,
                include_raw: false,
            })
            .await
            .unwrap();

        assert!(result.iter().all(|m| m.content != "raw noise"));
        assert!(result.iter().any(|m| m.content == "real observation"));
    }
4202
4203 #[tokio::test]
4204 async fn test_search_working_set_without_perspective() {
4205 let pool = setup_test_db().await;
4206 let ns_id = create_namespace(&pool, "test-agent").await;
4207 let repo = MemoryRepository::new(pool);
4208
4209 repo.store(StoreMemoryParams {
4210 namespace_id: ns_id,
4211 content: "namespace memory one",
4212 category: &Category::Facts,
4213 memory_lane_type: None,
4214 labels: &[],
4215 metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4216 embedding: None,
4217 embedding_model: None,
4218 })
4219 .await
4220 .unwrap();
4221
4222 repo.store(StoreMemoryParams {
4223 namespace_id: ns_id,
4224 content: "namespace memory two",
4225 category: &Category::Facts,
4226 memory_lane_type: None,
4227 labels: &[],
4228 metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4229 embedding: None,
4230 embedding_model: None,
4231 })
4232 .await
4233 .unwrap();
4234
4235 let result = repo
4236 .search_working_set(WorkingSetParams {
4237 namespace_id: ns_id,
4238 perspective: None,
4239 max_items: 20,
4240 include_raw: false,
4241 })
4242 .await
4243 .unwrap();
4244
4245 assert!(result.len() >= 2);
4246 }
4247
    // list_by_session_key must match a session key found either in the scalar
    // "session_key" field or anywhere in the "session_keys" array.
    #[tokio::test]
    async fn test_list_by_session_key_matches_session_keys_array() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Scalar session_key = "session-b"; array carries both "session-a" and "session-b".
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "shared explicit memory",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "session_key": "session-b",
                    "session_keys": ["session-a", "session-b"]
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Both lookups must find the single stored memory.
        let session_a = repo
            .list_by_session_key(ns_id, "session-a", 10, false)
            .await
            .unwrap();
        let session_b = repo
            .list_by_session_key(ns_id, "session-b", 10, false)
            .await
            .unwrap();

        assert_eq!(session_a.len(), 1);
        assert_eq!(session_b.len(), 1);
    }
4285
4286 #[tokio::test]
4287 async fn test_count_evidence_returns_zero_for_empty_namespace() {
4288 let pool = setup_test_db().await;
4289 let ns_id = create_namespace(&pool, "test-agent").await;
4290 let repo = MemoryRepository::new(pool);
4291
4292 let count = repo.count_evidence(ns_id).await.unwrap();
4293 assert_eq!(count, 0);
4294 }
4295
    // Storing a memory with lineage creates one evidence edge per source id,
    // and count_evidence reports that total for the namespace.
    #[tokio::test]
    async fn test_count_evidence_counts_lineage_edges() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Plain source memory — no lineage of its own.
        let source = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "source memory",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Derived memory pointing at one source ⇒ exactly one evidence edge.
        let _derived = repo
            .store_with_lineage(StoreMemoryWithLineageParams {
                store: StoreMemoryParams {
                    namespace_id: ns_id,
                    content: "derived with evidence",
                    category: &Category::Facts,
                    memory_lane_type: None,
                    labels: &[],
                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
                    embedding: None,
                    embedding_model: None,
                },
                source_memory_ids: &[source.id],
                evidence_role: "source",
            })
            .await
            .unwrap();

        let count = repo.count_evidence(ns_id).await.unwrap();
        assert_eq!(count, 1);
    }
4337
    // Evidence counts are scoped per namespace: edges created in ns-a must
    // not leak into the count for ns-b.
    #[tokio::test]
    async fn test_count_evidence_does_not_count_other_namespace() {
        let pool = setup_test_db().await;
        let ns_a = create_namespace(&pool, "agent-a").await;
        let ns_b = create_namespace(&pool, "agent-b").await;
        let repo = MemoryRepository::new(pool);

        let source = repo
            .store(StoreMemoryParams {
                namespace_id: ns_a,
                content: "source in ns-a",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // One lineage edge, entirely inside ns-a.
        let _derived = repo
            .store_with_lineage(StoreMemoryWithLineageParams {
                store: StoreMemoryParams {
                    namespace_id: ns_a,
                    content: "derived in ns-a",
                    category: &Category::Facts,
                    memory_lane_type: None,
                    labels: &[],
                    metadata: &serde_json::json!({}),
                    embedding: None,
                    embedding_model: None,
                },
                source_memory_ids: &[source.id],
                evidence_role: "source",
            })
            .await
            .unwrap();

        assert_eq!(repo.count_evidence(ns_a).await.unwrap(), 1);
        assert_eq!(repo.count_evidence(ns_b).await.unwrap(), 0);
    }
4380
    // count_by_cognitive_level counts only memories whose metadata level
    // matches the requested cognitive level.
    #[tokio::test]
    async fn test_count_by_cognitive_level_returns_matching_total() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "level-counts").await;
        let repo = MemoryRepository::new(pool);

        // One raw, two derived, one contradiction.
        for (content, level) in [
            ("raw event", CognitiveLevel::Raw),
            ("derived insight", CognitiveLevel::Derived),
            ("derived insight 2", CognitiveLevel::Derived),
            ("contradiction note", CognitiveLevel::Contradiction),
        ] {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content,
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": level.as_str(),
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "generated_by": "test"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        assert_eq!(
            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Derived)
                .await
                .unwrap(),
            2
        );
        assert_eq!(
            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Contradiction)
                .await
                .unwrap(),
            1
        );
    }
4427
    // The perspective filter must be applied BEFORE the limit: asking for 3 of
    // alice's memories must yield 3 alice rows even though bob's rows were
    // stored interleaved in the same namespace.
    #[tokio::test]
    async fn test_get_by_cognitive_level_with_perspective_filters_before_limit() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "perspective-limit").await;
        let repo = MemoryRepository::new(pool);

        let perspective_a = PerspectiveKey::new("alice", "project-x", None);
        let perspective_b = PerspectiveKey::new("bob", "project-y", None);

        // Five explicit memories for alice/project-x …
        for i in 0..5 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("alice memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_a, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        // … and five for bob/project-y in the same namespace.
        for i in 0..5 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("bob memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_b, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        // Limit 3 with alice's perspective: all 3 results must belong to alice.
        let alice_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective_a,
                3,
            )
            .await
            .unwrap();
        assert_eq!(alice_results.len(), 3);
        assert!(alice_results.iter().all(|m| {
            let meta = &m.metadata;
            let obs = meta
                .get("cognitive")
                .and_then(|c| c.get("observer"))
                .and_then(|v| v.as_str());
            let sub = meta
                .get("cognitive")
                .and_then(|c| c.get("subject"))
                .and_then(|v| v.as_str());
            obs == Some("alice") && sub == Some("project-x")
        }));

        // A generous limit returns exactly alice's 5 memories, never bob's.
        let alice_many = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective_a,
                10,
            )
            .await
            .unwrap();
        assert_eq!(alice_many.len(), 5);

        // Symmetric check for bob's perspective.
        let bob_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective_b,
                3,
            )
            .await
            .unwrap();
        assert_eq!(bob_results.len(), 3);
        assert!(bob_results.iter().all(|m| {
            let meta = &m.metadata;
            let obs = meta
                .get("cognitive")
                .and_then(|c| c.get("observer"))
                .and_then(|v| v.as_str());
            let sub = meta
                .get("cognitive")
                .and_then(|c| c.get("subject"))
                .and_then(|v| v.as_str());
            obs == Some("bob") && sub == Some("project-y")
        }));
    }
4532
    // Two perspectives differing only in session key must each see only their
    // own memories from get_by_cognitive_level_with_perspective.
    #[tokio::test]
    async fn test_get_by_cognitive_level_with_perspective_respects_session_key() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "session-key-scalar").await;
        let repo = MemoryRepository::new(pool);

        let perspective_s1 =
            PerspectiveKey::new("alice", "project-x", Some("session-1".to_string()));
        let perspective_s2 =
            PerspectiveKey::new("alice", "project-x", Some("session-2".to_string()));

        // Three derived memories in session-1 …
        for i in 0..3 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("s1 memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s1, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }
        // … and three in session-2, same observer and subject.
        for i in 0..3 {
            repo.store(StoreMemoryParams {
                namespace_id: ns_id,
                content: &format!("s2 memory {}", i),
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s2, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        let s1_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Derived,
                &perspective_s1,
                10,
            )
            .await
            .unwrap();
        assert_eq!(s1_results.len(), 3);
        assert!(s1_results.iter().all(|m| m.content.starts_with("s1")));

        let s2_results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Derived,
                &perspective_s2,
                10,
            )
            .await
            .unwrap();
        assert_eq!(s2_results.len(), 3);
        assert!(s2_results.iter().all(|m| m.content.starts_with("s2")));
    }
4598
    // The perspective's session key must match either the scalar "session_key"
    // or any element of the "session_keys" array; a memory with neither match
    // is excluded.
    #[tokio::test]
    async fn test_get_by_cognitive_level_with_perspective_matches_session_keys_array() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "session-keys-array").await;
        let repo = MemoryRepository::new(pool);

        let perspective = PerspectiveKey::new("alice", "project-x", Some("session-a".to_string()));

        // Matches via the scalar session_key (helper writes "session-a").
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "scalar match",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Scalar key differs, but "session-a" appears in the session_keys array.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "array match",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "alice",
                    "subject": "project-x",
                    "session_key": "session-other",
                    "session_keys": ["session-a", "session-b"],
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Neither the scalar nor the array mentions "session-a": must be excluded.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "no match",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "alice",
                    "subject": "project-x",
                    "session_key": "session-other",
                    "session_keys": ["session-z"],
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        let results = repo
            .get_by_cognitive_level_with_perspective(
                ns_id,
                CognitiveLevel::Explicit,
                &perspective,
                10,
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        let contents: Vec<_> = results.iter().map(|m| m.content.as_str()).collect();
        assert!(contents.contains(&"scalar match"));
        assert!(contents.contains(&"array match"));
    }
4682
    // latest_metrics_for_namespace filters by namespace id (from the labels)
    // and by the metric-name prefix; metrics recorded for another namespace
    // must not appear.
    #[tokio::test]
    async fn test_record_metric_and_latest_metrics_for_namespace() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "metric-ns").await;
        let other_ns = create_namespace(&pool, "metric-other").await;
        let repo = MemoryRepository::new(pool);

        repo.record_metric(
            "cognition.query.total_ms",
            12.5,
            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
        )
        .await
        .unwrap();
        // Same metric name but a different namespace: should be excluded.
        repo.record_metric(
            "cognition.query.total_ms",
            18.0,
            &serde_json::json!({"namespace_id": other_ns, "stage": "total", "unit": "ms"}),
        )
        .await
        .unwrap();
        repo.record_metric(
            "cognition.representation.total_ms",
            4.0,
            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
        )
        .await
        .unwrap();

        let metrics = repo
            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
            .await
            .unwrap();

        assert_eq!(metrics.len(), 2);
        // NOTE(review): this is a substring check on the serialized labels; a
        // small ns_id like "1" could spuriously match other digits in the JSON.
        // Parsing the labels and comparing "namespace_id" exactly would be
        // more robust — confirm the type of `labels` before changing.
        assert!(metrics
            .iter()
            .all(|metric| metric.labels.contains(&ns_id.to_string())));
        assert!(metrics
            .iter()
            .any(|metric| metric.metric_name == "cognition.query.total_ms"));
        assert!(metrics
            .iter()
            .any(|metric| metric.metric_name == "cognition.representation.total_ms"));
        assert!(metrics
            .iter()
            .all(|metric| { metric.metric_name.starts_with("cognition.") }));
    }
4731
4732 #[tokio::test]
4733 async fn test_record_metrics_batch_persists_all_samples() {
4734 let pool = setup_test_db().await;
4735 let ns_id = create_namespace(&pool, "metric-batch").await;
4736 let repo = MemoryRepository::new(pool);
4737
4738 repo.record_metrics_batch(&[
4739 MetricSample {
4740 metric_name: "cognition.query.total_ms".to_string(),
4741 metric_value: 9.5,
4742 labels: serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4743 },
4744 MetricSample {
4745 metric_name: "cognition.query.answer.total_tokens".to_string(),
4746 metric_value: 128.0,
4747 labels: serde_json::json!({"namespace_id": ns_id, "stage": "answer", "unit": "tokens"}),
4748 },
4749 ])
4750 .await
4751 .unwrap();
4752
4753 let metrics = repo
4754 .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4755 .await
4756 .unwrap();
4757
4758 assert_eq!(metrics.len(), 2);
4759 }
4760
    // list_jobs returns enqueued jobs and supports filtering by job type,
    // status, or both.
    #[tokio::test]
    async fn test_list_jobs_returns_enqueued_jobs() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "obs-jobs").await;
        let repo = MemoryRepository::new(pool);

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({"a": 1}),
        })
        .await
        .unwrap();

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "digest",
            priority: 5,
            perspective: None,
            payload: &serde_json::json!({"b": 2}),
        })
        .await
        .unwrap();

        // No filters: both jobs.
        let all = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
        assert_eq!(all.len(), 2);

        // Filter by job type only.
        let derive_only = repo
            .list_jobs(ns_id, Some("derive"), None, 50, 0)
            .await
            .unwrap();
        assert_eq!(derive_only.len(), 1);
        assert_eq!(derive_only[0].job_type, "derive");

        // Filter by status only; freshly enqueued jobs are pending.
        let pending = repo
            .list_jobs(ns_id, None, Some("pending"), 50, 0)
            .await
            .unwrap();
        assert_eq!(pending.len(), 2);

        // Both filters combined.
        let digest_pending = repo
            .list_jobs(ns_id, Some("digest"), Some("pending"), 50, 0)
            .await
            .unwrap();
        assert_eq!(digest_pending.len(), 1);
    }
4815
4816 #[tokio::test]
4817 async fn test_list_jobs_respects_limit_offset() {
4818 let pool = setup_test_db().await;
4819 let ns_id = create_namespace(&pool, "obs-limit").await;
4820 let repo = MemoryRepository::new(pool);
4821
4822 for i in 0..5 {
4823 repo.enqueue_job(EnqueueJobParams {
4824 namespace_id: ns_id,
4825 job_type: "derive",
4826 priority: i,
4827 perspective: None,
4828 payload: &serde_json::json!({"i": i}),
4829 })
4830 .await
4831 .unwrap();
4832 }
4833
4834 let page1 = repo.list_jobs(ns_id, None, None, 2, 0).await.unwrap();
4835 assert_eq!(page1.len(), 2);
4836
4837 let page2 = repo.list_jobs(ns_id, None, None, 2, 2).await.unwrap();
4838 assert_eq!(page2.len(), 2);
4839
4840 let page3 = repo.list_jobs(ns_id, None, None, 2, 4).await.unwrap();
4841 assert_eq!(page3.len(), 1);
4842 }
4843
    // count_jobs_by_status totals all jobs per status, with an optional
    // job-type filter narrowing the counts.
    #[tokio::test]
    async fn test_count_jobs_by_status() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "obs-count").await;
        let repo = MemoryRepository::new(pool);

        // Two "derive" jobs and one "digest" job.
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({}),
        })
        .await
        .unwrap();

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive",
            priority: 5,
            perspective: None,
            payload: &serde_json::json!({}),
        })
        .await
        .unwrap();

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "digest",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({}),
        })
        .await
        .unwrap();

        // Unfiltered: the per-status counts sum to all three jobs.
        let all_counts = repo.count_jobs_by_status(ns_id, None).await.unwrap();
        let total: i64 = all_counts.iter().map(|(_, c)| c).sum();
        assert_eq!(total, 3);

        // Filtered to "derive": only those two jobs are counted.
        let derive_counts = repo
            .count_jobs_by_status(ns_id, Some("derive"))
            .await
            .unwrap();
        let derive_total: i64 = derive_counts.iter().map(|(_, c)| c).sum();
        assert_eq!(derive_total, 2);
    }
4893
    // count_jobs applies its job-type and status filters, returning zero for
    // a type that was never enqueued.
    #[tokio::test]
    async fn test_count_jobs_respects_filters() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "obs-job-total").await;
        let repo = MemoryRepository::new(pool);

        // Two "derive" jobs and one "digest" job.
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive",
            priority: 10,
            perspective: None,
            payload: &serde_json::json!({"index": 1}),
        })
        .await
        .unwrap();
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive",
            priority: 5,
            perspective: None,
            payload: &serde_json::json!({"index": 2}),
        })
        .await
        .unwrap();
        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "digest",
            priority: 1,
            perspective: None,
            payload: &serde_json::json!({"index": 3}),
        })
        .await
        .unwrap();

        assert_eq!(repo.count_jobs(ns_id, None, None).await.unwrap(), 3);
        assert_eq!(
            repo.count_jobs(ns_id, Some("derive"), None).await.unwrap(),
            2
        );
        assert_eq!(
            repo.count_jobs(ns_id, Some("derive"), Some("pending"))
                .await
                .unwrap(),
            2
        );
        // A job type that was never enqueued counts zero.
        assert_eq!(
            repo.count_jobs(ns_id, Some("reflect"), Some("pending"))
                .await
                .unwrap(),
            0
        );
    }
4946
4947 #[tokio::test]
4948 async fn test_list_digests_and_count() {
4949 let pool = setup_test_db().await;
4950 let ns_id = create_namespace(&pool, "obs-digests").await;
4951 let repo = MemoryRepository::new(pool);
4952
4953 let mem = repo
4955 .store(StoreMemoryParams {
4956 namespace_id: ns_id,
4957 content: "digest content",
4958 category: &Category::Session,
4959 memory_lane_type: None,
4960 labels: &[],
4961 metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
4962 embedding: None,
4963 embedding_model: None,
4964 })
4965 .await
4966 .unwrap();
4967
4968 repo.store_digest(StoreDigestParams {
4969 namespace_id: ns_id,
4970 session_key: "session-1",
4971 digest_kind: "short",
4972 memory_id: mem.id,
4973 start_memory_id: Some(1),
4974 end_memory_id: Some(10),
4975 token_count: 50,
4976 })
4977 .await
4978 .unwrap();
4979
4980 repo.store_digest(StoreDigestParams {
4981 namespace_id: ns_id,
4982 session_key: "session-2",
4983 digest_kind: "long",
4984 memory_id: mem.id,
4985 start_memory_id: Some(11),
4986 end_memory_id: Some(20),
4987 token_count: 100,
4988 })
4989 .await
4990 .unwrap();
4991
4992 let all = repo.list_digests(ns_id, None, 50, 0).await.unwrap();
4994 assert_eq!(all.len(), 2);
4995
4996 let total = repo.count_digests(ns_id, None).await.unwrap();
4997 assert_eq!(total, 2);
4998
4999 let sess1 = repo
5001 .list_digests(ns_id, Some("session-1"), 50, 0)
5002 .await
5003 .unwrap();
5004 assert_eq!(sess1.len(), 1);
5005 assert_eq!(sess1[0].session_key, "session-1");
5006
5007 let sess1_count = repo.count_digests(ns_id, Some("session-1")).await.unwrap();
5008 assert_eq!(sess1_count, 1);
5009
5010 let none = repo
5012 .list_digests(ns_id, Some("session-none"), 50, 0)
5013 .await
5014 .unwrap();
5015 assert!(none.is_empty());
5016
5017 let none_count = repo
5018 .count_digests(ns_id, Some("session-none"))
5019 .await
5020 .unwrap();
5021 assert_eq!(none_count, 0);
5022 }
5023
5024 #[tokio::test]
5029 async fn test_row_to_memory_rejects_malformed_labels() {
5030 let pool = setup_test_db().await;
5031 let ns_id = create_namespace(&pool, "test-agent").await;
5032 let repo = MemoryRepository::new(pool);
5033
5034 let memory = repo
5036 .store(StoreMemoryParams {
5037 namespace_id: ns_id,
5038 content: "corruption test labels",
5039 category: &Category::General,
5040 memory_lane_type: None,
5041 labels: &["valid-label".to_string()],
5042 metadata: &serde_json::Value::Null,
5043 embedding: None,
5044 embedding_model: None,
5045 })
5046 .await
5047 .unwrap();
5048
5049 sqlx::query("UPDATE memories SET labels = 'NOT VALID JSON{{{' WHERE id = ?")
5051 .bind(memory.id)
5052 .execute(repo.pool())
5053 .await
5054 .unwrap();
5055
5056 let err = repo.get_by_id(memory.id).await.unwrap_err();
5057 let msg = err.to_string();
5058 assert!(
5059 msg.contains("corrupted labels JSON"),
5060 "expected labels corruption error, got: {msg}"
5061 );
5062 assert!(msg.contains(&memory.id.to_string()));
5063 }
5064
5065 #[tokio::test]
5068 async fn test_row_to_memory_rejects_malformed_metadata() {
5069 let pool = setup_test_db().await;
5070 let repo = MemoryRepository::new(pool);
5071
5072 let row = MemoryRow {
5075 id: 999,
5076 namespace_id: 1,
5077 content: "test".to_string(),
5078 category: "general".to_string(),
5079 memory_lane_type: None,
5080 labels: "[]".to_string(),
5081 metadata: "[truncated".to_string(), similarity_score: None,
5083 relevance_score: None,
5084 content_embedding: None,
5085 embedding_model: None,
5086 created_at: Utc::now(),
5087 updated_at: None,
5088 last_accessed: None,
5089 is_active: true,
5090 is_archived: false,
5091 access_count: 0,
5092 };
5093
5094 let err = repo.row_to_memory(row).unwrap_err();
5095 let msg = err.to_string();
5096 assert!(
5097 msg.contains("corrupted metadata JSON"),
5098 "expected metadata corruption error, got: {msg}"
5099 );
5100 }
5101
5102 #[tokio::test]
5105 async fn test_row_to_memory_rejects_malformed_embedding() {
5106 let pool = setup_test_db().await;
5107 let ns_id = create_namespace(&pool, "test-agent").await;
5108 let repo = MemoryRepository::new(pool);
5109
5110 let memory = repo
5111 .store(StoreMemoryParams {
5112 namespace_id: ns_id,
5113 content: "corruption test embedding",
5114 category: &Category::General,
5115 memory_lane_type: None,
5116 labels: &[],
5117 metadata: &serde_json::Value::Null,
5118 embedding: Some(&[0.1, 0.2, 0.3]),
5119 embedding_model: Some("test-model"),
5120 })
5121 .await
5122 .unwrap();
5123
5124 sqlx::query("UPDATE memories SET content_embedding = 'not-an-array' WHERE id = ?")
5125 .bind(memory.id)
5126 .execute(repo.pool())
5127 .await
5128 .unwrap();
5129
5130 let err = repo.get_by_id(memory.id).await.unwrap_err();
5131 let msg = err.to_string();
5132 assert!(
5133 msg.contains("corrupted embedding JSON"),
5134 "expected embedding corruption error, got: {msg}"
5135 );
5136 }
5137
5138 #[tokio::test]
5141 async fn test_claim_jobs_rejects_malformed_payload() {
5142 let pool = setup_test_db().await;
5143 let ns_id = create_namespace(&pool, "test-agent").await;
5144 let repo = MemoryRepository::new(pool);
5145
5146 sqlx::query(
5148 r#"
5149 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5150 VALUES (?, 'derive_memory', 'pending', 100, '{INVALID_JSON}', datetime('now'), datetime('now'))
5151 "#,
5152 )
5153 .bind(ns_id)
5154 .execute(repo.pool())
5155 .await
5156 .unwrap();
5157
5158 let claimed = repo
5160 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5161 .await
5162 .unwrap();
5163 assert!(
5164 claimed.is_empty(),
5165 "corrupt payload job should not be returned"
5166 );
5167
5168 let status: String =
5170 sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5171 .bind(ns_id)
5172 .fetch_one(repo.pool())
5173 .await
5174 .unwrap();
5175 assert_eq!(status, "failed", "corrupt job should be permanently failed");
5176
5177 let last_error: Option<String> =
5178 sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5179 .bind(ns_id)
5180 .fetch_one(repo.pool())
5181 .await
5182 .unwrap();
5183 assert!(
5184 last_error
5185 .unwrap_or_default()
5186 .contains("corrupted payload JSON"),
5187 "last_error should mention payload corruption"
5188 );
5189 }
5190
5191 #[tokio::test]
5194 async fn test_claim_jobs_rejects_malformed_perspective() {
5195 let pool = setup_test_db().await;
5196 let ns_id = create_namespace(&pool, "test-agent").await;
5197 let repo = MemoryRepository::new(pool);
5198
5199 sqlx::query(
5201 r#"
5202 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
5203 VALUES (?, 'derive_memory', 'pending', 100, '{BOGUS}', '{"ok": true}', datetime('now'), datetime('now'))
5204 "#,
5205 )
5206 .bind(ns_id)
5207 .execute(repo.pool())
5208 .await
5209 .unwrap();
5210
5211 let claimed = repo
5213 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5214 .await
5215 .unwrap();
5216 assert!(
5217 claimed.is_empty(),
5218 "corrupt perspective job should not be returned"
5219 );
5220
5221 let status: String =
5223 sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5224 .bind(ns_id)
5225 .fetch_one(repo.pool())
5226 .await
5227 .unwrap();
5228 assert_eq!(status, "failed", "corrupt job should be permanently failed");
5229
5230 let last_error: Option<String> =
5231 sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5232 .bind(ns_id)
5233 .fetch_one(repo.pool())
5234 .await
5235 .unwrap();
5236 assert!(
5237 last_error
5238 .unwrap_or_default()
5239 .contains("corrupted perspective JSON"),
5240 "last_error should mention perspective corruption"
5241 );
5242 }
5243
5244 #[tokio::test]
5247 async fn test_claim_jobs_skips_corrupt_returns_valid() {
5248 let pool = setup_test_db().await;
5249 let ns_id = create_namespace(&pool, "test-agent").await;
5250 let repo = MemoryRepository::new(pool);
5251
5252 let p1 = serde_json::json!({"memory_id": 1});
5254 repo.enqueue_job(EnqueueJobParams {
5255 namespace_id: ns_id,
5256 job_type: "derive_memory",
5257 priority: 100,
5258 perspective: None,
5259 payload: &p1,
5260 })
5261 .await
5262 .unwrap();
5263 let p2 = serde_json::json!({"memory_id": 2});
5264 repo.enqueue_job(EnqueueJobParams {
5265 namespace_id: ns_id,
5266 job_type: "derive_memory",
5267 priority: 50,
5268 perspective: None,
5269 payload: &p2,
5270 })
5271 .await
5272 .unwrap();
5273
5274 sqlx::query(
5276 r#"
5277 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5278 VALUES (?, 'derive_memory', 'pending', 200, '{BROKEN}', datetime('now'), datetime('now'))
5279 "#,
5280 )
5281 .bind(ns_id)
5282 .execute(repo.pool())
5283 .await
5284 .unwrap();
5285
5286 let claimed = repo
5288 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 10)
5289 .await
5290 .unwrap();
5291 assert_eq!(
5292 claimed.len(),
5293 2,
5294 "should return 2 valid jobs, skipping the corrupt one"
5295 );
5296
5297 let failed_count: i64 = sqlx::query_scalar(
5299 "SELECT COUNT(*) FROM memory_jobs WHERE namespace_id = ? AND status = 'failed'",
5300 )
5301 .bind(ns_id)
5302 .fetch_one(repo.pool())
5303 .await
5304 .unwrap();
5305 assert_eq!(failed_count, 1, "corrupt job should be permanently failed");
5306 }
5307
5308 #[tokio::test]
5312 async fn test_purge_completed_jobs_removes_old_keeps_recent() {
5313 let pool = setup_test_db().await;
5314 let ns_id = create_namespace(&pool, "purge-test").await;
5315 let repo = MemoryRepository::new(pool);
5316
5317 repo.enqueue_job(EnqueueJobParams {
5319 namespace_id: ns_id,
5320 job_type: "derive_memory",
5321 priority: 10,
5322 perspective: None,
5323 payload: &serde_json::json!({"old": true}),
5324 })
5325 .await
5326 .unwrap();
5327
5328 let claimed = repo
5329 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5330 .await
5331 .unwrap();
5332 assert_eq!(claimed.len(), 1);
5333 let old_job_id = claimed[0].row.id;
5334
5335 repo.complete_job(&claimed[0]).await.unwrap();
5336
5337 sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5339 .bind(old_job_id)
5340 .execute(repo.pool())
5341 .await
5342 .unwrap();
5343
5344 repo.enqueue_job(EnqueueJobParams {
5346 namespace_id: ns_id,
5347 job_type: "derive_memory",
5348 priority: 10,
5349 perspective: None,
5350 payload: &serde_json::json!({"new": true}),
5351 })
5352 .await
5353 .unwrap();
5354
5355 let claimed2 = repo
5356 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5357 .await
5358 .unwrap();
5359 assert_eq!(claimed2.len(), 1);
5360 repo.complete_job(&claimed2[0]).await.unwrap();
5361
5362 let cutoff = Utc::now() - chrono::Duration::days(7);
5364 let deleted = repo.purge_completed_jobs(cutoff).await.unwrap();
5365 assert_eq!(deleted, 1);
5366
5367 let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5369 assert_eq!(remaining.len(), 1);
5370 assert_eq!(remaining[0].id, claimed2[0].row.id);
5371 }
5372
5373 #[tokio::test]
5375 async fn test_purge_permanently_failed_jobs_removes_old_keeps_recent() {
5376 let pool = setup_test_db().await;
5377 let ns_id = create_namespace(&pool, "purge-failed").await;
5378 let repo = MemoryRepository::new(pool);
5379
5380 repo.enqueue_job(EnqueueJobParams {
5382 namespace_id: ns_id,
5383 job_type: "derive_memory",
5384 priority: 10,
5385 perspective: None,
5386 payload: &serde_json::json!({"fail_me": true}),
5387 })
5388 .await
5389 .unwrap();
5390
5391 for _ in 0..5 {
5392 let claimed = repo
5393 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5394 .await
5395 .unwrap();
5396 assert_eq!(claimed.len(), 1);
5397 repo.fail_job(&claimed[0], "persistent error")
5398 .await
5399 .unwrap();
5400 }
5401
5402 sqlx::query(
5404 "UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE status = ?",
5405 )
5406 .bind(memory_job_status::FAILED)
5407 .execute(repo.pool())
5408 .await
5409 .unwrap();
5410
5411 repo.enqueue_job(EnqueueJobParams {
5413 namespace_id: ns_id,
5414 job_type: "derive_memory",
5415 priority: 10,
5416 perspective: None,
5417 payload: &serde_json::json!({"retry_me": true}),
5418 })
5419 .await
5420 .unwrap();
5421
5422 for _ in 0..2 {
5423 let claimed = repo
5424 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5425 .await
5426 .unwrap();
5427 assert_eq!(claimed.len(), 1);
5428 repo.fail_job(&claimed[0], "transient error").await.unwrap();
5429 }
5430
5431 let cutoff = Utc::now() - chrono::Duration::days(7);
5433 let deleted = repo.purge_permanently_failed_jobs(cutoff).await.unwrap();
5434 assert_eq!(deleted, 1);
5435
5436 let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5438 assert_eq!(remaining.len(), 1);
5439 assert_eq!(remaining[0].status, memory_job_status::PENDING);
5440 }
5441
5442 #[tokio::test]
5444 async fn test_active_leasing_works_after_purge() {
5445 let pool = setup_test_db().await;
5446 let ns_id = create_namespace(&pool, "purge-lease").await;
5447 let repo = MemoryRepository::new(pool);
5448
5449 repo.enqueue_job(EnqueueJobParams {
5451 namespace_id: ns_id,
5452 job_type: "derive_memory",
5453 priority: 10,
5454 perspective: None,
5455 payload: &serde_json::json!({"old": true}),
5456 })
5457 .await
5458 .unwrap();
5459
5460 let claimed = repo
5461 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5462 .await
5463 .unwrap();
5464 repo.complete_job(&claimed[0]).await.unwrap();
5465
5466 sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5467 .bind(claimed[0].row.id)
5468 .execute(repo.pool())
5469 .await
5470 .unwrap();
5471
5472 let cutoff = Utc::now() - chrono::Duration::days(7);
5474 repo.purge_completed_jobs(cutoff).await.unwrap();
5475
5476 repo.enqueue_job(EnqueueJobParams {
5478 namespace_id: ns_id,
5479 job_type: "derive_memory",
5480 priority: 20,
5481 perspective: None,
5482 payload: &serde_json::json!({"fresh": true}),
5483 })
5484 .await
5485 .unwrap();
5486
5487 let fresh_claimed = repo
5488 .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 10)
5489 .await
5490 .unwrap();
5491 assert_eq!(fresh_claimed.len(), 1);
5492 assert_eq!(fresh_claimed[0].row.status, "running");
5493 assert_eq!(fresh_claimed[0].payload["fresh"], true);
5494
5495 repo.complete_job(&fresh_claimed[0]).await.unwrap();
5497
5498 let empty = repo
5500 .claim_jobs(ns_id, "derive_memory", "worker-3", 60, 10)
5501 .await
5502 .unwrap();
5503 assert!(empty.is_empty());
5504 }
5505}