Skip to main content

kuberic_core/
pod.rs

1use std::time::Duration;
2
3use tokio::sync::{mpsc, oneshot};
4use tonic::transport::Server;
5use tracing::{info, warn};
6
7use crate::error::{KubericError, Result};
8use crate::events::{LifecycleEvent, ReplicatorControlEvent};
9use crate::replicator::{OpenContext, ReplicatorHandle};
10use crate::types::{
11    AccessStatus, CancellationToken, DataLossAction, Epoch, Lsn, OpenMode, ReplicaId, ReplicaInfo,
12    ReplicaSetConfig, ReplicaSetQuorumMode, Role,
13};
14
/// Default upper bound on how long the runtime waits for a reply from the
/// user's lifecycle loop or from the replicator (30 seconds).
const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_millis(30_000);

17// ---------------------------------------------------------------------------
18// RuntimeCommand — what the gRPC control server sends to the runtime
19// ---------------------------------------------------------------------------
20
21/// Commands sent by the gRPC ControlServer to the PodRuntime.
22/// The runtime processes these with correct ordering (replicator + user events).
23pub enum RuntimeCommand {
24    Open {
25        mode: OpenMode,
26        reply: oneshot::Sender<Result<()>>,
27    },
28    Close {
29        reply: oneshot::Sender<Result<()>>,
30    },
31    ChangeRole {
32        epoch: Epoch,
33        role: Role,
34        reply: oneshot::Sender<Result<()>>,
35    },
36    UpdateEpoch {
37        epoch: Epoch,
38        reply: oneshot::Sender<Result<()>>,
39    },
40    UpdateCatchUpConfiguration {
41        current: ReplicaSetConfig,
42        previous: ReplicaSetConfig,
43        reply: oneshot::Sender<Result<()>>,
44    },
45    UpdateCurrentConfiguration {
46        current: ReplicaSetConfig,
47        reply: oneshot::Sender<Result<()>>,
48    },
49    WaitForCatchUpQuorum {
50        mode: ReplicaSetQuorumMode,
51        reply: oneshot::Sender<Result<()>>,
52    },
53    BuildReplica {
54        replica: ReplicaInfo,
55        reply: oneshot::Sender<Result<()>>,
56    },
57    RemoveReplica {
58        replica_id: ReplicaId,
59        reply: oneshot::Sender<Result<()>>,
60    },
61    OnDataLoss {
62        reply: oneshot::Sender<Result<DataLossAction>>,
63    },
64    RevokeWriteStatus {
65        reply: oneshot::Sender<Result<()>>,
66    },
67    GetStatus {
68        reply: oneshot::Sender<StatusInfo>,
69    },
70}
71
72/// Status info returned by GetStatus.
73pub struct StatusInfo {
74    pub role: Role,
75    pub epoch: Epoch,
76    pub current_progress: Lsn,
77    pub catch_up_capability: Lsn,
78    pub committed_lsn: Lsn,
79    pub healthy: bool,
80}
81
82// ---------------------------------------------------------------------------
83// PodRuntime
84// ---------------------------------------------------------------------------
85
86pub struct PodRuntime {
87    lifecycle_tx: mpsc::Sender<LifecycleEvent>,
88    cmd_rx: mpsc::Receiver<RuntimeCommand>,
89    replicator_handle: Option<ReplicatorHandle>,
90    shutdown: CancellationToken,
91    reply_timeout: Duration,
92    role: Role,
93    epoch: Epoch,
94    replica_id: ReplicaId,
95    data_bind: String,
96}
97
98pub struct PodRuntimeBundle {
99    pub runtime: PodRuntime,
100    pub lifecycle_rx: mpsc::Receiver<LifecycleEvent>,
101    pub control_address: String,
102}
103
104pub struct PodRuntimeBuilder {
105    replica_id: ReplicaId,
106    reply_timeout: Duration,
107    control_bind: String,
108    data_bind: String,
109}
110
111impl PodRuntimeBuilder {
112    pub fn new(replica_id: ReplicaId) -> Self {
113        Self {
114            replica_id,
115            reply_timeout: DEFAULT_REPLY_TIMEOUT,
116            control_bind: "127.0.0.1:0".to_string(),
117            data_bind: "127.0.0.1:0".to_string(),
118        }
119    }
120
121    pub fn reply_timeout(mut self, timeout: Duration) -> Self {
122        self.reply_timeout = timeout;
123        self
124    }
125
126    pub fn control_bind(mut self, addr: String) -> Self {
127        self.control_bind = addr;
128        self
129    }
130
131    pub fn data_bind(mut self, addr: String) -> Self {
132        self.data_bind = addr;
133        self
134    }
135
136    pub async fn build(self) -> Result<PodRuntimeBundle> {
137        let shutdown = CancellationToken::new();
138        let (lifecycle_tx, lifecycle_rx) = mpsc::channel(16);
139        let (cmd_tx, cmd_rx) = mpsc::channel(16);
140
141        // Control plane gRPC server (runtime-owned, unchanged)
142        // PartitionState is not available yet — it's created by the replicator
143        // at Open time. ControlServer needs to work without it initially.
144        let control_server = crate::grpc::server::ControlServer::new(self.replica_id, cmd_tx);
145        let control_listener = tokio::net::TcpListener::bind(&self.control_bind)
146            .await
147            .map_err(|e| KubericError::Internal(Box::new(e)))?;
148        let control_addr = control_listener.local_addr().unwrap();
149        let control_address = format!("http://{}", control_addr);
150
151        let ctrl_shutdown = shutdown.child_token();
152        tokio::spawn(async move {
153            let _ = Server::builder()
154                .add_service(
155                    crate::proto::replicator_control_server::ReplicatorControlServer::new(
156                        control_server,
157                    ),
158                )
159                .serve_with_incoming_shutdown(
160                    tokio_stream::wrappers::TcpListenerStream::new(control_listener),
161                    ctrl_shutdown.cancelled(),
162                )
163                .await;
164        });
165
166        tokio::time::sleep(Duration::from_millis(50)).await;
167
168        info!(
169            replica_id = self.replica_id,
170            %control_address,
171            "pod runtime started (replicator deferred to Open)"
172        );
173
174        let runtime = PodRuntime {
175            lifecycle_tx,
176            cmd_rx,
177            replicator_handle: None,
178            shutdown,
179            reply_timeout: self.reply_timeout,
180            role: Role::None,
181            epoch: Epoch::default(),
182            replica_id: self.replica_id,
183            data_bind: self.data_bind,
184        };
185
186        Ok(PodRuntimeBundle {
187            runtime,
188            lifecycle_rx,
189            control_address,
190        })
191    }
192}
193
194impl PodRuntime {
195    pub fn builder(replica_id: ReplicaId) -> PodRuntimeBuilder {
196        PodRuntimeBuilder::new(replica_id)
197    }
198
199    /// Get the shutdown token. Cancelling it triggers graceful shutdown.
200    pub fn shutdown_token(&self) -> CancellationToken {
201        self.shutdown.clone()
202    }
203
204    /// Run the runtime command loop. Processes operator commands from the
205    /// gRPC control server with correct replicator/user event ordering.
206    /// Blocks until shutdown.
207    pub async fn serve(mut self) {
208        info!("PodRuntime serve loop started");
209        while let Some(cmd) = self.cmd_rx.recv().await {
210            match cmd {
211                RuntimeCommand::Open { mode, reply } => {
212                    let _ = reply.send(self.handle_open(mode).await);
213                }
214                RuntimeCommand::Close { reply } => {
215                    let _ = reply.send(self.handle_close().await);
216                    break;
217                }
218                RuntimeCommand::ChangeRole { epoch, role, reply } => {
219                    let _ = reply.send(match self.require_handle() {
220                        Ok(_) => self.handle_change_role(epoch, role).await,
221                        Err(e) => Err(e),
222                    });
223                }
224                RuntimeCommand::UpdateEpoch { epoch, reply } => {
225                    let _ = reply.send(self.handle_update_epoch(epoch).await);
226                }
227                RuntimeCommand::UpdateCatchUpConfiguration {
228                    current,
229                    previous,
230                    reply,
231                } => {
232                    let _ = reply.send(
233                        self.send_replicator_control(|r| {
234                            ReplicatorControlEvent::UpdateCatchUpConfiguration {
235                                current,
236                                previous,
237                                reply: r,
238                            }
239                        })
240                        .await,
241                    );
242                }
243                RuntimeCommand::UpdateCurrentConfiguration { current, reply } => {
244                    let _ = reply.send(
245                        self.send_replicator_control(|r| {
246                            ReplicatorControlEvent::UpdateCurrentConfiguration { current, reply: r }
247                        })
248                        .await,
249                    );
250                }
251                RuntimeCommand::WaitForCatchUpQuorum { mode, reply } => {
252                    let _ = reply.send(
253                        self.send_replicator_control(|r| {
254                            ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply: r }
255                        })
256                        .await,
257                    );
258                }
259                RuntimeCommand::BuildReplica { replica, reply } => {
260                    // Copy protocol now runs inside the replicator actor.
261                    // The actor spawns a task and replies when copy completes.
262                    let _ = reply.send(
263                        self.send_replicator_control(|r| ReplicatorControlEvent::BuildReplica {
264                            replica,
265                            reply: r,
266                        })
267                        .await,
268                    );
269                }
270                RuntimeCommand::RemoveReplica { replica_id, reply } => {
271                    let _ = reply.send(
272                        self.send_replicator_control(|r| ReplicatorControlEvent::RemoveReplica {
273                            replica_id,
274                            reply: r,
275                        })
276                        .await,
277                    );
278                }
279                RuntimeCommand::OnDataLoss { reply } => {
280                    let _ = reply.send(self.handle_on_data_loss().await);
281                }
282                RuntimeCommand::RevokeWriteStatus { reply } => {
283                    info!("revoking write status for switchover");
284                    if let Some(handle) = &self.replicator_handle {
285                        handle
286                            .state()
287                            .set_write_status(AccessStatus::ReconfigurationPending);
288                    }
289                    let _ = reply.send(Ok(()));
290                }
291                RuntimeCommand::GetStatus { reply } => {
292                    let handle = self.replicator_handle.as_ref();
293                    let _ = reply.send(StatusInfo {
294                        role: self.role,
295                        epoch: self.epoch,
296                        current_progress: handle.map_or(0, |h| h.state().current_progress()),
297                        catch_up_capability: handle.map_or(0, |h| h.state().catch_up_capability()),
298                        committed_lsn: handle.map_or(0, |h| h.state().committed_lsn()),
299                        healthy: handle.is_some(),
300                    });
301                }
302            }
303        }
304        self.shutdown.cancel();
305    }
306
307    // -----------------------------------------------------------------------
308    // Command handlers with correct ordering
309    // -----------------------------------------------------------------------
310
311    /// Get handle or return error for pre-Open commands.
312    fn require_handle(&self) -> Result<&ReplicatorHandle> {
313        self.replicator_handle
314            .as_ref()
315            .ok_or(KubericError::Internal("replicator not opened".into()))
316    }
317
318    async fn handle_open(&mut self, mode: OpenMode) -> Result<()> {
319        if self.replicator_handle.is_some() {
320            return Err(KubericError::Internal("already opened".into()));
321        }
322
323        // 1. Send OpenContext to user, receive ReplicatorHandle back
324        let (fault_tx, _fault_rx) = mpsc::channel(4);
325
326        let handle: ReplicatorHandle = self
327            .send_lifecycle(|reply| LifecycleEvent::Open {
328                ctx: OpenContext {
329                    replica_id: self.replica_id,
330                    open_mode: mode,
331                    data_bind: self.data_bind.clone(),
332                    token: self.shutdown.child_token(),
333                    fault_tx,
334                },
335                reply,
336            })
337            .await?;
338
339        // 2. Open the replicator (via channel)
340        handle
341            .send_control(
342                |r| ReplicatorControlEvent::Open { mode, reply: r },
343                self.reply_timeout,
344            )
345            .await?;
346
347        info!(
348            data_address = %handle.data_address(),
349            "replicator opened"
350        );
351
352        // 3. Store handle for future lifecycle calls
353        self.replicator_handle = Some(handle);
354        Ok(())
355    }
356
357    async fn handle_change_role(&mut self, epoch: Epoch, new_role: Role) -> Result<()> {
358        let old_role = self.role;
359        let is_promotion = new_role == Role::Primary
360            || (new_role == Role::ActiveSecondary && old_role == Role::IdleSecondary);
361
362        let handle = self.require_handle()?;
363
364        if is_promotion {
365            // Promotion: replicator first, then status, then user
366            handle
367                .send_control(
368                    |reply| ReplicatorControlEvent::ChangeRole {
369                        epoch,
370                        role: new_role,
371                        reply,
372                    },
373                    self.reply_timeout,
374                )
375                .await?;
376            self.set_status_for_role(new_role);
377            let _: String = self
378                .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
379                .await?;
380        } else {
381            // Demotion: status first, then user, then replicator
382            self.set_status_for_role(new_role);
383            let _: String = self
384                .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
385                .await?;
386            handle
387                .send_control(
388                    |reply| ReplicatorControlEvent::ChangeRole {
389                        epoch,
390                        role: new_role,
391                        reply,
392                    },
393                    self.reply_timeout,
394                )
395                .await?;
396        }
397
398        self.role = new_role;
399        self.epoch = epoch;
400        Ok(())
401    }
402
403    async fn handle_close(&mut self) -> Result<()> {
404        if let Some(handle) = &self.replicator_handle {
405            handle
406                .state()
407                .set_read_status(AccessStatus::ReconfigurationPending);
408            handle
409                .state()
410                .set_write_status(AccessStatus::ReconfigurationPending);
411        }
412
413        let _ = self
414            .send_lifecycle(|reply| LifecycleEvent::Close { reply })
415            .await;
416
417        if let Ok(handle) = self.require_handle() {
418            let _ = handle
419                .send_control(
420                    |reply| ReplicatorControlEvent::Close { reply },
421                    self.reply_timeout,
422                )
423                .await;
424            handle.state().set_read_status(AccessStatus::NotPrimary);
425            handle.state().set_write_status(AccessStatus::NotPrimary);
426        }
427
428        self.role = Role::None;
429        Ok(())
430    }
431
432    async fn handle_update_epoch(&mut self, epoch: Epoch) -> Result<()> {
433        // Route entirely through replicator — it handles user notification
434        self.send_replicator_control(|reply| ReplicatorControlEvent::UpdateEpoch { epoch, reply })
435            .await?;
436        self.epoch = epoch;
437        Ok(())
438    }
439
440    async fn handle_on_data_loss(&mut self) -> Result<DataLossAction> {
441        // Route through replicator — it handles dual-query (replicator + user)
442        self.send_replicator_control(|reply| ReplicatorControlEvent::OnDataLoss { reply })
443            .await
444    }
445
446    // -----------------------------------------------------------------------
447    // Helpers
448    // -----------------------------------------------------------------------
449
450    async fn send_replicator_control<T>(
451        &self,
452        make: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
453    ) -> Result<T> {
454        let handle = self.require_handle()?;
455        handle.send_control(make, self.reply_timeout).await
456    }
457
458    async fn send_lifecycle<T>(
459        &self,
460        make: impl FnOnce(oneshot::Sender<Result<T>>) -> LifecycleEvent,
461    ) -> Result<T> {
462        let (tx, rx) = oneshot::channel();
463        self.lifecycle_tx
464            .send(make(tx))
465            .await
466            .map_err(|_| KubericError::Closed)?;
467        match tokio::time::timeout(self.reply_timeout, rx).await {
468            Ok(Ok(result)) => result,
469            Ok(Err(_)) => Err(KubericError::Closed),
470            Err(_) => {
471                warn!("lifecycle event reply timed out");
472                Err(KubericError::Internal("lifecycle timeout".into()))
473            }
474        }
475    }
476
477    fn set_status_for_role(&self, role: Role) {
478        if let Some(handle) = &self.replicator_handle {
479            match role {
480                Role::Primary => {
481                    handle.state().set_read_status(AccessStatus::Granted);
482                    handle.state().set_write_status(AccessStatus::Granted);
483                }
484                _ => {
485                    handle.state().set_read_status(AccessStatus::NotPrimary);
486                    handle.state().set_write_status(AccessStatus::NotPrimary);
487                }
488            }
489        }
490    }
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::LifecycleEvent;
    use crate::proto::replicator_control_client::ReplicatorControlClient;
    use crate::proto::{ChangeRoleRequest, CloseRequest, EpochProto, OpenRequest, RoleProto};
    use crate::replicator::WalReplicator;

    /// Drives Open → Idle → Active → Primary → demote → Close through the gRPC
    /// control server. It checks that the single replicate call the user makes
    /// on promotion to Primary is assigned LSN 1.
    #[tokio::test]
    async fn test_pod_runtime_user_lifecycle() {
        let PodRuntimeBundle {
            runtime,
            mut lifecycle_rx,
            control_address,
        } = PodRuntime::builder(1)
            .reply_timeout(Duration::from_secs(5))
            .build()
            .await
            .unwrap();

        // User side: builds the WalReplicator when the runtime sends Open and
        // records the LSN of every write it performs.
        let user_task = tokio::spawn(async move {
            let mut replicator = None;
            let mut lsns = Vec::new();

            while let Some(event) = lifecycle_rx.recv().await {
                match event {
                    LifecycleEvent::Open { ctx, reply } => {
                        let (sp_tx, _sp_rx) = mpsc::unbounded_channel();
                        let (handle, handles) = WalReplicator::create(
                            ctx.replica_id,
                            &ctx.data_bind,
                            ctx.fault_tx.clone(),
                            sp_tx,
                        )
                        .await
                        .unwrap();
                        replicator = Some(handles.replicator);
                        let _ = reply.send(Ok(handle));
                    }
                    LifecycleEvent::ChangeRole { new_role, reply } => {
                        if new_role == Role::Primary {
                            let lsn = replicator
                                .as_ref()
                                .unwrap()
                                .replicate(
                                    bytes::Bytes::from("from-user"),
                                    CancellationToken::new(),
                                )
                                .await
                                .unwrap();
                            lsns.push(lsn);
                        }
                        let _ = reply.send(Ok(String::new()));
                    }
                    LifecycleEvent::Close { reply } => {
                        let _ = reply.send(Ok(()));
                        break;
                    }
                    LifecycleEvent::Abort => break,
                }
            }
            lsns
        });

        let runtime_task = tokio::spawn(runtime.serve());

        // Operator side: talk to the runtime over its control endpoint.
        let mut client = ReplicatorControlClient::connect(control_address)
            .await
            .unwrap();

        client.open(OpenRequest { mode: 0 }).await.unwrap();

        // Idle → Active → Primary at configuration 1, then demote at 2.
        let transitions = [
            (1, RoleProto::RoleIdleSecondary),
            (1, RoleProto::RoleActiveSecondary),
            (1, RoleProto::RolePrimary),
            (2, RoleProto::RoleActiveSecondary),
        ];
        for (configuration_number, role) in transitions {
            client
                .change_role(ChangeRoleRequest {
                    epoch: Some(EpochProto {
                        data_loss_number: 0,
                        configuration_number,
                    }),
                    role: role as i32,
                })
                .await
                .unwrap();
        }

        client.close(CloseRequest {}).await.unwrap();

        let lsns = user_task.await.unwrap();
        assert_eq!(lsns.len(), 1);
        assert_eq!(lsns[0], 1);

        runtime_task.await.unwrap();
    }
}