1use crate::{
13 embeddings::{EmbeddableContent, EmbeddingStrategy},
14 rdf_integration::{RdfVectorConfig, RdfVectorIntegration},
15 sparql_integration::SparqlVectorService,
16 Vector, VectorId, VectorStore, VectorStoreTrait,
17};
18use anyhow::{anyhow, Result};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use std::collections::{HashMap, HashSet, VecDeque};
22use std::sync::{
23 atomic::{AtomicU64, Ordering},
24 Arc,
25};
26use std::time::{Duration, Instant, SystemTime};
27
/// Top-level configuration for the integrated vector store and all of its
/// subsystems (transactions, caching, streaming, replication).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreIntegrationConfig {
    /// Propagate changes to consumers as they happen.
    pub real_time_sync: bool,
    /// Maximum number of items grouped into one batch operation.
    pub batch_size: usize,
    /// How long a transaction may stay open before it is timed out.
    pub transaction_timeout: Duration,
    /// Apply updates incrementally instead of via full rebuilds.
    pub incremental_updates: bool,
    /// Default read-consistency guarantee (see [`ConsistencyLevel`]).
    pub consistency_level: ConsistencyLevel,
    /// Strategy used when concurrent writes conflict.
    pub conflict_resolution: ConflictResolution,
    /// Enable multi-tenant isolation.
    /// NOTE(review): not referenced anywhere in this file — confirm usage.
    pub multi_tenant: bool,
    /// Cache tuning parameters.
    pub cache_config: StoreCacheConfig,
    /// Streaming-ingest tuning parameters.
    pub streaming_config: StreamingConfig,
    /// Replication tuning parameters.
    pub replication_config: ReplicationConfig,
}

/// Read-consistency guarantees honored by `consistent_search`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// No synchronization; reads may observe stale data.
    Eventual,
    /// Session-level consistency (hook currently a placeholder).
    Session,
    /// Reads wait until no transactions are in flight.
    Strong,
    /// Causal (happens-before) ordering (hook currently a placeholder).
    Causal,
}

/// How conflicting concurrent writes are reconciled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    LastWriteWins,
    FirstWriteWins,
    /// Merge both versions.
    Merge,
    /// Delegate to a named custom resolver (see `ConflictResolverTrait`).
    Custom(String),
    /// Surface the conflict for human resolution.
    Manual,
}

/// Cache sizing and behavior knobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreCacheConfig {
    pub enable_vector_cache: bool,
    pub enable_query_cache: bool,
    /// Total cache budget in megabytes.
    pub cache_size_mb: usize,
    /// Time-to-live applied to cached entries.
    pub cache_ttl: Duration,
    /// Compress cached payloads to trade CPU for memory.
    pub enable_compression: bool,
}

/// Streaming-ingest pipeline knobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    pub enable_streaming: bool,
    /// Maximum number of buffered operations.
    pub buffer_size: usize,
    /// How often the buffer is flushed.
    pub flush_interval: Duration,
    /// Shed or slow producers when the pipeline is overloaded.
    pub enable_backpressure: bool,
    /// Maximum tolerated processing lag.
    pub max_lag: Duration,
}

/// Replication topology and durability knobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    pub enable_replication: bool,
    /// Number of copies to maintain.
    pub replication_factor: usize,
    /// Wait for replica acknowledgement before completing writes.
    pub synchronous: bool,
    /// Addresses of replica nodes.
    pub replica_endpoints: Vec<String>,
}
123
/// Facade that ties the core vector store to the RDF and SPARQL integration
/// layers and wraps it with transactions, streaming ingest, caching,
/// replication, consistency enforcement, change notification, and metrics.
pub struct IntegratedVectorStore {
    config: StoreIntegrationConfig,
    /// Underlying store; also shared with the RDF layer via `VectorStoreWrapper`.
    vector_store: Arc<RwLock<VectorStore>>,
    rdf_integration: Arc<RwLock<RdfVectorIntegration>>,
    sparql_service: Arc<RwLock<SparqlVectorService>>,
    transaction_manager: Arc<TransactionManager>,
    streaming_engine: Arc<StreamingEngine>,
    cache_manager: Arc<CacheManager>,
    replication_manager: Arc<ReplicationManager>,
    consistency_manager: Arc<ConsistencyManager>,
    /// Append-only record of mutations, fanned out to subscribers.
    change_log: Arc<ChangeLog>,
    metrics: Arc<StoreMetrics>,
}
138
/// Coordinates transaction lifecycle: id allocation, timeout checking,
/// write-ahead logging, and lock acquisition/release.
pub struct TransactionManager {
    /// Transactions currently in flight, keyed by id.
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    /// Monotonic source of transaction ids (starts at 1).
    transaction_counter: AtomicU64,
    config: StoreIntegrationConfig,
    /// Durability log written before an operation is recorded.
    write_ahead_log: Arc<WriteAheadLog>,
    /// Per-resource lock table with deadlock detection.
    lock_manager: Arc<LockManager>,
}

/// An in-flight transaction and everything needed to commit or abort it.
#[derive(Debug, Clone)]
pub struct Transaction {
    pub id: TransactionId,
    pub start_time: SystemTime,
    /// Maximum lifetime; exceeded transactions are aborted on next use.
    pub timeout: Duration,
    /// Operations queued so far, applied in order at commit.
    pub operations: Vec<TransactionOperation>,
    pub status: TransactionStatus,
    pub isolation_level: IsolationLevel,
    /// Resources read by this transaction.
    /// NOTE(review): never populated in this file — confirm it is maintained elsewhere.
    pub read_set: HashSet<String>,
    /// Resources written by this transaction (same note as `read_set`).
    pub write_set: HashSet<String>,
}

/// Monotonically increasing transaction identifier.
pub type TransactionId = u64;

/// Lifecycle states; `Preparing`/`Prepared` exist to support two-phase commit.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TransactionStatus {
    Active,
    Committed,
    Aborted,
    Preparing,
    Prepared,
}

/// Standard ANSI isolation levels.
#[derive(Debug, Clone, Copy)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

/// A single mutation queued inside a transaction.
#[derive(Debug, Clone)]
pub enum TransactionOperation {
    /// Insert a vector for `uri`, optionally with the content it was embedded from.
    Insert {
        uri: String,
        vector: Vector,
        embedding_content: Option<EmbeddableContent>,
    },
    /// Replace the vector for `uri`; `old_vector` supports rollback/auditing.
    Update {
        uri: String,
        vector: Vector,
        old_vector: Option<Vector>,
    },
    /// Remove the vector for `uri`; the removed value may be captured.
    Delete {
        uri: String,
        vector: Option<Vector>,
    },
    /// Insert many (uri, vector) pairs at once.
    BatchInsert {
        items: Vec<(String, Vector)>,
    },
    /// Rebuild the index with the given algorithm and parameters.
    IndexRebuild {
        algorithm: String,
        parameters: HashMap<String, String>,
    },
}
207
/// In-memory write-ahead log of transaction operations.
pub struct WriteAheadLog {
    /// Ordered log records, newest at the back.
    log_entries: Arc<RwLock<VecDeque<LogEntry>>>,
    /// Optional path for on-disk persistence.
    /// NOTE(review): never written in this file — persistence appears unimplemented.
    log_file: Option<String>,
    /// How often a checkpoint should be taken.
    checkpoint_interval: Duration,
    last_checkpoint: Arc<RwLock<SystemTime>>,
}

