use crate::coordinate::Coordinate;
use crate::event::{EventKind, EventPayload, StoredEvent};
use crate::id::{CausationId, CorrelationId, EventId, IdempotencyKey};
use crate::store::gate::DurabilityGate;
use crate::store::index::DiskPos;
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::marker::PhantomData;
/// How a batch item names the event that caused it.
///
/// The default value is [`CausationRef::None`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CausationRef {
/// No explicit causation; `resolve` hands back the caller-supplied fallback.
#[default]
None,
/// An explicit raw identifier. `resolve` maps `Absolute(0)` to `None`.
Absolute(u128),
/// Index of another item in the same batch. `resolve` rejects an index that
/// is not strictly smaller than the referencing item's own index.
PriorItem(usize),
}
impl CausationRef {
    /// Returns `true` only for [`CausationRef::None`], the variant for which
    /// `resolve` returns its `fallback` argument.
    pub(crate) const fn uses_options_fallback(self) -> bool {
        match self {
            Self::None => true,
            Self::Absolute(_) | Self::PriorItem(_) => false,
        }
    }

    /// Builds an [`CausationRef::Absolute`] from a typed causation id by
    /// taking its raw `u128` value.
    pub fn absolute_typed(id: crate::id::CausationId) -> Self {
        use crate::id::EntityIdType;
        let raw = id.as_u128();
        Self::Absolute(raw)
    }

