use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, Int32Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray};
use chrono::Utc;
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;
use crate::warehouse::iceberg::{
append_batch, IcebergWarehouse, TABLE_TANTIVY_INDEX_BLOBS, TABLE_TANTIVY_INDEX_SNAPSHOTS,
};
/// Renders `bytes` as a lowercase hexadecimal string, two characters per byte
/// (high nibble first).
fn hex_encode(bytes: &[u8]) -> String {
    // A nibble is always in 0..16, so `from_digit` with radix 16 cannot fail.
    let nibble = |n: u8| char::from_digit(u32::from(n), 16).expect("nibble is always < 16");

    let mut encoded = String::with_capacity(bytes.len() * 2);
    encoded.extend(
        bytes
            .iter()
            .flat_map(|&byte| [nibble(byte >> 4), nibble(byte & 0x0f)]),
    );
    encoded
}
/// Summary of one index snapshot, as returned by `snapshot_to_iceberg` and
/// `restore_from_iceberg`.
#[derive(Debug, Clone)]
pub struct SnapshotRef {
/// Identifier of the snapshot; stored in the tables in its string form.
pub snapshot_id: Uuid,
/// Workspace label recorded with the snapshot.
pub workspace: String,
/// Repository name; used as a lookup key when resolving snapshots.
pub repo: String,
/// Git commit SHA the snapshot was taken at.
pub git_sha: String,
/// Branch name recorded with the snapshot.
pub branch: String,
/// Hex digest produced by `schema_hash_of`, i.e. a hash over every blob's
/// filename and content SHA-256 (not over a schema definition as such).
pub schema_hash: String,
/// Number of files captured in the snapshot.
pub blob_count: i32,
/// Sum of the byte lengths of all captured files.
pub total_bytes: i64,
}
/// One file read from the index directory, held fully in memory.
struct Blob {
/// Path relative to the scanned directory (lossily converted to UTF-8).
filename: String,
/// Complete file contents.
bytes: Vec<u8>,
/// Lowercase hex SHA-256 digest of `bytes`.
sha256: String,
}
/// Recursively reads every regular file under `dir` into memory.
///
/// Each file becomes a [`Blob`] carrying its path relative to `dir`, its raw
/// contents and the hex SHA-256 of those contents. The result is sorted by
/// filename so that callers get a deterministic order. A `dir` that does not
/// exist yields an empty list rather than an error. Entries that are neither
/// directories nor regular files (per `DirEntry::metadata`) are skipped.
///
/// # Errors
/// Fails if a directory cannot be listed, an entry's metadata cannot be
/// read, or a file cannot be read.
fn collect_blobs(dir: &Path) -> Result<Vec<Blob>> {
    let mut blobs: Vec<Blob> = Vec::new();
    if !dir.exists() {
        return Ok(blobs);
    }

    // Explicit work list instead of recursion; visit order is irrelevant
    // because the result is sorted before returning.
    let mut pending: Vec<PathBuf> = vec![dir.to_path_buf()];
    while let Some(current) = pending.pop() {
        let listing = std::fs::read_dir(&current)
            .with_context(|| format!("read_dir {}", current.display()))?;
        for item in listing {
            let item = item?;
            let item_path = item.path();
            let kind = item.metadata()?;

            if kind.is_dir() {
                pending.push(item_path);
                continue;
            }
            if !kind.is_file() {
                continue;
            }

            let contents = std::fs::read(&item_path)
                .with_context(|| format!("read {}", item_path.display()))?;

            let mut hasher = Sha256::new();
            hasher.update(&contents);
            let digest = hex_encode(&hasher.finalize());

            // Fall back to the full path if it is somehow not under `dir`.
            let relative = match item_path.strip_prefix(dir) {
                Ok(rel) => rel.to_string_lossy().into_owned(),
                Err(_) => item_path.to_string_lossy().into_owned(),
            };

            blobs.push(Blob {
                filename: relative,
                bytes: contents,
                sha256: digest,
            });
        }
    }

    blobs.sort_by(|lhs, rhs| lhs.filename.cmp(&rhs.filename));
    Ok(blobs)
}
/// Computes a single hex SHA-256 digest over a list of blobs.
///
/// Each blob contributes one line of the form `<filename>:<sha256>\n`, fed
/// to the hasher in slice order, so the digest depends on both the set of
/// files and their contents.
fn schema_hash_of(blobs: &[Blob]) -> String {
    let mut hasher = Sha256::new();
    for blob in blobs {
        // One formatted line per blob; the hasher is streaming, so this feeds
        // exactly the same byte sequence as separate updates would.
        let line = format!("{}:{}\n", blob.filename, blob.sha256);
        hasher.update(line.as_bytes());
    }
    let digest = hasher.finalize();
    hex_encode(&digest)
}
/// Looks for an already-stored snapshot matching `repo`, `git_sha` and
/// `schema_hash`.
///
/// Performs a full scan of the snapshots table and returns the id of the
/// first matching row, or `None` when there is no match. Columns are read by
/// position: 0 = snapshot id, 2 = repo, 3 = git sha, 6 = schema hash, which
/// is the order `snapshot_to_iceberg` writes them in.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, if one of the columns is
/// not a `StringArray`, or if the matching row's id is not a valid UUID.
async fn find_existing_snapshot(
    wh: &IcebergWarehouse,
    repo: &str,
    git_sha: &str,
    schema_hash: &str,
) -> Result<Option<Uuid>> {
    let ident = wh.table_ident(TABLE_TANTIVY_INDEX_SNAPSHOTS);
    let table = wh.catalog().load_table(&ident).await?;
    let stream = table.scan().build()?.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    for batch in &batches {
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("snapshot_id col not String"))?;
        let repos = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("repo col not String"))?;
        let shas = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("git_sha col not String"))?;
        let hashes = batch
            .column(6)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("schema_hash col not String"))?;

        let hit = (0..batch.num_rows()).find(|&row| {
            repos.value(row) == repo
                && shas.value(row) == git_sha
                && hashes.value(row) == schema_hash
        });
        if let Some(row) = hit {
            let id = Uuid::parse_str(ids.value(row))?;
            return Ok(Some(id));
        }
    }

    Ok(None)
}
/// Stores every file under `index_dir` as a snapshot in the Iceberg tables.
///
/// The files are read into memory, hashed, and written as one row per file to
/// the blobs table plus one summary row to the snapshots table. If a snapshot
/// with the same `repo`, `git_sha` and content hash already exists, nothing is
/// written and a [`SnapshotRef`] carrying the existing id is returned.
///
/// Blob rows are appended *before* the snapshot row. The snapshot row is what
/// the dedup lookup and restore resolve against, so it acts as the commit
/// marker: a failure while writing blobs leaves only unreferenced blob rows
/// behind instead of a snapshot row that dedup would keep returning but that
/// can never be restored.
///
/// # Errors
/// Fails if `index_dir` contains no files, if a file cannot be read, if the
/// file count or any single file size does not fit the `Int32` columns, or if
/// any table load / append fails.
pub fn snapshot_to_iceberg(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    branch: &str,
    index_dir: &Path,
) -> Result<SnapshotRef> {
    let blobs = collect_blobs(index_dir)
        .with_context(|| format!("walk index dir {}", index_dir.display()))?;
    if blobs.is_empty() {
        return Err(anyhow!(
            "no files under {} — nothing to snapshot",
            index_dir.display()
        ));
    }
    let schema_hash = schema_hash_of(&blobs);

    // Validate everything that has to fit an Int32 column up front, before any
    // table is touched; a plain `as i32` would silently wrap.
    let blob_count = i32::try_from(blobs.len()).with_context(|| {
        format!(
            "{} files under {} exceed the Int32 blob_count column",
            blobs.len(),
            index_dir.display()
        )
    })?;
    let lens = blobs
        .iter()
        .map(|b| {
            i32::try_from(b.bytes.len()).with_context(|| {
                format!(
                    "blob `{}` is {} bytes, too large for the Int32 length column",
                    b.filename,
                    b.bytes.len()
                )
            })
        })
        .collect::<Result<Vec<i32>>>()?;
    // Each length is at most i32::MAX and there are at most i32::MAX of them,
    // so the i64 sum cannot overflow.
    let total_bytes: i64 = lens.iter().map(|&n| i64::from(n)).sum();

    wh.block_on(async {
        if let Some(existing) = find_existing_snapshot(wh, repo, git_sha, &schema_hash).await? {
            return Ok(SnapshotRef {
                snapshot_id: existing,
                workspace: workspace.to_string(),
                repo: repo.to_string(),
                git_sha: git_sha.to_string(),
                branch: branch.to_string(),
                schema_hash,
                blob_count,
                total_bytes,
            });
        }

        let snapshot_id = Uuid::new_v4();
        let id_str = snapshot_id.to_string();
        let ts = Utc::now();

        // 1. Blob rows: (snapshot id, filename, payload, length, sha256).
        let b_table = wh
            .catalog()
            .load_table(&wh.table_ident(TABLE_TANTIVY_INDEX_BLOBS))
            .await?;
        let b_schema = Arc::new(schema_to_arrow_schema(b_table.metadata().current_schema())?);
        let ids: Vec<&str> = vec![id_str.as_str(); blobs.len()];
        let names: Vec<&str> = blobs.iter().map(|b| b.filename.as_str()).collect();
        let payloads: Vec<&[u8]> = blobs.iter().map(|b| b.bytes.as_slice()).collect();
        let hashes: Vec<&str> = blobs.iter().map(|b| b.sha256.as_str()).collect();
        let b_cols: Vec<Arc<dyn Array>> = vec![
            Arc::new(StringArray::from(ids)),
            Arc::new(StringArray::from(names)),
            Arc::new(LargeBinaryArray::from(payloads)),
            Arc::new(Int32Array::from(lens)),
            Arc::new(StringArray::from(hashes)),
        ];
        let b_batch = RecordBatch::try_new(b_schema, b_cols)?;
        append_batch(wh.catalog(), b_table, b_batch).await?;

        // 2. Snapshot row, written last so it only exists once its blobs do.
        let s_table = wh
            .catalog()
            .load_table(&wh.table_ident(TABLE_TANTIVY_INDEX_SNAPSHOTS))
            .await?;
        let s_schema = Arc::new(schema_to_arrow_schema(s_table.metadata().current_schema())?);
        let s_cols: Vec<Arc<dyn Array>> = vec![
            Arc::new(StringArray::from(vec![id_str.as_str()])),
            Arc::new(StringArray::from(vec![workspace])),
            Arc::new(StringArray::from(vec![repo])),
            Arc::new(StringArray::from(vec![git_sha])),
            Arc::new(StringArray::from(vec![branch])),
            Arc::new(
                TimestampMicrosecondArray::from(vec![ts.timestamp_micros()])
                    .with_timezone("+00:00"),
            ),
            Arc::new(StringArray::from(vec![schema_hash.as_str()])),
            Arc::new(Int32Array::from(vec![blob_count])),
            Arc::new(Int64Array::from(vec![total_bytes])),
        ];
        let s_batch = RecordBatch::try_new(s_schema, s_cols)?;
        append_batch(wh.catalog(), s_table, s_batch).await?;

        Ok(SnapshotRef {
            snapshot_id,
            workspace: workspace.to_string(),
            repo: repo.to_string(),
            git_sha: git_sha.to_string(),
            branch: branch.to_string(),
            schema_hash,
            blob_count,
            total_bytes,
        })
    })
}
/// Picks the most recent snapshot of `repo`, optionally restricted to one
/// commit.
///
/// Scans the whole snapshots table, keeps rows whose repo column equals
/// `repo` (and whose git sha equals `git_sha` when one is given) and returns
/// the row with the greatest timestamp as
/// `(snapshot id, git sha, "ts_micros=<timestamp>")`. When several rows share
/// the greatest timestamp, the one encountered last in scan order wins.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, if a column has an
/// unexpected Arrow type, if a matching row's id is not a valid UUID, or if
/// no row matches.
async fn resolve_snapshot_id(
    wh: &IcebergWarehouse,
    repo: &str,
    git_sha: Option<&str>,
) -> Result<(Uuid, String, String)> {
    let ident = wh.table_ident(TABLE_TANTIVY_INDEX_SNAPSHOTS);
    let table = wh.catalog().load_table(&ident).await?;
    let stream = table.scan().build()?.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Running maximum instead of collect-sort-pop.
    let mut newest: Option<(Uuid, String, i64)> = None;
    for batch in &batches {
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("snapshot_id not String"))?;
        let repos = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("repo not String"))?;
        let shas = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("git_sha not String"))?;
        let ts = batch
            .column(5)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| anyhow!("ts not Timestamptz"))?;

        for row in 0..batch.num_rows() {
            let repo_matches = repos.value(row) == repo;
            let sha_matches = git_sha.map_or(true, |want| shas.value(row) == want);
            if !(repo_matches && sha_matches) {
                continue;
            }

            // Every matching row's id is parsed, so a malformed id is
            // reported even when that row would not end up being the newest.
            let id = Uuid::parse_str(ids.value(row))?;
            let stamp = ts.value(row);

            // `>=` lets a later row replace an earlier one with an equal
            // timestamp, i.e. ties go to the row seen last.
            let replaces = newest.as_ref().map_or(true, |current| stamp >= current.2);
            if replaces {
                newest = Some((id, shas.value(row).to_string(), stamp));
            }
        }
    }

    match newest {
        Some((id, sha, stamp)) => Ok((id, sha, format!("ts_micros={}", stamp))),
        None => {
            let at_sha = git_sha
                .map(|s| format!(" at sha {s}"))
                .unwrap_or_default();
            Err(anyhow!(
                "no tantivy_index_snapshot for repo `{repo}`{}",
                at_sha
            ))
        }
    }
}
pub fn restore_from_iceberg(
wh: &IcebergWarehouse,
repo: &str,
git_sha: Option<&str>,
into: &Path,
) -> Result<SnapshotRef> {
std::fs::create_dir_all(into)
.with_context(|| format!("create restore dir {}", into.display()))?;
wh.block_on(async {
let (snapshot_id, sha, _) = resolve_snapshot_id(wh, repo, git_sha).await?;
let s_table = wh
.catalog()
.load_table(&wh.table_ident(TABLE_TANTIVY_INDEX_SNAPSHOTS))
.await?;
let scan = s_table.scan().build()?;
let stream = scan.to_arrow().await?;
let s_batches: Vec<RecordBatch> = stream.try_collect().await?;
let id_str = snapshot_id.to_string();
let mut meta: Option<SnapshotRef> = None;
'outer: for batch in &s_batches {
let ids = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let wss = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let reps = batch.column(2).as_any().downcast_ref::<StringArray>().unwrap();
let shas = batch.column(3).as_any().downcast_ref::<StringArray>().unwrap();
let brs = batch.column(4).as_any().downcast_ref::<StringArray>().unwrap();
let hashes = batch.column(6).as_any().downcast_ref::<StringArray>().unwrap();
let counts = batch.column(7).as_any().downcast_ref::<Int32Array>().unwrap();
let totals = batch.column(8).as_any().downcast_ref::<Int64Array>().unwrap();
for i in 0..batch.num_rows() {
if ids.value(i) == id_str {
meta = Some(SnapshotRef {
snapshot_id,
workspace: wss.value(i).to_string(),
repo: reps.value(i).to_string(),
git_sha: shas.value(i).to_string(),
branch: brs.value(i).to_string(),
schema_hash: hashes.value(i).to_string(),
blob_count: counts.value(i),
total_bytes: totals.value(i),
});
break 'outer;
}
}
}
let meta = meta.ok_or_else(|| anyhow!("snapshot {snapshot_id} metadata vanished"))?;
let b_table = wh
.catalog()
.load_table(&wh.table_ident(TABLE_TANTIVY_INDEX_BLOBS))
.await?;
let scan = b_table.scan().build()?;
let stream = scan.to_arrow().await?;
let b_batches: Vec<RecordBatch> = stream.try_collect().await?;
let mut written = 0usize;
for batch in &b_batches {
let ids = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let names = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let bytes = batch.column(2).as_any().downcast_ref::<LargeBinaryArray>().unwrap();
for i in 0..batch.num_rows() {
if ids.value(i) != id_str {
continue;
}
let rel = names.value(i);
let path = into.join(rel);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
std::fs::write(&path, bytes.value(i))
.with_context(|| format!("write {}", path.display()))?;
written += 1;
}
}
if written == 0 {
return Err(anyhow!(
"snapshot {snapshot_id} for sha {sha} has 0 blob rows — table corrupt?"
));
}
Ok(meta)
})
}
// Integration-style tests: each one opens a real warehouse in a temporary
// directory and exercises the public snapshot/restore entry points.
#[cfg(test)]
mod tests {
use super::*;
use crate::warehouse::iceberg::IcebergWarehouse;
// Snapshot a small directory, check that a second snapshot of the same
// content is deduplicated, then restore it (by sha and as "latest") and
// compare the restored bytes with the originals.
#[test]
fn round_trip_snapshot_then_restore() {
let root = tempfile::tempdir().expect("tempdir");
let warehouse_dir = root.path().join("warehouse");
let src_dir = root.path().join("src_index");
let dst_dir = root.path().join("restored_index");
std::fs::create_dir_all(&src_dir).unwrap();
// Four files with distinct contents and sizes, including one that covers
// every possible byte value.
let payloads: Vec<(&str, Vec<u8>)> = vec![
("meta.json", br#"{"segments":[]}"#.to_vec()),
("00000000000000000000000000000001.term", (0u8..=255).collect()),
("00000000000000000000000000000001.idx", vec![0xDEu8; 4096]),
("00000000000000000000000000000001.pos", vec![0xABu8; 1024]),
];
for (name, bytes) in &payloads {
std::fs::write(src_dir.join(name), bytes).unwrap();
}
let wh = IcebergWarehouse::open(&warehouse_dir).expect("open warehouse");
let sha = "0000000000000000000000000000000000000000";
let snap1 = snapshot_to_iceberg(&wh, "ws_test", "repo_test", sha, "main", &src_dir)
.expect("snapshot1");
assert_eq!(snap1.blob_count, 4);
assert_eq!(snap1.total_bytes, payloads.iter().map(|(_, b)| b.len() as i64).sum::<i64>());
// Same repo, sha and content: the existing snapshot id must be returned.
let snap2 = snapshot_to_iceberg(&wh, "ws_test", "repo_test", sha, "main", &src_dir)
.expect("snapshot2");
assert_eq!(snap1.snapshot_id, snap2.snapshot_id, "dedup must return same UUID");
// Restore pinned to the sha and verify every file byte-for-byte.
let restored = restore_from_iceberg(&wh, "repo_test", Some(sha), &dst_dir)
.expect("restore");
assert_eq!(restored.snapshot_id, snap1.snapshot_id);
for (name, bytes) in &payloads {
let got = std::fs::read(dst_dir.join(name)).expect("read restored");
assert_eq!(&got, bytes, "blob `{name}` mismatch after restore");
}
// Restore without a sha: the latest snapshot is the only one stored.
let dst2 = root.path().join("restored_index2");
let restored2 = restore_from_iceberg(&wh, "repo_test", None, &dst2).expect("restore latest");
assert_eq!(restored2.snapshot_id, snap1.snapshot_id);
}
// Build an index cache, snapshot it, delete the cache and check that
// `Index::open_or_restore` brings it back once and skips the restore on the
// second call.
#[test]
fn open_or_restore_rehydrates_empty_cache() {
use crate::index::Index;
let root = tempfile::tempdir().expect("tempdir");
let workspace = root.path();
std::fs::create_dir_all(workspace.join(".nornir/cache/index")).unwrap();
let wh = IcebergWarehouse::open(&workspace.join(".nornir/warehouse"))
.expect("open warehouse");
// Scoped so the index handle is dropped before the cache directory is
// snapshotted and later removed.
{
let idx = Index::open(workspace).expect("open empty");
let _ = idx.build().expect("build empty");
}
// NOTE(review): `Index` is defined elsewhere; the assertion below is what
// establishes that open/build leaves a meta.json in the cache directory.
assert!(workspace.join(".nornir/cache/index/meta.json").exists());
let sha = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
let snap = snapshot_to_iceberg(&wh, "ws_t", "_workspace", sha, "main",
&workspace.join(".nornir/cache/index"))
.expect("snapshot");
assert!(snap.blob_count > 0);
// Simulate a cold cache.
std::fs::remove_dir_all(workspace.join(".nornir/cache/index")).unwrap();
assert!(!workspace.join(".nornir/cache/index/meta.json").exists());
let (_idx, restored) = Index::open_or_restore(workspace, &wh, "_workspace", None)
.expect("open_or_restore");
assert!(restored, "expected restore from iceberg");
assert!(workspace.join(".nornir/cache/index/meta.json").exists(),
"meta.json must be back after restore");
let (_idx, restored2) = Index::open_or_restore(workspace, &wh, "_workspace", None)
.expect("open_or_restore second");
assert!(!restored2, "second call must skip restore (cache hot)");
}
}