reddb-io-server 1.12.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
use super::*;

impl RedDBRuntime {
    /// Builds the serverless file plan for the configured data path.
    ///
    /// Returns `None` when the database options carry no `data_path`.
    /// The plan's generation is the larger of the primary logical head LSN
    /// and the current CDC LSN, and is never less than 1.
    pub fn serverless_file_plan(&self) -> Option<reddb_file::ServerlessFilePlan> {
        let data_path = self.inner.db.options().data_path.as_ref()?;
        let head_lsn = self.primary_logical_head_lsn();
        let cdc_lsn = self.cdc_current_lsn();
        // Clamp to 1 so a fresh database never produces generation 0.
        let generation = head_lsn.max(cdc_lsn).max(1);
        let plan = reddb_file::ServerlessFilePlan::for_data_path(data_path, generation);
        Some(plan)
    }

    /// Derives the file plan for an explicit `generation`.
    ///
    /// Returns `None` when no base plan is available (see
    /// `serverless_file_plan`).
    fn serverless_file_plan_for_generation(
        &self,
        generation: u64,
    ) -> Option<reddb_file::ServerlessFilePlan> {
        self.serverless_file_plan()
            .map(|base| base.for_generation(generation))
    }

    /// Returns the local cache handle of the plan for `generation`.
    ///
    /// Returns `None` when no file plan can be built for that generation.
    fn serverless_local_cache_for_generation(
        &self,
        generation: u64,
    ) -> Option<reddb_file::ServerlessLocalCache> {
        self.serverless_file_plan_for_generation(generation)
            .map(|plan| plan.local_cache())
    }

    /// Serializes a single collection into a standalone binary dump.
    ///
    /// Every entity of `collection` is cloned into a fresh, default-configured
    /// `UnifiedStore` (together with its metadata, when the source has any),
    /// and that scratch store is then dumped to bytes.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::Internal` when the collection does not exist, or
    /// when inserting an entity or its metadata into the scratch store fails.
    fn serverless_collection_snapshot_bytes(&self, collection: &str) -> RedDBResult<Vec<u8>> {
        let Some(source) = self.inner.db.store().get_collection(collection) else {
            return Err(RedDBError::Internal(format!(
                "serverless collection not found: {collection}"
            )));
        };
        let config = crate::storage::unified::UnifiedStoreConfig::default();
        let snapshot = crate::storage::unified::UnifiedStore::with_config(config);

        // The visitor can only hand back a bool, so the first failure is
        // stashed here and surfaced once iteration is over.
        let mut failure: Option<RedDBError> = None;
        source.for_each_entity(|entity| {
            // NOTE(review): returning `false` presumably stops the iteration —
            // confirm against `for_each_entity`.
            let new_id = match snapshot.insert_auto(collection, entity.clone()) {
                Ok(id) => id,
                Err(err) => {
                    failure = Some(RedDBError::Internal(err.to_string()));
                    return false;
                }
            };
            // Metadata is looked up by the source id and re-attached under the
            // id the scratch store assigned.
            let Some(metadata) = source.get_metadata(entity.id) else {
                return true;
            };
            match snapshot.set_metadata(collection, new_id, metadata) {
                Ok(_) => true,
                Err(err) => {
                    failure = Some(RedDBError::Internal(err.to_string()));
                    false
                }
            }
        });

        match failure {
            Some(err) => Err(err),
            None => Ok(snapshot.to_binary_dump_bytes()),
        }
    }

