use std::io::Write;
use anyhow::{Context, Result, bail};
use kanade_shared::kv::OBJECT_COLLECTIONS;
use kanade_shared::manifest::CollectHint;
use kanade_shared::wire::Command;
use tracing::{info, warn};
/// Best-effort wrapper around `collect_and_upload`.
///
/// Returns `None` immediately when `cmd.collect` is unset. Otherwise the
/// bundle is built and uploaded; on success the key returned by
/// `collect_and_upload` is logged and handed back, and on failure the error is
/// logged as a warning and swallowed so the caller can carry on without a
/// bundle.
pub async fn maybe_collect(
    js: &async_nats::jetstream::Context,
    cmd: &Command,
    pc_id: &str,
    stdout: &str,
    now: chrono::DateTime<chrono::Utc>,
) -> Option<String> {
    // No collect hint on the command means there is nothing to do.
    let hint = cmd.collect.as_ref()?;

    let outcome = collect_and_upload(js, cmd, hint, pc_id, stdout, now).await;
    let key = match outcome {
        Ok(key) => key,
        Err(e) => {
            // Deliberately not propagated: a failed bundle must not fail the caller.
            warn!(error = %e, job = %cmd.id, %pc_id, "collect: bundle failed; publishing result without a bundle");
            return None;
        }
    };

    info!(job = %cmd.id, %pc_id, key, "collect: bundle uploaded");
    Some(key)
}
/// Extracts the list of file paths from a job's JSON `stdout`.
///
/// `stdout` is trimmed and parsed as JSON; the value under `files_field` must
/// be an array of strings. Entries that are empty or whitespace-only are
/// dropped; all other entries are returned unmodified (not trimmed), in order.
///
/// # Errors
///
/// Fails when `stdout` is not valid JSON, when `files_field` is absent, when
/// its value is not an array, or when any array entry is not a string.
fn parse_file_list(stdout: &str, files_field: &str) -> Result<Vec<String>> {
    let root: serde_json::Value =
        serde_json::from_str(stdout.trim()).context("collect: stdout is not valid JSON")?;

    let field = root
        .get(files_field)
        .with_context(|| format!("collect: stdout JSON has no '{files_field}' field"))?;
    let entries = field
        .as_array()
        .with_context(|| format!("collect: '{files_field}' is not an array"))?;

    // Collecting into `Result` stops at the first non-string entry, in array order.
    entries
        .iter()
        .filter_map(|entry| {
            match entry
                .as_str()
                .context("collect: file-list entry is not a string")
            {
                Ok(path) if path.trim().is_empty() => None,
                Ok(path) => Some(Ok(path.to_owned())),
                Err(err) => Some(Err(err)),
            }
        })
        .collect()
}
/// Packs the regular files named in `files` into an in-memory zip archive
/// using Deflate compression and returns the archive bytes.
///
/// Paths whose metadata cannot be read, that are not regular files, or whose
/// contents cannot be read are skipped with a warning. Each entry is stored
/// under the path's final component; when that name is already taken the entry
/// is stored as `1_<name>`, `2_<name>`, ... instead.
///
/// `max_bytes` caps the cumulative *uncompressed* size of the bundled files.
///
/// # Errors
///
/// Fails when adding a file would push the cumulative size past `max_bytes`,
/// when no file at all could be added, or when the zip writer reports an
/// error.
fn build_zip(files: Vec<String>, max_bytes: u64) -> Result<Vec<u8>> {
    use zip::write::SimpleFileOptions;

    let mut zw = zip::ZipWriter::new(std::io::Cursor::new(Vec::<u8>::new()));
    let opts = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
    let mut total: u64 = 0;
    let mut added = 0usize;
    let mut used_names: std::collections::HashSet<String> = std::collections::HashSet::new();

    for path in &files {
        let meta = match std::fs::metadata(path) {
            Ok(m) => m,
            Err(e) => {
                warn!(error = %e, path, "collect: skipping file (metadata failed)");
                continue;
            }
        };
        if !meta.is_file() {
            warn!(path, "collect: skipping non-regular-file path");
            continue;
        }
        // Cheap pre-check from metadata so an oversized file is rejected
        // before its contents are pulled into memory.
        if total.saturating_add(meta.len()) > max_bytes {
            bail!(
                "collect: bundle exceeds max_size ({total} + {} > {max_bytes} bytes) at file '{path}'",
                meta.len()
            );
        }
        let bytes = match std::fs::read(path) {
            Ok(b) => b,
            Err(e) => {
                warn!(error = %e, path, "collect: skipping unreadable file");
                continue;
            }
        };
        // The file may have grown between the metadata call and the read
        // (e.g. a log that is still being appended to), so the cap is enforced
        // again on the bytes actually read. usize -> u64 is lossless on
        // targets whose pointers are at most 64 bits wide.
        let read_len = bytes.len() as u64;
        if total.saturating_add(read_len) > max_bytes {
            bail!(
                "collect: bundle exceeds max_size ({total} + {read_len} > {max_bytes} bytes) at file '{path}'"
            );
        }
        total = total.saturating_add(read_len);

        // Entries are flattened to their base name; directory structure is
        // not preserved inside the archive.
        let base = std::path::Path::new(path)
            .file_name()
            .map(|s| s.to_string_lossy().into_owned())
            .unwrap_or_else(|| format!("file{added}"));
        // Resolve collisions between identically named files from different
        // directories by prefixing an increasing counter.
        let mut name = base.clone();
        let mut n = 1;
        while !used_names.insert(name.clone()) {
            name = format!("{n}_{base}");
            n += 1;
        }

        zw.start_file(&name, opts)
            .with_context(|| format!("collect: zip start_file {name}"))?;
        zw.write_all(&bytes)
            .with_context(|| format!("collect: zip write {name}"))?;
        added += 1;
    }

    // An archive with zero entries is treated as a failure rather than uploaded.
    if added == 0 {
        bail!("collect: no readable files to bundle");
    }
    let cursor = zw.finish().context("collect: zip finish")?;
    Ok(cursor.into_inner())
}
/// Builds a zip bundle from the files listed in `stdout` and uploads it to the
/// `OBJECT_COLLECTIONS` object store, returning the object key.
///
/// The file list is read from `stdout` via `parse_file_list` using
/// `hint.files_field`; the archive is built by `build_zip` inside
/// `tokio::task::spawn_blocking`, capped at `hint.max_size_bytes()`. The object
/// key has the form `{pc_id}/{cmd.id}/{ts}.zip`, where `ts` is `now` formatted
/// with `%Y%m%dT%H%M%S%.3fZ`.
///
/// # Errors
///
/// Fails when the file list cannot be parsed or is empty, when the zip task
/// fails or cannot be joined, when the object store cannot be obtained, or
/// when the upload fails.
async fn collect_and_upload(
    js: &async_nats::jetstream::Context,
    cmd: &Command,
    hint: &CollectHint,
    pc_id: &str,
    stdout: &str,
    now: chrono::DateTime<chrono::Utc>,
) -> Result<String> {
    let files = parse_file_list(stdout, &hint.files_field)?;
    if files.is_empty() {
        bail!("collect: '{}' file list is empty", hint.files_field);
    }

    // Zipping does synchronous file I/O, so it is run via `spawn_blocking`.
    // The first `?` covers the join error, the second the zip error itself.
    let max_bytes = hint.max_size_bytes();
    let zip_task = tokio::task::spawn_blocking(move || build_zip(files, max_bytes));
    let zip_bytes = zip_task.await.context("collect: zip task join")??;
    let bytes_len = zip_bytes.len();

    let key = format!(
        "{pc_id}/{}/{}.zip",
        cmd.id,
        now.format("%Y%m%dT%H%M%S%.3fZ")
    );

    let store = js
        .get_object_store(OBJECT_COLLECTIONS)
        .await
        .with_context(|| {
            format!(
                "collect: get_object_store {OBJECT_COLLECTIONS} — \
                 was bootstrap::ensure_jetstream_resources run on the backend?"
            )
        })?;

    let mut cursor = std::io::Cursor::new(zip_bytes);
    store
        .put(key.as_str(), &mut cursor)
        .await
        .with_context(|| format!("collect: object_store.put {key}"))?;

    info!(
        key,
        bytes = bytes_len,
        "collect: uploaded bundle to OBJECT_COLLECTIONS"
    );
    Ok(key)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The array under the requested field is returned in order.
    #[test]
    fn parse_file_list_reads_default_field() {
        let parsed =
            parse_file_list(r#"{ "files": ["C:/a.txt", "C:/b.log"] }"#, "files").unwrap();
        assert_eq!(parsed, ["C:/a.txt", "C:/b.log"]);
    }

    /// A non-default field name is honored and whitespace-only entries are dropped.
    #[test]
    fn parse_file_list_honors_custom_field_and_skips_blanks() {
        let parsed = parse_file_list(r#"{ "artifacts": ["x", " ", "y"] }"#, "artifacts").unwrap();
        assert_eq!(parsed, ["x", "y"]);
    }

    /// Non-JSON input, a missing field, and a non-array field are all errors.
    #[test]
    fn parse_file_list_rejects_non_json_and_missing_field() {
        let rejected = ["not json", r#"{"other": []}"#, r#"{"files": "nope"}"#];
        for stdout in rejected {
            assert!(parse_file_list(stdout, "files").is_err());
        }
    }

    /// Two files sharing a base name both land in the archive, the second
    /// under a counter-prefixed name.
    #[test]
    fn build_zip_packs_files_and_dedups_names() {
        let tmp = tempfile::tempdir().unwrap();
        let nested = tmp.path().join("sub");
        std::fs::create_dir(&nested).unwrap();

        let first = tmp.path().join("report.txt");
        let second = nested.join("report.txt");
        std::fs::write(&first, b"hello").unwrap();
        std::fs::write(&second, b"world").unwrap();

        let inputs: Vec<String> = [&first, &second]
            .iter()
            .map(|p| p.to_string_lossy().into_owned())
            .collect();
        let archive_bytes = build_zip(inputs, 1_000_000).unwrap();

        let archive = zip::ZipArchive::new(std::io::Cursor::new(archive_bytes)).unwrap();
        assert_eq!(archive.len(), 2);

        let names: Vec<&str> = archive.file_names().collect();
        assert!(names.contains(&"report.txt"));
        assert!(names.iter().any(|n| n.ends_with("_report.txt")));
    }

    /// A single file larger than the cap is rejected with a `max_size` error.
    #[test]
    fn build_zip_enforces_max_size() {
        let tmp = tempfile::tempdir().unwrap();
        let oversized = tmp.path().join("big.bin");
        std::fs::write(&oversized, vec![0u8; 2048]).unwrap();

        let inputs = vec![oversized.to_string_lossy().into_owned()];
        let err = build_zip(inputs, 1024).unwrap_err();
        assert!(err.to_string().contains("max_size"), "{err}");
    }

    /// When every listed path is skipped, the bundle is an error rather than
    /// an empty archive.
    #[test]
    fn build_zip_errors_when_no_readable_files() {
        let inputs = vec![String::from("C:/definitely/missing/xyz.txt")];
        let err = build_zip(inputs, 1_000_000).unwrap_err();
        assert!(err.to_string().contains("no readable files"), "{err}");
    }
}