1#![allow(dead_code)]
29
30use anyhow::{anyhow, Result};
31use chrono::{DateTime, Utc};
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio::sync::{RwLock, Semaphore};
37use tracing::{debug, error, info};
38use uuid::Uuid;
39
40use crate::backend::StreamBackend;
41
42pub use backend_optimizer::{
44 BackendOptimizer, BackendPerformance, BackendRecommendation, ConsistencyLevel, CostModel,
45 OptimizationDecision, OptimizationStats, OptimizerConfig, PatternType, WorkloadPattern,
46};
47pub use backpressure::{
48 BackpressureConfig, BackpressureController, BackpressureStats, BackpressureStrategy,
49 FlowControlSignal, RateLimiter as BackpressureRateLimiter,
50};
51pub use bridge::{
52 BridgeInfo, BridgeStatistics, BridgeType, ExternalMessage, ExternalSystemConfig,
53 ExternalSystemType, MessageBridgeManager, MessageTransformer, RoutingRule,
54};
55pub use circuit_breaker::{
56 CircuitBreakerError, CircuitBreakerMetrics, FailureType, SharedCircuitBreakerExt,
57};
58pub use connection_pool::{
59 ConnectionFactory, ConnectionPool, DetailedPoolMetrics, LoadBalancingStrategy, PoolConfig,
60 PoolStats, PoolStatus,
61};
62pub use cqrs::{
63 CQRSConfig, CQRSHealthStatus, CQRSSystem, Command, CommandBus, CommandBusMetrics,
64 CommandHandler, CommandResult, Query, QueryBus, QueryBusMetrics, QueryCacheConfig,
65 QueryHandler, QueryResult as CQRSQueryResult, ReadModelManager, ReadModelMetrics,
66 ReadModelProjection, RetryConfig as CQRSRetryConfig,
67};
68pub use delta::{BatchDeltaProcessor, DeltaComputer, DeltaProcessor, ProcessorStats};
69pub use dlq::{
70 DeadLetterQueue, DlqConfig, DlqEventProcessor, DlqStats as DlqStatsExport, FailedEvent,
71 FailureReason,
72};
73pub use event::{
74 EventCategory, EventMetadata, EventPriority, IsolationLevel, QueryResult as EventQueryResult,
75 SchemaChangeType, SchemaType, SparqlOperationType, StreamEvent,
76};
77pub use event_sourcing::{
78 EventQuery, EventSnapshot, EventStore, EventStoreConfig, PersistenceBackend, QueryOrder,
79 RetentionPolicy, SnapshotConfig, StoredEvent, TimeRange as EventSourcingTimeRange,
80};
81pub use failover::{ConnectionEndpoint, FailoverConfig, FailoverManager};
82pub use graphql_bridge::{
83 BridgeConfig, BridgeStats, GraphQLBridge, GraphQLSubscription, GraphQLUpdate,
84 GraphQLUpdateType, SubscriptionFilter,
85};
86pub use multi_region_replication::{
87 ConflictResolution, ConflictType, GeographicLocation, MultiRegionReplicationManager,
88 RegionConfig, RegionHealth, ReplicatedEvent, ReplicationConfig, ReplicationStats,
89 ReplicationStrategy, VectorClock,
90};
91pub use patch::{PatchParser, PatchSerializer};
92pub use performance_optimizer::{
93 AdaptiveBatcher, AggregationFunction, AutoTuner, BatchPerformancePoint, BatchSizePredictor,
94 BatchingStats, EnhancedMLConfig, MemoryPool, MemoryPoolStats,
95 PerformanceConfig as OptimizerPerformanceConfig, ProcessingResult, ProcessingStats,
96 ProcessingStatus, TuningDecision, ZeroCopyEvent,
97};
98pub use schema_registry::{
99 CompatibilityMode, ExternalRegistryConfig, RegistryAuth, SchemaDefinition, SchemaFormat,
100 SchemaRegistry, SchemaRegistryConfig, ValidationResult, ValidationStats,
101};
102pub use sparql_streaming::{
103 ContinuousQueryManager, QueryManagerConfig, QueryMetadata, QueryResultChannel,
104 QueryResultUpdate, UpdateType,
105};
106pub use store_integration::{
107 ChangeDetectionStrategy, ChangeNotification, RealtimeUpdateManager, StoreChangeDetector,
108 UpdateChannel, UpdateFilter, UpdateNotification,
109};
110
111pub use biological_computing::{
113 AminoAcid, BiologicalProcessingStats, BiologicalStreamProcessor, Cell, CellState,
114 CellularAutomaton, ComputationalFunction, DNASequence, EvolutionaryOptimizer, FunctionalDomain,
115 Individual, Nucleotide, ProteinStructure, SequenceMetadata,
116};
117pub use consciousness_streaming::{
118 ConsciousnessLevel, ConsciousnessStats, ConsciousnessStreamProcessor, DreamSequence,
119 EmotionalContext, IntuitiveEngine, MeditationState,
120};
121pub use disaster_recovery::{
122 BackupCompression, BackupConfig, BackupEncryption, BackupFrequency, BackupJob,
123 BackupRetentionPolicy, BackupSchedule, BackupStatus, BackupStorage, BackupType,
124 BackupVerification, BackupVerificationResult, BackupWindow, BusinessContinuityConfig,
125 ChecksumAlgorithm, CompressionAlgorithm, DRMetrics, DisasterRecoveryConfig,
126 DisasterRecoveryManager, DisasterScenario, EncryptionAlgorithm as BackupEncryptionAlgorithm,
127 FailoverConfig as DRFailoverConfig, ImpactLevel, KeyDerivationFunction, RecoveryConfig,
128 RecoveryOperation, RecoveryPriority, RecoveryRunbook, RecoveryStatus, RecoveryType,
129 ReplicationConfig as DRReplicationConfig, ReplicationMode as DRReplicationMode,
130 ReplicationTarget as DRReplicationTarget, RunbookExecution, RunbookExecutionStatus,
131 RunbookStep, StorageLocation,
132};
133pub use enterprise_audit::{
134 ActionResult, AuditEncryptionConfig, AuditEventType, AuditFilterConfig, AuditMetrics,
135 AuditRetentionConfig, AuditSeverity, AuditStorageBackend, AuditStorageConfig,
136 AuditStreamingConfig, AuthType, ComplianceConfig, ComplianceFinding, ComplianceReport,
137 ComplianceStandard, CompressionType as AuditCompressionType, DestinationAuth, DestinationType,
138 EncryptionAlgorithm, EnterpriseAuditConfig, EnterpriseAuditEvent, EnterpriseAuditLogger,
139 FindingType, KeyManagementConfig, KmsType, S3AuditConfig, StreamingDestination,
140};
141pub use enterprise_monitoring::{
142 Alert, AlertCondition, AlertManager, AlertRule, AlertSeverity as MonitoringAlertSeverity,
143 AlertingConfig, BreachNotificationConfig, ComparisonOperator, EnterpriseMonitoringConfig,
144 EnterpriseMonitoringSystem, EscalationLevel, EscalationPolicy, HealthCheckConfig,
145 HealthCheckEndpoint, HealthCheckType, MeasurementWindow, MetricDefinition, MetricType,
146 MetricValue, MetricsCollector, MetricsConfig, MetricsEndpoint, MetricsEndpointType,
147 MetricsExportConfig, MetricsFormat, NotificationChannel, ProfilingConfig, SlaBreach, SlaConfig,
148 SlaMeasurement, SlaMetricType, SlaObjective, SlaSeverity, SlaStatus, SlaTracker,
149};
150pub use multi_tenancy::{
151 IsolationMode, MultiTenancyConfig, MultiTenancyManager, MultiTenancyMetrics,
152 NamespaceResources, ResourceAllocationStrategy, ResourceType, ResourceUsage, Tenant,
153 TenantLifecycleConfig, TenantNamespace, TenantQuota, TenantStatus, TenantTier,
154};
155pub use observability::{
156 AlertConfig, AlertEvent, AlertSeverity, AlertType, BusinessMetrics, SpanLog, SpanStatus,
157 StreamObservability, StreamingMetrics, TelemetryConfig, TraceSpan,
158};
159pub use performance_utils::{
160 AdaptiveRateLimiter, IntelligentMemoryPool, IntelligentPrefetcher, ParallelStreamProcessor,
161 PerformanceUtilsConfig,
162};
163pub use quantum_communication::{
164 BellState, EntanglementDistribution, QuantumCommConfig, QuantumCommSystem,
165 QuantumOperation as QuantumCommOperation, QuantumSecurityProtocol,
166 QuantumState as QuantumCommState, Qubit,
167};
168pub use quantum_streaming::{
169 QuantumEvent, QuantumOperation, QuantumProcessingStats, QuantumState, QuantumStreamProcessor,
170};
171pub use reliability::{BulkReplayResult, DlqStats, ReplayStatus};
172pub use rsp::{
173 RspConfig, RspLanguage, RspProcessor, RspQuery, StreamClause, StreamDescriptor, Window,
174 WindowConfig, WindowSize, WindowStats, WindowType,
175};
176pub use security::{
177 AuditConfig, AuditLogEntry, AuditLogger, AuthConfig, AuthMethod, AuthenticationProvider,
178 AuthorizationProvider, AuthzConfig, Credentials, EncryptionConfig, Permission, RateLimitConfig,
179 RateLimiter, SecurityConfig as StreamSecurityConfig, SecurityContext, SecurityManager,
180 SecurityMetrics, SessionConfig, ThreatAlert, ThreatDetectionConfig, ThreatDetector,
181};
182pub use temporal_join::{
183 IntervalJoin, JoinResult, LateDataConfig, LateDataStrategy, TemporalJoin, TemporalJoinConfig,
184 TemporalJoinMetrics, TemporalJoinType, TemporalWindow, TimeSemantics, WatermarkConfig,
185 WatermarkStrategy,
186};
187pub use time_travel::{
188 AggregationType, TemporalAggregations, TemporalFilter, TemporalOrdering, TemporalProjection,
189 TemporalQuery, TemporalQueryResult, TemporalResultMetadata, TemporalStatistics, TimePoint,
190 TimeRange as TimeTravelTimeRange, TimeTravelConfig, TimeTravelEngine, TimeTravelMetrics,
191 TimelinePoint,
192};
193pub use tls_security::{
194 CertRotationConfig, CertificateConfig, CertificateFormat, CertificateInfo, CipherSuite,
195 ExpiryWarning, MutualTlsConfig, OcspConfig, RevocationCheckConfig, SessionResumptionConfig,
196 TlsConfig, TlsManager, TlsMetrics, TlsSessionInfo, TlsVersion,
197};
198pub use wasm_edge_computing::{
199 EdgeExecutionResult, EdgeLocation, OptimizationLevel, PerformanceProfile, PluginCapability,
200 PluginSchema, ProcessingSpecialization, ResourceMetrics, SecurityLevel, WasmEdgeConfig,
201 WasmEdgeProcessor, WasmPlugin, WasmProcessingResult, WasmProcessorStats, WasmResourceLimits,
202};
203pub use webhook::{
204 EventFilter as WebhookEventFilter, HttpMethod, RateLimit, RetryConfig as WebhookRetryConfig,
205 WebhookConfig, WebhookInfo, WebhookManager, WebhookMetadata, WebhookSecurity,
206 WebhookStatistics,
207};
208
209pub use custom_serialization::{
211 BenchmarkResults, BsonSerializer, CustomSerializer, FlexBuffersSerializer, IonSerializer,
212 RonSerializer, SerializerBenchmark, SerializerBenchmarkSuite, SerializerRegistry,
213 SerializerStats, ThriftSerializer,
214};
215pub use end_to_end_encryption::{
216 E2EEConfig, E2EEEncryptionAlgorithm, E2EEManager, E2EEStats, EncryptedMessage,
217 HomomorphicEncryption, KeyExchangeAlgorithm, KeyPair, KeyRotationConfig, MultiPartyConfig,
218 ZeroKnowledgeProof,
219};
220pub use gpu_acceleration::{
221 AggregationOp, GpuBackend, GpuBuffer, GpuConfig, GpuContext, GpuProcessorConfig, GpuStats,
222 GpuStreamProcessor,
223};
224pub use ml_integration::{
225 AnomalyDetectionAlgorithm, AnomalyDetectionConfig, AnomalyDetector, AnomalyResult,
226 AnomalyStats, FeatureConfig, FeatureExtractor, FeatureVector, MLIntegrationManager,
227 MLModelConfig, ModelMetrics, ModelType, OnlineLearningModel, PredictionResult,
228};
229pub use rate_limiting::{
230 QuotaCheckResult, QuotaEnforcementMode, QuotaLimits, QuotaManager, QuotaOperation,
231 RateLimitAlgorithm, RateLimitConfig as AdvancedRateLimitConfig, RateLimitMonitoringConfig,
232 RateLimitStats as AdvancedRateLimitStats, RateLimiter as AdvancedRateLimiter,
233 RejectionStrategy,
234};
235pub use scalability::{
236 AdaptiveBuffer, AutoScaler, LoadBalancingStrategy as ScalingLoadBalancingStrategy,
237 Node as ScalingNode, NodeHealth, Partition, PartitionManager, PartitionStrategy,
238 ResourceLimits, ResourceUsage as ScalingResourceUsage, ScalingConfig, ScalingDirection,
239 ScalingMode,
240};
241pub use schema_evolution::{
242 CompatibilityCheckResult, CompatibilityIssue, CompatibilityIssueType,
243 CompatibilityMode as SchemaCompatibilityMode, DeprecationInfo, EvolutionResult,
244 FieldDefinition, FieldType, IssueSeverity, MigrationRule, MigrationStrategy, SchemaChange,
245 SchemaDefinition as SchemaEvolutionDefinition, SchemaEvolutionManager,
246 SchemaFormat as SchemaEvolutionFormat, SchemaVersion,
247};
248pub use stream_replay::{
249 EventProcessor, ReplayCheckpoint, ReplayConfig, ReplayFilter, ReplayMode, ReplaySpeed,
250 ReplayStats, ReplayStatus as StreamReplayStatus, ReplayTransformation, StateSnapshot,
251 StreamReplayManager, TransformationType,
252};
253pub use transactional_processing::{
254 IsolationLevel as TransactionalIsolationLevel, LogEntryType, TransactionCheckpoint,
255 TransactionLogEntry, TransactionMetadata, TransactionState, TransactionalConfig,
256 TransactionalProcessor, TransactionalStats,
257};
258pub use zero_copy::{
259 MemoryMappedBuffer, SharedRefBuffer, SimdBatchProcessor, SimdOperation, SplicedBuffer,
260 ZeroCopyBuffer, ZeroCopyConfig, ZeroCopyManager, ZeroCopyStats,
261};
262
263pub use numa_processing::{
265 CpuAffinityMode, HugePageSize, MemoryBandwidthMonitor, MemoryInterleavePolicy, NodeBufferStats,
266 NodeProcessorStats, NumaAllocationStrategy, NumaBuffer, NumaBufferPool, NumaBufferPoolConfig,
267 NumaBufferPoolStats, NumaConfig, NumaNode, NumaProcessorStats, NumaStreamProcessor,
268 NumaThreadPool, NumaThreadPoolStats, NumaTopology, NumaWorker, NumaWorkerStats,
269 WorkerDistributionStrategy,
270};
271pub use out_of_order::{
272 EmitStrategy, GapFillingStrategy, LateEventStrategy, OrderedEvent, OutOfOrderConfig,
273 OutOfOrderHandler, OutOfOrderHandlerBuilder, OutOfOrderStats, SequenceTracker, Watermark,
274};
275pub use performance_profiler::{
276 HistogramStats, LatencyHistogram, OperationTimer, PerformanceProfiler, PerformanceReport,
277 PerformanceSample, PerformanceWarning, ProfilerBuilder, ProfilerConfig, ProfilerStats,
278 Recommendation, RecommendationCategory, RecommendationEffort, RecommendationImpact, Span,
279 WarningSeverity, WarningThresholds, WarningType,
280};
281pub use stream_sql::{
282 AggregateFunction, BinaryOperator, ColumnDefinition, CreateStreamStatement, DataType,
283 Expression, FromClause, JoinType, Lexer, OrderByItem, Parser,
284 QueryResult as StreamSqlQueryResult, QueryType, ResultRow, SelectItem, SelectStatement,
285 SqlValue, StreamMetadata, StreamSqlConfig, StreamSqlEngine, StreamSqlStats, Token,
286 UnaryOperator, WindowSpec, WindowType as SqlWindowType,
287};
288pub use testing_framework::{
289 Assertion, AssertionType, CapturedEvent, EventGenerator, EventMatcher, GeneratorConfig,
290 GeneratorType, MockClock, PerformanceMetric, TestFixture, TestHarness, TestHarnessBuilder,
291 TestHarnessConfig, TestMetrics, TestReport, TestStatus,
292};
293
294pub use anomaly_detection::{
296 Anomaly, AnomalyAlert, AnomalyConfig, AnomalyDetector as AdaptiveAnomalyDetector,
297 AnomalySeverity, AnomalyStats as AdaptiveAnomalyStats, DetectorType, MultiDimensionalDetector,
298};
299pub use migration_tools::{
300 APIMapping, ConceptMapping, GeneratedFile, GeneratedFileType, ManualReviewItem,
301 MigrationConfig, MigrationError, MigrationReport, MigrationSuggestion, MigrationTool,
302 MigrationWarning, QuickStart, ReviewPriority, SourcePlatform, SuggestionCategory,
303};
304pub use online_learning::{
305 ABTestConfig, ABTestResult, DriftDetection, ModelCheckpoint,
306 ModelMetrics as OnlineModelMetrics, ModelType as OnlineModelType, OnlineLearningConfig,
307 OnlineLearningModel as StreamOnlineLearningModel, OnlineLearningStats, Prediction, Sample,
308 StreamFeatureExtractor,
309};
310pub use stream_versioning::{
311 Branch, BranchId, Change, ChangeType, Changeset, Snapshot, StreamVersioning, TimeTravelQuery,
312 TimeTravelTarget, VersionDiff, VersionId, VersionMetadata, VersionedEvent, VersioningConfig,
313 VersioningStats,
314};
315
316pub use automl_stream::{
318 Algorithm, AutoML, AutoMLConfig, AutoMLStats, HyperParameters, ModelPerformance, TaskType,
319 TrainedModel,
320};
321pub use feature_engineering::{
322 Feature, FeatureExtractionConfig, FeatureMetadata, FeaturePipeline, FeatureSet, FeatureStore,
323 FeatureTransform, FeatureValue, ImputationStrategy, PipelineStats,
324};
325pub use neural_architecture_search::{
326 ActivationType, Architecture, ArchitecturePerformance, LayerType, NASConfig, NASStats,
327 ObjectiveWeights, SearchSpace, SearchStrategy, NAS,
328};
329pub use predictive_analytics::{
330 AccuracyMetrics, ForecastAlgorithm, ForecastResult, ForecastingConfig, PredictiveAnalytics,
331 PredictiveStats, SeasonalityType, TrendDirection,
332};
333pub use reinforcement_learning::{
334 Action, Experience, RLAgent, RLAlgorithm, RLConfig, RLStats, State as RLState,
335};
336
337pub use utils::{
339 create_dev_stream, create_prod_stream, BatchProcessor, EventFilter, EventSampler,
340 SimpleRateLimiter, StreamMultiplexer, StreamStats,
341};
342
343pub use advanced_scirs2_optimization::{
345 AdvancedOptimizerConfig, AdvancedStreamOptimizer, MovingStats, OptimizerMetrics,
346};
347pub use cdc_processor::{
348 CdcConfig, CdcConnector, CdcEvent, CdcEventBuilder, CdcMetrics, CdcOperation, CdcProcessor,
349 CdcSource,
350};
351
352pub use adaptive_load_shedding::{
354 DropStrategy, LoadMetrics, LoadSheddingConfig, LoadSheddingManager, LoadSheddingStats,
355};
356
357pub use stream_fusion::{
359 FusableChain, FusedOperation, FusedType, FusionAnalysis, FusionConfig, FusionOptimizer,
360 FusionStats, Operation,
361};
362
363pub use cep_engine::{
365 CepAggregationFunction, CepConfig, CepEngine, CepMetrics, CepStatistics, CompleteMatch,
366 CorrelationFunction, CorrelationResult, CorrelationStats, DetectedPattern, DetectionAlgorithm,
367 DetectionStats, EnrichmentData, EnrichmentService, EnrichmentSource, EnrichmentSourceType,
368 EnrichmentStats, EventBuffer, EventCorrelator, EventPattern, FieldPredicate, PartialMatch,
369 PatternDetector, ProcessingRule, RuleAction, RuleCondition, RuleEngine, RuleExecutionStats,
370 State, StateMachine, TemporalOperator, TimestampedEvent,
371};
372
373pub use data_quality::{
375 AlertCondition as QualityAlertCondition, AlertManager as QualityAlertManager,
376 AlertRule as QualityAlertRule, AlertSeverity as QualityAlertSeverity,
377 AlertStats as QualityAlertStats, AlertType as QualityAlertType, AuditAction, AuditEntry,
378 AuditStats, AuditTrail, CleansingRule, CleansingStats, CorrectionType, DataCleanser,
379 DataCorrection, DataProfiler, DataQualityValidator, DuplicateDetector, DuplicateStats,
380 FailureSeverity, FieldProfile, OutlierMethod, ProfileStats, ProfiledEvent, QualityAlert,
381 QualityConfig, QualityDimension, QualityMetrics, QualityReport, QualityScorer, ScoringStats,
382 ValidationFailure, ValidationResult as QualityValidationResult, ValidationRule,
383};
384
385pub use advanced_sampling::{
387 AdvancedSamplingManager, BloomFilter, BloomFilterStats, CountMinSketch, CountMinSketchStats,
388 HyperLogLog, HyperLogLogStats, ReservoirSampler, ReservoirStats, SamplingConfig,
389 SamplingManagerStats, StratifiedSampler, StratifiedStats, TDigest, TDigestStats,
390};
391
392pub mod backend;
393pub mod backend_optimizer;
394pub mod backpressure;
395pub mod biological_computing;
396pub mod bridge;
397pub mod circuit_breaker;
398pub mod config;
399pub mod connection_pool;
400pub mod consciousness_streaming;
401pub mod consumer;
402pub mod cqels;
403pub mod cqrs;
404pub mod csparql;
405pub mod delta;
406pub mod diagnostics;
407pub mod disaster_recovery;
408pub mod dlq;
409pub mod enterprise_audit;
410pub mod enterprise_monitoring;
411pub mod error;
412pub mod event;
413pub mod event_sourcing;
414pub mod failover;
415pub mod graphql_bridge;
416pub mod graphql_subscriptions;
417pub mod health_monitor;
418pub mod join;
419pub mod monitoring;
420pub mod multi_region_replication;
421pub mod multi_tenancy;
422pub mod observability;
423pub mod patch;
424pub mod performance_optimizer;
425pub mod performance_utils;
426pub mod processing;
427pub mod producer;
428pub mod quantum_communication;
429pub mod quantum_processing;
430pub mod quantum_streaming;
431pub mod reconnect;
432pub mod reliability;
433pub mod rsp;
434pub mod schema_registry;
435pub mod security;
436pub mod serialization;
437pub mod sparql_streaming;
438pub mod state;
439pub mod store_integration;
440pub mod temporal_join;
441pub mod time_travel;
442pub mod tls_security;
443pub mod types;
444pub mod wasm_edge_computing;
445pub mod webhook;
446
447pub mod custom_serialization;
449pub mod end_to_end_encryption;
450pub mod gpu_acceleration;
451pub mod ml_integration;
452pub mod rate_limiting;
453pub mod scalability;
454pub mod schema_evolution;
455pub mod stream_replay;
456pub mod transactional_processing;
457pub mod zero_copy;
458
459pub mod numa_processing;
461pub mod out_of_order;
462pub mod performance_profiler;
463pub mod stream_sql;
464pub mod testing_framework;
465
466pub mod anomaly_detection;
468pub mod migration_tools;
469pub mod online_learning;
470pub mod stream_versioning;
471
472pub mod automl_stream;
474pub mod feature_engineering;
475pub mod neural_architecture_search;
476pub mod predictive_analytics;
477pub mod reinforcement_learning;
478
479pub mod utils;
481
482pub mod advanced_scirs2_optimization;
484pub mod cdc_processor;
485
486pub mod adaptive_load_shedding;
488
489pub mod stream_fusion;
491
492pub mod cep_engine;
494
495pub mod data_quality;
497
498pub mod advanced_sampling;
500
/// Top-level configuration shared by stream producers and consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Which messaging backend to use (Kafka, NATS, Redis, Memory, ...).
    pub backend: StreamBackendType,
    /// Topic (stream/queue name) that events are published to.
    pub topic: String,
    /// Number of events accumulated before a batch is emitted.
    pub batch_size: usize,
    /// Interval between automatic flushes, in milliseconds.
    pub flush_interval_ms: u64,
    /// Maximum number of concurrent backend connections.
    pub max_connections: usize,
    /// Timeout applied when establishing a backend connection.
    pub connection_timeout: Duration,
    /// Whether payload compression is enabled.
    pub enable_compression: bool,
    /// Codec used when `enable_compression` is true.
    pub compression_type: CompressionType,
    /// Retry/backoff policy for failed operations.
    pub retry_config: RetryConfig,
    /// Circuit-breaker settings guarding the backend.
    pub circuit_breaker: CircuitBreakerConfig,
    /// TLS/SASL security settings.
    pub security: SecurityConfig,
    /// Performance tuning knobs (batching, buffering, SIMD, ...).
    pub performance: StreamPerformanceConfig,
    /// Metrics, tracing and health-check settings.
    pub monitoring: MonitoringConfig,
}
527
/// Supported payload compression codecs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
    /// No compression.
    None,
    /// gzip (DEFLATE-based).
    Gzip,
    /// Google Snappy.
    Snappy,
    /// LZ4.
    Lz4,
    /// Zstandard.
    Zstd,
}
537
/// Exponential-backoff retry policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub initial_backoff: Duration,
    /// Upper bound on the backoff delay.
    pub max_backoff: Duration,
    /// Factor by which the backoff grows after each failed attempt.
    pub backoff_multiplier: f64,
    /// Whether to add random jitter to the delay.
    pub jitter: bool,
}
547
/// Circuit-breaker tuning; mirrored into `circuit_breaker::CircuitBreakerConfig`
/// when a producer is constructed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Master switch; when false no circuit breaker is created.
    pub enabled: bool,
    /// Consecutive failures needed to open the circuit.
    pub failure_threshold: u32,
    /// Successes needed (while half-open) to close the circuit again.
    pub success_threshold: u32,
    /// How long the circuit stays open before probing.
    pub timeout: Duration,
    /// Maximum calls allowed through while half-open.
    pub half_open_max_calls: u32,
}
557
/// Transport-level security settings (TLS and optional SASL).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Whether TLS is used for backend connections.
    pub enable_tls: bool,
    /// Whether to verify the server certificate chain.
    pub verify_certificates: bool,
    /// Path to the client certificate (mutual TLS), if any.
    pub client_cert_path: Option<String>,
    /// Path to the client private key (mutual TLS), if any.
    pub client_key_path: Option<String>,
    /// Path to a custom CA certificate bundle, if any.
    pub ca_cert_path: Option<String>,
    /// SASL authentication settings; `None` disables SASL.
    pub sasl_config: Option<SaslConfig>,
}
568
/// SASL credentials and mechanism selection.
///
/// NOTE(review): the password is stored in plain text and this struct derives
/// `Debug`/`Serialize` — take care not to log or persist it inadvertently.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaslConfig {
    /// SASL mechanism to negotiate.
    pub mechanism: SaslMechanism,
    /// Username for authentication.
    pub username: String,
    /// Password (or token, depending on mechanism).
    pub password: String,
}
576
/// Supported SASL authentication mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SaslMechanism {
    /// PLAIN (username/password in clear — pair with TLS).
    Plain,
    /// SCRAM-SHA-256.
    ScramSha256,
    /// SCRAM-SHA-512.
    ScramSha512,
    /// OAUTHBEARER token-based authentication.
    OAuthBearer,
}
585
/// Performance tuning knobs for the streaming pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamPerformanceConfig {
    /// Whether events are grouped into batches before sending.
    pub enable_batching: bool,
    /// Whether requests are pipelined to the backend.
    pub enable_pipelining: bool,
    /// Size of the internal event buffer.
    pub buffer_size: usize,
    /// Number of messages to prefetch on the consumer side.
    pub prefetch_count: u32,
    /// Whether zero-copy buffers are used where supported.
    pub enable_zero_copy: bool,
    /// Whether SIMD-accelerated processing is used where supported.
    pub enable_simd: bool,
    /// Whether events are processed on multiple workers.
    pub parallel_processing: bool,
    /// Worker thread count; `None` lets the runtime decide.
    pub worker_threads: Option<usize>,
}
598
/// Observability settings: metrics, tracing, health checks and profiling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Whether metric collection is enabled.
    pub enable_metrics: bool,
    /// Whether distributed tracing is enabled.
    pub enable_tracing: bool,
    /// How often metrics are collected/exported.
    pub metrics_interval: Duration,
    /// How often health checks run.
    pub health_check_interval: Duration,
    /// Whether runtime profiling is enabled.
    pub enable_profiling: bool,
    /// Prometheus scrape endpoint, if metrics are exposed over HTTP.
    pub prometheus_endpoint: Option<String>,
    /// Jaeger collector endpoint, if traces are exported.
    pub jaeger_endpoint: Option<String>,
    /// Log level name (e.g. "info", "debug").
    pub log_level: String,
}
611
/// Backend selection together with backend-specific connection settings.
///
/// Every variant except [`StreamBackendType::Memory`] is gated behind a cargo
/// feature, so the set of available variants depends on how the crate is built.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamBackendType {
    /// Apache Kafka cluster.
    #[cfg(feature = "kafka")]
    Kafka {
        /// Bootstrap broker addresses.
        brokers: Vec<String>,
        /// Kafka security protocol name (e.g. "SASL_SSL"), if any.
        security_protocol: Option<String>,
        /// SASL credentials, if authentication is required.
        sasl_config: Option<SaslConfig>,
    },
    /// NATS server, optionally with JetStream persistence.
    #[cfg(feature = "nats")]
    Nats {
        /// Primary server URL.
        url: String,
        /// Additional cluster URLs for failover, if any.
        cluster_urls: Option<Vec<String>>,
        /// JetStream settings; `None` uses core NATS.
        jetstream_config: Option<NatsJetStreamConfig>,
    },
    /// Redis Streams.
    #[cfg(feature = "redis")]
    Redis {
        /// Primary server URL.
        url: String,
        /// Cluster node URLs, if running against Redis Cluster.
        cluster_urls: Option<Vec<String>>,
        /// Connection pool size; `None` uses the backend default.
        pool_size: Option<usize>,
    },
    /// AWS Kinesis Data Streams.
    #[cfg(feature = "kinesis")]
    Kinesis {
        /// AWS region of the stream.
        region: String,
        /// Kinesis stream name.
        stream_name: String,
        /// Explicit credentials; `None` uses the ambient AWS credential chain.
        credentials: Option<AwsCredentials>,
    },
    /// Apache Pulsar.
    #[cfg(feature = "pulsar")]
    Pulsar {
        /// Pulsar service URL.
        service_url: String,
        /// Authentication settings, if required.
        auth_config: Option<PulsarAuthConfig>,
    },
    /// RabbitMQ (AMQP).
    #[cfg(feature = "rabbitmq")]
    RabbitMQ {
        /// AMQP connection URL.
        url: String,
        /// Exchange to publish to; `None` uses the backend default.
        exchange: Option<String>,
        /// Queue to bind/consume; `None` uses the backend default.
        queue: Option<String>,
    },
    /// In-process memory backend — always available; useful for tests and dev.
    Memory {
        /// Maximum number of retained events; `None` is unbounded.
        max_size: Option<usize>,
        /// Whether events survive across producer instances.
        persistence: bool,
    },
}
655
/// NATS JetStream-specific options.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsJetStreamConfig {
    /// JetStream domain, if the deployment uses one.
    pub domain: Option<String>,
    /// Custom API prefix, if the deployment uses one.
    pub api_prefix: Option<String>,
    /// Timeout for JetStream API calls.
    pub timeout: Duration,
}
663
/// Static AWS credentials for the Kinesis backend.
///
/// NOTE(review): secrets are held in plain text and the struct derives
/// `Debug`/`Serialize` — avoid logging or persisting it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsCredentials {
    /// AWS access key ID.
    pub access_key_id: String,
    /// AWS secret access key.
    pub secret_access_key: String,
    /// Session token for temporary credentials, if any.
    pub session_token: Option<String>,
    /// IAM role to assume, if any.
    pub role_arn: Option<String>,
}
672
/// Pulsar authentication settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PulsarAuthConfig {
    /// Authentication scheme to use.
    pub auth_method: PulsarAuthMethod,
    /// Scheme-specific parameters (token, key paths, ...), keyed by name.
    pub auth_params: HashMap<String, String>,
}
679
/// Supported Pulsar authentication schemes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PulsarAuthMethod {
    /// Static token.
    Token,
    /// JSON Web Token.
    Jwt,
    /// OAuth 2.0.
    Oauth2,
    /// TLS client-certificate authentication.
    Tls,
}
688
/// High-level event producer that dispatches to the configured backend and
/// tracks publishing statistics.
pub struct StreamProducer {
    /// Configuration this producer was built from.
    config: StreamConfig,
    /// Backend-specific producer implementation.
    backend_producer: BackendProducer,
    /// Shared, async-guarded publishing statistics.
    stats: Arc<RwLock<ProducerStats>>,
    /// Optional circuit breaker; `None` when disabled in the config.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// Time of the last flush (used for interval-based flushing).
    last_flush: Instant,
    /// Reserved buffer for events awaiting publication (currently unused).
    _pending_events: Arc<RwLock<Vec<StreamEvent>>>,
    /// Buffer of events accumulated for the next batch.
    batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Limits concurrent flush operations.
    flush_semaphore: Arc<Semaphore>,
}
700
/// Internal dispatch enum over the concrete backend producer implementations.
///
/// Larger producers are boxed to keep the enum itself small; feature-gated
/// variants mirror [`StreamBackendType`].
enum BackendProducer {
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaProducer),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsProducer>),
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisProducer),
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisProducer),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarProducer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
    /// Always-available in-process backend.
    Memory(MemoryProducer),
}
717
/// Accumulated publishing statistics for a producer.
#[derive(Debug, Default, Clone)]
pub struct ProducerStats {
    /// Total events successfully published.
    events_published: u64,
    /// Total events that failed to publish.
    events_failed: u64,
    /// Total payload bytes sent (currently unused).
    _bytes_sent: u64,
    /// Running average publish latency in milliseconds.
    avg_latency_ms: f64,
    /// Worst observed publish latency in milliseconds.
    max_latency_ms: u64,
    /// Number of batches emitted.
    batch_count: u64,
    /// Number of flush operations performed.
    flush_count: u64,
    /// Times the circuit breaker tripped open.
    circuit_breaker_trips: u64,
    /// Timestamp of the most recent successful publish.
    last_publish: Option<DateTime<Utc>>,
    /// Human-readable backend identifier (e.g. "memory").
    backend_type: String,
}
732
/// Timestamped events held by the global in-memory store.
type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
/// Shared, async-guarded handle to the in-memory event store.
type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;

