//! Logger front end: spawns the worker and backlog tasks on a dedicated Tokio
//! runtime and communicates with them over `flume` channels.
use std::sync::RwLock;

mod backlog;
mod ingest;
pub mod models;
pub mod transforms;
mod worker;

use crate::models::{Log, Priority};
use crate::transforms::Transforms;
use crate::worker::{Signal, Worker};

/// Upper bound applied by `Logger::flush` and `Logger::clean_up` both when
/// sending a signal to the worker and when waiting for its response.
const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);

/// Handle used to submit logs, flush requests and the exit request to the
/// background worker.
///
/// Holds the channel endpoints cloned from the `Worker` together with the
/// Tokio runtime on which the worker and backlog tasks are spawned.
#[derive(Debug)]
pub struct Logger {
    /// Sending side of the signal channel (`Signal::Log`, `Signal::Flush`, `Signal::Exit`).
    signal_sender: flume::Sender<Signal>,
    /// Receiving side of the `()` response channel; waited on once during
    /// construction and after flush/exit signals. Presumably fed by the
    /// worker — the sending side is not visible in this file.
    flush_receiver: flume::Receiver<()>,
    /// Runtime hosting the spawned tasks. `clean_up` takes it out (leaving
    /// `None`) and drops it.
    handle: RwLock<Option<tokio::runtime::Runtime>>,
}

impl Logger {
    pub fn new(api_key: String, transforms: Transforms) -> Result<Self, String> {
        let (mut worker, mut backlog) = Worker::new(api_key, transforms)?;
        let (signal_sender, flush_receiver) = (worker.signal_sender.clone(), worker.flush_receiver.clone());

        let (valid_tx, valid_rx) = flume::bounded(1);
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.spawn(async move {
            if let Err(err) = valid_tx.send(worker.has_valid_api_key().await) {
                panic!(
                    "[dlog::logger] Internal error: The API_KEY channel is closed (Sending end) | {}",
                    err
                )
            }

            let _ = futures::future::try_join_all(vec![
                tokio::task::spawn(async move { worker.start().await }),
                tokio::task::spawn(async move { backlog.start().await }),
            ])
            .await;
        });

        match valid_rx.recv() {
            Err(err) => {
                runtime.shutdown_background();
                panic!(
                    "[dlog::logger] Internal error: The API_KEY channel is closed (Receiving end) | {}",
                    err
                )
            },
            Ok(false) => {
                runtime.shutdown_background();
                return Err(String::from(
                    "[dlog::logger] Please configure dlog with a valid API_KEY",
                ))
            }
            _ => (),
        };

        // Wait for first flush signal => Ready to be used
        if let Err(err) = flush_receiver.recv_timeout(std::time::Duration::from_secs(3)) {
            eprintln!("[dlog::logger] Failed to receive ready signal: {}", err);
        }

        Ok(Self {
            signal_sender,
            flush_receiver,
            handle: RwLock::new(Some(runtime)),
        })
    }

    /// Builds a `Log` from `priority` and `message` and sends it on the
    /// signal channel as `Signal::Log`.
    ///
    /// # Errors
    ///
    /// Returns an error message when the channel send fails.
    pub fn log(&self, priority: Priority, message: String) -> Result<(), String> {
        let entry = Log::new(priority, message);
        self.signal_sender
            .send(Signal::Log(entry))
            .map_err(|send_err| format!("[dlog::logger] Failed to move log to sender: {}", send_err))
    }

    /// Sends `Signal::Flush` and waits for a response on the flush channel.
    ///
    /// Both the send and the wait are bounded by `FLUSH_TIMEOUT`.
    ///
    /// # Errors
    ///
    /// Returns an error message when the signal cannot be sent within the
    /// timeout, or when the response channel is disconnected.
    ///
    /// NOTE(review): a timeout while waiting for the response is reported as
    /// success — confirm this best-effort behaviour is intended.
    pub fn flush(&self) -> Result<(), String> {
        self.signal_sender
            .send_timeout(Signal::Flush, FLUSH_TIMEOUT)
            .map_err(|send_err| format!("[dlog::logger] Failed to send thread signal: {}", send_err))?;

        let response = self.flush_receiver.recv_timeout(FLUSH_TIMEOUT);
        if matches!(response, Err(flume::RecvTimeoutError::Disconnected)) {
            Err(String::from("[dlog::logger] Failed to receive thread signal"))
        } else {
            Ok(())
        }
    }

    /// Asks the worker to exit and then drops the runtime.
    ///
    /// Sends `Signal::Exit` and, if that succeeds, waits for a response on the
    /// flush channel; each step is bounded by `FLUSH_TIMEOUT`. Failures are
    /// reported on stderr and never abort the cleanup early. Afterwards the
    /// runtime is taken out of `handle` (leaving `None`) and dropped, unless
    /// the lock is poisoned, in which case the runtime is left in place.
    pub fn clean_up(&self) {
        match self.signal_sender.send_timeout(Signal::Exit, FLUSH_TIMEOUT) {
            Err(err) => eprintln!(
                "[dlog::logger] Could not send exit signal, some logs might be lost: {}",
                err
            ),
            Ok(()) => {
                if let Err(err) = self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) {
                    eprintln!("[dlog::logger] Failed to receive exit signal response: {}", err);
                }
            }
        }

        match self.handle.write() {
            // Taking the runtime out of the option drops it right here.
            Ok(mut guard) => drop(guard.take()),
            Err(err) => {
                eprintln!("[dlog::logger] Failed to get write lock during cleanup: {}", err);
            }
        }
    }
}