// Error-code strings of the form `LDB-NNNN`.
//
// The section labels below are inferred from the constant names and numeric
// ranges only — NOTE(review): confirm against the project's error-code docs.
// Four of these values (`INVALID_CONFIG`, `SERIALIZATION_FAILED`,
// `CHECKPOINT_FAILED`, `INTERNAL`) are pinned by the
// `error_codes_are_stable_strings` test in this file.

// LDB-0xxx: configuration, shutdown, and other general errors.
pub const INVALID_CONFIG: &str = "LDB-0001";
pub const MISSING_CONFIG: &str = "LDB-0002";
pub const UNRESOLVED_CONFIG_VAR: &str = "LDB-0003";
pub const SHUTDOWN: &str = "LDB-0004";
pub const INVALID_OPERATION: &str = "LDB-0005";
// Also returned by `HotPathError::SchemaMismatch.ldb_code()`.
pub const SCHEMA_MISMATCH: &str = "LDB-0006";

// LDB-1xxx: SQL support, planning, and name/type resolution.
pub const SQL_UNSUPPORTED: &str = "LDB-1001";
pub const SQL_PLANNING_FAILED: &str = "LDB-1002";
pub const SQL_COLUMN_NOT_FOUND: &str = "LDB-1100";
pub const SQL_TABLE_NOT_FOUND: &str = "LDB-1101";
pub const SQL_TYPE_MISMATCH: &str = "LDB-1200";

// LDB-2xxx: watermarks, windows, and late data.
pub const WATERMARK_REQUIRED: &str = "LDB-2001";
pub const WINDOW_INVALID: &str = "LDB-2002";
pub const WINDOW_SIZE_INVALID: &str = "LDB-2003";
// Also returned by `HotPathError::LateEvent.ldb_code()`.
pub const LATE_DATA_REJECTED: &str = "LDB-2004";

// LDB-3xxx: joins.
pub const JOIN_KEY_MISSING: &str = "LDB-3001";
pub const JOIN_TIME_BOUND_MISSING: &str = "LDB-3002";
pub const TEMPORAL_JOIN_NO_PK: &str = "LDB-3003";
pub const JOIN_TYPE_UNSUPPORTED: &str = "LDB-3004";

// LDB-4xxx: serialization, parsing/decoding, and state.
// `SERIALIZATION_FAILED`, `STATE_KEY_MISSING`, and `STATE_CORRUPTION` are also
// returned by `HotPathError::ldb_code()` for the `SerializationOverflow`,
// `StateKeyMissing`, and `AggregateStateCorruption` variants respectively.
pub const SERIALIZATION_FAILED: &str = "LDB-4001";
pub const DESERIALIZATION_FAILED: &str = "LDB-4002";
pub const JSON_PARSE_ERROR: &str = "LDB-4003";
pub const BASE64_DECODE_ERROR: &str = "LDB-4004";
pub const STATE_KEY_MISSING: &str = "LDB-4005";
pub const STATE_CORRUPTION: &str = "LDB-4006";

// LDB-50xx: connector connection, auth, I/O, and configuration.
pub const CONNECTOR_CONNECTION_FAILED: &str = "LDB-5001";
pub const CONNECTOR_AUTH_FAILED: &str = "LDB-5002";
pub const CONNECTOR_READ_ERROR: &str = "LDB-5003";
pub const CONNECTOR_WRITE_ERROR: &str = "LDB-5004";
pub const CONNECTOR_CONFIG_ERROR: &str = "LDB-5005";
// LDB-501x: source/sink lookup and duplicate registration.
pub const SOURCE_NOT_FOUND: &str = "LDB-5010";
pub const SINK_NOT_FOUND: &str = "LDB-5011";
pub const SOURCE_ALREADY_EXISTS: &str = "LDB-5012";
pub const SINK_ALREADY_EXISTS: &str = "LDB-5013";
// LDB-502x: connector serde and schema.
pub const CONNECTOR_SERDE_ERROR: &str = "LDB-5020";
pub const CONNECTOR_SCHEMA_ERROR: &str = "LDB-5021";
// LDB-503x: exactly-once / delivery-capability validation.
pub const EXACTLY_ONCE_NON_REPLAYABLE: &str = "LDB-5030";
pub const EXACTLY_ONCE_SINK_UNSUPPORTED: &str = "LDB-5031";
pub const EXACTLY_ONCE_NO_CHECKPOINT: &str = "LDB-5032";
pub const MIXED_DELIVERY_CAPABILITIES: &str = "LDB-5033";

