Skip to main content

khive_db/stores/
note.rs

1//! SQL-backed `NoteStore` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::note::{FilterOp, Note, NoteFilter, SortDir};
10use khive_storage::types::{BatchWriteSummary, DeleteMode, Page, PageRequest, SqlValue};
11use khive_storage::NoteStore;
12use khive_storage::StorageCapability;
13
14use crate::error::SqliteError;
15use crate::pool::ConnectionPool;
16
17fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
18    StorageError::driver(StorageCapability::Notes, op, e)
19}
20
21fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
22    StorageError::driver(StorageCapability::Notes, op, e)
23}
24
25/// A NoteStore backed by SQLite. Namespace is the caller's responsibility.
26///
27/// UUID is globally unique — get/delete by ID alone. Query/count use the
28/// namespace parameter as passed. The store is just a pool + is_file_backed.
29pub struct SqlNoteStore {
30    pool: Arc<ConnectionPool>,
31    is_file_backed: bool,
32}
33
34impl SqlNoteStore {
35    /// Create a new store.
36    pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
37        Self {
38            pool,
39            is_file_backed,
40        }
41    }
42
43    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
44        let config = self.pool.config();
45        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
46            operation: "note_reader".into(),
47            message: "in-memory databases do not support standalone connections".into(),
48        })?;
49
50        let conn = rusqlite::Connection::open_with_flags(
51            path,
52            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
53                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
54                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
55        )
56        .map_err(|e| map_err(e, "open_note_reader"))?;
57
58        conn.busy_timeout(config.busy_timeout)
59            .map_err(|e| map_err(e, "open_note_reader"))?;
60        conn.pragma_update(None, "foreign_keys", "ON")
61            .map_err(|e| map_err(e, "open_note_reader"))?;
62        conn.pragma_update(None, "synchronous", "NORMAL")
63            .map_err(|e| map_err(e, "open_note_reader"))?;
64
65        Ok(conn)
66    }
67
68    /// Write via pool writer (serializes writes through the mutex).
69    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
70    where
71        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
72        R: Send + 'static,
73    {
74        let pool = Arc::clone(&self.pool);
75        tokio::task::spawn_blocking(move || {
76            let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
77            f(guard.conn()).map_err(|e| map_err(e, op))
78        })
79        .await
80        .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
81    }
82
83    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
84    where
85        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
86        R: Send + 'static,
87    {
88        if self.is_file_backed {
89            let conn = self.open_standalone_reader()?;
90            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
91                .await
92                .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
93        } else {
94            let pool = Arc::clone(&self.pool);
95            tokio::task::spawn_blocking(move || {
96                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
97                f(guard.conn()).map_err(|e| map_err(e, op))
98            })
99            .await
100            .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
101        }
102    }
103}
104
105// =============================================================================
106// Helpers
107// =============================================================================
108
109fn read_note(row: &rusqlite::Row<'_>) -> Result<Note, rusqlite::Error> {
110    let id_str: String = row.get(0)?;
111    let namespace: String = row.get(1)?;
112    let kind: String = row.get(2)?;
113    let status: String = row.get(3)?;
114    let name: Option<String> = row.get(4)?;
115    let content: String = row.get(5)?;
116    let salience: Option<f64> = row.get(6)?;
117    let decay_factor: Option<f64> = row.get(7)?;
118    let expires_at: Option<i64> = row.get(8)?;
119    let properties_str: Option<String> = row.get(9)?;
120    let created_at: i64 = row.get(10)?;
121    let updated_at: i64 = row.get(11)?;
122    let deleted_at: Option<i64> = row.get(12)?;
123
124    let id = parse_uuid(&id_str)?;
125
126    let properties = properties_str
127        .map(|s| {
128            serde_json::from_str(&s).map_err(|e| {
129                rusqlite::Error::FromSqlConversionFailure(
130                    9,
131                    rusqlite::types::Type::Text,
132                    Box::new(e),
133                )
134            })
135        })
136        .transpose()?;
137
138    Ok(Note {
139        id,
140        namespace,
141        kind,
142        status,
143        name,
144        content,
145        salience,
146        decay_factor,
147        expires_at,
148        properties,
149        created_at,
150        updated_at,
151        deleted_at,
152    })
153}
154
155fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
156    Uuid::parse_str(s).map_err(|e| {
157        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
158    })
159}
160
161fn build_note_where(
162    namespace: &str,
163    kind: Option<&str>,
164) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
165    let mut conditions: Vec<String> = vec![
166        "namespace = ?1".to_string(),
167        "deleted_at IS NULL".to_string(),
168    ];
169    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
170
171    if let Some(k) = kind {
172        params.push(Box::new(k.to_string()));
173        conditions.push(format!("kind = ?{}", params.len()));
174    }
175
176    let clause = format!(" WHERE {}", conditions.join(" AND "));
177    (clause, params)
178}
179
180/// Validate that a json_path is safe to interpolate into SQL.
181/// Accepts only `$.field` or `$.field.subfield` paths with alphanumeric/underscore segments.
182fn validate_json_path(path: &str) -> Result<(), StorageError> {
183    let valid = path.starts_with("$.")
184        && path[2..].split('.').all(|part| {
185            !part.is_empty() && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
186        });
187    if valid {
188        Ok(())
189    } else {
190        Err(StorageError::InvalidInput {
191            capability: StorageCapability::Notes,
192            operation: "query_notes_filtered".into(),
193            message: format!("invalid JSON path for note filter: {path:?}"),
194        })
195    }
196}
197
/// SQL expression extracting `path` from the `properties` column.
///
/// `path` is spliced into the SQL text verbatim, so callers must pass only
/// paths that `validate_json_path` has accepted.
fn json_extract_expr(path: &str) -> String {
    ["json_extract(properties, '", path, "')"].concat()
}
201
/// SQL expression yielding the JSON type of `path` within the `properties` column.
///
/// `path` is spliced into the SQL text verbatim, so callers must pass only
/// paths that `validate_json_path` has accepted.
fn json_type_expr(path: &str) -> String {
    ["json_type(properties, '", path, "')"].concat()
}
205
206fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
207    Ok(match value {
208        SqlValue::Null => Box::new(Option::<String>::None),
209        SqlValue::Bool(v) => Box::new(*v as i64),
210        SqlValue::Integer(v) => Box::new(*v),
211        SqlValue::Float(v) => Box::new(*v),
212        SqlValue::Text(v) => Box::new(v.clone()),
213        SqlValue::Blob(v) => Box::new(v.clone()),
214        SqlValue::Json(v) => Box::new(
215            serde_json::to_string(v)
216                .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?,
217        ),
218        SqlValue::Uuid(v) => Box::new(v.to_string()),
219        SqlValue::Timestamp(v) => Box::new(v.timestamp_micros()),
220    })
221}
222
223fn build_note_filter_where(
224    namespace: &str,
225    filter: &NoteFilter,
226) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
227    let mut conditions = vec![
228        "namespace = ?1".to_string(),
229        "deleted_at IS NULL".to_string(),
230    ];
231    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
232
233    if let Some(kind) = &filter.kind {
234        params.push(Box::new(kind.clone()));
235        conditions.push(format!("kind = ?{}", params.len()));
236    }
237
238    for pf in &filter.property_filters {
239        match pf.op {
240            FilterOp::EqOrMissing => {
241                let expr = json_extract_expr(&pf.json_path);
242                params.push(sql_value_param(&pf.value)?);
243                conditions.push(format!(
244                    "({expr} = ?{n} OR {expr} IS NULL)",
245                    n = params.len()
246                ));
247            }
248            FilterOp::JsonTypeEq => {
249                let type_expr = json_type_expr(&pf.json_path);
250                params.push(sql_value_param(&pf.value)?);
251                conditions.push(format!("{type_expr} = ?{}", params.len()));
252            }
253            FilterOp::JsonTypeNeMissing => {
254                let type_expr = json_type_expr(&pf.json_path);
255                params.push(sql_value_param(&pf.value)?);
256                let n = params.len();
257                conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})"));
258            }
259            _ => {
260                let expr = json_extract_expr(&pf.json_path);
261                let op = match pf.op {
262                    FilterOp::Eq => "=",
263                    FilterOp::Ne => "!=",
264                    FilterOp::Lt => "<",
265                    FilterOp::Lte => "<=",
266                    FilterOp::Gt => ">",
267                    FilterOp::Gte => ">=",
268                    FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => {
269                        unreachable!()
270                    }
271                };
272                params.push(sql_value_param(&pf.value)?);
273                conditions.push(format!("{expr} {op} ?{}", params.len()));
274            }
275        }
276    }
277
278    Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
279}
280
281// =============================================================================
282// NoteStore implementation
283// =============================================================================
284
285#[async_trait]
286impl NoteStore for SqlNoteStore {
287    async fn upsert_note(&self, note: Note) -> Result<(), StorageError> {
288        let namespace = note.namespace.clone();
289        let id_str = note.id.to_string();
290        let kind_str = note.kind.to_string();
291        let status_str = note.status.clone();
292        let properties_str = note
293            .properties
294            .as_ref()
295            .map(|v| serde_json::to_string(v).unwrap_or_default());
296
297        self.with_writer("upsert_note", move |conn| {
298            conn.execute(
299                "INSERT OR REPLACE INTO notes \
300                 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
301                  properties, created_at, updated_at, deleted_at) \
302                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
303                rusqlite::params![
304                    id_str,
305                    namespace,
306                    kind_str,
307                    status_str,
308                    note.name,
309                    note.content,
310                    note.salience,
311                    note.decay_factor,
312                    note.expires_at,
313                    properties_str,
314                    note.created_at,
315                    note.updated_at,
316                    note.deleted_at,
317                ],
318            )?;
319            Ok(())
320        })
321        .await
322    }
323
324    async fn upsert_notes(&self, notes: Vec<Note>) -> Result<BatchWriteSummary, StorageError> {
325        let attempted = notes.len() as u64;
326
327        self.with_writer("upsert_notes", move |conn| {
328            conn.execute_batch("BEGIN IMMEDIATE")?;
329            let mut affected = 0u64;
330            let mut failed = 0u64;
331            let mut first_error = String::new();
332
333            for note in &notes {
334                let id_str = note.id.to_string();
335                let kind_str = note.kind.to_string();
336                let status_str = note.status.clone();
337                let properties_str = note
338                    .properties
339                    .as_ref()
340                    .map(|v| serde_json::to_string(v).unwrap_or_default());
341
342                match conn.execute(
343                    "INSERT OR REPLACE INTO notes \
344                     (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
345                      properties, created_at, updated_at, deleted_at) \
346                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
347                    rusqlite::params![
348                        id_str,
349                        &note.namespace,
350                        kind_str,
351                        status_str,
352                        &note.name,
353                        note.content,
354                        note.salience,
355                        note.decay_factor,
356                        note.expires_at,
357                        properties_str,
358                        note.created_at,
359                        note.updated_at,
360                        note.deleted_at,
361                    ],
362                ) {
363                    Ok(_) => affected += 1,
364                    Err(e) => {
365                        if first_error.is_empty() {
366                            first_error = e.to_string();
367                        }
368                        failed += 1;
369                    }
370                }
371            }
372
373            if let Err(e) = conn.execute_batch("COMMIT") {
374                let _ = conn.execute_batch("ROLLBACK");
375                return Err(e);
376            }
377            Ok(BatchWriteSummary {
378                attempted,
379                affected,
380                failed,
381                first_error,
382            })
383        })
384        .await
385    }
386
387    async fn get_note(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
388        let id_str = id.to_string();
389
390        self.with_reader("get_note", move |conn| {
391            let mut stmt = conn.prepare(
392                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
393                 properties, created_at, updated_at, deleted_at \
394                 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
395            )?;
396            let mut rows = stmt.query(rusqlite::params![id_str])?;
397            match rows.next()? {
398                Some(row) => Ok(Some(read_note(row)?)),
399                None => Ok(None),
400            }
401        })
402        .await
403    }
404
405    async fn get_note_including_deleted(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
406        let id_str = id.to_string();
407
408        self.with_reader("get_note_including_deleted", move |conn| {
409            let mut stmt = conn.prepare(
410                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
411                 properties, created_at, updated_at, deleted_at \
412                 FROM notes WHERE id = ?1",
413            )?;
414            let mut rows = stmt.query(rusqlite::params![id_str])?;
415            match rows.next()? {
416                Some(row) => Ok(Some(read_note(row)?)),
417                None => Ok(None),
418            }
419        })
420        .await
421    }
422
423    async fn get_notes_batch(&self, ids: &[Uuid]) -> Result<Vec<Note>, StorageError> {
424        if ids.is_empty() {
425            return Ok(vec![]);
426        }
427        let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
428
429        self.with_reader("get_notes_batch", move |conn| {
430            let placeholders: String = (1..=id_strings.len())
431                .map(|i| format!("?{i}"))
432                .collect::<Vec<_>>()
433                .join(", ");
434            let sql = format!(
435                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
436                 properties, created_at, updated_at, deleted_at \
437                 FROM notes WHERE id IN ({placeholders}) AND deleted_at IS NULL"
438            );
439            let mut stmt = conn.prepare(&sql)?;
440            let params: Vec<&dyn rusqlite::types::ToSql> = id_strings
441                .iter()
442                .map(|s| s as &dyn rusqlite::types::ToSql)
443                .collect();
444            let rows = stmt.query_map(params.as_slice(), read_note)?;
445            let mut out = Vec::new();
446            for row in rows {
447                out.push(row?);
448            }
449            Ok(out)
450        })
451        .await
452    }
453
454    async fn delete_note(&self, id: Uuid, mode: DeleteMode) -> Result<bool, StorageError> {
455        let id_str = id.to_string();
456
457        match mode {
458            DeleteMode::Soft => {
459                self.with_writer("delete_note_soft", move |conn| {
460                    let now = chrono::Utc::now().timestamp_micros();
461                    let deleted = conn.execute(
462                        "UPDATE notes SET status = 'deleted', deleted_at = ?1 \
463                         WHERE id = ?2 AND deleted_at IS NULL",
464                        rusqlite::params![now, id_str],
465                    )?;
466                    Ok(deleted > 0)
467                })
468                .await
469            }
470            DeleteMode::Hard => {
471                self.with_writer("delete_note_hard", move |conn| {
472                    let deleted =
473                        conn.execute("DELETE FROM notes WHERE id = ?1", rusqlite::params![id_str])?;
474                    Ok(deleted > 0)
475                })
476                .await
477            }
478        }
479    }
480
481    async fn query_notes(
482        &self,
483        namespace: &str,
484        kind: Option<&str>,
485        page: PageRequest,
486    ) -> Result<Page<Note>, StorageError> {
487        let namespace = namespace.to_string();
488        let kind = kind.map(|k| k.to_string());
489
490        self.with_reader("query_notes", move |conn| {
491            let (count_sql, count_params) = build_note_where(&namespace, kind.as_deref());
492            let total: i64 = {
493                let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
494                let mut stmt = conn.prepare(&sql)?;
495                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
496                    count_params.iter().map(|p| p.as_ref()).collect();
497                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
498            };
499
500            let (where_sql, mut data_params) = build_note_where(&namespace, kind.as_deref());
501            data_params.push(Box::new(page.limit as i64));
502            data_params.push(Box::new(page.offset as i64));
503
504            let limit_idx = data_params.len() - 1;
505            let offset_idx = data_params.len();
506
507            let data_sql = format!(
508                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
509                 properties, created_at, updated_at, deleted_at \
510                 FROM notes{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
511                where_sql, limit_idx, offset_idx,
512            );
513
514            let mut stmt = conn.prepare(&data_sql)?;
515            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
516                data_params.iter().map(|p| p.as_ref()).collect();
517            let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
518
519            let mut items = Vec::new();
520            for row in rows {
521                items.push(row?);
522            }
523
524            Ok(Page {
525                items,
526                total: Some(total as u64),
527            })
528        })
529        .await
530    }
531
532    async fn query_notes_filtered(
533        &self,
534        namespace: &str,
535        filter: &NoteFilter,
536        page: PageRequest,
537    ) -> Result<Page<Note>, StorageError> {
538        // Validate paths before entering spawn_blocking (closures return rusqlite::Error).
539        for pf in &filter.property_filters {
540            validate_json_path(&pf.json_path)?;
541        }
542        if let Some((path, _)) = &filter.order_by {
543            validate_json_path(path)?;
544        }
545
546        let namespace = namespace.to_string();
547        let filter = filter.clone();
548
549        self.with_reader("query_notes_filtered", move |conn| {
550            let (count_sql, count_params) = build_note_filter_where(&namespace, &filter)?;
551            let total: i64 = {
552                let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
553                let mut stmt = conn.prepare(&sql)?;
554                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
555                    count_params.iter().map(|p| p.as_ref()).collect();
556                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
557            };
558
559            let (where_sql, mut data_params) = build_note_filter_where(&namespace, &filter)?;
560            data_params.push(Box::new(page.limit as i64));
561            data_params.push(Box::new(page.offset as i64));
562
563            let order_clause = match &filter.order_by {
564                Some((path, dir)) => {
565                    let dir_str = match dir {
566                        SortDir::Asc => "ASC",
567                        SortDir::Desc => "DESC",
568                    };
569                    format!(" ORDER BY {} {dir_str}", json_extract_expr(path))
570                }
571                None => " ORDER BY created_at DESC".to_string(),
572            };
573
574            let limit_idx = data_params.len() - 1;
575            let offset_idx = data_params.len();
576            let data_sql = format!(
577                "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
578                 expires_at, properties, created_at, updated_at, deleted_at \
579                 FROM notes{}{order_clause} LIMIT ?{} OFFSET ?{}",
580                where_sql, limit_idx, offset_idx,
581            );
582
583            let mut stmt = conn.prepare(&data_sql)?;
584            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
585                data_params.iter().map(|p| p.as_ref()).collect();
586            let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
587
588            let mut items = Vec::new();
589            for row in rows {
590                items.push(row?);
591            }
592
593            Ok(Page {
594                items,
595                total: Some(total as u64),
596            })
597        })
598        .await
599    }
600
601    async fn count_notes(&self, namespace: &str, kind: Option<&str>) -> Result<u64, StorageError> {
602        let namespace = namespace.to_string();
603        let kind = kind.map(|k| k.to_string());
604
605        self.with_reader("count_notes", move |conn| {
606            let (where_sql, params) = build_note_where(&namespace, kind.as_deref());
607            let sql = format!("SELECT COUNT(*) FROM notes{}", where_sql);
608            let mut stmt = conn.prepare(&sql)?;
609            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
610                params.iter().map(|p| p.as_ref()).collect();
611            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
612            Ok(count as u64)
613        })
614        .await
615    }
616}
617
618// =============================================================================
619// DDL
620// =============================================================================
621
622const NOTES_DDL: &str = include_str!("../../sql/notes-ddl.sql");
623
624pub(crate) fn ensure_notes_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
625    conn.execute_batch(NOTES_DDL)
626}
627
628#[cfg(test)]
629#[path = "note_tests.rs"]
630mod tests;