dlog_core 1.1.2

Core library for the dlog logging platform
Documentation
use directories::ProjectDirs;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::timeout;

use crate::ingest::HttpIngestor;
use crate::models::Log;

pub enum BacklogSignal {
    Entries(Vec<Log>),
    Exit,
}

pub struct Backlog {
    signal_receiver: flume::Receiver<BacklogSignal>,
    ingest: Arc<HttpIngestor>,
    dirs: ProjectDirs,
    exit: bool,
    queue: Vec<Log>,
    is_empty: Arc<AtomicBool>,
}

const BACKLOG_CHUNK_SIZE: usize = 1_000;
const BACKLOG_CHECK_INTERVAL: Duration = Duration::from_secs(5);
const BACKLOG_MIN_LOOP_INTERVAL: Duration = Duration::from_millis(100);

impl Backlog {
    pub fn new(ingest: Arc<HttpIngestor>) -> (Self, Arc<AtomicBool>, flume::Sender<BacklogSignal>) {
        let dirs = ProjectDirs::from("cloud.dlog", "", "dlog").unwrap();
        let (signal_sender, signal_receiver) = flume::unbounded();
        let is_empty = Arc::new(AtomicBool::new(true));
        (
            Self {
                dirs,
                ingest,
                signal_receiver,
                exit: false,
                queue: Vec::with_capacity(BACKLOG_CHUNK_SIZE),
                is_empty: is_empty.clone(),
            },
            is_empty,
            signal_sender,
        )
    }

    pub async fn start(&mut self) {
        let mut last_check = Instant::now();
        self.load_from_disk();

        while !self.exit {
            if let Ok(val) = timeout(BACKLOG_MIN_LOOP_INTERVAL, self.signal_receiver.recv_async()).await {
                self.receive(val).await;
            }

            if last_check.elapsed() >= BACKLOG_CHECK_INTERVAL {
                last_check = Instant::now();
                self.retry().await;
            }
        }

        self.flush_to_disk().await;
    }

    async fn receive(&mut self, signal: Result<BacklogSignal, flume::RecvError>) {
        self.is_empty.store(false, Ordering::Relaxed);
        match signal {
            Ok(BacklogSignal::Entries(mut logs)) => self.queue.append(&mut logs),
            _ => self.exit = true,
        }
    }

    async fn retry(&mut self) {
        if !self.queue.is_empty() && self.ingest.check().await {
            self.load_from_disk();
            self.is_empty.store(true, Ordering::Relaxed);
            while !self.queue.is_empty() {
                let mut logs = self
                    .queue
                    .drain(..min(self.queue.len(), BACKLOG_CHUNK_SIZE))
                    .collect::<Vec<Log>>();
                if let Err(err) = self.ingest.log_async(&logs).await {
                    self.queue.append(&mut logs);
                    self.queue.push(err);
                    return;
                }
            }
        } else if !self.queue.is_empty() {
            self.flush_to_disk().await;
        }
    }

    async fn flush_to_disk(&mut self) {
        if !self.queue.is_empty() {
            if let Some(mut file) = Self::get_file(&self.dirs, true) {
                for log in self.queue.drain(..self.queue.len()) {
                    if let Ok(ctn) = serde_json::to_string(&log) {
                        if let Err(err) = writeln!(file, "{}", ctn) {
                            eprintln!("[dlog] Cannot write log to cache: {}", err);
                        }
                    }
                }
            }
        }
    }

    fn load_from_disk(&mut self) {
        if let Some(file) = Self::get_file(&self.dirs, false) {
            let reader = BufReader::new(file);
            let mut logs = Vec::new();
            for line in reader.lines() {
                if let Ok(line) = line {
                    if let Ok(log) = serde_json::from_str::<Log>(&line) {
                        logs.push(log);
                    }
                }
            }
            logs.append(&mut self.queue);
            self.queue = logs;
            Self::remove_file(&self.dirs);
        }
    }

    fn get_file(dirs: &ProjectDirs, create: bool) -> Option<std::fs::File> {
        match Self::get_path(dirs) {
            None => None,
            Some(path) => OpenOptions::new()
                .read(true)
                .append(true)
                .create(create)
                .open(path)
                .ok(),
        }
    }

    fn remove_file(dirs: &ProjectDirs) {
        if let Some(path) = Self::get_path(dirs) {
            let _ = std::fs::remove_file(path);
        }
    }

    fn get_path(dirs: &ProjectDirs) -> Option<PathBuf> {
        let path = dirs.config_dir();
        if !path.exists() && std::fs::create_dir_all(&path).is_err() {
            eprintln!("[dlog] Cannot cache logs to {:?}", path);
            return None;
        }
        Some(path.join("backlog.dat"))
    }
}