1use anyhow::{anyhow, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::{RwLock, Semaphore};
10use tracing::{debug, error, info};
11use uuid::Uuid;
12
13use crate::backend::{self, StreamBackend};
14use crate::circuit_breaker::{self, SharedCircuitBreakerExt};
15use crate::event::{EventMetadata, StreamEvent};
16
/// Top-level configuration shared by `StreamProducer` and `StreamConsumer`.
///
/// Selects the backend and bundles batching, connection, compression, retry,
/// circuit-breaker, security, performance, and monitoring settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Which streaming backend to use (feature-gated variants, or `Memory`).
    pub backend: StreamBackendType,
    /// Topic / stream / queue name events are published to and consumed from.
    pub topic: String,
    /// Number of buffered events that triggers a batch send (see `StreamProducer::publish`).
    pub batch_size: usize,
    /// Flush interval in milliseconds (not consulted in this file — presumably
    /// used by a background flusher elsewhere; TODO confirm).
    pub flush_interval_ms: u64,
    /// Maximum number of backend connections.
    pub max_connections: usize,
    /// Timeout for establishing backend connections.
    pub connection_timeout: Duration,
    /// Whether payload compression is enabled.
    pub enable_compression: bool,
    /// Algorithm used when `enable_compression` is true.
    pub compression_type: CompressionType,
    /// Retry/backoff behaviour for failed operations.
    pub retry_config: RetryConfig,
    /// Circuit-breaker settings guarding backend calls.
    pub circuit_breaker: CircuitBreakerConfig,
    /// TLS / SASL security settings.
    pub security: SecurityConfig,
    /// Performance tuning knobs (batching, buffers, pipelining, ...).
    pub performance: StreamPerformanceConfig,
    /// Metrics / tracing / health-check settings.
    pub monitoring: MonitoringConfig,
}
43
/// Supported payload compression algorithms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
    /// No compression.
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
53
/// Retry/backoff policy for failed backend operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub initial_backoff: Duration,
    /// Upper bound on the backoff delay.
    pub max_backoff: Duration,
    /// Multiplier applied to the delay after each failed attempt.
    pub backoff_multiplier: f64,
    /// Whether random jitter is added to backoff delays.
    pub jitter: bool,
}
63
/// Circuit-breaker settings; mirrored into `circuit_breaker::CircuitBreakerConfig`
/// when a producer/consumer is constructed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Master switch; when false no circuit breaker is created.
    pub enabled: bool,
    /// Consecutive failures before the breaker opens.
    pub failure_threshold: u32,
    /// Successes required to close the breaker again.
    pub success_threshold: u32,
    /// How long the breaker stays open before probing.
    pub timeout: Duration,
    /// Max calls allowed through while half-open.
    pub half_open_max_calls: u32,
}
73
/// Transport-security settings for backend connections.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Whether TLS is used for backend connections.
    pub enable_tls: bool,
    /// Whether server certificates are verified (disable only for testing).
    pub verify_certificates: bool,
    /// Path to the client certificate (mutual TLS).
    pub client_cert_path: Option<String>,
    /// Path to the client private key (mutual TLS).
    pub client_key_path: Option<String>,
    /// Path to the CA certificate bundle.
    pub ca_cert_path: Option<String>,
    /// Optional SASL authentication settings.
    pub sasl_config: Option<SaslConfig>,
}
84
/// SASL credentials for backends that support it.
///
/// NOTE(review): the password is held as a plain `String` and the struct
/// derives `Serialize` — serialized configs will contain the credential in
/// clear text. Consider a secret type or redaction if configs are persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaslConfig {
    /// SASL mechanism to negotiate.
    pub mechanism: SaslMechanism,
    pub username: String,
    pub password: String,
}
92
/// SASL authentication mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SaslMechanism {
    /// PLAIN (username/password in clear over the transport).
    Plain,
    ScramSha256,
    ScramSha512,
    OAuthBearer,
}
101
/// Performance tuning knobs.
///
/// Only `enable_batching` is consulted in this file (see `StreamProducer`);
/// the remaining flags are presumably read by the backend implementations —
/// TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamPerformanceConfig {
    /// Buffer events and send them in batches of `StreamConfig::batch_size`.
    pub enable_batching: bool,
    pub enable_pipelining: bool,
    pub buffer_size: usize,
    pub prefetch_count: u32,
    pub enable_zero_copy: bool,
    pub enable_simd: bool,
    pub parallel_processing: bool,
    /// Worker thread count; `None` means let the backend/runtime decide.
    pub worker_threads: Option<usize>,
}
114
/// Observability settings (metrics, tracing, health checks).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    pub enable_metrics: bool,
    pub enable_tracing: bool,
    /// How often metrics are collected/exported.
    pub metrics_interval: Duration,
    /// How often backend health is probed.
    pub health_check_interval: Duration,
    pub enable_profiling: bool,
    /// Optional Prometheus scrape/push endpoint.
    pub prometheus_endpoint: Option<String>,
    /// Optional Jaeger collector endpoint.
    pub jaeger_endpoint: Option<String>,
    /// Log verbosity as a string (e.g. "info") — format presumably matches
    /// the `tracing` filter syntax; TODO confirm.
    pub log_level: String,
}
127
/// Backend selection plus backend-specific connection settings.
///
/// All networked variants are feature-gated; `Memory` is always available
/// and needs no external service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamBackendType {
    #[cfg(feature = "kafka")]
    Kafka {
        /// Bootstrap broker addresses.
        brokers: Vec<String>,
        security_protocol: Option<String>,
        sasl_config: Option<SaslConfig>,
    },
    #[cfg(feature = "nats")]
    Nats {
        url: String,
        cluster_urls: Option<Vec<String>>,
        jetstream_config: Option<NatsJetStreamConfig>,
    },
    #[cfg(feature = "redis")]
    Redis {
        url: String,
        cluster_urls: Option<Vec<String>>,
        pool_size: Option<usize>,
    },
    #[cfg(feature = "kinesis")]
    Kinesis {
        region: String,
        stream_name: String,
        credentials: Option<AwsCredentials>,
    },
    #[cfg(feature = "pulsar")]
    Pulsar {
        service_url: String,
        auth_config: Option<PulsarAuthConfig>,
    },
    #[cfg(feature = "rabbitmq")]
    RabbitMQ {
        url: String,
        exchange: Option<String>,
        queue: Option<String>,
    },
    /// In-process backend for tests and offline use.
    Memory {
        max_size: Option<usize>,
        persistence: bool,
    },
}
171
/// NATS JetStream-specific options.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsJetStreamConfig {
    /// Optional JetStream domain.
    pub domain: Option<String>,
    /// Optional JetStream API prefix.
    pub api_prefix: Option<String>,
    /// Request timeout for JetStream operations.
    pub timeout: Duration,
}
179
/// Static AWS credentials for the Kinesis backend.
///
/// NOTE(review): secret key is plain `String` and `Serialize` is derived —
/// avoid persisting serialized configs containing this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsCredentials {
    pub access_key_id: String,
    pub secret_access_key: String,
    /// Session token for temporary credentials.
    pub session_token: Option<String>,
    /// Role to assume, if any.
    pub role_arn: Option<String>,
}
188
/// Pulsar authentication settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PulsarAuthConfig {
    /// Authentication method to use.
    pub auth_method: PulsarAuthMethod,
    /// Method-specific parameters (key/value form; schema depends on method).
    pub auth_params: HashMap<String, String>,
}
195
/// Supported Pulsar authentication methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PulsarAuthMethod {
    Token,
    Jwt,
    Oauth2,
    Tls,
}
204
/// Publishes `StreamEvent`s to the configured backend, with optional
/// batching and circuit-breaker protection.
pub struct StreamProducer {
    /// Configuration this producer was built from.
    config: StreamConfig,
    /// Active backend-specific producer.
    backend_producer: BackendProducer,
    /// Shared, lock-protected publish statistics.
    stats: Arc<RwLock<ProducerStats>>,
    /// Present only when `config.circuit_breaker.enabled` is true.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// Time of the most recent successful flush.
    last_flush: Instant,
    _pending_events: Arc<RwLock<Vec<StreamEvent>>>,
    /// Events buffered while batching is enabled, drained on size or flush.
    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Single-permit semaphore serializing `flush` calls.
    flush_semaphore: Arc<Semaphore>,
}
216
/// Backend-specific producer, one variant per enabled feature.
/// Some variants are boxed — presumably to keep the enum's size down.
enum BackendProducer {
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaProducer),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsProducer>),
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisProducer),
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisProducer),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarProducer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
    /// Always-available in-process producer.
    Memory(MemoryProducer),
}
233
/// Counters and latency figures tracked by `StreamProducer`.
#[derive(Debug, Default, Clone)]
pub struct ProducerStats {
    /// Events successfully published.
    events_published: u64,
    /// Events that failed to publish.
    events_failed: u64,
    _bytes_sent: u64,
    /// Smoothed latency: updated as (previous + sample) / 2, i.e. biased
    /// toward recent samples — not a true arithmetic mean.
    avg_latency_ms: f64,
    /// Largest single-publish latency observed, in ms.
    max_latency_ms: u64,
    /// Batches sent via the batch path.
    batch_count: u64,
    /// Completed flushes.
    flush_count: u64,
    /// Publishes rejected because the circuit breaker was open.
    circuit_breaker_trips: u64,
    /// Timestamp of the most recent successful publish.
    last_publish: Option<DateTime<Utc>>,
    /// Name of the active backend ("kafka", "memory", ...).
    backend_type: String,
}
248
/// (timestamp, event) pairs retained by the in-memory helpers.
type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
/// Shared, lock-protected store of in-memory events.
type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;

