Skip to main content

nexus_memory_storage/
repository.rs

1//! Repository implementations for database operations
2
3use std::collections::{HashMap, HashSet};
4
5use crate::models::{
6    memory_job_status, AgentNamespaceRow, ClaimedMemoryJob, EnqueueJobParams, MemoryEvidenceRow,
7    MemoryJobRow, MemoryLineageEntry, MemoryRow, ProcessedFileRow, SessionDigestRow,
8    SystemMetricRow,
9};
10use crate::{db_error, Result};
11use chrono::{DateTime, Utc};
12use nexus_core::{
13    AgentNamespace, CognitiveLevel, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey,
14};
15use sqlx::SqlitePool;
16
/// Local alias for [`MemoryCategory`], kept for backward compatibility.
///
/// Used as the type of [`StoreMemoryParams::category`].
type Category = MemoryCategory;
19
/// Parameters for storing a new memory.
///
/// Every field is borrowed, so the caller keeps ownership of the underlying data.
pub struct StoreMemoryParams<'a> {
    /// Namespace the memory belongs to (written to the `namespace_id` column).
    pub namespace_id: i64,
    /// Memory text (written to the `content` column).
    pub content: &'a str,
    /// Category; persisted through its `to_string()` form.
    pub category: &'a Category,
    /// Optional memory lane type; persisted through `to_string()` when present.
    pub memory_lane_type: Option<&'a MemoryLaneType>,
    /// Labels; serialized to a JSON string before being written.
    pub labels: &'a [String],
    /// Arbitrary metadata; serialized to a JSON string before being written.
    pub metadata: &'a serde_json::Value,
    /// Optional embedding vector; serialized to JSON for the `content_embedding` column.
    pub embedding: Option<&'a [f32]>,
    /// Optional embedding model name (written to the `embedding_model` column).
    pub embedding_model: Option<&'a str>,
}
31
/// Parameters for storing a memory with evidence lineage.
///
/// Consumed by [`MemoryRepository::store_with_lineage`].
pub struct StoreMemoryWithLineageParams<'a> {
    /// The memory to insert.
    pub store: StoreMemoryParams<'a>,
    /// IDs of the source memories; one evidence row is written per entry.
    pub source_memory_ids: &'a [i64],
    /// Role string recorded on every evidence row written for this memory.
    pub evidence_role: &'a str,
}
38
/// Parameters for storing or updating a session digest registration.
///
/// Consumed by [`MemoryRepository::store_digest`], which upserts on
/// `(namespace_id, session_key, digest_kind, end_memory_id)`.
#[derive(Debug, Clone)]
pub struct StoreDigestParams<'a> {
    /// Namespace the digest belongs to.
    pub namespace_id: i64,
    /// Session the digest summarizes.
    pub session_key: &'a str,
    /// Digest kind discriminator (part of the upsert conflict key).
    pub digest_kind: &'a str,
    /// ID of the memory row that holds the digest itself.
    pub memory_id: i64,
    /// Optional first memory ID of the covered window.
    pub start_memory_id: Option<i64>,
    /// Optional last memory ID of the covered window (part of the upsert conflict key).
    pub end_memory_id: Option<i64>,
    /// Token count recorded for the digest; stored as a 64-bit integer.
    pub token_count: usize,
}
49
/// Result of [`MemoryRepository::session_digest_rollover`]: how much session
/// content has accumulated since the last digest window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionDigestRollover {
    /// Largest `end_memory_id` recorded in `session_digests` for the session,
    /// or `None` when the lookup yields no value.
    pub last_digest_end_memory_id: Option<i64>,
    /// Number of qualifying memories with an ID above that boundary
    /// (above 0 when there is no boundary).
    pub new_memory_count: i64,
    /// Sum of `(LENGTH(content) + 3) / 4` over those memories.
    pub estimated_new_tokens: i64,
}
55
/// Filter and paging options for listing memories.
///
/// NOTE(review): the consuming query is not visible in this part of the file;
/// the field notes below are read off names and types — confirm against the caller.
pub struct ListMemoryFilters<'a> {
    /// Optional category restriction (presumably the category's string form).
    pub category: Option<&'a str>,
    /// Optional lower time bound (inclusive/exclusive — TODO confirm).
    pub since: Option<DateTime<Utc>>,
    /// Optional upper time bound (inclusive/exclusive — TODO confirm).
    pub until: Option<DateTime<Utc>>,
    /// Optional content pattern (presumably used in a SQL `LIKE` — confirm).
    pub content_like: Option<&'a str>,
    /// Whether raw-activity rows should be included.
    pub include_raw: bool,
    /// Maximum number of rows to return.
    pub limit: i64,
    /// Number of rows to skip.
    pub offset: i64,
}
65
/// Parameters for [`MemoryRepository::search_working_set`].
///
/// The perspective is borrowed, so the caller keeps ownership of it.
pub struct WorkingSetParams<'a> {
    /// Namespace to search within.
    pub namespace_id: i64,
    /// Optional perspective lens. When `None`, queries fall back to
    /// namespace-only retrieval without observer/subject filtering.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Maximum number of memories to return after deduplication.
    pub max_items: usize,
    /// When `false`, raw-activity hook noise is excluded from every bucket.
    /// NOTE(review): described as the default, but no `Default` impl is
    /// visible here — confirm.
    pub include_raw: bool,
}
79
/// Parameters for embedding-backed semantic candidate retrieval.
///
/// The perspective is borrowed, so the caller keeps ownership of it.
pub struct SemanticCandidateParams<'a> {
    /// Namespace to search within.
    pub namespace_id: i64,
    /// Optional perspective lens for observer/subject/session scoping.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Maximum number of candidate memories to return before vector ranking.
    pub limit: i64,
    /// When `false`, raw-activity hook noise is excluded.
    pub include_raw: bool,
}
91
/// A persisted system metric sample.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Name identifying the metric.
    pub metric_name: String,
    /// Sampled value.
    pub metric_value: f64,
    /// Labels attached to the sample, held as a JSON value.
    pub labels: serde_json::Value,
}
99
/// Maximum number of retry attempts before a job is permanently failed.
///
/// `fail_job` compares the stored `attempts` counter against this value: at or
/// above it the job is marked failed, below it the job is requeued as pending.
const MAX_JOB_ATTEMPTS: i64 = 5;
/// WHERE-clause fragment that excludes raw-activity rows: rows whose `labels`
/// text contains `raw-activity`, or whose metadata carries a non-null
/// `$.raw_activity` value. Takes no bound parameters.
///
/// `labels` is wrapped in `COALESCE` because `NULL NOT LIKE '...'` evaluates to
/// NULL, which would filter out rows with NULL labels as if they were raw
/// activity.
const RAW_ACTIVITY_FILTER_SQL: &str =
    "COALESCE(labels, '') NOT LIKE '%raw-activity%' AND json_extract(COALESCE(metadata, '{}'), '$.raw_activity') IS NULL";
104
105// ---------------------------------------------------------------------------
106// Shared cognitive-metadata query fragments
107// ---------------------------------------------------------------------------
108// These constants are the single source of truth for how the repository
109// extracts values from the `metadata` JSON column.  Every query that touches
110// cognitive fields (observer, subject, session_key, session_keys, level,
111// times_reinforced, times_contradicted) MUST use these fragments so that
112// schema-level changes propagate consistently and cannot drift.
113// ---------------------------------------------------------------------------
114
/// SQL expression for the `metadata` JSON column that falls back to an empty
/// JSON object (`'{}'`) when the column is NULL.
///
/// Two usage styles appear in this file:
/// - inside a `format!` SQL template as `{METADATA}`, e.g.
///   `json_extract({METADATA}, '$.cognitive.level')`;
/// - as the replacement for the literal `METADATA` placeholder token embedded
///   in the sibling fragment constants, via `.replace("METADATA", METADATA)`.
const METADATA: &str = "COALESCE(metadata, '{}')";
121
/// WHERE-clause fragment that matches a memory whose session context equals the
/// given session key via either the scalar `session_key` field or the
/// `session_keys` array.  The fragment expects TWO bound parameters (same value
/// for both).
///
/// The upper-case `METADATA` token is a placeholder, not valid SQL: substitute
/// it with `.replace("METADATA", METADATA)` before use. The lower-case
/// `metadata` inside the `json_each` sub-select is the real column name and is
/// left untouched by that (case-sensitive) replacement.
const SESSION_KEY_FILTER: &str =
    "(json_extract(METADATA, '$.cognitive.session_key') = ? \
     OR EXISTS (SELECT 1 FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]')) WHERE value = ?))";
129
/// WHERE-clause fragment that filters by perspective observer and subject.
/// Does NOT include session_key filtering — combine with [`SESSION_KEY_FILTER`]
/// when the perspective has a session_key.
///
/// Expects TWO bound parameters, in order: observer, subject. The upper-case
/// `METADATA` token is a placeholder to be substituted with
/// `.replace("METADATA", METADATA)` before use.
const PERSPECTIVE_IDENTITY_FILTER: &str = "json_extract(METADATA, '$.cognitive.observer') = ? \
     AND json_extract(METADATA, '$.cognitive.subject') = ?";
135
/// Cognitive-level extraction expression (no comparison — yields the raw string).
///
/// The upper-case `METADATA` token is a placeholder to be substituted with
/// `.replace("METADATA", METADATA)` before use. Takes no bound parameters.
const COGNITIVE_LEVEL_EXPR: &str = "json_extract(METADATA, '$.cognitive.level')";
138
139/// Assembles the full perspective WHERE clause (observer, subject, and optional
140/// session key) for use in SQL templates.
141///
142/// Returns `(sql_fragment, bind_count)`. The caller must bind:
143/// - observer, subject (always)
144/// - session_key × 2 (if present)
145fn perspective_where_clause(perspective: &PerspectiveKey) -> String {
146    if perspective.session_key.is_some() {
147        format!(
148            "{} AND {}",
149            PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA),
150            SESSION_KEY_FILTER.replace("METADATA", METADATA),
151        )
152    } else {
153        PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA)
154    }
155}
156
157/// Bind values for a perspective WHERE clause.
158/// Order: observer, subject, [session_key, session_key]
159fn bind_perspective(perspective: &PerspectiveKey) -> Vec<&str> {
160    let mut vals = vec![perspective.observer.as_str(), perspective.subject.as_str()];
161    if let Some(ref sk) = perspective.session_key {
162        vals.push(sk.as_str());
163        vals.push(sk.as_str());
164    }
165    vals
166}
167
/// Repository for memory operations.
///
/// Wraps a [`SqlitePool`]; every query in the `impl` block runs against it.
pub struct MemoryRepository {
    /// Connection pool used for all statements issued by this repository.
    pool: SqlitePool,
}
172
173impl MemoryRepository {
    /// Create a repository that runs its queries against `pool`.
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
177
    /// Borrow the underlying connection pool.
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
181
182    /// Store a new memory
183    pub async fn store(&self, params: StoreMemoryParams<'_>) -> Result<Memory> {
184        let labels_json = serde_json::to_string(params.labels)?;
185        let metadata_json = serde_json::to_string(params.metadata)?;
186        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
187
188        let result = sqlx::query(
189            r#"
190            INSERT INTO memories (
191                namespace_id, content, category, memory_lane_type, labels, metadata,
192                content_embedding, embedding_model, created_at, is_active, access_count
193            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
194            "#,
195        )
196        .bind(params.namespace_id)
197        .bind(params.content)
198        .bind(params.category.to_string())
199        .bind(params.memory_lane_type.map(|t| t.to_string()))
200        .bind(&labels_json)
201        .bind(&metadata_json)
202        .bind(&embedding_json)
203        .bind(params.embedding_model)
204        .bind(Utc::now())
205        .execute(&self.pool)
206        .await
207        .map_err(db_error)?;
208
209        let id = result.last_insert_rowid();
210
211        // If last_insert_rowid() is 0, the BEFORE INSERT trigger
212        // (trg_memories_same_namespace_merge) detected a duplicate and
213        // raised IGNORE. The existing row was touched (access_count
214        // incremented) — find it by content match.
215        if id == 0 {
216            // Try to find existing row by content match (case-insensitive, trimmed)
217            let row: Option<MemoryRow> = sqlx::query_as(
218                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
219            )
220            .bind(params.namespace_id)
221            .bind(params.content)
222            .fetch_optional(&self.pool)
223            .await
224            .map_err(db_error)?;
225
226            if let Some(existing) = row {
227                self.merge_duplicate_memory_context(existing.id, params)
228                    .await?;
229                return self.get_by_id(existing.id).await?.ok_or_else(|| {
230                    nexus_core::NexusError::Storage(format!(
231                        "Duplicate merged row {} could not be reloaded",
232                        existing.id
233                    ))
234                });
235            }
236
237            // If no duplicate found, this is unexpected - log warning but don't fail
238            // This can happen if trigger fires but content doesn't match exactly
239            tracing::warn!(
240                namespace_id = params.namespace_id,
241                content_length = params.content.len(),
242                "Insert returned id 0 but no matching duplicate found - treating as successful insert"
243            );
244
245            // Return success anyway - the insert did happen, we just can't track the id
246            // This prevents hook failures due to this edge case
247            return self
248                .get_by_content(params.namespace_id, params.content)
249                .await;
250        }
251
252        self.get_by_id(id).await?.ok_or_else(|| {
253            nexus_core::NexusError::Storage(format!("Failed to retrieve memory with id {}", id))
254        })
255    }
256
257    async fn merge_duplicate_memory_context(
258        &self,
259        existing_id: i64,
260        params: StoreMemoryParams<'_>,
261    ) -> Result<()> {
262        let current = self.get_by_id(existing_id).await?.ok_or_else(|| {
263            nexus_core::NexusError::Storage(format!(
264                "Failed to load duplicate-merged memory {}",
265                existing_id
266            ))
267        })?;
268
269        let merged_labels = merge_labels(&current.labels, params.labels);
270        let merged_metadata = merge_duplicate_metadata(&current.metadata, params.metadata);
271        let labels_json = serde_json::to_string(&merged_labels)?;
272        let metadata_json = serde_json::to_string(&merged_metadata)?;
273
274        sqlx::query(
275            r#"
276            UPDATE memories
277            SET labels = ?, metadata = ?, updated_at = ?
278            WHERE id = ?
279            "#,
280        )
281        .bind(&labels_json)
282        .bind(&metadata_json)
283        .bind(Utc::now())
284        .bind(existing_id)
285        .execute(&self.pool)
286        .await
287        .map_err(db_error)?;
288
289        Ok(())
290    }
291
292    /// Store a memory and write evidence lineage rows atomically.
293    pub async fn store_with_lineage(
294        &self,
295        params: StoreMemoryWithLineageParams<'_>,
296    ) -> Result<Memory> {
297        let mut tx = self.pool.begin().await.map_err(db_error)?;
298        let memory_id = insert_memory_tx(&mut tx, &params.store).await?;
299        for &source_id in params.source_memory_ids {
300            insert_evidence_tx(&mut tx, memory_id, source_id, params.evidence_role).await?;
301        }
302        tx.commit().await.map_err(db_error)?;
303
304        self.get_by_id(memory_id).await?.ok_or_else(|| {
305            nexus_core::NexusError::Storage(format!(
306                "Failed to retrieve memory with id {} after lineage store",
307                memory_id
308            ))
309        })
310    }
311
312    /// Enqueue a new cognitive job.
313    pub async fn enqueue_job(&self, params: EnqueueJobParams<'_>) -> Result<i64> {
314        let perspective_json = params.perspective.map(serde_json::to_string).transpose()?;
315        let payload_json = serde_json::to_string(params.payload)?;
316
317        let id: i64 = sqlx::query_scalar(
318            r#"
319            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
320            VALUES (?, ?, 'pending', ?, ?, ?, datetime('now'), datetime('now'))
321            RETURNING id
322            "#,
323        )
324        .bind(params.namespace_id)
325        .bind(params.job_type)
326        .bind(params.priority)
327        .bind(&perspective_json)
328        .bind(&payload_json)
329        .fetch_one(&self.pool)
330        .await
331        .map_err(db_error)?;
332
333        Ok(id)
334    }
335
336    /// Claim up to `limit` pending (or stale running) jobs for a given type and namespace.
337    ///
338    /// Stale running jobs are those whose lease has expired (`lease_expires_at < now`).
339    /// Claimed jobs are transitioned to `running` with a lease owner and TTL.
340    pub async fn claim_jobs(
341        &self,
342        namespace_id: i64,
343        job_type: &str,
344        lease_owner: &str,
345        lease_ttl_secs: u64,
346        limit: i64,
347    ) -> Result<Vec<ClaimedMemoryJob>> {
348        let claim_token = new_claim_token(lease_owner);
349        // SQLite's UPDATE ... RETURNING does not preserve CTE ordering.
350        // We must sort in Rust to guarantee priority-first delivery.
351        // This is O(n log n) but n is bounded by the `limit` parameter (typically small).
352        let rows: Vec<MemoryJobRow> = sqlx::query_as::<_, MemoryJobRow>(
353            r#"
354            WITH candidates AS (
355                SELECT id
356                FROM memory_jobs
357                WHERE namespace_id = ? AND job_type = ? AND (
358                    status = ?
359                    OR (status = ? AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))
360                )
361                ORDER BY priority DESC, created_at ASC
362                LIMIT ?
363            )
364            UPDATE memory_jobs
365            SET status = ?,
366                lease_owner = ?,
367                claim_token = ?,
368                lease_expires_at = datetime('now', '+' || ? || ' seconds'),
369                attempts = attempts + 1,
370                updated_at = datetime('now')
371            WHERE id IN (SELECT id FROM candidates)
372            RETURNING *
373            "#,
374        )
375        .bind(namespace_id)
376        .bind(job_type)
377        .bind(memory_job_status::PENDING)
378        .bind(memory_job_status::RUNNING)
379        .bind(limit)
380        .bind(memory_job_status::RUNNING)
381        .bind(lease_owner)
382        .bind(&claim_token)
383        .bind(lease_ttl_secs as i64)
384        .fetch_all(&self.pool)
385        .await
386        .map_err(db_error)?;
387
388        let mut rows = rows;
389        rows.sort_by(|left, right| {
390            right
391                .priority
392                .cmp(&left.priority)
393                .then_with(|| left.created_at.cmp(&right.created_at))
394        });
395
396        let mut claimed = Vec::with_capacity(rows.len());
397        for row in rows {
398            let perspective = match row.perspective_json.as_deref() {
399                Some(s) => match serde_json::from_str(s) {
400                    Ok(p) => Some(p),
401                    Err(e) => {
402                        tracing::warn!(
403                            job_id = row.id,
404                            error = %e,
405                            "corrupted perspective JSON, permanently failing job"
406                        );
407                        let _ = self
408                            .permanently_fail_job(
409                                row.id,
410                                &row.lease_owner,
411                                &row.claim_token,
412                                &format!("corrupted perspective JSON: {e}"),
413                            )
414                            .await;
415                        continue;
416                    }
417                },
418                None => None,
419            };
420            let payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
421                Ok(p) => p,
422                Err(e) => {
423                    tracing::warn!(
424                        job_id = row.id,
425                        error = %e,
426                        "corrupted payload JSON, permanently failing job"
427                    );
428                    let _ = self
429                        .permanently_fail_job(
430                            row.id,
431                            &row.lease_owner,
432                            &row.claim_token,
433                            &format!("corrupted payload JSON: {e}"),
434                        )
435                        .await;
436                    continue;
437                }
438            };
439            claimed.push(ClaimedMemoryJob {
440                row,
441                perspective,
442                payload,
443            });
444        }
445
446        Ok(claimed)
447    }
448
449    /// Mark a job as completed.
450    pub async fn complete_job(&self, job: &ClaimedMemoryJob) -> Result<()> {
451        let result = sqlx::query(
452            r#"
453            UPDATE memory_jobs
454            SET status = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
455            WHERE id = ?
456              AND lease_owner = ?
457              AND claim_token = ?
458            "#,
459        )
460        .bind(memory_job_status::COMPLETED)
461        .bind(job.row.id)
462        .bind(job.row.lease_owner.as_deref())
463        .bind(job.row.claim_token.as_deref())
464        .execute(&self.pool)
465        .await
466        .map_err(db_error)?;
467
468        if result.rows_affected() == 0 {
469            return Err(nexus_core::NexusError::Storage(format!(
470                "Memory job {} completion lost lease ownership",
471                job.row.id
472            )));
473        }
474
475        Ok(())
476    }
477
478    /// Mark a job as failed. If attempts < MAX_JOB_ATTEMPTS, requeue it as pending.
479    /// Otherwise, mark it permanently failed.
480    pub async fn fail_job(&self, job: &ClaimedMemoryJob, error: &str) -> Result<()> {
481        let row: Option<MemoryJobRow> = sqlx::query_as("SELECT * FROM memory_jobs WHERE id = ?")
482            .bind(job.row.id)
483            .fetch_optional(&self.pool)
484            .await
485            .map_err(db_error)?;
486
487        let row = row.ok_or_else(|| {
488            nexus_core::NexusError::Storage(format!("Memory job {} not found", job.row.id))
489        })?;
490
491        let lease_matches =
492            row.lease_owner == job.row.lease_owner && row.claim_token == job.row.claim_token;
493        if !lease_matches {
494            return Err(nexus_core::NexusError::Storage(format!(
495                "Memory job {} failure lost lease ownership",
496                job.row.id
497            )));
498        }
499
500        if row.attempts >= MAX_JOB_ATTEMPTS {
501            // Permanently failed.
502            let result = sqlx::query(
503                r#"
504                UPDATE memory_jobs
505                SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
506                WHERE id = ? AND lease_owner = ? AND claim_token = ?
507                "#,
508            )
509            .bind(memory_job_status::FAILED)
510            .bind(error)
511            .bind(job.row.id)
512            .bind(job.row.lease_owner.as_deref())
513            .bind(job.row.claim_token.as_deref())
514            .execute(&self.pool)
515            .await
516            .map_err(db_error)?;
517            if result.rows_affected() == 0 {
518                return Err(nexus_core::NexusError::Storage(format!(
519                    "Memory job {} failure lost lease ownership",
520                    job.row.id
521                )));
522            }
523        } else {
524            // Requeue for retry.
525            let result = sqlx::query(
526                r#"
527                UPDATE memory_jobs
528                SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
529                WHERE id = ? AND lease_owner = ? AND claim_token = ?
530                "#,
531            )
532            .bind(memory_job_status::PENDING)
533            .bind(error)
534            .bind(job.row.id)
535            .bind(job.row.lease_owner.as_deref())
536            .bind(job.row.claim_token.as_deref())
537            .execute(&self.pool)
538            .await
539            .map_err(db_error)?;
540            if result.rows_affected() == 0 {
541                return Err(nexus_core::NexusError::Storage(format!(
542                    "Memory job {} failure lost lease ownership",
543                    job.row.id
544                )));
545            }
546        }
547
548        Ok(())
549    }
550
551    /// Permanently fail a job by ID without requiring a ClaimedMemoryJob.
552    /// Used by claim_jobs to handle corrupted jobs without
553    /// poisoning the entire batch.
554    async fn permanently_fail_job(
555        &self,
556        job_id: i64,
557        lease_owner: &Option<String>,
558        claim_token: &Option<String>,
559        error: &str,
560    ) -> Result<()> {
561        sqlx::query(
562            r#"
563            UPDATE memory_jobs
564            SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL,
565                lease_expires_at = NULL, updated_at = datetime('now')
566            WHERE id = ? AND lease_owner = ? AND claim_token = ?
567            "#,
568        )
569        .bind(memory_job_status::FAILED)
570        .bind(error)
571        .bind(job_id)
572        .bind(lease_owner.as_deref())
573        .bind(claim_token.as_deref())
574        .execute(&self.pool)
575        .await
576        .map_err(db_error)?;
577        Ok(())
578    }
579
580    pub async fn get_most_reinforced_by_namespace(
581        &self,
582        namespace_id: i64,
583        limit: i64,
584        include_raw: bool,
585    ) -> Result<Vec<Memory>> {
586        let noise_sql = if include_raw {
587            String::new()
588        } else {
589            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
590        };
591        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
592            r#"
593            SELECT * FROM memories
594            WHERE namespace_id = ?
595              AND is_active = 1
596              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
597              {noise_sql}
598            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_reinforced'), 0) DESC,
599                     created_at DESC
600            LIMIT ?
601            "#
602        ))
603        .bind(namespace_id)
604        .bind(CognitiveLevel::Derived.as_str())
605        .bind(limit)
606        .fetch_all(&self.pool)
607        .await
608        .map_err(db_error)?;
609
610        rows.into_iter()
611            .map(|row| self.row_to_memory(row))
612            .collect()
613    }
614
615    pub async fn get_contradictions_by_namespace(
616        &self,
617        namespace_id: i64,
618        limit: i64,
619        include_raw: bool,
620    ) -> Result<Vec<Memory>> {
621        let noise_sql = if include_raw {
622            String::new()
623        } else {
624            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
625        };
626        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
627            r#"
628            SELECT * FROM memories
629            WHERE namespace_id = ?
630              AND is_active = 1
631              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
632              {noise_sql}
633            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_contradicted'), 0) DESC,
634                     created_at DESC
635            LIMIT ?
636            "#
637        ))
638        .bind(namespace_id)
639        .bind(CognitiveLevel::Contradiction.as_str())
640        .bind(limit)
641        .fetch_all(&self.pool)
642        .await
643        .map_err(db_error)?;
644
645        rows.into_iter()
646            .map(|row| self.row_to_memory(row))
647            .collect()
648    }
649
650    pub async fn list_by_session_key(
651        &self,
652        namespace_id: i64,
653        session_key: &str,
654        limit: i64,
655        include_raw: bool,
656    ) -> Result<Vec<Memory>> {
657        let noise_sql = if include_raw {
658            String::new()
659        } else {
660            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
661        };
662        let session_filter = SESSION_KEY_FILTER.replace("METADATA", METADATA);
663        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
664            r#"
665            SELECT * FROM memories
666            WHERE namespace_id = ?
667              AND is_active = 1
668              AND {session_filter}
669              {noise_sql}
670            ORDER BY created_at DESC
671            LIMIT ?
672            "#
673        ))
674        .bind(namespace_id)
675        .bind(session_key)
676        .bind(session_key)
677        .bind(limit)
678        .fetch_all(&self.pool)
679        .await
680        .map_err(db_error)?;
681
682        rows.into_iter()
683            .map(|row| self.row_to_memory(row))
684            .collect()
685    }
686
687    /// Register a session digest for a memory.
688    ///
689    /// Returns the digest row ID. Uses the unique index on
690    /// `(namespace_id, session_key, digest_kind, end_memory_id)` to prevent
691    /// duplicate registrations.
692    pub async fn store_digest(&self, params: StoreDigestParams<'_>) -> Result<i64> {
693        let id: i64 = sqlx::query_scalar(
694            r#"
695            INSERT INTO session_digests (namespace_id, session_key, digest_kind, memory_id, start_memory_id, end_memory_id, token_count, created_at)
696            VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
697            ON CONFLICT(namespace_id, session_key, digest_kind, end_memory_id) DO UPDATE SET
698                memory_id = excluded.memory_id,
699                token_count = excluded.token_count
700            RETURNING id
701            "#,
702        )
703        .bind(params.namespace_id)
704        .bind(params.session_key)
705        .bind(params.digest_kind)
706        .bind(params.memory_id)
707        .bind(params.start_memory_id)
708        .bind(params.end_memory_id)
709        .bind(params.token_count as i64)
710        .fetch_one(&self.pool)
711        .await
712        .map_err(db_error)?;
713
714        Ok(id)
715    }
716
717    /// Get the latest digest memory for a session and digest kind.
718    /// Returns the `Memory` row that the digest points to, or None.
719    pub async fn latest_digest_for_session(
720        &self,
721        namespace_id: i64,
722        session_key: &str,
723        digest_kind: &str,
724    ) -> Result<Option<Memory>> {
725        let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
726            "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
727        )
728        .bind(namespace_id)
729        .bind(session_key)
730        .bind(digest_kind)
731        .fetch_optional(&self.pool)
732        .await
733        .map_err(db_error)?;
734
735        match digest {
736            Some(d) => self.get_by_id(d.memory_id).await,
737            None => Ok(None),
738        }
739    }
740
741    /// Get the latest digest memory for a namespace and digest kind, regardless
742    /// of session key. Returns the `Memory` row that the digest points to, or
743    /// None if no digest exists.
744    pub async fn latest_digest_for_namespace(
745        &self,
746        namespace_id: i64,
747        digest_kind: &str,
748    ) -> Result<Option<Memory>> {
749        let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
750            "SELECT * FROM session_digests WHERE namespace_id = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
751        )
752        .bind(namespace_id)
753        .bind(digest_kind)
754        .fetch_optional(&self.pool)
755        .await
756        .map_err(db_error)?;
757
758        match digest {
759            Some(d) => self.get_by_id(d.memory_id).await,
760            None => Ok(None),
761        }
762    }
763
764    /// Return how much fresh non-raw, non-digest session content has accumulated
765    /// since the latest covered digest window.
766    pub async fn session_digest_rollover(
767        &self,
768        namespace_id: i64,
769        session_key: &str,
770    ) -> Result<SessionDigestRollover> {
771        let last_digest_end_memory_id: Option<i64> = sqlx::query_scalar(
772            "SELECT MAX(end_memory_id) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
773        )
774        .bind(namespace_id)
775        .bind(session_key)
776        .fetch_one(&self.pool)
777        .await
778        .map_err(db_error)?;
779
780        let (new_memory_count, estimated_new_tokens): (i64, i64) = sqlx::query_as(&format!(
781            r#"
782            SELECT
783                COUNT(*) as new_memory_count,
784                COALESCE(SUM((LENGTH(content) + 3) / 4), 0) as estimated_new_tokens
785            FROM memories
786            WHERE namespace_id = ?
787              AND is_active = 1
788              AND id > ?
789              AND (
790                  json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
791                  OR EXISTS (
792                      SELECT 1
793                      FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
794                      WHERE value = ?
795                  )
796              )
797              AND {RAW_ACTIVITY_FILTER_SQL}
798              AND COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level'), '') NOT IN ('raw', 'summary_short', 'summary_long')
799            "#,
800        ))
801        .bind(namespace_id)
802        .bind(last_digest_end_memory_id.unwrap_or(0))
803        .bind(session_key)
804        .bind(session_key)
805        .fetch_one(&self.pool)
806        .await
807        .map_err(db_error)?;
808
809        Ok(SessionDigestRollover {
810            last_digest_end_memory_id,
811            new_memory_count,
812            estimated_new_tokens,
813        })
814    }
815
    /// Get recent memories scoped to a perspective, optionally narrowed to a session.
    ///
    /// Delegates to [`Self::get_recent_by_perspective_opts`] with
    /// `include_raw = false`, so raw-activity noise is always excluded here.
    /// Call that method directly to include operational hook event payloads.
    pub async fn get_recent_by_perspective(
        &self,
        namespace_id: i64,
        perspective: &PerspectiveKey,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        self.get_recent_by_perspective_opts(namespace_id, perspective, limit, false)
            .await
    }
