// rmqtt_web_hook/lib.rs

1#![deny(unsafe_code)]
2
3use std::path::Path;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicIsize, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use async_trait::async_trait;
11use backoff::{future::retry, ExponentialBackoff};
12use base64::prelude::{Engine, BASE64_STANDARD};
13use bytestring::ByteString;
14use rust_box::task_exec_queue::{SpawnExt, TaskExecQueue};
15use serde_json::{self, json};
16use tokio::{
17    self,
18    fs::{File, OpenOptions},
19    io::AsyncWriteExt,
20    sync::mpsc::{channel, Receiver, Sender},
21    sync::RwLock,
22    time,
23};
24
25use rmqtt::{
26    context::ServerContext,
27    hook::{self, Handler, HookResult, Parameter, Register, ReturnType, Type},
28    macros::Plugin,
29    plugin::{PackageInfo, Plugin},
30    register,
31    types::{DashMap, Topic, TopicFilter},
32    utils::{format_timestamp_millis, timestamp_millis, Counter},
33    Result,
34};
35
36use config::{PluginConfig, Url};
37
38mod config;
39
/// Registry of file-target writers shared between delivery tasks: maps a target's location
/// string to its lock-guarded [`HookWriter`], which is inserted on first use.
40type HookWriters = Arc<DashMap<ByteString, Arc<RwLock<HookWriter>>>>;
41
// Passes the plugin constructor to rmqtt's `register!` macro.
// NOTE(review): the macro expansion is not visible in this file — presumably it exposes the
// plugin to the broker's plugin loader; confirm against the macro definition.
42register!(WebHookPlugin::new);
43
/// State of the web-hook plugin: configuration, hook registrar, and the handles needed to
/// feed and observe the background delivery worker.
44#[derive(Plugin)]
45struct WebHookPlugin {
    /// Server context; used to (re)load this plugin's configuration.
46    scx: ServerContext,
    /// Hook registrar obtained from the server's hook manager.
47    register: Box<dyn Register>,
    /// Current plugin configuration; replaced wholesale when the config is reloaded.
48    cfg: Arc<RwLock<PluginConfig>>,
    /// Messages enqueued for the worker and not yet received by it (reported in `attrs`).
49    chan_queue_count: Arc<AtomicIsize>,
    /// Sending half of the channel that feeds the background worker.
50    tx: Arc<RwLock<Sender<Message>>>,
    /// Task queue on which deliveries are spawned; its counters are reported in `attrs`.
51    exec: TaskExecQueue,
    /// Number of failed deliveries (reported as `failure_count`).
52    fails: Arc<Counter>,
53}
54
55impl WebHookPlugin {
56    #[inline]
57    async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
58        let name = name.into();
59        let cfg = Arc::new(RwLock::new(Self::load_config(&scx, &name)?));
60        log::debug!("{} WebHookPlugin cfg: {:?}", name, cfg.read().await);
61        let writers = Arc::new(DashMap::default());
62        let chan_queue_count = Arc::new(AtomicIsize::new(0));
63        let fails = Arc::new(Counter::new());
64        let httpc = new_http_client()?;
65        let (tx, exec) =
66            Self::start(scx.clone(), httpc, cfg.clone(), writers, chan_queue_count.clone(), fails.clone())
67                .await;
68        let tx = Arc::new(RwLock::new(tx));
69        let register = scx.extends.hook_mgr().register();
70        Ok(Self { scx, register, cfg, chan_queue_count, tx, exec, fails })
71    }
72
73    async fn start(
74        scx: ServerContext,
75        httpc: reqwest::Client,
76        cfg: Arc<RwLock<PluginConfig>>,
77        writers: HookWriters,
78        chan_queue_count: Arc<AtomicIsize>,
79        fails: Arc<Counter>,
80    ) -> (Sender<Message>, TaskExecQueue) {
81        let (tx, mut rx): (Sender<Message>, Receiver<Message>) = channel(cfg.read().await.queue_capacity);
82
83        let (exec_tx, exec_rx) = tokio::sync::oneshot::channel();
84        tokio::spawn(async move {
85            log::info!("start web-hook async worker.");
86            let runner = async {
87                let exec = scx.get_exec((
88                    "WEB_HOOK_EXEC",
89                    cfg.read().await.concurrency_limit,
90                    cfg.read().await.queue_capacity,
91                ));
92                if exec_tx.send(exec.clone()).is_err() {
93                    log::error!("tokio oneshot channel send failed");
94                }
95                let backoff_strategy = Arc::new(cfg.read().await.get_backoff_strategy());
96                loop {
97                    let cfg = cfg.clone();
98                    let writers = writers.clone();
99                    let backoff_strategy = backoff_strategy.clone();
100                    match rx.recv().await {
101                        Some(msg) => {
102                            chan_queue_count.fetch_sub(1, Ordering::SeqCst);
103                            log::trace!("received web-hook Message: {msg:?}");
104                            if exec.is_full() {
105                                loop {
106                                    time::sleep(Duration::from_millis(1)).await;
107                                    if !exec.is_full() {
108                                        break;
109                                    }
110                                }
111                            }
112                            Self::handle_msg(
113                                &exec,
114                                httpc.clone(),
115                                cfg,
116                                writers,
117                                backoff_strategy,
118                                msg,
119                                fails.clone(),
120                            )
121                            .await;
122                        }
123                        None => {
124                            log::info!("web hook message channel is closed!");
125                            break;
126                        }
127                    }
128                }
129            };
130            runner.await;
131            log::info!("exit web-hook async worker.");
132        });
133        let exec = exec_rx.await.expect("tokio oneshot channel recv failed");
134        (tx, exec)
135    }
136
137    #[inline]
138    async fn handle_msg(
139        exec: &TaskExecQueue,
140        httpc: reqwest::Client,
141        cfg: Arc<RwLock<PluginConfig>>,
142        writers: HookWriters,
143        backoff_strategy: Arc<ExponentialBackoff>,
144        msg: Message,
145        fails: Arc<Counter>,
146    ) {
147        if let Err(e) = async move {
148            let (typ, topic, data) = msg;
149            if let Err(e) = WebHookHandler::handle(
150                &httpc,
151                cfg,
152                writers,
153                backoff_strategy,
154                typ,
155                topic,
156                data,
157                fails.as_ref(),
158            )
159            .await
160            {
161                log::warn!("Failed to build the web-hook message, {e:?}");
162            }
163        }
164        .spawn(exec)
165        .await
166        {
167            log::error!("send web hook message failure, exec task error, {:?}", e.to_string());
168        }
169    }
170
171    #[inline]
172    fn load_config(scx: &ServerContext, name: &str) -> Result<PluginConfig> {
173        let mut cfg = scx.plugins.read_config_with::<PluginConfig>(name, &["urls"])?;
174        cfg.merge_urls();
175        Ok(cfg)
176    }
177}
178
179#[async_trait]
180impl Plugin for WebHookPlugin {
181    #[inline]
182    async fn init(&mut self) -> Result<()> {
183        log::info!("{} init", self.name());
184        let tx = self.tx.clone();
185        let chan_queue_count = self.chan_queue_count.clone();
186        self.register
187            .add(
188                Type::SessionCreated,
189                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
190            )
191            .await;
192        self.register
193            .add(
194                Type::SessionTerminated,
195                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
196            )
197            .await;
198        self.register
199            .add(
200                Type::SessionSubscribed,
201                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
202            )
203            .await;
204        self.register
205            .add(
206                Type::SessionUnsubscribed,
207                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
208            )
209            .await;
210
211        self.register
212            .add(
213                Type::ClientConnect,
214                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
215            )
216            .await;
217        self.register
218            .add(
219                Type::ClientConnack,
220                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
221            )
222            .await;
223        self.register
224            .add(
225                Type::ClientConnected,
226                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
227            )
228            .await;
229        self.register
230            .add(
231                Type::ClientDisconnected,
232                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
233            )
234            .await;
235        self.register
236            .add(
237                Type::ClientSubscribe,
238                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
239            )
240            .await;
241        self.register
242            .add(
243                Type::ClientUnsubscribe,
244                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
245            )
246            .await;
247
248        self.register
249            .add(
250                Type::MessagePublish,
251                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
252            )
253            .await;
254        self.register
255            .add(
256                Type::MessageDelivered,
257                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
258            )
259            .await;
260        self.register
261            .add(
262                Type::MessageAcked,
263                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
264            )
265            .await;
266        self.register
267            .add(
268                Type::MessageDropped,
269                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
270            )
271            .await;
272        self.register
273            .add(
274                Type::OfflineMessage,
275                Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
276            )
277            .await;
278
279        Ok(())
280    }
281
282    #[inline]
283    async fn get_config(&self) -> Result<serde_json::Value> {
284        self.cfg.read().await.to_json()
285    }
286
287    #[inline]
288    async fn load_config(&mut self) -> Result<()> {
289        let new_cfg = Self::load_config(&self.scx, self.name())?;
290        *self.cfg.write().await = new_cfg;
291        log::debug!("load_config ok,  {:?}", self.cfg);
292        Ok(())
293    }
294
295    #[inline]
296    async fn start(&mut self) -> Result<()> {
297        log::info!("{} start", self.name());
298        self.register.start().await;
299        Ok(())
300    }
301
302    #[inline]
303    async fn stop(&mut self) -> Result<bool> {
304        log::info!("{} stop", self.name());
305        self.register.stop().await;
306        Ok(true)
307    }
308
309    #[inline]
310    async fn attrs(&self) -> serde_json::Value {
311        let chan_queue_count = self.chan_queue_count.load(Ordering::SeqCst);
312        let exec = &self.exec;
313        json!({
314            "chan_queue_count": chan_queue_count,
315            "task_exec_queue": {
316                "active_count": exec.active_count(),
317                "waiting_count": exec.waiting_count(),
318                "completed_count": exec.completed_count().await,
319                "failure_count": self.fails.count(),
320            }
321        })
322    }
323}
324
325fn new_http_client() -> Result<reqwest::Client> {
326    reqwest::Client::builder()
327        .connect_timeout(Duration::from_secs(8))
328        .timeout(Duration::from_secs(15))
329        .build()
330        .map_err(|e| anyhow!(e))
331}
332
/// Unit of work sent to the background worker: the hook type, an optional topic used for
/// rule matching, and the JSON body to deliver.
333type Message = (hook::Type, Option<TopicFilter>, serde_json::Value);
334
/// Hook handler registered for every forwarded hook type. Its `hook` implementation only
/// builds the JSON body and enqueues it; delivery happens in the background worker.
335struct WebHookHandler {
    /// Sender for the worker channel; cloned out of the lock before each send.
336    tx: Arc<RwLock<Sender<Message>>>,
    /// Incremented after each successful enqueue; the worker decrements it on receipt.
337    chan_queue_count: Arc<AtomicIsize>,
338}
339
340impl WebHookHandler {
341    #[allow(clippy::too_many_arguments)]
342    async fn handle(
343        httpc: &reqwest::Client,
344        cfg: Arc<RwLock<PluginConfig>>,
345        writers: HookWriters,
346        backoff_strategy: Arc<ExponentialBackoff>,
347        typ: hook::Type,
348        topic: Option<TopicFilter>,
349        body: serde_json::Value,
350        fails: &Counter,
351    ) -> Result<()> {
352        let topic = if let Some(topic) = topic { Some(Topic::from_str(&topic)?) } else { None };
353        let hook_writes = {
354            let cfg = cfg.read().await;
355            if let Some(rules) = cfg.rules.get(&typ) {
356                //get action and urls
357                let action_urls = rules.iter().filter_map(|r| {
358                    let is_allowed = if let Some(topic) = &topic {
359                        if let Some((rule_topics, _)) = &r.topics {
360                            rule_topics.is_match(topic)
361                        } else {
362                            true
363                        }
364                    } else {
365                        true
366                    };
367
368                    if is_allowed {
369                        let urls = if r.urls.is_empty() { cfg.urls() } else { &r.urls };
370                        if urls.is_empty() {
371                            None
372                        } else {
373                            Some((&r.action, urls))
374                        }
375                    } else {
376                        None
377                    }
378                });
379
380                //build hook log write futures
381                let mut hook_writes = Vec::new();
382                for (action, urls) in action_urls {
383                    let mut new_body = body.clone();
384                    if let Some(obj) = new_body.as_object_mut() {
385                        obj.insert("action".into(), serde_json::Value::String(action.clone()));
386                    }
387                    if urls.len() == 1 {
388                        log::debug!("action: {}, url: {:?}", action, urls[0]);
389                        hook_writes.push(Self::write(
390                            httpc,
391                            writers.clone(),
392                            backoff_strategy.clone(),
393                            urls[0].clone(),
394                            Arc::new(new_body),
395                            cfg.http_timeout,
396                            fails,
397                        ));
398                    } else {
399                        let new_body = Arc::new(new_body);
400                        for url in urls {
401                            log::debug!("action: {action}, url: {url:?}");
402                            hook_writes.push(Self::write(
403                                httpc,
404                                writers.clone(),
405                                backoff_strategy.clone(),
406                                url.clone(),
407                                new_body.clone(),
408                                cfg.http_timeout,
409                                fails,
410                            ));
411                        }
412                    }
413                }
414                Some(hook_writes)
415            } else {
416                None
417            }
418        };
419        //send hook_writes
420        if let Some(mut hook_writes) = hook_writes {
421            let c = hook_writes.len();
422            match c {
423                0 => {}
424                1 => {
425                    hook_writes.remove(0).await;
426                }
427                _ => {
428                    let _ = futures::future::join_all(hook_writes).await;
429                }
430            }
431        }
432
433        Ok(())
434    }
435
436    #[inline]
437    async fn write(
438        httpc: &reqwest::Client,
439        writers: HookWriters,
440        backoff_strategy: Arc<ExponentialBackoff>,
441        url: Url,
442        body: Arc<serde_json::Value>,
443        timeout: Duration,
444        fails: &Counter,
445    ) {
446        if url.is_file() {
447            //is file
448            let data = match serde_json::to_vec(body.as_ref()) {
449                Ok(data) => data,
450                Err(e) => {
451                    log::warn!("write hook message failure, {e:?}");
452                    return;
453                }
454            };
455            let writer = writers
456                .entry(url.loc.clone())
457                .or_insert_with(|| Arc::new(RwLock::new(HookWriter::new(url.loc))))
458                .value()
459                .clone();
460            let mut writer = writer.write().await;
461            log::debug!("writer.log start ... ");
462            //time::sleep(time::Duration::from_secs(2)).await;
463            if let Err(e) = writer.log(data.as_slice()).await {
464                fails.current_inc();
465                log::warn!("write hook message failure, file: {:?}, {:?}", writer.file_name, e);
466            }
467            log::debug!("writer.log end ... ");
468        } else {
469            //is http
470            Self::http_request(httpc, backoff_strategy, url, body, timeout, fails).await;
471        }
472    }
473
474    async fn http_request(
475        httpc: &reqwest::Client,
476        backoff_strategy: Arc<ExponentialBackoff>,
477        url: Url,
478        body: Arc<serde_json::Value>,
479        timeout: Duration,
480        fails: &Counter,
481    ) {
482        if let Err(e) = retry(backoff_strategy.as_ref().clone(), || async {
483            Ok(Self::_http_request(httpc, &url.loc, body.clone(), timeout).await?)
484        })
485        .await
486        {
487            fails.current_inc();
488            log::warn!("send web hook message failure, {e:?}");
489        }
490    }
491
492    async fn _http_request(
493        httpc: &reqwest::Client,
494        url: &str,
495        body: Arc<serde_json::Value>,
496        timeout: Duration,
497    ) -> Result<()> {
498        log::debug!("http_request, timeout: {timeout:?}, url: {url}, body: {body}");
499
500        let resp = httpc
501            .request(reqwest::Method::POST, url)
502            .timeout(timeout)
503            .json(body.as_ref())
504            .send()
505            .await
506            .map_err(|e| anyhow!(e))?;
507
508        if resp.status().is_success() {
509            Ok(())
510        } else {
511            Err(anyhow!(format!("response status is not OK, url:{:?}, response:{:?}", url, resp)))
512        }
513    }
514}
515
516#[async_trait]
517impl Handler for WebHookHandler {
518    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
519        let typ = param.get_type();
520        let now = timestamp_millis();
521        let now_time = format_timestamp_millis(now);
522        let bodys = match param {
523            Parameter::ClientConnect(conn_info) => {
524                let mut body = conn_info.to_hook_body(false);
525                if let Some(obj) = body.as_object_mut() {
526                    obj.insert("time".into(), serde_json::Value::String(now_time));
527                }
528                Some((None, body))
529            }
530            Parameter::ClientConnack(conn_info, conn_ack) => {
531                let mut body = conn_info.to_hook_body(false);
532                if let Some(obj) = body.as_object_mut() {
533                    obj.insert("conn_ack".into(), serde_json::Value::String(conn_ack.reason().to_string()));
534                    obj.insert("time".into(), serde_json::Value::String(now_time));
535                }
536                Some((None, body))
537            }
538
539            Parameter::ClientConnected(session) => {
540                let mut body = session.connect_info().await.map(|c| c.to_hook_body(true)).unwrap_or_default();
541                if let Some(obj) = body.as_object_mut() {
542                    obj.insert(
543                        "connected_at".into(),
544                        serde_json::Value::Number(serde_json::Number::from(
545                            session.connected_at().await.unwrap_or_default(),
546                        )),
547                    );
548                    obj.insert(
549                        "session_present".into(),
550                        serde_json::Value::Bool(session.session_present().await.unwrap_or_default()),
551                    );
552                    obj.insert("time".into(), serde_json::Value::String(now_time));
553                }
554                Some((None, body))
555            }
556
557            Parameter::ClientDisconnected(session, reason) => {
558                let body = json!({
559                    "node": session.id.node(),
560                    "ipaddress": session.id.remote_addr,
561                    "clientid": session.id.client_id,
562                    "username": session.id.username_ref(),
563                    "disconnected_at": session.disconnected_at().await.unwrap_or_default(),
564                    "reason": reason.to_string(),
565                    "time": now_time
566                });
567                Some((None, body))
568            }
569
570            Parameter::ClientSubscribe(session, subscribe) => {
571                let body = json!({
572                    "node": session.id.node(),
573                    "ipaddress": session.id.remote_addr,
574                    "clientid": session.id.client_id,
575                    "username": session.id.username_ref(),
576                    "topic": subscribe.topic_filter,
577                    "opts": subscribe.opts.to_json(),
578                    "time": now_time
579                });
580                Some((Some(subscribe.topic_filter.clone()), body))
581            }
582
583            Parameter::ClientUnsubscribe(session, unsubscribe) => {
584                let body = json!({
585                    "node": session.id.node(),
586                    "ipaddress": session.id.remote_addr,
587                    "clientid": session.id.client_id,
588                    "username": session.id.username_ref(),
589                    "topic": unsubscribe.topic_filter,
590                    "time": now_time
591                });
592                Some((Some(unsubscribe.topic_filter.clone()), body))
593            }
594
595            Parameter::SessionSubscribed(session, subscribe) => {
596                let body = json!({
597                    "node": session.id.node(),
598                    "ipaddress": session.id.remote_addr,
599                    "clientid": session.id.client_id,
600                    "username": session.id.username_ref(),
601                    "topic": subscribe.topic_filter,
602                    "opts": subscribe.opts.to_json(),
603                    "time": now_time
604                });
605                Some((Some(subscribe.topic_filter.clone()), body))
606            }
607
608            Parameter::SessionUnsubscribed(session, unsubscribed) => {
609                let topic = unsubscribed.topic_filter.clone();
610                let body = json!({
611                    "node": session.id.node(),
612                    "ipaddress": session.id.remote_addr,
613                    "clientid": session.id.client_id,
614                    "username": session.id.username_ref(),
615                    "topic": topic,
616                    "time": now_time
617                });
618                Some((Some(topic), body))
619            }
620
621            Parameter::SessionCreated(session) => {
622                let body = json!({
623                    "node": session.id.node(),
624                    "ipaddress": session.id.remote_addr,
625                    "clientid": session.id.client_id,
626                    "username": session.id.username_ref(),
627                    "created_at": session.created_at().await.unwrap_or_default(),
628                    "time": now_time
629                });
630                Some((None, body))
631            }
632
633            Parameter::SessionTerminated(session, reason) => {
634                let body = json!({
635                    "node": session.id.node(),
636                    "ipaddress": session.id.remote_addr,
637                    "clientid": session.id.client_id,
638                    "username": session.id.username_ref(),
639                    "reason": reason.to_string(),
640                    "time": now_time
641                });
642                Some((None, body))
643            }
644
645            Parameter::MessagePublish(_session, from, publish) => {
646                let topic = &publish.topic;
647                let body = json!({
648                    "dup": publish.dup,
649                    "retain": publish.retain,
650                    "qos": publish.qos.value(),
651                    "topic": topic,
652                    "packet_id": publish.packet_id,
653                    "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
654                    "ts": publish.create_time,
655                    "time": now_time
656                });
657                let body = from.to_from_json(body);
658                Some((Some(topic.clone()), body))
659            }
660
661            Parameter::MessageDelivered(session, from, publish) => {
662                if from.is_system() {
663                    None
664                } else {
665                    let topic = &publish.topic;
666                    let body = json!({
667                        "dup": publish.dup,
668                        "retain": publish.retain,
669                        "qos": publish.qos.value(),
670                        "topic": topic,
671                        "packet_id": publish.packet_id,
672                        "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
673                        "pts": publish.create_time,
674                        "ts": now,
675                        "time": now_time
676                    });
677                    let body = session.id.to_to_json(body);
678                    let body = from.to_from_json(body);
679                    Some((Some(topic.clone()), body))
680                }
681            }
682
683            Parameter::MessageAcked(session, from, publish) => {
684                if from.is_system() {
685                    None
686                } else {
687                    let topic = &publish.topic;
688                    let body = json!({
689                        "dup": publish.dup,
690                        "retain": publish.retain,
691                        "qos": publish.qos.value(),
692                        "topic": topic,
693                        "packet_id": publish.packet_id,
694                        "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
695                        "pts": publish.create_time,
696                        "ts": now,
697                        "time": now_time
698                    });
699                    let body = session.id.to_to_json(body);
700                    let body = from.to_from_json(body);
701                    Some((Some(topic.clone()), body))
702                }
703            }
704
705            Parameter::MessageDropped(to, from, publish, reason) => {
706                if from.is_system() {
707                    None
708                } else {
709                    let body = json!({
710                        "dup": publish.dup,
711                        "retain": publish.retain,
712                        "qos": publish.qos.value(),
713                        "topic": publish.topic,
714                        "packet_id": publish.packet_id,
715                        "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
716                        "reason": reason.to_string(),
717                        "pts": publish.create_time,
718                        "ts": now,
719                        "time": now_time
720                    });
721                    let mut body = from.to_from_json(body);
722                    if let Some(to) = to {
723                        body = to.to_to_json(body);
724                    }
725                    Some((None, body))
726                }
727            }
728            Parameter::OfflineMessage(session, from, publish) => {
729                if from.is_system() {
730                    None
731                } else {
732                    let topic = &publish.topic;
733                    let body = json!({
734                        "dup": publish.dup,
735                        "retain": publish.retain,
736                        "qos": publish.qos.value(),
737                        "topic": topic,
738                        "packet_id": publish.packet_id,
739                        "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
740                        "pts": publish.create_time,
741                        "ts": now,
742                        "time": now_time
743                    });
744                    let body = session.id.to_to_json(body);
745                    let body = from.to_from_json(body);
746                    Some((Some(topic.clone()), body))
747                }
748            }
749            _ => {
750                log::error!("parameter is: {param:?}");
751                None
752            }
753        };
754
755        log::debug!("bodys: {bodys:?}");
756
757        if let Some((topic, body)) = bodys {
758            let tx = self.tx.read().await.clone();
759            if let Err(e) = tx.send((typ, topic, body)).await {
760                log::warn!("web-hook send error, typ: {typ:?}, {e:?}");
761            } else {
762                self.chan_queue_count.fetch_add(1, Ordering::SeqCst);
763            }
764        }
765
766        (true, acc)
767    }
768}
769
/// Appends hook bodies, one per line, to a single file that is opened lazily.
770struct HookWriter {
    /// Path of the target file.
771    file_name: String,
    /// Cached open handle; `None` before the file has been opened.
772    file: Option<File>,
773}
774
775impl HookWriter {
776    fn new(file: ByteString) -> Self {
777        Self { file_name: file.to_string(), file: None }
778    }
779
780    #[inline]
781    pub async fn log(&mut self, msg: &[u8]) -> std::result::Result<(), Box<dyn std::error::Error>> {
782        if let Some(file) = self.file.as_mut() {
783            file.write_all(msg).await?;
784            file.write_all(b"\n").await?;
785        } else {
786            Self::create_dirs(Path::new(&self.file_name)).await?;
787            let mut file = OpenOptions::new().create(true).append(true).open(&self.file_name).await?;
788            file.write_all(msg).await?;
789            file.write_all(b"\n").await?;
790            self.file.replace(file);
791        }
792        Ok(())
793    }
794
795    #[inline]
796    async fn create_dirs(path: &Path) -> std::result::Result<(), std::io::Error> {
797        if let Some(parent) = path.parent() {
798            // If the parent directory does not exist, create it recursively.
799            if !parent.exists() {
800                tokio::fs::create_dir_all(parent).await?;
801            }
802        }
803        Ok(())
804    }
805}