1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use serde::Serialize;
8use std::fmt::Debug;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::watch::Receiver;
12use tracing::{debug, error, info, trace};
13
14pub struct OutboxManager<S, P, PT>
15where
16 PT: Debug + Clone + Serialize,
17{
18 storage: Arc<S>,
19 publisher: Arc<P>,
20 config: Arc<OutboxConfig<PT>>,
21 shutdown_rx: Receiver<bool>,
22}
23
24impl<S, P, PT> OutboxManager<S, P, PT>
25where
26 S: OutboxStorage<PT> + Send + Sync + 'static,
27 P: Transport<PT> + Send + Sync + 'static,
28 PT: Debug + Clone + Serialize + Send + Sync + 'static,
29{
30 pub fn new(
31 storage: Arc<S>,
32 publisher: Arc<P>,
33 config: Arc<OutboxConfig<PT>>,
34 shutdown_rx: Receiver<bool>,
35 ) -> Self {
36 Self {
37 storage,
38 publisher,
39 config,
40 shutdown_rx,
41 }
42 }
43
44 pub async fn run(self) -> Result<(), OutboxError> {
54 let storage_for_listen = self.storage.clone();
55 let processor = OutboxProcessor::new(
56 self.storage.clone(),
57 self.publisher.clone(),
58 self.config.clone(),
59 );
60
61 let gc = GarbageCollector::new(self.storage.clone());
62 let mut rx_gc = self.shutdown_rx.clone();
63 let gc_interval_secs = self.config.gc_interval_secs;
64 tokio::spawn(async move {
65 let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
66 loop {
67 tokio::select! {
68 _ = interval.tick() => { let _ = gc.collect_garbage().await; }
69 _ = rx_gc.changed() => {
70 if *rx_gc.borrow() {
71 break
72 }
73 },
74 }
75 }
76 });
77
78 let mut rx_listen = self.shutdown_rx.clone();
79 let poll_interval = self.config.poll_interval_secs;
80 let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
81 info!("Outbox worker loop started");
82
83 loop {
84 tokio::select! {
85 signal = storage_for_listen.wait_for_notification("outbox_event") => {
86 if let Err(e) = signal {
87 error!("Listen error: {}", e);
88 tokio::time::sleep(Duration::from_secs(5)).await;
89 continue;
90 }
91 }
92 _ = interval.tick() => {
93 trace!("Checking for stale or pending events via interval");
94 }
95 _ = rx_listen.changed() => {
96 if *rx_listen.borrow() {
97 break
98 }
99 }
100 }
101 loop {
102 if *rx_listen.borrow() {
103 return Ok(());
104 }
105 match processor.process_pending_events().await {
106 Ok(0) => break,
107 Ok(count) => debug!("Processed {} events", count),
108 Err(e) => {
109 error!("Processing error: {}", e);
110 tokio::time::sleep(Duration::from_secs(1)).await;
111 break;
112 }
113 }
114 }
115 }
116 Ok(())
117 }
118}
119
120#[cfg(test)]
121#[allow(clippy::unwrap_used)]
122mod tests {
123 use crate::config::{IdempotencyStrategy, OutboxConfig};
124 use crate::manager::OutboxManager;
125 use crate::model::{Event, EventStatus};
126 use crate::object::EventType;
127 use crate::prelude::Payload;
128 use crate::publisher::MockTransport;
129 use crate::storage::MockOutboxStorage;
130 use mockall::Sequence;
131 use rstest::rstest;
132 use serde_json::json;
133 use std::sync::Arc;
134 use tokio::sync::watch;
135
136 #[rstest]
137 #[tokio::test]
138 async fn test_event_send_success() {
139 let config = OutboxConfig {
140 batch_size: 100,
141 retention_days: 7,
142 gc_interval_secs: 3600,
143 poll_interval_secs: 5,
144 lock_timeout_mins: 5,
145 idempotency_strategy: IdempotencyStrategy::None,
146 };
147
148 let mut storage_mock = MockOutboxStorage::new();
149 let mut transport_mock = MockTransport::new();
150
151 let (shutdown_tx, shutdown_rx) = watch::channel(false);
152
153 storage_mock
154 .expect_wait_for_notification()
155 .returning(|_| Ok(()));
156
157 storage_mock
158 .expect_fetch_next_to_process()
159 .withf(move |l| l == &config.batch_size)
160 .times(1)
161 .returning(move |_| {
162 let _ = shutdown_tx.send(true);
163 Ok(vec![
164 Event::new(
165 EventType::new("1"),
166 Payload::new(json!({"some": "some1"})),
167 None,
168 ),
169 Event::new(
170 EventType::new("2"),
171 Payload::new(json!({"some": "some2"})),
172 None,
173 ),
174 Event::new(
175 EventType::new("3"),
176 Payload::new(json!({"some": "some3"})),
177 None,
178 ),
179 Event::new(
180 EventType::new("4"),
181 Payload::new(json!({"some": "some4"})),
182 None,
183 ),
184 ])
185 });
186
187 storage_mock
188 .expect_fetch_next_to_process()
189 .withf(move |l| l == &config.batch_size)
190 .returning(move |_| Ok(vec![]));
191
192 storage_mock
193 .expect_updates_status()
194 .withf(|ids, s| ids.len() == 4 && s == &EventStatus::Sent)
195 .returning(|_, _| Ok(()));
196
197 let mut seq = Sequence::new();
198
199 for i in 1..=4 {
200 let expected_type = i.to_string();
201 let expected_val = json!(format!("some{}", i));
202
203 transport_mock
204 .expect_publish()
205 .withf(move |event| {
206 true
210 })
211 .times(1)
212 .in_sequence(&mut seq)
213 .returning(|_| Ok(()));
214 }
215
216 let manager = OutboxManager::new(
217 Arc::new(storage_mock),
218 Arc::new(transport_mock),
219 Arc::new(config),
220 shutdown_rx,
221 );
222
223 let handle = tokio::spawn(async move {
224 manager.run().await.unwrap();
225 });
226
227 tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
228 .await
229 .expect("Manager did not stop in time")
230 .unwrap();
231 }
232}