use crate::canonical_message::tracing_support::LazyMessageIds;
use crate::event_store::{
event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
};
use crate::models::MemoryConfig;
use crate::traits::{
BatchCommitFunc, BoxFuture, ConsumerError, EndpointStatus, MessageConsumer, MessageDisposition,
MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
};
use crate::CanonicalMessage;
use anyhow::anyhow;
use async_channel::{bounded, Receiver, Sender};
use async_trait::async_trait;
use once_cell::sync::Lazy;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tracing::{info, trace, warn};
/// Process-wide registry of queue-mode memory channels, keyed by topic name.
/// Entries are inserted lazily by `get_or_create_channel`.
static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
/// Process-wide registry of response channels, keyed by topic name.
/// Entries are inserted lazily by `get_or_create_response_channel`.
static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
/// A bounded in-memory channel that transports messages in batches
/// (each channel slot holds one `Vec<CanonicalMessage>`).
///
/// Cloning the struct clones both channel handles, so all clones share the
/// same underlying queue.
#[derive(Debug, Clone)]
pub struct MemoryChannel {
/// Sending half; each item sent is one batch of messages.
pub sender: Sender<Vec<CanonicalMessage>>,
/// Receiving half of the same channel.
pub receiver: Receiver<Vec<CanonicalMessage>>,
}
impl MemoryChannel {
    /// Creates a bounded channel that can hold up to `capacity` batches.
    ///
    /// `async_channel::bounded` panics when given a capacity of zero. Because
    /// channels are normally created while the global registry lock is held,
    /// such a panic would also poison the registry, so a zero capacity is
    /// clamped to one instead.
    pub fn new(capacity: usize) -> Self {
        let (sender, receiver) = bounded(capacity.max(1));
        Self { sender, receiver }
    }

    /// Sends a single message, wrapped in a one-element batch.
    ///
    /// Waits while the channel is full.
    ///
    /// # Errors
    /// Returns an error if the channel has been closed.
    pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
        self.sender.send(vec![message]).await?;
        // The placeholder carries the number of batches currently queued.
        tracing::debug!("Message sent to memory {} channel", self.sender.len());
        Ok(())
    }

    /// Sends `messages` as one batch occupying a single channel slot.
    ///
    /// # Errors
    /// Returns an error if the channel has been closed.
    pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
        self.sender
            .send(messages)
            .await
            .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
        Ok(())
    }

    /// Closes the channel through its sending half.
    pub fn close(&self) {
        self.sender.close();
    }

    /// Removes every batch that is immediately available and returns the
    /// contained messages flattened into one vector, without waiting.
    pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
        let mut messages = Vec::new();
        while let Ok(batch) = self.receiver.try_recv() {
            messages.extend(batch);
        }
        messages
    }

    /// Number of queued batches (not individual messages).
    pub fn len(&self) -> usize {
        self.receiver.len()
    }

    /// Returns `true` when no batch is queued.
    pub fn is_empty(&self) -> bool {
        self.receiver.is_empty()
    }
}
/// Channel used to carry responses back to requesters.
///
/// Responses can be delivered either to a registered one-shot waiter (looked
/// up by correlation id) or through the plain `sender`/`receiver` pair.
/// Clones share the same channel and the same waiter table.
#[derive(Debug, Clone)]
pub struct MemoryResponseChannel {
/// Sending half for responses that are not handed to a registered waiter.
pub sender: Sender<CanonicalMessage>,
/// Receiving half read by `wait_for_response`.
pub receiver: Receiver<CanonicalMessage>,
/// One-shot senders keyed by correlation id, shared between clones.
waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
}
impl MemoryResponseChannel {
    /// Creates a response channel able to queue up to `capacity` responses,
    /// with an empty waiter table.
    ///
    /// `async_channel::bounded` panics on a zero capacity, so zero is clamped
    /// to one rather than panicking (possibly while a registry lock is held).
    pub fn new(capacity: usize) -> Self {
        let (sender, receiver) = bounded(capacity.max(1));
        Self {
            sender,
            receiver,
            waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        }
    }

    /// Closes the channel through its sending half.
    pub fn close(&self) {
        self.sender.close();
    }

    /// Number of responses currently queued in the channel.
    pub fn len(&self) -> usize {
        self.receiver.len()
    }

    /// Returns `true` when no response is queued.
    pub fn is_empty(&self) -> bool {
        self.receiver.is_empty()
    }

