Skip to main content

khive_db/stores/
note.rs

1//! SQL-backed `NoteStore` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::note::{FilterOp, Note, NoteFilter, SortDir};
10use khive_storage::types::{BatchWriteSummary, DeleteMode, Page, PageRequest, SqlValue};
11use khive_storage::NoteStore;
12use khive_storage::StorageCapability;
13
14use crate::error::SqliteError;
15use crate::pool::ConnectionPool;
16
17fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
18    StorageError::driver(StorageCapability::Notes, op, e)
19}
20
21fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
22    StorageError::driver(StorageCapability::Notes, op, e)
23}
24
25/// A NoteStore backed by SQLite. Namespace is the caller's responsibility.
26///
27/// UUID is globally unique — get/delete by ID alone. Query/count use the
28/// namespace parameter as passed. The store is just a pool + is_file_backed.
29pub struct SqlNoteStore {
30    pool: Arc<ConnectionPool>,
31    is_file_backed: bool,
32}
33
34impl SqlNoteStore {
35    /// Create a new store.
36    pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
37        Self {
38            pool,
39            is_file_backed,
40        }
41    }
42
43    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
44        let config = self.pool.config();
45        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
46            operation: "note_reader".into(),
47            message: "in-memory databases do not support standalone connections".into(),
48        })?;
49
50        let conn = rusqlite::Connection::open_with_flags(
51            path,
52            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
53                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
54                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
55        )
56        .map_err(|e| map_err(e, "open_note_reader"))?;
57
58        conn.busy_timeout(config.busy_timeout)
59            .map_err(|e| map_err(e, "open_note_reader"))?;
60        conn.pragma_update(None, "foreign_keys", "ON")
61            .map_err(|e| map_err(e, "open_note_reader"))?;
62        conn.pragma_update(None, "synchronous", "NORMAL")
63            .map_err(|e| map_err(e, "open_note_reader"))?;
64
65        Ok(conn)
66    }
67
68    /// Write via pool writer (serializes writes through the mutex).
69    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
70    where
71        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
72        R: Send + 'static,
73    {
74        let pool = Arc::clone(&self.pool);
75        tokio::task::spawn_blocking(move || {
76            let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
77            f(guard.conn()).map_err(|e| map_err(e, op))
78        })
79        .await
80        .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
81    }
82
83    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
84    where
85        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
86        R: Send + 'static,
87    {
88        if self.is_file_backed {
89            let conn = self.open_standalone_reader()?;
90            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
91                .await
92                .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
93        } else {
94            let pool = Arc::clone(&self.pool);
95            tokio::task::spawn_blocking(move || {
96                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
97                f(guard.conn()).map_err(|e| map_err(e, op))
98            })
99            .await
100            .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
101        }
102    }
103}
104
105// =============================================================================
106// Helpers
107// =============================================================================
108
109fn read_note(row: &rusqlite::Row<'_>) -> Result<Note, rusqlite::Error> {
110    let id_str: String = row.get(0)?;
111    let namespace: String = row.get(1)?;
112    let kind: String = row.get(2)?;
113    let status: String = row.get(3)?;
114    let name: Option<String> = row.get(4)?;
115    let content: String = row.get(5)?;
116    let salience: Option<f64> = row.get(6)?;
117    let decay_factor: Option<f64> = row.get(7)?;
118    let expires_at: Option<i64> = row.get(8)?;
119    let properties_str: Option<String> = row.get(9)?;
120    let created_at: i64 = row.get(10)?;
121    let updated_at: i64 = row.get(11)?;
122    let deleted_at: Option<i64> = row.get(12)?;
123
124    let id = parse_uuid(&id_str)?;
125
126    let properties = properties_str
127        .map(|s| {
128            serde_json::from_str(&s).map_err(|e| {
129                rusqlite::Error::FromSqlConversionFailure(
130                    9,
131                    rusqlite::types::Type::Text,
132                    Box::new(e),
133                )
134            })
135        })
136        .transpose()?;
137
138    Ok(Note {
139        id,
140        namespace,
141        kind,
142        status,
143        name,
144        content,
145        salience,
146        decay_factor,
147        expires_at,
148        properties,
149        created_at,
150        updated_at,
151        deleted_at,
152    })
153}
154
155fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
156    Uuid::parse_str(s).map_err(|e| {
157        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
158    })
159}
160
161fn build_note_where(
162    namespace: &str,
163    kind: Option<&str>,
164) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
165    let mut conditions: Vec<String> = vec![
166        "namespace = ?1".to_string(),
167        "deleted_at IS NULL".to_string(),
168    ];
169    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
170
171    if let Some(k) = kind {
172        params.push(Box::new(k.to_string()));
173        conditions.push(format!("kind = ?{}", params.len()));
174    }
175
176    let clause = format!(" WHERE {}", conditions.join(" AND "));
177    (clause, params)
178}
179
180/// Validate that a json_path is safe to interpolate into SQL.
181/// Accepts only `$.field` or `$.field.subfield` paths with alphanumeric/underscore segments.
182fn validate_json_path(path: &str) -> Result<(), StorageError> {
183    let valid = path.starts_with("$.")
184        && path[2..].split('.').all(|part| {
185            !part.is_empty() && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
186        });
187    if valid {
188        Ok(())
189    } else {
190        Err(StorageError::InvalidInput {
191            capability: StorageCapability::Notes,
192            operation: "query_notes_filtered".into(),
193            message: format!("invalid JSON path for note filter: {path:?}"),
194        })
195    }
196}
197
/// SQL expression extracting `path` from the `properties` JSON column.
///
/// `path` is spliced in verbatim inside single quotes, so callers must have
/// validated it first.
fn json_extract_expr(path: &str) -> String {
    ["json_extract(properties, '", path, "')"].concat()
}
201
/// SQL expression yielding the JSON type of `path` within the `properties`
/// column.
///
/// `path` is spliced in verbatim inside single quotes, so callers must have
/// validated it first.
fn json_type_expr(path: &str) -> String {
    ["json_type(properties, '", path, "')"].concat()
}
205
206fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
207    Ok(match value {
208        SqlValue::Null => Box::new(Option::<String>::None),
209        SqlValue::Bool(v) => Box::new(*v as i64),
210        SqlValue::Integer(v) => Box::new(*v),
211        SqlValue::Float(v) => Box::new(*v),
212        SqlValue::Text(v) => Box::new(v.clone()),
213        SqlValue::Blob(v) => Box::new(v.clone()),
214        SqlValue::Json(v) => Box::new(
215            serde_json::to_string(v)
216                .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?,
217        ),
218        SqlValue::Uuid(v) => Box::new(v.to_string()),
219        SqlValue::Timestamp(v) => Box::new(v.timestamp_micros()),
220    })
221}
222
223fn build_note_filter_where(
224    namespace: &str,
225    filter: &NoteFilter,
226) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
227    let mut conditions = vec![
228        "namespace = ?1".to_string(),
229        "deleted_at IS NULL".to_string(),
230    ];
231    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
232
233    if let Some(kind) = &filter.kind {
234        params.push(Box::new(kind.clone()));
235        conditions.push(format!("kind = ?{}", params.len()));
236    }
237
238    for pf in &filter.property_filters {
239        match pf.op {
240            FilterOp::EqOrMissing => {
241                let expr = json_extract_expr(&pf.json_path);
242                params.push(sql_value_param(&pf.value)?);
243                conditions.push(format!(
244                    "({expr} = ?{n} OR {expr} IS NULL)",
245                    n = params.len()
246                ));
247            }
248            FilterOp::JsonTypeEq => {
249                let type_expr = json_type_expr(&pf.json_path);
250                params.push(sql_value_param(&pf.value)?);
251                conditions.push(format!("{type_expr} = ?{}", params.len()));
252            }
253            FilterOp::JsonTypeNeMissing => {
254                let type_expr = json_type_expr(&pf.json_path);
255                params.push(sql_value_param(&pf.value)?);
256                let n = params.len();
257                conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})"));
258            }
259            _ => {
260                let expr = json_extract_expr(&pf.json_path);
261                let op = match pf.op {
262                    FilterOp::Eq => "=",
263                    FilterOp::Ne => "!=",
264                    FilterOp::Lt => "<",
265                    FilterOp::Lte => "<=",
266                    FilterOp::Gt => ">",
267                    FilterOp::Gte => ">=",
268                    FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => {
269                        unreachable!()
270                    }
271                };
272                params.push(sql_value_param(&pf.value)?);
273                conditions.push(format!("{expr} {op} ?{}", params.len()));
274            }
275        }
276    }
277
278    Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
279}
280
281// =============================================================================
282// NoteStore implementation
283// =============================================================================
284
285#[async_trait]
286impl NoteStore for SqlNoteStore {
287    async fn upsert_note(&self, note: Note) -> Result<(), StorageError> {
288        let namespace = note.namespace.clone();
289        let id_str = note.id.to_string();
290        let kind_str = note.kind.to_string();
291        let status_str = note.status.clone();
292        let properties_str = note
293            .properties
294            .as_ref()
295            .map(|v| serde_json::to_string(v).unwrap_or_default());
296
297        self.with_writer("upsert_note", move |conn| {
298            conn.execute(
299                "INSERT OR REPLACE INTO notes \
300                 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
301                  properties, created_at, updated_at, deleted_at) \
302                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
303                rusqlite::params![
304                    id_str,
305                    namespace,
306                    kind_str,
307                    status_str,
308                    note.name,
309                    note.content,
310                    note.salience,
311                    note.decay_factor,
312                    note.expires_at,
313                    properties_str,
314                    note.created_at,
315                    note.updated_at,
316                    note.deleted_at,
317                ],
318            )?;
319            Ok(())
320        })
321        .await
322    }
323
324    async fn upsert_notes(&self, notes: Vec<Note>) -> Result<BatchWriteSummary, StorageError> {
325        let attempted = notes.len() as u64;
326
327        self.with_writer("upsert_notes", move |conn| {
328            conn.execute_batch("BEGIN IMMEDIATE")?;
329            let mut affected = 0u64;
330            let mut failed = 0u64;
331            let mut first_error = String::new();
332
333            for note in &notes {
334                let id_str = note.id.to_string();
335                let kind_str = note.kind.to_string();
336                let status_str = note.status.clone();
337                let properties_str = note
338                    .properties
339                    .as_ref()
340                    .map(|v| serde_json::to_string(v).unwrap_or_default());
341
342                match conn.execute(
343                    "INSERT OR REPLACE INTO notes \
344                     (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
345                      properties, created_at, updated_at, deleted_at) \
346                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
347                    rusqlite::params![
348                        id_str,
349                        &note.namespace,
350                        kind_str,
351                        status_str,
352                        &note.name,
353                        note.content,
354                        note.salience,
355                        note.decay_factor,
356                        note.expires_at,
357                        properties_str,
358                        note.created_at,
359                        note.updated_at,
360                        note.deleted_at,
361                    ],
362                ) {
363                    Ok(_) => affected += 1,
364                    Err(e) => {
365                        if first_error.is_empty() {
366                            first_error = e.to_string();
367                        }
368                        failed += 1;
369                    }
370                }
371            }
372
373            if let Err(e) = conn.execute_batch("COMMIT") {
374                let _ = conn.execute_batch("ROLLBACK");
375                return Err(e);
376            }
377            Ok(BatchWriteSummary {
378                attempted,
379                affected,
380                failed,
381                first_error,
382            })
383        })
384        .await
385    }
386
387    async fn get_note(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
388        let id_str = id.to_string();
389
390        self.with_reader("get_note", move |conn| {
391            let mut stmt = conn.prepare(
392                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
393                 properties, created_at, updated_at, deleted_at \
394                 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
395            )?;
396            let mut rows = stmt.query(rusqlite::params![id_str])?;
397            match rows.next()? {
398                Some(row) => Ok(Some(read_note(row)?)),
399                None => Ok(None),
400            }
401        })
402        .await
403    }
404
405    async fn get_notes_batch(&self, ids: &[Uuid]) -> Result<Vec<Note>, StorageError> {
406        if ids.is_empty() {
407            return Ok(vec![]);
408        }
409        let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
410
411        self.with_reader("get_notes_batch", move |conn| {
412            let placeholders: String = (1..=id_strings.len())
413                .map(|i| format!("?{i}"))
414                .collect::<Vec<_>>()
415                .join(", ");
416            let sql = format!(
417                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
418                 properties, created_at, updated_at, deleted_at \
419                 FROM notes WHERE id IN ({placeholders}) AND deleted_at IS NULL"
420            );
421            let mut stmt = conn.prepare(&sql)?;
422            let params: Vec<&dyn rusqlite::types::ToSql> = id_strings
423                .iter()
424                .map(|s| s as &dyn rusqlite::types::ToSql)
425                .collect();
426            let rows = stmt.query_map(params.as_slice(), read_note)?;
427            let mut out = Vec::new();
428            for row in rows {
429                out.push(row?);
430            }
431            Ok(out)
432        })
433        .await
434    }
435
436    async fn delete_note(&self, id: Uuid, mode: DeleteMode) -> Result<bool, StorageError> {
437        let id_str = id.to_string();
438
439        match mode {
440            DeleteMode::Soft => {
441                self.with_writer("delete_note_soft", move |conn| {
442                    let now = chrono::Utc::now().timestamp_micros();
443                    let deleted = conn.execute(
444                        "UPDATE notes SET status = 'deleted', deleted_at = ?1 \
445                         WHERE id = ?2 AND deleted_at IS NULL",
446                        rusqlite::params![now, id_str],
447                    )?;
448                    Ok(deleted > 0)
449                })
450                .await
451            }
452            DeleteMode::Hard => {
453                self.with_writer("delete_note_hard", move |conn| {
454                    let deleted =
455                        conn.execute("DELETE FROM notes WHERE id = ?1", rusqlite::params![id_str])?;
456                    Ok(deleted > 0)
457                })
458                .await
459            }
460        }
461    }
462
463    async fn query_notes(
464        &self,
465        namespace: &str,
466        kind: Option<&str>,
467        page: PageRequest,
468    ) -> Result<Page<Note>, StorageError> {
469        let namespace = namespace.to_string();
470        let kind = kind.map(|k| k.to_string());
471
472        self.with_reader("query_notes", move |conn| {
473            let (count_sql, count_params) = build_note_where(&namespace, kind.as_deref());
474            let total: i64 = {
475                let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
476                let mut stmt = conn.prepare(&sql)?;
477                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
478                    count_params.iter().map(|p| p.as_ref()).collect();
479                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
480            };
481
482            let (where_sql, mut data_params) = build_note_where(&namespace, kind.as_deref());
483            data_params.push(Box::new(page.limit as i64));
484            data_params.push(Box::new(page.offset as i64));
485
486            let limit_idx = data_params.len() - 1;
487            let offset_idx = data_params.len();
488
489            let data_sql = format!(
490                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
491                 properties, created_at, updated_at, deleted_at \
492                 FROM notes{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
493                where_sql, limit_idx, offset_idx,
494            );
495
496            let mut stmt = conn.prepare(&data_sql)?;
497            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
498                data_params.iter().map(|p| p.as_ref()).collect();
499            let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
500
501            let mut items = Vec::new();
502            for row in rows {
503                items.push(row?);
504            }
505
506            Ok(Page {
507                items,
508                total: Some(total as u64),
509            })
510        })
511        .await
512    }
513
514    async fn query_notes_filtered(
515        &self,
516        namespace: &str,
517        filter: &NoteFilter,
518        page: PageRequest,
519    ) -> Result<Page<Note>, StorageError> {
520        // Validate paths before entering spawn_blocking (closures return rusqlite::Error).
521        for pf in &filter.property_filters {
522            validate_json_path(&pf.json_path)?;
523        }
524        if let Some((path, _)) = &filter.order_by {
525            validate_json_path(path)?;
526        }
527
528        let namespace = namespace.to_string();
529        let filter = filter.clone();
530
531        self.with_reader("query_notes_filtered", move |conn| {
532            let (count_sql, count_params) = build_note_filter_where(&namespace, &filter)?;
533            let total: i64 = {
534                let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
535                let mut stmt = conn.prepare(&sql)?;
536                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
537                    count_params.iter().map(|p| p.as_ref()).collect();
538                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
539            };
540
541            let (where_sql, mut data_params) = build_note_filter_where(&namespace, &filter)?;
542            data_params.push(Box::new(page.limit as i64));
543            data_params.push(Box::new(page.offset as i64));
544
545            let order_clause = match &filter.order_by {
546                Some((path, dir)) => {
547                    let dir_str = match dir {
548                        SortDir::Asc => "ASC",
549                        SortDir::Desc => "DESC",
550                    };
551                    format!(" ORDER BY {} {dir_str}", json_extract_expr(path))
552                }
553                None => " ORDER BY created_at DESC".to_string(),
554            };
555
556            let limit_idx = data_params.len() - 1;
557            let offset_idx = data_params.len();
558            let data_sql = format!(
559                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
560                 expires_at, properties, created_at, updated_at, deleted_at \
561                 FROM notes{}{order_clause} LIMIT ?{} OFFSET ?{}",
562                where_sql, limit_idx, offset_idx,
563            );
564
565            let mut stmt = conn.prepare(&data_sql)?;
566            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
567                data_params.iter().map(|p| p.as_ref()).collect();
568            let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
569
570            let mut items = Vec::new();
571            for row in rows {
572                items.push(row?);
573            }
574
575            Ok(Page {
576                items,
577                total: Some(total as u64),
578            })
579        })
580        .await
581    }
582
583    async fn count_notes(&self, namespace: &str, kind: Option<&str>) -> Result<u64, StorageError> {
584        let namespace = namespace.to_string();
585        let kind = kind.map(|k| k.to_string());
586
587        self.with_reader("count_notes", move |conn| {
588            let (where_sql, params) = build_note_where(&namespace, kind.as_deref());
589            let sql = format!("SELECT COUNT(*) FROM notes{}", where_sql);
590            let mut stmt = conn.prepare(&sql)?;
591            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
592                params.iter().map(|p| p.as_ref()).collect();
593            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
594            Ok(count as u64)
595        })
596        .await
597    }
598}
599
600// =============================================================================
601// DDL
602// =============================================================================
603
/// Idempotent schema for the `notes` table and its indexes.
///
/// The fragments are concatenated without separators, so the statement text
/// is a single line; every statement uses `IF NOT EXISTS` and can be re-run
/// safely.
const NOTES_DDL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS notes (",
    "id TEXT PRIMARY KEY,",
    "namespace TEXT NOT NULL,",
    "kind TEXT NOT NULL,",
    "status TEXT NOT NULL DEFAULT 'active',",
    "name TEXT,",
    "content TEXT NOT NULL DEFAULT '',",
    "salience REAL,",
    "decay_factor REAL,",
    "expires_at INTEGER,",
    "properties TEXT,",
    "created_at INTEGER NOT NULL,",
    "updated_at INTEGER NOT NULL,",
    "deleted_at INTEGER",
    ");",
    "CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);",
    "CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);",
    "CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);",
);
624
625pub(crate) fn ensure_notes_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
626    conn.execute_batch(NOTES_DDL)
627}
628
629#[cfg(test)]
630#[path = "note_tests.rs"]
631mod tests;