    /// Turns this reference into an optional raw causation id.
    ///
    /// * `fallback` is returned unchanged for [`CausationRef::None`].
    /// * `item_index` is the batch position of the item owning this reference.
    /// * `prior_event_id` maps an earlier batch index to that item's event id;
    ///   it is only invoked for a valid [`CausationRef::PriorItem`].
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::InvalidCausation`] when a `PriorItem` index is
    /// greater than or equal to `item_index`.
    pub(crate) fn resolve(
        self,
        fallback: Option<u128>,
        item_index: usize,
        prior_event_id: impl FnOnce(usize) -> u128,
    ) -> Result<Option<u128>, StoreError> {
        let resolved = match self {
            Self::None => fallback,
            // A raw id of zero is treated as "no causation".
            Self::Absolute(id) => {
                if id == 0 {
                    None
                } else {
                    Some(id)
                }
            }
            Self::PriorItem(prior_idx) if prior_idx < item_index => {
                Some(prior_event_id(prior_idx))
            }
            // Self-references and forward references are rejected.
            Self::PriorItem(prior_idx) => {
                return Err(StoreError::InvalidCausation {
                    prior_idx,
                    item_index,
                    reason: "PriorItem causation must reference earlier batch item".into(),
                });
            }
        };
        Ok(resolved)
    }
}
/// One entry of a batch append: a coordinate, an event kind, the encoded
/// payload, per-item options, and the way its causation is expressed.
#[derive(Clone, Debug)]
pub struct BatchAppendItem {
/// Coordinate supplied by the caller for this item.
coord: Coordinate,
/// Kind tag supplied by the caller (or `T::KIND` when built through `typed`).
kind: EventKind,
/// Payload in already-encoded form.
payload_bytes: Vec<u8>,
/// Per-item append options.
options: AppendOptions,
/// Causation reference for this item.
causation: CausationRef,
}
impl BatchAppendItem {
    /// Builds an item by encoding `payload` with `crate::encoding::to_bytes`.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Serialization`] wrapping the encoder error when
    /// the payload cannot be encoded.
    pub fn new(
        coord: Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        options: AppendOptions,
        causation: CausationRef,
    ) -> Result<Self, StoreError> {
        crate::encoding::to_bytes(payload)
            .map_err(|e| StoreError::Serialization(Box::new(e)))
            .map(|payload_bytes| Self {
                coord,
                kind,
                payload_bytes,
                options,
                causation,
            })
    }

    /// Same as [`BatchAppendItem::new`], taking the kind from `T::KIND`.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`BatchAppendItem::new`].
    pub fn typed<T: EventPayload>(
        coord: Coordinate,
        payload: &T,
        options: AppendOptions,
        causation: CausationRef,
    ) -> Result<Self, StoreError> {
        let kind = T::KIND;
        Self::new(coord, kind, payload, options, causation)
    }

    /// Builds an item from bytes the caller has already encoded.
    ///
    /// The bytes are stored as given; nothing in this constructor inspects or
    /// validates them.
    pub fn from_msgpack_bytes(
        coord: Coordinate,
        kind: EventKind,
        payload_bytes: Vec<u8>,
        options: AppendOptions,
        causation: CausationRef,
    ) -> Self {
        Self {
            causation,
            options,
            payload_bytes,
            kind,
            coord,
        }
    }

    /// Consumes the item and returns its five fields in declaration order.
    pub(crate) fn into_parts(
        self,
    ) -> (Coordinate, EventKind, Vec<u8>, AppendOptions, CausationRef) {
        let Self {
            coord,
            kind,
            payload_bytes,
            options,
            causation,
        } = self;
        (coord, kind, payload_bytes, options, causation)
    }

    /// Returns a clone of this item's options.
    pub fn options(&self) -> AppendOptions {
        AppendOptions::clone(&self.options)
    }

    /// Returns this item's causation reference.
    pub fn causation(&self) -> CausationRef {
        let Self { causation, .. } = self;
        *causation
    }

    /// Borrows this item's coordinate.
    pub fn coord(&self) -> &Coordinate {
        let Self { coord, .. } = self;
        coord
    }

    /// Returns this item's event kind.
    pub fn kind(&self) -> EventKind {
        let Self { kind, .. } = self;
        *kind
    }

    /// Borrows the encoded payload.
    pub fn payload_bytes(&self) -> &[u8] {
        self.payload_bytes.as_slice()
    }

    /// Replaces the item's options wholesale.
    pub(crate) fn with_options(self, options: AppendOptions) -> Self {
        Self { options, ..self }
    }

    /// Adds one extension entry by delegating to
    /// `AppendOptions::with_extension`.
    #[must_use]
    pub fn with_extension(self, key: ExtensionKey, bytes: impl Into<EncodedBytes>) -> Self {
        self.map_options(|options| options.with_extension(key, bytes))
    }

    /// Adds one namespace-typed extension entry by delegating to
    /// `AppendOptions::with_receipt_extension`.
    #[must_use]
    pub fn with_receipt_extension<P: ReceiptExtensionNamespace>(
        self,
        key: ReceiptExtensionKey<P>,
        value: ReceiptExtensionValue<P>,
    ) -> Self {
        self.map_options(|options| options.with_receipt_extension(key, value))
    }

    /// Hands `extensions` to `AppendOptions::with_extensions`.
    #[must_use]
    pub fn with_extensions(self, extensions: BTreeMap<ExtensionKey, EncodedBytes>) -> Self {
        self.map_options(|options| options.with_extensions(extensions))
    }

    /// Rebuilds the item with its options passed through `update`; every
    /// other field is moved across untouched.
    fn map_options(self, update: impl FnOnce(AppendOptions) -> AppendOptions) -> Self {
        let Self {
            coord,
            kind,
            payload_bytes,
            options,
            causation,
        } = self;
        Self {
            coord,
            kind,
            payload_bytes,
            options: update(options),
            causation,
        }
    }
}
/// Data handed back for an appended event.
///
/// NOTE(review): the field descriptions below are read off names and types;
/// the code that fills them in is not part of this file.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct AppendReceipt {
/// Identifier of the event.
pub event_id: EventId,
/// Sequence number reported for the event.
pub sequence: u64,
/// Crate-private disk position; readable through `disk_pos()`.
pub(crate) disk_pos: DiskPos,
/// 32-byte content hash.
pub content_hash: [u8; 32],
/// 32-byte key identifier.
pub key_id: [u8; 32],
/// 64-byte signature, when one is present.
pub signature: Option<[u8; 64]>,
/// Extension entries by key; `signing_downgrade()` looks up its entry here.
pub extensions: BTreeMap<ExtensionKey, EncodedBytes>,
}
/// Receipt with the same field layout as `AppendReceipt`.
///
/// NOTE(review): the name suggests it is issued for a denial; the producing
/// code is not part of this file, so confirm against the call sites.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct DenialReceipt {
/// Identifier of the event.
pub event_id: EventId,
/// Sequence number reported for the event.
pub sequence: u64,
/// Crate-private disk position; readable through `disk_pos()`.
pub(crate) disk_pos: DiskPos,
/// 32-byte content hash.
pub content_hash: [u8; 32],
/// 32-byte key identifier.
pub key_id: [u8; 32],
/// 64-byte signature, when one is present.
pub signature: Option<[u8; 64]>,
/// Extension entries by key.
pub extensions: BTreeMap<ExtensionKey, EncodedBytes>,
}
/// Owned byte buffer holding an already-encoded value, used for extension values.
pub type EncodedBytes = Vec<u8>;
/// Value stored in `SigningDowngradeBody::schema_version` by
/// `SigningDowngradeBody::cover_build_failed`.
pub const SIGNING_DOWNGRADE_SCHEMA_VERSION: u16 = 1;
/// Serializable body stored under the signing-downgrade extension key and
/// decoded again by `AppendReceipt::signing_downgrade`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SigningDowngradeBody {
/// Schema version of this body.
pub schema_version: u16,
/// Why the downgrade happened.
pub reason: SigningDowngradeReason,
}
/// Reason carried by a `SigningDowngradeBody`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SigningDowngradeReason {
/// Produced by `SigningDowngradeBody::cover_build_failed`.
CoverBuildFailed {
/// Error text supplied by the caller of `cover_build_failed`.
encoding_error: String,
},
}
impl AppendReceipt {
    /// Returns the crate-private disk position stored in this receipt.
    #[must_use]
    pub const fn disk_pos(&self) -> DiskPos {
        let Self { disk_pos, .. } = self;
        *disk_pos
    }

