use crate::io::FileIO;
use crate::spec::Snapshot;
use std::str;
/// Name of the directory, directly under the table path, that holds snapshot and hint files.
const SNAPSHOT_DIR: &str = "snapshot";
/// File-name prefix of snapshot files; the numeric snapshot id follows it.
const SNAPSHOT_PREFIX: &str = "snapshot-";
/// Name of the hint file that `commit_snapshot` updates with the newly committed snapshot id.
const LATEST_HINT: &str = "LATEST";
/// Name of the hint file naming the earliest snapshot id (only read in this file, never written).
const EARLIEST_HINT: &str = "EARLIEST";
/// Locates, reads, and commits the snapshot files of a single table, along with the
/// `LATEST`/`EARLIEST` hint files stored next to them in the snapshot directory.
#[derive(Debug, Clone)]
pub struct SnapshotManager {
/// File I/O used for every snapshot read/write, hint access, and directory listing.
file_io: FileIO,
/// Root path of the table; the `snapshot` and `manifest` directories live directly beneath it.
table_path: String,
}
impl SnapshotManager {
    /// Creates a manager for the table rooted at `table_path`, doing all I/O through `file_io`.
    pub fn new(file_io: FileIO, table_path: String) -> Self {
        Self {
            file_io,
            table_path,
        }
    }

    /// Returns the [`FileIO`] used for all snapshot and hint file access.
    pub fn file_io(&self) -> &FileIO {
        &self.file_io
    }

    /// Returns the directory holding snapshot and hint files: `{table_path}/snapshot`.
    pub fn snapshot_dir(&self) -> String {
        format!("{}/{}", self.table_path, SNAPSHOT_DIR)
    }

    /// Path of the `LATEST` hint file inside the snapshot directory.
    fn latest_hint_path(&self) -> String {
        format!("{}/{}", self.snapshot_dir(), LATEST_HINT)
    }

    /// Path of the `EARLIEST` hint file inside the snapshot directory.
    fn earliest_hint_path(&self) -> String {
        format!("{}/{}", self.snapshot_dir(), EARLIEST_HINT)
    }

    /// Returns the path of the snapshot file for `snapshot_id`: `{snapshot_dir}/snapshot-{id}`.
    pub fn snapshot_path(&self, snapshot_id: i64) -> String {
        // Built from the same prefix constant that `find_by_list_files` strips, so the
        // writer and the directory-listing reader cannot drift apart.
        format!("{}/{}{}", self.snapshot_dir(), SNAPSHOT_PREFIX, snapshot_id)
    }

    /// Returns the table's manifest directory: `{table_path}/manifest`.
    pub fn manifest_dir(&self) -> String {
        format!("{}/manifest", self.table_path)
    }

    /// Returns the path of the manifest file `manifest_name` inside [`Self::manifest_dir`].
    pub fn manifest_path(&self, manifest_name: &str) -> String {
        format!("{}/{}", self.manifest_dir(), manifest_name)
    }

    /// Reads a hint file and parses its whitespace-trimmed content as a snapshot id.
    ///
    /// Returns `None` on any failure (cannot open, cannot read, not UTF-8, not a number).
    /// Hints are advisory: every caller validates the id and falls back to listing.
    async fn read_hint(&self, path: &str) -> Option<i64> {
        let input = self.file_io.new_input(path).ok()?;
        let content = input.read().await.ok()?;
        let id_str = str::from_utf8(&content).ok()?;
        id_str.trim().parse().ok()
    }

    /// Lists the snapshot directory and folds every `snapshot-{id}` file id with `reducer`
    /// (e.g. `i64::max` for the latest id, `i64::min` for the earliest).
    ///
    /// Directories and names that do not parse as `snapshot-{i64}` (e.g. temp files written
    /// by `commit_snapshot`) are ignored. Returns `Ok(None)` when no snapshot file is found.
    async fn find_by_list_files(
        &self,
        reducer: fn(i64, i64) -> i64,
    ) -> crate::Result<Option<i64>> {
        let statuses = self.file_io.list_status(&self.snapshot_dir()).await?;
        let found = statuses
            .into_iter()
            .filter(|status| !status.is_dir)
            .filter_map(|status| {
                let name = status.path.rsplit('/').next().unwrap_or(&status.path);
                name.strip_prefix(SNAPSHOT_PREFIX)?.parse::<i64>().ok()
            })
            .reduce(reducer);
        Ok(found)
    }

