1use std::{
2 collections::HashMap,
3 path::PathBuf,
4 time::{Duration, SystemTime, UNIX_EPOCH},
5};
6
7use uuid::Uuid;
8
9use crate::{
10 binlog,
11 error::ZimqError,
12 globals::ZIMQ_LOCK,
13 message::{Message, Msgtype},
14 types::ZIMQ,
15};
16
17pub async fn send_immediate(topic: &str, data: HashMap<String, String>) -> Result<(), ZimqError> {
33 send_message(topic, Uuid::new_v4().to_string(), data, Duration::ZERO).await
34}
35
36pub async fn send_delay(
53 topic: &str,
54 data: HashMap<String, String>,
55 duration: Duration,
56) -> Result<(), ZimqError> {
57 send_message(topic, Uuid::new_v4().to_string(), data, duration).await
58}
59
60pub async fn send_delay_ex(
84 topic: &str,
85 msgid: String,
86 data: HashMap<String, String>,
87 duration: Duration,
88) -> Result<(), ZimqError> {
89 send_message(topic, msgid, data, duration).await
90}
91
92
93async fn produce(zimq: &ZIMQ, message: Message) -> Result<(), ZimqError> {
94 if message.msgtype == Msgtype::Immediate {
95 let sender_guard = zimq.sender_message.read().await;
96 let Some(sender) = &*(sender_guard) else {
97 return Err(ZimqError::InitError(
98 "Failed to initialize ZIMQ".to_string(),
99 ));
100 };
101 sender.send(message).await?;
102 } else {
103 let mut delay_queue_guard = zimq.delay_queue.lock().await;
104 let sender_guard = zimq.sender_interrupt.read().await;
105 delay_queue_guard.push(message);
106 if let Some(sender) = &*sender_guard {
107 let _ = sender.try_send(1);
108 }
109 }
110 Ok(())
111}
112
113async fn send_message(
114 topic: &str,
115 msgid: String,
116 data: HashMap<String, String>,
117 duration: Duration,
118) -> Result<(), ZimqError> {
119 let mut timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
120 Ok(duration) => duration.as_millis(),
121 Err(_) => 0,
122 };
123 timestamp += duration.as_millis();
124 let zimq = ZIMQ_LOCK
125 .get()
126 .ok_or_else(|| ZimqError::UninitializedError("Failed to initialize ZIMQ".to_string()))?;
127 let message = Message {
128 topic: topic.to_string(),
129 msgid,
130 msgtype: if duration.is_zero() {
131 Msgtype::Immediate
132 } else {
133 Msgtype::Delayed
134 },
135 timestamp: timestamp as u64,
136 data,
137 };
138 if message.msgtype == Msgtype::Delayed {
139 let mut path = PathBuf::from(&zimq.binlog_path);
140 path.push(&message.msgid);
141 binlog::save_message(&message, &path).await?;
142 }
143 produce(zimq, message).await?;
144 Ok(())
145}