1use crate::{
13 embeddings::{EmbeddableContent, EmbeddingStrategy},
14 rdf_integration::{RdfVectorConfig, RdfVectorIntegration},
15 sparql_integration::SparqlVectorService,
16 Vector, VectorId, VectorStore, VectorStoreTrait,
17};
18use anyhow::{anyhow, Result};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use std::collections::{HashMap, HashSet, VecDeque};
22use std::sync::{
23 atomic::{AtomicU64, Ordering},
24 Arc,
25};
26use std::time::{Duration, Instant, SystemTime};
27
/// Top-level configuration for `IntegratedVectorStore` and the managers it creates.
///
/// The values quoted as defaults below come from the `Default` impl in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreIntegrationConfig {
    /// Real-time synchronization flag (default `true`); not read in the code visible here.
    pub real_time_sync: bool,
    /// Chunk size `batch_insert` uses when splitting items into `BatchInsert`
    /// operations (default 1000).
    pub batch_size: usize,
    /// Timeout copied into every `Transaction` started by the transaction manager
    /// (default 30 seconds).
    pub transaction_timeout: Duration,
    /// Incremental-update flag (default `true`); not read in the code visible here.
    pub incremental_updates: bool,
    /// Consistency level `consistent_search` falls back to when the caller passes
    /// `None` (default `Session`).
    pub consistency_level: ConsistencyLevel,
    /// Conflict-resolution strategy (default `LastWriteWins`).
    pub conflict_resolution: ConflictResolution,
    /// Multi-tenancy flag (default `false`); not read in the code visible here.
    pub multi_tenant: bool,
    /// Cache settings handed to `CacheManager::new`; `enable_vector_cache` is also
    /// forwarded to the vector store as `cache_embeddings`.
    pub cache_config: StoreCacheConfig,
    /// Streaming settings handed to `StreamingEngine::new`.
    pub streaming_config: StreamingConfig,
    /// Replication settings handed to `ReplicationManager::new`.
    pub replication_config: ReplicationConfig,
}
52
/// Consistency level applied to reads by `consistent_search`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// No consistency step is performed before searching.
    Eventual,
    /// Runs the session-consistency hook before searching (the hook is currently a no-op).
    Session,
    /// Waits until the active-transaction counter reaches zero before searching.
    Strong,
    /// Runs the causal-consistency hook before searching (the hook is currently a no-op).
    Causal,
}
65
/// Strategy selector for resolving conflicting writes.
///
/// The resolution logic itself is not part of the code visible here, so the
/// variant notes below describe intent as suggested by the names only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// The most recent write is kept (the configuration default).
    LastWriteWins,
    /// The earliest write is kept.
    FirstWriteWins,
    /// Conflicting values are merged.
    Merge,
    /// Custom strategy identified by a string.
    // NOTE(review): presumably a key into `ConflictResolver::custom_resolvers` — confirm.
    Custom(String),
    /// Conflicts are left for manual resolution.
    Manual,
}
80
/// Cache settings for `CacheManager`.
///
/// Defaults quoted below come from `StoreIntegrationConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreCacheConfig {
    /// Enables the vector cache (default `true`); also forwarded to the vector
    /// store as `cache_embeddings` by `IntegratedVectorStore::new`.
    pub enable_vector_cache: bool,
    /// Enables the query-result cache (default `true`).
    pub enable_query_cache: bool,
    /// Cache size budget in megabytes (default 512).
    pub cache_size_mb: usize,
    /// Time-to-live for cache entries (default 3600 seconds).
    pub cache_ttl: Duration,
    /// Enables compression of cached data (default `true`).
    pub enable_compression: bool,
}
95
/// Settings for `StreamingEngine`.
///
/// Defaults quoted below come from `StoreIntegrationConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Enables streaming ingestion (default `true`).
    pub enable_streaming: bool,
    /// Stream buffer size (default 10000).
    // NOTE(review): unit is presumably "operations" — confirm against the engine.
    pub buffer_size: usize,
    /// Interval between buffer flushes (default 100 ms).
    pub flush_interval: Duration,
    /// Enables backpressure handling (default `true`).
    pub enable_backpressure: bool,
    /// Maximum tolerated processing lag (default 5 seconds).
    pub max_lag: Duration,
}
110
/// Settings for `ReplicationManager`.
///
/// Defaults quoted below come from `StoreIntegrationConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Enables replication (default `false`); `health_check` only inspects the
    /// replication lag when this is set.
    pub enable_replication: bool,
    /// Target number of replicas (default 3).
    pub replication_factor: usize,
    /// Whether replication is synchronous (default `false`).
    pub synchronous: bool,
    /// Endpoints of the replicas (default: empty).
    pub replica_endpoints: Vec<String>,
}
123
/// Vector store bundled with RDF/SPARQL integration, transactions, streaming,
/// caching, replication, consistency handling, a change log and metrics.
///
/// Every component is created in `IntegratedVectorStore::new`.
pub struct IntegratedVectorStore {
    /// Configuration the store was built with.
    config: StoreIntegrationConfig,
    /// Underlying vector store; also shared with the RDF integration through
    /// `VectorStoreWrapper`.
    vector_store: Arc<RwLock<VectorStore>>,
    /// RDF term similarity integration used by `rdf_vector_search`.
    rdf_integration: Arc<RwLock<RdfVectorIntegration>>,
    /// SPARQL vector service; `sparql_vector_search` currently only takes its read lock.
    sparql_service: Arc<RwLock<SparqlVectorService>>,
    /// Owns active transactions, the write-ahead log and the lock manager.
    transaction_manager: Arc<TransactionManager>,
    /// Receives operations submitted through `stream_insert`.
    streaming_engine: Arc<StreamingEngine>,
    /// Vector and query-result caches.
    cache_manager: Arc<CacheManager>,
    /// Replication component built from `config.replication_config`.
    replication_manager: Arc<ReplicationManager>,
    /// Consistency component built from `config.consistency_level`.
    consistency_manager: Arc<ConsistencyManager>,
    /// Change log (created with a capacity argument of 10000) and its subscribers.
    change_log: Arc<ChangeLog>,
    /// Operation and transaction counters.
    metrics: Arc<StoreMetrics>,
}
138
/// Tracks active transactions and coordinates the write-ahead log and lock manager.
pub struct TransactionManager {
    /// Transactions that have begun and have not yet been committed or aborted.
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    /// Source of transaction ids; starts at 1 and is incremented per transaction.
    transaction_counter: AtomicU64,
    /// Store configuration; `transaction_timeout` is copied into each new transaction.
    config: StoreIntegrationConfig,
    /// Log that receives every operation plus commit/abort records.
    write_ahead_log: Arc<WriteAheadLog>,
    /// Lock manager used to take exclusive locks for each operation.
    lock_manager: Arc<LockManager>,
}
147
/// State of a single transaction held by `TransactionManager`.
#[derive(Debug, Clone)]
pub struct Transaction {
    /// Identifier allocated by the transaction manager.
    pub id: TransactionId,
    /// Wall-clock time at which the transaction began.
    pub start_time: SystemTime,
    /// Maximum age; `add_operation` aborts the transaction once
    /// `start_time.elapsed()` exceeds this.
    pub timeout: Duration,
    /// Operations queued so far, in the order they were added.
    pub operations: Vec<TransactionOperation>,
    /// Current lifecycle state; only `Active` transactions accept operations or commit.
    pub status: TransactionStatus,
    /// Isolation level requested when the transaction began.
    pub isolation_level: IsolationLevel,
    /// Keys read by the transaction; created empty and not updated in the code visible here.
    pub read_set: HashSet<String>,
    /// Keys written by the transaction; created empty and not updated in the code visible here.
    pub write_set: HashSet<String>,
}
160
/// Transaction identifier; allocated from `TransactionManager`'s counter, which starts at 1.
pub type TransactionId = u64;
162
/// Lifecycle state of a `Transaction`.
///
/// Only `Active` and `Aborted` are assigned in the code visible here.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TransactionStatus {
    /// Accepting operations; the state every transaction starts in.
    Active,
    /// Successfully committed.
    Committed,
    /// Aborted; set by `add_operation` when the transaction has timed out.
    Aborted,
    /// Prepare phase in progress.
    // NOTE(review): looks like two-phase-commit states — confirm where they are used.
    Preparing,
    /// Prepare phase finished.
    Prepared,
}
172
/// Isolation level recorded on a `Transaction`.
///
/// The value is stored when the transaction begins; no code visible here
/// branches on it. `batch_insert` uses `ReadCommitted`.
#[derive(Debug, Clone, Copy)]
pub enum IsolationLevel {
    /// Read-uncommitted isolation.
    ReadUncommitted,
    /// Read-committed isolation.
    ReadCommitted,
    /// Repeatable-read isolation.
    RepeatableRead,
    /// Serializable isolation.
    Serializable,
}
181
/// Operation queued inside a transaction.
///
/// `TransactionManager` takes an exclusive lock on each affected URI before
/// queuing the operation; `IndexRebuild` locks the `"_global_"` resource instead.
#[derive(Debug, Clone)]
pub enum TransactionOperation {
    /// Insert a vector under `uri`.
    Insert {
        /// Resource the vector belongs to.
        uri: String,
        /// Vector to insert.
        vector: Vector,
        /// Optional content associated with the vector; not written to the
        /// write-ahead log.
        embedding_content: Option<EmbeddableContent>,
    },
    /// Replace the vector stored under `uri`.
    Update {
        /// Resource being updated.
        uri: String,
        /// New vector.
        vector: Vector,
        /// Previous vector, when known.
        old_vector: Option<Vector>,
    },
    /// Delete the vector stored under `uri`.
    Delete {
        /// Resource being deleted.
        uri: String,
        /// Vector being removed, when known; not written to the write-ahead log.
        vector: Option<Vector>,
    },
    /// Insert several `(uri, vector)` pairs as one operation.
    BatchInsert {
        /// Pairs to insert.
        items: Vec<(String, Vector)>,
    },
    /// Rebuild the index.
    IndexRebuild {
        /// Name of the index algorithm.
        algorithm: String,
        /// Algorithm parameters as string key/value pairs.
        parameters: HashMap<String, String>,
    },
}
207
/// In-memory write-ahead log of transaction operations.
pub struct WriteAheadLog {
    /// Entries in append order.
    log_entries: Arc<RwLock<VecDeque<LogEntry>>>,
    /// Optional backing file path; `new()` sets it to `None` and nothing visible
    /// here writes to it.
    log_file: Option<String>,
    /// Minimum time between checkpoints (60 seconds in `new()`).
    checkpoint_interval: Duration,
    /// Wall-clock time of the most recent checkpoint.
    last_checkpoint: Arc<RwLock<SystemTime>>,
}
215
/// One record in the write-ahead log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Log sequence number, taken from a process-wide counter.
    pub lsn: u64,
    /// Transaction the record belongs to.
    pub transaction_id: TransactionId,
    /// Logged operation.
    pub operation: SerializableOperation,
    /// Wall-clock time at which the record was appended.
    pub timestamp: SystemTime,
    /// Checksum field; `WriteAheadLog::append` currently always stores 0.
    pub checksum: u64,
}
225
/// Serde-friendly form of an operation, used in log and replication entries.
///
/// Vectors are stored as plain `f32` values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableOperation {
    /// Insert of a vector under `uri`.
    Insert {
        /// Resource the vector belongs to.
        uri: String,
        /// Vector components.
        vector_data: Vec<f32>,
    },
    /// Replacement of the vector under `uri`.
    Update {
        /// Resource being updated.
        uri: String,
        /// New vector components.
        new_vector: Vec<f32>,
        /// Previous vector components, when known.
        old_vector: Option<Vec<f32>>,
    },
    /// Deletion of the vector under `uri`.
    Delete {
        /// Resource being deleted.
        uri: String,
    },
    /// Commit marker for a transaction.
    Commit {
        /// Transaction being committed.
        transaction_id: TransactionId,
    },
    /// Abort marker for a transaction.
    Abort {
        /// Transaction being aborted.
        transaction_id: TransactionId,
    },
}
248
/// Per-resource lock table used by `TransactionManager`.
pub struct LockManager {
    /// Lock state keyed by resource name (a URI, or `"_global_"` for index rebuilds).
    locks: Arc<RwLock<HashMap<String, LockInfo>>>,
    /// Consulted whenever a lock request cannot be granted.
    deadlock_detector: Arc<DeadlockDetector>,
}
254
/// Lock state of a single resource.
#[derive(Debug, Clone)]
pub struct LockInfo {
    /// Mode of the most recently granted lock.
    pub lock_type: LockType,
    /// Transactions currently holding the lock.
    pub holders: HashSet<TransactionId>,
    /// Requests that could not be granted, in arrival order.
    pub waiters: VecDeque<(TransactionId, LockType)>,
    /// Wall-clock time of the most recent grant.
    pub granted_time: SystemTime,
}
263
/// Lock mode.
///
/// `LockManager::can_grant_lock` treats only `Shared` + `Shared` as compatible;
/// every other combination conflicts while the lock has holders.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LockType {
    /// Shared (read) lock.
    Shared,
    /// Exclusive (write) lock; the only mode `TransactionManager` requests.
    Exclusive,
    /// Intention-shared mode; not requested in the code visible here.
    IntentionShared,
    /// Intention-exclusive mode; not requested in the code visible here.
    IntentionExclusive,
    /// Shared + intention-exclusive mode; not requested in the code visible here.
    SharedIntentionExclusive,
}
273
/// Deadlock-detection state; `check_deadlock` is currently a stub that always succeeds.
pub struct DeadlockDetector {
    /// Wait-for edges between transactions; created empty and not modified in
    /// the code visible here.
    wait_for_graph: Arc<RwLock<HashMap<TransactionId, HashSet<TransactionId>>>>,
    /// Detection interval (one second in `new()`); not read in the code visible here.
    detection_interval: Duration,
}
279
/// Streaming ingestion engine; receives operations via `submit_operation`.
///
/// Most of its implementation lies outside the code visible here.
pub struct StreamingEngine {
    /// Streaming settings.
    config: StreamingConfig,
    /// Buffered operations; created empty.
    stream_buffer: Arc<RwLock<VecDeque<StreamingOperation>>>,
    /// Handle of the processing thread, if any; `None` on construction.
    processor_thread: Option<std::thread::JoinHandle<()>>,
    /// Backpressure state.
    backpressure_controller: Arc<BackpressureController>,
    /// Streaming counters; `operations_pending` is inspected by `health_check`.
    stream_metrics: Arc<StreamingMetrics>,
}
288
/// Operation submitted to the `StreamingEngine`.
#[derive(Debug, Clone)]
pub enum StreamingOperation {
    /// Insert a vector under `uri`; produced by `stream_insert`.
    VectorInsert {
        /// Resource the vector belongs to.
        uri: String,
        /// Vector to insert.
        vector: Vector,
        /// Scheduling priority.
        priority: Priority,
    },
    /// Replace the vector stored under `uri`.
    VectorUpdate {
        /// Resource being updated.
        uri: String,
        /// New vector.
        vector: Vector,
        /// Scheduling priority.
        priority: Priority,
    },
    /// Delete the vector stored under `uri`.
    VectorDelete {
        /// Resource being deleted.
        uri: String,
        /// Scheduling priority.
        priority: Priority,
    },
    /// Request an embedding for `content`, to be associated with `uri`.
    EmbeddingRequest {
        /// Content to embed.
        content: EmbeddableContent,
        /// Resource the embedding belongs to.
        uri: String,
        /// Scheduling priority.
        priority: Priority,
    },
    /// Several operations submitted together.
    BatchOperation {
        /// Nested operations.
        operations: Vec<StreamingOperation>,
    },
}
315
/// Priority of a streaming operation.
///
/// The derived ordering follows the declaration order, so
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Lowest priority (discriminant 0).
    Low = 0,
    /// Normal priority (discriminant 1).
    Normal = 1,
    /// High priority (discriminant 2).
    High = 2,
    /// Highest priority (discriminant 3).
    Critical = 3,
}
324
/// Backpressure state for the streaming engine.
///
/// No code visible here reads these fields; the descriptions follow the field names.
pub struct BackpressureController {
    /// Current load measurement.
    // NOTE(review): scale is presumably 0.0..=1.0 — confirm against the implementation.
    current_load: Arc<RwLock<f64>>,
    /// Load threshold above which backpressure applies.
    max_load_threshold: f64,
    /// Whether adaptive batching is enabled.
    adaptive_batching: bool,
    /// Whether load shedding is enabled.
    load_shedding: bool,
}
332
/// Counters and gauges for the streaming engine.
#[derive(Debug, Default)]
pub struct StreamingMetrics {
    /// Number of operations processed.
    pub operations_processed: AtomicU64,
    /// Number of operations waiting; `health_check` reports an issue above 10000.
    pub operations_pending: AtomicU64,
    /// Number of operations dropped.
    pub operations_dropped: AtomicU64,
    /// Average processing latency in milliseconds.
    pub average_latency_ms: Arc<RwLock<f64>>,
    /// Throughput in operations per second.
    pub throughput_ops_sec: Arc<RwLock<f64>>,
    /// Number of backpressure events.
    pub backpressure_events: AtomicU64,
}
343
/// Vector and query-result caches used by `IntegratedVectorStore`.
pub struct CacheManager {
    /// Cached vectors keyed by string.
    // NOTE(review): callers pass the resource URI as the key — confirm no other keys are used.
    vector_cache: Arc<RwLock<HashMap<String, CachedVector>>>,
    /// Cached query results keyed by string.
    query_cache: Arc<RwLock<HashMap<String, CachedQueryResult>>>,
    /// Cache settings.
    config: StoreCacheConfig,
    /// Hit/miss/eviction counters.
    cache_stats: Arc<CacheStats>,
    /// Eviction policy selector.
    eviction_policy: EvictionPolicy,
}
352
/// Entry of the vector cache.
#[derive(Debug, Clone)]
pub struct CachedVector {
    /// Cached vector; `transactional_insert` compares it with the incoming vector.
    pub vector: Vector,
    /// Wall-clock time of the last access.
    pub last_accessed: SystemTime,
    /// Number of accesses.
    pub access_count: u64,
    /// Compression ratio recorded for the entry.
    pub compression_ratio: f32,
    /// Storage tier the entry is kept in.
    pub cache_level: CacheLevel,
}
362
/// Storage tier of a cached vector.
#[derive(Debug, Clone, Copy)]
pub enum CacheLevel {
    /// Held in memory.
    Memory,
    /// Held on SSD.
    SSD,
    /// Held on disk.
    Disk,
}
370
/// Entry of the query-result cache.
#[derive(Debug, Clone)]
pub struct CachedQueryResult {
    /// Cached `(identifier, score)` pairs; returned as-is by `consistent_search` on a hit.
    pub results: Vec<(String, f32)>,
    /// Hash of the query that produced the results.
    pub query_hash: u64,
    /// Wall-clock time of the last access.
    pub last_accessed: SystemTime,
    /// Time-to-live of the entry.
    pub ttl: Duration,
    /// Number of cache hits served from this entry.
    pub hit_count: u64,
}
380
/// Cache counters.
#[derive(Debug, Default)]
pub struct CacheStats {
    /// Vector-cache hits; used by `health_check` to compute the hit ratio.
    pub vector_cache_hits: AtomicU64,
    /// Vector-cache misses; used by `health_check` to compute the hit ratio.
    pub vector_cache_misses: AtomicU64,
    /// Query-cache hits.
    pub query_cache_hits: AtomicU64,
    /// Query-cache misses.
    pub query_cache_misses: AtomicU64,
    /// Number of evictions.
    pub evictions: AtomicU64,
    /// Memory used by the caches, in bytes.
    pub memory_usage_bytes: AtomicU64,
}
391
/// Cache eviction policy selector.
///
/// The eviction logic is not part of the code visible here; variants are
/// described by their conventional names only.
#[derive(Debug, Clone)]
pub enum EvictionPolicy {
    /// Least recently used.
    LRU,
    /// Least frequently used.
    LFU,
    /// Adaptive replacement cache.
    ARC,
    /// Time-to-live based eviction.
    TTL,
    /// Custom policy identified by a string.
    Custom(String),
}
401
/// Replication component of the integrated store.
///
/// Its implementation lies outside the code visible here.
pub struct ReplicationManager {
    /// Replication settings.
    config: ReplicationConfig,
    /// Known replicas.
    replicas: Arc<RwLock<Vec<ReplicaInfo>>>,
    /// Queue of replication entries.
    replication_log: Arc<RwLock<VecDeque<ReplicationEntry>>>,
    /// Consensus algorithm selector.
    consensus_algorithm: ConsensusAlgorithm,
    /// Replica health-check settings.
    health_checker: Arc<HealthChecker>,
}
410
/// Description of one replica.
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Endpoint of the replica.
    pub endpoint: String,
    /// Current status.
    pub status: ReplicaStatus,
    /// Wall-clock time of the last synchronization.
    pub last_sync: SystemTime,
    /// Replication lag of the replica.
    pub lag: Duration,
    /// Priority of the replica.
    // NOTE(review): whether higher or lower values win is not visible here — confirm.
    pub priority: u8,
}
420
/// Status of a replica.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ReplicaStatus {
    /// Replica is active.
    Active,
    /// Replica is inactive.
    Inactive,
    /// Replica is synchronizing.
    Synchronizing,
    /// Replica has failed.
    Failed,
    /// Replica is under maintenance.
    Maintenance,
}
430
/// One entry of the replication log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationEntry {
    /// Sequence number of the entry.
    pub sequence_number: u64,
    /// Operation to replicate.
    pub operation: SerializableOperation,
    /// Wall-clock time recorded for the entry.
    pub timestamp: SystemTime,
    /// Identifier of the node the entry originated from.
    pub source_node: String,
}
439
/// Consensus algorithm selector for replication.
///
/// No consensus logic is part of the code visible here.
#[derive(Debug, Clone)]
pub enum ConsensusAlgorithm {
    /// Raft.
    Raft,
    /// Practical Byzantine Fault Tolerance.
    PBFT,
    /// Simple majority.
    SimpleMajority,
}
447
/// Settings for replica health checks; no code visible here reads them.
pub struct HealthChecker {
    /// Interval between checks.
    check_interval: Duration,
    /// Timeout of a single check.
    timeout: Duration,
    /// Failure threshold.
    // NOTE(review): presumably consecutive failures before a replica is marked failed — confirm.
    failure_threshold: u32,
}
454
/// Consistency component of the integrated store; built from the configured
/// `ConsistencyLevel`. Its implementation lies outside the code visible here.
pub struct ConsistencyManager {
    /// Consistency level the manager was created with.
    consistency_level: ConsistencyLevel,
    /// Vector clocks keyed by string.
    vector_clocks: Arc<RwLock<HashMap<String, VectorClock>>>,
    /// Conflict resolver.
    conflict_resolver: Arc<ConflictResolver>,
    /// Causal-order tracker.
    causal_order_tracker: Arc<CausalOrderTracker>,
}
462
/// Vector clock: one logical counter per string key; empty by default.
#[derive(Debug, Clone, Default)]
pub struct VectorClock {
    /// Logical counter per key.
    // NOTE(review): keys are presumably node identifiers — confirm.
    pub clocks: HashMap<String, u64>,
}
468
/// Holds the conflict-resolution strategy and any registered custom resolvers.
pub struct ConflictResolver {
    /// Configured strategy.
    strategy: ConflictResolution,
    /// Custom resolvers keyed by name.
    custom_resolvers: HashMap<String, Box<dyn ConflictResolverTrait>>,
}
474
/// Pluggable conflict resolver; implementors must be `Send + Sync`.
pub trait ConflictResolverTrait: Send + Sync {
    /// Produces the vector to keep given the `local` and `remote` candidates
    /// and the `metadata` describing the conflict.
    ///
    /// # Errors
    ///
    /// Implementations return an error when the conflict cannot be resolved.
    fn resolve_conflict(
        &self,
        local: &Vector,
        remote: &Vector,
        metadata: &ConflictMetadata,
    ) -> Result<Vector>;
}
483
/// Context passed to `ConflictResolverTrait::resolve_conflict`.
#[derive(Debug, Clone)]
pub struct ConflictMetadata {
    /// Timestamp of the local value.
    pub local_timestamp: SystemTime,
    /// Timestamp of the remote value.
    pub remote_timestamp: SystemTime,
    /// Version of the local value.
    pub local_version: u64,
    /// Version of the remote value.
    pub remote_version: u64,
    /// Kind of operation that produced the conflict, as a free-form string.
    pub operation_type: String,
}
493
/// Tracks happens-before relations between string-identified events.
pub struct CausalOrderTracker {
    /// Relation map from a key to a set of related keys.
    // NOTE(review): direction (key precedes set, or set precedes key) is not visible here — confirm.
    happens_before: Arc<RwLock<HashMap<String, HashSet<String>>>>,
}
498
/// Log of store changes plus the subscribers interested in them.
pub struct ChangeLog {
    /// Recorded entries.
    entries: Arc<RwLock<VecDeque<ChangeLogEntry>>>,
    /// Capacity limit for `entries`.
    // NOTE(review): presumably the oldest entries are dropped beyond this — confirm in the impl.
    max_entries: usize,
    /// Registered subscribers.
    subscribers: Arc<RwLock<Vec<Arc<dyn ChangeSubscriber>>>>,
}
505
/// One recorded change.
#[derive(Debug, Clone)]
pub struct ChangeLogEntry {
    /// Identifier of the entry; `transactional_insert` takes it from a
    /// process-wide counter.
    pub id: u64,
    /// Wall-clock time at which the entry was created.
    pub timestamp: SystemTime,
    /// What changed.
    pub operation: ChangeOperation,
    /// Free-form string metadata.
    pub metadata: HashMap<String, String>,
    /// Transaction that produced the change, if any.
    pub transaction_id: Option<TransactionId>,
}
515
/// Kind of change recorded in a `ChangeLogEntry`.
#[derive(Debug, Clone)]
pub enum ChangeOperation {
    /// A vector was inserted; recorded by `transactional_insert`.
    VectorInserted {
        /// Resource the vector belongs to.
        uri: String,
        /// Inserted vector.
        vector: Vector,
    },
    /// A vector was replaced.
    VectorUpdated {
        /// Resource that was updated.
        uri: String,
        /// Vector before the update.
        old_vector: Vector,
        /// Vector after the update.
        new_vector: Vector,
    },
    /// A vector was deleted.
    VectorDeleted {
        /// Resource that was deleted.
        uri: String,
        /// Vector that was removed.
        vector: Vector,
    },
    /// The index was rebuilt.
    IndexRebuilt {
        /// Name of the index algorithm.
        algorithm: String,
    },
    /// Configuration values changed.
    ConfigurationChanged {
        /// Changed settings as string key/value pairs.
        changes: HashMap<String, String>,
    },
}
539
/// Receiver of change notifications; registered via
/// `IntegratedVectorStore::subscribe_to_changes`. Implementors must be `Send + Sync`.
pub trait ChangeSubscriber: Send + Sync {
    /// Handles one change-log entry.
    // NOTE(review): presumably invoked by `ChangeLog` for each new entry — the
    // dispatch code is outside this view; confirm.
    fn on_change(&self, entry: &ChangeLogEntry) -> Result<()>;
    /// Returns an identifier for this subscriber.
    fn subscriber_id(&self) -> String;
    /// Returns the patterns this subscriber is interested in.
    // NOTE(review): the pattern syntax and what it is matched against are not visible here.
    fn interest_patterns(&self) -> Vec<String>;
}
546
/// Counters and gauges of the integrated store.
///
/// In the code visible here only `total_operations` and the three transaction
/// counters are updated; the remaining fields are only read or copied.
#[derive(Debug, Default)]
pub struct StoreMetrics {
    /// Number of stored vectors.
    pub total_vectors: AtomicU64,
    /// Number of operations; incremented by `transactional_insert`.
    pub total_operations: AtomicU64,
    /// Number of successful operations.
    pub successful_operations: AtomicU64,
    /// Number of failed operations.
    pub failed_operations: AtomicU64,
    /// Average operation time in milliseconds.
    pub average_operation_time_ms: Arc<RwLock<f64>>,
    /// Transactions begun and not yet committed or aborted; strong-consistency
    /// reads wait for this to reach zero.
    pub active_transactions: AtomicU64,
    /// Number of committed transactions.
    pub committed_transactions: AtomicU64,
    /// Number of aborted transactions.
    pub aborted_transactions: AtomicU64,
    /// Replication lag in milliseconds; `health_check` reports an issue above 1000.
    pub replication_lag_ms: Arc<RwLock<f64>>,
    /// Number of detected consistency violations.
    pub consistency_violations: AtomicU64,
}
561
562impl Default for StoreIntegrationConfig {
563 fn default() -> Self {
564 Self {
565 real_time_sync: true,
566 batch_size: 1000,
567 transaction_timeout: Duration::from_secs(30),
568 incremental_updates: true,
569 consistency_level: ConsistencyLevel::Session,
570 conflict_resolution: ConflictResolution::LastWriteWins,
571 multi_tenant: false,
572 cache_config: StoreCacheConfig {
573 enable_vector_cache: true,
574 enable_query_cache: true,
575 cache_size_mb: 512,
576 cache_ttl: Duration::from_secs(3600),
577 enable_compression: true,
578 },
579 streaming_config: StreamingConfig {
580 enable_streaming: true,
581 buffer_size: 10000,
582 flush_interval: Duration::from_millis(100),
583 enable_backpressure: true,
584 max_lag: Duration::from_secs(5),
585 },
586 replication_config: ReplicationConfig {
587 enable_replication: false,
588 replication_factor: 3,
589 synchronous: false,
590 replica_endpoints: Vec::new(),
591 },
592 }
593 }
594}
595
/// Adapter around the shared `VectorStore` that implements `VectorStoreTrait`,
/// so the store can be handed to `RdfVectorIntegration` as a trait object.
pub struct VectorStoreWrapper {
    /// The wrapped store, behind a `parking_lot` read/write lock.
    store: Arc<parking_lot::RwLock<VectorStore>>,
}
600
601impl VectorStoreTrait for VectorStoreWrapper {
602 fn insert_vector(&mut self, id: VectorId, vector: Vector) -> Result<()> {
603 let mut store = self.store.write();
604 store.insert_vector(id, vector)
605 }
606
607 fn add_vector(&mut self, vector: Vector) -> Result<VectorId> {
608 let mut store = self.store.write();
609 store.add_vector(vector)
610 }
611
612 fn get_vector(&self, id: &VectorId) -> Result<Option<Vector>> {
613 let store = self.store.read();
614 let result = store.get_vector(id);
615 Ok(result.cloned())
616 }
617
618 fn get_all_vector_ids(&self) -> Result<Vec<VectorId>> {
619 let store = self.store.read();
620 store.get_all_vector_ids()
621 }
622
623 fn search_similar(&self, query: &Vector, k: usize) -> Result<Vec<(VectorId, f32)>> {
624 let store = self.store.read();
625 store.search_similar(query, k)
626 }
627
628 fn remove_vector(&mut self, id: &VectorId) -> Result<bool> {
629 let mut store = self.store.write();
630 store.remove_vector(id)?;
631 Ok(true)
632 }
633
634 fn len(&self) -> usize {
635 let _store = self.store.read();
636 0 }
639}
640
641impl IntegratedVectorStore {
642 pub fn new(
643 config: StoreIntegrationConfig,
644 embedding_strategy: EmbeddingStrategy,
645 ) -> Result<Self> {
646 let vector_store = Arc::new(RwLock::new(
647 VectorStore::with_embedding_strategy(embedding_strategy.clone())?.with_config(
648 crate::VectorStoreConfig {
649 auto_embed: true,
650 cache_embeddings: config.cache_config.enable_vector_cache,
651 similarity_threshold: 0.7,
652 max_results: 1000,
653 },
654 ),
655 ));
656
657 let rdf_config = RdfVectorConfig::default();
658 let vector_store_wrapper = VectorStoreWrapper {
660 store: vector_store.clone(),
661 };
662 let vector_store_trait: Arc<std::sync::RwLock<dyn VectorStoreTrait>> =
663 Arc::new(std::sync::RwLock::new(vector_store_wrapper));
664 let rdf_integration = Arc::new(RwLock::new(RdfVectorIntegration::new(
665 rdf_config,
666 vector_store_trait,
667 )));
668
669 let sparql_config = crate::sparql_integration::VectorServiceConfig::default();
670 let sparql_service = Arc::new(RwLock::new(SparqlVectorService::new(
671 sparql_config,
672 embedding_strategy,
673 )?));
674
675 let transaction_manager = Arc::new(TransactionManager::new(config.clone()));
676 let streaming_engine = Arc::new(StreamingEngine::new(config.streaming_config.clone()));
677 let cache_manager = Arc::new(CacheManager::new(config.cache_config.clone()));
678 let replication_manager =
679 Arc::new(ReplicationManager::new(config.replication_config.clone()));
680 let consistency_manager = Arc::new(ConsistencyManager::new(config.consistency_level));
681 let change_log = Arc::new(ChangeLog::new(10000)); let metrics = Arc::new(StoreMetrics::default());
683
684 Ok(Self {
685 config,
686 vector_store,
687 rdf_integration,
688 sparql_service,
689 transaction_manager,
690 streaming_engine,
691 cache_manager,
692 replication_manager,
693 consistency_manager,
694 change_log,
695 metrics,
696 })
697 }
698
699 pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
701 let transaction_id = self
702 .transaction_manager
703 .begin_transaction(isolation_level)?;
704 self.metrics
705 .active_transactions
706 .fetch_add(1, Ordering::Relaxed);
707
708 tracing::debug!("Started transaction {}", transaction_id);
709 Ok(transaction_id)
710 }
711
712 pub fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
714 self.transaction_manager
715 .commit_transaction(transaction_id)?;
716 self.metrics
717 .active_transactions
718 .fetch_sub(1, Ordering::Relaxed);
719 self.metrics
720 .committed_transactions
721 .fetch_add(1, Ordering::Relaxed);
722
723 tracing::debug!("Committed transaction {}", transaction_id);
724 Ok(())
725 }
726
727 pub fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
729 self.transaction_manager.abort_transaction(transaction_id)?;
730 self.metrics
731 .active_transactions
732 .fetch_sub(1, Ordering::Relaxed);
733 self.metrics
734 .aborted_transactions
735 .fetch_add(1, Ordering::Relaxed);
736
737 tracing::debug!("Aborted transaction {}", transaction_id);
738 Ok(())
739 }
740
741 pub fn transactional_insert(
743 &self,
744 transaction_id: TransactionId,
745 uri: String,
746 vector: Vector,
747 embedding_content: Option<EmbeddableContent>,
748 ) -> Result<()> {
749 if let Some(cached) = self.cache_manager.get_vector(&uri) {
751 if cached.vector == vector {
752 return Ok(()); }
754 }
755
756 let operation = TransactionOperation::Insert {
757 uri: uri.clone(),
758 vector: vector.clone(),
759 embedding_content,
760 };
761
762 self.transaction_manager
763 .add_operation(transaction_id, operation)?;
764
765 self.cache_manager.cache_vector(uri.clone(), vector.clone());
767
768 let change_entry = ChangeLogEntry {
770 id: self.generate_change_id(),
771 timestamp: SystemTime::now(),
772 operation: ChangeOperation::VectorInserted { uri, vector },
773 metadata: HashMap::new(),
774 transaction_id: Some(transaction_id),
775 };
776 self.change_log.add_entry(change_entry);
777
778 self.metrics
779 .total_operations
780 .fetch_add(1, Ordering::Relaxed);
781 Ok(())
782 }
783
784 pub fn stream_insert(&self, uri: String, vector: Vector, priority: Priority) -> Result<()> {
786 let operation = StreamingOperation::VectorInsert {
787 uri,
788 vector,
789 priority,
790 };
791 self.streaming_engine.submit_operation(operation)?;
792 Ok(())
793 }
794
795 pub fn batch_insert(
797 &self,
798 items: Vec<(String, Vector)>,
799 auto_commit: bool,
800 ) -> Result<TransactionId> {
801 let transaction_id = self.begin_transaction(IsolationLevel::ReadCommitted)?;
802
803 for batch in items.chunks(self.config.batch_size) {
805 let batch_operation = TransactionOperation::BatchInsert {
806 items: batch.to_vec(),
807 };
808 self.transaction_manager
809 .add_operation(transaction_id, batch_operation)?;
810 }
811
812 if auto_commit {
813 self.commit_transaction(transaction_id)?;
814 }
815
816 Ok(transaction_id)
817 }
818
819 pub fn consistent_search(
821 &self,
822 query: &Vector,
823 k: usize,
824 consistency_level: Option<ConsistencyLevel>,
825 ) -> Result<Vec<(String, f32)>> {
826 let effective_consistency = consistency_level.unwrap_or(self.config.consistency_level);
827
828 let query_hash = self.compute_query_hash(query, k);
830 if let Some(cached_result) = self.cache_manager.get_cached_query(&query_hash) {
831 return Ok(cached_result.results);
832 }
833
834 match effective_consistency {
836 ConsistencyLevel::Strong => {
837 self.wait_for_consistency()?;
839 }
840 ConsistencyLevel::Session => {
841 self.ensure_session_consistency()?;
843 }
844 ConsistencyLevel::Causal => {
845 self.ensure_causal_consistency()?;
847 }
848 ConsistencyLevel::Eventual => {
849 }
851 }
852
853 let store = self.vector_store.read();
855 let results = store.similarity_search_vector(query, k)?;
856
857 self.cache_manager
859 .cache_query_result(query_hash, results.clone());
860
861 Ok(results)
862 }
863
864 pub fn rdf_vector_search(
866 &self,
867 rdf_term: &str,
868 k: usize,
869 graph_context: Option<&str>,
870 ) -> Result<Vec<(String, f32)>> {
871 let rdf_integration = self.rdf_integration.read();
872
873 let term = oxirs_core::model::Term::NamedNode(
875 oxirs_core::model::NamedNode::new(rdf_term)
876 .map_err(|e| anyhow!("Invalid IRI: {}", e))?,
877 );
878
879 let graph_name = graph_context
881 .map(|ctx| -> Result<oxirs_core::model::GraphName> {
882 Ok(oxirs_core::model::GraphName::NamedNode(
883 oxirs_core::model::NamedNode::new(ctx)
884 .map_err(|e| anyhow!("Invalid graph IRI: {}", e))?,
885 ))
886 })
887 .transpose()?;
888
889 let results = rdf_integration.find_similar_terms(&term, k, None, graph_name.as_ref())?;
891
892 let converted_results = results
894 .into_iter()
895 .map(|result| (result.term.to_string(), result.score))
896 .collect();
897
898 Ok(converted_results)
899 }
900
901 pub fn sparql_vector_search(
903 &self,
904 _query: &str,
905 bindings: &HashMap<String, String>,
906 ) -> Result<Vec<HashMap<String, String>>> {
907 let _sparql_service = self.sparql_service.read();
908 Ok(vec![bindings.clone()])
911 }
912
913 pub fn subscribe_to_changes(&self, subscriber: Arc<dyn ChangeSubscriber>) -> Result<()> {
915 self.change_log.add_subscriber(subscriber);
916 Ok(())
917 }
918
919 pub fn get_metrics(&self) -> StoreMetrics {
921 StoreMetrics {
922 total_vectors: AtomicU64::new(self.metrics.total_vectors.load(Ordering::Relaxed)),
923 total_operations: AtomicU64::new(self.metrics.total_operations.load(Ordering::Relaxed)),
924 successful_operations: AtomicU64::new(
925 self.metrics.successful_operations.load(Ordering::Relaxed),
926 ),
927 failed_operations: AtomicU64::new(
928 self.metrics.failed_operations.load(Ordering::Relaxed),
929 ),
930 average_operation_time_ms: Arc::new(RwLock::new(
931 *self.metrics.average_operation_time_ms.read(),
932 )),
933 active_transactions: AtomicU64::new(
934 self.metrics.active_transactions.load(Ordering::Relaxed),
935 ),
936 committed_transactions: AtomicU64::new(
937 self.metrics.committed_transactions.load(Ordering::Relaxed),
938 ),
939 aborted_transactions: AtomicU64::new(
940 self.metrics.aborted_transactions.load(Ordering::Relaxed),
941 ),
942 replication_lag_ms: Arc::new(RwLock::new(*self.metrics.replication_lag_ms.read())),
943 consistency_violations: AtomicU64::new(
944 self.metrics.consistency_violations.load(Ordering::Relaxed),
945 ),
946 }
947 }
948
949 pub fn health_check(&self) -> Result<HealthStatus> {
951 let mut issues = Vec::new();
952
953 if self.metrics.active_transactions.load(Ordering::Relaxed) > 1000 {
955 issues.push("High number of active transactions".to_string());
956 }
957
958 let streaming_metrics = self.streaming_engine.get_metrics();
960 if streaming_metrics.operations_pending.load(Ordering::Relaxed) > 10000 {
961 issues.push("High number of pending streaming operations".to_string());
962 }
963
964 let cache_stats = self.cache_manager.get_stats();
966 let hit_ratio = cache_stats.vector_cache_hits.load(Ordering::Relaxed) as f64
967 / (cache_stats.vector_cache_hits.load(Ordering::Relaxed)
968 + cache_stats.vector_cache_misses.load(Ordering::Relaxed)) as f64;
969
970 if hit_ratio < 0.8 {
971 issues.push("Low cache hit ratio".to_string());
972 }
973
974 if self.config.replication_config.enable_replication {
976 let replication_lag = *self.metrics.replication_lag_ms.read();
977 if replication_lag > 1000.0 {
978 issues.push("High replication lag".to_string());
979 }
980 }
981
982 let status = if issues.is_empty() {
983 HealthStatus::Healthy
984 } else if issues.len() <= 2 {
985 HealthStatus::Warning(issues)
986 } else {
987 HealthStatus::Critical(issues)
988 };
989
990 Ok(status)
991 }
992
993 fn generate_change_id(&self) -> u64 {
995 static COUNTER: AtomicU64 = AtomicU64::new(0);
996 COUNTER.fetch_add(1, Ordering::Relaxed)
997 }
998
999 fn compute_query_hash(&self, query: &Vector, k: usize) -> u64 {
1000 use std::collections::hash_map::DefaultHasher;
1001 use std::hash::{Hash, Hasher};
1002
1003 let mut hasher = DefaultHasher::new();
1004 for value in query.as_f32() {
1006 value.to_bits().hash(&mut hasher);
1007 }
1008 k.hash(&mut hasher);
1009 hasher.finish()
1010 }
1011
1012 fn wait_for_consistency(&self) -> Result<()> {
1013 let start = Instant::now();
1015 let timeout = Duration::from_secs(30);
1016
1017 while self.metrics.active_transactions.load(Ordering::Relaxed) > 0 {
1018 if start.elapsed() > timeout {
1019 return Err(anyhow!("Timeout waiting for consistency"));
1020 }
1021 std::thread::sleep(Duration::from_millis(10));
1022 }
1023
1024 Ok(())
1025 }
1026
    /// Hook for session-level consistency; currently a no-op that always
    /// returns `Ok(())`.
    fn ensure_session_consistency(&self) -> Result<()> {
        Ok(())
    }