    /// Returns the id of the newest snapshot, or `Ok(None)` if the table has none.
    ///
    /// The `LATEST` hint is used only when it is positive and `snapshot-{hint + 1}` does not
    /// exist; otherwise the snapshot directory is listed.
    pub async fn get_latest_snapshot_id(&self) -> crate::Result<Option<i64>> {
        if let Some(hint_id) = self.read_hint(&self.latest_hint_path()).await {
            // The hint write in `commit_snapshot` is best-effort, so the hint can lag behind
            // the real latest snapshot; trust it only if nothing newer exists. `checked_add`
            // keeps a corrupt hint of `i64::MAX` from overflowing.
            if hint_id > 0 {
                if let Some(next_id) = hint_id.checked_add(1) {
                    let next_input = self.file_io.new_input(&self.snapshot_path(next_id))?;
                    if !next_input.exists().await? {
                        return Ok(Some(hint_id));
                    }
                }
            }
        }
        self.find_by_list_files(i64::max).await
    }

    /// Returns the id of the oldest snapshot, or `Ok(None)` if the table has none.
    ///
    /// The `EARLIEST` hint is used only while the snapshot it names still exists; otherwise
    /// the snapshot directory is listed.
    pub async fn earliest_snapshot_id(&self) -> crate::Result<Option<i64>> {
        if let Some(hint_id) = self.read_hint(&self.earliest_hint_path()).await {
            let snap_input = self.file_io.new_input(&self.snapshot_path(hint_id))?;
            if snap_input.exists().await? {
                return Ok(Some(hint_id));
            }
        }
        self.find_by_list_files(i64::min).await
    }

    /// Loads and deserializes the snapshot with id `snapshot_id`.
    ///
    /// # Errors
    /// Returns `DataInvalid` if the file does not exist, is not valid snapshot JSON, or
    /// contains a snapshot whose id differs from `snapshot_id`; I/O errors are propagated.
    pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result<Snapshot> {
        let snapshot_path = self.snapshot_path(snapshot_id);
        let snap_input = self.file_io.new_input(&snapshot_path)?;
        if !snap_input.exists().await? {
            return Err(crate::Error::DataInvalid {
                message: format!("snapshot file does not exist: {snapshot_path}"),
                source: None,
            });
        }
        let snap_bytes = snap_input.read().await?;
        let snapshot: Snapshot =
            serde_json::from_slice(&snap_bytes).map_err(|e| crate::Error::DataInvalid {
                message: format!("snapshot JSON invalid: {e}"),
                source: Some(Box::new(e)),
            })?;
        if snapshot.id() != snapshot_id {
            return Err(crate::Error::DataInvalid {
                message: format!(
                    "snapshot file id mismatch: in file name is {snapshot_id}, but file contains snapshot id {}",
                    snapshot.id()
                ),
                source: None,
            });
        }
        Ok(snapshot)
    }

    /// Loads the newest snapshot, or returns `Ok(None)` if the table has no snapshots.
    pub async fn get_latest_snapshot(&self) -> crate::Result<Option<Snapshot>> {
        match self.get_latest_snapshot_id().await? {
            Some(snapshot_id) => Ok(Some(self.get_snapshot(snapshot_id).await?)),
            None => Ok(None),
        }
    }

