Skip to main content

oxirs_stream/
types.rs

//! # Stream Types
//!
//! Common types used throughout the streaming module.
4
5use crate::event;
6use oxicode::{Decode, Encode};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10
11/// Topic name wrapper
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
13pub struct TopicName(String);
14
15impl TopicName {
16    pub fn new(name: String) -> Self {
17        Self(name)
18    }
19
20    pub fn as_str(&self) -> &str {
21        &self.0
22    }
23}
24
25impl fmt::Display for TopicName {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        write!(f, "{}", self.0)
28    }
29}
30
31impl From<&str> for TopicName {
32    fn from(s: &str) -> Self {
33        Self(s.to_string())
34    }
35}
36
37impl From<String> for TopicName {
38    fn from(s: String) -> Self {
39        Self(s)
40    }
41}
42
43/// Partition identifier
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
45pub struct PartitionId(u32);
46
47impl PartitionId {
48    pub fn new(id: u32) -> Self {
49        Self(id)
50    }
51
52    pub fn value(&self) -> u32 {
53        self.0
54    }
55}
56
57impl fmt::Display for PartitionId {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        write!(f, "{}", self.0)
60    }
61}
62
63/// Message offset
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
65pub struct Offset(u64);
66
67impl Offset {
68    pub fn new(offset: u64) -> Self {
69        Self(offset)
70    }
71
72    pub fn value(&self) -> u64 {
73        self.0
74    }
75}
76
77impl fmt::Display for Offset {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        write!(f, "{}", self.0)
80    }
81}
82
83/// Stream position for seeking
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
85pub enum StreamPosition {
86    /// Start from the beginning
87    Beginning,
88    /// Start from the end
89    End,
90    /// Start from a specific offset
91    Offset(u64),
92}
93
94/// Enhanced event metadata for tracking and provenance with advanced features
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct EventMetadata {
97    /// Source system or component
98    pub source: String,
99    /// User who triggered the event
100    pub user: Option<String>,
101    /// Session identifier
102    pub session_id: Option<String>,
103    /// Trace identifier for distributed tracing
104    pub trace_id: Option<String>,
105    /// Causality token for event ordering
106    pub causality_token: Option<String>,
107    /// Event version for schema evolution
108    pub version: Option<String>,
109
110    // Enhanced metadata fields (TODO items)
111    /// Event timestamp with high precision
112    pub timestamp: chrono::DateTime<chrono::Utc>,
113    /// Operation context with request details
114    pub operation_context: Option<OperationContext>,
115    /// Event priority for processing order
116    pub priority: EventPriority,
117    /// Partition information for routing
118    pub partition: Option<PartitionId>,
119    /// Event correlation ID for related events
120    pub correlation_id: Option<String>,
121    /// Checksum for data integrity
122    pub checksum: Option<String>,
123    /// Schema version for data format
124    pub schema_version: String,
125    /// Event tags for filtering and routing
126    pub tags: HashMap<String, String>,
127    /// Event TTL (time to live) in seconds
128    pub ttl_seconds: Option<u64>,
129    /// Compression type used for payload
130    pub compression: Option<CompressionType>,
131    /// Serialization format used
132    pub serialization_format: SerializationFormat,
133    /// Message size in bytes
134    pub message_size: Option<usize>,
135    /// Processing hints for consumers
136    pub processing_hints: ProcessingHints,
137}
138
139/// Conversion from types::EventMetadata to event::EventMetadata
140impl From<EventMetadata> for event::EventMetadata {
141    fn from(metadata: EventMetadata) -> Self {
142        Self {
143            event_id: format!(
144                "evt_{}",
145                chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
146            ), // Generate simple ID
147            timestamp: metadata.timestamp,
148            source: metadata.source,
149            user: metadata.user,
150            context: metadata.operation_context.map(|ctx| ctx.operation_type),
151            caused_by: metadata.causality_token,
152            version: metadata.version.unwrap_or(metadata.schema_version),
153            properties: HashMap::new(), // Could be populated from custom fields
154            checksum: metadata.checksum,
155        }
156    }
157}
158
159/// Conversion from event::EventMetadata to types::EventMetadata
160impl From<event::EventMetadata> for EventMetadata {
161    fn from(metadata: event::EventMetadata) -> Self {
162        Self {
163            source: metadata.source,
164            user: metadata.user,
165            session_id: None,
166            trace_id: None,
167            causality_token: metadata.caused_by,
168            version: Some(metadata.version),
169            timestamp: metadata.timestamp,
170            operation_context: metadata.context.map(|ctx| OperationContext {
171                operation_type: ctx,
172                request_id: None,
173                client_info: None,
174                metrics: None,
175                auth_context: None,
176                custom_fields: HashMap::new(),
177            }),
178            priority: EventPriority::Normal,
179            partition: None,
180            correlation_id: None,
181            checksum: metadata.checksum,
182            schema_version: "1.0".to_string(),
183            tags: metadata.properties,
184            ttl_seconds: None,
185            compression: None,
186            serialization_format: SerializationFormat::Json,
187            message_size: None,
188            processing_hints: ProcessingHints::default(),
189        }
190    }
191}
192
193/// Operation context for enhanced tracking
194#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct OperationContext {
196    /// Operation type (INSERT, DELETE, UPDATE, etc.)
197    pub operation_type: String,
198    /// Request ID from the original request
199    pub request_id: Option<String>,
200    /// Client information
201    pub client_info: Option<ClientInfo>,
202    /// Performance metrics
203    pub metrics: Option<PerformanceMetrics>,
204    /// Authentication context
205    pub auth_context: Option<AuthContext>,
206    /// Additional custom context
207    pub custom_fields: HashMap<String, String>,
208}
209
210/// Client information
211#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
212pub struct ClientInfo {
213    /// Client application name
214    pub application: String,
215    /// Client version
216    pub version: String,
217    /// Client IP address
218    pub ip_address: Option<String>,
219    /// User agent string
220    pub user_agent: Option<String>,
221    /// Geographic location
222    pub location: Option<GeoLocation>,
223}
224
225/// Geographic location information
226#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
227pub struct GeoLocation {
228    /// Country code (ISO 3166-1 alpha-2)
229    pub country: String,
230    /// Region or state
231    pub region: Option<String>,
232    /// City
233    pub city: Option<String>,
234    /// Latitude
235    pub lat: Option<f64>,
236    /// Longitude
237    pub lon: Option<f64>,
238}
239
240/// Performance metrics
241#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
242pub struct PerformanceMetrics {
243    /// Processing latency in microseconds
244    pub processing_latency_us: Option<u64>,
245    /// Queue wait time in microseconds
246    pub queue_wait_time_us: Option<u64>,
247    /// Serialization time in microseconds
248    pub serialization_time_us: Option<u64>,
249    /// Network latency in microseconds
250    pub network_latency_us: Option<u64>,
251    /// Memory usage in bytes
252    pub memory_usage_bytes: Option<u64>,
253    /// CPU time used in microseconds
254    pub cpu_time_us: Option<u64>,
255}
256
257/// Authentication context
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct AuthContext {
260    /// Authenticated user ID
261    pub user_id: String,
262    /// User roles
263    pub roles: Vec<String>,
264    /// Permissions granted
265    pub permissions: Vec<String>,
266    /// Authentication method used
267    pub auth_method: String,
268    /// Token expiration time
269    pub token_expires_at: Option<chrono::DateTime<chrono::Utc>>,
270}
271
272/// Event priority levels
273#[derive(
274    Debug,
275    Clone,
276    Copy,
277    PartialEq,
278    Eq,
279    PartialOrd,
280    Ord,
281    Serialize,
282    Deserialize,
283    Default,
284    Encode,
285    Decode,
286)]
287pub enum EventPriority {
288    Low = 0,
289    #[default]
290    Normal = 1,
291    High = 2,
292    Critical = 3,
293}
294
295/// Compression types for payload optimization
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Encode, Decode)]
297pub enum CompressionType {
298    #[default]
299    None,
300    Gzip,
301    Lz4,
302    Zstd,
303    Snappy,
304    Brotli,
305}
306
307/// Serialization formats supported
308#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Encode, Decode)]
309pub enum SerializationFormat {
310    #[default]
311    Json,
312    MessagePack,
313    Protobuf,
314    Avro,
315    Cbor,
316    Bincode,
317}
318
319/// Processing hints for optimized handling
320#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
321pub struct ProcessingHints {
322    /// Whether event can be processed out of order
323    pub allow_out_of_order: bool,
324    /// Whether event can be deduplicated
325    pub allow_deduplication: bool,
326    /// Batch processing preference
327    pub batch_preference: BatchPreference,
328    /// Required consistency level
329    pub consistency_level: ConsistencyLevel,
330    /// Retry policy for failures
331    pub retry_policy: RetryPolicy,
332    /// Processing timeout in milliseconds
333    pub processing_timeout_ms: Option<u64>,
334}
335
336/// Batch processing preferences
337#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
338pub enum BatchPreference {
339    /// Process immediately, don't batch
340    Immediate,
341    /// Can be batched for efficiency
342    Batchable,
343    /// Must be batched with related events
344    RequiredBatch,
345}
346
347/// Consistency level requirements
348#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
349pub enum ConsistencyLevel {
350    /// Eventual consistency is acceptable
351    Eventual,
352    /// Strong consistency required within partition
353    PerPartition,
354    /// Strong consistency required globally
355    Strong,
356}
357
358/// Retry policy configuration
359#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
360pub struct RetryPolicy {
361    /// Maximum number of retries
362    pub max_retries: u32,
363    /// Base delay between retries in milliseconds
364    pub base_delay_ms: u64,
365    /// Maximum delay between retries in milliseconds
366    pub max_delay_ms: u64,
367    /// Exponential backoff multiplier
368    pub backoff_multiplier: f64,
369    /// Whether to use jitter
370    pub use_jitter: bool,
371}
372
373impl Default for EventMetadata {
374    fn default() -> Self {
375        Self {
376            source: "oxirs-stream".to_string(),
377            user: None,
378            session_id: None,
379            trace_id: None,
380            causality_token: None,
381            version: Some("1.0".to_string()),
382            timestamp: chrono::Utc::now(),
383            operation_context: None,
384            priority: EventPriority::Normal,
385            partition: None,
386            correlation_id: None,
387            checksum: None,
388            schema_version: "1.0".to_string(),
389            tags: HashMap::new(),
390            ttl_seconds: None,
391            compression: None,
392            serialization_format: SerializationFormat::Json,
393            message_size: None,
394            processing_hints: ProcessingHints::default(),
395        }
396    }
397}
398
399impl Default for ProcessingHints {
400    fn default() -> Self {
401        Self {
402            allow_out_of_order: false,
403            allow_deduplication: true,
404            batch_preference: BatchPreference::Batchable,
405            consistency_level: ConsistencyLevel::PerPartition,
406            retry_policy: RetryPolicy::default(),
407            processing_timeout_ms: Some(30000), // 30 seconds
408        }
409    }
410}
411
412impl Default for RetryPolicy {
413    fn default() -> Self {
414        Self {
415            max_retries: 3,
416            base_delay_ms: 100,
417            max_delay_ms: 10000,
418            backoff_multiplier: 2.0,
419            use_jitter: true,
420        }
421    }
422}
423
424/// Enhanced serialization utilities for different formats
425pub mod serialization {
426    use super::*;
427    use anyhow::{anyhow, Result};
428
429    /// Serialize event metadata using specified format
430    pub fn serialize_metadata(
431        metadata: &EventMetadata,
432        format: SerializationFormat,
433    ) -> Result<Vec<u8>> {
434        match format {
435            SerializationFormat::Json => {
436                serde_json::to_vec(metadata).map_err(|e| anyhow!("JSON serialization failed: {e}"))
437            }
438            SerializationFormat::MessagePack => rmp_serde::to_vec(metadata)
439                .map_err(|e| anyhow!("MessagePack serialization failed: {e}")),
440            SerializationFormat::Cbor => {
441                let mut buf = Vec::new();
442                ciborium::ser::into_writer(metadata, &mut buf)
443                    .map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
444                Ok(buf)
445            }
446            SerializationFormat::Bincode => {
447                oxicode::serde::encode_to_vec(metadata, oxicode::config::standard())
448                    .map_err(|e| anyhow!("Bincode serialization failed: {e}"))
449            }
450            SerializationFormat::Protobuf | SerializationFormat::Avro => {
451                // These would require schema generation and external dependencies
452                // For now, fallback to JSON
453                serde_json::to_vec(metadata)
454                    .map_err(|e| anyhow!("Protobuf/Avro serialization fallback failed: {e}"))
455            }
456        }
457    }
458
459    /// Deserialize event metadata from specified format
460    pub fn deserialize_metadata(data: &[u8], format: SerializationFormat) -> Result<EventMetadata> {
461        match format {
462            SerializationFormat::Json => serde_json::from_slice(data)
463                .map_err(|e| anyhow!("JSON deserialization failed: {e}")),
464            SerializationFormat::MessagePack => rmp_serde::from_slice(data)
465                .map_err(|e| anyhow!("MessagePack deserialization failed: {e}")),
466            SerializationFormat::Cbor => ciborium::de::from_reader(data)
467                .map_err(|e| anyhow!("CBOR deserialization failed: {e}")),
468            SerializationFormat::Bincode => {
469                oxicode::serde::decode_from_slice(data, oxicode::config::standard())
470                    .map(|(v, _)| v)
471                    .map_err(|e| anyhow!("Bincode deserialization failed: {e}"))
472            }
473            SerializationFormat::Protobuf | SerializationFormat::Avro => {
474                // These would require schema generation and external dependencies
475                // For now, fallback to JSON
476                serde_json::from_slice(data)
477                    .map_err(|e| anyhow!("Protobuf/Avro deserialization fallback failed: {e}"))
478            }
479        }
480    }
481
482    /// Compress data using specified compression type
483    pub fn compress_data(data: &[u8], compression: CompressionType) -> Result<Vec<u8>> {
484        match compression {
485            CompressionType::None => Ok(data.to_vec()),
486            CompressionType::Gzip => {
487                use flate2::write::GzEncoder;
488                use flate2::Compression;
489                use std::io::Write;
490
491                let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
492                encoder.write_all(data)?;
493                Ok(encoder.finish()?)
494            }
495            CompressionType::Lz4 => {
496                oxiarc_lz4::compress(data).map_err(|e| anyhow!("LZ4 compression failed: {e}"))
497            }
498            CompressionType::Zstd => {
499                oxiarc_zstd::compress(data).map_err(|e| anyhow!("Zstd compression failed: {e}"))
500            }
501            CompressionType::Snappy => Ok(snap::raw::Encoder::new().compress_vec(data)?),
502            CompressionType::Brotli => {
503                use brotli::CompressorWriter;
504                use std::io::Write;
505                let mut compressed = Vec::new();
506                {
507                    let mut compressor = CompressorWriter::new(&mut compressed, 4096, 6, 22);
508                    compressor.write_all(data)?;
509                } // Compressor is dropped here, flushing data to compressed
510                Ok(compressed)
511            }
512        }
513    }
514
515    /// Decompress data using specified compression type
516    pub fn decompress_data(data: &[u8], compression: CompressionType) -> Result<Vec<u8>> {
517        match compression {
518            CompressionType::None => Ok(data.to_vec()),
519            CompressionType::Gzip => {
520                use flate2::read::GzDecoder;
521                use std::io::Read;
522
523                let mut decoder = GzDecoder::new(data);
524                let mut decompressed = Vec::new();
525                decoder.read_to_end(&mut decompressed)?;
526                Ok(decompressed)
527            }
528            CompressionType::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
529                .map_err(|e| anyhow!("LZ4 decompression failed: {e}")),
530            CompressionType::Zstd => {
531                oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
532            }
533            CompressionType::Snappy => snap::raw::Decoder::new()
534                .decompress_vec(data)
535                .map_err(|e| anyhow!("Snappy decompression failed: {e}")),
536            CompressionType::Brotli => {
537                use std::io::Read;
538                let mut decompressed = Vec::new();
539                let mut decompressor = brotli::Decompressor::new(data, 4096);
540                decompressor.read_to_end(&mut decompressed)?;
541                Ok(decompressed)
542            }
543        }
544    }
545}
546
547/// Event processing utilities
548pub mod processing {
549    use super::*;
550    use std::time::{Duration, Instant};
551
552    /// Event processor for handling metadata and optimizations
553    pub struct EventProcessor {
554        pub deduplication_cache: std::collections::HashSet<String>,
555        pub batch_buffer: Vec<(crate::event::StreamEvent, EventMetadata)>,
556        pub last_flush: Instant,
557        pub flush_interval: Duration,
558    }
559
560    impl Default for EventProcessor {
561        fn default() -> Self {
562            Self::new()
563        }
564    }
565
566    impl EventProcessor {
567        pub fn new() -> Self {
568            Self {
569                deduplication_cache: std::collections::HashSet::new(),
570                batch_buffer: Vec::new(),
571                last_flush: Instant::now(),
572                flush_interval: Duration::from_millis(100),
573            }
574        }
575
576        /// Process event with metadata enhancements
577        pub fn process_event(
578            &mut self,
579            mut event: crate::event::StreamEvent,
580        ) -> anyhow::Result<Option<crate::event::StreamEvent>> {
581            // Extract and enhance metadata
582            let metadata = self.extract_metadata(&event)?;
583            let enhanced_metadata = self.enhance_metadata(metadata)?;
584
585            // Check for deduplication
586            if enhanced_metadata.processing_hints.allow_deduplication {
587                if let Some(correlation_id) = &enhanced_metadata.correlation_id {
588                    if self.deduplication_cache.contains(correlation_id) {
589                        return Ok(None); // Duplicate event, skip
590                    }
591                    self.deduplication_cache.insert(correlation_id.clone());
592                }
593            }
594
595            // Update event metadata
596            self.update_event_metadata(&mut event, enhanced_metadata)?;
597
598            // Handle batching
599            match self.get_batch_preference(&event) {
600                BatchPreference::Immediate => Ok(Some(event)),
601                BatchPreference::Batchable | BatchPreference::RequiredBatch => {
602                    self.add_to_batch(event);
603
604                    // Check if we should flush the batch
605                    if self.should_flush_batch() {
606                        // For simplicity, return the last event
607                        // In a real implementation, this would return a batch
608                        Ok(self.batch_buffer.last().map(|(e, _)| e.clone()))
609                    } else {
610                        Ok(None)
611                    }
612                }
613            }
614        }
615
616        fn extract_metadata(
617            &self,
618            event: &crate::event::StreamEvent,
619        ) -> anyhow::Result<EventMetadata> {
620            // Extract metadata from event based on event type
621            match event {
622                crate::event::StreamEvent::TripleAdded { metadata, .. } => {
623                    Ok(metadata.clone().into())
624                }
625                crate::event::StreamEvent::TripleRemoved { metadata, .. } => {
626                    Ok(metadata.clone().into())
627                }
628                crate::event::StreamEvent::GraphCreated { metadata, .. } => {
629                    Ok(metadata.clone().into())
630                }
631                crate::event::StreamEvent::SparqlUpdate { metadata, .. } => {
632                    Ok(metadata.clone().into())
633                }
634                crate::event::StreamEvent::TransactionBegin { metadata, .. } => {
635                    Ok(metadata.clone().into())
636                }
637                crate::event::StreamEvent::Heartbeat { metadata, .. } => {
638                    Ok(metadata.clone().into())
639                }
640                _ => Ok(EventMetadata::default()),
641            }
642        }
643
644        fn enhance_metadata(&self, mut metadata: EventMetadata) -> anyhow::Result<EventMetadata> {
645            // Add timestamp if not present
646            if metadata.timestamp == chrono::DateTime::<chrono::Utc>::MIN_UTC {
647                metadata.timestamp = chrono::Utc::now();
648            }
649
650            // Generate correlation ID if not present
651            if metadata.correlation_id.is_none() {
652                metadata.correlation_id = Some(uuid::Uuid::new_v4().to_string());
653            }
654
655            // Set default schema version
656            if metadata.schema_version.is_empty() {
657                metadata.schema_version = "1.0".to_string();
658            }
659
660            // Add performance metrics
661            if metadata.operation_context.is_none() {
662                metadata.operation_context = Some(OperationContext {
663                    operation_type: "stream_event".to_string(),
664                    request_id: Some(uuid::Uuid::new_v4().to_string()),
665                    client_info: None,
666                    metrics: Some(PerformanceMetrics {
667                        processing_latency_us: Some(0),
668                        queue_wait_time_us: Some(0),
669                        serialization_time_us: Some(0),
670                        network_latency_us: Some(0),
671                        memory_usage_bytes: Some(0),
672                        cpu_time_us: Some(0),
673                    }),
674                    auth_context: None,
675                    custom_fields: HashMap::new(),
676                });
677            }
678
679            Ok(metadata)
680        }
681
682        fn update_event_metadata(
683            &self,
684            event: &mut crate::event::StreamEvent,
685            metadata: EventMetadata,
686        ) -> anyhow::Result<()> {
687            let event_metadata = event::EventMetadata::from(metadata);
688            match event {
689                crate::event::StreamEvent::TripleAdded { metadata: m, .. } => *m = event_metadata,
690                crate::event::StreamEvent::TripleRemoved { metadata: m, .. } => *m = event_metadata,
691                crate::event::StreamEvent::GraphCreated { metadata: m, .. } => *m = event_metadata,
692                crate::event::StreamEvent::SparqlUpdate { metadata: m, .. } => *m = event_metadata,
693                crate::event::StreamEvent::TransactionBegin { metadata: m, .. } => {
694                    *m = event_metadata
695                }
696                crate::event::StreamEvent::Heartbeat { metadata: m, .. } => *m = event_metadata,
697                _ => {}
698            }
699            Ok(())
700        }
701
702        fn get_batch_preference(&self, event: &crate::event::StreamEvent) -> BatchPreference {
703            match event {
704                crate::event::StreamEvent::Heartbeat { .. } => BatchPreference::Immediate,
705                crate::event::StreamEvent::TransactionBegin { .. } => BatchPreference::Immediate,
706                crate::event::StreamEvent::TransactionCommit { .. } => BatchPreference::Immediate,
707                crate::event::StreamEvent::TransactionAbort { .. } => BatchPreference::Immediate,
708                _ => BatchPreference::Batchable,
709            }
710        }
711
712        fn add_to_batch(&mut self, event: crate::event::StreamEvent) {
713            let metadata = self.extract_metadata(&event).unwrap_or_default();
714            self.batch_buffer.push((event, metadata));
715        }
716
717        fn should_flush_batch(&self) -> bool {
718            self.batch_buffer.len() >= 100 || self.last_flush.elapsed() >= self.flush_interval
719        }
720    }
721
722    #[cfg(test)]
723    mod tests {
724        use super::*;
725        use crate::types::serialization::{compress_data, decompress_data};
726
727        #[test]
728        fn test_compression_round_trip() {
729            let test_data = b"Hello, World! This is a test message for compression.";
730            let compression_types = vec![
731                CompressionType::None,
732                CompressionType::Gzip,
733                CompressionType::Lz4,
734                CompressionType::Zstd,
735                CompressionType::Snappy,
736                CompressionType::Brotli,
737            ];
738
739            for compression in compression_types {
740                let compressed = compress_data(test_data, compression).unwrap();
741                let decompressed = decompress_data(&compressed, compression).unwrap();
742                assert_eq!(
743                    test_data,
744                    decompressed.as_slice(),
745                    "Failed round-trip for {compression:?}"
746                );
747            }
748        }
749
750        #[test]
751        fn test_compression_effectiveness() {
752            let test_data = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; // Repetitive data
753            let compression_types = vec![
754                CompressionType::Gzip,
755                CompressionType::Lz4,
756                CompressionType::Zstd,
757                CompressionType::Snappy,
758                CompressionType::Brotli,
759            ];
760
761            for compression in compression_types {
762                let compressed = compress_data(test_data, compression).unwrap();
763                // Compressed data should be smaller than original for repetitive data
764                assert!(
765                    compressed.len() < test_data.len(),
766                    "Compression {compression:?} did not reduce size"
767                );
768            }
769        }
770
771        #[test]
772        fn test_empty_data_compression() {
773            let test_data = b"";
774            let compression_types = vec![
775                CompressionType::None,
776                CompressionType::Gzip,
777                CompressionType::Lz4,
778                CompressionType::Zstd,
779                CompressionType::Snappy,
780                CompressionType::Brotli,
781            ];
782
783            for compression in compression_types {
784                let compressed = compress_data(test_data, compression).unwrap();
785                let decompressed = decompress_data(&compressed, compression).unwrap();
786                assert_eq!(
787                    test_data,
788                    decompressed.as_slice(),
789                    "Failed empty data round-trip for {compression:?}"
790                );
791            }
792        }
793
794        #[test]
795        fn test_large_data_compression() {
796            let test_data = vec![42u8; 10000]; // 10KB of data
797            let compression_types = vec![
798                CompressionType::None,
799                CompressionType::Gzip,
800                CompressionType::Lz4,
801                CompressionType::Zstd,
802                CompressionType::Snappy,
803                CompressionType::Brotli,
804            ];
805
806            for compression in compression_types {
807                let compressed = compress_data(&test_data, compression).unwrap();
808                let decompressed = decompress_data(&compressed, compression).unwrap();
809                assert_eq!(
810                    test_data, decompressed,
811                    "Failed large data round-trip for {compression:?}"
812                );
813            }
814        }
815
816        #[test]
817        fn test_random_data_compression() {
818            use scirs2_core::random::{Random, Rng};
819            let mut random_gen = Random::default();
820            let test_data: Vec<u8> = (0..1000).map(|_| random_gen.random()).collect();
821            let compression_types = vec![
822                CompressionType::None,
823                CompressionType::Gzip,
824                CompressionType::Lz4,
825                CompressionType::Zstd,
826                CompressionType::Snappy,
827                CompressionType::Brotli,
828            ];
829
830            for compression in compression_types {
831                let compressed = compress_data(&test_data, compression).unwrap();
832                let decompressed = decompress_data(&compressed, compression).unwrap();
833                assert_eq!(
834                    test_data, decompressed,
835                    "Failed random data round-trip for {compression:?}"
836                );
837            }
838        }
839    }
840}