1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use serde::Serialize;
8use std::fmt::Debug;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::watch::Receiver;
12use tracing::{debug, error, info, trace};
13
14pub struct OutboxManager<S, P, PT>
15where
16 PT: Debug + Clone + Serialize,
17{
18 storage: Arc<S>,
19 publisher: Arc<P>,
20 config: Arc<OutboxConfig<PT>>,
21 shutdown_rx: Receiver<bool>,
22}
23
24impl<S, P, PT> OutboxManager<S, P, PT>
25where
26 S: OutboxStorage<PT> + Send + Sync + 'static,
27 P: Transport<PT> + Send + Sync + 'static,
28 PT: Debug + Clone + Serialize + Send + Sync + 'static,
29{
30 pub fn new(
31 storage: Arc<S>,
32 publisher: Arc<P>,
33 config: Arc<OutboxConfig<PT>>,
34 shutdown_rx: Receiver<bool>,
35 ) -> Self {
36 Self {
37 storage,
38 publisher,
39 config,
40 shutdown_rx,
41 }
42 }
43
44 pub async fn run(self) -> Result<(), OutboxError> {
54 let storage_for_listen = self.storage.clone();
55 let processor = OutboxProcessor::new(
56 self.storage.clone(),
57 self.publisher.clone(),
58 self.config.clone(),
59 );
60
61 let gc = GarbageCollector::new(self.storage.clone());
62 let mut rx_gc = self.shutdown_rx.clone();
63 let gc_interval_secs = self.config.gc_interval_secs;
64 tokio::spawn(async move {
65 let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
66 loop {
67 tokio::select! {
68 _ = interval.tick() => { let _ = gc.collect_garbage().await; }
69 _ = rx_gc.changed() => {
70 if rx_gc.has_changed().is_err(){
71 break;
72 }
73 if *rx_gc.borrow() {
74 break
75 }
76 },
77 }
78 }
79 });
80
81 let mut rx_listen = self.shutdown_rx.clone();
82 let poll_interval = self.config.poll_interval_secs;
83 let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
84 info!("Outbox worker loop started");
85
86 loop {
87 tokio::select! {
88 signal = storage_for_listen.wait_for_notification("outbox_event") => {
89 if let Err(e) = signal {
90 error!("Listen error: {}", e);
91 tokio::time::sleep(Duration::from_secs(5)).await;
92 continue;
93 }
94 }
95 _ = interval.tick() => {
96 trace!("Checking for stale or pending events via interval");
97 }
98 _ = rx_listen.changed() => {
99 if rx_listen.has_changed().is_err(){
100 break;
101 }
102 if *rx_listen.borrow() {
103 break
104 }
105 }
106 }
107 loop {
108 if *rx_listen.borrow() {
109 return Ok(());
110 }
111 match processor.process_pending_events().await {
112 Ok(0) => break,
113 Ok(count) => debug!("Processed {} events", count),
114 Err(e) => {
115 error!("Processing error: {}", e);
116 tokio::time::sleep(Duration::from_secs(1)).await;
117 break;
118 }
119 }
120 }
121 }
122 debug!("Outbox worker loop stopped");
123 Ok(())
124 }
125}
126
#[cfg(test)]
// Tests are allowed to unwrap: a panic is the desired failure mode here.
#[allow(clippy::unwrap_used)]
mod tests {
    use crate::config::{IdempotencyStrategy, OutboxConfig};
    use crate::manager::OutboxManager;
    use crate::model::{Event, EventStatus};
    use crate::object::EventType;
    use crate::prelude::Payload;
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use mockall::Sequence;
    use rstest::rstest;
    use serde_json::json;
    use std::sync::Arc;
    use tokio::sync::watch;

    /// Happy path: the first fetch yields four events and flips the shutdown
    /// flag; the transport must then be called exactly four times, in
    /// sequence, and `run` must return within one second.
    #[rstest]
    #[tokio::test]
    async fn test_event_send_success() {
        let config = OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
        };
        // Copied out so the mock predicates do not need to capture `config`.
        let batch_size = config.batch_size;

        let mut storage_mock = MockOutboxStorage::new();
        let mut transport_mock = MockTransport::new();

        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));

        // First fetch: request shutdown, then hand back one batch of four.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |limit| *limit == batch_size)
            .times(1)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                let event = |kind: &'static str, value: &'static str| {
                    Event::new(
                        EventType::new(kind),
                        Payload::new(json!({ "some": value })),
                        None,
                    )
                };
                Ok(vec![
                    event("1", "some1"),
                    event("2", "some2"),
                    event("3", "some3"),
                    event("4", "some4"),
                ])
            });

        // Any further fetch finds nothing to do.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |limit| *limit == batch_size)
            .returning(|_| Ok(vec![]));

        storage_mock
            .expect_updates_status()
            .withf(|ids, status| ids.len() == 4 && status == &EventStatus::Sent)
            .returning(|_, _| Ok(()));

        // Exactly four publishes, one after another.
        // NOTE(review): the published event's type/payload is not asserted
        // here; only the number and ordering of calls is checked.
        let mut seq = Sequence::new();
        for _ in 1..=4 {
            transport_mock
                .expect_publish()
                .times(1)
                .in_sequence(&mut seq)
                .returning(|_| Ok(()));
        }

        let manager = OutboxManager::new(
            Arc::new(storage_mock),
            Arc::new(transport_mock),
            Arc::new(config),
            shutdown_rx,
        );

        let handle = tokio::spawn(async move {
            manager.run().await.unwrap();
        });

        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("Manager did not stop in time")
            .unwrap();
    }
}