batpak 0.8.2

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
use super::super::fanout::CommittedEventEnvelope;
use super::{
    ignore_closed_response_channel, AppendGuards, AppendReceipt, BatchAppendItem, Coordinate,
    Event, EventKind, Notification, StoreError, WriterState,
};
use crate::store::stats::HlcPoint;
use flume::Sender;

enum PendingFenceResponse {
    Single {
        respond: Sender<Result<AppendReceipt, StoreError>>,
        receipt: AppendReceipt,
    },
    Batch {
        respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
        receipts: Vec<AppendReceipt>,
    },
}

impl PendingFenceResponse {
    fn complete_cancelled(self) {
        match self {
            Self::Single { respond, .. } => {
                ignore_closed_response_channel(
                    respond.send(Err(StoreError::VisibilityFenceCancelled)),
                );
            }
            Self::Batch { respond, .. } => {
                ignore_closed_response_channel(
                    respond.send(Err(StoreError::VisibilityFenceCancelled)),
                );
            }
        }
    }

    fn complete_ok(self) {
        match self {
            Self::Single { respond, receipt } => {
                ignore_closed_response_channel(respond.send(Ok(receipt)));
            }
            Self::Batch { respond, receipts } => {
                ignore_closed_response_channel(respond.send(Ok(receipts)));
            }
        }
    }
}

pub(super) struct FenceLedger {
    pub(super) token: u64,
    pub(super) publish_up_to: Option<u64>,
    pub(super) frontier_point: Option<HlcPoint>,
    pub(super) notifications: Vec<Notification>,
    pub(super) envelopes: Vec<CommittedEventEnvelope>,
    responses: Vec<PendingFenceResponse>,
}

impl FenceLedger {
    pub(super) fn new(token: u64) -> Self {
        Self {
            token,
            publish_up_to: None,
            frontier_point: None,
            notifications: Vec::new(),
            envelopes: Vec::new(),
            responses: Vec::new(),
        }
    }

    pub(super) fn record_publish_up_to(&mut self, publish_up_to: u64, frontier_point: HlcPoint) {
        self.publish_up_to = Some(self.publish_up_to.unwrap_or(0).max(publish_up_to));
        self.frontier_point = Some(
            self.frontier_point
                .unwrap_or(HlcPoint::ORIGIN)
                .max(frontier_point),
        );
    }

    pub(super) fn extend_artifacts(
        &mut self,
        notifications: impl IntoIterator<Item = Notification>,
        envelopes: impl IntoIterator<Item = CommittedEventEnvelope>,
    ) {
        self.notifications.extend(notifications);
        self.envelopes.extend(envelopes);
    }

    fn push_response(&mut self, response: PendingFenceResponse) {
        self.responses.push(response);
    }

    fn complete_cancelled(self) {
        for response in self.responses {
            response.complete_cancelled();
        }
    }
}

#[derive(Debug)]
pub(super) enum DeferredReply {
    None,
    Sync {
        respond: Sender<Result<(), StoreError>>,
    },
    BeginVisibilityFence {
        token: u64,
        respond: Sender<Result<(), StoreError>>,
    },
    CommitVisibilityFence {
        token: u64,
        respond: Sender<Result<(), StoreError>>,
    },
    Shutdown {
        respond: Sender<Result<(), StoreError>>,
    },
}

impl DeferredReply {
    pub(super) fn send(
        self,
        state: &mut WriterState<'_>,
        sync_result: Result<(), StoreError>,
    ) -> Result<(), StoreError> {
        match self {
            Self::None => Ok(()),
            Self::Sync { respond } => {
                ignore_closed_response_channel(respond.send(sync_result));
                Ok(())
            }
            Self::BeginVisibilityFence { token, respond } => {
                let result = sync_result.and_then(|_| state.begin_visibility_fence(token));
                ignore_closed_response_channel(respond.send(result));
                Ok(())
            }
            Self::CommitVisibilityFence { token, respond } => {
                let result = sync_result.and_then(|_| state.commit_visibility_fence(token));
                ignore_closed_response_channel(respond.send(result));
                Ok(())
            }
            Self::Shutdown { respond } => {
                ignore_closed_response_channel(respond.send(sync_result));
                Ok(())
            }
        }
    }
}

#[derive(Debug)]
pub(super) struct CommandResult {
    pub(super) sync_event_delta: u32,
    pub(super) break_after_reply: bool,
    pub(super) must_sync_before_continue: bool,
    pub(super) exit_writer: bool,
    pub(super) deferred_reply: DeferredReply,
    pub(super) shutdown_drain_respond: Option<Sender<Result<(), StoreError>>>,
    pub(super) enter_group_commit_drain: bool,
}

impl CommandResult {
    pub(super) fn immediate(sync_event_delta: u32) -> Self {
        Self {
            sync_event_delta,
            break_after_reply: false,
            must_sync_before_continue: false,
            exit_writer: false,
            deferred_reply: DeferredReply::None,
            shutdown_drain_respond: None,
            enter_group_commit_drain: false,
        }
    }

    pub(super) fn break_after_reply(mut self) -> Self {
        self.break_after_reply = true;
        self
    }