/// Lazily-initialized, process-wide event store backing the `Memory` backend.
static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();
740
741pub fn get_memory_events() -> MemoryEventStore {
742 MEMORY_EVENTS
743 .get_or_init(|| Arc::new(RwLock::new(Vec::new())))
744 .clone()
745}
746
747pub async fn clear_memory_events() {
749 let events = get_memory_events();
750 events.write().await.clear();
751 backend::memory::clear_memory_storage().await;
753}
754
/// Minimal producer over the in-memory backend; used by the `Memory` variant
/// of [`BackendProducer`].
struct MemoryProducer {
    /// Underlying in-memory stream backend (boxed trait object).
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Topic events are published under.
    topic: String,
    /// Per-producer statistics, updated synchronously on each publish.
    stats: ProducerStats,
}
760
761impl MemoryProducer {
762 fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
763 Self {
764 backend: Box::new(backend::memory::MemoryBackend::new()),
765 topic: "oxirs-stream".to_string(), stats: ProducerStats {
767 backend_type: "memory".to_string(),
768 ..Default::default()
769 },
770 }
771 }
772
773 fn with_topic(topic: String) -> Self {
774 Self {
775 backend: Box::new(backend::memory::MemoryBackend::new()),
776 topic,
777 stats: ProducerStats {
778 backend_type: "memory".to_string(),
779 ..Default::default()
780 },
781 }
782 }
783
784 async fn publish(&mut self, event: StreamEvent) -> Result<()> {
785 let start_time = Instant::now();
786
787 self.backend
789 .as_mut()
790 .connect()
791 .await
792 .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
793
794 let topic_name = crate::types::TopicName::new(self.topic.clone());
796 self.backend
797 .create_topic(&topic_name, 1)
798 .await
799 .map_err(|e| anyhow!("Topic creation failed: {}", e))?;
800
801 self.backend
803 .send_event(&topic_name, event)
804 .await
805 .map_err(|e| anyhow!("Send event failed: {}", e))?;
806
807 self.stats.events_published += 1;
809 let latency = start_time.elapsed().as_millis() as u64;
810 self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
811 self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
812 self.stats.last_publish = Some(Utc::now());
813
814 debug!("Memory producer: published event via backend");
815 Ok(())
816 }
817
818 async fn flush(&mut self) -> Result<()> {
819 self.stats.flush_count += 1;
821 debug!("Memory producer: flush completed");
822 Ok(())
823 }
824
825 fn _get_stats(&self) -> &ProducerStats {
826 &self.stats
827 }
828}
829
830impl StreamProducer {
831 pub async fn new(config: StreamConfig) -> Result<Self> {
833 let circuit_breaker = if config.circuit_breaker.enabled {
835 Some(circuit_breaker::new_shared_circuit_breaker(
836 circuit_breaker::CircuitBreakerConfig {
837 enabled: config.circuit_breaker.enabled,
838 failure_threshold: config.circuit_breaker.failure_threshold,
839 success_threshold: config.circuit_breaker.success_threshold,
840 timeout: config.circuit_breaker.timeout,
841 half_open_max_calls: config.circuit_breaker.half_open_max_calls,
842 ..Default::default()
843 },
844 ))
845 } else {
846 None
847 };
848
849 let backend_producer = match &config.backend {
851 #[cfg(feature = "kafka")]
852 StreamBackendType::Kafka {
853 brokers,
854 security_protocol,
855 sasl_config,
856 } => {
857 let stream_config = crate::StreamConfig {
858 backend: crate::StreamBackendType::Kafka {
859 brokers: brokers.clone(),
860 security_protocol: security_protocol.clone(),
861 sasl_config: sasl_config.clone(),
862 },
863 topic: config.topic.clone(),
864 batch_size: config.batch_size,
865 flush_interval_ms: config.flush_interval_ms,
866 max_connections: config.max_connections,
867 connection_timeout: config.connection_timeout,
868 enable_compression: config.enable_compression,
869 compression_type: config.compression_type.clone(),
870 retry_config: config.retry_config.clone(),
871 circuit_breaker: config.circuit_breaker.clone(),
872 security: config.security.clone(),
873 performance: config.performance.clone(),
874 monitoring: config.monitoring.clone(),
875 };
876
877 let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
878 producer.connect().await?;
879 BackendProducer::Kafka(producer)
880 }
881 #[cfg(feature = "nats")]
882 StreamBackendType::Nats {
883 url,
884 cluster_urls,
885 jetstream_config,
886 } => {
887 let stream_config = crate::StreamConfig {
888 backend: crate::StreamBackendType::Nats {
889 url: url.clone(),
890 cluster_urls: cluster_urls.clone(),
891 jetstream_config: jetstream_config.clone(),
892 },
893 topic: config.topic.clone(),
894 batch_size: config.batch_size,
895 flush_interval_ms: config.flush_interval_ms,
896 max_connections: config.max_connections,
897 connection_timeout: config.connection_timeout,
898 enable_compression: config.enable_compression,
899 compression_type: config.compression_type.clone(),
900 retry_config: config.retry_config.clone(),
901 circuit_breaker: config.circuit_breaker.clone(),
902 security: config.security.clone(),
903 performance: config.performance.clone(),
904 monitoring: config.monitoring.clone(),
905 };
906
907 let mut producer = backend::nats::NatsProducer::new(stream_config)?;
908 producer.connect().await?;
909 BackendProducer::Nats(Box::new(producer))
910 }
911 #[cfg(feature = "redis")]
912 StreamBackendType::Redis {
913 url,
914 cluster_urls,
915 pool_size,
916 } => {
917 let stream_config = crate::StreamConfig {
918 backend: crate::StreamBackendType::Redis {
919 url: url.clone(),
920 cluster_urls: cluster_urls.clone(),
921 pool_size: *pool_size,
922 },
923 topic: config.topic.clone(),
924 batch_size: config.batch_size,
925 flush_interval_ms: config.flush_interval_ms,
926 max_connections: config.max_connections,
927 connection_timeout: config.connection_timeout,
928 enable_compression: config.enable_compression,
929 compression_type: config.compression_type.clone(),
930 retry_config: config.retry_config.clone(),
931 circuit_breaker: config.circuit_breaker.clone(),
932 security: config.security.clone(),
933 performance: config.performance.clone(),
934 monitoring: config.monitoring.clone(),
935 };
936
937 let mut producer = backend::redis::RedisProducer::new(stream_config)?;
938 producer.connect().await?;
939 BackendProducer::Redis(producer)
940 }
941 #[cfg(feature = "kinesis")]
942 StreamBackendType::Kinesis {
943 region,
944 stream_name,
945 credentials,
946 } => {
947 let stream_config = crate::StreamConfig {
948 backend: crate::StreamBackendType::Kinesis {
949 region: region.clone(),
950 stream_name: stream_name.clone(),
951 credentials: credentials.clone(),
952 },
953 topic: config.topic.clone(),
954 batch_size: config.batch_size,
955 flush_interval_ms: config.flush_interval_ms,
956 max_connections: config.max_connections,
957 connection_timeout: config.connection_timeout,
958 enable_compression: config.enable_compression,
959 compression_type: config.compression_type.clone(),
960 retry_config: config.retry_config.clone(),
961 circuit_breaker: config.circuit_breaker.clone(),
962 security: config.security.clone(),
963 performance: config.performance.clone(),
964 monitoring: config.monitoring.clone(),
965 };
966
967 let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
968 producer.connect().await?;
969 BackendProducer::Kinesis(producer)
970 }
971 #[cfg(feature = "pulsar")]
972 StreamBackendType::Pulsar {
973 service_url,
974 auth_config,
975 } => {
976 let stream_config = crate::StreamConfig {
977 backend: crate::StreamBackendType::Pulsar {
978 service_url: service_url.clone(),
979 auth_config: auth_config.clone(),
980 },
981 topic: config.topic.clone(),
982 batch_size: config.batch_size,
983 flush_interval_ms: config.flush_interval_ms,
984 max_connections: config.max_connections,
985 connection_timeout: config.connection_timeout,
986 enable_compression: config.enable_compression,
987 compression_type: config.compression_type.clone(),
988 retry_config: config.retry_config.clone(),
989 circuit_breaker: config.circuit_breaker.clone(),
990 security: config.security.clone(),
991 performance: config.performance.clone(),
992 monitoring: config.monitoring.clone(),
993 };
994
995 let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
996 producer.connect().await?;
997 BackendProducer::Pulsar(Box::new(producer))
998 }
999 #[cfg(feature = "rabbitmq")]
1000 StreamBackendType::RabbitMQ {
1001 url,
1002 exchange,
1003 queue,
1004 } => {
1005 let stream_config = crate::StreamConfig {
1006 backend: crate::StreamBackendType::RabbitMQ {
1007 url: url.clone(),
1008 exchange: exchange.clone(),
1009 queue: queue.clone(),
1010 },
1011 topic: config.topic.clone(),
1012 batch_size: config.batch_size,
1013 flush_interval_ms: config.flush_interval_ms,
1014 max_connections: config.max_connections,
1015 connection_timeout: config.connection_timeout,
1016 enable_compression: config.enable_compression,
1017 compression_type: config.compression_type.clone(),
1018 retry_config: config.retry_config.clone(),
1019 circuit_breaker: config.circuit_breaker.clone(),
1020 security: config.security.clone(),
1021 performance: config.performance.clone(),
1022 monitoring: config.monitoring.clone(),
1023 };
1024
1025 let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
1026 producer.connect().await?;
1027 BackendProducer::RabbitMQ(Box::new(producer))
1028 }
1029 StreamBackendType::Memory {
1030 max_size: _,
1031 persistence: _,
1032 } => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
1033 };
1034
1035 let stats = Arc::new(RwLock::new(ProducerStats {
1036 backend_type: match backend_producer {
1037 #[cfg(feature = "kafka")]
1038 BackendProducer::Kafka(_) => "kafka".to_string(),
1039 #[cfg(feature = "nats")]
1040 BackendProducer::Nats(_) => "nats".to_string(),
1041 #[cfg(feature = "redis")]
1042 BackendProducer::Redis(_) => "redis".to_string(),
1043 #[cfg(feature = "kinesis")]
1044 BackendProducer::Kinesis(_) => "kinesis".to_string(),
1045 #[cfg(feature = "pulsar")]
1046 BackendProducer::Pulsar(_) => "pulsar".to_string(),
1047 #[cfg(feature = "rabbitmq")]
1048 BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
1049 BackendProducer::Memory(_) => "memory".to_string(),
1050 },
1051 ..Default::default()
1052 }));
1053
1054 info!(
1055 "Created stream producer with backend: {}",
1056 stats.read().await.backend_type
1057 );
1058
1059 Ok(Self {
1060 config,
1061 backend_producer,
1062 stats,
1063 circuit_breaker,
1064 last_flush: Instant::now(),
1065 _pending_events: Arc::new(RwLock::new(Vec::new())),
1066 batch_buffer: Arc::new(RwLock::new(Vec::new())),
1067 flush_semaphore: Arc::new(Semaphore::new(1)),
1068 })
1069 }
1070
1071 pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
1073 let start_time = Instant::now();
1074
1075 if let Some(cb) = &self.circuit_breaker {
1077 if !cb.can_execute().await {
1078 self.stats.write().await.circuit_breaker_trips += 1;
1079 return Err(anyhow!("Circuit breaker is open - cannot publish events"));
1080 }
1081 }
1082
1083 if self.config.performance.enable_batching {
1085 let mut batch_buffer = self.batch_buffer.write().await;
1086 batch_buffer.push(event);
1087
1088 if batch_buffer.len() >= self.config.batch_size {
1089 let events = std::mem::take(&mut *batch_buffer);
1090 drop(batch_buffer);
1091 return self.publish_batch_internal(events).await;
1092 }
1093
1094 return Ok(());
1095 }
1096
1097 let result = self.publish_single_event(event).await;
1099
1100 match &result {
1102 Ok(_) => {
1103 if let Some(cb) = &self.circuit_breaker {
1104 cb.record_success_with_duration(start_time.elapsed()).await;
1105 }
1106
1107 let mut stats = self.stats.write().await;
1108 stats.events_published += 1;
1109 let latency = start_time.elapsed().as_millis() as u64;
1110 stats.max_latency_ms = stats.max_latency_ms.max(latency);
1111 stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
1112 stats.last_publish = Some(Utc::now());
1113 }
1114 Err(_) => {
1115 if let Some(cb) = &self.circuit_breaker {
1116 cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
1117 .await;
1118 }
1119
1120 self.stats.write().await.events_failed += 1;
1121 }
1122 }
1123
1124 result
1125 }
1126
    /// Dispatch one event to the active backend producer.
    ///
    /// Pure delegation: each feature-gated variant forwards to its own
    /// `publish`. Statistics and circuit breaking are handled by callers.
    async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
        match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish(event).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish(event).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish(event).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish(event).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish(event).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
            // In-memory backend is always compiled in (no feature gate).
            BackendProducer::Memory(producer) => producer.publish(event).await,
        }
    }
