Skip to main content

mq_bridge/endpoints/
memory.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5use crate::canonical_message::tracing_support::LazyMessageIds;
6use crate::event_store::{
7    event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
8};
9use crate::models::MemoryConfig;
10use crate::traits::{
11    BatchCommitFunc, BoxFuture, ConsumerError, EndpointStatus, MessageConsumer, MessageDisposition,
12    MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
13};
14use crate::CanonicalMessage;
15use anyhow::anyhow;
16use async_channel::{bounded, Receiver, Sender};
17use async_trait::async_trait;
18use once_cell::sync::Lazy;
19use std::any::Any;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::oneshot;
23use tracing::{info, trace, warn};
24
25/// A map to hold memory channels for the duration of the bridge setup.
26/// This allows a consumer and publisher in different routes to connect to the same in-memory topic.
27static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
28    Lazy::new(|| Mutex::new(HashMap::new()));
29
30/// A map to hold memory response channels.
31static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
32    Lazy::new(|| Mutex::new(HashMap::new()));
33
34/// A shareable, thread-safe, in-memory channel for testing.
35///
36/// This struct holds the sender and receiver for an in-memory queue.
37/// It can be cloned and shared between your test code and the bridge's endpoints. It transports batches of messages.
38#[derive(Debug, Clone)]
39pub struct MemoryChannel {
40    pub sender: Sender<Vec<CanonicalMessage>>,
41    pub receiver: Receiver<Vec<CanonicalMessage>>,
42}
43
44impl MemoryChannel {
45    /// Creates a new batch channel with a specified capacity.
46    pub fn new(capacity: usize) -> Self {
47        let (sender, receiver) = bounded(capacity);
48        Self { sender, receiver }
49    }
50
51    /// Helper function for tests to easily send a message to the channel.
52    pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
53        self.sender.send(vec![message]).await?;
54        tracing::debug!("Message sent to memory {} channel", self.sender.len());
55        Ok(())
56    }
57
58    /// Helper function for tests to easily fill in messages.
59    pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
60        // Send the entire vector as a single batch.
61        self.sender
62            .send(messages)
63            .await
64            .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
65        Ok(())
66    }
67
68    /// Closes the sender part of the channel.
69    pub fn close(&self) {
70        self.sender.close();
71    }
72
73    /// Helper function for tests to drain all messages from the channel.
74    pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
75        let mut messages = Vec::new();
76        // Drain all batches from the channel and flatten them into a single Vec.
77        while let Ok(batch) = self.receiver.try_recv() {
78            messages.extend(batch);
79        }
80        messages
81    }
82
83    /// Returns the number of bulk messages in the channel.
84    pub fn len(&self) -> usize {
85        self.receiver.len()
86    }
87
88    /// Returns the number of messages currently in the channel.
89    pub fn is_empty(&self) -> bool {
90        self.receiver.is_empty()
91    }
92}
93
94/// A shareable, thread-safe, in-memory channel for responses.
95#[derive(Debug, Clone)]
96pub struct MemoryResponseChannel {
97    pub sender: Sender<CanonicalMessage>,
98    pub receiver: Receiver<CanonicalMessage>,
99    waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
100}
101
102impl MemoryResponseChannel {
103    pub fn new(capacity: usize) -> Self {
104        let (sender, receiver) = bounded(capacity);
105        Self {
106            sender,
107            receiver,
108            waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
109        }
110    }
111
112    pub fn close(&self) {
113        self.sender.close();
114    }
115
116    pub fn len(&self) -> usize {
117        self.receiver.len()
118    }
119
120    pub fn is_empty(&self) -> bool {
121        self.receiver.is_empty()
122    }
123
124    pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
125        self.receiver
126            .recv()
127            .await
128            .map_err(|e| anyhow!("Error receiving response: {}", e))
129    }
130
131    pub async fn register_waiter(
132        &self,
133        correlation_id: &str,
134        sender: oneshot::Sender<CanonicalMessage>,
135    ) -> anyhow::Result<()> {
136        let mut waiters = self.waiters.lock().await;
137        if waiters.contains_key(correlation_id) {
138            return Err(anyhow!(
139                "Correlation ID {} already registered",
140                correlation_id
141            ));
142        }
143        waiters.insert(correlation_id.to_string(), sender);
144        Ok(())
145    }
146
147    pub async fn remove_waiter(
148        &self,
149        correlation_id: &str,
150    ) -> Option<oneshot::Sender<CanonicalMessage>> {
151        self.waiters.lock().await.remove(correlation_id)
152    }
153}
154
155/// Gets a shared `MemoryChannel` for a given topic, creating it if it doesn't exist.
156pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
157    let mut channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
158    channels
159        .entry(config.topic.clone()) // Use the HashMap's entry API
160        .or_insert_with(|| {
161            info!(topic = %config.topic, "Creating new runtime memory channel");
162            MemoryChannel::new(config.capacity.unwrap_or(100))
163        })
164        .clone()
165}
166
167/// Gets a shared `MemoryResponseChannel` for a given topic, creating it if it doesn't exist.
168pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
169    let mut channels = RUNTIME_RESPONSE_CHANNELS.lock().unwrap();
170    channels
171        .entry(topic.to_string())
172        .or_insert_with(|| {
173            info!(topic = %topic, "Creating new runtime memory response channel");
174            MemoryResponseChannel::new(100)
175        })
176        .clone()
177}
178
179fn memory_channel_exists(topic: &str) -> bool {
180    let channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
181    channels.contains_key(topic)
182}
183
184/// A sink that sends messages to an in-memory channel.
185#[derive(Debug, Clone)]
186pub struct MemoryPublisher {
187    topic: String,
188    backend: PublisherBackend,
189    request_reply: bool,
190    request_timeout: std::time::Duration,
191}
192
193#[derive(Debug, Clone)]
194enum PublisherBackend {
195    Queue(Sender<Vec<CanonicalMessage>>),
196    Log(Arc<EventStore>),
197}
198
199impl MemoryPublisher {
200    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
201        let channel_exists = memory_channel_exists(&config.topic);
202        let store_exists = event_store_exists(&config.topic);
203
204        let backend = if config.subscribe_mode {
205            if channel_exists {
206                return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
207            }
208            let store = get_or_create_event_store(&config.topic);
209            PublisherBackend::Log(store)
210        } else if store_exists {
211            // Adaptive behavior: If an EventStore already exists, we publish to it even if
212            // subscribe_mode wasn't explicitly set. This prevents split-brain scenarios.
213            tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
214            let store = get_or_create_event_store(&config.topic);
215            PublisherBackend::Log(store)
216        } else {
217            let channel = get_or_create_channel(config);
218            PublisherBackend::Queue(channel.sender)
219        };
220
221        Ok(Self {
222            topic: config.topic.clone(),
223            backend,
224            request_reply: config.request_reply,
225            request_timeout: std::time::Duration::from_millis(
226                config.request_timeout_ms.unwrap_or(30000),
227            ),
228        })
229    }
230
231    /// Creates a new local memory publisher.
232    ///
233    /// This method creates a new in-memory publisher with the specified topic and capacity.
234    /// The publisher will send messages to the in-memory channel for the specified topic.
235    pub fn new_local(topic: &str, capacity: usize) -> Self {
236        Self::new(&MemoryConfig {
237            topic: topic.to_string(),
238            capacity: Some(capacity),
239            ..Default::default()
240        })
241        .expect("Failed to create local memory publisher")
242    }
243
244    /// Note: This helper is primarily for tests expecting a Queue.    
245    /// If used on a broadcast publisher, it will create a separate Queue channel.
246    pub fn channel(&self) -> MemoryChannel {
247        get_or_create_channel(&MemoryConfig {
248            topic: self.topic.clone(),
249            capacity: None,
250            ..Default::default()
251        })
252    }
253}
254
255#[async_trait]
256impl MessagePublisher for MemoryPublisher {
257    async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
258        match &self.backend {
259            PublisherBackend::Log(store) => {
260                store.append(message).await;
261                Ok(Sent::Ack)
262            }
263            PublisherBackend::Queue(sender) => {
264                if self.request_reply {
265                    let cid = message
266                        .metadata
267                        .entry("correlation_id".to_string())
268                        .or_insert_with(fast_uuid_v7::gen_id_string)
269                        .clone();
270
271                    let (tx, rx) = oneshot::channel();
272
273                    // Register waiter before sending
274                    let response_channel = get_or_create_response_channel(&self.topic);
275                    response_channel
276                        .register_waiter(&cid, tx)
277                        .await
278                        .map_err(PublisherError::NonRetryable)?;
279
280                    // Send the message
281                    // We use the internal sender directly to avoid recursion or cloning issues
282                    if let Err(e) = sender.send(vec![message]).await {
283                        response_channel.remove_waiter(&cid).await;
284                        return Err(anyhow!("Failed to send to memory channel: {}", e).into());
285                    }
286
287                    // Wait for the response
288                    let response = match tokio::time::timeout(self.request_timeout, rx).await {
289                        Ok(Ok(resp)) => resp,
290                        Ok(Err(e)) => {
291                            response_channel.remove_waiter(&cid).await;
292                            return Err(anyhow!(
293                                "Failed to receive response for correlation_id {}: {}",
294                                cid,
295                                e
296                            )
297                            .into());
298                        }
299                        Err(_) => {
300                            response_channel.remove_waiter(&cid).await;
301                            return Err(PublisherError::Retryable(anyhow!(
302                                "Request timed out waiting for response for correlation_id {}",
303                                cid
304                            )));
305                        }
306                    };
307
308                    Ok(Sent::Response(response))
309                } else {
310                    self.send_batch(vec![message]).await?;
311                    Ok(Sent::Ack)
312                }
313            }
314        }
315    }
316
317    async fn send_batch(
318        &self,
319        messages: Vec<CanonicalMessage>,
320    ) -> Result<SentBatch, PublisherError> {
321        match &self.backend {
322            PublisherBackend::Log(store) => {
323                trace!(
324                    topic = %self.topic,
325                    message_ids = ?LazyMessageIds(&messages),
326                    "Appending batch to event store"
327                );
328                store.append_batch(messages).await;
329                Ok(SentBatch::Ack)
330            }
331            PublisherBackend::Queue(sender) => {
332                trace!(
333                    topic = %self.topic,
334                    message_ids = ?LazyMessageIds(&messages),
335                    "Sending batch to memory channel. Current batch count: {}",
336                    sender.len()
337                );
338                sender
339                    .send(messages)
340                    .await
341                    .map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
342                Ok(SentBatch::Ack)
343            }
344        }
345    }
346
347    async fn status(&self) -> EndpointStatus {
348        match &self.backend {
349            PublisherBackend::Queue(sender) => EndpointStatus {
350                healthy: !sender.is_closed(),
351                target: self.topic.clone(),
352                pending: Some(sender.len()),
353                capacity: Some(sender.capacity().unwrap_or(0)),
354                ..Default::default()
355            },
356            PublisherBackend::Log(_store) => EndpointStatus {
357                healthy: true,
358                target: self.topic.clone(),
359                details: serde_json::json!({
360                    "mode": "event_store"
361                }),
362                ..Default::default()
363            },
364        }
365    }
366
367    fn as_any(&self) -> &dyn Any {
368        self
369    }
370}
371
372/// A queue-based consumer (legacy behavior).
373#[derive(Debug)]
374pub struct MemoryQueueConsumer {
375    topic: String,
376    receiver: Receiver<Vec<CanonicalMessage>>,
377    // Internal buffer to hold messages from a received batch.
378    buffer: Vec<CanonicalMessage>,
379    enable_nack: bool,
380}
381
382/// A source that reads messages from an in-memory channel or event store.
383#[derive(Debug)]
384pub enum MemoryConsumer {
385    Queue(MemoryQueueConsumer),
386    Log {
387        consumer: EventStoreConsumer,
388        topic: String,
389    },
390}
391
392impl MemoryConsumer {
393    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
394        let channel_exists = memory_channel_exists(&config.topic);
395        let store_exists = event_store_exists(&config.topic);
396
397        if config.subscribe_mode {
398            if channel_exists {
399                return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
400            }
401            let store = get_or_create_event_store(&config.topic);
402            // For subscriber mode, we generate a unique ID if one isn't implicit in the usage.
403            // However, MemorySubscriber struct usually handles the ID.
404            // If MemoryConsumer is used directly with subscribe_mode=true, we assume a default ID or ephemeral.
405            let subscriber_id = format!("{}-consumer", config.topic);
406            info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
407            let consumer = store.consumer(subscriber_id);
408            Ok(Self::Log {
409                consumer,
410                topic: config.topic.clone(),
411            })
412        } else {
413            if store_exists {
414                // Unlike the Publisher, we cannot silently adapt to Log mode here.
415                // The EventStore implementation currently supports Pub/Sub (broadcast) only.
416                // Adapting would result in this consumer receiving all messages, violating
417                // the expected Queue (competing consumer) semantics requested by `subscribe_mode: false`.
418                return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
419            }
420            let queue = MemoryQueueConsumer::new(config)?;
421            Ok(Self::Queue(queue))
422        }
423    }
424}
425
426impl Drop for MemoryQueueConsumer {
427    fn drop(&mut self) {
428        if !self.buffer.is_empty() {
429            let mut messages = std::mem::take(&mut self.buffer);
430            messages.reverse();
431
432            let channel = get_or_create_channel(&MemoryConfig {
433                topic: self.topic.clone(),
434                capacity: None,
435                ..Default::default()
436            });
437
438            match channel.sender.try_send(messages) {
439                Ok(_) => {
440                    info!(topic = %self.topic, "Requeued buffered messages on consumer drop");
441                }
442                Err(e) => {
443                    let msgs = match e {
444                        async_channel::TrySendError::Full(m) => m,
445                        async_channel::TrySendError::Closed(m) => m,
446                    };
447                    warn!(topic = %self.topic, "Channel full on drop, spawning async requeue");
448                    let sender = channel.sender.clone();
449                    if let Ok(handle) = tokio::runtime::Handle::try_current() {
450                        handle.spawn(async move {
451                            if let Err(e) = sender.send(msgs).await {
452                                tracing::error!(
453                                    "Failed to requeue buffered messages in background: {}",
454                                    e
455                                );
456                            }
457                        });
458                    } else {
459                        tracing::error!(topic = %self.topic, "No active runtime found, could not requeue buffered messages on consumer drop");
460                    }
461                }
462            }
463        }
464    }
465}
466
467impl MemoryQueueConsumer {
468    pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
469        let channel = get_or_create_channel(config);
470        let buffer = if let Some(capacity) = config.capacity {
471            Vec::with_capacity(capacity)
472        } else {
473            Vec::new()
474        };
475        Ok(Self {
476            topic: config.topic.clone(),
477            receiver: channel.receiver.clone(),
478            buffer,
479            enable_nack: config.enable_nack,
480        })
481    }
482
483    async fn get_buffered_msgs(
484        &mut self,
485        max_messages: usize,
486    ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
487        // If the internal buffer has messages, return them first.
488        if self.buffer.is_empty() {
489            // Buffer is empty. Wait for a new batch from the channel.
490            self.buffer = match self.receiver.recv().await {
491                Ok(batch) => batch,
492                Err(_) => return Err(ConsumerError::EndOfStream),
493            };
494            // Reverse the buffer so we can efficiently pop from the end.
495            self.buffer.reverse();
496        }
497
498        // Determine the number of messages to take from the buffer.
499        let num_to_take = self.buffer.len().min(max_messages);
500        let split_at = self.buffer.len() - num_to_take;
501
502        // `split_off` is highly efficient. It splits the Vec in two at the given
503        // index and returns the part after the index, leaving the first part.
504        let mut messages = self.buffer.split_off(split_at);
505        messages.reverse(); // Reverse back to original order.
506        Ok(messages)
507    }
508}
509
510struct RequeueGuard {
511    topic: String,
512    messages: Vec<CanonicalMessage>,
513}
514
515impl Drop for RequeueGuard {
516    fn drop(&mut self) {
517        if !self.messages.is_empty() {
518            let topic = self.topic.clone();
519            let count = self.messages.len();
520            let messages = std::mem::take(&mut self.messages);
521
522            let channel = get_or_create_channel(&MemoryConfig {
523                topic: topic.clone(),
524                capacity: None,
525                ..Default::default()
526            });
527
528            match channel.sender.try_send(messages) {
529                Ok(_) => {
530                    tracing::info!(topic = %topic, count, "Requeued dropped batch via RequeueGuard");
531                }
532                Err(e) => {
533                    let msgs = match e {
534                        async_channel::TrySendError::Full(m) => m,
535                        async_channel::TrySendError::Closed(m) => m,
536                    };
537                    tracing::warn!(topic = %topic, count, "Failed to requeue dropped batch (channel full/closed), spawning retry");
538                    let sender = channel.sender.clone();
539                    if let Ok(handle) = tokio::runtime::Handle::try_current() {
540                        handle.spawn(async move {
541                            if let Err(e) = sender.send(msgs).await {
542                                tracing::error!(
543                                    "Failed to requeue dropped batch in background: {}",
544                                    e
545                                );
546                            }
547                        });
548                    } else {
549                        tracing::error!(topic = %topic, count, "No active runtime found, could not requeue dropped batch via RequeueGuard");
550                    }
551                }
552            }
553        }
554    }
555}
556
557#[async_trait]
558impl MessageConsumer for MemoryQueueConsumer {
559    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
560        // If the internal buffer has messages, return them first.
561
562        let mut messages = self.get_buffered_msgs(max_messages).await?;
563        while messages.len() < max_messages / 2 {
564            if let Ok(mut next_batch) = self.receiver.try_recv() {
565                if next_batch.len() + messages.len() > max_messages {
566                    let needed = max_messages - messages.len();
567                    let mut to_buffer = next_batch.split_off(needed);
568                    messages.append(&mut next_batch);
569                    self.buffer.append(&mut to_buffer);
570                    self.buffer.reverse();
571                    break;
572                } else {
573                    messages.append(&mut next_batch);
574                }
575            } else {
576                break;
577            }
578        }
579        trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
580        if messages.is_empty() {
581            return Ok(ReceivedBatch {
582                messages: Vec::new(),
583                commit: Box::new(|_| {
584                    Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
585                }),
586            });
587        }
588
589        let topic = self.topic.clone();
590        let expected_count = messages.len();
591        let correlation_ids: Vec<Option<String>> = messages
592            .iter()
593            .map(|m| m.metadata.get("correlation_id").cloned())
594            .collect();
595
596        // Guard to requeue messages if the batch is dropped without commit/nack.
597        let mut guard = if self.enable_nack {
598            Some(RequeueGuard {
599                topic: self.topic.clone(),
600                messages: messages.clone(),
601            })
602        } else {
603            None
604        };
605
606        let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
607            Box::pin(async move {
608                if dispositions.len() != expected_count {
609                    return Err(anyhow::anyhow!(
610                        "Memory batch commit received mismatched disposition count: expected {}, got {}",
611                        expected_count,
612                        dispositions.len()
613                    ));
614                }
615
616                // Clone messages from guard to keep it armed during async operations
617                let messages_for_retry = if let Some(g) = &guard {
618                    g.messages.clone()
619                } else {
620                    Vec::new()
621                };
622
623                let response_channel = get_or_create_response_channel(&topic);
624                let mut to_requeue = Vec::new();
625
626                for (i, disposition) in dispositions.into_iter().enumerate() {
627                    match disposition {
628                        MessageDisposition::Reply(resp) => {
629                            handle_memory_reply(resp, i, &correlation_ids, &response_channel).await;
630                        }
631                        MessageDisposition::Nack => {
632                            if let Some(msg) = messages_for_retry.get(i) {
633                                warn!("Requeueing nacked message {}", i);
634                                to_requeue.push(msg.clone());
635                            } else {
636                                warn!("Nack for index {} but no message in retry buffer!", i);
637                            }
638                        }
639                        MessageDisposition::Ack => {}
640                    }
641                }
642
643                if !to_requeue.is_empty() {
644                    let main_channel = get_or_create_channel(&MemoryConfig {
645                        topic: topic.to_string(),
646                        capacity: None,
647                        ..Default::default()
648                    });
649                    if main_channel.sender.send(to_requeue).await.is_err() {
650                        tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
651                    }
652                }
653
654                // Disarm the guard after all awaits are finished.
655                if let Some(g) = &mut guard {
656                    std::mem::take(&mut g.messages);
657                }
658
659                Ok(())
660            }) as BoxFuture<'static, anyhow::Result<()>>
661        }) as BatchCommitFunc;
662        Ok(ReceivedBatch { messages, commit })
663    }
664
665    async fn status(&self) -> EndpointStatus {
666        let pending = self.receiver.len();
667        let capacity = self.receiver.capacity().unwrap_or(0);
668        EndpointStatus {
669            healthy: !self.receiver.is_closed(),
670            target: self.topic.clone(),
671            pending: Some(pending),
672            capacity: Some(capacity),
673            ..Default::default()
674        }
675    }
676
677    fn as_any(&self) -> &dyn Any {
678        self
679    }
680}
681
682async fn handle_memory_reply(
683    mut resp: CanonicalMessage,
684    index: usize,
685    correlation_ids: &[Option<String>],
686    response_channel: &MemoryResponseChannel,
687) {
688    if !resp.metadata.contains_key("correlation_id") {
689        if let Some(Some(cid)) = correlation_ids.get(index) {
690            resp.metadata
691                .insert("correlation_id".to_string(), cid.clone());
692        }
693    }
694
695    if let Some(cid) = resp.metadata.get("correlation_id") {
696        if let Some(tx) = response_channel.remove_waiter(cid).await {
697            let _ = tx.send(resp);
698            return;
699        }
700    }
701    let _ = response_channel.sender.send(resp).await;
702}
703
704#[async_trait]
705impl MessageConsumer for MemoryConsumer {
706    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
707        match self {
708            Self::Queue(q) => q.receive_batch(max_messages).await,
709            Self::Log { consumer, .. } => consumer.receive_batch(max_messages).await,
710        }
711    }
712
713    async fn status(&self) -> EndpointStatus {
714        match self {
715            Self::Queue(q) => q.status().await,
716            Self::Log { consumer, .. } => consumer.status().await,
717        }
718    }
719
720    fn as_any(&self) -> &dyn Any {
721        self
722    }
723}
724
725impl MemoryConsumer {
726    pub fn new_local(topic: &str, capacity: usize) -> Self {
727        Self::new(&MemoryConfig {
728            topic: topic.to_string(),
729            capacity: Some(capacity),
730            ..Default::default()
731        })
732        .expect("Failed to create local memory consumer")
733    }
734    pub fn channel(&self) -> MemoryChannel {
735        let topic = match self {
736            Self::Queue(q) => &q.topic,
737            Self::Log { topic, .. } => topic,
738        };
739        get_or_create_channel(&MemoryConfig {
740            topic: topic.clone(),
741            ..Default::default()
742        })
743    }
744}
745
746pub struct MemorySubscriber {
747    consumer: MemoryConsumer,
748}
749
750impl MemorySubscriber {
751    pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
752        let mut sub_config = config.clone();
753        // If subscribe_mode is true, we use EventStore with the original topic but unique subscriber ID.
754        // If false (legacy), we use the suffixed topic queue.
755        let consumer = if config.subscribe_mode {
756            let store = get_or_create_event_store(&config.topic);
757            MemoryConsumer::Log {
758                consumer: store.consumer(id.to_string()),
759                topic: config.topic.clone(),
760            }
761        } else {
762            sub_config.topic = format!("{}-{}", config.topic, id);
763            MemoryConsumer::new(&sub_config)?
764        };
765        Ok(Self { consumer })
766    }
767}
768
769#[async_trait]
770impl MessageConsumer for MemorySubscriber {
771    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
772        self.consumer.receive_batch(max_messages).await
773    }
774
775    async fn receive(&mut self) -> Result<Received, ConsumerError> {
776        self.consumer.receive().await
777    }
778
779    fn as_any(&self) -> &dyn Any {
780        self
781    }
782}
783
784#[cfg(test)]
785mod tests {
786    use super::*;
787    use crate::models::{Endpoint, Route};
788    use crate::traits::Handled;
789    use crate::{msg, CanonicalMessage};
790    use serde_json::json;
791    use tokio::time::sleep;
792
793    #[tokio::test]
794    async fn test_memory_channel_integration() {
795        let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
796        let publisher = MemoryPublisher::new_local("test-mem1", 10);
797
798        let msg = msg!(json!({"hello": "memory"}));
799
800        // Send a message via the publisher
801        publisher.send(msg.clone()).await.unwrap();
802
803        sleep(std::time::Duration::from_millis(10)).await;
804        // Receive it with the consumer
805        let received = consumer.receive().await.unwrap();
806        let _ = (received.commit)(MessageDisposition::Ack).await;
807        assert_eq!(received.message.payload, msg.payload);
808        assert_eq!(consumer.channel().len(), 0);
809    }
810
811    #[tokio::test]
812    async fn test_memory_publisher_and_consumer_integration() {
813        let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
814        let publisher = MemoryPublisher::new_local("test-mem2", 10);
815
816        let msg1 = msg!(json!({"message": "one"}));
817        let msg2 = msg!(json!({"message": "two"}));
818        let msg3 = msg!(json!({"message": "three"}));
819
820        // 3. Send messages via the publisher
821        publisher
822            .send_batch(vec![msg1.clone(), msg2.clone()])
823            .await
824            .unwrap();
825        publisher.send(msg3.clone()).await.unwrap();
826
827        // 4. Verify the channel has the messages
828        assert_eq!(publisher.channel().len(), 2);
829
830        // 5. Receive the messages and verify them
831        let received1 = consumer.receive().await.unwrap();
832        let _ = (received1.commit)(MessageDisposition::Ack).await;
833        assert_eq!(received1.message.payload, msg1.payload);
834
835        let batch2 = consumer.receive_batch(1).await.unwrap();
836        let (received_msg2, commit2) = (batch2.messages, batch2.commit);
837        let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
838        assert_eq!(received_msg2.len(), 1);
839        assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
840        let batch3 = consumer.receive_batch(2).await.unwrap();
841        let (received_msg3, commit3) = (batch3.messages, batch3.commit);
842        let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
843        assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);
844
845        // 6. Verify that the channel is now empty
846        assert_eq!(publisher.channel().len(), 0);
847
848        // 7. Verify that reading again results in an error because the channel is empty and we are not closing it
849        // In a real scenario with a closed channel, this would error out. Here we can just check it's empty.
850        // A `receive` call would just hang, waiting for a message.
851    }
852
853    #[tokio::test]
854    async fn test_memory_subscriber_structure() {
855        let cfg = MemoryConfig {
856            topic: "base_topic".to_string(),
857            capacity: Some(10),
858            ..Default::default()
859        };
860        let subscriber_id = "sub1";
861        let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();
862
863        // The subscriber should be listening on "base_topic-sub1"
864        // We can verify this by creating a publisher for that specific topic.
865        let pub_cfg = MemoryConfig {
866            topic: format!("base_topic-{}", subscriber_id),
867            capacity: Some(10),
868            ..Default::default()
869        };
870        let publisher = MemoryPublisher::new(&pub_cfg).unwrap();
871
872        publisher.send("hello subscriber".into()).await.unwrap();
873
874        let received = subscriber.receive().await.unwrap();
875        assert_eq!(received.message.get_payload_str(), "hello subscriber");
876    }
877
878    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
879    async fn test_memory_request_reply_mode() {
880        let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
881        let input_endpoint = Endpoint::new_memory(&topic, 10);
882        let output_endpoint = Endpoint::new_response();
883        let handler = |mut msg: CanonicalMessage| async move {
884            let request_payload = msg.get_payload_str();
885            let response_payload = format!("reply to {}", request_payload);
886            msg.set_payload_str(response_payload);
887            Ok(Handled::Publish(msg))
888        };
889
890        let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
891        route.deploy("mem_rr_test").await.unwrap();
892
893        // Create a publisher with request_reply = true
894        let publisher = MemoryPublisher::new(&MemoryConfig {
895            topic: topic.clone(),
896            capacity: Some(10),
897            request_reply: true,
898            request_timeout_ms: Some(2000),
899            ..Default::default()
900        })
901        .unwrap();
902
903        let result = publisher.send("direct request".into()).await.unwrap();
904
905        if let Sent::Response(response_msg) = result {
906            assert_eq!(response_msg.get_payload_str(), "reply to direct request");
907        } else {
908            panic!("Expected Sent::Response, got {:?}", result);
909        }
910
911        // Clean up
912        Route::stop("mem_rr_test").await;
913    }
914
915    #[tokio::test]
916    async fn test_memory_nack_requeue() {
917        let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
918        let config = MemoryConfig {
919            topic: topic.clone(),
920            capacity: Some(10),
921            enable_nack: true,
922            ..Default::default()
923        };
924        let mut consumer = MemoryConsumer::new(&config).unwrap();
925        let publisher = MemoryPublisher::new_local(&topic, 10);
926
927        publisher.send("to_be_nacked".into()).await.unwrap();
928
929        // 1. Receive and Nack
930        let received1 = consumer.receive().await.unwrap();
931        assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
932        (received1.commit)(crate::traits::MessageDisposition::Nack)
933            .await
934            .unwrap();
935
936        // 2. Receive again (should be re-queued)
937        let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
938            .await
939            .expect("Timed out waiting for re-queued message")
940            .unwrap();
941        assert_eq!(received2.message.get_payload_str(), "to_be_nacked");
942
943        // 3. Ack
944        (received2.commit)(crate::traits::MessageDisposition::Ack)
945            .await
946            .unwrap();
947
948        // 4. Verify empty
949        let result =
950            tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
951        assert!(result.is_err(), "Channel should be empty");
952    }
953
954    #[tokio::test]
955    async fn test_memory_event_store_integration() {
956        let topic = "event_store_test";
957        // Publisher with subscribe_mode=true enables EventStore writing
958        let pub_config = MemoryConfig {
959            topic: topic.to_string(),
960            subscribe_mode: true,
961            ..Default::default()
962        };
963        let publisher = MemoryPublisher::new(&pub_config).unwrap();
964
965        // Subscriber 1
966        let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
967        // Subscriber 2
968        let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();
969
970        publisher.send("event1".into()).await.unwrap();
971
972        let msg1 = sub1.receive().await.unwrap();
973        assert_eq!(msg1.message.get_payload_str(), "event1");
974        (msg1.commit)(MessageDisposition::Ack).await.unwrap();
975
976        let msg2 = sub2.receive().await.unwrap();
977        assert_eq!(msg2.message.get_payload_str(), "event1");
978    }
979
980    #[tokio::test]
981    async fn test_memory_no_subscribers_persistence() {
982        let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
983        let pub_config = MemoryConfig {
984            topic: topic.clone(),
985            subscribe_mode: true,
986            ..Default::default()
987        };
988
989        // 1. Create Publisher (Log mode)
990        let publisher = MemoryPublisher::new(&pub_config).unwrap();
991
992        // 2. Publish messages with no subscribers
993        publisher.send("msg1".into()).await.unwrap();
994        publisher.send("msg2".into()).await.unwrap();
995
996        // 3. Create Subscriber (Late joiner)
997        let sub_config = MemoryConfig {
998            topic: topic.clone(),
999            subscribe_mode: true,
1000            ..Default::default()
1001        };
1002        let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();
1003
1004        // 4. Verify messages are received
1005        let received1 = subscriber.receive().await.unwrap();
1006        assert_eq!(received1.message.get_payload_str(), "msg1");
1007        (received1.commit)(MessageDisposition::Ack).await.unwrap();
1008
1009        let received2 = subscriber.receive().await.unwrap();
1010        assert_eq!(received2.message.get_payload_str(), "msg2");
1011        (received2.commit)(MessageDisposition::Ack).await.unwrap();
1012    }
1013
1014    #[tokio::test]
1015    async fn test_memory_mixed_mode_error() {
1016        let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
1017        let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());
1018
1019        // Case 1: Active Queue, try to create Log Consumer
1020        let _pub_q = MemoryPublisher::new_local(&topic_q, 10); // Creates Queue backend
1021
1022        let log_conf = MemoryConfig {
1023            topic: topic_q.clone(),
1024            subscribe_mode: true,
1025            ..Default::default()
1026        };
1027        let err = MemoryConsumer::new(&log_conf);
1028        assert!(err.is_err());
1029        assert!(err
1030            .unwrap_err()
1031            .to_string()
1032            .contains("already active as a Queue"));
1033
1034        // Case 2: Active Log, try to create Queue Consumer
1035        let log_pub_conf = MemoryConfig {
1036            topic: topic_l.clone(),
1037            subscribe_mode: true,
1038            ..Default::default()
1039        };
1040        let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap(); // Creates Log backend
1041
1042        let queue_conf = MemoryConfig {
1043            topic: topic_l.clone(),
1044            subscribe_mode: false,
1045            ..Default::default()
1046        };
1047        let err = MemoryConsumer::new(&queue_conf);
1048        assert!(err.is_err());
1049        assert!(err
1050            .unwrap_err()
1051            .to_string()
1052            .contains("already active as a Subscriber Log"));
1053    }
1054
1055    #[tokio::test]
1056    async fn test_memory_publisher_mixed_mode_error() {
1057        let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());
1058
1059        // 1. Create a Queue Consumer to establish the channel
1060        let _cons_q = MemoryConsumer::new_local(&topic_q, 10);
1061
1062        // 2. Try to create a Log Publisher on the same topic
1063        let log_conf = MemoryConfig {
1064            topic: topic_q.clone(),
1065            subscribe_mode: true,
1066            ..Default::default()
1067        };
1068        let err = MemoryPublisher::new(&log_conf);
1069        assert!(err.is_err());
1070        assert!(err
1071            .unwrap_err()
1072            .to_string()
1073            .contains("already active as a Queue"));
1074    }
1075
1076    #[tokio::test]
1077    async fn test_memory_publisher_adaptive_behavior() {
1078        let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());
1079
1080        // 1. Create a Log Consumer (Subscriber) to establish the EventStore
1081        let sub_config = MemoryConfig {
1082            topic: topic.clone(),
1083            subscribe_mode: true,
1084            ..Default::default()
1085        };
1086        let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();
1087
1088        // 2. Create a Publisher WITHOUT subscribe_mode explicitly set
1089        let pub_config = MemoryConfig {
1090            topic: topic.clone(),
1091            subscribe_mode: false, // Default is false
1092            ..Default::default()
1093        };
1094        // This should succeed and adapt to Log mode because the store exists
1095        let publisher = MemoryPublisher::new(&pub_config).unwrap();
1096
1097        // 3. Verify it publishes to the store (subscriber receives it)
1098        publisher.send("adaptive_msg".into()).await.unwrap();
1099
1100        let received = subscriber.receive().await.unwrap();
1101        assert_eq!(received.message.get_payload_str(), "adaptive_msg");
1102    }
1103}