use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use dragoon_proto::models::LogStream;
/// Returns the directory under `blobs_root` that belongs to `task_id`.
///
/// This is a pure path computation: `task_id` is appended to `blobs_root`
/// as-is, without validation, and nothing is created on disk.
pub fn task_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
    let mut dir = blobs_root.to_path_buf();
    dir.push(task_id);
    dir
}
/// Returns the `artifacts` sub-directory inside the task's directory.
///
/// Pure path computation; the directory is not created here.
pub fn artifacts_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
    let mut dir = task_dir(blobs_root, task_id);
    dir.push("artifacts");
    dir
}
/// Maps a log stream to the file name used for that stream's log.
fn stream_filename(stream: LogStream) -> &'static str {
    const STDOUT_LOG: &str = "stdout.log";
    const STDERR_LOG: &str = "stderr.log";

    // Exhaustive on purpose: adding a stream variant must force a decision here.
    match stream {
        LogStream::Stderr => STDERR_LOG,
        LogStream::Stdout => STDOUT_LOG,
    }
}
/// Returns the path of the log file for `stream` inside the task's directory.
///
/// Pure path computation; the file is neither created nor checked for
/// existence.
pub fn log_path(blobs_root: &Path, task_id: &str, stream: LogStream) -> PathBuf {
    let file_name = stream_filename(stream);
    let mut path = task_dir(blobs_root, task_id);
    path.push(file_name);
    path
}
/// Creates the task directory and its `artifacts` sub-directory if needed.
///
/// Returns the task directory path (not the artifacts path).
///
/// # Errors
///
/// Propagates the I/O error from the first directory that cannot be created.
pub fn ensure_task_dir(blobs_root: &Path, task_id: &str) -> Result<PathBuf> {
    let root = task_dir(blobs_root, task_id);
    let artifacts = artifacts_dir(blobs_root, task_id);
    // Task directory first, then the artifacts directory beneath it.
    for dir in [&root, &artifacts] {
        std::fs::create_dir_all(dir)?;
    }
    Ok(root)
}
/// Appends `data` to the task's log file for `stream`.
///
/// The task directory layout is created first if it does not exist, and the
/// log file itself is created on first use.
///
/// Returns the length of the log file as reported by its metadata after the
/// write.
///
/// # Errors
///
/// Propagates any error from creating the directories, opening the file,
/// writing the data, or querying the file metadata.
pub fn append_log(
    blobs_root: &Path,
    task_id: &str,
    stream: LogStream,
    data: &[u8],
) -> Result<u64> {
    ensure_task_dir(blobs_root, task_id)?;
    let path = log_path(blobs_root, task_id, stream);

    let mut options = OpenOptions::new();
    options.append(true).create(true);
    let mut log = options.open(&path)?;

    log.write_all(data)?;
    let meta = log.metadata()?;
    Ok(meta.len())
}
/// Reads everything in the task's log file for `stream` after byte offset
/// `since`.
///
/// Returns `(bytes, next_offset)`, where `next_offset` is the value to pass
/// as `since` on the following call:
///
/// * If the log file does not exist, returns `(empty, since)`.
/// * If the file is not longer than `since`, returns `(empty, since)`.
/// * Otherwise returns the bytes from `since` to end-of-file and
///   `since + bytes.len()`.
///
/// # Errors
///
/// Propagates any I/O error from opening (other than "not found"), stat-ing,
/// seeking, or reading the file.
pub fn read_log_slice(
    blobs_root: &Path,
    task_id: &str,
    stream: LogStream,
    since: u64,
) -> Result<(Vec<u8>, u64)> {
    let p = log_path(blobs_root, task_id, stream);

    // Open directly rather than probing with `exists()` first: the probe is
    // racy (the file can vanish between the check and the open) and it also
    // reports every other failure, such as a permission error, as "missing".
    let mut f = match File::open(&p) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok((Vec::new(), since));
        }
        Err(e) => return Err(e.into()),
    };

    let size = f.metadata()?.len();
    if size <= since {
        return Ok((Vec::new(), since));
    }
    f.seek(SeekFrom::Start(since))?;

    // The size is only a capacity hint. Fall back to no pre-allocation when
    // it does not fit in `usize`, instead of requesting `usize::MAX` bytes,
    // which makes `Vec::with_capacity` panic.
    let mut buf = Vec::with_capacity(usize::try_from(size - since).unwrap_or(0));
    f.read_to_end(&mut buf)?;

    // Derive the next offset from what was actually read. A writer may have
    // appended after `size` was sampled; `read_to_end` returns those bytes
    // too, so reporting `size` would make the next call read them again.
    let read = u64::try_from(buf.len())?;
    Ok((buf, since + read))
}
/// Normalises a relative path into a `/`-separated form with no empty, `.`
/// or `..` segments.
///
/// Both `/` and `\` are treated as separators. Empty segments and `.`
/// segments are dropped.
///
/// # Errors
///
/// Fails when the input
/// * starts with `/` or `\`,
/// * contains a `..` segment, or
/// * has no segments left after normalisation.
fn sanitize_relpath(p: &str) -> Result<String> {
    if matches!(p.chars().next(), Some('/' | '\\')) {
        bail!("refusing absolute path: {p:?}");
    }

    let mut kept: Vec<&str> = Vec::new();
    for segment in p.split(|c: char| c == '/' || c == '\\') {
        match segment {
            // Redundant separators and current-directory markers carry no
            // information.
            "" | "." => {}
            ".." => bail!("refusing parent-traversal in {p:?}"),
            name => kept.push(name),
        }
    }

    if kept.is_empty() {
        bail!("refusing empty/blank path: {p:?}");
    }
    Ok(kept.join("/"))
}
/// Writes `data` to `rel_path` inside the task's artifacts directory.
///
/// `rel_path` is sanitised first (no absolute paths, no `..` segments), and
/// missing parent directories are created. An existing file at the
/// destination is truncated and overwritten.
///
/// Returns the full path of the written file.
///
/// # Errors
///
/// Fails when `rel_path` is rejected by sanitisation, when the resolved
/// destination is not inside the artifacts directory, or when any directory
/// creation or file write fails.
pub fn store_artifact(
    blobs_root: &Path,
    task_id: &str,
    rel_path: &str,
    data: &[u8],
) -> Result<PathBuf> {
    let safe = sanitize_relpath(rel_path)?;
    let base = artifacts_dir(blobs_root, task_id);
    let dest = base.join(&safe);

    // Defence in depth: `Path::join` discards `base` when the joined path is
    // absolute or, on Windows, carries a drive prefix (`C:\x`, `C:x`). The
    // sanitiser only rejects a leading `/` or `\`, so confirm the destination
    // really is inside the artifacts directory before touching the disk.
    if !dest.starts_with(&base) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("refusing path outside artifacts dir: {rel_path:?}"),
        )
        .into());
    }

    if let Some(parent) = dest.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let mut f = File::create(&dest)?;
    f.write_all(data)?;
    Ok(dest)
}
/// Deletes the task's whole directory and reports how many bytes of regular
/// files it contained.
///
/// Returns `Ok(0)` when the task directory does not exist. The byte count is
/// the sum of the lengths of all regular files found under the directory;
/// files whose metadata cannot be read are left out of the total.
///
/// # Errors
///
/// Fails when the directory walk hits an error, or when the directory cannot
/// be removed. A "not found" error during removal is tolerated, because it
/// means the directory is already gone.
pub fn delete_task_blobs(blobs_root: &Path, task_id: &str) -> Result<u64> {
    let d = task_dir(blobs_root, task_id);
    if !d.exists() {
        return Ok(0);
    }

    let mut freed: u64 = 0;
    for entry in walkdir::WalkDir::new(&d) {
        let entry = entry.map_err(|e| anyhow!("walkdir: {e}"))?;
        if entry.file_type().is_file() {
            // Size accounting is best-effort: an unreadable entry is skipped.
            if let Ok(meta) = entry.metadata() {
                freed += meta.len();
            }
        }
    }

    // Do not swallow a failed removal: returning `Ok(freed)` while the files
    // are still on disk would report space as reclaimed when it was not.
    match std::fs::remove_dir_all(&d) {
        Ok(()) => Ok(freed),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(freed),
        Err(e) => Err(anyhow!("failed to remove {}: {e}", d.display())),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two appends report the running file length and read back as one
    /// contiguous byte string.
    #[test]
    fn append_and_read_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let len_after_first = append_log(root, "t1", LogStream::Stdout, b"hello").unwrap();
        assert_eq!(len_after_first, 5);

        let len_after_second = append_log(root, "t1", LogStream::Stdout, b" world").unwrap();
        assert_eq!(len_after_second, 11);

        let (bytes, next) = read_log_slice(root, "t1", LogStream::Stdout, 0).unwrap();
        assert_eq!(bytes, b"hello world");
        assert_eq!(next, 11);
    }

    /// Reading from a non-zero offset returns only the tail of the log.
    #[test]
    fn read_since_offset() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        append_log(root, "t", LogStream::Stdout, b"abcdefg").unwrap();

        let (bytes, next) = read_log_slice(root, "t", LogStream::Stdout, 3).unwrap();
        assert_eq!(bytes, b"defg");
        assert_eq!(next, 7);
    }

    /// A log that was never written reads as empty and leaves the offset
    /// unchanged.
    #[test]
    fn read_missing_file() {
        let tmp = tempfile::tempdir().unwrap();

        let (bytes, next) = read_log_slice(tmp.path(), "missing", LogStream::Stdout, 0).unwrap();
        assert!(bytes.is_empty());
        assert_eq!(next, 0);
    }

    /// A stored artifact lands under the artifacts directory with the exact
    /// bytes given.
    #[test]
    fn store_artifact_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let written = store_artifact(root, "t", "outputs/a.log", b"hi").unwrap();
        assert_eq!(std::fs::read(&written).unwrap(), b"hi");
        assert!(written.starts_with(artifacts_dir(root, "t")));
    }

    /// Parent traversal, absolute paths and empty paths are all refused.
    #[test]
    fn store_artifact_rejects_traversal() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        for bad in ["../../etc/passwd", "/etc/passwd", ""] {
            assert!(
                store_artifact(root, "t", bad, b"x").is_err(),
                "{bad:?} should be rejected"
            );
        }
    }

    /// Deleting a task removes its directory and reports at least the bytes
    /// that were written to it.
    #[test]
    fn delete_blobs() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        append_log(root, "t", LogStream::Stdout, b"abc").unwrap();
        store_artifact(root, "t", "x.log", b"abc").unwrap();

        let reclaimed = delete_task_blobs(root, "t").unwrap();
        assert!(reclaimed >= 6);
        assert!(!task_dir(root, "t").exists());
    }
}