Skip to main content

nexus_memory_storage/
repository.rs

1//! Repository implementations for database operations
2
3use std::collections::{HashMap, HashSet};
4
5use crate::models::{
6    memory_job_status, AgentNamespaceRow, ClaimedMemoryJob, EnqueueJobParams, MemoryEvidenceRow,
7    MemoryJobRow, MemoryLineageEntry, MemoryRow, ProcessedFileRow, SessionDigestRow,
8    SystemMetricRow,
9};
10use crate::{db_error, Result};
11use chrono::{DateTime, Utc};
12use nexus_core::{
13    AgentNamespace, CognitiveLevel, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey,
14};
15use sqlx::SqlitePool;
16
/// Type alias for backward compatibility — older call sites use `Category`
/// rather than [`MemoryCategory`].
type Category = MemoryCategory;
19
/// Parameters for storing a new memory.
pub struct StoreMemoryParams<'a> {
    /// Owning namespace row id.
    pub namespace_id: i64,
    /// Memory body text.
    pub content: &'a str,
    /// Memory category; persisted via its `Display` form.
    pub category: &'a Category,
    /// Optional lane classification; persisted via its `Display` form.
    pub memory_lane_type: Option<&'a MemoryLaneType>,
    /// Free-form labels; persisted as a JSON array string.
    pub labels: &'a [String],
    /// Arbitrary metadata; persisted as a JSON object string.
    pub metadata: &'a serde_json::Value,
    /// Optional embedding vector; persisted as JSON when present.
    pub embedding: Option<&'a [f32]>,
    /// Name of the model that produced `embedding`, if any.
    pub embedding_model: Option<&'a str>,
}
31
/// Parameters for storing a memory with evidence lineage.
pub struct StoreMemoryWithLineageParams<'a> {
    /// Parameters for the new memory row itself.
    pub store: StoreMemoryParams<'a>,
    /// Source memories serving as evidence; one evidence row is written per id,
    /// atomically with the memory insert.
    pub source_memory_ids: &'a [i64],
    /// Role recorded on each evidence row.
    pub evidence_role: &'a str,
}
38
/// Parameters for storing or updating a session digest registration.
pub struct StoreDigestParams<'a> {
    /// Owning namespace row id.
    pub namespace_id: i64,
    /// Session the digest belongs to.
    pub session_key: &'a str,
    /// Digest flavor; part of the upsert conflict key.
    pub digest_kind: &'a str,
    /// Memory row holding the digest content.
    pub memory_id: i64,
    /// First memory covered by the digest window, if known.
    pub start_memory_id: Option<i64>,
    /// Last memory covered by the digest window, if known; part of the
    /// upsert conflict key.
    pub end_memory_id: Option<i64>,
    /// Token count recorded for the digest content.
    pub token_count: usize,
}
49
/// Accumulated, not-yet-digested session activity since the last digest
/// window. Produced by [`MemoryRepository::session_digest_rollover`].
pub struct SessionDigestRollover {
    /// Highest `end_memory_id` covered by an existing digest, if any.
    pub last_digest_end_memory_id: Option<i64>,
    /// Number of qualifying memories created after that id.
    pub new_memory_count: i64,
    /// Rough token estimate for those memories (~4 characters per token).
    pub estimated_new_tokens: i64,
}
55
/// Filters for listing memories. The consuming list query is outside this
/// excerpt, so exact comparison semantics live there.
pub struct ListMemoryFilters<'a> {
    /// Exact category filter, when set.
    pub category: Option<&'a str>,
    /// Lower creation-time bound, when set.
    pub since: Option<DateTime<Utc>>,
    /// Upper creation-time bound, when set.
    pub until: Option<DateTime<Utc>>,
    /// Content pattern filter — name suggests a SQL LIKE pattern;
    /// confirm against the consuming query.
    pub content_like: Option<&'a str>,
    /// When `false`, raw-activity hook noise is excluded.
    pub include_raw: bool,
    /// Page size.
    pub limit: i64,
    /// Page offset.
    pub offset: i64,
}
65
/// Parameters for [`MemoryRepository::search_working_set`] (defined outside
/// this excerpt).
pub struct WorkingSetParams<'a> {
    /// Namespace to search within.
    pub namespace_id: i64,
    /// Optional perspective lens. When `None`, queries fall back to
    /// namespace-only retrieval without observer/subject filtering.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Maximum number of memories to return after deduplication.
    pub max_items: usize,
    /// When `false` (default), raw-activity hook noise is excluded from
    /// every bucket.
    pub include_raw: bool,
}
79
/// Parameters for embedding-backed semantic candidate retrieval (the
/// consuming query is outside this excerpt).
pub struct SemanticCandidateParams<'a> {
    /// Namespace to search within.
    pub namespace_id: i64,
    /// Optional perspective lens for observer/subject/session scoping.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Maximum number of candidate memories to return before vector ranking.
    pub limit: i64,
    /// When `false`, raw-activity hook noise is excluded.
    pub include_raw: bool,
}
91
/// A persisted system metric sample.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Metric identifier.
    pub metric_name: String,
    /// Sampled value.
    pub metric_value: f64,
    /// Dimension labels — presumably a JSON object; the writer is outside
    /// this excerpt.
    pub labels: serde_json::Value,
}
99
/// Maximum number of retry attempts before a job is permanently failed.
const MAX_JOB_ATTEMPTS: i64 = 5;
/// WHERE fragment excluding raw-activity hook noise: rows whose labels
/// mention `raw-activity` or whose metadata sets `$.raw_activity`.
const RAW_ACTIVITY_FILTER_SQL: &str =
    "labels NOT LIKE '%raw-activity%' AND json_extract(COALESCE(metadata, '{}'), '$.raw_activity') IS NULL";
104
105// ---------------------------------------------------------------------------
106// Shared cognitive-metadata query fragments
107// ---------------------------------------------------------------------------
108// These constants are the single source of truth for how the repository
109// extracts values from the `metadata` JSON column.  Every query that touches
110// cognitive fields (observer, subject, session_key, session_keys, level,
111// times_reinforced, times_contradicted) MUST use these fragments so that
112// schema-level changes propagate consistently and cannot drift.
113// ---------------------------------------------------------------------------
114
/// SQL expression that safely extracts a value from the metadata JSON column,
/// defaulting to an empty JSON object when metadata is NULL.
///
/// The fragments below embed the literal token `METADATA` as a placeholder;
/// callers expand it with `.replace("METADATA", METADATA)` before use.
const METADATA: &str = "COALESCE(metadata, '{}')";

/// WHERE-clause fragment that matches a memory whose session context equals the
/// given session key via either the scalar `session_key` field or the
/// `session_keys` array.  The fragment expects TWO bound parameters (same value
/// for both).
const SESSION_KEY_FILTER: &str =
    "(json_extract(METADATA, '$.cognitive.session_key') = ? \
     OR EXISTS (SELECT 1 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]')) WHERE value = ?))";

/// WHERE-clause fragment that filters by perspective observer and subject.
/// Expects TWO bound parameters (observer, then subject).
/// Does NOT include session_key filtering — combine with [`SESSION_KEY_FILTER`]
/// when the perspective has a session_key.
const PERSPECTIVE_IDENTITY_FILTER: &str = "json_extract(METADATA, '$.cognitive.observer') = ? \
     AND json_extract(METADATA, '$.cognitive.subject') = ?";

/// Cognitive-level extraction expression (no comparison — yields the raw string).
const COGNITIVE_LEVEL_EXPR: &str = "json_extract(METADATA, '$.cognitive.level')";
138
139/// Assembles the full perspective WHERE clause (observer, subject, and optional
140/// session key) for use in SQL templates.
141///
142/// Returns `(sql_fragment, bind_count)`. The caller must bind:
143/// - observer, subject (always)
144/// - session_key × 2 (if present)
145fn perspective_where_clause(perspective: &PerspectiveKey) -> String {
146    if perspective.session_key.is_some() {
147        format!(
148            "{} AND {}",
149            PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA),
150            SESSION_KEY_FILTER.replace("METADATA", METADATA),
151        )
152    } else {
153        PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA)
154    }
155}
156
157/// Bind values for a perspective WHERE clause.
158/// Order: observer, subject, [session_key, session_key]
159fn bind_perspective(perspective: &PerspectiveKey) -> Vec<&str> {
160    let mut vals = vec![perspective.observer.as_str(), perspective.subject.as_str()];
161    if let Some(ref sk) = perspective.session_key {
162        vals.push(sk.as_str());
163        vals.push(sk.as_str());
164    }
165    vals
166}
167
/// Repository for memory operations backed by a SQLite connection pool.
pub struct MemoryRepository {
    // Pool used by every query in this repository.
    pool: SqlitePool,
}
172
173impl MemoryRepository {
    /// Create a repository over the given SQLite connection pool.
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
177
    /// Borrow the underlying connection pool.
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
181
    /// Store a new memory.
    ///
    /// Serializes labels, metadata, and the optional embedding to JSON and
    /// inserts an active row with `access_count = 0`. A BEFORE INSERT trigger
    /// (`trg_memories_same_namespace_merge`) may suppress the insert for
    /// duplicate content; that case surfaces as `last_insert_rowid() == 0`
    /// and is handled by merging context into the existing row.
    ///
    /// # Errors
    /// Fails on serialization errors, database errors, or when the stored
    /// (or merged) row cannot be reloaded afterwards.
    pub async fn store(&self, params: StoreMemoryParams<'_>) -> Result<Memory> {
        let labels_json = serde_json::to_string(params.labels)?;
        let metadata_json = serde_json::to_string(params.metadata)?;
        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;

        let result = sqlx::query(
            r#"
            INSERT INTO memories (
                namespace_id, content, category, memory_lane_type, labels, metadata,
                content_embedding, embedding_model, created_at, is_active, access_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.content)
        .bind(params.category.to_string())
        .bind(params.memory_lane_type.map(|t| t.to_string()))
        .bind(&labels_json)
        .bind(&metadata_json)
        .bind(&embedding_json)
        .bind(params.embedding_model)
        .bind(Utc::now())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        let id = result.last_insert_rowid();

        // If last_insert_rowid() is 0, the BEFORE INSERT trigger
        // (trg_memories_same_namespace_merge) detected a duplicate and
        // raised IGNORE. The existing row was touched (access_count
        // incremented) — find it by content match.
        if id == 0 {
            // Try to find existing row by content match (case-insensitive, trimmed)
            let row: Option<MemoryRow> = sqlx::query_as(
                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
            )
            .bind(params.namespace_id)
            .bind(params.content)
            .fetch_optional(&self.pool)
            .await
            .map_err(db_error)?;

            if let Some(existing) = row {
                // Fold the new labels/metadata into the surviving row, then
                // return the refreshed row to the caller.
                self.merge_duplicate_memory_context(existing.id, params)
                    .await?;
                return self.get_by_id(existing.id).await?.ok_or_else(|| {
                    nexus_core::NexusError::Storage(format!(
                        "Duplicate merged row {} could not be reloaded",
                        existing.id
                    ))
                });
            }

            // If no duplicate found, this is unexpected - log warning but don't fail
            // This can happen if trigger fires but content doesn't match exactly
            tracing::warn!(
                namespace_id = params.namespace_id,
                content_length = params.content.len(),
                "Insert returned id 0 but no matching duplicate found - treating as successful insert"
            );

            // Return success anyway - the insert did happen, we just can't track the id
            // This prevents hook failures due to this edge case
            return self
                .get_by_content(params.namespace_id, params.content)
                .await;
        }

        self.get_by_id(id).await?.ok_or_else(|| {
            nexus_core::NexusError::Storage(format!("Failed to retrieve memory with id {}", id))
        })
    }
256
257    async fn merge_duplicate_memory_context(
258        &self,
259        existing_id: i64,
260        params: StoreMemoryParams<'_>,
261    ) -> Result<()> {
262        let current = self.get_by_id(existing_id).await?.ok_or_else(|| {
263            nexus_core::NexusError::Storage(format!(
264                "Failed to load duplicate-merged memory {}",
265                existing_id
266            ))
267        })?;
268
269        let merged_labels = merge_labels(&current.labels, params.labels);
270        let merged_metadata = merge_duplicate_metadata(&current.metadata, params.metadata);
271        let labels_json = serde_json::to_string(&merged_labels)?;
272        let metadata_json = serde_json::to_string(&merged_metadata)?;
273
274        sqlx::query(
275            r#"
276            UPDATE memories
277            SET labels = ?, metadata = ?, updated_at = ?
278            WHERE id = ?
279            "#,
280        )
281        .bind(&labels_json)
282        .bind(&metadata_json)
283        .bind(Utc::now())
284        .bind(existing_id)
285        .execute(&self.pool)
286        .await
287        .map_err(db_error)?;
288
289        Ok(())
290    }
291
292    /// Store a memory and write evidence lineage rows atomically.
293    pub async fn store_with_lineage(
294        &self,
295        params: StoreMemoryWithLineageParams<'_>,
296    ) -> Result<Memory> {
297        let mut tx = self.pool.begin().await.map_err(db_error)?;
298        let memory_id = insert_memory_tx(&mut tx, &params.store).await?;
299        for &source_id in params.source_memory_ids {
300            insert_evidence_tx(&mut tx, memory_id, source_id, params.evidence_role).await?;
301        }
302        tx.commit().await.map_err(db_error)?;
303
304        self.get_by_id(memory_id).await?.ok_or_else(|| {
305            nexus_core::NexusError::Storage(format!(
306                "Failed to retrieve memory with id {} after lineage store",
307                memory_id
308            ))
309        })
310    }
311
312    /// Enqueue a new cognitive job.
313    pub async fn enqueue_job(&self, params: EnqueueJobParams<'_>) -> Result<i64> {
314        let perspective_json = params.perspective.map(serde_json::to_string).transpose()?;
315        let payload_json = serde_json::to_string(params.payload)?;
316
317        let id: i64 = sqlx::query_scalar(
318            r#"
319            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
320            VALUES (?, ?, 'pending', ?, ?, ?, datetime('now'), datetime('now'))
321            RETURNING id
322            "#,
323        )
324        .bind(params.namespace_id)
325        .bind(params.job_type)
326        .bind(params.priority)
327        .bind(&perspective_json)
328        .bind(&payload_json)
329        .fetch_one(&self.pool)
330        .await
331        .map_err(db_error)?;
332
333        Ok(id)
334    }
335
    /// Claim up to `limit` pending (or stale running) jobs for a given type and namespace.
    ///
    /// Stale running jobs are those whose lease has expired (`lease_expires_at < now`).
    /// Claimed jobs are transitioned to `running` with a lease owner and TTL;
    /// `attempts` is incremented on claim.
    ///
    /// Jobs whose stored perspective or payload JSON fails to parse are
    /// permanently failed (best-effort) and skipped, so one corrupted job
    /// cannot poison the whole batch.
    pub async fn claim_jobs(
        &self,
        namespace_id: i64,
        job_type: &str,
        lease_owner: &str,
        lease_ttl_secs: u64,
        limit: i64,
    ) -> Result<Vec<ClaimedMemoryJob>> {
        let claim_token = new_claim_token(lease_owner);
        // SQLite's UPDATE ... RETURNING does not preserve CTE ordering.
        // We must sort in Rust to guarantee priority-first delivery.
        // This is O(n log n) but n is bounded by the `limit` parameter (typically small).
        //
        // Bind order follows textual placeholder order: the five CTE
        // parameters first, then the four in the UPDATE's SET clause.
        let rows: Vec<MemoryJobRow> = sqlx::query_as::<_, MemoryJobRow>(
            r#"
            WITH candidates AS (
                SELECT id
                FROM memory_jobs
                WHERE namespace_id = ? AND job_type = ? AND (
                    status = ?
                    OR (status = ? AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))
                )
                ORDER BY priority DESC, created_at ASC
                LIMIT ?
            )
            UPDATE memory_jobs
            SET status = ?,
                lease_owner = ?,
                claim_token = ?,
                lease_expires_at = datetime('now', '+' || ? || ' seconds'),
                attempts = attempts + 1,
                updated_at = datetime('now')
            WHERE id IN (SELECT id FROM candidates)
            RETURNING *
            "#,
        )
        .bind(namespace_id)
        .bind(job_type)
        .bind(memory_job_status::PENDING)
        .bind(memory_job_status::RUNNING)
        .bind(limit)
        .bind(memory_job_status::RUNNING)
        .bind(lease_owner)
        .bind(&claim_token)
        .bind(lease_ttl_secs as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        // Re-sort in Rust: highest priority first, then oldest first within
        // the same priority (see the note above the query).
        let mut rows = rows;
        rows.sort_by(|left, right| {
            right
                .priority
                .cmp(&left.priority)
                .then_with(|| left.created_at.cmp(&right.created_at))
        });

        let mut claimed = Vec::with_capacity(rows.len());
        for row in rows {
            // A missing perspective is valid (namespace-wide job); a present
            // but unparsable one permanently fails the job and skips it.
            let perspective = match row.perspective_json.as_deref() {
                Some(s) => match serde_json::from_str(s) {
                    Ok(p) => Some(p),
                    Err(e) => {
                        tracing::warn!(
                            job_id = row.id,
                            error = %e,
                            "corrupted perspective JSON, permanently failing job"
                        );
                        let _ = self
                            .permanently_fail_job(
                                row.id,
                                &row.lease_owner,
                                &row.claim_token,
                                &format!("corrupted perspective JSON: {e}"),
                            )
                            .await;
                        continue;
                    }
                },
                None => None,
            };
            // Payload is mandatory; an unparsable payload likewise fails the job.
            let payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
                Ok(p) => p,
                Err(e) => {
                    tracing::warn!(
                        job_id = row.id,
                        error = %e,
                        "corrupted payload JSON, permanently failing job"
                    );
                    let _ = self
                        .permanently_fail_job(
                            row.id,
                            &row.lease_owner,
                            &row.claim_token,
                            &format!("corrupted payload JSON: {e}"),
                        )
                        .await;
                    continue;
                }
            };
            claimed.push(ClaimedMemoryJob {
                row,
                perspective,
                payload,
            });
        }

        Ok(claimed)
    }
448
449    /// Mark a job as completed.
450    pub async fn complete_job(&self, job: &ClaimedMemoryJob) -> Result<()> {
451        let result = sqlx::query(
452            r#"
453            UPDATE memory_jobs
454            SET status = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
455            WHERE id = ?
456              AND lease_owner = ?
457              AND claim_token = ?
458            "#,
459        )
460        .bind(memory_job_status::COMPLETED)
461        .bind(job.row.id)
462        .bind(job.row.lease_owner.as_deref())
463        .bind(job.row.claim_token.as_deref())
464        .execute(&self.pool)
465        .await
466        .map_err(db_error)?;
467
468        if result.rows_affected() == 0 {
469            return Err(nexus_core::NexusError::Storage(format!(
470                "Memory job {} completion lost lease ownership",
471                job.row.id
472            )));
473        }
474
475        Ok(())
476    }
477
478    /// Mark a job as failed. If attempts < MAX_JOB_ATTEMPTS, requeue it as pending.
479    /// Otherwise, mark it permanently failed.
480    pub async fn fail_job(&self, job: &ClaimedMemoryJob, error: &str) -> Result<()> {
481        let row: Option<MemoryJobRow> = sqlx::query_as("SELECT * FROM memory_jobs WHERE id = ?")
482            .bind(job.row.id)
483            .fetch_optional(&self.pool)
484            .await
485            .map_err(db_error)?;
486
487        let row = row.ok_or_else(|| {
488            nexus_core::NexusError::Storage(format!("Memory job {} not found", job.row.id))
489        })?;
490
491        let lease_matches =
492            row.lease_owner == job.row.lease_owner && row.claim_token == job.row.claim_token;
493        if !lease_matches {
494            return Err(nexus_core::NexusError::Storage(format!(
495                "Memory job {} failure lost lease ownership",
496                job.row.id
497            )));
498        }
499
500        if row.attempts >= MAX_JOB_ATTEMPTS {
501            // Permanently failed.
502            let result = sqlx::query(
503                r#"
504                UPDATE memory_jobs
505                SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
506                WHERE id = ? AND lease_owner = ? AND claim_token = ?
507                "#,
508            )
509            .bind(memory_job_status::FAILED)
510            .bind(error)
511            .bind(job.row.id)
512            .bind(job.row.lease_owner.as_deref())
513            .bind(job.row.claim_token.as_deref())
514            .execute(&self.pool)
515            .await
516            .map_err(db_error)?;
517            if result.rows_affected() == 0 {
518                return Err(nexus_core::NexusError::Storage(format!(
519                    "Memory job {} failure lost lease ownership",
520                    job.row.id
521                )));
522            }
523        } else {
524            // Requeue for retry.
525            let result = sqlx::query(
526                r#"
527                UPDATE memory_jobs
528                SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
529                WHERE id = ? AND lease_owner = ? AND claim_token = ?
530                "#,
531            )
532            .bind(memory_job_status::PENDING)
533            .bind(error)
534            .bind(job.row.id)
535            .bind(job.row.lease_owner.as_deref())
536            .bind(job.row.claim_token.as_deref())
537            .execute(&self.pool)
538            .await
539            .map_err(db_error)?;
540            if result.rows_affected() == 0 {
541                return Err(nexus_core::NexusError::Storage(format!(
542                    "Memory job {} failure lost lease ownership",
543                    job.row.id
544                )));
545            }
546        }
547
548        Ok(())
549    }
550
551    /// Permanently fail a job by ID without requiring a ClaimedMemoryJob.
552    /// Used by claim_jobs to handle corrupted jobs without
553    /// poisoning the entire batch.
554    async fn permanently_fail_job(
555        &self,
556        job_id: i64,
557        lease_owner: &Option<String>,
558        claim_token: &Option<String>,
559        error: &str,
560    ) -> Result<()> {
561        sqlx::query(
562            r#"
563            UPDATE memory_jobs
564            SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL,
565                lease_expires_at = NULL, updated_at = datetime('now')
566            WHERE id = ? AND lease_owner = ? AND claim_token = ?
567            "#,
568        )
569        .bind(memory_job_status::FAILED)
570        .bind(error)
571        .bind(job_id)
572        .bind(lease_owner.as_deref())
573        .bind(claim_token.as_deref())
574        .execute(&self.pool)
575        .await
576        .map_err(db_error)?;
577        Ok(())
578    }
579
580    pub async fn get_most_reinforced_by_namespace(
581        &self,
582        namespace_id: i64,
583        limit: i64,
584        include_raw: bool,
585    ) -> Result<Vec<Memory>> {
586        let noise_sql = if include_raw {
587            String::new()
588        } else {
589            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
590        };
591        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
592            r#"
593            SELECT * FROM memories
594            WHERE namespace_id = ?
595              AND is_active = 1
596              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
597              {noise_sql}
598            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_reinforced'), 0) DESC,
599                     created_at DESC
600            LIMIT ?
601            "#
602        ))
603        .bind(namespace_id)
604        .bind(CognitiveLevel::Derived.as_str())
605        .bind(limit)
606        .fetch_all(&self.pool)
607        .await
608        .map_err(db_error)?;
609
610        rows.into_iter()
611            .map(|row| self.row_to_memory(row))
612            .collect()
613    }
614
615    pub async fn get_contradictions_by_namespace(
616        &self,
617        namespace_id: i64,
618        limit: i64,
619        include_raw: bool,
620    ) -> Result<Vec<Memory>> {
621        let noise_sql = if include_raw {
622            String::new()
623        } else {
624            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
625        };
626        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
627            r#"
628            SELECT * FROM memories
629            WHERE namespace_id = ?
630              AND is_active = 1
631              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
632              {noise_sql}
633            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_contradicted'), 0) DESC,
634                     created_at DESC
635            LIMIT ?
636            "#
637        ))
638        .bind(namespace_id)
639        .bind(CognitiveLevel::Contradiction.as_str())
640        .bind(limit)
641        .fetch_all(&self.pool)
642        .await
643        .map_err(db_error)?;
644
645        rows.into_iter()
646            .map(|row| self.row_to_memory(row))
647            .collect()
648    }
649
650    pub async fn list_by_session_key(
651        &self,
652        namespace_id: i64,
653        session_key: &str,
654        limit: i64,
655        include_raw: bool,
656    ) -> Result<Vec<Memory>> {
657        let noise_sql = if include_raw {
658            String::new()
659        } else {
660            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
661        };
662        let session_filter = SESSION_KEY_FILTER.replace("METADATA", METADATA);
663        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
664            r#"
665            SELECT * FROM memories
666            WHERE namespace_id = ?
667              AND is_active = 1
668              AND {session_filter}
669              {noise_sql}
670            ORDER BY created_at DESC
671            LIMIT ?
672            "#
673        ))
674        .bind(namespace_id)
675        .bind(session_key)
676        .bind(session_key)
677        .bind(limit)
678        .fetch_all(&self.pool)
679        .await
680        .map_err(db_error)?;
681
682        rows.into_iter()
683            .map(|row| self.row_to_memory(row))
684            .collect()
685    }
686
687    /// Register a session digest for a memory.
688    ///
689    /// Returns the digest row ID. Uses the unique index on
690    /// `(namespace_id, session_key, digest_kind, end_memory_id)` to prevent
691    /// duplicate registrations.
692    pub async fn store_digest(&self, params: StoreDigestParams<'_>) -> Result<i64> {
693        let id: i64 = sqlx::query_scalar(
694            r#"
695            INSERT INTO session_digests (namespace_id, session_key, digest_kind, memory_id, start_memory_id, end_memory_id, token_count, created_at)
696            VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
697            ON CONFLICT(namespace_id, session_key, digest_kind, end_memory_id) DO UPDATE SET
698                memory_id = excluded.memory_id,
699                token_count = excluded.token_count
700            RETURNING id
701            "#,
702        )
703        .bind(params.namespace_id)
704        .bind(params.session_key)
705        .bind(params.digest_kind)
706        .bind(params.memory_id)
707        .bind(params.start_memory_id)
708        .bind(params.end_memory_id)
709        .bind(params.token_count as i64)
710        .fetch_one(&self.pool)
711        .await
712        .map_err(db_error)?;
713
714        Ok(id)
715    }
716
717    /// Get the latest digest memory for a session and digest kind.
718    /// Returns the `Memory` row that the digest points to, or None.
719    pub async fn latest_digest_for_session(
720        &self,
721        namespace_id: i64,
722        session_key: &str,
723        digest_kind: &str,
724    ) -> Result<Option<Memory>> {
725        let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
726            "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
727        )
728        .bind(namespace_id)
729        .bind(session_key)
730        .bind(digest_kind)
731        .fetch_optional(&self.pool)
732        .await
733        .map_err(db_error)?;
734
735        match digest {
736            Some(d) => self.get_by_id(d.memory_id).await,
737            None => Ok(None),
738        }
739    }
740
741    /// Get the latest digest memory for a namespace and digest kind, regardless
742    /// of session key. Returns the `Memory` row that the digest points to, or
743    /// None if no digest exists.
744    pub async fn latest_digest_for_namespace(
745        &self,
746        namespace_id: i64,
747        digest_kind: &str,
748    ) -> Result<Option<Memory>> {
749        let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
750            "SELECT * FROM session_digests WHERE namespace_id = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
751        )
752        .bind(namespace_id)
753        .bind(digest_kind)
754        .fetch_optional(&self.pool)
755        .await
756        .map_err(db_error)?;
757
758        match digest {
759            Some(d) => self.get_by_id(d.memory_id).await,
760            None => Ok(None),
761        }
762    }
763
764    /// Return how much fresh non-raw, non-digest session content has accumulated
765    /// since the latest covered digest window.
766    pub async fn session_digest_rollover(
767        &self,
768        namespace_id: i64,
769        session_key: &str,
770    ) -> Result<SessionDigestRollover> {
771        let last_digest_end_memory_id: Option<i64> = sqlx::query_scalar(
772            "SELECT MAX(end_memory_id) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
773        )
774        .bind(namespace_id)
775        .bind(session_key)
776        .fetch_one(&self.pool)
777        .await
778        .map_err(db_error)?;
779
780        let (new_memory_count, estimated_new_tokens): (i64, i64) = sqlx::query_as(&format!(
781            r#"
782            SELECT
783                COUNT(*) as new_memory_count,
784                COALESCE(SUM((LENGTH(content) + 3) / 4), 0) as estimated_new_tokens
785            FROM memories
786            WHERE namespace_id = ?
787              AND is_active = 1
788              AND id > ?
789              AND (
790                  json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
791                  OR EXISTS (
792                      SELECT 1
793                      FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
794                      WHERE value = ?
795                  )
796              )
797              AND {RAW_ACTIVITY_FILTER_SQL}
798              AND COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level'), '') NOT IN ('raw', 'summary_short', 'summary_long')
799            "#,
800        ))
801        .bind(namespace_id)
802        .bind(last_digest_end_memory_id.unwrap_or(0))
803        .bind(session_key)
804        .bind(session_key)
805        .fetch_one(&self.pool)
806        .await
807        .map_err(db_error)?;
808
809        Ok(SessionDigestRollover {
810            last_digest_end_memory_id,
811            new_memory_count,
812            estimated_new_tokens,
813        })
814    }
815
    /// Get recent memories scoped to a perspective, optionally narrowed to a
    /// session (via the perspective's `session_key`).
    ///
    /// Convenience wrapper over [`Self::get_recent_by_perspective_opts`] with
    /// `include_raw = false`, so raw-activity hook noise is always excluded.
    pub async fn get_recent_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_recent_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
