// oxirs_vec/real_time_embedding_pipeline/mod.rs

//! Real-time embedding pipeline module
//!
//! This module provides a comprehensive real-time embedding update system that can handle
//! high-throughput streaming data with low-latency updates while maintaining consistency
//! and performance guarantees.
//!
//! ## Architecture
//!
//! The real-time embedding pipeline is organized into several components.
//! Components marked *(planned)* do not have a module in this directory yet.
//!
//! - **Configuration**: Pipeline configuration and settings
//! - **Traits**: Core traits for extensibility
//! - **Types**: Common data structures and types
//! - **Pipeline**: Main pipeline implementation
//! - **Streaming**: Stream processing components
//! - **Coordination** *(planned)*: Update coordination and synchronization
//! - **Monitoring** *(planned)*: Performance monitoring and alerting
//! - **Versioning** *(planned)*: Version management and storage
//! - **Consistency** *(planned)*: Consistency management and repair
//!
//! ## Usage
//!
//! ```rust,no_run
//! # use tokio::runtime::Runtime;
//! use oxirs_vec::real_time_embedding_pipeline::{
//!     RealTimeEmbeddingPipeline, PipelineConfig, ConsistencyLevel
//! };
//!
//! // Create pipeline configuration
//! let config = PipelineConfig {
//!     max_batch_size: 1000,
//!     consistency_level: ConsistencyLevel::Session,
//!     ..Default::default()
//! };
//!
//! # let runtime = Runtime::new().unwrap();
//! # runtime.block_on(async {
//! // Create and start pipeline
//! let mut pipeline = RealTimeEmbeddingPipeline::new(config).unwrap();
//! pipeline.start().await.unwrap();
//! pipeline.stop().await.unwrap();
//! # });
//! ```

45pub mod config;
46pub mod pipeline;
47pub mod streaming;
48pub mod traits;
49pub mod types;
50// TODO: Implement these modules
51// pub mod coordination;
52// pub mod monitoring;
53// pub mod versioning;
54// pub mod consistency;
55
56// Re-export commonly used types
57pub use config::{
58    AlertThresholds, AutoScalingConfig, BackpressureStrategy, CompressionConfig, CompressionMethod,
59    ConsistencyLevel, MonitoringConfig, PipelineConfig, QualityAssuranceConfig, RetryConfig,
60    VersionControlConfig,
61};
62
63pub use traits::{
64    Alert, AlertCategory, AlertConfig, AlertHandler, AlertSeverity, AlertThrottling,
65    ConflictResolutionFunction, ConsistencyRepairStrategy, ContentItem, EmbeddingGenerator,
66    GeneratorStatistics, HealthStatus, Inconsistency, InconsistencyDetectionAlgorithm,
67    InconsistencySeverity, InconsistencyType, IncrementalVectorIndex, IndexStatistics, MetricPoint,
68    MetricsStorage, ProcessingPriority, ProcessingResult, ProcessingStatus, RepairResult,
69    RepairStatus, Transaction, TransactionLog, TransactionStatus, TransactionType, Version,
70    VersionStorage,
71};
72
73pub use types::{
74    BackpressureState, CircuitBreakerConfig, CircuitBreakerState, CoordinationState,
75    HealthCheckResult, NodeStatus, PerformanceMetrics, PipelineStatistics, QualityMetrics,
76    RealTimeConfig, ResourceUtilization, StreamState, StreamStatus, UpdateBatch, UpdateOperation,
77    UpdatePriority, UpdateStats, VersioningStrategy,
78};
79
80pub use pipeline::RealTimeEmbeddingPipeline;
81pub use streaming::{StreamProcessor, StreamProcessorConfig};
82// TODO: Uncomment when modules are implemented
83// pub use coordination::{UpdateCoordinator, CoordinationConfig};
84// pub use monitoring::{
85//     PipelinePerformanceMonitor, AlertManager, MetricsCollector,
86//     MonitoringConfig as PipelineMonitoringConfig,
87// };
88// pub use versioning::{VersionManager, VersionManagerConfig};
89// pub use consistency::{ConsistencyManager, ConsistencyConfig};
90
91// Re-export error types
92pub use anyhow::{Context, Error, Result};
93
94// Pipeline-specific error types
95#[derive(Debug, thiserror::Error)]
96pub enum PipelineError {
97    #[error("Pipeline not initialized")]
98    NotInitialized,
99
100    #[error("Pipeline already running")]
101    AlreadyRunning,
102
103    #[error("Pipeline not running")]
104    NotRunning,
105
106    #[error("Configuration error: {message}")]
107    ConfigurationError { message: String },
108
109    #[error("Stream processing error: {message}")]
110    StreamProcessingError { message: String },
111
112    #[error("Coordination error: {message}")]
113    CoordinationError { message: String },
114
115    #[error("Consistency error: {message}")]
116    ConsistencyError { message: String },
117
118    #[error("Monitoring error: {message}")]
119    MonitoringError { message: String },
120
121    #[error("Version management error: {message}")]
122    VersionError { message: String },
123
124    #[error("Resource exhausted: {resource}")]
125    ResourceExhausted { resource: String },
126
127    #[error("Timeout error: {operation}")]
128    Timeout { operation: String },
129
130    #[error("Backpressure limit exceeded")]
131    BackpressureExceeded,
132
133    #[error("Quality check failed: {reason}")]
134    QualityCheckFailed { reason: String },
135
136    #[error("Circuit breaker open for: {component}")]
137    CircuitBreakerOpen { component: String },
138}
139
140/// Result type alias for pipeline operations
141pub type PipelineResult<T> = std::result::Result<T, PipelineError>;
142
#[cfg(test)]
mod tests {
    // Pulls in the types re-exported by the parent module.
    use super::*;

    /// Pins three default values of `PipelineConfig`.
    #[test]
    fn test_pipeline_config_default() {
        let config = PipelineConfig::default();
        assert_eq!(config.max_batch_size, 1000);
        assert_eq!(config.consistency_level, ConsistencyLevel::Session);
        assert_eq!(config.backpressure_strategy, BackpressureStrategy::Adaptive);
    }

    /// `ProcessingPriority` variants compare as Critical > High > Normal > Low.
    #[test]
    fn test_processing_priority_ordering() {
        // Function-local glob import keeps the variant names scoped to this test.
        use ProcessingPriority::*;
        assert!(Critical > High);
        assert!(High > Normal);
        assert!(Normal > Low);
    }

    /// `UpdatePriority` variants compare as Urgent > High > Normal > Background.
    #[test]
    fn test_update_priority_ordering() {
        use UpdatePriority::*;
        assert!(Urgent > High);
        assert!(High > Normal);
        assert!(Normal > Background);
    }

    /// `AlertSeverity` variants compare as Critical > Error > Warning > Info.
    #[test]
    fn test_alert_severity_ordering() {
        // `Error` below is the `AlertSeverity` variant from this local glob import,
        // not the `anyhow::Error` type re-exported by the parent module.
        use AlertSeverity::*;
        assert!(Critical > Error);
        assert!(Error > Warning);
        assert!(Warning > Info);
    }

    /// `InconsistencySeverity` variants compare as Critical > High > Medium > Low.
    #[test]
    fn test_inconsistency_severity_ordering() {
        use InconsistencySeverity::*;
        assert!(Critical > High);
        assert!(High > Medium);
        assert!(Medium > Low);
    }
}