stackpatrol 0.1.0

Single-binary Rust CLI that monitors a server and reports to the StackPatrol control plane.
use std::path::PathBuf;

use anyhow::{Context, Result};
use stackpatrol_core::event::EventEnvelope;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

/// Size (1 MiB) at which the on-disk buffer stops accepting new events. If the API
/// stays down for hours, losing the newest events beats filling the disk on a Pi.
/// Roughly 5–10k events at v1 envelope sizes.
const MAX_BUFFER_BYTES: u64 = 1024 * 1024;

/// Handle to a newline-delimited JSON file that queues event envelopes on disk.
///
/// The handle only stores the file location; every operation opens the file
/// afresh, so constructing or cloning a `Buffer` performs no I/O.
#[derive(Debug, Clone)]
pub struct Buffer {
    /// Location of the JSONL buffer file.
    path: PathBuf,
}

impl Buffer {
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }

    pub fn path(&self) -> &PathBuf {
        &self.path
    }

    /// Append one envelope to the buffer file as a single JSON line.
    ///
    /// If the file already holds `MAX_BUFFER_BYTES` or more, the event is reported
    /// on stderr and dropped, and `Ok(())` is returned — preferable to crashing the
    /// daemon over disk pressure. The size is checked before writing, so the file
    /// can exceed the cap by at most one record.
    ///
    /// # Errors
    ///
    /// Fails if the parent directory cannot be created, the envelope cannot be
    /// serialized, or the file cannot be opened, written or flushed.
    pub async fn append(&self, envelope: &EventEnvelope) -> Result<()> {
        // A metadata failure (most commonly: the file does not exist yet) is not
        // treated as fatal; the open below surfaces anything that blocks writing.
        if let Ok(meta) = tokio::fs::metadata(&self.path).await {
            if meta.len() >= MAX_BUFFER_BYTES {
                eprintln!(
                    "buffer: cap reached ({} bytes) — dropping event {:?}",
                    meta.len(),
                    envelope.event,
                );
                return Ok(());
            }
        }

        if let Some(parent) = self.path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .with_context(|| format!("creating {}", parent.display()))?;
        }

        // Record and terminator are handed over in one `write_all`, so a failure
        // between two separate writes can no longer leave an unterminated line that
        // the next append would be glued onto.
        let mut line = serde_json::to_string(envelope).context("serializing envelope")?;
        line.push('\n');

        let mut f = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&self.path)
            .await
            .with_context(|| format!("opening {}", self.path.display()))?;
        f.write_all(line.as_bytes())
            .await
            .with_context(|| format!("writing {}", self.path.display()))?;
        f.flush()
            .await
            .with_context(|| format!("flushing {}", self.path.display()))?;
        Ok(())
    }

    /// Read every queued envelope, in file order.
    ///
    /// Lines are read as raw bytes and parsed individually. A line that fails to
    /// parse — including one that is not valid UTF-8, e.g. a record torn in the
    /// middle of a multi-byte character by a crash mid-append — is reported on
    /// stderr and skipped instead of failing the whole read. Whitespace-only lines
    /// are ignored. A missing file yields an empty vector.
    ///
    /// # Errors
    ///
    /// Fails if the file exists but cannot be opened, or if reading from it fails.
    pub async fn read_all(&self) -> Result<Vec<EventEnvelope>> {
        let file = match File::open(&self.path).await {
            Ok(file) => file,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("opening {}", self.path.display()));
            }
        };

        let mut reader = BufReader::new(file);
        // One byte buffer reused for every line; `read_until` appends, hence the
        // `clear` at the top of each iteration.
        let mut raw = Vec::new();
        let mut envelopes = Vec::new();
        loop {
            raw.clear();
            let read = reader
                .read_until(b'\n', &mut raw)
                .await
                .with_context(|| format!("reading {}", self.path.display()))?;
            if read == 0 {
                // End of file.
                break;
            }
            if raw.iter().all(u8::is_ascii_whitespace) {
                continue;
            }
            // `from_slice` tolerates the trailing line terminator and reports
            // invalid UTF-8 as an ordinary parse error, so bad bytes cost one line
            // rather than the whole buffer.
            match serde_json::from_slice::<EventEnvelope>(&raw) {
                Ok(envelope) => envelopes.push(envelope),
                Err(err) => {
                    eprintln!("buffer: skipping unparseable line: {err}");
                }
            }
        }
        Ok(envelopes)
    }

    /// Replace the buffer contents with `remaining`.
    ///
    /// An empty `remaining` removes the buffer file (a file that is already gone is
    /// not an error). Otherwise the new contents are written to a sibling temp
    /// file, synced to disk, and renamed over the buffer. On POSIX the rename is
    /// atomic, and because the data is synced first, a crash during rewrite leaves
    /// either the old or the new file intact — never a half-written one.
    ///
    /// NOTE(review): the rename replaces the whole file, so an event appended
    /// between the caller's `read_all` and this call would be lost — presumably
    /// callers do not append concurrently; confirm.
    ///
    /// # Errors
    ///
    /// Fails if serialization fails, or if the file cannot be removed, or the temp
    /// file cannot be created, written, synced or renamed.
    pub async fn rewrite(&self, remaining: &[EventEnvelope]) -> Result<()> {
        if remaining.is_empty() {
            return match tokio::fs::remove_file(&self.path).await {
                Ok(()) => Ok(()),
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
                Err(e) => {
                    Err(e).with_context(|| format!("removing {}", self.path.display()))
                }
            };
        }

        // Serialize everything before touching the disk: a serialization error then
        // leaves no stray temp file, and the file is written with one call instead
        // of two async writes per envelope.
        let mut payload = Vec::new();
        for env in remaining {
            serde_json::to_writer(&mut payload, env).context("serializing envelope")?;
            payload.push(b'\n');
        }

        let tmp = self.path.with_extension("jsonl.tmp");
        if let Some(parent) = self.path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .with_context(|| format!("creating {}", parent.display()))?;
        }
        let mut f = File::create(&tmp)
            .await
            .with_context(|| format!("creating {}", tmp.display()))?;
        f.write_all(&payload)
            .await
            .with_context(|| format!("writing {}", tmp.display()))?;
        f.flush()
            .await
            .with_context(|| format!("flushing {}", tmp.display()))?;
        // Make the data durable before the rename publishes it; otherwise a power
        // loss could leave the renamed file empty or truncated.
        f.sync_all()
            .await
            .with_context(|| format!("syncing {}", tmp.display()))?;
        drop(f);
        tokio::fs::rename(&tmp, &self.path)
            .await
            .with_context(|| format!("renaming {} to {}", tmp.display(), self.path.display()))?;
        Ok(())
    }
}

