1use crate::canonical_message::tracing_support::LazyMessageIds;
6use crate::event_store::{
7 event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
8};
9use crate::models::MemoryConfig;
10use crate::traits::{
11 BatchCommitFunc, BoxFuture, ConsumerError, EndpointStatus, MessageConsumer, MessageDisposition,
12 MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
13};
14use crate::CanonicalMessage;
15use anyhow::anyhow;
16use async_channel::{bounded, Receiver, Sender};
17use async_trait::async_trait;
18use once_cell::sync::Lazy;
19use std::any::Any;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::oneshot;
23use tracing::{info, trace, warn};
24
/// Process-wide registry of queue-mode memory channels, keyed by topic name.
///
/// Entries are inserted lazily by `get_or_create_channel`; no code visible in
/// this file ever removes an entry.
static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
29
/// Process-wide registry of response channels used for request/reply, keyed by topic name.
///
/// Entries are inserted lazily by `get_or_create_response_channel`; no code
/// visible in this file ever removes an entry.
static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
33
/// A bounded in-memory channel whose items are whole batches of messages.
///
/// Both halves are kept together so that a clone of this value can be used to
/// publish as well as to consume.
#[derive(Debug, Clone)]
pub struct MemoryChannel {
    /// Sending half; every item sent is one batch (`Vec<CanonicalMessage>`).
    pub sender: Sender<Vec<CanonicalMessage>>,
    /// Receiving half of the same channel.
    pub receiver: Receiver<Vec<CanonicalMessage>>,
}
43
44impl MemoryChannel {
45 pub fn new(capacity: usize) -> Self {
47 let (sender, receiver) = bounded(capacity);
48 Self { sender, receiver }
49 }
50
51 pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
53 self.sender.send(vec![message]).await?;
54 tracing::debug!("Message sent to memory {} channel", self.sender.len());
55 Ok(())
56 }
57
58 pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
60 self.sender
62 .send(messages)
63 .await
64 .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
65 Ok(())
66 }
67
68 pub fn close(&self) {
70 self.sender.close();
71 }
72
73 pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
75 let mut messages = Vec::new();
76 while let Ok(batch) = self.receiver.try_recv() {
78 messages.extend(batch);
79 }
80 messages
81 }
82
83 pub fn len(&self) -> usize {
85 self.receiver.len()
86 }
87
88 pub fn is_empty(&self) -> bool {
90 self.receiver.is_empty()
91 }
92}
93
/// Channel used to deliver replies for request/reply over a memory topic.
///
/// A reply is handed to a waiter registered under its correlation id when one
/// exists (see `handle_memory_reply`); otherwise it is pushed onto the plain
/// `sender`/`receiver` pair.
#[derive(Debug, Clone)]
pub struct MemoryResponseChannel {
    /// Sending half for replies that have no registered waiter.
    pub sender: Sender<CanonicalMessage>,
    /// Receiving half read by `wait_for_response`.
    pub receiver: Receiver<CanonicalMessage>,
    /// Pending requests: correlation id -> one-shot sender that completes the request.
    waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
}
101
102impl MemoryResponseChannel {
103 pub fn new(capacity: usize) -> Self {
104 let (sender, receiver) = bounded(capacity);
105 Self {
106 sender,
107 receiver,
108 waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
109 }
110 }
111
112 pub fn close(&self) {
113 self.sender.close();
114 }
115
116 pub fn len(&self) -> usize {
117 self.receiver.len()
118 }
119
120 pub fn is_empty(&self) -> bool {
121 self.receiver.is_empty()
122 }
123
124 pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
125 self.receiver
126 .recv()
127 .await
128 .map_err(|e| anyhow!("Error receiving response: {}", e))
129 }
130
131 pub async fn register_waiter(
132 &self,
133 correlation_id: &str,
134 sender: oneshot::Sender<CanonicalMessage>,
135 ) -> anyhow::Result<()> {
136 let mut waiters = self.waiters.lock().await;
137 if waiters.contains_key(correlation_id) {
138 return Err(anyhow!(
139 "Correlation ID {} already registered",
140 correlation_id
141 ));
142 }
143 waiters.insert(correlation_id.to_string(), sender);
144 Ok(())
145 }
146
147 pub async fn remove_waiter(
148 &self,
149 correlation_id: &str,
150 ) -> Option<oneshot::Sender<CanonicalMessage>> {
151 self.waiters.lock().await.remove(correlation_id)
152 }
153}
154
155pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
157 let mut channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
158 channels
159 .entry(config.topic.clone()) .or_insert_with(|| {
161 info!(topic = %config.topic, "Creating new runtime memory channel");
162 MemoryChannel::new(config.capacity.unwrap_or(100))
163 })
164 .clone()
165}
166
167pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
169 let mut channels = RUNTIME_RESPONSE_CHANNELS.lock().unwrap();
170 channels
171 .entry(topic.to_string())
172 .or_insert_with(|| {
173 info!(topic = %topic, "Creating new runtime memory response channel");
174 MemoryResponseChannel::new(100)
175 })
176 .clone()
177}
178
179fn memory_channel_exists(topic: &str) -> bool {
180 let channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
181 channels.contains_key(topic)
182}
183
/// Publisher that writes messages to an in-memory topic.
///
/// Depending on how the topic is used, messages go either to a bounded queue
/// or to an event store (see `PublisherBackend`).
#[derive(Debug, Clone)]
pub struct MemoryPublisher {
    /// Name of the topic this publisher writes to.
    topic: String,
    /// Where messages are actually delivered.
    backend: PublisherBackend,
    /// When `true` and the backend is a queue, `send` waits for a correlated reply.
    request_reply: bool,
    /// Maximum time `send` waits for a reply in request/reply mode.
    request_timeout: std::time::Duration,
}
192
/// Delivery target selected by `MemoryPublisher::new`.
#[derive(Debug, Clone)]
enum PublisherBackend {
    /// Queue mode: batches are pushed onto the topic's memory channel.
    Queue(Sender<Vec<CanonicalMessage>>),
    /// Log mode: messages are appended to the topic's event store.
    Log(Arc<EventStore>),
}
198
199impl MemoryPublisher {
200 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
201 let channel_exists = memory_channel_exists(&config.topic);
202 let store_exists = event_store_exists(&config.topic);
203
204 let backend = if config.subscribe_mode {
205 if channel_exists {
206 return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
207 }
208 let store = get_or_create_event_store(&config.topic);
209 PublisherBackend::Log(store)
210 } else if store_exists {
211 tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
214 let store = get_or_create_event_store(&config.topic);
215 PublisherBackend::Log(store)
216 } else {
217 let channel = get_or_create_channel(config);
218 PublisherBackend::Queue(channel.sender)
219 };
220
221 Ok(Self {
222 topic: config.topic.clone(),
223 backend,
224 request_reply: config.request_reply,
225 request_timeout: std::time::Duration::from_millis(
226 config.request_timeout_ms.unwrap_or(30000),
227 ),
228 })
229 }
230
231 pub fn new_local(topic: &str, capacity: usize) -> Self {
236 Self::new(&MemoryConfig {
237 topic: topic.to_string(),
238 capacity: Some(capacity),
239 ..Default::default()
240 })
241 .expect("Failed to create local memory publisher")
242 }
243
244 pub fn channel(&self) -> MemoryChannel {
247 get_or_create_channel(&MemoryConfig {
248 topic: self.topic.clone(),
249 capacity: None,
250 ..Default::default()
251 })
252 }
253}
254
255#[async_trait]
256impl MessagePublisher for MemoryPublisher {
257 async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
258 match &self.backend {
259 PublisherBackend::Log(store) => {
260 store.append(message).await;
261 Ok(Sent::Ack)
262 }
263 PublisherBackend::Queue(sender) => {
264 if self.request_reply {
265 let cid = message
266 .metadata
267 .entry("correlation_id".to_string())
268 .or_insert_with(fast_uuid_v7::gen_id_string)
269 .clone();
270
271 let (tx, rx) = oneshot::channel();
272
273 let response_channel = get_or_create_response_channel(&self.topic);
275 response_channel
276 .register_waiter(&cid, tx)
277 .await
278 .map_err(PublisherError::NonRetryable)?;
279
280 if let Err(e) = sender.send(vec![message]).await {
283 response_channel.remove_waiter(&cid).await;
284 return Err(anyhow!("Failed to send to memory channel: {}", e).into());
285 }
286
287 let response = match tokio::time::timeout(self.request_timeout, rx).await {
289 Ok(Ok(resp)) => resp,
290 Ok(Err(e)) => {
291 response_channel.remove_waiter(&cid).await;
292 return Err(anyhow!(
293 "Failed to receive response for correlation_id {}: {}",
294 cid,
295 e
296 )
297 .into());
298 }
299 Err(_) => {
300 response_channel.remove_waiter(&cid).await;
301 return Err(PublisherError::Retryable(anyhow!(
302 "Request timed out waiting for response for correlation_id {}",
303 cid
304 )));
305 }
306 };
307
308 Ok(Sent::Response(response))
309 } else {
310 self.send_batch(vec![message]).await?;
311 Ok(Sent::Ack)
312 }
313 }
314 }
315 }
316
317 async fn send_batch(
318 &self,
319 messages: Vec<CanonicalMessage>,
320 ) -> Result<SentBatch, PublisherError> {
321 match &self.backend {
322 PublisherBackend::Log(store) => {
323 trace!(
324 topic = %self.topic,
325 message_ids = ?LazyMessageIds(&messages),
326 "Appending batch to event store"
327 );
328 store.append_batch(messages).await;
329 Ok(SentBatch::Ack)
330 }
331 PublisherBackend::Queue(sender) => {
332 trace!(
333 topic = %self.topic,
334 message_ids = ?LazyMessageIds(&messages),
335 "Sending batch to memory channel. Current batch count: {}",
336 sender.len()
337 );
338 sender
339 .send(messages)
340 .await
341 .map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
342 Ok(SentBatch::Ack)
343 }
344 }
345 }
346
347 async fn status(&self) -> EndpointStatus {
348 match &self.backend {
349 PublisherBackend::Queue(sender) => EndpointStatus {
350 healthy: !sender.is_closed(),
351 target: self.topic.clone(),
352 pending: Some(sender.len()),
353 capacity: Some(sender.capacity().unwrap_or(0)),
354 ..Default::default()
355 },
356 PublisherBackend::Log(_store) => EndpointStatus {
357 healthy: true,
358 target: self.topic.clone(),
359 details: serde_json::json!({
360 "mode": "event_store"
361 }),
362 ..Default::default()
363 },
364 }
365 }
366
367 fn as_any(&self) -> &dyn Any {
368 self
369 }
370}
371
/// Queue-mode consumer reading batches from a topic's memory channel.
#[derive(Debug)]
pub struct MemoryQueueConsumer {
    /// Name of the topic being consumed.
    topic: String,
    /// Receiving half of the topic's channel.
    receiver: Receiver<Vec<CanonicalMessage>>,
    /// Messages left over from a partially consumed batch. Kept in reverse
    /// order so the next message to deliver sits at the end of the vector.
    buffer: Vec<CanonicalMessage>,
    /// When `true`, delivered batches are protected by a `RequeueGuard` and
    /// `Nack` dispositions put the message back on the channel.
    enable_nack: bool,
}
381
/// Consumer for an in-memory topic, in either queue or log mode.
#[derive(Debug)]
pub enum MemoryConsumer {
    /// Queue mode: backed by the topic's memory channel.
    Queue(MemoryQueueConsumer),
    /// Log mode: backed by a consumer of the topic's event store.
    Log {
        /// Event store consumer that delivers the messages.
        consumer: EventStoreConsumer,
        /// Name of the topic being consumed.
        topic: String,
    },
}
391
392impl MemoryConsumer {
393 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
394 let channel_exists = memory_channel_exists(&config.topic);
395 let store_exists = event_store_exists(&config.topic);
396
397 if config.subscribe_mode {
398 if channel_exists {
399 return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
400 }
401 let store = get_or_create_event_store(&config.topic);
402 let subscriber_id = format!("{}-consumer", config.topic);
406 info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
407 let consumer = store.consumer(subscriber_id);
408 Ok(Self::Log {
409 consumer,
410 topic: config.topic.clone(),
411 })
412 } else {
413 if store_exists {
414 return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
419 }
420 let queue = MemoryQueueConsumer::new(config)?;
421 Ok(Self::Queue(queue))
422 }
423 }
424}
425
426impl Drop for MemoryQueueConsumer {
427 fn drop(&mut self) {
428 if !self.buffer.is_empty() {
429 let mut messages = std::mem::take(&mut self.buffer);
430 messages.reverse();
431
432 let channel = get_or_create_channel(&MemoryConfig {
433 topic: self.topic.clone(),
434 capacity: None,
435 ..Default::default()
436 });
437
438 match channel.sender.try_send(messages) {
439 Ok(_) => {
440 info!(topic = %self.topic, "Requeued buffered messages on consumer drop");
441 }
442 Err(e) => {
443 let msgs = match e {
444 async_channel::TrySendError::Full(m) => m,
445 async_channel::TrySendError::Closed(m) => m,
446 };
447 warn!(topic = %self.topic, "Channel full on drop, spawning async requeue");
448 let sender = channel.sender.clone();
449 if let Ok(handle) = tokio::runtime::Handle::try_current() {
450 handle.spawn(async move {
451 if let Err(e) = sender.send(msgs).await {
452 tracing::error!(
453 "Failed to requeue buffered messages in background: {}",
454 e
455 );
456 }
457 });
458 } else {
459 tracing::error!(topic = %self.topic, "No active runtime found, could not requeue buffered messages on consumer drop");
460 }
461 }
462 }
463 }
464 }
465}
466
467impl MemoryQueueConsumer {
468 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
469 let channel = get_or_create_channel(config);
470 let buffer = if let Some(capacity) = config.capacity {
471 Vec::with_capacity(capacity)
472 } else {
473 Vec::new()
474 };
475 Ok(Self {
476 topic: config.topic.clone(),
477 receiver: channel.receiver.clone(),
478 buffer,
479 enable_nack: config.enable_nack,
480 })
481 }
482
483 async fn get_buffered_msgs(
484 &mut self,
485 max_messages: usize,
486 ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
487 if self.buffer.is_empty() {
489 self.buffer = match self.receiver.recv().await {
491 Ok(batch) => batch,
492 Err(_) => return Err(ConsumerError::EndOfStream),
493 };
494 self.buffer.reverse();
496 }
497
498 let num_to_take = self.buffer.len().min(max_messages);
500 let split_at = self.buffer.len() - num_to_take;
501
502 let mut messages = self.buffer.split_off(split_at);
505 messages.reverse(); Ok(messages)
507 }
508}
509
/// Drop guard that puts a delivered batch back on its topic's channel if the
/// guard is dropped while it still holds messages.
struct RequeueGuard {
    /// Topic whose channel receives the requeued messages.
    topic: String,
    /// Messages to requeue on drop; emptied once the batch has been committed.
    messages: Vec<CanonicalMessage>,
}
514
515impl Drop for RequeueGuard {
516 fn drop(&mut self) {
517 if !self.messages.is_empty() {
518 let topic = self.topic.clone();
519 let count = self.messages.len();
520 let messages = std::mem::take(&mut self.messages);
521
522 let channel = get_or_create_channel(&MemoryConfig {
523 topic: topic.clone(),
524 capacity: None,
525 ..Default::default()
526 });
527
528 match channel.sender.try_send(messages) {
529 Ok(_) => {
530 tracing::info!(topic = %topic, count, "Requeued dropped batch via RequeueGuard");
531 }
532 Err(e) => {
533 let msgs = match e {
534 async_channel::TrySendError::Full(m) => m,
535 async_channel::TrySendError::Closed(m) => m,
536 };
537 tracing::warn!(topic = %topic, count, "Failed to requeue dropped batch (channel full/closed), spawning retry");
538 let sender = channel.sender.clone();
539 if let Ok(handle) = tokio::runtime::Handle::try_current() {
540 handle.spawn(async move {
541 if let Err(e) = sender.send(msgs).await {
542 tracing::error!(
543 "Failed to requeue dropped batch in background: {}",
544 e
545 );
546 }
547 });
548 } else {
549 tracing::error!(topic = %topic, count, "No active runtime found, could not requeue dropped batch via RequeueGuard");
550 }
551 }
552 }
553 }
554 }
555}
556
#[async_trait]
impl MessageConsumer for MemoryQueueConsumer {
    /// Receives up to `max_messages` messages together with a commit callback.
    ///
    /// Waits for at least one batch when nothing is buffered, then
    /// opportunistically pulls further batches that are already queued while
    /// fewer than half of `max_messages` have been collected.
    ///
    /// The commit callback expects exactly one disposition per returned
    /// message: `Reply` forwards the response, `Nack` requeues the message
    /// (only when nack support is enabled) and `Ack` does nothing.
    ///
    /// # Errors
    /// Returns `ConsumerError::EndOfStream` when the channel yields no more batches.
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        let mut messages = self.get_buffered_msgs(max_messages).await?;
        // Top up without waiting. This loop only runs when fewer than
        // `max_messages` messages were returned above, which means
        // `get_buffered_msgs` drained `self.buffer` completely.
        while messages.len() < max_messages / 2 {
            if let Ok(mut next_batch) = self.receiver.try_recv() {
                if next_batch.len() + messages.len() > max_messages {
                    // Take only what still fits; the remainder goes to the buffer.
                    let needed = max_messages - messages.len();
                    let mut to_buffer = next_batch.split_off(needed);
                    messages.append(&mut next_batch);
                    self.buffer.append(&mut to_buffer);
                    // The buffer is kept in reverse delivery order.
                    self.buffer.reverse();
                    break;
                } else {
                    messages.append(&mut next_batch);
                }
            } else {
                // Nothing else is queued right now.
                break;
            }
        }
        trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
        if messages.is_empty() {
            // Nothing to commit: hand back a no-op commit callback.
            return Ok(ReceivedBatch {
                messages: Vec::new(),
                commit: Box::new(|_| {
                    Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
                }),
            });
        }

