// oxirs_stream/lib_types.rs
1//! Type definitions extracted from lib.rs for 2000-line policy compliance
2
3use anyhow::{anyhow, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::{RwLock, Semaphore};
10use tracing::{debug, error, info};
11use uuid::Uuid;
12
13use crate::backend::{self, StreamBackend};
14use crate::circuit_breaker::{self, SharedCircuitBreakerExt};
15use crate::event::{EventMetadata, StreamEvent};
16
/// Enhanced stream configuration with advanced features
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Which streaming backend to use (Kafka, NATS, Redis, ..., or in-memory)
    pub backend: StreamBackendType,
    /// Topic (or stream/queue name) that events are published to
    pub topic: String,
    /// Number of events buffered before a batch is published
    pub batch_size: usize,
    /// Interval between automatic flushes, in milliseconds
    pub flush_interval_ms: u64,
    /// Maximum concurrent connections
    pub max_connections: usize,
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Enable compression
    pub enable_compression: bool,
    /// Compression type
    pub compression_type: CompressionType,
    /// Retry configuration
    pub retry_config: RetryConfig,
    /// Circuit breaker configuration
    pub circuit_breaker: CircuitBreakerConfig,
    /// Security configuration
    pub security: SecurityConfig,
    /// Performance tuning
    pub performance: StreamPerformanceConfig,
    /// Monitoring configuration
    pub monitoring: MonitoringConfig,
}
43
44/// Compression types supported
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub enum CompressionType {
47    None,
48    Gzip,
49    Snappy,
50    Lz4,
51    Zstd,
52}
53
/// Retry configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up
    pub max_retries: u32,
    /// Delay before the first retry
    pub initial_backoff: Duration,
    /// Upper bound on the backoff delay
    pub max_backoff: Duration,
    /// Factor applied to the backoff after each attempt (exponential backoff)
    pub backoff_multiplier: f64,
    /// Whether to add random jitter to backoff delays
    pub jitter: bool,
}
63
/// Circuit breaker configuration
///
/// Mirrors (a subset of) `circuit_breaker::CircuitBreakerConfig`, into which
/// it is converted when the producer is constructed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Whether the circuit breaker is active at all
    pub enabled: bool,
    /// Failure count that opens the circuit
    pub failure_threshold: u32,
    /// Success count that closes the circuit again
    pub success_threshold: u32,
    /// How long the circuit stays open before probing again
    pub timeout: Duration,
    /// Maximum calls allowed while the circuit is half-open
    pub half_open_max_calls: u32,
}
73
/// Security configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Enable TLS for backend connections
    pub enable_tls: bool,
    /// Verify server certificates (disabling is only sensible for testing)
    pub verify_certificates: bool,
    /// Path to the client certificate file, if using mutual TLS
    pub client_cert_path: Option<String>,
    /// Path to the client private key file, if using mutual TLS
    pub client_key_path: Option<String>,
    /// Path to the CA certificate file, if a custom CA is needed
    pub ca_cert_path: Option<String>,
    /// SASL authentication, if required by the backend
    pub sasl_config: Option<SaslConfig>,
}
84
/// SASL authentication configuration
///
/// NOTE(review): the password is held — and serialized via `Serialize` — in
/// plain text; confirm these configs are never persisted or logged verbatim.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaslConfig {
    /// SASL mechanism to negotiate
    pub mechanism: SaslMechanism,
    /// Account user name
    pub username: String,
    /// Account password (plain text — see note above)
    pub password: String,
}
92
93/// SASL authentication mechanisms
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub enum SaslMechanism {
96    Plain,
97    ScramSha256,
98    ScramSha512,
99    OAuthBearer,
100}
101
/// Performance tuning configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamPerformanceConfig {
    /// Buffer events and publish them in batches of `StreamConfig::batch_size`
    pub enable_batching: bool,
    /// Enable request pipelining where the backend supports it
    pub enable_pipelining: bool,
    /// Internal buffer size (interpretation is backend-specific)
    pub buffer_size: usize,
    /// Number of messages to prefetch (backend-specific)
    pub prefetch_count: u32,
    /// Enable zero-copy optimizations where available
    pub enable_zero_copy: bool,
    /// Enable SIMD-accelerated processing where available
    pub enable_simd: bool,
    /// Process events in parallel
    pub parallel_processing: bool,
    /// Worker thread count; `None` lets the runtime decide
    pub worker_threads: Option<usize>,
}
114
/// Monitoring configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Collect runtime metrics
    pub enable_metrics: bool,
    /// Emit distributed traces
    pub enable_tracing: bool,
    /// How often metrics are sampled/reported
    pub metrics_interval: Duration,
    /// How often health checks run
    pub health_check_interval: Duration,
    /// Enable profiling hooks
    pub enable_profiling: bool,
    /// Prometheus endpoint, if metrics are exported
    pub prometheus_endpoint: Option<String>,
    /// Jaeger endpoint, if traces are exported
    pub jaeger_endpoint: Option<String>,
    /// Log verbosity (e.g. "info", "debug")
    pub log_level: String,
}
127
/// Enhanced streaming backend options
///
/// Every external backend variant is compiled in only when its Cargo feature
/// is enabled; the `Memory` variant is always available.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamBackendType {
    /// Apache Kafka
    #[cfg(feature = "kafka")]
    Kafka {
        /// Bootstrap broker addresses
        brokers: Vec<String>,
        /// Security protocol name (interpreted by the Kafka backend)
        security_protocol: Option<String>,
        /// SASL credentials, if the cluster requires authentication
        sasl_config: Option<SaslConfig>,
    },
    /// NATS (optionally with JetStream)
    #[cfg(feature = "nats")]
    Nats {
        /// Primary server URL
        url: String,
        /// Additional cluster URLs, if any
        cluster_urls: Option<Vec<String>>,
        /// JetStream settings; `None` for core NATS
        jetstream_config: Option<NatsJetStreamConfig>,
    },
    /// Redis
    #[cfg(feature = "redis")]
    Redis {
        /// Server URL
        url: String,
        /// Additional cluster URLs, if any
        cluster_urls: Option<Vec<String>>,
        /// Connection pool size; `None` for the backend default
        pool_size: Option<usize>,
    },
    /// AWS Kinesis
    #[cfg(feature = "kinesis")]
    Kinesis {
        /// AWS region
        region: String,
        /// Kinesis stream name
        stream_name: String,
        /// Explicit credentials; presumably `None` uses the ambient AWS
        /// credential chain — TODO confirm against the kinesis backend
        credentials: Option<AwsCredentials>,
    },
    /// Apache Pulsar
    #[cfg(feature = "pulsar")]
    Pulsar {
        /// Pulsar service URL
        service_url: String,
        /// Authentication settings, if required
        auth_config: Option<PulsarAuthConfig>,
    },
    /// RabbitMQ
    #[cfg(feature = "rabbitmq")]
    RabbitMQ {
        /// AMQP connection URL
        url: String,
        /// Exchange to publish to, if any
        exchange: Option<String>,
        /// Queue to use, if any
        queue: Option<String>,
    },
    /// In-memory backend for testing and development
    Memory {
        /// Optional cap on stored events — currently ignored by `StreamProducer::new`
        max_size: Option<usize>,
        /// Persistence flag — currently ignored by `StreamProducer::new`
        persistence: bool,
    },
}
171
/// NATS JetStream configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsJetStreamConfig {
    /// JetStream domain, if the deployment uses one
    pub domain: Option<String>,
    /// Custom API prefix for JetStream requests, if any
    pub api_prefix: Option<String>,
    /// Timeout for JetStream operations
    pub timeout: Duration,
}
179
/// AWS credentials configuration
///
/// NOTE(review): the secret access key is held — and serialized via
/// `Serialize` — in plain text; prefer ambient/instance credentials where
/// possible and confirm these configs are never persisted verbatim.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsCredentials {
    /// AWS access key ID
    pub access_key_id: String,
    /// AWS secret access key (plain text — see note above)
    pub secret_access_key: String,
    /// Session token for temporary credentials, if any
    pub session_token: Option<String>,
    /// Role ARN to assume, if any
    pub role_arn: Option<String>,
}
188
/// Pulsar authentication configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PulsarAuthConfig {
    /// Authentication method to use
    pub auth_method: PulsarAuthMethod,
    /// Method-specific parameters as key/value pairs
    pub auth_params: HashMap<String, String>,
}
195
196/// Pulsar authentication methods
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub enum PulsarAuthMethod {
199    Token,
200    Jwt,
201    Oauth2,
202    Tls,
203}
204
/// Enhanced stream producer for publishing RDF changes with backend support
pub struct StreamProducer {
    /// Full producer configuration
    config: StreamConfig,
    /// Backend-specific producer implementation selected at construction
    backend_producer: BackendProducer,
    /// Shared, lock-protected statistics
    stats: Arc<RwLock<ProducerStats>>,
    /// Optional circuit breaker guarding publishes (None when disabled)
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// When buffers were last flushed successfully
    last_flush: Instant,
    /// Reserved event queue; not currently used (hence leading underscore)
    _pending_events: Arc<RwLock<Vec<StreamEvent>>>,
    /// Events accumulated while batching, awaiting publication
    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Serializes flush operations
    flush_semaphore: Arc<Semaphore>,
}
216
/// Backend-agnostic producer wrapper
///
/// The NATS, Pulsar, and RabbitMQ producers are boxed — presumably to keep
/// the enum's overall size down (clippy `large_enum_variant`); confirm
/// before un-boxing.
enum BackendProducer {
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaProducer),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsProducer>),
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisProducer),
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisProducer),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarProducer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
    Memory(MemoryProducer),
}
233
/// Producer statistics for monitoring
///
/// Updated by the producer as events are published; fields are private to
/// this module.
#[derive(Debug, Default, Clone)]
pub struct ProducerStats {
    // Total events successfully published (single and batched)
    events_published: u64,
    // Events that failed to publish
    events_failed: u64,
    // Reserved for byte accounting; never written (hence leading underscore)
    _bytes_sent: u64,
    // Rolling average publish latency in milliseconds
    avg_latency_ms: f64,
    // Worst observed publish latency in milliseconds
    max_latency_ms: u64,
    // Number of batches published
    batch_count: u64,
    // Number of completed flushes
    flush_count: u64,
    // Publishes rejected because the circuit breaker was open
    circuit_breaker_trips: u64,
    // Timestamp of the most recent successful publish
    last_publish: Option<DateTime<Utc>>,
    // Name of the active backend ("kafka", "memory", ...)
    backend_type: String,
}
248
249/// Memory-based producer for testing and development
250// Type aliases for complex types
251type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
252type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;
253
254// Global shared storage for memory backend events
255static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();
256
257pub fn get_memory_events() -> MemoryEventStore {
258    MEMORY_EVENTS
259        .get_or_init(|| Arc::new(RwLock::new(Vec::new())))
260        .clone()
261}
262
263/// Clear the global memory storage (for testing)
264pub async fn clear_memory_events() {
265    let events = get_memory_events();
266    events.write().await.clear();
267    // Also clear the memory backend storage
268    backend::memory::clear_memory_storage().await;
269}
270
/// Memory-based producer for testing and development
struct MemoryProducer {
    // In-memory backend implementation that actually stores the events
    backend: Box<dyn StreamBackend + Send + Sync>,
    // Topic events are published under
    topic: String,
    // Per-producer statistics (not shared; this producer is single-owner)
    stats: ProducerStats,
}
276
277impl MemoryProducer {
278    fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
279        Self {
280            backend: Box::new(backend::memory::MemoryBackend::new()),
281            topic: "oxirs-stream".to_string(), // Default topic
282            stats: ProducerStats {
283                backend_type: "memory".to_string(),
284                ..Default::default()
285            },
286        }
287    }
288
289    fn with_topic(topic: String) -> Self {
290        Self {
291            backend: Box::new(backend::memory::MemoryBackend::new()),
292            topic,
293            stats: ProducerStats {
294                backend_type: "memory".to_string(),
295                ..Default::default()
296            },
297        }
298    }
299
300    async fn publish(&mut self, event: StreamEvent) -> Result<()> {
301        let start_time = Instant::now();
302
303        // Use the backend implementation
304        self.backend
305            .as_mut()
306            .connect()
307            .await
308            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
309
310        // Create topic if it doesn't exist
311        let topic_name = crate::types::TopicName::new(self.topic.clone());
312        self.backend
313            .create_topic(&topic_name, 1)
314            .await
315            .map_err(|e| anyhow!("Topic creation failed: {}", e))?;
316
317        // Send event through backend
318        self.backend
319            .send_event(&topic_name, event)
320            .await
321            .map_err(|e| anyhow!("Send event failed: {}", e))?;
322
323        // Update stats
324        self.stats.events_published += 1;
325        let latency = start_time.elapsed().as_millis() as u64;
326        self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
327        self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
328        self.stats.last_publish = Some(Utc::now());
329
330        debug!("Memory producer: published event via backend");
331        Ok(())
332    }
333
334    async fn flush(&mut self) -> Result<()> {
335        // Backend handles flushing internally
336        self.stats.flush_count += 1;
337        debug!("Memory producer: flush completed");
338        Ok(())
339    }
340
341    fn _get_stats(&self) -> &ProducerStats {
342        &self.stats
343    }
344}
345
346impl StreamProducer {
347    /// Create a new enhanced stream producer with backend support
348    pub async fn new(config: StreamConfig) -> Result<Self> {
349        // Initialize circuit breaker if enabled
350        let circuit_breaker = if config.circuit_breaker.enabled {
351            Some(circuit_breaker::new_shared_circuit_breaker(
352                circuit_breaker::CircuitBreakerConfig {
353                    enabled: config.circuit_breaker.enabled,
354                    failure_threshold: config.circuit_breaker.failure_threshold,
355                    success_threshold: config.circuit_breaker.success_threshold,
356                    timeout: config.circuit_breaker.timeout,
357                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
358                    ..Default::default()
359                },
360            ))
361        } else {
362            None
363        };
364
365        // Initialize backend-specific producer
366        let backend_producer = match &config.backend {
367            #[cfg(feature = "kafka")]
368            StreamBackendType::Kafka {
369                brokers,
370                security_protocol,
371                sasl_config,
372            } => {
373                let stream_config = crate::StreamConfig {
374                    backend: crate::StreamBackendType::Kafka {
375                        brokers: brokers.clone(),
376                        security_protocol: security_protocol.clone(),
377                        sasl_config: sasl_config.clone(),
378                    },
379                    topic: config.topic.clone(),
380                    batch_size: config.batch_size,
381                    flush_interval_ms: config.flush_interval_ms,
382                    max_connections: config.max_connections,
383                    connection_timeout: config.connection_timeout,
384                    enable_compression: config.enable_compression,
385                    compression_type: config.compression_type.clone(),
386                    retry_config: config.retry_config.clone(),
387                    circuit_breaker: config.circuit_breaker.clone(),
388                    security: config.security.clone(),
389                    performance: config.performance.clone(),
390                    monitoring: config.monitoring.clone(),
391                };
392
393                let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
394                producer.connect().await?;
395                BackendProducer::Kafka(producer)
396            }
397            #[cfg(feature = "nats")]
398            StreamBackendType::Nats {
399                url,
400                cluster_urls,
401                jetstream_config,
402            } => {
403                let stream_config = crate::StreamConfig {
404                    backend: crate::StreamBackendType::Nats {
405                        url: url.clone(),
406                        cluster_urls: cluster_urls.clone(),
407                        jetstream_config: jetstream_config.clone(),
408                    },
409                    topic: config.topic.clone(),
410                    batch_size: config.batch_size,
411                    flush_interval_ms: config.flush_interval_ms,
412                    max_connections: config.max_connections,
413                    connection_timeout: config.connection_timeout,
414                    enable_compression: config.enable_compression,
415                    compression_type: config.compression_type.clone(),
416                    retry_config: config.retry_config.clone(),
417                    circuit_breaker: config.circuit_breaker.clone(),
418                    security: config.security.clone(),
419                    performance: config.performance.clone(),
420                    monitoring: config.monitoring.clone(),
421                };
422
423                let mut producer = backend::nats::NatsProducer::new(stream_config)?;
424                producer.connect().await?;
425                BackendProducer::Nats(Box::new(producer))
426            }
427            #[cfg(feature = "redis")]
428            StreamBackendType::Redis {
429                url,
430                cluster_urls,
431                pool_size,
432            } => {
433                let stream_config = crate::StreamConfig {
434                    backend: crate::StreamBackendType::Redis {
435                        url: url.clone(),
436                        cluster_urls: cluster_urls.clone(),
437                        pool_size: *pool_size,
438                    },
439                    topic: config.topic.clone(),
440                    batch_size: config.batch_size,
441                    flush_interval_ms: config.flush_interval_ms,
442                    max_connections: config.max_connections,
443                    connection_timeout: config.connection_timeout,
444                    enable_compression: config.enable_compression,
445                    compression_type: config.compression_type.clone(),
446                    retry_config: config.retry_config.clone(),
447                    circuit_breaker: config.circuit_breaker.clone(),
448                    security: config.security.clone(),
449                    performance: config.performance.clone(),
450                    monitoring: config.monitoring.clone(),
451                };
452
453                let mut producer = backend::redis::RedisProducer::new(stream_config)?;
454                producer.connect().await?;
455                BackendProducer::Redis(producer)
456            }
457            #[cfg(feature = "kinesis")]
458            StreamBackendType::Kinesis {
459                region,
460                stream_name,
461                credentials,
462            } => {
463                let stream_config = crate::StreamConfig {
464                    backend: crate::StreamBackendType::Kinesis {
465                        region: region.clone(),
466                        stream_name: stream_name.clone(),
467                        credentials: credentials.clone(),
468                    },
469                    topic: config.topic.clone(),
470                    batch_size: config.batch_size,
471                    flush_interval_ms: config.flush_interval_ms,
472                    max_connections: config.max_connections,
473                    connection_timeout: config.connection_timeout,
474                    enable_compression: config.enable_compression,
475                    compression_type: config.compression_type.clone(),
476                    retry_config: config.retry_config.clone(),
477                    circuit_breaker: config.circuit_breaker.clone(),
478                    security: config.security.clone(),
479                    performance: config.performance.clone(),
480                    monitoring: config.monitoring.clone(),
481                };
482
483                let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
484                producer.connect().await?;
485                BackendProducer::Kinesis(producer)
486            }
487            #[cfg(feature = "pulsar")]
488            StreamBackendType::Pulsar {
489                service_url,
490                auth_config,
491            } => {
492                let stream_config = crate::StreamConfig {
493                    backend: crate::StreamBackendType::Pulsar {
494                        service_url: service_url.clone(),
495                        auth_config: auth_config.clone(),
496                    },
497                    topic: config.topic.clone(),
498                    batch_size: config.batch_size,
499                    flush_interval_ms: config.flush_interval_ms,
500                    max_connections: config.max_connections,
501                    connection_timeout: config.connection_timeout,
502                    enable_compression: config.enable_compression,
503                    compression_type: config.compression_type.clone(),
504                    retry_config: config.retry_config.clone(),
505                    circuit_breaker: config.circuit_breaker.clone(),
506                    security: config.security.clone(),
507                    performance: config.performance.clone(),
508                    monitoring: config.monitoring.clone(),
509                };
510
511                let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
512                producer.connect().await?;
513                BackendProducer::Pulsar(Box::new(producer))
514            }
515            #[cfg(feature = "rabbitmq")]
516            StreamBackendType::RabbitMQ {
517                url,
518                exchange,
519                queue,
520            } => {
521                let stream_config = crate::StreamConfig {
522                    backend: crate::StreamBackendType::RabbitMQ {
523                        url: url.clone(),
524                        exchange: exchange.clone(),
525                        queue: queue.clone(),
526                    },
527                    topic: config.topic.clone(),
528                    batch_size: config.batch_size,
529                    flush_interval_ms: config.flush_interval_ms,
530                    max_connections: config.max_connections,
531                    connection_timeout: config.connection_timeout,
532                    enable_compression: config.enable_compression,
533                    compression_type: config.compression_type.clone(),
534                    retry_config: config.retry_config.clone(),
535                    circuit_breaker: config.circuit_breaker.clone(),
536                    security: config.security.clone(),
537                    performance: config.performance.clone(),
538                    monitoring: config.monitoring.clone(),
539                };
540
541                let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
542                producer.connect().await?;
543                BackendProducer::RabbitMQ(Box::new(producer))
544            }
545            StreamBackendType::Memory {
546                max_size: _,
547                persistence: _,
548            } => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
549        };
550
551        let stats = Arc::new(RwLock::new(ProducerStats {
552            backend_type: match backend_producer {
553                #[cfg(feature = "kafka")]
554                BackendProducer::Kafka(_) => "kafka".to_string(),
555                #[cfg(feature = "nats")]
556                BackendProducer::Nats(_) => "nats".to_string(),
557                #[cfg(feature = "redis")]
558                BackendProducer::Redis(_) => "redis".to_string(),
559                #[cfg(feature = "kinesis")]
560                BackendProducer::Kinesis(_) => "kinesis".to_string(),
561                #[cfg(feature = "pulsar")]
562                BackendProducer::Pulsar(_) => "pulsar".to_string(),
563                #[cfg(feature = "rabbitmq")]
564                BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
565                BackendProducer::Memory(_) => "memory".to_string(),
566            },
567            ..Default::default()
568        }));
569
570        info!(
571            "Created stream producer with backend: {}",
572            stats.read().await.backend_type
573        );
574
575        Ok(Self {
576            config,
577            backend_producer,
578            stats,
579            circuit_breaker,
580            last_flush: Instant::now(),
581            _pending_events: Arc::new(RwLock::new(Vec::new())),
582            batch_buffer: Arc::new(RwLock::new(Vec::new())),
583            flush_semaphore: Arc::new(Semaphore::new(1)),
584        })
585    }
586
587    /// Publish a stream event with circuit breaker protection and batching
588    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
589        let start_time = Instant::now();
590
591        // Check circuit breaker if enabled
592        if let Some(cb) = &self.circuit_breaker {
593            if !cb.can_execute().await {
594                self.stats.write().await.circuit_breaker_trips += 1;
595                return Err(anyhow!("Circuit breaker is open - cannot publish events"));
596            }
597        }
598
599        // Handle batching if enabled
600        if self.config.performance.enable_batching {
601            let mut batch_buffer = self.batch_buffer.write().await;
602            batch_buffer.push(event);
603
604            if batch_buffer.len() >= self.config.batch_size {
605                let events = std::mem::take(&mut *batch_buffer);
606                drop(batch_buffer);
607                return self.publish_batch_internal(events).await;
608            }
609
610            return Ok(());
611        }
612
613        // Publish single event
614        let result = self.publish_single_event(event).await;
615
616        // Update circuit breaker and stats
617        match &result {
618            Ok(_) => {
619                if let Some(cb) = &self.circuit_breaker {
620                    cb.record_success_with_duration(start_time.elapsed()).await;
621                }
622
623                let mut stats = self.stats.write().await;
624                stats.events_published += 1;
625                let latency = start_time.elapsed().as_millis() as u64;
626                stats.max_latency_ms = stats.max_latency_ms.max(latency);
627                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
628                stats.last_publish = Some(Utc::now());
629            }
630            Err(_) => {
631                if let Some(cb) = &self.circuit_breaker {
632                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
633                        .await;
634                }
635
636                self.stats.write().await.events_failed += 1;
637            }
638        }
639
640        result
641    }
642
    /// Publish a single event to the backend
    ///
    /// Pure dispatch: forwards to whichever backend producer was built at
    /// construction time. Each arm is compiled in only when its Cargo
    /// feature is enabled; `Memory` is always present.
    async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
        match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish(event).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish(event).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish(event).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish(event).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish(event).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
            BackendProducer::Memory(producer) => producer.publish(event).await,
        }
    }