    /// Waits for the next response queued on the channel.
    ///
    /// # Errors
    /// Returns an error when receiving fails (the channel is closed and empty).
    pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
        self.receiver
            .recv()
            .await
            .map_err(|e| anyhow!("Error receiving response: {}", e))
    }

    /// Registers a one-shot `sender` to be used for the response carrying
    /// `correlation_id`.
    ///
    /// # Errors
    /// Returns an error if a waiter is already registered for that id; the
    /// existing waiter is left untouched.
    pub async fn register_waiter(
        &self,
        correlation_id: &str,
        sender: oneshot::Sender<CanonicalMessage>,
    ) -> anyhow::Result<()> {
        use std::collections::hash_map::Entry;

        let mut waiters = self.waiters.lock().await;
        // Single lookup through the entry API instead of contains_key + insert.
        match waiters.entry(correlation_id.to_string()) {
            Entry::Occupied(_) => Err(anyhow!(
                "Correlation ID {} already registered",
                correlation_id
            )),
            Entry::Vacant(slot) => {
                slot.insert(sender);
                Ok(())
            }
        }
    }

    /// Removes and returns the waiter registered for `correlation_id`, if any.
    pub async fn remove_waiter(
        &self,
        correlation_id: &str,
    ) -> Option<oneshot::Sender<CanonicalMessage>> {
        self.waiters.lock().await.remove(correlation_id)
    }
}
/// Returns the queue-mode channel registered for `config.topic`, creating it
/// on first use with `config.capacity` (default 100) slots.
///
/// The capacity only matters for the call that creates the channel; later
/// calls return a clone of the existing channel regardless of `config.capacity`.
pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
    // A panic while the lock is held cannot leave the map half-updated, so a
    // poisoned lock is recovered instead of propagated. This function is also
    // called from `Drop` implementations, where panicking must be avoided.
    let mut channels = RUNTIME_MEMORY_CHANNELS
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    channels
        .entry(config.topic.clone())
        .or_insert_with(|| {
            info!(topic = %config.topic, "Creating new runtime memory channel");
            MemoryChannel::new(config.capacity.unwrap_or(100))
        })
        .clone()
}
/// Returns the response channel registered for `topic`, creating it on first
/// use with a fixed capacity of 100 responses.
pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
    // A panic while the lock is held cannot leave the map half-updated, so a
    // poisoned lock is recovered instead of turning into a second panic.
    let mut channels = RUNTIME_RESPONSE_CHANNELS
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    channels
        .entry(topic.to_string())
        .or_insert_with(|| {
            info!(topic = %topic, "Creating new runtime memory response channel");
            MemoryResponseChannel::new(100)
        })
        .clone()
}
/// Reports whether a queue-mode channel has already been registered for `topic`.
fn memory_channel_exists(topic: &str) -> bool {
    // A read-only lookup is safe on a poisoned lock, so recover the guard
    // rather than panic.
    RUNTIME_MEMORY_CHANNELS
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .contains_key(topic)
}
/// Publisher for an in-memory topic, backed either by a queue channel or by
/// an event store (see `PublisherBackend`).
#[derive(Debug, Clone)]
pub struct MemoryPublisher {
/// Topic name this publisher writes to.
topic: String,
/// Where messages are delivered.
backend: PublisherBackend,
/// When set (and the backend is a queue), `send` waits for a correlated response.
request_reply: bool,
/// Maximum time `send` waits for a response in request/reply mode.
request_timeout: std::time::Duration,
}
/// Delivery target chosen when a `MemoryPublisher` is constructed.
#[derive(Debug, Clone)]
enum PublisherBackend {
/// Batches are pushed onto a bounded memory channel.
Queue(Sender<Vec<CanonicalMessage>>),
/// Messages are appended to a shared event store.
Log(Arc<EventStore>),
}
impl MemoryPublisher {
    /// Builds a publisher for `config.topic`, picking the backend as follows:
    ///
    /// * `subscribe_mode` set: event store (error if the topic already has a
    ///   queue channel);
    /// * otherwise, if an event store already exists for the topic: event store;
    /// * otherwise: queue channel (created on demand).
    ///
    /// The request timeout defaults to 30000 ms when not configured.
    ///
    /// # Errors
    /// Fails when subscriber mode is requested for a topic that is already
    /// active as a queue.
    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
        let queue_active = memory_channel_exists(&config.topic);
        let log_active = event_store_exists(&config.topic);

        if config.subscribe_mode && queue_active {
            return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
        }

        let backend = if config.subscribe_mode {
            PublisherBackend::Log(get_or_create_event_store(&config.topic))
        } else if log_active {
            tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
            PublisherBackend::Log(get_or_create_event_store(&config.topic))
        } else {
            PublisherBackend::Queue(get_or_create_channel(config).sender)
        };

