use super::super::fanout::CommittedEventEnvelope;
use super::fence_runtime::FenceLedger;
use super::publish::{CommitFrameView, CommitInternedIds};
use super::Notification;
use super::{
segment, Coordinate, DagPosition, DiskPos, Event, EventKind, FramePayloadRef, HashChain,
StoreError, WriterCore,
};
use super::{StagedCommitMeta, StagedCommitTiming, StagedCommittedEvent};
use crate::store::stats::HlcPoint;
use crate::store::{AppendReceipt, EncodedBytes, ExtensionKey};
use std::collections::BTreeMap;
use tracing::{debug, info, trace};
pub(crate) struct AppendGuards {
pub correlation_id: u128,
pub causation_id: Option<u128>,
pub expected_sequence: Option<u32>,
pub idempotency_key: Option<u128>,
pub dag_lane: u32,
pub dag_depth: u32,
pub dag_branch_root: bool,
pub extensions: BTreeMap<ExtensionKey, EncodedBytes>,
}
impl WriterCore {
pub(super) fn handle_append(
&mut self,
coord: &Coordinate,
mut event: Event<Vec<u8>>,
kind: EventKind,
guards: &AppendGuards,
fence: Option<&mut FenceLedger>,
) -> Result<AppendReceipt, StoreError> {
let correlation_id = guards.correlation_id;
let causation_id = guards.causation_id;
let entity = coord.entity();
let scope = coord.scope();
let latest = self.index.get_latest_committed(entity, guards.dag_lane);
Self::enforce_expected_sequence(latest.as_ref(), guards.expected_sequence, entity)?;
if let Some(key) = guards.idempotency_key {
if let Some(receipt) = self.try_idempotency_replay(key)? {
return Ok(receipt);
}
}
let prev_hash = latest
.as_ref()
.map(|entry| entry.hash_chain.event_hash)
.unwrap_or([0u8; 32]);
let clock = super::checked_next_clock(latest.as_ref().map(|entry| entry.clock), entity)?;
let raw_ms = crate::store::config::wall_ms_from_timestamp_us(event.header.timestamp_us)?;
let last_ms = latest.as_ref().map(|entry| entry.wall_ms).unwrap_or(0);
let now_ms = raw_ms.max(last_ms);
let global_seq = self.index.global_sequence();
let frontier_point = HlcPoint {
wall_ms: now_ms,
global_sequence: global_seq,
};
#[cfg(feature = "dangerous-test-hooks")]
crate::store::fault::maybe_inject(
crate::store::fault::InjectionPoint::SingleAppendStart {
entity: entity.to_string(),
},
&self.config.fault_injector,
)?;
self.watermark_handle
.lock()
.advance_accepted_on_lane(guards.dag_lane, frontier_point);
let dag_depth = if guards.dag_branch_root {
DagPosition::fork(guards.dag_depth, guards.dag_lane).depth()
} else {
guards.dag_depth
};
let position = DagPosition::with_hlc(now_ms, 0, dag_depth, guards.dag_lane, clock);
event.header.position = position;
event.header.event_kind = kind;
event.header.correlation_id = crate::id::CorrelationId::from(correlation_id);
event.header.causation_id = causation_id
.filter(|&id| id != 0)
.map(crate::id::CausationId::from);
#[cfg(feature = "payload-encryption")]
self.encrypt_single_payload(coord, kind, &mut event)?;
let event_hash = crate::event::hash::compute_hash(&event.payload);
event.hash_chain = Some(HashChain {
prev_hash,
event_hash,
});
event.header.content_hash = event_hash;
let mut receipt = AppendReceipt {
event_id: event.header.event_id,
global_sequence: global_seq,
disk_pos: DiskPos {
segment_id: self.segment_id,
offset: 0,
length: 0,
},
content_hash: event_hash,
key_id: [0; 32],
signature: None,
extensions: guards.extensions.clone(),
};
self.runtime
.signing_registry
.sign_append_receipt(&mut receipt, coord, kind, prev_hash)?;
let frame_payload = FramePayloadRef {
event: &event,
entity,
scope,
receipt_extensions: &receipt.extensions,
};
let frame = segment::frame_encode(&frame_payload)?;
if self.maybe_rotate_segment()? {
info!(segment_id = self.segment_id, "segment rotated");
}
let offset = self.active_segment.write_frame(&frame)?;
self.watermark_handle
.lock()
.advance_written_on_lane(guards.dag_lane, frontier_point);
trace!(offset = offset, len = frame.len(), "frame written");
#[cfg(feature = "dangerous-test-hooks")]
crate::store::fault::maybe_inject(
crate::store::fault::InjectionPoint::SingleAppendWritten {
entity: entity.to_string(),
},
&self.config.fault_injector,
)?;
let disk_pos = DiskPos {
segment_id: self.segment_id,
offset,
length: u32::try_from(frame.len())
.map_err(|_| StoreError::ser_msg("encoded frame length exceeds u32::MAX"))?,
};
receipt.disk_pos = disk_pos;
let meta = {
use crate::id::EntityIdType;
StagedCommitMeta::new(
event.header.event_id.as_u128(),
correlation_id,
causation_id,
kind,
global_seq,
)
};
let timing = StagedCommitTiming::new(
event.header.timestamp_us,
now_ms,
clock,
guards.dag_lane,
dag_depth,
);
let staged = StagedCommittedEvent::new(
coord.clone(),
meta,
timing,
HashChain {
prev_hash,
event_hash,
},
);
let emit_envelope = self.reactor_subscribers.has_subscribers();
let interned = CommitInternedIds::for_coord(&self.index, coord)?;
let committed = self.materialize_commit_artifacts(
&staged,
disk_pos,
interned,
CommitFrameView {
payload_bytes: &event.payload,
flags: event.header.flags,
receipt_extensions: &receipt.extensions,
emit_envelope,
},
);
self.record_commit_index_artifacts(
committed.sidx_entry,
&committed.index_entry,
guards.idempotency_key.is_some(),
global_seq,
)?;
self.index.insert(committed.index_entry);
debug!(event_id = %event.header.event_id, clock = clock, "append committed");
self.publish_single_commit(
fence,
global_seq,
frontier_point,
committed.notification,
committed.envelope,
#[cfg(feature = "dangerous-test-hooks")]
entity,
)?;
Ok(receipt)
}
fn publish_single_commit(
&mut self,
fence: Option<&mut FenceLedger>,
global_seq: u64,
frontier_point: HlcPoint,
notification: Notification,
envelope: Option<CommittedEventEnvelope>,
#[cfg(feature = "dangerous-test-hooks")] entity: &str,
) -> Result<(), StoreError> {
if let Some(fence) = fence {
fence.record_publish_up_to(global_seq.saturating_add(1), frontier_point);
self.index.note_visibility_fence_progress(
fence.token,
global_seq,
global_seq.saturating_add(1),
)?;
fence.extend_artifacts([notification], envelope);
} else {
self.publish_then_broadcast_unfenced(
global_seq + 1,
frontier_point,
[notification],
envelope,
)?;
#[cfg(feature = "dangerous-test-hooks")]
crate::store::fault::maybe_inject(
crate::store::fault::InjectionPoint::SingleAppendPublished {
entity: entity.to_string(),
},
&self.config.fault_injector,
)?;
}
Ok(())
}
fn enforce_expected_sequence(
latest: Option<&crate::store::index::IndexEntry>,
expected: Option<u32>,
entity: &str,
) -> Result<(), StoreError> {
if let Some(expected) = expected {
let actual = latest.map(|entry| entry.clock).unwrap_or(0);
if actual != expected {
return Err(StoreError::SequenceMismatch {
entity: entity.to_string(),
expected,
actual,
});
}
}
Ok(())
}
fn record_commit_index_artifacts(
&mut self,
sidx_entry: crate::store::segment::sidx::SidxEntry,
index_entry: &crate::store::index::IndexEntry,
has_idempotency_key: bool,
global_seq: u64,
) -> Result<(), StoreError> {
self.sidx_collector.record(
sidx_entry,
index_entry.coord.entity(),
index_entry.coord.scope(),
)?;
if has_idempotency_key {
self.index
.idemp
.record(crate::store::index::idemp::IdempEntry::from_index_entry(
index_entry,
global_seq,
));
}
Ok(())
}
fn try_idempotency_replay(&mut self, key: u128) -> Result<Option<AppendReceipt>, StoreError> {
if let Some(durable) = self.index.idemp.get(key) {
let mut receipt = AppendReceipt {
event_id: crate::id::EventId::from(durable.event_id),
global_sequence: durable.global_sequence,
disk_pos: durable.disk_pos(),
content_hash: durable.content_hash,
key_id: [0; 32],
signature: None,
extensions: durable.receipt_extensions.clone(),
};
let coord = crate::coordinate::Coordinate::new(&durable.entity, &durable.scope)?;
self.runtime.signing_registry.sign_append_receipt(
&mut receipt,
&coord,
durable.kind,
durable.prev_hash,
)?;
return Ok(Some(receipt));
}
if let Some(entry) = self.index.get_by_id(key) {
let mut receipt = AppendReceipt {
event_id: crate::id::EventId::from(entry.event_id),
global_sequence: entry.global_sequence,
disk_pos: entry.disk_pos,
content_hash: entry.hash_chain.event_hash,
key_id: [0; 32],
signature: None,
extensions: entry.receipt_extensions.clone(),
};
self.runtime.signing_registry.sign_append_receipt(
&mut receipt,
&entry.coord,
entry.kind,
entry.hash_chain.prev_hash,
)?;
return Ok(Some(receipt));
}
self.index
.idemp
.admit_new_key(key, self.index.global_sequence())?;
Ok(None)
}
}
#[cfg(test)]
#[path = "append_mutation_kill.rs"]
mod append_mutation_kill;