thenodes 0.2.0

TheNodes is a modular, plugin-driven P2P node framework for Rust, supporting node-embedded plugins (NEP) and core-as-a-library (CAL) modes with async-first APIs.
Documentation
use crate::{
    constants::ICON_PLACEHOLDER,
    events::model::{EventMeta, LogEvent, LogLevel},
};
use async_trait::async_trait;
use tokio::io::AsyncWriteExt;

#[async_trait]
pub trait LogSink: Send + Sync {
    async fn handle(&self, event: &LogEvent);
    async fn flush(&self) {}
}

pub struct ConsoleSink {
    #[allow(dead_code)]
    level_filter: Option<LogLevel>,
}
impl ConsoleSink {
    pub fn new(level_filter: Option<LogLevel>) -> Self {
        Self { level_filter }
    }
}

fn level_rank(level: LogLevel) -> u8 {
    match level {
        LogLevel::Trace => 0,
        LogLevel::Debug => 1,
        LogLevel::Info => 2,
        LogLevel::Warn => 3,
        LogLevel::Error => 4,
    }
}

fn event_meta(event: &LogEvent) -> &EventMeta {
    match event {
        LogEvent::TrustDecision(e) => &e.meta,
        LogEvent::Promotion(e) => &e.meta,
        LogEvent::Network(e) => &e.meta,
        LogEvent::Plugin(e) => &e.meta,
        LogEvent::System(e) => &e.meta,
    }
}

#[async_trait]
impl LogSink for ConsoleSink {
    async fn handle(&self, event: &LogEvent) {
        let meta = event_meta(event);
        if meta.suppress_console {
            return;
        }
        if let Some(min) = self.level_filter {
            if level_rank(meta.level) < level_rank(min) {
                return;
            }
        }
        match event {
            LogEvent::TrustDecision(td) => {
                println!(
                    "{}TRUST role={:?} decision={} mode={} fp={:?} reason={} corr={:?}",
                    ICON_PLACEHOLDER,
                    td.role,
                    td.decision,
                    td.mode,
                    td.fingerprint,
                    td.reason,
                    td.meta.corr_id
                );
            }
            LogEvent::Promotion(p) => {
                println!(
                    "{}PROMOTE fp={} from={} to={} ok={} corr={:?}",
                    ICON_PLACEHOLDER,
                    p.fingerprint,
                    p.from_store,
                    p.to_store,
                    p.success,
                    p.meta.corr_id
                );
            }
            LogEvent::Network(n) => {
                println!(
                    "{}NET action={} addr={:?} detail={:?} corr={:?}",
                    ICON_PLACEHOLDER, n.action, n.addr, n.detail, n.meta.corr_id
                );
            }
            LogEvent::Plugin(p) => {
                println!(
                    "{}PLUGIN name={} action={} detail={:?} corr={:?}",
                    ICON_PLACEHOLDER, p.plugin, p.action, p.detail, p.meta.corr_id
                );
            }
            LogEvent::System(s) => {
                println!(
                    "{}SYS action={} detail={:?} corr={:?}",
                    ICON_PLACEHOLDER, s.action, s.detail, s.meta.corr_id
                );
            }
        }
    }
}

pub struct JsonFileSink {
    path: std::path::PathBuf,
    rotate: bool,
    max_size_bytes: u64,
    max_backups: u32,
    writer: tokio::sync::Mutex<Option<tokio::fs::File>>,
}

impl JsonFileSink {
    pub async fn new<P: Into<std::path::PathBuf>>(
        path: P,
        rotate: bool,
        max_size_bytes: u64,
        max_backups: u32,
    ) -> std::io::Result<Self> {
        let pb = path.into();
        if let Some(parent) = pb.parent() {
            tokio::fs::create_dir_all(parent).await.ok();
        }
        let file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&pb)
            .await
            .ok();
        Ok(Self {
            path: pb,
            rotate,
            max_size_bytes,
            max_backups,
            writer: tokio::sync::Mutex::new(file),
        })
    }
    async fn rotate_if_needed(&self) {
        if !self.rotate {
            return;
        }
        if let Ok(meta) = tokio::fs::metadata(&self.path).await {
            if meta.len() >= self.max_size_bytes {
                let _ = self.perform_rotation().await;
            }
        }
    }
    async fn perform_rotation(&self) -> std::io::Result<()> {
        {
            let mut guard = self.writer.lock().await;
            *guard = None;
        }
        for idx in (1..=self.max_backups).rev() {
            let from = self.path.with_extension(format!("jsonl.{}", idx));
            let to = self.path.with_extension(format!("jsonl.{}", idx + 1));
            if from.exists() {
                let _ = std::fs::rename(&from, &to);
            }
        }
        let rotated = self.path.with_extension("jsonl.1");
        std::fs::rename(&self.path, rotated)?;
        let file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .await?;
        let mut guard = self.writer.lock().await;
        *guard = Some(file);
        Ok(())
    }
}

#[async_trait]
impl LogSink for JsonFileSink {
    async fn handle(&self, event: &LogEvent) {
        self.rotate_if_needed().await;
        if let Ok(json) = serde_json::to_string(event) {
            let mut guard = self.writer.lock().await;
            if let Some(f) = guard.as_mut() {
                let _ = f.write_all(json.as_bytes()).await;
                let _ = f.write_all(b"\n").await;
            }
        }
    }
    async fn flush(&self) {
        let guard = self.writer.lock().await;
        if let Some(f) = guard.as_ref() {
            let _ = f.sync_all().await;
        }
    }
}