1use std::{path::PathBuf, pin::Pin, sync::Arc};
2use futures::{StreamExt, stream::FuturesUnordered};
3use tokio::{
4 sync::{OwnedSemaphorePermit, Semaphore, mpsc::Receiver},
5 task::JoinError,
6};
7
8use crate::{
9 binlog,
10 globals::{ZIMQ_LOCK, consumerfns},
11 message::{Message, Msgtype},
12};
13
14pub async fn subscribe<F, R>(topic: &str, consumer: F)
29where
30 F: Fn(Message) -> R + Send + Sync + 'static,
31 R: Future<Output = ()> + Send + Sync + 'static,
32{
33 let consumer_pack = Arc::new(move |message: Message| {
34 let fut: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> =
35 Box::pin(consumer(message));
36 fut
37 });
38 let mut consumerfns = consumerfns().write().await;
39 consumerfns.insert(topic.to_string(), consumer_pack);
40}
41
42pub async fn consumer_broker(mut receiver: Receiver<Message>) {
43 let tasks: &mut FuturesUnordered<
44 Pin<Box<dyn Future<Output = Result<(), JoinError>> + Send + Sync + 'static>>,
45 > = &mut FuturesUnordered::new();
46
47 let binlog_path = &ZIMQ_LOCK.wait().binlog_path;
48 let semaphore = Arc::new(Semaphore::new(20));
50 let receiver_ptr = &mut receiver;
51 loop {
52 tokio::select! {
53 Some(_) = tasks.next() => {
54 continue;
55 },
56 permit = semaphore.clone().acquire_owned()=>{
57 let permit = match permit {
58 Ok(permit) => permit,
59 Err(err) => {
60 eprintln!("[ZIMQ] Semaphore permit acquisition failed : {}", err);
61 continue
62 },
63 };
64 let result = fetch_queue_data_and_process(permit,receiver_ptr,tasks,binlog_path).await;
65 if !result {
66 break;
67 }
68 },
69 else => break,
70 }
71 }
72 while let Some(_) = tasks.next().await {
73 }
75 println!("[ZIMQ] Message queue consumer tasks completed, exiting.");
76}
77
78async fn fetch_queue_data_and_process(
79 permit: OwnedSemaphorePermit,
80 receiver: &mut Receiver<Message>,
81 tasks: &mut FuturesUnordered<
82 Pin<Box<dyn Future<Output = Result<(), JoinError>> + Send + Sync + 'static>>,
83 >,
84 binlog_path: &'static str,
85) -> bool {
86 let result = tokio::select! {
87 msg = receiver.recv() => match msg{
88 Some(msg) => {
89 let guard = consumerfns().read().await;
90 let Some(consumer_fn) = guard.get(msg.topic.as_str()) else{
91 return true;
92 };
93 let consumer_fn = consumer_fn.clone();
94 let task_future = tokio::spawn(async move{
95 let delay_msg = msg.msgtype == Msgtype::Delayed;
96 let msgid = msg.msgid.clone();
97 let join_handle = tokio::spawn(async move{
99 consumer_fn(msg).await;
100 });
101 if let Err(e) = join_handle.await {
102 eprintln!("[ZIMQ] Message callback processing panic :{:?}",e);
103 }
104 drop(permit);
105 if delay_msg {
106 let mut path = PathBuf::from(binlog_path);
107 path.push(msgid);
108 binlog::delete_binlog(&path).await;
109 }
110 });
111 tasks.push(Box::pin(task_future));
112 true
113 },
114 None => {
115 println!("[ZIMQ] Sender closed, stop accepting new messages");
116 drop(permit);
117 false
118 },
119 },
120 Some(_) = tasks.next() => {
121 drop(permit);
122 true
123 }
124 };
125 result
126}