use crate::store_integration_types::*;
use crate::{
embeddings::EmbeddingStrategy, rdf_integration::RdfVectorConfig,
sparql_integration::SparqlVectorService, VectorStoreTrait,
};
use anyhow::Result;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::{Duration, SystemTime};
pub fn new_integrated_store(
config: StoreIntegrationConfig,
embedding_strategy: EmbeddingStrategy,
) -> Result<IntegratedVectorStore> {
let vector_store = Arc::new(RwLock::new(
crate::VectorStore::with_embedding_strategy(embedding_strategy.clone())?.with_config(
crate::VectorStoreConfig {
auto_embed: true,
cache_embeddings: config.cache_config.enable_vector_cache,
similarity_threshold: 0.7,
max_results: 1000,
},
),
));
let rdf_config = RdfVectorConfig::default();
let vector_store_wrapper = VectorStoreWrapper {
store: vector_store.clone(),
};
let vector_store_trait: Arc<std::sync::RwLock<dyn VectorStoreTrait>> =
Arc::new(std::sync::RwLock::new(vector_store_wrapper));
let rdf_integration = Arc::new(RwLock::new(
crate::rdf_integration::RdfVectorIntegration::new(rdf_config, vector_store_trait),
));
let sparql_config = crate::sparql_integration::VectorServiceConfig::default();
let sparql_service = Arc::new(RwLock::new(SparqlVectorService::new(
sparql_config,
embedding_strategy,
)?));
let transaction_manager = Arc::new(TransactionManager::new(config.clone()));
let streaming_engine = Arc::new(StreamingEngine::new(config.streaming_config.clone()));
let cache_manager = Arc::new(CacheManager::new(config.cache_config.clone()));
let replication_manager = Arc::new(ReplicationManager::new(config.replication_config.clone()));
let consistency_manager = Arc::new(ConsistencyManager::new(config.consistency_level));
let change_log = Arc::new(ChangeLog::new(10000));
let metrics = Arc::new(StoreMetrics::default());
Ok(IntegratedVectorStore {
config,
vector_store,
rdf_integration,
sparql_service,
transaction_manager,
streaming_engine,
cache_manager,
replication_manager,
consistency_manager,
change_log,
metrics,
})
}
impl Default for WriteAheadLog {
fn default() -> Self {
Self::new()
}
}
impl WriteAheadLog {
pub fn new() -> Self {
Self {
log_entries: Arc::new(RwLock::new(std::collections::VecDeque::new())),
log_file: None,
checkpoint_interval: Duration::from_secs(60),
last_checkpoint: Arc::new(RwLock::new(SystemTime::now())),
}
}
pub fn append(
&self,
transaction_id: TransactionId,
operation: SerializableOperation,
) -> Result<()> {
let entry = LogEntry {
lsn: self.generate_lsn(),
transaction_id,
operation,
timestamp: SystemTime::now(),
checksum: 0,
};
let mut log = self.log_entries.write();
log.push_back(entry);
if self.should_checkpoint() {
self.checkpoint()?;
}
Ok(())
}
fn generate_lsn(&self) -> u64 {
static LSN_COUNTER: AtomicU64 = AtomicU64::new(0);
LSN_COUNTER.fetch_add(1, Ordering::Relaxed)
}
fn should_checkpoint(&self) -> bool {
let last_checkpoint = *self.last_checkpoint.read();
last_checkpoint
.elapsed()
.expect("SystemTime should not go backwards")
> self.checkpoint_interval
}
fn checkpoint(&self) -> Result<()> {
let mut last_checkpoint = self.last_checkpoint.write();
*last_checkpoint = SystemTime::now();
Ok(())
}
}
impl Default for LockManager {
fn default() -> Self {
Self::new()
}
}
impl LockManager {
pub fn new() -> Self {
Self {
locks: Arc::new(RwLock::new(HashMap::new())),
deadlock_detector: Arc::new(DeadlockDetector::new()),
}
}
pub fn acquire_lock(
&self,
transaction_id: TransactionId,
resource: &str,
lock_type: LockType,
) -> Result<()> {
let mut locks = self.locks.write();
let lock_info = locks
.entry(resource.to_string())
.or_insert_with(|| LockInfo {
lock_type: LockType::Shared,
holders: std::collections::HashSet::new(),
waiters: std::collections::VecDeque::new(),
granted_time: SystemTime::now(),
});
if self.can_grant_lock(lock_info, lock_type) {
lock_info.holders.insert(transaction_id);
lock_info.lock_type = lock_type;
lock_info.granted_time = SystemTime::now();
Ok(())
} else {
lock_info.waiters.push_back((transaction_id, lock_type));
self.deadlock_detector.check_deadlock(transaction_id)?;
Err(anyhow::anyhow!("Lock not available, transaction waiting"))
}
}
pub fn release_transaction_locks(&self, transaction_id: TransactionId) {
let mut locks = self.locks.write();
let mut to_remove = Vec::new();
for (resource, lock_info) in locks.iter_mut() {
lock_info.holders.remove(&transaction_id);
lock_info.waiters.retain(|(tid, _)| *tid != transaction_id);
if lock_info.holders.is_empty() {
to_remove.push(resource.clone());
}
}
for resource in to_remove {
locks.remove(&resource);
}
}
fn can_grant_lock(&self, lock_info: &LockInfo, requested_type: LockType) -> bool {
if lock_info.holders.is_empty() {
return true;
}
matches!(
(lock_info.lock_type, requested_type),
(LockType::Shared, LockType::Shared)
)
}
}
impl Default for DeadlockDetector {
fn default() -> Self {
Self::new()
}
}
impl DeadlockDetector {
pub fn new() -> Self {
Self {
wait_for_graph: Arc::new(RwLock::new(HashMap::new())),
detection_interval: Duration::from_secs(1),
}
}
pub fn check_deadlock(&self, _transaction_id: TransactionId) -> Result<()> {
Ok(())
}
}
impl StreamingEngine {
pub fn new(config: StreamingConfig) -> Self {
Self {
config,
stream_buffer: Arc::new(RwLock::new(std::collections::VecDeque::new())),
processor_thread: None,
backpressure_controller: Arc::new(BackpressureController::new()),
stream_metrics: Arc::new(StreamingMetrics::default()),
}
}
pub fn submit_operation(&self, operation: StreamingOperation) -> Result<()> {
if self.backpressure_controller.should_apply_backpressure() {
return Err(anyhow::anyhow!("Backpressure applied, operation rejected"));
}
let mut buffer = self.stream_buffer.write();
buffer.push_back(operation);
self.stream_metrics
.operations_pending
.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub fn get_metrics(&self) -> &StreamingMetrics {
&self.stream_metrics
}
}
impl Default for BackpressureController {
fn default() -> Self {
Self::new()
}
}
impl BackpressureController {
pub fn new() -> Self {
Self {
current_load: Arc::new(RwLock::new(0.0)),
max_load_threshold: 0.8,
adaptive_batching: true,
load_shedding: true,
}
}
pub fn should_apply_backpressure(&self) -> bool {
let load = *self.current_load.read();
load > self.max_load_threshold
}
}
impl CacheManager {
pub fn new(config: StoreCacheConfig) -> Self {
Self {
vector_cache: Arc::new(RwLock::new(HashMap::new())),
query_cache: Arc::new(RwLock::new(HashMap::new())),
config,
cache_stats: Arc::new(CacheStats::default()),
eviction_policy: EvictionPolicy::LRU,
}
}
pub fn get_vector(&self, uri: &str) -> Option<CachedVector> {
let cache = self.vector_cache.read();
if let Some(cached) = cache.get(uri) {
self.cache_stats
.vector_cache_hits
.fetch_add(1, Ordering::Relaxed);
Some(cached.clone())
} else {
self.cache_stats
.vector_cache_misses
.fetch_add(1, Ordering::Relaxed);
None
}
}
pub fn cache_vector(&self, uri: String, vector: crate::Vector) {
let cached_vector = CachedVector {
vector,
last_accessed: SystemTime::now(),
access_count: 1,
compression_ratio: 1.0,
cache_level: CacheLevel::Memory,
};
let mut cache = self.vector_cache.write();
cache.insert(uri, cached_vector);
}
pub fn get_cached_query(&self, query_hash: &u64) -> Option<CachedQueryResult> {
let cache = self.query_cache.read();
if let Some(cached) = cache.get(&query_hash.to_string()) {
self.cache_stats
.query_cache_hits
.fetch_add(1, Ordering::Relaxed);
Some(cached.clone())
} else {
self.cache_stats
.query_cache_misses
.fetch_add(1, Ordering::Relaxed);
None
}
}
pub fn cache_query_result(&self, query_hash: u64, results: Vec<(String, f32)>) {
let cached_result = CachedQueryResult {
results,
query_hash,
last_accessed: SystemTime::now(),
ttl: self.config.cache_ttl,
hit_count: 0,
};
let mut cache = self.query_cache.write();
cache.insert(query_hash.to_string(), cached_result);
}
pub fn get_stats(&self) -> &CacheStats {
&self.cache_stats
}
}
impl ReplicationManager {
pub fn new(config: ReplicationConfig) -> Self {
Self {
config,
replicas: Arc::new(RwLock::new(Vec::new())),
replication_log: Arc::new(RwLock::new(std::collections::VecDeque::new())),
consensus_algorithm: ConsensusAlgorithm::SimpleMajority,
health_checker: Arc::new(HealthChecker::new()),
}
}
}
impl Default for HealthChecker {
fn default() -> Self {
Self::new()
}
}
impl HealthChecker {
pub fn new() -> Self {
Self {
check_interval: Duration::from_secs(30),
timeout: Duration::from_secs(5),
failure_threshold: 3,
}
}
}
impl ConsistencyManager {
pub fn new(consistency_level: ConsistencyLevel) -> Self {
Self {
consistency_level,
vector_clocks: Arc::new(RwLock::new(HashMap::new())),
conflict_resolver: Arc::new(ConflictResolver::new()),
causal_order_tracker: Arc::new(CausalOrderTracker::new()),
}
}
}
impl Default for ConflictResolver {
fn default() -> Self {
Self::new()
}
}
impl ConflictResolver {
pub fn new() -> Self {
Self {
strategy: ConflictResolution::LastWriteWins,
custom_resolvers: HashMap::new(),
}
}
}
impl Default for CausalOrderTracker {
fn default() -> Self {
Self::new()
}
}
impl CausalOrderTracker {
pub fn new() -> Self {
Self {
happens_before: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl ChangeLog {
pub fn new(max_entries: usize) -> Self {
Self {
entries: Arc::new(RwLock::new(std::collections::VecDeque::new())),
max_entries,
subscribers: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn add_entry(&self, entry: ChangeLogEntry) {
let mut entries = self.entries.write();
entries.push_back(entry.clone());
if entries.len() > self.max_entries {
entries.pop_front();
}
let subscribers = self.subscribers.read();
for subscriber in subscribers.iter() {
let _ = subscriber.on_change(&entry);
}
}
pub fn add_subscriber(&self, subscriber: Arc<dyn ChangeSubscriber>) {
let mut subscribers = self.subscribers.write();
subscribers.push(subscriber);
}
}