use std::time::SystemTime;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::mutation::{MemoryMutation, OpId, TenantId};
use crate::version::VersionError;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommitReceipt {
pub op_id: OpId,
pub tenant_id: TenantId,
pub term: u64,
pub log_index: u64,
pub committed_at: SystemTime,
pub applied_at: Option<SystemTime>,
}
pub trait CommitObserver: Send + Sync {
fn after_commit(&self, tenant_id: TenantId, mutation: &MemoryMutation);
}
#[derive(Debug, Clone)]
pub struct CommitOptions {
pub expected_log_index: Option<u64>,
pub wait_for_apply: bool,
pub op_id: Option<super::mutation::OpId>,
}
impl Default for CommitOptions {
fn default() -> Self {
Self::new()
}
}
impl CommitOptions {
pub fn new() -> Self {
Self {
expected_log_index: None,
wait_for_apply: true,
op_id: None,
}
}
pub fn no_wait(mut self) -> Self {
self.wait_for_apply = false;
self
}
pub fn expecting_index(mut self, idx: u64) -> Self {
self.expected_log_index = Some(idx);
self
}
pub fn with_op_id(mut self, op_id: super::mutation::OpId) -> Self {
self.op_id = Some(op_id);
self
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommittedEntry {
pub op_id: OpId,
pub tenant_id: TenantId,
pub term: u64,
pub log_index: u64,
pub mutation: MemoryMutation,
pub committed_at: SystemTime,
pub applied_at: Option<SystemTime>,
}
#[derive(Debug, Clone, PartialEq, Error)]
pub enum CommitError {
#[error(
"op_id collision for tenant {tenant_id}: existing log_index {existing_index} \
has different mutation payload than the one being committed. Client bug: \
the same op_id must always carry the same mutation."
)]
OpIdCollision {
op_id: OpId,
tenant_id: TenantId,
existing_index: u64,
},
#[error(
"expected log_index {expected} for tenant {tenant_id}, but next index is {actual}. \
Concurrent write happened — caller should re-read state and retry."
)]
UnexpectedLogIndex {
tenant_id: TenantId,
expected: u64,
actual: u64,
},
#[error("version check failed: {0}")]
Version(#[from] VersionError),
#[error("mutation variant `{variant}` not yet implemented (planned in RFC {planned_rfc})")]
NotYetImplemented {
variant: &'static str,
planned_rfc: &'static str,
},
#[error("storage failure: {message}")]
StorageFailure { message: String },
#[error("server shutting down")]
Shutdown,
#[error("not the cluster leader; redirect to leader id={leader_id:?} addr={leader_addr:?}")]
NotLeader {
leader_id: Option<u64>,
leader_addr: Option<String>,
},
#[error("commit + apply timed out for op_id {op_id}; retry idempotently against leader")]
CommitTimeout { op_id: OpId },
}
impl CommitError {
pub fn metric_label(&self) -> &'static str {
match self {
CommitError::OpIdCollision { .. } => "op_id_collision",
CommitError::UnexpectedLogIndex { .. } => "unexpected_log_index",
CommitError::Version(_) => "version",
CommitError::NotYetImplemented { .. } => "not_yet_implemented",
CommitError::StorageFailure { .. } => "storage_failure",
CommitError::Shutdown => "shutdown",
CommitError::NotLeader { .. } => "not_leader",
CommitError::CommitTimeout { .. } => "commit_timeout",
}
}
pub fn is_retryable(&self) -> bool {
matches!(
self,
CommitError::UnexpectedLogIndex { .. }
| CommitError::StorageFailure { .. }
| CommitError::NotLeader { .. }
| CommitError::CommitTimeout { .. }
)
}
}
#[async_trait]
pub trait MutationCommitter: Send + Sync {
async fn commit(
&self,
tenant_id: TenantId,
mutation: MemoryMutation,
opts: CommitOptions,
) -> Result<CommitReceipt, CommitError>;
async fn read_range(
&self,
tenant_id: TenantId,
from_index: u64,
limit: usize,
) -> Result<Vec<CommittedEntry>, CommitError>;
async fn high_watermark(&self, tenant_id: TenantId) -> Result<u64, CommitError>;
async fn list_active_tenants(&self) -> Result<Vec<TenantId>, CommitError>;
async fn ensure_linearizable(&self) -> Result<(), CommitError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn commit_options_builder_pattern() {
let opts = CommitOptions::new().no_wait().expecting_index(42);
assert!(!opts.wait_for_apply);
assert_eq!(opts.expected_log_index, Some(42));
}
#[test]
fn commit_options_default_is_safe() {
let from_new = CommitOptions::new();
assert!(
from_new.wait_for_apply,
"Self::new() must default wait_for_apply to true"
);
assert_eq!(from_new.expected_log_index, None);
assert!(from_new.op_id.is_none());
let from_default = CommitOptions::default();
assert!(
from_default.wait_for_apply,
"Default::default() must equal Self::new() — see issue #37 (do NOT revert to #[derive(Default)])"
);
assert_eq!(from_default.expected_log_index, None);
assert!(from_default.op_id.is_none());
}
#[test]
fn metric_labels_are_stable() {
assert_eq!(
CommitError::OpIdCollision {
op_id: OpId::new_random(),
tenant_id: TenantId::new(1),
existing_index: 0,
}
.metric_label(),
"op_id_collision"
);
assert_eq!(
CommitError::UnexpectedLogIndex {
tenant_id: TenantId::new(1),
expected: 1,
actual: 2,
}
.metric_label(),
"unexpected_log_index"
);
assert_eq!(
CommitError::NotYetImplemented {
variant: "PurgeMemory",
planned_rfc: "011",
}
.metric_label(),
"not_yet_implemented"
);
assert_eq!(
CommitError::StorageFailure {
message: "disk full".into(),
}
.metric_label(),
"storage_failure"
);
assert_eq!(CommitError::Shutdown.metric_label(), "shutdown");
}
#[test]
fn retryable_classification_is_correct() {
assert!(!CommitError::OpIdCollision {
op_id: OpId::new_random(),
tenant_id: TenantId::new(1),
existing_index: 0,
}
.is_retryable());
assert!(CommitError::UnexpectedLogIndex {
tenant_id: TenantId::new(1),
expected: 1,
actual: 2,
}
.is_retryable());
assert!(CommitError::StorageFailure {
message: "transient".into(),
}
.is_retryable());
assert!(!CommitError::Shutdown.is_retryable());
assert!(!CommitError::NotYetImplemented {
variant: "x",
planned_rfc: "y",
}
.is_retryable());
}
#[test]
fn commit_receipt_serde_round_trip() {
let r = CommitReceipt {
op_id: OpId::new_random(),
tenant_id: TenantId::new(7),
term: 1,
log_index: 42,
committed_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
applied_at: Some(
SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_001),
),
};
let json = serde_json::to_string(&r).unwrap();
let back: CommitReceipt = serde_json::from_str(&json).unwrap();
assert_eq!(r, back);
}
}