/// One WAL record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Log sequence number, monotonically increasing per process.
    pub lsn: u64,
    pub transaction_id: TransactionId,
    pub operation: SerializableOperation,
    pub timestamp: SystemTime,
    /// Integrity checksum.
    /// NOTE(review): always written as 0 in this file — not yet computed.
    pub checksum: u64,
}

/// Serializable form of a transaction operation, as stored in the WAL and the
/// replication log. Vectors are flattened to raw `f32` data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableOperation {
    Insert {
        uri: String,
        vector_data: Vec<f32>,
    },
    Update {
        uri: String,
        new_vector: Vec<f32>,
        old_vector: Option<Vec<f32>>,
    },
    Delete {
        uri: String,
    },
    /// Marks a transaction as committed.
    Commit {
        transaction_id: TransactionId,
    },
    /// Marks a transaction as rolled back.
    Abort {
        transaction_id: TransactionId,
    },
}
248
/// Per-resource lock table used by the transaction manager.
pub struct LockManager {
    /// Lock state keyed by resource identifier (a URI, or `"_global_"`).
    locks: Arc<RwLock<HashMap<String, LockInfo>>>,
    deadlock_detector: Arc<DeadlockDetector>,
}

/// State of the lock on a single resource.
#[derive(Debug, Clone)]
pub struct LockInfo {
    /// Mode most recently granted.
    pub lock_type: LockType,
    /// Transactions currently holding the lock.
    pub holders: HashSet<TransactionId>,
    /// FIFO queue of (transaction, requested mode) waiting on the lock.
    pub waiters: VecDeque<(TransactionId, LockType)>,
    pub granted_time: SystemTime,
}

/// Multi-granularity lock modes; the intention modes support hierarchical locking.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LockType {
    Shared,
    Exclusive,
    IntentionShared,
    IntentionExclusive,
    SharedIntentionExclusive,
}

/// Tracks lock-wait relationships to detect cycles between transactions.
pub struct DeadlockDetector {
    /// Edges `t1 -> {t2, ...}`: t1 waits for a lock held by t2.
    wait_for_graph: Arc<RwLock<HashMap<TransactionId, HashSet<TransactionId>>>>,
    /// Intended cadence of detection sweeps.
    detection_interval: Duration,
}
279
/// Buffered, prioritized ingest pipeline for vector operations.
pub struct StreamingEngine {
    config: StreamingConfig,
    /// Pending operations awaiting processing.
    stream_buffer: Arc<RwLock<VecDeque<StreamingOperation>>>,
    /// Background worker, if one has been spawned.
    /// NOTE(review): never assigned in the visible code — confirm its lifecycle.
    processor_thread: Option<std::thread::JoinHandle<()>>,
    backpressure_controller: Arc<BackpressureController>,
    stream_metrics: Arc<StreamingMetrics>,
}

/// One unit of streaming work.
#[derive(Debug, Clone)]
pub enum StreamingOperation {
    VectorInsert {
        uri: String,
        vector: Vector,
        priority: Priority,
    },
    VectorUpdate {
        uri: String,
        vector: Vector,
        priority: Priority,
    },
    VectorDelete {
        uri: String,
        priority: Priority,
    },
    /// Request an embedding to be computed for `content` and stored under `uri`.
    EmbeddingRequest {
        content: EmbeddableContent,
        uri: String,
        priority: Priority,
    },
    /// A group of operations submitted together.
    BatchOperation {
        operations: Vec<StreamingOperation>,
    },
}

/// Scheduling priority, ordered `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

/// Tracks pipeline load to slow or shed producers under pressure.
pub struct BackpressureController {
    /// Current load estimate in [0, 1] (presumably — confirm producer).
    current_load: Arc<RwLock<f64>>,
    max_load_threshold: f64,
    /// Adapt batch sizes to load.
    adaptive_batching: bool,
    /// Drop low-priority work when overloaded.
    load_shedding: bool,
}

/// Counters and gauges describing streaming throughput and health.
#[derive(Debug, Default)]
pub struct StreamingMetrics {
    pub operations_processed: AtomicU64,
    pub operations_pending: AtomicU64,
    pub operations_dropped: AtomicU64,
    pub average_latency_ms: Arc<RwLock<f64>>,
    pub throughput_ops_sec: Arc<RwLock<f64>>,
    pub backpressure_events: AtomicU64,
}
343
/// Two-level cache: individual vectors by URI and whole query results by hash.
pub struct CacheManager {
    vector_cache: Arc<RwLock<HashMap<String, CachedVector>>>,
    /// Query results keyed by the string form of the query hash.
    query_cache: Arc<RwLock<HashMap<String, CachedQueryResult>>>,
    config: StoreCacheConfig,
    cache_stats: Arc<CacheStats>,
    eviction_policy: EvictionPolicy,
}

/// A cached vector plus the bookkeeping used by eviction policies.
#[derive(Debug, Clone)]
pub struct CachedVector {
    pub vector: Vector,
    pub last_accessed: SystemTime,
    pub access_count: u64,
    /// Achieved compression ratio, when compression is enabled.
    pub compression_ratio: f32,
    /// Storage tier the entry currently lives in.
    pub cache_level: CacheLevel,
}

/// Storage tier of a cached entry.
#[derive(Debug, Clone, Copy)]
pub enum CacheLevel {
    Memory,
    SSD,
    Disk,
}

/// A cached similarity-search result.
#[derive(Debug, Clone)]
pub struct CachedQueryResult {
    /// (uri, score) pairs as returned by the search.
    pub results: Vec<(String, f32)>,
    pub query_hash: u64,
    pub last_accessed: SystemTime,
    /// Entry lifetime.
    /// NOTE(review): not visibly enforced in this file — confirm expiry logic.
    pub ttl: Duration,
    pub hit_count: u64,
}

/// Aggregate cache counters.
#[derive(Debug, Default)]
pub struct CacheStats {
    pub vector_cache_hits: AtomicU64,
    pub vector_cache_misses: AtomicU64,
    pub query_cache_hits: AtomicU64,
    pub query_cache_misses: AtomicU64,
    pub evictions: AtomicU64,
    pub memory_usage_bytes: AtomicU64,
}

/// Eviction strategy for cache entries.
#[derive(Debug, Clone)]
pub enum EvictionPolicy {
    LRU,
    LFU,
    /// Adaptive Replacement Cache.
    ARC,
    /// Evict on time-to-live expiry.
    TTL,
    /// Named custom policy.
    Custom(String),
}
401
/// Tracks replicas, the replication log, and replica health.
pub struct ReplicationManager {
    config: ReplicationConfig,
    replicas: Arc<RwLock<Vec<ReplicaInfo>>>,
    /// Ordered operations to be shipped to replicas.
    replication_log: Arc<RwLock<VecDeque<ReplicationEntry>>>,
    consensus_algorithm: ConsensusAlgorithm,
    health_checker: Arc<HealthChecker>,
}