    /// Publishes the current database contents as a new serverless generation.
    ///
    /// Steps, in order:
    /// 1. Returns `Ok(None)` when no file plan is available.
    /// 2. Flushes the runtime.
    /// 3. Picks the next generation: the base plan's generation, or one past
    ///    the currently published pointer (saturating), whichever is larger.
    ///    A missing pointer (`NotFound`) falls back to the base generation.
    /// 4. Concatenates one payload per collection into a single buffer and
    ///    records each payload's starting offset in the extent index.
    /// 5. Builds the secondary index from the extent index and publishes
    ///    everything through `publish_core_generation`.
    ///
    /// # Errors
    ///
    /// * Propagates any error from `flush`.
    /// * `RedDBError::InvalidOperation` when the current pointer cannot be
    ///   read for a reason other than `NotFound`.
    /// * `RedDBError::Internal` when snapshotting a collection, building an
    ///   extent reference, or publishing fails.
    pub fn publish_serverless_generation(
        &self,
    ) -> RedDBResult<Option<reddb_file::ServerlessGenerationPointer>> {
        let base_plan = match self.serverless_file_plan() {
            Some(plan) => plan,
            None => return Ok(None),
        };
        self.flush()?;

        let next_generation = match base_plan.read_current_pointer_verified() {
            Ok(current) => {
                let successor = current.generation.saturating_add(1);
                base_plan.generation.max(successor)
            }
            // No pointer published yet: start from the base generation.
            Err(reddb_file::RdbFileError::Io(io_err))
                if io_err.kind() == std::io::ErrorKind::NotFound =>
            {
                base_plan.generation
            }
            Err(other) => {
                return Err(RedDBError::InvalidOperation(format!(
                    "corrupt serverless generation: {other}"
                )));
            }
        };

        // Same root, namespace and cache policy as the base plan, retargeted
        // at the generation chosen above.
        let retargeted = reddb_file::ServerlessFilePlan::new(
            base_plan.root,
            base_plan.namespace,
            next_generation,
        );
        let plan = retargeted.with_cache_policy(base_plan.cache_policy);

        let mut names = self.inner.db.store().list_collections();
        names.sort();
        names.dedup();
        if names.is_empty() {
            // With no collections, publish a single whole-store dump under a
            // placeholder name.
            // NOTE(review): a real collection named "__database__" would also
            // take the whole-store branch below — confirm this is intended.
            names.push(String::from("__database__"));
        }

        let mut extent_index = reddb_file::ServerlessExtentIndex::new(plan.generation);
        let mut collection_data: Vec<u8> = Vec::new();
        for name in names {
            let payload = if name != "__database__" {
                self.serverless_collection_snapshot_bytes(&name)?
            } else {
                self.inner.db.store().to_binary_dump_bytes()
            };
            // The payload starts where the buffer currently ends.
            let offset = collection_data.len() as u64;
            collection_data.extend_from_slice(&payload);
            // The meaning of the trailing `true` flag is defined by
            // `reddb_file` and is not visible here.
            let extent = plan
                .collection_data_extent_ref(name, offset, &payload, true)
                .map_err(|err| RedDBError::Internal(err.to_string()))?;
            extent_index.push(extent);
        }

        let secondary_index =
            reddb_file::ServerlessSecondaryIndex::from_extent_index(&extent_index);
        let encoded_secondary = secondary_index.encode();
        plan.publish_core_generation(&extent_index, &collection_data, &encoded_secondary)
            .map(Some)
            .map_err(|err| RedDBError::Internal(err.to_string()))
    }

    /// Reads and verifies the currently published generation pointer.
    ///
    /// Returns `Ok(None)` when no file plan is available or when the pointer
    /// does not exist yet (I/O `NotFound`).
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::InvalidOperation` for any other failure reported
    /// by `read_current_pointer_verified`.
    pub fn read_current_serverless_generation_verified(
        &self,
    ) -> RedDBResult<Option<reddb_file::ServerlessGenerationPointer>> {
        let plan = match self.serverless_file_plan() {
            Some(plan) => plan,
            None => return Ok(None),
        };
        let outcome = plan.read_current_pointer_verified();
        match outcome {
            Ok(pointer) => Ok(Some(pointer)),
            // A missing pointer simply means nothing has been published.
            Err(reddb_file::RdbFileError::Io(io_err))
                if io_err.kind() == std::io::ErrorKind::NotFound =>
            {
                Ok(None)
            }
            Err(other) => {
                let message = format!("corrupt serverless generation: {other}");
                Err(RedDBError::InvalidOperation(message))
            }
        }
    }

    /// Hydrates every range of `collection` from the current generation.
    ///
    /// The hydration plan comes from the secondary index stored at the plan's
    /// `secondary_index_path()`. Returns `Ok(None)` when there is no published
    /// generation or no file plan.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::InvalidOperation` when the pointer or the
    /// secondary index cannot be read, or when hydration fails.
    pub fn hydrate_current_serverless_collection(
        &self,
        collection: &str,
    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
        let pointer = match self.read_current_serverless_generation_verified()? {
            Some(pointer) => pointer,
            None => return Ok(None),
        };
        let plan = match self.serverless_file_plan_for_generation(pointer.generation) {
            Some(plan) => plan,
            None => return Ok(None),
        };

        let secondary = match reddb_file::ServerlessSecondaryIndex::read_from_path(
            plan.secondary_index_path(),
        ) {
            Ok(index) => index,
            Err(err) => {
                return Err(RedDBError::InvalidOperation(format!(
                    "corrupt serverless generation: {err}"
                )));
            }
        };

        let hydration = secondary.hydration_plan_for_collection(collection);
        match plan.hydrate_local_plan(&hydration) {
            Ok(ranges) => Ok(Some(ranges)),
            Err(err) => Err(RedDBError::InvalidOperation(format!(
                "serverless hydrate failed: {err}"
            ))),
        }
    }

    /// Hydrates the ranges covering `key` in `collection` from the current
    /// generation.
    ///
    /// The hydration plan comes from the extent index stored at the plan's
    /// `extent_index_path()`. Returns `Ok(None)` when there is no published
    /// generation or no file plan.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::InvalidOperation` when the pointer or the extent
    /// index cannot be read, or when hydration fails.
    pub fn hydrate_current_serverless_key(
        &self,
        collection: &str,
        key: &[u8],
    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
        let pointer = match self.read_current_serverless_generation_verified()? {
            Some(pointer) => pointer,
            None => return Ok(None),
        };
        let plan = match self.serverless_file_plan_for_generation(pointer.generation) {
            Some(plan) => plan,
            None => return Ok(None),
        };

        let extent_index =
            match reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path()) {
                Ok(index) => index,
                Err(err) => {
                    return Err(RedDBError::InvalidOperation(format!(
                        "corrupt serverless generation: {err}"
                    )));
                }
            };

