#![warn(
clippy::unwrap_used,
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::too_many_lines
)]
#![cfg_attr(
test,
allow(
clippy::unwrap_used,
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::too_many_lines
)
)]
use std::{
fmt::{Debug, Display},
ops::{Add, AddAssign, Range, Sub},
time::{SystemTime, UNIX_EPOCH},
};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
/// Identifier of a tenant, wrapping a raw `u64`.
///
/// When packed into a [`StreamId`], only the low 32 bits of a tenant id survive; see
/// [`StreamId::try_from_tenant_and_local`] for the checked encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TenantId(u64);

impl TenantId {
    /// Wraps a raw tenant id.
    pub fn new(id: u64) -> Self {
        Self::from(id)
    }

    /// Recovers the tenant id stored in the upper 32 bits of `stream_id`.
    pub fn from_stream_id(stream_id: StreamId) -> Self {
        let packed = u64::from(stream_id);
        Self::new(packed >> 32)
    }
}

impl Display for TenantId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{raw}", raw = self.0)
    }
}

impl From<u64> for TenantId {
    fn from(value: u64) -> Self {
        TenantId(value)
    }
}

impl From<TenantId> for u64 {
    fn from(id: TenantId) -> Self {
        let TenantId(raw) = id;
        raw
    }
}
/// Largest tenant id accepted by `StreamId::try_from_tenant_and_local`.
///
/// The tenant id is shifted into the upper 32 bits of a `StreamId`, so it must fit in a `u32`.
pub const MAX_TENANT_ID_FOR_STREAM_ID: u64 = u32::MAX as u64;
/// Errors produced when packing a tenant id and a local id into a `StreamId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum StreamIdEncodingError {
/// The tenant id does not fit in the 32 bits reserved for it in a `StreamId`.
#[error(
"tenant_id {tenant_id} exceeds the StreamId encoding limit of {max}; the encoding silently \
truncates which corrupts per-tenant filtering. Keep tenant ids ≤ u32::MAX."
)]
TenantIdTooLarge {
/// The rejected raw tenant id.
tenant_id: u64,
/// The limit that was exceeded (`MAX_TENANT_ID_FOR_STREAM_ID`).
max: u64,
},
}
/// Identifier of a stream.
///
/// When built with [`StreamId::from_tenant_and_local`], the upper 32 bits hold the tenant id
/// and the lower 32 bits hold a tenant-local id. [`StreamId::new`] accepts any raw `u64`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
)]
pub struct StreamId(u64);

impl StreamId {
    /// Wraps a raw stream id without interpreting its bits.
    pub fn new(id: u64) -> Self {
        Self(id)
    }

    /// Packs `tenant_id` and `local_id` into one id.
    ///
    /// # Panics
    ///
    /// Panics if `tenant_id` exceeds `u32::MAX`. Use
    /// [`StreamId::try_from_tenant_and_local`] for a fallible version.
    #[track_caller]
    pub fn from_tenant_and_local(tenant_id: TenantId, local_id: u32) -> Self {
        let packed = Self::try_from_tenant_and_local(tenant_id, local_id);
        packed.expect(
            "StreamId::from_tenant_and_local: tenant_id exceeds u32::MAX — use try_from_tenant_and_local for fallible construction",
        )
    }

    /// Packs `tenant_id` (upper 32 bits) and `local_id` (lower 32 bits) into one id.
    ///
    /// # Errors
    ///
    /// Returns [`StreamIdEncodingError::TenantIdTooLarge`] when `tenant_id` exceeds
    /// `MAX_TENANT_ID_FOR_STREAM_ID`, rather than silently truncating it.
    pub fn try_from_tenant_and_local(
        tenant_id: TenantId,
        local_id: u32,
    ) -> Result<Self, StreamIdEncodingError> {
        let tenant_raw = u64::from(tenant_id);
        if tenant_raw <= MAX_TENANT_ID_FOR_STREAM_ID {
            let combined = (tenant_raw << 32) | u64::from(local_id);
            Ok(Self::from(combined))
        } else {
            Err(StreamIdEncodingError::TenantIdTooLarge {
                tenant_id: tenant_raw,
                max: MAX_TENANT_ID_FOR_STREAM_ID,
            })
        }
    }

    /// The tenant-local part (lower 32 bits).
    pub fn local_id(self) -> u32 {
        // A truncating cast to u32 keeps exactly the low 32 bits.
        self.0 as u32
    }

