dlog_core/
lib.rs

1use std::sync::RwLock;
2
3mod backlog;
4mod ingest;
5pub mod models;
6pub mod transforms;
7mod worker;
8
9use crate::models::{Log, Priority};
10use crate::transforms::Transforms;
11use crate::worker::{Signal, Worker};
12
/// Upper bound applied to each blocking channel operation performed by `Logger::flush` and
/// `Logger::clean_up` (sending the signal and waiting for the worker's reply).
const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
14
/// Front-end handle of the logging pipeline.
///
/// It owns the Tokio runtime on which the worker and backlog tasks are spawned and talks to
/// the worker exclusively through the two channel endpoints stored here.
#[derive(Debug)]
pub struct Logger {
    /// Sending end used to deliver `Signal` messages (log, flush, exit) to the worker.
    signal_sender: flume::Sender<Signal>,
    /// Receiving end on which the worker's replies (ready / flushed / exited) are awaited.
    flush_receiver: flume::Receiver<()>,
    /// Runtime driving the background tasks; becomes `None` once `clean_up` has taken it.
    handle: RwLock<Option<tokio::runtime::Runtime>>,
}
21
22impl Logger {
23    pub fn new(api_key: String, transforms: Transforms) -> Result<Self, String> {
24        let (mut worker, mut backlog) = Worker::new(api_key, transforms)?;
25        let (signal_sender, flush_receiver) = (worker.signal_sender.clone(), worker.flush_receiver.clone());
26
27        let (valid_tx, valid_rx) = flume::bounded(1);
28        let runtime = tokio::runtime::Runtime::new().unwrap();
29        runtime.spawn(async move {
30            if let Err(err) = valid_tx.send(worker.has_valid_api_key().await) {
31                panic!(
32                    "[dlog::logger] Internal error: The API_KEY channel is closed (Sending end) | {}",
33                    err
34                )
35            }
36
37            let _ = futures::future::try_join_all(vec![
38                tokio::task::spawn(async move { worker.start().await }),
39                tokio::task::spawn(async move { backlog.start().await }),
40            ])
41            .await;
42        });
43
44        match valid_rx.recv() {
45            Err(err) => {
46                runtime.shutdown_background();
47                panic!(
48                    "[dlog::logger] Internal error: The API_KEY channel is closed (Receiving end) | {}",
49                    err
50                )
51            },
52            Ok(false) => {
53                runtime.shutdown_background();
54                return Err(String::from(
55                    "[dlog::logger] Please configure dlog with a valid API_KEY",
56                ))
57            }
58            _ => (),
59        };
60
61        // Wait for first flush signal => Ready to be used
62        if let Err(err) = flush_receiver.recv_timeout(std::time::Duration::from_secs(3)) {
63            eprintln!("[dlog::logger] Failed to receive ready signal: {}", err);
64        }
65
66        Ok(Self {
67            signal_sender,
68            flush_receiver,
69            handle: RwLock::new(Some(runtime)),
70        })
71    }
72
73    pub fn log(&self, priority: Priority, message: String) -> Result<(), String> {
74        match self.signal_sender.send(Signal::Log(Log::new(priority, message))) {
75            Err(err) => Err(format!("[dlog::logger] Failed to move log to sender: {}", err)),
76            _ => Ok(()),
77        }
78    }
79
80    pub fn flush(&self) -> Result<(), String> {
81        if let Err(err) = self.signal_sender.send_timeout(Signal::Flush, FLUSH_TIMEOUT) {
82            return Err(format!("[dlog::logger] Failed to send thread signal: {}", err));
83        }
84
85        match self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) {
86            Err(flume::RecvTimeoutError::Disconnected) => {
87                Err("[dlog::logger] Failed to receive thread signal".to_string())
88            }
89            _ => Ok(()),
90        }
91    }
92
93    pub fn clean_up(&self) {
94        match self.signal_sender.send_timeout(Signal::Exit, FLUSH_TIMEOUT) {
95            Err(err) => println!(
96                "[dlog::logger] Could not send exit signal, some logs might be lost: {}",
97                err
98            ),
99            Ok(_) => {
100                if let Err(err) = self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) {
101                    eprintln!("[dlog::logger] Failed to exit signal response: {}", err);
102                }
103            }
104        }
105
106        let mut write = match self.handle.write() {
107            Err(err) => {
108                println!("[dlog::logger] Failed to get write lock during cleanup: {}", err);
109                return;
110            }
111            Ok(val) => val,
112        };
113
114        let _ = write.take();
115    }
116}