use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;
use crate::warehouse::iceberg::{append_batch, IcebergWarehouse, TABLE_DOC_EXPORTS};
/// One recorded export of a rendered document, as stored in the `TABLE_DOC_EXPORTS` table.
///
/// Returned both by `record_doc_export*` (for a newly written or de-duplicated row) and by
/// `list_doc_exports*` (rows read back from the table). The raw exported bytes are written to
/// the table but are not part of this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DocExport {
    /// Row identifier; new rows get a random UUIDv4 rendered as a string.
    pub export_id: String,
    /// Caller-supplied workspace name (not part of the de-duplication key).
    pub workspace: String,
    /// Repository the document belongs to.
    pub repo: String,
    /// Document name, e.g. `"README"`.
    pub doc_name: String,
    /// Caller-supplied version string, e.g. `"0.1.0"`.
    pub version: String,
    /// Output format label, e.g. `"pdf"` or `"html"`.
    pub format: String,
    /// Caller-supplied git revision string (presumably the commit the export was built from —
    /// NOTE(review): confirm with callers). Not part of the de-duplication key.
    pub git_sha: String,
    /// Lowercase hex SHA-256 digest of the exported bytes.
    pub sha256: String,
    /// Length of the exported bytes.
    pub byte_len: i64,
    /// Generation time as an RFC 3339 string in UTC.
    pub generated_at: String,
}
/// Optional constraints for `list_doc_exports*`.
///
/// Every `None` field means "no constraint"; `Default` therefore matches every export of a
/// repo. String fields are exact-equality matches.
#[derive(Debug, Default, Clone)]
pub struct ExportFilter {
    /// Only exports of this document name.
    pub doc_name: Option<String>,
    /// Only exports with this version string.
    pub version: Option<String>,
    /// Only exports in this format (e.g. `"pdf"`).
    pub format: Option<String>,
    /// Keep at most this many rows, applied after sorting newest-first.
    pub limit: Option<usize>,
}
/// Returns the SHA-256 digest of `bytes` as a 64-character lowercase hex string.
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let digest = Sha256::digest(bytes);
    digest
        .iter()
        .fold(String::with_capacity(digest.len() * 2), |mut hex, byte| {
            // Writing into a `String` cannot fail, so the `fmt::Result` is ignored.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
/// Fetches column `name` from `batch` and downcasts it to the concrete arrow array type `T`.
///
/// # Errors
/// Fails if the batch has no column called `name`, or if that column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("projected batch missing column `{name}`"));
    };
    array
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("column `{name}` has unexpected arrow type"))
}
/// Formats a UTC timestamp given in microseconds since the Unix epoch as RFC 3339.
///
/// NOTE(review): a value chrono cannot represent is replaced by the *current* time, which
/// silently fabricates a timestamp; consider surfacing an error instead.
fn rfc3339(ts_micros: i64) -> String {
    let instant = match DateTime::<Utc>::from_timestamp_micros(ts_micros) {
        Some(dt) => dt,
        None => Utc::now(),
    };
    instant.to_rfc3339()
}
pub async fn record_doc_export_async(
wh: &IcebergWarehouse,
workspace: &str,
repo: &str,
doc_name: &str,
version: &str,
format: &str,
git_sha: &str,
bytes: &[u8],
) -> Result<DocExport> {
let sha256 = sha256_hex(bytes);
if let Some(existing) =
find_existing(wh, repo, doc_name, version, format, &sha256).await?
{
return Ok(existing);
}
let export_id = Uuid::new_v4().to_string();
let ts = Utc::now();
let byte_len = bytes.len() as i64;
let table = wh
.catalog()
.load_table(&wh.table_ident(TABLE_DOC_EXPORTS))
.await?;
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![export_id.clone()])),
Arc::new(StringArray::from(vec![workspace.to_string()])),
Arc::new(StringArray::from(vec![repo.to_string()])),
Arc::new(StringArray::from(vec![doc_name.to_string()])),
Arc::new(StringArray::from(vec![version.to_string()])),
Arc::new(StringArray::from(vec![format.to_string()])),
Arc::new(StringArray::from(vec![git_sha.to_string()])),
Arc::new(
TimestampMicrosecondArray::from(vec![ts.timestamp_micros()])
.with_timezone("+00:00"),
),
Arc::new(StringArray::from(vec![sha256.clone()])),
Arc::new(Int64Array::from(vec![byte_len])),
Arc::new(LargeBinaryArray::from(vec![bytes])),
];
let batch = RecordBatch::try_new(schema, cols)?;
append_batch(wh.catalog(), table, batch).await?;
Ok(DocExport {
export_id,
workspace: workspace.to_string(),
repo: repo.to_string(),
doc_name: doc_name.to_string(),
version: version.to_string(),
format: format.to_string(),
git_sha: git_sha.to_string(),
sha256,
byte_len,
generated_at: ts.to_rfc3339(),
})
}
/// Blocking counterpart of [`record_doc_export_async`], driven by the warehouse's
/// `block_on`. Arguments, de-duplication behaviour and errors are the same.
pub fn record_doc_export(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    git_sha: &str,
    bytes: &[u8],
) -> Result<DocExport> {
    let pending =
        record_doc_export_async(wh, workspace, repo, doc_name, version, format, git_sha, bytes);
    wh.block_on(pending)
}
/// Looks up an already-recorded export with the same de-duplication key.
///
/// The key is `(repo, doc_name, version, format, sha256)`; `workspace` and `git_sha` are not
/// compared. Returns the first matching row encountered, or `None` if there is none.
///
/// Batches are pulled from the scan one at a time, so the search stops at the first hit
/// instead of materialising the whole result set first.
///
/// # Errors
/// Fails if the table cannot be loaded, the scan cannot be built or read, or a batch lacks
/// one of the projected columns or has an unexpected type.
async fn find_existing(
    wh: &IcebergWarehouse,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    sha256: &str,
) -> Result<Option<DocExport>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_DOC_EXPORTS))
        .await?;
    let predicate = Reference::new("repo")
        .equal_to(Datum::string(repo))
        .and(Reference::new("doc_name").equal_to(Datum::string(doc_name)))
        .and(Reference::new("version").equal_to(Datum::string(version)))
        .and(Reference::new("format").equal_to(Datum::string(format)))
        .and(Reference::new("sha256").equal_to(Datum::string(sha256)));
    let scan = table
        .scan()
        .with_filter(predicate)
        .select([
            "export_id", "workspace", "repo", "doc_name", "version", "format", "git_sha",
            "ts_micros", "sha256", "byte_len",
        ])
        .build()?;

    let mut batches = scan.to_arrow().await?;
    while let Some(batch) = batches.try_next().await? {
        // Re-check every key column in memory rather than trusting the scan predicate alone
        // (NOTE(review): the predicate may only prune files, not individual rows — confirm).
        let hit = rows_from_batch(&batch)?.into_iter().find(|r| {
            r.repo == repo
                && r.doc_name == doc_name
                && r.version == version
                && r.format == format
                && r.sha256 == sha256
        });
        if hit.is_some() {
            return Ok(hit);
        }
    }
    Ok(None)
}
/// Lists exports recorded for `repo`, newest first by stored timestamp.
///
/// Each `Some` field of `filter` adds an exact-equality constraint. These constraints are
/// passed to the Iceberg scan and also re-applied to every returned row.
/// `filter.limit`, when set, keeps only the first `limit` rows after sorting.
///
/// # Errors
/// Fails if the table cannot be loaded, the scan cannot be built or read, or a batch lacks
/// one of the projected columns or has an unexpected type.
pub async fn list_doc_exports_async(
    wh: &IcebergWarehouse,
    repo: &str,
    filter: &ExportFilter,
) -> Result<Vec<DocExport>> {
    let ident = wh.table_ident(TABLE_DOC_EXPORTS);
    let table = wh.catalog().load_table(&ident).await?;

    // `repo` is always constrained; each optional filter field adds one more equality.
    let optional_eqs = [
        ("doc_name", filter.doc_name.as_deref()),
        ("version", filter.version.as_deref()),
        ("format", filter.format.as_deref()),
    ];
    let mut predicate = Reference::new("repo").equal_to(Datum::string(repo));
    for (column, wanted) in optional_eqs {
        if let Some(value) = wanted {
            predicate = predicate.and(Reference::new(column).equal_to(Datum::string(value)));
        }
    }

    let scan = table
        .scan()
        .with_filter(predicate)
        .select([
            "export_id", "workspace", "repo", "doc_name", "version", "format", "git_sha",
            "ts_micros", "sha256", "byte_len",
        ])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    // An unset filter field accepts anything; a set one must match exactly.
    let accepts = |wanted: &Option<String>, actual: &str| match wanted {
        Some(w) => w == actual,
        None => true,
    };
    let keep = |r: &DocExport| {
        r.repo == repo
            && accepts(&filter.doc_name, &r.doc_name)
            && accepts(&filter.version, &r.version)
            && accepts(&filter.format, &r.format)
    };

    let mut stamped: Vec<(i64, DocExport)> = Vec::new();
    for batch in &batches {
        let micros = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        let rows = rows_from_batch(batch)?;
        stamped.extend(
            rows.into_iter()
                .enumerate()
                .filter(|(_, r)| keep(r))
                .map(|(i, r)| (micros.value(i), r)),
        );
    }

    // Stable sort, descending by timestamp: newest first, ties keep scan order.
    stamped.sort_by_key(|(t, _)| std::cmp::Reverse(*t));
    let cap = filter.limit.unwrap_or(usize::MAX);
    Ok(stamped.into_iter().take(cap).map(|(_, r)| r).collect())
}
/// Blocking counterpart of [`list_doc_exports_async`], driven by the warehouse's
/// `block_on`. Filtering, ordering and errors are the same.
pub fn list_doc_exports(
    wh: &IcebergWarehouse,
    repo: &str,
    filter: &ExportFilter,
) -> Result<Vec<DocExport>> {
    let pending = list_doc_exports_async(wh, repo, filter);
    wh.block_on(pending)
}
/// Converts every row of a projected doc-exports batch into a [`DocExport`].
///
/// The batch must carry the string columns `export_id`, `workspace`, `repo`, `doc_name`,
/// `version`, `format`, `git_sha` and `sha256`, plus `ts_micros` (timestamp, microseconds)
/// and `byte_len` (int64). `ts_micros` is rendered into `generated_at` as RFC 3339.
///
/// NOTE(review): null slots are not checked; `value(i)` returns whatever the buffer holds
/// for them.
///
/// # Errors
/// Fails if any required column is missing or has an unexpected arrow type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<DocExport>> {
    let export_ids = col::<StringArray>(batch, "export_id")?;
    let workspace_col = col::<StringArray>(batch, "workspace")?;
    let repo_col = col::<StringArray>(batch, "repo")?;
    let doc_col = col::<StringArray>(batch, "doc_name")?;
    let version_col = col::<StringArray>(batch, "version")?;
    let format_col = col::<StringArray>(batch, "format")?;
    let git_col = col::<StringArray>(batch, "git_sha")?;
    let micros_col = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
    let digest_col = col::<StringArray>(batch, "sha256")?;
    let len_col = col::<Int64Array>(batch, "byte_len")?;

    let rows = (0..batch.num_rows())
        .map(|row| DocExport {
            export_id: export_ids.value(row).to_string(),
            workspace: workspace_col.value(row).to_string(),
            repo: repo_col.value(row).to_string(),
            doc_name: doc_col.value(row).to_string(),
            version: version_col.value(row).to_string(),
            format: format_col.value(row).to_string(),
            git_sha: git_col.value(row).to_string(),
            sha256: digest_col.value(row).to_string(),
            byte_len: len_col.value(row),
            generated_at: rfc3339(micros_col.value(row)),
        })
        .collect();
    Ok(rows)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::iceberg::IcebergWarehouse;

    /// Opens a fresh warehouse in a temporary directory.
    /// The returned `TempDir` guard must be kept alive for as long as the warehouse is used.
    fn wh() -> (tempfile::TempDir, IcebergWarehouse) {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        (dir, wh)
    }

    #[test]
    fn record_then_list() {
        let (_d, wh) = wh();
        let rec = record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "abc123", b"hello")
            .unwrap();
        assert_eq!(rec.byte_len, 5);
        assert_eq!(rec.format, "pdf");
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].doc_name, "README");
        assert_eq!(rows[0].git_sha, "abc123");
    }

    #[test]
    fn dedup_skips_identical_content() {
        let (_d, wh) = wh();
        let a = record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"same").unwrap();
        let b = record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"same").unwrap();
        assert_eq!(a.export_id, b.export_id, "identical re-export reuses the row");
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
    }

    #[test]
    fn different_content_makes_new_row() {
        let (_d, wh) = wh();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"v1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"v2").unwrap();
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 2, "different bytes => new row");
    }

    #[test]
    fn filter_by_format_and_limit() {
        let (_d, wh) = wh();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"p1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "html", "s", b"h1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.2.0", "pdf", "s", b"p2").unwrap();

        let pdfs = list_doc_exports(
            &wh,
            "demo",
            &ExportFilter { format: Some("pdf".into()), ..Default::default() },
        )
        .unwrap();
        assert_eq!(pdfs.len(), 2);
        assert!(pdfs.iter().all(|r| r.format == "pdf"), "format filter leaked rows");

        let one = list_doc_exports(
            &wh,
            "demo",
            &ExportFilter { limit: Some(1), ..Default::default() },
        )
        .unwrap();
        assert_eq!(one.len(), 1);
        // Results are newest-first, so `limit` must keep the most recently recorded export.
        assert_eq!(one[0].version, "0.2.0", "limit should keep the newest export");
        assert_eq!(one[0].format, "pdf");
    }
}