/// Process-wide, lazily-initialized in-memory event store.
static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();
256
257pub fn get_memory_events() -> MemoryEventStore {
258 MEMORY_EVENTS
259 .get_or_init(|| Arc::new(RwLock::new(Vec::new())))
260 .clone()
261}
262
/// Clears both the global in-memory event store and the memory backend's
/// own storage. Intended for tests / resets.
pub async fn clear_memory_events() {
    let events = get_memory_events();
    events.write().await.clear();
    backend::memory::clear_memory_storage().await;
}
270
/// Producer over the in-process memory backend.
struct MemoryProducer {
    /// Boxed backend; always a `backend::memory::MemoryBackend` here.
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Topic events are published to.
    topic: String,
    /// Per-producer statistics (not shared).
    stats: ProducerStats,
}
276
impl MemoryProducer {
    /// Constructs a producer on the default "oxirs-stream" topic.
    /// (`_max_size` / `_persistence` are accepted but currently ignored.)
    fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic: "oxirs-stream".to_string(),
            stats: ProducerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    /// Constructs a producer bound to `topic`.
    fn with_topic(topic: String) -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic,
            stats: ProducerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    /// Publishes one event through the in-memory backend and updates stats.
    ///
    /// NOTE(review): connects and re-creates the topic on every publish;
    /// presumably both are idempotent and cheap for the memory backend —
    /// confirm before reusing this pattern for networked backends.
    async fn publish(&mut self, event: StreamEvent) -> Result<()> {
        let start_time = Instant::now();

        self.backend
            .as_mut()
            .connect()
            .await
            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;

        let topic_name = crate::types::TopicName::new(self.topic.clone());
        self.backend
            .create_topic(&topic_name, 1)
            .await
            .map_err(|e| anyhow!("Topic creation failed: {}", e))?;

        self.backend
            .send_event(&topic_name, event)
            .await
            .map_err(|e| anyhow!("Send event failed: {}", e))?;

        self.stats.events_published += 1;
        let latency = start_time.elapsed().as_millis() as u64;
        self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
        // Running blend (previous + sample) / 2 — biased toward recent
        // samples, not a true arithmetic mean.
        self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
        self.stats.last_publish = Some(Utc::now());

        debug!("Memory producer: published event via backend");
        Ok(())
    }

    /// No-op flush for the memory backend; only bumps the counter.
    async fn flush(&mut self) -> Result<()> {
        self.stats.flush_count += 1;
        debug!("Memory producer: flush completed");
        Ok(())
    }