/// Known state of a single replica.
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    pub endpoint: String,
    pub status: ReplicaStatus,
    pub last_sync: SystemTime,
    /// How far the replica is behind the primary.
    pub lag: Duration,
    /// Failover/ordering priority.
    pub priority: u8,
}

/// Replica lifecycle states.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ReplicaStatus {
    Active,
    Inactive,
    Synchronizing,
    Failed,
    Maintenance,
}

/// One record in the replication log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationEntry {
    pub sequence_number: u64,
    pub operation: SerializableOperation,
    pub timestamp: SystemTime,
    /// Node that originated the operation.
    pub source_node: String,
}

/// Consensus scheme used to order replicated writes.
#[derive(Debug, Clone)]
pub enum ConsensusAlgorithm {
    Raft,
    /// Practical Byzantine Fault Tolerance.
    PBFT,
    SimpleMajority,
}

/// Parameters for periodic replica health probes.
pub struct HealthChecker {
    check_interval: Duration,
    timeout: Duration,
    /// Consecutive failures before a replica is marked failed.
    failure_threshold: u32,
}
454
/// Enforces the configured consistency level using vector clocks, conflict
/// resolution, and causal-order tracking.
pub struct ConsistencyManager {
    consistency_level: ConsistencyLevel,
    /// Per-resource vector clocks.
    vector_clocks: Arc<RwLock<HashMap<String, VectorClock>>>,
    conflict_resolver: Arc<ConflictResolver>,
    causal_order_tracker: Arc<CausalOrderTracker>,
}

/// Classic vector clock: one logical counter per node id.
#[derive(Debug, Clone, Default)]
pub struct VectorClock {
    pub clocks: HashMap<String, u64>,
}

/// Applies the configured conflict-resolution strategy, dispatching to a
/// registered custom resolver for `ConflictResolution::Custom` names.
pub struct ConflictResolver {
    strategy: ConflictResolution,
    custom_resolvers: HashMap<String, Box<dyn ConflictResolverTrait>>,
}

/// Pluggable conflict-resolution callback for custom strategies.
pub trait ConflictResolverTrait: Send + Sync {
    /// Produces the winning vector given the local and remote versions.
    fn resolve_conflict(
        &self,
        local: &Vector,
        remote: &Vector,
        metadata: &ConflictMetadata,
    ) -> Result<Vector>;
}

/// Context handed to a conflict resolver.
#[derive(Debug, Clone)]
pub struct ConflictMetadata {
    pub local_timestamp: SystemTime,
    pub remote_timestamp: SystemTime,
    pub local_version: u64,
    pub remote_version: u64,
    pub operation_type: String,
}

/// Records happens-before edges between operations for causal consistency.
pub struct CausalOrderTracker {
    happens_before: Arc<RwLock<HashMap<String, HashSet<String>>>>,
}
498
/// Bounded, append-only log of store mutations with subscriber fan-out.
pub struct ChangeLog {
    entries: Arc<RwLock<VecDeque<ChangeLogEntry>>>,
    /// Oldest entries are dropped once this many are retained.
    max_entries: usize,
    subscribers: Arc<RwLock<Vec<Arc<dyn ChangeSubscriber>>>>,
}

/// One recorded mutation.
#[derive(Debug, Clone)]
pub struct ChangeLogEntry {
    pub id: u64,
    pub timestamp: SystemTime,
    pub operation: ChangeOperation,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
    /// Transaction that produced the change, when applicable.
    pub transaction_id: Option<TransactionId>,
}

/// The kinds of mutation the change log records.
#[derive(Debug, Clone)]
pub enum ChangeOperation {
    VectorInserted {
        uri: String,
        vector: Vector,
    },
    VectorUpdated {
        uri: String,
        old_vector: Vector,
        new_vector: Vector,
    },
    VectorDeleted {
        uri: String,
        vector: Vector,
    },
    IndexRebuilt {
        algorithm: String,
    },
    ConfigurationChanged {
        changes: HashMap<String, String>,
    },
}

