Skip to main content

zimq_impl/
lib.rs

1use std::collections::BinaryHeap;
2use tokio::sync::{Mutex, RwLock, mpsc};
3
4use crate::{globals::ZIMQ_LOCK, message::Message, types::ZIMQ};
5
6mod binlog;
7mod consumer;
8pub mod defer;
9mod delay_queue;
10pub mod error;
11mod globals;
12pub mod message;
13mod producer;
14mod types;
15
16pub use consumer::subscribe;
17pub use producer::send_delay;
18pub use producer::send_delay_ex;
19pub use producer::send_immediate;
20
21/// Initialize ZIMQ
22/// # Arguments
23/// * `binlog_path` - The storage path. Supports absolute paths or relative paths (relative to the working directory).
24///
25/// # Panics
26/// * Invalid binlog path
27/// * Failed to load binlog data
28/// * Failed to initialize ZIMQ
29///
30/// # Examples
31/// ```
32/// zimq::init("zimq").await;
33/// ```
34///
35pub async fn init(binlog_path: &str) {
36    if let Err(err) = binlog::check_binlog_path(binlog_path).await {
37        panic!("[ZIMQ] Invalid binlog path. {:?}", err);
38    }
39    let (sender_message, receiver_message) = mpsc::channel::<Message>(64);
40    let (sender_interrupt, receiver_interrupt) = mpsc::channel::<u8>(1);
41    let zimq = ZIMQ {
42        sender_message: RwLock::new(Some(sender_message)),
43        receiver_message: Mutex::new(Some(receiver_message)),
44        recv_handler: RwLock::new(None),
45        sender_interrupt: RwLock::new(Some(sender_interrupt)),
46        receiver_interrupt: Mutex::new(Some(receiver_interrupt)),
47        binlog_path: binlog_path.to_string(),
48        delay_queue: Mutex::new(BinaryHeap::new()),
49        delay_recv_handler: RwLock::new(None),
50    };
51    let mut delay_queue_guard = zimq.delay_queue.lock().await;
52    if let Err(err) = delay_queue::load_message_from_disk(binlog_path, &mut delay_queue_guard).await
53    {
54        panic!("[ZIMQ] Failed to load binlog data. {:?}", err);
55    }
56    drop(delay_queue_guard);
57    if let Err(_) = ZIMQ_LOCK.set(zimq) {
58        panic!("[ZIMQ] Failed to initialize ZIMQ.");
59    }
60}
61
62/// Starts the core services of the ZIMQ.
63/// # Panics
64/// * zimq not initialized
65///
66/// # Examples
67/// ```
68/// zimq::start().await;
69/// ```
70///
71pub async fn start() {
72    let Some(zimq) = ZIMQ_LOCK.get() else {
73        panic!("[ZIMQ] zimq not initialized.");
74    };
75    let mut receiver_message_guard = zimq.receiver_message.lock().await;
76    let Some(receiver_message) = receiver_message_guard.take() else {
77        panic!("[ZIMQ] zimq not initialized.");
78    };
79    let consumer_handle: tokio::task::JoinHandle<()> =
80        tokio::spawn(consumer::consumer_broker(receiver_message));
81    let mut recv_handler_write = zimq.recv_handler.write().await;
82    *recv_handler_write = Some(consumer_handle);
83
84    let mut receiver_interrupt_guard = zimq.receiver_interrupt.lock().await;
85    let Some(receiver_interrupt) = receiver_interrupt_guard.take() else {
86        panic!("[ZIMQ] zimq not initialized.");
87    };
88    let delay_recv_handle: tokio::task::JoinHandle<()> =
89        tokio::spawn(delay_queue::delay_message_broker(receiver_interrupt, zimq));
90    let mut delay_recv_handler_write = zimq.delay_recv_handler.write().await;
91    *delay_recv_handler_write = Some(delay_recv_handle);
92}
93
94/// First interrupts the delayed message expiration detector,
95/// then waits for pending messages (immediate and expired) in the execution queue to complete,
96/// and finally releases resources before exiting.
97///
98/// # Examples
99/// ```
100/// zimq::cleanup("zimq").await;
101/// ```
102///
103pub async fn cleanup() {
104    let Some(zimq) = ZIMQ_LOCK.get() else {
105        eprintln!("[ZIMQ] zimq not initialized, Exit.");
106        return;
107    };
108    let mut sender_interrupt_guard = zimq.sender_interrupt.write().await;
109    if let Some(send_interrupt) = sender_interrupt_guard.take() {
110        drop(send_interrupt);
111    }
112    let mut delay_recv_handler_guard = zimq.delay_recv_handler.write().await;
113    if let Some(delay_recv_handler) = delay_recv_handler_guard.take() {
114        _ = delay_recv_handler.await;
115    }
116
117    let mut sender_guard = zimq.sender_message.write().await;
118    if let Some(sender) = sender_guard.take() {
119        drop(sender);
120    };
121
122    let mut recv_handler_guard = zimq.recv_handler.write().await;
123    if let Some(recv_handler) = recv_handler_guard.take() {
124        _ = recv_handler.await;
125    };
126}