    /// Decodes the signing-downgrade extension entry, if there is one.
    ///
    /// Returns `None` both when the receipt has no entry under the
    /// signing-downgrade key and when the stored bytes fail to decode.
    #[must_use]
    pub fn signing_downgrade(&self) -> Option<SigningDowngradeBody> {
        let key = signing_downgrade_extension_key();
        let encoded = self.extensions.get(&key)?;
        crate::encoding::from_bytes(encoded).ok()
    }
}
impl DenialReceipt {
    /// Returns the crate-private disk position stored in this receipt.
    #[must_use]
    pub const fn disk_pos(&self) -> DiskPos {
        let Self { disk_pos, .. } = self;
        *disk_pos
    }
}
pub(crate) fn encoded_receipt_extensions_len(
extensions: &BTreeMap<ExtensionKey, EncodedBytes>,
) -> Result<usize, StoreError> {
if extensions.is_empty() {
return Ok(0);
}
crate::canonical::to_bytes(extensions)
.map(|bytes| bytes.len())
.map_err(|error| StoreError::Serialization(Box::new(error)))
}
/// Adds `payload_len` to the encoded length of `extensions`.
///
/// # Errors
///
/// Propagates the error of `encoded_receipt_extensions_len`, and reports a
/// serialization-message error when the sum does not fit in `usize`.
pub(crate) fn checked_append_bytes(
    payload_len: usize,
    extensions: &BTreeMap<ExtensionKey, EncodedBytes>,
) -> Result<usize, StoreError> {
    let extension_len = encoded_receipt_extensions_len(extensions)?;
    let Some(total) = payload_len.checked_add(extension_len) else {
        return Err(StoreError::ser_msg("append bytes overflow usize"));
    };
    Ok(total)
}
/// Name of an extension entry, stored as a `String`.
///
/// Values are built through `ExtensionKey::new`, which validates the name, or
/// through the crate-internal `ExtensionKey::reserved`.
///
/// NOTE(review): `Deserialize` is derived on this newtype, so deserializing a
/// key does not run the checks performed by `new`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ExtensionKey(String);
/// Compile-time tag naming the prefix under which a family of receipt
/// extension keys lives.
pub trait ReceiptExtensionNamespace {
    /// Text placed before the `.` separator when a namespaced key is built.
    const PREFIX: &'static str;
}