1145
1146 pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
1148 if events.is_empty() {
1149 return Ok(());
1150 }
1151
1152 self.publish_batch_internal(events).await
1153 }
1154
    /// Publish a pre-assembled batch to the backend and update statistics.
    ///
    /// The Memory backend has no native batch call, so its events are
    /// published one at a time, failing fast on the first error.
    async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
        let start_time = Instant::now();
        // Length captured up front; `events` is moved into the backend call.
        let event_count = events.len();

        let result = match &mut self.backend_producer {
            #[cfg(feature = "kafka")]
            BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "nats")]
            BackendProducer::Nats(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "redis")]
            BackendProducer::Redis(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "kinesis")]
            BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "pulsar")]
            BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
            #[cfg(feature = "rabbitmq")]
            BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
            BackendProducer::Memory(producer) => {
                for event in events {
                    producer.publish(event).await?;
                }
                Ok(())
            }
        };

        // Update all counters under one write lock.
        let mut stats = self.stats.write().await;
        match &result {
            Ok(_) => {
                stats.events_published += event_count as u64;
                stats.batch_count += 1;
                let latency = start_time.elapsed().as_millis() as u64;
                stats.max_latency_ms = stats.max_latency_ms.max(latency);
                // Smoothed (EWMA-like) latency, not a true arithmetic mean.
                stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
                stats.last_publish = Some(Utc::now());
            }
            Err(_) => {
                // On batch failure, count the whole batch as failed.
                stats.events_failed += event_count as u64;
            }
        }

        debug!(
            "Published batch of {} events in {:?}",
            event_count,
            start_time.elapsed()
        );
        result
    }