    /// Writes `snapshot` as `snapshot-{id}` and then updates the `LATEST` hint.
    ///
    /// The JSON is first written to a uniquely named temp file and renamed into place. If
    /// the rename fails, the temp file is removed and the target is written directly.
    ///
    /// Returns `Ok(false)` if a snapshot file with this id already exists, and `Ok(true)`
    /// once the snapshot file has been written.
    ///
    /// NOTE(review): the exists-check and rename are separate steps, so two concurrent
    /// committers of the same id can both see the target as absent; whether `rename`
    /// then overwrites depends on the FileIO backend — confirm before relying on this
    /// for mutual exclusion.
    ///
    /// # Errors
    /// Returns `DataInvalid` if the snapshot cannot be serialized; I/O errors from writing
    /// the snapshot are propagated. A failure to update the `LATEST` hint is ignored.
    pub async fn commit_snapshot(&self, snapshot: &Snapshot) -> crate::Result<bool> {
        let target_path = self.snapshot_path(snapshot.id());
        let json = serde_json::to_string(snapshot).map_err(|e| crate::Error::DataInvalid {
            message: format!("failed to serialize snapshot: {e}"),
            source: Some(Box::new(e)),
        })?;
        // `Bytes` is reference-counted: cloning it for the temp write keeps the buffer
        // available for the fallback write without copying the JSON.
        let payload = bytes::Bytes::from(json);

        let tmp_path = format!("{}.tmp-{}", target_path, uuid::Uuid::new_v4());
        let output = self.file_io.new_output(&tmp_path)?;
        output.write(payload.clone()).await?;

        match self.file_io.exists(&target_path).await {
            Ok(false) => {}
            other => {
                // Either the snapshot id is already taken or the check itself failed; in
                // both cases remove the temp file before returning so it is not leaked.
                let _ = self.file_io.delete_file(&tmp_path).await;
                other?;
                return Ok(false);
            }
        }

        if self.file_io.rename(&tmp_path, &target_path).await.is_err() {
            // Rename failed (possibly unsupported by the backend): clean up and write the
            // target directly, unless it appeared in the meantime.
            let _ = self.file_io.delete_file(&tmp_path).await;
            if self.file_io.exists(&target_path).await? {
                return Ok(false);
            }
            let output = self.file_io.new_output(&target_path)?;
            output.write(payload).await?;
        }

        // Best-effort: readers only trust the hint after verifying it, so a failed write
        // costs them a directory listing, not correctness.
        let _ = self.write_latest_hint(snapshot.id()).await;
        Ok(true)
    }

    /// Writes `snapshot_id` as decimal text to the `LATEST` hint file.
    pub async fn write_latest_hint(&self, snapshot_id: i64) -> crate::Result<()> {
        let hint_path = self.latest_hint_path();
        let output = self.file_io.new_output(&hint_path)?;
        output
            .write(bytes::Bytes::from(snapshot_id.to_string()))
            .await
    }

    /// Snapshot commit time as `i64`, saturating at `i64::MAX` instead of wrapping.
    ///
    /// Assumes `Snapshot::time_millis` returns `u64` (the builder takes `u64`) — confirm.
    fn commit_time_millis(snapshot: &Snapshot) -> i64 {
        i64::try_from(snapshot.time_millis()).unwrap_or(i64::MAX)
    }

