kuberic_core/grpc/
server.rs1use tokio::sync::{mpsc, oneshot};
2use tonic::{Request, Response, Status};
3
4use crate::pod::RuntimeCommand;
5use crate::proto::replicator_control_server::ReplicatorControl;
6use crate::proto::*;
7use crate::types::{Epoch, Role};
8
9pub struct ControlServer {
13 cmd_tx: mpsc::Sender<RuntimeCommand>,
14 #[allow(dead_code)]
15 replica_id: i64,
16}
17
18impl ControlServer {
19 pub fn new(replica_id: i64, cmd_tx: mpsc::Sender<RuntimeCommand>) -> Self {
20 Self { cmd_tx, replica_id }
21 }
22
23 async fn send_cmd<T>(
24 &self,
25 make: impl FnOnce(oneshot::Sender<crate::Result<T>>) -> RuntimeCommand,
26 ) -> Result<T, Status> {
27 let (tx, rx) = oneshot::channel();
28 self.cmd_tx
29 .send(make(tx))
30 .await
31 .map_err(|_| Status::unavailable("runtime closed"))?;
32 rx.await
33 .map_err(|_| Status::unavailable("runtime closed"))?
34 .map_err(|e| Status::internal(e.to_string()))
35 }
36}
37
38#[tonic::async_trait]
39impl ReplicatorControl for ControlServer {
40 async fn open(&self, req: Request<OpenRequest>) -> Result<Response<OpenResponse>, Status> {
41 let mode = crate::types::OpenMode::from(req.into_inner().mode);
42 self.send_cmd(|reply| RuntimeCommand::Open { mode, reply })
43 .await?;
44 Ok(Response::new(OpenResponse {}))
45 }
46
47 async fn close(&self, _req: Request<CloseRequest>) -> Result<Response<CloseResponse>, Status> {
48 self.send_cmd(|reply| RuntimeCommand::Close { reply })
49 .await?;
50 Ok(Response::new(CloseResponse {}))
51 }
52
53 async fn change_role(
54 &self,
55 req: Request<ChangeRoleRequest>,
56 ) -> Result<Response<ChangeRoleResponse>, Status> {
57 let inner = req.into_inner();
58 let epoch: Epoch = inner.epoch.unwrap_or_default().into();
59 let role: Role = Role::from(inner.role);
60 self.send_cmd(|reply| RuntimeCommand::ChangeRole { epoch, role, reply })
61 .await?;
62 Ok(Response::new(ChangeRoleResponse {}))
63 }
64
65 async fn update_epoch(
66 &self,
67 req: Request<UpdateEpochRequest>,
68 ) -> Result<Response<UpdateEpochResponse>, Status> {
69 let epoch: Epoch = req.into_inner().epoch.unwrap_or_default().into();
70 self.send_cmd(|reply| RuntimeCommand::UpdateEpoch { epoch, reply })
71 .await?;
72 Ok(Response::new(UpdateEpochResponse {}))
73 }
74
75 async fn get_status(
76 &self,
77 _req: Request<GetStatusRequest>,
78 ) -> Result<Response<GetStatusResponse>, Status> {
79 let (tx, rx) = oneshot::channel();
80 self.cmd_tx
81 .send(RuntimeCommand::GetStatus { reply: tx })
82 .await
83 .map_err(|_| Status::unavailable("runtime closed"))?;
84 let info = rx
85 .await
86 .map_err(|_| Status::unavailable("runtime closed"))?;
87 Ok(Response::new(GetStatusResponse {
88 role: crate::proto::RoleProto::from(info.role) as i32,
89 epoch: Some(info.epoch.into()),
90 current_progress: info.current_progress,
91 catch_up_capability: info.catch_up_capability,
92 committed_lsn: info.committed_lsn,
93 healthy: info.healthy,
94 }))
95 }
96
97 async fn update_catch_up_configuration(
98 &self,
99 req: Request<UpdateCatchUpConfigRequest>,
100 ) -> Result<Response<UpdateCatchUpConfigResponse>, Status> {
101 let inner = req.into_inner();
102 self.send_cmd(|reply| RuntimeCommand::UpdateCatchUpConfiguration {
103 current: inner.current.unwrap_or_default().into(),
104 previous: inner.previous.unwrap_or_default().into(),
105 reply,
106 })
107 .await?;
108 Ok(Response::new(UpdateCatchUpConfigResponse {}))
109 }
110
111 async fn update_current_configuration(
112 &self,
113 req: Request<UpdateCurrentConfigRequest>,
114 ) -> Result<Response<UpdateCurrentConfigResponse>, Status> {
115 let inner = req.into_inner();
116 self.send_cmd(|reply| RuntimeCommand::UpdateCurrentConfiguration {
117 current: inner.current.unwrap_or_default().into(),
118 reply,
119 })
120 .await?;
121 Ok(Response::new(UpdateCurrentConfigResponse {}))
122 }
123
124 async fn wait_for_catch_up_quorum(
125 &self,
126 req: Request<WaitForCatchUpQuorumRequest>,
127 ) -> Result<Response<WaitForCatchUpQuorumResponse>, Status> {
128 let mode = crate::types::ReplicaSetQuorumMode::from(req.into_inner().mode);
129 self.send_cmd(|reply| RuntimeCommand::WaitForCatchUpQuorum { mode, reply })
130 .await?;
131 Ok(Response::new(WaitForCatchUpQuorumResponse {}))
132 }
133
134 async fn build_replica(
135 &self,
136 req: Request<BuildReplicaRequest>,
137 ) -> Result<Response<BuildReplicaResponse>, Status> {
138 let replica = req
139 .into_inner()
140 .replica
141 .ok_or_else(|| Status::invalid_argument("missing replica"))?
142 .into();
143 self.send_cmd(|reply| RuntimeCommand::BuildReplica { replica, reply })
144 .await?;
145 Ok(Response::new(BuildReplicaResponse {}))
146 }
147
148 async fn remove_replica(
149 &self,
150 req: Request<RemoveReplicaRequest>,
151 ) -> Result<Response<RemoveReplicaResponse>, Status> {
152 let replica_id = req.into_inner().replica_id;
153 self.send_cmd(|reply| RuntimeCommand::RemoveReplica { replica_id, reply })
154 .await?;
155 Ok(Response::new(RemoveReplicaResponse {}))
156 }
157
158 async fn on_data_loss(
159 &self,
160 _req: Request<OnDataLossRequest>,
161 ) -> Result<Response<OnDataLossResponse>, Status> {
162 let action = self
163 .send_cmd(|reply| RuntimeCommand::OnDataLoss { reply })
164 .await?;
165 Ok(Response::new(OnDataLossResponse {
166 state_changed: action == crate::types::DataLossAction::StateChanged,
167 }))
168 }
169
170 async fn revoke_write_status(
171 &self,
172 _req: Request<RevokeWriteStatusRequest>,
173 ) -> Result<Response<RevokeWriteStatusResponse>, Status> {
174 self.send_cmd(|reply| RuntimeCommand::RevokeWriteStatus { reply })
175 .await?;
176 Ok(Response::new(RevokeWriteStatusResponse {}))
177 }
178}