/// Marker type for the `batpak.signing` namespace.
///
/// The standard traits are derived because `ReceiptExtensionKey<P>` and
/// `ReceiptExtensionValue<P>` use `#[derive]` for `Debug`, equality, ordering
/// and hashing, and a derive places the same bound on the type parameter `P`.
/// Without these derives those impls would not exist for this namespace.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SigningExtensionNamespace;

impl ReceiptExtensionNamespace for SigningExtensionNamespace {
    const PREFIX: &'static str = "batpak.signing";
}
/// An `ExtensionKey` tagged at the type level with the namespace `P` it
/// belongs to.
///
/// The derived traits below are only available when `P` implements the same
/// trait, because `#[derive]` adds that bound to the type parameter.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ReceiptExtensionKey<P: ReceiptExtensionNamespace> {
/// The underlying untyped key.
raw: ExtensionKey,
/// Zero-sized tag tying the key to `P`.
_namespace: PhantomData<P>,
}
impl<P: ReceiptExtensionNamespace> Clone for ReceiptExtensionKey<P> {
fn clone(&self) -> Self {
Self {
raw: self.raw.clone(),
_namespace: PhantomData,
}
}
}
impl<P: ReceiptExtensionNamespace> ReceiptExtensionKey<P> {
    /// Builds the key `"<P::PREFIX>.<field>"` and validates it with
    /// `ExtensionKey::new`.
    ///
    /// # Errors
    ///
    /// Returns whatever `ExtensionKey::new` reports for the combined name.
    ///
    /// NOTE(review): `ExtensionKey::new` accepts exactly one `.` and rejects
    /// the `batpak` prefix, so a `PREFIX` that itself contains a `.` (for
    /// example `batpak.signing`) always yields an error here. Confirm whether
    /// that is intended.
    pub fn new(field: impl AsRef<str>) -> Result<Self, ExtensionKeyError> {
        let mut qualified = String::from(P::PREFIX);
        qualified.push('.');
        qualified.push_str(field.as_ref());
        ExtensionKey::new(qualified).map(|raw| Self {
            raw,
            _namespace: PhantomData,
        })
    }

    /// Borrows the underlying untyped key.
    #[must_use]
    pub fn as_key(&self) -> &ExtensionKey {
        let Self { raw, .. } = self;
        raw
    }
}
/// Encoded extension value tagged at the type level with the namespace `P`,
/// so it can only be paired with a `ReceiptExtensionKey<P>` of the same
/// namespace in `with_receipt_extension`.
///
/// The derived traits below are only available when `P` implements the same
/// trait, because `#[derive]` adds that bound to the type parameter.
#[derive(Debug, PartialEq, Eq)]
pub struct ReceiptExtensionValue<P: ReceiptExtensionNamespace> {
/// The encoded value.
bytes: EncodedBytes,
/// Zero-sized tag tying the value to `P`.
_namespace: PhantomData<P>,
}
impl<P: ReceiptExtensionNamespace> Clone for ReceiptExtensionValue<P> {
fn clone(&self) -> Self {
Self {
bytes: self.bytes.clone(),
_namespace: PhantomData,
}
}
}
impl<P: ReceiptExtensionNamespace> ReceiptExtensionValue<P> {
    /// Wraps already-encoded bytes as a value of namespace `P`.
    ///
    /// The bytes are stored as given; no validation happens here.
    pub fn new(bytes: impl Into<EncodedBytes>) -> Self {
        let bytes: EncodedBytes = bytes.into();
        Self {
            bytes,
            _namespace: PhantomData,
        }
    }
}
/// Reasons an extension key name can be rejected.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExtensionKeyError {
    /// The name was the empty string.
    Empty,
    /// The name contained a non-ASCII character.
    NonAscii,
    /// The name was longer than the maximum length.
    TooLong,
    /// The name did not consist of a non-empty prefix, one `.`, and a
    /// non-empty field without further `.` characters.
    InvalidNamespaceFormat,
    /// The name used the reserved `batpak` prefix.
    ReservedNamespace,
}

