dragoon-server 0.1.0

Public-relay server for the dragoon remote-executor: axum + rusqlite + ed25519 task signing + per-user message inbox.
Documentation
//! Blob storage on the local filesystem.
//!
//! Layout (per design §6 / `python/.../server/storage.py`):
//! ```text
//! <data_dir>/blobs/<task_id>/stdout.log
//! <data_dir>/blobs/<task_id>/stderr.log
//! <data_dir>/blobs/<task_id>/artifacts/<sanitized-relpath>
//! ```

use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use anyhow::{anyhow, bail, Result};

use dragoon_proto::models::LogStream;

/// Path of the directory holding every blob that belongs to `task_id`:
/// `<blobs_root>/<task_id>`.
///
/// This only computes the path; nothing is created on disk. `task_id` is
/// appended as-is, with no validation (presumably callers pass a
/// server-generated id; verify against callers).
pub fn task_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
    let mut dir = blobs_root.to_path_buf();
    dir.push(task_id);
    dir
}

/// Path of the `artifacts/` subdirectory inside a task's blob directory:
/// `<blobs_root>/<task_id>/artifacts`.
///
/// This only computes the path; nothing is created on disk.
pub fn artifacts_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
    let mut dir = task_dir(blobs_root, task_id);
    dir.push("artifacts");
    dir
}

/// File name, relative to the task directory, used to persist `stream`.
fn stream_filename(stream: LogStream) -> &'static str {
    // Exhaustive on purpose: a new `LogStream` variant must pick a file name here.
    match stream {
        LogStream::Stderr => "stderr.log",
        LogStream::Stdout => "stdout.log",
    }
}

/// Path of the log file for `stream` inside a task's blob directory, e.g.
/// `<blobs_root>/<task_id>/stdout.log`.
///
/// This only computes the path; the file may not exist.
pub fn log_path(blobs_root: &Path, task_id: &str, stream: LogStream) -> PathBuf {
    let mut path = task_dir(blobs_root, task_id);
    path.push(stream_filename(stream));
    path
}

/// Create the task's blob directory and its `artifacts/` subdirectory if
/// they are missing, and return the task directory path.
///
/// # Errors
///
/// Returns the underlying I/O error if either directory cannot be created.
pub fn ensure_task_dir(blobs_root: &Path, task_id: &str) -> Result<PathBuf> {
    let root = task_dir(blobs_root, task_id);
    let artifacts = artifacts_dir(blobs_root, task_id);
    // Task directory first, then its `artifacts/` child.
    for dir in [root.as_path(), artifacts.as_path()] {
        std::fs::create_dir_all(dir)?;
    }
    Ok(root)
}

/// Append `data` to the log file of `stream` for `task_id`, creating the
/// task directory and the file on first use.
///
/// Returns the size of the log file, in bytes, as reported by the file's
/// metadata after the write.
///
/// # Errors
///
/// Returns an error if the task directory cannot be created, or if opening,
/// writing to, or querying the log file fails.
pub fn append_log(
    blobs_root: &Path,
    task_id: &str,
    stream: LogStream,
    data: &[u8],
) -> Result<u64> {
    ensure_task_dir(blobs_root, task_id)?;
    let target = log_path(blobs_root, task_id, stream);
    let mut log_file = OpenOptions::new()
        .append(true)
        .create(true)
        .open(&target)?;
    log_file.write_all(data)?;
    let total_len = log_file.metadata()?.len();
    Ok(total_len)
}

/// Read the bytes of a task's log stream starting at byte offset `since`.
///
/// Returns `(data, new_seq)`, where `new_seq` is `since` plus the number of
/// bytes returned, i.e. the offset to pass as `since` on the next call.
///
/// If the log file does not exist, or holds no bytes at or beyond `since`,
/// the result is `(empty, since)`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened for a reason other than
/// "not found", or if querying its metadata, seeking, or reading fails.
pub fn read_log_slice(
    blobs_root: &Path,
    task_id: &str,
    stream: LogStream,
    since: u64,
) -> Result<(Vec<u8>, u64)> {
    let p = log_path(blobs_root, task_id, stream);
    // Open directly rather than probing with `exists()` first: a file removed
    // between the probe and the open would otherwise surface as an error
    // instead of the documented "missing file" result.
    let mut f = match File::open(&p) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok((Vec::new(), since));
        }
        Err(e) => return Err(e.into()),
    };
    let size = f.metadata()?.len();
    if size <= since {
        return Ok((Vec::new(), since));
    }
    f.seek(SeekFrom::Start(since))?;
    // The capacity is only a hint. If the length does not fit in `usize`,
    // start empty and let `read_to_end` grow the buffer rather than asking
    // for an impossible `usize::MAX` allocation (which panics).
    let mut buf = Vec::with_capacity(usize::try_from(size - since).unwrap_or(0));
    f.read_to_end(&mut buf)?;
    // `read_to_end` reads up to the *current* end of file, which can lie past
    // `size` if the log was appended to after `metadata()` was sampled.
    // Derive the next offset from the bytes actually returned so the caller's
    // next poll neither repeats nor skips data.
    let new_seq = since + buf.len() as u64;
    Ok((buf, new_seq))
}

