1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
use std::sync::RwLock; mod backlog; mod ingest; pub mod models; pub mod transforms; mod worker; use crate::models::{Log, Priority}; use crate::transforms::Transforms; use crate::worker::{Signal, Worker}; const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[derive(Debug)] pub struct Logger { signal_sender: flume::Sender<Signal>, flush_receiver: flume::Receiver<()>, handle: RwLock<Option<tokio::runtime::Runtime>>, } impl Logger { pub fn new(api_key: String, transforms: Transforms) -> Result<Self, String> { let (mut worker, mut backlog, signal_sender, flush_receiver) = Worker::new(api_key, transforms)?; let runtime = tokio::runtime::Runtime::new().unwrap(); if !runtime.block_on(worker.has_valid_api_key()) { return Err(String::from("[dlog] Please configure dlog with a valid API_KEY")); } runtime.spawn(async move { let _ = futures::future::try_join_all(vec![ tokio::task::spawn(async move { worker.start().await }), tokio::task::spawn(async move { backlog.start().await }), ]) .await; }); Ok(Self { signal_sender, flush_receiver, handle: RwLock::new(Some(runtime)), }) } pub fn log(&self, priority: Priority, message: String) -> Result<(), String> { match self.signal_sender.send(Signal::Log(Log::new(priority, message))) { Err(err) => Err(format!("Failed to move log to sender: {}", err)), _ => Ok(()), } } pub fn flush(&self) -> Result<(), String> { if let Err(err) = self.signal_sender.send(Signal::Flush) { return Err(format!("Failed to send thread signal: {}", err)); } match self.flush_receiver.recv_timeout(FLUSH_TIMEOUT) { Err(flume::RecvTimeoutError::Disconnected) => Err("Failed to receive thread signal".to_string()), _ => Ok(()), } } pub fn clean_up(&self) { match self.signal_sender.send(Signal::Exit) { Err(err) => println!("[dlog] Could not send exit signal, some logs might be lost: {}", err), Ok(_) => { let _ = self.flush_receiver.recv_timeout(FLUSH_TIMEOUT); } } let mut write = match self.handle.write() { Err(err) => { println!("[dlog] Failed to get write lock during cleanup: {}", err); return; } Ok(val) => val, }; let _ = write.take(); } }