1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Receiver;
10use tracing::{debug, error, info, trace};
11
12pub struct OutboxManager<S, P> {
13 storage: Arc<S>,
14 publisher: Arc<P>,
15 config: Arc<OutboxConfig>,
16 shutdown_rx: Receiver<bool>,
17}
18
19impl<S, P> OutboxManager<S, P>
20where
21 S: OutboxStorage + Send + Sync + 'static,
22 P: Transport + Send + Sync + 'static,
23{
24 pub fn new(
25 storage: Arc<S>,
26 publisher: Arc<P>,
27 config: Arc<OutboxConfig>,
28 shutdown_rx: Receiver<bool>,
29 ) -> Self {
30 Self {
31 storage,
32 publisher,
33 config,
34 shutdown_rx,
35 }
36 }
37
38 pub async fn run(self) -> Result<(), OutboxError> {
48 let storage_for_listen = self.storage.clone();
49 let processor = OutboxProcessor::new(
50 self.storage.clone(),
51 self.publisher.clone(),
52 self.config.clone(),
53 );
54
55 let gc = GarbageCollector::new(self.storage.clone());
56 let mut rx_gc = self.shutdown_rx.clone();
57 let gc_interval_secs = self.config.gc_interval_secs;
58 tokio::spawn(async move {
59 let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
60 loop {
61 tokio::select! {
62 _ = interval.tick() => { let _ = gc.collect_garbage().await; }
63 _ = rx_gc.changed() => {
64 if *rx_gc.borrow() {
65 break
66 }
67 },
68 }
69 }
70 });
71
72 let mut rx_listen = self.shutdown_rx.clone();
73 let poll_interval = self.config.poll_interval_secs;
74 let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
75 info!("Outbox worker loop started");
76
77 loop {
78 tokio::select! {
79 signal = storage_for_listen.wait_for_notification("outbox_event") => {
80 if let Err(e) = signal {
81 error!("Listen error: {}", e);
82 tokio::time::sleep(Duration::from_secs(5)).await;
83 continue;
84 }
85 }
86 _ = interval.tick() => {
87 trace!("Checking for stale or pending events via interval");
88 }
89 _ = rx_listen.changed() => {
90 if *rx_listen.borrow() {
91 break
92 }
93 }
94 }
95 loop {
96 if *rx_listen.borrow() {
97 return Ok(());
98 }
99 match processor.process_pending_events().await {
100 Ok(0) => break,
101 Ok(count) => debug!("Processed {} events", count),
102 Err(e) => {
103 error!("Processing error: {}", e);
104 tokio::time::sleep(Duration::from_secs(1)).await;
105 break;
106 }
107 }
108 }
109 }
110 Ok(())
111 }
112}
113
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use crate::config::{IdempotencyStrategy, OutboxConfig};
    use crate::manager::OutboxManager;
    use crate::model::{Event, EventStatus};
    use crate::object::{EventType, Payload};
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use mockall::Sequence;
    use rstest::rstest;
    use serde_json::json;
    use std::sync::Arc;
    use tokio::sync::watch;

    /// Builds an event of type `kind` whose payload is `{"some": <value>}`.
    fn sample_event(kind: &'static str, value: &'static str) -> Event {
        Event::new(
            EventType::new(kind),
            Payload::new(json!({ "some": value })),
            None,
        )
    }

    /// A fetched batch of four events is published in order and marked as sent;
    /// the manager then stops because the fetch itself signals shutdown.
    #[rstest]
    #[tokio::test]
    async fn test_event_send_success() {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let config = OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
        };
        // Copied out so the predicates below do not need to capture `config`.
        let batch_size = config.batch_size;

        let mut storage = MockOutboxStorage::new();

        // Every wait for a notification succeeds immediately.
        storage.expect_wait_for_notification().returning(|_| Ok(()));

        // First fetch: hand out four events and request shutdown as a side effect.
        storage
            .expect_fetch_next_to_process()
            .withf(move |limit| *limit == batch_size)
            .times(1)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                Ok(vec![
                    sample_event("1", "some1"),
                    sample_event("2", "some2"),
                    sample_event("3", "some3"),
                    sample_event("4", "some4"),
                ])
            });

        // Any later fetch finds nothing left to process.
        storage
            .expect_fetch_next_to_process()
            .withf(move |limit| *limit == batch_size)
            .returning(|_| Ok(vec![]));

        // All four events are expected to be flagged as sent together.
        storage
            .expect_updates_status()
            .withf(|ids, status| ids.len() == 4 && status == &EventStatus::Sent)
            .returning(|_, _| Ok(()));

        // Each event must be published exactly once, in ascending order.
        let mut transport = MockTransport::new();
        let mut publish_order = Sequence::new();
        for n in 1..=4 {
            let wanted_type = n.to_string();
            let wanted_payload = json!(format!("some{}", n));

            transport
                .expect_publish()
                .withf(move |event| {
                    let type_ok = event.event_type.as_str() == wanted_type;
                    let payload_ok = event.payload.as_json()["some"] == wanted_payload;
                    type_ok && payload_ok
                })
                .times(1)
                .in_sequence(&mut publish_order)
                .returning(|_| Ok(()));
        }

        let manager = OutboxManager::new(
            Arc::new(storage),
            Arc::new(transport),
            Arc::new(config),
            shutdown_rx,
        );

        let worker = tokio::spawn(async move {
            manager.run().await.unwrap();
        });

        // The worker has to finish on its own within a second.
        tokio::time::timeout(tokio::time::Duration::from_secs(1), worker)
            .await
            .expect("Manager did not stop in time")
            .unwrap();
    }
}