        let topic = self.topic.clone();
        let expected_count = messages.len();
        // Remember each request's correlation id so replies lacking one can be tagged.
        let correlation_ids: Vec<Option<String>> = messages
            .iter()
            .map(|m| m.metadata.get("correlation_id").cloned())
            .collect();

        // With nack support, keep a copy of the batch in a guard. If the
        // commit closure or its future is dropped while the guard still holds
        // messages, the guard requeues the whole batch.
        let mut guard = if self.enable_nack {
            Some(RequeueGuard {
                topic: self.topic.clone(),
                messages: messages.clone(),
            })
        } else {
            None
        };

        let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
            Box::pin(async move {
                if dispositions.len() != expected_count {
                    // Returning early drops `guard`, which requeues the batch
                    // when nack support is enabled.
                    return Err(anyhow::anyhow!(
                        "Memory batch commit received mismatched disposition count: expected {}, got {}",
                        expected_count,
                        dispositions.len()
                    ));
                }

                // Work on a copy so the guard keeps the full batch until the
                // commit has finished.
                let messages_for_retry = if let Some(g) = &guard {
                    g.messages.clone()
                } else {
                    Vec::new()
                };

                let response_channel = get_or_create_response_channel(&topic);
                let mut to_requeue = Vec::new();

                for (i, disposition) in dispositions.into_iter().enumerate() {
                    match disposition {
                        MessageDisposition::Reply(resp) => {
                            handle_memory_reply(resp, i, &correlation_ids, &response_channel).await;
                        }
                        MessageDisposition::Nack => {
                            if let Some(msg) = messages_for_retry.get(i) {
                                warn!("Requeueing nacked message {}", i);
                                to_requeue.push(msg.clone());
                            } else {
                                // Reached when nack support is disabled: no copy was kept.
                                warn!("Nack for index {} but no message in retry buffer!", i);
                            }
                        }
                        MessageDisposition::Ack => {}
                    }
                }

                if !to_requeue.is_empty() {
                    let main_channel = get_or_create_channel(&MemoryConfig {
                        topic: topic.to_string(),
                        capacity: None,
                        ..Default::default()
                    });
                    // This send waits while the channel is full.
                    if main_channel.sender.send(to_requeue).await.is_err() {
                        tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
                    }
                }

                // Commit finished: disarm the guard so it does not requeue on drop.
                if let Some(g) = &mut guard {
                    std::mem::take(&mut g.messages);
                }

                Ok(())
            }) as BoxFuture<'static, anyhow::Result<()>>
        }) as BatchCommitFunc;
        Ok(ReceivedBatch { messages, commit })
    }

    /// Reports channel health, the number of queued batches and the channel capacity.
    async fn status(&self) -> EndpointStatus {
        let pending = self.receiver.len();
        let capacity = self.receiver.capacity().unwrap_or(0);
        EndpointStatus {
            healthy: !self.receiver.is_closed(),
            target: self.topic.clone(),
            pending: Some(pending),
            capacity: Some(capacity),
            ..Default::default()
        }
    }

    /// Exposes the consumer as `Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