/// Normalise a caller-supplied relative path into a `/`-separated form.
///
/// Backslashes are treated as separators. Empty segments and single-dot
/// (`.`) segments are dropped. The path is refused when it:
///
/// * starts with `/` or `\` (treated as absolute),
/// * contains a `..` segment, or
/// * has no segments left after normalisation (empty or blank path).
fn sanitize_relpath(p: &str) -> Result<String> {
    if matches!(p.chars().next(), Some('/' | '\\')) {
        bail!("refusing absolute path: {p:?}");
    }
    let unified = p.replace('\\', "/");
    let mut kept: Vec<&str> = Vec::new();
    for component in unified.split('/') {
        match component {
            // Blank and current-directory segments carry no information.
            "" | "." => {}
            ".." => bail!("refusing parent-traversal in {p:?}"),
            other => kept.push(other),
        }
    }
    if kept.is_empty() {
        bail!("refusing empty/blank path: {p:?}");
    }
    Ok(kept.join("/"))
}

/// Write `data` as an artifact of `task_id` at `rel_path` beneath the task's
/// `artifacts/` directory, creating intermediate directories as needed and
/// replacing any existing file at that location.
///
/// Returns the path the artifact was written to. That path is built by
/// joining onto `blobs_root`, so it is only absolute if `blobs_root` is.
///
/// # Errors
///
/// Returns an error if `rel_path` is rejected by `sanitize_relpath`, or if
/// creating the parent directories or writing the file fails.
pub fn store_artifact(
    blobs_root: &Path,
    task_id: &str,
    rel_path: &str,
    data: &[u8],
) -> Result<PathBuf> {
    let relative = sanitize_relpath(rel_path)?;
    let target = artifacts_dir(blobs_root, task_id).join(relative);
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)?;
    }
    // Creates the file, or truncates an existing one, then writes all of `data`.
    std::fs::write(&target, data)?;
    Ok(target)
}

/// Recursively delete a task's blob tree and report how many bytes it held.
///
/// The returned value is the sum of the sizes of the files found under the
/// task directory *before* removal. It is best-effort:
///
/// * a file whose metadata cannot be read contributes nothing;
/// * the result of the final directory removal is discarded, so the count is
///   returned even if the removal failed.
///
/// Returns `Ok(0)` when the task directory does not exist.
///
/// # Errors
///
/// Returns an error if walking the directory tree fails; in that case the
/// tree is not removed.
pub fn delete_task_blobs(blobs_root: &Path, task_id: &str) -> Result<u64> {
    let root = task_dir(blobs_root, task_id);
    if !root.exists() {
        return Ok(0);
    }
    let mut total: u64 = 0;
    for item in walkdir::WalkDir::new(&root) {
        let item = item.map_err(|e| anyhow!("walkdir: {e}"))?;
        if !item.file_type().is_file() {
            continue;
        }
        // Unreadable metadata just means this file is left out of the tally.
        total += item.metadata().map(|meta| meta.len()).unwrap_or(0);
    }
    // Deliberately best-effort: a failed removal is not reported to the caller.
    let _ = std::fs::remove_dir_all(&root);
    Ok(total)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Successive appends accumulate and a read from offset 0 returns it all.
    #[test]
    fn append_and_read_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let after_first = append_log(root, "t1", LogStream::Stdout, b"hello").unwrap();
        assert_eq!(after_first, 5);
        let after_second = append_log(root, "t1", LogStream::Stdout, b" world").unwrap();
        assert_eq!(after_second, 11);

        let (bytes, next) = read_log_slice(root, "t1", LogStream::Stdout, 0).unwrap();
        assert_eq!(bytes, b"hello world");
        assert_eq!(next, 11);
    }

    /// A non-zero `since` skips that many leading bytes.
    #[test]
    fn read_since_offset() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        append_log(root, "t", LogStream::Stdout, b"abcdefg").unwrap();

        let (bytes, next) = read_log_slice(root, "t", LogStream::Stdout, 3).unwrap();
        assert_eq!(bytes, b"defg");
        assert_eq!(next, 7);
    }

    /// Reading a log that was never written yields no data and echoes `since`.
    #[test]
    fn read_missing_file() {
        let tmp = tempfile::tempdir().unwrap();

        let (bytes, next) = read_log_slice(tmp.path(), "missing", LogStream::Stdout, 0).unwrap();
        assert!(bytes.is_empty());
        assert_eq!(next, 0);
    }

    /// A stored artifact can be read back and lives under the artifacts dir.
    #[test]
    fn store_artifact_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let written = store_artifact(root, "t", "outputs/a.log", b"hi").unwrap();
        assert!(written.starts_with(artifacts_dir(root, "t")));
        assert_eq!(std::fs::read(&written).unwrap(), b"hi");
    }

    /// Parent traversal, absolute paths and the empty path are all refused.
    #[test]
    fn store_artifact_rejects_traversal() {
        let tmp = tempfile::tempdir().unwrap();
        let rejected = |rel: &str| store_artifact(tmp.path(), "t", rel, b"x").is_err();

        assert!(rejected("../../etc/passwd"));
        assert!(rejected("/etc/passwd"));
        assert!(rejected(""));
    }

    /// Deleting a task removes its directory and reports at least the bytes written.
    #[test]
    fn delete_blobs() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        append_log(root, "t", LogStream::Stdout, b"abc").unwrap();
        store_artifact(root, "t", "x.log", b"abc").unwrap();

        let freed = delete_task_blobs(root, "t").unwrap();
        assert!(freed >= 6);
        assert!(!task_dir(root, "t").exists());
    }
}