Skip to main content

outbox_core/
manager.rs

1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use serde::Serialize;
8use std::fmt::Debug;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::watch::Receiver;
12use tracing::{debug, error, info, trace};
13
14pub struct OutboxManager<S, P, PT>
15where
16    PT: Debug + Clone + Serialize,
17{
18    storage: Arc<S>,
19    publisher: Arc<P>,
20    config: Arc<OutboxConfig<PT>>,
21    shutdown_rx: Receiver<bool>,
22}
23
24impl<S, P, PT> OutboxManager<S, P, PT>
25where
26    S: OutboxStorage<PT> + Send + Sync + 'static,
27    P: Transport<PT> + Send + Sync + 'static,
28    PT: Debug + Clone + Serialize + Send + Sync + 'static,
29{
30    pub fn new(
31        storage: Arc<S>,
32        publisher: Arc<P>,
33        config: Arc<OutboxConfig<PT>>,
34        shutdown_rx: Receiver<bool>,
35    ) -> Self {
36        Self {
37            storage,
38            publisher,
39            config,
40            shutdown_rx,
41        }
42    }
43
44    /// Starts the main outbox worker loop.
45    ///
46    /// This method will run until a shutdown signal is received via the `shutdown_rx` channel.
47    /// It handles event processing, database notifications, and periodic garbage collection.
48    ///
49    /// # Errors
50    ///
51    /// Returns an [`OutboxError`] if the worker encounters a terminal failure that it cannot
52    /// recover from (though currently the loop primarily logs errors and continues).
53    pub async fn run(self) -> Result<(), OutboxError> {
54        let storage_for_listen = self.storage.clone();
55        let processor = OutboxProcessor::new(
56            self.storage.clone(),
57            self.publisher.clone(),
58            self.config.clone(),
59        );
60
61        let gc = GarbageCollector::new(self.storage.clone());
62        let mut rx_gc = self.shutdown_rx.clone();
63        let gc_interval_secs = self.config.gc_interval_secs;
64        tokio::spawn(async move {
65            let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
66            loop {
67                tokio::select! {
68                    _ = interval.tick() => { let _ = gc.collect_garbage().await; }
69                    _ = rx_gc.changed() => {
70                            if rx_gc.has_changed().is_err(){
71                            break;
72                        }
73                        if *rx_gc.borrow() {
74                            break
75                        }
76                    },
77                }
78            }
79        });
80
81        let mut rx_listen = self.shutdown_rx.clone();
82        let poll_interval = self.config.poll_interval_secs;
83        let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
84        info!("Outbox worker loop started");
85
86        loop {
87            tokio::select! {
88                signal = storage_for_listen.wait_for_notification("outbox_event") => {
89                    if let Err(e) = signal {
90                        error!("Listen error: {}", e);
91                        tokio::time::sleep(Duration::from_secs(5)).await;
92                        continue;
93                    }
94                }
95                _ = interval.tick() => {
96                    trace!("Checking for stale or pending events via interval");
97                }
98                _ = rx_listen.changed() => {
99                    if rx_listen.has_changed().is_err(){
100                        break;
101                    }
102                    if *rx_listen.borrow() {
103                        break
104                    }
105                }
106            }
107            loop {
108                if *rx_listen.borrow() {
109                    return Ok(());
110                }
111                match processor.process_pending_events().await {
112                    Ok(0) => break,
113                    Ok(count) => debug!("Processed {} events", count),
114                    Err(e) => {
115                        error!("Processing error: {}", e);
116                        tokio::time::sleep(Duration::from_secs(1)).await;
117                        break;
118                    }
119                }
120            }
121        }
122        debug!("Outbox worker loop stopped");
123        Ok(())
124    }
125}
126
127#[cfg(test)]
128#[allow(clippy::unwrap_used)]
129mod tests {
130    use crate::config::{IdempotencyStrategy, OutboxConfig};
131    use crate::manager::OutboxManager;
132    use crate::model::{Event, EventStatus};
133    use crate::object::EventType;
134    use crate::prelude::Payload;
135    use crate::publisher::MockTransport;
136    use crate::storage::MockOutboxStorage;
137    use mockall::Sequence;
138    use rstest::rstest;
139    use serde_json::json;
140    use std::sync::Arc;
141    use tokio::sync::watch;
142
143    #[rstest]
144    #[tokio::test]
145    async fn test_event_send_success() {
146        let config = OutboxConfig {
147            batch_size: 100,
148            retention_days: 7,
149            gc_interval_secs: 3600,
150            poll_interval_secs: 5,
151            lock_timeout_mins: 5,
152            idempotency_strategy: IdempotencyStrategy::None,
153        };
154
155        let mut storage_mock = MockOutboxStorage::new();
156        let mut transport_mock = MockTransport::new();
157
158        let (shutdown_tx, shutdown_rx) = watch::channel(false);
159
160        storage_mock
161            .expect_wait_for_notification()
162            .returning(|_| Ok(()));
163
164        storage_mock
165            .expect_fetch_next_to_process()
166            .withf(move |l| l == &config.batch_size)
167            .times(1)
168            .returning(move |_| {
169                let _ = shutdown_tx.send(true);
170                Ok(vec![
171                    Event::new(
172                        EventType::new("1"),
173                        Payload::new(json!({"some": "some1"})),
174                        None,
175                    ),
176                    Event::new(
177                        EventType::new("2"),
178                        Payload::new(json!({"some": "some2"})),
179                        None,
180                    ),
181                    Event::new(
182                        EventType::new("3"),
183                        Payload::new(json!({"some": "some3"})),
184                        None,
185                    ),
186                    Event::new(
187                        EventType::new("4"),
188                        Payload::new(json!({"some": "some4"})),
189                        None,
190                    ),
191                ])
192            });
193
194        storage_mock
195            .expect_fetch_next_to_process()
196            .withf(move |l| l == &config.batch_size)
197            .returning(move |_| Ok(vec![]));
198
199        storage_mock
200            .expect_updates_status()
201            .withf(|ids, s| ids.len() == 4 && s == &EventStatus::Sent)
202            .returning(|_, _| Ok(()));
203
204        let mut seq = Sequence::new();
205
206        for i in 1..=4 {
207            let expected_type = i.to_string();
208            let expected_val = json!(format!("some{}", i));
209
210            transport_mock
211                .expect_publish()
212                .withf(move |event| {
213                    // let type_matches = event.event_type.as_str() == expected_type;
214                    // let payload_matches = event.payload.as_json()["some"] == expected_val;
215                    // type_matches && payload_matches
216                    true
217                })
218                .times(1)
219                .in_sequence(&mut seq)
220                .returning(|_| Ok(()));
221        }
222
223        let manager = OutboxManager::new(
224            Arc::new(storage_mock),
225            Arc::new(transport_mock),
226            Arc::new(config),
227            shutdown_rx,
228        );
229
230        let handle = tokio::spawn(async move {
231            manager.run().await.unwrap();
232        });
233
234        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
235            .await
236            .expect("Manager did not stop in time")
237            .unwrap();
238    }
239}