1204
1205 pub async fn flush(&mut self) -> Result<()> {
1207 let _permit = self
1208 .flush_semaphore
1209 .acquire()
1210 .await
1211 .map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;
1212
1213 let start_time = Instant::now();
1214
1215 if self.config.performance.enable_batching {
1217 let events = {
1218 let mut batch_buffer = self.batch_buffer.write().await;
1219 if !batch_buffer.is_empty() {
1220 std::mem::take(&mut *batch_buffer)
1221 } else {
1222 Vec::new()
1223 }
1224 };
1225 if !events.is_empty() {
1226 drop(_permit); self.publish_batch_internal(events).await?;
1228 }
1229 }
1230
1231 let result = match &mut self.backend_producer {
1233 #[cfg(feature = "kafka")]
1234 BackendProducer::Kafka(producer) => producer.flush().await,
1235 #[cfg(feature = "nats")]
1236 BackendProducer::Nats(producer) => producer.flush().await,
1237 #[cfg(feature = "redis")]
1238 BackendProducer::Redis(producer) => producer.flush().await,
1239 #[cfg(feature = "kinesis")]
1240 BackendProducer::Kinesis(producer) => producer.flush().await,
1241 #[cfg(feature = "pulsar")]
1242 BackendProducer::Pulsar(producer) => producer.flush().await,
1243 #[cfg(feature = "rabbitmq")]
1244 BackendProducer::RabbitMQ(producer) => producer.flush().await,
1245 BackendProducer::Memory(producer) => producer.flush().await,
1246 };
1247
1248 if result.is_ok() {
1250 self.stats.write().await.flush_count += 1;
1251 self.last_flush = Instant::now();
1252 debug!("Flushed producer buffers in {:?}", start_time.elapsed());
1253 }
1254
1255 result
1256 }
1257
1258 pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
1260 let events: Vec<StreamEvent> = patch
1261 .operations
1262 .iter()
1263 .filter_map(|op| {
1264 let metadata = EventMetadata {
1265 event_id: Uuid::new_v4().to_string(),
1266 timestamp: patch.timestamp,
1267 source: "rdf_patch".to_string(),
1268 user: None,
1269 context: Some(patch.id.clone()),
1270 caused_by: None,
1271 version: "1.0".to_string(),
1272 properties: HashMap::new(),
1273 checksum: None,
1274 };
1275
1276 match op {
1277 PatchOperation::Add {
1278 subject,
1279 predicate,
1280 object,
1281 } => Some(StreamEvent::TripleAdded {
1282 subject: subject.clone(),
1283 predicate: predicate.clone(),
1284 object: object.clone(),
1285 graph: None,
1286 metadata,
1287 }),
1288 PatchOperation::Delete {
1289 subject,
1290 predicate,
1291 object,
1292 } => Some(StreamEvent::TripleRemoved {
1293 subject: subject.clone(),
1294 predicate: predicate.clone(),
1295 object: object.clone(),
1296 graph: None,
1297 metadata,
1298 }),
1299 PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
1300 graph: graph.clone(),
1301 metadata,
1302 }),
1303 PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
1304 graph: graph.clone(),
1305 metadata,
1306 }),
1307 PatchOperation::AddPrefix { .. } => {
1308 None
1310 }
1311 PatchOperation::DeletePrefix { .. } => {
1312 None
1314 }
1315 PatchOperation::TransactionBegin { .. } => {
1316 Some(StreamEvent::TransactionBegin {
1317 transaction_id: patch
1318 .transaction_id
1319 .clone()
1320 .unwrap_or_else(|| Uuid::new_v4().to_string()),
1321 isolation_level: None,
1322 metadata,
1323 })
1324 }
1325 PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
1326 transaction_id: patch
1327 .transaction_id
1328 .clone()
1329 .unwrap_or_else(|| Uuid::new_v4().to_string()),
1330 metadata,
1331 }),
1332 PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
1333 transaction_id: patch
1334 .transaction_id
1335 .clone()
1336 .unwrap_or_else(|| Uuid::new_v4().to_string()),
1337 metadata,
1338 }),
1339 PatchOperation::Header { .. } => {
1340 None
1342 }
1343 }
1344 })
1345 .collect();
1346
1347 self.publish_batch(events).await
1348 }
1349
1350 pub async fn get_stats(&self) -> ProducerStats {
1352 self.stats.read().await.clone()
1353 }
1354
1355 pub async fn health_check(&self) -> bool {
1357 if let Some(cb) = &self.circuit_breaker {
1358 cb.is_healthy().await
1359 } else {
1360 true
1361 }
1362 }
1363}
1364
/// High-level stream consumer wrapping a backend-specific consumer with
/// circuit breaking and statistics tracking.
pub struct StreamConsumer {
    /// Retained configuration; currently unused after construction.
    _config: StreamConfig,
    /// Feature-gated backend implementation that actually receives events.
    backend_consumer: BackendConsumer,
    /// Shared, lock-protected consumption statistics.
    stats: Arc<RwLock<ConsumerStats>>,
    /// Optional circuit breaker guarding consume calls.
    circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
    /// Instant of the most recent poll attempt.
    last_poll: Instant,
    /// Reserved message buffer; currently unused.
    _message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
    /// Optional consumer-group name (semantics depend on the backend).
    consumer_group: Option<String>,
}
1375
/// Feature-gated dispatch enum over the concrete backend consumers.
/// Larger consumer types are boxed to keep the enum itself small.
enum BackendConsumer {
    #[cfg(feature = "kafka")]
    Kafka(backend::kafka::KafkaConsumer),
    #[cfg(feature = "nats")]
    Nats(Box<backend::nats::NatsConsumer>),
    #[cfg(feature = "redis")]
    Redis(backend::redis::RedisConsumer),
    #[cfg(feature = "kinesis")]
    Kinesis(backend::kinesis::KinesisConsumer),
    #[cfg(feature = "pulsar")]
    Pulsar(Box<backend::pulsar::PulsarConsumer>),
    #[cfg(feature = "rabbitmq")]
    RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
    /// In-memory consumer; always available (no feature flag).
    Memory(MemoryConsumer),
}
1392
/// Cumulative statistics for a stream consumer.
///
/// Fields prefixed with `_` are reserved and not yet updated anywhere.
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
    /// Total events successfully consumed.
    events_consumed: u64,
    /// Total failures (failed consume attempts and failed callbacks).
    events_failed: u64,
    _bytes_received: u64,
    /// Smoothed (EWMA-like) per-event processing time in milliseconds.
    avg_processing_time_ms: f64,
    /// Largest observed per-event processing time in milliseconds.
    max_processing_time_ms: u64,
    _consumer_lag: u64,
    /// Number of consume calls rejected because the breaker was open.
    circuit_breaker_trips: u64,
    /// Timestamp of the most recently consumed event.
    last_message: Option<DateTime<Utc>>,
    /// Name of the active backend ("kafka", "memory", ...).
    backend_type: String,
    _batch_size: usize,
}
1407
/// Minimal consumer over the in-memory backend, used when no external
/// streaming system is configured.
struct MemoryConsumer {
    /// Boxed trait object; always a `backend::memory::MemoryBackend` here.
    backend: Box<dyn StreamBackend + Send + Sync>,
    /// Topic this consumer reads from.
    topic: String,
    /// Next offset to read; advances monotonically, reset via `reset()`.
    current_offset: u64,
    /// Local (non-shared) statistics for this consumer.
    stats: ConsumerStats,
}
1415
1416impl MemoryConsumer {
1417 fn _new() -> Self {
1418 Self {
1419 backend: Box::new(backend::memory::MemoryBackend::new()),
1420 topic: "oxirs-stream".to_string(),
1421 current_offset: 0,
1422 stats: ConsumerStats {
1423 backend_type: "memory".to_string(),
1424 ..Default::default()
1425 },
1426 }
1427 }
1428
1429 fn with_topic(topic: String) -> Self {
1430 Self {
1431 backend: Box::new(backend::memory::MemoryBackend::new()),
1432 topic,
1433 current_offset: 0,
1434 stats: ConsumerStats {
1435 backend_type: "memory".to_string(),
1436 ..Default::default()
1437 },
1438 }
1439 }
1440
1441 async fn consume(&mut self) -> Result<Option<StreamEvent>> {
1442 let start_time = Instant::now();
1443
1444 self.backend
1446 .as_mut()
1447 .connect()
1448 .await
1449 .map_err(|e| anyhow!("Backend connect failed: {}", e))?;
1450
1451 let topic_name = crate::types::TopicName::new(self.topic.clone());
1453 let events = self
1454 .backend
1455 .receive_events(
1456 &topic_name,
1457 None, crate::types::StreamPosition::Offset(self.current_offset),
1459 1, )
1461 .await
1462 .map_err(|e| anyhow!("Receive events failed: {}", e))?;
1463
1464 if let Some((event, offset)) = events.first() {
1465 self.current_offset = offset.value() + 1;
1466
1467 self.stats.events_consumed += 1;
1469 let processing_time = start_time.elapsed().as_millis() as u64;
1470 self.stats.max_processing_time_ms =
1471 self.stats.max_processing_time_ms.max(processing_time);
1472 self.stats.avg_processing_time_ms =
1473 (self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
1474 self.stats.last_message = Some(Utc::now());
1475
1476 debug!("Memory consumer: consumed event via backend");
1477 Ok(Some(event.clone()))
1478 } else {
1479 debug!("Memory consumer: no events available");
1480 Ok(None)
1481 }
1482 }
1483
1484 fn _get_stats(&self) -> &ConsumerStats {
1485 &self.stats
1486 }
1487
1488 fn reset(&mut self) {
1490 self.current_offset = 0;
1491 }
1492}
1493
1494impl StreamConsumer {
    /// Create a consumer with no consumer group.
    ///
    /// Equivalent to `new_with_group(config, None)`.
    pub async fn new(config: StreamConfig) -> Result<Self> {
        Self::new_with_group(config, None).await
    }
1499
1500 pub async fn new_with_group(
1502 config: StreamConfig,
1503 consumer_group: Option<String>,
1504 ) -> Result<Self> {
1505 let circuit_breaker = if config.circuit_breaker.enabled {
1507 Some(circuit_breaker::new_shared_circuit_breaker(
1508 circuit_breaker::CircuitBreakerConfig {
1509 enabled: config.circuit_breaker.enabled,
1510 failure_threshold: config.circuit_breaker.failure_threshold,
1511 success_threshold: config.circuit_breaker.success_threshold,
1512 timeout: config.circuit_breaker.timeout,
1513 half_open_max_calls: config.circuit_breaker.half_open_max_calls,
1514 ..Default::default()
1515 },
1516 ))
1517 } else {
1518 None
1519 };
1520
1521 let backend_consumer = match &config.backend {
1523 #[cfg(feature = "kafka")]
1524 StreamBackendType::Kafka {
1525 brokers,
1526 security_protocol,
1527 sasl_config,
1528 } => {
1529 let stream_config = crate::StreamConfig {
1530 backend: crate::StreamBackendType::Kafka {
1531 brokers: brokers.clone(),
1532 security_protocol: security_protocol.clone(),
1533 sasl_config: sasl_config.clone(),
1534 },
1535 topic: config.topic.clone(),
1536 batch_size: config.batch_size,
1537 flush_interval_ms: config.flush_interval_ms,
1538 max_connections: config.max_connections,
1539 connection_timeout: config.connection_timeout,
1540 enable_compression: config.enable_compression,
1541 compression_type: config.compression_type.clone(),
1542 retry_config: config.retry_config.clone(),
1543 circuit_breaker: config.circuit_breaker.clone(),
1544 security: config.security.clone(),
1545 performance: config.performance.clone(),
1546 monitoring: config.monitoring.clone(),
1547 };
1548
1549 let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
1550 consumer.connect().await?;
1551 BackendConsumer::Kafka(consumer)
1552 }
1553 #[cfg(feature = "nats")]
1554 StreamBackendType::Nats {
1555 url,
1556 cluster_urls,
1557 jetstream_config,
1558 } => {
1559 let stream_config = crate::StreamConfig {
1560 backend: crate::StreamBackendType::Nats {
1561 url: url.clone(),
1562 cluster_urls: cluster_urls.clone(),
1563 jetstream_config: jetstream_config.clone(),
1564 },
1565 topic: config.topic.clone(),
1566 batch_size: config.batch_size,
1567 flush_interval_ms: config.flush_interval_ms,
1568 max_connections: config.max_connections,
1569 connection_timeout: config.connection_timeout,
1570 enable_compression: config.enable_compression,
1571 compression_type: config.compression_type.clone(),
1572 retry_config: config.retry_config.clone(),
1573 circuit_breaker: config.circuit_breaker.clone(),
1574 security: config.security.clone(),
1575 performance: config.performance.clone(),
1576 monitoring: config.monitoring.clone(),
1577 };
1578
1579 let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
1580 consumer.connect().await?;
1581 BackendConsumer::Nats(Box::new(consumer))
1582 }
1583 #[cfg(feature = "redis")]
1584 StreamBackendType::Redis {
1585 url,
1586 cluster_urls,
1587 pool_size,
1588 } => {
1589 let stream_config = crate::StreamConfig {
1590 backend: crate::StreamBackendType::Redis {
1591 url: url.clone(),
1592 cluster_urls: cluster_urls.clone(),
1593 pool_size: *pool_size,
1594 },
1595 topic: config.topic.clone(),
1596 batch_size: config.batch_size,
1597 flush_interval_ms: config.flush_interval_ms,
1598 max_connections: config.max_connections,
1599 connection_timeout: config.connection_timeout,
1600 enable_compression: config.enable_compression,
1601 compression_type: config.compression_type.clone(),
1602 retry_config: config.retry_config.clone(),
1603 circuit_breaker: config.circuit_breaker.clone(),
1604 security: config.security.clone(),
1605 performance: config.performance.clone(),
1606 monitoring: config.monitoring.clone(),
1607 };
1608
1609 let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
1610 consumer.connect().await?;
1611 BackendConsumer::Redis(consumer)
1612 }
1613 #[cfg(feature = "kinesis")]
1614 StreamBackendType::Kinesis {
1615 region,
1616 stream_name,
1617 credentials,
1618 } => {
1619 let stream_config = crate::StreamConfig {
1620 backend: crate::StreamBackendType::Kinesis {
1621 region: region.clone(),
1622 stream_name: stream_name.clone(),
1623 credentials: credentials.clone(),
1624 },
1625 topic: config.topic.clone(),
1626 batch_size: config.batch_size,
1627 flush_interval_ms: config.flush_interval_ms,
1628 max_connections: config.max_connections,
1629 connection_timeout: config.connection_timeout,
1630 enable_compression: config.enable_compression,
1631 compression_type: config.compression_type.clone(),
1632 retry_config: config.retry_config.clone(),
1633 circuit_breaker: config.circuit_breaker.clone(),
1634 security: config.security.clone(),
1635 performance: config.performance.clone(),
1636 monitoring: config.monitoring.clone(),
1637 };
1638
1639 let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
1640 consumer.connect().await?;
1641 BackendConsumer::Kinesis(consumer)
1642 }
1643 #[cfg(feature = "pulsar")]
1644 StreamBackendType::Pulsar {
1645 service_url,
1646 auth_config,
1647 } => {
1648 let stream_config = crate::StreamConfig {
1649 backend: crate::StreamBackendType::Pulsar {
1650 service_url: service_url.clone(),
1651 auth_config: auth_config.clone(),
1652 },
1653 topic: config.topic.clone(),
1654 batch_size: config.batch_size,
1655 flush_interval_ms: config.flush_interval_ms,
1656 max_connections: config.max_connections,
1657 connection_timeout: config.connection_timeout,
1658 enable_compression: config.enable_compression,
1659 compression_type: config.compression_type.clone(),
1660 retry_config: config.retry_config.clone(),
1661 circuit_breaker: config.circuit_breaker.clone(),
1662 security: config.security.clone(),
1663 performance: config.performance.clone(),
1664 monitoring: config.monitoring.clone(),
1665 };
1666
1667 let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
1668 consumer.connect().await?;
1669 BackendConsumer::Pulsar(Box::new(consumer))
1670 }
1671 #[cfg(feature = "rabbitmq")]
1672 StreamBackendType::RabbitMQ {
1673 url,
1674 exchange,
1675 queue,
1676 } => {
1677 let stream_config = crate::StreamConfig {
1678 backend: crate::StreamBackendType::RabbitMQ {
1679 url: url.clone(),
1680 exchange: exchange.clone(),
1681 queue: queue.clone(),
1682 },
1683 topic: config.topic.clone(),
1684 batch_size: config.batch_size,
1685 flush_interval_ms: config.flush_interval_ms,
1686 max_connections: config.max_connections,
1687 connection_timeout: config.connection_timeout,
1688 enable_compression: config.enable_compression,
1689 compression_type: config.compression_type.clone(),
1690 retry_config: config.retry_config.clone(),
1691 circuit_breaker: config.circuit_breaker.clone(),
1692 security: config.security.clone(),
1693 performance: config.performance.clone(),
1694 monitoring: config.monitoring.clone(),
1695 };
1696
1697 let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
1698 consumer.connect().await?;
1699 BackendConsumer::RabbitMQ(Box::new(consumer))
1700 }
1701 StreamBackendType::Memory {
1702 max_size: _,
1703 persistence: _,
1704 } => BackendConsumer::Memory(MemoryConsumer::with_topic(config.topic.clone())),
1705 };
1706
1707 let stats = Arc::new(RwLock::new(ConsumerStats {
1708 backend_type: match backend_consumer {
1709 #[cfg(feature = "kafka")]
1710 BackendConsumer::Kafka(_) => "kafka".to_string(),
1711 #[cfg(feature = "nats")]
1712 BackendConsumer::Nats(_) => "nats".to_string(),
1713 #[cfg(feature = "redis")]
1714 BackendConsumer::Redis(_) => "redis".to_string(),
1715 #[cfg(feature = "kinesis")]
1716 BackendConsumer::Kinesis(_) => "kinesis".to_string(),
1717 #[cfg(feature = "pulsar")]
1718 BackendConsumer::Pulsar(_) => "pulsar".to_string(),
1719 #[cfg(feature = "rabbitmq")]
1720 BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
1721 BackendConsumer::Memory(_) => "memory".to_string(),
1722 },
1723 _batch_size: config.batch_size,
1724 ..Default::default()
1725 }));
1726
1727 info!(
1728 "Created stream consumer with backend: {} and group: {:?}",
1729 stats.read().await.backend_type,
1730 consumer_group
1731 );
1732
1733 Ok(Self {
1734 _config: config,
1735 backend_consumer,
1736 stats,
1737 circuit_breaker,
1738 last_poll: Instant::now(),
1739 _message_buffer: Arc::new(RwLock::new(Vec::new())),
1740 consumer_group,
1741 })
1742 }
1743
    /// Consume at most one event, applying circuit breaking and statistics.
    ///
    /// Returns `Ok(None)` when no event is currently available; that case
    /// still counts as a successful call for the circuit breaker.
    ///
    /// # Errors
    /// Fails when the circuit breaker is open or the backend consume fails.
    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
        let start_time = Instant::now();

        // Fail fast while the breaker is open.
        if let Some(cb) = &self.circuit_breaker {
            if !cb.can_execute().await {
                self.stats.write().await.circuit_breaker_trips += 1;
                return Err(anyhow!("Circuit breaker is open - cannot consume events"));
            }
        }

        let result = self.consume_single_event().await;

        match &result {
            Ok(Some(_)) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_success_with_duration(start_time.elapsed()).await;
                }

                let mut stats = self.stats.write().await;
                stats.events_consumed += 1;
                let processing_time = start_time.elapsed().as_millis() as u64;
                stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
                // Smoothed (EWMA-like) value, not a true arithmetic mean.
                stats.avg_processing_time_ms =
                    (stats.avg_processing_time_ms + processing_time as f64) / 2.0;
                stats.last_message = Some(Utc::now());
            }
            Ok(None) => {
                // An empty poll is still a healthy backend interaction.
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_success_with_duration(start_time.elapsed()).await;
                }
            }
            Err(_) => {
                if let Some(cb) = &self.circuit_breaker {
                    cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
                        .await;
                }

                self.stats.write().await.events_failed += 1;
            }
        }

        // Record the poll time regardless of outcome.
        self.last_poll = Instant::now();
        result
    }
