use kiseki_common::ids::{ChunkId, NodeId, OrgId, ShardId};
use kiseki_common::time::{ClockQuality, DeltaTimestamp, HybridLogicalClock, WallTime};
use kiseki_log::delta::OperationType;
use kiseki_log::traits::{AppendDeltaRequest, LogOps};
pub(crate) fn emit_delta<L: LogOps + ?Sized>(
log: &L,
shard_id: ShardId,
tenant_id: OrgId,
operation: OperationType,
hashed_key: [u8; 32],
chunk_refs: Vec<ChunkId>,
payload: Vec<u8>,
) -> bool {
let timestamp = now_timestamp();
let req = AppendDeltaRequest {
shard_id,
tenant_id,
operation,
timestamp,
hashed_key,
chunk_refs,
payload,
has_inline_data: false,
};
match log.append_delta(req) {
Ok(_seq) => true,
Err(e) => {
tracing::warn!(error = %e, "delta emission failed");
false
}
}
}
/// Process-wide counter that supplies the HLC logical component; incremented
/// once per generated timestamp (wraps on `u32` overflow).
static HLC_LOGICAL: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);

/// Largest physical millisecond value handed out so far. Used to keep the HLC
/// physical component from moving backwards when the wall clock is stepped back.
static HLC_PHYSICAL_MS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Builds a [`DeltaTimestamp`] for "now".
///
/// * `wall.millis_since_epoch` is the raw system-clock reading in milliseconds
///   since the Unix epoch (`0` if the clock reports a time before the epoch,
///   saturating at `u64::MAX`).
/// * `hlc.physical_ms` is that same reading clamped so it never decreases
///   within this process: if the system clock goes backwards, the previously
///   issued maximum is reused until the clock catches up.
/// * `hlc.logical` comes from a process-wide counter bumped on every call.
///
/// `node_id`, `timezone` and `quality` are fixed to `NodeId(0)`, `"UTC"` and
/// `ClockQuality::Ntp` respectively.
fn now_timestamp() -> DeltaTimestamp {
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));

    // `fetch_max` returns the previous high-water mark; the value now stored
    // is the larger of that and `now_ms`, which is what we hand out.
    // Relaxed suffices: each atomic is only read through read-modify-write
    // operations, which always observe the latest value in its modification
    // order, and no other memory is published through them.
    let physical_ms = HLC_PHYSICAL_MS
        .fetch_max(now_ms, std::sync::atomic::Ordering::Relaxed)
        .max(now_ms);
    let logical = HLC_LOGICAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    DeltaTimestamp {
        hlc: HybridLogicalClock {
            physical_ms,
            logical,
            node_id: NodeId(0),
        },
        wall: WallTime {
            // Report the actual clock reading here, not the clamped value.
            millis_since_epoch: now_ms,
            timezone: "UTC".into(),
        },
        quality: ClockQuality::Ntp,
    }
}