oxirs_stream/
lib.rs

//! # OxiRS Stream - Ultra-High Performance RDF Streaming Platform
//!
//! [![Version](https://img.shields.io/badge/version-0.1.0-blue)](https://github.com/cool-japan/oxirs/releases)
//! [![docs.rs](https://docs.rs/oxirs-stream/badge.svg)](https://docs.rs/oxirs-stream)
//!
//! **Status**: Production Release (v0.1.0)
//! **Stability**: Public APIs are stable. Production-ready with comprehensive testing.
//!
//! Real-time streaming support with Kafka/NATS/Redis I/O, RDF Patch, SPARQL Update delta,
//! and advanced event processing capabilities.
//!
//! This crate provides enterprise-grade real-time data streaming capabilities for RDF datasets,
//! supporting multiple messaging backends with high-throughput, low-latency guarantees.
//!
//! ## Features
//! - **Multi-Backend Support**: Kafka, NATS JetStream, Redis Streams, AWS Kinesis, Memory
//! - **High Performance**: 100K+ events/second, <10ms latency, exactly-once delivery
//! - **Advanced Event Processing**: Real-time pattern detection, windowing, aggregations
//! - **Enterprise Features**: Circuit breakers, connection pooling, health monitoring
//! - **Standards Compliance**: RDF Patch protocol, SPARQL Update streaming
//!
//! ## Performance Targets
//! - **Throughput**: 100K+ events/second sustained
//! - **Latency**: P99 <10ms for real-time processing
//! - **Reliability**: 99.99% delivery success rate
//! - **Scalability**: Linear scaling to 1000+ partitions
27
28#![allow(dead_code)]
29
30use anyhow::{anyhow, Result};
31use chrono::{DateTime, Utc};
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio::sync::{RwLock, Semaphore};
37use tracing::{debug, error, info};
38use uuid::Uuid;
39
40use crate::backend::StreamBackend;
41
42/// Re-export commonly used types for convenience
43pub use backend_optimizer::{
44    BackendOptimizer, BackendPerformance, BackendRecommendation, ConsistencyLevel, CostModel,
45    OptimizationDecision, OptimizationStats, OptimizerConfig, PatternType, WorkloadPattern,
46};
47pub use backpressure::{
48    BackpressureConfig, BackpressureController, BackpressureStats, BackpressureStrategy,
49    FlowControlSignal, RateLimiter as BackpressureRateLimiter,
50};
51pub use bridge::{
52    BridgeInfo, BridgeStatistics, BridgeType, ExternalMessage, ExternalSystemConfig,
53    ExternalSystemType, MessageBridgeManager, MessageTransformer, RoutingRule,
54};
55pub use circuit_breaker::{
56    CircuitBreakerError, CircuitBreakerMetrics, FailureType, SharedCircuitBreakerExt,
57};
58pub use connection_pool::{
59    ConnectionFactory, ConnectionPool, DetailedPoolMetrics, LoadBalancingStrategy, PoolConfig,
60    PoolStats, PoolStatus,
61};
62pub use cqrs::{
63    CQRSConfig, CQRSHealthStatus, CQRSSystem, Command, CommandBus, CommandBusMetrics,
64    CommandHandler, CommandResult, Query, QueryBus, QueryBusMetrics, QueryCacheConfig,
65    QueryHandler, QueryResult as CQRSQueryResult, ReadModelManager, ReadModelMetrics,
66    ReadModelProjection, RetryConfig as CQRSRetryConfig,
67};
68pub use delta::{BatchDeltaProcessor, DeltaComputer, DeltaProcessor, ProcessorStats};
69pub use dlq::{
70    DeadLetterQueue, DlqConfig, DlqEventProcessor, DlqStats as DlqStatsExport, FailedEvent,
71    FailureReason,
72};
73pub use event::{
74    EventCategory, EventMetadata, EventPriority, IsolationLevel, QueryResult as EventQueryResult,
75    SchemaChangeType, SchemaType, SparqlOperationType, StreamEvent,
76};
77pub use event_sourcing::{
78    EventQuery, EventSnapshot, EventStore, EventStoreConfig, PersistenceBackend, QueryOrder,
79    RetentionPolicy, SnapshotConfig, StoredEvent, TimeRange as EventSourcingTimeRange,
80};
81pub use failover::{ConnectionEndpoint, FailoverConfig, FailoverManager};
82pub use graphql_bridge::{
83    BridgeConfig, BridgeStats, GraphQLBridge, GraphQLSubscription, GraphQLUpdate,
84    GraphQLUpdateType, SubscriptionFilter,
85};
86pub use multi_region_replication::{
87    ConflictResolution, ConflictType, GeographicLocation, MultiRegionReplicationManager,
88    RegionConfig, RegionHealth, ReplicatedEvent, ReplicationConfig, ReplicationStats,
89    ReplicationStrategy, VectorClock,
90};
91pub use patch::{PatchParser, PatchSerializer};
92pub use performance_optimizer::{
93    AdaptiveBatcher, AggregationFunction, AutoTuner, BatchPerformancePoint, BatchSizePredictor,
94    BatchingStats, EnhancedMLConfig, MemoryPool, MemoryPoolStats,
95    PerformanceConfig as OptimizerPerformanceConfig, ProcessingResult, ProcessingStats,
96    ProcessingStatus, TuningDecision, ZeroCopyEvent,
97};
98pub use schema_registry::{
99    CompatibilityMode, ExternalRegistryConfig, RegistryAuth, SchemaDefinition, SchemaFormat,
100    SchemaRegistry, SchemaRegistryConfig, ValidationResult, ValidationStats,
101};
102pub use sparql_streaming::{
103    ContinuousQueryManager, QueryManagerConfig, QueryMetadata, QueryResultChannel,
104    QueryResultUpdate, UpdateType,
105};
106pub use store_integration::{
107    ChangeDetectionStrategy, ChangeNotification, RealtimeUpdateManager, StoreChangeDetector,
108    UpdateChannel, UpdateFilter, UpdateNotification,
109};
110
111// Stream, StreamConsumer, and StreamProducer are defined below in this module
112pub use biological_computing::{
113    AminoAcid, BiologicalProcessingStats, BiologicalStreamProcessor, Cell, CellState,
114    CellularAutomaton, ComputationalFunction, DNASequence, EvolutionaryOptimizer, FunctionalDomain,
115    Individual, Nucleotide, ProteinStructure, SequenceMetadata,
116};
117pub use consciousness_streaming::{
118    ConsciousnessLevel, ConsciousnessStats, ConsciousnessStreamProcessor, DreamSequence,
119    EmotionalContext, IntuitiveEngine, MeditationState,
120};
121pub use disaster_recovery::{
122    BackupCompression, BackupConfig, BackupEncryption, BackupFrequency, BackupJob,
123    BackupRetentionPolicy, BackupSchedule, BackupStatus, BackupStorage, BackupType,
124    BackupVerification, BackupVerificationResult, BackupWindow, BusinessContinuityConfig,
125    ChecksumAlgorithm, CompressionAlgorithm, DRMetrics, DisasterRecoveryConfig,
126    DisasterRecoveryManager, DisasterScenario, EncryptionAlgorithm as BackupEncryptionAlgorithm,
127    FailoverConfig as DRFailoverConfig, ImpactLevel, KeyDerivationFunction, RecoveryConfig,
128    RecoveryOperation, RecoveryPriority, RecoveryRunbook, RecoveryStatus, RecoveryType,
129    ReplicationConfig as DRReplicationConfig, ReplicationMode as DRReplicationMode,
130    ReplicationTarget as DRReplicationTarget, RunbookExecution, RunbookExecutionStatus,
131    RunbookStep, StorageLocation,
132};
133pub use enterprise_audit::{
134    ActionResult, AuditEncryptionConfig, AuditEventType, AuditFilterConfig, AuditMetrics,
135    AuditRetentionConfig, AuditSeverity, AuditStorageBackend, AuditStorageConfig,
136    AuditStreamingConfig, AuthType, ComplianceConfig, ComplianceFinding, ComplianceReport,
137    ComplianceStandard, CompressionType as AuditCompressionType, DestinationAuth, DestinationType,
138    EncryptionAlgorithm, EnterpriseAuditConfig, EnterpriseAuditEvent, EnterpriseAuditLogger,
139    FindingType, KeyManagementConfig, KmsType, S3AuditConfig, StreamingDestination,
140};
141pub use enterprise_monitoring::{
142    Alert, AlertCondition, AlertManager, AlertRule, AlertSeverity as MonitoringAlertSeverity,
143    AlertingConfig, BreachNotificationConfig, ComparisonOperator, EnterpriseMonitoringConfig,
144    EnterpriseMonitoringSystem, EscalationLevel, EscalationPolicy, HealthCheckConfig,
145    HealthCheckEndpoint, HealthCheckType, MeasurementWindow, MetricDefinition, MetricType,
146    MetricValue, MetricsCollector, MetricsConfig, MetricsEndpoint, MetricsEndpointType,
147    MetricsExportConfig, MetricsFormat, NotificationChannel, ProfilingConfig, SlaBreach, SlaConfig,
148    SlaMeasurement, SlaMetricType, SlaObjective, SlaSeverity, SlaStatus, SlaTracker,
149};
150pub use multi_tenancy::{
151    IsolationMode, MultiTenancyConfig, MultiTenancyManager, MultiTenancyMetrics,
152    NamespaceResources, ResourceAllocationStrategy, ResourceType, ResourceUsage, Tenant,
153    TenantLifecycleConfig, TenantNamespace, TenantQuota, TenantStatus, TenantTier,
154};
155pub use observability::{
156    AlertConfig, AlertEvent, AlertSeverity, AlertType, BusinessMetrics, SpanLog, SpanStatus,
157    StreamObservability, StreamingMetrics, TelemetryConfig, TraceSpan,
158};
159pub use performance_utils::{
160    AdaptiveRateLimiter, IntelligentMemoryPool, IntelligentPrefetcher, ParallelStreamProcessor,
161    PerformanceUtilsConfig,
162};
163pub use quantum_communication::{
164    BellState, EntanglementDistribution, QuantumCommConfig, QuantumCommSystem,
165    QuantumOperation as QuantumCommOperation, QuantumSecurityProtocol,
166    QuantumState as QuantumCommState, Qubit,
167};
168pub use quantum_streaming::{
169    QuantumEvent, QuantumOperation, QuantumProcessingStats, QuantumState, QuantumStreamProcessor,
170};
171pub use reliability::{BulkReplayResult, DlqStats, ReplayStatus};
172pub use rsp::{
173    RspConfig, RspLanguage, RspProcessor, RspQuery, StreamClause, StreamDescriptor, Window,
174    WindowConfig, WindowSize, WindowStats, WindowType,
175};
176pub use security::{
177    AuditConfig, AuditLogEntry, AuditLogger, AuthConfig, AuthMethod, AuthenticationProvider,
178    AuthorizationProvider, AuthzConfig, Credentials, EncryptionConfig, Permission, RateLimitConfig,
179    RateLimiter, SecurityConfig as StreamSecurityConfig, SecurityContext, SecurityManager,
180    SecurityMetrics, SessionConfig, ThreatAlert, ThreatDetectionConfig, ThreatDetector,
181};
182pub use temporal_join::{
183    IntervalJoin, JoinResult, LateDataConfig, LateDataStrategy, TemporalJoin, TemporalJoinConfig,
184    TemporalJoinMetrics, TemporalJoinType, TemporalWindow, TimeSemantics, WatermarkConfig,
185    WatermarkStrategy,
186};
187pub use time_travel::{
188    AggregationType, TemporalAggregations, TemporalFilter, TemporalOrdering, TemporalProjection,
189    TemporalQuery, TemporalQueryResult, TemporalResultMetadata, TemporalStatistics, TimePoint,
190    TimeRange as TimeTravelTimeRange, TimeTravelConfig, TimeTravelEngine, TimeTravelMetrics,
191    TimelinePoint,
192};
193pub use tls_security::{
194    CertRotationConfig, CertificateConfig, CertificateFormat, CertificateInfo, CipherSuite,
195    ExpiryWarning, MutualTlsConfig, OcspConfig, RevocationCheckConfig, SessionResumptionConfig,
196    TlsConfig, TlsManager, TlsMetrics, TlsSessionInfo, TlsVersion,
197};
198pub use wasm_edge_computing::{
199    EdgeExecutionResult, EdgeLocation, OptimizationLevel, PerformanceProfile, PluginCapability,
200    PluginSchema, ProcessingSpecialization, ResourceMetrics, SecurityLevel, WasmEdgeConfig,
201    WasmEdgeProcessor, WasmPlugin, WasmProcessingResult, WasmProcessorStats, WasmResourceLimits,
202};
203pub use webhook::{
204    EventFilter as WebhookEventFilter, HttpMethod, RateLimit, RetryConfig as WebhookRetryConfig,
205    WebhookConfig, WebhookInfo, WebhookManager, WebhookMetadata, WebhookSecurity,
206    WebhookStatistics,
207};
208
209// New v0.1.0 feature exports
210pub use custom_serialization::{
211    BenchmarkResults, BsonSerializer, CustomSerializer, FlexBuffersSerializer, IonSerializer,
212    RonSerializer, SerializerBenchmark, SerializerBenchmarkSuite, SerializerRegistry,
213    SerializerStats, ThriftSerializer,
214};
215pub use end_to_end_encryption::{
216    E2EEConfig, E2EEEncryptionAlgorithm, E2EEManager, E2EEStats, EncryptedMessage,
217    HomomorphicEncryption, KeyExchangeAlgorithm, KeyPair, KeyRotationConfig, MultiPartyConfig,
218    ZeroKnowledgeProof,
219};
220pub use gpu_acceleration::{
221    AggregationOp, GpuBackend, GpuBuffer, GpuConfig, GpuContext, GpuProcessorConfig, GpuStats,
222    GpuStreamProcessor,
223};
224pub use ml_integration::{
225    AnomalyDetectionAlgorithm, AnomalyDetectionConfig, AnomalyDetector, AnomalyResult,
226    AnomalyStats, FeatureConfig, FeatureExtractor, FeatureVector, MLIntegrationManager,
227    MLModelConfig, ModelMetrics, ModelType, OnlineLearningModel, PredictionResult,
228};
229pub use rate_limiting::{
230    QuotaCheckResult, QuotaEnforcementMode, QuotaLimits, QuotaManager, QuotaOperation,
231    RateLimitAlgorithm, RateLimitConfig as AdvancedRateLimitConfig, RateLimitMonitoringConfig,
232    RateLimitStats as AdvancedRateLimitStats, RateLimiter as AdvancedRateLimiter,
233    RejectionStrategy,
234};
235pub use scalability::{
236    AdaptiveBuffer, AutoScaler, LoadBalancingStrategy as ScalingLoadBalancingStrategy,
237    Node as ScalingNode, NodeHealth, Partition, PartitionManager, PartitionStrategy,
238    ResourceLimits, ResourceUsage as ScalingResourceUsage, ScalingConfig, ScalingDirection,
239    ScalingMode,
240};
241pub use schema_evolution::{
242    CompatibilityCheckResult, CompatibilityIssue, CompatibilityIssueType,
243    CompatibilityMode as SchemaCompatibilityMode, DeprecationInfo, EvolutionResult,
244    FieldDefinition, FieldType, IssueSeverity, MigrationRule, MigrationStrategy, SchemaChange,
245    SchemaDefinition as SchemaEvolutionDefinition, SchemaEvolutionManager,
246    SchemaFormat as SchemaEvolutionFormat, SchemaVersion,
247};
248pub use stream_replay::{
249    EventProcessor, ReplayCheckpoint, ReplayConfig, ReplayFilter, ReplayMode, ReplaySpeed,
250    ReplayStats, ReplayStatus as StreamReplayStatus, ReplayTransformation, StateSnapshot,
251    StreamReplayManager, TransformationType,
252};
253pub use transactional_processing::{
254    IsolationLevel as TransactionalIsolationLevel, LogEntryType, TransactionCheckpoint,
255    TransactionLogEntry, TransactionMetadata, TransactionState, TransactionalConfig,
256    TransactionalProcessor, TransactionalStats,
257};
258pub use zero_copy::{
259    MemoryMappedBuffer, SharedRefBuffer, SimdBatchProcessor, SimdOperation, SplicedBuffer,
260    ZeroCopyBuffer, ZeroCopyConfig, ZeroCopyManager, ZeroCopyStats,
261};
262
263// New v0.1.0 exports for developer experience and performance
264pub use numa_processing::{
265    CpuAffinityMode, HugePageSize, MemoryBandwidthMonitor, MemoryInterleavePolicy, NodeBufferStats,
266    NodeProcessorStats, NumaAllocationStrategy, NumaBuffer, NumaBufferPool, NumaBufferPoolConfig,
267    NumaBufferPoolStats, NumaConfig, NumaNode, NumaProcessorStats, NumaStreamProcessor,
268    NumaThreadPool, NumaThreadPoolStats, NumaTopology, NumaWorker, NumaWorkerStats,
269    WorkerDistributionStrategy,
270};
271pub use out_of_order::{
272    EmitStrategy, GapFillingStrategy, LateEventStrategy, OrderedEvent, OutOfOrderConfig,
273    OutOfOrderHandler, OutOfOrderHandlerBuilder, OutOfOrderStats, SequenceTracker, Watermark,
274};
275pub use performance_profiler::{
276    HistogramStats, LatencyHistogram, OperationTimer, PerformanceProfiler, PerformanceReport,
277    PerformanceSample, PerformanceWarning, ProfilerBuilder, ProfilerConfig, ProfilerStats,
278    Recommendation, RecommendationCategory, RecommendationEffort, RecommendationImpact, Span,
279    WarningSeverity, WarningThresholds, WarningType,
280};
281pub use stream_sql::{
282    AggregateFunction, BinaryOperator, ColumnDefinition, CreateStreamStatement, DataType,
283    Expression, FromClause, JoinType, Lexer, OrderByItem, Parser,
284    QueryResult as StreamSqlQueryResult, QueryType, ResultRow, SelectItem, SelectStatement,
285    SqlValue, StreamMetadata, StreamSqlConfig, StreamSqlEngine, StreamSqlStats, Token,
286    UnaryOperator, WindowSpec, WindowType as SqlWindowType,
287};
288pub use testing_framework::{
289    Assertion, AssertionType, CapturedEvent, EventGenerator, EventMatcher, GeneratorConfig,
290    GeneratorType, MockClock, PerformanceMetric, TestFixture, TestHarness, TestHarnessBuilder,
291    TestHarnessConfig, TestMetrics, TestReport, TestStatus,
292};
293
294// New v0.1.0 exports for ML, versioning, and migration
295pub use anomaly_detection::{
296    Anomaly, AnomalyAlert, AnomalyConfig, AnomalyDetector as AdaptiveAnomalyDetector,
297    AnomalySeverity, AnomalyStats as AdaptiveAnomalyStats, DetectorType, MultiDimensionalDetector,
298};
299pub use migration_tools::{
300    APIMapping, ConceptMapping, GeneratedFile, GeneratedFileType, ManualReviewItem,
301    MigrationConfig, MigrationError, MigrationReport, MigrationSuggestion, MigrationTool,
302    MigrationWarning, QuickStart, ReviewPriority, SourcePlatform, SuggestionCategory,
303};
304pub use online_learning::{
305    ABTestConfig, ABTestResult, DriftDetection, ModelCheckpoint,
306    ModelMetrics as OnlineModelMetrics, ModelType as OnlineModelType, OnlineLearningConfig,
307    OnlineLearningModel as StreamOnlineLearningModel, OnlineLearningStats, Prediction, Sample,
308    StreamFeatureExtractor,
309};
310pub use stream_versioning::{
311    Branch, BranchId, Change, ChangeType, Changeset, Snapshot, StreamVersioning, TimeTravelQuery,
312    TimeTravelTarget, VersionDiff, VersionId, VersionMetadata, VersionedEvent, VersioningConfig,
313    VersioningStats,
314};
315
316// New v0.1.0 advanced ML exports
317pub use automl_stream::{
318    Algorithm, AutoML, AutoMLConfig, AutoMLStats, HyperParameters, ModelPerformance, TaskType,
319    TrainedModel,
320};
321pub use feature_engineering::{
322    Feature, FeatureExtractionConfig, FeatureMetadata, FeaturePipeline, FeatureSet, FeatureStore,
323    FeatureTransform, FeatureValue, ImputationStrategy, PipelineStats,
324};
325pub use neural_architecture_search::{
326    ActivationType, Architecture, ArchitecturePerformance, LayerType, NASConfig, NASStats,
327    ObjectiveWeights, SearchSpace, SearchStrategy, NAS,
328};
329pub use predictive_analytics::{
330    AccuracyMetrics, ForecastAlgorithm, ForecastResult, ForecastingConfig, PredictiveAnalytics,
331    PredictiveStats, SeasonalityType, TrendDirection,
332};
333pub use reinforcement_learning::{
334    Action, Experience, RLAgent, RLAlgorithm, RLConfig, RLStats, State as RLState,
335};
336
337// Utility exports
338pub use utils::{
339    create_dev_stream, create_prod_stream, BatchProcessor, EventFilter, EventSampler,
340    SimpleRateLimiter, StreamMultiplexer, StreamStats,
341};
342
343// Advanced SciRS2 optimization exports
344pub use advanced_scirs2_optimization::{
345    AdvancedOptimizerConfig, AdvancedStreamOptimizer, MovingStats, OptimizerMetrics,
346};
347pub use cdc_processor::{
348    CdcConfig, CdcConnector, CdcEvent, CdcEventBuilder, CdcMetrics, CdcOperation, CdcProcessor,
349    CdcSource,
350};
351
352// Adaptive load shedding exports
353pub use adaptive_load_shedding::{
354    DropStrategy, LoadMetrics, LoadSheddingConfig, LoadSheddingManager, LoadSheddingStats,
355};
356
357// Stream fusion optimizer exports
358pub use stream_fusion::{
359    FusableChain, FusedOperation, FusedType, FusionAnalysis, FusionConfig, FusionOptimizer,
360    FusionStats, Operation,
361};
362
363// Complex Event Processing (CEP) engine exports
364pub use cep_engine::{
365    CepAggregationFunction, CepConfig, CepEngine, CepMetrics, CepStatistics, CompleteMatch,
366    CorrelationFunction, CorrelationResult, CorrelationStats, DetectedPattern, DetectionAlgorithm,
367    DetectionStats, EnrichmentData, EnrichmentService, EnrichmentSource, EnrichmentSourceType,
368    EnrichmentStats, EventBuffer, EventCorrelator, EventPattern, FieldPredicate, PartialMatch,
369    PatternDetector, ProcessingRule, RuleAction, RuleCondition, RuleEngine, RuleExecutionStats,
370    State, StateMachine, TemporalOperator, TimestampedEvent,
371};
372
373// Data quality and validation framework exports
374pub use data_quality::{
375    AlertCondition as QualityAlertCondition, AlertManager as QualityAlertManager,
376    AlertRule as QualityAlertRule, AlertSeverity as QualityAlertSeverity,
377    AlertStats as QualityAlertStats, AlertType as QualityAlertType, AuditAction, AuditEntry,
378    AuditStats, AuditTrail, CleansingRule, CleansingStats, CorrectionType, DataCleanser,
379    DataCorrection, DataProfiler, DataQualityValidator, DuplicateDetector, DuplicateStats,
380    FailureSeverity, FieldProfile, OutlierMethod, ProfileStats, ProfiledEvent, QualityAlert,
381    QualityConfig, QualityDimension, QualityMetrics, QualityReport, QualityScorer, ScoringStats,
382    ValidationFailure, ValidationResult as QualityValidationResult, ValidationRule,
383};
384
385// Advanced sampling techniques exports
386pub use advanced_sampling::{
387    AdvancedSamplingManager, BloomFilter, BloomFilterStats, CountMinSketch, CountMinSketchStats,
388    HyperLogLog, HyperLogLogStats, ReservoirSampler, ReservoirStats, SamplingConfig,
389    SamplingManagerStats, StratifiedSampler, StratifiedStats, TDigest, TDigestStats,
390};
391
392pub mod backend;
393pub mod backend_optimizer;
394pub mod backpressure;
395pub mod biological_computing;
396pub mod bridge;
397pub mod circuit_breaker;
398pub mod config;
399pub mod connection_pool;
400pub mod consciousness_streaming;
401pub mod consumer;
402pub mod cqels;
403pub mod cqrs;
404pub mod csparql;
405pub mod delta;
406pub mod diagnostics;
407pub mod disaster_recovery;
408pub mod dlq;
409pub mod enterprise_audit;
410pub mod enterprise_monitoring;
411pub mod error;
412pub mod event;
413pub mod event_sourcing;
414pub mod failover;
415pub mod graphql_bridge;
416pub mod graphql_subscriptions;
417pub mod health_monitor;
418pub mod join;
419pub mod monitoring;
420pub mod multi_region_replication;
421pub mod multi_tenancy;
422pub mod observability;
423pub mod patch;
424pub mod performance_optimizer;
425pub mod performance_utils;
426pub mod processing;
427pub mod producer;
428pub mod quantum_communication;
429pub mod quantum_processing;
430pub mod quantum_streaming;
431pub mod reconnect;
432pub mod reliability;
433pub mod rsp;
434pub mod schema_registry;
435pub mod security;
436pub mod serialization;
437pub mod sparql_streaming;
438pub mod state;
439pub mod store_integration;
440pub mod temporal_join;
441pub mod time_travel;
442pub mod tls_security;
443pub mod types;
444pub mod wasm_edge_computing;
445pub mod webhook;
446
447// New v0.1.0 modules for advanced features
448pub mod custom_serialization;
449pub mod end_to_end_encryption;
450pub mod gpu_acceleration;
451pub mod ml_integration;
452pub mod rate_limiting;
453pub mod scalability;
454pub mod schema_evolution;
455pub mod stream_replay;
456pub mod transactional_processing;
457pub mod zero_copy;
458
459// New v0.1.0 modules for developer experience and performance
460pub mod numa_processing;
461pub mod out_of_order;
462pub mod performance_profiler;
463pub mod stream_sql;
464pub mod testing_framework;
465
466// New v0.1.0 modules for ML, versioning, and migration
467pub mod anomaly_detection;
468pub mod migration_tools;
469pub mod online_learning;
470pub mod stream_versioning;
471
472// Advanced ML modules for v0.1.0 completion
473pub mod automl_stream;
474pub mod feature_engineering;
475pub mod neural_architecture_search;
476pub mod predictive_analytics;
477pub mod reinforcement_learning;
478
479// Utilities module
480pub mod utils;
481
482// Advanced SciRS2 optimization module
483pub mod advanced_scirs2_optimization;
484pub mod cdc_processor;
485
486// Adaptive load shedding module
487pub mod adaptive_load_shedding;
488
489// Stream fusion optimizer module
490pub mod stream_fusion;
491
492// Complex Event Processing (CEP) engine module
493pub mod cep_engine;
494
495// Data quality and validation framework module
496pub mod data_quality;
497
498// Advanced sampling techniques module
499pub mod advanced_sampling;
500
501/// Enhanced stream configuration with advanced features
502#[derive(Debug, Clone, Serialize, Deserialize)]
503pub struct StreamConfig {
504    pub backend: StreamBackendType,
505    pub topic: String,
506    pub batch_size: usize,
507    pub flush_interval_ms: u64,
508    /// Maximum concurrent connections
509    pub max_connections: usize,
510    /// Connection timeout
511    pub connection_timeout: Duration,
512    /// Enable compression
513    pub enable_compression: bool,
514    /// Compression type
515    pub compression_type: CompressionType,
516    /// Retry configuration
517    pub retry_config: RetryConfig,
518    /// Circuit breaker configuration
519    pub circuit_breaker: CircuitBreakerConfig,
520    /// Security configuration
521    pub security: SecurityConfig,
522    /// Performance tuning
523    pub performance: StreamPerformanceConfig,
524    /// Monitoring configuration
525    pub monitoring: MonitoringConfig,
526}
527
528/// Compression types supported
529#[derive(Debug, Clone, Serialize, Deserialize)]
530pub enum CompressionType {
531    None,
532    Gzip,
533    Snappy,
534    Lz4,
535    Zstd,
536}
537
538/// Retry configuration
539#[derive(Debug, Clone, Serialize, Deserialize)]
540pub struct RetryConfig {
541    pub max_retries: u32,
542    pub initial_backoff: Duration,
543    pub max_backoff: Duration,
544    pub backoff_multiplier: f64,
545    pub jitter: bool,
546}
547
548/// Circuit breaker configuration
549#[derive(Debug, Clone, Serialize, Deserialize)]
550pub struct CircuitBreakerConfig {
551    pub enabled: bool,
552    pub failure_threshold: u32,
553    pub success_threshold: u32,
554    pub timeout: Duration,
555    pub half_open_max_calls: u32,
556}
557
558/// Security configuration
559#[derive(Debug, Clone, Serialize, Deserialize)]
560pub struct SecurityConfig {
561    pub enable_tls: bool,
562    pub verify_certificates: bool,
563    pub client_cert_path: Option<String>,
564    pub client_key_path: Option<String>,
565    pub ca_cert_path: Option<String>,
566    pub sasl_config: Option<SaslConfig>,
567}
568
569/// SASL authentication configuration
570#[derive(Debug, Clone, Serialize, Deserialize)]
571pub struct SaslConfig {
572    pub mechanism: SaslMechanism,
573    pub username: String,
574    pub password: String,
575}
576
577/// SASL authentication mechanisms
578#[derive(Debug, Clone, Serialize, Deserialize)]
579pub enum SaslMechanism {
580    Plain,
581    ScramSha256,
582    ScramSha512,
583    OAuthBearer,
584}
585
586/// Performance tuning configuration
587#[derive(Debug, Clone, Serialize, Deserialize)]
588pub struct StreamPerformanceConfig {
589    pub enable_batching: bool,
590    pub enable_pipelining: bool,
591    pub buffer_size: usize,
592    pub prefetch_count: u32,
593    pub enable_zero_copy: bool,
594    pub enable_simd: bool,
595    pub parallel_processing: bool,
596    pub worker_threads: Option<usize>,
597}
598
599/// Monitoring configuration
600#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct MonitoringConfig {
602    pub enable_metrics: bool,
603    pub enable_tracing: bool,
604    pub metrics_interval: Duration,
605    pub health_check_interval: Duration,
606    pub enable_profiling: bool,
607    pub prometheus_endpoint: Option<String>,
608    pub jaeger_endpoint: Option<String>,
609    pub log_level: String,
610}
611
612/// Enhanced streaming backend options
613#[derive(Debug, Clone, Serialize, Deserialize)]
614pub enum StreamBackendType {
615    #[cfg(feature = "kafka")]
616    Kafka {
617        brokers: Vec<String>,
618        security_protocol: Option<String>,
619        sasl_config: Option<SaslConfig>,
620    },
621    #[cfg(feature = "nats")]
622    Nats {
623        url: String,
624        cluster_urls: Option<Vec<String>>,
625        jetstream_config: Option<NatsJetStreamConfig>,
626    },
627    #[cfg(feature = "redis")]
628    Redis {
629        url: String,
630        cluster_urls: Option<Vec<String>>,
631        pool_size: Option<usize>,
632    },
633    #[cfg(feature = "kinesis")]
634    Kinesis {
635        region: String,
636        stream_name: String,
637        credentials: Option<AwsCredentials>,
638    },
639    #[cfg(feature = "pulsar")]
640    Pulsar {
641        service_url: String,
642        auth_config: Option<PulsarAuthConfig>,
643    },
644    #[cfg(feature = "rabbitmq")]
645    RabbitMQ {
646        url: String,
647        exchange: Option<String>,
648        queue: Option<String>,
649    },
650    Memory {
651        max_size: Option<usize>,
652        persistence: bool,
653    },
654}
655
656/// NATS JetStream configuration
657#[derive(Debug, Clone, Serialize, Deserialize)]
658pub struct NatsJetStreamConfig {
659    pub domain: Option<String>,
660    pub api_prefix: Option<String>,
661    pub timeout: Duration,
662}
663
664/// AWS credentials configuration
665#[derive(Debug, Clone, Serialize, Deserialize)]
666pub struct AwsCredentials {
667    pub access_key_id: String,
668    pub secret_access_key: String,
669    pub session_token: Option<String>,
670    pub role_arn: Option<String>,
671}
672
673/// Pulsar authentication configuration
674#[derive(Debug, Clone, Serialize, Deserialize)]
675pub struct PulsarAuthConfig {
676    pub auth_method: PulsarAuthMethod,
677    pub auth_params: HashMap<String, String>,
678}
679
680/// Pulsar authentication methods
681#[derive(Debug, Clone, Serialize, Deserialize)]
682pub enum PulsarAuthMethod {
683    Token,
684    Jwt,
685    Oauth2,
686    Tls,
687}
688
689/// Enhanced stream producer for publishing RDF changes with backend support
690pub struct StreamProducer {
691    config: StreamConfig,
692    backend_producer: BackendProducer,
693    stats: Arc<RwLock<ProducerStats>>,
694    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
695    last_flush: Instant,
696    _pending_events: Arc<RwLock<Vec<StreamEvent>>>,
697    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
698    flush_semaphore: Arc<Semaphore>,
699}
700
701/// Backend-agnostic producer wrapper
702enum BackendProducer {
703    #[cfg(feature = "kafka")]
704    Kafka(backend::kafka::KafkaProducer),
705    #[cfg(feature = "nats")]
706    Nats(Box<backend::nats::NatsProducer>),
707    #[cfg(feature = "redis")]
708    Redis(backend::redis::RedisProducer),
709    #[cfg(feature = "kinesis")]
710    Kinesis(backend::kinesis::KinesisProducer),
711    #[cfg(feature = "pulsar")]
712    Pulsar(Box<backend::pulsar::PulsarProducer>),
713    #[cfg(feature = "rabbitmq")]
714    RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
715    Memory(MemoryProducer),
716}
717
718/// Producer statistics for monitoring
719#[derive(Debug, Default, Clone)]
720pub struct ProducerStats {
721    events_published: u64,
722    events_failed: u64,
723    _bytes_sent: u64,
724    avg_latency_ms: f64,
725    max_latency_ms: u64,
726    batch_count: u64,
727    flush_count: u64,
728    circuit_breaker_trips: u64,
729    last_publish: Option<DateTime<Utc>>,
730    backend_type: String,
731}
732
733/// Memory-based producer for testing and development
734// Type aliases for complex types
735type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
736type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;
737
738// Global shared storage for memory backend events
739static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();
740
741pub fn get_memory_events() -> MemoryEventStore {
742    MEMORY_EVENTS
743        .get_or_init(|| Arc::new(RwLock::new(Vec::new())))
744        .clone()
745}
746
747/// Clear the global memory storage (for testing)
748pub async fn clear_memory_events() {
749    let events = get_memory_events();
750    events.write().await.clear();
751    // Also clear the memory backend storage
752    backend::memory::clear_memory_storage().await;
753}
754
755struct MemoryProducer {
756    backend: Box<dyn StreamBackend + Send + Sync>,
757    topic: String,
758    stats: ProducerStats,
759}
760
761impl MemoryProducer {
762    fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
763        Self {
764            backend: Box::new(backend::memory::MemoryBackend::new()),
765            topic: "oxirs-stream".to_string(), // Default topic
766            stats: ProducerStats {
767                backend_type: "memory".to_string(),
768                ..Default::default()
769            },
770        }
771    }
772
773    fn with_topic(topic: String) -> Self {
774        Self {
775            backend: Box::new(backend::memory::MemoryBackend::new()),
776            topic,
777            stats: ProducerStats {
778                backend_type: "memory".to_string(),
779                ..Default::default()
780            },
781        }
782    }
783
784    async fn publish(&mut self, event: StreamEvent) -> Result<()> {
785        let start_time = Instant::now();
786
787        // Use the backend implementation
788        self.backend
789            .as_mut()
790            .connect()
791            .await
792            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
793
794        // Create topic if it doesn't exist
795        let topic_name = crate::types::TopicName::new(self.topic.clone());
796        self.backend
797            .create_topic(&topic_name, 1)
798            .await
799            .map_err(|e| anyhow!("Topic creation failed: {}", e))?;
800
801        // Send event through backend
802        self.backend
803            .send_event(&topic_name, event)
804            .await
805            .map_err(|e| anyhow!("Send event failed: {}", e))?;
806
807        // Update stats
808        self.stats.events_published += 1;
809        let latency = start_time.elapsed().as_millis() as u64;
810        self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
811        self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
812        self.stats.last_publish = Some(Utc::now());
813
814        debug!("Memory producer: published event via backend");
815        Ok(())
816    }
817
818    async fn flush(&mut self) -> Result<()> {
819        // Backend handles flushing internally
820        self.stats.flush_count += 1;
821        debug!("Memory producer: flush completed");
822        Ok(())
823    }
824
825    fn _get_stats(&self) -> &ProducerStats {
826        &self.stats
827    }
828}
829
830impl StreamProducer {
831    /// Create a new enhanced stream producer with backend support
832    pub async fn new(config: StreamConfig) -> Result<Self> {
833        // Initialize circuit breaker if enabled
834        let circuit_breaker = if config.circuit_breaker.enabled {
835            Some(circuit_breaker::new_shared_circuit_breaker(
836                circuit_breaker::CircuitBreakerConfig {
837                    enabled: config.circuit_breaker.enabled,
838                    failure_threshold: config.circuit_breaker.failure_threshold,
839                    success_threshold: config.circuit_breaker.success_threshold,
840                    timeout: config.circuit_breaker.timeout,
841                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
842                    ..Default::default()
843                },
844            ))
845        } else {
846            None
847        };
848
849        // Initialize backend-specific producer
850        let backend_producer = match &config.backend {
851            #[cfg(feature = "kafka")]
852            StreamBackendType::Kafka {
853                brokers,
854                security_protocol,
855                sasl_config,
856            } => {
857                let stream_config = crate::StreamConfig {
858                    backend: crate::StreamBackendType::Kafka {
859                        brokers: brokers.clone(),
860                        security_protocol: security_protocol.clone(),
861                        sasl_config: sasl_config.clone(),
862                    },
863                    topic: config.topic.clone(),
864                    batch_size: config.batch_size,
865                    flush_interval_ms: config.flush_interval_ms,
866                    max_connections: config.max_connections,
867                    connection_timeout: config.connection_timeout,
868                    enable_compression: config.enable_compression,
869                    compression_type: config.compression_type.clone(),
870                    retry_config: config.retry_config.clone(),
871                    circuit_breaker: config.circuit_breaker.clone(),
872                    security: config.security.clone(),
873                    performance: config.performance.clone(),
874                    monitoring: config.monitoring.clone(),
875                };
876
877                let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
878                producer.connect().await?;
879                BackendProducer::Kafka(producer)
880            }
881            #[cfg(feature = "nats")]
882            StreamBackendType::Nats {
883                url,
884                cluster_urls,
885                jetstream_config,
886            } => {
887                let stream_config = crate::StreamConfig {
888                    backend: crate::StreamBackendType::Nats {
889                        url: url.clone(),
890                        cluster_urls: cluster_urls.clone(),
891                        jetstream_config: jetstream_config.clone(),
892                    },
893                    topic: config.topic.clone(),
894                    batch_size: config.batch_size,
895                    flush_interval_ms: config.flush_interval_ms,
896                    max_connections: config.max_connections,
897                    connection_timeout: config.connection_timeout,
898                    enable_compression: config.enable_compression,
899                    compression_type: config.compression_type.clone(),
900                    retry_config: config.retry_config.clone(),
901                    circuit_breaker: config.circuit_breaker.clone(),
902                    security: config.security.clone(),
903                    performance: config.performance.clone(),
904                    monitoring: config.monitoring.clone(),
905                };
906
907                let mut producer = backend::nats::NatsProducer::new(stream_config)?;
908                producer.connect().await?;
909                BackendProducer::Nats(Box::new(producer))
910            }
911            #[cfg(feature = "redis")]
912            StreamBackendType::Redis {
913                url,
914                cluster_urls,
915                pool_size,
916            } => {
917                let stream_config = crate::StreamConfig {
918                    backend: crate::StreamBackendType::Redis {
919                        url: url.clone(),
920                        cluster_urls: cluster_urls.clone(),
921                        pool_size: *pool_size,
922                    },
923                    topic: config.topic.clone(),
924                    batch_size: config.batch_size,
925                    flush_interval_ms: config.flush_interval_ms,
926                    max_connections: config.max_connections,
927                    connection_timeout: config.connection_timeout,
928                    enable_compression: config.enable_compression,
929                    compression_type: config.compression_type.clone(),
930                    retry_config: config.retry_config.clone(),
931                    circuit_breaker: config.circuit_breaker.clone(),
932                    security: config.security.clone(),
933                    performance: config.performance.clone(),
934                    monitoring: config.monitoring.clone(),
935                };
936
937                let mut producer = backend::redis::RedisProducer::new(stream_config)?;
938                producer.connect().await?;
939                BackendProducer::Redis(producer)
940            }
941            #[cfg(feature = "kinesis")]
942            StreamBackendType::Kinesis {
943                region,
944                stream_name,
945                credentials,
946            } => {
947                let stream_config = crate::StreamConfig {
948                    backend: crate::StreamBackendType::Kinesis {
949                        region: region.clone(),
950                        stream_name: stream_name.clone(),
951                        credentials: credentials.clone(),
952                    },
953                    topic: config.topic.clone(),
954                    batch_size: config.batch_size,
955                    flush_interval_ms: config.flush_interval_ms,
956                    max_connections: config.max_connections,
957                    connection_timeout: config.connection_timeout,
958                    enable_compression: config.enable_compression,
959                    compression_type: config.compression_type.clone(),
960                    retry_config: config.retry_config.clone(),
961                    circuit_breaker: config.circuit_breaker.clone(),
962                    security: config.security.clone(),
963                    performance: config.performance.clone(),
964                    monitoring: config.monitoring.clone(),
965                };
966
967                let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
968                producer.connect().await?;
969                BackendProducer::Kinesis(producer)
970            }
971            #[cfg(feature = "pulsar")]
972            StreamBackendType::Pulsar {
973                service_url,
974                auth_config,
975            } => {
976                let stream_config = crate::StreamConfig {
977                    backend: crate::StreamBackendType::Pulsar {
978                        service_url: service_url.clone(),
979                        auth_config: auth_config.clone(),
980                    },
981                    topic: config.topic.clone(),
982                    batch_size: config.batch_size,
983                    flush_interval_ms: config.flush_interval_ms,
984                    max_connections: config.max_connections,
985                    connection_timeout: config.connection_timeout,
986                    enable_compression: config.enable_compression,
987                    compression_type: config.compression_type.clone(),
988                    retry_config: config.retry_config.clone(),
989                    circuit_breaker: config.circuit_breaker.clone(),
990                    security: config.security.clone(),
991                    performance: config.performance.clone(),
992                    monitoring: config.monitoring.clone(),
993                };
994
995                let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
996                producer.connect().await?;
997                BackendProducer::Pulsar(Box::new(producer))
998            }
999            #[cfg(feature = "rabbitmq")]
1000            StreamBackendType::RabbitMQ {
1001                url,
1002                exchange,
1003                queue,
1004            } => {
1005                let stream_config = crate::StreamConfig {
1006                    backend: crate::StreamBackendType::RabbitMQ {
1007                        url: url.clone(),
1008                        exchange: exchange.clone(),
1009                        queue: queue.clone(),
1010                    },
1011                    topic: config.topic.clone(),
1012                    batch_size: config.batch_size,
1013                    flush_interval_ms: config.flush_interval_ms,
1014                    max_connections: config.max_connections,
1015                    connection_timeout: config.connection_timeout,
1016                    enable_compression: config.enable_compression,
1017                    compression_type: config.compression_type.clone(),
1018                    retry_config: config.retry_config.clone(),
1019                    circuit_breaker: config.circuit_breaker.clone(),
1020                    security: config.security.clone(),
1021                    performance: config.performance.clone(),
1022                    monitoring: config.monitoring.clone(),
1023                };
1024
1025                let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
1026                producer.connect().await?;
1027                BackendProducer::RabbitMQ(Box::new(producer))
1028            }
1029            StreamBackendType::Memory {
1030                max_size: _,
1031                persistence: _,
1032            } => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
1033        };
1034
1035        let stats = Arc::new(RwLock::new(ProducerStats {
1036            backend_type: match backend_producer {
1037                #[cfg(feature = "kafka")]
1038                BackendProducer::Kafka(_) => "kafka".to_string(),
1039                #[cfg(feature = "nats")]
1040                BackendProducer::Nats(_) => "nats".to_string(),
1041                #[cfg(feature = "redis")]
1042                BackendProducer::Redis(_) => "redis".to_string(),
1043                #[cfg(feature = "kinesis")]
1044                BackendProducer::Kinesis(_) => "kinesis".to_string(),
1045                #[cfg(feature = "pulsar")]
1046                BackendProducer::Pulsar(_) => "pulsar".to_string(),
1047                #[cfg(feature = "rabbitmq")]
1048                BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
1049                BackendProducer::Memory(_) => "memory".to_string(),
1050            },
1051            ..Default::default()
1052        }));
1053
1054        info!(
1055            "Created stream producer with backend: {}",
1056            stats.read().await.backend_type
1057        );
1058
1059        Ok(Self {
1060            config,
1061            backend_producer,
1062            stats,
1063            circuit_breaker,
1064            last_flush: Instant::now(),
1065            _pending_events: Arc::new(RwLock::new(Vec::new())),
1066            batch_buffer: Arc::new(RwLock::new(Vec::new())),
1067            flush_semaphore: Arc::new(Semaphore::new(1)),
1068        })
1069    }
1070
1071    /// Publish a stream event with circuit breaker protection and batching
1072    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
1073        let start_time = Instant::now();
1074
1075        // Check circuit breaker if enabled
1076        if let Some(cb) = &self.circuit_breaker {
1077            if !cb.can_execute().await {
1078                self.stats.write().await.circuit_breaker_trips += 1;
1079                return Err(anyhow!("Circuit breaker is open - cannot publish events"));
1080            }
1081        }
1082
1083        // Handle batching if enabled
1084        if self.config.performance.enable_batching {
1085            let mut batch_buffer = self.batch_buffer.write().await;
1086            batch_buffer.push(event);
1087
1088            if batch_buffer.len() >= self.config.batch_size {
1089                let events = std::mem::take(&mut *batch_buffer);
1090                drop(batch_buffer);
1091                return self.publish_batch_internal(events).await;
1092            }
1093
1094            return Ok(());
1095        }
1096
1097        // Publish single event
1098        let result = self.publish_single_event(event).await;
1099
1100        // Update circuit breaker and stats
1101        match &result {
1102            Ok(_) => {
1103                if let Some(cb) = &self.circuit_breaker {
1104                    cb.record_success_with_duration(start_time.elapsed()).await;
1105                }
1106
1107                let mut stats = self.stats.write().await;
1108                stats.events_published += 1;
1109                let latency = start_time.elapsed().as_millis() as u64;
1110                stats.max_latency_ms = stats.max_latency_ms.max(latency);
1111                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
1112                stats.last_publish = Some(Utc::now());
1113            }
1114            Err(_) => {
1115                if let Some(cb) = &self.circuit_breaker {
1116                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
1117                        .await;
1118                }
1119
1120                self.stats.write().await.events_failed += 1;
1121            }
1122        }
1123
1124        result
1125    }
1126
1127    /// Publish a single event to the backend
1128    async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
1129        match &mut self.backend_producer {
1130            #[cfg(feature = "kafka")]
1131            BackendProducer::Kafka(producer) => producer.publish(event).await,
1132            #[cfg(feature = "nats")]
1133            BackendProducer::Nats(producer) => producer.publish(event).await,
1134            #[cfg(feature = "redis")]
1135            BackendProducer::Redis(producer) => producer.publish(event).await,
1136            #[cfg(feature = "kinesis")]
1137            BackendProducer::Kinesis(producer) => producer.publish(event).await,
1138            #[cfg(feature = "pulsar")]
1139            BackendProducer::Pulsar(producer) => producer.publish(event).await,
1140            #[cfg(feature = "rabbitmq")]
1141            BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
1142            BackendProducer::Memory(producer) => producer.publish(event).await,
1143        }
1144    }
1145
1146    /// Publish multiple events as a batch
1147    pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
1148        if events.is_empty() {
1149            return Ok(());
1150        }
1151
1152        self.publish_batch_internal(events).await
1153    }
1154
1155    /// Internal batch publishing implementation
1156    async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
1157        let start_time = Instant::now();
1158        let event_count = events.len();
1159
1160        let result = match &mut self.backend_producer {
1161            #[cfg(feature = "kafka")]
1162            BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
1163            #[cfg(feature = "nats")]
1164            BackendProducer::Nats(producer) => producer.publish_batch(events).await,
1165            #[cfg(feature = "redis")]
1166            BackendProducer::Redis(producer) => producer.publish_batch(events).await,
1167            #[cfg(feature = "kinesis")]
1168            BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
1169            #[cfg(feature = "pulsar")]
1170            BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
1171            #[cfg(feature = "rabbitmq")]
1172            BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
1173            BackendProducer::Memory(producer) => {
1174                for event in events {
1175                    producer.publish(event).await?;
1176                }
1177                Ok(())
1178            }
1179        };
1180
1181        // Update stats
1182        let mut stats = self.stats.write().await;
1183        match &result {
1184            Ok(_) => {
1185                stats.events_published += event_count as u64;
1186                stats.batch_count += 1;
1187                let latency = start_time.elapsed().as_millis() as u64;
1188                stats.max_latency_ms = stats.max_latency_ms.max(latency);
1189                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
1190                stats.last_publish = Some(Utc::now());
1191            }
1192            Err(_) => {
1193                stats.events_failed += event_count as u64;
1194            }
1195        }
1196
1197        debug!(
1198            "Published batch of {} events in {:?}",
1199            event_count,
1200            start_time.elapsed()
1201        );
1202        result
1203    }
1204
1205    /// Flush any pending events and buffers
1206    pub async fn flush(&mut self) -> Result<()> {
1207        let _permit = self
1208            .flush_semaphore
1209            .acquire()
1210            .await
1211            .map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;
1212
1213        let start_time = Instant::now();
1214
1215        // Flush any pending batch buffer
1216        if self.config.performance.enable_batching {
1217            let events = {
1218                let mut batch_buffer = self.batch_buffer.write().await;
1219                if !batch_buffer.is_empty() {
1220                    std::mem::take(&mut *batch_buffer)
1221                } else {
1222                    Vec::new()
1223                }
1224            };
1225            if !events.is_empty() {
1226                drop(_permit); // Release permit before mutable borrow
1227                self.publish_batch_internal(events).await?;
1228            }
1229        }
1230
1231        // Flush backend-specific buffers
1232        let result = match &mut self.backend_producer {
1233            #[cfg(feature = "kafka")]
1234            BackendProducer::Kafka(producer) => producer.flush().await,
1235            #[cfg(feature = "nats")]
1236            BackendProducer::Nats(producer) => producer.flush().await,
1237            #[cfg(feature = "redis")]
1238            BackendProducer::Redis(producer) => producer.flush().await,
1239            #[cfg(feature = "kinesis")]
1240            BackendProducer::Kinesis(producer) => producer.flush().await,
1241            #[cfg(feature = "pulsar")]
1242            BackendProducer::Pulsar(producer) => producer.flush().await,
1243            #[cfg(feature = "rabbitmq")]
1244            BackendProducer::RabbitMQ(producer) => producer.flush().await,
1245            BackendProducer::Memory(producer) => producer.flush().await,
1246        };
1247
1248        // Update stats
1249        if result.is_ok() {
1250            self.stats.write().await.flush_count += 1;
1251            self.last_flush = Instant::now();
1252            debug!("Flushed producer buffers in {:?}", start_time.elapsed());
1253        }
1254
1255        result
1256    }
1257
1258    /// Publish an RDF patch as a series of events
1259    pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
1260        let events: Vec<StreamEvent> = patch
1261            .operations
1262            .iter()
1263            .filter_map(|op| {
1264                let metadata = EventMetadata {
1265                    event_id: Uuid::new_v4().to_string(),
1266                    timestamp: patch.timestamp,
1267                    source: "rdf_patch".to_string(),
1268                    user: None,
1269                    context: Some(patch.id.clone()),
1270                    caused_by: None,
1271                    version: "1.0".to_string(),
1272                    properties: HashMap::new(),
1273                    checksum: None,
1274                };
1275
1276                match op {
1277                    PatchOperation::Add {
1278                        subject,
1279                        predicate,
1280                        object,
1281                    } => Some(StreamEvent::TripleAdded {
1282                        subject: subject.clone(),
1283                        predicate: predicate.clone(),
1284                        object: object.clone(),
1285                        graph: None,
1286                        metadata,
1287                    }),
1288                    PatchOperation::Delete {
1289                        subject,
1290                        predicate,
1291                        object,
1292                    } => Some(StreamEvent::TripleRemoved {
1293                        subject: subject.clone(),
1294                        predicate: predicate.clone(),
1295                        object: object.clone(),
1296                        graph: None,
1297                        metadata,
1298                    }),
1299                    PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
1300                        graph: graph.clone(),
1301                        metadata,
1302                    }),
1303                    PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
1304                        graph: graph.clone(),
1305                        metadata,
1306                    }),
1307                    PatchOperation::AddPrefix { .. } => {
1308                        // Skip prefix operations for now
1309                        None
1310                    }
1311                    PatchOperation::DeletePrefix { .. } => {
1312                        // Skip prefix operations for now
1313                        None
1314                    }
1315                    PatchOperation::TransactionBegin { .. } => {
1316                        Some(StreamEvent::TransactionBegin {
1317                            transaction_id: patch
1318                                .transaction_id
1319                                .clone()
1320                                .unwrap_or_else(|| Uuid::new_v4().to_string()),
1321                            isolation_level: None,
1322                            metadata,
1323                        })
1324                    }
1325                    PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
1326                        transaction_id: patch
1327                            .transaction_id
1328                            .clone()
1329                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
1330                        metadata,
1331                    }),
1332                    PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
1333                        transaction_id: patch
1334                            .transaction_id
1335                            .clone()
1336                            .unwrap_or_else(|| Uuid::new_v4().to_string()),
1337                        metadata,
1338                    }),
1339                    PatchOperation::Header { .. } => {
1340                        // Skip header operations for now
1341                        None
1342                    }
1343                }
1344            })
1345            .collect();
1346
1347        self.publish_batch(events).await
1348    }
1349
1350    /// Get producer statistics
1351    pub async fn get_stats(&self) -> ProducerStats {
1352        self.stats.read().await.clone()
1353    }
1354
1355    /// Get producer health status
1356    pub async fn health_check(&self) -> bool {
1357        if let Some(cb) = &self.circuit_breaker {
1358            cb.is_healthy().await
1359        } else {
1360            true
1361        }
1362    }
1363}
1364
/// Enhanced stream consumer for receiving RDF changes with backend support
pub struct StreamConsumer {
    /// Configuration the consumer was created from.
    _config: StreamConfig,
    /// Backend-specific consumer selected at construction time.
    backend_consumer: BackendConsumer,
    /// Shared consumer statistics.
    stats: Arc<RwLock<ConsumerStats>>,
    /// Circuit breaker; `None` when `config.circuit_breaker.enabled` is false.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// NOTE(review): presumably the time of the most recent poll — confirm
    /// against the consume path, which is not visible here.
    last_poll: Instant,
    /// NOTE(review): underscore-prefixed; presumably reserved for buffered
    /// events — confirm whether anything reads it.
    _message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Consumer group passed to `new_with_group`, if any.
    consumer_group: Option<String>,
}
1375
/// Backend-agnostic consumer wrapper
///
/// One variant per supported backend. Every variant except `Memory` is compiled
/// only when the matching Cargo feature is enabled.
enum BackendConsumer {
    /// Kafka consumer (feature `kafka`).
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaConsumer),
    /// NATS consumer, boxed (feature `nats`).
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsConsumer>),
    /// Redis consumer (feature `redis`).
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisConsumer),
    /// Kinesis consumer (feature `kinesis`).
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisConsumer),
    /// Pulsar consumer, boxed (feature `pulsar`).
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarConsumer>),
    /// RabbitMQ consumer, boxed (feature `rabbitmq`).
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
    /// In-memory consumer; always available.
    Memory(MemoryConsumer),
}
1392
/// Consumer statistics for monitoring
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
    /// Number of events successfully consumed.
    events_consumed: u64,
    /// NOTE(review): not written in the code visible here; presumably counts
    /// failed consume attempts — confirm.
    events_failed: u64,
    /// NOTE(review): underscore-prefixed and not written in the code visible here.
    _bytes_received: u64,
    /// Processing-time estimate in milliseconds, refreshed on every consumed event.
    avg_processing_time_ms: f64,
    /// Largest processing time observed so far, in milliseconds.
    max_processing_time_ms: u64,
    /// NOTE(review): underscore-prefixed and not written in the code visible here.
    _consumer_lag: u64,
    /// NOTE(review): not written in the code visible here; presumably mirrors
    /// the producer's circuit-breaker trip counter — confirm.
    circuit_breaker_trips: u64,
    /// Wall-clock time (UTC) at which the most recent event was consumed, if any.
    last_message: Option<DateTime<Utc>>,
    /// Lower-case backend label such as `"memory"`.
    backend_type: String,
    /// NOTE(review): underscore-prefixed and not written in the code visible here.
    _batch_size: usize,
}
1407
/// Memory-based consumer for testing and development
struct MemoryConsumer {
    /// Backend the events are read from (a `MemoryBackend` in both constructors).
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Name of the topic to read from.
    topic: String,
    /// Offset requested on the next `consume` call; starts at 0.
    current_offset: u64,
    /// Local consume counters for this consumer.
    stats: ConsumerStats,
}
1415
1416impl MemoryConsumer {
1417    fn _new() -> Self {
1418        Self {
1419            backend: Box::new(backend::memory::MemoryBackend::new()),
1420            topic: "oxirs-stream".to_string(),
1421            current_offset: 0,
1422            stats: ConsumerStats {
1423                backend_type: "memory".to_string(),
1424                ..Default::default()
1425            },
1426        }
1427    }
1428
1429    fn with_topic(topic: String) -> Self {
1430        Self {
1431            backend: Box::new(backend::memory::MemoryBackend::new()),
1432            topic,
1433            current_offset: 0,
1434            stats: ConsumerStats {
1435                backend_type: "memory".to_string(),
1436                ..Default::default()
1437            },
1438        }
1439    }
1440
1441    async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1442        let start_time = Instant::now();
1443
1444        // Use the backend implementation
1445        self.backend
1446            .as_mut()
1447            .connect()
1448            .await
1449            .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
1450
1451        // Try to receive events from the backend
1452        let topic_name = crate::types::TopicName::new(self.topic.clone());
1453        let events = self
1454            .backend
1455            .receive_events(
1456                &topic_name,
1457                None, // No consumer group for now
1458                crate::types::StreamPosition::Offset(self.current_offset),
1459                1, // Get one event at a time
1460            )
1461            .await
1462            .map_err(|e| anyhow!("Receive events failed: {}", e))?;
1463
1464        if let Some((event, offset)) = events.first() {
1465            self.current_offset = offset.value() + 1;
1466
1467            // Update stats
1468            self.stats.events_consumed += 1;
1469            let processing_time = start_time.elapsed().as_millis() as u64;
1470            self.stats.max_processing_time_ms =
1471                self.stats.max_processing_time_ms.max(processing_time);
1472            self.stats.avg_processing_time_ms =
1473                (self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
1474            self.stats.last_message = Some(Utc::now());
1475
1476            debug!("Memory consumer: consumed event via backend");
1477            Ok(Some(event.clone()))
1478        } else {
1479            debug!("Memory consumer: no events available");
1480            Ok(None)
1481        }
1482    }
1483
1484    fn _get_stats(&self) -> &ConsumerStats {
1485        &self.stats
1486    }
1487
1488    /// Reset consumer position for testing
1489    fn reset(&mut self) {
1490        self.current_offset = 0;
1491    }
1492}
1493
1494impl StreamConsumer {
1495    /// Create a new enhanced stream consumer with backend support
1496    pub async fn new(config: StreamConfig) -> Result<Self> {
1497        Self::new_with_group(config, None).await
1498    }
1499
1500    /// Create a new stream consumer with a specific consumer group
1501    pub async fn new_with_group(
1502        config: StreamConfig,
1503        consumer_group: Option<String>,
1504    ) -> Result<Self> {
1505        // Initialize circuit breaker if enabled
1506        let circuit_breaker = if config.circuit_breaker.enabled {
1507            Some(circuit_breaker::new_shared_circuit_breaker(
1508                circuit_breaker::CircuitBreakerConfig {
1509                    enabled: config.circuit_breaker.enabled,
1510                    failure_threshold: config.circuit_breaker.failure_threshold,
1511                    success_threshold: config.circuit_breaker.success_threshold,
1512                    timeout: config.circuit_breaker.timeout,
1513                    half_open_max_calls: config.circuit_breaker.half_open_max_calls,
1514                    ..Default::default()
1515                },
1516            ))
1517        } else {
1518            None
1519        };
1520
1521        // Initialize backend-specific consumer
1522        let backend_consumer = match &config.backend {
1523            #[cfg(feature = "kafka")]
1524            StreamBackendType::Kafka {
1525                brokers,
1526                security_protocol,
1527                sasl_config,
1528            } => {
1529                let stream_config = crate::StreamConfig {
1530                    backend: crate::StreamBackendType::Kafka {
1531                        brokers: brokers.clone(),
1532                        security_protocol: security_protocol.clone(),
1533                        sasl_config: sasl_config.clone(),
1534                    },
1535                    topic: config.topic.clone(),
1536                    batch_size: config.batch_size,
1537                    flush_interval_ms: config.flush_interval_ms,
1538                    max_connections: config.max_connections,
1539                    connection_timeout: config.connection_timeout,
1540                    enable_compression: config.enable_compression,
1541                    compression_type: config.compression_type.clone(),
1542                    retry_config: config.retry_config.clone(),
1543                    circuit_breaker: config.circuit_breaker.clone(),
1544                    security: config.security.clone(),
1545                    performance: config.performance.clone(),
1546                    monitoring: config.monitoring.clone(),
1547                };
1548
1549                let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
1550                consumer.connect().await?;
1551                BackendConsumer::Kafka(consumer)
1552            }
1553            #[cfg(feature = "nats")]
1554            StreamBackendType::Nats {
1555                url,
1556                cluster_urls,
1557                jetstream_config,
1558            } => {
1559                let stream_config = crate::StreamConfig {
1560                    backend: crate::StreamBackendType::Nats {
1561                        url: url.clone(),
1562                        cluster_urls: cluster_urls.clone(),
1563                        jetstream_config: jetstream_config.clone(),
1564                    },
1565                    topic: config.topic.clone(),
1566                    batch_size: config.batch_size,
1567                    flush_interval_ms: config.flush_interval_ms,
1568                    max_connections: config.max_connections,
1569                    connection_timeout: config.connection_timeout,
1570                    enable_compression: config.enable_compression,
1571                    compression_type: config.compression_type.clone(),
1572                    retry_config: config.retry_config.clone(),
1573                    circuit_breaker: config.circuit_breaker.clone(),
1574                    security: config.security.clone(),
1575                    performance: config.performance.clone(),
1576                    monitoring: config.monitoring.clone(),
1577                };
1578
1579                let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
1580                consumer.connect().await?;
1581                BackendConsumer::Nats(Box::new(consumer))
1582            }
1583            #[cfg(feature = "redis")]
1584            StreamBackendType::Redis {
1585                url,
1586                cluster_urls,
1587                pool_size,
1588            } => {
1589                let stream_config = crate::StreamConfig {
1590                    backend: crate::StreamBackendType::Redis {
1591                        url: url.clone(),
1592                        cluster_urls: cluster_urls.clone(),
1593                        pool_size: *pool_size,
1594                    },
1595                    topic: config.topic.clone(),
1596                    batch_size: config.batch_size,
1597                    flush_interval_ms: config.flush_interval_ms,
1598                    max_connections: config.max_connections,
1599                    connection_timeout: config.connection_timeout,
1600                    enable_compression: config.enable_compression,
1601                    compression_type: config.compression_type.clone(),
1602                    retry_config: config.retry_config.clone(),
1603                    circuit_breaker: config.circuit_breaker.clone(),
1604                    security: config.security.clone(),
1605                    performance: config.performance.clone(),
1606                    monitoring: config.monitoring.clone(),
1607                };
1608
1609                let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
1610                consumer.connect().await?;
1611                BackendConsumer::Redis(consumer)
1612            }
1613            #[cfg(feature = "kinesis")]
1614            StreamBackendType::Kinesis {
1615                region,
1616                stream_name,
1617                credentials,
1618            } => {
1619                let stream_config = crate::StreamConfig {
1620                    backend: crate::StreamBackendType::Kinesis {
1621                        region: region.clone(),
1622                        stream_name: stream_name.clone(),
1623                        credentials: credentials.clone(),
1624                    },
1625                    topic: config.topic.clone(),
1626                    batch_size: config.batch_size,
1627                    flush_interval_ms: config.flush_interval_ms,
1628                    max_connections: config.max_connections,
1629                    connection_timeout: config.connection_timeout,
1630                    enable_compression: config.enable_compression,
1631                    compression_type: config.compression_type.clone(),
1632                    retry_config: config.retry_config.clone(),
1633                    circuit_breaker: config.circuit_breaker.clone(),
1634                    security: config.security.clone(),
1635                    performance: config.performance.clone(),
1636                    monitoring: config.monitoring.clone(),
1637                };
1638
1639                let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
1640                consumer.connect().await?;
1641                BackendConsumer::Kinesis(consumer)
1642            }
1643            #[cfg(feature = "pulsar")]
1644            StreamBackendType::Pulsar {
1645                service_url,
1646                auth_config,
1647            } => {
1648                let stream_config = crate::StreamConfig {
1649                    backend: crate::StreamBackendType::Pulsar {
1650                        service_url: service_url.clone(),
1651                        auth_config: auth_config.clone(),
1652                    },
1653                    topic: config.topic.clone(),
1654                    batch_size: config.batch_size,
1655                    flush_interval_ms: config.flush_interval_ms,
1656                    max_connections: config.max_connections,
1657                    connection_timeout: config.connection_timeout,
1658                    enable_compression: config.enable_compression,
1659                    compression_type: config.compression_type.clone(),
1660                    retry_config: config.retry_config.clone(),
1661                    circuit_breaker: config.circuit_breaker.clone(),
1662                    security: config.security.clone(),
1663                    performance: config.performance.clone(),
1664                    monitoring: config.monitoring.clone(),
1665                };
1666
1667                let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
1668                consumer.connect().await?;
1669                BackendConsumer::Pulsar(Box::new(consumer))
1670            }
1671            #[cfg(feature = "rabbitmq")]
1672            StreamBackendType::RabbitMQ {
1673                url,
1674                exchange,
1675                queue,
1676            } => {
1677                let stream_config = crate::StreamConfig {
1678                    backend: crate::StreamBackendType::RabbitMQ {
1679                        url: url.clone(),
1680                        exchange: exchange.clone(),
1681                        queue: queue.clone(),
1682                    },
1683                    topic: config.topic.clone(),
1684                    batch_size: config.batch_size,
1685                    flush_interval_ms: config.flush_interval_ms,
1686                    max_connections: config.max_connections,
1687                    connection_timeout: config.connection_timeout,
1688                    enable_compression: config.enable_compression,
1689                    compression_type: config.compression_type.clone(),
1690                    retry_config: config.retry_config.clone(),
1691                    circuit_breaker: config.circuit_breaker.clone(),
1692                    security: config.security.clone(),
1693                    performance: config.performance.clone(),
1694                    monitoring: config.monitoring.clone(),
1695                };
1696
1697                let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
1698                consumer.connect().await?;
1699                BackendConsumer::RabbitMQ(Box::new(consumer))
1700            }
1701            StreamBackendType::Memory {
1702                max_size: _,
1703                persistence: _,
1704            } => BackendConsumer::Memory(MemoryConsumer::with_topic(config.topic.clone())),
1705        };
1706
1707        let stats = Arc::new(RwLock::new(ConsumerStats {
1708            backend_type: match backend_consumer {
1709                #[cfg(feature = "kafka")]
1710                BackendConsumer::Kafka(_) => "kafka".to_string(),
1711                #[cfg(feature = "nats")]
1712                BackendConsumer::Nats(_) => "nats".to_string(),
1713                #[cfg(feature = "redis")]
1714                BackendConsumer::Redis(_) => "redis".to_string(),
1715                #[cfg(feature = "kinesis")]
1716                BackendConsumer::Kinesis(_) => "kinesis".to_string(),
1717                #[cfg(feature = "pulsar")]
1718                BackendConsumer::Pulsar(_) => "pulsar".to_string(),
1719                #[cfg(feature = "rabbitmq")]
1720                BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
1721                BackendConsumer::Memory(_) => "memory".to_string(),
1722            },
1723            _batch_size: config.batch_size,
1724            ..Default::default()
1725        }));
1726
1727        info!(
1728            "Created stream consumer with backend: {} and group: {:?}",
1729            stats.read().await.backend_type,
1730            consumer_group
1731        );
1732
1733        Ok(Self {
1734            _config: config,
1735            backend_consumer,
1736            stats,
1737            circuit_breaker,
1738            last_poll: Instant::now(),
1739            _message_buffer: Arc::new(RwLock::new(Vec::new())),
1740            consumer_group,
1741        })
1742    }
1743
1744    /// Consume stream events with circuit breaker protection
1745    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1746        let start_time = Instant::now();
1747
1748        // Check circuit breaker if enabled
1749        if let Some(cb) = &self.circuit_breaker {
1750            if !cb.can_execute().await {
1751                self.stats.write().await.circuit_breaker_trips += 1;
1752                return Err(anyhow!("Circuit breaker is open - cannot consume events"));
1753            }
1754        }
1755
1756        // Consume from backend
1757        let result = self.consume_single_event().await;
1758
1759        // Update circuit breaker and stats
1760        match &result {
1761            Ok(Some(_)) => {
1762                if let Some(cb) = &self.circuit_breaker {
1763                    cb.record_success_with_duration(start_time.elapsed()).await;
1764                }
1765
1766                let mut stats = self.stats.write().await;
1767                stats.events_consumed += 1;
1768                let processing_time = start_time.elapsed().as_millis() as u64;
1769                stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
1770                stats.avg_processing_time_ms =
1771                    (stats.avg_processing_time_ms + processing_time as f64) / 2.0;
1772                stats.last_message = Some(Utc::now());
1773            }
1774            Ok(None) => {
1775                // No message available, not an error
1776                if let Some(cb) = &self.circuit_breaker {
1777                    cb.record_success_with_duration(start_time.elapsed()).await;
1778                }
1779            }
1780            Err(_) => {
1781                if let Some(cb) = &self.circuit_breaker {
1782                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
1783                        .await;
1784                }
1785
1786                self.stats.write().await.events_failed += 1;
1787            }
1788        }
1789
1790        self.last_poll = Instant::now();
1791        result
1792    }
1793
1794    /// Consume a single event from the backend
1795    async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
1796        match &mut self.backend_consumer {
1797            #[cfg(feature = "kafka")]
1798            BackendConsumer::Kafka(consumer) => consumer.consume().await,
1799            #[cfg(feature = "nats")]
1800            BackendConsumer::Nats(consumer) => consumer.consume().await,
1801            #[cfg(feature = "redis")]
1802            BackendConsumer::Redis(consumer) => consumer.consume().await,
1803            #[cfg(feature = "kinesis")]
1804            BackendConsumer::Kinesis(consumer) => consumer.consume().await,
1805            #[cfg(feature = "pulsar")]
1806            BackendConsumer::Pulsar(consumer) => consumer.consume().await,
1807            #[cfg(feature = "rabbitmq")]
1808            BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
1809            BackendConsumer::Memory(consumer) => consumer.consume().await,
1810        }
1811    }
1812
1813    /// Consume multiple events as a batch
1814    pub async fn consume_batch(
1815        &mut self,
1816        max_events: usize,
1817        timeout: Duration,
1818    ) -> Result<Vec<StreamEvent>> {
1819        let mut events = Vec::new();
1820        let start_time = Instant::now();
1821
1822        while events.len() < max_events && start_time.elapsed() < timeout {
1823            match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
1824                Ok(Ok(Some(event))) => events.push(event),
1825                Ok(Ok(None)) => continue,
1826                Ok(Err(e)) => return Err(e),
1827                Err(_) => break, // Timeout
1828            }
1829        }
1830
1831        if !events.is_empty() {
1832            debug!(
1833                "Consumed batch of {} events in {:?}",
1834                events.len(),
1835                start_time.elapsed()
1836            );
1837        }
1838
1839        Ok(events)
1840    }
1841
1842    /// Start consuming events with a callback function
1843    pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
1844    where
1845        F: FnMut(StreamEvent) -> Result<()> + Send,
1846    {
1847        info!("Starting stream consumer loop");
1848
1849        loop {
1850            match self.consume().await {
1851                Ok(Some(event)) => {
1852                    if let Err(e) = callback(event) {
1853                        error!("Callback error: {}", e);
1854                        self.stats.write().await.events_failed += 1;
1855                    }
1856                }
1857                Ok(None) => {
1858                    // No message, wait a bit
1859                    tokio::time::sleep(Duration::from_millis(10)).await;
1860                }
1861                Err(e) => {
1862                    error!("Consumer error: {}", e);
1863                    tokio::time::sleep(Duration::from_millis(100)).await;
1864                }
1865            }
1866        }
1867    }
1868
1869    /// Start consuming events with an async callback function
1870    pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
1871    where
1872        F: FnMut(StreamEvent) -> Fut + Send,
1873        Fut: std::future::Future<Output = Result<()>> + Send,
1874    {
1875        info!("Starting async stream consumer loop");
1876
1877        loop {
1878            match self.consume().await {
1879                Ok(Some(event)) => {
1880                    if let Err(e) = callback(event).await {
1881                        error!("Async callback error: {}", e);
1882                        self.stats.write().await.events_failed += 1;
1883                    }
1884                }
1885                Ok(None) => {
1886                    // No message, wait a bit
1887                    tokio::time::sleep(Duration::from_millis(10)).await;
1888                }
1889                Err(e) => {
1890                    error!("Consumer error: {}", e);
1891                    tokio::time::sleep(Duration::from_millis(100)).await;
1892                }
1893            }
1894        }
1895    }
1896
    /// Get consumer statistics.
    ///
    /// Takes the read lock on the shared statistics and returns a cloned snapshot, so the
    /// returned value does not change when the consumer keeps running.
    pub async fn get_stats(&self) -> ConsumerStats {
        self.stats.read().await.clone()
    }
