use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::backend::{self, StreamBackend};
use crate::circuit_breaker::{self, SharedCircuitBreakerExt};
use crate::event::{EventMetadata, StreamEvent};
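/// Top-level configuration shared by [`StreamProducer`] and
/// [`StreamConsumer`]: backend selection, topic, batching and flush tuning,
/// retries, circuit breaking, security, performance, and monitoring.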
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
pub backend: StreamBackendType,
pub topic: String,
pub batch_size: usize,
pub flush_interval_ms: u64,
pub max_connections: usize,
pub connection_timeout: Duration,
pub enable_compression: bool,
pub compression_type: CompressionType,
pub retry_config: RetryConfig,
pub circuit_breaker: CircuitBreakerConfig,
pub security: SecurityConfig,
pub performance: StreamPerformanceConfig,
pub monitoring: MonitoringConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
None,
Gzip,
Snappy,
Lz4,
Zstd,
}
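/// Retry policy for failed stream operations. Delays are intended to grow
/// from `initial_backoff` by `backoff_multiplier` per attempt, capped at
/// `max_backoff`, with optional random jitter; the exact schedule is applied
/// by the backend retry loop.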
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
pub max_retries: u32,
pub initial_backoff: Duration,
pub max_backoff: Duration,
pub backoff_multiplier: f64,
pub jitter: bool,
}
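/// Circuit-breaker thresholds: the breaker opens once `failure_threshold`
/// failures are recorded, waits `timeout`, then admits up to
/// `half_open_max_calls` probe calls and closes again after
/// `success_threshold` successes (the exact accounting lives in the
/// `circuit_breaker` module).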
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
pub enabled: bool,
pub failure_threshold: u32,
pub success_threshold: u32,
pub timeout: Duration,
pub half_open_max_calls: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
pub enable_tls: bool,
pub verify_certificates: bool,
pub client_cert_path: Option<String>,
pub client_key_path: Option<String>,
pub ca_cert_path: Option<String>,
pub sasl_config: Option<SaslConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaslConfig {
pub mechanism: SaslMechanism,
pub username: String,
pub password: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SaslMechanism {
Plain,
ScramSha256,
ScramSha512,
OAuthBearer,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamPerformanceConfig {
pub enable_batching: bool,
pub enable_pipelining: bool,
pub buffer_size: usize,
pub prefetch_count: u32,
pub enable_zero_copy: bool,
pub enable_simd: bool,
pub parallel_processing: bool,
pub worker_threads: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
pub enable_metrics: bool,
pub enable_tracing: bool,
pub metrics_interval: Duration,
pub health_check_interval: Duration,
pub enable_profiling: bool,
pub prometheus_endpoint: Option<String>,
pub jaeger_endpoint: Option<String>,
pub log_level: String,
}
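/// Selects the underlying message broker. Every variant except `Memory` is
/// feature-gated, so the available backends depend on the enabled Cargo
/// features.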
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamBackendType {
#[cfg(feature = "kafka")]
Kafka {
brokers: Vec<String>,
security_protocol: Option<String>,
sasl_config: Option<SaslConfig>,
},
#[cfg(feature = "nats")]
Nats {
url: String,
cluster_urls: Option<Vec<String>>,
jetstream_config: Option<NatsJetStreamConfig>,
},
#[cfg(feature = "redis")]
Redis {
url: String,
cluster_urls: Option<Vec<String>>,
pool_size: Option<usize>,
},
#[cfg(feature = "kinesis")]
Kinesis {
region: String,
stream_name: String,
credentials: Option<AwsCredentials>,
},
#[cfg(feature = "pulsar")]
Pulsar {
service_url: String,
auth_config: Option<PulsarAuthConfig>,
},
#[cfg(feature = "rabbitmq")]
RabbitMQ {
url: String,
exchange: Option<String>,
queue: Option<String>,
},
Memory {
max_size: Option<usize>,
persistence: bool,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsJetStreamConfig {
pub domain: Option<String>,
pub api_prefix: Option<String>,
pub timeout: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsCredentials {
pub access_key_id: String,
pub secret_access_key: String,
pub session_token: Option<String>,
pub role_arn: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PulsarAuthConfig {
pub auth_method: PulsarAuthMethod,
pub auth_params: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PulsarAuthMethod {
Token,
Jwt,
Oauth2,
Tls,
}
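/// Publishes [`StreamEvent`]s to the configured backend with optional
/// client-side batching, circuit breaking, and latency statistics. A minimal
/// sketch over the in-memory backend (`ignore`d: `event` stands in for any
/// [`StreamEvent`] value):
///
/// ```ignore
/// let mut producer = StreamProducer::new(StreamConfig::memory()).await?;
/// producer.publish(event).await?;
/// producer.flush().await?;
/// ```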
pub struct StreamProducer {
config: StreamConfig,
backend_producer: BackendProducer,
stats: Arc<RwLock<ProducerStats>>,
circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
last_flush: Instant,
_pending_events: Arc<RwLock<Vec<StreamEvent>>>,
batch_buffer: Arc<RwLock<Vec<StreamEvent>>>,
flush_semaphore: Arc<Semaphore>,
}
enum BackendProducer {
#[cfg(feature = "kafka")]
Kafka(backend::kafka::KafkaProducer),
#[cfg(feature = "nats")]
Nats(Box<backend::nats::NatsProducer>),
#[cfg(feature = "redis")]
Redis(backend::redis::RedisProducer),
#[cfg(feature = "kinesis")]
Kinesis(backend::kinesis::KinesisProducer),
#[cfg(feature = "pulsar")]
Pulsar(Box<backend::pulsar::PulsarProducer>),
#[cfg(feature = "rabbitmq")]
RabbitMQ(Box<backend::rabbitmq::RabbitMQProducer>),
Memory(MemoryProducer),
}
#[derive(Debug, Default, Clone)]
pub struct ProducerStats {
events_published: u64,
events_failed: u64,
_bytes_sent: u64,
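/// Smoothed latency: an equal-weight blend of the previous value and the
/// newest sample, not a true mean over all events.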
avg_latency_ms: f64,
max_latency_ms: u64,
batch_count: u64,
flush_count: u64,
circuit_breaker_trips: u64,
last_publish: Option<DateTime<Utc>>,
backend_type: String,
}
type MemoryEventVec = Vec<(DateTime<Utc>, StreamEvent)>;
type MemoryEventStore = Arc<RwLock<MemoryEventVec>>;
static MEMORY_EVENTS: std::sync::OnceLock<MemoryEventStore> = std::sync::OnceLock::new();
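/// Returns the lazily initialized, process-wide in-memory event buffer.
/// Primarily useful for tests and local development with the `Memory`
/// backend.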
pub fn get_memory_events() -> MemoryEventStore {
MEMORY_EVENTS
.get_or_init(|| Arc::new(RwLock::new(Vec::new())))
.clone()
}
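/// Clears both the process-wide event buffer and the `backend::memory`
/// storage, giving tests a clean slate.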
pub async fn clear_memory_events() {
let events = get_memory_events();
events.write().await.clear();
backend::memory::clear_memory_storage().await;
}
struct MemoryProducer {
backend: Box<dyn StreamBackend + Send + Sync>,
topic: String,
stats: ProducerStats,
}
impl MemoryProducer {
fn _new(_max_size: Option<usize>, _persistence: bool) -> Self {
Self {
backend: Box::new(backend::memory::MemoryBackend::new()),
topic: "oxirs-stream".to_string(), stats: ProducerStats {
backend_type: "memory".to_string(),
..Default::default()
},
}
}
fn with_topic(topic: String) -> Self {
Self {
backend: Box::new(backend::memory::MemoryBackend::new()),
topic,
stats: ProducerStats {
backend_type: "memory".to_string(),
..Default::default()
},
}
}
async fn publish(&mut self, event: StreamEvent) -> Result<()> {
let start_time = Instant::now();
self.backend
.as_mut()
.connect()
.await
.map_err(|e| anyhow!("Backend connect failed: {}", e))?;
let topic_name = crate::types::TopicName::new(self.topic.clone());
self.backend
.create_topic(&topic_name, 1)
.await
.map_err(|e| anyhow!("Topic creation failed: {}", e))?;
self.backend
.send_event(&topic_name, event)
.await
.map_err(|e| anyhow!("Send event failed: {}", e))?;
self.stats.events_published += 1;
let latency = start_time.elapsed().as_millis() as u64;
self.stats.max_latency_ms = self.stats.max_latency_ms.max(latency);
self.stats.avg_latency_ms = (self.stats.avg_latency_ms + latency as f64) / 2.0;
self.stats.last_publish = Some(Utc::now());
debug!("Memory producer: published event via backend");
Ok(())
}
async fn flush(&mut self) -> Result<()> {
self.stats.flush_count += 1;
debug!("Memory producer: flush completed");
Ok(())
}
fn _get_stats(&self) -> &ProducerStats {
&self.stats
}
}
impl StreamProducer {
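/// Builds a producer for the backend selected in `config`, connecting
/// eagerly and wiring up a circuit breaker when one is enabled. Returns an
/// error if the backend connection cannot be established.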
pub async fn new(config: StreamConfig) -> Result<Self> {
let circuit_breaker = if config.circuit_breaker.enabled {
Some(circuit_breaker::new_shared_circuit_breaker(
circuit_breaker::CircuitBreakerConfig {
enabled: config.circuit_breaker.enabled,
failure_threshold: config.circuit_breaker.failure_threshold,
success_threshold: config.circuit_breaker.success_threshold,
timeout: config.circuit_breaker.timeout,
half_open_max_calls: config.circuit_breaker.half_open_max_calls,
..Default::default()
},
))
} else {
None
};
let backend_producer = match &config.backend {
#[cfg(feature = "kafka")]
StreamBackendType::Kafka {
brokers,
security_protocol,
sasl_config,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Kafka {
brokers: brokers.clone(),
security_protocol: security_protocol.clone(),
sasl_config: sasl_config.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut producer = backend::kafka::KafkaProducer::new(stream_config)?;
producer.connect().await?;
BackendProducer::Kafka(producer)
}
#[cfg(feature = "nats")]
StreamBackendType::Nats {
url,
cluster_urls,
jetstream_config,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Nats {
url: url.clone(),
cluster_urls: cluster_urls.clone(),
jetstream_config: jetstream_config.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut producer = backend::nats::NatsProducer::new(stream_config)?;
producer.connect().await?;
BackendProducer::Nats(Box::new(producer))
}
#[cfg(feature = "redis")]
StreamBackendType::Redis {
url,
cluster_urls,
pool_size,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Redis {
url: url.clone(),
cluster_urls: cluster_urls.clone(),
pool_size: *pool_size,
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut producer = backend::redis::RedisProducer::new(stream_config)?;
producer.connect().await?;
BackendProducer::Redis(producer)
}
#[cfg(feature = "kinesis")]
StreamBackendType::Kinesis {
region,
stream_name,
credentials,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Kinesis {
region: region.clone(),
stream_name: stream_name.clone(),
credentials: credentials.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut producer = backend::kinesis::KinesisProducer::new(stream_config)?;
producer.connect().await?;
BackendProducer::Kinesis(producer)
}
#[cfg(feature = "pulsar")]
StreamBackendType::Pulsar {
service_url,
auth_config,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Pulsar {
service_url: service_url.clone(),
auth_config: auth_config.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut producer = backend::pulsar::PulsarProducer::new(stream_config)?;
producer.connect().await?;
BackendProducer::Pulsar(Box::new(producer))
}
#[cfg(feature = "rabbitmq")]
StreamBackendType::RabbitMQ {
url,
exchange,
queue,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::RabbitMQ {
url: url.clone(),
exchange: exchange.clone(),
queue: queue.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut producer = backend::rabbitmq::RabbitMQProducer::new(stream_config)?;
producer.connect().await?;
BackendProducer::RabbitMQ(Box::new(producer))
}
StreamBackendType::Memory {
max_size: _,
persistence: _,
} => BackendProducer::Memory(MemoryProducer::with_topic(config.topic.clone())),
};
let stats = Arc::new(RwLock::new(ProducerStats {
backend_type: match backend_producer {
#[cfg(feature = "kafka")]
BackendProducer::Kafka(_) => "kafka".to_string(),
#[cfg(feature = "nats")]
BackendProducer::Nats(_) => "nats".to_string(),
#[cfg(feature = "redis")]
BackendProducer::Redis(_) => "redis".to_string(),
#[cfg(feature = "kinesis")]
BackendProducer::Kinesis(_) => "kinesis".to_string(),
#[cfg(feature = "pulsar")]
BackendProducer::Pulsar(_) => "pulsar".to_string(),
#[cfg(feature = "rabbitmq")]
BackendProducer::RabbitMQ(_) => "rabbitmq".to_string(),
BackendProducer::Memory(_) => "memory".to_string(),
},
..Default::default()
}));
info!(
"Created stream producer with backend: {}",
stats.read().await.backend_type
);
Ok(Self {
config,
backend_producer,
stats,
circuit_breaker,
last_flush: Instant::now(),
_pending_events: Arc::new(RwLock::new(Vec::new())),
batch_buffer: Arc::new(RwLock::new(Vec::new())),
flush_semaphore: Arc::new(Semaphore::new(1)),
})
}
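/// Publishes a single event. With `performance.enable_batching` set, the
/// event is buffered and only sent once the buffer reaches `batch_size`;
/// call [`flush`](Self::flush) to push out a partial batch. A minimal sketch
/// of the batched pattern (`ignore`d: assumes an already-constructed
/// `producer` and an `events` iterator):
///
/// ```ignore
/// for event in events {
///     producer.publish(event).await?; // buffered; may not hit the wire yet
/// }
/// producer.flush().await?; // deliver any partial batch
/// ```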
pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
let start_time = Instant::now();
if let Some(cb) = &self.circuit_breaker {
if !cb.can_execute().await {
self.stats.write().await.circuit_breaker_trips += 1;
return Err(anyhow!("Circuit breaker is open - cannot publish events"));
}
}
if self.config.performance.enable_batching {
let mut batch_buffer = self.batch_buffer.write().await;
batch_buffer.push(event);
if batch_buffer.len() >= self.config.batch_size {
let events = std::mem::take(&mut *batch_buffer);
drop(batch_buffer);
return self.publish_batch_internal(events).await;
}
return Ok(());
}
let result = self.publish_single_event(event).await;
match &result {
Ok(_) => {
if let Some(cb) = &self.circuit_breaker {
cb.record_success_with_duration(start_time.elapsed()).await;
}
let mut stats = self.stats.write().await;
stats.events_published += 1;
let latency = start_time.elapsed().as_millis() as u64;
stats.max_latency_ms = stats.max_latency_ms.max(latency);
stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
stats.last_publish = Some(Utc::now());
}
Err(_) => {
if let Some(cb) = &self.circuit_breaker {
cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
.await;
}
self.stats.write().await.events_failed += 1;
}
}
result
}
async fn publish_single_event(&mut self, event: StreamEvent) -> Result<()> {
match &mut self.backend_producer {
#[cfg(feature = "kafka")]
BackendProducer::Kafka(producer) => producer.publish(event).await,
#[cfg(feature = "nats")]
BackendProducer::Nats(producer) => producer.publish(event).await,
#[cfg(feature = "redis")]
BackendProducer::Redis(producer) => producer.publish(event).await,
#[cfg(feature = "kinesis")]
BackendProducer::Kinesis(producer) => producer.publish(event).await,
#[cfg(feature = "pulsar")]
BackendProducer::Pulsar(producer) => producer.publish(event).await,
#[cfg(feature = "rabbitmq")]
BackendProducer::RabbitMQ(producer) => producer.publish(event).await,
BackendProducer::Memory(producer) => producer.publish(event).await,
}
}
pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
if events.is_empty() {
return Ok(());
}
self.publish_batch_internal(events).await
}
async fn publish_batch_internal(&mut self, events: Vec<StreamEvent>) -> Result<()> {
let start_time = Instant::now();
let event_count = events.len();
let result = match &mut self.backend_producer {
#[cfg(feature = "kafka")]
BackendProducer::Kafka(producer) => producer.publish_batch(events).await,
#[cfg(feature = "nats")]
BackendProducer::Nats(producer) => producer.publish_batch(events).await,
#[cfg(feature = "redis")]
BackendProducer::Redis(producer) => producer.publish_batch(events).await,
#[cfg(feature = "kinesis")]
BackendProducer::Kinesis(producer) => producer.publish_batch(events).await,
#[cfg(feature = "pulsar")]
BackendProducer::Pulsar(producer) => producer.publish_batch(events).await,
#[cfg(feature = "rabbitmq")]
BackendProducer::RabbitMQ(producer) => producer.publish_batch(events).await,
BackendProducer::Memory(producer) => {
for event in events {
producer.publish(event).await?;
}
Ok(())
}
};
let mut stats = self.stats.write().await;
match &result {
Ok(_) => {
stats.events_published += event_count as u64;
stats.batch_count += 1;
let latency = start_time.elapsed().as_millis() as u64;
stats.max_latency_ms = stats.max_latency_ms.max(latency);
stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
stats.last_publish = Some(Utc::now());
}
Err(_) => {
stats.events_failed += event_count as u64;
}
}
debug!(
"Published batch of {} events in {:?}",
event_count,
start_time.elapsed()
);
result
}
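/// Drains any buffered batch, then flushes the backend producer. A
/// single-permit semaphore ensures only one flush runs at a time.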
pub async fn flush(&mut self) -> Result<()> {
let _permit = self
.flush_semaphore
.acquire()
.await
.map_err(|_| anyhow!("Failed to acquire flush semaphore"))?;
let start_time = Instant::now();
if self.config.performance.enable_batching {
let events = {
let mut batch_buffer = self.batch_buffer.write().await;
if !batch_buffer.is_empty() {
std::mem::take(&mut *batch_buffer)
} else {
Vec::new()
}
};
if !events.is_empty() {
// Hold the permit for the whole flush so concurrent flushes cannot
// interleave; publish_batch_internal never touches the semaphore, so
// holding it here cannot deadlock.
self.publish_batch_internal(events).await?;
}
}
let result = match &mut self.backend_producer {
#[cfg(feature = "kafka")]
BackendProducer::Kafka(producer) => producer.flush().await,
#[cfg(feature = "nats")]
BackendProducer::Nats(producer) => producer.flush().await,
#[cfg(feature = "redis")]
BackendProducer::Redis(producer) => producer.flush().await,
#[cfg(feature = "kinesis")]
BackendProducer::Kinesis(producer) => producer.flush().await,
#[cfg(feature = "pulsar")]
BackendProducer::Pulsar(producer) => producer.flush().await,
#[cfg(feature = "rabbitmq")]
BackendProducer::RabbitMQ(producer) => producer.flush().await,
BackendProducer::Memory(producer) => producer.flush().await,
};
if result.is_ok() {
self.stats.write().await.flush_count += 1;
self.last_flush = Instant::now();
debug!("Flushed producer buffers in {:?}", start_time.elapsed());
}
result
}
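/// Maps an [`RdfPatch`] onto stream events (triple add/remove, graph
/// create/delete, transaction begin/commit/abort) and publishes them as a
/// single batch; prefix and header operations are skipped since they carry
/// no graph data. A minimal sketch (`ignore`d: assumes a connected
/// `producer`):
///
/// ```ignore
/// let mut patch = RdfPatch::new();
/// patch.add_operation(PatchOperation::Add {
///     subject: "http://example.org/s".to_string(),
///     predicate: "http://example.org/p".to_string(),
///     object: "\"o\"".to_string(),
/// });
/// producer.publish_patch(&patch).await?;
/// ```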
pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
let events: Vec<StreamEvent> = patch
.operations
.iter()
.filter_map(|op| {
let metadata = EventMetadata {
event_id: Uuid::new_v4().to_string(),
timestamp: patch.timestamp,
source: "rdf_patch".to_string(),
user: None,
context: Some(patch.id.clone()),
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
};
match op {
PatchOperation::Add {
subject,
predicate,
object,
} => Some(StreamEvent::TripleAdded {
subject: subject.clone(),
predicate: predicate.clone(),
object: object.clone(),
graph: None,
metadata,
}),
PatchOperation::Delete {
subject,
predicate,
object,
} => Some(StreamEvent::TripleRemoved {
subject: subject.clone(),
predicate: predicate.clone(),
object: object.clone(),
graph: None,
metadata,
}),
PatchOperation::AddGraph { graph } => Some(StreamEvent::GraphCreated {
graph: graph.clone(),
metadata,
}),
PatchOperation::DeleteGraph { graph } => Some(StreamEvent::GraphDeleted {
graph: graph.clone(),
metadata,
}),
// Prefix operations only affect serialization; they do not map to
// stream events.
PatchOperation::AddPrefix { .. } => None,
PatchOperation::DeletePrefix { .. } => None,
PatchOperation::TransactionBegin { .. } => {
Some(StreamEvent::TransactionBegin {
transaction_id: patch
.transaction_id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string()),
isolation_level: None,
metadata,
})
}
PatchOperation::TransactionCommit => Some(StreamEvent::TransactionCommit {
transaction_id: patch
.transaction_id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string()),
metadata,
}),
PatchOperation::TransactionAbort => Some(StreamEvent::TransactionAbort {
transaction_id: patch
.transaction_id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string()),
metadata,
}),
// Patch headers are metadata only; no event is emitted.
PatchOperation::Header { .. } => None,
}
})
.collect();
self.publish_batch(events).await
}
pub async fn get_stats(&self) -> ProducerStats {
self.stats.read().await.clone()
}
pub async fn health_check(&self) -> bool {
if let Some(cb) = &self.circuit_breaker {
cb.is_healthy().await
} else {
true
}
}
}
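/// Consumes [`StreamEvent`]s from the configured backend, with optional
/// consumer-group membership, circuit breaking, and processing-time
/// statistics.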
pub struct StreamConsumer {
_config: StreamConfig,
backend_consumer: BackendConsumer,
stats: Arc<RwLock<ConsumerStats>>,
circuit_breaker: Option<circuit_breaker::SharedCircuitBreaker>,
last_poll: Instant,
_message_buffer: Arc<RwLock<Vec<StreamEvent>>>,
consumer_group: Option<String>,
}
enum BackendConsumer {
#[cfg(feature = "kafka")]
Kafka(Box<backend::kafka::KafkaConsumer>),
#[cfg(feature = "nats")]
Nats(Box<backend::nats::NatsConsumer>),
#[cfg(feature = "redis")]
Redis(Box<backend::redis::RedisConsumer>),
#[cfg(feature = "kinesis")]
Kinesis(Box<backend::kinesis::KinesisConsumer>),
#[cfg(feature = "pulsar")]
Pulsar(Box<backend::pulsar::PulsarConsumer>),
#[cfg(feature = "rabbitmq")]
RabbitMQ(Box<backend::rabbitmq::RabbitMQConsumer>),
Memory(Box<MemoryConsumer>),
}
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
events_consumed: u64,
events_failed: u64,
_bytes_received: u64,
avg_processing_time_ms: f64,
max_processing_time_ms: u64,
_consumer_lag: u64,
circuit_breaker_trips: u64,
last_message: Option<DateTime<Utc>>,
backend_type: String,
_batch_size: usize,
}
struct MemoryConsumer {
backend: Box<dyn StreamBackend + Send + Sync>,
topic: String,
current_offset: u64,
stats: ConsumerStats,
}
impl MemoryConsumer {
fn _new() -> Self {
Self {
backend: Box::new(backend::memory::MemoryBackend::new()),
topic: "oxirs-stream".to_string(),
current_offset: 0,
stats: ConsumerStats {
backend_type: "memory".to_string(),
..Default::default()
},
}
}
fn with_topic(topic: String) -> Self {
Self {
backend: Box::new(backend::memory::MemoryBackend::new()),
topic,
current_offset: 0,
stats: ConsumerStats {
backend_type: "memory".to_string(),
..Default::default()
},
}
}
async fn consume(&mut self) -> Result<Option<StreamEvent>> {
let start_time = Instant::now();
self.backend
.as_mut()
.connect()
.await
.map_err(|e| anyhow!("Backend connect failed: {}", e))?;
let topic_name = crate::types::TopicName::new(self.topic.clone());
let events = self
.backend
.receive_events(
&topic_name,
None,
crate::types::StreamPosition::Offset(self.current_offset),
1,
)
.await
.map_err(|e| anyhow!("Receive events failed: {}", e))?;
if let Some((event, offset)) = events.first() {
self.current_offset = offset.value() + 1;
self.stats.events_consumed += 1;
let processing_time = start_time.elapsed().as_millis() as u64;
self.stats.max_processing_time_ms =
self.stats.max_processing_time_ms.max(processing_time);
self.stats.avg_processing_time_ms =
(self.stats.avg_processing_time_ms + processing_time as f64) / 2.0;
self.stats.last_message = Some(Utc::now());
debug!("Memory consumer: consumed event via backend");
Ok(Some(event.clone()))
} else {
debug!("Memory consumer: no events available");
Ok(None)
}
}
fn _get_stats(&self) -> &ConsumerStats {
&self.stats
}
fn reset(&mut self) {
self.current_offset = 0;
}
}
impl StreamConsumer {
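/// Builds a consumer with no consumer group; shorthand for
/// [`new_with_group`](Self::new_with_group) with `None`.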
pub async fn new(config: StreamConfig) -> Result<Self> {
Self::new_with_group(config, None).await
}
pub async fn new_with_group(
config: StreamConfig,
consumer_group: Option<String>,
) -> Result<Self> {
let circuit_breaker = if config.circuit_breaker.enabled {
Some(circuit_breaker::new_shared_circuit_breaker(
circuit_breaker::CircuitBreakerConfig {
enabled: config.circuit_breaker.enabled,
failure_threshold: config.circuit_breaker.failure_threshold,
success_threshold: config.circuit_breaker.success_threshold,
timeout: config.circuit_breaker.timeout,
half_open_max_calls: config.circuit_breaker.half_open_max_calls,
..Default::default()
},
))
} else {
None
};
let backend_consumer = match &config.backend {
#[cfg(feature = "kafka")]
StreamBackendType::Kafka {
brokers,
security_protocol,
sasl_config,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Kafka {
brokers: brokers.clone(),
security_protocol: security_protocol.clone(),
sasl_config: sasl_config.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut consumer = backend::kafka::KafkaConsumer::new(stream_config)?;
consumer.connect().await?;
BackendConsumer::Kafka(Box::new(consumer))
}
#[cfg(feature = "nats")]
StreamBackendType::Nats {
url,
cluster_urls,
jetstream_config,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Nats {
url: url.clone(),
cluster_urls: cluster_urls.clone(),
jetstream_config: jetstream_config.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut consumer = backend::nats::NatsConsumer::new(stream_config)?;
consumer.connect().await?;
BackendConsumer::Nats(Box::new(consumer))
}
#[cfg(feature = "redis")]
StreamBackendType::Redis {
url,
cluster_urls,
pool_size,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Redis {
url: url.clone(),
cluster_urls: cluster_urls.clone(),
pool_size: *pool_size,
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut consumer = backend::redis::RedisConsumer::new(stream_config)?;
consumer.connect().await?;
BackendConsumer::Redis(Box::new(consumer))
}
#[cfg(feature = "kinesis")]
StreamBackendType::Kinesis {
region,
stream_name,
credentials,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Kinesis {
region: region.clone(),
stream_name: stream_name.clone(),
credentials: credentials.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut consumer = backend::kinesis::KinesisConsumer::new(stream_config)?;
consumer.connect().await?;
BackendConsumer::Kinesis(Box::new(consumer))
}
#[cfg(feature = "pulsar")]
StreamBackendType::Pulsar {
service_url,
auth_config,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::Pulsar {
service_url: service_url.clone(),
auth_config: auth_config.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut consumer = backend::pulsar::PulsarConsumer::new(stream_config)?;
consumer.connect().await?;
BackendConsumer::Pulsar(Box::new(consumer))
}
#[cfg(feature = "rabbitmq")]
StreamBackendType::RabbitMQ {
url,
exchange,
queue,
} => {
let stream_config = crate::StreamConfig {
backend: crate::StreamBackendType::RabbitMQ {
url: url.clone(),
exchange: exchange.clone(),
queue: queue.clone(),
},
topic: config.topic.clone(),
batch_size: config.batch_size,
flush_interval_ms: config.flush_interval_ms,
max_connections: config.max_connections,
connection_timeout: config.connection_timeout,
enable_compression: config.enable_compression,
compression_type: config.compression_type.clone(),
retry_config: config.retry_config.clone(),
circuit_breaker: config.circuit_breaker.clone(),
security: config.security.clone(),
performance: config.performance.clone(),
monitoring: config.monitoring.clone(),
};
let mut consumer = backend::rabbitmq::RabbitMQConsumer::new(stream_config)?;
consumer.connect().await?;
BackendConsumer::RabbitMQ(Box::new(consumer))
}
StreamBackendType::Memory {
max_size: _,
persistence: _,
} => {
BackendConsumer::Memory(Box::new(MemoryConsumer::with_topic(config.topic.clone())))
}
};
let stats = Arc::new(RwLock::new(ConsumerStats {
backend_type: match backend_consumer {
#[cfg(feature = "kafka")]
BackendConsumer::Kafka(_) => "kafka".to_string(),
#[cfg(feature = "nats")]
BackendConsumer::Nats(_) => "nats".to_string(),
#[cfg(feature = "redis")]
BackendConsumer::Redis(_) => "redis".to_string(),
#[cfg(feature = "kinesis")]
BackendConsumer::Kinesis(_) => "kinesis".to_string(),
#[cfg(feature = "pulsar")]
BackendConsumer::Pulsar(_) => "pulsar".to_string(),
#[cfg(feature = "rabbitmq")]
BackendConsumer::RabbitMQ(_) => "rabbitmq".to_string(),
BackendConsumer::Memory(_) => "memory".to_string(),
},
_batch_size: config.batch_size,
..Default::default()
}));
info!(
"Created stream consumer with backend: {} and group: {:?}",
stats.read().await.backend_type,
consumer_group
);
Ok(Self {
_config: config,
backend_consumer,
stats,
circuit_breaker,
last_poll: Instant::now(),
_message_buffer: Arc::new(RwLock::new(Vec::new())),
consumer_group,
})
}
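/// Polls the backend for a single event, returning `Ok(None)` when nothing
/// is currently available. Respects the circuit breaker and updates
/// processing-time statistics.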
pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
let start_time = Instant::now();
if let Some(cb) = &self.circuit_breaker {
if !cb.can_execute().await {
self.stats.write().await.circuit_breaker_trips += 1;
return Err(anyhow!("Circuit breaker is open - cannot consume events"));
}
}
let result = self.consume_single_event().await;
match &result {
Ok(Some(_)) => {
if let Some(cb) = &self.circuit_breaker {
cb.record_success_with_duration(start_time.elapsed()).await;
}
let mut stats = self.stats.write().await;
stats.events_consumed += 1;
let processing_time = start_time.elapsed().as_millis() as u64;
stats.max_processing_time_ms = stats.max_processing_time_ms.max(processing_time);
stats.avg_processing_time_ms =
(stats.avg_processing_time_ms + processing_time as f64) / 2.0;
stats.last_message = Some(Utc::now());
}
Ok(None) => {
if let Some(cb) = &self.circuit_breaker {
cb.record_success_with_duration(start_time.elapsed()).await;
}
}
Err(_) => {
if let Some(cb) = &self.circuit_breaker {
cb.record_failure_with_type(circuit_breaker::FailureType::NetworkError)
.await;
}
self.stats.write().await.events_failed += 1;
}
}
self.last_poll = Instant::now();
result
}
async fn consume_single_event(&mut self) -> Result<Option<StreamEvent>> {
match &mut self.backend_consumer {
#[cfg(feature = "kafka")]
BackendConsumer::Kafka(consumer) => consumer.consume().await,
#[cfg(feature = "nats")]
BackendConsumer::Nats(consumer) => consumer.consume().await,
#[cfg(feature = "redis")]
BackendConsumer::Redis(consumer) => consumer.consume().await,
#[cfg(feature = "kinesis")]
BackendConsumer::Kinesis(consumer) => consumer.consume().await,
#[cfg(feature = "pulsar")]
BackendConsumer::Pulsar(consumer) => consumer.consume().await,
#[cfg(feature = "rabbitmq")]
BackendConsumer::RabbitMQ(consumer) => consumer.consume().await,
BackendConsumer::Memory(consumer) => consumer.consume().await,
}
}
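/// Collects up to `max_events` events or until `timeout` elapses, whichever
/// comes first. Each individual poll is capped at 50 ms, and a poll that
/// times out ends the batch early.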
pub async fn consume_batch(
&mut self,
max_events: usize,
timeout: Duration,
) -> Result<Vec<StreamEvent>> {
let mut events = Vec::new();
let start_time = Instant::now();
while events.len() < max_events && start_time.elapsed() < timeout {
match tokio::time::timeout(Duration::from_millis(50), self.consume()).await {
Ok(Ok(Some(event))) => events.push(event),
Ok(Ok(None)) => continue,
Ok(Err(e)) => return Err(e),
Err(_) => break, // poll timed out; return what we have so far
}
}
if !events.is_empty() {
debug!(
"Consumed batch of {} events in {:?}",
events.len(),
start_time.elapsed()
);
}
Ok(events)
}
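/// Runs an endless consume loop, invoking `callback` for every event.
/// Callback errors are logged and counted but do not stop the loop, so this
/// method effectively never returns unless the surrounding task is aborted.
/// A minimal sketch (`ignore`d: assumes a connected `consumer`):
///
/// ```ignore
/// consumer
///     .start_consuming(|event| {
///         println!("got event: {event:?}");
///         Ok(())
///     })
///     .await?;
/// ```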
pub async fn start_consuming<F>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(StreamEvent) -> Result<()> + Send,
{
info!("Starting stream consumer loop");
loop {
match self.consume().await {
Ok(Some(event)) => {
if let Err(e) = callback(event) {
error!("Callback error: {}", e);
self.stats.write().await.events_failed += 1;
}
}
Ok(None) => {
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(e) => {
error!("Consumer error: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
pub async fn start_consuming_async<F, Fut>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(StreamEvent) -> Fut + Send,
Fut: std::future::Future<Output = Result<()>> + Send,
{
info!("Starting async stream consumer loop");
loop {
match self.consume().await {
Ok(Some(event)) => {
if let Err(e) = callback(event).await {
error!("Async callback error: {}", e);
self.stats.write().await.events_failed += 1;
}
}
Ok(None) => {
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(e) => {
error!("Consumer error: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
pub async fn get_stats(&self) -> ConsumerStats {
self.stats.read().await.clone()
}
pub async fn health_check(&self) -> bool {
if let Some(cb) = &self.circuit_breaker {
cb.is_healthy().await
} else {
true
}
}
pub fn consumer_group(&self) -> Option<&String> {
self.consumer_group.as_ref()
}
pub async fn reset_position(&mut self) -> Result<()> {
match &mut self.backend_consumer {
BackendConsumer::Memory(consumer) => {
consumer.reset();
Ok(())
}
#[cfg(feature = "kafka")]
BackendConsumer::Kafka(_) => {
Err(anyhow!("Reset position not supported for Kafka backend"))
}
#[cfg(feature = "nats")]
BackendConsumer::Nats(_) => {
Err(anyhow!("Reset position not supported for NATS backend"))
}
#[cfg(feature = "redis")]
BackendConsumer::Redis(_) => {
Err(anyhow!("Reset position not supported for Redis backend"))
}
#[cfg(feature = "kinesis")]
BackendConsumer::Kinesis(_) => {
Err(anyhow!("Reset position not supported for Kinesis backend"))
}
#[cfg(feature = "pulsar")]
BackendConsumer::Pulsar(_) => {
Err(anyhow!("Reset position not supported for Pulsar backend"))
}
#[cfg(feature = "rabbitmq")]
BackendConsumer::RabbitMQ(_) => {
Err(anyhow!("Reset position not supported for RabbitMQ backend"))
}
}
}
pub async fn set_test_events(&mut self, _events: Vec<StreamEvent>) -> Result<()> {
Err(anyhow!("set_test_events is deprecated with backend implementation - use producer to publish events"))
}
}
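/// A single RDF Patch operation: triple add/delete, graph management,
/// prefix declarations, transaction markers, or header entries.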
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchOperation {
Add {
subject: String,
predicate: String,
object: String,
},
Delete {
subject: String,
predicate: String,
object: String,
},
AddGraph { graph: String },
DeleteGraph { graph: String },
AddPrefix { prefix: String, namespace: String },
DeletePrefix { prefix: String },
TransactionBegin { transaction_id: Option<String> },
TransactionCommit,
TransactionAbort,
Header { key: String, value: String },
}
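/// An ordered list of patch operations together with an id, timestamp,
/// headers, prefix declarations, and an optional transaction id.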
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdfPatch {
pub operations: Vec<PatchOperation>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub id: String,
pub headers: HashMap<String, String>,
pub transaction_id: Option<String>,
pub prefixes: HashMap<String, String>,
}
impl RdfPatch {
pub fn new() -> Self {
Self {
operations: Vec::new(),
timestamp: chrono::Utc::now(),
id: uuid::Uuid::new_v4().to_string(),
headers: HashMap::new(),
transaction_id: None,
prefixes: HashMap::new(),
}
}
pub fn add_operation(&mut self, operation: PatchOperation) {
self.operations.push(operation);
}
pub fn to_rdf_patch_format(&self) -> Result<String> {
let serializer = crate::patch::PatchSerializer::new()
.with_pretty_print(true)
.with_metadata(true);
serializer.serialize(self)
}
pub fn from_rdf_patch_format(input: &str) -> Result<Self> {
let mut parser = crate::patch::PatchParser::new().with_strict_mode(false);
parser.parse(input)
}
}
impl Default for RdfPatch {
fn default() -> Self {
Self::new()
}
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
backend: StreamBackendType::Memory {
max_size: Some(10000),
persistence: false,
},
topic: "oxirs-stream".to_string(),
batch_size: 100,
flush_interval_ms: 100,
max_connections: 10,
connection_timeout: Duration::from_secs(30),
enable_compression: false,
compression_type: CompressionType::None,
retry_config: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
security: SecurityConfig::default(),
performance: StreamPerformanceConfig::default(),
monitoring: MonitoringConfig::default(),
}
}
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(30),
backoff_multiplier: 2.0,
jitter: true,
}
}
}
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
enabled: true,
failure_threshold: 5,
success_threshold: 3,
timeout: Duration::from_secs(30),
half_open_max_calls: 3,
}
}
}
impl Default for SecurityConfig {
fn default() -> Self {
Self {
enable_tls: false,
verify_certificates: true,
client_cert_path: None,
client_key_path: None,
ca_cert_path: None,
sasl_config: None,
}
}
}
impl Default for StreamPerformanceConfig {
fn default() -> Self {
Self {
enable_batching: true,
enable_pipelining: false,
buffer_size: 8192,
prefetch_count: 100,
enable_zero_copy: false,
enable_simd: false,
parallel_processing: true,
worker_threads: None,
}
}
}
impl Default for MonitoringConfig {
fn default() -> Self {
Self {
enable_metrics: true,
enable_tracing: true,
metrics_interval: Duration::from_secs(60),
health_check_interval: Duration::from_secs(30),
enable_profiling: false,
prometheus_endpoint: None,
jaeger_endpoint: None,
log_level: "info".to_string(),
}
}
}
impl StreamConfig {
#[cfg(feature = "redis")]
pub fn redis(url: String) -> Self {
Self {
backend: StreamBackendType::Redis {
url,
cluster_urls: None,
pool_size: Some(10),
},
..Default::default()
}
}
#[cfg(feature = "kinesis")]
pub fn kinesis(region: String, stream_name: String) -> Self {
Self {
backend: StreamBackendType::Kinesis {
region,
stream_name,
credentials: None,
},
..Default::default()
}
}
pub fn memory() -> Self {
Self {
backend: StreamBackendType::Memory {
max_size: Some(1000),
persistence: false,
},
..Default::default()
}
}
pub fn high_performance(mut self) -> Self {
self.performance.enable_batching = true;
self.performance.enable_pipelining = true;
self.performance.parallel_processing = true;
self.performance.buffer_size = 65536;
self.performance.prefetch_count = 1000;
self.batch_size = 1000;
self.flush_interval_ms = 10;
self
}
pub fn with_compression(mut self, compression_type: CompressionType) -> Self {
self.enable_compression = true;
self.compression_type = compression_type;
self
}
pub fn with_circuit_breaker(mut self, enabled: bool, failure_threshold: u32) -> Self {
self.circuit_breaker.enabled = enabled;
self.circuit_breaker.failure_threshold = failure_threshold;
self
}
pub fn development(topic: &str) -> Self {
Self {
backend: StreamBackendType::Memory {
max_size: Some(10000),
persistence: false,
},
topic: topic.to_string(),
batch_size: 10,
flush_interval_ms: 100,
max_connections: 5,
connection_timeout: Duration::from_secs(10),
enable_compression: false,
compression_type: CompressionType::None,
retry_config: RetryConfig {
max_retries: 3,
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(5),
backoff_multiplier: 2.0,
jitter: true,
},
circuit_breaker: CircuitBreakerConfig {
enabled: false,
failure_threshold: 5,
success_threshold: 2,
timeout: Duration::from_secs(60),
half_open_max_calls: 10,
},
security: SecurityConfig::default(),
performance: StreamPerformanceConfig::default(),
monitoring: MonitoringConfig {
enable_metrics: true,
enable_tracing: false,
metrics_interval: Duration::from_secs(5),
health_check_interval: Duration::from_secs(30),
enable_profiling: false,
prometheus_endpoint: None,
jaeger_endpoint: None,
log_level: "debug".to_string(),
},
}
}
pub fn production(topic: &str) -> Self {
Self {
backend: StreamBackendType::Memory {
max_size: Some(100000),
persistence: true,
},
topic: topic.to_string(),
batch_size: 1000,
flush_interval_ms: 10,
max_connections: 50,
connection_timeout: Duration::from_secs(30),
enable_compression: true,
compression_type: CompressionType::Zstd,
retry_config: RetryConfig {
max_retries: 5,
initial_backoff: Duration::from_millis(200),
max_backoff: Duration::from_secs(30),
backoff_multiplier: 2.0,
jitter: true,
},
circuit_breaker: CircuitBreakerConfig {
enabled: true,
failure_threshold: 10,
success_threshold: 3,
timeout: Duration::from_secs(30),
half_open_max_calls: 5,
},
security: SecurityConfig::default(),
performance: StreamPerformanceConfig {
enable_batching: true,
enable_pipelining: true,
parallel_processing: true,
buffer_size: 65536,
prefetch_count: 1000,
enable_zero_copy: true,
enable_simd: true,
worker_threads: None,
},
monitoring: MonitoringConfig {
enable_metrics: true,
enable_tracing: true,
metrics_interval: Duration::from_secs(1),
health_check_interval: Duration::from_secs(10),
enable_profiling: true,
prometheus_endpoint: None,
jaeger_endpoint: None,
log_level: "info".to_string(),
},
}
}
}
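/// Convenience wrapper pairing a [`StreamProducer`] and a [`StreamConsumer`]
/// built from the same configuration. A round-trip sketch over the in-memory
/// backend (`ignore`d: `event` stands in for any [`StreamEvent`] value):
///
/// ```ignore
/// let mut stream = Stream::new(StreamConfig::memory()).await?;
/// stream.publish(event).await?;
/// stream.flush().await?;
/// let received = stream.consume().await?;
/// ```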
pub struct Stream {
producer: StreamProducer,
consumer: StreamConsumer,
}
impl Stream {
pub async fn new(config: StreamConfig) -> Result<Self> {
let producer = StreamProducer::new(config.clone()).await?;
let consumer = StreamConsumer::new(config).await?;
Ok(Self { producer, consumer })
}
pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
self.producer.publish(event).await
}
pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
self.consumer.consume().await
}
pub async fn flush(&mut self) -> Result<()> {
self.producer.flush().await
}
pub async fn producer_stats(&self) -> ProducerStats {
self.producer.get_stats().await
}
pub async fn consumer_stats(&self) -> ConsumerStats {
self.consumer.get_stats().await
}
pub async fn close(&mut self) -> Result<()> {
self.producer.flush().await?;
debug!("Stream closed successfully");
Ok(())
}
pub async fn health_check(&self) -> Result<bool> {
// Healthy only when both halves report a healthy circuit breaker (or none).
Ok(self.producer.health_check().await && self.consumer.health_check().await)
}
pub async fn begin_transaction(&mut self) -> Result<()> {
debug!("Transaction begun (placeholder)");
Ok(())
}
pub async fn commit_transaction(&mut self) -> Result<()> {
debug!("Transaction committed (placeholder)");
Ok(())
}
pub async fn rollback_transaction(&mut self) -> Result<()> {
debug!("Transaction rolled back (placeholder)");
Ok(())
}
}