1use crate::canonical_message::tracing_support::LazyMessageIds;
6use crate::event_store::{
7 event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
8};
9use crate::models::MemoryConfig;
10use crate::traits::{
11 BatchCommitFunc, BoxFuture, ConsumerError, MessageConsumer, MessageDisposition,
12 MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
13};
14use crate::CanonicalMessage;
15use anyhow::anyhow;
16use async_channel::{bounded, Receiver, Sender};
17use async_trait::async_trait;
18use once_cell::sync::Lazy;
19use std::any::Any;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::oneshot;
23use tracing::{info, trace};
24
/// Process-wide registry of queue-mode channels, keyed by topic name.
/// Publishers and consumers for the same topic share one `MemoryChannel`.
static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

/// Process-wide registry of reply channels for request/reply flows, keyed by
/// topic name.
static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
33
/// A bounded in-memory channel carrying batches of messages for one topic.
/// Clones share the same underlying async-channel endpoints.
#[derive(Debug, Clone)]
pub struct MemoryChannel {
    /// Producer side; each item is a `Vec<CanonicalMessage>` batch.
    pub sender: Sender<Vec<CanonicalMessage>>,
    /// Consumer side; yields the batches pushed through `sender`.
    pub receiver: Receiver<Vec<CanonicalMessage>>,
}
43
44impl MemoryChannel {
45 pub fn new(capacity: usize) -> Self {
47 let (sender, receiver) = bounded(capacity);
48 Self { sender, receiver }
49 }
50
51 pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
53 self.sender.send(vec![message]).await?;
54 tracing::debug!("Message sent to memory {} channel", self.sender.len());
55 Ok(())
56 }
57
58 pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
60 self.sender
62 .send(messages)
63 .await
64 .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
65 Ok(())
66 }
67
68 pub fn close(&self) {
70 self.sender.close();
71 }
72
73 pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
75 let mut messages = Vec::new();
76 while let Ok(batch) = self.receiver.try_recv() {
78 messages.extend(batch);
79 }
80 messages
81 }
82
83 pub fn len(&self) -> usize {
85 self.receiver.len()
86 }
87
88 pub fn is_empty(&self) -> bool {
90 self.receiver.is_empty()
91 }
92}
93
/// Channel carrying replies for a request/reply topic, plus a table of
/// one-shot waiters keyed by correlation id for targeted reply delivery.
#[derive(Debug, Clone)]
pub struct MemoryResponseChannel {
    /// Fallback sink for replies that have no registered waiter.
    pub sender: Sender<CanonicalMessage>,
    /// Receiver for replies delivered through `sender`.
    pub receiver: Receiver<CanonicalMessage>,
    // Pending waiters; a tokio Mutex because it is locked in async contexts.
    waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
}
101
102impl MemoryResponseChannel {
103 pub fn new(capacity: usize) -> Self {
104 let (sender, receiver) = bounded(capacity);
105 Self {
106 sender,
107 receiver,
108 waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
109 }
110 }
111
112 pub fn close(&self) {
113 self.sender.close();
114 }
115
116 pub fn len(&self) -> usize {
117 self.receiver.len()
118 }
119
120 pub fn is_empty(&self) -> bool {
121 self.receiver.is_empty()
122 }
123
124 pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
125 self.receiver
126 .recv()
127 .await
128 .map_err(|e| anyhow!("Error receiving response: {}", e))
129 }
130
131 pub async fn register_waiter(
132 &self,
133 correlation_id: &str,
134 sender: oneshot::Sender<CanonicalMessage>,
135 ) -> anyhow::Result<()> {
136 let mut waiters = self.waiters.lock().await;
137 if waiters.contains_key(correlation_id) {
138 return Err(anyhow!(
139 "Correlation ID {} already registered",
140 correlation_id
141 ));
142 }
143 waiters.insert(correlation_id.to_string(), sender);
144 Ok(())
145 }
146
147 pub async fn remove_waiter(
148 &self,
149 correlation_id: &str,
150 ) -> Option<oneshot::Sender<CanonicalMessage>> {
151 self.waiters.lock().await.remove(correlation_id)
152 }
153}
154
155pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
157 let mut channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
158 channels
159 .entry(config.topic.clone()) .or_insert_with(|| {
161 info!(topic = %config.topic, "Creating new runtime memory channel");
162 MemoryChannel::new(config.capacity.unwrap_or(100))
163 })
164 .clone()
165}
166
167pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
169 let mut channels = RUNTIME_RESPONSE_CHANNELS.lock().unwrap();
170 channels
171 .entry(topic.to_string())
172 .or_insert_with(|| {
173 info!(topic = %topic, "Creating new runtime memory response channel");
174 MemoryResponseChannel::new(100)
175 })
176 .clone()
177}
178
179fn memory_channel_exists(topic: &str) -> bool {
180 let channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
181 channels.contains_key(topic)
182}
183
/// Publisher over an in-memory topic. Depending on how the topic is set up it
/// writes either to a bounded queue channel or to an append-only event store.
#[derive(Debug, Clone)]
pub struct MemoryPublisher {
    // Topic name used for channel / event-store lookups.
    topic: String,
    // Queue (channel sender) or Log (event store) destination.
    backend: PublisherBackend,
    // When true, `send` waits for a correlated reply instead of fire-and-forget.
    request_reply: bool,
    // How long `send` waits for a reply in request/reply mode.
    request_timeout: std::time::Duration,
}
192
/// Destination for published messages: a bounded queue channel, or a shared
/// event store that fans out to subscribers.
#[derive(Debug, Clone)]
enum PublisherBackend {
    Queue(Sender<Vec<CanonicalMessage>>),
    Log(Arc<EventStore>),
}
198
199impl MemoryPublisher {
200 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
201 let channel_exists = memory_channel_exists(&config.topic);
202 let store_exists = event_store_exists(&config.topic);
203
204 let backend = if config.subscribe_mode {
205 if channel_exists {
206 return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
207 }
208 let store = get_or_create_event_store(&config.topic);
209 PublisherBackend::Log(store)
210 } else if store_exists {
211 tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
214 let store = get_or_create_event_store(&config.topic);
215 PublisherBackend::Log(store)
216 } else {
217 let channel = get_or_create_channel(config);
218 PublisherBackend::Queue(channel.sender)
219 };
220
221 Ok(Self {
222 topic: config.topic.clone(),
223 backend,
224 request_reply: config.request_reply,
225 request_timeout: std::time::Duration::from_millis(
226 config.request_timeout_ms.unwrap_or(30000),
227 ),
228 })
229 }
230
231 pub fn new_local(topic: &str, capacity: usize) -> Self {
236 Self::new(&MemoryConfig {
237 topic: topic.to_string(),
238 capacity: Some(capacity),
239 ..Default::default()
240 })
241 .expect("Failed to create local memory publisher")
242 }
243
244 pub fn channel(&self) -> MemoryChannel {
247 get_or_create_channel(&MemoryConfig {
248 topic: self.topic.clone(),
249 capacity: None,
250 ..Default::default()
251 })
252 }
253}
254
#[async_trait]
impl MessagePublisher for MemoryPublisher {
    /// Publishes one message.
    ///
    /// Log backend: append to the event store and ack. Queue backend: either a
    /// plain batch send, or — in request/reply mode — register a one-shot
    /// waiter keyed by the message's correlation id, send, and await the reply
    /// subject to `request_timeout`.
    async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
        match &self.backend {
            PublisherBackend::Log(store) => {
                store.append(message).await;
                Ok(Sent::Ack)
            }
            PublisherBackend::Queue(sender) => {
                if self.request_reply {
                    // Reuse the caller-supplied correlation_id, or mint one.
                    let cid = message
                        .metadata
                        .entry("correlation_id".to_string())
                        .or_insert_with(fast_uuid_v7::gen_id_string)
                        .clone();

                    let (tx, rx) = oneshot::channel();

                    // Register the waiter BEFORE sending so a fast reply
                    // cannot race past the registration.
                    let response_channel = get_or_create_response_channel(&self.topic);
                    response_channel
                        .register_waiter(&cid, tx)
                        .await
                        .map_err(PublisherError::NonRetryable)?;

                    if let Err(e) = sender.send(vec![message]).await {
                        // Send failed: drop the now-orphaned waiter entry.
                        response_channel.remove_waiter(&cid).await;
                        return Err(anyhow!("Failed to send to memory channel: {}", e).into());
                    }

                    let response = match tokio::time::timeout(self.request_timeout, rx).await {
                        Ok(Ok(resp)) => resp,
                        Ok(Err(e)) => {
                            // Reply sender was dropped; clean up the stale entry.
                            response_channel.remove_waiter(&cid).await;
                            return Err(anyhow!(
                                "Failed to receive response for correlation_id {}: {}",
                                cid,
                                e
                            )
                            .into());
                        }
                        Err(_) => {
                            // Timed out: remove the waiter and report retryable.
                            response_channel.remove_waiter(&cid).await;
                            return Err(PublisherError::Retryable(anyhow!(
                                "Request timed out waiting for response for correlation_id {}",
                                cid
                            )));
                        }
                    };

                    Ok(Sent::Response(response))
                } else {
                    // Fire-and-forget: delegate to the batch path.
                    self.send_batch(vec![message]).await?;
                    Ok(Sent::Ack)
                }
            }
        }
    }

    /// Publishes a batch: append to the event store (Log) or push the whole
    /// Vec as one channel item (Queue).
    async fn send_batch(
        &self,
        messages: Vec<CanonicalMessage>,
    ) -> Result<SentBatch, PublisherError> {
        match &self.backend {
            PublisherBackend::Log(store) => {
                trace!(
                    topic = %self.topic,
                    message_ids = ?LazyMessageIds(&messages),
                    "Appending batch to event store"
                );
                store.append_batch(messages).await;
                Ok(SentBatch::Ack)
            }
            PublisherBackend::Queue(sender) => {
                trace!(
                    topic = %self.topic,
                    message_ids = ?LazyMessageIds(&messages),
                    "Sending batch to memory channel. Current batch count: {}",
                    sender.len()
                );
                sender
                    .send(messages)
                    .await
                    .map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
                Ok(SentBatch::Ack)
            }
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
351
/// Queue-mode consumer: pulls message batches from a topic's shared channel.
#[derive(Debug)]
pub struct MemoryQueueConsumer {
    // Topic name, used for logging and requeue/response-channel lookups.
    topic: String,
    // Receiving half of the shared MemoryChannel.
    receiver: Receiver<Vec<CanonicalMessage>>,
    // Messages held back from an earlier receive, stored in REVERSE order
    // (the next message to deliver sits at the end of the Vec).
    buffer: Vec<CanonicalMessage>,
    // When true, the batch is cloned up front so NACKed messages can be
    // re-queued at commit time.
    enable_nack: bool,
}
361
/// A consumer over an in-memory topic: either a queue consumer (each message
/// delivered once) or an event-store consumer (each subscriber has a cursor).
#[derive(Debug)]
pub enum MemoryConsumer {
    Queue(MemoryQueueConsumer),
    Log {
        consumer: EventStoreConsumer,
        topic: String,
    },
}
371
372impl MemoryConsumer {
373 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
374 let channel_exists = memory_channel_exists(&config.topic);
375 let store_exists = event_store_exists(&config.topic);
376
377 if config.subscribe_mode {
378 if channel_exists {
379 return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
380 }
381 let store = get_or_create_event_store(&config.topic);
382 let subscriber_id = format!("{}-consumer", config.topic);
386 info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
387 let consumer = store.consumer(subscriber_id);
388 Ok(Self::Log {
389 consumer,
390 topic: config.topic.clone(),
391 })
392 } else {
393 if store_exists {
394 return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
399 }
400 let queue = MemoryQueueConsumer::new(config)?;
401 Ok(Self::Queue(queue))
402 }
403 }
404}
405
406impl MemoryQueueConsumer {
407 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
408 let channel = get_or_create_channel(config);
409 Ok(Self {
410 topic: config.topic.clone(),
411 receiver: channel.receiver.clone(),
412 buffer: Vec::new(),
413 enable_nack: config.enable_nack,
414 })
415 }
416
417 async fn get_buffered_msgs(
418 &mut self,
419 max_messages: usize,
420 ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
421 if self.buffer.is_empty() {
423 self.buffer = match self.receiver.recv().await {
425 Ok(batch) => batch,
426 Err(_) => return Err(ConsumerError::EndOfStream),
427 };
428 self.buffer.reverse();
430 }
431
432 let num_to_take = self.buffer.len().min(max_messages);
434 let split_at = self.buffer.len() - num_to_take;
435
436 let mut messages = self.buffer.split_off(split_at);
439 messages.reverse(); Ok(messages)
441 }
442}
443
#[async_trait]
impl MessageConsumer for MemoryQueueConsumer {
    /// Receives up to `max_messages` messages plus a commit closure that must
    /// be called with exactly one disposition per message.
    ///
    /// The commit closure routes `Reply` dispositions to the topic's response
    /// channel (to a registered waiter when the correlation id matches, else
    /// the shared reply queue) and, when NACK support is enabled, re-queues
    /// `Nack`ed messages onto the main channel.
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        let mut messages = self.get_buffered_msgs(max_messages).await?;
        // Opportunistically top up (non-blocking) while under half the cap.
        while messages.len() < max_messages / 2 {
            if let Ok(mut next_batch) = self.receiver.try_recv() {
                if next_batch.len() + messages.len() > max_messages {
                    // Take only what fits; stash the remainder in the
                    // (reversed) buffer for the next call.
                    let needed = max_messages - messages.len();
                    let mut to_buffer = next_batch.split_off(needed);
                    messages.append(&mut next_batch);
                    self.buffer.append(&mut to_buffer);
                    self.buffer.reverse();
                    break;
                } else {
                    messages.append(&mut next_batch);
                }
            } else {
                break;
            }
        }
        trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
        if messages.is_empty() {
            // Nothing to deliver: return a no-op commit closure.
            return Ok(ReceivedBatch {
                messages: Vec::new(),
                commit: Box::new(|_| {
                    Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
                }),
            });
        }