661
662    /// Publish multiple events as a batch
663    pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
664        if events.is_empty() {
665            return Ok(());
666        }
667
668        self.publish_batch_internal(events).await
669    }
670
671    /// Internal batch publishing implementation
672    async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
673        let start_time = Instant::now();
674        let event_count = events.len();
675
676        let result = match &mut self.backend_producer {
677            #[cfg(feature = "kafka")]
678            BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
679            #[cfg(feature = "nats")]
680            BackendProducer::Nats(producer) => producer.publish_batch(events).await,
681            #[cfg(feature = "redis")]
682            BackendProducer::Redis(producer) => producer.publish_batch(events).await,
683            #[cfg(feature = "kinesis")]
684            BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
685            #[cfg(feature = "pulsar")]
686            BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
687            #[cfg(feature = "rabbitmq")]
688            BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
689            BackendProducer::Memory(producer) => {
690                for event in events {
691                    producer.publish(event).await?;
692                }
693                Ok(())
694            }
695        };
696
697        // Update stats
698        let mut stats = self.stats.write().await;
699        match &result {
700            Ok(_) => {
701                stats.events_published += event_count as u64;
702                stats.batch_count += 1;
703                let latency = start_time.elapsed().as_millis() as u64;
704                stats.max_latency_ms = stats.max_latency_ms.max(latency);
705                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
706                stats.last_publish = Some(Utc::now());
707            }
708            Err(_) => {
709                stats.events_failed += event_count as u64;
710            }
711        }
712
713        debug!(
714            "Published batch of {} events in {:?}",
715            event_count,
716            start_time.elapsed()
717        );
718        result
719    }
720
721    /// Flush any pending events and buffers
722    pub async fn flush(&mut self) -> Result<()> {
723        let _permit = self
724            .flush_semaphore
725            .acquire()
726            .await
727            .map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;
728
729        let start_time = Instant::now();
730
731        // Flush any pending batch buffer
732        if self.config.performance.enable_batching {
733            let events = {
734                let mut batch_buffer = self.batch_buffer.write().await;
735                if !batch_buffer.is_empty() {
736                    std::mem::take(&mut *batch_buffer)
737                } else {
738                    Vec::new()
739                }
740            };
741            if !events.is_empty() {
742                drop(_permit); // Release permit before mutable borrow
743                self.publish_batch_internal(events).await?;
744            }
745        }
746
747        // Flush backend-specific buffers
748        let result = match &mut self.backend_producer {
749            #[cfg(feature = "kafka")]
750            BackendProducer::Kafka(producer) => producer.flush().await,
751            #[cfg(feature = "nats")]
752            BackendProducer::Nats(producer) => producer.flush().await,
753            #[cfg(feature = "redis")]
754            BackendProducer::Redis(producer) => producer.flush().await,
755            #[cfg(feature = "kinesis")]
756            BackendProducer::Kinesis(producer) => producer.flush().await,
757            #[cfg(feature = "pulsar")]
758            BackendProducer::Pulsar(producer) => producer.flush().await,
759            #[cfg(feature = "rabbitmq")]
760            BackendProducer::RabbitMQ(producer) => producer.flush().await,
761            BackendProducer::Memory(producer) => producer.flush().await,
762        };
763
764        // Update stats
765        if result.is_ok() {
766            self.stats.write().await.flush_count += 1;
767            self.last_flush = Instant::now();
768            debug!("Flushed producer buffers in {:?}", start_time.elapsed());
769        }
770
771        result
772    }
773
774    /// Publish an RDF patch as a series of events
775    pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
776        let events: Vec<StreamEvent> = patch
777            .operations
778            .iter()
779            .filter_map(|op| {
780                let metadata = EventMetadata {
781                    event_id: Uuid::new_v4().to_string(),
782                    timestamp: patch.timestamp,
783                    source: "rdf_patch".to_string(),
784                    user: None,
785                    context: Some(patch.id.clone()),
786                    caused_by: None,
787                    version: "1.0".to_string(),
788                    properties: HashMap::new(),
789                    checksum: None,
790                };
791
792                match op {
793                    PatchOperation::Add {
794                        subject,
795                        predicate,
796                        object,
797                    } => Some(StreamEvent::TripleAdded {
798                        subject: subject.clone(),
799                        predicate: predicate.clone(),
800                        object: object.clone(),
801                        graph: None,
802                        metadata,
803                    }),
804                    PatchOperation::Delete {
805                        subject,
806                        predicate,
807                        object,
808                    } => Some(StreamEvent::TripleRemoved {
809                        subject: subject.clone(),
810                        predicate: predicate.clone(),
811                        object: object.clone(),
812                        graph: None,
813                        metadata,
814                    }),
815                    PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
816                        graph: graph.clone(),
817                        metadata,
818                    }),
819                    PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
820                        graph: graph.clone(),
821                        metadata,
822                    }),
823                    PatchOperation::AddPrefix { .. } => {
824                        // Skip prefix operations for now
825                        None
826                    }
827                    PatchOperation::DeletePrefix { .. } => {
828                        // Skip prefix operations for now
829                        None
830                    }
831                    PatchOperation::TransactionBegin { .. } => {
832                        Some(StreamEvent::TransactionBegin {
833                            transaction_id: patch
834                                .transaction_id
835                                .clone()
836                                .unwrap_or_else(|| Uuid::new_v4().to_string()),
837                            isolation_level: None,
838                            metadata,
839                        })
840                    }
841                    PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
842                        transaction_id: patch
843                            .transaction_id
844                            .clone()
845                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
846                        metadata,
847                    }),
848                    PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
849                        transaction_id: patch
850                            .transaction_id
851                            .clone()
852                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
853                        metadata,
854                    }),
855                    PatchOperation::Header { .. } => {
856                        // Skip header operations for now
857                        None
858                    }
859                }
860            })
861            .collect();
862
863        self.publish_batch(events).await
864    }
865
    /// Get producer statistics
    ///
    /// Returns a point-in-time clone of the shared stats; the producer keeps
    /// updating the original copy after this call returns.
    pub async fn get_stats(&self) -> ProducerStats {
        self.stats.read().await.clone()
    }
