use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;
use crate::warehouse::iceberg::{append_batch, IcebergWarehouse, TABLE_DOC_EXPORTS};
/// One recorded document export, as written to and read back from the doc-exports table.
///
/// Field order is significant: it is the serialization order used by the serde derives.
#[derive(Debug, Clone)]
#[derive(serde::Serialize, serde::Deserialize)]
pub struct DocExport {
    /// Identifier of the export row; new rows get a random UUIDv4 string.
    pub export_id: String,
    /// Workspace name supplied by the caller when the export was recorded.
    pub workspace: String,
    /// Repository the export belongs to; listings are scoped by this value.
    pub repo: String,
    /// Name of the exported document (e.g. `README`).
    pub doc_name: String,
    /// Version label supplied by the caller.
    pub version: String,
    /// Output format label supplied by the caller (e.g. `pdf`, `html`).
    pub format: String,
    /// Source revision supplied by the caller (presumably a git commit id — not validated here).
    pub git_sha: String,
    /// Lowercase hex SHA-256 of the exported bytes; used for content deduplication.
    pub sha256: String,
    /// Length of the exported bytes.
    pub byte_len: i64,
    /// RFC 3339 rendering of the stored microsecond timestamp.
    pub generated_at: String,
}
/// Optional narrowing applied when listing exports; every `None` field means "no constraint".
///
/// `ExportFilter::default()` matches every export of the requested repository.
#[derive(Debug, Clone)]
#[derive(Default)]
pub struct ExportFilter {
    /// Keep only exports whose `doc_name` equals this value.
    pub doc_name: Option<String>,
    /// Keep only exports whose `version` equals this value.
    pub version: Option<String>,
    /// Keep only exports whose `format` equals this value.
    pub format: Option<String>,
    /// Maximum number of rows returned, applied after newest-first ordering.
    pub limit: Option<usize>,
}
/// Returns the SHA-256 digest of `bytes` as a lowercase hexadecimal string (two chars per byte).
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;
    let digest = Sha256::digest(bytes);
    digest
        .iter()
        .fold(String::with_capacity(digest.len() * 2), |mut hex, byte| {
            // Writing into a `String` cannot fail.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
/// Looks up column `name` in `batch` and downcasts it to the concrete Arrow array type `T`.
///
/// # Errors
///
/// Fails if the batch has no column called `name`, or if that column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("projected batch missing column `{name}`"));
    };
    array
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("column `{name}` has unexpected arrow type"))
}
/// Renders a Unix timestamp in microseconds as an RFC 3339 string in UTC.
///
/// Values chrono cannot represent are rendered as the *current* time instead of failing.
fn rfc3339(ts_micros: i64) -> String {
    let instant = match DateTime::<Utc>::from_timestamp_micros(ts_micros) {
        Some(at) => at,
        // Best-effort: an out-of-range stored value must not abort the whole read.
        None => Utc::now(),
    };
    instant.to_rfc3339()
}
pub async fn record_doc_export_async(
wh: &IcebergWarehouse,
workspace: &str,
repo: &str,
doc_name: &str,
version: &str,
format: &str,
git_sha: &str,
bytes: &[u8],
) -> Result<DocExport> {
let sha256 = sha256_hex(bytes);
if let Some(existing) =
find_existing(wh, repo, doc_name, version, format, &sha256).await?
{
return Ok(existing);
}
let export_id = Uuid::new_v4().to_string();
let ts = Utc::now();
let byte_len = bytes.len() as i64;
let table = wh
.catalog()
.load_table(&wh.table_ident(TABLE_DOC_EXPORTS))
.await?;
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![export_id.clone()])),
Arc::new(StringArray::from(vec![workspace.to_string()])),
Arc::new(StringArray::from(vec![repo.to_string()])),
Arc::new(StringArray::from(vec![doc_name.to_string()])),
Arc::new(StringArray::from(vec![version.to_string()])),
Arc::new(StringArray::from(vec![format.to_string()])),
Arc::new(StringArray::from(vec![git_sha.to_string()])),
Arc::new(
TimestampMicrosecondArray::from(vec![ts.timestamp_micros()])
.with_timezone("+00:00"),
),
Arc::new(StringArray::from(vec![sha256.clone()])),
Arc::new(Int64Array::from(vec![byte_len])),
Arc::new(LargeBinaryArray::from(vec![bytes])),
];
let batch = RecordBatch::try_new(schema, cols)?;
append_batch(wh.catalog(), table, batch).await?;
Ok(DocExport {
export_id,
workspace: workspace.to_string(),
repo: repo.to_string(),
doc_name: doc_name.to_string(),
version: version.to_string(),
format: format.to_string(),
git_sha: git_sha.to_string(),
sha256,
byte_len,
generated_at: ts.to_rfc3339(),
})
}
/// Blocking counterpart of [`record_doc_export_async`].
///
/// Builds the async future and hands it to `IcebergWarehouse::block_on`. Arguments, deduplication
/// behaviour and errors are exactly those of the async version.
pub fn record_doc_export(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    git_sha: &str,
    bytes: &[u8],
) -> Result<DocExport> {
    let pending =
        record_doc_export_async(wh, workspace, repo, doc_name, version, format, git_sha, bytes);
    wh.block_on(pending)
}
/// Finds a previously recorded export with the same identity and content hash.
///
/// The match requires the same `repo`, `doc_name`, `version`, `format` and `sha256`. Returns the
/// first matching row in scan order, or `None` when nothing matches.
///
/// The scan is given an equality predicate on all five columns, and each returned row is
/// re-checked in memory. NOTE(review): the re-check is presumably there because the scan filter
/// may only prune coarsely — confirm before removing it.
async fn find_existing(
    wh: &IcebergWarehouse,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    sha256: &str,
) -> Result<Option<DocExport>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_DOC_EXPORTS))
        .await?;
    let extra_terms = [
        ("doc_name", doc_name),
        ("version", version),
        ("format", format),
        ("sha256", sha256),
    ];
    // Left-nested conjunction: repo AND doc_name AND version AND format AND sha256.
    let predicate = extra_terms.iter().fold(
        Reference::new("repo").equal_to(Datum::string(repo)),
        |acc, &(column, value)| acc.and(Reference::new(column).equal_to(Datum::string(value))),
    );
    let scan = table
        .scan()
        .with_filter(predicate)
        .select([
            "export_id", "workspace", "repo", "doc_name", "version", "format", "git_sha",
            "ts_micros", "sha256", "byte_len",
        ])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
    let is_same = |candidate: &DocExport| {
        candidate.repo == repo
            && candidate.doc_name == doc_name
            && candidate.version == version
            && candidate.format == format
            && candidate.sha256 == sha256
    };
    for batch in &batches {
        if let Some(hit) = rows_from_batch(batch)?.into_iter().find(|row| is_same(row)) {
            return Ok(Some(hit));
        }
    }
    Ok(None)
}
/// Lists recorded exports for `repo`, narrowed by `filter`, newest first.
///
/// Every `Some` field of `filter` adds an equality constraint. Rows are ordered by their stored
/// microsecond timestamp, descending; ties keep scan order because the sort is stable. The result
/// is then cut to `filter.limit` rows when a limit is set.
///
/// # Errors
///
/// Propagates failures from loading or scanning the table, or from decoding the scanned batches.
pub async fn list_doc_exports_async(
    wh: &IcebergWarehouse,
    repo: &str,
    filter: &ExportFilter,
) -> Result<Vec<DocExport>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_DOC_EXPORTS))
        .await?;
    let optional_terms = [
        ("doc_name", &filter.doc_name),
        ("version", &filter.version),
        ("format", &filter.format),
    ];
    let predicate = optional_terms
        .iter()
        .filter_map(|&(column, wanted)| wanted.as_deref().map(|value| (column, value)))
        .fold(
            Reference::new("repo").equal_to(Datum::string(repo)),
            |acc, (column, value)| acc.and(Reference::new(column).equal_to(Datum::string(value))),
        );
    let scan = table
        .scan()
        .with_filter(predicate)
        .select([
            "export_id", "workspace", "repo", "doc_name", "version", "format", "git_sha",
            "ts_micros", "sha256", "byte_len",
        ])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
    // In-memory re-check of the same constraints that were pushed into the scan.
    let keep = |row: &DocExport| {
        row.repo == repo
            && !filter.doc_name.as_deref().is_some_and(|d| d != row.doc_name)
            && !filter.version.as_deref().is_some_and(|v| v != row.version)
            && !filter.format.as_deref().is_some_and(|f| f != row.format)
    };
    let mut stamped: Vec<(i64, DocExport)> = Vec::new();
    for batch in &batches {
        let micros = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        let decoded = rows_from_batch(batch)?;
        stamped.extend(
            decoded
                .into_iter()
                .enumerate()
                .filter(|(_, row)| keep(row))
                .map(|(idx, row)| (micros.value(idx), row)),
        );
    }
    stamped.sort_by_key(|(at, _)| std::cmp::Reverse(*at));
    let cap = filter.limit.unwrap_or(usize::MAX);
    Ok(stamped.into_iter().map(|(_, row)| row).take(cap).collect())
}
/// Blocking counterpart of [`list_doc_exports_async`].
///
/// Builds the async future and hands it to `IcebergWarehouse::block_on`. Filtering, ordering and
/// errors are exactly those of the async version.
pub fn list_doc_exports(
    wh: &IcebergWarehouse,
    repo: &str,
    filter: &ExportFilter,
) -> Result<Vec<DocExport>> {
    let pending = list_doc_exports_async(wh, repo, filter);
    wh.block_on(pending)
}
/// Slim, display-oriented view of one export used by the `docs history` command output.
///
/// Field order is significant: it is the serialization order used by the serde derives.
#[derive(Debug, Clone, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize)]
pub struct DocHistoryRow {
    /// When the export was generated, as an RFC 3339 string.
    pub exported_at: String,
    /// Document name.
    pub doc: String,
    /// Version label.
    pub version: String,
    /// Output format label.
    pub format: String,
    /// Size of the exported content in bytes.
    pub size_bytes: i64,
}
impl DocHistoryRow {
pub fn from_record(r: &DocExport) -> Self {
Self {
exported_at: r.generated_at.clone(),
doc: r.doc_name.clone(),
version: r.version.clone(),
format: r.format.clone(),
size_bytes: r.byte_len,
}
}
}
/// Builds the `docs history` command outcome for `repo`.
///
/// An empty `rows` yields `CommandOutcome::fail` with a "no exports historized" message.
/// Otherwise the result is `CommandOutcome::ok`, carrying two things:
/// - JSON `{ "repo": ..., "exports": [...] }`;
/// - a fixed-width text table. Its timestamps are cut to their first 19 characters
///   (`YYYY-MM-DDTHH:MM:SS`), and it ends with an "(N row[s])" footer.
pub fn history_outcome(repo: &str, rows: Vec<DocHistoryRow>) -> crate::cli_outcome::CommandOutcome {
    use crate::cli_outcome::CommandOutcome;
    use std::fmt::Write as _;
    const COMMAND: &str = "docs history";

    if rows.is_empty() {
        return CommandOutcome::fail(COMMAND, format!("no exports historized for {repo}"));
    }

    let mut human = format!(
        "{:<20} {:<10} {:<9} {:<7} {:>9}",
        "exported_at", "doc", "version", "format", "bytes"
    );
    for row in &rows {
        // Values shorter than 19 bytes, or without a char boundary there, are shown whole.
        let stamp = row.exported_at.get(..19).unwrap_or(&row.exported_at);
        // Writing into a `String` cannot fail.
        let _ = write!(
            human,
            "\n{:<20} {:<10} {:<9} {:<7} {:>9}",
            stamp, row.doc, row.version, row.format, row.size_bytes
        );
    }
    let plural = if rows.len() == 1 { "" } else { "s" };
    let _ = write!(human, "\n({} row{})", rows.len(), plural);

    let data = serde_json::json!({ "repo": repo, "exports": rows });
    CommandOutcome::ok(COMMAND, data, human)
}
/// Decodes every row of a projected doc-exports batch into [`DocExport`] values.
///
/// The batch must carry the ten metadata columns selected by the scans in this module; the
/// exported bytes are not read. `ts_micros` is rendered through `rfc3339`. Null slots are not
/// checked for.
///
/// # Errors
///
/// Fails on the first column (in the order looked up below) that is missing or has an
/// unexpected Arrow type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<DocExport>> {
    // Lookup order determines which error surfaces first; keep it stable.
    let export_ids = col::<StringArray>(batch, "export_id")?;
    let workspace_col = col::<StringArray>(batch, "workspace")?;
    let repo_col = col::<StringArray>(batch, "repo")?;
    let doc_col = col::<StringArray>(batch, "doc_name")?;
    let version_col = col::<StringArray>(batch, "version")?;
    let format_col = col::<StringArray>(batch, "format")?;
    let git_col = col::<StringArray>(batch, "git_sha")?;
    let micros_col = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
    let digest_col = col::<StringArray>(batch, "sha256")?;
    let length_col = col::<Int64Array>(batch, "byte_len")?;

    let decoded = (0..batch.num_rows())
        .map(|row| DocExport {
            export_id: export_ids.value(row).to_string(),
            workspace: workspace_col.value(row).to_string(),
            repo: repo_col.value(row).to_string(),
            doc_name: doc_col.value(row).to_string(),
            version: version_col.value(row).to_string(),
            format: format_col.value(row).to_string(),
            git_sha: git_col.value(row).to_string(),
            sha256: digest_col.value(row).to_string(),
            byte_len: length_col.value(row),
            generated_at: rfc3339(micros_col.value(row)),
        })
        .collect();
    Ok(decoded)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::iceberg::IcebergWarehouse;

    /// Opens a fresh warehouse in a temp dir; the returned `TempDir` guard must be kept alive.
    fn wh() -> (tempfile::TempDir, IcebergWarehouse) {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        (dir, wh)
    }

    #[test]
    fn record_then_list() {
        let (_d, wh) = wh();
        let rec = record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "abc123", b"hello")
            .unwrap();
        assert_eq!(rec.byte_len, 5);
        assert_eq!(rec.format, "pdf");
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].doc_name, "README");
        assert_eq!(rows[0].git_sha, "abc123");
    }

    #[test]
    fn dedup_skips_identical_content() {
        let (_d, wh) = wh();
        let a = record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"same").unwrap();
        let b = record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"same").unwrap();
        assert_eq!(a.export_id, b.export_id, "identical re-export reuses the row");
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
    }

    #[test]
    fn different_content_makes_new_row() {
        let (_d, wh) = wh();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"v1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"v2").unwrap();
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 2, "different bytes => new row");
    }

    #[test]
    fn filter_by_format_and_limit() {
        let (_d, wh) = wh();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"p1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "html", "s", b"h1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.2.0", "pdf", "s", b"p2").unwrap();
        let pdfs = list_doc_exports(
            &wh,
            "demo",
            &ExportFilter { format: Some("pdf".into()), ..Default::default() },
        )
        .unwrap();
        assert_eq!(pdfs.len(), 2);
        let one = list_doc_exports(
            &wh,
            "demo",
            &ExportFilter { limit: Some(1), ..Default::default() },
        )
        .unwrap();
        assert_eq!(one.len(), 1);
    }

    #[test]
    fn filter_by_doc_name_and_version() {
        let (_d, wh) = wh();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"r1").unwrap();
        record_doc_export(&wh, "ws", "demo", "README", "0.2.0", "pdf", "s", b"r2").unwrap();
        record_doc_export(&wh, "ws", "demo", "CHANGELOG", "0.2.0", "pdf", "s", b"c2").unwrap();
        let readmes = list_doc_exports(
            &wh,
            "demo",
            &ExportFilter { doc_name: Some("README".into()), ..Default::default() },
        )
        .unwrap();
        assert_eq!(readmes.len(), 2);
        assert!(readmes.iter().all(|r| r.doc_name == "README"));
        let pinned = list_doc_exports(
            &wh,
            "demo",
            &ExportFilter {
                doc_name: Some("README".into()),
                version: Some("0.2.0".into()),
                ..Default::default()
            },
        )
        .unwrap();
        assert_eq!(pinned.len(), 1);
        assert_eq!(pinned[0].version, "0.2.0");
    }

    #[test]
    fn listing_is_scoped_to_repo() {
        let (_d, wh) = wh();
        record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "s", b"d").unwrap();
        record_doc_export(&wh, "ws", "other", "README", "0.1.0", "pdf", "s", b"o").unwrap();
        let rows = list_doc_exports(&wh, "demo", &ExportFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].repo, "demo");
    }

    /// Builds an in-memory export record without touching the warehouse.
    fn record(doc: &str, ver: &str, fmt: &str, bytes: i64, at: &str) -> DocExport {
        DocExport {
            export_id: format!("id-{doc}-{ver}-{fmt}"),
            workspace: "ws".into(),
            repo: "demo".into(),
            doc_name: doc.into(),
            version: ver.into(),
            format: fmt.into(),
            git_sha: "deadbeefcafef00d".into(),
            sha256: "0123456789abcdef".into(),
            byte_len: bytes,
            generated_at: at.into(),
        }
    }

    #[test]
    fn docs_history_empty_is_red_not_silently_green() {
        let o = history_outcome("demo", Vec::new());
        assert_eq!(o.command, "docs history");
        assert!(!o.is_sannr(), "empty docs history must be RED");
        assert!(o.human.contains("no exports historized"));
        assert_eq!(o.data, serde_json::Value::Null);
    }

    #[test]
    fn docs_history_real_rows_are_sannr_with_shared_shape() {
        let rows = vec![
            DocHistoryRow::from_record(&record("README", "0.2.0", "pdf", 42, "2026-06-23T10:00:00Z")),
            DocHistoryRow::from_record(&record("CHANGELOG", "0.2.0", "md", 17, "2026-06-23T11:00:00Z")),
        ];
        let o = history_outcome("demo", rows);
        assert!(o.is_sannr(), "two real exports => sannr");
        assert_eq!(o.data["repo"], serde_json::json!("demo"));
        let arr = o.data["exports"].as_array().unwrap();
        assert_eq!(arr.len(), 2);
        let obj = arr[0].as_object().unwrap();
        let mut keys: Vec<&str> = obj.keys().map(String::as_str).collect();
        keys.sort_unstable();
        assert_eq!(keys, ["doc", "exported_at", "format", "size_bytes", "version"]);
        for absent in ["sha256", "git_sha", "export_id", "path"] {
            assert!(
                !obj.contains_key(absent),
                "fat-only/thin-only `{absent}` must not leak into shared shape"
            );
        }
        assert_eq!(arr[0]["doc"], serde_json::json!("README"));
        assert_eq!(arr[0]["size_bytes"], serde_json::json!(42));
        assert!(o.human.contains("README"));
        assert!(o.human.contains("exported_at"), "shared human header");
    }

    #[test]
    fn docs_history_single_row_uses_singular_label_and_trims_stamp() {
        let rows = vec![DocHistoryRow::from_record(&record(
            "README",
            "1.0.0",
            "pdf",
            7,
            "2026-06-23T09:00:00Z",
        ))];
        let o = history_outcome("demo", rows);
        assert!(o.is_sannr());
        assert!(o.human.ends_with("\n(1 row)"), "singular footer, got: {}", o.human);
        assert!(o.human.contains("2026-06-23T09:00:00"));
        assert!(!o.human.contains("09:00:00Z"), "stamp is trimmed to 19 chars");
    }

    #[test]
    fn fat_record_projects_to_shared_row() {
        let rec = record("README", "1.0.0", "html", 99, "2026-06-23T09:00:00Z");
        let row = DocHistoryRow::from_record(&rec);
        assert_eq!(row.exported_at, "2026-06-23T09:00:00Z");
        assert_eq!(row.doc, "README");
        assert_eq!(row.size_bytes, 99);
        assert_eq!(row.version, "1.0.0");
        assert_eq!(row.format, "html");
    }
}