/// Callback interface for change-log consumers.
pub trait ChangeSubscriber: Send + Sync {
    /// Invoked for each new change entry.
    fn on_change(&self, entry: &ChangeLogEntry) -> Result<()>;
    /// Stable identifier for this subscriber.
    fn subscriber_id(&self) -> String;
    /// Patterns describing which changes this subscriber cares about.
    fn interest_patterns(&self) -> Vec<String>;
}
546
/// Store-wide counters and gauges. Counters are relaxed atomics; float gauges
/// sit behind `RwLock` so they can be snapshotted.
#[derive(Debug, Default)]
pub struct StoreMetrics {
    pub total_vectors: AtomicU64,
    pub total_operations: AtomicU64,
    pub successful_operations: AtomicU64,
    pub failed_operations: AtomicU64,
    pub average_operation_time_ms: Arc<RwLock<f64>>,
    pub active_transactions: AtomicU64,
    pub committed_transactions: AtomicU64,
    pub aborted_transactions: AtomicU64,
    pub replication_lag_ms: Arc<RwLock<f64>>,
    pub consistency_violations: AtomicU64,
}
561
impl Default for StoreIntegrationConfig {
    /// Single-node defaults: session consistency, last-write-wins conflicts,
    /// caching and streaming enabled, replication disabled.
    fn default() -> Self {
        Self {
            real_time_sync: true,
            batch_size: 1000,
            transaction_timeout: Duration::from_secs(30),
            incremental_updates: true,
            consistency_level: ConsistencyLevel::Session,
            conflict_resolution: ConflictResolution::LastWriteWins,
            multi_tenant: false,
            cache_config: StoreCacheConfig {
                enable_vector_cache: true,
                enable_query_cache: true,
                cache_size_mb: 512,
                cache_ttl: Duration::from_secs(3600),
                enable_compression: true,
            },
            streaming_config: StreamingConfig {
                enable_streaming: true,
                buffer_size: 10000,
                flush_interval: Duration::from_millis(100),
                enable_backpressure: true,
                max_lag: Duration::from_secs(5),
            },
            // Replication is opt-in; factor 3 is the conventional default once enabled.
            replication_config: ReplicationConfig {
                enable_replication: false,
                replication_factor: 3,
                synchronous: false,
                replica_endpoints: Vec::new(),
            },
        }
    }
}
595
/// Adapts the `parking_lot`-guarded [`VectorStore`] to the `VectorStoreTrait`
/// trait object the RDF integration layer expects.
pub struct VectorStoreWrapper {
    store: Arc<parking_lot::RwLock<VectorStore>>,
}
600
601impl VectorStoreTrait for VectorStoreWrapper {
602 fn insert_vector(&mut self, id: VectorId, vector: Vector) -> Result<()> {
603 let mut store = self.store.write();
604 store.insert_vector(id, vector)
605 }
606
607 fn add_vector(&mut self, vector: Vector) -> Result<VectorId> {
608 let mut store = self.store.write();
609 store.add_vector(vector)
610 }
611
612 fn get_vector(&self, id: &VectorId) -> Result<Option<Vector>> {
613 let store = self.store.read();
614 let result = store.get_vector(id);
615 Ok(result.cloned())
616 }
617
618 fn get_all_vector_ids(&self) -> Result<Vec<VectorId>> {
619 let store = self.store.read();
620 store.get_all_vector_ids()
621 }
622
623 fn search_similar(&self, query: &Vector, k: usize) -> Result<Vec<(VectorId, f32)>> {
624 let store = self.store.read();
625 store.search_similar(query, k)
626 }
627
628 fn remove_vector(&mut self, id: &VectorId) -> Result<bool> {
629 let mut store = self.store.write();
630 store.remove_vector(id)
631 }
632
633 fn len(&self) -> usize {
634 let _store = self.store.read();
635 0 }
638}
639
640impl IntegratedVectorStore {
    /// Builds a fully wired store: the inner vector store, RDF and SPARQL
    /// integrations, plus transaction, streaming, cache, replication,
    /// consistency, change-log, and metrics subsystems.
    ///
    /// # Errors
    /// Propagates failures from vector-store or SPARQL-service construction.
    pub fn new(
        config: StoreIntegrationConfig,
        embedding_strategy: EmbeddingStrategy,
    ) -> Result<Self> {
        let vector_store = Arc::new(RwLock::new(
            VectorStore::with_embedding_strategy(embedding_strategy.clone())?.with_config(
                crate::VectorStoreConfig {
                    auto_embed: true,
                    cache_embeddings: config.cache_config.enable_vector_cache,
                    similarity_threshold: 0.7,
                    max_results: 1000,
                },
            ),
        ));

        // The RDF layer expects a `dyn VectorStoreTrait` behind std's RwLock,
        // so the parking_lot-guarded store is adapted through VectorStoreWrapper
        // (both share the same underlying store via the cloned Arc).
        let rdf_config = RdfVectorConfig::default();
        let vector_store_wrapper = VectorStoreWrapper {
            store: vector_store.clone(),
        };
        let vector_store_trait: Arc<std::sync::RwLock<dyn VectorStoreTrait>> =
            Arc::new(std::sync::RwLock::new(vector_store_wrapper));
        let rdf_integration = Arc::new(RwLock::new(RdfVectorIntegration::new(
            rdf_config,
            vector_store_trait,
        )));

        let sparql_config = crate::sparql_integration::VectorServiceConfig::default();
        let sparql_service = Arc::new(RwLock::new(SparqlVectorService::new(
            sparql_config,
            embedding_strategy,
        )?));

        let transaction_manager = Arc::new(TransactionManager::new(config.clone()));
        let streaming_engine = Arc::new(StreamingEngine::new(config.streaming_config.clone()));
        let cache_manager = Arc::new(CacheManager::new(config.cache_config.clone()));
        let replication_manager =
            Arc::new(ReplicationManager::new(config.replication_config.clone()));
        let consistency_manager = Arc::new(ConsistencyManager::new(config.consistency_level));
        // The change log retains at most 10_000 entries.
        let change_log = Arc::new(ChangeLog::new(10000));
        let metrics = Arc::new(StoreMetrics::default());

        Ok(Self {
            config,
            vector_store,
            rdf_integration,
            sparql_service,
            transaction_manager,
            streaming_engine,
            cache_manager,
            replication_manager,
            consistency_manager,
            change_log,
            metrics,
        })
    }
697
698 pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
700 let transaction_id = self
701 .transaction_manager
702 .begin_transaction(isolation_level)?;
703 self.metrics
704 .active_transactions
705 .fetch_add(1, Ordering::Relaxed);
706
707 tracing::debug!("Started transaction {}", transaction_id);
708 Ok(transaction_id)
709 }
710
711 pub fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
713 self.transaction_manager
714 .commit_transaction(transaction_id)?;
715 self.metrics
716 .active_transactions
717 .fetch_sub(1, Ordering::Relaxed);
718 self.metrics
719 .committed_transactions
720 .fetch_add(1, Ordering::Relaxed);
721
722 tracing::debug!("Committed transaction {}", transaction_id);
723 Ok(())
724 }
725
726 pub fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
728 self.transaction_manager.abort_transaction(transaction_id)?;
729 self.metrics
730 .active_transactions
731 .fetch_sub(1, Ordering::Relaxed);
732 self.metrics
733 .aborted_transactions
734 .fetch_add(1, Ordering::Relaxed);
735
736 tracing::debug!("Aborted transaction {}", transaction_id);
737 Ok(())
738 }
739
    /// Queues an insert of (`uri`, `vector`) on an open transaction.
    ///
    /// NOTE(review): two behaviors here deserve confirmation:
    /// - an identical cached vector short-circuits without touching the
    ///   transaction, so a stale cache entry could suppress a real insert;
    /// - the vector is cached and the change-log entry is published *before*
    ///   the transaction commits, making the write observable pre-commit.
    pub fn transactional_insert(
        &self,
        transaction_id: TransactionId,
        uri: String,
        vector: Vector,
        embedding_content: Option<EmbeddableContent>,
    ) -> Result<()> {
        // Deduplicate: skip the whole operation when the cache already holds
        // an equal vector for this URI.
        if let Some(cached) = self.cache_manager.get_vector(&uri) {
            if cached.vector == vector {
                return Ok(());
            }
        }

        let operation = TransactionOperation::Insert {
            uri: uri.clone(),
            vector: vector.clone(),
            embedding_content,
        };

        // Lock, WAL-log, and record the operation on the transaction.
        self.transaction_manager
            .add_operation(transaction_id, operation)?;

        // Warm the cache with the new value.
        self.cache_manager.cache_vector(uri.clone(), vector.clone());

        // Publish the change to change-log subscribers.
        let change_entry = ChangeLogEntry {
            id: self.generate_change_id(),
            timestamp: SystemTime::now(),
            operation: ChangeOperation::VectorInserted { uri, vector },
            metadata: HashMap::new(),
            transaction_id: Some(transaction_id),
        };
        self.change_log.add_entry(change_entry);

        self.metrics
            .total_operations
            .fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
782
783 pub fn stream_insert(&self, uri: String, vector: Vector, priority: Priority) -> Result<()> {
785 let operation = StreamingOperation::VectorInsert {
786 uri,
787 vector,
788 priority,
789 };
790 self.streaming_engine.submit_operation(operation)?;
791 Ok(())
792 }
793
794 pub fn batch_insert(
796 &self,
797 items: Vec<(String, Vector)>,
798 auto_commit: bool,
799 ) -> Result<TransactionId> {
800 let transaction_id = self.begin_transaction(IsolationLevel::ReadCommitted)?;
801
802 for batch in items.chunks(self.config.batch_size) {
804 let batch_operation = TransactionOperation::BatchInsert {
805 items: batch.to_vec(),
806 };
807 self.transaction_manager
808 .add_operation(transaction_id, batch_operation)?;
809 }
810
811 if auto_commit {
812 self.commit_transaction(transaction_id)?;
813 }
814
815 Ok(transaction_id)
816 }
817
    /// Similarity search honoring a consistency level (defaults to the store's
    /// configured level).
    ///
    /// NOTE(review): a cached result is returned *before* consistency is
    /// enforced — confirm that is acceptable for `Strong` reads.
    pub fn consistent_search(
        &self,
        query: &Vector,
        k: usize,
        consistency_level: Option<ConsistencyLevel>,
    ) -> Result<Vec<(String, f32)>> {
        let effective_consistency = consistency_level.unwrap_or(self.config.consistency_level);

        // Fast path: cached result for this (query, k) pair.
        let query_hash = self.compute_query_hash(query, k);
        if let Some(cached_result) = self.cache_manager.get_cached_query(&query_hash) {
            return Ok(cached_result.results);
        }

        match effective_consistency {
            ConsistencyLevel::Strong => {
                // Block until no transactions are in flight (30 s deadline).
                self.wait_for_consistency()?;
            }
            ConsistencyLevel::Session => {
                self.ensure_session_consistency()?;
            }
            ConsistencyLevel::Causal => {
                self.ensure_causal_consistency()?;
            }
            ConsistencyLevel::Eventual => {
                // No synchronization required.
            }
        }

        let store = self.vector_store.read();
        let results = store.similarity_search_vector(query, k)?;

        // Cache for subsequent identical queries.
        self.cache_manager
            .cache_query_result(query_hash, results.clone());

        Ok(results)
    }
862
    /// Finds the `k` terms most similar to `rdf_term`, optionally scoped to a
    /// named graph.
    ///
    /// # Errors
    /// Returns an error when `rdf_term` or `graph_context` is not a valid IRI,
    /// or when the RDF integration search fails.
    pub fn rdf_vector_search(
        &self,
        rdf_term: &str,
        k: usize,
        graph_context: Option<&str>,
    ) -> Result<Vec<(String, f32)>> {
        let rdf_integration = self.rdf_integration.read();

        // Parse the query term as an IRI.
        let term = oxirs_core::model::Term::NamedNode(
            oxirs_core::model::NamedNode::new(rdf_term)
                .map_err(|e| anyhow!("Invalid IRI: {}", e))?,
        );

        // Parse the optional graph IRI; `transpose` surfaces a parse error.
        let graph_name = graph_context
            .map(|ctx| -> Result<oxirs_core::model::GraphName> {
                Ok(oxirs_core::model::GraphName::NamedNode(
                    oxirs_core::model::NamedNode::new(ctx)
                        .map_err(|e| anyhow!("Invalid graph IRI: {}", e))?,
                ))
            })
            .transpose()?;

        let results = rdf_integration.find_similar_terms(&term, k, None, graph_name.as_ref())?;

        // Flatten integration results into (term string, score) pairs.
        let converted_results = results
            .into_iter()
            .map(|result| (result.term.to_string(), result.score))
            .collect();

        Ok(converted_results)
    }
899
    /// Placeholder for SPARQL-driven vector search.
    ///
    /// Currently echoes the supplied bindings unchanged; the SPARQL service is
    /// acquired but not yet consulted. TODO: implement real query dispatch.
    pub fn sparql_vector_search(
        &self,
        _query: &str,
        bindings: &HashMap<String, String>,
    ) -> Result<Vec<HashMap<String, String>>> {
        let _sparql_service = self.sparql_service.read();
        Ok(vec![bindings.clone()])
    }
911
    /// Registers a subscriber to be notified of future change-log entries.
    pub fn subscribe_to_changes(&self, subscriber: Arc<dyn ChangeSubscriber>) -> Result<()> {
        self.change_log.add_subscriber(subscriber);
        Ok(())
    }
917
918 pub fn get_metrics(&self) -> StoreMetrics {
920 StoreMetrics {
921 total_vectors: AtomicU64::new(self.metrics.total_vectors.load(Ordering::Relaxed)),
922 total_operations: AtomicU64::new(self.metrics.total_operations.load(Ordering::Relaxed)),
923 successful_operations: AtomicU64::new(
924 self.metrics.successful_operations.load(Ordering::Relaxed),
925 ),
926 failed_operations: AtomicU64::new(
927 self.metrics.failed_operations.load(Ordering::Relaxed),
928 ),
929 average_operation_time_ms: Arc::new(RwLock::new(
930 *self.metrics.average_operation_time_ms.read(),
931 )),
932 active_transactions: AtomicU64::new(
933 self.metrics.active_transactions.load(Ordering::Relaxed),
934 ),
935 committed_transactions: AtomicU64::new(
936 self.metrics.committed_transactions.load(Ordering::Relaxed),
937 ),
938 aborted_transactions: AtomicU64::new(
939 self.metrics.aborted_transactions.load(Ordering::Relaxed),
940 ),
941 replication_lag_ms: Arc::new(RwLock::new(*self.metrics.replication_lag_ms.read())),
942 consistency_violations: AtomicU64::new(
943 self.metrics.consistency_violations.load(Ordering::Relaxed),
944 ),
945 }
946 }
947
948 pub fn health_check(&self) -> Result<HealthStatus> {
950 let mut issues = Vec::new();
951
952 if self.metrics.active_transactions.load(Ordering::Relaxed) > 1000 {
954 issues.push("High number of active transactions".to_string());
955 }
956
957 let streaming_metrics = self.streaming_engine.get_metrics();
959 if streaming_metrics.operations_pending.load(Ordering::Relaxed) > 10000 {
960 issues.push("High number of pending streaming operations".to_string());
961 }
962
963 let cache_stats = self.cache_manager.get_stats();
965 let hit_ratio = cache_stats.vector_cache_hits.load(Ordering::Relaxed) as f64
966 / (cache_stats.vector_cache_hits.load(Ordering::Relaxed)
967 + cache_stats.vector_cache_misses.load(Ordering::Relaxed)) as f64;
968
969 if hit_ratio < 0.8 {
970 issues.push("Low cache hit ratio".to_string());
971 }
972
973 if self.config.replication_config.enable_replication {
975 let replication_lag = *self.metrics.replication_lag_ms.read();
976 if replication_lag > 1000.0 {
977 issues.push("High replication lag".to_string());
978 }
979 }
980
981 let status = if issues.is_empty() {
982 HealthStatus::Healthy
983 } else if issues.len() <= 2 {
984 HealthStatus::Warning(issues)
985 } else {
986 HealthStatus::Critical(issues)
987 };
988
989 Ok(status)
990 }
991
    /// Produces a process-wide unique id for change-log entries.
    /// NOTE(review): the static counter restarts at 0 on every process start —
    /// ids are not unique across restarts.
    fn generate_change_id(&self) -> u64 {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        COUNTER.fetch_add(1, Ordering::Relaxed)
    }
997
998 fn compute_query_hash(&self, query: &Vector, k: usize) -> u64 {
999 use std::collections::hash_map::DefaultHasher;
1000 use std::hash::{Hash, Hasher};
1001
1002 let mut hasher = DefaultHasher::new();
1003 for value in query.as_f32() {
1005 value.to_bits().hash(&mut hasher);
1006 }
1007 k.hash(&mut hasher);
1008 hasher.finish()
1009 }
1010
1011 fn wait_for_consistency(&self) -> Result<()> {
1012 let start = Instant::now();
1014 let timeout = Duration::from_secs(30);
1015
1016 while self.metrics.active_transactions.load(Ordering::Relaxed) > 0 {
1017 if start.elapsed() > timeout {
1018 return Err(anyhow!("Timeout waiting for consistency"));
1019 }
1020 std::thread::sleep(Duration::from_millis(10));
1021 }
1022
1023 Ok(())
1024 }
1025
    /// Session-consistency hook — currently a no-op placeholder.
    fn ensure_session_consistency(&self) -> Result<()> {
        Ok(())
    }

    /// Causal-consistency hook — currently a no-op placeholder.
    /// TODO: presumably meant to consult `consistency_manager` — confirm design.
    fn ensure_causal_consistency(&self) -> Result<()> {
        Ok(())
    }
1037}
1038
/// Result of `IntegratedVectorStore::health_check`.
#[derive(Debug, Clone)]
pub enum HealthStatus {
    /// No issues detected.
    Healthy,
    /// One or two issues detected.
    Warning(Vec<String>),
    /// Three or more issues detected.
    Critical(Vec<String>),
}
1046
1047impl TransactionManager {
    /// Creates a manager with an empty transaction table, a fresh WAL and lock
    /// manager, and transaction ids starting at 1.
    pub fn new(config: StoreIntegrationConfig) -> Self {
        Self {
            active_transactions: Arc::new(RwLock::new(HashMap::new())),
            transaction_counter: AtomicU64::new(1),
            config,
            write_ahead_log: Arc::new(WriteAheadLog::new()),
            lock_manager: Arc::new(LockManager::new()),
        }
    }
1057
1058 pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
1059 let transaction_id = self.transaction_counter.fetch_add(1, Ordering::Relaxed);
1060 let transaction = Transaction {
1061 id: transaction_id,
1062 start_time: SystemTime::now(),
1063 timeout: self.config.transaction_timeout,
1064 operations: Vec::new(),
1065 status: TransactionStatus::Active,
1066 isolation_level,
1067 read_set: HashSet::new(),
1068 write_set: HashSet::new(),
1069 };
1070
1071 let mut active_txns = self.active_transactions.write();
1072 active_txns.insert(transaction_id, transaction);
1073
1074 Ok(transaction_id)
1075 }
1076
1077 pub fn add_operation(
1078 &self,
1079 transaction_id: TransactionId,
1080 operation: TransactionOperation,
1081 ) -> Result<()> {
1082 let mut active_txns = self.active_transactions.write();
1083 let transaction = active_txns
1084 .get_mut(&transaction_id)
1085 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
1086
1087 if transaction.status != TransactionStatus::Active {
1088 return Err(anyhow!("Transaction is not active"));
1089 }
1090
1091 if transaction.start_time.elapsed().unwrap() > transaction.timeout {
1093 transaction.status = TransactionStatus::Aborted;
1094 return Err(anyhow!("Transaction timeout"));
1095 }
1096
1097 self.acquire_locks_for_operation(transaction_id, &operation)?;
1099
1100 let serializable_op = self.convert_to_serializable(&operation);
1102 self.write_ahead_log
1103 .append(transaction_id, serializable_op)?;
1104
1105 transaction.operations.push(operation);
1106 Ok(())
1107 }
1108
1109 pub fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
1110 let mut active_txns = self.active_transactions.write();
1111 let transaction = active_txns
1112 .remove(&transaction_id)
1113 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
1114
1115 if transaction.status != TransactionStatus::Active {
1116 return Err(anyhow!("Transaction is not active"));
1117 }
1118
1119 self.validate_transaction(&transaction)?;
1121
1122 self.write_ahead_log.append(
1124 transaction_id,
1125 SerializableOperation::Commit { transaction_id },
1126 )?;
1127
1128 for operation in &transaction.operations {
1130 self.execute_operation(operation)?;
1131 }
1132
1133 self.lock_manager.release_transaction_locks(transaction_id);
1135
1136 tracing::debug!("Transaction {} committed successfully", transaction_id);
1137 Ok(())
1138 }
1139
1140 pub fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
1141 let mut active_txns = self.active_transactions.write();
1142 let _transaction = active_txns
1143 .remove(&transaction_id)
1144 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
1145
1146 self.write_ahead_log.append(
1148 transaction_id,
1149 SerializableOperation::Abort { transaction_id },
1150 )?;
1151
1152 self.lock_manager.release_transaction_locks(transaction_id);
1154
1155 tracing::debug!("Transaction {} aborted", transaction_id);
1156 Ok(())
1157 }
1158
1159 fn acquire_locks_for_operation(
1160 &self,
1161 transaction_id: TransactionId,
1162 operation: &TransactionOperation,
1163 ) -> Result<()> {
1164 match operation {
1165 TransactionOperation::Insert { uri, .. } => {
1166 self.lock_manager
1167 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1168 }
1169 TransactionOperation::Update { uri, .. } => {
1170 self.lock_manager
1171 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1172 }
1173 TransactionOperation::Delete { uri, .. } => {
1174 self.lock_manager
1175 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1176 }
1177 TransactionOperation::BatchInsert { items } => {
1178 for (uri, _) in items {
1179 self.lock_manager
1180 .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
1181 }
1182 }
1183 TransactionOperation::IndexRebuild { .. } => {
1184 self.lock_manager
1186 .acquire_lock(transaction_id, "_global_", LockType::Exclusive)?;
1187 }
1188 }
1189 Ok(())
1190 }
1191
    /// Validation hook run at commit time — currently accepts everything.
    /// TODO: implement read/write-set validation for optimistic isolation.
    fn validate_transaction(&self, _transaction: &Transaction) -> Result<()> {
        Ok(())
    }

    /// Applies a committed operation — currently a no-op placeholder.
    /// NOTE(review): committed operations are therefore not applied anywhere
    /// visible in this file — confirm where the actual writes happen.
    fn execute_operation(&self, _operation: &TransactionOperation) -> Result<()> {
        Ok(())
    }
