Skip to main content

nexus_memory_storage/
repository.rs

1//! Repository implementations for database operations
2
3use std::collections::{HashMap, HashSet};
4
5use crate::models::{
6    memory_job_status, AgentNamespaceRow, ClaimedMemoryJob, EnqueueJobParams, MemoryEvidenceRow,
7    MemoryJobRow, MemoryLineageEntry, MemoryRow, ProcessedFileRow, SessionDigestRow,
8    SystemMetricRow,
9};
10use crate::{db_error, Result};
11use chrono::{DateTime, Utc};
12use nexus_core::{
13    AgentNamespace, CognitiveLevel, Memory, MemoryCategory, MemoryLaneType, PerspectiveKey,
14};
15use sqlx::SqlitePool;
16
17/// Type alias for backward compatibility
18type Category = MemoryCategory;
19
20/// Parameters for storing a new memory
21pub struct StoreMemoryParams<'a> {
22    pub namespace_id: i64,
23    pub content: &'a str,
24    pub category: &'a Category,
25    pub memory_lane_type: Option<&'a MemoryLaneType>,
26    pub labels: &'a [String],
27    pub metadata: &'a serde_json::Value,
28    pub embedding: Option<&'a [f32]>,
29    pub embedding_model: Option<&'a str>,
30}
31
32/// Parameters for storing a memory with evidence lineage.
33pub struct StoreMemoryWithLineageParams<'a> {
34    pub store: StoreMemoryParams<'a>,
35    pub source_memory_ids: &'a [i64],
36    pub evidence_role: &'a str,
37}
38
/// Input for [`MemoryRepository::store_digest`]: registers (or refreshes) the
/// memory that holds a digest of part of a session.
pub struct StoreDigestParams<'a> {
    /// Namespace the digest belongs to.
    pub namespace_id: i64,
    /// Session the digest summarizes.
    pub session_key: &'a str,
    /// Digest flavour; part of the uniqueness key together with the namespace,
    /// session and `end_memory_id`.
    pub digest_kind: &'a str,
    /// Memory row containing the digest text.
    pub memory_id: i64,
    /// Presumably the first memory covered by the digest window — confirm with callers.
    pub start_memory_id: Option<i64>,
    /// Last memory covered by the digest window; later content is measured
    /// relative to the highest value of this field.
    pub end_memory_id: Option<i64>,
    /// Token count of the digest (stored as a signed 64-bit integer).
    pub token_count: usize,
}
49
/// Result of [`MemoryRepository::session_digest_rollover`]: how much new
/// session content has accumulated past the most recent digest window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionDigestRollover {
    /// Highest `end_memory_id` over all digests registered for the session,
    /// or `None` when the session has no digest yet.
    pub last_digest_end_memory_id: Option<i64>,
    /// Number of qualifying memories with an ID above that boundary.
    pub new_memory_count: i64,
    /// Rough token estimate for those memories (about four characters per token).
    pub estimated_new_tokens: i64,
}
55
56pub struct ListMemoryFilters<'a> {
57    pub category: Option<&'a str>,
58    pub since: Option<DateTime<Utc>>,
59    pub until: Option<DateTime<Utc>>,
60    pub content_like: Option<&'a str>,
61    pub include_raw: bool,
62    pub limit: i64,
63    pub offset: i64,
64}
65
66/// Parameters for [`MemoryRepository::search_working_set`].
67pub struct WorkingSetParams<'a> {
68    /// Namespace to search within.
69    pub namespace_id: i64,
70    /// Optional perspective lens. When `None`, queries fall back to
71    /// namespace-only retrieval without observer/subject filtering.
72    pub perspective: Option<&'a PerspectiveKey>,
73    /// Maximum number of memories to return after deduplication.
74    pub max_items: usize,
75    /// When `false` (default), raw-activity hook noise is excluded from
76    /// every bucket.
77    pub include_raw: bool,
78}
79
80/// Parameters for embedding-backed semantic candidate retrieval.
81pub struct SemanticCandidateParams<'a> {
82    /// Namespace to search within.
83    pub namespace_id: i64,
84    /// Optional perspective lens for observer/subject/session scoping.
85    pub perspective: Option<&'a PerspectiveKey>,
86    /// Maximum number of candidate memories to return before vector ranking.
87    pub limit: i64,
88    /// When `false`, raw-activity hook noise is excluded.
89    pub include_raw: bool,
90}
91
92/// A persisted system metric sample.
93#[derive(Debug, Clone)]
94pub struct MetricSample {
95    pub metric_name: String,
96    pub metric_value: f64,
97    pub labels: serde_json::Value,
98}
99
/// Retry budget for memory jobs: once a job's `attempts` counter reaches this
/// value, [`MemoryRepository::fail_job`] marks it permanently failed instead of
/// requeueing it.
const MAX_JOB_ATTEMPTS: i64 = 5;

/// WHERE-clause fragment that drops raw-activity hook noise: rows whose labels
/// mention `raw-activity` or whose metadata carries a `raw_activity` key.
const RAW_ACTIVITY_FILTER_SQL: &str = concat!(
    "labels NOT LIKE '%raw-activity%' AND ",
    "json_extract(COALESCE(metadata, '{}'), '$.raw_activity') IS NULL",
);
104
// ---------------------------------------------------------------------------
// Shared cognitive-metadata query fragments
// ---------------------------------------------------------------------------
// Every query that reads cognitive fields out of the `metadata` JSON column
// (observer, subject, session_key, session_keys, level, times_reinforced,
// times_contradicted) is meant to be assembled from the fragments below, so a
// change in how metadata is extracted happens in exactly one place instead of
// drifting across hand-written SQL.
// ---------------------------------------------------------------------------

/// NULL-safe view of the `metadata` JSON column: the column itself, or an
/// empty JSON object when it is NULL.
///
/// Fragments in this module spell it as the literal placeholder `METADATA`,
/// which callers expand with `.replace("METADATA", METADATA)`. It can also be
/// interpolated directly into a `format!` template, e.g.
/// `json_extract({METADATA}, '$.cognitive.level')`.
const METADATA: &str = "COALESCE(metadata, '{}')";
121
/// WHERE-clause fragment matching memories tied to one session key, either via
/// the scalar `cognitive.session_key` field or membership in the
/// `cognitive.session_keys` array.
///
/// Both metadata reads use the `METADATA` placeholder (expand it with
/// `.replace("METADATA", METADATA)`), so this fragment follows the shared
/// extraction rule like every other one. It expects TWO bound parameters, both
/// set to the same session key. The whole fragment is parenthesized so it can
/// be ANDed into a larger clause despite its internal `OR`.
const SESSION_KEY_FILTER: &str =
    "(json_extract(METADATA, '$.cognitive.session_key') = ? \
     OR EXISTS (SELECT 1 FROM json_each(COALESCE(json_extract(METADATA, '$.cognitive.session_keys'), '[]')) WHERE value = ?))";
