// aedb 0.1.10
//
// Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads.
// Documentation
use std::time::{Duration, Instant};

/// Lifecycle phase of a group-commit state machine.
///
/// As driven by `GroupCommitStateMachine` in this file, a machine starts in
/// `Complete`, moves to `Filling` via `begin_filling`, to `Flushing` via
/// `begin_flushing`, and back to `Complete` via `complete_flush`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum GroupCommitPhase {
    /// A group is open; set by `begin_filling`.
    Filling,
    /// The group is being flushed; set by `begin_flushing`.
    Flushing,
    /// No group is open; the initial phase and the phase after `complete_flush`.
    Complete,
}

/// Reason recorded when a group transitions from filling to flushing.
///
/// The value is supplied by the caller of `begin_flushing` and surfaced through
/// `GroupCommitEpochSnapshot::flush_reason`; the state machine itself never
/// chooses a reason.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum GroupCommitFlushReason {
    /// Presumably used when `reached_size_limit` fired — confirm against the caller.
    MaxGroupSize,
    /// Presumably used when `reached_delay_limit` fired — confirm against the caller.
    MaxGroupDelay,
    /// NOTE(review): not produced in this file; by name, the commit ingress had
    /// nothing further to add — confirm against the caller.
    IngressDrained,
    /// NOTE(review): not produced in this file; by name, a barrier operation
    /// forced the flush — confirm against the caller.
    StructuralBarrier,
}

/// Limits that decide when a filling group should be flushed.
///
/// The state machine passes every policy through `sanitize_policy` before
/// storing it, so a zero `max_group_size` is treated as 1 and a zero
/// `max_group_delay` is treated as one microsecond.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct GroupCommitPolicy {
    /// Pending-commit count at or above which `reached_size_limit` reports `true`.
    pub max_group_size: usize,
    /// Time since filling began at or above which `reached_delay_limit` reports `true`.
    pub max_group_delay: Duration,
}

/// Copyable point-in-time view of a `GroupCommitStateMachine`, as returned by
/// its `snapshot` method.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct GroupCommitEpochSnapshot {
    /// Phase the machine was in when the snapshot was taken.
    pub phase: GroupCommitPhase,
    /// Epoch counter; the machine increments it (saturating) on each `begin_flushing`.
    pub epoch: u64,
    /// Most recently recorded pending-commit count.
    pub pending_commits: usize,
    /// Reason given to the latest `begin_flushing`; `None` once `begin_filling`
    /// has opened a new group or before any flush has started.
    pub flush_reason: Option<GroupCommitFlushReason>,
}

/// Tracks the phase, epoch, and flush triggers of a group commit.
///
/// The machine only records state and answers limit queries; it performs no
/// I/O itself.
#[derive(Debug)]
pub(super) struct GroupCommitStateMachine {
    /// Current lifecycle phase.
    phase: GroupCommitPhase,
    /// Number of flushes started so far (saturating at `u64::MAX`).
    epoch: u64,
    /// Sanitized policy in effect; replaced on every `begin_filling`.
    policy: GroupCommitPolicy,
    /// Instant passed to the latest `begin_filling`; `None` outside a fill/flush cycle.
    filling_started: Option<Instant>,
    /// Pending-commit count last reported through `record_pending_commits`.
    pending_commits: usize,
    /// Reason passed to the latest `begin_flushing`; cleared by `begin_filling`.
    flush_reason: Option<GroupCommitFlushReason>,
}

impl GroupCommitStateMachine {
    /// Creates a machine in the `Complete` phase at epoch 0 with no open group.
    ///
    /// The supplied `policy` is sanitized before being stored.
    pub(super) fn new(policy: GroupCommitPolicy) -> Self {
        Self {
            phase: GroupCommitPhase::Complete,
            epoch: 0,
            policy: sanitize_policy(policy),
            filling_started: None,
            pending_commits: 0,
            flush_reason: None,
        }
    }

    /// Opens a new group: switches to `Filling`, installs the (sanitized)
    /// `policy`, starts the delay clock at `now`, and clears the pending-commit
    /// count and the previous flush reason.
    ///
    /// No phase check is performed here, so calling this in any phase resets the
    /// group bookkeeping. The epoch is left untouched.
    pub(super) fn begin_filling(&mut self, now: Instant, policy: GroupCommitPolicy) {
        self.policy = sanitize_policy(policy);
        self.phase = GroupCommitPhase::Filling;
        self.filling_started = Some(now);
        self.pending_commits = 0;
        self.flush_reason = None;
    }

    /// Overwrites (does not accumulate) the number of commits pending in the
    /// current group.
    pub(super) fn record_pending_commits(&mut self, commits: usize) {
        self.pending_commits = commits;
    }

    /// Transitions `Filling` -> `Flushing`, records `reason`, and advances the
    /// epoch by one (saturating at `u64::MAX`).
    ///
    /// # Panics
    ///
    /// Panics if the machine is not currently in the `Filling` phase.
    pub(super) fn begin_flushing(&mut self, reason: GroupCommitFlushReason) {
        assert!(
            matches!(self.phase, GroupCommitPhase::Filling),
            "group commit phase invariant violated: begin_flushing outside Filling"
        );
        self.phase = GroupCommitPhase::Flushing;
        self.flush_reason = Some(reason);
        self.epoch = self.epoch.saturating_add(1);
    }