1203
    /// Converts an in-memory operation into its WAL-serializable form,
    /// flattening vectors to raw `f32` data.
    ///
    /// NOTE(review): `BatchInsert` and `IndexRebuild` fall through to a dummy
    /// `Insert` record ("batch_operation", [0.0]) — replaying such a log would
    /// not reproduce those operations. Confirm whether this is intentional.
    fn convert_to_serializable(&self, operation: &TransactionOperation) -> SerializableOperation {
        match operation {
            TransactionOperation::Insert { uri, vector, .. } => SerializableOperation::Insert {
                uri: uri.clone(),
                vector_data: vector.as_f32(),
            },
            TransactionOperation::Update {
                uri,
                vector,
                old_vector,
            } => SerializableOperation::Update {
                uri: uri.clone(),
                new_vector: vector.as_f32(),
                old_vector: old_vector.as_ref().map(|v| v.as_f32()),
            },
            TransactionOperation::Delete { uri, .. } => {
                SerializableOperation::Delete { uri: uri.clone() }
            }
            _ => {
                // Lossy placeholder for BatchInsert / IndexRebuild (see note above).
                SerializableOperation::Insert {
                    uri: "batch_operation".to_string(),
                    vector_data: vec![0.0],
                }
            }
        }
    }
1231}
1232
impl Default for WriteAheadLog {
    /// Equivalent to [`WriteAheadLog::new`].
    fn default() -> Self {
        Self::new()
    }
}
1238
1239impl WriteAheadLog {
1240 pub fn new() -> Self {
1241 Self {
1242 log_entries: Arc::new(RwLock::new(VecDeque::new())),
1243 log_file: None,
1244 checkpoint_interval: Duration::from_secs(60),
1245 last_checkpoint: Arc::new(RwLock::new(SystemTime::now())),
1246 }
1247 }
1248
1249 pub fn append(
1250 &self,
1251 transaction_id: TransactionId,
1252 operation: SerializableOperation,
1253 ) -> Result<()> {
1254 let entry = LogEntry {
1255 lsn: self.generate_lsn(),
1256 transaction_id,
1257 operation,
1258 timestamp: SystemTime::now(),
1259 checksum: 0, };
1261
1262 let mut log = self.log_entries.write();
1263 log.push_back(entry);
1264
1265 if self.should_checkpoint() {
1267 self.checkpoint()?;
1268 }
1269
1270 Ok(())
1271 }
1272
1273 fn generate_lsn(&self) -> u64 {
1274 static LSN_COUNTER: AtomicU64 = AtomicU64::new(0);
1275 LSN_COUNTER.fetch_add(1, Ordering::Relaxed)
1276 }
1277
1278 fn should_checkpoint(&self) -> bool {
1279 let last_checkpoint = *self.last_checkpoint.read();
1280 last_checkpoint.elapsed().unwrap() > self.checkpoint_interval
1281 }
1282
1283 fn checkpoint(&self) -> Result<()> {
1284 let mut last_checkpoint = self.last_checkpoint.write();
1286 *last_checkpoint = SystemTime::now();
1287 Ok(())
1288 }
1289}
1290
impl Default for LockManager {
    /// Equivalent to [`LockManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
1296
1297impl LockManager {
1298 pub fn new() -> Self {
1299 Self {
1300 locks: Arc::new(RwLock::new(HashMap::new())),
1301 deadlock_detector: Arc::new(DeadlockDetector::new()),
1302 }
1303 }
1304
1305 pub fn acquire_lock(
1306 &self,
1307 transaction_id: TransactionId,
1308 resource: &str,
1309 lock_type: LockType,
1310 ) -> Result<()> {
1311 let mut locks = self.locks.write();
1312 let lock_info = locks
1313 .entry(resource.to_string())
1314 .or_insert_with(|| LockInfo {
1315 lock_type: LockType::Shared,
1316 holders: HashSet::new(),
1317 waiters: VecDeque::new(),
1318 granted_time: SystemTime::now(),
1319 });
1320
1321 if self.can_grant_lock(lock_info, lock_type) {
1323 lock_info.holders.insert(transaction_id);
1324 lock_info.lock_type = lock_type;
1325 lock_info.granted_time = SystemTime::now();
1326 Ok(())
1327 } else {
1328 lock_info.waiters.push_back((transaction_id, lock_type));
1330
1331 self.deadlock_detector.check_deadlock(transaction_id)?;
1333
1334 Err(anyhow!("Lock not available, transaction waiting"))
1335 }
1336 }
1337
1338 pub fn release_transaction_locks(&self, transaction_id: TransactionId) {
1339 let mut locks = self.locks.write();
1340 let mut to_remove = Vec::new();
1341
1342 for (resource, lock_info) in locks.iter_mut() {
1343 lock_info.holders.remove(&transaction_id);
1344
1345 lock_info.waiters.retain(|(tid, _)| *tid != transaction_id);
1347
1348 if lock_info.holders.is_empty() {
1349 to_remove.push(resource.clone());
1350 }
1351 }
1352
1353 for resource in to_remove {
1354 locks.remove(&resource);
1355 }
1356 }
1357
1358 fn can_grant_lock(&self, lock_info: &LockInfo, requested_type: LockType) -> bool {
1359 if lock_info.holders.is_empty() {
1360 return true;
1361 }
1362
1363 matches!(
1364 (lock_info.lock_type, requested_type),
1365 (LockType::Shared, LockType::Shared)
1366 )
1367 }
1368}
1369
1370impl Default for DeadlockDetector {
1371 fn default() -> Self {
1372 Self::new()
1373 }
1374}
1375
impl DeadlockDetector {
    /// Creates a detector with an empty wait-for graph and a 1-second
    /// detection interval.
    pub fn new() -> Self {
        Self {
            wait_for_graph: Arc::new(RwLock::new(HashMap::new())),
            detection_interval: Duration::from_secs(1),
        }
    }

