//! netsky-core 0.2.0
//!
//! netsky core: agent model, prompt loader, spawner, config.
//! This module: JSONL durability helpers — lock-guarded appends,
//! crash-tolerant reads, and atomic file replacement.
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

use fs4::fs_std::FileExt;
use serde::Serialize;
use serde_json::Value;

/// Append exactly one serialized JSON value plus a newline to a JSONL file.
///
/// The append is guarded by an exclusive cross-process file lock, uses
/// `O_APPEND`, and calls `sync_all` before releasing the lock.
pub fn append_json_line<T: Serialize>(path: impl AsRef<Path>, value: &T) -> io::Result<()> {
    let line = serde_json::to_string(value).map_err(io::Error::other)?;
    append_serialized_json_line(path, &line)
}

/// Append one pre-serialized JSON value plus a newline to a JSONL file.
///
/// The caller owns JSON serialization. This function rejects embedded
/// newlines so one call cannot create more than one JSONL record.
/// Append one pre-serialized JSON value plus a newline to a JSONL file.
///
/// The caller owns JSON serialization. Embedded newlines are rejected so a
/// single call can never produce more than one JSONL record. The write is
/// performed under an exclusive cross-process file lock, through `O_APPEND`,
/// and fsynced before the lock is released.
pub fn append_serialized_json_line(path: impl AsRef<Path>, line: &str) -> io::Result<()> {
    // Enforce the one-call/one-record invariant up front.
    if line.contains(['\n', '\r']) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "jsonl line contains newline",
        ));
    }

    let path = path.as_ref();
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir)?;
    }

    let mut file = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)?;
    file.lock_exclusive()?;

    // Run the guarded work in a closure so the unlock below executes on both
    // the success and the failure path.
    let write_result = (|| -> io::Result<()> {
        writeln!(file, "{line}")?;
        file.sync_all()
    })();
    let unlock_result = file.unlock();
    // Report the write/sync error first; otherwise surface any unlock error.
    write_result.and(unlock_result)
}

/// Read JSONL records, tolerating one malformed crash-tail record.
///
/// A malformed final line is skipped only when the file does not end in a
/// newline. Malformed middle lines are yielded as parse errors.
/// Read JSONL records, tolerating one malformed crash-tail record.
///
/// A malformed final line is skipped only when the file does not end in a
/// newline (the signature of a torn append). Malformed middle lines are
/// yielded as parse errors so callers can decide how to react.
pub fn read_records(
    path: impl AsRef<Path>,
) -> io::Result<impl Iterator<Item = serde_json::Result<Value>>> {
    let path = path.as_ref();
    let contents = fs::read_to_string(path)?;
    let records = parse_records(path, &contents);
    Ok(records.into_iter())
}

/// Atomically replace a file with fsynced contents.
///
/// The data is written to a sibling temp file, fsynced, renamed over the
/// destination, then the parent directory is fsynced.
/// Atomically replace a file with fsynced contents.
///
/// The data is written to a sibling temp file, fsynced, renamed over the
/// destination, then the parent directory is fsynced so the rename itself is
/// durable. If writing or renaming fails, the temp file is removed
/// (best-effort) so failures do not leave stray `*.tmp.*` files behind.
///
/// # Errors
///
/// Returns any I/O error from directory creation, the temp-file write/sync,
/// the rename, or the parent-directory fsync.
pub fn write_file_atomic(path: impl AsRef<Path>, body: &str) -> io::Result<()> {
    let path = path.as_ref();
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    // Temp file lives next to the destination (same filesystem) so the
    // rename below is atomic; the pid suffix avoids cross-process collisions.
    let tmp = path.with_extension(format!(
        "{}.tmp.{}",
        path.extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("json"),
        std::process::id()
    ));
    let result = (|| {
        {
            let mut file = fs::OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&tmp)?;
            file.write_all(body.as_bytes())?;
            // Data must reach disk before the rename makes it visible.
            file.sync_all()?;
        }
        fs::rename(&tmp, path)
    })();
    if result.is_err() {
        // Best-effort cleanup: a failed write/rename must not leak the temp
        // file. Ignore removal errors so the original error is preserved.
        let _ = fs::remove_file(&tmp);
        return result;
    }
    if let Some(parent) = path.parent() {
        // Fsync the directory so the rename (directory entry) is durable too.
        let dir = File::open(parent)?;
        dir.sync_all()?;
    }
    Ok(())
}

