Skip to main content

outbox_core/
manager.rs

1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use std::sync::Arc;
8use std::time::Duration;
9use tracing::{debug, error, info, trace};
10
11pub struct OutboxManager<S, P> {
12    storage: S,
13    publisher: P,
14    config: Arc<OutboxConfig>,
15    shutdown_tx: Option<tokio::sync::broadcast::Sender<()>>,
16}
17
18impl<S, P> OutboxManager<S, P>
19where
20    S: OutboxStorage + Clone + Send + Sync + 'static,
21    P: Transport + Clone + Send + Sync + 'static,
22{
23    pub fn new(storage: S, publisher: P, config: Arc<OutboxConfig>) -> Self {
24        Self {
25            storage,
26            publisher,
27            config,
28            shutdown_tx: None,
29        }
30    }
31
32    pub async fn run(&mut self) -> Result<(), OutboxError> {
33        let (tx, _) = tokio::sync::broadcast::channel(1);
34        self.shutdown_tx = Some(tx.clone());
35
36        let storage_for_listen = self.storage.clone();
37        let processor = OutboxProcessor::new(
38            self.storage.clone(),
39            self.publisher.clone(),
40            self.config.clone(),
41        );
42
43        let gc = GarbageCollector::new(self.storage.clone());
44        let mut rx_gc = tx.subscribe();
45        let gc_interval_secs = self.config.gc_interval_secs;
46        tokio::spawn(async move {
47            let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
48            loop {
49                tokio::select! {
50                    _ = interval.tick() => { let _ = gc.collect_garbage().await; }
51                    _ = rx_gc.recv() => break,
52                }
53            }
54        });
55
56        let mut rx_listen = tx.subscribe();
57        let poll_interval = self.config.poll_interval_secs;
58        let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
59        info!("Outbox worker loop started");
60
61        loop {
62            tokio::select! {
63                signal = storage_for_listen.wait_for_notification("outbox_event") => {
64                    if let Err(e) = signal {
65                        error!("Listen error: {}", e);
66                        tokio::time::sleep(Duration::from_secs(5)).await;
67                        continue;
68                    }
69                }
70                _ = interval.tick() => {
71                    trace!("Checking for stale or pending events via interval");
72                }
73                _ = rx_listen.recv() => {
74                    break;
75                }
76            }
77            loop {
78                match processor.process_pending_events().await {
79                    Ok(0) => break,
80                    Ok(count) => debug!("Processed {} events", count),
81                    Err(e) => {
82                        error!("Processing error: {}", e);
83                        tokio::time::sleep(Duration::from_secs(1)).await;
84                        break;
85                    }
86                }
87            }
88        }
89
90        Ok(())
91    }
92}