    /// Checks whether the given transaction participates in a deadlock.
    ///
    /// NOTE(review): this is currently a no-op stub — it always returns
    /// `Ok(())` and never inspects `wait_for_graph`, so deadlocks are
    /// never actually reported. Cycle detection is not implemented yet.
    pub fn check_deadlock(&self, _transaction_id: TransactionId) -> Result<()> {
        Ok(())
    }
}
1390
1391impl StreamingEngine {
1392 pub fn new(config: StreamingConfig) -> Self {
1393 Self {
1394 config,
1395 stream_buffer: Arc::new(RwLock::new(VecDeque::new())),
1396 processor_thread: None,
1397 backpressure_controller: Arc::new(BackpressureController::new()),
1398 stream_metrics: Arc::new(StreamingMetrics::default()),
1399 }
1400 }
1401
1402 pub fn submit_operation(&self, operation: StreamingOperation) -> Result<()> {
1403 if self.backpressure_controller.should_apply_backpressure() {
1405 return Err(anyhow!("Backpressure applied, operation rejected"));
1406 }
1407
1408 let mut buffer = self.stream_buffer.write();
1409 buffer.push_back(operation);
1410
1411 self.stream_metrics
1412 .operations_pending
1413 .fetch_add(1, Ordering::Relaxed);
1414 Ok(())
1415 }
1416
1417 pub fn get_metrics(&self) -> &StreamingMetrics {
1418 &self.stream_metrics
1419 }
1420}
1421
1422impl Default for BackpressureController {
1423 fn default() -> Self {
1424 Self::new()
1425 }
1426}
1427
1428impl BackpressureController {
1429 pub fn new() -> Self {
1430 Self {
1431 current_load: Arc::new(RwLock::new(0.0)),
1432 max_load_threshold: 0.8,
1433 adaptive_batching: true,
1434 load_shedding: true,
1435 }
1436 }
1437
1438 pub fn should_apply_backpressure(&self) -> bool {
1439 let load = *self.current_load.read();
1440 load > self.max_load_threshold
1441 }
1442}
1443
1444impl CacheManager {
1445 pub fn new(config: StoreCacheConfig) -> Self {
1446 Self {
1447 vector_cache: Arc::new(RwLock::new(HashMap::new())),
1448 query_cache: Arc::new(RwLock::new(HashMap::new())),
1449 config,
1450 cache_stats: Arc::new(CacheStats::default()),
1451 eviction_policy: EvictionPolicy::LRU,
1452 }
1453 }
1454
1455 pub fn get_vector(&self, uri: &str) -> Option<CachedVector> {
1456 let cache = self.vector_cache.read();
1457 if let Some(cached) = cache.get(uri) {
1458 self.cache_stats
1459 .vector_cache_hits
1460 .fetch_add(1, Ordering::Relaxed);
1461 Some(cached.clone())
1462 } else {
1463 self.cache_stats
1464 .vector_cache_misses
1465 .fetch_add(1, Ordering::Relaxed);
1466 None
1467 }
1468 }
1469
1470 pub fn cache_vector(&self, uri: String, vector: Vector) {
1471 let cached_vector = CachedVector {
1472 vector,
1473 last_accessed: SystemTime::now(),
1474 access_count: 1,
1475 compression_ratio: 1.0,
1476 cache_level: CacheLevel::Memory,
1477 };
1478
1479 let mut cache = self.vector_cache.write();
1480 cache.insert(uri, cached_vector);
1481 }
1482
1483 pub fn get_cached_query(&self, query_hash: &u64) -> Option<CachedQueryResult> {
1484 let cache = self.query_cache.read();
1485 if let Some(cached) = cache.get(&query_hash.to_string()) {
1486 self.cache_stats
1487 .query_cache_hits
1488 .fetch_add(1, Ordering::Relaxed);
1489 Some(cached.clone())
1490 } else {
1491 self.cache_stats
1492 .query_cache_misses
1493 .fetch_add(1, Ordering::Relaxed);
1494 None
1495 }
1496 }
1497
1498 pub fn cache_query_result(&self, query_hash: u64, results: Vec<(String, f32)>) {
1499 let cached_result = CachedQueryResult {
1500 results,
1501 query_hash,
1502 last_accessed: SystemTime::now(),
1503 ttl: self.config.cache_ttl,
1504 hit_count: 0,
1505 };
1506
1507 let mut cache = self.query_cache.write();
1508 cache.insert(query_hash.to_string(), cached_result);
1509 }
1510
1511 pub fn get_stats(&self) -> &CacheStats {
1512 &self.cache_stats
1513 }
1514}
1515
1516impl ReplicationManager {
1517 pub fn new(config: ReplicationConfig) -> Self {
1518 Self {
1519 config,
1520 replicas: Arc::new(RwLock::new(Vec::new())),
1521 replication_log: Arc::new(RwLock::new(VecDeque::new())),
1522 consensus_algorithm: ConsensusAlgorithm::SimpleMajority,
1523 health_checker: Arc::new(HealthChecker::new()),
1524 }
1525 }
1526}
1527
1528impl Default for HealthChecker {
1529 fn default() -> Self {
1530 Self::new()
1531 }
1532}
1533
1534impl HealthChecker {
1535 pub fn new() -> Self {
1536 Self {
1537 check_interval: Duration::from_secs(30),
1538 timeout: Duration::from_secs(5),
1539 failure_threshold: 3,
1540 }
1541 }
1542}
1543
1544impl ConsistencyManager {
1545 pub fn new(consistency_level: ConsistencyLevel) -> Self {
1546 Self {
1547 consistency_level,
1548 vector_clocks: Arc::new(RwLock::new(HashMap::new())),
1549 conflict_resolver: Arc::new(ConflictResolver::new()),
1550 causal_order_tracker: Arc::new(CausalOrderTracker::new()),
1551 }
1552 }
1553}
1554
1555impl Default for ConflictResolver {
1556 fn default() -> Self {
1557 Self::new()
1558 }
1559}
1560
1561impl ConflictResolver {
1562 pub fn new() -> Self {
1563 Self {
1564 strategy: ConflictResolution::LastWriteWins,
1565 custom_resolvers: HashMap::new(),
1566 }
1567 }
1568}
1569
1570impl Default for CausalOrderTracker {
1571 fn default() -> Self {
1572 Self::new()
1573 }
1574}
1575
1576impl CausalOrderTracker {
1577 pub fn new() -> Self {
1578 Self {
1579 happens_before: Arc::new(RwLock::new(HashMap::new())),
1580 }
1581 }
1582}
1583
1584impl ChangeLog {
1585 pub fn new(max_entries: usize) -> Self {
1586 Self {
1587 entries: Arc::new(RwLock::new(VecDeque::new())),
1588 max_entries,
1589 subscribers: Arc::new(RwLock::new(Vec::new())),
1590 }
1591 }
1592
1593 pub fn add_entry(&self, entry: ChangeLogEntry) {
1594 let mut entries = self.entries.write();
1595 entries.push_back(entry.clone());
1596
1597 if entries.len() > self.max_entries {
1599 entries.pop_front();
1600 }
1601
1602 let subscribers = self.subscribers.read();
1604 for subscriber in subscribers.iter() {
1605 let _ = subscriber.on_change(&entry);
1606 }
1607 }
1608
1609 pub fn add_subscriber(&self, subscriber: Arc<dyn ChangeSubscriber>) {
1610 let mut subscribers = self.subscribers.write();
1611 subscribers.push(subscriber);
1612 }
1613}
1614
#[cfg(test)]
mod tests {
    use super::*;

