use std::sync::Arc;
use bytes::Bytes;
use object_store::path::Path as OsPath;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
use serde::{Deserialize, Serialize};
const MANIFEST_PATH: &str = "catalog/manifest.json";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CatalogManifestEntry {
pub name: String,
pub ddl: String,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CatalogManifest {
pub version: u64,
pub updated_at_ms: i64,
pub entries: Vec<CatalogManifestEntry>,
}
pub struct CatalogManifestStore {
store: Arc<dyn ObjectStore>,
}
impl std::fmt::Debug for CatalogManifestStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CatalogManifestStore")
.finish_non_exhaustive()
}
}
#[derive(Debug, thiserror::Error)]
pub enum CatalogManifestError {
#[error("object store I/O: {0}")]
Io(String),
#[error("JSON: {0}")]
Json(#[from] serde_json::Error),
}
impl CatalogManifestStore {
#[must_use]
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
fn path() -> OsPath {
OsPath::from(MANIFEST_PATH)
}
pub async fn load(&self) -> Result<Option<CatalogManifest>, CatalogManifestError> {
match self.store.get(&Self::path()).await {
Ok(res) => {
let bytes = res
.bytes()
.await
.map_err(|e| CatalogManifestError::Io(e.to_string()))?;
Ok(Some(serde_json::from_slice(&bytes)?))
}
Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(e) => Err(CatalogManifestError::Io(e.to_string())),
}
}
pub async fn save(&self, manifest: &CatalogManifest) -> Result<(), CatalogManifestError> {
let bytes = serde_json::to_vec_pretty(manifest)?;
self.store
.put(&Self::path(), PutPayload::from(Bytes::from(bytes)))
.await
.map_err(|e| CatalogManifestError::Io(e.to_string()))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::local::LocalFileSystem;
use tempfile::tempdir;
fn store_in(dir: &std::path::Path) -> CatalogManifestStore {
let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(dir).unwrap());
CatalogManifestStore::new(fs)
}
fn entry(name: &str, ddl: &str) -> CatalogManifestEntry {
CatalogManifestEntry {
name: name.to_string(),
ddl: ddl.to_string(),
}
}
#[tokio::test]
async fn load_missing_returns_none() {
let dir = tempdir().unwrap();
let s = store_in(dir.path());
assert!(s.load().await.unwrap().is_none());
}
#[tokio::test]
async fn save_then_load_preserves_order() {
let dir = tempdir().unwrap();
let s = store_in(dir.path());
let manifest = CatalogManifest {
version: 3,
updated_at_ms: 123,
entries: vec![
entry("src", "CREATE SOURCE src (k BIGINT)"),
entry("mv1", "CREATE MATERIALIZED VIEW mv1 AS SELECT k FROM src"),
entry("mv2", "CREATE MATERIALIZED VIEW mv2 AS SELECT k FROM mv1"),
],
};
s.save(&manifest).await.unwrap();
let loaded = s.load().await.unwrap().unwrap();
assert_eq!(loaded, manifest);
assert_eq!(loaded.entries[0].name, "src");
assert_eq!(loaded.entries[2].name, "mv2");
}
#[tokio::test]
async fn save_overwrites_in_place() {
let dir = tempdir().unwrap();
let s = store_in(dir.path());
s.save(&CatalogManifest {
version: 1,
updated_at_ms: 1,
entries: vec![entry("a", "CREATE SOURCE a (k BIGINT)")],
})
.await
.unwrap();
let v2 = CatalogManifest {
version: 2,
updated_at_ms: 2,
entries: vec![
entry("a", "CREATE SOURCE a (k BIGINT)"),
entry("b", "CREATE SOURCE b (k BIGINT)"),
],
};
s.save(&v2).await.unwrap();
assert_eq!(s.load().await.unwrap().unwrap(), v2);
}
}