use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use crate::real_time_embedding_pipeline::{
traits::{ContentItem, HealthStatus},
types::{StreamState, StreamStatus},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
pub stream_id: String,
pub buffer_size: usize,
pub timeout: Duration,
pub max_retries: usize,
pub enable_compression: bool,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
stream_id: "default".to_string(),
buffer_size: 1000,
timeout: Duration::from_secs(30),
max_retries: 3,
enable_compression: false,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamProcessorConfig {
pub max_concurrent_streams: usize,
pub timeout_config: TimeoutConfig,
pub buffer_config: BufferConfig,
pub error_config: ErrorConfig,
}
impl Default for StreamProcessorConfig {
fn default() -> Self {
Self {
max_concurrent_streams: 10,
timeout_config: TimeoutConfig::default(),
buffer_config: BufferConfig::default(),
error_config: ErrorConfig::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutConfig {
pub connection_timeout: Duration,
pub read_timeout: Duration,
pub write_timeout: Duration,
pub idle_timeout: Duration,
}
impl Default for TimeoutConfig {
fn default() -> Self {
Self {
connection_timeout: Duration::from_secs(10),
read_timeout: Duration::from_secs(30),
write_timeout: Duration::from_secs(30),
idle_timeout: Duration::from_secs(300),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
pub initial_size: usize,
pub max_size: usize,
pub growth_factor: f64,
pub adaptive_sizing: bool,
}
impl Default for BufferConfig {
fn default() -> Self {
Self {
initial_size: 1000,
max_size: 100000,
growth_factor: 1.5,
adaptive_sizing: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorConfig {
pub max_retries: usize,
pub retry_delay: Duration,
pub backoff_factor: f64,
pub max_retry_delay: Duration,
pub enable_circuit_breaker: bool,
}
impl Default for ErrorConfig {
fn default() -> Self {
Self {
max_retries: 3,
retry_delay: Duration::from_millis(100),
backoff_factor: 2.0,
max_retry_delay: Duration::from_secs(30),
enable_circuit_breaker: true,
}
}
}
pub struct StreamProcessor {
stream_id: String,
config: StreamConfig,
is_running: AtomicBool,
state: StreamState,
}
impl StreamProcessor {
pub fn new(stream_id: String, config: StreamConfig) -> Result<Self> {
let state = StreamState {
stream_id: stream_id.clone(),
offset: 0,
last_processed: std::time::SystemTime::now(),
status: StreamStatus::Initializing,
error_count: 0,
last_error: None,
};
Ok(Self {
stream_id,
config,
is_running: AtomicBool::new(false),
state,
})
}
pub async fn start(&self) -> Result<()> {
if self.is_running.load(Ordering::Acquire) {
return Err(anyhow::anyhow!("Stream processor is already running"));
}
self.is_running.store(true, Ordering::Release);
self.initialize_stream().await?;
Ok(())
}
pub async fn stop(&self) -> Result<()> {
self.is_running.store(false, Ordering::Release);
self.cleanup_stream().await?;
Ok(())
}
pub async fn process_item(&self, item: ContentItem) -> Result<()> {
if !self.is_running.load(Ordering::Acquire) {
return Err(anyhow::anyhow!("Stream processor is not running"));
}
self.handle_content_item(item).await?;
Ok(())
}
pub fn get_state(&self) -> &StreamState {
&self.state
}
pub fn get_config(&self) -> &StreamConfig {
&self.config
}
pub async fn health_check(&self) -> Result<HealthStatus> {
if !self.is_running.load(Ordering::Acquire) {
return Ok(HealthStatus::Unhealthy {
message: "Stream processor is not running".to_string(),
});
}
if self.state.error_count > 10 {
return Ok(HealthStatus::Warning {
message: format!("High error count: {}", self.state.error_count),
});
}
Ok(HealthStatus::Healthy)
}
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::Acquire)
}
async fn initialize_stream(&self) -> Result<()> {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(())
}
async fn cleanup_stream(&self) -> Result<()> {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(())
}
async fn handle_content_item(&self, _item: ContentItem) -> Result<()> {
tokio::time::sleep(Duration::from_millis(1)).await;
Ok(())
}
}
pub struct StreamMultiplexer {
processors: std::sync::RwLock<std::collections::HashMap<String, StreamProcessor>>,
config: StreamProcessorConfig,
}
impl StreamMultiplexer {
pub fn new(config: StreamProcessorConfig) -> Self {
Self {
processors: std::sync::RwLock::new(std::collections::HashMap::new()),
config,
}
}
pub fn add_processor(&self, processor: StreamProcessor) -> Result<()> {
let mut processors = self
.processors
.write()
.map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
let stream_id = processor.stream_id.clone();
processors.insert(stream_id, processor);
Ok(())
}
pub async fn remove_processor(&self, stream_id: &str) -> Result<()> {
let processor = {
let mut processors = self
.processors
.write()
.map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
processors.remove(stream_id)
};
if let Some(processor) = processor {
processor.stop().await?;
}
Ok(())
}
pub fn processor_count(&self) -> usize {
self.processors.read().map(|p| p.len()).unwrap_or(0)
}
pub async fn health_check(&self) -> Result<HealthStatus> {
let processor_count = {
let processors = self
.processors
.read()
.map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
processors.len()
};
let unhealthy_count = {
0
};
if unhealthy_count == 0 {
Ok(HealthStatus::Healthy)
} else if unhealthy_count < processor_count {
Ok(HealthStatus::Warning {
message: format!("{unhealthy_count} processors are unhealthy"),
})
} else {
Ok(HealthStatus::Unhealthy {
message: "All processors are unhealthy".to_string(),
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_config_default() {
let config = StreamConfig::default();
assert_eq!(config.buffer_size, 1000);
assert_eq!(config.max_retries, 3);
}
#[tokio::test]
async fn test_stream_processor_creation() {
let config = StreamConfig::default();
let processor = StreamProcessor::new("test_stream".to_string(), config);
assert!(processor.is_ok());
}
#[tokio::test]
async fn test_stream_processor_start_stop() -> Result<()> {
let config = StreamConfig::default();
let processor = StreamProcessor::new("test_stream".to_string(), config)?;
assert!(!processor.is_running());
let start_result = processor.start().await;
assert!(start_result.is_ok());
assert!(processor.is_running());
let stop_result = processor.stop().await;
assert!(stop_result.is_ok());
Ok(())
}
#[test]
fn test_stream_multiplexer() {
let config = StreamProcessorConfig::default();
let multiplexer = StreamMultiplexer::new(config);
assert_eq!(multiplexer.processor_count(), 0);
}
}