1#![deny(unsafe_code)]
2
3use std::path::Path;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicIsize, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use async_trait::async_trait;
11use backoff::{future::retry, ExponentialBackoff};
12use base64::prelude::{Engine, BASE64_STANDARD};
13use bytestring::ByteString;
14use rust_box::task_exec_queue::{SpawnExt, TaskExecQueue};
15use serde_json::{self, json};
16use tokio::{
17 self,
18 fs::{File, OpenOptions},
19 io::AsyncWriteExt,
20 sync::mpsc::{channel, Receiver, Sender},
21 sync::RwLock,
22 time,
23};
24
25use rmqtt::{
26 context::ServerContext,
27 hook::{self, Handler, HookResult, Parameter, Register, ReturnType, Type},
28 macros::Plugin,
29 plugin::{PackageInfo, Plugin},
30 register,
31 types::{DashMap, Topic, TopicFilter},
32 utils::{format_timestamp_millis, timestamp_millis, Counter},
33 Result,
34};
35
36use config::{PluginConfig, Url};
37
38mod config;
39
40type HookWriters = Arc<DashMap<ByteString, Arc<RwLock<HookWriter>>>>;
41
42register!(WebHookPlugin::new);
43
44#[derive(Plugin)]
45struct WebHookPlugin {
46 scx: ServerContext,
47 register: Box<dyn Register>,
48 cfg: Arc<RwLock<PluginConfig>>,
49 chan_queue_count: Arc<AtomicIsize>,
50 tx: Arc<RwLock<Sender<Message>>>,
51 exec: TaskExecQueue,
52 fails: Arc<Counter>,
53}
54
55impl WebHookPlugin {
56 #[inline]
57 async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
58 let name = name.into();
59 let cfg = Arc::new(RwLock::new(Self::load_config(&scx, &name)?));
60 log::debug!("{} WebHookPlugin cfg: {:?}", name, cfg.read().await);
61 let writers = Arc::new(DashMap::default());
62 let chan_queue_count = Arc::new(AtomicIsize::new(0));
63 let fails = Arc::new(Counter::new());
64 let httpc = new_http_client()?;
65 let (tx, exec) =
66 Self::start(scx.clone(), httpc, cfg.clone(), writers, chan_queue_count.clone(), fails.clone())
67 .await;
68 let tx = Arc::new(RwLock::new(tx));
69 let register = scx.extends.hook_mgr().register();
70 Ok(Self { scx, register, cfg, chan_queue_count, tx, exec, fails })
71 }
72
73 async fn start(
74 scx: ServerContext,
75 httpc: reqwest::Client,
76 cfg: Arc<RwLock<PluginConfig>>,
77 writers: HookWriters,
78 chan_queue_count: Arc<AtomicIsize>,
79 fails: Arc<Counter>,
80 ) -> (Sender<Message>, TaskExecQueue) {
81 let (tx, mut rx): (Sender<Message>, Receiver<Message>) = channel(cfg.read().await.queue_capacity);
82
83 let (exec_tx, exec_rx) = tokio::sync::oneshot::channel();
84 tokio::spawn(async move {
85 log::info!("start web-hook async worker.");
86 let runner = async {
87 let exec = scx.get_exec((
88 "WEB_HOOK_EXEC",
89 cfg.read().await.concurrency_limit,
90 cfg.read().await.queue_capacity,
91 ));
92 if exec_tx.send(exec.clone()).is_err() {
93 log::error!("tokio oneshot channel send failed");
94 }
95 let backoff_strategy = Arc::new(cfg.read().await.get_backoff_strategy());
96 loop {
97 let cfg = cfg.clone();
98 let writers = writers.clone();
99 let backoff_strategy = backoff_strategy.clone();
100 match rx.recv().await {
101 Some(msg) => {
102 chan_queue_count.fetch_sub(1, Ordering::SeqCst);
103 log::trace!("received web-hook Message: {msg:?}");
104 if exec.is_full() {
105 loop {
106 time::sleep(Duration::from_millis(1)).await;
107 if !exec.is_full() {
108 break;
109 }
110 }
111 }
112 Self::handle_msg(
113 &exec,
114 httpc.clone(),
115 cfg,
116 writers,
117 backoff_strategy,
118 msg,
119 fails.clone(),
120 )
121 .await;
122 }
123 None => {
124 log::info!("web hook message channel is closed!");
125 break;
126 }
127 }
128 }
129 };
130 runner.await;
131 log::info!("exit web-hook async worker.");
132 });
133 let exec = exec_rx.await.expect("tokio oneshot channel recv failed");
134 (tx, exec)
135 }
136
137 #[inline]
138 async fn handle_msg(
139 exec: &TaskExecQueue,
140 httpc: reqwest::Client,
141 cfg: Arc<RwLock<PluginConfig>>,
142 writers: HookWriters,
143 backoff_strategy: Arc<ExponentialBackoff>,
144 msg: Message,
145 fails: Arc<Counter>,
146 ) {
147 if let Err(e) = async move {
148 let (typ, topic, data) = msg;
149 if let Err(e) = WebHookHandler::handle(
150 &httpc,
151 cfg,
152 writers,
153 backoff_strategy,
154 typ,
155 topic,
156 data,
157 fails.as_ref(),
158 )
159 .await
160 {
161 log::warn!("Failed to build the web-hook message, {e:?}");
162 }
163 }
164 .spawn(exec)
165 .await
166 {
167 log::error!("send web hook message failure, exec task error, {:?}", e.to_string());
168 }
169 }
170
171 #[inline]
172 fn load_config(scx: &ServerContext, name: &str) -> Result<PluginConfig> {
173 let mut cfg = scx.plugins.read_config_with::<PluginConfig>(name, &["urls"])?;
174 cfg.merge_urls();
175 Ok(cfg)
176 }
177}
178
179#[async_trait]
180impl Plugin for WebHookPlugin {
181 #[inline]
182 async fn init(&mut self) -> Result<()> {
183 log::info!("{} init", self.name());
184 let tx = self.tx.clone();
185 let chan_queue_count = self.chan_queue_count.clone();
186 self.register
187 .add(
188 Type::SessionCreated,
189 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
190 )
191 .await;
192 self.register
193 .add(
194 Type::SessionTerminated,
195 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
196 )
197 .await;
198 self.register
199 .add(
200 Type::SessionSubscribed,
201 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
202 )
203 .await;
204 self.register
205 .add(
206 Type::SessionUnsubscribed,
207 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
208 )
209 .await;
210
211 self.register
212 .add(
213 Type::ClientConnect,
214 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
215 )
216 .await;
217 self.register
218 .add(
219 Type::ClientConnack,
220 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
221 )
222 .await;
223 self.register
224 .add(
225 Type::ClientConnected,
226 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
227 )
228 .await;
229 self.register
230 .add(
231 Type::ClientDisconnected,
232 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
233 )
234 .await;
235 self.register
236 .add(
237 Type::ClientSubscribe,
238 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
239 )
240 .await;
241 self.register
242 .add(
243 Type::ClientUnsubscribe,
244 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
245 )
246 .await;
247
248 self.register
249 .add(
250 Type::MessagePublish,
251 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
252 )
253 .await;
254 self.register
255 .add(
256 Type::MessageDelivered,
257 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
258 )
259 .await;
260 self.register
261 .add(
262 Type::MessageAcked,
263 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
264 )
265 .await;
266 self.register
267 .add(
268 Type::MessageDropped,
269 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
270 )
271 .await;
272 self.register
273 .add(
274 Type::OfflineMessage,
275 Box::new(WebHookHandler { tx: tx.clone(), chan_queue_count: chan_queue_count.clone() }),
276 )
277 .await;
278
279 Ok(())
280 }
281
282 #[inline]
283 async fn get_config(&self) -> Result<serde_json::Value> {
284 self.cfg.read().await.to_json()
285 }
286
287 #[inline]
288 async fn load_config(&mut self) -> Result<()> {
289 let new_cfg = Self::load_config(&self.scx, self.name())?;
290 *self.cfg.write().await = new_cfg;
291 log::debug!("load_config ok, {:?}", self.cfg);
292 Ok(())
293 }
294
295 #[inline]
296 async fn start(&mut self) -> Result<()> {
297 log::info!("{} start", self.name());
298 self.register.start().await;
299 Ok(())
300 }
301
302 #[inline]
303 async fn stop(&mut self) -> Result<bool> {
304 log::info!("{} stop", self.name());
305 self.register.stop().await;
306 Ok(true)
307 }
308
309 #[inline]
310 async fn attrs(&self) -> serde_json::Value {
311 let chan_queue_count = self.chan_queue_count.load(Ordering::SeqCst);
312 let exec = &self.exec;
313 json!({
314 "chan_queue_count": chan_queue_count,
315 "task_exec_queue": {
316 "active_count": exec.active_count(),
317 "waiting_count": exec.waiting_count(),
318 "completed_count": exec.completed_count().await,
319 "failure_count": self.fails.count(),
320 }
321 })
322 }
323}
324
325fn new_http_client() -> Result<reqwest::Client> {
326 reqwest::Client::builder()
327 .connect_timeout(Duration::from_secs(8))
328 .timeout(Duration::from_secs(15))
329 .build()
330 .map_err(|e| anyhow!(e))
331}
332
333type Message = (hook::Type, Option<TopicFilter>, serde_json::Value);
334
335struct WebHookHandler {
336 tx: Arc<RwLock<Sender<Message>>>,
337 chan_queue_count: Arc<AtomicIsize>,
338}
339
340impl WebHookHandler {
341 #[allow(clippy::too_many_arguments)]
342 async fn handle(
343 httpc: &reqwest::Client,
344 cfg: Arc<RwLock<PluginConfig>>,
345 writers: HookWriters,
346 backoff_strategy: Arc<ExponentialBackoff>,
347 typ: hook::Type,
348 topic: Option<TopicFilter>,
349 body: serde_json::Value,
350 fails: &Counter,
351 ) -> Result<()> {
352 let topic = if let Some(topic) = topic { Some(Topic::from_str(&topic)?) } else { None };
353 let hook_writes = {
354 let cfg = cfg.read().await;
355 if let Some(rules) = cfg.rules.get(&typ) {
356 let action_urls = rules.iter().filter_map(|r| {
358 let is_allowed = if let Some(topic) = &topic {
359 if let Some((rule_topics, _)) = &r.topics {
360 rule_topics.is_match(topic)
361 } else {
362 true
363 }
364 } else {
365 true
366 };
367
368 if is_allowed {
369 let urls = if r.urls.is_empty() { cfg.urls() } else { &r.urls };
370 if urls.is_empty() {
371 None
372 } else {
373 Some((&r.action, urls))
374 }
375 } else {
376 None
377 }
378 });
379
380 let mut hook_writes = Vec::new();
382 for (action, urls) in action_urls {
383 let mut new_body = body.clone();
384 if let Some(obj) = new_body.as_object_mut() {
385 obj.insert("action".into(), serde_json::Value::String(action.clone()));
386 }
387 if urls.len() == 1 {
388 log::debug!("action: {}, url: {:?}", action, urls[0]);
389 hook_writes.push(Self::write(
390 httpc,
391 writers.clone(),
392 backoff_strategy.clone(),
393 urls[0].clone(),
394 Arc::new(new_body),
395 cfg.http_timeout,
396 fails,
397 ));
398 } else {
399 let new_body = Arc::new(new_body);
400 for url in urls {
401 log::debug!("action: {action}, url: {url:?}");
402 hook_writes.push(Self::write(
403 httpc,
404 writers.clone(),
405 backoff_strategy.clone(),
406 url.clone(),
407 new_body.clone(),
408 cfg.http_timeout,
409 fails,
410 ));
411 }
412 }
413 }
414 Some(hook_writes)
415 } else {
416 None
417 }
418 };
419 if let Some(mut hook_writes) = hook_writes {
421 let c = hook_writes.len();
422 match c {
423 0 => {}
424 1 => {
425 hook_writes.remove(0).await;
426 }
427 _ => {
428 let _ = futures::future::join_all(hook_writes).await;
429 }
430 }
431 }
432
433 Ok(())
434 }
435
436 #[inline]
437 async fn write(
438 httpc: &reqwest::Client,
439 writers: HookWriters,
440 backoff_strategy: Arc<ExponentialBackoff>,
441 url: Url,
442 body: Arc<serde_json::Value>,
443 timeout: Duration,
444 fails: &Counter,
445 ) {
446 if url.is_file() {
447 let data = match serde_json::to_vec(body.as_ref()) {
449 Ok(data) => data,
450 Err(e) => {
451 log::warn!("write hook message failure, {e:?}");
452 return;
453 }
454 };
455 let writer = writers
456 .entry(url.loc.clone())
457 .or_insert_with(|| Arc::new(RwLock::new(HookWriter::new(url.loc))))
458 .value()
459 .clone();
460 let mut writer = writer.write().await;
461 log::debug!("writer.log start ... ");
462 if let Err(e) = writer.log(data.as_slice()).await {
464 fails.current_inc();
465 log::warn!("write hook message failure, file: {:?}, {:?}", writer.file_name, e);
466 }
467 log::debug!("writer.log end ... ");
468 } else {
469 Self::http_request(httpc, backoff_strategy, url, body, timeout, fails).await;
471 }
472 }
473
474 async fn http_request(
475 httpc: &reqwest::Client,
476 backoff_strategy: Arc<ExponentialBackoff>,
477 url: Url,
478 body: Arc<serde_json::Value>,
479 timeout: Duration,
480 fails: &Counter,
481 ) {
482 if let Err(e) = retry(backoff_strategy.as_ref().clone(), || async {
483 Ok(Self::_http_request(httpc, &url.loc, body.clone(), timeout).await?)
484 })
485 .await
486 {
487 fails.current_inc();
488 log::warn!("send web hook message failure, {e:?}");
489 }
490 }
491
492 async fn _http_request(
493 httpc: &reqwest::Client,
494 url: &str,
495 body: Arc<serde_json::Value>,
496 timeout: Duration,
497 ) -> Result<()> {
498 log::debug!("http_request, timeout: {timeout:?}, url: {url}, body: {body}");
499
500 let resp = httpc
501 .request(reqwest::Method::POST, url)
502 .timeout(timeout)
503 .json(body.as_ref())
504 .send()
505 .await
506 .map_err(|e| anyhow!(e))?;
507
508 if resp.status().is_success() {
509 Ok(())
510 } else {
511 Err(anyhow!(format!("response status is not OK, url:{:?}, response:{:?}", url, resp)))
512 }
513 }
514}
515
516#[async_trait]
517impl Handler for WebHookHandler {
518 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
519 let typ = param.get_type();
520 let now = timestamp_millis();
521 let now_time = format_timestamp_millis(now);
522 let bodys = match param {
523 Parameter::ClientConnect(conn_info) => {
524 let mut body = conn_info.to_hook_body(false);
525 if let Some(obj) = body.as_object_mut() {
526 obj.insert("time".into(), serde_json::Value::String(now_time));
527 }
528 Some((None, body))
529 }
530 Parameter::ClientConnack(conn_info, conn_ack) => {
531 let mut body = conn_info.to_hook_body(false);
532 if let Some(obj) = body.as_object_mut() {
533 obj.insert("conn_ack".into(), serde_json::Value::String(conn_ack.reason().to_string()));
534 obj.insert("time".into(), serde_json::Value::String(now_time));
535 }
536 Some((None, body))
537 }
538
539 Parameter::ClientConnected(session) => {
540 let mut body = session.connect_info().await.map(|c| c.to_hook_body(true)).unwrap_or_default();
541 if let Some(obj) = body.as_object_mut() {
542 obj.insert(
543 "connected_at".into(),
544 serde_json::Value::Number(serde_json::Number::from(
545 session.connected_at().await.unwrap_or_default(),
546 )),
547 );
548 obj.insert(
549 "session_present".into(),
550 serde_json::Value::Bool(session.session_present().await.unwrap_or_default()),
551 );
552 obj.insert("time".into(), serde_json::Value::String(now_time));
553 }
554 Some((None, body))
555 }
556
557 Parameter::ClientDisconnected(session, reason) => {
558 let body = json!({
559 "node": session.id.node(),
560 "ipaddress": session.id.remote_addr,
561 "clientid": session.id.client_id,
562 "username": session.id.username_ref(),
563 "disconnected_at": session.disconnected_at().await.unwrap_or_default(),
564 "reason": reason.to_string(),
565 "time": now_time
566 });
567 Some((None, body))
568 }
569
570 Parameter::ClientSubscribe(session, subscribe) => {
571 let body = json!({
572 "node": session.id.node(),
573 "ipaddress": session.id.remote_addr,
574 "clientid": session.id.client_id,
575 "username": session.id.username_ref(),
576 "topic": subscribe.topic_filter,
577 "opts": subscribe.opts.to_json(),
578 "time": now_time
579 });
580 Some((Some(subscribe.topic_filter.clone()), body))
581 }
582
583 Parameter::ClientUnsubscribe(session, unsubscribe) => {
584 let body = json!({
585 "node": session.id.node(),
586 "ipaddress": session.id.remote_addr,
587 "clientid": session.id.client_id,
588 "username": session.id.username_ref(),
589 "topic": unsubscribe.topic_filter,
590 "time": now_time
591 });
592 Some((Some(unsubscribe.topic_filter.clone()), body))
593 }
594
595 Parameter::SessionSubscribed(session, subscribe) => {
596 let body = json!({
597 "node": session.id.node(),
598 "ipaddress": session.id.remote_addr,
599 "clientid": session.id.client_id,
600 "username": session.id.username_ref(),
601 "topic": subscribe.topic_filter,
602 "opts": subscribe.opts.to_json(),
603 "time": now_time
604 });
605 Some((Some(subscribe.topic_filter.clone()), body))
606 }
607
608 Parameter::SessionUnsubscribed(session, unsubscribed) => {
609 let topic = unsubscribed.topic_filter.clone();
610 let body = json!({
611 "node": session.id.node(),
612 "ipaddress": session.id.remote_addr,
613 "clientid": session.id.client_id,
614 "username": session.id.username_ref(),
615 "topic": topic,
616 "time": now_time
617 });
618 Some((Some(topic), body))
619 }
620
621 Parameter::SessionCreated(session) => {
622 let body = json!({
623 "node": session.id.node(),
624 "ipaddress": session.id.remote_addr,
625 "clientid": session.id.client_id,
626 "username": session.id.username_ref(),
627 "created_at": session.created_at().await.unwrap_or_default(),
628 "time": now_time
629 });
630 Some((None, body))
631 }
632
633 Parameter::SessionTerminated(session, reason) => {
634 let body = json!({
635 "node": session.id.node(),
636 "ipaddress": session.id.remote_addr,
637 "clientid": session.id.client_id,
638 "username": session.id.username_ref(),
639 "reason": reason.to_string(),
640 "time": now_time
641 });
642 Some((None, body))
643 }
644
645 Parameter::MessagePublish(_session, from, publish) => {
646 let topic = &publish.topic;
647 let body = json!({
648 "dup": publish.dup,
649 "retain": publish.retain,
650 "qos": publish.qos.value(),
651 "topic": topic,
652 "packet_id": publish.packet_id,
653 "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
654 "ts": publish.create_time,
655 "time": now_time
656 });
657 let body = from.to_from_json(body);
658 Some((Some(topic.clone()), body))
659 }
660
661 Parameter::MessageDelivered(session, from, publish) => {
662 if from.is_system() {
663 None
664 } else {
665 let topic = &publish.topic;
666 let body = json!({
667 "dup": publish.dup,
668 "retain": publish.retain,
669 "qos": publish.qos.value(),
670 "topic": topic,
671 "packet_id": publish.packet_id,
672 "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
673 "pts": publish.create_time,
674 "ts": now,
675 "time": now_time
676 });
677 let body = session.id.to_to_json(body);
678 let body = from.to_from_json(body);
679 Some((Some(topic.clone()), body))
680 }
681 }
682
683 Parameter::MessageAcked(session, from, publish) => {
684 if from.is_system() {
685 None
686 } else {
687 let topic = &publish.topic;
688 let body = json!({
689 "dup": publish.dup,
690 "retain": publish.retain,
691 "qos": publish.qos.value(),
692 "topic": topic,
693 "packet_id": publish.packet_id,
694 "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
695 "pts": publish.create_time,
696 "ts": now,
697 "time": now_time
698 });
699 let body = session.id.to_to_json(body);
700 let body = from.to_from_json(body);
701 Some((Some(topic.clone()), body))
702 }
703 }
704
705 Parameter::MessageDropped(to, from, publish, reason) => {
706 if from.is_system() {
707 None
708 } else {
709 let body = json!({
710 "dup": publish.dup,
711 "retain": publish.retain,
712 "qos": publish.qos.value(),
713 "topic": publish.topic,
714 "packet_id": publish.packet_id,
715 "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
716 "reason": reason.to_string(),
717 "pts": publish.create_time,
718 "ts": now,
719 "time": now_time
720 });
721 let mut body = from.to_from_json(body);
722 if let Some(to) = to {
723 body = to.to_to_json(body);
724 }
725 Some((None, body))
726 }
727 }
728 Parameter::OfflineMessage(session, from, publish) => {
729 if from.is_system() {
730 None
731 } else {
732 let topic = &publish.topic;
733 let body = json!({
734 "dup": publish.dup,
735 "retain": publish.retain,
736 "qos": publish.qos.value(),
737 "topic": topic,
738 "packet_id": publish.packet_id,
739 "payload": BASE64_STANDARD.encode(publish.payload.as_ref()),
740 "pts": publish.create_time,
741 "ts": now,
742 "time": now_time
743 });
744 let body = session.id.to_to_json(body);
745 let body = from.to_from_json(body);
746 Some((Some(topic.clone()), body))
747 }
748 }
749 _ => {
750 log::error!("parameter is: {param:?}");
751 None
752 }
753 };
754
755 log::debug!("bodys: {bodys:?}");
756
757 if let Some((topic, body)) = bodys {
758 let tx = self.tx.read().await.clone();
759 if let Err(e) = tx.send((typ, topic, body)).await {
760 log::warn!("web-hook send error, typ: {typ:?}, {e:?}");
761 } else {
762 self.chan_queue_count.fetch_add(1, Ordering::SeqCst);
763 }
764 }
765
766 (true, acc)
767 }
768}
769
770struct HookWriter {
771 file_name: String,
772 file: Option<File>,
773}
774
775impl HookWriter {
776 fn new(file: ByteString) -> Self {
777 Self { file_name: file.to_string(), file: None }
778 }
779
780 #[inline]
781 pub async fn log(&mut self, msg: &[u8]) -> std::result::Result<(), Box<dyn std::error::Error>> {
782 if let Some(file) = self.file.as_mut() {
783 file.write_all(msg).await?;
784 file.write_all(b"\n").await?;
785 } else {
786 Self::create_dirs(Path::new(&self.file_name)).await?;
787 let mut file = OpenOptions::new().create(true).append(true).open(&self.file_name).await?;
788 file.write_all(msg).await?;
789 file.write_all(b"\n").await?;
790 self.file.replace(file);
791 }
792 Ok(())
793 }
794
795 #[inline]
796 async fn create_dirs(path: &Path) -> std::result::Result<(), std::io::Error> {
797 if let Some(parent) = path.parent() {
798 if !parent.exists() {
800 tokio::fs::create_dir_all(parent).await?;
801 }
802 }
803 Ok(())
804 }
805}