829
830    /// Like [`Self::get_recent_by_perspective`] but allows opting into raw-activity
831    /// noise inclusion.
832    pub async fn get_recent_by_perspective_opts(
833        &self,
834        namespace_id: i64,
835        perspective: &PerspectiveKey,
836        limit: i64,
837        include_raw: bool,
838    ) -> Result<Vec<Memory>> {
839        let noise_sql = if include_raw {
840            String::new()
841        } else {
842            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
843        };
844
845        let perspective_sql = perspective_where_clause(perspective);
846        let sql = format!(
847            r#"
848            SELECT * FROM memories
849            WHERE namespace_id = ?
850              AND is_active = 1
851              AND {perspective_sql}
852              {noise_sql}
853            ORDER BY created_at DESC
854            LIMIT ?
855            "#
856        );
857        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
858
859        for val in bind_perspective(perspective) {
860            query = query.bind(val);
861        }
862
863        let rows = query
864            .bind(limit)
865            .fetch_all(&self.pool)
866            .await
867            .map_err(db_error)?;
868
869        rows.into_iter()
870            .map(|row| self.row_to_memory(row))
871            .collect()
872    }
873
874    /// Get active memories by cognitive level ordered from newest to oldest.
875    pub async fn get_by_cognitive_level(
876        &self,
877        namespace_id: i64,
878        level: CognitiveLevel,
879        limit: i64,
880    ) -> Result<Vec<Memory>> {
881        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
882            r#"
883            SELECT * FROM memories
884            WHERE namespace_id = ?
885              AND is_active = 1
886              AND {} = ?
887            ORDER BY created_at DESC
888            LIMIT ?
889            "#,
890            COGNITIVE_LEVEL_EXPR,
891        ))
892        .bind(namespace_id)
893        .bind(level.as_str())
894        .bind(limit)
895        .fetch_all(&self.pool)
896        .await
897        .map_err(db_error)?;
898
899        rows.into_iter()
900            .map(|row| self.row_to_memory(row))
901            .collect()
902    }
903
904    /// Get active memories by cognitive level and perspective, ordered from newest to oldest.
905    ///
906    /// Unlike [`Self::get_by_cognitive_level`], this method applies perspective filtering
907    /// (observer, subject, session_key, and session_keys arrays) in the SQL query BEFORE
908    /// the LIMIT is applied, ensuring the caller receives up to `limit` matching results.
909    pub async fn get_by_cognitive_level_with_perspective(
910        &self,
911        namespace_id: i64,
912        level: CognitiveLevel,
913        perspective: &PerspectiveKey,
914        limit: i64,
915    ) -> Result<Vec<Memory>> {
916        let perspective_sql = perspective_where_clause(perspective);
917        let sql = format!(
918            r#"
919            SELECT * FROM memories
920            WHERE namespace_id = ?
921              AND is_active = 1
922              AND {COGNITIVE_LEVEL_EXPR} = ?
923              AND {perspective_sql}
924            ORDER BY created_at DESC
925            LIMIT ?
926            "#
927        );
928        let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
929            .bind(namespace_id)
930            .bind(level.as_str());
931
932        for val in bind_perspective(perspective) {
933            query = query.bind(val);
934        }
935
936        let rows = query
937            .bind(limit)
938            .fetch_all(&self.pool)
939            .await
940            .map_err(db_error)?;
941
942        rows.into_iter()
943            .map(|row| self.row_to_memory(row))
944            .collect()
945    }
946
947    /// Get the most reinforced perspective-aligned memories first.
948    ///
949    /// By default, raw-activity noise and contradiction memories are excluded.
950    pub async fn get_most_reinforced_by_perspective(
951        &self,
952        namespace_id: i64,
953        perspective: &PerspectiveKey,
954        limit: i64,
955    ) -> Result<Vec<Memory>> {
956        self.get_most_reinforced_by_perspective_opts(namespace_id, perspective, limit, false)
957            .await
958    }
959
960    /// Like [`Self::get_most_reinforced_by_perspective`] but allows opting into
961    /// raw-activity noise inclusion.
962    pub async fn get_most_reinforced_by_perspective_opts(
963        &self,
964        namespace_id: i64,
965        perspective: &PerspectiveKey,
966        limit: i64,
967        include_raw: bool,
968    ) -> Result<Vec<Memory>> {
969        let noise_sql = if include_raw {
970            String::new()
971        } else {
972            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
973        };
974
975        let perspective_sql = perspective_where_clause(perspective);
976        let sql = format!(
977            r#"
978            SELECT * FROM memories
979            WHERE namespace_id = ?
980              AND is_active = 1
981              AND {perspective_sql}
982              AND {COGNITIVE_LEVEL_EXPR} != ?
983              {noise_sql}
984            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_reinforced'), 0) DESC,
985                     created_at DESC
986            LIMIT ?
987            "#
988        );
989        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
990
991        for val in bind_perspective(perspective) {
992            query = query.bind(val);
993        }
994
995        let rows = query
996            .bind(CognitiveLevel::Contradiction.as_str())
997            .bind(limit)
998            .fetch_all(&self.pool)
999            .await
1000            .map_err(db_error)?;
1001
1002        rows.into_iter()
1003            .map(|row| self.row_to_memory(row))
1004            .collect()
1005    }
1006
1007    /// Get contradiction memories for the requested perspective.
1008    ///
1009    /// By default, raw-activity noise is excluded.
1010    pub async fn get_contradictions_by_perspective(
1011        &self,
1012        namespace_id: i64,
1013        perspective: &PerspectiveKey,
1014        limit: i64,
1015    ) -> Result<Vec<Memory>> {
1016        self.get_contradictions_by_perspective_opts(namespace_id, perspective, limit, false)
1017            .await
1018    }
1019
1020    /// Like [`Self::get_contradictions_by_perspective`] but allows opting into
1021    /// raw-activity noise inclusion.
1022    pub async fn get_contradictions_by_perspective_opts(
1023        &self,
1024        namespace_id: i64,
1025        perspective: &PerspectiveKey,
1026        limit: i64,
1027        include_raw: bool,
1028    ) -> Result<Vec<Memory>> {
1029        let noise_sql = if include_raw {
1030            String::new()
1031        } else {
1032            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1033        };
1034
1035        let perspective_sql = perspective_where_clause(perspective);
1036        let sql = format!(
1037            r#"
1038            SELECT * FROM memories
1039            WHERE namespace_id = ?
1040              AND is_active = 1
1041              AND {perspective_sql}
1042              AND {COGNITIVE_LEVEL_EXPR} = ?
1043              {noise_sql}
1044            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_contradicted'), 0) DESC,
1045                     created_at DESC
1046            LIMIT ?
1047            "#
1048        );
1049        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1050
1051        for val in bind_perspective(perspective) {
1052            query = query.bind(val);
1053        }
1054
1055        let rows = query
1056            .bind(CognitiveLevel::Contradiction.as_str())
1057            .bind(limit)
1058            .fetch_all(&self.pool)
1059            .await
1060            .map_err(db_error)?;
1061
1062        rows.into_iter()
1063            .map(|row| self.row_to_memory(row))
1064            .collect()
1065    }
1066
1067    // ---------------------------------------------------------------------------
1068    // Working-set assembly
1069    // ---------------------------------------------------------------------------
1070
1071    /// Assemble a bounded working set from multiple retrieval buckets.
1072    ///
1073    /// This is the low-level repository primitive used by higher-level services
1074    /// (e.g. `RepresentationService`) to build a [`WorkingRepresentation`].
1075    ///
1076    /// The method queries four buckets in sequence:
1077    ///
1078    /// 1. **Digests** — latest short + long session digests
1079    /// 2. **Reinforced** — highest reinforcement-scored memories
1080    /// 3. **Recent** — newest perspective-aligned memories
1081    /// 4. **Contradictions** — highest-confidence contradiction records
1082    ///
1083    /// Results are deduplicated by memory ID (first-seen-wins in bucket order)
1084    /// and truncated to `max_items`.
1085    pub async fn search_working_set(&self, params: WorkingSetParams<'_>) -> Result<Vec<Memory>> {
1086        let WorkingSetParams {
1087            namespace_id,
1088            perspective,
1089            max_items,
1090            include_raw,
1091        } = params;
1092
1093        // Each sub-query requests more than max_items so dedup still has enough
1094        // material after filtering.
1095        let per_bucket = (max_items as i64).max(8);
1096
1097        // 1. Digests
1098        let session = perspective
1099            .and_then(|p| p.session_key.as_deref())
1100            .unwrap_or("");
1101        let mut digests = Vec::new();
1102        if let Some(short) = self
1103            .latest_digest_for_session(namespace_id, session, "short")
1104            .await?
1105        {
1106            digests.push(short);
1107        }
1108        if let Some(long) = self
1109            .latest_digest_for_session(namespace_id, session, "long")
1110            .await?
1111        {
1112            digests.push(long);
1113        }
1114
1115        // 2-4. Perspective or namespace-level queries
1116        let reinforced = if let Some(persp) = perspective {
1117            self.get_most_reinforced_by_perspective_opts(
1118                namespace_id,
1119                persp,
1120                per_bucket,
1121                include_raw,
1122            )
1123            .await?
1124        } else {
1125            self.get_most_reinforced_by_namespace(namespace_id, per_bucket, include_raw)
1126                .await?
1127        };
1128
1129        let recent = if let Some(persp) = perspective {
1130            self.get_recent_by_perspective_opts(namespace_id, persp, per_bucket, include_raw)
1131                .await?
1132        } else {
1133            let sql = if include_raw {
1134                "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ?"
1135                    .to_string()
1136            } else {
1137                format!(
1138                    "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 AND {} ORDER BY created_at DESC LIMIT ?",
1139                    RAW_ACTIVITY_FILTER_SQL,
1140                )
1141            };
1142            let rows: Vec<MemoryRow> = sqlx::query_as(&sql)
1143                .bind(namespace_id)
1144                .bind(per_bucket)
1145                .fetch_all(&self.pool)
1146                .await
1147                .map_err(db_error)?;
1148            rows.into_iter()
1149                .map(|r| self.row_to_memory(r))
1150                .collect::<Result<Vec<_>>>()?
1151        };
1152
1153        let contradictions = if let Some(persp) = perspective {
1154            self.get_contradictions_by_perspective_opts(
1155                namespace_id,
1156                persp,
1157                per_bucket,
1158                include_raw,
1159            )
1160            .await?
1161        } else {
1162            self.get_contradictions_by_namespace(namespace_id, per_bucket, include_raw)
1163                .await?
1164        };
1165
1166        // Merge: digests → reinforced → recent → contradictions, dedupe by ID
1167        let mut seen = std::collections::HashSet::new();
1168        let mut result = Vec::with_capacity(max_items);
1169
1170        for memory in digests
1171            .into_iter()
1172            .chain(reinforced)
1173            .chain(recent)
1174            .chain(contradictions)
1175        {
1176            if seen.insert(memory.id) {
1177                result.push(memory);
1178                if result.len() >= max_items {
1179                    break;
1180                }
1181            }
1182        }
1183
1184        Ok(result)
1185    }
1186
1187    /// Load all evidence lineage entries for a given memory (as derived or source).
1188    pub async fn load_lineage(&self, memory_id: i64) -> Result<Vec<MemoryLineageEntry>> {
1189        let rows: Vec<MemoryEvidenceRow> = sqlx::query_as::<_, MemoryEvidenceRow>(
1190            r#"
1191            SELECT * FROM memory_evidence
1192            WHERE derived_memory_id = ? OR source_memory_id = ?
1193            ORDER BY created_at ASC
1194            "#,
1195        )
1196        .bind(memory_id)
1197        .bind(memory_id)
1198        .fetch_all(&self.pool)
1199        .await
1200        .map_err(db_error)?;
1201
1202        Ok(rows
1203            .into_iter()
1204            .map(|r| MemoryLineageEntry {
1205                derived_memory_id: r.derived_memory_id,
1206                source_memory_id: r.source_memory_id,
1207                evidence_role: r.evidence_role,
1208            })
1209            .collect())
1210    }
1211
1212    /// Load lineage rows for many source/derived memory IDs in one query.
1213    pub async fn load_lineage_batch(
1214        &self,
1215        memory_ids: &[i64],
1216    ) -> Result<HashMap<i64, Vec<MemoryLineageEntry>>> {
1217        if memory_ids.is_empty() {
1218            return Ok(HashMap::new());
1219        }
1220
1221        let placeholders = memory_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1222        let sql = format!(
1223            r#"
1224            SELECT * FROM memory_evidence
1225            WHERE derived_memory_id IN ({placeholders})
1226               OR source_memory_id IN ({placeholders})
1227            ORDER BY created_at ASC
1228            "#
1229        );
1230
1231        let mut query = sqlx::query_as::<_, MemoryEvidenceRow>(&sql);
1232        for id in memory_ids {
1233            query = query.bind(*id);
1234        }
1235        for id in memory_ids {
1236            query = query.bind(*id);
1237        }
1238
1239        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1240        let id_set: HashSet<i64> = memory_ids.iter().copied().collect();
1241        let mut grouped: HashMap<i64, Vec<MemoryLineageEntry>> = HashMap::new();
1242
1243        for row in rows {
1244            let entry = MemoryLineageEntry {
1245                derived_memory_id: row.derived_memory_id,
1246                source_memory_id: row.source_memory_id,
1247                evidence_role: row.evidence_role,
1248            };
1249
1250            if id_set.contains(&entry.derived_memory_id) {
1251                grouped
1252                    .entry(entry.derived_memory_id)
1253                    .or_default()
1254                    .push(entry.clone());
1255            }
1256            if id_set.contains(&entry.source_memory_id) {
1257                grouped
1258                    .entry(entry.source_memory_id)
1259                    .or_default()
1260                    .push(entry);
1261            }
1262        }
1263
1264        Ok(grouped)
1265    }
1266
1267    /// Get a memory by ID
1268    pub async fn get_by_id(&self, id: i64) -> Result<Option<Memory>> {
1269        let row: Option<MemoryRow> = sqlx::query_as("SELECT * FROM memories WHERE id = ?")
1270            .bind(id)
1271            .fetch_optional(&self.pool)
1272            .await
1273            .map_err(db_error)?;
1274
1275        row.map(|r| self.row_to_memory(r)).transpose()
1276    }
1277
1278    /// Get multiple memories by their IDs (batch fetch).
1279    ///
1280    /// Returns only memories that exist; the result Vec length may be less than
1281    /// the input slice length if some IDs are not found.
1282    pub async fn get_by_ids(&self, ids: &[i64]) -> Result<Vec<Memory>> {
1283        if ids.is_empty() {
1284            return Ok(Vec::new());
1285        }
1286
1287        // Build parameter placeholders: (?, ?, ?, ...)
1288        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1289
1290        let sql = format!("SELECT * FROM memories WHERE id IN ({placeholders})");
1291
1292        let mut query = sqlx::query_as::<_, MemoryRow>(&sql);
1293        for id in ids {
1294            query = query.bind(*id);
1295        }
1296
1297        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1298
1299        let mut memories: Vec<Memory> = Vec::with_capacity(rows.len());
1300        for row in rows {
1301            memories.push(self.row_to_memory(row)?);
1302        }
1303
1304        Ok(memories)
1305    }
1306
1307    /// Get a memory by namespace and content (fallback for id 0 edge case)
1308    pub async fn get_by_content(&self, namespace_id: i64, content: &str) -> Result<Memory> {
1309        let row: Option<MemoryRow> = sqlx::query_as(
1310            "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
1311        )
1312        .bind(namespace_id)
1313        .bind(content)
1314        .fetch_optional(&self.pool)
1315        .await
1316        .map_err(db_error)?;
1317
1318        row.map(|r| self.row_to_memory(r))
1319            .transpose()?
1320            .ok_or_else(|| {
1321                nexus_core::NexusError::Storage(
1322                    "No memories found in namespace after insert".to_string(),
1323                )
1324            })
1325    }
1326
1327    /// Search memories by namespace
1328    pub async fn search_by_namespace(
1329        &self,
1330        namespace_id: i64,
1331        limit: usize,
1332        offset: usize,
1333    ) -> Result<Vec<Memory>> {
1334        let rows: Vec<MemoryRow> = sqlx::query_as(
1335            "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ?"
1336        )
1337        .bind(namespace_id)
1338        .bind(limit as i64)
1339        .bind(offset as i64)
1340        .fetch_all(&self.pool)
1341        .await
1342        .map_err(db_error)?;
1343
1344        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1345    }
1346
1347    /// Count memories in namespace
1348    pub async fn count_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1349        let count: (i64,) = sqlx::query_as(
1350            "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_active = 1",
1351        )
1352        .bind(namespace_id)
1353        .fetch_one(&self.pool)
1354        .await
1355        .map_err(db_error)?;
1356
1357        Ok(count.0)
1358    }
1359
1360    /// Count all memories in namespace (including inactive/archived)
1361    pub async fn count_all_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1362        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM memories WHERE namespace_id = ?")
1363            .bind(namespace_id)
1364            .fetch_one(&self.pool)
1365            .await
1366            .map_err(db_error)?;
1367
1368        Ok(count.0)
1369    }
1370
1371    /// Count archived memories in namespace
1372    pub async fn count_archived_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1373        let count: (i64,) = sqlx::query_as(
1374            "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_archived = 1",
1375        )
1376        .bind(namespace_id)
1377        .fetch_one(&self.pool)
1378        .await
1379        .map_err(db_error)?;
1380
1381        Ok(count.0)
1382    }
1383
1384    /// Delete a memory
1385    pub async fn delete(&self, id: i64) -> Result<bool> {
1386        let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1387            .bind(id)
1388            .execute(&self.pool)
1389            .await
1390            .map_err(db_error)?;
1391
1392        Ok(result.rows_affected() > 0)
1393    }
1394
1395    /// Update access count
1396    pub async fn touch(&self, id: i64) -> Result<()> {
1397        sqlx::query(
1398            "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
1399        )
1400        .bind(Utc::now())
1401        .bind(id)
1402        .execute(&self.pool)
1403        .await
1404        .map_err(db_error)?;
1405
1406        Ok(())
1407    }
1408
1409    /// Get unconsolidated memories
1410    pub async fn get_unconsolidated(
1411        &self,
1412        namespace_id: i64,
1413        limit: i32,
1414    ) -> Result<Vec<MemoryRow>> {
1415        let rows = sqlx::query_as::<_, MemoryRow>(
1416            r#"
1417            SELECT * FROM memories
1418            WHERE namespace_id = ?
1419            AND is_active = 1
1420            AND (metadata IS NULL OR json_extract(metadata, '$.agent.consolidated') IS NULL)
1421            ORDER BY created_at ASC
1422            LIMIT ?
1423            "#,
1424        )
1425        .bind(namespace_id)
1426        .bind(limit)
1427        .fetch_all(&self.pool)
1428        .await
1429        .map_err(db_error)?;
1430
1431        Ok(rows)
1432    }
1433
1434    /// Mark a memory as consolidated
1435    pub async fn mark_consolidated(&self, id: i64) -> Result<()> {
1436        sqlx::query(
1437            r#"
1438            UPDATE memories
1439            SET metadata = json_set(
1440                COALESCE(metadata, '{}'),
1441                '$.agent.consolidated',
1442                true,
1443                '$.agent.consolidated_at',
1444                datetime('now')
1445            ),
1446            updated_at = datetime('now')
1447            WHERE id = ?
1448            "#,
1449        )
1450        .bind(id)
1451        .execute(&self.pool)
1452        .await
1453        .map_err(db_error)?;
1454
1455        Ok(())
1456    }
1457
1458    /// Mark multiple memories as consolidated in a single query
1459    pub async fn mark_consolidated_batch(&self, ids: &[i64]) -> Result<()> {
1460        if ids.is_empty() {
1461            return Ok(());
1462        }
1463        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1464        let query = format!(
1465            r#"
1466            UPDATE memories
1467            SET metadata = json_set(
1468                COALESCE(metadata, '{{}}'),
1469                '$.agent.consolidated',
1470                true,
1471                '$.agent.consolidated_at',
1472                datetime('now')
1473            ),
1474            updated_at = datetime('now')
1475            WHERE id IN ({})
1476            "#,
1477            placeholders
1478        );
1479        let mut q = sqlx::query(&query);
1480        for id in ids {
1481            q = q.bind(*id);
1482        }
1483        q.execute(&self.pool).await.map_err(db_error)?;
1484        Ok(())
1485    }
1486
1487    /// Search memories by text content (LIKE search)
1488    pub async fn search_by_text(
1489        &self,
1490        namespace_id: i64,
1491        query: &str,
1492        limit: i32,
1493        include_raw: bool,
1494    ) -> Result<Vec<MemoryRow>> {
1495        let pattern = format!("%{}%", query);
1496        let raw_clause = if include_raw {
1497            String::new()
1498        } else {
1499            format!("AND {RAW_ACTIVITY_FILTER_SQL}")
1500        };
1501        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
1502            r#"
1503            SELECT * FROM memories
1504            WHERE namespace_id = ?
1505            AND is_active = 1
1506            AND content LIKE ?
1507            {}
1508            ORDER BY updated_at DESC
1509            LIMIT ?
1510            "#,
1511            raw_clause
1512        ))
1513        .bind(namespace_id)
1514        .bind(&pattern)
1515        .bind(limit)
1516        .fetch_all(&self.pool)
1517        .await
1518        .map_err(db_error)?;
1519
1520        Ok(rows)
1521    }
1522
1523    /// Search memories by text content and return domain memories.
1524    pub async fn search_by_text_memories(
1525        &self,
1526        namespace_id: i64,
1527        query: &str,
1528        limit: i32,
1529        include_raw: bool,
1530    ) -> Result<Vec<Memory>> {
1531        let rows = self
1532            .search_by_text(namespace_id, query, limit, include_raw)
1533            .await?;
1534        rows.into_iter()
1535            .map(|row| self.row_to_memory(row))
1536            .collect()
1537    }
1538
    /// Fetch recent, embedding-bearing cognition memories for vector-first semantic recall.
    ///
    /// Only active rows with a non-NULL `content_embedding` are returned,
    /// ordered by `updated_at` then `created_at` descending and capped at
    /// `params.limit`. When a perspective is supplied, rows are filtered on the
    /// metadata observer/subject (and on the session key — scalar field or
    /// `session_keys` array — when present); otherwise the whole namespace is
    /// scanned. Raw-activity noise is excluded unless `params.include_raw` is set.
    pub async fn get_semantic_candidates(
        &self,
        params: SemanticCandidateParams<'_>,
    ) -> Result<Vec<Memory>> {
        let SemanticCandidateParams {
            namespace_id,
            perspective,
            limit,
            include_raw,
        } = params;

        // Optional noise-exclusion clause shared by all SQL variants below.
        let noise_sql = if include_raw {
            String::new()
        } else {
            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
        };

        let rows = if let Some(perspective) = perspective {
            // Perspective-scoped variants. NOTE: '{{}}' is format! escaping for
            // a literal '{}' default-metadata object in the SQL.
            let sql = if perspective.session_key.is_some() {
                // Variant with a session filter: the key matches either the
                // scalar `session_key` field or any element of `session_keys`.
                format!(
                    r#"
                    SELECT * FROM memories
                    WHERE namespace_id = ?
                      AND is_active = 1
                      AND content_embedding IS NOT NULL
                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
                      AND (
                          json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
                          OR EXISTS (
                              SELECT 1
                              FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
                              WHERE value = ?
                          )
                      )
                      {noise_sql}
                    ORDER BY updated_at DESC, created_at DESC
                    LIMIT ?
                    "#
                )
            } else {
                // Variant without a session filter: observer/subject only.
                format!(
                    r#"
                    SELECT * FROM memories
                    WHERE namespace_id = ?
                      AND is_active = 1
                      AND content_embedding IS NOT NULL
                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
                      {noise_sql}
                    ORDER BY updated_at DESC, created_at DESC
                    LIMIT ?
                    "#
                )
            };

            let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
                .bind(namespace_id)
                .bind(&perspective.observer)
                .bind(&perspective.subject);

            // The session key appears twice in the SQL (scalar match and
            // json_each probe), so it is bound twice.
            if let Some(session_key) = &perspective.session_key {
                query = query.bind(session_key);
                query = query.bind(session_key);
            }

            query
                .bind(limit)
                .fetch_all(&self.pool)
                .await
                .map_err(db_error)?
        } else {
            // Namespace-wide variants (no perspective filtering).
            let sql = if include_raw {
                r#"
                SELECT * FROM memories
                WHERE namespace_id = ?
                  AND is_active = 1
                  AND content_embedding IS NOT NULL
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
                "#
                .to_string()
            } else {
                format!(
                    r#"
                    SELECT * FROM memories
                    WHERE namespace_id = ?
                      AND is_active = 1
                      AND content_embedding IS NOT NULL
                      AND {}
                    ORDER BY updated_at DESC, created_at DESC
                    LIMIT ?
                    "#,
                    RAW_ACTIVITY_FILTER_SQL,
                )
            };

            sqlx::query_as::<_, MemoryRow>(&sql)
                .bind(namespace_id)
                .bind(limit)
                .fetch_all(&self.pool)
                .await
                .map_err(db_error)?
        };

        rows.into_iter()
            .map(|row| self.row_to_memory(row))
            .collect()
    }
