// mq_bridge/endpoints/memory.rs

//  mq-bridge
//  © Copyright 2025, by Marco Mengelkoch
//  Licensed under MIT License, see License file for more details
//  git clone https://github.com/marcomq/mq-bridge
5use crate::canonical_message::tracing_support::LazyMessageIds;
6use crate::event_store::{
7    event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
8};
9use crate::models::MemoryConfig;
10use crate::traits::{
11    BatchCommitFunc, BoxFuture, ConsumerError, MessageConsumer, MessageDisposition,
12    MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
13};
14use crate::CanonicalMessage;
15use anyhow::anyhow;
16use async_channel::{bounded, Receiver, Sender};
17use async_trait::async_trait;
18use once_cell::sync::Lazy;
19use std::any::Any;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::oneshot;
23use tracing::{info, trace};
24
/// A map to hold memory channels for the duration of the bridge setup.
/// This allows a consumer and publisher in different routes to connect to the same in-memory topic.
///
/// NOTE(review): entries are never removed from this registry in the visible
/// code; processes that create many unique topics will grow it unboundedly —
/// confirm whether that is intended.
static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

/// A map to hold memory response channels, keyed by topic.
static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
33
/// A shareable, thread-safe, in-memory channel for testing.
///
/// This struct holds the sender and receiver for an in-memory queue.
/// It can be cloned and shared between your test code and the bridge's endpoints.
/// It transports batches (`Vec`) of messages, not individual messages.
#[derive(Debug, Clone)]
pub struct MemoryChannel {
    /// Producer half; each `send` delivers a whole batch.
    pub sender: Sender<Vec<CanonicalMessage>>,
    /// Consumer half; each `recv` yields a whole batch.
    pub receiver: Receiver<Vec<CanonicalMessage>>,
}
43
44impl MemoryChannel {
45    /// Creates a new batch channel with a specified capacity.
46    pub fn new(capacity: usize) -> Self {
47        let (sender, receiver) = bounded(capacity);
48        Self { sender, receiver }
49    }
50
51    /// Helper function for tests to easily send a message to the channel.
52    pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
53        self.sender.send(vec![message]).await?;
54        tracing::debug!("Message sent to memory {} channel", self.sender.len());
55        Ok(())
56    }
57
58    /// Helper function for tests to easily fill in messages.
59    pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
60        // Send the entire vector as a single batch.
61        self.sender
62            .send(messages)
63            .await
64            .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
65        Ok(())
66    }
67
68    /// Closes the sender part of the channel.
69    pub fn close(&self) {
70        self.sender.close();
71    }
72
73    /// Helper function for tests to drain all messages from the channel.
74    pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
75        let mut messages = Vec::new();
76        // Drain all batches from the channel and flatten them into a single Vec.
77        while let Ok(batch) = self.receiver.try_recv() {
78            messages.extend(batch);
79        }
80        messages
81    }
82
83    /// Returns the number of bulk messages in the channel.
84    pub fn len(&self) -> usize {
85        self.receiver.len()
86    }
87
88    /// Returns the number of messages currently in the channel.
89    pub fn is_empty(&self) -> bool {
90        self.receiver.is_empty()
91    }
92}
93
/// A shareable, thread-safe, in-memory channel for responses.
///
/// Responses are routed either to a one-shot waiter registered by correlation
/// id, or — when no waiter claims them — onto the shared `sender`/`receiver`.
#[derive(Debug, Clone)]
pub struct MemoryResponseChannel {
    /// Fallback path for responses without a registered waiter.
    pub sender: Sender<CanonicalMessage>,
    /// Receiving end of the fallback path (see `wait_for_response`).
    pub receiver: Receiver<CanonicalMessage>,
    // One-shot senders keyed by correlation id; claimed via `remove_waiter`.
    waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
}
101
102impl MemoryResponseChannel {
103    pub fn new(capacity: usize) -> Self {
104        let (sender, receiver) = bounded(capacity);
105        Self {
106            sender,
107            receiver,
108            waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
109        }
110    }
111
112    pub fn close(&self) {
113        self.sender.close();
114    }
115
116    pub fn len(&self) -> usize {
117        self.receiver.len()
118    }
119
120    pub fn is_empty(&self) -> bool {
121        self.receiver.is_empty()
122    }
123
124    pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
125        self.receiver
126            .recv()
127            .await
128            .map_err(|e| anyhow!("Error receiving response: {}", e))
129    }
130
131    pub async fn register_waiter(
132        &self,
133        correlation_id: &str,
134        sender: oneshot::Sender<CanonicalMessage>,
135    ) -> anyhow::Result<()> {
136        let mut waiters = self.waiters.lock().await;
137        if waiters.contains_key(correlation_id) {
138            return Err(anyhow!(
139                "Correlation ID {} already registered",
140                correlation_id
141            ));
142        }
143        waiters.insert(correlation_id.to_string(), sender);
144        Ok(())
145    }
146
147    pub async fn remove_waiter(
148        &self,
149        correlation_id: &str,
150    ) -> Option<oneshot::Sender<CanonicalMessage>> {
151        self.waiters.lock().await.remove(correlation_id)
152    }
153}
154
155/// Gets a shared `MemoryChannel` for a given topic, creating it if it doesn't exist.
156pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
157    let mut channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
158    channels
159        .entry(config.topic.clone()) // Use the HashMap's entry API
160        .or_insert_with(|| {
161            info!(topic = %config.topic, "Creating new runtime memory channel");
162            MemoryChannel::new(config.capacity.unwrap_or(100))
163        })
164        .clone()
165}
166
167/// Gets a shared `MemoryResponseChannel` for a given topic, creating it if it doesn't exist.
168pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
169    let mut channels = RUNTIME_RESPONSE_CHANNELS.lock().unwrap();
170    channels
171        .entry(topic.to_string())
172        .or_insert_with(|| {
173            info!(topic = %topic, "Creating new runtime memory response channel");
174            MemoryResponseChannel::new(100)
175        })
176        .clone()
177}
178
179fn memory_channel_exists(topic: &str) -> bool {
180    let channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
181    channels.contains_key(topic)
182}
183
/// A sink that sends messages to an in-memory channel.
#[derive(Debug, Clone)]
pub struct MemoryPublisher {
    // Topic name; keys into the global channel / response-channel registries.
    topic: String,
    // Queue (point-to-point channel) or Log (EventStore) backend, fixed at build time.
    backend: PublisherBackend,
    // When true, `send` blocks until a correlated response arrives.
    request_reply: bool,
    // Maximum time `send` waits for a response in request/reply mode.
    request_timeout: std::time::Duration,
}
192
/// Backend selected at construction time for a `MemoryPublisher`.
#[derive(Debug, Clone)]
enum PublisherBackend {
    /// Point-to-point queue: batches go through a bounded `MemoryChannel` sender.
    Queue(Sender<Vec<CanonicalMessage>>),
    /// Log mode: messages are appended to a shared `EventStore`.
    Log(Arc<EventStore>),
}
198
199impl MemoryPublisher {
200    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
201        let channel_exists = memory_channel_exists(&config.topic);
202        let store_exists = event_store_exists(&config.topic);
203
204        let backend = if config.subscribe_mode {
205            if channel_exists {
206                return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
207            }
208            let store = get_or_create_event_store(&config.topic);
209            PublisherBackend::Log(store)
210        } else if store_exists {
211            // Adaptive behavior: If an EventStore already exists, we publish to it even if
212            // subscribe_mode wasn't explicitly set. This prevents split-brain scenarios.
213            tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
214            let store = get_or_create_event_store(&config.topic);
215            PublisherBackend::Log(store)
216        } else {
217            let channel = get_or_create_channel(config);
218            PublisherBackend::Queue(channel.sender)
219        };
220
221        Ok(Self {
222            topic: config.topic.clone(),
223            backend,
224            request_reply: config.request_reply,
225            request_timeout: std::time::Duration::from_millis(
226                config.request_timeout_ms.unwrap_or(30000),
227            ),
228        })
229    }
230
231    /// Creates a new local memory publisher.
232    ///
233    /// This method creates a new in-memory publisher with the specified topic and capacity.
234    /// The publisher will send messages to the in-memory channel for the specified topic.
235    pub fn new_local(topic: &str, capacity: usize) -> Self {
236        Self::new(&MemoryConfig {
237            topic: topic.to_string(),
238            capacity: Some(capacity),
239            ..Default::default()
240        })
241        .expect("Failed to create local memory publisher")
242    }
243
244    /// Note: This helper is primarily for tests expecting a Queue.    
245    /// If used on a broadcast publisher, it will create a separate Queue channel.
246    pub fn channel(&self) -> MemoryChannel {
247        get_or_create_channel(&MemoryConfig {
248            topic: self.topic.clone(),
249            capacity: None,
250            ..Default::default()
251        })
252    }
253}
254
#[async_trait]
impl MessagePublisher for MemoryPublisher {
    /// Sends a single message.
    ///
    /// Log backend: appends to the event store and acks. Queue backend without
    /// request/reply: delegates to `send_batch`. Queue backend with
    /// request/reply: ensures a correlation id, registers a one-shot waiter,
    /// sends, then waits (up to `request_timeout`) for the matching response.
    async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
        match &self.backend {
            PublisherBackend::Log(store) => {
                store.append(message).await;
                Ok(Sent::Ack)
            }
            PublisherBackend::Queue(sender) => {
                if self.request_reply {
                    // Reuse the message's correlation_id if present; otherwise generate one.
                    let cid = message
                        .metadata
                        .entry("correlation_id".to_string())
                        .or_insert_with(fast_uuid_v7::gen_id_string)
                        .clone();

                    let (tx, rx) = oneshot::channel();

                    // Register waiter before sending, so the response cannot
                    // arrive in a window where no waiter exists.
                    let response_channel = get_or_create_response_channel(&self.topic);
                    response_channel
                        .register_waiter(&cid, tx)
                        .await
                        .map_err(PublisherError::NonRetryable)?;

                    // Send the message
                    // We use the internal sender directly to avoid recursion or cloning issues
                    if let Err(e) = sender.send(vec![message]).await {
                        // Clean up the waiter so it doesn't leak in the registry.
                        response_channel.remove_waiter(&cid).await;
                        return Err(anyhow!("Failed to send to memory channel: {}", e).into());
                    }

                    // Wait for the response; every failure path removes the waiter.
                    let response = match tokio::time::timeout(self.request_timeout, rx).await {
                        Ok(Ok(resp)) => resp,
                        // The oneshot sender was dropped without a reply.
                        Ok(Err(e)) => {
                            response_channel.remove_waiter(&cid).await;
                            return Err(anyhow!(
                                "Failed to receive response for correlation_id {}: {}",
                                cid,
                                e
                            )
                            .into());
                        }
                        // Timeout: surfaced as Retryable so callers may try again.
                        Err(_) => {
                            response_channel.remove_waiter(&cid).await;
                            return Err(PublisherError::Retryable(anyhow!(
                                "Request timed out waiting for response for correlation_id {}",
                                cid
                            )));
                        }
                    };

                    Ok(Sent::Response(response))
                } else {
                    self.send_batch(vec![message]).await?;
                    Ok(Sent::Ack)
                }
            }
        }
    }

    /// Sends a whole batch as one unit to the active backend.
    async fn send_batch(
        &self,
        messages: Vec<CanonicalMessage>,
    ) -> Result<SentBatch, PublisherError> {
        match &self.backend {
            PublisherBackend::Log(store) => {
                trace!(
                    topic = %self.topic,
                    message_ids = ?LazyMessageIds(&messages),
                    "Appending batch to event store"
                );
                store.append_batch(messages).await;
                Ok(SentBatch::Ack)
            }
            PublisherBackend::Queue(sender) => {
                trace!(
                    topic = %self.topic,
                    message_ids = ?LazyMessageIds(&messages),
                    "Sending batch to memory channel. Current batch count: {}",
                    sender.len()
                );
                sender
                    .send(messages)
                    .await
                    .map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
                Ok(SentBatch::Ack)
            }
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
351
/// A queue-based consumer (legacy behavior).
#[derive(Debug)]
pub struct MemoryQueueConsumer {
    // Topic name; used for log context and to locate the response channel on commit.
    topic: String,
    // Receiving half of the shared per-topic MemoryChannel.
    receiver: Receiver<Vec<CanonicalMessage>>,
    // Internal buffer to hold messages from a received batch.
    // Invariant: kept in REVERSE order so messages can be taken off the end
    // cheaply via split_off (see get_buffered_msgs).
    buffer: Vec<CanonicalMessage>,
    // When true, delivered messages are cloned so a NACK can re-queue them.
    enable_nack: bool,
}
361
/// A source that reads messages from an in-memory channel or event store.
#[derive(Debug)]
pub enum MemoryConsumer {
    /// Competing-consumer queue semantics over a `MemoryChannel`.
    Queue(MemoryQueueConsumer),
    /// Broadcast subscription over a shared `EventStore`.
    Log {
        consumer: EventStoreConsumer,
        topic: String,
    },
}
371
372impl MemoryConsumer {
373    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
374        let channel_exists = memory_channel_exists(&config.topic);
375        let store_exists = event_store_exists(&config.topic);
376
377        if config.subscribe_mode {
378            if channel_exists {
379                return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
380            }
381            let store = get_or_create_event_store(&config.topic);
382            // For subscriber mode, we generate a unique ID if one isn't implicit in the usage.
383            // However, MemorySubscriber struct usually handles the ID.
384            // If MemoryConsumer is used directly with subscribe_mode=true, we assume a default ID or ephemeral.
385            let subscriber_id = format!("{}-consumer", config.topic);
386            info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
387            let consumer = store.consumer(subscriber_id);
388            Ok(Self::Log {
389                consumer,
390                topic: config.topic.clone(),
391            })
392        } else {
393            if store_exists {
394                // Unlike the Publisher, we cannot silently adapt to Log mode here.
395                // The EventStore implementation currently supports Pub/Sub (broadcast) only.
396                // Adapting would result in this consumer receiving all messages, violating
397                // the expected Queue (competing consumer) semantics requested by `subscribe_mode: false`.
398                return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
399            }
400            let queue = MemoryQueueConsumer::new(config)?;
401            Ok(Self::Queue(queue))
402        }
403    }
404}
405
406impl MemoryQueueConsumer {
407    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
408        let channel = get_or_create_channel(config);
409        let buffer = if let Some(capacity) = config.capacity {
410            Vec::with_capacity(capacity)
411        } else {
412            Vec::new()
413        };
414        Ok(Self {
415            topic: config.topic.clone(),
416            receiver: channel.receiver.clone(),
417            buffer,
418            enable_nack: config.enable_nack,
419        })
420    }
421
422    async fn get_buffered_msgs(
423        &mut self,
424        max_messages: usize,
425    ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
426        // If the internal buffer has messages, return them first.
427        if self.buffer.is_empty() {
428            // Buffer is empty. Wait for a new batch from the channel.
429            self.buffer = match self.receiver.recv().await {
430                Ok(batch) => batch,
431                Err(_) => return Err(ConsumerError::EndOfStream),
432            };
433            // Reverse the buffer so we can efficiently pop from the end.
434            self.buffer.reverse();
435        }
436
437        // Determine the number of messages to take from the buffer.
438        let num_to_take = self.buffer.len().min(max_messages);
439        let split_at = self.buffer.len() - num_to_take;
440
441        // `split_off` is highly efficient. It splits the Vec in two at the given
442        // index and returns the part after the index, leaving the first part.
443        let mut messages = self.buffer.split_off(split_at);
444        messages.reverse(); // Reverse back to original order.
445        Ok(messages)
446    }
447}
448
#[async_trait]
impl MessageConsumer for MemoryQueueConsumer {
    /// Pulls up to `max_messages` from the in-memory queue.
    ///
    /// Blocks until at least one batch arrives (channel closure surfaces as
    /// `ConsumerError::EndOfStream` via `get_buffered_msgs`). The returned
    /// `commit` closure processes one `MessageDisposition` per delivered
    /// message: Ack is a no-op, Reply is routed to a correlation waiter or the
    /// fallback response channel, Nack re-queues (only when `enable_nack`).
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        // If the internal buffer has messages, return them first.

