reddb-io-file 1.11.0

RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
Documentation
use super::*;
use std::io::{Read, Seek, SeekFrom};

/// Describes one byte range of a file that has to be read and verified.
///
/// A request can be built from a `ServerlessExtentRef` with
/// [`ServerlessHydrationRequest::from_extent`]; bytes read for the range are
/// checked against these fields by
/// [`ServerlessHydrationRequest::validate_payload`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessHydrationRequest {
    /// Path of the file holding the range. Local hydration joins it onto the
    /// file plan's generation directory, and rejects absolute paths and paths
    /// containing `..`.
    pub relative_path: PathBuf,
    /// Byte offset inside the file at which the range starts.
    pub offset: u64,
    /// Length of the range in bytes; a payload must have exactly this length.
    pub bytes: u64,
    /// Expected checksum of the range, compared against `crc32` of the payload.
    pub checksum: u32,
    /// Expected content hash of the range. A zero hash disables the
    /// content-hash comparison during validation.
    pub content_hash: ServerlessContentHash,
}

impl ServerlessHydrationRequest {
    /// Builds a request by copying the location and integrity fields of `extent`.
    pub fn from_extent(extent: &ServerlessExtentRef) -> Self {
        Self {
            relative_path: extent.relative_path.clone(),
            offset: extent.offset,
            bytes: extent.bytes,
            checksum: extent.checksum,
            content_hash: extent.content_hash,
        }
    }

    /// Verifies that `payload` is the data this request describes.
    ///
    /// Checks run in order: payload length, then checksum, then content hash.
    /// The content hash is only computed when the request carries a non-zero
    /// expected hash, so requests without one do not pay for hashing.
    ///
    /// # Errors
    ///
    /// Returns [`RdbFileError::InvalidOperation`] when the payload length
    /// differs from `bytes`, when `crc32(payload)` differs from `checksum`, or
    /// when a non-zero `content_hash` does not match the payload's hash.
    pub fn validate_payload(&self, payload: &[u8]) -> RdbFileResult<()> {
        // `usize` is never wider than 64 bits, so this widening cast is lossless.
        if payload.len() as u64 != self.bytes {
            return Err(RdbFileError::InvalidOperation(format!(
                "serverless hydration range {} has {} bytes, expected {}",
                self.relative_path.display(),
                payload.len(),
                self.bytes
            )));
        }
        let computed_crc = crc32(payload);
        if computed_crc != self.checksum {
            return Err(RdbFileError::InvalidOperation(format!(
                "serverless hydration range {} checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
                self.relative_path.display(),
                self.checksum
            )));
        }
        // A zero expected hash means "no hash recorded": short-circuit before
        // hashing instead of hashing the whole payload and ignoring the result.
        if !self.content_hash.is_zero()
            && ServerlessContentHash::from_bytes(payload) != self.content_hash
        {
            return Err(RdbFileError::InvalidOperation(format!(
                "serverless hydration range {} content hash mismatch",
                self.relative_path.display()
            )));
        }
        Ok(())
    }
}

/// A batch of hydration requests tied to one generation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessHydrationPlan {
    /// Generation the plan was built for. Local hydration refuses a plan whose
    /// generation differs from the file plan's generation.
    pub generation: u64,
    /// Ranges to hydrate; local hydration processes them in this order.
    pub requests: Vec<ServerlessHydrationRequest>,
}

impl ServerlessHydrationPlan {
    /// Total number of bytes requested across every range in the plan.
    ///
    /// The total saturates at `u64::MAX` rather than overflowing, so a plan
    /// carrying oversized `bytes` values cannot panic (debug builds) or wrap
    /// around to a small number (release builds).
    pub fn total_bytes(&self) -> u64 {
        self.requests
            .iter()
            .fold(0u64, |total, request| total.saturating_add(request.bytes))
    }

    /// Returns `true` when the plan contains no requests.
    pub fn is_empty(&self) -> bool {
        self.requests.is_empty()
    }
}