1649
1650    /// List memories with optional filters
1651    pub async fn list_filtered(
1652        &self,
1653        namespace_id: i64,
1654        filters: ListMemoryFilters<'_>,
1655    ) -> Result<Vec<Memory>> {
1656        // Build WHERE clause dynamically
1657        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1658        let mut param_idx = 2u32;
1659
1660        if filters.category.is_some() {
1661            conditions.push(format!("category = ?{}", param_idx));
1662            param_idx += 1;
1663        }
1664        if filters.since.is_some() {
1665            conditions.push(format!("created_at >= ?{}", param_idx));
1666            param_idx += 1;
1667        }
1668        if filters.until.is_some() {
1669            conditions.push(format!("created_at <= ?{}", param_idx));
1670            param_idx += 1;
1671        }
1672        if filters.content_like.is_some() {
1673            conditions.push(format!("content LIKE ?{}", param_idx));
1674            param_idx += 1;
1675        }
1676        if !filters.include_raw {
1677            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1678        }
1679
1680        let sql = format!(
1681            "SELECT * FROM memories WHERE {} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
1682            conditions.join(" AND "),
1683            param_idx,
1684            param_idx + 1,
1685        );
1686
1687        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1688
1689        if let Some(cat) = filters.category {
1690            query = query.bind(cat.to_string());
1691        }
1692        if let Some(s) = filters.since {
1693            query = query.bind(s);
1694        }
1695        if let Some(u) = filters.until {
1696            query = query.bind(u);
1697        }
1698        if let Some(search) = filters.content_like {
1699            query = query.bind(format!("%{}%", search));
1700        }
1701
1702        let rows: Vec<MemoryRow> = query
1703            .bind(filters.limit)
1704            .bind(filters.offset)
1705            .fetch_all(&self.pool)
1706            .await
1707            .map_err(db_error)?;
1708
1709        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1710    }
1711
1712    /// List memories that still need cognition metadata backfilled.
1713    pub async fn list_missing_cognitive_metadata(
1714        &self,
1715        namespace_id: i64,
1716        limit: i64,
1717        offset: i64,
1718    ) -> Result<Vec<Memory>> {
1719        let rows: Vec<MemoryRow> = sqlx::query_as(
1720            r#"
1721            SELECT * FROM memories
1722            WHERE namespace_id = ?
1723            AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1724            ORDER BY id ASC
1725            LIMIT ? OFFSET ?
1726            "#,
1727        )
1728        .bind(namespace_id)
1729        .bind(limit)
1730        .bind(offset)
1731        .fetch_all(&self.pool)
1732        .await
1733        .map_err(db_error)?;
1734
1735        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1736    }
1737
1738    /// Count memories that still need cognition metadata backfilled.
1739    pub async fn count_missing_cognitive_metadata(&self, namespace_id: i64) -> Result<i64> {
1740        let count: i64 = sqlx::query_scalar(
1741            r#"
1742            SELECT COUNT(*) FROM memories
1743            WHERE namespace_id = ?
1744            AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1745            "#,
1746        )
1747        .bind(namespace_id)
1748        .fetch_one(&self.pool)
1749        .await
1750        .map_err(db_error)?;
1751
1752        Ok(count)
1753    }
1754
1755    /// Replace the metadata blob for a single memory.
1756    pub async fn update_memory_metadata(
1757        &self,
1758        memory_id: i64,
1759        metadata: &serde_json::Value,
1760    ) -> Result<()> {
1761        let metadata_json = serde_json::to_string(metadata)?;
1762        sqlx::query(
1763            r#"
1764            UPDATE memories
1765            SET metadata = ?, updated_at = ?
1766            WHERE id = ?
1767            "#,
1768        )
1769        .bind(metadata_json)
1770        .bind(Utc::now())
1771        .bind(memory_id)
1772        .execute(&self.pool)
1773        .await
1774        .map_err(db_error)?;
1775
1776        Ok(())
1777    }
1778
    /// List distinct session keys that have cognitive metadata but no digest coverage yet.
    ///
    /// A session key qualifies when at least one memory in the namespace
    /// carries a non-empty `$.cognitive.session_key` and no `session_digests`
    /// row exists for that (namespace, session_key) pair. Results are sorted
    /// ascending and capped at `limit`.
    ///
    /// NOTE(review): unlike `count_distinct_session_keys_with_cognition`,
    /// this query does not filter on `is_active = 1`, so keys whose memories
    /// are all inactive/archived can still be returned — confirm intended.
    pub async fn list_session_keys_without_digests(
        &self,
        namespace_id: i64,
        limit: i64,
    ) -> Result<Vec<String>> {
        let rows: Vec<(String,)> = sqlx::query_as(
            r#"
            SELECT DISTINCT json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') AS session_key
            FROM memories m
            WHERE m.namespace_id = ?
            AND json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
            AND TRIM(json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')) <> ''
            AND NOT EXISTS (
                SELECT 1 FROM session_digests sd
                WHERE sd.namespace_id = m.namespace_id
                AND sd.session_key = json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')
            )
            ORDER BY session_key ASC
            LIMIT ?
            "#,
        )
        .bind(namespace_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(db_error)?;

        // Unwrap the single-column tuples returned by query_as.
        Ok(rows.into_iter().map(|(session_key,)| session_key).collect())
    }
1809
1810    /// Count distinct non-empty cognitive session keys present in active memories.
1811    pub async fn count_distinct_session_keys_with_cognition(
1812        &self,
1813        namespace_id: i64,
1814    ) -> Result<i64> {
1815        let count: i64 = sqlx::query_scalar(
1816            r#"
1817            SELECT COUNT(DISTINCT json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key'))
1818            FROM memories
1819            WHERE namespace_id = ?
1820              AND is_active = 1
1821              AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1822              AND TRIM(json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key')) <> ''
1823            "#,
1824        )
1825        .bind(namespace_id)
1826        .fetch_one(&self.pool)
1827        .await
1828        .map_err(db_error)?;
1829
1830        Ok(count)
1831    }
1832
1833    /// List lineage-backed archived raw-activity memories that are safe to prune.
1834    pub async fn list_archived_raw_cleanup_candidates(
1835        &self,
1836        namespace_id: i64,
1837        older_than: DateTime<Utc>,
1838        limit: i64,
1839    ) -> Result<Vec<Memory>> {
1840        let rows: Vec<MemoryRow> = sqlx::query_as(
1841            r#"
1842            SELECT * FROM memories
1843            WHERE namespace_id = ?
1844            AND is_active = 0
1845            AND is_archived = 1
1846            AND (
1847                labels LIKE '%raw-activity%'
1848                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1849            )
1850            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1851            AND created_at <= ?
1852            ORDER BY created_at ASC
1853            LIMIT ?
1854            "#,
1855        )
1856        .bind(namespace_id)
1857        .bind(older_than)
1858        .bind(limit)
1859        .fetch_all(&self.pool)
1860        .await
1861        .map_err(db_error)?;
1862
1863        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1864    }
1865
1866    /// Count lineage-backed archived raw-activity memories that are safe to prune.
1867    pub async fn count_archived_raw_cleanup_candidates(
1868        &self,
1869        namespace_id: i64,
1870        older_than: DateTime<Utc>,
1871    ) -> Result<i64> {
1872        let count: i64 = sqlx::query_scalar(
1873            r#"
1874            SELECT COUNT(*) FROM memories
1875            WHERE namespace_id = ?
1876            AND is_active = 0
1877            AND is_archived = 1
1878            AND (
1879                labels LIKE '%raw-activity%'
1880                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1881            )
1882            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1883            AND created_at <= ?
1884            "#,
1885        )
1886        .bind(namespace_id)
1887        .bind(older_than)
1888        .fetch_one(&self.pool)
1889        .await
1890        .map_err(db_error)?;
1891
1892        Ok(count)
1893    }
1894
1895    /// Delete a batch of memories by id.
1896    pub async fn delete_batch(&self, ids: &[i64]) -> Result<u64> {
1897        if ids.is_empty() {
1898            return Ok(0);
1899        }
1900
1901        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1902        let sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
1903        let mut query = sqlx::query(&sql);
1904        for id in ids {
1905            query = query.bind(*id);
1906        }
1907
1908        let result = query.execute(&self.pool).await.map_err(db_error)?;
1909        Ok(result.rows_affected())
1910    }
1911
1912    /// Delete memories matching a content pattern (for cleaning noise)
1913    pub async fn delete_by_content_pattern(&self, namespace_id: i64, pattern: &str) -> Result<u64> {
1914        let result = sqlx::query("DELETE FROM memories WHERE namespace_id = ? AND content LIKE ?")
1915            .bind(namespace_id)
1916            .bind(pattern)
1917            .execute(&self.pool)
1918            .await
1919            .map_err(db_error)?;
1920
1921        Ok(result.rows_affected())
1922    }
1923
1924    /// Count memories matching filters (for stats with time ranges)
1925    pub async fn count_filtered(
1926        &self,
1927        namespace_id: i64,
1928        category: Option<&str>,
1929        since: Option<DateTime<Utc>>,
1930        until: Option<DateTime<Utc>>,
1931        include_raw: bool,
1932    ) -> Result<i64> {
1933        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1934        let mut param_idx = 2u32;
1935
1936        if category.is_some() {
1937            conditions.push(format!("category = ?{}", param_idx));
1938            param_idx += 1;
1939        }
1940        if since.is_some() {
1941            conditions.push(format!("created_at >= ?{}", param_idx));
1942            param_idx += 1;
1943        }
1944        if until.is_some() {
1945            conditions.push(format!("created_at <= ?{}", param_idx));
1946        }
1947        if !include_raw {
1948            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1949        }
1950
1951        let sql = format!(
1952            "SELECT COUNT(*) FROM memories WHERE {}",
1953            conditions.join(" AND "),
1954        );
1955
1956        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
1957
1958        if let Some(cat) = category {
1959            query = query.bind(cat.to_string());
1960        }
1961        if let Some(s) = since {
1962            query = query.bind(s);
1963        }
1964        if let Some(u) = until {
1965            query = query.bind(u);
1966        }
1967
1968        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
1969        Ok(count)
1970    }
1971
    /// Store a distilled summary and archive its source memories atomically.
    ///
    /// In one transaction:
    /// 1. inserts the summary memory described by `params`;
    /// 2. records a `memory_evidence` edge (role `'source'`) from the summary
    ///    to each id in `source_ids`;
    /// 3. deactivates + archives every source memory, stamping its metadata
    ///    with `$.distillation.{status,summary_memory_id,archived_at}`.
    ///
    /// Returns the stored (or duplicate-merged) summary, re-read by id after
    /// commit; errors if that re-read finds nothing.
    pub async fn store_distilled_summary(
        &self,
        params: StoreMemoryParams<'_>,
        source_ids: &[i64],
    ) -> Result<Memory> {
        // Serialize JSON columns up front so serialization failures abort
        // before the transaction is even opened.
        let labels_json = serde_json::to_string(params.labels)?;
        let metadata_json = serde_json::to_string(params.metadata)?;
        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
        let mut tx = self.pool.begin().await.map_err(db_error)?;

        let result = sqlx::query(
            r#"
            INSERT INTO memories (
                namespace_id, content, category, memory_lane_type, labels, metadata,
                content_embedding, embedding_model, created_at, is_active, access_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
            "#,
        )
        .bind(params.namespace_id)
        .bind(params.content)
        .bind(params.category.to_string())
        .bind(params.memory_lane_type.map(|t| t.to_string()))
        .bind(&labels_json)
        .bind(&metadata_json)
        .bind(&embedding_json)
        .bind(params.embedding_model)
        .bind(Utc::now())
        .execute(&mut *tx)
        .await
        .map_err(db_error)?;

        // Rowid 0 means no new row was created — presumably a dedup trigger
        // merged the insert into an existing memory (same handling as
        // `insert_memory_tx`). Fall back to locating that row by trimmed,
        // case-insensitive content match.
        let summary_id = if result.last_insert_rowid() == 0 {
            let row: Option<MemoryRow> = sqlx::query_as(
                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) ORDER BY created_at DESC LIMIT 1",
            )
            .bind(params.namespace_id)
            .bind(params.content)
            .fetch_optional(&mut *tx)
            .await
            .map_err(db_error)?;
            row.map(|memory| memory.id).ok_or_else(|| {
                nexus_core::NexusError::Storage(
                    "Duplicate distilled summary merged but matching row not found".to_string(),
                )
            })?
        } else {
            result.last_insert_rowid()
        };

        if !source_ids.is_empty() {
            // Lineage edges: INSERT OR IGNORE keeps re-runs idempotent.
            for source_id in source_ids {
                sqlx::query(
                    r#"
                    INSERT OR IGNORE INTO memory_evidence (
                        derived_memory_id,
                        source_memory_id,
                        evidence_role,
                        created_at
                    ) VALUES (?, ?, 'source', datetime('now'))
                    "#,
                )
                .bind(summary_id)
                .bind(*source_id)
                .execute(&mut *tx)
                .await
                .map_err(db_error)?;
            }

            // Archive all sources in a single UPDATE. `{{}}` renders as the
            // literal `{}` JSON object once `format!` has run.
            let placeholders = source_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
            let sql = format!(
                r#"
                UPDATE memories
                SET
                    is_active = 0,
                    is_archived = 1,
                    updated_at = ?,
                    metadata = json_set(
                        COALESCE(metadata, '{{}}'),
                        '$.distillation.status', 'archived',
                        '$.distillation.summary_memory_id', ?,
                        '$.distillation.archived_at', ?
                    )
                WHERE id IN ({})
                "#,
                placeholders
            );
            let archived_at = Utc::now().to_rfc3339();
            let mut query = sqlx::query(&sql)
                .bind(Utc::now())
                .bind(summary_id)
                .bind(&archived_at);
            for source_id in source_ids {
                query = query.bind(*source_id);
            }
            query.execute(&mut *tx).await.map_err(db_error)?;
        }

        tx.commit().await.map_err(db_error)?;
        // Re-read after commit so callers receive the canonical stored row.
        self.get_by_id(summary_id).await?.ok_or_else(|| {
            nexus_core::NexusError::Storage(format!(
                "Failed to retrieve distilled summary with id {}",
                summary_id
            ))
        })
    }
2078
2079    fn row_to_memory(&self, row: MemoryRow) -> Result<Memory> {
2080        let labels: Vec<String> = serde_json::from_str(&row.labels).map_err(|e| {
2081            nexus_core::NexusError::Storage(format!(
2082                "corrupted labels JSON for memory {}: {e}",
2083                row.id
2084            ))
2085        })?;
2086        let metadata: serde_json::Value = serde_json::from_str(&row.metadata).map_err(|e| {
2087            nexus_core::NexusError::Storage(format!(
2088                "corrupted metadata JSON for memory {}: {e}",
2089                row.id
2090            ))
2091        })?;
2092        let embedding: Option<Vec<f32>> = row
2093            .content_embedding
2094            .map(|e| {
2095                serde_json::from_str(&e).map_err(|err| {
2096                    nexus_core::NexusError::Storage(format!(
2097                        "corrupted embedding JSON for memory {}: {err}",
2098                        row.id
2099                    ))
2100                })
2101            })
2102            .transpose()?;
2103
2104        Ok(Memory {
2105            id: row.id,
2106            namespace_id: row.namespace_id,
2107            content: row.content,
2108            category: parse_category(&row.category)?,
2109            memory_lane_type: match &row.memory_lane_type {
2110                Some(s) => parse_memory_lane_type(s)?,
2111                None => None,
2112            },
2113            labels,
2114            metadata,
2115            similarity_score: row.similarity_score,
2116            relevance_score: row.relevance_score,
2117            content_embedding: embedding,
2118            embedding_model: row.embedding_model,
2119            created_at: row.created_at,
2120            updated_at: row.updated_at,
2121            last_accessed: row.last_accessed,
2122            is_active: row.is_active,
2123            is_archived: row.is_archived,
2124            access_count: row.access_count,
2125        })
2126    }
2127
2128    // ---- Observability queries ----
2129
2130    /// List memory jobs with optional filters.
2131    pub async fn list_jobs(
2132        &self,
2133        namespace_id: i64,
2134        job_type: Option<&str>,
2135        status: Option<&str>,
2136        limit: i64,
2137        offset: i64,
2138    ) -> Result<Vec<MemoryJobRow>> {
2139        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2140        if job_type.is_some() {
2141            where_clauses.push("job_type = ?".to_string());
2142        }
2143        if status.is_some() {
2144            where_clauses.push("status = ?".to_string());
2145        }
2146        let where_sql = where_clauses.join(" AND ");
2147
2148        let sql = format!(
2149            "SELECT * FROM memory_jobs WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
2150            where_sql
2151        );
2152
2153        let mut query = sqlx::query_as::<_, MemoryJobRow>(&sql).bind(namespace_id);
2154        if let Some(jt) = job_type {
2155            query = query.bind(jt);
2156        }
2157        if let Some(st) = status {
2158            query = query.bind(st);
2159        }
2160        query = query.bind(limit).bind(offset);
2161
2162        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2163        Ok(rows)
2164    }
2165
2166    /// Count memory jobs with optional filters.
2167    pub async fn count_jobs(
2168        &self,
2169        namespace_id: i64,
2170        job_type: Option<&str>,
2171        status: Option<&str>,
2172    ) -> Result<i64> {
2173        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2174        if job_type.is_some() {
2175            where_clauses.push("job_type = ?".to_string());
2176        }
2177        if status.is_some() {
2178            where_clauses.push("status = ?".to_string());
2179        }
2180        let where_sql = where_clauses.join(" AND ");
2181
2182        let sql = format!("SELECT COUNT(*) FROM memory_jobs WHERE {}", where_sql);
2183
2184        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
2185        if let Some(jt) = job_type {
2186            query = query.bind(jt);
2187        }
2188        if let Some(st) = status {
2189            query = query.bind(st);
2190        }
2191
2192        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
2193        Ok(count)
2194    }
2195
2196    /// Count memory jobs grouped by status for a namespace.
2197    pub async fn count_jobs_by_status(
2198        &self,
2199        namespace_id: i64,
2200        job_type: Option<&str>,
2201    ) -> Result<Vec<(String, i64)>> {
2202        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2203        if job_type.is_some() {
2204            where_clauses.push("job_type = ?".to_string());
2205        }
2206        let where_sql = where_clauses.join(" AND ");
2207
2208        let sql = format!(
2209            "SELECT status, COUNT(*) as cnt FROM memory_jobs WHERE {} GROUP BY status",
2210            where_sql
2211        );
2212
2213        let mut query = sqlx::query_as::<_, (String, i64)>(&sql).bind(namespace_id);
2214        if let Some(jt) = job_type {
2215            query = query.bind(jt);
2216        }
2217
2218        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2219        Ok(rows)
2220    }
2221
2222    /// Delete completed jobs that were last updated before the given timestamp.
2223    ///
2224    /// Returns the number of rows removed.
2225    pub async fn purge_completed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2226        let result = sqlx::query(
2227            r#"
2228            DELETE FROM memory_jobs
2229            WHERE status = ? AND updated_at < ?
2230            "#,
2231        )
2232        .bind(memory_job_status::COMPLETED)
2233        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2234        .execute(&self.pool)
2235        .await
2236        .map_err(db_error)?;
2237
2238        Ok(result.rows_affected())
2239    }
2240
    /// Delete permanently failed jobs (attempts >= `MAX_JOB_ATTEMPTS`) that
    /// were last updated before the given timestamp.
    ///
    /// The cutoff is bound as a `YYYY-MM-DD HH:MM:SS` text string, matching
    /// the binding used by `purge_completed_jobs`.
    ///
    /// Returns the number of rows removed.
    pub async fn purge_permanently_failed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
        let result = sqlx::query(
            r#"
            DELETE FROM memory_jobs
            WHERE status = ? AND attempts >= ? AND updated_at < ?
            "#,
        )
        .bind(memory_job_status::FAILED)
        .bind(MAX_JOB_ATTEMPTS)
        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
        .execute(&self.pool)
        .await
        .map_err(db_error)?;

        Ok(result.rows_affected())
    }
2261
2262    /// List session digests with optional session_key filter.
2263    pub async fn list_digests(
2264        &self,
2265        namespace_id: i64,
2266        session_key: Option<&str>,
2267        limit: i64,
2268        offset: i64,
2269    ) -> Result<Vec<SessionDigestRow>> {
2270        let mut query = if let Some(sk) = session_key {
2271            sqlx::query_as::<_, SessionDigestRow>(
2272                "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2273            )
2274            .bind(namespace_id)
2275            .bind(sk)
2276        } else {
2277            sqlx::query_as::<_, SessionDigestRow>(
2278                "SELECT * FROM session_digests WHERE namespace_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2279            )
2280            .bind(namespace_id)
2281        };
2282
2283        query = query.bind(limit).bind(offset);
2284        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2285        Ok(rows)
2286    }
2287
2288    /// Count session digests for a namespace, optionally filtered by session_key.
2289    pub async fn count_digests(&self, namespace_id: i64, session_key: Option<&str>) -> Result<i64> {
2290        let query = if let Some(sk) = session_key {
2291            sqlx::query_scalar(
2292                "SELECT COUNT(*) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
2293            )
2294            .bind(namespace_id)
2295            .bind(sk)
2296        } else {
2297            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE namespace_id = ?")
2298                .bind(namespace_id)
2299        };
2300
2301        let count: i64 = query.fetch_one(&self.pool).await.map_err(db_error)?;
2302        Ok(count)
2303    }
2304
2305    /// Count evidence edges for a namespace.
2306    pub async fn count_evidence(&self, namespace_id: i64) -> Result<i64> {
2307        let count: i64 = sqlx::query_scalar(
2308            "SELECT COUNT(*) FROM memory_evidence WHERE derived_memory_id IN (SELECT id FROM memories WHERE namespace_id = ?)"
2309        )
2310        .bind(namespace_id)
2311        .fetch_one(&self.pool)
2312        .await
2313        .map_err(db_error)?;
2314        Ok(count)
2315    }
2316
2317    /// Record a system metric sample.
2318    pub async fn record_metric(
2319        &self,
2320        metric_name: &str,
2321        metric_value: f64,
2322        labels: &serde_json::Value,
2323    ) -> Result<i64> {
2324        let labels_json = serde_json::to_string(labels)?;
2325        let id: i64 = sqlx::query_scalar(
2326            r#"
2327            INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2328            VALUES (?, ?, ?, ?)
2329            RETURNING id
2330            "#,
2331        )
2332        .bind(metric_name)
2333        .bind(metric_value)
2334        .bind(labels_json)
2335        .bind(Utc::now())
2336        .fetch_one(&self.pool)
2337        .await
2338        .map_err(db_error)?;
2339        Ok(id)
2340    }
2341
2342    /// Persist multiple metric samples in a single transaction.
2343    pub async fn record_metrics_batch(&self, samples: &[MetricSample]) -> Result<()> {
2344        if samples.is_empty() {
2345            return Ok(());
2346        }
2347
2348        let mut tx = self.pool.begin().await.map_err(db_error)?;
2349        for sample in samples {
2350            let labels_json = serde_json::to_string(&sample.labels)?;
2351            sqlx::query(
2352                r#"
2353                INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2354                VALUES (?, ?, ?, ?)
2355                "#,
2356            )
2357            .bind(&sample.metric_name)
2358            .bind(sample.metric_value)
2359            .bind(labels_json)
2360            .bind(Utc::now())
2361            .execute(&mut *tx)
2362            .await
2363            .map_err(db_error)?;
2364        }
2365        tx.commit().await.map_err(db_error)?;
2366        Ok(())
2367    }
2368
2369    /// Fetch the newest metric samples for a namespace and optional prefix.
2370    pub async fn latest_metrics_for_namespace(
2371        &self,
2372        namespace_id: i64,
2373        metric_prefix: Option<&str>,
2374        limit: i64,
2375    ) -> Result<Vec<SystemMetricRow>> {
2376        let limit = limit.max(1);
2377        let rows = if let Some(prefix) = metric_prefix {
2378            sqlx::query_as::<_, SystemMetricRow>(
2379                r#"
2380                SELECT *
2381                FROM system_metrics
2382                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2383                  AND metric_name LIKE ?
2384                ORDER BY recorded_at DESC, id DESC
2385                LIMIT ?
2386                "#,
2387            )
2388            .bind(namespace_id)
2389            .bind(format!("{prefix}%"))
2390            .bind(limit)
2391            .fetch_all(&self.pool)
2392            .await
2393            .map_err(db_error)?
2394        } else {
2395            sqlx::query_as::<_, SystemMetricRow>(
2396                r#"
2397                SELECT *
2398                FROM system_metrics
2399                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2400                ORDER BY recorded_at DESC, id DESC
2401                LIMIT ?
2402                "#,
2403            )
2404            .bind(namespace_id)
2405            .bind(limit)
2406            .fetch_all(&self.pool)
2407            .await
2408            .map_err(db_error)?
2409        };
2410        Ok(rows)
2411    }
2412
2413    /// Count active memories for a namespace at one cognitive level.
2414    pub async fn count_by_cognitive_level(
2415        &self,
2416        namespace_id: i64,
2417        level: CognitiveLevel,
2418    ) -> Result<i64> {
2419        let count: i64 = sqlx::query_scalar(
2420            r#"
2421            SELECT COUNT(*) FROM memories
2422            WHERE namespace_id = ?
2423              AND is_active = 1
2424              AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') = ?
2425            "#,
2426        )
2427        .bind(namespace_id)
2428        .bind(level.as_str())
2429        .fetch_one(&self.pool)
2430        .await
2431        .map_err(db_error)?;
2432        Ok(count)
2433    }
2434}
2435
/// Insert a memory row inside an open transaction and return its id.
///
/// When the INSERT reports rowid 0 (no new row was created — presumably a
/// dedup trigger merged the content into an existing memory; see the error
/// message below), falls back to looking up the newest *active* row whose
/// trimmed, case-folded content matches. Errors if that lookup finds nothing.
async fn insert_memory_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    params: &StoreMemoryParams<'_>,
) -> Result<i64> {
    // Serialize JSON columns up front so failures abort before any SQL runs.
    let labels_json = serde_json::to_string(params.labels)?;
    let metadata_json = serde_json::to_string(params.metadata)?;
    let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;

    let result = sqlx::query(
        r#"
        INSERT INTO memories (
            namespace_id, content, category, memory_lane_type, labels, metadata,
            content_embedding, embedding_model, created_at, is_active, access_count
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
        "#,
    )
    .bind(params.namespace_id)
    .bind(params.content)
    .bind(params.category.to_string())
    .bind(params.memory_lane_type.map(|t| t.to_string()))
    .bind(&labels_json)
    .bind(&metadata_json)
    .bind(&embedding_json)
    .bind(params.embedding_model)
    .bind(Utc::now())
    .execute(&mut **tx)
    .await
    .map_err(db_error)?;

    let inserted_id = result.last_insert_rowid();
    if inserted_id != 0 {
        return Ok(inserted_id);
    }

    // Rowid 0: locate the row the duplicate was merged into by trimmed,
    // case-insensitive content match within the same transaction.
    let row: Option<MemoryRow> = sqlx::query_as(
        "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1",
    )
    .bind(params.namespace_id)
    .bind(params.content)
    .fetch_optional(&mut **tx)
    .await
    .map_err(db_error)?;

    row.map(|memory| memory.id).ok_or_else(|| {
        nexus_core::NexusError::Storage(
            "Duplicate merged by trigger but matching row not found".to_string(),
        )
    })
}
2485
2486async fn insert_evidence_tx(
2487    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2488    derived_memory_id: i64,
2489    source_memory_id: i64,
2490    evidence_role: &str,
2491) -> Result<()> {
2492    sqlx::query(
2493        r#"
2494        INSERT OR IGNORE INTO memory_evidence (derived_memory_id, source_memory_id, evidence_role, created_at)
2495        VALUES (?, ?, ?, datetime('now'))
2496        "#,
2497    )
2498    .bind(derived_memory_id)
2499    .bind(source_memory_id)
2500    .bind(evidence_role)
2501    .execute(&mut **tx)
2502    .await
2503    .map_err(db_error)?;
2504
2505    Ok(())
2506}
2507
2508fn new_claim_token(lease_owner: &str) -> String {
2509    let nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2510    format!("{lease_owner}-{nanos}-{}", std::process::id())
2511}
2512
/// Union of `existing` and `incoming` labels, deduplicated case-insensitively.
///
/// Existing labels keep their order and casing; an incoming label is appended
/// only when no label already present matches it ignoring ASCII case.
fn merge_labels(existing: &[String], incoming: &[String]) -> Vec<String> {
    let mut combined = existing.to_vec();
    for candidate in incoming {
        let already_present = combined
            .iter()
            .any(|label| label.eq_ignore_ascii_case(candidate));
        if !already_present {
            combined.push(candidate.clone());
        }
    }
    combined
}
2525
2526fn merge_duplicate_metadata(
2527    existing: &serde_json::Value,
2528    incoming: &serde_json::Value,
2529) -> serde_json::Value {
2530    let mut merged = existing.clone();
2531
2532    if let Some(session_key) = incoming
2533        .pointer("/cognitive/session_key")
2534        .and_then(serde_json::Value::as_str)
2535    {
2536        let mut session_keys = existing
2537            .pointer("/cognitive/session_keys")
2538            .and_then(serde_json::Value::as_array)
2539            .cloned()
2540            .unwrap_or_default();
2541        if let Some(existing_key) = existing
2542            .pointer("/cognitive/session_key")
2543            .and_then(serde_json::Value::as_str)
2544        {
2545            push_unique_json_string(&mut session_keys, existing_key);
2546        }
2547        push_unique_json_string(&mut session_keys, session_key);
2548        ensure_object_path(&mut merged, "cognitive").insert(
2549            "session_key".to_string(),
2550            serde_json::Value::String(session_key.to_string()),
2551        );
2552        ensure_object_path(&mut merged, "cognitive").insert(
2553            "session_keys".to_string(),
2554            serde_json::Value::Array(session_keys),
2555        );
2556    }
2557
2558    if let Some(derived_session_key) = incoming
2559        .pointer("/source/derived_session_key")
2560        .and_then(serde_json::Value::as_str)
2561    {
2562        let mut derived_keys = existing
2563            .pointer("/source/derived_session_keys")
2564            .and_then(serde_json::Value::as_array)
2565            .cloned()
2566            .unwrap_or_default();
2567        if let Some(existing_key) = existing
2568            .pointer("/source/derived_session_key")
2569            .and_then(serde_json::Value::as_str)
2570        {
2571            push_unique_json_string(&mut derived_keys, existing_key);
2572        }
2573        push_unique_json_string(&mut derived_keys, derived_session_key);
2574        ensure_object_path(&mut merged, "source").insert(
2575            "derived_session_key".to_string(),
2576            serde_json::Value::String(derived_session_key.to_string()),
2577        );
2578        ensure_object_path(&mut merged, "source").insert(
2579            "derived_session_keys".to_string(),
2580            serde_json::Value::Array(derived_keys),
2581        );
2582    }
2583
2584    merged
2585}
2586
2587fn push_unique_json_string(values: &mut Vec<serde_json::Value>, candidate: &str) {
2588    if values
2589        .iter()
2590        .filter_map(serde_json::Value::as_str)
2591        .any(|current| current.eq_ignore_ascii_case(candidate))
2592    {
2593        return;
2594    }
2595    values.push(serde_json::Value::String(candidate.to_string()));
2596}
2597
2598fn ensure_object_path<'a>(
2599    root: &'a mut serde_json::Value,
2600    key: &str,
2601) -> &'a mut serde_json::Map<String, serde_json::Value> {
2602    if !root.is_object() {
2603        *root = serde_json::Value::Object(serde_json::Map::new());
2604    }
2605
2606    let object = root.as_object_mut().expect("root object ensured");
2607    let entry = object
2608        .entry(key.to_string())
2609        .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
2610    if !entry.is_object() {
2611        *entry = serde_json::Value::Object(serde_json::Map::new());
2612    }
2613
2614    entry.as_object_mut().expect("child object ensured")
2615}
2616
/// Repository for namespace operations
pub struct NamespaceRepository {
    /// Connection pool used by every query issued from this repository.
    pool: SqlitePool,
}
2621
2622impl NamespaceRepository {
2623    pub fn new(pool: SqlitePool) -> Self {
2624        Self { pool }
2625    }
2626
2627    /// Get or create a namespace
2628    pub async fn get_or_create(&self, name: &str, agent_type: &str) -> Result<AgentNamespace> {
2629        if let Some(ns) = self.get_by_name(name).await? {
2630            return Ok(ns);
2631        }
2632
2633        let result = sqlx::query(
2634            "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
2635        )
2636        .bind(name)
2637        .bind(agent_type)
2638        .bind(Utc::now())
2639        .execute(&self.pool)
2640        .await
2641        .map_err(db_error)?;
2642
2643        let id = result.last_insert_rowid();
2644        Ok(AgentNamespace {
2645            id,
2646            name: name.to_string(),
2647            description: None,
2648            agent_type: agent_type.to_string(),
2649            created_at: Utc::now(),
2650            updated_at: None,
2651        })
2652    }
2653
2654    /// Get a namespace by name
2655    pub async fn get_by_name(&self, name: &str) -> Result<Option<AgentNamespace>> {
2656        let row: Option<AgentNamespaceRow> =
2657            sqlx::query_as("SELECT * FROM agent_namespaces WHERE name = ?")
2658                .bind(name)
2659                .fetch_optional(&self.pool)
2660                .await
2661                .map_err(db_error)?;
2662
2663        Ok(row.map(|r| AgentNamespace {
2664            id: r.id,
2665            name: r.name,
2666            description: r.description,
2667            agent_type: r.agent_type,
2668            created_at: r.created_at,
2669            updated_at: r.updated_at,
2670        }))
2671    }
2672
2673    /// List all namespaces
2674    pub async fn list_all(&self) -> Result<Vec<AgentNamespace>> {
2675        let rows: Vec<AgentNamespaceRow> =
2676            sqlx::query_as("SELECT * FROM agent_namespaces ORDER BY name")
2677                .fetch_all(&self.pool)
2678                .await
2679                .map_err(db_error)?;
2680
2681        Ok(rows
2682            .into_iter()
2683            .map(|r| AgentNamespace {
2684                id: r.id,
2685                name: r.name,
2686                description: r.description,
2687                agent_type: r.agent_type,
2688                created_at: r.created_at,
2689                updated_at: r.updated_at,
2690            })
2691            .collect())
2692    }
2693}
2694
/// Repository for processed file operations (inbox deduplication)
pub struct ProcessedFileRepository<'a> {
    /// Borrowed connection pool; the repository is a cheap view over it.
    pub pool: &'a SqlitePool,
}
2699
2700impl<'a> ProcessedFileRepository<'a> {
2701    pub fn new(pool: &'a SqlitePool) -> Self {
2702        Self { pool }
2703    }
2704
2705    /// Check if a file has been successfully processed
2706    pub async fn is_processed(&self, namespace_id: i64, path: &str) -> Result<bool> {
2707        let row: Option<(i64,)> =
2708            sqlx::query_as("SELECT id FROM processed_files WHERE namespace_id = ? AND path = ? AND status = 'completed'")
2709                .bind(namespace_id)
2710                .bind(path)
2711                .fetch_optional(self.pool)
2712                .await
2713                .map_err(db_error)?;
2714
2715        Ok(row.is_some())
2716    }
2717
2718    /// Get all completed file paths for a namespace (for batch dedup checks)
2719    pub async fn get_completed_paths(
2720        &self,
2721        namespace_id: i64,
2722    ) -> Result<std::collections::HashSet<String>> {
2723        let rows: Vec<(String,)> = sqlx::query_as(
2724            "SELECT path FROM processed_files WHERE namespace_id = ? AND status = 'completed'",
2725        )
2726        .bind(namespace_id)
2727        .fetch_all(self.pool)
2728        .await
2729        .map_err(db_error)?;
2730
2731        Ok(rows.into_iter().map(|r| r.0).collect())
2732    }
2733
2734    /// Mark a file as being processed
2735    pub async fn mark_processing(
2736        &self,
2737        namespace_id: i64,
2738        path: &str,
2739        content_hash: Option<&str>,
2740    ) -> Result<i64> {
2741        let id: i64 = sqlx::query_scalar(
2742            r#"
2743            INSERT INTO processed_files (namespace_id, path, content_hash, status, updated_at)
2744            VALUES (?, ?, ?, 'processing', datetime('now'))
2745            ON CONFLICT(namespace_id, path) DO UPDATE SET
2746                content_hash = excluded.content_hash,
2747                status = 'processing',
2748                updated_at = datetime('now')
2749            RETURNING id
2750            "#,
2751        )
2752        .bind(namespace_id)
2753        .bind(path)
2754        .bind(content_hash)
2755        .fetch_one(self.pool)
2756        .await
2757        .map_err(db_error)?;
2758
2759        Ok(id)
2760    }
2761
2762    /// Mark a file as successfully processed with memory reference
2763    pub async fn mark_processed(&self, id: i64, memory_id: i64) -> Result<()> {
2764        sqlx::query(
2765            r#"
2766            UPDATE processed_files
2767            SET status = 'completed', memory_id = ?, processed_at = datetime('now'), updated_at = datetime('now')
2768            WHERE id = ?
2769            "#
2770        )
2771        .bind(memory_id)
2772        .bind(id)
2773        .execute(self.pool)
2774        .await
2775        .map_err(db_error)?;
2776
2777        Ok(())
2778    }
2779
2780    /// Mark a file as failed
2781    pub async fn mark_failed(&self, id: i64, error: &str) -> Result<()> {
2782        sqlx::query(
2783            r#"
2784            UPDATE processed_files
2785            SET status = 'failed', last_error = ?, updated_at = datetime('now')
2786            WHERE id = ?
2787            "#,
2788        )
2789        .bind(error)
2790        .bind(id)
2791        .execute(self.pool)
2792        .await
2793        .map_err(db_error)?;
2794
2795        Ok(())
2796    }
2797
2798    /// Get files pending processing
2799    pub async fn get_pending(
2800        &self,
2801        namespace_id: i64,
2802        limit: i32,
2803    ) -> Result<Vec<ProcessedFileRow>> {
2804        let rows = sqlx::query_as::<_, ProcessedFileRow>(
2805            r#"
2806            SELECT * FROM processed_files
2807            WHERE namespace_id = ? AND status = 'pending'
2808            ORDER BY created_at ASC
2809            LIMIT ?
2810            "#,
2811        )
2812        .bind(namespace_id)
2813        .bind(limit)
2814        .fetch_all(self.pool)
2815        .await
2816        .map_err(db_error)?;
2817
2818        Ok(rows)
2819    }
2820
2821    /// Clear all processed files for a namespace
2822    pub async fn clear_namespace(&self, namespace_id: i64) -> Result<u64> {
2823        let result = sqlx::query("DELETE FROM processed_files WHERE namespace_id = ?")
2824            .bind(namespace_id)
2825            .execute(self.pool)
2826            .await
2827            .map_err(db_error)?;
2828
2829        Ok(result.rows_affected())
2830    }
2831}
2832
/// Repository for memory relationship operations
pub struct MemoryRelationRepository<'a> {
    /// Borrowed connection pool; the repository is a cheap view over it.
    pub pool: &'a SqlitePool,
}
2837
2838impl<'a> MemoryRelationRepository<'a> {
2839    pub fn new(pool: &'a SqlitePool) -> Self {
2840        Self { pool }
2841    }
2842
2843    /// Store a relationship between two memories
2844    pub async fn store(
2845        &self,
2846        source_id: i64,
2847        target_id: i64,
2848        relation_type: &str,
2849        strength: f32,
2850    ) -> Result<i64> {
2851        let id: i64 = sqlx::query_scalar(
2852            r#"
2853            INSERT INTO memory_relations (source_memory_id, target_memory_id, relation_type, strength, created_at)
2854            VALUES (?, ?, ?, ?, datetime('now'))
2855            ON CONFLICT(source_memory_id, target_memory_id, relation_type) DO UPDATE SET
2856                strength = excluded.strength,
2857                created_at = datetime('now')
2858            RETURNING id
2859            "#
2860        )
2861        .bind(source_id)
2862        .bind(target_id)
2863        .bind(relation_type)
2864        .bind(strength)
2865        .fetch_one(self.pool)
2866        .await
2867        .map_err(db_error)?;
2868
2869        Ok(id)
2870    }
2871
2872    /// Get all related memories for a given memory
2873    pub async fn get_related(&self, memory_id: i64) -> Result<Vec<(i64, String, f32)>> {
2874        let rows: Vec<(i64, String, f32)> = sqlx::query_as(
2875            r#"
2876            SELECT target_memory_id as memory_id, relation_type, strength
2877            FROM memory_relations
2878            WHERE source_memory_id = ?
2879            UNION
2880            SELECT source_memory_id as memory_id, relation_type, strength
2881            FROM memory_relations
2882            WHERE target_memory_id = ?
2883            ORDER BY strength DESC
2884            "#,
2885        )
2886        .bind(memory_id)
2887        .bind(memory_id)
2888        .fetch_all(self.pool)
2889        .await
2890        .map_err(db_error)?;
2891
2892        Ok(rows)
2893    }
2894
2895    /// Delete all relations for a memory
2896    pub async fn delete_for_memory(&self, memory_id: i64) -> Result<u64> {
2897        let result = sqlx::query(
2898            "DELETE FROM memory_relations WHERE source_memory_id = ? OR target_memory_id = ?",
2899        )
2900        .bind(memory_id)
2901        .bind(memory_id)
2902        .execute(self.pool)
2903        .await
2904        .map_err(db_error)?;
2905
2906        Ok(result.rows_affected())
2907    }
2908}
2909
2910fn parse_category(s: &str) -> Result<Category> {
2911    match MemoryCategory::parse(s) {
2912        Some(cat) => Ok(cat),
2913        None => Err(nexus_core::NexusError::Storage(format!(
2914            "Unknown memory category '{s}' persisted in database; row may be corrupted"
2915        ))),
2916    }
2917}
2918
2919fn parse_memory_lane_type(s: &str) -> Result<Option<MemoryLaneType>> {
2920    match MemoryLaneType::parse(s) {
2921        Some(t) => Ok(Some(t)),
2922        None => Err(nexus_core::NexusError::Storage(format!(
2923            "Unknown memory_lane_type '{s}' persisted in database; row may be corrupted"
2924        ))),
2925    }
2926}
2927
2928#[cfg(test)]
2929mod tests {
2930    use super::*;
2931    use nexus_core::MemoryLanePriorityType;
2932    use sqlx::sqlite::SqlitePoolOptions;
2933
    /// Test helper: build a `cognitive` metadata JSON object for the given
    /// level/perspective with the supplied reinforcement counters.
    fn cognitive_metadata(
        level: CognitiveLevel,
        perspective: &PerspectiveKey,
        times_reinforced: i64,
        times_contradicted: i64,
    ) -> serde_json::Value {
        serde_json::json!({
            "cognitive": {
                "level": level.as_str(),
                "observer": perspective.observer,
                "subject": perspective.subject,
                "session_key": perspective.session_key,
                "source_memory_ids": [],
                "confidence": 0.9,
                "times_reinforced": times_reinforced,
                "times_contradicted": times_contradicted,
                "generated_by": "test",
            }
        })
    }
2954
    /// `parse_category` accepts known category names and rejects unknowns.
    #[test]
    fn test_parse_category() {
        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
        assert!(matches!(
            parse_category("preferences"),
            Ok(Category::Preferences)
        ));
        assert!(parse_category("unknown").is_err());
    }
2964
    /// `parse_memory_lane_type` maps known lane names to their priority
    /// variants and errors on unknown input.
    #[test]
    fn test_parse_memory_lane_type() {
        let correction = parse_memory_lane_type("correction");
        assert!(matches!(
            correction,
            Ok(Some(MemoryLaneType::Priority(
                MemoryLanePriorityType::Correction
            )))
        ));

        let pattern_seed = parse_memory_lane_type("pattern_seed");
        assert!(matches!(
            pattern_seed,
            Ok(Some(MemoryLaneType::Priority(
                MemoryLanePriorityType::PatternSeed
            )))
        ));

        assert!(parse_memory_lane_type("unknown").is_err());
    }
2985
    /// Exhaustive check of every accepted category string, plus two
    /// representative rejected inputs.
    #[test]
    fn test_parse_category_all_variants() {
        assert!(matches!(parse_category("general"), Ok(Category::General)));
        assert!(matches!(parse_category("session"), Ok(Category::Session)));
        assert!(matches!(parse_category("context"), Ok(Category::Context)));
        assert!(matches!(
            parse_category("specifications"),
            Ok(Category::Specifications)
        ));
        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
        assert!(matches!(
            parse_category("preferences"),
            Ok(Category::Preferences)
        ));
        // Unknown values are rejected (fail-closed)
        assert!(parse_category("bogus").is_err());
        assert!(parse_category("").is_err());
    }
3004
    /// Smoke test: `StoreMemoryParams` is constructible with every field set
    /// to a minimal value.
    #[test]
    fn test_store_memory_params_fields() {
        // Verify StoreMemoryParams can be constructed with all fields
        let params = StoreMemoryParams {
            namespace_id: 1,
            content: "test content",
            category: &Category::General,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::Value::Null,
            embedding: None,
            embedding_model: None,
        };
        assert_eq!(params.namespace_id, 1);
        assert_eq!(params.content, "test content");
        assert!(params.labels.is_empty());
    }
3022
    /// Merging duplicate metadata keeps the incoming scalar key and
    /// accumulates both old and new keys into the plural `*_keys` arrays.
    #[test]
    fn test_merge_duplicate_metadata_preserves_multiple_session_keys() {
        let existing = serde_json::json!({
            "cognitive": {
                "session_key": "session-a"
            },
            "source": {
                "derived_session_key": "session-a"
            }
        });
        let incoming = serde_json::json!({
            "cognitive": {
                "session_key": "session-b"
            },
            "source": {
                "derived_session_key": "session-b"
            }
        });

        let merged = merge_duplicate_metadata(&existing, &incoming);
        assert_eq!(merged["cognitive"]["session_key"], "session-b");
        assert_eq!(
            merged["cognitive"]["session_keys"],
            serde_json::json!(["session-a", "session-b"])
        );
        assert_eq!(
            merged["source"]["derived_session_keys"],
            serde_json::json!(["session-a", "session-b"])
        );
    }
3053
3054    // ---- Phase 2: Job, Digest, Evidence integration tests ----
3055
    /// Open an in-memory SQLite database and run all migrations.
    /// A single connection is used so the `:memory:` database persists across
    /// queries (each new connection would otherwise see a fresh database).
    async fn setup_test_db() -> SqlitePool {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::migrations::run_migrations(&pool).await.unwrap();
        pool
    }
3065
    /// Test helper: create (or fetch) a namespace and return its row id.
    async fn create_namespace(pool: &SqlitePool, name: &str) -> i64 {
        let ns = NamespaceRepository::new(pool.clone());
        ns.get_or_create(name, "test").await.unwrap();
        ns.get_by_name(name).await.unwrap().unwrap().id
    }
3071
3072    // ---- Regression tests for audit findings ----
3073
3074    /// Audit finding #1: `get_by_content` must match `namespace_id + content`,
3075    /// not "latest active row". This prevents the duplicate-insert fallback
3076    /// in `store` from returning the wrong memory.
    #[tokio::test]
    async fn test_get_by_content_matches_actual_content() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Store two different memories in the same namespace.
        let mem_a = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "first memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let mem_b = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "second memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Sanity: distinct content produced distinct rows.
        assert_ne!(mem_a.id, mem_b.id);

        // get_by_content must return the memory with matching content,
        // NOT the newest one.
        let found_a = repo
            .get_by_content(ns_id, "first memory content")
            .await
            .unwrap();
        assert_eq!(found_a.id, mem_a.id);
        assert_eq!(found_a.content, "first memory content");

        let found_b = repo
            .get_by_content(ns_id, "second memory content")
            .await
            .unwrap();
        assert_eq!(found_b.id, mem_b.id);
        assert_eq!(found_b.content, "second memory content");

        // Non-existent content must error.
        let result = repo.get_by_content(ns_id, "nonexistent").await;
        assert!(result.is_err());
    }