    /// The tenant part (upper 32 bits).
    pub fn tenant_id(self) -> TenantId {
        TenantId::from_stream_id(self)
    }
}

/// Adds raw ids, saturating at `u64::MAX` instead of overflowing.
impl Add for StreamId {
    type Output = StreamId;

    fn add(self, rhs: Self) -> Self::Output {
        let sum = self.0.saturating_add(rhs.0);
        Self::new(sum)
    }
}

impl Display for StreamId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{raw}", raw = self.0)
    }
}

impl From<u64> for StreamId {
    fn from(value: u64) -> Self {
        StreamId(value)
    }
}

impl From<StreamId> for u64 {
    fn from(id: StreamId) -> Self {
        let StreamId(raw) = id;
        raw
    }
}
/// A position within a stream, used for record offsets, applied indices and checkpoints.
///
/// Arithmetic on offsets is checked. Overflow or underflow panics in every build profile
/// instead of silently wrapping in release builds, because a wrapped offset would address the
/// wrong record.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default,
)]
pub struct Offset(u64);

impl Offset {
    /// Offset zero (also the `Default`).
    pub const ZERO: Offset = Offset(0);

    /// Wraps a raw `u64` offset.
    pub fn new(offset: u64) -> Self {
        Self(offset)
    }

    /// Returns the raw offset.
    pub fn as_u64(&self) -> u64 {
        self.0
    }

    /// Returns the offset as a `usize`, e.g. for indexing.
    ///
    /// # Panics
    ///
    /// Panics if the offset does not fit in `usize`. This can only happen on targets where
    /// `usize` is narrower than 64 bits, where a plain cast would silently truncate.
    #[track_caller]
    pub fn as_usize(&self) -> usize {
        usize::try_from(self.0).expect("Offset exceeds usize::MAX on this target")
    }
}

impl Display for Offset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// # Panics
///
/// Panics if the sum overflows `u64`.
impl Add for Offset {
    type Output = Self;

    #[track_caller]
    fn add(self, rhs: Self) -> Self::Output {
        Self(
            self.0
                .checked_add(rhs.0)
                .expect("Offset addition overflowed u64"),
        )
    }
}

/// # Panics
///
/// Panics if the sum overflows `u64`.
impl AddAssign for Offset {
    #[track_caller]
    fn add_assign(&mut self, rhs: Self) {
        *self = *self + rhs;
    }
}

/// # Panics
///
/// Panics if `rhs` is greater than `self`.
impl Sub for Offset {
    type Output = Self;

    #[track_caller]
    fn sub(self, rhs: Self) -> Self::Output {
        Self(
            self.0
                .checked_sub(rhs.0)
                .expect("Offset subtraction underflowed (rhs > self)"),
        )
    }
}

impl From<u64> for Offset {
    fn from(value: u64) -> Self {
        Self(value)
    }
}

impl From<Offset> for u64 {
    fn from(offset: Offset) -> Self {
        offset.0
    }
}
/// Identifier of a group, wrapping a raw `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct GroupId(u64);

impl Display for GroupId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{raw}", raw = self.0)
    }
}

impl GroupId {
    /// Wraps a raw group id.
    pub fn new(id: u64) -> Self {
        Self::from(id)
    }
}

impl From<u64> for GroupId {
    fn from(value: u64) -> Self {
        GroupId(value)
    }
}

impl From<GroupId> for u64 {
    fn from(id: GroupId) -> Self {
        let GroupId(raw) = id;
        raw
    }
}
/// Length in bytes of a [`Hash`].
pub const HASH_LENGTH: usize = 32;

/// A fixed-size `HASH_LENGTH`-byte hash value. The all-zero value is [`Hash::GENESIS`].
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Hash([u8; HASH_LENGTH]);

impl Hash {
    /// The all-zero hash, also the `Default`.
    pub const GENESIS: Hash = Hash([0u8; HASH_LENGTH]);

    /// Wraps raw hash bytes.
    pub fn from_bytes(bytes: [u8; HASH_LENGTH]) -> Self {
        Self(bytes)
    }

    /// Borrows the raw hash bytes.
    pub fn as_bytes(&self) -> &[u8; HASH_LENGTH] {
        &self.0
    }

