Skip to main content

zimq_impl/
producer.rs

1use std::{
2    collections::HashMap,
3    path::PathBuf,
4    time::{Duration, SystemTime, UNIX_EPOCH},
5};
6
7use uuid::Uuid;
8
9use crate::{
10    binlog,
11    error::ZimqError,
12    globals::ZIMQ_LOCK,
13    message::{Message, Msgtype},
14    types::ZIMQ,
15};
16
17/// Publishes an immediate message to the specified topic.
18/// # Arguments
19/// * `topic` - Message topic
20/// * `data` - Message data
21///
22/// # Errors
23/// * `ZimqError`
24///
25/// # Examples
26/// ```
27/// let mut data = HashMap::new();
28/// data.insert("id".to_string(), "123456".to_string());
29/// let result: Result<(), ZimqError> = zimq::send_immediate("order", data.clone()).await;
30/// ```
31///
32pub async fn send_immediate(topic: &str, data: HashMap<String, String>) -> Result<(), ZimqError> {
33    send_message(topic, Uuid::new_v4().to_string(), data, Duration::ZERO).await
34}
35
36/// Publishes a delayed message to the specified topic.
37/// # Arguments
38/// * `topic` - Message topic
39/// * `data` - Message data
40/// * `duration` - Duration
41///
42/// # Errors
43/// * `ZimqError`
44///
45/// # Examples
46/// ```
47/// let mut data = HashMap::new();
48/// data.insert("id".to_string(), "123456".to_string());
49/// let result = zimq::send_delay("user",data,Duration::from_secs(20)).await
50/// ```
51///
52pub async fn send_delay(
53    topic: &str,
54    data: HashMap<String, String>,
55    duration: Duration,
56) -> Result<(), ZimqError> {
57    send_message(topic, Uuid::new_v4().to_string(), data, duration).await
58}
59
60/// Publishes a delayed message with a custom message ID to the specified topic.
61/// If a delayed message with the same ID already exists and has not been consumed yet, this method will return ZimqError::DuplicatedMessageError to prevent duplicates.
62/// # Arguments
63///
64/// * `topic` - Message topic
65/// * `msgid` - Message ID
66/// * `data` - Message data
67/// * `duration` - Duration
68///
69/// # Errors
70/// * `ZimqError`
71///
72/// # Examples
73/// ```
74/// let mut data = HashMap::new();
75/// data.insert("id".to_string(), "123456".to_string());
76/// let msgid = "msgid123456".to_string();
77/// let result = zimq::send_delay_ex("user",msgid,data,Duration::from_secs(20)).await;
78/// if let Err(ZimqError::DuplicatedMessageError(msg) ) = result {
79///     println!("DuplicatedMessageError: {}", msg);
80///  }
81/// ```
82///
83pub async fn send_delay_ex(
84    topic: &str,
85    msgid: String,
86    data: HashMap<String, String>,
87    duration: Duration,
88) -> Result<(), ZimqError> {
89    send_message(topic, msgid, data, duration).await
90}
91
92
93async fn produce(zimq: &ZIMQ, message: Message) -> Result<(), ZimqError> {
94    if message.msgtype == Msgtype::Immediate {
95        let sender_guard = zimq.sender_message.read().await;
96        let Some(sender) = &*(sender_guard) else {
97            return Err(ZimqError::InitError(
98                "Failed to initialize ZIMQ".to_string(),
99            ));
100        };
101        sender.send(message).await?;
102    } else {
103        let mut delay_queue_guard = zimq.delay_queue.lock().await;
104        let sender_guard = zimq.sender_interrupt.read().await;
105        delay_queue_guard.push(message);
106        if let Some(sender) = &*sender_guard {
107            let _ = sender.try_send(1);
108        }
109    }
110    Ok(())
111}
112
113async fn send_message(
114    topic: &str,
115    msgid: String,
116    data: HashMap<String, String>,
117    duration: Duration,
118) -> Result<(), ZimqError> {
119    let mut timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
120        Ok(duration) => duration.as_millis(),
121        Err(_) => 0,
122    };
123    timestamp += duration.as_millis();
124    let zimq = ZIMQ_LOCK
125        .get()
126        .ok_or_else(|| ZimqError::UninitializedError("Failed to initialize ZIMQ".to_string()))?;
127    let message = Message {
128        topic: topic.to_string(),
129        msgid,
130        msgtype: if duration.is_zero() {
131            Msgtype::Immediate
132        } else {
133            Msgtype::Delayed
134        },
135        timestamp: timestamp as u64,
136        data,
137    };
138    if message.msgtype == Msgtype::Delayed {
139        let mut path = PathBuf::from(&zimq.binlog_path);
140        path.push(&message.msgid);
141        binlog::save_message(&message, &path).await?;
142    }
143    produce(zimq, message).await?;
144    Ok(())
145}