use std::path::{Path, PathBuf};
use serde::Deserialize;
#[cfg(feature = "documents")]
use crate::extract::SCHEMA_VER;
use crate::extract::{FileMapL1, FileMapL2};
use crate::store::StoreError;
/// Minimal deserialization target used to read only the `schema_ver` field of
/// a MessagePack-encoded blob, without decoding it into its full type.
///
/// With derived `Deserialize` (no `deny_unknown_fields`), any other fields in
/// the encoded map are skipped.
#[derive(Deserialize)]
struct BlobSchemaPeek {
    /// Schema version recorded in the blob by its writer.
    schema_ver: u16,
}
/// Reads the whole file at `path`, treating a missing file as `Ok(None)`.
///
/// # Errors
/// Any I/O error other than [`std::io::ErrorKind::NotFound`] is returned as
/// [`StoreError::Io`] carrying `path`.
pub(crate) fn read_if_exists(path: &Path) -> Result<Option<Vec<u8>>, StoreError> {
    std::fs::read(path).map(Some).or_else(|err| {
        if err.kind() == std::io::ErrorKind::NotFound {
            Ok(None)
        } else {
            Err(StoreError::Io {
                path: path.to_path_buf(),
                source: err,
            })
        }
    })
}
/// Splits a framed blob into its `(l1, l2)` sections.
///
/// The frame layout is a 4-byte little-endian `u32` giving the L1 length,
/// followed by exactly that many L1 bytes; everything after that is L2
/// (possibly empty).
///
/// Returns `None` when the input is shorter than the 4-byte prefix or when
/// the declared L1 length runs past the end of the buffer.
fn frame_slices(bytes: &[u8]) -> Option<(&[u8], &[u8])> {
    if bytes.len() < 4 {
        return None;
    }
    let (prefix, body) = bytes.split_at(4);
    let declared_len = u32::from_le_bytes(prefix.try_into().ok()?) as usize;
    if declared_len > body.len() {
        return None;
    }
    Some(body.split_at(declared_len))
}
/// Encodes a file map into the framed on-disk format.
///
/// Layout: 4-byte little-endian `u32` length of the L1 encoding, then the L1
/// encoding, then the L2 encoding (empty when `l2` is `None`). Both maps are
/// serialized with `rmp_serde::to_vec_named`, i.e. MessagePack with field
/// names.
///
/// # Errors
/// Propagates serialization failures (L1 is encoded first, then L2), and
/// returns [`StoreError::BlobTooLarge`] if the L1 encoding does not fit in a
/// `u32` length prefix.
pub(crate) fn frame_filemap(l1: &FileMapL1, l2: Option<&FileMapL2>) -> Result<Vec<u8>, StoreError> {
    let head = rmp_serde::to_vec_named(l1)?;
    let tail = l2
        .map(rmp_serde::to_vec_named)
        .transpose()?
        .unwrap_or_default();

    let head_len = u32::try_from(head.len()).map_err(|_| StoreError::BlobTooLarge)?;
    let prefix = head_len.to_le_bytes();

    // `concat` allocates the exact total size once.
    Ok([&prefix[..], &head[..], &tail[..]].concat())
}
/// Decodes the L1 section of a framed file-map blob read from `path`.
///
/// # Errors
/// Returns [`StoreError::CorruptBlob`] (naming `path`) if the frame header is
/// missing or inconsistent. MessagePack decode failures are converted into
/// `StoreError`.
pub(crate) fn parse_filemap_l1(path: &Path, bytes: &[u8]) -> Result<FileMapL1, StoreError> {
    match frame_slices(bytes) {
        Some((l1_section, _)) => rmp_serde::from_slice(l1_section).map_err(StoreError::from),
        None => Err(StoreError::CorruptBlob {
            path: path.to_path_buf(),
        }),
    }
}
/// Decodes the optional L2 section of a framed file-map blob read from `path`.
///
/// An empty L2 section (everything after L1) means no L2 map was stored and
/// yields `Ok(None)`.
///
/// # Errors
/// Returns [`StoreError::CorruptBlob`] (naming `path`) if the frame header is
/// missing or inconsistent. MessagePack decode failures are converted into
/// `StoreError`.
pub(crate) fn parse_filemap_l2(path: &Path, bytes: &[u8]) -> Result<Option<FileMapL2>, StoreError> {
    match frame_slices(bytes) {
        None => Err(StoreError::CorruptBlob {
            path: path.to_path_buf(),
        }),
        Some((_, l2_section)) if l2_section.is_empty() => Ok(None),
        Some((_, l2_section)) => rmp_serde::from_slice(l2_section)
            .map(Some)
            .map_err(StoreError::from),
    }
}
/// Best-effort read of the schema version stored in a framed file-map blob.
///
/// Reads the whole file at `path`, locates the L1 section through the length
/// prefix, and decodes only its `schema_ver` field. Any failure (unreadable
/// file, malformed frame, undecodable L1) yields `None`.
pub(crate) fn peek_filemap_schema(path: &Path) -> Option<u16> {
    std::fs::read(path)
        .ok()
        .and_then(|contents| {
            let (l1_section, _) = frame_slices(&contents)?;
            rmp_serde::from_slice::<BlobSchemaPeek>(l1_section).ok()
        })
        .map(|peek| peek.schema_ver)
}
#[cfg(feature = "documents")]
/// Best-effort read of the schema version of an unframed MessagePack blob.
///
/// The whole file at `path` is decoded as a single value, and only its
/// `schema_ver` field is kept. Any read or decode failure yields `None`.
fn peek_blob_schema(path: &Path) -> Option<u16> {
    let raw = std::fs::read(path).ok()?;
    let peek: BlobSchemaPeek = rmp_serde::from_slice(&raw).ok()?;
    Some(peek.schema_ver)
}
thread_local! {
    /// Per-thread suffix for temporary file names, of the form
    /// `<pid>.<ThreadId debug repr>.tmp`.
    ///
    /// Combining the process id with the thread id keeps temp files written
    /// by different processes or threads from colliding.
    static TMP_SUFFIX: String = {
        let pid = std::process::id();
        let tid = std::thread::current().id();
        format!("{pid}.{tid:?}.tmp")
    };
}
/// Writes `bytes` to `path` by first writing a sibling temp file and then
/// renaming it over the destination, so that (where rename is atomic, e.g.
/// within one POSIX filesystem) readers never see a partially written `path`.
///
/// The temp file is `path` with its extension replaced by
/// `msgpack.<TMP_SUFFIX>`. Because the suffix includes the pid and thread id,
/// concurrent writers in different processes or threads do not share a temp
/// file.
///
/// # Errors
/// Returns [`StoreError::Io`] naming the temp file if it cannot be created or
/// written, or naming `path` if the rename fails. In every error case the temp
/// file is removed on a best-effort basis.
///
/// No `fsync` is issued, so durability across an OS crash or power loss is not
/// guaranteed.
pub(crate) fn write_bytes_atomic(path: PathBuf, bytes: &[u8]) -> Result<(), StoreError> {
    use std::fs::OpenOptions;
    use std::io::Write;

    let tmp = TMP_SUFFIX.with(|suffix| path.with_extension(format!("msgpack.{suffix}")));

    // The file handle is consumed by the closure and dropped before any
    // cleanup or rename below (removing an open file fails on some platforms).
    let written = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&tmp)
        .and_then(|mut f| f.write_all(bytes));
    if let Err(source) = written {
        // Previously a failed write leaked the partially written temp file.
        let _ = std::fs::remove_file(&tmp);
        return Err(StoreError::Io { path: tmp, source });
    }

    if let Err(source) = std::fs::rename(&tmp, &path) {
        let _ = std::fs::remove_file(&tmp);
        return Err(StoreError::Io { path, source });
    }
    Ok(())
}
#[cfg(feature = "documents")]
/// Serializes `value` as MessagePack with field names and writes it to `path`
/// via [`write_bytes_atomic`], unless a blob already at `path` decodes with a
/// `schema_ver` equal to [`SCHEMA_VER`].
///
/// The skip check compares only schema versions, not contents.
/// NOTE(review): this presumably relies on `path` being content-derived, so the
/// same path implies the same value. Confirm with callers.
///
/// # Errors
/// Propagates serialization errors and any error from [`write_bytes_atomic`].
pub(crate) fn write_blob<T: serde::Serialize>(path: PathBuf, value: &T) -> Result<(), StoreError> {
    let already_current = path.exists() && peek_blob_schema(&path) == Some(SCHEMA_VER);
    if already_current {
        return Ok(());
    }
    write_bytes_atomic(path, &rmp_serde::to_vec_named(value)?)
}