870
871    /// Get producer health status
872    pub async fn health_check(&self) -> bool {
873        if let Some(cb) = &self.circuit_breaker {
874            cb.is_healthy().await
875        } else {
876            true
877        }
878    }
879}
880
/// Enhanced stream consumer for receiving RDF changes with backend support
pub struct StreamConsumer {
    /// Original configuration (retained for reference; currently unused).
    _config: StreamConfig,
    /// Backend-specific consumer selected from the config at construction.
    backend_consumer: BackendConsumer,
    /// Shared, concurrently-updated consumption statistics.
    stats: Arc<RwLock<ConsumerStats>>,
    /// Optional circuit breaker guarding `consume` calls.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// Timestamp of the most recent poll attempt.
    last_poll: Instant,
    /// Reserved buffer for batched delivery (currently unused).
    _message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Consumer group name, when the backend supports grouped consumption.
    consumer_group: Option<String>,
}
891
/// Backend-agnostic consumer wrapper
///
/// Variants are feature-gated to match the compiled backends; each payload is
/// boxed so the enum stays small regardless of which consumers are built in.
/// `Memory` is always available and is used for testing and development.
enum BackendConsumer {
    #[cfg(feature = "kafka")]
    Kafka(Box<backend::kafka::KafkaConsumer>),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsConsumer>),
    #[cfg(feature = "redis")]
    Redis(Box<backend::redis::RedisConsumer>),
    #[cfg(feature = "kinesis")]
    Kinesis(Box<backend::kinesis::KinesisConsumer>),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarConsumer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
    Memory(Box<MemoryConsumer>),
}
908
/// Consumer statistics for monitoring
///
/// Fields prefixed with `_` are reserved placeholders not yet populated by
/// any code path visible in this module.
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
    /// Total events successfully delivered to callers.
    events_consumed: u64,
    /// Total failed events (backend errors or callback failures).
    events_failed: u64,
    _bytes_received: u64,
    /// Running average of per-event processing time in milliseconds.
    avg_processing_time_ms: f64,
    /// Largest per-event processing time observed, in milliseconds.
    max_processing_time_ms: u64,
    _consumer_lag: u64,
    /// Number of consume attempts rejected by an open circuit breaker.
    circuit_breaker_trips: u64,
    /// Wall-clock time of the most recently consumed event.
    last_message: Option<DateTime<Utc>>,
    /// Name of the backend in use ("kafka", "memory", ...).
    backend_type: String,
    _batch_size: usize,
}
923
/// Memory-based consumer for testing and development
struct MemoryConsumer {
    /// In-memory backend that actually stores and serves the events.
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Topic this consumer reads from.
    topic: String,
    /// Next offset to request from the backend.
    current_offset: u64,
    /// Local (non-shared) statistics for this consumer.
    stats: ConsumerStats,
}
931
932impl MemoryConsumer {
933    fn _new() -> Self {
934        Self {
935            backend: Box::new(backend::memory::MemoryBackend::new()),
936            topic: "oxirs-stream".to_string(),
937            current_offset: 0,
938            stats: ConsumerStats {
939                backend_type: "memory".to_string(),
940                ..Default::default()
941            },
942        }
943    }
944
945    fn with_topic(topic: String) -> Self {
946        Self {
947            backend: Box::new(backend::memory::MemoryBackend::new()),
948            topic,
949            current_offset: 0,
950            stats: ConsumerStats {
951                backend_type: "memory".to_string(),
952                ..Default::default()
953            },
954        }
955    }
956
957    async fn consume(&mut self) -> Result<Option<StreamEvent>> {
958        let start_time = Instant::now();
959
960        // Use the backend implementation
961        self.backend
962            .as_mut()
963            .connect()
964            .await
965            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
966
967        // Try to receive events from the backend
968        let topic_name = crate::types::TopicName::new(self.topic.clone());
969        let events = self
970            .backend
971            .receive_events(
972                &topic_name,
973                None, // No consumer group for now
974                crate::types::StreamPosition::Offset(self.current_offset),
975                1, // Get one event at a time
976            )
977            .await
978            .map_err(|e| anyhow!("Receive events failed: {}", e))?;
979
980        if let Some((event, offset)) = events.first() {
981            self.current_offset = offset.value() + 1;
982
983            // Update stats
984            self.stats.events_consumed += 1;
985            let processing_time = start_time.elapsed().as_millis() as u64;
986            self.stats.max_processing_time_ms =
987                self.stats.max_processing_time_ms.max(processing_time);
988            self.stats.avg_processing_time_ms =
989                (self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
990            self.stats.last_message = Some(Utc::now());
991
992            debug!("Memory consumer: consumed event via backend");
993            Ok(Some(event.clone()))
994        } else {
995            debug!("Memory consumer: no events available");
996            Ok(None)
997        }
998    }
999
1000    fn _get_stats(&self) -> &ConsumerStats {
1001        &self.stats
1002    }
1003
1004    /// Reset consumer position for testing
1005    fn reset(&mut self) {
1006        self.current_offset = 0;
1007    }
1008}
1009
1010impl StreamConsumer {
    /// Create a new enhanced stream consumer with backend support
    ///
    /// Convenience wrapper around `new_with_group` with no consumer group.
    pub async fn new(config: StreamConfig) -> Result<Self> {
        Self::new_with_group(config, None).await
    }
1015
1016    /// Create a new stream consumer with a specific consumer group
1017    pub async fn new_with_group(
1018        config: StreamConfig,
1019        consumer_group: Option<String>,
1020    ) -> Result<Self> {
1021        // Initialize circuit breaker if enabled
1022        let circuit_breaker = if config.circuit_breaker.enabled {
1023            Some(circuit_breaker::new_shared_circuit_breaker(
1024                circuit_breaker::CircuitBreakerConfig {
1025                    enabled: config.circuit_breaker.enabled,
1026                    failure_threshold: config.circuit_breaker.failure_threshold,
1027                    success_threshold: config.circuit_breaker.success_threshold,
1028                    timeout: config.circuit_breaker.timeout,
1029                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
1030                    ..Default::default()
1031                },
1032            ))
1033        } else {
1034            None
1035        };
1036
1037        // Initialize backend-specific consumer
1038        let backend_consumer = match &config.backend {
1039            #[cfg(feature = "kafka")]
1040            StreamBackendType::Kafka {
1041                brokers,
1042                security_protocol,
1043                sasl_config,
1044            } => {
1045                let stream_config = crate::StreamConfig {
1046                    backend: crate::StreamBackendType::Kafka {
1047                        brokers: brokers.clone(),
1048                        security_protocol: security_protocol.clone(),
1049                        sasl_config: sasl_config.clone(),
1050                    },
1051                    topic: config.topic.clone(),
1052                    batch_size: config.batch_size,
1053                    flush_interval_ms: config.flush_interval_ms,
1054                    max_connections: config.max_connections,
1055                    connection_timeout: config.connection_timeout,
1056                    enable_compression: config.enable_compression,
1057                    compression_type: config.compression_type.clone(),
1058                    retry_config: config.retry_config.clone(),
1059                    circuit_breaker: config.circuit_breaker.clone(),
1060                    security: config.security.clone(),
1061                    performance: config.performance.clone(),
1062                    monitoring: config.monitoring.clone(),
1063                };
1064
1065                let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
1066                consumer.connect().await?;
1067                BackendConsumer::Kafka(Box::new(consumer))
1068            }
1069            #[cfg(feature = "nats")]
1070            StreamBackendType::Nats {
1071                url,
1072                cluster_urls,
1073                jetstream_config,
1074            } => {
1075                let stream_config = crate::StreamConfig {
1076                    backend: crate::StreamBackendType::Nats {
1077                        url: url.clone(),
1078                        cluster_urls: cluster_urls.clone(),
1079                        jetstream_config: jetstream_config.clone(),
1080                    },
1081                    topic: config.topic.clone(),
1082                    batch_size: config.batch_size,
1083                    flush_interval_ms: config.flush_interval_ms,
1084                    max_connections: config.max_connections,
1085                    connection_timeout: config.connection_timeout,
1086                    enable_compression: config.enable_compression,
1087                    compression_type: config.compression_type.clone(),
1088                    retry_config: config.retry_config.clone(),
1089                    circuit_breaker: config.circuit_breaker.clone(),
1090                    security: config.security.clone(),
1091                    performance: config.performance.clone(),
1092                    monitoring: config.monitoring.clone(),
1093                };
1094
1095                let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
1096                consumer.connect().await?;
1097                BackendConsumer::Nats(Box::new(consumer))
1098            }
1099            #[cfg(feature = "redis")]
1100            StreamBackendType::Redis {
1101                url,
1102                cluster_urls,
1103                pool_size,
1104            } => {
1105                let stream_config = crate::StreamConfig {
1106                    backend: crate::StreamBackendType::Redis {
1107                        url: url.clone(),
1108                        cluster_urls: cluster_urls.clone(),
1109                        pool_size: *pool_size,
1110                    },
1111                    topic: config.topic.clone(),
1112                    batch_size: config.batch_size,
1113                    flush_interval_ms: config.flush_interval_ms,
1114                    max_connections: config.max_connections,
1115                    connection_timeout: config.connection_timeout,
1116                    enable_compression: config.enable_compression,
1117                    compression_type: config.compression_type.clone(),
1118                    retry_config: config.retry_config.clone(),
1119                    circuit_breaker: config.circuit_breaker.clone(),
1120                    security: config.security.clone(),
1121                    performance: config.performance.clone(),
1122                    monitoring: config.monitoring.clone(),
1123                };
1124
1125                let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
1126                consumer.connect().await?;
1127                BackendConsumer::Redis(Box::new(consumer))
1128            }
1129            #[cfg(feature = "kinesis")]
1130            StreamBackendType::Kinesis {
1131                region,
1132                stream_name,
1133                credentials,
1134            } => {
1135                let stream_config = crate::StreamConfig {
1136                    backend: crate::StreamBackendType::Kinesis {
1137                        region: region.clone(),
1138                        stream_name: stream_name.clone(),
1139                        credentials: credentials.clone(),
1140                    },
1141                    topic: config.topic.clone(),
1142                    batch_size: config.batch_size,
1143                    flush_interval_ms: config.flush_interval_ms,
1144                    max_connections: config.max_connections,
1145                    connection_timeout: config.connection_timeout,
1146                    enable_compression: config.enable_compression,
1147                    compression_type: config.compression_type.clone(),
1148                    retry_config: config.retry_config.clone(),
1149                    circuit_breaker: config.circuit_breaker.clone(),
1150                    security: config.security.clone(),
1151                    performance: config.performance.clone(),
1152                    monitoring: config.monitoring.clone(),
1153                };
1154
1155                let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
1156                consumer.connect().await?;
1157                BackendConsumer::Kinesis(Box::new(consumer))
1158            }
1159            #[cfg(feature = "pulsar")]
1160            StreamBackendType::Pulsar {
1161                service_url,
1162                auth_config,
1163            } => {
1164                let stream_config = crate::StreamConfig {
1165                    backend: crate::StreamBackendType::Pulsar {
1166                        service_url: service_url.clone(),
1167                        auth_config: auth_config.clone(),
1168                    },
1169                    topic: config.topic.clone(),
1170                    batch_size: config.batch_size,
1171                    flush_interval_ms: config.flush_interval_ms,
1172                    max_connections: config.max_connections,
1173                    connection_timeout: config.connection_timeout,
1174                    enable_compression: config.enable_compression,
1175                    compression_type: config.compression_type.clone(),
1176                    retry_config: config.retry_config.clone(),
1177                    circuit_breaker: config.circuit_breaker.clone(),
1178                    security: config.security.clone(),
1179                    performance: config.performance.clone(),
1180                    monitoring: config.monitoring.clone(),
1181                };
1182
1183                let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
1184                consumer.connect().await?;
1185                BackendConsumer::Pulsar(Box::new(consumer))
1186            }
1187            #[cfg(feature = "rabbitmq")]
1188            StreamBackendType::RabbitMQ {
1189                url,
1190                exchange,
1191                queue,
1192            } => {
1193                let stream_config = crate::StreamConfig {
1194                    backend: crate::StreamBackendType::RabbitMQ {
1195                        url: url.clone(),
1196                        exchange: exchange.clone(),
1197                        queue: queue.clone(),
1198                    },
1199                    topic: config.topic.clone(),
1200                    batch_size: config.batch_size,
1201                    flush_interval_ms: config.flush_interval_ms,
1202                    max_connections: config.max_connections,
1203                    connection_timeout: config.connection_timeout,
1204                    enable_compression: config.enable_compression,
1205                    compression_type: config.compression_type.clone(),
1206                    retry_config: config.retry_config.clone(),
1207                    circuit_breaker: config.circuit_breaker.clone(),
1208                    security: config.security.clone(),
1209                    performance: config.performance.clone(),
1210                    monitoring: config.monitoring.clone(),
1211                };
1212
1213                let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
1214                consumer.connect().await?;
1215                BackendConsumer::RabbitMQ(Box::new(consumer))
1216            }
1217            StreamBackendType::Memory {
1218                max_size: _,
1219                persistence: _,
1220            } => {
1221                BackendConsumer::Memory(Box::new(MemoryConsumer::with_topic(config.topic.clone())))
1222            }
1223        };
1224
1225        let stats = Arc::new(RwLock::new(ConsumerStats {
1226            backend_type: match backend_consumer {
1227                #[cfg(feature = "kafka")]
1228                BackendConsumer::Kafka(_) => "kafka".to_string(),
1229                #[cfg(feature = "nats")]
1230                BackendConsumer::Nats(_) => "nats".to_string(),
1231                #[cfg(feature = "redis")]
1232                BackendConsumer::Redis(_) => "redis".to_string(),
1233                #[cfg(feature = "kinesis")]
1234                BackendConsumer::Kinesis(_) => "kinesis".to_string(),
1235                #[cfg(feature = "pulsar")]
1236                BackendConsumer::Pulsar(_) => "pulsar".to_string(),
1237                #[cfg(feature = "rabbitmq")]
1238                BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
1239                BackendConsumer::Memory(_) => "memory".to_string(),
1240            },
1241            _batch_size: config.batch_size,
1242            ..Default::default()
1243        }));
1244
1245        info!(
1246            "Created stream consumer with backend: {} and group: {:?}",
1247            stats.read().await.backend_type,
1248            consumer_group
1249        );
1250
1251        Ok(Self {
1252            _config: config,
1253            backend_consumer,
1254            stats,
1255            circuit_breaker,
1256            last_poll: Instant::now(),
1257            _message_buffer: Arc::new(RwLock::new(Vec::new())),
1258            consumer_group,
1259        })
1260    }
1261
1262    /// Consume stream events with circuit breaker protection
1263    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1264        let start_time = Instant::now();
1265
1266        // Check circuit breaker if enabled
1267        if let Some(cb) = &self.circuit_breaker {
1268            if !cb.can_execute().await {
1269                self.stats.write().await.circuit_breaker_trips += 1;
1270                return Err(anyhow!("Circuit breaker is open - cannot consume events"));
1271            }
1272        }
1273
1274        // Consume from backend
1275        let result = self.consume_single_event().await;
1276
1277        // Update circuit breaker and stats
1278        match &result {
1279            Ok(Some(_)) => {
1280                if let Some(cb) = &self.circuit_breaker {
1281                    cb.record_success_with_duration(start_time.elapsed()).await;
1282                }
1283
1284                let mut stats = self.stats.write().await;
1285                stats.events_consumed += 1;
1286                let processing_time = start_time.elapsed().as_millis() as u64;
1287                stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
1288                stats.avg_processing_time_ms =
1289                    (stats.avg_processing_time_ms + processing_time as f64) / 2.0;
1290                stats.last_message = Some(Utc::now());
1291            }
1292            Ok(None) => {
1293                // No message available, not an error
1294                if let Some(cb) = &self.circuit_breaker {
1295                    cb.record_success_with_duration(start_time.elapsed()).await;
1296                }
1297            }
1298            Err(_) => {
1299                if let Some(cb) = &self.circuit_breaker {
1300                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
1301                        .await;
1302                }
1303
1304                self.stats.write().await.events_failed += 1;
1305            }
1306        }
1307
1308        self.last_poll = Instant::now();
1309        result
1310    }
1311
    /// Consume a single event from the backend
    ///
    /// Pure dispatch: forwards to whichever backend consumer variant was
    /// selected at construction time. No stats or circuit-breaker handling
    /// happens here — that is done by the caller (`consume`).
    async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
        match &mut self.backend_consumer {
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(consumer) => consumer.consume().await,
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(consumer) => consumer.consume().await,
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(consumer) => consumer.consume().await,
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(consumer) => consumer.consume().await,
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(consumer) => consumer.consume().await,
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
            BackendConsumer::Memory(consumer) => consumer.consume().await,
        }
    }