3134
    /// Jobs are claimed in descending priority order, one batch per call;
    /// a claimed job transitions to `running` and carries its payload.
    #[tokio::test]
    async fn test_enqueue_and_claim_jobs() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Enqueue two jobs with different priorities.
        let id1 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 1}),
            })
            .await
            .unwrap();

        let id2 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 50,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 2}),
            })
            .await
            .unwrap();

        assert!(id1 > 0);
        assert!(id2 > 0);
        assert_ne!(id1, id2);

        // Claim one job — higher priority first.
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].row.id, id1); // Higher priority claimed first
        assert_eq!(claimed[0].row.status, "running");
        assert_eq!(claimed[0].payload["memory_id"], 1);

        // Second claim should get the lower-priority job.
        let claimed2 = repo
            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed2.len(), 1);
        assert_eq!(claimed2[0].row.id, id2);
    }
3188
    /// A completed job must not be claimable again.
    #[tokio::test]
    async fn test_complete_and_fail_job() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "digest_session",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"session": "s1"}),
            })
            .await
            .unwrap();

        // Claim then complete.
        let claimed = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);

        repo.complete_job(&claimed[0]).await.unwrap();

        // Verify completed status by attempting to claim — should be empty.
        let claimed_again = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert!(claimed_again.is_empty());
    }