1901
1902    /// Get consumer health status
1903    pub async fn health_check(&self) -> bool {
1904        if let Some(cb) = &self.circuit_breaker {
1905            cb.is_healthy().await
1906        } else {
1907            true
1908        }
1909    }
1910
    /// Get the consumer group name if any.
    ///
    /// Returns `None` when the consumer was created without a group (for example through
    /// `new`, which passes `None`).
    pub fn consumer_group(&self) -> Option<&String> {
        self.consumer_group.as_ref()
    }
1915
1916    /// Reset consumer position (for testing with memory backend)
1917    pub async fn reset_position(&mut self) -> Result<()> {
1918        match &mut self.backend_consumer {
1919            BackendConsumer::Memory(consumer) => {
1920                consumer.reset();
1921                Ok(())
1922            }
1923            #[cfg(feature = "kafka")]
1924            BackendConsumer::Kafka(_) => {
1925                Err(anyhow!("Reset position not supported for Kafka backend"))
1926            }
1927            #[cfg(feature = "nats")]
1928            BackendConsumer::Nats(_) => {
1929                Err(anyhow!("Reset position not supported for NATS backend"))
1930            }
1931            #[cfg(feature = "redis")]
1932            BackendConsumer::Redis(_) => {
1933                Err(anyhow!("Reset position not supported for Redis backend"))
1934            }
1935            #[cfg(feature = "kinesis")]
1936            BackendConsumer::Kinesis(_) => {
1937                Err(anyhow!("Reset position not supported for Kinesis backend"))
1938            }
1939            #[cfg(feature = "pulsar")]
1940            BackendConsumer::Pulsar(_) => {
1941                Err(anyhow!("Reset position not supported for Pulsar backend"))
1942            }
1943            #[cfg(feature = "rabbitmq")]
1944            BackendConsumer::RabbitMQ(_) => {
1945                Err(anyhow!("Reset position not supported for RabbitMQ backend"))
1946            }
1947        }
1948    }
1949
    /// Set test events for memory backend (for testing) - deprecated with backend implementation.
    ///
    /// The `_events` argument is ignored.
    ///
    /// # Errors
    /// Always returns an error telling the caller to publish events through the producer.
    pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
        // This method is deprecated with the new backend implementation.
        // Events should be published through the producer instead.
        Err(anyhow!("set_test_events is deprecated with backend implementation - use producer to publish events"))
    }