impl std::fmt::Display for ExtensionKeyError {
    /// Writes the fixed human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Empty => "extension key is empty",
            Self::NonAscii => "extension key must be ASCII",
            Self::TooLong => "extension key exceeds maximum length",
            Self::InvalidNamespaceFormat => {
                "extension key must have exactly one non-empty namespace separator"
            }
            Self::ReservedNamespace => "extension key uses the reserved batpak namespace",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ExtensionKeyError {}
impl ExtensionKey {
const MAX_LEN: usize = 256;
pub fn new(key: impl Into<String>) -> Result<Self, ExtensionKeyError> {
let key = key.into();
if key.is_empty() {
return Err(ExtensionKeyError::Empty);
}
if !key.is_ascii() {
return Err(ExtensionKeyError::NonAscii);
}
if key.len() > Self::MAX_LEN {
return Err(ExtensionKeyError::TooLong);
}
let Some((prefix, field)) = key.split_once('.') else {
return Err(ExtensionKeyError::InvalidNamespaceFormat);
};
if prefix.is_empty() || field.is_empty() || field.contains('.') {
return Err(ExtensionKeyError::InvalidNamespaceFormat);
}
if prefix == "batpak" {
return Err(ExtensionKeyError::ReservedNamespace);
}
Ok(Self(key))
}
#[must_use]
pub(crate) fn reserved(key: impl Into<String>) -> Self {
let key = key.into();
debug_assert!(key.starts_with("batpak."));
debug_assert!(key.is_ascii());
debug_assert!(key.len() <= Self::MAX_LEN);
debug_assert!(key.split('.').all(|segment| !segment.is_empty()));
Self(key)
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
/// Pair of caller-supplied `lane` and `depth` numbers that can be attached to
/// append options.
///
/// NOTE(review): how the two numbers are interpreted is decided by code
/// outside this file.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct AppendPositionHint {
    /// Lane component of the hint.
    pub lane: u32,
    /// Depth component of the hint.
    pub depth: u32,
}

impl AppendPositionHint {
    /// Creates a hint from its two components; usable in `const` contexts.
    pub const fn new(lane: u32, depth: u32) -> Self {
        AppendPositionHint { lane, depth }
    }
}
/// Optional settings that accompany an append. Every field starts unset
/// (or zero/empty) in `AppendOptions::new`.
#[derive(Clone, Debug)]
pub struct AppendOptions {
/// Expected sequence, set through `with_cas`.
pub expected_sequence: Option<u32>,
/// Idempotency key, set through `with_idempotency`.
pub idempotency_key: Option<IdempotencyKey>,
/// Correlation id, set through `with_correlation`.
pub correlation_id: Option<CorrelationId>,
/// Causation id, set through `with_causation` (which ignores a zero id).
pub causation_id: Option<CausationId>,
/// Position hint, set through `with_position_hint`.
pub position_hint: Option<AppendPositionHint>,
/// Raw flag byte, set through `with_flags`; bit meanings are not defined in this file.
pub flags: u8,
/// Durability gate, set through `with_gate`.
pub gate: Option<DurabilityGate>,
/// Extension entries by key.
pub extensions: BTreeMap<ExtensionKey, EncodedBytes>,
}
impl AppendOptions {
    /// Creates options with every optional field unset, `flags` at zero, and
    /// no extensions.
    pub fn new() -> Self {
        Self {
            expected_sequence: None,
            idempotency_key: None,
            correlation_id: None,
            causation_id: None,
            position_hint: None,
            flags: 0,
            gate: None,
            extensions: BTreeMap::new(),
        }
    }

