1use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::note::{FilterOp, Note, NoteFilter, SortDir};
10use khive_storage::types::{BatchWriteSummary, DeleteMode, Page, PageRequest, SqlValue};
11use khive_storage::NoteStore;
12use khive_storage::StorageCapability;
13
14use crate::error::SqliteError;
15use crate::pool::ConnectionPool;
16
17fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
18 StorageError::driver(StorageCapability::Notes, op, e)
19}
20
21fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
22 StorageError::driver(StorageCapability::Notes, op, e)
23}
24
25pub struct SqlNoteStore {
30 pool: Arc<ConnectionPool>,
31 is_file_backed: bool,
32}
33
34impl SqlNoteStore {
35 pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
37 Self {
38 pool,
39 is_file_backed,
40 }
41 }
42
43 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
44 let config = self.pool.config();
45 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
46 operation: "note_reader".into(),
47 message: "in-memory databases do not support standalone connections".into(),
48 })?;
49
50 let conn = rusqlite::Connection::open_with_flags(
51 path,
52 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
53 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
54 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
55 )
56 .map_err(|e| map_err(e, "open_note_reader"))?;
57
58 conn.busy_timeout(config.busy_timeout)
59 .map_err(|e| map_err(e, "open_note_reader"))?;
60 conn.pragma_update(None, "foreign_keys", "ON")
61 .map_err(|e| map_err(e, "open_note_reader"))?;
62 conn.pragma_update(None, "synchronous", "NORMAL")
63 .map_err(|e| map_err(e, "open_note_reader"))?;
64
65 Ok(conn)
66 }
67
68 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
70 where
71 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
72 R: Send + 'static,
73 {
74 let pool = Arc::clone(&self.pool);
75 tokio::task::spawn_blocking(move || {
76 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
77 f(guard.conn()).map_err(|e| map_err(e, op))
78 })
79 .await
80 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
81 }
82
83 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
84 where
85 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
86 R: Send + 'static,
87 {
88 if self.is_file_backed {
89 let conn = self.open_standalone_reader()?;
90 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
91 .await
92 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
93 } else {
94 let pool = Arc::clone(&self.pool);
95 tokio::task::spawn_blocking(move || {
96 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
97 f(guard.conn()).map_err(|e| map_err(e, op))
98 })
99 .await
100 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
101 }
102 }
103}
104
105fn read_note(row: &rusqlite::Row<'_>) -> Result<Note, rusqlite::Error> {
110 let id_str: String = row.get(0)?;
111 let namespace: String = row.get(1)?;
112 let kind: String = row.get(2)?;
113 let status: String = row.get(3)?;
114 let name: Option<String> = row.get(4)?;
115 let content: String = row.get(5)?;
116 let salience: Option<f64> = row.get(6)?;
117 let decay_factor: Option<f64> = row.get(7)?;
118 let expires_at: Option<i64> = row.get(8)?;
119 let properties_str: Option<String> = row.get(9)?;
120 let created_at: i64 = row.get(10)?;
121 let updated_at: i64 = row.get(11)?;
122 let deleted_at: Option<i64> = row.get(12)?;
123
124 let id = parse_uuid(&id_str)?;
125
126 let properties = properties_str
127 .map(|s| {
128 serde_json::from_str(&s).map_err(|e| {
129 rusqlite::Error::FromSqlConversionFailure(
130 9,
131 rusqlite::types::Type::Text,
132 Box::new(e),
133 )
134 })
135 })
136 .transpose()?;
137
138 Ok(Note {
139 id,
140 namespace,
141 kind,
142 status,
143 name,
144 content,
145 salience,
146 decay_factor,
147 expires_at,
148 properties,
149 created_at,
150 updated_at,
151 deleted_at,
152 })
153}
154
155fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
156 Uuid::parse_str(s).map_err(|e| {
157 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
158 })
159}
160
161fn build_note_where(
162 namespace: &str,
163 kind: Option<&str>,
164) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
165 let mut conditions: Vec<String> = vec![
166 "namespace = ?1".to_string(),
167 "deleted_at IS NULL".to_string(),
168 ];
169 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
170
171 if let Some(k) = kind {
172 params.push(Box::new(k.to_string()));
173 conditions.push(format!("kind = ?{}", params.len()));
174 }
175
176 let clause = format!(" WHERE {}", conditions.join(" AND "));
177 (clause, params)
178}
179
180fn validate_json_path(path: &str) -> Result<(), StorageError> {
183 let valid = path.starts_with("$.")
184 && path[2..].split('.').all(|part| {
185 !part.is_empty() && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
186 });
187 if valid {
188 Ok(())
189 } else {
190 Err(StorageError::InvalidInput {
191 capability: StorageCapability::Notes,
192 operation: "query_notes_filtered".into(),
193 message: format!("invalid JSON path for note filter: {path:?}"),
194 })
195 }
196}
197
198fn json_extract_expr(path: &str) -> String {
199 format!("json_extract(properties, '{path}')")
200}
201
202fn json_type_expr(path: &str) -> String {
203 format!("json_type(properties, '{path}')")
204}
205
206fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
207 Ok(match value {
208 SqlValue::Null => Box::new(Option::<String>::None),
209 SqlValue::Bool(v) => Box::new(*v as i64),
210 SqlValue::Integer(v) => Box::new(*v),
211 SqlValue::Float(v) => Box::new(*v),
212 SqlValue::Text(v) => Box::new(v.clone()),
213 SqlValue::Blob(v) => Box::new(v.clone()),
214 SqlValue::Json(v) => Box::new(
215 serde_json::to_string(v)
216 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?,
217 ),
218 SqlValue::Uuid(v) => Box::new(v.to_string()),
219 SqlValue::Timestamp(v) => Box::new(v.timestamp_micros()),
220 })
221}
222
223fn build_note_filter_where(
224 namespace: &str,
225 filter: &NoteFilter,
226) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
227 let mut conditions = vec![
228 "namespace = ?1".to_string(),
229 "deleted_at IS NULL".to_string(),
230 ];
231 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
232
233 if let Some(kind) = &filter.kind {
234 params.push(Box::new(kind.clone()));
235 conditions.push(format!("kind = ?{}", params.len()));
236 }
237
238 for pf in &filter.property_filters {
239 match pf.op {
240 FilterOp::EqOrMissing => {
241 let expr = json_extract_expr(&pf.json_path);
242 params.push(sql_value_param(&pf.value)?);
243 conditions.push(format!(
244 "({expr} = ?{n} OR {expr} IS NULL)",
245 n = params.len()
246 ));
247 }
248 FilterOp::JsonTypeEq => {
249 let type_expr = json_type_expr(&pf.json_path);
250 params.push(sql_value_param(&pf.value)?);
251 conditions.push(format!("{type_expr} = ?{}", params.len()));
252 }
253 FilterOp::JsonTypeNeMissing => {
254 let type_expr = json_type_expr(&pf.json_path);
255 params.push(sql_value_param(&pf.value)?);
256 let n = params.len();
257 conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})"));
258 }
259 _ => {
260 let expr = json_extract_expr(&pf.json_path);
261 let op = match pf.op {
262 FilterOp::Eq => "=",
263 FilterOp::Ne => "!=",
264 FilterOp::Lt => "<",
265 FilterOp::Lte => "<=",
266 FilterOp::Gt => ">",
267 FilterOp::Gte => ">=",
268 FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => {
269 unreachable!()
270 }
271 };
272 params.push(sql_value_param(&pf.value)?);
273 conditions.push(format!("{expr} {op} ?{}", params.len()));
274 }
275 }
276 }
277
278 Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
279}
280
281#[async_trait]
286impl NoteStore for SqlNoteStore {
287 async fn upsert_note(&self, note: Note) -> Result<(), StorageError> {
288 let namespace = note.namespace.clone();
289 let id_str = note.id.to_string();
290 let kind_str = note.kind.to_string();
291 let status_str = note.status.clone();
292 let properties_str = note
293 .properties
294 .as_ref()
295 .map(|v| serde_json::to_string(v).unwrap_or_default());
296
297 self.with_writer("upsert_note", move |conn| {
298 conn.execute(
299 "INSERT OR REPLACE INTO notes \
300 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
301 properties, created_at, updated_at, deleted_at) \
302 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
303 rusqlite::params![
304 id_str,
305 namespace,
306 kind_str,
307 status_str,
308 note.name,
309 note.content,
310 note.salience,
311 note.decay_factor,
312 note.expires_at,
313 properties_str,
314 note.created_at,
315 note.updated_at,
316 note.deleted_at,
317 ],
318 )?;
319 Ok(())
320 })
321 .await
322 }
323
324 async fn upsert_notes(&self, notes: Vec<Note>) -> Result<BatchWriteSummary, StorageError> {
325 let attempted = notes.len() as u64;
326
327 self.with_writer("upsert_notes", move |conn| {
328 conn.execute_batch("BEGIN IMMEDIATE")?;
329 let mut affected = 0u64;
330 let mut failed = 0u64;
331 let mut first_error = String::new();
332
333 for note in ¬es {
334 let id_str = note.id.to_string();
335 let kind_str = note.kind.to_string();
336 let status_str = note.status.clone();
337 let properties_str = note
338 .properties
339 .as_ref()
340 .map(|v| serde_json::to_string(v).unwrap_or_default());
341
342 match conn.execute(
343 "INSERT OR REPLACE INTO notes \
344 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
345 properties, created_at, updated_at, deleted_at) \
346 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
347 rusqlite::params![
348 id_str,
349 ¬e.namespace,
350 kind_str,
351 status_str,
352 ¬e.name,
353 note.content,
354 note.salience,
355 note.decay_factor,
356 note.expires_at,
357 properties_str,
358 note.created_at,
359 note.updated_at,
360 note.deleted_at,
361 ],
362 ) {
363 Ok(_) => affected += 1,
364 Err(e) => {
365 if first_error.is_empty() {
366 first_error = e.to_string();
367 }
368 failed += 1;
369 }
370 }
371 }
372
373 if let Err(e) = conn.execute_batch("COMMIT") {
374 let _ = conn.execute_batch("ROLLBACK");
375 return Err(e);
376 }
377 Ok(BatchWriteSummary {
378 attempted,
379 affected,
380 failed,
381 first_error,
382 })
383 })
384 .await
385 }
386
387 async fn get_note(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
388 let id_str = id.to_string();
389
390 self.with_reader("get_note", move |conn| {
391 let mut stmt = conn.prepare(
392 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
393 properties, created_at, updated_at, deleted_at \
394 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
395 )?;
396 let mut rows = stmt.query(rusqlite::params![id_str])?;
397 match rows.next()? {
398 Some(row) => Ok(Some(read_note(row)?)),
399 None => Ok(None),
400 }
401 })
402 .await
403 }
404
405 async fn get_note_including_deleted(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
406 let id_str = id.to_string();
407
408 self.with_reader("get_note_including_deleted", move |conn| {
409 let mut stmt = conn.prepare(
410 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
411 properties, created_at, updated_at, deleted_at \
412 FROM notes WHERE id = ?1",
413 )?;
414 let mut rows = stmt.query(rusqlite::params![id_str])?;
415 match rows.next()? {
416 Some(row) => Ok(Some(read_note(row)?)),
417 None => Ok(None),
418 }
419 })
420 .await
421 }
422
423 async fn get_notes_batch(&self, ids: &[Uuid]) -> Result<Vec<Note>, StorageError> {
424 if ids.is_empty() {
425 return Ok(vec![]);
426 }
427 let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
428
429 self.with_reader("get_notes_batch", move |conn| {
430 let placeholders: String = (1..=id_strings.len())
431 .map(|i| format!("?{i}"))
432 .collect::<Vec<_>>()
433 .join(", ");
434 let sql = format!(
435 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
436 properties, created_at, updated_at, deleted_at \
437 FROM notes WHERE id IN ({placeholders}) AND deleted_at IS NULL"
438 );
439 let mut stmt = conn.prepare(&sql)?;
440 let params: Vec<&dyn rusqlite::types::ToSql> = id_strings
441 .iter()
442 .map(|s| s as &dyn rusqlite::types::ToSql)
443 .collect();
444 let rows = stmt.query_map(params.as_slice(), read_note)?;
445 let mut out = Vec::new();
446 for row in rows {
447 out.push(row?);
448 }
449 Ok(out)
450 })
451 .await
452 }
453
454 async fn delete_note(&self, id: Uuid, mode: DeleteMode) -> Result<bool, StorageError> {
455 let id_str = id.to_string();
456
457 match mode {
458 DeleteMode::Soft => {
459 self.with_writer("delete_note_soft", move |conn| {
460 let now = chrono::Utc::now().timestamp_micros();
461 let deleted = conn.execute(
462 "UPDATE notes SET status = 'deleted', deleted_at = ?1 \
463 WHERE id = ?2 AND deleted_at IS NULL",
464 rusqlite::params![now, id_str],
465 )?;
466 Ok(deleted > 0)
467 })
468 .await
469 }
470 DeleteMode::Hard => {
471 self.with_writer("delete_note_hard", move |conn| {
472 let deleted =
473 conn.execute("DELETE FROM notes WHERE id = ?1", rusqlite::params![id_str])?;
474 Ok(deleted > 0)
475 })
476 .await
477 }
478 }
479 }
480
481 async fn query_notes(
482 &self,
483 namespace: &str,
484 kind: Option<&str>,
485 page: PageRequest,
486 ) -> Result<Page<Note>, StorageError> {
487 let namespace = namespace.to_string();
488 let kind = kind.map(|k| k.to_string());
489
490 self.with_reader("query_notes", move |conn| {
491 let (count_sql, count_params) = build_note_where(&namespace, kind.as_deref());
492 let total: i64 = {
493 let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
494 let mut stmt = conn.prepare(&sql)?;
495 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
496 count_params.iter().map(|p| p.as_ref()).collect();
497 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
498 };
499
500 let (where_sql, mut data_params) = build_note_where(&namespace, kind.as_deref());
501 data_params.push(Box::new(page.limit as i64));
502 data_params.push(Box::new(page.offset as i64));
503
504 let limit_idx = data_params.len() - 1;
505 let offset_idx = data_params.len();
506
507 let data_sql = format!(
508 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
509 properties, created_at, updated_at, deleted_at \
510 FROM notes{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
511 where_sql, limit_idx, offset_idx,
512 );
513
514 let mut stmt = conn.prepare(&data_sql)?;
515 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
516 data_params.iter().map(|p| p.as_ref()).collect();
517 let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
518
519 let mut items = Vec::new();
520 for row in rows {
521 items.push(row?);
522 }
523
524 Ok(Page {
525 items,
526 total: Some(total as u64),
527 })
528 })
529 .await
530 }
531
532 async fn query_notes_filtered(
533 &self,
534 namespace: &str,
535 filter: &NoteFilter,
536 page: PageRequest,
537 ) -> Result<Page<Note>, StorageError> {
538 for pf in &filter.property_filters {
540 validate_json_path(&pf.json_path)?;
541 }
542 if let Some((path, _)) = &filter.order_by {
543 validate_json_path(path)?;
544 }
545
546 let namespace = namespace.to_string();
547 let filter = filter.clone();
548
549 self.with_reader("query_notes_filtered", move |conn| {
550 let (count_sql, count_params) = build_note_filter_where(&namespace, &filter)?;
551 let total: i64 = {
552 let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
553 let mut stmt = conn.prepare(&sql)?;
554 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
555 count_params.iter().map(|p| p.as_ref()).collect();
556 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
557 };
558
559 let (where_sql, mut data_params) = build_note_filter_where(&namespace, &filter)?;
560 data_params.push(Box::new(page.limit as i64));
561 data_params.push(Box::new(page.offset as i64));
562
563 let order_clause = match &filter.order_by {
564 Some((path, dir)) => {
565 let dir_str = match dir {
566 SortDir::Asc => "ASC",
567 SortDir::Desc => "DESC",
568 };
569 format!(" ORDER BY {} {dir_str}", json_extract_expr(path))
570 }
571 None => " ORDER BY created_at DESC".to_string(),
572 };
573
574 let limit_idx = data_params.len() - 1;
575 let offset_idx = data_params.len();
576 let data_sql = format!(
577 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
578 expires_at, properties, created_at, updated_at, deleted_at \
579 FROM notes{}{order_clause} LIMIT ?{} OFFSET ?{}",
580 where_sql, limit_idx, offset_idx,
581 );
582
583 let mut stmt = conn.prepare(&data_sql)?;
584 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
585 data_params.iter().map(|p| p.as_ref()).collect();
586 let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
587
588 let mut items = Vec::new();
589 for row in rows {
590 items.push(row?);
591 }
592
593 Ok(Page {
594 items,
595 total: Some(total as u64),
596 })
597 })
598 .await
599 }
600
601 async fn count_notes(&self, namespace: &str, kind: Option<&str>) -> Result<u64, StorageError> {
602 let namespace = namespace.to_string();
603 let kind = kind.map(|k| k.to_string());
604
605 self.with_reader("count_notes", move |conn| {
606 let (where_sql, params) = build_note_where(&namespace, kind.as_deref());
607 let sql = format!("SELECT COUNT(*) FROM notes{}", where_sql);
608 let mut stmt = conn.prepare(&sql)?;
609 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
610 params.iter().map(|p| p.as_ref()).collect();
611 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
612 Ok(count as u64)
613 })
614 .await
615 }
616}
617
618const NOTES_DDL: &str = include_str!("../../sql/notes-ddl.sql");
623
624pub(crate) fn ensure_notes_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
625 conn.execute_batch(NOTES_DDL)
626}
627
628#[cfg(test)]
629#[path = "note_tests.rs"]
630mod tests;