1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::gc::GarbageCollector;
4use crate::processor::OutboxProcessor;
5use crate::publisher::Transport;
6use crate::storage::OutboxStorage;
7use std::sync::Arc;
8use std::time::Duration;
9use tracing::{debug, error, info, trace};
10
11pub struct OutboxManager<S, P> {
12 storage: S,
13 publisher: P,
14 config: Arc<OutboxConfig>,
15 shutdown_tx: Option<tokio::sync::broadcast::Sender<()>>,
16}
17
18impl<S, P> OutboxManager<S, P>
19where
20 S: OutboxStorage + Clone + Send + Sync + 'static,
21 P: Transport + Clone + Send + Sync + 'static,
22{
23 pub fn new(storage: S, publisher: P, config: Arc<OutboxConfig>) -> Self {
24 Self {
25 storage,
26 publisher,
27 config,
28 shutdown_tx: None,
29 }
30 }
31
32 pub async fn run(&mut self) -> Result<(), OutboxError> {
33 let (tx, _) = tokio::sync::broadcast::channel(1);
34 self.shutdown_tx = Some(tx.clone());
35
36 let storage_for_listen = self.storage.clone();
37 let processor = OutboxProcessor::new(
38 self.storage.clone(),
39 self.publisher.clone(),
40 self.config.clone(),
41 );
42
43 let gc = GarbageCollector::new(self.storage.clone());
44 let mut rx_gc = tx.subscribe();
45 let gc_interval_secs = self.config.gc_interval_secs;
46 tokio::spawn(async move {
47 let mut interval = tokio::time::interval(Duration::from_secs(gc_interval_secs));
48 loop {
49 tokio::select! {
50 _ = interval.tick() => { let _ = gc.collect_garbage().await; }
51 _ = rx_gc.recv() => break,
52 }
53 }
54 });
55
56 let mut rx_listen = tx.subscribe();
57 let poll_interval = self.config.poll_interval_secs;
58 let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
59 info!("Outbox worker loop started");
60
61 loop {
62 tokio::select! {
63 signal = storage_for_listen.wait_for_notification("outbox_event") => {
64 if let Err(e) = signal {
65 error!("Listen error: {}", e);
66 tokio::time::sleep(Duration::from_secs(5)).await;
67 continue;
68 }
69 }
70 _ = interval.tick() => {
71 trace!("Checking for stale or pending events via interval");
72 }
73 _ = rx_listen.recv() => {
74 break;
75 }
76 }
77 loop {
78 match processor.process_pending_events().await {
79 Ok(0) => break,
80 Ok(count) => debug!("Processed {} events", count),
81 Err(e) => {
82 error!("Processing error: {}", e);
83 tokio::time::sleep(Duration::from_secs(1)).await;
84 break;
85 }
86 }
87 }
88 }
89
90 Ok(())
91 }
92}