1032
    /// Hook for causal consistency; currently a no-op that always returns
    /// `Ok(())`.
    fn ensure_causal_consistency(&self) -> Result<()> {
        Ok(())
    }
1038}
1039
/// Result of `IntegratedVectorStore::health_check`.
#[derive(Debug, Clone)]
pub enum HealthStatus {
    /// No issues were found.
    Healthy,
    /// One or two issues were found; the messages are attached.
    Warning(Vec<String>),
    /// More than two issues were found; the messages are attached.
    Critical(Vec<String>),
}
1047
1048impl TransactionManager {
1049 pub fn new(config: StoreIntegrationConfig) -> Self {
1050 Self {
1051 active_transactions: Arc::new(RwLock::new(HashMap::new())),
1052 transaction_counter: AtomicU64::new(1),
1053 config,
1054 write_ahead_log: Arc::new(WriteAheadLog::new()),
1055 lock_manager: Arc::new(LockManager::new()),
1056 }
1057 }
1058
1059 pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
1060 let transaction_id = self.transaction_counter.fetch_add(1, Ordering::Relaxed);
1061 let transaction = Transaction {
1062 id: transaction_id,
1063 start_time: SystemTime::now(),
1064 timeout: self.config.transaction_timeout,
1065 operations: Vec::new(),
1066 status: TransactionStatus::Active,
1067 isolation_level,
1068 read_set: HashSet::new(),
1069 write_set: HashSet::new(),
1070 };
1071
1072 let mut active_txns = self.active_transactions.write();
1073 active_txns.insert(transaction_id, transaction);
1074
1075 Ok(transaction_id)
1076 }
1077
1078 pub fn add_operation(
1079 &self,
1080 transaction_id: TransactionId,
1081 operation: TransactionOperation,
1082 ) -> Result<()> {
1083 let mut active_txns = self.active_transactions.write();
1084 let transaction = active_txns
1085 .get_mut(&transaction_id)
1086 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
1087
1088 if transaction.status != TransactionStatus::Active {
1089 return Err(anyhow!("Transaction is not active"));
1090 }
1091
1092 if transaction
1094 .start_time
1095 .elapsed()
1096 .expect("SystemTime should not go backwards")
1097 > transaction.timeout
1098 {
1099 transaction.status = TransactionStatus::Aborted;
1100 return Err(anyhow!("Transaction timeout"));
1101 }
1102
1103 self.acquire_locks_for_operation(transaction_id, &operation)?;
1105
1106 let serializable_op = self.convert_to_serializable(&operation);
1108 self.write_ahead_log
1109 .append(transaction_id, serializable_op)?;
1110
1111 transaction.operations.push(operation);
1112 Ok(())
1113 }
1114
1115 pub fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
1116 let mut active_txns = self.active_transactions.write();
1117 let transaction = active_txns
1118 .remove(&transaction_id)
1119 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
1120
1121 if transaction.status != TransactionStatus::Active {
1122 return Err(anyhow!("Transaction is not active"));
1123 }
1124
1125 self.validate_transaction(&transaction)?;
1127
1128 self.write_ahead_log.append(
1130 transaction_id,
1131 SerializableOperation::Commit { transaction_id },
1132 )?;
1133
1134 for operation in &transaction.operations {
1136 self.execute_operation(operation)?;
1137 }
1138
1139 self.lock_manager.release_transaction_locks(transaction_id);
1141
1142 tracing::debug!("Transaction {} committed successfully", transaction_id);
1143 Ok(())
1144 }
1145
1146 pub fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
1147 let mut active_txns = self.active_transactions.write();
1148 let _transaction = active_txns
1149 .remove(&transaction_id)
1150 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
1151
1152 self.write_ahead_log.append(
1154 transaction_id,
1155 SerializableOperation::Abort { transaction_id },
1156 )?;
1157
1158 self.lock_manager.release_transaction_locks(transaction_id);
1160
1161 tracing::debug!("Transaction {} aborted", transaction_id);
1162 Ok(())
1163 }
1164
1165 fn acquire_locks_for_operation(
1166 &self,
1167 transaction_id: TransactionId,
1168 operation: &TransactionOperation,
1169 ) -> Result<()> {
1170 match operation {
1171 TransactionOperation::Insert { uri, .. } => {
1172 self.lock_manager
1173 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1174 }
1175 TransactionOperation::Update { uri, .. } => {
1176 self.lock_manager
1177 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1178 }
1179 TransactionOperation::Delete { uri, .. } => {
1180 self.lock_manager
1181 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1182 }
1183 TransactionOperation::BatchInsert { items } => {
1184 for (uri, _) in items {
1185 self.lock_manager
1186 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1187 }
1188 }
1189 TransactionOperation::IndexRebuild { .. } => {
1190 self.lock_manager
1192 .acquire_lock(transaction_id, "_global_", LockType::Exclusive)?;
1193 }
1194 }
1195 Ok(())
1196 }
1197
    /// Validation hook run before a commit; currently a no-op that accepts
    /// every transaction.
    fn validate_transaction(&self, _transaction: &Transaction) -> Result<()> {
        Ok(())
    }
