use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, error, info};
use uuid::Uuid;

use crate::backend::{self, StreamBackend};
use crate::circuit_breaker::{self, SharedCircuitBreakerExt};
use crate::event::{EventMetadata, StreamEvent};

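/// Top-level configuration for stream producers and consumers, covering
/// backend selection, batching, resilience, security, and observability.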
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    pub backend: StreamBackendType,
    pub topic: String,
    pub batch_size: usize,
    pub flush_interval_ms: u64,
    pub max_connections: usize,
    pub connection_timeout: Duration,
    pub enable_compression: bool,
    pub compression_type: CompressionType,
    pub retry_config: RetryConfig,
    pub circuit_breaker: CircuitBreakerConfig,
    pub security: SecurityConfig,
    pub performance: StreamPerformanceConfig,
    pub monitoring: MonitoringConfig,
}

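/// Supported payload compression codecs.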
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

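/// Retry behavior with exponential backoff and optional jitter.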
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    pub max_retries: u32,
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
    pub backoff_multiplier: f64,
    pub jitter: bool,
}

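/// Circuit breaker thresholds used to shed load when a backend keeps failing.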
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    pub enabled: bool,
    pub failure_threshold: u32,
    pub success_threshold: u32,
    pub timeout: Duration,
    pub half_open_max_calls: u32,
}

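/// TLS and SASL settings for secured backend connections.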
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    pub enable_tls: bool,
    pub verify_certificates: bool,
    pub client_cert_path: Option<String>,
    pub client_key_path: Option<String>,
    pub ca_cert_path: Option<String>,
    pub sasl_config: Option<SaslConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaslConfig {
    pub mechanism: SaslMechanism,
    pub username: String,
    pub password: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SaslMechanism {
    Plain,
    ScramSha256,
    ScramSha512,
    OAuthBearer,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamPerformanceConfig {
    pub enable_batching: bool,
    pub enable_pipelining: bool,
    pub buffer_size: usize,
    pub prefetch_count: u32,
    pub enable_zero_copy: bool,
    pub enable_simd: bool,
    pub parallel_processing: bool,
    pub worker_threads: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    pub enable_metrics: bool,
    pub enable_tracing: bool,
    pub metrics_interval: Duration,
    pub health_check_interval: Duration,
    pub enable_profiling: bool,
    pub prometheus_endpoint: Option<String>,
    pub jaeger_endpoint: Option<String>,
    pub log_level: String,
}

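/// Backend selection. Variants are gated behind Cargo features so unused
/// client libraries are not compiled in; `Memory` is always available.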
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamBackendType {
    #[cfg(feature = "kafka")]
    Kafka {
        brokers: Vec<String>,
        security_protocol: Option<String>,
        sasl_config: Option<SaslConfig>,
    },
    #[cfg(feature = "nats")]
    Nats {
        url: String,
        cluster_urls: Option<Vec<String>>,
        jetstream_config: Option<NatsJetStreamConfig>,
    },
    #[cfg(feature = "redis")]
    Redis {
        url: String,
        cluster_urls: Option<Vec<String>>,
        pool_size: Option<usize>,
    },
    #[cfg(feature = "kinesis")]
    Kinesis {
        region: String,
        stream_name: String,
        credentials: Option<AwsCredentials>,
    },
    #[cfg(feature = "pulsar")]
    Pulsar {
        service_url: String,
        auth_config: Option<PulsarAuthConfig>,
    },
    #[cfg(feature = "rabbitmq")]
    RabbitMQ {
        url: String,
        exchange: Option<String>,
        queue: Option<String>,
    },
    Memory {
        max_size: Option<usize>,
        persistence: bool,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsJetStreamConfig {
    pub domain: Option<String>,
    pub api_prefix: Option<String>,
    pub timeout: Duration,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsCredentials {
    pub access_key_id: String,
    pub secret_access_key: String,
    pub session_token: Option<String>,
    pub role_arn: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PulsarAuthConfig {
    pub auth_method: PulsarAuthMethod,
    pub auth_params: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PulsarAuthMethod {
    Token,
    Jwt,
    Oauth2,
    Tls,
}

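/// High-level producer that wraps a concrete backend and adds optional
/// batching, circuit breaking, and statistics collection.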
pub struct StreamProducer {
    config: StreamConfig,
    backend_producer: BackendProducer,
    stats: Arc<RwLock<ProducerStats>>,
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    last_flush: Instant,
    _pending_events: Arc<RwLock<Vec<StreamEvent>>>,
    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    flush_semaphore: Arc<Semaphore>,
}

enum BackendProducer {
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaProducer),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsProducer>),
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisProducer),
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisProducer),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarProducer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
    Memory(MemoryProducer),
}

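/// Counters and latency figures accumulated by a producer.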
#[derive(Debug, Default, Clone)]
pub struct ProducerStats {
    events_published: u64,
    events_failed: u64,
    _bytes_sent: u64,
    avg_latency_ms: f64,
    max_latency_ms: u64,
    batch_count: u64,
    flush_count: u64,
    circuit_breaker_trips: u64,
    last_publish: Option<DateTime<Utc>>,
    backend_type: String,
}

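/// Process-wide in-memory event store backing the `Memory` backend,
/// exposed via `get_memory_events` and reset by `clear_memory_events`.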
type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;

static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();

pub fn get_memory_events() -> MemoryEventStore {
    MEMORY_EVENTS
        .get_or_init(|| Arc::new(RwLock::new(Vec::new())))
        .clone()
}

pub async fn clear_memory_events() {
    let events = get_memory_events();
    events.write().await.clear();
    backend::memory::clear_memory_storage().await;
}

struct MemoryProducer {
    backend: Box<dyn StreamBackend + Send + Sync>,
    topic: String,
    stats: ProducerStats,
}

impl MemoryProducer {
    fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic: "oxirs-stream".to_string(),
            stats: ProducerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    fn with_topic(topic: String) -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic,
            stats: ProducerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    async fn publish(&mut self, event: StreamEvent) -> Result<()> {
        let start_time = Instant::now();

        self.backend
            .as_mut()
            .connect()
            .await
            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;

        let topic_name = crate::types::TopicName::new(self.topic.clone());
        self.backend
            .create_topic(&topic_name, 1)
            .await
            .map_err(|e| anyhow!("Topic creation failed: {}", e))?;

        self.backend
            .send_event(&topic_name, event)
            .await
            .map_err(|e| anyhow!("Send event failed: {}", e))?;

        self.stats.events_published += 1;
        let latency = start_time.elapsed().as_millis() as u64;
        self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
        self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
        self.stats.last_publish = Some(Utc::now());

        debug!("Memory producer: published event via backend");
        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        self.stats.flush_count += 1;
        debug!("Memory producer: flush completed");
        Ok(())
    }

    fn _get_stats(&self) -> &ProducerStats {
        &self.stats
    }
}

impl StreamProducer {
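    /// Constructs the configured backend producer (connecting where the
    /// backend requires it) and initializes stats and an optional circuit
    /// breaker.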
    pub async fn new(config: StreamConfig) -> Result<Self> {
        let circuit_breaker = if config.circuit_breaker.enabled {
            Some(circuit_breaker::new_shared_circuit_breaker(
                circuit_breaker::CircuitBreakerConfig {
                    enabled: config.circuit_breaker.enabled,
                    failure_threshold: config.circuit_breaker.failure_threshold,
                    success_threshold: config.circuit_breaker.success_threshold,
                    timeout: config.circuit_breaker.timeout,
                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
                    ..Default::default()
                },
            ))
        } else {
            None
        };

        let backend_producer = match &config.backend {
            #[cfg(feature = "kafka")]
            StreamBackendType::Kafka {
                brokers,
                security_protocol,
                sasl_config,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Kafka {
                        brokers: brokers.clone(),
                        security_protocol: security_protocol.clone(),
                        sasl_config: sasl_config.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
                producer.connect().await?;
                BackendProducer::Kafka(producer)
            }
            #[cfg(feature = "nats")]
            StreamBackendType::Nats {
                url,
                cluster_urls,
                jetstream_config,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Nats {
                        url: url.clone(),
                        cluster_urls: cluster_urls.clone(),
                        jetstream_config: jetstream_config.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut producer = backend::nats::NatsProducer::new(stream_config)?;
                producer.connect().await?;
                BackendProducer::Nats(Box::new(producer))
            }
            #[cfg(feature = "redis")]
            StreamBackendType::Redis {
                url,
                cluster_urls,
                pool_size,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Redis {
                        url: url.clone(),
                        cluster_urls: cluster_urls.clone(),
                        pool_size: *pool_size,
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut producer = backend::redis::RedisProducer::new(stream_config)?;
                producer.connect().await?;
                BackendProducer::Redis(producer)
            }
            #[cfg(feature = "kinesis")]
            StreamBackendType::Kinesis {
                region,
                stream_name,
                credentials,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Kinesis {
                        region: region.clone(),
                        stream_name: stream_name.clone(),
                        credentials: credentials.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
                producer.connect().await?;
                BackendProducer::Kinesis(producer)
            }
            #[cfg(feature = "pulsar")]
            StreamBackendType::Pulsar {
                service_url,
                auth_config,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Pulsar {
                        service_url: service_url.clone(),
                        auth_config: auth_config.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
                producer.connect().await?;
                BackendProducer::Pulsar(Box::new(producer))
            }
            #[cfg(feature = "rabbitmq")]
            StreamBackendType::RabbitMQ {
                url,
                exchange,
                queue,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::RabbitMQ {
                        url: url.clone(),
                        exchange: exchange.clone(),
                        queue: queue.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
                producer.connect().await?;
                BackendProducer::RabbitMQ(Box::new(producer))
            }
            StreamBackendType::Memory {
                max_size: _,
                persistence: _,
            } => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
        };

        let stats = Arc::new(RwLock::new(ProducerStats {
            backend_type: match backend_producer {
                #[cfg(feature = "kafka")]
                BackendProducer::Kafka(_) => "kafka".to_string(),
                #[cfg(feature = "nats")]
                BackendProducer::Nats(_) => "nats".to_string(),
                #[cfg(feature = "redis")]
                BackendProducer::Redis(_) => "redis".to_string(),
                #[cfg(feature = "kinesis")]
                BackendProducer::Kinesis(_) => "kinesis".to_string(),
                #[cfg(feature = "pulsar")]
                BackendProducer::Pulsar(_) => "pulsar".to_string(),
                #[cfg(feature = "rabbitmq")]
                BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
                BackendProducer::Memory(_) => "memory".to_string(),
            },
            ..Default::default()
        }));

        info!(
            "Created stream producer with backend: {}",
            stats.read().await.backend_type
        );

        Ok(Self {
            config,
            backend_producer,
            stats,
            circuit_breaker,
            last_flush: Instant::now(),
            _pending_events: Arc::new(RwLock::new(Vec::new())),
            batch_buffer: Arc::new(RwLock::new(Vec::new())),
            flush_semaphore: Arc::new(Semaphore::new(1)),
        })
    }

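    /// Publishes a single event, or buffers it when batching is enabled.
    /// With batching on, the call returns once the event is buffered; the
    /// buffer is sent when it reaches `batch_size` or on the next `flush`.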
    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
        let start_time = Instant::now();

        if let Some(cb) = &self.circuit_breaker {
            if !cb.can_execute().await {
                self.stats.write().await.circuit_breaker_trips += 1;
                return Err(anyhow!("Circuit breaker is open - cannot publish events"));
            }
        }

        if self.config.performance.enable_batching {
            let mut batch_buffer = self.batch_buffer.write().await;
            batch_buffer.push(event);

            if batch_buffer.len() >= self.config.batch_size {
                let events = std::mem::take(&mut *batch_buffer);
                drop(batch_buffer);
                return self.publish_batch_internal(events).await;
            }

            return Ok(());
        }

        let result = self.publish_single_event(event).await;

        match &result {
            Ok(_) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_success_with_duration(start_time.elapsed()).await;
                }

                let mut stats = self.stats.write().await;
                stats.events_published += 1;
                let latency = start_time.elapsed().as_millis() as u64;
                stats.max_latency_ms = stats.max_latency_ms.max(latency);
                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
                stats.last_publish = Some(Utc::now());
            }
            Err(_) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
                        .await;
                }

                self.stats.write().await.events_failed += 1;
            }
        }

        result
    }

    async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
        match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish(event).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish(event).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish(event).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish(event).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish(event).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
            BackendProducer::Memory(producer) => producer.publish(event).await,
        }
    }

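    /// Publishes events as a single batch where the backend supports it;
    /// empty input is a no-op.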
    pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        self.publish_batch_internal(events).await
    }

    async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
        let start_time = Instant::now();
        let event_count = events.len();

        let result = match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
            BackendProducer::Memory(producer) => {
                for event in events {
                    producer.publish(event).await?;
                }
                Ok(())
            }
        };

        let mut stats = self.stats.write().await;
        match &result {
            Ok(_) => {
                stats.events_published += event_count as u64;
                stats.batch_count += 1;
                let latency = start_time.elapsed().as_millis() as u64;
                stats.max_latency_ms = stats.max_latency_ms.max(latency);
                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
                stats.last_publish = Some(Utc::now());
            }
            Err(_) => {
                stats.events_failed += event_count as u64;
            }
        }

        debug!(
            "Published batch of {} events in {:?}",
            event_count,
            start_time.elapsed()
        );
        result
    }

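    /// Drains any buffered batch and then flushes the underlying backend.
    /// Concurrent flushes are serialized via a semaphore.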
    pub async fn flush(&mut self) -> Result<()> {
        let _permit = self
            .flush_semaphore
            .acquire()
            .await
            .map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;

        let start_time = Instant::now();

        if self.config.performance.enable_batching {
            let events = {
                let mut batch_buffer = self.batch_buffer.write().await;
                if !batch_buffer.is_empty() {
                    std::mem::take(&mut *batch_buffer)
                } else {
                    Vec::new()
                }
            };
            if !events.is_empty() {
                // Drop the permit first: publishing needs `&mut self`, which
                // would conflict with the borrow held by the permit.
                drop(_permit);
                self.publish_batch_internal(events).await?;
            }
        }

        let result = match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.flush().await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.flush().await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.flush().await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.flush().await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.flush().await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.flush().await,
            BackendProducer::Memory(producer) => producer.flush().await,
        };

        if result.is_ok() {
            self.stats.write().await.flush_count += 1;
            self.last_flush = Instant::now();
            debug!("Flushed producer buffers in {:?}", start_time.elapsed());
        }

        result
    }

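    /// Converts an `RdfPatch` into stream events and publishes them as a
    /// batch. Prefix and header operations carry no event payload and are
    /// skipped.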
    pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
        let events: Vec<StreamEvent> = patch
            .operations
            .iter()
            .filter_map(|op| {
                let metadata = EventMetadata {
                    event_id: Uuid::new_v4().to_string(),
                    timestamp: patch.timestamp,
                    source: "rdf_patch".to_string(),
                    user: None,
                    context: Some(patch.id.clone()),
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                };

                match op {
                    PatchOperation::Add {
                        subject,
                        predicate,
                        object,
                    } => Some(StreamEvent::TripleAdded {
                        subject: subject.clone(),
                        predicate: predicate.clone(),
                        object: object.clone(),
                        graph: None,
                        metadata,
                    }),
                    PatchOperation::Delete {
                        subject,
                        predicate,
                        object,
                    } => Some(StreamEvent::TripleRemoved {
                        subject: subject.clone(),
                        predicate: predicate.clone(),
                        object: object.clone(),
                        graph: None,
                        metadata,
                    }),
                    PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
                        graph: graph.clone(),
                        metadata,
                    }),
                    PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
                        graph: graph.clone(),
                        metadata,
                    }),
                    PatchOperation::AddPrefix { .. } => None,
                    PatchOperation::DeletePrefix { .. } => None,
                    PatchOperation::TransactionBegin { .. } => {
                        Some(StreamEvent::TransactionBegin {
                            transaction_id: patch
                                .transaction_id
                                .clone()
                                .unwrap_or_else(|| Uuid::new_v4().to_string()),
                            isolation_level: None,
                            metadata,
                        })
                    }
                    PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
                        transaction_id: patch
                            .transaction_id
                            .clone()
                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
                        metadata,
                    }),
                    PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
                        transaction_id: patch
                            .transaction_id
                            .clone()
                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
                        metadata,
                    }),
                    PatchOperation::Header { .. } => None,
                }
            })
            .collect();

        self.publish_batch(events).await
    }

    pub async fn get_stats(&self) -> ProducerStats {
        self.stats.read().await.clone()
    }

    pub async fn health_check(&self) -> bool {
        if let Some(cb) = &self.circuit_breaker {
            cb.is_healthy().await
        } else {
            true
        }
    }
}

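/// High-level consumer mirroring `StreamProducer`: wraps a concrete backend
/// and adds circuit breaking, statistics, and optional consumer-group support.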
pub struct StreamConsumer {
    _config: StreamConfig,
    backend_consumer: BackendConsumer,
    stats: Arc<RwLock<ConsumerStats>>,
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    last_poll: Instant,
    _message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    consumer_group: Option<String>,
}

enum BackendConsumer {
    #[cfg(feature = "kafka")]
    Kafka(Box<backend::kafka::KafkaConsumer>),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsConsumer>),
    #[cfg(feature = "redis")]
    Redis(Box<backend::redis::RedisConsumer>),
    #[cfg(feature = "kinesis")]
    Kinesis(Box<backend::kinesis::KinesisConsumer>),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarConsumer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
    Memory(Box<MemoryConsumer>),
}

#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
    events_consumed: u64,
    events_failed: u64,
    _bytes_received: u64,
    avg_processing_time_ms: f64,
    max_processing_time_ms: u64,
    _consumer_lag: u64,
    circuit_breaker_trips: u64,
    last_message: Option<DateTime<Utc>>,
    backend_type: String,
    _batch_size: usize,
}

struct MemoryConsumer {
    backend: Box<dyn StreamBackend + Send + Sync>,
    topic: String,
    current_offset: u64,
    stats: ConsumerStats,
}

impl MemoryConsumer {
    fn _new() -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic: "oxirs-stream".to_string(),
            current_offset: 0,
            stats: ConsumerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    fn with_topic(topic: String) -> Self {
        Self {
            backend: Box::new(backend::memory::MemoryBackend::new()),
            topic,
            current_offset: 0,
            stats: ConsumerStats {
                backend_type: "memory".to_string(),
                ..Default::default()
            },
        }
    }

    async fn consume(&mut self) -> Result<Option<StreamEvent>> {
        let start_time = Instant::now();

        self.backend
            .as_mut()
            .connect()
            .await
            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;

        let topic_name = crate::types::TopicName::new(self.topic.clone());
        let events = self
            .backend
            .receive_events(
                &topic_name,
                None,
                crate::types::StreamPosition::Offset(self.current_offset),
                1,
            )
            .await
            .map_err(|e| anyhow!("Receive events failed: {}", e))?;

        if let Some((event, offset)) = events.first() {
            self.current_offset = offset.value() + 1;

            self.stats.events_consumed += 1;
            let processing_time = start_time.elapsed().as_millis() as u64;
            self.stats.max_processing_time_ms =
                self.stats.max_processing_time_ms.max(processing_time);
            self.stats.avg_processing_time_ms =
                (self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
            self.stats.last_message = Some(Utc::now());

            debug!("Memory consumer: consumed event via backend");
            Ok(Some(event.clone()))
        } else {
            debug!("Memory consumer: no events available");
            Ok(None)
        }
    }

    fn _get_stats(&self) -> &ConsumerStats {
        &self.stats
    }

    fn reset(&mut self) {
        self.current_offset = 0;
    }
}

impl StreamConsumer {
    pub async fn new(config: StreamConfig) -> Result<Self> {
        Self::new_with_group(config, None).await
    }

    pub async fn new_with_group(
        config: StreamConfig,
        consumer_group: Option<String>,
    ) -> Result<Self> {
        let circuit_breaker = if config.circuit_breaker.enabled {
            Some(circuit_breaker::new_shared_circuit_breaker(
                circuit_breaker::CircuitBreakerConfig {
                    enabled: config.circuit_breaker.enabled,
                    failure_threshold: config.circuit_breaker.failure_threshold,
                    success_threshold: config.circuit_breaker.success_threshold,
                    timeout: config.circuit_breaker.timeout,
                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
                    ..Default::default()
                },
            ))
        } else {
            None
        };

        let backend_consumer = match &config.backend {
            #[cfg(feature = "kafka")]
            StreamBackendType::Kafka {
                brokers,
                security_protocol,
                sasl_config,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Kafka {
                        brokers: brokers.clone(),
                        security_protocol: security_protocol.clone(),
                        sasl_config: sasl_config.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
                consumer.connect().await?;
                BackendConsumer::Kafka(Box::new(consumer))
            }
            #[cfg(feature = "nats")]
            StreamBackendType::Nats {
                url,
                cluster_urls,
                jetstream_config,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Nats {
                        url: url.clone(),
                        cluster_urls: cluster_urls.clone(),
                        jetstream_config: jetstream_config.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
                consumer.connect().await?;
                BackendConsumer::Nats(Box::new(consumer))
            }
            #[cfg(feature = "redis")]
            StreamBackendType::Redis {
                url,
                cluster_urls,
                pool_size,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Redis {
                        url: url.clone(),
                        cluster_urls: cluster_urls.clone(),
                        pool_size: *pool_size,
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
                consumer.connect().await?;
                BackendConsumer::Redis(Box::new(consumer))
            }
            #[cfg(feature = "kinesis")]
            StreamBackendType::Kinesis {
                region,
                stream_name,
                credentials,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Kinesis {
                        region: region.clone(),
                        stream_name: stream_name.clone(),
                        credentials: credentials.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
                consumer.connect().await?;
                BackendConsumer::Kinesis(Box::new(consumer))
            }
            #[cfg(feature = "pulsar")]
            StreamBackendType::Pulsar {
                service_url,
                auth_config,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::Pulsar {
                        service_url: service_url.clone(),
                        auth_config: auth_config.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
                consumer.connect().await?;
                BackendConsumer::Pulsar(Box::new(consumer))
            }
            #[cfg(feature = "rabbitmq")]
            StreamBackendType::RabbitMQ {
                url,
                exchange,
                queue,
            } => {
                let stream_config = crate::StreamConfig {
                    backend: crate::StreamBackendType::RabbitMQ {
                        url: url.clone(),
                        exchange: exchange.clone(),
                        queue: queue.clone(),
                    },
                    topic: config.topic.clone(),
                    batch_size: config.batch_size,
                    flush_interval_ms: config.flush_interval_ms,
                    max_connections: config.max_connections,
                    connection_timeout: config.connection_timeout,
                    enable_compression: config.enable_compression,
                    compression_type: config.compression_type.clone(),
                    retry_config: config.retry_config.clone(),
                    circuit_breaker: config.circuit_breaker.clone(),
                    security: config.security.clone(),
                    performance: config.performance.clone(),
                    monitoring: config.monitoring.clone(),
                };

                let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
                consumer.connect().await?;
                BackendConsumer::RabbitMQ(Box::new(consumer))
            }
            StreamBackendType::Memory {
                max_size: _,
                persistence: _,
            } => {
                BackendConsumer::Memory(Box::new(MemoryConsumer::with_topic(config.topic.clone())))
            }
        };

        let stats = Arc::new(RwLock::new(ConsumerStats {
            backend_type: match backend_consumer {
                #[cfg(feature = "kafka")]
                BackendConsumer::Kafka(_) => "kafka".to_string(),
                #[cfg(feature = "nats")]
                BackendConsumer::Nats(_) => "nats".to_string(),
                #[cfg(feature = "redis")]
                BackendConsumer::Redis(_) => "redis".to_string(),
                #[cfg(feature = "kinesis")]
                BackendConsumer::Kinesis(_) => "kinesis".to_string(),
                #[cfg(feature = "pulsar")]
                BackendConsumer::Pulsar(_) => "pulsar".to_string(),
                #[cfg(feature = "rabbitmq")]
                BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
                BackendConsumer::Memory(_) => "memory".to_string(),
            },
            _batch_size: config.batch_size,
            ..Default::default()
        }));

        info!(
            "Created stream consumer with backend: {} and group: {:?}",
            stats.read().await.backend_type,
            consumer_group
        );

        Ok(Self {
            _config: config,
            backend_consumer,
            stats,
            circuit_breaker,
            last_poll: Instant::now(),
            _message_buffer: Arc::new(RwLock::new(Vec::new())),
            consumer_group,
        })
    }

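    /// Polls the backend for a single event. Returns `Ok(None)` when no
    /// event is currently available.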
    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
        let start_time = Instant::now();

        if let Some(cb) = &self.circuit_breaker {
            if !cb.can_execute().await {
                self.stats.write().await.circuit_breaker_trips += 1;
                return Err(anyhow!("Circuit breaker is open - cannot consume events"));
            }
        }

        let result = self.consume_single_event().await;

        match &result {
            Ok(Some(_)) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_success_with_duration(start_time.elapsed()).await;
                }

                let mut stats = self.stats.write().await;
                stats.events_consumed += 1;
                let processing_time = start_time.elapsed().as_millis() as u64;
                stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
                stats.avg_processing_time_ms =
                    (stats.avg_processing_time_ms + processing_time as f64) / 2.0;
                stats.last_message = Some(Utc::now());
            }
            Ok(None) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_success_with_duration(start_time.elapsed()).await;
                }
            }
            Err(_) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
                        .await;
                }

                self.stats.write().await.events_failed += 1;
            }
        }

        self.last_poll = Instant::now();
        result
    }

    async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
        match &mut self.backend_consumer {
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(consumer) => consumer.consume().await,
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(consumer) => consumer.consume().await,
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(consumer) => consumer.consume().await,
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(consumer) => consumer.consume().await,
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(consumer) => consumer.consume().await,
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
            BackendConsumer::Memory(consumer) => consumer.consume().await,
        }
    }

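    /// Consumes up to `max_events`, returning early once `timeout` elapses.
    /// Each individual poll is capped at 50 ms so the loop stays responsive.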
    pub async fn consume_batch(
        &mut self,
        max_events: usize,
        timeout: Duration,
    ) -> Result<Vec<StreamEvent>> {
        let mut events = Vec::new();
        let start_time = Instant::now();

        while events.len() < max_events && start_time.elapsed() < timeout {
            match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
                Ok(Ok(Some(event))) => events.push(event),
                Ok(Ok(None)) => continue,
                Ok(Err(e)) => return Err(e),
                // A single poll timed out; return what has been collected.
                Err(_) => break,
            }
        }

        if !events.is_empty() {
            debug!(
                "Consumed batch of {} events in {:?}",
                events.len(),
                start_time.elapsed()
            );
        }

        Ok(events)
    }

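    /// Runs an endless consume loop, invoking `callback` for each event.
    /// Callback errors are logged and counted but do not stop the loop.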
    pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
    where
        F: FnMut(StreamEvent) -> Result<()> + Send,
    {
        info!("Starting stream consumer loop");

        loop {
            match self.consume().await {
                Ok(Some(event)) => {
                    if let Err(e) = callback(event) {
                        error!("Callback error: {}", e);
                        self.stats.write().await.events_failed += 1;
                    }
                }
                Ok(None) => {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                Err(e) => {
                    error!("Consumer error: {}", e);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }

    pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
    where
        F: FnMut(StreamEvent) -> Fut + Send,
        Fut: std::future::Future<Output = Result<()>> + Send,
    {
        info!("Starting async stream consumer loop");

        loop {
            match self.consume().await {
                Ok(Some(event)) => {
                    if let Err(e) = callback(event).await {
                        error!("Async callback error: {}", e);
                        self.stats.write().await.events_failed += 1;
                    }
                }
                Ok(None) => {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                Err(e) => {
                    error!("Consumer error: {}", e);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }

    pub async fn get_stats(&self) -> ConsumerStats {
        self.stats.read().await.clone()
    }

    pub async fn health_check(&self) -> bool {
        if let Some(cb) = &self.circuit_breaker {
            cb.is_healthy().await
        } else {
            true
        }
    }

    pub fn consumer_group(&self) -> Option<&String> {
        self.consumer_group.as_ref()
    }

    pub async fn reset_position(&mut self) -> Result<()> {
        match &mut self.backend_consumer {
            BackendConsumer::Memory(consumer) => {
                consumer.reset();
                Ok(())
            }
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(_) => {
                Err(anyhow!("Reset position not supported for Kafka backend"))
            }
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(_) => {
                Err(anyhow!("Reset position not supported for NATS backend"))
            }
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(_) => {
                Err(anyhow!("Reset position not supported for Redis backend"))
            }
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(_) => {
                Err(anyhow!("Reset position not supported for Kinesis backend"))
            }
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(_) => {
                Err(anyhow!("Reset position not supported for Pulsar backend"))
            }
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(_) => {
                Err(anyhow!("Reset position not supported for RabbitMQ backend"))
            }
        }
    }

    pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
        Err(anyhow!(
            "set_test_events is deprecated with backend implementation - use producer to publish events"
        ))
    }
}

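/// A single operation within an RDF Patch: triple add/delete, graph
/// management, prefix handling, transaction control, or a header entry.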
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
    Add {
        subject: String,
        predicate: String,
        object: String,
    },
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
    AddGraph { graph: String },
    DeleteGraph { graph: String },
    AddPrefix { prefix: String, namespace: String },
    DeletePrefix { prefix: String },
    TransactionBegin { transaction_id: Option<String> },
    TransactionCommit,
    TransactionAbort,
    Header { key: String, value: String },
}

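/// An ordered collection of patch operations with associated metadata,
/// modeled on the RDF Patch format.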
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
    pub operations: Vec<PatchOperation>,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub id: String,
    pub headers: HashMap<String, String>,
    pub transaction_id: Option<String>,
    pub prefixes: HashMap<String, String>,
}

impl RdfPatch {
    pub fn new() -> Self {
        Self {
            operations: Vec::new(),
            timestamp: chrono::Utc::now(),
            id: uuid::Uuid::new_v4().to_string(),
            headers: HashMap::new(),
            transaction_id: None,
            prefixes: HashMap::new(),
        }
    }

    pub fn add_operation(&mut self, operation: PatchOperation) {
        self.operations.push(operation);
    }

    pub fn to_rdf_patch_format(&self) -> Result<String> {
        let serializer = crate::patch::PatchSerializer::new()
            .with_pretty_print(true)
            .with_metadata(true);
        serializer.serialize(self)
    }

    pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
        let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
        parser.parse(input)
    }
}

impl Default for RdfPatch {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: "oxirs-stream".to_string(),
            batch_size: 100,
            flush_interval_ms: 100,
            max_connections: 10,
            connection_timeout: Duration::from_secs(30),
            enable_compression: false,
            compression_type: CompressionType::None,
            retry_config: RetryConfig::default(),
            circuit_breaker: CircuitBreakerConfig::default(),
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig::default(),
            monitoring: MonitoringConfig::default(),
        }
    }
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            jitter: true,
        }
    }
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(30),
            half_open_max_calls: 3,
        }
    }
}

impl Default for SecurityConfig {
    fn default() -> Self {
        Self {
            enable_tls: false,
            verify_certificates: true,
            client_cert_path: None,
            client_key_path: None,
            ca_cert_path: None,
            sasl_config: None,
        }
    }
}

impl Default for StreamPerformanceConfig {
    fn default() -> Self {
        Self {
            enable_batching: true,
            enable_pipelining: false,
            buffer_size: 8192,
            prefetch_count: 100,
            enable_zero_copy: false,
            enable_simd: false,
            parallel_processing: true,
            worker_threads: None,
        }
    }
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            enable_metrics: true,
            enable_tracing: true,
            metrics_interval: Duration::from_secs(60),
            health_check_interval: Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        }
    }
}

impl StreamConfig {
    #[cfg(feature = "redis")]
    pub fn redis(url: String) -> Self {
        Self {
            backend: StreamBackendType::Redis {
                url,
                cluster_urls: None,
                pool_size: Some(10),
            },
            ..Default::default()
        }
    }

    #[cfg(feature = "kinesis")]
    pub fn kinesis(region: String, stream_name: String) -> Self {
        Self {
            backend: StreamBackendType::Kinesis {
                region,
                stream_name,
                credentials: None,
            },
            ..Default::default()
        }
    }

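    /// In-memory backend, convenient for tests. A minimal sketch of the
    /// builder style (illustrative; module paths depend on how the crate
    /// re-exports these types):
    ///
    /// ```ignore
    /// let config = StreamConfig::memory()
    ///     .high_performance()
    ///     .with_compression(CompressionType::Lz4);
    /// ```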
    pub fn memory() -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(1000),
                persistence: false,
            },
            ..Default::default()
        }
    }

    pub fn high_performance(mut self) -> Self {
        self.performance.enable_batching = true;
        self.performance.enable_pipelining = true;
        self.performance.parallel_processing = true;
        self.performance.buffer_size = 65536;
        self.performance.prefetch_count = 1000;
        self.batch_size = 1000;
        self.flush_interval_ms = 10;
        self
    }

    pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
        self.enable_compression = true;
        self.compression_type = compression_type;
        self
    }

    pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
        self.circuit_breaker.enabled = enabled;
        self.circuit_breaker.failure_threshold = failure_threshold;
        self
    }

    pub fn development(topic: &str) -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: topic.to_string(),
            batch_size: 10,
            flush_interval_ms: 100,
            max_connections: 5,
            connection_timeout: Duration::from_secs(10),
            enable_compression: false,
            compression_type: CompressionType::None,
            retry_config: RetryConfig {
                max_retries: 3,
                initial_backoff: Duration::from_millis(100),
                max_backoff: Duration::from_secs(5),
                backoff_multiplier: 2.0,
                jitter: true,
            },
            circuit_breaker: CircuitBreakerConfig {
                enabled: false,
                failure_threshold: 5,
                success_threshold: 2,
                timeout: Duration::from_secs(60),
                half_open_max_calls: 10,
            },
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig::default(),
            monitoring: MonitoringConfig {
                enable_metrics: true,
                enable_tracing: false,
                metrics_interval: Duration::from_secs(5),
                health_check_interval: Duration::from_secs(30),
                enable_profiling: false,
                prometheus_endpoint: None,
                jaeger_endpoint: None,
                log_level: "debug".to_string(),
            },
        }
    }

    pub fn production(topic: &str) -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(100000),
                persistence: true,
            },
            topic: topic.to_string(),
            batch_size: 1000,
            flush_interval_ms: 10,
            max_connections: 50,
            connection_timeout: Duration::from_secs(30),
            enable_compression: true,
            compression_type: CompressionType::Zstd,
            retry_config: RetryConfig {
                max_retries: 5,
                initial_backoff: Duration::from_millis(200),
                max_backoff: Duration::from_secs(30),
                backoff_multiplier: 2.0,
                jitter: true,
            },
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 10,
                success_threshold: 3,
                timeout: Duration::from_secs(30),
                half_open_max_calls: 5,
            },
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig {
                enable_batching: true,
                enable_pipelining: true,
                parallel_processing: true,
                buffer_size: 65536,
                prefetch_count: 1000,
                enable_zero_copy: true,
                enable_simd: true,
                worker_threads: None,
            },
            monitoring: MonitoringConfig {
                enable_metrics: true,
                enable_tracing: true,
                metrics_interval: Duration::from_secs(1),
                health_check_interval: Duration::from_secs(10),
                enable_profiling: true,
                prometheus_endpoint: None,
                jaeger_endpoint: None,
                log_level: "info".to_string(),
            },
        }
    }
}

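/// Combined producer/consumer pair sharing one configuration. A minimal
/// usage sketch (illustrative; paths and the construction of `event` depend
/// on the crate's public re-exports):
///
/// ```ignore
/// let mut stream = Stream::new(StreamConfig::memory()).await?;
/// stream.publish(event).await?; // given some `event: StreamEvent`
/// stream.flush().await?;
/// while let Some(ev) = stream.consume().await? {
///     println!("got event: {:?}", ev);
/// }
/// ```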
pub struct Stream {
    producer: StreamProducer,
    consumer: StreamConsumer,
}

impl Stream {
    pub async fn new(config: StreamConfig) -> Result<Self> {
        let producer = StreamProducer::new(config.clone()).await?;
        let consumer = StreamConsumer::new(config).await?;

        Ok(Self { producer, consumer })
    }

    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
        self.producer.publish(event).await
    }

    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
        self.consumer.consume().await
    }

    pub async fn flush(&mut self) -> Result<()> {
        self.producer.flush().await
    }

    pub async fn producer_stats(&self) -> ProducerStats {
        self.producer.get_stats().await
    }

    pub async fn consumer_stats(&self) -> ConsumerStats {
        self.consumer.get_stats().await
    }

    pub async fn close(&mut self) -> Result<()> {
        self.producer.flush().await?;

        debug!("Stream closed successfully");
        Ok(())
    }

    pub async fn health_check(&self) -> Result<bool> {
        // Placeholder implementation: always reports healthy.
        Ok(true)
    }

    pub async fn begin_transaction(&mut self) -> Result<()> {
        debug!("Transaction begun (placeholder)");
        Ok(())
    }

    pub async fn commit_transaction(&mut self) -> Result<()> {
        debug!("Transaction committed (placeholder)");
        Ok(())
    }

    pub async fn rollback_transaction(&mut self) -> Result<()> {
        debug!("Transaction rolled back (placeholder)");
        Ok(())
    }
}