reddb-io-file 1.11.0

RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
Documentation
use super::*;

/// One entry of a replica relay log: an LSN paired with its raw payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaRelayLogRecord {
    /// Log sequence number of this record.
    pub lsn: u64,
    /// Record bytes, carried verbatim.
    pub payload: Vec<u8>,
}

impl ReplicaRelayLogRecord {
    /// Builds a record from `lsn` and anything convertible into a byte vector.
    pub fn new(lsn: u64, payload: impl Into<Vec<u8>>) -> Self {
        let payload: Vec<u8> = payload.into();
        Self { lsn, payload }
    }
}

/// In-memory form of one relay log segment: a run of records on a single
/// timeline together with the LSN range that bounds them.
///
/// `validate` requires the segment to be non-empty, `start_lsn <= end_lsn`,
/// every record LSN to lie within `start_lsn..=end_lsn`, and record LSNs to be
/// strictly increasing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaRelayLogSegment {
    /// Timeline the segment belongs to.
    pub timeline: TimelineId,
    /// Lower bound of the segment's LSN range (inclusive).
    pub start_lsn: u64,
    /// Upper bound of the segment's LSN range (inclusive).
    pub end_lsn: u64,
    /// Records of the segment, in strictly increasing LSN order.
    pub records: Vec<ReplicaRelayLogRecord>,
}