        let timeout_ms = config.request_timeout_ms.unwrap_or(30000);
        Ok(Self {
            topic: config.topic.clone(),
            backend,
            request_reply: config.request_reply,
            request_timeout: std::time::Duration::from_millis(timeout_ms),
        })
    }

    /// Convenience constructor using default settings apart from `topic` and
    /// `capacity`.
    ///
    /// # Panics
    /// Panics if `new` rejects the resulting configuration.
    pub fn new_local(topic: &str, capacity: usize) -> Self {
        let config = MemoryConfig {
            topic: topic.to_string(),
            capacity: Some(capacity),
            ..Default::default()
        };
        Self::new(&config).expect("Failed to create local memory publisher")
    }

    /// Returns the queue channel registered under this publisher's topic,
    /// creating it if it does not exist yet.
    pub fn channel(&self) -> MemoryChannel {
        let lookup = MemoryConfig {
            topic: self.topic.clone(),
            capacity: None,
            ..Default::default()
        };
        get_or_create_channel(&lookup)
    }
}
#[async_trait]
impl MessagePublisher for MemoryPublisher {
/// Publishes a single message.
///
/// * Log backend: the message is appended to the event store and `Sent::Ack`
///   is returned (`request_reply` is not consulted on this path).
/// * Queue backend with `request_reply`: the message is sent and the call
///   waits up to `request_timeout` for a response, returned as `Sent::Response`.
/// * Queue backend otherwise: the message is sent as a one-element batch.
async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
match &self.backend {
PublisherBackend::Log(store) => {
store.append(message).await;
Ok(Sent::Ack)
}
PublisherBackend::Queue(sender) => {
if self.request_reply {
// Reuse the caller's correlation id when present, otherwise generate one.
let cid = message
.metadata
.entry("correlation_id".to_string())
.or_insert_with(fast_uuid_v7::gen_id_string)
.clone();
let (tx, rx) = oneshot::channel();
let response_channel = get_or_create_response_channel(&self.topic);
// The waiter is registered before the request is sent, so it is
// already in place by the time a reply can be produced.
response_channel
.register_waiter(&cid, tx)
.await
.map_err(PublisherError::NonRetryable)?;
if let Err(e) = sender.send(vec![message]).await {
// The request never left; drop the waiter registered above.
response_channel.remove_waiter(&cid).await;
return Err(anyhow!("Failed to send to memory channel: {}", e).into());
}
let response = match tokio::time::timeout(self.request_timeout, rx).await {
Ok(Ok(resp)) => resp,
// The one-shot sender was dropped without sending a response.
Ok(Err(e)) => {
response_channel.remove_waiter(&cid).await;
return Err(anyhow!(
"Failed to receive response for correlation_id {}: {}",
cid,
e
)
.into());
}
// Timed out: unregister the waiter and report a retryable error.
Err(_) => {
response_channel.remove_waiter(&cid).await;
return Err(PublisherError::Retryable(anyhow!(
"Request timed out waiting for response for correlation_id {}",
cid
)));
}
};
Ok(Sent::Response(response))
} else {
self.send_batch(vec![message]).await?;
Ok(Sent::Ack)
}
}
}
}
/// Publishes `messages` as one batch: appended to the event store in Log
/// mode, or sent as a single channel item in Queue mode. Returns
/// `SentBatch::Ack` on success and an error if the queue channel is closed.
async fn send_batch(
&self,
messages: Vec<CanonicalMessage>,
) -> Result<SentBatch, PublisherError> {
match &self.backend {
PublisherBackend::Log(store) => {
trace!(
topic = %self.topic,
message_ids = ?LazyMessageIds(&messages),
"Appending batch to event store"
);
store.append_batch(messages).await;
Ok(SentBatch::Ack)
}
PublisherBackend::Queue(sender) => {
trace!(
topic = %self.topic,
message_ids = ?LazyMessageIds(&messages),
"Sending batch to memory channel. Current batch count: {}",
sender.len()
);
sender
.send(messages)
.await
.map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
Ok(SentBatch::Ack)
}
}
}
/// Reports endpoint status. Queue mode is healthy while the channel is open
/// and includes the queued batch count and capacity; Log mode always reports
/// healthy and tags the details with `"mode": "event_store"`.
async fn status(&self) -> EndpointStatus {
match &self.backend {
PublisherBackend::Queue(sender) => EndpointStatus {
healthy: !sender.is_closed(),
target: self.topic.clone(),
pending: Some(sender.len()),
capacity: Some(sender.capacity().unwrap_or(0)),
..Default::default()
},
PublisherBackend::Log(_store) => EndpointStatus {
healthy: true,
target: self.topic.clone(),
details: serde_json::json!({
"mode": "event_store"
}),
..Default::default()
},
}
}
/// Exposes the publisher as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}
}
/// Consumer reading batches from a queue-mode memory channel.
#[derive(Debug)]
pub struct MemoryQueueConsumer {
/// Topic name of the channel being consumed.
topic: String,
/// Receiving half of the topic's channel.
receiver: Receiver<Vec<CanonicalMessage>>,
/// Messages received but not yet handed out, stored in reverse order so the
/// next messages to deliver can be split off the end.
buffer: Vec<CanonicalMessage>,
/// When set, delivered batches are retained so Nacked or uncommitted
/// messages can be put back on the channel.
enable_nack: bool,
}
/// Consumer for an in-memory topic, in either queue or log (event store) mode.
#[derive(Debug)]
pub enum MemoryConsumer {
/// Reads from a queue-mode memory channel.
Queue(MemoryQueueConsumer),
/// Reads from an event store.
Log {
/// Underlying event-store consumer.
consumer: EventStoreConsumer,
/// Topic name of the event store.
topic: String,
},
}
impl MemoryConsumer {
    /// Creates a consumer for `config.topic` in the mode selected by
    /// `config.subscribe_mode`.
    ///
    /// In subscriber mode the consumer attaches to the topic's event store
    /// under the id `"<topic>-consumer"`; otherwise it reads from the topic's
    /// queue channel.
    ///
    /// # Errors
    /// Fails when the topic is already active in the other mode.
    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
        let queue_active = memory_channel_exists(&config.topic);
        let log_active = event_store_exists(&config.topic);

