use reifydb_core::{common::CommitVersion, key::cdc_consumer::CdcConsumerKeyRange};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::Result;
/// Returns the smallest commit version found across the CDC consumer key range.
///
/// Every row yielded by `txn.range(CdcConsumerKeyRange::full_scan(), 1024)` is
/// inspected: when the row holds at least 8 bytes, its first 8 bytes are decoded
/// as a big-endian `u64` and wrapped in a [`CommitVersion`]. The minimum of all
/// decoded versions is returned.
///
/// Rows shorter than 8 bytes are ignored. If no row produces a version, the
/// function falls back to `CommitVersion(1)`.
///
/// NOTE(review): the second argument to `range` (1024) looks like a batch size,
/// and the leading 8 bytes are presumably each consumer's checkpoint — confirm
/// against the `Transaction::range` and CDC consumer row definitions.
///
/// # Errors
///
/// Propagates any error returned by `txn.range` itself or by an individual
/// item of the resulting iterator.
pub fn compute_watermark(txn: &mut Transaction<'_>) -> Result<CommitVersion> {
    let mut lowest: Option<CommitVersion> = None;

    for entry in txn.range(CdcConsumerKeyRange::full_scan(), 1024)? {
        let entry = entry?;

        // Too short to carry an 8-byte version prefix; skip it.
        if entry.row.len() < 8 {
            continue;
        }

        let mut raw = [0u8; 8];
        raw.copy_from_slice(&entry.row[0..8]);
        let candidate = CommitVersion(u64::from_be_bytes(raw));

        // Fold the candidate into the running minimum.
        lowest = match lowest {
            Some(current) => Some(current.min(candidate)),
            None => Some(candidate),
        };
    }

    match lowest {
        Some(version) => Ok(version),
        // Nothing decoded: use the fixed fallback version.
        None => Ok(CommitVersion(1)),
    }
}