1use std::path::Path;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4
5use crate::error::{Error, Result};
6use crate::model::acl::{Acl, Permission};
7use crate::model::agent_profile::AgentProfile;
8use crate::model::checkpoint::Checkpoint;
9use crate::model::delegation::{Delegation, DelegationScope};
10use crate::model::embedding_baseline::EmbeddingBaseline;
11use crate::model::event::AgentEvent;
12use crate::model::memory::MemoryRecord;
13use crate::model::relation::Relation;
14use crate::storage::{MemoryFilter, StorageBackend};
15use uuid::Uuid;
16
/// Storage handle backed by a single DuckDB connection.
///
/// The connection is kept behind an `Arc<tokio::sync::Mutex<_>>`; the methods
/// in this file lock it before issuing any statement.
pub struct DuckDbStorage {
 /// Shared, mutex-guarded DuckDB connection.
 conn: Arc<Mutex<duckdb::Connection>>,
}
20
21impl DuckDbStorage {
22 pub fn open(path: &Path) -> Result<Self> {
23 let conn = duckdb::Connection::open(path)?;
24 super::migrations::run_migrations(&conn)?;
25 Ok(Self {
26 conn: Arc::new(Mutex::new(conn)),
27 })
28 }
29
30 pub fn open_in_memory() -> Result<Self> {
31 let conn = duckdb::Connection::open_in_memory()?;
32 super::migrations::run_migrations(&conn)?;
33 Ok(Self {
34 conn: Arc::new(Mutex::new(conn)),
35 })
36 }
37}
38
/// Encodes an optional embedding as a flat byte blob.
///
/// Each `f32` is written as its four little-endian bytes, in input order.
/// `None` stays `None`; an empty vector becomes an empty blob.
fn serialize_embedding(embedding: &Option<Vec<f32>>) -> Option<Vec<u8>> {
    let values = embedding.as_ref()?;
    let mut blob = Vec::with_capacity(values.len() * 4);
    for value in values {
        blob.extend_from_slice(&value.to_le_bytes());
    }
    Some(blob)
}
44
/// Decodes a blob written by `serialize_embedding` back into `f32` values.
///
/// The bytes are consumed in groups of four, each read as a little-endian
/// `f32`. Trailing bytes that do not fill a complete group are ignored.
/// `None` stays `None`.
fn deserialize_embedding(blob: Option<Vec<u8>>) -> Option<Vec<f32>> {
    let bytes = blob?;
    let mut values = Vec::with_capacity(bytes.len() / 4);
    for chunk in bytes.chunks_exact(4) {
        let mut word = [0u8; 4];
        word.copy_from_slice(chunk);
        values.push(f32::from_le_bytes(word));
    }
    Some(values)
}
53
54fn row_to_memory(row: &duckdb::Row<'_>) -> duckdb::Result<MemoryRecord> {
55 let id_str: String = row.get(0)?;
56 let tags_json: Option<String> = row.get(6)?;
57 let metadata_json: Option<String> = row.get(7)?;
58 let embedding_blob: Option<Vec<u8>> = row.get(8)?;
59 let content_hash: Vec<u8> = row.get(9)?;
60 let prev_hash: Option<Vec<u8>> = row.get(10)?;
61
62 let memory_type_str: String = row.get(3)?;
63 let scope_str: String = row.get(4)?;
64 let source_type_str: String = row.get(11)?;
65 let consolidation_state_str: String = row.get(13)?;
66
67 Ok(MemoryRecord {
68 id: Uuid::parse_str(&id_str)
69 .map_err(|e| duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e)))?,
70 agent_id: row.get(1)?,
71 content: row.get(2)?,
72 memory_type: memory_type_str.parse()
73 .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(3, duckdb::types::Type::Text, e.to_string().into()))?,
74 scope: scope_str.parse()
75 .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, e.to_string().into()))?,
76 importance: row.get(5)?,
77 tags: match tags_json {
78 Some(ref s) => serde_json::from_str(s).unwrap_or_else(|e| {
79 tracing::warn!(id = %id_str, error = %e, raw = %s, "corrupted tags JSON, defaulting to empty");
80 vec![]
81 }),
82 None => vec![],
83 },
84 metadata: match metadata_json {
85 Some(ref s) => serde_json::from_str(s).unwrap_or_else(|e| {
86 tracing::warn!(id = %id_str, error = %e, "corrupted metadata JSON, defaulting to empty");
87 serde_json::Value::Object(serde_json::Map::new())
88 }),
89 None => serde_json::Value::Object(serde_json::Map::new()),
90 },
91 embedding: deserialize_embedding(embedding_blob),
92 content_hash,
93 prev_hash,
94 source_type: source_type_str.parse()
95 .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(11, duckdb::types::Type::Text, e.to_string().into()))?,
96 source_id: row.get(12)?,
97 consolidation_state: consolidation_state_str.parse()
98 .map_err(|e: Error| duckdb::Error::FromSqlConversionFailure(13, duckdb::types::Type::Text, e.to_string().into()))?,
99 access_count: u64::try_from(row.get::<_, i64>(14)?).unwrap_or(0),
100 org_id: row.get(15)?,
101 thread_id: row.get(16)?,
102 created_at: row.get(17)?,
103 updated_at: row.get(18)?,
104 last_accessed_at: row.get(19)?,
105 expires_at: row.get(20)?,
106 deleted_at: row.get(21)?,
107 decay_rate: row.get(22)?,
108 created_by: row.get(23)?,
109 version: u32::try_from(row.get::<_, i32>(24)?).unwrap_or(1),
110 prev_version_id: match row.get::<_, Option<String>>(25)? {
111 Some(s) => Uuid::parse_str(&s).map_err(|e| {
112 tracing::warn!(memory_id = %id_str, error = %e, "corrupted prev_version_id UUID");
113 e
114 }).ok(),
115 None => None,
116 },
117 quarantined: row.get::<_, bool>(26)?,
118 quarantine_reason: row.get(27)?,
119 decay_function: row.get(28).unwrap_or(None),
120 })
121}
122
123#[async_trait::async_trait]
124impl StorageBackend for DuckDbStorage {
125 async fn insert_memory(&self, record: &MemoryRecord) -> Result<()> {
126 let conn = self.conn.lock().await;
127 let tags_json = serde_json::to_string(&record.tags)?;
128 let metadata_json = serde_json::to_string(&record.metadata)?;
129 let embedding_blob = serialize_embedding(&record.embedding);
130
131 conn.execute(
132 "INSERT INTO memories (id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
133 duckdb::params![
134 record.id.to_string(),
135 record.agent_id,
136 record.content,
137 record.memory_type.to_string(),
138 record.scope.to_string(),
139 record.importance,
140 tags_json,
141 metadata_json,
142 embedding_blob,
143 record.content_hash,
144 record.prev_hash,
145 record.source_type.to_string(),
146 record.source_id,
147 record.consolidation_state.to_string(),
148 record.access_count as i64,
149 record.org_id,
150 record.thread_id,
151 record.created_at,
152 record.updated_at,
153 record.last_accessed_at,
154 record.expires_at,
155 record.deleted_at,
156 record.decay_rate,
157 record.created_by,
158 record.version as i32,
159 record.prev_version_id.map(|id| id.to_string()),
160 record.quarantined,
161 record.quarantine_reason,
162 record.decay_function,
163 ],
164 )?;
165 Ok(())
166 }
167
168 async fn get_memory(&self, id: Uuid) -> Result<Option<MemoryRecord>> {
169 let conn = self.conn.lock().await;
170 let mut stmt = conn.prepare(
171 "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE id = ?",
172 )?;
173 let result = stmt.query_row([id.to_string()], row_to_memory);
174 match result {
175 Ok(record) => Ok(Some(record)),
176 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
177 Err(e) => Err(Error::Storage(e.to_string())),
178 }
179 }
180
181 async fn update_memory(&self, record: &MemoryRecord) -> Result<()> {
182 let conn = self.conn.lock().await;
183 let tags_json = serde_json::to_string(&record.tags)?;
184 let metadata_json = serde_json::to_string(&record.metadata)?;
185 let embedding_blob = serialize_embedding(&record.embedding);
186
187 let affected = conn.execute(
188 "UPDATE memories SET agent_id=?, content=?, memory_type=?, scope=?, importance=?, tags=?, metadata=?, embedding=?, content_hash=?, prev_hash=?, source_type=?, source_id=?, consolidation_state=?, access_count=?, org_id=?, thread_id=?, updated_at=?, last_accessed_at=?, expires_at=?, deleted_at=?, decay_rate=?, created_by=?, version=?, prev_version_id=?, quarantined=?, quarantine_reason=?, decay_function=? WHERE id=?",
189 duckdb::params![
190 record.agent_id,
191 record.content,
192 record.memory_type.to_string(),
193 record.scope.to_string(),
194 record.importance,
195 tags_json,
196 metadata_json,
197 embedding_blob,
198 record.content_hash,
199 record.prev_hash,
200 record.source_type.to_string(),
201 record.source_id,
202 record.consolidation_state.to_string(),
203 record.access_count as i64,
204 record.org_id,
205 record.thread_id,
206 record.updated_at,
207 record.last_accessed_at,
208 record.expires_at,
209 record.deleted_at,
210 record.decay_rate,
211 record.created_by,
212 record.version as i32,
213 record.prev_version_id.map(|id| id.to_string()),
214 record.quarantined,
215 record.quarantine_reason,
216 record.decay_function,
217 record.id.to_string(),
218 ],
219 )?;
220 if affected == 0 {
221 return Err(Error::NotFound(format!("memory {} not found", record.id)));
222 }
223 Ok(())
224 }
225
226 async fn soft_delete_memory(&self, id: Uuid) -> Result<()> {
227 let conn = self.conn.lock().await;
228 let now = chrono::Utc::now().to_rfc3339();
229 let affected = conn.execute(
230 "UPDATE memories SET deleted_at = ?, updated_at = ? WHERE id = ? AND deleted_at IS NULL",
231 duckdb::params![now, now, id.to_string()],
232 )?;
233 if affected == 0 {
234 return Err(Error::NotFound(format!(
235 "memory {id} not found or already deleted"
236 )));
237 }
238 Ok(())
239 }
240
241 async fn hard_delete_memory(&self, id: Uuid) -> Result<()> {
242 let conn = self.conn.lock().await;
243 let affected = conn.execute(
244 "DELETE FROM memories WHERE id = ?",
245 duckdb::params![id.to_string()],
246 )?;
247 if affected == 0 {
248 return Err(Error::NotFound(format!("memory {id} not found")));
249 }
250 conn.execute(
252 "DELETE FROM acls WHERE memory_id = ?",
253 duckdb::params![id.to_string()],
254 )?;
255 Ok(())
256 }
257
258 async fn list_memories(
259 &self,
260 filter: &MemoryFilter,
261 limit: usize,
262 offset: usize,
263 ) -> Result<Vec<MemoryRecord>> {
264 let conn = self.conn.lock().await;
265 let mut conditions = Vec::new();
266 let mut params: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
267
268 if !filter.include_deleted {
269 conditions.push("deleted_at IS NULL".to_string());
270 }
271
272 if let Some(ref agent_id) = filter.agent_id {
273 conditions.push(format!("agent_id = ${}", params.len() + 1));
274 params.push(Box::new(agent_id.clone()));
275 }
276
277 if let Some(memory_type) = filter.memory_type {
278 conditions.push(format!("memory_type = ${}", params.len() + 1));
279 params.push(Box::new(memory_type.to_string()));
280 }
281
282 if let Some(scope) = filter.scope {
283 conditions.push(format!("scope = ${}", params.len() + 1));
284 params.push(Box::new(scope.to_string()));
285 }
286
287 if let Some(min_importance) = filter.min_importance {
288 conditions.push(format!("importance >= ${}", params.len() + 1));
289 params.push(Box::new(min_importance));
290 }
291
292 if let Some(ref org_id) = filter.org_id {
293 conditions.push(format!("org_id = ${}", params.len() + 1));
294 params.push(Box::new(org_id.clone()));
295 }
296
297 if let Some(ref thread_id) = filter.thread_id {
298 conditions.push(format!("thread_id = ${}", params.len() + 1));
299 params.push(Box::new(thread_id.clone()));
300 }
301
302 let where_clause = if conditions.is_empty() {
303 String::new()
304 } else {
305 format!("WHERE {}", conditions.join(" AND "))
306 };
307
308 let sql = format!(
309 "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories {where_clause} ORDER BY created_at DESC LIMIT {limit} OFFSET {offset}"
310 );
311
312 let mut stmt = conn.prepare(&sql)?;
313 let param_refs: Vec<&dyn duckdb::ToSql> = params.iter().map(|p| p.as_ref()).collect();
314 let rows = stmt.query_map(param_refs.as_slice(), row_to_memory)?;
315
316 let mut results = Vec::new();
317 for row in rows {
318 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
319 }
320 Ok(results)
321 }
322
323 async fn touch_memory(&self, id: Uuid) -> Result<()> {
324 let conn = self.conn.lock().await;
325 let now = chrono::Utc::now().to_rfc3339();
326 conn.execute(
327 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ? WHERE id = ?",
328 duckdb::params![now, id.to_string()],
329 )?;
330 Ok(())
331 }
332
333 async fn insert_acl(&self, acl: &Acl) -> Result<()> {
334 let conn = self.conn.lock().await;
335 conn.execute(
336 "INSERT INTO acls (id, memory_id, principal_type, principal_id, permission, granted_by, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
337 duckdb::params![
338 acl.id.to_string(),
339 acl.memory_id.to_string(),
340 acl.principal_type.to_string(),
341 acl.principal_id,
342 acl.permission.to_string(),
343 acl.granted_by,
344 acl.created_at,
345 acl.expires_at,
346 ],
347 )?;
348 Ok(())
349 }
350
351 async fn check_permission(
352 &self,
353 memory_id: Uuid,
354 principal_id: &str,
355 required: Permission,
356 ) -> Result<bool> {
357 let acl_result = {
359 let conn = self.conn.lock().await;
360
361 let mut stmt = conn.prepare("SELECT agent_id FROM memories WHERE id = ?")?;
363 let owner_result =
364 stmt.query_row([memory_id.to_string()], |row| row.get::<_, String>(0));
365 match owner_result {
366 Ok(owner) if owner == principal_id => return Ok(true),
367 Err(duckdb::Error::QueryReturnedNoRows) => {
368 return Err(Error::NotFound(format!("memory {memory_id} not found")));
369 }
370 _ => {}
371 }
372
373 let now = chrono::Utc::now().to_rfc3339();
375 let mut stmt = conn.prepare(
376 "SELECT permission FROM acls WHERE memory_id = ? AND principal_id = ? AND (expires_at IS NULL OR expires_at > ?)",
377 )?;
378 let rows = stmt.query_map(
379 duckdb::params![memory_id.to_string(), principal_id, now.clone()],
380 |row| row.get::<_, String>(0),
381 )?;
382
383 let mut perms: Vec<String> = Vec::new();
384 for row in rows {
385 perms.push(row.map_err(|e| Error::Storage(e.to_string()))?);
386 }
387
388 let mut stmt = conn.prepare(
390 "SELECT permission FROM acls WHERE memory_id = ? AND principal_type = 'public' AND (expires_at IS NULL OR expires_at > ?)",
391 )?;
392 let rows = stmt.query_map(duckdb::params![memory_id.to_string(), now], |row| {
393 row.get::<_, String>(0)
394 })?;
395
396 for row in rows {
397 perms.push(row.map_err(|e| Error::Storage(e.to_string()))?);
398 }
399
400 perms
401 }; for perm_str in &acl_result {
404 if let Ok(perm) = perm_str.parse::<Permission>()
405 && perm.satisfies(required)
406 {
407 return Ok(true);
408 }
409 }
410
411 if self
413 .check_delegation(principal_id, memory_id, required)
414 .await?
415 {
416 return Ok(true);
417 }
418
419 Ok(false)
420 }
421
422 async fn insert_relation(&self, relation: &Relation) -> Result<()> {
423 let conn = self.conn.lock().await;
424 conn.execute(
425 "INSERT INTO relations (id, source_id, target_id, relation_type, weight, metadata, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
426 duckdb::params![
427 relation.id.to_string(),
428 relation.source_id.to_string(),
429 relation.target_id.to_string(),
430 relation.relation_type,
431 relation.weight,
432 serde_json::to_string(&relation.metadata)?,
433 relation.created_at,
434 ],
435 )?;
436 Ok(())
437 }
438
439 async fn get_relations_from(&self, source_id: Uuid) -> Result<Vec<Relation>> {
440 let conn = self.conn.lock().await;
441 let mut stmt = conn.prepare(
442 "SELECT id, source_id, target_id, relation_type, weight, metadata, created_at FROM relations WHERE source_id = ?",
443 )?;
444 let rows = stmt.query_map([source_id.to_string()], row_to_relation)?;
445 let mut results = Vec::new();
446 for row in rows {
447 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
448 }
449 Ok(results)
450 }
451
452 async fn get_relations_to(&self, target_id: Uuid) -> Result<Vec<Relation>> {
453 let conn = self.conn.lock().await;
454 let mut stmt = conn.prepare(
455 "SELECT id, source_id, target_id, relation_type, weight, metadata, created_at FROM relations WHERE target_id = ?",
456 )?;
457 let rows = stmt.query_map([target_id.to_string()], row_to_relation)?;
458 let mut results = Vec::new();
459 for row in rows {
460 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
461 }
462 Ok(results)
463 }
464
465 async fn delete_relation(&self, id: Uuid) -> Result<()> {
466 let conn = self.conn.lock().await;
467 let affected = conn.execute(
468 "DELETE FROM relations WHERE id = ?",
469 duckdb::params![id.to_string()],
470 )?;
471 if affected == 0 {
472 return Err(Error::NotFound(format!("relation {id} not found")));
473 }
474 Ok(())
475 }
476
477 async fn get_latest_memory_hash(
478 &self,
479 agent_id: &str,
480 thread_id: Option<&str>,
481 ) -> Result<Option<Vec<u8>>> {
482 let conn = self.conn.lock().await;
483 let (sql, result) = if let Some(tid) = thread_id {
484 let mut stmt = conn.prepare(
485 "SELECT content_hash FROM memories WHERE agent_id = ? AND thread_id = ? AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1",
486 )?;
487 let r = stmt.query_row(duckdb::params![agent_id, tid], |row| {
488 row.get::<_, Vec<u8>>(0)
489 });
490 ((), r)
491 } else {
492 let mut stmt = conn.prepare(
493 "SELECT content_hash FROM memories WHERE agent_id = ? AND thread_id IS NULL AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1",
494 )?;
495 let r = stmt.query_row(duckdb::params![agent_id], |row| row.get::<_, Vec<u8>>(0));
496 ((), r)
497 };
498 let _ = sql;
499 match result {
500 Ok(hash) => Ok(Some(hash)),
501 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
502 Err(e) => Err(Error::Storage(e.to_string())),
503 }
504 }
505
506 async fn get_latest_event_hash(
507 &self,
508 agent_id: &str,
509 thread_id: Option<&str>,
510 ) -> Result<Option<Vec<u8>>> {
511 let conn = self.conn.lock().await;
512 let result = if let Some(tid) = thread_id {
513 let mut stmt = conn.prepare(
514 "SELECT content_hash FROM agent_events WHERE agent_id = ? AND thread_id = ? ORDER BY timestamp DESC LIMIT 1",
515 )?;
516 stmt.query_row(duckdb::params![agent_id, tid], |row| {
517 row.get::<_, Vec<u8>>(0)
518 })
519 } else {
520 let mut stmt = conn.prepare(
521 "SELECT content_hash FROM agent_events WHERE agent_id = ? ORDER BY timestamp DESC LIMIT 1",
522 )?;
523 stmt.query_row(duckdb::params![agent_id], |row| row.get::<_, Vec<u8>>(0))
524 };
525 match result {
526 Ok(hash) => Ok(Some(hash)),
527 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
528 Err(e) => Err(Error::Storage(e.to_string())),
529 }
530 }
531
532 async fn get_sync_watermark(&self, key: &str) -> Result<Option<String>> {
533 let conn = self.conn.lock().await;
534 let mut stmt = conn.prepare("SELECT value FROM sync_metadata WHERE key = ?")?;
535 let result = stmt.query_row(duckdb::params![key], |row| row.get::<_, String>(0));
536 match result {
537 Ok(value) => Ok(Some(value)),
538 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
539 Err(e) => Err(Error::Storage(e.to_string())),
540 }
541 }
542
543 async fn set_sync_watermark(&self, key: &str, value: &str) -> Result<()> {
544 let conn = self.conn.lock().await;
545 let now = chrono::Utc::now().to_rfc3339();
546 let affected = conn.execute(
548 "UPDATE sync_metadata SET value = ?, updated_at = ? WHERE key = ?",
549 duckdb::params![value, now, key],
550 )?;
551 if affected == 0 {
552 conn.execute(
553 "INSERT INTO sync_metadata (key, value, updated_at) VALUES (?, ?, ?)",
554 duckdb::params![key, value, now],
555 )?;
556 }
557 Ok(())
558 }
559
560 async fn list_accessible_memory_ids(&self, agent_id: &str, limit: usize) -> Result<Vec<Uuid>> {
561 let conn = self.conn.lock().await;
562 let now = chrono::Utc::now().to_rfc3339();
563 let mut stmt = conn.prepare(
564 "SELECT id FROM memories WHERE (agent_id = ? OR scope = 'public' OR id IN (SELECT memory_id FROM acls WHERE principal_id = ? AND (expires_at IS NULL OR expires_at > ?))) AND deleted_at IS NULL LIMIT ?",
565 )?;
566 let rows = stmt.query_map(
567 duckdb::params![agent_id, agent_id, now, limit as i64],
568 |row| row.get::<_, String>(0),
569 )?;
570 let mut ids = Vec::new();
571 for row in rows {
572 let id_str = row.map_err(|e| Error::Storage(e.to_string()))?;
573 ids.push(Uuid::parse_str(&id_str).map_err(|e| Error::Storage(e.to_string()))?);
574 }
575 Ok(ids)
576 }
577
578 async fn insert_event(&self, event: &AgentEvent) -> Result<()> {
579 let conn = self.conn.lock().await;
580 let payload_json = serde_json::to_string(&event.payload)?;
581 let embedding_blob = serialize_embedding(&event.embedding);
582 conn.execute(
583 "INSERT INTO agent_events (id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
584 duckdb::params![
585 event.id.to_string(),
586 event.agent_id,
587 event.thread_id,
588 event.run_id,
589 event.parent_event_id.map(|id| id.to_string()),
590 event.event_type.to_string(),
591 payload_json,
592 event.trace_id,
593 event.span_id,
594 event.model,
595 event.tokens_input,
596 event.tokens_output,
597 event.latency_ms,
598 event.cost_usd,
599 event.timestamp,
600 event.logical_clock,
601 event.content_hash,
602 event.prev_hash,
603 embedding_blob,
604 ],
605 )?;
606 Ok(())
607 }
608
609 async fn list_events(
610 &self,
611 agent_id: &str,
612 limit: usize,
613 offset: usize,
614 ) -> Result<Vec<AgentEvent>> {
615 let conn = self.conn.lock().await;
616 let mut stmt = conn.prepare(
617 "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE agent_id = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?",
618 )?;
619 let rows = stmt.query_map(
620 duckdb::params![agent_id, limit as i64, offset as i64],
621 row_to_event,
622 )?;
623 let mut results = Vec::new();
624 for row in rows {
625 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
626 }
627 Ok(results)
628 }
629
630 async fn get_events_by_thread(&self, thread_id: &str, limit: usize) -> Result<Vec<AgentEvent>> {
631 let conn = self.conn.lock().await;
632 let mut stmt = conn.prepare(
633 "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE thread_id = ? ORDER BY timestamp ASC LIMIT ?",
634 )?;
635 let rows = stmt.query_map(duckdb::params![thread_id, limit as i64], row_to_event)?;
636 let mut results = Vec::new();
637 for row in rows {
638 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
639 }
640 Ok(results)
641 }
642
643 async fn get_event(&self, id: Uuid) -> Result<Option<AgentEvent>> {
644 let conn = self.conn.lock().await;
645 let mut stmt = conn.prepare(
646 "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE id = ?",
647 )?;
648 let result = stmt.query_row([id.to_string()], row_to_event);
649 match result {
650 Ok(event) => Ok(Some(event)),
651 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
652 Err(e) => Err(Error::Storage(e.to_string())),
653 }
654 }
655
656 async fn list_child_events(
657 &self,
658 parent_event_id: Uuid,
659 limit: usize,
660 ) -> Result<Vec<AgentEvent>> {
661 let conn = self.conn.lock().await;
662 let mut stmt = conn.prepare(
663 "SELECT id, agent_id, thread_id, run_id, parent_event_id, event_type, payload, trace_id, span_id, model, tokens_input, tokens_output, latency_ms, cost_usd, timestamp, logical_clock, content_hash, prev_hash, embedding FROM agent_events WHERE parent_event_id = ? ORDER BY timestamp ASC LIMIT ?",
664 )?;
665 let rows = stmt.query_map(
666 duckdb::params![parent_event_id.to_string(), limit as i64],
667 row_to_event,
668 )?;
669 let mut results = Vec::new();
670 for row in rows {
671 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
672 }
673 Ok(results)
674 }
675
676 async fn list_memories_by_agent_ordered(
677 &self,
678 agent_id: &str,
679 thread_id: Option<&str>,
680 limit: usize,
681 ) -> Result<Vec<MemoryRecord>> {
682 let conn = self.conn.lock().await;
683 let (result,) = if let Some(tid) = thread_id {
684 let mut stmt = conn.prepare(
685 "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE agent_id = ? AND thread_id = ? AND deleted_at IS NULL ORDER BY created_at ASC LIMIT ?",
686 )?;
687 let rows =
688 stmt.query_map(duckdb::params![agent_id, tid, limit as i64], row_to_memory)?;
689 let mut results = Vec::new();
690 for row in rows {
691 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
692 }
693 (results,)
694 } else {
695 let mut stmt = conn.prepare(
696 "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE agent_id = ? AND deleted_at IS NULL ORDER BY created_at ASC LIMIT ?",
697 )?;
698 let rows = stmt.query_map(duckdb::params![agent_id, limit as i64], row_to_memory)?;
699 let mut results = Vec::new();
700 for row in rows {
701 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
702 }
703 (results,)
704 };
705 Ok(result)
706 }
707
708 async fn list_memories_since(
709 &self,
710 updated_after: &str,
711 limit: usize,
712 ) -> Result<Vec<MemoryRecord>> {
713 let conn = self.conn.lock().await;
714 let mut stmt = conn.prepare(
715 "SELECT id, agent_id, content, memory_type, scope, importance, tags, metadata, embedding, content_hash, prev_hash, source_type, source_id, consolidation_state, access_count, org_id, thread_id, created_at, updated_at, last_accessed_at, expires_at, deleted_at, decay_rate, created_by, version, prev_version_id, quarantined, quarantine_reason, decay_function FROM memories WHERE updated_at > ? ORDER BY updated_at ASC LIMIT ?",
716 )?;
717 let rows = stmt.query_map(duckdb::params![updated_after, limit as i64], row_to_memory)?;
718 let mut results = Vec::new();
719 for row in rows {
720 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
721 }
722 Ok(results)
723 }
724
725 async fn upsert_memory(&self, record: &MemoryRecord) -> Result<()> {
726 match self.update_memory(record).await {
728 Ok(()) => Ok(()),
729 Err(Error::NotFound(_)) => self.insert_memory(record).await,
730 Err(e) => Err(e),
731 }
732 }
733
734 async fn cleanup_expired(&self) -> Result<usize> {
735 let conn = self.conn.lock().await;
736 let now = chrono::Utc::now().to_rfc3339();
737 let affected = conn.execute(
738 "UPDATE memories SET deleted_at = ? WHERE expires_at IS NOT NULL AND expires_at < ? AND deleted_at IS NULL",
739 duckdb::params![now.clone(), now],
740 )?;
741 Ok(affected)
742 }
743
744 async fn insert_delegation(&self, d: &Delegation) -> Result<()> {
745 let conn = self.conn.lock().await;
746 let scope_type = d.scope.to_string();
747 let scope_value = match &d.scope {
748 DelegationScope::AllMemories => serde_json::Value::Null,
749 DelegationScope::ByTag(tags) => serde_json::json!(tags),
750 DelegationScope::ByMemoryId(ids) => {
751 serde_json::json!(ids.iter().map(|id| id.to_string()).collect::<Vec<_>>())
752 }
753 };
754 let scope_value_json = serde_json::to_string(&scope_value)?;
755
756 conn.execute(
757 "INSERT INTO delegations (id, delegator_id, delegate_id, permission, scope_type, scope_value, max_depth, current_depth, parent_delegation_id, created_at, expires_at, revoked_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
758 duckdb::params![
759 d.id.to_string(),
760 d.delegator_id,
761 d.delegate_id,
762 d.permission.to_string(),
763 scope_type,
764 scope_value_json,
765 d.max_depth as i32,
766 d.current_depth as i32,
767 d.parent_delegation_id.map(|id| id.to_string()),
768 d.created_at,
769 d.expires_at,
770 d.revoked_at,
771 ],
772 )?;
773 Ok(())
774 }
775
776 async fn list_delegations_for(&self, delegate_id: &str) -> Result<Vec<Delegation>> {
777 let conn = self.conn.lock().await;
778 let now = chrono::Utc::now().to_rfc3339();
779 let mut stmt = conn.prepare(
780 "SELECT id, delegator_id, delegate_id, permission, scope_type, scope_value, max_depth, current_depth, parent_delegation_id, created_at, expires_at, revoked_at FROM delegations WHERE delegate_id = ? AND revoked_at IS NULL AND (expires_at IS NULL OR expires_at > ?)",
781 )?;
782 let rows = stmt.query_map(duckdb::params![delegate_id, now], row_to_delegation)?;
783 let mut results = Vec::new();
784 for row in rows {
785 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
786 }
787 Ok(results)
788 }
789
790 async fn revoke_delegation(&self, id: Uuid) -> Result<()> {
791 let conn = self.conn.lock().await;
792 let now = chrono::Utc::now().to_rfc3339();
793 let affected = conn.execute(
794 "UPDATE delegations SET revoked_at = ? WHERE id = ? AND revoked_at IS NULL",
795 duckdb::params![now, id.to_string()],
796 )?;
797 if affected == 0 {
798 return Err(Error::NotFound(format!(
799 "delegation {id} not found or already revoked"
800 )));
801 }
802 Ok(())
803 }
804
805 async fn check_delegation(
806 &self,
807 delegate_id: &str,
808 memory_id: Uuid,
809 required: Permission,
810 ) -> Result<bool> {
811 let delegations = self.list_delegations_for(delegate_id).await?;
812 let memory = match self.get_memory(memory_id).await? {
814 Some(m) => m,
815 None => return Ok(false),
816 };
817
818 for d in &delegations {
819 if !d.permission.satisfies(required) {
820 continue;
821 }
822 match &d.scope {
823 DelegationScope::AllMemories => return Ok(true),
824 DelegationScope::ByMemoryId(ids) => {
825 if ids.contains(&memory_id) {
826 return Ok(true);
827 }
828 }
829 DelegationScope::ByTag(tags) => {
830 if tags.iter().any(|t| memory.tags.contains(t)) {
831 return Ok(true);
832 }
833 }
834 }
835 }
836 Ok(false)
837 }
838
839 async fn insert_or_update_agent_profile(&self, profile: &AgentProfile) -> Result<()> {
840 let conn = self.conn.lock().await;
841 let affected = conn.execute(
843 "UPDATE agent_profiles SET avg_importance = ?, avg_content_length = ?, total_memories = ?, last_updated = ? WHERE agent_id = ?",
844 duckdb::params![
845 profile.avg_importance,
846 profile.avg_content_length,
847 profile.total_memories as i64,
848 profile.last_updated,
849 profile.agent_id,
850 ],
851 )?;
852 if affected == 0 {
853 conn.execute(
854 "INSERT INTO agent_profiles (agent_id, avg_importance, avg_content_length, total_memories, last_updated) VALUES (?, ?, ?, ?, ?)",
855 duckdb::params![
856 profile.agent_id,
857 profile.avg_importance,
858 profile.avg_content_length,
859 profile.total_memories as i64,
860 profile.last_updated,
861 ],
862 )?;
863 }
864 Ok(())
865 }
866
867 async fn get_agent_profile(&self, agent_id: &str) -> Result<Option<AgentProfile>> {
868 let conn = self.conn.lock().await;
869 let mut stmt = conn.prepare(
870 "SELECT agent_id, avg_importance, avg_content_length, total_memories, last_updated FROM agent_profiles WHERE agent_id = ?",
871 )?;
872 let result = stmt.query_row([agent_id], |row| {
873 Ok(AgentProfile {
874 agent_id: row.get(0)?,
875 avg_importance: row.get(1)?,
876 avg_content_length: row.get(2)?,
877 total_memories: row.get::<_, i64>(3)? as u64,
878 last_updated: row.get(4)?,
879 })
880 });
881 match result {
882 Ok(profile) => Ok(Some(profile)),
883 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
884 Err(e) => Err(Error::Storage(e.to_string())),
885 }
886 }
887
888 async fn insert_or_update_embedding_baseline(
889 &self,
890 baseline: &EmbeddingBaseline,
891 ) -> Result<()> {
892 let conn = self.conn.lock().await;
893 let mu_json = serde_json::to_string(&baseline.mu)?;
894 let cov_json = serde_json::to_string(&baseline.cov_diag)?;
895 let affected = conn.execute(
896 "UPDATE embedding_baseline SET mu = ?, cov_diag = ?, n = ?, updated_at = ? WHERE agent_id = ?",
897 duckdb::params![
898 mu_json,
899 cov_json,
900 baseline.n as i64,
901 baseline.updated_at,
902 baseline.agent_id,
903 ],
904 )?;
905 if affected == 0 {
906 let mu_json = serde_json::to_string(&baseline.mu)?;
907 let cov_json = serde_json::to_string(&baseline.cov_diag)?;
908 conn.execute(
909 "INSERT INTO embedding_baseline (agent_id, mu, cov_diag, n, updated_at) VALUES (?, ?, ?, ?, ?)",
910 duckdb::params![
911 baseline.agent_id,
912 mu_json,
913 cov_json,
914 baseline.n as i64,
915 baseline.updated_at,
916 ],
917 )?;
918 }
919 Ok(())
920 }
921
922 async fn get_embedding_baseline(&self, agent_id: &str) -> Result<Option<EmbeddingBaseline>> {
923 let conn = self.conn.lock().await;
924 let mut stmt = conn.prepare(
925 "SELECT agent_id, mu, cov_diag, n, updated_at FROM embedding_baseline WHERE agent_id = ?",
926 )?;
927 let result: duckdb::Result<(String, String, String, i64, String)> =
928 stmt.query_row([agent_id], |row| {
929 Ok((
930 row.get(0)?,
931 row.get(1)?,
932 row.get(2)?,
933 row.get(3)?,
934 row.get(4)?,
935 ))
936 });
937 match result {
938 Ok((agent_id, mu_json, cov_json, n, updated_at)) => {
939 let mu: Vec<f32> = serde_json::from_str(&mu_json)?;
940 let cov_diag: Vec<f32> = serde_json::from_str(&cov_json)?;
941 Ok(Some(EmbeddingBaseline {
942 agent_id,
943 mu,
944 cov_diag,
945 n: n as u64,
946 updated_at,
947 }))
948 }
949 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
950 Err(e) => Err(Error::Storage(e.to_string())),
951 }
952 }
953
954 async fn insert_checkpoint(&self, cp: &Checkpoint) -> Result<()> {
955 let conn = self.conn.lock().await;
956 let state_snapshot_json = serde_json::to_string(&cp.state_snapshot)?;
957 let state_diff_json = cp
958 .state_diff
959 .as_ref()
960 .map(serde_json::to_string)
961 .transpose()?;
962 let memory_refs_json = serde_json::to_string(
963 &cp.memory_refs
964 .iter()
965 .map(|id| id.to_string())
966 .collect::<Vec<_>>(),
967 )?;
968 let metadata_json = serde_json::to_string(&cp.metadata)?;
969
970 conn.execute(
971 "INSERT INTO checkpoints (id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
972 duckdb::params![
973 cp.id.to_string(),
974 cp.thread_id,
975 cp.agent_id,
976 cp.parent_id.map(|id| id.to_string()),
977 cp.branch_name,
978 state_snapshot_json,
979 state_diff_json,
980 memory_refs_json,
981 cp.event_cursor.map(|id| id.to_string()),
982 cp.label,
983 cp.created_at,
984 metadata_json,
985 ],
986 )?;
987 Ok(())
988 }
989
990 async fn get_checkpoint(&self, id: Uuid) -> Result<Option<Checkpoint>> {
991 let conn = self.conn.lock().await;
992 let mut stmt = conn.prepare(
993 "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE id = ?",
994 )?;
995 let result = stmt.query_row([id.to_string()], row_to_checkpoint);
996 match result {
997 Ok(cp) => Ok(Some(cp)),
998 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
999 Err(e) => Err(Error::Storage(e.to_string())),
1000 }
1001 }
1002
1003 async fn list_checkpoints(
1004 &self,
1005 thread_id: &str,
1006 branch: Option<&str>,
1007 limit: usize,
1008 ) -> Result<Vec<Checkpoint>> {
1009 let conn = self.conn.lock().await;
1010 let (sql, rows_result) = if let Some(branch_name) = branch {
1011 let mut stmt = conn.prepare(
1012 "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE thread_id = ? AND branch_name = ? ORDER BY created_at DESC LIMIT ?",
1013 )?;
1014 let rows = stmt.query_map(
1015 duckdb::params![thread_id, branch_name, limit as i64],
1016 row_to_checkpoint,
1017 )?;
1018 let mut results = Vec::new();
1019 for row in rows {
1020 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
1021 }
1022 ((), Ok(results))
1023 } else {
1024 let mut stmt = conn.prepare(
1025 "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE thread_id = ? ORDER BY created_at DESC LIMIT ?",
1026 )?;
1027 let rows =
1028 stmt.query_map(duckdb::params![thread_id, limit as i64], row_to_checkpoint)?;
1029 let mut results = Vec::new();
1030 for row in rows {
1031 results.push(row.map_err(|e| Error::Storage(e.to_string()))?);
1032 }
1033 ((), Ok(results))
1034 };
1035 let _ = sql;
1036 rows_result
1037 }
1038
1039 async fn get_latest_checkpoint(
1040 &self,
1041 thread_id: &str,
1042 branch: &str,
1043 ) -> Result<Option<Checkpoint>> {
1044 let conn = self.conn.lock().await;
1045 let mut stmt = conn.prepare(
1046 "SELECT id, thread_id, agent_id, parent_id, branch_name, state_snapshot, state_diff, memory_refs, event_cursor, label, created_at, metadata FROM checkpoints WHERE thread_id = ? AND branch_name = ? ORDER BY created_at DESC LIMIT 1",
1047 )?;
1048 let result = stmt.query_row(duckdb::params![thread_id, branch], row_to_checkpoint);
1049 match result {
1050 Ok(cp) => Ok(Some(cp)),
1051 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1052 Err(e) => Err(Error::Storage(e.to_string())),
1053 }
1054 }
1055}
1056
1057fn row_to_event(row: &duckdb::Row<'_>) -> duckdb::Result<AgentEvent> {
1058 let id_str: String = row.get(0)?;
1059 let parent_id_str: Option<String> = row.get(4)?;
1060 let payload_json: Option<String> = row.get(6)?;
1061 let event_type_str: String = row.get(5)?;
1062 let content_hash: Vec<u8> = row.get(16)?;
1063 let prev_hash: Option<Vec<u8>> = row.get(17)?;
1064 let embedding_blob: Option<Vec<u8>> = row.get(18).unwrap_or(None);
1065
1066 Ok(AgentEvent {
1067 id: Uuid::parse_str(&id_str).map_err(|e| {
1068 duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1069 })?,
1070 agent_id: row.get(1)?,
1071 thread_id: row.get(2)?,
1072 run_id: row.get(3)?,
1073 parent_event_id: parent_id_str.and_then(|s| Uuid::parse_str(&s).ok()),
1074 event_type: event_type_str.parse().map_err(|e: Error| {
1075 duckdb::Error::FromSqlConversionFailure(
1076 5,
1077 duckdb::types::Type::Text,
1078 e.to_string().into(),
1079 )
1080 })?,
1081 payload: payload_json
1082 .and_then(|s| serde_json::from_str(&s).ok())
1083 .unwrap_or(serde_json::Value::Null),
1084 trace_id: row.get(7)?,
1085 span_id: row.get(8)?,
1086 model: row.get(9)?,
1087 tokens_input: row.get(10)?,
1088 tokens_output: row.get(11)?,
1089 latency_ms: row.get(12)?,
1090 cost_usd: row.get(13)?,
1091 timestamp: row.get(14)?,
1092 logical_clock: row.get(15)?,
1093 content_hash,
1094 prev_hash,
1095 embedding: deserialize_embedding(embedding_blob),
1096 })
1097}
1098
1099fn row_to_checkpoint(row: &duckdb::Row<'_>) -> duckdb::Result<Checkpoint> {
1100 let id_str: String = row.get(0)?;
1101 let parent_id_str: Option<String> = row.get(3)?;
1102 let state_snapshot_json: Option<String> = row.get(5)?;
1103 let state_diff_json: Option<String> = row.get(6)?;
1104 let memory_refs_json: Option<String> = row.get(7)?;
1105 let event_cursor_str: Option<String> = row.get(8)?;
1106 let metadata_json: Option<String> = row.get(11)?;
1107
1108 Ok(Checkpoint {
1109 id: Uuid::parse_str(&id_str).map_err(|e| {
1110 duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1111 })?,
1112 thread_id: row.get(1)?,
1113 agent_id: row.get(2)?,
1114 parent_id: parent_id_str.and_then(|s| Uuid::parse_str(&s).ok()),
1115 branch_name: row.get(4)?,
1116 state_snapshot: state_snapshot_json
1117 .and_then(|s| serde_json::from_str(&s).ok())
1118 .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
1119 state_diff: state_diff_json.and_then(|s| serde_json::from_str(&s).ok()),
1120 memory_refs: memory_refs_json
1121 .and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok())
1122 .map(|v| {
1123 v.into_iter()
1124 .filter_map(|s| Uuid::parse_str(&s).ok())
1125 .collect()
1126 })
1127 .unwrap_or_default(),
1128 event_cursor: event_cursor_str.and_then(|s| Uuid::parse_str(&s).ok()),
1129 label: row.get(9)?,
1130 created_at: row.get(10)?,
1131 metadata: metadata_json
1132 .and_then(|s| serde_json::from_str(&s).ok())
1133 .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
1134 })
1135}
1136
1137fn row_to_delegation(row: &duckdb::Row<'_>) -> duckdb::Result<Delegation> {
1138 let id_str: String = row.get(0)?;
1139 let scope_type: String = row.get(4)?;
1140 let scope_value_json: Option<String> = row.get(5)?;
1141 let parent_id_str: Option<String> = row.get(8)?;
1142
1143 let scope = match scope_type.as_str() {
1144 "by_tag" => {
1145 let tags: Vec<String> = scope_value_json
1146 .and_then(|s| serde_json::from_str(&s).ok())
1147 .unwrap_or_default();
1148 DelegationScope::ByTag(tags)
1149 }
1150 "by_memory_id" => {
1151 let ids: Vec<String> = scope_value_json
1152 .and_then(|s| serde_json::from_str(&s).ok())
1153 .unwrap_or_default();
1154 let uuids = ids
1155 .into_iter()
1156 .filter_map(|s| Uuid::parse_str(&s).ok())
1157 .collect();
1158 DelegationScope::ByMemoryId(uuids)
1159 }
1160 _ => DelegationScope::AllMemories,
1161 };
1162
1163 let permission_str: String = row.get(3)?;
1164
1165 Ok(Delegation {
1166 id: Uuid::parse_str(&id_str).map_err(|e| {
1167 duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1168 })?,
1169 delegator_id: row.get(1)?,
1170 delegate_id: row.get(2)?,
1171 permission: permission_str.parse().map_err(|e: Error| {
1172 duckdb::Error::FromSqlConversionFailure(
1173 3,
1174 duckdb::types::Type::Text,
1175 e.to_string().into(),
1176 )
1177 })?,
1178 scope,
1179 max_depth: row.get::<_, i32>(6)? as u32,
1180 current_depth: row.get::<_, i32>(7)? as u32,
1181 parent_delegation_id: parent_id_str.and_then(|s| Uuid::parse_str(&s).ok()),
1182 created_at: row.get(9)?,
1183 expires_at: row.get(10)?,
1184 revoked_at: row.get(11)?,
1185 })
1186}
1187
1188fn row_to_relation(row: &duckdb::Row<'_>) -> duckdb::Result<Relation> {
1189 let id_str: String = row.get(0)?;
1190 let source_str: String = row.get(1)?;
1191 let target_str: String = row.get(2)?;
1192 let metadata_json: Option<String> = row.get(5)?;
1193
1194 Ok(Relation {
1195 id: Uuid::parse_str(&id_str).map_err(|e| {
1196 duckdb::Error::FromSqlConversionFailure(0, duckdb::types::Type::Text, Box::new(e))
1197 })?,
1198 source_id: Uuid::parse_str(&source_str).map_err(|e| {
1199 duckdb::Error::FromSqlConversionFailure(1, duckdb::types::Type::Text, Box::new(e))
1200 })?,
1201 target_id: Uuid::parse_str(&target_str).map_err(|e| {
1202 duckdb::Error::FromSqlConversionFailure(2, duckdb::types::Type::Text, Box::new(e))
1203 })?,
1204 relation_type: row.get(3)?,
1205 weight: row.get(4)?,
1206 metadata: metadata_json
1207 .and_then(|s| serde_json::from_str(&s).ok())
1208 .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
1209 created_at: row.get(6)?,
1210 })
1211}
1212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hash::compute_content_hash;
    use crate::model::acl::PrincipalType;
    use crate::model::checkpoint::Checkpoint;
    use crate::model::event::{AgentEvent, EventType};
    use crate::model::memory::{ConsolidationState, MemoryType, Scope, SourceType};

    /// Opens a fresh in-memory database for a single test.
    fn fresh_storage() -> DuckDbStorage {
        DuckDbStorage::open_in_memory().unwrap()
    }

    /// Builds a private semantic memory owned by `agent_id` with fixed content,
    /// one tag and a three-element embedding.
    fn make_record(agent_id: &str) -> MemoryRecord {
        let timestamp = chrono::Utc::now().to_rfc3339();
        let body = "test memory content";
        MemoryRecord {
            id: Uuid::now_v7(),
            agent_id: agent_id.to_string(),
            content: body.to_string(),
            memory_type: MemoryType::Semantic,
            scope: Scope::Private,
            importance: 0.7,
            tags: vec!["test".to_string()],
            metadata: serde_json::json!({"key": "value"}),
            embedding: Some(vec![0.1, 0.2, 0.3]),
            content_hash: compute_content_hash(body, agent_id, &timestamp),
            prev_hash: None,
            source_type: SourceType::Agent,
            source_id: None,
            consolidation_state: ConsolidationState::Raw,
            access_count: 0,
            org_id: None,
            thread_id: None,
            created_at: timestamp.clone(),
            updated_at: timestamp,
            last_accessed_at: None,
            expires_at: None,
            deleted_at: None,
            decay_rate: None,
            created_by: None,
            version: 1,
            prev_version_id: None,
            quarantined: false,
            quarantine_reason: None,
            decay_function: None,
        }
    }

    /// Baseline `MemoryWrite` event for "agent-1" on `thread_id`; tests
    /// override individual fields with struct-update syntax.
    fn base_event(thread_id: &str, timestamp: &str) -> AgentEvent {
        AgentEvent {
            id: Uuid::now_v7(),
            agent_id: "agent-1".to_string(),
            thread_id: Some(thread_id.to_string()),
            run_id: None,
            parent_event_id: None,
            event_type: EventType::MemoryWrite,
            payload: serde_json::json!({}),
            trace_id: None,
            span_id: None,
            model: None,
            tokens_input: None,
            tokens_output: None,
            latency_ms: None,
            cost_usd: None,
            timestamp: timestamp.to_string(),
            logical_clock: 0,
            content_hash: Vec::new(),
            prev_hash: None,
            embedding: None,
        }
    }

    /// Baseline root checkpoint for "agent-1" on "thread-1"; tests override
    /// individual fields with struct-update syntax.
    fn base_checkpoint(
        branch: &str,
        created_at: &str,
        state_snapshot: serde_json::Value,
    ) -> Checkpoint {
        Checkpoint {
            id: Uuid::now_v7(),
            thread_id: "thread-1".to_string(),
            agent_id: "agent-1".to_string(),
            parent_id: None,
            branch_name: branch.to_string(),
            state_snapshot,
            state_diff: None,
            memory_refs: Vec::new(),
            event_cursor: None,
            label: None,
            created_at: created_at.to_string(),
            metadata: serde_json::json!({}),
        }
    }

    #[tokio::test]
    async fn test_insert_and_get() {
        let storage = fresh_storage();
        let original = make_record("agent-1");
        storage.insert_memory(&original).await.unwrap();

        let fetched = storage.get_memory(original.id).await.unwrap().unwrap();
        assert_eq!(fetched.id, original.id);
        assert_eq!(fetched.content, original.content);
        assert_eq!(fetched.agent_id, original.agent_id);
        assert_eq!(fetched.memory_type, original.memory_type);
        assert_eq!(fetched.tags, original.tags);
        assert_eq!(fetched.embedding, original.embedding);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let storage = fresh_storage();
        let missing = storage.get_memory(Uuid::now_v7()).await.unwrap();
        assert!(missing.is_none());
    }

    #[tokio::test]
    async fn test_soft_delete() {
        let storage = fresh_storage();
        let record = make_record("agent-1");
        storage.insert_memory(&record).await.unwrap();

        storage.soft_delete_memory(record.id).await.unwrap();

        // Direct lookup still returns the row, now carrying a deletion stamp.
        let fetched = storage.get_memory(record.id).await.unwrap().unwrap();
        assert!(fetched.deleted_at.is_some());

        // The default filter hides soft-deleted rows.
        let visible = storage
            .list_memories(&MemoryFilter::default(), 100, 0)
            .await
            .unwrap();
        assert!(visible.is_empty());

        // Opting in to deleted rows brings the record back.
        let with_deleted = MemoryFilter {
            include_deleted: true,
            ..Default::default()
        };
        let everything = storage.list_memories(&with_deleted, 100, 0).await.unwrap();
        assert_eq!(everything.len(), 1);
    }

    #[tokio::test]
    async fn test_hard_delete() {
        let storage = fresh_storage();
        let record = make_record("agent-1");
        storage.insert_memory(&record).await.unwrap();

        storage.hard_delete_memory(record.id).await.unwrap();

        let gone = storage.get_memory(record.id).await.unwrap();
        assert!(gone.is_none());
    }

    #[tokio::test]
    async fn test_list_with_filters() {
        let storage = fresh_storage();

        let mut episodic_a1 = make_record("agent-1");
        episodic_a1.memory_type = MemoryType::Episodic;
        storage.insert_memory(&episodic_a1).await.unwrap();

        let mut semantic_a1 = make_record("agent-1");
        semantic_a1.memory_type = MemoryType::Semantic;
        storage.insert_memory(&semantic_a1).await.unwrap();

        let mut semantic_a2 = make_record("agent-2");
        semantic_a2.memory_type = MemoryType::Semantic;
        storage.insert_memory(&semantic_a2).await.unwrap();

        // By agent only.
        let by_agent = MemoryFilter {
            agent_id: Some("agent-1".to_string()),
            ..Default::default()
        };
        assert_eq!(storage.list_memories(&by_agent, 100, 0).await.unwrap().len(), 2);

        // By memory type only.
        let by_type = MemoryFilter {
            memory_type: Some(MemoryType::Semantic),
            ..Default::default()
        };
        assert_eq!(storage.list_memories(&by_type, 100, 0).await.unwrap().len(), 2);

        // Both criteria combined.
        let by_both = MemoryFilter {
            agent_id: Some("agent-1".to_string()),
            memory_type: Some(MemoryType::Episodic),
            ..Default::default()
        };
        assert_eq!(storage.list_memories(&by_both, 100, 0).await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_touch_memory() {
        let storage = fresh_storage();
        let record = make_record("agent-1");
        storage.insert_memory(&record).await.unwrap();

        for _ in 0..2 {
            storage.touch_memory(record.id).await.unwrap();
        }

        let fetched = storage.get_memory(record.id).await.unwrap().unwrap();
        assert_eq!(fetched.access_count, 2);
        assert!(fetched.last_accessed_at.is_some());
    }

    #[tokio::test]
    async fn test_acl_and_permission_check() {
        let storage = fresh_storage();
        let record = make_record("agent-1");
        storage.insert_memory(&record).await.unwrap();

        // The owning agent passes an admin check without any ACL entry.
        let owner_is_admin = storage
            .check_permission(record.id, "agent-1", Permission::Admin)
            .await
            .unwrap();
        assert!(owner_is_admin);

        // Another agent has no access before a grant exists.
        let stranger_can_read = storage
            .check_permission(record.id, "agent-2", Permission::Read)
            .await
            .unwrap();
        assert!(!stranger_can_read);

        let grant = Acl {
            id: Uuid::now_v7(),
            memory_id: record.id,
            principal_type: PrincipalType::Agent,
            principal_id: "agent-2".to_string(),
            permission: Permission::Read,
            granted_by: "agent-1".to_string(),
            created_at: chrono::Utc::now().to_rfc3339(),
            expires_at: None,
        };
        storage.insert_acl(&grant).await.unwrap();

        // The read grant allows reading but not writing.
        let grantee_can_read = storage
            .check_permission(record.id, "agent-2", Permission::Read)
            .await
            .unwrap();
        assert!(grantee_can_read);
        let grantee_can_write = storage
            .check_permission(record.id, "agent-2", Permission::Write)
            .await
            .unwrap();
        assert!(!grantee_can_write);
    }

    #[tokio::test]
    async fn test_event_insert_and_list() {
        let storage = fresh_storage();
        let now = chrono::Utc::now().to_rfc3339();
        let event = AgentEvent {
            payload: serde_json::json!({"memory_id": "abc"}),
            logical_clock: 1,
            content_hash: vec![1, 2, 3],
            ..base_event("thread-1", &now)
        };

        storage.insert_event(&event).await.unwrap();

        let listed = storage.list_events("agent-1", 10, 0).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, event.id);
        assert_eq!(listed[0].event_type, EventType::MemoryWrite);
        assert_eq!(listed[0].agent_id, "agent-1");

        // Lookup by id returns the same event with its hash intact.
        let fetched = storage.get_event(event.id).await.unwrap().unwrap();
        assert_eq!(fetched.id, event.id);
        assert_eq!(fetched.content_hash, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_events_by_thread() {
        let storage = fresh_storage();
        let now = chrono::Utc::now().to_rfc3339();

        for i in 0..3 {
            let event = AgentEvent {
                payload: serde_json::json!({"i": i}),
                logical_clock: i,
                content_hash: vec![i as u8],
                ..base_event("thread-A", &now)
            };
            storage.insert_event(&event).await.unwrap();
        }

        // A single event on a different thread.
        let other_thread_event = AgentEvent {
            event_type: EventType::MemoryRead,
            content_hash: vec![99],
            ..base_event("thread-B", &now)
        };
        storage.insert_event(&other_thread_event).await.unwrap();

        let thread_a = storage.get_events_by_thread("thread-A", 10).await.unwrap();
        assert_eq!(thread_a.len(), 3);

        let thread_b = storage.get_events_by_thread("thread-B", 10).await.unwrap();
        assert_eq!(thread_b.len(), 1);
        assert_eq!(thread_b[0].event_type, EventType::MemoryRead);
    }

    #[tokio::test]
    async fn test_checkpoint_insert_and_get() {
        let storage = fresh_storage();
        let mem_id = Uuid::now_v7();
        let created_at = chrono::Utc::now().to_rfc3339();
        let cp = Checkpoint {
            memory_refs: vec![mem_id],
            label: Some("initial".to_string()),
            ..base_checkpoint("main", &created_at, serde_json::json!({"step": 1}))
        };

        storage.insert_checkpoint(&cp).await.unwrap();

        let fetched = storage.get_checkpoint(cp.id).await.unwrap().unwrap();
        assert_eq!(fetched.id, cp.id);
        assert_eq!(fetched.thread_id, "thread-1");
        assert_eq!(fetched.branch_name, "main");
        assert_eq!(fetched.memory_refs, vec![mem_id]);
        assert_eq!(fetched.label, Some("initial".to_string()));
    }

    #[tokio::test]
    async fn test_checkpoint_list_and_latest() {
        let storage = fresh_storage();

        let cp1 = Checkpoint {
            label: Some("first".to_string()),
            ..base_checkpoint(
                "main",
                "2025-01-01T00:00:00Z",
                serde_json::json!({"step": 1}),
            )
        };
        storage.insert_checkpoint(&cp1).await.unwrap();

        let cp2 = Checkpoint {
            parent_id: Some(cp1.id),
            state_diff: Some(serde_json::json!({"step": [1, 2]})),
            label: Some("second".to_string()),
            ..base_checkpoint(
                "main",
                "2025-01-02T00:00:00Z",
                serde_json::json!({"step": 2}),
            )
        };
        storage.insert_checkpoint(&cp2).await.unwrap();

        let cp3 = Checkpoint {
            parent_id: Some(cp1.id),
            ..base_checkpoint(
                "experiment",
                "2025-01-03T00:00:00Z",
                serde_json::json!({"step": "alt"}),
            )
        };
        storage.insert_checkpoint(&cp3).await.unwrap();

        // No branch filter: every checkpoint of the thread.
        let all = storage
            .list_checkpoints("thread-1", None, 10)
            .await
            .unwrap();
        assert_eq!(all.len(), 3);

        // Branch filter narrows the listing.
        let main_cps = storage
            .list_checkpoints("thread-1", Some("main"), 10)
            .await
            .unwrap();
        assert_eq!(main_cps.len(), 2);

        let exp_cps = storage
            .list_checkpoints("thread-1", Some("experiment"), 10)
            .await
            .unwrap();
        assert_eq!(exp_cps.len(), 1);

        // The latest checkpoint is resolved per branch.
        let latest_main = storage
            .get_latest_checkpoint("thread-1", "main")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_main.id, cp2.id);

        let latest_exp = storage
            .get_latest_checkpoint("thread-1", "experiment")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest_exp.id, cp3.id);

        // A branch with no checkpoints yields nothing.
        let none = storage
            .get_latest_checkpoint("thread-1", "nonexistent")
            .await
            .unwrap();
        assert!(none.is_none());
    }
}