1793
    /// Fetch one event from the active backend consumer.
    ///
    /// Pure delegation: circuit breaking and statistics are handled by
    /// the public `consume` wrapper.
    async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
        match &mut self.backend_consumer {
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(consumer) => consumer.consume().await,
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(consumer) => consumer.consume().await,
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(consumer) => consumer.consume().await,
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(consumer) => consumer.consume().await,
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(consumer) => consumer.consume().await,
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
            // In-memory backend is always compiled in (no feature gate).
            BackendConsumer::Memory(consumer) => consumer.consume().await,
        }
    }
1812
1813 pub async fn consume_batch(
1815 &mut self,
1816 max_events: usize,
1817 timeout: Duration,
1818 ) -> Result<Vec<StreamEvent>> {
1819 let mut events = Vec::new();
1820 let start_time = Instant::now();
1821
1822 while events.len() < max_events && start_time.elapsed() < timeout {
1823 match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
1824 Ok(Ok(Some(event))) => events.push(event),
1825 Ok(Ok(None)) => continue,
1826 Ok(Err(e)) => return Err(e),
1827 Err(_) => break, }
1829 }
1830
1831 if !events.is_empty() {
1832 debug!(
1833 "Consumed batch of {} events in {:?}",
1834 events.len(),
1835 start_time.elapsed()
1836 );
1837 }
1838
1839 Ok(events)
1840 }
1841
1842 pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
1844 where
1845 F: FnMut(StreamEvent) -> Result<()> + Send,
1846 {
1847 info!("Starting stream consumer loop");
1848
1849 loop {
1850 match self.consume().await {
1851 Ok(Some(event)) => {
1852 if let Err(e) = callback(event) {
1853 error!("Callback error: {}", e);
1854 self.stats.write().await.events_failed += 1;
1855 }
1856 }
1857 Ok(None) => {
1858 tokio::time::sleep(Duration::from_millis(10)).await;
1860 }
1861 Err(e) => {
1862 error!("Consumer error: {}", e);
1863 tokio::time::sleep(Duration::from_millis(100)).await;
1864 }
1865 }
1866 }
1867 }
1868
1869 pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
1871 where
1872 F: FnMut(StreamEvent) -> Fut + Send,
1873 Fut: std::future::Future<Output = Result<()>> + Send,
1874 {
1875 info!("Starting async stream consumer loop");
1876
1877 loop {
1878 match self.consume().await {
1879 Ok(Some(event)) => {
1880 if let Err(e) = callback(event).await {
1881 error!("Async callback error: {}", e);
1882 self.stats.write().await.events_failed += 1;
1883 }
1884 }
1885 Ok(None) => {
1886 tokio::time::sleep(Duration::from_millis(10)).await;
1888 }
1889 Err(e) => {
1890 error!("Consumer error: {}", e);
1891 tokio::time::sleep(Duration::from_millis(100)).await;
1892 }
1893 }
1894 }
1895 }
1896
1897 pub async fn get_stats(&self) -> ConsumerStats {
1899 self.stats.read().await.clone()
1900 }
1901
1902 pub async fn health_check(&self) -> bool {
1904 if let Some(cb) = &self.circuit_breaker {
1905 cb.is_healthy().await
1906 } else {
1907 true
1908 }
1909 }
1910
    /// Name of the consumer group this consumer joined, if any.
    pub fn consumer_group(&self) -> Option<&String> {
        self.consumer_group.as_ref()
    }