        let topic = self.topic.clone();
        let expected_count = messages.len();
        // Capture correlation ids so replies can be matched to requests even
        // if the handler forgets to copy the id onto its reply.
        let correlation_ids: Vec<Option<String>> = messages
            .iter()
            .map(|m| m.metadata.get("correlation_id").cloned())
            .collect();

        // Only clone the whole batch when NACK re-queueing may need originals.
        let messages_for_retry = if self.enable_nack {
            Some(messages.clone())
        } else {
            None
        };
        let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
            Box::pin(async move {
                if dispositions.len() != expected_count {
                    return Err(anyhow::anyhow!(
                        "Memory batch commit received mismatched disposition count: expected {}, got {}",
                        expected_count,
                        dispositions.len()
                    ));
                }
                let response_channel = get_or_create_response_channel(&topic);
                let mut to_requeue = Vec::new();

                for (i, disposition) in dispositions.into_iter().enumerate() {
                    match disposition {
                        MessageDisposition::Reply(mut resp) => {
                            // Backfill the request's correlation id when the
                            // reply does not carry one.
                            if !resp.metadata.contains_key("correlation_id") {
                                if let Some(Some(cid)) = correlation_ids.get(i) {
                                    resp.metadata
                                        .insert("correlation_id".to_string(), cid.clone());
                                }
                            }

                            // Prefer a registered one-shot waiter; otherwise
                            // fall back to the shared reply queue.
                            let mut handled = false;
                            if let Some(cid) = resp.metadata.get("correlation_id") {
                                if let Some(tx) = response_channel.remove_waiter(cid).await {
                                    let _ = tx.send(resp.clone());
                                    handled = true;
                                }
                            }
                            if !handled {
                                let _ = response_channel.sender.send(resp).await;
                            }
                        }
                        MessageDisposition::Nack => {
                            // Only possible when enable_nack kept the clones.
                            if let Some(msgs) = &messages_for_retry {
                                if let Some(msg) = msgs.get(i) {
                                    to_requeue.push(msg.clone());
                                }
                            }
                        }
                        MessageDisposition::Ack => {}
                    }
                }

                if !to_requeue.is_empty() {
                    // Push NACKed messages back onto the main topic channel.
                    let main_channel = get_or_create_channel(&MemoryConfig {
                        topic: topic.to_string(),
                        capacity: None,
                        ..Default::default()
                    });
                    if main_channel.sender.send(to_requeue).await.is_err() {
                        tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
                    }
                }
                Ok(())
            }) as BoxFuture<'static, anyhow::Result<()>>
        }) as BatchCommitFunc;
        Ok(ReceivedBatch { messages, commit })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