    /// Returns `true` if every byte is zero (i.e. this equals [`Hash::GENESIS`]).
    pub fn is_genesis(&self) -> bool {
        self.0.iter().all(|&byte| byte == 0)
    }
}

/// Abbreviated form: `Hash(` + first 8 bytes as lowercase hex + `...)`.
impl Debug for Hash {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Hash(")?;
        for byte in &self.0[..8] {
            write!(f, "{byte:02x}")?;
        }
        f.write_str("...)")
    }
}

/// Full lowercase hex encoding of all bytes.
impl Display for Hash {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}

impl Default for Hash {
    fn default() -> Self {
        Hash::GENESIS
    }
}

impl From<[u8; HASH_LENGTH]> for Hash {
    fn from(bytes: [u8; HASH_LENGTH]) -> Self {
        Self::from_bytes(bytes)
    }
}

impl From<Hash> for [u8; HASH_LENGTH] {
    fn from(hash: Hash) -> Self {
        let Hash(bytes) = hash;
        bytes
    }
}

impl AsRef<[u8]> for Hash {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
/// A point in time as nanoseconds since the Unix epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Timestamp(u64);

impl Timestamp {
    /// The Unix epoch (0 ns), also the `Default`.
    pub const EPOCH: Timestamp = Timestamp(0);

    /// Nanoseconds per second, shared by `as_secs` and `Display`.
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    /// Builds a timestamp from nanoseconds since the epoch.
    pub fn from_nanos(nanos: u64) -> Self {
        Self(nanos)
    }

    /// Nanoseconds since the epoch.
    pub fn as_nanos(&self) -> u64 {
        self.0
    }

    /// Whole seconds since the epoch (sub-second part truncated).
    pub fn as_secs(&self) -> u64 {
        self.0 / Self::NANOS_PER_SEC
    }

    /// Reads the current system time.
    ///
    /// Times beyond `u64::MAX` nanoseconds (around the year 2554) saturate at `u64::MAX`
    /// instead of wrapping around to a small value.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn now() -> Self {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before Unix epoch");
        // `as_nanos` returns u128; clamp before narrowing so the cast cannot wrap.
        let nanos = since_epoch.as_nanos().min(u128::from(u64::MAX)) as u64;
        Self(nanos)
    }

    /// Reads the current time, but never returns a value `<= last`.
    ///
    /// If the clock has not advanced past `last`, this returns `last + 1 ns`, saturating at
    /// `u64::MAX`.
    pub fn now_monotonic(last: Option<Timestamp>) -> Self {
        let now = Self::now();
        match last {
            Some(prev) if now.0 <= prev.0 => Timestamp(prev.0.saturating_add(1)),
            _ => now,
        }
    }
}

/// Formats as `<secs>.<nanos>` with nanoseconds zero-padded to 9 digits.
impl Display for Timestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let secs = self.as_secs();
        let nanos = self.0 % Self::NANOS_PER_SEC;
        write!(f, "{secs}.{nanos:09}")
    }
}

impl Default for Timestamp {
    fn default() -> Self {
        Self::EPOCH
    }
}

impl From<u64> for Timestamp {
    fn from(nanos: u64) -> Self {
        Self(nanos)
    }
}

impl From<Timestamp> for u64 {
    fn from(ts: Timestamp) -> Self {
        ts.0
    }
}
/// Kind of a log record. `Data` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum RecordKind {
    #[default]
    Data,
    Checkpoint,
    Tombstone,
}

impl RecordKind {
    /// Encodes the kind as a single tag byte (`Data`=0, `Checkpoint`=1, `Tombstone`=2).
    pub fn as_byte(&self) -> u8 {
        match *self {
            Self::Data => 0,
            Self::Checkpoint => 1,
            Self::Tombstone => 2,
        }
    }

    /// Decodes a tag byte produced by [`RecordKind::as_byte`]. Returns `None` for unknown tags.
    pub fn from_byte(byte: u8) -> Option<Self> {
        let kind = match byte {
            0 => Self::Data,
            1 => Self::Checkpoint,
            2 => Self::Tombstone,
            _ => return None,
        };
        Some(kind)
    }
}
/// Per-record header: position, hash of the previous record, time, payload size and kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RecordHeader {
    pub offset: Offset,
    pub prev_hash: Hash,
    pub timestamp: Timestamp,
    pub payload_len: u32,
    pub record_kind: RecordKind,
}

