1use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::note::{FilterOp, Note, NoteFilter, SortDir};
10use khive_storage::types::{BatchWriteSummary, DeleteMode, Page, PageRequest, SqlValue};
11use khive_storage::NoteStore;
12use khive_storage::StorageCapability;
13
14use crate::error::SqliteError;
15use crate::pool::ConnectionPool;
16
17fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
18 StorageError::driver(StorageCapability::Notes, op, e)
19}
20
21fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
22 StorageError::driver(StorageCapability::Notes, op, e)
23}
24
25pub struct SqlNoteStore {
30 pool: Arc<ConnectionPool>,
31 is_file_backed: bool,
32}
33
34impl SqlNoteStore {
35 pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
37 Self {
38 pool,
39 is_file_backed,
40 }
41 }
42
43 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
44 let config = self.pool.config();
45 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
46 operation: "note_reader".into(),
47 message: "in-memory databases do not support standalone connections".into(),
48 })?;
49
50 let conn = rusqlite::Connection::open_with_flags(
51 path,
52 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
53 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
54 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
55 )
56 .map_err(|e| map_err(e, "open_note_reader"))?;
57
58 conn.busy_timeout(config.busy_timeout)
59 .map_err(|e| map_err(e, "open_note_reader"))?;
60 conn.pragma_update(None, "foreign_keys", "ON")
61 .map_err(|e| map_err(e, "open_note_reader"))?;
62 conn.pragma_update(None, "synchronous", "NORMAL")
63 .map_err(|e| map_err(e, "open_note_reader"))?;
64
65 Ok(conn)
66 }
67
68 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
70 where
71 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
72 R: Send + 'static,
73 {
74 let pool = Arc::clone(&self.pool);
75 tokio::task::spawn_blocking(move || {
76 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
77 f(guard.conn()).map_err(|e| map_err(e, op))
78 })
79 .await
80 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
81 }
82
83 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
84 where
85 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
86 R: Send + 'static,
87 {
88 if self.is_file_backed {
89 let conn = self.open_standalone_reader()?;
90 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
91 .await
92 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
93 } else {
94 let pool = Arc::clone(&self.pool);
95 tokio::task::spawn_blocking(move || {
96 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
97 f(guard.conn()).map_err(|e| map_err(e, op))
98 })
99 .await
100 .map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
101 }
102 }
103}
104
105fn read_note(row: &rusqlite::Row<'_>) -> Result<Note, rusqlite::Error> {
110 let id_str: String = row.get(0)?;
111 let namespace: String = row.get(1)?;
112 let kind: String = row.get(2)?;
113 let status: String = row.get(3)?;
114 let name: Option<String> = row.get(4)?;
115 let content: String = row.get(5)?;
116 let salience: Option<f64> = row.get(6)?;
117 let decay_factor: Option<f64> = row.get(7)?;
118 let expires_at: Option<i64> = row.get(8)?;
119 let properties_str: Option<String> = row.get(9)?;
120 let created_at: i64 = row.get(10)?;
121 let updated_at: i64 = row.get(11)?;
122 let deleted_at: Option<i64> = row.get(12)?;
123
124 let id = parse_uuid(&id_str)?;
125
126 let properties = properties_str
127 .map(|s| {
128 serde_json::from_str(&s).map_err(|e| {
129 rusqlite::Error::FromSqlConversionFailure(
130 9,
131 rusqlite::types::Type::Text,
132 Box::new(e),
133 )
134 })
135 })
136 .transpose()?;
137
138 Ok(Note {
139 id,
140 namespace,
141 kind,
142 status,
143 name,
144 content,
145 salience,
146 decay_factor,
147 expires_at,
148 properties,
149 created_at,
150 updated_at,
151 deleted_at,
152 })
153}
154
155fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
156 Uuid::parse_str(s).map_err(|e| {
157 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
158 })
159}
160
161fn build_note_where(
162 namespace: &str,
163 kind: Option<&str>,
164) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
165 let mut conditions: Vec<String> = vec![
166 "namespace = ?1".to_string(),
167 "deleted_at IS NULL".to_string(),
168 ];
169 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
170
171 if let Some(k) = kind {
172 params.push(Box::new(k.to_string()));
173 conditions.push(format!("kind = ?{}", params.len()));
174 }
175
176 let clause = format!(" WHERE {}", conditions.join(" AND "));
177 (clause, params)
178}
179
180fn validate_json_path(path: &str) -> Result<(), StorageError> {
183 let valid = path.starts_with("$.")
184 && path[2..].split('.').all(|part| {
185 !part.is_empty() && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
186 });
187 if valid {
188 Ok(())
189 } else {
190 Err(StorageError::InvalidInput {
191 capability: StorageCapability::Notes,
192 operation: "query_notes_filtered".into(),
193 message: format!("invalid JSON path for note filter: {path:?}"),
194 })
195 }
196}
197
/// Renders the SQL expression that extracts `path` from the `properties`
/// column.
///
/// `path` is embedded verbatim inside single quotes; this function performs no
/// escaping or validation of its own.
fn json_extract_expr(path: &str) -> String {
    ["json_extract(properties, '", path, "')"].concat()
}
201
202fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
203 Ok(match value {
204 SqlValue::Null => Box::new(Option::<String>::None),
205 SqlValue::Bool(v) => Box::new(*v as i64),
206 SqlValue::Integer(v) => Box::new(*v),
207 SqlValue::Float(v) => Box::new(*v),
208 SqlValue::Text(v) => Box::new(v.clone()),
209 SqlValue::Blob(v) => Box::new(v.clone()),
210 SqlValue::Json(v) => Box::new(
211 serde_json::to_string(v)
212 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?,
213 ),
214 SqlValue::Uuid(v) => Box::new(v.to_string()),
215 SqlValue::Timestamp(v) => Box::new(v.timestamp_micros()),
216 })
217}
218
219fn build_note_filter_where(
220 namespace: &str,
221 filter: &NoteFilter,
222) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
223 let mut conditions = vec![
224 "namespace = ?1".to_string(),
225 "deleted_at IS NULL".to_string(),
226 ];
227 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
228
229 if let Some(kind) = &filter.kind {
230 params.push(Box::new(kind.clone()));
231 conditions.push(format!("kind = ?{}", params.len()));
232 }
233
234 for pf in &filter.property_filters {
235 let expr = json_extract_expr(&pf.json_path);
236 if matches!(pf.op, FilterOp::EqOrMissing) {
237 params.push(sql_value_param(&pf.value)?);
238 conditions.push(format!(
239 "({expr} = ?{n} OR {expr} IS NULL)",
240 n = params.len()
241 ));
242 continue;
243 }
244 let op = match pf.op {
245 FilterOp::Eq => "=",
246 FilterOp::Ne => "!=",
247 FilterOp::Lt => "<",
248 FilterOp::Lte => "<=",
249 FilterOp::Gt => ">",
250 FilterOp::Gte => ">=",
251 FilterOp::EqOrMissing => unreachable!(),
252 };
253 params.push(sql_value_param(&pf.value)?);
254 conditions.push(format!("{expr} {op} ?{}", params.len()));
255 }
256
257 Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
258}
259
260#[async_trait]
265impl NoteStore for SqlNoteStore {
266 async fn upsert_note(&self, note: Note) -> Result<(), StorageError> {
267 let namespace = note.namespace.clone();
268 let id_str = note.id.to_string();
269 let kind_str = note.kind.to_string();
270 let status_str = note.status.clone();
271 let properties_str = note
272 .properties
273 .as_ref()
274 .map(|v| serde_json::to_string(v).unwrap_or_default());
275
276 self.with_writer("upsert_note", move |conn| {
277 conn.execute(
278 "INSERT OR REPLACE INTO notes \
279 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
280 properties, created_at, updated_at, deleted_at) \
281 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
282 rusqlite::params![
283 id_str,
284 namespace,
285 kind_str,
286 status_str,
287 note.name,
288 note.content,
289 note.salience,
290 note.decay_factor,
291 note.expires_at,
292 properties_str,
293 note.created_at,
294 note.updated_at,
295 note.deleted_at,
296 ],
297 )?;
298 Ok(())
299 })
300 .await
301 }
302
303 async fn upsert_notes(&self, notes: Vec<Note>) -> Result<BatchWriteSummary, StorageError> {
304 let attempted = notes.len() as u64;
305
306 self.with_writer("upsert_notes", move |conn| {
307 conn.execute_batch("BEGIN IMMEDIATE")?;
308 let mut affected = 0u64;
309 let mut failed = 0u64;
310 let mut first_error = String::new();
311
312 for note in ¬es {
313 let id_str = note.id.to_string();
314 let kind_str = note.kind.to_string();
315 let status_str = note.status.clone();
316 let properties_str = note
317 .properties
318 .as_ref()
319 .map(|v| serde_json::to_string(v).unwrap_or_default());
320
321 match conn.execute(
322 "INSERT OR REPLACE INTO notes \
323 (id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
324 properties, created_at, updated_at, deleted_at) \
325 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
326 rusqlite::params![
327 id_str,
328 ¬e.namespace,
329 kind_str,
330 status_str,
331 ¬e.name,
332 note.content,
333 note.salience,
334 note.decay_factor,
335 note.expires_at,
336 properties_str,
337 note.created_at,
338 note.updated_at,
339 note.deleted_at,
340 ],
341 ) {
342 Ok(_) => affected += 1,
343 Err(e) => {
344 if first_error.is_empty() {
345 first_error = e.to_string();
346 }
347 failed += 1;
348 }
349 }
350 }
351
352 if let Err(e) = conn.execute_batch("COMMIT") {
353 let _ = conn.execute_batch("ROLLBACK");
354 return Err(e);
355 }
356 Ok(BatchWriteSummary {
357 attempted,
358 affected,
359 failed,
360 first_error,
361 })
362 })
363 .await
364 }
365
366 async fn get_note(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
367 let id_str = id.to_string();
368
369 self.with_reader("get_note", move |conn| {
370 let mut stmt = conn.prepare(
371 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
372 properties, created_at, updated_at, deleted_at \
373 FROM notes WHERE id = ?1 AND deleted_at IS NULL",
374 )?;
375 let mut rows = stmt.query(rusqlite::params![id_str])?;
376 match rows.next()? {
377 Some(row) => Ok(Some(read_note(row)?)),
378 None => Ok(None),
379 }
380 })
381 .await
382 }
383
384 async fn get_notes_batch(&self, ids: &[Uuid]) -> Result<Vec<Note>, StorageError> {
385 if ids.is_empty() {
386 return Ok(vec![]);
387 }
388 let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
389
390 self.with_reader("get_notes_batch", move |conn| {
391 let placeholders: String = (1..=id_strings.len())
392 .map(|i| format!("?{i}"))
393 .collect::<Vec<_>>()
394 .join(", ");
395 let sql = format!(
396 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
397 properties, created_at, updated_at, deleted_at \
398 FROM notes WHERE id IN ({placeholders}) AND deleted_at IS NULL"
399 );
400 let mut stmt = conn.prepare(&sql)?;
401 let params: Vec<&dyn rusqlite::types::ToSql> = id_strings
402 .iter()
403 .map(|s| s as &dyn rusqlite::types::ToSql)
404 .collect();
405 let rows = stmt.query_map(params.as_slice(), read_note)?;
406 let mut out = Vec::new();
407 for row in rows {
408 out.push(row?);
409 }
410 Ok(out)
411 })
412 .await
413 }
414
415 async fn delete_note(&self, id: Uuid, mode: DeleteMode) -> Result<bool, StorageError> {
416 let id_str = id.to_string();
417
418 match mode {
419 DeleteMode::Soft => {
420 self.with_writer("delete_note_soft", move |conn| {
421 let now = chrono::Utc::now().timestamp_micros();
422 let deleted = conn.execute(
423 "UPDATE notes SET status = 'deleted', deleted_at = ?1 \
424 WHERE id = ?2 AND deleted_at IS NULL",
425 rusqlite::params![now, id_str],
426 )?;
427 Ok(deleted > 0)
428 })
429 .await
430 }
431 DeleteMode::Hard => {
432 self.with_writer("delete_note_hard", move |conn| {
433 let deleted =
434 conn.execute("DELETE FROM notes WHERE id = ?1", rusqlite::params![id_str])?;
435 Ok(deleted > 0)
436 })
437 .await
438 }
439 }
440 }
441
442 async fn query_notes(
443 &self,
444 namespace: &str,
445 kind: Option<&str>,
446 page: PageRequest,
447 ) -> Result<Page<Note>, StorageError> {
448 let namespace = namespace.to_string();
449 let kind = kind.map(|k| k.to_string());
450
451 self.with_reader("query_notes", move |conn| {
452 let (count_sql, count_params) = build_note_where(&namespace, kind.as_deref());
453 let total: i64 = {
454 let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
455 let mut stmt = conn.prepare(&sql)?;
456 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
457 count_params.iter().map(|p| p.as_ref()).collect();
458 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
459 };
460
461 let (where_sql, mut data_params) = build_note_where(&namespace, kind.as_deref());
462 data_params.push(Box::new(page.limit as i64));
463 data_params.push(Box::new(page.offset as i64));
464
465 let limit_idx = data_params.len() - 1;
466 let offset_idx = data_params.len();
467
468 let data_sql = format!(
469 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
470 properties, created_at, updated_at, deleted_at \
471 FROM notes{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
472 where_sql, limit_idx, offset_idx,
473 );
474
475 let mut stmt = conn.prepare(&data_sql)?;
476 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
477 data_params.iter().map(|p| p.as_ref()).collect();
478 let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
479
480 let mut items = Vec::new();
481 for row in rows {
482 items.push(row?);
483 }
484
485 Ok(Page {
486 items,
487 total: Some(total as u64),
488 })
489 })
490 .await
491 }
492
493 async fn query_notes_filtered(
494 &self,
495 namespace: &str,
496 filter: &NoteFilter,
497 page: PageRequest,
498 ) -> Result<Page<Note>, StorageError> {
499 for pf in &filter.property_filters {
501 validate_json_path(&pf.json_path)?;
502 }
503 if let Some((path, _)) = &filter.order_by {
504 validate_json_path(path)?;
505 }
506
507 let namespace = namespace.to_string();
508 let filter = filter.clone();
509
510 self.with_reader("query_notes_filtered", move |conn| {
511 let (count_sql, count_params) = build_note_filter_where(&namespace, &filter)?;
512 let total: i64 = {
513 let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
514 let mut stmt = conn.prepare(&sql)?;
515 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
516 count_params.iter().map(|p| p.as_ref()).collect();
517 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
518 };
519
520 let (where_sql, mut data_params) = build_note_filter_where(&namespace, &filter)?;
521 data_params.push(Box::new(page.limit as i64));
522 data_params.push(Box::new(page.offset as i64));
523
524 let order_clause = match &filter.order_by {
525 Some((path, dir)) => {
526 let dir_str = match dir {
527 SortDir::Asc => "ASC",
528 SortDir::Desc => "DESC",
529 };
530 format!(" ORDER BY {} {dir_str}", json_extract_expr(path))
531 }
532 None => " ORDER BY created_at DESC".to_string(),
533 };
534
535 let limit_idx = data_params.len() - 1;
536 let offset_idx = data_params.len();
537 let data_sql = format!(
538 "SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
539 expires_at, properties, created_at, updated_at, deleted_at \
540 FROM notes{}{order_clause} LIMIT ?{} OFFSET ?{}",
541 where_sql, limit_idx, offset_idx,
542 );
543
544 let mut stmt = conn.prepare(&data_sql)?;
545 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
546 data_params.iter().map(|p| p.as_ref()).collect();
547 let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
548
549 let mut items = Vec::new();
550 for row in rows {
551 items.push(row?);
552 }
553
554 Ok(Page {
555 items,
556 total: Some(total as u64),
557 })
558 })
559 .await
560 }
561
562 async fn count_notes(&self, namespace: &str, kind: Option<&str>) -> Result<u64, StorageError> {
563 let namespace = namespace.to_string();
564 let kind = kind.map(|k| k.to_string());
565
566 self.with_reader("count_notes", move |conn| {
567 let (where_sql, params) = build_note_where(&namespace, kind.as_deref());
568 let sql = format!("SELECT COUNT(*) FROM notes{}", where_sql);
569 let mut stmt = conn.prepare(&sql)?;
570 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
571 params.iter().map(|p| p.as_ref()).collect();
572 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
573 Ok(count as u64)
574 })
575 .await
576 }
577}
578
/// Idempotent DDL for the `notes` table and its indexes.
///
/// The statements are concatenated without any separating whitespace, so each
/// fragment below carries its own punctuation.
const NOTES_DDL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS notes (",
    "id TEXT PRIMARY KEY,",
    "namespace TEXT NOT NULL,",
    "kind TEXT NOT NULL,",
    "status TEXT NOT NULL DEFAULT 'active',",
    "name TEXT,",
    "content TEXT NOT NULL DEFAULT '',",
    "salience REAL,",
    "decay_factor REAL,",
    "expires_at INTEGER,",
    "properties TEXT,",
    "created_at INTEGER NOT NULL,",
    "updated_at INTEGER NOT NULL,",
    "deleted_at INTEGER",
    ");",
    "CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);",
    "CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);",
    "CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);",
);
603
604pub(crate) fn ensure_notes_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
605 conn.execute_batch(NOTES_DDL)
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::PoolConfig;
    use khive_storage::note::PropertyFilter as NotePropFilter;

    /// Builds an in-memory pool with the notes schema already applied.
    fn setup_pool() -> Arc<ConnectionPool> {
        let pool = ConnectionPool::new(PoolConfig {
            path: None,
            ..PoolConfig::default()
        })
        .unwrap();
        let pool = Arc::new(pool);
        // Keep the writer guard in its own scope so it is released before the
        // pool is handed to a store.
        {
            let writer = pool.writer().unwrap();
            writer.conn().execute_batch(NOTES_DDL).unwrap();
        }
        pool
    }

    /// Store over a fresh in-memory pool (pooled readers, no standalone ones).
    fn setup_memory_store() -> SqlNoteStore {
        let pool = setup_pool();
        SqlNoteStore::new(pool, false)
    }

    fn make_note(namespace: &str, kind: &str, content: &str) -> Note {
        Note::new(namespace, kind, content)
    }

    fn make_note_with_props(
        namespace: &str,
        kind: &str,
        content: &str,
        props: serde_json::Value,
    ) -> Note {
        let note = Note::new(namespace, kind, content);
        note.with_properties(props)
    }

    /// A `scheduled_event` note carrying the given properties.
    fn scheduled_event(namespace: &str, content: &str, props: serde_json::Value) -> Note {
        make_note_with_props(namespace, "scheduled_event", content, props)
    }

    /// Filter matching `scheduled_event` notes whose `$.status` is `"pending"`,
    /// with no explicit ordering.
    fn pending_event_filter() -> NoteFilter {
        NoteFilter {
            kind: Some("scheduled_event".to_string()),
            property_filters: vec![NotePropFilter {
                json_path: "$.status".to_string(),
                op: FilterOp::Eq,
                value: SqlValue::Text("pending".to_string()),
            }],
            order_by: None,
        }
    }

    #[tokio::test]
    async fn test_upsert_and_get_note() {
        let store = setup_memory_store();
        let note = make_note("default", "observation", "Hello world");
        let id = note.id;
        store.upsert_note(note).await.unwrap();

        let lookup = store.get_note(id).await.unwrap();
        assert!(lookup.is_some());

        let stored = lookup.unwrap();
        assert_eq!(stored.id, id);
        assert_eq!(stored.content, "Hello world");
        assert_eq!(stored.kind, "observation");
    }

    #[tokio::test]
    async fn test_kind_roundtrip_all_variants() {
        let kinds = [
            "observation",
            "insight",
            "question",
            "decision",
            "reference",
        ];
        let store = setup_memory_store();

        for kind in kinds {
            let note = make_note("default", kind, "content");
            let id = note.id;
            store.upsert_note(note).await.unwrap();

            let stored = store.get_note(id).await.unwrap().unwrap();
            assert_eq!(stored.kind, kind);
        }
    }

    #[tokio::test]
    async fn test_soft_delete() {
        let store = setup_memory_store();
        let note = make_note("default", "observation", "to be deleted");
        let id = note.id;
        store.upsert_note(note).await.unwrap();

        let was_deleted = store.delete_note(id, DeleteMode::Soft).await.unwrap();
        assert!(was_deleted);

        // A soft-deleted note is no longer visible through `get_note`.
        let lookup = store.get_note(id).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn test_hard_delete() {
        let store = setup_memory_store();
        let note = make_note("default", "observation", "to be hard deleted");
        let id = note.id;
        store.upsert_note(note).await.unwrap();

        let was_deleted = store.delete_note(id, DeleteMode::Hard).await.unwrap();
        assert!(was_deleted);

        let lookup = store.get_note(id).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn test_namespace_isolation() {
        let store = setup_memory_store();

        for _ in 0..3 {
            let note = make_note("ns1", "observation", "content");
            store.upsert_note(note).await.unwrap();
        }
        let other = make_note("ns2", "observation", "other");
        store.upsert_note(other).await.unwrap();

        let count_ns1 = store.count_notes("ns1", None).await.unwrap();
        assert_eq!(count_ns1, 3);

        let count_ns2 = store.count_notes("ns2", None).await.unwrap();
        assert_eq!(count_ns2, 1);
    }

    #[tokio::test]
    async fn test_query_and_count_use_caller_namespace() {
        let store = setup_memory_store();

        let note_a = make_note("ns_a", "observation", "A");
        store.upsert_note(note_a).await.unwrap();
        let note_b = make_note("ns_b", "insight", "B");
        store.upsert_note(note_b).await.unwrap();

        let page_a = store
            .query_notes("ns_a", None, PageRequest::default())
            .await
            .unwrap();
        assert_eq!(page_a.items.len(), 1);
        assert_eq!(page_a.items[0].content, "A");
        assert_eq!(page_a.total, Some(1));

        let page_b = store
            .query_notes("ns_b", None, PageRequest::default())
            .await
            .unwrap();
        assert_eq!(page_b.items.len(), 1);
        assert_eq!(page_b.items[0].content, "B");
        assert_eq!(page_b.total, Some(1));

        let count_a = store.count_notes("ns_a", None).await.unwrap();
        let count_b = store.count_notes("ns_b", None).await.unwrap();
        assert_eq!(count_a, 1);
        assert_eq!(count_b, 1);
    }

    #[tokio::test]
    async fn test_soft_delete_sets_status_deleted() {
        // Keep a handle on the pool so the raw row can be inspected afterwards.
        let pool = setup_pool();
        let store = SqlNoteStore::new(Arc::clone(&pool), false);

        let note = make_note("default", "observation", "to delete");
        let id = note.id;
        store.upsert_note(note).await.unwrap();

        let was_deleted = store.delete_note(id, DeleteMode::Soft).await.unwrap();
        assert!(was_deleted);

        // Query the table directly: `get_note` hides soft-deleted rows.
        let writer = pool.writer().unwrap();
        let status: String = writer
            .conn()
            .query_row(
                "SELECT status FROM notes WHERE id = ?1",
                [id.to_string()],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(status, "deleted");
    }

    #[tokio::test]
    async fn test_note_status_field_roundtrip() {
        let store = setup_memory_store();
        let note = make_note("default", "observation", "status test");
        let id = note.id;
        store.upsert_note(note).await.unwrap();

        let stored = store.get_note(id).await.unwrap().unwrap();
        assert_eq!(stored.status, "active");
    }

    #[tokio::test]
    async fn test_filtered_namespace_and_kind_isolation() {
        let store = setup_memory_store();

        let pending_ns1 = scheduled_event(
            "ns1",
            "event1",
            serde_json::json!({"status": "pending", "trigger_at": "2027-01-01T00:00:00Z"}),
        );
        let done_ns1 = scheduled_event(
            "ns1",
            "event2",
            serde_json::json!({"status": "done", "trigger_at": "2027-01-02T00:00:00Z"}),
        );
        let pending_ns2 = scheduled_event(
            "ns2",
            "event3",
            serde_json::json!({"status": "pending", "trigger_at": "2027-01-03T00:00:00Z"}),
        );
        store.upsert_note(pending_ns1).await.unwrap();
        store.upsert_note(done_ns1).await.unwrap();
        store.upsert_note(pending_ns2).await.unwrap();

        let filter = pending_event_filter();
        let page = store
            .query_notes_filtered("ns1", &filter, PageRequest::default())
            .await
            .unwrap();

        assert_eq!(
            page.items.len(),
            1,
            "only the pending ns1 event should appear"
        );
        assert_eq!(page.items[0].content, "event1");
        assert_eq!(page.total, Some(1));
    }

    #[tokio::test]
    async fn test_filtered_order_by_json_path_asc() {
        let store = setup_memory_store();

        // Inserted out of order on purpose: third, first, second.
        let third = scheduled_event(
            "ns1",
            "third",
            serde_json::json!({"status": "pending", "trigger_at": "2027-01-03T00:00:00Z"}),
        );
        let first = scheduled_event(
            "ns1",
            "first",
            serde_json::json!({"status": "pending", "trigger_at": "2027-01-01T00:00:00Z"}),
        );
        let second = scheduled_event(
            "ns1",
            "second",
            serde_json::json!({"status": "pending", "trigger_at": "2027-01-02T00:00:00Z"}),
        );
        store.upsert_note(third).await.unwrap();
        store.upsert_note(first).await.unwrap();
        store.upsert_note(second).await.unwrap();

        let filter = NoteFilter {
            order_by: Some(("$.trigger_at".to_string(), SortDir::Asc)),
            ..pending_event_filter()
        };
        let page = store
            .query_notes_filtered("ns1", &filter, PageRequest::default())
            .await
            .unwrap();

        assert_eq!(page.items.len(), 3);
        assert_eq!(page.items[0].content, "first");
        assert_eq!(page.items[1].content, "second");
        assert_eq!(page.items[2].content, "third");
    }

    #[tokio::test]
    async fn test_filtered_soft_deleted_excluded() {
        let store = setup_memory_store();

        let note = scheduled_event(
            "ns1",
            "to_delete",
            serde_json::json!({"status": "pending"}),
        );
        let id = note.id;
        store.upsert_note(note).await.unwrap();
        store.delete_note(id, DeleteMode::Soft).await.unwrap();

        let filter = pending_event_filter();
        let page = store
            .query_notes_filtered("ns1", &filter, PageRequest::default())
            .await
            .unwrap();

        assert_eq!(page.items.len(), 0, "soft-deleted rows must not appear");
    }

    #[tokio::test]
    async fn test_filtered_invalid_json_path_rejected() {
        let store = setup_memory_store();

        let hostile = NotePropFilter {
            json_path: "DROP TABLE notes".to_string(),
            op: FilterOp::Eq,
            value: SqlValue::Text("x".to_string()),
        };
        let filter = NoteFilter {
            kind: None,
            property_filters: vec![hostile],
            order_by: None,
        };

        let result = store
            .query_notes_filtered("ns1", &filter, PageRequest::default())
            .await;
        assert!(
            result.is_err(),
            "invalid json_path must be rejected before SQL"
        );
    }
}