        let mut messages = self.get_buffered_msgs(max_messages).await?;
        // Opportunistic top-up: if we got less than half of what was asked for,
        // drain further batches without blocking. Whenever this loop body runs,
        // `self.buffer` is empty (get_buffered_msgs either drained it entirely
        // or already returned >= max_messages), so the append + reverse below
        // leaves the buffer in its reversed, pop-from-the-end order.
        while messages.len() < max_messages / 2 {
            if let Ok(mut next_batch) = self.receiver.try_recv() {
                if next_batch.len() + messages.len() > max_messages {
                    // Take only what fits; stash the overflow in the buffer.
                    let needed = max_messages - messages.len();
                    let mut to_buffer = next_batch.split_off(needed);
                    messages.append(&mut next_batch);
                    self.buffer.append(&mut to_buffer);
                    self.buffer.reverse();
                    break;
                } else {
                    messages.append(&mut next_batch);
                }
            } else {
                break;
            }
        }
        trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
        if messages.is_empty() {
            // Nothing to deliver: empty batch with a no-op commit.
            return Ok(ReceivedBatch {
                messages: Vec::new(),
                commit: Box::new(|_| {
                    Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
                }),
            });
        }

        let topic = self.topic.clone();
        let expected_count = messages.len();
        // Capture each request's correlation id so replies can be routed even
        // if the handler's response did not copy the id over.
        let correlation_ids: Vec<Option<String>> = messages
            .iter()
            .map(|m| m.metadata.get("correlation_id").cloned())
            .collect();

