use std::{str::from_utf8, sync::Arc};
use bytes::Bytes;
use zstd::zstd_safe::get_frame_content_size;
use crate::{
storage::{ObjectMeta, StorageError, StorageProvider},
supertable::{
ManifestLoadError, ManifestSnapshot,
error::CommitError,
manifest::{
list::{self as list_mod, Manifest as PersistedManifest},
part::{
self as part_mod, BLAKE3_DIGEST_BYTES, BLAKE3_HEX_LEN, ContentHash, ManifestPart,
PartId,
},
},
},
};
/// Storage key of the pointer file; `read_pointer` and `write_pointer` use this path.
pub const POINTER_PATH: &str = "_supertable/current";
/// Directory prefix under which `manifest_uri` places manifest list objects.
pub const MANIFEST_DIR: &str = "manifest";
/// Directory prefix under which `part_uri` places manifest part objects.
pub const MANIFEST_PARTS_DIR: &str = "manifest-parts";
/// Builds the storage key of the manifest list object for `manifest_id`.
///
/// The id is zero-padded to a minimum width of six digits; ids that need more
/// digits are rendered in full (the padding is a minimum, not a limit).
pub fn manifest_uri(manifest_id: u64) -> String {
    let file_name = format!("manifest-{manifest_id:06}.json");
    [MANIFEST_DIR, file_name.as_str()].join("/")
}
/// Builds the storage key of a manifest part from its content hash.
///
/// The key is `<MANIFEST_PARTS_DIR>/part-<hex of content_hash>.avro.zst`, so the
/// same hash always maps to the same key.
pub fn part_uri(content_hash: &ContentHash) -> String {
    let digest_hex = content_hash.to_hex();
    format!("{MANIFEST_PARTS_DIR}/part-{digest_hex}.avro.zst")
}
/// In-memory form of the pointer file stored at `POINTER_PATH`.
///
/// Serialized by `PointerFile::to_bytes` as `key=value` lines and parsed back by
/// `PointerFile::from_bytes`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PointerFile {
// Numeric id of the manifest the pointer refers to.
pub manifest_id: u64,
// Storage key of that manifest, stored verbatim as text.
pub manifest_uri: String,
// Hash written to / read from the `content_hash=blake3:<hex>` line.
// NOTE(review): presumably the hash of the manifest object's bytes — confirm with the commit path.
pub content_hash: ContentHash,
}
impl PointerFile {
    /// Serializes the pointer as three newline-terminated `key=value` lines:
    /// `manifest_id`, `manifest_uri` and `content_hash` (the latter prefixed with
    /// `blake3:` and rendered as hex).
    pub fn to_bytes(&self) -> Vec<u8> {
        format!(
            "manifest_id={}\nmanifest_uri={}\ncontent_hash=blake3:{}\n",
            self.manifest_id,
            self.manifest_uri,
            self.content_hash.to_hex(),
        )
        .into_bytes()
    }

