Skip to main content

mq_bridge/endpoints/
memory.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5use crate::canonical_message::tracing_support::LazyMessageIds;
6use crate::event_store::{
7    event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
8};
9use crate::models::MemoryConfig;
10use crate::traits::{
11    BatchCommitFunc, BoxFuture, ConsumerError, MessageConsumer, MessageDisposition,
12    MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
13};
14use crate::CanonicalMessage;
15use anyhow::anyhow;
16use async_channel::{bounded, Receiver, Sender};
17use async_trait::async_trait;
18use once_cell::sync::Lazy;
19use std::any::Any;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::oneshot;
23use tracing::{info, trace};
24
25/// A map to hold memory channels for the duration of the bridge setup.
26/// This allows a consumer and publisher in different routes to connect to the same in-memory topic.
27static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
28    Lazy::new(|| Mutex::new(HashMap::new()));
29
30/// A map to hold memory response channels.
31static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
32    Lazy::new(|| Mutex::new(HashMap::new()));
33
34/// A shareable, thread-safe, in-memory channel for testing.
35///
36/// This struct holds the sender and receiver for an in-memory queue.
37/// It can be cloned and shared between your test code and the bridge's endpoints. It transports batches of messages.
38#[derive(Debug, Clone)]
39pub struct MemoryChannel {
40    pub sender: Sender<Vec<CanonicalMessage>>,
41    pub receiver: Receiver<Vec<CanonicalMessage>>,
42}
43
44impl MemoryChannel {
45    /// Creates a new batch channel with a specified capacity.
46    pub fn new(capacity: usize) -> Self {
47        let (sender, receiver) = bounded(capacity);
48        Self { sender, receiver }
49    }
50
51    /// Helper function for tests to easily send a message to the channel.
52    pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
53        self.sender.send(vec![message]).await?;
54        tracing::debug!("Message sent to memory {} channel", self.sender.len());
55        Ok(())
56    }
57
58    /// Helper function for tests to easily fill in messages.
59    pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
60        // Send the entire vector as a single batch.
61        self.sender
62            .send(messages)
63            .await
64            .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
65        Ok(())
66    }
67
68    /// Closes the sender part of the channel.
69    pub fn close(&self) {
70        self.sender.close();
71    }
72
73    /// Helper function for tests to drain all messages from the channel.
74    pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
75        let mut messages = Vec::new();
76        // Drain all batches from the channel and flatten them into a single Vec.
77        while let Ok(batch) = self.receiver.try_recv() {
78            messages.extend(batch);
79        }
80        messages
81    }
82
83    /// Returns the number of bulk messages in the channel.
84    pub fn len(&self) -> usize {
85        self.receiver.len()
86    }
87
88    /// Returns the number of messages currently in the channel.
89    pub fn is_empty(&self) -> bool {
90        self.receiver.is_empty()
91    }
92}
93
94/// A shareable, thread-safe, in-memory channel for responses.
95#[derive(Debug, Clone)]
96pub struct MemoryResponseChannel {
97    pub sender: Sender<CanonicalMessage>,
98    pub receiver: Receiver<CanonicalMessage>,
99    waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
100}
101
102impl MemoryResponseChannel {
103    pub fn new(capacity: usize) -> Self {
104        let (sender, receiver) = bounded(capacity);
105        Self {
106            sender,
107            receiver,
108            waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
109        }
110    }
111
112    pub fn close(&self) {
113        self.sender.close();
114    }
115
116    pub fn len(&self) -> usize {
117        self.receiver.len()
118    }
119
120    pub fn is_empty(&self) -> bool {
121        self.receiver.is_empty()
122    }
123
124    pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
125        self.receiver
126            .recv()
127            .await
128            .map_err(|e| anyhow!("Error receiving response: {}", e))
129    }
130
131    pub async fn register_waiter(
132        &self,
133        correlation_id: &str,
134        sender: oneshot::Sender<CanonicalMessage>,
135    ) -> anyhow::Result<()> {
136        let mut waiters = self.waiters.lock().await;
137        if waiters.contains_key(correlation_id) {
138            return Err(anyhow!(
139                "Correlation ID {} already registered",
140                correlation_id
141            ));
142        }
143        waiters.insert(correlation_id.to_string(), sender);
144        Ok(())
145    }
146
147    pub async fn remove_waiter(
148        &self,
149        correlation_id: &str,
150    ) -> Option<oneshot::Sender<CanonicalMessage>> {
151        self.waiters.lock().await.remove(correlation_id)
152    }
153}
154
155/// Gets a shared `MemoryChannel` for a given topic, creating it if it doesn't exist.
156pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
157    let mut channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
158    channels
159        .entry(config.topic.clone()) // Use the HashMap's entry API
160        .or_insert_with(|| {
161            info!(topic = %config.topic, "Creating new runtime memory channel");
162            MemoryChannel::new(config.capacity.unwrap_or(100))
163        })
164        .clone()
165}
166
167/// Gets a shared `MemoryResponseChannel` for a given topic, creating it if it doesn't exist.
168pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
169    let mut channels = RUNTIME_RESPONSE_CHANNELS.lock().unwrap();
170    channels
171        .entry(topic.to_string())
172        .or_insert_with(|| {
173            info!(topic = %topic, "Creating new runtime memory response channel");
174            MemoryResponseChannel::new(100)
175        })
176        .clone()
177}
178
179fn memory_channel_exists(topic: &str) -> bool {
180    let channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
181    channels.contains_key(topic)
182}
183
184/// A sink that sends messages to an in-memory channel.
185#[derive(Debug, Clone)]
186pub struct MemoryPublisher {
187    topic: String,
188    backend: PublisherBackend,
189    request_reply: bool,
190    request_timeout: std::time::Duration,
191}
192
193#[derive(Debug, Clone)]
194enum PublisherBackend {
195    Queue(Sender<Vec<CanonicalMessage>>),
196    Log(Arc<EventStore>),
197}
198
199impl MemoryPublisher {
200    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
201        let channel_exists = memory_channel_exists(&config.topic);
202        let store_exists = event_store_exists(&config.topic);
203
204        let backend = if config.subscribe_mode {
205            if channel_exists {
206                return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
207            }
208            let store = get_or_create_event_store(&config.topic);
209            PublisherBackend::Log(store)
210        } else if store_exists {
211            // Adaptive behavior: If an EventStore already exists, we publish to it even if
212            // subscribe_mode wasn't explicitly set. This prevents split-brain scenarios.
213            tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
214            let store = get_or_create_event_store(&config.topic);
215            PublisherBackend::Log(store)
216        } else {
217            let channel = get_or_create_channel(config);
218            PublisherBackend::Queue(channel.sender)
219        };
220
221        Ok(Self {
222            topic: config.topic.clone(),
223            backend,
224            request_reply: config.request_reply,
225            request_timeout: std::time::Duration::from_millis(
226                config.request_timeout_ms.unwrap_or(30000),
227            ),
228        })
229    }
230
231    /// Creates a new local memory publisher.
232    ///
233    /// This method creates a new in-memory publisher with the specified topic and capacity.
234    /// The publisher will send messages to the in-memory channel for the specified topic.
235    pub fn new_local(topic: &str, capacity: usize) -> Self {
236        Self::new(&MemoryConfig {
237            topic: topic.to_string(),
238            capacity: Some(capacity),
239            ..Default::default()
240        })
241        .expect("Failed to create local memory publisher")
242    }
243
244    /// Note: This helper is primarily for tests expecting a Queue.    
245    /// If used on a broadcast publisher, it will create a separate Queue channel.
246    pub fn channel(&self) -> MemoryChannel {
247        get_or_create_channel(&MemoryConfig {
248            topic: self.topic.clone(),
249            capacity: None,
250            ..Default::default()
251        })
252    }
253}
254
255#[async_trait]
256impl MessagePublisher for MemoryPublisher {
257    async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
258        match &self.backend {
259            PublisherBackend::Log(store) => {
260                store.append(message).await;
261                Ok(Sent::Ack)
262            }
263            PublisherBackend::Queue(sender) => {
264                if self.request_reply {
265                    let cid = message
266                        .metadata
267                        .entry("correlation_id".to_string())
268                        .or_insert_with(fast_uuid_v7::gen_id_string)
269                        .clone();
270
271                    let (tx, rx) = oneshot::channel();
272
273                    // Register waiter before sending
274                    let response_channel = get_or_create_response_channel(&self.topic);
275                    response_channel
276                        .register_waiter(&cid, tx)
277                        .await
278                        .map_err(PublisherError::NonRetryable)?;
279
280                    // Send the message
281                    // We use the internal sender directly to avoid recursion or cloning issues
282                    if let Err(e) = sender.send(vec![message]).await {
283                        response_channel.remove_waiter(&cid).await;
284                        return Err(anyhow!("Failed to send to memory channel: {}", e).into());
285                    }
286
287                    // Wait for the response
288                    let response = match tokio::time::timeout(self.request_timeout, rx).await {
289                        Ok(Ok(resp)) => resp,
290                        Ok(Err(e)) => {
291                            response_channel.remove_waiter(&cid).await;
292                            return Err(anyhow!(
293                                "Failed to receive response for correlation_id {}: {}",
294                                cid,
295                                e
296                            )
297                            .into());
298                        }
299                        Err(_) => {
300                            response_channel.remove_waiter(&cid).await;
301                            return Err(PublisherError::Retryable(anyhow!(
302                                "Request timed out waiting for response for correlation_id {}",
303                                cid
304                            )));
305                        }
306                    };
307
308                    Ok(Sent::Response(response))
309                } else {
310                    self.send_batch(vec![message]).await?;
311                    Ok(Sent::Ack)
312                }
313            }
314        }
315    }
316
317    async fn send_batch(
318        &self,
319        messages: Vec<CanonicalMessage>,
320    ) -> Result<SentBatch, PublisherError> {
321        match &self.backend {
322            PublisherBackend::Log(store) => {
323                trace!(
324                    topic = %self.topic,
325                    message_ids = ?LazyMessageIds(&messages),
326                    "Appending batch to event store"
327                );
328                store.append_batch(messages).await;
329                Ok(SentBatch::Ack)
330            }
331            PublisherBackend::Queue(sender) => {
332                trace!(
333                    topic = %self.topic,
334                    message_ids = ?LazyMessageIds(&messages),
335                    "Sending batch to memory channel. Current batch count: {}",
336                    sender.len()
337                );
338                sender
339                    .send(messages)
340                    .await
341                    .map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
342                Ok(SentBatch::Ack)
343            }
344        }
345    }
346
347    fn as_any(&self) -> &dyn Any {
348        self
349    }
350}
351
352/// A queue-based consumer (legacy behavior).
353#[derive(Debug)]
354pub struct MemoryQueueConsumer {
355    topic: String,
356    receiver: Receiver<Vec<CanonicalMessage>>,
357    // Internal buffer to hold messages from a received batch.
358    buffer: Vec<CanonicalMessage>,
359    enable_nack: bool,
360}
361
362/// A source that reads messages from an in-memory channel or event store.
363#[derive(Debug)]
364pub enum MemoryConsumer {
365    Queue(MemoryQueueConsumer),
366    Log {
367        consumer: EventStoreConsumer,
368        topic: String,
369    },
370}
371
372impl MemoryConsumer {
373    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
374        let channel_exists = memory_channel_exists(&config.topic);
375        let store_exists = event_store_exists(&config.topic);
376
377        if config.subscribe_mode {
378            if channel_exists {
379                return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
380            }
381            let store = get_or_create_event_store(&config.topic);
382            // For subscriber mode, we generate a unique ID if one isn't implicit in the usage.
383            // However, MemorySubscriber struct usually handles the ID.
384            // If MemoryConsumer is used directly with subscribe_mode=true, we assume a default ID or ephemeral.
385            let subscriber_id = format!("{}-consumer", config.topic);
386            info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
387            let consumer = store.consumer(subscriber_id);
388            Ok(Self::Log {
389                consumer,
390                topic: config.topic.clone(),
391            })
392        } else {
393            if store_exists {
394                // Unlike the Publisher, we cannot silently adapt to Log mode here.
395                // The EventStore implementation currently supports Pub/Sub (broadcast) only.
396                // Adapting would result in this consumer receiving all messages, violating
397                // the expected Queue (competing consumer) semantics requested by `subscribe_mode: false`.
398                return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
399            }
400            let queue = MemoryQueueConsumer::new(config)?;
401            Ok(Self::Queue(queue))
402        }
403    }
404}
405
406impl MemoryQueueConsumer {
407    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
408        let channel = get_or_create_channel(config);
409        Ok(Self {
410            topic: config.topic.clone(),
411            receiver: channel.receiver.clone(),
412            buffer: Vec::new(),
413            enable_nack: config.enable_nack,
414        })
415    }
416
417    async fn get_buffered_msgs(
418        &mut self,
419        max_messages: usize,
420    ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
421        // If the internal buffer has messages, return them first.
422        if self.buffer.is_empty() {
423            // Buffer is empty. Wait for a new batch from the channel.
424            self.buffer = match self.receiver.recv().await {
425                Ok(batch) => batch,
426                Err(_) => return Err(ConsumerError::EndOfStream),
427            };
428            // Reverse the buffer so we can efficiently pop from the end.
429            self.buffer.reverse();
430        }
431
432        // Determine the number of messages to take from the buffer.
433        let num_to_take = self.buffer.len().min(max_messages);
434        let split_at = self.buffer.len() - num_to_take;
435
436        // `split_off` is highly efficient. It splits the Vec in two at the given
437        // index and returns the part after the index, leaving the first part.
438        let mut messages = self.buffer.split_off(split_at);
439        messages.reverse(); // Reverse back to original order.
440        Ok(messages)
441    }
442}
443
444#[async_trait]
445impl MessageConsumer for MemoryQueueConsumer {
446    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
447        // If the internal buffer has messages, return them first.
448
449        let mut messages = self.get_buffered_msgs(max_messages).await?;
450        while messages.len() < max_messages / 2 {
451            if let Ok(mut next_batch) = self.receiver.try_recv() {
452                if next_batch.len() + messages.len() > max_messages {
453                    let needed = max_messages - messages.len();
454                    let mut to_buffer = next_batch.split_off(needed);
455                    messages.append(&mut next_batch);
456                    self.buffer.append(&mut to_buffer);
457                    self.buffer.reverse();
458                    break;
459                } else {
460                    messages.append(&mut next_batch);
461                }
462            } else {
463                break;
464            }
465        }
466        trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
467        if messages.is_empty() {
468            return Ok(ReceivedBatch {
469                messages: Vec::new(),
470                commit: Box::new(|_| {
471                    Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
472                }),
473            });
474        }
475
476        let topic = self.topic.clone();
477        let expected_count = messages.len();
478        let correlation_ids: Vec<Option<String>> = messages
479            .iter()
480            .map(|m| m.metadata.get("correlation_id").cloned())
481            .collect();
482
483        // This clone is necessary to support NACKs. The commit function needs access
484        // to the messages to re-queue them, but the `ReceivedBatch` must also return
485        // ownership of the messages to the caller. Without changing the core traits,
486        // cloning is the only way to satisfy both owners.
487        let messages_for_retry = if self.enable_nack {
488            Some(messages.clone())
489        } else {
490            None
491        };
492        let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
493            Box::pin(async move {
494                if dispositions.len() != expected_count {
495                    return Err(anyhow::anyhow!(
496                        "Memory batch commit received mismatched disposition count: expected {}, got {}",
497                        expected_count,
498                        dispositions.len()
499                    ));
500                }
501                let response_channel = get_or_create_response_channel(&topic);
502                let mut to_requeue = Vec::new();
503
504                for (i, disposition) in dispositions.into_iter().enumerate() {
505                    match disposition {
506                        MessageDisposition::Reply(mut resp) => {
507                            if !resp.metadata.contains_key("correlation_id") {
508                                if let Some(Some(cid)) = correlation_ids.get(i) {
509                                    resp.metadata
510                                        .insert("correlation_id".to_string(), cid.clone());
511                                }
512                            }
513
514                            // If the receiver is dropped, sending will fail. We can ignore it.
515                            let mut handled = false;
516                            if let Some(cid) = resp.metadata.get("correlation_id") {
517                                if let Some(tx) = response_channel.remove_waiter(cid).await {
518                                    let _ = tx.send(resp.clone());
519                                    handled = true;
520                                }
521                            }
522                            if !handled {
523                                let _ = response_channel.sender.send(resp).await;
524                            }
525                        }
526                        MessageDisposition::Nack => {
527                            // Re-queue the message if Nacked
528                            if let Some(msgs) = &messages_for_retry {
529                                if let Some(msg) = msgs.get(i) {
530                                    to_requeue.push(msg.clone());
531                                }
532                            }
533                        }
534                        MessageDisposition::Ack => {}
535                    }
536                }
537
538                if !to_requeue.is_empty() {
539                    let main_channel = get_or_create_channel(&MemoryConfig {
540                        topic: topic.to_string(),
541                        capacity: None,
542                        ..Default::default()
543                    });
544                    if main_channel.sender.send(to_requeue).await.is_err() {
545                        tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
546                    }
547                }
548                Ok(())
549            }) as BoxFuture<'static, anyhow::Result<()>>
550        }) as BatchCommitFunc;
551        Ok(ReceivedBatch { messages, commit })
552    }
553
554    fn as_any(&self) -> &dyn Any {
555        self
556    }
557}
558
559#[async_trait]
560impl MessageConsumer for MemoryConsumer {
561    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
562        match self {
563            Self::Queue(q) => q.receive_batch(max_messages).await,
564            Self::Log { consumer, .. } => consumer.receive_batch(max_messages).await,
565        }
566    }
567
568    fn as_any(&self) -> &dyn Any {
569        self
570    }
571}
572
573impl MemoryConsumer {
574    pub fn new_local(topic: &str, capacity: usize) -> Self {
575        Self::new(&MemoryConfig {
576            topic: topic.to_string(),
577            capacity: Some(capacity),
578            ..Default::default()
579        })
580        .expect("Failed to create local memory consumer")
581    }
582    pub fn channel(&self) -> MemoryChannel {
583        let topic = match self {
584            Self::Queue(q) => &q.topic,
585            Self::Log { topic, .. } => topic,
586        };
587        get_or_create_channel(&MemoryConfig {
588            topic: topic.clone(),
589            ..Default::default()
590        })
591    }
592}
593
594pub struct MemorySubscriber {
595    consumer: MemoryConsumer,
596}
597
598impl MemorySubscriber {
599    pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
600        let mut sub_config = config.clone();
601        // If subscribe_mode is true, we use EventStore with the original topic but unique subscriber ID.
602        // If false (legacy), we use the suffixed topic queue.
603        let consumer = if config.subscribe_mode {
604            let store = get_or_create_event_store(&config.topic);
605            MemoryConsumer::Log {
606                consumer: store.consumer(id.to_string()),
607                topic: config.topic.clone(),
608            }
609        } else {
610            sub_config.topic = format!("{}-{}", config.topic, id);
611            MemoryConsumer::new(&sub_config)?
612        };
613        Ok(Self { consumer })
614    }
615}
616
617#[async_trait]
618impl MessageConsumer for MemorySubscriber {
619    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
620        self.consumer.receive_batch(max_messages).await
621    }
622
623    async fn receive(&mut self) -> Result<Received, ConsumerError> {
624        self.consumer.receive().await
625    }
626
627    fn as_any(&self) -> &dyn Any {
628        self
629    }
630}
631
632#[cfg(test)]
633mod tests {
634    use super::*;
635    use crate::models::{Endpoint, Route};
636    use crate::traits::Handled;
637    use crate::{msg, CanonicalMessage};
638    use serde_json::json;
639    use tokio::time::sleep;
640
641    #[tokio::test]
642    async fn test_memory_channel_integration() {
643        let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
644        let publisher = MemoryPublisher::new_local("test-mem1", 10);
645
646        let msg = msg!(json!({"hello": "memory"}));
647
648        // Send a message via the publisher
649        publisher.send(msg.clone()).await.unwrap();
650
651        sleep(std::time::Duration::from_millis(10)).await;
652        // Receive it with the consumer
653        let received = consumer.receive().await.unwrap();
654        let _ = (received.commit)(MessageDisposition::Ack).await;
655        assert_eq!(received.message.payload, msg.payload);
656        assert_eq!(consumer.channel().len(), 0);
657    }
658
659    #[tokio::test]
660    async fn test_memory_publisher_and_consumer_integration() {
661        let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
662        let publisher = MemoryPublisher::new_local("test-mem2", 10);
663
664        let msg1 = msg!(json!({"message": "one"}));
665        let msg2 = msg!(json!({"message": "two"}));
666        let msg3 = msg!(json!({"message": "three"}));
667
668        // 3. Send messages via the publisher
669        publisher
670            .send_batch(vec![msg1.clone(), msg2.clone()])
671            .await
672            .unwrap();
673        publisher.send(msg3.clone()).await.unwrap();
674
675        // 4. Verify the channel has the messages
676        assert_eq!(publisher.channel().len(), 2);
677
678        // 5. Receive the messages and verify them
679        let received1 = consumer.receive().await.unwrap();
680        let _ = (received1.commit)(MessageDisposition::Ack).await;
681        assert_eq!(received1.message.payload, msg1.payload);
682
683        let batch2 = consumer.receive_batch(1).await.unwrap();
684        let (received_msg2, commit2) = (batch2.messages, batch2.commit);
685        let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
686        assert_eq!(received_msg2.len(), 1);
687        assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
688        let batch3 = consumer.receive_batch(2).await.unwrap();
689        let (received_msg3, commit3) = (batch3.messages, batch3.commit);
690        let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
691        assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);
692
693        // 6. Verify that the channel is now empty
694        assert_eq!(publisher.channel().len(), 0);
695
696        // 7. Verify that reading again results in an error because the channel is empty and we are not closing it
697        // In a real scenario with a closed channel, this would error out. Here we can just check it's empty.
698        // A `receive` call would just hang, waiting for a message.
699    }
700
701    #[tokio::test]
702    async fn test_memory_subscriber_structure() {
703        let cfg = MemoryConfig {
704            topic: "base_topic".to_string(),
705            capacity: Some(10),
706            ..Default::default()
707        };
708        let subscriber_id = "sub1";
709        let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();
710
711        // The subscriber should be listening on "base_topic-sub1"
712        // We can verify this by creating a publisher for that specific topic.
713        let pub_cfg = MemoryConfig {
714            topic: format!("base_topic-{}", subscriber_id),
715            capacity: Some(10),
716            ..Default::default()
717        };
718        let publisher = MemoryPublisher::new(&pub_cfg).unwrap();
719
720        publisher.send("hello subscriber".into()).await.unwrap();
721
722        let received = subscriber.receive().await.unwrap();
723        assert_eq!(received.message.get_payload_str(), "hello subscriber");
724    }
725
726    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
727    async fn test_memory_request_reply_mode() {
728        let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
729        let input_endpoint = Endpoint::new_memory(&topic, 10);
730        let output_endpoint = Endpoint::new_response();
731        let handler = |mut msg: CanonicalMessage| async move {
732            let request_payload = msg.get_payload_str();
733            let response_payload = format!("reply to {}", request_payload);
734            msg.set_payload_str(response_payload);
735            Ok(Handled::Publish(msg))
736        };
737
738        let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
739        route.deploy("mem_rr_test").await.unwrap();
740
741        // Create a publisher with request_reply = true
742        let publisher = MemoryPublisher::new(&MemoryConfig {
743            topic: topic.clone(),
744            capacity: Some(10),
745            request_reply: true,
746            request_timeout_ms: Some(2000),
747            ..Default::default()
748        })
749        .unwrap();
750
751        let result = publisher.send("direct request".into()).await.unwrap();
752
753        if let Sent::Response(response_msg) = result {
754            assert_eq!(response_msg.get_payload_str(), "reply to direct request");
755        } else {
756            panic!("Expected Sent::Response, got {:?}", result);
757        }
758
759        // Clean up
760        Route::stop("mem_rr_test").await;
761    }
762
763    #[tokio::test]
764    async fn test_memory_nack_requeue() {
765        let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
766        let config = MemoryConfig {
767            topic: topic.clone(),
768            capacity: Some(10),
769            enable_nack: true,
770            ..Default::default()
771        };
772        let mut consumer = MemoryConsumer::new(&config).unwrap();
773        let publisher = MemoryPublisher::new_local(&topic, 10);
774
775        publisher.send("to_be_nacked".into()).await.unwrap();
776
777        // 1. Receive and Nack
778        let received1 = consumer.receive().await.unwrap();
779        assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
780        (received1.commit)(crate::traits::MessageDisposition::Nack)
781            .await
782            .unwrap();
783
784        // 2. Receive again (should be re-queued)
785        let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
786            .await
787            .expect("Timed out waiting for re-queued message")
788            .unwrap();
789        assert_eq!(received2.message.get_payload_str(), "to_be_nacked");
790
791        // 3. Ack
792        (received2.commit)(crate::traits::MessageDisposition::Ack)
793            .await
794            .unwrap();
795
796        // 4. Verify empty
797        let result =
798            tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
799        assert!(result.is_err(), "Channel should be empty");
800    }
801
802    #[tokio::test]
803    async fn test_memory_event_store_integration() {
804        let topic = "event_store_test";
805        // Publisher with subscribe_mode=true enables EventStore writing
806        let pub_config = MemoryConfig {
807            topic: topic.to_string(),
808            subscribe_mode: true,
809            ..Default::default()
810        };
811        let publisher = MemoryPublisher::new(&pub_config).unwrap();
812
813        // Subscriber 1
814        let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
815        // Subscriber 2
816        let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();
817
818        publisher.send("event1".into()).await.unwrap();
819
820        let msg1 = sub1.receive().await.unwrap();
821        assert_eq!(msg1.message.get_payload_str(), "event1");
822        (msg1.commit)(MessageDisposition::Ack).await.unwrap();
823
824        let msg2 = sub2.receive().await.unwrap();
825        assert_eq!(msg2.message.get_payload_str(), "event1");
826    }
827
828    #[tokio::test]
829    async fn test_memory_no_subscribers_persistence() {
830        let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
831        let pub_config = MemoryConfig {
832            topic: topic.clone(),
833            subscribe_mode: true,
834            ..Default::default()
835        };
836
837        // 1. Create Publisher (Log mode)
838        let publisher = MemoryPublisher::new(&pub_config).unwrap();
839
840        // 2. Publish messages with no subscribers
841        publisher.send("msg1".into()).await.unwrap();
842        publisher.send("msg2".into()).await.unwrap();
843
844        // 3. Create Subscriber (Late joiner)
845        let sub_config = MemoryConfig {
846            topic: topic.clone(),
847            subscribe_mode: true,
848            ..Default::default()
849        };
850        let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();
851
852        // 4. Verify messages are received
853        let received1 = subscriber.receive().await.unwrap();
854        assert_eq!(received1.message.get_payload_str(), "msg1");
855        (received1.commit)(MessageDisposition::Ack).await.unwrap();
856
857        let received2 = subscriber.receive().await.unwrap();
858        assert_eq!(received2.message.get_payload_str(), "msg2");
859        (received2.commit)(MessageDisposition::Ack).await.unwrap();
860    }
861
862    #[tokio::test]
863    async fn test_memory_mixed_mode_error() {
864        let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
865        let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());
866
867        // Case 1: Active Queue, try to create Log Consumer
868        let _pub_q = MemoryPublisher::new_local(&topic_q, 10); // Creates Queue backend
869
870        let log_conf = MemoryConfig {
871            topic: topic_q.clone(),
872            subscribe_mode: true,
873            ..Default::default()
874        };
875        let err = MemoryConsumer::new(&log_conf);
876        assert!(err.is_err());
877        assert!(err
878            .unwrap_err()
879            .to_string()
880            .contains("already active as a Queue"));
881
882        // Case 2: Active Log, try to create Queue Consumer
883        let log_pub_conf = MemoryConfig {
884            topic: topic_l.clone(),
885            subscribe_mode: true,
886            ..Default::default()
887        };
888        let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap(); // Creates Log backend
889
890        let queue_conf = MemoryConfig {
891            topic: topic_l.clone(),
892            subscribe_mode: false,
893            ..Default::default()
894        };
895        let err = MemoryConsumer::new(&queue_conf);
896        assert!(err.is_err());
897        assert!(err
898            .unwrap_err()
899            .to_string()
900            .contains("already active as a Subscriber Log"));
901    }
902
903    #[tokio::test]
904    async fn test_memory_publisher_mixed_mode_error() {
905        let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());
906
907        // 1. Create a Queue Consumer to establish the channel
908        let _cons_q = MemoryConsumer::new_local(&topic_q, 10);
909
910        // 2. Try to create a Log Publisher on the same topic
911        let log_conf = MemoryConfig {
912            topic: topic_q.clone(),
913            subscribe_mode: true,
914            ..Default::default()
915        };
916        let err = MemoryPublisher::new(&log_conf);
917        assert!(err.is_err());
918        assert!(err
919            .unwrap_err()
920            .to_string()
921            .contains("already active as a Queue"));
922    }
923
924    #[tokio::test]
925    async fn test_memory_publisher_adaptive_behavior() {
926        let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());
927
928        // 1. Create a Log Consumer (Subscriber) to establish the EventStore
929        let sub_config = MemoryConfig {
930            topic: topic.clone(),
931            subscribe_mode: true,
932            ..Default::default()
933        };
934        let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();
935
936        // 2. Create a Publisher WITHOUT subscribe_mode explicitly set
937        let pub_config = MemoryConfig {
938            topic: topic.clone(),
939            subscribe_mode: false, // Default is false
940            ..Default::default()
941        };
942        // This should succeed and adapt to Log mode because the store exists
943        let publisher = MemoryPublisher::new(&pub_config).unwrap();
944
945        // 3. Verify it publishes to the store (subscriber receives it)
946        publisher.send("adaptive_msg".into()).await.unwrap();
947
948        let received = subscriber.receive().await.unwrap();
949        assert_eq!(received.message.get_payload_str(), "adaptive_msg");
950    }
951}