1203
    /// Execution hook run for each operation of a committing transaction;
    /// currently a no-op, so committing does not apply operations to any store.
    fn execute_operation(&self, _operation: &TransactionOperation) -> Result<()> {
        Ok(())
    }
1209
1210 fn convert_to_serializable(&self, operation: &TransactionOperation) -> SerializableOperation {
1211 match operation {
1212 TransactionOperation::Insert { uri, vector, .. } => SerializableOperation::Insert {
1213 uri: uri.clone(),
1214 vector_data: vector.as_f32(),
1215 },
1216 TransactionOperation::Update {
1217 uri,
1218 vector,
1219 old_vector,
1220 } => SerializableOperation::Update {
1221 uri: uri.clone(),
1222 new_vector: vector.as_f32(),
1223 old_vector: old_vector.as_ref().map(|v| v.as_f32()),
1224 },
1225 TransactionOperation::Delete { uri, .. } => {
1226 SerializableOperation::Delete { uri: uri.clone() }
1227 }
1228 _ => {
1229 SerializableOperation::Insert {
1231 uri: "batch_operation".to_string(),
1232 vector_data: vec![0.0],
1233 }
1234 }
1235 }
1236 }
1237}
1238
impl Default for WriteAheadLog {
    /// Equivalent to `WriteAheadLog::new()`.
    fn default() -> Self {
        Self::new()
    }
}
1244
1245impl WriteAheadLog {
1246 pub fn new() -> Self {
1247 Self {
1248 log_entries: Arc::new(RwLock::new(VecDeque::new())),
1249 log_file: None,
1250 checkpoint_interval: Duration::from_secs(60),
1251 last_checkpoint: Arc::new(RwLock::new(SystemTime::now())),
1252 }
1253 }
1254
1255 pub fn append(
1256 &self,
1257 transaction_id: TransactionId,
1258 operation: SerializableOperation,
1259 ) -> Result<()> {
1260 let entry = LogEntry {
1261 lsn: self.generate_lsn(),
1262 transaction_id,
1263 operation,
1264 timestamp: SystemTime::now(),
1265 checksum: 0, };
1267
1268 let mut log = self.log_entries.write();
1269 log.push_back(entry);
1270
1271 if self.should_checkpoint() {
1273 self.checkpoint()?;
1274 }
1275
1276 Ok(())
1277 }
1278
1279 fn generate_lsn(&self) -> u64 {
1280 static LSN_COUNTER: AtomicU64 = AtomicU64::new(0);
1281 LSN_COUNTER.fetch_add(1, Ordering::Relaxed)
1282 }
1283
1284 fn should_checkpoint(&self) -> bool {
1285 let last_checkpoint = *self.last_checkpoint.read();
1286 last_checkpoint
1287 .elapsed()
1288 .expect("SystemTime should not go backwards")
1289 > self.checkpoint_interval
1290 }
1291
1292 fn checkpoint(&self) -> Result<()> {
1293 let mut last_checkpoint = self.last_checkpoint.write();
1295 *last_checkpoint = SystemTime::now();
1296 Ok(())
1297 }
1298}
1299
impl Default for LockManager {
    /// Equivalent to `LockManager::new()`.
    fn default() -> Self {
        Self::new()
    }
}
1305
1306impl LockManager {
1307 pub fn new() -> Self {
1308 Self {
1309 locks: Arc::new(RwLock::new(HashMap::new())),
1310 deadlock_detector: Arc::new(DeadlockDetector::new()),
1311 }
1312 }
1313
1314 pub fn acquire_lock(
1315 &self,
1316 transaction_id: TransactionId,
1317 resource: &str,
1318 lock_type: LockType,
1319 ) -> Result<()> {
1320 let mut locks = self.locks.write();
1321 let lock_info = locks
1322 .entry(resource.to_string())
1323 .or_insert_with(|| LockInfo {
1324 lock_type: LockType::Shared,
1325 holders: HashSet::new(),
1326 waiters: VecDeque::new(),
1327 granted_time: SystemTime::now(),
1328 });
1329
1330 if self.can_grant_lock(lock_info, lock_type) {
1332 lock_info.holders.insert(transaction_id);
1333 lock_info.lock_type = lock_type;
1334 lock_info.granted_time = SystemTime::now();
1335 Ok(())
1336 } else {
1337 lock_info.waiters.push_back((transaction_id, lock_type));
1339
1340 self.deadlock_detector.check_deadlock(transaction_id)?;
1342
1343 Err(anyhow!("Lock not available, transaction waiting"))
1344 }
1345 }
1346
1347 pub fn release_transaction_locks(&self, transaction_id: TransactionId) {
1348 let mut locks = self.locks.write();
1349 let mut to_remove = Vec::new();
1350
1351 for (resource, lock_info) in locks.iter_mut() {
1352 lock_info.holders.remove(&transaction_id);
1353
1354 lock_info.waiters.retain(|(tid, _)| *tid != transaction_id);
1356
1357 if lock_info.holders.is_empty() {
1358 to_remove.push(resource.clone());
1359 }
1360 }
1361
1362 for resource in to_remove {
1363 locks.remove(&resource);
1364 }
1365 }
1366
1367 fn can_grant_lock(&self, lock_info: &LockInfo, requested_type: LockType) -> bool {
1368 if lock_info.holders.is_empty() {
1369 return true;
1370 }
1371
1372 matches!(
1373 (lock_info.lock_type, requested_type),
1374 (LockType::Shared, LockType::Shared)
1375 )
1376 }
1377}
1378
1379impl Default for DeadlockDetector {
1380 fn default() -> Self {
1381 Self::new()
1382 }
1383}
1384
1385impl DeadlockDetector {
1386 pub fn new() -> Self {
1387 Self {
1388 wait_for_graph: Arc::new(RwLock::new(HashMap::new())),
1389 detection_interval: Duration::from_secs(1),
1390 }
1391 }
1392
1393 pub fn check_deadlock(&self, _transaction_id: TransactionId) -> Result<()> {
1394 Ok(())
1397 }
1398}
1399
1400impl StreamingEngine {
1401 pub fn new(config: StreamingConfig) -> Self {
1402 Self {
1403 config,
1404 stream_buffer: Arc::new(RwLock::new(VecDeque::new())),
1405 processor_thread: None,
1406 backpressure_controller: Arc::new(BackpressureController::new()),
1407 stream_metrics: Arc::new(StreamingMetrics::default()),
1408 }
1409 }
1410
1411 pub fn submit_operation(&self, operation: StreamingOperation) -> Result<()> {
1412 if self.backpressure_controller.should_apply_backpressure() {
1414 return Err(anyhow!("Backpressure applied, operation rejected"));
1415 }
1416
1417 let mut buffer = self.stream_buffer.write();
1418 buffer.push_back(operation);
1419
1420 self.stream_metrics
1421 .operations_pending
1422 .fetch_add(1, Ordering::Relaxed);
1423 Ok(())
1424 }
1425
1426 pub fn get_metrics(&self) -> &StreamingMetrics {
1427 &self.stream_metrics
1428 }
1429}
1430
1431impl Default for BackpressureController {
1432 fn default() -> Self {
1433 Self::new()
1434 }
1435}
1436
1437impl BackpressureController {
1438 pub fn new() -> Self {
1439 Self {
1440 current_load: Arc::new(RwLock::new(0.0)),
1441 max_load_threshold: 0.8,
1442 adaptive_batching: true,
1443 load_shedding: true,
1444 }
1445 }
1446
1447 pub fn should_apply_backpressure(&self) -> bool {
1448 let load = *self.current_load.read();
1449 load > self.max_load_threshold
1450 }
1451}
1452
1453impl CacheManager {
1454 pub fn new(config: StoreCacheConfig) -> Self {
1455 Self {
1456 vector_cache: Arc::new(RwLock::new(HashMap::new())),
1457 query_cache: Arc::new(RwLock::new(HashMap::new())),
1458 config,
1459 cache_stats: Arc::new(CacheStats::default()),
1460 eviction_policy: EvictionPolicy::LRU,
1461 }
1462 }
1463
1464 pub fn get_vector(&self, uri: &str) -> Option<CachedVector> {
1465 let cache = self.vector_cache.read();
1466 if let Some(cached) = cache.get(uri) {
1467 self.cache_stats
1468 .vector_cache_hits
1469 .fetch_add(1, Ordering::Relaxed);
1470 Some(cached.clone())
1471 } else {
1472 self.cache_stats
1473 .vector_cache_misses
1474 .fetch_add(1, Ordering::Relaxed);
1475 None
1476 }
1477 }
1478
1479 pub fn cache_vector(&self, uri: String, vector: Vector) {
1480 let cached_vector = CachedVector {
1481 vector,
1482 last_accessed: SystemTime::now(),
1483 access_count: 1,
1484 compression_ratio: 1.0,
1485 cache_level: CacheLevel::Memory,
1486 };
1487
1488 let mut cache = self.vector_cache.write();
1489 cache.insert(uri, cached_vector);
1490 }
1491
1492 pub fn get_cached_query(&self, query_hash: &u64) -> Option<CachedQueryResult> {
1493 let cache = self.query_cache.read();
1494 if let Some(cached) = cache.get(&query_hash.to_string()) {
1495 self.cache_stats
1496 .query_cache_hits
1497 .fetch_add(1, Ordering::Relaxed);
1498 Some(cached.clone())
1499 } else {
1500 self.cache_stats
1501 .query_cache_misses
1502 .fetch_add(1, Ordering::Relaxed);
1503 None
1504 }
1505 }
1506
1507 pub fn cache_query_result(&self, query_hash: u64, results: Vec<(String, f32)>) {
1508 let cached_result = CachedQueryResult {
1509 results,
1510 query_hash,
1511 last_accessed: SystemTime::now(),
1512 ttl: self.config.cache_ttl,
1513 hit_count: 0,
1514 };
1515
1516 let mut cache = self.query_cache.write();
1517 cache.insert(query_hash.to_string(), cached_result);
1518 }
1519
1520 pub fn get_stats(&self) -> &CacheStats {
1521 &self.cache_stats
1522 }
1523}
1524
1525impl ReplicationManager {
1526 pub fn new(config: ReplicationConfig) -> Self {
1527 Self {
1528 config,
1529 replicas: Arc::new(RwLock::new(Vec::new())),
1530 replication_log: Arc::new(RwLock::new(VecDeque::new())),
1531 consensus_algorithm: ConsensusAlgorithm::SimpleMajority,
1532 health_checker: Arc::new(HealthChecker::new()),
1533 }
1534 }
1535}
1536
1537impl Default for HealthChecker {
1538 fn default() -> Self {
1539 Self::new()
1540 }
1541}
1542
1543impl HealthChecker {
1544 pub fn new() -> Self {
1545 Self {
1546 check_interval: Duration::from_secs(30),
1547 timeout: Duration::from_secs(5),
1548 failure_threshold: 3,
1549 }
1550 }
1551}
1552
1553impl ConsistencyManager {
1554 pub fn new(consistency_level: ConsistencyLevel) -> Self {
1555 Self {
1556 consistency_level,
1557 vector_clocks: Arc::new(RwLock::new(HashMap::new())),
1558 conflict_resolver: Arc::new(ConflictResolver::new()),
1559 causal_order_tracker: Arc::new(CausalOrderTracker::new()),
1560 }
1561 }
1562}
1563
1564impl Default for ConflictResolver {
1565 fn default() -> Self {
1566 Self::new()
1567 }
1568}
1569
1570impl ConflictResolver {
1571 pub fn new() -> Self {
1572 Self {
1573 strategy: ConflictResolution::LastWriteWins,
1574 custom_resolvers: HashMap::new(),
1575 }
1576 }
1577}
1578
1579impl Default for CausalOrderTracker {
1580 fn default() -> Self {
1581 Self::new()
1582 }
1583}
1584
1585impl CausalOrderTracker {
1586 pub fn new() -> Self {
1587 Self {
1588 happens_before: Arc::new(RwLock::new(HashMap::new())),
1589 }
1590 }
1591}
1592
1593impl ChangeLog {
1594 pub fn new(max_entries: usize) -> Self {
1595 Self {
1596 entries: Arc::new(RwLock::new(VecDeque::new())),
1597 max_entries,
1598 subscribers: Arc::new(RwLock::new(Vec::new())),
1599 }
1600 }
1601
1602 pub fn add_entry(&self, entry: ChangeLogEntry) {
1603 let mut entries = self.entries.write();
1604 entries.push_back(entry.clone());
1605
1606 if entries.len() > self.max_entries {
1608 entries.pop_front();
1609 }
1610
1611 let subscribers = self.subscribers.read();
1613 for subscriber in subscribers.iter() {
1614 let _ = subscriber.on_change(&entry);
1615 }
1616 }
1617
1618 pub fn add_subscriber(&self, subscriber: Arc<dyn ChangeSubscriber>) {
1619 let mut subscribers = self.subscribers.write();
1620 subscribers.push(subscriber);
1621 }
1622}
1623
#[cfg(test)]
mod tests {
    use super::*;

