Skip to main content

khive_db/stores/
sparse.rs

1//! SQLite-backed `SparseStore` implementation.
2
3use std::cmp::Reverse;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use uuid::Uuid;
9
10use khive_score::DeterministicScore;
11use khive_storage::error::StorageError;
12use khive_storage::types::{
13    BatchWriteSummary, SparseRecord, SparseSearchHit, SparseSearchRequest, SparseVector,
14};
15use khive_storage::{SparseStore, StorageCapability};
16use khive_types::SubstrateKind;
17
18use crate::error::SqliteError;
19use crate::pool::ConnectionPool;
20
21fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
22    StorageError::driver(StorageCapability::Sparse, op, e)
23}
24
25fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
26    StorageError::driver(StorageCapability::Sparse, op, e)
27}
28
29/// Validate that a sparse vector is well-formed.
30///
31/// - indices and values must have equal lengths
32/// - at least one element
33/// - all values must be finite
34/// - indices must be strictly increasing (no duplicates)
35fn validate_sparse_vector(vector: &SparseVector, op: &'static str) -> Result<(), StorageError> {
36    if vector.indices.len() != vector.values.len() {
37        return Err(StorageError::InvalidInput {
38            capability: StorageCapability::Sparse,
39            operation: op.into(),
40            message: format!(
41                "indices length ({}) != values length ({})",
42                vector.indices.len(),
43                vector.values.len()
44            ),
45        });
46    }
47    if vector.indices.is_empty() {
48        return Err(StorageError::InvalidInput {
49            capability: StorageCapability::Sparse,
50            operation: op.into(),
51            message: "sparse vector must have at least one element".into(),
52        });
53    }
54    for (i, v) in vector.values.iter().enumerate() {
55        if !v.is_finite() {
56            return Err(StorageError::InvalidInput {
57                capability: StorageCapability::Sparse,
58                operation: op.into(),
59                message: format!("non-finite value at position {i}: {v}"),
60            });
61        }
62    }
63    // Verify strictly increasing indices.
64    for window in vector.indices.windows(2) {
65        if window[0] >= window[1] {
66            return Err(StorageError::InvalidInput {
67                capability: StorageCapability::Sparse,
68                operation: op.into(),
69                message: format!(
70                    "indices must be strictly increasing; found {} then {}",
71                    window[0], window[1]
72                ),
73            });
74        }
75    }
76    Ok(())
77}
78
/// View an `f32` slice as its raw bytes without copying (same pattern as vectors.rs).
///
/// The bytes are the in-memory, native-endian representation of the floats.
/// Stored blobs are decoded with `f32::from_le_bytes`, so this encoding is only
/// correct on little-endian targets. The compile-time assertion below turns a
/// big-endian build into a hard error instead of silently corrupting stored vectors.
fn f32_slice_as_bytes(data: &[f32]) -> &[u8] {
    const _: () = assert!(
        cfg!(target_endian = "little"),
        "sparse values_blob encoding assumes a little-endian target"
    );
    // SAFETY: `data` is a valid, initialized `&[f32]`, so its memory is readable for
    // `size_of_val(data)` bytes. `f32` has no padding, so every byte is initialized.
    // `u8` has alignment 1, so the pointer is suitably aligned. The returned slice
    // borrows from `data` (elided lifetime), so it cannot outlive it.
    unsafe { std::slice::from_raw_parts(data.as_ptr().cast::<u8>(), std::mem::size_of_val(data)) }
}
84
85/// Create the sparse table and its index for the given model_key.
86pub(crate) fn ensure_sparse_schema(
87    conn: &rusqlite::Connection,
88    model_key: &str,
89) -> Result<(), rusqlite::Error> {
90    let table = format!("sparse_{}", model_key);
91    let ddl = format!(
92        "CREATE TABLE IF NOT EXISTS {table} (\
93         subject_id TEXT NOT NULL, \
94         namespace TEXT NOT NULL, \
95         kind TEXT NOT NULL, \
96         field TEXT NOT NULL, \
97         indices_json TEXT NOT NULL, \
98         values_blob BLOB NOT NULL, \
99         updated_at INTEGER NOT NULL, \
100         PRIMARY KEY(subject_id, namespace, field)\
101         ); \
102         CREATE INDEX IF NOT EXISTS idx_{table}_namespace_kind \
103         ON {table}(namespace, kind);"
104    );
105    conn.execute_batch(&ddl)
106}
107
108/// SQLite-backed sparse vector store.
109pub struct SqliteSparseStore {
110    pool: Arc<ConnectionPool>,
111    is_file_backed: bool,
112    table_name: String,
113    namespace: String,
114}
115
116impl SqliteSparseStore {
117    /// Create a new sparse store for the given model key and namespace.
118    pub fn new(
119        pool: Arc<ConnectionPool>,
120        is_file_backed: bool,
121        model_key: String,
122        namespace: String,
123    ) -> Result<Self, SqliteError> {
124        let table_name = format!("sparse_{}", model_key);
125        Ok(Self {
126            pool,
127            is_file_backed,
128            table_name,
129            namespace,
130        })
131    }
132
133    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
134    where
135        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
136        R: Send + 'static,
137    {
138        let pool = Arc::clone(&self.pool);
139        tokio::task::spawn_blocking(move || {
140            let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
141            f(guard.conn()).map_err(|e| map_err(e, op))
142        })
143        .await
144        .map_err(|e| StorageError::driver(StorageCapability::Sparse, op, e))?
145    }
146
147    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
148    where
149        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
150        R: Send + 'static,
151    {
152        if self.is_file_backed {
153            // For file-backed DBs open a standalone read-only connection.
154            let config = self.pool.config();
155            let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
156                operation: "sparse_reader".into(),
157                message: "in-memory databases do not support standalone connections".into(),
158            })?;
159            let conn = rusqlite::Connection::open_with_flags(
160                path,
161                rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
162                    | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
163                    | rusqlite::OpenFlags::SQLITE_OPEN_URI,
164            )
165            .map_err(|e| map_err(e, op))?;
166            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
167                .await
168                .map_err(|e| StorageError::driver(StorageCapability::Sparse, op, e))?
169        } else {
170            let pool = Arc::clone(&self.pool);
171            tokio::task::spawn_blocking(move || {
172                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
173                f(guard.conn()).map_err(|e| map_err(e, op))
174            })
175            .await
176            .map_err(|e| StorageError::driver(StorageCapability::Sparse, op, e))?
177        }
178    }
179
180    async fn upsert_sparse_vector(
181        &self,
182        subject_id: Uuid,
183        kind: SubstrateKind,
184        namespace: &str,
185        field: &str,
186        vector: SparseVector,
187    ) -> Result<(), StorageError> {
188        let table = self.table_name.clone();
189        let ns = namespace.to_string();
190        let field = field.to_string();
191        let id_str = subject_id.to_string();
192        let kind_str = kind.to_string();
193
194        self.with_writer("sparse_upsert", move |conn| {
195            let indices_json = serde_json::to_string(&vector.indices).map_err(|e| {
196                rusqlite::Error::FromSqlConversionFailure(
197                    0,
198                    rusqlite::types::Type::Text,
199                    Box::new(e),
200                )
201            })?;
202            let values_blob = f32_slice_as_bytes(&vector.values);
203            let now = chrono::Utc::now().timestamp();
204            let sql = format!(
205                "INSERT INTO {table} \
206                 (subject_id, namespace, kind, field, indices_json, values_blob, updated_at) \
207                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
208                 ON CONFLICT(subject_id, namespace, field) DO UPDATE SET \
209                 kind = excluded.kind, \
210                 indices_json = excluded.indices_json, \
211                 values_blob = excluded.values_blob, \
212                 updated_at = excluded.updated_at"
213            );
214            conn.execute(
215                &sql,
216                rusqlite::params![
217                    &id_str,
218                    &ns,
219                    &kind_str,
220                    &field,
221                    &indices_json,
222                    values_blob,
223                    now
224                ],
225            )?;
226            Ok(())
227        })
228        .await
229    }
230
231    async fn insert_sparse_batch(
232        &self,
233        records: Vec<SparseRecord>,
234    ) -> Result<BatchWriteSummary, StorageError> {
235        let table = self.table_name.clone();
236        let attempted = records.len() as u64;
237
238        self.with_writer("sparse_insert_batch", move |conn| {
239            let sql = format!(
240                "INSERT INTO {table} \
241                 (subject_id, namespace, kind, field, indices_json, values_blob, updated_at) \
242                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
243                 ON CONFLICT(subject_id, namespace, field) DO UPDATE SET \
244                 indices_json = excluded.indices_json, \
245                 values_blob = excluded.values_blob, \
246                 updated_at = excluded.updated_at"
247            );
248
249            conn.execute_batch("BEGIN IMMEDIATE")?;
250            let mut affected = 0u64;
251            let mut failed = 0u64;
252            let mut first_error = String::new();
253
254            for record in &records {
255                // Validate inline — skip invalid records rather than aborting the batch.
256                if record.vector.indices.len() != record.vector.values.len()
257                    || record.vector.indices.is_empty()
258                    || record.vector.values.iter().any(|v| !v.is_finite())
259                    || record.vector.indices.windows(2).any(|w| w[0] >= w[1])
260                {
261                    if first_error.is_empty() {
262                        first_error =
263                            format!("invalid sparse vector for subject {}", record.subject_id);
264                    }
265                    failed += 1;
266                    continue;
267                }
268
269                let indices_json = match serde_json::to_string(&record.vector.indices) {
270                    Ok(j) => j,
271                    Err(e) => {
272                        if first_error.is_empty() {
273                            first_error = e.to_string();
274                        }
275                        failed += 1;
276                        continue;
277                    }
278                };
279                let values_blob = f32_slice_as_bytes(&record.vector.values);
280                let now = record.updated_at.timestamp();
281                let id_str = record.subject_id.to_string();
282                let kind_str = record.kind.to_string();
283
284                match conn.execute(
285                    &sql,
286                    rusqlite::params![
287                        &id_str,
288                        &record.namespace,
289                        &kind_str,
290                        &record.field,
291                        &indices_json,
292                        values_blob,
293                        now
294                    ],
295                ) {
296                    Ok(_) => affected += 1,
297                    Err(e) => {
298                        if first_error.is_empty() {
299                            first_error = e.to_string();
300                        }
301                        failed += 1;
302                    }
303                }
304            }
305
306            conn.execute_batch("COMMIT")?;
307            Ok(BatchWriteSummary {
308                attempted,
309                affected,
310                failed,
311                first_error,
312            })
313        })
314        .await
315    }
316
317    async fn delete_sparse_subject(&self, subject_id: Uuid) -> Result<bool, StorageError> {
318        let table = self.table_name.clone();
319        let namespace = self.namespace.clone();
320        let id_str = subject_id.to_string();
321
322        self.with_writer("sparse_delete", move |conn| {
323            let sql = format!("DELETE FROM {table} WHERE subject_id = ?1 AND namespace = ?2");
324            let deleted = conn.execute(&sql, rusqlite::params![&id_str, &namespace])?;
325            Ok(deleted > 0)
326        })
327        .await
328    }
329
330    async fn search_sparse_vectors(
331        &self,
332        request: SparseSearchRequest,
333    ) -> Result<Vec<SparseSearchHit>, StorageError> {
334        let table = self.table_name.clone();
335        let ns = request
336            .namespace
337            .clone()
338            .unwrap_or_else(|| self.namespace.clone());
339        let kind_filter = request.kind.map(|k| k.to_string());
340        let query = request.query;
341        let top_k = request.top_k as usize;
342
343        self.with_reader("sparse_search", move |conn| {
344            // Load candidate rows for namespace (and optional kind).
345            let (sql, kind_str_ref) = if let Some(ref kind_str) = kind_filter {
346                (
347                    format!(
348                        "SELECT subject_id, indices_json, values_blob \
349                         FROM {table} WHERE namespace = ?1 AND kind = ?2"
350                    ),
351                    Some(kind_str.as_str()),
352                )
353            } else {
354                (
355                    format!(
356                        "SELECT subject_id, indices_json, values_blob \
357                         FROM {table} WHERE namespace = ?1"
358                    ),
359                    None,
360                )
361            };
362
363            let mut stmt = conn.prepare(&sql)?;
364
365            // Collect rows.
366            let rows: Vec<rusqlite::Result<(String, String, Vec<u8>)>> =
367                if let Some(kind_str) = kind_str_ref {
368                    stmt.query_map(rusqlite::params![&ns, kind_str], |row| {
369                        Ok((row.get(0)?, row.get(1)?, row.get(2)?))
370                    })?
371                    .collect()
372                } else {
373                    stmt.query_map(rusqlite::params![&ns], |row| {
374                        Ok((row.get(0)?, row.get(1)?, row.get(2)?))
375                    })?
376                    .collect()
377                };
378
379            // Bounded min-heap for top-k selection (KDB-AUD-003).
380            let mut heap: BinaryHeap<Reverse<ScoredCandidate>> =
381                BinaryHeap::with_capacity(top_k + 1);
382
383            for row_result in rows {
384                let (id_str, indices_json, values_blob) = row_result?;
385
386                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
387                    rusqlite::Error::FromSqlConversionFailure(
388                        0,
389                        rusqlite::types::Type::Text,
390                        Box::new(e),
391                    )
392                })?;
393
394                // surface malformed rows as errors instead of silently skipping them
395                let stored_indices: Vec<u32> =
396                    serde_json::from_str(&indices_json).map_err(|e| {
397                        rusqlite::Error::FromSqlConversionFailure(
398                            0,
399                            rusqlite::types::Type::Text,
400                            Box::<dyn std::error::Error + Send + Sync>::from(format!(
401                                "corrupt sparse row {id_str}: invalid indices JSON: {e}"
402                            )),
403                        )
404                    })?;
405
406                if values_blob.len() % 4 != 0 {
407                    return Err(rusqlite::Error::FromSqlConversionFailure(
408                        0,
409                        rusqlite::types::Type::Blob,
410                        Box::<dyn std::error::Error + Send + Sync>::from(format!(
411                            "corrupt sparse row {id_str}: values blob length {} not a multiple of 4",
412                            values_blob.len()
413                        )),
414                    ));
415                }
416
417                let stored_values: Vec<f32> = values_blob
418                    .chunks_exact(4)
419                    .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
420                    .collect();
421
422                validate_persisted_sparse(&id_str, &stored_indices, &stored_values)?;
423
424                let score = sparse_dot_product(
425                    &query.indices,
426                    &query.values,
427                    &stored_indices,
428                    &stored_values,
429                );
430
431                heap.push(Reverse(ScoredCandidate { score, subject_id }));
432                if heap.len() > top_k {
433                    heap.pop();
434                }
435            }
436
437            // Drain heap and sort descending by score, ascending by UUID on tie.
438            let mut top: Vec<_> = heap.into_iter().map(|Reverse(c)| c).collect();
439            top.sort_by(|a, b| {
440                b.score
441                    .partial_cmp(&a.score)
442                    .unwrap_or(std::cmp::Ordering::Equal)
443                    .then_with(|| a.subject_id.cmp(&b.subject_id))
444            });
445
446            let hits = top
447                .into_iter()
448                .enumerate()
449                .map(|(i, c)| SparseSearchHit {
450                    subject_id: c.subject_id,
451                    score: DeterministicScore::from_f64(c.score),
452                    rank: (i + 1) as u32,
453                })
454                .collect();
455
456            Ok(hits)
457        })
458        .await
459    }
460
461    async fn count_sparse_rows(&self) -> Result<u64, StorageError> {
462        let table = self.table_name.clone();
463        let namespace = self.namespace.clone();
464        self.with_reader("sparse_count", move |conn| {
465            let sql = format!("SELECT COUNT(*) FROM {table} WHERE namespace = ?1");
466            let count: i64 =
467                conn.query_row(&sql, rusqlite::params![&namespace], |row| row.get(0))?;
468            Ok(count as u64)
469        })
470        .await
471    }
472}
473
474/// Candidate scored during sparse search, ordered for a min-heap so we can
475/// maintain a bounded top-k set: (score desc, subject_id asc) tie-breaking.
476#[derive(PartialEq)]
477struct ScoredCandidate {
478    score: f64,
479    subject_id: Uuid,
480}
481
482impl Eq for ScoredCandidate {}
483
484impl PartialOrd for ScoredCandidate {
485    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
486        Some(self.cmp(other))
487    }
488}
489
490impl Ord for ScoredCandidate {
491    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
492        // Min-heap: lower score pops first. On tie, higher UUID pops first
493        // (so lower UUID is retained = deterministic ascending tie-break).
494        match self
495            .score
496            .partial_cmp(&other.score)
497            .unwrap_or(std::cmp::Ordering::Equal)
498        {
499            std::cmp::Ordering::Equal => other.subject_id.cmp(&self.subject_id),
500            ord => ord,
501        }
502    }
503}
504
505/// Validate invariants on a deserialized sparse vector from the database.
506/// Returns a storage error describing the corruption instead of silently
507/// skipping the row (KDB-AUD-002).
508fn validate_persisted_sparse(
509    subject_id: &str,
510    indices: &[u32],
511    values: &[f32],
512) -> Result<(), rusqlite::Error> {
513    if indices.len() != values.len() {
514        return Err(rusqlite::Error::FromSqlConversionFailure(
515            0,
516            rusqlite::types::Type::Blob,
517            Box::<dyn std::error::Error + Send + Sync>::from(format!(
518                "corrupt sparse row {subject_id}: indices len {} != values len {}",
519                indices.len(),
520                values.len()
521            )),
522        ));
523    }
524    for (i, v) in values.iter().enumerate() {
525        if !v.is_finite() {
526            return Err(rusqlite::Error::FromSqlConversionFailure(
527                0,
528                rusqlite::types::Type::Blob,
529                Box::<dyn std::error::Error + Send + Sync>::from(format!(
530                    "corrupt sparse row {subject_id}: non-finite value at position {i}: {v}"
531                )),
532            ));
533        }
534    }
535    for window in indices.windows(2) {
536        if window[0] >= window[1] {
537            return Err(rusqlite::Error::FromSqlConversionFailure(
538                0,
539                rusqlite::types::Type::Blob,
540                Box::<dyn std::error::Error + Send + Sync>::from(format!(
541                    "corrupt sparse row {subject_id}: indices not strictly increasing at {} >= {}",
542                    window[0], window[1]
543                )),
544            ));
545        }
546    }
547    Ok(())
548}
549
/// Dot product of two sparse vectors given as (indices, values) pairs.
///
/// Both index slices must be sorted ascending. They are walked in a single
/// merge pass, and a product is taken only where an index appears in both.
/// Products are widened to `f64` and accumulated in index order. Each values
/// slice is expected to be as long as its indices slice; a shorter one panics
/// on a matching index.
fn sparse_dot_product(q_idx: &[u32], q_val: &[f32], s_idx: &[u32], s_val: &[f32]) -> f64 {
    use std::cmp::Ordering;

    let (mut a, mut b) = (0usize, 0usize);
    let mut acc = 0.0f64;
    while let (Some(&qa), Some(&sb)) = (q_idx.get(a), s_idx.get(b)) {
        match qa.cmp(&sb) {
            Ordering::Less => a += 1,
            Ordering::Greater => b += 1,
            Ordering::Equal => {
                acc += f64::from(q_val[a]) * f64::from(s_val[b]);
                a += 1;
                b += 1;
            }
        }
    }
    acc
}
568
569#[async_trait]
570impl SparseStore for SqliteSparseStore {
571    async fn insert_sparse(
572        &self,
573        subject_id: Uuid,
574        kind: SubstrateKind,
575        namespace: &str,
576        field: &str,
577        vector: SparseVector,
578    ) -> Result<(), StorageError> {
579        validate_sparse_vector(&vector, "sparse_insert")?;
580        self.upsert_sparse_vector(subject_id, kind, namespace, field, vector)
581            .await
582    }
583
584    async fn insert_batch(
585        &self,
586        records: Vec<SparseRecord>,
587    ) -> Result<BatchWriteSummary, StorageError> {
588        self.insert_sparse_batch(records).await
589    }
590
591    async fn delete(&self, subject_id: Uuid) -> Result<bool, StorageError> {
592        self.delete_sparse_subject(subject_id).await
593    }
594
595    async fn search_sparse(
596        &self,
597        request: SparseSearchRequest,
598    ) -> Result<Vec<SparseSearchHit>, StorageError> {
599        validate_sparse_vector(&request.query, "sparse_search")?;
600        self.search_sparse_vectors(request).await
601    }
602
603    async fn count(&self) -> Result<u64, StorageError> {
604        self.count_sparse_rows().await
605    }
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::{ConnectionPool, PoolConfig};
    use chrono::Utc;

    /// Build a store over a fresh in-memory pool, with the schema for `model_key` created.
    fn make_store(model_key: &str) -> SqliteSparseStore {
        let config = PoolConfig {
            path: None,
            ..PoolConfig::default()
        };
        let pool = Arc::new(ConnectionPool::new(config).expect("pool"));
        // Create schema.
        {
            let writer = pool.try_writer().expect("writer");
            ensure_sparse_schema(writer.conn(), model_key).expect("schema");
        }
        SqliteSparseStore::new(pool, false, model_key.to_string(), "ns:test".to_string())
            .expect("store")
    }

    fn sv(indices: Vec<u32>, values: Vec<f32>) -> SparseVector {
        SparseVector { indices, values }
    }

    /// Insert one `Entity` row under namespace `ns:test`, field `body`.
    async fn put(store: &SqliteSparseStore, id: Uuid, vector: SparseVector) {
        store
            .insert_sparse(id, SubstrateKind::Entity, "ns:test", "body", vector)
            .await
            .unwrap();
    }

    /// Search `ns:test` with no kind filter.
    async fn search(
        store: &SqliteSparseStore,
        query: SparseVector,
        top_k: u32,
    ) -> Vec<SparseSearchHit> {
        store
            .search_sparse(SparseSearchRequest {
                query,
                top_k: top_k as _,
                namespace: Some("ns:test".into()),
                kind: None,
            })
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn insert_and_count() {
        let store = make_store("test_count");
        put(&store, Uuid::new_v4(), sv(vec![0, 2], vec![1.0, 0.5])).await;
        assert_eq!(store.count().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn insert_and_search() {
        let store = make_store("test_search");
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        put(&store, id1, sv(vec![0, 1], vec![1.0, 0.0])).await;
        put(&store, id2, sv(vec![0, 1], vec![0.0, 1.0])).await;

        let hits = search(&store, sv(vec![0], vec![1.0]), 2).await;

        assert!(!hits.is_empty());
        assert_eq!(hits[0].subject_id, id1, "id1 should rank first");
        assert_eq!(hits[0].rank, 1);
    }

    #[tokio::test]
    async fn search_truncates_to_top_k_in_score_order() {
        let store = make_store("test_topk");
        let ids = [Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
        for (id, weight) in ids.iter().zip([3.0f32, 2.0, 1.0].iter()) {
            put(&store, *id, sv(vec![0], vec![*weight])).await;
        }

        let hits = search(&store, sv(vec![0], vec![1.0]), 2).await;

        let got: Vec<_> = hits.iter().map(|h| (h.subject_id, h.rank)).collect();
        assert_eq!(got, vec![(ids[0], 1), (ids[1], 2)]);
    }

    #[tokio::test]
    async fn search_breaks_score_ties_by_ascending_uuid() {
        let store = make_store("test_ties");
        let low = Uuid::from_u128(1);
        let high = Uuid::from_u128(2);
        for id in [high, low].iter() {
            put(&store, *id, sv(vec![4], vec![1.0])).await;
        }

        let hits = search(&store, sv(vec![4], vec![1.0]), 1).await;

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, low, "lower UUID wins a score tie");
    }

    #[tokio::test]
    async fn delete_removes_row() {
        let store = make_store("test_delete");
        let id = Uuid::new_v4();
        put(&store, id, sv(vec![1], vec![1.0])).await;
        assert_eq!(store.count().await.unwrap(), 1);

        let deleted = store.delete(id).await.unwrap();
        assert!(deleted);
        assert_eq!(store.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn delete_missing_subject_returns_false() {
        let store = make_store("test_delete_missing");
        assert!(!store.delete(Uuid::new_v4()).await.unwrap());
    }

    #[tokio::test]
    async fn reinsert_replaces_existing_row() {
        let store = make_store("test_reinsert");
        let id = Uuid::new_v4();
        put(&store, id, sv(vec![0], vec![1.0])).await;
        put(&store, id, sv(vec![1], vec![2.0])).await;
        assert_eq!(store.count().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn mismatched_lengths_rejected() {
        let store = make_store("test_mismatch");
        let result = store
            .insert_sparse(
                Uuid::new_v4(),
                SubstrateKind::Entity,
                "ns:test",
                "body",
                SparseVector {
                    indices: vec![0, 1],
                    values: vec![1.0],
                },
            )
            .await;
        assert!(matches!(result, Err(StorageError::InvalidInput { .. })));
    }

    #[tokio::test]
    async fn non_finite_values_rejected() {
        let store = make_store("test_nonfinite");
        let result = store
            .insert_sparse(
                Uuid::new_v4(),
                SubstrateKind::Entity,
                "ns:test",
                "body",
                sv(vec![0], vec![f32::NAN]),
            )
            .await;
        assert!(matches!(result, Err(StorageError::InvalidInput { .. })));
    }

    #[tokio::test]
    async fn duplicate_indices_rejected() {
        let store = make_store("test_dup_idx");
        let result = store
            .insert_sparse(
                Uuid::new_v4(),
                SubstrateKind::Entity,
                "ns:test",
                "body",
                sv(vec![0, 0], vec![1.0, 2.0]),
            )
            .await;
        assert!(matches!(result, Err(StorageError::InvalidInput { .. })));
    }

    #[tokio::test]
    async fn empty_vector_rejected() {
        let store = make_store("test_empty");
        let result = store
            .insert_sparse(
                Uuid::new_v4(),
                SubstrateKind::Entity,
                "ns:test",
                "body",
                sv(vec![], vec![]),
            )
            .await;
        assert!(matches!(result, Err(StorageError::InvalidInput { .. })));
    }

    #[tokio::test]
    async fn namespace_isolation() {
        let store = make_store("test_ns_iso");
        let id = Uuid::new_v4();
        store
            .insert_sparse(
                id,
                SubstrateKind::Entity,
                "ns:a",
                "body",
                sv(vec![0], vec![1.0]),
            )
            .await
            .unwrap();

        let hits = store
            .search_sparse(SparseSearchRequest {
                query: sv(vec![0], vec![1.0]),
                top_k: 5,
                namespace: Some("ns:b".into()),
                kind: None,
            })
            .await
            .unwrap();
        assert!(hits.is_empty(), "ns:b should not see ns:a data");
    }

    #[tokio::test]
    async fn insert_batch_happy_path() {
        let store = make_store("test_batch");
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        let records = vec![
            SparseRecord {
                subject_id: id1,
                kind: SubstrateKind::Entity,
                namespace: "ns:test".into(),
                field: "body".into(),
                vector: sv(vec![0, 3], vec![0.5, 0.8]),
                updated_at: Utc::now(),
            },
            SparseRecord {
                subject_id: id2,
                kind: SubstrateKind::Entity,
                namespace: "ns:test".into(),
                field: "body".into(),
                vector: sv(vec![1], vec![1.0]),
                updated_at: Utc::now(),
            },
        ];
        let summary = store.insert_batch(records).await.unwrap();
        assert_eq!(summary.attempted, 2);
        assert_eq!(summary.affected, 2);
        assert_eq!(summary.failed, 0);
        assert_eq!(store.count().await.unwrap(), 2);
    }

    #[tokio::test]
    async fn insert_batch_counts_invalid_records_without_aborting() {
        let store = make_store("test_batch_invalid");
        let records = vec![
            SparseRecord {
                subject_id: Uuid::new_v4(),
                kind: SubstrateKind::Entity,
                namespace: "ns:test".into(),
                field: "body".into(),
                vector: sv(vec![2], vec![1.0]),
                updated_at: Utc::now(),
            },
            SparseRecord {
                subject_id: Uuid::new_v4(),
                kind: SubstrateKind::Entity,
                namespace: "ns:test".into(),
                field: "body".into(),
                vector: sv(vec![], vec![]),
                updated_at: Utc::now(),
            },
        ];
        let summary = store.insert_batch(records).await.unwrap();
        assert_eq!(summary.attempted, 2);
        assert_eq!(summary.affected, 1);
        assert_eq!(summary.failed, 1);
        assert!(!summary.first_error.is_empty());
        assert_eq!(store.count().await.unwrap(), 1);
    }
}