1915
    /// Reset the consume position to the beginning of the stream.
    ///
    /// Only the in-memory backend supports this; every other backend
    /// returns an error.
    pub async fn reset_position(&mut self) -> Result<()> {
        match &mut self.backend_consumer {
            BackendConsumer::Memory(consumer) => {
                consumer.reset();
                Ok(())
            }
            #[cfg(feature = "kafka")]
            BackendConsumer::Kafka(_) => {
                Err(anyhow!("Reset position not supported for Kafka backend"))
            }
            #[cfg(feature = "nats")]
            BackendConsumer::Nats(_) => {
                Err(anyhow!("Reset position not supported for NATS backend"))
            }
            #[cfg(feature = "redis")]
            BackendConsumer::Redis(_) => {
                Err(anyhow!("Reset position not supported for Redis backend"))
            }
            #[cfg(feature = "kinesis")]
            BackendConsumer::Kinesis(_) => {
                Err(anyhow!("Reset position not supported for Kinesis backend"))
            }
            #[cfg(feature = "pulsar")]
            BackendConsumer::Pulsar(_) => {
                Err(anyhow!("Reset position not supported for Pulsar backend"))
            }
            #[cfg(feature = "rabbitmq")]
            BackendConsumer::RabbitMQ(_) => {
                Err(anyhow!("Reset position not supported for RabbitMQ backend"))
            }
        }
    }