        let hydration = extent_index.hydration_plan_for_key(collection, key);
        match plan.hydrate_local_plan(&hydration) {
            Ok(ranges) => Ok(Some(ranges)),
            Err(err) => Err(RedDBError::InvalidOperation(format!(
                "serverless hydrate failed: {err}"
            ))),
        }
    }

    /// Hydrates the ranges of `collection` selected by `range_start` and
    /// `range_end` from the current generation.
    ///
    /// The hydration plan comes from the extent index stored at the plan's
    /// `extent_index_path()`. Returns `Ok(None)` when there is no published
    /// generation or no file plan.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::InvalidOperation` when the pointer or the extent
    /// index cannot be read, when the index rejects the requested range, or
    /// when hydration fails.
    pub fn hydrate_current_serverless_range(
        &self,
        collection: &str,
        range_start: &[u8],
        range_end: &[u8],
    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
        let pointer = match self.read_current_serverless_generation_verified()? {
            Some(pointer) => pointer,
            None => return Ok(None),
        };
        let plan = match self.serverless_file_plan_for_generation(pointer.generation) {
            Some(plan) => plan,
            None => return Ok(None),
        };

        let extent_index =
            match reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path()) {
                Ok(index) => index,
                Err(err) => {
                    return Err(RedDBError::InvalidOperation(format!(
                        "corrupt serverless generation: {err}"
                    )));
                }
            };

        // Range planning is fallible; its error text is forwarded unchanged.
        let planned = extent_index.hydration_plan_for_range(collection, range_start, range_end);
        let hydration = planned.map_err(|err| RedDBError::InvalidOperation(err.to_string()))?;

        match plan.hydrate_local_plan(&hydration) {
            Ok(ranges) => Ok(Some(ranges)),
            Err(err) => Err(RedDBError::InvalidOperation(format!(
                "serverless hydrate failed: {err}"
            ))),
        }
    }

    /// Like `hydrate_current_serverless_key`, but hydrates through the local
    /// cache of the current generation (`hydrate_local_plan_cached`).
    ///
    /// Returns `Ok(None)` when there is no published generation, no file
    /// plan, or no local cache for that generation.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::InvalidOperation` when the pointer or the extent
    /// index cannot be read, or when hydration fails.
    pub fn hydrate_current_serverless_key_cached(
        &self,
        collection: &str,
        key: &[u8],
    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
        let pointer = match self.read_current_serverless_generation_verified()? {
            Some(pointer) => pointer,
            None => return Ok(None),
        };
        let plan = match self.serverless_file_plan_for_generation(pointer.generation) {
            Some(plan) => plan,
            None => return Ok(None),
        };
        let cache = match self.serverless_local_cache_for_generation(pointer.generation) {
            Some(cache) => cache,
            None => return Ok(None),
        };

        let extent_index =
            match reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path()) {
                Ok(index) => index,
                Err(err) => {
                    return Err(RedDBError::InvalidOperation(format!(
                        "corrupt serverless generation: {err}"
                    )));
                }
            };

        let hydration = extent_index.hydration_plan_for_key(collection, key);
        match plan.hydrate_local_plan_cached(&hydration, &cache) {
            Ok(ranges) => Ok(Some(ranges)),
            Err(err) => Err(RedDBError::InvalidOperation(format!(
                "serverless hydrate failed: {err}"
            ))),
        }
    }

    /// Prefetches the hot extents of the current generation into the local
    /// cache via `prefetch_hot_extents_cached`.
    ///
    /// Returns `Ok(None)` when there is no published generation, no file
    /// plan, or no local cache for that generation.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::InvalidOperation` when the pointer or the extent
    /// index cannot be read, or when the prefetch fails.
    pub fn prefetch_current_serverless_hot_extents_cached(
        &self,
    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
        let pointer = match self.read_current_serverless_generation_verified()? {
            Some(pointer) => pointer,
            None => return Ok(None),
        };
        let plan = match self.serverless_file_plan_for_generation(pointer.generation) {
            Some(plan) => plan,
            None => return Ok(None),
        };
        let cache = match self.serverless_local_cache_for_generation(pointer.generation) {
            Some(cache) => cache,
            None => return Ok(None),
        };

        let extent_index =
            match reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path()) {
                Ok(index) => index,
                Err(err) => {
                    return Err(RedDBError::InvalidOperation(format!(
                        "corrupt serverless generation: {err}"
                    )));
                }
            };

        match plan.prefetch_hot_extents_cached(&extent_index, &cache) {
            Ok(ranges) => Ok(Some(ranges)),
            Err(err) => Err(RedDBError::InvalidOperation(format!(
                "serverless hydrate failed: {err}"
            ))),
        }
    }
}