use crate::{EventMetadata, PatchOperation, RdfPatch, StreamConfig, StreamEvent};
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::time;
use tracing::{debug, error, info};
use uuid::Uuid;
#[cfg(feature = "rabbitmq")]
use lapin::{
options::*,
types::{AMQPValue, FieldTable, ShortString},
Channel, Connection, ConnectionProperties, Consumer as LapinConsumer, ExchangeKind,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RabbitMQConfig {
pub url: String,
pub exchange: String,
pub exchange_type: ExchangeType,
pub queue: String,
pub routing_key: String,
pub persistent: bool,
pub consumer_tag: String,
pub prefetch_count: u16,
pub auto_ack: bool,
pub connection_timeout: Duration,
pub heartbeat_interval: Duration,
pub max_reconnect_attempts: u32,
pub reconnect_delay: Duration,
pub compression_enabled: bool,
pub dlx_exchange: Option<String>,
pub message_ttl_ms: Option<u32>,
pub max_queue_length: Option<u32>,
}
impl Default for RabbitMQConfig {
fn default() -> Self {
Self {
url: "amqp://guest:guest@localhost:5672/%2f".to_string(),
exchange: "oxirs.rdf.stream".to_string(),
exchange_type: ExchangeType::Topic,
queue: "oxirs.rdf.queue".to_string(),
routing_key: "rdf.events.#".to_string(),
persistent: true,
consumer_tag: format!("consumer-{}", Uuid::new_v4()),
prefetch_count: 100,
auto_ack: false,
connection_timeout: Duration::from_secs(10),
heartbeat_interval: Duration::from_secs(60),
max_reconnect_attempts: 5,
reconnect_delay: Duration::from_secs(2),
compression_enabled: true,
dlx_exchange: Some("oxirs.rdf.dlx".to_string()),
message_ttl_ms: Some(86400000), max_queue_length: Some(1000000),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ExchangeType {
Direct,
Topic,
Fanout,
Headers,
}
impl From<&ExchangeType> for ExchangeKind {
fn from(et: &ExchangeType) -> Self {
match et {
ExchangeType::Direct => ExchangeKind::Direct,
ExchangeType::Topic => ExchangeKind::Topic,
ExchangeType::Fanout => ExchangeKind::Fanout,
ExchangeType::Headers => ExchangeKind::Headers,
}
}
}
pub struct RabbitMQConnection {
#[cfg(feature = "rabbitmq")]
connection: Option<Connection>,
#[cfg(feature = "rabbitmq")]
channel: Option<Channel>,
#[cfg(not(feature = "rabbitmq"))]
_phantom: std::marker::PhantomData<()>,
config: RabbitMQConfig,
reconnect_attempts: u32,
}
pub struct RabbitMQProducer {
config: StreamConfig,
rabbitmq_config: RabbitMQConfig,
connection: Option<RabbitMQConnection>,
pending_events: Vec<RabbitMQStreamEvent>,
stats: Arc<RwLock<ProducerStats>>,
sequence_number: u64,
last_flush: Instant,
}
#[derive(Debug, Default, Clone)]
pub struct ProducerStats {
events_published: u64,
events_failed: u64,
bytes_sent: u64,
connection_retries: u64,
last_publish: Option<DateTime<Utc>>,
avg_latency_ms: f64,
max_latency_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RabbitMQStreamEvent {
pub event_id: String,
pub event_type: String,
pub timestamp: DateTime<Utc>,
pub sequence: u64,
pub data: serde_json::Value,
pub metadata: EventMetadata,
pub checksum: Option<String>,
pub compressed: bool,
}
impl From<StreamEvent> for RabbitMQStreamEvent {
fn from(event: StreamEvent) -> Self {
let (event_type, data, metadata) = match event {
StreamEvent::TripleAdded {
subject,
predicate,
object,
graph,
metadata,
} => (
"triple_added".to_string(),
serde_json::json!({
"subject": subject,
"predicate": predicate,
"object": object,
"graph": graph
}),
metadata,
),
StreamEvent::TripleRemoved {
subject,
predicate,
object,
graph,
metadata,
} => (
"triple_removed".to_string(),
serde_json::json!({
"subject": subject,
"predicate": predicate,
"object": object,
"graph": graph
}),
metadata,
),
StreamEvent::QuadAdded {
subject,
predicate,
object,
graph,
metadata,
} => (
"quad_added".to_string(),
serde_json::json!({
"subject": subject,
"predicate": predicate,
"object": object,
"graph": graph
}),
metadata,
),
StreamEvent::QuadRemoved {
subject,
predicate,
object,
graph,
metadata,
} => (
"quad_removed".to_string(),
serde_json::json!({
"subject": subject,
"predicate": predicate,
"object": object,
"graph": graph
}),
metadata,
),
StreamEvent::GraphCreated { graph, metadata } => (
"graph_created".to_string(),
serde_json::json!({
"graph": graph
}),
metadata,
),
StreamEvent::GraphCleared { graph, metadata } => (
"graph_cleared".to_string(),
serde_json::json!({
"graph": graph
}),
metadata,
),
StreamEvent::GraphDeleted { graph, metadata } => (
"graph_deleted".to_string(),
serde_json::json!({
"graph": graph
}),
metadata,
),
StreamEvent::SparqlUpdate {
query,
operation_type,
metadata,
} => (
"sparql_update".to_string(),
serde_json::json!({
"query": query,
"operation_type": operation_type
}),
metadata,
),
StreamEvent::TransactionBegin {
transaction_id,
isolation_level,
metadata,
} => (
"transaction_begin".to_string(),
serde_json::json!({
"transaction_id": transaction_id,
"isolation_level": isolation_level
}),
metadata,
),
StreamEvent::TransactionCommit {
transaction_id,
metadata,
} => (
"transaction_commit".to_string(),
serde_json::json!({
"transaction_id": transaction_id
}),
metadata,
),
StreamEvent::TransactionAbort {
transaction_id,
metadata,
} => (
"transaction_abort".to_string(),
serde_json::json!({
"transaction_id": transaction_id
}),
metadata,
),
StreamEvent::SchemaChanged {
schema_type,
change_type,
details,
metadata,
} => (
"schema_changed".to_string(),
serde_json::json!({
"schema_type": schema_type,
"change_type": change_type,
"details": details
}),
metadata,
),
StreamEvent::Heartbeat {
timestamp,
source,
metadata: _,
} => (
"heartbeat".to_string(),
serde_json::json!({
"source": source
}),
EventMetadata {
event_id: Uuid::new_v4().to_string(),
timestamp,
source: source.clone(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
},
),
_ => {
(
"other".to_string(),
serde_json::json!({}),
EventMetadata::default(),
)
}
};
Self {
event_id: metadata.event_id.clone(),
event_type,
timestamp: metadata.timestamp,
sequence: 0,
data,
metadata,
checksum: None,
compressed: false,
}
}
}
impl RabbitMQConnection {
pub fn new(config: RabbitMQConfig) -> Self {
Self {
#[cfg(feature = "rabbitmq")]
connection: None,
#[cfg(feature = "rabbitmq")]
channel: None,
#[cfg(not(feature = "rabbitmq"))]
_phantom: std::marker::PhantomData,
config,
reconnect_attempts: 0,
}
}
#[cfg(feature = "rabbitmq")]
pub async fn connect(&mut self) -> Result<()> {
let conn = Connection::connect(&self.config.url, ConnectionProperties::default())
.await
.map_err(|e| anyhow!("Failed to connect to RabbitMQ: {}", e))?;
let channel = conn
.create_channel()
.await
.map_err(|e| anyhow!("Failed to create channel: {}", e))?;
channel
.exchange_declare(
self.config.exchange.as_str().into(),
ExchangeKind::from(&self.config.exchange_type),
ExchangeDeclareOptions {
passive: false,
durable: true,
auto_delete: false,
internal: false,
nowait: false,
},
FieldTable::default(),
)
.await
.map_err(|e| anyhow!("Failed to declare exchange: {}", e))?;
if let Some(dlx) = &self.config.dlx_exchange {
channel
.exchange_declare(
dlx.as_str().into(),
ExchangeKind::Fanout,
ExchangeDeclareOptions {
passive: false,
durable: true,
auto_delete: false,
internal: false,
nowait: false,
},
FieldTable::default(),
)
.await
.map_err(|e| anyhow!("Failed to declare DLX: {}", e))?;
}
self.connection = Some(conn);
self.channel = Some(channel);
self.reconnect_attempts = 0;
info!("Connected to RabbitMQ: {}", self.config.url);
Ok(())
}
#[cfg(not(feature = "rabbitmq"))]
pub async fn connect(&mut self) -> Result<()> {
warn!("RabbitMQ feature not enabled, using mock connection");
Ok(())
}
#[cfg(feature = "rabbitmq")]
pub fn channel(&self) -> Result<&Channel> {
self.channel
.as_ref()
.ok_or_else(|| anyhow!("No channel available"))
}
#[cfg(feature = "rabbitmq")]
pub fn is_connected(&self) -> bool {
self.connection
.as_ref()
.map(|c| c.status().connected())
.unwrap_or(false)
}
#[cfg(not(feature = "rabbitmq"))]
pub fn is_connected(&self) -> bool {
false
}
}
impl RabbitMQProducer {
pub fn new(config: StreamConfig) -> Result<Self> {
let rabbitmq_config =
if let crate::StreamBackendType::RabbitMQ { url, .. } = &config.backend {
RabbitMQConfig {
url: url.clone(),
..Default::default()
}
} else {
RabbitMQConfig::default()
};
let connection = Some(RabbitMQConnection::new(rabbitmq_config.clone()));
Ok(Self {
config,
rabbitmq_config,
connection,
pending_events: Vec::new(),
stats: Arc::new(RwLock::new(ProducerStats::default())),
sequence_number: 0,
last_flush: Instant::now(),
})
}
pub fn with_rabbitmq_config(mut self, config: RabbitMQConfig) -> Self {
self.rabbitmq_config = config.clone();
self.connection = Some(RabbitMQConnection::new(config));
self
}
pub async fn connect(&mut self) -> Result<()> {
if let Some(conn) = &mut self.connection {
conn.connect().await?;
}
Ok(())
}
pub async fn publish(&mut self, event: StreamEvent) -> Result<()> {
let start_time = Instant::now();
if self
.connection
.as_ref()
.map(|c| !c.is_connected())
.unwrap_or(true)
{
self.connect().await?;
}
let mut rabbitmq_event = RabbitMQStreamEvent::from(event);
rabbitmq_event.sequence = self.sequence_number;
self.sequence_number += 1;
if self.rabbitmq_config.compression_enabled {
rabbitmq_event = self.compress_event(rabbitmq_event)?;
}
self.publish_single_event(&rabbitmq_event).await?;
let latency = start_time.elapsed().as_millis() as u64;
let mut stats = self.stats.write().await;
stats.max_latency_ms = stats.max_latency_ms.max(latency);
stats.avg_latency_ms = (stats.avg_latency_ms + latency as f64) / 2.0;
stats.last_publish = Some(Utc::now());
Ok(())
}
#[cfg(feature = "rabbitmq")]
async fn publish_single_event(&mut self, event: &RabbitMQStreamEvent) -> Result<()> {
let serialized =
serde_json::to_vec(event).map_err(|e| anyhow!("Failed to serialize event: {}", e))?;
let mut properties = lapin::BasicProperties::default()
.with_content_type(ShortString::from("application/json"))
.with_content_encoding(ShortString::from(if event.compressed {
"gzip"
} else {
"utf-8"
}))
.with_message_id(ShortString::from(event.event_id.clone()))
.with_timestamp(event.timestamp.timestamp() as u64)
.with_app_id(ShortString::from("oxirs-stream"));
if self.rabbitmq_config.persistent {
properties = properties.with_delivery_mode(2); }
let mut headers = FieldTable::default();
headers.insert(
ShortString::from("event_type"),
AMQPValue::LongString(event.event_type.clone().into()),
);
headers.insert(
ShortString::from("sequence"),
AMQPValue::LongLongInt(event.sequence as i64),
);
properties = properties.with_headers(headers);
if let Some(conn) = &self.connection {
let channel = conn.channel()?;
channel
.basic_publish(
self.rabbitmq_config.exchange.as_str().into(),
self.rabbitmq_config.routing_key.as_str().into(),
BasicPublishOptions::default(),
&serialized,
properties,
)
.await
.map_err(|e| anyhow!("Failed to publish to RabbitMQ: {}", e))?
.await
.map_err(|e| anyhow!("Publisher confirm failed: {}", e))?;
let mut stats = self.stats.write().await;
stats.events_published += 1;
stats.bytes_sent += serialized.len() as u64;
debug!("Published event {} to RabbitMQ", event.event_id);
}
Ok(())
}
#[cfg(not(feature = "rabbitmq"))]
async fn publish_single_event(&mut self, event: &RabbitMQStreamEvent) -> Result<()> {
self.pending_events.push(event.clone());
debug!("Mock RabbitMQ publish: {}", event.event_id);
Ok(())
}
pub async fn publish_batch(&mut self, events: Vec<StreamEvent>) -> Result<()> {
for event in events {
self.publish(event).await?;
}
self.flush().await
}
pub async fn publish_patch(&mut self, patch: &RdfPatch) -> Result<()> {
for operation in &patch.operations {
let metadata = EventMetadata {
event_id: Uuid::new_v4().to_string(),
timestamp: Utc::now(),
source: "rdf_patch".to_string(),
user: None,
context: Some(patch.id.clone()),
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
};
let event = match operation {
PatchOperation::Add {
subject,
predicate,
object,
} => StreamEvent::TripleAdded {
subject: subject.clone(),
predicate: predicate.clone(),
object: object.clone(),
graph: None,
metadata,
},
PatchOperation::Delete {
subject,
predicate,
object,
} => StreamEvent::TripleRemoved {
subject: subject.clone(),
predicate: predicate.clone(),
object: object.clone(),
graph: None,
metadata,
},
PatchOperation::AddGraph { graph } => StreamEvent::GraphCreated {
graph: graph.clone(),
metadata,
},
PatchOperation::DeleteGraph { graph } => StreamEvent::GraphDeleted {
graph: graph.clone(),
metadata,
},
PatchOperation::AddPrefix {
prefix: _,
namespace: _,
} => continue,
PatchOperation::DeletePrefix { prefix: _ } => continue,
PatchOperation::TransactionBegin { transaction_id } => {
StreamEvent::TransactionBegin {
transaction_id: transaction_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
isolation_level: Some(crate::IsolationLevel::ReadCommitted),
metadata,
}
}
PatchOperation::TransactionCommit => StreamEvent::TransactionCommit {
transaction_id: "unknown".to_string(),
metadata,
},
PatchOperation::TransactionAbort => StreamEvent::TransactionAbort {
transaction_id: "unknown".to_string(),
metadata,
},
PatchOperation::Header { key, value } => {
tracing::debug!("Processing patch header: {} = {}", key, value);
continue;
}
};
self.publish(event).await?;
}
self.flush().await
}
pub async fn flush(&mut self) -> Result<()> {
self.last_flush = Instant::now();
debug!("Flushed RabbitMQ producer");
Ok(())
}
fn compress_event(&self, mut event: RabbitMQStreamEvent) -> Result<RabbitMQStreamEvent> {
if self.rabbitmq_config.compression_enabled {
let data_str = event.data.to_string();
if data_str.len() > 1024 {
event.compressed = true;
debug!(
"Compressed event {} from {} bytes",
event.event_id,
data_str.len()
);
}
}
Ok(event)
}
pub async fn get_stats(&self) -> ProducerStats {
(*self.stats.read().await).clone()
}
}
pub struct RabbitMQConsumer {
config: StreamConfig,
rabbitmq_config: RabbitMQConfig,
connection: Option<RabbitMQConnection>,
#[cfg(feature = "rabbitmq")]
consumer: Option<LapinConsumer>,
#[cfg(not(feature = "rabbitmq"))]
_consumer: std::marker::PhantomData<()>,
stats: Arc<RwLock<ConsumerStats>>,
}
#[derive(Debug, Default, Clone)]
pub struct ConsumerStats {
events_consumed: u64,
events_failed: u64,
bytes_received: u64,
last_message: Option<DateTime<Utc>>,
avg_processing_time_ms: f64,
connection_errors: u64,
acknowledged_messages: u64,
rejected_messages: u64,
}
impl RabbitMQConsumer {
pub fn new(config: StreamConfig) -> Result<Self> {
let rabbitmq_config =
if let crate::StreamBackendType::RabbitMQ { url, .. } = &config.backend {
RabbitMQConfig {
url: url.clone(),
..Default::default()
}
} else {
RabbitMQConfig::default()
};
let connection = Some(RabbitMQConnection::new(rabbitmq_config.clone()));
Ok(Self {
config,
rabbitmq_config,
connection,
#[cfg(feature = "rabbitmq")]
consumer: None,
#[cfg(not(feature = "rabbitmq"))]
_consumer: std::marker::PhantomData,
stats: Arc::new(RwLock::new(ConsumerStats::default())),
})
}
pub fn with_rabbitmq_config(mut self, config: RabbitMQConfig) -> Self {
self.rabbitmq_config = config.clone();
self.connection = Some(RabbitMQConnection::new(config));
self
}
#[cfg(feature = "rabbitmq")]
pub async fn connect(&mut self) -> Result<()> {
if let Some(conn) = &mut self.connection {
conn.connect().await?;
let channel = conn.channel()?;
let mut queue_args = FieldTable::default();
if let Some(dlx) = &self.rabbitmq_config.dlx_exchange {
queue_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(dlx.clone().into()),
);
}
if let Some(ttl) = self.rabbitmq_config.message_ttl_ms {
queue_args.insert(
ShortString::from("x-message-ttl"),
AMQPValue::LongInt(ttl as i32),
);
}
if let Some(max_len) = self.rabbitmq_config.max_queue_length {
queue_args.insert(
ShortString::from("x-max-length"),
AMQPValue::LongInt(max_len as i32),
);
}
channel
.queue_declare(
self.rabbitmq_config.queue.as_str().into(),
QueueDeclareOptions {
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
nowait: false,
},
queue_args,
)
.await
.map_err(|e| anyhow!("Failed to declare queue: {}", e))?;
channel
.queue_bind(
self.rabbitmq_config.queue.as_str().into(),
self.rabbitmq_config.exchange.as_str().into(),
self.rabbitmq_config.routing_key.as_str().into(),
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| anyhow!("Failed to bind queue: {}", e))?;
channel
.basic_qos(
self.rabbitmq_config.prefetch_count,
BasicQosOptions::default(),
)
.await
.map_err(|e| anyhow!("Failed to set QoS: {}", e))?;
let consumer = channel
.basic_consume(
self.rabbitmq_config.queue.as_str().into(),
self.rabbitmq_config.consumer_tag.as_str().into(),
BasicConsumeOptions {
no_local: false,
no_ack: self.rabbitmq_config.auto_ack,
exclusive: false,
nowait: false,
},
FieldTable::default(),
)
.await
.map_err(|e| anyhow!("Failed to start consuming: {}", e))?;
self.consumer = Some(consumer);
info!(
"Started RabbitMQ consumer on queue: {}",
self.rabbitmq_config.queue
);
}
Ok(())
}
#[cfg(not(feature = "rabbitmq"))]
pub async fn connect(&mut self) -> Result<()> {
warn!("RabbitMQ feature not enabled, using mock consumer");
Ok(())
}
#[cfg(feature = "rabbitmq")]
pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
use futures::StreamExt;
if self.consumer.is_none() {
self.connect().await?;
}
if let Some(consumer) = &mut self.consumer {
match time::timeout(Duration::from_millis(100), consumer.next()).await {
Ok(Some(Ok(delivery))) => {
let start_time = Instant::now();
match serde_json::from_slice::<RabbitMQStreamEvent>(&delivery.data) {
Ok(rabbitmq_event) => {
let mut stats = self.stats.write().await;
stats.events_consumed += 1;
stats.bytes_received += delivery.data.len() as u64;
stats.last_message = Some(Utc::now());
let processing_time = start_time.elapsed().as_millis() as f64;
stats.avg_processing_time_ms =
(stats.avg_processing_time_ms + processing_time) / 2.0;
if !self.rabbitmq_config.auto_ack {
delivery
.ack(BasicAckOptions::default())
.await
.map_err(|e| anyhow!("Failed to ack message: {}", e))?;
stats.acknowledged_messages += 1;
}
let stream_event = self.convert_rabbitmq_event(rabbitmq_event)?;
debug!("Consumed RabbitMQ event: {:?}", stream_event);
Ok(Some(stream_event))
}
Err(e) => {
let mut stats = self.stats.write().await;
stats.events_failed += 1;
if !self.rabbitmq_config.auto_ack {
delivery
.nack(BasicNackOptions {
requeue: false, multiple: false,
})
.await
.map_err(|e| anyhow!("Failed to nack message: {}", e))?;
stats.rejected_messages += 1;
}
error!("Failed to parse RabbitMQ event: {}", e);
Err(anyhow!("Event parse error: {}", e))
}
}
}
Ok(Some(Err(e))) => {
let mut stats = self.stats.write().await;
stats.connection_errors += 1;
error!("RabbitMQ consumer error: {}", e);
Err(anyhow!("Consumer error: {}", e))
}
Ok(None) => Ok(None),
Err(_) => Ok(None), }
} else {
Ok(None)
}
}
#[cfg(not(feature = "rabbitmq"))]
pub async fn consume(&mut self) -> Result<Option<StreamEvent>> {
time::sleep(Duration::from_millis(100)).await;
Ok(None)
}
fn convert_rabbitmq_event(&self, rabbitmq_event: RabbitMQStreamEvent) -> Result<StreamEvent> {
let metadata = rabbitmq_event.metadata;
match rabbitmq_event.event_type.as_str() {
"triple_added" => {
let subject = rabbitmq_event.data["subject"]
.as_str()
.ok_or_else(|| anyhow!("Missing subject"))?
.to_string();
let predicate = rabbitmq_event.data["predicate"]
.as_str()
.ok_or_else(|| anyhow!("Missing predicate"))?
.to_string();
let object = rabbitmq_event.data["object"]
.as_str()
.ok_or_else(|| anyhow!("Missing object"))?
.to_string();
let graph = rabbitmq_event.data["graph"].as_str().map(|s| s.to_string());
Ok(StreamEvent::TripleAdded {
subject,
predicate,
object,
graph,
metadata,
})
}
"triple_removed" => {
let subject = rabbitmq_event.data["subject"]
.as_str()
.ok_or_else(|| anyhow!("Missing subject"))?
.to_string();
let predicate = rabbitmq_event.data["predicate"]
.as_str()
.ok_or_else(|| anyhow!("Missing predicate"))?
.to_string();
let object = rabbitmq_event.data["object"]
.as_str()
.ok_or_else(|| anyhow!("Missing object"))?
.to_string();
let graph = rabbitmq_event.data["graph"].as_str().map(|s| s.to_string());
Ok(StreamEvent::TripleRemoved {
subject,
predicate,
object,
graph,
metadata,
})
}
"graph_created" => {
let graph = rabbitmq_event.data["graph"]
.as_str()
.ok_or_else(|| anyhow!("Missing graph"))?
.to_string();
Ok(StreamEvent::GraphCreated { graph, metadata })
}
"graph_cleared" => {
let graph = rabbitmq_event.data["graph"].as_str().map(|s| s.to_string());
Ok(StreamEvent::GraphCleared { graph, metadata })
}
"heartbeat" => {
let source = rabbitmq_event.data["source"]
.as_str()
.ok_or_else(|| anyhow!("Missing source"))?
.to_string();
Ok(StreamEvent::Heartbeat {
timestamp: rabbitmq_event.timestamp,
source,
metadata,
})
}
_ => Err(anyhow!("Unknown event type: {}", rabbitmq_event.event_type)),
}
}
pub async fn consume_batch(
&mut self,
max_events: usize,
timeout: Duration,
) -> Result<Vec<StreamEvent>> {
let mut events = Vec::new();
let start_time = time::Instant::now();
while events.len() < max_events && start_time.elapsed() < timeout {
match time::timeout(Duration::from_millis(50), self.consume()).await {
Ok(Ok(Some(event))) => events.push(event),
Ok(Ok(None)) => continue,
Ok(Err(e)) => return Err(e),
Err(_) => break,
}
}
Ok(events)
}
pub async fn get_stats(&self) -> ConsumerStats {
(*self.stats.read().await).clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rabbitmq_config_default() {
let config = RabbitMQConfig::default();
assert_eq!(config.exchange_type, ExchangeType::Topic);
assert!(config.persistent);
assert_eq!(config.prefetch_count, 100);
}
#[test]
fn test_event_conversion() {
let event = StreamEvent::TripleAdded {
subject: "s".to_string(),
predicate: "p".to_string(),
object: "o".to_string(),
graph: None,
metadata: EventMetadata::default(),
};
let rabbitmq_event = RabbitMQStreamEvent::from(event);
assert_eq!(rabbitmq_event.event_type, "triple_added");
assert!(!rabbitmq_event.compressed);
}
}