    /// Transitions `Flushing` -> `Complete` and clears the pending-commit count
    /// and the fill start time.
    ///
    /// The flush reason is intentionally left in place so a later `snapshot`
    /// still reports why the last flush happened; `begin_filling` clears it.
    ///
    /// # Panics
    ///
    /// Panics if the machine is not currently in the `Flushing` phase.
    pub(super) fn complete_flush(&mut self) {
        assert!(
            matches!(self.phase, GroupCommitPhase::Flushing),
            "group commit phase invariant violated: complete_flush outside Flushing"
        );
        self.phase = GroupCommitPhase::Complete;
        self.pending_commits = 0;
        self.filling_started = None;
    }

    /// Returns `true` once the recorded pending-commit count is at least the
    /// policy's `max_group_size`.
    pub(super) fn reached_size_limit(&self) -> bool {
        self.pending_commits >= self.policy.max_group_size
    }

    /// Returns `true` when a fill start time is recorded and at least
    /// `max_group_delay` has elapsed between it and `now`.
    ///
    /// Returns `false` when no fill start time is recorded (before the first
    /// `begin_filling` or after `complete_flush`).
    pub(super) fn reached_delay_limit(&self, now: Instant) -> bool {
        self.filling_started.is_some_and(|started| {
            // `saturating_duration_since` yields zero when `now` precedes
            // `started`, so an out-of-order timestamp can never panic here.
            // `Instant::duration_since` panicked in that case on older
            // toolchains and is documented as free to do so again.
            now.saturating_duration_since(started) >= self.policy.max_group_delay
        })
    }

    /// Returns a copy of the externally observable state: phase, epoch,
    /// pending-commit count, and last flush reason.
    pub(super) fn snapshot(&self) -> GroupCommitEpochSnapshot {
        GroupCommitEpochSnapshot {
            phase: self.phase,
            epoch: self.epoch,
            pending_commits: self.pending_commits,
            flush_reason: self.flush_reason,
        }
    }
}

/// Normalizes a policy so neither limit is zero.
///
/// A `max_group_size` of 0 becomes 1. A `max_group_delay` of exactly zero
/// becomes one microsecond; any non-zero delay, however small, is kept as is.
fn sanitize_policy(policy: GroupCommitPolicy) -> GroupCommitPolicy {
    const SMALLEST_GROUP_SIZE: usize = 1;
    // Substituted only for an exactly-zero delay; it is not a general lower bound.
    const ZERO_DELAY_REPLACEMENT: Duration = Duration::from_micros(1);

    let max_group_size = if policy.max_group_size < SMALLEST_GROUP_SIZE {
        SMALLEST_GROUP_SIZE
    } else {
        policy.max_group_size
    };

    let max_group_delay = if policy.max_group_delay == Duration::ZERO {
        ZERO_DELAY_REPLACEMENT
    } else {
        policy.max_group_delay
    };

    GroupCommitPolicy {
        max_group_size,
        max_group_delay,
    }
}

#[cfg(test)]
mod tests {
    use super::{
        GroupCommitFlushReason, GroupCommitPhase, GroupCommitPolicy, GroupCommitStateMachine,
    };
    use std::time::{Duration, Instant};

    /// Shorthand for building a policy from a size cap and a delay cap.
    fn policy_of(max_group_size: usize, max_group_delay: Duration) -> GroupCommitPolicy {
        GroupCommitPolicy {
            max_group_size,
            max_group_delay,
        }
    }

    /// Builds a machine with `policy` and opens a filling window at `started`
    /// under that same policy.
    fn machine_filling_since(
        started: Instant,
        policy: GroupCommitPolicy,
    ) -> GroupCommitStateMachine {
        let mut machine = GroupCommitStateMachine::new(policy);
        machine.begin_filling(started, policy);
        machine
    }

    /// Walks one full Filling -> Flushing -> Complete cycle triggered by the size cap.
    #[test]
    fn state_machine_filling_flushing_complete_cycle() {
        let mut machine =
            machine_filling_since(Instant::now(), policy_of(4, Duration::from_millis(2)));
        assert_eq!(machine.snapshot().phase, GroupCommitPhase::Filling);

        machine.record_pending_commits(4);
        assert!(machine.reached_size_limit());

        machine.begin_flushing(GroupCommitFlushReason::MaxGroupSize);
        let while_flushing = machine.snapshot();
        assert_eq!(while_flushing.phase, GroupCommitPhase::Flushing);
        assert_eq!(
            while_flushing.flush_reason,
            Some(GroupCommitFlushReason::MaxGroupSize)
        );

        machine.complete_flush();
        assert_eq!(machine.snapshot().phase, GroupCommitPhase::Complete);
    }

    /// The delay limit trips only once the configured delay has elapsed.
    #[test]
    fn state_machine_delay_limit_detection() {
        let opened_at = Instant::now();
        let mut machine = machine_filling_since(opened_at, policy_of(16, Duration::from_millis(5)));
        machine.record_pending_commits(1);

        let before_deadline = opened_at + Duration::from_millis(1);
        let after_deadline = opened_at + Duration::from_millis(6);
        assert!(!machine.reached_delay_limit(before_deadline));
        assert!(machine.reached_delay_limit(after_deadline));
    }

    /// Zero-valued limits are replaced with usable minimums.
    #[test]
    fn state_machine_sanitizes_zero_policy_values() {
        let mut machine = machine_filling_since(Instant::now(), policy_of(0, Duration::ZERO));
        machine.record_pending_commits(1);

        assert!(machine.reached_size_limit());
        assert!(machine.reached_delay_limit(Instant::now() + Duration::from_millis(1)));
    }
}