1949
    /// Deprecated test hook; always returns an error.
    ///
    /// Test events should instead be injected by publishing them through a
    /// producer on the same backend.
    pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
        Err(anyhow!("set_test_events is deprecated with backend implementation - use producer to publish events"))
    }
1956}
1957
/// A single operation within an RDF Patch document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
    /// Add a triple.
    Add {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Remove a triple.
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Create a named graph.
    AddGraph { graph: String },
    /// Drop a named graph.
    DeleteGraph { graph: String },
    /// Declare a namespace prefix mapping.
    AddPrefix { prefix: String, namespace: String },
    /// Remove a namespace prefix mapping.
    DeletePrefix { prefix: String },
    /// Open a transaction; `None` means no explicit id was provided.
    TransactionBegin { transaction_id: Option<String> },
    /// Commit the current transaction.
    TransactionCommit,
    /// Abort the current transaction.
    TransactionAbort,
    /// Arbitrary header metadata attached to the patch.
    Header { key: String, value: String },
}
1990
/// An RDF Patch: an ordered list of operations plus document metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
    /// Operations in application order.
    pub operations: Vec<PatchOperation>,
    /// Creation time of the patch.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Unique patch identifier (UUID string).
    pub id: String,
    /// Free-form header key/value pairs.
    pub headers: HashMap<String, String>,
    /// Transaction id shared by this patch's transaction operations, if any.
    pub transaction_id: Option<String>,
    /// Declared prefix -> namespace mappings.
    pub prefixes: HashMap<String, String>,
}
2004
2005impl RdfPatch {
2006 pub fn new() -> Self {
2008 Self {
2009 operations: Vec::new(),
2010 timestamp: chrono::Utc::now(),
2011 id: uuid::Uuid::new_v4().to_string(),
2012 headers: HashMap::new(),
2013 transaction_id: None,
2014 prefixes: HashMap::new(),
2015 }
2016 }
2017
2018 pub fn add_operation(&mut self, operation: PatchOperation) {
2020 self.operations.push(operation);
2021 }
2022
2023 pub fn to_rdf_patch_format(&self) -> Result<String> {
2025 let serializer = crate::patch::PatchSerializer::new()
2026 .with_pretty_print(true)
2027 .with_metadata(true);
2028 serializer.serialize(self)
2029 }
2030
2031 pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
2033 let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
2034 parser.parse(input)
2035 }
2036}
2037
2038impl Default for RdfPatch {
2039 fn default() -> Self {
2040 Self::new()
2041 }
2042}
2043
2044impl Default for StreamConfig {
2046 fn default() -> Self {
2047 Self {
2048 backend: StreamBackendType::Memory {
2049 max_size: Some(10000),
2050 persistence: false,
2051 },
2052 topic: "oxirs-stream".to_string(),
2053 batch_size: 100,
2054 flush_interval_ms: 100,
2055 max_connections: 10,
2056 connection_timeout: Duration::from_secs(30),
2057 enable_compression: false,
2058 compression_type: CompressionType::None,
2059 retry_config: RetryConfig::default(),
2060 circuit_breaker: CircuitBreakerConfig::default(),
2061 security: SecurityConfig::default(),
2062 performance: StreamPerformanceConfig::default(),
2063 monitoring: MonitoringConfig::default(),
2064 }
2065 }
2066}
2067
2068impl Default for RetryConfig {
2069 fn default() -> Self {
2070 Self {
2071 max_retries: 3,
2072 initial_backoff: Duration::from_millis(100),
2073 max_backoff: Duration::from_secs(30),
2074 backoff_multiplier: 2.0,
2075 jitter: true,
2076 }
2077 }
2078}
2079
2080impl Default for CircuitBreakerConfig {
2081 fn default() -> Self {
2082 Self {
2083 enabled: true,
2084 failure_threshold: 5,
2085 success_threshold: 3,
2086 timeout: Duration::from_secs(30),
2087 half_open_max_calls: 3,
2088 }
2089 }
2090}
2091
2092impl Default for SecurityConfig {
2093 fn default() -> Self {
2094 Self {
2095 enable_tls: false,
2096 verify_certificates: true,
2097 client_cert_path: None,
2098 client_key_path: None,
2099 ca_cert_path: None,
2100 sasl_config: None,
2101 }
2102 }
2103}
2104
2105impl Default for StreamPerformanceConfig {
2106 fn default() -> Self {
2107 Self {
2108 enable_batching: true,
2109 enable_pipelining: false,
2110 buffer_size: 8192,
2111 prefetch_count: 100,
2112 enable_zero_copy: false,
2113 enable_simd: false,
2114 parallel_processing: true,
2115 worker_threads: None,
2116 }
2117 }
2118}
2119
2120impl Default for MonitoringConfig {
2121 fn default() -> Self {
2122 Self {
2123 enable_metrics: true,
2124 enable_tracing: true,
2125 metrics_interval: Duration::from_secs(60),
2126 health_check_interval: Duration::from_secs(30),
2127 enable_profiling: false,
2128 prometheus_endpoint: None,
2129 jaeger_endpoint: None,
2130 log_level: "info".to_string(),
2131 }
2132 }
2133}
2134
2135impl StreamConfig {
2137 #[cfg(feature = "redis")]
2139 pub fn redis(url: String) -> Self {
2140 Self {
2141 backend: StreamBackendType::Redis {
2142 url,
2143 cluster_urls: None,
2144 pool_size: Some(10),
2145 },
2146 ..Default::default()
2147 }
2148 }
2149
2150 #[cfg(feature = "kinesis")]
2152 pub fn kinesis(region: String, stream_name: String) -> Self {
2153 Self {
2154 backend: StreamBackendType::Kinesis {
2155 region,
2156 stream_name,
2157 credentials: None,
2158 },
2159 ..Default::default()
2160 }
2161 }
2162
2163 pub fn memory() -> Self {
2165 Self {
2166 backend: StreamBackendType::Memory {
2167 max_size: Some(1000),
2168 persistence: false,
2169 },
2170 ..Default::default()
2171 }
2172 }
2173
2174 pub fn high_performance(mut self) -> Self {
2176 self.performance.enable_batching = true;
2177 self.performance.enable_pipelining = true;
2178 self.performance.parallel_processing = true;
2179 self.performance.buffer_size = 65536;
2180 self.performance.prefetch_count = 1000;
2181 self.batch_size = 1000;
2182 self.flush_interval_ms = 10;
2183 self
2184 }
2185
2186 pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
2188 self.enable_compression = true;
2189 self.compression_type = compression_type;
2190 self
2191 }
2192
2193 pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
2195 self.circuit_breaker.enabled = enabled;
2196 self.circuit_breaker.failure_threshold = failure_threshold;
2197 self
2198 }
2199
2200 pub fn development(topic: &str) -> Self {
2202 Self {
2203 backend: StreamBackendType::Memory {
2204 max_size: Some(10000),
2205 persistence: false,
2206 },
2207 topic: topic.to_string(),
2208 batch_size: 10,
2209 flush_interval_ms: 100,
2210 max_connections: 5,
2211 connection_timeout: Duration::from_secs(10),
2212 enable_compression: false,
2213 compression_type: CompressionType::None,
2214 retry_config: RetryConfig {
2215 max_retries: 3,
2216 initial_backoff: Duration::from_millis(100),
2217 max_backoff: Duration::from_secs(5),
2218 backoff_multiplier: 2.0,
2219 jitter: true,
2220 },
2221 circuit_breaker: CircuitBreakerConfig {
2222 enabled: false,
2223 failure_threshold: 5,
2224 success_threshold: 2,
2225 timeout: Duration::from_secs(60),
2226 half_open_max_calls: 10,
2227 },
2228 security: SecurityConfig::default(),
2229 performance: StreamPerformanceConfig::default(),
2230 monitoring: MonitoringConfig {
2231 enable_metrics: true,
2232 enable_tracing: false,
2233 metrics_interval: Duration::from_secs(5),
2234 health_check_interval: Duration::from_secs(30),
2235 enable_profiling: false,
2236 prometheus_endpoint: None,
2237 jaeger_endpoint: None,
2238 log_level: "debug".to_string(),
2239 },
2240 }
2241 }
2242
2243 pub fn production(topic: &str) -> Self {
2245 Self {
2246 backend: StreamBackendType::Memory {
2247 max_size: Some(100000),
2248 persistence: true,
2249 },
2250 topic: topic.to_string(),
2251 batch_size: 1000,
2252 flush_interval_ms: 10,
2253 max_connections: 50,
2254 connection_timeout: Duration::from_secs(30),
2255 enable_compression: true,
2256 compression_type: CompressionType::Zstd,
2257 retry_config: RetryConfig {
2258 max_retries: 5,
2259 initial_backoff: Duration::from_millis(200),
2260 max_backoff: Duration::from_secs(30),
2261 backoff_multiplier: 2.0,
2262 jitter: true,
2263 },
2264 circuit_breaker: CircuitBreakerConfig {
2265 enabled: true,
2266 failure_threshold: 10,
2267 success_threshold: 3,
2268 timeout: Duration::from_secs(30),
2269 half_open_max_calls: 5,
2270 },
2271 security: SecurityConfig::default(),
2272 performance: StreamPerformanceConfig {
2273 enable_batching: true,
2274 enable_pipelining: true,
2275 parallel_processing: true,
2276 buffer_size: 65536,
2277 prefetch_count: 1000,
2278 enable_zero_copy: true,
2279 enable_simd: true,
2280 worker_threads: None,
2281 },
2282 monitoring: MonitoringConfig {
2283 enable_metrics: true,
2284 enable_tracing: true,
2285 metrics_interval: Duration::from_secs(1),
2286 health_check_interval: Duration::from_secs(10),
2287 enable_profiling: true,
2288 prometheus_endpoint: None,
2289 jaeger_endpoint: None,
2290 log_level: "info".to_string(),
2291 },
2292 }
2293 }
2294}
2295
/// Convenience wrapper pairing a producer and a consumer built from the
/// same stream configuration.
pub struct Stream {
    // Publishing side.
    producer: StreamProducer,
    // Consuming side.
    consumer: StreamConsumer,
}
2301
impl Stream {
    /// Builds a producer/consumer pair from the same configuration.
    pub async fn new(config: StreamConfig) -> Result<Self> {
        let producer = StreamProducer::new(config.clone()).await?;
        let consumer = StreamConsumer::new(config).await?;

        Ok(Self { producer, consumer })
    }

