Skip to main content

aonyx_memory/
diary.rs

1//! Append-only narrative log per project.
2//!
3//! Port reference: Aonyx RAG `rag_system/agent/diary.py`.
4//!
5//! The diary is the agent's running journal: short, dated, free-form notes
6//! about what happened, what was decided, and what surprised the agent. Unlike
7//! the [`crate::kg::KgStore`] — which models *structured* facts — the diary
8//! stores prose. The two complement each other: a diary entry can reference
9//! a KG entity, and a KG fact can cite a diary entry as its source.
10//!
11//! ## Schema (idempotent SQLite migration)
12//!
13//! ```sql
14//! CREATE TABLE IF NOT EXISTS diary (
15//!     id        TEXT PRIMARY KEY,
16//!     project   TEXT NOT NULL,
17//!     ts        TEXT NOT NULL,
18//!     content   TEXT NOT NULL,
19//!     kind      TEXT,
20//!     refs_json TEXT
21//! );
22//! CREATE INDEX IF NOT EXISTS idx_diary_project_ts ON diary(project, ts DESC);
23//! ```
24
25use std::path::Path;
26use std::sync::{Arc, Mutex};
27
28use aonyx_core::{AonyxError, Result};
29use async_trait::async_trait;
30use chrono::{DateTime, Utc};
31use rusqlite::{params, Connection, Row};
32use serde::{Deserialize, Serialize};
33use serde_json::Value as JsonValue;
34use uuid::Uuid;
35
/// Stable identifier for a [`DiaryEntry`].
///
/// An alias for [`Uuid`]. [`DiaryEntry::new`] generates a fresh v4 value, and
/// [`SqliteDiaryStore`] persists it in its textual form in the `id` column.
pub type DiaryEntryId = Uuid;
38
/// A single diary entry.
///
/// Entries are plain data: build one with [`DiaryEntry::new`], optionally refine
/// it with [`DiaryEntry::with_kind`] / [`DiaryEntry::with_refs`], then hand it to
/// a [`DiaryStore`] to persist it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DiaryEntry {
    /// Stable id (UUID v4 by default).
    pub id: DiaryEntryId,
    /// Project slug this entry belongs to.
    pub project: String,
    /// Wall-clock timestamp the entry was written.
    pub ts: DateTime<Utc>,
    /// Free-form markdown body.
    pub content: String,
    /// Optional classifier (`"decision"`, `"fact"`, `"note"`, `"surprise"`).
    pub kind: Option<String>,
    /// Structured cross-references (KG entity ids, doc ids, urls).
    ///
    /// `#[serde(default)]` allows the field to be absent when deserializing; it
    /// then takes the JSON value type's default.
    #[serde(default)]
    pub refs: JsonValue,
}
56
57impl DiaryEntry {
58    /// Build a new free-form entry stamped with the current time.
59    pub fn new(project: impl Into<String>, content: impl Into<String>) -> Self {
60        Self {
61            id: Uuid::new_v4(),
62            project: project.into(),
63            ts: Utc::now(),
64            content: content.into(),
65            kind: None,
66            refs: JsonValue::Null,
67        }
68    }
69
70    /// Attach a classifier (`"decision"`, `"fact"`, …).
71    pub fn with_kind(mut self, kind: impl Into<String>) -> Self {
72        self.kind = Some(kind.into());
73        self
74    }
75
76    /// Attach a JSON references payload.
77    pub fn with_refs(mut self, refs: JsonValue) -> Self {
78        self.refs = refs;
79        self
80    }
81}
82
/// Asynchronous diary store.
///
/// Implementations must be `Send + Sync` so one store can be shared between
/// tasks. All read methods take a project slug and only consider entries
/// belonging to that project.
#[async_trait]
pub trait DiaryStore: Send + Sync {
    /// Append a new entry.
    ///
    /// On success, returns the id of the appended entry.
    async fn append(&self, entry: DiaryEntry) -> Result<DiaryEntryId>;

    /// List the most recent entries for a project (newest first).
    ///
    /// At most `limit` entries are returned.
    async fn recent(&self, project: &str, limit: usize) -> Result<Vec<DiaryEntry>>;

    /// List every entry for a project (newest first).
    async fn all(&self, project: &str) -> Result<Vec<DiaryEntry>>;

