use crate::{
    embeddings::{EmbeddableContent, EmbeddingStrategy},
    rdf_integration::{RdfVectorConfig, RdfVectorIntegration},
    sparql_integration::SparqlVectorService,
    Vector, VectorId, VectorStore, VectorStoreTrait,
};
use anyhow::{anyhow, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::{Duration, Instant, SystemTime};

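/// Configuration for integrating the vector store with RDF/SPARQL services,
/// transactions, caching, streaming, and replication.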
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreIntegrationConfig {
    /// Synchronize vector updates with the store in real time.
    pub real_time_sync: bool,
    /// Maximum number of items per batch operation.
    pub batch_size: usize,
    /// Timeout applied to each transaction.
    pub transaction_timeout: Duration,
    /// Apply updates incrementally instead of rebuilding indexes.
    pub incremental_updates: bool,
    /// Consistency level enforced on reads.
    pub consistency_level: ConsistencyLevel,
    /// Strategy used to resolve concurrent write conflicts.
    pub conflict_resolution: ConflictResolution,
    /// Enable multi-tenant isolation.
    pub multi_tenant: bool,
    /// Vector and query cache settings.
    pub cache_config: StoreCacheConfig,
    /// Streaming ingestion settings.
    pub streaming_config: StreamingConfig,
    /// Replication settings.
    pub replication_config: ReplicationConfig,
}

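/// Supported read-consistency levels.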
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// Reads may observe stale data; replicas converge over time.
    Eventual,
    /// Reads observe this session's own writes.
    Session,
    /// Reads observe all committed writes.
    Strong,
    /// Reads respect causal (happens-before) ordering.
    Causal,
}

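/// Strategies for resolving conflicting concurrent writes.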
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    LastWriteWins,
    FirstWriteWins,
    Merge,
    Custom(String),
    Manual,
}

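/// Cache sizing, TTL, and compression settings.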
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreCacheConfig {
    pub enable_vector_cache: bool,
    pub enable_query_cache: bool,
    pub cache_size_mb: usize,
    pub cache_ttl: Duration,
    pub enable_compression: bool,
}

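/// Buffering and backpressure settings for streaming ingestion.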
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    pub enable_streaming: bool,
    pub buffer_size: usize,
    pub flush_interval: Duration,
    pub enable_backpressure: bool,
    pub max_lag: Duration,
}

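/// Replication factor, mode, and replica endpoints.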
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    pub enable_replication: bool,
    pub replication_factor: usize,
    pub synchronous: bool,
    pub replica_endpoints: Vec<String>,
}

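/// A [`VectorStore`] wired together with RDF/SPARQL integration, transactions,
/// streaming ingestion, caching, replication, and consistency tracking.
///
/// A minimal usage sketch (not compiled here; assumes the same types and
/// methods exercised by this module's tests):
///
/// ```ignore
/// let store = IntegratedVectorStore::new(
///     StoreIntegrationConfig::default(),
///     EmbeddingStrategy::TfIdf,
/// )?;
/// let tx = store.begin_transaction(IsolationLevel::ReadCommitted)?;
/// store.transactional_insert(
///     tx,
///     "urn:example:doc1".to_string(),
///     Vector::new(vec![0.1, 0.2, 0.3]),
///     None,
/// )?;
/// store.commit_transaction(tx)?;
/// ```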
pub struct IntegratedVectorStore {
    config: StoreIntegrationConfig,
    vector_store: Arc<RwLock<VectorStore>>,
    rdf_integration: Arc<RwLock<RdfVectorIntegration>>,
    sparql_service: Arc<RwLock<SparqlVectorService>>,
    transaction_manager: Arc<TransactionManager>,
    streaming_engine: Arc<StreamingEngine>,
    cache_manager: Arc<CacheManager>,
    replication_manager: Arc<ReplicationManager>,
    consistency_manager: Arc<ConsistencyManager>,
    change_log: Arc<ChangeLog>,
    metrics: Arc<StoreMetrics>,
}

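/// Coordinates transaction lifecycles: begin/commit/abort, write-ahead
/// logging, and per-resource locking.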
pub struct TransactionManager {
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    transaction_counter: AtomicU64,
    config: StoreIntegrationConfig,
    write_ahead_log: Arc<WriteAheadLog>,
    lock_manager: Arc<LockManager>,
}

#[derive(Debug, Clone)]
pub struct Transaction {
    pub id: TransactionId,
    pub start_time: SystemTime,
    pub timeout: Duration,
    pub operations: Vec<TransactionOperation>,
    pub status: TransactionStatus,
    pub isolation_level: IsolationLevel,
    pub read_set: HashSet<String>,
    pub write_set: HashSet<String>,
}

pub type TransactionId = u64;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TransactionStatus {
    Active,
    Committed,
    Aborted,
    Preparing,
    Prepared,
}

#[derive(Debug, Clone, Copy)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

#[derive(Debug, Clone)]
pub enum TransactionOperation {
    Insert {
        uri: String,
        vector: Vector,
        embedding_content: Option<EmbeddableContent>,
    },
    Update {
        uri: String,
        vector: Vector,
        old_vector: Option<Vector>,
    },
    Delete {
        uri: String,
        vector: Option<Vector>,
    },
    BatchInsert {
        items: Vec<(String, Vector)>,
    },
    IndexRebuild {
        algorithm: String,
        parameters: HashMap<String, String>,
    },
}

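/// Append-only log of serialized operations used for durability, with
/// periodic checkpointing.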
pub struct WriteAheadLog {
    log_entries: Arc<RwLock<VecDeque<LogEntry>>>,
    log_file: Option<String>,
    checkpoint_interval: Duration,
    last_checkpoint: Arc<RwLock<SystemTime>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub lsn: u64,
    pub transaction_id: TransactionId,
    pub operation: SerializableOperation,
    pub timestamp: SystemTime,
    pub checksum: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableOperation {
    Insert {
        uri: String,
        vector_data: Vec<f32>,
    },
    Update {
        uri: String,
        new_vector: Vec<f32>,
        old_vector: Option<Vec<f32>>,
    },
    Delete {
        uri: String,
    },
    Commit {
        transaction_id: TransactionId,
    },
    Abort {
        transaction_id: TransactionId,
    },
}

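/// Grants shared and exclusive locks per resource and consults a deadlock
/// detector for waiting transactions.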
pub struct LockManager {
    locks: Arc<RwLock<HashMap<String, LockInfo>>>,
    deadlock_detector: Arc<DeadlockDetector>,
}

#[derive(Debug, Clone)]
pub struct LockInfo {
    pub lock_type: LockType,
    pub holders: HashSet<TransactionId>,
    pub waiters: VecDeque<(TransactionId, LockType)>,
    pub granted_time: SystemTime,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LockType {
    Shared,
    Exclusive,
    IntentionShared,
    IntentionExclusive,
    SharedIntentionExclusive,
}

pub struct DeadlockDetector {
    wait_for_graph: Arc<RwLock<HashMap<TransactionId, HashSet<TransactionId>>>>,
    detection_interval: Duration,
}

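/// Buffers streaming vector operations and applies backpressure when the
/// engine is overloaded.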
pub struct StreamingEngine {
    config: StreamingConfig,
    stream_buffer: Arc<RwLock<VecDeque<StreamingOperation>>>,
    processor_thread: Option<std::thread::JoinHandle<()>>,
    backpressure_controller: Arc<BackpressureController>,
    stream_metrics: Arc<StreamingMetrics>,
}

#[derive(Debug, Clone)]
pub enum StreamingOperation {
    VectorInsert {
        uri: String,
        vector: Vector,
        priority: Priority,
    },
    VectorUpdate {
        uri: String,
        vector: Vector,
        priority: Priority,
    },
    VectorDelete {
        uri: String,
        priority: Priority,
    },
    EmbeddingRequest {
        content: EmbeddableContent,
        uri: String,
        priority: Priority,
    },
    BatchOperation {
        operations: Vec<StreamingOperation>,
    },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

pub struct BackpressureController {
    current_load: Arc<RwLock<f64>>,
    max_load_threshold: f64,
    adaptive_batching: bool,
    load_shedding: bool,
}

#[derive(Debug, Default)]
pub struct StreamingMetrics {
    pub operations_processed: AtomicU64,
    pub operations_pending: AtomicU64,
    pub operations_dropped: AtomicU64,
    pub average_latency_ms: Arc<RwLock<f64>>,
    pub throughput_ops_sec: Arc<RwLock<f64>>,
    pub backpressure_events: AtomicU64,
}

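/// In-memory caches for vectors and query results, with hit/miss statistics.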
pub struct CacheManager {
    vector_cache: Arc<RwLock<HashMap<String, CachedVector>>>,
    query_cache: Arc<RwLock<HashMap<String, CachedQueryResult>>>,
    config: StoreCacheConfig,
    cache_stats: Arc<CacheStats>,
    eviction_policy: EvictionPolicy,
}

#[derive(Debug, Clone)]
pub struct CachedVector {
    pub vector: Vector,
    pub last_accessed: SystemTime,
    pub access_count: u64,
    pub compression_ratio: f32,
    pub cache_level: CacheLevel,
}

#[derive(Debug, Clone, Copy)]
pub enum CacheLevel {
    Memory,
    SSD,
    Disk,
}

#[derive(Debug, Clone)]
pub struct CachedQueryResult {
    pub results: Vec<(String, f32)>,
    pub query_hash: u64,
    pub last_accessed: SystemTime,
    pub ttl: Duration,
    pub hit_count: u64,
}

#[derive(Debug, Default)]
pub struct CacheStats {
    pub vector_cache_hits: AtomicU64,
    pub vector_cache_misses: AtomicU64,
    pub query_cache_hits: AtomicU64,
    pub query_cache_misses: AtomicU64,
    pub evictions: AtomicU64,
    pub memory_usage_bytes: AtomicU64,
}

#[derive(Debug, Clone)]
pub enum EvictionPolicy {
    LRU,
    LFU,
    ARC,
    TTL,
    Custom(String),
}

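/// Tracks replica state and a replication log for shipping committed
/// operations to other nodes.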
pub struct ReplicationManager {
    config: ReplicationConfig,
    replicas: Arc<RwLock<Vec<ReplicaInfo>>>,
    replication_log: Arc<RwLock<VecDeque<ReplicationEntry>>>,
    consensus_algorithm: ConsensusAlgorithm,
    health_checker: Arc<HealthChecker>,
}

#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    pub endpoint: String,
    pub status: ReplicaStatus,
    pub last_sync: SystemTime,
    pub lag: Duration,
    pub priority: u8,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ReplicaStatus {
    Active,
    Inactive,
    Synchronizing,
    Failed,
    Maintenance,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationEntry {
    pub sequence_number: u64,
    pub operation: SerializableOperation,
    pub timestamp: SystemTime,
    pub source_node: String,
}

#[derive(Debug, Clone)]
pub enum ConsensusAlgorithm {
    Raft,
    PBFT,
    SimpleMajority,
}

pub struct HealthChecker {
    check_interval: Duration,
    timeout: Duration,
    failure_threshold: u32,
}

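/// Enforces the configured consistency level using vector clocks, conflict
/// resolution, and causal-order tracking.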
pub struct ConsistencyManager {
    consistency_level: ConsistencyLevel,
    vector_clocks: Arc<RwLock<HashMap<String, VectorClock>>>,
    conflict_resolver: Arc<ConflictResolver>,
    causal_order_tracker: Arc<CausalOrderTracker>,
}

#[derive(Debug, Clone, Default)]
pub struct VectorClock {
    pub clocks: HashMap<String, u64>,
}

pub struct ConflictResolver {
    strategy: ConflictResolution,
    custom_resolvers: HashMap<String, Box<dyn ConflictResolverTrait>>,
}

pub trait ConflictResolverTrait: Send + Sync {
    fn resolve_conflict(
        &self,
        local: &Vector,
        remote: &Vector,
        metadata: &ConflictMetadata,
    ) -> Result<Vector>;
}

#[derive(Debug, Clone)]
pub struct ConflictMetadata {
    pub local_timestamp: SystemTime,
    pub remote_timestamp: SystemTime,
    pub local_version: u64,
    pub remote_version: u64,
    pub operation_type: String,
}

pub struct CausalOrderTracker {
    happens_before: Arc<RwLock<HashMap<String, HashSet<String>>>>,
}

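/// Bounded, subscribable log of store changes.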
pub struct ChangeLog {
    entries: Arc<RwLock<VecDeque<ChangeLogEntry>>>,
    max_entries: usize,
    subscribers: Arc<RwLock<Vec<Arc<dyn ChangeSubscriber>>>>,
}

#[derive(Debug, Clone)]
pub struct ChangeLogEntry {
    pub id: u64,
    pub timestamp: SystemTime,
    pub operation: ChangeOperation,
    pub metadata: HashMap<String, String>,
    pub transaction_id: Option<TransactionId>,
}

#[derive(Debug, Clone)]
pub enum ChangeOperation {
    VectorInserted {
        uri: String,
        vector: Vector,
    },
    VectorUpdated {
        uri: String,
        old_vector: Vector,
        new_vector: Vector,
    },
    VectorDeleted {
        uri: String,
        vector: Vector,
    },
    IndexRebuilt {
        algorithm: String,
    },
    ConfigurationChanged {
        changes: HashMap<String, String>,
    },
}

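/// Observer interface for change-log notifications.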
pub trait ChangeSubscriber: Send + Sync {
    fn on_change(&self, entry: &ChangeLogEntry) -> Result<()>;
    fn subscriber_id(&self) -> String;
    fn interest_patterns(&self) -> Vec<String>;
}

#[derive(Debug, Default)]
pub struct StoreMetrics {
    pub total_vectors: AtomicU64,
    pub total_operations: AtomicU64,
    pub successful_operations: AtomicU64,
    pub failed_operations: AtomicU64,
    pub average_operation_time_ms: Arc<RwLock<f64>>,
    pub active_transactions: AtomicU64,
    pub committed_transactions: AtomicU64,
    pub aborted_transactions: AtomicU64,
    pub replication_lag_ms: Arc<RwLock<f64>>,
    pub consistency_violations: AtomicU64,
}

impl Default for StoreIntegrationConfig {
    fn default() -> Self {
        Self {
            real_time_sync: true,
            batch_size: 1000,
            transaction_timeout: Duration::from_secs(30),
            incremental_updates: true,
            consistency_level: ConsistencyLevel::Session,
            conflict_resolution: ConflictResolution::LastWriteWins,
            multi_tenant: false,
            cache_config: StoreCacheConfig {
                enable_vector_cache: true,
                enable_query_cache: true,
                cache_size_mb: 512,
                cache_ttl: Duration::from_secs(3600),
                enable_compression: true,
            },
            streaming_config: StreamingConfig {
                enable_streaming: true,
                buffer_size: 10000,
                flush_interval: Duration::from_millis(100),
                enable_backpressure: true,
                max_lag: Duration::from_secs(5),
            },
            replication_config: ReplicationConfig {
                enable_replication: false,
                replication_factor: 3,
                synchronous: false,
                replica_endpoints: Vec::new(),
            },
        }
    }
}

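/// Adapter that exposes a shared [`VectorStore`] through the object-safe
/// [`VectorStoreTrait`] expected by the RDF integration layer.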
pub struct VectorStoreWrapper {
    store: Arc<parking_lot::RwLock<VectorStore>>,
}

impl VectorStoreTrait for VectorStoreWrapper {
    fn insert_vector(&mut self, id: VectorId, vector: Vector) -> Result<()> {
        let mut store = self.store.write();
        store.insert_vector(id, vector)
    }

    fn add_vector(&mut self, vector: Vector) -> Result<VectorId> {
        let mut store = self.store.write();
        store.add_vector(vector)
    }

    fn get_vector(&self, id: &VectorId) -> Result<Option<Vector>> {
        let store = self.store.read();
        let result = store.get_vector(id);
        Ok(result.cloned())
    }

    fn get_all_vector_ids(&self) -> Result<Vec<VectorId>> {
        let store = self.store.read();
        store.get_all_vector_ids()
    }

    fn search_similar(&self, query: &Vector, k: usize) -> Result<Vec<(VectorId, f32)>> {
        let store = self.store.read();
        store.search_similar(query, k)
    }

    fn remove_vector(&mut self, id: &VectorId) -> Result<bool> {
        let mut store = self.store.write();
        store.remove_vector(id)?;
        Ok(true)
    }

    fn len(&self) -> usize {
        // Report the actual number of stored vectors instead of a stub zero.
        let store = self.store.read();
        store.get_all_vector_ids().map(|ids| ids.len()).unwrap_or(0)
    }
}

impl IntegratedVectorStore {
    pub fn new(
        config: StoreIntegrationConfig,
        embedding_strategy: EmbeddingStrategy,
    ) -> Result<Self> {
        let vector_store = Arc::new(RwLock::new(
            VectorStore::with_embedding_strategy(embedding_strategy.clone())?.with_config(
                crate::VectorStoreConfig {
                    auto_embed: true,
                    cache_embeddings: config.cache_config.enable_vector_cache,
                    similarity_threshold: 0.7,
                    max_results: 1000,
                },
            ),
        ));

        let rdf_config = RdfVectorConfig::default();
        let vector_store_wrapper = VectorStoreWrapper {
            store: vector_store.clone(),
        };
        let vector_store_trait: Arc<std::sync::RwLock<dyn VectorStoreTrait>> =
            Arc::new(std::sync::RwLock::new(vector_store_wrapper));
        let rdf_integration = Arc::new(RwLock::new(RdfVectorIntegration::new(
            rdf_config,
            vector_store_trait,
        )));

        let sparql_config = crate::sparql_integration::VectorServiceConfig::default();
        let sparql_service = Arc::new(RwLock::new(SparqlVectorService::new(
            sparql_config,
            embedding_strategy,
        )?));

        let transaction_manager = Arc::new(TransactionManager::new(config.clone()));
        let streaming_engine = Arc::new(StreamingEngine::new(config.streaming_config.clone()));
        let cache_manager = Arc::new(CacheManager::new(config.cache_config.clone()));
        let replication_manager =
            Arc::new(ReplicationManager::new(config.replication_config.clone()));
        let consistency_manager = Arc::new(ConsistencyManager::new(config.consistency_level));
        let change_log = Arc::new(ChangeLog::new(10000));
        let metrics = Arc::new(StoreMetrics::default());

        Ok(Self {
            config,
            vector_store,
            rdf_integration,
            sparql_service,
            transaction_manager,
            streaming_engine,
            cache_manager,
            replication_manager,
            consistency_manager,
            change_log,
            metrics,
        })
    }

    /// Begin a new transaction with the requested isolation level.
    pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
        let transaction_id = self
            .transaction_manager
            .begin_transaction(isolation_level)?;
        self.metrics
            .active_transactions
            .fetch_add(1, Ordering::Relaxed);

        tracing::debug!("Started transaction {}", transaction_id);
        Ok(transaction_id)
    }

    /// Commit a transaction, applying its buffered operations.
    pub fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
        self.transaction_manager
            .commit_transaction(transaction_id)?;
        self.metrics
            .active_transactions
            .fetch_sub(1, Ordering::Relaxed);
        self.metrics
            .committed_transactions
            .fetch_add(1, Ordering::Relaxed);

        tracing::debug!("Committed transaction {}", transaction_id);
        Ok(())
    }

    /// Abort a transaction, discarding its buffered operations.
    pub fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
        self.transaction_manager.abort_transaction(transaction_id)?;
        self.metrics
            .active_transactions
            .fetch_sub(1, Ordering::Relaxed);
        self.metrics
            .aborted_transactions
            .fetch_add(1, Ordering::Relaxed);

        tracing::debug!("Aborted transaction {}", transaction_id);
        Ok(())
    }

    /// Buffer an insert inside the given transaction and record it in the
    /// change log.
    pub fn transactional_insert(
        &self,
        transaction_id: TransactionId,
        uri: String,
        vector: Vector,
        embedding_content: Option<EmbeddableContent>,
    ) -> Result<()> {
        // Skip the insert if an identical vector is already cached.
        if let Some(cached) = self.cache_manager.get_vector(&uri) {
            if cached.vector == vector {
                return Ok(());
            }
        }

        let operation = TransactionOperation::Insert {
            uri: uri.clone(),
            vector: vector.clone(),
            embedding_content,
        };

        self.transaction_manager
            .add_operation(transaction_id, operation)?;

        self.cache_manager.cache_vector(uri.clone(), vector.clone());

        let change_entry = ChangeLogEntry {
            id: self.generate_change_id(),
            timestamp: SystemTime::now(),
            operation: ChangeOperation::VectorInserted { uri, vector },
            metadata: HashMap::new(),
            transaction_id: Some(transaction_id),
        };
        self.change_log.add_entry(change_entry);

        self.metrics
            .total_operations
            .fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Submit an insert to the streaming engine for asynchronous processing.
    pub fn stream_insert(&self, uri: String, vector: Vector, priority: Priority) -> Result<()> {
        let operation = StreamingOperation::VectorInsert {
            uri,
            vector,
            priority,
        };
        self.streaming_engine.submit_operation(operation)?;
        Ok(())
    }

    /// Insert many vectors in configured-size batches within one transaction.
    pub fn batch_insert(
        &self,
        items: Vec<(String, Vector)>,
        auto_commit: bool,
    ) -> Result<TransactionId> {
        let transaction_id = self.begin_transaction(IsolationLevel::ReadCommitted)?;

        for batch in items.chunks(self.config.batch_size) {
            let batch_operation = TransactionOperation::BatchInsert {
                items: batch.to_vec(),
            };
            self.transaction_manager
                .add_operation(transaction_id, batch_operation)?;
        }

        if auto_commit {
            self.commit_transaction(transaction_id)?;
        }

        Ok(transaction_id)
    }

    /// Run a similarity search under the requested (or configured default)
    /// consistency level, using the query cache when possible.
    pub fn consistent_search(
        &self,
        query: &Vector,
        k: usize,
        consistency_level: Option<ConsistencyLevel>,
    ) -> Result<Vec<(String, f32)>> {
        let effective_consistency = consistency_level.unwrap_or(self.config.consistency_level);

        let query_hash = self.compute_query_hash(query, k);
        if let Some(cached_result) = self.cache_manager.get_cached_query(&query_hash) {
            return Ok(cached_result.results);
        }

        match effective_consistency {
            ConsistencyLevel::Strong => {
                self.wait_for_consistency()?;
            }
            ConsistencyLevel::Session => {
                self.ensure_session_consistency()?;
            }
            ConsistencyLevel::Causal => {
                self.ensure_causal_consistency()?;
            }
            ConsistencyLevel::Eventual => {
                // No synchronization required for eventual consistency.
            }
        }

        let store = self.vector_store.read();
        let results = store.similarity_search_vector(query, k)?;

        self.cache_manager
            .cache_query_result(query_hash, results.clone());

        Ok(results)
    }

    /// Find terms similar to an RDF term, optionally scoped to a named graph.
    pub fn rdf_vector_search(
        &self,
        rdf_term: &str,
        k: usize,
        graph_context: Option<&str>,
    ) -> Result<Vec<(String, f32)>> {
        let rdf_integration = self.rdf_integration.read();

        let term = oxirs_core::model::Term::NamedNode(
            oxirs_core::model::NamedNode::new(rdf_term)
                .map_err(|e| anyhow!("Invalid IRI: {}", e))?,
        );

        let graph_name = graph_context
            .map(|ctx| -> Result<oxirs_core::model::GraphName> {
                Ok(oxirs_core::model::GraphName::NamedNode(
                    oxirs_core::model::NamedNode::new(ctx)
                        .map_err(|e| anyhow!("Invalid graph IRI: {}", e))?,
                ))
            })
            .transpose()?;

        let results = rdf_integration.find_similar_terms(&term, k, None, graph_name.as_ref())?;

        let converted_results = results
            .into_iter()
            .map(|result| (result.term.to_string(), result.score))
            .collect();

        Ok(converted_results)
    }

    /// Placeholder for SPARQL-integrated vector search; currently echoes the
    /// supplied bindings without consulting the SPARQL service.
    pub fn sparql_vector_search(
        &self,
        _query: &str,
        bindings: &HashMap<String, String>,
    ) -> Result<Vec<HashMap<String, String>>> {
        let _sparql_service = self.sparql_service.read();
        Ok(vec![bindings.clone()])
    }

    /// Register a subscriber that is notified of every change-log entry.
    pub fn subscribe_to_changes(&self, subscriber: Arc<dyn ChangeSubscriber>) -> Result<()> {
        self.change_log.add_subscriber(subscriber);
        Ok(())
    }

    /// Take a point-in-time snapshot of the store metrics.
    pub fn get_metrics(&self) -> StoreMetrics {
        StoreMetrics {
            total_vectors: AtomicU64::new(self.metrics.total_vectors.load(Ordering::Relaxed)),
            total_operations: AtomicU64::new(
                self.metrics.total_operations.load(Ordering::Relaxed),
            ),
            successful_operations: AtomicU64::new(
                self.metrics.successful_operations.load(Ordering::Relaxed),
            ),
            failed_operations: AtomicU64::new(
                self.metrics.failed_operations.load(Ordering::Relaxed),
            ),
            average_operation_time_ms: Arc::new(RwLock::new(
                *self.metrics.average_operation_time_ms.read(),
            )),
            active_transactions: AtomicU64::new(
                self.metrics.active_transactions.load(Ordering::Relaxed),
            ),
            committed_transactions: AtomicU64::new(
                self.metrics.committed_transactions.load(Ordering::Relaxed),
            ),
            aborted_transactions: AtomicU64::new(
                self.metrics.aborted_transactions.load(Ordering::Relaxed),
            ),
            replication_lag_ms: Arc::new(RwLock::new(*self.metrics.replication_lag_ms.read())),
            consistency_violations: AtomicU64::new(
                self.metrics.consistency_violations.load(Ordering::Relaxed),
            ),
        }
    }

    /// Report overall store health based on transaction, streaming, cache,
    /// and replication indicators.
    pub fn health_check(&self) -> Result<HealthStatus> {
        let mut issues = Vec::new();

        if self.metrics.active_transactions.load(Ordering::Relaxed) > 1000 {
            issues.push("High number of active transactions".to_string());
        }

        let streaming_metrics = self.streaming_engine.get_metrics();
        if streaming_metrics.operations_pending.load(Ordering::Relaxed) > 10000 {
            issues.push("High number of pending streaming operations".to_string());
        }

        // Guard against division by zero before any cache accesses occur.
        let cache_stats = self.cache_manager.get_stats();
        let hits = cache_stats.vector_cache_hits.load(Ordering::Relaxed);
        let misses = cache_stats.vector_cache_misses.load(Ordering::Relaxed);
        let total_accesses = hits + misses;
        if total_accesses > 0 {
            let hit_ratio = hits as f64 / total_accesses as f64;
            if hit_ratio < 0.8 {
                issues.push("Low cache hit ratio".to_string());
            }
        }

        if self.config.replication_config.enable_replication {
            let replication_lag = *self.metrics.replication_lag_ms.read();
            if replication_lag > 1000.0 {
                issues.push("High replication lag".to_string());
            }
        }

        let status = if issues.is_empty() {
            HealthStatus::Healthy
        } else if issues.len() <= 2 {
            HealthStatus::Warning(issues)
        } else {
            HealthStatus::Critical(issues)
        };

        Ok(status)
    }

    fn generate_change_id(&self) -> u64 {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        COUNTER.fetch_add(1, Ordering::Relaxed)
    }

    fn compute_query_hash(&self, query: &Vector, k: usize) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        for value in query.as_f32() {
            value.to_bits().hash(&mut hasher);
        }
        k.hash(&mut hasher);
        hasher.finish()
    }

    fn wait_for_consistency(&self) -> Result<()> {
        let start = Instant::now();
        let timeout = Duration::from_secs(30);

        while self.metrics.active_transactions.load(Ordering::Relaxed) > 0 {
            if start.elapsed() > timeout {
                return Err(anyhow!("Timeout waiting for consistency"));
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        Ok(())
    }

    fn ensure_session_consistency(&self) -> Result<()> {
        // Placeholder: session consistency tracking is not yet implemented.
        Ok(())
    }

    fn ensure_causal_consistency(&self) -> Result<()> {
        // Placeholder: causal ordering checks are not yet implemented.
        Ok(())
    }
}

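/// Coarse health classification returned by
/// [`IntegratedVectorStore::health_check`].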
#[derive(Debug, Clone)]
pub enum HealthStatus {
    Healthy,
    Warning(Vec<String>),
    Critical(Vec<String>),
}

impl TransactionManager {
    pub fn new(config: StoreIntegrationConfig) -> Self {
        Self {
            active_transactions: Arc::new(RwLock::new(HashMap::new())),
            transaction_counter: AtomicU64::new(1),
            config,
            write_ahead_log: Arc::new(WriteAheadLog::new()),
            lock_manager: Arc::new(LockManager::new()),
        }
    }

    pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
        let transaction_id = self.transaction_counter.fetch_add(1, Ordering::Relaxed);
        let transaction = Transaction {
            id: transaction_id,
            start_time: SystemTime::now(),
            timeout: self.config.transaction_timeout,
            operations: Vec::new(),
            status: TransactionStatus::Active,
            isolation_level,
            read_set: HashSet::new(),
            write_set: HashSet::new(),
        };

        let mut active_txns = self.active_transactions.write();
        active_txns.insert(transaction_id, transaction);

        Ok(transaction_id)
    }

    pub fn add_operation(
        &self,
        transaction_id: TransactionId,
        operation: TransactionOperation,
    ) -> Result<()> {
        let mut active_txns = self.active_transactions.write();
        let transaction = active_txns
            .get_mut(&transaction_id)
            .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;

        if transaction.status != TransactionStatus::Active {
            return Err(anyhow!("Transaction is not active"));
        }

        // Abort the transaction if it has exceeded its timeout. A clock error
        // is treated as zero elapsed time rather than panicking.
        if transaction.start_time.elapsed().unwrap_or_default() > transaction.timeout {
            transaction.status = TransactionStatus::Aborted;
            return Err(anyhow!("Transaction timeout"));
        }

        self.acquire_locks_for_operation(transaction_id, &operation)?;

        let serializable_op = self.convert_to_serializable(&operation);
        self.write_ahead_log
            .append(transaction_id, serializable_op)?;

        transaction.operations.push(operation);
        Ok(())
    }

    pub fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
        let mut active_txns = self.active_transactions.write();
        let transaction = active_txns
            .remove(&transaction_id)
            .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;

        if transaction.status != TransactionStatus::Active {
            return Err(anyhow!("Transaction is not active"));
        }

        self.validate_transaction(&transaction)?;

        self.write_ahead_log.append(
            transaction_id,
            SerializableOperation::Commit { transaction_id },
        )?;

        for operation in &transaction.operations {
            self.execute_operation(operation)?;
        }

        self.lock_manager.release_transaction_locks(transaction_id);

        tracing::debug!("Transaction {} committed successfully", transaction_id);
        Ok(())
    }

    pub fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
        let mut active_txns = self.active_transactions.write();
        let _transaction = active_txns
            .remove(&transaction_id)
            .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;

        self.write_ahead_log.append(
            transaction_id,
            SerializableOperation::Abort { transaction_id },
        )?;

        self.lock_manager.release_transaction_locks(transaction_id);

        tracing::debug!("Transaction {} aborted", transaction_id);
        Ok(())
    }

    fn acquire_locks_for_operation(
        &self,
        transaction_id: TransactionId,
        operation: &TransactionOperation,
    ) -> Result<()> {
        match operation {
            TransactionOperation::Insert { uri, .. }
            | TransactionOperation::Update { uri, .. }
            | TransactionOperation::Delete { uri, .. } => {
                self.lock_manager
                    .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
            }
            TransactionOperation::BatchInsert { items } => {
                for (uri, _) in items {
                    self.lock_manager
                        .acquire_lock(transaction_id, uri, LockType::Exclusive)?;
                }
            }
            TransactionOperation::IndexRebuild { .. } => {
                // Index rebuilds take an exclusive lock on the whole store.
                self.lock_manager
                    .acquire_lock(transaction_id, "_global_", LockType::Exclusive)?;
            }
        }
        Ok(())
    }

    fn validate_transaction(&self, _transaction: &Transaction) -> Result<()> {
        // Placeholder: conflict validation is not yet implemented.
        Ok(())
    }

    fn execute_operation(&self, _operation: &TransactionOperation) -> Result<()> {
        // Placeholder: operations are applied by the integrated store.
        Ok(())
    }

    fn convert_to_serializable(&self, operation: &TransactionOperation) -> SerializableOperation {
        match operation {
            TransactionOperation::Insert { uri, vector, .. } => SerializableOperation::Insert {
                uri: uri.clone(),
                vector_data: vector.as_f32(),
            },
            TransactionOperation::Update {
                uri,
                vector,
                old_vector,
            } => SerializableOperation::Update {
                uri: uri.clone(),
                new_vector: vector.as_f32(),
                old_vector: old_vector.as_ref().map(|v| v.as_f32()),
            },
            TransactionOperation::Delete { uri, .. } => {
                SerializableOperation::Delete { uri: uri.clone() }
            }
            _ => {
                // Batch and index operations have no dedicated serialized
                // form yet; a sentinel entry is logged instead.
                SerializableOperation::Insert {
                    uri: "batch_operation".to_string(),
                    vector_data: vec![0.0],
                }
            }
        }
    }
}

impl Default for WriteAheadLog {
    fn default() -> Self {
        Self::new()
    }
}

impl WriteAheadLog {
    pub fn new() -> Self {
        Self {
            log_entries: Arc::new(RwLock::new(VecDeque::new())),
            log_file: None,
            checkpoint_interval: Duration::from_secs(60),
            last_checkpoint: Arc::new(RwLock::new(SystemTime::now())),
        }
    }

    pub fn append(
        &self,
        transaction_id: TransactionId,
        operation: SerializableOperation,
    ) -> Result<()> {
        let entry = LogEntry {
            lsn: self.generate_lsn(),
            transaction_id,
            operation,
            timestamp: SystemTime::now(),
            checksum: 0, // Checksum computation is not yet implemented.
        };

        let mut log = self.log_entries.write();
        log.push_back(entry);

        if self.should_checkpoint() {
            self.checkpoint()?;
        }

        Ok(())
    }

    fn generate_lsn(&self) -> u64 {
        static LSN_COUNTER: AtomicU64 = AtomicU64::new(0);
        LSN_COUNTER.fetch_add(1, Ordering::Relaxed)
    }

    fn should_checkpoint(&self) -> bool {
        let last_checkpoint = *self.last_checkpoint.read();
        last_checkpoint.elapsed().unwrap_or_default() > self.checkpoint_interval
    }

    fn checkpoint(&self) -> Result<()> {
        let mut last_checkpoint = self.last_checkpoint.write();
        *last_checkpoint = SystemTime::now();
        Ok(())
    }
}

impl Default for LockManager {
    fn default() -> Self {
        Self::new()
    }
}

impl LockManager {
    pub fn new() -> Self {
        Self {
            locks: Arc::new(RwLock::new(HashMap::new())),
            deadlock_detector: Arc::new(DeadlockDetector::new()),
        }
    }

    pub fn acquire_lock(
        &self,
        transaction_id: TransactionId,
        resource: &str,
        lock_type: LockType,
    ) -> Result<()> {
        let mut locks = self.locks.write();
        let lock_info = locks
            .entry(resource.to_string())
            .or_insert_with(|| LockInfo {
                lock_type: LockType::Shared,
                holders: HashSet::new(),
                waiters: VecDeque::new(),
                granted_time: SystemTime::now(),
            });

        if self.can_grant_lock(lock_info, lock_type) {
            lock_info.holders.insert(transaction_id);
            lock_info.lock_type = lock_type;
            lock_info.granted_time = SystemTime::now();
            Ok(())
        } else {
            // Queue the request and check whether waiting would deadlock.
            lock_info.waiters.push_back((transaction_id, lock_type));

            self.deadlock_detector.check_deadlock(transaction_id)?;

            Err(anyhow!("Lock not available, transaction waiting"))
        }
    }

    pub fn release_transaction_locks(&self, transaction_id: TransactionId) {
        let mut locks = self.locks.write();
        let mut to_remove = Vec::new();

        for (resource, lock_info) in locks.iter_mut() {
            lock_info.holders.remove(&transaction_id);
            lock_info.waiters.retain(|(tid, _)| *tid != transaction_id);

            if lock_info.holders.is_empty() {
                to_remove.push(resource.clone());
            }
        }

        for resource in to_remove {
            locks.remove(&resource);
        }
    }

    fn can_grant_lock(&self, lock_info: &LockInfo, requested_type: LockType) -> bool {
        if lock_info.holders.is_empty() {
            return true;
        }

        // Only shared locks are compatible with each other.
        matches!(
            (lock_info.lock_type, requested_type),
            (LockType::Shared, LockType::Shared)
        )
    }
}

impl Default for DeadlockDetector {
    fn default() -> Self {
        Self::new()
    }
}

impl DeadlockDetector {
    pub fn new() -> Self {
        Self {
            wait_for_graph: Arc::new(RwLock::new(HashMap::new())),
            detection_interval: Duration::from_secs(1),
        }
    }

    pub fn check_deadlock(&self, _transaction_id: TransactionId) -> Result<()> {
        // Placeholder: wait-for-graph cycle detection is not yet implemented.
        Ok(())
    }
}

impl StreamingEngine {
    pub fn new(config: StreamingConfig) -> Self {
        Self {
            config,
            stream_buffer: Arc::new(RwLock::new(VecDeque::new())),
            processor_thread: None,
            backpressure_controller: Arc::new(BackpressureController::new()),
            stream_metrics: Arc::new(StreamingMetrics::default()),
        }
    }

    pub fn submit_operation(&self, operation: StreamingOperation) -> Result<()> {
        if self.backpressure_controller.should_apply_backpressure() {
            return Err(anyhow!("Backpressure applied, operation rejected"));
        }

        let mut buffer = self.stream_buffer.write();
        buffer.push_back(operation);

        self.stream_metrics
            .operations_pending
            .fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    pub fn get_metrics(&self) -> &StreamingMetrics {
        &self.stream_metrics
    }
}

impl Default for BackpressureController {
    fn default() -> Self {
        Self::new()
    }
}

impl BackpressureController {
    pub fn new() -> Self {
        Self {
            current_load: Arc::new(RwLock::new(0.0)),
            max_load_threshold: 0.8,
            adaptive_batching: true,
            load_shedding: true,
        }
    }

    pub fn should_apply_backpressure(&self) -> bool {
        let load = *self.current_load.read();
        load > self.max_load_threshold
    }
}

impl CacheManager {
    pub fn new(config: StoreCacheConfig) -> Self {
        Self {
            vector_cache: Arc::new(RwLock::new(HashMap::new())),
            query_cache: Arc::new(RwLock::new(HashMap::new())),
            config,
            cache_stats: Arc::new(CacheStats::default()),
            eviction_policy: EvictionPolicy::LRU,
        }
    }

    pub fn get_vector(&self, uri: &str) -> Option<CachedVector> {
        let cache = self.vector_cache.read();
        if let Some(cached) = cache.get(uri) {
            self.cache_stats
                .vector_cache_hits
                .fetch_add(1, Ordering::Relaxed);
            Some(cached.clone())
        } else {
            self.cache_stats
                .vector_cache_misses
                .fetch_add(1, Ordering::Relaxed);
            None
        }
    }

    pub fn cache_vector(&self, uri: String, vector: Vector) {
        let cached_vector = CachedVector {
            vector,
            last_accessed: SystemTime::now(),
            access_count: 1,
            compression_ratio: 1.0,
            cache_level: CacheLevel::Memory,
        };

        let mut cache = self.vector_cache.write();
        cache.insert(uri, cached_vector);
    }

    pub fn get_cached_query(&self, query_hash: &u64) -> Option<CachedQueryResult> {
        let cache = self.query_cache.read();
        if let Some(cached) = cache.get(&query_hash.to_string()) {
            self.cache_stats
                .query_cache_hits
                .fetch_add(1, Ordering::Relaxed);
            Some(cached.clone())
        } else {
            self.cache_stats
                .query_cache_misses
                .fetch_add(1, Ordering::Relaxed);
            None
        }
    }

    pub fn cache_query_result(&self, query_hash: u64, results: Vec<(String, f32)>) {
        let cached_result = CachedQueryResult {
            results,
            query_hash,
            last_accessed: SystemTime::now(),
            ttl: self.config.cache_ttl,
            hit_count: 0,
        };

        let mut cache = self.query_cache.write();
        cache.insert(query_hash.to_string(), cached_result);
    }

    pub fn get_stats(&self) -> &CacheStats {
        &self.cache_stats
    }
}

impl ReplicationManager {
    pub fn new(config: ReplicationConfig) -> Self {
        Self {
            config,
            replicas: Arc::new(RwLock::new(Vec::new())),
            replication_log: Arc::new(RwLock::new(VecDeque::new())),
            consensus_algorithm: ConsensusAlgorithm::SimpleMajority,
            health_checker: Arc::new(HealthChecker::new()),
        }
    }
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

impl HealthChecker {
    pub fn new() -> Self {
        Self {
            check_interval: Duration::from_secs(30),
            timeout: Duration::from_secs(5),
            failure_threshold: 3,
        }
    }
}

impl ConsistencyManager {
    pub fn new(consistency_level: ConsistencyLevel) -> Self {
        Self {
            consistency_level,
            vector_clocks: Arc::new(RwLock::new(HashMap::new())),
            conflict_resolver: Arc::new(ConflictResolver::new()),
            causal_order_tracker: Arc::new(CausalOrderTracker::new()),
        }
    }
}

impl Default for ConflictResolver {
    fn default() -> Self {
        Self::new()
    }
}

impl ConflictResolver {
    pub fn new() -> Self {
        Self {
            strategy: ConflictResolution::LastWriteWins,
            custom_resolvers: HashMap::new(),
        }
    }
}

impl Default for CausalOrderTracker {
    fn default() -> Self {
        Self::new()
    }
}

impl CausalOrderTracker {
    pub fn new() -> Self {
        Self {
            happens_before: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

impl ChangeLog {
    pub fn new(max_entries: usize) -> Self {
        Self {
            entries: Arc::new(RwLock::new(VecDeque::new())),
            max_entries,
            subscribers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    pub fn add_entry(&self, entry: ChangeLogEntry) {
        let mut entries = self.entries.write();
        entries.push_back(entry.clone());

        // Evict the oldest entry once the log exceeds its capacity.
        if entries.len() > self.max_entries {
            entries.pop_front();
        }

        // Notify subscribers; individual subscriber failures are ignored.
        let subscribers = self.subscribers.read();
        for subscriber in subscribers.iter() {
            let _ = subscriber.on_change(&entry);
        }
    }

    pub fn add_subscriber(&self, subscriber: Arc<dyn ChangeSubscriber>) {
        let mut subscribers = self.subscribers.write();
        subscribers.push(subscriber);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_store_integration_config() {
        let config = StoreIntegrationConfig::default();
        assert!(config.real_time_sync);
        assert_eq!(config.batch_size, 1000);
        assert!(config.incremental_updates);
    }

    #[test]
    fn test_transaction_lifecycle() {
        let config = StoreIntegrationConfig::default();
        let tm = TransactionManager::new(config);

        let tx_id = tm.begin_transaction(IsolationLevel::ReadCommitted).unwrap();
        assert!(tx_id > 0);

        let result = tm.commit_transaction(tx_id);
        assert!(result.is_ok());
    }

    #[test]
    fn test_cache_manager() {
        let config = StoreCacheConfig {
            enable_vector_cache: true,
            enable_query_cache: true,
            cache_size_mb: 128,
            cache_ttl: Duration::from_secs(300),
            enable_compression: false,
        };

        let cache_manager = CacheManager::new(config);
        let vector = Vector::new(vec![1.0, 2.0, 3.0]);

        cache_manager.cache_vector("test_uri".to_string(), vector.clone());
        let cached = cache_manager.get_vector("test_uri");

        assert!(cached.is_some());
        assert_eq!(cached.unwrap().vector, vector);
    }

    #[test]
    fn test_streaming_engine() {
        let config = StreamingConfig {
            enable_streaming: true,
            buffer_size: 1000,
            flush_interval: Duration::from_millis(100),
            enable_backpressure: true,
            max_lag: Duration::from_secs(1),
        };

        let streaming_engine = StreamingEngine::new(config);
        let operation = StreamingOperation::VectorInsert {
            uri: "test_uri".to_string(),
            vector: Vector::new(vec![1.0, 2.0, 3.0]),
            priority: Priority::Normal,
        };

        let result = streaming_engine.submit_operation(operation);
        assert!(result.is_ok());
    }

    #[test]
    fn test_integrated_vector_store() {
        let config = StoreIntegrationConfig::default();
        let store = IntegratedVectorStore::new(config, EmbeddingStrategy::TfIdf).unwrap();

        let tx_id = store
            .begin_transaction(IsolationLevel::ReadCommitted)
            .unwrap();
        assert!(tx_id > 0);

        let vector = Vector::new(vec![1.0, 2.0, 3.0]);
        let result = store.transactional_insert(tx_id, "test_uri".to_string(), vector, None);
        assert!(result.is_ok());

        let commit_result = store.commit_transaction(tx_id);
        assert!(commit_result.is_ok());
    }
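
    // Added sketch test: exercises the lock-compatibility rule in
    // `LockManager::can_grant_lock`, which grants concurrent access only for
    // shared/shared pairs. Resource names here are arbitrary.
    #[test]
    fn test_lock_manager_shared_exclusive() {
        let lm = LockManager::new();

        // Two transactions may hold a shared lock on the same resource.
        lm.acquire_lock(1, "res", LockType::Shared).unwrap();
        lm.acquire_lock(2, "res", LockType::Shared).unwrap();

        // An exclusive request on a held resource is queued and rejected.
        assert!(lm.acquire_lock(3, "res", LockType::Exclusive).is_err());

        // Once all holders (and the queued waiter) are released, an
        // exclusive lock can be granted.
        lm.release_transaction_locks(1);
        lm.release_transaction_locks(2);
        lm.release_transaction_locks(3);
        lm.acquire_lock(3, "res", LockType::Exclusive).unwrap();
    }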
}