681
682async fn handle_memory_reply(
683 mut resp: CanonicalMessage,
684 index: usize,
685 correlation_ids: &[Option<String>],
686 response_channel: &MemoryResponseChannel,
687) {
688 if !resp.metadata.contains_key("correlation_id") {
689 if let Some(Some(cid)) = correlation_ids.get(index) {
690 resp.metadata
691 .insert("correlation_id".to_string(), cid.clone());
692 }
693 }
694
695 if let Some(cid) = resp.metadata.get("correlation_id") {
696 if let Some(tx) = response_channel.remove_waiter(cid).await {
697 let _ = tx.send(resp);
698 return;
699 }
700 }
701 let _ = response_channel.sender.send(resp).await;
702}
703
704#[async_trait]
705impl MessageConsumer for MemoryConsumer {
706 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
707 match self {
708 Self::Queue(q) => q.receive_batch(max_messages).await,
709 Self::Log { consumer, .. } => consumer.receive_batch(max_messages).await,
710 }
711 }
712
713 async fn status(&self) -> EndpointStatus {
714 match self {
715 Self::Queue(q) => q.status().await,
716 Self::Log { consumer, .. } => consumer.status().await,
717 }
718 }
719
720 fn as_any(&self) -> &dyn Any {
721 self
722 }
723}
724
725impl MemoryConsumer {
726 pub fn new_local(topic: &str, capacity: usize) -> Self {
727 Self::new(&MemoryConfig {
728 topic: topic.to_string(),
729 capacity: Some(capacity),
730 ..Default::default()
731 })
732 .expect("Failed to create local memory consumer")
733 }
734 pub fn channel(&self) -> MemoryChannel {
735 let topic = match self {
736 Self::Queue(q) => &q.topic,
737 Self::Log { topic, .. } => topic,
738 };
739 get_or_create_channel(&MemoryConfig {
740 topic: topic.clone(),
741 ..Default::default()
742 })
743 }
744}
745
/// Consumer identified by a subscriber id, wrapping a `MemoryConsumer`.
pub struct MemorySubscriber {
    /// The consumer that actually receives messages.
    consumer: MemoryConsumer,
}
749
750impl MemorySubscriber {
751 pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
752 let mut sub_config = config.clone();
753 let consumer = if config.subscribe_mode {
756 let store = get_or_create_event_store(&config.topic);
757 MemoryConsumer::Log {
758 consumer: store.consumer(id.to_string()),
759 topic: config.topic.clone(),
760 }
761 } else {
762 sub_config.topic = format!("{}-{}", config.topic, id);
763 MemoryConsumer::new(&sub_config)?
764 };
765 Ok(Self { consumer })
766 }
767}
768
769#[async_trait]
770impl MessageConsumer for MemorySubscriber {
771 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
772 self.consumer.receive_batch(max_messages).await
773 }
774
775 async fn receive(&mut self) -> Result<Received, ConsumerError> {
776 self.consumer.receive().await
777 }
778
779 fn as_any(&self) -> &dyn Any {
780 self
781 }
782}
783
// Tests for the in-memory transport. They share the process-wide channel and
// event store registries, so every test uses its own topic name.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Endpoint, Route};
    use crate::traits::Handled;
    use crate::{msg, CanonicalMessage};
    use serde_json::json;
    use tokio::time::sleep;

    /// A message published to a queue topic is received by a consumer on the
    /// same topic, and the channel is empty afterwards.
    #[tokio::test]
    async fn test_memory_channel_integration() {
        let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
        let publisher = MemoryPublisher::new_local("test-mem1", 10);

        let msg = msg!(json!({"hello": "memory"}));

        publisher.send(msg.clone()).await.unwrap();

        sleep(std::time::Duration::from_millis(10)).await;
        let received = consumer.receive().await.unwrap();
        let _ = (received.commit)(MessageDisposition::Ack).await;
        assert_eq!(received.message.payload, msg.payload);
        assert_eq!(consumer.channel().len(), 0);
    }

    /// Messages sent as a batch and individually arrive in publish order,
    /// across differently sized receive calls.
    #[tokio::test]
    async fn test_memory_publisher_and_consumer_integration() {
        let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
        let publisher = MemoryPublisher::new_local("test-mem2", 10);

        let msg1 = msg!(json!({"message": "one"}));
        let msg2 = msg!(json!({"message": "two"}));
        let msg3 = msg!(json!({"message": "three"}));

        publisher
            .send_batch(vec![msg1.clone(), msg2.clone()])
            .await
            .unwrap();
        publisher.send(msg3.clone()).await.unwrap();

        // The channel counts batches: one two-message batch plus one single-message batch.
        assert_eq!(publisher.channel().len(), 2);

        let received1 = consumer.receive().await.unwrap();
        let _ = (received1.commit)(MessageDisposition::Ack).await;
        assert_eq!(received1.message.payload, msg1.payload);

        // The second message comes from the consumer's internal buffer.
        let batch2 = consumer.receive_batch(1).await.unwrap();
        let (received_msg2, commit2) = (batch2.messages, batch2.commit);
        let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
        assert_eq!(received_msg2.len(), 1);
        assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
        let batch3 = consumer.receive_batch(2).await.unwrap();
        let (received_msg3, commit3) = (batch3.messages, batch3.commit);
        let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
        assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);

        assert_eq!(publisher.channel().len(), 0);

    }

    /// In queue mode a subscriber consumes from the topic `"<topic>-<id>"`.
    #[tokio::test]
    async fn test_memory_subscriber_structure() {
        let cfg = MemoryConfig {
            topic: "base_topic".to_string(),
            capacity: Some(10),
            ..Default::default()
        };
        let subscriber_id = "sub1";
        let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();

        // Publish directly to the derived per-subscriber topic.
        let pub_cfg = MemoryConfig {
            topic: format!("base_topic-{}", subscriber_id),
            capacity: Some(10),
            ..Default::default()
        };
        let publisher = MemoryPublisher::new(&pub_cfg).unwrap();

        publisher.send("hello subscriber".into()).await.unwrap();

        let received = subscriber.receive().await.unwrap();
        assert_eq!(received.message.get_payload_str(), "hello subscriber");
    }

    /// A request/reply publisher gets back the response produced by a route's handler.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_memory_request_reply_mode() {
        let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
        let input_endpoint = Endpoint::new_memory(&topic, 10);
        let output_endpoint = Endpoint::new_response();
        let handler = |mut msg: CanonicalMessage| async move {
            let request_payload = msg.get_payload_str();
            let response_payload = format!("reply to {}", request_payload);
            msg.set_payload_str(response_payload);
            Ok(Handled::Publish(msg))
        };

        let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
        route.deploy("mem_rr_test").await.unwrap();

        let publisher = MemoryPublisher::new(&MemoryConfig {
            topic: topic.clone(),
            capacity: Some(10),
            request_reply: true,
            request_timeout_ms: Some(2000),
            ..Default::default()
        })
        .unwrap();

        let result = publisher.send("direct request".into()).await.unwrap();

        if let Sent::Response(response_msg) = result {
            assert_eq!(response_msg.get_payload_str(), "reply to direct request");
        } else {
            panic!("Expected Sent::Response, got {:?}", result);
        }

        Route::stop("mem_rr_test").await;
    }

    /// A nacked message is delivered again; once acked it is not redelivered.
    #[tokio::test]
    async fn test_memory_nack_requeue() {
        let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
        let config = MemoryConfig {
            topic: topic.clone(),
            capacity: Some(10),
            enable_nack: true,
            ..Default::default()
        };
        let mut consumer = MemoryConsumer::new(&config).unwrap();
        let publisher = MemoryPublisher::new_local(&topic, 10);

        publisher.send("to_be_nacked".into()).await.unwrap();

        let received1 = consumer.receive().await.unwrap();
        assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
        (received1.commit)(crate::traits::MessageDisposition::Nack)
            .await
            .unwrap();

        let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
            .await
            .expect("Timed out waiting for re-queued message")
            .unwrap();
        assert_eq!(received2.message.get_payload_str(), "to_be_nacked");

        (received2.commit)(crate::traits::MessageDisposition::Ack)
            .await
            .unwrap();

        // After the ack nothing should be left, so the receive must time out.
        let result =
            tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
        assert!(result.is_err(), "Channel should be empty");
    }

    /// In subscriber (event store) mode every subscriber receives each published event.
    #[tokio::test]
    async fn test_memory_event_store_integration() {
        let topic = "event_store_test";
        let pub_config = MemoryConfig {
            topic: topic.to_string(),
            subscribe_mode: true,
            ..Default::default()
        };
        let publisher = MemoryPublisher::new(&pub_config).unwrap();

        let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
        let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();

        publisher.send("event1".into()).await.unwrap();

        let msg1 = sub1.receive().await.unwrap();
        assert_eq!(msg1.message.get_payload_str(), "event1");
        (msg1.commit)(MessageDisposition::Ack).await.unwrap();

        let msg2 = sub2.receive().await.unwrap();
        assert_eq!(msg2.message.get_payload_str(), "event1");
    }

    /// Events published before any subscriber exists are still delivered to a
    /// subscriber that joins later.
    #[tokio::test]
    async fn test_memory_no_subscribers_persistence() {
        let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
        let pub_config = MemoryConfig {
            topic: topic.clone(),
            subscribe_mode: true,
            ..Default::default()
        };

        let publisher = MemoryPublisher::new(&pub_config).unwrap();

        publisher.send("msg1".into()).await.unwrap();
        publisher.send("msg2".into()).await.unwrap();

        let sub_config = MemoryConfig {
            topic: topic.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();

        let received1 = subscriber.receive().await.unwrap();
        assert_eq!(received1.message.get_payload_str(), "msg1");
        (received1.commit)(MessageDisposition::Ack).await.unwrap();

        let received2 = subscriber.receive().await.unwrap();
        assert_eq!(received2.message.get_payload_str(), "msg2");
        (received2.commit)(MessageDisposition::Ack).await.unwrap();
    }

    /// A consumer cannot attach in one mode to a topic already active in the other mode.
    #[tokio::test]
    async fn test_memory_mixed_mode_error() {
        let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
        let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());

        // Case 1: the topic is active as a queue, a log-mode consumer must fail.
        let _pub_q = MemoryPublisher::new_local(&topic_q, 10);
        let log_conf = MemoryConfig {
            topic: topic_q.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let err = MemoryConsumer::new(&log_conf);
        assert!(err.is_err());
        assert!(err
            .unwrap_err()
            .to_string()
            .contains("already active as a Queue"));

        // Case 2: the topic is active as a log, a queue-mode consumer must fail.
        let log_pub_conf = MemoryConfig {
            topic: topic_l.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap();
        let queue_conf = MemoryConfig {
            topic: topic_l.clone(),
            subscribe_mode: false,
            ..Default::default()
        };
        let err = MemoryConsumer::new(&queue_conf);
        assert!(err.is_err());
        assert!(err
            .unwrap_err()
            .to_string()
            .contains("already active as a Subscriber Log"));
    }

    /// A log-mode publisher cannot be created for a topic already active as a queue.
    #[tokio::test]
    async fn test_memory_publisher_mixed_mode_error() {
        let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());

        let _cons_q = MemoryConsumer::new_local(&topic_q, 10);

        let log_conf = MemoryConfig {
            topic: topic_q.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let err = MemoryPublisher::new(&log_conf);
        assert!(err.is_err());
        assert!(err
            .unwrap_err()
            .to_string()
            .contains("already active as a Queue"));
    }

    /// A publisher configured for queue mode adapts to log mode when the
    /// topic already has an event store.
    #[tokio::test]
    async fn test_memory_publisher_adaptive_behavior() {
        let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());

        let sub_config = MemoryConfig {
            topic: topic.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();

        let pub_config = MemoryConfig {
            topic: topic.clone(),
            subscribe_mode: false,
            ..Default::default()
        };
        let publisher = MemoryPublisher::new(&pub_config).unwrap();

        publisher.send("adaptive_msg".into()).await.unwrap();

        let received = subscriber.receive().await.unwrap();
        assert_eq!(received.message.get_payload_str(), "adaptive_msg");
    }
}