    /// Read-only access to the accumulated statistics.
    fn _get_stats(&self) -> &ProducerStats {
        &self.stats
    }
}
345
346impl StreamProducer {
347 pub async fn new(config: StreamConfig) -> Result<Self> {
349 let circuit_breaker = if config.circuit_breaker.enabled {
351 Some(circuit_breaker::new_shared_circuit_breaker(
352 circuit_breaker::CircuitBreakerConfig {
353 enabled: config.circuit_breaker.enabled,
354 failure_threshold: config.circuit_breaker.failure_threshold,
355 success_threshold: config.circuit_breaker.success_threshold,
356 timeout: config.circuit_breaker.timeout,
357 half_open_max_calls: config.circuit_breaker.half_open_max_calls,
358 ..Default::default()
359 },
360 ))
361 } else {
362 None
363 };
364
365 let backend_producer = match &config.backend {
367 #[cfg(feature = "kafka")]
368 StreamBackendType::Kafka {
369 brokers,
370 security_protocol,
371 sasl_config,
372 } => {
373 let stream_config = crate::StreamConfig {
374 backend: crate::StreamBackendType::Kafka {
375 brokers: brokers.clone(),
376 security_protocol: security_protocol.clone(),
377 sasl_config: sasl_config.clone(),
378 },
379 topic: config.topic.clone(),
380 batch_size: config.batch_size,
381 flush_interval_ms: config.flush_interval_ms,
382 max_connections: config.max_connections,
383 connection_timeout: config.connection_timeout,
384 enable_compression: config.enable_compression,
385 compression_type: config.compression_type.clone(),
386 retry_config: config.retry_config.clone(),
387 circuit_breaker: config.circuit_breaker.clone(),
388 security: config.security.clone(),
389 performance: config.performance.clone(),
390 monitoring: config.monitoring.clone(),
391 };
392
393 let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
394 producer.connect().await?;
395 BackendProducer::Kafka(producer)
396 }
397 #[cfg(feature = "nats")]
398 StreamBackendType::Nats {
399 url,
400 cluster_urls,
401 jetstream_config,
402 } => {
403 let stream_config = crate::StreamConfig {
404 backend: crate::StreamBackendType::Nats {
405 url: url.clone(),
406 cluster_urls: cluster_urls.clone(),
407 jetstream_config: jetstream_config.clone(),
408 },
409 topic: config.topic.clone(),
410 batch_size: config.batch_size,
411 flush_interval_ms: config.flush_interval_ms,
412 max_connections: config.max_connections,
413 connection_timeout: config.connection_timeout,
414 enable_compression: config.enable_compression,
415 compression_type: config.compression_type.clone(),
416 retry_config: config.retry_config.clone(),
417 circuit_breaker: config.circuit_breaker.clone(),
418 security: config.security.clone(),
419 performance: config.performance.clone(),
420 monitoring: config.monitoring.clone(),
421 };
422
423 let mut producer = backend::nats::NatsProducer::new(stream_config)?;
424 producer.connect().await?;
425 BackendProducer::Nats(Box::new(producer))
426 }
427 #[cfg(feature = "redis")]
428 StreamBackendType::Redis {
429 url,
430 cluster_urls,
431 pool_size,
432 } => {
433 let stream_config = crate::StreamConfig {
434 backend: crate::StreamBackendType::Redis {
435 url: url.clone(),
436 cluster_urls: cluster_urls.clone(),
437 pool_size: *pool_size,
438 },
439 topic: config.topic.clone(),
440 batch_size: config.batch_size,
441 flush_interval_ms: config.flush_interval_ms,
442 max_connections: config.max_connections,
443 connection_timeout: config.connection_timeout,
444 enable_compression: config.enable_compression,
445 compression_type: config.compression_type.clone(),
446 retry_config: config.retry_config.clone(),
447 circuit_breaker: config.circuit_breaker.clone(),
448 security: config.security.clone(),
449 performance: config.performance.clone(),
450 monitoring: config.monitoring.clone(),
451 };
452
453 let mut producer = backend::redis::RedisProducer::new(stream_config)?;
454 producer.connect().await?;
455 BackendProducer::Redis(producer)
456 }
457 #[cfg(feature = "kinesis")]
458 StreamBackendType::Kinesis {
459 region,
460 stream_name,
461 credentials,
462 } => {
463 let stream_config = crate::StreamConfig {
464 backend: crate::StreamBackendType::Kinesis {
465 region: region.clone(),
466 stream_name: stream_name.clone(),
467 credentials: credentials.clone(),
468 },
469 topic: config.topic.clone(),
470 batch_size: config.batch_size,
471 flush_interval_ms: config.flush_interval_ms,
472 max_connections: config.max_connections,
473 connection_timeout: config.connection_timeout,
474 enable_compression: config.enable_compression,
475 compression_type: config.compression_type.clone(),
476 retry_config: config.retry_config.clone(),
477 circuit_breaker: config.circuit_breaker.clone(),
478 security: config.security.clone(),
479 performance: config.performance.clone(),
480 monitoring: config.monitoring.clone(),
481 };
482
483 let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
484 producer.connect().await?;
485 BackendProducer::Kinesis(producer)
486 }
487 #[cfg(feature = "pulsar")]
488 StreamBackendType::Pulsar {
489 service_url,
490 auth_config,
491 } => {
492 let stream_config = crate::StreamConfig {
493 backend: crate::StreamBackendType::Pulsar {
494 service_url: service_url.clone(),
495 auth_config: auth_config.clone(),
496 },
497 topic: config.topic.clone(),
498 batch_size: config.batch_size,
499 flush_interval_ms: config.flush_interval_ms,
500 max_connections: config.max_connections,
501 connection_timeout: config.connection_timeout,
502 enable_compression: config.enable_compression,
503 compression_type: config.compression_type.clone(),
504 retry_config: config.retry_config.clone(),
505 circuit_breaker: config.circuit_breaker.clone(),
506 security: config.security.clone(),
507 performance: config.performance.clone(),
508 monitoring: config.monitoring.clone(),
509 };
510
511 let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
512 producer.connect().await?;
513 BackendProducer::Pulsar(Box::new(producer))
514 }
515 #[cfg(feature = "rabbitmq")]
516 StreamBackendType::RabbitMQ {
517 url,
518 exchange,
519 queue,
520 } => {
521 let stream_config = crate::StreamConfig {
522 backend: crate::StreamBackendType::RabbitMQ {
523 url: url.clone(),
524 exchange: exchange.clone(),
525 queue: queue.clone(),
526 },
527 topic: config.topic.clone(),
528 batch_size: config.batch_size,
529 flush_interval_ms: config.flush_interval_ms,
530 max_connections: config.max_connections,
531 connection_timeout: config.connection_timeout,
532 enable_compression: config.enable_compression,
533 compression_type: config.compression_type.clone(),
534 retry_config: config.retry_config.clone(),
535 circuit_breaker: config.circuit_breaker.clone(),
536 security: config.security.clone(),
537 performance: config.performance.clone(),
538 monitoring: config.monitoring.clone(),
539 };
540
541 let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
542 producer.connect().await?;
543 BackendProducer::RabbitMQ(Box::new(producer))
544 }
545 StreamBackendType::Memory {
546 max_size: _,
547 persistence: _,
548 } => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
549 };
550
551 let stats = Arc::new(RwLock::new(ProducerStats {
552 backend_type: match backend_producer {
553 #[cfg(feature = "kafka")]
554 BackendProducer::Kafka(_) => "kafka".to_string(),
555 #[cfg(feature = "nats")]
556 BackendProducer::Nats(_) => "nats".to_string(),
557 #[cfg(feature = "redis")]
558 BackendProducer::Redis(_) => "redis".to_string(),
559 #[cfg(feature = "kinesis")]
560 BackendProducer::Kinesis(_) => "kinesis".to_string(),
561 #[cfg(feature = "pulsar")]
562 BackendProducer::Pulsar(_) => "pulsar".to_string(),
563 #[cfg(feature = "rabbitmq")]
564 BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
565 BackendProducer::Memory(_) => "memory".to_string(),
566 },
567 ..Default::default()
568 }));
569
570 info!(
571 "Created stream producer with backend: {}",
572 stats.read().await.backend_type
573 );
574
575 Ok(Self {
576 config,
577 backend_producer,
578 stats,
579 circuit_breaker,
580 last_flush: Instant::now(),
581 _pending_events: Arc::new(RwLock::new(Vec::new())),
582 batch_buffer: Arc::new(RwLock::new(Vec::new())),
583 flush_semaphore: Arc::new(Semaphore::new(1)),
584 })
585 }
586
    /// Publishes a single event, honouring the circuit breaker and batching.
    ///
    /// With batching enabled the event is buffered and only sent once the
    /// buffer reaches `config.batch_size`; otherwise it is sent immediately.
    ///
    /// # Errors
    /// Fails when the circuit breaker is open or the backend publish fails.
    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
        let start_time = Instant::now();

        // Reject up front while the circuit breaker is open.
        if let Some(cb) = &self.circuit_breaker {
            if !cb.can_execute().await {
                self.stats.write().await.circuit_breaker_trips += 1;
                return Err(anyhow!("Circuit breaker is open - cannot publish events"));
            }
        }

        if self.config.performance.enable_batching {
            let mut batch_buffer = self.batch_buffer.write().await;
            batch_buffer.push(event);

            // NOTE(review): on this batched path circuit-breaker success /
            // failure is never recorded; stats are updated only inside
            // publish_batch_internal.
            if batch_buffer.len() >= self.config.batch_size {
                let events = std::mem::take(&mut *batch_buffer);
                drop(batch_buffer); // release the lock before the async send
                return self.publish_batch_internal(events).await;
            }

            return Ok(());
        }

        let result = self.publish_single_event(event).await;

        match &result {
            Ok(_) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_success_with_duration(start_time.elapsed()).await;
                }

                let mut stats = self.stats.write().await;
                stats.events_published += 1;
                let latency = start_time.elapsed().as_millis() as u64;
                stats.max_latency_ms = stats.max_latency_ms.max(latency);
                // Running blend (previous + sample) / 2 — not a true mean.
                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
                stats.last_publish = Some(Utc::now());
            }
            Err(_) => {
                // All publish failures are classified as network errors here.
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
                        .await;
                }

                self.stats.write().await.events_failed += 1;
            }
        }

        result
    }