impl RecordHeader {
    /// Assembles a header from its parts.
    pub fn new(
        offset: Offset,
        prev_hash: Hash,
        timestamp: Timestamp,
        payload_len: u32,
        record_kind: RecordKind,
    ) -> Self {
        RecordHeader {
            offset,
            prev_hash,
            timestamp,
            payload_len,
            record_kind,
        }
    }

    /// `true` for the first record: offset zero with an all-zero previous hash.
    pub fn is_genesis(&self) -> bool {
        let at_start = self.offset == Offset::ZERO;
        at_start && self.prev_hash.is_genesis()
    }
}
/// An (offset, hash) pair. Defaults to the genesis pair (`Offset::ZERO`, `Hash::GENESIS`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AppliedIndex {
    pub offset: Offset,
    pub hash: Hash,
}

impl AppliedIndex {
    /// Pairs an offset with its hash.
    pub fn new(offset: Offset, hash: Hash) -> Self {
        AppliedIndex { offset, hash }
    }

    /// The starting index: offset zero with the all-zero hash.
    pub fn genesis() -> Self {
        Self::new(Offset::ZERO, Hash::GENESIS)
    }
}

impl Default for AppliedIndex {
    fn default() -> Self {
        AppliedIndex::genesis()
    }
}
/// A checkpoint at `offset`, carrying the chain hash at that point, the number of records it
/// covers, and when it was created.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Checkpoint {
    pub offset: Offset,
    pub chain_hash: Hash,
    pub record_count: u64,
    pub created_at: Timestamp,
}

impl Checkpoint {
    /// Creates a checkpoint.
    ///
    /// In debug builds, asserts that `record_count == offset + 1`.
    pub fn new(offset: Offset, chain_hash: Hash, record_count: u64, created_at: Timestamp) -> Self {
        let checkpoint = Checkpoint {
            offset,
            chain_hash,
            record_count,
            created_at,
        };
        debug_assert_eq!(
            checkpoint.record_count,
            checkpoint.offset.as_u64() + 1,
            "record_count should equal offset + 1"
        );
        checkpoint
    }
}
/// Controls when automatic checkpoints are taken.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CheckpointPolicy {
    /// Take a checkpoint every this many records. `0` disables periodic checkpoints.
    pub every_n_records: u64,
    /// Flag for checkpointing on shutdown (not consulted by `should_checkpoint`).
    pub on_shutdown: bool,
    /// When `true`, `should_checkpoint` never fires; checkpoints are only taken explicitly.
    pub explicit_only: bool,
}

impl CheckpointPolicy {
    /// Periodic checkpoints every `n` records, with `on_shutdown` enabled.
    pub fn every(n: u64) -> Self {
        Self {
            every_n_records: n,
            on_shutdown: true,
            explicit_only: false,
        }
    }

    /// No automatic checkpoints of any kind.
    pub fn explicit_only() -> Self {
        Self {
            every_n_records: 0,
            on_shutdown: false,
            explicit_only: true,
        }
    }

    /// Returns `true` when a checkpoint is due after writing the record at `offset`.
    ///
    /// Treats `offset + 1` as the number of records written, and fires when that count is a
    /// multiple of `every_n_records`.
    pub fn should_checkpoint(&self, offset: Offset) -> bool {
        if self.explicit_only || self.every_n_records == 0 {
            return false;
        }
        let n = self.every_n_records;
        // Equivalent to `(offset + 1) % n == 0`, but written as a remainder on `offset`
        // itself so `offset == u64::MAX` cannot overflow. `n >= 1` here, so `n - 1` is safe.
        offset.as_u64() % n == n - 1
    }
}

/// Periodic checkpoints every 1000 records, with `on_shutdown` enabled.
impl Default for CheckpointPolicy {
    fn default() -> Self {
        Self::every(1000)
    }
}
/// Length in bytes of an [`IdempotencyId`].
pub const IDEMPOTENCY_ID_LENGTH: usize = 16;

/// A 16-byte idempotency identifier. [`IdempotencyId::generate`] fills it from a CSPRNG via
/// `getrandom::fill`. Formats as lowercase hex.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct IdempotencyId([u8; IDEMPOTENCY_ID_LENGTH]);

