Skip to main content

appletheia_application/command/
default_command_worker.rs

1use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
2
3use crate::command::{Command, CommandDispatcher, CommandHandler, CommandSelector, CommandWorker};
4use crate::outbox::command::{CommandEnvelope, CommandEnvelopeError};
5use crate::request_context::RequestContext;
6use crate::{Consumer, ConsumerGroup, Delivery, Topic};
7
8use super::CommandWorkerError;
9
10pub struct DefaultCommandWorker<H, D, T> {
11    dispatcher: D,
12    handler: H,
13    topic: T,
14    consumer_group: ConsumerGroup,
15    stop_requested: AtomicBool,
16}
17
18impl<H, D, T> DefaultCommandWorker<H, D, T> {
19    pub fn new(dispatcher: D, handler: H, topic: T, consumer_group: ConsumerGroup) -> Self {
20        Self {
21            dispatcher,
22            handler,
23            topic,
24            consumer_group,
25            stop_requested: AtomicBool::new(false),
26        }
27    }
28}
29
30impl<H, D, T> CommandWorker for DefaultCommandWorker<H, D, T>
31where
32    H: CommandHandler,
33    H::Command: Command,
34    D: CommandDispatcher<Uow = H::Uow>,
35    T: Topic<CommandEnvelope, Selector = CommandSelector>,
36    T::Consumer: Consumer<CommandEnvelope>,
37    <T::Consumer as Consumer<CommandEnvelope>>::Delivery: Delivery<CommandEnvelope>,
38{
39    fn is_stop_requested(&self) -> bool {
40        self.stop_requested.load(AtomicOrdering::SeqCst)
41    }
42
43    fn request_graceful_stop(&mut self) {
44        self.stop_requested.store(true, AtomicOrdering::SeqCst);
45    }
46
47    async fn run_forever(&mut self) -> Result<(), CommandWorkerError> {
48        let selectors = [CommandSelector::new(H::Command::NAME)];
49
50        let mut consumer = self
51            .topic
52            .subscribe(&self.consumer_group, &selectors)
53            .await?;
54
55        while !self.is_stop_requested() {
56            let mut delivery = consumer.next().await?;
57            let envelope = delivery.message();
58
59            let command = match envelope.try_into_command::<H::Command>() {
60                Ok(command) => command,
61                Err(CommandEnvelopeError::CommandNameMismatch { .. }) => {
62                    delivery.ack().await?;
63                    continue;
64                }
65                Err(error) => {
66                    delivery.nack().await?;
67                    return Err(error.into());
68                }
69            };
70
71            let request_context = RequestContext {
72                correlation_id: envelope.correlation_id,
73                message_id: envelope.message_id,
74            };
75
76            let result = self
77                .dispatcher
78                .dispatch(&self.handler, &request_context, command)
79                .await;
80
81            match result {
82                Ok(_) => delivery.ack().await?,
83                Err(error) => {
84                    delivery.nack().await?;
85                    return Err(CommandWorkerError::Dispatch(Box::new(error)));
86                }
87            }
88        }
89
90        Ok(())
91    }
92}