    /// Parses the text format produced by [`PointerFile::to_bytes`].
    ///
    /// Empty lines are skipped and unknown keys are ignored. If a key appears more
    /// than once, the last occurrence wins.
    ///
    /// # Errors
    ///
    /// Returns [`ManifestLoadError::PointerParse`] when the input is not UTF-8, a
    /// non-empty line has no `=`, `manifest_id` is not a `u64`, `content_hash` is
    /// not `blake3:` followed by exactly `BLAKE3_HEX_LEN` hex digits, or any of the
    /// three required keys is missing.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, ManifestLoadError> {
        let s = from_utf8(bytes)
            .map_err(|e| ManifestLoadError::PointerParse(format!("not utf-8: {e}")))?;
        let mut manifest_id: Option<u64> = None;
        let mut manifest_uri: Option<String> = None;
        let mut content_hash: Option<ContentHash> = None;
        for line in s.lines() {
            if line.is_empty() {
                continue;
            }
            let (key, value) = line.split_once('=').ok_or_else(|| {
                ManifestLoadError::PointerParse(format!("no '=' in line: {line:?}"))
            })?;
            match key {
                "manifest_id" => {
                    manifest_id = Some(value.parse::<u64>().map_err(|e| {
                        ManifestLoadError::PointerParse(format!("manifest_id: {e}"))
                    })?);
                }
                "manifest_uri" => manifest_uri = Some(value.to_string()),
                "content_hash" => content_hash = Some(Self::parse_content_hash(value)?),
                // Unknown keys are tolerated so that additional fields do not break parsing.
                _ => {}
            }
        }
        Ok(Self {
            manifest_id: manifest_id
                .ok_or_else(|| ManifestLoadError::PointerParse("missing manifest_id".into()))?,
            manifest_uri: manifest_uri
                .ok_or_else(|| ManifestLoadError::PointerParse("missing manifest_uri".into()))?,
            content_hash: content_hash
                .ok_or_else(|| ManifestLoadError::PointerParse("missing content_hash".into()))?,
        })
    }

    /// Returns the id of the manifest this pointer refers to.
    pub fn get_manifest_id(&self) -> u64 {
        self.manifest_id
    }

    /// Parses the value of the `content_hash` line (`blake3:<hex>`).
    ///
    /// Decoding works on raw bytes rather than on `str` slices: the length check
    /// counts bytes, so a value containing multi-byte UTF-8 characters could
    /// otherwise be sliced off a char boundary and panic. Only `[0-9a-fA-F]` is
    /// accepted; in particular a `+` sign (which `u8::from_str_radix` tolerates)
    /// is rejected.
    fn parse_content_hash(value: &str) -> Result<ContentHash, ManifestLoadError> {
        let hex = value.strip_prefix("blake3:").ok_or_else(|| {
            ManifestLoadError::PointerParse(format!(
                "content_hash missing 'blake3:' prefix: {value}"
            ))
        })?;
        if hex.len() != BLAKE3_HEX_LEN {
            return Err(ManifestLoadError::PointerParse(format!(
                "content_hash hex must be {BLAKE3_HEX_LEN} chars; got {}",
                hex.len()
            )));
        }
        let mut digest = [0u8; BLAKE3_DIGEST_BYTES];
        for (slot, pair) in digest.iter_mut().zip(hex.as_bytes().chunks_exact(2)) {
            let (Some(hi), Some(lo)) = (Self::hex_nibble(pair[0]), Self::hex_nibble(pair[1]))
            else {
                return Err(ManifestLoadError::PointerParse(format!(
                    "content_hash hex: {hex}"
                )));
            };
            *slot = (hi << 4) | lo;
        }
        Ok(ContentHash(digest))
    }

    /// Decodes one ASCII hex digit (either case) into its 4-bit value.
    fn hex_nibble(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }
}
/// Fetches and parses the pointer file at `POINTER_PATH`.
///
/// Returns `Ok(None)` when storage reports `StorageError::NotFound`; otherwise
/// returns the parsed pointer together with the object metadata from `get`.
///
/// # Errors
///
/// Any other storage failure is wrapped in `ManifestLoadError::Storage`; a
/// malformed pointer surfaces the error from `PointerFile::from_bytes`.
pub async fn read_pointer(
    storage: &dyn StorageProvider,
) -> Result<Option<(PointerFile, ObjectMeta)>, ManifestLoadError> {
    let (raw, meta) = match storage.get(POINTER_PATH).await {
        Ok(found) => found,
        Err(StorageError::NotFound { .. }) => return Ok(None),
        Err(other) => return Err(ManifestLoadError::Storage(other)),
    };
    let pointer = PointerFile::from_bytes(&raw)?;
    Ok(Some((pointer, meta)))
}
/// Pairs a manifest part with a byte buffer holding an encoded form of it.
pub struct EncodedPart {
// The in-memory manifest part.
pub part: ManifestPart,
// Encoded bytes for `part`.
// NOTE(review): presumably the output of `part_mod::encode`, suitable for `write_part_bytes` — confirm at the construction site.
pub encoded: Vec<u8>,
}
/// Outcome of `write_manifest_part`: where the part was stored and how large it is.
#[derive(Debug, Clone)]
pub struct PartWriteResult {
// Id copied from the `ManifestPart` that was written.
pub part_id: PartId,
// Storage key the part was written to (see `part_uri`).
pub uri: String,
// Hash of the compressed bytes that were stored.
pub content_hash: ContentHash,
// Length in bytes of the compressed payload.
pub size_bytes_compressed: u64,
// Content size reported by the zstd frame header; equals the compressed size
// when the header does not provide one (see `frame_content_size`).
pub size_bytes_uncompressed: u64,
}
/// Outcome of `write_manifest`: where the manifest list was stored and its size.
#[derive(Debug, Clone)]
pub struct ManifestWriteResult {
// Storage key the manifest was written to (see `manifest_uri`).
pub uri: String,
// Hash of the encoded manifest bytes.
pub content_hash: ContentHash,
// Length in bytes of the encoded manifest.
pub size_bytes: u64,
}
/// Encodes `part` at the given zstd level and stores it under a key derived from
/// the hash of the encoded bytes.
///
/// A `StorageError::PreconditionFailed` from `put_atomic` is treated as success.
/// The key is derived from the content hash, so this presumably means an object
/// with identical bytes already exists (NOTE(review): confirm `put_atomic`
/// semantics).
///
/// # Errors
///
/// Any other storage error is converted into `CommitError`.
pub async fn write_manifest_part(
    storage: &dyn StorageProvider,
    part: &ManifestPart,
    zstd_level: i32,
) -> Result<PartWriteResult, CommitError> {
    let payload = part_mod::encode(part, zstd_level);
    let content_hash = ContentHash::of(&payload);
    let size_bytes_compressed = payload.len() as u64;
    // Fall back to the compressed size if the frame header carries no content size.
    let size_bytes_uncompressed = frame_content_size(&payload, size_bytes_compressed);
    let uri = part_uri(&content_hash);

    if let Err(err) = storage.put_atomic(&uri, Bytes::from(payload)).await {
        if !matches!(err, StorageError::PreconditionFailed { .. }) {
            return Err(err.into());
        }
    }

    Ok(PartWriteResult {
        part_id: part.part_id,
        uri,
        content_hash,
        size_bytes_compressed,
        size_bytes_uncompressed,
    })
}
/// Returns the content size reported by `get_frame_content_size` for `compressed`,
/// or `fallback` when that call fails or reports no size.
pub(crate) fn frame_content_size(compressed: &[u8], fallback: u64) -> u64 {
    match get_frame_content_size(compressed) {
        Ok(Some(declared)) => declared,
        Ok(None) | Err(_) => fallback,
    }
}
/// Stores already-encoded part bytes under the key derived from their hash.
///
/// Like `write_manifest_part`, a `StorageError::PreconditionFailed` from
/// `put_atomic` counts as success; every other storage error is converted into
/// `CommitError`.
pub(crate) async fn write_part_bytes(
    storage: &dyn StorageProvider,
    encoded: &[u8],
) -> Result<(), CommitError> {
    let content_hash = ContentHash::of(encoded);
    let uri = part_uri(&content_hash);
    let payload = Bytes::copy_from_slice(encoded);
    match storage.put_atomic(&uri, payload).await {
        Err(StorageError::PreconditionFailed { .. }) => Ok(()),
        Err(other) => Err(other.into()),
        Ok(_) => Ok(()),
    }
}
/// Encodes the manifest list and stores it at `manifest_uri(list.manifest_id)`.
///
/// # Errors
///
/// Encoding failures become `CommitError::Encode` carrying the error text. Every
/// storage error from `put_atomic`, including `PreconditionFailed`, is propagated
/// through the `?` conversion into `CommitError`.
pub async fn write_manifest(
    storage: &dyn StorageProvider,
    list: &PersistedManifest,
) -> Result<ManifestWriteResult, CommitError> {
    let encoded = match list_mod::encode(list) {
        Ok(json) => json,
        Err(err) => return Err(CommitError::Encode(err.to_string())),
    };
    let uri = manifest_uri(list.manifest_id);
    let content_hash = ContentHash::of(&encoded);
    let size_bytes = encoded.len() as u64;

    storage.put_atomic(&uri, Bytes::from(encoded)).await?;

    Ok(ManifestWriteResult {
        uri,
        content_hash,
        size_bytes,
    })
}
/// Writes the pointer file at `POINTER_PATH`.
///
/// With `expected_prev_etag == None` the write goes through `put_atomic`; with
/// `Some(..)` it goes through `put_if_match`, passing the expected etag along.
///
/// # Errors
///
/// `StorageError::PreconditionFailed` is reported as
/// `CommitError::WriteContentionExhausted`; any other storage error is converted
/// into `CommitError`.
pub async fn write_pointer(
    storage: &dyn StorageProvider,
    pointer: &PointerFile,
    expected_prev_etag: Option<&str>,
) -> Result<(), CommitError> {
    let payload = Bytes::from(pointer.to_bytes());
    let outcome = if expected_prev_etag.is_some() {
        storage
            .put_if_match(POINTER_PATH, payload, expected_prev_etag)
            .await
    } else {
        storage.put_atomic(POINTER_PATH, payload).await
    };
    match outcome {
        Err(StorageError::PreconditionFailed { .. }) => Err(CommitError::WriteContentionExhausted),
        Err(other) => Err(other.into()),
        Ok(_) => Ok(()),
    }
}
/// Borrows the trait object held inside a shared storage handle.
#[doc(hidden)]
pub fn as_dyn(p: &Arc<dyn StorageProvider>) -> &dyn StorageProvider {
    &**p
}
/// Rewrites a `CommitError::Storage(StorageError::PreconditionFailed { .. })` as
/// `CommitError::WriteContentionExhausted`; every other error is returned as-is.
pub(crate) fn translate_contention(e: CommitError) -> CommitError {
    if let CommitError::Storage(StorageError::PreconditionFailed { .. }) = &e {
        return CommitError::WriteContentionExhausted;
    }
    e
}
/// Returns the etag of the stored pointer file if it still refers to the manifest
/// held by `current`.
///
/// Yields `Ok(None)` when no pointer exists or when `current.list` is `None`.
/// When both exist and their manifest ids are equal, the etag from the pointer's
/// object metadata is returned (itself optional).
///
/// # Errors
///
/// A failure from `read_pointer` is reported as `CommitError::PointerParse` with
/// the error text; a manifest-id mismatch is reported as
/// `CommitError::WriteContentionExhausted`.
pub async fn get_current_manifest_etag(
    storage: &Arc<dyn StorageProvider>,
    current: Arc<ManifestSnapshot>,
) -> Result<Option<String>, CommitError> {
    let pointer_state = read_pointer(storage.as_ref())
        .await
        .map_err(|e| CommitError::PointerParse(e.to_string()))?;

    match (pointer_state, current.list.as_ref()) {
        (Some((pointer_file, meta)), Some(snapshot_list)) => {
            if pointer_file.get_manifest_id() != snapshot_list.manifest_id {
                return Err(CommitError::WriteContentionExhausted);
            }
            Ok(meta.etag)
        }
        _ => Ok(None),
    }
}
// Unit tests for the URI helpers, the pointer-file text format, contention
// translation, and the storage round-trips (against a temp-dir local provider).
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tempfile::TempDir;
use super::*;
use crate::{
storage::LocalFsStorageProvider,
supertable::manifest::list::{
FORMAT_VERSION as LIST_FORMAT_VERSION, Manifest as PersistedManifest, PartitionStrategy,
},
};
// Ids up to six digits are rendered with zero padding to exactly six digits.
#[test]
fn manifest_uri_zero_pads_to_six_digits() {
assert_eq!(manifest_uri(0), "manifest/manifest-000000.json");
assert_eq!(manifest_uri(42), "manifest/manifest-000042.json");
assert_eq!(manifest_uri(123_456), "manifest/manifest-123456.json");
}
// Ids wider than six digits are rendered in full; the padding is only a minimum width.
#[test]
fn manifest_uri_overflows_padding_for_large_ids_intentionally() {
assert_eq!(manifest_uri(1_000_000), "manifest/manifest-1000000.json");
}
// The part URI is a pure function of the content hash: same input, same URI.
#[test]
fn part_uri_uses_content_hash_hex() {
let h = ContentHash::of(b"hello manifest part");
let uri_a = part_uri(&h);
let uri_b = part_uri(&ContentHash::of(b"hello manifest part"));
assert_eq!(uri_a, uri_b);
assert!(uri_a.starts_with("manifest-parts/part-"));
assert!(uri_a.ends_with(".avro.zst"));
assert_eq!(
uri_a,
format!("manifest-parts/part-{}.avro.zst", h.to_hex())
);
}
// Fixture: pointer for manifest 7 whose hash bytes are 0, 1, 2, ..., 31.
fn sample_pointer() -> PointerFile {
let mut bytes = [0u8; 32];
for (i, b) in bytes.iter_mut().enumerate() {
*b = i as u8;
}
PointerFile {
manifest_id: 7,
manifest_uri: "manifest/manifest-000007.json".into(),
content_hash: ContentHash(bytes),
}
}
// `to_bytes` emits the three expected keys and `from_bytes` restores an equal value.
#[test]
fn pointer_file_text_roundtrip() {
let p = sample_pointer();
let bytes = p.to_bytes();
let s = from_utf8(&bytes).expect("utf-8");
assert!(s.contains("manifest_id=7"));
assert!(s.contains("manifest_uri=manifest/manifest-000007.json"));
assert!(s.contains("content_hash=blake3:"));
let parsed = PointerFile::from_bytes(&bytes).expect("parse");
assert_eq!(parsed, p);
}
// Leading and interior empty lines do not affect parsing.
#[test]
fn pointer_file_from_bytes_skips_blank_lines() {
let bytes = b"\nmanifest_id=1\n\nmanifest_uri=foo.json\ncontent_hash=blake3:0000000000000000000000000000000000000000000000000000000000000000\n";
let parsed = PointerFile::from_bytes(bytes).expect("parse");
assert_eq!(parsed.manifest_id, 1);
assert_eq!(parsed.manifest_uri, "foo.json");
assert_eq!(parsed.content_hash.0, [0u8; 32]);
}
// Keys the parser does not know are skipped rather than rejected.
#[test]
fn pointer_file_from_bytes_ignores_unknown_keys() {
let bytes = b"manifest_id=2\nmanifest_uri=x.json\ncontent_hash=blake3:1111111111111111111111111111111111111111111111111111111111111111\nfuture_field=ignored\n";
let parsed = PointerFile::from_bytes(bytes).expect("parse");
assert_eq!(parsed.manifest_id, 2);
}
// Helper: parsing `bytes` must fail with `PointerParse` whose message contains `needle`.
fn assert_parse_err(bytes: &[u8], needle: &str) {
let err = PointerFile::from_bytes(bytes).expect_err("must error");
match err {
ManifestLoadError::PointerParse(msg) => assert!(
msg.contains(needle),
"expected `{needle}` in error; got: {msg}"
),
other => panic!("expected PointerParse; got {other:?}"),
}
}
#[test]
fn pointer_file_from_bytes_rejects_invalid_utf8() {
let bytes = [0xff, 0xfe, 0xfd];
assert_parse_err(&bytes, "not utf-8");
}
#[test]
fn pointer_file_from_bytes_rejects_missing_equals() {
assert_parse_err(b"manifest_id 1\n", "no '='");
}
#[test]
fn pointer_file_from_bytes_rejects_bad_manifest_id() {
assert_parse_err(
b"manifest_id=abc\nmanifest_uri=x\ncontent_hash=blake3:0000000000000000000000000000000000000000000000000000000000000000\n",
"manifest_id",
);
}
#[test]
fn pointer_file_from_bytes_rejects_content_hash_without_prefix() {
assert_parse_err(
b"manifest_id=1\nmanifest_uri=x\ncontent_hash=cafebabe\n",
"blake3:",
);
}
// A hex string shorter than the required length is rejected by the length check.
#[test]
fn pointer_file_from_bytes_rejects_short_hex() {
assert_parse_err(
b"manifest_id=1\nmanifest_uri=x\ncontent_hash=blake3:dead\n",
"64 chars",
);
}
// Correct length but non-hex characters: rejected by the per-byte decoding step.
#[test]
fn pointer_file_from_bytes_rejects_bad_hex_chars() {
let mut hex = String::from("blake3:");
hex.push_str(&"z".repeat(64));
let payload = format!("manifest_id=1\nmanifest_uri=x\ncontent_hash={hex}\n");
assert_parse_err(payload.as_bytes(), "content_hash hex");
}
// Each of the three keys is required; the next three tests omit one key each.
#[test]
fn pointer_file_from_bytes_rejects_missing_manifest_id() {
assert_parse_err(
b"manifest_uri=x\ncontent_hash=blake3:0000000000000000000000000000000000000000000000000000000000000000\n",
"missing manifest_id",
);
}
#[test]
fn pointer_file_from_bytes_rejects_missing_manifest_uri() {
assert_parse_err(
b"manifest_id=1\ncontent_hash=blake3:0000000000000000000000000000000000000000000000000000000000000000\n",
"missing manifest_uri",
);
}
#[test]
fn pointer_file_from_bytes_rejects_missing_content_hash() {
assert_parse_err(b"manifest_id=1\nmanifest_uri=x\n", "missing content_hash");
}
// A storage-level PreconditionFailed becomes WriteContentionExhausted.
#[test]
fn translate_contention_maps_precondition_failed() {
let in_err = CommitError::Storage(StorageError::PreconditionFailed { uri: "x".into() });
match translate_contention(in_err) {
CommitError::WriteContentionExhausted => {}
other => panic!("expected WriteContentionExhausted; got {other:?}"),
}
}
// Errors other than PreconditionFailed pass through unchanged (exercised here
// with an `Encode` error rather than a storage error).
#[test]
fn translate_contention_passes_through_other_storage_errors() {
let in_err = CommitError::Encode("downstream zstd".into());
match translate_contention(in_err) {
CommitError::Encode(_) => {}
other => panic!("expected Encode passthrough; got {other:?}"),
}
}
// Fixture: local-filesystem provider rooted in a fresh temp dir. The `TempDir`
// is returned so the caller keeps it alive for the duration of the test.
fn local_storage() -> (TempDir, Arc<dyn StorageProvider>) {
let dir = TempDir::new().expect("tempdir");
let store: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("local"));
(dir, store)
}
// With nothing written yet, reading the pointer yields `None` rather than an error.
#[tokio::test]
async fn read_pointer_returns_none_when_absent() {
let (_dir, storage) = local_storage();
let p = read_pointer(storage.as_ref()).await.expect("read");
assert!(p.is_none());
}
// Creating the pointer (no expected etag) and reading it back returns an equal value.
#[tokio::test]
async fn write_pointer_create_then_read_roundtrip() {
let (_dir, storage) = local_storage();
let p = sample_pointer();
write_pointer(storage.as_ref(), &p, None)
.await
.expect("write");
let (read, _) = read_pointer(storage.as_ref())
.await
.expect("read")
.expect("some");
assert_eq!(read, p);
}
// A second create-style write (no expected etag) must fail with contention.
#[tokio::test]
async fn write_pointer_second_create_surfaces_contention() {
let (_dir, storage) = local_storage();
let p = sample_pointer();
write_pointer(storage.as_ref(), &p, None)
.await
.expect("first");
let err = write_pointer(storage.as_ref(), &p, None)
.await
.expect_err("second must lose");
assert!(
matches!(err, CommitError::WriteContentionExhausted),
"{err:?}"
);
}
// Writing a minimal manifest list stores it at `manifest_uri(1)` and it can be fetched back.
#[tokio::test]
async fn write_manifest_list_succeeds_and_addresses_uri() {
let (_dir, storage) = local_storage();
let list = PersistedManifest {
format_version: LIST_FORMAT_VERSION.into(),
manifest_id: 1,
options_hash: ContentHash([0u8; 32]),
schema: Vec::new(),
id_column: "_id".into(),
fts_columns: Vec::new(),
vector_columns: Vec::new(),
partition_strategy: PartitionStrategy::TimeRange {
column: "_id".into(),
granularity_secs: 86_400,
},
parts: Vec::new(),
};
let res = write_manifest(storage.as_ref(), &list)
.await
.expect("write");
assert_eq!(res.uri, manifest_uri(1));
assert!(res.size_bytes > 0);
let _ = storage.get(&res.uri).await.expect("get list back");
}
// Guards the literal values of the path constants against accidental edits.
#[test]
fn point_constants_match_layout_doc() {
assert_eq!(POINTER_PATH, "_supertable/current");
assert_eq!(MANIFEST_DIR, "manifest");
assert_eq!(MANIFEST_PARTS_DIR, "manifest-parts");
}
}