1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; mod ingestor; pub mod models; use crate::ingestor::HttpIngestor; use crate::models::{Log, Priority}; const QUEUE_BUFFER: usize = 1_000; const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); const MIN_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); const MIN_LOOP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100); enum Signal { HasValidApiKey(bool), Log(Log), Flush, None, } #[derive(Debug)] pub struct Logger { log_tx: flume::Sender<Signal>, thread_rx: flume::Receiver<Signal>, flag: Arc<AtomicBool>, handle: RwLock<Option<JoinHandle<()>>>, } impl Logger { pub fn new(api_key: String) -> Result<Self, String> { let (log_tx, log_rx) = flume::bounded::<Signal>(QUEUE_BUFFER); let flag = Arc::new(AtomicBool::new(false)); let stop_flag = flag.clone(); let (thread_tx, thread_rx) = flume::unbounded::<Signal>(); let handle = std::thread::spawn(move || { let client = HttpIngestor::new(api_key); let has_api_key = client.log(Vec::with_capacity(0)); if let Err(err) = thread_tx.send(Signal::HasValidApiKey(has_api_key)) { println!("[dlog] Failed to signal API_KEY check: {}", err); } if has_api_key { let mut queue = Vec::<Log>::with_capacity(QUEUE_BUFFER); let mut last_flush = std::time::Instant::now(); while !stop_flag.load(Ordering::Relaxed) { let signal = match log_rx.recv_timeout(MIN_LOOP_INTERVAL) { Err(flume::RecvTimeoutError::Disconnected) => break, Ok(signal) => signal, _ => Signal::None, }; let flush = match signal { Signal::Flush => true, Signal::Log(log) => { queue.push(log); false }, _ => false }; if queue.len() > 0 && (flush || queue.len() >= QUEUE_BUFFER || last_flush.elapsed() >= MIN_FLUSH_INTERVAL) { client.log(queue); queue = Vec::<Log>::with_capacity(QUEUE_BUFFER); last_flush = std::time::Instant::now(); } if flush { if let Err(err) = thread_tx.send(Signal::Flush) { println!("[dlog] Failed to send flush signal back: {}", err); } } } client.log(queue); } }); match thread_rx.recv() { Err(err) => println!("[dlog::configure] Failed to receive API_KEY check signal: {}", err), Ok(Signal::HasValidApiKey(false)) => return Err("[dlog::configure] Please configure dlog with a valid API_KEY!".to_owned()), _ => (), }; Ok(Self { log_tx, thread_rx, flag, handle: RwLock::new(Some(handle)), }) } pub fn log(&self, priority: Priority, message: String) -> Result<(), String> { match self.log_tx.send(Signal::Log(Log::new(priority, message))) { Err(err) => Err(format!("Failed to move log to sender: {}", err)), Ok(_) => Ok(()), } } pub fn flush(&self) -> Result<(), String> { if let Err(err) = self.log_tx.send(Signal::Flush) { return Err(format!("Failed to send thread signal: {}", err)) } match self.thread_rx.recv_timeout(FLUSH_TIMEOUT) { Err(flume::RecvTimeoutError::Disconnected) => Err(format!("Failed to receive thread signal")), _ => Ok(()) } } pub fn clean_up(&self) { self.flag.store(true, Ordering::Relaxed); let mut write = match self.handle.write() { Err(err) => { println!("[dlog] Failed to get write lock during cleanup: {}", err); return }, Ok(val) => val, }; let handle = match write.take() { None => return, Some(val) => val, }; if let Err(err) = handle.join() { println!("[dlog] Failed to join ingest thread: {:?}", err) } } }