1956}
1957
/// RDF patch operations with full protocol support.
///
/// Each variant is one entry of an [`RdfPatch`]; the operation code it corresponds to is
/// noted in the variant's documentation. All payload values are carried as plain
/// `String`s.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
    /// Add a triple (A operation)
    Add {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Delete a triple (D operation)
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Add a graph (GA operation)
    AddGraph { graph: String },
    /// Delete a graph (GD operation)
    DeleteGraph { graph: String },
    /// Add a prefix (PA operation)
    AddPrefix { prefix: String, namespace: String },
    /// Delete a prefix (PD operation)
    DeletePrefix { prefix: String },
    /// Transaction begin (TX operation); the transaction identifier is optional
    TransactionBegin { transaction_id: Option<String> },
    /// Transaction commit (TC operation)
    TransactionCommit,
    /// Transaction abort (TA operation)
    TransactionAbort,
    /// Header information (H operation), carried as a key/value pair
    Header { key: String, value: String },
}
1990
/// RDF patch for atomic updates with full protocol support
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
    /// Operations of the patch; `add_operation` appends to the end of this list
    pub operations: Vec<PatchOperation>,
    /// Patch timestamp (UTC); `RdfPatch::new` sets it to the creation time
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Patch identifier; `RdfPatch::new` assigns a freshly generated UUID v4 string
    pub id: String,
    /// Patch headers for metadata
    pub headers: HashMap<String, String>,
    /// Current transaction ID if in transaction
    pub transaction_id: Option<String>,
    /// Prefixes used in the patch
    pub prefixes: HashMap<String, String>,
}
2004
2005impl RdfPatch {
2006    /// Create a new RDF patch
2007    pub fn new() -> Self {
2008        Self {
2009            operations: Vec::new(),
2010            timestamp: chrono::Utc::now(),
2011            id: uuid::Uuid::new_v4().to_string(),
2012            headers: HashMap::new(),
2013            transaction_id: None,
2014            prefixes: HashMap::new(),
2015        }
2016    }
2017
2018    /// Add an operation to the patch
2019    pub fn add_operation(&mut self, operation: PatchOperation) {
2020        self.operations.push(operation);
2021    }
2022
2023    /// Serialize patch to RDF Patch format
2024    pub fn to_rdf_patch_format(&self) -> Result<String> {
2025        let serializer = crate::patch::PatchSerializer::new()
2026            .with_pretty_print(true)
2027            .with_metadata(true);
2028        serializer.serialize(self)
2029    }
2030
2031    /// Parse from RDF Patch format
2032    pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
2033        let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
2034        parser.parse(input)
2035    }
2036}
2037
impl Default for RdfPatch {
    /// Same as [`RdfPatch::new`]: an empty patch with a fresh id and timestamp.
    fn default() -> Self {
        Self::new()
    }
}
2043
// Default implementations for easier configuration
impl Default for StreamConfig {
    /// Default configuration: `Memory` backend (`max_size: Some(10000)`, no persistence),
    /// topic `"oxirs-stream"`, compression disabled, and the `Default` value of every
    /// nested configuration struct.
    fn default() -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: "oxirs-stream".to_string(),
            batch_size: 100,
            flush_interval_ms: 100,
            max_connections: 10,
            connection_timeout: Duration::from_secs(30),
            enable_compression: false,
            compression_type: CompressionType::None,
            retry_config: RetryConfig::default(),
            circuit_breaker: CircuitBreakerConfig::default(),
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig::default(),
            monitoring: MonitoringConfig::default(),
        }
    }
}
2067
impl Default for RetryConfig {
    /// Default retry settings: 3 retries, backoff starting at 100 ms with multiplier 2.0
    /// and a 30 s maximum, jitter enabled.
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            jitter: true,
        }
    }
}
2079
impl Default for CircuitBreakerConfig {
    /// Default circuit breaker settings: enabled, failure threshold 5, success threshold 3,
    /// 30 s timeout and at most 3 half-open calls.
    fn default() -> Self {
        Self {
            enabled: true,
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(30),
            half_open_max_calls: 3,
        }
    }
}
2091
impl Default for SecurityConfig {
    /// Default security settings: TLS disabled, certificate verification flag set, and no
    /// certificate paths or SASL configuration.
    fn default() -> Self {
        Self {
            enable_tls: false,
            verify_certificates: true,
            client_cert_path: None,
            client_key_path: None,
            ca_cert_path: None,
            sasl_config: None,
        }
    }
}
2104
impl Default for StreamPerformanceConfig {
    /// Default performance settings: batching and parallel processing on; pipelining,
    /// zero-copy and SIMD off; `buffer_size` 8192, `prefetch_count` 100 and no explicit
    /// worker thread count.
    fn default() -> Self {
        Self {
            enable_batching: true,
            enable_pipelining: false,
            buffer_size: 8192,
            prefetch_count: 100,
            enable_zero_copy: false,
            enable_simd: false,
            parallel_processing: true,
            worker_threads: None,
        }
    }
}
2119
impl Default for MonitoringConfig {
    /// Default monitoring settings: metrics and tracing on, profiling off, 60 s metrics
    /// interval, 30 s health-check interval, no Prometheus/Jaeger endpoint and log level
    /// `"info"`.
    fn default() -> Self {
        Self {
            enable_metrics: true,
            enable_tracing: true,
            metrics_interval: Duration::from_secs(60),
            health_check_interval: Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        }
    }
}
2134
2135/// Helper functions for creating common configurations
2136impl StreamConfig {
2137    /// Create a Redis configuration
2138    #[cfg(feature = "redis")]
2139    pub fn redis(url: String) -> Self {
2140        Self {
2141            backend: StreamBackendType::Redis {
2142                url,
2143                cluster_urls: None,
2144                pool_size: Some(10),
2145            },
2146            ..Default::default()
2147        }
2148    }
2149
2150    /// Create a Kinesis configuration
2151    #[cfg(feature = "kinesis")]
2152    pub fn kinesis(region: String, stream_name: String) -> Self {
2153        Self {
2154            backend: StreamBackendType::Kinesis {
2155                region,
2156                stream_name,
2157                credentials: None,
2158            },
2159            ..Default::default()
2160        }
2161    }
2162
2163    /// Create a memory configuration for testing
2164    pub fn memory() -> Self {
2165        Self {
2166            backend: StreamBackendType::Memory {
2167                max_size: Some(1000),
2168                persistence: false,
2169            },
2170            ..Default::default()
2171        }
2172    }
2173
2174    /// Enable high-performance configuration
2175    pub fn high_performance(mut self) -> Self {
2176        self.performance.enable_batching = true;
2177        self.performance.enable_pipelining = true;
2178        self.performance.parallel_processing = true;
2179        self.performance.buffer_size = 65536;
2180        self.performance.prefetch_count = 1000;
2181        self.batch_size = 1000;
2182        self.flush_interval_ms = 10;
2183        self
2184    }
2185
2186    /// Enable compression
2187    pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
2188        self.enable_compression = true;
2189        self.compression_type = compression_type;
2190        self
2191    }
2192
2193    /// Configure circuit breaker
2194    pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
2195        self.circuit_breaker.enabled = enabled;
2196        self.circuit_breaker.failure_threshold = failure_threshold;
2197        self
2198    }
2199
    /// Create a development configuration with memory backend and debug settings.
    ///
    /// Notable choices visible below: small batches (`batch_size: 10`), circuit breaker
    /// disabled, tracing disabled, metrics every 5 s and log level `"debug"`. Security and
    /// performance sections use their `Default` values.
    pub fn development(topic: &str) -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: topic.to_string(),
            batch_size: 10,
            flush_interval_ms: 100,
            max_connections: 5,
            connection_timeout: Duration::from_secs(10),
            enable_compression: false,
            compression_type: CompressionType::None,
            retry_config: RetryConfig {
                max_retries: 3,
                initial_backoff: Duration::from_millis(100),
                max_backoff: Duration::from_secs(5),
                backoff_multiplier: 2.0,
                jitter: true,
            },
            // Values are set, but the breaker is switched off for development.
            circuit_breaker: CircuitBreakerConfig {
                enabled: false,
                failure_threshold: 5,
                success_threshold: 2,
                timeout: Duration::from_secs(60),
                half_open_max_calls: 10,
            },
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig::default(),
            monitoring: MonitoringConfig {
                enable_metrics: true,
                enable_tracing: false,
                metrics_interval: Duration::from_secs(5),
                health_check_interval: Duration::from_secs(30),
                enable_profiling: false,
                prometheus_endpoint: None,
                jaeger_endpoint: None,
                log_level: "debug".to_string(),
            },
        }
    }
