appletheia_application/command/
default_command_worker.rs1use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
2
3use crate::command::{Command, CommandDispatcher, CommandHandler, CommandSelector, CommandWorker};
4use crate::messaging::Subscription;
5use crate::outbox::command::{CommandEnvelope, CommandEnvelopeError};
6use crate::request_context::RequestContext;
7use crate::{Consumer, ConsumerGroup, Delivery, Topic};
8
9use super::CommandWorkerError;
10
11pub struct DefaultCommandWorker<H, D, T> {
12 dispatcher: D,
13 handler: H,
14 topic: T,
15 consumer_group: ConsumerGroup,
16 stop_requested: AtomicBool,
17}
18
19impl<H, D, T> DefaultCommandWorker<H, D, T> {
20 pub fn new(dispatcher: D, handler: H, topic: T, consumer_group: ConsumerGroup) -> Self {
21 Self {
22 dispatcher,
23 handler,
24 topic,
25 consumer_group,
26 stop_requested: AtomicBool::new(false),
27 }
28 }
29}
30
31impl<H, D, T> CommandWorker for DefaultCommandWorker<H, D, T>
32where
33 H: CommandHandler,
34 H::Command: Command,
35 D: CommandDispatcher<Uow = H::Uow>,
36 T: Topic<CommandEnvelope, Selector = CommandSelector>,
37 T::Consumer: Consumer<CommandEnvelope>,
38 <T::Consumer as Consumer<CommandEnvelope>>::Delivery: Delivery<CommandEnvelope>,
39{
40 fn is_stop_requested(&self) -> bool {
41 self.stop_requested.load(AtomicOrdering::SeqCst)
42 }
43
44 fn request_graceful_stop(&mut self) {
45 self.stop_requested.store(true, AtomicOrdering::SeqCst);
46 }
47
48 async fn run_forever(&mut self) -> Result<(), CommandWorkerError> {
49 let selectors = [CommandSelector::new(H::Command::NAME)];
50
51 let mut consumer = self
52 .topic
53 .subscribe(&self.consumer_group, Subscription::Only(&selectors))
54 .await?;
55
56 while !self.is_stop_requested() {
57 let mut delivery = consumer.next().await?;
58 let envelope = delivery.message();
59
60 let command = match envelope.try_into_command::<H::Command>() {
61 Ok(command) => command,
62 Err(CommandEnvelopeError::CommandNameMismatch { .. }) => {
63 delivery.ack().await?;
64 continue;
65 }
66 Err(error) => {
67 delivery.nack().await?;
68 return Err(error.into());
69 }
70 };
71
72 let request_context = RequestContext {
73 correlation_id: envelope.correlation_id,
74 message_id: envelope.message_id,
75 };
76
77 let result = self
78 .dispatcher
79 .dispatch(&self.handler, &request_context, command)
80 .await;
81
82 match result {
83 Ok(_) => delivery.ack().await?,
84 Err(error) => {
85 delivery.nack().await?;
86 return Err(CommandWorkerError::Dispatch(Box::new(error)));
87 }
88 }
89 }
90
91 Ok(())
92 }
93}