    /// The default integration config enables real-time sync and incremental
    /// updates and uses a batch size of 1000.
    #[test]
    fn test_store_integration_config() {
        let defaults = StoreIntegrationConfig::default();

        assert!(defaults.real_time_sync);
        assert_eq!(defaults.batch_size, 1000);
        assert!(defaults.incremental_updates);
    }

    /// A transaction can be started and then committed.
    #[test]
    fn test_transaction_lifecycle() {
        let manager = TransactionManager::new(StoreIntegrationConfig::default());

        let tx_id = manager
            .begin_transaction(IsolationLevel::ReadCommitted)
            .unwrap();
        assert!(tx_id > 0);

        assert!(manager.commit_transaction(tx_id).is_ok());
    }

    /// A vector stored in the cache can be read back unchanged.
    #[test]
    fn test_cache_manager() {
        let cache_manager = CacheManager::new(StoreCacheConfig {
            enable_vector_cache: true,
            enable_query_cache: true,
            cache_size_mb: 128,
            cache_ttl: Duration::from_secs(300),
            enable_compression: false,
        });
        let vector = Vector::new(vec![1.0, 2.0, 3.0]);

        cache_manager.cache_vector("test_uri".to_string(), vector.clone());

        let hit = cache_manager.get_vector("test_uri");
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().vector, vector);
    }

    /// Submitting an operation to a fresh streaming engine succeeds.
    #[test]
    fn test_streaming_engine() {
        let streaming_engine = StreamingEngine::new(StreamingConfig {
            enable_streaming: true,
            buffer_size: 1000,
            flush_interval: Duration::from_millis(100),
            enable_backpressure: true,
            max_lag: Duration::from_secs(1),
        });

        let insert = StreamingOperation::VectorInsert {
            uri: "test_uri".to_string(),
            vector: Vector::new(vec![1.0, 2.0, 3.0]),
            priority: Priority::Normal,
        };

        assert!(streaming_engine.submit_operation(insert).is_ok());
    }

    /// A transactional insert followed by a commit succeeds end to end.
    #[test]
    fn test_integrated_vector_store() {
        let store =
            IntegratedVectorStore::new(StoreIntegrationConfig::default(), EmbeddingStrategy::TfIdf)
                .unwrap();

        let tx_id = store
            .begin_transaction(IsolationLevel::ReadCommitted)
            .unwrap();
        assert!(tx_id > 0);

        let vector = Vector::new(vec![1.0, 2.0, 3.0]);
        let inserted = store.transactional_insert(tx_id, "test_uri".to_string(), vector, None);
        assert!(inserted.is_ok());

        let committed = store.commit_transaction(tx_id);
        assert!(committed.is_ok());
    }
}