use crate::error::{Error, Result};
use byteorder::{ReadBytesExt, WriteBytesExt};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::Path;
#[derive(Debug, Clone)]
pub(crate) struct StreamState {
pub next_id: u64,
pub active_segment_start_id: u64,
}
#[derive(Debug, Clone)]
pub struct StreamStateFile {
pub active_segment_name: String,
pub version: u8, }
impl StreamStateFile {
const FILENAME: &'static str = "head.state";
pub const VERSION: u8 = 1;
pub fn read_from(stream_dir: &Path) -> Result<Option<Self>> {
let path = stream_dir.join(Self::FILENAME);
if !path.exists() {
return Ok(None);
}
let file = File::open(path)?;
let mut reader = BufReader::new(file);
let version = reader.read_u8()?;
if version != Self::VERSION {
return Err(Error::Corruption(format!(
"Unsupported head.state version: expected {}, got {}",
Self::VERSION,
version
)));
}
let name_len = reader.read_u8()? as usize;
if name_len == 0 || name_len > 255 {
return Err(Error::Corruption("Invalid filename length in head.state".into()));
}
let mut name_buf = vec![0u8; name_len];
reader.read_exact(&mut name_buf)?;
let active_segment_name = String::from_utf8(name_buf)
.map_err(|e| Error::Corruption(format!("Invalid UTF-8 in head.state filename: {}", e)))?;
Ok(Some(Self {
active_segment_name,
version: version,
}))
}
pub fn write_to(&self, stream_dir: &Path) -> Result<()> {
let temp_path = stream_dir.join(format!("{}.tmp", Self::FILENAME));
let final_path = stream_dir.join(Self::FILENAME);
let file = File::create(&temp_path)?;
let mut writer = BufWriter::new(file);
writer.write_u8(Self::VERSION)?;
let name_bytes = self.active_segment_name.as_bytes();
if name_bytes.len() > 255 {
return Err(Error::Serialization(
"Segment filename is too long for state file".into(),
));
}
writer.write_u8(name_bytes.len() as u8)?;
writer.write_all(name_bytes)?;
writer.flush()?;
writer.get_ref().sync_all()?;
fs::rename(&temp_path, &final_path)?;
let parent_dir = File::open(stream_dir)?;
parent_dir.sync_all()?;
Ok(())
}
}