// reddb-io-file 1.11.0
//
// RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
// Documentation
use super::*;

/// Reference to a single extent: the key range it covers inside a collection
/// plus the location, size and integrity metadata of its payload.
///
/// The key range is half-open. `range_start` is inclusive, `range_end` is
/// exclusive, and an empty `range_end` means "no upper bound" (see
/// `contains_key` and `overlaps_range`). Keys are compared as byte slices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessExtentRef {
    /// Name of the collection the extent belongs to; lookups only match
    /// extents whose collection is equal to the requested one.
    pub collection: String,
    /// Inclusive lower bound of the covered key range.
    pub range_start: Vec<u8>,
    /// Exclusive upper bound of the covered key range; empty means unbounded.
    pub range_end: Vec<u8>,
    /// Path of the file that holds the payload.
    /// NOTE(review): presumably relative to the artifact root — confirm
    /// against the code that writes extents.
    pub relative_path: PathBuf,
    /// NOTE(review): presumably the byte offset of the payload inside
    /// `relative_path` — confirm against the code that writes extents.
    pub offset: u64,
    /// Payload length in bytes (`payload.len()` when built through `new`).
    pub bytes: u64,
    /// `crc32` of the payload when built through `new`.
    pub checksum: u32,
    /// Result of `ServerlessContentHash::from_bytes` over the payload when
    /// built through `new`.
    pub content_hash: ServerlessContentHash,
    /// Marks the extent as hot; hot extents are the ones selected by
    /// `ServerlessExtentIndex::hot_prefetch_paths` and `hot_hydration_plan`.
    pub hot: bool,
}

