Skip to main content

outbox_core/
manager.rs

1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Receiver;
10use tracing::{debug, error, info, trace};
11
12pub struct OutboxManager<S, P> {
13    storage: Arc<S>,
14    publisher: Arc<P>,
15    config: Arc<OutboxConfig>,
16    shutdown_rx: Receiver<bool>,
17}
18
19impl<S, P> OutboxManager<S, P>
20where
21    S: OutboxStorage + Send + Sync + 'static,
22    P: Transport + Send + Sync + 'static,
23{
24    pub fn new(
25        storage: Arc<S>,
26        publisher: Arc<P>,
27        config: Arc<OutboxConfig>,
28        shutdown_rx: Receiver<bool>,
29    ) -> Self {
30        Self {
31            storage,
32            publisher,
33            config,
34            shutdown_rx,
35        }
36    }
37
38    /// Starts the main outbox worker loop.
39    ///
40    /// This method will run until a shutdown signal is received via the `shutdown_rx` channel.
41    /// It handles event processing, database notifications, and periodic garbage collection.
42    ///
43    /// # Errors
44    ///
45    /// Returns an [`OutboxError`] if the worker encounters a terminal failure that it cannot
46    /// recover from (though currently the loop primarily logs errors and continues).
47    pub async fn run(self) -> Result<(), OutboxError> {
48        let storage_for_listen = self.storage.clone();
49        let processor = OutboxProcessor::new(
50            self.storage.clone(),
51            self.publisher.clone(),
52            self.config.clone(),
53        );
54
55        let gc = GarbageCollector::new(self.storage.clone());
56        let mut rx_gc = self.shutdown_rx.clone();
57        let gc_interval_secs = self.config.gc_interval_secs;
58        tokio::spawn(async move {
59            let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
60            loop {
61                tokio::select! {
62                    _ = interval.tick() => { let _ = gc.collect_garbage().await; }
63                    _ = rx_gc.changed() => {
64                        if *rx_gc.borrow() {
65                            break
66                        }
67                    },
68                }
69            }
70        });
71
72        let mut rx_listen = self.shutdown_rx.clone();
73        let poll_interval = self.config.poll_interval_secs;
74        let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
75        info!("Outbox worker loop started");
76
77        loop {
78            tokio::select! {
79                signal = storage_for_listen.wait_for_notification("outbox_event") => {
80                    if let Err(e) = signal {
81                        error!("Listen error: {}", e);
82                        tokio::time::sleep(Duration::from_secs(5)).await;
83                        continue;
84                    }
85                }
86                _ = interval.tick() => {
87                    trace!("Checking for stale or pending events via interval");
88                }
89                _ = rx_listen.changed() => {
90                    if *rx_listen.borrow() {
91                        break
92                    }
93                }
94            }
95            loop {
96                if *rx_listen.borrow() {
97                    return Ok(());
98                }
99                match processor.process_pending_events().await {
100                    Ok(0) => break,
101                    Ok(count) => debug!("Processed {} events", count),
102                    Err(e) => {
103                        error!("Processing error: {}", e);
104                        tokio::time::sleep(Duration::from_secs(1)).await;
105                        break;
106                    }
107                }
108            }
109        }
110        Ok(())
111    }
112}
113
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use crate::config::{IdempotencyStrategy, OutboxConfig};
    use crate::manager::OutboxManager;
    use crate::model::{Event, EventStatus};
    use crate::object::{EventType, Payload};
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use mockall::Sequence;
    use rstest::rstest;
    use serde_json::json;
    use std::sync::Arc;
    use tokio::sync::watch;

    /// Runs the manager against mocked storage and transport and checks that a batch of
    /// four events is published in order and marked as sent, and that the worker stops
    /// within one second once shutdown is signalled.
    #[rstest]
    #[tokio::test]
    async fn test_event_send_success() {
        let config = OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
        };
        // Copy the one field the matchers need, so the closures below do not have to
        // capture (and move) the whole config before it is handed to the manager.
        let batch_size = config.batch_size;

        let mut storage_mock = MockOutboxStorage::new();
        let mut transport_mock = MockTransport::new();

        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));

        // First fetch: return four events and request shutdown, so the worker exits
        // right after this batch has been handled.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |l| l == &batch_size)
            .times(1)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                Ok(vec![
                    Event::new(
                        EventType::new("1"),
                        Payload::new(json!({"some": "some1"})),
                        None,
                    ),
                    Event::new(
                        EventType::new("2"),
                        Payload::new(json!({"some": "some2"})),
                        None,
                    ),
                    Event::new(
                        EventType::new("3"),
                        Payload::new(json!({"some": "some3"})),
                        None,
                    ),
                    Event::new(
                        EventType::new("4"),
                        Payload::new(json!({"some": "some4"})),
                        None,
                    ),
                ])
            });

        // Any further fetch finds nothing left to process.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |l| l == &batch_size)
            .returning(move |_| Ok(vec![]));

        storage_mock
            .expect_updates_status()
            .withf(|ids, s| ids.len() == 4 && s == &EventStatus::Sent)
            .returning(|_, _| Ok(()));

        // Each event must be published exactly once, in the order it was fetched.
        let mut seq = Sequence::new();

        for i in 1..=4 {
            let expected_type = i.to_string();
            let expected_val = json!(format!("some{}", i));

            transport_mock
                .expect_publish()
                .withf(move |event| {
                    let type_matches = event.event_type.as_str() == expected_type;
                    let payload_matches = event.payload.as_json()["some"] == expected_val;
                    type_matches && payload_matches
                })
                .times(1)
                .in_sequence(&mut seq)
                .returning(|_| Ok(()));
        }

        let manager = OutboxManager::new(
            Arc::new(storage_mock),
            Arc::new(transport_mock),
            Arc::new(config),
            shutdown_rx,
        );

        let handle = tokio::spawn(async move {
            manager.run().await.unwrap();
        });

        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("Manager did not stop in time")
            .unwrap();
    }
}