1use std::time::Duration;
2
3use tokio::sync::{mpsc, oneshot};
4use tonic::transport::Server;
5use tracing::{info, warn};
6
7use crate::error::{KubericError, Result};
8use crate::events::{LifecycleEvent, ReplicatorControlEvent};
9use crate::replicator::{OpenContext, ReplicatorHandle};
10use crate::types::{
11 AccessStatus, CancellationToken, DataLossAction, Epoch, Lsn, OpenMode, ReplicaId, ReplicaInfo,
12 ReplicaSetConfig, ReplicaSetQuorumMode, Role,
13};
14
/// Default upper bound on how long the runtime waits for a reply from user code or from the
/// replicator; override it with [`PodRuntimeBuilder::reply_timeout`].
const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_millis(30_000);
16
17pub enum RuntimeCommand {
24 Open {
25 mode: OpenMode,
26 reply: oneshot::Sender<Result<()>>,
27 },
28 Close {
29 reply: oneshot::Sender<Result<()>>,
30 },
31 ChangeRole {
32 epoch: Epoch,
33 role: Role,
34 reply: oneshot::Sender<Result<()>>,
35 },
36 UpdateEpoch {
37 epoch: Epoch,
38 reply: oneshot::Sender<Result<()>>,
39 },
40 UpdateCatchUpConfiguration {
41 current: ReplicaSetConfig,
42 previous: ReplicaSetConfig,
43 reply: oneshot::Sender<Result<()>>,
44 },
45 UpdateCurrentConfiguration {
46 current: ReplicaSetConfig,
47 reply: oneshot::Sender<Result<()>>,
48 },
49 WaitForCatchUpQuorum {
50 mode: ReplicaSetQuorumMode,
51 reply: oneshot::Sender<Result<()>>,
52 },
53 BuildReplica {
54 replica: ReplicaInfo,
55 reply: oneshot::Sender<Result<()>>,
56 },
57 RemoveReplica {
58 replica_id: ReplicaId,
59 reply: oneshot::Sender<Result<()>>,
60 },
61 OnDataLoss {
62 reply: oneshot::Sender<Result<DataLossAction>>,
63 },
64 RevokeWriteStatus {
65 reply: oneshot::Sender<Result<()>>,
66 },
67 GetStatus {
68 reply: oneshot::Sender<StatusInfo>,
69 },
70}
71
72pub struct StatusInfo {
74 pub role: Role,
75 pub epoch: Epoch,
76 pub current_progress: Lsn,
77 pub catch_up_capability: Lsn,
78 pub committed_lsn: Lsn,
79 pub healthy: bool,
80}
81
82pub struct PodRuntime {
87 lifecycle_tx: mpsc::Sender<LifecycleEvent>,
88 cmd_rx: mpsc::Receiver<RuntimeCommand>,
89 replicator_handle: Option<ReplicatorHandle>,
90 shutdown: CancellationToken,
91 reply_timeout: Duration,
92 role: Role,
93 epoch: Epoch,
94 replica_id: ReplicaId,
95 data_bind: String,
96}
97
98pub struct PodRuntimeBundle {
99 pub runtime: PodRuntime,
100 pub lifecycle_rx: mpsc::Receiver<LifecycleEvent>,
101 pub control_address: String,
102}
103
104pub struct PodRuntimeBuilder {
105 replica_id: ReplicaId,
106 reply_timeout: Duration,
107 control_bind: String,
108 data_bind: String,
109}
110
111impl PodRuntimeBuilder {
112 pub fn new(replica_id: ReplicaId) -> Self {
113 Self {
114 replica_id,
115 reply_timeout: DEFAULT_REPLY_TIMEOUT,
116 control_bind: "127.0.0.1:0".to_string(),
117 data_bind: "127.0.0.1:0".to_string(),
118 }
119 }
120
121 pub fn reply_timeout(mut self, timeout: Duration) -> Self {
122 self.reply_timeout = timeout;
123 self
124 }
125
126 pub fn control_bind(mut self, addr: String) -> Self {
127 self.control_bind = addr;
128 self
129 }
130
131 pub fn data_bind(mut self, addr: String) -> Self {
132 self.data_bind = addr;
133 self
134 }
135
136 pub async fn build(self) -> Result<PodRuntimeBundle> {
137 let shutdown = CancellationToken::new();
138 let (lifecycle_tx, lifecycle_rx) = mpsc::channel(16);
139 let (cmd_tx, cmd_rx) = mpsc::channel(16);
140
141 let control_server = crate::grpc::server::ControlServer::new(self.replica_id, cmd_tx);
145 let control_listener = tokio::net::TcpListener::bind(&self.control_bind)
146 .await
147 .map_err(|e| KubericError::Internal(Box::new(e)))?;
148 let control_addr = control_listener.local_addr().unwrap();
149 let control_address = format!("http://{}", control_addr);
150
151 let ctrl_shutdown = shutdown.child_token();
152 tokio::spawn(async move {
153 let _ = Server::builder()
154 .add_service(
155 crate::proto::replicator_control_server::ReplicatorControlServer::new(
156 control_server,
157 ),
158 )
159 .serve_with_incoming_shutdown(
160 tokio_stream::wrappers::TcpListenerStream::new(control_listener),
161 ctrl_shutdown.cancelled(),
162 )
163 .await;
164 });
165
166 tokio::time::sleep(Duration::from_millis(50)).await;
167
168 info!(
169 replica_id = self.replica_id,
170 %control_address,
171 "pod runtime started (replicator deferred to Open)"
172 );
173
174 let runtime = PodRuntime {
175 lifecycle_tx,
176 cmd_rx,
177 replicator_handle: None,
178 shutdown,
179 reply_timeout: self.reply_timeout,
180 role: Role::None,
181 epoch: Epoch::default(),
182 replica_id: self.replica_id,
183 data_bind: self.data_bind,
184 };
185
186 Ok(PodRuntimeBundle {
187 runtime,
188 lifecycle_rx,
189 control_address,
190 })
191 }
192}
193
194impl PodRuntime {
195 pub fn builder(replica_id: ReplicaId) -> PodRuntimeBuilder {
196 PodRuntimeBuilder::new(replica_id)
197 }
198
199 pub fn shutdown_token(&self) -> CancellationToken {
201 self.shutdown.clone()
202 }
203
204 pub async fn serve(mut self) {
208 info!("PodRuntime serve loop started");
209 while let Some(cmd) = self.cmd_rx.recv().await {
210 match cmd {
211 RuntimeCommand::Open { mode, reply } => {
212 let _ = reply.send(self.handle_open(mode).await);
213 }
214 RuntimeCommand::Close { reply } => {
215 let _ = reply.send(self.handle_close().await);
216 break;
217 }
218 RuntimeCommand::ChangeRole { epoch, role, reply } => {
219 let _ = reply.send(match self.require_handle() {
220 Ok(_) => self.handle_change_role(epoch, role).await,
221 Err(e) => Err(e),
222 });
223 }
224 RuntimeCommand::UpdateEpoch { epoch, reply } => {
225 let _ = reply.send(self.handle_update_epoch(epoch).await);
226 }
227 RuntimeCommand::UpdateCatchUpConfiguration {
228 current,
229 previous,
230 reply,
231 } => {
232 let _ = reply.send(
233 self.send_replicator_control(|r| {
234 ReplicatorControlEvent::UpdateCatchUpConfiguration {
235 current,
236 previous,
237 reply: r,
238 }
239 })
240 .await,
241 );
242 }
243 RuntimeCommand::UpdateCurrentConfiguration { current, reply } => {
244 let _ = reply.send(
245 self.send_replicator_control(|r| {
246 ReplicatorControlEvent::UpdateCurrentConfiguration { current, reply: r }
247 })
248 .await,
249 );
250 }
251 RuntimeCommand::WaitForCatchUpQuorum { mode, reply } => {
252 let _ = reply.send(
253 self.send_replicator_control(|r| {
254 ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply: r }
255 })
256 .await,
257 );
258 }
259 RuntimeCommand::BuildReplica { replica, reply } => {
260 let _ = reply.send(
263 self.send_replicator_control(|r| ReplicatorControlEvent::BuildReplica {
264 replica,
265 reply: r,
266 })
267 .await,
268 );
269 }
270 RuntimeCommand::RemoveReplica { replica_id, reply } => {
271 let _ = reply.send(
272 self.send_replicator_control(|r| ReplicatorControlEvent::RemoveReplica {
273 replica_id,
274 reply: r,
275 })
276 .await,
277 );
278 }
279 RuntimeCommand::OnDataLoss { reply } => {
280 let _ = reply.send(self.handle_on_data_loss().await);
281 }
282 RuntimeCommand::RevokeWriteStatus { reply } => {
283 info!("revoking write status for switchover");
284 if let Some(handle) = &self.replicator_handle {
285 handle
286 .state()
287 .set_write_status(AccessStatus::ReconfigurationPending);
288 }
289 let _ = reply.send(Ok(()));
290 }
291 RuntimeCommand::GetStatus { reply } => {
292 let handle = self.replicator_handle.as_ref();
293 let _ = reply.send(StatusInfo {
294 role: self.role,
295 epoch: self.epoch,
296 current_progress: handle.map_or(0, |h| h.state().current_progress()),
297 catch_up_capability: handle.map_or(0, |h| h.state().catch_up_capability()),
298 committed_lsn: handle.map_or(0, |h| h.state().committed_lsn()),
299 healthy: handle.is_some(),
300 });
301 }
302 }
303 }
304 self.shutdown.cancel();
305 }
306
307 fn require_handle(&self) -> Result<&ReplicatorHandle> {
313 self.replicator_handle
314 .as_ref()
315 .ok_or(KubericError::Internal("replicator not opened".into()))
316 }
317
318 async fn handle_open(&mut self, mode: OpenMode) -> Result<()> {
319 if self.replicator_handle.is_some() {
320 return Err(KubericError::Internal("already opened".into()));
321 }
322
323 let (fault_tx, _fault_rx) = mpsc::channel(4);
325
326 let handle: ReplicatorHandle = self
327 .send_lifecycle(|reply| LifecycleEvent::Open {
328 ctx: OpenContext {
329 replica_id: self.replica_id,
330 open_mode: mode,
331 data_bind: self.data_bind.clone(),
332 token: self.shutdown.child_token(),
333 fault_tx,
334 },
335 reply,
336 })
337 .await?;
338
339 handle
341 .send_control(
342 |r| ReplicatorControlEvent::Open { mode, reply: r },
343 self.reply_timeout,
344 )
345 .await?;
346
347 info!(
348 data_address = %handle.data_address(),
349 "replicator opened"
350 );
351
352 self.replicator_handle = Some(handle);
354 Ok(())
355 }
356
357 async fn handle_change_role(&mut self, epoch: Epoch, new_role: Role) -> Result<()> {
358 let old_role = self.role;
359 let is_promotion = new_role == Role::Primary
360 || (new_role == Role::ActiveSecondary && old_role == Role::IdleSecondary);
361
362 let handle = self.require_handle()?;
363
364 if is_promotion {
365 handle
367 .send_control(
368 |reply| ReplicatorControlEvent::ChangeRole {
369 epoch,
370 role: new_role,
371 reply,
372 },
373 self.reply_timeout,
374 )
375 .await?;
376 self.set_status_for_role(new_role);
377 let _: String = self
378 .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
379 .await?;
380 } else {
381 self.set_status_for_role(new_role);
383 let _: String = self
384 .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
385 .await?;
386 handle
387 .send_control(
388 |reply| ReplicatorControlEvent::ChangeRole {
389 epoch,
390 role: new_role,
391 reply,
392 },
393 self.reply_timeout,
394 )
395 .await?;
396 }
397
398 self.role = new_role;
399 self.epoch = epoch;
400 Ok(())
401 }
402
403 async fn handle_close(&mut self) -> Result<()> {
404 if let Some(handle) = &self.replicator_handle {
405 handle
406 .state()
407 .set_read_status(AccessStatus::ReconfigurationPending);
408 handle
409 .state()
410 .set_write_status(AccessStatus::ReconfigurationPending);
411 }
412
413 let _ = self
414 .send_lifecycle(|reply| LifecycleEvent::Close { reply })
415 .await;
416
417 if let Ok(handle) = self.require_handle() {
418 let _ = handle
419 .send_control(
420 |reply| ReplicatorControlEvent::Close { reply },
421 self.reply_timeout,
422 )
423 .await;
424 handle.state().set_read_status(AccessStatus::NotPrimary);
425 handle.state().set_write_status(AccessStatus::NotPrimary);
426 }
427
428 self.role = Role::None;
429 Ok(())
430 }
431
432 async fn handle_update_epoch(&mut self, epoch: Epoch) -> Result<()> {
433 self.send_replicator_control(|reply| ReplicatorControlEvent::UpdateEpoch { epoch, reply })
435 .await?;
436 self.epoch = epoch;
437 Ok(())
438 }
439
440 async fn handle_on_data_loss(&mut self) -> Result<DataLossAction> {
441 self.send_replicator_control(|reply| ReplicatorControlEvent::OnDataLoss { reply })
443 .await
444 }
445
446 async fn send_replicator_control<T>(
451 &self,
452 make: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
453 ) -> Result<T> {
454 let handle = self.require_handle()?;
455 handle.send_control(make, self.reply_timeout).await
456 }
457
458 async fn send_lifecycle<T>(
459 &self,
460 make: impl FnOnce(oneshot::Sender<Result<T>>) -> LifecycleEvent,
461 ) -> Result<T> {
462 let (tx, rx) = oneshot::channel();
463 self.lifecycle_tx
464 .send(make(tx))
465 .await
466 .map_err(|_| KubericError::Closed)?;
467 match tokio::time::timeout(self.reply_timeout, rx).await {
468 Ok(Ok(result)) => result,
469 Ok(Err(_)) => Err(KubericError::Closed),
470 Err(_) => {
471 warn!("lifecycle event reply timed out");
472 Err(KubericError::Internal("lifecycle timeout".into()))
473 }
474 }
475 }
476
477 fn set_status_for_role(&self, role: Role) {
478 if let Some(handle) = &self.replicator_handle {
479 match role {
480 Role::Primary => {
481 handle.state().set_read_status(AccessStatus::Granted);
482 handle.state().set_write_status(AccessStatus::Granted);
483 }
484 _ => {
485 handle.state().set_read_status(AccessStatus::NotPrimary);
486 handle.state().set_write_status(AccessStatus::NotPrimary);
487 }
488 }
489 }
490 }
491}
492
// End-to-end test: drives a `PodRuntime` through its gRPC control API while a mock
// user task answers lifecycle events using a real `WalReplicator`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::LifecycleEvent;
    use crate::replicator::WalReplicator;

    /// Open -> IdleSecondary -> ActiveSecondary -> Primary -> ActiveSecondary -> Close.
    ///
    /// Checks that user code can replicate when it is told about the Primary promotion (the
    /// single replicated entry is expected at LSN 1) and that the serve loop exits after
    /// Close.
    #[tokio::test]
    async fn test_pod_runtime_user_lifecycle() {
        let bundle = PodRuntime::builder(1)
            .reply_timeout(Duration::from_secs(5))
            .build()
            .await
            .unwrap();

        let runtime = bundle.runtime;
        let mut lifecycle_rx = bundle.lifecycle_rx;

        // Mock user code: creates the replicator on Open, writes one entry when promoted to
        // Primary, and returns the LSNs it replicated once Close or Abort arrives (or the
        // lifecycle channel closes).
        let user_handle = tokio::spawn(async move {
            let mut replicator = None;
            let mut replicated_lsns = vec![];

            while let Some(event) = lifecycle_rx.recv().await {
                match event {
                    LifecycleEvent::Open { ctx, reply } => {
                        // NOTE(review): `_sp_rx` is dropped at the end of this arm, so
                        // anything sent on `sp_tx` is discarded; presumably irrelevant here.
                        let (sp_tx, _sp_rx) = mpsc::unbounded_channel();
                        let (handle, handles) = WalReplicator::create(
                            ctx.replica_id,
                            &ctx.data_bind,
                            ctx.fault_tx.clone(),
                            sp_tx,
                        )
                        .await
                        .unwrap();
                        replicator = Some(handles.replicator);
                        // Hand the control handle back to the runtime; keep the
                        // data-path replicator for ourselves.
                        let _ = reply.send(Ok(handle));
                    }
                    LifecycleEvent::ChangeRole { new_role, reply } => {
                        // On promotion the runtime grants write access before sending this
                        // event, so replicating here is expected to succeed.
                        if new_role == Role::Primary {
                            let r = replicator.as_ref().unwrap();
                            let lsn = r
                                .replicate(
                                    bytes::Bytes::from("from-user"),
                                    CancellationToken::new(),
                                )
                                .await
                                .unwrap();
                            replicated_lsns.push(lsn);
                        }
                        let _ = reply.send(Ok(String::new()));
                    }
                    LifecycleEvent::Close { reply } => {
                        let _ = reply.send(Ok(()));
                        break;
                    }
                    LifecycleEvent::Abort => break,
                }
            }
            replicated_lsns
        });

        let runtime_handle = tokio::spawn(runtime.serve());

        let mut client = crate::proto::replicator_control_client::ReplicatorControlClient::connect(
            bundle.control_address.clone(),
        )
        .await
        .unwrap();

        // `mode: 0` is presumably the proto's default open mode; confirm against the .proto.
        client
            .open(crate::proto::OpenRequest { mode: 0 })
            .await
            .unwrap();

        // None -> IdleSecondary (not a promotion).
        client
            .change_role(crate::proto::ChangeRoleRequest {
                epoch: Some(crate::proto::EpochProto {
                    data_loss_number: 0,
                    configuration_number: 1,
                }),
                role: crate::proto::RoleProto::RoleIdleSecondary as i32,
            })
            .await
            .unwrap();

        // IdleSecondary -> ActiveSecondary (promotion).
        client
            .change_role(crate::proto::ChangeRoleRequest {
                epoch: Some(crate::proto::EpochProto {
                    data_loss_number: 0,
                    configuration_number: 1,
                }),
                role: crate::proto::RoleProto::RoleActiveSecondary as i32,
            })
            .await
            .unwrap();

        // ActiveSecondary -> Primary (promotion); the mock user replicates one entry.
        client
            .change_role(crate::proto::ChangeRoleRequest {
                epoch: Some(crate::proto::EpochProto {
                    data_loss_number: 0,
                    configuration_number: 1,
                }),
                role: crate::proto::RoleProto::RolePrimary as i32,
            })
            .await
            .unwrap();

        // Primary -> ActiveSecondary in a new configuration (demotion).
        client
            .change_role(crate::proto::ChangeRoleRequest {
                epoch: Some(crate::proto::EpochProto {
                    data_loss_number: 0,
                    configuration_number: 2,
                }),
                role: crate::proto::RoleProto::RoleActiveSecondary as i32,
            })
            .await
            .unwrap();

        client.close(crate::proto::CloseRequest {}).await.unwrap();

        // Exactly one write happened, during the single Primary promotion.
        let lsns = user_handle.await.unwrap();
        assert_eq!(lsns.len(), 1);
        assert_eq!(lsns[0], 1);

        // Close ends the serve loop, so the runtime task completes.
        runtime_handle.await.unwrap();
    }
}