/// A hydration request paired with the bytes read for it.
///
/// `ServerlessFilePlan::hydrate_local_request` only builds this value after
/// the payload has passed `ServerlessHydrationRequest::validate_payload`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessHydratedRange {
    /// The request this payload answers.
    pub request: ServerlessHydrationRequest,
    /// The bytes of the requested range.
    pub payload: Vec<u8>,
}

impl ServerlessFilePlan {
    /// Reads and verifies every range of `plan` from the local generation
    /// directory.
    ///
    /// Requests are processed in plan order; the first failure aborts the
    /// whole call.
    ///
    /// # Errors
    ///
    /// Returns [`RdbFileError::InvalidOperation`] when the plan's generation
    /// differs from this file plan's generation, and otherwise propagates the
    /// first error from [`Self::hydrate_local_request`].
    pub fn hydrate_local_plan(
        &self,
        plan: &ServerlessHydrationPlan,
    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
        ensure_plan_generation_matches(self, plan)?;
        plan.requests
            .iter()
            .map(|request| self.hydrate_local_request(request))
            .collect()
    }

    /// Same as [`Self::hydrate_local_plan`], but serves each request through
    /// `cache` (see [`Self::hydrate_local_request_cached`]).
    ///
    /// # Errors
    ///
    /// Returns [`RdbFileError::InvalidOperation`] when the plan's generation
    /// differs from this file plan's generation, and otherwise propagates the
    /// first error from [`Self::hydrate_local_request_cached`].
    pub fn hydrate_local_plan_cached(
        &self,
        plan: &ServerlessHydrationPlan,
        cache: &ServerlessLocalCache,
    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
        ensure_plan_generation_matches(self, plan)?;
        plan.requests
            .iter()
            .map(|request| self.hydrate_local_request_cached(request, cache))
            .collect()
    }

    /// Reads one range from the generation directory and verifies it.
    ///
    /// # Errors
    ///
    /// Returns [`RdbFileError::InvalidOperation`] when the request path is not
    /// an acceptable relative path, when `offset + bytes` overflows, when the
    /// length does not fit in `usize`, when the range extends past the end of
    /// the file, or when the payload fails
    /// [`ServerlessHydrationRequest::validate_payload`]. I/O failures from
    /// opening, seeking, or reading the file are propagated.
    pub fn hydrate_local_request(
        &self,
        request: &ServerlessHydrationRequest,
    ) -> RdbFileResult<ServerlessHydratedRange> {
        validate_hydration_relative_path(&request.relative_path)?;
        let end = request
            .offset
            .checked_add(request.bytes)
            .ok_or_else(|| RdbFileError::InvalidOperation("hydration range overflow".into()))?;
        let len = usize::try_from(request.bytes).map_err(|_| {
            RdbFileError::InvalidOperation("hydration range too large for local memory".into())
        })?;
        let path = self.generation_dir().join(&request.relative_path);
        let mut file = File::open(&path)?;
        let file_len = file.metadata()?.len();
        // Check the range against the real file size before allocating, so a
        // bogus `bytes` value cannot trigger an oversized buffer allocation.
        if end > file_len {
            return Err(RdbFileError::InvalidOperation(format!(
                "hydration range {}..{} exceeds pack {} length {}",
                request.offset,
                end,
                request.relative_path.display(),
                file_len
            )));
        }
        file.seek(SeekFrom::Start(request.offset))?;
        let mut payload = vec![0u8; len];
        file.read_exact(&mut payload)?;
        request.validate_payload(&payload)?;
        Ok(ServerlessHydratedRange {
            request: request.clone(),
            payload,
        })
    }