1330
1331    /// Consume multiple events as a batch
1332    pub async fn consume_batch(
1333        &mut self,
1334        max_events: usize,
1335        timeout: Duration,
1336    ) -> Result<Vec<StreamEvent>> {
1337        let mut events = Vec::new();
1338        let start_time = Instant::now();
1339
1340        while events.len() < max_events && start_time.elapsed() < timeout {
1341            match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
1342                Ok(Ok(Some(event))) => events.push(event),
1343                Ok(Ok(None)) => continue,
1344                Ok(Err(e)) => return Err(e),
1345                Err(_) => break, // Timeout
1346            }
1347        }
1348
1349        if !events.is_empty() {
1350            debug!(
1351                "Consumed batch of {} events in {:?}",
1352                events.len(),
1353                start_time.elapsed()
1354            );
1355        }
1356
1357        Ok(events)
1358    }
1359
1360    /// Start consuming events with a callback function
1361    pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
1362    where
1363        F: FnMut(StreamEvent) -> Result<()> + Send,
1364    {
1365        info!("Starting stream consumer loop");
1366
1367        loop {
1368            match self.consume().await {
1369                Ok(Some(event)) => {
1370                    if let Err(e) = callback(event) {
1371                        error!("Callback error: {}", e);
1372                        self.stats.write().await.events_failed += 1;
1373                    }
1374                }
1375                Ok(None) => {
1376                    // No message, wait a bit
1377                    tokio::time::sleep(Duration::from_millis(10)).await;
1378                }
1379                Err(e) => {
1380                    error!("Consumer error: {}", e);
1381                    tokio::time::sleep(Duration::from_millis(100)).await;
1382                }
1383            }
1384        }
1385    }
1386
1387    /// Start consuming events with an async callback function
1388    pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
1389    where
1390        F: FnMut(StreamEvent) -> Fut + Send,
1391        Fut: std::future::Future<Output = Result<()>> + Send,
1392    {
1393        info!("Starting async stream consumer loop");
1394
1395        loop {
1396            match self.consume().await {
1397                Ok(Some(event)) => {
1398                    if let Err(e) = callback(event).await {
1399                        error!("Async callback error: {}", e);
1400                        self.stats.write().await.events_failed += 1;
1401                    }
1402                }
1403                Ok(None) => {
1404                    // No message, wait a bit
1405                    tokio::time::sleep(Duration::from_millis(10)).await;
1406                }
1407                Err(e) => {
1408                    error!("Consumer error: {}", e);
1409                    tokio::time::sleep(Duration::from_millis(100)).await;
1410                }
1411            }
1412        }
1413    }
1414
    /// Get consumer statistics
    ///
    /// Returns a point-in-time clone; the consumer keeps mutating the shared
    /// stats after this call returns.
    pub async fn get_stats(&self) -> ConsumerStats {
        self.stats.read().await.clone()
    }