642
    /// Dispatches one event to whichever backend producer is active.
    async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
        match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish(event).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish(event).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish(event).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish(event).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish(event).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
            BackendProducer::Memory(producer) => producer.publish(event).await,
        }
    }
661
662 pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
664 if events.is_empty() {
665 return Ok(());
666 }
667
668 self.publish_batch_internal(events).await
669 }
670
    /// Sends a batch to the backend and updates stats for success or failure.
    async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
        let start_time = Instant::now();
        let event_count = events.len();

        let result = match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
            BackendProducer::Memory(producer) => {
                // The memory producer has no batch API; publish sequentially.
                // NOTE(review): the `?` here returns early, so a mid-batch
                // failure skips the stats update below (events_failed is not
                // incremented on this path).
                for event in events {
                    producer.publish(event).await?;
                }
                Ok(())
            }
        };

        let mut stats = self.stats.write().await;
        match &result {
            Ok(_) => {
                stats.events_published += event_count as u64;
                stats.batch_count += 1;
                let latency = start_time.elapsed().as_millis() as u64;
                stats.max_latency_ms = stats.max_latency_ms.max(latency);
                // Running blend (previous + sample) / 2 — not a true mean.
                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
                stats.last_publish = Some(Utc::now());
            }
            Err(_) => {
                stats.events_failed += event_count as u64;
            }
        }

        debug!(
            "Published batch of {} events in {:?}",
            event_count,
            start_time.elapsed()
        );
        result
    }
720
721 pub async fn flush(&mut self) -> Result<()> {
723 let _permit = self
724 .flush_semaphore
725 .acquire()
726 .await
727 .map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;
728
729 let start_time = Instant::now();
730
731 if self.config.performance.enable_batching {
733 let events = {
734 let mut batch_buffer = self.batch_buffer.write().await;
735 if !batch_buffer.is_empty() {
736 std::mem::take(&mut *batch_buffer)
737 } else {
738 Vec::new()
739 }
740 };
741 if !events.is_empty() {
742 drop(_permit); self.publish_batch_internal(events).await?;
744 }
745 }
746
747 let result = match &mut self.backend_producer {
749 #[cfg(feature = "kafka")]
750 BackendProducer::Kafka(producer) => producer.flush().await,
751 #[cfg(feature = "nats")]
752 BackendProducer::Nats(producer) => producer.flush().await,
753 #[cfg(feature = "redis")]
754 BackendProducer::Redis(producer) => producer.flush().await,
755 #[cfg(feature = "kinesis")]
756 BackendProducer::Kinesis(producer) => producer.flush().await,
757 #[cfg(feature = "pulsar")]
758 BackendProducer::Pulsar(producer) => producer.flush().await,
759 #[cfg(feature = "rabbitmq")]
760 BackendProducer::RabbitMQ(producer) => producer.flush().await,
761 BackendProducer::Memory(producer) => producer.flush().await,
762 };
763
764 if result.is_ok() {
766 self.stats.write().await.flush_count += 1;
767 self.last_flush = Instant::now();
768 debug!("Flushed producer buffers in {:?}", start_time.elapsed());
769 }
770
771 result
772 }
773
    /// Converts an `RdfPatch` into stream events and publishes them as a batch.
    ///
    /// Triple add/delete, graph create/delete, and transaction begin/commit/
    /// abort map to events; prefix and header operations are skipped.
    ///
    /// NOTE(review): `RdfPatch` / `PatchOperation` do not appear in this
    /// excerpt's `use` block — confirm they are in scope (e.g. via `crate::`).
    pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
        let events: Vec<StreamEvent> = patch
            .operations
            .iter()
            .filter_map(|op| {
                // Fresh metadata per operation; the patch id travels as context.
                let metadata = EventMetadata {
                    event_id: Uuid::new_v4().to_string(),
                    timestamp: patch.timestamp,
                    source: "rdf_patch".to_string(),
                    user: None,
                    context: Some(patch.id.clone()),
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                };

                match op {
                    PatchOperation::Add {
                        subject,
                        predicate,
                        object,
                    } => Some(StreamEvent::TripleAdded {
                        subject: subject.clone(),
                        predicate: predicate.clone(),
                        object: object.clone(),
                        graph: None,
                        metadata,
                    }),
                    PatchOperation::Delete {
                        subject,
                        predicate,
                        object,
                    } => Some(StreamEvent::TripleRemoved {
                        subject: subject.clone(),
                        predicate: predicate.clone(),
                        object: object.clone(),
                        graph: None,
                        metadata,
                    }),
                    PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
                        graph: graph.clone(),
                        metadata,
                    }),
                    PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
                        graph: graph.clone(),
                        metadata,
                    }),
                    PatchOperation::AddPrefix { .. } => {
                        // Prefix bookkeeping has no stream-event equivalent.
                        None
                    }
                    PatchOperation::DeletePrefix { .. } => {
                        None
                    }
                    PatchOperation::TransactionBegin { .. } => {
                        // A missing transaction id is replaced by a fresh UUID.
                        Some(StreamEvent::TransactionBegin {
                            transaction_id: patch
                                .transaction_id
                                .clone()
                                .unwrap_or_else(|| Uuid::new_v4().to_string()),
                            isolation_level: None,
                            metadata,
                        })
                    }
                    PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
                        transaction_id: patch
                            .transaction_id
                            .clone()
                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
                        metadata,
                    }),
                    PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
                        transaction_id: patch
                            .transaction_id
                            .clone()
                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
                        metadata,
                    }),
                    PatchOperation::Header { .. } => {
                        // Headers are metadata-only; nothing to publish.
                        None
                    }
                }
            })
            .collect();

        self.publish_batch(events).await
    }