impl IdempotencyId {
    /// Wraps freshly generated random bytes. Debug builds assert they are not all zero.
    pub(crate) fn from_random_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
        debug_assert!(
            !bytes.iter().all(|&b| b == 0),
            "idempotency ID bytes are all zeros"
        );
        Self(bytes)
    }

    /// Wraps existing id bytes, e.g. ones received from a client.
    pub fn from_bytes(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
        Self(bytes)
    }

    /// Borrows the raw id bytes.
    pub fn as_bytes(&self) -> &[u8; IDEMPOTENCY_ID_LENGTH] {
        &self.0
    }

    /// Generates a new random id.
    ///
    /// # Panics
    ///
    /// Panics if the random source fails.
    pub fn generate() -> Self {
        let mut entropy = [0u8; IDEMPOTENCY_ID_LENGTH];
        getrandom::fill(&mut entropy).expect("CSPRNG failure is catastrophic");
        Self::from_random_bytes(entropy)
    }

    /// Writes every byte as two lowercase hex digits (shared by `Debug` and `Display`).
    fn write_hex_digits(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}

impl Debug for IdempotencyId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("IdempotencyId(")?;
        self.write_hex_digits(f)?;
        f.write_str(")")
    }
}

impl Display for IdempotencyId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.write_hex_digits(f)
    }
}

impl From<[u8; IDEMPOTENCY_ID_LENGTH]> for IdempotencyId {
    fn from(bytes: [u8; IDEMPOTENCY_ID_LENGTH]) -> Self {
        Self::from_bytes(bytes)
    }
}

impl From<IdempotencyId> for [u8; IDEMPOTENCY_ID_LENGTH] {
    fn from(id: IdempotencyId) -> Self {
        let IdempotencyId(bytes) = id;
        bytes
    }
}
/// A generation counter starting at [`Generation::INITIAL`] (0). Displays as `gen:<n>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Generation(u64);

impl Generation {
    /// The first generation (0), also the `Default`.
    pub const INITIAL: Generation = Generation(0);

    /// Wraps a raw generation number.
    pub fn new(value: u64) -> Self {
        Self::from(value)
    }

    /// Returns the raw generation number.
    pub fn as_u64(&self) -> u64 {
        self.0
    }

    /// The following generation, saturating at `u64::MAX`.
    pub fn next(&self) -> Self {
        let bumped = self.0.saturating_add(1);
        Self(bumped)
    }
}

impl Display for Generation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "gen:{number}", number = self.0)
    }
}

impl Default for Generation {
    fn default() -> Self {
        Generation::INITIAL
    }
}

impl From<u64> for Generation {
    fn from(value: u64) -> Self {
        Generation(value)
    }
}

impl From<Generation> for u64 {
    fn from(generation: Generation) -> Self {
        generation.as_u64()
    }
}
/// Why a recovery was performed. Displays as a snake_case label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RecoveryReason {
    NodeRestart,
    QuorumLoss,
    CorruptionDetected,
    ManualIntervention,
}

impl Display for RecoveryReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::NodeRestart => "node_restart",
            Self::QuorumLoss => "quorum_loss",
            Self::CorruptionDetected => "corruption_detected",
            Self::ManualIntervention => "manual_intervention",
        };
        f.write_str(label)
    }
}
/// Record of a recovery: the generation transition, the committed offset known at the time,
/// the offset recovery resumed from, any discarded offset range, when it happened, and why.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryRecord {
    pub generation: Generation,
    pub previous_generation: Generation,
    pub known_committed: Offset,
    pub recovery_point: Offset,
    /// Half-open range of discarded offsets, if any.
    pub discarded_range: Option<Range<Offset>>,
    pub timestamp: Timestamp,
    pub reason: RecoveryReason,
}

impl RecoveryRecord {
    /// Creates a recovery record.
    ///
    /// In debug builds, asserts that `generation > previous_generation` and
    /// `recovery_point <= known_committed`.
    pub fn new(
        generation: Generation,
        previous_generation: Generation,
        known_committed: Offset,
        recovery_point: Offset,
        discarded_range: Option<Range<Offset>>,
        timestamp: Timestamp,
        reason: RecoveryReason,
    ) -> Self {
        debug_assert!(
            generation > previous_generation,
            "new generation must be greater than previous"
        );
        debug_assert!(
            recovery_point <= known_committed,
            "recovery point cannot exceed known committed"
        );
        Self {
            generation,
            previous_generation,
            known_committed,
            recovery_point,
            discarded_range,
            timestamp,
            reason,
        }
    }