        // This clone is necessary to support NACKs. The commit function needs access
        // to the messages to re-queue them, but the `ReceivedBatch` must also return
        // ownership of the messages to the caller. Without changing the core traits,
        // cloning is the only way to satisfy both owners.
        let messages_for_retry = if self.enable_nack {
            Some(messages.clone())
        } else {
            None
        };
        let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
            Box::pin(async move {
                // One disposition per delivered message, in delivery order.
                if dispositions.len() != expected_count {
                    return Err(anyhow::anyhow!(
                        "Memory batch commit received mismatched disposition count: expected {}, got {}",
                        expected_count,
                        dispositions.len()
                    ));
                }
                let response_channel = get_or_create_response_channel(&topic);
                let mut to_requeue = Vec::new();

                for (i, disposition) in dispositions.into_iter().enumerate() {
                    match disposition {
                        MessageDisposition::Reply(mut resp) => {
                            // Back-fill the correlation id from the original
                            // request when the reply doesn't carry one.
                            if !resp.metadata.contains_key("correlation_id") {
                                if let Some(Some(cid)) = correlation_ids.get(i) {
                                    resp.metadata
                                        .insert("correlation_id".to_string(), cid.clone());
                                }
                            }

                            // Prefer a registered one-shot waiter (request/reply path);
                            // otherwise fall back to the shared response channel.
                            // If the receiver is dropped, sending will fail. We can ignore it.
                            let mut handled = false;
                            if let Some(cid) = resp.metadata.get("correlation_id") {
                                if let Some(tx) = response_channel.remove_waiter(cid).await {
                                    let _ = tx.send(resp.clone());
                                    handled = true;
                                }
                            }
                            if !handled {
                                let _ = response_channel.sender.send(resp).await;
                            }
                        }
                        MessageDisposition::Nack => {
                            // Re-queue the message if Nacked
                            if let Some(msgs) = &messages_for_retry {
                                if let Some(msg) = msgs.get(i) {
                                    to_requeue.push(msg.clone());
                                }
                            }
                        }
                        MessageDisposition::Ack => {}
                    }
                }

                if !to_requeue.is_empty() {
                    // Push all NACKed messages back onto the main topic channel
                    // as a single batch.
                    let main_channel = get_or_create_channel(&MemoryConfig {
                        topic: topic.to_string(),
                        capacity: None,
                        ..Default::default()
                    });
                    if main_channel.sender.send(to_requeue).await.is_err() {
                        tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
                    }
                }
                Ok(())
            }) as BoxFuture<'static, anyhow::Result<()>>
        }) as BatchCommitFunc;
        Ok(ReceivedBatch { messages, commit })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