558
559#[async_trait]
560impl MessageConsumer for MemoryConsumer {
561 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
562 match self {
563 Self::Queue(q) => q.receive_batch(max_messages).await,
564 Self::Log { consumer, .. } => consumer.receive_batch(max_messages).await,
565 }
566 }
567
568 fn as_any(&self) -> &dyn Any {
569 self
570 }
571}
572
573impl MemoryConsumer {
574 pub fn new_local(topic: &str, capacity: usize) -> Self {
575 Self::new(&MemoryConfig {
576 topic: topic.to_string(),
577 capacity: Some(capacity),
578 ..Default::default()
579 })
580 .expect("Failed to create local memory consumer")
581 }
582 pub fn channel(&self) -> MemoryChannel {
583 let topic = match self {
584 Self::Queue(q) => &q.topic,
585 Self::Log { topic, .. } => topic,
586 };
587 get_or_create_channel(&MemoryConfig {
588 topic: topic.clone(),
589 ..Default::default()
590 })
591 }
592}
593
/// A named subscriber over a memory topic. In Log mode each id gets its own
/// cursor on the shared event store; in Queue mode each id gets a dedicated
/// `"{topic}-{id}"` channel.
pub struct MemorySubscriber {
    // Underlying consumer this subscriber delegates to.
    consumer: MemoryConsumer,
}
597
598impl MemorySubscriber {
599 pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
600 let mut sub_config = config.clone();
601 let consumer = if config.subscribe_mode {
604 let store = get_or_create_event_store(&config.topic);
605 MemoryConsumer::Log {
606 consumer: store.consumer(id.to_string()),
607 topic: config.topic.clone(),
608 }
609 } else {
610 sub_config.topic = format!("{}-{}", config.topic, id);
611 MemoryConsumer::new(&sub_config)?
612 };
613 Ok(Self { consumer })
614 }
615}
616
617#[async_trait]
618impl MessageConsumer for MemorySubscriber {
619 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
620 self.consumer.receive_batch(max_messages).await
621 }
622
623 async fn receive(&mut self) -> Result<Received, ConsumerError> {
624 self.consumer.receive().await
625 }
626
627 fn as_any(&self) -> &dyn Any {
628 self
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use super::*;
635 use crate::models::{Endpoint, Route};
636 use crate::traits::Handled;
637 use crate::{msg, CanonicalMessage};
638 use serde_json::json;
639 use tokio::time::sleep;
640
    #[tokio::test]
    async fn test_memory_channel_integration() {
        // Queue-mode round trip: one publish, one receive; ack empties the channel.
        let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
        let publisher = MemoryPublisher::new_local("test-mem1", 10);

        let msg = msg!(json!({"hello": "memory"}));

        publisher.send(msg.clone()).await.unwrap();

        // Brief pause so the batch is observable on the channel.
        sleep(std::time::Duration::from_millis(10)).await;
        let received = consumer.receive().await.unwrap();
        let _ = (received.commit)(MessageDisposition::Ack).await;
        assert_eq!(received.message.payload, msg.payload);
        assert_eq!(consumer.channel().len(), 0);
    }
658
    #[tokio::test]
    async fn test_memory_publisher_and_consumer_integration() {
        // Mixed single/batch sends are drained in FIFO order across receives.
        let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
        let publisher = MemoryPublisher::new_local("test-mem2", 10);

        let msg1 = msg!(json!({"message": "one"}));
        let msg2 = msg!(json!({"message": "two"}));
        let msg3 = msg!(json!({"message": "three"}));

        publisher
            .send_batch(vec![msg1.clone(), msg2.clone()])
            .await
            .unwrap();
        publisher.send(msg3.clone()).await.unwrap();

        // Two channel items buffered: the 2-message batch and the single send.
        assert_eq!(publisher.channel().len(), 2);

        let received1 = consumer.receive().await.unwrap();
        let _ = (received1.commit)(MessageDisposition::Ack).await;
        assert_eq!(received1.message.payload, msg1.payload);

        let batch2 = consumer.receive_batch(1).await.unwrap();
        let (received_msg2, commit2) = (batch2.messages, batch2.commit);
        let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
        assert_eq!(received_msg2.len(), 1);
        assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
        let batch3 = consumer.receive_batch(2).await.unwrap();
        let (received_msg3, commit3) = (batch3.messages, batch3.commit);
        let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
        assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);

        assert_eq!(publisher.channel().len(), 0);

    }
