use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, list_with_timeout, put_with_timeout};
use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::instrument;
use uni_common::core::fork::ForkId;
use uni_common::core::snapshot::SnapshotManifest;
pub struct SnapshotManager {
store: Arc<dyn ObjectStore>,
fork_id: Option<ForkId>,
}
impl SnapshotManager {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
store,
fork_id: None,
}
}
pub fn new_for_fork(store: Arc<dyn ObjectStore>, fork_id: ForkId) -> Self {
Self {
store,
fork_id: Some(fork_id),
}
}
pub fn is_fork_scoped(&self) -> bool {
self.fork_id.is_some()
}
fn catalog_prefix(&self) -> String {
match &self.fork_id {
Some(id) => format!("catalog/forks/{id}"),
None => "catalog".to_string(),
}
}
fn manifest_path(&self, snapshot_id: &str) -> ObjectStorePath {
ObjectStorePath::from(format!(
"{}/manifests/{}.json",
self.catalog_prefix(),
snapshot_id
))
}
fn latest_ptr_path(&self) -> ObjectStorePath {
ObjectStorePath::from(format!("{}/latest", self.catalog_prefix()))
}
fn named_snapshots_path(&self) -> ObjectStorePath {
ObjectStorePath::from(format!("{}/named_snapshots.json", self.catalog_prefix()))
}
#[instrument(skip(self, manifest), fields(snapshot_id = %manifest.snapshot_id, size_bytes), level = "info")]
pub async fn save_snapshot(&self, manifest: &SnapshotManifest) -> Result<()> {
let path = self.manifest_path(&manifest.snapshot_id);
let json = serde_json::to_string_pretty(manifest)?;
tracing::Span::current().record("size_bytes", json.len());
put_with_timeout(&self.store, &path, Bytes::from(json), DEFAULT_TIMEOUT).await?;
Ok(())
}
#[instrument(skip(self), level = "info")]
pub async fn load_snapshot(&self, snapshot_id: &str) -> Result<SnapshotManifest> {
match self
.load_snapshot_at(&self.manifest_path(snapshot_id))
.await
{
Ok(m) => Ok(m),
Err(e) if self.fork_id.is_some() => {
let primary_path =
ObjectStorePath::from(format!("catalog/manifests/{snapshot_id}.json"));
self.load_snapshot_at(&primary_path).await.map_err(|_| e)
}
Err(e) => Err(e),
}
}
async fn load_snapshot_at(&self, path: &ObjectStorePath) -> Result<SnapshotManifest> {
let result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
let bytes = result.bytes().await?;
let content = String::from_utf8(bytes.to_vec())?;
let manifest: SnapshotManifest = serde_json::from_str(&content)?;
Ok(manifest)
}
pub async fn list_snapshots(&self) -> Result<Vec<String>> {
let prefix = ObjectStorePath::from(format!("{}/manifests", self.catalog_prefix()));
let metas = list_with_timeout(&self.store, Some(&prefix), DEFAULT_TIMEOUT).await?;
let mut ids = Vec::new();
for meta in metas {
if let Some(filename) = meta.location.filename()
&& filename.ends_with(".json")
{
ids.push(filename.trim_end_matches(".json").to_string());
}
}
Ok(ids)
}
pub async fn has_any_manifests(&self) -> Result<bool> {
let ids = self.list_snapshots().await?;
Ok(!ids.is_empty())
}
pub async fn load_latest_snapshot(&self) -> Result<Option<SnapshotManifest>> {
let latest_path = self.latest_ptr_path();
match get_with_timeout(&self.store, &latest_path, DEFAULT_TIMEOUT).await {
Ok(result) => {
let bytes = result.bytes().await.map_err(anyhow::Error::from)?;
let snapshot_id = String::from_utf8(bytes.to_vec())?;
let snapshot_id = snapshot_id.trim();
if snapshot_id.is_empty() {
return Ok(None);
}
Ok(Some(self.load_snapshot(snapshot_id).await?))
}
Err(e) if e.to_string().contains("not found") => Ok(None),
Err(e) => Err(e),
}
}
#[instrument(skip(self), level = "info")]
pub async fn set_latest_snapshot(&self, snapshot_id: &str) -> Result<()> {
let path = self.latest_ptr_path();
put_with_timeout(
&self.store,
&path,
Bytes::from(snapshot_id.to_string()),
DEFAULT_TIMEOUT,
)
.await?;
Ok(())
}
pub async fn load_named_snapshots(&self) -> Result<HashMap<String, String>> {
let path = self.named_snapshots_path();
match get_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await {
Ok(result) => {
let bytes = result.bytes().await?;
let content = String::from_utf8(bytes.to_vec())?;
Ok(serde_json::from_str(&content)?)
}
Err(_) => Ok(HashMap::new()),
}
}
pub async fn save_named_snapshot(&self, name: &str, snapshot_id: &str) -> Result<()> {
let mut map = self.load_named_snapshots().await?;
map.insert(name.to_string(), snapshot_id.to_string());
let json = serde_json::to_string_pretty(&map)?;
put_with_timeout(
&self.store,
&self.named_snapshots_path(),
Bytes::from(json),
DEFAULT_TIMEOUT,
)
.await?;
Ok(())
}
pub async fn get_named_snapshot(&self, name: &str) -> Result<Option<String>> {
let map = self.load_named_snapshots().await?;
Ok(map.get(name).cloned())
}
pub async fn find_snapshot_at_time(
&self,
target: DateTime<Utc>,
) -> Result<Option<SnapshotManifest>> {
let ids = self.list_snapshots().await?;
let mut best: Option<SnapshotManifest> = None;
for id in ids {
let m = self.load_snapshot(&id).await?;
if m.created_at <= target && best.as_ref().is_none_or(|b| m.created_at > b.created_at) {
best = Some(m);
}
}
Ok(best)
}
}
/// Syncs a snapshot's local manifest file and the `latest` pointer to disk.
///
/// `local_root` is `None` for non-local stores, in which case nothing is done
/// and `Ok(())` is returned. Otherwise the catalog directory is
/// `{root}/catalog`, or `{root}/catalog/forks/{fork_id}` when `fork_id` is
/// given. `manifests/{snapshot_id}.json` is synced first, then `latest`. Each
/// sync goes through `crate::runtime::wal::sync_file_and_parent`.
///
/// # Errors
/// Returns the first I/O error reported by `sync_file_and_parent`.
pub fn fsync_snapshot_pointer(
    local_root: Option<&std::path::Path>,
    fork_id: Option<&ForkId>,
    snapshot_id: &str,
) -> std::io::Result<()> {
    if let Some(root) = local_root {
        let mut catalog_dir = root.join("catalog");
        if let Some(id) = fork_id {
            catalog_dir = catalog_dir.join("forks").join(id.to_string());
        }
        let manifest_file = catalog_dir.join("manifests").join(format!("{snapshot_id}.json"));
        // Manifest before pointer; presumably so the pointer never becomes
        // durable ahead of the manifest it names.
        for artifact in [manifest_file, catalog_dir.join("latest")] {
            crate::runtime::wal::sync_file_and_parent(&artifact)?;
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With no local root there are no local files, so the call is a no-op.
    #[test]
    fn test_fsync_snapshot_pointer_noop_for_remote() {
        let outcome = fsync_snapshot_pointer(None, None, "snap-1");
        assert!(outcome.is_ok());
    }

    /// With a local root, the primary-catalog manifest and `latest` pointer
    /// are synced without error.
    #[test]
    fn test_fsync_snapshot_pointer_syncs_local_artifacts() {
        let tmp = tempfile::TempDir::new().unwrap();
        let catalog = tmp.path().join("catalog");
        let manifest_dir = catalog.join("manifests");
        std::fs::create_dir_all(&manifest_dir).unwrap();
        std::fs::write(manifest_dir.join("snap-1.json"), b"{}").unwrap();
        std::fs::write(catalog.join("latest"), b"snap-1").unwrap();
        fsync_snapshot_pointer(Some(tmp.path()), None, "snap-1").unwrap();
    }
}