use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use anyhow::{anyhow, Result};
use arrow::array::{
Array, BooleanArray, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use sha2::{Digest, Sha256};
use uuid::Uuid;
use super::iceberg::{IcebergWarehouse, TABLE_RELEASE_CHANGES, append_batch};
// Positional indices of the `release_changes` columns inside a `RecordBatch`.
// `query_release_changes` reads columns by these positions, and
// `append_release_changes` builds its column vector in the same order, so the
// two must be kept in sync with each other.
//
// Columns written for every row.
const COL_RELEASE_ID: usize = 0;
const COL_SEQ: usize = 1;
const COL_TS_MICROS: usize = 2;
const COL_KIND: usize = 3;
const COL_REPO: usize = 4;
const COL_CHANGE_SHA: usize = 5;
// Variant-specific columns; null when the row's kind does not carry the value.
const COL_CRATE_NAME: usize = 6;
const COL_OLD_VERSION: usize = 7;
// Holds `new_version` for bumps and `version` for publishes.
const COL_NEW_VERSION: usize = 8;
const COL_FILE: usize = 9;
// Holds `ChangeKind::Bump::old_file_text`.
const COL_OLD_REQ_LINES: usize = 10;
const COL_REMOVED_BLOCK: usize = 11;
const COL_BRANCH: usize = 12;
const COL_PREV_HEAD: usize = 13;
const COL_REGISTRY: usize = 14;
const COL_IMMUTABLE: usize = 15;
/// String discriminants stored in the `kind` column, one per [`ChangeKind`]
/// variant. `append_release_changes` writes them and `query_release_changes`
/// matches on them; any other value is rejected when reading.
pub mod kind {
/// Discriminant for `ChangeKind::Bump`.
pub const BUMP: &str = "bump";
/// Discriminant for `ChangeKind::PatchStrip`.
pub const PATCH_STRIP: &str = "patch_strip";
/// Discriminant for `ChangeKind::Branch`.
pub const BRANCH: &str = "branch";
/// Discriminant for `ChangeKind::Publish`.
pub const PUBLISH: &str = "publish";
}
/// Variant-specific payload of one recorded release change.
///
/// Each variant is persisted under one `kind` string plus a subset of the
/// nullable `release_changes` columns; columns that do not apply to a variant
/// are written as null.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ChangeKind {
/// Stored with kind `"bump"`.
/// NOTE(review): presumably a version bump of `crate_name` inside `file` — confirm.
Bump {
crate_name: String,
old_version: String,
new_version: String,
file: String,
/// Persisted in the column at `COL_OLD_REQ_LINES`, despite the field name.
old_file_text: String,
},
/// Stored with kind `"patch_strip"`.
/// NOTE(review): presumably `removed_block` is text removed from `file` — confirm.
PatchStrip { file: String, removed_block: String },
/// Stored with kind `"branch"`.
///
/// `repo` has no column of its own: `append_release_changes` does not write
/// it, and `query_release_changes` refills it from the row-level `repo`
/// column, so it only round-trips when it equals `ReleaseChange::repo`.
Branch { repo: String, branch: String, prev_head: String },
/// Stored with kind `"publish"`; `version` shares the column at
/// `COL_NEW_VERSION` with `Bump::new_version`.
Publish { crate_name: String, version: String, registry: String, immutable: bool },
}
/// One row of the `release_changes` table: the common columns plus the
/// variant-specific payload in `change`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ReleaseChange {
/// Identifier of the release run this change belongs to.
pub release_id: String,
/// Position of the change within its release; `ChangeRecorder::build`
/// assigns it from a counter that starts at 0.
pub seq: i64,
/// Timestamp in microseconds; `ChangeRecorder::build` fills it from
/// `chrono::Utc::now()`.
pub ts_micros: i64,
/// Repository name supplied by the caller that recorded the change.
pub repo: String,
/// Lowercase hex SHA-256 digest of `change`, as produced by `change_sha`.
pub change_sha: String,
/// Variant-specific payload.
pub change: ChangeKind,
}
impl ReleaseChange {
    /// Returns the `(release_id, seq)` pair used to order query results.
    ///
    /// The release id is cloned so the returned key owns its data.
    fn key(&self) -> (String, i64) {
        let Self { release_id, seq, .. } = self;
        (release_id.clone(), *seq)
    }
}
/// Hands out [`ReleaseChange`] rows for a single release id.
///
/// The sequence counter lives behind an `Arc`, so clones of a recorder share
/// one counter and never hand out the same `seq` twice.
#[derive(Clone)]
pub struct ChangeRecorder {
// Release id stamped onto every row built by this recorder.
release_id: String,
// Next sequence number to hand out; shared between clones.
seq: Arc<AtomicI64>,
}
impl ChangeRecorder {
    /// Creates a recorder for `release_id` whose sequence counter starts at 0.
    pub fn new(release_id: impl Into<String>) -> Self {
        let release_id = release_id.into();
        let seq = Arc::new(AtomicI64::new(0));
        Self { release_id, seq }
    }