129
/// WHERE-clause fragment restricting rows to one perspective's observer and
/// subject (bound in that order), written with the `METADATA` placeholder.
///
/// Session scoping is intentionally left out; AND it with
/// [`SESSION_KEY_FILTER`] when the perspective carries a session key.
const PERSPECTIVE_IDENTITY_FILTER: &str = concat!(
    "json_extract(METADATA, '$.cognitive.observer') = ? ",
    "AND json_extract(METADATA, '$.cognitive.subject') = ?",
);
135
/// SQL expression yielding a row's raw `cognitive.level` string, with no
/// comparison attached. Uses the `METADATA` placeholder, so expand it with
/// `.replace("METADATA", METADATA)` before embedding it in a query.
const COGNITIVE_LEVEL_EXPR: &str = "json_extract(METADATA, '$.cognitive.level')";
138
139/// Assembles the full perspective WHERE clause (observer, subject, and optional
140/// session key) for use in SQL templates.
141///
142/// Returns `(sql_fragment, bind_count)`. The caller must bind:
143/// - observer, subject (always)
144/// - session_key × 2 (if present)
145fn perspective_where_clause(perspective: &PerspectiveKey) -> String {
146    if perspective.session_key.is_some() {
147        format!(
148            "{} AND {}",
149            PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA),
150            SESSION_KEY_FILTER.replace("METADATA", METADATA),
151        )
152    } else {
153        PERSPECTIVE_IDENTITY_FILTER.replace("METADATA", METADATA)
154    }
155}
156
157/// Bind values for a perspective WHERE clause.
158/// Order: observer, subject, [session_key, session_key]
159fn bind_perspective(perspective: &PerspectiveKey) -> Vec<&str> {
160    let mut vals = vec![perspective.observer.as_str(), perspective.subject.as_str()];
161    if let Some(ref sk) = perspective.session_key {
162        vals.push(sk.as_str());
163        vals.push(sk.as_str());
164    }
165    vals
166}
167
168/// Repository for memory operations
169pub struct MemoryRepository {
170    pool: SqlitePool,
171}
172
173impl MemoryRepository {
174    pub fn new(pool: SqlitePool) -> Self {
175        Self { pool }
176    }
177
178    pub fn pool(&self) -> &SqlitePool {
179        &self.pool
180    }
181
182    /// Store a new memory
183    pub async fn store(&self, params: StoreMemoryParams<'_>) -> Result<Memory> {
184        let labels_json = serde_json::to_string(params.labels)?;
185        let metadata_json = serde_json::to_string(params.metadata)?;
186        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
187
188        let result = sqlx::query(
189            r#"
190            INSERT INTO memories (
191                namespace_id, content, category, memory_lane_type, labels, metadata,
192                content_embedding, embedding_model, created_at, is_active, access_count
193            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
194            "#,
195        )
196        .bind(params.namespace_id)
197        .bind(params.content)
198        .bind(params.category.to_string())
199        .bind(params.memory_lane_type.map(|t| t.to_string()))
200        .bind(&labels_json)
201        .bind(&metadata_json)
202        .bind(&embedding_json)
203        .bind(params.embedding_model)
204        .bind(Utc::now())
205        .execute(&self.pool)
206        .await
207        .map_err(db_error)?;
208
209        let id = result.last_insert_rowid();
210
211        // If last_insert_rowid() is 0, the BEFORE INSERT trigger
212        // (trg_memories_same_namespace_merge) detected a duplicate and
213        // raised IGNORE. The existing row was touched (access_count
214        // incremented) — find it by content match.
215        if id == 0 {
216            // Try to find existing row by content match (case-insensitive, trimmed)
217            let row: Option<MemoryRow> = sqlx::query_as(
218                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
219            )
220            .bind(params.namespace_id)
221            .bind(params.content)
222            .fetch_optional(&self.pool)
223            .await
224            .map_err(db_error)?;
225
226            if let Some(existing) = row {
227                self.merge_duplicate_memory_context(existing.id, params)
228                    .await?;
229                return self.get_by_id(existing.id).await?.ok_or_else(|| {
230                    nexus_core::NexusError::Storage(format!(
231                        "Duplicate merged row {} could not be reloaded",
232                        existing.id
233                    ))
234                });
235            }
236
237            // If no duplicate found, this is unexpected - log warning but don't fail
238            // This can happen if trigger fires but content doesn't match exactly
239            tracing::warn!(
240                namespace_id = params.namespace_id,
241                content_length = params.content.len(),
242                "Insert returned id 0 but no matching duplicate found - treating as successful insert"
243            );
244
245            // Return success anyway - the insert did happen, we just can't track the id
246            // This prevents hook failures due to this edge case
247            return self
248                .get_by_content(params.namespace_id, params.content)
249                .await;
250        }
251
252        self.get_by_id(id).await?.ok_or_else(|| {
253            nexus_core::NexusError::Storage(format!("Failed to retrieve memory with id {}", id))
254        })
255    }
256
257    async fn merge_duplicate_memory_context(
258        &self,
259        existing_id: i64,
260        params: StoreMemoryParams<'_>,
261    ) -> Result<()> {
262        let current = self.get_by_id(existing_id).await?.ok_or_else(|| {
263            nexus_core::NexusError::Storage(format!(
264                "Failed to load duplicate-merged memory {}",
265                existing_id
266            ))
267        })?;
268
269        let merged_labels = merge_labels(&current.labels, params.labels);
270        let merged_metadata = merge_duplicate_metadata(&current.metadata, params.metadata);
271        let labels_json = serde_json::to_string(&merged_labels)?;
272        let metadata_json = serde_json::to_string(&merged_metadata)?;
273
274        sqlx::query(
275            r#"
276            UPDATE memories
277            SET labels = ?, metadata = ?, updated_at = ?
278            WHERE id = ?
279            "#,
280        )
281        .bind(&labels_json)
282        .bind(&metadata_json)
283        .bind(Utc::now())
284        .bind(existing_id)
285        .execute(&self.pool)
286        .await
287        .map_err(db_error)?;
288
289        Ok(())
290    }
291
292    /// Store a memory and write evidence lineage rows atomically.
293    pub async fn store_with_lineage(
294        &self,
295        params: StoreMemoryWithLineageParams<'_>,
296    ) -> Result<Memory> {
297        let mut tx = self.pool.begin().await.map_err(db_error)?;
298        let memory_id = insert_memory_tx(&mut tx, &params.store).await?;
299        for &source_id in params.source_memory_ids {
300            insert_evidence_tx(&mut tx, memory_id, source_id, params.evidence_role).await?;
301        }
302        tx.commit().await.map_err(db_error)?;
303
304        self.get_by_id(memory_id).await?.ok_or_else(|| {
305            nexus_core::NexusError::Storage(format!(
306                "Failed to retrieve memory with id {} after lineage store",
307                memory_id
308            ))
309        })
310    }
311
312    /// Enqueue a new cognitive job.
313    pub async fn enqueue_job(&self, params: EnqueueJobParams<'_>) -> Result<i64> {
314        let perspective_json = params.perspective.map(serde_json::to_string).transpose()?;
315        let payload_json = serde_json::to_string(params.payload)?;
316
317        let id: i64 = sqlx::query_scalar(
318            r#"
319            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
320            VALUES (?, ?, 'pending', ?, ?, ?, datetime('now'), datetime('now'))
321            RETURNING id
322            "#,
323        )
324        .bind(params.namespace_id)
325        .bind(params.job_type)
326        .bind(params.priority)
327        .bind(&perspective_json)
328        .bind(&payload_json)
329        .fetch_one(&self.pool)
330        .await
331        .map_err(db_error)?;
332
333        Ok(id)
334    }
335
336    /// Claim up to `limit` pending (or stale running) jobs for a given type and namespace.
337    ///
338    /// Stale running jobs are those whose lease has expired (`lease_expires_at < now`).
339    /// Claimed jobs are transitioned to `running` with a lease owner and TTL.
340    pub async fn claim_jobs(
341        &self,
342        namespace_id: i64,
343        job_type: &str,
344        lease_owner: &str,
345        lease_ttl_secs: u64,
346        limit: i64,
347    ) -> Result<Vec<ClaimedMemoryJob>> {
348        let claim_token = new_claim_token(lease_owner);
349        // SQLite's UPDATE ... RETURNING does not preserve CTE ordering.
350        // We must sort in Rust to guarantee priority-first delivery.
351        // This is O(n log n) but n is bounded by the `limit` parameter (typically small).
352        let rows: Vec<MemoryJobRow> = sqlx::query_as::<_, MemoryJobRow>(
353            r#"
354            WITH candidates AS (
355                SELECT id
356                FROM memory_jobs
357                WHERE namespace_id = ? AND job_type = ? AND (
358                    status = ?
359                    OR (status = ? AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))
360                )
361                ORDER BY priority DESC, created_at ASC
362                LIMIT ?
363            )
364            UPDATE memory_jobs
365            SET status = ?,
366                lease_owner = ?,
367                claim_token = ?,
368                lease_expires_at = datetime('now', '+' || ? || ' seconds'),
369                attempts = attempts + 1,
370                updated_at = datetime('now')
371            WHERE id IN (SELECT id FROM candidates)
372            RETURNING *
373            "#,
374        )
375        .bind(namespace_id)
376        .bind(job_type)
377        .bind(memory_job_status::PENDING)
378        .bind(memory_job_status::RUNNING)
379        .bind(limit)
380        .bind(memory_job_status::RUNNING)
381        .bind(lease_owner)
382        .bind(&claim_token)
383        .bind(lease_ttl_secs as i64)
384        .fetch_all(&self.pool)
385        .await
386        .map_err(db_error)?;
387
388        let mut rows = rows;
389        rows.sort_by(|left, right| {
390            right
391                .priority
392                .cmp(&left.priority)
393                .then_with(|| left.created_at.cmp(&right.created_at))
394        });
395
396        let mut claimed = Vec::with_capacity(rows.len());
397        for row in rows {
398            let perspective = match row.perspective_json.as_deref() {
399                Some(s) => match serde_json::from_str(s) {
400                    Ok(p) => Some(p),
401                    Err(e) => {
402                        tracing::warn!(
403                            job_id = row.id,
404                            error = %e,
405                            "corrupted perspective JSON, permanently failing job"
406                        );
407                        let _ = self
408                            .permanently_fail_job(
409                                row.id,
410                                &row.lease_owner,
411                                &row.claim_token,
412                                &format!("corrupted perspective JSON: {e}"),
413                            )
414                            .await;
415                        continue;
416                    }
417                },
418                None => None,
419            };
420            let payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
421                Ok(p) => p,
422                Err(e) => {
423                    tracing::warn!(
424                        job_id = row.id,
425                        error = %e,
426                        "corrupted payload JSON, permanently failing job"
427                    );
428                    let _ = self
429                        .permanently_fail_job(
430                            row.id,
431                            &row.lease_owner,
432                            &row.claim_token,
433                            &format!("corrupted payload JSON: {e}"),
434                        )
435                        .await;
436                    continue;
437                }
438            };
439            claimed.push(ClaimedMemoryJob {
440                row,
441                perspective,
442                payload,
443            });
444        }
445
446        Ok(claimed)
447    }
448
449    /// Mark a job as completed.
450    pub async fn complete_job(&self, job: &ClaimedMemoryJob) -> Result<()> {
451        let result = sqlx::query(
452            r#"
453            UPDATE memory_jobs
454            SET status = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
455            WHERE id = ?
456              AND lease_owner = ?
457              AND claim_token = ?
458            "#,
459        )
460        .bind(memory_job_status::COMPLETED)
461        .bind(job.row.id)
462        .bind(job.row.lease_owner.as_deref())
463        .bind(job.row.claim_token.as_deref())
464        .execute(&self.pool)
465        .await
466        .map_err(db_error)?;
467
468        if result.rows_affected() == 0 {
469            return Err(nexus_core::NexusError::Storage(format!(
470                "Memory job {} completion lost lease ownership",
471                job.row.id
472            )));
473        }
474
475        Ok(())
476    }
477
478    /// Mark a job as failed. If attempts < MAX_JOB_ATTEMPTS, requeue it as pending.
479    /// Otherwise, mark it permanently failed.
480    pub async fn fail_job(&self, job: &ClaimedMemoryJob, error: &str) -> Result<()> {
481        let row: Option<MemoryJobRow> = sqlx::query_as("SELECT * FROM memory_jobs WHERE id = ?")
482            .bind(job.row.id)
483            .fetch_optional(&self.pool)
484            .await
485            .map_err(db_error)?;
486
487        let row = row.ok_or_else(|| {
488            nexus_core::NexusError::Storage(format!("Memory job {} not found", job.row.id))
489        })?;
490
491        let lease_matches =
492            row.lease_owner == job.row.lease_owner && row.claim_token == job.row.claim_token;
493        if !lease_matches {
494            return Err(nexus_core::NexusError::Storage(format!(
495                "Memory job {} failure lost lease ownership",
496                job.row.id
497            )));
498        }
499
500        if row.attempts >= MAX_JOB_ATTEMPTS {
501            // Permanently failed.
502            let result = sqlx::query(
503                r#"
504                UPDATE memory_jobs
505                SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
506                WHERE id = ? AND lease_owner = ? AND claim_token = ?
507                "#,
508            )
509            .bind(memory_job_status::FAILED)
510            .bind(error)
511            .bind(job.row.id)
512            .bind(job.row.lease_owner.as_deref())
513            .bind(job.row.claim_token.as_deref())
514            .execute(&self.pool)
515            .await
516            .map_err(db_error)?;
517            if result.rows_affected() == 0 {
518                return Err(nexus_core::NexusError::Storage(format!(
519                    "Memory job {} failure lost lease ownership",
520                    job.row.id
521                )));
522            }
523        } else {
524            // Requeue for retry.
525            let result = sqlx::query(
526                r#"
527                UPDATE memory_jobs
528                SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL, lease_expires_at = NULL, updated_at = datetime('now')
529                WHERE id = ? AND lease_owner = ? AND claim_token = ?
530                "#,
531            )
532            .bind(memory_job_status::PENDING)
533            .bind(error)
534            .bind(job.row.id)
535            .bind(job.row.lease_owner.as_deref())
536            .bind(job.row.claim_token.as_deref())
537            .execute(&self.pool)
538            .await
539            .map_err(db_error)?;
540            if result.rows_affected() == 0 {
541                return Err(nexus_core::NexusError::Storage(format!(
542                    "Memory job {} failure lost lease ownership",
543                    job.row.id
544                )));
545            }
546        }
547
548        Ok(())
549    }
550
551    /// Permanently fail a job by ID without requiring a ClaimedMemoryJob.
552    /// Used by claim_jobs to handle corrupted jobs without
553    /// poisoning the entire batch.
554    async fn permanently_fail_job(
555        &self,
556        job_id: i64,
557        lease_owner: &Option<String>,
558        claim_token: &Option<String>,
559        error: &str,
560    ) -> Result<()> {
561        sqlx::query(
562            r#"
563            UPDATE memory_jobs
564            SET status = ?, last_error = ?, lease_owner = NULL, claim_token = NULL,
565                lease_expires_at = NULL, updated_at = datetime('now')
566            WHERE id = ? AND lease_owner = ? AND claim_token = ?
567            "#,
568        )
569        .bind(memory_job_status::FAILED)
570        .bind(error)
571        .bind(job_id)
572        .bind(lease_owner.as_deref())
573        .bind(claim_token.as_deref())
574        .execute(&self.pool)
575        .await
576        .map_err(db_error)?;
577        Ok(())
578    }
579
580    pub async fn get_most_reinforced_by_namespace(
581        &self,
582        namespace_id: i64,
583        limit: i64,
584        include_raw: bool,
585    ) -> Result<Vec<Memory>> {
586        let noise_sql = if include_raw {
587            String::new()
588        } else {
589            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
590        };
591        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
592            r#"
593            SELECT * FROM memories
594            WHERE namespace_id = ?
595              AND is_active = 1
596              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
597              {noise_sql}
598            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_reinforced'), 0) DESC,
599                     created_at DESC
600            LIMIT ?
601            "#
602        ))
603        .bind(namespace_id)
604        .bind(CognitiveLevel::Derived.as_str())
605        .bind(limit)
606        .fetch_all(&self.pool)
607        .await
608        .map_err(db_error)?;
609
610        rows.into_iter()
611            .map(|row| self.row_to_memory(row))
612            .collect()
613    }
614
615    pub async fn get_contradictions_by_namespace(
616        &self,
617        namespace_id: i64,
618        limit: i64,
619        include_raw: bool,
620    ) -> Result<Vec<Memory>> {
621        let noise_sql = if include_raw {
622            String::new()
623        } else {
624            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
625        };
626        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
627            r#"
628            SELECT * FROM memories
629            WHERE namespace_id = ?
630              AND is_active = 1
631              AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level') = ?
632              {noise_sql}
633            ORDER BY COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.times_contradicted'), 0) DESC,
634                     created_at DESC
635            LIMIT ?
636            "#
637        ))
638        .bind(namespace_id)
639        .bind(CognitiveLevel::Contradiction.as_str())
640        .bind(limit)
641        .fetch_all(&self.pool)
642        .await
643        .map_err(db_error)?;
644
645        rows.into_iter()
646            .map(|row| self.row_to_memory(row))
647            .collect()
648    }
649
650    pub async fn list_by_session_key(
651        &self,
652        namespace_id: i64,
653        session_key: &str,
654        limit: i64,
655        include_raw: bool,
656    ) -> Result<Vec<Memory>> {
657        let noise_sql = if include_raw {
658            String::new()
659        } else {
660            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
661        };
662        let session_filter = SESSION_KEY_FILTER.replace("METADATA", METADATA);
663        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
664            r#"
665            SELECT * FROM memories
666            WHERE namespace_id = ?
667              AND is_active = 1
668              AND {session_filter}
669              {noise_sql}
670            ORDER BY created_at DESC
671            LIMIT ?
672            "#
673        ))
674        .bind(namespace_id)
675        .bind(session_key)
676        .bind(session_key)
677        .bind(limit)
678        .fetch_all(&self.pool)
679        .await
680        .map_err(db_error)?;
681
682        rows.into_iter()
683            .map(|row| self.row_to_memory(row))
684            .collect()
685    }
686
687    /// Register a session digest for a memory.
688    ///
689    /// Returns the digest row ID. Uses the unique index on
690    /// `(namespace_id, session_key, digest_kind, end_memory_id)` to prevent
691    /// duplicate registrations.
692    pub async fn store_digest(&self, params: StoreDigestParams<'_>) -> Result<i64> {
693        let id: i64 = sqlx::query_scalar(
694            r#"
695            INSERT INTO session_digests (namespace_id, session_key, digest_kind, memory_id, start_memory_id, end_memory_id, token_count, created_at)
696            VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
697            ON CONFLICT(namespace_id, session_key, digest_kind, end_memory_id) DO UPDATE SET
698                memory_id = excluded.memory_id,
699                token_count = excluded.token_count
700            RETURNING id
701            "#,
702        )
703        .bind(params.namespace_id)
704        .bind(params.session_key)
705        .bind(params.digest_kind)
706        .bind(params.memory_id)
707        .bind(params.start_memory_id)
708        .bind(params.end_memory_id)
709        .bind(params.token_count as i64)
710        .fetch_one(&self.pool)
711        .await
712        .map_err(db_error)?;
713
714        Ok(id)
715    }
716
717    /// Get the latest digest memory for a session and digest kind.
718    /// Returns the `Memory` row that the digest points to, or None.
719    pub async fn latest_digest_for_session(
720        &self,
721        namespace_id: i64,
722        session_key: &str,
723        digest_kind: &str,
724    ) -> Result<Option<Memory>> {
725        let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
726            "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
727        )
728        .bind(namespace_id)
729        .bind(session_key)
730        .bind(digest_kind)
731        .fetch_optional(&self.pool)
732        .await
733        .map_err(db_error)?;
734
735        match digest {
736            Some(d) => self.get_by_id(d.memory_id).await,
737            None => Ok(None),
738        }
739    }
740
741    /// Get the latest digest memory for a namespace and digest kind, regardless
742    /// of session key. Returns the `Memory` row that the digest points to, or
743    /// None if no digest exists.
744    pub async fn latest_digest_for_namespace(
745        &self,
746        namespace_id: i64,
747        digest_kind: &str,
748    ) -> Result<Option<Memory>> {
749        let digest: Option<SessionDigestRow> = sqlx::query_as::<_, SessionDigestRow>(
750            "SELECT * FROM session_digests WHERE namespace_id = ? AND digest_kind = ? ORDER BY created_at DESC LIMIT 1",
751        )
752        .bind(namespace_id)
753        .bind(digest_kind)
754        .fetch_optional(&self.pool)
755        .await
756        .map_err(db_error)?;
757
758        match digest {
759            Some(d) => self.get_by_id(d.memory_id).await,
760            None => Ok(None),
761        }
762    }
763
764    /// Return how much fresh non-raw, non-digest session content has accumulated
765    /// since the latest covered digest window.
766    pub async fn session_digest_rollover(
767        &self,
768        namespace_id: i64,
769        session_key: &str,
770    ) -> Result<SessionDigestRollover> {
771        let last_digest_end_memory_id: Option<i64> = sqlx::query_scalar(
772            "SELECT MAX(end_memory_id) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
773        )
774        .bind(namespace_id)
775        .bind(session_key)
776        .fetch_one(&self.pool)
777        .await
778        .map_err(db_error)?;
779
780        let (new_memory_count, estimated_new_tokens): (i64, i64) = sqlx::query_as(&format!(
781            r#"
782            SELECT
783                COUNT(*) as new_memory_count,
784                COALESCE(SUM((LENGTH(content) + 3) / 4), 0) as estimated_new_tokens
785            FROM memories
786            WHERE namespace_id = ?
787              AND is_active = 1
788              AND id > ?
789              AND (
790                  json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
791                  OR EXISTS (
792                      SELECT 1
793                      FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
794                      WHERE value = ?
795                  )
796              )
797              AND {RAW_ACTIVITY_FILTER_SQL}
798              AND COALESCE(json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.level'), '') NOT IN ('raw', 'summary_short', 'summary_long')
799            "#,
800        ))
801        .bind(namespace_id)
802        .bind(last_digest_end_memory_id.unwrap_or(0))
803        .bind(session_key)
804        .bind(session_key)
805        .fetch_one(&self.pool)
806        .await
807        .map_err(db_error)?;
808
809        Ok(SessionDigestRollover {
810            last_digest_end_memory_id,
811            new_memory_count,
812            estimated_new_tokens,
813        })
814    }
815
816    /// Get recent memories scoped to a perspective, optionally narrowed to a session.
817    ///
818    /// By default, raw-activity noise is excluded. Set `include_raw` to `true` to
819    /// include operational hook event payloads.
820    pub async fn get_recent_by_perspective(
821        &self,
822        namespace_id: i64,
823        perspective: &PerspectiveKey,
824        limit: i64,
825    ) -> Result<Vec<Memory>> {
826        self.get_recent_by_perspective_opts(namespace_id, perspective, limit, false)
827            .await
828    }
829
830    /// Like [`Self::get_recent_by_perspective`] but allows opting into raw-activity
831    /// noise inclusion.
832    pub async fn get_recent_by_perspective_opts(
833        &self,
834        namespace_id: i64,
835        perspective: &PerspectiveKey,
836        limit: i64,
837        include_raw: bool,
838    ) -> Result<Vec<Memory>> {
839        let noise_sql = if include_raw {
840            String::new()
841        } else {
842            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
843        };
844
845        let perspective_sql = perspective_where_clause(perspective);
846        let sql = format!(
847            r#"
848            SELECT * FROM memories
849            WHERE namespace_id = ?
850              AND is_active = 1
851              AND {perspective_sql}
852              {noise_sql}
853            ORDER BY created_at DESC
854            LIMIT ?
855            "#
856        );
857        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
858
859        for val in bind_perspective(perspective) {
860            query = query.bind(val);
861        }
862
863        let rows = query
864            .bind(limit)
865            .fetch_all(&self.pool)
866            .await
867            .map_err(db_error)?;
868
869        rows.into_iter()
870            .map(|row| self.row_to_memory(row))
871            .collect()
872    }
873
874    /// Get active memories by cognitive level ordered from newest to oldest.
875    pub async fn get_by_cognitive_level(
876        &self,
877        namespace_id: i64,
878        level: CognitiveLevel,
879        limit: i64,
880    ) -> Result<Vec<Memory>> {
881        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
882            r#"
883            SELECT * FROM memories
884            WHERE namespace_id = ?
885              AND is_active = 1
886              AND {} = ?
887            ORDER BY created_at DESC
888            LIMIT ?
889            "#,
890            COGNITIVE_LEVEL_EXPR,
891        ))
892        .bind(namespace_id)
893        .bind(level.as_str())
894        .bind(limit)
895        .fetch_all(&self.pool)
896        .await
897        .map_err(db_error)?;
898
899        rows.into_iter()
900            .map(|row| self.row_to_memory(row))
901            .collect()
902    }
903
904    /// Get active memories by cognitive level and perspective, ordered from newest to oldest.
905    ///
906    /// Unlike [`Self::get_by_cognitive_level`], this method applies perspective filtering
907    /// (observer, subject, session_key, and session_keys arrays) in the SQL query BEFORE
908    /// the LIMIT is applied, ensuring the caller receives up to `limit` matching results.
909    pub async fn get_by_cognitive_level_with_perspective(
910        &self,
911        namespace_id: i64,
912        level: CognitiveLevel,
913        perspective: &PerspectiveKey,
914        limit: i64,
915    ) -> Result<Vec<Memory>> {
916        let perspective_sql = perspective_where_clause(perspective);
917        let sql = format!(
918            r#"
919            SELECT * FROM memories
920            WHERE namespace_id = ?
921              AND is_active = 1
922              AND {COGNITIVE_LEVEL_EXPR} = ?
923              AND {perspective_sql}
924            ORDER BY created_at DESC
925            LIMIT ?
926            "#
927        );
928        let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
929            .bind(namespace_id)
930            .bind(level.as_str());
931
932        for val in bind_perspective(perspective) {
933            query = query.bind(val);
934        }
935
936        let rows = query
937            .bind(limit)
938            .fetch_all(&self.pool)
939            .await
940            .map_err(db_error)?;
941
942        rows.into_iter()
943            .map(|row| self.row_to_memory(row))
944            .collect()
945    }
946
947    /// Get the most reinforced perspective-aligned memories first.
948    ///
949    /// By default, raw-activity noise and contradiction memories are excluded.
950    pub async fn get_most_reinforced_by_perspective(
951        &self,
952        namespace_id: i64,
953        perspective: &PerspectiveKey,
954        limit: i64,
955    ) -> Result<Vec<Memory>> {
956        self.get_most_reinforced_by_perspective_opts(namespace_id, perspective, limit, false)
957            .await
958    }
959
960    /// Like [`Self::get_most_reinforced_by_perspective`] but allows opting into
961    /// raw-activity noise inclusion.
962    pub async fn get_most_reinforced_by_perspective_opts(
963        &self,
964        namespace_id: i64,
965        perspective: &PerspectiveKey,
966        limit: i64,
967        include_raw: bool,
968    ) -> Result<Vec<Memory>> {
969        let noise_sql = if include_raw {
970            String::new()
971        } else {
972            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
973        };
974
975        let perspective_sql = perspective_where_clause(perspective);
976        let sql = format!(
977            r#"
978            SELECT * FROM memories
979            WHERE namespace_id = ?
980              AND is_active = 1
981              AND {perspective_sql}
982              AND {COGNITIVE_LEVEL_EXPR} != ?
983              {noise_sql}
984            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_reinforced'), 0) DESC,
985                     created_at DESC
986            LIMIT ?
987            "#
988        );
989        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
990
991        for val in bind_perspective(perspective) {
992            query = query.bind(val);
993        }
994
995        let rows = query
996            .bind(CognitiveLevel::Contradiction.as_str())
997            .bind(limit)
998            .fetch_all(&self.pool)
999            .await
1000            .map_err(db_error)?;
1001
1002        rows.into_iter()
1003            .map(|row| self.row_to_memory(row))
1004            .collect()
1005    }
1006
1007    /// Get contradiction memories for the requested perspective.
1008    ///
1009    /// By default, raw-activity noise is excluded.
1010    pub async fn get_contradictions_by_perspective(
1011        &self,
1012        namespace_id: i64,
1013        perspective: &PerspectiveKey,
1014        limit: i64,
1015    ) -> Result<Vec<Memory>> {
1016        self.get_contradictions_by_perspective_opts(namespace_id, perspective, limit, false)
1017            .await
1018    }
1019
1020    /// Like [`Self::get_contradictions_by_perspective`] but allows opting into
1021    /// raw-activity noise inclusion.
1022    pub async fn get_contradictions_by_perspective_opts(
1023        &self,
1024        namespace_id: i64,
1025        perspective: &PerspectiveKey,
1026        limit: i64,
1027        include_raw: bool,
1028    ) -> Result<Vec<Memory>> {
1029        let noise_sql = if include_raw {
1030            String::new()
1031        } else {
1032            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1033        };
1034
1035        let perspective_sql = perspective_where_clause(perspective);
1036        let sql = format!(
1037            r#"
1038            SELECT * FROM memories
1039            WHERE namespace_id = ?
1040              AND is_active = 1
1041              AND {perspective_sql}
1042              AND {COGNITIVE_LEVEL_EXPR} = ?
1043              {noise_sql}
1044            ORDER BY COALESCE(json_extract({METADATA}, '$.cognitive.times_contradicted'), 0) DESC,
1045                     created_at DESC
1046            LIMIT ?
1047            "#
1048        );
1049        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1050
1051        for val in bind_perspective(perspective) {
1052            query = query.bind(val);
1053        }
1054
1055        let rows = query
1056            .bind(CognitiveLevel::Contradiction.as_str())
1057            .bind(limit)
1058            .fetch_all(&self.pool)
1059            .await
1060            .map_err(db_error)?;
1061
1062        rows.into_iter()
1063            .map(|row| self.row_to_memory(row))
1064            .collect()
1065    }
1066
1067    // ---------------------------------------------------------------------------
1068    // Working-set assembly
1069    // ---------------------------------------------------------------------------
1070
1071    /// Assemble a bounded working set from multiple retrieval buckets.
1072    ///
1073    /// This is the low-level repository primitive used by higher-level services
1074    /// (e.g. `RepresentationService`) to build a [`WorkingRepresentation`].
1075    ///
1076    /// The method queries four buckets in sequence:
1077    ///
1078    /// 1. **Digests** — latest short + long session digests
1079    /// 2. **Reinforced** — highest reinforcement-scored memories
1080    /// 3. **Recent** — newest perspective-aligned memories
1081    /// 4. **Contradictions** — highest-confidence contradiction records
1082    ///
1083    /// Results are deduplicated by memory ID (first-seen-wins in bucket order)
1084    /// and truncated to `max_items`.
1085    pub async fn search_working_set(&self, params: WorkingSetParams<'_>) -> Result<Vec<Memory>> {
1086        let WorkingSetParams {
1087            namespace_id,
1088            perspective,
1089            max_items,
1090            include_raw,
1091        } = params;
1092
1093        // Each sub-query requests more than max_items so dedup still has enough
1094        // material after filtering.
1095        let per_bucket = (max_items as i64).max(8);
1096
1097        // 1. Digests
1098        let session = perspective
1099            .and_then(|p| p.session_key.as_deref())
1100            .unwrap_or("");
1101        let mut digests = Vec::new();
1102        if let Some(short) = self
1103            .latest_digest_for_session(namespace_id, session, "short")
1104            .await?
1105        {
1106            digests.push(short);
1107        }
1108        if let Some(long) = self
1109            .latest_digest_for_session(namespace_id, session, "long")
1110            .await?
1111        {
1112            digests.push(long);
1113        }
1114
1115        // 2-4. Perspective or namespace-level queries
1116        let reinforced = if let Some(persp) = perspective {
1117            self.get_most_reinforced_by_perspective_opts(
1118                namespace_id,
1119                persp,
1120                per_bucket,
1121                include_raw,
1122            )
1123            .await?
1124        } else {
1125            self.get_most_reinforced_by_namespace(namespace_id, per_bucket, include_raw)
1126                .await?
1127        };
1128
1129        let recent = if let Some(persp) = perspective {
1130            self.get_recent_by_perspective_opts(namespace_id, persp, per_bucket, include_raw)
1131                .await?
1132        } else {
1133            let sql = if include_raw {
1134                "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ?"
1135                    .to_string()
1136            } else {
1137                format!(
1138                    "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 AND {} ORDER BY created_at DESC LIMIT ?",
1139                    RAW_ACTIVITY_FILTER_SQL,
1140                )
1141            };
1142            let rows: Vec<MemoryRow> = sqlx::query_as(&sql)
1143                .bind(namespace_id)
1144                .bind(per_bucket)
1145                .fetch_all(&self.pool)
1146                .await
1147                .map_err(db_error)?;
1148            rows.into_iter()
1149                .map(|r| self.row_to_memory(r))
1150                .collect::<Result<Vec<_>>>()?
1151        };
1152
1153        let contradictions = if let Some(persp) = perspective {
1154            self.get_contradictions_by_perspective_opts(
1155                namespace_id,
1156                persp,
1157                per_bucket,
1158                include_raw,
1159            )
1160            .await?
1161        } else {
1162            self.get_contradictions_by_namespace(namespace_id, per_bucket, include_raw)
1163                .await?
1164        };
1165
1166        // Merge: digests → reinforced → recent → contradictions, dedupe by ID
1167        let mut seen = std::collections::HashSet::new();
1168        let mut result = Vec::with_capacity(max_items);
1169
1170        for memory in digests
1171            .into_iter()
1172            .chain(reinforced)
1173            .chain(recent)
1174            .chain(contradictions)
1175        {
1176            if seen.insert(memory.id) {
1177                result.push(memory);
1178                if result.len() >= max_items {
1179                    break;
1180                }
1181            }
1182        }
1183
1184        Ok(result)
1185    }
1186
1187    /// Load all evidence lineage entries for a given memory (as derived or source).
1188    pub async fn load_lineage(&self, memory_id: i64) -> Result<Vec<MemoryLineageEntry>> {
1189        let rows: Vec<MemoryEvidenceRow> = sqlx::query_as::<_, MemoryEvidenceRow>(
1190            r#"
1191            SELECT * FROM memory_evidence
1192            WHERE derived_memory_id = ? OR source_memory_id = ?
1193            ORDER BY created_at ASC
1194            "#,
1195        )
1196        .bind(memory_id)
1197        .bind(memory_id)
1198        .fetch_all(&self.pool)
1199        .await
1200        .map_err(db_error)?;
1201
1202        Ok(rows
1203            .into_iter()
1204            .map(|r| MemoryLineageEntry {
1205                derived_memory_id: r.derived_memory_id,
1206                source_memory_id: r.source_memory_id,
1207                evidence_role: r.evidence_role,
1208            })
1209            .collect())
1210    }
1211
1212    /// Load lineage rows for many source/derived memory IDs in one query.
1213    pub async fn load_lineage_batch(
1214        &self,
1215        memory_ids: &[i64],
1216    ) -> Result<HashMap<i64, Vec<MemoryLineageEntry>>> {
1217        if memory_ids.is_empty() {
1218            return Ok(HashMap::new());
1219        }
1220
1221        let placeholders = memory_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1222        let sql = format!(
1223            r#"
1224            SELECT * FROM memory_evidence
1225            WHERE derived_memory_id IN ({placeholders})
1226               OR source_memory_id IN ({placeholders})
1227            ORDER BY created_at ASC
1228            "#
1229        );
1230
1231        let mut query = sqlx::query_as::<_, MemoryEvidenceRow>(&sql);
1232        for id in memory_ids {
1233            query = query.bind(*id);
1234        }
1235        for id in memory_ids {
1236            query = query.bind(*id);
1237        }
1238
1239        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1240        let id_set: HashSet<i64> = memory_ids.iter().copied().collect();
1241        let mut grouped: HashMap<i64, Vec<MemoryLineageEntry>> = HashMap::new();
1242
1243        for row in rows {
1244            let entry = MemoryLineageEntry {
1245                derived_memory_id: row.derived_memory_id,
1246                source_memory_id: row.source_memory_id,
1247                evidence_role: row.evidence_role,
1248            };
1249
1250            if id_set.contains(&entry.derived_memory_id) {
1251                grouped
1252                    .entry(entry.derived_memory_id)
1253                    .or_default()
1254                    .push(entry.clone());
1255            }
1256            if id_set.contains(&entry.source_memory_id) {
1257                grouped
1258                    .entry(entry.source_memory_id)
1259                    .or_default()
1260                    .push(entry);
1261            }
1262        }
1263
1264        Ok(grouped)
1265    }
1266
1267    /// Get a memory by ID
1268    pub async fn get_by_id(&self, id: i64) -> Result<Option<Memory>> {
1269        let row: Option<MemoryRow> = sqlx::query_as("SELECT * FROM memories WHERE id = ?")
1270            .bind(id)
1271            .fetch_optional(&self.pool)
1272            .await
1273            .map_err(db_error)?;
1274
1275        row.map(|r| self.row_to_memory(r)).transpose()
1276    }
1277
1278    /// Get multiple memories by their IDs (batch fetch).
1279    ///
1280    /// Returns only memories that exist; the result Vec length may be less than
1281    /// the input slice length if some IDs are not found.
1282    pub async fn get_by_ids(&self, ids: &[i64]) -> Result<Vec<Memory>> {
1283        if ids.is_empty() {
1284            return Ok(Vec::new());
1285        }
1286
1287        // Build parameter placeholders: (?, ?, ?, ...)
1288        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1289
1290        let sql = format!("SELECT * FROM memories WHERE id IN ({placeholders})");
1291
1292        let mut query = sqlx::query_as::<_, MemoryRow>(&sql);
1293        for id in ids {
1294            query = query.bind(*id);
1295        }
1296
1297        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
1298
1299        let mut memories: Vec<Memory> = Vec::with_capacity(rows.len());
1300        for row in rows {
1301            memories.push(self.row_to_memory(row)?);
1302        }
1303
1304        Ok(memories)
1305    }
1306
1307    /// Get a memory by namespace and content (fallback for id 0 edge case)
1308    pub async fn get_by_content(&self, namespace_id: i64, content: &str) -> Result<Memory> {
1309        let row: Option<MemoryRow> = sqlx::query_as(
1310            "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1"
1311        )
1312        .bind(namespace_id)
1313        .bind(content)
1314        .fetch_optional(&self.pool)
1315        .await
1316        .map_err(db_error)?;
1317
1318        row.map(|r| self.row_to_memory(r))
1319            .transpose()?
1320            .ok_or_else(|| {
1321                nexus_core::NexusError::Storage(
1322                    "No memories found in namespace after insert".to_string(),
1323                )
1324            })
1325    }
1326
1327    /// Search memories by namespace
1328    pub async fn search_by_namespace(
1329        &self,
1330        namespace_id: i64,
1331        limit: usize,
1332        offset: usize,
1333    ) -> Result<Vec<Memory>> {
1334        let rows: Vec<MemoryRow> = sqlx::query_as(
1335            "SELECT * FROM memories WHERE namespace_id = ? AND is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ?"
1336        )
1337        .bind(namespace_id)
1338        .bind(limit as i64)
1339        .bind(offset as i64)
1340        .fetch_all(&self.pool)
1341        .await
1342        .map_err(db_error)?;
1343
1344        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1345    }
1346
1347    /// Count memories in namespace
1348    pub async fn count_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1349        let count: (i64,) = sqlx::query_as(
1350            "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_active = 1",
1351        )
1352        .bind(namespace_id)
1353        .fetch_one(&self.pool)
1354        .await
1355        .map_err(db_error)?;
1356
1357        Ok(count.0)
1358    }
1359
1360    /// Count all memories in namespace (including inactive/archived)
1361    pub async fn count_all_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1362        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM memories WHERE namespace_id = ?")
1363            .bind(namespace_id)
1364            .fetch_one(&self.pool)
1365            .await
1366            .map_err(db_error)?;
1367
1368        Ok(count.0)
1369    }
1370
1371    /// Count archived memories in namespace
1372    pub async fn count_archived_by_namespace(&self, namespace_id: i64) -> Result<i64> {
1373        let count: (i64,) = sqlx::query_as(
1374            "SELECT COUNT(*) FROM memories WHERE namespace_id = ? AND is_archived = 1",
1375        )
1376        .bind(namespace_id)
1377        .fetch_one(&self.pool)
1378        .await
1379        .map_err(db_error)?;
1380
1381        Ok(count.0)
1382    }
1383
1384    /// Delete a memory
1385    pub async fn delete(&self, id: i64) -> Result<bool> {
1386        let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1387            .bind(id)
1388            .execute(&self.pool)
1389            .await
1390            .map_err(db_error)?;
1391
1392        Ok(result.rows_affected() > 0)
1393    }
1394
1395    /// Update access count
1396    pub async fn touch(&self, id: i64) -> Result<()> {
1397        sqlx::query(
1398            "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
1399        )
1400        .bind(Utc::now())
1401        .bind(id)
1402        .execute(&self.pool)
1403        .await
1404        .map_err(db_error)?;
1405
1406        Ok(())
1407    }
1408
1409    /// Get unconsolidated memories
1410    pub async fn get_unconsolidated(
1411        &self,
1412        namespace_id: i64,
1413        limit: i32,
1414    ) -> Result<Vec<MemoryRow>> {
1415        let rows = sqlx::query_as::<_, MemoryRow>(
1416            r#"
1417            SELECT * FROM memories
1418            WHERE namespace_id = ?
1419            AND is_active = 1
1420            AND (metadata IS NULL OR json_extract(metadata, '$.agent.consolidated') IS NULL)
1421            ORDER BY created_at ASC
1422            LIMIT ?
1423            "#,
1424        )
1425        .bind(namespace_id)
1426        .bind(limit)
1427        .fetch_all(&self.pool)
1428        .await
1429        .map_err(db_error)?;
1430
1431        Ok(rows)
1432    }
1433
1434    /// Mark a memory as consolidated
1435    pub async fn mark_consolidated(&self, id: i64) -> Result<()> {
1436        sqlx::query(
1437            r#"
1438            UPDATE memories
1439            SET metadata = json_set(
1440                COALESCE(metadata, '{}'),
1441                '$.agent.consolidated',
1442                true,
1443                '$.agent.consolidated_at',
1444                datetime('now')
1445            ),
1446            updated_at = datetime('now')
1447            WHERE id = ?
1448            "#,
1449        )
1450        .bind(id)
1451        .execute(&self.pool)
1452        .await
1453        .map_err(db_error)?;
1454
1455        Ok(())
1456    }
1457
1458    /// Mark multiple memories as consolidated in a single query
1459    pub async fn mark_consolidated_batch(&self, ids: &[i64]) -> Result<()> {
1460        if ids.is_empty() {
1461            return Ok(());
1462        }
1463        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1464        let query = format!(
1465            r#"
1466            UPDATE memories
1467            SET metadata = json_set(
1468                COALESCE(metadata, '{{}}'),
1469                '$.agent.consolidated',
1470                true,
1471                '$.agent.consolidated_at',
1472                datetime('now')
1473            ),
1474            updated_at = datetime('now')
1475            WHERE id IN ({})
1476            "#,
1477            placeholders
1478        );
1479        let mut q = sqlx::query(&query);
1480        for id in ids {
1481            q = q.bind(*id);
1482        }
1483        q.execute(&self.pool).await.map_err(db_error)?;
1484        Ok(())
1485    }
1486
1487    /// Search memories by text content (LIKE search)
1488    pub async fn search_by_text(
1489        &self,
1490        namespace_id: i64,
1491        query: &str,
1492        limit: i32,
1493        include_raw: bool,
1494    ) -> Result<Vec<MemoryRow>> {
1495        let pattern = format!("%{}%", query);
1496        let raw_clause = if include_raw {
1497            String::new()
1498        } else {
1499            format!("AND {RAW_ACTIVITY_FILTER_SQL}")
1500        };
1501        let rows = sqlx::query_as::<_, MemoryRow>(&format!(
1502            r#"
1503            SELECT * FROM memories
1504            WHERE namespace_id = ?
1505            AND is_active = 1
1506            AND content LIKE ?
1507            {}
1508            ORDER BY updated_at DESC
1509            LIMIT ?
1510            "#,
1511            raw_clause
1512        ))
1513        .bind(namespace_id)
1514        .bind(&pattern)
1515        .bind(limit)
1516        .fetch_all(&self.pool)
1517        .await
1518        .map_err(db_error)?;
1519
1520        Ok(rows)
1521    }
1522
1523    /// Search memories by text content and return domain memories.
1524    pub async fn search_by_text_memories(
1525        &self,
1526        namespace_id: i64,
1527        query: &str,
1528        limit: i32,
1529        include_raw: bool,
1530    ) -> Result<Vec<Memory>> {
1531        let rows = self
1532            .search_by_text(namespace_id, query, limit, include_raw)
1533            .await?;
1534        rows.into_iter()
1535            .map(|row| self.row_to_memory(row))
1536            .collect()
1537    }
1538
1539    /// Fetch recent, embedding-bearing cognition memories for vector-first semantic recall.
1540    pub async fn get_semantic_candidates(
1541        &self,
1542        params: SemanticCandidateParams<'_>,
1543    ) -> Result<Vec<Memory>> {
1544        let SemanticCandidateParams {
1545            namespace_id,
1546            perspective,
1547            limit,
1548            include_raw,
1549        } = params;
1550
1551        let noise_sql = if include_raw {
1552            String::new()
1553        } else {
1554            format!("AND {}", RAW_ACTIVITY_FILTER_SQL)
1555        };
1556
1557        let rows = if let Some(perspective) = perspective {
1558            let sql = if perspective.session_key.is_some() {
1559                format!(
1560                    r#"
1561                    SELECT * FROM memories
1562                    WHERE namespace_id = ?
1563                      AND is_active = 1
1564                      AND content_embedding IS NOT NULL
1565                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
1566                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
1567                      AND (
1568                          json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.session_key') = ?
1569                          OR EXISTS (
1570                              SELECT 1
1571                              FROM json_each(COALESCE(json_extract(metadata, '$.cognitive.session_keys'), '[]'))
1572                              WHERE value = ?
1573                          )
1574                      )
1575                      {noise_sql}
1576                    ORDER BY updated_at DESC, created_at DESC
1577                    LIMIT ?
1578                    "#
1579                )
1580            } else {
1581                format!(
1582                    r#"
1583                    SELECT * FROM memories
1584                    WHERE namespace_id = ?
1585                      AND is_active = 1
1586                      AND content_embedding IS NOT NULL
1587                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.observer') = ?
1588                      AND json_extract(COALESCE(metadata, '{{}}'), '$.cognitive.subject') = ?
1589                      {noise_sql}
1590                    ORDER BY updated_at DESC, created_at DESC
1591                    LIMIT ?
1592                    "#
1593                )
1594            };
1595
1596            let mut query = sqlx::query_as::<_, MemoryRow>(&sql)
1597                .bind(namespace_id)
1598                .bind(&perspective.observer)
1599                .bind(&perspective.subject);
1600
1601            if let Some(session_key) = &perspective.session_key {
1602                query = query.bind(session_key);
1603                query = query.bind(session_key);
1604            }
1605
1606            query
1607                .bind(limit)
1608                .fetch_all(&self.pool)
1609                .await
1610                .map_err(db_error)?
1611        } else {
1612            let sql = if include_raw {
1613                r#"
1614                SELECT * FROM memories
1615                WHERE namespace_id = ?
1616                  AND is_active = 1
1617                  AND content_embedding IS NOT NULL
1618                ORDER BY updated_at DESC, created_at DESC
1619                LIMIT ?
1620                "#
1621                .to_string()
1622            } else {
1623                format!(
1624                    r#"
1625                    SELECT * FROM memories
1626                    WHERE namespace_id = ?
1627                      AND is_active = 1
1628                      AND content_embedding IS NOT NULL
1629                      AND {}
1630                    ORDER BY updated_at DESC, created_at DESC
1631                    LIMIT ?
1632                    "#,
1633                    RAW_ACTIVITY_FILTER_SQL,
1634                )
1635            };
1636
1637            sqlx::query_as::<_, MemoryRow>(&sql)
1638                .bind(namespace_id)
1639                .bind(limit)
1640                .fetch_all(&self.pool)
1641                .await
1642                .map_err(db_error)?
1643        };
1644
1645        rows.into_iter()
1646            .map(|row| self.row_to_memory(row))
1647            .collect()
1648    }
1649
1650    /// List memories with optional filters
1651    pub async fn list_filtered(
1652        &self,
1653        namespace_id: i64,
1654        filters: ListMemoryFilters<'_>,
1655    ) -> Result<Vec<Memory>> {
1656        // Build WHERE clause dynamically
1657        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1658        let mut param_idx = 2u32;
1659
1660        if filters.category.is_some() {
1661            conditions.push(format!("category = ?{}", param_idx));
1662            param_idx += 1;
1663        }
1664        if filters.since.is_some() {
1665            conditions.push(format!("created_at >= ?{}", param_idx));
1666            param_idx += 1;
1667        }
1668        if filters.until.is_some() {
1669            conditions.push(format!("created_at <= ?{}", param_idx));
1670            param_idx += 1;
1671        }
1672        if filters.content_like.is_some() {
1673            conditions.push(format!("content LIKE ?{}", param_idx));
1674            param_idx += 1;
1675        }
1676        if !filters.include_raw {
1677            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1678        }
1679
1680        let sql = format!(
1681            "SELECT * FROM memories WHERE {} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
1682            conditions.join(" AND "),
1683            param_idx,
1684            param_idx + 1,
1685        );
1686
1687        let mut query = sqlx::query_as::<_, MemoryRow>(&sql).bind(namespace_id);
1688
1689        if let Some(cat) = filters.category {
1690            query = query.bind(cat.to_string());
1691        }
1692        if let Some(s) = filters.since {
1693            query = query.bind(s);
1694        }
1695        if let Some(u) = filters.until {
1696            query = query.bind(u);
1697        }
1698        if let Some(search) = filters.content_like {
1699            query = query.bind(format!("%{}%", search));
1700        }
1701
1702        let rows: Vec<MemoryRow> = query
1703            .bind(filters.limit)
1704            .bind(filters.offset)
1705            .fetch_all(&self.pool)
1706            .await
1707            .map_err(db_error)?;
1708
1709        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1710    }
1711
1712    /// List memories that still need cognition metadata backfilled.
1713    pub async fn list_missing_cognitive_metadata(
1714        &self,
1715        namespace_id: i64,
1716        limit: i64,
1717        offset: i64,
1718    ) -> Result<Vec<Memory>> {
1719        let rows: Vec<MemoryRow> = sqlx::query_as(
1720            r#"
1721            SELECT * FROM memories
1722            WHERE namespace_id = ?
1723            AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1724            ORDER BY id ASC
1725            LIMIT ? OFFSET ?
1726            "#,
1727        )
1728        .bind(namespace_id)
1729        .bind(limit)
1730        .bind(offset)
1731        .fetch_all(&self.pool)
1732        .await
1733        .map_err(db_error)?;
1734
1735        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1736    }
1737
1738    /// Count memories that still need cognition metadata backfilled.
1739    pub async fn count_missing_cognitive_metadata(&self, namespace_id: i64) -> Result<i64> {
1740        let count: i64 = sqlx::query_scalar(
1741            r#"
1742            SELECT COUNT(*) FROM memories
1743            WHERE namespace_id = ?
1744            AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') IS NULL
1745            "#,
1746        )
1747        .bind(namespace_id)
1748        .fetch_one(&self.pool)
1749        .await
1750        .map_err(db_error)?;
1751
1752        Ok(count)
1753    }
1754
1755    /// Replace the metadata blob for a single memory.
1756    pub async fn update_memory_metadata(
1757        &self,
1758        memory_id: i64,
1759        metadata: &serde_json::Value,
1760    ) -> Result<()> {
1761        let metadata_json = serde_json::to_string(metadata)?;
1762        sqlx::query(
1763            r#"
1764            UPDATE memories
1765            SET metadata = ?, updated_at = ?
1766            WHERE id = ?
1767            "#,
1768        )
1769        .bind(metadata_json)
1770        .bind(Utc::now())
1771        .bind(memory_id)
1772        .execute(&self.pool)
1773        .await
1774        .map_err(db_error)?;
1775
1776        Ok(())
1777    }
1778
1779    /// List distinct session keys that have cognitive metadata but no digest coverage yet.
1780    pub async fn list_session_keys_without_digests(
1781        &self,
1782        namespace_id: i64,
1783        limit: i64,
1784    ) -> Result<Vec<String>> {
1785        let rows: Vec<(String,)> = sqlx::query_as(
1786            r#"
1787            SELECT DISTINCT json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') AS session_key
1788            FROM memories m
1789            WHERE m.namespace_id = ?
1790            AND json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1791            AND TRIM(json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')) <> ''
1792            AND NOT EXISTS (
1793                SELECT 1 FROM session_digests sd
1794                WHERE sd.namespace_id = m.namespace_id
1795                AND sd.session_key = json_extract(COALESCE(m.metadata, '{}'), '$.cognitive.session_key')
1796            )
1797            ORDER BY session_key ASC
1798            LIMIT ?
1799            "#,
1800        )
1801        .bind(namespace_id)
1802        .bind(limit)
1803        .fetch_all(&self.pool)
1804        .await
1805        .map_err(db_error)?;
1806
1807        Ok(rows.into_iter().map(|(session_key,)| session_key).collect())
1808    }
1809
1810    /// Count distinct non-empty cognitive session keys present in active memories.
1811    pub async fn count_distinct_session_keys_with_cognition(
1812        &self,
1813        namespace_id: i64,
1814    ) -> Result<i64> {
1815        let count: i64 = sqlx::query_scalar(
1816            r#"
1817            SELECT COUNT(DISTINCT json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key'))
1818            FROM memories
1819            WHERE namespace_id = ?
1820              AND is_active = 1
1821              AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key') IS NOT NULL
1822              AND TRIM(json_extract(COALESCE(metadata, '{}'), '$.cognitive.session_key')) <> ''
1823            "#,
1824        )
1825        .bind(namespace_id)
1826        .fetch_one(&self.pool)
1827        .await
1828        .map_err(db_error)?;
1829
1830        Ok(count)
1831    }
1832
1833    /// List lineage-backed archived raw-activity memories that are safe to prune.
1834    pub async fn list_archived_raw_cleanup_candidates(
1835        &self,
1836        namespace_id: i64,
1837        older_than: DateTime<Utc>,
1838        limit: i64,
1839    ) -> Result<Vec<Memory>> {
1840        let rows: Vec<MemoryRow> = sqlx::query_as(
1841            r#"
1842            SELECT * FROM memories
1843            WHERE namespace_id = ?
1844            AND is_active = 0
1845            AND is_archived = 1
1846            AND (
1847                labels LIKE '%raw-activity%'
1848                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1849            )
1850            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1851            AND created_at <= ?
1852            ORDER BY created_at ASC
1853            LIMIT ?
1854            "#,
1855        )
1856        .bind(namespace_id)
1857        .bind(older_than)
1858        .bind(limit)
1859        .fetch_all(&self.pool)
1860        .await
1861        .map_err(db_error)?;
1862
1863        rows.into_iter().map(|r| self.row_to_memory(r)).collect()
1864    }
1865
1866    /// Count lineage-backed archived raw-activity memories that are safe to prune.
1867    pub async fn count_archived_raw_cleanup_candidates(
1868        &self,
1869        namespace_id: i64,
1870        older_than: DateTime<Utc>,
1871    ) -> Result<i64> {
1872        let count: i64 = sqlx::query_scalar(
1873            r#"
1874            SELECT COUNT(*) FROM memories
1875            WHERE namespace_id = ?
1876            AND is_active = 0
1877            AND is_archived = 1
1878            AND (
1879                labels LIKE '%raw-activity%'
1880                OR json_extract(COALESCE(metadata, '{}'), '$.raw_activity') = 1
1881            )
1882            AND json_extract(COALESCE(metadata, '{}'), '$.distillation.summary_memory_id') IS NOT NULL
1883            AND created_at <= ?
1884            "#,
1885        )
1886        .bind(namespace_id)
1887        .bind(older_than)
1888        .fetch_one(&self.pool)
1889        .await
1890        .map_err(db_error)?;
1891
1892        Ok(count)
1893    }
1894
1895    /// Delete a batch of memories by id.
1896    pub async fn delete_batch(&self, ids: &[i64]) -> Result<u64> {
1897        if ids.is_empty() {
1898            return Ok(0);
1899        }
1900
1901        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1902        let sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
1903        let mut query = sqlx::query(&sql);
1904        for id in ids {
1905            query = query.bind(*id);
1906        }
1907
1908        let result = query.execute(&self.pool).await.map_err(db_error)?;
1909        Ok(result.rows_affected())
1910    }
1911
1912    /// Delete memories matching a content pattern (for cleaning noise)
1913    pub async fn delete_by_content_pattern(&self, namespace_id: i64, pattern: &str) -> Result<u64> {
1914        let result = sqlx::query("DELETE FROM memories WHERE namespace_id = ? AND content LIKE ?")
1915            .bind(namespace_id)
1916            .bind(pattern)
1917            .execute(&self.pool)
1918            .await
1919            .map_err(db_error)?;
1920
1921        Ok(result.rows_affected())
1922    }
1923
1924    /// Count memories matching filters (for stats with time ranges)
1925    pub async fn count_filtered(
1926        &self,
1927        namespace_id: i64,
1928        category: Option<&str>,
1929        since: Option<DateTime<Utc>>,
1930        until: Option<DateTime<Utc>>,
1931        include_raw: bool,
1932    ) -> Result<i64> {
1933        let mut conditions = vec!["namespace_id = ?1".to_string(), "is_active = 1".to_string()];
1934        let mut param_idx = 2u32;
1935
1936        if category.is_some() {
1937            conditions.push(format!("category = ?{}", param_idx));
1938            param_idx += 1;
1939        }
1940        if since.is_some() {
1941            conditions.push(format!("created_at >= ?{}", param_idx));
1942            param_idx += 1;
1943        }
1944        if until.is_some() {
1945            conditions.push(format!("created_at <= ?{}", param_idx));
1946        }
1947        if !include_raw {
1948            conditions.push(RAW_ACTIVITY_FILTER_SQL.to_string());
1949        }
1950
1951        let sql = format!(
1952            "SELECT COUNT(*) FROM memories WHERE {}",
1953            conditions.join(" AND "),
1954        );
1955
1956        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
1957
1958        if let Some(cat) = category {
1959            query = query.bind(cat.to_string());
1960        }
1961        if let Some(s) = since {
1962            query = query.bind(s);
1963        }
1964        if let Some(u) = until {
1965            query = query.bind(u);
1966        }
1967
1968        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
1969        Ok(count)
1970    }
1971
1972    /// Store a distilled summary and archive its source memories atomically.
1973    pub async fn store_distilled_summary(
1974        &self,
1975        params: StoreMemoryParams<'_>,
1976        source_ids: &[i64],
1977    ) -> Result<Memory> {
1978        let labels_json = serde_json::to_string(params.labels)?;
1979        let metadata_json = serde_json::to_string(params.metadata)?;
1980        let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
1981        let mut tx = self.pool.begin().await.map_err(db_error)?;
1982
1983        let result = sqlx::query(
1984            r#"
1985            INSERT INTO memories (
1986                namespace_id, content, category, memory_lane_type, labels, metadata,
1987                content_embedding, embedding_model, created_at, is_active, access_count
1988            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
1989            "#,
1990        )
1991        .bind(params.namespace_id)
1992        .bind(params.content)
1993        .bind(params.category.to_string())
1994        .bind(params.memory_lane_type.map(|t| t.to_string()))
1995        .bind(&labels_json)
1996        .bind(&metadata_json)
1997        .bind(&embedding_json)
1998        .bind(params.embedding_model)
1999        .bind(Utc::now())
2000        .execute(&mut *tx)
2001        .await
2002        .map_err(db_error)?;
2003
2004        let summary_id = if result.last_insert_rowid() == 0 {
2005            let row: Option<MemoryRow> = sqlx::query_as(
2006                "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) ORDER BY created_at DESC LIMIT 1",
2007            )
2008            .bind(params.namespace_id)
2009            .bind(params.content)
2010            .fetch_optional(&mut *tx)
2011            .await
2012            .map_err(db_error)?;
2013            row.map(|memory| memory.id).ok_or_else(|| {
2014                nexus_core::NexusError::Storage(
2015                    "Duplicate distilled summary merged but matching row not found".to_string(),
2016                )
2017            })?
2018        } else {
2019            result.last_insert_rowid()
2020        };
2021
2022        if !source_ids.is_empty() {
2023            for source_id in source_ids {
2024                sqlx::query(
2025                    r#"
2026                    INSERT OR IGNORE INTO memory_evidence (
2027                        derived_memory_id,
2028                        source_memory_id,
2029                        evidence_role,
2030                        created_at
2031                    ) VALUES (?, ?, 'source', datetime('now'))
2032                    "#,
2033                )
2034                .bind(summary_id)
2035                .bind(*source_id)
2036                .execute(&mut *tx)
2037                .await
2038                .map_err(db_error)?;
2039            }
2040
2041            let placeholders = source_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2042            let sql = format!(
2043                r#"
2044                UPDATE memories
2045                SET
2046                    is_active = 0,
2047                    is_archived = 1,
2048                    updated_at = ?,
2049                    metadata = json_set(
2050                        COALESCE(metadata, '{{}}'),
2051                        '$.distillation.status', 'archived',
2052                        '$.distillation.summary_memory_id', ?,
2053                        '$.distillation.archived_at', ?
2054                    )
2055                WHERE id IN ({})
2056                "#,
2057                placeholders
2058            );
2059            let archived_at = Utc::now().to_rfc3339();
2060            let mut query = sqlx::query(&sql)
2061                .bind(Utc::now())
2062                .bind(summary_id)
2063                .bind(&archived_at);
2064            for source_id in source_ids {
2065                query = query.bind(*source_id);
2066            }
2067            query.execute(&mut *tx).await.map_err(db_error)?;
2068        }
2069
2070        tx.commit().await.map_err(db_error)?;
2071        self.get_by_id(summary_id).await?.ok_or_else(|| {
2072            nexus_core::NexusError::Storage(format!(
2073                "Failed to retrieve distilled summary with id {}",
2074                summary_id
2075            ))
2076        })
2077    }
2078
2079    fn row_to_memory(&self, row: MemoryRow) -> Result<Memory> {
2080        let labels: Vec<String> = serde_json::from_str(&row.labels).map_err(|e| {
2081            nexus_core::NexusError::Storage(format!(
2082                "corrupted labels JSON for memory {}: {e}",
2083                row.id
2084            ))
2085        })?;
2086        let metadata: serde_json::Value = serde_json::from_str(&row.metadata).map_err(|e| {
2087            nexus_core::NexusError::Storage(format!(
2088                "corrupted metadata JSON for memory {}: {e}",
2089                row.id
2090            ))
2091        })?;
2092        let embedding: Option<Vec<f32>> = row
2093            .content_embedding
2094            .map(|e| {
2095                serde_json::from_str(&e).map_err(|err| {
2096                    nexus_core::NexusError::Storage(format!(
2097                        "corrupted embedding JSON for memory {}: {err}",
2098                        row.id
2099                    ))
2100                })
2101            })
2102            .transpose()?;
2103
2104        Ok(Memory {
2105            id: row.id,
2106            namespace_id: row.namespace_id,
2107            content: row.content,
2108            category: parse_category(&row.category)?,
2109            memory_lane_type: match &row.memory_lane_type {
2110                Some(s) => parse_memory_lane_type(s)?,
2111                None => None,
2112            },
2113            labels,
2114            metadata,
2115            similarity_score: row.similarity_score,
2116            relevance_score: row.relevance_score,
2117            content_embedding: embedding,
2118            embedding_model: row.embedding_model,
2119            created_at: row.created_at,
2120            updated_at: row.updated_at,
2121            last_accessed: row.last_accessed,
2122            is_active: row.is_active,
2123            is_archived: row.is_archived,
2124            access_count: row.access_count,
2125        })
2126    }
2127
2128    // ---- Observability queries ----
2129
2130    /// List memory jobs with optional filters.
2131    pub async fn list_jobs(
2132        &self,
2133        namespace_id: i64,
2134        job_type: Option<&str>,
2135        status: Option<&str>,
2136        limit: i64,
2137        offset: i64,
2138    ) -> Result<Vec<MemoryJobRow>> {
2139        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2140        if job_type.is_some() {
2141            where_clauses.push("job_type = ?".to_string());
2142        }
2143        if status.is_some() {
2144            where_clauses.push("status = ?".to_string());
2145        }
2146        let where_sql = where_clauses.join(" AND ");
2147
2148        let sql = format!(
2149            "SELECT * FROM memory_jobs WHERE {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
2150            where_sql
2151        );
2152
2153        let mut query = sqlx::query_as::<_, MemoryJobRow>(&sql).bind(namespace_id);
2154        if let Some(jt) = job_type {
2155            query = query.bind(jt);
2156        }
2157        if let Some(st) = status {
2158            query = query.bind(st);
2159        }
2160        query = query.bind(limit).bind(offset);
2161
2162        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2163        Ok(rows)
2164    }
2165
2166    /// Count memory jobs with optional filters.
2167    pub async fn count_jobs(
2168        &self,
2169        namespace_id: i64,
2170        job_type: Option<&str>,
2171        status: Option<&str>,
2172    ) -> Result<i64> {
2173        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2174        if job_type.is_some() {
2175            where_clauses.push("job_type = ?".to_string());
2176        }
2177        if status.is_some() {
2178            where_clauses.push("status = ?".to_string());
2179        }
2180        let where_sql = where_clauses.join(" AND ");
2181
2182        let sql = format!("SELECT COUNT(*) FROM memory_jobs WHERE {}", where_sql);
2183
2184        let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(namespace_id);
2185        if let Some(jt) = job_type {
2186            query = query.bind(jt);
2187        }
2188        if let Some(st) = status {
2189            query = query.bind(st);
2190        }
2191
2192        let count = query.fetch_one(&self.pool).await.map_err(db_error)?;
2193        Ok(count)
2194    }
2195
2196    /// Count memory jobs grouped by status for a namespace.
2197    pub async fn count_jobs_by_status(
2198        &self,
2199        namespace_id: i64,
2200        job_type: Option<&str>,
2201    ) -> Result<Vec<(String, i64)>> {
2202        let mut where_clauses = vec!["namespace_id = ?".to_string()];
2203        if job_type.is_some() {
2204            where_clauses.push("job_type = ?".to_string());
2205        }
2206        let where_sql = where_clauses.join(" AND ");
2207
2208        let sql = format!(
2209            "SELECT status, COUNT(*) as cnt FROM memory_jobs WHERE {} GROUP BY status",
2210            where_sql
2211        );
2212
2213        let mut query = sqlx::query_as::<_, (String, i64)>(&sql).bind(namespace_id);
2214        if let Some(jt) = job_type {
2215            query = query.bind(jt);
2216        }
2217
2218        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2219        Ok(rows)
2220    }
2221
2222    /// Delete completed jobs that were last updated before the given timestamp.
2223    ///
2224    /// Returns the number of rows removed.
2225    pub async fn purge_completed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2226        let result = sqlx::query(
2227            r#"
2228            DELETE FROM memory_jobs
2229            WHERE status = ? AND updated_at < ?
2230            "#,
2231        )
2232        .bind(memory_job_status::COMPLETED)
2233        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2234        .execute(&self.pool)
2235        .await
2236        .map_err(db_error)?;
2237
2238        Ok(result.rows_affected())
2239    }
2240
2241    /// Delete permanently failed jobs (attempts >= 5) that were last updated
2242    /// before the given timestamp.
2243    ///
2244    /// Returns the number of rows removed.
2245    pub async fn purge_permanently_failed_jobs(&self, older_than: DateTime<Utc>) -> Result<u64> {
2246        let result = sqlx::query(
2247            r#"
2248            DELETE FROM memory_jobs
2249            WHERE status = ? AND attempts >= ? AND updated_at < ?
2250            "#,
2251        )
2252        .bind(memory_job_status::FAILED)
2253        .bind(MAX_JOB_ATTEMPTS)
2254        .bind(older_than.format("%Y-%m-%d %H:%M:%S").to_string())
2255        .execute(&self.pool)
2256        .await
2257        .map_err(db_error)?;
2258
2259        Ok(result.rows_affected())
2260    }
2261
2262    /// List session digests with optional session_key filter.
2263    pub async fn list_digests(
2264        &self,
2265        namespace_id: i64,
2266        session_key: Option<&str>,
2267        limit: i64,
2268        offset: i64,
2269    ) -> Result<Vec<SessionDigestRow>> {
2270        let mut query = if let Some(sk) = session_key {
2271            sqlx::query_as::<_, SessionDigestRow>(
2272                "SELECT * FROM session_digests WHERE namespace_id = ? AND session_key = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2273            )
2274            .bind(namespace_id)
2275            .bind(sk)
2276        } else {
2277            sqlx::query_as::<_, SessionDigestRow>(
2278                "SELECT * FROM session_digests WHERE namespace_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?"
2279            )
2280            .bind(namespace_id)
2281        };
2282
2283        query = query.bind(limit).bind(offset);
2284        let rows = query.fetch_all(&self.pool).await.map_err(db_error)?;
2285        Ok(rows)
2286    }
2287
2288    /// Count session digests for a namespace, optionally filtered by session_key.
2289    pub async fn count_digests(&self, namespace_id: i64, session_key: Option<&str>) -> Result<i64> {
2290        let query = if let Some(sk) = session_key {
2291            sqlx::query_scalar(
2292                "SELECT COUNT(*) FROM session_digests WHERE namespace_id = ? AND session_key = ?",
2293            )
2294            .bind(namespace_id)
2295            .bind(sk)
2296        } else {
2297            sqlx::query_scalar("SELECT COUNT(*) FROM session_digests WHERE namespace_id = ?")
2298                .bind(namespace_id)
2299        };
2300
2301        let count: i64 = query.fetch_one(&self.pool).await.map_err(db_error)?;
2302        Ok(count)
2303    }
2304
2305    /// Count evidence edges for a namespace.
2306    pub async fn count_evidence(&self, namespace_id: i64) -> Result<i64> {
2307        let count: i64 = sqlx::query_scalar(
2308            "SELECT COUNT(*) FROM memory_evidence WHERE derived_memory_id IN (SELECT id FROM memories WHERE namespace_id = ?)"
2309        )
2310        .bind(namespace_id)
2311        .fetch_one(&self.pool)
2312        .await
2313        .map_err(db_error)?;
2314        Ok(count)
2315    }
2316
2317    /// Record a system metric sample.
2318    pub async fn record_metric(
2319        &self,
2320        metric_name: &str,
2321        metric_value: f64,
2322        labels: &serde_json::Value,
2323    ) -> Result<i64> {
2324        let labels_json = serde_json::to_string(labels)?;
2325        let id: i64 = sqlx::query_scalar(
2326            r#"
2327            INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2328            VALUES (?, ?, ?, ?)
2329            RETURNING id
2330            "#,
2331        )
2332        .bind(metric_name)
2333        .bind(metric_value)
2334        .bind(labels_json)
2335        .bind(Utc::now())
2336        .fetch_one(&self.pool)
2337        .await
2338        .map_err(db_error)?;
2339        Ok(id)
2340    }
2341
2342    /// Persist multiple metric samples in a single transaction.
2343    pub async fn record_metrics_batch(&self, samples: &[MetricSample]) -> Result<()> {
2344        if samples.is_empty() {
2345            return Ok(());
2346        }
2347
2348        let mut tx = self.pool.begin().await.map_err(db_error)?;
2349        for sample in samples {
2350            let labels_json = serde_json::to_string(&sample.labels)?;
2351            sqlx::query(
2352                r#"
2353                INSERT INTO system_metrics (metric_name, metric_value, labels, recorded_at)
2354                VALUES (?, ?, ?, ?)
2355                "#,
2356            )
2357            .bind(&sample.metric_name)
2358            .bind(sample.metric_value)
2359            .bind(labels_json)
2360            .bind(Utc::now())
2361            .execute(&mut *tx)
2362            .await
2363            .map_err(db_error)?;
2364        }
2365        tx.commit().await.map_err(db_error)?;
2366        Ok(())
2367    }
2368
2369    /// Fetch the newest metric samples for a namespace and optional prefix.
2370    pub async fn latest_metrics_for_namespace(
2371        &self,
2372        namespace_id: i64,
2373        metric_prefix: Option<&str>,
2374        limit: i64,
2375    ) -> Result<Vec<SystemMetricRow>> {
2376        let limit = limit.max(1);
2377        let rows = if let Some(prefix) = metric_prefix {
2378            sqlx::query_as::<_, SystemMetricRow>(
2379                r#"
2380                SELECT *
2381                FROM system_metrics
2382                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2383                  AND metric_name LIKE ?
2384                ORDER BY recorded_at DESC, id DESC
2385                LIMIT ?
2386                "#,
2387            )
2388            .bind(namespace_id)
2389            .bind(format!("{prefix}%"))
2390            .bind(limit)
2391            .fetch_all(&self.pool)
2392            .await
2393            .map_err(db_error)?
2394        } else {
2395            sqlx::query_as::<_, SystemMetricRow>(
2396                r#"
2397                SELECT *
2398                FROM system_metrics
2399                WHERE json_extract(COALESCE(labels, '{}'), '$.namespace_id') = ?
2400                ORDER BY recorded_at DESC, id DESC
2401                LIMIT ?
2402                "#,
2403            )
2404            .bind(namespace_id)
2405            .bind(limit)
2406            .fetch_all(&self.pool)
2407            .await
2408            .map_err(db_error)?
2409        };
2410        Ok(rows)
2411    }
2412
2413    /// Count active memories for a namespace at one cognitive level.
2414    pub async fn count_by_cognitive_level(
2415        &self,
2416        namespace_id: i64,
2417        level: CognitiveLevel,
2418    ) -> Result<i64> {
2419        let count: i64 = sqlx::query_scalar(
2420            r#"
2421            SELECT COUNT(*) FROM memories
2422            WHERE namespace_id = ?
2423              AND is_active = 1
2424              AND json_extract(COALESCE(metadata, '{}'), '$.cognitive.level') = ?
2425            "#,
2426        )
2427        .bind(namespace_id)
2428        .bind(level.as_str())
2429        .fetch_one(&self.pool)
2430        .await
2431        .map_err(db_error)?;
2432        Ok(count)
2433    }
2434}
2435
2436async fn insert_memory_tx(
2437    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2438    params: &StoreMemoryParams<'_>,
2439) -> Result<i64> {
2440    let labels_json = serde_json::to_string(params.labels)?;
2441    let metadata_json = serde_json::to_string(params.metadata)?;
2442    let embedding_json = params.embedding.map(serde_json::to_string).transpose()?;
2443
2444    let result = sqlx::query(
2445        r#"
2446        INSERT INTO memories (
2447            namespace_id, content, category, memory_lane_type, labels, metadata,
2448            content_embedding, embedding_model, created_at, is_active, access_count
2449        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0)
2450        "#,
2451    )
2452    .bind(params.namespace_id)
2453    .bind(params.content)
2454    .bind(params.category.to_string())
2455    .bind(params.memory_lane_type.map(|t| t.to_string()))
2456    .bind(&labels_json)
2457    .bind(&metadata_json)
2458    .bind(&embedding_json)
2459    .bind(params.embedding_model)
2460    .bind(Utc::now())
2461    .execute(&mut **tx)
2462    .await
2463    .map_err(db_error)?;
2464
2465    let inserted_id = result.last_insert_rowid();
2466    if inserted_id != 0 {
2467        return Ok(inserted_id);
2468    }
2469
2470    let row: Option<MemoryRow> = sqlx::query_as(
2471        "SELECT * FROM memories WHERE namespace_id = ? AND LOWER(TRIM(content)) = LOWER(TRIM(?)) AND is_active = 1 ORDER BY created_at DESC LIMIT 1",
2472    )
2473    .bind(params.namespace_id)
2474    .bind(params.content)
2475    .fetch_optional(&mut **tx)
2476    .await
2477    .map_err(db_error)?;
2478
2479    row.map(|memory| memory.id).ok_or_else(|| {
2480        nexus_core::NexusError::Storage(
2481            "Duplicate merged by trigger but matching row not found".to_string(),
2482        )
2483    })
2484}
2485
2486async fn insert_evidence_tx(
2487    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
2488    derived_memory_id: i64,
2489    source_memory_id: i64,
2490    evidence_role: &str,
2491) -> Result<()> {
2492    sqlx::query(
2493        r#"
2494        INSERT OR IGNORE INTO memory_evidence (derived_memory_id, source_memory_id, evidence_role, created_at)
2495        VALUES (?, ?, ?, datetime('now'))
2496        "#,
2497    )
2498    .bind(derived_memory_id)
2499    .bind(source_memory_id)
2500    .bind(evidence_role)
2501    .execute(&mut **tx)
2502    .await
2503    .map_err(db_error)?;
2504
2505    Ok(())
2506}
2507
2508fn new_claim_token(lease_owner: &str) -> String {
2509    let nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
2510    format!("{lease_owner}-{nanos}-{}", std::process::id())
2511}
2512
/// Append to `existing` each label from `incoming` that is not already present,
/// comparing ASCII case-insensitively.
///
/// The order is `existing` first (kept as-is, duplicates included), then new labels
/// in their `incoming` order. A label repeated within `incoming` is added only once,
/// with the casing of its first occurrence.
fn merge_labels(existing: &[String], incoming: &[String]) -> Vec<String> {
    incoming.iter().fold(existing.to_vec(), |mut acc, candidate| {
        let already_present = acc
            .iter()
            .any(|label| label.eq_ignore_ascii_case(candidate));
        if !already_present {
            acc.push(candidate.clone());
        }
        acc
    })
}
2525
2526fn merge_duplicate_metadata(
2527    existing: &serde_json::Value,
2528    incoming: &serde_json::Value,
2529) -> serde_json::Value {
2530    let mut merged = existing.clone();
2531
2532    if let Some(session_key) = incoming
2533        .pointer("/cognitive/session_key")
2534        .and_then(serde_json::Value::as_str)
2535    {
2536        let mut session_keys = existing
2537            .pointer("/cognitive/session_keys")
2538            .and_then(serde_json::Value::as_array)
2539            .cloned()
2540            .unwrap_or_default();
2541        if let Some(existing_key) = existing
2542            .pointer("/cognitive/session_key")
2543            .and_then(serde_json::Value::as_str)
2544        {
2545            push_unique_json_string(&mut session_keys, existing_key);
2546        }
2547        push_unique_json_string(&mut session_keys, session_key);
2548        ensure_object_path(&mut merged, "cognitive").insert(
2549            "session_key".to_string(),
2550            serde_json::Value::String(session_key.to_string()),
2551        );
2552        ensure_object_path(&mut merged, "cognitive").insert(
2553            "session_keys".to_string(),
2554            serde_json::Value::Array(session_keys),
2555        );
2556    }
2557
2558    if let Some(derived_session_key) = incoming
2559        .pointer("/source/derived_session_key")
2560        .and_then(serde_json::Value::as_str)
2561    {
2562        let mut derived_keys = existing
2563            .pointer("/source/derived_session_keys")
2564            .and_then(serde_json::Value::as_array)
2565            .cloned()
2566            .unwrap_or_default();
2567        if let Some(existing_key) = existing
2568            .pointer("/source/derived_session_key")
2569            .and_then(serde_json::Value::as_str)
2570        {
2571            push_unique_json_string(&mut derived_keys, existing_key);
2572        }
2573        push_unique_json_string(&mut derived_keys, derived_session_key);
2574        ensure_object_path(&mut merged, "source").insert(
2575            "derived_session_key".to_string(),
2576            serde_json::Value::String(derived_session_key.to_string()),
2577        );
2578        ensure_object_path(&mut merged, "source").insert(
2579            "derived_session_keys".to_string(),
2580            serde_json::Value::Array(derived_keys),
2581        );
2582    }
2583
2584    merged
2585}
2586
2587fn push_unique_json_string(values: &mut Vec<serde_json::Value>, candidate: &str) {
2588    if values
2589        .iter()
2590        .filter_map(serde_json::Value::as_str)
2591        .any(|current| current.eq_ignore_ascii_case(candidate))
2592    {
2593        return;
2594    }
2595    values.push(serde_json::Value::String(candidate.to_string()));
2596}
2597
2598fn ensure_object_path<'a>(
2599    root: &'a mut serde_json::Value,
2600    key: &str,
2601) -> &'a mut serde_json::Map<String, serde_json::Value> {
2602    if !root.is_object() {
2603        *root = serde_json::Value::Object(serde_json::Map::new());
2604    }
2605
2606    let object = root.as_object_mut().expect("root object ensured");
2607    let entry = object
2608        .entry(key.to_string())
2609        .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
2610    if !entry.is_object() {
2611        *entry = serde_json::Value::Object(serde_json::Map::new());
2612    }
2613
2614    entry.as_object_mut().expect("child object ensured")
2615}
2616
2617/// Repository for namespace operations
2618pub struct NamespaceRepository {
2619    pool: SqlitePool,
2620}
2621
2622impl NamespaceRepository {
2623    pub fn new(pool: SqlitePool) -> Self {
2624        Self { pool }
2625    }
2626
2627    /// Get or create a namespace
2628    pub async fn get_or_create(&self, name: &str, agent_type: &str) -> Result<AgentNamespace> {
2629        if let Some(ns) = self.get_by_name(name).await? {
2630            return Ok(ns);
2631        }
2632
2633        let result = sqlx::query(
2634            "INSERT INTO agent_namespaces (name, agent_type, created_at) VALUES (?, ?, ?)",
2635        )
2636        .bind(name)
2637        .bind(agent_type)
2638        .bind(Utc::now())
2639        .execute(&self.pool)
2640        .await
2641        .map_err(db_error)?;
2642
2643        let id = result.last_insert_rowid();
2644        Ok(AgentNamespace {
2645            id,
2646            name: name.to_string(),
2647            description: None,
2648            agent_type: agent_type.to_string(),
2649            created_at: Utc::now(),
2650            updated_at: None,
2651        })
2652    }
2653
2654    /// Get a namespace by name
2655    pub async fn get_by_name(&self, name: &str) -> Result<Option<AgentNamespace>> {
2656        let row: Option<AgentNamespaceRow> =
2657            sqlx::query_as("SELECT * FROM agent_namespaces WHERE name = ?")
2658                .bind(name)
2659                .fetch_optional(&self.pool)
2660                .await
2661                .map_err(db_error)?;
2662
2663        Ok(row.map(|r| AgentNamespace {
2664            id: r.id,
2665            name: r.name,
2666            description: r.description,
2667            agent_type: r.agent_type,
2668            created_at: r.created_at,
2669            updated_at: r.updated_at,
2670        }))
2671    }
2672
2673    /// Get a namespace by ID
2674    pub async fn get_by_id(&self, id: i64) -> Result<Option<AgentNamespace>> {
2675        let row: Option<AgentNamespaceRow> =
2676            sqlx::query_as("SELECT * FROM agent_namespaces WHERE id = ?")
2677                .bind(id)
2678                .fetch_optional(&self.pool)
2679                .await
2680                .map_err(db_error)?;
2681
2682        Ok(row.map(|r| AgentNamespace {
2683            id: r.id,
2684            name: r.name,
2685            description: r.description,
2686            agent_type: r.agent_type,
2687            created_at: r.created_at,
2688            updated_at: r.updated_at,
2689        }))
2690    }
2691
2692    /// List all namespaces
2693    pub async fn list_all(&self) -> Result<Vec<AgentNamespace>> {
2694        let rows: Vec<AgentNamespaceRow> =
2695            sqlx::query_as("SELECT * FROM agent_namespaces ORDER BY name")
2696                .fetch_all(&self.pool)
2697                .await
2698                .map_err(db_error)?;
2699
2700        Ok(rows
2701            .into_iter()
2702            .map(|r| AgentNamespace {
2703                id: r.id,
2704                name: r.name,
2705                description: r.description,
2706                agent_type: r.agent_type,
2707                created_at: r.created_at,
2708                updated_at: r.updated_at,
2709            })
2710            .collect())
2711    }
2712}
2713
2714/// Repository for processed file operations (inbox deduplication)
2715pub struct ProcessedFileRepository<'a> {
2716    pub pool: &'a SqlitePool,
2717}
2718
2719impl<'a> ProcessedFileRepository<'a> {
2720    pub fn new(pool: &'a SqlitePool) -> Self {
2721        Self { pool }
2722    }
2723
2724    /// Check if a file has been successfully processed
2725    pub async fn is_processed(&self, namespace_id: i64, path: &str) -> Result<bool> {
2726        let row: Option<(i64,)> =
2727            sqlx::query_as("SELECT id FROM processed_files WHERE namespace_id = ? AND path = ? AND status = 'completed'")
2728                .bind(namespace_id)
2729                .bind(path)
2730                .fetch_optional(self.pool)
2731                .await
2732                .map_err(db_error)?;
2733
2734        Ok(row.is_some())
2735    }
2736
2737    /// Get all completed file paths for a namespace (for batch dedup checks)
2738    pub async fn get_completed_paths(
2739        &self,
2740        namespace_id: i64,
2741    ) -> Result<std::collections::HashSet<String>> {
2742        let rows: Vec<(String,)> = sqlx::query_as(
2743            "SELECT path FROM processed_files WHERE namespace_id = ? AND status = 'completed'",
2744        )
2745        .bind(namespace_id)
2746        .fetch_all(self.pool)
2747        .await
2748        .map_err(db_error)?;
2749
2750        Ok(rows.into_iter().map(|r| r.0).collect())
2751    }
2752
2753    /// Mark a file as being processed
2754    pub async fn mark_processing(
2755        &self,
2756        namespace_id: i64,
2757        path: &str,
2758        content_hash: Option<&str>,
2759    ) -> Result<i64> {
2760        let id: i64 = sqlx::query_scalar(
2761            r#"
2762            INSERT INTO processed_files (namespace_id, path, content_hash, status, updated_at)
2763            VALUES (?, ?, ?, 'processing', datetime('now'))
2764            ON CONFLICT(namespace_id, path) DO UPDATE SET
2765                content_hash = excluded.content_hash,
2766                status = 'processing',
2767                updated_at = datetime('now')
2768            RETURNING id
2769            "#,
2770        )
2771        .bind(namespace_id)
2772        .bind(path)
2773        .bind(content_hash)
2774        .fetch_one(self.pool)
2775        .await
2776        .map_err(db_error)?;
2777
2778        Ok(id)
2779    }
2780
2781    /// Mark a file as successfully processed with memory reference
2782    pub async fn mark_processed(&self, id: i64, memory_id: i64) -> Result<()> {
2783        sqlx::query(
2784            r#"
2785            UPDATE processed_files
2786            SET status = 'completed', memory_id = ?, processed_at = datetime('now'), updated_at = datetime('now')
2787            WHERE id = ?
2788            "#
2789        )
2790        .bind(memory_id)
2791        .bind(id)
2792        .execute(self.pool)
2793        .await
2794        .map_err(db_error)?;
2795
2796        Ok(())
2797    }
2798
2799    /// Mark a file as failed
2800    pub async fn mark_failed(&self, id: i64, error: &str) -> Result<()> {
2801        sqlx::query(
2802            r#"
2803            UPDATE processed_files
2804            SET status = 'failed', last_error = ?, updated_at = datetime('now')
2805            WHERE id = ?
2806            "#,
2807        )
2808        .bind(error)
2809        .bind(id)
2810        .execute(self.pool)
2811        .await
2812        .map_err(db_error)?;
2813
2814        Ok(())
2815    }
2816
2817    /// Get files pending processing
2818    pub async fn get_pending(
2819        &self,
2820        namespace_id: i64,
2821        limit: i32,
2822    ) -> Result<Vec<ProcessedFileRow>> {
2823        let rows = sqlx::query_as::<_, ProcessedFileRow>(
2824            r#"
2825            SELECT * FROM processed_files
2826            WHERE namespace_id = ? AND status = 'pending'
2827            ORDER BY created_at ASC
2828            LIMIT ?
2829            "#,
2830        )
2831        .bind(namespace_id)
2832        .bind(limit)
2833        .fetch_all(self.pool)
2834        .await
2835        .map_err(db_error)?;
2836
2837        Ok(rows)
2838    }
2839
2840    /// Clear all processed files for a namespace
2841    pub async fn clear_namespace(&self, namespace_id: i64) -> Result<u64> {
2842        let result = sqlx::query("DELETE FROM processed_files WHERE namespace_id = ?")
2843            .bind(namespace_id)
2844            .execute(self.pool)
2845            .await
2846            .map_err(db_error)?;
2847
2848        Ok(result.rows_affected())
2849    }
2850}
2851
2852/// Repository for memory relationship operations
2853pub struct MemoryRelationRepository<'a> {
2854    pub pool: &'a SqlitePool,
2855}
2856
2857impl<'a> MemoryRelationRepository<'a> {
2858    pub fn new(pool: &'a SqlitePool) -> Self {
2859        Self { pool }
2860    }
2861
2862    /// Store a relationship between two memories
2863    pub async fn store(
2864        &self,
2865        source_id: i64,
2866        target_id: i64,
2867        relation_type: &str,
2868        strength: f32,
2869    ) -> Result<i64> {
2870        let id: i64 = sqlx::query_scalar(
2871            r#"
2872            INSERT INTO memory_relations (source_memory_id, target_memory_id, relation_type, strength, created_at)
2873            VALUES (?, ?, ?, ?, datetime('now'))
2874            ON CONFLICT(source_memory_id, target_memory_id, relation_type) DO UPDATE SET
2875                strength = excluded.strength,
2876                created_at = datetime('now')
2877            RETURNING id
2878            "#
2879        )
2880        .bind(source_id)
2881        .bind(target_id)
2882        .bind(relation_type)
2883        .bind(strength)
2884        .fetch_one(self.pool)
2885        .await
2886        .map_err(db_error)?;
2887
2888        Ok(id)
2889    }
2890
2891    /// Get all related memories for a given memory
2892    pub async fn get_related(&self, memory_id: i64) -> Result<Vec<(i64, String, f32)>> {
2893        let rows: Vec<(i64, String, f32)> = sqlx::query_as(
2894            r#"
2895            SELECT target_memory_id as memory_id, relation_type, strength
2896            FROM memory_relations
2897            WHERE source_memory_id = ?
2898            UNION
2899            SELECT source_memory_id as memory_id, relation_type, strength
2900            FROM memory_relations
2901            WHERE target_memory_id = ?
2902            ORDER BY strength DESC
2903            "#,
2904        )
2905        .bind(memory_id)
2906        .bind(memory_id)
2907        .fetch_all(self.pool)
2908        .await
2909        .map_err(db_error)?;
2910
2911        Ok(rows)
2912    }
2913
2914    /// Delete all relations for a memory
2915    pub async fn delete_for_memory(&self, memory_id: i64) -> Result<u64> {
2916        let result = sqlx::query(
2917            "DELETE FROM memory_relations WHERE source_memory_id = ? OR target_memory_id = ?",
2918        )
2919        .bind(memory_id)
2920        .bind(memory_id)
2921        .execute(self.pool)
2922        .await
2923        .map_err(db_error)?;
2924
2925        Ok(result.rows_affected())
2926    }
2927}
2928
2929fn parse_category(s: &str) -> Result<Category> {
2930    match MemoryCategory::parse(s) {
2931        Some(cat) => Ok(cat),
2932        None => Err(nexus_core::NexusError::Storage(format!(
2933            "Unknown memory category '{s}' persisted in database; row may be corrupted"
2934        ))),
2935    }
2936}
2937
2938fn parse_memory_lane_type(s: &str) -> Result<Option<MemoryLaneType>> {
2939    match MemoryLaneType::parse(s) {
2940        Some(t) => Ok(Some(t)),
2941        None => Err(nexus_core::NexusError::Storage(format!(
2942            "Unknown memory_lane_type '{s}' persisted in database; row may be corrupted"
2943        ))),
2944    }
2945}
2946
2947#[cfg(test)]
2948mod tests {
2949    use super::*;
2950    use nexus_core::MemoryLanePriorityType;
2951    use sqlx::sqlite::SqlitePoolOptions;
2952
2953    fn cognitive_metadata(
2954        level: CognitiveLevel,
2955        perspective: &PerspectiveKey,
2956        times_reinforced: i64,
2957        times_contradicted: i64,
2958    ) -> serde_json::Value {
2959        serde_json::json!({
2960            "cognitive": {
2961                "level": level.as_str(),
2962                "observer": perspective.observer,
2963                "subject": perspective.subject,
2964                "session_key": perspective.session_key,
2965                "source_memory_ids": [],
2966                "confidence": 0.9,
2967                "times_reinforced": times_reinforced,
2968                "times_contradicted": times_contradicted,
2969                "generated_by": "test",
2970            }
2971        })
2972    }
2973
2974    #[test]
2975    fn test_parse_category() {
2976        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
2977        assert!(matches!(
2978            parse_category("preferences"),
2979            Ok(Category::Preferences)
2980        ));
2981        assert!(parse_category("unknown").is_err());
2982    }
2983
2984    #[test]
2985    fn test_parse_memory_lane_type() {
2986        let correction = parse_memory_lane_type("correction");
2987        assert!(matches!(
2988            correction,
2989            Ok(Some(MemoryLaneType::Priority(
2990                MemoryLanePriorityType::Correction
2991            )))
2992        ));
2993
2994        let pattern_seed = parse_memory_lane_type("pattern_seed");
2995        assert!(matches!(
2996            pattern_seed,
2997            Ok(Some(MemoryLaneType::Priority(
2998                MemoryLanePriorityType::PatternSeed
2999            )))
3000        ));
3001
3002        assert!(parse_memory_lane_type("unknown").is_err());
3003    }
3004
3005    #[test]
3006    fn test_parse_category_all_variants() {
3007        assert!(matches!(parse_category("general"), Ok(Category::General)));
3008        assert!(matches!(parse_category("session"), Ok(Category::Session)));
3009        assert!(matches!(parse_category("context"), Ok(Category::Context)));
3010        assert!(matches!(
3011            parse_category("specifications"),
3012            Ok(Category::Specifications)
3013        ));
3014        assert!(matches!(parse_category("facts"), Ok(Category::Facts)));
3015        assert!(matches!(
3016            parse_category("preferences"),
3017            Ok(Category::Preferences)
3018        ));
3019        // Unknown values are rejected (fail-closed)
3020        assert!(parse_category("bogus").is_err());
3021        assert!(parse_category("").is_err());
3022    }
3023
3024    #[test]
3025    fn test_store_memory_params_fields() {
3026        // Verify StoreMemoryParams can be constructed with all fields
3027        let params = StoreMemoryParams {
3028            namespace_id: 1,
3029            content: "test content",
3030            category: &Category::General,
3031            memory_lane_type: None,
3032            labels: &[],
3033            metadata: &serde_json::Value::Null,
3034            embedding: None,
3035            embedding_model: None,
3036        };
3037        assert_eq!(params.namespace_id, 1);
3038        assert_eq!(params.content, "test content");
3039        assert!(params.labels.is_empty());
3040    }
3041
3042    #[test]
3043    fn test_merge_duplicate_metadata_preserves_multiple_session_keys() {
3044        let existing = serde_json::json!({
3045            "cognitive": {
3046                "session_key": "session-a"
3047            },
3048            "source": {
3049                "derived_session_key": "session-a"
3050            }
3051        });
3052        let incoming = serde_json::json!({
3053            "cognitive": {
3054                "session_key": "session-b"
3055            },
3056            "source": {
3057                "derived_session_key": "session-b"
3058            }
3059        });
3060
3061        let merged = merge_duplicate_metadata(&existing, &incoming);
3062        assert_eq!(merged["cognitive"]["session_key"], "session-b");
3063        assert_eq!(
3064            merged["cognitive"]["session_keys"],
3065            serde_json::json!(["session-a", "session-b"])
3066        );
3067        assert_eq!(
3068            merged["source"]["derived_session_keys"],
3069            serde_json::json!(["session-a", "session-b"])
3070        );
3071    }
3072
3073    // ---- Phase 2: Job, Digest, Evidence integration tests ----
3074
3075    async fn setup_test_db() -> SqlitePool {
3076        let pool = SqlitePoolOptions::new()
3077            .max_connections(1)
3078            .connect("sqlite::memory:")
3079            .await
3080            .unwrap();
3081        crate::migrations::run_migrations(&pool).await.unwrap();
3082        pool
3083    }
3084
3085    async fn create_namespace(pool: &SqlitePool, name: &str) -> i64 {
3086        let ns = NamespaceRepository::new(pool.clone());
3087        ns.get_or_create(name, "test").await.unwrap();
3088        ns.get_by_name(name).await.unwrap().unwrap().id
3089    }
3090
3091    // ---- Regression tests for audit findings ----
3092
    /// Audit finding #1: `get_by_content` must match `namespace_id + content`,
    /// not "latest active row". This prevents the duplicate-insert fallback
    /// in `store` from returning the wrong memory.
    ///
    /// Both memories share one namespace, so a "newest row" lookup would wrongly
    /// return `mem_b` for the first query.
    #[tokio::test]
    async fn test_get_by_content_matches_actual_content() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Store two different memories in the same namespace.
        let mem_a = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "first memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let mem_b = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "second memory content",
                category: &Category::General,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::Value::Null,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        assert_ne!(mem_a.id, mem_b.id);

        // get_by_content must return the memory with matching content,
        // NOT the newest one.
        let found_a = repo
            .get_by_content(ns_id, "first memory content")
            .await
            .unwrap();
        assert_eq!(found_a.id, mem_a.id);
        assert_eq!(found_a.content, "first memory content");

        let found_b = repo
            .get_by_content(ns_id, "second memory content")
            .await
            .unwrap();
        assert_eq!(found_b.id, mem_b.id);
        assert_eq!(found_b.content, "second memory content");

        // Non-existent content must error.
        let result = repo.get_by_content(ns_id, "nonexistent").await;
        assert!(result.is_err());
    }