    /// Serves one range from `cache` when possible, otherwise reads it from
    /// the generation directory and stores it in the cache.
    ///
    /// The request path is validated before the cache is consulted, so an
    /// unacceptable path is rejected the same way whether or not the cache
    /// holds an entry for it. Any error from the cache read is treated as a
    /// miss. After a miss the range is written to the cache and the cache is
    /// trimmed to `cache_policy.max_hot_bytes`.
    ///
    /// # Errors
    ///
    /// Returns [`RdbFileError::InvalidOperation`] when the cache generation
    /// differs from this file plan's generation or the request path is not an
    /// acceptable relative path. On a cache miss, errors from
    /// [`Self::hydrate_local_request`], from writing the cache entry, and from
    /// enforcing the cache size limit are propagated.
    pub fn hydrate_local_request_cached(
        &self,
        request: &ServerlessHydrationRequest,
        cache: &ServerlessLocalCache,
    ) -> RdbFileResult<ServerlessHydratedRange> {
        if cache.generation != self.generation {
            return Err(RdbFileError::InvalidOperation(format!(
                "serverless cache generation {} does not match file plan generation {}",
                cache.generation, self.generation
            )));
        }
        // Cheap component scan; repeated inside `hydrate_local_request` on a
        // miss, but doing it here keeps unvalidated paths away from the cache.
        validate_hydration_relative_path(&request.relative_path)?;
        if let Ok(range) = cache.read_hydrated_range(request) {
            return Ok(range);
        }
        let range = self.hydrate_local_request(request)?;
        cache.write_hydrated_range(&range)?;
        cache.enforce_max_bytes(self.cache_policy.max_hot_bytes)?;
        Ok(range)
    }

    /// Hydrates the plan returned by `index.hot_hydration_plan()`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::hydrate_local_plan`].
    pub fn prefetch_hot_extents(
        &self,
        index: &ServerlessExtentIndex,
    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
        self.hydrate_local_plan(&index.hot_hydration_plan())
    }

    /// Hydrates the plan returned by `index.hot_hydration_plan()` through
    /// `cache`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::hydrate_local_plan_cached`].
    pub fn prefetch_hot_extents_cached(
        &self,
        index: &ServerlessExtentIndex,
        cache: &ServerlessLocalCache,
    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
        self.hydrate_local_plan_cached(&index.hot_hydration_plan(), cache)
    }
}

/// Rejects a hydration plan whose generation differs from `file_plan`'s.
fn ensure_plan_generation_matches(
    file_plan: &ServerlessFilePlan,
    plan: &ServerlessHydrationPlan,
) -> RdbFileResult<()> {
    if plan.generation == file_plan.generation {
        return Ok(());
    }
    Err(RdbFileError::InvalidOperation(format!(
        "hydration plan generation {} does not match file plan generation {}",
        plan.generation, file_plan.generation
    )))
}

/// Checks that `path` stays inside the directory it will be joined onto.
///
/// `Path::join` discards (part of) the base directory when the joined path is
/// absolute, rooted, or carries a prefix. On Windows, rooted (`\x`) and
/// drive-prefixed (`C:x`) paths are not reported as absolute, so
/// `is_absolute()` alone does not catch them. Every component is therefore
/// inspected: only normal names and `.` are accepted.
///
/// # Errors
///
/// Returns [`RdbFileError::InvalidOperation`] when the path is absolute,
/// has a root or prefix component, or contains a `..` component.
fn validate_hydration_relative_path(path: &Path) -> RdbFileResult<()> {
    use std::path::Component;

    if path.is_absolute() {
        return Err(RdbFileError::InvalidOperation(
            "serverless hydration path must be relative".into(),
        ));
    }
    for component in path.components() {
        match component {
            // A root or prefix would replace the base directory on `join`.
            Component::Prefix(_) | Component::RootDir => {
                return Err(RdbFileError::InvalidOperation(
                    "serverless hydration path must be relative".into(),
                ));
            }
            Component::ParentDir => {
                return Err(RdbFileError::InvalidOperation(
                    "serverless hydration path must not contain parent components".into(),
                ));
            }
            Component::CurDir | Component::Normal(_) => {}
        }
    }
    Ok(())
}