use std::sync::Arc;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use lance_core::{Error, Result};
use lance_index::mem_wal::RegionManifest;
use lance_io::object_store::ObjectStore;
use lance_table::format::pb;
use log::{info, warn};
use object_store::PutMode;
use object_store::PutOptions;
use object_store::path::Path;
use prost::Message;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::util::{manifest_filename, parse_bit_reversed_filename, region_manifest_path};
/// JSON payload of the `version_hint.json` file in a region's manifest directory.
///
/// Records the last manifest version this store successfully wrote. It is only a
/// starting point for the latest-version search: writing it is best-effort and
/// readers confirm the hinted version exists before trusting it.
#[derive(Debug, Serialize, Deserialize)]
struct VersionHint {
    /// Manifest version number the hint points at.
    version: u64,
}
/// Versioned manifest storage for a single mem-WAL region.
///
/// Each manifest version is stored as its own protobuf file in `manifest_dir`
/// and is created with "create only if absent" semantics. Concurrent writers
/// racing for the same version therefore cannot both succeed.
#[derive(Debug)]
pub struct RegionManifestStore {
    /// Object store holding the manifest files.
    object_store: Arc<ObjectStore>,
    /// Region whose manifests this store reads and writes.
    region_id: Uuid,
    /// Directory containing the region's manifest files and `version_hint.json`.
    manifest_dir: Path,
    /// Number of versions probed concurrently per round when scanning forward
    /// for the latest manifest version.
    manifest_scan_batch_size: usize,
}
impl RegionManifestStore {
pub fn new(
object_store: Arc<ObjectStore>,
base_path: &Path,
region_id: Uuid,
manifest_scan_batch_size: usize,
) -> Self {
let manifest_dir = region_manifest_path(base_path, ®ion_id);
Self {
object_store,
region_id,
manifest_dir,
manifest_scan_batch_size,
}
}
pub async fn read_latest(&self) -> Result<Option<RegionManifest>> {
let version = self.find_latest_version().await?;
if version == 0 {
return Ok(None);
}
self.read_version(version).await.map(Some)
}
pub async fn read_version(&self, version: u64) -> Result<RegionManifest> {
let filename = manifest_filename(version);
let path = self.manifest_dir.child(filename.as_str());
let data = self.object_store.inner.get(&path).await.map_err(|e| {
Error::io(format!(
"Failed to read manifest version {} for region {}: {}",
version, self.region_id, e
))
})?;
let bytes = data
.bytes()
.await
.map_err(|e| Error::io(format!("Failed to read manifest bytes: {}", e)))?;
let pb_manifest = pb::RegionManifest::decode(bytes)
.map_err(|e| Error::io(format!("Failed to decode manifest protobuf: {}", e)))?;
RegionManifest::try_from(pb_manifest)
}
pub async fn write(&self, manifest: &RegionManifest) -> Result<u64> {
let version = manifest.version;
let filename = manifest_filename(version);
let path = self.manifest_dir.child(filename.as_str());
let pb_manifest = pb::RegionManifest::from(manifest);
let bytes = pb_manifest.encode_to_vec();
if self.object_store.is_local() {
let temp_filename = format!("{}.tmp.{}", filename, uuid::Uuid::new_v4());
let temp_path = self.manifest_dir.child(temp_filename.as_str());
self.object_store
.inner
.put(&temp_path, Bytes::from(bytes).into())
.await
.map_err(|e| Error::io(format!("Failed to write temp manifest: {}", e)))?;
match self
.object_store
.inner
.rename_if_not_exists(&temp_path, &path)
.await
{
Ok(()) => {}
Err(object_store::Error::AlreadyExists { .. }) => {
let _ = self.object_store.delete(&temp_path).await;
return Err(Error::io(format!(
"Manifest version {} already exists for region {}",
version, self.region_id
)));
}
Err(e) => {
let _ = self.object_store.delete(&temp_path).await;
return Err(Error::io(format!(
"Failed to write manifest version {} for region {}: {}",
version, self.region_id, e
)));
}
}
} else {
let put_opts = PutOptions {
mode: PutMode::Create,
..Default::default()
};
self.object_store
.inner
.put_opts(&path, Bytes::from(bytes).into(), put_opts)
.await
.map_err(|e| {
if matches!(e, object_store::Error::AlreadyExists { .. }) {
Error::io(format!(
"Manifest version {} already exists for region {}",
version, self.region_id
))
} else {
Error::io(format!(
"Failed to write manifest version {} for region {}: {}",
version, self.region_id, e
))
}
})?;
}
self.write_version_hint(version).await;
Ok(version)
}
async fn find_latest_version(&self) -> Result<u64> {
let hint = self.read_version_hint().await.unwrap_or(1);
let mut latest_found = 0u64;
if hint > 0 && self.version_exists(hint).await? {
latest_found = hint;
} else if hint > 1 {
if self.version_exists(1).await? {
latest_found = 1;
}
}
let batch_size = self.manifest_scan_batch_size;
loop {
let mut futures = FuturesUnordered::new();
for offset in 0..batch_size {
let version = latest_found + 1 + offset as u64;
futures.push(async move { (version, self.version_exists(version).await) });
}
let mut found_any = false;
while let Some((version, result)) = futures.next().await {
if let Ok(true) = result
&& version > latest_found
{
latest_found = version;
found_any = true;
}
}
if !found_any {
break;
}
}
Ok(latest_found)
}
async fn version_exists(&self, version: u64) -> Result<bool> {
let filename = manifest_filename(version);
let path = self.manifest_dir.child(filename.as_str());
match self.object_store.inner.head(&path).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { .. }) => Ok(false),
Err(e) => Err(Error::io(format!(
"HEAD request failed for version {}: {}",
version, e
))),
}
}
async fn read_version_hint(&self) -> Option<u64> {
let path = self.manifest_dir.child("version_hint.json");
let data = self.object_store.inner.get(&path).await.ok()?;
let bytes = data.bytes().await.ok()?;
let hint: VersionHint = serde_json::from_slice(&bytes).ok()?;
Some(hint.version)
}
async fn write_version_hint(&self, version: u64) {
let path = self.manifest_dir.child("version_hint.json");
let hint = VersionHint { version };
match serde_json::to_vec(&hint) {
Ok(bytes) => {
if let Err(e) = self
.object_store
.inner
.put(&path, Bytes::from(bytes).into())
.await
{
warn!(
"Failed to write version hint for region {}: {}",
self.region_id, e
);
}
}
Err(e) => {
warn!("Failed to serialize version hint: {}", e);
}
}
}
pub async fn list_versions(&self) -> Result<Vec<u64>> {
let mut versions = Vec::new();
let list_result = self
.object_store
.inner
.list(Some(&self.manifest_dir))
.collect::<Vec<_>>()
.await;
for item in list_result {
match item {
Ok(meta) => {
if let Some(filename) = meta.location.filename()
&& filename.ends_with(".binpb")
&& let Some(version) = parse_bit_reversed_filename(filename)
{
versions.push(version);
}
}
Err(e) => {
warn!("Error listing manifest directory: {}", e);
}
}
}
versions.sort_unstable();
Ok(versions)
}
pub fn region_id(&self) -> Uuid {
self.region_id
}
pub async fn claim_epoch(&self, region_spec_id: u32) -> Result<(u64, RegionManifest)> {
let current = self.read_latest().await?;
let (next_version, next_epoch, base_manifest) = match current {
Some(m) => (m.version + 1, m.writer_epoch + 1, Some(m)),
None => (1, 1, None),
};
let new_manifest = if let Some(base) = base_manifest {
RegionManifest {
version: next_version,
writer_epoch: next_epoch,
..base
}
} else {
RegionManifest {
region_id: self.region_id,
version: next_version,
region_spec_id,
writer_epoch: next_epoch,
replay_after_wal_entry_position: 0,
wal_entry_position_last_seen: 0,
current_generation: 1,
flushed_generations: vec![],
}
};
self.write(&new_manifest).await.map_err(|e| {
Error::io(format!(
"Failed to claim region {} (version {}): another writer may have claimed it: {}",
self.region_id, next_version, e
))
})?;
info!(
"Claimed region {} with epoch {} (version {})",
self.region_id, next_epoch, next_version
);
Ok((next_epoch, new_manifest))
}
pub async fn check_fenced(&self, local_epoch: u64) -> Result<()> {
let current = self.read_latest().await?;
Self::check_fenced_against(¤t, local_epoch, self.region_id)
}
fn check_fenced_against(
manifest: &Option<RegionManifest>,
local_epoch: u64,
region_id: Uuid,
) -> Result<()> {
match manifest {
Some(m) if m.writer_epoch > local_epoch => Err(Error::io(format!(
"Writer fenced: local epoch {} < stored epoch {} for region {}",
local_epoch, m.writer_epoch, region_id
))),
_ => Ok(()),
}
}
pub async fn commit_update<F>(&self, local_epoch: u64, prepare_fn: F) -> Result<RegionManifest>
where
F: Fn(&RegionManifest) -> RegionManifest,
{
const MAX_RETRIES: usize = 10;
for attempt in 0..MAX_RETRIES {
let current = self
.read_latest()
.await?
.ok_or_else(|| Error::io("Region manifest not found"))?;
Self::check_fenced_against(&Some(current.clone()), local_epoch, self.region_id)?;
let new_manifest = prepare_fn(¤t);
if new_manifest.writer_epoch != local_epoch {
return Err(Error::invalid_input(format!(
"Manifest epoch {} doesn't match local epoch {}",
new_manifest.writer_epoch, local_epoch
)));
}
match self.write(&new_manifest).await {
Ok(_) => {
return Ok(new_manifest);
}
Err(e) => {
let is_version_conflict = e.to_string().contains("already exists");
if is_version_conflict && attempt < MAX_RETRIES - 1 {
continue;
}
return Err(e);
}
}
}
Err(Error::io(format!(
"Failed to update manifest for region {} after {} attempts",
self.region_id, MAX_RETRIES
)))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a local (`file://`) object store rooted in a fresh temp directory.
    /// The returned `TempDir` must be kept alive for the duration of the test.
    async fn create_local_store() -> (Arc<ObjectStore>, Path, TempDir) {
        let temp_dir = tempfile::tempdir().unwrap();
        let uri = format!("file://{}", temp_dir.path().display());
        let (store, path) = ObjectStore::from_uri(&uri).await.unwrap();
        (store, path, temp_dir)
    }

    /// Builds a minimal manifest with the given version and writer epoch.
    fn create_test_manifest(region_id: Uuid, version: u64, epoch: u64) -> RegionManifest {
        RegionManifest {
            region_id,
            version,
            region_spec_id: 0,
            writer_epoch: epoch,
            replay_after_wal_entry_position: 0,
            wal_entry_position_last_seen: 0,
            current_generation: 1,
            flushed_generations: vec![],
        }
    }

    #[tokio::test]
    async fn test_read_latest_empty() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        let result = manifest_store.read_latest().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_write_and_read_manifest() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        let manifest = create_test_manifest(region_id, 1, 1);
        manifest_store.write(&manifest).await.unwrap();
        let loaded = manifest_store.read_latest().await.unwrap().unwrap();
        assert_eq!(loaded.version, 1);
        assert_eq!(loaded.writer_epoch, 1);
        assert_eq!(loaded.region_id, region_id);
    }

    #[tokio::test]
    async fn test_multiple_versions() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        for version in 1..=5 {
            let manifest = create_test_manifest(region_id, version, version);
            manifest_store.write(&manifest).await.unwrap();
        }
        let loaded = manifest_store.read_latest().await.unwrap().unwrap();
        assert_eq!(loaded.version, 5);
        assert_eq!(loaded.writer_epoch, 5);
        let versions = manifest_store.list_versions().await.unwrap();
        assert_eq!(versions, vec![1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    async fn test_read_specific_version() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        for version in 1..=3 {
            let manifest = create_test_manifest(region_id, version, version * 10);
            manifest_store.write(&manifest).await.unwrap();
        }
        let v2 = manifest_store.read_version(2).await.unwrap();
        assert_eq!(v2.version, 2);
        assert_eq!(v2.writer_epoch, 20);
    }

    #[tokio::test]
    async fn test_put_if_not_exists() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        let manifest1 = create_test_manifest(region_id, 1, 1);
        manifest_store.write(&manifest1).await.unwrap();
        let manifest2 = create_test_manifest(region_id, 1, 2);
        let err = manifest_store.write(&manifest2).await.unwrap_err();
        assert!(err.to_string().contains("already exists"));
        // The losing write must not clobber the existing version.
        let loaded = manifest_store.read_version(1).await.unwrap();
        assert_eq!(loaded.writer_epoch, 1);
    }

    #[tokio::test]
    async fn test_stale_hint_is_scanned_past() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        for version in 1..=3 {
            let manifest = create_test_manifest(region_id, version, 1);
            manifest_store.write(&manifest).await.unwrap();
        }
        // Simulate a hint that lags behind the real latest version.
        manifest_store.write_version_hint(1).await;
        let loaded = manifest_store.read_latest().await.unwrap().unwrap();
        assert_eq!(loaded.version, 3);
    }

    #[tokio::test]
    async fn test_claim_epoch_fences_previous_writer() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);

        let (first_epoch, first) = manifest_store.claim_epoch(7).await.unwrap();
        assert_eq!(first_epoch, 1);
        assert_eq!(first.version, 1);
        assert_eq!(first.region_spec_id, 7);
        assert_eq!(first.region_id, region_id);

        let (second_epoch, second) = manifest_store.claim_epoch(7).await.unwrap();
        assert_eq!(second_epoch, 2);
        assert_eq!(second.version, 2);

        assert!(manifest_store.check_fenced(first_epoch).await.is_err());
        assert!(manifest_store.check_fenced(second_epoch).await.is_ok());
    }

    #[tokio::test]
    async fn test_commit_update_advances_version() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        let (epoch, _) = manifest_store.claim_epoch(0).await.unwrap();

        let updated = manifest_store
            .commit_update(epoch, |m| RegionManifest {
                version: m.version + 1,
                current_generation: m.current_generation + 1,
                ..m.clone()
            })
            .await
            .unwrap();
        assert_eq!(updated.version, 2);

        let loaded = manifest_store.read_latest().await.unwrap().unwrap();
        assert_eq!(loaded.version, 2);
        assert_eq!(loaded.current_generation, 2);
        assert_eq!(loaded.writer_epoch, epoch);
    }

    #[tokio::test]
    async fn test_commit_update_rejects_fenced_writer() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        let (stale_epoch, _) = manifest_store.claim_epoch(0).await.unwrap();
        manifest_store.claim_epoch(0).await.unwrap();

        let result = manifest_store
            .commit_update(stale_epoch, |m| RegionManifest {
                version: m.version + 1,
                ..m.clone()
            })
            .await;
        assert!(result.is_err());
        // The fenced writer must not have produced a new version.
        let latest = manifest_store.read_latest().await.unwrap().unwrap();
        assert_eq!(latest.version, 2);
    }

    #[tokio::test]
    async fn test_commit_update_rejects_epoch_mismatch() {
        let (store, base_path, _temp_dir) = create_local_store().await;
        let region_id = Uuid::new_v4();
        let manifest_store = RegionManifestStore::new(store, &base_path, region_id, 2);
        let (epoch, _) = manifest_store.claim_epoch(0).await.unwrap();

        let result = manifest_store
            .commit_update(epoch, |m| RegionManifest {
                version: m.version + 1,
                writer_epoch: m.writer_epoch + 1,
                ..m.clone()
            })
            .await;
        assert!(result.is_err());
        let latest = manifest_store.read_latest().await.unwrap().unwrap();
        assert_eq!(latest.version, 1);
    }
}