3153
    /// Enqueued jobs are claimed highest `priority` first, one per `claim_jobs` call when the
    /// last argument is 1, and come back with status `"running"` and their original payload.
    #[tokio::test]
    async fn test_enqueue_and_claim_jobs() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Enqueue two jobs with different priorities.
        let id1 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 1}),
            })
            .await
            .unwrap();

        let id2 = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 50,
                perspective: None,
                payload: &serde_json::json!({"memory_id": 2}),
            })
            .await
            .unwrap();

        assert!(id1 > 0);
        assert!(id2 > 0);
        assert_ne!(id1, id2);

        // Claim one job — higher priority first.
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].row.id, id1); // Higher priority claimed first
        assert_eq!(claimed[0].row.status, "running");
        assert_eq!(claimed[0].payload["memory_id"], 1);

        // Second claim should get the lower-priority job.
        let claimed2 = repo
            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 1)
            .await
            .unwrap();

        assert_eq!(claimed2.len(), 1);
        assert_eq!(claimed2[0].row.id, id2);
    }
3207
    /// A claimed job that is completed is not handed out by a later `claim_jobs` call.
    ///
    /// NOTE(review): despite its name this test only exercises `complete_job`; the failure
    /// path is covered by `test_fail_job_requeues_before_limit`.
    #[tokio::test]
    async fn test_complete_and_fail_job() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "digest_session",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"session": "s1"}),
            })
            .await
            .unwrap();

        // Claim then complete.
        let claimed = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);

        repo.complete_job(&claimed[0]).await.unwrap();

        // Verify completed status by attempting to claim — should be empty.
        let claimed_again = repo
            .claim_jobs(ns_id, "digest_session", "w", 60, 10)
            .await
            .unwrap();
        assert!(claimed_again.is_empty());
    }
