use anyhow::{Context, Result};
use gflow::core::scheduler::Scheduler;
use std::path::Path;
fn msgpack_header_hint(bytes: &[u8]) -> Option<String> {
let b0 = *bytes.first()?;
if (0x80..=0x8f).contains(&b0) {
return Some(format!("fixmap({})", (b0 & 0x0f)));
}
if (0x90..=0x9f).contains(&b0) {
return Some(format!("fixarray({})", (b0 & 0x0f)));
}
match b0 {
0xdc if bytes.len() >= 3 => {
let len = u16::from_be_bytes([bytes[1], bytes[2]]);
Some(format!("array16({})", len))
}
0xdd if bytes.len() >= 5 => {
let len = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
Some(format!("array32({})", len))
}
0xde if bytes.len() >= 3 => {
let len = u16::from_be_bytes([bytes[1], bytes[2]]);
Some(format!("map16({})", len))
}
0xdf if bytes.len() >= 5 => {
let len = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
Some(format!("map32({})", len))
}
_ => Some(format!("0x{:02x}", b0)),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializationFormat {
Json,
MessagePack,
}
impl SerializationFormat {
pub fn extension(&self) -> &'static str {
match self {
SerializationFormat::Json => "json",
SerializationFormat::MessagePack => "msgpack",
}
}
#[allow(dead_code)]
pub fn from_path(path: &Path) -> Option<Self> {
path.extension()
.and_then(|ext| ext.to_str())
.and_then(|ext| match ext {
"json" => Some(SerializationFormat::Json),
"msgpack" => Some(SerializationFormat::MessagePack),
_ => None,
})
}
}
pub fn serialize(scheduler: &Scheduler, format: SerializationFormat) -> Result<Vec<u8>> {
match format {
SerializationFormat::Json => {
let json = serde_json::to_string(scheduler)
.context("Failed to serialize scheduler to JSON")?;
Ok(json.into_bytes())
}
SerializationFormat::MessagePack => {
rmp_serde::to_vec_named(scheduler)
.context("Failed to serialize scheduler to MessagePack")
}
}
}
pub fn deserialize(bytes: &[u8], format: SerializationFormat) -> Result<Scheduler> {
let scheduler: Scheduler = match format {
SerializationFormat::Json => {
let json = std::str::from_utf8(bytes).context("Invalid UTF-8 in JSON file")?;
serde_json::from_str(json).context("Failed to deserialize scheduler from JSON")?
}
SerializationFormat::MessagePack => rmp_serde::from_slice(bytes)
.context("Failed to deserialize scheduler from MessagePack")?,
};
Ok(scheduler)
}
pub fn load_state_auto(state_dir: &Path) -> Result<Option<Scheduler>> {
let msgpack_path = state_dir.join("state.msgpack");
let json_path = state_dir.join("state.json");
if msgpack_path.exists() {
tracing::debug!("Loading state from MessagePack: {}", msgpack_path.display());
let bytes = std::fs::read(&msgpack_path)
.context(format!("Failed to read {}", msgpack_path.display()))?;
let hint = msgpack_header_hint(&bytes).unwrap_or_else(|| "empty".to_string());
tracing::debug!(
"State msgpack metadata: {} bytes, header={}",
bytes.len(),
hint
);
let scheduler = deserialize(&bytes, SerializationFormat::MessagePack).context(format!(
"Failed to deserialize {} ({} bytes, header={})",
msgpack_path.display(),
bytes.len(),
hint
))?;
return Ok(Some(scheduler));
}
if json_path.exists() {
tracing::debug!("Loading state from JSON: {}", json_path.display());
let bytes =
std::fs::read(&json_path).context(format!("Failed to read {}", json_path.display()))?;
let scheduler = deserialize(&bytes, SerializationFormat::Json)
.context(format!("Failed to deserialize {}", json_path.display()))?;
return Ok(Some(scheduler));
}
Ok(None)
}
pub fn save_state(
scheduler: &Scheduler,
state_dir: &Path,
format: SerializationFormat,
) -> Result<()> {
let filename = format!("state.{}", format.extension());
let path = state_dir.join(&filename);
let tmp_path = state_dir.join(format!("{}.tmp", filename));
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.context(format!("Failed to create directory {}", parent.display()))?;
}
let bytes = serialize(scheduler, format)?;
std::fs::write(&tmp_path, &bytes)
.context(format!("Failed to write to {}", tmp_path.display()))?;
std::fs::rename(&tmp_path, &path).context(format!(
"Failed to rename {} to {}",
tmp_path.display(),
path.display()
))?;
tracing::debug!(
"Saved state to {} ({} bytes, {} format)",
path.display(),
bytes.len(),
match format {
SerializationFormat::Json => "JSON",
SerializationFormat::MessagePack => "MessagePack",
}
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use gflow::core::scheduler::SchedulerBuilder;
#[test]
fn test_serialize_deserialize_json() {
let scheduler = SchedulerBuilder::new().build();
let bytes = serialize(&scheduler, SerializationFormat::Json).unwrap();
let loaded = deserialize(&bytes, SerializationFormat::Json).unwrap();
assert_eq!(scheduler.next_job_id(), loaded.next_job_id());
}
#[test]
fn test_serialize_deserialize_msgpack() {
let scheduler = SchedulerBuilder::new().build();
let bytes = serialize(&scheduler, SerializationFormat::MessagePack).unwrap();
let loaded = deserialize(&bytes, SerializationFormat::MessagePack).unwrap();
assert_eq!(scheduler.next_job_id(), loaded.next_job_id());
}
#[test]
fn test_msgpack_smaller_than_json() {
let scheduler = SchedulerBuilder::new().build();
let json_bytes = serialize(&scheduler, SerializationFormat::Json).unwrap();
let msgpack_bytes = serialize(&scheduler, SerializationFormat::MessagePack).unwrap();
println!("JSON size: {} bytes", json_bytes.len());
println!("MessagePack size: {} bytes", msgpack_bytes.len());
assert!(msgpack_bytes.len() < json_bytes.len());
}
}