Skip to main content

appletheia_application/outbox/
default_outbox_relay.rs

1use std::marker::PhantomData;
2use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
3use std::time::Duration as StdDuration;
4
5use chrono::Duration;
6use tokio::time::sleep;
7
8use crate::messaging::{PublishResult, Publisher, Topic};
9use crate::unit_of_work::UnitOfWork;
10use crate::unit_of_work::UnitOfWorkFactory;
11
12use super::{
13    Outbox, OutboxFetcher, OutboxRelay, OutboxRelayConfig, OutboxRelayError, OutboxRelayRunReport,
14    OutboxState, OutboxWriter, ProcessedOutboxCount,
15};
16
17pub struct DefaultOutboxRelay<UowFactory, O, F, W, T>
18where
19    UowFactory: UnitOfWorkFactory,
20    O: Outbox,
21    F: OutboxFetcher<Uow = UowFactory::Uow, Outbox = O>,
22    W: OutboxWriter<Uow = UowFactory::Uow, Outbox = O>,
23    T: Topic<O::Message> + Sync,
24{
25    config: OutboxRelayConfig,
26    topic: T,
27    fetcher: F,
28    writer: W,
29    uow_factory: UowFactory,
30    stop_requested: AtomicBool,
31    _marker: PhantomData<fn() -> O>,
32}
33
34impl<UowFactory, O, F, W, T> DefaultOutboxRelay<UowFactory, O, F, W, T>
35where
36    UowFactory: UnitOfWorkFactory,
37    O: Outbox,
38    F: OutboxFetcher<Uow = UowFactory::Uow, Outbox = O>,
39    W: OutboxWriter<Uow = UowFactory::Uow, Outbox = O>,
40    T: Topic<O::Message> + Sync,
41{
42    pub fn new(
43        config: OutboxRelayConfig,
44        topic: T,
45        fetcher: F,
46        writer: W,
47        uow_factory: UowFactory,
48    ) -> Self {
49        Self {
50            config,
51            topic,
52            fetcher,
53            writer,
54            uow_factory,
55            stop_requested: AtomicBool::new(false),
56            _marker: PhantomData,
57        }
58    }
59}
60
61impl<UowFactory, O, F, W, T> OutboxRelay for DefaultOutboxRelay<UowFactory, O, F, W, T>
62where
63    UowFactory: UnitOfWorkFactory,
64    O: Outbox,
65    F: OutboxFetcher<Uow = UowFactory::Uow, Outbox = O>,
66    W: OutboxWriter<Uow = UowFactory::Uow, Outbox = O>,
67    T: Topic<O::Message> + Sync,
68{
69    type Outbox = O;
70
71    fn is_stop_requested(&self) -> bool {
72        self.stop_requested.load(AtomicOrdering::SeqCst)
73    }
74
75    fn request_graceful_stop(&mut self) {
76        self.stop_requested.store(true, AtomicOrdering::SeqCst);
77    }
78
79    async fn run_forever(&self) -> Result<(), OutboxRelayError> {
80        let polling_options = &self.config.polling_options;
81        let mut poll_interval = polling_options.base;
82
83        while !self.is_stop_requested() {
84            let run_report = self.run_once().await?;
85
86            match run_report {
87                OutboxRelayRunReport::Progress { .. } => {
88                    poll_interval = polling_options.base;
89                }
90                OutboxRelayRunReport::Idle | OutboxRelayRunReport::Throttled => {
91                    let duration: Duration = poll_interval.into();
92                    let sleep_duration = duration
93                        .to_std()
94                        .unwrap_or_else(|_| StdDuration::from_secs(0));
95
96                    if sleep_duration > StdDuration::from_secs(0) {
97                        sleep(sleep_duration).await;
98                    }
99
100                    poll_interval = poll_interval.next(
101                        polling_options.multiplier,
102                        polling_options.jitter,
103                        polling_options.max,
104                    );
105                }
106            }
107        }
108
109        Ok(())
110    }
111
112    async fn run_once(&self) -> Result<OutboxRelayRunReport, OutboxRelayError> {
113        let relay_instance = &self.config.instance;
114        let lease_duration = self.config.lease_duration;
115        let batch_size = self.config.batch_size;
116        let retry_options = self.config.retry_options;
117
118        let mut uow = self.uow_factory.begin().await?;
119        let outboxes = self.fetcher.fetch(&mut uow, batch_size).await;
120        let mut outboxes = match outboxes {
121            Ok(mut outboxes) => {
122                if outboxes.is_empty() {
123                    uow.commit().await?;
124                    return Ok(OutboxRelayRunReport::Idle);
125                }
126
127                for outbox in &mut outboxes {
128                    match outbox.state() {
129                        OutboxState::Pending { .. } => {
130                            outbox.acquire_lease(relay_instance, lease_duration)?;
131                        }
132                        other => {
133                            return Err(uow
134                                .rollback_with_operation_error(
135                                    OutboxRelayError::NonPendingOutboxState(other.clone()),
136                                )
137                                .await?);
138                        }
139                    }
140                }
141
142                if let Err(operation_error) = self.writer.write_outbox(&mut uow, &outboxes).await {
143                    return Err(uow
144                        .rollback_with_operation_error(OutboxRelayError::Writer(operation_error))
145                        .await?);
146                }
147
148                uow.commit().await?;
149                outboxes
150            }
151            Err(operation_error) => {
152                return Err(uow
153                    .rollback_with_operation_error(OutboxRelayError::Fetcher(operation_error))
154                    .await?);
155            }
156        };
157
158        let publisher = self.topic.new_publisher();
159        let publish_results = publisher
160            .publish(outboxes.iter().map(Outbox::message))
161            .await?;
162
163        for publish_result in publish_results {
164            match publish_result {
165                PublishResult::Success { input_index, .. } => {
166                    outboxes[input_index].ack()?;
167                }
168                PublishResult::Failed { input_index, cause } => {
169                    outboxes[input_index].nack(&cause, &retry_options)?;
170                }
171            }
172        }
173
174        let processed_outbox_count = ProcessedOutboxCount::from_usize_saturating(outboxes.len());
175
176        let mut uow = self.uow_factory.begin().await?;
177        let write_result = self.writer.write_outbox(&mut uow, &outboxes).await;
178        match write_result {
179            Ok(()) => {
180                uow.commit().await?;
181            }
182            Err(operation_error) => {
183                return Err(uow
184                    .rollback_with_operation_error(OutboxRelayError::Writer(operation_error))
185                    .await?);
186            }
187        }
188
189        Ok(OutboxRelayRunReport::Progress {
190            processed_outbox_count,
191        })
192    }
193}