1419
1420    /// Get consumer health status
1421    pub async fn health_check(&self) -> bool {
1422        if let Some(cb) = &self.circuit_breaker {
1423            cb.is_healthy().await
1424        } else {
1425            true
1426        }
1427    }
1428
    /// Get the consumer group name if any
    ///
    /// Borrows the group name set at construction time (`new_with_group`).
    pub fn consumer_group(&self) -> Option<&String> {
        self.consumer_group.as_ref()
    }
1433
    /// Reset consumer position (for testing with memory backend)
    ///
    /// Only the in-memory backend supports rewinding to offset 0; every other
    /// backend returns an error naming itself.
    pub async fn reset_position(&mut self) -> Result<()> {
        match &mut self.backend_consumer {
            BackendConsumer::Memory(consumer) => {
                consumer.reset();
                Ok(())
            }
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(_) => {
                Err(anyhow!("Reset position not supported for Kafka backend"))
            }
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(_) => {
                Err(anyhow!("Reset position not supported for NATS backend"))
            }
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(_) => {
                Err(anyhow!("Reset position not supported for Redis backend"))
            }
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(_) => {
                Err(anyhow!("Reset position not supported for Kinesis backend"))
            }
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(_) => {
                Err(anyhow!("Reset position not supported for Pulsar backend"))
            }
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(_) => {
                Err(anyhow!("Reset position not supported for RabbitMQ backend"))
            }
        }
    }
