Skip to main content

kuberic_core/grpc/
server.rs

1use tokio::sync::{mpsc, oneshot};
2use tonic::{Request, Response, Status};
3
4use crate::pod::RuntimeCommand;
5use crate::proto::replicator_control_server::ReplicatorControl;
6use crate::proto::*;
7use crate::types::{Epoch, Role};
8
9/// Control server that routes all commands through the PodRuntime's
10/// command channel. This ensures correct replicator/event ordering
11/// (e.g., replicator setup before user notification on promotion).
12pub struct ControlServer {
13    cmd_tx: mpsc::Sender<RuntimeCommand>,
14    #[allow(dead_code)]
15    replica_id: i64,
16}
17
18impl ControlServer {
19    pub fn new(replica_id: i64, cmd_tx: mpsc::Sender<RuntimeCommand>) -> Self {
20        Self { cmd_tx, replica_id }
21    }
22
23    async fn send_cmd<T>(
24        &self,
25        make: impl FnOnce(oneshot::Sender<crate::Result<T>>) -> RuntimeCommand,
26    ) -> Result<T, Status> {
27        let (tx, rx) = oneshot::channel();
28        self.cmd_tx
29            .send(make(tx))
30            .await
31            .map_err(|_| Status::unavailable("runtime closed"))?;
32        rx.await
33            .map_err(|_| Status::unavailable("runtime closed"))?
34            .map_err(|e| Status::internal(e.to_string()))
35    }
36}
37
38#[tonic::async_trait]
39impl ReplicatorControl for ControlServer {
40    async fn open(&self, req: Request<OpenRequest>) -> Result<Response<OpenResponse>, Status> {
41        let mode = crate::types::OpenMode::from(req.into_inner().mode);
42        self.send_cmd(|reply| RuntimeCommand::Open { mode, reply })
43            .await?;
44        Ok(Response::new(OpenResponse {}))
45    }
46
47    async fn close(&self, _req: Request<CloseRequest>) -> Result<Response<CloseResponse>, Status> {
48        self.send_cmd(|reply| RuntimeCommand::Close { reply })
49            .await?;
50        Ok(Response::new(CloseResponse {}))
51    }
52
53    async fn change_role(
54        &self,
55        req: Request<ChangeRoleRequest>,
56    ) -> Result<Response<ChangeRoleResponse>, Status> {
57        let inner = req.into_inner();
58        let epoch: Epoch = inner.epoch.unwrap_or_default().into();
59        let role: Role = Role::from(inner.role);
60        self.send_cmd(|reply| RuntimeCommand::ChangeRole { epoch, role, reply })
61            .await?;
62        Ok(Response::new(ChangeRoleResponse {}))
63    }
64
65    async fn update_epoch(
66        &self,
67        req: Request<UpdateEpochRequest>,
68    ) -> Result<Response<UpdateEpochResponse>, Status> {
69        let epoch: Epoch = req.into_inner().epoch.unwrap_or_default().into();
70        self.send_cmd(|reply| RuntimeCommand::UpdateEpoch { epoch, reply })
71            .await?;
72        Ok(Response::new(UpdateEpochResponse {}))
73    }
74
75    async fn get_status(
76        &self,
77        _req: Request<GetStatusRequest>,
78    ) -> Result<Response<GetStatusResponse>, Status> {
79        let (tx, rx) = oneshot::channel();
80        self.cmd_tx
81            .send(RuntimeCommand::GetStatus { reply: tx })
82            .await
83            .map_err(|_| Status::unavailable("runtime closed"))?;
84        let info = rx
85            .await
86            .map_err(|_| Status::unavailable("runtime closed"))?;
87        Ok(Response::new(GetStatusResponse {
88            role: crate::proto::RoleProto::from(info.role) as i32,
89            epoch: Some(info.epoch.into()),
90            current_progress: info.current_progress,
91            catch_up_capability: info.catch_up_capability,
92            committed_lsn: info.committed_lsn,
93            healthy: info.healthy,
94        }))
95    }
96
97    async fn update_catch_up_configuration(
98        &self,
99        req: Request<UpdateCatchUpConfigRequest>,
100    ) -> Result<Response<UpdateCatchUpConfigResponse>, Status> {
101        let inner = req.into_inner();
102        self.send_cmd(|reply| RuntimeCommand::UpdateCatchUpConfiguration {
103            current: inner.current.unwrap_or_default().into(),
104            previous: inner.previous.unwrap_or_default().into(),
105            reply,
106        })
107        .await?;
108        Ok(Response::new(UpdateCatchUpConfigResponse {}))
109    }
110
111    async fn update_current_configuration(
112        &self,
113        req: Request<UpdateCurrentConfigRequest>,
114    ) -> Result<Response<UpdateCurrentConfigResponse>, Status> {
115        let inner = req.into_inner();
116        self.send_cmd(|reply| RuntimeCommand::UpdateCurrentConfiguration {
117            current: inner.current.unwrap_or_default().into(),
118            reply,
119        })
120        .await?;
121        Ok(Response::new(UpdateCurrentConfigResponse {}))
122    }
123
124    async fn wait_for_catch_up_quorum(
125        &self,
126        req: Request<WaitForCatchUpQuorumRequest>,
127    ) -> Result<Response<WaitForCatchUpQuorumResponse>, Status> {
128        let mode = crate::types::ReplicaSetQuorumMode::from(req.into_inner().mode);
129        self.send_cmd(|reply| RuntimeCommand::WaitForCatchUpQuorum { mode, reply })
130            .await?;
131        Ok(Response::new(WaitForCatchUpQuorumResponse {}))
132    }
133
134    async fn build_replica(
135        &self,
136        req: Request<BuildReplicaRequest>,
137    ) -> Result<Response<BuildReplicaResponse>, Status> {
138        let replica = req
139            .into_inner()
140            .replica
141            .ok_or_else(|| Status::invalid_argument("missing replica"))?
142            .into();
143        self.send_cmd(|reply| RuntimeCommand::BuildReplica { replica, reply })
144            .await?;
145        Ok(Response::new(BuildReplicaResponse {}))
146    }
147
148    async fn remove_replica(
149        &self,
150        req: Request<RemoveReplicaRequest>,
151    ) -> Result<Response<RemoveReplicaResponse>, Status> {
152        let replica_id = req.into_inner().replica_id;
153        self.send_cmd(|reply| RuntimeCommand::RemoveReplica { replica_id, reply })
154            .await?;
155        Ok(Response::new(RemoveReplicaResponse {}))
156    }
157
158    async fn on_data_loss(
159        &self,
160        _req: Request<OnDataLossRequest>,
161    ) -> Result<Response<OnDataLossResponse>, Status> {
162        let action = self
163            .send_cmd(|reply| RuntimeCommand::OnDataLoss { reply })
164            .await?;
165        Ok(Response::new(OnDataLossResponse {
166            state_changed: action == crate::types::DataLossAction::StateChanged,
167        }))
168    }
169
170    async fn revoke_write_status(
171        &self,
172        _req: Request<RevokeWriteStatusRequest>,
173    ) -> Result<Response<RevokeWriteStatusResponse>, Status> {
174        self.send_cmd(|reply| RuntimeCommand::RevokeWriteStatus { reply })
175            .await?;
176        Ok(Response::new(RevokeWriteStatusResponse {}))
177    }
178}