impl ReplicaRelayLogSegment {
    /// Builds a segment whose LSN range is derived from `records`.
    ///
    /// `start_lsn` is the LSN of the first record and `end_lsn` is the largest
    /// LSN present. The assembled segment is validated before it is returned.
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` when `records` is empty, or
    /// whatever `validate` reports for the assembled segment.
    pub fn from_records(
        timeline: TimelineId,
        records: Vec<ReplicaRelayLogRecord>,
    ) -> RdbFileResult<Self> {
        let start_lsn = match records.first() {
            Some(record) => record.lsn,
            None => {
                return Err(RdbFileError::InvalidOperation(
                    "relay segment cannot be empty".into(),
                ));
            }
        };
        // Seeding the fold with the first LSN yields the maximum over all records.
        let end_lsn = records
            .iter()
            .fold(start_lsn, |highest, record| highest.max(record.lsn));
        let candidate = Self {
            timeline,
            start_lsn,
            end_lsn,
            records,
        };
        candidate.validate()?;
        Ok(candidate)
    }

    /// Returns `crc32` over the bytes produced by `encode` (which already end
    /// with the segment's own trailing checksum).
    ///
    /// # Errors
    ///
    /// Propagates any error from `encode`.
    pub fn checksum(&self) -> RdbFileResult<u32> {
        let encoded = self.encode()?;
        Ok(crc32(&encoded))
    }

    /// Encodes the segment and passes the bytes to `write_bytes_atomically`
    /// for `path`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `encode` and from the write.
    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
        let encoded = self.encode()?;
        write_bytes_atomically(path.as_ref(), &encoded)
    }

    /// Reads the whole file at `path` and decodes it as a segment.
    ///
    /// # Errors
    ///
    /// Propagates the read error or any error from `decode`.
    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
        let raw = fs::read(path)?;
        Self::decode(&raw)
    }

    pub fn encode(&self) -> RdbFileResult<Vec<u8>> {
        self.validate()?;
        let mut out = Vec::new();
        out.extend_from_slice(RELAY_LOG_SEGMENT_MAGIC);
        put_u16(&mut out, PRIMARY_REPLICA_ARTIFACT_VERSION);
        put_u64(&mut out, self.timeline.0);
        put_u64(&mut out, self.start_lsn);
        put_u64(&mut out, self.end_lsn);
        put_u64(&mut out, self.records.len() as u64);
        for record in &self.records {
            put_u64(&mut out, record.lsn);
            let payload_len = u32::try_from(record.payload.len()).map_err(|_| {
                RdbFileError::InvalidOperation("relay record payload too large".into())
            })?;
            put_u32(&mut out, payload_len);
            out.extend_from_slice(&record.payload);
        }
        let checksum = crc32(&out);
        put_u32(&mut out, checksum);
        Ok(out)
    }

    /// Parses a segment from bytes in the layout written by `encode`.
    ///
    /// The trailing checksum is verified first; then the magic, version,
    /// header fields and records are read, leftover bytes are rejected, and the
    /// resulting segment is validated.
    ///
    /// # Errors
    ///
    /// Returns an error if the checksum, magic or version check fails, if any
    /// read helper or the trailing-bytes check fails, or if the decoded segment
    /// does not pass `validate`.
    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
        // Fixed-width part of one encoded record: LSN (u64) + payload length (u32).
        // Only used as a reservation hint, so an imprecise value is harmless.
        const MIN_RECORD_LEN: usize = 8 + 4;

        verify_checksum(bytes, "replica relay log segment")?;
        // NOTE(review): relies on `verify_checksum` rejecting inputs shorter than
        // the checksum trailer; otherwise this subtraction underflows — confirm.
        let payload_end = bytes.len() - CHECKSUM_LEN;
        let mut cursor = 0usize;
        expect_magic(
            bytes,
            &mut cursor,
            payload_end,
            RELAY_LOG_SEGMENT_MAGIC,
            "replica relay log segment",
        )?;
        let version = take_u16(bytes, &mut cursor, payload_end)?;
        if version != PRIMARY_REPLICA_ARTIFACT_VERSION {
            return Err(RdbFileError::InvalidOperation(format!(
                "unsupported relay log segment version {version}"
            )));
        }
        let timeline = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
        let start_lsn = take_u64(bytes, &mut cursor, payload_end)?;
        let end_lsn = take_u64(bytes, &mut cursor, payload_end)?;
        let count = take_u64(bytes, &mut cursor, payload_end)?;
        // `count` comes from the file, so it must not size the allocation
        // directly: a corrupt or hostile value would make `Vec::with_capacity`
        // panic or abort the process. Reserve no more slots than the remaining
        // bytes could hold; the per-record reads below remain responsible for
        // rejecting a count that overruns the data.
        let max_records = payload_end.saturating_sub(cursor) / MIN_RECORD_LEN;
        let capacity = usize::try_from(count).map_or(max_records, |n| n.min(max_records));
        let mut records = Vec::with_capacity(capacity);
        for _ in 0..count {
            let lsn = take_u64(bytes, &mut cursor, payload_end)?;
            let payload_len = take_u32(bytes, &mut cursor, payload_end)? as usize;
            let payload = take_bytes(bytes, &mut cursor, payload_end, payload_len)?.to_vec();
            records.push(ReplicaRelayLogRecord { lsn, payload });
        }
        reject_trailing_bytes(bytes, cursor, payload_end, "replica relay log segment")?;
        let segment = Self {
            timeline,
            start_lsn,
            end_lsn,
            records,
        };
        segment.validate()?;
        Ok(segment)
    }

    /// Checks the segment's structural invariants.
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` if the segment has no records,
    /// if `end_lsn` is below `start_lsn`, if a record LSN lies outside
    /// `start_lsn..=end_lsn`, or if record LSNs are not strictly increasing.
    fn validate(&self) -> RdbFileResult<()> {
        if self.records.is_empty() {
            return Err(RdbFileError::InvalidOperation(
                "relay segment cannot be empty".into(),
            ));
        }
        if self.start_lsn > self.end_lsn {
            return Err(RdbFileError::InvalidOperation(
                "relay segment end lsn is before start lsn".into(),
            ));
        }
        let bounds = self.start_lsn..=self.end_lsn;
        let mut last_seen: Option<u64> = None;
        for record in &self.records {
            let lsn = record.lsn;
            if !bounds.contains(&lsn) {
                return Err(RdbFileError::InvalidOperation(format!(
                    "relay record lsn {} is outside segment range {}..={}",
                    lsn, self.start_lsn, self.end_lsn
                )));
            }
            // Equal LSNs count as out of order: the sequence must strictly increase.
            if matches!(last_seen, Some(earlier) if lsn <= earlier) {
                return Err(RdbFileError::InvalidOperation(
                    "relay segment records are not strictly ordered".into(),
                ));
            }
            last_seen = Some(lsn);
        }
        Ok(())
    }
}

/// Manifest entry describing one relay log segment file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayLogSegmentRef {
    /// Path of the segment file; `new` requires it to pass
    /// `validate_relay_relative_path`.
    pub relative_path: PathBuf,
    /// First LSN of the referenced range.
    pub start_lsn: u64,
    /// Last LSN of the referenced range; `new` requires `end_lsn >= start_lsn`.
    pub end_lsn: u64,
    /// Checksum recorded for the segment.
    pub checksum: u32,
}

