use super::super::fanout::CommittedEventEnvelope;
use super::super::staging::{PreparedBatch, StagedCommittedEvent};
use super::{kind_to_raw, Notification, WriterState};
use crate::store::index::{DiskPos, IndexEntry};
use crate::store::segment::sidx::SidxEntry;
use crate::store::stats::HlcPoint;
use crate::store::{AppendReceipt, EncodedBytes, ExtensionKey};
use std::collections::BTreeMap;
fn broadcast_all<T>(values: impl IntoIterator<Item = T>, mut broadcast: impl FnMut(&T)) -> usize {
let mut count = 0usize;
for value in values {
count += 1;
broadcast(&value);
}
count
}
pub(super) struct CommitArtifacts {
pub(super) index_entry: IndexEntry,
pub(super) sidx_entry: SidxEntry,
pub(super) notification: Notification,
pub(super) envelope: Option<CommittedEventEnvelope>,
}
#[derive(Clone, Copy)]
pub(super) struct CommitInternedIds {
pub(super) entity_id: crate::store::index::interner::InternId,
pub(super) scope_id: crate::store::index::interner::InternId,
}
pub(super) struct BatchCommitArtifacts {
pub(super) entries: Vec<IndexEntry>,
pub(super) sidx_entries: Vec<SidxEntry>,
pub(super) notifications: Vec<Notification>,
pub(super) envelopes: Vec<CommittedEventEnvelope>,
}
#[derive(Clone, Copy)]
pub(super) struct CommitFrameView<'a> {
pub(super) payload_bytes: &'a [u8],
pub(super) flags: u8,
pub(super) receipt_extensions: &'a BTreeMap<ExtensionKey, EncodedBytes>,
pub(super) emit_envelope: bool,
}
impl BatchCommitArtifacts {
pub(super) fn with_capacity(len: usize) -> Self {
Self {
entries: Vec::with_capacity(len),
sidx_entries: Vec::with_capacity(len),
notifications: Vec::with_capacity(len),
envelopes: Vec::with_capacity(len),
}
}
fn push(&mut self, committed: CommitArtifacts) {
self.entries.push(committed.index_entry);
self.sidx_entries.push(committed.sidx_entry);
self.notifications.push(committed.notification);
if let Some(envelope) = committed.envelope {
self.envelopes.push(envelope);
}
}
}
impl WriterState<'_> {
pub(super) fn materialize_commit_artifacts(
&self,
staged: &StagedCommittedEvent,
disk_pos: DiskPos,
interned_ids: CommitInternedIds,
frame: CommitFrameView<'_>,
) -> CommitArtifacts {
let coord = staged.coord.clone();
let position = staged.position();
let notification = Notification {
event_id: staged.meta.event_id,
correlation_id: staged.meta.correlation_id,
causation_id: staged.meta.causation_id,
coord: coord.clone(),
kind: staged.meta.kind,
sequence: staged.meta.global_sequence,
position,
};
let index_entry = IndexEntry {
event_id: staged.meta.event_id,
correlation_id: staged.meta.correlation_id,
causation_id: staged.meta.causation_id,
coord: coord.clone(),
entity_id: interned_ids.entity_id,
scope_id: interned_ids.scope_id,
kind: staged.meta.kind,
wall_ms: staged.timing.wall_ms,
clock: staged.timing.clock,
dag_lane: staged.timing.dag_lane,
dag_depth: staged.timing.dag_depth,
hash_chain: staged.hash_chain.clone(),
disk_pos,
global_sequence: staged.meta.global_sequence,
receipt_extensions: frame.receipt_extensions.clone(),
};
let sidx_entry = SidxEntry {
event_id: staged.meta.event_id,
entity_idx: 0,
scope_idx: 0,
kind: kind_to_raw(staged.meta.kind),
wall_ms: staged.timing.wall_ms,
clock: staged.timing.clock,
dag_lane: staged.timing.dag_lane,
dag_depth: staged.timing.dag_depth,
prev_hash: staged.hash_chain.prev_hash,
event_hash: staged.hash_chain.event_hash,
frame_offset: disk_pos.offset,
frame_length: disk_pos.length,
global_sequence: staged.meta.global_sequence,
correlation_id: staged.meta.correlation_id,
causation_id: staged.meta.causation_id.unwrap_or(0),
};
let envelope = if frame.emit_envelope {
staged
.stored_event(frame.payload_bytes, frame.flags)
.map(|stored| CommittedEventEnvelope {
notification: notification.clone(),
stored,
})
.ok()
} else {
None
};
CommitArtifacts {
index_entry,
sidx_entry,
notification,
envelope,
}
}
pub(super) fn materialize_batch_commit_artifacts(
&self,
prepared: &PreparedBatch,
staged: &[StagedCommittedEvent],
receipts: &[AppendReceipt],
) -> BatchCommitArtifacts {
let emit_envelope = self.reactor_subscribers.has_subscribers();
let mut artifacts = BatchCommitArtifacts::with_capacity(staged.len());
let interned_ids = prepared.interned_ids(self.index);
for ((item, staged), receipt) in prepared
.items()
.iter()
.zip(staged.iter())
.zip(receipts.iter())
{
let committed = self.materialize_commit_artifacts(
staged,
receipt.disk_pos,
CommitInternedIds {
entity_id: interned_ids.entity_id(item),
scope_id: interned_ids.scope_id(item),
},
CommitFrameView {
payload_bytes: item.payload_bytes(),
flags: item.options().flags,
receipt_extensions: &receipt.extensions,
emit_envelope,
},
);
artifacts.push(committed);
}
artifacts
}
pub(super) fn broadcast_commit_artifacts(
&self,
notifications: impl IntoIterator<Item = Notification>,
envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
) {
let push_notifications = broadcast_all(notifications, |notification| {
self.subscribers.broadcast(notification)
});
let push_envelopes = broadcast_all(envelopes, |envelope| {
self.reactor_subscribers.broadcast(envelope);
});
tracing::trace!(
target: "batpak::fanout",
push_notifications,
push_envelopes,
"commit fanout batch",
);
}
#[inline]
pub(super) fn publish_then_broadcast_unfenced(
&mut self,
publish_up_to: u64,
frontier_point: HlcPoint,
notifications: impl IntoIterator<Item = Notification>,
envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
) -> Result<(), crate::store::StoreError> {
self.index
.publish(publish_up_to, "publish_then_broadcast_unfenced")?;
self.broadcast_commit_artifacts(notifications, envelopes);
self.watermark_handle
.lock()
.advance_visible_and_emitted(frontier_point);
Ok(())
}
#[inline]
pub(super) fn fence_finish_then_broadcast(
&mut self,
token: u64,
publish_up_to: Option<u64>,
frontier_point: Option<HlcPoint>,
notifications: impl IntoIterator<Item = Notification>,
envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
) -> Result<(), crate::store::StoreError> {
self.index.finish_visibility_fence(token, publish_up_to)?;
self.broadcast_commit_artifacts(notifications, envelopes);
if let Some(point) = frontier_point {
self.watermark_handle
.lock()
.advance_visible_and_emitted(point);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::broadcast_all;
#[test]
fn broadcast_all_counts_every_pushed_item() {
let mut pushed = Vec::new();
let count = broadcast_all([10, 20, 30], |item| pushed.push(*item));
assert_eq!(
count, 3,
"PROPERTY: fanout telemetry count must advance once per pushed item"
);
assert_eq!(
pushed,
vec![10, 20, 30],
"PROPERTY: count helper must still broadcast each item in order"
);
}
}