Skip to main content

oxirs_stream/
lib_types.rs

//! Type definitions extracted from lib.rs for 2000-line policy compliance.
2
3use anyhow::{anyhow, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::{RwLock, Semaphore};
10use tracing::{debug, error, info};
11use uuid::Uuid;
12
13use crate::backend::{self, StreamBackend};
14use crate::circuit_breaker::{self, SharedCircuitBreakerExt};
15use crate::event::{EventMetadata, StreamEvent};
16
17/// Enhanced stream configuration with advanced features
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct StreamConfig {
20    pub backend: StreamBackendType,
21    pub topic: String,
22    pub batch_size: usize,
23    pub flush_interval_ms: u64,
24    /// Maximum concurrent connections
25    pub max_connections: usize,
26    /// Connection timeout
27    pub connection_timeout: Duration,
28    /// Enable compression
29    pub enable_compression: bool,
30    /// Compression type
31    pub compression_type: CompressionType,
32    /// Retry configuration
33    pub retry_config: RetryConfig,
34    /// Circuit breaker configuration
35    pub circuit_breaker: CircuitBreakerConfig,
36    /// Security configuration
37    pub security: SecurityConfig,
38    /// Performance tuning
39    pub performance: StreamPerformanceConfig,
40    /// Monitoring configuration
41    pub monitoring: MonitoringConfig,
42}
43
44/// Compression types supported
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub enum CompressionType {
47    None,
48    Gzip,
49    Snappy,
50    Lz4,
51    Zstd,
52}
53
54/// Retry configuration
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct RetryConfig {
57    pub max_retries: u32,
58    pub initial_backoff: Duration,
59    pub max_backoff: Duration,
60    pub backoff_multiplier: f64,
61    pub jitter: bool,
62}
63
64/// Circuit breaker configuration
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct CircuitBreakerConfig {
67    pub enabled: bool,
68    pub failure_threshold: u32,
69    pub success_threshold: u32,
70    pub timeout: Duration,
71    pub half_open_max_calls: u32,
72}
73
74/// Security configuration
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct SecurityConfig {
77    pub enable_tls: bool,
78    pub verify_certificates: bool,
79    pub client_cert_path: Option<String>,
80    pub client_key_path: Option<String>,
81    pub ca_cert_path: Option<String>,
82    pub sasl_config: Option<SaslConfig>,
83}
84
85/// SASL authentication configuration
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct SaslConfig {
88    pub mechanism: SaslMechanism,
89    pub username: String,
90    pub password: String,
91}
92
93/// SASL authentication mechanisms
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub enum SaslMechanism {
96    Plain,
97    ScramSha256,
98    ScramSha512,
99    OAuthBearer,
100}
101
102/// Performance tuning configuration
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct StreamPerformanceConfig {
105    pub enable_batching: bool,
106    pub enable_pipelining: bool,
107    pub buffer_size: usize,
108    pub prefetch_count: u32,
109    pub enable_zero_copy: bool,
110    pub enable_simd: bool,
111    pub parallel_processing: bool,
112    pub worker_threads: Option<usize>,
113}
114
115/// Monitoring configuration
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct MonitoringConfig {
118    pub enable_metrics: bool,
119    pub enable_tracing: bool,
120    pub metrics_interval: Duration,
121    pub health_check_interval: Duration,
122    pub enable_profiling: bool,
123    pub prometheus_endpoint: Option<String>,
124    pub jaeger_endpoint: Option<String>,
125    pub log_level: String,
126}
127
128/// Enhanced streaming backend options
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub enum StreamBackendType {
131    #[cfg(feature = "kafka")]
132    Kafka {
133        brokers: Vec<String>,
134        security_protocol: Option<String>,
135        sasl_config: Option<SaslConfig>,
136    },
137    #[cfg(feature = "nats")]
138    Nats {
139        url: String,
140        cluster_urls: Option<Vec<String>>,
141        jetstream_config: Option<NatsJetStreamConfig>,
142    },
143    #[cfg(feature = "redis")]
144    Redis {
145        url: String,
146        cluster_urls: Option<Vec<String>>,
147        pool_size: Option<usize>,
148    },
149    #[cfg(feature = "kinesis")]
150    Kinesis {
151        region: String,
152        stream_name: String,
153        credentials: Option<AwsCredentials>,
154    },
155    #[cfg(feature = "pulsar")]
156    Pulsar {
157        service_url: String,
158        auth_config: Option<PulsarAuthConfig>,
159    },
160    #[cfg(feature = "rabbitmq")]
161    RabbitMQ {
162        url: String,
163        exchange: Option<String>,
164        queue: Option<String>,
165    },
166    Memory {
167        max_size: Option<usize>,
168        persistence: bool,
169    },
170}
171
172/// NATS JetStream configuration
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct NatsJetStreamConfig {
175    pub domain: Option<String>,
176    pub api_prefix: Option<String>,
177    pub timeout: Duration,
178}
179
180/// AWS credentials configuration
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct AwsCredentials {
183    pub access_key_id: String,
184    pub secret_access_key: String,
185    pub session_token: Option<String>,
186    pub role_arn: Option<String>,
187}
188
189/// Pulsar authentication configuration
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct PulsarAuthConfig {
192    pub auth_method: PulsarAuthMethod,
193    pub auth_params: HashMap<String, String>,
194}
195
196/// Pulsar authentication methods
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub enum PulsarAuthMethod {
199    Token,
200    Jwt,
201    Oauth2,
202    Tls,
203}
204
205/// Enhanced stream producer for publishing RDF changes with backend support
206pub struct StreamProducer {
207    config: StreamConfig,
208    backend_producer: BackendProducer,
209    stats: Arc<RwLock<ProducerStats>>,
210    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
211    last_flush: Instant,
212    _pending_events: Arc<RwLock<Vec<StreamEvent>>>,
213    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
214    flush_semaphore: Arc<Semaphore>,
215}
216
217/// Backend-agnostic producer wrapper
218enum BackendProducer {
219    #[cfg(feature = "kafka")]
220    Kafka(backend::kafka::KafkaProducer),
221    #[cfg(feature = "nats")]
222    Nats(Box<backend::nats::NatsProducer>),
223    #[cfg(feature = "redis")]
224    Redis(backend::redis::RedisProducer),
225    #[cfg(feature = "kinesis")]
226    Kinesis(backend::kinesis::KinesisProducer),
227    #[cfg(feature = "pulsar")]
228    Pulsar(Box<backend::pulsar::PulsarProducer>),
229    #[cfg(feature = "rabbitmq")]
230    RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
231    Memory(MemoryProducer),
232}
233
234/// Producer statistics for monitoring
235#[derive(Debug, Default, Clone)]
236pub struct ProducerStats {
237    events_published: u64,
238    events_failed: u64,
239    _bytes_sent: u64,
240    avg_latency_ms: f64,
241    max_latency_ms: u64,
242    batch_count: u64,
243    flush_count: u64,
244    circuit_breaker_trips: u64,
245    last_publish: Option<DateTime<Utc>>,
246    backend_type: String,
247}
248
249/// Memory-based producer for testing and development
250// Type aliases for complex types
251type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
252type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;
253
254// Global shared storage for memory backend events
255static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();
256
257pub fn get_memory_events() -> MemoryEventStore {
258    MEMORY_EVENTS
259        .get_or_init(|| Arc::new(RwLock::new(Vec::new())))
260        .clone()
261}
262
263/// Clear the global memory storage (for testing)
264pub async fn clear_memory_events() {
265    let events = get_memory_events();
266    events.write().await.clear();
267    // Also clear the memory backend storage
268    backend::memory::clear_memory_storage().await;
269}
270
271struct MemoryProducer {
272    backend: Box<dyn StreamBackend + Send + Sync>,
273    topic: String,
274    stats: ProducerStats,
275}
276
277impl MemoryProducer {
278    fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
279        Self {
280            backend: Box::new(backend::memory::MemoryBackend::new()),
281            topic: "oxirs-stream".to_string(), // Default topic
282            stats: ProducerStats {
283                backend_type: "memory".to_string(),
284                ..Default::default()
285            },
286        }
287    }
288
289    fn with_topic(topic: String) -> Self {
290        Self {
291            backend: Box::new(backend::memory::MemoryBackend::new()),
292            topic,
293            stats: ProducerStats {
294                backend_type: "memory".to_string(),
295                ..Default::default()
296            },
297        }
298    }
299
300    async fn publish(&mut self, event: StreamEvent) -> Result<()> {
301        let start_time = Instant::now();
302
303        // Use the backend implementation
304        self.backend
305            .as_mut()
306            .connect()
307            .await
308            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
309
310        // Create topic if it doesn't exist
311        let topic_name = crate::types::TopicName::new(self.topic.clone());
312        self.backend
313            .create_topic(&topic_name, 1)
314            .await
315            .map_err(|e| anyhow!("Topic creation failed: {}", e))?;
316
317        // Send event through backend
318        self.backend
319            .send_event(&topic_name, event)
320            .await
321            .map_err(|e| anyhow!("Send event failed: {}", e))?;
322
323        // Update stats
324        self.stats.events_published += 1;
325        let latency = start_time.elapsed().as_millis() as u64;
326        self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
327        self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
328        self.stats.last_publish = Some(Utc::now());
329
330        debug!("Memory producer: published event via backend");
331        Ok(())
332    }
333
334    async fn flush(&mut self) -> Result<()> {
335        // Backend handles flushing internally
336        self.stats.flush_count += 1;
337        debug!("Memory producer: flush completed");
338        Ok(())
339    }
340
341    fn _get_stats(&self) -> &ProducerStats {
342        &self.stats
343    }
344}
345
346impl StreamProducer {
347    /// Create a new enhanced stream producer with backend support
348    pub async fn new(config: StreamConfig) -> Result<Self> {
349        // Initialize circuit breaker if enabled
350        let circuit_breaker = if config.circuit_breaker.enabled {
351            Some(circuit_breaker::new_shared_circuit_breaker(
352                circuit_breaker::CircuitBreakerConfig {
353                    enabled: config.circuit_breaker.enabled,
354                    failure_threshold: config.circuit_breaker.failure_threshold,
355                    success_threshold: config.circuit_breaker.success_threshold,
356                    timeout: config.circuit_breaker.timeout,
357                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
358                    ..Default::default()
359                },
360            ))
361        } else {
362            None
363        };
364
365        // Initialize backend-specific producer
366        let backend_producer = match &config.backend {
367            #[cfg(feature = "kafka")]
368            StreamBackendType::Kafka {
369                brokers,
370                security_protocol,
371                sasl_config,
372            } => {
373                let stream_config = crate::StreamConfig {
374                    backend: crate::StreamBackendType::Kafka {
375                        brokers: brokers.clone(),
376                        security_protocol: security_protocol.clone(),
377                        sasl_config: sasl_config.clone(),
378                    },
379                    topic: config.topic.clone(),
380                    batch_size: config.batch_size,
381                    flush_interval_ms: config.flush_interval_ms,
382                    max_connections: config.max_connections,
383                    connection_timeout: config.connection_timeout,
384                    enable_compression: config.enable_compression,
385                    compression_type: config.compression_type.clone(),
386                    retry_config: config.retry_config.clone(),
387                    circuit_breaker: config.circuit_breaker.clone(),
388                    security: config.security.clone(),
389                    performance: config.performance.clone(),
390                    monitoring: config.monitoring.clone(),
391                };
392
393                let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
394                producer.connect().await?;
395                BackendProducer::Kafka(producer)
396            }
397            #[cfg(feature = "nats")]
398            StreamBackendType::Nats {
399                url,
400                cluster_urls,
401                jetstream_config,
402            } => {
403                let stream_config = crate::StreamConfig {
404                    backend: crate::StreamBackendType::Nats {
405                        url: url.clone(),
406                        cluster_urls: cluster_urls.clone(),
407                        jetstream_config: jetstream_config.clone(),
408                    },
409                    topic: config.topic.clone(),
410                    batch_size: config.batch_size,
411                    flush_interval_ms: config.flush_interval_ms,
412                    max_connections: config.max_connections,
413                    connection_timeout: config.connection_timeout,
414                    enable_compression: config.enable_compression,
415                    compression_type: config.compression_type.clone(),
416                    retry_config: config.retry_config.clone(),
417                    circuit_breaker: config.circuit_breaker.clone(),
418                    security: config.security.clone(),
419                    performance: config.performance.clone(),
420                    monitoring: config.monitoring.clone(),
421                };
422
423                let mut producer = backend::nats::NatsProducer::new(stream_config)?;
424                producer.connect().await?;
425                BackendProducer::Nats(Box::new(producer))
426            }
427            #[cfg(feature = "redis")]
428            StreamBackendType::Redis {
429                url,
430                cluster_urls,
431                pool_size,
432            } => {
433                let stream_config = crate::StreamConfig {
434                    backend: crate::StreamBackendType::Redis {
435                        url: url.clone(),
436                        cluster_urls: cluster_urls.clone(),
437                        pool_size: *pool_size,
438                    },
439                    topic: config.topic.clone(),
440                    batch_size: config.batch_size,
441                    flush_interval_ms: config.flush_interval_ms,
442                    max_connections: config.max_connections,
443                    connection_timeout: config.connection_timeout,
444                    enable_compression: config.enable_compression,
445                    compression_type: config.compression_type.clone(),
446                    retry_config: config.retry_config.clone(),
447                    circuit_breaker: config.circuit_breaker.clone(),
448                    security: config.security.clone(),
449                    performance: config.performance.clone(),
450                    monitoring: config.monitoring.clone(),
451                };
452
453                let mut producer = backend::redis::RedisProducer::new(stream_config)?;
454                producer.connect().await?;
455                BackendProducer::Redis(producer)
456            }
457            #[cfg(feature = "kinesis")]
458            StreamBackendType::Kinesis {
459                region,
460                stream_name,
461                credentials,
462            } => {
463                let stream_config = crate::StreamConfig {
464                    backend: crate::StreamBackendType::Kinesis {
465                        region: region.clone(),
466                        stream_name: stream_name.clone(),
467                        credentials: credentials.clone(),
468                    },
469                    topic: config.topic.clone(),
470                    batch_size: config.batch_size,
471                    flush_interval_ms: config.flush_interval_ms,
472                    max_connections: config.max_connections,
473                    connection_timeout: config.connection_timeout,
474                    enable_compression: config.enable_compression,
475                    compression_type: config.compression_type.clone(),
476                    retry_config: config.retry_config.clone(),
477                    circuit_breaker: config.circuit_breaker.clone(),
478                    security: config.security.clone(),
479                    performance: config.performance.clone(),
480                    monitoring: config.monitoring.clone(),
481                };
482
483                let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
484                producer.connect().await?;
485                BackendProducer::Kinesis(producer)
486            }
487            #[cfg(feature = "pulsar")]
488            StreamBackendType::Pulsar {
489                service_url,
490                auth_config,
491            } => {
492                let stream_config = crate::StreamConfig {
493                    backend: crate::StreamBackendType::Pulsar {
494                        service_url: service_url.clone(),
495                        auth_config: auth_config.clone(),
496                    },
497                    topic: config.topic.clone(),
498                    batch_size: config.batch_size,
499                    flush_interval_ms: config.flush_interval_ms,
500                    max_connections: config.max_connections,
501                    connection_timeout: config.connection_timeout,
502                    enable_compression: config.enable_compression,
503                    compression_type: config.compression_type.clone(),
504                    retry_config: config.retry_config.clone(),
505                    circuit_breaker: config.circuit_breaker.clone(),
506                    security: config.security.clone(),
507                    performance: config.performance.clone(),
508                    monitoring: config.monitoring.clone(),
509                };
510
511                let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
512                producer.connect().await?;
513                BackendProducer::Pulsar(Box::new(producer))
514            }
515            #[cfg(feature = "rabbitmq")]
516            StreamBackendType::RabbitMQ {
517                url,
518                exchange,
519                queue,
520            } => {
521                let stream_config = crate::StreamConfig {
522                    backend: crate::StreamBackendType::RabbitMQ {
523                        url: url.clone(),
524                        exchange: exchange.clone(),
525                        queue: queue.clone(),
526                    },
527                    topic: config.topic.clone(),
528                    batch_size: config.batch_size,
529                    flush_interval_ms: config.flush_interval_ms,
530                    max_connections: config.max_connections,
531                    connection_timeout: config.connection_timeout,
532                    enable_compression: config.enable_compression,
533                    compression_type: config.compression_type.clone(),
534                    retry_config: config.retry_config.clone(),
535                    circuit_breaker: config.circuit_breaker.clone(),
536                    security: config.security.clone(),
537                    performance: config.performance.clone(),
538                    monitoring: config.monitoring.clone(),
539                };
540
541                let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
542                producer.connect().await?;
543                BackendProducer::RabbitMQ(Box::new(producer))
544            }
545            StreamBackendType::Memory {
546                max_size: _,
547                persistence: _,
548            } => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
549        };
550
551        let stats = Arc::new(RwLock::new(ProducerStats {
552            backend_type: match backend_producer {
553                #[cfg(feature = "kafka")]
554                BackendProducer::Kafka(_) => "kafka".to_string(),
555                #[cfg(feature = "nats")]
556                BackendProducer::Nats(_) => "nats".to_string(),
557                #[cfg(feature = "redis")]
558                BackendProducer::Redis(_) => "redis".to_string(),
559                #[cfg(feature = "kinesis")]
560                BackendProducer::Kinesis(_) => "kinesis".to_string(),
561                #[cfg(feature = "pulsar")]
562                BackendProducer::Pulsar(_) => "pulsar".to_string(),
563                #[cfg(feature = "rabbitmq")]
564                BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
565                BackendProducer::Memory(_) => "memory".to_string(),
566            },
567            ..Default::default()
568        }));
569
570        info!(
571            "Created stream producer with backend: {}",
572            stats.read().await.backend_type
573        );
574
575        Ok(Self {
576            config,
577            backend_producer,
578            stats,
579            circuit_breaker,
580            last_flush: Instant::now(),
581            _pending_events: Arc::new(RwLock::new(Vec::new())),
582            batch_buffer: Arc::new(RwLock::new(Vec::new())),
583            flush_semaphore: Arc::new(Semaphore::new(1)),
584        })
585    }
586
587    /// Publish a stream event with circuit breaker protection and batching
588    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
589        let start_time = Instant::now();
590
591        // Check circuit breaker if enabled
592        if let Some(cb) = &self.circuit_breaker {
593            if !cb.can_execute().await {
594                self.stats.write().await.circuit_breaker_trips += 1;
595                return Err(anyhow!("Circuit breaker is open - cannot publish events"));
596            }
597        }
598
599        // Handle batching if enabled
600        if self.config.performance.enable_batching {
601            let mut batch_buffer = self.batch_buffer.write().await;
602            batch_buffer.push(event);
603
604            if batch_buffer.len() >= self.config.batch_size {
605                let events = std::mem::take(&mut *batch_buffer);
606                drop(batch_buffer);
607                return self.publish_batch_internal(events).await;
608            }
609
610            return Ok(());
611        }
612
613        // Publish single event
614        let result = self.publish_single_event(event).await;
615
616        // Update circuit breaker and stats
617        match &result {
618            Ok(_) => {
619                if let Some(cb) = &self.circuit_breaker {
620                    cb.record_success_with_duration(start_time.elapsed()).await;
621                }
622
623                let mut stats = self.stats.write().await;
624                stats.events_published += 1;
625                let latency = start_time.elapsed().as_millis() as u64;
626                stats.max_latency_ms = stats.max_latency_ms.max(latency);
627                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
628                stats.last_publish = Some(Utc::now());
629            }
630            Err(_) => {
631                if let Some(cb) = &self.circuit_breaker {
632                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
633                        .await;
634                }
635
636                self.stats.write().await.events_failed += 1;
637            }
638        }
639
640        result
641    }
642
643    /// Publish a single event to the backend
644    async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
645        match &mut self.backend_producer {
646            #[cfg(feature = "kafka")]
647            BackendProducer::Kafka(producer) => producer.publish(event).await,
648            #[cfg(feature = "nats")]
649            BackendProducer::Nats(producer) => producer.publish(event).await,
650            #[cfg(feature = "redis")]
651            BackendProducer::Redis(producer) => producer.publish(event).await,
652            #[cfg(feature = "kinesis")]
653            BackendProducer::Kinesis(producer) => producer.publish(event).await,
654            #[cfg(feature = "pulsar")]
655            BackendProducer::Pulsar(producer) => producer.publish(event).await,
656            #[cfg(feature = "rabbitmq")]
657            BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
658            BackendProducer::Memory(producer) => producer.publish(event).await,
659        }
660    }
661
662    /// Publish multiple events as a batch
663    pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
664        if events.is_empty() {
665            return Ok(());
666        }
667
668        self.publish_batch_internal(events).await
669    }
670
671    /// Internal batch publishing implementation
672    async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
673        let start_time = Instant::now();
674        let event_count = events.len();
675
676        let result = match &mut self.backend_producer {
677            #[cfg(feature = "kafka")]
678            BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
679            #[cfg(feature = "nats")]
680            BackendProducer::Nats(producer) => producer.publish_batch(events).await,
681            #[cfg(feature = "redis")]
682            BackendProducer::Redis(producer) => producer.publish_batch(events).await,
683            #[cfg(feature = "kinesis")]
684            BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
685            #[cfg(feature = "pulsar")]
686            BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
687            #[cfg(feature = "rabbitmq")]
688            BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
689            BackendProducer::Memory(producer) => {
690                for event in events {
691                    producer.publish(event).await?;
692                }
693                Ok(())
694            }
695        };
696
697        // Update stats
698        let mut stats = self.stats.write().await;
699        match &result {
700            Ok(_) => {
701                stats.events_published += event_count as u64;
702                stats.batch_count += 1;
703                let latency = start_time.elapsed().as_millis() as u64;
704                stats.max_latency_ms = stats.max_latency_ms.max(latency);
705                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
706                stats.last_publish = Some(Utc::now());
707            }
708            Err(_) => {
709                stats.events_failed += event_count as u64;
710            }
711        }
712
713        debug!(
714            "Published batch of {} events in {:?}",
715            event_count,
716            start_time.elapsed()
717        );
718        result
719    }
720
721    /// Flush any pending events and buffers
722    pub async fn flush(&mut self) -> Result<()> {
723        let _permit = self
724            .flush_semaphore
725            .acquire()
726            .await
727            .map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;
728
729        let start_time = Instant::now();
730
731        // Flush any pending batch buffer
732        if self.config.performance.enable_batching {
733            let events = {
734                let mut batch_buffer = self.batch_buffer.write().await;
735                if !batch_buffer.is_empty() {
736                    std::mem::take(&mut *batch_buffer)
737                } else {
738                    Vec::new()
739                }
740            };
741            if !events.is_empty() {
742                drop(_permit); // Release permit before mutable borrow
743                self.publish_batch_internal(events).await?;
744            }
745        }
746
747        // Flush backend-specific buffers
748        let result = match &mut self.backend_producer {
749            #[cfg(feature = "kafka")]
750            BackendProducer::Kafka(producer) => producer.flush().await,
751            #[cfg(feature = "nats")]
752            BackendProducer::Nats(producer) => producer.flush().await,
753            #[cfg(feature = "redis")]
754            BackendProducer::Redis(producer) => producer.flush().await,
755            #[cfg(feature = "kinesis")]
756            BackendProducer::Kinesis(producer) => producer.flush().await,
757            #[cfg(feature = "pulsar")]
758            BackendProducer::Pulsar(producer) => producer.flush().await,
759            #[cfg(feature = "rabbitmq")]
760            BackendProducer::RabbitMQ(producer) => producer.flush().await,
761            BackendProducer::Memory(producer) => producer.flush().await,
762        };
763
764        // Update stats
765        if result.is_ok() {
766            self.stats.write().await.flush_count += 1;
767            self.last_flush = Instant::now();
768            debug!("Flushed producer buffers in {:?}", start_time.elapsed());
769        }
770
771        result
772    }
773
774    /// Publish an RDF patch as a series of events
775    pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
776        let events: Vec<StreamEvent> = patch
777            .operations
778            .iter()
779            .filter_map(|op| {
780                let metadata = EventMetadata {
781                    event_id: Uuid::new_v4().to_string(),
782                    timestamp: patch.timestamp,
783                    source: "rdf_patch".to_string(),
784                    user: None,
785                    context: Some(patch.id.clone()),
786                    caused_by: None,
787                    version: "1.0".to_string(),
788                    properties: HashMap::new(),
789                    checksum: None,
790                };
791
792                match op {
793                    PatchOperation::Add {
794                        subject,
795                        predicate,
796                        object,
797                    } => Some(StreamEvent::TripleAdded {
798                        subject: subject.clone(),
799                        predicate: predicate.clone(),
800                        object: object.clone(),
801                        graph: None,
802                        metadata,
803                    }),
804                    PatchOperation::Delete {
805                        subject,
806                        predicate,
807                        object,
808                    } => Some(StreamEvent::TripleRemoved {
809                        subject: subject.clone(),
810                        predicate: predicate.clone(),
811                        object: object.clone(),
812                        graph: None,
813                        metadata,
814                    }),
815                    PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
816                        graph: graph.clone(),
817                        metadata,
818                    }),
819                    PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
820                        graph: graph.clone(),
821                        metadata,
822                    }),
823                    PatchOperation::AddPrefix { .. } => {
824                        // Skip prefix operations for now
825                        None
826                    }
827                    PatchOperation::DeletePrefix { .. } => {
828                        // Skip prefix operations for now
829                        None
830                    }
831                    PatchOperation::TransactionBegin { .. } => {
832                        Some(StreamEvent::TransactionBegin {
833                            transaction_id: patch
834                                .transaction_id
835                                .clone()
836                                .unwrap_or_else(|| Uuid::new_v4().to_string()),
837                            isolation_level: None,
838                            metadata,
839                        })
840                    }
841                    PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
842                        transaction_id: patch
843                            .transaction_id
844                            .clone()
845                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
846                        metadata,
847                    }),
848                    PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
849                        transaction_id: patch
850                            .transaction_id
851                            .clone()
852                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
853                        metadata,
854                    }),
855                    PatchOperation::Header { .. } => {
856                        // Skip header operations for now
857                        None
858                    }
859                }
860            })
861            .collect();
862
863        self.publish_batch(events).await
864    }
865
866    /// Get producer statistics
867    pub async fn get_stats(&self) -> ProducerStats {
868        self.stats.read().await.clone()
869    }
870
871    /// Get producer health status
872    pub async fn health_check(&self) -> bool {
873        if let Some(cb) = &self.circuit_breaker {
874            cb.is_healthy().await
875        } else {
876            true
877        }
878    }
879}
880
/// Enhanced stream consumer for receiving RDF changes with backend support
///
/// Wraps one backend-specific consumer and adds statistics tracking and
/// optional circuit-breaker protection around every poll.
pub struct StreamConsumer {
    /// Configuration the consumer was created with. Underscore-prefixed: it is
    /// stored at construction and not read by the methods in this file.
    _config: StreamConfig,
    /// Backend-specific consumer that `consume` delegates to.
    backend_consumer: BackendConsumer,
    /// Shared counters updated by `consume` and by the consuming loops.
    stats: Arc<RwLock<ConsumerStats>>,
    /// Circuit breaker consulted before each poll; `None` when the
    /// configuration disables it.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// Set at construction and refreshed at the end of every `consume` call
    /// that reaches the backend.
    last_poll: Instant,
    /// Event buffer, created empty at construction. Underscore-prefixed: not
    /// otherwise used by the methods in this file.
    _message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Consumer group name supplied at construction, if any.
    consumer_group: Option<String>,
}
891
/// Backend-agnostic consumer wrapper
///
/// One variant per supported backend. Every variant except `Memory` is only
/// compiled when the cargo feature of the same name is enabled, so the
/// in-memory variant is always available.
// NOTE(review): the NATS, Pulsar and RabbitMQ consumers are boxed, presumably
// to keep the enum small — confirm against the sizes of those types.
enum BackendConsumer {
    /// Apache Kafka consumer.
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaConsumer),
    /// NATS consumer (boxed).
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsConsumer>),
    /// Redis consumer.
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisConsumer),
    /// Kinesis consumer.
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisConsumer),
    /// Pulsar consumer (boxed).
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarConsumer>),
    /// RabbitMQ consumer (boxed).
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
    /// In-memory consumer for testing and development.
    Memory(MemoryConsumer),
}
908
/// Consumer statistics for monitoring
///
/// All fields are private to this module; callers obtain a cloned snapshot
/// through the consumer's `get_stats`. Underscore-prefixed fields are not
/// updated by the code in this file after construction.
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
    /// Number of events successfully returned by `consume`.
    events_consumed: u64,
    /// Number of failed polls plus callback failures in the consuming loops.
    events_failed: u64,
    /// Not updated in this file — presumably total payload bytes; TODO confirm.
    _bytes_received: u64,
    /// Processing-time average in milliseconds, updated on every consumed event.
    avg_processing_time_ms: f64,
    /// Largest single processing time observed, in milliseconds.
    max_processing_time_ms: u64,
    /// Not updated in this file — presumably the lag behind the stream head; TODO confirm.
    _consumer_lag: u64,
    /// Number of `consume` calls rejected because the circuit breaker was open.
    circuit_breaker_trips: u64,
    /// UTC time at which the most recent event was consumed, if any.
    last_message: Option<DateTime<Utc>>,
    /// Lowercase backend name, e.g. "memory" or "kafka".
    backend_type: String,
    /// Batch size copied from the stream configuration at construction.
    _batch_size: usize,
}
923
/// Memory-based consumer for testing and development
///
/// Reads events one at a time from a `StreamBackend`, tracking its own read
/// position and statistics.
struct MemoryConsumer {
    /// Backend the events are read from (an in-memory backend when built by
    /// the constructors in this file).
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Name of the topic to read from.
    topic: String,
    /// Offset of the next event to request; starts at 0.
    current_offset: u64,
    /// Statistics local to this consumer (separate from the `StreamConsumer` ones).
    stats: ConsumerStats,
}
931
932impl MemoryConsumer {
933    fn _new() -> Self {
934        Self {
935            backend: Box::new(backend::memory::MemoryBackend::new()),
936            topic: "oxirs-stream".to_string(),
937            current_offset: 0,
938            stats: ConsumerStats {
939                backend_type: "memory".to_string(),
940                ..Default::default()
941            },
942        }
943    }
944
945    fn with_topic(topic: String) -> Self {
946        Self {
947            backend: Box::new(backend::memory::MemoryBackend::new()),
948            topic,
949            current_offset: 0,
950            stats: ConsumerStats {
951                backend_type: "memory".to_string(),
952                ..Default::default()
953            },
954        }
955    }
956
957    async fn consume(&mut self) -> Result<Option<StreamEvent>> {
958        let start_time = Instant::now();
959
960        // Use the backend implementation
961        self.backend
962            .as_mut()
963            .connect()
964            .await
965            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
966
967        // Try to receive events from the backend
968        let topic_name = crate::types::TopicName::new(self.topic.clone());
969        let events = self
970            .backend
971            .receive_events(
972                &topic_name,
973                None, // No consumer group for now
974                crate::types::StreamPosition::Offset(self.current_offset),
975                1, // Get one event at a time
976            )
977            .await
978            .map_err(|e| anyhow!("Receive events failed: {}", e))?;
979
980        if let Some((event, offset)) = events.first() {
981            self.current_offset = offset.value() + 1;
982
983            // Update stats
984            self.stats.events_consumed += 1;
985            let processing_time = start_time.elapsed().as_millis() as u64;
986            self.stats.max_processing_time_ms =
987                self.stats.max_processing_time_ms.max(processing_time);
988            self.stats.avg_processing_time_ms =
989                (self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
990            self.stats.last_message = Some(Utc::now());
991
992            debug!("Memory consumer: consumed event via backend");
993            Ok(Some(event.clone()))
994        } else {
995            debug!("Memory consumer: no events available");
996            Ok(None)
997        }
998    }
999
1000    fn _get_stats(&self) -> &ConsumerStats {
1001        &self.stats
1002    }
1003
1004    /// Reset consumer position for testing
1005    fn reset(&mut self) {
1006        self.current_offset = 0;
1007    }
1008}
1009
1010impl StreamConsumer {
1011    /// Create a new enhanced stream consumer with backend support
1012    pub async fn new(config: StreamConfig) -> Result<Self> {
1013        Self::new_with_group(config, None).await
1014    }
1015
1016    /// Create a new stream consumer with a specific consumer group
1017    pub async fn new_with_group(
1018        config: StreamConfig,
1019        consumer_group: Option<String>,
1020    ) -> Result<Self> {
1021        // Initialize circuit breaker if enabled
1022        let circuit_breaker = if config.circuit_breaker.enabled {
1023            Some(circuit_breaker::new_shared_circuit_breaker(
1024                circuit_breaker::CircuitBreakerConfig {
1025                    enabled: config.circuit_breaker.enabled,
1026                    failure_threshold: config.circuit_breaker.failure_threshold,
1027                    success_threshold: config.circuit_breaker.success_threshold,
1028                    timeout: config.circuit_breaker.timeout,
1029                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
1030                    ..Default::default()
1031                },
1032            ))
1033        } else {
1034            None
1035        };
1036
1037        // Initialize backend-specific consumer
1038        let backend_consumer = match &config.backend {
1039            #[cfg(feature = "kafka")]
1040            StreamBackendType::Kafka {
1041                brokers,
1042                security_protocol,
1043                sasl_config,
1044            } => {
1045                let stream_config = crate::StreamConfig {
1046                    backend: crate::StreamBackendType::Kafka {
1047                        brokers: brokers.clone(),
1048                        security_protocol: security_protocol.clone(),
1049                        sasl_config: sasl_config.clone(),
1050                    },
1051                    topic: config.topic.clone(),
1052                    batch_size: config.batch_size,
1053                    flush_interval_ms: config.flush_interval_ms,
1054                    max_connections: config.max_connections,
1055                    connection_timeout: config.connection_timeout,
1056                    enable_compression: config.enable_compression,
1057                    compression_type: config.compression_type.clone(),
1058                    retry_config: config.retry_config.clone(),
1059                    circuit_breaker: config.circuit_breaker.clone(),
1060                    security: config.security.clone(),
1061                    performance: config.performance.clone(),
1062                    monitoring: config.monitoring.clone(),
1063                };
1064
1065                let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
1066                consumer.connect().await?;
1067                BackendConsumer::Kafka(consumer)
1068            }
1069            #[cfg(feature = "nats")]
1070            StreamBackendType::Nats {
1071                url,
1072                cluster_urls,
1073                jetstream_config,
1074            } => {
1075                let stream_config = crate::StreamConfig {
1076                    backend: crate::StreamBackendType::Nats {
1077                        url: url.clone(),
1078                        cluster_urls: cluster_urls.clone(),
1079                        jetstream_config: jetstream_config.clone(),
1080                    },
1081                    topic: config.topic.clone(),
1082                    batch_size: config.batch_size,
1083                    flush_interval_ms: config.flush_interval_ms,
1084                    max_connections: config.max_connections,
1085                    connection_timeout: config.connection_timeout,
1086                    enable_compression: config.enable_compression,
1087                    compression_type: config.compression_type.clone(),
1088                    retry_config: config.retry_config.clone(),
1089                    circuit_breaker: config.circuit_breaker.clone(),
1090                    security: config.security.clone(),
1091                    performance: config.performance.clone(),
1092                    monitoring: config.monitoring.clone(),
1093                };
1094
1095                let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
1096                consumer.connect().await?;
1097                BackendConsumer::Nats(Box::new(consumer))
1098            }
1099            #[cfg(feature = "redis")]
1100            StreamBackendType::Redis {
1101                url,
1102                cluster_urls,
1103                pool_size,
1104            } => {
1105                let stream_config = crate::StreamConfig {
1106                    backend: crate::StreamBackendType::Redis {
1107                        url: url.clone(),
1108                        cluster_urls: cluster_urls.clone(),
1109                        pool_size: *pool_size,
1110                    },
1111                    topic: config.topic.clone(),
1112                    batch_size: config.batch_size,
1113                    flush_interval_ms: config.flush_interval_ms,
1114                    max_connections: config.max_connections,
1115                    connection_timeout: config.connection_timeout,
1116                    enable_compression: config.enable_compression,
1117                    compression_type: config.compression_type.clone(),
1118                    retry_config: config.retry_config.clone(),
1119                    circuit_breaker: config.circuit_breaker.clone(),
1120                    security: config.security.clone(),
1121                    performance: config.performance.clone(),
1122                    monitoring: config.monitoring.clone(),
1123                };
1124
1125                let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
1126                consumer.connect().await?;
1127                BackendConsumer::Redis(consumer)
1128            }
1129            #[cfg(feature = "kinesis")]
1130            StreamBackendType::Kinesis {
1131                region,
1132                stream_name,
1133                credentials,
1134            } => {
1135                let stream_config = crate::StreamConfig {
1136                    backend: crate::StreamBackendType::Kinesis {
1137                        region: region.clone(),
1138                        stream_name: stream_name.clone(),
1139                        credentials: credentials.clone(),
1140                    },
1141                    topic: config.topic.clone(),
1142                    batch_size: config.batch_size,
1143                    flush_interval_ms: config.flush_interval_ms,
1144                    max_connections: config.max_connections,
1145                    connection_timeout: config.connection_timeout,
1146                    enable_compression: config.enable_compression,
1147                    compression_type: config.compression_type.clone(),
1148                    retry_config: config.retry_config.clone(),
1149                    circuit_breaker: config.circuit_breaker.clone(),
1150                    security: config.security.clone(),
1151                    performance: config.performance.clone(),
1152                    monitoring: config.monitoring.clone(),
1153                };
1154
1155                let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
1156                consumer.connect().await?;
1157                BackendConsumer::Kinesis(consumer)
1158            }
1159            #[cfg(feature = "pulsar")]
1160            StreamBackendType::Pulsar {
1161                service_url,
1162                auth_config,
1163            } => {
1164                let stream_config = crate::StreamConfig {
1165                    backend: crate::StreamBackendType::Pulsar {
1166                        service_url: service_url.clone(),
1167                        auth_config: auth_config.clone(),
1168                    },
1169                    topic: config.topic.clone(),
1170                    batch_size: config.batch_size,
1171                    flush_interval_ms: config.flush_interval_ms,
1172                    max_connections: config.max_connections,
1173                    connection_timeout: config.connection_timeout,
1174                    enable_compression: config.enable_compression,
1175                    compression_type: config.compression_type.clone(),
1176                    retry_config: config.retry_config.clone(),
1177                    circuit_breaker: config.circuit_breaker.clone(),
1178                    security: config.security.clone(),
1179                    performance: config.performance.clone(),
1180                    monitoring: config.monitoring.clone(),
1181                };
1182
1183                let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
1184                consumer.connect().await?;
1185                BackendConsumer::Pulsar(Box::new(consumer))
1186            }
1187            #[cfg(feature = "rabbitmq")]
1188            StreamBackendType::RabbitMQ {
1189                url,
1190                exchange,
1191                queue,
1192            } => {
1193                let stream_config = crate::StreamConfig {
1194                    backend: crate::StreamBackendType::RabbitMQ {
1195                        url: url.clone(),
1196                        exchange: exchange.clone(),
1197                        queue: queue.clone(),
1198                    },
1199                    topic: config.topic.clone(),
1200                    batch_size: config.batch_size,
1201                    flush_interval_ms: config.flush_interval_ms,
1202                    max_connections: config.max_connections,
1203                    connection_timeout: config.connection_timeout,
1204                    enable_compression: config.enable_compression,
1205                    compression_type: config.compression_type.clone(),
1206                    retry_config: config.retry_config.clone(),
1207                    circuit_breaker: config.circuit_breaker.clone(),
1208                    security: config.security.clone(),
1209                    performance: config.performance.clone(),
1210                    monitoring: config.monitoring.clone(),
1211                };
1212
1213                let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
1214                consumer.connect().await?;
1215                BackendConsumer::RabbitMQ(Box::new(consumer))
1216            }
1217            StreamBackendType::Memory {
1218                max_size: _,
1219                persistence: _,
1220            } => BackendConsumer::Memory(MemoryConsumer::with_topic(config.topic.clone())),
1221        };
1222
1223        let stats = Arc::new(RwLock::new(ConsumerStats {
1224            backend_type: match backend_consumer {
1225                #[cfg(feature = "kafka")]
1226                BackendConsumer::Kafka(_) => "kafka".to_string(),
1227                #[cfg(feature = "nats")]
1228                BackendConsumer::Nats(_) => "nats".to_string(),
1229                #[cfg(feature = "redis")]
1230                BackendConsumer::Redis(_) => "redis".to_string(),
1231                #[cfg(feature = "kinesis")]
1232                BackendConsumer::Kinesis(_) => "kinesis".to_string(),
1233                #[cfg(feature = "pulsar")]
1234                BackendConsumer::Pulsar(_) => "pulsar".to_string(),
1235                #[cfg(feature = "rabbitmq")]
1236                BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
1237                BackendConsumer::Memory(_) => "memory".to_string(),
1238            },
1239            _batch_size: config.batch_size,
1240            ..Default::default()
1241        }));
1242
1243        info!(
1244            "Created stream consumer with backend: {} and group: {:?}",
1245            stats.read().await.backend_type,
1246            consumer_group
1247        );
1248
1249        Ok(Self {
1250            _config: config,
1251            backend_consumer,
1252            stats,
1253            circuit_breaker,
1254            last_poll: Instant::now(),
1255            _message_buffer: Arc::new(RwLock::new(Vec::new())),
1256            consumer_group,
1257        })
1258    }
1259
1260    /// Consume stream events with circuit breaker protection
1261    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1262        let start_time = Instant::now();
1263
1264        // Check circuit breaker if enabled
1265        if let Some(cb) = &self.circuit_breaker {
1266            if !cb.can_execute().await {
1267                self.stats.write().await.circuit_breaker_trips += 1;
1268                return Err(anyhow!("Circuit breaker is open - cannot consume events"));
1269            }
1270        }
1271
1272        // Consume from backend
1273        let result = self.consume_single_event().await;
1274
1275        // Update circuit breaker and stats
1276        match &result {
1277            Ok(Some(_)) => {
1278                if let Some(cb) = &self.circuit_breaker {
1279                    cb.record_success_with_duration(start_time.elapsed()).await;
1280                }
1281
1282                let mut stats = self.stats.write().await;
1283                stats.events_consumed += 1;
1284                let processing_time = start_time.elapsed().as_millis() as u64;
1285                stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
1286                stats.avg_processing_time_ms =
1287                    (stats.avg_processing_time_ms + processing_time as f64) / 2.0;
1288                stats.last_message = Some(Utc::now());
1289            }
1290            Ok(None) => {
1291                // No message available, not an error
1292                if let Some(cb) = &self.circuit_breaker {
1293                    cb.record_success_with_duration(start_time.elapsed()).await;
1294                }
1295            }
1296            Err(_) => {
1297                if let Some(cb) = &self.circuit_breaker {
1298                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
1299                        .await;
1300                }
1301
1302                self.stats.write().await.events_failed += 1;
1303            }
1304        }
1305
1306        self.last_poll = Instant::now();
1307        result
1308    }
1309
1310    /// Consume a single event from the backend
1311    async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
1312        match &mut self.backend_consumer {
1313            #[cfg(feature = "kafka")]
1314            BackendConsumer::Kafka(consumer) => consumer.consume().await,
1315            #[cfg(feature = "nats")]
1316            BackendConsumer::Nats(consumer) => consumer.consume().await,
1317            #[cfg(feature = "redis")]
1318            BackendConsumer::Redis(consumer) => consumer.consume().await,
1319            #[cfg(feature = "kinesis")]
1320            BackendConsumer::Kinesis(consumer) => consumer.consume().await,
1321            #[cfg(feature = "pulsar")]
1322            BackendConsumer::Pulsar(consumer) => consumer.consume().await,
1323            #[cfg(feature = "rabbitmq")]
1324            BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
1325            BackendConsumer::Memory(consumer) => consumer.consume().await,
1326        }
1327    }
1328
1329    /// Consume multiple events as a batch
1330    pub async fn consume_batch(
1331        &mut self,
1332        max_events: usize,
1333        timeout: Duration,
1334    ) -> Result<Vec<StreamEvent>> {
1335        let mut events = Vec::new();
1336        let start_time = Instant::now();
1337
1338        while events.len() < max_events && start_time.elapsed() < timeout {
1339            match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
1340                Ok(Ok(Some(event))) => events.push(event),
1341                Ok(Ok(None)) => continue,
1342                Ok(Err(e)) => return Err(e),
1343                Err(_) => break, // Timeout
1344            }
1345        }
1346
1347        if !events.is_empty() {
1348            debug!(
1349                "Consumed batch of {} events in {:?}",
1350                events.len(),
1351                start_time.elapsed()
1352            );
1353        }
1354
1355        Ok(events)
1356    }
1357
1358    /// Start consuming events with a callback function
1359    pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
1360    where
1361        F: FnMut(StreamEvent) -> Result<()> + Send,
1362    {
1363        info!("Starting stream consumer loop");
1364
1365        loop {
1366            match self.consume().await {
1367                Ok(Some(event)) => {
1368                    if let Err(e) = callback(event) {
1369                        error!("Callback error: {}", e);
1370                        self.stats.write().await.events_failed += 1;
1371                    }
1372                }
1373                Ok(None) => {
1374                    // No message, wait a bit
1375                    tokio::time::sleep(Duration::from_millis(10)).await;
1376                }
1377                Err(e) => {
1378                    error!("Consumer error: {}", e);
1379                    tokio::time::sleep(Duration::from_millis(100)).await;
1380                }
1381            }
1382        }
1383    }
1384
1385    /// Start consuming events with an async callback function
1386    pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
1387    where
1388        F: FnMut(StreamEvent) -> Fut + Send,
1389        Fut: std::future::Future<Output = Result<()>> + Send,
1390    {
1391        info!("Starting async stream consumer loop");
1392
1393        loop {
1394            match self.consume().await {
1395                Ok(Some(event)) => {
1396                    if let Err(e) = callback(event).await {
1397                        error!("Async callback error: {}", e);
1398                        self.stats.write().await.events_failed += 1;
1399                    }
1400                }
1401                Ok(None) => {
1402                    // No message, wait a bit
1403                    tokio::time::sleep(Duration::from_millis(10)).await;
1404                }
1405                Err(e) => {
1406                    error!("Consumer error: {}", e);
1407                    tokio::time::sleep(Duration::from_millis(100)).await;
1408                }
1409            }
1410        }
1411    }
1412
1413    /// Get consumer statistics
1414    pub async fn get_stats(&self) -> ConsumerStats {
1415        self.stats.read().await.clone()
1416    }
1417
1418    /// Get consumer health status
1419    pub async fn health_check(&self) -> bool {
1420        if let Some(cb) = &self.circuit_breaker {
1421            cb.is_healthy().await
1422        } else {
1423            true
1424        }
1425    }
1426
1427    /// Get the consumer group name if any
1428    pub fn consumer_group(&self) -> Option<&String> {
1429        self.consumer_group.as_ref()
1430    }
1431
1432    /// Reset consumer position (for testing with memory backend)
1433    pub async fn reset_position(&mut self) -> Result<()> {
1434        match &mut self.backend_consumer {
1435            BackendConsumer::Memory(consumer) => {
1436                consumer.reset();
1437                Ok(())
1438            }
1439            #[cfg(feature = "kafka")]
1440            BackendConsumer::Kafka(_) => {
1441                Err(anyhow!("Reset position not supported for Kafka backend"))
1442            }
1443            #[cfg(feature = "nats")]
1444            BackendConsumer::Nats(_) => {
1445                Err(anyhow!("Reset position not supported for NATS backend"))
1446            }
1447            #[cfg(feature = "redis")]
1448            BackendConsumer::Redis(_) => {
1449                Err(anyhow!("Reset position not supported for Redis backend"))
1450            }
1451            #[cfg(feature = "kinesis")]
1452            BackendConsumer::Kinesis(_) => {
1453                Err(anyhow!("Reset position not supported for Kinesis backend"))
1454            }
1455            #[cfg(feature = "pulsar")]
1456            BackendConsumer::Pulsar(_) => {
1457                Err(anyhow!("Reset position not supported for Pulsar backend"))
1458            }
1459            #[cfg(feature = "rabbitmq")]
1460            BackendConsumer::RabbitMQ(_) => {
1461                Err(anyhow!("Reset position not supported for RabbitMQ backend"))
1462            }
1463        }
1464    }
1465
1466    /// Set test events for memory backend (for testing) - deprecated with backend implementation
1467    pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
1468        // This method is deprecated with the new backend implementation
1469        // Events should be published through the producer instead
1470        Err(anyhow!("set_test_events is deprecated with backend implementation - use producer to publish events"))
1471    }
1472}
1473
/// RDF patch operations with full protocol support
///
/// One variant per operation kind; the short code in parentheses in each
/// variant's description is the operation code it stands for.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
    /// Add a triple (A operation)
    Add {
        /// Subject term of the triple, as a string.
        subject: String,
        /// Predicate term of the triple, as a string.
        predicate: String,
        /// Object term of the triple, as a string.
        object: String,
    },
    /// Delete a triple (D operation)
    Delete {
        /// Subject term of the triple, as a string.
        subject: String,
        /// Predicate term of the triple, as a string.
        predicate: String,
        /// Object term of the triple, as a string.
        object: String,
    },
    /// Add a graph (GA operation)
    AddGraph { graph: String },
    /// Delete a graph (GD operation)
    DeleteGraph { graph: String },
    /// Add a prefix (PA operation): binds `prefix` to `namespace`
    AddPrefix { prefix: String, namespace: String },
    /// Delete a prefix (PD operation)
    DeletePrefix { prefix: String },
    /// Transaction begin (TX operation), optionally carrying its own transaction id
    TransactionBegin { transaction_id: Option<String> },
    /// Transaction commit (TC operation)
    TransactionCommit,
    /// Transaction abort (TA operation)
    TransactionAbort,
    /// Header information (H operation): a single key/value pair
    Header { key: String, value: String },
}
1506
/// RDF patch for atomic updates with full protocol support
///
/// A patch is an ordered list of operations plus identifying metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
    /// Operations in the order they were added.
    pub operations: Vec<PatchOperation>,
    /// Timestamp of the patch; `new` sets it to the creation time (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Patch identifier; `new` sets it to a random UUID v4 string.
    pub id: String,
    /// Patch headers for metadata
    pub headers: HashMap<String, String>,
    /// Current transaction ID if in transaction
    pub transaction_id: Option<String>,
    /// Prefixes used in the patch
    // NOTE(review): presumably keyed by prefix label with the namespace as
    // value — confirm against the patch parser.
    pub prefixes: HashMap<String, String>,
}
1520
1521impl RdfPatch {
1522    /// Create a new RDF patch
1523    pub fn new() -> Self {
1524        Self {
1525            operations: Vec::new(),
1526            timestamp: chrono::Utc::now(),
1527            id: uuid::Uuid::new_v4().to_string(),
1528            headers: HashMap::new(),
1529            transaction_id: None,
1530            prefixes: HashMap::new(),
1531        }
1532    }
1533
1534    /// Add an operation to the patch
1535    pub fn add_operation(&mut self, operation: PatchOperation) {
1536        self.operations.push(operation);
1537    }
1538
1539    /// Serialize patch to RDF Patch format
1540    pub fn to_rdf_patch_format(&self) -> Result<String> {
1541        let serializer = crate::patch::PatchSerializer::new()
1542            .with_pretty_print(true)
1543            .with_metadata(true);
1544        serializer.serialize(self)
1545    }
1546
1547    /// Parse from RDF Patch format
1548    pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
1549        let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
1550        parser.parse(input)
1551    }
1552}
1553
1554impl Default for RdfPatch {
1555    fn default() -> Self {
1556        Self::new()
1557    }
1558}
1559
1560// Default implementations for easier configuration
1561impl Default for StreamConfig {
1562    fn default() -> Self {
1563        Self {
1564            backend: StreamBackendType::Memory {
1565                max_size: Some(10000),
1566                persistence: false,
1567            },
1568            topic: "oxirs-stream".to_string(),
1569            batch_size: 100,
1570            flush_interval_ms: 100,
1571            max_connections: 10,
1572            connection_timeout: Duration::from_secs(30),
1573            enable_compression: false,
1574            compression_type: CompressionType::None,
1575            retry_config: RetryConfig::default(),
1576            circuit_breaker: CircuitBreakerConfig::default(),
1577            security: SecurityConfig::default(),
1578            performance: StreamPerformanceConfig::default(),
1579            monitoring: MonitoringConfig::default(),
1580        }
1581    }
1582}
1583
1584impl Default for RetryConfig {
1585    fn default() -> Self {
1586        Self {
1587            max_retries: 3,
1588            initial_backoff: Duration::from_millis(100),
1589            max_backoff: Duration::from_secs(30),
1590            backoff_multiplier: 2.0,
1591            jitter: true,
1592        }
1593    }
1594}
1595
1596impl Default for CircuitBreakerConfig {
1597    fn default() -> Self {
1598        Self {
1599            enabled: true,
1600            failure_threshold: 5,
1601            success_threshold: 3,
1602            timeout: Duration::from_secs(30),
1603            half_open_max_calls: 3,
1604        }
1605    }
1606}
1607
1608impl Default for SecurityConfig {
1609    fn default() -> Self {
1610        Self {
1611            enable_tls: false,
1612            verify_certificates: true,
1613            client_cert_path: None,
1614            client_key_path: None,
1615            ca_cert_path: None,
1616            sasl_config: None,
1617        }
1618    }
1619}
1620
1621impl Default for StreamPerformanceConfig {
1622    fn default() -> Self {
1623        Self {
1624            enable_batching: true,
1625            enable_pipelining: false,
1626            buffer_size: 8192,
1627            prefetch_count: 100,
1628            enable_zero_copy: false,
1629            enable_simd: false,
1630            parallel_processing: true,
1631            worker_threads: None,
1632        }
1633    }
1634}
1635
1636impl Default for MonitoringConfig {
1637    fn default() -> Self {
1638        Self {
1639            enable_metrics: true,
1640            enable_tracing: true,
1641            metrics_interval: Duration::from_secs(60),
1642            health_check_interval: Duration::from_secs(30),
1643            enable_profiling: false,
1644            prometheus_endpoint: None,
1645            jaeger_endpoint: None,
1646            log_level: "info".to_string(),
1647        }
1648    }
1649}
1650
1651/// Helper functions for creating common configurations
1652impl StreamConfig {
1653    /// Create a Redis configuration
1654    #[cfg(feature = "redis")]
1655    pub fn redis(url: String) -> Self {
1656        Self {
1657            backend: StreamBackendType::Redis {
1658                url,
1659                cluster_urls: None,
1660                pool_size: Some(10),
1661            },
1662            ..Default::default()
1663        }
1664    }
1665
1666    /// Create a Kinesis configuration
1667    #[cfg(feature = "kinesis")]
1668    pub fn kinesis(region: String, stream_name: String) -> Self {
1669        Self {
1670            backend: StreamBackendType::Kinesis {
1671                region,
1672                stream_name,
1673                credentials: None,
1674            },
1675            ..Default::default()
1676        }
1677    }
1678
1679    /// Create a memory configuration for testing
1680    pub fn memory() -> Self {
1681        Self {
1682            backend: StreamBackendType::Memory {
1683                max_size: Some(1000),
1684                persistence: false,
1685            },
1686            ..Default::default()
1687        }
1688    }
1689
1690    /// Enable high-performance configuration
1691    pub fn high_performance(mut self) -> Self {
1692        self.performance.enable_batching = true;
1693        self.performance.enable_pipelining = true;
1694        self.performance.parallel_processing = true;
1695        self.performance.buffer_size = 65536;
1696        self.performance.prefetch_count = 1000;
1697        self.batch_size = 1000;
1698        self.flush_interval_ms = 10;
1699        self
1700    }
1701
1702    /// Enable compression
1703    pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
1704        self.enable_compression = true;
1705        self.compression_type = compression_type;
1706        self
1707    }
1708
1709    /// Configure circuit breaker
1710    pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
1711        self.circuit_breaker.enabled = enabled;
1712        self.circuit_breaker.failure_threshold = failure_threshold;
1713        self
1714    }
1715
1716    /// Create a development configuration with memory backend and debug settings
1717    pub fn development(topic: &str) -> Self {
1718        Self {
1719            backend: StreamBackendType::Memory {
1720                max_size: Some(10000),
1721                persistence: false,
1722            },
1723            topic: topic.to_string(),
1724            batch_size: 10,
1725            flush_interval_ms: 100,
1726            max_connections: 5,
1727            connection_timeout: Duration::from_secs(10),
1728            enable_compression: false,
1729            compression_type: CompressionType::None,
1730            retry_config: RetryConfig {
1731                max_retries: 3,
1732                initial_backoff: Duration::from_millis(100),
1733                max_backoff: Duration::from_secs(5),
1734                backoff_multiplier: 2.0,
1735                jitter: true,
1736            },
1737            circuit_breaker: CircuitBreakerConfig {
1738                enabled: false,
1739                failure_threshold: 5,
1740                success_threshold: 2,
1741                timeout: Duration::from_secs(60),
1742                half_open_max_calls: 10,
1743            },
1744            security: SecurityConfig::default(),
1745            performance: StreamPerformanceConfig::default(),
1746            monitoring: MonitoringConfig {
1747                enable_metrics: true,
1748                enable_tracing: false,
1749                metrics_interval: Duration::from_secs(5),
1750                health_check_interval: Duration::from_secs(30),
1751                enable_profiling: false,
1752                prometheus_endpoint: None,
1753                jaeger_endpoint: None,
1754                log_level: "debug".to_string(),
1755            },
1756        }
1757    }
1758
1759    /// Create a production configuration with optimal performance settings
1760    pub fn production(topic: &str) -> Self {
1761        Self {
1762            backend: StreamBackendType::Memory {
1763                max_size: Some(100000),
1764                persistence: true,
1765            },
1766            topic: topic.to_string(),
1767            batch_size: 1000,
1768            flush_interval_ms: 10,
1769            max_connections: 50,
1770            connection_timeout: Duration::from_secs(30),
1771            enable_compression: true,
1772            compression_type: CompressionType::Zstd,
1773            retry_config: RetryConfig {
1774                max_retries: 5,
1775                initial_backoff: Duration::from_millis(200),
1776                max_backoff: Duration::from_secs(30),
1777                backoff_multiplier: 2.0,
1778                jitter: true,
1779            },
1780            circuit_breaker: CircuitBreakerConfig {
1781                enabled: true,
1782                failure_threshold: 10,
1783                success_threshold: 3,
1784                timeout: Duration::from_secs(30),
1785                half_open_max_calls: 5,
1786            },
1787            security: SecurityConfig::default(),
1788            performance: StreamPerformanceConfig {
1789                enable_batching: true,
1790                enable_pipelining: true,
1791                parallel_processing: true,
1792                buffer_size: 65536,
1793                prefetch_count: 1000,
1794                enable_zero_copy: true,
1795                enable_simd: true,
1796                worker_threads: None,
1797            },
1798            monitoring: MonitoringConfig {
1799                enable_metrics: true,
1800                enable_tracing: true,
1801                metrics_interval: Duration::from_secs(1),
1802                health_check_interval: Duration::from_secs(10),
1803                enable_profiling: true,
1804                prometheus_endpoint: None,
1805                jaeger_endpoint: None,
1806                log_level: "info".to_string(),
1807            },
1808        }
1809    }
1810}
1811
/// Unified Stream interface that combines producer and consumer functionality
pub struct Stream {
    /// Producer half; `publish`, `flush`, `close` and `producer_stats` delegate to it
    producer: StreamProducer,
    /// Consumer half; `consume` and `consumer_stats` delegate to it
    consumer: StreamConsumer,
}
1817
1818impl Stream {
1819    /// Create a new unified stream instance
1820    pub async fn new(config: StreamConfig) -> Result<Self> {
1821        // Note: Commented out automatic clearing for performance tests
1822        // Clear memory events if using memory backend (important for testing)
1823        // if matches!(config.backend, StreamBackendType::Memory { .. }) {
1824        //     clear_memory_events().await;
1825        // }
1826
1827        let producer = StreamProducer::new(config.clone()).await?;
1828        let consumer = StreamConsumer::new(config).await?;
1829
1830        Ok(Self { producer, consumer })
1831    }
1832
1833    /// Publish an event to the stream
1834    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
1835        self.producer.publish(event).await
1836    }
1837
1838    /// Consume an event from the stream
1839    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1840        self.consumer.consume().await
1841    }
1842
1843    /// Flush any pending events
1844    pub async fn flush(&mut self) -> Result<()> {
1845        self.producer.flush().await
1846    }
1847
1848    /// Get producer statistics
1849    pub async fn producer_stats(&self) -> ProducerStats {
1850        self.producer.get_stats().await
1851    }
1852
1853    /// Get consumer statistics  
1854    pub async fn consumer_stats(&self) -> ConsumerStats {
1855        self.consumer.get_stats().await
1856    }
1857
1858    /// Close the stream and clean up resources
1859    pub async fn close(&mut self) -> Result<()> {
1860        // Flush any pending events
1861        self.producer.flush().await?;
1862
1863        // Close backend connections
1864        // Producer and consumer close operations would be handled by their Drop implementations
1865        debug!("Stream closed successfully");
1866        Ok(())
1867    }
1868
1869    /// Perform a health check on the stream
1870    pub async fn health_check(&self) -> Result<bool> {
1871        // Basic health check - verify that the stream is operational
1872        // This is a simplified implementation
1873        Ok(true)
1874    }
1875
1876    /// Begin a transaction (placeholder implementation)
1877    pub async fn begin_transaction(&mut self) -> Result<()> {
1878        // Placeholder implementation for transaction support
1879        debug!("Transaction begun (placeholder)");
1880        Ok(())
1881    }
1882
1883    /// Commit a transaction (placeholder implementation)
1884    pub async fn commit_transaction(&mut self) -> Result<()> {
1885        // Placeholder implementation for transaction support
1886        debug!("Transaction committed (placeholder)");
1887        Ok(())
1888    }
1889
1890    /// Rollback a transaction (placeholder implementation)
1891    pub async fn rollback_transaction(&mut self) -> Result<()> {
1892        // Placeholder implementation for transaction support
1893        debug!("Transaction rolled back (placeholder)");
1894        Ok(())
1895    }
1896}