    /// Publishes one event through the producer side.
    pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
        self.producer.publish(event).await
    }

    /// Pulls the next event from the consumer side, if one is available.
    pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
        self.consumer.consume().await
    }

    /// Flushes any events buffered by the producer.
    pub async fn flush(&mut self) -> Result<()> {
        self.producer.flush().await
    }

    /// Snapshot of producer-side statistics.
    pub async fn producer_stats(&self) -> ProducerStats {
        self.producer.get_stats().await
    }

    /// Snapshot of consumer-side statistics.
    pub async fn consumer_stats(&self) -> ConsumerStats {
        self.consumer.get_stats().await
    }

    /// Flushes the producer and logs the close.
    ///
    /// NOTE(review): only the producer is flushed; the consumer is not shut
    /// down here — confirm whether any backend needs an explicit consumer
    /// close to release resources.
    pub async fn close(&mut self) -> Result<()> {
        self.producer.flush().await?;

        debug!("Stream closed successfully");
        Ok(())
    }

    /// Placeholder health check: always reports healthy without probing the
    /// underlying backends.
    pub async fn health_check(&self) -> Result<bool> {
        Ok(true)
    }

    /// Placeholder: logs that a transaction began; no transactional state is
    /// actually created yet.
    pub async fn begin_transaction(&mut self) -> Result<()> {
        debug!("Transaction begun (placeholder)");
        Ok(())
    }

    /// Placeholder: logs a commit; nothing is actually committed yet.
    pub async fn commit_transaction(&mut self) -> Result<()> {
        debug!("Transaction committed (placeholder)");
        Ok(())
    }

    /// Placeholder: logs a rollback; nothing is actually rolled back yet.
    pub async fn rollback_transaction(&mut self) -> Result<()> {
        debug!("Transaction rolled back (placeholder)");
        Ok(())
    }
}