use super::super::fanout::CommittedEventEnvelope;
use super::super::staging::{PreparedBatch, StagedCommittedEvent};
use super::{kind_to_raw, Notification, WriterState};
use crate::store::index::{DiskPos, IndexEntry};
use crate::store::segment::sidx::SidxEntry;
use crate::store::stats::HlcPoint;
use crate::store::AppendReceipt;
pub(super) struct CommitArtifacts {
pub(super) index_entry: IndexEntry,
pub(super) sidx_entry: SidxEntry,
pub(super) notification: Notification,
pub(super) envelope: Option<CommittedEventEnvelope>,
}
#[derive(Clone, Copy)]
pub(super) struct CommitInternedIds {
pub(super) entity_id: crate::store::index::interner::InternId,
pub(super) scope_id: crate::store::index::interner::InternId,
}
pub(super) struct BatchCommitArtifacts {
pub(super) entries: Vec<IndexEntry>,
pub(super) sidx_entries: Vec<SidxEntry>,
pub(super) notifications: Vec<Notification>,
pub(super) envelopes: Vec<CommittedEventEnvelope>,
}
impl BatchCommitArtifacts {
pub(super) fn with_capacity(len: usize) -> Self {
Self {
entries: Vec::with_capacity(len),
sidx_entries: Vec::with_capacity(len),
notifications: Vec::with_capacity(len),
envelopes: Vec::with_capacity(len),
}
}
fn push(&mut self, committed: CommitArtifacts) {
self.entries.push(committed.index_entry);
self.sidx_entries.push(committed.sidx_entry);
self.notifications.push(committed.notification);
if let Some(envelope) = committed.envelope {
self.envelopes.push(envelope);
}
}
}
impl WriterState<'_> {
pub(super) fn materialize_commit_artifacts(
&self,
staged: &StagedCommittedEvent,
disk_pos: DiskPos,
interned_ids: CommitInternedIds,
payload_bytes: &[u8],
flags: u8,
emit_envelope: bool,
) -> CommitArtifacts {
let coord = staged.coord.clone();
let position = staged.position();
let notification = Notification {
event_id: staged.meta.event_id,
correlation_id: staged.meta.correlation_id,
causation_id: staged.meta.causation_id,
coord: coord.clone(),
kind: staged.meta.kind,
sequence: staged.meta.global_sequence,
position,
};
let index_entry = IndexEntry {
event_id: staged.meta.event_id,
correlation_id: staged.meta.correlation_id,
causation_id: staged.meta.causation_id,
coord: coord.clone(),
entity_id: interned_ids.entity_id,
scope_id: interned_ids.scope_id,
kind: staged.meta.kind,
wall_ms: staged.timing.wall_ms,
clock: staged.timing.clock,
dag_lane: staged.timing.dag_lane,
dag_depth: staged.timing.dag_depth,
hash_chain: staged.hash_chain.clone(),
disk_pos,
global_sequence: staged.meta.global_sequence,
};
let sidx_entry = SidxEntry {
event_id: staged.meta.event_id,
entity_idx: 0,
scope_idx: 0,
kind: kind_to_raw(staged.meta.kind),
wall_ms: staged.timing.wall_ms,
clock: staged.timing.clock,
dag_lane: staged.timing.dag_lane,
dag_depth: staged.timing.dag_depth,
prev_hash: staged.hash_chain.prev_hash,
event_hash: staged.hash_chain.event_hash,
frame_offset: disk_pos.offset,
frame_length: disk_pos.length,
global_sequence: staged.meta.global_sequence,
correlation_id: staged.meta.correlation_id,
causation_id: staged.meta.causation_id.unwrap_or(0),
};
let envelope = if emit_envelope {
staged
.stored_event(payload_bytes, flags)
.map(|stored| CommittedEventEnvelope {
notification: notification.clone(),
stored,
})
.ok()
} else {
None
};
CommitArtifacts {
index_entry,
sidx_entry,
notification,
envelope,
}
}
pub(super) fn materialize_batch_commit_artifacts(
&self,
prepared: &PreparedBatch,
staged: &[StagedCommittedEvent],
receipts: &[AppendReceipt],
) -> BatchCommitArtifacts {
let emit_envelope = self.reactor_subscribers.has_subscribers();
let mut artifacts = BatchCommitArtifacts::with_capacity(staged.len());
let interned_ids = prepared.interned_ids(self.index);
for ((item, staged), receipt) in prepared
.items()
.iter()
.zip(staged.iter())
.zip(receipts.iter())
{
let committed = self.materialize_commit_artifacts(
staged,
receipt.disk_pos,
CommitInternedIds {
entity_id: interned_ids.entity_id(item),
scope_id: interned_ids.scope_id(item),
},
item.payload_bytes(),
item.options().flags,
emit_envelope,
);
artifacts.push(committed);
}
artifacts
}
pub(super) fn broadcast_commit_artifacts(
&self,
notifications: impl IntoIterator<Item = Notification>,
envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
) {
for notification in notifications {
self.subscribers.broadcast(¬ification);
}
for envelope in envelopes {
self.reactor_subscribers.broadcast(&envelope);
}
}
#[inline]
pub(super) fn publish_then_broadcast_unfenced(
&mut self,
publish_up_to: u64,
frontier_point: HlcPoint,
notifications: impl IntoIterator<Item = Notification>,
envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
) -> Result<(), crate::store::StoreError> {
self.index
.publish(publish_up_to, "publish_then_broadcast_unfenced")?;
self.broadcast_commit_artifacts(notifications, envelopes);
self.watermark_handle
.lock()
.advance_visible_and_emitted(frontier_point);
Ok(())
}
#[inline]
pub(super) fn fence_finish_then_broadcast(
&mut self,
token: u64,
publish_up_to: Option<u64>,
frontier_point: Option<HlcPoint>,
notifications: impl IntoIterator<Item = Notification>,
envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
) -> Result<(), crate::store::StoreError> {
self.index.finish_visibility_fence(token, publish_up_to)?;
self.broadcast_commit_artifacts(notifications, envelopes);
if let Some(point) = frontier_point {
self.watermark_handle
.lock()
.advance_visible_and_emitted(point);
}
Ok(())
}
}