1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::sync::RwLock;

mod backlog;
mod ingest;
pub mod models;
pub mod transforms;
mod worker;

use crate::models::{Log, Priority};
use crate::transforms::Transforms;
use crate::worker::{Signal, Worker};

const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

#[derive(Debug)]
pub struct Logger {
    signal_sender: flume::Sender<Signal>,
    flush_receiver: flume::Receiver<()>,
    handle: RwLock<Option<tokio::runtime::Runtime>>,
}

impl Logger {
    pub fn new(api_key: String, transforms: Transforms) -> Result<Self, String> {
        let (mut worker, mut backlog, signal_sender, flush_receiver) = Worker::new(api_key, transforms)?;

        let runtime = tokio::runtime::Runtime::new().unwrap();
        if !runtime.block_on(worker.has_valid_api_key()) {
            return Err(String::from("[dlog] Please configure dlog with a valid API_KEY"));
        }

        runtime.spawn(async move {
            let _ = futures::future::try_join_all(vec![
                tokio::task::spawn(async move { worker.start().await }),
                tokio::task::spawn(async move { backlog.start().await }),
            ])
            .await;
        });

        Ok(Self {
            signal_sender,
            flush_receiver,
            handle: RwLock::new(Some(runtime)),
        })
    }

    pub fn log(&self, priority: Priority, message: String) -> Result<(), String> {
        match self.signal_sender.send(Signal::Log(Log::new(priority, message))) {
            Err(err) => Err(format!("Failed to move log to sender: {}", err)),
            _ => Ok(()),
        }
    }

    pub fn flush(&self) -> Result<(), String> {
        if let Err(err) = self.signal_sender.send(Signal::Flush) {
            return Err(format!("Failed to send thread signal: {}", err));
        }

        match self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) {
            Err(flume::RecvTimeoutError::Disconnected) => Err("Failed to receive thread signal".to_string()),
            _ => Ok(()),
        }
    }

    pub fn clean_up(&self) {
        match self.signal_sender.send(Signal::Exit) {
            Err(err) => println!("[dlog] Could not send exit signal, some logs might be lost: {}", err),
            Ok(_) => {
                let _ = self.flush_receiver.recv_timeout(FLUSH_TIMEOUT);
            }
        }

        let mut write = match self.handle.write() {
            Err(err) => {
                println!("[dlog] Failed to get write lock during cleanup: {}", err);
                return;
            }
            Ok(val) => val,
        };

        let _ = write.take();
    }
}