        if !config.subscribe_mode {
            if log_active {
                return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
            }
            return MemoryQueueConsumer::new(config).map(Self::Queue);
        }

        if queue_active {
            return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
        }
        let store = get_or_create_event_store(&config.topic);
        let subscriber_id = format!("{}-consumer", config.topic);
        info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
        Ok(Self::Log {
            consumer: store.consumer(subscriber_id),
            topic: config.topic.clone(),
        })
    }
}
impl Drop for MemoryQueueConsumer {
fn drop(&mut self) {
if !self.buffer.is_empty() {
let mut messages = std::mem::take(&mut self.buffer);
messages.reverse();
let channel = get_or_create_channel(&MemoryConfig {
topic: self.topic.clone(),
capacity: None,
..Default::default()
});
match channel.sender.try_send(messages) {
Ok(_) => {
info!(topic = %self.topic, "Requeued buffered messages on consumer drop");
}
Err(e) => {
let msgs = match e {
async_channel::TrySendError::Full(m) => m,
async_channel::TrySendError::Closed(m) => m,
};
warn!(topic = %self.topic, "Channel full on drop, spawning async requeue");
let sender = channel.sender.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
if let Err(e) = sender.send(msgs).await {
tracing::error!(
"Failed to requeue buffered messages in background: {}",
e
);
}
});
} else {
tracing::error!(topic = %self.topic, "No active runtime found, could not requeue buffered messages on consumer drop");
}
}
}
}
}
}
impl MemoryQueueConsumer {
    /// Creates a consumer attached to the queue channel of `config.topic`
    /// (creating the channel if needed).
    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
        let channel = get_or_create_channel(config);
        Ok(Self {
            topic: config.topic.clone(),
            receiver: channel.receiver,
            // No pre-allocation: the buffer is replaced wholesale by the first
            // received batch, so reserving `config.capacity` slots up front
            // would only allocate memory that is immediately discarded.
            buffer: Vec::new(),
            enable_nack: config.enable_nack,
        })
    }

    /// Returns up to `max_messages` messages in delivery order.
    ///
    /// When the internal buffer is empty this waits for the next batch from
    /// the channel; messages beyond `max_messages` stay buffered for later calls.
    ///
    /// # Errors
    /// Returns `ConsumerError::EndOfStream` when receiving from the channel fails.
    async fn get_buffered_msgs(
        &mut self,
        max_messages: usize,
    ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
        if self.buffer.is_empty() {
            let mut batch = self
                .receiver
                .recv()
                .await
                .map_err(|_| ConsumerError::EndOfStream)?;
            // Keep the buffer reversed so the next messages sit at the end and
            // can be split off cheaply.
            batch.reverse();
            self.buffer = batch;
        }
        let num_to_take = self.buffer.len().min(max_messages);
        let split_at = self.buffer.len() - num_to_take;
        let mut messages = self.buffer.split_off(split_at);
        // Undo the reversed storage order for the caller.
        messages.reverse();
        Ok(messages)
    }
}
/// Holds a copy of a delivered batch; if the guard is dropped while
/// `messages` is non-empty, its `Drop` implementation puts them back on the
/// topic's channel.
struct RequeueGuard {
/// Topic whose channel receives the requeued messages.
topic: String,
/// Messages to requeue on drop; emptied once the batch has been committed.
messages: Vec<CanonicalMessage>,
}
impl Drop for RequeueGuard {
fn drop(&mut self) {
if !self.messages.is_empty() {
let topic = self.topic.clone();
let count = self.messages.len();
let messages = std::mem::take(&mut self.messages);
let channel = get_or_create_channel(&MemoryConfig {
topic: topic.clone(),
capacity: None,
..Default::default()
});
match channel.sender.try_send(messages) {
Ok(_) => {
tracing::info!(topic = %topic, count, "Requeued dropped batch via RequeueGuard");
}
Err(e) => {
let msgs = match e {
async_channel::TrySendError::Full(m) => m,
async_channel::TrySendError::Closed(m) => m,
};
tracing::warn!(topic = %topic, count, "Failed to requeue dropped batch (channel full/closed), spawning retry");
let sender = channel.sender.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
if let Err(e) = sender.send(msgs).await {
tracing::error!(
"Failed to requeue dropped batch in background: {}",
e
);
}
});
} else {
tracing::error!(topic = %topic, count, "No active runtime found, could not requeue dropped batch via RequeueGuard");
}
}
}
}
}
}
#[async_trait]
impl MessageConsumer for MemoryQueueConsumer {
    /// Receives up to `max_messages` messages together with a commit callback.
    ///
    /// Waits for at least one batch when nothing is buffered, then tops the
    /// result up from batches that are immediately available while it is less
    /// than half full. Surplus messages are kept in the internal buffer.
    ///
    /// The commit callback expects exactly one disposition per message:
    /// * `Reply` routes the response to the topic's response channel,
    /// * `Nack` requeues the message (only when `enable_nack` is set),
    /// * `Ack` needs no further work.
    ///
    /// With `enable_nack`, a batch dropped without a completed commit is
    /// requeued by a `RequeueGuard`.
    ///
    /// # Errors
    /// Returns `ConsumerError::EndOfStream` when the channel can no longer
    /// deliver messages.
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        let mut messages = self.get_buffered_msgs(max_messages).await?;
        // Reaching this loop body implies the internal buffer was fully
        // drained by `get_buffered_msgs`, so `self.buffer` is empty here.
        while messages.len() < max_messages / 2 {
            if let Ok(mut next_batch) = self.receiver.try_recv() {
                if next_batch.len() + messages.len() > max_messages {
                    let needed = max_messages - messages.len();
                    let mut to_buffer = next_batch.split_off(needed);
                    messages.append(&mut next_batch);
                    // Store the surplus in the buffer's reversed layout.
                    self.buffer.append(&mut to_buffer);
                    self.buffer.reverse();
                    break;
                } else {
                    messages.append(&mut next_batch);
                }
            } else {
                break;
            }
        }
        trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
        if messages.is_empty() {
            // Nothing was delivered, so the commit has nothing to do.
            return Ok(ReceivedBatch {
                messages: Vec::new(),
                commit: Box::new(|_| {
                    Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
                }),
            });
        }
        let topic = self.topic.clone();
        let expected_count = messages.len();
        // Remember request correlation ids so replies lacking one can inherit it.
        let correlation_ids: Vec<Option<String>> = messages
            .iter()
            .map(|m| m.metadata.get("correlation_id").cloned())
            .collect();
        let mut guard = if self.enable_nack {
            Some(RequeueGuard {
                topic: self.topic.clone(),
                messages: messages.clone(),
            })
        } else {
            None
        };
        let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
            Box::pin(async move {
                if dispositions.len() != expected_count {
                    // Returning early leaves the guard populated, so the whole
                    // batch is requeued when it is dropped.
                    return Err(anyhow::anyhow!(
                        "Memory batch commit received mismatched disposition count: expected {}, got {}",
                        expected_count,
                        dispositions.len()
                    ));
                }
                let response_channel = get_or_create_response_channel(&topic);
                let mut to_requeue = Vec::new();
                for (i, disposition) in dispositions.into_iter().enumerate() {
                    match disposition {
                        MessageDisposition::Reply(resp) => {
                            handle_memory_reply(resp, i, &correlation_ids, &response_channel).await;
                        }
                        MessageDisposition::Nack => {
                            // Borrow from the guard and clone only the nacked
                            // message instead of copying the whole batch on
                            // every commit.
                            let original = guard.as_ref().and_then(|g| g.messages.get(i));
                            if let Some(msg) = original {
                                warn!("Requeueing nacked message {}", i);
                                to_requeue.push(msg.clone());
                            } else {
                                warn!("Nack for index {} but no message in retry buffer!", i);
                            }
                        }
                        MessageDisposition::Ack => {}
                    }
                }
                if !to_requeue.is_empty() {
                    let main_channel = get_or_create_channel(&MemoryConfig {
                        topic: topic.clone(),
                        capacity: None,
                        ..Default::default()
                    });
                    if main_channel.sender.send(to_requeue).await.is_err() {
                        tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
                    }
                }
                // The commit ran to completion: disarm the guard so its `Drop`
                // does not requeue the batch again.
                if let Some(g) = guard.as_mut() {
                    g.messages.clear();
                }
                Ok(())
            }) as BoxFuture<'static, anyhow::Result<()>>
        }) as BatchCommitFunc;
        Ok(ReceivedBatch { messages, commit })
    }

    /// Reports endpoint status: healthy while the channel is open, with the
    /// number of queued batches and the channel capacity.
    async fn status(&self) -> EndpointStatus {
        let pending = self.receiver.len();
        let capacity = self.receiver.capacity().unwrap_or(0);
        EndpointStatus {
            healthy: !self.receiver.is_closed(),
            target: self.topic.clone(),
            pending: Some(pending),
            capacity: Some(capacity),
            ..Default::default()
        }
    }

    /// Exposes the consumer as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Routes a reply produced for the message at `index` of a delivered batch.