impl RelayLogSegmentRef {
    /// Creates a reference after validating the path and the LSN range.
    ///
    /// # Errors
    ///
    /// Returns the error from `validate_relay_relative_path` if the path is
    /// rejected, otherwise `RdbFileError::InvalidOperation` if `end_lsn` is
    /// below `start_lsn`.
    pub fn new(
        relative_path: impl Into<PathBuf>,
        start_lsn: u64,
        end_lsn: u64,
        checksum: u32,
    ) -> RdbFileResult<Self> {
        let relative_path: PathBuf = relative_path.into();
        // Path check runs first so its error wins over a bad range.
        validate_relay_relative_path(&relative_path)?;
        if start_lsn > end_lsn {
            return Err(RdbFileError::InvalidOperation(
                "relay segment end lsn is before start lsn".into(),
            ));
        }
        let entry = Self {
            relative_path,
            start_lsn,
            end_lsn,
            checksum,
        };
        Ok(entry)
    }
}

/// Replica-side bookkeeping for a relay log: progress positions plus the list
/// of segment references.
///
/// `encode` and `decode` both require
/// `applied_lsn <= flushed_lsn <= received_lsn`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaRelayLogManifest {
    /// Identifier of the replica this manifest belongs to.
    pub replica_id: String,
    /// Timeline that referenced segments must match (checked by `validate_segments`).
    pub timeline: TimelineId,
    /// Received position; raised to a pushed segment's `end_lsn` by `push_segment`.
    pub received_lsn: u64,
    /// Flushed position; raised to a pushed segment's `end_lsn` by `push_segment`.
    pub flushed_lsn: u64,
    /// Applied position; raised by `mark_applied`, which refuses values beyond `flushed_lsn`.
    pub applied_lsn: u64,
    /// Segment references in the order they were pushed.
    pub segments: Vec<RelayLogSegmentRef>,
}

impl ReplicaRelayLogManifest {
    /// Creates an empty manifest for `replica_id` on `timeline`: all three
    /// positions start at 0 and there are no segments.
    pub fn new(replica_id: impl Into<String>, timeline: TimelineId) -> Self {
        let replica_id: String = replica_id.into();
        let segments = Vec::new();
        Self {
            replica_id,
            timeline,
            received_lsn: 0,
            flushed_lsn: 0,
            applied_lsn: 0,
            segments,
        }
    }

    /// Appends `segment` and raises the received and flushed positions to its
    /// `end_lsn` (positions never move backwards).
    ///
    /// Segment ranges are inclusive on both ends, so the new segment must start
    /// strictly after the previous segment's `end_lsn`.
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` if `segment` overlaps the last
    /// segment already in the manifest; the manifest is left unchanged.
    pub fn push_segment(&mut self, segment: RelayLogSegmentRef) -> RdbFileResult<()> {
        if let Some(previous) = self.segments.last() {
            // `end_lsn` is inclusive (a segment's highest record LSN), so a
            // segment that starts at that same LSN would cover it twice.
            if segment.start_lsn <= previous.end_lsn {
                return Err(RdbFileError::InvalidOperation(
                    "relay segment overlaps previous segment".into(),
                ));
            }
        }
        self.received_lsn = self.received_lsn.max(segment.end_lsn);
        self.flushed_lsn = self.flushed_lsn.max(segment.end_lsn);
        self.segments.push(segment);
        Ok(())
    }

    /// Records that the log has been applied up to `applied_lsn`.
    ///
    /// The stored position only moves forward; a smaller value than the current
    /// one is accepted and ignored.
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` if `applied_lsn` exceeds the
    /// flushed position.
    pub fn mark_applied(&mut self, applied_lsn: u64) -> RdbFileResult<()> {
        let flushed = self.flushed_lsn;
        if applied_lsn > flushed {
            return Err(RdbFileError::InvalidOperation(format!(
                "relay applied lsn {applied_lsn} is beyond flushed lsn {}",
                flushed
            )));
        }
        if applied_lsn > self.applied_lsn {
            self.applied_lsn = applied_lsn;
        }
        Ok(())
    }

    /// Builds a `ReplicaAck` from the manifest's replica id, timeline and
    /// current positions.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ReplicaAck::with_positions`.
    pub fn ack(&self) -> RdbFileResult<ReplicaAck> {
        let received = self.received_lsn;
        let flushed = self.flushed_lsn;
        let applied = self.applied_lsn;
        // NOTE(review): `flushed_lsn` fills two consecutive position arguments;
        // confirm against `ReplicaAck::with_positions` that this is intended.
        ReplicaAck::with_positions(
            &self.replica_id,
            self.timeline,
            received,
            flushed,
            flushed,
            applied,
        )
    }

