kuberic-core 0.0.1

Core replication framework for kuberic — quorum replication, failover, epoch fencing
Documentation
use tokio::sync::{mpsc, oneshot};
use tonic::{Request, Response, Status};

use crate::pod::RuntimeCommand;
use crate::proto::replicator_control_server::ReplicatorControl;
use crate::proto::*;
use crate::types::{Epoch, Role};

/// Control server that routes all commands through the PodRuntime's
/// command channel. This ensures correct replicator/event ordering
/// (e.g., replicator setup before user notification on promotion).
pub struct ControlServer {
    cmd_tx: mpsc::Sender<RuntimeCommand>,
    #[allow(dead_code)]
    replica_id: i64,
}

impl ControlServer {
    pub fn new(replica_id: i64, cmd_tx: mpsc::Sender<RuntimeCommand>) -> Self {
        Self { cmd_tx, replica_id }
    }

    async fn send_cmd<T>(
        &self,
        make: impl FnOnce(oneshot::Sender<crate::Result<T>>) -> RuntimeCommand,
    ) -> Result<T, Status> {
        let (tx, rx) = oneshot::channel();
        self.cmd_tx
            .send(make(tx))
            .await
            .map_err(|_| Status::unavailable("runtime closed"))?;
        rx.await
            .map_err(|_| Status::unavailable("runtime closed"))?
            .map_err(|e| Status::internal(e.to_string()))
    }
}

#[tonic::async_trait]
impl ReplicatorControl for ControlServer {
    async fn open(&self, req: Request<OpenRequest>) -> Result<Response<OpenResponse>, Status> {
        let mode = crate::types::OpenMode::from(req.into_inner().mode);
        self.send_cmd(|reply| RuntimeCommand::Open { mode, reply })
            .await?;
        Ok(Response::new(OpenResponse {}))
    }

    async fn close(&self, _req: Request<CloseRequest>) -> Result<Response<CloseResponse>, Status> {
        self.send_cmd(|reply| RuntimeCommand::Close { reply })
            .await?;
        Ok(Response::new(CloseResponse {}))
    }

    async fn change_role(
        &self,
        req: Request<ChangeRoleRequest>,
    ) -> Result<Response<ChangeRoleResponse>, Status> {
        let inner = req.into_inner();
        let epoch: Epoch = inner.epoch.unwrap_or_default().into();
        let role: Role = Role::from(inner.role);
        self.send_cmd(|reply| RuntimeCommand::ChangeRole { epoch, role, reply })
            .await?;
        Ok(Response::new(ChangeRoleResponse {}))
    }

    async fn update_epoch(
        &self,
        req: Request<UpdateEpochRequest>,
    ) -> Result<Response<UpdateEpochResponse>, Status> {
        let epoch: Epoch = req.into_inner().epoch.unwrap_or_default().into();
        self.send_cmd(|reply| RuntimeCommand::UpdateEpoch { epoch, reply })
            .await?;
        Ok(Response::new(UpdateEpochResponse {}))
    }

    async fn get_status(
        &self,
        _req: Request<GetStatusRequest>,
    ) -> Result<Response<GetStatusResponse>, Status> {
        let (tx, rx) = oneshot::channel();
        self.cmd_tx
            .send(RuntimeCommand::GetStatus { reply: tx })
            .await
            .map_err(|_| Status::unavailable("runtime closed"))?;
        let info = rx
            .await
            .map_err(|_| Status::unavailable("runtime closed"))?;
        Ok(Response::new(GetStatusResponse {
            role: crate::proto::RoleProto::from(info.role) as i32,
            epoch: Some(info.epoch.into()),
            current_progress: info.current_progress,
            catch_up_capability: info.catch_up_capability,
            committed_lsn: info.committed_lsn,
            healthy: info.healthy,
        }))
    }

    async fn update_catch_up_configuration(
        &self,
        req: Request<UpdateCatchUpConfigRequest>,
    ) -> Result<Response<UpdateCatchUpConfigResponse>, Status> {
        let inner = req.into_inner();
        self.send_cmd(|reply| RuntimeCommand::UpdateCatchUpConfiguration {
            current: inner.current.unwrap_or_default().into(),
            previous: inner.previous.unwrap_or_default().into(),
            reply,
        })
        .await?;
        Ok(Response::new(UpdateCatchUpConfigResponse {}))
    }

    async fn update_current_configuration(
        &self,
        req: Request<UpdateCurrentConfigRequest>,
    ) -> Result<Response<UpdateCurrentConfigResponse>, Status> {
        let inner = req.into_inner();
        self.send_cmd(|reply| RuntimeCommand::UpdateCurrentConfiguration {
            current: inner.current.unwrap_or_default().into(),
            reply,
        })
        .await?;
        Ok(Response::new(UpdateCurrentConfigResponse {}))
    }

    async fn wait_for_catch_up_quorum(
        &self,
        req: Request<WaitForCatchUpQuorumRequest>,
    ) -> Result<Response<WaitForCatchUpQuorumResponse>, Status> {
        let mode = crate::types::ReplicaSetQuorumMode::from(req.into_inner().mode);
        self.send_cmd(|reply| RuntimeCommand::WaitForCatchUpQuorum { mode, reply })
            .await?;
        Ok(Response::new(WaitForCatchUpQuorumResponse {}))
    }

    async fn build_replica(
        &self,
        req: Request<BuildReplicaRequest>,
    ) -> Result<Response<BuildReplicaResponse>, Status> {
        let replica = req
            .into_inner()
            .replica
            .ok_or_else(|| Status::invalid_argument("missing replica"))?
            .into();
        self.send_cmd(|reply| RuntimeCommand::BuildReplica { replica, reply })
            .await?;
        Ok(Response::new(BuildReplicaResponse {}))
    }

    async fn remove_replica(
        &self,
        req: Request<RemoveReplicaRequest>,
    ) -> Result<Response<RemoveReplicaResponse>, Status> {
        let replica_id = req.into_inner().replica_id;
        self.send_cmd(|reply| RuntimeCommand::RemoveReplica { replica_id, reply })
            .await?;
        Ok(Response::new(RemoveReplicaResponse {}))
    }

    async fn on_data_loss(
        &self,
        _req: Request<OnDataLossRequest>,
    ) -> Result<Response<OnDataLossResponse>, Status> {
        let action = self
            .send_cmd(|reply| RuntimeCommand::OnDataLoss { reply })
            .await?;
        Ok(Response::new(OnDataLossResponse {
            state_changed: action == crate::types::DataLossAction::StateChanged,
        }))
    }

    async fn revoke_write_status(
        &self,
        _req: Request<RevokeWriteStatusRequest>,
    ) -> Result<Response<RevokeWriteStatusResponse>, Status> {
        self.send_cmd(|reply| RuntimeCommand::RevokeWriteStatus { reply })
            .await?;
        Ok(Response::new(RevokeWriteStatusResponse {}))
    }
}