1use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom, Write};
12use std::path::{Path, PathBuf};
13
14use anyhow::{anyhow, bail, Result};
15
16use dragoon_proto::models::LogStream;
17
18pub fn task_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
19 blobs_root.join(task_id)
20}
21
22pub fn artifacts_dir(blobs_root: &Path, task_id: &str) -> PathBuf {
23 task_dir(blobs_root, task_id).join("artifacts")
24}
25
26fn stream_filename(stream: LogStream) -> &'static str {
27 match stream {
28 LogStream::Stdout => "stdout.log",
29 LogStream::Stderr => "stderr.log",
30 }
31}
32
33pub fn log_path(blobs_root: &Path, task_id: &str, stream: LogStream) -> PathBuf {
34 task_dir(blobs_root, task_id).join(stream_filename(stream))
35}
36
37pub fn ensure_task_dir(blobs_root: &Path, task_id: &str) -> Result<PathBuf> {
38 let d = task_dir(blobs_root, task_id);
39 std::fs::create_dir_all(&d)?;
40 std::fs::create_dir_all(artifacts_dir(blobs_root, task_id))?;
41 Ok(d)
42}
43
44pub fn append_log(
46 blobs_root: &Path,
47 task_id: &str,
48 stream: LogStream,
49 data: &[u8],
50) -> Result<u64> {
51 ensure_task_dir(blobs_root, task_id)?;
52 let p = log_path(blobs_root, task_id, stream);
53 let mut f = OpenOptions::new().create(true).append(true).open(&p)?;
54 f.write_all(data)?;
55 Ok(f.metadata()?.len())
56}
57
58pub fn read_log_slice(
61 blobs_root: &Path,
62 task_id: &str,
63 stream: LogStream,
64 since: u64,
65) -> Result<(Vec<u8>, u64)> {
66 let p = log_path(blobs_root, task_id, stream);
67 if !p.exists() {
68 return Ok((Vec::new(), since));
69 }
70 let mut f = File::open(&p)?;
71 let size = f.metadata()?.len();
72 if size <= since {
73 return Ok((Vec::new(), since));
74 }
75 f.seek(SeekFrom::Start(since))?;
76 let mut buf = Vec::with_capacity(usize::try_from(size - since).unwrap_or(usize::MAX));
77 f.read_to_end(&mut buf)?;
78 Ok((buf, size))
79}
80
81fn sanitize_relpath(p: &str) -> Result<String> {
84 if p.starts_with('/') || p.starts_with('\\') {
85 bail!("refusing absolute path: {p:?}");
86 }
87 let normalised = p.replace('\\', "/");
88 let mut parts: Vec<String> = Vec::new();
89 for seg in normalised.split('/') {
90 if seg.is_empty() || seg == "." {
91 continue;
92 }
93 if seg == ".." {
94 bail!("refusing parent-traversal in {p:?}");
95 }
96 parts.push(seg.to_owned());
97 }
98 if parts.is_empty() {
99 bail!("refusing empty/blank path: {p:?}");
100 }
101 Ok(parts.join("/"))
102}
103
104pub fn store_artifact(
107 blobs_root: &Path,
108 task_id: &str,
109 rel_path: &str,
110 data: &[u8],
111) -> Result<PathBuf> {
112 let safe = sanitize_relpath(rel_path)?;
113 let base = artifacts_dir(blobs_root, task_id);
114 let dest = base.join(&safe);
115 if let Some(parent) = dest.parent() {
116 std::fs::create_dir_all(parent)?;
117 }
118 let mut f = File::create(&dest)?;
119 f.write_all(data)?;
120 Ok(dest)
121}
122
123pub fn delete_task_blobs(blobs_root: &Path, task_id: &str) -> Result<u64> {
126 let d = task_dir(blobs_root, task_id);
127 if !d.exists() {
128 return Ok(0);
129 }
130 let mut freed: u64 = 0;
131 for entry in walkdir::WalkDir::new(&d) {
132 let entry = entry.map_err(|e| anyhow!("walkdir: {e}"))?;
133 if entry.file_type().is_file() {
134 if let Ok(meta) = entry.metadata() {
135 freed += meta.len();
136 }
137 }
138 }
139 std::fs::remove_dir_all(&d).ok();
140 Ok(freed)
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146
147 #[test]
148 fn append_and_read_round_trip() {
149 let dir = tempfile::tempdir().unwrap();
150 let blobs = dir.path();
151 let n = append_log(blobs, "t1", LogStream::Stdout, b"hello").unwrap();
152 assert_eq!(n, 5);
153 let n2 = append_log(blobs, "t1", LogStream::Stdout, b" world").unwrap();
154 assert_eq!(n2, 11);
155 let (data, seq) = read_log_slice(blobs, "t1", LogStream::Stdout, 0).unwrap();
156 assert_eq!(data, b"hello world");
157 assert_eq!(seq, 11);
158 }
159
160 #[test]
161 fn read_since_offset() {
162 let dir = tempfile::tempdir().unwrap();
163 append_log(dir.path(), "t", LogStream::Stdout, b"abcdefg").unwrap();
164 let (data, seq) = read_log_slice(dir.path(), "t", LogStream::Stdout, 3).unwrap();
165 assert_eq!(data, b"defg");
166 assert_eq!(seq, 7);
167 }
168
169 #[test]
170 fn read_missing_file() {
171 let dir = tempfile::tempdir().unwrap();
172 let (data, seq) = read_log_slice(dir.path(), "missing", LogStream::Stdout, 0).unwrap();
173 assert!(data.is_empty());
174 assert_eq!(seq, 0);
175 }
176
177 #[test]
178 fn store_artifact_round_trip() {
179 let dir = tempfile::tempdir().unwrap();
180 let p = store_artifact(dir.path(), "t", "outputs/a.log", b"hi").unwrap();
181 assert_eq!(std::fs::read(&p).unwrap(), b"hi");
182 assert!(p.starts_with(artifacts_dir(dir.path(), "t")));
183 }
184
185 #[test]
186 fn store_artifact_rejects_traversal() {
187 let dir = tempfile::tempdir().unwrap();
188 assert!(store_artifact(dir.path(), "t", "../../etc/passwd", b"x").is_err());
189 assert!(store_artifact(dir.path(), "t", "/etc/passwd", b"x").is_err());
190 assert!(store_artifact(dir.path(), "t", "", b"x").is_err());
191 }
192
193 #[test]
194 fn delete_blobs() {
195 let dir = tempfile::tempdir().unwrap();
196 append_log(dir.path(), "t", LogStream::Stdout, b"abc").unwrap();
197 store_artifact(dir.path(), "t", "x.log", b"abc").unwrap();
198 let freed = delete_task_blobs(dir.path(), "t").unwrap();
199 assert!(freed >= 6);
200 assert!(!task_dir(dir.path(), "t").exists());
201 }
202}