    /// Encodes the manifest and passes the bytes to `write_bytes_atomically`
    /// for `path`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `encode` and from the write.
    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
        let encoded = self.encode()?;
        write_bytes_atomically(path.as_ref(), &encoded)
    }

    /// Reads the whole file at `path` and decodes it as a manifest.
    ///
    /// # Errors
    ///
    /// Propagates the read error or any error from `decode`.
    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
        let raw = fs::read(path)?;
        Self::decode(&raw)
    }

    /// Cross-checks every segment reference against the segment file found
    /// under `relay_dir`.
    ///
    /// For each reference, in order: the path is re-validated, the file is read
    /// and decoded, and its timeline, LSN range and checksum are compared with
    /// the manifest. Checking stops at the first failure.
    ///
    /// # Errors
    ///
    /// Returns the path-validation or read/decode error, or
    /// `RdbFileError::InvalidOperation` on a timeline, range or checksum
    /// mismatch.
    pub fn validate_segments(&self, relay_dir: impl AsRef<Path>) -> RdbFileResult<()> {
        let base = relay_dir.as_ref();
        for entry in self.segments.iter() {
            validate_relay_relative_path(&entry.relative_path)?;
            let location = base.join(&entry.relative_path);
            let on_disk = ReplicaRelayLogSegment::read_from_path(location)?;
            if on_disk.timeline != self.timeline {
                return Err(RdbFileError::InvalidOperation(format!(
                    "relay segment timeline {} does not match manifest timeline {}",
                    on_disk.timeline.0, self.timeline.0
                )));
            }
            let range_matches =
                on_disk.start_lsn == entry.start_lsn && on_disk.end_lsn == entry.end_lsn;
            if !range_matches {
                return Err(RdbFileError::InvalidOperation(format!(
                    "relay segment range {}..{} does not match manifest range {}..{}",
                    on_disk.start_lsn, on_disk.end_lsn, entry.start_lsn, entry.end_lsn
                )));
            }
            let computed = on_disk.checksum()?;
            if computed != entry.checksum {
                return Err(RdbFileError::InvalidOperation(format!(
                    "relay segment checksum mismatch: stored {:#010x}, computed {:#010x}",
                    entry.checksum, computed
                )));
            }
        }
        Ok(())
    }

    /// Serializes the manifest into its byte representation.
    ///
    /// Written in order: magic, artifact version (`put_u16`), replica id
    /// (`put_string`), timeline, received, flushed and applied LSNs (`put_u64`
    /// each), segment count (`put_u32`), then for every segment its path
    /// (`put_string`), start and end LSN (`put_u64`) and checksum (`put_u32`),
    /// and finally the `crc32` of everything written so far (`put_u32`).
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` if the positions are not
    /// ordered as `applied <= flushed <= received`, if the number of segments
    /// does not fit in a `u32`, or if a segment path is not valid UTF-8.
    pub fn encode(&self) -> RdbFileResult<Vec<u8>> {
        if self.applied_lsn > self.flushed_lsn || self.flushed_lsn > self.received_lsn {
            return Err(RdbFileError::InvalidOperation(
                "relay manifest positions are not ordered".into(),
            ));
        }
        // The count field is a u32; a plain `as` cast would silently truncate
        // and produce a manifest whose count disagrees with its contents.
        let segment_count = u32::try_from(self.segments.len()).map_err(|_| {
            RdbFileError::InvalidOperation("relay manifest has too many segments".into())
        })?;
        let mut out = Vec::new();
        out.extend_from_slice(RELAY_LOG_MANIFEST_MAGIC);
        put_u16(&mut out, PRIMARY_REPLICA_ARTIFACT_VERSION);
        put_string(&mut out, &self.replica_id);
        put_u64(&mut out, self.timeline.0);
        put_u64(&mut out, self.received_lsn);
        put_u64(&mut out, self.flushed_lsn);
        put_u64(&mut out, self.applied_lsn);
        put_u32(&mut out, segment_count);
        for segment in &self.segments {
            // A lossy conversion would substitute U+FFFD and record a path that
            // differs from the real one, so non-UTF-8 paths are refused instead.
            let relative_path = segment.relative_path.to_str().ok_or_else(|| {
                RdbFileError::InvalidOperation("relay segment path is not valid UTF-8".into())
            })?;
            put_string(&mut out, relative_path);
            put_u64(&mut out, segment.start_lsn);
            put_u64(&mut out, segment.end_lsn);
            put_u32(&mut out, segment.checksum);
        }
        // The trailer covers every byte written before it.
        let checksum = crc32(&out);
        put_u32(&mut out, checksum);
        Ok(out)
    }

    /// Parses a manifest from bytes in the layout written by `encode`.
    ///
    /// The trailing checksum is verified first; then the magic, version,
    /// header fields and segment references are read (each reference goes
    /// through `RelayLogSegmentRef::new`), leftover bytes are rejected, and the
    /// position ordering is checked.
    ///
    /// # Errors
    ///
    /// Returns an error if the checksum, magic or version check fails, if any
    /// read helper or the trailing-bytes check fails, if a segment reference is
    /// rejected by `RelayLogSegmentRef::new`, or if the positions are not
    /// ordered as `applied <= flushed <= received`.
    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
        // Fixed-width part of one encoded segment reference: start LSN (u64) +
        // end LSN (u64) + checksum (u32). The path string is left out, which
        // only makes the reservation bound below more generous.
        const MIN_SEGMENT_REF_LEN: usize = 8 + 8 + 4;

        verify_checksum(bytes, "replica relay log manifest")?;
        // NOTE(review): relies on `verify_checksum` rejecting inputs shorter than
        // the checksum trailer; otherwise this subtraction underflows — confirm.
        let payload_end = bytes.len() - CHECKSUM_LEN;
        let mut cursor = 0usize;
        expect_magic(
            bytes,
            &mut cursor,
            payload_end,
            RELAY_LOG_MANIFEST_MAGIC,
            "replica relay log manifest",
        )?;
        let version = take_u16(bytes, &mut cursor, payload_end)?;
        if version != PRIMARY_REPLICA_ARTIFACT_VERSION {
            return Err(RdbFileError::InvalidOperation(format!(
                "unsupported relay log manifest version {version}"
            )));
        }
        let replica_id = take_string(bytes, &mut cursor, payload_end)?;
        let timeline = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
        let received_lsn = take_u64(bytes, &mut cursor, payload_end)?;
        let flushed_lsn = take_u64(bytes, &mut cursor, payload_end)?;
        let applied_lsn = take_u64(bytes, &mut cursor, payload_end)?;
        let count = take_u32(bytes, &mut cursor, payload_end)? as usize;
        // `count` comes from the file, so it must not size the allocation
        // directly: a corrupt or hostile value could request an enormous
        // buffer and abort the process. Reserve no more slots than the
        // remaining bytes could hold; the per-entry reads below remain
        // responsible for rejecting a count that overruns the data.
        let max_segments = payload_end.saturating_sub(cursor) / MIN_SEGMENT_REF_LEN;
        let mut manifest = Self {
            replica_id,
            timeline,
            received_lsn,
            flushed_lsn,
            applied_lsn,
            segments: Vec::with_capacity(count.min(max_segments)),
        };
        for _ in 0..count {
            let relative_path = take_string(bytes, &mut cursor, payload_end)?;
            let start_lsn = take_u64(bytes, &mut cursor, payload_end)?;
            let end_lsn = take_u64(bytes, &mut cursor, payload_end)?;
            let checksum = take_u32(bytes, &mut cursor, payload_end)?;
            manifest.segments.push(RelayLogSegmentRef::new(
                relative_path,
                start_lsn,
                end_lsn,
                checksum,
            )?);
        }
        reject_trailing_bytes(bytes, cursor, payload_end, "replica relay log manifest")?;
        if manifest.applied_lsn > manifest.flushed_lsn
            || manifest.flushed_lsn > manifest.received_lsn
        {
            return Err(RdbFileError::InvalidOperation(
                "relay manifest positions are not ordered".into(),
            ));
        }
        Ok(manifest)
    }
}

/// Ensures `path` stays inside the relay directory once joined onto it.
///
/// A path is accepted only if it has no drive/UNC prefix, no root component
/// and no `..` component. Checking `is_absolute` alone is not enough: on
/// Windows `\foo` and `C:foo` are not absolute, yet `Path::join` drops part or
/// all of the base directory when the joined path has a root or a prefix.
///
/// # Errors
///
/// Returns `RdbFileError::InvalidOperation` when the path is absolute, rooted
/// or prefixed ("must be relative"), or when it contains a `..` component
/// ("must not escape relay directory").
fn validate_relay_relative_path(path: &Path) -> RdbFileResult<()> {
    use std::path::Component;

    // Any prefix or root component anchors the path somewhere other than the
    // relay directory, whether or not the platform calls it "absolute".
    let anchored = path.is_absolute()
        || path
            .components()
            .any(|component| matches!(component, Component::Prefix(_) | Component::RootDir));
    if anchored {
        return Err(RdbFileError::InvalidOperation(
            "relay segment path must be relative".into(),
        ));
    }
    if path
        .components()
        .any(|component| matches!(component, Component::ParentDir))
    {
        return Err(RdbFileError::InvalidOperation(
            "relay segment path must not escape relay directory".into(),
        ));
    }
    Ok(())
}