/// Default buffer location: `stackpatrol/buffer.jsonl` under the directory
/// returned by `dirs::data_dir()`.
///
/// # Errors
///
/// Fails when `dirs::data_dir()` returns `None`.
pub fn default_path() -> Result<PathBuf> {
    dirs::data_dir()
        .map(|base| base.join("stackpatrol").join("buffer.jsonl"))
        .context("could not resolve data dir")
}

#[cfg(test)]
mod tests {
    use super::*;
    use stackpatrol_core::event::Event;
    use tempfile::tempdir;

    /// Build a `ServiceDown` envelope for service `name`, stamped with `ts`.
    fn service_down(name: &str, ts: i64) -> EventEnvelope {
        let event = Event::ServiceDown { name: name.into() };
        EventEnvelope {
            server_name: "test".into(),
            timestamp: ts,
            event,
        }
    }

    /// Timestamps of everything currently readable from `buffer`, in file order.
    async fn timestamps(buffer: &Buffer) -> Vec<i64> {
        let envelopes = buffer.read_all().await.unwrap();
        envelopes.iter().map(|e| e.timestamp).collect()
    }

    // Two appends come back in the order they were written.
    #[tokio::test]
    async fn append_and_read_roundtrip() {
        let scratch = tempdir().unwrap();
        let buffer = Buffer::new(scratch.path().join("buffer.jsonl"));

        for (name, ts) in [("web", 1), ("db", 2)] {
            buffer.append(&service_down(name, ts)).await.unwrap();
        }

        assert_eq!(timestamps(&buffer).await, vec![1, 2]);
    }

    // A buffer file that was never created reads as empty rather than erroring.
    #[tokio::test]
    async fn read_all_returns_empty_when_missing() {
        let scratch = tempdir().unwrap();
        let buffer = Buffer::new(scratch.path().join("nope.jsonl"));

        let envelopes = buffer.read_all().await.unwrap();
        assert!(envelopes.is_empty());
    }

    // Rewriting with nothing left deletes the file.
    #[tokio::test]
    async fn rewrite_with_empty_removes_file() {
        let scratch = tempdir().unwrap();
        let buffer = Buffer::new(scratch.path().join("buffer.jsonl"));
        buffer.append(&service_down("web", 1)).await.unwrap();
        assert!(buffer.path().exists());

        buffer.rewrite(&[]).await.unwrap();
        assert!(!buffer.path().exists());
    }

    // Only the envelopes handed to `rewrite` survive, with their order intact.
    #[tokio::test]
    async fn rewrite_keeps_only_remaining() {
        let scratch = tempdir().unwrap();
        let buffer = Buffer::new(scratch.path().join("buffer.jsonl"));
        for (name, ts) in [("a", 1), ("b", 2), ("c", 3)] {
            buffer.append(&service_down(name, ts)).await.unwrap();
        }

        let mut survivors = buffer.read_all().await.unwrap();
        survivors.retain(|e| e.timestamp != 2);
        buffer.rewrite(&survivors).await.unwrap();

        assert_eq!(timestamps(&buffer).await, vec![1, 3]);
    }

    // A garbage line is skipped and the valid line after it is still returned.
    #[tokio::test]
    async fn unparseable_line_is_skipped() {
        let scratch = tempdir().unwrap();
        let file = scratch.path().join("buffer.jsonl");
        tokio::fs::write(&file, b"{not json}\n{\"server_name\":\"x\",\"timestamp\":5,\"event\":{\"kind\":\"heartbeat\",\"uptime_secs\":1}}\n")
            .await
            .unwrap();

        let buffer = Buffer::new(file);
        assert_eq!(timestamps(&buffer).await, vec![5]);
    }
}