829
830    /// Like [`Self::get_recent_by_perspective`] but allows opting into raw-activity
831    /// noise inclusion.
832    pub async fn get_recent_by_perspective_opts(
833        &self,
834        namespace_id: i64,
835        perspective: &PerspectiveKey,
836        limit: i64,
837        include_raw: bool,
838    ) -> Result<Vec<Memory>> {
839        let noise_sql = if include_raw {
840            String::new()
841        } else {
842            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
843        };
844
845        let perspective_sql = perspective_where_clause(perspective);
846        let sql = format!(
847            r#"
848            SELECT * FROM memories
849            WHERE namespace_id = ?
850              AND is_active = 1
851              AND {perspective_sql}
852              {noise_sql}
853            ORDER BY created_at DESC
854            LIMIT ?
855            "#
856        );
857        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
858
859        for val in bind_perspective(perspective) {
860            query = query.bind(val);
861        }
862
863        let rows = query
864            .bind(limit)
865            .fetch_all(&self.pool)
866            .await
867            .map_err(db_error)?;
868
869        rows.into_iter()
870            .map(|row| self.row_to_memory(row))
871            .collect()
872    }
873
874    /// Get active memories by cognitive level ordered from newest to oldest.
875    pub async fn get_by_cognitive_level(
876        &self,
877        namespace_id: i64,
878        level: CognitiveLevel,
879        limit: i64,
880    ) -> Result<Vec<Memory>> {
881        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
882            r#"
883            SELECT * FROM memories
884            WHERE namespace_id = ?
885              AND is_active = 1
886              AND {} = ?
887            ORDER BY created_at DESC
888            LIMIT ?
889            "#,
890            COGNITIVE_LEVEL_EXPR,
891        ))
892        .bind(namespace_id)
893        .bind(level.as_str())
894        .bind(limit)
895        .fetch_all(&self.pool)
896        .await
897        .map_err(db_error)?;
898
899        rows.into_iter()
900            .map(|row| self.row_to_memory(row))
901            .collect()
902    }
903
904    /// Get active memories by cognitive level and perspective, ordered from newest to oldest.
905    ///
906    /// Unlike [`Self::get_by_cognitive_level`], this method applies perspective filtering
907    /// (observer, subject, session_key, and session_keys arrays) in the SQL query BEFORE
908    /// the LIMIT is applied, ensuring the caller receives up to `limit` matching results.
909    pub async fn get_by_cognitive_level_with_perspective(
910        &self,
911        namespace_id: i64,
912        level: CognitiveLevel,
913        perspective: &PerspectiveKey,
914        limit: i64,
915    ) -> Result<Vec<Memory>> {
916        let perspective_sql = perspective_where_clause(perspective);
917        let sql = format!(
918            r#"
919            SELECT * FROM memories
920            WHERE namespace_id = ?
921              AND is_active = 1
922              AND {COGNITIVE_LEVEL_EXPR} = ?
923              AND {perspective_sql}
924            ORDER BY created_at DESC
925            LIMIT ?
926            "#
927        );
928        let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
929            .bind(namespace_id)
930            .bind(level.as_str());
931
932        for val in bind_perspective(perspective) {
933            query = query.bind(val);
934        }
935
936        let rows = query
937            .bind(limit)
938            .fetch_all(&self.pool)
939            .await
940            .map_err(db_error)?;
941
942        rows.into_iter()
943            .map(|row| self.row_to_memory(row))
944            .collect()
945    }
946
947    /// Get the most reinforced perspective-aligned memories first.
948    ///
949    /// By default, raw-activity noise and contradiction memories are excluded.
950    pub async fn get_most_reinforced_by_perspective(
951        &self,
952        namespace_id: i64,
953        perspective: &PerspectiveKey,
954        limit: i64,
955    ) -> Result<Vec<Memory>> {
956        self.get_most_reinforced_by_perspective_opts(namespace_id, perspective, limit, false)
957            .await
958    }
959
960    /// Like [`Self::get_most_reinforced_by_perspective`] but allows opting into
961    /// raw-activity noise inclusion.
962    pub async fn get_most_reinforced_by_perspective_opts(
963        &self,
964        namespace_id: i64,
965        perspective: &PerspectiveKey,
966        limit: i64,
967        include_raw: bool,
968    ) -> Result<Vec<Memory>> {
969        let noise_sql = if include_raw {
970            String::new()
971        } else {
972            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
973        };
974
975        let perspective_sql = perspective_where_clause(perspective);
976        let sql = format!(
977            r#"
978            SELECT * FROM memories
979            WHERE namespace_id = ?
980              AND is_active = 1
981              AND {perspective_sql}
982              AND {COGNITIVE_LEVEL_EXPR} != ?
983              {noise_sql}
984            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_reinforced'), 0) DESC,
985                     created_at DESC
986            LIMIT ?
987            "#
988        );
989        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
990
991        for val in bind_perspective(perspective) {
992            query = query.bind(val);
993        }
994
995        let rows = query
996            .bind(CognitiveLevel::Contradiction.as_str())
997            .bind(limit)
998            .fetch_all(&self.pool)
999            .await
1000            .map_err(db_error)?;
1001
1002        rows.into_iter()
1003            .map(|row| self.row_to_memory(row))
1004            .collect()
1005    }
1006
1007    /// Get contradiction memories for the requested perspective.
1008    ///
1009    /// By default, raw-activity noise is excluded.
1010    pub async fn get_contradictions_by_perspective(
1011        &self,
1012        namespace_id: i64,
1013        perspective: &PerspectiveKey,
1014        limit: i64,
1015    ) -> Result<Vec<Memory>> {
1016        self.get_contradictions_by_perspective_opts(namespace_id, perspective, limit, false)
1017            .await
1018    }
1019
1020    /// Like [`Self::get_contradictions_by_perspective`] but allows opting into
1021    /// raw-activity noise inclusion.
1022    pub async fn get_contradictions_by_perspective_opts(
1023        &self,
1024        namespace_id: i64,
1025        perspective: &PerspectiveKey,
1026        limit: i64,
1027        include_raw: bool,
1028    ) -> Result<Vec<Memory>> {
1029        let noise_sql = if include_raw {
1030            String::new()
1031        } else {
1032            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1033        };
1034
1035        let perspective_sql = perspective_where_clause(perspective);
1036        let sql = format!(
1037            r#"
1038            SELECT * FROM memories
1039            WHERE namespace_id = ?
1040              AND is_active = 1
1041              AND {perspective_sql}
1042              AND {COGNITIVE_LEVEL_EXPR} = ?
1043              {noise_sql}
1044            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_contradicted'), 0) DESC,
1045                     created_at DESC
1046            LIMIT ?
1047            "#
1048        );
1049        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1050
1051        for val in bind_perspective(perspective) {
1052            query = query.bind(val);
1053        }
1054
1055        let rows = query
1056            .bind(CognitiveLevel::Contradiction.as_str())
1057            .bind(limit)
1058            .fetch_all(&self.pool)
1059            .await
1060            .map_err(db_error)?;
1061
1062        rows.into_iter()
1063            .map(|row| self.row_to_memory(row))
1064            .collect()
1065    }
1066
1067    // ---------------------------------------------------------------------------
1068    // Working-set assembly
1069    // ---------------------------------------------------------------------------
1070
1071    /// Assemble a bounded working set from multiple retrieval buckets.
1072    ///
1073    /// This is the low-level repository primitive used by higher-level services
1074    /// (e.g. `RepresentationService`) to build a [`WorkingRepresentation`].
1075    ///
1076    /// The method queries four buckets in sequence:
1077    ///
1078    /// 1. **Digests** — latest short + long session digests
1079    /// 2. **Reinforced** — highest reinforcement-scored memories
1080    /// 3. **Recent** — newest perspective-aligned memories
1081    /// 4. **Contradictions** — highest-confidence contradiction records
1082    ///
1083    /// Results are deduplicated by memory ID (first-seen-wins in bucket order)
1084    /// and truncated to `max_items`.
1085    pub async fn search_working_set(&self, params: WorkingSetParams<'_>) -> Result<Vec<Memory>> {
1086        let WorkingSetParams {
1087            namespace_id,
1088            perspective,
1089            max_items,
1090            include_raw,
1091        } = params;
1092
1093        // Each sub-query requests more than max_items so dedup still has enough
1094        // material after filtering.
1095        let per_bucket = (max_items as i64).max(8);
1096
1097        // 1. Digests
1098        let session = perspective
1099            .and_then(|p| p.session_key.as_deref())
1100            .unwrap_or("");
1101        let mut digests = Vec::new();
1102        if let Some(short) = self
1103            .latest_digest_for_session(namespace_id, session, "short")
1104            .await?
1105        {
1106            digests.push(short);
1107        }
1108        if let Some(long) = self
1109            .latest_digest_for_session(namespace_id, session, "long")
1110            .await?
1111        {
1112            digests.push(long);
1113        }
1114
1115        // 2-4. Perspective or namespace-level queries
1116        let reinforced = if let Some(persp) = perspective {
1117            self.get_most_reinforced_by_perspective_opts(
1118                namespace_id,
1119                persp,
1120                per_bucket,
1121                include_raw,
1122            )
1123            .await?
1124        } else {
1125            self.get_most_reinforced_by_namespace(namespace_id, per_bucket, include_raw)
1126                .await?
1127        };
1128
1129        let recent = if let Some(persp) = perspective {
1130            self.get_recent_by_perspective_opts(namespace_id, persp, per_bucket, include_raw)
1131                .await?
1132        } else {
1133            let sql = if include_raw {
1134                "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ?"
1135                    .to_string()
1136            } else {
1137                format!(
1138                    "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 AND {} ORDER BY created_at DESC LIMIT ?",
1139                    RAW_ACTIVITY_FILTER_SQL,
1140                )
1141            };
1142            let rows: Vec<MemoryRow> = sqlx::query_as(&sql)
1143                .bind(namespace_id)
1144                .bind(per_bucket)
1145                .fetch_all(&self.pool)
1146                .await
1147                .map_err(db_error)?;
1148            rows.into_iter()
1149                .map(|r| self.row_to_memory(r))
1150                .collect::<Result<Vec<_>>>()?
1151        };
1152
1153        let contradictions = if let Some(persp) = perspective {
1154            self.get_contradictions_by_perspective_opts(
1155                namespace_id,
1156                persp,
1157                per_bucket,
1158                include_raw,
1159            )
1160            .await?
1161        } else {
1162            self.get_contradictions_by_namespace(namespace_id, per_bucket, include_raw)
1163                .await?
1164        };
1165
1166        // Merge: digests → reinforced → recent → contradictions, dedupe by ID
1167        let mut seen = std::collections::HashSet::new();
1168        let mut result = Vec::with_capacity(max_items);
1169
1170        for memory in digests
1171            .into_iter()
1172            .chain(reinforced)
1173            .chain(recent)
1174            .chain(contradictions)
1175        {
1176            if seen.insert(memory.id) {
1177                result.push(memory);
1178                if result.len() >= max_items {
1179                    break;
1180                }
1181            }
1182        }
1183
1184        Ok(result)
1185    }
1186
1187    /// Load all evidence lineage entries for a given memory (as derived or source).
1188    pub async fn load_lineage(&self, memory_id: i64) -> Result<Vec<MemoryLineageEntry>> {
1189        let rows: Vec<MemoryEvidenceRow> = sqlx::query_as::<_, MemoryEvidenceRow>(
1190            r#"
1191            SELECT * FROM memory_evidence
1192            WHERE derived_memory_id = ? OR source_memory_id = ?
1193            ORDER BY created_at ASC
1194            "#,
1195        )
1196        .bind(memory_id)
1197        .bind(memory_id)
1198        .fetch_all(&self.pool)
1199        .await
1200        .map_err(db_error)?;
1201
1202        Ok(rows
1203            .into_iter()
1204            .map(|r| MemoryLineageEntry {
1205                derived_memory_id: r.derived_memory_id,
1206                source_memory_id: r.source_memory_id,
1207                evidence_role: r.evidence_role,
1208            })
1209            .collect())
1210    }
1211
1212    /// Load lineage rows for many source/derived memory IDs in one query.
1213    pub async fn load_lineage_batch(
1214        &self,
1215        memory_ids: &[i64],
1216    ) -> Result<HashMap<i64, Vec<MemoryLineageEntry>>> {
1217        if memory_ids.is_empty() {
1218            return Ok(HashMap::new());
1219        }
1220
1221        let placeholders = memory_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1222        let sql = format!(
1223            r#"
1224            SELECT * FROM memory_evidence
1225            WHERE derived_memory_id IN ({placeholders})
1226               OR source_memory_id IN ({placeholders})
1227            ORDER BY created_at ASC
1228            "#
1229        );
1230
1231        let mut query = sqlx::query_as::<_, MemoryEvidenceRow>(&sql);
1232        for id in memory_ids {
1233            query = query.bind(*id);
1234        }
1235        for id in memory_ids {
1236            query = query.bind(*id);
1237        }
1238
1239        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1240        let id_set: HashSet<i64> = memory_ids.iter().copied().collect();
1241        let mut grouped: HashMap<i64, Vec<MemoryLineageEntry>> = HashMap::new();
1242
1243        for row in rows {
1244            let entry = MemoryLineageEntry {
1245                derived_memory_id: row.derived_memory_id,
1246                source_memory_id: row.source_memory_id,
1247                evidence_role: row.evidence_role,
1248            };
1249
1250            if id_set.contains(&entry.derived_memory_id) {
1251                grouped
1252                    .entry(entry.derived_memory_id)
1253                    .or_default()
1254                    .push(entry.clone());
1255            }
1256            if id_set.contains(&entry.source_memory_id) {
1257                grouped
1258                    .entry(entry.source_memory_id)
1259                    .or_default()
1260                    .push(entry);
1261            }
1262        }
1263
1264        Ok(grouped)
1265    }
1266
1267    /// Get a memory by ID
1268    pub async fn get_by_id(&self, id: i64) -> Result<Option<Memory>> {
1269        let row: Option<MemoryRow> = sqlx::query_as("SELECT * FROM memories WHERE id = ?")
1270            .bind(id)
1271            .fetch_optional(&self.pool)
1272            .await
1273            .map_err(db_error)?;
1274
1275        row.map(|r| self.row_to_memory(r)).transpose()
1276    }
1277
1278    /// Get a memory by namespace and content (fallback for id 0 edge case)
1279    pub async fn get_by_content(&self, namespace_id: i64, content: &str) -> Result<Memory> {
1280        let row: Option<MemoryRow> = sqlx::query_as(
1281            "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
1282        )
1283        .bind(namespace_id)
1284        .bind(content)
1285        .fetch_optional(&self.pool)
1286        .await
1287        .map_err(db_error)?;
1288
1289        row.map(|r| self.row_to_memory(r))
1290            .transpose()?
1291            .ok_or_else(|| {
1292                nexus_core::NexusError::Storage(
1293                    "No memories found in namespace after insert".to_string(),
1294                )
1295            })
1296    }
1297
1298    /// Search memories by namespace
1299    pub async fn search_by_namespace(
1300        &self,
1301        namespace_id: i64,
1302        limit: usize,
1303        offset: usize,
1304    ) -> Result<Vec<Memory>> {
1305        let rows: Vec<MemoryRow> = sqlx::query_as(
1306            "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ?"
1307        )
1308        .bind(namespace_id)
1309        .bind(limit as i64)
1310        .bind(offset as i64)
1311        .fetch_all(&self.pool)
1312        .await
1313        .map_err(db_error)?;
1314
1315        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1316    }
1317
1318    /// Count memories in namespace
1319    pub async fn count_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1320        let count: (i64,) = sqlx::query_as(
1321            "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_active = 1",
1322        )
1323        .bind(namespace_id)
1324        .fetch_one(&self.pool)
1325        .await
1326        .map_err(db_error)?;
1327
1328        Ok(count.0)
1329    }
1330
1331    /// Count all memories in namespace (including inactive/archived)
1332    pub async fn count_all_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1333        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM memories WHERE namespace_id = ?")
1334            .bind(namespace_id)
1335            .fetch_one(&self.pool)
1336            .await
1337            .map_err(db_error)?;
1338
1339        Ok(count.0)
1340    }
1341
1342    /// Count archived memories in namespace
1343    pub async fn count_archived_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1344        let count: (i64,) = sqlx::query_as(
1345            "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_archived = 1",
1346        )
1347        .bind(namespace_id)
1348        .fetch_one(&self.pool)
1349        .await
1350        .map_err(db_error)?;
1351
1352        Ok(count.0)
1353    }
1354
1355    /// Delete a memory
1356    pub async fn delete(&self, id: i64) -> Result<bool> {
1357        let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1358            .bind(id)
1359            .execute(&self.pool)
1360            .await
1361            .map_err(db_error)?;
1362
1363        Ok(result.rows_affected() > 0)
1364    }
1365
1366    /// Update access count
1367    pub async fn touch(&self, id: i64) -> Result<()> {
1368        sqlx::query(
1369            "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
1370        )
1371        .bind(Utc::now())
1372        .bind(id)
1373        .execute(&self.pool)
1374        .await
1375        .map_err(db_error)?;
1376
1377        Ok(())
1378    }
1379
1380    /// Get unconsolidated memories
1381    pub async fn get_unconsolidated(
1382        &self,
1383        namespace_id: i64,
1384        limit: i32,
1385    ) -> Result<Vec<MemoryRow>> {
1386        let rows = sqlx::query_as::<_, MemoryRow>(
1387            r#"
1388            SELECT * FROM memories
1389            WHERE namespace_id = ?
1390            AND is_active = 1
1391            AND (metadata IS NULL OR json_extract(metadata, '$.agent.consolidated') IS NULL)
1392            ORDER BY created_at ASC
1393            LIMIT ?
1394            "#,
1395        )
1396        .bind(namespace_id)
1397        .bind(limit)
1398        .fetch_all(&self.pool)
1399        .await
1400        .map_err(db_error)?;
1401
1402        Ok(rows)
1403    }
1404
1405    /// Mark a memory as consolidated
1406    pub async fn mark_consolidated(&self, id: i64) -> Result<()> {
1407        sqlx::query(
1408            r#"
1409            UPDATE memories
1410            SET metadata = json_set(
1411                COALESCE(metadata, '{}'),
1412                '$.agent.consolidated',
1413                true,
1414                '$.agent.consolidated_at',
1415                datetime('now')
1416            ),
1417            updated_at = datetime('now')
1418            WHERE id = ?
1419            "#,
1420        )
1421        .bind(id)
1422        .execute(&self.pool)
1423        .await
1424        .map_err(db_error)?;
1425
1426        Ok(())
1427    }
1428
1429    /// Mark multiple memories as consolidated in a single query
1430    pub async fn mark_consolidated_batch(&self, ids: &[i64]) -> Result<()> {
1431        if ids.is_empty() {
1432            return Ok(());
1433        }
1434        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1435        let query = format!(
1436            r#"
1437            UPDATE memories
1438            SET metadata = json_set(
1439                COALESCE(metadata, '{{}}'),
1440                '$.agent.consolidated',
1441                true,
1442                '$.agent.consolidated_at',
1443                datetime('now')
1444            ),
1445            updated_at = datetime('now')
1446            WHERE id IN ({})
1447            "#,
1448            placeholders
1449        );
1450        let mut q = sqlx::query(&query);
1451        for id in ids {
1452            q = q.bind(*id);
1453        }
1454        q.execute(&self.pool).await.map_err(db_error)?;
1455        Ok(())
1456    }
1457
1458    /// Search memories by text content (LIKE search)
1459    pub async fn search_by_text(
1460        &self,
1461        namespace_id: i64,
1462        query: &str,
1463        limit: i32,
1464        include_raw: bool,
1465    ) -> Result<Vec<MemoryRow>> {
1466        let pattern = format!("%{}%", query);
1467        let raw_clause = if include_raw {
1468            String::new()
1469        } else {
1470            format!("AND {RAW_ACTIVITY_FILTER_SQL}")
1471        };
1472        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
1473            r#"
1474            SELECT * FROM memories
1475            WHERE namespace_id = ?
1476            AND is_active = 1
1477            AND content LIKE ?
1478            {}
1479            ORDER BY updated_at DESC
1480            LIMIT ?
1481            "#,
1482            raw_clause
1483        ))
1484        .bind(namespace_id)
1485        .bind(&pattern)
1486        .bind(limit)
1487        .fetch_all(&self.pool)
1488        .await
1489        .map_err(db_error)?;
1490
1491        Ok(rows)
1492    }
1493
1494    /// Search memories by text content and return domain memories.
1495    pub async fn search_by_text_memories(
1496        &self,
1497        namespace_id: i64,
1498        query: &str,
1499        limit: i32,
1500        include_raw: bool,
1501    ) -> Result<Vec<Memory>> {
1502        let rows = self
1503            .search_by_text(namespace_id, query, limit, include_raw)
1504            .await?;
1505        rows.into_iter()
1506            .map(|row| self.row_to_memory(row))
1507            .collect()
1508    }
1509
1510    /// Fetch recent, embedding-bearing cognition memories for vector-first semantic recall.
1511    pub async fn get_semantic_candidates(
1512        &self,
1513        params: SemanticCandidateParams<'_>,
1514    ) -> Result<Vec<Memory>> {
1515        let SemanticCandidateParams {
1516            namespace_id,
1517            perspective,
1518            limit,
1519            include_raw,
1520        } = params;
1521
1522        let noise_sql = if include_raw {
1523            String::new()
1524        } else {
1525            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1526        };
1527
1528        let rows = if let Some(perspective) = perspective {
1529            let sql = if perspective.session_key.is_some() {
1530                format!(
1531                    r#"
1532                    SELECT * FROM memories
1533                    WHERE namespace_id = ?
1534                      AND is_active = 1
1535                      AND content_embedding IS NOT NULL
1536                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
1537                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
1538                      AND (
1539                          json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
1540                          OR EXISTS (
1541                              SELECT 1
1542                              FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
1543                              WHERE value = ?
1544                          )
1545                      )
1546                      {noise_sql}
1547                    ORDER BY updated_at DESC, created_at DESC
1548                    LIMIT ?
1549                    "#
1550                )
1551            } else {
1552                format!(
1553                    r#"
1554                    SELECT * FROM memories
1555                    WHERE namespace_id = ?
1556                      AND is_active = 1
1557                      AND content_embedding IS NOT NULL
1558                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
1559                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
1560                      {noise_sql}
1561                    ORDER BY updated_at DESC, created_at DESC
1562                    LIMIT ?
1563                    "#
1564                )
1565            };
1566
1567            let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
1568                .bind(namespace_id)
1569                .bind(&perspective.observer)
1570                .bind(&perspective.subject);
1571
1572            if let Some(session_key) = &perspective.session_key {
1573                query = query.bind(session_key);
1574                query = query.bind(session_key);
1575            }
1576
1577            query
1578                .bind(limit)
1579                .fetch_all(&self.pool)
1580                .await
1581                .map_err(db_error)?
1582        } else {
1583            let sql = if include_raw {
1584                r#"
1585                SELECT * FROM memories
1586                WHERE namespace_id = ?
1587                  AND is_active = 1
1588                  AND content_embedding IS NOT NULL
1589                ORDER BY updated_at DESC, created_at DESC
1590                LIMIT ?
1591                "#
1592                .to_string()
1593            } else {
1594                format!(
1595                    r#"
1596                    SELECT * FROM memories
1597                    WHERE namespace_id = ?
1598                      AND is_active = 1
1599                      AND content_embedding IS NOT NULL
1600                      AND {}
1601                    ORDER BY updated_at DESC, created_at DESC
1602                    LIMIT ?
1603                    "#,
1604                    RAW_ACTIVITY_FILTER_SQL,
1605                )
1606            };
1607
1608            sqlx::query_as::<_, MemoryRow>(&sql)
1609                .bind(namespace_id)
1610                .bind(limit)
1611                .fetch_all(&self.pool)
1612                .await
1613                .map_err(db_error)?
1614        };
1615
1616        rows.into_iter()
1617            .map(|row| self.row_to_memory(row))
1618            .collect()
1619    }
1620
1621    /// List memories with optional filters
1622    pub async fn list_filtered(
1623        &self,
1624        namespace_id: i64,
1625        filters: ListMemoryFilters<'_>,
1626    ) -> Result<Vec<Memory>> {
1627        // Build WHERE clause dynamically
1628        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1629        let mut param_idx = 2u32;
1630
1631        if filters.category.is_some() {
1632            conditions.push(format!("category = ?{}", param_idx));
1633            param_idx += 1;
1634        }
1635        if filters.since.is_some() {
1636            conditions.push(format!("created_at >= ?{}", param_idx));
1637            param_idx += 1;
1638        }
1639        if filters.until.is_some() {
1640            conditions.push(format!("created_at <= ?{}", param_idx));
1641            param_idx += 1;
1642        }
1643        if filters.content_like.is_some() {
1644            conditions.push(format!("content LIKE ?{}", param_idx));
1645            param_idx += 1;
1646        }
1647        if !filters.include_raw {
1648            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1649        }
1650
1651        let sql = format!(
1652            "SELECT * FROM memories WHERE {} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
1653            conditions.join(" AND "),
1654            param_idx,
1655            param_idx + 1,
1656        );
1657
1658        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1659
1660        if let Some(cat) = filters.category {
1661            query = query.bind(cat.to_string());
1662        }
1663        if let Some(s) = filters.since {
1664            query = query.bind(s);
1665        }
1666        if let Some(u) = filters.until {
1667            query = query.bind(u);
1668        }
1669        if let Some(search) = filters.content_like {
1670            query = query.bind(format!("%{}%", search));
1671        }
1672
1673        let rows: Vec<MemoryRow> = query
1674            .bind(filters.limit)
1675            .bind(filters.offset)
1676            .fetch_all(&self.pool)
1677            .await
1678            .map_err(db_error)?;
1679
1680        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1681    }
1682
1683    /// List memories that still need cognition metadata backfilled.
1684    pub async fn list_missing_cognitive_metadata(
1685        &self,
1686        namespace_id: i64,
1687        limit: i64,
1688        offset: i64,
1689    ) -> Result<Vec<Memory>> {
1690        let rows: Vec<MemoryRow> = sqlx::query_as(
1691            r#"
1692            SELECT * FROM memories
1693            WHERE namespace_id = ?
1694            AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1695            ORDER BY id ASC
1696            LIMIT ? OFFSET ?
1697            "#,
1698        )
1699        .bind(namespace_id)
1700        .bind(limit)
1701        .bind(offset)
1702        .fetch_all(&self.pool)
1703        .await
1704        .map_err(db_error)?;
1705
1706        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1707    }
1708
1709    /// Count memories that still need cognition metadata backfilled.
1710    pub async fn count_missing_cognitive_metadata(&self, namespace_id: i64) -> Result<i64> {
1711        let count: i64 = sqlx::query_scalar(
1712            r#"
1713            SELECT COUNT(*) FROM memories
1714            WHERE namespace_id = ?
1715            AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1716            "#,
1717        )
1718        .bind(namespace_id)
1719        .fetch_one(&self.pool)
1720        .await
1721        .map_err(db_error)?;
1722
1723        Ok(count)
1724    }
1725
1726    /// Replace the metadata blob for a single memory.
1727    pub async fn update_memory_metadata(
1728        &self,
1729        memory_id: i64,
1730        metadata: &serde_json::Value,
1731    ) -> Result<()> {
1732        let metadata_json = serde_json::to_string(metadata)?;
1733        sqlx::query(
1734            r#"
1735            UPDATE memories
1736            SET metadata = ?, updated_at = ?
1737            WHERE id = ?
1738            "#,
1739        )
1740        .bind(metadata_json)
1741        .bind(Utc::now())
1742        .bind(memory_id)
1743        .execute(&self.pool)
1744        .await
1745        .map_err(db_error)?;
1746
1747        Ok(())
1748    }
1749
1750    /// List distinct session keys that have cognitive metadata but no digest coverage yet.
1751    pub async fn list_session_keys_without_digests(
1752        &self,
1753        namespace_id: i64,
1754        limit: i64,
1755    ) -> Result<Vec<String>> {
1756        let rows: Vec<(String,)> = sqlx::query_as(
1757            r#"
1758            SELECT DISTINCT json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') AS session_key
1759            FROM memories m
1760            WHERE m.namespace_id = ?
1761            AND json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1762            AND TRIM(json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')) <> ''
1763            AND NOT EXISTS (
1764                SELECT 1 FROM session_digests sd
1765                WHERE sd.namespace_id = m.namespace_id
1766                AND sd.session_key = json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')
1767            )
1768            ORDER BY session_key ASC
1769            LIMIT ?
1770            "#,
1771        )
1772        .bind(namespace_id)
1773        .bind(limit)
1774        .fetch_all(&self.pool)
1775        .await
1776        .map_err(db_error)?;
1777
1778        Ok(rows.into_iter().map(|(session_key,)| session_key).collect())
1779    }
1780
1781    /// Count distinct non-empty cognitive session keys present in active memories.
1782    pub async fn count_distinct_session_keys_with_cognition(
1783        &self,
1784        namespace_id: i64,
1785    ) -> Result<i64> {
1786        let count: i64 = sqlx::query_scalar(
1787            r#"
1788            SELECT COUNT(DISTINCT json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key'))
1789            FROM memories
1790            WHERE namespace_id = ?
1791              AND is_active = 1
1792              AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1793              AND TRIM(json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key')) <> ''
1794            "#,
1795        )
1796        .bind(namespace_id)
1797        .fetch_one(&self.pool)
1798        .await
1799        .map_err(db_error)?;
1800
1801        Ok(count)
1802    }
1803
1804    /// List lineage-backed archived raw-activity memories that are safe to prune.
1805    pub async fn list_archived_raw_cleanup_candidates(
1806        &self,
1807        namespace_id: i64,
1808        older_than: DateTime<Utc>,
1809        limit: i64,
1810    ) -> Result<Vec<Memory>> {
1811        let rows: Vec<MemoryRow> = sqlx::query_as(
1812            r#"
1813            SELECT * FROM memories
1814            WHERE namespace_id = ?
1815            AND is_active = 0
1816            AND is_archived = 1
1817            AND (
1818                labels LIKE '%raw-activity%'
1819                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1820            )
1821            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1822            AND created_at <= ?
1823            ORDER BY created_at ASC
1824            LIMIT ?
1825            "#,
1826        )
1827        .bind(namespace_id)
1828        .bind(older_than)
1829        .bind(limit)
1830        .fetch_all(&self.pool)
1831        .await
1832        .map_err(db_error)?;
1833
1834        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1835    }
1836
1837    /// Count lineage-backed archived raw-activity memories that are safe to prune.
1838    pub async fn count_archived_raw_cleanup_candidates(
1839        &self,
1840        namespace_id: i64,
1841        older_than: DateTime<Utc>,
1842    ) -> Result<i64> {
1843        let count: i64 = sqlx::query_scalar(
1844            r#"
1845            SELECT COUNT(*) FROM memories
1846            WHERE namespace_id = ?
1847            AND is_active = 0
1848            AND is_archived = 1
1849            AND (
1850                labels LIKE '%raw-activity%'
1851                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1852            )
1853            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1854            AND created_at <= ?
1855            "#,
1856        )
1857        .bind(namespace_id)
1858        .bind(older_than)
1859        .fetch_one(&self.pool)
1860        .await
1861        .map_err(db_error)?;
1862
1863        Ok(count)
1864    }
1865
1866    /// Delete a batch of memories by id.
1867    pub async fn delete_batch(&self, ids: &[i64]) -> Result<u64> {
1868        if ids.is_empty() {
1869            return Ok(0);
1870        }
1871
1872        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1873        let sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
1874        let mut query = sqlx::query(&sql);
1875        for id in ids {
1876            query = query.bind(*id);
1877        }
1878
1879        let result = query.execute(&self.pool).await.map_err(db_error)?;
1880        Ok(result.rows_affected())
1881    }
1882
1883    /// Delete memories matching a content pattern (for cleaning noise)
1884    pub async fn delete_by_content_pattern(&self, namespace_id: i64, pattern: &str) -> Result<u64> {
1885        let result = sqlx::query("DELETE FROM memories WHERE namespace_id = ? AND content LIKE ?")
1886            .bind(namespace_id)
1887            .bind(pattern)
1888            .execute(&self.pool)
1889            .await
1890            .map_err(db_error)?;
1891
1892        Ok(result.rows_affected())
1893    }
1894
1895    /// Count memories matching filters (for stats with time ranges)
1896    pub async fn count_filtered(
1897        &self,
1898        namespace_id: i64,
1899        category: Option<&str>,
1900        since: Option<DateTime<Utc>>,
1901        until: Option<DateTime<Utc>>,
1902        include_raw: bool,
1903    ) -> Result<i64> {
1904        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1905        let mut param_idx = 2u32;
1906
1907        if category.is_some() {
1908            conditions.push(format!("category = ?{}", param_idx));
1909            param_idx += 1;
1910        }
1911        if since.is_some() {
1912            conditions.push(format!("created_at >= ?{}", param_idx));
1913            param_idx += 1;
1914        }
1915        if until.is_some() {
1916            conditions.push(format!("created_at <= ?{}", param_idx));
1917        }
1918        if !include_raw {
1919            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1920        }
1921
1922        let sql = format!(
1923            "SELECT COUNT(*) FROM memories WHERE {}",
1924            conditions.join(" AND "),
1925        );
1926
1927        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
1928
1929        if let Some(cat) = category {
1930            query = query.bind(cat.to_string());
1931        }
1932        if let Some(s) = since {
1933            query = query.bind(s);
1934        }
1935        if let Some(u) = until {
1936            query = query.bind(u);
1937        }
1938
1939        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
1940        Ok(count)
1941    }
1942
1943    /// Store a distilled summary and archive its source memories atomically.
1944    pub async fn store_distilled_summary(
1945        &self,
1946        params: StoreMemoryParams<'_>,
1947        source_ids: &[i64],
1948    ) -> Result<Memory> {
1949        let labels_json = serde_json::to_string(params.labels)?;
1950        let metadata_json = serde_json::to_string(params.metadata)?;
1951        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
1952        let mut tx = self.pool.begin().await.map_err(db_error)?;
1953
1954        let result = sqlx::query(
1955            r#"
1956            INSERT INTO memories (
1957                namespace_id, content, category, memory_lane_type, labels, metadata,
1958                content_embedding, embedding_model, created_at, is_active, access_count
1959            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
1960            "#,
1961        )
1962        .bind(params.namespace_id)
1963        .bind(params.content)
1964        .bind(params.category.to_string())
1965        .bind(params.memory_lane_type.map(|t| t.to_string()))
1966        .bind(&labels_json)
1967        .bind(&metadata_json)
1968        .bind(&embedding_json)
1969        .bind(params.embedding_model)
1970        .bind(Utc::now())
1971        .execute(&mut *tx)
1972        .await
1973        .map_err(db_error)?;
1974
1975        let summary_id = if result.last_insert_rowid() == 0 {
1976            let row: Option<MemoryRow> = sqlx::query_as(
1977                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) ORDER BY created_at DESC LIMIT 1",
1978            )
1979            .bind(params.namespace_id)
1980            .bind(params.content)
1981            .fetch_optional(&mut *tx)
1982            .await
1983            .map_err(db_error)?;
1984            row.map(|memory| memory.id).ok_or_else(|| {
1985                nexus_core::NexusError::Storage(
1986                    "Duplicate distilled summary merged but matching row not found".to_string(),
1987                )
1988            })?
1989        } else {
1990            result.last_insert_rowid()
1991        };
1992
1993        if !source_ids.is_empty() {
1994            for source_id in source_ids {
1995                sqlx::query(
1996                    r#"
1997                    INSERT OR IGNORE INTO memory_evidence (
1998                        derived_memory_id,
1999                        source_memory_id,
2000                        evidence_role,
2001                        created_at
2002                    ) VALUES (?, ?, 'source', datetime('now'))
2003                    "#,
2004                )
2005                .bind(summary_id)
2006                .bind(*source_id)
2007                .execute(&mut *tx)
2008                .await
2009                .map_err(db_error)?;
2010            }
2011
2012            let placeholders = source_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2013            let sql = format!(
2014                r#"
2015                UPDATE memories
2016                SET
2017                    is_active = 0,
2018                    is_archived = 1,
2019                    updated_at = ?,
2020                    metadata = json_set(
2021                        COALESCE(metadata, '{{}}'),
2022                        '$.distillation.status', 'archived',
2023                        '$.distillation.summary_memory_id', ?,
2024                        '$.distillation.archived_at', ?
2025                    )
2026                WHERE id IN ({})
2027                "#,
2028                placeholders
2029            );
2030            let archived_at = Utc::now().to_rfc3339();
2031            let mut query = sqlx::query(&sql)
2032                .bind(Utc::now())
2033                .bind(summary_id)
2034                .bind(&archived_at);
2035            for source_id in source_ids {
2036                query = query.bind(*source_id);
2037            }
2038            query.execute(&mut *tx).await.map_err(db_error)?;
2039        }
2040
2041        tx.commit().await.map_err(db_error)?;
2042        self.get_by_id(summary_id).await?.ok_or_else(|| {
2043            nexus_core::NexusError::Storage(format!(
2044                "Failed to retrieve distilled summary with id {}",
2045                summary_id
2046            ))
2047        })
2048    }
2049
2050    fn row_to_memory(&self, row: MemoryRow) -> Result<Memory> {
2051        let labels: Vec<String> = serde_json::from_str(&row.labels).map_err(|e| {
2052            nexus_core::NexusError::Storage(format!(
2053                "corrupted labels JSON for memory {}: {e}",
2054                row.id
2055            ))
2056        })?;
2057        let metadata: serde_json::Value = serde_json::from_str(&row.metadata).map_err(|e| {
2058            nexus_core::NexusError::Storage(format!(
2059                "corrupted metadata JSON for memory {}: {e}",
2060                row.id
2061            ))
2062        })?;
2063        let embedding: Option<Vec<f32>> = row
2064            .content_embedding
2065            .map(|e| {
2066                serde_json::from_str(&e).map_err(|err| {
2067                    nexus_core::NexusError::Storage(format!(
2068                        "corrupted embedding JSON for memory {}: {err}",
2069                        row.id
2070                    ))
2071                })
2072            })
2073            .transpose()?;
2074
2075        Ok(Memory {
2076            id: row.id,
2077            namespace_id: row.namespace_id,
2078            content: row.content,
2079            category: parse_category(&row.category)?,
2080            memory_lane_type: match &row.memory_lane_type {
2081                Some(s) => parse_memory_lane_type(s)?,
2082                None => None,
2083            },
2084            labels,
2085            metadata,
2086            similarity_score: row.similarity_score,
2087            relevance_score: row.relevance_score,
2088            content_embedding: embedding,
2089            embedding_model: row.embedding_model,
2090            created_at: row.created_at,
2091            updated_at: row.updated_at,
2092            last_accessed: row.last_accessed,
2093            is_active: row.is_active,
2094            is_archived: row.is_archived,
2095            access_count: row.access_count,
2096        })
2097    }
2098
2099    // ---- Observability queries ----
2100
2101    /// List memory jobs with optional filters.
2102    pub async fn list_jobs(
2103        &self,
2104        namespace_id: i64,
2105        job_type: Option<&str>,
2106        status: Option<&str>,
2107        limit: i64,
2108        offset: i64,
2109    ) -> Result<Vec<MemoryJobRow>> {
2110        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2111        if job_type.is_some() {
2112            where_clauses.push("job_type = ?".to_string());
2113        }
2114        if status.is_some() {
2115            where_clauses.push("status = ?".to_string());
2116        }
2117        let where_sql = where_clauses.join(" AND ");
2118
2119        let sql = format!(
2120            "SELECT * FROM memory_jobs WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
2121            where_sql
2122        );
2123
2124        let mut query = sqlx::query_as::<_, MemoryJobRow>(&sql).bind(namespace_id);
2125        if let Some(jt) = job_type {
2126            query = query.bind(jt);
2127        }
2128        if let Some(st) = status {
2129            query = query.bind(st);
2130        }
2131        query = query.bind(limit).bind(offset);
2132
2133        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2134        Ok(rows)
2135    }
2136
2137    /// Count memory jobs with optional filters.
2138    pub async fn count_jobs(
2139        &self,
2140        namespace_id: i64,
2141        job_type: Option<&str>,
2142        status: Option<&str>,
2143    ) -> Result<i64> {
2144        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2145        if job_type.is_some() {
2146            where_clauses.push("job_type = ?".to_string());
2147        }
2148        if status.is_some() {
2149            where_clauses.push("status = ?".to_string());
2150        }
2151        let where_sql = where_clauses.join(" AND ");
2152
2153        let sql = format!("SELECT COUNT(*) FROM memory_jobs WHERE {}", where_sql);
2154
2155        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
2156        if let Some(jt) = job_type {
2157            query = query.bind(jt);
2158        }
2159        if let Some(st) = status {
2160            query = query.bind(st);
2161        }
2162
2163        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
2164        Ok(count)
2165    }
2166
2167    /// Count memory jobs grouped by status for a namespace.
2168    pub async fn count_jobs_by_status(
2169        &self,
2170        namespace_id: i64,
2171        job_type: Option<&str>,
2172    ) -> Result<Vec<(String, i64)>> {
2173        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2174        if job_type.is_some() {
2175            where_clauses.push("job_type = ?".to_string());
2176        }
2177        let where_sql = where_clauses.join(" AND ");
2178
2179        let sql = format!(
2180            "SELECT status, COUNT(*) as cnt FROM memory_jobs WHERE {} GROUP BY status",
2181            where_sql
2182        );
2183
2184        let mut query = sqlx::query_as::<_, (String, i64)>(&sql).bind(namespace_id);
2185        if let Some(jt) = job_type {
2186            query = query.bind(jt);
2187        }
2188
2189        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2190        Ok(rows)
2191    }
2192
2193    /// Delete completed jobs that were last updated before the given timestamp.
2194    ///
2195    /// Returns the number of rows removed.
2196    pub async fn purge_completed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2197        let result = sqlx::query(
2198            r#"
2199            DELETE FROM memory_jobs
2200            WHERE status = ? AND updated_at < ?
2201            "#,
2202        )
2203        .bind(memory_job_status::COMPLETED)
2204        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2205        .execute(&self.pool)
2206        .await
2207        .map_err(db_error)?;
2208
2209        Ok(result.rows_affected())
2210    }
2211
2212    /// Delete permanently failed jobs (attempts >= 5) that were last updated
2213    /// before the given timestamp.
2214    ///
2215    /// Returns the number of rows removed.
2216    pub async fn purge_permanently_failed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2217        let result = sqlx::query(
2218            r#"
2219            DELETE FROM memory_jobs
2220            WHERE status = ? AND attempts >= ? AND updated_at < ?
2221            "#,
2222        )
2223        .bind(memory_job_status::FAILED)
2224        .bind(MAX_JOB_ATTEMPTS)
2225        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2226        .execute(&self.pool)
2227        .await
2228        .map_err(db_error)?;
2229
2230        Ok(result.rows_affected())
2231    }
2232
2233    /// List session digests with optional session_key filter.
2234    pub async fn list_digests(
2235        &self,
2236        namespace_id: i64,
2237        session_key: Option<&str>,
2238        limit: i64,
2239        offset: i64,
2240    ) -> Result<Vec<SessionDigestRow>> {
2241        let mut query = if let Some(sk) = session_key {
2242            sqlx::query_as::<_, SessionDigestRow>(
2243                "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2244            )
2245            .bind(namespace_id)
2246            .bind(sk)
2247        } else {
2248            sqlx::query_as::<_, SessionDigestRow>(
2249                "SELECT * FROM session_digests WHERE namespace_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2250            )
2251            .bind(namespace_id)
2252        };
2253
2254        query = query.bind(limit).bind(offset);
2255        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2256        Ok(rows)
2257    }
2258
2259    /// Count session digests for a namespace, optionally filtered by session_key.
2260    pub async fn count_digests(&self, namespace_id: i64, session_key: Option<&str>) -> Result<i64> {
2261        let query = if let Some(sk) = session_key {
2262            sqlx::query_scalar(
2263                "SELECT COUNT(*) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
2264            )
2265            .bind(namespace_id)
2266            .bind(sk)
2267        } else {
2268            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE namespace_id = ?")
2269                .bind(namespace_id)
2270        };
2271
2272        let count: i64 = query.fetch_one(&self.pool).await.map_err(db_error)?;
2273        Ok(count)
2274    }
2275
2276    /// Count evidence edges for a namespace.
2277    pub async fn count_evidence(&self, namespace_id: i64) -> Result<i64> {
2278        let count: i64 = sqlx::query_scalar(
2279            "SELECT COUNT(*) FROM memory_evidence WHERE derived_memory_id IN (SELECT id FROM memories WHERE namespace_id = ?)"
2280        )
2281        .bind(namespace_id)
2282        .fetch_one(&self.pool)
2283        .await
2284        .map_err(db_error)?;
2285        Ok(count)
2286    }
2287
2288    /// Record a system metric sample.
2289    pub async fn record_metric(
2290        &self,
2291        metric_name: &str,
2292        metric_value: f64,
2293        labels: &serde_json::Value,
2294    ) -> Result<i64> {
2295        let labels_json = serde_json::to_string(labels)?;
2296        let id: i64 = sqlx::query_scalar(
2297            r#"
2298            INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2299            VALUES (?, ?, ?, ?)
2300            RETURNING id
2301            "#,
2302        )
2303        .bind(metric_name)
2304        .bind(metric_value)
2305        .bind(labels_json)
2306        .bind(Utc::now())
2307        .fetch_one(&self.pool)
2308        .await
2309        .map_err(db_error)?;
2310        Ok(id)
2311    }
2312
2313    /// Persist multiple metric samples in a single transaction.
2314    pub async fn record_metrics_batch(&self, samples: &[MetricSample]) -> Result<()> {
2315        if samples.is_empty() {
2316            return Ok(());
2317        }
2318
2319        let mut tx = self.pool.begin().await.map_err(db_error)?;
2320        for sample in samples {
2321            let labels_json = serde_json::to_string(&sample.labels)?;
2322            sqlx::query(
2323                r#"
2324                INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2325                VALUES (?, ?, ?, ?)
2326                "#,
2327            )
2328            .bind(&sample.metric_name)
2329            .bind(sample.metric_value)
2330            .bind(labels_json)
2331            .bind(Utc::now())
2332            .execute(&mut *tx)
2333            .await
2334            .map_err(db_error)?;
2335        }
2336        tx.commit().await.map_err(db_error)?;
2337        Ok(())
2338    }
2339
2340    /// Fetch the newest metric samples for a namespace and optional prefix.
2341    pub async fn latest_metrics_for_namespace(
2342        &self,
2343        namespace_id: i64,
2344        metric_prefix: Option<&str>,
2345        limit: i64,
2346    ) -> Result<Vec<SystemMetricRow>> {
2347        let limit = limit.max(1);
2348        let rows = if let Some(prefix) = metric_prefix {
2349            sqlx::query_as::<_, SystemMetricRow>(
2350                r#"
2351                SELECT *
2352                FROM system_metrics
2353                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2354                  AND metric_name LIKE ?
2355                ORDER BY recorded_at DESC, id DESC
2356                LIMIT ?
2357                "#,
2358            )
2359            .bind(namespace_id)
2360            .bind(format!("{prefix}%"))
2361            .bind(limit)
2362            .fetch_all(&self.pool)
2363            .await
2364            .map_err(db_error)?
2365        } else {
2366            sqlx::query_as::<_, SystemMetricRow>(
2367                r#"
2368                SELECT *
2369                FROM system_metrics
2370                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2371                ORDER BY recorded_at DESC, id DESC
2372                LIMIT ?
2373                "#,
2374            )
2375            .bind(namespace_id)
2376            .bind(limit)
2377            .fetch_all(&self.pool)
2378            .await
2379            .map_err(db_error)?
2380        };
2381        Ok(rows)
2382    }
2383
2384    /// Count active memories for a namespace at one cognitive level.
2385    pub async fn count_by_cognitive_level(
2386        &self,
2387        namespace_id: i64,
2388        level: CognitiveLevel,
2389    ) -> Result<i64> {
2390        let count: i64 = sqlx::query_scalar(
2391            r#"
2392            SELECT COUNT(*) FROM memories
2393            WHERE namespace_id = ?
2394              AND is_active = 1
2395              AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') = ?
2396            "#,
2397        )
2398        .bind(namespace_id)
2399        .bind(level.as_str())
2400        .fetch_one(&self.pool)
2401        .await
2402        .map_err(db_error)?;
2403        Ok(count)
2404    }
2405}
2406
2407async fn insert_memory_tx(
2408    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2409    params: &StoreMemoryParams<'_>,
2410) -> Result<i64> {
2411    let labels_json = serde_json::to_string(params.labels)?;
2412    let metadata_json = serde_json::to_string(params.metadata)?;
2413    let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
2414
2415    let result = sqlx::query(
2416        r#"
2417        INSERT INTO memories (
2418            namespace_id, content, category, memory_lane_type, labels, metadata,
2419            content_embedding, embedding_model, created_at, is_active, access_count
2420        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
2421        "#,
2422    )
2423    .bind(params.namespace_id)
2424    .bind(params.content)
2425    .bind(params.category.to_string())
2426    .bind(params.memory_lane_type.map(|t| t.to_string()))
2427    .bind(&labels_json)
2428    .bind(&metadata_json)
2429    .bind(&embedding_json)
2430    .bind(params.embedding_model)
2431    .bind(Utc::now())
2432    .execute(&mut **tx)
2433    .await
2434    .map_err(db_error)?;
2435
2436    let inserted_id = result.last_insert_rowid();
2437    if inserted_id != 0 {
2438        return Ok(inserted_id);
2439    }
2440
2441    let row: Option<MemoryRow> = sqlx::query_as(
2442        "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1",
2443    )
2444    .bind(params.namespace_id)
2445    .bind(params.content)
2446    .fetch_optional(&mut **tx)
2447    .await
2448    .map_err(db_error)?;
2449
2450    row.map(|memory| memory.id).ok_or_else(|| {
2451        nexus_core::NexusError::Storage(
2452            "Duplicate merged by trigger but matching row not found".to_string(),
2453        )
2454    })
2455}
2456
2457async fn insert_evidence_tx(
2458    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2459    derived_memory_id: i64,
2460    source_memory_id: i64,
2461    evidence_role: &str,
2462) -> Result<()> {
2463    sqlx::query(
2464        r#"
2465        INSERT OR IGNORE INTO memory_evidence (derived_memory_id, source_memory_id, evidence_role, created_at)
2466        VALUES (?, ?, ?, datetime('now'))
2467        "#,
2468    )
2469    .bind(derived_memory_id)
2470    .bind(source_memory_id)
2471    .bind(evidence_role)
2472    .execute(&mut **tx)
2473    .await
2474    .map_err(db_error)?;
2475
2476    Ok(())
2477}
2478
2479fn new_claim_token(lease_owner: &str) -> String {
2480    let nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2481    format!("{lease_owner}-{nanos}-{}", std::process::id())
2482}
2483
/// Append to `existing` every label from `incoming` that is not already
/// present, comparing ASCII case-insensitively.
///
/// The order and spelling of `existing` are kept as-is; new labels follow in
/// the order they first appear in `incoming`.
fn merge_labels(existing: &[String], incoming: &[String]) -> Vec<String> {
    incoming
        .iter()
        .fold(existing.to_vec(), |mut combined, candidate| {
            let already_present = combined
                .iter()
                .any(|kept| kept.eq_ignore_ascii_case(candidate));
            if !already_present {
                combined.push(candidate.clone());
            }
            combined
        })
}
2496
2497fn merge_duplicate_metadata(
2498    existing: &serde_json::Value,
2499    incoming: &serde_json::Value,
2500) -> serde_json::Value {
2501    let mut merged = existing.clone();
2502
2503    if let Some(session_key) = incoming
2504        .pointer("/cognitive/session_key")
2505        .and_then(serde_json::Value::as_str)
2506    {
2507        let mut session_keys = existing
2508            .pointer("/cognitive/session_keys")
2509            .and_then(serde_json::Value::as_array)
2510            .cloned()
2511            .unwrap_or_default();
2512        if let Some(existing_key) = existing
2513            .pointer("/cognitive/session_key")
2514            .and_then(serde_json::Value::as_str)
2515        {
2516            push_unique_json_string(&mut session_keys, existing_key);
2517        }
2518        push_unique_json_string(&mut session_keys, session_key);
2519        ensure_object_path(&mut merged, "cognitive").insert(
2520            "session_key".to_string(),
2521            serde_json::Value::String(session_key.to_string()),
2522        );
2523        ensure_object_path(&mut merged, "cognitive").insert(
2524            "session_keys".to_string(),
2525            serde_json::Value::Array(session_keys),
2526        );
2527    }
2528
2529    if let Some(derived_session_key) = incoming
2530        .pointer("/source/derived_session_key")
2531        .and_then(serde_json::Value::as_str)
2532    {
2533        let mut derived_keys = existing
2534            .pointer("/source/derived_session_keys")
2535            .and_then(serde_json::Value::as_array)
2536            .cloned()
2537            .unwrap_or_default();
2538        if let Some(existing_key) = existing
2539            .pointer("/source/derived_session_key")
2540            .and_then(serde_json::Value::as_str)
2541        {
2542            push_unique_json_string(&mut derived_keys, existing_key);
2543        }
2544        push_unique_json_string(&mut derived_keys, derived_session_key);
2545        ensure_object_path(&mut merged, "source").insert(
2546            "derived_session_key".to_string(),
2547            serde_json::Value::String(derived_session_key.to_string()),
2548        );
2549        ensure_object_path(&mut merged, "source").insert(
2550            "derived_session_keys".to_string(),
2551            serde_json::Value::Array(derived_keys),
2552        );
2553    }
2554
2555    merged
2556}
2557
2558fn push_unique_json_string(values: &mut Vec<serde_json::Value>, candidate: &str) {
2559    if values
2560        .iter()
2561        .filter_map(serde_json::Value::as_str)
2562        .any(|current| current.eq_ignore_ascii_case(candidate))
2563    {
2564        return;
2565    }
2566    values.push(serde_json::Value::String(candidate.to_string()));
2567}
2568
2569fn ensure_object_path<'a>(
2570    root: &'a mut serde_json::Value,
2571    key: &str,
2572) -> &'a mut serde_json::Map<String, serde_json::Value> {
2573    if !root.is_object() {
2574        *root = serde_json::Value::Object(serde_json::Map::new());
2575    }
2576
2577    let object = root.as_object_mut().expect("root object ensured");
2578    let entry = object
2579        .entry(key.to_string())
2580        .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
2581    if !entry.is_object() {
2582        *entry = serde_json::Value::Object(serde_json::Map::new());
2583    }
2584
2585    entry.as_object_mut().expect("child object ensured")
2586}
2587
2588/// Repository for namespace operations
2589pub struct NamespaceRepository {
2590    pool: SqlitePool,
2591}
2592
2593impl NamespaceRepository {
2594    pub fn new(pool: SqlitePool) -> Self {
2595        Self { pool }
2596    }
2597
2598    /// Get or create a namespace
2599    pub async fn get_or_create(&self, name: &str, agent_type: &str) -> Result<AgentNamespace> {
2600        if let Some(ns) = self.get_by_name(name).await? {
2601            return Ok(ns);
2602        }
2603
2604        let result = sqlx::query(
2605            "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
2606        )
2607        .bind(name)
2608        .bind(agent_type)
2609        .bind(Utc::now())
2610        .execute(&self.pool)
2611        .await
2612        .map_err(db_error)?;
2613
2614        let id = result.last_insert_rowid();
2615        Ok(AgentNamespace {
2616            id,
2617            name: name.to_string(),
2618            description: None,
2619            agent_type: agent_type.to_string(),
2620            created_at: Utc::now(),
2621            updated_at: None,
2622        })
2623    }
2624
2625    /// Get a namespace by name
2626    pub async fn get_by_name(&self, name: &str) -> Result<Option<AgentNamespace>> {
2627        let row: Option<AgentNamespaceRow> =
2628            sqlx::query_as("SELECT * FROM agent_namespaces WHERE name = ?")
2629                .bind(name)
2630                .fetch_optional(&self.pool)
2631                .await
2632                .map_err(db_error)?;
2633
2634        Ok(row.map(|r| AgentNamespace {
2635            id: r.id,
2636            name: r.name,
2637            description: r.description,
2638            agent_type: r.agent_type,
2639            created_at: r.created_at,
2640            updated_at: r.updated_at,
2641        }))
2642    }
2643
2644    /// List all namespaces
2645    pub async fn list_all(&self) -> Result<Vec<AgentNamespace>> {
2646        let rows: Vec<AgentNamespaceRow> =
2647            sqlx::query_as("SELECT * FROM agent_namespaces ORDER BY name")
2648                .fetch_all(&self.pool)
2649                .await
2650                .map_err(db_error)?;
2651
2652        Ok(rows
2653            .into_iter()
2654            .map(|r| AgentNamespace {
2655                id: r.id,
2656                name: r.name,
2657                description: r.description,
2658                agent_type: r.agent_type,
2659                created_at: r.created_at,
2660                updated_at: r.updated_at,
2661            })
2662            .collect())
2663    }
2664}
2665
2666/// Repository for processed file operations (inbox deduplication)
2667pub struct ProcessedFileRepository<'a> {
2668    pub pool: &'a SqlitePool,
2669}
2670
2671impl<'a> ProcessedFileRepository<'a> {
2672    pub fn new(pool: &'a SqlitePool) -> Self {
2673        Self { pool }
2674    }
2675
2676    /// Check if a file has been successfully processed
2677    pub async fn is_processed(&self, namespace_id: i64, path: &str) -> Result<bool> {
2678        let row: Option<(i64,)> =
2679            sqlx::query_as("SELECT id FROM processed_files WHERE namespace_id = ? AND path = ? AND status = 'completed'")
2680                .bind(namespace_id)
2681                .bind(path)
2682                .fetch_optional(self.pool)
2683                .await
2684                .map_err(db_error)?;
2685
2686        Ok(row.is_some())
2687    }
2688
2689    /// Get all completed file paths for a namespace (for batch dedup checks)
2690    pub async fn get_completed_paths(
2691        &self,
2692        namespace_id: i64,
2693    ) -> Result<std::collections::HashSet<String>> {
2694        let rows: Vec<(String,)> = sqlx::query_as(
2695            "SELECT path FROM processed_files WHERE namespace_id = ? AND status = 'completed'",
2696        )
2697        .bind(namespace_id)
2698        .fetch_all(self.pool)
2699        .await
2700        .map_err(db_error)?;
2701
2702        Ok(rows.into_iter().map(|r| r.0).collect())
2703    }
2704
2705    /// Mark a file as being processed
2706    pub async fn mark_processing(
2707        &self,
2708        namespace_id: i64,
2709        path: &str,
2710        content_hash: Option<&str>,
2711    ) -> Result<i64> {
2712        let id: i64 = sqlx::query_scalar(
2713            r#"
2714            INSERT INTO processed_files (namespace_id, path, content_hash, status, updated_at)
2715            VALUES (?, ?, ?, 'processing', datetime('now'))
2716            ON CONFLICT(namespace_id, path) DO UPDATE SET
2717                content_hash = excluded.content_hash,
2718                status = 'processing',
2719                updated_at = datetime('now')
2720            RETURNING id
2721            "#,
2722        )
2723        .bind(namespace_id)
2724        .bind(path)
2725        .bind(content_hash)
2726        .fetch_one(self.pool)
2727        .await
2728        .map_err(db_error)?;
2729
2730        Ok(id)
2731    }
2732
2733    /// Mark a file as successfully processed with memory reference
2734    pub async fn mark_processed(&self, id: i64, memory_id: i64) -> Result<()> {
2735        sqlx::query(
2736            r#"
2737            UPDATE processed_files
2738            SET status = 'completed', memory_id = ?, processed_at = datetime('now'), updated_at = datetime('now')
2739            WHERE id = ?
2740            "#
2741        )
2742        .bind(memory_id)
2743        .bind(id)
2744        .execute(self.pool)
2745        .await
2746        .map_err(db_error)?;
2747
2748        Ok(())
2749    }
2750
2751    /// Mark a file as failed
2752    pub async fn mark_failed(&self, id: i64, error: &str) -> Result<()> {
2753        sqlx::query(
2754            r#"
2755            UPDATE processed_files
2756            SET status = 'failed', last_error = ?, updated_at = datetime('now')
2757            WHERE id = ?
2758            "#,
2759        )
2760        .bind(error)
2761        .bind(id)
2762        .execute(self.pool)
2763        .await
2764        .map_err(db_error)?;
2765
2766        Ok(())
2767    }
2768
2769    /// Get files pending processing
2770    pub async fn get_pending(
2771        &self,
2772        namespace_id: i64,
2773        limit: i32,
2774    ) -> Result<Vec<ProcessedFileRow>> {
2775        let rows = sqlx::query_as::<_, ProcessedFileRow>(
2776            r#"
2777            SELECT * FROM processed_files
2778            WHERE namespace_id = ? AND status = 'pending'
2779            ORDER BY created_at ASC
2780            LIMIT ?
2781            "#,
2782        )
2783        .bind(namespace_id)
2784        .bind(limit)
2785        .fetch_all(self.pool)
2786        .await
2787        .map_err(db_error)?;
2788
2789        Ok(rows)
2790    }
2791
2792    /// Clear all processed files for a namespace
2793    pub async fn clear_namespace(&self, namespace_id: i64) -> Result<u64> {
2794        let result = sqlx::query("DELETE FROM processed_files WHERE namespace_id = ?")
2795            .bind(namespace_id)
2796            .execute(self.pool)
2797            .await
2798            .map_err(db_error)?;
2799
2800        Ok(result.rows_affected())
2801    }
2802}
2803
2804/// Repository for memory relationship operations
2805pub struct MemoryRelationRepository<'a> {
2806    pub pool: &'a SqlitePool,
2807}
2808
2809impl<'a> MemoryRelationRepository<'a> {
2810    pub fn new(pool: &'a SqlitePool) -> Self {
2811        Self { pool }
2812    }
2813
2814    /// Store a relationship between two memories
2815    pub async fn store(
2816        &self,
2817        source_id: i64,
2818        target_id: i64,
2819        relation_type: &str,
2820        strength: f32,
2821    ) -> Result<i64> {
2822        let id: i64 = sqlx::query_scalar(
2823            r#"
2824            INSERT INTO memory_relations (source_memory_id, target_memory_id, relation_type, strength, created_at)
2825            VALUES (?, ?, ?, ?, datetime('now'))
2826            ON CONFLICT(source_memory_id, target_memory_id, relation_type) DO UPDATE SET
2827                strength = excluded.strength,
2828                created_at = datetime('now')
2829            RETURNING id
2830            "#
2831        )
2832        .bind(source_id)
2833        .bind(target_id)
2834        .bind(relation_type)
2835        .bind(strength)
2836        .fetch_one(self.pool)
2837        .await
2838        .map_err(db_error)?;
2839
2840        Ok(id)
2841    }
2842
2843    /// Get all related memories for a given memory
2844    pub async fn get_related(&self, memory_id: i64) -> Result<Vec<(i64, String, f32)>> {
2845        let rows: Vec<(i64, String, f32)> = sqlx::query_as(
2846            r#"
2847            SELECT target_memory_id as memory_id, relation_type, strength
2848            FROM memory_relations
2849            WHERE source_memory_id = ?
2850            UNION
2851            SELECT source_memory_id as memory_id, relation_type, strength
2852            FROM memory_relations
2853            WHERE target_memory_id = ?
2854            ORDER BY strength DESC
2855            "#,
2856        )
2857        .bind(memory_id)
2858        .bind(memory_id)
2859        .fetch_all(self.pool)
2860        .await
2861        .map_err(db_error)?;
2862
2863        Ok(rows)
2864    }
2865
2866    /// Delete all relations for a memory
2867    pub async fn delete_for_memory(&self, memory_id: i64) -> Result<u64> {
2868        let result = sqlx::query(
2869            "DELETE FROM memory_relations WHERE source_memory_id = ? OR target_memory_id = ?",
2870        )
2871        .bind(memory_id)
2872        .bind(memory_id)
2873        .execute(self.pool)
2874        .await
2875        .map_err(db_error)?;
2876
2877        Ok(result.rows_affected())
2878    }
2879}
2880
2881fn parse_category(s: &str) -> Result<Category> {
2882    match MemoryCategory::parse(s) {
2883        Some(cat) => Ok(cat),
2884        None => Err(nexus_core::NexusError::Storage(format!(
2885            "Unknown memory category '{s}' persisted in database; row may be corrupted"
2886        ))),
2887    }
2888}
2889
2890fn parse_memory_lane_type(s: &str) -> Result<Option<MemoryLaneType>> {
2891    match MemoryLaneType::parse(s) {
2892        Some(t) => Ok(Some(t)),
2893        None => Err(nexus_core::NexusError::Storage(format!(
2894            "Unknown memory_lane_type '{s}' persisted in database; row may be corrupted"
2895        ))),
2896    }
2897}
2898
2899#[cfg(test)]
2900mod tests {
2901    use super::*;
2902    use nexus_core::MemoryLanePriorityType;
2903    use sqlx::sqlite::SqlitePoolOptions;
2904
2905    fn cognitive_metadata(
2906        level: CognitiveLevel,
2907        perspective: &PerspectiveKey,
2908        times_reinforced: i64,
2909        times_contradicted: i64,
2910    ) -> serde_json::Value {
2911        serde_json::json!({
2912            "cognitive": {
2913                "level": level.as_str(),
2914                "observer": perspective.observer,
2915                "subject": perspective.subject,
2916                "session_key": perspective.session_key,
2917                "source_memory_ids": [],
2918                "confidence": 0.9,
2919                "times_reinforced": times_reinforced,
2920                "times_contradicted": times_contradicted,
2921                "generated_by": "test",
2922            }
2923        })
2924    }
2925
2926    #[test]
2927    fn test_parse_category() {
2928        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
2929        assert!(matches!(
2930            parse_category("preferences"),
2931            Ok(Category::Preferences)
2932        ));
2933        assert!(parse_category("unknown").is_err());
2934    }
2935
2936    #[test]
2937    fn test_parse_memory_lane_type() {
2938        let correction = parse_memory_lane_type("correction");
2939        assert!(matches!(
2940            correction,
2941            Ok(Some(MemoryLaneType::Priority(
2942                MemoryLanePriorityType::Correction
2943            )))
2944        ));
2945
2946        let pattern_seed = parse_memory_lane_type("pattern_seed");
2947        assert!(matches!(
2948            pattern_seed,
2949            Ok(Some(MemoryLaneType::Priority(
2950                MemoryLanePriorityType::PatternSeed
2951            )))
2952        ));
2953
2954        assert!(parse_memory_lane_type("unknown").is_err());
2955    }
2956
2957    #[test]
2958    fn test_parse_category_all_variants() {
2959        assert!(matches!(parse_category("general"), Ok(Category::General)));
2960        assert!(matches!(parse_category("session"), Ok(Category::Session)));
2961        assert!(matches!(parse_category("context"), Ok(Category::Context)));
2962        assert!(matches!(
2963            parse_category("specifications"),
2964            Ok(Category::Specifications)
2965        ));
2966        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
2967        assert!(matches!(
2968            parse_category("preferences"),
2969            Ok(Category::Preferences)
2970        ));
2971        // Unknown values are rejected (fail-closed)
2972        assert!(parse_category("bogus").is_err());
2973        assert!(parse_category("").is_err());
2974    }
2975
2976    #[test]
2977    fn test_store_memory_params_fields() {
2978        // Verify StoreMemoryParams can be constructed with all fields
2979        let params = StoreMemoryParams {
2980            namespace_id: 1,
2981            content: "test content",
2982            category: &Category::General,
2983            memory_lane_type: None,
2984            labels: &[],
2985            metadata: &serde_json::Value::Null,
2986            embedding: None,
2987            embedding_model: None,
2988        };
2989        assert_eq!(params.namespace_id, 1);
2990        assert_eq!(params.content, "test content");
2991        assert!(params.labels.is_empty());
2992    }
2993
2994    #[test]
2995    fn test_merge_duplicate_metadata_preserves_multiple_session_keys() {
2996        let existing = serde_json::json!({
2997            "cognitive": {
2998                "session_key": "session-a"
2999            },
3000            "source": {
3001                "derived_session_key": "session-a"
3002            }
3003        });
3004        let incoming = serde_json::json!({
3005            "cognitive": {
3006                "session_key": "session-b"
3007            },
3008            "source": {
3009                "derived_session_key": "session-b"
3010            }
3011        });
3012
3013        let merged = merge_duplicate_metadata(&existing, &incoming);
3014        assert_eq!(merged["cognitive"]["session_key"], "session-b");
3015        assert_eq!(
3016            merged["cognitive"]["session_keys"],
3017            serde_json::json!(["session-a", "session-b"])
3018        );
3019        assert_eq!(
3020            merged["source"]["derived_session_keys"],
3021            serde_json::json!(["session-a", "session-b"])
3022        );
3023    }
3024
3025    // ---- Phase 2: Job, Digest, Evidence integration tests ----
3026
3027    async fn setup_test_db() -> SqlitePool {
3028        let pool = SqlitePoolOptions::new()
3029            .max_connections(1)
3030            .connect("sqlite::memory:")
3031            .await
3032            .unwrap();
3033        crate::migrations::run_migrations(&pool).await.unwrap();
3034        pool
3035    }
3036
3037    async fn create_namespace(pool: &SqlitePool, name: &str) -> i64 {
3038        let ns = NamespaceRepository::new(pool.clone());
3039        ns.get_or_create(name, "test").await.unwrap();
3040        ns.get_by_name(name).await.unwrap().unwrap().id
3041    }
3042
3043    // ---- Regression tests for audit findings ----
3044
3045    /// Audit finding #1: `get_by_content` must match `namespace_id + content`,
3046    /// not "latest active row". This prevents the duplicate-insert fallback
3047    /// in `store` from returning the wrong memory.
3048    #[tokio::test]
3049    async fn test_get_by_content_matches_actual_content() {
3050        let pool = setup_test_db().await;
3051        let ns_id = create_namespace(&pool, "test-agent").await;
3052        let repo = MemoryRepository::new(pool);
3053
3054        // Store two different memories in the same namespace.
3055        let mem_a = repo
3056            .store(StoreMemoryParams {
3057                namespace_id: ns_id,
3058                content: "first memory content",
3059                category: &Category::General,
3060                memory_lane_type: None,
3061                labels: &[],
3062                metadata: &serde_json::Value::Null,
3063                embedding: None,
3064                embedding_model: None,
3065            })
3066            .await
3067            .unwrap();
3068
3069        let mem_b = repo
3070            .store(StoreMemoryParams {
3071                namespace_id: ns_id,
3072                content: "second memory content",
3073                category: &Category::General,
3074                memory_lane_type: None,
3075                labels: &[],
3076                metadata: &serde_json::Value::Null,
3077                embedding: None,
3078                embedding_model: None,
3079            })
3080            .await
3081            .unwrap();
3082
3083        assert_ne!(mem_a.id, mem_b.id);
3084
3085        // get_by_content must return the memory with matching content,
3086        // NOT the newest one.
3087        let found_a = repo
3088            .get_by_content(ns_id, "first memory content")
3089            .await
3090            .unwrap();
3091        assert_eq!(found_a.id, mem_a.id);
3092        assert_eq!(found_a.content, "first memory content");
3093
3094        let found_b = repo
3095            .get_by_content(ns_id, "second memory content")
3096            .await
3097            .unwrap();
3098        assert_eq!(found_b.id, mem_b.id);
3099        assert_eq!(found_b.content, "second memory content");
3100
3101        // Non-existent content must error.
3102        let result = repo.get_by_content(ns_id, "nonexistent").await;
3103        assert!(result.is_err());
3104    }
3105
3106    #[tokio::test]
3107    async fn test_enqueue_and_claim_jobs() {
3108        let pool = setup_test_db().await;
3109        let ns_id = create_namespace(&pool, "test-agent").await;
3110        let repo = MemoryRepository::new(pool);
3111
3112        // Enqueue two jobs with different priorities.
3113        let id1 = repo
3114            .enqueue_job(EnqueueJobParams {
3115                namespace_id: ns_id,
3116                job_type: "derive_memory",
3117                priority: 100,
3118                perspective: None,
3119                payload: &serde_json::json!({"memory_id": 1}),
3120            })
3121            .await
3122            .unwrap();
3123
3124        let id2 = repo
3125            .enqueue_job(EnqueueJobParams {
3126                namespace_id: ns_id,
3127                job_type: "derive_memory",
3128                priority: 50,
3129                perspective: None,
3130                payload: &serde_json::json!({"memory_id": 2}),
3131            })
3132            .await
3133            .unwrap();
3134
3135        assert!(id1 > 0);
3136        assert!(id2 > 0);
3137        assert_ne!(id1, id2);
3138
3139        // Claim one job — higher priority first.
3140        let claimed = repo
3141            .claim_jobs(ns_id, "derive_memory", "worker-1", 120, 1)
3142            .await
3143            .unwrap();
3144
3145        assert_eq!(claimed.len(), 1);
3146        assert_eq!(claimed[0].row.id, id1); // Higher priority claimed first
3147        assert_eq!(claimed[0].row.status, "running");
3148        assert_eq!(claimed[0].payload["memory_id"], 1);
3149
3150        // Second claim should get the lower-priority job.
3151        let claimed2 = repo
3152            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 1)
3153            .await
3154            .unwrap();
3155
3156        assert_eq!(claimed2.len(), 1);
3157        assert_eq!(claimed2[0].row.id, id2);
3158    }
3159
3160    #[tokio::test]
3161    async fn test_complete_and_fail_job() {
3162        let pool = setup_test_db().await;
3163        let ns_id = create_namespace(&pool, "test-agent").await;
3164        let repo = MemoryRepository::new(pool);
3165
3166        let _id = repo
3167            .enqueue_job(EnqueueJobParams {
3168                namespace_id: ns_id,
3169                job_type: "digest_session",
3170                priority: 100,
3171                perspective: None,
3172                payload: &serde_json::json!({"session": "s1"}),
3173            })
3174            .await
3175            .unwrap();
3176
3177        // Claim then complete.
3178        let claimed = repo
3179            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
3180            .await
3181            .unwrap();
3182        assert_eq!(claimed.len(), 1);
3183
3184        repo.complete_job(&claimed[0]).await.unwrap();
3185
3186        // Verify completed status by attempting to claim — should be empty.
3187        let claimed_again = repo
3188            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
3189            .await
3190            .unwrap();
3191        assert!(claimed_again.is_empty());
3192    }
3193
3194    #[tokio::test]
3195    async fn test_fail_job_requeues_before_limit() {
3196        let pool = setup_test_db().await;
3197        let ns_id = create_namespace(&pool, "test-agent").await;
3198        let repo = MemoryRepository::new(pool);
3199
3200        let _id = repo
3201            .enqueue_job(EnqueueJobParams {
3202                namespace_id: ns_id,
3203                job_type: "derive_memory",
3204                priority: 100,
3205                perspective: None,
3206                payload: &serde_json::json!({"test": true}),
3207            })
3208            .await
3209            .unwrap();
3210
3211        // Claim, fail, re-claim (should work).
3212        let claimed = repo
3213            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
3214            .await
3215            .unwrap();
3216        repo.fail_job(&claimed[0], "transient error").await.unwrap();
3217
3218        let reclaimed = repo
3219            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
3220            .await
3221            .unwrap();
3222        assert_eq!(reclaimed.len(), 1);
3223        assert_eq!(reclaimed[0].row.attempts, 2);
3224    }
3225
3226    #[tokio::test]
3227    async fn test_complete_job_requires_matching_claim_token() {
3228        let pool = setup_test_db().await;
3229        let ns_id = create_namespace(&pool, "test-agent").await;
3230        let repo = MemoryRepository::new(pool);
3231
3232        repo.enqueue_job(EnqueueJobParams {
3233            namespace_id: ns_id,
3234            job_type: "derive_memory",
3235            priority: 100,
3236            perspective: None,
3237            payload: &serde_json::json!({"memory_id": 7}),
3238        })
3239        .await
3240        .unwrap();
3241
3242        let claimed = repo
3243            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
3244            .await
3245            .unwrap();
3246        let mut forged = claimed[0].clone();
3247        forged.row.claim_token = Some("forged-token".to_string());
3248
3249        let error = repo.complete_job(&forged).await.unwrap_err();
3250        assert!(error.to_string().contains("lost lease ownership"));
3251    }
3252
3253    #[tokio::test]
3254    async fn test_store_digest_and_latest_digest() {
3255        let pool = setup_test_db().await;
3256        let ns_id = create_namespace(&pool, "test-agent").await;
3257        let repo = MemoryRepository::new(pool);
3258
3259        // Store a memory that will serve as the digest content.
3260        let digest_memory = repo
3261            .store(StoreMemoryParams {
3262                namespace_id: ns_id,
3263                content: "session summary short",
3264                category: &Category::Session,
3265                memory_lane_type: None,
3266                labels: &[],
3267                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
3268                embedding: None,
3269                embedding_model: None,
3270            })
3271            .await
3272            .unwrap();
3273
3274        let digest_id = repo
3275            .store_digest(StoreDigestParams {
3276                namespace_id: ns_id,
3277                session_key: "session-abc",
3278                digest_kind: "short",
3279                memory_id: digest_memory.id,
3280                start_memory_id: Some(1),
3281                end_memory_id: Some(100),
3282                token_count: 42,
3283            })
3284            .await
3285            .unwrap();
3286
3287        assert!(digest_id > 0);
3288
3289        // Retrieve latest digest.
3290        let result = repo
3291            .latest_digest_for_session(ns_id, "session-abc", "short")
3292            .await
3293            .unwrap();
3294
3295        assert!(result.is_some());
3296        assert_eq!(result.as_ref().unwrap().id, digest_memory.id);
3297
3298        let replacement_memory = repo
3299            .store(StoreMemoryParams {
3300                namespace_id: ns_id,
3301                content: "session summary short updated",
3302                category: &Category::Session,
3303                memory_lane_type: None,
3304                labels: &[],
3305                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
3306                embedding: None,
3307                embedding_model: None,
3308            })
3309            .await
3310            .unwrap();
3311
3312        let replacement_digest_id = repo
3313            .store_digest(StoreDigestParams {
3314                namespace_id: ns_id,
3315                session_key: "session-abc",
3316                digest_kind: "short",
3317                memory_id: replacement_memory.id,
3318                start_memory_id: Some(1),
3319                end_memory_id: Some(100),
3320                token_count: 64,
3321            })
3322            .await
3323            .unwrap();
3324
3325        assert_eq!(replacement_digest_id, digest_id);
3326
3327        let updated = repo
3328            .latest_digest_for_session(ns_id, "session-abc", "short")
3329            .await
3330            .unwrap()
3331            .unwrap();
3332        assert_eq!(updated.id, replacement_memory.id);
3333
3334        let latest_for_namespace = repo
3335            .latest_digest_for_namespace(ns_id, "short")
3336            .await
3337            .unwrap()
3338            .unwrap();
3339        assert_eq!(latest_for_namespace.id, replacement_memory.id);
3340    }
3341
3342    #[tokio::test]
3343    async fn test_session_digest_rollover_reports_new_signal_since_last_digest() {
3344        let pool = setup_test_db().await;
3345        let ns_id = create_namespace(&pool, "test-agent").await;
3346        let repo = MemoryRepository::new(pool);
3347
3348        let source = repo
3349            .store(StoreMemoryParams {
3350                namespace_id: ns_id,
3351                content: "Implemented bounded digest rollover policy.",
3352                category: &Category::Session,
3353                memory_lane_type: None,
3354                labels: &[],
3355                metadata: &serde_json::json!({
3356                    "cognitive": {
3357                        "level": "explicit",
3358                        "observer": "claude-code",
3359                        "subject": "claude-code",
3360                        "session_key": "session-rollover"
3361                    }
3362                }),
3363                embedding: None,
3364                embedding_model: None,
3365            })
3366            .await
3367            .unwrap();
3368
3369        let first = repo
3370            .session_digest_rollover(ns_id, "session-rollover")
3371            .await
3372            .unwrap();
3373        assert_eq!(first.last_digest_end_memory_id, None);
3374        assert_eq!(first.new_memory_count, 1);
3375        assert!(first.estimated_new_tokens > 0);
3376
3377        let digest_memory = repo
3378            .store(StoreMemoryParams {
3379                namespace_id: ns_id,
3380                content: "Short digest",
3381                category: &Category::Session,
3382                memory_lane_type: None,
3383                labels: &[],
3384                metadata: &serde_json::json!({
3385                    "cognitive": {
3386                        "level": "summary_short",
3387                        "observer": "claude-code",
3388                        "subject": "claude-code",
3389                        "session_key": "session-rollover"
3390                    }
3391                }),
3392                embedding: None,
3393                embedding_model: None,
3394            })
3395            .await
3396            .unwrap();
3397
3398        repo.store_digest(StoreDigestParams {
3399            namespace_id: ns_id,
3400            session_key: "session-rollover",
3401            digest_kind: "short",
3402            memory_id: digest_memory.id,
3403            start_memory_id: Some(source.id),
3404            end_memory_id: Some(source.id),
3405            token_count: 16,
3406        })
3407        .await
3408        .unwrap();
3409
3410        let covered = repo
3411            .session_digest_rollover(ns_id, "session-rollover")
3412            .await
3413            .unwrap();
3414        assert_eq!(covered.last_digest_end_memory_id, Some(source.id));
3415        assert_eq!(covered.new_memory_count, 0);
3416        assert_eq!(covered.estimated_new_tokens, 0);
3417
3418        repo.store(StoreMemoryParams {
3419            namespace_id: ns_id,
3420            content: "Added one more explicit memory after the digest coverage window.",
3421            category: &Category::Session,
3422            memory_lane_type: None,
3423            labels: &[],
3424            metadata: &serde_json::json!({
3425                "cognitive": {
3426                    "level": "explicit",
3427                    "observer": "claude-code",
3428                    "subject": "claude-code",
3429                    "session_key": "session-rollover"
3430                }
3431            }),
3432            embedding: None,
3433            embedding_model: None,
3434        })
3435        .await
3436        .unwrap();
3437
3438        let second = repo
3439            .session_digest_rollover(ns_id, "session-rollover")
3440            .await
3441            .unwrap();
3442        assert_eq!(second.last_digest_end_memory_id, Some(source.id));
3443        assert_eq!(second.new_memory_count, 1);
3444        assert!(second.estimated_new_tokens > 0);
3445    }
3446
3447    #[tokio::test]
3448    async fn test_store_with_lineage() {
3449        let pool = setup_test_db().await;
3450        let ns_id = create_namespace(&pool, "test-agent").await;
3451        let repo = MemoryRepository::new(pool);
3452
3453        // Store two source memories.
3454        let source1 = repo
3455            .store(StoreMemoryParams {
3456                namespace_id: ns_id,
3457                content: "raw observation one",
3458                category: &Category::Facts,
3459                memory_lane_type: None,
3460                labels: &[],
3461                metadata: &serde_json::Value::Null,
3462                embedding: None,
3463                embedding_model: None,
3464            })
3465            .await
3466            .unwrap();
3467
3468        let source2 = repo
3469            .store(StoreMemoryParams {
3470                namespace_id: ns_id,
3471                content: "raw observation two",
3472                category: &Category::Facts,
3473                memory_lane_type: None,
3474                labels: &[],
3475                metadata: &serde_json::Value::Null,
3476                embedding: None,
3477                embedding_model: None,
3478            })
3479            .await
3480            .unwrap();
3481
3482        // Store derived with lineage.
3483        let derived = repo
3484            .store_with_lineage(StoreMemoryWithLineageParams {
3485                store: StoreMemoryParams {
3486                    namespace_id: ns_id,
3487                    content: "derived insight",
3488                    category: &Category::Facts,
3489                    memory_lane_type: None,
3490                    labels: &[],
3491                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
3492                    embedding: None,
3493                    embedding_model: None,
3494                },
3495                source_memory_ids: &[source1.id, source2.id],
3496                evidence_role: "derived_from",
3497            })
3498            .await
3499            .unwrap();
3500
3501        assert_eq!(derived.content, "derived insight");
3502
3503        // Load lineage for the derived memory.
3504        let lineage = repo.load_lineage(derived.id).await.unwrap();
3505        assert_eq!(lineage.len(), 2);
3506        assert!(lineage.iter().any(|e| e.source_memory_id == source1.id));
3507        assert!(lineage.iter().any(|e| e.source_memory_id == source2.id));
3508    }
3509
3510    #[tokio::test]
3511    async fn test_cognitive_queries_by_level_and_perspective() {
3512        let pool = setup_test_db().await;
3513        let ns_id = create_namespace(&pool, "test-agent").await;
3514        let repo = MemoryRepository::new(pool);
3515        let perspective =
3516            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".into()));
3517
3518        let _raw = repo
3519            .store(StoreMemoryParams {
3520                namespace_id: ns_id,
3521                content: "raw note",
3522                category: &Category::Session,
3523                memory_lane_type: None,
3524                labels: &[],
3525                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
3526                embedding: None,
3527                embedding_model: None,
3528            })
3529            .await
3530            .unwrap();
3531
3532        let explicit = repo
3533            .store(StoreMemoryParams {
3534                namespace_id: ns_id,
3535                content: "explicit note",
3536                category: &Category::Session,
3537                memory_lane_type: None,
3538                labels: &[],
3539                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
3540                embedding: None,
3541                embedding_model: None,
3542            })
3543            .await
3544            .unwrap();
3545
3546        let derived = repo
3547            .store(StoreMemoryParams {
3548                namespace_id: ns_id,
3549                content: "reinforced insight",
3550                category: &Category::Facts,
3551                memory_lane_type: None,
3552                labels: &[],
3553                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 7, 0),
3554                embedding: None,
3555                embedding_model: None,
3556            })
3557            .await
3558            .unwrap();
3559
3560        let contradiction = repo
3561            .store(StoreMemoryParams {
3562                namespace_id: ns_id,
3563                content: "contradiction note",
3564                category: &Category::Facts,
3565                memory_lane_type: None,
3566                labels: &[],
3567                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 1, 5),
3568                embedding: None,
3569                embedding_model: None,
3570            })
3571            .await
3572            .unwrap();
3573
3574        let explicit_rows = repo
3575            .get_by_cognitive_level(ns_id, CognitiveLevel::Explicit, 10)
3576            .await
3577            .unwrap();
3578        assert_eq!(explicit_rows.len(), 1);
3579        assert_eq!(explicit_rows[0].id, explicit.id);
3580
3581        let recent = repo
3582            .get_recent_by_perspective(ns_id, &perspective, 10)
3583            .await
3584            .unwrap();
3585        assert_eq!(recent.len(), 4);
3586
3587        let reinforced = repo
3588            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
3589            .await
3590            .unwrap();
3591        assert_eq!(reinforced[0].id, derived.id);
3592        assert!(reinforced
3593            .iter()
3594            .all(|memory| memory.id != contradiction.id));
3595
3596        let contradictions = repo
3597            .get_contradictions_by_perspective(ns_id, &perspective, 10)
3598            .await
3599            .unwrap();
3600        assert_eq!(contradictions.len(), 1);
3601        assert_eq!(contradictions[0].id, contradiction.id);
3602    }
3603
3604    #[tokio::test]
3605    async fn test_store_distilled_summary_archives_sources_and_records_lineage() {
3606        let pool = setup_test_db().await;
3607        let ns_id = create_namespace(&pool, "test-agent").await;
3608        let repo = MemoryRepository::new(pool);
3609
3610        let source1 = repo
3611            .store(StoreMemoryParams {
3612                namespace_id: ns_id,
3613                content: "raw event 1",
3614                category: &Category::Session,
3615                memory_lane_type: None,
3616                labels: &["raw-activity".to_string()],
3617                metadata: &serde_json::json!({"raw_activity": true}),
3618                embedding: None,
3619                embedding_model: None,
3620            })
3621            .await
3622            .unwrap();
3623
3624        let source2 = repo
3625            .store(StoreMemoryParams {
3626                namespace_id: ns_id,
3627                content: "raw event 2",
3628                category: &Category::Session,
3629                memory_lane_type: None,
3630                labels: &["raw-activity".to_string()],
3631                metadata: &serde_json::json!({"raw_activity": true}),
3632                embedding: None,
3633                embedding_model: None,
3634            })
3635            .await
3636            .unwrap();
3637
3638        let summary = repo
3639            .store_distilled_summary(
3640                StoreMemoryParams {
3641                    namespace_id: ns_id,
3642                    content: "distilled summary",
3643                    category: &Category::Session,
3644                    memory_lane_type: None,
3645                    labels: &["activity-summary".to_string()],
3646                    metadata: &serde_json::json!({"pipeline": "distill-v1"}),
3647                    embedding: None,
3648                    embedding_model: None,
3649                },
3650                &[source1.id, source2.id],
3651            )
3652            .await
3653            .unwrap();
3654
3655        let source1_after = repo.get_by_id(source1.id).await.unwrap().unwrap();
3656        let source2_after = repo.get_by_id(source2.id).await.unwrap().unwrap();
3657        assert!(!source1_after.is_active);
3658        assert!(source1_after.is_archived);
3659        assert!(!source2_after.is_active);
3660        assert!(source2_after.is_archived);
3661
3662        let lineage = repo.load_lineage(summary.id).await.unwrap();
3663        assert_eq!(lineage.len(), 2);
3664        assert!(lineage.iter().all(|entry| entry.evidence_role == "source"));
3665    }
3666
3667    #[tokio::test]
3668    async fn test_load_lineage_empty() {
3669        let pool = setup_test_db().await;
3670        let _ns_id = create_namespace(&pool, "test-agent").await;
3671        let repo = MemoryRepository::new(pool);
3672
3673        let lineage = repo.load_lineage(9999).await.unwrap();
3674        assert!(lineage.is_empty());
3675    }
3676
3677    // ---- Raw-noise exclusion tests ----
3678
3679    #[tokio::test]
3680    async fn test_recent_perspective_excludes_raw_noise() {
3681        let pool = setup_test_db().await;
3682        let ns_id = create_namespace(&pool, "test-agent").await;
3683        let repo = MemoryRepository::new(pool);
3684        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3685
3686        // Store a clean memory.
3687        repo.store(StoreMemoryParams {
3688            namespace_id: ns_id,
3689            content: "clean observation",
3690            category: &Category::Facts,
3691            memory_lane_type: None,
3692            labels: &[],
3693            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
3694            embedding: None,
3695            embedding_model: None,
3696        })
3697        .await
3698        .unwrap();
3699
3700        // Store a raw-activity noise memory (both label and metadata markers).
3701        repo.store(StoreMemoryParams {
3702            namespace_id: ns_id,
3703            content: "raw noise payload",
3704            category: &Category::Session,
3705            memory_lane_type: None,
3706            labels: &["raw-activity".to_string()],
3707            metadata: &serde_json::json!({
3708                "raw_activity": true,
3709                "cognitive": {
3710                    "level": "raw",
3711                    "observer": perspective.observer,
3712                    "subject": perspective.subject,
3713                    "session_key": perspective.session_key,
3714                    "source_memory_ids": [],
3715                    "confidence": 0.5,
3716                    "times_reinforced": 0,
3717                    "times_contradicted": 0,
3718                    "generated_by": "test"
3719                }
3720            }),
3721            embedding: None,
3722            embedding_model: None,
3723        })
3724        .await
3725        .unwrap();
3726
3727        // Default query should exclude the noise.
3728        let recent = repo
3729            .get_recent_by_perspective(ns_id, &perspective, 10)
3730            .await
3731            .unwrap();
3732        assert_eq!(recent.len(), 1);
3733        assert_eq!(recent[0].content, "clean observation");
3734
3735        // With include_raw, both should appear.
3736        let recent_all = repo
3737            .get_recent_by_perspective_opts(ns_id, &perspective, 10, true)
3738            .await
3739            .unwrap();
3740        assert_eq!(recent_all.len(), 2);
3741    }
3742
3743    #[tokio::test]
3744    async fn test_semantic_candidates_respect_perspective_and_raw_noise_filtering() {
3745        let pool = setup_test_db().await;
3746        let ns_id = create_namespace(&pool, "test-agent").await;
3747        let repo = MemoryRepository::new(pool);
3748        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3749
3750        repo.store(StoreMemoryParams {
3751            namespace_id: ns_id,
3752            content: "clean semantic observation",
3753            category: &Category::Facts,
3754            memory_lane_type: None,
3755            labels: &[],
3756            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
3757            embedding: Some(&[0.1_f32; 384]),
3758            embedding_model: Some("mock"),
3759        })
3760        .await
3761        .unwrap();
3762
3763        repo.store(StoreMemoryParams {
3764            namespace_id: ns_id,
3765            content: "raw semantic noise",
3766            category: &Category::Session,
3767            memory_lane_type: None,
3768            labels: &["raw-activity".to_string()],
3769            metadata: &serde_json::json!({
3770                "raw_activity": true,
3771                "cognitive": {
3772                    "level": "raw",
3773                    "observer": "claude-code",
3774                    "subject": "claude-code",
3775                    "session_key": "s1",
3776                    "generated_by": "test"
3777                }
3778            }),
3779            embedding: Some(&[0.2_f32; 384]),
3780            embedding_model: Some("mock"),
3781        })
3782        .await
3783        .unwrap();
3784
3785        repo.store(StoreMemoryParams {
3786            namespace_id: ns_id,
3787            content: "other perspective semantic",
3788            category: &Category::Facts,
3789            memory_lane_type: None,
3790            labels: &[],
3791            metadata: &serde_json::json!({
3792                "cognitive": {
3793                    "level": "explicit",
3794                    "observer": "codex",
3795                    "subject": "codex",
3796                    "session_key": "s1",
3797                    "generated_by": "test"
3798                }
3799            }),
3800            embedding: Some(&[0.3_f32; 384]),
3801            embedding_model: Some("mock"),
3802        })
3803        .await
3804        .unwrap();
3805
3806        let candidates = repo
3807            .get_semantic_candidates(SemanticCandidateParams {
3808                namespace_id: ns_id,
3809                perspective: Some(&perspective),
3810                limit: 10,
3811                include_raw: false,
3812            })
3813            .await
3814            .unwrap();
3815
3816        assert_eq!(candidates.len(), 1);
3817        assert_eq!(candidates[0].content, "clean semantic observation");
3818    }
3819
3820    #[tokio::test]
3821    async fn test_semantic_candidates_match_session_keys_array() {
3822        let pool = setup_test_db().await;
3823        let ns_id = create_namespace(&pool, "test-agent").await;
3824        let repo = MemoryRepository::new(pool);
3825        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s-array".into()));
3826
3827        repo.store(StoreMemoryParams {
3828            namespace_id: ns_id,
3829            content: "session array semantic observation",
3830            category: &Category::Facts,
3831            memory_lane_type: None,
3832            labels: &[],
3833            metadata: &serde_json::json!({
3834                "cognitive": {
3835                    "level": "explicit",
3836                    "observer": "claude-code",
3837                    "subject": "claude-code",
3838                    "session_keys": ["s-array", "s-other"],
3839                    "generated_by": "test"
3840                }
3841            }),
3842            embedding: Some(&[0.4_f32; 384]),
3843            embedding_model: Some("mock"),
3844        })
3845        .await
3846        .unwrap();
3847
3848        let candidates = repo
3849            .get_semantic_candidates(SemanticCandidateParams {
3850                namespace_id: ns_id,
3851                perspective: Some(&perspective),
3852                limit: 10,
3853                include_raw: false,
3854            })
3855            .await
3856            .unwrap();
3857
3858        assert_eq!(candidates.len(), 1);
3859        assert_eq!(candidates[0].content, "session array semantic observation");
3860    }
3861
3862    #[tokio::test]
3863    async fn test_reinforced_perspective_excludes_raw_noise() {
3864        let pool = setup_test_db().await;
3865        let ns_id = create_namespace(&pool, "test-agent").await;
3866        let repo = MemoryRepository::new(pool);
3867        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3868
3869        repo.store(StoreMemoryParams {
3870            namespace_id: ns_id,
3871            content: "reinforced insight",
3872            category: &Category::Facts,
3873            memory_lane_type: None,
3874            labels: &[],
3875            metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 5, 0),
3876            embedding: None,
3877            embedding_model: None,
3878        })
3879        .await
3880        .unwrap();
3881
3882        repo.store(StoreMemoryParams {
3883            namespace_id: ns_id,
3884            content: "raw noise",
3885            category: &Category::Session,
3886            memory_lane_type: None,
3887            labels: &["raw-activity".to_string()],
3888            metadata: &serde_json::json!({
3889                "raw_activity": true,
3890                "cognitive": {
3891                    "level": "raw",
3892                    "observer": perspective.observer,
3893                    "subject": perspective.subject,
3894                    "session_key": perspective.session_key,
3895                    "source_memory_ids": [],
3896                    "confidence": 0.5,
3897                    "times_reinforced": 0,
3898                    "times_contradicted": 0,
3899                    "generated_by": "test"
3900                }
3901            }),
3902            embedding: None,
3903            embedding_model: None,
3904        })
3905        .await
3906        .unwrap();
3907
3908        let reinforced = repo
3909            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
3910            .await
3911            .unwrap();
3912        assert_eq!(reinforced.len(), 1);
3913        assert_eq!(reinforced[0].content, "reinforced insight");
3914    }
3915
3916    #[tokio::test]
3917    async fn test_contradictions_perspective_excludes_raw_noise() {
3918        let pool = setup_test_db().await;
3919        let ns_id = create_namespace(&pool, "test-agent").await;
3920        let repo = MemoryRepository::new(pool);
3921        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3922
3923        repo.store(StoreMemoryParams {
3924            namespace_id: ns_id,
3925            content: "a real contradiction",
3926            category: &Category::Facts,
3927            memory_lane_type: None,
3928            labels: &[],
3929            metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 3),
3930            embedding: None,
3931            embedding_model: None,
3932        })
3933        .await
3934        .unwrap();
3935
3936        repo.store(StoreMemoryParams {
3937            namespace_id: ns_id,
3938            content: "raw noise",
3939            category: &Category::Session,
3940            memory_lane_type: None,
3941            labels: &["raw-activity".to_string()],
3942            metadata: &serde_json::json!({
3943                "raw_activity": true,
3944                "cognitive": {
3945                    "level": "raw",
3946                    "observer": perspective.observer,
3947                    "subject": perspective.subject,
3948                    "session_key": perspective.session_key,
3949                    "source_memory_ids": [],
3950                    "confidence": 0.5,
3951                    "times_reinforced": 0,
3952                    "times_contradicted": 0,
3953                    "generated_by": "test"
3954                }
3955            }),
3956            embedding: None,
3957            embedding_model: None,
3958        })
3959        .await
3960        .unwrap();
3961
3962        let contradictions = repo
3963            .get_contradictions_by_perspective(ns_id, &perspective, 10)
3964            .await
3965            .unwrap();
3966        assert_eq!(contradictions.len(), 1);
3967        assert_eq!(contradictions[0].content, "a real contradiction");
3968    }
3969
3970    // ---- search_working_set tests ----
3971
3972    #[tokio::test]
3973    async fn test_search_working_set_basic() {
3974        let pool = setup_test_db().await;
3975        let ns_id = create_namespace(&pool, "test-agent").await;
3976        let repo = MemoryRepository::new(pool);
3977        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3978
3979        // Store memories across different cognitive levels.
3980        let _raw = repo
3981            .store(StoreMemoryParams {
3982                namespace_id: ns_id,
3983                content: "raw note",
3984                category: &Category::Session,
3985                memory_lane_type: None,
3986                labels: &[],
3987                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
3988                embedding: None,
3989                embedding_model: None,
3990            })
3991            .await
3992            .unwrap();
3993
3994        let explicit = repo
3995            .store(StoreMemoryParams {
3996                namespace_id: ns_id,
3997                content: "explicit fact",
3998                category: &Category::Facts,
3999                memory_lane_type: None,
4000                labels: &[],
4001                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 3, 0),
4002                embedding: None,
4003                embedding_model: None,
4004            })
4005            .await
4006            .unwrap();
4007
4008        let derived = repo
4009            .store(StoreMemoryParams {
4010                namespace_id: ns_id,
4011                content: "derived insight",
4012                category: &Category::Facts,
4013                memory_lane_type: None,
4014                labels: &[],
4015                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 8, 0),
4016                embedding: None,
4017                embedding_model: None,
4018            })
4019            .await
4020            .unwrap();
4021
4022        let contradiction = repo
4023            .store(StoreMemoryParams {
4024                namespace_id: ns_id,
4025                content: "contradiction",
4026                category: &Category::Facts,
4027                memory_lane_type: None,
4028                labels: &[],
4029                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 2),
4030                embedding: None,
4031                embedding_model: None,
4032            })
4033            .await
4034            .unwrap();
4035
4036        let result = repo
4037            .search_working_set(WorkingSetParams {
4038                namespace_id: ns_id,
4039                perspective: Some(&perspective),
4040                max_items: 20,
4041                include_raw: false,
4042            })
4043            .await
4044            .unwrap();
4045
4046        // Should contain all non-noise memories. Order: reinforced first, then recent,
4047        // then contradictions.
4048        assert!(result.len() >= 3);
4049        let ids: Vec<i64> = result.iter().map(|m| m.id).collect();
4050        assert!(ids.contains(&explicit.id));
4051        assert!(ids.contains(&derived.id));
4052        assert!(ids.contains(&contradiction.id));
4053    }
4054
4055    #[tokio::test]
4056    async fn test_search_working_set_dedupes() {
4057        let pool = setup_test_db().await;
4058        let ns_id = create_namespace(&pool, "test-agent").await;
4059        let repo = MemoryRepository::new(pool);
4060        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4061
4062        // A memory that is both reinforced and recent should appear only once.
4063        let shared = repo
4064            .store(StoreMemoryParams {
4065                namespace_id: ns_id,
4066                content: "shared memory",
4067                category: &Category::Facts,
4068                memory_lane_type: None,
4069                labels: &[],
4070                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 10, 0),
4071                embedding: None,
4072                embedding_model: None,
4073            })
4074            .await
4075            .unwrap();
4076
4077        let result = repo
4078            .search_working_set(WorkingSetParams {
4079                namespace_id: ns_id,
4080                perspective: Some(&perspective),
4081                max_items: 20,
4082                include_raw: false,
4083            })
4084            .await
4085            .unwrap();
4086
4087        let count = result.iter().filter(|m| m.id == shared.id).count();
4088        assert_eq!(count, 1, "shared memory should appear exactly once");
4089    }
4090
4091    #[tokio::test]
4092    async fn test_search_working_set_respects_max_items() {
4093        let pool = setup_test_db().await;
4094        let ns_id = create_namespace(&pool, "test-agent").await;
4095        let repo = MemoryRepository::new(pool);
4096        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4097
4098        for i in 0..10 {
4099            let content = format!("memory {}", i);
4100            repo.store(StoreMemoryParams {
4101                namespace_id: ns_id,
4102                content: &content,
4103                category: &Category::Facts,
4104                memory_lane_type: None,
4105                labels: &[],
4106                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, i as i64, 0),
4107                embedding: None,
4108                embedding_model: None,
4109            })
4110            .await
4111            .unwrap();
4112        }
4113
4114        let result = repo
4115            .search_working_set(WorkingSetParams {
4116                namespace_id: ns_id,
4117                perspective: Some(&perspective),
4118                max_items: 3,
4119                include_raw: false,
4120            })
4121            .await
4122            .unwrap();
4123
4124        assert_eq!(result.len(), 3);
4125    }
4126
4127    #[tokio::test]
4128    async fn test_search_working_set_excludes_raw_noise() {
4129        let pool = setup_test_db().await;
4130        let ns_id = create_namespace(&pool, "test-agent").await;
4131        let repo = MemoryRepository::new(pool);
4132        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4133
4134        repo.store(StoreMemoryParams {
4135            namespace_id: ns_id,
4136            content: "real observation",
4137            category: &Category::Facts,
4138            memory_lane_type: None,
4139            labels: &[],
4140            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
4141            embedding: None,
4142            embedding_model: None,
4143        })
4144        .await
4145        .unwrap();
4146
4147        repo.store(StoreMemoryParams {
4148            namespace_id: ns_id,
4149            content: "raw noise",
4150            category: &Category::Session,
4151            memory_lane_type: None,
4152            labels: &["raw-activity".to_string()],
4153            metadata: &serde_json::json!({"raw_activity": true, "cognitive": {"level": "raw"}}),
4154            embedding: None,
4155            embedding_model: None,
4156        })
4157        .await
4158        .unwrap();
4159
4160        let result = repo
4161            .search_working_set(WorkingSetParams {
4162                namespace_id: ns_id,
4163                perspective: Some(&perspective),
4164                max_items: 20,
4165                include_raw: false,
4166            })
4167            .await
4168            .unwrap();
4169
4170        assert!(result.iter().all(|m| m.content != "raw noise"));
4171        assert!(result.iter().any(|m| m.content == "real observation"));
4172    }
4173
4174    #[tokio::test]
4175    async fn test_search_working_set_without_perspective() {
4176        let pool = setup_test_db().await;
4177        let ns_id = create_namespace(&pool, "test-agent").await;
4178        let repo = MemoryRepository::new(pool);
4179
4180        repo.store(StoreMemoryParams {
4181            namespace_id: ns_id,
4182            content: "namespace memory one",
4183            category: &Category::Facts,
4184            memory_lane_type: None,
4185            labels: &[],
4186            metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4187            embedding: None,
4188            embedding_model: None,
4189        })
4190        .await
4191        .unwrap();
4192
4193        repo.store(StoreMemoryParams {
4194            namespace_id: ns_id,
4195            content: "namespace memory two",
4196            category: &Category::Facts,
4197            memory_lane_type: None,
4198            labels: &[],
4199            metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4200            embedding: None,
4201            embedding_model: None,
4202        })
4203        .await
4204        .unwrap();
4205
4206        let result = repo
4207            .search_working_set(WorkingSetParams {
4208                namespace_id: ns_id,
4209                perspective: None,
4210                max_items: 20,
4211                include_raw: false,
4212            })
4213            .await
4214            .unwrap();
4215
4216        assert!(result.len() >= 2);
4217    }
4218
4219    #[tokio::test]
4220    async fn test_list_by_session_key_matches_session_keys_array() {
4221        let pool = setup_test_db().await;
4222        let ns_id = create_namespace(&pool, "test-agent").await;
4223        let repo = MemoryRepository::new(pool);
4224
4225        repo.store(StoreMemoryParams {
4226            namespace_id: ns_id,
4227            content: "shared explicit memory",
4228            category: &Category::Facts,
4229            memory_lane_type: None,
4230            labels: &[],
4231            metadata: &serde_json::json!({
4232                "cognitive": {
4233                    "level": "explicit",
4234                    "session_key": "session-b",
4235                    "session_keys": ["session-a", "session-b"]
4236                }
4237            }),
4238            embedding: None,
4239            embedding_model: None,
4240        })
4241        .await
4242        .unwrap();
4243
4244        let session_a = repo
4245            .list_by_session_key(ns_id, "session-a", 10, false)
4246            .await
4247            .unwrap();
4248        let session_b = repo
4249            .list_by_session_key(ns_id, "session-b", 10, false)
4250            .await
4251            .unwrap();
4252
4253        assert_eq!(session_a.len(), 1);
4254        assert_eq!(session_b.len(), 1);
4255    }
4256
4257    #[tokio::test]
4258    async fn test_count_evidence_returns_zero_for_empty_namespace() {
4259        let pool = setup_test_db().await;
4260        let ns_id = create_namespace(&pool, "test-agent").await;
4261        let repo = MemoryRepository::new(pool);
4262
4263        let count = repo.count_evidence(ns_id).await.unwrap();
4264        assert_eq!(count, 0);
4265    }
4266
4267    #[tokio::test]
4268    async fn test_count_evidence_counts_lineage_edges() {
4269        let pool = setup_test_db().await;
4270        let ns_id = create_namespace(&pool, "test-agent").await;
4271        let repo = MemoryRepository::new(pool);
4272
4273        let source = repo
4274            .store(StoreMemoryParams {
4275                namespace_id: ns_id,
4276                content: "source memory",
4277                category: &Category::Session,
4278                memory_lane_type: None,
4279                labels: &[],
4280                metadata: &serde_json::json!({}),
4281                embedding: None,
4282                embedding_model: None,
4283            })
4284            .await
4285            .unwrap();
4286
4287        let _derived = repo
4288            .store_with_lineage(StoreMemoryWithLineageParams {
4289                store: StoreMemoryParams {
4290                    namespace_id: ns_id,
4291                    content: "derived with evidence",
4292                    category: &Category::Facts,
4293                    memory_lane_type: None,
4294                    labels: &[],
4295                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
4296                    embedding: None,
4297                    embedding_model: None,
4298                },
4299                source_memory_ids: &[source.id],
4300                evidence_role: "source",
4301            })
4302            .await
4303            .unwrap();
4304
4305        let count = repo.count_evidence(ns_id).await.unwrap();
4306        assert_eq!(count, 1);
4307    }
4308
4309    #[tokio::test]
4310    async fn test_count_evidence_does_not_count_other_namespace() {
4311        let pool = setup_test_db().await;
4312        let ns_a = create_namespace(&pool, "agent-a").await;
4313        let ns_b = create_namespace(&pool, "agent-b").await;
4314        let repo = MemoryRepository::new(pool);
4315
4316        let source = repo
4317            .store(StoreMemoryParams {
4318                namespace_id: ns_a,
4319                content: "source in ns-a",
4320                category: &Category::Session,
4321                memory_lane_type: None,
4322                labels: &[],
4323                metadata: &serde_json::json!({}),
4324                embedding: None,
4325                embedding_model: None,
4326            })
4327            .await
4328            .unwrap();
4329
4330        let _derived = repo
4331            .store_with_lineage(StoreMemoryWithLineageParams {
4332                store: StoreMemoryParams {
4333                    namespace_id: ns_a,
4334                    content: "derived in ns-a",
4335                    category: &Category::Facts,
4336                    memory_lane_type: None,
4337                    labels: &[],
4338                    metadata: &serde_json::json!({}),
4339                    embedding: None,
4340                    embedding_model: None,
4341                },
4342                source_memory_ids: &[source.id],
4343                evidence_role: "source",
4344            })
4345            .await
4346            .unwrap();
4347
4348        assert_eq!(repo.count_evidence(ns_a).await.unwrap(), 1);
4349        assert_eq!(repo.count_evidence(ns_b).await.unwrap(), 0);
4350    }
4351
4352    #[tokio::test]
4353    async fn test_count_by_cognitive_level_returns_matching_total() {
4354        let pool = setup_test_db().await;
4355        let ns_id = create_namespace(&pool, "level-counts").await;
4356        let repo = MemoryRepository::new(pool);
4357
4358        for (content, level) in [
4359            ("raw event", CognitiveLevel::Raw),
4360            ("derived insight", CognitiveLevel::Derived),
4361            ("derived insight 2", CognitiveLevel::Derived),
4362            ("contradiction note", CognitiveLevel::Contradiction),
4363        ] {
4364            repo.store(StoreMemoryParams {
4365                namespace_id: ns_id,
4366                content,
4367                category: &Category::Session,
4368                memory_lane_type: None,
4369                labels: &[],
4370                metadata: &serde_json::json!({
4371                    "cognitive": {
4372                        "level": level.as_str(),
4373                        "observer": "claude-code",
4374                        "subject": "claude-code",
4375                        "generated_by": "test"
4376                    }
4377                }),
4378                embedding: None,
4379                embedding_model: None,
4380            })
4381            .await
4382            .unwrap();
4383        }
4384
4385        assert_eq!(
4386            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Derived)
4387                .await
4388                .unwrap(),
4389            2
4390        );
4391        assert_eq!(
4392            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Contradiction)
4393                .await
4394                .unwrap(),
4395            1
4396        );
4397    }
4398
4399    /// Regression test: `get_by_cognitive_level_with_perspective` must apply
4400    /// perspective filtering in SQL BEFORE the LIMIT, so callers receive up to
4401    /// `limit` matching results instead of silently getting fewer.
4402    #[tokio::test]
4403    async fn test_get_by_cognitive_level_with_perspective_filters_before_limit() {
4404        let pool = setup_test_db().await;
4405        let ns_id = create_namespace(&pool, "perspective-limit").await;
4406        let repo = MemoryRepository::new(pool);
4407
4408        let perspective_a = PerspectiveKey::new("alice", "project-x", None);
4409        let perspective_b = PerspectiveKey::new("bob", "project-y", None);
4410
4411        // Insert 5 memories for alice + project-x at Explicit level.
4412        for i in 0..5 {
4413            repo.store(StoreMemoryParams {
4414                namespace_id: ns_id,
4415                content: &format!("alice memory {}", i),
4416                category: &Category::Facts,
4417                memory_lane_type: None,
4418                labels: &[],
4419                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_a, 0, 0),
4420                embedding: None,
4421                embedding_model: None,
4422            })
4423            .await
4424            .unwrap();
4425        }
4426
4427        // Insert 5 memories for bob + project-y at Explicit level.
4428        for i in 0..5 {
4429            repo.store(StoreMemoryParams {
4430                namespace_id: ns_id,
4431                content: &format!("bob memory {}", i),
4432                category: &Category::Facts,
4433                memory_lane_type: None,
4434                labels: &[],
4435                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_b, 0, 0),
4436                embedding: None,
4437                embedding_model: None,
4438            })
4439            .await
4440            .unwrap();
4441        }
4442
4443        // Request 3 results for alice's perspective; should get exactly 3, not fewer.
4444        let alice_results = repo
4445            .get_by_cognitive_level_with_perspective(
4446                ns_id,
4447                CognitiveLevel::Explicit,
4448                &perspective_a,
4449                3,
4450            )
4451            .await
4452            .unwrap();
4453        assert_eq!(alice_results.len(), 3);
4454        assert!(alice_results.iter().all(|m| {
4455            let meta = &m.metadata;
4456            let obs = meta
4457                .get("cognitive")
4458                .and_then(|c| c.get("observer"))
4459                .and_then(|v| v.as_str());
4460            let sub = meta
4461                .get("cognitive")
4462                .and_then(|c| c.get("subject"))
4463                .and_then(|v| v.as_str());
4464            obs == Some("alice") && sub == Some("project-x")
4465        }));
4466
4467        // Request 10 results for alice; there are only 5, so capped at 5.
4468        let alice_many = repo
4469            .get_by_cognitive_level_with_perspective(
4470                ns_id,
4471                CognitiveLevel::Explicit,
4472                &perspective_a,
4473                10,
4474            )
4475            .await
4476            .unwrap();
4477        assert_eq!(alice_many.len(), 5);
4478
4479        // Bob gets a separate set.
4480        let bob_results = repo
4481            .get_by_cognitive_level_with_perspective(
4482                ns_id,
4483                CognitiveLevel::Explicit,
4484                &perspective_b,
4485                3,
4486            )
4487            .await
4488            .unwrap();
4489        assert_eq!(bob_results.len(), 3);
4490        assert!(bob_results.iter().all(|m| {
4491            let meta = &m.metadata;
4492            let obs = meta
4493                .get("cognitive")
4494                .and_then(|c| c.get("observer"))
4495                .and_then(|v| v.as_str());
4496            let sub = meta
4497                .get("cognitive")
4498                .and_then(|c| c.get("subject"))
4499                .and_then(|v| v.as_str());
4500            obs == Some("bob") && sub == Some("project-y")
4501        }));
4502    }
4503
4504    /// Verifies that the scalar session_key field is respected in the SQL filter.
4505    #[tokio::test]
4506    async fn test_get_by_cognitive_level_with_perspective_respects_session_key() {
4507        let pool = setup_test_db().await;
4508        let ns_id = create_namespace(&pool, "session-key-scalar").await;
4509        let repo = MemoryRepository::new(pool);
4510
4511        let perspective_s1 =
4512            PerspectiveKey::new("alice", "project-x", Some("session-1".to_string()));
4513        let perspective_s2 =
4514            PerspectiveKey::new("alice", "project-x", Some("session-2".to_string()));
4515
4516        for i in 0..3 {
4517            repo.store(StoreMemoryParams {
4518                namespace_id: ns_id,
4519                content: &format!("s1 memory {}", i),
4520                category: &Category::Facts,
4521                memory_lane_type: None,
4522                labels: &[],
4523                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s1, 0, 0),
4524                embedding: None,
4525                embedding_model: None,
4526            })
4527            .await
4528            .unwrap();
4529        }
4530        for i in 0..3 {
4531            repo.store(StoreMemoryParams {
4532                namespace_id: ns_id,
4533                content: &format!("s2 memory {}", i),
4534                category: &Category::Facts,
4535                memory_lane_type: None,
4536                labels: &[],
4537                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s2, 0, 0),
4538                embedding: None,
4539                embedding_model: None,
4540            })
4541            .await
4542            .unwrap();
4543        }
4544
4545        let s1_results = repo
4546            .get_by_cognitive_level_with_perspective(
4547                ns_id,
4548                CognitiveLevel::Derived,
4549                &perspective_s1,
4550                10,
4551            )
4552            .await
4553            .unwrap();
4554        assert_eq!(s1_results.len(), 3);
4555        assert!(s1_results.iter().all(|m| m.content.starts_with("s1")));
4556
4557        let s2_results = repo
4558            .get_by_cognitive_level_with_perspective(
4559                ns_id,
4560                CognitiveLevel::Derived,
4561                &perspective_s2,
4562                10,
4563            )
4564            .await
4565            .unwrap();
4566        assert_eq!(s2_results.len(), 3);
4567        assert!(s2_results.iter().all(|m| m.content.starts_with("s2")));
4568    }
4569
4570    /// Verifies that memories matching only the session_keys array are included.
4571    #[tokio::test]
4572    async fn test_get_by_cognitive_level_with_perspective_matches_session_keys_array() {
4573        let pool = setup_test_db().await;
4574        let ns_id = create_namespace(&pool, "session-keys-array").await;
4575        let repo = MemoryRepository::new(pool);
4576
4577        let perspective = PerspectiveKey::new("alice", "project-x", Some("session-a".to_string()));
4578
4579        // Memory with session_key set to the same key as the perspective.
4580        repo.store(StoreMemoryParams {
4581            namespace_id: ns_id,
4582            content: "scalar match",
4583            category: &Category::Facts,
4584            memory_lane_type: None,
4585            labels: &[],
4586            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
4587            embedding: None,
4588            embedding_model: None,
4589        })
4590        .await
4591        .unwrap();
4592
4593        // Memory with session_keys array containing the key (but different scalar session_key).
4594        repo.store(StoreMemoryParams {
4595            namespace_id: ns_id,
4596            content: "array match",
4597            category: &Category::Facts,
4598            memory_lane_type: None,
4599            labels: &[],
4600            metadata: &serde_json::json!({
4601                "cognitive": {
4602                    "level": "explicit",
4603                    "observer": "alice",
4604                    "subject": "project-x",
4605                    "session_key": "session-other",
4606                    "session_keys": ["session-a", "session-b"],
4607                    "generated_by": "test"
4608                }
4609            }),
4610            embedding: None,
4611            embedding_model: None,
4612        })
4613        .await
4614        .unwrap();
4615
4616        // Memory that does not match at all.
4617        repo.store(StoreMemoryParams {
4618            namespace_id: ns_id,
4619            content: "no match",
4620            category: &Category::Facts,
4621            memory_lane_type: None,
4622            labels: &[],
4623            metadata: &serde_json::json!({
4624                "cognitive": {
4625                    "level": "explicit",
4626                    "observer": "alice",
4627                    "subject": "project-x",
4628                    "session_key": "session-other",
4629                    "session_keys": ["session-z"],
4630                    "generated_by": "test"
4631                }
4632            }),
4633            embedding: None,
4634            embedding_model: None,
4635        })
4636        .await
4637        .unwrap();
4638
4639        let results = repo
4640            .get_by_cognitive_level_with_perspective(
4641                ns_id,
4642                CognitiveLevel::Explicit,
4643                &perspective,
4644                10,
4645            )
4646            .await
4647            .unwrap();
4648        assert_eq!(results.len(), 2);
4649        let contents: Vec<_> = results.iter().map(|m| m.content.as_str()).collect();
4650        assert!(contents.contains(&"scalar match"));
4651        assert!(contents.contains(&"array match"));
4652    }
4653
4654    #[tokio::test]
4655    async fn test_record_metric_and_latest_metrics_for_namespace() {
4656        let pool = setup_test_db().await;
4657        let ns_id = create_namespace(&pool, "metric-ns").await;
4658        let other_ns = create_namespace(&pool, "metric-other").await;
4659        let repo = MemoryRepository::new(pool);
4660
4661        repo.record_metric(
4662            "cognition.query.total_ms",
4663            12.5,
4664            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4665        )
4666        .await
4667        .unwrap();
4668        repo.record_metric(
4669            "cognition.query.total_ms",
4670            18.0,
4671            &serde_json::json!({"namespace_id": other_ns, "stage": "total", "unit": "ms"}),
4672        )
4673        .await
4674        .unwrap();
4675        repo.record_metric(
4676            "cognition.representation.total_ms",
4677            4.0,
4678            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4679        )
4680        .await
4681        .unwrap();
4682
4683        let metrics = repo
4684            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4685            .await
4686            .unwrap();
4687
4688        assert_eq!(metrics.len(), 2);
4689        assert!(metrics
4690            .iter()
4691            .all(|metric| metric.labels.contains(&ns_id.to_string())));
4692        assert!(metrics
4693            .iter()
4694            .any(|metric| metric.metric_name == "cognition.query.total_ms"));
4695        assert!(metrics
4696            .iter()
4697            .any(|metric| metric.metric_name == "cognition.representation.total_ms"));
4698        assert!(metrics
4699            .iter()
4700            .all(|metric| { metric.metric_name.starts_with("cognition.") }));
4701    }
4702
4703    #[tokio::test]
4704    async fn test_record_metrics_batch_persists_all_samples() {
4705        let pool = setup_test_db().await;
4706        let ns_id = create_namespace(&pool, "metric-batch").await;
4707        let repo = MemoryRepository::new(pool);
4708
4709        repo.record_metrics_batch(&[
4710            MetricSample {
4711                metric_name: "cognition.query.total_ms".to_string(),
4712                metric_value: 9.5,
4713                labels: serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4714            },
4715            MetricSample {
4716                metric_name: "cognition.query.answer.total_tokens".to_string(),
4717                metric_value: 128.0,
4718                labels: serde_json::json!({"namespace_id": ns_id, "stage": "answer", "unit": "tokens"}),
4719            },
4720        ])
4721        .await
4722        .unwrap();
4723
4724        let metrics = repo
4725            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4726            .await
4727            .unwrap();
4728
4729        assert_eq!(metrics.len(), 2);
4730    }
4731
4732    // ---- Observability query tests ----
4733
4734    #[tokio::test]
4735    async fn test_list_jobs_returns_enqueued_jobs() {
4736        let pool = setup_test_db().await;
4737        let ns_id = create_namespace(&pool, "obs-jobs").await;
4738        let repo = MemoryRepository::new(pool);
4739
4740        repo.enqueue_job(EnqueueJobParams {
4741            namespace_id: ns_id,
4742            job_type: "derive",
4743            priority: 10,
4744            perspective: None,
4745            payload: &serde_json::json!({"a": 1}),
4746        })
4747        .await
4748        .unwrap();
4749
4750        repo.enqueue_job(EnqueueJobParams {
4751            namespace_id: ns_id,
4752            job_type: "digest",
4753            priority: 5,
4754            perspective: None,
4755            payload: &serde_json::json!({"b": 2}),
4756        })
4757        .await
4758        .unwrap();
4759
4760        // List all jobs.
4761        let all = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
4762        assert_eq!(all.len(), 2);
4763
4764        // Filter by job_type.
4765        let derive_only = repo
4766            .list_jobs(ns_id, Some("derive"), None, 50, 0)
4767            .await
4768            .unwrap();
4769        assert_eq!(derive_only.len(), 1);
4770        assert_eq!(derive_only[0].job_type, "derive");
4771
4772        // Filter by status (both pending).
4773        let pending = repo
4774            .list_jobs(ns_id, None, Some("pending"), 50, 0)
4775            .await
4776            .unwrap();
4777        assert_eq!(pending.len(), 2);
4778
4779        // Combined filter.
4780        let digest_pending = repo
4781            .list_jobs(ns_id, Some("digest"), Some("pending"), 50, 0)
4782            .await
4783            .unwrap();
4784        assert_eq!(digest_pending.len(), 1);
4785    }
4786
4787    #[tokio::test]
4788    async fn test_list_jobs_respects_limit_offset() {
4789        let pool = setup_test_db().await;
4790        let ns_id = create_namespace(&pool, "obs-limit").await;
4791        let repo = MemoryRepository::new(pool);
4792
4793        for i in 0..5 {
4794            repo.enqueue_job(EnqueueJobParams {
4795                namespace_id: ns_id,
4796                job_type: "derive",
4797                priority: i,
4798                perspective: None,
4799                payload: &serde_json::json!({"i": i}),
4800            })
4801            .await
4802            .unwrap();
4803        }
4804
4805        let page1 = repo.list_jobs(ns_id, None, None, 2, 0).await.unwrap();
4806        assert_eq!(page1.len(), 2);
4807
4808        let page2 = repo.list_jobs(ns_id, None, None, 2, 2).await.unwrap();
4809        assert_eq!(page2.len(), 2);
4810
4811        let page3 = repo.list_jobs(ns_id, None, None, 2, 4).await.unwrap();
4812        assert_eq!(page3.len(), 1);
4813    }
4814
4815    #[tokio::test]
4816    async fn test_count_jobs_by_status() {
4817        let pool = setup_test_db().await;
4818        let ns_id = create_namespace(&pool, "obs-count").await;
4819        let repo = MemoryRepository::new(pool);
4820
4821        repo.enqueue_job(EnqueueJobParams {
4822            namespace_id: ns_id,
4823            job_type: "derive",
4824            priority: 10,
4825            perspective: None,
4826            payload: &serde_json::json!({}),
4827        })
4828        .await
4829        .unwrap();
4830
4831        repo.enqueue_job(EnqueueJobParams {
4832            namespace_id: ns_id,
4833            job_type: "derive",
4834            priority: 5,
4835            perspective: None,
4836            payload: &serde_json::json!({}),
4837        })
4838        .await
4839        .unwrap();
4840
4841        repo.enqueue_job(EnqueueJobParams {
4842            namespace_id: ns_id,
4843            job_type: "digest",
4844            priority: 10,
4845            perspective: None,
4846            payload: &serde_json::json!({}),
4847        })
4848        .await
4849        .unwrap();
4850
4851        // All jobs.
4852        let all_counts = repo.count_jobs_by_status(ns_id, None).await.unwrap();
4853        let total: i64 = all_counts.iter().map(|(_, c)| c).sum();
4854        assert_eq!(total, 3);
4855
4856        // Filtered by job_type.
4857        let derive_counts = repo
4858            .count_jobs_by_status(ns_id, Some("derive"))
4859            .await
4860            .unwrap();
4861        let derive_total: i64 = derive_counts.iter().map(|(_, c)| c).sum();
4862        assert_eq!(derive_total, 2);
4863    }
4864
4865    #[tokio::test]
4866    async fn test_count_jobs_respects_filters() {
4867        let pool = setup_test_db().await;
4868        let ns_id = create_namespace(&pool, "obs-job-total").await;
4869        let repo = MemoryRepository::new(pool);
4870
4871        repo.enqueue_job(EnqueueJobParams {
4872            namespace_id: ns_id,
4873            job_type: "derive",
4874            priority: 10,
4875            perspective: None,
4876            payload: &serde_json::json!({"index": 1}),
4877        })
4878        .await
4879        .unwrap();
4880        repo.enqueue_job(EnqueueJobParams {
4881            namespace_id: ns_id,
4882            job_type: "derive",
4883            priority: 5,
4884            perspective: None,
4885            payload: &serde_json::json!({"index": 2}),
4886        })
4887        .await
4888        .unwrap();
4889        repo.enqueue_job(EnqueueJobParams {
4890            namespace_id: ns_id,
4891            job_type: "digest",
4892            priority: 1,
4893            perspective: None,
4894            payload: &serde_json::json!({"index": 3}),
4895        })
4896        .await
4897        .unwrap();
4898
4899        assert_eq!(repo.count_jobs(ns_id, None, None).await.unwrap(), 3);
4900        assert_eq!(
4901            repo.count_jobs(ns_id, Some("derive"), None).await.unwrap(),
4902            2
4903        );
4904        assert_eq!(
4905            repo.count_jobs(ns_id, Some("derive"), Some("pending"))
4906                .await
4907                .unwrap(),
4908            2
4909        );
4910        assert_eq!(
4911            repo.count_jobs(ns_id, Some("reflect"), Some("pending"))
4912                .await
4913                .unwrap(),
4914            0
4915        );
4916    }
4917
4918    #[tokio::test]
4919    async fn test_list_digests_and_count() {
4920        let pool = setup_test_db().await;
4921        let ns_id = create_namespace(&pool, "obs-digests").await;
4922        let repo = MemoryRepository::new(pool);
4923
4924        // Store a memory to use as digest content.
4925        let mem = repo
4926            .store(StoreMemoryParams {
4927                namespace_id: ns_id,
4928                content: "digest content",
4929                category: &Category::Session,
4930                memory_lane_type: None,
4931                labels: &[],
4932                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
4933                embedding: None,
4934                embedding_model: None,
4935            })
4936            .await
4937            .unwrap();
4938
4939        repo.store_digest(StoreDigestParams {
4940            namespace_id: ns_id,
4941            session_key: "session-1",
4942            digest_kind: "short",
4943            memory_id: mem.id,
4944            start_memory_id: Some(1),
4945            end_memory_id: Some(10),
4946            token_count: 50,
4947        })
4948        .await
4949        .unwrap();
4950
4951        repo.store_digest(StoreDigestParams {
4952            namespace_id: ns_id,
4953            session_key: "session-2",
4954            digest_kind: "long",
4955            memory_id: mem.id,
4956            start_memory_id: Some(11),
4957            end_memory_id: Some(20),
4958            token_count: 100,
4959        })
4960        .await
4961        .unwrap();
4962
4963        // List all digests.
4964        let all = repo.list_digests(ns_id, None, 50, 0).await.unwrap();
4965        assert_eq!(all.len(), 2);
4966
4967        let total = repo.count_digests(ns_id, None).await.unwrap();
4968        assert_eq!(total, 2);
4969
4970        // Filter by session_key.
4971        let sess1 = repo
4972            .list_digests(ns_id, Some("session-1"), 50, 0)
4973            .await
4974            .unwrap();
4975        assert_eq!(sess1.len(), 1);
4976        assert_eq!(sess1[0].session_key, "session-1");
4977
4978        let sess1_count = repo.count_digests(ns_id, Some("session-1")).await.unwrap();
4979        assert_eq!(sess1_count, 1);
4980
4981        // Filter by non-existent session_key.
4982        let none = repo
4983            .list_digests(ns_id, Some("session-none"), 50, 0)
4984            .await
4985            .unwrap();
4986        assert!(none.is_empty());
4987
4988        let none_count = repo
4989            .count_digests(ns_id, Some("session-none"))
4990            .await
4991            .unwrap();
4992        assert_eq!(none_count, 0);
4993    }
4994
4995    // ---- Phase 2 Task 2: Explicit JSON decode error tests ----
4996
4997    /// Malformed labels JSON must produce an explicit Storage error,
4998    /// not silently default to an empty Vec.
4999    #[tokio::test]
5000    async fn test_row_to_memory_rejects_malformed_labels() {
5001        let pool = setup_test_db().await;
5002        let ns_id = create_namespace(&pool, "test-agent").await;
5003        let repo = MemoryRepository::new(pool);
5004
5005        // Insert a memory with valid data first.
5006        let memory = repo
5007            .store(StoreMemoryParams {
5008                namespace_id: ns_id,
5009                content: "corruption test labels",
5010                category: &Category::General,
5011                memory_lane_type: None,
5012                labels: &["valid-label".to_string()],
5013                metadata: &serde_json::Value::Null,
5014                embedding: None,
5015                embedding_model: None,
5016            })
5017            .await
5018            .unwrap();
5019
5020        // Corrupt the labels in-place to invalid JSON.
5021        sqlx::query("UPDATE memories SET labels = 'NOT VALID JSON{{{' WHERE id = ?")
5022            .bind(memory.id)
5023            .execute(repo.pool())
5024            .await
5025            .unwrap();
5026
5027        let err = repo.get_by_id(memory.id).await.unwrap_err();
5028        let msg = err.to_string();
5029        assert!(
5030            msg.contains("corrupted labels JSON"),
5031            "expected labels corruption error, got: {msg}"
5032        );
5033        assert!(msg.contains(&memory.id.to_string()));
5034    }
5035
5036    /// Malformed metadata JSON must produce an explicit Storage error,
5037    /// not silently default to Value::Null.
5038    #[tokio::test]
5039    async fn test_row_to_memory_rejects_malformed_metadata() {
5040        let pool = setup_test_db().await;
5041        let repo = MemoryRepository::new(pool);
5042
5043        // Construct a MemoryRow with corrupted metadata directly,
5044        // bypassing SQLite expression-index validation on UPDATE.
5045        let row = MemoryRow {
5046            id: 999,
5047            namespace_id: 1,
5048            content: "test".to_string(),
5049            category: "general".to_string(),
5050            memory_lane_type: None,
5051            labels: "[]".to_string(),
5052            metadata: "[truncated".to_string(), // invalid JSON
5053            similarity_score: None,
5054            relevance_score: None,
5055            content_embedding: None,
5056            embedding_model: None,
5057            created_at: Utc::now(),
5058            updated_at: None,
5059            last_accessed: None,
5060            is_active: true,
5061            is_archived: false,
5062            access_count: 0,
5063        };
5064
5065        let err = repo.row_to_memory(row).unwrap_err();
5066        let msg = err.to_string();
5067        assert!(
5068            msg.contains("corrupted metadata JSON"),
5069            "expected metadata corruption error, got: {msg}"
5070        );
5071    }
5072
5073    /// Malformed embedding JSON must produce an explicit Storage error,
5074    /// not silently default to None.
5075    #[tokio::test]
5076    async fn test_row_to_memory_rejects_malformed_embedding() {
5077        let pool = setup_test_db().await;
5078        let ns_id = create_namespace(&pool, "test-agent").await;
5079        let repo = MemoryRepository::new(pool);
5080
5081        let memory = repo
5082            .store(StoreMemoryParams {
5083                namespace_id: ns_id,
5084                content: "corruption test embedding",
5085                category: &Category::General,
5086                memory_lane_type: None,
5087                labels: &[],
5088                metadata: &serde_json::Value::Null,
5089                embedding: Some(&[0.1, 0.2, 0.3]),
5090                embedding_model: Some("test-model"),
5091            })
5092            .await
5093            .unwrap();
5094
5095        sqlx::query("UPDATE memories SET content_embedding = 'not-an-array' WHERE id = ?")
5096            .bind(memory.id)
5097            .execute(repo.pool())
5098            .await
5099            .unwrap();
5100
5101        let err = repo.get_by_id(memory.id).await.unwrap_err();
5102        let msg = err.to_string();
5103        assert!(
5104            msg.contains("corrupted embedding JSON"),
5105            "expected embedding corruption error, got: {msg}"
5106        );
5107    }
5108
5109    /// Malformed job payload JSON is permanently failed and the claim succeeds
5110    /// with an empty result, rather than poisoning the entire batch.
5111    #[tokio::test]
5112    async fn test_claim_jobs_rejects_malformed_payload() {
5113        let pool = setup_test_db().await;
5114        let ns_id = create_namespace(&pool, "test-agent").await;
5115        let repo = MemoryRepository::new(pool);
5116
5117        // Insert a job directly with corrupted payload JSON.
5118        sqlx::query(
5119            r#"
5120            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5121            VALUES (?, 'derive_memory', 'pending', 100, '{INVALID_JSON}', datetime('now'), datetime('now'))
5122            "#,
5123        )
5124        .bind(ns_id)
5125        .execute(repo.pool())
5126        .await
5127        .unwrap();
5128
5129        // claim_jobs should succeed with empty result — the bad job is permanently failed.
5130        let claimed = repo
5131            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5132            .await
5133            .unwrap();
5134        assert!(
5135            claimed.is_empty(),
5136            "corrupt payload job should not be returned"
5137        );
5138
5139        // Verify the job was permanently failed.
5140        let status: String =
5141            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5142                .bind(ns_id)
5143                .fetch_one(repo.pool())
5144                .await
5145                .unwrap();
5146        assert_eq!(status, "failed", "corrupt job should be permanently failed");
5147
5148        let last_error: Option<String> =
5149            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5150                .bind(ns_id)
5151                .fetch_one(repo.pool())
5152                .await
5153                .unwrap();
5154        assert!(
5155            last_error
5156                .unwrap_or_default()
5157                .contains("corrupted payload JSON"),
5158            "last_error should mention payload corruption"
5159        );
5160    }
5161
5162    /// Malformed perspective JSON in claim_jobs is permanently failed and the
5163    /// claim succeeds with an empty result, rather than poisoning the batch.
5164    #[tokio::test]
5165    async fn test_claim_jobs_rejects_malformed_perspective() {
5166        let pool = setup_test_db().await;
5167        let ns_id = create_namespace(&pool, "test-agent").await;
5168        let repo = MemoryRepository::new(pool);
5169
5170        // Insert a job with valid payload but corrupted perspective JSON.
5171        sqlx::query(
5172            r#"
5173            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
5174            VALUES (?, 'derive_memory', 'pending', 100, '{BOGUS}', '{"ok": true}', datetime('now'), datetime('now'))
5175            "#,
5176        )
5177        .bind(ns_id)
5178        .execute(repo.pool())
5179        .await
5180        .unwrap();
5181
5182        // claim_jobs should succeed with empty result — the bad job is permanently failed.
5183        let claimed = repo
5184            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5185            .await
5186            .unwrap();
5187        assert!(
5188            claimed.is_empty(),
5189            "corrupt perspective job should not be returned"
5190        );
5191
5192        // Verify the job was permanently failed.
5193        let status: String =
5194            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5195                .bind(ns_id)
5196                .fetch_one(repo.pool())
5197                .await
5198                .unwrap();
5199        assert_eq!(status, "failed", "corrupt job should be permanently failed");
5200
5201        let last_error: Option<String> =
5202            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5203                .bind(ns_id)
5204                .fetch_one(repo.pool())
5205                .await
5206                .unwrap();
5207        assert!(
5208            last_error
5209                .unwrap_or_default()
5210                .contains("corrupted perspective JSON"),
5211            "last_error should mention perspective corruption"
5212        );
5213    }
5214
5215    /// When a batch contains one corrupt job among valid ones, the valid jobs
5216    /// are returned and only the corrupt job is permanently failed.
5217    #[tokio::test]
5218    async fn test_claim_jobs_skips_corrupt_returns_valid() {
5219        let pool = setup_test_db().await;
5220        let ns_id = create_namespace(&pool, "test-agent").await;
5221        let repo = MemoryRepository::new(pool);
5222
5223        // Enqueue 2 valid jobs.
5224        let p1 = serde_json::json!({"memory_id": 1});
5225        repo.enqueue_job(EnqueueJobParams {
5226            namespace_id: ns_id,
5227            job_type: "derive_memory",
5228            priority: 100,
5229            perspective: None,
5230            payload: &p1,
5231        })
5232        .await
5233        .unwrap();
5234        let p2 = serde_json::json!({"memory_id": 2});
5235        repo.enqueue_job(EnqueueJobParams {
5236            namespace_id: ns_id,
5237            job_type: "derive_memory",
5238            priority: 50,
5239            perspective: None,
5240            payload: &p2,
5241        })
5242        .await
5243        .unwrap();
5244
5245        // Insert 1 corrupt job at the highest priority.
5246        sqlx::query(
5247            r#"
5248            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5249            VALUES (?, 'derive_memory', 'pending', 200, '{BROKEN}', datetime('now'), datetime('now'))
5250            "#,
5251        )
5252        .bind(ns_id)
5253        .execute(repo.pool())
5254        .await
5255        .unwrap();
5256
5257        // Claim all 3 — should get only the 2 valid jobs.
5258        let claimed = repo
5259            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 10)
5260            .await
5261            .unwrap();
5262        assert_eq!(
5263            claimed.len(),
5264            2,
5265            "should return 2 valid jobs, skipping the corrupt one"
5266        );
5267
5268        // The corrupt job should be permanently failed.
5269        let failed_count: i64 = sqlx::query_scalar(
5270            "SELECT COUNT(*) FROM memory_jobs WHERE namespace_id = ? AND status = 'failed'",
5271        )
5272        .bind(ns_id)
5273        .fetch_one(repo.pool())
5274        .await
5275        .unwrap();
5276        assert_eq!(failed_count, 1, "corrupt job should be permanently failed");
5277    }
5278
5279    // ---- Lifecycle management: purge tests ----
5280
5281    /// Purge should remove old completed jobs but keep recently completed ones.
5282    #[tokio::test]
5283    async fn test_purge_completed_jobs_removes_old_keeps_recent() {
5284        let pool = setup_test_db().await;
5285        let ns_id = create_namespace(&pool, "purge-test").await;
5286        let repo = MemoryRepository::new(pool);
5287
5288        // Enqueue and complete a job, then backdate its updated_at.
5289        repo.enqueue_job(EnqueueJobParams {
5290            namespace_id: ns_id,
5291            job_type: "derive_memory",
5292            priority: 10,
5293            perspective: None,
5294            payload: &serde_json::json!({"old": true}),
5295        })
5296        .await
5297        .unwrap();
5298
5299        let claimed = repo
5300            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5301            .await
5302            .unwrap();
5303        assert_eq!(claimed.len(), 1);
5304        let old_job_id = claimed[0].row.id;
5305
5306        repo.complete_job(&claimed[0]).await.unwrap();
5307
5308        // Backdate the completed job to 30 days ago.
5309        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5310            .bind(old_job_id)
5311            .execute(repo.pool())
5312            .await
5313            .unwrap();
5314
5315        // Enqueue and complete a second job that stays recent.
5316        repo.enqueue_job(EnqueueJobParams {
5317            namespace_id: ns_id,
5318            job_type: "derive_memory",
5319            priority: 10,
5320            perspective: None,
5321            payload: &serde_json::json!({"new": true}),
5322        })
5323        .await
5324        .unwrap();
5325
5326        let claimed2 = repo
5327            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5328            .await
5329            .unwrap();
5330        assert_eq!(claimed2.len(), 1);
5331        repo.complete_job(&claimed2[0]).await.unwrap();
5332
5333        // Purge with a cutoff of 7 days ago.
5334        let cutoff = Utc::now() - chrono::Duration::days(7);
5335        let deleted = repo.purge_completed_jobs(cutoff).await.unwrap();
5336        assert_eq!(deleted, 1);
5337
5338        // Verify: old job is gone, recent job remains.
5339        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5340        assert_eq!(remaining.len(), 1);
5341        assert_eq!(remaining[0].id, claimed2[0].row.id);
5342    }
5343
5344    /// Purge permanently failed jobs should only remove those with attempts >= 5.
5345    #[tokio::test]
5346    async fn test_purge_permanently_failed_jobs_removes_old_keeps_recent() {
5347        let pool = setup_test_db().await;
5348        let ns_id = create_namespace(&pool, "purge-failed").await;
5349        let repo = MemoryRepository::new(pool);
5350
5351        // Enqueue a job and fail it 5 times to make it permanently failed.
5352        repo.enqueue_job(EnqueueJobParams {
5353            namespace_id: ns_id,
5354            job_type: "derive_memory",
5355            priority: 10,
5356            perspective: None,
5357            payload: &serde_json::json!({"fail_me": true}),
5358        })
5359        .await
5360        .unwrap();
5361
5362        for _ in 0..5 {
5363            let claimed = repo
5364                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5365                .await
5366                .unwrap();
5367            assert_eq!(claimed.len(), 1);
5368            repo.fail_job(&claimed[0], "persistent error")
5369                .await
5370                .unwrap();
5371        }
5372
5373        // Backdate to 30 days ago.
5374        sqlx::query(
5375            "UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE status = ?",
5376        )
5377        .bind(memory_job_status::FAILED)
5378        .execute(repo.pool())
5379        .await
5380        .unwrap();
5381
5382        // Enqueue a second job and fail it only 2 times (still re-queueable).
5383        repo.enqueue_job(EnqueueJobParams {
5384            namespace_id: ns_id,
5385            job_type: "derive_memory",
5386            priority: 10,
5387            perspective: None,
5388            payload: &serde_json::json!({"retry_me": true}),
5389        })
5390        .await
5391        .unwrap();
5392
5393        for _ in 0..2 {
5394            let claimed = repo
5395                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5396                .await
5397                .unwrap();
5398            assert_eq!(claimed.len(), 1);
5399            repo.fail_job(&claimed[0], "transient error").await.unwrap();
5400        }
5401
5402        // Purge with a cutoff of 7 days ago.
5403        let cutoff = Utc::now() - chrono::Duration::days(7);
5404        let deleted = repo.purge_permanently_failed_jobs(cutoff).await.unwrap();
5405        assert_eq!(deleted, 1);
5406
5407        // The permanently failed job is gone; the re-queueable job remains as pending.
5408        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5409        assert_eq!(remaining.len(), 1);
5410        assert_eq!(remaining[0].status, memory_job_status::PENDING);
5411    }
5412
5413    /// Active leasing should still work after purging old completed/failed jobs.
5414    #[tokio::test]
5415    async fn test_active_leasing_works_after_purge() {
5416        let pool = setup_test_db().await;
5417        let ns_id = create_namespace(&pool, "purge-lease").await;
5418        let repo = MemoryRepository::new(pool);
5419
5420        // Create and complete an old job.
5421        repo.enqueue_job(EnqueueJobParams {
5422            namespace_id: ns_id,
5423            job_type: "derive_memory",
5424            priority: 10,
5425            perspective: None,
5426            payload: &serde_json::json!({"old": true}),
5427        })
5428        .await
5429        .unwrap();
5430
5431        let claimed = repo
5432            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5433            .await
5434            .unwrap();
5435        repo.complete_job(&claimed[0]).await.unwrap();
5436
5437        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5438            .bind(claimed[0].row.id)
5439            .execute(repo.pool())
5440            .await
5441            .unwrap();
5442
5443        // Purge old completed jobs.
5444        let cutoff = Utc::now() - chrono::Duration::days(7);
5445        repo.purge_completed_jobs(cutoff).await.unwrap();
5446
5447        // Enqueue a fresh job and verify the full claim/complete/fail cycle still works.
5448        repo.enqueue_job(EnqueueJobParams {
5449            namespace_id: ns_id,
5450            job_type: "derive_memory",
5451            priority: 20,
5452            perspective: None,
5453            payload: &serde_json::json!({"fresh": true}),
5454        })
5455        .await
5456        .unwrap();
5457
5458        let fresh_claimed = repo
5459            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 10)
5460            .await
5461            .unwrap();
5462        assert_eq!(fresh_claimed.len(), 1);
5463        assert_eq!(fresh_claimed[0].row.status, "running");
5464        assert_eq!(fresh_claimed[0].payload["fresh"], true);
5465
5466        // Complete it.
5467        repo.complete_job(&fresh_claimed[0]).await.unwrap();
5468
5469        // Verify no more claimable jobs.
5470        let empty = repo
5471            .claim_jobs(ns_id, "derive_memory", "worker-3", 60, 10)
5472            .await
5473            .unwrap();
5474        assert!(empty.is_empty());
5475    }
5476}