    // Default integration config should enable real-time sync and
    // incremental updates with a batch size of 1000.
    #[test]
    fn test_store_integration_config() {
        let config = StoreIntegrationConfig::default();
        assert!(config.real_time_sync);
        assert_eq!(config.batch_size, 1000);
        assert!(config.incremental_updates);
    }

    // A transaction can be begun (yielding a positive id) and committed.
    #[test]
    fn test_transaction_lifecycle() {
        let config = StoreIntegrationConfig::default();
        let tm = TransactionManager::new(config);

        let tx_id = tm.begin_transaction(IsolationLevel::ReadCommitted).unwrap();
        assert!(tx_id > 0);

        let result = tm.commit_transaction(tx_id);
        assert!(result.is_ok());
    }

    // Round-trip: a vector cached under a URI is returned intact on lookup.
    #[test]
    fn test_cache_manager() {
        let config = StoreCacheConfig {
            enable_vector_cache: true,
            enable_query_cache: true,
            cache_size_mb: 128,
            cache_ttl: Duration::from_secs(300),
            enable_compression: false,
        };

        let cache_manager = CacheManager::new(config);
        let vector = Vector::new(vec![1.0, 2.0, 3.0]);

        cache_manager.cache_vector("test_uri".to_string(), vector.clone());
        let cached = cache_manager.get_vector("test_uri");

        assert!(cached.is_some());
        assert_eq!(cached.unwrap().vector, vector);
    }