865
866 pub async fn get_stats(&self) -> ProducerStats {
868 self.stats.read().await.clone()
869 }
870
871 pub async fn health_check(&self) -> bool {
873 if let Some(cb) = &self.circuit_breaker {
874 cb.is_healthy().await
875 } else {
876 true
877 }
878 }
879}
880
/// Consumes `StreamEvent`s from the configured backend.
pub struct StreamConsumer {
    _config: StreamConfig,
    /// Active backend-specific consumer.
    backend_consumer: BackendConsumer,
    /// Shared, lock-protected consumption statistics.
    stats: Arc<RwLock<ConsumerStats>>,
    /// Present only when the circuit breaker is enabled in the config.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// Time of the most recent poll.
    last_poll: Instant,
    _message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Optional consumer-group id (semantics depend on the backend).
    consumer_group: Option<String>,
}
891
/// Backend-specific consumer, one variant per enabled feature.
/// Some variants are boxed — presumably to keep the enum's size down.
enum BackendConsumer {
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaConsumer),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsConsumer>),
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisConsumer),
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisConsumer),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarConsumer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
    /// Always-available in-process consumer.
    Memory(MemoryConsumer),
}
908
/// Counters and timing figures tracked by `StreamConsumer`.
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
    /// Events successfully consumed.
    events_consumed: u64,
    /// Events that failed during consumption.
    events_failed: u64,
    _bytes_received: u64,
    /// Smoothed processing time: updated as (previous + sample) / 2 — biased
    /// toward recent samples, not a true arithmetic mean.
    avg_processing_time_ms: f64,
    /// Largest single-message processing time observed, in ms.
    max_processing_time_ms: u64,
    _consumer_lag: u64,
    /// Polls rejected because the circuit breaker was open.
    circuit_breaker_trips: u64,
    /// Timestamp of the most recent consumed message.
    last_message: Option<DateTime<Utc>>,
    /// Name of the active backend ("kafka", "memory", ...).
    backend_type: String,
    _batch_size: usize,
}
923
/// Consumer over the in-process memory backend, tracking its own offset.
struct MemoryConsumer {
    /// Boxed backend; always a `backend::memory::MemoryBackend` here.
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Topic events are consumed from.
    topic: String,
    /// Next offset to read; advanced past each consumed event.
    current_offset: u64,
    /// Per-consumer statistics (not shared).
    stats: ConsumerStats,
}
931
impl MemoryConsumer {
    /// Constructs a consumer on the default "oxirs-stream" topic at offset 0.
    fn _new() -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic: "oxirs-stream".to_string(),
            current_offset: 0,
            stats: ConsumerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    /// Constructs a consumer bound to `topic`, starting at offset 0.
    fn with_topic(topic: String) -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic,
            current_offset: 0,
            stats: ConsumerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    /// Fetches at most one event at the current offset.
    ///
    /// Returns `Ok(None)` when no event is available. On success the offset
    /// advances to just past the consumed event and stats are updated.
    ///
    /// NOTE(review): reconnects on every call; presumably idempotent and
    /// cheap for the memory backend — confirm.
    async fn consume(&mut self) -> Result<Option<StreamEvent>> {
        let start_time = Instant::now();

        self.backend
            .as_mut()
            .connect()
            .await
            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;

        let topic_name = crate::types::TopicName::new(self.topic.clone());
        let events = self
            .backend
            .receive_events(
                &topic_name,
                None,
                crate::types::StreamPosition::Offset(self.current_offset),
                1,
            )
            .await
            .map_err(|e| anyhow!("Receive events failed: {}", e))?;

        if let Some((event, offset)) = events.first() {
            // Advance past the event we just consumed.
            self.current_offset = offset.value() + 1;

            self.stats.events_consumed += 1;
            let processing_time = start_time.elapsed().as_millis() as u64;
            self.stats.max_processing_time_ms =
                self.stats.max_processing_time_ms.max(processing_time);
            // Running blend (previous + sample) / 2 — not a true mean.
            self.stats.avg_processing_time_ms =
                (self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
            self.stats.last_message = Some(Utc::now());

            debug!("Memory consumer: consumed event via backend");
            Ok(Some(event.clone()))
        } else {
            debug!("Memory consumer: no events available");
            Ok(None)
        }
    }

    /// Read-only access to the accumulated statistics.
    fn _get_stats(&self) -> &ConsumerStats {
        &self.stats
    }

    /// Rewinds consumption to the beginning of the topic.
    fn reset(&mut self) {
        self.current_offset = 0;
    }
}
1009
1010impl StreamConsumer {
    /// Creates a consumer with no consumer group.
    /// Delegates to [`StreamConsumer::new_with_group`].
    pub async fn new(config: StreamConfig) -> Result<Self> {
        Self::new_with_group(config, None).await
    }
1015
1016 pub async fn new_with_group(
1018 config: StreamConfig,
1019 consumer_group: Option<String>,
1020 ) -> Result<Self> {
1021 let circuit_breaker = if config.circuit_breaker.enabled {
1023 Some(circuit_breaker::new_shared_circuit_breaker(
1024 circuit_breaker::CircuitBreakerConfig {
1025 enabled: config.circuit_breaker.enabled,
1026 failure_threshold: config.circuit_breaker.failure_threshold,
1027 success_threshold: config.circuit_breaker.success_threshold,
1028 timeout: config.circuit_breaker.timeout,
1029 half_open_max_calls: config.circuit_breaker.half_open_max_calls,
1030 ..Default::default()
1031 },
1032 ))
1033 } else {
1034 None
1035 };
1036
1037 let backend_consumer = match &config.backend {
1039 #[cfg(feature = "kafka")]
1040 StreamBackendType::Kafka {
1041 brokers,
1042 security_protocol,
1043 sasl_config,
1044 } => {
1045 let stream_config = crate::StreamConfig {
1046 backend: crate::StreamBackendType::Kafka {
1047 brokers: brokers.clone(),
1048 security_protocol: security_protocol.clone(),
1049 sasl_config: sasl_config.clone(),
1050 },
1051 topic: config.topic.clone(),
1052 batch_size: config.batch_size,
1053 flush_interval_ms: config.flush_interval_ms,
1054 max_connections: config.max_connections,
1055 connection_timeout: config.connection_timeout,
1056 enable_compression: config.enable_compression,
1057 compression_type: config.compression_type.clone(),
1058 retry_config: config.retry_config.clone(),
1059 circuit_breaker: config.circuit_breaker.clone(),
1060 security: config.security.clone(),
1061 performance: config.performance.clone(),
1062 monitoring: config.monitoring.clone(),
1063 };
1064
1065 let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
1066 consumer.connect().await?;
1067 BackendConsumer::Kafka(consumer)
1068 }
1069 #[cfg(feature = "nats")]
1070 StreamBackendType::Nats {
1071 url,
1072 cluster_urls,
1073 jetstream_config,
1074 } => {
1075 let stream_config = crate::StreamConfig {
1076 backend: crate::StreamBackendType::Nats {
1077 url: url.clone(),
1078 cluster_urls: cluster_urls.clone(),
1079 jetstream_config: jetstream_config.clone(),
1080 },
1081 topic: config.topic.clone(),
1082 batch_size: config.batch_size,
1083 flush_interval_ms: config.flush_interval_ms,
1084 max_connections: config.max_connections,
1085 connection_timeout: config.connection_timeout,
1086 enable_compression: config.enable_compression,
1087 compression_type: config.compression_type.clone(),
1088 retry_config: config.retry_config.clone(),
1089 circuit_breaker: config.circuit_breaker.clone(),
1090 security: config.security.clone(),
1091 performance: config.performance.clone(),
1092 monitoring: config.monitoring.clone(),
1093 };
1094
1095 let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
1096 consumer.connect().await?;
1097 BackendConsumer::Nats(Box::new(consumer))
1098 }
1099 #[cfg(feature = "redis")]
1100 StreamBackendType::Redis {
1101 url,
1102 cluster_urls,
1103 pool_size,
1104 } => {
1105 let stream_config = crate::StreamConfig {
1106 backend: crate::StreamBackendType::Redis {
1107 url: url.clone(),
1108 cluster_urls: cluster_urls.clone(),
1109 pool_size: *pool_size,
1110 },
1111 topic: config.topic.clone(),
1112 batch_size: config.batch_size,
1113 flush_interval_ms: config.flush_interval_ms,
1114 max_connections: config.max_connections,
1115 connection_timeout: config.connection_timeout,
1116 enable_compression: config.enable_compression,
1117 compression_type: config.compression_type.clone(),
1118 retry_config: config.retry_config.clone(),
1119 circuit_breaker: config.circuit_breaker.clone(),
1120 security: config.security.clone(),
1121 performance: config.performance.clone(),
1122 monitoring: config.monitoring.clone(),
1123 };
1124
1125 let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
1126 consumer.connect().await?;
1127 BackendConsumer::Redis(consumer)
1128 }
1129 #[cfg(feature = "kinesis")]
1130 StreamBackendType::Kinesis {
1131 region,
1132 stream_name,
1133 credentials,
1134 } => {
1135 let stream_config = crate::StreamConfig {
1136 backend: crate::StreamBackendType::Kinesis {
1137 region: region.clone(),
1138 stream_name: stream_name.clone(),
1139 credentials: credentials.clone(),
1140 },
1141 topic: config.topic.clone(),
1142 batch_size: config.batch_size,
1143 flush_interval_ms: config.flush_interval_ms,
1144 max_connections: config.max_connections,
1145 connection_timeout: config.connection_timeout,
1146 enable_compression: config.enable_compression,
1147 compression_type: config.compression_type.clone(),
1148 retry_config: config.retry_config.clone(),
1149 circuit_breaker: config.circuit_breaker.clone(),
1150 security: config.security.clone(),
1151 performance: config.performance.clone(),
1152 monitoring: config.monitoring.clone(),
1153 };
1154
1155 let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
1156 consumer.connect().await?;
1157 BackendConsumer::Kinesis(consumer)
1158 }
1159 #[cfg(feature = "pulsar")]
1160 StreamBackendType::Pulsar {
1161 service_url,
1162 auth_config,
1163 } => {
1164 let stream_config = crate::StreamConfig {
1165 backend: crate::StreamBackendType::Pulsar {
1166 service_url: service_url.clone(),
1167 auth_config: auth_config.clone(),
1168 },
1169 topic: config.topic.clone(),
1170 batch_size: config.batch_size,
1171 flush_interval_ms: config.flush_interval_ms,
1172 max_connections: config.max_connections,
1173 connection_timeout: config.connection_timeout,
1174 enable_compression: config.enable_compression,
1175 compression_type: config.compression_type.clone(),
1176 retry_config: config.retry_config.clone(),
1177 circuit_breaker: config.circuit_breaker.clone(),
1178 security: config.security.clone(),
1179 performance: config.performance.clone(),
1180 monitoring: config.monitoring.clone(),
1181 };
1182
1183 let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
1184 consumer.connect().await?;
1185 BackendConsumer::Pulsar(Box::new(consumer))
1186 }
1187 #[cfg(feature = "rabbitmq")]
1188 StreamBackendType::RabbitMQ {
1189 url,
1190 exchange,
1191 queue,
1192 } => {
1193 let stream_config = crate::StreamConfig {
1194 backend: crate::StreamBackendType::RabbitMQ {
1195 url: url.clone(),
1196 exchange: exchange.clone(),
1197 queue: queue.clone(),
1198 },
1199 topic: config.topic.clone(),
1200 batch_size: config.batch_size,
1201 flush_interval_ms: config.flush_interval_ms,
1202 max_connections: config.max_connections,
1203 connection_timeout: config.connection_timeout,
1204 enable_compression: config.enable_compression,
1205 compression_type: config.compression_type.clone(),
1206 retry_config: config.retry_config.clone(),
1207 circuit_breaker: config.circuit_breaker.clone(),
1208 security: config.security.clone(),
1209 performance: config.performance.clone(),
1210 monitoring: config.monitoring.clone(),
1211 };
1212
1213 let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
1214 consumer.connect().await?;
1215 BackendConsumer::RabbitMQ(Box::new(consumer))
1216 }
1217 StreamBackendType::Memory {
1218 max_size: _,
1219 persistence: _,
1220 } => BackendConsumer::Memory(MemoryConsumer::with_topic(config.topic.clone())),
1221 };
1222
1223 let stats = Arc::new(RwLock::new(ConsumerStats {
1224 backend_type: match backend_consumer {
1225 #[cfg(feature = "kafka")]
1226 BackendConsumer::Kafka(_) => "kafka".to_string(),
1227 #[cfg(feature = "nats")]
1228 BackendConsumer::Nats(_) => "nats".to_string(),
1229 #[cfg(feature = "redis")]
1230 BackendConsumer::Redis(_) => "redis".to_string(),
1231 #[cfg(feature = "kinesis")]
1232 BackendConsumer::Kinesis(_) => "kinesis".to_string(),
1233 #[cfg(feature = "pulsar")]
1234 BackendConsumer::Pulsar(_) => "pulsar".to_string(),
1235 #[cfg(feature = "rabbitmq")]
1236 BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
1237 BackendConsumer::Memory(_) => "memory".to_string(),
1238 },
1239 _batch_size: config.batch_size,
1240 ..Default::default()
1241 }));
1242
1243 info!(
1244 "Created stream consumer with backend: {} and group: {:?}",
1245 stats.read().await.backend_type,
1246 consumer_group
1247 );
1248
1249 Ok(Self {
1250 _config: config,
1251 backend_consumer,
1252 stats,
1253 circuit_breaker,
1254 last_poll: Instant::now(),
1255 _message_buffer: Arc::new(RwLock::new(Vec::new())),
1256 consumer_group,
1257 })
1258 }
1259
1260 pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1262 let start_time = Instant::now();
1263
1264 if let Some(cb) = &self.circuit_breaker {
1266 if !cb.can_execute().await {
1267 self.stats.write().await.circuit_breaker_trips += 1;
1268 return Err(anyhow!("Circuit breaker is open - cannot consume events"));
1269 }
1270 }
1271
1272 let result = self.consume_single_event().await;
1274
1275 match &result {
1277 Ok(Some(_)) => {
1278 if let Some(cb) = &self.circuit_breaker {
1279 cb.record_success_with_duration(start_time.elapsed()).await;
1280 }
1281
1282 let mut stats = self.stats.write().await;
1283 stats.events_consumed += 1;
1284 let processing_time = start_time.elapsed().as_millis() as u64;
1285 stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
1286 stats.avg_processing_time_ms =
1287 (stats.avg_processing_time_ms + processing_time as f64) / 2.0;
1288 stats.last_message = Some(Utc::now());
1289 }
1290 Ok(None) => {
1291 if let Some(cb) = &self.circuit_breaker {
1293 cb.record_success_with_duration(start_time.elapsed()).await;
1294 }
1295 }
1296 Err(_) => {
1297 if let Some(cb) = &self.circuit_breaker {
1298 cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
1299 .await;
1300 }
1301
1302 self.stats.write().await.events_failed += 1;
1303 }
1304 }
1305
1306 self.last_poll = Instant::now();
1307 result
1308 }
1309
    /// Poll the active backend once for the next available event.
    ///
    /// Pure dispatch: each arm forwards to the backend-specific
    /// `consume()` implementation. Cargo feature flags gate which
    /// backends are compiled in; the in-memory backend is always present.
    async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
        match &mut self.backend_consumer {
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(consumer) => consumer.consume().await,
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(consumer) => consumer.consume().await,
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(consumer) => consumer.consume().await,
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(consumer) => consumer.consume().await,
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(consumer) => consumer.consume().await,
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
            BackendConsumer::Memory(consumer) => consumer.consume().await,
        }
    }