    /// Sets `expected_sequence` to `Some(seq)`.
    ///
    /// NOTE(review): the name suggests a compare-and-swap precondition; the
    /// code that enforces it is not in this file.
    #[must_use]
    pub fn with_cas(mut self, seq: u32) -> Self {
        self.expected_sequence = Some(seq);
        self
    }

    /// Sets the idempotency key.
    #[must_use]
    pub fn with_idempotency(mut self, key: IdempotencyKey) -> Self {
        self.idempotency_key = Some(key);
        self
    }

    /// Replaces the flag byte.
    #[must_use]
    pub fn with_flags(mut self, flags: u8) -> Self {
        self.flags = flags;
        self
    }

    /// Sets the correlation id.
    #[must_use]
    pub fn with_correlation(mut self, id: CorrelationId) -> Self {
        self.correlation_id = Some(id);
        self
    }

    /// Sets the causation id, unless `id` is zero.
    ///
    /// A zero id is a no-op: it neither stores `Some(0)` nor clears a
    /// previously stored id.
    #[must_use]
    pub fn with_causation(mut self, id: CausationId) -> Self {
        use crate::id::EntityIdType;
        if id.as_u128() != 0 {
            self.causation_id = Some(id);
        }
        self
    }

    /// Sets the position hint.
    #[must_use]
    pub fn with_position_hint(mut self, hint: AppendPositionHint) -> Self {
        self.position_hint = Some(hint);
        self
    }

    /// Sets the durability gate.
    #[must_use]
    pub fn with_gate(mut self, gate: DurabilityGate) -> Self {
        self.gate = Some(gate);
        self
    }

    /// Inserts one extension entry, overwriting any entry already stored
    /// under `key`.
    #[must_use]
    pub fn with_extension(mut self, key: ExtensionKey, bytes: impl Into<EncodedBytes>) -> Self {
        self.extensions.insert(key, bytes.into());
        self
    }

    /// Inserts one namespace-typed extension entry. The shared type parameter
    /// `P` forces key and value to come from the same namespace.
    #[must_use]
    pub fn with_receipt_extension<P: ReceiptExtensionNamespace>(
        mut self,
        key: ReceiptExtensionKey<P>,
        value: ReceiptExtensionValue<P>,
    ) -> Self {
        self.extensions.insert(key.raw, value.bytes);
        self
    }