///
/// If the reply has no `correlation_id` of its own, it inherits the one
/// recorded for the request at `index` (when there is one). The reply is then
/// handed to the waiter registered under that id; when no such waiter exists
/// (or the reply has no id at all) it is sent on the response channel instead.
/// Delivery failures are ignored in both cases.
// NOTE(review): the fallback send waits while the response channel is full —
// confirm that something always drains it.
async fn handle_memory_reply(
    mut resp: CanonicalMessage,
    index: usize,
    correlation_ids: &[Option<String>],
    response_channel: &MemoryResponseChannel,
) {
    if !resp.metadata.contains_key("correlation_id") {
        let inherited = correlation_ids.get(index).and_then(Option::as_ref);
        if let Some(cid) = inherited {
            resp.metadata
                .insert("correlation_id".to_string(), cid.clone());
        }
    }

    // Claim the waiter (if any) registered for this reply's correlation id.
    let waiter = match resp.metadata.get("correlation_id") {
        Some(cid) => response_channel.remove_waiter(cid).await,
        None => None,
    };

    match waiter {
        Some(tx) => {
            let _ = tx.send(resp);
        }
        None => {
            let _ = response_channel.sender.send(resp).await;
        }
    }
}
#[async_trait]
impl MessageConsumer for MemoryConsumer {
    /// Delegates to the queue consumer or the event-store consumer, depending
    /// on the variant.
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        match self {
            Self::Queue(queue) => queue.receive_batch(max_messages).await,
            Self::Log { consumer: log, .. } => log.receive_batch(max_messages).await,
        }
    }

    /// Returns the status reported by the underlying consumer.
    async fn status(&self) -> EndpointStatus {
        match self {
            Self::Queue(queue) => queue.status().await,
            Self::Log { consumer: log, .. } => log.status().await,
        }
    }

    /// Exposes the consumer as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl MemoryConsumer {
    /// Convenience constructor using default settings apart from `topic` and
    /// `capacity`.
    ///
    /// # Panics
    /// Panics if `new` rejects the resulting configuration.
    pub fn new_local(topic: &str, capacity: usize) -> Self {
        let config = MemoryConfig {
            topic: topic.to_string(),
            capacity: Some(capacity),
            ..Default::default()
        };
        Self::new(&config).expect("Failed to create local memory consumer")
    }

    /// Returns the queue channel registered under this consumer's topic,
    /// creating it if it does not exist yet.
    // NOTE(review): for the `Log` variant this registers a queue channel for
    // the topic as a side effect — confirm that is intended.
    pub fn channel(&self) -> MemoryChannel {
        let topic = match self {
            Self::Queue(queue) => queue.topic.clone(),
            Self::Log { topic, .. } => topic.clone(),
        };
        get_or_create_channel(&MemoryConfig {
            topic,
            ..Default::default()
        })
    }
}
/// Subscriber identified by an id, implemented as a thin wrapper around a
/// `MemoryConsumer`.
#[derive(Debug)]
pub struct MemorySubscriber {
    /// Consumer that performs the actual receiving.
    consumer: MemoryConsumer,
}
impl MemorySubscriber {
    /// Creates a subscriber with the given `id`.
    ///
    /// * In subscriber mode it attaches to the event store of `config.topic`
    ///   using `id` as the subscriber id.
    /// * Otherwise it consumes from a dedicated queue topic named
    ///   `"<topic>-<id>"`.
    ///
    /// # Errors
    /// Propagates any error from `MemoryConsumer::new` in queue mode.
    pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
        let consumer = if config.subscribe_mode {
            let store = get_or_create_event_store(&config.topic);
            MemoryConsumer::Log {
                consumer: store.consumer(id.to_string()),
                topic: config.topic.clone(),
            }
        } else {
            // The config is cloned only on this path, where the derived topic
            // name is actually needed.
            let mut sub_config = config.clone();
            sub_config.topic = format!("{}-{}", config.topic, id);
            MemoryConsumer::new(&sub_config)?
        };
        Ok(Self { consumer })
    }
}
#[async_trait]
impl MessageConsumer for MemorySubscriber {
async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
self.consumer.receive_batch(max_messages).await
}
async fn receive(&mut self) -> Result<Received, ConsumerError> {
self.consumer.receive().await
}
fn as_any(&self) -> &dyn Any {
self
}
}
// Tests for the in-memory transport. The channel and event-store registries
// are process-wide, so every test uses its own topic name.
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{Endpoint, Route};
use crate::traits::Handled;
use crate::{msg, CanonicalMessage};
use serde_json::json;
use tokio::time::sleep;
// A single message published to a queue topic is received unchanged and the
// channel is empty afterwards.
#[tokio::test]
async fn test_memory_channel_integration() {
let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
let publisher = MemoryPublisher::new_local("test-mem1", 10);
let msg = msg!(json!({"hello": "memory"}));
publisher.send(msg.clone()).await.unwrap();
sleep(std::time::Duration::from_millis(10)).await;
let received = consumer.receive().await.unwrap();
let _ = (received.commit)(MessageDisposition::Ack).await;
assert_eq!(received.message.payload, msg.payload);
assert_eq!(consumer.channel().len(), 0);
}
// A two-message batch plus a single message occupy two channel slots and are
// delivered in publish order across `receive` and `receive_batch` calls.
#[tokio::test]
async fn test_memory_publisher_and_consumer_integration() {
let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
let publisher = MemoryPublisher::new_local("test-mem2", 10);
let msg1 = msg!(json!({"message": "one"}));
let msg2 = msg!(json!({"message": "two"}));
let msg3 = msg!(json!({"message": "three"}));
publisher
.send_batch(vec![msg1.clone(), msg2.clone()])
.await
.unwrap();
publisher.send(msg3.clone()).await.unwrap();
// `len` counts batches, not individual messages.
assert_eq!(publisher.channel().len(), 2);
let received1 = consumer.receive().await.unwrap();
let _ = (received1.commit)(MessageDisposition::Ack).await;
assert_eq!(received1.message.payload, msg1.payload);
let batch2 = consumer.receive_batch(1).await.unwrap();
let (received_msg2, commit2) = (batch2.messages, batch2.commit);
let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
assert_eq!(received_msg2.len(), 1);
assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
let batch3 = consumer.receive_batch(2).await.unwrap();
let (received_msg3, commit3) = (batch3.messages, batch3.commit);
let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);
assert_eq!(publisher.channel().len(), 0);
}
// In queue mode a subscriber listens on the derived topic "<topic>-<id>".
#[tokio::test]
async fn test_memory_subscriber_structure() {
let cfg = MemoryConfig {
topic: "base_topic".to_string(),
capacity: Some(10),
..Default::default()
};
let subscriber_id = "sub1";
let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();
let pub_cfg = MemoryConfig {
topic: format!("base_topic-{}", subscriber_id),
capacity: Some(10),
..Default::default()
};
let publisher = MemoryPublisher::new(&pub_cfg).unwrap();
publisher.send("hello subscriber".into()).await.unwrap();
let received = subscriber.receive().await.unwrap();
assert_eq!(received.message.get_payload_str(), "hello subscriber");
}
// A request/reply publisher gets back the response produced by a deployed
// route's handler.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_memory_request_reply_mode() {
let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
let input_endpoint = Endpoint::new_memory(&topic, 10);
let output_endpoint = Endpoint::new_response();
let handler = |mut msg: CanonicalMessage| async move {
let request_payload = msg.get_payload_str();
let response_payload = format!("reply to {}", request_payload);
msg.set_payload_str(response_payload);
Ok(Handled::Publish(msg))
};
let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
route.deploy("mem_rr_test").await.unwrap();
let publisher = MemoryPublisher::new(&MemoryConfig {
topic: topic.clone(),
capacity: Some(10),
request_reply: true,
request_timeout_ms: Some(2000),
..Default::default()
})
.unwrap();
let result = publisher.send("direct request".into()).await.unwrap();
if let Sent::Response(response_msg) = result {
assert_eq!(response_msg.get_payload_str(), "reply to direct request");
} else {
panic!("Expected Sent::Response, got {:?}", result);
}
Route::stop("mem_rr_test").await;
}
// With `enable_nack`, a Nacked message is redelivered once; after it is Acked
// nothing further arrives.
#[tokio::test]
async fn test_memory_nack_requeue() {
let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
let config = MemoryConfig {
topic: topic.clone(),
capacity: Some(10),
enable_nack: true,
..Default::default()
};
let mut consumer = MemoryConsumer::new(&config).unwrap();
let publisher = MemoryPublisher::new_local(&topic, 10);
publisher.send("to_be_nacked".into()).await.unwrap();
let received1 = consumer.receive().await.unwrap();
assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
(received1.commit)(crate::traits::MessageDisposition::Nack)
.await
.unwrap();
let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
.await
.expect("Timed out waiting for re-queued message")
.unwrap();
assert_eq!(received2.message.get_payload_str(), "to_be_nacked");
(received2.commit)(crate::traits::MessageDisposition::Ack)
.await
.unwrap();
// A timeout here means no further message was delivered.
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
assert!(result.is_err(), "Channel should be empty");
}
// In subscriber mode every subscriber receives its own copy of an event.
#[tokio::test]
async fn test_memory_event_store_integration() {
let topic = "event_store_test";
let pub_config = MemoryConfig {
topic: topic.to_string(),
subscribe_mode: true,
..Default::default()
};
let publisher = MemoryPublisher::new(&pub_config).unwrap();
let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();
publisher.send("event1".into()).await.unwrap();
let msg1 = sub1.receive().await.unwrap();
assert_eq!(msg1.message.get_payload_str(), "event1");
(msg1.commit)(MessageDisposition::Ack).await.unwrap();
let msg2 = sub2.receive().await.unwrap();
assert_eq!(msg2.message.get_payload_str(), "event1");
}
// Events published before any subscriber exists are still delivered, in
// order, to a subscriber created afterwards.
#[tokio::test]
async fn test_memory_no_subscribers_persistence() {
let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
let pub_config = MemoryConfig {
topic: topic.clone(),
subscribe_mode: true,
..Default::default()
};
let publisher = MemoryPublisher::new(&pub_config).unwrap();
publisher.send("msg1".into()).await.unwrap();
publisher.send("msg2".into()).await.unwrap();
let sub_config = MemoryConfig {
topic: topic.clone(),
subscribe_mode: true,
..Default::default()
};
let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();
let received1 = subscriber.receive().await.unwrap();
assert_eq!(received1.message.get_payload_str(), "msg1");
(received1.commit)(MessageDisposition::Ack).await.unwrap();
let received2 = subscriber.receive().await.unwrap();
assert_eq!(received2.message.get_payload_str(), "msg2");
(received2.commit)(MessageDisposition::Ack).await.unwrap();
}
// Creating a consumer in the opposite mode of an already-active topic fails,
// in both directions.
#[tokio::test]
async fn test_memory_mixed_mode_error() {
let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());
let _pub_q = MemoryPublisher::new_local(&topic_q, 10);
let log_conf = MemoryConfig {
topic: topic_q.clone(),
subscribe_mode: true,
..Default::default()
};
let err = MemoryConsumer::new(&log_conf);
assert!(err.is_err());
assert!(err
.unwrap_err()
.to_string()
.contains("already active as a Queue"));
let log_pub_conf = MemoryConfig {
topic: topic_l.clone(),
subscribe_mode: true,
..Default::default()
};
let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap();
let queue_conf = MemoryConfig {
topic: topic_l.clone(),
subscribe_mode: false,
..Default::default()
};
let err = MemoryConsumer::new(&queue_conf);
assert!(err.is_err());
assert!(err
.unwrap_err()
.to_string()
.contains("already active as a Subscriber Log"));
}
// A subscriber-mode publisher cannot be created for a topic that is already
// active as a queue.
#[tokio::test]
async fn test_memory_publisher_mixed_mode_error() {
let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());
let _cons_q = MemoryConsumer::new_local(&topic_q, 10);
let log_conf = MemoryConfig {
topic: topic_q.clone(),
subscribe_mode: true,
..Default::default()
};
let err = MemoryPublisher::new(&log_conf);
assert!(err.is_err());
assert!(err
.unwrap_err()
.to_string()
.contains("already active as a Queue"));
}
// A publisher configured for queue mode switches to the event store when one
// already exists for the topic.
#[tokio::test]
async fn test_memory_publisher_adaptive_behavior() {
let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());
let sub_config = MemoryConfig {
topic: topic.clone(),
subscribe_mode: true,
..Default::default()
};
let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();
let pub_config = MemoryConfig {
topic: topic.clone(),
subscribe_mode: false, ..Default::default()
};
let publisher = MemoryPublisher::new(&pub_config).unwrap();
publisher.send("adaptive_msg".into()).await.unwrap();
let received = subscriber.receive().await.unwrap();
assert_eq!(received.message.get_payload_str(), "adaptive_msg");
}
}