use std::path::Path;
use std::sync::{Arc, Mutex};
use aonyx_core::{AonyxError, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, Row};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use uuid::Uuid;
pub type DiaryEntryId = Uuid;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DiaryEntry {
pub id: DiaryEntryId,
pub project: String,
pub ts: DateTime<Utc>,
pub content: String,
pub kind: Option<String>,
#[serde(default)]
pub refs: JsonValue,
}
impl DiaryEntry {
pub fn new(project: impl Into<String>, content: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
project: project.into(),
ts: Utc::now(),
content: content.into(),
kind: None,
refs: JsonValue::Null,
}
}
pub fn with_kind(mut self, kind: impl Into<String>) -> Self {
self.kind = Some(kind.into());
self
}
pub fn with_refs(mut self, refs: JsonValue) -> Self {
self.refs = refs;
self
}
}
#[async_trait]
pub trait DiaryStore: Send + Sync {
async fn append(&self, entry: DiaryEntry) -> Result<DiaryEntryId>;
async fn recent(&self, project: &str, limit: usize) -> Result<Vec<DiaryEntry>>;
async fn all(&self, project: &str) -> Result<Vec<DiaryEntry>>;
async fn count(&self, project: &str) -> Result<usize>;
}
#[derive(Clone)]
pub struct SqliteDiaryStore {
conn: Arc<Mutex<Connection>>,
}
impl SqliteDiaryStore {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let conn = Connection::open(path.as_ref())
.map_err(|e| AonyxError::Memory(format!("open diary db: {e}")))?;
Self::migrate(&conn)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
pub fn open_in_memory() -> Result<Self> {
let conn = Connection::open_in_memory()
.map_err(|e| AonyxError::Memory(format!("open in-memory diary: {e}")))?;
Self::migrate(&conn)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
fn migrate(conn: &Connection) -> Result<()> {
conn.execute_batch(MIGRATION_V1)
.map_err(|e| AonyxError::Memory(format!("migrate diary schema: {e}")))?;
Ok(())
}
}
const MIGRATION_V1: &str = r#"
CREATE TABLE IF NOT EXISTS diary (
id TEXT PRIMARY KEY,
project TEXT NOT NULL,
ts TEXT NOT NULL,
content TEXT NOT NULL,
kind TEXT,
refs_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_diary_project_ts ON diary(project, ts DESC);
"#;
const DIARY_COLUMNS: &str = "id, project, ts, content, kind, refs_json";
fn entry_from_row(row: &Row<'_>) -> rusqlite::Result<DiaryEntry> {
let id_str: String = row.get(0)?;
let project: String = row.get(1)?;
let ts_raw: String = row.get(2)?;
let content: String = row.get(3)?;
let kind: Option<String> = row.get(4)?;
let refs_json: Option<String> = row.get(5)?;
let id = Uuid::parse_str(&id_str).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
})?;
let ts = DateTime::parse_from_rfc3339(&ts_raw)
.map(|d| d.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let refs = refs_json
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or(JsonValue::Null);
Ok(DiaryEntry {
id,
project,
ts,
content,
kind,
refs,
})
}
#[async_trait]
impl DiaryStore for SqliteDiaryStore {
async fn append(&self, entry: DiaryEntry) -> Result<DiaryEntryId> {
let conn = self.conn.clone();
let id = entry.id;
tokio::task::spawn_blocking(move || -> Result<()> {
let lock = conn.lock().expect("diary mutex poisoned");
lock.execute(
r#"
INSERT INTO diary (id, project, ts, content, kind, refs_json)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
"#,
params![
entry.id.to_string(),
entry.project,
entry.ts.to_rfc3339(),
entry.content,
entry.kind,
serde_json::to_string(&entry.refs).ok(),
],
)
.map_err(|e| AonyxError::Memory(format!("diary append: {e}")))?;
Ok(())
})
.await
.map_err(|e| AonyxError::Memory(format!("diary append join: {e}")))??;
Ok(id)
}
async fn recent(&self, project: &str, limit: usize) -> Result<Vec<DiaryEntry>> {
let conn = self.conn.clone();
let project = project.to_string();
let limit = limit as i64;
tokio::task::spawn_blocking(move || -> Result<Vec<DiaryEntry>> {
let lock = conn.lock().expect("diary mutex poisoned");
let sql = format!(
"SELECT {DIARY_COLUMNS} FROM diary WHERE project = ?1 ORDER BY ts DESC LIMIT ?2"
);
let mut stmt = lock
.prepare(&sql)
.map_err(|e| AonyxError::Memory(format!("prepare diary recent: {e}")))?;
let rows = stmt
.query_map(params![project, limit], entry_from_row)
.map_err(|e| AonyxError::Memory(format!("query diary recent: {e}")))?;
let mut out = Vec::new();
for r in rows {
out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
}
Ok(out)
})
.await
.map_err(|e| AonyxError::Memory(format!("diary recent join: {e}")))?
}
async fn all(&self, project: &str) -> Result<Vec<DiaryEntry>> {
let conn = self.conn.clone();
let project = project.to_string();
tokio::task::spawn_blocking(move || -> Result<Vec<DiaryEntry>> {
let lock = conn.lock().expect("diary mutex poisoned");
let sql =
format!("SELECT {DIARY_COLUMNS} FROM diary WHERE project = ?1 ORDER BY ts DESC");
let mut stmt = lock
.prepare(&sql)
.map_err(|e| AonyxError::Memory(format!("prepare diary all: {e}")))?;
let rows = stmt
.query_map(params![project], entry_from_row)
.map_err(|e| AonyxError::Memory(format!("query diary all: {e}")))?;
let mut out = Vec::new();
for r in rows {
out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
}
Ok(out)
})
.await
.map_err(|e| AonyxError::Memory(format!("diary all join: {e}")))?
}
async fn count(&self, project: &str) -> Result<usize> {
let conn = self.conn.clone();
let project = project.to_string();
tokio::task::spawn_blocking(move || -> Result<usize> {
let lock = conn.lock().expect("diary mutex poisoned");
let n: i64 = lock
.query_row(
"SELECT COUNT(*) FROM diary WHERE project = ?1",
params![project],
|r| r.get(0),
)
.map_err(|e| AonyxError::Memory(format!("diary count: {e}")))?;
Ok(n.max(0) as usize)
})
.await
.map_err(|e| AonyxError::Memory(format!("diary count join: {e}")))?
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[tokio::test]
async fn append_then_count() {
let store = SqliteDiaryStore::open_in_memory().unwrap();
store
.append(DiaryEntry::new("demo", "first note"))
.await
.unwrap();
assert_eq!(store.count("demo").await.unwrap(), 1);
assert_eq!(store.count("other").await.unwrap(), 0);
}
#[tokio::test]
async fn recent_returns_newest_first() {
let store = SqliteDiaryStore::open_in_memory().unwrap();
store
.append(DiaryEntry::new("demo", "older"))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
store
.append(DiaryEntry::new("demo", "newer"))
.await
.unwrap();
let recent = store.recent("demo", 10).await.unwrap();
assert_eq!(recent.len(), 2);
assert_eq!(recent[0].content, "newer");
assert_eq!(recent[1].content, "older");
}
#[tokio::test]
async fn recent_honours_limit() {
let store = SqliteDiaryStore::open_in_memory().unwrap();
for i in 0..5 {
store
.append(DiaryEntry::new("demo", format!("note {i}")))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(2)).await;
}
let recent = store.recent("demo", 2).await.unwrap();
assert_eq!(recent.len(), 2);
}
#[tokio::test]
async fn entries_are_project_scoped() {
let store = SqliteDiaryStore::open_in_memory().unwrap();
store
.append(DiaryEntry::new("a", "only-in-a"))
.await
.unwrap();
store
.append(DiaryEntry::new("b", "only-in-b"))
.await
.unwrap();
let in_a = store.all("a").await.unwrap();
let in_b = store.all("b").await.unwrap();
assert_eq!(in_a.len(), 1);
assert_eq!(in_b.len(), 1);
assert_eq!(in_a[0].content, "only-in-a");
assert_eq!(in_b[0].content, "only-in-b");
}
#[tokio::test]
async fn with_kind_and_refs_round_trip() {
let store = SqliteDiaryStore::open_in_memory().unwrap();
let entry = DiaryEntry::new("demo", "decision: switch to Rust")
.with_kind("decision")
.with_refs(serde_json::json!({"kg_entity": "abc-123"}));
store.append(entry).await.unwrap();
let recent = store.recent("demo", 1).await.unwrap();
assert_eq!(recent[0].kind.as_deref(), Some("decision"));
assert_eq!(recent[0].refs["kg_entity"], "abc-123");
}
}