1use std::collections::BinaryHeap;
2use tokio::sync::{Mutex, RwLock, mpsc};
3
4use crate::{globals::ZIMQ_LOCK, message::Message, types::ZIMQ};
5
6mod binlog;
7mod consumer;
8pub mod defer;
9mod delay_queue;
10pub mod error;
11mod globals;
12pub mod message;
13mod producer;
14mod types;
15
16pub use consumer::subscribe;
17pub use producer::send_delay;
18pub use producer::send_delay_ex;
19pub use producer::send_immediate;
20
21pub async fn init(binlog_path: &str) {
36 if let Err(err) = binlog::check_binlog_path(binlog_path).await {
37 panic!("[ZIMQ] Invalid binlog path. {:?}", err);
38 }
39 let (sender_message, receiver_message) = mpsc::channel::<Message>(64);
40 let (sender_interrupt, receiver_interrupt) = mpsc::channel::<u8>(1);
41 let zimq = ZIMQ {
42 sender_message: RwLock::new(Some(sender_message)),
43 receiver_message: Mutex::new(Some(receiver_message)),
44 recv_handler: RwLock::new(None),
45 sender_interrupt: RwLock::new(Some(sender_interrupt)),
46 receiver_interrupt: Mutex::new(Some(receiver_interrupt)),
47 binlog_path: binlog_path.to_string(),
48 delay_queue: Mutex::new(BinaryHeap::new()),
49 delay_recv_handler: RwLock::new(None),
50 };
51 let mut delay_queue_guard = zimq.delay_queue.lock().await;
52 if let Err(err) = delay_queue::load_message_from_disk(binlog_path, &mut delay_queue_guard).await
53 {
54 panic!("[ZIMQ] Failed to load binlog data. {:?}", err);
55 }
56 drop(delay_queue_guard);
57 if let Err(_) = ZIMQ_LOCK.set(zimq) {
58 panic!("[ZIMQ] Failed to initialize ZIMQ.");
59 }
60}
61
62pub async fn start() {
72 let Some(zimq) = ZIMQ_LOCK.get() else {
73 panic!("[ZIMQ] zimq not initialized.");
74 };
75 let mut receiver_message_guard = zimq.receiver_message.lock().await;
76 let Some(receiver_message) = receiver_message_guard.take() else {
77 panic!("[ZIMQ] zimq not initialized.");
78 };
79 let consumer_handle: tokio::task::JoinHandle<()> =
80 tokio::spawn(consumer::consumer_broker(receiver_message));
81 let mut recv_handler_write = zimq.recv_handler.write().await;
82 *recv_handler_write = Some(consumer_handle);
83
84 let mut receiver_interrupt_guard = zimq.receiver_interrupt.lock().await;
85 let Some(receiver_interrupt) = receiver_interrupt_guard.take() else {
86 panic!("[ZIMQ] zimq not initialized.");
87 };
88 let delay_recv_handle: tokio::task::JoinHandle<()> =
89 tokio::spawn(delay_queue::delay_message_broker(receiver_interrupt, zimq));
90 let mut delay_recv_handler_write = zimq.delay_recv_handler.write().await;
91 *delay_recv_handler_write = Some(delay_recv_handle);
92}
93
94pub async fn cleanup() {
104 let Some(zimq) = ZIMQ_LOCK.get() else {
105 eprintln!("[ZIMQ] zimq not initialized, Exit.");
106 return;
107 };
108 let mut sender_interrupt_guard = zimq.sender_interrupt.write().await;
109 if let Some(send_interrupt) = sender_interrupt_guard.take() {
110 drop(send_interrupt);
111 }
112 let mut delay_recv_handler_guard = zimq.delay_recv_handler.write().await;
113 if let Some(delay_recv_handler) = delay_recv_handler_guard.take() {
114 _ = delay_recv_handler.await;
115 }
116
117 let mut sender_guard = zimq.sender_message.write().await;
118 if let Some(sender) = sender_guard.take() {
119 drop(sender);
120 };
121
122 let mut recv_handler_guard = zimq.recv_handler.write().await;
123 if let Some(recv_handler) = recv_handler_guard.take() {
124 _ = recv_handler.await;
125 };
126}