Skip to main content

engram/storage/
queries.rs

1//! Database queries for memory operations
2
3use chrono::{DateTime, Utc};
4use rusqlite::{params, Connection, OptionalExtension, Row};
5use serde::{Deserialize, Serialize};
6use sha2::{Digest, Sha256};
7use std::collections::HashMap;
8
9use crate::error::{EngramError, Result};
10use crate::types::*;
11
12/// Parse a memory from a database row
13pub fn memory_from_row(row: &Row) -> rusqlite::Result<Memory> {
14    let id: i64 = row.get("id")?;
15    let content: String = row.get("content")?;
16    let memory_type_str: String = row.get("memory_type")?;
17    let importance: f32 = row.get("importance")?;
18    let access_count: i32 = row.get("access_count")?;
19    let created_at: String = row.get("created_at")?;
20    let updated_at: String = row.get("updated_at")?;
21    let last_accessed_at: Option<String> = row.get("last_accessed_at")?;
22    let owner_id: Option<String> = row.get("owner_id")?;
23    let visibility_str: String = row.get("visibility")?;
24    let version: i32 = row.get("version")?;
25    let has_embedding: i32 = row.get("has_embedding")?;
26    let metadata_str: String = row.get("metadata")?;
27
28    // Scope columns (with fallback for backward compatibility)
29    let scope_type: String = row
30        .get("scope_type")
31        .unwrap_or_else(|_| "global".to_string());
32    let scope_id: Option<String> = row.get("scope_id").unwrap_or(None);
33
34    // TTL column (with fallback for backward compatibility)
35    let expires_at: Option<String> = row.get("expires_at").unwrap_or(None);
36
37    // Content hash column (with fallback for backward compatibility)
38    let content_hash: Option<String> = row.get("content_hash").unwrap_or(None);
39
40    let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
41    let visibility = match visibility_str.as_str() {
42        "shared" => Visibility::Shared,
43        "public" => Visibility::Public,
44        _ => Visibility::Private,
45    };
46
47    // Parse scope from type and id
48    let scope = match (scope_type.as_str(), scope_id) {
49        ("user", Some(id)) => MemoryScope::User { user_id: id },
50        ("session", Some(id)) => MemoryScope::Session { session_id: id },
51        ("agent", Some(id)) => MemoryScope::Agent { agent_id: id },
52        _ => MemoryScope::Global,
53    };
54
55    let metadata: HashMap<String, serde_json::Value> =
56        serde_json::from_str(&metadata_str).unwrap_or_default();
57
58    // Workspace column (with fallback for backward compatibility)
59    let workspace: String = row
60        .get("workspace")
61        .unwrap_or_else(|_| "default".to_string());
62
63    // Tier column (with fallback for backward compatibility)
64    let tier_str: String = row.get("tier").unwrap_or_else(|_| "permanent".to_string());
65    let tier = tier_str.parse().unwrap_or_default();
66
67    let event_time: Option<String> = row.get("event_time").unwrap_or(None);
68    let event_duration_seconds: Option<i64> = row.get("event_duration_seconds").unwrap_or(None);
69    let trigger_pattern: Option<String> = row.get("trigger_pattern").unwrap_or(None);
70    let procedure_success_count: i32 = row.get("procedure_success_count").unwrap_or(0);
71    let procedure_failure_count: i32 = row.get("procedure_failure_count").unwrap_or(0);
72    let summary_of_id: Option<i64> = row.get("summary_of_id").unwrap_or(None);
73    let lifecycle_state_str: Option<String> = row.get("lifecycle_state").unwrap_or(None);
74
75    let lifecycle_state = lifecycle_state_str
76        .and_then(|s| s.parse().ok())
77        .unwrap_or(crate::types::LifecycleState::Active);
78
79    // media_url column (additive, nullable — with fallback for older schema versions)
80    let media_url: Option<String> = row.get("media_url").unwrap_or(None);
81
82    Ok(Memory {
83        id,
84        content,
85        memory_type,
86        tags: vec![], // Loaded separately
87        metadata,
88        importance,
89        access_count,
90        created_at: DateTime::parse_from_rfc3339(&created_at)
91            .map(|dt| dt.with_timezone(&Utc))
92            .unwrap_or_else(|_| Utc::now()),
93        updated_at: DateTime::parse_from_rfc3339(&updated_at)
94            .map(|dt| dt.with_timezone(&Utc))
95            .unwrap_or_else(|_| Utc::now()),
96        last_accessed_at: last_accessed_at.and_then(|s| {
97            DateTime::parse_from_rfc3339(&s)
98                .map(|dt| dt.with_timezone(&Utc))
99                .ok()
100        }),
101        owner_id,
102        visibility,
103        scope,
104        workspace,
105        tier,
106        version,
107        has_embedding: has_embedding != 0,
108        expires_at: expires_at.and_then(|s| {
109            DateTime::parse_from_rfc3339(&s)
110                .map(|dt| dt.with_timezone(&Utc))
111                .ok()
112        }),
113        content_hash,
114        event_time: event_time.and_then(|s| {
115            DateTime::parse_from_rfc3339(&s)
116                .map(|dt| dt.with_timezone(&Utc))
117                .ok()
118        }),
119        event_duration_seconds,
120        trigger_pattern,
121        procedure_success_count,
122        procedure_failure_count,
123        summary_of_id,
124        lifecycle_state,
125        media_url,
126    })
127}
128
129pub(crate) fn metadata_value_to_param(
130    key: &str,
131    value: &serde_json::Value,
132    conditions: &mut Vec<String>,
133    params: &mut Vec<Box<dyn rusqlite::ToSql>>,
134) -> Result<()> {
135    match value {
136        serde_json::Value::String(s) => {
137            conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
138            params.push(Box::new(s.clone()));
139        }
140        serde_json::Value::Number(n) => {
141            conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
142            if let Some(i) = n.as_i64() {
143                params.push(Box::new(i));
144            } else if let Some(f) = n.as_f64() {
145                params.push(Box::new(f));
146            } else {
147                return Err(EngramError::InvalidInput("Invalid number".to_string()));
148            }
149        }
150        serde_json::Value::Bool(b) => {
151            conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
152            params.push(Box::new(*b));
153        }
154        serde_json::Value::Null => {
155            conditions.push(format!("json_extract(m.metadata, '$.{}') IS NULL", key));
156        }
157        _ => {
158            return Err(EngramError::InvalidInput(format!(
159                "Unsupported metadata filter value for key: {}",
160                key
161            )));
162        }
163    }
164
165    Ok(())
166}
167
168fn get_memory_internal(conn: &Connection, id: i64, track_access: bool) -> Result<Memory> {
169    let now = Utc::now().to_rfc3339();
170
171    let mut stmt = conn.prepare_cached(
172        "SELECT id, content, memory_type, importance, access_count,
173                created_at, updated_at, last_accessed_at, owner_id,
174                visibility, version, has_embedding, metadata,
175                scope_type, scope_id, workspace, tier, expires_at, content_hash,
176                event_time, event_duration_seconds, trigger_pattern, procedure_success_count,
177                procedure_failure_count, summary_of_id, lifecycle_state, media_url
178         FROM memories
179         WHERE id = ? AND valid_to IS NULL
180           AND (expires_at IS NULL OR expires_at > ?)",
181    )?;
182
183    let mut memory = stmt
184        .query_row(params![id, now], memory_from_row)
185        .map_err(|_| EngramError::NotFound(id))?;
186
187    memory.tags = load_tags(conn, id)?;
188
189    if track_access {
190        // Update access tracking
191        let now = Utc::now().to_rfc3339();
192        conn.execute(
193            "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?
194             WHERE id = ?",
195            params![now, id],
196        )?;
197    }
198
199    Ok(memory)
200}
201
202/// Load tags for a memory
203pub fn load_tags(conn: &Connection, memory_id: i64) -> Result<Vec<String>> {
204    let mut stmt = conn.prepare_cached(
205        "SELECT t.name FROM tags t
206         JOIN memory_tags mt ON t.id = mt.tag_id
207         WHERE mt.memory_id = ?",
208    )?;
209
210    let tags: Vec<String> = stmt
211        .query_map([memory_id], |row| row.get(0))?
212        .filter_map(|r| r.ok())
213        .collect();
214
215    Ok(tags)
216}
217
218/// Compute SHA256 hash of normalized content for deduplication
219pub fn compute_content_hash(content: &str) -> String {
220    // Normalize: lowercase, collapse whitespace, trim
221    let normalized = content
222        .to_lowercase()
223        .split_whitespace()
224        .collect::<Vec<_>>()
225        .join(" ");
226
227    let mut hasher = Sha256::new();
228    hasher.update(normalized.as_bytes());
229    format!("sha256:{}", hex::encode(hasher.finalize()))
230}
231
232/// Find a memory by content hash within the same scope and workspace (exact duplicate detection)
233///
234/// Deduplication respects both scope and workspace isolation:
235/// - User-scoped memories only dedupe against other memories with same user_id
236/// - Session-scoped memories only dedupe against other memories with same session_id
237/// - Global memories only dedupe against other global memories
238/// - All deduplication is workspace-scoped (memories in different workspaces are never duplicates)
239pub fn find_by_content_hash(
240    conn: &Connection,
241    content_hash: &str,
242    scope: &MemoryScope,
243    workspace: Option<&str>,
244) -> Result<Option<Memory>> {
245    let now = Utc::now().to_rfc3339();
246    let scope_type = scope.scope_type();
247    let scope_id = scope.scope_id().map(|s| s.to_string());
248    let workspace = workspace.unwrap_or("default");
249
250    let mut stmt = conn.prepare_cached(
251        "SELECT id, content, memory_type, importance, access_count,
252                created_at, updated_at, last_accessed_at, owner_id,
253                visibility, version, has_embedding, metadata,
254                scope_type, scope_id, workspace, tier, expires_at, content_hash,
255                event_time, event_duration_seconds, trigger_pattern, procedure_success_count,
256                procedure_failure_count, summary_of_id, lifecycle_state, media_url
257         FROM memories
258         WHERE content_hash = ? AND valid_to IS NULL
259           AND (expires_at IS NULL OR expires_at > ?)
260           AND scope_type = ?
261           AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
262           AND workspace = ?
263         LIMIT 1",
264    )?;
265
266    let result = stmt
267        .query_row(
268            params![content_hash, now, scope_type, scope_id, scope_id, workspace],
269            memory_from_row,
270        )
271        .ok();
272
273    if let Some(mut memory) = result {
274        memory.tags = load_tags(conn, memory.id)?;
275        Ok(Some(memory))
276    } else {
277        Ok(None)
278    }
279}
280
281/// Find the most similar memory to given embedding within the same scope AND workspace (semantic duplicate detection)
282///
283/// Returns the memory with the highest similarity score if it meets the threshold.
284/// Only checks memories that have embeddings computed.
285pub fn find_similar_by_embedding(
286    conn: &Connection,
287    query_embedding: &[f32],
288    scope: &MemoryScope,
289    workspace: Option<&str>,
290    threshold: f32,
291) -> Result<Option<(Memory, f32)>> {
292    use crate::embedding::{cosine_similarity, get_embedding};
293
294    let now = Utc::now().to_rfc3339();
295    let scope_type = scope.scope_type();
296    let scope_id = scope.scope_id().map(|s| s.to_string());
297    let workspace = workspace.unwrap_or("default");
298
299    // Get all memories with embeddings in the same scope AND workspace
300    let mut stmt = conn.prepare_cached(
301        "SELECT id, content, memory_type, importance, access_count,
302                created_at, updated_at, last_accessed_at, owner_id,
303                visibility, version, has_embedding, metadata,
304                scope_type, scope_id, workspace, tier, expires_at, content_hash,
305                event_time, event_duration_seconds, trigger_pattern, procedure_success_count,
306                procedure_failure_count, summary_of_id, lifecycle_state, media_url
307         FROM memories
308         WHERE has_embedding = 1 AND valid_to IS NULL
309           AND (expires_at IS NULL OR expires_at > ?)
310           AND scope_type = ?
311           AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
312           AND workspace = ?",
313    )?;
314
315    let memories: Vec<Memory> = stmt
316        .query_map(
317            params![now, scope_type, scope_id, scope_id, workspace],
318            memory_from_row,
319        )?
320        .filter_map(|r| r.ok())
321        .collect();
322
323    let mut best_match: Option<(Memory, f32)> = None;
324
325    for memory in memories {
326        if let Ok(Some(embedding)) = get_embedding(conn, memory.id) {
327            let similarity = cosine_similarity(query_embedding, &embedding);
328            if similarity >= threshold {
329                match &best_match {
330                    None => best_match = Some((memory, similarity)),
331                    Some((_, best_score)) if similarity > *best_score => {
332                        best_match = Some((memory, similarity));
333                    }
334                    _ => {}
335                }
336            }
337        }
338    }
339
340    // Load tags for the best match
341    if let Some((mut memory, score)) = best_match {
342        memory.tags = load_tags(conn, memory.id)?;
343        Ok(Some((memory, score)))
344    } else {
345        Ok(None)
346    }
347}
348
/// A pair of potentially duplicate memories with their similarity score.
///
/// Produced by the duplicate-detection queries in this module; serializable
/// so it can be returned to API consumers.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DuplicatePair {
    /// First memory of the pair.
    pub memory_a: Memory,
    /// Second memory of the pair.
    pub memory_b: Memory,
    /// Similarity between the two memories: `1.0` for exact hash matches,
    /// otherwise the crossref score or the embedding cosine similarity.
    pub similarity_score: f64,
    /// Which detection strategy produced this pair.
    pub match_type: DuplicateMatchType,
}
357
/// How the duplicate was detected.
///
/// Serialized in snake_case (`exact_hash`, `high_similarity`,
/// `embedding_similarity`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DuplicateMatchType {
    /// Exact content hash match
    ExactHash,
    /// High similarity score from crossrefs
    HighSimilarity,
    /// Semantic similarity via embedding cosine distance
    EmbeddingSimilarity,
}
369
370/// Find all potential duplicate memory pairs
371///
372/// Returns pairs of memories that are either:
373/// 1. Exact duplicates (same content hash within same scope)
374/// 2. High similarity (crossref score >= threshold within same scope)
375///
376/// Duplicates are scoped - memories in different scopes are not considered duplicates.
377pub fn find_duplicates(conn: &Connection, threshold: f64) -> Result<Vec<DuplicatePair>> {
378    find_duplicates_in_workspace(conn, threshold, None)
379}
380
381/// Find duplicate memories within a specific workspace (or all if None)
382pub fn find_duplicates_in_workspace(
383    conn: &Connection,
384    threshold: f64,
385    workspace: Option<&str>,
386) -> Result<Vec<DuplicatePair>> {
387    let now = Utc::now().to_rfc3339();
388    let mut duplicates = Vec::new();
389
390    // First, find exact hash duplicates (same content_hash within same scope AND workspace)
391    let (hash_sql, hash_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace
392    {
393        (
394            "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
395             FROM memories
396             WHERE content_hash IS NOT NULL
397               AND valid_to IS NULL
398               AND (expires_at IS NULL OR expires_at > ?)
399               AND workspace = ?
400             GROUP BY content_hash, scope_type, scope_id, workspace
401             HAVING COUNT(*) > 1",
402            vec![Box::new(now.clone()), Box::new(ws.to_string())],
403        )
404    } else {
405        (
406            "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
407             FROM memories
408             WHERE content_hash IS NOT NULL
409               AND valid_to IS NULL
410               AND (expires_at IS NULL OR expires_at > ?)
411             GROUP BY content_hash, scope_type, scope_id, workspace
412             HAVING COUNT(*) > 1",
413            vec![Box::new(now.clone())],
414        )
415    };
416
417    let mut hash_stmt = conn.prepare_cached(hash_sql)?;
418    let hash_rows = hash_stmt.query_map(
419        rusqlite::params_from_iter(hash_params.iter().map(|p| p.as_ref())),
420        |row| {
421            let ids_str: String = row.get(3)?;
422            Ok(ids_str)
423        },
424    )?;
425
426    for ids_result in hash_rows {
427        let ids_str = ids_result?;
428        let ids: Vec<i64> = ids_str
429            .split(',')
430            .filter_map(|s| s.trim().parse().ok())
431            .collect();
432
433        // Create pairs from all IDs with same hash
434        // Use get_memory_internal with track_access=false to avoid inflating access stats
435        for i in 0..ids.len() {
436            for j in (i + 1)..ids.len() {
437                let memory_a = get_memory_internal(conn, ids[i], false)?;
438                let memory_b = get_memory_internal(conn, ids[j], false)?;
439                duplicates.push(DuplicatePair {
440                    memory_a,
441                    memory_b,
442                    similarity_score: 1.0, // Exact match
443                    match_type: DuplicateMatchType::ExactHash,
444                });
445            }
446        }
447    }
448
449    // Second, find high-similarity pairs from crossrefs (within same scope AND workspace)
450    let (sim_sql, sim_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
451        (
452            "SELECT DISTINCT c.from_id, c.to_id, c.score
453             FROM crossrefs c
454             JOIN memories m1 ON c.from_id = m1.id
455             JOIN memories m2 ON c.to_id = m2.id
456             WHERE c.score >= ?
457               AND m1.valid_to IS NULL
458               AND m2.valid_to IS NULL
459               AND (m1.expires_at IS NULL OR m1.expires_at > ?)
460               AND (m2.expires_at IS NULL OR m2.expires_at > ?)
461               AND c.from_id < c.to_id
462               AND m1.scope_type = m2.scope_type
463               AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
464               AND m1.workspace = ?
465               AND m2.workspace = ?
466             ORDER BY c.score DESC",
467            vec![
468                Box::new(threshold),
469                Box::new(now.clone()),
470                Box::new(now.clone()),
471                Box::new(ws.to_string()),
472                Box::new(ws.to_string()),
473            ],
474        )
475    } else {
476        (
477            "SELECT DISTINCT c.from_id, c.to_id, c.score
478             FROM crossrefs c
479             JOIN memories m1 ON c.from_id = m1.id
480             JOIN memories m2 ON c.to_id = m2.id
481             WHERE c.score >= ?
482               AND m1.valid_to IS NULL
483               AND m2.valid_to IS NULL
484               AND (m1.expires_at IS NULL OR m1.expires_at > ?)
485               AND (m2.expires_at IS NULL OR m2.expires_at > ?)
486               AND c.from_id < c.to_id
487               AND m1.scope_type = m2.scope_type
488               AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
489               AND m1.workspace = m2.workspace
490             ORDER BY c.score DESC",
491            vec![
492                Box::new(threshold),
493                Box::new(now.clone()),
494                Box::new(now.clone()),
495            ],
496        )
497    };
498
499    let mut sim_stmt = conn.prepare_cached(sim_sql)?;
500    let sim_rows = sim_stmt.query_map(
501        rusqlite::params_from_iter(sim_params.iter().map(|p| p.as_ref())),
502        |row| {
503            Ok((
504                row.get::<_, i64>(0)?,
505                row.get::<_, i64>(1)?,
506                row.get::<_, f64>(2)?,
507            ))
508        },
509    )?;
510
511    for row_result in sim_rows {
512        let (from_id, to_id, score) = row_result?;
513
514        // Skip if this pair was already found as exact hash match
515        let already_found = duplicates.iter().any(|d| {
516            (d.memory_a.id == from_id && d.memory_b.id == to_id)
517                || (d.memory_a.id == to_id && d.memory_b.id == from_id)
518        });
519
520        if !already_found {
521            // Use get_memory_internal with track_access=false to avoid inflating access stats
522            let memory_a = get_memory_internal(conn, from_id, false)?;
523            let memory_b = get_memory_internal(conn, to_id, false)?;
524            duplicates.push(DuplicatePair {
525                memory_a,
526                memory_b,
527                similarity_score: score,
528                match_type: DuplicateMatchType::HighSimilarity,
529            });
530        }
531    }
532
533    Ok(duplicates)
534}
535
536/// Find semantically similar memories using embedding cosine similarity.
537/// This is "LLM-powered" dedup — goes beyond hash/n-gram matching to detect
538/// memories that convey the same information with different wording.
539pub fn find_duplicates_by_embedding(
540    conn: &Connection,
541    threshold: f32,
542    workspace: Option<&str>,
543    limit: usize,
544) -> Result<Vec<DuplicatePair>> {
545    use crate::embedding::{cosine_similarity, get_embedding};
546
547    let now = Utc::now().to_rfc3339();
548
549    // Get all memory IDs with embeddings (scoped to workspace if provided)
550    let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
551        (
552            "SELECT id FROM memories
553             WHERE has_embedding = 1 AND valid_to IS NULL
554               AND (expires_at IS NULL OR expires_at > ?)
555               AND COALESCE(lifecycle_state, 'active') = 'active'
556               AND workspace = ?
557             ORDER BY id",
558            vec![Box::new(now), Box::new(ws.to_string())],
559        )
560    } else {
561        (
562            "SELECT id FROM memories
563             WHERE has_embedding = 1 AND valid_to IS NULL
564               AND (expires_at IS NULL OR expires_at > ?)
565               AND COALESCE(lifecycle_state, 'active') = 'active'
566             ORDER BY id",
567            vec![Box::new(now)],
568        )
569    };
570
571    let mut stmt = conn.prepare(sql)?;
572    let ids: Vec<i64> = stmt
573        .query_map(
574            rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())),
575            |row| row.get(0),
576        )?
577        .filter_map(|r| r.ok())
578        .collect();
579
580    // Load all embeddings into memory for pairwise comparison
581    let mut embeddings: Vec<(i64, Vec<f32>)> = Vec::with_capacity(ids.len());
582    for &id in &ids {
583        if let Ok(Some(emb)) = get_embedding(conn, id) {
584            embeddings.push((id, emb));
585        }
586    }
587
588    let mut duplicates = Vec::new();
589
590    // Pairwise comparison (O(n^2) — bounded by limit)
591    for i in 0..embeddings.len() {
592        if duplicates.len() >= limit {
593            break;
594        }
595        for j in (i + 1)..embeddings.len() {
596            if duplicates.len() >= limit {
597                break;
598            }
599            let sim = cosine_similarity(&embeddings[i].1, &embeddings[j].1);
600            if sim >= threshold {
601                let memory_a = get_memory_internal(conn, embeddings[i].0, false)?;
602                let memory_b = get_memory_internal(conn, embeddings[j].0, false)?;
603                duplicates.push(DuplicatePair {
604                    memory_a,
605                    memory_b,
606                    similarity_score: sim as f64,
607                    match_type: DuplicateMatchType::EmbeddingSimilarity,
608                });
609            }
610        }
611    }
612
613    // Sort by similarity descending
614    duplicates.sort_by(|a, b| {
615        b.similarity_score
616            .partial_cmp(&a.similarity_score)
617            .unwrap_or(std::cmp::Ordering::Equal)
618    });
619
620    Ok(duplicates)
621}
622
623/// Create a new memory with deduplication support
624pub fn create_memory(conn: &Connection, input: &CreateMemoryInput) -> Result<Memory> {
625    let now = Utc::now();
626    let now_str = now.to_rfc3339();
627    let metadata_json = serde_json::to_string(&input.metadata)?;
628    let importance = input.importance.unwrap_or(0.5);
629
630    // Compute content hash for deduplication
631    let content_hash = compute_content_hash(&input.content);
632
633    // Normalize workspace early for dedup checking
634    let workspace = match &input.workspace {
635        Some(ws) => crate::types::normalize_workspace(ws)
636            .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?,
637        None => "default".to_string(),
638    };
639
640    // Check for duplicates based on dedup_mode (scoped to same scope AND workspace)
641    if input.dedup_mode != DedupMode::Allow {
642        if let Some(existing) =
643            find_by_content_hash(conn, &content_hash, &input.scope, Some(&workspace))?
644        {
645            match input.dedup_mode {
646                DedupMode::Reject => {
647                    return Err(EngramError::Duplicate {
648                        existing_id: existing.id,
649                        message: format!(
650                            "Duplicate memory detected (id={}). Content hash: {}",
651                            existing.id, content_hash
652                        ),
653                    });
654                }
655                DedupMode::Skip => {
656                    // Return existing memory without modification
657                    return Ok(existing);
658                }
659                DedupMode::Merge => {
660                    // Merge: update existing memory with new tags and metadata
661                    let mut merged_tags = existing.tags.clone();
662                    for tag in &input.tags {
663                        if !merged_tags.contains(tag) {
664                            merged_tags.push(tag.clone());
665                        }
666                    }
667
668                    let mut merged_metadata = existing.metadata.clone();
669                    for (key, value) in &input.metadata {
670                        merged_metadata.insert(key.clone(), value.clone());
671                    }
672
673                    let update_input = UpdateMemoryInput {
674                        content: None, // Keep existing content
675                        memory_type: None,
676                        tags: Some(merged_tags),
677                        metadata: Some(merged_metadata),
678                        importance: input.importance, // Use new importance if provided
679                        scope: None,
680                        ttl_seconds: input.ttl_seconds, // Apply new TTL if provided
681                        event_time: None,
682                        trigger_pattern: None,
683                        media_url: input.media_url.clone().map(Some),
684                    };
685
686                    return update_memory(conn, existing.id, &update_input);
687                }
688                DedupMode::Allow => unreachable!(),
689            }
690        }
691    }
692
693    // Extract scope type and id for database storage
694    let scope_type = input.scope.scope_type();
695    let scope_id = input.scope.scope_id().map(|s| s.to_string());
696
697    // workspace was already normalized above for dedup checking
698
699    // Determine tier and enforce tier invariants
700    let tier = input.tier;
701
702    // Calculate expires_at based on tier and ttl_seconds
703    // Tier invariants:
704    //   - Permanent: expires_at MUST be NULL (cannot expire)
705    //   - Daily: expires_at MUST be set (default: created_at + 24h)
706    let expires_at = match tier {
707        MemoryTier::Permanent => {
708            // Permanent memories cannot have an expiration
709            if input.ttl_seconds.is_some() && input.ttl_seconds != Some(0) {
710                return Err(EngramError::InvalidInput(
711                    "Permanent tier memories cannot have a TTL. Use Daily tier for expiring memories.".to_string()
712                ));
713            }
714            None
715        }
716        MemoryTier::Daily => {
717            // Daily memories must have an expiration (default: 24 hours)
718            let ttl = input.ttl_seconds.filter(|&t| t > 0).unwrap_or(86400); // 24h default
719            Some((now + chrono::Duration::seconds(ttl)).to_rfc3339())
720        }
721    };
722
723    let event_time = input.event_time.map(|dt| dt.to_rfc3339());
724
725    conn.execute(
726        "INSERT INTO memories (content, memory_type, importance, metadata, created_at, updated_at, valid_from, scope_type, scope_id, workspace, tier, expires_at, content_hash, event_time, event_duration_seconds, trigger_pattern, summary_of_id, media_url)
727         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
728        params![
729            input.content,
730            input.memory_type.as_str(),
731            importance,
732            metadata_json,
733            now_str,
734            now_str,
735            now_str,
736            scope_type,
737            scope_id,
738            workspace,
739            tier.as_str(),
740            expires_at,
741            content_hash,
742            event_time,
743            input.event_duration_seconds,
744            input.trigger_pattern,
745            input.summary_of_id,
746            input.media_url,
747        ],
748    )?;
749
750    let id = conn.last_insert_rowid();
751
752    // Insert tags
753    for tag in &input.tags {
754        ensure_tag(conn, tag)?;
755        conn.execute(
756            "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
757             SELECT ?, id FROM tags WHERE name = ?",
758            params![id, tag],
759        )?;
760    }
761
762    // Queue for embedding if not deferred
763    if !input.defer_embedding {
764        conn.execute(
765            "INSERT INTO embedding_queue (memory_id, status, queued_at)
766             VALUES (?, 'pending', ?)",
767            params![id, now_str],
768        )?;
769    }
770
771    // Create initial version
772    let tags_json = serde_json::to_string(&input.tags)?;
773    conn.execute(
774        "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
775         VALUES (?, 1, ?, ?, ?, ?)",
776        params![id, input.content, tags_json, metadata_json, now_str],
777    )?;
778
779    // Record event for sync delta tracking
780    record_event(
781        conn,
782        MemoryEventType::Created,
783        Some(id),
784        None,
785        serde_json::json!({
786            "workspace": input.workspace.as_deref().unwrap_or("default"),
787            "memory_type": input.memory_type.as_str(),
788        }),
789    )?;
790
791    // Update sync state (version now tracks event count for delta sync)
792    conn.execute(
793        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
794        [],
795    )?;
796
797    get_memory_internal(conn, id, false)
798}
799
800/// Ensure a tag exists and return its ID
801fn ensure_tag(conn: &Connection, tag: &str) -> Result<i64> {
802    conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", params![tag])?;
803
804    let id: i64 = conn.query_row("SELECT id FROM tags WHERE name = ?", params![tag], |row| {
805        row.get(0)
806    })?;
807
808    Ok(id)
809}
810
/// Get a memory by ID.
///
/// Public wrapper that forwards to `get_memory_internal` with the final flag set
/// to `true`; the other calls visible in this part of the file pass `false`.
/// NOTE(review): presumably that flag enables access tracking — confirm against
/// `get_memory_internal`, which is defined outside this section.
///
/// # Errors
/// Returns whatever error `get_memory_internal` produces.
pub fn get_memory(conn: &Connection, id: i64) -> Result<Memory> {
    get_memory_internal(conn, id, true)
}
815
816/// Update a memory
817pub fn update_memory(conn: &Connection, id: i64, input: &UpdateMemoryInput) -> Result<Memory> {
818    // Get current memory for versioning
819    let current = get_memory_internal(conn, id, false)?;
820    let now = Utc::now().to_rfc3339();
821
822    // Build update query dynamically
823    let mut updates = vec!["updated_at = ?".to_string()];
824    let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now.clone())];
825
826    if let Some(ref content) = input.content {
827        updates.push("content = ?".to_string());
828        values.push(Box::new(content.clone()));
829        // Recalculate content_hash when content changes
830        let new_hash = compute_content_hash(content);
831        updates.push("content_hash = ?".to_string());
832        values.push(Box::new(new_hash));
833    }
834
835    if let Some(ref memory_type) = input.memory_type {
836        updates.push("memory_type = ?".to_string());
837        values.push(Box::new(memory_type.as_str().to_string()));
838    }
839
840    if let Some(importance) = input.importance {
841        updates.push("importance = ?".to_string());
842        values.push(Box::new(importance));
843    }
844
845    if let Some(ref metadata) = input.metadata {
846        let metadata_json = serde_json::to_string(metadata)?;
847        updates.push("metadata = ?".to_string());
848        values.push(Box::new(metadata_json));
849    }
850
851    if let Some(ref scope) = input.scope {
852        updates.push("scope_type = ?".to_string());
853        values.push(Box::new(scope.scope_type().to_string()));
854        updates.push("scope_id = ?".to_string());
855        values.push(Box::new(scope.scope_id().map(|s| s.to_string())));
856    }
857
858    // Update event_time if provided (Some(None) clears)
859    if let Some(event_time) = &input.event_time {
860        updates.push("event_time = ?".to_string());
861        let value = event_time.as_ref().map(|dt| dt.to_rfc3339());
862        values.push(Box::new(value));
863    }
864
865    // Update trigger_pattern if provided (Some(None) clears)
866    if let Some(trigger_pattern) = &input.trigger_pattern {
867        updates.push("trigger_pattern = ?".to_string());
868        values.push(Box::new(trigger_pattern.clone()));
869    }
870
871    // Update media_url if provided (Some(None) clears, Some(Some(url)) sets)
872    if let Some(media_url) = &input.media_url {
873        updates.push("media_url = ?".to_string());
874        values.push(Box::new(media_url.clone()));
875    }
876
877    // Handle TTL update with tier invariant enforcement
878    // Normalize: ttl_seconds <= 0 means "no expiration" (consistent with create_memory)
879    // Invariants:
880    //   - Permanent tier: expires_at MUST be NULL
881    //   - Daily tier: expires_at MUST be set
882    if let Some(ttl) = input.ttl_seconds {
883        if ttl <= 0 {
884            // Request to remove expiration
885            // Only allowed for Permanent tier; for Daily tier, this is an error
886            if current.tier == MemoryTier::Daily {
887                return Err(crate::error::EngramError::InvalidInput(
888                    "Cannot remove expiration from a Daily tier memory. Use promote_to_permanent first.".to_string()
889                ));
890            }
891            updates.push("expires_at = NULL".to_string());
892        } else {
893            // Request to set expiration
894            // Only allowed for Daily tier; for Permanent tier, this is an error
895            if current.tier == MemoryTier::Permanent {
896                return Err(crate::error::EngramError::InvalidInput(
897                    "Cannot set expiration on a Permanent tier memory. Permanent memories cannot expire.".to_string()
898                ));
899            }
900            let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
901            updates.push("expires_at = ?".to_string());
902            values.push(Box::new(expires_at));
903        }
904    }
905
906    // Increment version
907    updates.push("version = version + 1".to_string());
908
909    // Execute update
910    let sql = format!("UPDATE memories SET {} WHERE id = ?", updates.join(", "));
911    values.push(Box::new(id));
912
913    let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
914    conn.execute(&sql, params.as_slice())?;
915
916    // Update tags if provided
917    if let Some(ref tags) = input.tags {
918        conn.execute("DELETE FROM memory_tags WHERE memory_id = ?", params![id])?;
919        for tag in tags {
920            ensure_tag(conn, tag)?;
921            conn.execute(
922                "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
923                 SELECT ?, id FROM tags WHERE name = ?",
924                params![id, tag],
925            )?;
926        }
927    }
928
929    // Create new version
930    let new_content = input.content.as_ref().unwrap_or(&current.content);
931    let new_tags = input.tags.as_ref().unwrap_or(&current.tags);
932    let new_metadata = input.metadata.as_ref().unwrap_or(&current.metadata);
933    let tags_json = serde_json::to_string(new_tags)?;
934    let metadata_json = serde_json::to_string(new_metadata)?;
935
936    conn.execute(
937        "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
938         VALUES (?, (SELECT version FROM memories WHERE id = ?), ?, ?, ?, ?)",
939        params![id, id, new_content, tags_json, metadata_json, now],
940    )?;
941
942    // Re-queue for embedding if content changed
943    if input.content.is_some() {
944        conn.execute(
945            "INSERT OR REPLACE INTO embedding_queue (memory_id, status, queued_at)
946             VALUES (?, 'pending', ?)",
947            params![id, now],
948        )?;
949        conn.execute(
950            "UPDATE memories SET has_embedding = 0 WHERE id = ?",
951            params![id],
952        )?;
953    }
954
955    // Build list of changed fields for event data
956    let mut changed_fields = Vec::new();
957    if input.content.is_some() {
958        changed_fields.push("content");
959    }
960    if input.tags.is_some() {
961        changed_fields.push("tags");
962    }
963    if input.metadata.is_some() {
964        changed_fields.push("metadata");
965    }
966    if input.importance.is_some() {
967        changed_fields.push("importance");
968    }
969    if input.ttl_seconds.is_some() {
970        changed_fields.push("ttl");
971    }
972
973    // Record event for sync delta tracking
974    record_event(
975        conn,
976        MemoryEventType::Updated,
977        Some(id),
978        None,
979        serde_json::json!({
980            "changed_fields": changed_fields,
981        }),
982    )?;
983
984    // Update sync state (version now tracks event count for delta sync)
985    conn.execute(
986        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
987        [],
988    )?;
989
990    get_memory_internal(conn, id, false)
991}
992
993/// Promote a memory from Daily tier to Permanent tier.
994///
995/// This operation:
996/// - Changes the tier from Daily to Permanent
997/// - Clears the expires_at field (permanent memories cannot expire)
998/// - Updates the updated_at timestamp
999///
1000/// # Errors
1001/// - Returns `NotFound` if memory doesn't exist
1002/// - Returns `Validation` if memory is already Permanent
1003pub fn promote_to_permanent(conn: &Connection, id: i64) -> Result<Memory> {
1004    let memory = get_memory_internal(conn, id, false)?;
1005
1006    if memory.tier == MemoryTier::Permanent {
1007        return Err(EngramError::InvalidInput(format!(
1008            "Memory {} is already in the Permanent tier",
1009            id
1010        )));
1011    }
1012
1013    let now = Utc::now().to_rfc3339();
1014
1015    conn.execute(
1016        "UPDATE memories SET tier = 'permanent', expires_at = NULL, updated_at = ?, version = version + 1 WHERE id = ?",
1017        params![now, id],
1018    )?;
1019
1020    // Record event for sync delta tracking
1021    record_event(
1022        conn,
1023        MemoryEventType::Updated,
1024        Some(id),
1025        None,
1026        serde_json::json!({
1027            "changed_fields": ["tier", "expires_at"],
1028            "action": "promote_to_permanent",
1029        }),
1030    )?;
1031
1032    // Update sync state (version now tracks event count for delta sync)
1033    conn.execute(
1034        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1035        [],
1036    )?;
1037
1038    tracing::info!(memory_id = id, "Promoted memory to permanent tier");
1039
1040    get_memory_internal(conn, id, false)
1041}
1042
1043/// Move a memory to a different workspace.
1044///
1045/// # Arguments
1046/// - `id`: Memory ID
1047/// - `workspace`: New workspace name (will be normalized)
1048///
1049/// # Errors
1050/// - Returns `NotFound` if memory doesn't exist
1051/// - Returns `Validation` if workspace name is invalid
1052pub fn move_to_workspace(conn: &Connection, id: i64, workspace: &str) -> Result<Memory> {
1053    // Validate workspace exists (by checking the memory exists first)
1054    let _memory = get_memory_internal(conn, id, false)?;
1055
1056    // Normalize the workspace name
1057    let normalized = crate::types::normalize_workspace(workspace)
1058        .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1059
1060    let now = Utc::now().to_rfc3339();
1061
1062    conn.execute(
1063        "UPDATE memories SET workspace = ?, updated_at = ?, version = version + 1 WHERE id = ?",
1064        params![normalized, now, id],
1065    )?;
1066
1067    // Record event for sync delta tracking
1068    record_event(
1069        conn,
1070        MemoryEventType::Updated,
1071        Some(id),
1072        None,
1073        serde_json::json!({
1074            "changed_fields": ["workspace"],
1075            "action": "move_to_workspace",
1076            "new_workspace": normalized,
1077        }),
1078    )?;
1079
1080    // Update sync state (version now tracks event count for delta sync)
1081    conn.execute(
1082        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1083        [],
1084    )?;
1085
1086    tracing::info!(memory_id = id, workspace = %normalized, "Moved memory to workspace");
1087
1088    get_memory_internal(conn, id, false)
1089}
1090
1091/// List all workspaces with their statistics.
1092///
1093/// Returns computed stats for each workspace that has at least one memory.
1094/// Stats are computed on-demand (not cached at the database level).
1095pub fn list_workspaces(conn: &Connection) -> Result<Vec<WorkspaceStats>> {
1096    let now = Utc::now().to_rfc3339();
1097
1098    let mut stmt = conn.prepare(
1099        r#"
1100        SELECT
1101            workspace,
1102            COUNT(*) as memory_count,
1103            SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1104            SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1105            MIN(created_at) as first_memory_at,
1106            MAX(created_at) as last_memory_at,
1107            AVG(importance) as avg_importance
1108        FROM memories
1109        WHERE valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1110        GROUP BY workspace
1111        ORDER BY memory_count DESC
1112        "#,
1113    )?;
1114
1115    let workspaces: Vec<WorkspaceStats> = stmt
1116        .query_map(params![now], |row| {
1117            let workspace: String = row.get(0)?;
1118            let memory_count: i64 = row.get(1)?;
1119            let permanent_count: i64 = row.get(2)?;
1120            let daily_count: i64 = row.get(3)?;
1121            let first_memory_at: Option<String> = row.get(4)?;
1122            let last_memory_at: Option<String> = row.get(5)?;
1123            let avg_importance: Option<f64> = row.get(6)?;
1124
1125            Ok(WorkspaceStats {
1126                workspace,
1127                memory_count,
1128                permanent_count,
1129                daily_count,
1130                first_memory_at: first_memory_at.and_then(|s| {
1131                    DateTime::parse_from_rfc3339(&s)
1132                        .map(|dt| dt.with_timezone(&Utc))
1133                        .ok()
1134                }),
1135                last_memory_at: last_memory_at.and_then(|s| {
1136                    DateTime::parse_from_rfc3339(&s)
1137                        .map(|dt| dt.with_timezone(&Utc))
1138                        .ok()
1139                }),
1140                top_tags: vec![], // Loaded separately if needed
1141                avg_importance: avg_importance.map(|v| v as f32),
1142            })
1143        })?
1144        .filter_map(|r| r.ok())
1145        .collect();
1146
1147    Ok(workspaces)
1148}
1149
1150/// Get statistics for a specific workspace.
1151pub fn get_workspace_stats(conn: &Connection, workspace: &str) -> Result<WorkspaceStats> {
1152    let normalized = crate::types::normalize_workspace(workspace)
1153        .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1154
1155    let now = Utc::now().to_rfc3339();
1156
1157    let stats = conn
1158        .query_row(
1159            r#"
1160        SELECT
1161            workspace,
1162            COUNT(*) as memory_count,
1163            SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1164            SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1165            MIN(created_at) as first_memory_at,
1166            MAX(created_at) as last_memory_at,
1167            AVG(importance) as avg_importance
1168        FROM memories
1169        WHERE workspace = ? AND valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1170        GROUP BY workspace
1171        "#,
1172            params![normalized, now],
1173            |row| {
1174                let workspace: String = row.get(0)?;
1175                let memory_count: i64 = row.get(1)?;
1176                let permanent_count: i64 = row.get(2)?;
1177                let daily_count: i64 = row.get(3)?;
1178                let first_memory_at: Option<String> = row.get(4)?;
1179                let last_memory_at: Option<String> = row.get(5)?;
1180                let avg_importance: Option<f64> = row.get(6)?;
1181
1182                Ok(WorkspaceStats {
1183                    workspace,
1184                    memory_count,
1185                    permanent_count,
1186                    daily_count,
1187                    first_memory_at: first_memory_at.and_then(|s| {
1188                        DateTime::parse_from_rfc3339(&s)
1189                            .map(|dt| dt.with_timezone(&Utc))
1190                            .ok()
1191                    }),
1192                    last_memory_at: last_memory_at.and_then(|s| {
1193                        DateTime::parse_from_rfc3339(&s)
1194                            .map(|dt| dt.with_timezone(&Utc))
1195                            .ok()
1196                    }),
1197                    top_tags: vec![],
1198                    avg_importance: avg_importance.map(|v| v as f32),
1199                })
1200            },
1201        )
1202        .map_err(|e| match e {
1203            rusqlite::Error::QueryReturnedNoRows => {
1204                EngramError::NotFound(0) // Workspace doesn't exist
1205            }
1206            _ => EngramError::Database(e),
1207        })?;
1208
1209    Ok(stats)
1210}
1211
1212/// Delete a workspace by moving all its memories to the default workspace or deleting them.
1213///
1214/// # Arguments
1215/// - `workspace`: Workspace to delete
1216/// - `move_to_default`: If true, moves memories to "default" workspace. If false, deletes them.
1217///
1218/// # Returns
1219/// Number of memories affected.
1220pub fn delete_workspace(conn: &Connection, workspace: &str, move_to_default: bool) -> Result<i64> {
1221    let normalized = crate::types::normalize_workspace(workspace)
1222        .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1223
1224    if normalized == "default" {
1225        return Err(EngramError::InvalidInput(
1226            "Cannot delete the default workspace".to_string(),
1227        ));
1228    }
1229
1230    let now = Utc::now().to_rfc3339();
1231
1232    // First, get the IDs of all affected memories so we can record individual events
1233    let affected_ids: Vec<i64> = {
1234        let mut stmt =
1235            conn.prepare("SELECT id FROM memories WHERE workspace = ? AND valid_to IS NULL")?;
1236        let rows = stmt.query_map(params![&normalized], |row| row.get(0))?;
1237        rows.collect::<std::result::Result<Vec<_>, _>>()?
1238    };
1239
1240    let affected = affected_ids.len() as i64;
1241
1242    if affected > 0 {
1243        if move_to_default {
1244            // Move all memories to the default workspace
1245            conn.execute(
1246                "UPDATE memories SET workspace = 'default', updated_at = ?, version = version + 1 WHERE workspace = ? AND valid_to IS NULL",
1247                params![&now, &normalized],
1248            )?;
1249        } else {
1250            // Soft delete all memories in the workspace
1251            conn.execute(
1252                "UPDATE memories SET valid_to = ? WHERE workspace = ? AND valid_to IS NULL",
1253                params![&now, &normalized],
1254            )?;
1255        }
1256
1257        // Record individual events for each affected memory (for proper sync delta tracking)
1258        let event_type = if move_to_default {
1259            MemoryEventType::Updated
1260        } else {
1261            MemoryEventType::Deleted
1262        };
1263
1264        for memory_id in &affected_ids {
1265            record_event(
1266                conn,
1267                event_type.clone(),
1268                Some(*memory_id),
1269                None,
1270                serde_json::json!({
1271                    "action": "delete_workspace",
1272                    "workspace": normalized,
1273                    "move_to_default": move_to_default,
1274                }),
1275            )?;
1276        }
1277    }
1278
1279    // Update sync state (version now tracks event count for delta sync)
1280    conn.execute(
1281        "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1282        params![affected],
1283    )?;
1284
1285    tracing::info!(
1286        workspace = %normalized,
1287        move_to_default,
1288        affected,
1289        "Deleted workspace"
1290    );
1291
1292    Ok(affected)
1293}
1294
1295/// Delete a memory (soft delete by setting valid_to)
1296pub fn delete_memory(conn: &Connection, id: i64) -> Result<()> {
1297    let now = Utc::now().to_rfc3339();
1298
1299    // Get memory info before deletion for event data
1300    let memory_info: Option<(String, String)> = conn
1301        .query_row(
1302            "SELECT workspace, memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1303            params![id],
1304            |row| Ok((row.get(0)?, row.get(1)?)),
1305        )
1306        .ok();
1307
1308    let affected = conn.execute(
1309        "UPDATE memories SET valid_to = ? WHERE id = ? AND valid_to IS NULL",
1310        params![now, id],
1311    )?;
1312
1313    if affected == 0 {
1314        return Err(EngramError::NotFound(id));
1315    }
1316
1317    // Also invalidate cross-references
1318    conn.execute(
1319        "UPDATE crossrefs SET valid_to = ? WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL",
1320        params![now, id, id],
1321    )?;
1322
1323    // Record event for sync delta tracking
1324    let (workspace, memory_type) =
1325        memory_info.unwrap_or(("default".to_string(), "unknown".to_string()));
1326    record_event(
1327        conn,
1328        MemoryEventType::Deleted,
1329        Some(id),
1330        None,
1331        serde_json::json!({
1332            "workspace": workspace,
1333            "memory_type": memory_type,
1334        }),
1335    )?;
1336
1337    // Update sync state (version now tracks event count for delta sync)
1338    conn.execute(
1339        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1340        [],
1341    )?;
1342
1343    Ok(())
1344}
1345
1346/// List memories with filtering and pagination
1347pub fn list_memories(conn: &Connection, options: &ListOptions) -> Result<Vec<Memory>> {
1348    let now = Utc::now().to_rfc3339();
1349
1350    let mut sql = String::from(
1351        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1352                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1353                m.visibility, m.version, m.has_embedding, m.metadata,
1354                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1355                m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
1356                m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
1357         FROM memories m",
1358    );
1359
1360    let mut conditions = vec!["m.valid_to IS NULL".to_string()];
1361    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1362
1363    // Exclude expired memories
1364    conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
1365    params.push(Box::new(now));
1366
1367    // Tag filter (requires join)
1368    if let Some(ref tags) = options.tags {
1369        if !tags.is_empty() {
1370            sql.push_str(
1371                " JOIN memory_tags mt ON m.id = mt.memory_id
1372                  JOIN tags t ON mt.tag_id = t.id",
1373            );
1374            let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
1375            conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1376            for tag in tags {
1377                params.push(Box::new(tag.clone()));
1378            }
1379        }
1380    }
1381
1382    // Type filter
1383    if let Some(ref memory_type) = options.memory_type {
1384        conditions.push("m.memory_type = ?".to_string());
1385        params.push(Box::new(memory_type.as_str().to_string()));
1386    }
1387
1388    // Metadata filter (JSON)
1389    if let Some(ref metadata_filter) = options.metadata_filter {
1390        for (key, value) in metadata_filter {
1391            metadata_value_to_param(key, value, &mut conditions, &mut params)?;
1392        }
1393    }
1394
1395    // Scope filter
1396    if let Some(ref scope) = options.scope {
1397        conditions.push("m.scope_type = ?".to_string());
1398        params.push(Box::new(scope.scope_type().to_string()));
1399        if let Some(scope_id) = scope.scope_id() {
1400            conditions.push("m.scope_id = ?".to_string());
1401            params.push(Box::new(scope_id.to_string()));
1402        } else {
1403            conditions.push("m.scope_id IS NULL".to_string());
1404        }
1405    }
1406
1407    // Workspace filter
1408    if let Some(ref workspace) = options.workspace {
1409        conditions.push("m.workspace = ?".to_string());
1410        params.push(Box::new(workspace.clone()));
1411    }
1412
1413    // Tier filter
1414    if let Some(ref tier) = options.tier {
1415        conditions.push("m.tier = ?".to_string());
1416        params.push(Box::new(tier.as_str().to_string()));
1417    }
1418
1419    sql.push_str(" WHERE ");
1420    sql.push_str(&conditions.join(" AND "));
1421
1422    // Sorting
1423    let sort_field = match options.sort_by.unwrap_or_default() {
1424        SortField::CreatedAt => "m.created_at",
1425        SortField::UpdatedAt => "m.updated_at",
1426        SortField::LastAccessedAt => "m.last_accessed_at",
1427        SortField::Importance => "m.importance",
1428        SortField::AccessCount => "m.access_count",
1429    };
1430    let sort_order = match options.sort_order.unwrap_or_default() {
1431        SortOrder::Asc => "ASC",
1432        SortOrder::Desc => "DESC",
1433    };
1434    sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
1435
1436    // Pagination
1437    let limit = options.limit.unwrap_or(100);
1438    let offset = options.offset.unwrap_or(0);
1439    sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1440
1441    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1442    let mut stmt = conn.prepare(&sql)?;
1443
1444    let memories: Vec<Memory> = stmt
1445        .query_map(param_refs.as_slice(), memory_from_row)?
1446        .filter_map(|r| r.ok())
1447        .map(|mut m| {
1448            m.tags = load_tags(conn, m.id).unwrap_or_default();
1449            m
1450        })
1451        .collect();
1452
1453    Ok(memories)
1454}
1455
1456/// Query episodic memories ordered by event_time within a time range.
1457pub fn get_episodic_timeline(
1458    conn: &Connection,
1459    start_time: Option<DateTime<Utc>>,
1460    end_time: Option<DateTime<Utc>>,
1461    workspace: Option<&str>,
1462    tags: Option<&[String]>,
1463    limit: i64,
1464) -> Result<Vec<Memory>> {
1465    let now = Utc::now().to_rfc3339();
1466
1467    let mut sql = String::from(
1468        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1469                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1470                m.visibility, m.version, m.has_embedding, m.metadata,
1471                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1472                m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
1473                m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url,
1474                m.event_time, m.event_duration_seconds, m.trigger_pattern,
1475                m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1476                m.lifecycle_state
1477         FROM memories m",
1478    );
1479
1480    let mut conditions = vec![
1481        "m.valid_to IS NULL".to_string(),
1482        "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1483        "m.memory_type = 'episodic'".to_string(),
1484        "m.event_time IS NOT NULL".to_string(),
1485    ];
1486    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1487
1488    if let Some(start) = start_time {
1489        conditions.push("m.event_time >= ?".to_string());
1490        params.push(Box::new(start.to_rfc3339()));
1491    }
1492
1493    if let Some(end) = end_time {
1494        conditions.push("m.event_time <= ?".to_string());
1495        params.push(Box::new(end.to_rfc3339()));
1496    }
1497
1498    if let Some(ws) = workspace {
1499        conditions.push("m.workspace = ?".to_string());
1500        params.push(Box::new(ws.to_string()));
1501    }
1502
1503    if let Some(tag_list) = tags {
1504        if !tag_list.is_empty() {
1505            sql.push_str(
1506                " JOIN memory_tags mt ON m.id = mt.memory_id
1507                  JOIN tags t ON mt.tag_id = t.id",
1508            );
1509            let placeholders: Vec<String> = tag_list.iter().map(|_| "?".to_string()).collect();
1510            conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1511            for tag in tag_list {
1512                params.push(Box::new(tag.clone()));
1513            }
1514        }
1515    }
1516
1517    sql.push_str(" WHERE ");
1518    sql.push_str(&conditions.join(" AND "));
1519    sql.push_str(" ORDER BY m.event_time ASC");
1520    sql.push_str(&format!(" LIMIT {}", limit));
1521
1522    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1523    let mut stmt = conn.prepare(&sql)?;
1524
1525    let memories: Vec<Memory> = stmt
1526        .query_map(param_refs.as_slice(), memory_from_row)?
1527        .filter_map(|r| r.ok())
1528        .map(|mut m| {
1529            m.tags = load_tags(conn, m.id).unwrap_or_default();
1530            m
1531        })
1532        .collect();
1533
1534    Ok(memories)
1535}
1536
1537/// Query procedural memories, optionally filtered by trigger pattern and success rate.
1538pub fn get_procedural_memories(
1539    conn: &Connection,
1540    trigger_pattern: Option<&str>,
1541    workspace: Option<&str>,
1542    min_success_rate: Option<f32>,
1543    limit: i64,
1544) -> Result<Vec<Memory>> {
1545    let now = Utc::now().to_rfc3339();
1546
1547    let sql_base = "SELECT m.id, m.content, m.memory_type, m.importance, m.access_count,
1548                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1549                m.visibility, m.version, m.has_embedding, m.metadata,
1550                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1551                m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
1552                m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url,
1553                m.event_time, m.event_duration_seconds, m.trigger_pattern,
1554                m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1555                m.lifecycle_state
1556         FROM memories m";
1557
1558    let mut conditions = vec![
1559        "m.valid_to IS NULL".to_string(),
1560        "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1561        "m.memory_type = 'procedural'".to_string(),
1562    ];
1563    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1564
1565    if let Some(pattern) = trigger_pattern {
1566        conditions.push("m.trigger_pattern LIKE ?".to_string());
1567        params.push(Box::new(format!("%{}%", pattern)));
1568    }
1569
1570    if let Some(ws) = workspace {
1571        conditions.push("m.workspace = ?".to_string());
1572        params.push(Box::new(ws.to_string()));
1573    }
1574
1575    if let Some(min_rate) = min_success_rate {
1576        // Filter: success / (success + failure) >= min_rate
1577        // Only apply when there's at least one execution
1578        conditions.push("(m.procedure_success_count + m.procedure_failure_count) > 0".to_string());
1579        conditions.push(
1580            "CAST(m.procedure_success_count AS REAL) / (m.procedure_success_count + m.procedure_failure_count) >= ?"
1581                .to_string(),
1582        );
1583        params.push(Box::new(min_rate as f64));
1584    }
1585
1586    let sql = format!(
1587        "{} WHERE {} ORDER BY m.procedure_success_count DESC LIMIT {}",
1588        sql_base,
1589        conditions.join(" AND "),
1590        limit
1591    );
1592
1593    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1594    let mut stmt = conn.prepare(&sql)?;
1595
1596    let memories: Vec<Memory> = stmt
1597        .query_map(param_refs.as_slice(), memory_from_row)?
1598        .filter_map(|r| r.ok())
1599        .map(|mut m| {
1600            m.tags = load_tags(conn, m.id).unwrap_or_default();
1601            m
1602        })
1603        .collect();
1604
1605    Ok(memories)
1606}
1607
1608/// Record a success or failure outcome for a procedural memory.
1609pub fn record_procedure_outcome(
1610    conn: &Connection,
1611    memory_id: i64,
1612    success: bool,
1613) -> Result<Memory> {
1614    let column = if success {
1615        "procedure_success_count"
1616    } else {
1617        "procedure_failure_count"
1618    };
1619
1620    let now = Utc::now().to_rfc3339();
1621
1622    // Verify the memory exists and is procedural
1623    let memory_type: String = conn
1624        .query_row(
1625            "SELECT memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1626            params![memory_id],
1627            |row| row.get(0),
1628        )
1629        .map_err(|_| EngramError::NotFound(memory_id))?;
1630
1631    if memory_type != "procedural" {
1632        return Err(EngramError::InvalidInput(format!(
1633            "Memory {} is type '{}', not 'procedural'",
1634            memory_id, memory_type
1635        )));
1636    }
1637
1638    conn.execute(
1639        &format!(
1640            "UPDATE memories SET {} = {} + 1, updated_at = ? WHERE id = ?",
1641            column, column
1642        ),
1643        params![now, memory_id],
1644    )?;
1645
1646    get_memory(conn, memory_id)
1647}
1648
1649/// Create a cross-reference between memories
1650pub fn create_crossref(conn: &Connection, input: &CreateCrossRefInput) -> Result<CrossReference> {
1651    let now = Utc::now().to_rfc3339();
1652
1653    // Verify both memories exist
1654    let _ = get_memory_internal(conn, input.from_id, false)?;
1655    let _ = get_memory_internal(conn, input.to_id, false)?;
1656
1657    let strength = input.strength.unwrap_or(1.0);
1658
1659    conn.execute(
1660        "INSERT INTO crossrefs (from_id, to_id, edge_type, score, strength, source, source_context, pinned, created_at, valid_from)
1661         VALUES (?, ?, ?, 1.0, ?, 'manual', ?, ?, ?, ?)
1662         ON CONFLICT(from_id, to_id, edge_type) DO UPDATE SET
1663            strength = excluded.strength,
1664            source_context = COALESCE(excluded.source_context, crossrefs.source_context),
1665            pinned = excluded.pinned",
1666        params![
1667            input.from_id,
1668            input.to_id,
1669            input.edge_type.as_str(),
1670            strength,
1671            input.source_context,
1672            input.pinned,
1673            now,
1674            now,
1675        ],
1676    )?;
1677
1678    get_crossref(conn, input.from_id, input.to_id, input.edge_type)
1679}
1680
1681/// Get a cross-reference
1682pub fn get_crossref(
1683    conn: &Connection,
1684    from_id: i64,
1685    to_id: i64,
1686    edge_type: EdgeType,
1687) -> Result<CrossReference> {
1688    let mut stmt = conn.prepare_cached(
1689        "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1690                source_context, created_at, valid_from, valid_to, pinned, metadata
1691         FROM crossrefs
1692         WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1693    )?;
1694
1695    let crossref = stmt.query_row(params![from_id, to_id, edge_type.as_str()], |row| {
1696        let edge_type_str: String = row.get("edge_type")?;
1697        let source_str: String = row.get("source")?;
1698        let created_at_str: String = row.get("created_at")?;
1699        let valid_from_str: String = row.get("valid_from")?;
1700        let valid_to_str: Option<String> = row.get("valid_to")?;
1701        let metadata_str: String = row.get("metadata")?;
1702
1703        Ok(CrossReference {
1704            from_id: row.get("from_id")?,
1705            to_id: row.get("to_id")?,
1706            edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1707            score: row.get("score")?,
1708            confidence: row.get("confidence")?,
1709            strength: row.get("strength")?,
1710            source: match source_str.as_str() {
1711                "manual" => RelationSource::Manual,
1712                "llm" => RelationSource::Llm,
1713                _ => RelationSource::Auto,
1714            },
1715            source_context: row.get("source_context")?,
1716            created_at: DateTime::parse_from_rfc3339(&created_at_str)
1717                .map(|dt| dt.with_timezone(&Utc))
1718                .unwrap_or_else(|_| Utc::now()),
1719            valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1720                .map(|dt| dt.with_timezone(&Utc))
1721                .unwrap_or_else(|_| Utc::now()),
1722            valid_to: valid_to_str.and_then(|s| {
1723                DateTime::parse_from_rfc3339(&s)
1724                    .map(|dt| dt.with_timezone(&Utc))
1725                    .ok()
1726            }),
1727            pinned: row.get::<_, i32>("pinned")? != 0,
1728            metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1729        })
1730    })?;
1731
1732    Ok(crossref)
1733}
1734
1735/// Get all cross-references for a memory
1736pub fn get_related(conn: &Connection, memory_id: i64) -> Result<Vec<CrossReference>> {
1737    let mut stmt = conn.prepare_cached(
1738        "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1739                source_context, created_at, valid_from, valid_to, pinned, metadata
1740         FROM crossrefs
1741         WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL
1742         ORDER BY score DESC",
1743    )?;
1744
1745    let crossrefs: Vec<CrossReference> = stmt
1746        .query_map(params![memory_id, memory_id], |row| {
1747            let edge_type_str: String = row.get("edge_type")?;
1748            let source_str: String = row.get("source")?;
1749            let created_at_str: String = row.get("created_at")?;
1750            let valid_from_str: String = row.get("valid_from")?;
1751            let valid_to_str: Option<String> = row.get("valid_to")?;
1752            let metadata_str: String = row.get("metadata")?;
1753
1754            Ok(CrossReference {
1755                from_id: row.get("from_id")?,
1756                to_id: row.get("to_id")?,
1757                edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1758                score: row.get("score")?,
1759                confidence: row.get("confidence")?,
1760                strength: row.get("strength")?,
1761                source: match source_str.as_str() {
1762                    "manual" => RelationSource::Manual,
1763                    "llm" => RelationSource::Llm,
1764                    _ => RelationSource::Auto,
1765                },
1766                source_context: row.get("source_context")?,
1767                created_at: DateTime::parse_from_rfc3339(&created_at_str)
1768                    .map(|dt| dt.with_timezone(&Utc))
1769                    .unwrap_or_else(|_| Utc::now()),
1770                valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1771                    .map(|dt| dt.with_timezone(&Utc))
1772                    .unwrap_or_else(|_| Utc::now()),
1773                valid_to: valid_to_str.and_then(|s| {
1774                    DateTime::parse_from_rfc3339(&s)
1775                        .map(|dt| dt.with_timezone(&Utc))
1776                        .ok()
1777                }),
1778                pinned: row.get::<_, i32>("pinned")? != 0,
1779                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1780            })
1781        })?
1782        .filter_map(|r| r.ok())
1783        .collect();
1784
1785    Ok(crossrefs)
1786}
1787
1788/// Delete a cross-reference (soft delete)
1789pub fn delete_crossref(
1790    conn: &Connection,
1791    from_id: i64,
1792    to_id: i64,
1793    edge_type: EdgeType,
1794) -> Result<()> {
1795    let now = Utc::now().to_rfc3339();
1796
1797    let affected = conn.execute(
1798        "UPDATE crossrefs SET valid_to = ?
1799         WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1800        params![now, from_id, to_id, edge_type.as_str()],
1801    )?;
1802
1803    if affected == 0 {
1804        return Err(EngramError::NotFound(from_id));
1805    }
1806
1807    Ok(())
1808}
1809
1810/// Set expiration on an existing memory
1811///
1812/// # Arguments
1813/// * `conn` - Database connection
1814/// * `id` - Memory ID
1815/// * `ttl_seconds` - Time-to-live in seconds (0 = remove expiration, None = no change)
1816pub fn set_memory_expiration(
1817    conn: &Connection,
1818    id: i64,
1819    ttl_seconds: Option<i64>,
1820) -> Result<Memory> {
1821    // Verify memory exists and is not expired
1822    let _ = get_memory_internal(conn, id, false)?;
1823
1824    match ttl_seconds {
1825        Some(0) => {
1826            // Remove expiration
1827            conn.execute(
1828                "UPDATE memories SET expires_at = NULL, updated_at = ? WHERE id = ?",
1829                params![Utc::now().to_rfc3339(), id],
1830            )?;
1831        }
1832        Some(ttl) => {
1833            // Set new expiration
1834            let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
1835            conn.execute(
1836                "UPDATE memories SET expires_at = ?, updated_at = ? WHERE id = ?",
1837                params![expires_at, Utc::now().to_rfc3339(), id],
1838            )?;
1839        }
1840        None => {
1841            // No change - don't record event or update sync state
1842            return get_memory_internal(conn, id, false);
1843        }
1844    }
1845
1846    // Record event for sync delta tracking
1847    record_event(
1848        conn,
1849        MemoryEventType::Updated,
1850        Some(id),
1851        None,
1852        serde_json::json!({
1853            "changed_fields": ["expires_at"],
1854            "action": "set_expiration",
1855        }),
1856    )?;
1857
1858    // Update sync state (version now tracks event count for delta sync)
1859    conn.execute(
1860        "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1861        [],
1862    )?;
1863
1864    get_memory_internal(conn, id, false)
1865}
1866
1867/// Delete all expired memories (cleanup job)
1868///
1869/// Returns the number of memories deleted
1870pub fn cleanup_expired_memories(conn: &Connection) -> Result<i64> {
1871    let now = Utc::now().to_rfc3339();
1872
1873    // Soft delete expired memories by setting valid_to
1874    let affected = conn.execute(
1875        "UPDATE memories SET valid_to = ?
1876         WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1877        params![now, now],
1878    )?;
1879
1880    if affected > 0 {
1881        // Also invalidate cross-references involving expired memories
1882        conn.execute(
1883            "UPDATE crossrefs SET valid_to = ?
1884             WHERE valid_to IS NULL AND (
1885                 from_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1886                 OR to_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1887             )",
1888            params![now, now, now],
1889        )?;
1890
1891        // Remove memory_entities links for expired memories
1892        // This ensures expired memories don't appear in entity-based queries
1893        conn.execute(
1894            "DELETE FROM memory_entities
1895             WHERE memory_id IN (
1896                 SELECT id FROM memories
1897                 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1898             )",
1899            params![now],
1900        )?;
1901
1902        // Remove memory_tags links for expired memories
1903        conn.execute(
1904            "DELETE FROM memory_tags
1905             WHERE memory_id IN (
1906                 SELECT id FROM memories
1907                 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1908             )",
1909            params![now],
1910        )?;
1911
1912        // Record batch event for sync delta tracking
1913        record_event(
1914            conn,
1915            MemoryEventType::Deleted,
1916            None, // Batch operation
1917            None,
1918            serde_json::json!({
1919                "action": "cleanup_expired",
1920                "affected_count": affected,
1921            }),
1922        )?;
1923
1924        // Update sync state (version now tracks event count for delta sync)
1925        conn.execute(
1926            "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1927            params![affected as i64],
1928        )?;
1929    }
1930
1931    Ok(affected as i64)
1932}
1933
1934/// Get count of expired memories (for monitoring)
1935pub fn count_expired_memories(conn: &Connection) -> Result<i64> {
1936    let now = Utc::now().to_rfc3339();
1937
1938    let count: i64 = conn.query_row(
1939        "SELECT COUNT(*) FROM memories
1940         WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1941        params![now],
1942        |row| row.get(0),
1943    )?;
1944
1945    Ok(count)
1946}
1947
/// A per-workspace retention policy.
///
/// Mirrors one row of the `retention_policies` table as read by
/// `get_retention_policy` / `list_retention_policies`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Row id in `retention_policies`.
    pub id: i64,
    /// Workspace the policy belongs to; it is the conflict key of the upsert.
    pub workspace: String,
    /// Maximum memory age in days.
    /// NOTE(review): not consulted by the enforcement code visible alongside this
    /// struct - confirm where (or whether) it is applied.
    pub max_age_days: Option<i64>,
    /// Cap on active memories in the workspace; the excess is archived when
    /// policies are applied.
    pub max_memories: Option<i64>,
    /// Age in days after which memories become candidates for compression.
    pub compress_after_days: Option<i64>,
    /// Highest importance a memory may have and still be compressed
    /// (falls back to 0.3 when the column cannot be read).
    pub compress_max_importance: f32,
    /// Memories accessed fewer times than this are compression candidates
    /// (falls back to 3 when the column cannot be read).
    pub compress_min_access: i32,
    /// Archived memories created more than this many days ago are soft-deleted
    /// when policies are applied.
    pub auto_delete_after_days: Option<i64>,
    /// Type names persisted as one comma-separated column.
    /// NOTE(review): presumably memory types exempt from the policy - the
    /// enforcement code visible here does not read this field; confirm.
    pub exclude_types: Vec<String>,
    /// Creation timestamp exactly as stored (written as RFC 3339 by the upsert).
    pub created_at: String,
    /// Last-modification timestamp exactly as stored (written as RFC 3339 by the upsert).
    pub updated_at: String,
}
1963
1964/// Get retention policy for a workspace.
1965pub fn get_retention_policy(conn: &Connection, workspace: &str) -> Result<Option<RetentionPolicy>> {
1966    conn.query_row(
1967        "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1968                compress_max_importance, compress_min_access, auto_delete_after_days,
1969                exclude_types, created_at, updated_at
1970         FROM retention_policies WHERE workspace = ?",
1971        params![workspace],
1972        |row| {
1973            let exclude_str: Option<String> = row.get(8)?;
1974            Ok(RetentionPolicy {
1975                id: row.get(0)?,
1976                workspace: row.get(1)?,
1977                max_age_days: row.get(2)?,
1978                max_memories: row.get(3)?,
1979                compress_after_days: row.get(4)?,
1980                compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
1981                compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
1982                auto_delete_after_days: row.get(7)?,
1983                exclude_types: exclude_str
1984                    .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
1985                    .unwrap_or_default(),
1986                created_at: row.get(9)?,
1987                updated_at: row.get(10)?,
1988            })
1989        },
1990    )
1991    .optional()
1992    .map_err(EngramError::from)
1993}
1994
1995/// List all retention policies.
1996pub fn list_retention_policies(conn: &Connection) -> Result<Vec<RetentionPolicy>> {
1997    let mut stmt = conn.prepare(
1998        "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1999                compress_max_importance, compress_min_access, auto_delete_after_days,
2000                exclude_types, created_at, updated_at
2001         FROM retention_policies ORDER BY workspace",
2002    )?;
2003
2004    let policies = stmt
2005        .query_map([], |row| {
2006            let exclude_str: Option<String> = row.get(8)?;
2007            Ok(RetentionPolicy {
2008                id: row.get(0)?,
2009                workspace: row.get(1)?,
2010                max_age_days: row.get(2)?,
2011                max_memories: row.get(3)?,
2012                compress_after_days: row.get(4)?,
2013                compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
2014                compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
2015                auto_delete_after_days: row.get(7)?,
2016                exclude_types: exclude_str
2017                    .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
2018                    .unwrap_or_default(),
2019                created_at: row.get(9)?,
2020                updated_at: row.get(10)?,
2021            })
2022        })?
2023        .filter_map(|r| r.ok())
2024        .collect();
2025
2026    Ok(policies)
2027}
2028
2029/// Upsert a retention policy for a workspace.
2030pub fn set_retention_policy(
2031    conn: &Connection,
2032    workspace: &str,
2033    max_age_days: Option<i64>,
2034    max_memories: Option<i64>,
2035    compress_after_days: Option<i64>,
2036    compress_max_importance: Option<f32>,
2037    compress_min_access: Option<i32>,
2038    auto_delete_after_days: Option<i64>,
2039    exclude_types: Option<Vec<String>>,
2040) -> Result<RetentionPolicy> {
2041    let now = Utc::now().to_rfc3339();
2042    let exclude_str = exclude_types.map(|v| v.join(","));
2043
2044    conn.execute(
2045        "INSERT INTO retention_policies (workspace, max_age_days, max_memories, compress_after_days,
2046            compress_max_importance, compress_min_access, auto_delete_after_days, exclude_types,
2047            created_at, updated_at)
2048         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)
2049         ON CONFLICT(workspace) DO UPDATE SET
2050            max_age_days = COALESCE(?2, max_age_days),
2051            max_memories = COALESCE(?3, max_memories),
2052            compress_after_days = COALESCE(?4, compress_after_days),
2053            compress_max_importance = COALESCE(?5, compress_max_importance),
2054            compress_min_access = COALESCE(?6, compress_min_access),
2055            auto_delete_after_days = COALESCE(?7, auto_delete_after_days),
2056            exclude_types = COALESCE(?8, exclude_types),
2057            updated_at = ?9",
2058        params![
2059            workspace,
2060            max_age_days,
2061            max_memories,
2062            compress_after_days,
2063            compress_max_importance.unwrap_or(0.3),
2064            compress_min_access.unwrap_or(3),
2065            auto_delete_after_days,
2066            exclude_str,
2067            now,
2068        ],
2069    )?;
2070
2071    get_retention_policy(conn, workspace)?.ok_or_else(|| EngramError::NotFound(0))
2072}
2073
2074/// Delete a retention policy for a workspace.
2075pub fn delete_retention_policy(conn: &Connection, workspace: &str) -> Result<bool> {
2076    let affected = conn.execute(
2077        "DELETE FROM retention_policies WHERE workspace = ?",
2078        params![workspace],
2079    )?;
2080    Ok(affected > 0)
2081}
2082
2083/// Apply all retention policies. Returns total memories affected across all workspaces.
2084pub fn apply_retention_policies(conn: &Connection) -> Result<i64> {
2085    let policies = list_retention_policies(conn)?;
2086    let mut total_affected = 0i64;
2087
2088    for policy in &policies {
2089        // 1. Auto-compress based on compress_after_days
2090        if let Some(compress_days) = policy.compress_after_days {
2091            let compressed = compress_old_memories(
2092                conn,
2093                compress_days,
2094                policy.compress_max_importance,
2095                policy.compress_min_access,
2096                100,
2097            )?;
2098            total_affected += compressed;
2099        }
2100
2101        // 2. Enforce max_memories limit per workspace
2102        if let Some(max_mem) = policy.max_memories {
2103            let count: i64 = conn
2104                .query_row(
2105                    "SELECT COUNT(*) FROM memories WHERE workspace = ? AND valid_to IS NULL
2106                     AND COALESCE(lifecycle_state, 'active') = 'active'",
2107                    params![policy.workspace],
2108                    |row| row.get(0),
2109                )
2110                .unwrap_or(0);
2111
2112            if count > max_mem {
2113                // Archive excess (oldest, lowest importance first)
2114                let excess = count - max_mem;
2115                let archived = conn.execute(
2116                    "UPDATE memories SET lifecycle_state = 'archived'
2117                     WHERE id IN (
2118                        SELECT id FROM memories
2119                        WHERE workspace = ? AND valid_to IS NULL
2120                          AND COALESCE(lifecycle_state, 'active') = 'active'
2121                          AND memory_type NOT IN ('summary', 'checkpoint')
2122                        ORDER BY importance ASC, access_count ASC, created_at ASC
2123                        LIMIT ?
2124                     )",
2125                    params![policy.workspace, excess],
2126                )?;
2127                total_affected += archived as i64;
2128            }
2129        }
2130
2131        // 3. Auto-delete very old archived memories
2132        if let Some(delete_days) = policy.auto_delete_after_days {
2133            let cutoff = (Utc::now() - chrono::Duration::days(delete_days)).to_rfc3339();
2134            let now = Utc::now().to_rfc3339();
2135            let deleted = conn.execute(
2136                "UPDATE memories SET valid_to = ?
2137                 WHERE workspace = ? AND valid_to IS NULL
2138                   AND lifecycle_state = 'archived'
2139                   AND created_at < ?",
2140                params![now, policy.workspace, cutoff],
2141            )?;
2142            total_affected += deleted as i64;
2143        }
2144    }
2145
2146    Ok(total_affected)
2147}
2148
2149/// Auto-compress old, rarely-accessed memories by creating summaries and archiving originals.
2150/// Returns the number of memories archived.
2151pub fn compress_old_memories(
2152    conn: &Connection,
2153    max_age_days: i64,
2154    max_importance: f32,
2155    min_access_count: i32,
2156    batch_limit: usize,
2157) -> Result<i64> {
2158    let cutoff = (Utc::now() - chrono::Duration::days(max_age_days)).to_rfc3339();
2159    let now = Utc::now().to_rfc3339();
2160
2161    // Find candidates: old, low-importance, rarely-accessed, not already archived/summary
2162    let mut stmt = conn.prepare(
2163        "SELECT id, content, memory_type, importance, tags, workspace
2164         FROM (
2165            SELECT m.id, m.content, m.memory_type, m.importance, m.access_count, m.workspace,
2166                   COALESCE(m.lifecycle_state, 'active') as lifecycle_state,
2167                   (SELECT GROUP_CONCAT(t.name, ',') FROM memory_tags mt JOIN tags t ON mt.tag_id = t.id WHERE mt.memory_id = m.id) as tags
2168            FROM memories m
2169            WHERE m.valid_to IS NULL
2170              AND (m.expires_at IS NULL OR m.expires_at > ?1)
2171              AND m.created_at < ?2
2172              AND m.importance <= ?3
2173              AND m.access_count < ?4
2174              AND m.memory_type NOT IN ('summary', 'checkpoint')
2175              AND COALESCE(m.lifecycle_state, 'active') = 'active'
2176            ORDER BY m.created_at ASC
2177            LIMIT ?5
2178         )",
2179    )?;
2180
2181    let candidates: Vec<(i64, String, String, f32, Option<String>, String)> = stmt
2182        .query_map(
2183            params![
2184                now,
2185                cutoff,
2186                max_importance,
2187                min_access_count,
2188                batch_limit as i64
2189            ],
2190            |row| {
2191                Ok((
2192                    row.get(0)?,
2193                    row.get(1)?,
2194                    row.get(2)?,
2195                    row.get(3)?,
2196                    row.get(4)?,
2197                    row.get::<_, String>(5)
2198                        .unwrap_or_else(|_| "default".to_string()),
2199                ))
2200            },
2201        )?
2202        .filter_map(|r| r.ok())
2203        .collect();
2204
2205    let mut archived = 0i64;
2206
2207    for (id, content, memory_type, importance, tags_csv, workspace) in &candidates {
2208        // Create compressed summary
2209        let summary_text = if content.len() > 200 {
2210            let head: String = content.chars().take(120).collect();
2211            let tail: String = content
2212                .chars()
2213                .rev()
2214                .take(60)
2215                .collect::<String>()
2216                .chars()
2217                .rev()
2218                .collect();
2219            format!("{}...{}", head, tail)
2220        } else {
2221            content.clone()
2222        };
2223
2224        let tags: Vec<String> = tags_csv
2225            .as_deref()
2226            .unwrap_or("")
2227            .split(',')
2228            .filter(|s| !s.is_empty())
2229            .map(|s| s.to_string())
2230            .collect();
2231
2232        let input = CreateMemoryInput {
2233            content: format!("[Archived {}] {}", memory_type, summary_text),
2234            memory_type: MemoryType::Summary,
2235            importance: Some(*importance),
2236            tags,
2237            workspace: Some(workspace.clone()),
2238            tier: MemoryTier::Permanent,
2239            summary_of_id: Some(*id),
2240            ..Default::default()
2241        };
2242
2243        if create_memory(conn, &input).is_ok()
2244            && conn
2245                .execute(
2246                    "UPDATE memories SET lifecycle_state = 'archived' WHERE id = ? AND valid_to IS NULL",
2247                    params![id],
2248                )
2249                .is_ok()
2250        {
2251            archived += 1;
2252        }
2253    }
2254
2255    Ok(archived)
2256}
2257
/// A compact memory representation for efficient list views.
/// Contains only essential fields and a truncated content preview.
///
/// Produced by `list_memories_compact`; only `Serialize` is derived, so this is
/// an output-only type.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactMemoryRow {
    /// Memory ID
    pub id: i64,
    /// Content preview (first line or N chars), as returned by `compact_preview`
    pub preview: String,
    /// Whether content was truncated, as reported by `compact_preview`
    pub truncated: bool,
    /// Memory type (unrecognised stored values are read as `MemoryType::Note`)
    pub memory_type: MemoryType,
    /// Tags, loaded per row after the main query; empty if loading them fails
    pub tags: Vec<String>,
    /// Importance score
    pub importance: f32,
    /// Creation timestamp (falls back to the time of the read if the stored value is not RFC 3339)
    pub created_at: DateTime<Utc>,
    /// Last update timestamp (falls back to the time of the read if the stored value is not RFC 3339)
    pub updated_at: DateTime<Utc>,
    /// Workspace name
    pub workspace: String,
    /// Memory tier (unrecognised stored values are read as the default tier)
    pub tier: MemoryTier,
    /// Original content length in chars
    pub content_length: usize,
    /// Number of lines in original content
    pub line_count: usize,
}
2287
2288/// List memories in compact format with preview only.
2289///
2290/// This is more efficient than `list_memories` when you don't need full content,
2291/// such as for browsing/listing UIs.
2292///
2293/// # Arguments
2294/// * `conn` - Database connection
2295/// * `options` - List filtering/pagination options
2296/// * `preview_chars` - Max chars for preview (default: 100)
2297pub fn list_memories_compact(
2298    conn: &Connection,
2299    options: &ListOptions,
2300    preview_chars: Option<usize>,
2301) -> Result<Vec<CompactMemoryRow>> {
2302    use crate::intelligence::compact_preview;
2303
2304    let now = Utc::now().to_rfc3339();
2305    let max_preview = preview_chars.unwrap_or(100);
2306
2307    let mut sql = String::from(
2308        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance,
2309                m.created_at, m.updated_at, m.workspace, m.tier
2310         FROM memories m",
2311    );
2312
2313    let mut conditions = vec!["m.valid_to IS NULL".to_string()];
2314    let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
2315
2316    // Exclude expired memories
2317    conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
2318    params.push(Box::new(now));
2319
2320    // Tag filter (requires join)
2321    if let Some(ref tags) = options.tags {
2322        if !tags.is_empty() {
2323            sql.push_str(
2324                " JOIN memory_tags mt ON m.id = mt.memory_id
2325                  JOIN tags t ON mt.tag_id = t.id",
2326            );
2327            let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
2328            conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
2329            for tag in tags {
2330                params.push(Box::new(tag.clone()));
2331            }
2332        }
2333    }
2334
2335    // Type filter
2336    if let Some(ref memory_type) = options.memory_type {
2337        conditions.push("m.memory_type = ?".to_string());
2338        params.push(Box::new(memory_type.as_str().to_string()));
2339    }
2340
2341    // Metadata filter (JSON)
2342    if let Some(ref metadata_filter) = options.metadata_filter {
2343        for (key, value) in metadata_filter {
2344            metadata_value_to_param(key, value, &mut conditions, &mut params)?;
2345        }
2346    }
2347
2348    // Scope filter
2349    if let Some(ref scope) = options.scope {
2350        conditions.push("m.scope_type = ?".to_string());
2351        params.push(Box::new(scope.scope_type().to_string()));
2352        if let Some(scope_id) = scope.scope_id() {
2353            conditions.push("m.scope_id = ?".to_string());
2354            params.push(Box::new(scope_id.to_string()));
2355        } else {
2356            conditions.push("m.scope_id IS NULL".to_string());
2357        }
2358    }
2359
2360    // Workspace filter
2361    if let Some(ref workspace) = options.workspace {
2362        conditions.push("m.workspace = ?".to_string());
2363        params.push(Box::new(workspace.clone()));
2364    }
2365
2366    // Tier filter
2367    if let Some(ref tier) = options.tier {
2368        conditions.push("m.tier = ?".to_string());
2369        params.push(Box::new(tier.as_str().to_string()));
2370    }
2371
2372    sql.push_str(" WHERE ");
2373    sql.push_str(&conditions.join(" AND "));
2374
2375    // Sorting
2376    let sort_field = match options.sort_by.unwrap_or_default() {
2377        SortField::CreatedAt => "m.created_at",
2378        SortField::UpdatedAt => "m.updated_at",
2379        SortField::LastAccessedAt => "m.last_accessed_at",
2380        SortField::Importance => "m.importance",
2381        SortField::AccessCount => "m.access_count",
2382    };
2383    let sort_order = match options.sort_order.unwrap_or_default() {
2384        SortOrder::Asc => "ASC",
2385        SortOrder::Desc => "DESC",
2386    };
2387    sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
2388
2389    // Pagination
2390    let limit = options.limit.unwrap_or(100);
2391    let offset = options.offset.unwrap_or(0);
2392    sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
2393
2394    let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
2395    let mut stmt = conn.prepare(&sql)?;
2396
2397    let memories: Vec<CompactMemoryRow> = stmt
2398        .query_map(param_refs.as_slice(), |row| {
2399            let id: i64 = row.get("id")?;
2400            let content: String = row.get("content")?;
2401            let memory_type_str: String = row.get("memory_type")?;
2402            let importance: f32 = row.get("importance")?;
2403            let created_at_str: String = row.get("created_at")?;
2404            let updated_at_str: String = row.get("updated_at")?;
2405            let workspace: String = row.get("workspace")?;
2406            let tier_str: String = row.get("tier")?;
2407
2408            let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
2409            let tier = tier_str.parse().unwrap_or_default();
2410
2411            // Generate compact preview
2412            let (preview, truncated) = compact_preview(&content, max_preview);
2413            let content_length = content.len();
2414            let line_count = content.lines().count();
2415
2416            Ok(CompactMemoryRow {
2417                id,
2418                preview,
2419                truncated,
2420                memory_type,
2421                tags: vec![], // Will be loaded separately
2422                importance,
2423                created_at: DateTime::parse_from_rfc3339(&created_at_str)
2424                    .map(|dt| dt.with_timezone(&Utc))
2425                    .unwrap_or_else(|_| Utc::now()),
2426                updated_at: DateTime::parse_from_rfc3339(&updated_at_str)
2427                    .map(|dt| dt.with_timezone(&Utc))
2428                    .unwrap_or_else(|_| Utc::now()),
2429                workspace,
2430                tier,
2431                content_length,
2432                line_count,
2433            })
2434        })?
2435        .filter_map(|r| r.ok())
2436        .map(|mut m| {
2437            m.tags = load_tags(conn, m.id).unwrap_or_default();
2438            m
2439        })
2440        .collect();
2441
2442    Ok(memories)
2443}
2444
2445/// Get storage statistics
2446pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
2447    let total_memories: i64 = conn.query_row(
2448        "SELECT COUNT(*) FROM memories WHERE valid_to IS NULL",
2449        [],
2450        |row| row.get(0),
2451    )?;
2452
2453    let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2454
2455    let total_crossrefs: i64 = conn.query_row(
2456        "SELECT COUNT(*) FROM crossrefs WHERE valid_to IS NULL",
2457        [],
2458        |row| row.get(0),
2459    )?;
2460
2461    let total_versions: i64 =
2462        conn.query_row("SELECT COUNT(*) FROM memory_versions", [], |row| row.get(0))?;
2463
2464    let _total_identities: i64 =
2465        conn.query_row("SELECT COUNT(*) FROM identities", [], |row| row.get(0))?;
2466
2467    let _total_entities: i64 =
2468        conn.query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))?;
2469
2470    let db_size_bytes: i64 = conn.query_row(
2471        "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
2472        [],
2473        |row| row.get(0),
2474    )?;
2475
2476    let _schema_version: i32 = conn
2477        .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
2478            row.get(0)
2479        })
2480        .unwrap_or(0);
2481
2482    let mut workspace_stmt = conn.prepare(
2483        "SELECT workspace, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY workspace",
2484    )?;
2485    let workspaces: HashMap<String, i64> = workspace_stmt
2486        .query_map([], |row| {
2487            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2488        })?
2489        .filter_map(|r| r.ok())
2490        .collect();
2491
2492    let mut type_stmt = conn.prepare(
2493        "SELECT memory_type, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY memory_type",
2494    )?;
2495    let type_counts: HashMap<String, i64> = type_stmt
2496        .query_map([], |row| {
2497            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2498        })?
2499        .filter_map(|r| r.ok())
2500        .collect();
2501
2502    let mut tier_stmt = conn.prepare(
2503        "SELECT COALESCE(tier, 'permanent'), COUNT(*) FROM memories GROUP BY COALESCE(tier, 'permanent')",
2504    )?;
2505    let tier_counts: HashMap<String, i64> = tier_stmt
2506        .query_map([], |row| {
2507            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2508        })?
2509        .filter_map(|r| r.ok())
2510        .collect();
2511
2512    let memories_with_embeddings: i64 = conn.query_row(
2513        "SELECT COUNT(*) FROM memories WHERE has_embedding = 1 AND valid_to IS NULL",
2514        [],
2515        |row| row.get(0),
2516    )?;
2517
2518    let memories_pending_embedding: i64 = conn.query_row(
2519        "SELECT COUNT(*) FROM embedding_queue WHERE status = 'pending'",
2520        [],
2521        |row| row.get(0),
2522    )?;
2523
2524    let (last_sync, sync_pending): (Option<String>, i64) = conn.query_row(
2525        "SELECT last_sync, pending_changes FROM sync_state WHERE id = 1",
2526        [],
2527        |row| Ok((row.get(0)?, row.get(1)?)),
2528    )?;
2529
2530    Ok(StorageStats {
2531        total_memories,
2532        total_tags,
2533        total_crossrefs,
2534        total_versions,
2535        total_identities: 0,
2536        total_entities: 0,
2537        db_size_bytes,
2538        memories_with_embeddings,
2539        memories_pending_embedding,
2540        last_sync: last_sync.and_then(|s| {
2541            DateTime::parse_from_rfc3339(&s)
2542                .map(|dt| dt.with_timezone(&Utc))
2543                .ok()
2544        }),
2545        sync_pending: sync_pending > 0,
2546        storage_mode: "sqlite".to_string(),
2547        schema_version: 0,
2548        workspaces,
2549        type_counts,
2550        tier_counts,
2551    })
2552}
2553
2554/// Get memory versions
2555pub fn get_memory_versions(conn: &Connection, memory_id: i64) -> Result<Vec<MemoryVersion>> {
2556    let mut stmt = conn.prepare_cached(
2557        "SELECT version, content, tags, metadata, created_at, created_by, change_summary
2558         FROM memory_versions WHERE memory_id = ? ORDER BY version DESC",
2559    )?;
2560
2561    let versions: Vec<MemoryVersion> = stmt
2562        .query_map([memory_id], |row| {
2563            let tags_str: String = row.get("tags")?;
2564            let metadata_str: String = row.get("metadata")?;
2565            let created_at_str: String = row.get("created_at")?;
2566
2567            Ok(MemoryVersion {
2568                version: row.get("version")?,
2569                content: row.get("content")?,
2570                tags: serde_json::from_str(&tags_str).unwrap_or_default(),
2571                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
2572                created_at: DateTime::parse_from_rfc3339(&created_at_str)
2573                    .map(|dt| dt.with_timezone(&Utc))
2574                    .unwrap_or_else(|_| Utc::now()),
2575                created_by: row.get("created_by")?,
2576                change_summary: row.get("change_summary")?,
2577            })
2578        })?
2579        .filter_map(|r| r.ok())
2580        .collect();
2581
2582    Ok(versions)
2583}
2584
2585// ============================================================================
2586// Batch Operations
2587// ============================================================================
2588
/// Result of a batch create operation
///
/// Produced by `create_memory_batch`; successes and failures are reported
/// side by side rather than aborting on the first error.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchCreateResult {
    /// Memories that were created successfully.
    pub created: Vec<Memory>,
    /// One entry per input that could not be created.
    pub failed: Vec<BatchError>,
    /// Number of entries in `created`.
    pub total_created: usize,
    /// Number of entries in `failed`.
    pub total_failed: usize,
}
2597
/// Result of a batch delete operation
///
/// Produced by `delete_memory_batch`; successes and failures are reported
/// side by side rather than aborting on the first error.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchDeleteResult {
    /// Ids of the memories that were deleted successfully.
    pub deleted: Vec<i64>,
    /// One entry per id that could not be deleted.
    pub failed: Vec<BatchError>,
    /// Number of entries in `deleted`.
    pub total_deleted: usize,
    /// Number of entries in `failed`.
    pub total_failed: usize,
}
2606
/// Error information for batch operations
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchError {
    /// Zero-based position of the failing item in the batch input slice.
    pub index: usize,
    /// Id of the affected memory when known: set by batch deletes, `None`
    /// for batch creates.
    pub id: Option<i64>,
    /// Display text of the underlying error.
    pub error: String,
}
2614
2615/// Create multiple memories in a single transaction
2616pub fn create_memory_batch(
2617    conn: &Connection,
2618    inputs: &[CreateMemoryInput],
2619) -> Result<BatchCreateResult> {
2620    let mut created = Vec::new();
2621    let mut failed = Vec::new();
2622
2623    for (index, input) in inputs.iter().enumerate() {
2624        match create_memory(conn, input) {
2625            Ok(memory) => created.push(memory),
2626            Err(e) => failed.push(BatchError {
2627                index,
2628                id: None,
2629                error: e.to_string(),
2630            }),
2631        }
2632    }
2633
2634    Ok(BatchCreateResult {
2635        total_created: created.len(),
2636        total_failed: failed.len(),
2637        created,
2638        failed,
2639    })
2640}
2641
2642/// Delete multiple memories in a single transaction
2643pub fn delete_memory_batch(conn: &Connection, ids: &[i64]) -> Result<BatchDeleteResult> {
2644    let mut deleted = Vec::new();
2645    let mut failed = Vec::new();
2646
2647    for (index, &id) in ids.iter().enumerate() {
2648        match delete_memory(conn, id) {
2649            Ok(()) => deleted.push(id),
2650            Err(e) => failed.push(BatchError {
2651                index,
2652                id: Some(id),
2653                error: e.to_string(),
2654            }),
2655        }
2656    }
2657
2658    Ok(BatchDeleteResult {
2659        total_deleted: deleted.len(),
2660        total_failed: failed.len(),
2661        deleted,
2662        failed,
2663    })
2664}
2665
2666// ============================================================================
2667// Tag Utilities
2668// ============================================================================
2669
/// Tag with usage count
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagInfo {
    /// Tag name as stored in the `tags` table.
    pub name: String,
    /// Number of memory assignments counted for this tag by `list_tags`.
    pub count: i64,
    /// Most recent `updated_at` among the tag's memories, or `None` when
    /// there is none or it could not be parsed as RFC 3339.
    pub last_used: Option<DateTime<Utc>>,
}
2677
2678/// Get all tags with their usage counts
2679pub fn list_tags(conn: &Connection) -> Result<Vec<TagInfo>> {
2680    let mut stmt = conn.prepare(
2681        r#"
2682        SELECT t.name, COUNT(mt.memory_id) as count,
2683               MAX(m.updated_at) as last_used
2684        FROM tags t
2685        LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2686        LEFT JOIN memories m ON mt.memory_id = m.id AND m.valid_to IS NULL
2687        GROUP BY t.id, t.name
2688        ORDER BY count DESC, t.name ASC
2689        "#,
2690    )?;
2691
2692    let tags: Vec<TagInfo> = stmt
2693        .query_map([], |row| {
2694            let name: String = row.get(0)?;
2695            let count: i64 = row.get(1)?;
2696            let last_used: Option<String> = row.get(2)?;
2697
2698            Ok(TagInfo {
2699                name,
2700                count,
2701                last_used: last_used.and_then(|s| {
2702                    DateTime::parse_from_rfc3339(&s)
2703                        .map(|dt| dt.with_timezone(&Utc))
2704                        .ok()
2705                }),
2706            })
2707        })?
2708        .filter_map(|r| r.ok())
2709        .collect();
2710
2711    Ok(tags)
2712}
2713
/// Tag hierarchy node
///
/// One node of the tree derived from slash-separated tag names.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagHierarchyNode {
    /// Name of this path segment (for a root node, the part of the tag
    /// before the first `/`).
    pub name: String,
    /// Slash-separated path from the root down to this node.
    pub full_path: String,
    /// Usage count accumulated from the tags at or below this node.
    pub count: i64,
    /// Nodes nested directly beneath this one.
    pub children: Vec<TagHierarchyNode>,
}
2722
2723/// Build tag hierarchy from slash-separated tags (e.g., "project/engram/core")
2724pub fn get_tag_hierarchy(conn: &Connection) -> Result<Vec<TagHierarchyNode>> {
2725    let tags = list_tags(conn)?;
2726
2727    // Build hierarchy from slash-separated paths
2728    let mut root_nodes: HashMap<String, TagHierarchyNode> = HashMap::new();
2729
2730    for tag in tags {
2731        let parts: Vec<&str> = tag.name.split('/').collect();
2732        if parts.is_empty() {
2733            continue;
2734        }
2735
2736        let root_name = parts[0].to_string();
2737        if !root_nodes.contains_key(&root_name) {
2738            root_nodes.insert(
2739                root_name.clone(),
2740                TagHierarchyNode {
2741                    name: root_name.clone(),
2742                    full_path: root_name.clone(),
2743                    count: 0,
2744                    children: Vec::new(),
2745                },
2746            );
2747        }
2748
2749        // Add count to appropriate level
2750        if parts.len() == 1 {
2751            if let Some(node) = root_nodes.get_mut(&root_name) {
2752                node.count += tag.count;
2753            }
2754        } else {
2755            // For nested tags, we'd need recursive building
2756            // For now, just add to root count
2757            if let Some(node) = root_nodes.get_mut(&root_name) {
2758                node.count += tag.count;
2759            }
2760        }
2761    }
2762
2763    Ok(root_nodes.into_values().collect())
2764}
2765
/// Tag validation result
///
/// Report produced by `validate_tags`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagValidationResult {
    /// `true` when the validation found no problems.
    pub valid: bool,
    /// Names of tags that are not assigned to any memory.
    pub orphaned_tags: Vec<String>,
    /// Names of tags whose name is empty.
    pub empty_tags: Vec<String>,
    /// Pairs for tag assignments that occur more than once.
    /// NOTE(review): presumably `(memory_id, tag name)`; confirm.
    pub duplicate_assignments: Vec<(i64, String)>,
    /// Total number of rows in `tags`.
    pub total_tags: i64,
    /// Total number of rows in `memory_tags`.
    pub total_assignments: i64,
}
2776
2777/// Validate tag consistency
2778pub fn validate_tags(conn: &Connection) -> Result<TagValidationResult> {
2779    // Find orphaned tags (tags with no memories)
2780    let orphaned: Vec<String> = conn
2781        .prepare(
2782            "SELECT t.name FROM tags t
2783             LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2784             WHERE mt.tag_id IS NULL",
2785        )?
2786        .query_map([], |row| row.get(0))?
2787        .filter_map(|r| r.ok())
2788        .collect();
2789
2790    // Find empty tag names
2791    let empty: Vec<String> = conn
2792        .prepare("SELECT name FROM tags WHERE name = '' OR name IS NULL")?
2793        .query_map([], |row| row.get(0))?
2794        .filter_map(|r| r.ok())
2795        .collect();
2796
2797    // Count totals
2798    let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2799    let total_assignments: i64 =
2800        conn.query_row("SELECT COUNT(*) FROM memory_tags", [], |row| row.get(0))?;
2801
2802    Ok(TagValidationResult {
2803        valid: orphaned.is_empty() && empty.is_empty(),
2804        orphaned_tags: orphaned,
2805        empty_tags: empty,
2806        duplicate_assignments: vec![], // Would need more complex query
2807        total_tags,
2808        total_assignments,
2809    })
2810}
2811
2812// ============================================================================
2813// Import/Export
2814// ============================================================================
2815
/// Exported memory format
///
/// Flat, serializable snapshot of a single memory as written by
/// `export_memories` and read back by `import_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportedMemory {
    /// Id the memory had in the exporting database.
    pub id: i64,
    /// Full memory content.
    pub content: String,
    /// Memory type in its string form.
    pub memory_type: String,
    /// Tags attached to the memory.
    pub tags: Vec<String>,
    /// Arbitrary JSON metadata keyed by name.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Importance score.
    pub importance: f32,
    /// Workspace the memory belongs to.
    pub workspace: String,
    /// Memory tier in its string form.
    pub tier: String,
    /// Creation timestamp, RFC 3339.
    pub created_at: String,
    /// Last-update timestamp, RFC 3339.
    pub updated_at: String,
}
2830
/// Export format
///
/// Top-level envelope of an export produced by `export_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportData {
    /// Version string of the export format.
    pub version: String,
    /// Time the export was produced, RFC 3339.
    pub exported_at: String,
    /// Number of entries in `memories`.
    pub memory_count: usize,
    /// The exported memories.
    pub memories: Vec<ExportedMemory>,
}
2839
2840/// Export all memories to JSON-serializable format
2841pub fn export_memories(conn: &Connection) -> Result<ExportData> {
2842    let memories = list_memories(
2843        conn,
2844        &ListOptions {
2845            limit: Some(100000),
2846            ..Default::default()
2847        },
2848    )?;
2849
2850    let exported: Vec<ExportedMemory> = memories
2851        .into_iter()
2852        .map(|m| ExportedMemory {
2853            id: m.id,
2854            content: m.content,
2855            memory_type: m.memory_type.as_str().to_string(),
2856            tags: m.tags,
2857            metadata: m.metadata,
2858            importance: m.importance,
2859            workspace: m.workspace,
2860            tier: m.tier.as_str().to_string(),
2861            created_at: m.created_at.to_rfc3339(),
2862            updated_at: m.updated_at.to_rfc3339(),
2863        })
2864        .collect();
2865
2866    Ok(ExportData {
2867        version: "1.0".to_string(),
2868        exported_at: Utc::now().to_rfc3339(),
2869        memory_count: exported.len(),
2870        memories: exported,
2871    })
2872}
2873
/// Import result
///
/// Tally returned by `import_memories`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ImportResult {
    /// Number of memories that were created.
    pub imported: usize,
    /// Number of memories skipped because they were reported as duplicates.
    pub skipped: usize,
    /// Number of memories that failed for any other reason.
    pub failed: usize,
    /// One human-readable message per failed memory.
    pub errors: Vec<String>,
}
2882
2883/// Import memories from exported format
2884pub fn import_memories(
2885    conn: &Connection,
2886    data: &ExportData,
2887    skip_duplicates: bool,
2888) -> Result<ImportResult> {
2889    let mut imported = 0;
2890    let mut skipped = 0;
2891    let mut failed = 0;
2892    let mut errors = Vec::new();
2893
2894    for mem in &data.memories {
2895        let memory_type = mem.memory_type.parse().unwrap_or(MemoryType::Note);
2896        let tier = mem.tier.parse().unwrap_or(MemoryTier::Permanent);
2897
2898        let input = CreateMemoryInput {
2899            content: mem.content.clone(),
2900            memory_type,
2901            tags: mem.tags.clone(),
2902            metadata: mem.metadata.clone(),
2903            importance: Some(mem.importance),
2904            scope: MemoryScope::Global,
2905            workspace: Some(mem.workspace.clone()),
2906            tier,
2907            defer_embedding: false,
2908            ttl_seconds: None,
2909            dedup_mode: if skip_duplicates {
2910                DedupMode::Skip
2911            } else {
2912                DedupMode::Allow
2913            },
2914            dedup_threshold: None,
2915            event_time: None,
2916            event_duration_seconds: None,
2917            trigger_pattern: None,
2918            summary_of_id: None,
2919            media_url: None,
2920        };
2921
2922        match create_memory(conn, &input) {
2923            Ok(_) => imported += 1,
2924            Err(EngramError::Duplicate { .. }) => skipped += 1,
2925            Err(e) => {
2926                failed += 1;
2927                errors.push(format!("Failed to import memory {}: {}", mem.id, e));
2928            }
2929        }
2930    }
2931
2932    Ok(ImportResult {
2933        imported,
2934        skipped,
2935        failed,
2936        errors,
2937    })
2938}
2939
2940// ============================================================================
2941// Maintenance Operations
2942// ============================================================================
2943
2944/// Queue all memories for re-embedding
2945pub fn rebuild_embeddings(conn: &Connection) -> Result<i64> {
2946    let now = Utc::now().to_rfc3339();
2947
2948    // Clear existing queue
2949    conn.execute("DELETE FROM embedding_queue", [])?;
2950
2951    // Queue all memories
2952    let count = conn.execute(
2953        "INSERT INTO embedding_queue (memory_id, status, queued_at)
2954         SELECT id, 'pending', ? FROM memories WHERE valid_to IS NULL",
2955        params![now],
2956    )?;
2957
2958    // Reset has_embedding flag
2959    conn.execute(
2960        "UPDATE memories SET has_embedding = 0 WHERE valid_to IS NULL",
2961        [],
2962    )?;
2963
2964    Ok(count as i64)
2965}
2966
2967/// Rebuild all cross-references based on embeddings
2968pub fn rebuild_crossrefs(conn: &Connection) -> Result<i64> {
2969    let now = Utc::now().to_rfc3339();
2970
2971    // Clear existing auto-generated crossrefs (keep manual ones)
2972    let deleted = conn.execute(
2973        "UPDATE crossrefs SET valid_to = ? WHERE source = 'auto' AND valid_to IS NULL",
2974        params![now],
2975    )?;
2976
2977    // Note: Actual crossref generation requires embeddings and is done by the embedding worker
2978    // This just clears the old ones so they can be regenerated
2979
2980    Ok(deleted as i64)
2981}
2982
2983// ============================================================================
2984// Special Memory Types
2985// ============================================================================
2986
2987/// Create a section memory (for document structure)
2988pub fn create_section_memory(
2989    conn: &Connection,
2990    title: &str,
2991    content: &str,
2992    parent_id: Option<i64>,
2993    level: i32,
2994    workspace: Option<&str>,
2995) -> Result<Memory> {
2996    let mut metadata = HashMap::new();
2997    metadata.insert("section_title".to_string(), serde_json::json!(title));
2998    metadata.insert("section_level".to_string(), serde_json::json!(level));
2999    if let Some(pid) = parent_id {
3000        metadata.insert("parent_memory_id".to_string(), serde_json::json!(pid));
3001    }
3002
3003    let input = CreateMemoryInput {
3004        content: format!("# {}\n\n{}", title, content),
3005        memory_type: MemoryType::Context,
3006        tags: vec!["section".to_string()],
3007        metadata,
3008        importance: Some(0.6),
3009        scope: MemoryScope::Global,
3010        workspace: workspace.map(String::from),
3011        tier: MemoryTier::Permanent,
3012        defer_embedding: false,
3013        ttl_seconds: None,
3014        dedup_mode: DedupMode::Skip,
3015        dedup_threshold: None,
3016        event_time: None,
3017        event_duration_seconds: None,
3018        trigger_pattern: None,
3019        summary_of_id: None,
3020        media_url: None,
3021    };
3022
3023    create_memory(conn, &input)
3024}
3025
3026/// Create a checkpoint memory for session state
3027pub fn create_checkpoint(
3028    conn: &Connection,
3029    session_id: &str,
3030    summary: &str,
3031    context: &HashMap<String, serde_json::Value>,
3032    workspace: Option<&str>,
3033) -> Result<Memory> {
3034    let mut metadata = context.clone();
3035    metadata.insert(
3036        "checkpoint_session".to_string(),
3037        serde_json::json!(session_id),
3038    );
3039    metadata.insert(
3040        "checkpoint_time".to_string(),
3041        serde_json::json!(Utc::now().to_rfc3339()),
3042    );
3043
3044    let input = CreateMemoryInput {
3045        content: format!("Session Checkpoint: {}\n\n{}", session_id, summary),
3046        memory_type: MemoryType::Context,
3047        tags: vec!["checkpoint".to_string(), format!("session:{}", session_id)],
3048        metadata,
3049        importance: Some(0.7),
3050        scope: MemoryScope::Global,
3051        workspace: workspace.map(String::from),
3052        tier: MemoryTier::Permanent,
3053        defer_embedding: false,
3054        ttl_seconds: None,
3055        dedup_mode: DedupMode::Allow,
3056        dedup_threshold: None,
3057        event_time: None,
3058        event_duration_seconds: None,
3059        trigger_pattern: None,
3060        summary_of_id: None,
3061        media_url: None,
3062    };
3063
3064    create_memory(conn, &input)
3065}
3066
3067/// Temporarily boost a memory's importance
3068pub fn boost_memory(
3069    conn: &Connection,
3070    id: i64,
3071    boost_amount: f32,
3072    duration_seconds: Option<i64>,
3073) -> Result<Memory> {
3074    let memory = get_memory(conn, id)?;
3075    let new_importance = (memory.importance + boost_amount).min(1.0);
3076    let now = Utc::now();
3077
3078    // Update importance
3079    conn.execute(
3080        "UPDATE memories SET importance = ?, updated_at = ? WHERE id = ?",
3081        params![new_importance, now.to_rfc3339(), id],
3082    )?;
3083
3084    // If duration specified, store boost info in metadata for later decay
3085    if let Some(duration) = duration_seconds {
3086        let expires = now + chrono::Duration::seconds(duration);
3087        let mut metadata = memory.metadata.clone();
3088        metadata.insert(
3089            "boost_expires".to_string(),
3090            serde_json::json!(expires.to_rfc3339()),
3091        );
3092        metadata.insert(
3093            "boost_original_importance".to_string(),
3094            serde_json::json!(memory.importance),
3095        );
3096
3097        let metadata_json = serde_json::to_string(&metadata)?;
3098        conn.execute(
3099            "UPDATE memories SET metadata = ? WHERE id = ?",
3100            params![metadata_json, id],
3101        )?;
3102    }
3103
3104    get_memory(conn, id)
3105}
3106
3107// =============================================================================
3108// Event System
3109// =============================================================================
3110
/// Event types for the memory system
///
/// The `Display` and `FromStr` implementations use the lowercase variant
/// name (`"created"`, `"updated"`, ...) as the textual form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MemoryEventType {
    Created,
    Updated,
    Deleted,
    Linked,
    Unlinked,
    Shared,
    Synced,
}
3122
3123impl std::fmt::Display for MemoryEventType {
3124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3125        match self {
3126            MemoryEventType::Created => write!(f, "created"),
3127            MemoryEventType::Updated => write!(f, "updated"),
3128            MemoryEventType::Deleted => write!(f, "deleted"),
3129            MemoryEventType::Linked => write!(f, "linked"),
3130            MemoryEventType::Unlinked => write!(f, "unlinked"),
3131            MemoryEventType::Shared => write!(f, "shared"),
3132            MemoryEventType::Synced => write!(f, "synced"),
3133        }
3134    }
3135}
3136
3137impl std::str::FromStr for MemoryEventType {
3138    type Err = EngramError;
3139    fn from_str(s: &str) -> Result<Self> {
3140        match s {
3141            "created" => Ok(MemoryEventType::Created),
3142            "updated" => Ok(MemoryEventType::Updated),
3143            "deleted" => Ok(MemoryEventType::Deleted),
3144            "linked" => Ok(MemoryEventType::Linked),
3145            "unlinked" => Ok(MemoryEventType::Unlinked),
3146            "shared" => Ok(MemoryEventType::Shared),
3147            "synced" => Ok(MemoryEventType::Synced),
3148            _ => Err(EngramError::InvalidInput(format!(
3149                "Invalid event type: {}",
3150                s
3151            ))),
3152        }
3153    }
3154}
3155
/// A memory event for tracking changes
///
/// One row of the `memory_events` table as returned by `poll_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryEvent {
    /// Row id of the event.
    pub id: i64,
    /// Event type in its textual form, as stored in the table.
    pub event_type: String,
    /// Memory the event refers to, if any.
    pub memory_id: Option<i64>,
    /// Agent associated with the event, if any.
    pub agent_id: Option<String>,
    /// Event payload; `{}` when the stored text is not valid JSON.
    pub data: serde_json::Value,
    /// Time the event was recorded; the time of reading when the stored
    /// value is not valid RFC 3339.
    pub created_at: DateTime<Utc>,
}
3166
3167/// Record an event in the event system
3168pub fn record_event(
3169    conn: &Connection,
3170    event_type: MemoryEventType,
3171    memory_id: Option<i64>,
3172    agent_id: Option<&str>,
3173    data: serde_json::Value,
3174) -> Result<i64> {
3175    let now = Utc::now();
3176    let data_json = serde_json::to_string(&data)?;
3177
3178    conn.execute(
3179        "INSERT INTO memory_events (event_type, memory_id, agent_id, data, created_at)
3180         VALUES (?, ?, ?, ?, ?)",
3181        params![
3182            event_type.to_string(),
3183            memory_id,
3184            agent_id,
3185            data_json,
3186            now.to_rfc3339()
3187        ],
3188    )?;
3189
3190    Ok(conn.last_insert_rowid())
3191}
3192
3193/// Poll for events since a given timestamp or event ID
3194pub fn poll_events(
3195    conn: &Connection,
3196    since_id: Option<i64>,
3197    since_time: Option<DateTime<Utc>>,
3198    agent_id: Option<&str>,
3199    limit: Option<usize>,
3200) -> Result<Vec<MemoryEvent>> {
3201    let limit = limit.unwrap_or(100);
3202
3203    let (query, params): (&str, Vec<Box<dyn rusqlite::ToSql>>) =
3204        match (since_id, since_time, agent_id) {
3205            (Some(id), _, Some(agent)) => (
3206                "SELECT id, event_type, memory_id, agent_id, data, created_at
3207             FROM memory_events WHERE id > ? AND (agent_id = ? OR agent_id IS NULL)
3208             ORDER BY id ASC LIMIT ?",
3209                vec![
3210                    Box::new(id),
3211                    Box::new(agent.to_string()),
3212                    Box::new(limit as i64),
3213                ],
3214            ),
3215            (Some(id), _, None) => (
3216                "SELECT id, event_type, memory_id, agent_id, data, created_at
3217             FROM memory_events WHERE id > ?
3218             ORDER BY id ASC LIMIT ?",
3219                vec![Box::new(id), Box::new(limit as i64)],
3220            ),
3221            (None, Some(time), Some(agent)) => (
3222                "SELECT id, event_type, memory_id, agent_id, data, created_at
3223             FROM memory_events WHERE created_at > ? AND (agent_id = ? OR agent_id IS NULL)
3224             ORDER BY id ASC LIMIT ?",
3225                vec![
3226                    Box::new(time.to_rfc3339()),
3227                    Box::new(agent.to_string()),
3228                    Box::new(limit as i64),
3229                ],
3230            ),
3231            (None, Some(time), None) => (
3232                "SELECT id, event_type, memory_id, agent_id, data, created_at
3233             FROM memory_events WHERE created_at > ?
3234             ORDER BY id ASC LIMIT ?",
3235                vec![Box::new(time.to_rfc3339()), Box::new(limit as i64)],
3236            ),
3237            (None, None, Some(agent)) => (
3238                "SELECT id, event_type, memory_id, agent_id, data, created_at
3239             FROM memory_events WHERE agent_id = ? OR agent_id IS NULL
3240             ORDER BY id DESC LIMIT ?",
3241                vec![Box::new(agent.to_string()), Box::new(limit as i64)],
3242            ),
3243            (None, None, None) => (
3244                "SELECT id, event_type, memory_id, agent_id, data, created_at
3245             FROM memory_events ORDER BY id DESC LIMIT ?",
3246                vec![Box::new(limit as i64)],
3247            ),
3248        };
3249
3250    let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3251    let mut stmt = conn.prepare(query)?;
3252    let events = stmt
3253        .query_map(params_refs.as_slice(), |row| {
3254            let data_str: String = row.get(4)?;
3255            let created_str: String = row.get(5)?;
3256            Ok(MemoryEvent {
3257                id: row.get(0)?,
3258                event_type: row.get(1)?,
3259                memory_id: row.get(2)?,
3260                agent_id: row.get(3)?,
3261                data: serde_json::from_str(&data_str).unwrap_or(serde_json::json!({})),
3262                created_at: DateTime::parse_from_rfc3339(&created_str)
3263                    .map(|dt| dt.with_timezone(&Utc))
3264                    .unwrap_or_else(|_| Utc::now()),
3265            })
3266        })?
3267        .collect::<std::result::Result<Vec<_>, _>>()?;
3268
3269    Ok(events)
3270}
3271
3272/// Clear old events (cleanup)
3273pub fn clear_events(
3274    conn: &Connection,
3275    before_id: Option<i64>,
3276    before_time: Option<DateTime<Utc>>,
3277    keep_recent: Option<usize>,
3278) -> Result<i64> {
3279    let deleted = if let Some(id) = before_id {
3280        conn.execute("DELETE FROM memory_events WHERE id < ?", params![id])?
3281    } else if let Some(time) = before_time {
3282        conn.execute(
3283            "DELETE FROM memory_events WHERE created_at < ?",
3284            params![time.to_rfc3339()],
3285        )?
3286    } else if let Some(keep) = keep_recent {
3287        // Keep only the most recent N events
3288        conn.execute(
3289            "DELETE FROM memory_events WHERE id NOT IN (
3290                SELECT id FROM memory_events ORDER BY id DESC LIMIT ?
3291            )",
3292            params![keep as i64],
3293        )?
3294    } else {
3295        // Clear all events
3296        conn.execute("DELETE FROM memory_events", [])?
3297    };
3298
3299    Ok(deleted as i64)
3300}
3301
3302// =============================================================================
3303// Advanced Sync
3304// =============================================================================
3305
/// Sync version info
///
/// Snapshot returned by `get_sync_version`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncVersion {
    /// Highest `version` found in `sync_state`, or `0` when unavailable.
    pub version: i64,
    /// Latest `updated_at` over all memories; the time of the call when
    /// there is none or it cannot be parsed.
    pub last_modified: DateTime<Utc>,
    /// Number of rows in `memories`.
    pub memory_count: i64,
    /// Change-detection string of the form `<count>-<version>-<last modified>`;
    /// not a cryptographic digest.
    pub checksum: String,
}
3314
/// Sync task status record (Phase 3 - Langfuse integration).
///
/// Each field maps to the column of the same name in the `sync_tasks` table
/// (see `upsert_sync_task` / `get_sync_task`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncTask {
    /// Unique key of the task; `upsert_sync_task` uses it as the conflict target.
    pub task_id: String,
    /// Free-form task kind. NOTE(review): the set of valid values is not
    /// visible in this file section.
    pub task_type: String,
    /// Free-form status label. NOTE(review): the set of valid values is not
    /// visible in this file section.
    pub status: String,
    /// Progress indicator; presumably 0-100 - TODO confirm against the writer.
    pub progress_percent: i32,
    /// Counter of traces handled so far.
    pub traces_processed: i64,
    /// Counter of memories created so far.
    pub memories_created: i64,
    /// Error text, if any was recorded.
    pub error_message: Option<String>,
    /// Start timestamp stored as text. NOTE(review): format is not enforced
    /// here; presumably RFC 3339 like the other timestamps in this module.
    pub started_at: String,
    /// Completion timestamp stored as text, `None` while not completed
    /// (same format caveat as `started_at`).
    pub completed_at: Option<String>,
}
3328
3329/// Get the current sync version
3330pub fn get_sync_version(conn: &Connection) -> Result<SyncVersion> {
3331    let memory_count: i64 =
3332        conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
3333
3334    let last_modified: Option<String> = conn
3335        .query_row("SELECT MAX(updated_at) FROM memories", [], |row| row.get(0))
3336        .ok();
3337
3338    let version: i64 = conn
3339        .query_row("SELECT MAX(version) FROM sync_state", [], |row| row.get(0))
3340        .unwrap_or(0);
3341
3342    // Simple checksum based on count and last modified
3343    let checksum = format!(
3344        "{}-{}-{}",
3345        memory_count,
3346        version,
3347        last_modified.as_deref().unwrap_or("none")
3348    );
3349
3350    Ok(SyncVersion {
3351        version,
3352        last_modified: last_modified
3353            .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
3354            .map(|dt| dt.with_timezone(&Utc))
3355            .unwrap_or_else(Utc::now),
3356        memory_count,
3357        checksum,
3358    })
3359}
3360
3361/// Insert or update a sync task record
3362pub fn upsert_sync_task(conn: &Connection, task: &SyncTask) -> Result<()> {
3363    conn.execute(
3364        r#"
3365        INSERT INTO sync_tasks (
3366            task_id, task_type, status, progress_percent, traces_processed, memories_created,
3367            error_message, started_at, completed_at
3368        )
3369        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
3370        ON CONFLICT(task_id) DO UPDATE SET
3371            task_type = excluded.task_type,
3372            status = excluded.status,
3373            progress_percent = excluded.progress_percent,
3374            traces_processed = excluded.traces_processed,
3375            memories_created = excluded.memories_created,
3376            error_message = excluded.error_message,
3377            started_at = excluded.started_at,
3378            completed_at = excluded.completed_at
3379        "#,
3380        params![
3381            task.task_id,
3382            task.task_type,
3383            task.status,
3384            task.progress_percent,
3385            task.traces_processed,
3386            task.memories_created,
3387            task.error_message,
3388            task.started_at,
3389            task.completed_at
3390        ],
3391    )?;
3392
3393    Ok(())
3394}
3395
3396/// Get a sync task by ID
3397pub fn get_sync_task(conn: &Connection, task_id: &str) -> Result<Option<SyncTask>> {
3398    let mut stmt = conn.prepare(
3399        r#"
3400        SELECT task_id, task_type, status, progress_percent, traces_processed, memories_created,
3401               error_message, started_at, completed_at
3402        FROM sync_tasks
3403        WHERE task_id = ?
3404        "#,
3405    )?;
3406
3407    let mut rows = stmt.query(params![task_id])?;
3408    if let Some(row) = rows.next()? {
3409        Ok(Some(SyncTask {
3410            task_id: row.get("task_id")?,
3411            task_type: row.get("task_type")?,
3412            status: row.get("status")?,
3413            progress_percent: row.get("progress_percent")?,
3414            traces_processed: row.get("traces_processed")?,
3415            memories_created: row.get("memories_created")?,
3416            error_message: row.get("error_message")?,
3417            started_at: row.get("started_at")?,
3418            completed_at: row.get("completed_at")?,
3419        }))
3420    } else {
3421        Ok(None)
3422    }
3423}
3424
/// Set of changes between two sync versions, as produced by `get_sync_delta`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncDelta {
    /// Memories with a `created` event in the window that were not deleted
    /// afterwards and can still be loaded.
    pub created: Vec<Memory>,
    /// Memories with an `updated` event in the window that were neither
    /// created nor deleted in the same window and can still be loaded.
    pub updated: Vec<Memory>,
    /// Ids of memories with a `deleted` event in the window.
    pub deleted: Vec<i64>,
    /// The `since_version` the caller asked for.
    pub from_version: i64,
    /// The store's sync version at the time the delta was computed.
    pub to_version: i64,
}
3434
3435/// Get changes since a specific version
3436pub fn get_sync_delta(conn: &Connection, since_version: i64) -> Result<SyncDelta> {
3437    let current_version = get_sync_version(conn)?.version;
3438
3439    // Get events since that version to determine what changed
3440    let events = poll_events(conn, Some(since_version), None, None, Some(10000))?;
3441
3442    let mut created_ids = std::collections::HashSet::new();
3443    let mut updated_ids = std::collections::HashSet::new();
3444    let mut deleted_ids = std::collections::HashSet::new();
3445
3446    for event in events {
3447        if let Some(memory_id) = event.memory_id {
3448            match event.event_type.as_str() {
3449                "created" => {
3450                    created_ids.insert(memory_id);
3451                }
3452                "updated" => {
3453                    if !created_ids.contains(&memory_id) {
3454                        updated_ids.insert(memory_id);
3455                    }
3456                }
3457                "deleted" => {
3458                    created_ids.remove(&memory_id);
3459                    updated_ids.remove(&memory_id);
3460                    deleted_ids.insert(memory_id);
3461                }
3462                _ => {}
3463            }
3464        }
3465    }
3466
3467    let created: Vec<Memory> = created_ids
3468        .iter()
3469        .filter_map(|id| get_memory(conn, *id).ok())
3470        .collect();
3471
3472    let updated: Vec<Memory> = updated_ids
3473        .iter()
3474        .filter_map(|id| get_memory(conn, *id).ok())
3475        .collect();
3476
3477    Ok(SyncDelta {
3478        created,
3479        updated,
3480        deleted: deleted_ids.into_iter().collect(),
3481        from_version: since_version,
3482        to_version: current_version,
3483    })
3484}
3485
/// Per-agent sync bookkeeping, as returned by `get_agent_sync_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSyncState {
    /// Agent the state belongs to.
    pub agent_id: String,
    /// Version recorded by the agent's last sync; `0` if it has never synced.
    pub last_sync_version: i64,
    /// Time of the last sync. Falls back to the current time when no state is
    /// stored or the stored text is not valid RFC 3339.
    pub last_sync_time: DateTime<Utc>,
    /// Current sync version minus `last_sync_version`, never negative.
    pub pending_changes: i64,
}
3494
3495/// Get sync state for a specific agent
3496pub fn get_agent_sync_state(conn: &Connection, agent_id: &str) -> Result<AgentSyncState> {
3497    let result: std::result::Result<(i64, String), rusqlite::Error> = conn.query_row(
3498        "SELECT last_sync_version, last_sync_time FROM agent_sync_state WHERE agent_id = ?",
3499        params![agent_id],
3500        |row| Ok((row.get(0)?, row.get(1)?)),
3501    );
3502
3503    match result {
3504        Ok((version, time_str)) => {
3505            let current_version = get_sync_version(conn)?.version;
3506            let pending = (current_version - version).max(0);
3507
3508            Ok(AgentSyncState {
3509                agent_id: agent_id.to_string(),
3510                last_sync_version: version,
3511                last_sync_time: DateTime::parse_from_rfc3339(&time_str)
3512                    .map(|dt| dt.with_timezone(&Utc))
3513                    .unwrap_or_else(|_| Utc::now()),
3514                pending_changes: pending,
3515            })
3516        }
3517        Err(_) => {
3518            // No sync state yet for this agent
3519            Ok(AgentSyncState {
3520                agent_id: agent_id.to_string(),
3521                last_sync_version: 0,
3522                last_sync_time: Utc::now(),
3523                pending_changes: get_sync_version(conn)?.version,
3524            })
3525        }
3526    }
3527}
3528
3529/// Update sync state for an agent
3530pub fn update_agent_sync_state(conn: &Connection, agent_id: &str, version: i64) -> Result<()> {
3531    let now = Utc::now();
3532    conn.execute(
3533        "INSERT INTO agent_sync_state (agent_id, last_sync_version, last_sync_time)
3534         VALUES (?, ?, ?)
3535         ON CONFLICT(agent_id) DO UPDATE SET
3536            last_sync_version = excluded.last_sync_version,
3537            last_sync_time = excluded.last_sync_time",
3538        params![agent_id, version, now.to_rfc3339()],
3539    )?;
3540    Ok(())
3541}
3542
3543/// Cleanup old sync data
3544pub fn cleanup_sync_data(conn: &Connection, older_than_days: i64) -> Result<i64> {
3545    let cutoff = Utc::now() - chrono::Duration::days(older_than_days);
3546    let deleted = conn.execute(
3547        "DELETE FROM memory_events WHERE created_at < ?",
3548        params![cutoff.to_rfc3339()],
3549    )?;
3550    Ok(deleted as i64)
3551}
3552
3553// =============================================================================
3554// Multi-Agent Sharing
3555// =============================================================================
3556
/// A shared memory entry: one row of the `shared_memories` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedMemory {
    /// Row id of the share itself (returned by `share_memory`).
    pub id: i64,
    /// Id of the memory being shared.
    pub memory_id: i64,
    /// Agent that created the share.
    pub from_agent: String,
    /// Agent the share is addressed to.
    pub to_agent: String,
    /// Optional note attached by the sender.
    pub message: Option<String>,
    /// `false` when created; set by `acknowledge_share`.
    pub acknowledged: bool,
    /// When the share was acknowledged; `None` if the column is empty or its
    /// text is not valid RFC 3339.
    pub acknowledged_at: Option<DateTime<Utc>>,
    /// When the share was created; the current time is substituted if the
    /// stored text is not valid RFC 3339.
    pub created_at: DateTime<Utc>,
}
3569
3570/// Share a memory with another agent
3571pub fn share_memory(
3572    conn: &Connection,
3573    memory_id: i64,
3574    from_agent: &str,
3575    to_agent: &str,
3576    message: Option<&str>,
3577) -> Result<i64> {
3578    let now = Utc::now();
3579
3580    // Verify memory exists
3581    let _ = get_memory(conn, memory_id)?;
3582
3583    conn.execute(
3584        "INSERT INTO shared_memories (memory_id, from_agent, to_agent, message, acknowledged, created_at)
3585         VALUES (?, ?, ?, ?, 0, ?)",
3586        params![memory_id, from_agent, to_agent, message, now.to_rfc3339()],
3587    )?;
3588
3589    let share_id = conn.last_insert_rowid();
3590
3591    // Record event
3592    record_event(
3593        conn,
3594        MemoryEventType::Shared,
3595        Some(memory_id),
3596        Some(from_agent),
3597        serde_json::json!({
3598            "to_agent": to_agent,
3599            "share_id": share_id,
3600            "message": message
3601        }),
3602    )?;
3603
3604    Ok(share_id)
3605}
3606
3607/// Poll for shared memories sent to this agent
3608pub fn poll_shared_memories(
3609    conn: &Connection,
3610    to_agent: &str,
3611    include_acknowledged: bool,
3612) -> Result<Vec<SharedMemory>> {
3613    let query = if include_acknowledged {
3614        "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3615         FROM shared_memories WHERE to_agent = ?
3616         ORDER BY created_at DESC"
3617    } else {
3618        "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3619         FROM shared_memories WHERE to_agent = ? AND acknowledged = 0
3620         ORDER BY created_at DESC"
3621    };
3622
3623    let mut stmt = conn.prepare(query)?;
3624    let shares = stmt
3625        .query_map(params![to_agent], |row| {
3626            let created_str: String = row.get(7)?;
3627            let ack_str: Option<String> = row.get(6)?;
3628            Ok(SharedMemory {
3629                id: row.get(0)?,
3630                memory_id: row.get(1)?,
3631                from_agent: row.get(2)?,
3632                to_agent: row.get(3)?,
3633                message: row.get(4)?,
3634                acknowledged: row.get(5)?,
3635                acknowledged_at: ack_str.and_then(|s| {
3636                    DateTime::parse_from_rfc3339(&s)
3637                        .ok()
3638                        .map(|dt| dt.with_timezone(&Utc))
3639                }),
3640                created_at: DateTime::parse_from_rfc3339(&created_str)
3641                    .map(|dt| dt.with_timezone(&Utc))
3642                    .unwrap_or_else(|_| Utc::now()),
3643            })
3644        })?
3645        .collect::<std::result::Result<Vec<_>, _>>()?;
3646
3647    Ok(shares)
3648}
3649
3650/// Acknowledge a shared memory
3651pub fn acknowledge_share(conn: &Connection, share_id: i64, agent_id: &str) -> Result<()> {
3652    let now = Utc::now();
3653
3654    let affected = conn.execute(
3655        "UPDATE shared_memories SET acknowledged = 1, acknowledged_at = ?
3656         WHERE id = ? AND to_agent = ?",
3657        params![now.to_rfc3339(), share_id, agent_id],
3658    )?;
3659
3660    if affected == 0 {
3661        return Err(EngramError::NotFound(share_id));
3662    }
3663
3664    Ok(())
3665}
3666
3667// =============================================================================
3668// Search Variants
3669// =============================================================================
3670
3671/// Search memories by identity (canonical ID or alias)
3672pub fn search_by_identity(
3673    conn: &Connection,
3674    identity: &str,
3675    workspace: Option<&str>,
3676    limit: Option<usize>,
3677) -> Result<Vec<Memory>> {
3678    let limit = limit.unwrap_or(50);
3679    let now = Utc::now().to_rfc3339();
3680
3681    // Search in content and tags for the identity
3682    // Tags are in a junction table, so we need to use a subquery or JOIN
3683    let pattern = format!("%{}%", identity);
3684
3685    let query = if workspace.is_some() {
3686        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3687                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3688                m.visibility, m.version, m.has_embedding, m.metadata,
3689                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
3690                m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
3691                m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
3692         FROM memories m
3693         LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3694         LEFT JOIN tags t ON mt.tag_id = t.id
3695         WHERE m.workspace = ? AND (m.content LIKE ? OR t.name LIKE ?)
3696           AND m.valid_to IS NULL
3697           AND (m.expires_at IS NULL OR m.expires_at > ?)
3698         ORDER BY m.importance DESC, m.created_at DESC
3699         LIMIT ?"
3700    } else {
3701        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3702                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3703                m.visibility, m.version, m.has_embedding, m.metadata,
3704                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
3705                m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
3706                m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
3707         FROM memories m
3708         LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3709         LEFT JOIN tags t ON mt.tag_id = t.id
3710         WHERE (m.content LIKE ? OR t.name LIKE ?)
3711           AND m.valid_to IS NULL
3712           AND (m.expires_at IS NULL OR m.expires_at > ?)
3713         ORDER BY m.importance DESC, m.created_at DESC
3714         LIMIT ?"
3715    };
3716
3717    let mut stmt = conn.prepare(query)?;
3718
3719    let memories = if let Some(ws) = workspace {
3720        stmt.query_map(
3721            params![ws, &pattern, &pattern, &now, limit as i64],
3722            memory_from_row,
3723        )?
3724        .collect::<std::result::Result<Vec<_>, _>>()?
3725    } else {
3726        stmt.query_map(
3727            params![&pattern, &pattern, &now, limit as i64],
3728            memory_from_row,
3729        )?
3730        .collect::<std::result::Result<Vec<_>, _>>()?
3731    };
3732
3733    Ok(memories)
3734}
3735
3736/// Search within session transcript chunks
3737pub fn search_sessions(
3738    conn: &Connection,
3739    query_text: &str,
3740    session_id: Option<&str>,
3741    workspace: Option<&str>,
3742    limit: Option<usize>,
3743) -> Result<Vec<Memory>> {
3744    let limit = limit.unwrap_or(20);
3745    let now = Utc::now().to_rfc3339();
3746    let pattern = format!("%{}%", query_text);
3747
3748    // Build query based on filters
3749    // Session chunks are stored as TranscriptChunk type (not Context)
3750    let mut conditions = vec![
3751        "m.memory_type = 'transcript_chunk'",
3752        "m.valid_to IS NULL",
3753        "(m.expires_at IS NULL OR m.expires_at > ?)",
3754    ];
3755    let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3756
3757    // Add session filter via tags (tags are in junction table)
3758    let use_tag_join = session_id.is_some();
3759    if let Some(sid) = session_id {
3760        let tag_name = format!("session:{}", sid);
3761        conditions.push("t.name = ?");
3762        params_vec.push(Box::new(tag_name));
3763    }
3764
3765    // Add workspace filter
3766    if let Some(ws) = workspace {
3767        conditions.push("m.workspace = ?");
3768        params_vec.push(Box::new(ws.to_string()));
3769    }
3770
3771    // Add content search
3772    conditions.push("m.content LIKE ?");
3773    params_vec.push(Box::new(pattern));
3774
3775    // Add limit
3776    params_vec.push(Box::new(limit as i64));
3777
3778    // Build query with optional tag join
3779    let join_clause = if use_tag_join {
3780        "JOIN memory_tags mt ON m.id = mt.memory_id JOIN tags t ON mt.tag_id = t.id"
3781    } else {
3782        ""
3783    };
3784
3785    let query = format!(
3786        "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3787                m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3788                m.visibility, m.version, m.has_embedding, m.metadata,
3789                m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
3790                m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
3791                m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
3792         FROM memories m {} WHERE {} ORDER BY m.created_at DESC LIMIT ?",
3793        join_clause,
3794        conditions.join(" AND ")
3795    );
3796
3797    let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
3798    let mut stmt = conn.prepare(&query)?;
3799    let memories = stmt
3800        .query_map(params_refs.as_slice(), memory_from_row)?
3801        .collect::<std::result::Result<Vec<_>, _>>()?;
3802
3803    Ok(memories)
3804}
3805
3806#[cfg(test)]
3807mod tests {
3808    use super::*;
3809    use crate::storage::Storage;
3810    use serde_json::json;
3811    use std::collections::HashMap;
3812
3813    #[test]
3814    fn test_list_memories_metadata_filter_types() {
3815        let storage = Storage::open_in_memory().unwrap();
3816
3817        storage
3818            .with_connection(|conn| {
3819                let mut metadata1 = HashMap::new();
3820                metadata1.insert("status".to_string(), json!("active"));
3821                metadata1.insert("count".to_string(), json!(3));
3822                metadata1.insert("flag".to_string(), json!(true));
3823
3824                let mut metadata2 = HashMap::new();
3825                metadata2.insert("status".to_string(), json!("inactive"));
3826                metadata2.insert("count".to_string(), json!(5));
3827                metadata2.insert("flag".to_string(), json!(false));
3828                metadata2.insert("optional".to_string(), json!("set"));
3829
3830                let memory1 = create_memory(
3831                    conn,
3832                    &CreateMemoryInput {
3833                        content: "First".to_string(),
3834                        memory_type: MemoryType::Note,
3835                        tags: vec![],
3836                        metadata: metadata1,
3837                        importance: None,
3838                        scope: Default::default(),
3839                        workspace: None,
3840                        tier: Default::default(),
3841                        defer_embedding: true,
3842                        ttl_seconds: None,
3843                        dedup_mode: Default::default(),
3844                        dedup_threshold: None,
3845                        event_time: None,
3846                        event_duration_seconds: None,
3847                        trigger_pattern: None,
3848                        summary_of_id: None,
3849                            media_url: None,
3850                    },
3851                )?;
3852                let memory2 = create_memory(
3853                    conn,
3854                    &CreateMemoryInput {
3855                        content: "Second".to_string(),
3856                        memory_type: MemoryType::Note,
3857                        tags: vec![],
3858                        metadata: metadata2,
3859                        importance: None,
3860                        scope: Default::default(),
3861                        workspace: None,
3862                        tier: Default::default(),
3863                        defer_embedding: true,
3864                        ttl_seconds: None,
3865                        dedup_mode: Default::default(),
3866                        dedup_threshold: None,
3867                        event_time: None,
3868                        event_duration_seconds: None,
3869                        trigger_pattern: None,
3870                        summary_of_id: None,
3871                            media_url: None,
3872                    },
3873                )?;
3874
3875                let mut filter = HashMap::new();
3876                filter.insert("status".to_string(), json!("active"));
3877                let results = list_memories(
3878                    conn,
3879                    &ListOptions {
3880                        metadata_filter: Some(filter),
3881                        ..Default::default()
3882                    },
3883                )?;
3884                assert_eq!(results.len(), 1);
3885                assert_eq!(results[0].id, memory1.id);
3886
3887                let mut filter = HashMap::new();
3888                filter.insert("count".to_string(), json!(5));
3889                let results = list_memories(
3890                    conn,
3891                    &ListOptions {
3892                        metadata_filter: Some(filter),
3893                        ..Default::default()
3894                    },
3895                )?;
3896                assert_eq!(results.len(), 1);
3897                assert_eq!(results[0].id, memory2.id);
3898
3899                let mut filter = HashMap::new();
3900                filter.insert("flag".to_string(), json!(true));
3901                let results = list_memories(
3902                    conn,
3903                    &ListOptions {
3904                        metadata_filter: Some(filter),
3905                        ..Default::default()
3906                    },
3907                )?;
3908                assert_eq!(results.len(), 1);
3909                assert_eq!(results[0].id, memory1.id);
3910
3911                let mut filter = HashMap::new();
3912                filter.insert("optional".to_string(), serde_json::Value::Null);
3913                let results = list_memories(
3914                    conn,
3915                    &ListOptions {
3916                        metadata_filter: Some(filter),
3917                        ..Default::default()
3918                    },
3919                )?;
3920                assert_eq!(results.len(), 1);
3921                assert_eq!(results[0].id, memory1.id);
3922
3923                Ok(())
3924            })
3925            .unwrap();
3926    }
3927
3928    #[test]
3929    fn test_memory_scope_isolation() {
3930        use crate::types::MemoryScope;
3931
3932        let storage = Storage::open_in_memory().unwrap();
3933
3934        storage
3935            .with_connection(|conn| {
3936                // Create memory with user scope
3937                let user1_memory = create_memory(
3938                    conn,
3939                    &CreateMemoryInput {
3940                        content: "User 1 memory".to_string(),
3941                        memory_type: MemoryType::Note,
3942                        tags: vec!["test".to_string()],
3943                        metadata: HashMap::new(),
3944                        importance: None,
3945                        scope: MemoryScope::user("user-1"),
3946                        workspace: None,
3947                        tier: Default::default(),
3948                        defer_embedding: true,
3949                        ttl_seconds: None,
3950                        dedup_mode: Default::default(),
3951                        dedup_threshold: None,
3952                        event_time: None,
3953                        event_duration_seconds: None,
3954                        trigger_pattern: None,
3955                        summary_of_id: None,
3956                            media_url: None,
3957                    },
3958                )?;
3959
3960                // Create memory with different user scope
3961                let user2_memory = create_memory(
3962                    conn,
3963                    &CreateMemoryInput {
3964                        content: "User 2 memory".to_string(),
3965                        memory_type: MemoryType::Note,
3966                        tags: vec!["test".to_string()],
3967                        metadata: HashMap::new(),
3968                        importance: None,
3969                        scope: MemoryScope::user("user-2"),
3970                        workspace: None,
3971                        tier: Default::default(),
3972                        defer_embedding: true,
3973                        ttl_seconds: None,
3974                        dedup_mode: Default::default(),
3975                        dedup_threshold: None,
3976                        event_time: None,
3977                        event_duration_seconds: None,
3978                        trigger_pattern: None,
3979                        summary_of_id: None,
3980                            media_url: None,
3981                    },
3982                )?;
3983
3984                // Create memory with session scope
3985                let session_memory = create_memory(
3986                    conn,
3987                    &CreateMemoryInput {
3988                        content: "Session memory".to_string(),
3989                        memory_type: MemoryType::Note,
3990                        tags: vec!["test".to_string()],
3991                        metadata: HashMap::new(),
3992                        importance: None,
3993                        scope: MemoryScope::session("session-abc"),
3994                        workspace: None,
3995                        tier: Default::default(),
3996                        defer_embedding: true,
3997                        ttl_seconds: None,
3998                        dedup_mode: Default::default(),
3999                        dedup_threshold: None,
4000                        event_time: None,
4001                        event_duration_seconds: None,
4002                        trigger_pattern: None,
4003                        summary_of_id: None,
4004                            media_url: None,
4005                    },
4006                )?;
4007
4008                // Create memory with global scope
4009                let global_memory = create_memory(
4010                    conn,
4011                    &CreateMemoryInput {
4012                        content: "Global memory".to_string(),
4013                        memory_type: MemoryType::Note,
4014                        tags: vec!["test".to_string()],
4015                        metadata: HashMap::new(),
4016                        importance: None,
4017                        scope: MemoryScope::Global,
4018                        workspace: None,
4019                        tier: Default::default(),
4020                        defer_embedding: true,
4021                        ttl_seconds: None,
4022                        dedup_mode: Default::default(),
4023                        dedup_threshold: None,
4024                        event_time: None,
4025                        event_duration_seconds: None,
4026                        trigger_pattern: None,
4027                        summary_of_id: None,
4028                            media_url: None,
4029                    },
4030                )?;
4031
4032                // Test: List all memories (no scope filter) should return all 4
4033                let all_results = list_memories(conn, &ListOptions::default())?;
4034                assert_eq!(all_results.len(), 4);
4035
4036                // Test: Filter by user-1 scope should return only user-1's memory
4037                let user1_results = list_memories(
4038                    conn,
4039                    &ListOptions {
4040                        scope: Some(MemoryScope::user("user-1")),
4041                        ..Default::default()
4042                    },
4043                )?;
4044                assert_eq!(user1_results.len(), 1);
4045                assert_eq!(user1_results[0].id, user1_memory.id);
4046                assert_eq!(user1_results[0].scope, MemoryScope::user("user-1"));
4047
4048                // Test: Filter by user-2 scope should return only user-2's memory
4049                let user2_results = list_memories(
4050                    conn,
4051                    &ListOptions {
4052                        scope: Some(MemoryScope::user("user-2")),
4053                        ..Default::default()
4054                    },
4055                )?;
4056                assert_eq!(user2_results.len(), 1);
4057                assert_eq!(user2_results[0].id, user2_memory.id);
4058
4059                // Test: Filter by session scope should return only session memory
4060                let session_results = list_memories(
4061                    conn,
4062                    &ListOptions {
4063                        scope: Some(MemoryScope::session("session-abc")),
4064                        ..Default::default()
4065                    },
4066                )?;
4067                assert_eq!(session_results.len(), 1);
4068                assert_eq!(session_results[0].id, session_memory.id);
4069
4070                // Test: Filter by global scope should return only global memory
4071                let global_results = list_memories(
4072                    conn,
4073                    &ListOptions {
4074                        scope: Some(MemoryScope::Global),
4075                        ..Default::default()
4076                    },
4077                )?;
4078                assert_eq!(global_results.len(), 1);
4079                assert_eq!(global_results[0].id, global_memory.id);
4080
4081                // Test: Verify scope is correctly stored and retrieved
4082                let retrieved = get_memory(conn, user1_memory.id)?;
4083                assert_eq!(retrieved.scope, MemoryScope::user("user-1"));
4084
4085                Ok(())
4086            })
4087            .unwrap();
4088    }
4089
4090    #[test]
4091    fn test_memory_scope_can_access() {
4092        use crate::types::MemoryScope;
4093
4094        // Global can access everything
4095        assert!(MemoryScope::Global.can_access(&MemoryScope::user("user-1")));
4096        assert!(MemoryScope::Global.can_access(&MemoryScope::session("session-1")));
4097        assert!(MemoryScope::Global.can_access(&MemoryScope::agent("agent-1")));
4098        assert!(MemoryScope::Global.can_access(&MemoryScope::Global));
4099
4100        // Same scope can access
4101        assert!(MemoryScope::user("user-1").can_access(&MemoryScope::user("user-1")));
4102        assert!(MemoryScope::session("s1").can_access(&MemoryScope::session("s1")));
4103        assert!(MemoryScope::agent("a1").can_access(&MemoryScope::agent("a1")));
4104
4105        // Different scope IDs cannot access each other
4106        assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::user("user-2")));
4107        assert!(!MemoryScope::session("s1").can_access(&MemoryScope::session("s2")));
4108        assert!(!MemoryScope::agent("a1").can_access(&MemoryScope::agent("a2")));
4109
4110        // Different scope types cannot access each other
4111        assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::session("s1")));
4112        assert!(!MemoryScope::session("s1").can_access(&MemoryScope::agent("a1")));
4113
4114        // Anyone can access global memories
4115        assert!(MemoryScope::user("user-1").can_access(&MemoryScope::Global));
4116        assert!(MemoryScope::session("s1").can_access(&MemoryScope::Global));
4117        assert!(MemoryScope::agent("a1").can_access(&MemoryScope::Global));
4118    }
4119
4120    #[test]
4121    fn test_memory_ttl_creation() {
4122        let storage = Storage::open_in_memory().unwrap();
4123
4124        storage
4125            .with_transaction(|conn| {
4126                // Create daily memory with TTL of 1 hour
4127                let memory = create_memory(
4128                    conn,
4129                    &CreateMemoryInput {
4130                        content: "Temporary memory".to_string(),
4131                        memory_type: MemoryType::Note,
4132                        tags: vec![],
4133                        metadata: HashMap::new(),
4134                        importance: None,
4135                        scope: Default::default(),
4136                        workspace: None,
4137                        tier: MemoryTier::Daily, // Daily tier for expiring memories
4138                        defer_embedding: true,
4139                        ttl_seconds: Some(3600), // 1 hour
4140                        dedup_mode: Default::default(),
4141                        dedup_threshold: None,
4142                        event_time: None,
4143                        event_duration_seconds: None,
4144                        trigger_pattern: None,
4145                        summary_of_id: None,
4146                            media_url: None,
4147                    },
4148                )?;
4149
4150                // Verify expires_at is set and tier is daily
4151                assert!(memory.expires_at.is_some());
4152                assert_eq!(memory.tier, MemoryTier::Daily);
4153                let expires_at = memory.expires_at.unwrap();
4154                let now = Utc::now();
4155
4156                // Should expire approximately 1 hour from now (within 5 seconds tolerance)
4157                let diff = (expires_at - now).num_seconds();
4158                assert!(
4159                    (3595..=3605).contains(&diff),
4160                    "Expected ~3600 seconds, got {}",
4161                    diff
4162                );
4163
4164                // Create memory without TTL
4165                let permanent = create_memory(
4166                    conn,
4167                    &CreateMemoryInput {
4168                        content: "Permanent memory".to_string(),
4169                        memory_type: MemoryType::Note,
4170                        tags: vec![],
4171                        metadata: HashMap::new(),
4172                        importance: None,
4173                        scope: Default::default(),
4174                        workspace: None,
4175                        tier: Default::default(),
4176                        defer_embedding: true,
4177                        ttl_seconds: None,
4178                        dedup_mode: Default::default(),
4179                        dedup_threshold: None,
4180                        event_time: None,
4181                        event_duration_seconds: None,
4182                        trigger_pattern: None,
4183                        summary_of_id: None,
4184                            media_url: None,
4185                    },
4186                )?;
4187
4188                // Verify expires_at is None for permanent memory
4189                assert!(permanent.expires_at.is_none());
4190
4191                Ok(())
4192            })
4193            .unwrap();
4194    }
4195
4196    #[test]
4197    fn test_expired_memories_excluded_from_queries() {
4198        let storage = Storage::open_in_memory().unwrap();
4199
4200        storage
4201            .with_transaction(|conn| {
4202                // Create a daily memory with TTL (will expire)
4203                let memory1 = create_memory(
4204                    conn,
4205                    &CreateMemoryInput {
4206                        content: "Memory to expire".to_string(),
4207                        memory_type: MemoryType::Note,
4208                        tags: vec!["test".to_string()],
4209                        metadata: HashMap::new(),
4210                        importance: None,
4211                        scope: Default::default(),
4212                        workspace: None,
4213                        tier: MemoryTier::Daily, // Daily tier for expiring memories
4214                        defer_embedding: true,
4215                        ttl_seconds: Some(3600), // 1 hour TTL
4216                        dedup_mode: Default::default(),
4217                        dedup_threshold: None,
4218                        event_time: None,
4219                        event_duration_seconds: None,
4220                        trigger_pattern: None,
4221                        summary_of_id: None,
4222                            media_url: None,
4223                    },
4224                )?;
4225
4226                // Create a permanent memory
4227                let active = create_memory(
4228                    conn,
4229                    &CreateMemoryInput {
4230                        content: "Active memory".to_string(),
4231                        memory_type: MemoryType::Note,
4232                        tags: vec!["test".to_string()],
4233                        metadata: HashMap::new(),
4234                        importance: None,
4235                        scope: Default::default(),
4236                        workspace: None,
4237                        tier: Default::default(),
4238                        defer_embedding: true,
4239                        ttl_seconds: None,
4240                        dedup_mode: Default::default(),
4241                        dedup_threshold: None,
4242                        event_time: None,
4243                        event_duration_seconds: None,
4244                        trigger_pattern: None,
4245                        summary_of_id: None,
4246                            media_url: None,
4247                    },
4248                )?;
4249
4250                // Both should be visible initially
4251                let results = list_memories(conn, &ListOptions::default())?;
4252                assert_eq!(results.len(), 2);
4253
4254                // Manually expire memory1 by setting expires_at to the past
4255                let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4256                conn.execute(
4257                    "UPDATE memories SET expires_at = ? WHERE id = ?",
4258                    params![past, memory1.id],
4259                )?;
4260
4261                // List should only return active memory now
4262                let results = list_memories(conn, &ListOptions::default())?;
4263                assert_eq!(results.len(), 1);
4264                assert_eq!(results[0].id, active.id);
4265
4266                // Direct get_memory should fail for expired
4267                let get_result = get_memory(conn, memory1.id);
4268                assert!(get_result.is_err());
4269
4270                // Direct get_memory should succeed for active
4271                let get_result = get_memory(conn, active.id);
4272                assert!(get_result.is_ok());
4273
4274                Ok(())
4275            })
4276            .unwrap();
4277    }
4278
4279    #[test]
4280    fn test_set_memory_expiration() {
4281        let storage = Storage::open_in_memory().unwrap();
4282
4283        storage
4284            .with_transaction(|conn| {
4285                // Create a permanent memory
4286                let memory = create_memory(
4287                    conn,
4288                    &CreateMemoryInput {
4289                        content: "Initially permanent".to_string(),
4290                        memory_type: MemoryType::Note,
4291                        tags: vec![],
4292                        metadata: HashMap::new(),
4293                        importance: None,
4294                        scope: Default::default(),
4295                        workspace: None,
4296                        tier: Default::default(),
4297                        defer_embedding: true,
4298                        ttl_seconds: None,
4299                        dedup_mode: Default::default(),
4300                        dedup_threshold: None,
4301                        event_time: None,
4302                        event_duration_seconds: None,
4303                        trigger_pattern: None,
4304                        summary_of_id: None,
4305                            media_url: None,
4306                    },
4307                )?;
4308
4309                assert!(memory.expires_at.is_none());
4310
4311                // Set expiration to 30 minutes
4312                let updated = set_memory_expiration(conn, memory.id, Some(1800))?;
4313                assert!(updated.expires_at.is_some());
4314
4315                // Remove expiration (make permanent again) - use Some(0) to clear
4316                let permanent_again = set_memory_expiration(conn, memory.id, Some(0))?;
4317                assert!(permanent_again.expires_at.is_none());
4318
4319                Ok(())
4320            })
4321            .unwrap();
4322    }
4323
4324    #[test]
4325    fn test_cleanup_expired_memories() {
4326        let storage = Storage::open_in_memory().unwrap();
4327
4328        storage
4329            .with_transaction(|conn| {
4330                // Create 3 daily memories that we'll expire manually
4331                let mut expired_ids = vec![];
4332                for i in 0..3 {
4333                    let mem = create_memory(
4334                        conn,
4335                        &CreateMemoryInput {
4336                            content: format!("To expire {}", i),
4337                            memory_type: MemoryType::Note,
4338                            tags: vec![],
4339                            metadata: HashMap::new(),
4340                            importance: None,
4341                            scope: Default::default(),
4342                            workspace: None,
4343                            tier: MemoryTier::Daily, // Daily tier for expiring memories
4344                            defer_embedding: true,
4345                            ttl_seconds: Some(3600), // 1 hour TTL
4346                            dedup_mode: Default::default(),
4347                            dedup_threshold: None,
4348                            event_time: None,
4349                            event_duration_seconds: None,
4350                            trigger_pattern: None,
4351                            summary_of_id: None,
4352                                media_url: None,
4353                        },
4354                    )?;
4355                    expired_ids.push(mem.id);
4356                }
4357
4358                // Create 2 active memories (permanent)
4359                for i in 0..2 {
4360                    create_memory(
4361                        conn,
4362                        &CreateMemoryInput {
4363                            content: format!("Active {}", i),
4364                            memory_type: MemoryType::Note,
4365                            tags: vec![],
4366                            metadata: HashMap::new(),
4367                            importance: None,
4368                            scope: Default::default(),
4369                            workspace: None,
4370                            tier: Default::default(),
4371                            defer_embedding: true,
4372                            ttl_seconds: None,
4373                            dedup_mode: Default::default(),
4374                            dedup_threshold: None,
4375                            event_time: None,
4376                            event_duration_seconds: None,
4377                            trigger_pattern: None,
4378                            summary_of_id: None,
4379                                media_url: None,
4380                        },
4381                    )?;
4382                }
4383
4384                // All 5 should be visible initially
4385                let results = list_memories(conn, &ListOptions::default())?;
4386                assert_eq!(results.len(), 5);
4387
4388                // Manually expire the first 3 memories
4389                let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4390                for id in &expired_ids {
4391                    conn.execute(
4392                        "UPDATE memories SET expires_at = ? WHERE id = ?",
4393                        params![past, id],
4394                    )?;
4395                }
4396
4397                // Count expired
4398                let expired_count = count_expired_memories(conn)?;
4399                assert_eq!(expired_count, 3);
4400
4401                // Cleanup should delete 3
4402                let deleted = cleanup_expired_memories(conn)?;
4403                assert_eq!(deleted, 3);
4404
4405                // Verify only 2 remain
4406                let remaining = list_memories(conn, &ListOptions::default())?;
4407                assert_eq!(remaining.len(), 2);
4408
4409                // No more expired
4410                let expired_count = count_expired_memories(conn)?;
4411                assert_eq!(expired_count, 0);
4412
4413                Ok(())
4414            })
4415            .unwrap();
4416    }
4417
4418    // ========== Deduplication Tests (RML-931) ==========
4419
4420    #[test]
4421    fn test_content_hash_computation() {
4422        // Test that content hash is consistent and normalized
4423        let hash1 = compute_content_hash("Hello World");
4424        let hash2 = compute_content_hash("hello world"); // Different case
4425        let hash3 = compute_content_hash("  hello   world  "); // Extra whitespace
4426        let hash4 = compute_content_hash("Hello World!"); // Different content
4427
4428        // Same normalized content should produce same hash
4429        assert_eq!(hash1, hash2);
4430        assert_eq!(hash2, hash3);
4431
4432        // Different content should produce different hash
4433        assert_ne!(hash1, hash4);
4434
4435        // Hash should be prefixed with algorithm
4436        assert!(hash1.starts_with("sha256:"));
4437    }
4438
4439    #[test]
4440    fn test_dedup_mode_reject() {
4441        use crate::types::DedupMode;
4442
4443        let storage = Storage::open_in_memory().unwrap();
4444
4445        storage
4446            .with_transaction(|conn| {
4447                // Create first memory
4448                let _memory1 = create_memory(
4449                    conn,
4450                    &CreateMemoryInput {
4451                        content: "Unique content for testing".to_string(),
4452                        memory_type: MemoryType::Note,
4453                        tags: vec![],
4454                        metadata: HashMap::new(),
4455                        importance: None,
4456                        scope: Default::default(),
4457                        workspace: None,
4458                        tier: Default::default(),
4459                        defer_embedding: true,
4460                        ttl_seconds: None,
4461                        dedup_mode: DedupMode::Allow, // First one allows
4462                        dedup_threshold: None,
4463                        event_time: None,
4464                        event_duration_seconds: None,
4465                        trigger_pattern: None,
4466                        summary_of_id: None,
4467                            media_url: None,
4468                    },
4469                )?;
4470
4471                // Try to create duplicate with reject mode
4472                let result = create_memory(
4473                    conn,
4474                    &CreateMemoryInput {
4475                        content: "Unique content for testing".to_string(), // Same content
4476                        memory_type: MemoryType::Note,
4477                        tags: vec!["new-tag".to_string()],
4478                        metadata: HashMap::new(),
4479                        importance: None,
4480                        scope: Default::default(),
4481                        workspace: None,
4482                        tier: Default::default(),
4483                        defer_embedding: true,
4484                        ttl_seconds: None,
4485                        dedup_mode: DedupMode::Reject,
4486                        dedup_threshold: None,
4487                        event_time: None,
4488                        event_duration_seconds: None,
4489                        trigger_pattern: None,
4490                        summary_of_id: None,
4491                            media_url: None,
4492                    },
4493                );
4494
4495                // Should fail with Duplicate error
4496                assert!(result.is_err());
4497                let err = result.unwrap_err();
4498                assert!(matches!(err, crate::error::EngramError::Duplicate { .. }));
4499
4500                Ok(())
4501            })
4502            .unwrap();
4503    }
4504
4505    #[test]
4506    fn test_dedup_mode_skip() {
4507        use crate::types::DedupMode;
4508
4509        let storage = Storage::open_in_memory().unwrap();
4510
4511        storage
4512            .with_transaction(|conn| {
4513                // Create first memory
4514                let memory1 = create_memory(
4515                    conn,
4516                    &CreateMemoryInput {
4517                        content: "Skip test content".to_string(),
4518                        memory_type: MemoryType::Note,
4519                        tags: vec!["original".to_string()],
4520                        metadata: HashMap::new(),
4521                        importance: Some(0.5),
4522                        scope: Default::default(),
4523                        workspace: None,
4524                        tier: Default::default(),
4525                        defer_embedding: true,
4526                        ttl_seconds: None,
4527                        dedup_mode: DedupMode::Allow,
4528                        dedup_threshold: None,
4529                        event_time: None,
4530                        event_duration_seconds: None,
4531                        trigger_pattern: None,
4532                        summary_of_id: None,
4533                            media_url: None,
4534                    },
4535                )?;
4536
4537                // Try to create duplicate with skip mode
4538                let memory2 = create_memory(
4539                    conn,
4540                    &CreateMemoryInput {
4541                        content: "Skip test content".to_string(), // Same content
4542                        memory_type: MemoryType::Note,
4543                        tags: vec!["new-tag".to_string()], // Different tags
4544                        metadata: HashMap::new(),
4545                        importance: Some(0.9), // Different importance
4546                        scope: Default::default(),
4547                        workspace: None,
4548                        tier: Default::default(),
4549                        defer_embedding: true,
4550                        ttl_seconds: None,
4551                        dedup_mode: DedupMode::Skip,
4552                        dedup_threshold: None,
4553                        event_time: None,
4554                        event_duration_seconds: None,
4555                        trigger_pattern: None,
4556                        summary_of_id: None,
4557                            media_url: None,
4558                    },
4559                )?;
4560
4561                // Should return existing memory unchanged
4562                assert_eq!(memory1.id, memory2.id);
4563                assert_eq!(memory2.tags, vec!["original".to_string()]); // Original tags
4564                assert!((memory2.importance - 0.5).abs() < 0.01); // Original importance
4565
4566                // Only one memory should exist
4567                let all = list_memories(conn, &ListOptions::default())?;
4568                assert_eq!(all.len(), 1);
4569
4570                Ok(())
4571            })
4572            .unwrap();
4573    }
4574
4575    #[test]
4576    fn test_dedup_mode_merge() {
4577        use crate::types::DedupMode;
4578
4579        let storage = Storage::open_in_memory().unwrap();
4580
4581        storage
4582            .with_transaction(|conn| {
4583                // Create first memory with some tags and metadata
4584                let memory1 = create_memory(
4585                    conn,
4586                    &CreateMemoryInput {
4587                        content: "Merge test content".to_string(),
4588                        memory_type: MemoryType::Note,
4589                        tags: vec!["tag1".to_string(), "tag2".to_string()],
4590                        metadata: {
4591                            let mut m = HashMap::new();
4592                            m.insert("key1".to_string(), serde_json::json!("value1"));
4593                            m
4594                        },
4595                        importance: Some(0.5),
4596                        scope: Default::default(),
4597                        workspace: None,
4598                        tier: Default::default(),
4599                        defer_embedding: true,
4600                        ttl_seconds: None,
4601                        dedup_mode: DedupMode::Allow,
4602                        dedup_threshold: None,
4603                        event_time: None,
4604                        event_duration_seconds: None,
4605                        trigger_pattern: None,
4606                        summary_of_id: None,
4607                            media_url: None,
4608                    },
4609                )?;
4610
4611                // Try to create duplicate with merge mode
4612                let memory2 = create_memory(
4613                    conn,
4614                    &CreateMemoryInput {
4615                        content: "Merge test content".to_string(), // Same content
4616                        memory_type: MemoryType::Note,
4617                        tags: vec!["tag2".to_string(), "tag3".to_string()], // Overlapping + new
4618                        metadata: {
4619                            let mut m = HashMap::new();
4620                            m.insert("key2".to_string(), serde_json::json!("value2"));
4621                            m
4622                        },
4623                        importance: Some(0.8), // Higher importance
4624                        scope: Default::default(),
4625                        workspace: None,
4626                        tier: Default::default(),
4627                        defer_embedding: true,
4628                        ttl_seconds: None,
4629                        dedup_mode: DedupMode::Merge,
4630                        dedup_threshold: None,
4631                        event_time: None,
4632                        event_duration_seconds: None,
4633                        trigger_pattern: None,
4634                        summary_of_id: None,
4635                            media_url: None,
4636                    },
4637                )?;
4638
4639                // Should return same memory ID
4640                assert_eq!(memory1.id, memory2.id);
4641
4642                // Tags should be merged (no duplicates)
4643                assert!(memory2.tags.contains(&"tag1".to_string()));
4644                assert!(memory2.tags.contains(&"tag2".to_string()));
4645                assert!(memory2.tags.contains(&"tag3".to_string()));
4646                assert_eq!(memory2.tags.len(), 3);
4647
4648                // Metadata should be merged
4649                assert!(memory2.metadata.contains_key("key1"));
4650                assert!(memory2.metadata.contains_key("key2"));
4651
4652                // Only one memory should exist
4653                let all = list_memories(conn, &ListOptions::default())?;
4654                assert_eq!(all.len(), 1);
4655
4656                Ok(())
4657            })
4658            .unwrap();
4659    }
4660
4661    #[test]
4662    fn test_dedup_mode_allow() {
4663        use crate::types::DedupMode;
4664
4665        let storage = Storage::open_in_memory().unwrap();
4666
4667        storage
4668            .with_transaction(|conn| {
4669                // Create first memory
4670                let memory1 = create_memory(
4671                    conn,
4672                    &CreateMemoryInput {
4673                        content: "Allow duplicates content".to_string(),
4674                        memory_type: MemoryType::Note,
4675                        tags: vec![],
4676                        metadata: HashMap::new(),
4677                        importance: None,
4678                        scope: Default::default(),
4679                        workspace: None,
4680                        tier: Default::default(),
4681                        defer_embedding: true,
4682                        ttl_seconds: None,
4683                        dedup_mode: DedupMode::Allow,
4684                        dedup_threshold: None,
4685                        event_time: None,
4686                        event_duration_seconds: None,
4687                        trigger_pattern: None,
4688                        summary_of_id: None,
4689                            media_url: None,
4690                    },
4691                )?;
4692
4693                // Create duplicate with allow mode (default)
4694                let memory2 = create_memory(
4695                    conn,
4696                    &CreateMemoryInput {
4697                        content: "Allow duplicates content".to_string(), // Same content
4698                        memory_type: MemoryType::Note,
4699                        tags: vec![],
4700                        metadata: HashMap::new(),
4701                        importance: None,
4702                        scope: Default::default(),
4703                        workspace: None,
4704                        tier: Default::default(),
4705                        defer_embedding: true,
4706                        ttl_seconds: None,
4707                        dedup_mode: DedupMode::Allow,
4708                        dedup_threshold: None,
4709                        event_time: None,
4710                        event_duration_seconds: None,
4711                        trigger_pattern: None,
4712                        summary_of_id: None,
4713                            media_url: None,
4714                    },
4715                )?;
4716
4717                // Should create separate memory
4718                assert_ne!(memory1.id, memory2.id);
4719
4720                // Both memories should exist
4721                let all = list_memories(conn, &ListOptions::default())?;
4722                assert_eq!(all.len(), 2);
4723
4724                // Both should have same content hash
4725                assert_eq!(memory1.content_hash, memory2.content_hash);
4726
4727                Ok(())
4728            })
4729            .unwrap();
4730    }
4731
4732    #[test]
4733    fn test_find_duplicates_exact_hash() {
4734        use crate::types::DedupMode;
4735
4736        let storage = Storage::open_in_memory().unwrap();
4737
4738        storage
4739            .with_transaction(|conn| {
4740                // Create two memories with same content (exact hash duplicates)
4741                let _memory1 = create_memory(
4742                    conn,
4743                    &CreateMemoryInput {
4744                        content: "Duplicate content".to_string(),
4745                        memory_type: MemoryType::Note,
4746                        tags: vec!["first".to_string()],
4747                        metadata: HashMap::new(),
4748                        importance: None,
4749                        scope: Default::default(),
4750                        workspace: None,
4751                        tier: Default::default(),
4752                        defer_embedding: true,
4753                        ttl_seconds: None,
4754                        dedup_mode: DedupMode::Allow,
4755                        dedup_threshold: None,
4756                        event_time: None,
4757                        event_duration_seconds: None,
4758                        trigger_pattern: None,
4759                        summary_of_id: None,
4760                            media_url: None,
4761                    },
4762                )?;
4763
4764                let _memory2 = create_memory(
4765                    conn,
4766                    &CreateMemoryInput {
4767                        content: "Duplicate content".to_string(), // Same content
4768                        memory_type: MemoryType::Note,
4769                        tags: vec!["second".to_string()],
4770                        metadata: HashMap::new(),
4771                        importance: None,
4772                        scope: Default::default(),
4773                        workspace: None,
4774                        tier: Default::default(),
4775                        defer_embedding: true,
4776                        ttl_seconds: None,
4777                        dedup_mode: DedupMode::Allow,
4778                        dedup_threshold: None,
4779                        event_time: None,
4780                        event_duration_seconds: None,
4781                        trigger_pattern: None,
4782                        summary_of_id: None,
4783                            media_url: None,
4784                    },
4785                )?;
4786
4787                // Create a unique memory (not a duplicate)
4788                let _memory3 = create_memory(
4789                    conn,
4790                    &CreateMemoryInput {
4791                        content: "Unique content".to_string(),
4792                        memory_type: MemoryType::Note,
4793                        tags: vec![],
4794                        metadata: HashMap::new(),
4795                        importance: None,
4796                        scope: Default::default(),
4797                        workspace: None,
4798                        tier: Default::default(),
4799                        defer_embedding: true,
4800                        ttl_seconds: None,
4801                        dedup_mode: DedupMode::Allow,
4802                        dedup_threshold: None,
4803                        event_time: None,
4804                        event_duration_seconds: None,
4805                        trigger_pattern: None,
4806                        summary_of_id: None,
4807                            media_url: None,
4808                    },
4809                )?;
4810
4811                // Find duplicates
4812                let duplicates = find_duplicates(conn, 0.9)?;
4813
4814                // Should find one duplicate pair
4815                assert_eq!(duplicates.len(), 1);
4816
4817                // Should be exact hash match
4818                assert_eq!(duplicates[0].match_type, DuplicateMatchType::ExactHash);
4819                assert!((duplicates[0].similarity_score - 1.0).abs() < 0.01);
4820
4821                Ok(())
4822            })
4823            .unwrap();
4824    }
4825
4826    #[test]
4827    fn test_content_hash_stored_on_create() {
4828        let storage = Storage::open_in_memory().unwrap();
4829
4830        storage
4831            .with_transaction(|conn| {
4832                let memory = create_memory(
4833                    conn,
4834                    &CreateMemoryInput {
4835                        content: "Test content for hash".to_string(),
4836                        memory_type: MemoryType::Note,
4837                        tags: vec![],
4838                        metadata: HashMap::new(),
4839                        importance: None,
4840                        scope: Default::default(),
4841                        workspace: None,
4842                        tier: Default::default(),
4843                        defer_embedding: true,
4844                        ttl_seconds: None,
4845                        dedup_mode: Default::default(),
4846                        dedup_threshold: None,
4847                        event_time: None,
4848                        event_duration_seconds: None,
4849                        trigger_pattern: None,
4850                        summary_of_id: None,
4851                            media_url: None,
4852                    },
4853                )?;
4854
4855                // Content hash should be set
4856                assert!(memory.content_hash.is_some());
4857                let hash = memory.content_hash.as_ref().unwrap();
4858                assert!(hash.starts_with("sha256:"));
4859
4860                // Fetch from DB and verify hash is persisted
4861                let fetched = get_memory(conn, memory.id)?;
4862                assert_eq!(fetched.content_hash, memory.content_hash);
4863
4864                Ok(())
4865            })
4866            .unwrap();
4867    }
4868
4869    #[test]
4870    fn test_update_memory_recalculates_hash() {
4871        let storage = Storage::open_in_memory().unwrap();
4872
4873        storage
4874            .with_transaction(|conn| {
4875                // Create a memory
4876                let memory = create_memory(
4877                    conn,
4878                    &CreateMemoryInput {
4879                        content: "Original content".to_string(),
4880                        memory_type: MemoryType::Note,
4881                        tags: vec![],
4882                        metadata: HashMap::new(),
4883                        importance: None,
4884                        scope: Default::default(),
4885                        workspace: None,
4886                        tier: Default::default(),
4887                        defer_embedding: true,
4888                        ttl_seconds: None,
4889                        dedup_mode: Default::default(),
4890                        dedup_threshold: None,
4891                        event_time: None,
4892                        event_duration_seconds: None,
4893                        trigger_pattern: None,
4894                        summary_of_id: None,
4895                            media_url: None,
4896                    },
4897                )?;
4898
4899                let original_hash = memory.content_hash.clone();
4900
4901                // Update the content
4902                let updated = update_memory(
4903                    conn,
4904                    memory.id,
4905                    &UpdateMemoryInput {
4906                        content: Some("Updated content".to_string()),
4907                        memory_type: None,
4908                        tags: None,
4909                        metadata: None,
4910                        importance: None,
4911                        scope: None,
4912                        ttl_seconds: None,
4913                        event_time: None,
4914                        trigger_pattern: None,
4915                        media_url: None,
4916                    },
4917                )?;
4918
4919                // Hash should be different
4920                assert_ne!(updated.content_hash, original_hash);
4921                assert!(updated.content_hash.is_some());
4922
4923                // Verify against expected hash
4924                let expected_hash = compute_content_hash("Updated content");
4925                assert_eq!(updated.content_hash.as_ref().unwrap(), &expected_hash);
4926
4927                Ok(())
4928            })
4929            .unwrap();
4930    }
4931
4932    #[test]
4933    fn test_dedup_scope_isolation() {
4934        use crate::types::{DedupMode, MemoryScope};
4935
4936        let storage = Storage::open_in_memory().unwrap();
4937
4938        storage
4939            .with_transaction(|conn| {
4940                // Create memory in user-1 scope
4941                let _user1_memory = create_memory(
4942                    conn,
4943                    &CreateMemoryInput {
4944                        content: "Shared content".to_string(),
4945                        memory_type: MemoryType::Note,
4946                        tags: vec!["user1".to_string()],
4947                        metadata: HashMap::new(),
4948                        importance: None,
4949                        scope: MemoryScope::user("user-1"),
4950                        workspace: None,
4951                        tier: Default::default(),
4952                        defer_embedding: true,
4953                        ttl_seconds: None,
4954                        dedup_mode: DedupMode::Allow,
4955                        dedup_threshold: None,
4956                        event_time: None,
4957                        event_duration_seconds: None,
4958                        trigger_pattern: None,
4959                        summary_of_id: None,
4960                            media_url: None,
4961                    },
4962                )?;
4963
4964                // Create same content in user-2 scope with Reject mode
4965                // Should succeed because scopes are different
4966                let user2_result = create_memory(
4967                    conn,
4968                    &CreateMemoryInput {
4969                        content: "Shared content".to_string(), // Same content!
4970                        memory_type: MemoryType::Note,
4971                        tags: vec!["user2".to_string()],
4972                        metadata: HashMap::new(),
4973                        importance: None,
4974                        scope: MemoryScope::user("user-2"), // Different scope
4975                        workspace: None,
4976                        tier: Default::default(),
4977                        defer_embedding: true,
4978                        ttl_seconds: None,
4979                        dedup_mode: DedupMode::Reject, // Should not reject - different scope
4980                        dedup_threshold: None,
4981                        event_time: None,
4982                        event_duration_seconds: None,
4983                        trigger_pattern: None,
4984                        summary_of_id: None,
4985                            media_url: None,
4986                    },
4987                );
4988
4989                // Should succeed - different scopes are not considered duplicates
4990                assert!(user2_result.is_ok());
4991                let _user2_memory = user2_result.unwrap();
4992
4993                // Now try to create duplicate in same scope (user-2)
4994                let duplicate_result = create_memory(
4995                    conn,
4996                    &CreateMemoryInput {
4997                        content: "Shared content".to_string(), // Same content
4998                        memory_type: MemoryType::Note,
4999                        tags: vec![],
5000                        metadata: HashMap::new(),
5001                        importance: None,
5002                        scope: MemoryScope::user("user-2"), // Same scope as user2_memory
5003                        workspace: None,
5004                        tier: Default::default(),
5005                        defer_embedding: true,
5006                        ttl_seconds: None,
5007                        dedup_mode: DedupMode::Reject, // Should reject - same scope
5008                        dedup_threshold: None,
5009                        event_time: None,
5010                        event_duration_seconds: None,
5011                        trigger_pattern: None,
5012                        summary_of_id: None,
5013                            media_url: None,
5014                    },
5015                );
5016
5017                // Should fail - same scope with same content
5018                assert!(duplicate_result.is_err());
5019                assert!(matches!(
5020                    duplicate_result.unwrap_err(),
5021                    crate::error::EngramError::Duplicate { .. }
5022                ));
5023
5024                // Verify we have exactly 2 memories (one per user)
5025                let all = list_memories(conn, &ListOptions::default())?;
5026                assert_eq!(all.len(), 2);
5027
5028                Ok(())
5029            })
5030            .unwrap();
5031    }
5032
5033    #[test]
5034    fn test_find_similar_by_embedding() {
5035        // Helper to store embedding (convert f32 vec to bytes for SQLite)
5036        fn store_test_embedding(
5037            conn: &Connection,
5038            memory_id: i64,
5039            embedding: &[f32],
5040        ) -> crate::error::Result<()> {
5041            let bytes: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
5042            conn.execute(
5043                "INSERT INTO embeddings (memory_id, embedding, model, dimensions, created_at)
5044                 VALUES (?, ?, ?, ?, datetime('now'))",
5045                params![memory_id, bytes, "test", embedding.len() as i32],
5046            )?;
5047            // Mark memory as having embedding
5048            conn.execute(
5049                "UPDATE memories SET has_embedding = 1 WHERE id = ?",
5050                params![memory_id],
5051            )?;
5052            Ok(())
5053        }
5054
5055        let storage = Storage::open_in_memory().unwrap();
5056        storage
5057            .with_transaction(|conn| {
5058                // Create a memory with an embedding
5059                let memory1 = create_memory(
5060                    conn,
5061                    &CreateMemoryInput {
5062                        content: "Rust is a systems programming language".to_string(),
5063                        memory_type: MemoryType::Note,
5064                        tags: vec!["rust".to_string()],
5065                        metadata: std::collections::HashMap::new(),
5066                        importance: None,
5067                        scope: MemoryScope::Global,
5068                        workspace: None,
5069                        tier: Default::default(),
5070                        defer_embedding: false,
5071                        ttl_seconds: None,
5072                        dedup_mode: DedupMode::Allow,
5073                        dedup_threshold: None,
5074                        event_time: None,
5075                        event_duration_seconds: None,
5076                        trigger_pattern: None,
5077                        summary_of_id: None,
5078                            media_url: None,
5079                    },
5080                )?;
5081
5082                // Store an embedding for it (simple test embedding)
5083                let embedding1 = vec![0.8, 0.4, 0.2, 0.1]; // Normalized-ish vector
5084                store_test_embedding(conn, memory1.id, &embedding1)?;
5085
5086                // Create another memory with different embedding
5087                let memory2 = create_memory(
5088                    conn,
5089                    &CreateMemoryInput {
5090                        content: "Python is a scripting language".to_string(),
5091                        memory_type: MemoryType::Note,
5092                        tags: vec!["python".to_string()],
5093                        metadata: std::collections::HashMap::new(),
5094                        importance: None,
5095                        scope: MemoryScope::Global,
5096                        workspace: None,
5097                        tier: Default::default(),
5098                        defer_embedding: false,
5099                        ttl_seconds: None,
5100                        dedup_mode: DedupMode::Allow,
5101                        dedup_threshold: None,
5102                        event_time: None,
5103                        event_duration_seconds: None,
5104                        trigger_pattern: None,
5105                        summary_of_id: None,
5106                            media_url: None,
5107                    },
5108                )?;
5109
5110                // Store a very different embedding
5111                let embedding2 = vec![0.1, 0.2, 0.8, 0.4]; // Different direction
5112                store_test_embedding(conn, memory2.id, &embedding2)?;
5113
5114                // Test 1: Query with embedding similar to memory1
5115                let query_similar_to_1 = vec![0.79, 0.41, 0.21, 0.11]; // Very similar to embedding1
5116                let result = find_similar_by_embedding(
5117                    conn,
5118                    &query_similar_to_1,
5119                    &MemoryScope::Global,
5120                    None, // default workspace
5121                    0.95, // High threshold
5122                )?;
5123                assert!(result.is_some());
5124                let (found_memory, similarity) = result.unwrap();
5125                assert_eq!(found_memory.id, memory1.id);
5126                assert!(similarity > 0.95);
5127
5128                // Test 2: Query with low threshold should still find memory1
5129                let result_low_threshold = find_similar_by_embedding(
5130                    conn,
5131                    &query_similar_to_1,
5132                    &MemoryScope::Global,
5133                    None,
5134                    0.5,
5135                )?;
5136                assert!(result_low_threshold.is_some());
5137
5138                // Test 3: Query with embedding not similar to anything (threshold too high)
5139                let query_orthogonal = vec![0.0, 0.0, 0.0, 1.0]; // Different direction
5140                let result_no_match = find_similar_by_embedding(
5141                    conn,
5142                    &query_orthogonal,
5143                    &MemoryScope::Global,
5144                    None,
5145                    0.99, // Very high threshold
5146                )?;
5147                assert!(result_no_match.is_none());
5148
5149                // Test 4: Different scope should not find anything
5150                let result_wrong_scope = find_similar_by_embedding(
5151                    conn,
5152                    &query_similar_to_1,
5153                    &MemoryScope::User {
5154                        user_id: "other-user".to_string(),
5155                    },
5156                    None,
5157                    0.5,
5158                )?;
5159                assert!(result_wrong_scope.is_none());
5160
5161                Ok(())
5162            })
5163            .unwrap();
5164    }
5165
5166    // ── T1: Multimodal MemoryType tests ──────────────────────────────────────
5167
5168    fn open_test_storage() -> crate::storage::Storage {
5169        crate::storage::Storage::open_in_memory().expect("in-memory storage")
5170    }
5171
5172    #[test]
5173    fn test_create_image_memory_with_media_url() {
5174        let storage = open_test_storage();
5175        let memory = storage
5176            .with_transaction(|conn| {
5177                create_memory(
5178                    conn,
5179                    &CreateMemoryInput {
5180                        content: "A screenshot of the dashboard".to_string(),
5181                        memory_type: MemoryType::Image,
5182                        media_url: Some("local:///tmp/dashboard.png".to_string()),
5183                        ..Default::default()
5184                    },
5185                )
5186            })
5187            .expect("create image memory");
5188        assert_eq!(memory.memory_type, MemoryType::Image);
5189        assert_eq!(
5190            memory.media_url.as_deref(),
5191            Some("local:///tmp/dashboard.png")
5192        );
5193    }
5194
5195    #[test]
5196    fn test_create_audio_memory_with_media_url() {
5197        let storage = open_test_storage();
5198        let memory = storage
5199            .with_transaction(|conn| {
5200                create_memory(
5201                    conn,
5202                    &CreateMemoryInput {
5203                        content: "Meeting recording transcript".to_string(),
5204                        memory_type: MemoryType::Audio,
5205                        media_url: Some("local:///tmp/meeting.mp3".to_string()),
5206                        ..Default::default()
5207                    },
5208                )
5209            })
5210            .expect("create audio memory");
5211        assert_eq!(memory.memory_type, MemoryType::Audio);
5212        assert_eq!(
5213            memory.media_url.as_deref(),
5214            Some("local:///tmp/meeting.mp3")
5215        );
5216    }
5217
5218    #[test]
5219    fn test_create_video_memory_with_media_url() {
5220        let storage = open_test_storage();
5221        let memory = storage
5222            .with_transaction(|conn| {
5223                create_memory(
5224                    conn,
5225                    &CreateMemoryInput {
5226                        content: "Keynote presentation video".to_string(),
5227                        memory_type: MemoryType::Video,
5228                        media_url: Some("https://cdn.example.com/keynote.mp4".to_string()),
5229                        ..Default::default()
5230                    },
5231                )
5232            })
5233            .expect("create video memory");
5234        assert_eq!(memory.memory_type, MemoryType::Video);
5235        assert_eq!(
5236            memory.media_url.as_deref(),
5237            Some("https://cdn.example.com/keynote.mp4")
5238        );
5239    }
5240
5241    #[test]
5242    fn test_get_memory_returns_media_url() {
5243        let storage = open_test_storage();
5244        let created = storage
5245            .with_transaction(|conn| {
5246                create_memory(
5247                    conn,
5248                    &CreateMemoryInput {
5249                        content: "Image with URL".to_string(),
5250                        memory_type: MemoryType::Image,
5251                        media_url: Some("local:///tmp/image.png".to_string()),
5252                        ..Default::default()
5253                    },
5254                )
5255            })
5256            .expect("create");
5257        let fetched = storage
5258            .with_connection(|conn| get_memory(conn, created.id))
5259            .expect("get");
5260        assert_eq!(fetched.media_url, created.media_url);
5261    }
5262
5263    #[test]
5264    fn test_create_image_memory_without_media_url() {
5265        let storage = open_test_storage();
5266        let memory = storage
5267            .with_transaction(|conn| {
5268                create_memory(
5269                    conn,
5270                    &CreateMemoryInput {
5271                        content: "Image described in text only".to_string(),
5272                        memory_type: MemoryType::Image,
5273                        media_url: None,
5274                        ..Default::default()
5275                    },
5276                )
5277            })
5278            .expect("create image memory without media_url");
5279        assert_eq!(memory.memory_type, MemoryType::Image);
5280        assert!(memory.media_url.is_none());
5281    }
5282
5283    #[test]
5284    fn test_update_memory_sets_media_url() {
5285        let storage = open_test_storage();
5286        let created = storage
5287            .with_transaction(|conn| {
5288                create_memory(
5289                    conn,
5290                    &CreateMemoryInput {
5291                        content: "Image memory".to_string(),
5292                        memory_type: MemoryType::Image,
5293                        ..Default::default()
5294                    },
5295                )
5296            })
5297            .expect("create");
5298        let updated = storage
5299            .with_transaction(|conn| {
5300                update_memory(
5301                    conn,
5302                    created.id,
5303                    &UpdateMemoryInput {
5304                        media_url: Some(Some("local:///tmp/updated.png".to_string())),
5305                        content: None,
5306                        memory_type: None,
5307                        tags: None,
5308                        metadata: None,
5309                        importance: None,
5310                        scope: None,
5311                        ttl_seconds: None,
5312                        event_time: None,
5313                        trigger_pattern: None,
5314                    },
5315                )
5316            })
5317            .expect("update");
5318        assert_eq!(
5319            updated.media_url.as_deref(),
5320            Some("local:///tmp/updated.png")
5321        );
5322    }
5323
5324    #[test]
5325    fn test_memory_type_is_multimodal() {
5326        assert!(MemoryType::Image.is_multimodal());
5327        assert!(MemoryType::Audio.is_multimodal());
5328        assert!(MemoryType::Video.is_multimodal());
5329        assert!(!MemoryType::Note.is_multimodal());
5330        assert!(!MemoryType::Episodic.is_multimodal());
5331    }
5332
5333    #[test]
5334    fn test_schema_migration_v34_idempotent() {
5335        use crate::storage::migrations::run_migrations;
5336        let conn =
5337            rusqlite::Connection::open_in_memory().expect("in-memory db");
5338        run_migrations(&conn).expect("run migrations");
5339        // Running again should be a no-op
5340        run_migrations(&conn).expect("idempotent second run");
5341        let version: i32 = conn
5342            .query_row(
5343                "SELECT COALESCE(MAX(version), 0) FROM schema_version",
5344                [],
5345                |row| row.get(0),
5346            )
5347            .expect("query version");
5348        assert_eq!(version, 34);
5349    }
5350}