1use std::sync::RwLock;
2
3mod backlog;
4mod ingest;
5pub mod models;
6pub mod transforms;
7mod worker;
8
9use crate::models::{Log, Priority};
10use crate::transforms::Transforms;
11use crate::worker::{Signal, Worker};
12
13const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
14
15#[derive(Debug)]
16pub struct Logger {
17 signal_sender: flume::Sender<Signal>,
18 flush_receiver: flume::Receiver<()>,
19 handle: RwLock<Option<tokio::runtime::Runtime>>,
20}
21
22impl Logger {
23 pub fn new(api_key: String, transforms: Transforms) -> Result<Self, String> {
24 let (mut worker, mut backlog) = Worker::new(api_key, transforms)?;
25 let (signal_sender, flush_receiver) = (worker.signal_sender.clone(), worker.flush_receiver.clone());
26
27 let (valid_tx, valid_rx) = flume::bounded(1);
28 let runtime = tokio::runtime::Runtime::new().unwrap();
29 runtime.spawn(async move {
30 if let Err(err) = valid_tx.send(worker.has_valid_api_key().await) {
31 panic!(
32 "[dlog::logger] Internal error: The API_KEY channel is closed (Sending end) | {}",
33 err
34 )
35 }
36
37 let _ = futures::future::try_join_all(vec![
38 tokio::task::spawn(async move { worker.start().await }),
39 tokio::task::spawn(async move { backlog.start().await }),
40 ])
41 .await;
42 });
43
44 match valid_rx.recv() {
45 Err(err) => {
46 runtime.shutdown_background();
47 panic!(
48 "[dlog::logger] Internal error: The API_KEY channel is closed (Receiving end) | {}",
49 err
50 )
51 },
52 Ok(false) => {
53 runtime.shutdown_background();
54 return Err(String::from(
55 "[dlog::logger] Please configure dlog with a valid API_KEY",
56 ))
57 }
58 _ => (),
59 };
60
61 if let Err(err) = flush_receiver.recv_timeout(std::time::Duration::from_secs(3)) {
63 eprintln!("[dlog::logger] Failed to receive ready signal: {}", err);
64 }
65
66 Ok(Self {
67 signal_sender,
68 flush_receiver,
69 handle: RwLock::new(Some(runtime)),
70 })
71 }
72
73 pub fn log(&self, priority: Priority, message: String) -> Result<(), String> {
74 match self.signal_sender.send(Signal::Log(Log::new(priority, message))) {
75 Err(err) => Err(format!("[dlog::logger] Failed to move log to sender: {}", err)),
76 _ => Ok(()),
77 }
78 }
79
80 pub fn flush(&self) -> Result<(), String> {
81 if let Err(err) = self.signal_sender.send_timeout(Signal::Flush, FLUSH_TIMEOUT) {
82 return Err(format!("[dlog::logger] Failed to send thread signal: {}", err));
83 }
84
85 match self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) {
86 Err(flume::RecvTimeoutError::Disconnected) => {
87 Err("[dlog::logger] Failed to receive thread signal".to_string())
88 }
89 _ => Ok(()),
90 }
91 }
92
93 pub fn clean_up(&self) {
94 match self.signal_sender.send_timeout(Signal::Exit, FLUSH_TIMEOUT) {
95 Err(err) => println!(
96 "[dlog::logger] Could not send exit signal, some logs might be lost: {}",
97 err
98 ),
99 Ok(_) => {
100 if let Err(err) = self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) {
101 eprintln!("[dlog::logger] Failed to exit signal response: {}", err);
102 }
103 }
104 }
105
106 let mut write = match self.handle.write() {
107 Err(err) => {
108 println!("[dlog::logger] Failed to get write lock during cleanup: {}", err);
109 return;
110 }
111 Ok(val) => val,
112 };
113
114 let _ = write.take();
115 }
116}