3241
    /// A job failed after its first attempt is requeued and can be claimed again, with
    /// `attempts` showing 2 on the second claim.
    #[tokio::test]
    async fn test_fail_job_requeues_before_limit() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let _id = repo
            .enqueue_job(EnqueueJobParams {
                namespace_id: ns_id,
                job_type: "derive_memory",
                priority: 100,
                perspective: None,
                payload: &serde_json::json!({"test": true}),
            })
            .await
            .unwrap();

        // Claim, fail, re-claim (should work).
        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        repo.fail_job(&claimed[0], "transient error").await.unwrap();

        let reclaimed = repo
            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
            .await
            .unwrap();
        assert_eq!(reclaimed.len(), 1);
        // The counter carries over from the failed attempt.
        assert_eq!(reclaimed[0].row.attempts, 2);
    }
3273
    /// Completing a job with a claim token other than the one issued by `claim_jobs` is
    /// rejected with a "lost lease ownership" error.
    #[tokio::test]
    async fn test_complete_job_requires_matching_claim_token() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        repo.enqueue_job(EnqueueJobParams {
            namespace_id: ns_id,
            job_type: "derive_memory",
            priority: 100,
            perspective: None,
            payload: &serde_json::json!({"memory_id": 7}),
        })
        .await
        .unwrap();

        let claimed = repo
            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
            .await
            .unwrap();
        // Same job row, but with a token the repository never handed out.
        let mut forged = claimed[0].clone();
        forged.row.claim_token = Some("forged-token".to_string());

        let error = repo.complete_job(&forged).await.unwrap_err();
        assert!(error.to_string().contains("lost lease ownership"));
    }
