Skip to main content

mnemo_core/storage/
duckdb.rs

1use std::path::Path;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4
5use crate::error::{Error, Result};
6use crate::model::acl::{Acl, Permission};
7use crate::model::agent_profile::AgentProfile;
8use crate::model::checkpoint::Checkpoint;
9use crate::model::delegation::{Delegation, DelegationScope};
10use crate::model::embedding_baseline::EmbeddingBaseline;
11use crate::model::event::AgentEvent;
12use crate::model::memory::MemoryRecord;
13use crate::model::relation::Relation;
14use crate::storage::{MemoryFilter, StorageBackend};
15use uuid::Uuid;
16
17pub struct DuckDbStorage {
18    conn: Arc<Mutex<duckdb::Connection>>,
19}
20
21impl DuckDbStorage {
22    pub fn open(path: &Path) -> Result<Self> {
23        let conn = duckdb::Connection::open(path)?;
24        super::migrations::run_migrations(&conn)?;
25        Ok(Self {
26            conn: Arc::new(Mutex::new(conn)),
27        })
28    }
29
30    pub fn open_in_memory() -> Result<Self> {
31        let conn = duckdb::Connection::open_in_memory()?;
32        super::migrations::run_migrations(&conn)?;
33        Ok(Self {
34            conn: Arc::new(Mutex::new(conn)),
35        })
36    }
37}
38
/// Encodes an optional embedding as a byte blob.
///
/// Each `f32` is written as its four little-endian bytes, in order.
/// `None` stays `None`; an empty vector becomes an empty blob.
fn serialize_embedding(embedding: &Option<Vec<f32>>) -> Option<Vec<u8>> {
    let values = embedding.as_ref()?;
    let mut bytes = Vec::with_capacity(values.len() * 4);
    for value in values {
        bytes.extend_from_slice(&value.to_le_bytes());
    }
    Some(bytes)
}
44
/// Decodes a byte blob back into an embedding.
///
/// The blob is read four bytes at a time as little-endian `f32` values.
/// Trailing bytes that do not fill a whole four-byte group are ignored.
/// `None` stays `None`.
fn deserialize_embedding(blob: Option<Vec<u8>>) -> Option<Vec<f32>> {
    let bytes = blob?;
    let mut values = Vec::with_capacity(bytes.len() / 4);
    for group in bytes.chunks_exact(4) {
        let mut word = [0u8; 4];
        word.copy_from_slice(group);
        values.push(f32::from_le_bytes(word));
    }
    Some(values)
}
53
54fn row_to_memory(row: &duckdb::Row<'_>) -> duckdb::Result<MemoryRecord> {
55    let id_str: String = row.get(0)?;
56    let tags_json: Option<String> = row.get(6)?;
57    let metadata_json: Option<String> = row.get(7)?;
58    let embedding_blob: Option<Vec<u8>> = row.get(8)?;
59    let content_hash: Vec<u8> = row.get(9)?;
60    let prev_hash: Option<Vec<u8>> = row.get(10)?;
61
62    let memory_type_str: String = row.get(3)?;
63    let scope_str: String = row.get(4)?;
64    let source_type_str: String = row.get(11)?;
65    let consolidation_state_str: String = row.get(13)?;
66
67    Ok(MemoryRecord {
68        id: Uuid::parse_str(&id_str)
69            .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e)))?,
70        agent_id: row.get(1)?,
71        content: row.get(2)?,
72        memory_type: memory_type_str.parse()
73            .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(3, duckdb::types::Type::Text, e.to_string().into()))?,
74        scope: scope_str.parse()
75            .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, e.to_string().into()))?,
76        importance: row.get(5)?,
77        tags: match tags_json {
78            Some(ref s) => serde_json::from_str(s).unwrap_or_else(|e| {
79                tracing::warn!(id = %id_str, error = %e, raw = %s, "corrupted tags JSON, defaulting to empty");
80                vec![]
81            }),
82            None => vec![],
83        },
84        metadata: match metadata_json {
85            Some(ref s) => serde_json::from_str(s).unwrap_or_else(|e| {
86                tracing::warn!(id = %id_str, error = %e, "corrupted metadata JSON, defaulting to empty");
87                serde_json::Value::Object(serde_json::Map::new())
88            }),
89            None => serde_json::Value::Object(serde_json::Map::new()),
90        },
91        embedding: deserialize_embedding(embedding_blob),
92        content_hash,
93        prev_hash,
94        source_type: source_type_str.parse()
95            .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(11, duckdb::types::Type::Text, e.to_string().into()))?,
96        source_id: row.get(12)?,
97        consolidation_state: consolidation_state_str.parse()
98            .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(13, duckdb::types::Type::Text, e.to_string().into()))?,
99        access_count: u64::try_from(row.get::<_, i64>(14)?).unwrap_or(0),
100        org_id: row.get(15)?,
101        thread_id: row.get(16)?,
102        created_at: row.get(17)?,
103        updated_at: row.get(18)?,
104        last_accessed_at: row.get(19)?,
105        expires_at: row.get(20)?,
106        deleted_at: row.get(21)?,
107        decay_rate: row.get(22)?,
108        created_by: row.get(23)?,
109        version: u32::try_from(row.get::<_, i32>(24)?).unwrap_or(1),
110        prev_version_id: match row.get::<_, Option<String>>(25)? {
111            Some(s) => Uuid::parse_str(&s).map_err(|e| {
112                tracing::warn!(memory_id = %id_str, error = %e, "corrupted prev_version_id UUID");
113                e
114            }).ok(),
115            None => None,
116        },
117        quarantined: row.get::<_, bool>(26)?,
118        quarantine_reason: row.get(27)?,
119        decay_function: row.get(28).unwrap_or(None),
120    })
121}
122
123#[async_trait::async_trait]
124impl StorageBackend for DuckDbStorage {
125    async fn insert_memory(&self, record: &MemoryRecord) -> Result<()> {
126        let conn = self.conn.lock().await;
127        let tags_json = serde_json::to_string(&record.tags)?;
128        let metadata_json = serde_json::to_string(&record.metadata)?;
129        let embedding_blob = serialize_embedding(&record.embedding);
130
131        conn.execute(
132            "INSERT INTO memories (id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
133            duckdb::params![
134                record.id.to_string(),
135                record.agent_id,
136                record.content,
137                record.memory_type.to_string(),
138                record.scope.to_string(),
139                record.importance,
140                tags_json,
141                metadata_json,
142                embedding_blob,
143                record.content_hash,
144                record.prev_hash,
145                record.source_type.to_string(),
146                record.source_id,
147                record.consolidation_state.to_string(),
148                record.access_count as i64,
149                record.org_id,
150                record.thread_id,
151                record.created_at,
152                record.updated_at,
153                record.last_accessed_at,
154                record.expires_at,
155                record.deleted_at,
156                record.decay_rate,
157                record.created_by,
158                record.version as i32,
159                record.prev_version_id.map(|id| id.to_string()),
160                record.quarantined,
161                record.quarantine_reason,
162                record.decay_function,
163            ],
164        )?;
165        Ok(())
166    }
167
168    async fn get_memory(&self, id: Uuid) -> Result<Option<MemoryRecord>> {
169        let conn = self.conn.lock().await;
170        let mut stmt = conn.prepare(
171            "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE id = ?",
172        )?;
173        let result = stmt.query_row([id.to_string()], row_to_memory);
174        match result {
175            Ok(record) => Ok(Some(record)),
176            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
177            Err(e) => Err(Error::Storage(e.to_string())),
178        }
179    }
180
181    async fn update_memory(&self, record: &MemoryRecord) -> Result<()> {
182        let conn = self.conn.lock().await;
183        let tags_json = serde_json::to_string(&record.tags)?;
184        let metadata_json = serde_json::to_string(&record.metadata)?;
185        let embedding_blob = serialize_embedding(&record.embedding);
186
187        let affected = conn.execute(
188            "UPDATE memories SET agent_id=?, content=?, memory_type=?, scope=?, importance=?, tags=?, metadata=?, embedding=?, content_hash=?, prev_hash=?, source_type=?, source_id=?, consolidation_state=?, access_count=?, org_id=?, thread_id=?, updated_at=?, last_accessed_at=?, expires_at=?, deleted_at=?, decay_rate=?, created_by=?, version=?, prev_version_id=?, quarantined=?, quarantine_reason=?, decay_function=? WHERE id=?",
189            duckdb::params![
190                record.agent_id,
191                record.content,
192                record.memory_type.to_string(),
193                record.scope.to_string(),
194                record.importance,
195                tags_json,
196                metadata_json,
197                embedding_blob,
198                record.content_hash,
199                record.prev_hash,
200                record.source_type.to_string(),
201                record.source_id,
202                record.consolidation_state.to_string(),
203                record.access_count as i64,
204                record.org_id,
205                record.thread_id,
206                record.updated_at,
207                record.last_accessed_at,
208                record.expires_at,
209                record.deleted_at,
210                record.decay_rate,
211                record.created_by,
212                record.version as i32,
213                record.prev_version_id.map(|id| id.to_string()),
214                record.quarantined,
215                record.quarantine_reason,
216                record.decay_function,
217                record.id.to_string(),
218            ],
219        )?;
220        if affected == 0 {
221            return Err(Error::NotFound(format!("memory {} not found", record.id)));
222        }
223        Ok(())
224    }
225
226    async fn soft_delete_memory(&self, id: Uuid) -> Result<()> {
227        let conn = self.conn.lock().await;
228        let now = chrono::Utc::now().to_rfc3339();
229        let affected = conn.execute(
230            "UPDATE memories SET deleted_at = ?, updated_at = ? WHERE id = ? AND deleted_at IS NULL",
231            duckdb::params![now, now, id.to_string()],
232        )?;
233        if affected == 0 {
234            return Err(Error::NotFound(format!(
235                "memory {id} not found or already deleted"
236            )));
237        }
238        Ok(())
239    }
240
241    async fn hard_delete_memory(&self, id: Uuid) -> Result<()> {
242        let conn = self.conn.lock().await;
243        let affected = conn.execute(
244            "DELETE FROM memories WHERE id = ?",
245            duckdb::params![id.to_string()],
246        )?;
247        if affected == 0 {
248            return Err(Error::NotFound(format!("memory {id} not found")));
249        }
250        // Also clean up ACLs
251        conn.execute(
252            "DELETE FROM acls WHERE memory_id = ?",
253            duckdb::params![id.to_string()],
254        )?;
255        Ok(())
256    }
257
258    async fn list_memories(
259        &self,
260        filter: &MemoryFilter,
261        limit: usize,
262        offset: usize,
263    ) -> Result<Vec<MemoryRecord>> {
264        let conn = self.conn.lock().await;
265        let mut conditions = Vec::new();
266        let mut params: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
267
268        if !filter.include_deleted {
269            conditions.push("deleted_at IS NULL".to_string());
270        }
271
272        if let Some(ref agent_id) = filter.agent_id {
273            conditions.push(format!("agent_id = ${}", params.len() + 1));
274            params.push(Box::new(agent_id.clone()));
275        }
276
277        if let Some(memory_type) = filter.memory_type {
278            conditions.push(format!("memory_type = ${}", params.len() + 1));
279            params.push(Box::new(memory_type.to_string()));
280        }
281
282        if let Some(scope) = filter.scope {
283            conditions.push(format!("scope = ${}", params.len() + 1));
284            params.push(Box::new(scope.to_string()));
285        }
286
287        if let Some(min_importance) = filter.min_importance {
288            conditions.push(format!("importance >= ${}", params.len() + 1));
289            params.push(Box::new(min_importance));
290        }
291
292        if let Some(ref org_id) = filter.org_id {
293            conditions.push(format!("org_id = ${}", params.len() + 1));
294            params.push(Box::new(org_id.clone()));
295        }
296
297        if let Some(ref thread_id) = filter.thread_id {
298            conditions.push(format!("thread_id = ${}", params.len() + 1));
299            params.push(Box::new(thread_id.clone()));
300        }
301
302        let where_clause = if conditions.is_empty() {
303            String::new()
304        } else {
305            format!("WHERE {}", conditions.join(" AND "))
306        };
307
308        let sql = format!(
309            "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories {where_clause} ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}"
310        );
311
312        let mut stmt = conn.prepare(&sql)?;
313        let param_refs: Vec<&dyn duckdb::ToSql> = params.iter().map(|p| p.as_ref()).collect();
314        let rows = stmt.query_map(param_refs.as_slice(), row_to_memory)?;
315
316        let mut results = Vec::new();
317        for row in rows {
318            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
319        }
320        Ok(results)
321    }
322
323    async fn touch_memory(&self, id: Uuid) -> Result<()> {
324        let conn = self.conn.lock().await;
325        let now = chrono::Utc::now().to_rfc3339();
326        conn.execute(
327            "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ? WHERE id = ?",
328            duckdb::params![now, id.to_string()],
329        )?;
330        Ok(())
331    }
332
333    async fn insert_acl(&self, acl: &Acl) -> Result<()> {
334        let conn = self.conn.lock().await;
335        conn.execute(
336            "INSERT INTO acls (id, memory_id, principal_type, principal_id, permission, granted_by, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
337            duckdb::params![
338                acl.id.to_string(),
339                acl.memory_id.to_string(),
340                acl.principal_type.to_string(),
341                acl.principal_id,
342                acl.permission.to_string(),
343                acl.granted_by,
344                acl.created_at,
345                acl.expires_at,
346            ],
347        )?;
348        Ok(())
349    }
350
351    async fn check_permission(
352        &self,
353        memory_id: Uuid,
354        principal_id: &str,
355        required: Permission,
356    ) -> Result<bool> {
357        // Do all DuckDB work in one block, then release the lock before delegation check
358        let acl_result = {
359            let conn = self.conn.lock().await;
360
361            // Check if the principal is the owner (agent_id matches)
362            let mut stmt = conn.prepare("SELECT agent_id FROM memories WHERE id = ?")?;
363            let owner_result =
364                stmt.query_row([memory_id.to_string()], |row| row.get::<_, String>(0));
365            match owner_result {
366                Ok(owner) if owner == principal_id => return Ok(true),
367                Err(duckdb::Error::QueryReturnedNoRows) => {
368                    return Err(Error::NotFound(format!("memory {memory_id} not found")));
369                }
370                _ => {}
371            }
372
373            // Check ACLs
374            let now = chrono::Utc::now().to_rfc3339();
375            let mut stmt = conn.prepare(
376                "SELECT permission FROM acls WHERE memory_id = ? AND principal_id = ? AND (expires_at IS NULL OR expires_at > ?)",
377            )?;
378            let rows = stmt.query_map(
379                duckdb::params![memory_id.to_string(), principal_id, now.clone()],
380                |row| row.get::<_, String>(0),
381            )?;
382
383            let mut perms: Vec<String> = Vec::new();
384            for row in rows {
385                perms.push(row.map_err(|e| Error::Storage(e.to_string()))?);
386            }
387
388            // Check public ACLs
389            let mut stmt = conn.prepare(
390                "SELECT permission FROM acls WHERE memory_id = ? AND principal_type = 'public' AND (expires_at IS NULL OR expires_at > ?)",
391            )?;
392            let rows = stmt.query_map(duckdb::params![memory_id.to_string(), now], |row| {
393                row.get::<_, String>(0)
394            })?;
395
396            for row in rows {
397                perms.push(row.map_err(|e| Error::Storage(e.to_string()))?);
398            }
399
400            perms
401        }; // conn lock dropped here
402
403        for perm_str in &acl_result {
404            if let Ok(perm) = perm_str.parse::<Permission>()
405                && perm.satisfies(required)
406            {
407                return Ok(true);
408            }
409        }
410
411        // Check delegations (conn lock is released)
412        if self
413            .check_delegation(principal_id, memory_id, required)
414            .await?
415        {
416            return Ok(true);
417        }
418
419        Ok(false)
420    }
421
422    async fn insert_relation(&self, relation: &Relation) -> Result<()> {
423        let conn = self.conn.lock().await;
424        conn.execute(
425            "INSERT INTO relations (id, source_id, target_id, relation_type, weight, metadata, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
426            duckdb::params![
427                relation.id.to_string(),
428                relation.source_id.to_string(),
429                relation.target_id.to_string(),
430                relation.relation_type,
431                relation.weight,
432                serde_json::to_string(&relation.metadata)?,
433                relation.created_at,
434            ],
435        )?;
436        Ok(())
437    }
438
439    async fn get_relations_from(&self, source_id: Uuid) -> Result<Vec<Relation>> {
440        let conn = self.conn.lock().await;
441        let mut stmt = conn.prepare(
442            "SELECT id, source_id, target_id, relation_type, weight, metadata, created_at FROM relations WHERE source_id = ?",
443        )?;
444        let rows = stmt.query_map([source_id.to_string()], row_to_relation)?;
445        let mut results = Vec::new();
446        for row in rows {
447            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
448        }
449        Ok(results)
450    }
451
452    async fn get_relations_to(&self, target_id: Uuid) -> Result<Vec<Relation>> {
453        let conn = self.conn.lock().await;
454        let mut stmt = conn.prepare(
455            "SELECT id, source_id, target_id, relation_type, weight, metadata, created_at FROM relations WHERE target_id = ?",
456        )?;
457        let rows = stmt.query_map([target_id.to_string()], row_to_relation)?;
458        let mut results = Vec::new();
459        for row in rows {
460            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
461        }
462        Ok(results)
463    }
464
465    async fn delete_relation(&self, id: Uuid) -> Result<()> {
466        let conn = self.conn.lock().await;
467        let affected = conn.execute(
468            "DELETE FROM relations WHERE id = ?",
469            duckdb::params![id.to_string()],
470        )?;
471        if affected == 0 {
472            return Err(Error::NotFound(format!("relation {id} not found")));
473        }
474        Ok(())
475    }
476
477    async fn get_latest_memory_hash(
478        &self,
479        agent_id: &str,
480        thread_id: Option<&str>,
481    ) -> Result<Option<Vec<u8>>> {
482        let conn = self.conn.lock().await;
483        let (sql, result) = if let Some(tid) = thread_id {
484            let mut stmt = conn.prepare(
485                "SELECT content_hash FROM memories WHERE agent_id = ? AND thread_id = ? AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1",
486            )?;
487            let r = stmt.query_row(duckdb::params![agent_id, tid], |row| {
488                row.get::<_, Vec<u8>>(0)
489            });
490            ((), r)
491        } else {
492            let mut stmt = conn.prepare(
493                "SELECT content_hash FROM memories WHERE agent_id = ? AND thread_id IS NULL AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1",
494            )?;
495            let r = stmt.query_row(duckdb::params![agent_id], |row| row.get::<_, Vec<u8>>(0));
496            ((), r)
497        };
498        let _ = sql;
499        match result {
500            Ok(hash) => Ok(Some(hash)),
501            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
502            Err(e) => Err(Error::Storage(e.to_string())),
503        }
504    }
505
506    async fn get_latest_event_hash(
507        &self,
508        agent_id: &str,
509        thread_id: Option<&str>,
510    ) -> Result<Option<Vec<u8>>> {
511        let conn = self.conn.lock().await;
512        let result = if let Some(tid) = thread_id {
513            let mut stmt = conn.prepare(
514                "SELECT content_hash FROM agent_events WHERE agent_id = ? AND thread_id = ? ORDER BY timestamp DESC LIMIT 1",
515            )?;
516            stmt.query_row(duckdb::params![agent_id, tid], |row| {
517                row.get::<_, Vec<u8>>(0)
518            })
519        } else {
520            let mut stmt = conn.prepare(
521                "SELECT content_hash FROM agent_events WHERE agent_id = ? ORDER BY timestamp DESC LIMIT 1",
522            )?;
523            stmt.query_row(duckdb::params![agent_id], |row| row.get::<_, Vec<u8>>(0))
524        };
525        match result {
526            Ok(hash) => Ok(Some(hash)),
527            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
528            Err(e) => Err(Error::Storage(e.to_string())),
529        }
530    }
531
532    async fn get_sync_watermark(&self, key: &str) -> Result<Option<String>> {
533        let conn = self.conn.lock().await;
534        let mut stmt = conn.prepare("SELECT value FROM sync_metadata WHERE key = ?")?;
535        let result = stmt.query_row(duckdb::params![key], |row| row.get::<_, String>(0));
536        match result {
537            Ok(value) => Ok(Some(value)),
538            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
539            Err(e) => Err(Error::Storage(e.to_string())),
540        }
541    }
542
543    async fn set_sync_watermark(&self, key: &str, value: &str) -> Result<()> {
544        let conn = self.conn.lock().await;
545        let now = chrono::Utc::now().to_rfc3339();
546        // Try update first, then insert
547        let affected = conn.execute(
548            "UPDATE sync_metadata SET value = ?, updated_at = ? WHERE key = ?",
549            duckdb::params![value, now, key],
550        )?;
551        if affected == 0 {
552            conn.execute(
553                "INSERT INTO sync_metadata (key, value, updated_at) VALUES (?, ?, ?)",
554                duckdb::params![key, value, now],
555            )?;
556        }
557        Ok(())
558    }
559
560    async fn list_accessible_memory_ids(&self, agent_id: &str, limit: usize) -> Result<Vec<Uuid>> {
561        let conn = self.conn.lock().await;
562        let now = chrono::Utc::now().to_rfc3339();
563        let mut stmt = conn.prepare(
564            "SELECT id FROM memories WHERE (agent_id = ? OR scope = 'public' OR id IN (SELECT memory_id FROM acls WHERE principal_id = ? AND (expires_at IS NULL OR expires_at > ?))) AND deleted_at IS NULL LIMIT ?",
565        )?;
566        let rows = stmt.query_map(
567            duckdb::params![agent_id, agent_id, now, limit as i64],
568            |row| row.get::<_, String>(0),
569        )?;
570        let mut ids = Vec::new();
571        for row in rows {
572            let id_str = row.map_err(|e| Error::Storage(e.to_string()))?;
573            ids.push(Uuid::parse_str(&id_str).map_err(|e| Error::Storage(e.to_string()))?);
574        }
575        Ok(ids)
576    }
577
578    async fn insert_event(&self, event: &AgentEvent) -> Result<()> {
579        let conn = self.conn.lock().await;
580        let payload_json = serde_json::to_string(&event.payload)?;
581        let embedding_blob = serialize_embedding(&event.embedding);
582        conn.execute(
583            "INSERT INTO agent_events (id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
584            duckdb::params![
585                event.id.to_string(),
586                event.agent_id,
587                event.thread_id,
588                event.run_id,
589                event.parent_event_id.map(|id| id.to_string()),
590                event.event_type.to_string(),
591                payload_json,
592                event.trace_id,
593                event.span_id,
594                event.model,
595                event.tokens_input,
596                event.tokens_output,
597                event.latency_ms,
598                event.cost_usd,
599                event.timestamp,
600                event.logical_clock,
601                event.content_hash,
602                event.prev_hash,
603                embedding_blob,
604            ],
605        )?;
606        Ok(())
607    }
608
609    async fn list_events(
610        &self,
611        agent_id: &str,
612        limit: usize,
613        offset: usize,
614    ) -> Result<Vec<AgentEvent>> {
615        let conn = self.conn.lock().await;
616        let mut stmt = conn.prepare(
617            "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE agent_id = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?",
618        )?;
619        let rows = stmt.query_map(
620            duckdb::params![agent_id, limit as i64, offset as i64],
621            row_to_event,
622        )?;
623        let mut results = Vec::new();
624        for row in rows {
625            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
626        }
627        Ok(results)
628    }
629
630    async fn get_events_by_thread(&self, thread_id: &str, limit: usize) -> Result<Vec<AgentEvent>> {
631        let conn = self.conn.lock().await;
632        let mut stmt = conn.prepare(
633            "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE thread_id = ? ORDER BY timestamp ASC LIMIT ?",
634        )?;
635        let rows = stmt.query_map(duckdb::params![thread_id, limit as i64], row_to_event)?;
636        let mut results = Vec::new();
637        for row in rows {
638            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
639        }
640        Ok(results)
641    }
642
643    async fn get_event(&self, id: Uuid) -> Result<Option<AgentEvent>> {
644        let conn = self.conn.lock().await;
645        let mut stmt = conn.prepare(
646            "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE id = ?",
647        )?;
648        let result = stmt.query_row([id.to_string()], row_to_event);
649        match result {
650            Ok(event) => Ok(Some(event)),
651            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
652            Err(e) => Err(Error::Storage(e.to_string())),
653        }
654    }
655
656    async fn list_child_events(
657        &self,
658        parent_event_id: Uuid,
659        limit: usize,
660    ) -> Result<Vec<AgentEvent>> {
661        let conn = self.conn.lock().await;
662        let mut stmt = conn.prepare(
663            "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE parent_event_id = ? ORDER BY timestamp ASC LIMIT ?",
664        )?;
665        let rows = stmt.query_map(
666            duckdb::params![parent_event_id.to_string(), limit as i64],
667            row_to_event,
668        )?;
669        let mut results = Vec::new();
670        for row in rows {
671            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
672        }
673        Ok(results)
674    }
675
676    async fn list_memories_by_agent_ordered(
677        &self,
678        agent_id: &str,
679        thread_id: Option<&str>,
680        limit: usize,
681    ) -> Result<Vec<MemoryRecord>> {
682        let conn = self.conn.lock().await;
683        let (result,) = if let Some(tid) = thread_id {
684            let mut stmt = conn.prepare(
685                "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE agent_id = ? AND thread_id = ? AND deleted_at IS NULL ORDER BY created_at ASC LIMIT ?",
686            )?;
687            let rows =
688                stmt.query_map(duckdb::params![agent_id, tid, limit as i64], row_to_memory)?;
689            let mut results = Vec::new();
690            for row in rows {
691                results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
692            }
693            (results,)
694        } else {
695            let mut stmt = conn.prepare(
696                "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE agent_id = ? AND deleted_at IS NULL ORDER BY created_at ASC LIMIT ?",
697            )?;
698            let rows = stmt.query_map(duckdb::params![agent_id, limit as i64], row_to_memory)?;
699            let mut results = Vec::new();
700            for row in rows {
701                results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
702            }
703            (results,)
704        };
705        Ok(result)
706    }
707
708    async fn list_memories_since(
709        &self,
710        updated_after: &str,
711        limit: usize,
712    ) -> Result<Vec<MemoryRecord>> {
713        let conn = self.conn.lock().await;
714        let mut stmt = conn.prepare(
715            "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE updated_at > ? ORDER BY updated_at ASC LIMIT ?",
716        )?;
717        let rows = stmt.query_map(duckdb::params![updated_after, limit as i64], row_to_memory)?;
718        let mut results = Vec::new();
719        for row in rows {
720            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
721        }
722        Ok(results)
723    }
724
725    async fn upsert_memory(&self, record: &MemoryRecord) -> Result<()> {
726        // Try update first; if no rows affected, insert
727        match self.update_memory(record).await {
728            Ok(()) => Ok(()),
729            Err(Error::NotFound(_)) => self.insert_memory(record).await,
730            Err(e) => Err(e),
731        }
732    }
733
734    async fn cleanup_expired(&self) -> Result<usize> {
735        let conn = self.conn.lock().await;
736        let now = chrono::Utc::now().to_rfc3339();
737        let affected = conn.execute(
738            "UPDATE memories SET deleted_at = ? WHERE expires_at IS NOT NULL AND expires_at < ? AND deleted_at IS NULL",
739            duckdb::params![now.clone(), now],
740        )?;
741        Ok(affected)
742    }
743
744    async fn insert_delegation(&self, d: &Delegation) -> Result<()> {
745        let conn = self.conn.lock().await;
746        let scope_type = d.scope.to_string();
747        let scope_value = match &d.scope {
748            DelegationScope::AllMemories => serde_json::Value::Null,
749            DelegationScope::ByTag(tags) => serde_json::json!(tags),
750            DelegationScope::ByMemoryId(ids) => {
751                serde_json::json!(ids.iter().map(|id| id.to_string()).collect::<Vec<_>>())
752            }
753        };
754        let scope_value_json = serde_json::to_string(&scope_value)?;
755
756        conn.execute(
757            "INSERT INTO delegations (id, delegator_id, delegate_id, permission, scope_type, scope_value, max_depth, current_depth, parent_delegation_id, created_at, expires_at, revoked_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
758            duckdb::params![
759                d.id.to_string(),
760                d.delegator_id,
761                d.delegate_id,
762                d.permission.to_string(),
763                scope_type,
764                scope_value_json,
765                d.max_depth as i32,
766                d.current_depth as i32,
767                d.parent_delegation_id.map(|id| id.to_string()),
768                d.created_at,
769                d.expires_at,
770                d.revoked_at,
771            ],
772        )?;
773        Ok(())
774    }
775
776    async fn list_delegations_for(&self, delegate_id: &str) -> Result<Vec<Delegation>> {
777        let conn = self.conn.lock().await;
778        let now = chrono::Utc::now().to_rfc3339();
779        let mut stmt = conn.prepare(
780            "SELECT id, delegator_id, delegate_id, permission, scope_type, scope_value, max_depth, current_depth, parent_delegation_id, created_at, expires_at, revoked_at FROM delegations WHERE delegate_id = ? AND revoked_at IS NULL AND (expires_at IS NULL OR expires_at > ?)",
781        )?;
782        let rows = stmt.query_map(duckdb::params![delegate_id, now], row_to_delegation)?;
783        let mut results = Vec::new();
784        for row in rows {
785            results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
786        }
787        Ok(results)
788    }
789
790    async fn revoke_delegation(&self, id: Uuid) -> Result<()> {
791        let conn = self.conn.lock().await;
792        let now = chrono::Utc::now().to_rfc3339();
793        let affected = conn.execute(
794            "UPDATE delegations SET revoked_at = ? WHERE id = ? AND revoked_at IS NULL",
795            duckdb::params![now, id.to_string()],
796        )?;
797        if affected == 0 {
798            return Err(Error::NotFound(format!(
799                "delegation {id} not found or already revoked"
800            )));
801        }
802        Ok(())
803    }
804
805    async fn check_delegation(
806        &self,
807        delegate_id: &str,
808        memory_id: Uuid,
809        required: Permission,
810    ) -> Result<bool> {
811        let delegations = self.list_delegations_for(delegate_id).await?;
812        // Get the memory to check scope
813        let memory = match self.get_memory(memory_id).await? {
814            Some(m) => m,
815            None => return Ok(false),
816        };
817
818        for d in &delegations {
819            if !d.permission.satisfies(required) {
820                continue;
821            }
822            match &d.scope {
823                DelegationScope::AllMemories => return Ok(true),
824                DelegationScope::ByMemoryId(ids) => {
825                    if ids.contains(&memory_id) {
826                        return Ok(true);
827                    }
828                }
829                DelegationScope::ByTag(tags) => {
830                    if tags.iter().any(|t| memory.tags.contains(t)) {
831                        return Ok(true);
832                    }
833                }
834            }
835        }
836        Ok(false)
837    }
838
839    async fn insert_or_update_agent_profile(&self, profile: &AgentProfile) -> Result<()> {
840        let conn = self.conn.lock().await;
841        // Try update first, then insert
842        let affected = conn.execute(
843            "UPDATE agent_profiles SET avg_importance = ?, avg_content_length = ?, total_memories = ?, last_updated = ? WHERE agent_id = ?",
844            duckdb::params![
845                profile.avg_importance,
846                profile.avg_content_length,
847                profile.total_memories as i64,
848                profile.last_updated,
849                profile.agent_id,
850            ],
851        )?;
852        if affected == 0 {
853            conn.execute(
854                "INSERT INTO agent_profiles (agent_id, avg_importance, avg_content_length, total_memories, last_updated) VALUES (?, ?, ?, ?, ?)",
855                duckdb::params![
856                    profile.agent_id,
857                    profile.avg_importance,
858                    profile.avg_content_length,
859                    profile.total_memories as i64,
860                    profile.last_updated,
861                ],
862            )?;
863        }
864        Ok(())
865    }
866
867    async fn get_agent_profile(&self, agent_id: &str) -> Result<Option<AgentProfile>> {
868        let conn = self.conn.lock().await;
869        let mut stmt = conn.prepare(
870            "SELECT agent_id, avg_importance, avg_content_length, total_memories, last_updated FROM agent_profiles WHERE agent_id = ?",
871        )?;
872        let result = stmt.query_row([agent_id], |row| {
873            Ok(AgentProfile {
874                agent_id: row.get(0)?,
875                avg_importance: row.get(1)?,
876                avg_content_length: row.get(2)?,
877                total_memories: row.get::<_, i64>(3)? as u64,
878                last_updated: row.get(4)?,
879            })
880        });
881        match result {
882            Ok(profile) => Ok(Some(profile)),
883            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
884            Err(e) => Err(Error::Storage(e.to_string())),
885        }
886    }
887
888    async fn insert_or_update_embedding_baseline(
889        &self,
890        baseline: &EmbeddingBaseline,
891    ) -> Result<()> {
892        let conn = self.conn.lock().await;
893        let mu_json = serde_json::to_string(&baseline.mu)?;
894        let cov_json = serde_json::to_string(&baseline.cov_diag)?;
895        let affected = conn.execute(
896            "UPDATE embedding_baseline SET mu = ?, cov_diag = ?, n = ?, updated_at = ? WHERE agent_id = ?",
897            duckdb::params![
898                mu_json,
899                cov_json,
900                baseline.n as i64,
901                baseline.updated_at,
902                baseline.agent_id,
903            ],
904        )?;
905        if affected == 0 {
906            let mu_json = serde_json::to_string(&baseline.mu)?;
907            let cov_json = serde_json::to_string(&baseline.cov_diag)?;
908            conn.execute(
909                "INSERT INTO embedding_baseline (agent_id, mu, cov_diag, n, updated_at) VALUES (?, ?, ?, ?, ?)",
910                duckdb::params![
911                    baseline.agent_id,
912                    mu_json,
913                    cov_json,
914                    baseline.n as i64,
915                    baseline.updated_at,
916                ],
917            )?;
918        }
919        Ok(())
920    }
921
922    async fn get_embedding_baseline(&self, agent_id: &str) -> Result<Option<EmbeddingBaseline>> {
923        let conn = self.conn.lock().await;
924        let mut stmt = conn.prepare(
925            "SELECT agent_id, mu, cov_diag, n, updated_at FROM embedding_baseline WHERE agent_id = ?",
926        )?;
927        let result: duckdb::Result<(String, String, String, i64, String)> =
928            stmt.query_row([agent_id], |row| {
929                Ok((
930                    row.get(0)?,
931                    row.get(1)?,
932                    row.get(2)?,
933                    row.get(3)?,
934                    row.get(4)?,
935                ))
936            });
937        match result {
938            Ok((agent_id, mu_json, cov_json, n, updated_at)) => {
939                let mu: Vec<f32> = serde_json::from_str(&mu_json)?;
940                let cov_diag: Vec<f32> = serde_json::from_str(&cov_json)?;
941                Ok(Some(EmbeddingBaseline {
942                    agent_id,
943                    mu,
944                    cov_diag,
945                    n: n as u64,
946                    updated_at,
947                }))
948            }
949            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
950            Err(e) => Err(Error::Storage(e.to_string())),
951        }
952    }
953
954    async fn insert_checkpoint(&self, cp: &Checkpoint) -> Result<()> {
955        let conn = self.conn.lock().await;
956        let state_snapshot_json = serde_json::to_string(&cp.state_snapshot)?;
957        let state_diff_json = cp
958            .state_diff
959            .as_ref()
960            .map(serde_json::to_string)
961            .transpose()?;
962        let memory_refs_json = serde_json::to_string(
963            &cp.memory_refs
964                .iter()
965                .map(|id| id.to_string())
966                .collect::<Vec<_>>(),
967        )?;
968        let metadata_json = serde_json::to_string(&cp.metadata)?;
969
970        conn.execute(
971            "INSERT INTO checkpoints (id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
972            duckdb::params![
973                cp.id.to_string(),
974                cp.thread_id,
975                cp.agent_id,
976                cp.parent_id.map(|id| id.to_string()),
977                cp.branch_name,
978                state_snapshot_json,
979                state_diff_json,
980                memory_refs_json,
981                cp.event_cursor.map(|id| id.to_string()),
982                cp.label,
983                cp.created_at,
984                metadata_json,
985            ],
986        )?;
987        Ok(())
988    }
989
990    async fn get_checkpoint(&self, id: Uuid) -> Result<Option<Checkpoint>> {
991        let conn = self.conn.lock().await;
992        let mut stmt = conn.prepare(
993            "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE id = ?",
994        )?;
995        let result = stmt.query_row([id.to_string()], row_to_checkpoint);
996        match result {
997            Ok(cp) => Ok(Some(cp)),
998            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
999            Err(e) => Err(Error::Storage(e.to_string())),
1000        }
1001    }
1002
1003    async fn list_checkpoints(
1004        &self,
1005        thread_id: &str,
1006        branch: Option<&str>,
1007        limit: usize,
1008    ) -> Result<Vec<Checkpoint>> {
1009        let conn = self.conn.lock().await;
1010        let (sql, rows_result) = if let Some(branch_name) = branch {
1011            let mut stmt = conn.prepare(
1012                "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE thread_id = ? AND branch_name = ? ORDER BY created_at DESC LIMIT ?",
1013            )?;
1014            let rows = stmt.query_map(
1015                duckdb::params![thread_id, branch_name, limit as i64],
1016                row_to_checkpoint,
1017            )?;
1018            let mut results = Vec::new();
1019            for row in rows {
1020                results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
1021            }
1022            ((), Ok(results))
1023        } else {
1024            let mut stmt = conn.prepare(
1025                "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE thread_id = ? ORDER BY created_at DESC LIMIT ?",
1026            )?;
1027            let rows =
1028                stmt.query_map(duckdb::params![thread_id, limit as i64], row_to_checkpoint)?;
1029            let mut results = Vec::new();
1030            for row in rows {
1031                results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
1032            }
1033            ((), Ok(results))
1034        };
1035        let _ = sql;
1036        rows_result
1037    }
1038
1039    async fn get_latest_checkpoint(
1040        &self,
1041        thread_id: &str,
1042        branch: &str,
1043    ) -> Result<Option<Checkpoint>> {
1044        let conn = self.conn.lock().await;
1045        let mut stmt = conn.prepare(
1046            "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE thread_id = ? AND branch_name = ? ORDER BY created_at DESC LIMIT 1",
1047        )?;
1048        let result = stmt.query_row(duckdb::params![thread_id, branch], row_to_checkpoint);
1049        match result {
1050            Ok(cp) => Ok(Some(cp)),
1051            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1052            Err(e) => Err(Error::Storage(e.to_string())),
1053        }
1054    }
1055}
1056
1057fn row_to_event(row: &duckdb::Row<'_>) -> duckdb::Result<AgentEvent> {
1058    let id_str: String = row.get(0)?;
1059    let parent_id_str: Option<String> = row.get(4)?;
1060    let payload_json: Option<String> = row.get(6)?;
1061    let event_type_str: String = row.get(5)?;
1062    let content_hash: Vec<u8> = row.get(16)?;
1063    let prev_hash: Option<Vec<u8>> = row.get(17)?;
1064    let embedding_blob: Option<Vec<u8>> = row.get(18).unwrap_or(None);
1065
1066    Ok(AgentEvent {
1067        id: Uuid::parse_str(&id_str).map_err(|e| {
1068            duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1069        })?,
1070        agent_id: row.get(1)?,
1071        thread_id: row.get(2)?,
1072        run_id: row.get(3)?,
1073        parent_event_id: parent_id_str.and_then(|s| Uuid::parse_str(&s).ok()),
1074        event_type: event_type_str.parse().map_err(|e: Error| {
1075            duckdb::Error::FromSqlConversionFailure(
1076                5,
1077                duckdb::types::Type::Text,
1078                e.to_string().into(),
1079            )
1080        })?,
1081        payload: payload_json
1082            .and_then(|s| serde_json::from_str(&s).ok())
1083            .unwrap_or(serde_json::Value::Null),
1084        trace_id: row.get(7)?,
1085        span_id: row.get(8)?,
1086        model: row.get(9)?,
1087        tokens_input: row.get(10)?,
1088        tokens_output: row.get(11)?,
1089        latency_ms: row.get(12)?,
1090        cost_usd: row.get(13)?,
1091        timestamp: row.get(14)?,
1092        logical_clock: row.get(15)?,
1093        content_hash,
1094        prev_hash,
1095        embedding: deserialize_embedding(embedding_blob),
1096    })
1097}
1098
1099fn row_to_checkpoint(row: &duckdb::Row<'_>) -> duckdb::Result<Checkpoint> {
1100    let id_str: String = row.get(0)?;
1101    let parent_id_str: Option<String> = row.get(3)?;
1102    let state_snapshot_json: Option<String> = row.get(5)?;
1103    let state_diff_json: Option<String> = row.get(6)?;
1104    let memory_refs_json: Option<String> = row.get(7)?;
1105    let event_cursor_str: Option<String> = row.get(8)?;
1106    let metadata_json: Option<String> = row.get(11)?;
1107
1108    Ok(Checkpoint {
1109        id: Uuid::parse_str(&id_str).map_err(|e| {
1110            duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1111        })?,
1112        thread_id: row.get(1)?,
1113        agent_id: row.get(2)?,
1114        parent_id: parent_id_str.and_then(|s| Uuid::parse_str(&s).ok()),
1115        branch_name: row.get(4)?,
1116        state_snapshot: state_snapshot_json
1117            .and_then(|s| serde_json::from_str(&s).ok())
1118            .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
1119        state_diff: state_diff_json.and_then(|s| serde_json::from_str(&s).ok()),
1120        memory_refs: memory_refs_json
1121            .and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok())
1122            .map(|v| {
1123                v.into_iter()
1124                    .filter_map(|s| Uuid::parse_str(&s).ok())
1125                    .collect()
1126            })
1127            .unwrap_or_default(),
1128        event_cursor: event_cursor_str.and_then(|s| Uuid::parse_str(&s).ok()),
1129        label: row.get(9)?,
1130        created_at: row.get(10)?,
1131        metadata: metadata_json
1132            .and_then(|s| serde_json::from_str(&s).ok())
1133            .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
1134    })
1135}
1136
1137fn row_to_delegation(row: &duckdb::Row<'_>) -> duckdb::Result<Delegation> {
1138    let id_str: String = row.get(0)?;
1139    let scope_type: String = row.get(4)?;
1140    let scope_value_json: Option<String> = row.get(5)?;
1141    let parent_id_str: Option<String> = row.get(8)?;
1142
1143    let scope = match scope_type.as_str() {
1144        "by_tag" => {
1145            let tags: Vec<String> = scope_value_json
1146                .and_then(|s| serde_json::from_str(&s).ok())
1147                .unwrap_or_default();
1148            DelegationScope::ByTag(tags)
1149        }
1150        "by_memory_id" => {
1151            let ids: Vec<String> = scope_value_json
1152                .and_then(|s| serde_json::from_str(&s).ok())
1153                .unwrap_or_default();
1154            let uuids = ids
1155                .into_iter()
1156                .filter_map(|s| Uuid::parse_str(&s).ok())
1157                .collect();
1158            DelegationScope::ByMemoryId(uuids)
1159        }
1160        _ => DelegationScope::AllMemories,
1161    };
1162
1163    let permission_str: String = row.get(3)?;
1164
1165    Ok(Delegation {
1166        id: Uuid::parse_str(&id_str).map_err(|e| {
1167            duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1168        })?,
1169        delegator_id: row.get(1)?,
1170        delegate_id: row.get(2)?,
1171        permission: permission_str.parse().map_err(|e: Error| {
1172            duckdb::Error::FromSqlConversionFailure(
1173                3,
1174                duckdb::types::Type::Text,
1175                e.to_string().into(),
1176            )
1177        })?,
1178        scope,
1179        max_depth: row.get::<_, i32>(6)? as u32,
1180        current_depth: row.get::<_, i32>(7)? as u32,
1181        parent_delegation_id: parent_id_str.and_then(|s| Uuid::parse_str(&s).ok()),
1182        created_at: row.get(9)?,
1183        expires_at: row.get(10)?,
1184        revoked_at: row.get(11)?,
1185    })
1186}
1187
1188fn row_to_relation(row: &duckdb::Row<'_>) -> duckdb::Result<Relation> {
1189    let id_str: String = row.get(0)?;
1190    let source_str: String = row.get(1)?;
1191    let target_str: String = row.get(2)?;
1192    let metadata_json: Option<String> = row.get(5)?;
1193
1194    Ok(Relation {
1195        id: Uuid::parse_str(&id_str).map_err(|e| {
1196            duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1197        })?,
1198        source_id: Uuid::parse_str(&source_str).map_err(|e| {
1199            duckdb::Error::FromSqlConversionFailure(1, duckdb::types::Type::Text, Box::new(e))
1200        })?,
1201        target_id: Uuid::parse_str(&target_str).map_err(|e| {
1202            duckdb::Error::FromSqlConversionFailure(2, duckdb::types::Type::Text, Box::new(e))
1203        })?,
1204        relation_type: row.get(3)?,
1205        weight: row.get(4)?,
1206        metadata: metadata_json
1207            .and_then(|s| serde_json::from_str(&s).ok())
1208            .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
1209        created_at: row.get(6)?,
1210    })
1211}
1212
1213#[cfg(test)]
1214mod tests {
1215    use super::*;
1216    use crate::hash::compute_content_hash;
1217    use crate::model::acl::PrincipalType;
1218    use crate::model::checkpoint::Checkpoint;
1219    use crate::model::event::{AgentEvent, EventType};
1220    use crate::model::memory::{ConsolidationState, MemoryType, Scope, SourceType};
1221
1222    fn make_record(agent_id: &str) -> MemoryRecord {
1223        let now = chrono::Utc::now().to_rfc3339();
1224        let content = "test memory content";
1225        MemoryRecord {
1226            id: Uuid::now_v7(),
1227            agent_id: agent_id.to_string(),
1228            content: content.to_string(),
1229            memory_type: MemoryType::Semantic,
1230            scope: Scope::Private,
1231            importance: 0.7,
1232            tags: vec!["test".to_string()],
1233            metadata: serde_json::json!({"key": "value"}),
1234            embedding: Some(vec![0.1, 0.2, 0.3]),
1235            content_hash: compute_content_hash(content, agent_id, &now),
1236            prev_hash: None,
1237            source_type: SourceType::Agent,
1238            source_id: None,
1239            consolidation_state: ConsolidationState::Raw,
1240            access_count: 0,
1241            org_id: None,
1242            thread_id: None,
1243            created_at: now.clone(),
1244            updated_at: now,
1245            last_accessed_at: None,
1246            expires_at: None,
1247            deleted_at: None,
1248            decay_rate: None,
1249            created_by: None,
1250            version: 1,
1251            prev_version_id: None,
1252            quarantined: false,
1253            quarantine_reason: None,
1254            decay_function: None,
1255        }
1256    }
1257
1258    #[tokio::test]
1259    async fn test_insert_and_get() {
1260        let storage = DuckDbStorage::open_in_memory().unwrap();
1261        let record = make_record("agent-1");
1262        storage.insert_memory(&record).await.unwrap();
1263
1264        let fetched = storage.get_memory(record.id).await.unwrap().unwrap();
1265        assert_eq!(fetched.id, record.id);
1266        assert_eq!(fetched.content, record.content);
1267        assert_eq!(fetched.agent_id, record.agent_id);
1268        assert_eq!(fetched.memory_type, record.memory_type);
1269        assert_eq!(fetched.tags, record.tags);
1270        assert_eq!(fetched.embedding, record.embedding);
1271    }
1272
1273    #[tokio::test]
1274    async fn test_get_nonexistent() {
1275        let storage = DuckDbStorage::open_in_memory().unwrap();
1276        let result = storage.get_memory(Uuid::now_v7()).await.unwrap();
1277        assert!(result.is_none());
1278    }
1279
1280    #[tokio::test]
1281    async fn test_soft_delete() {
1282        let storage = DuckDbStorage::open_in_memory().unwrap();
1283        let record = make_record("agent-1");
1284        storage.insert_memory(&record).await.unwrap();
1285
1286        storage.soft_delete_memory(record.id).await.unwrap();
1287
1288        // Should still exist in DB but with deleted_at set
1289        let fetched = storage.get_memory(record.id).await.unwrap().unwrap();
1290        assert!(fetched.deleted_at.is_some());
1291
1292        // Should not appear in list by default
1293        let filter = MemoryFilter::default();
1294        let list = storage.list_memories(&filter, 100, 0).await.unwrap();
1295        assert!(list.is_empty());
1296
1297        // Should appear with include_deleted
1298        let filter_with_deleted = MemoryFilter {
1299            include_deleted: true,
1300            ..Default::default()
1301        };
1302        let list = storage
1303            .list_memories(&filter_with_deleted, 100, 0)
1304            .await
1305            .unwrap();
1306        assert_eq!(list.len(), 1);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_hard_delete() {
1311        let storage = DuckDbStorage::open_in_memory().unwrap();
1312        let record = make_record("agent-1");
1313        storage.insert_memory(&record).await.unwrap();
1314
1315        storage.hard_delete_memory(record.id).await.unwrap();
1316
1317        let result = storage.get_memory(record.id).await.unwrap();
1318        assert!(result.is_none());
1319    }
1320
1321    #[tokio::test]
1322    async fn test_list_with_filters() {
1323        let storage = DuckDbStorage::open_in_memory().unwrap();
1324
1325        let mut r1 = make_record("agent-1");
1326        r1.memory_type = MemoryType::Episodic;
1327        storage.insert_memory(&r1).await.unwrap();
1328
1329        let mut r2 = make_record("agent-1");
1330        r2.memory_type = MemoryType::Semantic;
1331        storage.insert_memory(&r2).await.unwrap();
1332
1333        let mut r3 = make_record("agent-2");
1334        r3.memory_type = MemoryType::Semantic;
1335        storage.insert_memory(&r3).await.unwrap();
1336
1337        // Filter by agent
1338        let filter = MemoryFilter {
1339            agent_id: Some("agent-1".to_string()),
1340            ..Default::default()
1341        };
1342        let list = storage.list_memories(&filter, 100, 0).await.unwrap();
1343        assert_eq!(list.len(), 2);
1344
1345        // Filter by type
1346        let filter = MemoryFilter {
1347            memory_type: Some(MemoryType::Semantic),
1348            ..Default::default()
1349        };
1350        let list = storage.list_memories(&filter, 100, 0).await.unwrap();
1351        assert_eq!(list.len(), 2);
1352
1353        // Filter by agent + type
1354        let filter = MemoryFilter {
1355            agent_id: Some("agent-1".to_string()),
1356            memory_type: Some(MemoryType::Episodic),
1357            ..Default::default()
1358        };
1359        let list = storage.list_memories(&filter, 100, 0).await.unwrap();
1360        assert_eq!(list.len(), 1);
1361    }
1362
1363    #[tokio::test]
1364    async fn test_touch_memory() {
1365        let storage = DuckDbStorage::open_in_memory().unwrap();
1366        let record = make_record("agent-1");
1367        storage.insert_memory(&record).await.unwrap();
1368
1369        storage.touch_memory(record.id).await.unwrap();
1370        storage.touch_memory(record.id).await.unwrap();
1371
1372        let fetched = storage.get_memory(record.id).await.unwrap().unwrap();
1373        assert_eq!(fetched.access_count, 2);
1374        assert!(fetched.last_accessed_at.is_some());
1375    }
1376
1377    #[tokio::test]
1378    async fn test_acl_and_permission_check() {
1379        let storage = DuckDbStorage::open_in_memory().unwrap();
1380        let record = make_record("agent-1");
1381        storage.insert_memory(&record).await.unwrap();
1382
1383        // Owner always has permission
1384        assert!(
1385            storage
1386                .check_permission(record.id, "agent-1", Permission::Admin)
1387                .await
1388                .unwrap()
1389        );
1390
1391        // Non-owner has no permission by default
1392        assert!(
1393            !storage
1394                .check_permission(record.id, "agent-2", Permission::Read)
1395                .await
1396                .unwrap()
1397        );
1398
1399        // Grant read to agent-2
1400        let acl = Acl {
1401            id: Uuid::now_v7(),
1402            memory_id: record.id,
1403            principal_type: PrincipalType::Agent,
1404            principal_id: "agent-2".to_string(),
1405            permission: Permission::Read,
1406            granted_by: "agent-1".to_string(),
1407            created_at: chrono::Utc::now().to_rfc3339(),
1408            expires_at: None,
1409        };
1410        storage.insert_acl(&acl).await.unwrap();
1411
1412        // Now agent-2 can read
1413        assert!(
1414            storage
1415                .check_permission(record.id, "agent-2", Permission::Read)
1416                .await
1417                .unwrap()
1418        );
1419        // But not write
1420        assert!(
1421            !storage
1422                .check_permission(record.id, "agent-2", Permission::Write)
1423                .await
1424                .unwrap()
1425        );
1426    }
1427
1428    #[tokio::test]
1429    async fn test_event_insert_and_list() {
1430        let storage = DuckDbStorage::open_in_memory().unwrap();
1431        let now = chrono::Utc::now().to_rfc3339();
1432        let event = AgentEvent {
1433            id: Uuid::now_v7(),
1434            agent_id: "agent-1".to_string(),
1435            thread_id: Some("thread-1".to_string()),
1436            run_id: None,
1437            parent_event_id: None,
1438            event_type: EventType::MemoryWrite,
1439            payload: serde_json::json!({"memory_id": "abc"}),
1440            trace_id: None,
1441            span_id: None,
1442            model: None,
1443            tokens_input: None,
1444            tokens_output: None,
1445            latency_ms: None,
1446            cost_usd: None,
1447            timestamp: now.clone(),
1448            logical_clock: 1,
1449            content_hash: vec![1, 2, 3],
1450            prev_hash: None,
1451            embedding: None,
1452        };
1453
1454        storage.insert_event(&event).await.unwrap();
1455
1456        let events = storage.list_events("agent-1", 10, 0).await.unwrap();
1457        assert_eq!(events.len(), 1);
1458        assert_eq!(events[0].id, event.id);
1459        assert_eq!(events[0].event_type, EventType::MemoryWrite);
1460        assert_eq!(events[0].agent_id, "agent-1");
1461
1462        // Get single event
1463        let fetched = storage.get_event(event.id).await.unwrap().unwrap();
1464        assert_eq!(fetched.id, event.id);
1465        assert_eq!(fetched.content_hash, vec![1, 2, 3]);
1466    }
1467
1468    #[tokio::test]
1469    async fn test_events_by_thread() {
1470        let storage = DuckDbStorage::open_in_memory().unwrap();
1471        let now = chrono::Utc::now().to_rfc3339();
1472
1473        for i in 0..3 {
1474            let event = AgentEvent {
1475                id: Uuid::now_v7(),
1476                agent_id: "agent-1".to_string(),
1477                thread_id: Some("thread-A".to_string()),
1478                run_id: None,
1479                parent_event_id: None,
1480                event_type: EventType::MemoryWrite,
1481                payload: serde_json::json!({"i": i}),
1482                trace_id: None,
1483                span_id: None,
1484                model: None,
1485                tokens_input: None,
1486                tokens_output: None,
1487                latency_ms: None,
1488                cost_usd: None,
1489                timestamp: now.clone(),
1490                logical_clock: i,
1491                content_hash: vec![i as u8],
1492                prev_hash: None,
1493                embedding: None,
1494            };
1495            storage.insert_event(&event).await.unwrap();
1496        }
1497
1498        // Different thread
1499        let event = AgentEvent {
1500            id: Uuid::now_v7(),
1501            agent_id: "agent-1".to_string(),
1502            thread_id: Some("thread-B".to_string()),
1503            run_id: None,
1504            parent_event_id: None,
1505            event_type: EventType::MemoryRead,
1506            payload: serde_json::json!({}),
1507            trace_id: None,
1508            span_id: None,
1509            model: None,
1510            tokens_input: None,
1511            tokens_output: None,
1512            latency_ms: None,
1513            cost_usd: None,
1514            timestamp: now.clone(),
1515            logical_clock: 0,
1516            content_hash: vec![99],
1517            prev_hash: None,
1518            embedding: None,
1519        };
1520        storage.insert_event(&event).await.unwrap();
1521
1522        let thread_a = storage.get_events_by_thread("thread-A", 10).await.unwrap();
1523        assert_eq!(thread_a.len(), 3);
1524
1525        let thread_b = storage.get_events_by_thread("thread-B", 10).await.unwrap();
1526        assert_eq!(thread_b.len(), 1);
1527        assert_eq!(thread_b[0].event_type, EventType::MemoryRead);
1528    }
1529
1530    #[tokio::test]
1531    async fn test_checkpoint_insert_and_get() {
1532        let storage = DuckDbStorage::open_in_memory().unwrap();
1533        let mem_id = Uuid::now_v7();
1534        let cp = Checkpoint {
1535            id: Uuid::now_v7(),
1536            thread_id: "thread-1".to_string(),
1537            agent_id: "agent-1".to_string(),
1538            parent_id: None,
1539            branch_name: "main".to_string(),
1540            state_snapshot: serde_json::json!({"step": 1}),
1541            state_diff: None,
1542            memory_refs: vec![mem_id],
1543            event_cursor: None,
1544            label: Some("initial".to_string()),
1545            created_at: chrono::Utc::now().to_rfc3339(),
1546            metadata: serde_json::json!({}),
1547        };
1548
1549        storage.insert_checkpoint(&cp).await.unwrap();
1550
1551        let fetched = storage.get_checkpoint(cp.id).await.unwrap().unwrap();
1552        assert_eq!(fetched.id, cp.id);
1553        assert_eq!(fetched.thread_id, "thread-1");
1554        assert_eq!(fetched.branch_name, "main");
1555        assert_eq!(fetched.memory_refs, vec![mem_id]);
1556        assert_eq!(fetched.label, Some("initial".to_string()));
1557    }
1558
1559    #[tokio::test]
1560    async fn test_checkpoint_list_and_latest() {
1561        let storage = DuckDbStorage::open_in_memory().unwrap();
1562
1563        let cp1 = Checkpoint {
1564            id: Uuid::now_v7(),
1565            thread_id: "thread-1".to_string(),
1566            agent_id: "agent-1".to_string(),
1567            parent_id: None,
1568            branch_name: "main".to_string(),
1569            state_snapshot: serde_json::json!({"step": 1}),
1570            state_diff: None,
1571            memory_refs: vec![],
1572            event_cursor: None,
1573            label: Some("first".to_string()),
1574            created_at: "2025-01-01T00:00:00Z".to_string(),
1575            metadata: serde_json::json!({}),
1576        };
1577        storage.insert_checkpoint(&cp1).await.unwrap();
1578
1579        let cp2 = Checkpoint {
1580            id: Uuid::now_v7(),
1581            thread_id: "thread-1".to_string(),
1582            agent_id: "agent-1".to_string(),
1583            parent_id: Some(cp1.id),
1584            branch_name: "main".to_string(),
1585            state_snapshot: serde_json::json!({"step": 2}),
1586            state_diff: Some(serde_json::json!({"step": [1, 2]})),
1587            memory_refs: vec![],
1588            event_cursor: None,
1589            label: Some("second".to_string()),
1590            created_at: "2025-01-02T00:00:00Z".to_string(),
1591            metadata: serde_json::json!({}),
1592        };
1593        storage.insert_checkpoint(&cp2).await.unwrap();
1594
1595        let cp3 = Checkpoint {
1596            id: Uuid::now_v7(),
1597            thread_id: "thread-1".to_string(),
1598            agent_id: "agent-1".to_string(),
1599            parent_id: Some(cp1.id),
1600            branch_name: "experiment".to_string(),
1601            state_snapshot: serde_json::json!({"step": "alt"}),
1602            state_diff: None,
1603            memory_refs: vec![],
1604            event_cursor: None,
1605            label: None,
1606            created_at: "2025-01-03T00:00:00Z".to_string(),
1607            metadata: serde_json::json!({}),
1608        };
1609        storage.insert_checkpoint(&cp3).await.unwrap();
1610
1611        // List all for thread
1612        let all = storage
1613            .list_checkpoints("thread-1", None, 10)
1614            .await
1615            .unwrap();
1616        assert_eq!(all.len(), 3);
1617
1618        // List by branch
1619        let main_cps = storage
1620            .list_checkpoints("thread-1", Some("main"), 10)
1621            .await
1622            .unwrap();
1623        assert_eq!(main_cps.len(), 2);
1624
1625        let exp_cps = storage
1626            .list_checkpoints("thread-1", Some("experiment"), 10)
1627            .await
1628            .unwrap();
1629        assert_eq!(exp_cps.len(), 1);
1630
1631        // Latest on main
1632        let latest = storage
1633            .get_latest_checkpoint("thread-1", "main")
1634            .await
1635            .unwrap()
1636            .unwrap();
1637        assert_eq!(latest.id, cp2.id);
1638
1639        // Latest on experiment
1640        let latest_exp = storage
1641            .get_latest_checkpoint("thread-1", "experiment")
1642            .await
1643            .unwrap()
1644            .unwrap();
1645        assert_eq!(latest_exp.id, cp3.id);
1646
1647        // No checkpoints for nonexistent branch
1648        let none = storage
1649            .get_latest_checkpoint("thread-1", "nonexistent")
1650            .await
1651            .unwrap();
1652        assert!(none.is_none());
1653    }
1654}