1use crate::canonical_message::tracing_support::LazyMessageIds;
6use crate::event_store::{
7 event_store_exists, get_or_create_event_store, EventStore, EventStoreConsumer,
8};
9use crate::models::MemoryConfig;
10use crate::traits::{
11 BatchCommitFunc, BoxFuture, ConsumerError, MessageConsumer, MessageDisposition,
12 MessagePublisher, PublisherError, Received, ReceivedBatch, Sent, SentBatch,
13};
14use crate::CanonicalMessage;
15use anyhow::anyhow;
16use async_channel::{bounded, Receiver, Sender};
17use async_trait::async_trait;
18use once_cell::sync::Lazy;
19use std::any::Any;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22use tokio::sync::oneshot;
23use tracing::{info, trace};
24
25static RUNTIME_MEMORY_CHANNELS: Lazy<Mutex<HashMap<String, MemoryChannel>>> =
28 Lazy::new(|| Mutex::new(HashMap::new()));
29
30static RUNTIME_RESPONSE_CHANNELS: Lazy<Mutex<HashMap<String, MemoryResponseChannel>>> =
32 Lazy::new(|| Mutex::new(HashMap::new()));
33
34#[derive(Debug, Clone)]
39pub struct MemoryChannel {
40 pub sender: Sender<Vec<CanonicalMessage>>,
41 pub receiver: Receiver<Vec<CanonicalMessage>>,
42}
43
44impl MemoryChannel {
45 pub fn new(capacity: usize) -> Self {
47 let (sender, receiver) = bounded(capacity);
48 Self { sender, receiver }
49 }
50
51 pub async fn send_message(&self, message: CanonicalMessage) -> anyhow::Result<()> {
53 self.sender.send(vec![message]).await?;
54 tracing::debug!("Message sent to memory {} channel", self.sender.len());
55 Ok(())
56 }
57
58 pub async fn fill_messages(&self, messages: Vec<CanonicalMessage>) -> anyhow::Result<()> {
60 self.sender
62 .send(messages)
63 .await
64 .map_err(|e| anyhow!("Memory channel was closed while filling messages: {}", e))?;
65 Ok(())
66 }
67
68 pub fn close(&self) {
70 self.sender.close();
71 }
72
73 pub fn drain_messages(&self) -> Vec<CanonicalMessage> {
75 let mut messages = Vec::new();
76 while let Ok(batch) = self.receiver.try_recv() {
78 messages.extend(batch);
79 }
80 messages
81 }
82
83 pub fn len(&self) -> usize {
85 self.receiver.len()
86 }
87
88 pub fn is_empty(&self) -> bool {
90 self.receiver.is_empty()
91 }
92}
93
94#[derive(Debug, Clone)]
96pub struct MemoryResponseChannel {
97 pub sender: Sender<CanonicalMessage>,
98 pub receiver: Receiver<CanonicalMessage>,
99 waiters: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<CanonicalMessage>>>>,
100}
101
102impl MemoryResponseChannel {
103 pub fn new(capacity: usize) -> Self {
104 let (sender, receiver) = bounded(capacity);
105 Self {
106 sender,
107 receiver,
108 waiters: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
109 }
110 }
111
112 pub fn close(&self) {
113 self.sender.close();
114 }
115
116 pub fn len(&self) -> usize {
117 self.receiver.len()
118 }
119
120 pub fn is_empty(&self) -> bool {
121 self.receiver.is_empty()
122 }
123
124 pub async fn wait_for_response(&self) -> anyhow::Result<CanonicalMessage> {
125 self.receiver
126 .recv()
127 .await
128 .map_err(|e| anyhow!("Error receiving response: {}", e))
129 }
130
131 pub async fn register_waiter(
132 &self,
133 correlation_id: &str,
134 sender: oneshot::Sender<CanonicalMessage>,
135 ) -> anyhow::Result<()> {
136 let mut waiters = self.waiters.lock().await;
137 if waiters.contains_key(correlation_id) {
138 return Err(anyhow!(
139 "Correlation ID {} already registered",
140 correlation_id
141 ));
142 }
143 waiters.insert(correlation_id.to_string(), sender);
144 Ok(())
145 }
146
147 pub async fn remove_waiter(
148 &self,
149 correlation_id: &str,
150 ) -> Option<oneshot::Sender<CanonicalMessage>> {
151 self.waiters.lock().await.remove(correlation_id)
152 }
153}
154
155pub fn get_or_create_channel(config: &MemoryConfig) -> MemoryChannel {
157 let mut channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
158 channels
159 .entry(config.topic.clone()) .or_insert_with(|| {
161 info!(topic = %config.topic, "Creating new runtime memory channel");
162 MemoryChannel::new(config.capacity.unwrap_or(100))
163 })
164 .clone()
165}
166
167pub fn get_or_create_response_channel(topic: &str) -> MemoryResponseChannel {
169 let mut channels = RUNTIME_RESPONSE_CHANNELS.lock().unwrap();
170 channels
171 .entry(topic.to_string())
172 .or_insert_with(|| {
173 info!(topic = %topic, "Creating new runtime memory response channel");
174 MemoryResponseChannel::new(100)
175 })
176 .clone()
177}
178
179fn memory_channel_exists(topic: &str) -> bool {
180 let channels = RUNTIME_MEMORY_CHANNELS.lock().unwrap();
181 channels.contains_key(topic)
182}
183
184#[derive(Debug, Clone)]
186pub struct MemoryPublisher {
187 topic: String,
188 backend: PublisherBackend,
189 request_reply: bool,
190 request_timeout: std::time::Duration,
191}
192
193#[derive(Debug, Clone)]
194enum PublisherBackend {
195 Queue(Sender<Vec<CanonicalMessage>>),
196 Log(Arc<EventStore>),
197}
198
199impl MemoryPublisher {
200 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
201 let channel_exists = memory_channel_exists(&config.topic);
202 let store_exists = event_store_exists(&config.topic);
203
204 let backend = if config.subscribe_mode {
205 if channel_exists {
206 return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
207 }
208 let store = get_or_create_event_store(&config.topic);
209 PublisherBackend::Log(store)
210 } else if store_exists {
211 tracing::debug!(topic = %config.topic, "Adapting publisher to Log mode due to existing EventStore");
214 let store = get_or_create_event_store(&config.topic);
215 PublisherBackend::Log(store)
216 } else {
217 let channel = get_or_create_channel(config);
218 PublisherBackend::Queue(channel.sender)
219 };
220
221 Ok(Self {
222 topic: config.topic.clone(),
223 backend,
224 request_reply: config.request_reply,
225 request_timeout: std::time::Duration::from_millis(
226 config.request_timeout_ms.unwrap_or(30000),
227 ),
228 })
229 }
230
231 pub fn new_local(topic: &str, capacity: usize) -> Self {
236 Self::new(&MemoryConfig {
237 topic: topic.to_string(),
238 capacity: Some(capacity),
239 ..Default::default()
240 })
241 .expect("Failed to create local memory publisher")
242 }
243
244 pub fn channel(&self) -> MemoryChannel {
247 get_or_create_channel(&MemoryConfig {
248 topic: self.topic.clone(),
249 capacity: None,
250 ..Default::default()
251 })
252 }
253}
254
255#[async_trait]
256impl MessagePublisher for MemoryPublisher {
257 async fn send(&self, mut message: CanonicalMessage) -> Result<Sent, PublisherError> {
258 match &self.backend {
259 PublisherBackend::Log(store) => {
260 store.append(message).await;
261 Ok(Sent::Ack)
262 }
263 PublisherBackend::Queue(sender) => {
264 if self.request_reply {
265 let cid = message
266 .metadata
267 .entry("correlation_id".to_string())
268 .or_insert_with(fast_uuid_v7::gen_id_string)
269 .clone();
270
271 let (tx, rx) = oneshot::channel();
272
273 let response_channel = get_or_create_response_channel(&self.topic);
275 response_channel
276 .register_waiter(&cid, tx)
277 .await
278 .map_err(PublisherError::NonRetryable)?;
279
280 if let Err(e) = sender.send(vec![message]).await {
283 response_channel.remove_waiter(&cid).await;
284 return Err(anyhow!("Failed to send to memory channel: {}", e).into());
285 }
286
287 let response = match tokio::time::timeout(self.request_timeout, rx).await {
289 Ok(Ok(resp)) => resp,
290 Ok(Err(e)) => {
291 response_channel.remove_waiter(&cid).await;
292 return Err(anyhow!(
293 "Failed to receive response for correlation_id {}: {}",
294 cid,
295 e
296 )
297 .into());
298 }
299 Err(_) => {
300 response_channel.remove_waiter(&cid).await;
301 return Err(PublisherError::Retryable(anyhow!(
302 "Request timed out waiting for response for correlation_id {}",
303 cid
304 )));
305 }
306 };
307
308 Ok(Sent::Response(response))
309 } else {
310 self.send_batch(vec![message]).await?;
311 Ok(Sent::Ack)
312 }
313 }
314 }
315 }
316
317 async fn send_batch(
318 &self,
319 messages: Vec<CanonicalMessage>,
320 ) -> Result<SentBatch, PublisherError> {
321 match &self.backend {
322 PublisherBackend::Log(store) => {
323 trace!(
324 topic = %self.topic,
325 message_ids = ?LazyMessageIds(&messages),
326 "Appending batch to event store"
327 );
328 store.append_batch(messages).await;
329 Ok(SentBatch::Ack)
330 }
331 PublisherBackend::Queue(sender) => {
332 trace!(
333 topic = %self.topic,
334 message_ids = ?LazyMessageIds(&messages),
335 "Sending batch to memory channel. Current batch count: {}",
336 sender.len()
337 );
338 sender
339 .send(messages)
340 .await
341 .map_err(|e| anyhow!("Failed to send to memory channel: {}", e))?;
342 Ok(SentBatch::Ack)
343 }
344 }
345 }
346
347 fn as_any(&self) -> &dyn Any {
348 self
349 }
350}
351
352#[derive(Debug)]
354pub struct MemoryQueueConsumer {
355 topic: String,
356 receiver: Receiver<Vec<CanonicalMessage>>,
357 buffer: Vec<CanonicalMessage>,
359 enable_nack: bool,
360}
361
362#[derive(Debug)]
364pub enum MemoryConsumer {
365 Queue(MemoryQueueConsumer),
366 Log {
367 consumer: EventStoreConsumer,
368 topic: String,
369 },
370}
371
372impl MemoryConsumer {
373 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
374 let channel_exists = memory_channel_exists(&config.topic);
375 let store_exists = event_store_exists(&config.topic);
376
377 if config.subscribe_mode {
378 if channel_exists {
379 return Err(anyhow!("Topic '{}' is already active as a Queue (MemoryChannel), but Subscriber mode (EventStore) was requested.", config.topic));
380 }
381 let store = get_or_create_event_store(&config.topic);
382 let subscriber_id = format!("{}-consumer", config.topic);
386 info!(topic = %config.topic, subscriber_id = %subscriber_id, "Memory consumer (Log mode) connected");
387 let consumer = store.consumer(subscriber_id);
388 Ok(Self::Log {
389 consumer,
390 topic: config.topic.clone(),
391 })
392 } else {
393 if store_exists {
394 return Err(anyhow!("Topic '{}' is already active as a Subscriber Log (EventStore), but Queue mode (MemoryChannel) was requested.", config.topic));
399 }
400 let queue = MemoryQueueConsumer::new(config)?;
401 Ok(Self::Queue(queue))
402 }
403 }
404}
405
406impl MemoryQueueConsumer {
407 pub fn new(config: &MemoryConfig) -> anyhow::Result<Self> {
408 let channel = get_or_create_channel(config);
409 let buffer = if let Some(capacity) = config.capacity {
410 Vec::with_capacity(capacity)
411 } else {
412 Vec::new()
413 };
414 Ok(Self {
415 topic: config.topic.clone(),
416 receiver: channel.receiver.clone(),
417 buffer,
418 enable_nack: config.enable_nack,
419 })
420 }
421
422 async fn get_buffered_msgs(
423 &mut self,
424 max_messages: usize,
425 ) -> Result<Vec<CanonicalMessage>, ConsumerError> {
426 if self.buffer.is_empty() {
428 self.buffer = match self.receiver.recv().await {
430 Ok(batch) => batch,
431 Err(_) => return Err(ConsumerError::EndOfStream),
432 };
433 self.buffer.reverse();
435 }
436
437 let num_to_take = self.buffer.len().min(max_messages);
439 let split_at = self.buffer.len() - num_to_take;
440
441 let mut messages = self.buffer.split_off(split_at);
444 messages.reverse(); Ok(messages)
446 }
447}
448
449#[async_trait]
450impl MessageConsumer for MemoryQueueConsumer {
451 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
452 let mut messages = self.get_buffered_msgs(max_messages).await?;
455 while messages.len() < max_messages / 2 {
456 if let Ok(mut next_batch) = self.receiver.try_recv() {
457 if next_batch.len() + messages.len() > max_messages {
458 let needed = max_messages - messages.len();
459 let mut to_buffer = next_batch.split_off(needed);
460 messages.append(&mut next_batch);
461 self.buffer.append(&mut to_buffer);
462 self.buffer.reverse();
463 break;
464 } else {
465 messages.append(&mut next_batch);
466 }
467 } else {
468 break;
469 }
470 }
471 trace!(count = messages.len(), topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of memory messages");
472 if messages.is_empty() {
473 return Ok(ReceivedBatch {
474 messages: Vec::new(),
475 commit: Box::new(|_| {
476 Box::pin(async move { Ok(()) }) as BoxFuture<'static, anyhow::Result<()>>
477 }),
478 });
479 }
480
481 let topic = self.topic.clone();
482 let expected_count = messages.len();
483 let correlation_ids: Vec<Option<String>> = messages
484 .iter()
485 .map(|m| m.metadata.get("correlation_id").cloned())
486 .collect();
487
488 let messages_for_retry = if self.enable_nack {
493 Some(messages.clone())
494 } else {
495 None
496 };
497 let commit = Box::new(move |dispositions: Vec<MessageDisposition>| {
498 Box::pin(async move {
499 if dispositions.len() != expected_count {
500 return Err(anyhow::anyhow!(
501 "Memory batch commit received mismatched disposition count: expected {}, got {}",
502 expected_count,
503 dispositions.len()
504 ));
505 }
506 let response_channel = get_or_create_response_channel(&topic);
507 let mut to_requeue = Vec::new();
508
509 for (i, disposition) in dispositions.into_iter().enumerate() {
510 match disposition {
511 MessageDisposition::Reply(mut resp) => {
512 if !resp.metadata.contains_key("correlation_id") {
513 if let Some(Some(cid)) = correlation_ids.get(i) {
514 resp.metadata
515 .insert("correlation_id".to_string(), cid.clone());
516 }
517 }
518
519 let mut handled = false;
521 if let Some(cid) = resp.metadata.get("correlation_id") {
522 if let Some(tx) = response_channel.remove_waiter(cid).await {
523 let _ = tx.send(resp.clone());
524 handled = true;
525 }
526 }
527 if !handled {
528 let _ = response_channel.sender.send(resp).await;
529 }
530 }
531 MessageDisposition::Nack => {
532 if let Some(msgs) = &messages_for_retry {
534 if let Some(msg) = msgs.get(i) {
535 to_requeue.push(msg.clone());
536 }
537 }
538 }
539 MessageDisposition::Ack => {}
540 }
541 }
542
543 if !to_requeue.is_empty() {
544 let main_channel = get_or_create_channel(&MemoryConfig {
545 topic: topic.to_string(),
546 capacity: None,
547 ..Default::default()
548 });
549 if main_channel.sender.send(to_requeue).await.is_err() {
550 tracing::error!("Failed to re-queue NACKed messages to memory channel as it was closed.");
551 }
552 }
553 Ok(())
554 }) as BoxFuture<'static, anyhow::Result<()>>
555 }) as BatchCommitFunc;
556 Ok(ReceivedBatch { messages, commit })
557 }
558
559 fn as_any(&self) -> &dyn Any {
560 self
561 }
562}
563
564#[async_trait]
565impl MessageConsumer for MemoryConsumer {
566 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
567 match self {
568 Self::Queue(q) => q.receive_batch(max_messages).await,
569 Self::Log { consumer, .. } => consumer.receive_batch(max_messages).await,
570 }
571 }
572
573 fn as_any(&self) -> &dyn Any {
574 self
575 }
576}
577
578impl MemoryConsumer {
579 pub fn new_local(topic: &str, capacity: usize) -> Self {
580 Self::new(&MemoryConfig {
581 topic: topic.to_string(),
582 capacity: Some(capacity),
583 ..Default::default()
584 })
585 .expect("Failed to create local memory consumer")
586 }
587 pub fn channel(&self) -> MemoryChannel {
588 let topic = match self {
589 Self::Queue(q) => &q.topic,
590 Self::Log { topic, .. } => topic,
591 };
592 get_or_create_channel(&MemoryConfig {
593 topic: topic.clone(),
594 ..Default::default()
595 })
596 }
597}
598
599pub struct MemorySubscriber {
600 consumer: MemoryConsumer,
601}
602
603impl MemorySubscriber {
604 pub fn new(config: &MemoryConfig, id: &str) -> anyhow::Result<Self> {
605 let mut sub_config = config.clone();
606 let consumer = if config.subscribe_mode {
609 let store = get_or_create_event_store(&config.topic);
610 MemoryConsumer::Log {
611 consumer: store.consumer(id.to_string()),
612 topic: config.topic.clone(),
613 }
614 } else {
615 sub_config.topic = format!("{}-{}", config.topic, id);
616 MemoryConsumer::new(&sub_config)?
617 };
618 Ok(Self { consumer })
619 }
620}
621
622#[async_trait]
623impl MessageConsumer for MemorySubscriber {
624 async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
625 self.consumer.receive_batch(max_messages).await
626 }
627
628 async fn receive(&mut self) -> Result<Received, ConsumerError> {
629 self.consumer.receive().await
630 }
631
632 fn as_any(&self) -> &dyn Any {
633 self
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use super::*;
640 use crate::models::{Endpoint, Route};
641 use crate::traits::Handled;
642 use crate::{msg, CanonicalMessage};
643 use serde_json::json;
644 use tokio::time::sleep;
645
646 #[tokio::test]
647 async fn test_memory_channel_integration() {
648 let mut consumer = MemoryConsumer::new_local("test-mem1", 10);
649 let publisher = MemoryPublisher::new_local("test-mem1", 10);
650
651 let msg = msg!(json!({"hello": "memory"}));
652
653 publisher.send(msg.clone()).await.unwrap();
655
656 sleep(std::time::Duration::from_millis(10)).await;
657 let received = consumer.receive().await.unwrap();
659 let _ = (received.commit)(MessageDisposition::Ack).await;
660 assert_eq!(received.message.payload, msg.payload);
661 assert_eq!(consumer.channel().len(), 0);
662 }
663
664 #[tokio::test]
665 async fn test_memory_publisher_and_consumer_integration() {
666 let mut consumer = MemoryConsumer::new_local("test-mem2", 10);
667 let publisher = MemoryPublisher::new_local("test-mem2", 10);
668
669 let msg1 = msg!(json!({"message": "one"}));
670 let msg2 = msg!(json!({"message": "two"}));
671 let msg3 = msg!(json!({"message": "three"}));
672
673 publisher
675 .send_batch(vec![msg1.clone(), msg2.clone()])
676 .await
677 .unwrap();
678 publisher.send(msg3.clone()).await.unwrap();
679
680 assert_eq!(publisher.channel().len(), 2);
682
683 let received1 = consumer.receive().await.unwrap();
685 let _ = (received1.commit)(MessageDisposition::Ack).await;
686 assert_eq!(received1.message.payload, msg1.payload);
687
688 let batch2 = consumer.receive_batch(1).await.unwrap();
689 let (received_msg2, commit2) = (batch2.messages, batch2.commit);
690 let _ = commit2(vec![MessageDisposition::Ack; received_msg2.len()]).await;
691 assert_eq!(received_msg2.len(), 1);
692 assert_eq!(received_msg2.first().unwrap().payload, msg2.payload);
693 let batch3 = consumer.receive_batch(2).await.unwrap();
694 let (received_msg3, commit3) = (batch3.messages, batch3.commit);
695 let _ = commit3(vec![MessageDisposition::Ack; received_msg3.len()]).await;
696 assert_eq!(received_msg3.first().unwrap().payload, msg3.payload);
697
698 assert_eq!(publisher.channel().len(), 0);
700
701 }
705
706 #[tokio::test]
707 async fn test_memory_subscriber_structure() {
708 let cfg = MemoryConfig {
709 topic: "base_topic".to_string(),
710 capacity: Some(10),
711 ..Default::default()
712 };
713 let subscriber_id = "sub1";
714 let mut subscriber = MemorySubscriber::new(&cfg, subscriber_id).unwrap();
715
716 let pub_cfg = MemoryConfig {
719 topic: format!("base_topic-{}", subscriber_id),
720 capacity: Some(10),
721 ..Default::default()
722 };
723 let publisher = MemoryPublisher::new(&pub_cfg).unwrap();
724
725 publisher.send("hello subscriber".into()).await.unwrap();
726
727 let received = subscriber.receive().await.unwrap();
728 assert_eq!(received.message.get_payload_str(), "hello subscriber");
729 }
730
731 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
732 async fn test_memory_request_reply_mode() {
733 let topic = format!("mem_rr_topic_{}", fast_uuid_v7::gen_id_str());
734 let input_endpoint = Endpoint::new_memory(&topic, 10);
735 let output_endpoint = Endpoint::new_response();
736 let handler = |mut msg: CanonicalMessage| async move {
737 let request_payload = msg.get_payload_str();
738 let response_payload = format!("reply to {}", request_payload);
739 msg.set_payload_str(response_payload);
740 Ok(Handled::Publish(msg))
741 };
742
743 let route = Route::new(input_endpoint, output_endpoint).with_handler(handler);
744 route.deploy("mem_rr_test").await.unwrap();
745
746 let publisher = MemoryPublisher::new(&MemoryConfig {
748 topic: topic.clone(),
749 capacity: Some(10),
750 request_reply: true,
751 request_timeout_ms: Some(2000),
752 ..Default::default()
753 })
754 .unwrap();
755
756 let result = publisher.send("direct request".into()).await.unwrap();
757
758 if let Sent::Response(response_msg) = result {
759 assert_eq!(response_msg.get_payload_str(), "reply to direct request");
760 } else {
761 panic!("Expected Sent::Response, got {:?}", result);
762 }
763
764 Route::stop("mem_rr_test").await;
766 }
767
768 #[tokio::test]
769 async fn test_memory_nack_requeue() {
770 let topic = format!("test_nack_requeue_{}", fast_uuid_v7::gen_id_str());
771 let config = MemoryConfig {
772 topic: topic.clone(),
773 capacity: Some(10),
774 enable_nack: true,
775 ..Default::default()
776 };
777 let mut consumer = MemoryConsumer::new(&config).unwrap();
778 let publisher = MemoryPublisher::new_local(&topic, 10);
779
780 publisher.send("to_be_nacked".into()).await.unwrap();
781
782 let received1 = consumer.receive().await.unwrap();
784 assert_eq!(received1.message.get_payload_str(), "to_be_nacked");
785 (received1.commit)(crate::traits::MessageDisposition::Nack)
786 .await
787 .unwrap();
788
789 let received2 = tokio::time::timeout(std::time::Duration::from_secs(1), consumer.receive())
791 .await
792 .expect("Timed out waiting for re-queued message")
793 .unwrap();
794 assert_eq!(received2.message.get_payload_str(), "to_be_nacked");
795
796 (received2.commit)(crate::traits::MessageDisposition::Ack)
798 .await
799 .unwrap();
800
801 let result =
803 tokio::time::timeout(std::time::Duration::from_millis(100), consumer.receive()).await;
804 assert!(result.is_err(), "Channel should be empty");
805 }
806
807 #[tokio::test]
808 async fn test_memory_event_store_integration() {
809 let topic = "event_store_test";
810 let pub_config = MemoryConfig {
812 topic: topic.to_string(),
813 subscribe_mode: true,
814 ..Default::default()
815 };
816 let publisher = MemoryPublisher::new(&pub_config).unwrap();
817
818 let mut sub1 = MemorySubscriber::new(&pub_config, "sub1").unwrap();
820 let mut sub2 = MemorySubscriber::new(&pub_config, "sub2").unwrap();
822
823 publisher.send("event1".into()).await.unwrap();
824
825 let msg1 = sub1.receive().await.unwrap();
826 assert_eq!(msg1.message.get_payload_str(), "event1");
827 (msg1.commit)(MessageDisposition::Ack).await.unwrap();
828
829 let msg2 = sub2.receive().await.unwrap();
830 assert_eq!(msg2.message.get_payload_str(), "event1");
831 }
832
833 #[tokio::test]
834 async fn test_memory_no_subscribers_persistence() {
835 let topic = format!("no_subs_{}", fast_uuid_v7::gen_id_str());
836 let pub_config = MemoryConfig {
837 topic: topic.clone(),
838 subscribe_mode: true,
839 ..Default::default()
840 };
841
842 let publisher = MemoryPublisher::new(&pub_config).unwrap();
844
845 publisher.send("msg1".into()).await.unwrap();
847 publisher.send("msg2".into()).await.unwrap();
848
849 let sub_config = MemoryConfig {
851 topic: topic.clone(),
852 subscribe_mode: true,
853 ..Default::default()
854 };
855 let mut subscriber = MemorySubscriber::new(&sub_config, "late_sub").unwrap();
856
857 let received1 = subscriber.receive().await.unwrap();
859 assert_eq!(received1.message.get_payload_str(), "msg1");
860 (received1.commit)(MessageDisposition::Ack).await.unwrap();
861
862 let received2 = subscriber.receive().await.unwrap();
863 assert_eq!(received2.message.get_payload_str(), "msg2");
864 (received2.commit)(MessageDisposition::Ack).await.unwrap();
865 }
866
867 #[tokio::test]
868 async fn test_memory_mixed_mode_error() {
869 let topic_q = format!("mixed_q_{}", fast_uuid_v7::gen_id_str());
870 let topic_l = format!("mixed_l_{}", fast_uuid_v7::gen_id_str());
871
872 let _pub_q = MemoryPublisher::new_local(&topic_q, 10); let log_conf = MemoryConfig {
876 topic: topic_q.clone(),
877 subscribe_mode: true,
878 ..Default::default()
879 };
880 let err = MemoryConsumer::new(&log_conf);
881 assert!(err.is_err());
882 assert!(err
883 .unwrap_err()
884 .to_string()
885 .contains("already active as a Queue"));
886
887 let log_pub_conf = MemoryConfig {
889 topic: topic_l.clone(),
890 subscribe_mode: true,
891 ..Default::default()
892 };
893 let _pub_l = MemoryPublisher::new(&log_pub_conf).unwrap(); let queue_conf = MemoryConfig {
896 topic: topic_l.clone(),
897 subscribe_mode: false,
898 ..Default::default()
899 };
900 let err = MemoryConsumer::new(&queue_conf);
901 assert!(err.is_err());
902 assert!(err
903 .unwrap_err()
904 .to_string()
905 .contains("already active as a Subscriber Log"));
906 }
907
908 #[tokio::test]
909 async fn test_memory_publisher_mixed_mode_error() {
910 let topic_q = format!("pub_mixed_q_{}", fast_uuid_v7::gen_id_str());
911
912 let _cons_q = MemoryConsumer::new_local(&topic_q, 10);
914
915 let log_conf = MemoryConfig {
917 topic: topic_q.clone(),
918 subscribe_mode: true,
919 ..Default::default()
920 };
921 let err = MemoryPublisher::new(&log_conf);
922 assert!(err.is_err());
923 assert!(err
924 .unwrap_err()
925 .to_string()
926 .contains("already active as a Queue"));
927 }
928
929 #[tokio::test]
930 async fn test_memory_publisher_adaptive_behavior() {
931 let topic = format!("adaptive_{}", fast_uuid_v7::gen_id_str());
932
933 let sub_config = MemoryConfig {
935 topic: topic.clone(),
936 subscribe_mode: true,
937 ..Default::default()
938 };
939 let mut subscriber = MemorySubscriber::new(&sub_config, "sub1").unwrap();
940
941 let pub_config = MemoryConfig {
943 topic: topic.clone(),
944 subscribe_mode: false, ..Default::default()
946 };
947 let publisher = MemoryPublisher::new(&pub_config).unwrap();
949
950 publisher.send("adaptive_msg".into()).await.unwrap();
952
953 let received = subscriber.receive().await.unwrap();
954 assert_eq!(received.message.get_payload_str(), "adaptive_msg");
955 }
956}