gflow 0.4.15

A lightweight, single-node job scheduler written in Rust.
Documentation
//! State serialization module supporting multiple formats
//!
//! This module provides serialization and deserialization for scheduler state
//! with support for both JSON (legacy) and MessagePack (binary) formats.
//!
//! The binary format provides:
//! - 50-70% smaller file size compared to JSON
//! - 2-5x faster serialization/deserialization
//! - Support for both JSON (legacy) and MessagePack (binary) state files

use anyhow::{Context, Result};
use gflow::core::scheduler::Scheduler;
use std::path::Path;

fn msgpack_header_hint(bytes: &[u8]) -> Option<String> {
    let b0 = *bytes.first()?;

    // fixmap
    if (0x80..=0x8f).contains(&b0) {
        return Some(format!("fixmap({})", (b0 & 0x0f)));
    }
    // fixarray
    if (0x90..=0x9f).contains(&b0) {
        return Some(format!("fixarray({})", (b0 & 0x0f)));
    }

    match b0 {
        // array 16
        0xdc if bytes.len() >= 3 => {
            let len = u16::from_be_bytes([bytes[1], bytes[2]]);
            Some(format!("array16({})", len))
        }
        // array 32
        0xdd if bytes.len() >= 5 => {
            let len = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
            Some(format!("array32({})", len))
        }
        // map 16
        0xde if bytes.len() >= 3 => {
            let len = u16::from_be_bytes([bytes[1], bytes[2]]);
            Some(format!("map16({})", len))
        }
        // map 32
        0xdf if bytes.len() >= 5 => {
            let len = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
            Some(format!("map32({})", len))
        }
        _ => Some(format!("0x{:02x}", b0)),
    }
}

/// Serialization format for state persistence
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializationFormat {
    /// JSON format (legacy, human-readable)
    Json,
    /// MessagePack format (binary, compact)
    MessagePack,
}

impl SerializationFormat {
    /// Get the file extension for this format
    pub fn extension(&self) -> &'static str {
        match self {
            SerializationFormat::Json => "json",
            SerializationFormat::MessagePack => "msgpack",
        }
    }

    /// Detect format from file extension
    #[allow(dead_code)]
    pub fn from_path(path: &Path) -> Option<Self> {
        path.extension()
            .and_then(|ext| ext.to_str())
            .and_then(|ext| match ext {
                "json" => Some(SerializationFormat::Json),
                "msgpack" => Some(SerializationFormat::MessagePack),
                _ => None,
            })
    }
}

/// Serialize scheduler state to bytes
pub fn serialize(scheduler: &Scheduler, format: SerializationFormat) -> Result<Vec<u8>> {
    match format {
        SerializationFormat::Json => {
            let json = serde_json::to_string(scheduler)
                .context("Failed to serialize scheduler to JSON")?;
            Ok(json.into_bytes())
        }
        SerializationFormat::MessagePack => {
            // Use named fields (map) so `#[serde(skip_serializing)]` fields don't break the
            // positional encoding used by the default tuple/seq representation.
            rmp_serde::to_vec_named(scheduler)
                .context("Failed to serialize scheduler to MessagePack")
        }
    }
}

/// Deserialize scheduler state from bytes
pub fn deserialize(bytes: &[u8], format: SerializationFormat) -> Result<Scheduler> {
    let scheduler: Scheduler = match format {
        SerializationFormat::Json => {
            let json = std::str::from_utf8(bytes).context("Invalid UTF-8 in JSON file")?;
            serde_json::from_str(json).context("Failed to deserialize scheduler from JSON")?
        }
        SerializationFormat::MessagePack => rmp_serde::from_slice(bytes)
            .context("Failed to deserialize scheduler from MessagePack")?,
    };

    Ok(scheduler)
}

