1use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::note::{FilterOp, Note, NoteFilter, SortDir};
10use khive_storage::types::{BatchWriteSummary, DeleteMode, Page, PageRequest, SqlValue};
11use khive_storage::NoteStore;
12use khive_storage::StorageCapability;
13
14use crate::error::SqliteError;
15use crate::pool::ConnectionPool;
16
17fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
18 StorageError::driver(StorageCapability::Notes, op, e)
19}
20
21fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
22 StorageError::driver(StorageCapability::Notes, op, e)
23}
24
25pub struct SqlNoteStore {
30 pool: Arc<ConnectionPool>,
31 is_file_backed: bool,
32}
33
34impl SqlNoteStore {
35 pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
37 Self {
38 pool,
39 is_file_backed,
40 }
41 }
42
43 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
44 let config = self.pool.config();
45 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
46 operation: "note_reader".into(),
47 message: "in-memory databases do not support standalone connections".into(),
48 })?;
49
50 let conn = rusqlite::Connection::open_with_flags(
51 path,
52 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
53 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
54 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
55 )
56 .map_err(|e| map_err(e, "open_note_reader"))?;
57
58 conn.busy_timeout(config.busy_timeout)
59 .map_err(|e| map_err(e, "open_note_reader"))?;
60 conn.pragma_update(None, "foreign_keys", "ON")
61 .map_err(|e| map_err(e, "open_note_reader"))?;
62 conn.pragma_update(None, "synchronous", "NORMAL")
63 .map_err(|e| map_err(e, "open_note_reader"))?;
64
65 Ok(conn)
66 }
67
68 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
70 where
71 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
72 R: Send + 'static,
73 {
74 let pool = Arc::clone(&self.pool);
75 tokio::task::spawn_blocking(move || {
76 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
77 f(guard.conn()).map_err(|e| map_err(e, op))
78 })
79 .await
80 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
81 }
82
83 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
84 where
85 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
86 R: Send + 'static,
87 {
88 if self.is_file_backed {
89 let conn = self.open_standalone_reader()?;
90 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
91 .await
92 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
93 } else {
94 let pool = Arc::clone(&self.pool);
95 tokio::task::spawn_blocking(move || {
96 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
97 f(guard.conn()).map_err(|e| map_err(e, op))
98 })
99 .await
100 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
101 }
102 }
103}
104
105fn read_note(row: &rusqlite::Row<'_>) -> Result<Note, rusqlite::Error> {
110 let id_str: String = row.get(0)?;
111 let namespace: String = row.get(1)?;
112 let kind: String = row.get(2)?;
113 let status: String = row.get(3)?;
114 let name: Option<String> = row.get(4)?;
115 let content: String = row.get(5)?;
116 let salience: Option<f64> = row.get(6)?;
117 let decay_factor: Option<f64> = row.get(7)?;
118 let expires_at: Option<i64> = row.get(8)?;
119 let properties_str: Option<String> = row.get(9)?;
120 let created_at: i64 = row.get(10)?;
121 let updated_at: i64 = row.get(11)?;
122 let deleted_at: Option<i64> = row.get(12)?;
123
124 let id = parse_uuid(&id_str)?;
125
126 let properties = properties_str
127 .map(|s| {
128 serde_json::from_str(&s).map_err(|e| {
129 rusqlite::Error::FromSqlConversionFailure(
130 9,
131 rusqlite::types::Type::Text,
132 Box::new(e),
133 )
134 })
135 })
136 .transpose()?;
137
138 Ok(Note {
139 id,
140 namespace,
141 kind,
142 status,
143 name,
144 content,
145 salience,
146 decay_factor,
147 expires_at,
148 properties,
149 created_at,
150 updated_at,
151 deleted_at,
152 })
153}
154
155fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
156 Uuid::parse_str(s).map_err(|e| {
157 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
158 })
159}
160
161fn build_note_where(
162 namespace: &str,
163 kind: Option<&str>,
164) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
165 let mut conditions: Vec<String> = vec![
166 "namespace = ?1".to_string(),
167 "deleted_at IS NULL".to_string(),
168 ];
169 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
170
171 if let Some(k) = kind {
172 params.push(Box::new(k.to_string()));
173 conditions.push(format!("kind = ?{}", params.len()));
174 }
175
176 let clause = format!(" WHERE {}", conditions.join(" AND "));
177 (clause, params)
178}
179
180fn validate_json_path(path: &str) -> Result<(), StorageError> {
183 let valid = path.starts_with("$.")
184 && path[2..].split('.').all(|part| {
185 !part.is_empty() && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
186 });
187 if valid {
188 Ok(())
189 } else {
190 Err(StorageError::InvalidInput {
191 capability: StorageCapability::Notes,
192 operation: "query_notes_filtered".into(),
193 message: format!("invalid JSON path for note filter: {path:?}"),
194 })
195 }
196}
197
198fn json_extract_expr(path: &str) -> String {
199 format!("json_extract(properties, '{path}')")
200}
201
202fn json_type_expr(path: &str) -> String {
203 format!("json_type(properties, '{path}')")
204}
205
206fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
207 Ok(match value {
208 SqlValue::Null => Box::new(Option::<String>::None),
209 SqlValue::Bool(v) => Box::new(*v as i64),
210 SqlValue::Integer(v) => Box::new(*v),
211 SqlValue::Float(v) => Box::new(*v),
212 SqlValue::Text(v) => Box::new(v.clone()),
213 SqlValue::Blob(v) => Box::new(v.clone()),
214 SqlValue::Json(v) => Box::new(
215 serde_json::to_string(v)
216 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?,
217 ),
218 SqlValue::Uuid(v) => Box::new(v.to_string()),
219 SqlValue::Timestamp(v) => Box::new(v.timestamp_micros()),
220 })
221}
222
223fn build_note_filter_where(
224 namespace: &str,
225 filter: &NoteFilter,
226) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
227 let mut conditions = vec![
228 "namespace = ?1".to_string(),
229 "deleted_at IS NULL".to_string(),
230 ];
231 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
232
233 if let Some(kind) = &filter.kind {
234 params.push(Box::new(kind.clone()));
235 conditions.push(format!("kind = ?{}", params.len()));
236 }
237
238 for pf in &filter.property_filters {
239 match pf.op {
240 FilterOp::EqOrMissing => {
241 let expr = json_extract_expr(&pf.json_path);
242 params.push(sql_value_param(&pf.value)?);
243 conditions.push(format!(
244 "({expr} = ?{n} OR {expr} IS NULL)",
245 n = params.len()
246 ));
247 }
248 FilterOp::JsonTypeEq => {
249 let type_expr = json_type_expr(&pf.json_path);
250 params.push(sql_value_param(&pf.value)?);
251 conditions.push(format!("{type_expr} = ?{}", params.len()));
252 }
253 FilterOp::JsonTypeNeMissing => {
254 let type_expr = json_type_expr(&pf.json_path);
255 params.push(sql_value_param(&pf.value)?);
256 let n = params.len();
257 conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})"));
258 }
259 _ => {
260 let expr = json_extract_expr(&pf.json_path);
261 let op = match pf.op {
262 FilterOp::Eq => "=",
263 FilterOp::Ne => "!=",
264 FilterOp::Lt => "<",
265 FilterOp::Lte => "<=",
266 FilterOp::Gt => ">",
267 FilterOp::Gte => ">=",
268 FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => {
269 unreachable!()
270 }
271 };
272 params.push(sql_value_param(&pf.value)?);
273 conditions.push(format!("{expr} {op} ?{}", params.len()));
274 }
275 }
276 }
277
278 Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
279}
280
281#[async_trait]
286impl NoteStore for SqlNoteStore {
287 async fn upsert_note(&self, note: Note) -> Result<(), StorageError> {
288 let namespace = note.namespace.clone();
289 let id_str = note.id.to_string();
290 let kind_str = note.kind.to_string();
291 let status_str = note.status.clone();
292 let properties_str = note
293 .properties
294 .as_ref()
295 .map(|v| serde_json::to_string(v).unwrap_or_default());
296
297 self.with_writer("upsert_note", move |conn| {
298 conn.execute(
299 "INSERT OR REPLACE INTO notes \
300 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
301 properties, created_at, updated_at, deleted_at) \
302 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
303 rusqlite::params![
304 id_str,
305 namespace,
306 kind_str,
307 status_str,
308 note.name,
309 note.content,
310 note.salience,
311 note.decay_factor,
312 note.expires_at,
313 properties_str,
314 note.created_at,
315 note.updated_at,
316 note.deleted_at,
317 ],
318 )?;
319 Ok(())
320 })
321 .await
322 }
323
324 async fn upsert_notes(&self, notes: Vec<Note>) -> Result<BatchWriteSummary, StorageError> {
325 let attempted = notes.len() as u64;
326
327 self.with_writer("upsert_notes", move |conn| {
328 conn.execute_batch("BEGIN IMMEDIATE")?;
329 let mut affected = 0u64;
330 let mut failed = 0u64;
331 let mut first_error = String::new();
332
333 for note in ¬es {
334 let id_str = note.id.to_string();
335 let kind_str = note.kind.to_string();
336 let status_str = note.status.clone();
337 let properties_str = note
338 .properties
339 .as_ref()
340 .map(|v| serde_json::to_string(v).unwrap_or_default());
341
342 match conn.execute(
343 "INSERT OR REPLACE INTO notes \
344 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
345 properties, created_at, updated_at, deleted_at) \
346 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
347 rusqlite::params![
348 id_str,
349 ¬e.namespace,
350 kind_str,
351 status_str,
352 ¬e.name,
353 note.content,
354 note.salience,
355 note.decay_factor,
356 note.expires_at,
357 properties_str,
358 note.created_at,
359 note.updated_at,
360 note.deleted_at,
361 ],
362 ) {
363 Ok(_) => affected += 1,
364 Err(e) => {
365 if first_error.is_empty() {
366 first_error = e.to_string();
367 }
368 failed += 1;
369 }
370 }
371 }
372
373 if let Err(e) = conn.execute_batch("COMMIT") {
374 let _ = conn.execute_batch("ROLLBACK");
375 return Err(e);
376 }
377 Ok(BatchWriteSummary {
378 attempted,
379 affected,
380 failed,
381 first_error,
382 })
383 })
384 .await
385 }
386
387 async fn get_note(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
388 let id_str = id.to_string();
389
390 self.with_reader("get_note", move |conn| {
391 let mut stmt = conn.prepare(
392 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
393 properties, created_at, updated_at, deleted_at \
394 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
395 )?;
396 let mut rows = stmt.query(rusqlite::params![id_str])?;
397 match rows.next()? {
398 Some(row) => Ok(Some(read_note(row)?)),
399 None => Ok(None),
400 }
401 })
402 .await
403 }
404
405 async fn get_notes_batch(&self, ids: &[Uuid]) -> Result<Vec<Note>, StorageError> {
406 if ids.is_empty() {
407 return Ok(vec![]);
408 }
409 let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
410
411 self.with_reader("get_notes_batch", move |conn| {
412 let placeholders: String = (1..=id_strings.len())
413 .map(|i| format!("?{i}"))
414 .collect::<Vec<_>>()
415 .join(", ");
416 let sql = format!(
417 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
418 properties, created_at, updated_at, deleted_at \
419 FROM notes WHERE id IN ({placeholders}) AND deleted_at IS NULL"
420 );
421 let mut stmt = conn.prepare(&sql)?;
422 let params: Vec<&dyn rusqlite::types::ToSql> = id_strings
423 .iter()
424 .map(|s| s as &dyn rusqlite::types::ToSql)
425 .collect();
426 let rows = stmt.query_map(params.as_slice(), read_note)?;
427 let mut out = Vec::new();
428 for row in rows {
429 out.push(row?);
430 }
431 Ok(out)
432 })
433 .await
434 }
435
436 async fn delete_note(&self, id: Uuid, mode: DeleteMode) -> Result<bool, StorageError> {
437 let id_str = id.to_string();
438
439 match mode {
440 DeleteMode::Soft => {
441 self.with_writer("delete_note_soft", move |conn| {
442 let now = chrono::Utc::now().timestamp_micros();
443 let deleted = conn.execute(
444 "UPDATE notes SET status = 'deleted', deleted_at = ?1 \
445 WHERE id = ?2 AND deleted_at IS NULL",
446 rusqlite::params![now, id_str],
447 )?;
448 Ok(deleted > 0)
449 })
450 .await
451 }
452 DeleteMode::Hard => {
453 self.with_writer("delete_note_hard", move |conn| {
454 let deleted =
455 conn.execute("DELETE FROM notes WHERE id = ?1", rusqlite::params![id_str])?;
456 Ok(deleted > 0)
457 })
458 .await
459 }
460 }
461 }
462
463 async fn query_notes(
464 &self,
465 namespace: &str,
466 kind: Option<&str>,
467 page: PageRequest,
468 ) -> Result<Page<Note>, StorageError> {
469 let namespace = namespace.to_string();
470 let kind = kind.map(|k| k.to_string());
471
472 self.with_reader("query_notes", move |conn| {
473 let (count_sql, count_params) = build_note_where(&namespace, kind.as_deref());
474 let total: i64 = {
475 let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
476 let mut stmt = conn.prepare(&sql)?;
477 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
478 count_params.iter().map(|p| p.as_ref()).collect();
479 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
480 };
481
482 let (where_sql, mut data_params) = build_note_where(&namespace, kind.as_deref());
483 data_params.push(Box::new(page.limit as i64));
484 data_params.push(Box::new(page.offset as i64));
485
486 let limit_idx = data_params.len() - 1;
487 let offset_idx = data_params.len();
488
489 let data_sql = format!(
490 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
491 properties, created_at, updated_at, deleted_at \
492 FROM notes{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
493 where_sql, limit_idx, offset_idx,
494 );
495
496 let mut stmt = conn.prepare(&data_sql)?;
497 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
498 data_params.iter().map(|p| p.as_ref()).collect();
499 let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
500
501 let mut items = Vec::new();
502 for row in rows {
503 items.push(row?);
504 }
505
506 Ok(Page {
507 items,
508 total: Some(total as u64),
509 })
510 })
511 .await
512 }
513
514 async fn query_notes_filtered(
515 &self,
516 namespace: &str,
517 filter: &NoteFilter,
518 page: PageRequest,
519 ) -> Result<Page<Note>, StorageError> {
520 for pf in &filter.property_filters {
522 validate_json_path(&pf.json_path)?;
523 }
524 if let Some((path, _)) = &filter.order_by {
525 validate_json_path(path)?;
526 }
527
528 let namespace = namespace.to_string();
529 let filter = filter.clone();
530
531 self.with_reader("query_notes_filtered", move |conn| {
532 let (count_sql, count_params) = build_note_filter_where(&namespace, &filter)?;
533 let total: i64 = {
534 let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
535 let mut stmt = conn.prepare(&sql)?;
536 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
537 count_params.iter().map(|p| p.as_ref()).collect();
538 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
539 };
540
541 let (where_sql, mut data_params) = build_note_filter_where(&namespace, &filter)?;
542 data_params.push(Box::new(page.limit as i64));
543 data_params.push(Box::new(page.offset as i64));
544
545 let order_clause = match &filter.order_by {
546 Some((path, dir)) => {
547 let dir_str = match dir {
548 SortDir::Asc => "ASC",
549 SortDir::Desc => "DESC",
550 };
551 format!(" ORDER BY {} {dir_str}", json_extract_expr(path))
552 }
553 None => " ORDER BY created_at DESC".to_string(),
554 };
555
556 let limit_idx = data_params.len() - 1;
557 let offset_idx = data_params.len();
558 let data_sql = format!(
559 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
560 expires_at, properties, created_at, updated_at, deleted_at \
561 FROM notes{}{order_clause} LIMIT ?{} OFFSET ?{}",
562 where_sql, limit_idx, offset_idx,
563 );
564
565 let mut stmt = conn.prepare(&data_sql)?;
566 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
567 data_params.iter().map(|p| p.as_ref()).collect();
568 let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
569
570 let mut items = Vec::new();
571 for row in rows {
572 items.push(row?);
573 }
574
575 Ok(Page {
576 items,
577 total: Some(total as u64),
578 })
579 })
580 .await
581 }
582
583 async fn count_notes(&self, namespace: &str, kind: Option<&str>) -> Result<u64, StorageError> {
584 let namespace = namespace.to_string();
585 let kind = kind.map(|k| k.to_string());
586
587 self.with_reader("count_notes", move |conn| {
588 let (where_sql, params) = build_note_where(&namespace, kind.as_deref());
589 let sql = format!("SELECT COUNT(*) FROM notes{}", where_sql);
590 let mut stmt = conn.prepare(&sql)?;
591 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
592 params.iter().map(|p| p.as_ref()).collect();
593 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
594 Ok(count as u64)
595 })
596 .await
597 }
598}
599
600const NOTES_DDL: &str = "\
605 CREATE TABLE IF NOT EXISTS notes (\
606 id TEXT PRIMARY KEY,\
607 namespace TEXT NOT NULL,\
608 kind TEXT NOT NULL,\
609 status TEXT NOT NULL DEFAULT 'active',\
610 name TEXT,\
611 content TEXT NOT NULL DEFAULT '',\
612 salience REAL,\
613 decay_factor REAL,\
614 expires_at INTEGER,\
615 properties TEXT,\
616 created_at INTEGER NOT NULL,\
617 updated_at INTEGER NOT NULL,\
618 deleted_at INTEGER\
619 );\
620 CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
621 CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
622 CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
623";
624
625pub(crate) fn ensure_notes_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
626 conn.execute_batch(NOTES_DDL)
627}
628
629#[cfg(test)]
630#[path = "note_tests.rs"]
631mod tests;