    /// Returns the release id every row built by this recorder carries.
    pub fn release_id(&self) -> &str {
        self.release_id.as_str()
    }

    /// Builds the next row for this release without persisting it.
    ///
    /// Takes the next sequence number from the shared counter, hashes
    /// `change`, and stamps the row with the current UTC time in microseconds.
    pub fn build(&self, repo: &str, change: ChangeKind) -> ReleaseChange {
        let next_seq = self.seq.fetch_add(1, Ordering::SeqCst);
        let digest = change_sha(&change);
        let now_micros = chrono::Utc::now().timestamp_micros();
        ReleaseChange {
            release_id: self.release_id.to_owned(),
            seq: next_seq,
            ts_micros: now_micros,
            repo: repo.to_owned(),
            change_sha: digest,
            change,
        }
    }

    /// Builds the next row and appends it to the `release_changes` table,
    /// driving the async append to completion through `wh.block_on`.
    ///
    /// # Errors
    /// Returns whatever error the append reports.
    pub fn record(&self, wh: &IcebergWarehouse, repo: &str, change: ChangeKind) -> Result<()> {
        let pending = [self.build(repo, change)];
        wh.block_on(append_release_changes(wh, &pending))
    }
}
/// Computes the lowercase hex SHA-256 digest that identifies `change`.
///
/// The hashed byte stream is a variant tag (which already ends in NUL), then
/// every "terminated" field followed by a NUL byte, then a final tail written
/// without a terminator.
fn change_sha(change: &ChangeKind) -> String {
    // Describe the byte stream per variant first, then hash it in one place.
    let (tag, terminated, tail): (&str, Vec<&str>, &str) = match change {
        ChangeKind::Bump { crate_name, old_version, new_version, file, old_file_text } => (
            "bump\0",
            vec![
                crate_name.as_str(),
                old_version.as_str(),
                new_version.as_str(),
                file.as_str(),
                old_file_text.as_str(),
            ],
            "",
        ),
        // `removed_block` is the one string field hashed without a trailing NUL.
        ChangeKind::PatchStrip { file, removed_block } => {
            ("patch_strip\0", vec![file.as_str()], removed_block.as_str())
        }
        ChangeKind::Branch { repo, branch, prev_head } => {
            ("branch\0", vec![repo.as_str(), branch.as_str(), prev_head.as_str()], "")
        }
        // The boolean is hashed as a single ASCII digit after the terminated fields.
        ChangeKind::Publish { crate_name, version, registry, immutable } => (
            "publish\0",
            vec![crate_name.as_str(), version.as_str(), registry.as_str()],
            if *immutable { "1" } else { "0" },
        ),
    };

    let mut hasher = Sha256::new();
    hasher.update(tag.as_bytes());
    for field in terminated {
        hasher.update(field.as_bytes());
        hasher.update(b"\0");
    }
    // An empty tail contributes no bytes to the digest.
    hasher.update(tail.as_bytes());
    let digest = hasher.finalize();
    format!("{:x}", digest)
}
pub async fn append_release_changes(wh: &IcebergWarehouse, rows: &[ReleaseChange]) -> Result<()> {
if rows.is_empty() {
return Ok(());
}
let table = wh.catalog().load_table(&wh.table_ident(TABLE_RELEASE_CHANGES)).await?;
let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let kind_of = |c: &ChangeKind| match c {
ChangeKind::Bump { .. } => kind::BUMP,
ChangeKind::PatchStrip { .. } => kind::PATCH_STRIP,
ChangeKind::Branch { .. } => kind::BRANCH,
ChangeKind::Publish { .. } => kind::PUBLISH,
};
let crate_name = |c: &ChangeKind| match c {
ChangeKind::Bump { crate_name, .. } | ChangeKind::Publish { crate_name, .. } => {
Some(crate_name.clone())
}
_ => None,
};
let old_version = |c: &ChangeKind| match c {
ChangeKind::Bump { old_version, .. } => Some(old_version.clone()),
_ => None,
};
let new_version = |c: &ChangeKind| match c {
ChangeKind::Bump { new_version, .. } => Some(new_version.clone()),
ChangeKind::Publish { version, .. } => Some(version.clone()),
_ => None,
};
let file = |c: &ChangeKind| match c {
ChangeKind::Bump { file, .. } | ChangeKind::PatchStrip { file, .. } => Some(file.clone()),
_ => None,
};
let old_req_lines = |c: &ChangeKind| match c {
ChangeKind::Bump { old_file_text, .. } => Some(old_file_text.clone()),
_ => None,
};
let removed_block = |c: &ChangeKind| match c {
ChangeKind::PatchStrip { removed_block, .. } => Some(removed_block.clone()),
_ => None,
};
let branch = |c: &ChangeKind| match c {
ChangeKind::Branch { branch, .. } => Some(branch.clone()),
_ => None,
};
let prev_head = |c: &ChangeKind| match c {
ChangeKind::Branch { prev_head, .. } => Some(prev_head.clone()),
_ => None,
};
let registry = |c: &ChangeKind| match c {
ChangeKind::Publish { registry, .. } => Some(registry.clone()),
_ => None,
};
let immutable = |c: &ChangeKind| match c {
ChangeKind::Publish { immutable, .. } => Some(*immutable),
_ => None,
};
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(rows.iter().map(|r| r.release_id.clone()).collect::<Vec<_>>())),
Arc::new(Int64Array::from(rows.iter().map(|r| r.seq).collect::<Vec<_>>())),
Arc::new(
TimestampMicrosecondArray::from(rows.iter().map(|r| r.ts_micros).collect::<Vec<_>>())
.with_timezone("+00:00"),
),
Arc::new(StringArray::from(rows.iter().map(|r| kind_of(&r.change).to_string()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| r.repo.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| r.change_sha.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| crate_name(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| old_version(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| new_version(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| file(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| old_req_lines(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| removed_block(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| branch(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| prev_head(&r.change)).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| registry(&r.change)).collect::<Vec<_>>())),
Arc::new(BooleanArray::from(rows.iter().map(|r| immutable(&r.change)).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(arrow_schema, cols)?;
append_batch(wh.catalog(), table, batch).await?;
Ok(())
}
/// Reads the `release_changes` table and returns its rows ordered by
/// `(release_id, seq)`.
///
/// When `release_id` is `Some`, only rows of that release are returned. The
/// filter is applied before a row is decoded, so rows of other releases cost
/// no string allocations and an unrecognised `kind` in an unrelated release
/// does not fail a filtered query.
///
/// Null cells in the variant-specific string columns decode to an empty
/// string, and a null `immutable` decodes to `false`.
/// `ChangeKind::Branch::repo` is filled from the row-level `repo` column
/// because the table has no separate column for it.
///
/// # Errors
/// Fails if the table cannot be read, a column is not of the expected Arrow
/// type, or a selected row carries an unknown `kind`.
pub async fn query_release_changes(
    wh: &IcebergWarehouse,
    release_id: Option<&str>,
) -> Result<Vec<ReleaseChange>> {
    let batches: Vec<RecordBatch> =
        super::iceberg::load_and_read_all(wh, TABLE_RELEASE_CHANGES).await?;

    // Null cells become empty strings.
    let text = |col: &StringArray, i: usize| {
        if col.is_null(i) {
            String::new()
        } else {
            col.value(i).to_string()
        }
    };

    let mut out: Vec<ReleaseChange> = Vec::new();
    for b in &batches {
        let rid = col_str(b, COL_RELEASE_ID)?;
        let seq = col_i64(b, COL_SEQ)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let kind = col_str(b, COL_KIND)?;
        let repo = col_str(b, COL_REPO)?;
        let sha = col_str(b, COL_CHANGE_SHA)?;
        let crate_name = col_str(b, COL_CRATE_NAME)?;
        let old_version = col_str(b, COL_OLD_VERSION)?;
        let new_version = col_str(b, COL_NEW_VERSION)?;
        let file = col_str(b, COL_FILE)?;
        let old_req = col_str(b, COL_OLD_REQ_LINES)?;
        let removed = col_str(b, COL_REMOVED_BLOCK)?;
        let branch = col_str(b, COL_BRANCH)?;
        let prev_head = col_str(b, COL_PREV_HEAD)?;
        let registry = col_str(b, COL_REGISTRY)?;
        let immutable = col_bool(b, COL_IMMUTABLE)?;

        for i in 0..b.num_rows() {
            // Skip rows of other releases before doing any decoding work.
            let row_release = rid.value(i);
            if let Some(wanted) = release_id {
                if row_release != wanted {
                    continue;
                }
            }

            let change = match kind.value(i) {
                kind::BUMP => ChangeKind::Bump {
                    crate_name: text(crate_name, i),
                    old_version: text(old_version, i),
                    new_version: text(new_version, i),
                    file: text(file, i),
                    old_file_text: text(old_req, i),
                },
                kind::PATCH_STRIP => ChangeKind::PatchStrip {
                    file: text(file, i),
                    removed_block: text(removed, i),
                },
                kind::BRANCH => ChangeKind::Branch {
                    repo: text(repo, i),
                    branch: text(branch, i),
                    prev_head: text(prev_head, i),
                },
                kind::PUBLISH => ChangeKind::Publish {
                    crate_name: text(crate_name, i),
                    version: text(new_version, i),
                    registry: text(registry, i),
                    immutable: !immutable.is_null(i) && immutable.value(i),
                },
                other => return Err(anyhow!("unknown release_change kind `{other}`")),
            };

            out.push(ReleaseChange {
                release_id: row_release.to_string(),
                seq: seq.value(i),
                ts_micros: ts.value(i),
                repo: repo.value(i).to_string(),
                change_sha: sha.value(i).to_string(),
                change,
            });
        }
    }

    // Stable sort; the key clones the release id, so compute it once per row
    // rather than on every comparison.
    out.sort_by_cached_key(|r| r.key());
    Ok(out)
}
/// Returns the release id of the row with the greatest `ts_micros`, or `None`
/// when the table holds no rows.
///
/// Reads every row via `query_release_changes(wh, None)`. When several rows
/// share the greatest timestamp, the one that comes last in that query's
/// ordering is used.
///
/// # Errors
/// Propagates any error from `query_release_changes`.
pub async fn latest_release_id(wh: &IcebergWarehouse) -> Result<Option<String>> {
    let every_row = query_release_changes(wh, None).await?;
    let newest = every_row.into_iter().max_by_key(|row| row.ts_micros);
    Ok(newest.map(|row| row.release_id))
}
/// Generates a fresh release id: a version-4 UUID rendered through its
/// `Display` implementation.
pub fn new_release_id() -> String {
    let id = Uuid::new_v4();
    id.to_string()
}
/// Borrows column `idx` of `b` as a `StringArray`.
///
/// # Errors
/// Returns an error, rather than panicking, when `idx` is past the batch's
/// last column, and when the column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    // `RecordBatch::column` panics on an out-of-range index; look the column
    // up through the slice so a narrower batch surfaces as an error.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("release_changes col {idx} is out of range ({} columns in batch)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("release_changes col {idx} is not StringArray"))
}
/// Borrows column `idx` of `b` as an `Int64Array`.
///
/// # Errors
/// Returns an error, rather than panicking, when `idx` is past the batch's
/// last column, and when the column is not an `Int64Array`.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int64Array> {
    // `RecordBatch::column` panics on an out-of-range index; look the column
    // up through the slice so a narrower batch surfaces as an error.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("release_changes col {idx} is out of range ({} columns in batch)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| anyhow!("release_changes col {idx} is not Int64Array"))
}
/// Borrows column `idx` of `b` as a `TimestampMicrosecondArray`.
///
/// # Errors
/// Returns an error, rather than panicking, when `idx` is past the batch's
/// last column, and when the column is not a `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    // `RecordBatch::column` panics on an out-of-range index; look the column
    // up through the slice so a narrower batch surfaces as an error.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("release_changes col {idx} is out of range ({} columns in batch)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("release_changes col {idx} is not TimestampMicrosecondArray"))
}
/// Borrows column `idx` of `b` as a `BooleanArray`.
///
/// # Errors
/// Returns an error, rather than panicking, when `idx` is past the batch's
/// last column, and when the column is not a `BooleanArray`.
fn col_bool<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a BooleanArray> {
    // `RecordBatch::column` panics on an out-of-range index; look the column
    // up through the slice so a narrower batch surfaces as an error.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("release_changes col {idx} is out of range ({} columns in batch)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<BooleanArray>()
        .ok_or_else(|| anyhow!("release_changes col {idx} is not BooleanArray"))
}
#[cfg(test)]
mod tests {
use super::*;
// Writes one row of every `ChangeKind` variant into a warehouse opened on a
// temporary directory, reads them back, and checks that ordering, payloads
// and hashes survive the round trip.
#[test]
fn append_and_query_round_trips_each_kind() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let rec = ChangeRecorder::new("run-xyz");
// The order of the `build` calls fixes the sequence numbers 0..=3.
let bump = rec.build("skade", ChangeKind::Bump {
crate_name: "arrow".into(),
old_version: "57".into(),
new_version: "58".into(),
file: "Cargo.toml".into(),
old_file_text: "[dependencies]\narrow = \"57\"\n".into(),
});
let strip = rec.build("nornir", ChangeKind::PatchStrip {
file: "Cargo.toml".into(),
removed_block: "[patch.crates-io]\niceberg = { path = \"../fork\" }\n".into(),
});
// `Branch::repo` equals the row-level repo here; it is read back from the
// row-level `repo` column, so the equality check below relies on that.
let branch = rec.build("nornir", ChangeKind::Branch {
repo: "nornir".into(),
branch: "release/staging".into(),
prev_head: "deadbeef".into(),
});
let publish = rec.build("nornir", ChangeKind::Publish {
crate_name: "nornir".into(),
version: "0.2.0".into(),
registry: "crates.io".into(),
immutable: true,
});
wh.block_on(append_release_changes(&wh, &[bump.clone(), strip.clone(), branch.clone(), publish.clone()]))
.unwrap();
let rows = wh.block_on(query_release_changes(&wh, Some("run-xyz"))).unwrap();
assert_eq!(rows.len(), 4);
// Rows come back ordered by (release_id, seq).
assert_eq!(rows.iter().map(|r| r.seq).collect::<Vec<_>>(), vec![0, 1, 2, 3]);
assert_eq!(rows[0].change, bump.change);
assert_eq!(rows[1].change, strip.change);
assert_eq!(rows[2].change, branch.change);
assert_eq!(rows[3].change, publish.change);
// The stored hash matches a hash recomputed from the payload.
assert!(!rows[0].change_sha.is_empty());
assert_eq!(rows[0].change_sha, change_sha(&bump.change));
let latest = wh.block_on(latest_release_id(&wh)).unwrap();
assert_eq!(latest.as_deref(), Some("run-xyz"));
}
}