    /// Count entries for a project.
    async fn count(&self, project: &str) -> Result<usize>;
}
98
/// SQLite-backed [`DiaryStore`].
///
/// Holds one [`Connection`] behind `Arc<Mutex<_>>`: cloning the store only
/// clones the `Arc`, so every clone shares the same underlying connection and
/// access to it is serialized by the mutex.
#[derive(Clone)]
pub struct SqliteDiaryStore {
    /// Shared database connection; each operation locks it while it runs.
    conn: Arc<Mutex<Connection>>,
}
104
105impl SqliteDiaryStore {
106    /// Open (or create) the diary database at `path`.
107    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
108        let conn = Connection::open(path.as_ref())
109            .map_err(|e| AonyxError::Memory(format!("open diary db: {e}")))?;
110        Self::migrate(&conn)?;
111        Ok(Self {
112            conn: Arc::new(Mutex::new(conn)),
113        })
114    }
115
116    /// Open an in-memory database — convenient for tests.
117    pub fn open_in_memory() -> Result<Self> {
118        let conn = Connection::open_in_memory()
119            .map_err(|e| AonyxError::Memory(format!("open in-memory diary: {e}")))?;
120        Self::migrate(&conn)?;
121        Ok(Self {
122            conn: Arc::new(Mutex::new(conn)),
123        })
124    }
125
126    fn migrate(conn: &Connection) -> Result<()> {
127        conn.execute_batch(MIGRATION_V1)
128            .map_err(|e| AonyxError::Memory(format!("migrate diary schema: {e}")))?;
129        Ok(())
130    }
131}
132
/// Schema batch executed by `SqliteDiaryStore::migrate` each time a store is
/// opened.
///
/// Both statements use `IF NOT EXISTS`, so re-running the batch on an existing
/// database is a no-op. The index covers the `project = ? ORDER BY ts DESC`
/// shape used by the read queries.
const MIGRATION_V1: &str = r#"
CREATE TABLE IF NOT EXISTS diary (
    id        TEXT PRIMARY KEY,
    project   TEXT NOT NULL,
    ts        TEXT NOT NULL,
    content   TEXT NOT NULL,
    kind      TEXT,
    refs_json TEXT
);