// LDB-60xx/601x: checkpointing, recovery, WAL, manifest, and offsets.
pub const CHECKPOINT_FAILED: &str = "LDB-6001";
pub const CHECKPOINT_NOT_FOUND: &str = "LDB-6002";
pub const RECOVERY_FAILED: &str = "LDB-6003";
pub const SINK_ROLLBACK_FAILED: &str = "LDB-6004";
pub const WAL_ERROR: &str = "LDB-6005";
pub const WAL_INVALID_LENGTH: &str = "LDB-6006";
pub const WAL_CHECKSUM_MISMATCH: &str = "LDB-6007";
pub const MANIFEST_PERSIST_FAILED: &str = "LDB-6008";
pub const CHECKPOINT_PRUNE_FAILED: &str = "LDB-6009";
pub const SIDECAR_CORRUPTION: &str = "LDB-6010";
pub const OFFSET_METADATA_MISSING: &str = "LDB-6011";
// LDB-602x: durability gate.
pub const DURABILITY_GATE_MISS: &str = "LDB-6020";
pub const DURABILITY_GATE_ROLLBACK_FAILED: &str = "LDB-6021";
pub const DURABILITY_GATE_BACKEND_ERROR: &str = "LDB-6022";
pub const DURABILITY_GATE_ROLLBACK_ON_ERROR_FAILED: &str = "LDB-6023";

// LDB-7xxx: query execution, Arrow, and plan optimization.
pub const QUERY_EXECUTION_FAILED: &str = "LDB-7001";
pub const ARROW_ERROR: &str = "LDB-7002";
pub const PLAN_OPTIMIZATION_FAILED: &str = "LDB-7003";

// LDB-8xxx: internal and pipeline-level errors.
// Note: `HotPathError::ldb_code()` additionally returns the literals
// "LDB-8010", "LDB-8011", and "LDB-8012", which have no named constant in
// this list.
pub const INTERNAL: &str = "LDB-8001";
pub const PIPELINE_ERROR: &str = "LDB-8002";
pub const MATERIALIZED_VIEW_ERROR: &str = "LDB-8003";
pub const QUERY_PIPELINE_ERROR: &str = "LDB-8004";
pub const NO_COMPILED_PROJECTION: &str = "LDB-8050";
/// Fieldless, `Copy` error enum with explicit `u16` discriminants.
///
/// `#[repr(u16)]` fixes the discriminant type, and the unit tests in this file
/// assert that the type is exactly two bytes. Each variant maps to a static
/// description via `message()`, its raw discriminant via `code()`, and an
/// `LDB-NNNN` string via `ldb_code()`. All discriminants are non-zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum HotPathError {
/// An event arrived after the watermark and was dropped.
LateEvent = 0x0001,
/// A state key was not found in the store.
StateKeyMissing = 0x0002,
/// Downstream backpressure; per `message()`, the event was buffered.
Backpressure = 0x0003,
/// The serialization buffer capacity was exceeded.
SerializationOverflow = 0x0004,
/// A record batch schema did not match the expected one.
SchemaMismatch = 0x0005,
/// An aggregate state checksum mismatch was detected.
AggregateStateCorruption = 0x0006,
/// The queue is full and the event could not be pushed.
QueueFull = 0x0007,
/// The channel is closed or disconnected.
ChannelClosed = 0x0008,
}
impl HotPathError {
    /// Returns the variant's raw `u16` discriminant as declared on the enum.
    #[must_use]
    pub const fn code(self) -> u16 {
        self as u16
    }

