Skip to main content

appletheia_application/command/
default_command_worker.rs

1use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
2
3use crate::command::{Command, CommandDispatcher, CommandHandler, CommandSelector, CommandWorker};
4use crate::messaging::Subscription;
5use crate::outbox::command::{CommandEnvelope, CommandEnvelopeError};
6use crate::request_context::RequestContext;
7use crate::{Consumer, ConsumerGroup, Delivery, Topic};
8
9use super::CommandWorkerError;
10
11pub struct DefaultCommandWorker<H, D, T> {
12    dispatcher: D,
13    handler: H,
14    topic: T,
15    consumer_group: ConsumerGroup,
16    stop_requested: AtomicBool,
17}
18
19impl<H, D, T> DefaultCommandWorker<H, D, T> {
20    pub fn new(dispatcher: D, handler: H, topic: T, consumer_group: ConsumerGroup) -> Self {
21        Self {
22            dispatcher,
23            handler,
24            topic,
25            consumer_group,
26            stop_requested: AtomicBool::new(false),
27        }
28    }
29}
30
31impl<H, D, T> CommandWorker for DefaultCommandWorker<H, D, T>
32where
33    H: CommandHandler,
34    H::Command: Command,
35    D: CommandDispatcher<Uow = H::Uow>,
36    T: Topic<CommandEnvelope, Selector = CommandSelector>,
37    T::Consumer: Consumer<CommandEnvelope>,
38    <T::Consumer as Consumer<CommandEnvelope>>::Delivery: Delivery<CommandEnvelope>,
39{
40    fn is_stop_requested(&self) -> bool {
41        self.stop_requested.load(AtomicOrdering::SeqCst)
42    }
43
44    fn request_graceful_stop(&mut self) {
45        self.stop_requested.store(true, AtomicOrdering::SeqCst);
46    }
47
48    async fn run_forever(&mut self) -> Result<(), CommandWorkerError> {
49        let selectors = [CommandSelector::new(H::Command::NAME)];
50
51        let mut consumer = self
52            .topic
53            .subscribe(&self.consumer_group, Subscription::Only(&selectors))
54            .await?;
55
56        while !self.is_stop_requested() {
57            let mut delivery = consumer.next().await?;
58            let envelope = delivery.message();
59
60            let command = match envelope.try_into_command::<H::Command>() {
61                Ok(command) => command,
62                Err(CommandEnvelopeError::CommandNameMismatch { .. }) => {
63                    delivery.ack().await?;
64                    continue;
65                }
66                Err(error) => {
67                    delivery.nack().await?;
68                    return Err(error.into());
69                }
70            };
71
72            let request_context = RequestContext {
73                correlation_id: envelope.correlation_id,
74                message_id: envelope.message_id,
75            };
76
77            let result = self
78                .dispatcher
79                .dispatch(&self.handler, &request_context, command)
80                .await;
81
82            match result {
83                Ok(_) => delivery.ack().await?,
84                Err(error) => {
85                    delivery.nack().await?;
86                    return Err(CommandWorkerError::Dispatch(Box::new(error)));
87                }
88            }
89        }
90
91        Ok(())
92    }
93}