use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, Int32Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray};
use chrono::Utc;
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;
use crate::warehouse::iceberg::{
append_batch, IcebergWarehouse, TABLE_TANTIVY_INDEX_BLOBS, TABLE_TANTIVY_INDEX_SNAPSHOTS,
};
/// Fetches the column called `name` from `batch` and downcasts it to the concrete arrow
/// array type `T`.
///
/// # Errors
/// Returns an error when `batch` has no column with that name, or when the column exists
/// but is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let column = match batch.column_by_name(name) {
        Some(column) => column,
        None => return Err(anyhow!("projected batch missing column `{name}`")),
    };
    match column.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected arrow type")),
    }
}
/// Renders `bytes` as a lowercase hexadecimal string, two digits per input byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    encoded.extend(bytes.iter().flat_map(|&byte| {
        // High nibble first, then low nibble.
        let hi = DIGITS[usize::from(byte >> 4)];
        let lo = DIGITS[usize::from(byte & 0x0f)];
        [char::from(hi), char::from(lo)]
    }));
    encoded
}
/// Identity and summary of one stored index snapshot, as returned by the snapshot and
/// restore functions in this file.
#[derive(Debug, Clone)]
pub struct SnapshotRef {
/// UUID identifying the snapshot; blob rows are tagged with its string form.
pub snapshot_id: Uuid,
/// Workspace name recorded with the snapshot.
pub workspace: String,
/// Repository name; used as a lookup key when resolving a snapshot.
pub repo: String,
/// Git commit SHA the snapshot was taken at.
pub git_sha: String,
/// Branch name recorded with the snapshot.
pub branch: String,
/// Hex digest over the snapshot's file names and per-file SHA-256 values.
pub schema_hash: String,
/// Number of files (blobs) in the snapshot.
pub blob_count: i32,
/// Sum of the byte lengths of all blobs in the snapshot.
pub total_bytes: i64,
}
/// One file read from an index directory, held fully in memory.
struct Blob {
/// Path of the file relative to the directory that was walked, as a (lossy) string.
filename: String,
/// Complete file contents.
bytes: Vec<u8>,
/// Lowercase hex SHA-256 of `bytes`.
sha256: String,
}
/// Recursively reads every regular file under `dir` into memory.
///
/// Each returned [`Blob`] holds the file's path relative to `dir` (lossily converted to a
/// string), its full contents, and the lowercase-hex SHA-256 of those contents. The result
/// is sorted by relative path so the order is deterministic. A `dir` that does not exist
/// yields an empty list.
///
/// # Errors
/// Fails if a directory cannot be listed, an entry or its metadata cannot be read, or a
/// file cannot be read. Every error names the path involved.
fn collect_blobs(dir: &Path) -> Result<Vec<Blob>> {
    let mut out = Vec::new();
    if !dir.exists() {
        return Ok(out);
    }
    // Depth-first walk with an explicit stack instead of recursion.
    let mut pending: Vec<PathBuf> = vec![dir.to_path_buf()];
    while let Some(cur) = pending.pop() {
        let entries = std::fs::read_dir(&cur)
            .with_context(|| format!("read_dir {}", cur.display()))?;
        for entry in entries {
            let entry =
                entry.with_context(|| format!("read directory entry in {}", cur.display()))?;
            let path = entry.path();
            // `DirEntry::metadata` does not traverse symlinks, so a symlink matches neither
            // branch below and is skipped.
            let meta = entry
                .metadata()
                .with_context(|| format!("read metadata of {}", path.display()))?;
            if meta.is_dir() {
                pending.push(path);
            } else if meta.is_file() {
                let bytes = std::fs::read(&path)
                    .with_context(|| format!("read {}", path.display()))?;
                let mut h = Sha256::new();
                h.update(&bytes);
                let sha256 = hex_encode(&h.finalize());
                // Store the path relative to the walked root; fall back to the full path if
                // the prefix unexpectedly does not match.
                let filename = path
                    .strip_prefix(dir)
                    .unwrap_or(&path)
                    .to_string_lossy()
                    .into_owned();
                out.push(Blob { filename, bytes, sha256 });
            }
        }
    }
    out.sort_by(|a, b| a.filename.cmp(&b.filename));
    Ok(out)
}
/// Computes a single hex SHA-256 fingerprint for a set of blobs.
///
/// For each blob, in slice order, the digest is fed `<filename>:<sha256>\n`, so the result
/// depends on the file names, their content hashes, and the order of `blobs`.
fn schema_hash_of(blobs: &[Blob]) -> String {
    let digest = blobs
        .iter()
        .fold(Sha256::new(), |mut hasher, blob| {
            hasher.update(blob.filename.as_bytes());
            hasher.update(b":");
            hasher.update(blob.sha256.as_bytes());
            hasher.update(b"\n");
            hasher
        })
        .finalize();
    hex_encode(&digest)
}
/// Looks in `snap_table` for a snapshot already recorded for this `repo`, `git_sha` and
/// `schema_hash`.
///
/// The scan is filtered on all three values, and each returned row is compared against
/// them again before it is accepted.
///
/// Returns the `snapshot_id` of the first matching row, or `None` when no row matches.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, if a projected column is missing or is
/// not a string column, or if the stored `snapshot_id` is not a valid UUID.
async fn find_existing_snapshot(
    wh: &IcebergWarehouse,
    snap_table: &str,
    repo: &str,
    git_sha: &str,
    schema_hash: &str,
) -> Result<Option<Uuid>> {
    let ident = wh.table_ident(snap_table);
    let table = wh.catalog().load_table(&ident).await?;

    let by_repo = Reference::new("repo").equal_to(Datum::string(repo));
    let by_sha = Reference::new("git_sha").equal_to(Datum::string(git_sha));
    let by_hash = Reference::new("schema_hash").equal_to(Datum::string(schema_hash));
    let predicate = by_repo.and(by_sha).and(by_hash);

    let scan = table
        .scan()
        .with_filter(predicate)
        .select(["snapshot_id", "repo", "git_sha", "schema_hash"])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    for batch in &batches {
        let ids = col::<StringArray>(batch, "snapshot_id")?;
        let repos = col::<StringArray>(batch, "repo")?;
        let shas = col::<StringArray>(batch, "git_sha")?;
        let hashes = col::<StringArray>(batch, "schema_hash")?;

        // Re-check every row rather than trusting the scan filter alone.
        let hit = (0..batch.num_rows()).find(|&row| {
            repos.value(row) == repo
                && shas.value(row) == git_sha
                && hashes.value(row) == schema_hash
        });
        if let Some(row) = hit {
            let id = Uuid::parse_str(ids.value(row))?;
            return Ok(Some(id));
        }
    }
    Ok(None)
}
/// Snapshots the files under `index_dir` into the default Tantivy index tables.
///
/// Delegates to `snapshot_dir_to_iceberg` with `TABLE_TANTIVY_INDEX_SNAPSHOTS` as the
/// snapshot table and `TABLE_TANTIVY_INDEX_BLOBS` as the blob table; all other arguments
/// are passed through unchanged, and its result or error is returned as-is.
pub fn snapshot_to_iceberg(
wh: &IcebergWarehouse,
workspace: &str,
repo: &str,
git_sha: &str,
branch: &str,
index_dir: &Path,
) -> Result<SnapshotRef> {
snapshot_dir_to_iceberg(
wh,
TABLE_TANTIVY_INDEX_SNAPSHOTS,
TABLE_TANTIVY_INDEX_BLOBS,
workspace,
repo,
git_sha,
branch,
index_dir,
)
}
/// Stores every file under `index_dir` as one snapshot in the given Iceberg tables.
///
/// All files are read into memory and fingerprinted. If `snap_table` already holds a row
/// with the same `repo`, `git_sha` and fingerprint, nothing is written and that snapshot's
/// id is returned. Otherwise a fresh UUID is generated, one row per file is appended to
/// `blob_table`, and then a single summary row is appended to `snap_table`.
///
/// The blob rows are written *before* the summary row. The summary row is what lookups
/// and restores resolve first, so it acts as the commit marker: if the blob append fails,
/// no snapshot row exists that points at missing blobs.
///
/// The returned [`SnapshotRef`] carries the `workspace` and `branch` passed in, plus the
/// blob count and total size computed from `index_dir`.
///
/// # Errors
/// Fails if `index_dir` cannot be walked or holds no files, if the number of files or the
/// size of any single file does not fit in an `i32` (the width of the columns written
/// here), or if any table load, schema conversion, batch construction or append fails.
// The argument list mirrors the columns written plus the two table names.
#[allow(clippy::too_many_arguments)]
pub fn snapshot_dir_to_iceberg(
    wh: &IcebergWarehouse,
    snap_table: &str,
    blob_table: &str,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    branch: &str,
    index_dir: &Path,
) -> Result<SnapshotRef> {
    let blobs = collect_blobs(index_dir)
        .with_context(|| format!("walk index dir {}", index_dir.display()))?;
    if blobs.is_empty() {
        return Err(anyhow!(
            "no files under {} — nothing to snapshot",
            index_dir.display()
        ));
    }
    let schema_hash = schema_hash_of(&blobs);

    // Checked conversions: an `as i32` cast would silently wrap and store a wrong value.
    // Both are validated up front so nothing is written when they do not fit.
    let blob_count = i32::try_from(blobs.len()).with_context(|| {
        format!("{} files do not fit the Int32 blob count column", blobs.len())
    })?;
    let blob_lens = blobs
        .iter()
        .map(|b| {
            i32::try_from(b.bytes.len()).with_context(|| {
                format!(
                    "blob `{}` is {} bytes, too large for the Int32 length column",
                    b.filename,
                    b.bytes.len()
                )
            })
        })
        .collect::<Result<Vec<i32>>>()?;
    let total_bytes: i64 = blob_lens.iter().map(|&len| i64::from(len)).sum();

    wh.block_on(async {
        let existing =
            find_existing_snapshot(wh, snap_table, repo, git_sha, &schema_hash).await?;
        let snapshot_id = match existing {
            // Identical content was already snapshotted for this repo and sha.
            Some(id) => id,
            None => {
                let id = Uuid::new_v4();
                let id_str = id.to_string();
                let ts = Utc::now();

                // 1. Blob rows: one per file, all tagged with the new snapshot id.
                let b_table = wh
                    .catalog()
                    .load_table(&wh.table_ident(blob_table))
                    .await?;
                let b_schema =
                    Arc::new(schema_to_arrow_schema(b_table.metadata().current_schema())?);
                let ids: Vec<&str> = vec![id_str.as_str(); blobs.len()];
                let names: Vec<&str> = blobs.iter().map(|b| b.filename.as_str()).collect();
                let payloads: Vec<&[u8]> = blobs.iter().map(|b| b.bytes.as_slice()).collect();
                let hashes: Vec<&str> = blobs.iter().map(|b| b.sha256.as_str()).collect();
                let b_cols: Vec<Arc<dyn Array>> = vec![
                    Arc::new(StringArray::from(ids)),
                    Arc::new(StringArray::from(names)),
                    Arc::new(LargeBinaryArray::from(payloads)),
                    Arc::new(Int32Array::from(blob_lens)),
                    Arc::new(StringArray::from(hashes)),
                ];
                let b_batch = RecordBatch::try_new(b_schema, b_cols)?;
                append_batch(wh.catalog(), b_table, b_batch).await?;

                // 2. Summary row, written last so it only exists once the blobs do.
                let s_table = wh
                    .catalog()
                    .load_table(&wh.table_ident(snap_table))
                    .await?;
                let s_schema =
                    Arc::new(schema_to_arrow_schema(s_table.metadata().current_schema())?);
                let s_cols: Vec<Arc<dyn Array>> = vec![
                    Arc::new(StringArray::from(vec![id_str.as_str()])),
                    Arc::new(StringArray::from(vec![workspace])),
                    Arc::new(StringArray::from(vec![repo])),
                    Arc::new(StringArray::from(vec![git_sha])),
                    Arc::new(StringArray::from(vec![branch])),
                    Arc::new(
                        TimestampMicrosecondArray::from(vec![ts.timestamp_micros()])
                            .with_timezone("+00:00"),
                    ),
                    Arc::new(StringArray::from(vec![schema_hash.as_str()])),
                    Arc::new(Int32Array::from(vec![blob_count])),
                    Arc::new(Int64Array::from(vec![total_bytes])),
                ];
                let s_batch = RecordBatch::try_new(s_schema, s_cols)?;
                append_batch(wh.catalog(), s_table, s_batch).await?;

                id
            }
        };

        Ok(SnapshotRef {
            snapshot_id,
            workspace: workspace.to_string(),
            repo: repo.to_string(),
            git_sha: git_sha.to_string(),
            branch: branch.to_string(),
            schema_hash,
            blob_count,
            total_bytes,
        })
    })
}
/// Picks the snapshot to restore for `repo`, optionally pinned to one `git_sha`.
///
/// Scans `snap_table` filtered on `repo` (and on `git_sha` when given), re-checks each
/// returned row against those values, and chooses the row with the largest `ts_micros`.
///
/// Returns `(snapshot_id, git_sha, "ts_micros=<value>")` for the chosen row.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, if a projected column is missing or has
/// an unexpected type, if a matching row's `snapshot_id` is not a valid UUID, or if no row
/// matches at all.
async fn resolve_snapshot_id(
    wh: &IcebergWarehouse,
    snap_table: &str,
    repo: &str,
    git_sha: Option<&str>,
) -> Result<(Uuid, String, String)> {
    let ident = wh.table_ident(snap_table);
    let table = wh.catalog().load_table(&ident).await?;

    let by_repo = Reference::new("repo").equal_to(Datum::string(repo));
    let predicate = match git_sha {
        Some(want) => by_repo.and(Reference::new("git_sha").equal_to(Datum::string(want))),
        None => by_repo,
    };

    let scan = table
        .scan()
        .with_filter(predicate)
        .select(["snapshot_id", "repo", "git_sha", "ts_micros"])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    // (snapshot id, git sha, timestamp in microseconds) for every matching row.
    let mut matches: Vec<(Uuid, String, i64)> = Vec::new();
    for batch in &batches {
        let ids = col::<StringArray>(batch, "snapshot_id")?;
        let repos = col::<StringArray>(batch, "repo")?;
        let shas = col::<StringArray>(batch, "git_sha")?;
        let stamps = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        for row in 0..batch.num_rows() {
            let row_sha = shas.value(row);
            let repo_ok = repos.value(row) == repo;
            let sha_ok = git_sha.map_or(true, |want| row_sha == want);
            if repo_ok && sha_ok {
                matches.push((
                    Uuid::parse_str(ids.value(row))?,
                    row_sha.to_string(),
                    stamps.value(row),
                ));
            }
        }
    }

    // `max_by_key` returns the last of several equal maxima, i.e. the same row a stable
    // ascending sort followed by `pop` would yield.
    match matches.into_iter().max_by_key(|candidate| candidate.2) {
        Some((id, sha, micros)) => Ok((id, sha, format!("ts_micros={}", micros))),
        None => Err(anyhow!(
            "no tantivy_index_snapshot for repo `{repo}`{}",
            git_sha
                .map(|s| format!(" at sha {s}"))
                .unwrap_or_default()
        )),
    }
}
/// Restores a snapshot of `repo` from the default Tantivy index tables into `into`.
///
/// Delegates to `restore_dir_from_iceberg` with `TABLE_TANTIVY_INDEX_SNAPSHOTS` as the
/// snapshot table and `TABLE_TANTIVY_INDEX_BLOBS` as the blob table; `repo`, `git_sha`
/// and `into` are passed through unchanged, and its result or error is returned as-is.
pub fn restore_from_iceberg(
wh: &IcebergWarehouse,
repo: &str,
git_sha: Option<&str>,
into: &Path,
) -> Result<SnapshotRef> {
restore_dir_from_iceberg(
wh,
TABLE_TANTIVY_INDEX_SNAPSHOTS,
TABLE_TANTIVY_INDEX_BLOBS,
repo,
git_sha,
into,
)
}
/// Restores one stored snapshot of `repo` from the given tables into the directory `into`.
///
/// `into` is created if needed. The snapshot is chosen by `resolve_snapshot_id`: the row
/// for `repo` (and `git_sha`, when given) with the largest `ts_micros`. Its metadata row is
/// read from `snap_table`, then every blob row in `blob_table` carrying that snapshot id is
/// written to `into/<filename>`, creating parent directories as required.
///
/// Stored filenames are treated as untrusted: a name that is empty, absolute, or contains
/// anything other than plain path components (for example `..`) is rejected, so a restore
/// never writes outside `into`.
///
/// Returns the [`SnapshotRef`] rebuilt from the stored metadata row.
///
/// # Errors
/// Fails if `into` or a parent directory cannot be created, if no snapshot matches, if a
/// table cannot be loaded or scanned, if an expected column is missing or mistyped, if the
/// metadata row is not found, if a filename is unsafe, if a file cannot be written, or if
/// the snapshot has no blob rows. Files written before a failure are left in place.
pub fn restore_dir_from_iceberg(
    wh: &IcebergWarehouse,
    snap_table: &str,
    blob_table: &str,
    repo: &str,
    git_sha: Option<&str>,
    into: &Path,
) -> Result<SnapshotRef> {
    std::fs::create_dir_all(into)
        .with_context(|| format!("create restore dir {}", into.display()))?;
    wh.block_on(async {
        let (snapshot_id, sha, _) = resolve_snapshot_id(wh, snap_table, repo, git_sha).await?;
        let id_str = snapshot_id.to_string();

        // Fetch the full metadata row for the chosen snapshot.
        let s_table = wh
            .catalog()
            .load_table(&wh.table_ident(snap_table))
            .await?;
        let scan = s_table
            .scan()
            .with_filter(Reference::new("snapshot_id").equal_to(Datum::string(id_str.clone())))
            .build()?;
        let stream = scan.to_arrow().await?;
        let s_batches: Vec<RecordBatch> = stream.try_collect().await?;
        let mut meta: Option<SnapshotRef> = None;
        for batch in &s_batches {
            let ids = col::<StringArray>(batch, "snapshot_id")?;
            let wss = col::<StringArray>(batch, "workspace")?;
            let reps = col::<StringArray>(batch, "repo")?;
            let shas = col::<StringArray>(batch, "git_sha")?;
            let brs = col::<StringArray>(batch, "branch")?;
            let hashes = col::<StringArray>(batch, "schema_hash")?;
            let counts = col::<Int32Array>(batch, "blob_count")?;
            let totals = col::<Int64Array>(batch, "total_bytes")?;
            // Re-check the id on each row rather than trusting the scan filter alone.
            if let Some(i) = (0..batch.num_rows()).find(|&i| ids.value(i) == id_str) {
                meta = Some(SnapshotRef {
                    snapshot_id,
                    workspace: wss.value(i).to_string(),
                    repo: reps.value(i).to_string(),
                    git_sha: shas.value(i).to_string(),
                    branch: brs.value(i).to_string(),
                    schema_hash: hashes.value(i).to_string(),
                    blob_count: counts.value(i),
                    total_bytes: totals.value(i),
                });
                break;
            }
        }
        let meta = meta.ok_or_else(|| anyhow!("snapshot {snapshot_id} metadata vanished"))?;

        // Fetch and materialise the blobs belonging to that snapshot.
        let b_table = wh
            .catalog()
            .load_table(&wh.table_ident(blob_table))
            .await?;
        let scan = b_table
            .scan()
            .with_filter(Reference::new("snapshot_id").equal_to(Datum::string(id_str.clone())))
            .select(["snapshot_id", "filename", "bytes"])
            .build()?;
        let stream = scan.to_arrow().await?;
        let b_batches: Vec<RecordBatch> = stream.try_collect().await?;
        let mut written = 0usize;
        for batch in &b_batches {
            let ids = col::<StringArray>(batch, "snapshot_id")?;
            let names = col::<StringArray>(batch, "filename")?;
            let bytes = col::<LargeBinaryArray>(batch, "bytes")?;
            for i in 0..batch.num_rows() {
                if ids.value(i) != id_str {
                    continue;
                }
                let rel = names.value(i);
                // Only plain components are allowed: this rejects absolute paths, drive
                // prefixes, `.` and `..`, any of which could escape `into` when joined.
                let is_plain_relative = !rel.is_empty()
                    && Path::new(rel)
                        .components()
                        .all(|part| matches!(part, std::path::Component::Normal(_)));
                if !is_plain_relative {
                    return Err(anyhow!(
                        "snapshot {snapshot_id} has unsafe blob filename `{rel}`; refusing to write outside {}",
                        into.display()
                    ));
                }
                let path = into.join(rel);
                if let Some(parent) = path.parent() {
                    std::fs::create_dir_all(parent)
                        .with_context(|| format!("create dir {}", parent.display()))?;
                }
                std::fs::write(&path, bytes.value(i))
                    .with_context(|| format!("write {}", path.display()))?;
                written += 1;
            }
        }
        if written == 0 {
            return Err(anyhow!(
                "snapshot {snapshot_id} for sha {sha} has 0 blob rows — table corrupt?"
            ));
        }
        Ok(meta)
    })
}
// Integration-style tests: each one opens an `IcebergWarehouse` inside a temporary
// directory and exercises the public snapshot / restore functions end to end.
#[cfg(test)]
mod tests {
use super::*;
use crate::warehouse::iceberg::IcebergWarehouse;
/// Snapshots a small directory twice (expecting the same id both times), then restores it
/// by explicit sha and by "latest" and checks the restored bytes.
#[test]
fn round_trip_snapshot_then_restore() {
let root = tempfile::tempdir().expect("tempdir");
let warehouse_dir = root.path().join("warehouse");
let src_dir = root.path().join("src_index");
let dst_dir = root.path().join("restored_index");
std::fs::create_dir_all(&src_dir).unwrap();
// Four files with distinct sizes and byte patterns.
let payloads: Vec<(&str, Vec<u8>)> = vec![
("meta.json", br#"{"segments":[]}"#.to_vec()),
("00000000000000000000000000000001.term", (0u8..=255).collect()),
("00000000000000000000000000000001.idx", vec![0xDEu8; 4096]),
("00000000000000000000000000000001.pos", vec![0xABu8; 1024]),
];
for (name, bytes) in &payloads {
std::fs::write(src_dir.join(name), bytes).unwrap();
}
let wh = IcebergWarehouse::open(&warehouse_dir).expect("open warehouse");
let sha = "0000000000000000000000000000000000000000";
let snap1 = snapshot_to_iceberg(&wh, "ws_test", "repo_test", sha, "main", &src_dir)
.expect("snapshot1");
assert_eq!(snap1.blob_count, 4);
assert_eq!(snap1.total_bytes, payloads.iter().map(|(_, b)| b.len() as i64).sum::<i64>());
// Same repo, sha and content again: the existing snapshot must be reused.
let snap2 = snapshot_to_iceberg(&wh, "ws_test", "repo_test", sha, "main", &src_dir)
.expect("snapshot2");
assert_eq!(snap1.snapshot_id, snap2.snapshot_id, "dedup must return same UUID");
// Restore pinned to the sha and compare every file byte for byte.
let restored = restore_from_iceberg(&wh, "repo_test", Some(sha), &dst_dir)
.expect("restore");
assert_eq!(restored.snapshot_id, snap1.snapshot_id);
for (name, bytes) in &payloads {
let got = std::fs::read(dst_dir.join(name)).expect("read restored");
assert_eq!(&got, bytes, "blob `{name}` mismatch after restore");
}
// Restore without a sha: the only snapshot is also the latest one.
let dst2 = root.path().join("restored_index2");
let restored2 = restore_from_iceberg(&wh, "repo_test", None, &dst2).expect("restore latest");
assert_eq!(restored2.snapshot_id, snap1.snapshot_id);
}
/// Stores three snapshots across two repos and checks that each restore writes only the
/// files of the snapshot it selected.
#[test]
fn restore_selects_correct_snapshot_among_many() {
let root = tempfile::tempdir().expect("tempdir");
let wh = IcebergWarehouse::open(&root.path().join("warehouse")).expect("open");
// Creates `dir` holding `<marker>.seg` (512 bytes) and a `meta.json` naming the marker.
let mk = |dir: &Path, marker: &str| {
std::fs::create_dir_all(dir).unwrap();
std::fs::write(dir.join(format!("{marker}.seg")), vec![marker.as_bytes()[0]; 512]).unwrap();
std::fs::write(dir.join("meta.json"), format!(r#"{{"m":"{marker}"}}"#)).unwrap();
};
let sha_a1 = "1111111111111111111111111111111111111111";
let sha_a2 = "2222222222222222222222222222222222222222";
let sha_b1 = "3333333333333333333333333333333333333333";
let (da1, da2, db1) = (root.path().join("a1"), root.path().join("a2"), root.path().join("b1"));
mk(&da1, "a"); mk(&da2, "x"); mk(&db1, "b");
// Snapshot order matters: sha_a2 is written after sha_a1, so it is repo_a's latest.
let s_a1 = snapshot_to_iceberg(&wh, "ws", "repo_a", sha_a1, "main", &da1).unwrap();
let _s_a2 = snapshot_to_iceberg(&wh, "ws", "repo_a", sha_a2, "main", &da2).unwrap();
let s_b1 = snapshot_to_iceberg(&wh, "ws", "repo_b", sha_b1, "main", &db1).unwrap();
// Pinned restore of repo_a at sha_a1.
let out = root.path().join("out_a1");
let r = restore_from_iceberg(&wh, "repo_a", Some(sha_a1), &out).unwrap();
assert_eq!(r.snapshot_id, s_a1.snapshot_id);
assert!(out.join("a.seg").exists(), "expected a1's blob");
assert!(!out.join("x.seg").exists(), "must not bleed repo_a@sha_a2 blobs");
assert!(!out.join("b.seg").exists(), "must not bleed repo_b blobs");
// Latest restore of repo_b, which has a single snapshot.
let out_b = root.path().join("out_b");
let rb = restore_from_iceberg(&wh, "repo_b", None, &out_b).unwrap();
assert_eq!(rb.snapshot_id, s_b1.snapshot_id);
assert!(out_b.join("b.seg").exists());
assert!(!out_b.join("a.seg").exists());
assert!(!out_b.join("x.seg").exists());
// Latest restore of repo_a must pick sha_a2.
let out_a_latest = root.path().join("out_a_latest");
let r2 = restore_from_iceberg(&wh, "repo_a", None, &out_a_latest).unwrap();
assert_eq!(r2.git_sha, sha_a2, "latest repo_a snapshot is sha_a2");
assert!(out_a_latest.join("x.seg").exists());
assert!(!out_a_latest.join("a.seg").exists());
}
/// Builds an index cache, snapshots it, deletes the cache, and checks that
/// `Index::open_or_restore` brings it back once and reports no restore on the second call.
#[test]
fn open_or_restore_rehydrates_empty_cache() {
use crate::index::Index;
let root = tempfile::tempdir().expect("tempdir");
let workspace = root.path();
std::fs::create_dir_all(workspace.join(".nornir/cache/index")).unwrap();
let wh = IcebergWarehouse::open(&workspace.join(".nornir/warehouse"))
.expect("open warehouse");
// Scoped so the index handle is dropped before the cache directory is snapshotted.
{
let idx = Index::open(workspace).expect("open empty");
let _ = idx.build().expect("build empty");
}
assert!(workspace.join(".nornir/cache/index/meta.json").exists());
let sha = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
let snap = snapshot_to_iceberg(&wh, "ws_t", "_workspace", sha, "main",
&workspace.join(".nornir/cache/index"))
.expect("snapshot");
assert!(snap.blob_count > 0);
// Wipe the local cache so the next open has to restore it.
std::fs::remove_dir_all(workspace.join(".nornir/cache/index")).unwrap();
assert!(!workspace.join(".nornir/cache/index/meta.json").exists());
let (_idx, restored) = Index::open_or_restore(workspace, &wh, "_workspace", None)
.expect("open_or_restore");
assert!(restored, "expected restore from iceberg");
assert!(workspace.join(".nornir/cache/index/meta.json").exists(),
"meta.json must be back after restore");
// The cache is now present, so a second call must not restore again.
let (_idx, restored2) = Index::open_or_restore(workspace, &wh, "_workspace", None)
.expect("open_or_restore second");
assert!(!restored2, "second call must skip restore (cache hot)");
}
}