reddb-io-file 1.11.0

RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
Documentation
use super::*;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimelineId(pub u64);

impl TimelineId {
    pub const fn initial() -> Self {
        Self(1)
    }

    pub const fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineHistoryEntry {
    pub timeline: TimelineId,
    pub parent_timeline: Option<TimelineId>,
    pub fork_lsn: u64,
    pub created_at_unix_ms: u64,
    pub reason: String,
}

impl TimelineHistoryEntry {
    pub fn initial(created_at_unix_ms: u64) -> Self {
        Self {
            timeline: TimelineId::initial(),
            parent_timeline: None,
            fork_lsn: 0,
            created_at_unix_ms,
            reason: "initial".into(),
        }
    }

    pub fn fork(
        timeline: TimelineId,
        parent_timeline: TimelineId,
        fork_lsn: u64,
        created_at_unix_ms: u64,
        reason: impl Into<String>,
    ) -> Self {
        Self {
            timeline,
            parent_timeline: Some(parent_timeline),
            fork_lsn,
            created_at_unix_ms,
            reason: reason.into(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineHistory {
    pub entries: Vec<TimelineHistoryEntry>,
}

impl TimelineHistory {
    pub fn new(initial_created_at_unix_ms: u64) -> Self {
        Self {
            entries: vec![TimelineHistoryEntry::initial(initial_created_at_unix_ms)],
        }
    }

    pub fn current(&self) -> Option<TimelineId> {
        self.entries.last().map(|entry| entry.timeline)
    }

    pub fn fork(
        &mut self,
        new_timeline: TimelineId,
        parent_timeline: TimelineId,
        fork_lsn: u64,
        created_at_unix_ms: u64,
        reason: impl Into<String>,
    ) -> RdbFileResult<()> {
        if self.current() != Some(parent_timeline) {
            return Err(RdbFileError::InvalidOperation(format!(
                "cannot fork timeline {} from non-current parent {}",
                new_timeline.0, parent_timeline.0
            )));
        }
        if new_timeline <= parent_timeline {
            return Err(RdbFileError::InvalidOperation(format!(
                "new timeline {} must be greater than parent {}",
                new_timeline.0, parent_timeline.0
            )));
        }
        self.entries.push(TimelineHistoryEntry::fork(
            new_timeline,
            parent_timeline,
            fork_lsn,
            created_at_unix_ms,
            reason,
        ));
        Ok(())
    }

    pub fn ancestor_lsn(&self, timeline: TimelineId) -> Option<u64> {
        self.entries
            .iter()
            .find(|entry| entry.timeline == timeline)
            .map(|entry| entry.fork_lsn)
    }

    pub fn descendant_chain_from(&self, timeline: TimelineId) -> Option<Vec<TimelineHistoryEntry>> {
        let index = self
            .entries
            .iter()
            .position(|entry| entry.timeline == timeline)?;
        Some(self.entries[index.saturating_add(1)..].to_vec())
    }

    pub fn rejoin_decision(
        &self,
        node_timeline: TimelineId,
        node_flushed_lsn: u64,
        available_from_lsn: u64,
    ) -> RejoinDecision {
        if self.current() == Some(node_timeline) {
            return RejoinDecision::AlreadyCurrent;
        }
        let Some(current) = self.current() else {
            return RejoinDecision::Reclone;
        };
        let Some(chain) = self.descendant_chain_from(node_timeline) else {
            return RejoinDecision::Reclone;
        };
        let Some(first_child) = chain.first() else {
            return RejoinDecision::Reclone;
        };
        if node_flushed_lsn >= first_child.fork_lsn {
            RejoinDecision::Rewind {
                target_timeline: current,
                rewind_to_lsn: first_child.fork_lsn,
            }
        } else if node_flushed_lsn >= available_from_lsn {
            RejoinDecision::FollowNewTimeline {
                target_timeline: current,
                start_lsn: node_flushed_lsn,
            }
        } else {
            RejoinDecision::Reclone
        }
    }

    pub fn promotion_history(
        &self,
        candidate: &PromotionCandidate,
        new_timeline: TimelineId,
        created_at_unix_ms: u64,
    ) -> RdbFileResult<Self> {
        let mut next = self.clone();
        next.fork(
            new_timeline,
            candidate.timeline,
            candidate.applied_lsn,
            created_at_unix_ms,
            format!("promote {}", candidate.replica_id),
        )?;
        Ok(next)
    }

    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
        write_bytes_atomically(path.as_ref(), &self.encode())
    }

    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
        Self::decode(&fs::read(path)?)
    }

    pub fn encode(&self) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(TIMELINE_HISTORY_MAGIC);
        put_u16(&mut out, PRIMARY_REPLICA_ARTIFACT_VERSION);
        put_u32(&mut out, self.entries.len() as u32);
        for entry in &self.entries {
            put_u64(&mut out, entry.timeline.0);
            match entry.parent_timeline {
                Some(parent) => {
                    out.push(1);
                    put_u64(&mut out, parent.0);
                }
                None => {
                    out.push(0);
                    put_u64(&mut out, 0);
                }
            }
            put_u64(&mut out, entry.fork_lsn);
            put_u64(&mut out, entry.created_at_unix_ms);
            put_string(&mut out, &entry.reason);
        }
        let checksum = crc32(&out);
        put_u32(&mut out, checksum);
        out
    }

    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
        verify_checksum(bytes, "timeline history")?;
        let payload_end = bytes.len() - CHECKSUM_LEN;
        let mut cursor = 0usize;
        expect_magic(
            bytes,
            &mut cursor,
            payload_end,
            TIMELINE_HISTORY_MAGIC,
            "timeline history",
        )?;
        let version = take_u16(bytes, &mut cursor, payload_end)?;
        if version != PRIMARY_REPLICA_ARTIFACT_VERSION {
            return Err(RdbFileError::InvalidOperation(format!(
                "unsupported timeline history version {version}"
            )));
        }
        let count = take_u32(bytes, &mut cursor, payload_end)? as usize;
        if count == 0 {
            return Err(RdbFileError::InvalidOperation(
                "timeline history must contain an initial entry".into(),
            ));
        }
        let mut entries = Vec::with_capacity(count);
        let mut previous = None;
        for index in 0..count {
            let timeline = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
            let has_parent = take_u8(bytes, &mut cursor, payload_end)? != 0;
            let parent_raw = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
            let parent_timeline = has_parent.then_some(parent_raw);
            let fork_lsn = take_u64(bytes, &mut cursor, payload_end)?;
            let created_at_unix_ms = take_u64(bytes, &mut cursor, payload_end)?;
            let reason = take_string(bytes, &mut cursor, payload_end)?;
            if index == 0 && parent_timeline.is_some() {
                return Err(RdbFileError::InvalidOperation(
                    "initial timeline history entry cannot have a parent".into(),
                ));
            }
            if index == 0 && timeline != TimelineId::initial() {
                return Err(RdbFileError::InvalidOperation(
                    "timeline history must start at initial timeline".into(),
                ));
            }
            if index > 0 && parent_timeline != previous {
                return Err(RdbFileError::InvalidOperation(
                    "timeline history parent does not match previous timeline".into(),
                ));
            }
            if let Some(parent) = parent_timeline {
                if timeline <= parent {
                    return Err(RdbFileError::InvalidOperation(format!(
                        "timeline {} must be greater than parent {}",
                        timeline.0, parent.0
                    )));
                }
            }
            previous = Some(timeline);
            entries.push(TimelineHistoryEntry {
                timeline,
                parent_timeline,
                fork_lsn,
                created_at_unix_ms,
                reason,
            });
        }
        reject_trailing_bytes(bytes, cursor, payload_end, "timeline history")?;
        Ok(Self { entries })
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejoinDecision {
    AlreadyCurrent,
    FollowNewTimeline {
        target_timeline: TimelineId,
        start_lsn: u64,
    },
    Rewind {
        target_timeline: TimelineId,
        rewind_to_lsn: u64,
    },
    Reclone,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromotionCandidate {
    pub replica_id: String,
    pub timeline: TimelineId,
    pub received_lsn: u64,
    pub flushed_lsn: u64,
    pub applied_lsn: u64,
}

impl PromotionCandidate {
    pub fn select(timeline: TimelineId, acks: &[ReplicaAck]) -> Option<Self> {
        acks.iter()
            .filter(|ack| ack.timeline == timeline)
            .max_by(|left, right| {
                (
                    left.applied_lsn,
                    left.flushed_lsn,
                    left.received_lsn,
                    std::cmp::Reverse(left.replica_id.as_str()),
                )
                    .cmp(&(
                        right.applied_lsn,
                        right.flushed_lsn,
                        right.received_lsn,
                        std::cmp::Reverse(right.replica_id.as_str()),
                    ))
            })
            .map(|ack| Self {
                replica_id: ack.replica_id.clone(),
                timeline: ack.timeline,
                received_lsn: ack.received_lsn,
                flushed_lsn: ack.flushed_lsn,
                applied_lsn: ack.applied_lsn,
            })
    }
}