    // With no load, a single insert should be accepted (no backpressure).
    #[test]
    fn test_streaming_engine() {
        let config = StreamingConfig {
            enable_streaming: true,
            buffer_size: 1000,
            flush_interval: Duration::from_millis(100),
            enable_backpressure: true,
            max_lag: Duration::from_secs(1),
        };

        let streaming_engine = StreamingEngine::new(config);
        let operation = StreamingOperation::VectorInsert {
            uri: "test_uri".to_string(),
            vector: Vector::new(vec![1.0, 2.0, 3.0]),
            priority: Priority::Normal,
        };

        let result = streaming_engine.submit_operation(operation);
        assert!(result.is_ok());
    }

    // End-to-end happy path: begin → transactional insert → commit.
    #[test]
    fn test_integrated_vector_store() {
        let config = StoreIntegrationConfig::default();
        let store = IntegratedVectorStore::new(config, EmbeddingStrategy::TfIdf).unwrap();

        let tx_id = store
            .begin_transaction(IsolationLevel::ReadCommitted)
            .unwrap();
        assert!(tx_id > 0);

        let vector = Vector::new(vec![1.0, 2.0, 3.0]);
        let result = store.transactional_insert(tx_id, "test_uri".to_string(), vector, None);
        assert!(result.is_ok());

        let commit_result = store.commit_transaction(tx_id);
        assert!(commit_result.is_ok());
    }
}