impl ServerlessExtentRef {
    /// Builds an extent reference for `payload`, deriving the size, CRC and
    /// content hash from the payload bytes themselves.
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` when `range_end` is non-empty
    /// and `range_start` is not strictly below it. An empty `range_end` is
    /// treated as "unbounded" and is always accepted.
    pub fn new(
        collection: impl Into<String>,
        range_start: impl Into<Vec<u8>>,
        range_end: impl Into<Vec<u8>>,
        relative_path: impl Into<PathBuf>,
        offset: u64,
        payload: &[u8],
        hot: bool,
    ) -> RdbFileResult<Self> {
        let (range_start, range_end) = (range_start.into(), range_end.into());

        // Only a bounded range can be inverted or empty.
        let bounded = !range_end.is_empty();
        if bounded && range_start >= range_end {
            return Err(RdbFileError::InvalidOperation(
                "serverless extent range_start must be before range_end".into(),
            ));
        }

        let extent = Self {
            collection: collection.into(),
            range_start,
            range_end,
            relative_path: relative_path.into(),
            offset,
            bytes: payload.len() as u64,
            checksum: crc32(payload),
            content_hash: ServerlessContentHash::from_bytes(payload),
            hot,
        };
        Ok(extent)
    }

    /// Reports whether `key` of `collection` falls inside this extent's
    /// half-open range `[range_start, range_end)`.
    pub fn contains_key(&self, collection: &str, key: &[u8]) -> bool {
        if self.collection != collection {
            return false;
        }
        if key < self.range_start.as_slice() {
            return false;
        }
        // An empty upper bound means the extent runs to the end of the keyspace.
        self.range_end.is_empty() || key < self.range_end.as_slice()
    }

    /// Reports whether this extent shares at least one key with the half-open
    /// query range `[range_start, range_end)` of `collection`.
    ///
    /// An empty `range_end` (on either side) is treated as unbounded. The
    /// query range itself is not validated here.
    pub fn overlaps_range(&self, collection: &str, range_start: &[u8], range_end: &[u8]) -> bool {
        if self.collection != collection {
            return false;
        }
        // Two half-open ranges are disjoint exactly when one of them ends at
        // or before the point where the other one begins.
        let extent_ends_first =
            !self.range_end.is_empty() && self.range_end.as_slice() <= range_start;
        let query_ends_first = !range_end.is_empty() && self.range_start.as_slice() >= range_end;
        !(extent_ends_first || query_ends_first)
    }
}

/// A set of [`ServerlessExtentRef`]s tagged with a generation number.
///
/// `push` and `decode` keep `extents` ordered by
/// `(collection, range_start, relative_path, offset)`. Both fields are public,
/// so code that mutates `extents` directly bypasses that ordering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessExtentIndex {
    /// Generation number; it is persisted by `encode` and copied into every
    /// hydration plan built from this index.
    pub generation: u64,
    /// The indexed extents.
    pub extents: Vec<ServerlessExtentRef>,
}

impl ServerlessExtentIndex {
    /// Creates an empty index for `generation`.
    pub fn new(generation: u64) -> Self {
        Self {
            generation,
            extents: Vec::new(),
        }
    }

    /// Adds `extent` and restores the canonical ordering of `extents`.
    ///
    /// The whole vector is re-sorted rather than inserting at the right spot
    /// because `extents` is public and may have been modified directly.
    pub fn push(&mut self, extent: ServerlessExtentRef) {
        self.extents.push(extent);
        self.sort_extents();
    }

    /// Sorts `extents` by `(collection, range_start, relative_path, offset)`.
    ///
    /// The sort is stable, so extents with identical keys keep their relative
    /// order. Paths are compared through their lossy string form, and that
    /// form is only computed when collection and range start are tied.
    fn sort_extents(&mut self) {
        self.extents.sort_by(|left, right| {
            left.collection
                .cmp(&right.collection)
                .then_with(|| left.range_start.cmp(&right.range_start))
                .then_with(|| {
                    left.relative_path
                        .to_string_lossy()
                        .cmp(&right.relative_path.to_string_lossy())
                })
                .then_with(|| left.offset.cmp(&right.offset))
        });
    }

    /// Returns every extent of `collection` whose key range contains `key`,
    /// in index order.
    pub fn extents_for_key(&self, collection: &str, key: &[u8]) -> Vec<&ServerlessExtentRef> {
        self.extents
            .iter()
            .filter(|extent| extent.contains_key(collection, key))
            .collect()
    }

    /// Returns every extent of `collection` that overlaps the half-open query
    /// range `[range_start, range_end)`, in index order. An empty `range_end`
    /// means the query is unbounded above.
    ///
    /// # Errors
    ///
    /// Returns `RdbFileError::InvalidOperation` when `range_end` is non-empty
    /// and `range_start` is not strictly below it.
    pub fn extents_for_range(
        &self,
        collection: &str,
        range_start: &[u8],
        range_end: &[u8],
    ) -> RdbFileResult<Vec<&ServerlessExtentRef>> {
        if !range_end.is_empty() && range_start >= range_end {
            return Err(RdbFileError::InvalidOperation(
                "serverless hydration range_start must be before range_end".into(),
            ));
        }
        Ok(self
            .extents
            .iter()
            .filter(|extent| extent.overlaps_range(collection, range_start, range_end))
            .collect())
    }

    /// Returns the sorted, de-duplicated paths of all extents flagged `hot`.
    pub fn hot_prefetch_paths(&self) -> Vec<PathBuf> {
        let mut paths: Vec<PathBuf> = self
            .extents
            .iter()
            .filter(|extent| extent.hot)
            .map(|extent| extent.relative_path.clone())
            .collect();
        paths.sort();
        paths.dedup();
        paths
    }

    /// Builds a hydration plan with one request per extent containing `key`.
    pub fn hydration_plan_for_key(&self, collection: &str, key: &[u8]) -> ServerlessHydrationPlan {
        ServerlessHydrationPlan {
            generation: self.generation,
            requests: self
                .extents_for_key(collection, key)
                .into_iter()
                .map(ServerlessHydrationRequest::from_extent)
                .collect(),
        }
    }

    /// Builds a hydration plan with one request per extent overlapping the
    /// query range.
    ///
    /// # Errors
    ///
    /// Propagates the range validation error of [`Self::extents_for_range`].
    pub fn hydration_plan_for_range(
        &self,
        collection: &str,
        range_start: &[u8],
        range_end: &[u8],
    ) -> RdbFileResult<ServerlessHydrationPlan> {
        Ok(ServerlessHydrationPlan {
            generation: self.generation,
            requests: self
                .extents_for_range(collection, range_start, range_end)?
                .into_iter()
                .map(ServerlessHydrationRequest::from_extent)
                .collect(),
        })
    }

    /// Builds a hydration plan with one request per extent flagged `hot`.
    pub fn hot_hydration_plan(&self) -> ServerlessHydrationPlan {
        ServerlessHydrationPlan {
            generation: self.generation,
            requests: self
                .extents
                .iter()
                .filter(|extent| extent.hot)
                .map(ServerlessHydrationRequest::from_extent)
                .collect(),
        }
    }

    /// Encodes the index and hands the bytes to `write_bytes` for `path`.
    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
        write_bytes(path, &self.encode())
    }

    /// Reads the whole file at `path` and decodes it.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from reading the file, or any error reported by
    /// [`Self::decode`].
    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
        Self::decode(&fs::read(path)?)
    }

    /// Serializes the index.
    ///
    /// Layout, in order: magic, `u16` artifact version, `u64` generation,
    /// `u32` extent count, then for each extent (in current `extents` order)
    /// collection, range start, range end, lossy path string, offset, byte
    /// length, checksum, content hash and a one-byte hot flag. A `crc32` of
    /// everything written so far is appended last.
    pub fn encode(&self) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(SERVERLESS_EXTENT_INDEX_MAGIC);
        put_u16(&mut out, SERVERLESS_ARTIFACT_VERSION);
        put_u64(&mut out, self.generation);
        // NOTE(review): the on-disk count is a u32, so this cast would wrap
        // for more than u32::MAX extents. `encode` is infallible by signature,
        // so the behavior is kept as is.
        put_u32(&mut out, self.extents.len() as u32);
        for extent in &self.extents {
            put_string(&mut out, &extent.collection);
            put_bytes(&mut out, &extent.range_start);
            put_bytes(&mut out, &extent.range_end);
            // Non-UTF-8 path components are replaced lossily here.
            put_string(&mut out, &extent.relative_path.to_string_lossy());
            put_u64(&mut out, extent.offset);
            put_u64(&mut out, extent.bytes);
            put_u32(&mut out, extent.checksum);
            put_content_hash(&mut out, extent.content_hash);
            out.push(u8::from(extent.hot));
        }
        let checksum = crc32(&out);
        put_u32(&mut out, checksum);
        out
    }

    /// Parses an index previously produced by [`Self::encode`].
    ///
    /// The returned index is sorted exactly as if every decoded extent had
    /// been added with [`Self::push`] in file order.
    ///
    /// # Errors
    ///
    /// Fails when `verify_checksum`, `expect_magic`, any `take_*` reader or
    /// `reject_trailing_bytes` reports an error, when the version differs
    /// from `SERVERLESS_ARTIFACT_VERSION`, or when an extent has a non-empty
    /// `range_end` that is not strictly above its `range_start`.
    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
        verify_checksum(bytes)?;
        let mut cursor = 0usize;
        expect_magic(bytes, &mut cursor, SERVERLESS_EXTENT_INDEX_MAGIC)?;
        let version = take_u16(bytes, &mut cursor)?;
        if version != SERVERLESS_ARTIFACT_VERSION {
            return Err(RdbFileError::InvalidOperation(format!(
                "unsupported serverless extent index version {version}"
            )));
        }
        let generation = take_u64(bytes, &mut cursor)?;
        let count = take_u32(bytes, &mut cursor)? as usize;
        // `count` comes from the file, so nothing is pre-allocated from it;
        // the vector grows only as records are actually decoded.
        let mut index = Self::new(generation);
        for _ in 0..count {
            let collection = take_string(bytes, &mut cursor)?;
            let range_start = take_vec_bytes(bytes, &mut cursor)?;
            let range_end = take_vec_bytes(bytes, &mut cursor)?;
            let relative_path = PathBuf::from(take_string(bytes, &mut cursor)?);
            let offset = take_u64(bytes, &mut cursor)?;
            let bytes_len = take_u64(bytes, &mut cursor)?;
            let checksum = take_u32(bytes, &mut cursor)?;
            let content_hash = take_content_hash(bytes, &mut cursor)?;
            let hot = take_u8(bytes, &mut cursor)? != 0;
            if !range_end.is_empty() && range_start >= range_end {
                return Err(RdbFileError::InvalidOperation(
                    "serverless extent range_start must be before range_end".into(),
                ));
            }
            // Append without sorting; re-sorting after every record made
            // decoding quadratic in the number of extents.
            index.extents.push(ServerlessExtentRef {
                collection,
                range_start,
                range_end,
                relative_path,
                offset,
                bytes: bytes_len,
                checksum,
                content_hash,
                hot,
            });
        }
        reject_trailing_bytes(bytes, cursor)?;
        // One stable sort gives the same order as sorting after each push.
        index.sort_extents();
        Ok(index)
    }
}