1467
    /// Set test events for memory backend (for testing) - deprecated with backend implementation
    ///
    /// Always returns an error: events must now be published through a
    /// producer so they flow through the real backend.
    pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
        // This method is deprecated with the new backend implementation
        // Events should be published through the producer instead
        Err(anyhow!("set_test_events is deprecated with backend implementation - use producer to publish events"))
    }
1474}
1475
/// RDF patch operations with full protocol support
///
/// Each variant corresponds to an opcode in the textual RDF Patch format
/// (noted in the per-variant docs, e.g. `A` = add triple).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
    /// Add a triple (A operation)
    Add {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Delete a triple (D operation)
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Add a graph (GA operation)
    AddGraph { graph: String },
    /// Delete a graph (GD operation)
    DeleteGraph { graph: String },
    /// Add a prefix (PA operation)
    AddPrefix { prefix: String, namespace: String },
    /// Delete a prefix (PD operation)
    DeletePrefix { prefix: String },
    /// Transaction begin (TX operation)
    TransactionBegin { transaction_id: Option<String> },
    /// Transaction commit (TC operation)
    TransactionCommit,
    /// Transaction abort (TA operation)
    TransactionAbort,
    /// Header information (H operation)
    Header { key: String, value: String },
}
1508
/// RDF patch for atomic updates with full protocol support
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
    /// Ordered list of operations, applied sequentially.
    pub operations: Vec<PatchOperation>,
    /// Creation time of the patch.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Unique patch identifier (UUID v4 when created via `new`).
    pub id: String,
    /// Patch headers for metadata
    pub headers: HashMap<String, String>,
    /// Current transaction ID if in transaction
    pub transaction_id: Option<String>,
    /// Prefixes used in the patch
    pub prefixes: HashMap<String, String>,
}
1522
1523impl RdfPatch {
1524    /// Create a new RDF patch
1525    pub fn new() -> Self {
1526        Self {
1527            operations: Vec::new(),
1528            timestamp: chrono::Utc::now(),
1529            id: uuid::Uuid::new_v4().to_string(),
1530            headers: HashMap::new(),
1531            transaction_id: None,
1532            prefixes: HashMap::new(),
1533        }
1534    }
1535
1536    /// Add an operation to the patch
1537    pub fn add_operation(&mut self, operation: PatchOperation) {
1538        self.operations.push(operation);
1539    }
1540
1541    /// Serialize patch to RDF Patch format
1542    pub fn to_rdf_patch_format(&self) -> Result<String> {
1543        let serializer = crate::patch::PatchSerializer::new()
1544            .with_pretty_print(true)
1545            .with_metadata(true);
1546        serializer.serialize(self)
1547    }
1548
1549    /// Parse from RDF Patch format
1550    pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
1551        let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
1552        parser.parse(input)
1553    }
1554}
1555
impl Default for RdfPatch {
    /// Same as [`RdfPatch::new`]: empty patch with a fresh id and timestamp.
    fn default() -> Self {
        Self::new()
    }
}
1561
// Default implementations for easier configuration
impl Default for StreamConfig {
    /// In-memory backend with modest buffering — suited to tests and
    /// development rather than production throughput.
    fn default() -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: "oxirs-stream".to_string(),
            batch_size: 100,
            flush_interval_ms: 100,
            max_connections: 10,
            connection_timeout: Duration::from_secs(30),
            enable_compression: false,
            compression_type: CompressionType::None,
            retry_config: RetryConfig::default(),
            circuit_breaker: CircuitBreakerConfig::default(),
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig::default(),
            monitoring: MonitoringConfig::default(),
        }
    }
}
1585
1586impl Default for RetryConfig {
1587    /// Up to 3 retries with jittered exponential backoff:
1588    /// 100 ms initial delay, doubling each attempt, capped at 30 s.
1589    fn default() -> Self {
1590        RetryConfig {
1591            jitter: true,
1592            backoff_multiplier: 2.0,
1593            max_backoff: Duration::from_secs(30),
1594            initial_backoff: Duration::from_millis(100),
1595            max_retries: 3,
1596        }
1597    }
1598}
1597
1598impl Default for CircuitBreakerConfig {
1599    /// Breaker enabled by default: opens after 5 failures, re-closes after
1600    /// 3 successes, with a 30 s open timeout and 3 probe calls in half-open.
1601    fn default() -> Self {
1602        CircuitBreakerConfig {
1603            enabled: true,
1604            timeout: Duration::from_secs(30),
1605            failure_threshold: 5,
1606            success_threshold: 3,
1607            half_open_max_calls: 3,
1608        }
1609    }
1610}
1609
1610impl Default for SecurityConfig {
1611    /// Plaintext by default (no TLS, no SASL, no client certs); certificate
1612    /// verification is left on so enabling TLS later is safe out of the box.
1613    fn default() -> Self {
1614        SecurityConfig {
1615            enable_tls: false,
1616            verify_certificates: true,
1617            client_cert_path: None,
1618            client_key_path: None,
1619            ca_cert_path: None,
1620            sasl_config: None,
1621        }
1622    }
1623}
1622
1623impl Default for StreamPerformanceConfig {
1624    /// Batching and parallel processing on; pipelining, zero-copy, and SIMD
1625    /// off. 8 KiB buffer, 100-message prefetch, runtime-chosen worker count.
1626    fn default() -> Self {
1627        StreamPerformanceConfig {
1628            enable_batching: true,
1629            parallel_processing: true,
1630            enable_pipelining: false,
1631            enable_zero_copy: false,
1632            enable_simd: false,
1633            buffer_size: 8192,
1634            prefetch_count: 100,
1635            worker_threads: None,
1636        }
1637    }
1638}
1637
1638impl Default for MonitoringConfig {
1639    /// Metrics and tracing enabled (60 s metrics interval, 30 s health
1640    /// checks) at "info" log level; profiling and exporter endpoints off.
1641    fn default() -> Self {
1642        MonitoringConfig {
1643            enable_metrics: true,
1644            enable_tracing: true,
1645            enable_profiling: false,
1646            metrics_interval: Duration::from_secs(60),
1647            health_check_interval: Duration::from_secs(30),
1648            prometheus_endpoint: None,
1649            jaeger_endpoint: None,
1650            log_level: String::from("info"),
1651        }
1652    }
1653}
1652
1653/// Helper functions for creating common configurations
impl StreamConfig {
    /// Create a Redis configuration targeting a single `url`
    /// (no cluster, pool of 10 connections); everything else defaults.
    #[cfg(feature = "redis")]
    pub fn redis(url: String) -> Self {
        Self {
            backend: StreamBackendType::Redis {
                url,
                cluster_urls: None,
                pool_size: Some(10),
            },
            ..Default::default()
        }
    }