CREATE INDEX IF NOT EXISTS idx_diary_project_ts ON diary(project, ts DESC);
"#;
145
/// Column list shared by the `SELECT` statements.
///
/// The order matters: `entry_from_row` reads the columns by position
/// (0 = `id` … 5 = `refs_json`), so any change here must be mirrored there.
const DIARY_COLUMNS: &str = "id, project, ts, content, kind, refs_json";
147
148fn entry_from_row(row: &Row<'_>) -> rusqlite::Result<DiaryEntry> {
149    let id_str: String = row.get(0)?;
150    let project: String = row.get(1)?;
151    let ts_raw: String = row.get(2)?;
152    let content: String = row.get(3)?;
153    let kind: Option<String> = row.get(4)?;
154    let refs_json: Option<String> = row.get(5)?;
155
156    let id = Uuid::parse_str(&id_str).map_err(|e| {
157        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
158    })?;
159    let ts = DateTime::parse_from_rfc3339(&ts_raw)
160        .map(|d| d.with_timezone(&Utc))
161        .unwrap_or_else(|_| Utc::now());
162    let refs = refs_json
163        .and_then(|s| serde_json::from_str(&s).ok())
164        .unwrap_or(JsonValue::Null);
165
166    Ok(DiaryEntry {
167        id,
168        project,
169        ts,
170        content,
171        kind,
172        refs,
173    })
174}
175
176#[async_trait]
177impl DiaryStore for SqliteDiaryStore {
178    async fn append(&self, entry: DiaryEntry) -> Result<DiaryEntryId> {
179        let conn = self.conn.clone();
180        let id = entry.id;
181        tokio::task::spawn_blocking(move || -> Result<()> {
182            let lock = conn.lock().expect("diary mutex poisoned");
183            lock.execute(
184                r#"
185                INSERT INTO diary (id, project, ts, content, kind, refs_json)
186                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
187                "#,
188                params![
189                    entry.id.to_string(),
190                    entry.project,
191                    entry.ts.to_rfc3339(),
192                    entry.content,
193                    entry.kind,
194                    serde_json::to_string(&entry.refs).ok(),
195                ],
196            )
197            .map_err(|e| AonyxError::Memory(format!("diary append: {e}")))?;
198            Ok(())
199        })
200        .await
201        .map_err(|e| AonyxError::Memory(format!("diary append join: {e}")))??;
202        Ok(id)
203    }
204
205    async fn recent(&self, project: &str, limit: usize) -> Result<Vec<DiaryEntry>> {
206        let conn = self.conn.clone();
207        let project = project.to_string();
208        let limit = limit as i64;
209        tokio::task::spawn_blocking(move || -> Result<Vec<DiaryEntry>> {
210            let lock = conn.lock().expect("diary mutex poisoned");
211            let sql = format!(
212                "SELECT {DIARY_COLUMNS} FROM diary WHERE project = ?1 ORDER BY ts DESC LIMIT ?2"
213            );
214            let mut stmt = lock
215                .prepare(&sql)
216                .map_err(|e| AonyxError::Memory(format!("prepare diary recent: {e}")))?;
217            let rows = stmt
218                .query_map(params![project, limit], entry_from_row)
219                .map_err(|e| AonyxError::Memory(format!("query diary recent: {e}")))?;
220            let mut out = Vec::new();
221            for r in rows {
222                out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
223            }
224            Ok(out)
225        })
226        .await
227        .map_err(|e| AonyxError::Memory(format!("diary recent join: {e}")))?
228    }
229
230    async fn all(&self, project: &str) -> Result<Vec<DiaryEntry>> {
231        let conn = self.conn.clone();
232        let project = project.to_string();
233        tokio::task::spawn_blocking(move || -> Result<Vec<DiaryEntry>> {
234            let lock = conn.lock().expect("diary mutex poisoned");
235            let sql =
236                format!("SELECT {DIARY_COLUMNS} FROM diary WHERE project = ?1 ORDER BY ts DESC");
237            let mut stmt = lock
238                .prepare(&sql)
239                .map_err(|e| AonyxError::Memory(format!("prepare diary all: {e}")))?;
240            let rows = stmt
241                .query_map(params![project], entry_from_row)
242                .map_err(|e| AonyxError::Memory(format!("query diary all: {e}")))?;
243            let mut out = Vec::new();
244            for r in rows {
245                out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
246            }
247            Ok(out)
248        })
249        .await
250        .map_err(|e| AonyxError::Memory(format!("diary all join: {e}")))?
251    }
252
253    async fn count(&self, project: &str) -> Result<usize> {
254        let conn = self.conn.clone();
255        let project = project.to_string();
256        tokio::task::spawn_blocking(move || -> Result<usize> {
257            let lock = conn.lock().expect("diary mutex poisoned");
258            let n: i64 = lock
259                .query_row(
260                    "SELECT COUNT(*) FROM diary WHERE project = ?1",
261                    params![project],
262                    |r| r.get(0),
263                )
264                .map_err(|e| AonyxError::Memory(format!("diary count: {e}")))?;
265            Ok(n.max(0) as usize)
266        })
267        .await
268        .map_err(|e| AonyxError::Memory(format!("diary count join: {e}")))?
269    }
270}
271
/// Behavioural tests for [`SqliteDiaryStore`], each run against a fresh
/// in-memory database.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Appending one entry is visible to `count` for its project only.
    #[tokio::test]
    async fn append_then_count() {
        let store = SqliteDiaryStore::open_in_memory().unwrap();
        store
            .append(DiaryEntry::new("demo", "first note"))
            .await
            .unwrap();
        assert_eq!(store.count("demo").await.unwrap(), 1);
        assert_eq!(store.count("other").await.unwrap(), 0);
    }

    /// `recent` orders entries by timestamp, newest first.
    #[tokio::test]
    async fn recent_returns_newest_first() {
        let store = SqliteDiaryStore::open_in_memory().unwrap();
        store
            .append(DiaryEntry::new("demo", "older"))
            .await
            .unwrap();
        // Ordering relies on the stored timestamp; pause briefly so the two
        // entries get distinct timestamps even on a coarse system clock.
        tokio::time::sleep(Duration::from_millis(5)).await;
        store
            .append(DiaryEntry::new("demo", "newer"))
            .await
            .unwrap();

        let recent = store.recent("demo", 10).await.unwrap();
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].content, "newer");
        assert_eq!(recent[1].content, "older");
    }

    /// `recent` returns no more than `limit` entries, and they are the newest.
    #[tokio::test]
    async fn recent_honours_limit() {
        let store = SqliteDiaryStore::open_in_memory().unwrap();
        for i in 0..5 {
            store
                .append(DiaryEntry::new("demo", format!("note {i}")))
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(2)).await;
        }
        let recent = store.recent("demo", 2).await.unwrap();
        assert_eq!(recent.len(), 2);
        // The limit must keep the most recent entries, not an arbitrary pair.
        assert_eq!(recent[0].content, "note 4");
        assert_eq!(recent[1].content, "note 3");
    }

    /// Entries written to one project never show up in another.
    #[tokio::test]
    async fn entries_are_project_scoped() {
        let store = SqliteDiaryStore::open_in_memory().unwrap();
        store
            .append(DiaryEntry::new("a", "only-in-a"))
            .await
            .unwrap();
        store
            .append(DiaryEntry::new("b", "only-in-b"))
            .await
            .unwrap();

        let in_a = store.all("a").await.unwrap();
        let in_b = store.all("b").await.unwrap();
        assert_eq!(in_a.len(), 1);
        assert_eq!(in_b.len(), 1);
        assert_eq!(in_a[0].content, "only-in-a");
        assert_eq!(in_b[0].content, "only-in-b");
    }

    /// `kind` and `refs` survive a write/read round trip.
    #[tokio::test]
    async fn with_kind_and_refs_round_trip() {
        let store = SqliteDiaryStore::open_in_memory().unwrap();
        let entry = DiaryEntry::new("demo", "decision: switch to Rust")
            .with_kind("decision")
            .with_refs(serde_json::json!({"kg_entity": "abc-123"}));
        store.append(entry).await.unwrap();
        let recent = store.recent("demo", 1).await.unwrap();
        assert_eq!(recent[0].kind.as_deref(), Some("decision"));
        assert_eq!(recent[0].refs["kg_entity"], "abc-123");
    }
}
353}