    /// Returns the `LDB-NNNN` code string associated with this variant.
    ///
    /// Five variants reuse module-level code constants; the remaining three
    /// return string literals spelled directly in this method.
    #[must_use]
    pub const fn ldb_code(self) -> &'static str {
        match self {
            // Variants backed by a named module-level constant.
            Self::SchemaMismatch => SCHEMA_MISMATCH,
            Self::LateEvent => LATE_DATA_REJECTED,
            Self::SerializationOverflow => SERIALIZATION_FAILED,
            Self::StateKeyMissing => STATE_KEY_MISSING,
            Self::AggregateStateCorruption => STATE_CORRUPTION,

            // Variants whose codes are written inline.
            Self::Backpressure => "LDB-8010",
            Self::QueueFull => "LDB-8011",
            Self::ChannelClosed => "LDB-8012",
        }
    }

    /// Returns a fixed, human-readable description of this variant.
    ///
    /// The text is a `'static` string, so no allocation happens here.
    #[must_use]
    pub const fn message(self) -> &'static str {
        match self {
            Self::ChannelClosed => "Channel is closed or disconnected",
            Self::QueueFull => "Queue is full; cannot push event",
            Self::AggregateStateCorruption => "Aggregate state checksum mismatch detected",
            Self::SchemaMismatch => "Record batch schema does not match expected",
            Self::SerializationOverflow => "Serialization buffer capacity exceeded",
            Self::Backpressure => "Downstream backpressure; event buffered",
            Self::StateKeyMissing => "State key not found in store",
            Self::LateEvent => "Event arrived after watermark; dropped",
        }
    }
}
impl std::fmt::Display for HotPathError {
    /// Formats the error as `[<ldb code>] <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = self.ldb_code();
        let text = self.message();
        write!(f, "[{code}] {text}")
    }
}
// Marks `HotPathError` as a standard error type. The body is empty, so every
// `std::error::Error` method keeps its default implementation.
impl std::error::Error for HotPathError {}
#[cfg(test)]
mod tests {
    use super::*;

    /// The enum must be `Copy` and occupy exactly two bytes.
    #[test]
    fn hot_path_error_is_copy_and_small() {
        let original = HotPathError::LateEvent;
        // A plain assignment copies, so `original` stays usable afterwards.
        let duplicate = original;
        assert_eq!(original, duplicate);
        assert_eq!(std::mem::size_of::<HotPathError>(), 2);
    }

    /// Every variant has a non-zero code, a non-empty message, and an
    /// `LDB-`-prefixed code string.
    #[test]
    fn hot_path_error_codes_are_nonzero() {
        let variants = [
            HotPathError::LateEvent,
            HotPathError::StateKeyMissing,
            HotPathError::Backpressure,
            HotPathError::SerializationOverflow,
            HotPathError::SchemaMismatch,
            HotPathError::AggregateStateCorruption,
            HotPathError::QueueFull,
            HotPathError::ChannelClosed,
        ];

        for v in variants {
            assert!(v.code() > 0, "{v:?} has zero code");
            assert!(!v.message().is_empty(), "{v:?} has empty message");
            assert!(
                v.ldb_code().starts_with("LDB-"),
                "{v:?} has bad ldb_code: {}",
                v.ldb_code()
            );
        }
    }

    /// `Display` output starts with the bracketed code and includes the message.
    #[test]
    fn hot_path_error_display() {
        let rendered = HotPathError::LateEvent.to_string();
        assert!(rendered.starts_with("[LDB-"));
        assert!(rendered.contains("watermark"));
    }

    /// Pins a sample of code constants to their exact string values.
    #[test]
    fn error_codes_are_stable_strings() {
        let pinned = [
            (INVALID_CONFIG, "LDB-0001"),
            (SERIALIZATION_FAILED, "LDB-4001"),
            (CHECKPOINT_FAILED, "LDB-6001"),
            (INTERNAL, "LDB-8001"),
        ];

        for (actual, expected) in pinned {
            assert_eq!(actual, expected);
        }
    }
}
/// Three-level severity classification: `Info`, `Warning`, or `Error`.
///
/// Derives serde `Serialize`/`Deserialize` with no rename attributes on the
/// type or its variants. No ordering traits are derived, so severities can be
/// compared for equality but not with `<`/`>`.
// NOTE(review): where this severity is attached or consumed is not visible in
// this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum WarningSeverity {
/// Informational level.
Info,
/// Warning level.
Warning,
/// Error level.
Error,
}