3222
    /// A failed job is re-queued (claimable again) and its attempt counter
    /// advances on the next claim.
    #[tokio::test]
    async fn test_fail_job_requeues_before_limit() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"test": true}),
            })
            .await
            .unwrap();

        // Claim, fail, re-claim (should work).
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        repo.fail_job(&claimed[0], "transient error").await.unwrap();

        let reclaimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(reclaimed.len(), 1);
        assert_eq!(reclaimed[0].row.attempts, 2);
    }
3254
    /// Completing a job with a forged/stale claim token must be rejected so
    /// a worker that lost its lease cannot finalize the job.
    #[tokio::test]
    async fn test_complete_job_requires_matching_claim_token() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 100,
            perspective: None,
            payload: &serde_json::json!({"memory_id": 7}),
        })
        .await
        .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
            .await
            .unwrap();
        let mut forged = claimed[0].clone();
        forged.row.claim_token = Some("forged-token".to_string());

        let error = repo.complete_job(&forged).await.unwrap_err();
        assert!(error.to_string().contains("lost lease ownership"));
    }
3281
    /// Storing a digest twice for the same (session, kind) updates the same
    /// registration row, and both `latest_digest_for_session` and
    /// `latest_digest_for_namespace` resolve to the replacement memory.
    #[tokio::test]
    async fn test_store_digest_and_latest_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Store a memory that will serve as the digest content.
        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: digest_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 42,
            })
            .await
            .unwrap();

        assert!(digest_id > 0);

        // Retrieve latest digest.
        let result = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap();

        assert!(result.is_some());
        assert_eq!(result.as_ref().unwrap().id, digest_memory.id);

        let replacement_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short updated",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let replacement_digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: replacement_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 64,
            })
            .await
            .unwrap();

        // Same registration row was updated in place, not duplicated.
        assert_eq!(replacement_digest_id, digest_id);

        let updated = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(updated.id, replacement_memory.id);

        let latest_for_namespace = repo
            .latest_digest_for_namespace(ns_id, "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_for_namespace.id, replacement_memory.id);
    }