563
564#[async_trait]
565impl MessageConsumer for MemoryConsumer {
566    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
567        match self {
568            Self::Queue(q) => q.receive_batch(max_messages).await,
569            Self::Log { consumer, .. } => consumer.receive_batch(max_messages).await,
570        }
571    }
572
573    fn as_any(&self) -> &dyn Any {
574        self
575    }
576}
577
578impl MemoryConsumer {
579    pub fn new_local(topic: &str, capacity: usize) -> Self {
580        Self::new(&MemoryConfig {
581            topic: topic.to_string(),
582            capacity: Some(capacity),
583            ..Default::default()
584        })
585        .expect("Failed to create local memory consumer")
586    }
587    pub fn channel(&self) -> MemoryChannel {
588        let topic = match self {
589            Self::Queue(q) => &q.topic,
590            Self::Log { topic, .. } => topic,
591        };
592        get_or_create_channel(&MemoryConfig {
593            topic: topic.clone(),
594            ..Default::default()
595        })
596    }
597}
598
/// A per-subscriber view of a topic: wraps an `EventStore` subscription
/// (subscribe mode) or a dedicated suffixed queue topic (legacy mode).
pub struct MemorySubscriber {
    // Backing consumer chosen by `MemorySubscriber::new`.
    consumer: MemoryConsumer,
}
602
603impl MemorySubscriber {
604    pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
605        let mut sub_config = config.clone();
606        // If subscribe_mode is true, we use EventStore with the original topic but unique subscriber ID.
607        // If false (legacy), we use the suffixed topic queue.
608        let consumer = if config.subscribe_mode {
609            let store = get_or_create_event_store(&config.topic);
610            MemoryConsumer::Log {
611                consumer: store.consumer(id.to_string()),
612                topic: config.topic.clone(),
613            }
614        } else {
615            sub_config.topic = format!("{}-{}", config.topic, id);
616            MemoryConsumer::new(&sub_config)?
617        };
618        Ok(Self { consumer })
619    }
620}
621
#[async_trait]
impl MessageConsumer for MemorySubscriber {
    /// Forwards batch reception to the wrapped consumer.
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        self.consumer.receive_batch(max_messages).await
    }

    /// Forwards single-message reception to the wrapped consumer.
    async fn receive(&mut self) -> Result<Received, ConsumerError> {
        self.consumer.receive().await
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use crate::models::{Endpoint, Route};
641    use crate::traits::Handled;
642    use crate::{msg, CanonicalMessage};
643    use serde_json::json;
644    use tokio::time::sleep;
645
    #[tokio::test]
    async fn test_memory_channel_integration() {
        // Round-trip a single message through the shared "test-mem1" topic.
        let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
        let publisher = MemoryPublisher::new_local("test-mem1", 10);

        let msg = msg!(json!({"hello": "memory"}));

        // Send a message via the publisher
        publisher.send(msg.clone()).await.unwrap();

        // Short pause to let the send settle before receiving.
        sleep(std::time::Duration::from_millis(10)).await;
        // Receive it with the consumer
        let received = consumer.receive().await.unwrap();
        let _ = (received.commit)(MessageDisposition::Ack).await;
        assert_eq!(received.message.payload, msg.payload);
        // The channel should be fully drained after the single receive.
        assert_eq!(consumer.channel().len(), 0);
    }
663
    #[tokio::test]
    async fn test_memory_publisher_and_consumer_integration() {
        // Mixed batch/single sends must be received in order across
        // receive() and receive_batch() calls.
        let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
        let publisher = MemoryPublisher::new_local("test-mem2", 10);

        let msg1 = msg!(json!({"message": "one"}));
        let msg2 = msg!(json!({"message": "two"}));
        let msg3 = msg!(json!({"message": "three"}));

        // 3. Send messages via the publisher
        publisher
            .send_batch(vec![msg1.clone(), msg2.clone()])
            .await
            .unwrap();
        publisher.send(msg3.clone()).await.unwrap();

        // 4. Verify the channel has the messages (2 batches: [msg1,msg2] and [msg3])
        assert_eq!(publisher.channel().len(), 2);

        // 5. Receive the messages and verify them
        let received1 = consumer.receive().await.unwrap();
        let _ = (received1.commit)(MessageDisposition::Ack).await;
        assert_eq!(received1.message.payload, msg1.payload);

        let batch2 = consumer.receive_batch(1).await.unwrap();
        let (received_msg2, commit2) = (batch2.messages, batch2.commit);
        let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
        assert_eq!(received_msg2.len(), 1);
        assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
        let batch3 = consumer.receive_batch(2).await.unwrap();
        let (received_msg3, commit3) = (batch3.messages, batch3.commit);
        let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
        assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);

        // 6. Verify that the channel is now empty
        assert_eq!(publisher.channel().len(), 0);

        // 7. Verify that reading again results in an error because the channel is empty and we are not closing it
        // In a real scenario with a closed channel, this would error out. Here we can just check it's empty.
        // A `receive` call would just hang, waiting for a message.
    }
