Skip to main content

outbox_core/
manager.rs

1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use serde::Serialize;
8use std::fmt::Debug;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::watch::Receiver;
12use tracing::{debug, error, info, trace};
13
14pub struct OutboxManager<S, P, PT>
15where
16    PT: Debug + Clone + Serialize,
17{
18    storage: Arc<S>,
19    publisher: Arc<P>,
20    config: Arc<OutboxConfig<PT>>,
21    shutdown_rx: Receiver<bool>,
22}
23
24impl<S, P, PT> OutboxManager<S, P, PT>
25where
26    S: OutboxStorage<PT> + Send + Sync + 'static,
27    P: Transport<PT> + Send + Sync + 'static,
28    PT: Debug + Clone + Serialize + Send + Sync + 'static,
29{
30    pub fn new(
31        storage: Arc<S>,
32        publisher: Arc<P>,
33        config: Arc<OutboxConfig<PT>>,
34        shutdown_rx: Receiver<bool>,
35    ) -> Self {
36        Self {
37            storage,
38            publisher,
39            config,
40            shutdown_rx,
41        }
42    }
43
44    /// Starts the main outbox worker loop.
45    ///
46    /// This method will run until a shutdown signal is received via the `shutdown_rx` channel.
47    /// It handles event processing, database notifications, and periodic garbage collection.
48    ///
49    /// # Errors
50    ///
51    /// Returns an [`OutboxError`] if the worker encounters a terminal failure that it cannot
52    /// recover from (though currently the loop primarily logs errors and continues).
53    pub async fn run(self) -> Result<(), OutboxError> {
54        let storage_for_listen = self.storage.clone();
55        let processor = OutboxProcessor::new(
56            self.storage.clone(),
57            self.publisher.clone(),
58            self.config.clone(),
59        );
60
61        let gc = GarbageCollector::new(self.storage.clone());
62        let mut rx_gc = self.shutdown_rx.clone();
63        let gc_interval_secs = self.config.gc_interval_secs;
64        tokio::spawn(async move {
65            let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
66            loop {
67                tokio::select! {
68                    _ = interval.tick() => { let _ = gc.collect_garbage().await; }
69                    _ = rx_gc.changed() => {
70                        if *rx_gc.borrow() {
71                            break
72                        }
73                    },
74                }
75            }
76        });
77
78        let mut rx_listen = self.shutdown_rx.clone();
79        let poll_interval = self.config.poll_interval_secs;
80        let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
81        info!("Outbox worker loop started");
82
83        loop {
84            tokio::select! {
85                signal = storage_for_listen.wait_for_notification("outbox_event") => {
86                    if let Err(e) = signal {
87                        error!("Listen error: {}", e);
88                        tokio::time::sleep(Duration::from_secs(5)).await;
89                        continue;
90                    }
91                }
92                _ = interval.tick() => {
93                    trace!("Checking for stale or pending events via interval");
94                }
95                _ = rx_listen.changed() => {
96                    if *rx_listen.borrow() {
97                        break
98                    }
99                }
100            }
101            loop {
102                if *rx_listen.borrow() {
103                    return Ok(());
104                }
105                match processor.process_pending_events().await {
106                    Ok(0) => break,
107                    Ok(count) => debug!("Processed {} events", count),
108                    Err(e) => {
109                        error!("Processing error: {}", e);
110                        tokio::time::sleep(Duration::from_secs(1)).await;
111                        break;
112                    }
113                }
114            }
115        }
116        Ok(())
117    }
118}
119
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use crate::config::{IdempotencyStrategy, OutboxConfig};
    use crate::manager::OutboxManager;
    use crate::model::{Event, EventStatus};
    use crate::object::EventType;
    use crate::prelude::Payload;
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use mockall::Sequence;
    use rstest::rstest;
    use serde_json::json;
    use std::sync::Arc;
    use tokio::sync::watch;

    /// Runs the manager against mocked storage and transport.
    ///
    /// The first fetch returns four events and, as a side effect, flips the
    /// shutdown flag; the test then expects four sequential publishes, one
    /// status update to `Sent` for all four ids, and the worker to exit within
    /// one second.
    #[rstest]
    #[tokio::test]
    async fn test_event_send_success() {
        let config = OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
        };
        // Copied out so the matchers below do not need to capture `config`,
        // which is handed to the manager further down.
        let batch_size = config.batch_size;

        let mut storage_mock = MockOutboxStorage::new();
        let mut transport_mock = MockTransport::new();

        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));

        // First fetch: request shutdown, then hand back four events.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |limit| *limit == batch_size)
            .times(1)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                let events = [
                    ("1", "some1"),
                    ("2", "some2"),
                    ("3", "some3"),
                    ("4", "some4"),
                ]
                .iter()
                .map(|&(event_type, value)| {
                    Event::new(
                        EventType::new(event_type),
                        Payload::new(json!({ "some": value })),
                        None,
                    )
                })
                .collect::<Vec<_>>();
                Ok(events)
            });

        // Any later fetch sees an empty outbox.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |limit| *limit == batch_size)
            .returning(|_| Ok(vec![]));

        storage_mock
            .expect_updates_status()
            .withf(|ids, status| ids.len() == 4 && status == &EventStatus::Sent)
            .returning(|_, _| Ok(()));

        // Exactly four publishes, one after another.
        // NOTE(review): the event type / payload of each publish is not asserted
        // here, only the call count and ordering.
        let mut seq = Sequence::new();
        for _ in 0..4 {
            transport_mock
                .expect_publish()
                .times(1)
                .in_sequence(&mut seq)
                .returning(|_| Ok(()));
        }

        let manager = OutboxManager::new(
            Arc::new(storage_mock),
            Arc::new(transport_mock),
            Arc::new(config),
            shutdown_rx,
        );

        let handle = tokio::spawn(async move {
            manager.run().await.unwrap();
        });

        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("Manager did not stop in time")
            .unwrap();
    }
}