    /// Create a Kinesis configuration for `stream_name` in `region`,
    /// using ambient credentials; everything else defaults.
    #[cfg(feature = "kinesis")]
    pub fn kinesis(region: String, stream_name: String) -> Self {
        Self {
            backend: StreamBackendType::Kinesis {
                region,
                stream_name,
                credentials: None,
            },
            ..Default::default()
        }
    }

    /// Create a small in-memory configuration (1k events, no persistence),
    /// intended for tests.
    pub fn memory() -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(1000),
                persistence: false,
            },
            ..Default::default()
        }
    }

    /// Tune this configuration for throughput: batching, pipelining, and
    /// parallel processing on; 64 KiB buffer, 1000-message prefetch and
    /// batch size, 10 ms flush interval.
    pub fn high_performance(mut self) -> Self {
        self.performance.enable_batching = true;
        self.performance.enable_pipelining = true;
        self.performance.parallel_processing = true;
        self.performance.buffer_size = 65536;
        self.performance.prefetch_count = 1000;
        self.batch_size = 1000;
        self.flush_interval_ms = 10;
        self
    }

    /// Select a compression codec.
    ///
    /// Passing `CompressionType::None` disables compression; previously this
    /// method unconditionally set `enable_compression = true`, which left the
    /// config in an inconsistent "compression enabled but no codec" state.
    pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
        self.enable_compression = !matches!(compression_type, CompressionType::None);
        self.compression_type = compression_type;
        self
    }

    /// Enable/disable the circuit breaker and set its failure threshold;
    /// other breaker parameters are left untouched.
    pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
        self.circuit_breaker.enabled = enabled;
        self.circuit_breaker.failure_threshold = failure_threshold;
        self
    }

    /// Create a development configuration: memory backend, small batches,
    /// short timeouts, circuit breaker off, fast metrics, "debug" logging.
    /// Fields not listed here take their `Default` values.
    pub fn development(topic: &str) -> Self {
        Self {
            topic: topic.to_string(),
            batch_size: 10,
            max_connections: 5,
            connection_timeout: Duration::from_secs(10),
            retry_config: RetryConfig {
                // Shorter cap than the default 30 s keeps dev feedback fast.
                max_backoff: Duration::from_secs(5),
                ..RetryConfig::default()
            },
            circuit_breaker: CircuitBreakerConfig {
                enabled: false,
                success_threshold: 2,
                timeout: Duration::from_secs(60),
                half_open_max_calls: 10,
                ..CircuitBreakerConfig::default()
            },
            monitoring: MonitoringConfig {
                enable_tracing: false,
                metrics_interval: Duration::from_secs(5),
                log_level: "debug".to_string(),
                ..MonitoringConfig::default()
            },
            ..Self::default()
        }
    }

    /// Create a production configuration: persistent memory backend (100k
    /// events), large batches, Zstd compression, resilient retry/breaker
    /// settings, aggressive performance flags, and frequent monitoring.
    /// Fields not listed here take their `Default` values.
    pub fn production(topic: &str) -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(100000),
                persistence: true,
            },
            topic: topic.to_string(),
            batch_size: 1000,
            flush_interval_ms: 10,
            max_connections: 50,
            enable_compression: true,
            compression_type: CompressionType::Zstd,
            retry_config: RetryConfig {
                max_retries: 5,
                initial_backoff: Duration::from_millis(200),
                ..RetryConfig::default()
            },
            circuit_breaker: CircuitBreakerConfig {
                failure_threshold: 10,
                half_open_max_calls: 5,
                ..CircuitBreakerConfig::default()
            },
            performance: StreamPerformanceConfig {
                enable_pipelining: true,
                buffer_size: 65536,
                prefetch_count: 1000,
                enable_zero_copy: true,
                enable_simd: true,
                ..StreamPerformanceConfig::default()
            },
            monitoring: MonitoringConfig {
                metrics_interval: Duration::from_secs(1),
                health_check_interval: Duration::from_secs(10),
                enable_profiling: true,
                ..MonitoringConfig::default()
            },
            ..Self::default()
        }
    }
}
1813
1814/// Unified Stream interface that combines producer and consumer functionality
1815pub struct Stream {
1816    producer: StreamProducer,
1817    consumer: StreamConsumer,
1818}
1819
1820impl Stream {
1821    /// Build a producer/consumer pair sharing one configuration.
1822    ///
1823    /// NOTE: the memory backend is deliberately NOT cleared here even when
1824    /// `config.backend` is `StreamBackendType::Memory` — performance tests
1825    /// rely on pre-existing events surviving stream construction.
1826    pub async fn new(config: StreamConfig) -> Result<Self> {
1827        let producer = StreamProducer::new(config.clone()).await?;
1828        let consumer = StreamConsumer::new(config).await?;
1829        Ok(Stream { producer, consumer })
1830    }
1831
1832    /// Publish a single event via the producer side.
1833    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
1834        self.producer.publish(event).await
1835    }
1836
1837    /// Pull the next event from the consumer side, if one is available.
1838    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1839        self.consumer.consume().await
1840    }
1841
1842    /// Force the producer to flush any buffered events.
1843    pub async fn flush(&mut self) -> Result<()> {
1844        self.producer.flush().await
1845    }
1846
1847    /// Snapshot of producer-side statistics.
1848    pub async fn producer_stats(&self) -> ProducerStats {
1849        self.producer.get_stats().await
1850    }
1851
1852    /// Snapshot of consumer-side statistics.
1853    pub async fn consumer_stats(&self) -> ConsumerStats {
1854        self.consumer.get_stats().await
1855    }
1856
1857    /// Shut the stream down: flush pending events, then let the producer's
1858    /// and consumer's `Drop` implementations release backend connections.
1859    pub async fn close(&mut self) -> Result<()> {
1860        self.producer.flush().await?;
1861        debug!("Stream closed successfully");
1862        Ok(())
1863    }
1864
1865    /// Report whether the stream is operational.
1866    ///
1867    /// Simplified implementation: currently always returns `Ok(true)`
1868    /// without probing the backend.
1869    pub async fn health_check(&self) -> Result<bool> {
1870        Ok(true)
1871    }
1872
1873    /// Begin a transaction. Placeholder: logs and succeeds without
1874    /// touching the backend.
1875    pub async fn begin_transaction(&mut self) -> Result<()> {
1876        debug!("Transaction begun (placeholder)");
1877        Ok(())
1878    }
1879
1880    /// Commit a transaction. Placeholder: logs and succeeds without
1881    /// touching the backend.
1882    pub async fn commit_transaction(&mut self) -> Result<()> {
1883        debug!("Transaction committed (placeholder)");
1884        Ok(())
1885    }
1886
1887    /// Roll back a transaction. Placeholder: logs and succeeds without
1888    /// touching the backend.
1889    pub async fn rollback_transaction(&mut self) -> Result<()> {
1890        debug!("Transaction rolled back (placeholder)");
1891        Ok(())
1892    }
1893}