appletheia_application/outbox/
default_outbox_relay.rs1use std::marker::PhantomData;
2use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
3use std::time::Duration as StdDuration;
4
5use chrono::Duration;
6use tokio::time::sleep;
7
8use crate::messaging::{PublishResult, Publisher, Topic};
9use crate::unit_of_work::UnitOfWork;
10use crate::unit_of_work::UnitOfWorkFactory;
11
12use super::{
13 Outbox, OutboxFetcher, OutboxRelay, OutboxRelayConfig, OutboxRelayError, OutboxRelayRunReport,
14 OutboxState, OutboxWriter, ProcessedOutboxCount,
15};
16
17pub struct DefaultOutboxRelay<UowFactory, O, F, W, T>
18where
19 UowFactory: UnitOfWorkFactory,
20 O: Outbox,
21 F: OutboxFetcher<Uow = UowFactory::Uow, Outbox = O>,
22 W: OutboxWriter<Uow = UowFactory::Uow, Outbox = O>,
23 T: Topic<O::Message> + Sync,
24{
25 config: OutboxRelayConfig,
26 topic: T,
27 fetcher: F,
28 writer: W,
29 uow_factory: UowFactory,
30 stop_requested: AtomicBool,
31 _marker: PhantomData<fn() -> O>,
32}
33
34impl<UowFactory, O, F, W, T> DefaultOutboxRelay<UowFactory, O, F, W, T>
35where
36 UowFactory: UnitOfWorkFactory,
37 O: Outbox,
38 F: OutboxFetcher<Uow = UowFactory::Uow, Outbox = O>,
39 W: OutboxWriter<Uow = UowFactory::Uow, Outbox = O>,
40 T: Topic<O::Message> + Sync,
41{
42 pub fn new(
43 config: OutboxRelayConfig,
44 topic: T,
45 fetcher: F,
46 writer: W,
47 uow_factory: UowFactory,
48 ) -> Self {
49 Self {
50 config,
51 topic,
52 fetcher,
53 writer,
54 uow_factory,
55 stop_requested: AtomicBool::new(false),
56 _marker: PhantomData,
57 }
58 }
59}
60
61impl<UowFactory, O, F, W, T> OutboxRelay for DefaultOutboxRelay<UowFactory, O, F, W, T>
62where
63 UowFactory: UnitOfWorkFactory,
64 O: Outbox,
65 F: OutboxFetcher<Uow = UowFactory::Uow, Outbox = O>,
66 W: OutboxWriter<Uow = UowFactory::Uow, Outbox = O>,
67 T: Topic<O::Message> + Sync,
68{
69 type Outbox = O;
70
71 fn is_stop_requested(&self) -> bool {
72 self.stop_requested.load(AtomicOrdering::SeqCst)
73 }
74
75 fn request_graceful_stop(&mut self) {
76 self.stop_requested.store(true, AtomicOrdering::SeqCst);
77 }
78
79 async fn run_forever(&self) -> Result<(), OutboxRelayError> {
80 let polling_options = &self.config.polling_options;
81 let mut poll_interval = polling_options.base;
82
83 while !self.is_stop_requested() {
84 let run_report = self.run_once().await?;
85
86 match run_report {
87 OutboxRelayRunReport::Progress { .. } => {
88 poll_interval = polling_options.base;
89 }
90 OutboxRelayRunReport::Idle | OutboxRelayRunReport::Throttled => {
91 let duration: Duration = poll_interval.into();
92 let sleep_duration = duration
93 .to_std()
94 .unwrap_or_else(|_| StdDuration::from_secs(0));
95
96 if sleep_duration > StdDuration::from_secs(0) {
97 sleep(sleep_duration).await;
98 }
99
100 poll_interval = poll_interval.next(
101 polling_options.multiplier,
102 polling_options.jitter,
103 polling_options.max,
104 );
105 }
106 }
107 }
108
109 Ok(())
110 }
111
112 async fn run_once(&self) -> Result<OutboxRelayRunReport, OutboxRelayError> {
113 let relay_instance = &self.config.instance;
114 let lease_duration = self.config.lease_duration;
115 let batch_size = self.config.batch_size;
116 let retry_options = self.config.retry_options;
117
118 let mut uow = self.uow_factory.begin().await?;
119 let outboxes = self.fetcher.fetch(&mut uow, batch_size).await;
120 let mut outboxes = match outboxes {
121 Ok(mut outboxes) => {
122 if outboxes.is_empty() {
123 uow.commit().await?;
124 return Ok(OutboxRelayRunReport::Idle);
125 }
126
127 for outbox in &mut outboxes {
128 match outbox.state() {
129 OutboxState::Pending { .. } => {
130 outbox.acquire_lease(relay_instance, lease_duration)?;
131 }
132 other => {
133 return Err(uow
134 .rollback_with_operation_error(
135 OutboxRelayError::NonPendingOutboxState(other.clone()),
136 )
137 .await?);
138 }
139 }
140 }
141
142 if let Err(operation_error) = self.writer.write_outbox(&mut uow, &outboxes).await {
143 return Err(uow
144 .rollback_with_operation_error(OutboxRelayError::Writer(operation_error))
145 .await?);
146 }
147
148 uow.commit().await?;
149 outboxes
150 }
151 Err(operation_error) => {
152 return Err(uow
153 .rollback_with_operation_error(OutboxRelayError::Fetcher(operation_error))
154 .await?);
155 }
156 };
157
158 let publisher = self.topic.new_publisher();
159 let publish_results = publisher
160 .publish(outboxes.iter().map(Outbox::message))
161 .await?;
162
163 for publish_result in publish_results {
164 match publish_result {
165 PublishResult::Success { input_index, .. } => {
166 outboxes[input_index].ack()?;
167 }
168 PublishResult::Failed { input_index, cause } => {
169 outboxes[input_index].nack(&cause, &retry_options)?;
170 }
171 }
172 }
173
174 let processed_outbox_count = ProcessedOutboxCount::from_usize_saturating(outboxes.len());
175
176 let mut uow = self.uow_factory.begin().await?;
177 let write_result = self.writer.write_outbox(&mut uow, &outboxes).await;
178 match write_result {
179 Ok(()) => {
180 uow.commit().await?;
181 }
182 Err(operation_error) => {
183 return Err(uow
184 .rollback_with_operation_error(OutboxRelayError::Writer(operation_error))
185 .await?);
186 }
187 }
188
189 Ok(OutboxRelayRunReport::Progress {
190 processed_outbox_count,
191 })
192 }
193}