1use std::cmp::Reverse;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use uuid::Uuid;
9
10use khive_score::DeterministicScore;
11use khive_storage::error::StorageError;
12use khive_storage::types::{
13 BatchWriteSummary, SparseRecord, SparseSearchHit, SparseSearchRequest, SparseVector,
14};
15use khive_storage::{SparseStore, StorageCapability};
16use khive_types::SubstrateKind;
17
18use crate::error::SqliteError;
19use crate::pool::ConnectionPool;
20
21fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
22 StorageError::driver(StorageCapability::Sparse, op, e)
23}
24
25fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
26 StorageError::driver(StorageCapability::Sparse, op, e)
27}
28
29fn validate_sparse_vector(vector: &SparseVector, op: &'static str) -> Result<(), StorageError> {
36 if vector.indices.len() != vector.values.len() {
37 return Err(StorageError::InvalidInput {
38 capability: StorageCapability::Sparse,
39 operation: op.into(),
40 message: format!(
41 "indices length ({}) != values length ({})",
42 vector.indices.len(),
43 vector.values.len()
44 ),
45 });
46 }
47 if vector.indices.is_empty() {
48 return Err(StorageError::InvalidInput {
49 capability: StorageCapability::Sparse,
50 operation: op.into(),
51 message: "sparse vector must have at least one element".into(),
52 });
53 }
54 for (i, v) in vector.values.iter().enumerate() {
55 if !v.is_finite() {
56 return Err(StorageError::InvalidInput {
57 capability: StorageCapability::Sparse,
58 operation: op.into(),
59 message: format!("non-finite value at position {i}: {v}"),
60 });
61 }
62 }
63 for window in vector.indices.windows(2) {
65 if window[0] >= window[1] {
66 return Err(StorageError::InvalidInput {
67 capability: StorageCapability::Sparse,
68 operation: op.into(),
69 message: format!(
70 "indices must be strictly increasing; found {} then {}",
71 window[0], window[1]
72 ),
73 });
74 }
75 }
76 Ok(())
77}
78
/// Reinterprets a slice of `f32` as its raw bytes without copying.
///
/// The returned bytes are in the platform's native byte order.
/// NOTE(review): the search path in this file decodes blobs with
/// `f32::from_le_bytes`, so the two only agree on little-endian targets — confirm
/// that big-endian targets are out of scope.
fn f32_slice_as_bytes(data: &[f32]) -> &[u8] {
    let byte_len = std::mem::size_of_val(data);
    let start = data.as_ptr().cast::<u8>();
    // SAFETY: `start` points at the beginning of `data`, which is valid for reads of
    // `byte_len` bytes for as long as the input borrow lives; `u8` has alignment 1
    // and every bit pattern of an `f32` is a valid sequence of `u8`s. The returned
    // slice inherits the input's lifetime, so it cannot outlive the floats.
    unsafe { std::slice::from_raw_parts(start, byte_len) }
}
84
85pub(crate) fn ensure_sparse_schema(
87 conn: &rusqlite::Connection,
88 model_key: &str,
89) -> Result<(), rusqlite::Error> {
90 let table = format!("sparse_{}", model_key);
91 let ddl = format!(
92 "CREATE TABLE IF NOT EXISTS {table} (\
93 subject_id TEXT NOT NULL, \
94 namespace TEXT NOT NULL, \
95 kind TEXT NOT NULL, \
96 field TEXT NOT NULL, \
97 indices_json TEXT NOT NULL, \
98 values_blob BLOB NOT NULL, \
99 updated_at INTEGER NOT NULL, \
100 PRIMARY KEY(subject_id, namespace, field)\
101 ); \
102 CREATE INDEX IF NOT EXISTS idx_{table}_namespace_kind \
103 ON {table}(namespace, kind);"
104 );
105 conn.execute_batch(&ddl)
106}
107
108pub struct SqliteSparseStore {
110 pool: Arc<ConnectionPool>,
111 is_file_backed: bool,
112 table_name: String,
113 namespace: String,
114}
115
116impl SqliteSparseStore {
117 pub fn new(
119 pool: Arc<ConnectionPool>,
120 is_file_backed: bool,
121 model_key: String,
122 namespace: String,
123 ) -> Result<Self, SqliteError> {
124 let table_name = format!("sparse_{}", model_key);
125 Ok(Self {
126 pool,
127 is_file_backed,
128 table_name,
129 namespace,
130 })
131 }
132
133 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
134 where
135 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
136 R: Send + 'static,
137 {
138 let pool = Arc::clone(&self.pool);
139 tokio::task::spawn_blocking(move || {
140 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
141 f(guard.conn()).map_err(|e| map_err(e, op))
142 })
143 .await
144 .map_err(|e| StorageError::driver(StorageCapability::Sparse, op, e))?
145 }
146
147 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
148 where
149 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
150 R: Send + 'static,
151 {
152 if self.is_file_backed {
153 let config = self.pool.config();
155 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
156 operation: "sparse_reader".into(),
157 message: "in-memory databases do not support standalone connections".into(),
158 })?;
159 let conn = rusqlite::Connection::open_with_flags(
160 path,
161 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
162 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
163 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
164 )
165 .map_err(|e| map_err(e, op))?;
166 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
167 .await
168 .map_err(|e| StorageError::driver(StorageCapability::Sparse, op, e))?
169 } else {
170 let pool = Arc::clone(&self.pool);
171 tokio::task::spawn_blocking(move || {
172 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
173 f(guard.conn()).map_err(|e| map_err(e, op))
174 })
175 .await
176 .map_err(|e| StorageError::driver(StorageCapability::Sparse, op, e))?
177 }
178 }
179
180 async fn upsert_sparse_vector(
181 &self,
182 subject_id: Uuid,
183 kind: SubstrateKind,
184 namespace: &str,
185 field: &str,
186 vector: SparseVector,
187 ) -> Result<(), StorageError> {
188 let table = self.table_name.clone();
189 let ns = namespace.to_string();
190 let field = field.to_string();
191 let id_str = subject_id.to_string();
192 let kind_str = kind.to_string();
193
194 self.with_writer("sparse_upsert", move |conn| {
195 let indices_json = serde_json::to_string(&vector.indices).map_err(|e| {
196 rusqlite::Error::FromSqlConversionFailure(
197 0,
198 rusqlite::types::Type::Text,
199 Box::new(e),
200 )
201 })?;
202 let values_blob = f32_slice_as_bytes(&vector.values);
203 let now = chrono::Utc::now().timestamp();
204 let sql = format!(
205 "INSERT INTO {table} \
206 (subject_id, namespace, kind, field, indices_json, values_blob, updated_at) \
207 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
208 ON CONFLICT(subject_id, namespace, field) DO UPDATE SET \
209 kind = excluded.kind, \
210 indices_json = excluded.indices_json, \
211 values_blob = excluded.values_blob, \
212 updated_at = excluded.updated_at"
213 );
214 conn.execute(
215 &sql,
216 rusqlite::params![
217 &id_str,
218 &ns,
219 &kind_str,
220 &field,
221 &indices_json,
222 values_blob,
223 now
224 ],
225 )?;
226 Ok(())
227 })
228 .await
229 }
230
231 async fn insert_sparse_batch(
232 &self,
233 records: Vec<SparseRecord>,
234 ) -> Result<BatchWriteSummary, StorageError> {
235 let table = self.table_name.clone();
236 let attempted = records.len() as u64;
237
238 self.with_writer("sparse_insert_batch", move |conn| {
239 let sql = format!(
240 "INSERT INTO {table} \
241 (subject_id, namespace, kind, field, indices_json, values_blob, updated_at) \
242 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
243 ON CONFLICT(subject_id, namespace, field) DO UPDATE SET \
244 indices_json = excluded.indices_json, \
245 values_blob = excluded.values_blob, \
246 updated_at = excluded.updated_at"
247 );
248
249 conn.execute_batch("BEGIN IMMEDIATE")?;
250 let mut affected = 0u64;
251 let mut failed = 0u64;
252 let mut first_error = String::new();
253
254 for record in &records {
255 if record.vector.indices.len() != record.vector.values.len()
257 || record.vector.indices.is_empty()
258 || record.vector.values.iter().any(|v| !v.is_finite())
259 || record.vector.indices.windows(2).any(|w| w[0] >= w[1])
260 {
261 if first_error.is_empty() {
262 first_error =
263 format!("invalid sparse vector for subject {}", record.subject_id);
264 }
265 failed += 1;
266 continue;
267 }
268
269 let indices_json = match serde_json::to_string(&record.vector.indices) {
270 Ok(j) => j,
271 Err(e) => {
272 if first_error.is_empty() {
273 first_error = e.to_string();
274 }
275 failed += 1;
276 continue;
277 }
278 };
279 let values_blob = f32_slice_as_bytes(&record.vector.values);
280 let now = record.updated_at.timestamp();
281 let id_str = record.subject_id.to_string();
282 let kind_str = record.kind.to_string();
283
284 match conn.execute(
285 &sql,
286 rusqlite::params![
287 &id_str,
288 &record.namespace,
289 &kind_str,
290 &record.field,
291 &indices_json,
292 values_blob,
293 now
294 ],
295 ) {
296 Ok(_) => affected += 1,
297 Err(e) => {
298 if first_error.is_empty() {
299 first_error = e.to_string();
300 }
301 failed += 1;
302 }
303 }
304 }
305
306 conn.execute_batch("COMMIT")?;
307 Ok(BatchWriteSummary {
308 attempted,
309 affected,
310 failed,
311 first_error,
312 })
313 })
314 .await
315 }
316
317 async fn delete_sparse_subject(&self, subject_id: Uuid) -> Result<bool, StorageError> {
318 let table = self.table_name.clone();
319 let namespace = self.namespace.clone();
320 let id_str = subject_id.to_string();
321
322 self.with_writer("sparse_delete", move |conn| {
323 let sql = format!("DELETE FROM {table} WHERE subject_id = ?1 AND namespace = ?2");
324 let deleted = conn.execute(&sql, rusqlite::params![&id_str, &namespace])?;
325 Ok(deleted > 0)
326 })
327 .await
328 }
329
330 async fn search_sparse_vectors(
331 &self,
332 request: SparseSearchRequest,
333 ) -> Result<Vec<SparseSearchHit>, StorageError> {
334 let table = self.table_name.clone();
335 let ns = request
336 .namespace
337 .clone()
338 .unwrap_or_else(|| self.namespace.clone());
339 let kind_filter = request.kind.map(|k| k.to_string());
340 let query = request.query;
341 let top_k = request.top_k as usize;
342
343 self.with_reader("sparse_search", move |conn| {
344 let (sql, kind_str_ref) = if let Some(ref kind_str) = kind_filter {
346 (
347 format!(
348 "SELECT subject_id, indices_json, values_blob \
349 FROM {table} WHERE namespace = ?1 AND kind = ?2"
350 ),
351 Some(kind_str.as_str()),
352 )
353 } else {
354 (
355 format!(
356 "SELECT subject_id, indices_json, values_blob \
357 FROM {table} WHERE namespace = ?1"
358 ),
359 None,
360 )
361 };
362
363 let mut stmt = conn.prepare(&sql)?;
364
365 let rows: Vec<rusqlite::Result<(String, String, Vec<u8>)>> =
367 if let Some(kind_str) = kind_str_ref {
368 stmt.query_map(rusqlite::params![&ns, kind_str], |row| {
369 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
370 })?
371 .collect()
372 } else {
373 stmt.query_map(rusqlite::params![&ns], |row| {
374 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
375 })?
376 .collect()
377 };
378
379 let mut heap: BinaryHeap<Reverse<ScoredCandidate>> =
381 BinaryHeap::with_capacity(top_k + 1);
382
383 for row_result in rows {
384 let (id_str, indices_json, values_blob) = row_result?;
385
386 let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
387 rusqlite::Error::FromSqlConversionFailure(
388 0,
389 rusqlite::types::Type::Text,
390 Box::new(e),
391 )
392 })?;
393
394 let stored_indices: Vec<u32> =
396 serde_json::from_str(&indices_json).map_err(|e| {
397 rusqlite::Error::FromSqlConversionFailure(
398 0,
399 rusqlite::types::Type::Text,
400 Box::<dyn std::error::Error + Send + Sync>::from(format!(
401 "corrupt sparse row {id_str}: invalid indices JSON: {e}"
402 )),
403 )
404 })?;
405
406 if values_blob.len() % 4 != 0 {
407 return Err(rusqlite::Error::FromSqlConversionFailure(
408 0,
409 rusqlite::types::Type::Blob,
410 Box::<dyn std::error::Error + Send + Sync>::from(format!(
411 "corrupt sparse row {id_str}: values blob length {} not a multiple of 4",
412 values_blob.len()
413 )),
414 ));
415 }
416
417 let stored_values: Vec<f32> = values_blob
418 .chunks_exact(4)
419 .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
420 .collect();
421
422 validate_persisted_sparse(&id_str, &stored_indices, &stored_values)?;
423
424 let score = sparse_dot_product(
425 &query.indices,
426 &query.values,
427 &stored_indices,
428 &stored_values,
429 );
430
431 heap.push(Reverse(ScoredCandidate { score, subject_id }));
432 if heap.len() > top_k {
433 heap.pop();
434 }
435 }
436
437 let mut top: Vec<_> = heap.into_iter().map(|Reverse(c)| c).collect();
439 top.sort_by(|a, b| {
440 b.score
441 .partial_cmp(&a.score)
442 .unwrap_or(std::cmp::Ordering::Equal)
443 .then_with(|| a.subject_id.cmp(&b.subject_id))
444 });
445
446 let hits = top
447 .into_iter()
448 .enumerate()
449 .map(|(i, c)| SparseSearchHit {
450 subject_id: c.subject_id,
451 score: DeterministicScore::from_f64(c.score),
452 rank: (i + 1) as u32,
453 })
454 .collect();
455
456 Ok(hits)
457 })
458 .await
459 }
460
461 async fn count_sparse_rows(&self) -> Result<u64, StorageError> {
462 let table = self.table_name.clone();
463 let namespace = self.namespace.clone();
464 self.with_reader("sparse_count", move |conn| {
465 let sql = format!("SELECT COUNT(*) FROM {table} WHERE namespace = ?1");
466 let count: i64 =
467 conn.query_row(&sql, rusqlite::params![&namespace], |row| row.get(0))?;
468 Ok(count as u64)
469 })
470 .await
471 }
472}
473
474#[derive(PartialEq)]
477struct ScoredCandidate {
478 score: f64,
479 subject_id: Uuid,
480}
481
482impl Eq for ScoredCandidate {}
483
484impl PartialOrd for ScoredCandidate {
485 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
486 Some(self.cmp(other))
487 }
488}
489
490impl Ord for ScoredCandidate {
491 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
492 match self
495 .score
496 .partial_cmp(&other.score)
497 .unwrap_or(std::cmp::Ordering::Equal)
498 {
499 std::cmp::Ordering::Equal => other.subject_id.cmp(&self.subject_id),
500 ord => ord,
501 }
502 }
503}
504
505fn validate_persisted_sparse(
509 subject_id: &str,
510 indices: &[u32],
511 values: &[f32],
512) -> Result<(), rusqlite::Error> {
513 if indices.len() != values.len() {
514 return Err(rusqlite::Error::FromSqlConversionFailure(
515 0,
516 rusqlite::types::Type::Blob,
517 Box::<dyn std::error::Error + Send + Sync>::from(format!(
518 "corrupt sparse row {subject_id}: indices len {} != values len {}",
519 indices.len(),
520 values.len()
521 )),
522 ));
523 }
524 for (i, v) in values.iter().enumerate() {
525 if !v.is_finite() {
526 return Err(rusqlite::Error::FromSqlConversionFailure(
527 0,
528 rusqlite::types::Type::Blob,
529 Box::<dyn std::error::Error + Send + Sync>::from(format!(
530 "corrupt sparse row {subject_id}: non-finite value at position {i}: {v}"
531 )),
532 ));
533 }
534 }
535 for window in indices.windows(2) {
536 if window[0] >= window[1] {
537 return Err(rusqlite::Error::FromSqlConversionFailure(
538 0,
539 rusqlite::types::Type::Blob,
540 Box::<dyn std::error::Error + Send + Sync>::from(format!(
541 "corrupt sparse row {subject_id}: indices not strictly increasing at {} >= {}",
542 window[0], window[1]
543 )),
544 ));
545 }
546 }
547 Ok(())
548}
549
/// Dot product of two sparse vectors given as parallel index/value slices.
///
/// Walks both index lists in step, advancing whichever side has the smaller
/// index and multiplying values where the indices match. This relies on both
/// index slices being sorted in increasing order. Products are accumulated in
/// `f64`.
///
/// # Panics
///
/// Panics if a value slice is shorter than its index slice at a matching index.
fn sparse_dot_product(q_idx: &[u32], q_val: &[f32], s_idx: &[u32], s_val: &[f32]) -> f64 {
    use std::cmp::Ordering;

    let (mut q, mut s) = (0usize, 0usize);
    let mut acc = 0.0f64;

    while let (Some(&query_index), Some(&stored_index)) = (q_idx.get(q), s_idx.get(s)) {
        match query_index.cmp(&stored_index) {
            Ordering::Less => q += 1,
            Ordering::Greater => s += 1,
            Ordering::Equal => {
                acc += f64::from(q_val[q]) * f64::from(s_val[s]);
                q += 1;
                s += 1;
            }
        }
    }

    acc
}
568
569#[async_trait]
570impl SparseStore for SqliteSparseStore {
571 async fn insert_sparse(
572 &self,
573 subject_id: Uuid,
574 kind: SubstrateKind,
575 namespace: &str,
576 field: &str,
577 vector: SparseVector,
578 ) -> Result<(), StorageError> {
579 validate_sparse_vector(&vector, "sparse_insert")?;
580 self.upsert_sparse_vector(subject_id, kind, namespace, field, vector)
581 .await
582 }
583
584 async fn insert_batch(
585 &self,
586 records: Vec<SparseRecord>,
587 ) -> Result<BatchWriteSummary, StorageError> {
588 self.insert_sparse_batch(records).await
589 }
590
591 async fn delete(&self, subject_id: Uuid) -> Result<bool, StorageError> {
592 self.delete_sparse_subject(subject_id).await
593 }
594
595 async fn search_sparse(
596 &self,
597 request: SparseSearchRequest,
598 ) -> Result<Vec<SparseSearchHit>, StorageError> {
599 validate_sparse_vector(&request.query, "sparse_search")?;
600 self.search_sparse_vectors(request).await
601 }
602
603 async fn count(&self) -> Result<u64, StorageError> {
604 self.count_sparse_rows().await
605 }
606}
607
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::{ConnectionPool, PoolConfig};

    /// Builds an in-memory store (default namespace `ns:test`) whose table for
    /// `model_key` has already been created.
    fn make_store(model_key: &str) -> SqliteSparseStore {
        let pool_config = PoolConfig {
            path: None,
            ..PoolConfig::default()
        };
        let pool = Arc::new(ConnectionPool::new(pool_config).expect("pool"));

        // Release the writer before the store starts using the pool.
        let writer = pool.try_writer().expect("writer");
        ensure_sparse_schema(writer.conn(), model_key).expect("schema");
        drop(writer);

        SqliteSparseStore::new(
            pool,
            false,
            model_key.to_owned(),
            String::from("ns:test"),
        )
        .expect("store")
    }

    /// Shorthand for building a sparse vector.
    fn sv(indices: Vec<u32>, values: Vec<f32>) -> SparseVector {
        SparseVector { indices, values }
    }

    /// Inserts `vector` as an entity's `body` field in `namespace`.
    async fn insert_body(
        store: &SqliteSparseStore,
        id: Uuid,
        namespace: &str,
        vector: SparseVector,
    ) -> Result<(), StorageError> {
        store
            .insert_sparse(id, SubstrateKind::Entity, namespace, "body", vector)
            .await
    }

    /// Asserts that an insert was rejected as invalid input.
    fn assert_invalid_input(result: Result<(), StorageError>) {
        assert!(matches!(result, Err(StorageError::InvalidInput { .. })));
    }

    #[tokio::test]
    async fn insert_and_count() {
        let store = make_store("test_count");
        insert_body(&store, Uuid::new_v4(), "ns:test", sv(vec![0, 2], vec![1.0, 0.5]))
            .await
            .unwrap();
        assert_eq!(store.count().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn insert_and_search() {
        let store = make_store("test_search");
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        insert_body(&store, id1, "ns:test", sv(vec![0, 1], vec![1.0, 0.0]))
            .await
            .unwrap();
        insert_body(&store, id2, "ns:test", sv(vec![0, 1], vec![0.0, 1.0]))
            .await
            .unwrap();

        let request = SparseSearchRequest {
            query: sv(vec![0], vec![1.0]),
            top_k: 2,
            namespace: Some("ns:test".into()),
            kind: None,
        };
        let hits = store.search_sparse(request).await.unwrap();

        assert!(!hits.is_empty());
        assert_eq!(hits[0].subject_id, id1, "id1 should rank first");
        assert_eq!(hits[0].rank, 1);
    }

    #[tokio::test]
    async fn delete_removes_row() {
        let store = make_store("test_delete");
        let id = Uuid::new_v4();
        insert_body(&store, id, "ns:test", sv(vec![1], vec![1.0]))
            .await
            .unwrap();
        assert_eq!(store.count().await.unwrap(), 1);

        let deleted = store.delete(id).await.unwrap();
        assert!(deleted);
        assert_eq!(store.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn mismatched_lengths_rejected() {
        let store = make_store("test_mismatch");
        let lopsided = SparseVector {
            indices: vec![0, 1],
            values: vec![1.0],
        };
        let result = insert_body(&store, Uuid::new_v4(), "ns:test", lopsided).await;
        assert_invalid_input(result);
    }

    #[tokio::test]
    async fn non_finite_values_rejected() {
        let store = make_store("test_nonfinite");
        let result =
            insert_body(&store, Uuid::new_v4(), "ns:test", sv(vec![0], vec![f32::NAN])).await;
        assert_invalid_input(result);
    }

    #[tokio::test]
    async fn duplicate_indices_rejected() {
        let store = make_store("test_dup_idx");
        let result = insert_body(
            &store,
            Uuid::new_v4(),
            "ns:test",
            sv(vec![0, 0], vec![1.0, 2.0]),
        )
        .await;
        assert_invalid_input(result);
    }

    #[tokio::test]
    async fn empty_vector_rejected() {
        let store = make_store("test_empty");
        let result = insert_body(&store, Uuid::new_v4(), "ns:test", sv(vec![], vec![])).await;
        assert_invalid_input(result);
    }

    #[tokio::test]
    async fn namespace_isolation() {
        let store = make_store("test_ns_iso");
        insert_body(&store, Uuid::new_v4(), "ns:a", sv(vec![0], vec![1.0]))
            .await
            .unwrap();

        let request = SparseSearchRequest {
            query: sv(vec![0], vec![1.0]),
            top_k: 5,
            namespace: Some("ns:b".into()),
            kind: None,
        };
        let hits = store.search_sparse(request).await.unwrap();
        assert!(hits.is_empty(), "ns:b should not see ns:a data");
    }

    #[tokio::test]
    async fn insert_batch_happy_path() {
        use chrono::Utc;
        use khive_types::SubstrateKind;

        let store = make_store("test_batch");
        let body_record = |subject_id: Uuid, vector: SparseVector| SparseRecord {
            subject_id,
            kind: SubstrateKind::Entity,
            namespace: "ns:test".into(),
            field: "body".into(),
            vector,
            updated_at: Utc::now(),
        };
        let records = vec![
            body_record(Uuid::new_v4(), sv(vec![0, 3], vec![0.5, 0.8])),
            body_record(Uuid::new_v4(), sv(vec![1], vec![1.0])),
        ];

        let summary = store.insert_batch(records).await.unwrap();
        assert_eq!(summary.attempted, 2);
        assert_eq!(summary.affected, 2);
        assert_eq!(summary.failed, 0);
        assert_eq!(store.count().await.unwrap(), 2);
    }
}