appletheia_application/command/
default_command_worker.rs1use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
2
3use crate::command::{Command, CommandDispatcher, CommandHandler, CommandSelector, CommandWorker};
4use crate::outbox::command::{CommandEnvelope, CommandEnvelopeError};
5use crate::request_context::RequestContext;
6use crate::{Consumer, ConsumerGroup, Delivery, Topic};
7
8use super::CommandWorkerError;
9
10pub struct DefaultCommandWorker<H, D, T> {
11 dispatcher: D,
12 handler: H,
13 topic: T,
14 consumer_group: ConsumerGroup,
15 stop_requested: AtomicBool,
16}
17
18impl<H, D, T> DefaultCommandWorker<H, D, T> {
19 pub fn new(dispatcher: D, handler: H, topic: T, consumer_group: ConsumerGroup) -> Self {
20 Self {
21 dispatcher,
22 handler,
23 topic,
24 consumer_group,
25 stop_requested: AtomicBool::new(false),
26 }
27 }
28}
29
30impl<H, D, T> CommandWorker for DefaultCommandWorker<H, D, T>
31where
32 H: CommandHandler,
33 H::Command: Command,
34 D: CommandDispatcher<Uow = H::Uow>,
35 T: Topic<CommandEnvelope, Selector = CommandSelector>,
36 T::Consumer: Consumer<CommandEnvelope>,
37 <T::Consumer as Consumer<CommandEnvelope>>::Delivery: Delivery<CommandEnvelope>,
38{
39 fn is_stop_requested(&self) -> bool {
40 self.stop_requested.load(AtomicOrdering::SeqCst)
41 }
42
43 fn request_graceful_stop(&mut self) {
44 self.stop_requested.store(true, AtomicOrdering::SeqCst);
45 }
46
47 async fn run_forever(&mut self) -> Result<(), CommandWorkerError> {
48 let selectors = [CommandSelector::new(H::Command::NAME)];
49
50 let mut consumer = self
51 .topic
52 .subscribe(&self.consumer_group, &selectors)
53 .await?;
54
55 while !self.is_stop_requested() {
56 let mut delivery = consumer.next().await?;
57 let envelope = delivery.message();
58
59 let command = match envelope.try_into_command::<H::Command>() {
60 Ok(command) => command,
61 Err(CommandEnvelopeError::CommandNameMismatch { .. }) => {
62 delivery.ack().await?;
63 continue;
64 }
65 Err(error) => {
66 delivery.nack().await?;
67 return Err(error.into());
68 }
69 };
70
71 let request_context = RequestContext {
72 correlation_id: envelope.correlation_id,
73 message_id: envelope.message_id,
74 };
75
76 let result = self
77 .dispatcher
78 .dispatch(&self.handler, &request_context, command)
79 .await;
80
81 match result {
82 Ok(_) => delivery.ack().await?,
83 Err(error) => {
84 delivery.nack().await?;
85 return Err(CommandWorkerError::Dispatch(Box::new(error)));
86 }
87 }
88 }
89
90 Ok(())
91 }
92}