Skip to main content

zimq_impl/
consumer.rs

1use std::{path::PathBuf, pin::Pin, sync::Arc};
2use futures::{StreamExt, stream::FuturesUnordered};
3use tokio::{
4    sync::{OwnedSemaphorePermit, Semaphore, mpsc::Receiver},
5    task::JoinError,
6};
7
8use crate::{
9    binlog,
10    globals::{ZIMQ_LOCK, consumerfns},
11    message::{Message, Msgtype},
12};
13
14/// Subscribes to messages on a specific topic.
15/// # Arguments
16/// * `topic` - Message topic
17/// * `consumer` - A callback function to process incoming messages from the topic.
18///
19/// # Examples
20///  ```
21/// zimq::subscribe("user", user_consume).await;
22///
23/// async fn user_consume(msg: Message) {
24///     // TODO
25/// }
26/// ```
27///
28pub async fn subscribe<F, R>(topic: &str, consumer: F)
29where
30    F: Fn(Message) -> R + Send + Sync + 'static,
31    R: Future<Output = ()> + Send + Sync + 'static,
32{
33    let consumer_pack = Arc::new(move |message: Message| {
34        let fut: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> =
35            Box::pin(consumer(message));
36        fut
37    });
38    let mut consumerfns = consumerfns().write().await;
39    consumerfns.insert(topic.to_string(), consumer_pack);
40}
41
42pub async fn consumer_broker(mut receiver: Receiver<Message>) {
43    let tasks: &mut FuturesUnordered<
44        Pin<Box<dyn Future<Output = Result<(), JoinError>> + Send + Sync + 'static>>,
45    > = &mut FuturesUnordered::new();
46
47    let binlog_path = &ZIMQ_LOCK.wait().binlog_path;
48    //
49    let semaphore = Arc::new(Semaphore::new(20));
50    let receiver_ptr = &mut receiver;
51    loop {
52        tokio::select! {
53            Some(_) = tasks.next() => {
54                continue;
55            },
56            permit = semaphore.clone().acquire_owned()=>{
57                let permit = match permit {
58                    Ok(permit) => permit,
59                    Err(err) => {
60                        eprintln!("[ZIMQ] Semaphore permit acquisition failed : {}", err);
61                        continue
62                    },
63                };
64                let result = fetch_queue_data_and_process(permit,receiver_ptr,tasks,binlog_path).await;
65                if !result {
66                    break;
67                }
68            },
69            else => break,
70        }
71    }
72    while let Some(_) = tasks.next().await {
73        //println!("Task done: {:?}", res);
74    }
75    println!("[ZIMQ] Message queue consumer tasks completed, exiting.");
76}
77
78async fn fetch_queue_data_and_process(
79    permit: OwnedSemaphorePermit,
80    receiver: &mut Receiver<Message>,
81    tasks: &mut FuturesUnordered<
82        Pin<Box<dyn Future<Output = Result<(), JoinError>> + Send + Sync + 'static>>,
83    >,
84    binlog_path: &'static str,
85) -> bool {
86    let result = tokio::select! {
87        msg = receiver.recv() => match msg{
88            Some(msg) => {
89                let guard = consumerfns().read().await;
90                let Some(consumer_fn) = guard.get(msg.topic.as_str()) else{
91                    return true;
92                };
93                let consumer_fn = consumer_fn.clone();
94                let task_future = tokio::spawn(async move{
95                    let delay_msg = msg.msgtype == Msgtype::Delayed;
96                    let msgid = msg.msgid.clone();
97                    // Avoid program termination on panic by spawning a new tokio task here
98                    let join_handle = tokio::spawn(async move{
99                        consumer_fn(msg).await;
100                    });
101                    if let Err(e) = join_handle.await {
102                        eprintln!("[ZIMQ] Message callback processing panic :{:?}",e);
103                    }
104                    drop(permit);
105                    if delay_msg {
106                        let mut path = PathBuf::from(binlog_path);
107                        path.push(msgid);
108                        binlog::delete_binlog(&path).await;
109                    }
110                });
111                tasks.push(Box::pin(task_future));
112                true
113            },
114            None => {
115                println!("[ZIMQ] Sender closed, stop accepting new messages");
116                drop(permit);
117                false
118            },
119        },
120        Some(_) = tasks.next() => {
121            drop(permit);
122            true
123        }
124    };
125    result
126}