705
706    #[tokio::test]
707    async fn test_memory_subscriber_structure() {
708        let cfg = MemoryConfig {
709            topic: "base_topic".to_string(),
710            capacity: Some(10),
711            ..Default::default()
712        };
713        let subscriber_id = "sub1";
714        let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();
715
716        // The subscriber should be listening on "base_topic-sub1"
717        // We can verify this by creating a publisher for that specific topic.
718        let pub_cfg = MemoryConfig {
719            topic: format!("base_topic-{}", subscriber_id),
720            capacity: Some(10),
721            ..Default::default()
722        };
723        let publisher = MemoryPublisher::new(&pub_cfg).unwrap();
724
725        publisher.send("hello subscriber".into()).await.unwrap();
726
727        let received = subscriber.receive().await.unwrap();
728        assert_eq!(received.message.get_payload_str(), "hello subscriber");
729    }
730
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_memory_request_reply_mode() {
        // A request/reply publisher must block until the route's handler reply
        // is delivered back via the correlation-id waiter mechanism.
        let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
        let input_endpoint = Endpoint::new_memory(&topic, 10);
        let output_endpoint = Endpoint::new_response();
        // Echo-style handler: replies with "reply to <request payload>".
        let handler = |mut msg: CanonicalMessage| async move {
            let request_payload = msg.get_payload_str();
            let response_payload = format!("reply to {}", request_payload);
            msg.set_payload_str(response_payload);
            Ok(Handled::Publish(msg))
        };

        let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
        route.deploy("mem_rr_test").await.unwrap();

        // Create a publisher with request_reply = true
        let publisher = MemoryPublisher::new(&MemoryConfig {
            topic: topic.clone(),
            capacity: Some(10),
            request_reply: true,
            request_timeout_ms: Some(2000),
            ..Default::default()
        })
        .unwrap();

        let result = publisher.send("direct request".into()).await.unwrap();

        if let Sent::Response(response_msg) = result {
            assert_eq!(response_msg.get_payload_str(), "reply to direct request");
        } else {
            panic!("Expected Sent::Response, got {:?}", result);
        }

        // Clean up
        Route::stop("mem_rr_test").await;
    }
