1use std::collections::{HashMap, HashSet};
4
5use crate::models::{
6 memory_job_status, AgentNamespaceRow, ClaimedMemoryJob, EnqueueJobParams, MemoryEvidenceRow,
7 MemoryJobRow, MemoryLineageEntry, MemoryRow, ProcessedFileRow, SessionDigestRow,
8 SystemMetricRow,
9};
10use crate::{db_error, Result};
11use chrono::{DateTime, Utc};
12use nexus_core::{
13 AgentNamespace, CognitiveLevel, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey,
14};
15use sqlx::SqlitePool;
16
17type Category = MemoryCategory;
19
20pub struct StoreMemoryParams<'a> {
22 pub namespace_id: i64,
23 pub content: &'a str,
24 pub category: &'a Category,
25 pub memory_lane_type: Option<&'a MemoryLaneType>,
26 pub labels: &'a [String],
27 pub metadata: &'a serde_json::Value,
28 pub embedding: Option<&'a [f32]>,
29 pub embedding_model: Option<&'a str>,
30}
31
32pub struct StoreMemoryWithLineageParams<'a> {
34 pub store: StoreMemoryParams<'a>,
35 pub source_memory_ids: &'a [i64],
36 pub evidence_role: &'a str,
37}
38
39pub struct StoreDigestParams<'a> {
41 pub namespace_id: i64,
42 pub session_key: &'a str,
43 pub digest_kind: &'a str,
44 pub memory_id: i64,
45 pub start_memory_id: Option<i64>,
46 pub end_memory_id: Option<i64>,
47 pub token_count: usize,
48}
49
50pub struct SessionDigestRollover {
51 pub last_digest_end_memory_id: Option<i64>,
52 pub new_memory_count: i64,
53 pub estimated_new_tokens: i64,
54}
55
56pub struct ListMemoryFilters<'a> {
57 pub category: Option<&'a str>,
58 pub since: Option<DateTime<Utc>>,
59 pub until: Option<DateTime<Utc>>,
60 pub content_like: Option<&'a str>,
61 pub include_raw: bool,
62 pub limit: i64,
63 pub offset: i64,
64}
65
66pub struct WorkingSetParams<'a> {
68 pub namespace_id: i64,
70 pub perspective: Option<&'a PerspectiveKey>,
73 pub max_items: usize,
75 pub include_raw: bool,
78}
79
80pub struct SemanticCandidateParams<'a> {
82 pub namespace_id: i64,
84 pub perspective: Option<&'a PerspectiveKey>,
86 pub limit: i64,
88 pub include_raw: bool,
90}
91
92#[derive(Debug, Clone)]
94pub struct MetricSample {
95 pub metric_name: String,
96 pub metric_value: f64,
97 pub labels: serde_json::Value,
98}
99
100const MAX_JOB_ATTEMPTS: i64 = 5;
102const RAW_ACTIVITY_FILTER_SQL: &str =
103 "labels NOT LIKE '%raw-activity%' AND json_extract(COALESCE(metadata, '{}'), '$.raw_activity') IS NULL";
104
105const METADATA: &str = "COALESCE(metadata, '{}')";
121
122const SESSION_KEY_FILTER: &str =
127 "(json_extract(METADATA, '$.cognitive.session_key') = ? \
128 OR EXISTS (SELECT 1 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]')) WHERE value = ?))";
129
130const PERSPECTIVE_IDENTITY_FILTER: &str = "json_extract(METADATA, '$.cognitive.observer') = ? \
134 AND json_extract(METADATA, '$.cognitive.subject') = ?";
135
136const COGNITIVE_LEVEL_EXPR: &str = "json_extract(METADATA, '$.cognitive.level')";
138
139fn perspective_where_clause(perspective: &PerspectiveKey) -> String {
146 if perspective.session_key.is_some() {
147 format!(
148 "{} AND {}",
149 PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA),
150 SESSION_KEY_FILTER.replace("METADATA", METADATA),
151 )
152 } else {
153 PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA)
154 }
155}
156
157fn bind_perspective(perspective: &PerspectiveKey) -> Vec<&str> {
160 let mut vals = vec![perspective.observer.as_str(), perspective.subject.as_str()];
161 if let Some(ref sk) = perspective.session_key {
162 vals.push(sk.as_str());
163 vals.push(sk.as_str());
164 }
165 vals
166}
167
168pub struct MemoryRepository {
170 pool: SqlitePool,
171}
172
173impl MemoryRepository {
174 pub fn new(pool: SqlitePool) -> Self {
175 Self { pool }
176 }
177
178 pub fn pool(&self) -> &SqlitePool {
179 &self.pool
180 }
181
182 pub async fn store(&self, params: StoreMemoryParams<'_>) -> Result<Memory> {
184 let labels_json = serde_json::to_string(params.labels)?;
185 let metadata_json = serde_json::to_string(params.metadata)?;
186 let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
187
188 let result = sqlx::query(
189 r#"
190 INSERT INTO memories (
191 namespace_id, content, category, memory_lane_type, labels, metadata,
192 content_embedding, embedding_model, created_at, is_active, access_count
193 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
194 "#,
195 )
196 .bind(params.namespace_id)
197 .bind(params.content)
198 .bind(params.category.to_string())
199 .bind(params.memory_lane_type.map(|t| t.to_string()))
200 .bind(&labels_json)
201 .bind(&metadata_json)
202 .bind(&embedding_json)
203 .bind(params.embedding_model)
204 .bind(Utc::now())
205 .execute(&self.pool)
206 .await
207 .map_err(db_error)?;
208
209 let id = result.last_insert_rowid();
210
211 if id == 0 {
216 let row: Option<MemoryRow> = sqlx::query_as(
218 "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
219 )
220 .bind(params.namespace_id)
221 .bind(params.content)
222 .fetch_optional(&self.pool)
223 .await
224 .map_err(db_error)?;
225
226 if let Some(existing) = row {
227 self.merge_duplicate_memory_context(existing.id, params)
228 .await?;
229 return self.get_by_id(existing.id).await?.ok_or_else(|| {
230 nexus_core::NexusError::Storage(format!(
231 "Duplicate merged row {} could not be reloaded",
232 existing.id
233 ))
234 });
235 }
236
237 tracing::warn!(
240 namespace_id = params.namespace_id,
241 content_length = params.content.len(),
242 "Insert returned id 0 but no matching duplicate found - treating as successful insert"
243 );
244
245 return self
248 .get_by_content(params.namespace_id, params.content)
249 .await;
250 }
251
252 self.get_by_id(id).await?.ok_or_else(|| {
253 nexus_core::NexusError::Storage(format!("Failed to retrieve memory with id {}", id))
254 })
255 }
256
257 async fn merge_duplicate_memory_context(
258 &self,
259 existing_id: i64,
260 params: StoreMemoryParams<'_>,
261 ) -> Result<()> {
262 let current = self.get_by_id(existing_id).await?.ok_or_else(|| {
263 nexus_core::NexusError::Storage(format!(
264 "Failed to load duplicate-merged memory {}",
265 existing_id
266 ))
267 })?;
268
269 let merged_labels = merge_labels(¤t.labels, params.labels);
270 let merged_metadata = merge_duplicate_metadata(¤t.metadata, params.metadata);
271 let labels_json = serde_json::to_string(&merged_labels)?;
272 let metadata_json = serde_json::to_string(&merged_metadata)?;
273
274 sqlx::query(
275 r#"
276 UPDATE memories
277 SET labels = ?, metadata = ?, updated_at = ?
278 WHERE id = ?
279 "#,
280 )
281 .bind(&labels_json)
282 .bind(&metadata_json)
283 .bind(Utc::now())
284 .bind(existing_id)
285 .execute(&self.pool)
286 .await
287 .map_err(db_error)?;
288
289 Ok(())
290 }
291
292 pub async fn store_with_lineage(
294 &self,
295 params: StoreMemoryWithLineageParams<'_>,
296 ) -> Result<Memory> {
297 let mut tx = self.pool.begin().await.map_err(db_error)?;
298 let memory_id = insert_memory_tx(&mut tx, ¶ms.store).await?;
299 for &source_id in params.source_memory_ids {
300 insert_evidence_tx(&mut tx, memory_id, source_id, params.evidence_role).await?;
301 }
302 tx.commit().await.map_err(db_error)?;
303
304 self.get_by_id(memory_id).await?.ok_or_else(|| {
305 nexus_core::NexusError::Storage(format!(
306 "Failed to retrieve memory with id {} after lineage store",
307 memory_id
308 ))
309 })
310 }
311
312 pub async fn enqueue_job(&self, params: EnqueueJobParams<'_>) -> Result<i64> {
314 let perspective_json = params.perspective.map(serde_json::to_string).transpose()?;
315 let payload_json = serde_json::to_string(params.payload)?;
316
317 let id: i64 = sqlx::query_scalar(
318 r#"
319 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
320 VALUES (?, ?, 'pending', ?, ?, ?, datetime('now'), datetime('now'))
321 RETURNING id
322 "#,
323 )
324 .bind(params.namespace_id)
325 .bind(params.job_type)
326 .bind(params.priority)
327 .bind(&perspective_json)
328 .bind(&payload_json)
329 .fetch_one(&self.pool)
330 .await
331 .map_err(db_error)?;
332
333 Ok(id)
334 }
335
336 pub async fn claim_jobs(
341 &self,
342 namespace_id: i64,
343 job_type: &str,
344 lease_owner: &str,
345 lease_ttl_secs: u64,
346 limit: i64,
347 ) -> Result<Vec<ClaimedMemoryJob>> {
348 let claim_token = new_claim_token(lease_owner);
349 let rows: Vec<MemoryJobRow> = sqlx::query_as::<_, MemoryJobRow>(
353 r#"
354 WITH candidates AS (
355 SELECT id
356 FROM memory_jobs
357 WHERE namespace_id = ? AND job_type = ? AND (
358 status = ?
359 OR (status = ? AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))
360 )
361 ORDER BY priority DESC, created_at ASC
362 LIMIT ?
363 )
364 UPDATE memory_jobs
365 SET status = ?,
366 lease_owner = ?,
367 claim_token = ?,
368 lease_expires_at = datetime('now', '+' || ? || ' seconds'),
369 attempts = attempts + 1,
370 updated_at = datetime('now')
371 WHERE id IN (SELECT id FROM candidates)
372 RETURNING *
373 "#,
374 )
375 .bind(namespace_id)
376 .bind(job_type)
377 .bind(memory_job_status::PENDING)
378 .bind(memory_job_status::RUNNING)
379 .bind(limit)
380 .bind(memory_job_status::RUNNING)
381 .bind(lease_owner)
382 .bind(&claim_token)
383 .bind(lease_ttl_secs as i64)
384 .fetch_all(&self.pool)
385 .await
386 .map_err(db_error)?;
387
388 let mut rows = rows;
389 rows.sort_by(|left, right| {
390 right
391 .priority
392 .cmp(&left.priority)
393 .then_with(|| left.created_at.cmp(&right.created_at))
394 });
395
396 let mut claimed = Vec::with_capacity(rows.len());
397 for row in rows {
398 let perspective = match row.perspective_json.as_deref() {
399 Some(s) => match serde_json::from_str(s) {
400 Ok(p) => Some(p),
401 Err(e) => {
402 tracing::warn!(
403 job_id = row.id,
404 error = %e,
405 "corrupted perspective JSON, permanently failing job"
406 );
407 let _ = self
408 .permanently_fail_job(
409 row.id,
410 &row.lease_owner,
411 &row.claim_token,
412 &format!("corrupted perspective JSON: {e}"),
413 )
414 .await;
415 continue;
416 }
417 },
418 None => None,
419 };
420 let payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
421 Ok(p) => p,
422 Err(e) => {
423 tracing::warn!(
424 job_id = row.id,
425 error = %e,
426 "corrupted payload JSON, permanently failing job"
427 );
428 let _ = self
429 .permanently_fail_job(
430 row.id,
431 &row.lease_owner,
432 &row.claim_token,
433 &format!("corrupted payload JSON: {e}"),
434 )
435 .await;
436 continue;
437 }
438 };
439 claimed.push(ClaimedMemoryJob {
440 row,
441 perspective,
442 payload,
443 });
444 }
445
446 Ok(claimed)
447 }
448
449 pub async fn complete_job(&self, job: &ClaimedMemoryJob) -> Result<()> {
451 let result = sqlx::query(
452 r#"
453 UPDATE memory_jobs
454 SET status = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
455 WHERE id = ?
456 AND lease_owner = ?
457 AND claim_token = ?
458 "#,
459 )
460 .bind(memory_job_status::COMPLETED)
461 .bind(job.row.id)
462 .bind(job.row.lease_owner.as_deref())
463 .bind(job.row.claim_token.as_deref())
464 .execute(&self.pool)
465 .await
466 .map_err(db_error)?;
467
468 if result.rows_affected() == 0 {
469 return Err(nexus_core::NexusError::Storage(format!(
470 "Memory job {} completion lost lease ownership",
471 job.row.id
472 )));
473 }
474
475 Ok(())
476 }
477
478 pub async fn fail_job(&self, job: &ClaimedMemoryJob, error: &str) -> Result<()> {
481 let row: Option<MemoryJobRow> = sqlx::query_as("SELECT * FROM memory_jobs WHERE id = ?")
482 .bind(job.row.id)
483 .fetch_optional(&self.pool)
484 .await
485 .map_err(db_error)?;
486
487 let row = row.ok_or_else(|| {
488 nexus_core::NexusError::Storage(format!("Memory job {} not found", job.row.id))
489 })?;
490
491 let lease_matches =
492 row.lease_owner == job.row.lease_owner && row.claim_token == job.row.claim_token;
493 if !lease_matches {
494 return Err(nexus_core::NexusError::Storage(format!(
495 "Memory job {} failure lost lease ownership",
496 job.row.id
497 )));
498 }
499
500 if row.attempts >= MAX_JOB_ATTEMPTS {
501 let result = sqlx::query(
503 r#"
504 UPDATE memory_jobs
505 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
506 WHERE id = ? AND lease_owner = ? AND claim_token = ?
507 "#,
508 )
509 .bind(memory_job_status::FAILED)
510 .bind(error)
511 .bind(job.row.id)
512 .bind(job.row.lease_owner.as_deref())
513 .bind(job.row.claim_token.as_deref())
514 .execute(&self.pool)
515 .await
516 .map_err(db_error)?;
517 if result.rows_affected() == 0 {
518 return Err(nexus_core::NexusError::Storage(format!(
519 "Memory job {} failure lost lease ownership",
520 job.row.id
521 )));
522 }
523 } else {
524 let result = sqlx::query(
526 r#"
527 UPDATE memory_jobs
528 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
529 WHERE id = ? AND lease_owner = ? AND claim_token = ?
530 "#,
531 )
532 .bind(memory_job_status::PENDING)
533 .bind(error)
534 .bind(job.row.id)
535 .bind(job.row.lease_owner.as_deref())
536 .bind(job.row.claim_token.as_deref())
537 .execute(&self.pool)
538 .await
539 .map_err(db_error)?;
540 if result.rows_affected() == 0 {
541 return Err(nexus_core::NexusError::Storage(format!(
542 "Memory job {} failure lost lease ownership",
543 job.row.id
544 )));
545 }
546 }
547
548 Ok(())
549 }
550
551 async fn permanently_fail_job(
555 &self,
556 job_id: i64,
557 lease_owner: &Option<String>,
558 claim_token: &Option<String>,
559 error: &str,
560 ) -> Result<()> {
561 sqlx::query(
562 r#"
563 UPDATE memory_jobs
564 SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL,
565 lease_expires_at = NULL, updated_at = datetime('now')
566 WHERE id = ? AND lease_owner = ? AND claim_token = ?
567 "#,
568 )
569 .bind(memory_job_status::FAILED)
570 .bind(error)
571 .bind(job_id)
572 .bind(lease_owner.as_deref())
573 .bind(claim_token.as_deref())
574 .execute(&self.pool)
575 .await
576 .map_err(db_error)?;
577 Ok(())
578 }
579
580 pub async fn get_most_reinforced_by_namespace(
581 &self,
582 namespace_id: i64,
583 limit: i64,
584 include_raw: bool,
585 ) -> Result<Vec<Memory>> {
586 let noise_sql = if include_raw {
587 String::new()
588 } else {
589 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
590 };
591 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
592 r#"
593 SELECT * FROM memories
594 WHERE namespace_id = ?
595 AND is_active = 1
596 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
597 {noise_sql}
598 ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_reinforced'), 0) DESC,
599 created_at DESC
600 LIMIT ?
601 "#
602 ))
603 .bind(namespace_id)
604 .bind(CognitiveLevel::Derived.as_str())
605 .bind(limit)
606 .fetch_all(&self.pool)
607 .await
608 .map_err(db_error)?;
609
610 rows.into_iter()
611 .map(|row| self.row_to_memory(row))
612 .collect()
613 }
614
615 pub async fn get_contradictions_by_namespace(
616 &self,
617 namespace_id: i64,
618 limit: i64,
619 include_raw: bool,
620 ) -> Result<Vec<Memory>> {
621 let noise_sql = if include_raw {
622 String::new()
623 } else {
624 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
625 };
626 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
627 r#"
628 SELECT * FROM memories
629 WHERE namespace_id = ?
630 AND is_active = 1
631 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
632 {noise_sql}
633 ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_contradicted'), 0) DESC,
634 created_at DESC
635 LIMIT ?
636 "#
637 ))
638 .bind(namespace_id)
639 .bind(CognitiveLevel::Contradiction.as_str())
640 .bind(limit)
641 .fetch_all(&self.pool)
642 .await
643 .map_err(db_error)?;
644
645 rows.into_iter()
646 .map(|row| self.row_to_memory(row))
647 .collect()
648 }
649
650 pub async fn list_by_session_key(
651 &self,
652 namespace_id: i64,
653 session_key: &str,
654 limit: i64,
655 include_raw: bool,
656 ) -> Result<Vec<Memory>> {
657 let noise_sql = if include_raw {
658 String::new()
659 } else {
660 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
661 };
662 let session_filter = SESSION_KEY_FILTER.replace("METADATA", METADATA);
663 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
664 r#"
665 SELECT * FROM memories
666 WHERE namespace_id = ?
667 AND is_active = 1
668 AND {session_filter}
669 {noise_sql}
670 ORDER BY created_at DESC
671 LIMIT ?
672 "#
673 ))
674 .bind(namespace_id)
675 .bind(session_key)
676 .bind(session_key)
677 .bind(limit)
678 .fetch_all(&self.pool)
679 .await
680 .map_err(db_error)?;
681
682 rows.into_iter()
683 .map(|row| self.row_to_memory(row))
684 .collect()
685 }
686
687 pub async fn store_digest(&self, params: StoreDigestParams<'_>) -> Result<i64> {
693 let id: i64 = sqlx::query_scalar(
694 r#"
695 INSERT INTO session_digests (namespace_id, session_key, digest_kind, memory_id, start_memory_id, end_memory_id, token_count, created_at)
696 VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
697 ON CONFLICT(namespace_id, session_key, digest_kind, end_memory_id) DO UPDATE SET
698 memory_id = excluded.memory_id,
699 token_count = excluded.token_count
700 RETURNING id
701 "#,
702 )
703 .bind(params.namespace_id)
704 .bind(params.session_key)
705 .bind(params.digest_kind)
706 .bind(params.memory_id)
707 .bind(params.start_memory_id)
708 .bind(params.end_memory_id)
709 .bind(params.token_count as i64)
710 .fetch_one(&self.pool)
711 .await
712 .map_err(db_error)?;
713
714 Ok(id)
715 }
716
717 pub async fn latest_digest_for_session(
720 &self,
721 namespace_id: i64,
722 session_key: &str,
723 digest_kind: &str,
724 ) -> Result<Option<Memory>> {
725 let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
726 "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
727 )
728 .bind(namespace_id)
729 .bind(session_key)
730 .bind(digest_kind)
731 .fetch_optional(&self.pool)
732 .await
733 .map_err(db_error)?;
734
735 match digest {
736 Some(d) => self.get_by_id(d.memory_id).await,
737 None => Ok(None),
738 }
739 }
740
741 pub async fn latest_digest_for_namespace(
745 &self,
746 namespace_id: i64,
747 digest_kind: &str,
748 ) -> Result<Option<Memory>> {
749 let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
750 "SELECT * FROM session_digests WHERE namespace_id = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
751 )
752 .bind(namespace_id)
753 .bind(digest_kind)
754 .fetch_optional(&self.pool)
755 .await
756 .map_err(db_error)?;
757
758 match digest {
759 Some(d) => self.get_by_id(d.memory_id).await,
760 None => Ok(None),
761 }
762 }
763
764 pub async fn session_digest_rollover(
767 &self,
768 namespace_id: i64,
769 session_key: &str,
770 ) -> Result<SessionDigestRollover> {
771 let last_digest_end_memory_id: Option<i64> = sqlx::query_scalar(
772 "SELECT MAX(end_memory_id) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
773 )
774 .bind(namespace_id)
775 .bind(session_key)
776 .fetch_one(&self.pool)
777 .await
778 .map_err(db_error)?;
779
780 let (new_memory_count, estimated_new_tokens): (i64, i64) = sqlx::query_as(&format!(
781 r#"
782 SELECT
783 COUNT(*) as new_memory_count,
784 COALESCE(SUM((LENGTH(content) + 3) / 4), 0) as estimated_new_tokens
785 FROM memories
786 WHERE namespace_id = ?
787 AND is_active = 1
788 AND id > ?
789 AND (
790 json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
791 OR EXISTS (
792 SELECT 1
793 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
794 WHERE value = ?
795 )
796 )
797 AND {RAW_ACTIVITY_FILTER_SQL}
798 AND COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level'), '') NOT IN ('raw', 'summary_short', 'summary_long')
799 "#,
800 ))
801 .bind(namespace_id)
802 .bind(last_digest_end_memory_id.unwrap_or(0))
803 .bind(session_key)
804 .bind(session_key)
805 .fetch_one(&self.pool)
806 .await
807 .map_err(db_error)?;
808
809 Ok(SessionDigestRollover {
810 last_digest_end_memory_id,
811 new_memory_count,
812 estimated_new_tokens,
813 })
814 }
815
816 pub async fn get_recent_by_perspective(
821 &self,
822 namespace_id: i64,
823 perspective: &PerspectiveKey,
824 limit: i64,
825 ) -> Result<Vec<Memory>> {
826 self.get_recent_by_perspective_opts(namespace_id, perspective, limit, false)
827 .await
828 }
829
830 pub async fn get_recent_by_perspective_opts(
833 &self,
834 namespace_id: i64,
835 perspective: &PerspectiveKey,
836 limit: i64,
837 include_raw: bool,
838 ) -> Result<Vec<Memory>> {
839 let noise_sql = if include_raw {
840 String::new()
841 } else {
842 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
843 };
844
845 let perspective_sql = perspective_where_clause(perspective);
846 let sql = format!(
847 r#"
848 SELECT * FROM memories
849 WHERE namespace_id = ?
850 AND is_active = 1
851 AND {perspective_sql}
852 {noise_sql}
853 ORDER BY created_at DESC
854 LIMIT ?
855 "#
856 );
857 let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
858
859 for val in bind_perspective(perspective) {
860 query = query.bind(val);
861 }
862
863 let rows = query
864 .bind(limit)
865 .fetch_all(&self.pool)
866 .await
867 .map_err(db_error)?;
868
869 rows.into_iter()
870 .map(|row| self.row_to_memory(row))
871 .collect()
872 }
873
874 pub async fn get_by_cognitive_level(
876 &self,
877 namespace_id: i64,
878 level: CognitiveLevel,
879 limit: i64,
880 ) -> Result<Vec<Memory>> {
881 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
882 r#"
883 SELECT * FROM memories
884 WHERE namespace_id = ?
885 AND is_active = 1
886 AND {} = ?
887 ORDER BY created_at DESC
888 LIMIT ?
889 "#,
890 COGNITIVE_LEVEL_EXPR,
891 ))
892 .bind(namespace_id)
893 .bind(level.as_str())
894 .bind(limit)
895 .fetch_all(&self.pool)
896 .await
897 .map_err(db_error)?;
898
899 rows.into_iter()
900 .map(|row| self.row_to_memory(row))
901 .collect()
902 }
903
904 pub async fn get_by_cognitive_level_with_perspective(
910 &self,
911 namespace_id: i64,
912 level: CognitiveLevel,
913 perspective: &PerspectiveKey,
914 limit: i64,
915 ) -> Result<Vec<Memory>> {
916 let perspective_sql = perspective_where_clause(perspective);
917 let sql = format!(
918 r#"
919 SELECT * FROM memories
920 WHERE namespace_id = ?
921 AND is_active = 1
922 AND {COGNITIVE_LEVEL_EXPR} = ?
923 AND {perspective_sql}
924 ORDER BY created_at DESC
925 LIMIT ?
926 "#
927 );
928 let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
929 .bind(namespace_id)
930 .bind(level.as_str());
931
932 for val in bind_perspective(perspective) {
933 query = query.bind(val);
934 }
935
936 let rows = query
937 .bind(limit)
938 .fetch_all(&self.pool)
939 .await
940 .map_err(db_error)?;
941
942 rows.into_iter()
943 .map(|row| self.row_to_memory(row))
944 .collect()
945 }
946
947 pub async fn get_most_reinforced_by_perspective(
951 &self,
952 namespace_id: i64,
953 perspective: &PerspectiveKey,
954 limit: i64,
955 ) -> Result<Vec<Memory>> {
956 self.get_most_reinforced_by_perspective_opts(namespace_id, perspective, limit, false)
957 .await
958 }
959
960 pub async fn get_most_reinforced_by_perspective_opts(
963 &self,
964 namespace_id: i64,
965 perspective: &PerspectiveKey,
966 limit: i64,
967 include_raw: bool,
968 ) -> Result<Vec<Memory>> {
969 let noise_sql = if include_raw {
970 String::new()
971 } else {
972 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
973 };
974
975 let perspective_sql = perspective_where_clause(perspective);
976 let sql = format!(
977 r#"
978 SELECT * FROM memories
979 WHERE namespace_id = ?
980 AND is_active = 1
981 AND {perspective_sql}
982 AND {COGNITIVE_LEVEL_EXPR} != ?
983 {noise_sql}
984 ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_reinforced'), 0) DESC,
985 created_at DESC
986 LIMIT ?
987 "#
988 );
989 let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
990
991 for val in bind_perspective(perspective) {
992 query = query.bind(val);
993 }
994
995 let rows = query
996 .bind(CognitiveLevel::Contradiction.as_str())
997 .bind(limit)
998 .fetch_all(&self.pool)
999 .await
1000 .map_err(db_error)?;
1001
1002 rows.into_iter()
1003 .map(|row| self.row_to_memory(row))
1004 .collect()
1005 }
1006
1007 pub async fn get_contradictions_by_perspective(
1011 &self,
1012 namespace_id: i64,
1013 perspective: &PerspectiveKey,
1014 limit: i64,
1015 ) -> Result<Vec<Memory>> {
1016 self.get_contradictions_by_perspective_opts(namespace_id, perspective, limit, false)
1017 .await
1018 }
1019
1020 pub async fn get_contradictions_by_perspective_opts(
1023 &self,
1024 namespace_id: i64,
1025 perspective: &PerspectiveKey,
1026 limit: i64,
1027 include_raw: bool,
1028 ) -> Result<Vec<Memory>> {
1029 let noise_sql = if include_raw {
1030 String::new()
1031 } else {
1032 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1033 };
1034
1035 let perspective_sql = perspective_where_clause(perspective);
1036 let sql = format!(
1037 r#"
1038 SELECT * FROM memories
1039 WHERE namespace_id = ?
1040 AND is_active = 1
1041 AND {perspective_sql}
1042 AND {COGNITIVE_LEVEL_EXPR} = ?
1043 {noise_sql}
1044 ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_contradicted'), 0) DESC,
1045 created_at DESC
1046 LIMIT ?
1047 "#
1048 );
1049 let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1050
1051 for val in bind_perspective(perspective) {
1052 query = query.bind(val);
1053 }
1054
1055 let rows = query
1056 .bind(CognitiveLevel::Contradiction.as_str())
1057 .bind(limit)
1058 .fetch_all(&self.pool)
1059 .await
1060 .map_err(db_error)?;
1061
1062 rows.into_iter()
1063 .map(|row| self.row_to_memory(row))
1064 .collect()
1065 }
1066
1067 pub async fn search_working_set(&self, params: WorkingSetParams<'_>) -> Result<Vec<Memory>> {
1086 let WorkingSetParams {
1087 namespace_id,
1088 perspective,
1089 max_items,
1090 include_raw,
1091 } = params;
1092
1093 let per_bucket = (max_items as i64).max(8);
1096
1097 let session = perspective
1099 .and_then(|p| p.session_key.as_deref())
1100 .unwrap_or("");
1101 let mut digests = Vec::new();
1102 if let Some(short) = self
1103 .latest_digest_for_session(namespace_id, session, "short")
1104 .await?
1105 {
1106 digests.push(short);
1107 }
1108 if let Some(long) = self
1109 .latest_digest_for_session(namespace_id, session, "long")
1110 .await?
1111 {
1112 digests.push(long);
1113 }
1114
1115 let reinforced = if let Some(persp) = perspective {
1117 self.get_most_reinforced_by_perspective_opts(
1118 namespace_id,
1119 persp,
1120 per_bucket,
1121 include_raw,
1122 )
1123 .await?
1124 } else {
1125 self.get_most_reinforced_by_namespace(namespace_id, per_bucket, include_raw)
1126 .await?
1127 };
1128
1129 let recent = if let Some(persp) = perspective {
1130 self.get_recent_by_perspective_opts(namespace_id, persp, per_bucket, include_raw)
1131 .await?
1132 } else {
1133 let sql = if include_raw {
1134 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ?"
1135 .to_string()
1136 } else {
1137 format!(
1138 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 AND {} ORDER BY created_at DESC LIMIT ?",
1139 RAW_ACTIVITY_FILTER_SQL,
1140 )
1141 };
1142 let rows: Vec<MemoryRow> = sqlx::query_as(&sql)
1143 .bind(namespace_id)
1144 .bind(per_bucket)
1145 .fetch_all(&self.pool)
1146 .await
1147 .map_err(db_error)?;
1148 rows.into_iter()
1149 .map(|r| self.row_to_memory(r))
1150 .collect::<Result<Vec<_>>>()?
1151 };
1152
1153 let contradictions = if let Some(persp) = perspective {
1154 self.get_contradictions_by_perspective_opts(
1155 namespace_id,
1156 persp,
1157 per_bucket,
1158 include_raw,
1159 )
1160 .await?
1161 } else {
1162 self.get_contradictions_by_namespace(namespace_id, per_bucket, include_raw)
1163 .await?
1164 };
1165
1166 let mut seen = std::collections::HashSet::new();
1168 let mut result = Vec::with_capacity(max_items);
1169
1170 for memory in digests
1171 .into_iter()
1172 .chain(reinforced)
1173 .chain(recent)
1174 .chain(contradictions)
1175 {
1176 if seen.insert(memory.id) {
1177 result.push(memory);
1178 if result.len() >= max_items {
1179 break;
1180 }
1181 }
1182 }
1183
1184 Ok(result)
1185 }
1186
1187 pub async fn load_lineage(&self, memory_id: i64) -> Result<Vec<MemoryLineageEntry>> {
1189 let rows: Vec<MemoryEvidenceRow> = sqlx::query_as::<_, MemoryEvidenceRow>(
1190 r#"
1191 SELECT * FROM memory_evidence
1192 WHERE derived_memory_id = ? OR source_memory_id = ?
1193 ORDER BY created_at ASC
1194 "#,
1195 )
1196 .bind(memory_id)
1197 .bind(memory_id)
1198 .fetch_all(&self.pool)
1199 .await
1200 .map_err(db_error)?;
1201
1202 Ok(rows
1203 .into_iter()
1204 .map(|r| MemoryLineageEntry {
1205 derived_memory_id: r.derived_memory_id,
1206 source_memory_id: r.source_memory_id,
1207 evidence_role: r.evidence_role,
1208 })
1209 .collect())
1210 }
1211
1212 pub async fn load_lineage_batch(
1214 &self,
1215 memory_ids: &[i64],
1216 ) -> Result<HashMap<i64, Vec<MemoryLineageEntry>>> {
1217 if memory_ids.is_empty() {
1218 return Ok(HashMap::new());
1219 }
1220
1221 let placeholders = memory_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1222 let sql = format!(
1223 r#"
1224 SELECT * FROM memory_evidence
1225 WHERE derived_memory_id IN ({placeholders})
1226 OR source_memory_id IN ({placeholders})
1227 ORDER BY created_at ASC
1228 "#
1229 );
1230
1231 let mut query = sqlx::query_as::<_, MemoryEvidenceRow>(&sql);
1232 for id in memory_ids {
1233 query = query.bind(*id);
1234 }
1235 for id in memory_ids {
1236 query = query.bind(*id);
1237 }
1238
1239 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1240 let id_set: HashSet<i64> = memory_ids.iter().copied().collect();
1241 let mut grouped: HashMap<i64, Vec<MemoryLineageEntry>> = HashMap::new();
1242
1243 for row in rows {
1244 let entry = MemoryLineageEntry {
1245 derived_memory_id: row.derived_memory_id,
1246 source_memory_id: row.source_memory_id,
1247 evidence_role: row.evidence_role,
1248 };
1249
1250 if id_set.contains(&entry.derived_memory_id) {
1251 grouped
1252 .entry(entry.derived_memory_id)
1253 .or_default()
1254 .push(entry.clone());
1255 }
1256 if id_set.contains(&entry.source_memory_id) {
1257 grouped
1258 .entry(entry.source_memory_id)
1259 .or_default()
1260 .push(entry);
1261 }
1262 }
1263
1264 Ok(grouped)
1265 }
1266
1267 pub async fn get_by_id(&self, id: i64) -> Result<Option<Memory>> {
1269 let row: Option<MemoryRow> = sqlx::query_as("SELECT * FROM memories WHERE id = ?")
1270 .bind(id)
1271 .fetch_optional(&self.pool)
1272 .await
1273 .map_err(db_error)?;
1274
1275 row.map(|r| self.row_to_memory(r)).transpose()
1276 }
1277
1278 pub async fn get_by_ids(&self, ids: &[i64]) -> Result<Vec<Memory>> {
1283 if ids.is_empty() {
1284 return Ok(Vec::new());
1285 }
1286
1287 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1289
1290 let sql = format!("SELECT * FROM memories WHERE id IN ({placeholders})");
1291
1292 let mut query = sqlx::query_as::<_, MemoryRow>(&sql);
1293 for id in ids {
1294 query = query.bind(*id);
1295 }
1296
1297 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1298
1299 let mut memories: Vec<Memory> = Vec::with_capacity(rows.len());
1300 for row in rows {
1301 memories.push(self.row_to_memory(row)?);
1302 }
1303
1304 Ok(memories)
1305 }
1306
1307 pub async fn get_by_content(&self, namespace_id: i64, content: &str) -> Result<Memory> {
1309 let row: Option<MemoryRow> = sqlx::query_as(
1310 "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
1311 )
1312 .bind(namespace_id)
1313 .bind(content)
1314 .fetch_optional(&self.pool)
1315 .await
1316 .map_err(db_error)?;
1317
1318 row.map(|r| self.row_to_memory(r))
1319 .transpose()?
1320 .ok_or_else(|| {
1321 nexus_core::NexusError::Storage(
1322 "No memories found in namespace after insert".to_string(),
1323 )
1324 })
1325 }
1326
1327 pub async fn search_by_namespace(
1329 &self,
1330 namespace_id: i64,
1331 limit: usize,
1332 offset: usize,
1333 ) -> Result<Vec<Memory>> {
1334 let rows: Vec<MemoryRow> = sqlx::query_as(
1335 "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ?"
1336 )
1337 .bind(namespace_id)
1338 .bind(limit as i64)
1339 .bind(offset as i64)
1340 .fetch_all(&self.pool)
1341 .await
1342 .map_err(db_error)?;
1343
1344 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1345 }
1346
1347 pub async fn count_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1349 let count: (i64,) = sqlx::query_as(
1350 "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_active = 1",
1351 )
1352 .bind(namespace_id)
1353 .fetch_one(&self.pool)
1354 .await
1355 .map_err(db_error)?;
1356
1357 Ok(count.0)
1358 }
1359
1360 pub async fn count_all_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1362 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM memories WHERE namespace_id = ?")
1363 .bind(namespace_id)
1364 .fetch_one(&self.pool)
1365 .await
1366 .map_err(db_error)?;
1367
1368 Ok(count.0)
1369 }
1370
1371 pub async fn count_archived_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1373 let count: (i64,) = sqlx::query_as(
1374 "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_archived = 1",
1375 )
1376 .bind(namespace_id)
1377 .fetch_one(&self.pool)
1378 .await
1379 .map_err(db_error)?;
1380
1381 Ok(count.0)
1382 }
1383
1384 pub async fn delete(&self, id: i64) -> Result<bool> {
1386 let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1387 .bind(id)
1388 .execute(&self.pool)
1389 .await
1390 .map_err(db_error)?;
1391
1392 Ok(result.rows_affected() > 0)
1393 }
1394
1395 pub async fn touch(&self, id: i64) -> Result<()> {
1397 sqlx::query(
1398 "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
1399 )
1400 .bind(Utc::now())
1401 .bind(id)
1402 .execute(&self.pool)
1403 .await
1404 .map_err(db_error)?;
1405
1406 Ok(())
1407 }
1408
1409 pub async fn get_unconsolidated(
1411 &self,
1412 namespace_id: i64,
1413 limit: i32,
1414 ) -> Result<Vec<MemoryRow>> {
1415 let rows = sqlx::query_as::<_, MemoryRow>(
1416 r#"
1417 SELECT * FROM memories
1418 WHERE namespace_id = ?
1419 AND is_active = 1
1420 AND (metadata IS NULL OR json_extract(metadata, '$.agent.consolidated') IS NULL)
1421 ORDER BY created_at ASC
1422 LIMIT ?
1423 "#,
1424 )
1425 .bind(namespace_id)
1426 .bind(limit)
1427 .fetch_all(&self.pool)
1428 .await
1429 .map_err(db_error)?;
1430
1431 Ok(rows)
1432 }
1433
1434 pub async fn mark_consolidated(&self, id: i64) -> Result<()> {
1436 sqlx::query(
1437 r#"
1438 UPDATE memories
1439 SET metadata = json_set(
1440 COALESCE(metadata, '{}'),
1441 '$.agent.consolidated',
1442 true,
1443 '$.agent.consolidated_at',
1444 datetime('now')
1445 ),
1446 updated_at = datetime('now')
1447 WHERE id = ?
1448 "#,
1449 )
1450 .bind(id)
1451 .execute(&self.pool)
1452 .await
1453 .map_err(db_error)?;
1454
1455 Ok(())
1456 }
1457
1458 pub async fn mark_consolidated_batch(&self, ids: &[i64]) -> Result<()> {
1460 if ids.is_empty() {
1461 return Ok(());
1462 }
1463 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1464 let query = format!(
1465 r#"
1466 UPDATE memories
1467 SET metadata = json_set(
1468 COALESCE(metadata, '{{}}'),
1469 '$.agent.consolidated',
1470 true,
1471 '$.agent.consolidated_at',
1472 datetime('now')
1473 ),
1474 updated_at = datetime('now')
1475 WHERE id IN ({})
1476 "#,
1477 placeholders
1478 );
1479 let mut q = sqlx::query(&query);
1480 for id in ids {
1481 q = q.bind(*id);
1482 }
1483 q.execute(&self.pool).await.map_err(db_error)?;
1484 Ok(())
1485 }
1486
1487 pub async fn search_by_text(
1489 &self,
1490 namespace_id: i64,
1491 query: &str,
1492 limit: i32,
1493 include_raw: bool,
1494 ) -> Result<Vec<MemoryRow>> {
1495 let pattern = format!("%{}%", query);
1496 let raw_clause = if include_raw {
1497 String::new()
1498 } else {
1499 format!("AND {RAW_ACTIVITY_FILTER_SQL}")
1500 };
1501 let rows = sqlx::query_as::<_, MemoryRow>(&format!(
1502 r#"
1503 SELECT * FROM memories
1504 WHERE namespace_id = ?
1505 AND is_active = 1
1506 AND content LIKE ?
1507 {}
1508 ORDER BY updated_at DESC
1509 LIMIT ?
1510 "#,
1511 raw_clause
1512 ))
1513 .bind(namespace_id)
1514 .bind(&pattern)
1515 .bind(limit)
1516 .fetch_all(&self.pool)
1517 .await
1518 .map_err(db_error)?;
1519
1520 Ok(rows)
1521 }
1522
1523 pub async fn search_by_text_memories(
1525 &self,
1526 namespace_id: i64,
1527 query: &str,
1528 limit: i32,
1529 include_raw: bool,
1530 ) -> Result<Vec<Memory>> {
1531 let rows = self
1532 .search_by_text(namespace_id, query, limit, include_raw)
1533 .await?;
1534 rows.into_iter()
1535 .map(|row| self.row_to_memory(row))
1536 .collect()
1537 }
1538
1539 pub async fn get_semantic_candidates(
1541 &self,
1542 params: SemanticCandidateParams<'_>,
1543 ) -> Result<Vec<Memory>> {
1544 let SemanticCandidateParams {
1545 namespace_id,
1546 perspective,
1547 limit,
1548 include_raw,
1549 } = params;
1550
1551 let noise_sql = if include_raw {
1552 String::new()
1553 } else {
1554 format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1555 };
1556
1557 let rows = if let Some(perspective) = perspective {
1558 let sql = if perspective.session_key.is_some() {
1559 format!(
1560 r#"
1561 SELECT * FROM memories
1562 WHERE namespace_id = ?
1563 AND is_active = 1
1564 AND content_embedding IS NOT NULL
1565 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
1566 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
1567 AND (
1568 json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
1569 OR EXISTS (
1570 SELECT 1
1571 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
1572 WHERE value = ?
1573 )
1574 )
1575 {noise_sql}
1576 ORDER BY updated_at DESC, created_at DESC
1577 LIMIT ?
1578 "#
1579 )
1580 } else {
1581 format!(
1582 r#"
1583 SELECT * FROM memories
1584 WHERE namespace_id = ?
1585 AND is_active = 1
1586 AND content_embedding IS NOT NULL
1587 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
1588 AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
1589 {noise_sql}
1590 ORDER BY updated_at DESC, created_at DESC
1591 LIMIT ?
1592 "#
1593 )
1594 };
1595
1596 let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
1597 .bind(namespace_id)
1598 .bind(&perspective.observer)
1599 .bind(&perspective.subject);
1600
1601 if let Some(session_key) = &perspective.session_key {
1602 query = query.bind(session_key);
1603 query = query.bind(session_key);
1604 }
1605
1606 query
1607 .bind(limit)
1608 .fetch_all(&self.pool)
1609 .await
1610 .map_err(db_error)?
1611 } else {
1612 let sql = if include_raw {
1613 r#"
1614 SELECT * FROM memories
1615 WHERE namespace_id = ?
1616 AND is_active = 1
1617 AND content_embedding IS NOT NULL
1618 ORDER BY updated_at DESC, created_at DESC
1619 LIMIT ?
1620 "#
1621 .to_string()
1622 } else {
1623 format!(
1624 r#"
1625 SELECT * FROM memories
1626 WHERE namespace_id = ?
1627 AND is_active = 1
1628 AND content_embedding IS NOT NULL
1629 AND {}
1630 ORDER BY updated_at DESC, created_at DESC
1631 LIMIT ?
1632 "#,
1633 RAW_ACTIVITY_FILTER_SQL,
1634 )
1635 };
1636
1637 sqlx::query_as::<_, MemoryRow>(&sql)
1638 .bind(namespace_id)
1639 .bind(limit)
1640 .fetch_all(&self.pool)
1641 .await
1642 .map_err(db_error)?
1643 };
1644
1645 rows.into_iter()
1646 .map(|row| self.row_to_memory(row))
1647 .collect()
1648 }
1649
1650 pub async fn list_filtered(
1652 &self,
1653 namespace_id: i64,
1654 filters: ListMemoryFilters<'_>,
1655 ) -> Result<Vec<Memory>> {
1656 let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1658 let mut param_idx = 2u32;
1659
1660 if filters.category.is_some() {
1661 conditions.push(format!("category = ?{}", param_idx));
1662 param_idx += 1;
1663 }
1664 if filters.since.is_some() {
1665 conditions.push(format!("created_at >= ?{}", param_idx));
1666 param_idx += 1;
1667 }
1668 if filters.until.is_some() {
1669 conditions.push(format!("created_at <= ?{}", param_idx));
1670 param_idx += 1;
1671 }
1672 if filters.content_like.is_some() {
1673 conditions.push(format!("content LIKE ?{}", param_idx));
1674 param_idx += 1;
1675 }
1676 if !filters.include_raw {
1677 conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1678 }
1679
1680 let sql = format!(
1681 "SELECT * FROM memories WHERE {} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
1682 conditions.join(" AND "),
1683 param_idx,
1684 param_idx + 1,
1685 );
1686
1687 let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1688
1689 if let Some(cat) = filters.category {
1690 query = query.bind(cat.to_string());
1691 }
1692 if let Some(s) = filters.since {
1693 query = query.bind(s);
1694 }
1695 if let Some(u) = filters.until {
1696 query = query.bind(u);
1697 }
1698 if let Some(search) = filters.content_like {
1699 query = query.bind(format!("%{}%", search));
1700 }
1701
1702 let rows: Vec<MemoryRow> = query
1703 .bind(filters.limit)
1704 .bind(filters.offset)
1705 .fetch_all(&self.pool)
1706 .await
1707 .map_err(db_error)?;
1708
1709 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1710 }
1711
1712 pub async fn list_missing_cognitive_metadata(
1714 &self,
1715 namespace_id: i64,
1716 limit: i64,
1717 offset: i64,
1718 ) -> Result<Vec<Memory>> {
1719 let rows: Vec<MemoryRow> = sqlx::query_as(
1720 r#"
1721 SELECT * FROM memories
1722 WHERE namespace_id = ?
1723 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1724 ORDER BY id ASC
1725 LIMIT ? OFFSET ?
1726 "#,
1727 )
1728 .bind(namespace_id)
1729 .bind(limit)
1730 .bind(offset)
1731 .fetch_all(&self.pool)
1732 .await
1733 .map_err(db_error)?;
1734
1735 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1736 }
1737
1738 pub async fn count_missing_cognitive_metadata(&self, namespace_id: i64) -> Result<i64> {
1740 let count: i64 = sqlx::query_scalar(
1741 r#"
1742 SELECT COUNT(*) FROM memories
1743 WHERE namespace_id = ?
1744 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1745 "#,
1746 )
1747 .bind(namespace_id)
1748 .fetch_one(&self.pool)
1749 .await
1750 .map_err(db_error)?;
1751
1752 Ok(count)
1753 }
1754
1755 pub async fn update_memory_metadata(
1757 &self,
1758 memory_id: i64,
1759 metadata: &serde_json::Value,
1760 ) -> Result<()> {
1761 let metadata_json = serde_json::to_string(metadata)?;
1762 sqlx::query(
1763 r#"
1764 UPDATE memories
1765 SET metadata = ?, updated_at = ?
1766 WHERE id = ?
1767 "#,
1768 )
1769 .bind(metadata_json)
1770 .bind(Utc::now())
1771 .bind(memory_id)
1772 .execute(&self.pool)
1773 .await
1774 .map_err(db_error)?;
1775
1776 Ok(())
1777 }
1778
1779 pub async fn list_session_keys_without_digests(
1781 &self,
1782 namespace_id: i64,
1783 limit: i64,
1784 ) -> Result<Vec<String>> {
1785 let rows: Vec<(String,)> = sqlx::query_as(
1786 r#"
1787 SELECT DISTINCT json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') AS session_key
1788 FROM memories m
1789 WHERE m.namespace_id = ?
1790 AND json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1791 AND TRIM(json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')) <> ''
1792 AND NOT EXISTS (
1793 SELECT 1 FROM session_digests sd
1794 WHERE sd.namespace_id = m.namespace_id
1795 AND sd.session_key = json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')
1796 )
1797 ORDER BY session_key ASC
1798 LIMIT ?
1799 "#,
1800 )
1801 .bind(namespace_id)
1802 .bind(limit)
1803 .fetch_all(&self.pool)
1804 .await
1805 .map_err(db_error)?;
1806
1807 Ok(rows.into_iter().map(|(session_key,)| session_key).collect())
1808 }
1809
1810 pub async fn count_distinct_session_keys_with_cognition(
1812 &self,
1813 namespace_id: i64,
1814 ) -> Result<i64> {
1815 let count: i64 = sqlx::query_scalar(
1816 r#"
1817 SELECT COUNT(DISTINCT json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key'))
1818 FROM memories
1819 WHERE namespace_id = ?
1820 AND is_active = 1
1821 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1822 AND TRIM(json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key')) <> ''
1823 "#,
1824 )
1825 .bind(namespace_id)
1826 .fetch_one(&self.pool)
1827 .await
1828 .map_err(db_error)?;
1829
1830 Ok(count)
1831 }
1832
1833 pub async fn list_archived_raw_cleanup_candidates(
1835 &self,
1836 namespace_id: i64,
1837 older_than: DateTime<Utc>,
1838 limit: i64,
1839 ) -> Result<Vec<Memory>> {
1840 let rows: Vec<MemoryRow> = sqlx::query_as(
1841 r#"
1842 SELECT * FROM memories
1843 WHERE namespace_id = ?
1844 AND is_active = 0
1845 AND is_archived = 1
1846 AND (
1847 labels LIKE '%raw-activity%'
1848 OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1849 )
1850 AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1851 AND created_at <= ?
1852 ORDER BY created_at ASC
1853 LIMIT ?
1854 "#,
1855 )
1856 .bind(namespace_id)
1857 .bind(older_than)
1858 .bind(limit)
1859 .fetch_all(&self.pool)
1860 .await
1861 .map_err(db_error)?;
1862
1863 rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1864 }
1865
1866 pub async fn count_archived_raw_cleanup_candidates(
1868 &self,
1869 namespace_id: i64,
1870 older_than: DateTime<Utc>,
1871 ) -> Result<i64> {
1872 let count: i64 = sqlx::query_scalar(
1873 r#"
1874 SELECT COUNT(*) FROM memories
1875 WHERE namespace_id = ?
1876 AND is_active = 0
1877 AND is_archived = 1
1878 AND (
1879 labels LIKE '%raw-activity%'
1880 OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1881 )
1882 AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1883 AND created_at <= ?
1884 "#,
1885 )
1886 .bind(namespace_id)
1887 .bind(older_than)
1888 .fetch_one(&self.pool)
1889 .await
1890 .map_err(db_error)?;
1891
1892 Ok(count)
1893 }
1894
1895 pub async fn delete_batch(&self, ids: &[i64]) -> Result<u64> {
1897 if ids.is_empty() {
1898 return Ok(0);
1899 }
1900
1901 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1902 let sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
1903 let mut query = sqlx::query(&sql);
1904 for id in ids {
1905 query = query.bind(*id);
1906 }
1907
1908 let result = query.execute(&self.pool).await.map_err(db_error)?;
1909 Ok(result.rows_affected())
1910 }
1911
1912 pub async fn delete_by_content_pattern(&self, namespace_id: i64, pattern: &str) -> Result<u64> {
1914 let result = sqlx::query("DELETE FROM memories WHERE namespace_id = ? AND content LIKE ?")
1915 .bind(namespace_id)
1916 .bind(pattern)
1917 .execute(&self.pool)
1918 .await
1919 .map_err(db_error)?;
1920
1921 Ok(result.rows_affected())
1922 }
1923
1924 pub async fn count_filtered(
1926 &self,
1927 namespace_id: i64,
1928 category: Option<&str>,
1929 since: Option<DateTime<Utc>>,
1930 until: Option<DateTime<Utc>>,
1931 include_raw: bool,
1932 ) -> Result<i64> {
1933 let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1934 let mut param_idx = 2u32;
1935
1936 if category.is_some() {
1937 conditions.push(format!("category = ?{}", param_idx));
1938 param_idx += 1;
1939 }
1940 if since.is_some() {
1941 conditions.push(format!("created_at >= ?{}", param_idx));
1942 param_idx += 1;
1943 }
1944 if until.is_some() {
1945 conditions.push(format!("created_at <= ?{}", param_idx));
1946 }
1947 if !include_raw {
1948 conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1949 }
1950
1951 let sql = format!(
1952 "SELECT COUNT(*) FROM memories WHERE {}",
1953 conditions.join(" AND "),
1954 );
1955
1956 let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
1957
1958 if let Some(cat) = category {
1959 query = query.bind(cat.to_string());
1960 }
1961 if let Some(s) = since {
1962 query = query.bind(s);
1963 }
1964 if let Some(u) = until {
1965 query = query.bind(u);
1966 }
1967
1968 let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
1969 Ok(count)
1970 }
1971
1972 pub async fn store_distilled_summary(
1974 &self,
1975 params: StoreMemoryParams<'_>,
1976 source_ids: &[i64],
1977 ) -> Result<Memory> {
1978 let labels_json = serde_json::to_string(params.labels)?;
1979 let metadata_json = serde_json::to_string(params.metadata)?;
1980 let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
1981 let mut tx = self.pool.begin().await.map_err(db_error)?;
1982
1983 let result = sqlx::query(
1984 r#"
1985 INSERT INTO memories (
1986 namespace_id, content, category, memory_lane_type, labels, metadata,
1987 content_embedding, embedding_model, created_at, is_active, access_count
1988 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
1989 "#,
1990 )
1991 .bind(params.namespace_id)
1992 .bind(params.content)
1993 .bind(params.category.to_string())
1994 .bind(params.memory_lane_type.map(|t| t.to_string()))
1995 .bind(&labels_json)
1996 .bind(&metadata_json)
1997 .bind(&embedding_json)
1998 .bind(params.embedding_model)
1999 .bind(Utc::now())
2000 .execute(&mut *tx)
2001 .await
2002 .map_err(db_error)?;
2003
2004 let summary_id = if result.last_insert_rowid() == 0 {
2005 let row: Option<MemoryRow> = sqlx::query_as(
2006 "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) ORDER BY created_at DESC LIMIT 1",
2007 )
2008 .bind(params.namespace_id)
2009 .bind(params.content)
2010 .fetch_optional(&mut *tx)
2011 .await
2012 .map_err(db_error)?;
2013 row.map(|memory| memory.id).ok_or_else(|| {
2014 nexus_core::NexusError::Storage(
2015 "Duplicate distilled summary merged but matching row not found".to_string(),
2016 )
2017 })?
2018 } else {
2019 result.last_insert_rowid()
2020 };
2021
2022 if !source_ids.is_empty() {
2023 for source_id in source_ids {
2024 sqlx::query(
2025 r#"
2026 INSERT OR IGNORE INTO memory_evidence (
2027 derived_memory_id,
2028 source_memory_id,
2029 evidence_role,
2030 created_at
2031 ) VALUES (?, ?, 'source', datetime('now'))
2032 "#,
2033 )
2034 .bind(summary_id)
2035 .bind(*source_id)
2036 .execute(&mut *tx)
2037 .await
2038 .map_err(db_error)?;
2039 }
2040
2041 let placeholders = source_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2042 let sql = format!(
2043 r#"
2044 UPDATE memories
2045 SET
2046 is_active = 0,
2047 is_archived = 1,
2048 updated_at = ?,
2049 metadata = json_set(
2050 COALESCE(metadata, '{{}}'),
2051 '$.distillation.status', 'archived',
2052 '$.distillation.summary_memory_id', ?,
2053 '$.distillation.archived_at', ?
2054 )
2055 WHERE id IN ({})
2056 "#,
2057 placeholders
2058 );
2059 let archived_at = Utc::now().to_rfc3339();
2060 let mut query = sqlx::query(&sql)
2061 .bind(Utc::now())
2062 .bind(summary_id)
2063 .bind(&archived_at);
2064 for source_id in source_ids {
2065 query = query.bind(*source_id);
2066 }
2067 query.execute(&mut *tx).await.map_err(db_error)?;
2068 }
2069
2070 tx.commit().await.map_err(db_error)?;
2071 self.get_by_id(summary_id).await?.ok_or_else(|| {
2072 nexus_core::NexusError::Storage(format!(
2073 "Failed to retrieve distilled summary with id {}",
2074 summary_id
2075 ))
2076 })
2077 }
2078
2079 fn row_to_memory(&self, row: MemoryRow) -> Result<Memory> {
2080 let labels: Vec<String> = serde_json::from_str(&row.labels).map_err(|e| {
2081 nexus_core::NexusError::Storage(format!(
2082 "corrupted labels JSON for memory {}: {e}",
2083 row.id
2084 ))
2085 })?;
2086 let metadata: serde_json::Value = serde_json::from_str(&row.metadata).map_err(|e| {
2087 nexus_core::NexusError::Storage(format!(
2088 "corrupted metadata JSON for memory {}: {e}",
2089 row.id
2090 ))
2091 })?;
2092 let embedding: Option<Vec<f32>> = row
2093 .content_embedding
2094 .map(|e| {
2095 serde_json::from_str(&e).map_err(|err| {
2096 nexus_core::NexusError::Storage(format!(
2097 "corrupted embedding JSON for memory {}: {err}",
2098 row.id
2099 ))
2100 })
2101 })
2102 .transpose()?;
2103
2104 Ok(Memory {
2105 id: row.id,
2106 namespace_id: row.namespace_id,
2107 content: row.content,
2108 category: parse_category(&row.category)?,
2109 memory_lane_type: match &row.memory_lane_type {
2110 Some(s) => parse_memory_lane_type(s)?,
2111 None => None,
2112 },
2113 labels,
2114 metadata,
2115 similarity_score: row.similarity_score,
2116 relevance_score: row.relevance_score,
2117 content_embedding: embedding,
2118 embedding_model: row.embedding_model,
2119 created_at: row.created_at,
2120 updated_at: row.updated_at,
2121 last_accessed: row.last_accessed,
2122 is_active: row.is_active,
2123 is_archived: row.is_archived,
2124 access_count: row.access_count,
2125 })
2126 }
2127
2128 pub async fn list_jobs(
2132 &self,
2133 namespace_id: i64,
2134 job_type: Option<&str>,
2135 status: Option<&str>,
2136 limit: i64,
2137 offset: i64,
2138 ) -> Result<Vec<MemoryJobRow>> {
2139 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2140 if job_type.is_some() {
2141 where_clauses.push("job_type = ?".to_string());
2142 }
2143 if status.is_some() {
2144 where_clauses.push("status = ?".to_string());
2145 }
2146 let where_sql = where_clauses.join(" AND ");
2147
2148 let sql = format!(
2149 "SELECT * FROM memory_jobs WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
2150 where_sql
2151 );
2152
2153 let mut query = sqlx::query_as::<_, MemoryJobRow>(&sql).bind(namespace_id);
2154 if let Some(jt) = job_type {
2155 query = query.bind(jt);
2156 }
2157 if let Some(st) = status {
2158 query = query.bind(st);
2159 }
2160 query = query.bind(limit).bind(offset);
2161
2162 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2163 Ok(rows)
2164 }
2165
2166 pub async fn count_jobs(
2168 &self,
2169 namespace_id: i64,
2170 job_type: Option<&str>,
2171 status: Option<&str>,
2172 ) -> Result<i64> {
2173 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2174 if job_type.is_some() {
2175 where_clauses.push("job_type = ?".to_string());
2176 }
2177 if status.is_some() {
2178 where_clauses.push("status = ?".to_string());
2179 }
2180 let where_sql = where_clauses.join(" AND ");
2181
2182 let sql = format!("SELECT COUNT(*) FROM memory_jobs WHERE {}", where_sql);
2183
2184 let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
2185 if let Some(jt) = job_type {
2186 query = query.bind(jt);
2187 }
2188 if let Some(st) = status {
2189 query = query.bind(st);
2190 }
2191
2192 let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
2193 Ok(count)
2194 }
2195
2196 pub async fn count_jobs_by_status(
2198 &self,
2199 namespace_id: i64,
2200 job_type: Option<&str>,
2201 ) -> Result<Vec<(String, i64)>> {
2202 let mut where_clauses = vec!["namespace_id = ?".to_string()];
2203 if job_type.is_some() {
2204 where_clauses.push("job_type = ?".to_string());
2205 }
2206 let where_sql = where_clauses.join(" AND ");
2207
2208 let sql = format!(
2209 "SELECT status, COUNT(*) as cnt FROM memory_jobs WHERE {} GROUP BY status",
2210 where_sql
2211 );
2212
2213 let mut query = sqlx::query_as::<_, (String, i64)>(&sql).bind(namespace_id);
2214 if let Some(jt) = job_type {
2215 query = query.bind(jt);
2216 }
2217
2218 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2219 Ok(rows)
2220 }
2221
2222 pub async fn purge_completed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2226 let result = sqlx::query(
2227 r#"
2228 DELETE FROM memory_jobs
2229 WHERE status = ? AND updated_at < ?
2230 "#,
2231 )
2232 .bind(memory_job_status::COMPLETED)
2233 .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2234 .execute(&self.pool)
2235 .await
2236 .map_err(db_error)?;
2237
2238 Ok(result.rows_affected())
2239 }
2240
2241 pub async fn purge_permanently_failed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2246 let result = sqlx::query(
2247 r#"
2248 DELETE FROM memory_jobs
2249 WHERE status = ? AND attempts >= ? AND updated_at < ?
2250 "#,
2251 )
2252 .bind(memory_job_status::FAILED)
2253 .bind(MAX_JOB_ATTEMPTS)
2254 .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2255 .execute(&self.pool)
2256 .await
2257 .map_err(db_error)?;
2258
2259 Ok(result.rows_affected())
2260 }
2261
2262 pub async fn list_digests(
2264 &self,
2265 namespace_id: i64,
2266 session_key: Option<&str>,
2267 limit: i64,
2268 offset: i64,
2269 ) -> Result<Vec<SessionDigestRow>> {
2270 let mut query = if let Some(sk) = session_key {
2271 sqlx::query_as::<_, SessionDigestRow>(
2272 "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2273 )
2274 .bind(namespace_id)
2275 .bind(sk)
2276 } else {
2277 sqlx::query_as::<_, SessionDigestRow>(
2278 "SELECT * FROM session_digests WHERE namespace_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2279 )
2280 .bind(namespace_id)
2281 };
2282
2283 query = query.bind(limit).bind(offset);
2284 let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2285 Ok(rows)
2286 }
2287
2288 pub async fn count_digests(&self, namespace_id: i64, session_key: Option<&str>) -> Result<i64> {
2290 let query = if let Some(sk) = session_key {
2291 sqlx::query_scalar(
2292 "SELECT COUNT(*) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
2293 )
2294 .bind(namespace_id)
2295 .bind(sk)
2296 } else {
2297 sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE namespace_id = ?")
2298 .bind(namespace_id)
2299 };
2300
2301 let count: i64 = query.fetch_one(&self.pool).await.map_err(db_error)?;
2302 Ok(count)
2303 }
2304
2305 pub async fn count_evidence(&self, namespace_id: i64) -> Result<i64> {
2307 let count: i64 = sqlx::query_scalar(
2308 "SELECT COUNT(*) FROM memory_evidence WHERE derived_memory_id IN (SELECT id FROM memories WHERE namespace_id = ?)"
2309 )
2310 .bind(namespace_id)
2311 .fetch_one(&self.pool)
2312 .await
2313 .map_err(db_error)?;
2314 Ok(count)
2315 }
2316
2317 pub async fn record_metric(
2319 &self,
2320 metric_name: &str,
2321 metric_value: f64,
2322 labels: &serde_json::Value,
2323 ) -> Result<i64> {
2324 let labels_json = serde_json::to_string(labels)?;
2325 let id: i64 = sqlx::query_scalar(
2326 r#"
2327 INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2328 VALUES (?, ?, ?, ?)
2329 RETURNING id
2330 "#,
2331 )
2332 .bind(metric_name)
2333 .bind(metric_value)
2334 .bind(labels_json)
2335 .bind(Utc::now())
2336 .fetch_one(&self.pool)
2337 .await
2338 .map_err(db_error)?;
2339 Ok(id)
2340 }
2341
2342 pub async fn record_metrics_batch(&self, samples: &[MetricSample]) -> Result<()> {
2344 if samples.is_empty() {
2345 return Ok(());
2346 }
2347
2348 let mut tx = self.pool.begin().await.map_err(db_error)?;
2349 for sample in samples {
2350 let labels_json = serde_json::to_string(&sample.labels)?;
2351 sqlx::query(
2352 r#"
2353 INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2354 VALUES (?, ?, ?, ?)
2355 "#,
2356 )
2357 .bind(&sample.metric_name)
2358 .bind(sample.metric_value)
2359 .bind(labels_json)
2360 .bind(Utc::now())
2361 .execute(&mut *tx)
2362 .await
2363 .map_err(db_error)?;
2364 }
2365 tx.commit().await.map_err(db_error)?;
2366 Ok(())
2367 }
2368
2369 pub async fn latest_metrics_for_namespace(
2371 &self,
2372 namespace_id: i64,
2373 metric_prefix: Option<&str>,
2374 limit: i64,
2375 ) -> Result<Vec<SystemMetricRow>> {
2376 let limit = limit.max(1);
2377 let rows = if let Some(prefix) = metric_prefix {
2378 sqlx::query_as::<_, SystemMetricRow>(
2379 r#"
2380 SELECT *
2381 FROM system_metrics
2382 WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2383 AND metric_name LIKE ?
2384 ORDER BY recorded_at DESC, id DESC
2385 LIMIT ?
2386 "#,
2387 )
2388 .bind(namespace_id)
2389 .bind(format!("{prefix}%"))
2390 .bind(limit)
2391 .fetch_all(&self.pool)
2392 .await
2393 .map_err(db_error)?
2394 } else {
2395 sqlx::query_as::<_, SystemMetricRow>(
2396 r#"
2397 SELECT *
2398 FROM system_metrics
2399 WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2400 ORDER BY recorded_at DESC, id DESC
2401 LIMIT ?
2402 "#,
2403 )
2404 .bind(namespace_id)
2405 .bind(limit)
2406 .fetch_all(&self.pool)
2407 .await
2408 .map_err(db_error)?
2409 };
2410 Ok(rows)
2411 }
2412
2413 pub async fn count_by_cognitive_level(
2415 &self,
2416 namespace_id: i64,
2417 level: CognitiveLevel,
2418 ) -> Result<i64> {
2419 let count: i64 = sqlx::query_scalar(
2420 r#"
2421 SELECT COUNT(*) FROM memories
2422 WHERE namespace_id = ?
2423 AND is_active = 1
2424 AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') = ?
2425 "#,
2426 )
2427 .bind(namespace_id)
2428 .bind(level.as_str())
2429 .fetch_one(&self.pool)
2430 .await
2431 .map_err(db_error)?;
2432 Ok(count)
2433 }
2434}
2435
2436async fn insert_memory_tx(
2437 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2438 params: &StoreMemoryParams<'_>,
2439) -> Result<i64> {
2440 let labels_json = serde_json::to_string(params.labels)?;
2441 let metadata_json = serde_json::to_string(params.metadata)?;
2442 let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
2443
2444 let result = sqlx::query(
2445 r#"
2446 INSERT INTO memories (
2447 namespace_id, content, category, memory_lane_type, labels, metadata,
2448 content_embedding, embedding_model, created_at, is_active, access_count
2449 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
2450 "#,
2451 )
2452 .bind(params.namespace_id)
2453 .bind(params.content)
2454 .bind(params.category.to_string())
2455 .bind(params.memory_lane_type.map(|t| t.to_string()))
2456 .bind(&labels_json)
2457 .bind(&metadata_json)
2458 .bind(&embedding_json)
2459 .bind(params.embedding_model)
2460 .bind(Utc::now())
2461 .execute(&mut **tx)
2462 .await
2463 .map_err(db_error)?;
2464
2465 let inserted_id = result.last_insert_rowid();
2466 if inserted_id != 0 {
2467 return Ok(inserted_id);
2468 }
2469
2470 let row: Option<MemoryRow> = sqlx::query_as(
2471 "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1",
2472 )
2473 .bind(params.namespace_id)
2474 .bind(params.content)
2475 .fetch_optional(&mut **tx)
2476 .await
2477 .map_err(db_error)?;
2478
2479 row.map(|memory| memory.id).ok_or_else(|| {
2480 nexus_core::NexusError::Storage(
2481 "Duplicate merged by trigger but matching row not found".to_string(),
2482 )
2483 })
2484}
2485
2486async fn insert_evidence_tx(
2487 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2488 derived_memory_id: i64,
2489 source_memory_id: i64,
2490 evidence_role: &str,
2491) -> Result<()> {
2492 sqlx::query(
2493 r#"
2494 INSERT OR IGNORE INTO memory_evidence (derived_memory_id, source_memory_id, evidence_role, created_at)
2495 VALUES (?, ?, ?, datetime('now'))
2496 "#,
2497 )
2498 .bind(derived_memory_id)
2499 .bind(source_memory_id)
2500 .bind(evidence_role)
2501 .execute(&mut **tx)
2502 .await
2503 .map_err(db_error)?;
2504
2505 Ok(())
2506}
2507
2508fn new_claim_token(lease_owner: &str) -> String {
2509 let nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2510 format!("{lease_owner}-{nanos}-{}", std::process::id())
2511}
2512
2513fn merge_labels(existing: &[String], incoming: &[String]) -> Vec<String> {
2514 let mut merged = existing.to_vec();
2515 for label in incoming {
2516 if !merged
2517 .iter()
2518 .any(|current| current.eq_ignore_ascii_case(label))
2519 {
2520 merged.push(label.clone());
2521 }
2522 }
2523 merged
2524}
2525
2526fn merge_duplicate_metadata(
2527 existing: &serde_json::Value,
2528 incoming: &serde_json::Value,
2529) -> serde_json::Value {
2530 let mut merged = existing.clone();
2531
2532 if let Some(session_key) = incoming
2533 .pointer("/cognitive/session_key")
2534 .and_then(serde_json::Value::as_str)
2535 {
2536 let mut session_keys = existing
2537 .pointer("/cognitive/session_keys")
2538 .and_then(serde_json::Value::as_array)
2539 .cloned()
2540 .unwrap_or_default();
2541 if let Some(existing_key) = existing
2542 .pointer("/cognitive/session_key")
2543 .and_then(serde_json::Value::as_str)
2544 {
2545 push_unique_json_string(&mut session_keys, existing_key);
2546 }
2547 push_unique_json_string(&mut session_keys, session_key);
2548 ensure_object_path(&mut merged, "cognitive").insert(
2549 "session_key".to_string(),
2550 serde_json::Value::String(session_key.to_string()),
2551 );
2552 ensure_object_path(&mut merged, "cognitive").insert(
2553 "session_keys".to_string(),
2554 serde_json::Value::Array(session_keys),
2555 );
2556 }
2557
2558 if let Some(derived_session_key) = incoming
2559 .pointer("/source/derived_session_key")
2560 .and_then(serde_json::Value::as_str)
2561 {
2562 let mut derived_keys = existing
2563 .pointer("/source/derived_session_keys")
2564 .and_then(serde_json::Value::as_array)
2565 .cloned()
2566 .unwrap_or_default();
2567 if let Some(existing_key) = existing
2568 .pointer("/source/derived_session_key")
2569 .and_then(serde_json::Value::as_str)
2570 {
2571 push_unique_json_string(&mut derived_keys, existing_key);
2572 }
2573 push_unique_json_string(&mut derived_keys, derived_session_key);
2574 ensure_object_path(&mut merged, "source").insert(
2575 "derived_session_key".to_string(),
2576 serde_json::Value::String(derived_session_key.to_string()),
2577 );
2578 ensure_object_path(&mut merged, "source").insert(
2579 "derived_session_keys".to_string(),
2580 serde_json::Value::Array(derived_keys),
2581 );
2582 }
2583
2584 merged
2585}
2586
2587fn push_unique_json_string(values: &mut Vec<serde_json::Value>, candidate: &str) {
2588 if values
2589 .iter()
2590 .filter_map(serde_json::Value::as_str)
2591 .any(|current| current.eq_ignore_ascii_case(candidate))
2592 {
2593 return;
2594 }
2595 values.push(serde_json::Value::String(candidate.to_string()));
2596}
2597
2598fn ensure_object_path<'a>(
2599 root: &'a mut serde_json::Value,
2600 key: &str,
2601) -> &'a mut serde_json::Map<String, serde_json::Value> {
2602 if !root.is_object() {
2603 *root = serde_json::Value::Object(serde_json::Map::new());
2604 }
2605
2606 let object = root.as_object_mut().expect("root object ensured");
2607 let entry = object
2608 .entry(key.to_string())
2609 .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
2610 if !entry.is_object() {
2611 *entry = serde_json::Value::Object(serde_json::Map::new());
2612 }
2613
2614 entry.as_object_mut().expect("child object ensured")
2615}
2616
2617pub struct NamespaceRepository {
2619 pool: SqlitePool,
2620}
2621
2622impl NamespaceRepository {
2623 pub fn new(pool: SqlitePool) -> Self {
2624 Self { pool }
2625 }
2626
2627 pub async fn get_or_create(&self, name: &str, agent_type: &str) -> Result<AgentNamespace> {
2629 if let Some(ns) = self.get_by_name(name).await? {
2630 return Ok(ns);
2631 }
2632
2633 let result = sqlx::query(
2634 "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
2635 )
2636 .bind(name)
2637 .bind(agent_type)
2638 .bind(Utc::now())
2639 .execute(&self.pool)
2640 .await
2641 .map_err(db_error)?;
2642
2643 let id = result.last_insert_rowid();
2644 Ok(AgentNamespace {
2645 id,
2646 name: name.to_string(),
2647 description: None,
2648 agent_type: agent_type.to_string(),
2649 created_at: Utc::now(),
2650 updated_at: None,
2651 })
2652 }
2653
2654 pub async fn get_by_name(&self, name: &str) -> Result<Option<AgentNamespace>> {
2656 let row: Option<AgentNamespaceRow> =
2657 sqlx::query_as("SELECT * FROM agent_namespaces WHERE name = ?")
2658 .bind(name)
2659 .fetch_optional(&self.pool)
2660 .await
2661 .map_err(db_error)?;
2662
2663 Ok(row.map(|r| AgentNamespace {
2664 id: r.id,
2665 name: r.name,
2666 description: r.description,
2667 agent_type: r.agent_type,
2668 created_at: r.created_at,
2669 updated_at: r.updated_at,
2670 }))
2671 }
2672
2673 pub async fn get_by_id(&self, id: i64) -> Result<Option<AgentNamespace>> {
2675 let row: Option<AgentNamespaceRow> =
2676 sqlx::query_as("SELECT * FROM agent_namespaces WHERE id = ?")
2677 .bind(id)
2678 .fetch_optional(&self.pool)
2679 .await
2680 .map_err(db_error)?;
2681
2682 Ok(row.map(|r| AgentNamespace {
2683 id: r.id,
2684 name: r.name,
2685 description: r.description,
2686 agent_type: r.agent_type,
2687 created_at: r.created_at,
2688 updated_at: r.updated_at,
2689 }))
2690 }
2691
2692 pub async fn list_all(&self) -> Result<Vec<AgentNamespace>> {
2694 let rows: Vec<AgentNamespaceRow> =
2695 sqlx::query_as("SELECT * FROM agent_namespaces ORDER BY name")
2696 .fetch_all(&self.pool)
2697 .await
2698 .map_err(db_error)?;
2699
2700 Ok(rows
2701 .into_iter()
2702 .map(|r| AgentNamespace {
2703 id: r.id,
2704 name: r.name,
2705 description: r.description,
2706 agent_type: r.agent_type,
2707 created_at: r.created_at,
2708 updated_at: r.updated_at,
2709 })
2710 .collect())
2711 }
2712}
2713
2714pub struct ProcessedFileRepository<'a> {
2716 pub pool: &'a SqlitePool,
2717}
2718
2719impl<'a> ProcessedFileRepository<'a> {
2720 pub fn new(pool: &'a SqlitePool) -> Self {
2721 Self { pool }
2722 }
2723
2724 pub async fn is_processed(&self, namespace_id: i64, path: &str) -> Result<bool> {
2726 let row: Option<(i64,)> =
2727 sqlx::query_as("SELECT id FROM processed_files WHERE namespace_id = ? AND path = ? AND status = 'completed'")
2728 .bind(namespace_id)
2729 .bind(path)
2730 .fetch_optional(self.pool)
2731 .await
2732 .map_err(db_error)?;
2733
2734 Ok(row.is_some())
2735 }
2736
2737 pub async fn get_completed_paths(
2739 &self,
2740 namespace_id: i64,
2741 ) -> Result<std::collections::HashSet<String>> {
2742 let rows: Vec<(String,)> = sqlx::query_as(
2743 "SELECT path FROM processed_files WHERE namespace_id = ? AND status = 'completed'",
2744 )
2745 .bind(namespace_id)
2746 .fetch_all(self.pool)
2747 .await
2748 .map_err(db_error)?;
2749
2750 Ok(rows.into_iter().map(|r| r.0).collect())
2751 }
2752
2753 pub async fn mark_processing(
2755 &self,
2756 namespace_id: i64,
2757 path: &str,
2758 content_hash: Option<&str>,
2759 ) -> Result<i64> {
2760 let id: i64 = sqlx::query_scalar(
2761 r#"
2762 INSERT INTO processed_files (namespace_id, path, content_hash, status, updated_at)
2763 VALUES (?, ?, ?, 'processing', datetime('now'))
2764 ON CONFLICT(namespace_id, path) DO UPDATE SET
2765 content_hash = excluded.content_hash,
2766 status = 'processing',
2767 updated_at = datetime('now')
2768 RETURNING id
2769 "#,
2770 )
2771 .bind(namespace_id)
2772 .bind(path)
2773 .bind(content_hash)
2774 .fetch_one(self.pool)
2775 .await
2776 .map_err(db_error)?;
2777
2778 Ok(id)
2779 }
2780
2781 pub async fn mark_processed(&self, id: i64, memory_id: i64) -> Result<()> {
2783 sqlx::query(
2784 r#"
2785 UPDATE processed_files
2786 SET status = 'completed', memory_id = ?, processed_at = datetime('now'), updated_at = datetime('now')
2787 WHERE id = ?
2788 "#
2789 )
2790 .bind(memory_id)
2791 .bind(id)
2792 .execute(self.pool)
2793 .await
2794 .map_err(db_error)?;
2795
2796 Ok(())
2797 }
2798
2799 pub async fn mark_failed(&self, id: i64, error: &str) -> Result<()> {
2801 sqlx::query(
2802 r#"
2803 UPDATE processed_files
2804 SET status = 'failed', last_error = ?, updated_at = datetime('now')
2805 WHERE id = ?
2806 "#,
2807 )
2808 .bind(error)
2809 .bind(id)
2810 .execute(self.pool)
2811 .await
2812 .map_err(db_error)?;
2813
2814 Ok(())
2815 }
2816
2817 pub async fn get_pending(
2819 &self,
2820 namespace_id: i64,
2821 limit: i32,
2822 ) -> Result<Vec<ProcessedFileRow>> {
2823 let rows = sqlx::query_as::<_, ProcessedFileRow>(
2824 r#"
2825 SELECT * FROM processed_files
2826 WHERE namespace_id = ? AND status = 'pending'
2827 ORDER BY created_at ASC
2828 LIMIT ?
2829 "#,
2830 )
2831 .bind(namespace_id)
2832 .bind(limit)
2833 .fetch_all(self.pool)
2834 .await
2835 .map_err(db_error)?;
2836
2837 Ok(rows)
2838 }
2839
2840 pub async fn clear_namespace(&self, namespace_id: i64) -> Result<u64> {
2842 let result = sqlx::query("DELETE FROM processed_files WHERE namespace_id = ?")
2843 .bind(namespace_id)
2844 .execute(self.pool)
2845 .await
2846 .map_err(db_error)?;
2847
2848 Ok(result.rows_affected())
2849 }
2850}
2851
2852pub struct MemoryRelationRepository<'a> {
2854 pub pool: &'a SqlitePool,
2855}
2856
2857impl<'a> MemoryRelationRepository<'a> {
2858 pub fn new(pool: &'a SqlitePool) -> Self {
2859 Self { pool }
2860 }
2861
2862 pub async fn store(
2864 &self,
2865 source_id: i64,
2866 target_id: i64,
2867 relation_type: &str,
2868 strength: f32,
2869 ) -> Result<i64> {
2870 let id: i64 = sqlx::query_scalar(
2871 r#"
2872 INSERT INTO memory_relations (source_memory_id, target_memory_id, relation_type, strength, created_at)
2873 VALUES (?, ?, ?, ?, datetime('now'))
2874 ON CONFLICT(source_memory_id, target_memory_id, relation_type) DO UPDATE SET
2875 strength = excluded.strength,
2876 created_at = datetime('now')
2877 RETURNING id
2878 "#
2879 )
2880 .bind(source_id)
2881 .bind(target_id)
2882 .bind(relation_type)
2883 .bind(strength)
2884 .fetch_one(self.pool)
2885 .await
2886 .map_err(db_error)?;
2887
2888 Ok(id)
2889 }
2890
2891 pub async fn get_related(&self, memory_id: i64) -> Result<Vec<(i64, String, f32)>> {
2893 let rows: Vec<(i64, String, f32)> = sqlx::query_as(
2894 r#"
2895 SELECT target_memory_id as memory_id, relation_type, strength
2896 FROM memory_relations
2897 WHERE source_memory_id = ?
2898 UNION
2899 SELECT source_memory_id as memory_id, relation_type, strength
2900 FROM memory_relations
2901 WHERE target_memory_id = ?
2902 ORDER BY strength DESC
2903 "#,
2904 )
2905 .bind(memory_id)
2906 .bind(memory_id)
2907 .fetch_all(self.pool)
2908 .await
2909 .map_err(db_error)?;
2910
2911 Ok(rows)
2912 }
2913
2914 pub async fn delete_for_memory(&self, memory_id: i64) -> Result<u64> {
2916 let result = sqlx::query(
2917 "DELETE FROM memory_relations WHERE source_memory_id = ? OR target_memory_id = ?",
2918 )
2919 .bind(memory_id)
2920 .bind(memory_id)
2921 .execute(self.pool)
2922 .await
2923 .map_err(db_error)?;
2924
2925 Ok(result.rows_affected())
2926 }
2927}
2928
2929fn parse_category(s: &str) -> Result<Category> {
2930 match MemoryCategory::parse(s) {
2931 Some(cat) => Ok(cat),
2932 None => Err(nexus_core::NexusError::Storage(format!(
2933 "Unknown memory category '{s}' persisted in database; row may be corrupted"
2934 ))),
2935 }
2936}
2937
2938fn parse_memory_lane_type(s: &str) -> Result<Option<MemoryLaneType>> {
2939 match MemoryLaneType::parse(s) {
2940 Some(t) => Ok(Some(t)),
2941 None => Err(nexus_core::NexusError::Storage(format!(
2942 "Unknown memory_lane_type '{s}' persisted in database; row may be corrupted"
2943 ))),
2944 }
2945}
2946
2947#[cfg(test)]
2948mod tests {
2949 use super::*;
2950 use nexus_core::MemoryLanePriorityType;
2951 use sqlx::sqlite::SqlitePoolOptions;
2952
2953 fn cognitive_metadata(
2954 level: CognitiveLevel,
2955 perspective: &PerspectiveKey,
2956 times_reinforced: i64,
2957 times_contradicted: i64,
2958 ) -> serde_json::Value {
2959 serde_json::json!({
2960 "cognitive": {
2961 "level": level.as_str(),
2962 "observer": perspective.observer,
2963 "subject": perspective.subject,
2964 "session_key": perspective.session_key,
2965 "source_memory_ids": [],
2966 "confidence": 0.9,
2967 "times_reinforced": times_reinforced,
2968 "times_contradicted": times_contradicted,
2969 "generated_by": "test",
2970 }
2971 })
2972 }
2973
2974 #[test]
2975 fn test_parse_category() {
2976 assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
2977 assert!(matches!(
2978 parse_category("preferences"),
2979 Ok(Category::Preferences)
2980 ));
2981 assert!(parse_category("unknown").is_err());
2982 }
2983
2984 #[test]
2985 fn test_parse_memory_lane_type() {
2986 let correction = parse_memory_lane_type("correction");
2987 assert!(matches!(
2988 correction,
2989 Ok(Some(MemoryLaneType::Priority(
2990 MemoryLanePriorityType::Correction
2991 )))
2992 ));
2993
2994 let pattern_seed = parse_memory_lane_type("pattern_seed");
2995 assert!(matches!(
2996 pattern_seed,
2997 Ok(Some(MemoryLaneType::Priority(
2998 MemoryLanePriorityType::PatternSeed
2999 )))
3000 ));
3001
3002 assert!(parse_memory_lane_type("unknown").is_err());
3003 }
3004
3005 #[test]
3006 fn test_parse_category_all_variants() {
3007 assert!(matches!(parse_category("general"), Ok(Category::General)));
3008 assert!(matches!(parse_category("session"), Ok(Category::Session)));
3009 assert!(matches!(parse_category("context"), Ok(Category::Context)));
3010 assert!(matches!(
3011 parse_category("specifications"),
3012 Ok(Category::Specifications)
3013 ));
3014 assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
3015 assert!(matches!(
3016 parse_category("preferences"),
3017 Ok(Category::Preferences)
3018 ));
3019 assert!(parse_category("bogus").is_err());
3021 assert!(parse_category("").is_err());
3022 }
3023
3024 #[test]
3025 fn test_store_memory_params_fields() {
3026 let params = StoreMemoryParams {
3028 namespace_id: 1,
3029 content: "test content",
3030 category: &Category::General,
3031 memory_lane_type: None,
3032 labels: &[],
3033 metadata: &serde_json::Value::Null,
3034 embedding: None,
3035 embedding_model: None,
3036 };
3037 assert_eq!(params.namespace_id, 1);
3038 assert_eq!(params.content, "test content");
3039 assert!(params.labels.is_empty());
3040 }
3041
3042 #[test]
3043 fn test_merge_duplicate_metadata_preserves_multiple_session_keys() {
3044 let existing = serde_json::json!({
3045 "cognitive": {
3046 "session_key": "session-a"
3047 },
3048 "source": {
3049 "derived_session_key": "session-a"
3050 }
3051 });
3052 let incoming = serde_json::json!({
3053 "cognitive": {
3054 "session_key": "session-b"
3055 },
3056 "source": {
3057 "derived_session_key": "session-b"
3058 }
3059 });
3060
3061 let merged = merge_duplicate_metadata(&existing, &incoming);
3062 assert_eq!(merged["cognitive"]["session_key"], "session-b");
3063 assert_eq!(
3064 merged["cognitive"]["session_keys"],
3065 serde_json::json!(["session-a", "session-b"])
3066 );
3067 assert_eq!(
3068 merged["source"]["derived_session_keys"],
3069 serde_json::json!(["session-a", "session-b"])
3070 );
3071 }
3072
3073 async fn setup_test_db() -> SqlitePool {
3076 let pool = SqlitePoolOptions::new()
3077 .max_connections(1)
3078 .connect("sqlite::memory:")
3079 .await
3080 .unwrap();
3081 crate::migrations::run_migrations(&pool).await.unwrap();
3082 pool
3083 }
3084
3085 async fn create_namespace(pool: &SqlitePool, name: &str) -> i64 {
3086 let ns = NamespaceRepository::new(pool.clone());
3087 ns.get_or_create(name, "test").await.unwrap();
3088 ns.get_by_name(name).await.unwrap().unwrap().id
3089 }
3090
3091 #[tokio::test]
3097 async fn test_get_by_content_matches_actual_content() {
3098 let pool = setup_test_db().await;
3099 let ns_id = create_namespace(&pool, "test-agent").await;
3100 let repo = MemoryRepository::new(pool);
3101
3102 let mem_a = repo
3104 .store(StoreMemoryParams {
3105 namespace_id: ns_id,
3106 content: "first memory content",
3107 category: &Category::General,
3108 memory_lane_type: None,
3109 labels: &[],
3110 metadata: &serde_json::Value::Null,
3111 embedding: None,
3112 embedding_model: None,
3113 })
3114 .await
3115 .unwrap();
3116
3117 let mem_b = repo
3118 .store(StoreMemoryParams {
3119 namespace_id: ns_id,
3120 content: "second memory content",
3121 category: &Category::General,
3122 memory_lane_type: None,
3123 labels: &[],
3124 metadata: &serde_json::Value::Null,
3125 embedding: None,
3126 embedding_model: None,
3127 })
3128 .await
3129 .unwrap();
3130
3131 assert_ne!(mem_a.id, mem_b.id);
3132
3133 let found_a = repo
3136 .get_by_content(ns_id, "first memory content")
3137 .await
3138 .unwrap();
3139 assert_eq!(found_a.id, mem_a.id);
3140 assert_eq!(found_a.content, "first memory content");
3141
3142 let found_b = repo
3143 .get_by_content(ns_id, "second memory content")
3144 .await
3145 .unwrap();
3146 assert_eq!(found_b.id, mem_b.id);
3147 assert_eq!(found_b.content, "second memory content");
3148
3149 let result = repo.get_by_content(ns_id, "nonexistent").await;
3151 assert!(result.is_err());
3152 }
3153
3154 #[tokio::test]
3155 async fn test_enqueue_and_claim_jobs() {
3156 let pool = setup_test_db().await;
3157 let ns_id = create_namespace(&pool, "test-agent").await;
3158 let repo = MemoryRepository::new(pool);
3159
3160 let id1 = repo
3162 .enqueue_job(EnqueueJobParams {
3163 namespace_id: ns_id,
3164 job_type: "derive_memory",
3165 priority: 100,
3166 perspective: None,
3167 payload: &serde_json::json!({"memory_id": 1}),
3168 })
3169 .await
3170 .unwrap();
3171
3172 let id2 = repo
3173 .enqueue_job(EnqueueJobParams {
3174 namespace_id: ns_id,
3175 job_type: "derive_memory",
3176 priority: 50,
3177 perspective: None,
3178 payload: &serde_json::json!({"memory_id": 2}),
3179 })
3180 .await
3181 .unwrap();
3182
3183 assert!(id1 > 0);
3184 assert!(id2 > 0);
3185 assert_ne!(id1, id2);
3186
3187 let claimed = repo
3189 .claim_jobs(ns_id, "derive_memory", "worker-1", 120, 1)
3190 .await
3191 .unwrap();
3192
3193 assert_eq!(claimed.len(), 1);
3194 assert_eq!(claimed[0].row.id, id1); assert_eq!(claimed[0].row.status, "running");
3196 assert_eq!(claimed[0].payload["memory_id"], 1);
3197
3198 let claimed2 = repo
3200 .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 1)
3201 .await
3202 .unwrap();
3203
3204 assert_eq!(claimed2.len(), 1);
3205 assert_eq!(claimed2[0].row.id, id2);
3206 }
3207
3208 #[tokio::test]
3209 async fn test_complete_and_fail_job() {
3210 let pool = setup_test_db().await;
3211 let ns_id = create_namespace(&pool, "test-agent").await;
3212 let repo = MemoryRepository::new(pool);
3213
3214 let _id = repo
3215 .enqueue_job(EnqueueJobParams {
3216 namespace_id: ns_id,
3217 job_type: "digest_session",
3218 priority: 100,
3219 perspective: None,
3220 payload: &serde_json::json!({"session": "s1"}),
3221 })
3222 .await
3223 .unwrap();
3224
3225 let claimed = repo
3227 .claim_jobs(ns_id, "digest_session", "w", 60, 10)
3228 .await
3229 .unwrap();
3230 assert_eq!(claimed.len(), 1);
3231
3232 repo.complete_job(&claimed[0]).await.unwrap();
3233
3234 let claimed_again = repo
3236 .claim_jobs(ns_id, "digest_session", "w", 60, 10)
3237 .await
3238 .unwrap();
3239 assert!(claimed_again.is_empty());
3240 }
3241
3242 #[tokio::test]
3243 async fn test_fail_job_requeues_before_limit() {
3244 let pool = setup_test_db().await;
3245 let ns_id = create_namespace(&pool, "test-agent").await;
3246 let repo = MemoryRepository::new(pool);
3247
3248 let _id = repo
3249 .enqueue_job(EnqueueJobParams {
3250 namespace_id: ns_id,
3251 job_type: "derive_memory",
3252 priority: 100,
3253 perspective: None,
3254 payload: &serde_json::json!({"test": true}),
3255 })
3256 .await
3257 .unwrap();
3258
3259 let claimed = repo
3261 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
3262 .await
3263 .unwrap();
3264 repo.fail_job(&claimed[0], "transient error").await.unwrap();
3265
3266 let reclaimed = repo
3267 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
3268 .await
3269 .unwrap();
3270 assert_eq!(reclaimed.len(), 1);
3271 assert_eq!(reclaimed[0].row.attempts, 2);
3272 }
3273
3274 #[tokio::test]
3275 async fn test_complete_job_requires_matching_claim_token() {
3276 let pool = setup_test_db().await;
3277 let ns_id = create_namespace(&pool, "test-agent").await;
3278 let repo = MemoryRepository::new(pool);
3279
3280 repo.enqueue_job(EnqueueJobParams {
3281 namespace_id: ns_id,
3282 job_type: "derive_memory",
3283 priority: 100,
3284 perspective: None,
3285 payload: &serde_json::json!({"memory_id": 7}),
3286 })
3287 .await
3288 .unwrap();
3289
3290 let claimed = repo
3291 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
3292 .await
3293 .unwrap();
3294 let mut forged = claimed[0].clone();
3295 forged.row.claim_token = Some("forged-token".to_string());
3296
3297 let error = repo.complete_job(&forged).await.unwrap_err();
3298 assert!(error.to_string().contains("lost lease ownership"));
3299 }
3300
3301 #[tokio::test]
3302 async fn test_store_digest_and_latest_digest() {
3303 let pool = setup_test_db().await;
3304 let ns_id = create_namespace(&pool, "test-agent").await;
3305 let repo = MemoryRepository::new(pool);
3306
3307 let digest_memory = repo
3309 .store(StoreMemoryParams {
3310 namespace_id: ns_id,
3311 content: "session summary short",
3312 category: &Category::Session,
3313 memory_lane_type: None,
3314 labels: &[],
3315 metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
3316 embedding: None,
3317 embedding_model: None,
3318 })
3319 .await
3320 .unwrap();
3321
3322 let digest_id = repo
3323 .store_digest(StoreDigestParams {
3324 namespace_id: ns_id,
3325 session_key: "session-abc",
3326 digest_kind: "short",
3327 memory_id: digest_memory.id,
3328 start_memory_id: Some(1),
3329 end_memory_id: Some(100),
3330 token_count: 42,
3331 })
3332 .await
3333 .unwrap();
3334
3335 assert!(digest_id > 0);
3336
3337 let result = repo
3339 .latest_digest_for_session(ns_id, "session-abc", "short")
3340 .await
3341 .unwrap();
3342
3343 assert!(result.is_some());
3344 assert_eq!(result.as_ref().unwrap().id, digest_memory.id);
3345
3346 let replacement_memory = repo
3347 .store(StoreMemoryParams {
3348 namespace_id: ns_id,
3349 content: "session summary short updated",
3350 category: &Category::Session,
3351 memory_lane_type: None,
3352 labels: &[],
3353 metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
3354 embedding: None,
3355 embedding_model: None,
3356 })
3357 .await
3358 .unwrap();
3359
3360 let replacement_digest_id = repo
3361 .store_digest(StoreDigestParams {
3362 namespace_id: ns_id,
3363 session_key: "session-abc",
3364 digest_kind: "short",
3365 memory_id: replacement_memory.id,
3366 start_memory_id: Some(1),
3367 end_memory_id: Some(100),
3368 token_count: 64,
3369 })
3370 .await
3371 .unwrap();
3372
3373 assert_eq!(replacement_digest_id, digest_id);
3374
3375 let updated = repo
3376 .latest_digest_for_session(ns_id, "session-abc", "short")
3377 .await
3378 .unwrap()
3379 .unwrap();
3380 assert_eq!(updated.id, replacement_memory.id);
3381
3382 let latest_for_namespace = repo
3383 .latest_digest_for_namespace(ns_id, "short")
3384 .await
3385 .unwrap()
3386 .unwrap();
3387 assert_eq!(latest_for_namespace.id, replacement_memory.id);
3388 }
3389
3390 #[tokio::test]
3391 async fn test_session_digest_rollover_reports_new_signal_since_last_digest() {
3392 let pool = setup_test_db().await;
3393 let ns_id = create_namespace(&pool, "test-agent").await;
3394 let repo = MemoryRepository::new(pool);
3395
3396 let source = repo
3397 .store(StoreMemoryParams {
3398 namespace_id: ns_id,
3399 content: "Implemented bounded digest rollover policy.",
3400 category: &Category::Session,
3401 memory_lane_type: None,
3402 labels: &[],
3403 metadata: &serde_json::json!({
3404 "cognitive": {
3405 "level": "explicit",
3406 "observer": "claude-code",
3407 "subject": "claude-code",
3408 "session_key": "session-rollover"
3409 }
3410 }),
3411 embedding: None,
3412 embedding_model: None,
3413 })
3414 .await
3415 .unwrap();
3416
3417 let first = repo
3418 .session_digest_rollover(ns_id, "session-rollover")
3419 .await
3420 .unwrap();
3421 assert_eq!(first.last_digest_end_memory_id, None);
3422 assert_eq!(first.new_memory_count, 1);
3423 assert!(first.estimated_new_tokens > 0);
3424
3425 let digest_memory = repo
3426 .store(StoreMemoryParams {
3427 namespace_id: ns_id,
3428 content: "Short digest",
3429 category: &Category::Session,
3430 memory_lane_type: None,
3431 labels: &[],
3432 metadata: &serde_json::json!({
3433 "cognitive": {
3434 "level": "summary_short",
3435 "observer": "claude-code",
3436 "subject": "claude-code",
3437 "session_key": "session-rollover"
3438 }
3439 }),
3440 embedding: None,
3441 embedding_model: None,
3442 })
3443 .await
3444 .unwrap();
3445
3446 repo.store_digest(StoreDigestParams {
3447 namespace_id: ns_id,
3448 session_key: "session-rollover",
3449 digest_kind: "short",
3450 memory_id: digest_memory.id,
3451 start_memory_id: Some(source.id),
3452 end_memory_id: Some(source.id),
3453 token_count: 16,
3454 })
3455 .await
3456 .unwrap();
3457
3458 let covered = repo
3459 .session_digest_rollover(ns_id, "session-rollover")
3460 .await
3461 .unwrap();
3462 assert_eq!(covered.last_digest_end_memory_id, Some(source.id));
3463 assert_eq!(covered.new_memory_count, 0);
3464 assert_eq!(covered.estimated_new_tokens, 0);
3465
3466 repo.store(StoreMemoryParams {
3467 namespace_id: ns_id,
3468 content: "Added one more explicit memory after the digest coverage window.",
3469 category: &Category::Session,
3470 memory_lane_type: None,
3471 labels: &[],
3472 metadata: &serde_json::json!({
3473 "cognitive": {
3474 "level": "explicit",
3475 "observer": "claude-code",
3476 "subject": "claude-code",
3477 "session_key": "session-rollover"
3478 }
3479 }),
3480 embedding: None,
3481 embedding_model: None,
3482 })
3483 .await
3484 .unwrap();
3485
3486 let second = repo
3487 .session_digest_rollover(ns_id, "session-rollover")
3488 .await
3489 .unwrap();
3490 assert_eq!(second.last_digest_end_memory_id, Some(source.id));
3491 assert_eq!(second.new_memory_count, 1);
3492 assert!(second.estimated_new_tokens > 0);
3493 }
3494
3495 #[tokio::test]
3496 async fn test_store_with_lineage() {
3497 let pool = setup_test_db().await;
3498 let ns_id = create_namespace(&pool, "test-agent").await;
3499 let repo = MemoryRepository::new(pool);
3500
3501 let source1 = repo
3503 .store(StoreMemoryParams {
3504 namespace_id: ns_id,
3505 content: "raw observation one",
3506 category: &Category::Facts,
3507 memory_lane_type: None,
3508 labels: &[],
3509 metadata: &serde_json::Value::Null,
3510 embedding: None,
3511 embedding_model: None,
3512 })
3513 .await
3514 .unwrap();
3515
3516 let source2 = repo
3517 .store(StoreMemoryParams {
3518 namespace_id: ns_id,
3519 content: "raw observation two",
3520 category: &Category::Facts,
3521 memory_lane_type: None,
3522 labels: &[],
3523 metadata: &serde_json::Value::Null,
3524 embedding: None,
3525 embedding_model: None,
3526 })
3527 .await
3528 .unwrap();
3529
3530 let derived = repo
3532 .store_with_lineage(StoreMemoryWithLineageParams {
3533 store: StoreMemoryParams {
3534 namespace_id: ns_id,
3535 content: "derived insight",
3536 category: &Category::Facts,
3537 memory_lane_type: None,
3538 labels: &[],
3539 metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
3540 embedding: None,
3541 embedding_model: None,
3542 },
3543 source_memory_ids: &[source1.id, source2.id],
3544 evidence_role: "derived_from",
3545 })
3546 .await
3547 .unwrap();
3548
3549 assert_eq!(derived.content, "derived insight");
3550
3551 let lineage = repo.load_lineage(derived.id).await.unwrap();
3553 assert_eq!(lineage.len(), 2);
3554 assert!(lineage.iter().any(|e| e.source_memory_id == source1.id));
3555 assert!(lineage.iter().any(|e| e.source_memory_id == source2.id));
3556 }
3557
3558 #[tokio::test]
3559 async fn test_cognitive_queries_by_level_and_perspective() {
3560 let pool = setup_test_db().await;
3561 let ns_id = create_namespace(&pool, "test-agent").await;
3562 let repo = MemoryRepository::new(pool);
3563 let perspective =
3564 PerspectiveKey::new("claude-code", "claude-code", Some("session-1".into()));
3565
3566 let _raw = repo
3567 .store(StoreMemoryParams {
3568 namespace_id: ns_id,
3569 content: "raw note",
3570 category: &Category::Session,
3571 memory_lane_type: None,
3572 labels: &[],
3573 metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
3574 embedding: None,
3575 embedding_model: None,
3576 })
3577 .await
3578 .unwrap();
3579
3580 let explicit = repo
3581 .store(StoreMemoryParams {
3582 namespace_id: ns_id,
3583 content: "explicit note",
3584 category: &Category::Session,
3585 memory_lane_type: None,
3586 labels: &[],
3587 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
3588 embedding: None,
3589 embedding_model: None,
3590 })
3591 .await
3592 .unwrap();
3593
3594 let derived = repo
3595 .store(StoreMemoryParams {
3596 namespace_id: ns_id,
3597 content: "reinforced insight",
3598 category: &Category::Facts,
3599 memory_lane_type: None,
3600 labels: &[],
3601 metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 7, 0),
3602 embedding: None,
3603 embedding_model: None,
3604 })
3605 .await
3606 .unwrap();
3607
3608 let contradiction = repo
3609 .store(StoreMemoryParams {
3610 namespace_id: ns_id,
3611 content: "contradiction note",
3612 category: &Category::Facts,
3613 memory_lane_type: None,
3614 labels: &[],
3615 metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 1, 5),
3616 embedding: None,
3617 embedding_model: None,
3618 })
3619 .await
3620 .unwrap();
3621
3622 let explicit_rows = repo
3623 .get_by_cognitive_level(ns_id, CognitiveLevel::Explicit, 10)
3624 .await
3625 .unwrap();
3626 assert_eq!(explicit_rows.len(), 1);
3627 assert_eq!(explicit_rows[0].id, explicit.id);
3628
3629 let recent = repo
3630 .get_recent_by_perspective(ns_id, &perspective, 10)
3631 .await
3632 .unwrap();
3633 assert_eq!(recent.len(), 4);
3634
3635 let reinforced = repo
3636 .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
3637 .await
3638 .unwrap();
3639 assert_eq!(reinforced[0].id, derived.id);
3640 assert!(reinforced
3641 .iter()
3642 .all(|memory| memory.id != contradiction.id));
3643
3644 let contradictions = repo
3645 .get_contradictions_by_perspective(ns_id, &perspective, 10)
3646 .await
3647 .unwrap();
3648 assert_eq!(contradictions.len(), 1);
3649 assert_eq!(contradictions[0].id, contradiction.id);
3650 }
3651
3652 #[tokio::test]
3653 async fn test_store_distilled_summary_archives_sources_and_records_lineage() {
3654 let pool = setup_test_db().await;
3655 let ns_id = create_namespace(&pool, "test-agent").await;
3656 let repo = MemoryRepository::new(pool);
3657
3658 let source1 = repo
3659 .store(StoreMemoryParams {
3660 namespace_id: ns_id,
3661 content: "raw event 1",
3662 category: &Category::Session,
3663 memory_lane_type: None,
3664 labels: &["raw-activity".to_string()],
3665 metadata: &serde_json::json!({"raw_activity": true}),
3666 embedding: None,
3667 embedding_model: None,
3668 })
3669 .await
3670 .unwrap();
3671
3672 let source2 = repo
3673 .store(StoreMemoryParams {
3674 namespace_id: ns_id,
3675 content: "raw event 2",
3676 category: &Category::Session,
3677 memory_lane_type: None,
3678 labels: &["raw-activity".to_string()],
3679 metadata: &serde_json::json!({"raw_activity": true}),
3680 embedding: None,
3681 embedding_model: None,
3682 })
3683 .await
3684 .unwrap();
3685
3686 let summary = repo
3687 .store_distilled_summary(
3688 StoreMemoryParams {
3689 namespace_id: ns_id,
3690 content: "distilled summary",
3691 category: &Category::Session,
3692 memory_lane_type: None,
3693 labels: &["activity-summary".to_string()],
3694 metadata: &serde_json::json!({"pipeline": "distill-v1"}),
3695 embedding: None,
3696 embedding_model: None,
3697 },
3698 &[source1.id, source2.id],
3699 )
3700 .await
3701 .unwrap();
3702
3703 let source1_after = repo.get_by_id(source1.id).await.unwrap().unwrap();
3704 let source2_after = repo.get_by_id(source2.id).await.unwrap().unwrap();
3705 assert!(!source1_after.is_active);
3706 assert!(source1_after.is_archived);
3707 assert!(!source2_after.is_active);
3708 assert!(source2_after.is_archived);
3709
3710 let lineage = repo.load_lineage(summary.id).await.unwrap();
3711 assert_eq!(lineage.len(), 2);
3712 assert!(lineage.iter().all(|entry| entry.evidence_role == "source"));
3713 }
3714
3715 #[tokio::test]
3716 async fn test_load_lineage_empty() {
3717 let pool = setup_test_db().await;
3718 let _ns_id = create_namespace(&pool, "test-agent").await;
3719 let repo = MemoryRepository::new(pool);
3720
3721 let lineage = repo.load_lineage(9999).await.unwrap();
3722 assert!(lineage.is_empty());
3723 }
3724
3725 #[tokio::test]
3728 async fn test_recent_perspective_excludes_raw_noise() {
3729 let pool = setup_test_db().await;
3730 let ns_id = create_namespace(&pool, "test-agent").await;
3731 let repo = MemoryRepository::new(pool);
3732 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3733
3734 repo.store(StoreMemoryParams {
3736 namespace_id: ns_id,
3737 content: "clean observation",
3738 category: &Category::Facts,
3739 memory_lane_type: None,
3740 labels: &[],
3741 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
3742 embedding: None,
3743 embedding_model: None,
3744 })
3745 .await
3746 .unwrap();
3747
3748 repo.store(StoreMemoryParams {
3750 namespace_id: ns_id,
3751 content: "raw noise payload",
3752 category: &Category::Session,
3753 memory_lane_type: None,
3754 labels: &["raw-activity".to_string()],
3755 metadata: &serde_json::json!({
3756 "raw_activity": true,
3757 "cognitive": {
3758 "level": "raw",
3759 "observer": perspective.observer,
3760 "subject": perspective.subject,
3761 "session_key": perspective.session_key,
3762 "source_memory_ids": [],
3763 "confidence": 0.5,
3764 "times_reinforced": 0,
3765 "times_contradicted": 0,
3766 "generated_by": "test"
3767 }
3768 }),
3769 embedding: None,
3770 embedding_model: None,
3771 })
3772 .await
3773 .unwrap();
3774
3775 let recent = repo
3777 .get_recent_by_perspective(ns_id, &perspective, 10)
3778 .await
3779 .unwrap();
3780 assert_eq!(recent.len(), 1);
3781 assert_eq!(recent[0].content, "clean observation");
3782
3783 let recent_all = repo
3785 .get_recent_by_perspective_opts(ns_id, &perspective, 10, true)
3786 .await
3787 .unwrap();
3788 assert_eq!(recent_all.len(), 2);
3789 }
3790
3791 #[tokio::test]
3792 async fn test_semantic_candidates_respect_perspective_and_raw_noise_filtering() {
3793 let pool = setup_test_db().await;
3794 let ns_id = create_namespace(&pool, "test-agent").await;
3795 let repo = MemoryRepository::new(pool);
3796 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3797
3798 repo.store(StoreMemoryParams {
3799 namespace_id: ns_id,
3800 content: "clean semantic observation",
3801 category: &Category::Facts,
3802 memory_lane_type: None,
3803 labels: &[],
3804 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
3805 embedding: Some(&[0.1_f32; 384]),
3806 embedding_model: Some("mock"),
3807 })
3808 .await
3809 .unwrap();
3810
3811 repo.store(StoreMemoryParams {
3812 namespace_id: ns_id,
3813 content: "raw semantic noise",
3814 category: &Category::Session,
3815 memory_lane_type: None,
3816 labels: &["raw-activity".to_string()],
3817 metadata: &serde_json::json!({
3818 "raw_activity": true,
3819 "cognitive": {
3820 "level": "raw",
3821 "observer": "claude-code",
3822 "subject": "claude-code",
3823 "session_key": "s1",
3824 "generated_by": "test"
3825 }
3826 }),
3827 embedding: Some(&[0.2_f32; 384]),
3828 embedding_model: Some("mock"),
3829 })
3830 .await
3831 .unwrap();
3832
3833 repo.store(StoreMemoryParams {
3834 namespace_id: ns_id,
3835 content: "other perspective semantic",
3836 category: &Category::Facts,
3837 memory_lane_type: None,
3838 labels: &[],
3839 metadata: &serde_json::json!({
3840 "cognitive": {
3841 "level": "explicit",
3842 "observer": "codex",
3843 "subject": "codex",
3844 "session_key": "s1",
3845 "generated_by": "test"
3846 }
3847 }),
3848 embedding: Some(&[0.3_f32; 384]),
3849 embedding_model: Some("mock"),
3850 })
3851 .await
3852 .unwrap();
3853
3854 let candidates = repo
3855 .get_semantic_candidates(SemanticCandidateParams {
3856 namespace_id: ns_id,
3857 perspective: Some(&perspective),
3858 limit: 10,
3859 include_raw: false,
3860 })
3861 .await
3862 .unwrap();
3863
3864 assert_eq!(candidates.len(), 1);
3865 assert_eq!(candidates[0].content, "clean semantic observation");
3866 }
3867
3868 #[tokio::test]
3869 async fn test_semantic_candidates_match_session_keys_array() {
3870 let pool = setup_test_db().await;
3871 let ns_id = create_namespace(&pool, "test-agent").await;
3872 let repo = MemoryRepository::new(pool);
3873 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s-array".into()));
3874
3875 repo.store(StoreMemoryParams {
3876 namespace_id: ns_id,
3877 content: "session array semantic observation",
3878 category: &Category::Facts,
3879 memory_lane_type: None,
3880 labels: &[],
3881 metadata: &serde_json::json!({
3882 "cognitive": {
3883 "level": "explicit",
3884 "observer": "claude-code",
3885 "subject": "claude-code",
3886 "session_keys": ["s-array", "s-other"],
3887 "generated_by": "test"
3888 }
3889 }),
3890 embedding: Some(&[0.4_f32; 384]),
3891 embedding_model: Some("mock"),
3892 })
3893 .await
3894 .unwrap();
3895
3896 let candidates = repo
3897 .get_semantic_candidates(SemanticCandidateParams {
3898 namespace_id: ns_id,
3899 perspective: Some(&perspective),
3900 limit: 10,
3901 include_raw: false,
3902 })
3903 .await
3904 .unwrap();
3905
3906 assert_eq!(candidates.len(), 1);
3907 assert_eq!(candidates[0].content, "session array semantic observation");
3908 }
3909
3910 #[tokio::test]
3911 async fn test_reinforced_perspective_excludes_raw_noise() {
3912 let pool = setup_test_db().await;
3913 let ns_id = create_namespace(&pool, "test-agent").await;
3914 let repo = MemoryRepository::new(pool);
3915 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3916
3917 repo.store(StoreMemoryParams {
3918 namespace_id: ns_id,
3919 content: "reinforced insight",
3920 category: &Category::Facts,
3921 memory_lane_type: None,
3922 labels: &[],
3923 metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 5, 0),
3924 embedding: None,
3925 embedding_model: None,
3926 })
3927 .await
3928 .unwrap();
3929
3930 repo.store(StoreMemoryParams {
3931 namespace_id: ns_id,
3932 content: "raw noise",
3933 category: &Category::Session,
3934 memory_lane_type: None,
3935 labels: &["raw-activity".to_string()],
3936 metadata: &serde_json::json!({
3937 "raw_activity": true,
3938 "cognitive": {
3939 "level": "raw",
3940 "observer": perspective.observer,
3941 "subject": perspective.subject,
3942 "session_key": perspective.session_key,
3943 "source_memory_ids": [],
3944 "confidence": 0.5,
3945 "times_reinforced": 0,
3946 "times_contradicted": 0,
3947 "generated_by": "test"
3948 }
3949 }),
3950 embedding: None,
3951 embedding_model: None,
3952 })
3953 .await
3954 .unwrap();
3955
3956 let reinforced = repo
3957 .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
3958 .await
3959 .unwrap();
3960 assert_eq!(reinforced.len(), 1);
3961 assert_eq!(reinforced[0].content, "reinforced insight");
3962 }
3963
3964 #[tokio::test]
3965 async fn test_contradictions_perspective_excludes_raw_noise() {
3966 let pool = setup_test_db().await;
3967 let ns_id = create_namespace(&pool, "test-agent").await;
3968 let repo = MemoryRepository::new(pool);
3969 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3970
3971 repo.store(StoreMemoryParams {
3972 namespace_id: ns_id,
3973 content: "a real contradiction",
3974 category: &Category::Facts,
3975 memory_lane_type: None,
3976 labels: &[],
3977 metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 3),
3978 embedding: None,
3979 embedding_model: None,
3980 })
3981 .await
3982 .unwrap();
3983
3984 repo.store(StoreMemoryParams {
3985 namespace_id: ns_id,
3986 content: "raw noise",
3987 category: &Category::Session,
3988 memory_lane_type: None,
3989 labels: &["raw-activity".to_string()],
3990 metadata: &serde_json::json!({
3991 "raw_activity": true,
3992 "cognitive": {
3993 "level": "raw",
3994 "observer": perspective.observer,
3995 "subject": perspective.subject,
3996 "session_key": perspective.session_key,
3997 "source_memory_ids": [],
3998 "confidence": 0.5,
3999 "times_reinforced": 0,
4000 "times_contradicted": 0,
4001 "generated_by": "test"
4002 }
4003 }),
4004 embedding: None,
4005 embedding_model: None,
4006 })
4007 .await
4008 .unwrap();
4009
4010 let contradictions = repo
4011 .get_contradictions_by_perspective(ns_id, &perspective, 10)
4012 .await
4013 .unwrap();
4014 assert_eq!(contradictions.len(), 1);
4015 assert_eq!(contradictions[0].content, "a real contradiction");
4016 }
4017
4018 #[tokio::test]
4021 async fn test_search_working_set_basic() {
4022 let pool = setup_test_db().await;
4023 let ns_id = create_namespace(&pool, "test-agent").await;
4024 let repo = MemoryRepository::new(pool);
4025 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4026
4027 let _raw = repo
4029 .store(StoreMemoryParams {
4030 namespace_id: ns_id,
4031 content: "raw note",
4032 category: &Category::Session,
4033 memory_lane_type: None,
4034 labels: &[],
4035 metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
4036 embedding: None,
4037 embedding_model: None,
4038 })
4039 .await
4040 .unwrap();
4041
4042 let explicit = repo
4043 .store(StoreMemoryParams {
4044 namespace_id: ns_id,
4045 content: "explicit fact",
4046 category: &Category::Facts,
4047 memory_lane_type: None,
4048 labels: &[],
4049 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 3, 0),
4050 embedding: None,
4051 embedding_model: None,
4052 })
4053 .await
4054 .unwrap();
4055
4056 let derived = repo
4057 .store(StoreMemoryParams {
4058 namespace_id: ns_id,
4059 content: "derived insight",
4060 category: &Category::Facts,
4061 memory_lane_type: None,
4062 labels: &[],
4063 metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 8, 0),
4064 embedding: None,
4065 embedding_model: None,
4066 })
4067 .await
4068 .unwrap();
4069
4070 let contradiction = repo
4071 .store(StoreMemoryParams {
4072 namespace_id: ns_id,
4073 content: "contradiction",
4074 category: &Category::Facts,
4075 memory_lane_type: None,
4076 labels: &[],
4077 metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 2),
4078 embedding: None,
4079 embedding_model: None,
4080 })
4081 .await
4082 .unwrap();
4083
4084 let result = repo
4085 .search_working_set(WorkingSetParams {
4086 namespace_id: ns_id,
4087 perspective: Some(&perspective),
4088 max_items: 20,
4089 include_raw: false,
4090 })
4091 .await
4092 .unwrap();
4093
4094 assert!(result.len() >= 3);
4097 let ids: Vec<i64> = result.iter().map(|m| m.id).collect();
4098 assert!(ids.contains(&explicit.id));
4099 assert!(ids.contains(&derived.id));
4100 assert!(ids.contains(&contradiction.id));
4101 }
4102
4103 #[tokio::test]
4104 async fn test_search_working_set_dedupes() {
4105 let pool = setup_test_db().await;
4106 let ns_id = create_namespace(&pool, "test-agent").await;
4107 let repo = MemoryRepository::new(pool);
4108 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4109
4110 let shared = repo
4112 .store(StoreMemoryParams {
4113 namespace_id: ns_id,
4114 content: "shared memory",
4115 category: &Category::Facts,
4116 memory_lane_type: None,
4117 labels: &[],
4118 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 10, 0),
4119 embedding: None,
4120 embedding_model: None,
4121 })
4122 .await
4123 .unwrap();
4124
4125 let result = repo
4126 .search_working_set(WorkingSetParams {
4127 namespace_id: ns_id,
4128 perspective: Some(&perspective),
4129 max_items: 20,
4130 include_raw: false,
4131 })
4132 .await
4133 .unwrap();
4134
4135 let count = result.iter().filter(|m| m.id == shared.id).count();
4136 assert_eq!(count, 1, "shared memory should appear exactly once");
4137 }
4138
4139 #[tokio::test]
4140 async fn test_search_working_set_respects_max_items() {
4141 let pool = setup_test_db().await;
4142 let ns_id = create_namespace(&pool, "test-agent").await;
4143 let repo = MemoryRepository::new(pool);
4144 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4145
4146 for i in 0..10 {
4147 let content = format!("memory {}", i);
4148 repo.store(StoreMemoryParams {
4149 namespace_id: ns_id,
4150 content: &content,
4151 category: &Category::Facts,
4152 memory_lane_type: None,
4153 labels: &[],
4154 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, i as i64, 0),
4155 embedding: None,
4156 embedding_model: None,
4157 })
4158 .await
4159 .unwrap();
4160 }
4161
4162 let result = repo
4163 .search_working_set(WorkingSetParams {
4164 namespace_id: ns_id,
4165 perspective: Some(&perspective),
4166 max_items: 3,
4167 include_raw: false,
4168 })
4169 .await
4170 .unwrap();
4171
4172 assert_eq!(result.len(), 3);
4173 }
4174
4175 #[tokio::test]
4176 async fn test_search_working_set_excludes_raw_noise() {
4177 let pool = setup_test_db().await;
4178 let ns_id = create_namespace(&pool, "test-agent").await;
4179 let repo = MemoryRepository::new(pool);
4180 let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4181
4182 repo.store(StoreMemoryParams {
4183 namespace_id: ns_id,
4184 content: "real observation",
4185 category: &Category::Facts,
4186 memory_lane_type: None,
4187 labels: &[],
4188 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
4189 embedding: None,
4190 embedding_model: None,
4191 })
4192 .await
4193 .unwrap();
4194
4195 repo.store(StoreMemoryParams {
4196 namespace_id: ns_id,
4197 content: "raw noise",
4198 category: &Category::Session,
4199 memory_lane_type: None,
4200 labels: &["raw-activity".to_string()],
4201 metadata: &serde_json::json!({"raw_activity": true, "cognitive": {"level": "raw"}}),
4202 embedding: None,
4203 embedding_model: None,
4204 })
4205 .await
4206 .unwrap();
4207
4208 let result = repo
4209 .search_working_set(WorkingSetParams {
4210 namespace_id: ns_id,
4211 perspective: Some(&perspective),
4212 max_items: 20,
4213 include_raw: false,
4214 })
4215 .await
4216 .unwrap();
4217
4218 assert!(result.iter().all(|m| m.content != "raw noise"));
4219 assert!(result.iter().any(|m| m.content == "real observation"));
4220 }
4221
4222 #[tokio::test]
4223 async fn test_search_working_set_without_perspective() {
4224 let pool = setup_test_db().await;
4225 let ns_id = create_namespace(&pool, "test-agent").await;
4226 let repo = MemoryRepository::new(pool);
4227
4228 repo.store(StoreMemoryParams {
4229 namespace_id: ns_id,
4230 content: "namespace memory one",
4231 category: &Category::Facts,
4232 memory_lane_type: None,
4233 labels: &[],
4234 metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4235 embedding: None,
4236 embedding_model: None,
4237 })
4238 .await
4239 .unwrap();
4240
4241 repo.store(StoreMemoryParams {
4242 namespace_id: ns_id,
4243 content: "namespace memory two",
4244 category: &Category::Facts,
4245 memory_lane_type: None,
4246 labels: &[],
4247 metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4248 embedding: None,
4249 embedding_model: None,
4250 })
4251 .await
4252 .unwrap();
4253
4254 let result = repo
4255 .search_working_set(WorkingSetParams {
4256 namespace_id: ns_id,
4257 perspective: None,
4258 max_items: 20,
4259 include_raw: false,
4260 })
4261 .await
4262 .unwrap();
4263
4264 assert!(result.len() >= 2);
4265 }
4266
4267 #[tokio::test]
4268 async fn test_list_by_session_key_matches_session_keys_array() {
4269 let pool = setup_test_db().await;
4270 let ns_id = create_namespace(&pool, "test-agent").await;
4271 let repo = MemoryRepository::new(pool);
4272
4273 repo.store(StoreMemoryParams {
4274 namespace_id: ns_id,
4275 content: "shared explicit memory",
4276 category: &Category::Facts,
4277 memory_lane_type: None,
4278 labels: &[],
4279 metadata: &serde_json::json!({
4280 "cognitive": {
4281 "level": "explicit",
4282 "session_key": "session-b",
4283 "session_keys": ["session-a", "session-b"]
4284 }
4285 }),
4286 embedding: None,
4287 embedding_model: None,
4288 })
4289 .await
4290 .unwrap();
4291
4292 let session_a = repo
4293 .list_by_session_key(ns_id, "session-a", 10, false)
4294 .await
4295 .unwrap();
4296 let session_b = repo
4297 .list_by_session_key(ns_id, "session-b", 10, false)
4298 .await
4299 .unwrap();
4300
4301 assert_eq!(session_a.len(), 1);
4302 assert_eq!(session_b.len(), 1);
4303 }
4304
4305 #[tokio::test]
4306 async fn test_count_evidence_returns_zero_for_empty_namespace() {
4307 let pool = setup_test_db().await;
4308 let ns_id = create_namespace(&pool, "test-agent").await;
4309 let repo = MemoryRepository::new(pool);
4310
4311 let count = repo.count_evidence(ns_id).await.unwrap();
4312 assert_eq!(count, 0);
4313 }
4314
4315 #[tokio::test]
4316 async fn test_count_evidence_counts_lineage_edges() {
4317 let pool = setup_test_db().await;
4318 let ns_id = create_namespace(&pool, "test-agent").await;
4319 let repo = MemoryRepository::new(pool);
4320
4321 let source = repo
4322 .store(StoreMemoryParams {
4323 namespace_id: ns_id,
4324 content: "source memory",
4325 category: &Category::Session,
4326 memory_lane_type: None,
4327 labels: &[],
4328 metadata: &serde_json::json!({}),
4329 embedding: None,
4330 embedding_model: None,
4331 })
4332 .await
4333 .unwrap();
4334
4335 let _derived = repo
4336 .store_with_lineage(StoreMemoryWithLineageParams {
4337 store: StoreMemoryParams {
4338 namespace_id: ns_id,
4339 content: "derived with evidence",
4340 category: &Category::Facts,
4341 memory_lane_type: None,
4342 labels: &[],
4343 metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
4344 embedding: None,
4345 embedding_model: None,
4346 },
4347 source_memory_ids: &[source.id],
4348 evidence_role: "source",
4349 })
4350 .await
4351 .unwrap();
4352
4353 let count = repo.count_evidence(ns_id).await.unwrap();
4354 assert_eq!(count, 1);
4355 }
4356
4357 #[tokio::test]
4358 async fn test_count_evidence_does_not_count_other_namespace() {
4359 let pool = setup_test_db().await;
4360 let ns_a = create_namespace(&pool, "agent-a").await;
4361 let ns_b = create_namespace(&pool, "agent-b").await;
4362 let repo = MemoryRepository::new(pool);
4363
4364 let source = repo
4365 .store(StoreMemoryParams {
4366 namespace_id: ns_a,
4367 content: "source in ns-a",
4368 category: &Category::Session,
4369 memory_lane_type: None,
4370 labels: &[],
4371 metadata: &serde_json::json!({}),
4372 embedding: None,
4373 embedding_model: None,
4374 })
4375 .await
4376 .unwrap();
4377
4378 let _derived = repo
4379 .store_with_lineage(StoreMemoryWithLineageParams {
4380 store: StoreMemoryParams {
4381 namespace_id: ns_a,
4382 content: "derived in ns-a",
4383 category: &Category::Facts,
4384 memory_lane_type: None,
4385 labels: &[],
4386 metadata: &serde_json::json!({}),
4387 embedding: None,
4388 embedding_model: None,
4389 },
4390 source_memory_ids: &[source.id],
4391 evidence_role: "source",
4392 })
4393 .await
4394 .unwrap();
4395
4396 assert_eq!(repo.count_evidence(ns_a).await.unwrap(), 1);
4397 assert_eq!(repo.count_evidence(ns_b).await.unwrap(), 0);
4398 }
4399
4400 #[tokio::test]
4401 async fn test_count_by_cognitive_level_returns_matching_total() {
4402 let pool = setup_test_db().await;
4403 let ns_id = create_namespace(&pool, "level-counts").await;
4404 let repo = MemoryRepository::new(pool);
4405
4406 for (content, level) in [
4407 ("raw event", CognitiveLevel::Raw),
4408 ("derived insight", CognitiveLevel::Derived),
4409 ("derived insight 2", CognitiveLevel::Derived),
4410 ("contradiction note", CognitiveLevel::Contradiction),
4411 ] {
4412 repo.store(StoreMemoryParams {
4413 namespace_id: ns_id,
4414 content,
4415 category: &Category::Session,
4416 memory_lane_type: None,
4417 labels: &[],
4418 metadata: &serde_json::json!({
4419 "cognitive": {
4420 "level": level.as_str(),
4421 "observer": "claude-code",
4422 "subject": "claude-code",
4423 "generated_by": "test"
4424 }
4425 }),
4426 embedding: None,
4427 embedding_model: None,
4428 })
4429 .await
4430 .unwrap();
4431 }
4432
4433 assert_eq!(
4434 repo.count_by_cognitive_level(ns_id, CognitiveLevel::Derived)
4435 .await
4436 .unwrap(),
4437 2
4438 );
4439 assert_eq!(
4440 repo.count_by_cognitive_level(ns_id, CognitiveLevel::Contradiction)
4441 .await
4442 .unwrap(),
4443 1
4444 );
4445 }
4446
4447 #[tokio::test]
4451 async fn test_get_by_cognitive_level_with_perspective_filters_before_limit() {
4452 let pool = setup_test_db().await;
4453 let ns_id = create_namespace(&pool, "perspective-limit").await;
4454 let repo = MemoryRepository::new(pool);
4455
4456 let perspective_a = PerspectiveKey::new("alice", "project-x", None);
4457 let perspective_b = PerspectiveKey::new("bob", "project-y", None);
4458
4459 for i in 0..5 {
4461 repo.store(StoreMemoryParams {
4462 namespace_id: ns_id,
4463 content: &format!("alice memory {}", i),
4464 category: &Category::Facts,
4465 memory_lane_type: None,
4466 labels: &[],
4467 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_a, 0, 0),
4468 embedding: None,
4469 embedding_model: None,
4470 })
4471 .await
4472 .unwrap();
4473 }
4474
4475 for i in 0..5 {
4477 repo.store(StoreMemoryParams {
4478 namespace_id: ns_id,
4479 content: &format!("bob memory {}", i),
4480 category: &Category::Facts,
4481 memory_lane_type: None,
4482 labels: &[],
4483 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_b, 0, 0),
4484 embedding: None,
4485 embedding_model: None,
4486 })
4487 .await
4488 .unwrap();
4489 }
4490
4491 let alice_results = repo
4493 .get_by_cognitive_level_with_perspective(
4494 ns_id,
4495 CognitiveLevel::Explicit,
4496 &perspective_a,
4497 3,
4498 )
4499 .await
4500 .unwrap();
4501 assert_eq!(alice_results.len(), 3);
4502 assert!(alice_results.iter().all(|m| {
4503 let meta = &m.metadata;
4504 let obs = meta
4505 .get("cognitive")
4506 .and_then(|c| c.get("observer"))
4507 .and_then(|v| v.as_str());
4508 let sub = meta
4509 .get("cognitive")
4510 .and_then(|c| c.get("subject"))
4511 .and_then(|v| v.as_str());
4512 obs == Some("alice") && sub == Some("project-x")
4513 }));
4514
4515 let alice_many = repo
4517 .get_by_cognitive_level_with_perspective(
4518 ns_id,
4519 CognitiveLevel::Explicit,
4520 &perspective_a,
4521 10,
4522 )
4523 .await
4524 .unwrap();
4525 assert_eq!(alice_many.len(), 5);
4526
4527 let bob_results = repo
4529 .get_by_cognitive_level_with_perspective(
4530 ns_id,
4531 CognitiveLevel::Explicit,
4532 &perspective_b,
4533 3,
4534 )
4535 .await
4536 .unwrap();
4537 assert_eq!(bob_results.len(), 3);
4538 assert!(bob_results.iter().all(|m| {
4539 let meta = &m.metadata;
4540 let obs = meta
4541 .get("cognitive")
4542 .and_then(|c| c.get("observer"))
4543 .and_then(|v| v.as_str());
4544 let sub = meta
4545 .get("cognitive")
4546 .and_then(|c| c.get("subject"))
4547 .and_then(|v| v.as_str());
4548 obs == Some("bob") && sub == Some("project-y")
4549 }));
4550 }
4551
4552 #[tokio::test]
4554 async fn test_get_by_cognitive_level_with_perspective_respects_session_key() {
4555 let pool = setup_test_db().await;
4556 let ns_id = create_namespace(&pool, "session-key-scalar").await;
4557 let repo = MemoryRepository::new(pool);
4558
4559 let perspective_s1 =
4560 PerspectiveKey::new("alice", "project-x", Some("session-1".to_string()));
4561 let perspective_s2 =
4562 PerspectiveKey::new("alice", "project-x", Some("session-2".to_string()));
4563
4564 for i in 0..3 {
4565 repo.store(StoreMemoryParams {
4566 namespace_id: ns_id,
4567 content: &format!("s1 memory {}", i),
4568 category: &Category::Facts,
4569 memory_lane_type: None,
4570 labels: &[],
4571 metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s1, 0, 0),
4572 embedding: None,
4573 embedding_model: None,
4574 })
4575 .await
4576 .unwrap();
4577 }
4578 for i in 0..3 {
4579 repo.store(StoreMemoryParams {
4580 namespace_id: ns_id,
4581 content: &format!("s2 memory {}", i),
4582 category: &Category::Facts,
4583 memory_lane_type: None,
4584 labels: &[],
4585 metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s2, 0, 0),
4586 embedding: None,
4587 embedding_model: None,
4588 })
4589 .await
4590 .unwrap();
4591 }
4592
4593 let s1_results = repo
4594 .get_by_cognitive_level_with_perspective(
4595 ns_id,
4596 CognitiveLevel::Derived,
4597 &perspective_s1,
4598 10,
4599 )
4600 .await
4601 .unwrap();
4602 assert_eq!(s1_results.len(), 3);
4603 assert!(s1_results.iter().all(|m| m.content.starts_with("s1")));
4604
4605 let s2_results = repo
4606 .get_by_cognitive_level_with_perspective(
4607 ns_id,
4608 CognitiveLevel::Derived,
4609 &perspective_s2,
4610 10,
4611 )
4612 .await
4613 .unwrap();
4614 assert_eq!(s2_results.len(), 3);
4615 assert!(s2_results.iter().all(|m| m.content.starts_with("s2")));
4616 }
4617
4618 #[tokio::test]
4620 async fn test_get_by_cognitive_level_with_perspective_matches_session_keys_array() {
4621 let pool = setup_test_db().await;
4622 let ns_id = create_namespace(&pool, "session-keys-array").await;
4623 let repo = MemoryRepository::new(pool);
4624
4625 let perspective = PerspectiveKey::new("alice", "project-x", Some("session-a".to_string()));
4626
4627 repo.store(StoreMemoryParams {
4629 namespace_id: ns_id,
4630 content: "scalar match",
4631 category: &Category::Facts,
4632 memory_lane_type: None,
4633 labels: &[],
4634 metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
4635 embedding: None,
4636 embedding_model: None,
4637 })
4638 .await
4639 .unwrap();
4640
4641 repo.store(StoreMemoryParams {
4643 namespace_id: ns_id,
4644 content: "array match",
4645 category: &Category::Facts,
4646 memory_lane_type: None,
4647 labels: &[],
4648 metadata: &serde_json::json!({
4649 "cognitive": {
4650 "level": "explicit",
4651 "observer": "alice",
4652 "subject": "project-x",
4653 "session_key": "session-other",
4654 "session_keys": ["session-a", "session-b"],
4655 "generated_by": "test"
4656 }
4657 }),
4658 embedding: None,
4659 embedding_model: None,
4660 })
4661 .await
4662 .unwrap();
4663
4664 repo.store(StoreMemoryParams {
4666 namespace_id: ns_id,
4667 content: "no match",
4668 category: &Category::Facts,
4669 memory_lane_type: None,
4670 labels: &[],
4671 metadata: &serde_json::json!({
4672 "cognitive": {
4673 "level": "explicit",
4674 "observer": "alice",
4675 "subject": "project-x",
4676 "session_key": "session-other",
4677 "session_keys": ["session-z"],
4678 "generated_by": "test"
4679 }
4680 }),
4681 embedding: None,
4682 embedding_model: None,
4683 })
4684 .await
4685 .unwrap();
4686
4687 let results = repo
4688 .get_by_cognitive_level_with_perspective(
4689 ns_id,
4690 CognitiveLevel::Explicit,
4691 &perspective,
4692 10,
4693 )
4694 .await
4695 .unwrap();
4696 assert_eq!(results.len(), 2);
4697 let contents: Vec<_> = results.iter().map(|m| m.content.as_str()).collect();
4698 assert!(contents.contains(&"scalar match"));
4699 assert!(contents.contains(&"array match"));
4700 }
4701
4702 #[tokio::test]
4703 async fn test_record_metric_and_latest_metrics_for_namespace() {
4704 let pool = setup_test_db().await;
4705 let ns_id = create_namespace(&pool, "metric-ns").await;
4706 let other_ns = create_namespace(&pool, "metric-other").await;
4707 let repo = MemoryRepository::new(pool);
4708
4709 repo.record_metric(
4710 "cognition.query.total_ms",
4711 12.5,
4712 &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4713 )
4714 .await
4715 .unwrap();
4716 repo.record_metric(
4717 "cognition.query.total_ms",
4718 18.0,
4719 &serde_json::json!({"namespace_id": other_ns, "stage": "total", "unit": "ms"}),
4720 )
4721 .await
4722 .unwrap();
4723 repo.record_metric(
4724 "cognition.representation.total_ms",
4725 4.0,
4726 &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4727 )
4728 .await
4729 .unwrap();
4730
4731 let metrics = repo
4732 .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4733 .await
4734 .unwrap();
4735
4736 assert_eq!(metrics.len(), 2);
4737 assert!(metrics
4738 .iter()
4739 .all(|metric| metric.labels.contains(&ns_id.to_string())));
4740 assert!(metrics
4741 .iter()
4742 .any(|metric| metric.metric_name == "cognition.query.total_ms"));
4743 assert!(metrics
4744 .iter()
4745 .any(|metric| metric.metric_name == "cognition.representation.total_ms"));
4746 assert!(metrics
4747 .iter()
4748 .all(|metric| { metric.metric_name.starts_with("cognition.") }));
4749 }
4750
4751 #[tokio::test]
4752 async fn test_record_metrics_batch_persists_all_samples() {
4753 let pool = setup_test_db().await;
4754 let ns_id = create_namespace(&pool, "metric-batch").await;
4755 let repo = MemoryRepository::new(pool);
4756
4757 repo.record_metrics_batch(&[
4758 MetricSample {
4759 metric_name: "cognition.query.total_ms".to_string(),
4760 metric_value: 9.5,
4761 labels: serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4762 },
4763 MetricSample {
4764 metric_name: "cognition.query.answer.total_tokens".to_string(),
4765 metric_value: 128.0,
4766 labels: serde_json::json!({"namespace_id": ns_id, "stage": "answer", "unit": "tokens"}),
4767 },
4768 ])
4769 .await
4770 .unwrap();
4771
4772 let metrics = repo
4773 .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4774 .await
4775 .unwrap();
4776
4777 assert_eq!(metrics.len(), 2);
4778 }
4779
4780 #[tokio::test]
4783 async fn test_list_jobs_returns_enqueued_jobs() {
4784 let pool = setup_test_db().await;
4785 let ns_id = create_namespace(&pool, "obs-jobs").await;
4786 let repo = MemoryRepository::new(pool);
4787
4788 repo.enqueue_job(EnqueueJobParams {
4789 namespace_id: ns_id,
4790 job_type: "derive",
4791 priority: 10,
4792 perspective: None,
4793 payload: &serde_json::json!({"a": 1}),
4794 })
4795 .await
4796 .unwrap();
4797
4798 repo.enqueue_job(EnqueueJobParams {
4799 namespace_id: ns_id,
4800 job_type: "digest",
4801 priority: 5,
4802 perspective: None,
4803 payload: &serde_json::json!({"b": 2}),
4804 })
4805 .await
4806 .unwrap();
4807
4808 let all = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
4810 assert_eq!(all.len(), 2);
4811
4812 let derive_only = repo
4814 .list_jobs(ns_id, Some("derive"), None, 50, 0)
4815 .await
4816 .unwrap();
4817 assert_eq!(derive_only.len(), 1);
4818 assert_eq!(derive_only[0].job_type, "derive");
4819
4820 let pending = repo
4822 .list_jobs(ns_id, None, Some("pending"), 50, 0)
4823 .await
4824 .unwrap();
4825 assert_eq!(pending.len(), 2);
4826
4827 let digest_pending = repo
4829 .list_jobs(ns_id, Some("digest"), Some("pending"), 50, 0)
4830 .await
4831 .unwrap();
4832 assert_eq!(digest_pending.len(), 1);
4833 }
4834
4835 #[tokio::test]
4836 async fn test_list_jobs_respects_limit_offset() {
4837 let pool = setup_test_db().await;
4838 let ns_id = create_namespace(&pool, "obs-limit").await;
4839 let repo = MemoryRepository::new(pool);
4840
4841 for i in 0..5 {
4842 repo.enqueue_job(EnqueueJobParams {
4843 namespace_id: ns_id,
4844 job_type: "derive",
4845 priority: i,
4846 perspective: None,
4847 payload: &serde_json::json!({"i": i}),
4848 })
4849 .await
4850 .unwrap();
4851 }
4852
4853 let page1 = repo.list_jobs(ns_id, None, None, 2, 0).await.unwrap();
4854 assert_eq!(page1.len(), 2);
4855
4856 let page2 = repo.list_jobs(ns_id, None, None, 2, 2).await.unwrap();
4857 assert_eq!(page2.len(), 2);
4858
4859 let page3 = repo.list_jobs(ns_id, None, None, 2, 4).await.unwrap();
4860 assert_eq!(page3.len(), 1);
4861 }
4862
4863 #[tokio::test]
4864 async fn test_count_jobs_by_status() {
4865 let pool = setup_test_db().await;
4866 let ns_id = create_namespace(&pool, "obs-count").await;
4867 let repo = MemoryRepository::new(pool);
4868
4869 repo.enqueue_job(EnqueueJobParams {
4870 namespace_id: ns_id,
4871 job_type: "derive",
4872 priority: 10,
4873 perspective: None,
4874 payload: &serde_json::json!({}),
4875 })
4876 .await
4877 .unwrap();
4878
4879 repo.enqueue_job(EnqueueJobParams {
4880 namespace_id: ns_id,
4881 job_type: "derive",
4882 priority: 5,
4883 perspective: None,
4884 payload: &serde_json::json!({}),
4885 })
4886 .await
4887 .unwrap();
4888
4889 repo.enqueue_job(EnqueueJobParams {
4890 namespace_id: ns_id,
4891 job_type: "digest",
4892 priority: 10,
4893 perspective: None,
4894 payload: &serde_json::json!({}),
4895 })
4896 .await
4897 .unwrap();
4898
4899 let all_counts = repo.count_jobs_by_status(ns_id, None).await.unwrap();
4901 let total: i64 = all_counts.iter().map(|(_, c)| c).sum();
4902 assert_eq!(total, 3);
4903
4904 let derive_counts = repo
4906 .count_jobs_by_status(ns_id, Some("derive"))
4907 .await
4908 .unwrap();
4909 let derive_total: i64 = derive_counts.iter().map(|(_, c)| c).sum();
4910 assert_eq!(derive_total, 2);
4911 }
4912
4913 #[tokio::test]
4914 async fn test_count_jobs_respects_filters() {
4915 let pool = setup_test_db().await;
4916 let ns_id = create_namespace(&pool, "obs-job-total").await;
4917 let repo = MemoryRepository::new(pool);
4918
4919 repo.enqueue_job(EnqueueJobParams {
4920 namespace_id: ns_id,
4921 job_type: "derive",
4922 priority: 10,
4923 perspective: None,
4924 payload: &serde_json::json!({"index": 1}),
4925 })
4926 .await
4927 .unwrap();
4928 repo.enqueue_job(EnqueueJobParams {
4929 namespace_id: ns_id,
4930 job_type: "derive",
4931 priority: 5,
4932 perspective: None,
4933 payload: &serde_json::json!({"index": 2}),
4934 })
4935 .await
4936 .unwrap();
4937 repo.enqueue_job(EnqueueJobParams {
4938 namespace_id: ns_id,
4939 job_type: "digest",
4940 priority: 1,
4941 perspective: None,
4942 payload: &serde_json::json!({"index": 3}),
4943 })
4944 .await
4945 .unwrap();
4946
4947 assert_eq!(repo.count_jobs(ns_id, None, None).await.unwrap(), 3);
4948 assert_eq!(
4949 repo.count_jobs(ns_id, Some("derive"), None).await.unwrap(),
4950 2
4951 );
4952 assert_eq!(
4953 repo.count_jobs(ns_id, Some("derive"), Some("pending"))
4954 .await
4955 .unwrap(),
4956 2
4957 );
4958 assert_eq!(
4959 repo.count_jobs(ns_id, Some("reflect"), Some("pending"))
4960 .await
4961 .unwrap(),
4962 0
4963 );
4964 }
4965
4966 #[tokio::test]
4967 async fn test_list_digests_and_count() {
4968 let pool = setup_test_db().await;
4969 let ns_id = create_namespace(&pool, "obs-digests").await;
4970 let repo = MemoryRepository::new(pool);
4971
4972 let mem = repo
4974 .store(StoreMemoryParams {
4975 namespace_id: ns_id,
4976 content: "digest content",
4977 category: &Category::Session,
4978 memory_lane_type: None,
4979 labels: &[],
4980 metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
4981 embedding: None,
4982 embedding_model: None,
4983 })
4984 .await
4985 .unwrap();
4986
4987 repo.store_digest(StoreDigestParams {
4988 namespace_id: ns_id,
4989 session_key: "session-1",
4990 digest_kind: "short",
4991 memory_id: mem.id,
4992 start_memory_id: Some(1),
4993 end_memory_id: Some(10),
4994 token_count: 50,
4995 })
4996 .await
4997 .unwrap();
4998
4999 repo.store_digest(StoreDigestParams {
5000 namespace_id: ns_id,
5001 session_key: "session-2",
5002 digest_kind: "long",
5003 memory_id: mem.id,
5004 start_memory_id: Some(11),
5005 end_memory_id: Some(20),
5006 token_count: 100,
5007 })
5008 .await
5009 .unwrap();
5010
5011 let all = repo.list_digests(ns_id, None, 50, 0).await.unwrap();
5013 assert_eq!(all.len(), 2);
5014
5015 let total = repo.count_digests(ns_id, None).await.unwrap();
5016 assert_eq!(total, 2);
5017
5018 let sess1 = repo
5020 .list_digests(ns_id, Some("session-1"), 50, 0)
5021 .await
5022 .unwrap();
5023 assert_eq!(sess1.len(), 1);
5024 assert_eq!(sess1[0].session_key, "session-1");
5025
5026 let sess1_count = repo.count_digests(ns_id, Some("session-1")).await.unwrap();
5027 assert_eq!(sess1_count, 1);
5028
5029 let none = repo
5031 .list_digests(ns_id, Some("session-none"), 50, 0)
5032 .await
5033 .unwrap();
5034 assert!(none.is_empty());
5035
5036 let none_count = repo
5037 .count_digests(ns_id, Some("session-none"))
5038 .await
5039 .unwrap();
5040 assert_eq!(none_count, 0);
5041 }
5042
5043 #[tokio::test]
5048 async fn test_row_to_memory_rejects_malformed_labels() {
5049 let pool = setup_test_db().await;
5050 let ns_id = create_namespace(&pool, "test-agent").await;
5051 let repo = MemoryRepository::new(pool);
5052
5053 let memory = repo
5055 .store(StoreMemoryParams {
5056 namespace_id: ns_id,
5057 content: "corruption test labels",
5058 category: &Category::General,
5059 memory_lane_type: None,
5060 labels: &["valid-label".to_string()],
5061 metadata: &serde_json::Value::Null,
5062 embedding: None,
5063 embedding_model: None,
5064 })
5065 .await
5066 .unwrap();
5067
5068 sqlx::query("UPDATE memories SET labels = 'NOT VALID JSON{{{' WHERE id = ?")
5070 .bind(memory.id)
5071 .execute(repo.pool())
5072 .await
5073 .unwrap();
5074
5075 let err = repo.get_by_id(memory.id).await.unwrap_err();
5076 let msg = err.to_string();
5077 assert!(
5078 msg.contains("corrupted labels JSON"),
5079 "expected labels corruption error, got: {msg}"
5080 );
5081 assert!(msg.contains(&memory.id.to_string()));
5082 }
5083
5084 #[tokio::test]
5087 async fn test_row_to_memory_rejects_malformed_metadata() {
5088 let pool = setup_test_db().await;
5089 let repo = MemoryRepository::new(pool);
5090
5091 let row = MemoryRow {
5094 id: 999,
5095 namespace_id: 1,
5096 content: "test".to_string(),
5097 category: "general".to_string(),
5098 memory_lane_type: None,
5099 labels: "[]".to_string(),
5100 metadata: "[truncated".to_string(), similarity_score: None,
5102 relevance_score: None,
5103 content_embedding: None,
5104 embedding_model: None,
5105 created_at: Utc::now(),
5106 updated_at: None,
5107 last_accessed: None,
5108 is_active: true,
5109 is_archived: false,
5110 access_count: 0,
5111 };
5112
5113 let err = repo.row_to_memory(row).unwrap_err();
5114 let msg = err.to_string();
5115 assert!(
5116 msg.contains("corrupted metadata JSON"),
5117 "expected metadata corruption error, got: {msg}"
5118 );
5119 }
5120
5121 #[tokio::test]
5124 async fn test_row_to_memory_rejects_malformed_embedding() {
5125 let pool = setup_test_db().await;
5126 let ns_id = create_namespace(&pool, "test-agent").await;
5127 let repo = MemoryRepository::new(pool);
5128
5129 let memory = repo
5130 .store(StoreMemoryParams {
5131 namespace_id: ns_id,
5132 content: "corruption test embedding",
5133 category: &Category::General,
5134 memory_lane_type: None,
5135 labels: &[],
5136 metadata: &serde_json::Value::Null,
5137 embedding: Some(&[0.1, 0.2, 0.3]),
5138 embedding_model: Some("test-model"),
5139 })
5140 .await
5141 .unwrap();
5142
5143 sqlx::query("UPDATE memories SET content_embedding = 'not-an-array' WHERE id = ?")
5144 .bind(memory.id)
5145 .execute(repo.pool())
5146 .await
5147 .unwrap();
5148
5149 let err = repo.get_by_id(memory.id).await.unwrap_err();
5150 let msg = err.to_string();
5151 assert!(
5152 msg.contains("corrupted embedding JSON"),
5153 "expected embedding corruption error, got: {msg}"
5154 );
5155 }
5156
5157 #[tokio::test]
5160 async fn test_claim_jobs_rejects_malformed_payload() {
5161 let pool = setup_test_db().await;
5162 let ns_id = create_namespace(&pool, "test-agent").await;
5163 let repo = MemoryRepository::new(pool);
5164
5165 sqlx::query(
5167 r#"
5168 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5169 VALUES (?, 'derive_memory', 'pending', 100, '{INVALID_JSON}', datetime('now'), datetime('now'))
5170 "#,
5171 )
5172 .bind(ns_id)
5173 .execute(repo.pool())
5174 .await
5175 .unwrap();
5176
5177 let claimed = repo
5179 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5180 .await
5181 .unwrap();
5182 assert!(
5183 claimed.is_empty(),
5184 "corrupt payload job should not be returned"
5185 );
5186
5187 let status: String =
5189 sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5190 .bind(ns_id)
5191 .fetch_one(repo.pool())
5192 .await
5193 .unwrap();
5194 assert_eq!(status, "failed", "corrupt job should be permanently failed");
5195
5196 let last_error: Option<String> =
5197 sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5198 .bind(ns_id)
5199 .fetch_one(repo.pool())
5200 .await
5201 .unwrap();
5202 assert!(
5203 last_error
5204 .unwrap_or_default()
5205 .contains("corrupted payload JSON"),
5206 "last_error should mention payload corruption"
5207 );
5208 }
5209
5210 #[tokio::test]
5213 async fn test_claim_jobs_rejects_malformed_perspective() {
5214 let pool = setup_test_db().await;
5215 let ns_id = create_namespace(&pool, "test-agent").await;
5216 let repo = MemoryRepository::new(pool);
5217
5218 sqlx::query(
5220 r#"
5221 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
5222 VALUES (?, 'derive_memory', 'pending', 100, '{BOGUS}', '{"ok": true}', datetime('now'), datetime('now'))
5223 "#,
5224 )
5225 .bind(ns_id)
5226 .execute(repo.pool())
5227 .await
5228 .unwrap();
5229
5230 let claimed = repo
5232 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5233 .await
5234 .unwrap();
5235 assert!(
5236 claimed.is_empty(),
5237 "corrupt perspective job should not be returned"
5238 );
5239
5240 let status: String =
5242 sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5243 .bind(ns_id)
5244 .fetch_one(repo.pool())
5245 .await
5246 .unwrap();
5247 assert_eq!(status, "failed", "corrupt job should be permanently failed");
5248
5249 let last_error: Option<String> =
5250 sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5251 .bind(ns_id)
5252 .fetch_one(repo.pool())
5253 .await
5254 .unwrap();
5255 assert!(
5256 last_error
5257 .unwrap_or_default()
5258 .contains("corrupted perspective JSON"),
5259 "last_error should mention perspective corruption"
5260 );
5261 }
5262
5263 #[tokio::test]
5266 async fn test_claim_jobs_skips_corrupt_returns_valid() {
5267 let pool = setup_test_db().await;
5268 let ns_id = create_namespace(&pool, "test-agent").await;
5269 let repo = MemoryRepository::new(pool);
5270
5271 let p1 = serde_json::json!({"memory_id": 1});
5273 repo.enqueue_job(EnqueueJobParams {
5274 namespace_id: ns_id,
5275 job_type: "derive_memory",
5276 priority: 100,
5277 perspective: None,
5278 payload: &p1,
5279 })
5280 .await
5281 .unwrap();
5282 let p2 = serde_json::json!({"memory_id": 2});
5283 repo.enqueue_job(EnqueueJobParams {
5284 namespace_id: ns_id,
5285 job_type: "derive_memory",
5286 priority: 50,
5287 perspective: None,
5288 payload: &p2,
5289 })
5290 .await
5291 .unwrap();
5292
5293 sqlx::query(
5295 r#"
5296 INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5297 VALUES (?, 'derive_memory', 'pending', 200, '{BROKEN}', datetime('now'), datetime('now'))
5298 "#,
5299 )
5300 .bind(ns_id)
5301 .execute(repo.pool())
5302 .await
5303 .unwrap();
5304
5305 let claimed = repo
5307 .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 10)
5308 .await
5309 .unwrap();
5310 assert_eq!(
5311 claimed.len(),
5312 2,
5313 "should return 2 valid jobs, skipping the corrupt one"
5314 );
5315
5316 let failed_count: i64 = sqlx::query_scalar(
5318 "SELECT COUNT(*) FROM memory_jobs WHERE namespace_id = ? AND status = 'failed'",
5319 )
5320 .bind(ns_id)
5321 .fetch_one(repo.pool())
5322 .await
5323 .unwrap();
5324 assert_eq!(failed_count, 1, "corrupt job should be permanently failed");
5325 }
5326
5327 #[tokio::test]
5331 async fn test_purge_completed_jobs_removes_old_keeps_recent() {
5332 let pool = setup_test_db().await;
5333 let ns_id = create_namespace(&pool, "purge-test").await;
5334 let repo = MemoryRepository::new(pool);
5335
5336 repo.enqueue_job(EnqueueJobParams {
5338 namespace_id: ns_id,
5339 job_type: "derive_memory",
5340 priority: 10,
5341 perspective: None,
5342 payload: &serde_json::json!({"old": true}),
5343 })
5344 .await
5345 .unwrap();
5346
5347 let claimed = repo
5348 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5349 .await
5350 .unwrap();
5351 assert_eq!(claimed.len(), 1);
5352 let old_job_id = claimed[0].row.id;
5353
5354 repo.complete_job(&claimed[0]).await.unwrap();
5355
5356 sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5358 .bind(old_job_id)
5359 .execute(repo.pool())
5360 .await
5361 .unwrap();
5362
5363 repo.enqueue_job(EnqueueJobParams {
5365 namespace_id: ns_id,
5366 job_type: "derive_memory",
5367 priority: 10,
5368 perspective: None,
5369 payload: &serde_json::json!({"new": true}),
5370 })
5371 .await
5372 .unwrap();
5373
5374 let claimed2 = repo
5375 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5376 .await
5377 .unwrap();
5378 assert_eq!(claimed2.len(), 1);
5379 repo.complete_job(&claimed2[0]).await.unwrap();
5380
5381 let cutoff = Utc::now() - chrono::Duration::days(7);
5383 let deleted = repo.purge_completed_jobs(cutoff).await.unwrap();
5384 assert_eq!(deleted, 1);
5385
5386 let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5388 assert_eq!(remaining.len(), 1);
5389 assert_eq!(remaining[0].id, claimed2[0].row.id);
5390 }
5391
5392 #[tokio::test]
5394 async fn test_purge_permanently_failed_jobs_removes_old_keeps_recent() {
5395 let pool = setup_test_db().await;
5396 let ns_id = create_namespace(&pool, "purge-failed").await;
5397 let repo = MemoryRepository::new(pool);
5398
5399 repo.enqueue_job(EnqueueJobParams {
5401 namespace_id: ns_id,
5402 job_type: "derive_memory",
5403 priority: 10,
5404 perspective: None,
5405 payload: &serde_json::json!({"fail_me": true}),
5406 })
5407 .await
5408 .unwrap();
5409
5410 for _ in 0..5 {
5411 let claimed = repo
5412 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5413 .await
5414 .unwrap();
5415 assert_eq!(claimed.len(), 1);
5416 repo.fail_job(&claimed[0], "persistent error")
5417 .await
5418 .unwrap();
5419 }
5420
5421 sqlx::query(
5423 "UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE status = ?",
5424 )
5425 .bind(memory_job_status::FAILED)
5426 .execute(repo.pool())
5427 .await
5428 .unwrap();
5429
5430 repo.enqueue_job(EnqueueJobParams {
5432 namespace_id: ns_id,
5433 job_type: "derive_memory",
5434 priority: 10,
5435 perspective: None,
5436 payload: &serde_json::json!({"retry_me": true}),
5437 })
5438 .await
5439 .unwrap();
5440
5441 for _ in 0..2 {
5442 let claimed = repo
5443 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5444 .await
5445 .unwrap();
5446 assert_eq!(claimed.len(), 1);
5447 repo.fail_job(&claimed[0], "transient error").await.unwrap();
5448 }
5449
5450 let cutoff = Utc::now() - chrono::Duration::days(7);
5452 let deleted = repo.purge_permanently_failed_jobs(cutoff).await.unwrap();
5453 assert_eq!(deleted, 1);
5454
5455 let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5457 assert_eq!(remaining.len(), 1);
5458 assert_eq!(remaining[0].status, memory_job_status::PENDING);
5459 }
5460
5461 #[tokio::test]
5463 async fn test_active_leasing_works_after_purge() {
5464 let pool = setup_test_db().await;
5465 let ns_id = create_namespace(&pool, "purge-lease").await;
5466 let repo = MemoryRepository::new(pool);
5467
5468 repo.enqueue_job(EnqueueJobParams {
5470 namespace_id: ns_id,
5471 job_type: "derive_memory",
5472 priority: 10,
5473 perspective: None,
5474 payload: &serde_json::json!({"old": true}),
5475 })
5476 .await
5477 .unwrap();
5478
5479 let claimed = repo
5480 .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5481 .await
5482 .unwrap();
5483 repo.complete_job(&claimed[0]).await.unwrap();
5484
5485 sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5486 .bind(claimed[0].row.id)
5487 .execute(repo.pool())
5488 .await
5489 .unwrap();
5490
5491 let cutoff = Utc::now() - chrono::Duration::days(7);
5493 repo.purge_completed_jobs(cutoff).await.unwrap();
5494
5495 repo.enqueue_job(EnqueueJobParams {
5497 namespace_id: ns_id,
5498 job_type: "derive_memory",
5499 priority: 20,
5500 perspective: None,
5501 payload: &serde_json::json!({"fresh": true}),
5502 })
5503 .await
5504 .unwrap();
5505
5506 let fresh_claimed = repo
5507 .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 10)
5508 .await
5509 .unwrap();
5510 assert_eq!(fresh_claimed.len(), 1);
5511 assert_eq!(fresh_claimed[0].row.status, "running");
5512 assert_eq!(fresh_claimed[0].payload["fresh"], true);
5513
5514 repo.complete_job(&fresh_claimed[0]).await.unwrap();
5516
5517 let empty = repo
5519 .claim_jobs(ns_id, "derive_memory", "worker-3", 60, 10)
5520 .await
5521 .unwrap();
5522 assert!(empty.is_empty());
5523 }
5524}