    pub(super) fn break_after_reply_if(self, condition: bool) -> Self {
        if condition {
            self.break_after_reply()
        } else {
            self
        }
    }

    pub(super) fn with_sync(mut self, deferred_reply: DeferredReply) -> Self {
        self.must_sync_before_continue = true;
        self.deferred_reply = deferred_reply;
        self
    }

    pub(super) fn exit_writer(mut self) -> Self {
        self.exit_writer = true;
        self
    }

    pub(super) fn enter_shutdown_drain(mut self, respond: Sender<Result<(), StoreError>>) -> Self {
        self.exit_writer = true;
        self.shutdown_drain_respond = Some(respond);
        self
    }

    pub(super) fn enter_group_commit_drain(mut self) -> Self {
        self.enter_group_commit_drain = true;
        self
    }
}

impl WriterState<'_> {
    pub(super) fn auto_cancel_fence_on_shutdown(&mut self) {
        if let Some(fence) = self.fence_ledger.take() {
            tracing::warn!(
                token = fence.token,
                pending = fence.responses.len(),
                "auto-cancelling active visibility fence during shutdown"
            );
            if let Err(error) = self.index.cancel_visibility_fence(fence.token) {
                tracing::error!(
                    token = fence.token,
                    error = %error,
                    "failed to cancel active visibility fence during shutdown"
                );
            }
            if let Err(error) = self.persist_cancelled_visibility_ranges() {
                tracing::error!(
                    error = %error,
                    "failed to persist cancelled visibility ranges during shutdown"
                );
            }
            fence.complete_cancelled();
        }
    }

    fn with_matching_fence_ledger<R>(
        &mut self,
        token: u64,
        f: impl FnOnce(&mut Self, &mut FenceLedger) -> Result<R, StoreError>,
    ) -> Result<R, StoreError> {
        if self.fence_ledger.as_ref().map(|fence| fence.token) != Some(token) {
            return Err(StoreError::VisibilityFenceNotActive);
        }
        let Some(mut fence) = self.fence_ledger.take() else {
            return Err(StoreError::VisibilityFenceNotActive);
        };
        let result = f(self, &mut fence);
        self.fence_ledger = Some(fence);
        result
    }

    pub(super) fn handle_fence_append_command(
        &mut self,
        token: u64,
        coord: &Coordinate,
        event: Event<Vec<u8>>,
        kind: EventKind,
        guards: &AppendGuards,
        respond: Sender<Result<AppendReceipt, StoreError>>,
    ) -> Result<(), StoreError> {
        self.with_matching_fence_ledger(token, |state, fence| {
            let receipt = state.handle_append(coord, event, kind, guards, Some(fence))?;
            fence.push_response(PendingFenceResponse::Single { respond, receipt });
            Ok(())
        })
    }

    pub(super) fn handle_fence_append_batch_command(
        &mut self,
        token: u64,
        items: Vec<BatchAppendItem>,
        respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
    ) -> Result<(), StoreError> {
        self.with_matching_fence_ledger(token, |state, fence| {
            let receipts = state.handle_append_batch(items, Some(fence))?;
            fence.push_response(PendingFenceResponse::Batch { respond, receipts });
            Ok(())
        })
    }

    pub(super) fn begin_visibility_fence(&mut self, token: u64) -> Result<(), StoreError> {
        if self.fence_ledger.is_some() {
            return Err(StoreError::VisibilityFenceActive);
        }
        if self.index.active_visibility_fence() != Some(token) {
            return Err(StoreError::VisibilityFenceNotActive);
        }
        self.fence_ledger = Some(FenceLedger::new(token));
        Ok(())
    }

    pub(super) fn commit_visibility_fence(&mut self, token: u64) -> Result<(), StoreError> {
        let Some(fence) = self.fence_ledger.take() else {
            return Err(StoreError::VisibilityFenceNotActive);
        };
        if fence.token != token {
            self.fence_ledger = Some(fence);
            return Err(StoreError::VisibilityFenceNotActive);
        }

        let FenceLedger {
            publish_up_to,
            frontier_point,
            notifications,
            envelopes,
            responses,
            ..
        } = fence;
        self.fence_finish_then_broadcast(
            token,
            publish_up_to,
            frontier_point,
            notifications,
            envelopes,
        )?;
        for response in responses {
            response.complete_ok();
        }
        Ok(())
    }

    pub(super) fn cancel_visibility_fence(&mut self, token: u64) -> Result<(), StoreError> {
        let Some(fence) = self.fence_ledger.take() else {
            return Err(StoreError::VisibilityFenceNotActive);
        };
        if fence.token != token {
            self.fence_ledger = Some(fence);
            return Err(StoreError::VisibilityFenceNotActive);
        }

        self.index.cancel_visibility_fence(token)?;
        self.persist_cancelled_visibility_ranges()?;
        fence.complete_cancelled();
        Ok(())
    }

    fn persist_cancelled_visibility_ranges(&self) -> Result<(), StoreError> {
        crate::store::hidden_ranges::write_cancelled_ranges(
            &self.config.data_dir,
            &self.index.cancelled_visibility_ranges(),
        )
    }
}