    /// Returns the newest snapshot whose commit time is `<= timestamp_millis`.
    ///
    /// Returns `Ok(None)` if the table has no snapshots or the earliest snapshot was
    /// committed after `timestamp_millis`. Binary-searches snapshot ids between the
    /// earliest and latest ids, so it assumes commit times increase with snapshot id.
    pub async fn earlier_or_equal_time_mills(
        &self,
        timestamp_millis: i64,
    ) -> crate::Result<Option<Snapshot>> {
        let mut latest = match self.get_latest_snapshot_id().await? {
            Some(id) => id,
            None => return Ok(None),
        };
        let earliest_snapshot = match self.earliest_snapshot_id().await? {
            Some(id) => self.get_snapshot(id).await?,
            None => return Ok(None),
        };
        if Self::commit_time_millis(&earliest_snapshot) > timestamp_millis {
            return Ok(None);
        }

        let mut earliest = earliest_snapshot.id();
        let mut result = None;
        while earliest <= latest {
            let mid = earliest + (latest - earliest) / 2;
            let snapshot = self.get_snapshot(mid).await?;
            match Self::commit_time_millis(&snapshot).cmp(&timestamp_millis) {
                std::cmp::Ordering::Greater => latest = mid - 1,
                std::cmp::Ordering::Less => {
                    // Candidate answer; keep searching for a later snapshot that still fits.
                    earliest = mid + 1;
                    result = Some(snapshot);
                }
                std::cmp::Ordering::Equal => return Ok(Some(snapshot)),
            }
        }
        Ok(result)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::CommitKind;

    /// In-memory FileIO so tests never touch a real filesystem.
    fn test_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// Creates `{table_path}/snapshot/` and returns the FileIO plus a manager for the table.
    async fn setup(table_path: &str) -> (FileIO, SnapshotManager) {
        let file_io = test_file_io();
        file_io
            .mkdirs(&format!("{table_path}/snapshot/"))
            .await
            .unwrap();
        let sm = SnapshotManager::new(file_io.clone(), table_path.to_string());
        (file_io, sm)
    }

    /// Builds a snapshot with the given id committed at `1000 * id` milliseconds.
    fn test_snapshot(id: i64) -> Snapshot {
        Snapshot::builder()
            .version(3)
            .id(id)
            .schema_id(0)
            .base_manifest_list("base-list".to_string())
            .delta_manifest_list("delta-list".to_string())
            .commit_user("test-user".to_string())
            .commit_identifier(0)
            .commit_kind(CommitKind::APPEND)
            .time_millis(1000 * id as u64)
            .build()
    }

    #[tokio::test]
    async fn test_commit_snapshot_first() {
        let (_, sm) = setup("memory:/test_commit_first").await;
        let snap = test_snapshot(1);
        let result = sm.commit_snapshot(&snap).await.unwrap();
        assert!(result);
        let loaded = sm.get_snapshot(1).await.unwrap();
        assert_eq!(loaded.id(), 1);
    }

    #[tokio::test]
    async fn test_commit_snapshot_already_exists() {
        let (_, sm) = setup("memory:/test_commit_exists").await;
        let snap = test_snapshot(1);
        assert!(sm.commit_snapshot(&snap).await.unwrap());
        let result = sm.commit_snapshot(&snap).await.unwrap();
        assert!(!result);
    }

    #[tokio::test]
    async fn test_commit_updates_latest_hint() {
        let (_, sm) = setup("memory:/test_commit_hint").await;
        let snap = test_snapshot(1);
        sm.commit_snapshot(&snap).await.unwrap();
        let latest_id = sm.get_latest_snapshot_id().await.unwrap();
        assert_eq!(latest_id, Some(1));
    }

    #[tokio::test]
    async fn test_write_latest_hint() {
        let (_, sm) = setup("memory:/test_write_hint").await;
        sm.write_latest_hint(42).await.unwrap();
        let hint = sm.read_hint(&sm.latest_hint_path()).await;
        assert_eq!(hint, Some(42));
    }

    #[tokio::test]
    async fn test_get_snapshot_missing_is_error() {
        let (_, sm) = setup("memory:/test_get_missing").await;
        assert!(sm.get_snapshot(7).await.is_err());
    }

    #[tokio::test]
    async fn test_earlier_or_equal_time_mills() {
        let (file_io, sm) = setup("memory:/test_earlier_or_equal").await;
        for id in 1..=5 {
            assert!(sm.commit_snapshot(&test_snapshot(id)).await.unwrap());
        }
        // Pin the earliest snapshot through its hint so this test exercises the binary
        // search without depending on directory listing.
        let earliest_hint = file_io.new_output(&sm.earliest_hint_path()).unwrap();
        earliest_hint
            .write(bytes::Bytes::from("1"))
            .await
            .unwrap();

        // Snapshot `id` is committed at `1000 * id` ms (see `test_snapshot`).
        let before_all = sm.earlier_or_equal_time_mills(500).await.unwrap();
        assert!(before_all.is_none());

        let exact_first = sm.earlier_or_equal_time_mills(1000).await.unwrap();
        assert_eq!(exact_first.map(|s| s.id()), Some(1));

        let between = sm.earlier_or_equal_time_mills(2500).await.unwrap();
        assert_eq!(between.map(|s| s.id()), Some(2));

        let exact_mid = sm.earlier_or_equal_time_mills(3000).await.unwrap();
        assert_eq!(exact_mid.map(|s| s.id()), Some(3));

        let after_all = sm.earlier_or_equal_time_mills(99_999).await.unwrap();
        assert_eq!(after_all.map(|s| s.id()), Some(5));
    }
}