3370
    /// Rollover accounting counts only memories created after the last
    /// digest's coverage window: before any digest everything is "new",
    /// inside the covered window nothing is, and a memory stored after the
    /// window registers as new signal again.
    #[tokio::test]
    async fn test_session_digest_rollover_reports_new_signal_since_last_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Implemented bounded digest rollover policy.",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "explicit",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // No digest yet: the single stored memory is all new signal.
        let first = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(first.last_digest_end_memory_id, None);
        assert_eq!(first.new_memory_count, 1);
        assert!(first.estimated_new_tokens > 0);

        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Short digest",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "summary_short",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        repo.store_digest(StoreDigestParams {
            namespace_id: ns_id,
            session_key: "session-rollover",
            digest_kind: "short",
            memory_id: digest_memory.id,
            start_memory_id: Some(source.id),
            end_memory_id: Some(source.id),
            token_count: 16,
        })
        .await
        .unwrap();

        // Everything up to `source` is now covered: no new signal.
        let covered = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(covered.last_digest_end_memory_id, Some(source.id));
        assert_eq!(covered.new_memory_count, 0);
        assert_eq!(covered.estimated_new_tokens, 0);

        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "Added one more explicit memory after the digest coverage window.",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "session-rollover"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // The post-digest memory counts as new signal again.
        let second = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(second.last_digest_end_memory_id, Some(source.id));
        assert_eq!(second.new_memory_count, 1);
        assert!(second.estimated_new_tokens > 0);
    }
3475
3476    #[tokio::test]
3477    async fn test_store_with_lineage() {
3478        let pool = setup_test_db().await;
3479        let ns_id = create_namespace(&pool, "test-agent").await;
3480        let repo = MemoryRepository::new(pool);
3481
3482        // Store two source memories.
3483        let source1 = repo
3484            .store(StoreMemoryParams {
3485                namespace_id: ns_id,
3486                content: "raw observation one",
3487                category: &Category::Facts,
3488                memory_lane_type: None,
3489                labels: &[],
3490                metadata: &serde_json::Value::Null,
3491                embedding: None,
3492                embedding_model: None,
3493            })
3494            .await
3495            .unwrap();
3496
3497        let source2 = repo
3498            .store(StoreMemoryParams {
3499                namespace_id: ns_id,
3500                content: "raw observation two",
3501                category: &Category::Facts,
3502                memory_lane_type: None,
3503                labels: &[],
3504                metadata: &serde_json::Value::Null,
3505                embedding: None,
3506                embedding_model: None,
3507            })
3508            .await
3509            .unwrap();
3510
3511        // Store derived with lineage.
3512        let derived = repo
3513            .store_with_lineage(StoreMemoryWithLineageParams {
3514                store: StoreMemoryParams {
3515                    namespace_id: ns_id,
3516                    content: "derived insight",
3517                    category: &Category::Facts,
3518                    memory_lane_type: None,
3519                    labels: &[],
3520                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
3521                    embedding: None,
3522                    embedding_model: None,
3523                },
3524                source_memory_ids: &[source1.id, source2.id],
3525                evidence_role: "derived_from",
3526            })
3527            .await
3528            .unwrap();
3529
3530        assert_eq!(derived.content, "derived insight");
3531
3532        // Load lineage for the derived memory.
3533        let lineage = repo.load_lineage(derived.id).await.unwrap();
3534        assert_eq!(lineage.len(), 2);
3535        assert!(lineage.iter().any(|e| e.source_memory_id == source1.id));
3536        assert!(lineage.iter().any(|e| e.source_memory_id == source2.id));
3537    }
3538
    #[tokio::test]
    async fn test_cognitive_queries_by_level_and_perspective() {
        // Seeds one memory per cognitive level under a single perspective, then
        // exercises the level-filtered, recency, reinforcement, and contradiction
        // query paths against that fixture.
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective =
            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".into()));

        // Raw-level memory (no noise markers, so it still counts for recency).
        let _raw = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Explicit-level memory, reinforced once.
        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "explicit note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Derived-level memory with the highest reinforcement count (7).
        let derived = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "reinforced insight",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 7, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Contradiction-level memory, contradicted five times.
        let contradiction = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "contradiction note",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 1, 5),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Level filter returns only the single explicit memory.
        let explicit_rows = repo
            .get_by_cognitive_level(ns_id, CognitiveLevel::Explicit, 10)
            .await
            .unwrap();
        assert_eq!(explicit_rows.len(), 1);
        assert_eq!(explicit_rows[0].id, explicit.id);

        // Recency query sees all four memories sharing the perspective.
        let recent = repo
            .get_recent_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(recent.len(), 4);

        // Reinforcement ranking puts the most-reinforced memory first and
        // never includes contradiction-level entries.
        let reinforced = repo
            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(reinforced[0].id, derived.id);
        assert!(reinforced
            .iter()
            .all(|memory| memory.id != contradiction.id));

        // Contradiction query returns exactly the contradiction memory.
        let contradictions = repo
            .get_contradictions_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(contradictions[0].id, contradiction.id);
    }
3632
3633    #[tokio::test]
3634    async fn test_store_distilled_summary_archives_sources_and_records_lineage() {
3635        let pool = setup_test_db().await;
3636        let ns_id = create_namespace(&pool, "test-agent").await;
3637        let repo = MemoryRepository::new(pool);
3638
3639        let source1 = repo
3640            .store(StoreMemoryParams {
3641                namespace_id: ns_id,
3642                content: "raw event 1",
3643                category: &Category::Session,
3644                memory_lane_type: None,
3645                labels: &["raw-activity".to_string()],
3646                metadata: &serde_json::json!({"raw_activity": true}),
3647                embedding: None,
3648                embedding_model: None,
3649            })
3650            .await
3651            .unwrap();
3652
3653        let source2 = repo
3654            .store(StoreMemoryParams {
3655                namespace_id: ns_id,
3656                content: "raw event 2",
3657                category: &Category::Session,
3658                memory_lane_type: None,
3659                labels: &["raw-activity".to_string()],
3660                metadata: &serde_json::json!({"raw_activity": true}),
3661                embedding: None,
3662                embedding_model: None,
3663            })
3664            .await
3665            .unwrap();
3666
3667        let summary = repo
3668            .store_distilled_summary(
3669                StoreMemoryParams {
3670                    namespace_id: ns_id,
3671                    content: "distilled summary",
3672                    category: &Category::Session,
3673                    memory_lane_type: None,
3674                    labels: &["activity-summary".to_string()],
3675                    metadata: &serde_json::json!({"pipeline": "distill-v1"}),
3676                    embedding: None,
3677                    embedding_model: None,
3678                },
3679                &[source1.id, source2.id],
3680            )
3681            .await
3682            .unwrap();
3683
3684        let source1_after = repo.get_by_id(source1.id).await.unwrap().unwrap();
3685        let source2_after = repo.get_by_id(source2.id).await.unwrap().unwrap();
3686        assert!(!source1_after.is_active);
3687        assert!(source1_after.is_archived);
3688        assert!(!source2_after.is_active);
3689        assert!(source2_after.is_archived);
3690
3691        let lineage = repo.load_lineage(summary.id).await.unwrap();
3692        assert_eq!(lineage.len(), 2);
3693        assert!(lineage.iter().all(|entry| entry.evidence_role == "source"));
3694    }
3695
3696    #[tokio::test]
3697    async fn test_load_lineage_empty() {
3698        let pool = setup_test_db().await;
3699        let _ns_id = create_namespace(&pool, "test-agent").await;
3700        let repo = MemoryRepository::new(pool);
3701
3702        let lineage = repo.load_lineage(9999).await.unwrap();
3703        assert!(lineage.is_empty());
3704    }
3705
3706    // ---- Raw-noise exclusion tests ----
3707
    #[tokio::test]
    async fn test_recent_perspective_excludes_raw_noise() {
        // Raw-activity entries (label + metadata markers) are hidden from the
        // default recency query and only surface when include_raw is requested.
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Store a clean memory.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "clean observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Store a raw-activity noise memory (both label and metadata markers).
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise payload",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Default query should exclude the noise.
        let recent = repo
            .get_recent_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].content, "clean observation");

        // With include_raw, both should appear.
        let recent_all = repo
            .get_recent_by_perspective_opts(ns_id, &perspective, 10, true)
            .await
            .unwrap();
        assert_eq!(recent_all.len(), 2);
    }
3771
    #[tokio::test]
    async fn test_semantic_candidates_respect_perspective_and_raw_noise_filtering() {
        // Semantic-candidate selection should keep only embedded memories that
        // match the requested perspective AND are not raw-activity noise.
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Matching perspective, non-noise, embedded: should be returned.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "clean semantic observation",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
            embedding: Some(&[0.1_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        // Same perspective but flagged as raw-activity noise: filtered out when
        // include_raw is false.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw semantic noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "s1",
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.2_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        // Different observer/subject ("codex"): excluded by the perspective filter.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "other perspective semantic",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "codex",
                    "subject": "codex",
                    "session_key": "s1",
                    "generated_by": "test"
                }
            }),
            embedding: Some(&[0.3_f32; 384]),
            embedding_model: Some("mock"),
        })
        .await
        .unwrap();

        let candidates = repo
            .get_semantic_candidates(SemanticCandidateParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                limit: 10,
                include_raw: false,
            })
            .await
            .unwrap();

        // Only the clean, perspective-matching memory survives both filters.
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].content, "clean semantic observation");
    }
3848
3849    #[tokio::test]
3850    async fn test_semantic_candidates_match_session_keys_array() {
3851        let pool = setup_test_db().await;
3852        let ns_id = create_namespace(&pool, "test-agent").await;
3853        let repo = MemoryRepository::new(pool);
3854        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s-array".into()));
3855
3856        repo.store(StoreMemoryParams {
3857            namespace_id: ns_id,
3858            content: "session array semantic observation",
3859            category: &Category::Facts,
3860            memory_lane_type: None,
3861            labels: &[],
3862            metadata: &serde_json::json!({
3863                "cognitive": {
3864                    "level": "explicit",
3865                    "observer": "claude-code",
3866                    "subject": "claude-code",
3867                    "session_keys": ["s-array", "s-other"],
3868                    "generated_by": "test"
3869                }
3870            }),
3871            embedding: Some(&[0.4_f32; 384]),
3872            embedding_model: Some("mock"),
3873        })
3874        .await
3875        .unwrap();
3876
3877        let candidates = repo
3878            .get_semantic_candidates(SemanticCandidateParams {
3879                namespace_id: ns_id,
3880                perspective: Some(&perspective),
3881                limit: 10,
3882                include_raw: false,
3883            })
3884            .await
3885            .unwrap();
3886
3887        assert_eq!(candidates.len(), 1);
3888        assert_eq!(candidates[0].content, "session array semantic observation");
3889    }
3890
    #[tokio::test]
    async fn test_reinforced_perspective_excludes_raw_noise() {
        // The reinforcement-ranked query must skip raw-activity noise even when
        // the noise shares the same perspective.
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Genuine derived memory, reinforced five times.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "reinforced insight",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 5, 0),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity noise carrying both the label and the metadata marker.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Only the real insight should be ranked; the noise is filtered out.
        let reinforced = repo
            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(reinforced.len(), 1);
        assert_eq!(reinforced[0].content, "reinforced insight");
    }
3944
    #[tokio::test]
    async fn test_contradictions_perspective_excludes_raw_noise() {
        // Contradiction queries must return contradiction-level memories only,
        // never raw-activity noise under the same perspective.
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Genuine contradiction memory (contradicted three times).
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "a real contradiction",
            category: &Category::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 3),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Raw-activity noise with both label and metadata markers.
        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "raw noise",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &["raw-activity".to_string()],
            metadata: &serde_json::json!({
                "raw_activity": true,
                "cognitive": {
                    "level": "raw",
                    "observer": perspective.observer,
                    "subject": perspective.subject,
                    "session_key": perspective.session_key,
                    "source_memory_ids": [],
                    "confidence": 0.5,
                    "times_reinforced": 0,
                    "times_contradicted": 0,
                    "generated_by": "test"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // Only the contradiction-level memory should come back.
        let contradictions = repo
            .get_contradictions_by_perspective(ns_id, &perspective, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(contradictions[0].content, "a real contradiction");
    }
3998
3999    // ---- search_working_set tests ----
4000
    #[tokio::test]
    async fn test_search_working_set_basic() {
        // Seeds one memory per cognitive level and checks that the merged
        // working set contains every non-noise memory for the perspective.
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);
        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));

        // Store memories across different cognitive levels.
        let _raw = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "raw note",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Explicit fact, reinforced three times.
        let explicit = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "explicit fact",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 3, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Derived insight with the highest reinforcement count (8).
        let derived = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "derived insight",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 8, 0),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // Contradiction memory, contradicted twice.
        let contradiction = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "contradiction",
                category: &Category::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 2),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let result = repo
            .search_working_set(WorkingSetParams {
                namespace_id: ns_id,
                perspective: Some(&perspective),
                max_items: 20,
                include_raw: false,
            })
            .await
            .unwrap();

        // Should contain all non-noise memories. Order: reinforced first, then recent,
        // then contradictions.
        assert!(result.len() >= 3);
        let ids: Vec<i64> = result.iter().map(|m| m.id).collect();
        assert!(ids.contains(&explicit.id));
        assert!(ids.contains(&derived.id));
        assert!(ids.contains(&contradiction.id));
    }