    /// Returns `true` if this recovery discarded at least one offset.
    ///
    /// Defined in terms of [`RecoveryRecord::discarded_count`], so an empty or inverted
    /// range such as `Some(n..n)` is not reported as data loss while also counting 0
    /// discarded offsets.
    pub fn had_data_loss(&self) -> bool {
        self.discarded_count() > 0
    }

    /// Number of offsets in `discarded_range` (`end - start`). Returns 0 when there is no
    /// range, and saturates to 0 if the range is inverted.
    pub fn discarded_count(&self) -> u64 {
        self.discarded_range
            .as_ref()
            .map_or(0, |r| r.end.as_u64().saturating_sub(r.start.as_u64()))
    }
}
/// Human-readable name of a stream (an owned `String`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamName(String);

impl StreamName {
    /// Builds a name from anything convertible into a `String`.
    pub fn new(name: impl Into<String>) -> Self {
        let owned: String = name.into();
        StreamName(owned)
    }

    /// Borrows the name as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl Display for StreamName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl From<String> for StreamName {
    fn from(name: String) -> Self {
        Self::new(name)
    }
}

impl From<&str> for StreamName {
    fn from(name: &str) -> Self {
        Self::new(name)
    }
}

impl From<StreamName> for String {
    fn from(value: StreamName) -> Self {
        let StreamName(inner) = value;
        inner
    }
}
/// Classification label attached to a stream's data.
///
/// `PHI` is the default (via `#[default]`), so data created without an explicit
/// classification is labelled `PHI`. Declaration order defines the derived `Ord` and, in
/// index-based serde formats, the encoding, so do not reorder variants.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
pub enum DataClass {
    /// Protected health information. This is the default classification.
    #[default]
    PHI,
    /// De-identified data.
    Deidentified,
    /// Personally identifiable information.
    PII,
    Sensitive,
    /// Payment-card (PCI) data.
    PCI,
    Financial,
    Confidential,
    Public,
}
/// Where a stream's data lives: a single [`Region`], or `Global`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Placement {
    /// Pinned to one region.
    Region(Region),
    /// Not tied to a single region.
    Global,
}

/// A deployment region. Known regions have dedicated variants; any other name is `Custom`.
/// Displays as the region's identifier string (e.g. `us-east-1`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Region {
    USEast1,
    APSoutheast2,
    Custom(String),
}

impl Region {
    /// Builds a [`Region::Custom`] from any string-like name.
    pub fn custom(name: impl Into<String>) -> Self {
        let name: String = name.into();
        Region::Custom(name)
    }
}

impl Display for Region {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::USEast1 => "us-east-1",
            Self::APSoutheast2 => "ap-southeast-2",
            Self::Custom(custom) => custom.as_str(),
        };
        f.write_str(label)
    }
}
/// Descriptive metadata for a stream, plus its current offset.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct StreamMetadata {
    pub stream_id: StreamId,
    pub stream_name: StreamName,
    pub data_class: DataClass,
    pub placement: Placement,
    pub current_offset: Offset,
}

impl StreamMetadata {
    /// Creates metadata for a stream with `current_offset` set to offset zero.
    pub fn new(
        stream_id: StreamId,
        stream_name: StreamName,
        data_class: DataClass,
        placement: Placement,
    ) -> Self {
        let current_offset = Offset::ZERO;
        StreamMetadata {
            stream_id,
            stream_name,
            data_class,
            placement,
            current_offset,
        }
    }
}
/// A batch of events destined for one stream, tagged with the offset the writer expects.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchPayload {
    pub stream_id: StreamId,
    pub events: Vec<Bytes>,
    pub expected_offset: Offset,
}