3300
    /// `store_digest` registers a digest memory for a session/kind, and both
    /// `latest_digest_for_session` and `latest_digest_for_namespace` return that memory.
    /// Registering the same session/kind again reuses the digest id and repoints it at the
    /// new memory.
    #[tokio::test]
    async fn test_store_digest_and_latest_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        // Store a memory that will serve as the digest content.
        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: digest_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 42,
            })
            .await
            .unwrap();

        assert!(digest_id > 0);

        // Retrieve latest digest.
        let result = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap();

        assert!(result.is_some());
        assert_eq!(result.as_ref().unwrap().id, digest_memory.id);

        // Re-register the same session/kind against a different memory.
        let replacement_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "session summary short updated",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let replacement_digest_id = repo
            .store_digest(StoreDigestParams {
                namespace_id: ns_id,
                session_key: "session-abc",
                digest_kind: "short",
                memory_id: replacement_memory.id,
                start_memory_id: Some(1),
                end_memory_id: Some(100),
                token_count: 64,
            })
            .await
            .unwrap();

        // The digest registration is updated in place rather than duplicated.
        assert_eq!(replacement_digest_id, digest_id);

        let updated = repo
            .latest_digest_for_session(ns_id, "session-abc", "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(updated.id, replacement_memory.id);

        let latest_for_namespace = repo
            .latest_digest_for_namespace(ns_id, "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_for_namespace.id, replacement_memory.id);
    }
3389
    /// `session_digest_rollover` reports how many session memories arrived after the latest
    /// digest's `end_memory_id`, plus a token estimate for them.
    ///
    /// The digest's own `summary_short` memory is stored after `source` in the same session,
    /// yet the "covered" check expects zero new memories, so it is not counted as new signal.
    #[tokio::test]
    async fn test_session_digest_rollover_reports_new_signal_since_last_digest() {
        let pool = setup_test_db().await;
        let ns_id = create_namespace(&pool, "test-agent").await;
        let repo = MemoryRepository::new(pool);

        let source = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Implemented bounded digest rollover policy.",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "explicit",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        // No digest yet: everything in the session counts as new.
        let first = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(first.last_digest_end_memory_id, None);
        assert_eq!(first.new_memory_count, 1);
        assert!(first.estimated_new_tokens > 0);

        let digest_memory = repo
            .store(StoreMemoryParams {
                namespace_id: ns_id,
                content: "Short digest",
                category: &Category::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &serde_json::json!({
                    "cognitive": {
                        "level": "summary_short",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "session-rollover"
                    }
                }),
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        repo.store_digest(StoreDigestParams {
            namespace_id: ns_id,
            session_key: "session-rollover",
            digest_kind: "short",
            memory_id: digest_memory.id,
            start_memory_id: Some(source.id),
            end_memory_id: Some(source.id),
            token_count: 16,
        })
        .await
        .unwrap();

        // The digest covers `source`, so nothing new remains.
        let covered = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(covered.last_digest_end_memory_id, Some(source.id));
        assert_eq!(covered.new_memory_count, 0);
        assert_eq!(covered.estimated_new_tokens, 0);

        repo.store(StoreMemoryParams {
            namespace_id: ns_id,
            content: "Added one more explicit memory after the digest coverage window.",
            category: &Category::Session,
            memory_lane_type: None,
            labels: &[],
            metadata: &serde_json::json!({
                "cognitive": {
                    "level": "explicit",
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": "session-rollover"
                }
            }),
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        // One explicit memory past the coverage window is reported as new signal.
        let second = repo
            .session_digest_rollover(ns_id, "session-rollover")
            .await
            .unwrap();
        assert_eq!(second.last_digest_end_memory_id, Some(source.id));
        assert_eq!(second.new_memory_count, 1);
        assert!(second.estimated_new_tokens > 0);
    }
3494
3495    #[tokio::test]
3496    async fn test_store_with_lineage() {
3497        let pool = setup_test_db().await;
3498        let ns_id = create_namespace(&pool, "test-agent").await;
3499        let repo = MemoryRepository::new(pool);
3500
3501        // Store two source memories.
3502        let source1 = repo
3503            .store(StoreMemoryParams {
3504                namespace_id: ns_id,
3505                content: "raw observation one",
3506                category: &Category::Facts,
3507                memory_lane_type: None,
3508                labels: &[],
3509                metadata: &serde_json::Value::Null,
3510                embedding: None,
3511                embedding_model: None,
3512            })
3513            .await
3514            .unwrap();
3515
3516        let source2 = repo
3517            .store(StoreMemoryParams {
3518                namespace_id: ns_id,
3519                content: "raw observation two",
3520                category: &Category::Facts,
3521                memory_lane_type: None,
3522                labels: &[],
3523                metadata: &serde_json::Value::Null,
3524                embedding: None,
3525                embedding_model: None,
3526            })
3527            .await
3528            .unwrap();
3529
3530        // Store derived with lineage.
3531        let derived = repo
3532            .store_with_lineage(StoreMemoryWithLineageParams {
3533                store: StoreMemoryParams {
3534                    namespace_id: ns_id,
3535                    content: "derived insight",
3536                    category: &Category::Facts,
3537                    memory_lane_type: None,
3538                    labels: &[],
3539                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
3540                    embedding: None,
3541                    embedding_model: None,
3542                },
3543                source_memory_ids: &[source1.id, source2.id],
3544                evidence_role: "derived_from",
3545            })
3546            .await
3547            .unwrap();
3548
3549        assert_eq!(derived.content, "derived insight");
3550
3551        // Load lineage for the derived memory.
3552        let lineage = repo.load_lineage(derived.id).await.unwrap();
3553        assert_eq!(lineage.len(), 2);
3554        assert!(lineage.iter().any(|e| e.source_memory_id == source1.id));
3555        assert!(lineage.iter().any(|e| e.source_memory_id == source2.id));
3556    }
3557
3558    #[tokio::test]
3559    async fn test_cognitive_queries_by_level_and_perspective() {
3560        let pool = setup_test_db().await;
3561        let ns_id = create_namespace(&pool, "test-agent").await;
3562        let repo = MemoryRepository::new(pool);
3563        let perspective =
3564            PerspectiveKey::new("claude-code", "claude-code", Some("session-1".into()));
3565
3566        let _raw = repo
3567            .store(StoreMemoryParams {
3568                namespace_id: ns_id,
3569                content: "raw note",
3570                category: &Category::Session,
3571                memory_lane_type: None,
3572                labels: &[],
3573                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
3574                embedding: None,
3575                embedding_model: None,
3576            })
3577            .await
3578            .unwrap();
3579
3580        let explicit = repo
3581            .store(StoreMemoryParams {
3582                namespace_id: ns_id,
3583                content: "explicit note",
3584                category: &Category::Session,
3585                memory_lane_type: None,
3586                labels: &[],
3587                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
3588                embedding: None,
3589                embedding_model: None,
3590            })
3591            .await
3592            .unwrap();
3593
3594        let derived = repo
3595            .store(StoreMemoryParams {
3596                namespace_id: ns_id,
3597                content: "reinforced insight",
3598                category: &Category::Facts,
3599                memory_lane_type: None,
3600                labels: &[],
3601                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 7, 0),
3602                embedding: None,
3603                embedding_model: None,
3604            })
3605            .await
3606            .unwrap();
3607
3608        let contradiction = repo
3609            .store(StoreMemoryParams {
3610                namespace_id: ns_id,
3611                content: "contradiction note",
3612                category: &Category::Facts,
3613                memory_lane_type: None,
3614                labels: &[],
3615                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 1, 5),
3616                embedding: None,
3617                embedding_model: None,
3618            })
3619            .await
3620            .unwrap();
3621
3622        let explicit_rows = repo
3623            .get_by_cognitive_level(ns_id, CognitiveLevel::Explicit, 10)
3624            .await
3625            .unwrap();
3626        assert_eq!(explicit_rows.len(), 1);
3627        assert_eq!(explicit_rows[0].id, explicit.id);
3628
3629        let recent = repo
3630            .get_recent_by_perspective(ns_id, &perspective, 10)
3631            .await
3632            .unwrap();
3633        assert_eq!(recent.len(), 4);
3634
3635        let reinforced = repo
3636            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
3637            .await
3638            .unwrap();
3639        assert_eq!(reinforced[0].id, derived.id);
3640        assert!(reinforced
3641            .iter()
3642            .all(|memory| memory.id != contradiction.id));
3643
3644        let contradictions = repo
3645            .get_contradictions_by_perspective(ns_id, &perspective, 10)
3646            .await
3647            .unwrap();
3648        assert_eq!(contradictions.len(), 1);
3649        assert_eq!(contradictions[0].id, contradiction.id);
3650    }
3651
3652    #[tokio::test]
3653    async fn test_store_distilled_summary_archives_sources_and_records_lineage() {
3654        let pool = setup_test_db().await;
3655        let ns_id = create_namespace(&pool, "test-agent").await;
3656        let repo = MemoryRepository::new(pool);
3657
3658        let source1 = repo
3659            .store(StoreMemoryParams {
3660                namespace_id: ns_id,
3661                content: "raw event 1",
3662                category: &Category::Session,
3663                memory_lane_type: None,
3664                labels: &["raw-activity".to_string()],
3665                metadata: &serde_json::json!({"raw_activity": true}),
3666                embedding: None,
3667                embedding_model: None,
3668            })
3669            .await
3670            .unwrap();
3671
3672        let source2 = repo
3673            .store(StoreMemoryParams {
3674                namespace_id: ns_id,
3675                content: "raw event 2",
3676                category: &Category::Session,
3677                memory_lane_type: None,
3678                labels: &["raw-activity".to_string()],
3679                metadata: &serde_json::json!({"raw_activity": true}),
3680                embedding: None,
3681                embedding_model: None,
3682            })
3683            .await
3684            .unwrap();
3685
3686        let summary = repo
3687            .store_distilled_summary(
3688                StoreMemoryParams {
3689                    namespace_id: ns_id,
3690                    content: "distilled summary",
3691                    category: &Category::Session,
3692                    memory_lane_type: None,
3693                    labels: &["activity-summary".to_string()],
3694                    metadata: &serde_json::json!({"pipeline": "distill-v1"}),
3695                    embedding: None,
3696                    embedding_model: None,
3697                },
3698                &[source1.id, source2.id],
3699            )
3700            .await
3701            .unwrap();
3702
3703        let source1_after = repo.get_by_id(source1.id).await.unwrap().unwrap();
3704        let source2_after = repo.get_by_id(source2.id).await.unwrap().unwrap();
3705        assert!(!source1_after.is_active);
3706        assert!(source1_after.is_archived);
3707        assert!(!source2_after.is_active);
3708        assert!(source2_after.is_archived);
3709
3710        let lineage = repo.load_lineage(summary.id).await.unwrap();
3711        assert_eq!(lineage.len(), 2);
3712        assert!(lineage.iter().all(|entry| entry.evidence_role == "source"));
3713    }
3714
3715    #[tokio::test]
3716    async fn test_load_lineage_empty() {
3717        let pool = setup_test_db().await;
3718        let _ns_id = create_namespace(&pool, "test-agent").await;
3719        let repo = MemoryRepository::new(pool);
3720
3721        let lineage = repo.load_lineage(9999).await.unwrap();
3722        assert!(lineage.is_empty());
3723    }
3724
3725    // ---- Raw-noise exclusion tests ----
3726
3727    #[tokio::test]
3728    async fn test_recent_perspective_excludes_raw_noise() {
3729        let pool = setup_test_db().await;
3730        let ns_id = create_namespace(&pool, "test-agent").await;
3731        let repo = MemoryRepository::new(pool);
3732        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3733
3734        // Store a clean memory.
3735        repo.store(StoreMemoryParams {
3736            namespace_id: ns_id,
3737            content: "clean observation",
3738            category: &Category::Facts,
3739            memory_lane_type: None,
3740            labels: &[],
3741            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
3742            embedding: None,
3743            embedding_model: None,
3744        })
3745        .await
3746        .unwrap();
3747
3748        // Store a raw-activity noise memory (both label and metadata markers).
3749        repo.store(StoreMemoryParams {
3750            namespace_id: ns_id,
3751            content: "raw noise payload",
3752            category: &Category::Session,
3753            memory_lane_type: None,
3754            labels: &["raw-activity".to_string()],
3755            metadata: &serde_json::json!({
3756                "raw_activity": true,
3757                "cognitive": {
3758                    "level": "raw",
3759                    "observer": perspective.observer,
3760                    "subject": perspective.subject,
3761                    "session_key": perspective.session_key,
3762                    "source_memory_ids": [],
3763                    "confidence": 0.5,
3764                    "times_reinforced": 0,
3765                    "times_contradicted": 0,
3766                    "generated_by": "test"
3767                }
3768            }),
3769            embedding: None,
3770            embedding_model: None,
3771        })
3772        .await
3773        .unwrap();
3774
3775        // Default query should exclude the noise.
3776        let recent = repo
3777            .get_recent_by_perspective(ns_id, &perspective, 10)
3778            .await
3779            .unwrap();
3780        assert_eq!(recent.len(), 1);
3781        assert_eq!(recent[0].content, "clean observation");
3782
3783        // With include_raw, both should appear.
3784        let recent_all = repo
3785            .get_recent_by_perspective_opts(ns_id, &perspective, 10, true)
3786            .await
3787            .unwrap();
3788        assert_eq!(recent_all.len(), 2);
3789    }
3790
3791    #[tokio::test]
3792    async fn test_semantic_candidates_respect_perspective_and_raw_noise_filtering() {
3793        let pool = setup_test_db().await;
3794        let ns_id = create_namespace(&pool, "test-agent").await;
3795        let repo = MemoryRepository::new(pool);
3796        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3797
3798        repo.store(StoreMemoryParams {
3799            namespace_id: ns_id,
3800            content: "clean semantic observation",
3801            category: &Category::Facts,
3802            memory_lane_type: None,
3803            labels: &[],
3804            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
3805            embedding: Some(&[0.1_f32; 384]),
3806            embedding_model: Some("mock"),
3807        })
3808        .await
3809        .unwrap();
3810
3811        repo.store(StoreMemoryParams {
3812            namespace_id: ns_id,
3813            content: "raw semantic noise",
3814            category: &Category::Session,
3815            memory_lane_type: None,
3816            labels: &["raw-activity".to_string()],
3817            metadata: &serde_json::json!({
3818                "raw_activity": true,
3819                "cognitive": {
3820                    "level": "raw",
3821                    "observer": "claude-code",
3822                    "subject": "claude-code",
3823                    "session_key": "s1",
3824                    "generated_by": "test"
3825                }
3826            }),
3827            embedding: Some(&[0.2_f32; 384]),
3828            embedding_model: Some("mock"),
3829        })
3830        .await
3831        .unwrap();
3832
3833        repo.store(StoreMemoryParams {
3834            namespace_id: ns_id,
3835            content: "other perspective semantic",
3836            category: &Category::Facts,
3837            memory_lane_type: None,
3838            labels: &[],
3839            metadata: &serde_json::json!({
3840                "cognitive": {
3841                    "level": "explicit",
3842                    "observer": "codex",
3843                    "subject": "codex",
3844                    "session_key": "s1",
3845                    "generated_by": "test"
3846                }
3847            }),
3848            embedding: Some(&[0.3_f32; 384]),
3849            embedding_model: Some("mock"),
3850        })
3851        .await
3852        .unwrap();
3853
3854        let candidates = repo
3855            .get_semantic_candidates(SemanticCandidateParams {
3856                namespace_id: ns_id,
3857                perspective: Some(&perspective),
3858                limit: 10,
3859                include_raw: false,
3860            })
3861            .await
3862            .unwrap();
3863
3864        assert_eq!(candidates.len(), 1);
3865        assert_eq!(candidates[0].content, "clean semantic observation");
3866    }
3867
3868    #[tokio::test]
3869    async fn test_semantic_candidates_match_session_keys_array() {
3870        let pool = setup_test_db().await;
3871        let ns_id = create_namespace(&pool, "test-agent").await;
3872        let repo = MemoryRepository::new(pool);
3873        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s-array".into()));
3874
3875        repo.store(StoreMemoryParams {
3876            namespace_id: ns_id,
3877            content: "session array semantic observation",
3878            category: &Category::Facts,
3879            memory_lane_type: None,
3880            labels: &[],
3881            metadata: &serde_json::json!({
3882                "cognitive": {
3883                    "level": "explicit",
3884                    "observer": "claude-code",
3885                    "subject": "claude-code",
3886                    "session_keys": ["s-array", "s-other"],
3887                    "generated_by": "test"
3888                }
3889            }),
3890            embedding: Some(&[0.4_f32; 384]),
3891            embedding_model: Some("mock"),
3892        })
3893        .await
3894        .unwrap();
3895
3896        let candidates = repo
3897            .get_semantic_candidates(SemanticCandidateParams {
3898                namespace_id: ns_id,
3899                perspective: Some(&perspective),
3900                limit: 10,
3901                include_raw: false,
3902            })
3903            .await
3904            .unwrap();
3905
3906        assert_eq!(candidates.len(), 1);
3907        assert_eq!(candidates[0].content, "session array semantic observation");
3908    }
3909
3910    #[tokio::test]
3911    async fn test_reinforced_perspective_excludes_raw_noise() {
3912        let pool = setup_test_db().await;
3913        let ns_id = create_namespace(&pool, "test-agent").await;
3914        let repo = MemoryRepository::new(pool);
3915        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3916
3917        repo.store(StoreMemoryParams {
3918            namespace_id: ns_id,
3919            content: "reinforced insight",
3920            category: &Category::Facts,
3921            memory_lane_type: None,
3922            labels: &[],
3923            metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 5, 0),
3924            embedding: None,
3925            embedding_model: None,
3926        })
3927        .await
3928        .unwrap();
3929
3930        repo.store(StoreMemoryParams {
3931            namespace_id: ns_id,
3932            content: "raw noise",
3933            category: &Category::Session,
3934            memory_lane_type: None,
3935            labels: &["raw-activity".to_string()],
3936            metadata: &serde_json::json!({
3937                "raw_activity": true,
3938                "cognitive": {
3939                    "level": "raw",
3940                    "observer": perspective.observer,
3941                    "subject": perspective.subject,
3942                    "session_key": perspective.session_key,
3943                    "source_memory_ids": [],
3944                    "confidence": 0.5,
3945                    "times_reinforced": 0,
3946                    "times_contradicted": 0,
3947                    "generated_by": "test"
3948                }
3949            }),
3950            embedding: None,
3951            embedding_model: None,
3952        })
3953        .await
3954        .unwrap();
3955
3956        let reinforced = repo
3957            .get_most_reinforced_by_perspective(ns_id, &perspective, 10)
3958            .await
3959            .unwrap();
3960        assert_eq!(reinforced.len(), 1);
3961        assert_eq!(reinforced[0].content, "reinforced insight");
3962    }
3963
3964    #[tokio::test]
3965    async fn test_contradictions_perspective_excludes_raw_noise() {
3966        let pool = setup_test_db().await;
3967        let ns_id = create_namespace(&pool, "test-agent").await;
3968        let repo = MemoryRepository::new(pool);
3969        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
3970
3971        repo.store(StoreMemoryParams {
3972            namespace_id: ns_id,
3973            content: "a real contradiction",
3974            category: &Category::Facts,
3975            memory_lane_type: None,
3976            labels: &[],
3977            metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 3),
3978            embedding: None,
3979            embedding_model: None,
3980        })
3981        .await
3982        .unwrap();
3983
3984        repo.store(StoreMemoryParams {
3985            namespace_id: ns_id,
3986            content: "raw noise",
3987            category: &Category::Session,
3988            memory_lane_type: None,
3989            labels: &["raw-activity".to_string()],
3990            metadata: &serde_json::json!({
3991                "raw_activity": true,
3992                "cognitive": {
3993                    "level": "raw",
3994                    "observer": perspective.observer,
3995                    "subject": perspective.subject,
3996                    "session_key": perspective.session_key,
3997                    "source_memory_ids": [],
3998                    "confidence": 0.5,
3999                    "times_reinforced": 0,
4000                    "times_contradicted": 0,
4001                    "generated_by": "test"
4002                }
4003            }),
4004            embedding: None,
4005            embedding_model: None,
4006        })
4007        .await
4008        .unwrap();
4009
4010        let contradictions = repo
4011            .get_contradictions_by_perspective(ns_id, &perspective, 10)
4012            .await
4013            .unwrap();
4014        assert_eq!(contradictions.len(), 1);
4015        assert_eq!(contradictions[0].content, "a real contradiction");
4016    }
4017
4018    // ---- search_working_set tests ----
4019
4020    #[tokio::test]
4021    async fn test_search_working_set_basic() {
4022        let pool = setup_test_db().await;
4023        let ns_id = create_namespace(&pool, "test-agent").await;
4024        let repo = MemoryRepository::new(pool);
4025        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4026
4027        // Store memories across different cognitive levels.
4028        let _raw = repo
4029            .store(StoreMemoryParams {
4030                namespace_id: ns_id,
4031                content: "raw note",
4032                category: &Category::Session,
4033                memory_lane_type: None,
4034                labels: &[],
4035                metadata: &cognitive_metadata(CognitiveLevel::Raw, &perspective, 0, 0),
4036                embedding: None,
4037                embedding_model: None,
4038            })
4039            .await
4040            .unwrap();
4041
4042        let explicit = repo
4043            .store(StoreMemoryParams {
4044                namespace_id: ns_id,
4045                content: "explicit fact",
4046                category: &Category::Facts,
4047                memory_lane_type: None,
4048                labels: &[],
4049                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 3, 0),
4050                embedding: None,
4051                embedding_model: None,
4052            })
4053            .await
4054            .unwrap();
4055
4056        let derived = repo
4057            .store(StoreMemoryParams {
4058                namespace_id: ns_id,
4059                content: "derived insight",
4060                category: &Category::Facts,
4061                memory_lane_type: None,
4062                labels: &[],
4063                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective, 8, 0),
4064                embedding: None,
4065                embedding_model: None,
4066            })
4067            .await
4068            .unwrap();
4069
4070        let contradiction = repo
4071            .store(StoreMemoryParams {
4072                namespace_id: ns_id,
4073                content: "contradiction",
4074                category: &Category::Facts,
4075                memory_lane_type: None,
4076                labels: &[],
4077                metadata: &cognitive_metadata(CognitiveLevel::Contradiction, &perspective, 0, 2),
4078                embedding: None,
4079                embedding_model: None,
4080            })
4081            .await
4082            .unwrap();
4083
4084        let result = repo
4085            .search_working_set(WorkingSetParams {
4086                namespace_id: ns_id,
4087                perspective: Some(&perspective),
4088                max_items: 20,
4089                include_raw: false,
4090            })
4091            .await
4092            .unwrap();
4093
4094        // Should contain all non-noise memories. Order: reinforced first, then recent,
4095        // then contradictions.
4096        assert!(result.len() >= 3);
4097        let ids: Vec<i64> = result.iter().map(|m| m.id).collect();
4098        assert!(ids.contains(&explicit.id));
4099        assert!(ids.contains(&derived.id));
4100        assert!(ids.contains(&contradiction.id));
4101    }
4102
4103    #[tokio::test]
4104    async fn test_search_working_set_dedupes() {
4105        let pool = setup_test_db().await;
4106        let ns_id = create_namespace(&pool, "test-agent").await;
4107        let repo = MemoryRepository::new(pool);
4108        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4109
4110        // A memory that is both reinforced and recent should appear only once.
4111        let shared = repo
4112            .store(StoreMemoryParams {
4113                namespace_id: ns_id,
4114                content: "shared memory",
4115                category: &Category::Facts,
4116                memory_lane_type: None,
4117                labels: &[],
4118                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 10, 0),
4119                embedding: None,
4120                embedding_model: None,
4121            })
4122            .await
4123            .unwrap();
4124
4125        let result = repo
4126            .search_working_set(WorkingSetParams {
4127                namespace_id: ns_id,
4128                perspective: Some(&perspective),
4129                max_items: 20,
4130                include_raw: false,
4131            })
4132            .await
4133            .unwrap();
4134
4135        let count = result.iter().filter(|m| m.id == shared.id).count();
4136        assert_eq!(count, 1, "shared memory should appear exactly once");
4137    }
4138
4139    #[tokio::test]
4140    async fn test_search_working_set_respects_max_items() {
4141        let pool = setup_test_db().await;
4142        let ns_id = create_namespace(&pool, "test-agent").await;
4143        let repo = MemoryRepository::new(pool);
4144        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4145
4146        for i in 0..10 {
4147            let content = format!("memory {}", i);
4148            repo.store(StoreMemoryParams {
4149                namespace_id: ns_id,
4150                content: &content,
4151                category: &Category::Facts,
4152                memory_lane_type: None,
4153                labels: &[],
4154                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, i as i64, 0),
4155                embedding: None,
4156                embedding_model: None,
4157            })
4158            .await
4159            .unwrap();
4160        }
4161
4162        let result = repo
4163            .search_working_set(WorkingSetParams {
4164                namespace_id: ns_id,
4165                perspective: Some(&perspective),
4166                max_items: 3,
4167                include_raw: false,
4168            })
4169            .await
4170            .unwrap();
4171
4172        assert_eq!(result.len(), 3);
4173    }
4174
4175    #[tokio::test]
4176    async fn test_search_working_set_excludes_raw_noise() {
4177        let pool = setup_test_db().await;
4178        let ns_id = create_namespace(&pool, "test-agent").await;
4179        let repo = MemoryRepository::new(pool);
4180        let perspective = PerspectiveKey::new("claude-code", "claude-code", Some("s1".into()));
4181
4182        repo.store(StoreMemoryParams {
4183            namespace_id: ns_id,
4184            content: "real observation",
4185            category: &Category::Facts,
4186            memory_lane_type: None,
4187            labels: &[],
4188            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 1, 0),
4189            embedding: None,
4190            embedding_model: None,
4191        })
4192        .await
4193        .unwrap();
4194
4195        repo.store(StoreMemoryParams {
4196            namespace_id: ns_id,
4197            content: "raw noise",
4198            category: &Category::Session,
4199            memory_lane_type: None,
4200            labels: &["raw-activity".to_string()],
4201            metadata: &serde_json::json!({"raw_activity": true, "cognitive": {"level": "raw"}}),
4202            embedding: None,
4203            embedding_model: None,
4204        })
4205        .await
4206        .unwrap();
4207
4208        let result = repo
4209            .search_working_set(WorkingSetParams {
4210                namespace_id: ns_id,
4211                perspective: Some(&perspective),
4212                max_items: 20,
4213                include_raw: false,
4214            })
4215            .await
4216            .unwrap();
4217
4218        assert!(result.iter().all(|m| m.content != "raw noise"));
4219        assert!(result.iter().any(|m| m.content == "real observation"));
4220    }
4221
4222    #[tokio::test]
4223    async fn test_search_working_set_without_perspective() {
4224        let pool = setup_test_db().await;
4225        let ns_id = create_namespace(&pool, "test-agent").await;
4226        let repo = MemoryRepository::new(pool);
4227
4228        repo.store(StoreMemoryParams {
4229            namespace_id: ns_id,
4230            content: "namespace memory one",
4231            category: &Category::Facts,
4232            memory_lane_type: None,
4233            labels: &[],
4234            metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4235            embedding: None,
4236            embedding_model: None,
4237        })
4238        .await
4239        .unwrap();
4240
4241        repo.store(StoreMemoryParams {
4242            namespace_id: ns_id,
4243            content: "namespace memory two",
4244            category: &Category::Facts,
4245            memory_lane_type: None,
4246            labels: &[],
4247            metadata: &serde_json::json!({"cognitive": {"level": "explicit"}}),
4248            embedding: None,
4249            embedding_model: None,
4250        })
4251        .await
4252        .unwrap();
4253
4254        let result = repo
4255            .search_working_set(WorkingSetParams {
4256                namespace_id: ns_id,
4257                perspective: None,
4258                max_items: 20,
4259                include_raw: false,
4260            })
4261            .await
4262            .unwrap();
4263
4264        assert!(result.len() >= 2);
4265    }
4266
4267    #[tokio::test]
4268    async fn test_list_by_session_key_matches_session_keys_array() {
4269        let pool = setup_test_db().await;
4270        let ns_id = create_namespace(&pool, "test-agent").await;
4271        let repo = MemoryRepository::new(pool);
4272
4273        repo.store(StoreMemoryParams {
4274            namespace_id: ns_id,
4275            content: "shared explicit memory",
4276            category: &Category::Facts,
4277            memory_lane_type: None,
4278            labels: &[],
4279            metadata: &serde_json::json!({
4280                "cognitive": {
4281                    "level": "explicit",
4282                    "session_key": "session-b",
4283                    "session_keys": ["session-a", "session-b"]
4284                }
4285            }),
4286            embedding: None,
4287            embedding_model: None,
4288        })
4289        .await
4290        .unwrap();
4291
4292        let session_a = repo
4293            .list_by_session_key(ns_id, "session-a", 10, false)
4294            .await
4295            .unwrap();
4296        let session_b = repo
4297            .list_by_session_key(ns_id, "session-b", 10, false)
4298            .await
4299            .unwrap();
4300
4301        assert_eq!(session_a.len(), 1);
4302        assert_eq!(session_b.len(), 1);
4303    }
4304
4305    #[tokio::test]
4306    async fn test_count_evidence_returns_zero_for_empty_namespace() {
4307        let pool = setup_test_db().await;
4308        let ns_id = create_namespace(&pool, "test-agent").await;
4309        let repo = MemoryRepository::new(pool);
4310
4311        let count = repo.count_evidence(ns_id).await.unwrap();
4312        assert_eq!(count, 0);
4313    }
4314
4315    #[tokio::test]
4316    async fn test_count_evidence_counts_lineage_edges() {
4317        let pool = setup_test_db().await;
4318        let ns_id = create_namespace(&pool, "test-agent").await;
4319        let repo = MemoryRepository::new(pool);
4320
4321        let source = repo
4322            .store(StoreMemoryParams {
4323                namespace_id: ns_id,
4324                content: "source memory",
4325                category: &Category::Session,
4326                memory_lane_type: None,
4327                labels: &[],
4328                metadata: &serde_json::json!({}),
4329                embedding: None,
4330                embedding_model: None,
4331            })
4332            .await
4333            .unwrap();
4334
4335        let _derived = repo
4336            .store_with_lineage(StoreMemoryWithLineageParams {
4337                store: StoreMemoryParams {
4338                    namespace_id: ns_id,
4339                    content: "derived with evidence",
4340                    category: &Category::Facts,
4341                    memory_lane_type: None,
4342                    labels: &[],
4343                    metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
4344                    embedding: None,
4345                    embedding_model: None,
4346                },
4347                source_memory_ids: &[source.id],
4348                evidence_role: "source",
4349            })
4350            .await
4351            .unwrap();
4352
4353        let count = repo.count_evidence(ns_id).await.unwrap();
4354        assert_eq!(count, 1);
4355    }
4356
4357    #[tokio::test]
4358    async fn test_count_evidence_does_not_count_other_namespace() {
4359        let pool = setup_test_db().await;
4360        let ns_a = create_namespace(&pool, "agent-a").await;
4361        let ns_b = create_namespace(&pool, "agent-b").await;
4362        let repo = MemoryRepository::new(pool);
4363
4364        let source = repo
4365            .store(StoreMemoryParams {
4366                namespace_id: ns_a,
4367                content: "source in ns-a",
4368                category: &Category::Session,
4369                memory_lane_type: None,
4370                labels: &[],
4371                metadata: &serde_json::json!({}),
4372                embedding: None,
4373                embedding_model: None,
4374            })
4375            .await
4376            .unwrap();
4377
4378        let _derived = repo
4379            .store_with_lineage(StoreMemoryWithLineageParams {
4380                store: StoreMemoryParams {
4381                    namespace_id: ns_a,
4382                    content: "derived in ns-a",
4383                    category: &Category::Facts,
4384                    memory_lane_type: None,
4385                    labels: &[],
4386                    metadata: &serde_json::json!({}),
4387                    embedding: None,
4388                    embedding_model: None,
4389                },
4390                source_memory_ids: &[source.id],
4391                evidence_role: "source",
4392            })
4393            .await
4394            .unwrap();
4395
4396        assert_eq!(repo.count_evidence(ns_a).await.unwrap(), 1);
4397        assert_eq!(repo.count_evidence(ns_b).await.unwrap(), 0);
4398    }
4399
4400    #[tokio::test]
4401    async fn test_count_by_cognitive_level_returns_matching_total() {
4402        let pool = setup_test_db().await;
4403        let ns_id = create_namespace(&pool, "level-counts").await;
4404        let repo = MemoryRepository::new(pool);
4405
4406        for (content, level) in [
4407            ("raw event", CognitiveLevel::Raw),
4408            ("derived insight", CognitiveLevel::Derived),
4409            ("derived insight 2", CognitiveLevel::Derived),
4410            ("contradiction note", CognitiveLevel::Contradiction),
4411        ] {
4412            repo.store(StoreMemoryParams {
4413                namespace_id: ns_id,
4414                content,
4415                category: &Category::Session,
4416                memory_lane_type: None,
4417                labels: &[],
4418                metadata: &serde_json::json!({
4419                    "cognitive": {
4420                        "level": level.as_str(),
4421                        "observer": "claude-code",
4422                        "subject": "claude-code",
4423                        "generated_by": "test"
4424                    }
4425                }),
4426                embedding: None,
4427                embedding_model: None,
4428            })
4429            .await
4430            .unwrap();
4431        }
4432
4433        assert_eq!(
4434            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Derived)
4435                .await
4436                .unwrap(),
4437            2
4438        );
4439        assert_eq!(
4440            repo.count_by_cognitive_level(ns_id, CognitiveLevel::Contradiction)
4441                .await
4442                .unwrap(),
4443            1
4444        );
4445    }
4446
4447    /// Regression test: `get_by_cognitive_level_with_perspective` must apply
4448    /// perspective filtering in SQL BEFORE the LIMIT, so callers receive up to
4449    /// `limit` matching results instead of silently getting fewer.
4450    #[tokio::test]
4451    async fn test_get_by_cognitive_level_with_perspective_filters_before_limit() {
4452        let pool = setup_test_db().await;
4453        let ns_id = create_namespace(&pool, "perspective-limit").await;
4454        let repo = MemoryRepository::new(pool);
4455
4456        let perspective_a = PerspectiveKey::new("alice", "project-x", None);
4457        let perspective_b = PerspectiveKey::new("bob", "project-y", None);
4458
4459        // Insert 5 memories for alice + project-x at Explicit level.
4460        for i in 0..5 {
4461            repo.store(StoreMemoryParams {
4462                namespace_id: ns_id,
4463                content: &format!("alice memory {}", i),
4464                category: &Category::Facts,
4465                memory_lane_type: None,
4466                labels: &[],
4467                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_a, 0, 0),
4468                embedding: None,
4469                embedding_model: None,
4470            })
4471            .await
4472            .unwrap();
4473        }
4474
4475        // Insert 5 memories for bob + project-y at Explicit level.
4476        for i in 0..5 {
4477            repo.store(StoreMemoryParams {
4478                namespace_id: ns_id,
4479                content: &format!("bob memory {}", i),
4480                category: &Category::Facts,
4481                memory_lane_type: None,
4482                labels: &[],
4483                metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective_b, 0, 0),
4484                embedding: None,
4485                embedding_model: None,
4486            })
4487            .await
4488            .unwrap();
4489        }
4490
4491        // Request 3 results for alice's perspective; should get exactly 3, not fewer.
4492        let alice_results = repo
4493            .get_by_cognitive_level_with_perspective(
4494                ns_id,
4495                CognitiveLevel::Explicit,
4496                &perspective_a,
4497                3,
4498            )
4499            .await
4500            .unwrap();
4501        assert_eq!(alice_results.len(), 3);
4502        assert!(alice_results.iter().all(|m| {
4503            let meta = &m.metadata;
4504            let obs = meta
4505                .get("cognitive")
4506                .and_then(|c| c.get("observer"))
4507                .and_then(|v| v.as_str());
4508            let sub = meta
4509                .get("cognitive")
4510                .and_then(|c| c.get("subject"))
4511                .and_then(|v| v.as_str());
4512            obs == Some("alice") && sub == Some("project-x")
4513        }));
4514
4515        // Request 10 results for alice; there are only 5, so capped at 5.
4516        let alice_many = repo
4517            .get_by_cognitive_level_with_perspective(
4518                ns_id,
4519                CognitiveLevel::Explicit,
4520                &perspective_a,
4521                10,
4522            )
4523            .await
4524            .unwrap();
4525        assert_eq!(alice_many.len(), 5);
4526
4527        // Bob gets a separate set.
4528        let bob_results = repo
4529            .get_by_cognitive_level_with_perspective(
4530                ns_id,
4531                CognitiveLevel::Explicit,
4532                &perspective_b,
4533                3,
4534            )
4535            .await
4536            .unwrap();
4537        assert_eq!(bob_results.len(), 3);
4538        assert!(bob_results.iter().all(|m| {
4539            let meta = &m.metadata;
4540            let obs = meta
4541                .get("cognitive")
4542                .and_then(|c| c.get("observer"))
4543                .and_then(|v| v.as_str());
4544            let sub = meta
4545                .get("cognitive")
4546                .and_then(|c| c.get("subject"))
4547                .and_then(|v| v.as_str());
4548            obs == Some("bob") && sub == Some("project-y")
4549        }));
4550    }
4551
4552    /// Verifies that the scalar session_key field is respected in the SQL filter.
4553    #[tokio::test]
4554    async fn test_get_by_cognitive_level_with_perspective_respects_session_key() {
4555        let pool = setup_test_db().await;
4556        let ns_id = create_namespace(&pool, "session-key-scalar").await;
4557        let repo = MemoryRepository::new(pool);
4558
4559        let perspective_s1 =
4560            PerspectiveKey::new("alice", "project-x", Some("session-1".to_string()));
4561        let perspective_s2 =
4562            PerspectiveKey::new("alice", "project-x", Some("session-2".to_string()));
4563
4564        for i in 0..3 {
4565            repo.store(StoreMemoryParams {
4566                namespace_id: ns_id,
4567                content: &format!("s1 memory {}", i),
4568                category: &Category::Facts,
4569                memory_lane_type: None,
4570                labels: &[],
4571                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s1, 0, 0),
4572                embedding: None,
4573                embedding_model: None,
4574            })
4575            .await
4576            .unwrap();
4577        }
4578        for i in 0..3 {
4579            repo.store(StoreMemoryParams {
4580                namespace_id: ns_id,
4581                content: &format!("s2 memory {}", i),
4582                category: &Category::Facts,
4583                memory_lane_type: None,
4584                labels: &[],
4585                metadata: &cognitive_metadata(CognitiveLevel::Derived, &perspective_s2, 0, 0),
4586                embedding: None,
4587                embedding_model: None,
4588            })
4589            .await
4590            .unwrap();
4591        }
4592
4593        let s1_results = repo
4594            .get_by_cognitive_level_with_perspective(
4595                ns_id,
4596                CognitiveLevel::Derived,
4597                &perspective_s1,
4598                10,
4599            )
4600            .await
4601            .unwrap();
4602        assert_eq!(s1_results.len(), 3);
4603        assert!(s1_results.iter().all(|m| m.content.starts_with("s1")));
4604
4605        let s2_results = repo
4606            .get_by_cognitive_level_with_perspective(
4607                ns_id,
4608                CognitiveLevel::Derived,
4609                &perspective_s2,
4610                10,
4611            )
4612            .await
4613            .unwrap();
4614        assert_eq!(s2_results.len(), 3);
4615        assert!(s2_results.iter().all(|m| m.content.starts_with("s2")));
4616    }
4617
4618    /// Verifies that memories matching only the session_keys array are included.
4619    #[tokio::test]
4620    async fn test_get_by_cognitive_level_with_perspective_matches_session_keys_array() {
4621        let pool = setup_test_db().await;
4622        let ns_id = create_namespace(&pool, "session-keys-array").await;
4623        let repo = MemoryRepository::new(pool);
4624
4625        let perspective = PerspectiveKey::new("alice", "project-x", Some("session-a".to_string()));
4626
4627        // Memory with session_key set to the same key as the perspective.
4628        repo.store(StoreMemoryParams {
4629            namespace_id: ns_id,
4630            content: "scalar match",
4631            category: &Category::Facts,
4632            memory_lane_type: None,
4633            labels: &[],
4634            metadata: &cognitive_metadata(CognitiveLevel::Explicit, &perspective, 0, 0),
4635            embedding: None,
4636            embedding_model: None,
4637        })
4638        .await
4639        .unwrap();
4640
4641        // Memory with session_keys array containing the key (but different scalar session_key).
4642        repo.store(StoreMemoryParams {
4643            namespace_id: ns_id,
4644            content: "array match",
4645            category: &Category::Facts,
4646            memory_lane_type: None,
4647            labels: &[],
4648            metadata: &serde_json::json!({
4649                "cognitive": {
4650                    "level": "explicit",
4651                    "observer": "alice",
4652                    "subject": "project-x",
4653                    "session_key": "session-other",
4654                    "session_keys": ["session-a", "session-b"],
4655                    "generated_by": "test"
4656                }
4657            }),
4658            embedding: None,
4659            embedding_model: None,
4660        })
4661        .await
4662        .unwrap();
4663
4664        // Memory that does not match at all.
4665        repo.store(StoreMemoryParams {
4666            namespace_id: ns_id,
4667            content: "no match",
4668            category: &Category::Facts,
4669            memory_lane_type: None,
4670            labels: &[],
4671            metadata: &serde_json::json!({
4672                "cognitive": {
4673                    "level": "explicit",
4674                    "observer": "alice",
4675                    "subject": "project-x",
4676                    "session_key": "session-other",
4677                    "session_keys": ["session-z"],
4678                    "generated_by": "test"
4679                }
4680            }),
4681            embedding: None,
4682            embedding_model: None,
4683        })
4684        .await
4685        .unwrap();
4686
4687        let results = repo
4688            .get_by_cognitive_level_with_perspective(
4689                ns_id,
4690                CognitiveLevel::Explicit,
4691                &perspective,
4692                10,
4693            )
4694            .await
4695            .unwrap();
4696        assert_eq!(results.len(), 2);
4697        let contents: Vec<_> = results.iter().map(|m| m.content.as_str()).collect();
4698        assert!(contents.contains(&"scalar match"));
4699        assert!(contents.contains(&"array match"));
4700    }
4701
4702    #[tokio::test]
4703    async fn test_record_metric_and_latest_metrics_for_namespace() {
4704        let pool = setup_test_db().await;
4705        let ns_id = create_namespace(&pool, "metric-ns").await;
4706        let other_ns = create_namespace(&pool, "metric-other").await;
4707        let repo = MemoryRepository::new(pool);
4708
4709        repo.record_metric(
4710            "cognition.query.total_ms",
4711            12.5,
4712            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4713        )
4714        .await
4715        .unwrap();
4716        repo.record_metric(
4717            "cognition.query.total_ms",
4718            18.0,
4719            &serde_json::json!({"namespace_id": other_ns, "stage": "total", "unit": "ms"}),
4720        )
4721        .await
4722        .unwrap();
4723        repo.record_metric(
4724            "cognition.representation.total_ms",
4725            4.0,
4726            &serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4727        )
4728        .await
4729        .unwrap();
4730
4731        let metrics = repo
4732            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4733            .await
4734            .unwrap();
4735
4736        assert_eq!(metrics.len(), 2);
4737        assert!(metrics
4738            .iter()
4739            .all(|metric| metric.labels.contains(&ns_id.to_string())));
4740        assert!(metrics
4741            .iter()
4742            .any(|metric| metric.metric_name == "cognition.query.total_ms"));
4743        assert!(metrics
4744            .iter()
4745            .any(|metric| metric.metric_name == "cognition.representation.total_ms"));
4746        assert!(metrics
4747            .iter()
4748            .all(|metric| { metric.metric_name.starts_with("cognition.") }));
4749    }
4750
4751    #[tokio::test]
4752    async fn test_record_metrics_batch_persists_all_samples() {
4753        let pool = setup_test_db().await;
4754        let ns_id = create_namespace(&pool, "metric-batch").await;
4755        let repo = MemoryRepository::new(pool);
4756
4757        repo.record_metrics_batch(&[
4758            MetricSample {
4759                metric_name: "cognition.query.total_ms".to_string(),
4760                metric_value: 9.5,
4761                labels: serde_json::json!({"namespace_id": ns_id, "stage": "total", "unit": "ms"}),
4762            },
4763            MetricSample {
4764                metric_name: "cognition.query.answer.total_tokens".to_string(),
4765                metric_value: 128.0,
4766                labels: serde_json::json!({"namespace_id": ns_id, "stage": "answer", "unit": "tokens"}),
4767            },
4768        ])
4769        .await
4770        .unwrap();
4771
4772        let metrics = repo
4773            .latest_metrics_for_namespace(ns_id, Some("cognition."), 10)
4774            .await
4775            .unwrap();
4776
4777        assert_eq!(metrics.len(), 2);
4778    }
4779
4780    // ---- Observability query tests ----
4781
4782    #[tokio::test]
4783    async fn test_list_jobs_returns_enqueued_jobs() {
4784        let pool = setup_test_db().await;
4785        let ns_id = create_namespace(&pool, "obs-jobs").await;
4786        let repo = MemoryRepository::new(pool);
4787
4788        repo.enqueue_job(EnqueueJobParams {
4789            namespace_id: ns_id,
4790            job_type: "derive",
4791            priority: 10,
4792            perspective: None,
4793            payload: &serde_json::json!({"a": 1}),
4794        })
4795        .await
4796        .unwrap();
4797
4798        repo.enqueue_job(EnqueueJobParams {
4799            namespace_id: ns_id,
4800            job_type: "digest",
4801            priority: 5,
4802            perspective: None,
4803            payload: &serde_json::json!({"b": 2}),
4804        })
4805        .await
4806        .unwrap();
4807
4808        // List all jobs.
4809        let all = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
4810        assert_eq!(all.len(), 2);
4811
4812        // Filter by job_type.
4813        let derive_only = repo
4814            .list_jobs(ns_id, Some("derive"), None, 50, 0)
4815            .await
4816            .unwrap();
4817        assert_eq!(derive_only.len(), 1);
4818        assert_eq!(derive_only[0].job_type, "derive");
4819
4820        // Filter by status (both pending).
4821        let pending = repo
4822            .list_jobs(ns_id, None, Some("pending"), 50, 0)
4823            .await
4824            .unwrap();
4825        assert_eq!(pending.len(), 2);
4826
4827        // Combined filter.
4828        let digest_pending = repo
4829            .list_jobs(ns_id, Some("digest"), Some("pending"), 50, 0)
4830            .await
4831            .unwrap();
4832        assert_eq!(digest_pending.len(), 1);
4833    }
4834
4835    #[tokio::test]
4836    async fn test_list_jobs_respects_limit_offset() {
4837        let pool = setup_test_db().await;
4838        let ns_id = create_namespace(&pool, "obs-limit").await;
4839        let repo = MemoryRepository::new(pool);
4840
4841        for i in 0..5 {
4842            repo.enqueue_job(EnqueueJobParams {
4843                namespace_id: ns_id,
4844                job_type: "derive",
4845                priority: i,
4846                perspective: None,
4847                payload: &serde_json::json!({"i": i}),
4848            })
4849            .await
4850            .unwrap();
4851        }
4852
4853        let page1 = repo.list_jobs(ns_id, None, None, 2, 0).await.unwrap();
4854        assert_eq!(page1.len(), 2);
4855
4856        let page2 = repo.list_jobs(ns_id, None, None, 2, 2).await.unwrap();
4857        assert_eq!(page2.len(), 2);
4858
4859        let page3 = repo.list_jobs(ns_id, None, None, 2, 4).await.unwrap();
4860        assert_eq!(page3.len(), 1);
4861    }
4862
4863    #[tokio::test]
4864    async fn test_count_jobs_by_status() {
4865        let pool = setup_test_db().await;
4866        let ns_id = create_namespace(&pool, "obs-count").await;
4867        let repo = MemoryRepository::new(pool);
4868
4869        repo.enqueue_job(EnqueueJobParams {
4870            namespace_id: ns_id,
4871            job_type: "derive",
4872            priority: 10,
4873            perspective: None,
4874            payload: &serde_json::json!({}),
4875        })
4876        .await
4877        .unwrap();
4878
4879        repo.enqueue_job(EnqueueJobParams {
4880            namespace_id: ns_id,
4881            job_type: "derive",
4882            priority: 5,
4883            perspective: None,
4884            payload: &serde_json::json!({}),
4885        })
4886        .await
4887        .unwrap();
4888
4889        repo.enqueue_job(EnqueueJobParams {
4890            namespace_id: ns_id,
4891            job_type: "digest",
4892            priority: 10,
4893            perspective: None,
4894            payload: &serde_json::json!({}),
4895        })
4896        .await
4897        .unwrap();
4898
4899        // All jobs.
4900        let all_counts = repo.count_jobs_by_status(ns_id, None).await.unwrap();
4901        let total: i64 = all_counts.iter().map(|(_, c)| c).sum();
4902        assert_eq!(total, 3);
4903
4904        // Filtered by job_type.
4905        let derive_counts = repo
4906            .count_jobs_by_status(ns_id, Some("derive"))
4907            .await
4908            .unwrap();
4909        let derive_total: i64 = derive_counts.iter().map(|(_, c)| c).sum();
4910        assert_eq!(derive_total, 2);
4911    }
4912
4913    #[tokio::test]
4914    async fn test_count_jobs_respects_filters() {
4915        let pool = setup_test_db().await;
4916        let ns_id = create_namespace(&pool, "obs-job-total").await;
4917        let repo = MemoryRepository::new(pool);
4918
4919        repo.enqueue_job(EnqueueJobParams {
4920            namespace_id: ns_id,
4921            job_type: "derive",
4922            priority: 10,
4923            perspective: None,
4924            payload: &serde_json::json!({"index": 1}),
4925        })
4926        .await
4927        .unwrap();
4928        repo.enqueue_job(EnqueueJobParams {
4929            namespace_id: ns_id,
4930            job_type: "derive",
4931            priority: 5,
4932            perspective: None,
4933            payload: &serde_json::json!({"index": 2}),
4934        })
4935        .await
4936        .unwrap();
4937        repo.enqueue_job(EnqueueJobParams {
4938            namespace_id: ns_id,
4939            job_type: "digest",
4940            priority: 1,
4941            perspective: None,
4942            payload: &serde_json::json!({"index": 3}),
4943        })
4944        .await
4945        .unwrap();
4946
4947        assert_eq!(repo.count_jobs(ns_id, None, None).await.unwrap(), 3);
4948        assert_eq!(
4949            repo.count_jobs(ns_id, Some("derive"), None).await.unwrap(),
4950            2
4951        );
4952        assert_eq!(
4953            repo.count_jobs(ns_id, Some("derive"), Some("pending"))
4954                .await
4955                .unwrap(),
4956            2
4957        );
4958        assert_eq!(
4959            repo.count_jobs(ns_id, Some("reflect"), Some("pending"))
4960                .await
4961                .unwrap(),
4962            0
4963        );
4964    }
4965
4966    #[tokio::test]
4967    async fn test_list_digests_and_count() {
4968        let pool = setup_test_db().await;
4969        let ns_id = create_namespace(&pool, "obs-digests").await;
4970        let repo = MemoryRepository::new(pool);
4971
4972        // Store a memory to use as digest content.
4973        let mem = repo
4974            .store(StoreMemoryParams {
4975                namespace_id: ns_id,
4976                content: "digest content",
4977                category: &Category::Session,
4978                memory_lane_type: None,
4979                labels: &[],
4980                metadata: &serde_json::json!({"cognitive": {"level": "summary_short"}}),
4981                embedding: None,
4982                embedding_model: None,
4983            })
4984            .await
4985            .unwrap();
4986
4987        repo.store_digest(StoreDigestParams {
4988            namespace_id: ns_id,
4989            session_key: "session-1",
4990            digest_kind: "short",
4991            memory_id: mem.id,
4992            start_memory_id: Some(1),
4993            end_memory_id: Some(10),
4994            token_count: 50,
4995        })
4996        .await
4997        .unwrap();
4998
4999        repo.store_digest(StoreDigestParams {
5000            namespace_id: ns_id,
5001            session_key: "session-2",
5002            digest_kind: "long",
5003            memory_id: mem.id,
5004            start_memory_id: Some(11),
5005            end_memory_id: Some(20),
5006            token_count: 100,
5007        })
5008        .await
5009        .unwrap();
5010
5011        // List all digests.
5012        let all = repo.list_digests(ns_id, None, 50, 0).await.unwrap();
5013        assert_eq!(all.len(), 2);
5014
5015        let total = repo.count_digests(ns_id, None).await.unwrap();
5016        assert_eq!(total, 2);
5017
5018        // Filter by session_key.
5019        let sess1 = repo
5020            .list_digests(ns_id, Some("session-1"), 50, 0)
5021            .await
5022            .unwrap();
5023        assert_eq!(sess1.len(), 1);
5024        assert_eq!(sess1[0].session_key, "session-1");
5025
5026        let sess1_count = repo.count_digests(ns_id, Some("session-1")).await.unwrap();
5027        assert_eq!(sess1_count, 1);
5028
5029        // Filter by non-existent session_key.
5030        let none = repo
5031            .list_digests(ns_id, Some("session-none"), 50, 0)
5032            .await
5033            .unwrap();
5034        assert!(none.is_empty());
5035
5036        let none_count = repo
5037            .count_digests(ns_id, Some("session-none"))
5038            .await
5039            .unwrap();
5040        assert_eq!(none_count, 0);
5041    }
5042
5043    // ---- Phase 2 Task 2: Explicit JSON decode error tests ----
5044
5045    /// Malformed labels JSON must produce an explicit Storage error,
5046    /// not silently default to an empty Vec.
5047    #[tokio::test]
5048    async fn test_row_to_memory_rejects_malformed_labels() {
5049        let pool = setup_test_db().await;
5050        let ns_id = create_namespace(&pool, "test-agent").await;
5051        let repo = MemoryRepository::new(pool);
5052
5053        // Insert a memory with valid data first.
5054        let memory = repo
5055            .store(StoreMemoryParams {
5056                namespace_id: ns_id,
5057                content: "corruption test labels",
5058                category: &Category::General,
5059                memory_lane_type: None,
5060                labels: &["valid-label".to_string()],
5061                metadata: &serde_json::Value::Null,
5062                embedding: None,
5063                embedding_model: None,
5064            })
5065            .await
5066            .unwrap();
5067
5068        // Corrupt the labels in-place to invalid JSON.
5069        sqlx::query("UPDATE memories SET labels = 'NOT VALID JSON{{{' WHERE id = ?")
5070            .bind(memory.id)
5071            .execute(repo.pool())
5072            .await
5073            .unwrap();
5074
5075        let err = repo.get_by_id(memory.id).await.unwrap_err();
5076        let msg = err.to_string();
5077        assert!(
5078            msg.contains("corrupted labels JSON"),
5079            "expected labels corruption error, got: {msg}"
5080        );
5081        assert!(msg.contains(&memory.id.to_string()));
5082    }
5083
5084    /// Malformed metadata JSON must produce an explicit Storage error,
5085    /// not silently default to Value::Null.
5086    #[tokio::test]
5087    async fn test_row_to_memory_rejects_malformed_metadata() {
5088        let pool = setup_test_db().await;
5089        let repo = MemoryRepository::new(pool);
5090
5091        // Construct a MemoryRow with corrupted metadata directly,
5092        // bypassing SQLite expression-index validation on UPDATE.
5093        let row = MemoryRow {
5094            id: 999,
5095            namespace_id: 1,
5096            content: "test".to_string(),
5097            category: "general".to_string(),
5098            memory_lane_type: None,
5099            labels: "[]".to_string(),
5100            metadata: "[truncated".to_string(), // invalid JSON
5101            similarity_score: None,
5102            relevance_score: None,
5103            content_embedding: None,
5104            embedding_model: None,
5105            created_at: Utc::now(),
5106            updated_at: None,
5107            last_accessed: None,
5108            is_active: true,
5109            is_archived: false,
5110            access_count: 0,
5111        };
5112
5113        let err = repo.row_to_memory(row).unwrap_err();
5114        let msg = err.to_string();
5115        assert!(
5116            msg.contains("corrupted metadata JSON"),
5117            "expected metadata corruption error, got: {msg}"
5118        );
5119    }
5120
5121    /// Malformed embedding JSON must produce an explicit Storage error,
5122    /// not silently default to None.
5123    #[tokio::test]
5124    async fn test_row_to_memory_rejects_malformed_embedding() {
5125        let pool = setup_test_db().await;
5126        let ns_id = create_namespace(&pool, "test-agent").await;
5127        let repo = MemoryRepository::new(pool);
5128
5129        let memory = repo
5130            .store(StoreMemoryParams {
5131                namespace_id: ns_id,
5132                content: "corruption test embedding",
5133                category: &Category::General,
5134                memory_lane_type: None,
5135                labels: &[],
5136                metadata: &serde_json::Value::Null,
5137                embedding: Some(&[0.1, 0.2, 0.3]),
5138                embedding_model: Some("test-model"),
5139            })
5140            .await
5141            .unwrap();
5142
5143        sqlx::query("UPDATE memories SET content_embedding = 'not-an-array' WHERE id = ?")
5144            .bind(memory.id)
5145            .execute(repo.pool())
5146            .await
5147            .unwrap();
5148
5149        let err = repo.get_by_id(memory.id).await.unwrap_err();
5150        let msg = err.to_string();
5151        assert!(
5152            msg.contains("corrupted embedding JSON"),
5153            "expected embedding corruption error, got: {msg}"
5154        );
5155    }
5156
5157    /// Malformed job payload JSON is permanently failed and the claim succeeds
5158    /// with an empty result, rather than poisoning the entire batch.
5159    #[tokio::test]
5160    async fn test_claim_jobs_rejects_malformed_payload() {
5161        let pool = setup_test_db().await;
5162        let ns_id = create_namespace(&pool, "test-agent").await;
5163        let repo = MemoryRepository::new(pool);
5164
5165        // Insert a job directly with corrupted payload JSON.
5166        sqlx::query(
5167            r#"
5168            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5169            VALUES (?, 'derive_memory', 'pending', 100, '{INVALID_JSON}', datetime('now'), datetime('now'))
5170            "#,
5171        )
5172        .bind(ns_id)
5173        .execute(repo.pool())
5174        .await
5175        .unwrap();
5176
5177        // claim_jobs should succeed with empty result — the bad job is permanently failed.
5178        let claimed = repo
5179            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5180            .await
5181            .unwrap();
5182        assert!(
5183            claimed.is_empty(),
5184            "corrupt payload job should not be returned"
5185        );
5186
5187        // Verify the job was permanently failed.
5188        let status: String =
5189            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5190                .bind(ns_id)
5191                .fetch_one(repo.pool())
5192                .await
5193                .unwrap();
5194        assert_eq!(status, "failed", "corrupt job should be permanently failed");
5195
5196        let last_error: Option<String> =
5197            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5198                .bind(ns_id)
5199                .fetch_one(repo.pool())
5200                .await
5201                .unwrap();
5202        assert!(
5203            last_error
5204                .unwrap_or_default()
5205                .contains("corrupted payload JSON"),
5206            "last_error should mention payload corruption"
5207        );
5208    }
5209
5210    /// Malformed perspective JSON in claim_jobs is permanently failed and the
5211    /// claim succeeds with an empty result, rather than poisoning the batch.
5212    #[tokio::test]
5213    async fn test_claim_jobs_rejects_malformed_perspective() {
5214        let pool = setup_test_db().await;
5215        let ns_id = create_namespace(&pool, "test-agent").await;
5216        let repo = MemoryRepository::new(pool);
5217
5218        // Insert a job with valid payload but corrupted perspective JSON.
5219        sqlx::query(
5220            r#"
5221            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, perspective_json, payload_json, created_at, updated_at)
5222            VALUES (?, 'derive_memory', 'pending', 100, '{BOGUS}', '{"ok": true}', datetime('now'), datetime('now'))
5223            "#,
5224        )
5225        .bind(ns_id)
5226        .execute(repo.pool())
5227        .await
5228        .unwrap();
5229
5230        // claim_jobs should succeed with empty result — the bad job is permanently failed.
5231        let claimed = repo
5232            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 1)
5233            .await
5234            .unwrap();
5235        assert!(
5236            claimed.is_empty(),
5237            "corrupt perspective job should not be returned"
5238        );
5239
5240        // Verify the job was permanently failed.
5241        let status: String =
5242            sqlx::query_scalar("SELECT status FROM memory_jobs WHERE namespace_id = ?")
5243                .bind(ns_id)
5244                .fetch_one(repo.pool())
5245                .await
5246                .unwrap();
5247        assert_eq!(status, "failed", "corrupt job should be permanently failed");
5248
5249        let last_error: Option<String> =
5250            sqlx::query_scalar("SELECT last_error FROM memory_jobs WHERE namespace_id = ?")
5251                .bind(ns_id)
5252                .fetch_one(repo.pool())
5253                .await
5254                .unwrap();
5255        assert!(
5256            last_error
5257                .unwrap_or_default()
5258                .contains("corrupted perspective JSON"),
5259            "last_error should mention perspective corruption"
5260        );
5261    }
5262
5263    /// When a batch contains one corrupt job among valid ones, the valid jobs
5264    /// are returned and only the corrupt job is permanently failed.
5265    #[tokio::test]
5266    async fn test_claim_jobs_skips_corrupt_returns_valid() {
5267        let pool = setup_test_db().await;
5268        let ns_id = create_namespace(&pool, "test-agent").await;
5269        let repo = MemoryRepository::new(pool);
5270
5271        // Enqueue 2 valid jobs.
5272        let p1 = serde_json::json!({"memory_id": 1});
5273        repo.enqueue_job(EnqueueJobParams {
5274            namespace_id: ns_id,
5275            job_type: "derive_memory",
5276            priority: 100,
5277            perspective: None,
5278            payload: &p1,
5279        })
5280        .await
5281        .unwrap();
5282        let p2 = serde_json::json!({"memory_id": 2});
5283        repo.enqueue_job(EnqueueJobParams {
5284            namespace_id: ns_id,
5285            job_type: "derive_memory",
5286            priority: 50,
5287            perspective: None,
5288            payload: &p2,
5289        })
5290        .await
5291        .unwrap();
5292
5293        // Insert 1 corrupt job at the highest priority.
5294        sqlx::query(
5295            r#"
5296            INSERT INTO memory_jobs (namespace_id, job_type, status, priority, payload_json, created_at, updated_at)
5297            VALUES (?, 'derive_memory', 'pending', 200, '{BROKEN}', datetime('now'), datetime('now'))
5298            "#,
5299        )
5300        .bind(ns_id)
5301        .execute(repo.pool())
5302        .await
5303        .unwrap();
5304
5305        // Claim all 3 — should get only the 2 valid jobs.
5306        let claimed = repo
5307            .claim_jobs(ns_id, "derive_memory", "worker-1", 60, 10)
5308            .await
5309            .unwrap();
5310        assert_eq!(
5311            claimed.len(),
5312            2,
5313            "should return 2 valid jobs, skipping the corrupt one"
5314        );
5315
5316        // The corrupt job should be permanently failed.
5317        let failed_count: i64 = sqlx::query_scalar(
5318            "SELECT COUNT(*) FROM memory_jobs WHERE namespace_id = ? AND status = 'failed'",
5319        )
5320        .bind(ns_id)
5321        .fetch_one(repo.pool())
5322        .await
5323        .unwrap();
5324        assert_eq!(failed_count, 1, "corrupt job should be permanently failed");
5325    }
5326
5327    // ---- Lifecycle management: purge tests ----
5328
5329    /// Purge should remove old completed jobs but keep recently completed ones.
5330    #[tokio::test]
5331    async fn test_purge_completed_jobs_removes_old_keeps_recent() {
5332        let pool = setup_test_db().await;
5333        let ns_id = create_namespace(&pool, "purge-test").await;
5334        let repo = MemoryRepository::new(pool);
5335
5336        // Enqueue and complete a job, then backdate its updated_at.
5337        repo.enqueue_job(EnqueueJobParams {
5338            namespace_id: ns_id,
5339            job_type: "derive_memory",
5340            priority: 10,
5341            perspective: None,
5342            payload: &serde_json::json!({"old": true}),
5343        })
5344        .await
5345        .unwrap();
5346
5347        let claimed = repo
5348            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5349            .await
5350            .unwrap();
5351        assert_eq!(claimed.len(), 1);
5352        let old_job_id = claimed[0].row.id;
5353
5354        repo.complete_job(&claimed[0]).await.unwrap();
5355
5356        // Backdate the completed job to 30 days ago.
5357        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5358            .bind(old_job_id)
5359            .execute(repo.pool())
5360            .await
5361            .unwrap();
5362
5363        // Enqueue and complete a second job that stays recent.
5364        repo.enqueue_job(EnqueueJobParams {
5365            namespace_id: ns_id,
5366            job_type: "derive_memory",
5367            priority: 10,
5368            perspective: None,
5369            payload: &serde_json::json!({"new": true}),
5370        })
5371        .await
5372        .unwrap();
5373
5374        let claimed2 = repo
5375            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5376            .await
5377            .unwrap();
5378        assert_eq!(claimed2.len(), 1);
5379        repo.complete_job(&claimed2[0]).await.unwrap();
5380
5381        // Purge with a cutoff of 7 days ago.
5382        let cutoff = Utc::now() - chrono::Duration::days(7);
5383        let deleted = repo.purge_completed_jobs(cutoff).await.unwrap();
5384        assert_eq!(deleted, 1);
5385
5386        // Verify: old job is gone, recent job remains.
5387        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5388        assert_eq!(remaining.len(), 1);
5389        assert_eq!(remaining[0].id, claimed2[0].row.id);
5390    }
5391
5392    /// Purge permanently failed jobs should only remove those with attempts >= 5.
5393    #[tokio::test]
5394    async fn test_purge_permanently_failed_jobs_removes_old_keeps_recent() {
5395        let pool = setup_test_db().await;
5396        let ns_id = create_namespace(&pool, "purge-failed").await;
5397        let repo = MemoryRepository::new(pool);
5398
5399        // Enqueue a job and fail it 5 times to make it permanently failed.
5400        repo.enqueue_job(EnqueueJobParams {
5401            namespace_id: ns_id,
5402            job_type: "derive_memory",
5403            priority: 10,
5404            perspective: None,
5405            payload: &serde_json::json!({"fail_me": true}),
5406        })
5407        .await
5408        .unwrap();
5409
5410        for _ in 0..5 {
5411            let claimed = repo
5412                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5413                .await
5414                .unwrap();
5415            assert_eq!(claimed.len(), 1);
5416            repo.fail_job(&claimed[0], "persistent error")
5417                .await
5418                .unwrap();
5419        }
5420
5421        // Backdate to 30 days ago.
5422        sqlx::query(
5423            "UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE status = ?",
5424        )
5425        .bind(memory_job_status::FAILED)
5426        .execute(repo.pool())
5427        .await
5428        .unwrap();
5429
5430        // Enqueue a second job and fail it only 2 times (still re-queueable).
5431        repo.enqueue_job(EnqueueJobParams {
5432            namespace_id: ns_id,
5433            job_type: "derive_memory",
5434            priority: 10,
5435            perspective: None,
5436            payload: &serde_json::json!({"retry_me": true}),
5437        })
5438        .await
5439        .unwrap();
5440
5441        for _ in 0..2 {
5442            let claimed = repo
5443                .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5444                .await
5445                .unwrap();
5446            assert_eq!(claimed.len(), 1);
5447            repo.fail_job(&claimed[0], "transient error").await.unwrap();
5448        }
5449
5450        // Purge with a cutoff of 7 days ago.
5451        let cutoff = Utc::now() - chrono::Duration::days(7);
5452        let deleted = repo.purge_permanently_failed_jobs(cutoff).await.unwrap();
5453        assert_eq!(deleted, 1);
5454
5455        // The permanently failed job is gone; the re-queueable job remains as pending.
5456        let remaining = repo.list_jobs(ns_id, None, None, 50, 0).await.unwrap();
5457        assert_eq!(remaining.len(), 1);
5458        assert_eq!(remaining[0].status, memory_job_status::PENDING);
5459    }
5460
5461    /// Active leasing should still work after purging old completed/failed jobs.
5462    #[tokio::test]
5463    async fn test_active_leasing_works_after_purge() {
5464        let pool = setup_test_db().await;
5465        let ns_id = create_namespace(&pool, "purge-lease").await;
5466        let repo = MemoryRepository::new(pool);
5467
5468        // Create and complete an old job.
5469        repo.enqueue_job(EnqueueJobParams {
5470            namespace_id: ns_id,
5471            job_type: "derive_memory",
5472            priority: 10,
5473            perspective: None,
5474            payload: &serde_json::json!({"old": true}),
5475        })
5476        .await
5477        .unwrap();
5478
5479        let claimed = repo
5480            .claim_jobs(ns_id, "derive_memory", "w", 60, 10)
5481            .await
5482            .unwrap();
5483        repo.complete_job(&claimed[0]).await.unwrap();
5484
5485        sqlx::query("UPDATE memory_jobs SET updated_at = datetime('now', '-30 days') WHERE id = ?")
5486            .bind(claimed[0].row.id)
5487            .execute(repo.pool())
5488            .await
5489            .unwrap();
5490
5491        // Purge old completed jobs.
5492        let cutoff = Utc::now() - chrono::Duration::days(7);
5493        repo.purge_completed_jobs(cutoff).await.unwrap();
5494
5495        // Enqueue a fresh job and verify the full claim/complete/fail cycle still works.
5496        repo.enqueue_job(EnqueueJobParams {
5497            namespace_id: ns_id,
5498            job_type: "derive_memory",
5499            priority: 20,
5500            perspective: None,
5501            payload: &serde_json::json!({"fresh": true}),
5502        })
5503        .await
5504        .unwrap();
5505
5506        let fresh_claimed = repo
5507            .claim_jobs(ns_id, "derive_memory", "worker-2", 120, 10)
5508            .await
5509            .unwrap();
5510        assert_eq!(fresh_claimed.len(), 1);
5511        assert_eq!(fresh_claimed[0].row.status, "running");
5512        assert_eq!(fresh_claimed[0].payload["fresh"], true);
5513
5514        // Complete it.
5515        repo.complete_job(&fresh_claimed[0]).await.unwrap();
5516
5517        // Verify no more claimable jobs.
5518        let empty = repo
5519            .claim_jobs(ns_id, "derive_memory", "worker-3", 60, 10)
5520            .await
5521            .unwrap();
5522        assert!(empty.is_empty());
5523    }
5524}