4083
4084    #[tokio::test]
4085    async fn test_search_working_set_dedupes() {
4086        let pool = setup_test_db().await;
4087        let ns_id = create_namespace(&pool, "test-agent").await;
4088        let repo = MemoryRepository::new(pool);
4089        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4090
4091        // A memory that is both reinforced and recent should appear only once.
4092        let shared = repo
4093            .store(StoreMemoryParams {
4094                namespace_id: ns_id,
4095                content: "shared memory",
4096                category: &Category::Facts,
4097                memory_lane_type: None,
4098                labels: &[],
4099                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 10, 0),
4100                embedding: None,
4101                embedding_model: None,
4102            })
4103            .await
4104            .unwrap();
4105
4106        let result = repo
4107            .search_working_set(WorkingSetParams {
4108                namespace_id: ns_id,
4109                perspective: Some(&perspective),
4110                max_items: 20,
4111                include_raw: false,
4112            })
4113            .await
4114            .unwrap();
4115
4116        let count = result.iter().filter(|m| m.id == shared.id).count();
4117        assert_eq!(count, 1, "shared memory should appear exactly once");
4118    }
4119
4120    #[tokio::test]
4121    async fn test_search_working_set_respects_max_items() {
4122        let pool = setup_test_db().await;
4123        let ns_id = create_namespace(&pool, "test-agent").await;
4124        let repo = MemoryRepository::new(pool);
4125        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4126
4127        for i in 0..10 {
4128            let content = format!("memory {}", i);
4129            repo.store(StoreMemoryParams {
4130                namespace_id: ns_id,
4131                content: &content,
4132                category: &Category::Facts,
4133                memory_lane_type: None,
4134                labels: &[],
4135                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, i as i64, 0),
4136                embedding: None,
4137                embedding_model: None,
4138            })
4139            .await
4140            .unwrap();
4141        }
4142
4143        let result = repo
4144            .search_working_set(WorkingSetParams {
4145                namespace_id: ns_id,
4146                perspective: Some(&perspective),
4147                max_items: 3,
4148                include_raw: false,
4149            })
4150            .await
4151            .unwrap();
4152
4153        assert_eq!(result.len(), 3);
4154    }
4155
4156    #[tokio::test]
4157    async fn test_search_working_set_excludes_raw_noise() {
4158        let pool = setup_test_db().await;
4159        let ns_id = create_namespace(&pool, "test-agent").await;
4160        let repo = MemoryRepository::new(pool);
4161        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4162
4163        repo.store(StoreMemoryParams {
4164            namespace_id: ns_id,
4165            content: "real observation",
4166            category: &Category::Facts,
4167            memory_lane_type: None,
4168            labels: &[],
4169            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
4170            embedding: None,
4171            embedding_model: None,
4172        })
4173        .await
4174        .unwrap();
4175
4176        repo.store(StoreMemoryParams {
4177            namespace_id: ns_id,
4178            content: "raw noise",
4179            category: &Category::Session,
4180            memory_lane_type: None,
4181            labels: &["raw-activity".to_string()],
4182            metadata: &serde_json::json!({"raw_activity": true, "cognitive": {"level": "raw"}}),
4183            embedding: None,
4184            embedding_model: None,
4185        })
4186        .await
4187        .unwrap();
4188
4189        let result = repo
4190            .search_working_set(WorkingSetParams {
4191                namespace_id: ns_id,
4192                perspective: Some(&perspective),
4193                max_items: 20,
4194                include_raw: false,
4195            })
4196            .await
4197            .unwrap();
4198
4199        assert!(result.iter().all(|m| m.content != "raw noise"));
4200        assert!(result.iter().any(|m| m.content == "real observation"));
4201    }
4202
4203    #[tokio::test]
4204    async fn test_search_working_set_without_perspective() {
4205        let pool = setup_test_db().await;
4206        let ns_id = create_namespace(&pool, "test-agent").await;
4207        let repo = MemoryRepository::new(pool);
4208
4209        repo.store(StoreMemoryParams {
4210            namespace_id: ns_id,
4211            content: "namespace memory one",
4212            category: &Category::Facts,
4213            memory_lane_type: None,
4214            labels: &[],
4215            metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4216            embedding: None,
4217            embedding_model: None,
4218        })
4219        .await
4220        .unwrap();
4221
4222        repo.store(StoreMemoryParams {
4223            namespace_id: ns_id,
4224            content: "namespace memory two",
4225            category: &Category::Facts,
4226            memory_lane_type: None,
4227            labels: &[],
4228            metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4229            embedding: None,
4230            embedding_model: None,
4231        })
4232        .await
4233        .unwrap();
4234
4235        let result = repo
4236            .search_working_set(WorkingSetParams {
4237                namespace_id: ns_id,
4238                perspective: None,
4239                max_items: 20,
4240                include_raw: false,
4241            })
4242            .await
4243            .unwrap();
4244
4245        assert!(result.len() >= 2);
4246    }
4247
4248    #[tokio::test]
4249    async fn test_list_by_session_key_matches_session_keys_array() {
4250        let pool = setup_test_db().await;
4251        let ns_id = create_namespace(&pool, "test-agent").await;
4252        let repo = MemoryRepository::new(pool);
4253
4254        repo.store(StoreMemoryParams {
4255            namespace_id: ns_id,
4256            content: "shared explicit memory",
4257            category: &Category::Facts,
4258            memory_lane_type: None,
4259            labels: &[],
4260            metadata: &serde_json::json!({
4261                "cognitive": {
4262                    "level": "explicit",
4263                    "session_key": "session-b",
4264                    "session_keys": ["session-a", "session-b"]
4265                }
4266            }),
4267            embedding: None,
4268            embedding_model: None,
4269        })
4270        .await
4271        .unwrap();
4272
4273        let session_a = repo
4274            .list_by_session_key(ns_id, "session-a", 10, false)
4275            .await
4276            .unwrap();
4277        let session_b = repo
4278            .list_by_session_key(ns_id, "session-b", 10, false)
4279            .await
4280            .unwrap();
4281
4282        assert_eq!(session_a.len(), 1);
4283        assert_eq!(session_b.len(), 1);
4284    }
4285
4286    #[tokio::test]
4287    async fn test_count_evidence_returns_zero_for_empty_namespace() {
4288        let pool = setup_test_db().await;
4289        let ns_id = create_namespace(&pool, "test-agent").await;
4290        let repo = MemoryRepository::new(pool);
4291
4292        let count = repo.count_evidence(ns_id).await.unwrap();
4293        assert_eq!(count, 0);
4294    }
4295
4296    #[tokio::test]
4297    async fn test_count_evidence_counts_lineage_edges() {
4298        let pool = setup_test_db().await;
4299        let ns_id = create_namespace(&pool, "test-agent").await;
4300        let repo = MemoryRepository::new(pool);
4301
4302        let source = repo
4303            .store(StoreMemoryParams {
4304                namespace_id: ns_id,
4305                content: "source memory",
4306                category: &Category::Session,
4307                memory_lane_type: None,
4308                labels: &[],
4309                metadata: &serde_json::json!({}),
4310                embedding: None,
4311                embedding_model: None,
4312            })
4313            .await
4314            .unwrap();
4315
4316        let _derived = repo
4317            .store_with_lineage(StoreMemoryWithLineageParams {
4318                store: StoreMemoryParams {
4319                    namespace_id: ns_id,
4320                    content: "derived with evidence",
4321                    category: &Category::Facts,
4322                    memory_lane_type: None,
4323                    labels: &[],
4324                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
4325                    embedding: None,
4326                    embedding_model: None,
4327                },
4328                source_memory_ids: &[source.id],
4329                evidence_role: "source",
4330            })
4331            .await
4332            .unwrap();
4333
4334        let count = repo.count_evidence(ns_id).await.unwrap();
4335        assert_eq!(count, 1);
4336    }
4337
4338    #[tokio::test]
4339    async fn test_count_evidence_does_not_count_other_namespace() {
4340        let pool = setup_test_db().await;
4341        let ns_a = create_namespace(&pool, "agent-a").await;
4342        let ns_b = create_namespace(&pool, "agent-b").await;
4343        let repo = MemoryRepository::new(pool);
4344
4345        let source = repo
4346            .store(StoreMemoryParams {
4347                namespace_id: ns_a,
4348                content: "source in ns-a",
4349                category: &Category::Session,
4350                memory_lane_type: None,
4351                labels: &[],
4352                metadata: &serde_json::json!({}),
4353                embedding: None,
4354                embedding_model: None,
4355            })
4356            .await
4357            .unwrap();
4358
4359        let _derived = repo
4360            .store_with_lineage(StoreMemoryWithLineageParams {
4361                store: StoreMemoryParams {
4362                    namespace_id: ns_a,
4363                    content: "derived in ns-a",
4364                    category: &Category::Facts,
4365                    memory_lane_type: None,
4366                    labels: &[],
4367                    metadata: &serde_json::json!({}),
4368                    embedding: None,
4369                    embedding_model: None,
4370                },
4371                source_memory_ids: &[source.id],
4372                evidence_role: "source",
4373            })
4374            .await
4375            .unwrap();
4376
4377        assert_eq!(repo.count_evidence(ns_a).await.unwrap(), 1);
4378        assert_eq!(repo.count_evidence(ns_b).await.unwrap(), 0);
4379    }
4380
4381    #[tokio::test]
4382    async fn test_count_by_cognitive_level_returns_matching_total() {
4383        let pool = setup_test_db().await;
4384        let ns_id = create_namespace(&pool, "level-counts").await;
4385        let repo = MemoryRepository::new(pool);
4386
4387        for (content, level) in [
4388            ("raw event", CognitiveLevel::Raw),
4389            ("derived insight", CognitiveLevel::Derived),
4390            ("derived insight 2", CognitiveLevel::Derived),
4391            ("contradiction note", CognitiveLevel::Contradiction),
4392        ] {
4393            repo.store(StoreMemoryParams {
4394                namespace_id: ns_id,
4395                content,
4396                category: &Category::Session,
4397                memory_lane_type: None,
4398                labels: &[],
4399                metadata: &serde_json::json!({
4400                    "cognitive": {
4401                        "level": level.as_str(),
4402                        "observer": "claude-code",
4403                        "subject": "claude-code",
4404                        "generated_by": "test"
4405                    }
4406                }),
4407                embedding: None,
4408                embedding_model: None,
4409            })
4410            .await
4411            .unwrap();
4412        }
4413
4414        assert_eq!(
4415            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Derived)
4416                .await
4417                .unwrap(),
4418            2
4419        );
4420        assert_eq!(
4421            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Contradiction)
4422                .await
4423                .unwrap(),
4424            1
4425        );
4426    }
4427
4428    /// Regression test: `get_by_cognitive_level_with_perspective` must apply
4429    /// perspective filtering in SQL BEFORE the LIMIT, so callers receive up to
4430    /// `limit` matching results instead of silently getting fewer.
4431    #[tokio::test]
4432    async fn test_get_by_cognitive_level_with_perspective_filters_before_limit() {
4433        let pool = setup_test_db().await;
4434        let ns_id = create_namespace(&pool, "perspective-limit").await;
4435        let repo = MemoryRepository::new(pool);
4436
4437        let perspective_a = PerspectiveKey::new("alice", "project-x", None);
4438        let perspective_b = PerspectiveKey::new("bob", "project-y", None);
4439
4440        // Insert 5 memories for alice + project-x at Explicit level.
4441        for i in 0..5 {
4442            repo.store(StoreMemoryParams {
4443                namespace_id: ns_id,
4444                content: &format!("alice memory {}", i),
4445                category: &Category::Facts,
4446                memory_lane_type: None,
4447                labels: &[],
4448                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_a, 0, 0),
4449                embedding: None,
4450                embedding_model: None,
4451            })
4452            .await
4453            .unwrap();
4454        }
4455
4456        // Insert 5 memories for bob + project-y at Explicit level.
4457        for i in 0..5 {
4458            repo.store(StoreMemoryParams {
4459                namespace_id: ns_id,
4460                content: &format!("bob memory {}", i),
4461                category: &Category::Facts,
4462                memory_lane_type: None,
4463                labels: &[],
4464                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_b, 0, 0),
4465                embedding: None,
4466                embedding_model: None,
4467            })
4468            .await
4469            .unwrap();
4470        }
4471
4472        // Request 3 results for alice's perspective; should get exactly 3, not fewer.
4473        let alice_results = repo
4474            .get_by_cognitive_level_with_perspective(
4475                ns_id,
4476                CognitiveLevel::Explicit,
4477                &perspective_a,
4478                3,
4479            )
4480            .await
4481            .unwrap();
4482        assert_eq!(alice_results.len(), 3);
4483        assert!(alice_results.iter().all(|m| {
4484            let meta = &m.metadata;
4485            let obs = meta
4486                .get("cognitive")
4487                .and_then(|c| c.get("observer"))
4488                .and_then(|v| v.as_str());
4489            let sub = meta
4490                .get("cognitive")
4491                .and_then(|c| c.get("subject"))
4492                .and_then(|v| v.as_str());
4493            obs == Some("alice") && sub == Some("project-x")
4494        }));
4495
4496        // Request 10 results for alice; there are only 5, so capped at 5.
4497        let alice_many = repo
4498            .get_by_cognitive_level_with_perspective(
4499                ns_id,
4500                CognitiveLevel::Explicit,
4501                &perspective_a,
4502                10,
4503            )
4504            .await
4505            .unwrap();
4506        assert_eq!(alice_many.len(), 5);
4507
4508        // Bob gets a separate set.
4509        let bob_results = repo
4510            .get_by_cognitive_level_with_perspective(
4511                ns_id,
4512                CognitiveLevel::Explicit,
4513                &perspective_b,
4514                3,
4515            )
4516            .await
4517            .unwrap();
4518        assert_eq!(bob_results.len(), 3);
4519        assert!(bob_results.iter().all(|m| {
4520            let meta = &m.metadata;
4521            let obs = meta
4522                .get("cognitive")
4523                .and_then(|c| c.get("observer"))
4524                .and_then(|v| v.as_str());
4525            let sub = meta
4526                .get("cognitive")
4527                .and_then(|c| c.get("subject"))
4528                .and_then(|v| v.as_str());
4529            obs == Some("bob") && sub == Some("project-y")
4530        }));
4531    }
4532
4533    /// Verifies that the scalar session_key field is respected in the SQL filter.
4534    #[tokio::test]
4535    async fn test_get_by_cognitive_level_with_perspective_respects_session_key() {
4536        let pool = setup_test_db().await;
4537        let ns_id = create_namespace(&pool, "session-key-scalar").await;
4538        let repo = MemoryRepository::new(pool);
4539
4540        let perspective_s1 =
4541            PerspectiveKey::new("alice", "project-x", Some("session-1".to_string()));
4542        let perspective_s2 =
4543            PerspectiveKey::new("alice", "project-x", Some("session-2".to_string()));
4544
4545        for i in 0..3 {
4546            repo.store(StoreMemoryParams {
4547                namespace_id: ns_id,
4548                content: &format!("s1 memory {}", i),
4549                category: &Category::Facts,
4550                memory_lane_type: None,
4551                labels: &[],
4552                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s1, 0, 0),
4553                embedding: None,
4554                embedding_model: None,
4555            })
4556            .await
4557            .unwrap();
4558        }
4559        for i in 0..3 {
4560            repo.store(StoreMemoryParams {
4561                namespace_id: ns_id,
4562                content: &format!("s2 memory {}", i),
4563                category: &Category::Facts,
4564                memory_lane_type: None,
4565                labels: &[],
4566                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s2, 0, 0),
4567                embedding: None,
4568                embedding_model: None,
4569            })
4570            .await
4571            .unwrap();
4572        }
4573
4574        let s1_results = repo
4575            .get_by_cognitive_level_with_perspective(
4576                ns_id,
4577                CognitiveLevel::Derived,
4578                &perspective_s1,
4579                10,
4580            )
4581            .await
4582            .unwrap();
4583        assert_eq!(s1_results.len(), 3);
4584        assert!(s1_results.iter().all(|m| m.content.starts_with("s1")));
4585
4586        let s2_results = repo
4587            .get_by_cognitive_level_with_perspective(
4588                ns_id,
4589                CognitiveLevel::Derived,
4590                &perspective_s2,
4591                10,
4592            )
4593            .await
4594            .unwrap();
4595        assert_eq!(s2_results.len(), 3);
4596        assert!(s2_results.iter().all(|m| m.content.starts_with("s2")));
4597    }
4598
4599    /// Verifies that memories matching only the session_keys array are included.
4600    #[tokio::test]
4601    async fn test_get_by_cognitive_level_with_perspective_matches_session_keys_array() {
4602        let pool = setup_test_db().await;
4603        let ns_id = create_namespace(&pool, "session-keys-array").await;
4604        let repo = MemoryRepository::new(pool);
4605
4606        let perspective = PerspectiveKey::new("alice", "project-x", Some("session-a".to_string()));
4607
4608        // Memory with session_key set to the same key as the perspective.
4609        repo.store(StoreMemoryParams {
4610            namespace_id: ns_id,
4611            content: "scalar match",
4612            category: &Category::Facts,
4613            memory_lane_type: None,
4614            labels: &[],
4615            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
4616            embedding: None,
4617            embedding_model: None,
4618        })
4619        .await
4620        .unwrap();
4621
4622        // Memory with session_keys array containing the key (but different scalar session_key).
4623        repo.store(StoreMemoryParams {
4624            namespace_id: ns_id,
4625            content: "array match",
4626            category: &Category::Facts,
4627            memory_lane_type: None,
4628            labels: &[],
4629            metadata: &serde_json::json!({
4630                "cognitive": {
4631                    "level": "explicit",
4632                    "observer": "alice",
4633                    "subject": "project-x",
4634                    "session_key": "session-other",
4635                    "session_keys": ["session-a", "session-b"],
4636                    "generated_by": "test"
4637                }
4638            }),
4639            embedding: None,
4640            embedding_model: None,
4641        })
4642        .await
4643        .unwrap();
4644
4645        // Memory that does not match at all.
4646        repo.store(StoreMemoryParams {
4647            namespace_id: ns_id,
4648            content: "no match",
4649            category: &Category::Facts,
4650            memory_lane_type: None,
4651            labels: &[],
4652            metadata: &serde_json::json!({
4653                "cognitive": {
4654                    "level": "explicit",
4655                    "observer": "alice",
4656                    "subject": "project-x",
4657                    "session_key": "session-other",
4658                    "session_keys": ["session-z"],
4659                    "generated_by": "test"
4660                }
4661            }),
4662            embedding: None,
4663            embedding_model: None,
4664        })
4665        .await
4666        .unwrap();
4667
4668        let results = repo
4669            .get_by_cognitive_level_with_perspective(
4670                ns_id,
4671                CognitiveLevel::Explicit,
4672                &perspective,
4673                10,
4674            )
4675            .await
4676            .unwrap();
4677        assert_eq!(results.len(), 2);
4678        let contents: Vec<_> = results.iter().map(|m| m.content.as_str()).collect();
4679        assert!(contents.contains(&"scalar match"));
4680        assert!(contents.contains(&"array match"));
4681    }
4682
4683    #[tokio::test]
4684    async fn test_record_metric_and_latest_metrics_for_namespace() {
4685        let pool = setup_test_db().await;
4686        let ns_id = create_namespace(&pool, "metric-ns").await;
4687        let other_ns = create_namespace(&pool, "metric-other").await;
4688        let repo = MemoryRepository::new(pool);
4689
4690        repo.record_metric(
4691            "cognition.query.total_ms",
4692            12.5,
4693            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4694        )
4695        .await
4696        .unwrap();
4697        repo.record_metric(
4698            "cognition.query.total_ms",
4699            18.0,
4700            &serde_json::json!({"namespace_id": other_ns, "stage": "total", "unit": "ms"}),
4701        )
4702        .await
4703        .unwrap();
4704        repo.record_metric(
4705            "cognition.representation.total_ms",
4706            4.0,
4707            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4708        )
4709        .await
4710        .unwrap();
4711
4712        let metrics = repo
4713            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4714            .await
4715            .unwrap();
4716
4717        assert_eq!(metrics.len(), 2);
4718        assert!(metrics
4719            .iter()
4720            .all(|metric| metric.labels.contains(&ns_id.to_string())));
4721        assert!(metrics
4722            .iter()
4723            .any(|metric| metric.metric_name == "cognition.query.total_ms"));
4724        assert!(metrics
4725            .iter()
4726            .any(|metric| metric.metric_name == "cognition.representation.total_ms"));
4727        assert!(metrics
4728            .iter()
4729            .all(|metric| { metric.metric_name.starts_with("cognition.") }));
4730    }
4731
4732    #[tokio::test]
4733    async fn test_record_metrics_batch_persists_all_samples() {
4734        let pool = setup_test_db().await;
4735        let ns_id = create_namespace(&pool, "metric-batch").await;
4736        let repo = MemoryRepository::new(pool);
4737
4738        repo.record_metrics_batch(&[
4739            MetricSample {
4740                metric_name: "cognition.query.total_ms".to_string(),
4741                metric_value: 9.5,
4742                labels: serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4743            },
4744            MetricSample {
4745                metric_name: "cognition.query.answer.total_tokens".to_string(),
4746                metric_value: 128.0,
4747                labels: serde_json::json!({"namespace_id": ns_id, "stage": "answer", "unit": "tokens"}),
4748            },
4749        ])
4750        .await
4751        .unwrap();
4752
4753        let metrics = repo
4754            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4755            .await
4756            .unwrap();
4757
4758        assert_eq!(metrics.len(), 2);
4759    }
4760
4761    // ---- Observability query tests ----
4762
4763    #[tokio::test]
4764    async fn test_list_jobs_returns_enqueued_jobs() {
4765        let pool = setup_test_db().await;
4766        let ns_id = create_namespace(&pool, "obs-jobs").await;
4767        let repo = MemoryRepository::new(pool);
4768
4769        repo.enqueue_job(EnqueueJobParams {
4770            namespace_id: ns_id,
4771            job_type: "derive",
4772            priority: 10,
4773            perspective: None,
4774            payload: &serde_json::json!({"a": 1}),
4775        })
4776        .await
4777        .unwrap();
4778
4779        repo.enqueue_job(EnqueueJobParams {
4780            namespace_id: ns_id,
4781            job_type: "digest",
4782            priority: 5,
4783            perspective: None,
4784            payload: &serde_json::json!({"b": 2}),
4785        })
4786        .await
4787        .unwrap();
4788
4789        // List all jobs.
4790        let all = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
4791        assert_eq!(all.len(), 2);
4792
4793        // Filter by job_type.
4794        let derive_only = repo
4795            .list_jobs(ns_id, Some("derive"), None, 50, 0)
4796            .await
4797            .unwrap();
4798        assert_eq!(derive_only.len(), 1);
4799        assert_eq!(derive_only[0].job_type, "derive");
4800
4801        // Filter by status (both pending).
4802        let pending = repo
4803            .list_jobs(ns_id, None, Some("pending"), 50, 0)
4804            .await
4805            .unwrap();
4806        assert_eq!(pending.len(), 2);
4807
4808        // Combined filter.
4809        let digest_pending = repo
4810            .list_jobs(ns_id, Some("digest"), Some("pending"), 50, 0)
4811            .await
4812            .unwrap();
4813        assert_eq!(digest_pending.len(), 1);
4814    }
4815
4816    #[tokio::test]
4817    async fn test_list_jobs_respects_limit_offset() {
4818        let pool = setup_test_db().await;
4819        let ns_id = create_namespace(&pool, "obs-limit").await;
4820        let repo = MemoryRepository::new(pool);
4821
4822        for i in 0..5 {
4823            repo.enqueue_job(EnqueueJobParams {
4824                namespace_id: ns_id,
4825                job_type: "derive",
4826                priority: i,
4827                perspective: None,
4828                payload: &serde_json::json!({"i": i}),
4829            })
4830            .await
4831            .unwrap();
4832        }
4833
4834        let page1 = repo.list_jobs(ns_id, None, None, 2, 0).await.unwrap();
4835        assert_eq!(page1.len(), 2);
4836
4837        let page2 = repo.list_jobs(ns_id, None, None, 2, 2).await.unwrap();
4838        assert_eq!(page2.len(), 2);
4839
4840        let page3 = repo.list_jobs(ns_id, None, None, 2, 4).await.unwrap();
4841        assert_eq!(page3.len(), 1);
4842    }
4843
4844    #[tokio::test]
4845    async fn test_count_jobs_by_status() {
4846        let pool = setup_test_db().await;
4847        let ns_id = create_namespace(&pool, "obs-count").await;
4848        let repo = MemoryRepository::new(pool);
4849
4850        repo.enqueue_job(EnqueueJobParams {
4851            namespace_id: ns_id,
4852            job_type: "derive",
4853            priority: 10,
4854            perspective: None,
4855            payload: &serde_json::json!({}),
4856        })
4857        .await
4858        .unwrap();
4859
4860        repo.enqueue_job(EnqueueJobParams {
4861            namespace_id: ns_id,
4862            job_type: "derive",
4863            priority: 5,
4864            perspective: None,
4865            payload: &serde_json::json!({}),
4866        })
4867        .await
4868        .unwrap();
4869
4870        repo.enqueue_job(EnqueueJobParams {
4871            namespace_id: ns_id,
4872            job_type: "digest",
4873            priority: 10,
4874            perspective: None,
4875            payload: &serde_json::json!({}),
4876        })
4877        .await
4878        .unwrap();
4879
4880        // All jobs.
4881        let all_counts = repo.count_jobs_by_status(ns_id, None).await.unwrap();
4882        let total: i64 = all_counts.iter().map(|(_, c)| c).sum();
4883        assert_eq!(total, 3);
4884
4885        // Filtered by job_type.
4886        let derive_counts = repo
4887            .count_jobs_by_status(ns_id, Some("derive"))
4888            .await
4889            .unwrap();
4890        let derive_total: i64 = derive_counts.iter().map(|(_, c)| c).sum();
4891        assert_eq!(derive_total, 2);
4892    }
4893
4894    #[tokio::test]
4895    async fn test_count_jobs_respects_filters() {
4896        let pool = setup_test_db().await;
4897        let ns_id = create_namespace(&pool, "obs-job-total").await;
4898        let repo = MemoryRepository::new(pool);
4899
4900        repo.enqueue_job(EnqueueJobParams {
4901            namespace_id: ns_id,
4902            job_type: "derive",
4903            priority: 10,
4904            perspective: None,
4905            payload: &serde_json::json!({"index": 1}),
4906        })
4907        .await
4908        .unwrap();
4909        repo.enqueue_job(EnqueueJobParams {
4910            namespace_id: ns_id,
4911            job_type: "derive",
4912            priority: 5,
4913            perspective: None,
4914            payload: &serde_json::json!({"index": 2}),
4915        })
4916        .await
4917        .unwrap();
4918        repo.enqueue_job(EnqueueJobParams {
4919            namespace_id: ns_id,
4920            job_type: "digest",
4921            priority: 1,
4922            perspective: None,
4923            payload: &serde_json::json!({"index": 3}),
4924        })
4925        .await
4926        .unwrap();
4927
4928        assert_eq!(repo.count_jobs(ns_id, None, None).await.unwrap(), 3);
4929        assert_eq!(
4930            repo.count_jobs(ns_id, Some("derive"), None).await.unwrap(),
4931            2
4932        );
4933        assert_eq!(
4934            repo.count_jobs(ns_id, Some("derive"), Some("pending"))
4935                .await
4936                .unwrap(),
4937            2
4938        );
4939        assert_eq!(
4940            repo.count_jobs(ns_id, Some("reflect"), Some("pending"))
4941                .await
4942                .unwrap(),
4943            0
4944        );
4945    }
4946
4947    #[tokio::test]
4948    async fn test_list_digests_and_count() {
4949        let pool = setup_test_db().await;
4950        let ns_id = create_namespace(&pool, "obs-digests").await;
4951        let repo = MemoryRepository::new(pool);
4952
4953        // Store a memory to use as digest content.
4954        let mem = repo
4955            .store(StoreMemoryParams {
4956                namespace_id: ns_id,
4957                content: "digest content",
4958                category: &Category::Session,
4959                memory_lane_type: None,
4960                labels: &[],
4961                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
4962                embedding: None,
4963                embedding_model: None,
4964            })
4965            .await
4966            .unwrap();
4967
4968        repo.store_digest(StoreDigestParams {
4969            namespace_id: ns_id,
4970            session_key: "session-1",
4971            digest_kind: "short",
4972            memory_id: mem.id,
4973            start_memory_id: Some(1),
4974            end_memory_id: Some(10),
4975            token_count: 50,
4976        })
4977        .await
4978        .unwrap();
4979
4980        repo.store_digest(StoreDigestParams {
4981            namespace_id: ns_id,
4982            session_key: "session-2",
4983            digest_kind: "long",
4984            memory_id: mem.id,
4985            start_memory_id: Some(11),
4986            end_memory_id: Some(20),
4987            token_count: 100,
4988        })
4989        .await
4990        .unwrap();
4991
4992        // List all digests.
4993        let all = repo.list_digests(ns_id, None, 50, 0).await.unwrap();
4994        assert_eq!(all.len(), 2);
4995
4996        let total = repo.count_digests(ns_id, None).await.unwrap();
4997        assert_eq!(total, 2);
4998
4999        // Filter by session_key.
5000        let sess1 = repo
5001            .list_digests(ns_id, Some("session-1"), 50, 0)
5002            .await
5003            .unwrap();
5004        assert_eq!(sess1.len(), 1);
5005        assert_eq!(sess1[0].session_key, "session-1");
5006
5007        let sess1_count = repo.count_digests(ns_id, Some("session-1")).await.unwrap();
5008        assert_eq!(sess1_count, 1);
5009
5010        // Filter by non-existent session_key.
5011        let none = repo
5012            .list_digests(ns_id, Some("session-none"), 50, 0)
5013            .await
5014            .unwrap();
5015        assert!(none.is_empty());
5016
5017        let none_count = repo
5018            .count_digests(ns_id, Some("session-none"))
5019            .await
5020            .unwrap();
5021        assert_eq!(none_count, 0);
5022    }
5023
5024    // ---- Phase 2 Task 2: Explicit JSON decode error tests ----
5025
5026    /// Malformed labels JSON must produce an explicit Storage error,
5027    /// not silently default to an empty Vec.
5028    #[tokio::test]
5029    async fn test_row_to_memory_rejects_malformed_labels() {
5030        let pool = setup_test_db().await;
5031        let ns_id = create_namespace(&pool, "test-agent").await;
5032        let repo = MemoryRepository::new(pool);
5033
5034        // Insert a memory with valid data first.
5035        let memory = repo
5036            .store(StoreMemoryParams {
5037                namespace_id: ns_id,
5038                content: "corruption test labels",
5039                category: &Category::General,
5040                memory_lane_type: None,
5041                labels: &["valid-label".to_string()],
5042                metadata: &serde_json::Value::Null,
5043                embedding: None,
5044                embedding_model: None,
5045            })
5046            .await
5047            .unwrap();
5048
5049        // Corrupt the labels in-place to invalid JSON.
5050        sqlx::query("UPDATE memories SET labels = 'NOT VALID JSON{{{' WHERE id = ?")
5051            .bind(memory.id)
5052            .execute(repo.pool())
5053            .await
5054            .unwrap();
5055
5056        let err = repo.get_by_id(memory.id).await.unwrap_err();
5057        let msg = err.to_string();
5058        assert!(
5059            msg.contains("corrupted labels JSON"),
5060            "expected labels corruption error, got: {msg}"
5061        );
5062        assert!(msg.contains(&memory.id.to_string()));
5063    }
5064
5065    /// Malformed metadata JSON must produce an explicit Storage error,
5066    /// not silently default to Value::Null.
5067    #[tokio::test]
5068    async fn test_row_to_memory_rejects_malformed_metadata() {
5069        let pool = setup_test_db().await;
5070        let repo = MemoryRepository::new(pool);
5071
5072        // Construct a MemoryRow with corrupted metadata directly,
5073        // bypassing SQLite expression-index validation on UPDATE.
5074        let row = MemoryRow {
5075            id: 999,
5076            namespace_id: 1,
5077            content: "test".to_string(),
5078            category: "general".to_string(),
5079            memory_lane_type: None,
5080            labels: "[]".to_string(),
5081            metadata: "[truncated".to_string(), // invalid JSON
5082            similarity_score: None,
5083            relevance_score: None,
5084            content_embedding: None,
5085            embedding_model: None,
5086            created_at: Utc::now(),
5087            updated_at: None,
5088            last_accessed: None,
5089            is_active: true,
5090            is_archived: false,
5091            access_count: 0,
5092        };
5093
5094        let err = repo.row_to_memory(row).unwrap_err();
5095        let msg = err.to_string();
5096        assert!(
5097            msg.contains("corrupted metadata JSON"),
5098            "expected metadata corruption error, got: {msg}"
5099        );
5100    }
5101
5102    /// Malformed embedding JSON must produce an explicit Storage error,
5103    /// not silently default to None.
5104    #[tokio::test]
5105    async fn test_row_to_memory_rejects_malformed_embedding() {
5106        let pool = setup_test_db().await;
5107        let ns_id = create_namespace(&pool, "test-agent").await;
5108        let repo = MemoryRepository::new(pool);
5109
5110        let memory = repo
5111            .store(StoreMemoryParams {
5112                namespace_id: ns_id,
5113                content: "corruption test embedding",
5114                category: &Category::General,
5115                memory_lane_type: None,
5116                labels: &[],
5117                metadata: &serde_json::Value::Null,
5118                embedding: Some(&[0.1, 0.2, 0.3]),
5119                embedding_model: Some("test-model"),
5120            })
5121            .await
5122            .unwrap();
5123
5124        sqlx::query("UPDATE memories SET content_embedding = 'not-an-array' WHERE id = ?")
5125            .bind(memory.id)
5126            .execute(repo.pool())
5127            .await
5128            .unwrap();
5129
5130        let err = repo.get_by_id(memory.id).await.unwrap_err();
5131        let msg = err.to_string();
5132        assert!(
5133            msg.contains("corrupted embedding JSON"),
5134            "expected embedding corruption error, got: {msg}"
5135        );
5136    }
5137
5138    /// Malformed job payload JSON is permanently failed and the claim succeeds
5139    /// with an empty result, rather than poisoning the entire batch.
5140    #[tokio::test]
5141    async fn test_claim_jobs_rejects_malformed_payload() {
5142        let pool = setup_test_db().await;
5143        let ns_id = create_namespace(&pool, "test-agent").await;
5144        let repo = MemoryRepository::new(pool);
5145
5146        // Insert a job directly with corrupted payload JSON.
5147        sqlx::query(
5148            r#"
5149            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5150            VALUES (?, 'derive_memory', 'pending', 100, '{INVALID_JSON}', datetime('now'), datetime('now'))
5151            "#,
5152        )
5153        .bind(ns_id)
5154        .execute(repo.pool())
5155        .await
5156        .unwrap();
5157
5158        // claim_jobs should succeed with empty result — the bad job is permanently failed.
5159        let claimed = repo
5160            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5161            .await
5162            .unwrap();
5163        assert!(
5164            claimed.is_empty(),
5165            "corrupt payload job should not be returned"
5166        );
5167
5168        // Verify the job was permanently failed.
5169        let status: String =
5170            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5171                .bind(ns_id)
5172                .fetch_one(repo.pool())
5173                .await
5174                .unwrap();
5175        assert_eq!(status, "failed", "corrupt job should be permanently failed");
5176
5177        let last_error: Option<String> =
5178            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5179                .bind(ns_id)
5180                .fetch_one(repo.pool())
5181                .await
5182                .unwrap();
5183        assert!(
5184            last_error
5185                .unwrap_or_default()
5186                .contains("corrupted payload JSON"),
5187            "last_error should mention payload corruption"
5188        );
5189    }
5190
5191    /// Malformed perspective JSON in claim_jobs is permanently failed and the
5192    /// claim succeeds with an empty result, rather than poisoning the batch.
5193    #[tokio::test]
5194    async fn test_claim_jobs_rejects_malformed_perspective() {
5195        let pool = setup_test_db().await;
5196        let ns_id = create_namespace(&pool, "test-agent").await;
5197        let repo = MemoryRepository::new(pool);
5198
5199        // Insert a job with valid payload but corrupted perspective JSON.
5200        sqlx::query(
5201            r#"
5202            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
5203            VALUES (?, 'derive_memory', 'pending', 100, '{BOGUS}', '{"ok": true}', datetime('now'), datetime('now'))
5204            "#,
5205        )
5206        .bind(ns_id)
5207        .execute(repo.pool())
5208        .await
5209        .unwrap();
5210
5211        // claim_jobs should succeed with empty result — the bad job is permanently failed.
5212        let claimed = repo
5213            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5214            .await
5215            .unwrap();
5216        assert!(
5217            claimed.is_empty(),
5218            "corrupt perspective job should not be returned"
5219        );
5220
5221        // Verify the job was permanently failed.
5222        let status: String =
5223            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5224                .bind(ns_id)
5225                .fetch_one(repo.pool())
5226                .await
5227                .unwrap();
5228        assert_eq!(status, "failed", "corrupt job should be permanently failed");
5229
5230        let last_error: Option<String> =
5231            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5232                .bind(ns_id)
5233                .fetch_one(repo.pool())
5234                .await
5235                .unwrap();
5236        assert!(
5237            last_error
5238                .unwrap_or_default()
5239                .contains("corrupted perspective JSON"),
5240            "last_error should mention perspective corruption"
5241        );
5242    }
5243
5244    /// When a batch contains one corrupt job among valid ones, the valid jobs
5245    /// are returned and only the corrupt job is permanently failed.
5246    #[tokio::test]
5247    async fn test_claim_jobs_skips_corrupt_returns_valid() {
5248        let pool = setup_test_db().await;
5249        let ns_id = create_namespace(&pool, "test-agent").await;
5250        let repo = MemoryRepository::new(pool);
5251
5252        // Enqueue 2 valid jobs.
5253        let p1 = serde_json::json!({"memory_id": 1});
5254        repo.enqueue_job(EnqueueJobParams {
5255            namespace_id: ns_id,
5256            job_type: "derive_memory",
5257            priority: 100,
5258            perspective: None,
5259            payload: &p1,
5260        })
5261        .await
5262        .unwrap();
5263        let p2 = serde_json::json!({"memory_id": 2});
5264        repo.enqueue_job(EnqueueJobParams {
5265            namespace_id: ns_id,
5266            job_type: "derive_memory",
5267            priority: 50,
5268            perspective: None,
5269            payload: &p2,
5270        })
5271        .await
5272        .unwrap();
5273
5274        // Insert 1 corrupt job at the highest priority.
5275        sqlx::query(
5276            r#"
5277            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5278            VALUES (?, 'derive_memory', 'pending', 200, '{BROKEN}', datetime('now'), datetime('now'))
5279            "#,
5280        )
5281        .bind(ns_id)
5282        .execute(repo.pool())
5283        .await
5284        .unwrap();
5285
5286        // Claim all 3 — should get only the 2 valid jobs.
5287        let claimed = repo
5288            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 10)
5289            .await
5290            .unwrap();
5291        assert_eq!(
5292            claimed.len(),
5293            2,
5294            "should return 2 valid jobs, skipping the corrupt one"
5295        );
5296
5297        // The corrupt job should be permanently failed.
5298        let failed_count: i64 = sqlx::query_scalar(
5299            "SELECT COUNT(*) FROM memory_jobs WHERE namespace_id = ? AND status = 'failed'",
5300        )
5301        .bind(ns_id)
5302        .fetch_one(repo.pool())
5303        .await
5304        .unwrap();
5305        assert_eq!(failed_count, 1, "corrupt job should be permanently failed");
5306    }
5307
5308    // ---- Lifecycle management: purge tests ----
5309
5310    /// Purge should remove old completed jobs but keep recently completed ones.
5311    #[tokio::test]
5312    async fn test_purge_completed_jobs_removes_old_keeps_recent() {
5313        let pool = setup_test_db().await;
5314        let ns_id = create_namespace(&pool, "purge-test").await;
5315        let repo = MemoryRepository::new(pool);
5316
5317        // Enqueue and complete a job, then backdate its updated_at.
5318        repo.enqueue_job(EnqueueJobParams {
5319            namespace_id: ns_id,
5320            job_type: "derive_memory",
5321            priority: 10,
5322            perspective: None,
5323            payload: &serde_json::json!({"old": true}),
5324        })
5325        .await
5326        .unwrap();
5327
5328        let claimed = repo
5329            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5330            .await
5331            .unwrap();
5332        assert_eq!(claimed.len(), 1);
5333        let old_job_id = claimed[0].row.id;
5334
5335        repo.complete_job(&claimed[0]).await.unwrap();
5336
5337        // Backdate the completed job to 30 days ago.
5338        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5339            .bind(old_job_id)
5340            .execute(repo.pool())
5341            .await
5342            .unwrap();
5343
5344        // Enqueue and complete a second job that stays recent.
5345        repo.enqueue_job(EnqueueJobParams {
5346            namespace_id: ns_id,
5347            job_type: "derive_memory",
5348            priority: 10,
5349            perspective: None,
5350            payload: &serde_json::json!({"new": true}),
5351        })
5352        .await
5353        .unwrap();
5354
5355        let claimed2 = repo
5356            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5357            .await
5358            .unwrap();
5359        assert_eq!(claimed2.len(), 1);
5360        repo.complete_job(&claimed2[0]).await.unwrap();
5361
5362        // Purge with a cutoff of 7 days ago.
5363        let cutoff = Utc::now() - chrono::Duration::days(7);
5364        let deleted = repo.purge_completed_jobs(cutoff).await.unwrap();
5365        assert_eq!(deleted, 1);
5366
5367        // Verify: old job is gone, recent job remains.
5368        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5369        assert_eq!(remaining.len(), 1);
5370        assert_eq!(remaining[0].id, claimed2[0].row.id);
5371    }
5372
5373    /// Purge permanently failed jobs should only remove those with attempts >= 5.
5374    #[tokio::test]
5375    async fn test_purge_permanently_failed_jobs_removes_old_keeps_recent() {
5376        let pool = setup_test_db().await;
5377        let ns_id = create_namespace(&pool, "purge-failed").await;
5378        let repo = MemoryRepository::new(pool);
5379
5380        // Enqueue a job and fail it 5 times to make it permanently failed.
5381        repo.enqueue_job(EnqueueJobParams {
5382            namespace_id: ns_id,
5383            job_type: "derive_memory",
5384            priority: 10,
5385            perspective: None,
5386            payload: &serde_json::json!({"fail_me": true}),
5387        })
5388        .await
5389        .unwrap();
5390
5391        for _ in 0..5 {
5392            let claimed = repo
5393                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5394                .await
5395                .unwrap();
5396            assert_eq!(claimed.len(), 1);
5397            repo.fail_job(&claimed[0], "persistent error")
5398                .await
5399                .unwrap();
5400        }
5401
5402        // Backdate to 30 days ago.
5403        sqlx::query(
5404            "UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE status = ?",
5405        )
5406        .bind(memory_job_status::FAILED)
5407        .execute(repo.pool())
5408        .await
5409        .unwrap();
5410
5411        // Enqueue a second job and fail it only 2 times (still re-queueable).
5412        repo.enqueue_job(EnqueueJobParams {
5413            namespace_id: ns_id,
5414            job_type: "derive_memory",
5415            priority: 10,
5416            perspective: None,
5417            payload: &serde_json::json!({"retry_me": true}),
5418        })
5419        .await
5420        .unwrap();
5421
5422        for _ in 0..2 {
5423            let claimed = repo
5424                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5425                .await
5426                .unwrap();
5427            assert_eq!(claimed.len(), 1);
5428            repo.fail_job(&claimed[0], "transient error").await.unwrap();
5429        }
5430
5431        // Purge with a cutoff of 7 days ago.
5432        let cutoff = Utc::now() - chrono::Duration::days(7);
5433        let deleted = repo.purge_permanently_failed_jobs(cutoff).await.unwrap();
5434        assert_eq!(deleted, 1);
5435
5436        // The permanently failed job is gone; the re-queueable job remains as pending.
5437        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5438        assert_eq!(remaining.len(), 1);
5439        assert_eq!(remaining[0].status, memory_job_status::PENDING);
5440    }
5441
5442    /// Active leasing should still work after purging old completed/failed jobs.
5443    #[tokio::test]
5444    async fn test_active_leasing_works_after_purge() {
5445        let pool = setup_test_db().await;
5446        let ns_id = create_namespace(&pool, "purge-lease").await;
5447        let repo = MemoryRepository::new(pool);
5448
5449        // Create and complete an old job.
5450        repo.enqueue_job(EnqueueJobParams {
5451            namespace_id: ns_id,
5452            job_type: "derive_memory",
5453            priority: 10,
5454            perspective: None,
5455            payload: &serde_json::json!({"old": true}),
5456        })
5457        .await
5458        .unwrap();
5459
5460        let claimed = repo
5461            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5462            .await
5463            .unwrap();
5464        repo.complete_job(&claimed[0]).await.unwrap();
5465
5466        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5467            .bind(claimed[0].row.id)
5468            .execute(repo.pool())
5469            .await
5470            .unwrap();
5471
5472        // Purge old completed jobs.
5473        let cutoff = Utc::now() - chrono::Duration::days(7);
5474        repo.purge_completed_jobs(cutoff).await.unwrap();
5475
5476        // Enqueue a fresh job and verify the full claim/complete/fail cycle still works.
5477        repo.enqueue_job(EnqueueJobParams {
5478            namespace_id: ns_id,
5479            job_type: "derive_memory",
5480            priority: 20,
5481            perspective: None,
5482            payload: &serde_json::json!({"fresh": true}),
5483        })
5484        .await
5485        .unwrap();
5486
5487        let fresh_claimed = repo
5488            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 10)
5489            .await
5490            .unwrap();
5491        assert_eq!(fresh_claimed.len(), 1);
5492        assert_eq!(fresh_claimed[0].row.status, "running");
5493        assert_eq!(fresh_claimed[0].payload["fresh"], true);
5494
5495        // Complete it.
5496        repo.complete_job(&fresh_claimed[0]).await.unwrap();
5497
5498        // Verify no more claimable jobs.
5499        let empty = repo
5500            .claim_jobs(ns_id, "derive_memory", "worker-3", 60, 10)
5501            .await
5502            .unwrap();
5503        assert!(empty.is_empty());
5504    }
5505}