impl BatchPayload {
    /// Bundles the target stream, its events and the expected offset.
    pub fn new(stream_id: StreamId, events: Vec<Bytes>, expected_offset: Offset) -> Self {
        BatchPayload {
            stream_id,
            events,
            expected_offset,
        }
    }
}
/// An auditable action. Every variant carries the `StreamId` or `TenantId` it concerns,
/// plus action-specific details.
///
/// NOTE(review): in index-based serde formats the variant order is part of the encoding;
/// append new variants rather than reordering — confirm which format persists these.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuditAction {
/// Stream creation, with the stream's name, data classification and placement.
StreamCreated {
stream_id: StreamId,
stream_name: StreamName,
data_class: DataClass,
placement: Placement,
},
/// `count` events appended to `stream_id`.
///
/// NOTE(review): `from_offset` is presumably the offset of the first appended event — confirm.
EventsAppended {
stream_id: StreamId,
count: u32,
from_offset: Offset,
},
/// Tenant sealed, with the given `SealReason`.
TenantSealed {
tenant_id: TenantId,
reason: SealReason,
},
/// Tenant unsealed.
TenantUnsealed { tenant_id: TenantId },
/// Masking policy `policy_name` created for a tenant.
MaskingPolicyCreated {
tenant_id: TenantId,
policy_name: String,
},
/// Masking policy `policy_name` dropped for a tenant.
MaskingPolicyDropped {
tenant_id: TenantId,
policy_name: String,
},
/// Masking policy `policy_name` attached to column `column_name` of table `table_id`.
MaskingPolicyAttached {
tenant_id: TenantId,
table_id: u64,
column_name: String,
policy_name: String,
},
/// Masking policy detached from column `column_name` of table `table_id`.
MaskingPolicyDetached {
tenant_id: TenantId,
table_id: u64,
column_name: String,
},
/// Upsert applied to `stream_id`, with its `UpsertResolution`.
///
/// NOTE(review): `from_offset` presumably marks where the resulting record(s) start — confirm.
UpsertApplied {
stream_id: StreamId,
resolution: UpsertResolution,
from_offset: Offset,
},
}
/// Outcome of an upsert, as carried by `AuditAction::UpsertApplied`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum UpsertResolution {
/// The upsert took the insert path.
Inserted,
/// The upsert took the update path.
Updated,
/// The upsert made no change.
NoOp,
}
/// Reason given when a tenant is sealed (carried by `AuditAction::TenantSealed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SealReason {
ForensicHold,
AuditInProgress,
BreachInvestigation,
LegalHold,
}
/// A sink that persists a batch of events for a stream.
///
/// Implementors must be `Send + Sync + Debug`, so a single persister can be shared across
/// threads (e.g. behind an `Arc`).
pub trait EventPersister: Send + Sync + Debug {
/// Persists `events` to `stream_id`. Returns an `Offset` on success, or a `PersistError`
/// describing the failure.
///
/// NOTE(review): the name suggests this blocks the calling thread until the write completes.
/// The returned `Offset` is presumably the position of the first (or last) persisted event.
/// Confirm both against implementations.
fn persist_blocking(
&self,
stream_id: StreamId,
events: Vec<Bytes>,
) -> Result<Offset, PersistError>;
}
/// Failure modes of [`EventPersister::persist_blocking`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PersistError {
    ConsensusFailed,
    StorageError,
    ShuttingDown,
}

impl std::fmt::Display for PersistError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ConsensusFailed => "consensus failed after retries",
            Self::StorageError => "storage I/O error",
            Self::ShuttingDown => "system is shutting down",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PersistError {}
/// Compression applied to a payload. The explicit discriminants are the tag bytes returned by
/// [`CompressionKind::as_byte`]. `None` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum CompressionKind {
    #[default]
    None = 0,
    Lz4 = 1,
    Zstd = 2,
}

impl CompressionKind {
    /// The tag byte for this kind (its explicit discriminant).
    pub fn as_byte(self) -> u8 {
        self as u8
    }

    /// Decodes a tag byte produced by [`CompressionKind::as_byte`]. Returns `None` for
    /// unknown tags.
    pub fn from_byte(byte: u8) -> Option<Self> {
        [Self::None, Self::Lz4, Self::Zstd]
            .into_iter()
            .find(|kind| kind.as_byte() == byte)
    }
}

impl Display for CompressionKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::None => "none",
            Self::Lz4 => "lz4",
            Self::Zstd => "zstd",
        };
        f.write_str(label)
    }
}
pub mod flux_annotations;
pub mod domain;
pub use domain::{
AGGREGATE_BUDGET_DEFAULT_BYTES, AGGREGATE_BUDGET_MIN_BYTES, AggregateMemoryBudget,
AggregateMemoryBudgetError, BoundedSize, BoundedSizeError, ClearanceLevel, ClearanceLevelError,
DateField, DateFieldParseError, EmptyVecError, Interval, IntervalConstructionError,
NANOS_PER_DAY, NegativeSubstringLength, NonEmptyVec, SqlIdentifier, SqlIdentifierError,
SubstringRange,
};
#[cfg(test)]
mod tests;