700
    #[tokio::test]
    async fn test_memory_subscriber_structure() {
        // Queue-mode subscribers listen on a derived "{topic}-{id}" channel.
        let cfg = MemoryConfig {
            topic: "base_topic".to_string(),
            capacity: Some(10),
            ..Default::default()
        };
        let subscriber_id = "sub1";
        let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();

        // Publish directly onto the derived per-subscriber topic.
        let pub_cfg = MemoryConfig {
            topic: format!("base_topic-{}", subscriber_id),
            capacity: Some(10),
            ..Default::default()
        };
        let publisher = MemoryPublisher::new(&pub_cfg).unwrap();

        publisher.send("hello subscriber".into()).await.unwrap();

        let received = subscriber.receive().await.unwrap();
        assert_eq!(received.message.get_payload_str(), "hello subscriber");
    }
725
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_memory_request_reply_mode() {
        // End-to-end request/reply: a deployed route handles the request and
        // the publisher's send() resolves to Sent::Response.
        let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
        let input_endpoint = Endpoint::new_memory(&topic, 10);
        let output_endpoint = Endpoint::new_response();
        let handler = |mut msg: CanonicalMessage| async move {
            let request_payload = msg.get_payload_str();
            let response_payload = format!("reply to {}", request_payload);
            msg.set_payload_str(response_payload);
            Ok(Handled::Publish(msg))
        };

        let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
        route.deploy("mem_rr_test").await.unwrap();

        let publisher = MemoryPublisher::new(&MemoryConfig {
            topic: topic.clone(),
            capacity: Some(10),
            request_reply: true,
            request_timeout_ms: Some(2000),
            ..Default::default()
        })
        .unwrap();

        let result = publisher.send("direct request".into()).await.unwrap();

        if let Sent::Response(response_msg) = result {
            assert_eq!(response_msg.get_payload_str(), "reply to direct request");
        } else {
            panic!("Expected Sent::Response, got {:?}", result);
        }

        Route::stop("mem_rr_test").await;
    }
