use crate::event;
use oxicode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
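/// Strongly typed name of a stream topic.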
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct TopicName(String);
impl TopicName {
pub fn new(name: String) -> Self {
Self(name)
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for TopicName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<&str> for TopicName {
fn from(s: &str) -> Self {
Self(s.to_string())
}
}
impl From<String> for TopicName {
fn from(s: String) -> Self {
Self(s)
}
}
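/// Identifier of a partition within a topic.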
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct PartitionId(u32);
impl PartitionId {
pub fn new(id: u32) -> Self {
Self(id)
}
pub fn value(&self) -> u32 {
self.0
}
}
impl fmt::Display for PartitionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
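/// Position of a record within a partition's log.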
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct Offset(u64);
impl Offset {
pub fn new(offset: u64) -> Self {
Self(offset)
}
pub fn value(&self) -> u64 {
self.0
}
}
impl fmt::Display for Offset {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
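/// Where a consumer starts reading a stream: the beginning, the end, or an
/// explicit offset.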
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub enum StreamPosition {
Beginning,
End,
Offset(u64),
}
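/// Rich, transport-level metadata attached to stream events. An extended
/// companion to [`event::EventMetadata`]; the `From` impls below convert
/// between the two, dropping or defaulting fields without a counterpart.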
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
pub source: String,
pub user: Option<String>,
pub session_id: Option<String>,
pub trace_id: Option<String>,
pub causality_token: Option<String>,
pub version: Option<String>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub operation_context: Option<OperationContext>,
pub priority: EventPriority,
pub partition: Option<PartitionId>,
pub correlation_id: Option<String>,
pub checksum: Option<String>,
pub schema_version: String,
pub tags: HashMap<String, String>,
pub ttl_seconds: Option<u64>,
pub compression: Option<CompressionType>,
pub serialization_format: SerializationFormat,
pub message_size: Option<usize>,
pub processing_hints: ProcessingHints,
}
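// Lossy conversion into the core metadata: a fresh event id is minted from
// the current timestamp, and fields without a core counterpart (tags,
// priority, processing hints, ...) are dropped.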
impl From<EventMetadata> for event::EventMetadata {
fn from(metadata: EventMetadata) -> Self {
Self {
event_id: format!(
"evt_{}",
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
),
timestamp: metadata.timestamp,
source: metadata.source,
user: metadata.user,
context: metadata.operation_context.map(|ctx| ctx.operation_type),
caused_by: metadata.causality_token,
version: metadata.version.unwrap_or(metadata.schema_version),
properties: HashMap::new(),
checksum: metadata.checksum,
}
}
}
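// Widening conversion from the core metadata: fields the core type does not
// carry are filled with defaults (Normal priority, JSON format, schema
// version "1.0").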
impl From<event::EventMetadata> for EventMetadata {
fn from(metadata: event::EventMetadata) -> Self {
Self {
source: metadata.source,
user: metadata.user,
session_id: None,
trace_id: None,
causality_token: metadata.caused_by,
version: Some(metadata.version),
timestamp: metadata.timestamp,
operation_context: metadata.context.map(|ctx| OperationContext {
operation_type: ctx,
request_id: None,
client_info: None,
metrics: None,
auth_context: None,
custom_fields: HashMap::new(),
}),
priority: EventPriority::Normal,
partition: None,
correlation_id: None,
checksum: metadata.checksum,
schema_version: "1.0".to_string(),
tags: metadata.properties,
ttl_seconds: None,
compression: None,
serialization_format: SerializationFormat::Json,
message_size: None,
processing_hints: ProcessingHints::default(),
}
}
}
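/// Context describing the operation that generated an event.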
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationContext {
pub operation_type: String,
pub request_id: Option<String>,
pub client_info: Option<ClientInfo>,
pub metrics: Option<PerformanceMetrics>,
pub auth_context: Option<AuthContext>,
pub custom_fields: HashMap<String, String>,
}
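/// Details about the client application that produced an event.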
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct ClientInfo {
pub application: String,
pub version: String,
pub ip_address: Option<String>,
pub user_agent: Option<String>,
pub location: Option<GeoLocation>,
}
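/// Coarse geographic location of a client.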
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct GeoLocation {
pub country: String,
pub region: Option<String>,
pub city: Option<String>,
pub lat: Option<f64>,
pub lon: Option<f64>,
}
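/// Optional per-event performance measurements (latencies in microseconds,
/// memory in bytes).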
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct PerformanceMetrics {
pub processing_latency_us: Option<u64>,
pub queue_wait_time_us: Option<u64>,
pub serialization_time_us: Option<u64>,
pub network_latency_us: Option<u64>,
pub memory_usage_bytes: Option<u64>,
pub cpu_time_us: Option<u64>,
}
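/// Authentication context of the user on whose behalf an event was emitted.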
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthContext {
pub user_id: String,
pub roles: Vec<String>,
pub permissions: Vec<String>,
pub auth_method: String,
pub token_expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
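/// Relative processing priority of an event; variants are ordered so that
/// `Critical > High > Normal > Low`.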
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Default,
Encode,
Decode,
)]
pub enum EventPriority {
Low = 0,
#[default]
Normal = 1,
High = 2,
Critical = 3,
}
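/// Payload compression codecs supported by the stream layer.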
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Encode, Decode)]
pub enum CompressionType {
#[default]
None,
Gzip,
Lz4,
Zstd,
Snappy,
Brotli,
}
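/// Supported wire formats for (de)serializing event metadata.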
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Encode, Decode)]
pub enum SerializationFormat {
#[default]
Json,
MessagePack,
Protobuf,
Avro,
Cbor,
Bincode,
}
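/// Hints that let producers steer ordering, deduplication, batching,
/// retries, and timeouts downstream.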
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct ProcessingHints {
pub allow_out_of_order: bool,
pub allow_deduplication: bool,
pub batch_preference: BatchPreference,
pub consistency_level: ConsistencyLevel,
pub retry_policy: RetryPolicy,
pub processing_timeout_ms: Option<u64>,
}
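/// How an event may be batched: sent immediately, batched opportunistically,
/// or delivered only as part of a batch.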
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub enum BatchPreference {
Immediate,
Batchable,
RequiredBatch,
}
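/// Delivery consistency requirement: eventual, ordered within a partition,
/// or strong.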
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub enum ConsistencyLevel {
Eventual,
PerPartition,
Strong,
}
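/// Retry configuration for failed deliveries, describing a capped
/// exponential backoff (see the `delay_for_attempt` sketch below for the
/// assumed formula).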
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)]
pub struct RetryPolicy {
pub max_retries: u32,
pub base_delay_ms: u64,
pub max_delay_ms: u64,
pub backoff_multiplier: f64,
pub use_jitter: bool,
}
impl Default for EventMetadata {
fn default() -> Self {
Self {
source: "oxirs-stream".to_string(),
user: None,
session_id: None,
trace_id: None,
causality_token: None,
version: Some("1.0".to_string()),
timestamp: chrono::Utc::now(),
operation_context: None,
priority: EventPriority::Normal,
partition: None,
correlation_id: None,
checksum: None,
schema_version: "1.0".to_string(),
tags: HashMap::new(),
ttl_seconds: None,
compression: None,
serialization_format: SerializationFormat::Json,
message_size: None,
processing_hints: ProcessingHints::default(),
}
}
}
impl Default for ProcessingHints {
fn default() -> Self {
Self {
allow_out_of_order: false,
allow_deduplication: true,
batch_preference: BatchPreference::Batchable,
consistency_level: ConsistencyLevel::PerPartition,
retry_policy: RetryPolicy::default(),
processing_timeout_ms: Some(30000),
}
}
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: 3,
base_delay_ms: 100,
max_delay_ms: 10000,
backoff_multiplier: 2.0,
use_jitter: true,
}
}
}
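// A minimal backoff helper sketch: `delay_for_attempt` is not part of the
// original API and assumes the conventional capped exponential formula
// delay = min(base_delay_ms * backoff_multiplier^attempt, max_delay_ms).
impl RetryPolicy {
/// Capped exponential delay for a 0-based retry attempt. Jitter, when
/// enabled, is left to the caller's RNG.
pub fn delay_for_attempt(&self, attempt: u32) -> u64 {
let delay = self.base_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
(delay as u64).min(self.max_delay_ms)
}
}
/// Serialization, deserialization, and compression helpers for event
/// metadata and payloads.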
pub mod serialization {
use super::*;
use anyhow::{anyhow, Result};
pub fn serialize_metadata(
metadata: &EventMetadata,
format: SerializationFormat,
) -> Result<Vec<u8>> {
match format {
SerializationFormat::Json => {
serde_json::to_vec(metadata).map_err(|e| anyhow!("JSON serialization failed: {e}"))
}
SerializationFormat::MessagePack => rmp_serde::to_vec(metadata)
.map_err(|e| anyhow!("MessagePack serialization failed: {e}")),
SerializationFormat::Cbor => {
let mut buf = Vec::new();
ciborium::ser::into_writer(metadata, &mut buf)
.map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
Ok(buf)
}
SerializationFormat::Bincode => {
oxicode::serde::encode_to_vec(metadata, oxicode::config::standard())
.map_err(|e| anyhow!("Bincode serialization failed: {e}"))
}
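// Protobuf and Avro need external schema definitions that are not wired
// in here, so JSON is used as the fallback encoding for both.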
SerializationFormat::Protobuf | SerializationFormat::Avro => {
serde_json::to_vec(metadata)
.map_err(|e| anyhow!("Protobuf/Avro serialization fallback failed: {e}"))
}
}
}
pub fn deserialize_metadata(data: &[u8], format: SerializationFormat) -> Result<EventMetadata> {
match format {
SerializationFormat::Json => serde_json::from_slice(data)
.map_err(|e| anyhow!("JSON deserialization failed: {e}")),
SerializationFormat::MessagePack => rmp_serde::from_slice(data)
.map_err(|e| anyhow!("MessagePack deserialization failed: {e}")),
SerializationFormat::Cbor => ciborium::de::from_reader(data)
.map_err(|e| anyhow!("CBOR deserialization failed: {e}")),
SerializationFormat::Bincode => {
oxicode::serde::decode_from_slice(data, oxicode::config::standard())
.map(|(v, _)| v)
.map_err(|e| anyhow!("Bincode deserialization failed: {e}"))
}
SerializationFormat::Protobuf | SerializationFormat::Avro => {
serde_json::from_slice(data)
.map_err(|e| anyhow!("Protobuf/Avro deserialization fallback failed: {e}"))
}
}
}
pub fn compress_data(data: &[u8], compression: CompressionType) -> Result<Vec<u8>> {
match compression {
CompressionType::None => Ok(data.to_vec()),
CompressionType::Gzip => {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(data)?;
Ok(encoder.finish()?)
}
CompressionType::Lz4 => {
oxiarc_lz4::compress(data).map_err(|e| anyhow!("LZ4 compression failed: {e}"))
}
CompressionType::Zstd => {
oxiarc_zstd::compress(data).map_err(|e| anyhow!("Zstd compression failed: {e}"))
}
CompressionType::Snappy => Ok(snap::raw::Encoder::new().compress_vec(data)?),
CompressionType::Brotli => {
use brotli::CompressorWriter;
use std::io::Write;
let mut compressed = Vec::new();
{
let mut compressor = CompressorWriter::new(&mut compressed, 4096, 6, 22);
compressor.write_all(data)?;
}
Ok(compressed)
}
}
}
pub fn decompress_data(data: &[u8], compression: CompressionType) -> Result<Vec<u8>> {
match compression {
CompressionType::None => Ok(data.to_vec()),
CompressionType::Gzip => {
use flate2::read::GzDecoder;
use std::io::Read;
let mut decoder = GzDecoder::new(data);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
CompressionType::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
.map_err(|e| anyhow!("LZ4 decompression failed: {e}")),
CompressionType::Zstd => {
oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
}
CompressionType::Snappy => snap::raw::Decoder::new()
.decompress_vec(data)
.map_err(|e| anyhow!("Snappy decompression failed: {e}")),
CompressionType::Brotli => {
use std::io::Read;
let mut decompressed = Vec::new();
let mut decompressor = brotli::Decompressor::new(data, 4096);
decompressor.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
}
}
}
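/// Event processing: deduplication, metadata enrichment, and batching.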
pub mod processing {
use super::*;
use std::time::{Duration, Instant};
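/// Stateful processor that deduplicates events by correlation id, enriches
/// their metadata, and buffers batchable events until a size or time
/// threshold is reached.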
pub struct EventProcessor {
pub deduplication_cache: std::collections::HashSet<String>,
pub batch_buffer: Vec<(crate::event::StreamEvent, EventMetadata)>,
pub last_flush: Instant,
pub flush_interval: Duration,
}
impl Default for EventProcessor {
fn default() -> Self {
Self::new()
}
}
impl EventProcessor {
pub fn new() -> Self {
Self {
deduplication_cache: std::collections::HashSet::new(),
batch_buffer: Vec::new(),
last_flush: Instant::now(),
flush_interval: Duration::from_millis(100),
}
}
pub fn process_event(
&mut self,
mut event: crate::event::StreamEvent,
) -> anyhow::Result<Option<crate::event::StreamEvent>> {
let metadata = self.extract_metadata(&event)?;
let enhanced_metadata = self.enhance_metadata(metadata)?;
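// Deduplicate on correlation id. Note: the cache grows without bound;
// long-running processors would need eviction (e.g. TTL or LRU).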
if enhanced_metadata.processing_hints.allow_deduplication {
if let Some(correlation_id) = &enhanced_metadata.correlation_id {
if self.deduplication_cache.contains(correlation_id) {
return Ok(None);
}
self.deduplication_cache.insert(correlation_id.clone());
}
}
self.update_event_metadata(&mut event, enhanced_metadata)?;
match self.get_batch_preference(&event) {
BatchPreference::Immediate => Ok(Some(event)),
BatchPreference::Batchable | BatchPreference::RequiredBatch => {
self.add_to_batch(event);
if self.should_flush_batch() {
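// Only the most recent batched event is surfaced on flush; draining
// the full buffer is left to a higher-level flush path.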
Ok(self.batch_buffer.last().map(|(e, _)| e.clone()))
} else {
Ok(None)
}
}
}
}
fn extract_metadata(
&self,
event: &crate::event::StreamEvent,
) -> anyhow::Result<EventMetadata> {
match event {
crate::event::StreamEvent::TripleAdded { metadata, .. } => {
Ok(metadata.clone().into())
}
crate::event::StreamEvent::TripleRemoved { metadata, .. } => {
Ok(metadata.clone().into())
}
crate::event::StreamEvent::GraphCreated { metadata, .. } => {
Ok(metadata.clone().into())
}
crate::event::StreamEvent::SparqlUpdate { metadata, .. } => {
Ok(metadata.clone().into())
}
crate::event::StreamEvent::TransactionBegin { metadata, .. } => {
Ok(metadata.clone().into())
}
crate::event::StreamEvent::Heartbeat { metadata, .. } => {
Ok(metadata.clone().into())
}
_ => Ok(EventMetadata::default()),
}
}
fn enhance_metadata(&self, mut metadata: EventMetadata) -> anyhow::Result<EventMetadata> {
if metadata.timestamp == chrono::DateTime::<chrono::Utc>::MIN_UTC {
metadata.timestamp = chrono::Utc::now();
}
if metadata.correlation_id.is_none() {
metadata.correlation_id = Some(uuid::Uuid::new_v4().to_string());
}
if metadata.schema_version.is_empty() {
metadata.schema_version = "1.0".to_string();
}
if metadata.operation_context.is_none() {
metadata.operation_context = Some(OperationContext {
operation_type: "stream_event".to_string(),
request_id: Some(uuid::Uuid::new_v4().to_string()),
client_info: None,
metrics: Some(PerformanceMetrics {
processing_latency_us: Some(0),
queue_wait_time_us: Some(0),
serialization_time_us: Some(0),
network_latency_us: Some(0),
memory_usage_bytes: Some(0),
cpu_time_us: Some(0),
}),
auth_context: None,
custom_fields: HashMap::new(),
});
}
Ok(metadata)
}
fn update_event_metadata(
&self,
event: &mut crate::event::StreamEvent,
metadata: EventMetadata,
) -> anyhow::Result<()> {
let event_metadata = event::EventMetadata::from(metadata);
match event {
crate::event::StreamEvent::TripleAdded { metadata: m, .. } => *m = event_metadata,
crate::event::StreamEvent::TripleRemoved { metadata: m, .. } => *m = event_metadata,
crate::event::StreamEvent::GraphCreated { metadata: m, .. } => *m = event_metadata,
crate::event::StreamEvent::SparqlUpdate { metadata: m, .. } => *m = event_metadata,
crate::event::StreamEvent::TransactionBegin { metadata: m, .. } => {
*m = event_metadata
}
crate::event::StreamEvent::Heartbeat { metadata: m, .. } => *m = event_metadata,
_ => {}
}
Ok(())
}
fn get_batch_preference(&self, event: &crate::event::StreamEvent) -> BatchPreference {
match event {
crate::event::StreamEvent::Heartbeat { .. } => BatchPreference::Immediate,
crate::event::StreamEvent::TransactionBegin { .. } => BatchPreference::Immediate,
crate::event::StreamEvent::TransactionCommit { .. } => BatchPreference::Immediate,
crate::event::StreamEvent::TransactionAbort { .. } => BatchPreference::Immediate,
_ => BatchPreference::Batchable,
}
}
fn add_to_batch(&mut self, event: crate::event::StreamEvent) {
let metadata = self.extract_metadata(&event).unwrap_or_default();
self.batch_buffer.push((event, metadata));
}
fn should_flush_batch(&self) -> bool {
self.batch_buffer.len() >= 100 || self.last_flush.elapsed() >= self.flush_interval
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::serialization::{compress_data, decompress_data};
#[test]
fn test_compression_round_trip() {
let test_data = b"Hello, World! This is a test message for compression.";
let compression_types = vec![
CompressionType::None,
CompressionType::Gzip,
CompressionType::Lz4,
CompressionType::Zstd,
CompressionType::Snappy,
CompressionType::Brotli,
];
for compression in compression_types {
let compressed = compress_data(test_data, compression).unwrap();
let decompressed = decompress_data(&compressed, compression).unwrap();
assert_eq!(
test_data,
decompressed.as_slice(),
"Failed round-trip for {compression:?}"
);
}
}
#[test]
fn test_compression_effectiveness() {
let test_data = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; let compression_types = vec![
CompressionType::Gzip,
CompressionType::Lz4,
CompressionType::Zstd,
CompressionType::Snappy,
CompressionType::Brotli,
];
for compression in compression_types {
let compressed = compress_data(test_data, compression).unwrap();
assert!(
compressed.len() < test_data.len(),
"Compression {compression:?} did not reduce size"
);
}
}
#[test]
fn test_empty_data_compression() {
let test_data = b"";
let compression_types = vec![
CompressionType::None,
CompressionType::Gzip,
CompressionType::Lz4,
CompressionType::Zstd,
CompressionType::Snappy,
CompressionType::Brotli,
];
for compression in compression_types {
let compressed = compress_data(test_data, compression).unwrap();
let decompressed = decompress_data(&compressed, compression).unwrap();
assert_eq!(
test_data,
decompressed.as_slice(),
"Failed empty data round-trip for {compression:?}"
);
}
}
#[test]
fn test_large_data_compression() {
let test_data = vec![42u8; 10000];
let compression_types = vec![
CompressionType::None,
CompressionType::Gzip,
CompressionType::Lz4,
CompressionType::Zstd,
CompressionType::Snappy,
CompressionType::Brotli,
];
for compression in compression_types {
let compressed = compress_data(&test_data, compression).unwrap();
let decompressed = decompress_data(&compressed, compression).unwrap();
assert_eq!(
test_data, decompressed,
"Failed large data round-trip for {compression:?}"
);
}
}
#[test]
fn test_random_data_compression() {
use scirs2_core::random::Random;
use scirs2_core::RngExt;
let mut random_gen = Random::default();
let test_data: Vec<u8> = (0..1000).map(|_| random_gen.random()).collect();
let compression_types = vec![
CompressionType::None,
CompressionType::Gzip,
CompressionType::Lz4,
CompressionType::Zstd,
CompressionType::Snappy,
CompressionType::Brotli,
];
for compression in compression_types {
let compressed = compress_data(&test_data, compression).unwrap();
let decompressed = decompress_data(&compressed, compression).unwrap();
assert_eq!(
test_data, decompressed,
"Failed random data round-trip for {compression:?}"
);
}
}
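// A minimal round-trip sketch for metadata (de)serialization: field-level
// asserts are used because EventMetadata does not derive PartialEq, and
// only the self-describing formats are exercised.
#[test]
fn test_metadata_serialization_round_trip() {
use crate::types::serialization::{deserialize_metadata, serialize_metadata};
let metadata = EventMetadata::default();
for format in [
SerializationFormat::Json,
SerializationFormat::MessagePack,
SerializationFormat::Cbor,
] {
let bytes = serialize_metadata(&metadata, format).unwrap();
let decoded = deserialize_metadata(&bytes, format).unwrap();
assert_eq!(
metadata.source, decoded.source,
"source mismatch for {format:?}"
);
assert_eq!(metadata.priority, decoded.priority);
assert_eq!(metadata.schema_version, decoded.schema_version);
}
}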
}
}