Skip to main content

dragoon_server/
storage.rs

1//! Blob storage on the local filesystem.
2//!
3//! Layout (per design §6 / `python/.../server/storage.py`):
4//! ```text
5//! <data_dir>/blobs/<task_id>/stdout.log
6//! <data_dir>/blobs/<task_id>/stderr.log
7//! <data_dir>/blobs/<task_id>/artifacts/<sanitized-relpath>
8//! ```
9
10use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom, Write};
12use std::path::{Path, PathBuf};
13
14use anyhow::{anyhow, bail, Result};
15
16use dragoon_proto::models::LogStream;
17
/// Directory that holds every blob belonging to `task_id`:
/// `<blobs_root>/<task_id>`.
///
/// `task_id` is appended verbatim; no validation or sanitisation is
/// performed here.
pub fn task_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
    let mut dir = blobs_root.to_path_buf();
    dir.push(task_id);
    dir
}
21
22pub fn artifacts_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
23    task_dir(blobs_root, task_id).join("artifacts")
24}
25
26fn stream_filename(stream: LogStream) -> &'static str {
27    match stream {
28        LogStream::Stdout => "stdout.log",
29        LogStream::Stderr => "stderr.log",
30    }
31}
32
33pub fn log_path(blobs_root: &Path, task_id: &str, stream: LogStream) -> PathBuf {
34    task_dir(blobs_root, task_id).join(stream_filename(stream))
35}
36
37pub fn ensure_task_dir(blobs_root: &Path, task_id: &str) -> Result<PathBuf> {
38    let d = task_dir(blobs_root, task_id);
39    std::fs::create_dir_all(&d)?;
40    std::fs::create_dir_all(artifacts_dir(blobs_root, task_id))?;
41    Ok(d)
42}
43
44/// Append `data` to the relevant log file. Returns the new total size.
45pub fn append_log(
46    blobs_root: &Path,
47    task_id: &str,
48    stream: LogStream,
49    data: &[u8],
50) -> Result<u64> {
51    ensure_task_dir(blobs_root, task_id)?;
52    let p = log_path(blobs_root, task_id, stream);
53    let mut f = OpenOptions::new().create(true).append(true).open(&p)?;
54    f.write_all(data)?;
55    Ok(f.metadata()?.len())
56}
57
58/// Read bytes >= `since`. Returns `(data, new_seq)`. If the file does not
59/// exist, both `data` and `new_seq` are empty/`since`.
60pub fn read_log_slice(
61    blobs_root: &Path,
62    task_id: &str,
63    stream: LogStream,
64    since: u64,
65) -> Result<(Vec<u8>, u64)> {
66    let p = log_path(blobs_root, task_id, stream);
67    if !p.exists() {
68        return Ok((Vec::new(), since));
69    }
70    let mut f = File::open(&p)?;
71    let size = f.metadata()?.len();
72    if size <= since {
73        return Ok((Vec::new(), since));
74    }
75    f.seek(SeekFrom::Start(since))?;
76    let mut buf = Vec::with_capacity(usize::try_from(size - since).unwrap_or(usize::MAX));
77    f.read_to_end(&mut buf)?;
78    Ok((buf, size))
79}
80
81/// Sanitize a relative path: reject absolute paths, parent-traversal
82/// (`..`), and the empty string. Single-dot segments are folded out.
83fn sanitize_relpath(p: &str) -> Result<String> {
84    if p.starts_with('/') || p.starts_with('\\') {
85        bail!("refusing absolute path: {p:?}");
86    }
87    let normalised = p.replace('\\', "/");
88    let mut parts: Vec<String> = Vec::new();
89    for seg in normalised.split('/') {
90        if seg.is_empty() || seg == "." {
91            continue;
92        }
93        if seg == ".." {
94            bail!("refusing parent-traversal in {p:?}");
95        }
96        parts.push(seg.to_owned());
97    }
98    if parts.is_empty() {
99        bail!("refusing empty/blank path: {p:?}");
100    }
101    Ok(parts.join("/"))
102}
103
104/// Store an artifact into the per-task `artifacts/` subtree, returning the
105/// absolute path where it was written.
106pub fn store_artifact(
107    blobs_root: &Path,
108    task_id: &str,
109    rel_path: &str,
110    data: &[u8],
111) -> Result<PathBuf> {
112    let safe = sanitize_relpath(rel_path)?;
113    let base = artifacts_dir(blobs_root, task_id);
114    let dest = base.join(&safe);
115    if let Some(parent) = dest.parent() {
116        std::fs::create_dir_all(parent)?;
117    }
118    let mut f = File::create(&dest)?;
119    f.write_all(data)?;
120    Ok(dest)
121}
122
123/// Recursively delete a task's blob tree; returns the bytes freed
124/// (best-effort — IO errors during traversal are ignored after first).
125pub fn delete_task_blobs(blobs_root: &Path, task_id: &str) -> Result<u64> {
126    let d = task_dir(blobs_root, task_id);
127    if !d.exists() {
128        return Ok(0);
129    }
130    let mut freed: u64 = 0;
131    for entry in walkdir::WalkDir::new(&d) {
132        let entry = entry.map_err(|e| anyhow!("walkdir: {e}"))?;
133        if entry.file_type().is_file() {
134            if let Ok(meta) = entry.metadata() {
135                freed += meta.len();
136            }
137        }
138    }
139    std::fs::remove_dir_all(&d).ok();
140    Ok(freed)
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Two appends accumulate in one file and read back in order.
    #[test]
    fn append_and_read_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let after_first = append_log(root, "t1", LogStream::Stdout, b"hello").unwrap();
        assert_eq!(after_first, 5);
        let after_second = append_log(root, "t1", LogStream::Stdout, b" world").unwrap();
        assert_eq!(after_second, 11);

        let (bytes, next) = read_log_slice(root, "t1", LogStream::Stdout, 0).unwrap();
        assert_eq!(bytes, b"hello world");
        assert_eq!(next, 11);
    }

    /// Reading from a non-zero offset skips the bytes before it.
    #[test]
    fn read_since_offset() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        append_log(root, "t", LogStream::Stdout, b"abcdefg").unwrap();

        let (bytes, next) = read_log_slice(root, "t", LogStream::Stdout, 3).unwrap();
        assert_eq!(bytes, b"defg");
        assert_eq!(next, 7);
    }

    /// A log that was never written reads as empty at the requested offset.
    #[test]
    fn read_missing_file() {
        let tmp = tempfile::tempdir().unwrap();

        let (bytes, next) = read_log_slice(tmp.path(), "missing", LogStream::Stdout, 0).unwrap();
        assert!(bytes.is_empty());
        assert_eq!(next, 0);
    }

    /// A stored artifact lands under the task's artifacts directory.
    #[test]
    fn store_artifact_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let written = store_artifact(root, "t", "outputs/a.log", b"hi").unwrap();
        assert_eq!(std::fs::read(&written).unwrap(), b"hi");
        assert!(written.starts_with(artifacts_dir(root, "t")));
    }

    /// Traversal, absolute, and empty relative paths are all refused.
    #[test]
    fn store_artifact_rejects_traversal() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        assert!(store_artifact(root, "t", "../../etc/passwd", b"x").is_err());
        assert!(store_artifact(root, "t", "/etc/passwd", b"x").is_err());
        assert!(store_artifact(root, "t", "", b"x").is_err());
    }

    /// Deleting a task removes its directory and reports the bytes it held.
    #[test]
    fn delete_blobs() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        append_log(root, "t", LogStream::Stdout, b"abc").unwrap();
        store_artifact(root, "t", "x.log", b"abc").unwrap();

        let freed = delete_task_blobs(root, "t").unwrap();
        assert!(freed >= 6);
        assert!(!task_dir(root, "t").exists());
    }
}