    /// Replaces the whole extension map; entries added earlier are discarded.
    #[must_use]
    pub fn with_extensions(mut self, extensions: BTreeMap<ExtensionKey, EncodedBytes>) -> Self {
        self.extensions = extensions;
        self
    }
}
impl Default for AppendOptions {
fn default() -> Self {
Self::new()
}
}
/// Boxed, `Send` predicate over a stored event whose payload is a
/// `serde_json::Value`.
pub type RetentionPredicate = Box<dyn Fn(&StoredEvent<serde_json::Value>) -> bool + Send>;
/// Strategy selector carried by `CompactionConfig`.
///
/// NOTE(review): the compaction code is not part of this file, so what each
/// variant does (and whether a predicate returning `true` means keep or
/// drop) must be confirmed there.
#[non_exhaustive]
pub enum CompactionStrategy {
/// Strategy without a predicate; the default used by `CompactionConfig::default`.
Merge,
/// Strategy parameterised by a predicate.
Retention(RetentionPredicate),
/// Strategy parameterised by a predicate.
Tombstone(RetentionPredicate),
}
/// Compaction settings: which strategy to use and a segment-count threshold.
pub struct CompactionConfig {
/// Strategy to apply; defaults to `CompactionStrategy::Merge`.
pub strategy: CompactionStrategy,
/// Segment-count threshold; defaults to `2`.
// NOTE(review): presumably the minimum number of segments before compaction runs — confirm at the use site.
pub min_segments: usize,
}
/// Defaults to the `Merge` strategy with a `min_segments` of 2.
impl Default for CompactionConfig {
    fn default() -> Self {
        let strategy = CompactionStrategy::Merge;
        let min_segments = 2;
        Self {
            strategy,
            min_segments,
        }
    }
}
/// Returns the payload length as a `u32`.
///
/// # Errors
///
/// Reports a serialization-message error when the length does not fit in
/// a `u32`.
pub(crate) fn checked_payload_len(payload_bytes: &[u8]) -> Result<u32, StoreError> {
    match u32::try_from(payload_bytes.len()) {
        Ok(len) => Ok(len),
        Err(_) => Err(StoreError::ser_msg(
            "payload size exceeds u32::MAX (4GB limit)",
        )),
    }
}
/// Returns the reserved key under which a signing-downgrade body is stored.
pub(crate) fn signing_downgrade_extension_key() -> ExtensionKey {
    const DOWNGRADE_KEY: &str = "batpak.signing.downgrade";
    ExtensionKey::reserved(DOWNGRADE_KEY)
}
impl SigningDowngradeBody {
pub(crate) fn cover_build_failed(error: impl Into<String>) -> Self {
Self {
schema_version: SIGNING_DOWNGRADE_SCHEMA_VERSION,
reason: SigningDowngradeReason::CoverBuildFailed {
encoding_error: error.into(),
},
}
}
pub(crate) fn encode_extension(&self) -> Result<EncodedBytes, StoreError> {
crate::encoding::to_bytes(self).map_err(|error| StoreError::Serialization(Box::new(error)))
}
}
// Unit tests pinning the zero-id handling of causation, the reserved-key
// constructor, the key length limit, and the error type's trait impls.
#[cfg(test)]
mod tests {
use super::*;
// A zero causation id must leave `causation_id` unset.
#[test]
fn with_causation_zero_is_noop() {
let opts = AppendOptions::default().with_causation(CausationId::from(0u128));
assert_eq!(
opts.causation_id, None,
"0 is the wire sentinel — must not become Some(0)"
);
}
// A non-zero causation id is stored as given.
#[test]
fn with_causation_nonzero_is_recorded() {
let opts = AppendOptions::default().with_causation(CausationId::from(42u128));
assert_eq!(opts.causation_id, Some(CausationId::from(42u128)));
}
// `Absolute(0)` resolves to `None`; the prior-item lookup must not be called.
#[test]
fn causation_ref_absolute_zero_resolves_to_none() {
let result = CausationRef::Absolute(0).resolve(None, 0, |_| unreachable!());
assert_eq!(
result.expect("resolve must not error"),
None,
"Absolute(0) must resolve to None"
);
}
// A non-zero `Absolute` id resolves to itself.
#[test]
fn causation_ref_absolute_nonzero_resolves_to_some() {
let result = CausationRef::Absolute(99).resolve(None, 0, |_| unreachable!());
assert_eq!(result.expect("resolve must not error"), Some(99));
}
// `reserved` accepts a key in the `batpak.` space with more than one separator.
#[test]
fn extension_key_reserved_constructor_allows_batpak_namespace() {
let key = ExtensionKey::reserved("batpak.signing.downgrade");
assert_eq!(key.as_str(), "batpak.signing.downgrade");
}
// "acme." (5 bytes) plus 252 bytes is 257 bytes, one over the 256-byte limit.
#[test]
fn extension_key_rejects_keys_over_max_length() {
let too_long = format!("acme.{}", "a".repeat(252));
assert_eq!(ExtensionKey::new(too_long), Err(ExtensionKeyError::TooLong));
}
// The error type implements `std::error::Error` and keeps its display text.
#[test]
fn extension_key_error_preserves_display_and_error_trait() {
fn assert_error_trait<E: std::error::Error>() {}
assert_error_trait::<ExtensionKeyError>();
assert_eq!(
ExtensionKeyError::InvalidNamespaceFormat.to_string(),
"extension key must have exactly one non-empty namespace separator"
);
}
}