1328
1329 pub async fn consume_batch(
1331 &mut self,
1332 max_events: usize,
1333 timeout: Duration,
1334 ) -> Result<Vec<StreamEvent>> {
1335 let mut events = Vec::new();
1336 let start_time = Instant::now();
1337
1338 while events.len() < max_events && start_time.elapsed() < timeout {
1339 match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
1340 Ok(Ok(Some(event))) => events.push(event),
1341 Ok(Ok(None)) => continue,
1342 Ok(Err(e)) => return Err(e),
1343 Err(_) => break, }
1345 }
1346
1347 if !events.is_empty() {
1348 debug!(
1349 "Consumed batch of {} events in {:?}",
1350 events.len(),
1351 start_time.elapsed()
1352 );
1353 }
1354
1355 Ok(events)
1356 }
1357
1358 pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
1360 where
1361 F: FnMut(StreamEvent) -> Result<()> + Send,
1362 {
1363 info!("Starting stream consumer loop");
1364
1365 loop {
1366 match self.consume().await {
1367 Ok(Some(event)) => {
1368 if let Err(e) = callback(event) {
1369 error!("Callback error: {}", e);
1370 self.stats.write().await.events_failed += 1;
1371 }
1372 }
1373 Ok(None) => {
1374 tokio::time::sleep(Duration::from_millis(10)).await;
1376 }
1377 Err(e) => {
1378 error!("Consumer error: {}", e);
1379 tokio::time::sleep(Duration::from_millis(100)).await;
1380 }
1381 }
1382 }
1383 }
1384
1385 pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
1387 where
1388 F: FnMut(StreamEvent) -> Fut + Send,
1389 Fut: std::future::Future<Output = Result<()>> + Send,
1390 {
1391 info!("Starting async stream consumer loop");
1392
1393 loop {
1394 match self.consume().await {
1395 Ok(Some(event)) => {
1396 if let Err(e) = callback(event).await {
1397 error!("Async callback error: {}", e);
1398 self.stats.write().await.events_failed += 1;
1399 }
1400 }
1401 Ok(None) => {
1402 tokio::time::sleep(Duration::from_millis(10)).await;
1404 }
1405 Err(e) => {
1406 error!("Consumer error: {}", e);
1407 tokio::time::sleep(Duration::from_millis(100)).await;
1408 }
1409 }
1410 }
1411 }
1412
1413 pub async fn get_stats(&self) -> ConsumerStats {
1415 self.stats.read().await.clone()
1416 }
1417
1418 pub async fn health_check(&self) -> bool {
1420 if let Some(cb) = &self.circuit_breaker {
1421 cb.is_healthy().await
1422 } else {
1423 true
1424 }
1425 }
1426
    /// The consumer group this consumer was created with, if any.
    pub fn consumer_group(&self) -> Option<&String> {
        self.consumer_group.as_ref()
    }