/// Parse a JSONL body into per-record results.
///
/// A malformed record on the last line is silently skipped (with a debug log)
/// when the body does not end in a newline — that shape indicates a torn
/// append from a crash. Any other malformed record is kept as an `Err` entry.
fn parse_records(path: &Path, body: &str) -> Vec<serde_json::Result<Value>> {
    // A non-empty body without a trailing newline marks a possible torn tail.
    let truncated_tail = !body.is_empty() && !body.ends_with('\n');
    let total_lines = body.lines().count();
    let mut records = Vec::new();

    for (index, raw) in body.lines().enumerate() {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            continue;
        }
        let on_last_line = index + 1 == total_lines;
        match serde_json::from_str(trimmed) {
            Ok(value) => records.push(Ok(value)),
            // Only the final, newline-less line may be dropped as a crash tail.
            Err(error) if truncated_tail && on_last_line => {
                tracing::debug!(
                    path = %path.display(),
                    line = index + 1,
                    error = %error,
                    "skipping malformed trailing jsonl record"
                );
            }
            Err(error) => records.push(Err(error)),
        }
    }

    records
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Command;
    use std::process::Stdio;

    // Record written by the concurrent-writer test; fields let the reader
    // side verify which worker produced each line.
    #[derive(Serialize)]
    struct TestRecord {
        worker: usize,
        seq: usize,
        payload: String,
    }

    // Cross-process interleaving test. The test binary re-executes itself:
    // when NETSKY_JSONL_TEST_PATH is set, the process acts as a child writer
    // and appends 25 records; otherwise it acts as the parent, spawning 16
    // child writers and then validating the combined file. This exercises the
    // fs4 exclusive lock across real OS processes, not just threads.
    #[test]
    fn concurrent_process_writers_produce_valid_jsonl() {
        // Child-writer branch: triggered via env vars set by the parent below.
        if let Ok(path) = std::env::var("NETSKY_JSONL_TEST_PATH") {
            let worker = std::env::var("NETSKY_JSONL_TEST_WORKER")
                .unwrap()
                .parse::<usize>()
                .unwrap();
            for seq in 0..25 {
                append_json_line(
                    &path,
                    &TestRecord {
                        worker,
                        seq,
                        payload: format!("worker-{worker}-seq-{seq}"),
                    },
                )
                .unwrap();
            }
            return;
        }

        // Parent branch: spawn 16 copies of this same test as child processes.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        let exe = std::env::current_exe().unwrap();
        let mut children = Vec::new();

        for worker in 0..16 {
            children.push(
                Command::new(&exe)
                    // NOTE: this filter string must match this test's full
                    // module path, or the children run nothing and the
                    // line-count assertion below fails.
                    .arg("--exact")
                    .arg("jsonl::tests::concurrent_process_writers_produce_valid_jsonl")
                    .arg("--quiet")
                    .env("NETSKY_JSONL_TEST_PATH", &path)
                    .env("NETSKY_JSONL_TEST_WORKER", worker.to_string())
                    .stdout(Stdio::null())
                    .stderr(Stdio::null())
                    .spawn()
                    .unwrap(),
            );
        }

        for mut child in children {
            let status = child.wait().unwrap();
            assert!(status.success(), "child writer exited {status}");
        }

        // Every line must be intact JSON: no torn or interleaved writes.
        let body = fs::read_to_string(path).unwrap();
        let lines: Vec<_> = body.lines().collect();
        assert_eq!(lines.len(), 16 * 25);
        for line in lines {
            let value: serde_json::Value = serde_json::from_str(line).unwrap();
            assert!(value["worker"].is_number());
            assert!(value["seq"].is_number());
            assert!(value["payload"].is_string());
        }
    }

    // Crash-tail tolerance: a malformed final record with no trailing
    // newline is dropped silently instead of surfacing a parse error.
    #[test]
    fn read_records_skips_one_malformed_trailing_record_without_newline() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        fs::write(
            &path,
            concat!("{\"seq\":1}\n", "{\"seq\":2}\n", "{\"seq\":"),
        )
        .unwrap();

        let records = read_records(&path)
            .unwrap()
            .collect::<serde_json::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["seq"], 1);
        assert_eq!(records[1]["seq"], 2);
    }

    // A *valid* final record without a trailing newline must still be
    // returned — only malformed tails are skipped.
    #[test]
    fn read_records_yields_valid_final_record_without_newline() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        fs::write(&path, "{\"seq\":1}\n{\"seq\":2}").unwrap();

        let records = read_records(&path)
            .unwrap()
            .collect::<serde_json::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[1]["seq"], 2);
    }

    // Malformed records in the *middle* of the file are not crash tails and
    // must be yielded as Err entries, not skipped.
    #[test]
    fn read_records_keeps_middle_parse_errors() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        fs::write(&path, "{\"seq\":1}\n{\"seq\":\n{\"seq\":2}\n").unwrap();

        let mut records = read_records(&path).unwrap();
        assert!(records.next().unwrap().is_ok());
        assert!(records.next().unwrap().is_err());
        assert!(records.next().unwrap().is_ok());
        assert!(records.next().is_none());
    }
}