use std::io::Write;
use anyhow::{Context, Result, bail};
use kanade_shared::kv::OBJECT_COLLECTIONS;
use kanade_shared::manifest::CollectHint;
use kanade_shared::wire::Command;
use tracing::{info, warn};
/// One bundle that was zipped and stored in the collections object store.
pub struct BundleResult {
/// Label the bundle was stored under (sanitized and de-duplicated), or `None`
/// when the bundle is unlabeled.
pub label: Option<String>,
/// Object-store key the zip was written to.
pub key: String,
/// Source paths that were actually packed into the zip.
pub files: Vec<String>,
}
/// Best-effort entry point for bundle collection.
///
/// When `cmd` carries no collect hint this does nothing and returns an empty
/// list. Otherwise it runs the collect-and-upload pipeline for `stdout`; any
/// failure is logged as a warning and swallowed, so the caller always gets a
/// (possibly empty) list of uploaded bundles instead of an error.
pub async fn maybe_collect(
    js: &async_nats::jetstream::Context,
    cmd: &Command,
    pc_id: &str,
    stdout: &str,
    now: chrono::DateTime<chrono::Utc>,
) -> Vec<BundleResult> {
    let hint = match cmd.collect.as_ref() {
        Some(hint) => hint,
        None => return Vec::new(),
    };

    let outcome = collect_and_upload(js, cmd, hint, pc_id, stdout, now).await;
    let uploaded = match outcome {
        Ok(uploaded) => uploaded,
        Err(e) => {
            // Collection is optional: report the problem and carry on without bundles.
            warn!(error = %e, job = %cmd.id, %pc_id, "collect: bundle failed; publishing result without a bundle");
            return Vec::new();
        }
    };

    info!(job = %cmd.id, %pc_id, bundles = uploaded.len(), "collect: bundle(s) uploaded");
    uploaded
}
/// One bundle as requested by the job's stdout JSON, before any file is read.
struct RequestedBundle {
/// Raw label from the JSON `label` field; `None` when absent or not a string.
label: Option<String>,
/// Non-blank file paths listed for this bundle.
files: Vec<String>,
}
/// Turns a free-form label into one that is safe to embed in an object key.
///
/// ASCII letters, ASCII digits, `.` and `-` are kept; every other character
/// (including `_`) becomes `-`. An empty label becomes `"bundle"`.
fn sanitize_label(label: &str) -> String {
    // Each input char maps to exactly one output char, so the result is
    // empty only when the input is.
    if label.is_empty() {
        return "bundle".to_string();
    }

    let mut cleaned = String::with_capacity(label.len());
    for ch in label.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '-');
        cleaned.push(if keep { ch } else { '-' });
    }
    cleaned
}
/// Reserves a unique label in `used` and returns the label to store under.
///
/// The first occurrence of a label (or of "no label") is returned unchanged.
/// Later occurrences get a numeric suffix, starting at `-2` and counting up
/// until an unused name is found; repeated unlabeled bundles use the stem
/// `bundle` (`bundle-2`, `bundle-3`, …). Every returned name is recorded in
/// `used`, with the empty string standing in for "no label".
fn dedup_label(
    used: &mut std::collections::HashSet<String>,
    label: Option<String>,
) -> Option<String> {
    let base = match &label {
        Some(text) => text.clone(),
        None => String::new(),
    };
    if used.insert(base.clone()) {
        return label;
    }

    let stem = if base.is_empty() {
        "bundle"
    } else {
        base.as_str()
    };
    let mut suffix = 2;
    let mut candidate = format!("{stem}-{suffix}");
    while !used.insert(candidate.clone()) {
        suffix += 1;
        candidate = format!("{stem}-{suffix}");
    }
    Some(candidate)
}
/// Reads `value` as a JSON array of strings, dropping blank entries.
///
/// `field` is only used to name the offending field in error messages.
/// Entries that are empty or whitespace-only are skipped; the remaining
/// strings are returned untrimmed and in their original order.
///
/// # Errors
///
/// Fails when `value` is `None`, when it is not an array, or at the first
/// entry that is not a string.
fn json_string_array(value: Option<&serde_json::Value>, field: &str) -> Result<Vec<String>> {
    let node = value.with_context(|| format!("collect: stdout JSON has no '{field}'"))?;
    let entries = node
        .as_array()
        .with_context(|| format!("collect: '{field}' is not an array"))?;

    entries
        .iter()
        .map(|entry| {
            entry
                .as_str()
                .with_context(|| format!("collect: '{field}' entry is not a string"))
        })
        // Errors pass through the filter so `collect` stops at the first one.
        .filter(|text| !matches!(text, Ok(s) if s.trim().is_empty()))
        .map(|text| text.map(str::to_owned))
        .collect()
}
/// Parses the collect payload into the list of requested bundles.
///
/// Two shapes are accepted:
/// - multi-bundle: `{ "bundles": [ { "label": "...", "files": [...] }, ... ] }`,
///   where `label` is optional and ignored unless it is a string;
/// - single-bundle: `{ "<files_field>": [...] }`, yielding one unlabeled bundle.
///
/// The multi-bundle form is used only when `bundles` is present and is an
/// array; anything else falls back to the single-bundle form.
///
/// # Errors
///
/// Fails when `payload` is not valid JSON or when a required file list is
/// missing, is not an array, or contains a non-string entry.
fn parse_bundles(payload: &str, files_field: &str) -> Result<Vec<RequestedBundle>> {
    let doc: serde_json::Value =
        serde_json::from_str(payload.trim()).context("collect: stdout is not valid JSON")?;

    let Some(entries) = doc.get("bundles").and_then(serde_json::Value::as_array) else {
        let files = json_string_array(doc.get(files_field), files_field)?;
        return Ok(vec![RequestedBundle { label: None, files }]);
    };

    entries
        .iter()
        .enumerate()
        .map(|(idx, entry)| {
            let label = entry
                .get("label")
                .and_then(serde_json::Value::as_str)
                .map(str::to_owned);
            let files = json_string_array(entry.get("files"), &format!("bundles[{idx}].files"))?;
            Ok(RequestedBundle { label, files })
        })
        .collect()
}
/// Packs the readable regular files in `files` into an in-memory, deflate-compressed zip.
///
/// Entries are stored flat under each path's final component. When two inputs
/// share a base name, later ones are prefixed with `1_`, `2_`, … until the
/// entry name is unique within the archive.
///
/// A path is skipped with a warning (without failing the bundle) when its
/// metadata cannot be read, when it is not a regular file, or when its
/// contents cannot be read.
///
/// Returns the zip bytes together with the input paths that were actually
/// packed, in input order.
///
/// # Errors
///
/// - The running total of uncompressed bytes would exceed `max_bytes`. This is
///   checked against the metadata length before reading, and again against the
///   bytes actually read, because a file can grow between the two calls.
/// - No file could be packed.
/// - The zip writer reports an error.
fn build_zip(files: Vec<String>, max_bytes: u64) -> Result<(Vec<u8>, Vec<String>)> {
    use zip::write::SimpleFileOptions;

    let mut zw = zip::ZipWriter::new(std::io::Cursor::new(Vec::<u8>::new()));
    let opts = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
    let mut total: u64 = 0;
    let mut added_paths: Vec<String> = Vec::with_capacity(files.len());
    let mut used_names: std::collections::HashSet<String> = std::collections::HashSet::new();

    for path in &files {
        let meta = match std::fs::metadata(path) {
            Ok(m) => m,
            Err(e) => {
                warn!(error = %e, path, "collect: skipping file (metadata failed)");
                continue;
            }
        };
        if !meta.is_file() {
            warn!(path, "collect: skipping non-regular-file path");
            continue;
        }
        // Cheap pre-check so an obviously oversized file is never read into memory.
        if total.saturating_add(meta.len()) > max_bytes {
            bail!(
                "collect: bundle exceeds max_size ({total} + {} > {max_bytes} bytes) at file '{path}'",
                meta.len()
            );
        }
        let bytes = match std::fs::read(path) {
            Ok(b) => b,
            Err(e) => {
                warn!(error = %e, path, "collect: skipping unreadable file");
                continue;
            }
        };
        // The file may have grown since the metadata call (e.g. a live log),
        // so enforce the cap on what was actually read as well.
        let read_len = bytes.len() as u64;
        if total.saturating_add(read_len) > max_bytes {
            bail!(
                "collect: bundle exceeds max_size ({total} + {} > {max_bytes} bytes) at file '{path}'",
                read_len
            );
        }
        total = total.saturating_add(read_len);

        // Flatten to the base name; fall back to a positional name when the
        // path has no final component.
        let base = std::path::Path::new(path)
            .file_name()
            .map(|s| s.to_string_lossy().into_owned())
            .unwrap_or_else(|| format!("file{}", added_paths.len()));
        let mut name = base.clone();
        let mut n = 1;
        while !used_names.insert(name.clone()) {
            name = format!("{n}_{base}");
            n += 1;
        }

        zw.start_file(&name, opts)
            .with_context(|| format!("collect: zip start_file {name}"))?;
        zw.write_all(&bytes)
            .with_context(|| format!("collect: zip write {name}"))?;
        added_paths.push(path.clone());
    }

    if added_paths.is_empty() {
        bail!("collect: no readable files to bundle");
    }
    let cursor = zw.finish().context("collect: zip finish")?;
    Ok((cursor.into_inner(), added_paths))
}
async fn collect_and_upload(
js: &async_nats::jetstream::Context,
cmd: &Command,
hint: &CollectHint,
pc_id: &str,
stdout: &str,
now: chrono::DateTime<chrono::Utc>,
) -> Result<Vec<BundleResult>> {
let payload = kanade_shared::manifest::fenced_payload(
stdout,
kanade_shared::manifest::COLLECT_BLOCK_BEGIN,
kanade_shared::manifest::COLLECT_BLOCK_END,
);
let requested = parse_bundles(payload, &hint.files_field)?;
if requested.iter().all(|b| b.files.is_empty()) {
bail!("collect: file list is empty");
}
let max_bytes = hint.max_size_bytes();
let store = js
.get_object_store(OBJECT_COLLECTIONS)
.await
.with_context(|| {
format!(
"collect: get_object_store {OBJECT_COLLECTIONS} — \
was bootstrap::ensure_jetstream_resources run on the backend?"
)
})?;
let ts = now.format("%Y%m%dT%H%M%S%.3fZ").to_string();
let mut out: Vec<BundleResult> = Vec::new();
let mut used_labels: std::collections::HashSet<String> = std::collections::HashSet::new();
for b in requested {
if b.files.is_empty() {
continue;
}
let label = b.label.as_deref().map(sanitize_label);
let files = b.files;
let built = tokio::task::spawn_blocking(move || build_zip(files, max_bytes))
.await
.context("collect: zip task join")?;
let (zip_bytes, packed) = match built {
Ok(v) => v,
Err(e) => {
warn!(error = %e, label = ?label, "collect: bundle skipped (zip failed)");
continue;
}
};
let label = dedup_label(&mut used_labels, label);
let key = match &label {
Some(l) => format!("{pc_id}/{}/{l}__{ts}.zip", cmd.id),
None => format!("{pc_id}/{}/{ts}.zip", cmd.id),
};
let bytes_len = zip_bytes.len();
let mut cursor = std::io::Cursor::new(zip_bytes);
if let Err(e) = store.put(key.as_str(), &mut cursor).await {
warn!(error = %e, key, "collect: upload failed; skipping bundle");
continue;
}
info!(
key,
bytes = bytes_len,
"collect: uploaded bundle to OBJECT_COLLECTIONS"
);
out.push(BundleResult {
label,
key,
files: packed,
});
}
if out.is_empty() {
bail!("collect: no bundles uploaded");
}
Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Whitespace-only entries are dropped; the rest keep their order.
    #[test]
    fn json_string_array_reads_and_skips_blanks() {
        let doc: serde_json::Value =
            serde_json::from_str(r#"{ "artifacts": ["x", " ", "y"] }"#).unwrap();
        let got = json_string_array(doc.get("artifacts"), "artifacts").unwrap();
        assert_eq!(got, vec!["x".to_string(), "y".to_string()]);
    }

    /// The single-bundle form reads its list from the configured field name.
    #[test]
    fn parse_bundles_honors_custom_files_field() {
        let parsed = parse_bundles(r#"{ "artifacts": ["x", "y"] }"#, "artifacts").unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].files, vec!["x".to_string(), "y".to_string()]);
    }

    /// Non-JSON input, a missing field and a non-array field are all errors.
    #[test]
    fn parse_bundles_rejects_non_json_and_missing_or_non_array_field() {
        let bad_payloads = ["not json", r#"{"other": []}"#, r#"{"files": "nope"}"#];
        for bad in bad_payloads {
            assert!(parse_bundles(bad, "files").is_err());
        }
    }

    /// Two inputs with the same base name both land in the archive, the
    /// second under a prefixed entry name.
    #[test]
    fn build_zip_packs_files_and_dedups_names() {
        let tmp = tempfile::tempdir().unwrap();
        let top = tmp.path().join("report.txt");
        let nested_dir = tmp.path().join("sub");
        std::fs::create_dir(&nested_dir).unwrap();
        let nested = nested_dir.join("report.txt");
        std::fs::write(&top, b"hello").unwrap();
        std::fs::write(&nested, b"world").unwrap();

        let inputs: Vec<String> = [&top, &nested]
            .iter()
            .map(|p| p.to_string_lossy().into_owned())
            .collect();
        let (archive, packed) = build_zip(inputs, 1_000_000).unwrap();
        assert_eq!(packed.len(), 2);

        let reader = zip::ZipArchive::new(std::io::Cursor::new(archive)).unwrap();
        assert_eq!(reader.len(), 2);
        let names: Vec<String> = reader.file_names().map(|s| s.to_string()).collect();
        assert!(names.contains(&"report.txt".to_string()));
        assert!(names.iter().any(|n| n.ends_with("_report.txt")));
    }

    /// A file larger than the cap aborts the bundle with a `max_size` error.
    #[test]
    fn build_zip_enforces_max_size() {
        let tmp = tempfile::tempdir().unwrap();
        let oversized = tmp.path().join("big.bin");
        std::fs::write(&oversized, vec![0u8; 2048]).unwrap();

        let inputs = vec![oversized.to_string_lossy().into_owned()];
        let err = build_zip(inputs, 1024).unwrap_err();
        assert!(err.to_string().contains("max_size"), "{err}");
    }

    /// When every input is skipped there is nothing to bundle, which is an error.
    #[test]
    fn build_zip_errors_when_no_readable_files() {
        let inputs = vec!["C:/definitely/missing/xyz.txt".into()];
        let err = build_zip(inputs, 1_000_000).unwrap_err();
        assert!(err.to_string().contains("no readable files"), "{err}");
    }

    /// Without a `bundles` array the payload is one unlabeled bundle.
    #[test]
    fn parse_bundles_single_files_form_is_one_unlabeled_bundle() {
        let parsed = parse_bundles(r#"{ "files": ["C:/a.png", "C:/b.png"] }"#, "files").unwrap();
        assert_eq!(parsed.len(), 1);
        assert!(parsed[0].label.is_none());
        assert_eq!(parsed[0].files, vec!["C:/a.png", "C:/b.png"]);
    }

    /// The multi-bundle form keeps each entry's label and file list.
    #[test]
    fn parse_bundles_multi_form_keeps_labels_and_files() {
        let stdout = r#"{ "bundles": [
{ "label": "20260101", "files": ["C:/1/a.png"] },
{ "label": "20260102", "files": ["C:/2/a.png", "C:/2/b.png"] }
] }"#;
        let parsed = parse_bundles(stdout, "files").unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].label.as_deref(), Some("20260101"));
        assert_eq!(parsed[1].files.len(), 2);
    }

    /// Anything outside ASCII alphanumerics, `.` and `-` is replaced by `-`.
    #[test]
    fn sanitize_label_strips_unsafe_chars_and_underscores() {
        let cases = [
            ("2026_01/02:03", "2026-01-02-03"),
            ("daily", "daily"),
            ("###", "---"),
        ];
        for (raw, expected) in cases {
            assert_eq!(sanitize_label(raw), expected);
        }
    }

    /// Repeated labels get a numeric suffix; repeated unlabeled bundles use
    /// the `bundle` stem.
    #[test]
    fn dedup_label_disambiguates_sanitization_collisions() {
        let mut labeled = std::collections::HashSet::new();
        let first = dedup_label(&mut labeled, Some("2026-01".into()));
        assert_eq!(first, Some("2026-01".into()));
        let second = dedup_label(&mut labeled, Some("2026-01".into()));
        assert_eq!(second, Some("2026-01-2".into()));

        let mut unlabeled = std::collections::HashSet::new();
        assert_eq!(dedup_label(&mut unlabeled, None), None);
        assert_eq!(dedup_label(&mut unlabeled, None), Some("bundle-2".into()));
    }
}