/// Load scheduler state with automatic format detection and fallback
///
/// This function tries to load state in the following order:
/// 1. MessagePack format (state.msgpack)
/// 2. JSON format (state.json)
///
/// Returns Ok(Some(scheduler)) if state was loaded successfully,
/// Ok(None) if no state file exists,
/// Err if state file exists but couldn't be loaded.
pub fn load_state_auto(state_dir: &Path) -> Result<Option<Scheduler>> {
    let msgpack_path = state_dir.join("state.msgpack");
    let json_path = state_dir.join("state.json");

    // Try MessagePack first (preferred format)
    if msgpack_path.exists() {
        tracing::debug!("Loading state from MessagePack: {}", msgpack_path.display());
        let bytes = std::fs::read(&msgpack_path)
            .context(format!("Failed to read {}", msgpack_path.display()))?;
        let hint = msgpack_header_hint(&bytes).unwrap_or_else(|| "empty".to_string());
        tracing::debug!(
            "State msgpack metadata: {} bytes, header={}",
            bytes.len(),
            hint
        );
        let scheduler = deserialize(&bytes, SerializationFormat::MessagePack).context(format!(
            "Failed to deserialize {} ({} bytes, header={})",
            msgpack_path.display(),
            bytes.len(),
            hint
        ))?;
        return Ok(Some(scheduler));
    }

    // Fallback to JSON
    if json_path.exists() {
        tracing::debug!("Loading state from JSON: {}", json_path.display());
        let bytes =
            std::fs::read(&json_path).context(format!("Failed to read {}", json_path.display()))?;
        let scheduler = deserialize(&bytes, SerializationFormat::Json)
            .context(format!("Failed to deserialize {}", json_path.display()))?;

        return Ok(Some(scheduler));
    }

    // No state file found
    Ok(None)
}

/// Save scheduler state to disk
///
/// Uses atomic write (write to temp file, then rename) to prevent corruption.
pub fn save_state(
    scheduler: &Scheduler,
    state_dir: &Path,
    format: SerializationFormat,
) -> Result<()> {
    let filename = format!("state.{}", format.extension());
    let path = state_dir.join(&filename);
    let tmp_path = state_dir.join(format!("{}.tmp", filename));

    // Ensure parent directory exists
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .context(format!("Failed to create directory {}", parent.display()))?;
    }

    // Serialize to bytes
    let bytes = serialize(scheduler, format)?;

    // Write to temporary file
    std::fs::write(&tmp_path, &bytes)
        .context(format!("Failed to write to {}", tmp_path.display()))?;

    // Atomic rename
    std::fs::rename(&tmp_path, &path).context(format!(
        "Failed to rename {} to {}",
        tmp_path.display(),
        path.display()
    ))?;

    tracing::debug!(
        "Saved state to {} ({} bytes, {} format)",
        path.display(),
        bytes.len(),
        match format {
            SerializationFormat::Json => "JSON",
            SerializationFormat::MessagePack => "MessagePack",
        }
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use gflow::core::scheduler::SchedulerBuilder;

    #[test]
    fn test_serialize_deserialize_json() {
        let scheduler = SchedulerBuilder::new().build();
        let bytes = serialize(&scheduler, SerializationFormat::Json).unwrap();
        let loaded = deserialize(&bytes, SerializationFormat::Json).unwrap();
        assert_eq!(scheduler.next_job_id(), loaded.next_job_id());
    }

    #[test]
    fn test_serialize_deserialize_msgpack() {
        let scheduler = SchedulerBuilder::new().build();
        let bytes = serialize(&scheduler, SerializationFormat::MessagePack).unwrap();
        let loaded = deserialize(&bytes, SerializationFormat::MessagePack).unwrap();
        assert_eq!(scheduler.next_job_id(), loaded.next_job_id());
    }

    #[test]
    fn test_msgpack_smaller_than_json() {
        let scheduler = SchedulerBuilder::new().build();
        let json_bytes = serialize(&scheduler, SerializationFormat::Json).unwrap();
        let msgpack_bytes = serialize(&scheduler, SerializationFormat::MessagePack).unwrap();

        println!("JSON size: {} bytes", json_bytes.len());
        println!("MessagePack size: {} bytes", msgpack_bytes.len());

        // MessagePack should be significantly smaller
        assert!(msgpack_bytes.len() < json_bytes.len());
    }
}