1use crate::event;
6use oxicode::{Decode, Encode};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
13pub struct TopicName(String);
14
15impl TopicName {
16 pub fn new(name: String) -> Self {
17 Self(name)
18 }
19
20 pub fn as_str(&self) -> &str {
21 &self.0
22 }
23}
24
25impl fmt::Display for TopicName {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 write!(f, "{}", self.0)
28 }
29}
30
31impl From<&str> for TopicName {
32 fn from(s: &str) -> Self {
33 Self(s.to_string())
34 }
35}
36
37impl From<String> for TopicName {
38 fn from(s: String) -> Self {
39 Self(s)
40 }
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
45pub struct PartitionId(u32);
46
47impl PartitionId {
48 pub fn new(id: u32) -> Self {
49 Self(id)
50 }
51
52 pub fn value(&self) -> u32 {
53 self.0
54 }
55}
56
57impl fmt::Display for PartitionId {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 write!(f, "{}", self.0)
60 }
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
65pub struct Offset(u64);
66
67impl Offset {
68 pub fn new(offset: u64) -> Self {
69 Self(offset)
70 }
71
72 pub fn value(&self) -> u64 {
73 self.0
74 }
75}
76
77impl fmt::Display for Offset {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 write!(f, "{}", self.0)
80 }
81}
82
/// Logical position in a stream from which consumption starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub enum StreamPosition {
    /// The earliest retained event.
    Beginning,
    /// The tail of the stream (only events arriving from now on).
    End,
    /// An explicit absolute offset value.
    Offset(u64),
}
93
/// Rich, transport-level metadata attached to a stream event.
///
/// This is the extended form used by the streaming layer; it converts to
/// and from the compact `event::EventMetadata` via the `From` impls below,
/// which drop fields that have no counterpart.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
    /// Originating component or system.
    pub source: String,
    /// User on whose behalf the event was produced, if any.
    pub user: Option<String>,
    /// Session the event belongs to, if any.
    pub session_id: Option<String>,
    /// Distributed-tracing trace id, if propagated.
    pub trace_id: Option<String>,
    /// Token linking this event to the event that caused it.
    pub causality_token: Option<String>,
    /// Producer-supplied version; conversion falls back to
    /// `schema_version` when this is absent.
    pub version: Option<String>,

    /// When the event was created (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Context describing the operation that emitted the event.
    pub operation_context: Option<OperationContext>,
    /// Delivery/processing priority.
    pub priority: EventPriority,
    /// Target partition, when explicitly pinned.
    pub partition: Option<PartitionId>,
    /// Correlation id used for deduplication and tracing.
    pub correlation_id: Option<String>,
    /// Payload checksum, if computed.
    pub checksum: Option<String>,
    /// Version of this metadata schema itself (defaults to "1.0").
    pub schema_version: String,
    /// Free-form key/value tags.
    pub tags: HashMap<String, String>,
    /// Time-to-live in seconds, if the event expires.
    pub ttl_seconds: Option<u64>,
    /// Compression applied to the payload, if any.
    pub compression: Option<CompressionType>,
    /// Wire format used to serialize the payload.
    pub serialization_format: SerializationFormat,
    /// Serialized message size in bytes, if known.
    pub message_size: Option<usize>,
    /// Hints controlling batching, retries, and consistency.
    pub processing_hints: ProcessingHints,
}
138
139impl From<EventMetadata> for event::EventMetadata {
141 fn from(metadata: EventMetadata) -> Self {
142 Self {
143 event_id: format!(
144 "evt_{}",
145 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
146 ), timestamp: metadata.timestamp,
148 source: metadata.source,
149 user: metadata.user,
150 context: metadata.operation_context.map(|ctx| ctx.operation_type),
151 caused_by: metadata.causality_token,
152 version: metadata.version.unwrap_or(metadata.schema_version),
153 properties: HashMap::new(), checksum: metadata.checksum,
155 }
156 }
157}
158
159impl From<event::EventMetadata> for EventMetadata {
161 fn from(metadata: event::EventMetadata) -> Self {
162 Self {
163 source: metadata.source,
164 user: metadata.user,
165 session_id: None,
166 trace_id: None,
167 causality_token: metadata.caused_by,
168 version: Some(metadata.version),
169 timestamp: metadata.timestamp,
170 operation_context: metadata.context.map(|ctx| OperationContext {
171 operation_type: ctx,
172 request_id: None,
173 client_info: None,
174 metrics: None,
175 auth_context: None,
176 custom_fields: HashMap::new(),
177 }),
178 priority: EventPriority::Normal,
179 partition: None,
180 correlation_id: None,
181 checksum: metadata.checksum,
182 schema_version: "1.0".to_string(),
183 tags: metadata.properties,
184 ttl_seconds: None,
185 compression: None,
186 serialization_format: SerializationFormat::Json,
187 message_size: None,
188 processing_hints: ProcessingHints::default(),
189 }
190 }
191}
192
/// Context about the operation that produced an event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationContext {
    /// Kind of operation; mapped to/from `event::EventMetadata::context`.
    pub operation_type: String,
    /// Request id, if the operation was request-driven.
    pub request_id: Option<String>,
    /// Information about the originating client.
    pub client_info: Option<ClientInfo>,
    /// Performance measurements collected for the operation.
    pub metrics: Option<PerformanceMetrics>,
    /// Authentication/authorization context.
    pub auth_context: Option<AuthContext>,
    /// Additional application-specific fields.
    pub custom_fields: HashMap<String, String>,
}
209
/// Identity of the client application that produced an event.
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct ClientInfo {
    /// Application name.
    pub application: String,
    /// Application version.
    pub version: String,
    /// Client IP address, if known.
    pub ip_address: Option<String>,
    /// HTTP user agent, if known.
    pub user_agent: Option<String>,
    /// Approximate geographic location, if known.
    pub location: Option<GeoLocation>,
}
224
/// Approximate geographic location of a client.
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct GeoLocation {
    /// Country — presumably an ISO country code, but the format is not
    /// enforced here; confirm against producers.
    pub country: String,
    /// Region/state, if known.
    pub region: Option<String>,
    /// City, if known.
    pub city: Option<String>,
    /// Latitude in degrees, if known.
    pub lat: Option<f64>,
    /// Longitude in degrees, if known.
    pub lon: Option<f64>,
}
239
/// Timing and resource measurements for one event's processing.
///
/// All `*_us` fields are durations in microseconds.
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct PerformanceMetrics {
    /// Time spent processing the event.
    pub processing_latency_us: Option<u64>,
    /// Time the event waited in a queue.
    pub queue_wait_time_us: Option<u64>,
    /// Time spent serializing the event.
    pub serialization_time_us: Option<u64>,
    /// Network latency observed for the event.
    pub network_latency_us: Option<u64>,
    /// Memory used, in bytes.
    pub memory_usage_bytes: Option<u64>,
    /// CPU time consumed, in microseconds.
    pub cpu_time_us: Option<u64>,
}
256
/// Authentication details of the principal behind an event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthContext {
    /// Authenticated user id.
    pub user_id: String,
    /// Roles granted to the user.
    pub roles: Vec<String>,
    /// Fine-grained permissions granted.
    pub permissions: Vec<String>,
    /// How the user authenticated — free-form string.
    pub auth_method: String,
    /// Expiry of the auth token, if applicable.
    pub token_expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
