use calimero_context::group_store::{membership_status_at, MembershipStatus};
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use eyre::Result;
use tracing::{debug, info, warn};
use super::{
apply_authorized_state_delta, choose_owned_identity, state_delta_message_from_buffered,
verify_position_group_id_matches_context, GroupIdCheck, StateDeltaContext,
};
/// Re-examine deltas parked in the governance-pending buffer for `context_id`.
///
/// The pass pops at most as many entries as the buffer held when it started
/// (`governance_pending_len`). NOTE(review): whether entries re-buffered during
/// this pass are revisited within the same pass depends on the buffer's
/// pop/push order — confirm.
///
/// For each popped delta:
/// 1. A missing `governance_position` drops it.
/// 2. The position's `group_id` is checked against the context's owning group
///    via `verify_position_group_id_matches_context`; any outcome other than
///    `Match` drops it.
/// 3. The author's membership at that position decides the rest:
///    `Member` → the delta is reconstructed and re-applied; `Removed`,
///    `NeverMember` or a lookup error → dropped; `Unknown` → re-buffered with
///    an incremented attempt counter until `MAX_GOVERNANCE_DRAIN_ATTEMPTS` is
///    reached, then dropped.
///
/// Every outcome is reported through `record_governance_drain_outcome`;
/// errors are logged, never returned.
pub(super) async fn drain_governance_pending(input: &StateDeltaContext, context_id: &ContextId) {
    let snapshot_len = input.node_state.governance_pending_len(context_id);
    if snapshot_len == 0 {
        return;
    }
    debug!(
        %context_id,
        count = snapshot_len,
        "governance-pending drain: draining governance-pending buffer"
    );
    // The datastore does not change between iterations; fetch it once.
    let datastore = input.node_clients.context.datastore();
    for _ in 0..snapshot_len {
        let Some(buffered) = input.node_state.pop_governance_pending(context_id) else {
            break;
        };
        let Some(pos) = buffered.governance_position.as_ref() else {
            warn!(
                %context_id,
                delta_id = ?buffered.id,
                "governance-pending drain: pending delta has no governance_position; dropping"
            );
            crate::node_metrics::record_governance_drain_outcome("no_governance_position");
            continue;
        };
        match verify_position_group_id_matches_context(datastore, context_id, Some(pos.group_id)) {
            GroupIdCheck::Match => {}
            GroupIdCheck::NonGroupOk | GroupIdCheck::GroupContextNoPosition { .. } => {
                debug_assert!(
                    false,
                    "GroupIdCheck::{{NonGroupOk, GroupContextNoPosition}} require \
                     claimed_group_id=None, but drain always passes Some(pos.group_id) — \
                     the call-site contract has been broken"
                );
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    "governance-pending drain: dropping pending delta — \
                     verify_position_group_id_matches_context returned an outcome that \
                     requires claimed_group_id=None despite drain passing Some \
                     (call-site contract violated; investigate)"
                );
                crate::node_metrics::record_governance_drain_outcome("helper_contract_violation");
                continue;
            }
            GroupIdCheck::Mismatch { owning, claimed } => {
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    owning_group = ?owning,
                    claimed_group = ?claimed,
                    "governance-pending drain: rejecting pending delta — governance_position \
                     references a different group than the context's owning group"
                );
                crate::node_metrics::record_governance_drain_outcome("group_mismatch");
                continue;
            }
            GroupIdCheck::NonGroupContextWithPosition { claimed } => {
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    claimed_group = ?claimed,
                    "governance-pending drain: rejecting pending delta — governance_position \
                     present but context is not part of any group (group disappeared since buffering?)"
                );
                crate::node_metrics::record_governance_drain_outcome("group_disappeared");
                continue;
            }
            GroupIdCheck::LookupError(err) => {
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    %err,
                    "governance-pending drain: get_group_for_context failed; dropping delta to avoid silent bypass"
                );
                crate::node_metrics::record_governance_drain_outcome("group_lookup_failed");
                continue;
            }
        }
        let status = membership_status_at(datastore, &buffered.author_id, pos);
        match status {
            Ok(MembershipStatus::Member(_)) => {
                debug!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    "governance-pending drain: pending delta now authorized; re-applying"
                );
                // Capture the id before `buffered` is consumed by the reconstruction,
                // so a failed re-apply can still be traced to a specific delta.
                let delta_id = buffered.id;
                let reconstructed = state_delta_message_from_buffered(buffered, *context_id);
                // Only count the delta as "applied" once the apply actually succeeded;
                // failures get their own outcome instead of inflating "applied".
                match apply_authorized_state_delta(input.clone(), reconstructed, false).await {
                    Ok(_) => {
                        crate::node_metrics::record_governance_drain_outcome("applied");
                    }
                    Err(err) => {
                        warn!(
                            %context_id,
                            delta_id = ?delta_id,
                            %err,
                            "governance-pending drain: re-apply of authorized buffered delta failed"
                        );
                        crate::node_metrics::record_governance_drain_outcome("apply_failed");
                    }
                }
            }
            Ok(MembershipStatus::Removed { last_role }) => {
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    last_role = ?last_role,
                    "governance-pending drain: pending delta from removed author; dropping"
                );
                crate::node_metrics::record_governance_drain_outcome("removed");
            }
            Ok(MembershipStatus::NeverMember) => {
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    author = %buffered.author_id,
                    "governance-pending drain: pending delta from non-member; dropping"
                );
                crate::node_metrics::record_governance_drain_outcome("never_member");
            }
            Ok(MembershipStatus::Unknown { needed }) => {
                let mut buffered = buffered;
                // Saturating so a pathological counter can never wrap back to zero.
                buffered.governance_drain_attempts =
                    buffered.governance_drain_attempts.saturating_add(1);
                if buffered.governance_drain_attempts
                    >= calimero_node_primitives::delta_buffer::MAX_GOVERNANCE_DRAIN_ATTEMPTS
                {
                    warn!(
                        %context_id,
                        delta_id = ?buffered.id,
                        attempts = buffered.governance_drain_attempts,
                        "governance-pending drain: dropping pending delta after exhausting drain attempts \
                         (governance heads still unknown — likely permanently missing)"
                    );
                    crate::node_metrics::record_governance_drain_outcome("dropped_max_attempts");
                } else {
                    debug!(
                        %context_id,
                        delta_id = ?buffered.id,
                        needed_count = needed.len(),
                        attempts = buffered.governance_drain_attempts,
                        "governance-pending drain: still pending governance catchup; re-buffering"
                    );
                    crate::node_metrics::record_governance_drain_outcome("rebuffered");
                    input
                        .node_state
                        .buffer_governance_pending(*context_id, buffered);
                }
            }
            Err(err) => {
                warn!(
                    %context_id,
                    delta_id = ?buffered.id,
                    %err,
                    "governance-pending drain: membership lookup failed for pending delta; dropping"
                );
                crate::node_metrics::record_governance_drain_outcome("lookup_error");
            }
        }
    }
}
/// Drain the governance-pending buffer of every context that currently has
/// buffered deltas, sequentially, via [`drain_governance_pending`].
pub(crate) async fn drain_all_governance_pending(input: &StateDeltaContext) {
    let pending_contexts = input.node_state.governance_pending_context_ids();
    if !pending_contexts.is_empty() {
        debug!(
            count = pending_contexts.len(),
            "governance-pending drain: governance-apply hook draining pending buffers across contexts"
        );
        for ctx in pending_contexts {
            drain_governance_pending(input, &ctx).await;
        }
    }
}
/// Drain the absorb buffer for `context_id`.
///
/// Phase one replays buffered straggler deltas verbatim through
/// `apply_authorized_state_delta` (third argument `true`). Phase two re-applies
/// buffered sync-repair leaves/entities. Failures in either phase are logged,
/// never returned.
pub(crate) async fn drain_absorbed(input: &StateDeltaContext, context_id: &ContextId) {
    let store = input.node_clients.context.datastore();
    let ctx = *context_id;

    let replayed = drain_absorbed_records(store, context_id, move |buffered| {
        let input = input.clone();
        async move {
            let message = state_delta_message_from_buffered(buffered, ctx);
            apply_authorized_state_delta(input, message, true).await?;
            Ok::<bool, eyre::Report>(true)
        }
    })
    .await;

    match replayed {
        Err(err) => warn!(
            %context_id,
            %err,
            "absorb drain: failed to enumerate absorb buffer"
        ),
        Ok(0) => {}
        Ok(n) => info!(
            %context_id,
            drained = n,
            "absorb drain: replayed buffered straggler deltas verbatim after binary advance"
        ),
    }

    let leaf_outcome = drain_absorbed_leaves(input, context_id).await;
    if let Err(err) = leaf_outcome {
        warn!(%context_id, %err, "absorb drain: leaf drain failed");
    }
}
/// Re-apply absorbed sync-repair payloads (snapshot entities and tree leaves)
/// for `context_id` whose `schema_app_key` matches the loaded reader app key.
///
/// For each eligible record:
/// - an entity payload (which takes precedence over a leaf payload) is
///   persisted with `persist_buffered_snapshot_entity`; the record is deleted
///   on `Persisted` (and counted) or `RedrivenElsewhere` (not counted), and
///   kept on `Pending` or a persist error;
/// - otherwise a leaf payload is borsh-decoded into `TreeLeafData` and merged
///   with `apply_leaf_with_crdt_merge` inside the context's runtime env; the
///   record is deleted only when the merge succeeds.
///
/// Records with a non-matching schema or no repair payload are left untouched.
/// Corrupt leaf bytes are logged and skipped; the record is not deleted.
///
/// # Errors
///
/// Propagates failures from reading the loaded reader app key, enumerating or
/// deleting absorb records, and resolving an owned identity for the context.
async fn drain_absorbed_leaves(input: &StateDeltaContext, context_id: &ContextId) -> Result<()> {
    use borsh::BorshDeserialize;
    use calimero_context::group_store::AbsorbRepository;
    use calimero_context::hlc_fence::loaded_reader_app_key;
    use calimero_node_primitives::sync::storage_bridge::create_runtime_env;
    use calimero_node_primitives::sync::TreeLeafData;

    use crate::sync::snapshot::{persist_buffered_snapshot_entity, SnapshotEntityDrainOutcome};

    let store = input.node_clients.context.datastore();
    let Some(loaded) = loaded_reader_app_key(store, context_id)? else {
        return Ok(());
    };
    let repo = AbsorbRepository::new(store);
    let pending = repo.enumerate_pending(context_id)?;

    // Mirror the loop's eligibility rules exactly (an entity payload shadows a
    // leaf payload; the payload's schema must match the loaded reader). Bailing
    // out here avoids resolving an owned identity — which can fail and turn the
    // whole drain into an error — when nothing would be re-applied anyway.
    let has_eligible = pending
        .iter()
        .any(|(_, record)| match (&record.entity, &record.leaf) {
            (Some(entity), _) => entity.schema_app_key == loaded,
            (None, Some(leaf)) => leaf.schema_app_key == loaded,
            (None, None) => false,
        });
    if !has_eligible {
        return Ok(());
    }

    let identity = choose_owned_identity(&input.node_clients.context, context_id).await?;
    let runtime_env = create_runtime_env(store, *context_id, identity);
    let mut drained = 0usize;

    for ((producing_app_key, delta_id), record) in pending {
        if let Some(entity_absorb) = record.entity {
            if entity_absorb.schema_app_key != loaded {
                continue;
            }
            let mut handle = input.node_clients.context.datastore_handle();
            match persist_buffered_snapshot_entity(
                &mut handle,
                *context_id,
                entity_absorb.id,
                &entity_absorb.entry,
                &entity_absorb.index,
            ) {
                Ok(SnapshotEntityDrainOutcome::Persisted) => {
                    repo.delete(context_id, producing_app_key, delta_id)?;
                    drained += 1;
                }
                // The record is deleted but not counted as drained here.
                Ok(SnapshotEntityDrainOutcome::RedrivenElsewhere) => {
                    repo.delete(context_id, producing_app_key, delta_id)?;
                }
                // Not persistable yet; keep the record for a later drain.
                Ok(SnapshotEntityDrainOutcome::Pending) => {}
                Err(err) => warn!(
                    %context_id,
                    delta_id = ?delta_id,
                    %err,
                    "absorb entity drain: persist failed — leaving record pending for retry"
                ),
            }
            // An entity payload is never also treated as a leaf.
            continue;
        }

        let Some(leaf_absorb) = record.leaf else {
            continue;
        };
        if leaf_absorb.schema_app_key != loaded {
            continue;
        }
        let leaf = match TreeLeafData::try_from_slice(&leaf_absorb.leaf_bytes) {
            Ok(l) => l,
            Err(err) => {
                warn!(
                    %context_id,
                    delta_id = ?delta_id,
                    %err,
                    "absorb leaf drain: corrupt buffered leaf bytes — skipping"
                );
                continue;
            }
        };
        let ctx = *context_id;
        let apply = calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
            crate::sync::helpers::apply_leaf_with_crdt_merge(ctx, &leaf)
        });
        match apply {
            Ok(()) => {
                repo.delete(context_id, producing_app_key, delta_id)?;
                drained += 1;
            }
            Err(err) => warn!(
                %context_id,
                delta_id = ?delta_id,
                %err,
                "absorb leaf drain: re-apply failed — leaving record pending for retry"
            ),
        }
    }

    if drained > 0 {
        info!(
            %context_id,
            drained,
            "absorb drain: re-applied buffered sync-repair leaves/entities after binary advance"
        );
    }
    Ok(())
}
/// Replay absorbed straggler deltas for `context_id` through `replay`.
///
/// Returns `Ok(0)` immediately when no reader app key is loaded. Records
/// carrying a leaf or entity payload are skipped (they belong to the leaf
/// drain). When the target reader app key differs from the loaded one, only
/// records whose `producing_app_key` equals the loaded key are replayed.
///
/// `replay` returning `Ok(true)` consumes the record (deleted and counted);
/// `Ok(false)` and `Err(_)` leave it pending. Records that cannot be converted
/// back into a `BufferedDelta` are logged and skipped without deletion.
///
/// Returns the number of records consumed.
///
/// # Errors
///
/// Propagates failures from reading the loaded/target reader app keys,
/// enumerating pending records, and deleting a consumed record.
pub(crate) async fn drain_absorbed_records<F, Fut>(
    store: &calimero_store::Store,
    context_id: &ContextId,
    replay: F,
) -> Result<usize>
where
    F: Fn(calimero_node_primitives::delta_buffer::BufferedDelta) -> Fut,
    Fut: std::future::Future<Output = Result<bool>>,
{
    use calimero_context::group_store::AbsorbRepository;
    use calimero_context::hlc_fence::{loaded_reader_app_key, target_reader_app_key};

    let Some(loaded) = loaded_reader_app_key(store, context_id)? else {
        return Ok(0);
    };
    // Without an explicit target, the loaded key doubles as the target.
    let target = target_reader_app_key(store, context_id)?.unwrap_or(loaded);
    let reader_differs_from_target = loaded != target;

    let repo = AbsorbRepository::new(store);
    let pending = repo.enumerate_pending(context_id)?;
    let mut consumed = 0usize;

    for ((producing_app_key, delta_id), record) in pending {
        let carries_repair_payload = record.leaf.is_some() || record.entity.is_some();
        let foreign_producer =
            reader_differs_from_target && Some(loaded) != record.producing_app_key;
        if carries_repair_payload || foreign_producer {
            continue;
        }

        let buffered = match record.into_buffered() {
            Ok(buffered) => buffered,
            Err(err) => {
                warn!(
                    %context_id,
                    delta_id = ?delta_id,
                    %err,
                    "absorb drain: corrupt AbsorbRecord — cannot reconstruct buffered delta; skipping"
                );
                continue;
            }
        };

        match replay(buffered).await {
            Err(err) => warn!(
                %context_id,
                delta_id = ?delta_id,
                %err,
                "absorb drain: verbatim replay failed — leaving record pending for retry"
            ),
            Ok(false) => debug!(
                %context_id,
                delta_id = ?delta_id,
                "absorb drain: replay declined to consume delta — leaving record pending"
            ),
            Ok(true) => {
                repo.delete(context_id, producing_app_key, delta_id)?;
                consumed += 1;
            }
        }
    }

    Ok(consumed)
}
/// Run [`drain_absorbed`] for every context that has records in the absorb
/// buffer. A failure to enumerate those contexts is logged and ends the pass.
pub(crate) async fn drain_all_absorbed(input: &StateDeltaContext) {
    use calimero_context::group_store::AbsorbRepository;

    let store = input.node_clients.context.datastore();
    let contexts = match AbsorbRepository::new(store).enumerate_all_contexts() {
        Err(err) => {
            warn!(%err, "absorb drain: failed to enumerate contexts with pending absorbs");
            return;
        }
        Ok(ids) if ids.is_empty() => return,
        Ok(ids) => ids,
    };

    debug!(
        count = contexts.len(),
        "absorb drain: binary-advance hook draining absorb buffers across contexts"
    );
    for ctx in contexts {
        drain_absorbed(input, &ctx).await;
    }
}
/// Run [`drain_absorbed_records`] for every context that has absorb records,
/// handing `replay` the context id together with each buffered delta.
///
/// A per-context drain failure is logged and that context is skipped. Returns
/// the total number of records consumed across all contexts.
///
/// # Errors
///
/// Only if enumerating the contexts with absorb records fails.
pub(crate) async fn recover_absorbed_records<F, Fut>(
    store: &calimero_store::Store,
    replay: F,
) -> Result<usize>
where
    F: Fn(ContextId, calimero_node_primitives::delta_buffer::BufferedDelta) -> Fut + Clone,
    Fut: std::future::Future<Output = Result<bool>>,
{
    use calimero_context::group_store::AbsorbRepository;

    let contexts = AbsorbRepository::new(store).enumerate_all_contexts()?;
    // An empty context list simply yields a total of zero.
    let mut recovered = 0usize;
    for context_id in contexts {
        let replay_for_context = replay.clone();
        let drained = drain_absorbed_records(store, &context_id, move |buffered| {
            replay_for_context(context_id, buffered)
        })
        .await;
        match drained {
            Err(err) => warn!(
                %context_id,
                %err,
                "absorb recovery: drain failed for context; skipping (durable buffer will retry)"
            ),
            Ok(count) => recovered += count,
        }
    }
    Ok(recovered)
}
/// Startup recovery for the absorb buffer.
///
/// Phase one replays buffered straggler deltas for every context through
/// `apply_authorized_state_delta` (third argument `true`). Phase two
/// re-enumerates the contexts and re-applies buffered sync-repair
/// leaves/entities for each. All failures are logged, never returned.
pub(crate) async fn recover_absorbed_on_startup(input: &StateDeltaContext) {
    use calimero_context::group_store::AbsorbRepository;

    let store = input.node_clients.context.datastore();

    // Phase one: verbatim replay of buffered straggler deltas.
    let replayed = recover_absorbed_records(store, |context_id, buffered| {
        let input = input.clone();
        async move {
            let message = state_delta_message_from_buffered(buffered, context_id);
            apply_authorized_state_delta(input, message, true).await?;
            Ok::<bool, eyre::Report>(true)
        }
    })
    .await;
    match replayed {
        Err(err) => warn!(
            %err,
            "absorb recovery: failed to scan absorb buffer on startup"
        ),
        Ok(0) => {}
        Ok(n) => info!(
            recovered = n,
            "absorb recovery: replayed buffered straggler deltas verbatim on startup"
        ),
    }

    // Phase two: leaf/entity re-apply, over a fresh enumeration taken after phase one.
    let leaf_contexts = match AbsorbRepository::new(store).enumerate_all_contexts() {
        Ok(ids) => ids,
        Err(err) => {
            warn!(%err, "absorb recovery: failed to enumerate contexts for leaf drain on startup");
            return;
        }
    };
    for context_id in leaf_contexts {
        let leaf_result = drain_absorbed_leaves(input, &context_id).await;
        if let Err(err) = leaf_result {
            warn!(%context_id, %err, "absorb recovery: leaf drain failed on startup");
        }
    }
}
/// What [`fence_and_maybe_absorb`] did with an incoming state delta.
///
/// Derives `Debug`/`PartialEq` so outcomes can be logged and asserted on, and
/// `Copy` because it is a plain fieldless tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum FenceOutcome {
    /// The fence did not intercept the delta (bypass requested, or the fence
    /// decided `Apply`); presumably the caller continues with its normal apply
    /// path — confirm at call sites.
    Fall,
    /// The fence consumed the delta: it was either saved to the absorb buffer
    /// or dropped as stale.
    Handled,
}
/// Run the HLC schema fence for an incoming state delta and act on its decision.
///
/// - `bypass == true`: the fence is skipped and [`FenceOutcome::Fall`] is returned.
/// - `FenceDecision::Apply`: returns [`FenceOutcome::Fall`].
/// - `FenceDecision::Buffer`: builds the delta with `build_buffered`, saves it
///   as an `AbsorbRecord` under `context_id`/`producing_app_key`, records the
///   `absorbed_for_migration` delta outcome and returns [`FenceOutcome::Handled`].
/// - `FenceDecision::Drop`: logs a warning, records `fenced_stale_schema` and
///   returns [`FenceOutcome::Handled`].
///
/// `build_buffered` is invoked only on the `Buffer` path.
///
/// # Errors
///
/// Propagates failures from `delta_fence_decision` and from saving the absorb record.
// The delta is described by its individual fields plus a lazy builder, rather
// than a prebuilt `BufferedDelta`, so the buffered form is only constructed
// when the fence actually decides to buffer — hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub(super) fn fence_and_maybe_absorb(
    store: &calimero_store::Store,
    context_id: &ContextId,
    producing_app_key: [u8; 32],
    delta_id: [u8; 32],
    author_id: PublicKey,
    delta_hlc: calimero_storage::logical_clock::HybridTimestamp,
    bypass: bool,
    build_buffered: impl FnOnce() -> calimero_node_primitives::delta_buffer::BufferedDelta,
) -> Result<FenceOutcome> {
    use calimero_context::group_store::{AbsorbRecord, AbsorbRepository};
    use calimero_context::hlc_fence::{delta_fence_decision, FenceDecision};

    if bypass {
        return Ok(FenceOutcome::Fall);
    }

    let decision = delta_fence_decision(store, context_id, producing_app_key, delta_hlc)?;
    let outcome = match decision {
        FenceDecision::Apply => FenceOutcome::Fall,
        FenceDecision::Buffer => {
            let record = AbsorbRecord::from_buffered(&build_buffered());
            AbsorbRepository::new(store).save(context_id, producing_app_key, &record)?;
            info!(
                %context_id,
                %author_id,
                delta_id = ?delta_id,
                producing_app_key = %hex::encode(producing_app_key),
                "Absorbing state delta — loaded reader behind incoming schema; buffered for verbatim replay"
            );
            crate::node_metrics::record_delta_outcome("absorbed_for_migration");
            FenceOutcome::Handled
        }
        FenceDecision::Drop => {
            warn!(
                %context_id,
                %author_id,
                delta_id = ?delta_id,
                producing_app_key = %hex::encode(producing_app_key),
                "Dropping state delta — HLC fence: stale schema after cascade migration"
            );
            crate::node_metrics::record_delta_outcome("fenced_stale_schema");
            FenceOutcome::Handled
        }
    };
    Ok(outcome)
}