767
    #[tokio::test]
    async fn test_memory_nack_requeue() {
        // With enable_nack set, a NACKed message must reappear on the queue
        // exactly once and then disappear after a subsequent ACK.
        let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
        let config = MemoryConfig {
            topic: topic.clone(),
            capacity: Some(10),
            enable_nack: true,
            ..Default::default()
        };
        let mut consumer = MemoryConsumer::new(&config).unwrap();
        let publisher = MemoryPublisher::new_local(&topic, 10);

        publisher.send("to_be_nacked".into()).await.unwrap();

        // 1. Receive and Nack
        let received1 = consumer.receive().await.unwrap();
        assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
        (received1.commit)(crate::traits::MessageDisposition::Nack)
            .await
            .unwrap();

        // 2. Receive again (should be re-queued)
        let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
            .await
            .expect("Timed out waiting for re-queued message")
            .unwrap();
        assert_eq!(received2.message.get_payload_str(), "to_be_nacked");

        // 3. Ack
        (received2.commit)(crate::traits::MessageDisposition::Ack)
            .await
            .unwrap();

        // 4. Verify empty: a further receive should time out.
        let result =
            tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
        assert!(result.is_err(), "Channel should be empty");
    }
806
807    #[tokio::test]
808    async fn test_memory_event_store_integration() {
809        let topic = "event_store_test";
810        // Publisher with subscribe_mode=true enables EventStore writing
811        let pub_config = MemoryConfig {
812            topic: topic.to_string(),
813            subscribe_mode: true,
814            ..Default::default()
815        };
816        let publisher = MemoryPublisher::new(&pub_config).unwrap();
817
818        // Subscriber 1
819        let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
820        // Subscriber 2
821        let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();
822
823        publisher.send("event1".into()).await.unwrap();
824
825        let msg1 = sub1.receive().await.unwrap();
826        assert_eq!(msg1.message.get_payload_str(), "event1");
827        (msg1.commit)(MessageDisposition::Ack).await.unwrap();
828
829        let msg2 = sub2.receive().await.unwrap();
830        assert_eq!(msg2.message.get_payload_str(), "event1");
831    }
832
833    #[tokio::test]
834    async fn test_memory_no_subscribers_persistence() {
835        let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
836        let pub_config = MemoryConfig {
837            topic: topic.clone(),
838            subscribe_mode: true,
839            ..Default::default()
840        };
841
842        // 1. Create Publisher (Log mode)
843        let publisher = MemoryPublisher::new(&pub_config).unwrap();
844
845        // 2. Publish messages with no subscribers
846        publisher.send("msg1".into()).await.unwrap();
847        publisher.send("msg2".into()).await.unwrap();
848
849        // 3. Create Subscriber (Late joiner)
850        let sub_config = MemoryConfig {
851            topic: topic.clone(),
852            subscribe_mode: true,
853            ..Default::default()
854        };
855        let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();
856
857        // 4. Verify messages are received
858        let received1 = subscriber.receive().await.unwrap();
859        assert_eq!(received1.message.get_payload_str(), "msg1");
860        (received1.commit)(MessageDisposition::Ack).await.unwrap();
861
862        let received2 = subscriber.receive().await.unwrap();
863        assert_eq!(received2.message.get_payload_str(), "msg2");
864        (received2.commit)(MessageDisposition::Ack).await.unwrap();
865    }
866
867    #[tokio::test]
868    async fn test_memory_mixed_mode_error() {
869        let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
870        let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());
871
872        // Case 1: Active Queue, try to create Log Consumer
873        let _pub_q = MemoryPublisher::new_local(&topic_q, 10); // Creates Queue backend
874
875        let log_conf = MemoryConfig {
876            topic: topic_q.clone(),
877            subscribe_mode: true,
878            ..Default::default()
879        };
880        let err = MemoryConsumer::new(&log_conf);
881        assert!(err.is_err());
882        assert!(err
883            .unwrap_err()
884            .to_string()
885            .contains("already active as a Queue"));
886
887        // Case 2: Active Log, try to create Queue Consumer
888        let log_pub_conf = MemoryConfig {
889            topic: topic_l.clone(),
890            subscribe_mode: true,
891            ..Default::default()
892        };
893        let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap(); // Creates Log backend
894
895        let queue_conf = MemoryConfig {
896            topic: topic_l.clone(),
897            subscribe_mode: false,
898            ..Default::default()
899        };
900        let err = MemoryConsumer::new(&queue_conf);
901        assert!(err.is_err());
902        assert!(err
903            .unwrap_err()
904            .to_string()
905            .contains("already active as a Subscriber Log"));
906    }
907
908    #[tokio::test]
909    async fn test_memory_publisher_mixed_mode_error() {
910        let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());
911
912        // 1. Create a Queue Consumer to establish the channel
913        let _cons_q = MemoryConsumer::new_local(&topic_q, 10);
914
915        // 2. Try to create a Log Publisher on the same topic
916        let log_conf = MemoryConfig {
917            topic: topic_q.clone(),
918            subscribe_mode: true,
919            ..Default::default()
920        };
921        let err = MemoryPublisher::new(&log_conf);
922        assert!(err.is_err());
923        assert!(err
924            .unwrap_err()
925            .to_string()
926            .contains("already active as a Queue"));
927    }
928
929    #[tokio::test]
930    async fn test_memory_publisher_adaptive_behavior() {
931        let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());
932
933        // 1. Create a Log Consumer (Subscriber) to establish the EventStore
934        let sub_config = MemoryConfig {
935            topic: topic.clone(),
936            subscribe_mode: true,
937            ..Default::default()
938        };
939        let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();
940
941        // 2. Create a Publisher WITHOUT subscribe_mode explicitly set
942        let pub_config = MemoryConfig {
943            topic: topic.clone(),
944            subscribe_mode: false, // Default is false
945            ..Default::default()
946        };
947        // This should succeed and adapt to Log mode because the store exists
948        let publisher = MemoryPublisher::new(&pub_config).unwrap();
949
950        // 3. Verify it publishes to the store (subscriber receives it)
951        publisher.send("adaptive_msg".into()).await.unwrap();
952
953        let received = subscriber.receive().await.unwrap();
954        assert_eq!(received.message.get_payload_str(), "adaptive_msg");
955    }
956}