762
    #[tokio::test]
    async fn test_memory_nack_requeue() {
        // With enable_nack, a NACKed message is re-queued and delivered again.
        let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
        let config = MemoryConfig {
            topic: topic.clone(),
            capacity: Some(10),
            enable_nack: true,
            ..Default::default()
        };
        let mut consumer = MemoryConsumer::new(&config).unwrap();
        let publisher = MemoryPublisher::new_local(&topic, 10);

        publisher.send("to_be_nacked".into()).await.unwrap();

        let received1 = consumer.receive().await.unwrap();
        assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
        (received1.commit)(crate::traits::MessageDisposition::Nack)
            .await
            .unwrap();

        // The NACKed message must come around again.
        let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
            .await
            .expect("Timed out waiting for re-queued message")
            .unwrap();
        assert_eq!(received2.message.get_payload_str(), "to_be_nacked");

        (received2.commit)(crate::traits::MessageDisposition::Ack)
            .await
            .unwrap();

        // After the ACK nothing further should arrive.
        let result =
            tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
        assert!(result.is_err(), "Channel should be empty");
    }
801
    #[tokio::test]
    async fn test_memory_event_store_integration() {
        // Log mode fans out: every subscriber sees every event.
        let topic = "event_store_test";
        let pub_config = MemoryConfig {
            topic: topic.to_string(),
            subscribe_mode: true,
            ..Default::default()
        };
        let publisher = MemoryPublisher::new(&pub_config).unwrap();

        let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
        let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();

        publisher.send("event1".into()).await.unwrap();

        let msg1 = sub1.receive().await.unwrap();
        assert_eq!(msg1.message.get_payload_str(), "event1");
        (msg1.commit)(MessageDisposition::Ack).await.unwrap();

        let msg2 = sub2.receive().await.unwrap();
        assert_eq!(msg2.message.get_payload_str(), "event1");
    }
