Skip to main content

engram/storage/
queries.rs

1//! Database queries for memory operations
2
3use chrono::{DateTime, Utc};
4use rusqlite::{params, Connection, OptionalExtension, Row};
5use serde::{Deserialize, Serialize};
6use sha2::{Digest, Sha256};
7use std::collections::HashMap;
8
9use crate::error::{EngramError, Result};
10use crate::types::*;
11
12/// Parse a memory from a database row
13pub fn memory_from_row(row: &Row) -> rusqlite::Result<Memory> {
14    let id: i64 = row.get("id")?;
15    let content: String = row.get("content")?;
16    let memory_type_str: String = row.get("memory_type")?;
17    let importance: f32 = row.get("importance")?;
18    let access_count: i32 = row.get("access_count")?;
19    let created_at: String = row.get("created_at")?;
20    let updated_at: String = row.get("updated_at")?;
21    let last_accessed_at: Option<String> = row.get("last_accessed_at")?;
22    let owner_id: Option<String> = row.get("owner_id")?;
23    let visibility_str: String = row.get("visibility")?;
24    let version: i32 = row.get("version")?;
25    let has_embedding: i32 = row.get("has_embedding")?;
26    let metadata_str: String = row.get("metadata")?;
27
28    // Scope columns (with fallback for backward compatibility)
29    let scope_type: String = row
30        .get("scope_type")
31        .unwrap_or_else(|_| "global".to_string());
32    let scope_id: Option<String> = row.get("scope_id").unwrap_or(None);
33
34    // TTL column (with fallback for backward compatibility)
35    let expires_at: Option<String> = row.get("expires_at").unwrap_or(None);
36
37    // Content hash column (with fallback for backward compatibility)
38    let content_hash: Option<String> = row.get("content_hash").unwrap_or(None);
39
40    let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
41    let visibility = match visibility_str.as_str() {
42        "shared" => Visibility::Shared,
43        "public" => Visibility::Public,
44        _ => Visibility::Private,
45    };
46
47    // Parse scope from type and id
48    let scope = match (scope_type.as_str(), scope_id) {
49        ("user", Some(id)) => MemoryScope::User { user_id: id },
50        ("session", Some(id)) => MemoryScope::Session { session_id: id },
51        ("agent", Some(id)) => MemoryScope::Agent { agent_id: id },
52        _ => MemoryScope::Global,
53    };
54
55    let metadata: HashMap<String, serde_json::Value> =
56        serde_json::from_str(&metadata_str).unwrap_or_default();
57
58    // Workspace column (with fallback for backward compatibility)
59    let workspace: String = row
60        .get("workspace")
61        .unwrap_or_else(|_| "default".to_string());
62
63    // Tier column (with fallback for backward compatibility)
64    let tier_str: String = row.get("tier").unwrap_or_else(|_| "permanent".to_string());
65    let tier = tier_str.parse().unwrap_or_default();
66
67    let event_time: Option<String> = row.get("event_time").unwrap_or(None);
68    let event_duration_seconds: Option<i64> = row.get("event_duration_seconds").unwrap_or(None);
69    let trigger_pattern: Option<String> = row.get("trigger_pattern").unwrap_or(None);
70    let procedure_success_count: i32 = row.get("procedure_success_count").unwrap_or(0);
71    let procedure_failure_count: i32 = row.get("procedure_failure_count").unwrap_or(0);
72    let summary_of_id: Option<i64> = row.get("summary_of_id").unwrap_or(None);
73    let lifecycle_state_str: Option<String> = row.get("lifecycle_state").unwrap_or(None);
74
75    let lifecycle_state = lifecycle_state_str
76        .and_then(|s| s.parse().ok())
77        .unwrap_or(crate::types::LifecycleState::Active);
78
79    Ok(Memory {
80        id,
81        content,
82        memory_type,
83        tags: vec![], // Loaded separately
84        metadata,
85        importance,
86        access_count,
87        created_at: DateTime::parse_from_rfc3339(&created_at)
88            .map(|dt| dt.with_timezone(&Utc))
89            .unwrap_or_else(|_| Utc::now()),
90        updated_at: DateTime::parse_from_rfc3339(&updated_at)
91            .map(|dt| dt.with_timezone(&Utc))
92            .unwrap_or_else(|_| Utc::now()),
93        last_accessed_at: last_accessed_at.and_then(|s| {
94            DateTime::parse_from_rfc3339(&s)
95                .map(|dt| dt.with_timezone(&Utc))
96                .ok()
97        }),
98        owner_id,
99        visibility,
100        scope,
101        workspace,
102        tier,
103        version,
104        has_embedding: has_embedding != 0,
105        expires_at: expires_at.and_then(|s| {
106            DateTime::parse_from_rfc3339(&s)
107                .map(|dt| dt.with_timezone(&Utc))
108                .ok()
109        }),
110        content_hash,
111        event_time: event_time.and_then(|s| {
112            DateTime::parse_from_rfc3339(&s)
113                .map(|dt| dt.with_timezone(&Utc))
114                .ok()
115        }),
116        event_duration_seconds,
117        trigger_pattern,
118        procedure_success_count,
119        procedure_failure_count,
120        summary_of_id,
121        lifecycle_state,
122    })
123}
124
125pub(crate) fn metadata_value_to_param(
126    key: &str,
127    value: &serde_json::Value,
128    conditions: &mut Vec<String>,
129    params: &mut Vec<Box<dyn rusqlite::ToSql>>,
130) -> Result<()> {
131    match value {
132        serde_json::Value::String(s) => {
133            conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
134            params.push(Box::new(s.clone()));
135        }
136        serde_json::Value::Number(n) => {
137            conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
138            if let Some(i) = n.as_i64() {
139                params.push(Box::new(i));
140            } else if let Some(f) = n.as_f64() {
141                params.push(Box::new(f));
142            } else {
143                return Err(EngramError::InvalidInput("Invalid number".to_string()));
144            }
145        }
146        serde_json::Value::Bool(b) => {
147            conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
148            params.push(Box::new(*b));
149        }
150        serde_json::Value::Null => {
151            conditions.push(format!("json_extract(m.metadata, '$.{}') IS NULL", key));
152        }
153        _ => {
154            return Err(EngramError::InvalidInput(format!(
155                "Unsupported metadata filter value for key: {}",
156                key
157            )));
158        }
159    }
160
161    Ok(())
162}
163
164fn get_memory_internal(conn: &Connection, id: i64, track_access: bool) -> Result<Memory> {
165    let now = Utc::now().to_rfc3339();
166
167    let mut stmt = conn.prepare_cached(
168        "SELECT id, content, memory_type, importance, access_count,
169                created_at, updated_at, last_accessed_at, owner_id,
170                visibility, version, has_embedding, metadata,
171                scope_type, scope_id, workspace, tier, expires_at, content_hash
172         FROM memories
173         WHERE id = ? AND valid_to IS NULL
174           AND (expires_at IS NULL OR expires_at > ?)",
175    )?;
176
177    let mut memory = stmt
178        .query_row(params![id, now], memory_from_row)
179        .map_err(|_| EngramError::NotFound(id))?;
180
181    memory.tags = load_tags(conn, id)?;
182
183    if track_access {
184        // Update access tracking
185        let now = Utc::now().to_rfc3339();
186        conn.execute(
187            "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?
188             WHERE id = ?",
189            params![now, id],
190        )?;
191    }
192
193    Ok(memory)
194}
195
196/// Load tags for a memory
197pub fn load_tags(conn: &Connection, memory_id: i64) -> Result<Vec<String>> {
198    let mut stmt = conn.prepare_cached(
199        "SELECT t.name FROM tags t
200         JOIN memory_tags mt ON t.id = mt.tag_id
201         WHERE mt.memory_id = ?",
202    )?;
203
204    let tags: Vec<String> = stmt
205        .query_map([memory_id], |row| row.get(0))?
206        .filter_map(|r| r.ok())
207        .collect();
208
209    Ok(tags)
210}
211
212/// Compute SHA256 hash of normalized content for deduplication
213pub fn compute_content_hash(content: &str) -> String {
214    // Normalize: lowercase, collapse whitespace, trim
215    let normalized = content
216        .to_lowercase()
217        .split_whitespace()
218        .collect::<Vec<_>>()
219        .join(" ");
220
221    let mut hasher = Sha256::new();
222    hasher.update(normalized.as_bytes());
223    format!("sha256:{}", hex::encode(hasher.finalize()))
224}
225
226/// Find a memory by content hash within the same scope and workspace (exact duplicate detection)
227///
228/// Deduplication respects both scope and workspace isolation:
229/// - User-scoped memories only dedupe against other memories with same user_id
230/// - Session-scoped memories only dedupe against other memories with same session_id
231/// - Global memories only dedupe against other global memories
232/// - All deduplication is workspace-scoped (memories in different workspaces are never duplicates)
233pub fn find_by_content_hash(
234    conn: &Connection,
235    content_hash: &str,
236    scope: &MemoryScope,
237    workspace: Option<&str>,
238) -> Result<Option<Memory>> {
239    let now = Utc::now().to_rfc3339();
240    let scope_type = scope.scope_type();
241    let scope_id = scope.scope_id().map(|s| s.to_string());
242    let workspace = workspace.unwrap_or("default");
243
244    let mut stmt = conn.prepare_cached(
245        "SELECT id, content, memory_type, importance, access_count,
246                created_at, updated_at, last_accessed_at, owner_id,
247                visibility, version, has_embedding, metadata,
248                scope_type, scope_id, workspace, tier, expires_at, content_hash
249         FROM memories
250         WHERE content_hash = ? AND valid_to IS NULL
251           AND (expires_at IS NULL OR expires_at > ?)
252           AND scope_type = ?
253           AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
254           AND workspace = ?
255         LIMIT 1",
256    )?;
257
258    let result = stmt
259        .query_row(
260            params![content_hash, now, scope_type, scope_id, scope_id, workspace],
261            memory_from_row,
262        )
263        .ok();
264
265    if let Some(mut memory) = result {
266        memory.tags = load_tags(conn, memory.id)?;
267        Ok(Some(memory))
268    } else {
269        Ok(None)
270    }
271}
272
273/// Find the most similar memory to given embedding within the same scope AND workspace (semantic duplicate detection)
274///
275/// Returns the memory with the highest similarity score if it meets the threshold.
276/// Only checks memories that have embeddings computed.
277pub fn find_similar_by_embedding(
278    conn: &Connection,
279    query_embedding: &[f32],
280    scope: &MemoryScope,
281    workspace: Option<&str>,
282    threshold: f32,
283) -> Result<Option<(Memory, f32)>> {
284    use crate::embedding::{cosine_similarity, get_embedding};
285
286    let now = Utc::now().to_rfc3339();
287    let scope_type = scope.scope_type();
288    let scope_id = scope.scope_id().map(|s| s.to_string());
289    let workspace = workspace.unwrap_or("default");
290
291    // Get all memories with embeddings in the same scope AND workspace
292    let mut stmt = conn.prepare_cached(
293        "SELECT id, content, memory_type, importance, access_count,
294                created_at, updated_at, last_accessed_at, owner_id,
295                visibility, version, has_embedding, metadata,
296                scope_type, scope_id, workspace, tier, expires_at, content_hash
297         FROM memories
298         WHERE has_embedding = 1 AND valid_to IS NULL
299           AND (expires_at IS NULL OR expires_at > ?)
300           AND scope_type = ?
301           AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
302           AND workspace = ?",
303    )?;
304
305    let memories: Vec<Memory> = stmt
306        .query_map(
307            params![now, scope_type, scope_id, scope_id, workspace],
308            memory_from_row,
309        )?
310        .filter_map(|r| r.ok())
311        .collect();
312
313    let mut best_match: Option<(Memory, f32)> = None;
314
315    for memory in memories {
316        if let Ok(Some(embedding)) = get_embedding(conn, memory.id) {
317            let similarity = cosine_similarity(query_embedding, &embedding);
318            if similarity >= threshold {
319                match &best_match {
320                    None => best_match = Some((memory, similarity)),
321                    Some((_, best_score)) if similarity > *best_score => {
322                        best_match = Some((memory, similarity));
323                    }
324                    _ => {}
325                }
326            }
327        }
328    }
329
330    // Load tags for the best match
331    if let Some((mut memory, score)) = best_match {
332        memory.tags = load_tags(conn, memory.id)?;
333        Ok(Some((memory, score)))
334    } else {
335        Ok(None)
336    }
337}
338
339/// A pair of potentially duplicate memories with their similarity score
340#[derive(Debug, Clone, serde::Serialize)]
341pub struct DuplicatePair {
342    pub memory_a: Memory,
343    pub memory_b: Memory,
344    pub similarity_score: f64,
345    pub match_type: DuplicateMatchType,
346}
347
348/// How the duplicate was detected
349#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
350#[serde(rename_all = "snake_case")]
351pub enum DuplicateMatchType {
352    /// Exact content hash match
353    ExactHash,
354    /// High similarity score from crossrefs
355    HighSimilarity,
356    /// Semantic similarity via embedding cosine distance
357    EmbeddingSimilarity,
358}
359
360/// Find all potential duplicate memory pairs
361///
362/// Returns pairs of memories that are either:
363/// 1. Exact duplicates (same content hash within same scope)
364/// 2. High similarity (crossref score >= threshold within same scope)
365///
366/// Duplicates are scoped - memories in different scopes are not considered duplicates.
367pub fn find_duplicates(conn: &Connection, threshold: f64) -> Result<Vec<DuplicatePair>> {
368    find_duplicates_in_workspace(conn, threshold, None)
369}
370
371/// Find duplicate memories within a specific workspace (or all if None)
372pub fn find_duplicates_in_workspace(
373    conn: &Connection,
374    threshold: f64,
375    workspace: Option<&str>,
376) -> Result<Vec<DuplicatePair>> {
377    let now = Utc::now().to_rfc3339();
378    let mut duplicates = Vec::new();
379
380    // First, find exact hash duplicates (same content_hash within same scope AND workspace)
381    let (hash_sql, hash_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace
382    {
383        (
384            "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
385             FROM memories
386             WHERE content_hash IS NOT NULL
387               AND valid_to IS NULL
388               AND (expires_at IS NULL OR expires_at > ?)
389               AND workspace = ?
390             GROUP BY content_hash, scope_type, scope_id, workspace
391             HAVING COUNT(*) > 1",
392            vec![Box::new(now.clone()), Box::new(ws.to_string())],
393        )
394    } else {
395        (
396            "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
397             FROM memories
398             WHERE content_hash IS NOT NULL
399               AND valid_to IS NULL
400               AND (expires_at IS NULL OR expires_at > ?)
401             GROUP BY content_hash, scope_type, scope_id, workspace
402             HAVING COUNT(*) > 1",
403            vec![Box::new(now.clone())],
404        )
405    };
406
407    let mut hash_stmt = conn.prepare_cached(hash_sql)?;
408    let hash_rows = hash_stmt.query_map(
409        rusqlite::params_from_iter(hash_params.iter().map(|p| p.as_ref())),
410        |row| {
411            let ids_str: String = row.get(3)?;
412            Ok(ids_str)
413        },
414    )?;
415
416    for ids_result in hash_rows {
417        let ids_str = ids_result?;
418        let ids: Vec<i64> = ids_str
419            .split(',')
420            .filter_map(|s| s.trim().parse().ok())
421            .collect();
422
423        // Create pairs from all IDs with same hash
424        // Use get_memory_internal with track_access=false to avoid inflating access stats
425        for i in 0..ids.len() {
426            for j in (i + 1)..ids.len() {
427                let memory_a = get_memory_internal(conn, ids[i], false)?;
428                let memory_b = get_memory_internal(conn, ids[j], false)?;
429                duplicates.push(DuplicatePair {
430                    memory_a,
431                    memory_b,
432                    similarity_score: 1.0, // Exact match
433                    match_type: DuplicateMatchType::ExactHash,
434                });
435            }
436        }
437    }
438
439    // Second, find high-similarity pairs from crossrefs (within same scope AND workspace)
440    let (sim_sql, sim_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
441        (
442            "SELECT DISTINCT c.from_id, c.to_id, c.score
443             FROM crossrefs c
444             JOIN memories m1 ON c.from_id = m1.id
445             JOIN memories m2 ON c.to_id = m2.id
446             WHERE c.score >= ?
447               AND m1.valid_to IS NULL
448               AND m2.valid_to IS NULL
449               AND (m1.expires_at IS NULL OR m1.expires_at > ?)
450               AND (m2.expires_at IS NULL OR m2.expires_at > ?)
451               AND c.from_id < c.to_id
452               AND m1.scope_type = m2.scope_type
453               AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
454               AND m1.workspace = ?
455               AND m2.workspace = ?
456             ORDER BY c.score DESC",
457            vec![
458                Box::new(threshold),
459                Box::new(now.clone()),
460                Box::new(now.clone()),
461                Box::new(ws.to_string()),
462                Box::new(ws.to_string()),
463            ],
464        )
465    } else {
466        (
467            "SELECT DISTINCT c.from_id, c.to_id, c.score
468             FROM crossrefs c
469             JOIN memories m1 ON c.from_id = m1.id
470             JOIN memories m2 ON c.to_id = m2.id
471             WHERE c.score >= ?
472               AND m1.valid_to IS NULL
473               AND m2.valid_to IS NULL
474               AND (m1.expires_at IS NULL OR m1.expires_at > ?)
475               AND (m2.expires_at IS NULL OR m2.expires_at > ?)
476               AND c.from_id < c.to_id
477               AND m1.scope_type = m2.scope_type
478               AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
479               AND m1.workspace = m2.workspace
480             ORDER BY c.score DESC",
481            vec![
482                Box::new(threshold),
483                Box::new(now.clone()),
484                Box::new(now.clone()),
485            ],
486        )
487    };
488
489    let mut sim_stmt = conn.prepare_cached(sim_sql)?;
490    let sim_rows = sim_stmt.query_map(
491        rusqlite::params_from_iter(sim_params.iter().map(|p| p.as_ref())),
492        |row| {
493            Ok((
494                row.get::<_, i64>(0)?,
495                row.get::<_, i64>(1)?,
496                row.get::<_, f64>(2)?,
497            ))
498        },
499    )?;
500
501    for row_result in sim_rows {
502        let (from_id, to_id, score) = row_result?;
503
504        // Skip if this pair was already found as exact hash match
505        let already_found = duplicates.iter().any(|d| {
506            (d.memory_a.id == from_id && d.memory_b.id == to_id)
507                || (d.memory_a.id == to_id && d.memory_b.id == from_id)
508        });
509
510        if !already_found {
511            // Use get_memory_internal with track_access=false to avoid inflating access stats
512            let memory_a = get_memory_internal(conn, from_id, false)?;
513            let memory_b = get_memory_internal(conn, to_id, false)?;
514            duplicates.push(DuplicatePair {
515                memory_a,
516                memory_b,
517                similarity_score: score,
518                match_type: DuplicateMatchType::HighSimilarity,
519            });
520        }
521    }
522
523    Ok(duplicates)
524}
525
526/// Find semantically similar memories using embedding cosine similarity.
527/// This is "LLM-powered" dedup — goes beyond hash/n-gram matching to detect
528/// memories that convey the same information with different wording.
529pub fn find_duplicates_by_embedding(
530    conn: &Connection,
531    threshold: f32,
532    workspace: Option<&str>,
533    limit: usize,
534) -> Result<Vec<DuplicatePair>> {
535    use crate::embedding::{cosine_similarity, get_embedding};
536
537    let now = Utc::now().to_rfc3339();
538
539    // Get all memory IDs with embeddings (scoped to workspace if provided)
540    let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
541        (
542            "SELECT id FROM memories
543             WHERE has_embedding = 1 AND valid_to IS NULL
544               AND (expires_at IS NULL OR expires_at > ?)
545               AND COALESCE(lifecycle_state, 'active') = 'active'
546               AND workspace = ?
547             ORDER BY id",
548            vec![Box::new(now), Box::new(ws.to_string())],
549        )
550    } else {
551        (
552            "SELECT id FROM memories
553             WHERE has_embedding = 1 AND valid_to IS NULL
554               AND (expires_at IS NULL OR expires_at > ?)
555               AND COALESCE(lifecycle_state, 'active') = 'active'
556             ORDER BY id",
557            vec![Box::new(now)],
558        )
559    };
560
561    let mut stmt = conn.prepare(sql)?;
562    let ids: Vec<i64> = stmt
563        .query_map(
564            rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())),
565            |row| row.get(0),
566        )?
567        .filter_map(|r| r.ok())
568        .collect();
569
570    // Load all embeddings into memory for pairwise comparison
571    let mut embeddings: Vec<(i64, Vec<f32>)> = Vec::with_capacity(ids.len());
572    for &id in &ids {
573        if let Ok(Some(emb)) = get_embedding(conn, id) {
574            embeddings.push((id, emb));
575        }
576    }
577
578    let mut duplicates = Vec::new();
579
580    // Pairwise comparison (O(n^2) — bounded by limit)
581    for i in 0..embeddings.len() {
582        if duplicates.len() >= limit {
583            break;
584        }
585        for j in (i + 1)..embeddings.len() {
586            if duplicates.len() >= limit {
587                break;
588            }
589            let sim = cosine_similarity(&embeddings[i].1, &embeddings[j].1);
590            if sim >= threshold {
591                let memory_a = get_memory_internal(conn, embeddings[i].0, false)?;
592                let memory_b = get_memory_internal(conn, embeddings[j].0, false)?;
593                duplicates.push(DuplicatePair {
594                    memory_a,
595                    memory_b,
596                    similarity_score: sim as f64,
597                    match_type: DuplicateMatchType::EmbeddingSimilarity,
598                });
599            }
600        }
601    }
602
603    // Sort by similarity descending
604    duplicates.sort_by(|a, b| {
605        b.similarity_score
606            .partial_cmp(&a.similarity_score)
607            .unwrap_or(std::cmp::Ordering::Equal)
608    });
609
610    Ok(duplicates)
611}
612
613/// Create a new memory with deduplication support
614pub fn create_memory(conn: &Connection, input: &CreateMemoryInput) -> Result<Memory> {
615    let now = Utc::now();
616    let now_str = now.to_rfc3339();
617    let metadata_json = serde_json::to_string(&input.metadata)?;
618    let importance = input.importance.unwrap_or(0.5);
619
620    // Compute content hash for deduplication
621    let content_hash = compute_content_hash(&input.content);
622
623    // Normalize workspace early for dedup checking
624    let workspace = match &input.workspace {
625        Some(ws) => crate::types::normalize_workspace(ws)
626            .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?,
627        None => "default".to_string(),
628    };
629
630    // Check for duplicates based on dedup_mode (scoped to same scope AND workspace)
631    if input.dedup_mode != DedupMode::Allow {
632        if let Some(existing) =
633            find_by_content_hash(conn, &content_hash, &input.scope, Some(&workspace))?
634        {
635            match input.dedup_mode {
636                DedupMode::Reject => {
637                    return Err(EngramError::Duplicate {
638                        existing_id: existing.id,
639                        message: format!(
640                            "Duplicate memory detected (id={}). Content hash: {}",
641                            existing.id, content_hash
642                        ),
643                    });
644                }
645                DedupMode::Skip => {
646                    // Return existing memory without modification
647                    return Ok(existing);
648                }
649                DedupMode::Merge => {
650                    // Merge: update existing memory with new tags and metadata
651                    let mut merged_tags = existing.tags.clone();
652                    for tag in &input.tags {
653                        if !merged_tags.contains(tag) {
654                            merged_tags.push(tag.clone());
655                        }
656                    }
657
658                    let mut merged_metadata = existing.metadata.clone();
659                    for (key, value) in &input.metadata {
660                        merged_metadata.insert(key.clone(), value.clone());
661                    }
662
663                    let update_input = UpdateMemoryInput {
664                        content: None, // Keep existing content
665                        memory_type: None,
666                        tags: Some(merged_tags),
667                        metadata: Some(merged_metadata),
668                        importance: input.importance, // Use new importance if provided
669                        scope: None,
670                        ttl_seconds: input.ttl_seconds, // Apply new TTL if provided
671
672                        event_time: None,
673                        trigger_pattern: None,
674                    };
675
676                    return update_memory(conn, existing.id, &update_input);
677                }
678                DedupMode::Allow => unreachable!(),
679            }
680        }
681    }
682
683    // Extract scope type and id for database storage
684    let scope_type = input.scope.scope_type();
685    let scope_id = input.scope.scope_id().map(|s| s.to_string());
686
687    // workspace was already normalized above for dedup checking
688
689    // Determine tier and enforce tier invariants
690    let tier = input.tier;
691
692    // Calculate expires_at based on tier and ttl_seconds
693    // Tier invariants:
694    //   - Permanent: expires_at MUST be NULL (cannot expire)
695    //   - Daily: expires_at MUST be set (default: created_at + 24h)
696    let expires_at = match tier {
697        MemoryTier::Permanent => {
698            // Permanent memories cannot have an expiration
699            if input.ttl_seconds.is_some() && input.ttl_seconds != Some(0) {
700                return Err(EngramError::InvalidInput(
701                    "Permanent tier memories cannot have a TTL. Use Daily tier for expiring memories.".to_string()
702                ));
703            }
704            None
705        }
706        MemoryTier::Daily => {
707            // Daily memories must have an expiration (default: 24 hours)
708            let ttl = input.ttl_seconds.filter(|&t| t > 0).unwrap_or(86400); // 24h default
709            Some((now + chrono::Duration::seconds(ttl)).to_rfc3339())
710        }
711    };
712
713    let event_time = input.event_time.map(|dt| dt.to_rfc3339());
714
715    conn.execute(
716        "INSERT INTO memories (content, memory_type, importance, metadata, created_at, updated_at, valid_from, scope_type, scope_id, workspace, tier, expires_at, content_hash, event_time, event_duration_seconds, trigger_pattern, summary_of_id)
717         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
718        params![
719            input.content,
720            input.memory_type.as_str(),
721            importance,
722            metadata_json,
723            now_str,
724            now_str,
725            now_str,
726            scope_type,
727            scope_id,
728            workspace,
729            tier.as_str(),
730            expires_at,
731            content_hash,
732            event_time,
733            input.event_duration_seconds,
734            input.trigger_pattern,
735            input.summary_of_id,
736        ],
737    )?;
738
739    let id = conn.last_insert_rowid();
740
741    // Insert tags
742    for tag in &input.tags {
743        ensure_tag(conn, tag)?;
744        conn.execute(
745            "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
746             SELECT ?, id FROM tags WHERE name = ?",
747            params![id, tag],
748        )?;
749    }
750
751    // Queue for embedding if not deferred
752    if !input.defer_embedding {
753        conn.execute(
754            "INSERT INTO embedding_queue (memory_id, status, queued_at)
755             VALUES (?, 'pending', ?)",
756            params![id, now_str],
757        )?;
758    }
759
760    // Create initial version
761    let tags_json = serde_json::to_string(&input.tags)?;
762    conn.execute(
763        "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
764         VALUES (?, 1, ?, ?, ?, ?)",
765        params![id, input.content, tags_json, metadata_json, now_str],
766    )?;
767
768    // Record event for sync delta tracking
769    record_event(
770        conn,
771        MemoryEventType::Created,
772        Some(id),
773        None,
774        serde_json::json!({
775            "workspace": input.workspace.as_deref().unwrap_or("default"),
776            "memory_type": input.memory_type.as_str(),
777        }),
778    )?;
779
780    // Update sync state (version now tracks event count for delta sync)
781    conn.execute(
782        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
783        [],
784    )?;
785
786    get_memory_internal(conn, id, false)
787}
788
789/// Ensure a tag exists and return its ID
790fn ensure_tag(conn: &Connection, tag: &str) -> Result<i64> {
791    conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", params![tag])?;
792
793    let id: i64 = conn.query_row("SELECT id FROM tags WHERE name = ?", params![tag], |row| {
794        row.get(0)
795    })?;
796
797    Ok(id)
798}
799
800/// Get a memory by ID
801pub fn get_memory(conn: &Connection, id: i64) -> Result<Memory> {
802    get_memory_internal(conn, id, true)
803}
804
805/// Update a memory
806pub fn update_memory(conn: &Connection, id: i64, input: &UpdateMemoryInput) -> Result<Memory> {
807    // Get current memory for versioning
808    let current = get_memory_internal(conn, id, false)?;
809    let now = Utc::now().to_rfc3339();
810
811    // Build update query dynamically
812    let mut updates = vec!["updated_at = ?".to_string()];
813    let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now.clone())];
814
815    if let Some(ref content) = input.content {
816        updates.push("content = ?".to_string());
817        values.push(Box::new(content.clone()));
818        // Recalculate content_hash when content changes
819        let new_hash = compute_content_hash(content);
820        updates.push("content_hash = ?".to_string());
821        values.push(Box::new(new_hash));
822    }
823
824    if let Some(ref memory_type) = input.memory_type {
825        updates.push("memory_type = ?".to_string());
826        values.push(Box::new(memory_type.as_str().to_string()));
827    }
828
829    if let Some(importance) = input.importance {
830        updates.push("importance = ?".to_string());
831        values.push(Box::new(importance));
832    }
833
834    if let Some(ref metadata) = input.metadata {
835        let metadata_json = serde_json::to_string(metadata)?;
836        updates.push("metadata = ?".to_string());
837        values.push(Box::new(metadata_json));
838    }
839
840    if let Some(ref scope) = input.scope {
841        updates.push("scope_type = ?".to_string());
842        values.push(Box::new(scope.scope_type().to_string()));
843        updates.push("scope_id = ?".to_string());
844        values.push(Box::new(scope.scope_id().map(|s| s.to_string())));
845    }
846
847    // Update event_time if provided (Some(None) clears)
848    if let Some(event_time) = &input.event_time {
849        updates.push("event_time = ?".to_string());
850        let value = event_time.as_ref().map(|dt| dt.to_rfc3339());
851        values.push(Box::new(value));
852    }
853
854    // Update trigger_pattern if provided (Some(None) clears)
855    if let Some(trigger_pattern) = &input.trigger_pattern {
856        updates.push("trigger_pattern = ?".to_string());
857        values.push(Box::new(trigger_pattern.clone()));
858    }
859
860    // Handle TTL update with tier invariant enforcement
861    // Normalize: ttl_seconds <= 0 means "no expiration" (consistent with create_memory)
862    // Invariants:
863    //   - Permanent tier: expires_at MUST be NULL
864    //   - Daily tier: expires_at MUST be set
865    if let Some(ttl) = input.ttl_seconds {
866        if ttl <= 0 {
867            // Request to remove expiration
868            // Only allowed for Permanent tier; for Daily tier, this is an error
869            if current.tier == MemoryTier::Daily {
870                return Err(crate::error::EngramError::InvalidInput(
871                    "Cannot remove expiration from a Daily tier memory. Use promote_to_permanent first.".to_string()
872                ));
873            }
874            updates.push("expires_at = NULL".to_string());
875        } else {
876            // Request to set expiration
877            // Only allowed for Daily tier; for Permanent tier, this is an error
878            if current.tier == MemoryTier::Permanent {
879                return Err(crate::error::EngramError::InvalidInput(
880                    "Cannot set expiration on a Permanent tier memory. Permanent memories cannot expire.".to_string()
881                ));
882            }
883            let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
884            updates.push("expires_at = ?".to_string());
885            values.push(Box::new(expires_at));
886        }
887    }
888
889    // Increment version
890    updates.push("version = version + 1".to_string());
891
892    // Execute update
893    let sql = format!("UPDATE memories SET {} WHERE id = ?", updates.join(", "));
894    values.push(Box::new(id));
895
896    let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
897    conn.execute(&sql, params.as_slice())?;
898
899    // Update tags if provided
900    if let Some(ref tags) = input.tags {
901        conn.execute("DELETE FROM memory_tags WHERE memory_id = ?", params![id])?;
902        for tag in tags {
903            ensure_tag(conn, tag)?;
904            conn.execute(
905                "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
906                 SELECT ?, id FROM tags WHERE name = ?",
907                params![id, tag],
908            )?;
909        }
910    }
911
912    // Create new version
913    let new_content = input.content.as_ref().unwrap_or(&current.content);
914    let new_tags = input.tags.as_ref().unwrap_or(&current.tags);
915    let new_metadata = input.metadata.as_ref().unwrap_or(&current.metadata);
916    let tags_json = serde_json::to_string(new_tags)?;
917    let metadata_json = serde_json::to_string(new_metadata)?;
918
919    conn.execute(
920        "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
921         VALUES (?, (SELECT version FROM memories WHERE id = ?), ?, ?, ?, ?)",
922        params![id, id, new_content, tags_json, metadata_json, now],
923    )?;
924
925    // Re-queue for embedding if content changed
926    if input.content.is_some() {
927        conn.execute(
928            "INSERT OR REPLACE INTO embedding_queue (memory_id, status, queued_at)
929             VALUES (?, 'pending', ?)",
930            params![id, now],
931        )?;
932        conn.execute(
933            "UPDATE memories SET has_embedding = 0 WHERE id = ?",
934            params![id],
935        )?;
936    }
937
938    // Build list of changed fields for event data
939    let mut changed_fields = Vec::new();
940    if input.content.is_some() {
941        changed_fields.push("content");
942    }
943    if input.tags.is_some() {
944        changed_fields.push("tags");
945    }
946    if input.metadata.is_some() {
947        changed_fields.push("metadata");
948    }
949    if input.importance.is_some() {
950        changed_fields.push("importance");
951    }
952    if input.ttl_seconds.is_some() {
953        changed_fields.push("ttl");
954    }
955
956    // Record event for sync delta tracking
957    record_event(
958        conn,
959        MemoryEventType::Updated,
960        Some(id),
961        None,
962        serde_json::json!({
963            "changed_fields": changed_fields,
964        }),
965    )?;
966
967    // Update sync state (version now tracks event count for delta sync)
968    conn.execute(
969        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
970        [],
971    )?;
972
973    get_memory_internal(conn, id, false)
974}
975
976/// Promote a memory from Daily tier to Permanent tier.
977///
978/// This operation:
979/// - Changes the tier from Daily to Permanent
980/// - Clears the expires_at field (permanent memories cannot expire)
981/// - Updates the updated_at timestamp
982///
983/// # Errors
984/// - Returns `NotFound` if memory doesn't exist
985/// - Returns `Validation` if memory is already Permanent
986pub fn promote_to_permanent(conn: &Connection, id: i64) -> Result<Memory> {
987    let memory = get_memory_internal(conn, id, false)?;
988
989    if memory.tier == MemoryTier::Permanent {
990        return Err(EngramError::InvalidInput(format!(
991            "Memory {} is already in the Permanent tier",
992            id
993        )));
994    }
995
996    let now = Utc::now().to_rfc3339();
997
998    conn.execute(
999        "UPDATE memories SET tier = 'permanent', expires_at = NULL, updated_at = ?, version = version + 1 WHERE id = ?",
1000        params![now, id],
1001    )?;
1002
1003    // Record event for sync delta tracking
1004    record_event(
1005        conn,
1006        MemoryEventType::Updated,
1007        Some(id),
1008        None,
1009        serde_json::json!({
1010            "changed_fields": ["tier", "expires_at"],
1011            "action": "promote_to_permanent",
1012        }),
1013    )?;
1014
1015    // Update sync state (version now tracks event count for delta sync)
1016    conn.execute(
1017        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1018        [],
1019    )?;
1020
1021    tracing::info!(memory_id = id, "Promoted memory to permanent tier");
1022
1023    get_memory_internal(conn, id, false)
1024}
1025
1026/// Move a memory to a different workspace.
1027///
1028/// # Arguments
1029/// - `id`: Memory ID
1030/// - `workspace`: New workspace name (will be normalized)
1031///
1032/// # Errors
1033/// - Returns `NotFound` if memory doesn't exist
1034/// - Returns `Validation` if workspace name is invalid
1035pub fn move_to_workspace(conn: &Connection, id: i64, workspace: &str) -> Result<Memory> {
1036    // Validate workspace exists (by checking the memory exists first)
1037    let _memory = get_memory_internal(conn, id, false)?;
1038
1039    // Normalize the workspace name
1040    let normalized = crate::types::normalize_workspace(workspace)
1041        .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1042
1043    let now = Utc::now().to_rfc3339();
1044
1045    conn.execute(
1046        "UPDATE memories SET workspace = ?, updated_at = ?, version = version + 1 WHERE id = ?",
1047        params![normalized, now, id],
1048    )?;
1049
1050    // Record event for sync delta tracking
1051    record_event(
1052        conn,
1053        MemoryEventType::Updated,
1054        Some(id),
1055        None,
1056        serde_json::json!({
1057            "changed_fields": ["workspace"],
1058            "action": "move_to_workspace",
1059            "new_workspace": normalized,
1060        }),
1061    )?;
1062
1063    // Update sync state (version now tracks event count for delta sync)
1064    conn.execute(
1065        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1066        [],
1067    )?;
1068
1069    tracing::info!(memory_id = id, workspace = %normalized, "Moved memory to workspace");
1070
1071    get_memory_internal(conn, id, false)
1072}
1073
1074/// List all workspaces with their statistics.
1075///
1076/// Returns computed stats for each workspace that has at least one memory.
1077/// Stats are computed on-demand (not cached at the database level).
1078pub fn list_workspaces(conn: &Connection) -> Result<Vec<WorkspaceStats>> {
1079    let now = Utc::now().to_rfc3339();
1080
1081    let mut stmt = conn.prepare(
1082        r#"
1083        SELECT
1084            workspace,
1085            COUNT(*) as memory_count,
1086            SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1087            SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1088            MIN(created_at) as first_memory_at,
1089            MAX(created_at) as last_memory_at,
1090            AVG(importance) as avg_importance
1091        FROM memories
1092        WHERE valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1093        GROUP BY workspace
1094        ORDER BY memory_count DESC
1095        "#,
1096    )?;
1097
1098    let workspaces: Vec<WorkspaceStats> = stmt
1099        .query_map(params![now], |row| {
1100            let workspace: String = row.get(0)?;
1101            let memory_count: i64 = row.get(1)?;
1102            let permanent_count: i64 = row.get(2)?;
1103            let daily_count: i64 = row.get(3)?;
1104            let first_memory_at: Option<String> = row.get(4)?;
1105            let last_memory_at: Option<String> = row.get(5)?;
1106            let avg_importance: Option<f64> = row.get(6)?;
1107
1108            Ok(WorkspaceStats {
1109                workspace,
1110                memory_count,
1111                permanent_count,
1112                daily_count,
1113                first_memory_at: first_memory_at.and_then(|s| {
1114                    DateTime::parse_from_rfc3339(&s)
1115                        .map(|dt| dt.with_timezone(&Utc))
1116                        .ok()
1117                }),
1118                last_memory_at: last_memory_at.and_then(|s| {
1119                    DateTime::parse_from_rfc3339(&s)
1120                        .map(|dt| dt.with_timezone(&Utc))
1121                        .ok()
1122                }),
1123                top_tags: vec![], // Loaded separately if needed
1124                avg_importance: avg_importance.map(|v| v as f32),
1125            })
1126        })?
1127        .filter_map(|r| r.ok())
1128        .collect();
1129
1130    Ok(workspaces)
1131}
1132
1133/// Get statistics for a specific workspace.
1134pub fn get_workspace_stats(conn: &Connection, workspace: &str) -> Result<WorkspaceStats> {
1135    let normalized = crate::types::normalize_workspace(workspace)
1136        .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1137
1138    let now = Utc::now().to_rfc3339();
1139
1140    let stats = conn
1141        .query_row(
1142            r#"
1143        SELECT
1144            workspace,
1145            COUNT(*) as memory_count,
1146            SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1147            SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1148            MIN(created_at) as first_memory_at,
1149            MAX(created_at) as last_memory_at,
1150            AVG(importance) as avg_importance
1151        FROM memories
1152        WHERE workspace = ? AND valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1153        GROUP BY workspace
1154        "#,
1155            params![normalized, now],
1156            |row| {
1157                let workspace: String = row.get(0)?;
1158                let memory_count: i64 = row.get(1)?;
1159                let permanent_count: i64 = row.get(2)?;
1160                let daily_count: i64 = row.get(3)?;
1161                let first_memory_at: Option<String> = row.get(4)?;
1162                let last_memory_at: Option<String> = row.get(5)?;
1163                let avg_importance: Option<f64> = row.get(6)?;
1164
1165                Ok(WorkspaceStats {
1166                    workspace,
1167                    memory_count,
1168                    permanent_count,
1169                    daily_count,
1170                    first_memory_at: first_memory_at.and_then(|s| {
1171                        DateTime::parse_from_rfc3339(&s)
1172                            .map(|dt| dt.with_timezone(&Utc))
1173                            .ok()
1174                    }),
1175                    last_memory_at: last_memory_at.and_then(|s| {
1176                        DateTime::parse_from_rfc3339(&s)
1177                            .map(|dt| dt.with_timezone(&Utc))
1178                            .ok()
1179                    }),
1180                    top_tags: vec![],
1181                    avg_importance: avg_importance.map(|v| v as f32),
1182                })
1183            },
1184        )
1185        .map_err(|e| match e {
1186            rusqlite::Error::QueryReturnedNoRows => {
1187                EngramError::NotFound(0) // Workspace doesn't exist
1188            }
1189            _ => EngramError::Database(e),
1190        })?;
1191
1192    Ok(stats)
1193}
1194
1195/// Delete a workspace by moving all its memories to the default workspace or deleting them.
1196///
1197/// # Arguments
1198/// - `workspace`: Workspace to delete
1199/// - `move_to_default`: If true, moves memories to "default" workspace. If false, deletes them.
1200///
1201/// # Returns
1202/// Number of memories affected.
1203pub fn delete_workspace(conn: &Connection, workspace: &str, move_to_default: bool) -> Result<i64> {
1204    let normalized = crate::types::normalize_workspace(workspace)
1205        .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1206
1207    if normalized == "default" {
1208        return Err(EngramError::InvalidInput(
1209            "Cannot delete the default workspace".to_string(),
1210        ));
1211    }
1212
1213    let now = Utc::now().to_rfc3339();
1214
1215    // First, get the IDs of all affected memories so we can record individual events
1216    let affected_ids: Vec<i64> = {
1217        let mut stmt =
1218            conn.prepare("SELECT id FROM memories WHERE workspace = ? AND valid_to IS NULL")?;
1219        let rows = stmt.query_map(params![&normalized], |row| row.get(0))?;
1220        rows.collect::<std::result::Result<Vec<_>, _>>()?
1221    };
1222
1223    let affected = affected_ids.len() as i64;
1224
1225    if affected > 0 {
1226        if move_to_default {
1227            // Move all memories to the default workspace
1228            conn.execute(
1229                "UPDATE memories SET workspace = 'default', updated_at = ?, version = version + 1 WHERE workspace = ? AND valid_to IS NULL",
1230                params![&now, &normalized],
1231            )?;
1232        } else {
1233            // Soft delete all memories in the workspace
1234            conn.execute(
1235                "UPDATE memories SET valid_to = ? WHERE workspace = ? AND valid_to IS NULL",
1236                params![&now, &normalized],
1237            )?;
1238        }
1239
1240        // Record individual events for each affected memory (for proper sync delta tracking)
1241        let event_type = if move_to_default {
1242            MemoryEventType::Updated
1243        } else {
1244            MemoryEventType::Deleted
1245        };
1246
1247        for memory_id in &affected_ids {
1248            record_event(
1249                conn,
1250                event_type.clone(),
1251                Some(*memory_id),
1252                None,
1253                serde_json::json!({
1254                    "action": "delete_workspace",
1255                    "workspace": normalized,
1256                    "move_to_default": move_to_default,
1257                }),
1258            )?;
1259        }
1260    }
1261
1262    // Update sync state (version now tracks event count for delta sync)
1263    conn.execute(
1264        "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1265        params![affected],
1266    )?;
1267
1268    tracing::info!(
1269        workspace = %normalized,
1270        move_to_default,
1271        affected,
1272        "Deleted workspace"
1273    );
1274
1275    Ok(affected)
1276}
1277
1278/// Delete a memory (soft delete by setting valid_to)
1279pub fn delete_memory(conn: &Connection, id: i64) -> Result<()> {
1280    let now = Utc::now().to_rfc3339();
1281
1282    // Get memory info before deletion for event data
1283    let memory_info: Option<(String, String)> = conn
1284        .query_row(
1285            "SELECT workspace, memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1286            params![id],
1287            |row| Ok((row.get(0)?, row.get(1)?)),
1288        )
1289        .ok();
1290
1291    let affected = conn.execute(
1292        "UPDATE memories SET valid_to = ? WHERE id = ? AND valid_to IS NULL",
1293        params![now, id],
1294    )?;
1295
1296    if affected == 0 {
1297        return Err(EngramError::NotFound(id));
1298    }
1299
1300    // Also invalidate cross-references
1301    conn.execute(
1302        "UPDATE crossrefs SET valid_to = ? WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL",
1303        params![now, id, id],
1304    )?;
1305
1306    // Record event for sync delta tracking
1307    let (workspace, memory_type) =
1308        memory_info.unwrap_or(("default".to_string(), "unknown".to_string()));
1309    record_event(
1310        conn,
1311        MemoryEventType::Deleted,
1312        Some(id),
1313        None,
1314        serde_json::json!({
1315            "workspace": workspace,
1316            "memory_type": memory_type,
1317        }),
1318    )?;
1319
1320    // Update sync state (version now tracks event count for delta sync)
1321    conn.execute(
1322        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1323        [],
1324    )?;
1325
1326    Ok(())
1327}
1328
1329/// List memories with filtering and pagination
1330pub fn list_memories(conn: &Connection, options: &ListOptions) -> Result<Vec<Memory>> {
1331    let now = Utc::now().to_rfc3339();
1332
1333    let mut sql = String::from(
1334        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1335                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1336                m.visibility, m.version, m.has_embedding, m.metadata,
1337                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
1338         FROM memories m",
1339    );
1340
1341    let mut conditions = vec!["m.valid_to IS NULL".to_string()];
1342    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1343
1344    // Exclude expired memories
1345    conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
1346    params.push(Box::new(now));
1347
1348    // Tag filter (requires join)
1349    if let Some(ref tags) = options.tags {
1350        if !tags.is_empty() {
1351            sql.push_str(
1352                " JOIN memory_tags mt ON m.id = mt.memory_id
1353                  JOIN tags t ON mt.tag_id = t.id",
1354            );
1355            let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
1356            conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1357            for tag in tags {
1358                params.push(Box::new(tag.clone()));
1359            }
1360        }
1361    }
1362
1363    // Type filter
1364    if let Some(ref memory_type) = options.memory_type {
1365        conditions.push("m.memory_type = ?".to_string());
1366        params.push(Box::new(memory_type.as_str().to_string()));
1367    }
1368
1369    // Metadata filter (JSON)
1370    if let Some(ref metadata_filter) = options.metadata_filter {
1371        for (key, value) in metadata_filter {
1372            metadata_value_to_param(key, value, &mut conditions, &mut params)?;
1373        }
1374    }
1375
1376    // Scope filter
1377    if let Some(ref scope) = options.scope {
1378        conditions.push("m.scope_type = ?".to_string());
1379        params.push(Box::new(scope.scope_type().to_string()));
1380        if let Some(scope_id) = scope.scope_id() {
1381            conditions.push("m.scope_id = ?".to_string());
1382            params.push(Box::new(scope_id.to_string()));
1383        } else {
1384            conditions.push("m.scope_id IS NULL".to_string());
1385        }
1386    }
1387
1388    // Workspace filter
1389    if let Some(ref workspace) = options.workspace {
1390        conditions.push("m.workspace = ?".to_string());
1391        params.push(Box::new(workspace.clone()));
1392    }
1393
1394    // Tier filter
1395    if let Some(ref tier) = options.tier {
1396        conditions.push("m.tier = ?".to_string());
1397        params.push(Box::new(tier.as_str().to_string()));
1398    }
1399
1400    sql.push_str(" WHERE ");
1401    sql.push_str(&conditions.join(" AND "));
1402
1403    // Sorting
1404    let sort_field = match options.sort_by.unwrap_or_default() {
1405        SortField::CreatedAt => "m.created_at",
1406        SortField::UpdatedAt => "m.updated_at",
1407        SortField::LastAccessedAt => "m.last_accessed_at",
1408        SortField::Importance => "m.importance",
1409        SortField::AccessCount => "m.access_count",
1410    };
1411    let sort_order = match options.sort_order.unwrap_or_default() {
1412        SortOrder::Asc => "ASC",
1413        SortOrder::Desc => "DESC",
1414    };
1415    sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
1416
1417    // Pagination
1418    let limit = options.limit.unwrap_or(100);
1419    let offset = options.offset.unwrap_or(0);
1420    sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1421
1422    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1423    let mut stmt = conn.prepare(&sql)?;
1424
1425    let memories: Vec<Memory> = stmt
1426        .query_map(param_refs.as_slice(), memory_from_row)?
1427        .filter_map(|r| r.ok())
1428        .map(|mut m| {
1429            m.tags = load_tags(conn, m.id).unwrap_or_default();
1430            m
1431        })
1432        .collect();
1433
1434    Ok(memories)
1435}
1436
1437/// Query episodic memories ordered by event_time within a time range.
1438pub fn get_episodic_timeline(
1439    conn: &Connection,
1440    start_time: Option<DateTime<Utc>>,
1441    end_time: Option<DateTime<Utc>>,
1442    workspace: Option<&str>,
1443    tags: Option<&[String]>,
1444    limit: i64,
1445) -> Result<Vec<Memory>> {
1446    let now = Utc::now().to_rfc3339();
1447
1448    let mut sql = String::from(
1449        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1450                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1451                m.visibility, m.version, m.has_embedding, m.metadata,
1452                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1453                m.event_time, m.event_duration_seconds, m.trigger_pattern,
1454                m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1455                m.lifecycle_state
1456         FROM memories m",
1457    );
1458
1459    let mut conditions = vec![
1460        "m.valid_to IS NULL".to_string(),
1461        "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1462        "m.memory_type = 'episodic'".to_string(),
1463        "m.event_time IS NOT NULL".to_string(),
1464    ];
1465    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1466
1467    if let Some(start) = start_time {
1468        conditions.push("m.event_time >= ?".to_string());
1469        params.push(Box::new(start.to_rfc3339()));
1470    }
1471
1472    if let Some(end) = end_time {
1473        conditions.push("m.event_time <= ?".to_string());
1474        params.push(Box::new(end.to_rfc3339()));
1475    }
1476
1477    if let Some(ws) = workspace {
1478        conditions.push("m.workspace = ?".to_string());
1479        params.push(Box::new(ws.to_string()));
1480    }
1481
1482    if let Some(tag_list) = tags {
1483        if !tag_list.is_empty() {
1484            sql.push_str(
1485                " JOIN memory_tags mt ON m.id = mt.memory_id
1486                  JOIN tags t ON mt.tag_id = t.id",
1487            );
1488            let placeholders: Vec<String> = tag_list.iter().map(|_| "?".to_string()).collect();
1489            conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1490            for tag in tag_list {
1491                params.push(Box::new(tag.clone()));
1492            }
1493        }
1494    }
1495
1496    sql.push_str(" WHERE ");
1497    sql.push_str(&conditions.join(" AND "));
1498    sql.push_str(" ORDER BY m.event_time ASC");
1499    sql.push_str(&format!(" LIMIT {}", limit));
1500
1501    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1502    let mut stmt = conn.prepare(&sql)?;
1503
1504    let memories: Vec<Memory> = stmt
1505        .query_map(param_refs.as_slice(), memory_from_row)?
1506        .filter_map(|r| r.ok())
1507        .map(|mut m| {
1508            m.tags = load_tags(conn, m.id).unwrap_or_default();
1509            m
1510        })
1511        .collect();
1512
1513    Ok(memories)
1514}
1515
1516/// Query procedural memories, optionally filtered by trigger pattern and success rate.
1517pub fn get_procedural_memories(
1518    conn: &Connection,
1519    trigger_pattern: Option<&str>,
1520    workspace: Option<&str>,
1521    min_success_rate: Option<f32>,
1522    limit: i64,
1523) -> Result<Vec<Memory>> {
1524    let now = Utc::now().to_rfc3339();
1525
1526    let sql_base = "SELECT m.id, m.content, m.memory_type, m.importance, m.access_count,
1527                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1528                m.visibility, m.version, m.has_embedding, m.metadata,
1529                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1530                m.event_time, m.event_duration_seconds, m.trigger_pattern,
1531                m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1532                m.lifecycle_state
1533         FROM memories m";
1534
1535    let mut conditions = vec![
1536        "m.valid_to IS NULL".to_string(),
1537        "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1538        "m.memory_type = 'procedural'".to_string(),
1539    ];
1540    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1541
1542    if let Some(pattern) = trigger_pattern {
1543        conditions.push("m.trigger_pattern LIKE ?".to_string());
1544        params.push(Box::new(format!("%{}%", pattern)));
1545    }
1546
1547    if let Some(ws) = workspace {
1548        conditions.push("m.workspace = ?".to_string());
1549        params.push(Box::new(ws.to_string()));
1550    }
1551
1552    if let Some(min_rate) = min_success_rate {
1553        // Filter: success / (success + failure) >= min_rate
1554        // Only apply when there's at least one execution
1555        conditions.push("(m.procedure_success_count + m.procedure_failure_count) > 0".to_string());
1556        conditions.push(
1557            "CAST(m.procedure_success_count AS REAL) / (m.procedure_success_count + m.procedure_failure_count) >= ?"
1558                .to_string(),
1559        );
1560        params.push(Box::new(min_rate as f64));
1561    }
1562
1563    let sql = format!(
1564        "{} WHERE {} ORDER BY m.procedure_success_count DESC LIMIT {}",
1565        sql_base,
1566        conditions.join(" AND "),
1567        limit
1568    );
1569
1570    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1571    let mut stmt = conn.prepare(&sql)?;
1572
1573    let memories: Vec<Memory> = stmt
1574        .query_map(param_refs.as_slice(), memory_from_row)?
1575        .filter_map(|r| r.ok())
1576        .map(|mut m| {
1577            m.tags = load_tags(conn, m.id).unwrap_or_default();
1578            m
1579        })
1580        .collect();
1581
1582    Ok(memories)
1583}
1584
1585/// Record a success or failure outcome for a procedural memory.
1586pub fn record_procedure_outcome(
1587    conn: &Connection,
1588    memory_id: i64,
1589    success: bool,
1590) -> Result<Memory> {
1591    let column = if success {
1592        "procedure_success_count"
1593    } else {
1594        "procedure_failure_count"
1595    };
1596
1597    let now = Utc::now().to_rfc3339();
1598
1599    // Verify the memory exists and is procedural
1600    let memory_type: String = conn
1601        .query_row(
1602            "SELECT memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1603            params![memory_id],
1604            |row| row.get(0),
1605        )
1606        .map_err(|_| EngramError::NotFound(memory_id))?;
1607
1608    if memory_type != "procedural" {
1609        return Err(EngramError::InvalidInput(format!(
1610            "Memory {} is type '{}', not 'procedural'",
1611            memory_id, memory_type
1612        )));
1613    }
1614
1615    conn.execute(
1616        &format!(
1617            "UPDATE memories SET {} = {} + 1, updated_at = ? WHERE id = ?",
1618            column, column
1619        ),
1620        params![now, memory_id],
1621    )?;
1622
1623    get_memory(conn, memory_id)
1624}
1625
1626/// Create a cross-reference between memories
1627pub fn create_crossref(conn: &Connection, input: &CreateCrossRefInput) -> Result<CrossReference> {
1628    let now = Utc::now().to_rfc3339();
1629
1630    // Verify both memories exist
1631    let _ = get_memory_internal(conn, input.from_id, false)?;
1632    let _ = get_memory_internal(conn, input.to_id, false)?;
1633
1634    let strength = input.strength.unwrap_or(1.0);
1635
1636    conn.execute(
1637        "INSERT INTO crossrefs (from_id, to_id, edge_type, score, strength, source, source_context, pinned, created_at, valid_from)
1638         VALUES (?, ?, ?, 1.0, ?, 'manual', ?, ?, ?, ?)
1639         ON CONFLICT(from_id, to_id, edge_type) DO UPDATE SET
1640            strength = excluded.strength,
1641            source_context = COALESCE(excluded.source_context, crossrefs.source_context),
1642            pinned = excluded.pinned",
1643        params![
1644            input.from_id,
1645            input.to_id,
1646            input.edge_type.as_str(),
1647            strength,
1648            input.source_context,
1649            input.pinned,
1650            now,
1651            now,
1652        ],
1653    )?;
1654
1655    get_crossref(conn, input.from_id, input.to_id, input.edge_type)
1656}
1657
1658/// Get a cross-reference
1659pub fn get_crossref(
1660    conn: &Connection,
1661    from_id: i64,
1662    to_id: i64,
1663    edge_type: EdgeType,
1664) -> Result<CrossReference> {
1665    let mut stmt = conn.prepare_cached(
1666        "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1667                source_context, created_at, valid_from, valid_to, pinned, metadata
1668         FROM crossrefs
1669         WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1670    )?;
1671
1672    let crossref = stmt.query_row(params![from_id, to_id, edge_type.as_str()], |row| {
1673        let edge_type_str: String = row.get("edge_type")?;
1674        let source_str: String = row.get("source")?;
1675        let created_at_str: String = row.get("created_at")?;
1676        let valid_from_str: String = row.get("valid_from")?;
1677        let valid_to_str: Option<String> = row.get("valid_to")?;
1678        let metadata_str: String = row.get("metadata")?;
1679
1680        Ok(CrossReference {
1681            from_id: row.get("from_id")?,
1682            to_id: row.get("to_id")?,
1683            edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1684            score: row.get("score")?,
1685            confidence: row.get("confidence")?,
1686            strength: row.get("strength")?,
1687            source: match source_str.as_str() {
1688                "manual" => RelationSource::Manual,
1689                "llm" => RelationSource::Llm,
1690                _ => RelationSource::Auto,
1691            },
1692            source_context: row.get("source_context")?,
1693            created_at: DateTime::parse_from_rfc3339(&created_at_str)
1694                .map(|dt| dt.with_timezone(&Utc))
1695                .unwrap_or_else(|_| Utc::now()),
1696            valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1697                .map(|dt| dt.with_timezone(&Utc))
1698                .unwrap_or_else(|_| Utc::now()),
1699            valid_to: valid_to_str.and_then(|s| {
1700                DateTime::parse_from_rfc3339(&s)
1701                    .map(|dt| dt.with_timezone(&Utc))
1702                    .ok()
1703            }),
1704            pinned: row.get::<_, i32>("pinned")? != 0,
1705            metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1706        })
1707    })?;
1708
1709    Ok(crossref)
1710}
1711
1712/// Get all cross-references for a memory
1713pub fn get_related(conn: &Connection, memory_id: i64) -> Result<Vec<CrossReference>> {
1714    let mut stmt = conn.prepare_cached(
1715        "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1716                source_context, created_at, valid_from, valid_to, pinned, metadata
1717         FROM crossrefs
1718         WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL
1719         ORDER BY score DESC",
1720    )?;
1721
1722    let crossrefs: Vec<CrossReference> = stmt
1723        .query_map(params![memory_id, memory_id], |row| {
1724            let edge_type_str: String = row.get("edge_type")?;
1725            let source_str: String = row.get("source")?;
1726            let created_at_str: String = row.get("created_at")?;
1727            let valid_from_str: String = row.get("valid_from")?;
1728            let valid_to_str: Option<String> = row.get("valid_to")?;
1729            let metadata_str: String = row.get("metadata")?;
1730
1731            Ok(CrossReference {
1732                from_id: row.get("from_id")?,
1733                to_id: row.get("to_id")?,
1734                edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1735                score: row.get("score")?,
1736                confidence: row.get("confidence")?,
1737                strength: row.get("strength")?,
1738                source: match source_str.as_str() {
1739                    "manual" => RelationSource::Manual,
1740                    "llm" => RelationSource::Llm,
1741                    _ => RelationSource::Auto,
1742                },
1743                source_context: row.get("source_context")?,
1744                created_at: DateTime::parse_from_rfc3339(&created_at_str)
1745                    .map(|dt| dt.with_timezone(&Utc))
1746                    .unwrap_or_else(|_| Utc::now()),
1747                valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1748                    .map(|dt| dt.with_timezone(&Utc))
1749                    .unwrap_or_else(|_| Utc::now()),
1750                valid_to: valid_to_str.and_then(|s| {
1751                    DateTime::parse_from_rfc3339(&s)
1752                        .map(|dt| dt.with_timezone(&Utc))
1753                        .ok()
1754                }),
1755                pinned: row.get::<_, i32>("pinned")? != 0,
1756                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1757            })
1758        })?
1759        .filter_map(|r| r.ok())
1760        .collect();
1761
1762    Ok(crossrefs)
1763}
1764
1765/// Delete a cross-reference (soft delete)
1766pub fn delete_crossref(
1767    conn: &Connection,
1768    from_id: i64,
1769    to_id: i64,
1770    edge_type: EdgeType,
1771) -> Result<()> {
1772    let now = Utc::now().to_rfc3339();
1773
1774    let affected = conn.execute(
1775        "UPDATE crossrefs SET valid_to = ?
1776         WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1777        params![now, from_id, to_id, edge_type.as_str()],
1778    )?;
1779
1780    if affected == 0 {
1781        return Err(EngramError::NotFound(from_id));
1782    }
1783
1784    Ok(())
1785}
1786
1787/// Set expiration on an existing memory
1788///
1789/// # Arguments
1790/// * `conn` - Database connection
1791/// * `id` - Memory ID
1792/// * `ttl_seconds` - Time-to-live in seconds (0 = remove expiration, None = no change)
1793pub fn set_memory_expiration(
1794    conn: &Connection,
1795    id: i64,
1796    ttl_seconds: Option<i64>,
1797) -> Result<Memory> {
1798    // Verify memory exists and is not expired
1799    let _ = get_memory_internal(conn, id, false)?;
1800
1801    match ttl_seconds {
1802        Some(0) => {
1803            // Remove expiration
1804            conn.execute(
1805                "UPDATE memories SET expires_at = NULL, updated_at = ? WHERE id = ?",
1806                params![Utc::now().to_rfc3339(), id],
1807            )?;
1808        }
1809        Some(ttl) => {
1810            // Set new expiration
1811            let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
1812            conn.execute(
1813                "UPDATE memories SET expires_at = ?, updated_at = ? WHERE id = ?",
1814                params![expires_at, Utc::now().to_rfc3339(), id],
1815            )?;
1816        }
1817        None => {
1818            // No change - don't record event or update sync state
1819            return get_memory_internal(conn, id, false);
1820        }
1821    }
1822
1823    // Record event for sync delta tracking
1824    record_event(
1825        conn,
1826        MemoryEventType::Updated,
1827        Some(id),
1828        None,
1829        serde_json::json!({
1830            "changed_fields": ["expires_at"],
1831            "action": "set_expiration",
1832        }),
1833    )?;
1834
1835    // Update sync state (version now tracks event count for delta sync)
1836    conn.execute(
1837        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1838        [],
1839    )?;
1840
1841    get_memory_internal(conn, id, false)
1842}
1843
1844/// Delete all expired memories (cleanup job)
1845///
1846/// Returns the number of memories deleted
1847pub fn cleanup_expired_memories(conn: &Connection) -> Result<i64> {
1848    let now = Utc::now().to_rfc3339();
1849
1850    // Soft delete expired memories by setting valid_to
1851    let affected = conn.execute(
1852        "UPDATE memories SET valid_to = ?
1853         WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1854        params![now, now],
1855    )?;
1856
1857    if affected > 0 {
1858        // Also invalidate cross-references involving expired memories
1859        conn.execute(
1860            "UPDATE crossrefs SET valid_to = ?
1861             WHERE valid_to IS NULL AND (
1862                 from_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1863                 OR to_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1864             )",
1865            params![now, now, now],
1866        )?;
1867
1868        // Remove memory_entities links for expired memories
1869        // This ensures expired memories don't appear in entity-based queries
1870        conn.execute(
1871            "DELETE FROM memory_entities
1872             WHERE memory_id IN (
1873                 SELECT id FROM memories
1874                 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1875             )",
1876            params![now],
1877        )?;
1878
1879        // Remove memory_tags links for expired memories
1880        conn.execute(
1881            "DELETE FROM memory_tags
1882             WHERE memory_id IN (
1883                 SELECT id FROM memories
1884                 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1885             )",
1886            params![now],
1887        )?;
1888
1889        // Record batch event for sync delta tracking
1890        record_event(
1891            conn,
1892            MemoryEventType::Deleted,
1893            None, // Batch operation
1894            None,
1895            serde_json::json!({
1896                "action": "cleanup_expired",
1897                "affected_count": affected,
1898            }),
1899        )?;
1900
1901        // Update sync state (version now tracks event count for delta sync)
1902        conn.execute(
1903            "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1904            params![affected as i64],
1905        )?;
1906    }
1907
1908    Ok(affected as i64)
1909}
1910
1911/// Get count of expired memories (for monitoring)
1912pub fn count_expired_memories(conn: &Connection) -> Result<i64> {
1913    let now = Utc::now().to_rfc3339();
1914
1915    let count: i64 = conn.query_row(
1916        "SELECT COUNT(*) FROM memories
1917         WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1918        params![now],
1919        |row| row.get(0),
1920    )?;
1921
1922    Ok(count)
1923}
1924
/// A per-workspace retention policy.
///
/// Mirrors one row of the `retention_policies` table. Policies are keyed by
/// `workspace` (the upsert in `set_retention_policy` conflicts on it) and are
/// enforced by `apply_retention_policies`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Row id in `retention_policies`.
    pub id: i64,
    /// Workspace this policy applies to.
    pub workspace: String,
    /// Optional maximum age in days.
    /// NOTE(review): `apply_retention_policies` does not read this field —
    /// confirm where (or whether) it is enforced.
    pub max_age_days: Option<i64>,
    /// Optional cap on active memories in the workspace; the excess is
    /// archived by `apply_retention_policies`.
    pub max_memories: Option<i64>,
    /// Optional age threshold in days handed to `compress_old_memories`.
    pub compress_after_days: Option<i64>,
    /// Importance ceiling for compression candidates (read falls back to 0.3).
    pub compress_max_importance: f32,
    /// Access-count threshold for compression candidates; memories with fewer
    /// accesses qualify (read falls back to 3).
    pub compress_min_access: i32,
    /// Optional age in days after which archived memories are soft-deleted.
    pub auto_delete_after_days: Option<i64>,
    /// Memory type names to exclude; stored as a comma-separated string.
    /// NOTE(review): `apply_retention_policies` does not consult this list.
    pub exclude_types: Vec<String>,
    /// Creation timestamp as stored (written as RFC 3339 by `set_retention_policy`).
    pub created_at: String,
    /// Last-update timestamp as stored (written as RFC 3339 by `set_retention_policy`).
    pub updated_at: String,
}
1940
1941/// Get retention policy for a workspace.
1942pub fn get_retention_policy(conn: &Connection, workspace: &str) -> Result<Option<RetentionPolicy>> {
1943    conn.query_row(
1944        "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1945                compress_max_importance, compress_min_access, auto_delete_after_days,
1946                exclude_types, created_at, updated_at
1947         FROM retention_policies WHERE workspace = ?",
1948        params![workspace],
1949        |row| {
1950            let exclude_str: Option<String> = row.get(8)?;
1951            Ok(RetentionPolicy {
1952                id: row.get(0)?,
1953                workspace: row.get(1)?,
1954                max_age_days: row.get(2)?,
1955                max_memories: row.get(3)?,
1956                compress_after_days: row.get(4)?,
1957                compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
1958                compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
1959                auto_delete_after_days: row.get(7)?,
1960                exclude_types: exclude_str
1961                    .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
1962                    .unwrap_or_default(),
1963                created_at: row.get(9)?,
1964                updated_at: row.get(10)?,
1965            })
1966        },
1967    )
1968    .optional()
1969    .map_err(EngramError::from)
1970}
1971
1972/// List all retention policies.
1973pub fn list_retention_policies(conn: &Connection) -> Result<Vec<RetentionPolicy>> {
1974    let mut stmt = conn.prepare(
1975        "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1976                compress_max_importance, compress_min_access, auto_delete_after_days,
1977                exclude_types, created_at, updated_at
1978         FROM retention_policies ORDER BY workspace",
1979    )?;
1980
1981    let policies = stmt
1982        .query_map([], |row| {
1983            let exclude_str: Option<String> = row.get(8)?;
1984            Ok(RetentionPolicy {
1985                id: row.get(0)?,
1986                workspace: row.get(1)?,
1987                max_age_days: row.get(2)?,
1988                max_memories: row.get(3)?,
1989                compress_after_days: row.get(4)?,
1990                compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
1991                compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
1992                auto_delete_after_days: row.get(7)?,
1993                exclude_types: exclude_str
1994                    .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
1995                    .unwrap_or_default(),
1996                created_at: row.get(9)?,
1997                updated_at: row.get(10)?,
1998            })
1999        })?
2000        .filter_map(|r| r.ok())
2001        .collect();
2002
2003    Ok(policies)
2004}
2005
2006/// Upsert a retention policy for a workspace.
2007pub fn set_retention_policy(
2008    conn: &Connection,
2009    workspace: &str,
2010    max_age_days: Option<i64>,
2011    max_memories: Option<i64>,
2012    compress_after_days: Option<i64>,
2013    compress_max_importance: Option<f32>,
2014    compress_min_access: Option<i32>,
2015    auto_delete_after_days: Option<i64>,
2016    exclude_types: Option<Vec<String>>,
2017) -> Result<RetentionPolicy> {
2018    let now = Utc::now().to_rfc3339();
2019    let exclude_str = exclude_types.map(|v| v.join(","));
2020
2021    conn.execute(
2022        "INSERT INTO retention_policies (workspace, max_age_days, max_memories, compress_after_days,
2023            compress_max_importance, compress_min_access, auto_delete_after_days, exclude_types,
2024            created_at, updated_at)
2025         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)
2026         ON CONFLICT(workspace) DO UPDATE SET
2027            max_age_days = COALESCE(?2, max_age_days),
2028            max_memories = COALESCE(?3, max_memories),
2029            compress_after_days = COALESCE(?4, compress_after_days),
2030            compress_max_importance = COALESCE(?5, compress_max_importance),
2031            compress_min_access = COALESCE(?6, compress_min_access),
2032            auto_delete_after_days = COALESCE(?7, auto_delete_after_days),
2033            exclude_types = COALESCE(?8, exclude_types),
2034            updated_at = ?9",
2035        params![
2036            workspace,
2037            max_age_days,
2038            max_memories,
2039            compress_after_days,
2040            compress_max_importance.unwrap_or(0.3),
2041            compress_min_access.unwrap_or(3),
2042            auto_delete_after_days,
2043            exclude_str,
2044            now,
2045        ],
2046    )?;
2047
2048    get_retention_policy(conn, workspace)?.ok_or_else(|| EngramError::NotFound(0))
2049}
2050
2051/// Delete a retention policy for a workspace.
2052pub fn delete_retention_policy(conn: &Connection, workspace: &str) -> Result<bool> {
2053    let affected = conn.execute(
2054        "DELETE FROM retention_policies WHERE workspace = ?",
2055        params![workspace],
2056    )?;
2057    Ok(affected > 0)
2058}
2059
2060/// Apply all retention policies. Returns total memories affected across all workspaces.
2061pub fn apply_retention_policies(conn: &Connection) -> Result<i64> {
2062    let policies = list_retention_policies(conn)?;
2063    let mut total_affected = 0i64;
2064
2065    for policy in &policies {
2066        // 1. Auto-compress based on compress_after_days
2067        if let Some(compress_days) = policy.compress_after_days {
2068            let compressed = compress_old_memories(
2069                conn,
2070                compress_days,
2071                policy.compress_max_importance,
2072                policy.compress_min_access,
2073                100,
2074            )?;
2075            total_affected += compressed;
2076        }
2077
2078        // 2. Enforce max_memories limit per workspace
2079        if let Some(max_mem) = policy.max_memories {
2080            let count: i64 = conn
2081                .query_row(
2082                    "SELECT COUNT(*) FROM memories WHERE workspace = ? AND valid_to IS NULL
2083                     AND COALESCE(lifecycle_state, 'active') = 'active'",
2084                    params![policy.workspace],
2085                    |row| row.get(0),
2086                )
2087                .unwrap_or(0);
2088
2089            if count > max_mem {
2090                // Archive excess (oldest, lowest importance first)
2091                let excess = count - max_mem;
2092                let archived = conn.execute(
2093                    "UPDATE memories SET lifecycle_state = 'archived'
2094                     WHERE id IN (
2095                        SELECT id FROM memories
2096                        WHERE workspace = ? AND valid_to IS NULL
2097                          AND COALESCE(lifecycle_state, 'active') = 'active'
2098                          AND memory_type NOT IN ('summary', 'checkpoint')
2099                        ORDER BY importance ASC, access_count ASC, created_at ASC
2100                        LIMIT ?
2101                     )",
2102                    params![policy.workspace, excess],
2103                )?;
2104                total_affected += archived as i64;
2105            }
2106        }
2107
2108        // 3. Auto-delete very old archived memories
2109        if let Some(delete_days) = policy.auto_delete_after_days {
2110            let cutoff = (Utc::now() - chrono::Duration::days(delete_days)).to_rfc3339();
2111            let now = Utc::now().to_rfc3339();
2112            let deleted = conn.execute(
2113                "UPDATE memories SET valid_to = ?
2114                 WHERE workspace = ? AND valid_to IS NULL
2115                   AND lifecycle_state = 'archived'
2116                   AND created_at < ?",
2117                params![now, policy.workspace, cutoff],
2118            )?;
2119            total_affected += deleted as i64;
2120        }
2121    }
2122
2123    Ok(total_affected)
2124}
2125
2126/// Auto-compress old, rarely-accessed memories by creating summaries and archiving originals.
2127/// Returns the number of memories archived.
2128pub fn compress_old_memories(
2129    conn: &Connection,
2130    max_age_days: i64,
2131    max_importance: f32,
2132    min_access_count: i32,
2133    batch_limit: usize,
2134) -> Result<i64> {
2135    let cutoff = (Utc::now() - chrono::Duration::days(max_age_days)).to_rfc3339();
2136    let now = Utc::now().to_rfc3339();
2137
2138    // Find candidates: old, low-importance, rarely-accessed, not already archived/summary
2139    let mut stmt = conn.prepare(
2140        "SELECT id, content, memory_type, importance, tags, workspace
2141         FROM (
2142            SELECT m.id, m.content, m.memory_type, m.importance, m.access_count, m.workspace,
2143                   COALESCE(m.lifecycle_state, 'active') as lifecycle_state,
2144                   (SELECT GROUP_CONCAT(t.name, ',') FROM memory_tags mt JOIN tags t ON mt.tag_id = t.id WHERE mt.memory_id = m.id) as tags
2145            FROM memories m
2146            WHERE m.valid_to IS NULL
2147              AND (m.expires_at IS NULL OR m.expires_at > ?1)
2148              AND m.created_at < ?2
2149              AND m.importance <= ?3
2150              AND m.access_count < ?4
2151              AND m.memory_type NOT IN ('summary', 'checkpoint')
2152              AND COALESCE(m.lifecycle_state, 'active') = 'active'
2153            ORDER BY m.created_at ASC
2154            LIMIT ?5
2155         )",
2156    )?;
2157
2158    let candidates: Vec<(i64, String, String, f32, Option<String>, String)> = stmt
2159        .query_map(
2160            params![
2161                now,
2162                cutoff,
2163                max_importance,
2164                min_access_count,
2165                batch_limit as i64
2166            ],
2167            |row| {
2168                Ok((
2169                    row.get(0)?,
2170                    row.get(1)?,
2171                    row.get(2)?,
2172                    row.get(3)?,
2173                    row.get(4)?,
2174                    row.get::<_, String>(5)
2175                        .unwrap_or_else(|_| "default".to_string()),
2176                ))
2177            },
2178        )?
2179        .filter_map(|r| r.ok())
2180        .collect();
2181
2182    let mut archived = 0i64;
2183
2184    for (id, content, memory_type, importance, tags_csv, workspace) in &candidates {
2185        // Create compressed summary
2186        let summary_text = if content.len() > 200 {
2187            let head: String = content.chars().take(120).collect();
2188            let tail: String = content
2189                .chars()
2190                .rev()
2191                .take(60)
2192                .collect::<String>()
2193                .chars()
2194                .rev()
2195                .collect();
2196            format!("{}...{}", head, tail)
2197        } else {
2198            content.clone()
2199        };
2200
2201        let tags: Vec<String> = tags_csv
2202            .as_deref()
2203            .unwrap_or("")
2204            .split(',')
2205            .filter(|s| !s.is_empty())
2206            .map(|s| s.to_string())
2207            .collect();
2208
2209        let input = CreateMemoryInput {
2210            content: format!("[Archived {}] {}", memory_type, summary_text),
2211            memory_type: MemoryType::Summary,
2212            importance: Some(*importance),
2213            tags,
2214            workspace: Some(workspace.clone()),
2215            tier: MemoryTier::Permanent,
2216            summary_of_id: Some(*id),
2217            ..Default::default()
2218        };
2219
2220        if create_memory(conn, &input).is_ok()
2221            && conn
2222                .execute(
2223                    "UPDATE memories SET lifecycle_state = 'archived' WHERE id = ? AND valid_to IS NULL",
2224                    params![id],
2225                )
2226                .is_ok()
2227        {
2228            archived += 1;
2229        }
2230    }
2231
2232    Ok(archived)
2233}
2234
/// A compact memory representation for efficient list views.
/// Contains only essential fields and a truncated content preview.
///
/// Produced by `list_memories_compact`; serializable only (no `Deserialize`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactMemoryRow {
    /// Memory ID
    pub id: i64,
    /// Content preview as returned by `compact_preview` (first line or N chars)
    pub preview: String,
    /// Whether content was truncated, as reported by `compact_preview`
    pub truncated: bool,
    /// Memory type (`list_memories_compact` falls back to `MemoryType::Note`
    /// when the stored value cannot be parsed)
    pub memory_type: MemoryType,
    /// Tags, loaded per row via `load_tags` (empty if loading fails)
    pub tags: Vec<String>,
    /// Importance score
    pub importance: f32,
    /// Creation timestamp (current time is substituted when the stored value
    /// is not valid RFC 3339)
    pub created_at: DateTime<Utc>,
    /// Last update timestamp (same fallback as `created_at`)
    pub updated_at: DateTime<Utc>,
    /// Workspace name
    pub workspace: String,
    /// Memory tier
    pub tier: MemoryTier,
    /// Original content length, intended as a character count.
    /// NOTE(review): confirm producers do not store the UTF-8 byte length here.
    pub content_length: usize,
    /// Number of lines in original content
    pub line_count: usize,
}
2264
2265/// List memories in compact format with preview only.
2266///
2267/// This is more efficient than `list_memories` when you don't need full content,
2268/// such as for browsing/listing UIs.
2269///
2270/// # Arguments
2271/// * `conn` - Database connection
2272/// * `options` - List filtering/pagination options
2273/// * `preview_chars` - Max chars for preview (default: 100)
2274pub fn list_memories_compact(
2275    conn: &Connection,
2276    options: &ListOptions,
2277    preview_chars: Option<usize>,
2278) -> Result<Vec<CompactMemoryRow>> {
2279    use crate::intelligence::compact_preview;
2280
2281    let now = Utc::now().to_rfc3339();
2282    let max_preview = preview_chars.unwrap_or(100);
2283
2284    let mut sql = String::from(
2285        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance,
2286                m.created_at, m.updated_at, m.workspace, m.tier
2287         FROM memories m",
2288    );
2289
2290    let mut conditions = vec!["m.valid_to IS NULL".to_string()];
2291    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
2292
2293    // Exclude expired memories
2294    conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
2295    params.push(Box::new(now));
2296
2297    // Tag filter (requires join)
2298    if let Some(ref tags) = options.tags {
2299        if !tags.is_empty() {
2300            sql.push_str(
2301                " JOIN memory_tags mt ON m.id = mt.memory_id
2302                  JOIN tags t ON mt.tag_id = t.id",
2303            );
2304            let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
2305            conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
2306            for tag in tags {
2307                params.push(Box::new(tag.clone()));
2308            }
2309        }
2310    }
2311
2312    // Type filter
2313    if let Some(ref memory_type) = options.memory_type {
2314        conditions.push("m.memory_type = ?".to_string());
2315        params.push(Box::new(memory_type.as_str().to_string()));
2316    }
2317
2318    // Metadata filter (JSON)
2319    if let Some(ref metadata_filter) = options.metadata_filter {
2320        for (key, value) in metadata_filter {
2321            metadata_value_to_param(key, value, &mut conditions, &mut params)?;
2322        }
2323    }
2324
2325    // Scope filter
2326    if let Some(ref scope) = options.scope {
2327        conditions.push("m.scope_type = ?".to_string());
2328        params.push(Box::new(scope.scope_type().to_string()));
2329        if let Some(scope_id) = scope.scope_id() {
2330            conditions.push("m.scope_id = ?".to_string());
2331            params.push(Box::new(scope_id.to_string()));
2332        } else {
2333            conditions.push("m.scope_id IS NULL".to_string());
2334        }
2335    }
2336
2337    // Workspace filter
2338    if let Some(ref workspace) = options.workspace {
2339        conditions.push("m.workspace = ?".to_string());
2340        params.push(Box::new(workspace.clone()));
2341    }
2342
2343    // Tier filter
2344    if let Some(ref tier) = options.tier {
2345        conditions.push("m.tier = ?".to_string());
2346        params.push(Box::new(tier.as_str().to_string()));
2347    }
2348
2349    sql.push_str(" WHERE ");
2350    sql.push_str(&conditions.join(" AND "));
2351
2352    // Sorting
2353    let sort_field = match options.sort_by.unwrap_or_default() {
2354        SortField::CreatedAt => "m.created_at",
2355        SortField::UpdatedAt => "m.updated_at",
2356        SortField::LastAccessedAt => "m.last_accessed_at",
2357        SortField::Importance => "m.importance",
2358        SortField::AccessCount => "m.access_count",
2359    };
2360    let sort_order = match options.sort_order.unwrap_or_default() {
2361        SortOrder::Asc => "ASC",
2362        SortOrder::Desc => "DESC",
2363    };
2364    sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
2365
2366    // Pagination
2367    let limit = options.limit.unwrap_or(100);
2368    let offset = options.offset.unwrap_or(0);
2369    sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
2370
2371    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
2372    let mut stmt = conn.prepare(&sql)?;
2373
2374    let memories: Vec<CompactMemoryRow> = stmt
2375        .query_map(param_refs.as_slice(), |row| {
2376            let id: i64 = row.get("id")?;
2377            let content: String = row.get("content")?;
2378            let memory_type_str: String = row.get("memory_type")?;
2379            let importance: f32 = row.get("importance")?;
2380            let created_at_str: String = row.get("created_at")?;
2381            let updated_at_str: String = row.get("updated_at")?;
2382            let workspace: String = row.get("workspace")?;
2383            let tier_str: String = row.get("tier")?;
2384
2385            let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
2386            let tier = tier_str.parse().unwrap_or_default();
2387
2388            // Generate compact preview
2389            let (preview, truncated) = compact_preview(&content, max_preview);
2390            let content_length = content.len();
2391            let line_count = content.lines().count();
2392
2393            Ok(CompactMemoryRow {
2394                id,
2395                preview,
2396                truncated,
2397                memory_type,
2398                tags: vec![], // Will be loaded separately
2399                importance,
2400                created_at: DateTime::parse_from_rfc3339(&created_at_str)
2401                    .map(|dt| dt.with_timezone(&Utc))
2402                    .unwrap_or_else(|_| Utc::now()),
2403                updated_at: DateTime::parse_from_rfc3339(&updated_at_str)
2404                    .map(|dt| dt.with_timezone(&Utc))
2405                    .unwrap_or_else(|_| Utc::now()),
2406                workspace,
2407                tier,
2408                content_length,
2409                line_count,
2410            })
2411        })?
2412        .filter_map(|r| r.ok())
2413        .map(|mut m| {
2414            m.tags = load_tags(conn, m.id).unwrap_or_default();
2415            m
2416        })
2417        .collect();
2418
2419    Ok(memories)
2420}
2421
2422/// Get storage statistics
2423pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
2424    let total_memories: i64 = conn.query_row(
2425        "SELECT COUNT(*) FROM memories WHERE valid_to IS NULL",
2426        [],
2427        |row| row.get(0),
2428    )?;
2429
2430    let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2431
2432    let total_crossrefs: i64 = conn.query_row(
2433        "SELECT COUNT(*) FROM crossrefs WHERE valid_to IS NULL",
2434        [],
2435        |row| row.get(0),
2436    )?;
2437
2438    let total_versions: i64 =
2439        conn.query_row("SELECT COUNT(*) FROM memory_versions", [], |row| row.get(0))?;
2440
2441    let _total_identities: i64 =
2442        conn.query_row("SELECT COUNT(*) FROM identities", [], |row| row.get(0))?;
2443
2444    let _total_entities: i64 =
2445        conn.query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))?;
2446
2447    let db_size_bytes: i64 = conn.query_row(
2448        "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
2449        [],
2450        |row| row.get(0),
2451    )?;
2452
2453    let _schema_version: i32 = conn
2454        .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
2455            row.get(0)
2456        })
2457        .unwrap_or(0);
2458
2459    let mut workspace_stmt = conn.prepare(
2460        "SELECT workspace, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY workspace",
2461    )?;
2462    let workspaces: HashMap<String, i64> = workspace_stmt
2463        .query_map([], |row| {
2464            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2465        })?
2466        .filter_map(|r| r.ok())
2467        .collect();
2468
2469    let mut type_stmt = conn.prepare(
2470        "SELECT memory_type, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY memory_type",
2471    )?;
2472    let type_counts: HashMap<String, i64> = type_stmt
2473        .query_map([], |row| {
2474            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2475        })?
2476        .filter_map(|r| r.ok())
2477        .collect();
2478
2479    let mut tier_stmt = conn.prepare(
2480        "SELECT COALESCE(tier, 'permanent'), COUNT(*) FROM memories GROUP BY COALESCE(tier, 'permanent')",
2481    )?;
2482    let tier_counts: HashMap<String, i64> = tier_stmt
2483        .query_map([], |row| {
2484            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2485        })?
2486        .filter_map(|r| r.ok())
2487        .collect();
2488
2489    let memories_with_embeddings: i64 = conn.query_row(
2490        "SELECT COUNT(*) FROM memories WHERE has_embedding = 1 AND valid_to IS NULL",
2491        [],
2492        |row| row.get(0),
2493    )?;
2494
2495    let memories_pending_embedding: i64 = conn.query_row(
2496        "SELECT COUNT(*) FROM embedding_queue WHERE status = 'pending'",
2497        [],
2498        |row| row.get(0),
2499    )?;
2500
2501    let (last_sync, sync_pending): (Option<String>, i64) = conn.query_row(
2502        "SELECT last_sync, pending_changes FROM sync_state WHERE id = 1",
2503        [],
2504        |row| Ok((row.get(0)?, row.get(1)?)),
2505    )?;
2506
2507    Ok(StorageStats {
2508        total_memories,
2509        total_tags,
2510        total_crossrefs,
2511        total_versions,
2512        total_identities: 0,
2513        total_entities: 0,
2514        db_size_bytes,
2515        memories_with_embeddings,
2516        memories_pending_embedding,
2517        last_sync: last_sync.and_then(|s| {
2518            DateTime::parse_from_rfc3339(&s)
2519                .map(|dt| dt.with_timezone(&Utc))
2520                .ok()
2521        }),
2522        sync_pending: sync_pending > 0,
2523        storage_mode: "sqlite".to_string(),
2524        schema_version: 0,
2525        workspaces,
2526        type_counts,
2527        tier_counts,
2528    })
2529}
2530
2531/// Get memory versions
2532pub fn get_memory_versions(conn: &Connection, memory_id: i64) -> Result<Vec<MemoryVersion>> {
2533    let mut stmt = conn.prepare_cached(
2534        "SELECT version, content, tags, metadata, created_at, created_by, change_summary
2535         FROM memory_versions WHERE memory_id = ? ORDER BY version DESC",
2536    )?;
2537
2538    let versions: Vec<MemoryVersion> = stmt
2539        .query_map([memory_id], |row| {
2540            let tags_str: String = row.get("tags")?;
2541            let metadata_str: String = row.get("metadata")?;
2542            let created_at_str: String = row.get("created_at")?;
2543
2544            Ok(MemoryVersion {
2545                version: row.get("version")?,
2546                content: row.get("content")?,
2547                tags: serde_json::from_str(&tags_str).unwrap_or_default(),
2548                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
2549                created_at: DateTime::parse_from_rfc3339(&created_at_str)
2550                    .map(|dt| dt.with_timezone(&Utc))
2551                    .unwrap_or_else(|_| Utc::now()),
2552                created_by: row.get("created_by")?,
2553                change_summary: row.get("change_summary")?,
2554            })
2555        })?
2556        .filter_map(|r| r.ok())
2557        .collect();
2558
2559    Ok(versions)
2560}
2561
2562// ============================================================================
2563// Batch Operations
2564// ============================================================================
2565
/// Result of a batch create operation
///
/// Produced by `create_memory_batch`; successes and failures are reported
/// side by side rather than aborting on the first error.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchCreateResult {
    /// Memories returned by the successful create calls.
    pub created: Vec<Memory>,
    /// One entry per input that could not be created.
    pub failed: Vec<BatchError>,
    /// Length of `created`.
    pub total_created: usize,
    /// Length of `failed`.
    pub total_failed: usize,
}
2574
/// Result of a batch delete operation
///
/// Produced by `delete_memory_batch`; successes and failures are reported
/// side by side rather than aborting on the first error.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchDeleteResult {
    /// IDs whose delete call succeeded.
    pub deleted: Vec<i64>,
    /// One entry per ID that could not be deleted.
    pub failed: Vec<BatchError>,
    /// Length of `deleted`.
    pub total_deleted: usize,
    /// Length of `failed`.
    pub total_failed: usize,
}
2583
/// Error information for batch operations
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchError {
    /// Zero-based position of the failing item in the batch input slice.
    pub index: usize,
    /// ID of the affected memory when one is known (set by batch delete,
    /// `None` for batch create).
    pub id: Option<i64>,
    /// Error rendered with `to_string()`.
    pub error: String,
}
2591
2592/// Create multiple memories in a single transaction
2593pub fn create_memory_batch(
2594    conn: &Connection,
2595    inputs: &[CreateMemoryInput],
2596) -> Result<BatchCreateResult> {
2597    let mut created = Vec::new();
2598    let mut failed = Vec::new();
2599
2600    for (index, input) in inputs.iter().enumerate() {
2601        match create_memory(conn, input) {
2602            Ok(memory) => created.push(memory),
2603            Err(e) => failed.push(BatchError {
2604                index,
2605                id: None,
2606                error: e.to_string(),
2607            }),
2608        }
2609    }
2610
2611    Ok(BatchCreateResult {
2612        total_created: created.len(),
2613        total_failed: failed.len(),
2614        created,
2615        failed,
2616    })
2617}
2618
2619/// Delete multiple memories in a single transaction
2620pub fn delete_memory_batch(conn: &Connection, ids: &[i64]) -> Result<BatchDeleteResult> {
2621    let mut deleted = Vec::new();
2622    let mut failed = Vec::new();
2623
2624    for (index, &id) in ids.iter().enumerate() {
2625        match delete_memory(conn, id) {
2626            Ok(()) => deleted.push(id),
2627            Err(e) => failed.push(BatchError {
2628                index,
2629                id: Some(id),
2630                error: e.to_string(),
2631            }),
2632        }
2633    }
2634
2635    Ok(BatchDeleteResult {
2636        total_deleted: deleted.len(),
2637        total_failed: failed.len(),
2638        deleted,
2639        failed,
2640    })
2641}
2642
2643// ============================================================================
2644// Tag Utilities
2645// ============================================================================
2646
/// Tag with usage count
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagInfo {
    /// Tag name as stored in the `tags` table.
    pub name: String,
    /// Number of `memory_tags` assignments for this tag, as computed by
    /// `list_tags`.
    pub count: i64,
    /// Latest `updated_at` among the tag's memories with `valid_to IS NULL`;
    /// `None` when there is none or the stored value is not RFC 3339.
    pub last_used: Option<DateTime<Utc>>,
}
2654
2655/// Get all tags with their usage counts
2656pub fn list_tags(conn: &Connection) -> Result<Vec<TagInfo>> {
2657    let mut stmt = conn.prepare(
2658        r#"
2659        SELECT t.name, COUNT(mt.memory_id) as count,
2660               MAX(m.updated_at) as last_used
2661        FROM tags t
2662        LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2663        LEFT JOIN memories m ON mt.memory_id = m.id AND m.valid_to IS NULL
2664        GROUP BY t.id, t.name
2665        ORDER BY count DESC, t.name ASC
2666        "#,
2667    )?;
2668
2669    let tags: Vec<TagInfo> = stmt
2670        .query_map([], |row| {
2671            let name: String = row.get(0)?;
2672            let count: i64 = row.get(1)?;
2673            let last_used: Option<String> = row.get(2)?;
2674
2675            Ok(TagInfo {
2676                name,
2677                count,
2678                last_used: last_used.and_then(|s| {
2679                    DateTime::parse_from_rfc3339(&s)
2680                        .map(|dt| dt.with_timezone(&Utc))
2681                        .ok()
2682                }),
2683            })
2684        })?
2685        .filter_map(|r| r.ok())
2686        .collect();
2687
2688    Ok(tags)
2689}
2690
/// Tag hierarchy node
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagHierarchyNode {
    /// Name of this node: a single segment of a `/`-separated tag.
    pub name: String,
    /// Slash-separated path from the root of the hierarchy to this node.
    pub full_path: String,
    /// Usage count attributed to this node.
    pub count: i64,
    /// Nodes one level below this one.
    pub children: Vec<TagHierarchyNode>,
}
2699
2700/// Build tag hierarchy from slash-separated tags (e.g., "project/engram/core")
2701pub fn get_tag_hierarchy(conn: &Connection) -> Result<Vec<TagHierarchyNode>> {
2702    let tags = list_tags(conn)?;
2703
2704    // Build hierarchy from slash-separated paths
2705    let mut root_nodes: HashMap<String, TagHierarchyNode> = HashMap::new();
2706
2707    for tag in tags {
2708        let parts: Vec<&str> = tag.name.split('/').collect();
2709        if parts.is_empty() {
2710            continue;
2711        }
2712
2713        let root_name = parts[0].to_string();
2714        if !root_nodes.contains_key(&root_name) {
2715            root_nodes.insert(
2716                root_name.clone(),
2717                TagHierarchyNode {
2718                    name: root_name.clone(),
2719                    full_path: root_name.clone(),
2720                    count: 0,
2721                    children: Vec::new(),
2722                },
2723            );
2724        }
2725
2726        // Add count to appropriate level
2727        if parts.len() == 1 {
2728            if let Some(node) = root_nodes.get_mut(&root_name) {
2729                node.count += tag.count;
2730            }
2731        } else {
2732            // For nested tags, we'd need recursive building
2733            // For now, just add to root count
2734            if let Some(node) = root_nodes.get_mut(&root_name) {
2735                node.count += tag.count;
2736            }
2737        }
2738    }
2739
2740    Ok(root_nodes.into_values().collect())
2741}
2742
/// Tag validation result
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagValidationResult {
    /// `true` when `validate_tags` found no problems.
    pub valid: bool,
    /// Names of tags that are not assigned to any memory.
    pub orphaned_tags: Vec<String>,
    /// Names of tags whose name is empty.
    pub empty_tags: Vec<String>,
    /// Tag assignments reported as duplicated — presumably
    /// `(memory_id, tag_name)` pairs; confirm against the producer.
    pub duplicate_assignments: Vec<(i64, String)>,
    /// Number of rows in `tags`.
    pub total_tags: i64,
    /// Number of rows in `memory_tags`.
    pub total_assignments: i64,
}
2753
2754/// Validate tag consistency
2755pub fn validate_tags(conn: &Connection) -> Result<TagValidationResult> {
2756    // Find orphaned tags (tags with no memories)
2757    let orphaned: Vec<String> = conn
2758        .prepare(
2759            "SELECT t.name FROM tags t
2760             LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2761             WHERE mt.tag_id IS NULL",
2762        )?
2763        .query_map([], |row| row.get(0))?
2764        .filter_map(|r| r.ok())
2765        .collect();
2766
2767    // Find empty tag names
2768    let empty: Vec<String> = conn
2769        .prepare("SELECT name FROM tags WHERE name = '' OR name IS NULL")?
2770        .query_map([], |row| row.get(0))?
2771        .filter_map(|r| r.ok())
2772        .collect();
2773
2774    // Count totals
2775    let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2776    let total_assignments: i64 =
2777        conn.query_row("SELECT COUNT(*) FROM memory_tags", [], |row| row.get(0))?;
2778
2779    Ok(TagValidationResult {
2780        valid: orphaned.is_empty() && empty.is_empty(),
2781        orphaned_tags: orphaned,
2782        empty_tags: empty,
2783        duplicate_assignments: vec![], // Would need more complex query
2784        total_tags,
2785        total_assignments,
2786    })
2787}
2788
2789// ============================================================================
2790// Import/Export
2791// ============================================================================
2792
/// Exported memory format
///
/// Flat, string-based representation of a memory as written by
/// `export_memories` and read back by `import_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportedMemory {
    /// ID the memory had in the exporting database.
    pub id: i64,
    /// Memory content.
    pub content: String,
    /// Memory type in its string form.
    pub memory_type: String,
    /// Tags attached to the memory.
    pub tags: Vec<String>,
    /// Arbitrary JSON metadata keyed by name.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Importance score.
    pub importance: f32,
    /// Workspace the memory belongs to.
    pub workspace: String,
    /// Memory tier in its string form.
    pub tier: String,
    /// Creation time as an RFC 3339 string.
    pub created_at: String,
    /// Last-update time as an RFC 3339 string.
    pub updated_at: String,
}
2807
/// Export format
///
/// Top-level document produced by `export_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportData {
    /// Format version; `export_memories` writes `"1.0"`.
    pub version: String,
    /// Time of the export as an RFC 3339 string.
    pub exported_at: String,
    /// Number of entries in `memories`.
    pub memory_count: usize,
    /// The exported memories.
    pub memories: Vec<ExportedMemory>,
}
2816
2817/// Export all memories to JSON-serializable format
2818pub fn export_memories(conn: &Connection) -> Result<ExportData> {
2819    let memories = list_memories(
2820        conn,
2821        &ListOptions {
2822            limit: Some(100000),
2823            ..Default::default()
2824        },
2825    )?;
2826
2827    let exported: Vec<ExportedMemory> = memories
2828        .into_iter()
2829        .map(|m| ExportedMemory {
2830            id: m.id,
2831            content: m.content,
2832            memory_type: m.memory_type.as_str().to_string(),
2833            tags: m.tags,
2834            metadata: m.metadata,
2835            importance: m.importance,
2836            workspace: m.workspace,
2837            tier: m.tier.as_str().to_string(),
2838            created_at: m.created_at.to_rfc3339(),
2839            updated_at: m.updated_at.to_rfc3339(),
2840        })
2841        .collect();
2842
2843    Ok(ExportData {
2844        version: "1.0".to_string(),
2845        exported_at: Utc::now().to_rfc3339(),
2846        memory_count: exported.len(),
2847        memories: exported,
2848    })
2849}
2850
/// Import result
///
/// Tally produced by `import_memories`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ImportResult {
    /// Number of memories whose create call succeeded.
    pub imported: usize,
    /// Number of memories rejected with `EngramError::Duplicate`.
    pub skipped: usize,
    /// Number of memories that failed with any other error.
    pub failed: usize,
    /// One human-readable message per failed memory.
    pub errors: Vec<String>,
}
2859
2860/// Import memories from exported format
2861pub fn import_memories(
2862    conn: &Connection,
2863    data: &ExportData,
2864    skip_duplicates: bool,
2865) -> Result<ImportResult> {
2866    let mut imported = 0;
2867    let mut skipped = 0;
2868    let mut failed = 0;
2869    let mut errors = Vec::new();
2870
2871    for mem in &data.memories {
2872        let memory_type = mem.memory_type.parse().unwrap_or(MemoryType::Note);
2873        let tier = mem.tier.parse().unwrap_or(MemoryTier::Permanent);
2874
2875        let input = CreateMemoryInput {
2876            content: mem.content.clone(),
2877            memory_type,
2878            tags: mem.tags.clone(),
2879            metadata: mem.metadata.clone(),
2880            importance: Some(mem.importance),
2881            scope: MemoryScope::Global,
2882            workspace: Some(mem.workspace.clone()),
2883            tier,
2884            defer_embedding: false,
2885            ttl_seconds: None,
2886            dedup_mode: if skip_duplicates {
2887                DedupMode::Skip
2888            } else {
2889                DedupMode::Allow
2890            },
2891            dedup_threshold: None,
2892            event_time: None,
2893            event_duration_seconds: None,
2894            trigger_pattern: None,
2895            summary_of_id: None,
2896        };
2897
2898        match create_memory(conn, &input) {
2899            Ok(_) => imported += 1,
2900            Err(EngramError::Duplicate { .. }) => skipped += 1,
2901            Err(e) => {
2902                failed += 1;
2903                errors.push(format!("Failed to import memory {}: {}", mem.id, e));
2904            }
2905        }
2906    }
2907
2908    Ok(ImportResult {
2909        imported,
2910        skipped,
2911        failed,
2912        errors,
2913    })
2914}
2915
2916// ============================================================================
2917// Maintenance Operations
2918// ============================================================================
2919
2920/// Queue all memories for re-embedding
2921pub fn rebuild_embeddings(conn: &Connection) -> Result<i64> {
2922    let now = Utc::now().to_rfc3339();
2923
2924    // Clear existing queue
2925    conn.execute("DELETE FROM embedding_queue", [])?;
2926
2927    // Queue all memories
2928    let count = conn.execute(
2929        "INSERT INTO embedding_queue (memory_id, status, queued_at)
2930         SELECT id, 'pending', ? FROM memories WHERE valid_to IS NULL",
2931        params![now],
2932    )?;
2933
2934    // Reset has_embedding flag
2935    conn.execute(
2936        "UPDATE memories SET has_embedding = 0 WHERE valid_to IS NULL",
2937        [],
2938    )?;
2939
2940    Ok(count as i64)
2941}
2942
2943/// Rebuild all cross-references based on embeddings
2944pub fn rebuild_crossrefs(conn: &Connection) -> Result<i64> {
2945    let now = Utc::now().to_rfc3339();
2946
2947    // Clear existing auto-generated crossrefs (keep manual ones)
2948    let deleted = conn.execute(
2949        "UPDATE crossrefs SET valid_to = ? WHERE source = 'auto' AND valid_to IS NULL",
2950        params![now],
2951    )?;
2952
2953    // Note: Actual crossref generation requires embeddings and is done by the embedding worker
2954    // This just clears the old ones so they can be regenerated
2955
2956    Ok(deleted as i64)
2957}
2958
2959// ============================================================================
2960// Special Memory Types
2961// ============================================================================
2962
2963/// Create a section memory (for document structure)
2964pub fn create_section_memory(
2965    conn: &Connection,
2966    title: &str,
2967    content: &str,
2968    parent_id: Option<i64>,
2969    level: i32,
2970    workspace: Option<&str>,
2971) -> Result<Memory> {
2972    let mut metadata = HashMap::new();
2973    metadata.insert("section_title".to_string(), serde_json::json!(title));
2974    metadata.insert("section_level".to_string(), serde_json::json!(level));
2975    if let Some(pid) = parent_id {
2976        metadata.insert("parent_memory_id".to_string(), serde_json::json!(pid));
2977    }
2978
2979    let input = CreateMemoryInput {
2980        content: format!("# {}\n\n{}", title, content),
2981        memory_type: MemoryType::Context,
2982        tags: vec!["section".to_string()],
2983        metadata,
2984        importance: Some(0.6),
2985        scope: MemoryScope::Global,
2986        workspace: workspace.map(String::from),
2987        tier: MemoryTier::Permanent,
2988        defer_embedding: false,
2989        ttl_seconds: None,
2990        dedup_mode: DedupMode::Skip,
2991        dedup_threshold: None,
2992        event_time: None,
2993        event_duration_seconds: None,
2994        trigger_pattern: None,
2995        summary_of_id: None,
2996    };
2997
2998    create_memory(conn, &input)
2999}
3000
3001/// Create a checkpoint memory for session state
3002pub fn create_checkpoint(
3003    conn: &Connection,
3004    session_id: &str,
3005    summary: &str,
3006    context: &HashMap<String, serde_json::Value>,
3007    workspace: Option<&str>,
3008) -> Result<Memory> {
3009    let mut metadata = context.clone();
3010    metadata.insert(
3011        "checkpoint_session".to_string(),
3012        serde_json::json!(session_id),
3013    );
3014    metadata.insert(
3015        "checkpoint_time".to_string(),
3016        serde_json::json!(Utc::now().to_rfc3339()),
3017    );
3018
3019    let input = CreateMemoryInput {
3020        content: format!("Session Checkpoint: {}\n\n{}", session_id, summary),
3021        memory_type: MemoryType::Context,
3022        tags: vec!["checkpoint".to_string(), format!("session:{}", session_id)],
3023        metadata,
3024        importance: Some(0.7),
3025        scope: MemoryScope::Global,
3026        workspace: workspace.map(String::from),
3027        tier: MemoryTier::Permanent,
3028        defer_embedding: false,
3029        ttl_seconds: None,
3030        dedup_mode: DedupMode::Allow,
3031        dedup_threshold: None,
3032        event_time: None,
3033        event_duration_seconds: None,
3034        trigger_pattern: None,
3035        summary_of_id: None,
3036    };
3037
3038    create_memory(conn, &input)
3039}
3040
3041/// Temporarily boost a memory's importance
3042pub fn boost_memory(
3043    conn: &Connection,
3044    id: i64,
3045    boost_amount: f32,
3046    duration_seconds: Option<i64>,
3047) -> Result<Memory> {
3048    let memory = get_memory(conn, id)?;
3049    let new_importance = (memory.importance + boost_amount).min(1.0);
3050    let now = Utc::now();
3051
3052    // Update importance
3053    conn.execute(
3054        "UPDATE memories SET importance = ?, updated_at = ? WHERE id = ?",
3055        params![new_importance, now.to_rfc3339(), id],
3056    )?;
3057
3058    // If duration specified, store boost info in metadata for later decay
3059    if let Some(duration) = duration_seconds {
3060        let expires = now + chrono::Duration::seconds(duration);
3061        let mut metadata = memory.metadata.clone();
3062        metadata.insert(
3063            "boost_expires".to_string(),
3064            serde_json::json!(expires.to_rfc3339()),
3065        );
3066        metadata.insert(
3067            "boost_original_importance".to_string(),
3068            serde_json::json!(memory.importance),
3069        );
3070
3071        let metadata_json = serde_json::to_string(&metadata)?;
3072        conn.execute(
3073            "UPDATE memories SET metadata = ? WHERE id = ?",
3074            params![metadata_json, id],
3075        )?;
3076    }
3077
3078    get_memory(conn, id)
3079}
3080
3081// =============================================================================
3082// Event System
3083// =============================================================================
3084
/// Event types for the memory system
///
/// Each variant has a lowercase string form, produced by the `Display` impl
/// and accepted by the `FromStr` impl.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MemoryEventType {
    /// String form: `"created"`.
    Created,
    /// String form: `"updated"`.
    Updated,
    /// String form: `"deleted"`.
    Deleted,
    /// String form: `"linked"`.
    Linked,
    /// String form: `"unlinked"`.
    Unlinked,
    /// String form: `"shared"`.
    Shared,
    /// String form: `"synced"`.
    Synced,
}
3096
3097impl std::fmt::Display for MemoryEventType {
3098    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3099        match self {
3100            MemoryEventType::Created => write!(f, "created"),
3101            MemoryEventType::Updated => write!(f, "updated"),
3102            MemoryEventType::Deleted => write!(f, "deleted"),
3103            MemoryEventType::Linked => write!(f, "linked"),
3104            MemoryEventType::Unlinked => write!(f, "unlinked"),
3105            MemoryEventType::Shared => write!(f, "shared"),
3106            MemoryEventType::Synced => write!(f, "synced"),
3107        }
3108    }
3109}
3110
3111impl std::str::FromStr for MemoryEventType {
3112    type Err = EngramError;
3113    fn from_str(s: &str) -> Result<Self> {
3114        match s {
3115            "created" => Ok(MemoryEventType::Created),
3116            "updated" => Ok(MemoryEventType::Updated),
3117            "deleted" => Ok(MemoryEventType::Deleted),
3118            "linked" => Ok(MemoryEventType::Linked),
3119            "unlinked" => Ok(MemoryEventType::Unlinked),
3120            "shared" => Ok(MemoryEventType::Shared),
3121            "synced" => Ok(MemoryEventType::Synced),
3122            _ => Err(EngramError::InvalidInput(format!(
3123                "Invalid event type: {}",
3124                s
3125            ))),
3126        }
3127    }
3128}
3129
/// A memory event for tracking changes
///
/// Mirrors one row of the `memory_events` table as read by `poll_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryEvent {
    /// Row ID of the event.
    pub id: i64,
    /// Event type as stored in the table (`record_event` writes the string
    /// form of a `MemoryEventType`).
    pub event_type: String,
    /// ID of the memory the event refers to, if any.
    pub memory_id: Option<i64>,
    /// ID of the agent associated with the event, if any.
    pub agent_id: Option<String>,
    /// JSON payload of the event.
    pub data: serde_json::Value,
    /// Time the event was recorded.
    pub created_at: DateTime<Utc>,
}
3140
3141/// Record an event in the event system
3142pub fn record_event(
3143    conn: &Connection,
3144    event_type: MemoryEventType,
3145    memory_id: Option<i64>,
3146    agent_id: Option<&str>,
3147    data: serde_json::Value,
3148) -> Result<i64> {
3149    let now = Utc::now();
3150    let data_json = serde_json::to_string(&data)?;
3151
3152    conn.execute(
3153        "INSERT INTO memory_events (event_type, memory_id, agent_id, data, created_at)
3154         VALUES (?, ?, ?, ?, ?)",
3155        params![
3156            event_type.to_string(),
3157            memory_id,
3158            agent_id,
3159            data_json,
3160            now.to_rfc3339()
3161        ],
3162    )?;
3163
3164    Ok(conn.last_insert_rowid())
3165}
3166
3167/// Poll for events since a given timestamp or event ID
3168pub fn poll_events(
3169    conn: &Connection,
3170    since_id: Option<i64>,
3171    since_time: Option<DateTime<Utc>>,
3172    agent_id: Option<&str>,
3173    limit: Option<usize>,
3174) -> Result<Vec<MemoryEvent>> {
3175    let limit = limit.unwrap_or(100);
3176
3177    let (query, params): (&str, Vec<Box<dyn rusqlite::ToSql>>) =
3178        match (since_id, since_time, agent_id) {
3179            (Some(id), _, Some(agent)) => (
3180                "SELECT id, event_type, memory_id, agent_id, data, created_at
3181             FROM memory_events WHERE id > ? AND (agent_id = ? OR agent_id IS NULL)
3182             ORDER BY id ASC LIMIT ?",
3183                vec![
3184                    Box::new(id),
3185                    Box::new(agent.to_string()),
3186                    Box::new(limit as i64),
3187                ],
3188            ),
3189            (Some(id), _, None) => (
3190                "SELECT id, event_type, memory_id, agent_id, data, created_at
3191             FROM memory_events WHERE id > ?
3192             ORDER BY id ASC LIMIT ?",
3193                vec![Box::new(id), Box::new(limit as i64)],
3194            ),
3195            (None, Some(time), Some(agent)) => (
3196                "SELECT id, event_type, memory_id, agent_id, data, created_at
3197             FROM memory_events WHERE created_at > ? AND (agent_id = ? OR agent_id IS NULL)
3198             ORDER BY id ASC LIMIT ?",
3199                vec![
3200                    Box::new(time.to_rfc3339()),
3201                    Box::new(agent.to_string()),
3202                    Box::new(limit as i64),
3203                ],
3204            ),
3205            (None, Some(time), None) => (
3206                "SELECT id, event_type, memory_id, agent_id, data, created_at
3207             FROM memory_events WHERE created_at > ?
3208             ORDER BY id ASC LIMIT ?",
3209                vec![Box::new(time.to_rfc3339()), Box::new(limit as i64)],
3210            ),
3211            (None, None, Some(agent)) => (
3212                "SELECT id, event_type, memory_id, agent_id, data, created_at
3213             FROM memory_events WHERE agent_id = ? OR agent_id IS NULL
3214             ORDER BY id DESC LIMIT ?",
3215                vec![Box::new(agent.to_string()), Box::new(limit as i64)],
3216            ),
3217            (None, None, None) => (
3218                "SELECT id, event_type, memory_id, agent_id, data, created_at
3219             FROM memory_events ORDER BY id DESC LIMIT ?",
3220                vec![Box::new(limit as i64)],
3221            ),
3222        };
3223
3224    let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3225    let mut stmt = conn.prepare(query)?;
3226    let events = stmt
3227        .query_map(params_refs.as_slice(), |row| {
3228            let data_str: String = row.get(4)?;
3229            let created_str: String = row.get(5)?;
3230            Ok(MemoryEvent {
3231                id: row.get(0)?,
3232                event_type: row.get(1)?,
3233                memory_id: row.get(2)?,
3234                agent_id: row.get(3)?,
3235                data: serde_json::from_str(&data_str).unwrap_or(serde_json::json!({})),
3236                created_at: DateTime::parse_from_rfc3339(&created_str)
3237                    .map(|dt| dt.with_timezone(&Utc))
3238                    .unwrap_or_else(|_| Utc::now()),
3239            })
3240        })?
3241        .collect::<std::result::Result<Vec<_>, _>>()?;
3242
3243    Ok(events)
3244}
3245
3246/// Clear old events (cleanup)
3247pub fn clear_events(
3248    conn: &Connection,
3249    before_id: Option<i64>,
3250    before_time: Option<DateTime<Utc>>,
3251    keep_recent: Option<usize>,
3252) -> Result<i64> {
3253    let deleted = if let Some(id) = before_id {
3254        conn.execute("DELETE FROM memory_events WHERE id < ?", params![id])?
3255    } else if let Some(time) = before_time {
3256        conn.execute(
3257            "DELETE FROM memory_events WHERE created_at < ?",
3258            params![time.to_rfc3339()],
3259        )?
3260    } else if let Some(keep) = keep_recent {
3261        // Keep only the most recent N events
3262        conn.execute(
3263            "DELETE FROM memory_events WHERE id NOT IN (
3264                SELECT id FROM memory_events ORDER BY id DESC LIMIT ?
3265            )",
3266            params![keep as i64],
3267        )?
3268    } else {
3269        // Clear all events
3270        conn.execute("DELETE FROM memory_events", [])?
3271    };
3272
3273    Ok(deleted as i64)
3274}
3275
3276// =============================================================================
3277// Advanced Sync
3278// =============================================================================
3279
/// Sync version info
///
/// Snapshot produced by `get_sync_version`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncVersion {
    /// Highest `version` found in `sync_state` (0 when unavailable).
    pub version: i64,
    /// Latest `updated_at` among all memories; the current time when none
    /// could be read.
    pub last_modified: DateTime<Utc>,
    /// Total number of rows in `memories`.
    pub memory_count: i64,
    /// Simple fingerprint of the form `<memory_count>-<version>-<last modified>`.
    pub checksum: String,
}
3288
/// Sync task status record (Phase 3 - Langfuse integration)
///
/// Mirrors one row of the `sync_tasks` table as written by
/// `upsert_sync_task`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncTask {
    /// Unique task identifier; the conflict target of the upsert.
    pub task_id: String,
    /// Kind of task, stored as a free-form string.
    pub task_type: String,
    /// Current status, stored as a free-form string.
    pub status: String,
    /// Progress indicator — presumably 0–100; confirm against the producer.
    pub progress_percent: i32,
    /// Counter of traces processed so far.
    pub traces_processed: i64,
    /// Counter of memories created so far.
    pub memories_created: i64,
    /// Error description, if one was recorded.
    pub error_message: Option<String>,
    /// Start time, stored as a string (format not enforced here).
    pub started_at: String,
    /// Completion time, if recorded, stored as a string.
    pub completed_at: Option<String>,
}
3302
3303/// Get the current sync version
3304pub fn get_sync_version(conn: &Connection) -> Result<SyncVersion> {
3305    let memory_count: i64 =
3306        conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
3307
3308    let last_modified: Option<String> = conn
3309        .query_row("SELECT MAX(updated_at) FROM memories", [], |row| row.get(0))
3310        .ok();
3311
3312    let version: i64 = conn
3313        .query_row("SELECT MAX(version) FROM sync_state", [], |row| row.get(0))
3314        .unwrap_or(0);
3315
3316    // Simple checksum based on count and last modified
3317    let checksum = format!(
3318        "{}-{}-{}",
3319        memory_count,
3320        version,
3321        last_modified.as_deref().unwrap_or("none")
3322    );
3323
3324    Ok(SyncVersion {
3325        version,
3326        last_modified: last_modified
3327            .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
3328            .map(|dt| dt.with_timezone(&Utc))
3329            .unwrap_or_else(Utc::now),
3330        memory_count,
3331        checksum,
3332    })
3333}
3334
3335/// Insert or update a sync task record
3336pub fn upsert_sync_task(conn: &Connection, task: &SyncTask) -> Result<()> {
3337    conn.execute(
3338        r#"
3339        INSERT INTO sync_tasks (
3340            task_id, task_type, status, progress_percent, traces_processed, memories_created,
3341            error_message, started_at, completed_at
3342        )
3343        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
3344        ON CONFLICT(task_id) DO UPDATE SET
3345            task_type = excluded.task_type,
3346            status = excluded.status,
3347            progress_percent = excluded.progress_percent,
3348            traces_processed = excluded.traces_processed,
3349            memories_created = excluded.memories_created,
3350            error_message = excluded.error_message,
3351            started_at = excluded.started_at,
3352            completed_at = excluded.completed_at
3353        "#,
3354        params![
3355            task.task_id,
3356            task.task_type,
3357            task.status,
3358            task.progress_percent,
3359            task.traces_processed,
3360            task.memories_created,
3361            task.error_message,
3362            task.started_at,
3363            task.completed_at
3364        ],
3365    )?;
3366
3367    Ok(())
3368}
3369
3370/// Get a sync task by ID
3371pub fn get_sync_task(conn: &Connection, task_id: &str) -> Result<Option<SyncTask>> {
3372    let mut stmt = conn.prepare(
3373        r#"
3374        SELECT task_id, task_type, status, progress_percent, traces_processed, memories_created,
3375               error_message, started_at, completed_at
3376        FROM sync_tasks
3377        WHERE task_id = ?
3378        "#,
3379    )?;
3380
3381    let mut rows = stmt.query(params![task_id])?;
3382    if let Some(row) = rows.next()? {
3383        Ok(Some(SyncTask {
3384            task_id: row.get("task_id")?,
3385            task_type: row.get("task_type")?,
3386            status: row.get("status")?,
3387            progress_percent: row.get("progress_percent")?,
3388            traces_processed: row.get("traces_processed")?,
3389            memories_created: row.get("memories_created")?,
3390            error_message: row.get("error_message")?,
3391            started_at: row.get("started_at")?,
3392            completed_at: row.get("completed_at")?,
3393        }))
3394    } else {
3395        Ok(None)
3396    }
3397}
3398
3399/// Delta entry for sync
3400#[derive(Debug, Clone, Serialize, Deserialize)]
3401pub struct SyncDelta {
3402    pub created: Vec<Memory>,
3403    pub updated: Vec<Memory>,
3404    pub deleted: Vec<i64>,
3405    pub from_version: i64,
3406    pub to_version: i64,
3407}
3408
3409/// Get changes since a specific version
3410pub fn get_sync_delta(conn: &Connection, since_version: i64) -> Result<SyncDelta> {
3411    let current_version = get_sync_version(conn)?.version;
3412
3413    // Get events since that version to determine what changed
3414    let events = poll_events(conn, Some(since_version), None, None, Some(10000))?;
3415
3416    let mut created_ids = std::collections::HashSet::new();
3417    let mut updated_ids = std::collections::HashSet::new();
3418    let mut deleted_ids = std::collections::HashSet::new();
3419
3420    for event in events {
3421        if let Some(memory_id) = event.memory_id {
3422            match event.event_type.as_str() {
3423                "created" => {
3424                    created_ids.insert(memory_id);
3425                }
3426                "updated" => {
3427                    if !created_ids.contains(&memory_id) {
3428                        updated_ids.insert(memory_id);
3429                    }
3430                }
3431                "deleted" => {
3432                    created_ids.remove(&memory_id);
3433                    updated_ids.remove(&memory_id);
3434                    deleted_ids.insert(memory_id);
3435                }
3436                _ => {}
3437            }
3438        }
3439    }
3440
3441    let created: Vec<Memory> = created_ids
3442        .iter()
3443        .filter_map(|id| get_memory(conn, *id).ok())
3444        .collect();
3445
3446    let updated: Vec<Memory> = updated_ids
3447        .iter()
3448        .filter_map(|id| get_memory(conn, *id).ok())
3449        .collect();
3450
3451    Ok(SyncDelta {
3452        created,
3453        updated,
3454        deleted: deleted_ids.into_iter().collect(),
3455        from_version: since_version,
3456        to_version: current_version,
3457    })
3458}
3459
3460/// Agent sync state
3461#[derive(Debug, Clone, Serialize, Deserialize)]
3462pub struct AgentSyncState {
3463    pub agent_id: String,
3464    pub last_sync_version: i64,
3465    pub last_sync_time: DateTime<Utc>,
3466    pub pending_changes: i64,
3467}
3468
3469/// Get sync state for a specific agent
3470pub fn get_agent_sync_state(conn: &Connection, agent_id: &str) -> Result<AgentSyncState> {
3471    let result: std::result::Result<(i64, String), rusqlite::Error> = conn.query_row(
3472        "SELECT last_sync_version, last_sync_time FROM agent_sync_state WHERE agent_id = ?",
3473        params![agent_id],
3474        |row| Ok((row.get(0)?, row.get(1)?)),
3475    );
3476
3477    match result {
3478        Ok((version, time_str)) => {
3479            let current_version = get_sync_version(conn)?.version;
3480            let pending = (current_version - version).max(0);
3481
3482            Ok(AgentSyncState {
3483                agent_id: agent_id.to_string(),
3484                last_sync_version: version,
3485                last_sync_time: DateTime::parse_from_rfc3339(&time_str)
3486                    .map(|dt| dt.with_timezone(&Utc))
3487                    .unwrap_or_else(|_| Utc::now()),
3488                pending_changes: pending,
3489            })
3490        }
3491        Err(_) => {
3492            // No sync state yet for this agent
3493            Ok(AgentSyncState {
3494                agent_id: agent_id.to_string(),
3495                last_sync_version: 0,
3496                last_sync_time: Utc::now(),
3497                pending_changes: get_sync_version(conn)?.version,
3498            })
3499        }
3500    }
3501}
3502
3503/// Update sync state for an agent
3504pub fn update_agent_sync_state(conn: &Connection, agent_id: &str, version: i64) -> Result<()> {
3505    let now = Utc::now();
3506    conn.execute(
3507        "INSERT INTO agent_sync_state (agent_id, last_sync_version, last_sync_time)
3508         VALUES (?, ?, ?)
3509         ON CONFLICT(agent_id) DO UPDATE SET
3510            last_sync_version = excluded.last_sync_version,
3511            last_sync_time = excluded.last_sync_time",
3512        params![agent_id, version, now.to_rfc3339()],
3513    )?;
3514    Ok(())
3515}
3516
3517/// Cleanup old sync data
3518pub fn cleanup_sync_data(conn: &Connection, older_than_days: i64) -> Result<i64> {
3519    let cutoff = Utc::now() - chrono::Duration::days(older_than_days);
3520    let deleted = conn.execute(
3521        "DELETE FROM memory_events WHERE created_at < ?",
3522        params![cutoff.to_rfc3339()],
3523    )?;
3524    Ok(deleted as i64)
3525}
3526
3527// =============================================================================
3528// Multi-Agent Sharing
3529// =============================================================================
3530
3531/// A shared memory entry
3532#[derive(Debug, Clone, Serialize, Deserialize)]
3533pub struct SharedMemory {
3534    pub id: i64,
3535    pub memory_id: i64,
3536    pub from_agent: String,
3537    pub to_agent: String,
3538    pub message: Option<String>,
3539    pub acknowledged: bool,
3540    pub acknowledged_at: Option<DateTime<Utc>>,
3541    pub created_at: DateTime<Utc>,
3542}
3543
3544/// Share a memory with another agent
3545pub fn share_memory(
3546    conn: &Connection,
3547    memory_id: i64,
3548    from_agent: &str,
3549    to_agent: &str,
3550    message: Option<&str>,
3551) -> Result<i64> {
3552    let now = Utc::now();
3553
3554    // Verify memory exists
3555    let _ = get_memory(conn, memory_id)?;
3556
3557    conn.execute(
3558        "INSERT INTO shared_memories (memory_id, from_agent, to_agent, message, acknowledged, created_at)
3559         VALUES (?, ?, ?, ?, 0, ?)",
3560        params![memory_id, from_agent, to_agent, message, now.to_rfc3339()],
3561    )?;
3562
3563    let share_id = conn.last_insert_rowid();
3564
3565    // Record event
3566    record_event(
3567        conn,
3568        MemoryEventType::Shared,
3569        Some(memory_id),
3570        Some(from_agent),
3571        serde_json::json!({
3572            "to_agent": to_agent,
3573            "share_id": share_id,
3574            "message": message
3575        }),
3576    )?;
3577
3578    Ok(share_id)
3579}
3580
3581/// Poll for shared memories sent to this agent
3582pub fn poll_shared_memories(
3583    conn: &Connection,
3584    to_agent: &str,
3585    include_acknowledged: bool,
3586) -> Result<Vec<SharedMemory>> {
3587    let query = if include_acknowledged {
3588        "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3589         FROM shared_memories WHERE to_agent = ?
3590         ORDER BY created_at DESC"
3591    } else {
3592        "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3593         FROM shared_memories WHERE to_agent = ? AND acknowledged = 0
3594         ORDER BY created_at DESC"
3595    };
3596
3597    let mut stmt = conn.prepare(query)?;
3598    let shares = stmt
3599        .query_map(params![to_agent], |row| {
3600            let created_str: String = row.get(7)?;
3601            let ack_str: Option<String> = row.get(6)?;
3602            Ok(SharedMemory {
3603                id: row.get(0)?,
3604                memory_id: row.get(1)?,
3605                from_agent: row.get(2)?,
3606                to_agent: row.get(3)?,
3607                message: row.get(4)?,
3608                acknowledged: row.get(5)?,
3609                acknowledged_at: ack_str.and_then(|s| {
3610                    DateTime::parse_from_rfc3339(&s)
3611                        .ok()
3612                        .map(|dt| dt.with_timezone(&Utc))
3613                }),
3614                created_at: DateTime::parse_from_rfc3339(&created_str)
3615                    .map(|dt| dt.with_timezone(&Utc))
3616                    .unwrap_or_else(|_| Utc::now()),
3617            })
3618        })?
3619        .collect::<std::result::Result<Vec<_>, _>>()?;
3620
3621    Ok(shares)
3622}
3623
3624/// Acknowledge a shared memory
3625pub fn acknowledge_share(conn: &Connection, share_id: i64, agent_id: &str) -> Result<()> {
3626    let now = Utc::now();
3627
3628    let affected = conn.execute(
3629        "UPDATE shared_memories SET acknowledged = 1, acknowledged_at = ?
3630         WHERE id = ? AND to_agent = ?",
3631        params![now.to_rfc3339(), share_id, agent_id],
3632    )?;
3633
3634    if affected == 0 {
3635        return Err(EngramError::NotFound(share_id));
3636    }
3637
3638    Ok(())
3639}
3640
3641// =============================================================================
3642// Search Variants
3643// =============================================================================
3644
3645/// Search memories by identity (canonical ID or alias)
3646pub fn search_by_identity(
3647    conn: &Connection,
3648    identity: &str,
3649    workspace: Option<&str>,
3650    limit: Option<usize>,
3651) -> Result<Vec<Memory>> {
3652    let limit = limit.unwrap_or(50);
3653    let now = Utc::now().to_rfc3339();
3654
3655    // Search in content and tags for the identity
3656    // Tags are in a junction table, so we need to use a subquery or JOIN
3657    let pattern = format!("%{}%", identity);
3658
3659    let query = if workspace.is_some() {
3660        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3661                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3662                m.visibility, m.version, m.has_embedding, m.metadata,
3663                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3664         FROM memories m
3665         LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3666         LEFT JOIN tags t ON mt.tag_id = t.id
3667         WHERE m.workspace = ? AND (m.content LIKE ? OR t.name LIKE ?)
3668           AND m.valid_to IS NULL
3669           AND (m.expires_at IS NULL OR m.expires_at > ?)
3670         ORDER BY m.importance DESC, m.created_at DESC
3671         LIMIT ?"
3672    } else {
3673        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3674                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3675                m.visibility, m.version, m.has_embedding, m.metadata,
3676                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3677         FROM memories m
3678         LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3679         LEFT JOIN tags t ON mt.tag_id = t.id
3680         WHERE (m.content LIKE ? OR t.name LIKE ?)
3681           AND m.valid_to IS NULL
3682           AND (m.expires_at IS NULL OR m.expires_at > ?)
3683         ORDER BY m.importance DESC, m.created_at DESC
3684         LIMIT ?"
3685    };
3686
3687    let mut stmt = conn.prepare(query)?;
3688
3689    let memories = if let Some(ws) = workspace {
3690        stmt.query_map(
3691            params![ws, &pattern, &pattern, &now, limit as i64],
3692            memory_from_row,
3693        )?
3694        .collect::<std::result::Result<Vec<_>, _>>()?
3695    } else {
3696        stmt.query_map(
3697            params![&pattern, &pattern, &now, limit as i64],
3698            memory_from_row,
3699        )?
3700        .collect::<std::result::Result<Vec<_>, _>>()?
3701    };
3702
3703    Ok(memories)
3704}
3705
3706/// Search within session transcript chunks
3707pub fn search_sessions(
3708    conn: &Connection,
3709    query_text: &str,
3710    session_id: Option<&str>,
3711    workspace: Option<&str>,
3712    limit: Option<usize>,
3713) -> Result<Vec<Memory>> {
3714    let limit = limit.unwrap_or(20);
3715    let now = Utc::now().to_rfc3339();
3716    let pattern = format!("%{}%", query_text);
3717
3718    // Build query based on filters
3719    // Session chunks are stored as TranscriptChunk type (not Context)
3720    let mut conditions = vec![
3721        "m.memory_type = 'transcript_chunk'",
3722        "m.valid_to IS NULL",
3723        "(m.expires_at IS NULL OR m.expires_at > ?)",
3724    ];
3725    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3726
3727    // Add session filter via tags (tags are in junction table)
3728    let use_tag_join = session_id.is_some();
3729    if let Some(sid) = session_id {
3730        let tag_name = format!("session:{}", sid);
3731        conditions.push("t.name = ?");
3732        params_vec.push(Box::new(tag_name));
3733    }
3734
3735    // Add workspace filter
3736    if let Some(ws) = workspace {
3737        conditions.push("m.workspace = ?");
3738        params_vec.push(Box::new(ws.to_string()));
3739    }
3740
3741    // Add content search
3742    conditions.push("m.content LIKE ?");
3743    params_vec.push(Box::new(pattern));
3744
3745    // Add limit
3746    params_vec.push(Box::new(limit as i64));
3747
3748    // Build query with optional tag join
3749    let join_clause = if use_tag_join {
3750        "JOIN memory_tags mt ON m.id = mt.memory_id JOIN tags t ON mt.tag_id = t.id"
3751    } else {
3752        ""
3753    };
3754
3755    let query = format!(
3756        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3757                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3758                m.visibility, m.version, m.has_embedding, m.metadata,
3759                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3760         FROM memories m {} WHERE {} ORDER BY m.created_at DESC LIMIT ?",
3761        join_clause,
3762        conditions.join(" AND ")
3763    );
3764
3765    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
3766    let mut stmt = conn.prepare(&query)?;
3767    let memories = stmt
3768        .query_map(params_refs.as_slice(), memory_from_row)?
3769        .collect::<std::result::Result<Vec<_>, _>>()?;
3770
3771    Ok(memories)
3772}
3773
3774#[cfg(test)]
3775mod tests {
3776    use super::*;
3777    use crate::storage::Storage;
3778    use serde_json::json;
3779    use std::collections::HashMap;
3780
3781    #[test]
3782    fn test_list_memories_metadata_filter_types() {
3783        let storage = Storage::open_in_memory().unwrap();
3784
3785        storage
3786            .with_connection(|conn| {
3787                let mut metadata1 = HashMap::new();
3788                metadata1.insert("status".to_string(), json!("active"));
3789                metadata1.insert("count".to_string(), json!(3));
3790                metadata1.insert("flag".to_string(), json!(true));
3791
3792                let mut metadata2 = HashMap::new();
3793                metadata2.insert("status".to_string(), json!("inactive"));
3794                metadata2.insert("count".to_string(), json!(5));
3795                metadata2.insert("flag".to_string(), json!(false));
3796                metadata2.insert("optional".to_string(), json!("set"));
3797
3798                let memory1 = create_memory(
3799                    conn,
3800                    &CreateMemoryInput {
3801                        content: "First".to_string(),
3802                        memory_type: MemoryType::Note,
3803                        tags: vec![],
3804                        metadata: metadata1,
3805                        importance: None,
3806                        scope: Default::default(),
3807                        workspace: None,
3808                        tier: Default::default(),
3809                        defer_embedding: true,
3810                        ttl_seconds: None,
3811                        dedup_mode: Default::default(),
3812                        dedup_threshold: None,
3813                        event_time: None,
3814                        event_duration_seconds: None,
3815                        trigger_pattern: None,
3816                        summary_of_id: None,
3817                    },
3818                )?;
3819                let memory2 = create_memory(
3820                    conn,
3821                    &CreateMemoryInput {
3822                        content: "Second".to_string(),
3823                        memory_type: MemoryType::Note,
3824                        tags: vec![],
3825                        metadata: metadata2,
3826                        importance: None,
3827                        scope: Default::default(),
3828                        workspace: None,
3829                        tier: Default::default(),
3830                        defer_embedding: true,
3831                        ttl_seconds: None,
3832                        dedup_mode: Default::default(),
3833                        dedup_threshold: None,
3834                        event_time: None,
3835                        event_duration_seconds: None,
3836                        trigger_pattern: None,
3837                        summary_of_id: None,
3838                    },
3839                )?;
3840
3841                let mut filter = HashMap::new();
3842                filter.insert("status".to_string(), json!("active"));
3843                let results = list_memories(
3844                    conn,
3845                    &ListOptions {
3846                        metadata_filter: Some(filter),
3847                        ..Default::default()
3848                    },
3849                )?;
3850                assert_eq!(results.len(), 1);
3851                assert_eq!(results[0].id, memory1.id);
3852
3853                let mut filter = HashMap::new();
3854                filter.insert("count".to_string(), json!(5));
3855                let results = list_memories(
3856                    conn,
3857                    &ListOptions {
3858                        metadata_filter: Some(filter),
3859                        ..Default::default()
3860                    },
3861                )?;
3862                assert_eq!(results.len(), 1);
3863                assert_eq!(results[0].id, memory2.id);
3864
3865                let mut filter = HashMap::new();
3866                filter.insert("flag".to_string(), json!(true));
3867                let results = list_memories(
3868                    conn,
3869                    &ListOptions {
3870                        metadata_filter: Some(filter),
3871                        ..Default::default()
3872                    },
3873                )?;
3874                assert_eq!(results.len(), 1);
3875                assert_eq!(results[0].id, memory1.id);
3876
3877                let mut filter = HashMap::new();
3878                filter.insert("optional".to_string(), serde_json::Value::Null);
3879                let results = list_memories(
3880                    conn,
3881                    &ListOptions {
3882                        metadata_filter: Some(filter),
3883                        ..Default::default()
3884                    },
3885                )?;
3886                assert_eq!(results.len(), 1);
3887                assert_eq!(results[0].id, memory1.id);
3888
3889                Ok(())
3890            })
3891            .unwrap();
3892    }
3893
3894    #[test]
3895    fn test_memory_scope_isolation() {
3896        use crate::types::MemoryScope;
3897
3898        let storage = Storage::open_in_memory().unwrap();
3899
3900        storage
3901            .with_connection(|conn| {
3902                // Create memory with user scope
3903                let user1_memory = create_memory(
3904                    conn,
3905                    &CreateMemoryInput {
3906                        content: "User 1 memory".to_string(),
3907                        memory_type: MemoryType::Note,
3908                        tags: vec!["test".to_string()],
3909                        metadata: HashMap::new(),
3910                        importance: None,
3911                        scope: MemoryScope::user("user-1"),
3912                        workspace: None,
3913                        tier: Default::default(),
3914                        defer_embedding: true,
3915                        ttl_seconds: None,
3916                        dedup_mode: Default::default(),
3917                        dedup_threshold: None,
3918                        event_time: None,
3919                        event_duration_seconds: None,
3920                        trigger_pattern: None,
3921                        summary_of_id: None,
3922                    },
3923                )?;
3924
3925                // Create memory with different user scope
3926                let user2_memory = create_memory(
3927                    conn,
3928                    &CreateMemoryInput {
3929                        content: "User 2 memory".to_string(),
3930                        memory_type: MemoryType::Note,
3931                        tags: vec!["test".to_string()],
3932                        metadata: HashMap::new(),
3933                        importance: None,
3934                        scope: MemoryScope::user("user-2"),
3935                        workspace: None,
3936                        tier: Default::default(),
3937                        defer_embedding: true,
3938                        ttl_seconds: None,
3939                        dedup_mode: Default::default(),
3940                        dedup_threshold: None,
3941                        event_time: None,
3942                        event_duration_seconds: None,
3943                        trigger_pattern: None,
3944                        summary_of_id: None,
3945                    },
3946                )?;
3947
3948                // Create memory with session scope
3949                let session_memory = create_memory(
3950                    conn,
3951                    &CreateMemoryInput {
3952                        content: "Session memory".to_string(),
3953                        memory_type: MemoryType::Note,
3954                        tags: vec!["test".to_string()],
3955                        metadata: HashMap::new(),
3956                        importance: None,
3957                        scope: MemoryScope::session("session-abc"),
3958                        workspace: None,
3959                        tier: Default::default(),
3960                        defer_embedding: true,
3961                        ttl_seconds: None,
3962                        dedup_mode: Default::default(),
3963                        dedup_threshold: None,
3964                        event_time: None,
3965                        event_duration_seconds: None,
3966                        trigger_pattern: None,
3967                        summary_of_id: None,
3968                    },
3969                )?;
3970
3971                // Create memory with global scope
3972                let global_memory = create_memory(
3973                    conn,
3974                    &CreateMemoryInput {
3975                        content: "Global memory".to_string(),
3976                        memory_type: MemoryType::Note,
3977                        tags: vec!["test".to_string()],
3978                        metadata: HashMap::new(),
3979                        importance: None,
3980                        scope: MemoryScope::Global,
3981                        workspace: None,
3982                        tier: Default::default(),
3983                        defer_embedding: true,
3984                        ttl_seconds: None,
3985                        dedup_mode: Default::default(),
3986                        dedup_threshold: None,
3987                        event_time: None,
3988                        event_duration_seconds: None,
3989                        trigger_pattern: None,
3990                        summary_of_id: None,
3991                    },
3992                )?;
3993
3994                // Test: List all memories (no scope filter) should return all 4
3995                let all_results = list_memories(conn, &ListOptions::default())?;
3996                assert_eq!(all_results.len(), 4);
3997
3998                // Test: Filter by user-1 scope should return only user-1's memory
3999                let user1_results = list_memories(
4000                    conn,
4001                    &ListOptions {
4002                        scope: Some(MemoryScope::user("user-1")),
4003                        ..Default::default()
4004                    },
4005                )?;
4006                assert_eq!(user1_results.len(), 1);
4007                assert_eq!(user1_results[0].id, user1_memory.id);
4008                assert_eq!(user1_results[0].scope, MemoryScope::user("user-1"));
4009
4010                // Test: Filter by user-2 scope should return only user-2's memory
4011                let user2_results = list_memories(
4012                    conn,
4013                    &ListOptions {
4014                        scope: Some(MemoryScope::user("user-2")),
4015                        ..Default::default()
4016                    },
4017                )?;
4018                assert_eq!(user2_results.len(), 1);
4019                assert_eq!(user2_results[0].id, user2_memory.id);
4020
4021                // Test: Filter by session scope should return only session memory
4022                let session_results = list_memories(
4023                    conn,
4024                    &ListOptions {
4025                        scope: Some(MemoryScope::session("session-abc")),
4026                        ..Default::default()
4027                    },
4028                )?;
4029                assert_eq!(session_results.len(), 1);
4030                assert_eq!(session_results[0].id, session_memory.id);
4031
4032                // Test: Filter by global scope should return only global memory
4033                let global_results = list_memories(
4034                    conn,
4035                    &ListOptions {
4036                        scope: Some(MemoryScope::Global),
4037                        ..Default::default()
4038                    },
4039                )?;
4040                assert_eq!(global_results.len(), 1);
4041                assert_eq!(global_results[0].id, global_memory.id);
4042
4043                // Test: Verify scope is correctly stored and retrieved
4044                let retrieved = get_memory(conn, user1_memory.id)?;
4045                assert_eq!(retrieved.scope, MemoryScope::user("user-1"));
4046
4047                Ok(())
4048            })
4049            .unwrap();
4050    }
4051
4052    #[test]
4053    fn test_memory_scope_can_access() {
4054        use crate::types::MemoryScope;
4055
4056        // Global can access everything
4057        assert!(MemoryScope::Global.can_access(&MemoryScope::user("user-1")));
4058        assert!(MemoryScope::Global.can_access(&MemoryScope::session("session-1")));
4059        assert!(MemoryScope::Global.can_access(&MemoryScope::agent("agent-1")));
4060        assert!(MemoryScope::Global.can_access(&MemoryScope::Global));
4061
4062        // Same scope can access
4063        assert!(MemoryScope::user("user-1").can_access(&MemoryScope::user("user-1")));
4064        assert!(MemoryScope::session("s1").can_access(&MemoryScope::session("s1")));
4065        assert!(MemoryScope::agent("a1").can_access(&MemoryScope::agent("a1")));
4066
4067        // Different scope IDs cannot access each other
4068        assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::user("user-2")));
4069        assert!(!MemoryScope::session("s1").can_access(&MemoryScope::session("s2")));
4070        assert!(!MemoryScope::agent("a1").can_access(&MemoryScope::agent("a2")));
4071
4072        // Different scope types cannot access each other
4073        assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::session("s1")));
4074        assert!(!MemoryScope::session("s1").can_access(&MemoryScope::agent("a1")));
4075
4076        // Anyone can access global memories
4077        assert!(MemoryScope::user("user-1").can_access(&MemoryScope::Global));
4078        assert!(MemoryScope::session("s1").can_access(&MemoryScope::Global));
4079        assert!(MemoryScope::agent("a1").can_access(&MemoryScope::Global));
4080    }
4081
4082    #[test]
4083    fn test_memory_ttl_creation() {
4084        let storage = Storage::open_in_memory().unwrap();
4085
4086        storage
4087            .with_transaction(|conn| {
4088                // Create daily memory with TTL of 1 hour
4089                let memory = create_memory(
4090                    conn,
4091                    &CreateMemoryInput {
4092                        content: "Temporary memory".to_string(),
4093                        memory_type: MemoryType::Note,
4094                        tags: vec![],
4095                        metadata: HashMap::new(),
4096                        importance: None,
4097                        scope: Default::default(),
4098                        workspace: None,
4099                        tier: MemoryTier::Daily, // Daily tier for expiring memories
4100                        defer_embedding: true,
4101                        ttl_seconds: Some(3600), // 1 hour
4102                        dedup_mode: Default::default(),
4103                        dedup_threshold: None,
4104                        event_time: None,
4105                        event_duration_seconds: None,
4106                        trigger_pattern: None,
4107                        summary_of_id: None,
4108                    },
4109                )?;
4110
4111                // Verify expires_at is set and tier is daily
4112                assert!(memory.expires_at.is_some());
4113                assert_eq!(memory.tier, MemoryTier::Daily);
4114                let expires_at = memory.expires_at.unwrap();
4115                let now = Utc::now();
4116
4117                // Should expire approximately 1 hour from now (within 5 seconds tolerance)
4118                let diff = (expires_at - now).num_seconds();
4119                assert!(
4120                    (3595..=3605).contains(&diff),
4121                    "Expected ~3600 seconds, got {}",
4122                    diff
4123                );
4124
4125                // Create memory without TTL
4126                let permanent = create_memory(
4127                    conn,
4128                    &CreateMemoryInput {
4129                        content: "Permanent memory".to_string(),
4130                        memory_type: MemoryType::Note,
4131                        tags: vec![],
4132                        metadata: HashMap::new(),
4133                        importance: None,
4134                        scope: Default::default(),
4135                        workspace: None,
4136                        tier: Default::default(),
4137                        defer_embedding: true,
4138                        ttl_seconds: None,
4139                        dedup_mode: Default::default(),
4140                        dedup_threshold: None,
4141                        event_time: None,
4142                        event_duration_seconds: None,
4143                        trigger_pattern: None,
4144                        summary_of_id: None,
4145                    },
4146                )?;
4147
4148                // Verify expires_at is None for permanent memory
4149                assert!(permanent.expires_at.is_none());
4150
4151                Ok(())
4152            })
4153            .unwrap();
4154    }
4155
4156    #[test]
4157    fn test_expired_memories_excluded_from_queries() {
4158        let storage = Storage::open_in_memory().unwrap();
4159
4160        storage
4161            .with_transaction(|conn| {
4162                // Create a daily memory with TTL (will expire)
4163                let memory1 = create_memory(
4164                    conn,
4165                    &CreateMemoryInput {
4166                        content: "Memory to expire".to_string(),
4167                        memory_type: MemoryType::Note,
4168                        tags: vec!["test".to_string()],
4169                        metadata: HashMap::new(),
4170                        importance: None,
4171                        scope: Default::default(),
4172                        workspace: None,
4173                        tier: MemoryTier::Daily, // Daily tier for expiring memories
4174                        defer_embedding: true,
4175                        ttl_seconds: Some(3600), // 1 hour TTL
4176                        dedup_mode: Default::default(),
4177                        dedup_threshold: None,
4178                        event_time: None,
4179                        event_duration_seconds: None,
4180                        trigger_pattern: None,
4181                        summary_of_id: None,
4182                    },
4183                )?;
4184
4185                // Create a permanent memory
4186                let active = create_memory(
4187                    conn,
4188                    &CreateMemoryInput {
4189                        content: "Active memory".to_string(),
4190                        memory_type: MemoryType::Note,
4191                        tags: vec!["test".to_string()],
4192                        metadata: HashMap::new(),
4193                        importance: None,
4194                        scope: Default::default(),
4195                        workspace: None,
4196                        tier: Default::default(),
4197                        defer_embedding: true,
4198                        ttl_seconds: None,
4199                        dedup_mode: Default::default(),
4200                        dedup_threshold: None,
4201                        event_time: None,
4202                        event_duration_seconds: None,
4203                        trigger_pattern: None,
4204                        summary_of_id: None,
4205                    },
4206                )?;
4207
4208                // Both should be visible initially
4209                let results = list_memories(conn, &ListOptions::default())?;
4210                assert_eq!(results.len(), 2);
4211
4212                // Manually expire memory1 by setting expires_at to the past
4213                let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4214                conn.execute(
4215                    "UPDATE memories SET expires_at = ? WHERE id = ?",
4216                    params![past, memory1.id],
4217                )?;
4218
4219                // List should only return active memory now
4220                let results = list_memories(conn, &ListOptions::default())?;
4221                assert_eq!(results.len(), 1);
4222                assert_eq!(results[0].id, active.id);
4223
4224                // Direct get_memory should fail for expired
4225                let get_result = get_memory(conn, memory1.id);
4226                assert!(get_result.is_err());
4227
4228                // Direct get_memory should succeed for active
4229                let get_result = get_memory(conn, active.id);
4230                assert!(get_result.is_ok());
4231
4232                Ok(())
4233            })
4234            .unwrap();
4235    }
4236
4237    #[test]
4238    fn test_set_memory_expiration() {
4239        let storage = Storage::open_in_memory().unwrap();
4240
4241        storage
4242            .with_transaction(|conn| {
4243                // Create a permanent memory
4244                let memory = create_memory(
4245                    conn,
4246                    &CreateMemoryInput {
4247                        content: "Initially permanent".to_string(),
4248                        memory_type: MemoryType::Note,
4249                        tags: vec![],
4250                        metadata: HashMap::new(),
4251                        importance: None,
4252                        scope: Default::default(),
4253                        workspace: None,
4254                        tier: Default::default(),
4255                        defer_embedding: true,
4256                        ttl_seconds: None,
4257                        dedup_mode: Default::default(),
4258                        dedup_threshold: None,
4259                        event_time: None,
4260                        event_duration_seconds: None,
4261                        trigger_pattern: None,
4262                        summary_of_id: None,
4263                    },
4264                )?;
4265
4266                assert!(memory.expires_at.is_none());
4267
4268                // Set expiration to 30 minutes
4269                let updated = set_memory_expiration(conn, memory.id, Some(1800))?;
4270                assert!(updated.expires_at.is_some());
4271
4272                // Remove expiration (make permanent again) - use Some(0) to clear
4273                let permanent_again = set_memory_expiration(conn, memory.id, Some(0))?;
4274                assert!(permanent_again.expires_at.is_none());
4275
4276                Ok(())
4277            })
4278            .unwrap();
4279    }
4280
4281    #[test]
4282    fn test_cleanup_expired_memories() {
4283        let storage = Storage::open_in_memory().unwrap();
4284
4285        storage
4286            .with_transaction(|conn| {
4287                // Create 3 daily memories that we'll expire manually
4288                let mut expired_ids = vec![];
4289                for i in 0..3 {
4290                    let mem = create_memory(
4291                        conn,
4292                        &CreateMemoryInput {
4293                            content: format!("To expire {}", i),
4294                            memory_type: MemoryType::Note,
4295                            tags: vec![],
4296                            metadata: HashMap::new(),
4297                            importance: None,
4298                            scope: Default::default(),
4299                            workspace: None,
4300                            tier: MemoryTier::Daily, // Daily tier for expiring memories
4301                            defer_embedding: true,
4302                            ttl_seconds: Some(3600), // 1 hour TTL
4303                            dedup_mode: Default::default(),
4304                            dedup_threshold: None,
4305                            event_time: None,
4306                            event_duration_seconds: None,
4307                            trigger_pattern: None,
4308                            summary_of_id: None,
4309                        },
4310                    )?;
4311                    expired_ids.push(mem.id);
4312                }
4313
4314                // Create 2 active memories (permanent)
4315                for i in 0..2 {
4316                    create_memory(
4317                        conn,
4318                        &CreateMemoryInput {
4319                            content: format!("Active {}", i),
4320                            memory_type: MemoryType::Note,
4321                            tags: vec![],
4322                            metadata: HashMap::new(),
4323                            importance: None,
4324                            scope: Default::default(),
4325                            workspace: None,
4326                            tier: Default::default(),
4327                            defer_embedding: true,
4328                            ttl_seconds: None,
4329                            dedup_mode: Default::default(),
4330                            dedup_threshold: None,
4331                            event_time: None,
4332                            event_duration_seconds: None,
4333                            trigger_pattern: None,
4334                            summary_of_id: None,
4335                        },
4336                    )?;
4337                }
4338
4339                // All 5 should be visible initially
4340                let results = list_memories(conn, &ListOptions::default())?;
4341                assert_eq!(results.len(), 5);
4342
4343                // Manually expire the first 3 memories
4344                let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4345                for id in &expired_ids {
4346                    conn.execute(
4347                        "UPDATE memories SET expires_at = ? WHERE id = ?",
4348                        params![past, id],
4349                    )?;
4350                }
4351
4352                // Count expired
4353                let expired_count = count_expired_memories(conn)?;
4354                assert_eq!(expired_count, 3);
4355
4356                // Cleanup should delete 3
4357                let deleted = cleanup_expired_memories(conn)?;
4358                assert_eq!(deleted, 3);
4359
4360                // Verify only 2 remain
4361                let remaining = list_memories(conn, &ListOptions::default())?;
4362                assert_eq!(remaining.len(), 2);
4363
4364                // No more expired
4365                let expired_count = count_expired_memories(conn)?;
4366                assert_eq!(expired_count, 0);
4367
4368                Ok(())
4369            })
4370            .unwrap();
4371    }
4372
4373    // ========== Deduplication Tests (RML-931) ==========
4374
4375    #[test]
4376    fn test_content_hash_computation() {
4377        // Test that content hash is consistent and normalized
4378        let hash1 = compute_content_hash("Hello World");
4379        let hash2 = compute_content_hash("hello world"); // Different case
4380        let hash3 = compute_content_hash("  hello   world  "); // Extra whitespace
4381        let hash4 = compute_content_hash("Hello World!"); // Different content
4382
4383        // Same normalized content should produce same hash
4384        assert_eq!(hash1, hash2);
4385        assert_eq!(hash2, hash3);
4386
4387        // Different content should produce different hash
4388        assert_ne!(hash1, hash4);
4389
4390        // Hash should be prefixed with algorithm
4391        assert!(hash1.starts_with("sha256:"));
4392    }
4393
4394    #[test]
4395    fn test_dedup_mode_reject() {
4396        use crate::types::DedupMode;
4397
4398        let storage = Storage::open_in_memory().unwrap();
4399
4400        storage
4401            .with_transaction(|conn| {
4402                // Create first memory
4403                let _memory1 = create_memory(
4404                    conn,
4405                    &CreateMemoryInput {
4406                        content: "Unique content for testing".to_string(),
4407                        memory_type: MemoryType::Note,
4408                        tags: vec![],
4409                        metadata: HashMap::new(),
4410                        importance: None,
4411                        scope: Default::default(),
4412                        workspace: None,
4413                        tier: Default::default(),
4414                        defer_embedding: true,
4415                        ttl_seconds: None,
4416                        dedup_mode: DedupMode::Allow, // First one allows
4417                        dedup_threshold: None,
4418                        event_time: None,
4419                        event_duration_seconds: None,
4420                        trigger_pattern: None,
4421                        summary_of_id: None,
4422                    },
4423                )?;
4424
4425                // Try to create duplicate with reject mode
4426                let result = create_memory(
4427                    conn,
4428                    &CreateMemoryInput {
4429                        content: "Unique content for testing".to_string(), // Same content
4430                        memory_type: MemoryType::Note,
4431                        tags: vec!["new-tag".to_string()],
4432                        metadata: HashMap::new(),
4433                        importance: None,
4434                        scope: Default::default(),
4435                        workspace: None,
4436                        tier: Default::default(),
4437                        defer_embedding: true,
4438                        ttl_seconds: None,
4439                        dedup_mode: DedupMode::Reject,
4440                        dedup_threshold: None,
4441                        event_time: None,
4442                        event_duration_seconds: None,
4443                        trigger_pattern: None,
4444                        summary_of_id: None,
4445                    },
4446                );
4447
4448                // Should fail with Duplicate error
4449                assert!(result.is_err());
4450                let err = result.unwrap_err();
4451                assert!(matches!(err, crate::error::EngramError::Duplicate { .. }));
4452
4453                Ok(())
4454            })
4455            .unwrap();
4456    }
4457
4458    #[test]
4459    fn test_dedup_mode_skip() {
4460        use crate::types::DedupMode;
4461
4462        let storage = Storage::open_in_memory().unwrap();
4463
4464        storage
4465            .with_transaction(|conn| {
4466                // Create first memory
4467                let memory1 = create_memory(
4468                    conn,
4469                    &CreateMemoryInput {
4470                        content: "Skip test content".to_string(),
4471                        memory_type: MemoryType::Note,
4472                        tags: vec!["original".to_string()],
4473                        metadata: HashMap::new(),
4474                        importance: Some(0.5),
4475                        scope: Default::default(),
4476                        workspace: None,
4477                        tier: Default::default(),
4478                        defer_embedding: true,
4479                        ttl_seconds: None,
4480                        dedup_mode: DedupMode::Allow,
4481                        dedup_threshold: None,
4482                        event_time: None,
4483                        event_duration_seconds: None,
4484                        trigger_pattern: None,
4485                        summary_of_id: None,
4486                    },
4487                )?;
4488
4489                // Try to create duplicate with skip mode
4490                let memory2 = create_memory(
4491                    conn,
4492                    &CreateMemoryInput {
4493                        content: "Skip test content".to_string(), // Same content
4494                        memory_type: MemoryType::Note,
4495                        tags: vec!["new-tag".to_string()], // Different tags
4496                        metadata: HashMap::new(),
4497                        importance: Some(0.9), // Different importance
4498                        scope: Default::default(),
4499                        workspace: None,
4500                        tier: Default::default(),
4501                        defer_embedding: true,
4502                        ttl_seconds: None,
4503                        dedup_mode: DedupMode::Skip,
4504                        dedup_threshold: None,
4505                        event_time: None,
4506                        event_duration_seconds: None,
4507                        trigger_pattern: None,
4508                        summary_of_id: None,
4509                    },
4510                )?;
4511
4512                // Should return existing memory unchanged
4513                assert_eq!(memory1.id, memory2.id);
4514                assert_eq!(memory2.tags, vec!["original".to_string()]); // Original tags
4515                assert!((memory2.importance - 0.5).abs() < 0.01); // Original importance
4516
4517                // Only one memory should exist
4518                let all = list_memories(conn, &ListOptions::default())?;
4519                assert_eq!(all.len(), 1);
4520
4521                Ok(())
4522            })
4523            .unwrap();
4524    }
4525
4526    #[test]
4527    fn test_dedup_mode_merge() {
4528        use crate::types::DedupMode;
4529
4530        let storage = Storage::open_in_memory().unwrap();
4531
4532        storage
4533            .with_transaction(|conn| {
4534                // Create first memory with some tags and metadata
4535                let memory1 = create_memory(
4536                    conn,
4537                    &CreateMemoryInput {
4538                        content: "Merge test content".to_string(),
4539                        memory_type: MemoryType::Note,
4540                        tags: vec!["tag1".to_string(), "tag2".to_string()],
4541                        metadata: {
4542                            let mut m = HashMap::new();
4543                            m.insert("key1".to_string(), serde_json::json!("value1"));
4544                            m
4545                        },
4546                        importance: Some(0.5),
4547                        scope: Default::default(),
4548                        workspace: None,
4549                        tier: Default::default(),
4550                        defer_embedding: true,
4551                        ttl_seconds: None,
4552                        dedup_mode: DedupMode::Allow,
4553                        dedup_threshold: None,
4554                        event_time: None,
4555                        event_duration_seconds: None,
4556                        trigger_pattern: None,
4557                        summary_of_id: None,
4558                    },
4559                )?;
4560
4561                // Try to create duplicate with merge mode
4562                let memory2 = create_memory(
4563                    conn,
4564                    &CreateMemoryInput {
4565                        content: "Merge test content".to_string(), // Same content
4566                        memory_type: MemoryType::Note,
4567                        tags: vec!["tag2".to_string(), "tag3".to_string()], // Overlapping + new
4568                        metadata: {
4569                            let mut m = HashMap::new();
4570                            m.insert("key2".to_string(), serde_json::json!("value2"));
4571                            m
4572                        },
4573                        importance: Some(0.8), // Higher importance
4574                        scope: Default::default(),
4575                        workspace: None,
4576                        tier: Default::default(),
4577                        defer_embedding: true,
4578                        ttl_seconds: None,
4579                        dedup_mode: DedupMode::Merge,
4580                        dedup_threshold: None,
4581                        event_time: None,
4582                        event_duration_seconds: None,
4583                        trigger_pattern: None,
4584                        summary_of_id: None,
4585                    },
4586                )?;
4587
4588                // Should return same memory ID
4589                assert_eq!(memory1.id, memory2.id);
4590
4591                // Tags should be merged (no duplicates)
4592                assert!(memory2.tags.contains(&"tag1".to_string()));
4593                assert!(memory2.tags.contains(&"tag2".to_string()));
4594                assert!(memory2.tags.contains(&"tag3".to_string()));
4595                assert_eq!(memory2.tags.len(), 3);
4596
4597                // Metadata should be merged
4598                assert!(memory2.metadata.contains_key("key1"));
4599                assert!(memory2.metadata.contains_key("key2"));
4600
4601                // Only one memory should exist
4602                let all = list_memories(conn, &ListOptions::default())?;
4603                assert_eq!(all.len(), 1);
4604
4605                Ok(())
4606            })
4607            .unwrap();
4608    }
4609
4610    #[test]
4611    fn test_dedup_mode_allow() {
4612        use crate::types::DedupMode;
4613
4614        let storage = Storage::open_in_memory().unwrap();
4615
4616        storage
4617            .with_transaction(|conn| {
4618                // Create first memory
4619                let memory1 = create_memory(
4620                    conn,
4621                    &CreateMemoryInput {
4622                        content: "Allow duplicates content".to_string(),
4623                        memory_type: MemoryType::Note,
4624                        tags: vec![],
4625                        metadata: HashMap::new(),
4626                        importance: None,
4627                        scope: Default::default(),
4628                        workspace: None,
4629                        tier: Default::default(),
4630                        defer_embedding: true,
4631                        ttl_seconds: None,
4632                        dedup_mode: DedupMode::Allow,
4633                        dedup_threshold: None,
4634                        event_time: None,
4635                        event_duration_seconds: None,
4636                        trigger_pattern: None,
4637                        summary_of_id: None,
4638                    },
4639                )?;
4640
4641                // Create duplicate with allow mode (default)
4642                let memory2 = create_memory(
4643                    conn,
4644                    &CreateMemoryInput {
4645                        content: "Allow duplicates content".to_string(), // Same content
4646                        memory_type: MemoryType::Note,
4647                        tags: vec![],
4648                        metadata: HashMap::new(),
4649                        importance: None,
4650                        scope: Default::default(),
4651                        workspace: None,
4652                        tier: Default::default(),
4653                        defer_embedding: true,
4654                        ttl_seconds: None,
4655                        dedup_mode: DedupMode::Allow,
4656                        dedup_threshold: None,
4657                        event_time: None,
4658                        event_duration_seconds: None,
4659                        trigger_pattern: None,
4660                        summary_of_id: None,
4661                    },
4662                )?;
4663
4664                // Should create separate memory
4665                assert_ne!(memory1.id, memory2.id);
4666
4667                // Both memories should exist
4668                let all = list_memories(conn, &ListOptions::default())?;
4669                assert_eq!(all.len(), 2);
4670
4671                // Both should have same content hash
4672                assert_eq!(memory1.content_hash, memory2.content_hash);
4673
4674                Ok(())
4675            })
4676            .unwrap();
4677    }
4678
4679    #[test]
4680    fn test_find_duplicates_exact_hash() {
4681        use crate::types::DedupMode;
4682
4683        let storage = Storage::open_in_memory().unwrap();
4684
4685        storage
4686            .with_transaction(|conn| {
4687                // Create two memories with same content (exact hash duplicates)
4688                let _memory1 = create_memory(
4689                    conn,
4690                    &CreateMemoryInput {
4691                        content: "Duplicate content".to_string(),
4692                        memory_type: MemoryType::Note,
4693                        tags: vec!["first".to_string()],
4694                        metadata: HashMap::new(),
4695                        importance: None,
4696                        scope: Default::default(),
4697                        workspace: None,
4698                        tier: Default::default(),
4699                        defer_embedding: true,
4700                        ttl_seconds: None,
4701                        dedup_mode: DedupMode::Allow,
4702                        dedup_threshold: None,
4703                        event_time: None,
4704                        event_duration_seconds: None,
4705                        trigger_pattern: None,
4706                        summary_of_id: None,
4707                    },
4708                )?;
4709
4710                let _memory2 = create_memory(
4711                    conn,
4712                    &CreateMemoryInput {
4713                        content: "Duplicate content".to_string(), // Same content
4714                        memory_type: MemoryType::Note,
4715                        tags: vec!["second".to_string()],
4716                        metadata: HashMap::new(),
4717                        importance: None,
4718                        scope: Default::default(),
4719                        workspace: None,
4720                        tier: Default::default(),
4721                        defer_embedding: true,
4722                        ttl_seconds: None,
4723                        dedup_mode: DedupMode::Allow,
4724                        dedup_threshold: None,
4725                        event_time: None,
4726                        event_duration_seconds: None,
4727                        trigger_pattern: None,
4728                        summary_of_id: None,
4729                    },
4730                )?;
4731
4732                // Create a unique memory (not a duplicate)
4733                let _memory3 = create_memory(
4734                    conn,
4735                    &CreateMemoryInput {
4736                        content: "Unique content".to_string(),
4737                        memory_type: MemoryType::Note,
4738                        tags: vec![],
4739                        metadata: HashMap::new(),
4740                        importance: None,
4741                        scope: Default::default(),
4742                        workspace: None,
4743                        tier: Default::default(),
4744                        defer_embedding: true,
4745                        ttl_seconds: None,
4746                        dedup_mode: DedupMode::Allow,
4747                        dedup_threshold: None,
4748                        event_time: None,
4749                        event_duration_seconds: None,
4750                        trigger_pattern: None,
4751                        summary_of_id: None,
4752                    },
4753                )?;
4754
4755                // Find duplicates
4756                let duplicates = find_duplicates(conn, 0.9)?;
4757
4758                // Should find one duplicate pair
4759                assert_eq!(duplicates.len(), 1);
4760
4761                // Should be exact hash match
4762                assert_eq!(duplicates[0].match_type, DuplicateMatchType::ExactHash);
4763                assert!((duplicates[0].similarity_score - 1.0).abs() < 0.01);
4764
4765                Ok(())
4766            })
4767            .unwrap();
4768    }
4769
4770    #[test]
4771    fn test_content_hash_stored_on_create() {
4772        let storage = Storage::open_in_memory().unwrap();
4773
4774        storage
4775            .with_transaction(|conn| {
4776                let memory = create_memory(
4777                    conn,
4778                    &CreateMemoryInput {
4779                        content: "Test content for hash".to_string(),
4780                        memory_type: MemoryType::Note,
4781                        tags: vec![],
4782                        metadata: HashMap::new(),
4783                        importance: None,
4784                        scope: Default::default(),
4785                        workspace: None,
4786                        tier: Default::default(),
4787                        defer_embedding: true,
4788                        ttl_seconds: None,
4789                        dedup_mode: Default::default(),
4790                        dedup_threshold: None,
4791                        event_time: None,
4792                        event_duration_seconds: None,
4793                        trigger_pattern: None,
4794                        summary_of_id: None,
4795                    },
4796                )?;
4797
4798                // Content hash should be set
4799                assert!(memory.content_hash.is_some());
4800                let hash = memory.content_hash.as_ref().unwrap();
4801                assert!(hash.starts_with("sha256:"));
4802
4803                // Fetch from DB and verify hash is persisted
4804                let fetched = get_memory(conn, memory.id)?;
4805                assert_eq!(fetched.content_hash, memory.content_hash);
4806
4807                Ok(())
4808            })
4809            .unwrap();
4810    }
4811
4812    #[test]
4813    fn test_update_memory_recalculates_hash() {
4814        let storage = Storage::open_in_memory().unwrap();
4815
4816        storage
4817            .with_transaction(|conn| {
4818                // Create a memory
4819                let memory = create_memory(
4820                    conn,
4821                    &CreateMemoryInput {
4822                        content: "Original content".to_string(),
4823                        memory_type: MemoryType::Note,
4824                        tags: vec![],
4825                        metadata: HashMap::new(),
4826                        importance: None,
4827                        scope: Default::default(),
4828                        workspace: None,
4829                        tier: Default::default(),
4830                        defer_embedding: true,
4831                        ttl_seconds: None,
4832                        dedup_mode: Default::default(),
4833                        dedup_threshold: None,
4834                        event_time: None,
4835                        event_duration_seconds: None,
4836                        trigger_pattern: None,
4837                        summary_of_id: None,
4838                    },
4839                )?;
4840
4841                let original_hash = memory.content_hash.clone();
4842
4843                // Update the content
4844                let updated = update_memory(
4845                    conn,
4846                    memory.id,
4847                    &UpdateMemoryInput {
4848                        content: Some("Updated content".to_string()),
4849                        memory_type: None,
4850                        tags: None,
4851                        metadata: None,
4852                        importance: None,
4853                        scope: None,
4854                        ttl_seconds: None,
4855                        event_time: None,
4856                        trigger_pattern: None,
4857                    },
4858                )?;
4859
4860                // Hash should be different
4861                assert_ne!(updated.content_hash, original_hash);
4862                assert!(updated.content_hash.is_some());
4863
4864                // Verify against expected hash
4865                let expected_hash = compute_content_hash("Updated content");
4866                assert_eq!(updated.content_hash.as_ref().unwrap(), &expected_hash);
4867
4868                Ok(())
4869            })
4870            .unwrap();
4871    }
4872
4873    #[test]
4874    fn test_dedup_scope_isolation() {
4875        use crate::types::{DedupMode, MemoryScope};
4876
4877        let storage = Storage::open_in_memory().unwrap();
4878
4879        storage
4880            .with_transaction(|conn| {
4881                // Create memory in user-1 scope
4882                let _user1_memory = create_memory(
4883                    conn,
4884                    &CreateMemoryInput {
4885                        content: "Shared content".to_string(),
4886                        memory_type: MemoryType::Note,
4887                        tags: vec!["user1".to_string()],
4888                        metadata: HashMap::new(),
4889                        importance: None,
4890                        scope: MemoryScope::user("user-1"),
4891                        workspace: None,
4892                        tier: Default::default(),
4893                        defer_embedding: true,
4894                        ttl_seconds: None,
4895                        dedup_mode: DedupMode::Allow,
4896                        dedup_threshold: None,
4897                        event_time: None,
4898                        event_duration_seconds: None,
4899                        trigger_pattern: None,
4900                        summary_of_id: None,
4901                    },
4902                )?;
4903
4904                // Create same content in user-2 scope with Reject mode
4905                // Should succeed because scopes are different
4906                let user2_result = create_memory(
4907                    conn,
4908                    &CreateMemoryInput {
4909                        content: "Shared content".to_string(), // Same content!
4910                        memory_type: MemoryType::Note,
4911                        tags: vec!["user2".to_string()],
4912                        metadata: HashMap::new(),
4913                        importance: None,
4914                        scope: MemoryScope::user("user-2"), // Different scope
4915                        workspace: None,
4916                        tier: Default::default(),
4917                        defer_embedding: true,
4918                        ttl_seconds: None,
4919                        dedup_mode: DedupMode::Reject, // Should not reject - different scope
4920                        dedup_threshold: None,
4921                        event_time: None,
4922                        event_duration_seconds: None,
4923                        trigger_pattern: None,
4924                        summary_of_id: None,
4925                    },
4926                );
4927
4928                // Should succeed - different scopes are not considered duplicates
4929                assert!(user2_result.is_ok());
4930                let _user2_memory = user2_result.unwrap();
4931
4932                // Now try to create duplicate in same scope (user-2)
4933                let duplicate_result = create_memory(
4934                    conn,
4935                    &CreateMemoryInput {
4936                        content: "Shared content".to_string(), // Same content
4937                        memory_type: MemoryType::Note,
4938                        tags: vec![],
4939                        metadata: HashMap::new(),
4940                        importance: None,
4941                        scope: MemoryScope::user("user-2"), // Same scope as user2_memory
4942                        workspace: None,
4943                        tier: Default::default(),
4944                        defer_embedding: true,
4945                        ttl_seconds: None,
4946                        dedup_mode: DedupMode::Reject, // Should reject - same scope
4947                        dedup_threshold: None,
4948                        event_time: None,
4949                        event_duration_seconds: None,
4950                        trigger_pattern: None,
4951                        summary_of_id: None,
4952                    },
4953                );
4954
4955                // Should fail - same scope with same content
4956                assert!(duplicate_result.is_err());
4957                assert!(matches!(
4958                    duplicate_result.unwrap_err(),
4959                    crate::error::EngramError::Duplicate { .. }
4960                ));
4961
4962                // Verify we have exactly 2 memories (one per user)
4963                let all = list_memories(conn, &ListOptions::default())?;
4964                assert_eq!(all.len(), 2);
4965
4966                Ok(())
4967            })
4968            .unwrap();
4969    }
4970
4971    #[test]
4972    fn test_find_similar_by_embedding() {
4973        // Helper to store embedding (convert f32 vec to bytes for SQLite)
4974        fn store_test_embedding(
4975            conn: &Connection,
4976            memory_id: i64,
4977            embedding: &[f32],
4978        ) -> crate::error::Result<()> {
4979            let bytes: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
4980            conn.execute(
4981                "INSERT INTO embeddings (memory_id, embedding, model, dimensions, created_at)
4982                 VALUES (?, ?, ?, ?, datetime('now'))",
4983                params![memory_id, bytes, "test", embedding.len() as i32],
4984            )?;
4985            // Mark memory as having embedding
4986            conn.execute(
4987                "UPDATE memories SET has_embedding = 1 WHERE id = ?",
4988                params![memory_id],
4989            )?;
4990            Ok(())
4991        }
4992
4993        let storage = Storage::open_in_memory().unwrap();
4994        storage
4995            .with_transaction(|conn| {
4996                // Create a memory with an embedding
4997                let memory1 = create_memory(
4998                    conn,
4999                    &CreateMemoryInput {
5000                        content: "Rust is a systems programming language".to_string(),
5001                        memory_type: MemoryType::Note,
5002                        tags: vec!["rust".to_string()],
5003                        metadata: std::collections::HashMap::new(),
5004                        importance: None,
5005                        scope: MemoryScope::Global,
5006                        workspace: None,
5007                        tier: Default::default(),
5008                        defer_embedding: false,
5009                        ttl_seconds: None,
5010                        dedup_mode: DedupMode::Allow,
5011                        dedup_threshold: None,
5012                        event_time: None,
5013                        event_duration_seconds: None,
5014                        trigger_pattern: None,
5015                        summary_of_id: None,
5016                    },
5017                )?;
5018
5019                // Store an embedding for it (simple test embedding)
5020                let embedding1 = vec![0.8, 0.4, 0.2, 0.1]; // Normalized-ish vector
5021                store_test_embedding(conn, memory1.id, &embedding1)?;
5022
5023                // Create another memory with different embedding
5024                let memory2 = create_memory(
5025                    conn,
5026                    &CreateMemoryInput {
5027                        content: "Python is a scripting language".to_string(),
5028                        memory_type: MemoryType::Note,
5029                        tags: vec!["python".to_string()],
5030                        metadata: std::collections::HashMap::new(),
5031                        importance: None,
5032                        scope: MemoryScope::Global,
5033                        workspace: None,
5034                        tier: Default::default(),
5035                        defer_embedding: false,
5036                        ttl_seconds: None,
5037                        dedup_mode: DedupMode::Allow,
5038                        dedup_threshold: None,
5039                        event_time: None,
5040                        event_duration_seconds: None,
5041                        trigger_pattern: None,
5042                        summary_of_id: None,
5043                    },
5044                )?;
5045
5046                // Store a very different embedding
5047                let embedding2 = vec![0.1, 0.2, 0.8, 0.4]; // Different direction
5048                store_test_embedding(conn, memory2.id, &embedding2)?;
5049
5050                // Test 1: Query with embedding similar to memory1
5051                let query_similar_to_1 = vec![0.79, 0.41, 0.21, 0.11]; // Very similar to embedding1
5052                let result = find_similar_by_embedding(
5053                    conn,
5054                    &query_similar_to_1,
5055                    &MemoryScope::Global,
5056                    None, // default workspace
5057                    0.95, // High threshold
5058                )?;
5059                assert!(result.is_some());
5060                let (found_memory, similarity) = result.unwrap();
5061                assert_eq!(found_memory.id, memory1.id);
5062                assert!(similarity > 0.95);
5063
5064                // Test 2: Query with low threshold should still find memory1
5065                let result_low_threshold = find_similar_by_embedding(
5066                    conn,
5067                    &query_similar_to_1,
5068                    &MemoryScope::Global,
5069                    None,
5070                    0.5,
5071                )?;
5072                assert!(result_low_threshold.is_some());
5073
5074                // Test 3: Query with embedding not similar to anything (threshold too high)
5075                let query_orthogonal = vec![0.0, 0.0, 0.0, 1.0]; // Different direction
5076                let result_no_match = find_similar_by_embedding(
5077                    conn,
5078                    &query_orthogonal,
5079                    &MemoryScope::Global,
5080                    None,
5081                    0.99, // Very high threshold
5082                )?;
5083                assert!(result_no_match.is_none());
5084
5085                // Test 4: Different scope should not find anything
5086                let result_wrong_scope = find_similar_by_embedding(
5087                    conn,
5088                    &query_similar_to_1,
5089                    &MemoryScope::User {
5090                        user_id: "other-user".to_string(),
5091                    },
5092                    None,
5093                    0.5,
5094                )?;
5095                assert!(result_wrong_scope.is_none());
5096
5097                Ok(())
5098            })
5099            .unwrap();
5100    }
5101}