1431
1432 pub async fn reset_position(&mut self) -> Result<()> {
1434 match &mut self.backend_consumer {
1435 BackendConsumer::Memory(consumer) => {
1436 consumer.reset();
1437 Ok(())
1438 }
1439 #[cfg(feature = "kafka")]
1440 BackendConsumer::Kafka(_) => {
1441 Err(anyhow!("Reset position not supported for Kafka backend"))
1442 }
1443 #[cfg(feature = "nats")]
1444 BackendConsumer::Nats(_) => {
1445 Err(anyhow!("Reset position not supported for NATS backend"))
1446 }
1447 #[cfg(feature = "redis")]
1448 BackendConsumer::Redis(_) => {
1449 Err(anyhow!("Reset position not supported for Redis backend"))
1450 }
1451 #[cfg(feature = "kinesis")]
1452 BackendConsumer::Kinesis(_) => {
1453 Err(anyhow!("Reset position not supported for Kinesis backend"))
1454 }
1455 #[cfg(feature = "pulsar")]
1456 BackendConsumer::Pulsar(_) => {
1457 Err(anyhow!("Reset position not supported for Pulsar backend"))
1458 }
1459 #[cfg(feature = "rabbitmq")]
1460 BackendConsumer::RabbitMQ(_) => {
1461 Err(anyhow!("Reset position not supported for RabbitMQ backend"))
1462 }
1463 }
1464 }
1465
1466 pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
1468 Err(anyhow!("set_test_events is deprecated with backend implementation - use producer to publish events"))
1471 }
1472}
1473
/// A single operation within an RDF Patch document.
///
/// Covers triple add/delete, named-graph add/delete, prefix
/// management, transaction control, and header metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
    /// Add a triple (subject, predicate, object).
    Add {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Delete a triple (subject, predicate, object).
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Create a named graph.
    AddGraph { graph: String },
    /// Remove a named graph.
    DeleteGraph { graph: String },
    /// Declare a prefix-to-namespace binding.
    AddPrefix { prefix: String, namespace: String },
    /// Remove a prefix binding.
    DeletePrefix { prefix: String },
    /// Begin a transaction, optionally with an explicit id.
    TransactionBegin { transaction_id: Option<String> },
    /// Commit the current transaction.
    TransactionCommit,
    /// Abort the current transaction.
    TransactionAbort,
    /// A header metadata entry (key/value pair).
    Header { key: String, value: String },
}
1506
/// An RDF Patch: an ordered list of operations plus document metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
    /// Ordered operations that make up the patch.
    pub operations: Vec<PatchOperation>,
    /// Creation time of the patch (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Unique identifier (a UUID v4 when created via [`RdfPatch::new`]).
    pub id: String,
    /// Header key/value metadata.
    pub headers: HashMap<String, String>,
    /// Transaction this patch belongs to, if any.
    pub transaction_id: Option<String>,
    /// Prefix-to-namespace bindings declared by the patch.
    pub prefixes: HashMap<String, String>,
}
1520
1521impl RdfPatch {
1522 pub fn new() -> Self {
1524 Self {
1525 operations: Vec::new(),
1526 timestamp: chrono::Utc::now(),
1527 id: uuid::Uuid::new_v4().to_string(),
1528 headers: HashMap::new(),
1529 transaction_id: None,
1530 prefixes: HashMap::new(),
1531 }
1532 }
1533
1534 pub fn add_operation(&mut self, operation: PatchOperation) {
1536 self.operations.push(operation);
1537 }
1538
1539 pub fn to_rdf_patch_format(&self) -> Result<String> {
1541 let serializer = crate::patch::PatchSerializer::new()
1542 .with_pretty_print(true)
1543 .with_metadata(true);
1544 serializer.serialize(self)
1545 }
1546
1547 pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
1549 let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
1550 parser.parse(input)
1551 }
1552}
1553
1554impl Default for RdfPatch {
1555 fn default() -> Self {
1556 Self::new()
1557 }
1558}
1559
impl Default for StreamConfig {
    /// Conservative defaults backed by the in-memory backend, so the
    /// stream works without any external broker.
    fn default() -> Self {
        Self {
            // In-memory backend: bounded to 10k events, not persisted.
            backend: StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: "oxirs-stream".to_string(),
            batch_size: 100,
            flush_interval_ms: 100,
            max_connections: 10,
            connection_timeout: Duration::from_secs(30),
            // Compression off by default; type is a no-op placeholder.
            enable_compression: false,
            compression_type: CompressionType::None,
            // Sub-configs fall back to their own Default impls.
            retry_config: RetryConfig::default(),
            circuit_breaker: CircuitBreakerConfig::default(),
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig::default(),
            monitoring: MonitoringConfig::default(),
        }
    }
}
1583
1584impl Default for RetryConfig {
1585 fn default() -> Self {
1586 Self {
1587 max_retries: 3,
1588 initial_backoff: Duration::from_millis(100),
1589 max_backoff: Duration::from_secs(30),
1590 backoff_multiplier: 2.0,
1591 jitter: true,
1592 }
1593 }
1594}
1595
1596impl Default for CircuitBreakerConfig {
1597 fn default() -> Self {
1598 Self {
1599 enabled: true,
1600 failure_threshold: 5,
1601 success_threshold: 3,
1602 timeout: Duration::from_secs(30),
1603 half_open_max_calls: 3,
1604 }
1605 }
1606}
1607
1608impl Default for SecurityConfig {
1609 fn default() -> Self {
1610 Self {
1611 enable_tls: false,
1612 verify_certificates: true,
1613 client_cert_path: None,
1614 client_key_path: None,
1615 ca_cert_path: None,
1616 sasl_config: None,
1617 }
1618 }
1619}
1620
impl Default for StreamPerformanceConfig {
    /// Moderate defaults: batching and parallel processing on, the more
    /// invasive optimizations (pipelining, zero-copy, SIMD) off.
    fn default() -> Self {
        Self {
            enable_batching: true,
            enable_pipelining: false,
            buffer_size: 8192,
            prefetch_count: 100,
            enable_zero_copy: false,
            enable_simd: false,
            parallel_processing: true,
            // None: let the runtime choose the worker thread count.
            worker_threads: None,
        }
    }
}
1635
impl Default for MonitoringConfig {
    /// Metrics and tracing on with relaxed intervals; profiling off and
    /// no external exporter endpoints configured.
    fn default() -> Self {
        Self {
            enable_metrics: true,
            enable_tracing: true,
            metrics_interval: Duration::from_secs(60),
            health_check_interval: Duration::from_secs(30),
            enable_profiling: false,
            // No Prometheus/Jaeger export unless explicitly configured.
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        }
    }
}
1650
1651impl StreamConfig {
1653 #[cfg(feature = "redis")]
1655 pub fn redis(url: String) -> Self {
1656 Self {
1657 backend: StreamBackendType::Redis {
1658 url,
1659 cluster_urls: None,
1660 pool_size: Some(10),
1661 },
1662 ..Default::default()
1663 }
1664 }
1665
1666 #[cfg(feature = "kinesis")]
1668 pub fn kinesis(region: String, stream_name: String) -> Self {
1669 Self {
1670 backend: StreamBackendType::Kinesis {
1671 region,
1672 stream_name,
1673 credentials: None,
1674 },
1675 ..Default::default()
1676 }
1677 }
1678
1679 pub fn memory() -> Self {
1681 Self {
1682 backend: StreamBackendType::Memory {
1683 max_size: Some(1000),
1684 persistence: false,
1685 },
1686 ..Default::default()
1687 }
1688 }
1689
1690 pub fn high_performance(mut self) -> Self {
1692 self.performance.enable_batching = true;
1693 self.performance.enable_pipelining = true;
1694 self.performance.parallel_processing = true;
1695 self.performance.buffer_size = 65536;
1696 self.performance.prefetch_count = 1000;
1697 self.batch_size = 1000;
1698 self.flush_interval_ms = 10;
1699 self
1700 }
1701
1702 pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
1704 self.enable_compression = true;
1705 self.compression_type = compression_type;
1706 self
1707 }
1708
1709 pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
1711 self.circuit_breaker.enabled = enabled;
1712 self.circuit_breaker.failure_threshold = failure_threshold;
1713 self
1714 }
1715
1716 pub fn development(topic: &str) -> Self {
1718 Self {
1719 backend: StreamBackendType::Memory {
1720 max_size: Some(10000),
1721 persistence: false,
1722 },
1723 topic: topic.to_string(),
1724 batch_size: 10,
1725 flush_interval_ms: 100,
1726 max_connections: 5,
1727 connection_timeout: Duration::from_secs(10),
1728 enable_compression: false,
1729 compression_type: CompressionType::None,
1730 retry_config: RetryConfig {
1731 max_retries: 3,
1732 initial_backoff: Duration::from_millis(100),
1733 max_backoff: Duration::from_secs(5),
1734 backoff_multiplier: 2.0,
1735 jitter: true,
1736 },
1737 circuit_breaker: CircuitBreakerConfig {
1738 enabled: false,
1739 failure_threshold: 5,
1740 success_threshold: 2,
1741 timeout: Duration::from_secs(60),
1742 half_open_max_calls: 10,
1743 },
1744 security: SecurityConfig::default(),
1745 performance: StreamPerformanceConfig::default(),
1746 monitoring: MonitoringConfig {
1747 enable_metrics: true,
1748 enable_tracing: false,
1749 metrics_interval: Duration::from_secs(5),
1750 health_check_interval: Duration::from_secs(30),
1751 enable_profiling: false,
1752 prometheus_endpoint: None,
1753 jaeger_endpoint: None,
1754 log_level: "debug".to_string(),
1755 },
1756 }
1757 }
1758
1759 pub fn production(topic: &str) -> Self {
1761 Self {
1762 backend: StreamBackendType::Memory {
1763 max_size: Some(100000),
1764 persistence: true,
1765 },
1766 topic: topic.to_string(),
1767 batch_size: 1000,
1768 flush_interval_ms: 10,
1769 max_connections: 50,
1770 connection_timeout: Duration::from_secs(30),
1771 enable_compression: true,
1772 compression_type: CompressionType::Zstd,
1773 retry_config: RetryConfig {
1774 max_retries: 5,
1775 initial_backoff: Duration::from_millis(200),
1776 max_backoff: Duration::from_secs(30),
1777 backoff_multiplier: 2.0,
1778 jitter: true,
1779 },
1780 circuit_breaker: CircuitBreakerConfig {
1781 enabled: true,
1782 failure_threshold: 10,
1783 success_threshold: 3,
1784 timeout: Duration::from_secs(30),
1785 half_open_max_calls: 5,
1786 },
1787 security: SecurityConfig::default(),
1788 performance: StreamPerformanceConfig {
1789 enable_batching: true,
1790 enable_pipelining: true,
1791 parallel_processing: true,
1792 buffer_size: 65536,
1793 prefetch_count: 1000,
1794 enable_zero_copy: true,
1795 enable_simd: true,
1796 worker_threads: None,
1797 },
1798 monitoring: MonitoringConfig {
1799 enable_metrics: true,
1800 enable_tracing: true,
1801 metrics_interval: Duration::from_secs(1),
1802 health_check_interval: Duration::from_secs(10),
1803 enable_profiling: true,
1804 prometheus_endpoint: None,
1805 jaeger_endpoint: None,
1806 log_level: "info".to_string(),
1807 },
1808 }
1809 }
1810}
1811
/// Convenience wrapper bundling a producer and a consumer built from
/// the same `StreamConfig`.
pub struct Stream {
    /// Publishes events into the backend.
    producer: StreamProducer,
    /// Polls events back out of the backend.
    consumer: StreamConsumer,
}
1817
1818impl Stream {
1819 pub async fn new(config: StreamConfig) -> Result<Self> {
1821 let producer = StreamProducer::new(config.clone()).await?;
1828 let consumer = StreamConsumer::new(config).await?;
1829
1830 Ok(Self { producer, consumer })
1831 }
1832
1833 pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
1835 self.producer.publish(event).await
1836 }
1837
1838 pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1840 self.consumer.consume().await
1841 }
1842
1843 pub async fn flush(&mut self) -> Result<()> {
1845 self.producer.flush().await
1846 }
1847
1848 pub async fn producer_stats(&self) -> ProducerStats {
1850 self.producer.get_stats().await
1851 }
1852
1853 pub async fn consumer_stats(&self) -> ConsumerStats {
1855 self.consumer.get_stats().await
1856 }
1857
1858 pub async fn close(&mut self) -> Result<()> {
1860 self.producer.flush().await?;
1862
1863 debug!("Stream closed successfully");
1866 Ok(())
1867 }
1868
1869 pub async fn health_check(&self) -> Result<bool> {
1871 Ok(true)
1874 }
1875
1876 pub async fn begin_transaction(&mut self) -> Result<()> {
1878 debug!("Transaction begun (placeholder)");
1880 Ok(())
1881 }
1882
1883 pub async fn commit_transaction(&mut self) -> Result<()> {
1885 debug!("Transaction committed (placeholder)");
1887 Ok(())
1888 }
1889
1890 pub async fn rollback_transaction(&mut self) -> Result<()> {
1892 debug!("Transaction rolled back (placeholder)");
1894 Ok(())
1895 }
1896}