271
/// Relative processing priority of an event.
///
/// The explicit discriminants give a total order
/// (`Low < Normal < High < Critical`) matching the derived `Ord`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Default,
    Encode,
    Decode,
)]
pub enum EventPriority {
    /// Background work.
    Low = 0,
    /// Default priority.
    #[default]
    Normal = 1,
    /// Expedited handling.
    High = 2,
    /// Highest urgency.
    Critical = 3,
}
294
/// Compression codec applied to serialized payloads.
///
/// See `serialization::compress_data` / `decompress_data` for the
/// implementations backing each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Encode, Decode)]
pub enum CompressionType {
    /// No compression (default).
    #[default]
    None,
    Gzip,
    Lz4,
    Zstd,
    Snappy,
    Brotli,
}
306
/// Wire format for event metadata.
///
/// Note: `Protobuf` and `Avro` currently fall back to JSON in
/// `serialization::serialize_metadata` / `deserialize_metadata`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Encode, Decode)]
pub enum SerializationFormat {
    /// JSON via `serde_json` (default).
    #[default]
    Json,
    MessagePack,
    Protobuf,
    Avro,
    Cbor,
    Bincode,
}
318
/// Hints telling downstream processors how an event may be handled.
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct ProcessingHints {
    /// Whether the event may be processed out of order.
    pub allow_out_of_order: bool,
    /// Whether duplicate events (same correlation id) may be dropped.
    pub allow_deduplication: bool,
    /// How eagerly the event should be batched.
    pub batch_preference: BatchPreference,
    /// Consistency level requested for processing.
    pub consistency_level: ConsistencyLevel,
    /// Retry behavior on processing failure.
    pub retry_policy: RetryPolicy,
    /// Per-event processing timeout in milliseconds, if bounded.
    pub processing_timeout_ms: Option<u64>,
}
335
/// How an event should interact with batching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub enum BatchPreference {
    /// Emit immediately; never hold for a batch.
    Immediate,
    /// May be grouped into a batch.
    Batchable,
    /// Must only be emitted as part of a batch.
    RequiredBatch,
}
346
/// Consistency guarantee requested for event processing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub enum ConsistencyLevel {
    /// Eventual consistency is acceptable.
    Eventual,
    /// Consistency guaranteed per partition.
    PerPartition,
    /// Strong (global) consistency required.
    Strong,
}
357
/// Exponential-backoff retry configuration.
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct RetryPolicy {
    /// Maximum retry attempts before giving up.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub base_delay_ms: u64,
    /// Upper bound on the delay between retries, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay after each attempt.
    pub backoff_multiplier: f64,
    /// Whether to add random jitter to retry delays.
    pub use_jitter: bool,
}
372
373impl Default for EventMetadata {
374 fn default() -> Self {
375 Self {
376 source: "oxirs-stream".to_string(),
377 user: None,
378 session_id: None,
379 trace_id: None,
380 causality_token: None,
381 version: Some("1.0".to_string()),
382 timestamp: chrono::Utc::now(),
383 operation_context: None,
384 priority: EventPriority::Normal,
385 partition: None,
386 correlation_id: None,
387 checksum: None,
388 schema_version: "1.0".to_string(),
389 tags: HashMap::new(),
390 ttl_seconds: None,
391 compression: None,
392 serialization_format: SerializationFormat::Json,
393 message_size: None,
394 processing_hints: ProcessingHints::default(),
395 }
396 }
397}
398
399impl Default for ProcessingHints {
400 fn default() -> Self {
401 Self {
402 allow_out_of_order: false,
403 allow_deduplication: true,
404 batch_preference: BatchPreference::Batchable,
405 consistency_level: ConsistencyLevel::PerPartition,
406 retry_policy: RetryPolicy::default(),
407 processing_timeout_ms: Some(30000), }
409 }
410}
411
412impl Default for RetryPolicy {
413 fn default() -> Self {
414 Self {
415 max_retries: 3,
416 base_delay_ms: 100,
417 max_delay_ms: 10000,
418 backoff_multiplier: 2.0,
419 use_jitter: true,
420 }
421 }
422}
423
424pub mod serialization {
426 use super::*;
427 use anyhow::{anyhow, Result};
428
429 pub fn serialize_metadata(
431 metadata: &EventMetadata,
432 format: SerializationFormat,
433 ) -> Result<Vec<u8>> {
434 match format {
435 SerializationFormat::Json => {
436 serde_json::to_vec(metadata).map_err(|e| anyhow!("JSON serialization failed: {e}"))
437 }
438 SerializationFormat::MessagePack => rmp_serde::to_vec(metadata)
439 .map_err(|e| anyhow!("MessagePack serialization failed: {e}")),
440 SerializationFormat::Cbor => {
441 let mut buf = Vec::new();
442 ciborium::ser::into_writer(metadata, &mut buf)
443 .map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
444 Ok(buf)
445 }
446 SerializationFormat::Bincode => {
447 oxicode::serde::encode_to_vec(metadata, oxicode::config::standard())
448 .map_err(|e| anyhow!("Bincode serialization failed: {e}"))
449 }
450 SerializationFormat::Protobuf | SerializationFormat::Avro => {
451 serde_json::to_vec(metadata)
454 .map_err(|e| anyhow!("Protobuf/Avro serialization fallback failed: {e}"))
455 }
456 }
457 }
458
459 pub fn deserialize_metadata(data: &[u8], format: SerializationFormat) -> Result<EventMetadata> {
461 match format {
462 SerializationFormat::Json => serde_json::from_slice(data)
463 .map_err(|e| anyhow!("JSON deserialization failed: {e}")),
464 SerializationFormat::MessagePack => rmp_serde::from_slice(data)
465 .map_err(|e| anyhow!("MessagePack deserialization failed: {e}")),
466 SerializationFormat::Cbor => ciborium::de::from_reader(data)
467 .map_err(|e| anyhow!("CBOR deserialization failed: {e}")),
468 SerializationFormat::Bincode => {
469 oxicode::serde::decode_from_slice(data, oxicode::config::standard())
470 .map(|(v, _)| v)
471 .map_err(|e| anyhow!("Bincode deserialization failed: {e}"))
472 }
473 SerializationFormat::Protobuf | SerializationFormat::Avro => {
474 serde_json::from_slice(data)
477 .map_err(|e| anyhow!("Protobuf/Avro deserialization fallback failed: {e}"))
478 }
479 }
480 }
481
482 pub fn compress_data(data: &[u8], compression: CompressionType) -> Result<Vec<u8>> {
484 match compression {
485 CompressionType::None => Ok(data.to_vec()),
486 CompressionType::Gzip => {
487 use flate2::write::GzEncoder;
488 use flate2::Compression;
489 use std::io::Write;
490
491 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
492 encoder.write_all(data)?;
493 Ok(encoder.finish()?)
494 }
495 CompressionType::Lz4 => {
496 oxiarc_lz4::compress(data).map_err(|e| anyhow!("LZ4 compression failed: {e}"))
497 }
498 CompressionType::Zstd => {
499 oxiarc_zstd::compress(data).map_err(|e| anyhow!("Zstd compression failed: {e}"))
500 }
501 CompressionType::Snappy => Ok(snap::raw::Encoder::new().compress_vec(data)?),
502 CompressionType::Brotli => {
503 use brotli::CompressorWriter;
504 use std::io::Write;
505 let mut compressed = Vec::new();
506 {
507 let mut compressor = CompressorWriter::new(&mut compressed, 4096, 6, 22);
508 compressor.write_all(data)?;
509 } Ok(compressed)
511 }
512 }
513 }
514
515 pub fn decompress_data(data: &[u8], compression: CompressionType) -> Result<Vec<u8>> {
517 match compression {
518 CompressionType::None => Ok(data.to_vec()),
519 CompressionType::Gzip => {
520 use flate2::read::GzDecoder;
521 use std::io::Read;
522
523 let mut decoder = GzDecoder::new(data);
524 let mut decompressed = Vec::new();
525 decoder.read_to_end(&mut decompressed)?;
526 Ok(decompressed)
527 }
528 CompressionType::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
529 .map_err(|e| anyhow!("LZ4 decompression failed: {e}")),
530 CompressionType::Zstd => {
531 oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
532 }
533 CompressionType::Snappy => snap::raw::Decoder::new()
534 .decompress_vec(data)
535 .map_err(|e| anyhow!("Snappy decompression failed: {e}")),
536 CompressionType::Brotli => {
537 use std::io::Read;
538 let mut decompressed = Vec::new();
539 let mut decompressor = brotli::Decompressor::new(data, 4096);
540 decompressor.read_to_end(&mut decompressed)?;
541 Ok(decompressed)
542 }
543 }
544 }
545}
546
547pub mod processing {
549 use super::*;
550 use std::time::{Duration, Instant};
551
552 pub struct EventProcessor {
554 pub deduplication_cache: std::collections::HashSet<String>,
555 pub batch_buffer: Vec<(crate::event::StreamEvent, EventMetadata)>,
556 pub last_flush: Instant,
557 pub flush_interval: Duration,
558 }
559
560 impl Default for EventProcessor {
561 fn default() -> Self {
562 Self::new()
563 }
564 }
565
566 impl EventProcessor {
567 pub fn new() -> Self {
568 Self {
569 deduplication_cache: std::collections::HashSet::new(),
570 batch_buffer: Vec::new(),
571 last_flush: Instant::now(),
572 flush_interval: Duration::from_millis(100),
573 }
574 }
575
576 pub fn process_event(
578 &mut self,
579 mut event: crate::event::StreamEvent,
580 ) -> anyhow::Result<Option<crate::event::StreamEvent>> {
581 let metadata = self.extract_metadata(&event)?;
583 let enhanced_metadata = self.enhance_metadata(metadata)?;
584
585 if enhanced_metadata.processing_hints.allow_deduplication {
587 if let Some(correlation_id) = &enhanced_metadata.correlation_id {
588 if self.deduplication_cache.contains(correlation_id) {
589 return Ok(None); }
591 self.deduplication_cache.insert(correlation_id.clone());
592 }
593 }
594
595 self.update_event_metadata(&mut event, enhanced_metadata)?;
597
598 match self.get_batch_preference(&event) {
600 BatchPreference::Immediate => Ok(Some(event)),
601 BatchPreference::Batchable | BatchPreference::RequiredBatch => {
602 self.add_to_batch(event);
603
604 if self.should_flush_batch() {
606 Ok(self.batch_buffer.last().map(|(e, _)| e.clone()))
609 } else {
610 Ok(None)
611 }
612 }
613 }
614 }
615
616 fn extract_metadata(
617 &self,
618 event: &crate::event::StreamEvent,
619 ) -> anyhow::Result<EventMetadata> {
620 match event {
622 crate::event::StreamEvent::TripleAdded { metadata, .. } => {
623 Ok(metadata.clone().into())
624 }
625 crate::event::StreamEvent::TripleRemoved { metadata, .. } => {
626 Ok(metadata.clone().into())
627 }
628 crate::event::StreamEvent::GraphCreated { metadata, .. } => {
629 Ok(metadata.clone().into())
630 }
631 crate::event::StreamEvent::SparqlUpdate { metadata, .. } => {
632 Ok(metadata.clone().into())
633 }
634 crate::event::StreamEvent::TransactionBegin { metadata, .. } => {
635 Ok(metadata.clone().into())
636 }
637 crate::event::StreamEvent::Heartbeat { metadata, .. } => {
638 Ok(metadata.clone().into())
639 }
640 _ => Ok(EventMetadata::default()),
641 }
642 }
643
644 fn enhance_metadata(&self, mut metadata: EventMetadata) -> anyhow::Result<EventMetadata> {
645 if metadata.timestamp == chrono::DateTime::<chrono::Utc>::MIN_UTC {
647 metadata.timestamp = chrono::Utc::now();
648 }
649
650 if metadata.correlation_id.is_none() {
652 metadata.correlation_id = Some(uuid::Uuid::new_v4().to_string());
653 }
654
655 if metadata.schema_version.is_empty() {
657 metadata.schema_version = "1.0".to_string();
658 }
659
660 if metadata.operation_context.is_none() {
662 metadata.operation_context = Some(OperationContext {
663 operation_type: "stream_event".to_string(),
664 request_id: Some(uuid::Uuid::new_v4().to_string()),
665 client_info: None,
666 metrics: Some(PerformanceMetrics {
667 processing_latency_us: Some(0),
668 queue_wait_time_us: Some(0),
669 serialization_time_us: Some(0),
670 network_latency_us: Some(0),
671 memory_usage_bytes: Some(0),
672 cpu_time_us: Some(0),
673 }),
674 auth_context: None,
675 custom_fields: HashMap::new(),
676 });
677 }
678
679 Ok(metadata)
680 }
681
682 fn update_event_metadata(
683 &self,
684 event: &mut crate::event::StreamEvent,
685 metadata: EventMetadata,
686 ) -> anyhow::Result<()> {
687 let event_metadata = event::EventMetadata::from(metadata);
688 match event {
689 crate::event::StreamEvent::TripleAdded { metadata: m, .. } => *m = event_metadata,
690 crate::event::StreamEvent::TripleRemoved { metadata: m, .. } => *m = event_metadata,
691 crate::event::StreamEvent::GraphCreated { metadata: m, .. } => *m = event_metadata,
692 crate::event::StreamEvent::SparqlUpdate { metadata: m, .. } => *m = event_metadata,
693 crate::event::StreamEvent::TransactionBegin { metadata: m, .. } => {
694 *m = event_metadata
695 }
696 crate::event::StreamEvent::Heartbeat { metadata: m, .. } => *m = event_metadata,
697 _ => {}
698 }
699 Ok(())
700 }
701
702 fn get_batch_preference(&self, event: &crate::event::StreamEvent) -> BatchPreference {
703 match event {
704 crate::event::StreamEvent::Heartbeat { .. } => BatchPreference::Immediate,
705 crate::event::StreamEvent::TransactionBegin { .. } => BatchPreference::Immediate,
706 crate::event::StreamEvent::TransactionCommit { .. } => BatchPreference::Immediate,
707 crate::event::StreamEvent::TransactionAbort { .. } => BatchPreference::Immediate,
708 _ => BatchPreference::Batchable,
709 }
710 }
711
712 fn add_to_batch(&mut self, event: crate::event::StreamEvent) {
713 let metadata = self.extract_metadata(&event).unwrap_or_default();
714 self.batch_buffer.push((event, metadata));
715 }
716
717 fn should_flush_batch(&self) -> bool {
718 self.batch_buffer.len() >= 100 || self.last_flush.elapsed() >= self.flush_interval
719 }
720 }
721
722 #[cfg(test)]
723 mod tests {
724 use super::*;
725 use crate::types::serialization::{compress_data, decompress_data};
726
727 #[test]
728 fn test_compression_round_trip() {
729 let test_data = b"Hello, World! This is a test message for compression.";
730 let compression_types = vec![
731 CompressionType::None,
732 CompressionType::Gzip,
733 CompressionType::Lz4,
734 CompressionType::Zstd,
735 CompressionType::Snappy,
736 CompressionType::Brotli,
737 ];
738
739 for compression in compression_types {
740 let compressed = compress_data(test_data, compression).unwrap();
741 let decompressed = decompress_data(&compressed, compression).unwrap();
742 assert_eq!(
743 test_data,
744 decompressed.as_slice(),
745 "Failed round-trip for {compression:?}"
746 );
747 }
748 }
749
750 #[test]
751 fn test_compression_effectiveness() {
752 let test_data = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; let compression_types = vec![
754 CompressionType::Gzip,
755 CompressionType::Lz4,
756 CompressionType::Zstd,
757 CompressionType::Snappy,
758 CompressionType::Brotli,
759 ];
760
761 for compression in compression_types {
762 let compressed = compress_data(test_data, compression).unwrap();
763 assert!(
765 compressed.len() < test_data.len(),
766 "Compression {compression:?} did not reduce size"
767 );
768 }
769 }
770
771 #[test]
772 fn test_empty_data_compression() {
773 let test_data = b"";
774 let compression_types = vec![
775 CompressionType::None,
776 CompressionType::Gzip,
777 CompressionType::Lz4,
778 CompressionType::Zstd,
779 CompressionType::Snappy,
780 CompressionType::Brotli,
781 ];
782
783 for compression in compression_types {
784 let compressed = compress_data(test_data, compression).unwrap();
785 let decompressed = decompress_data(&compressed, compression).unwrap();
786 assert_eq!(
787 test_data,
788 decompressed.as_slice(),
789 "Failed empty data round-trip for {compression:?}"
790 );
791 }
792 }
793
794 #[test]
795 fn test_large_data_compression() {
796 let test_data = vec![42u8; 10000]; let compression_types = vec![
798 CompressionType::None,
799 CompressionType::Gzip,
800 CompressionType::Lz4,
801 CompressionType::Zstd,
802 CompressionType::Snappy,
803 CompressionType::Brotli,
804 ];
805
806 for compression in compression_types {
807 let compressed = compress_data(&test_data, compression).unwrap();
808 let decompressed = decompress_data(&compressed, compression).unwrap();
809 assert_eq!(
810 test_data, decompressed,
811 "Failed large data round-trip for {compression:?}"
812 );
813 }
814 }
815
816 #[test]
817 fn test_random_data_compression() {
818 use scirs2_core::random::{Random, Rng};
819 let mut random_gen = Random::default();
820 let test_data: Vec<u8> = (0..1000).map(|_| random_gen.random()).collect();
821 let compression_types = vec![
822 CompressionType::None,
823 CompressionType::Gzip,
824 CompressionType::Lz4,
825 CompressionType::Zstd,
826 CompressionType::Snappy,
827 CompressionType::Brotli,
828 ];
829
830 for compression in compression_types {
831 let compressed = compress_data(&test_data, compression).unwrap();
832 let decompressed = decompress_data(&compressed, compression).unwrap();
833 assert_eq!(
834 test_data, decompressed,
835 "Failed random data round-trip for {compression:?}"
836 );
837 }
838 }
839 }
840}