2242
    /// Create a production configuration with optimal performance settings.
    ///
    /// Notable choices visible below: large batches (`batch_size: 1000`) flushed every
    /// 10 ms, Zstd compression, circuit breaker enabled with a failure threshold of 10,
    /// and all performance switches turned on. Security uses its `Default` value.
    ///
    /// NOTE(review): the backend selected here is still `Memory` (with `persistence: true`);
    /// presumably callers replace `backend` for a real deployment — confirm.
    pub fn production(topic: &str) -> Self {
        Self {
            backend: StreamBackendType::Memory {
                max_size: Some(100000),
                persistence: true,
            },
            topic: topic.to_string(),
            batch_size: 1000,
            flush_interval_ms: 10,
            max_connections: 50,
            connection_timeout: Duration::from_secs(30),
            enable_compression: true,
            compression_type: CompressionType::Zstd,
            retry_config: RetryConfig {
                max_retries: 5,
                initial_backoff: Duration::from_millis(200),
                max_backoff: Duration::from_secs(30),
                backoff_multiplier: 2.0,
                jitter: true,
            },
            circuit_breaker: CircuitBreakerConfig {
                enabled: true,
                failure_threshold: 10,
                success_threshold: 3,
                timeout: Duration::from_secs(30),
                half_open_max_calls: 5,
            },
            security: SecurityConfig::default(),
            performance: StreamPerformanceConfig {
                enable_batching: true,
                enable_pipelining: true,
                parallel_processing: true,
                buffer_size: 65536,
                prefetch_count: 1000,
                enable_zero_copy: true,
                enable_simd: true,
                worker_threads: None,
            },
            monitoring: MonitoringConfig {
                enable_metrics: true,
                enable_tracing: true,
                metrics_interval: Duration::from_secs(1),
                health_check_interval: Duration::from_secs(10),
                enable_profiling: true,
                prometheus_endpoint: None,
                jaeger_endpoint: None,
                log_level: "info".to_string(),
            },
        }
    }