827
    #[tokio::test]
    async fn test_memory_no_subscribers_persistence() {
        // Events published before any subscriber exists are retained by the
        // store and replayed, in order, to a late subscriber.
        let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
        let pub_config = MemoryConfig {
            topic: topic.clone(),
            subscribe_mode: true,
            ..Default::default()
        };

        let publisher = MemoryPublisher::new(&pub_config).unwrap();

        publisher.send("msg1".into()).await.unwrap();
        publisher.send("msg2".into()).await.unwrap();

        let sub_config = MemoryConfig {
            topic: topic.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();

        let received1 = subscriber.receive().await.unwrap();
        assert_eq!(received1.message.get_payload_str(), "msg1");
        (received1.commit)(MessageDisposition::Ack).await.unwrap();

        let received2 = subscriber.receive().await.unwrap();
        assert_eq!(received2.message.get_payload_str(), "msg2");
        (received2.commit)(MessageDisposition::Ack).await.unwrap();
    }
861
862 #[tokio::test]
863 async fn test_memory_mixed_mode_error() {
864 let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
865 let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());
866
867 let _pub_q = MemoryPublisher::new_local(&topic_q, 10); let log_conf = MemoryConfig {
871 topic: topic_q.clone(),
872 subscribe_mode: true,
873 ..Default::default()
874 };
875 let err = MemoryConsumer::new(&log_conf);
876 assert!(err.is_err());
877 assert!(err
878 .unwrap_err()
879 .to_string()
880 .contains("already active as a Queue"));
881
882 let log_pub_conf = MemoryConfig {
884 topic: topic_l.clone(),
885 subscribe_mode: true,
886 ..Default::default()
887 };
888 let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap(); let queue_conf = MemoryConfig {
891 topic: topic_l.clone(),
892 subscribe_mode: false,
893 ..Default::default()
894 };
895 let err = MemoryConsumer::new(&queue_conf);
896 assert!(err.is_err());
897 assert!(err
898 .unwrap_err()
899 .to_string()
900 .contains("already active as a Subscriber Log"));
901 }
902
    #[tokio::test]
    async fn test_memory_publisher_mixed_mode_error() {
        // A publisher asking for Log mode on a topic already running as a
        // Queue must fail (only the Queue->Log publisher direction adapts).
        let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());

        let _cons_q = MemoryConsumer::new_local(&topic_q, 10);

        let log_conf = MemoryConfig {
            topic: topic_q.clone(),
            subscribe_mode: true,
            ..Default::default()
        };
        let err = MemoryPublisher::new(&log_conf);
        assert!(err.is_err());
        assert!(err
            .unwrap_err()
            .to_string()
            .contains("already active as a Queue"));
    }
923
924 #[tokio::test]
925 async fn test_memory_publisher_adaptive_behavior() {
926 let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());
927
928 let sub_config = MemoryConfig {
930 topic: topic.clone(),
931 subscribe_mode: true,
932 ..Default::default()
933 };
934 let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();
935
936 let pub_config = MemoryConfig {
938 topic: topic.clone(),
939 subscribe_mode: false, ..Default::default()
941 };
942 let publisher = MemoryPublisher::new(&pub_config).unwrap();
944
945 publisher.send("adaptive_msg".into()).await.unwrap();
947
948 let received = subscriber.receive().await.unwrap();
949 assert_eq!(received.message.get_payload_str(), "adaptive_msg");
950 }
951}