2294}
2295
2296/// Unified Stream interface that combines producer and consumer functionality
2297pub struct Stream {
2298    producer: StreamProducer,
2299    consumer: StreamConsumer,
2300}
2301
2302impl Stream {
2303    /// Create a new unified stream instance
2304    pub async fn new(config: StreamConfig) -> Result<Self> {
2305        // Note: Commented out automatic clearing for performance tests
2306        // Clear memory events if using memory backend (important for testing)
2307        // if matches!(config.backend, StreamBackendType::Memory { .. }) {
2308        //     clear_memory_events().await;
2309        // }
2310
2311        let producer = StreamProducer::new(config.clone()).await?;
2312        let consumer = StreamConsumer::new(config).await?;
2313
2314        Ok(Self { producer, consumer })
2315    }
2316
2317    /// Publish an event to the stream
2318    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
2319        self.producer.publish(event).await
2320    }
2321
2322    /// Consume an event from the stream
2323    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
2324        self.consumer.consume().await
2325    }
2326
2327    /// Flush any pending events
2328    pub async fn flush(&mut self) -> Result<()> {
2329        self.producer.flush().await
2330    }
2331
2332    /// Get producer statistics
2333    pub async fn producer_stats(&self) -> ProducerStats {
2334        self.producer.get_stats().await
2335    }
2336
2337    /// Get consumer statistics  
2338    pub async fn consumer_stats(&self) -> ConsumerStats {
2339        self.consumer.get_stats().await
2340    }
2341
2342    /// Close the stream and clean up resources
2343    pub async fn close(&mut self) -> Result<()> {
2344        // Flush any pending events
2345        self.producer.flush().await?;
2346
2347        // Close backend connections
2348        // Producer and consumer close operations would be handled by their Drop implementations
2349        debug!("Stream closed successfully");
2350        Ok(())
2351    }
2352
2353    /// Perform a health check on the stream
2354    pub async fn health_check(&self) -> Result<bool> {
2355        // Basic health check - verify that the stream is operational
2356        // This is a simplified implementation
2357        Ok(true)
2358    }
2359
2360    /// Begin a transaction (placeholder implementation)
2361    pub async fn begin_transaction(&mut self) -> Result<()> {
2362        // Placeholder implementation for transaction support
2363        debug!("Transaction begun (placeholder)");
2364        Ok(())
2365    }
2366
2367    /// Commit a transaction (placeholder implementation)
2368    pub async fn commit_transaction(&mut self) -> Result<()> {
2369        // Placeholder implementation for transaction support
2370        debug!("Transaction committed (placeholder)");
2371        Ok(())
2372    }
2373
2374    /// Rollback a transaction (placeholder implementation)
2375    pub async fn rollback_transaction(&mut self) -> Result<()> {
2376        // Placeholder implementation for transaction support
2377        debug!("Transaction rolled back (placeholder)");
2378        Ok(())
2379    }
2380}