// kuberic_core/runtime.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::{mpsc, oneshot};
5use tracing::warn;
6
7use crate::error::{KubericError, Result};
8use crate::events::{LifecycleEvent, ReplicateRequest, ReplicatorControlEvent};
9use crate::handles::{PartitionHandle, PartitionState, StateReplicatorHandle};
10use crate::types::{AccessStatus, CancellationToken, Epoch, FaultType, OpenMode, Role};
11
/// Reply timeout a freshly created `KubericRuntimeBuilder` starts with.
const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
/// Default capacity of the replicator control-event channel.
const DEFAULT_CONTROL_BUFFER: usize = 16;
/// Default capacity of the replicate-request (data) channel.
const DEFAULT_DATA_BUFFER: usize = 256;
/// Default capacity of the lifecycle-event channel to the user service.
const DEFAULT_SERVICE_BUFFER: usize = 16;
/// Capacity of the fault-report channel; not configurable through the builder.
const DEFAULT_FAULT_BUFFER: usize = 4;
17
18/// The kuberic runtime. Wires the replicator actor, manages access status,
19/// enforces promotion/demotion ordering, and delivers events to the user.
20pub struct KubericRuntime {
21    lifecycle_tx: mpsc::Sender<LifecycleEvent>,
22    control_tx: mpsc::Sender<ReplicatorControlEvent>,
23    #[allow(dead_code)]
24    data_tx: mpsc::Sender<ReplicateRequest>,
25    state: Arc<PartitionState>,
26    #[allow(dead_code)]
27    fault_rx: mpsc::Receiver<FaultType>,
28    reply_timeout: Duration,
29    token: CancellationToken,
30}
31
/// Builder for constructing a [`KubericRuntime`].
pub struct KubericRuntimeBuilder {
    /// Timeout the built runtime applies to its channel waits.
    reply_timeout: Duration,
    /// Capacity of the replicator control-event channel.
    control_buffer: usize,
    /// Capacity of the replicate-request (data) channel.
    data_buffer: usize,
    /// Capacity of the lifecycle-event channel to the user service.
    service_buffer: usize,
}
39
40impl Default for KubericRuntimeBuilder {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl KubericRuntimeBuilder {
47    pub fn new() -> Self {
48        Self {
49            reply_timeout: DEFAULT_REPLY_TIMEOUT,
50            control_buffer: DEFAULT_CONTROL_BUFFER,
51            data_buffer: DEFAULT_DATA_BUFFER,
52            service_buffer: DEFAULT_SERVICE_BUFFER,
53        }
54    }
55
56    pub fn reply_timeout(mut self, timeout: Duration) -> Self {
57        self.reply_timeout = timeout;
58        self
59    }
60
61    /// Build the runtime, returning it along with the channels needed to
62    /// spawn the replicator actor and the user service event receivers.
63    pub fn build(self) -> RuntimeBundle {
64        let (control_tx, control_rx) = mpsc::channel(self.control_buffer);
65        let (data_tx, data_rx) = mpsc::channel(self.data_buffer);
66        let state = Arc::new(PartitionState::new());
67        let (lifecycle_tx, lifecycle_rx) = mpsc::channel(self.service_buffer);
68        let (fault_tx, fault_rx) = mpsc::channel(DEFAULT_FAULT_BUFFER);
69
70        let partition = Arc::new(PartitionHandle::new(state.clone(), fault_tx));
71        let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());
72
73        let runtime = KubericRuntime {
74            lifecycle_tx,
75            control_tx,
76            data_tx,
77            state: state.clone(),
78            fault_rx,
79            reply_timeout: self.reply_timeout,
80            token: CancellationToken::new(),
81        };
82
83        RuntimeBundle {
84            runtime,
85            replicator_control_rx: control_rx,
86            replicator_data_rx: data_rx,
87            lifecycle_rx,
88            partition,
89            replicator_handle,
90            state,
91        }
92    }
93}
94
95/// Everything produced by the builder. The caller spawns the replicator actor
96/// with `replicator_control_rx` + `replicator_data_rx`, and runs the user
97/// service event loop with `lifecycle_rx`.
98pub struct RuntimeBundle {
99    pub runtime: KubericRuntime,
100    pub replicator_control_rx: mpsc::Receiver<ReplicatorControlEvent>,
101    pub replicator_data_rx: mpsc::Receiver<ReplicateRequest>,
102    pub lifecycle_rx: mpsc::Receiver<LifecycleEvent>,
103    pub partition: Arc<PartitionHandle>,
104    pub replicator_handle: StateReplicatorHandle,
105    pub state: Arc<PartitionState>,
106}
107
108impl KubericRuntime {
109    pub fn builder() -> KubericRuntimeBuilder {
110        KubericRuntimeBuilder::new()
111    }
112
113    /// Send a control event to the replicator and await the reply with timeout.
114    async fn send_control<T>(
115        &self,
116        make_event: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
117    ) -> Result<T> {
118        let (tx, rx) = oneshot::channel();
119        self.control_tx
120            .send(make_event(tx))
121            .await
122            .map_err(|_| KubericError::Closed)?;
123
124        match tokio::time::timeout(self.reply_timeout, rx).await {
125            Ok(Ok(result)) => result,
126            Ok(Err(_)) => Err(KubericError::Closed),
127            Err(_) => {
128                warn!("replicator control event timed out");
129                Err(KubericError::Internal("replicator reply timeout".into()))
130            }
131        }
132    }
133
134    /// Send a lifecycle event to the user and await the reply with timeout.
135    async fn send_lifecycle<T>(
136        &self,
137        make_event: impl FnOnce(oneshot::Sender<Result<T>>) -> LifecycleEvent,
138    ) -> Result<T> {
139        let (tx, rx) = oneshot::channel();
140        let event = make_event(tx);
141
142        match tokio::time::timeout(self.reply_timeout, self.lifecycle_tx.send(event)).await {
143            Ok(Ok(())) => {}
144            Ok(Err(_)) => return Err(KubericError::Closed),
145            Err(_) => {
146                warn!("lifecycle event channel send timed out (backpressure)");
147                return Err(KubericError::Internal(
148                    "lifecycle event send timeout".into(),
149                ));
150            }
151        }
152
153        match tokio::time::timeout(self.reply_timeout, rx).await {
154            Ok(Ok(result)) => result,
155            Ok(Err(_)) => Err(KubericError::Closed),
156            Err(_) => {
157                warn!("lifecycle event reply timed out");
158                Err(KubericError::Internal("lifecycle reply timeout".into()))
159            }
160        }
161    }
162
163    // -----------------------------------------------------------------------
164    // Lifecycle operations (called by the operator or test harness)
165    // -----------------------------------------------------------------------
166
167    /// Open the replicator. In the new API, the user creates the replicator
168    /// in the Open callback. This legacy runtime pre-creates everything and
169    /// sends OpenContext for the user to create the replicator.
170    ///
171    /// For backward compatibility with tests that use RuntimeBundle's
172    /// pre-created channels, this method sends a minimal OpenContext and
173    /// the test's Open handler returns a ReplicatorHandle that wraps
174    /// the pre-created control_tx.
175    pub async fn open(&self, mode: OpenMode) -> Result<()> {
176        // 1. Open replicator directly (we still hold control_tx)
177        self.send_control(|reply| ReplicatorControlEvent::Open { mode, reply })
178            .await?;
179
180        // 2. Deliver Open to user — they return a ReplicatorHandle
181        // For KubericRuntime tests, the user returns a handle wrapping
182        // the same control_tx we already have (no-op separation).
183        let _handle: crate::replicator::ReplicatorHandle = self
184            .send_lifecycle(|reply| LifecycleEvent::Open {
185                ctx: crate::replicator::OpenContext {
186                    replica_id: 0, // tests don't use this
187                    open_mode: mode,
188                    data_bind: "127.0.0.1:0".to_string(),
189                    token: self.token.child_token(),
190                    fault_tx: mpsc::channel(4).0,
191                },
192                reply,
193            })
194            .await?;
195
196        Ok(())
197    }
198
199    /// Change role with correct ordering:
200    /// - Promotion (to Primary): replicator first, then user
201    /// - Demotion (from Primary): user first, then replicator
202    pub async fn change_role(&self, epoch: Epoch, new_role: Role, old_role: Role) -> Result<()> {
203        let is_promotion = new_role == Role::Primary
204            || (new_role == Role::ActiveSecondary && old_role == Role::IdleSecondary);
205
206        if is_promotion {
207            // Promotion: replicator sets up replication BEFORE user starts work
208            self.send_control(|reply| ReplicatorControlEvent::ChangeRole {
209                epoch,
210                role: new_role,
211                reply,
212            })
213            .await?;
214
215            self.set_status_for_role(new_role);
216
217            let _addr: String = self
218                .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
219                .await?;
220        } else {
221            // Demotion: user stops work BEFORE replicator tears down
222            self.set_status_for_role(new_role);
223
224            let _addr: String = self
225                .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
226                .await?;
227
228            self.send_control(|reply| ReplicatorControlEvent::ChangeRole {
229                epoch,
230                role: new_role,
231                reply,
232            })
233            .await?;
234        }
235
236        Ok(())
237    }
238
239    /// Close: user first, then replicator.
240    pub async fn close(&self) -> Result<()> {
241        self.state
242            .set_read_status(AccessStatus::ReconfigurationPending);
243        self.state
244            .set_write_status(AccessStatus::ReconfigurationPending);
245
246        // 1. Close user service
247        let _ = self
248            .send_lifecycle(|reply| LifecycleEvent::Close { reply })
249            .await;
250
251        // 2. Close replicator
252        let _ = self
253            .send_control(|reply| ReplicatorControlEvent::Close { reply })
254            .await;
255
256        self.state.set_read_status(AccessStatus::NotPrimary);
257        self.state.set_write_status(AccessStatus::NotPrimary);
258
259        self.token.cancel();
260        Ok(())
261    }
262
263    /// Abort: fire-and-forget to both.
264    pub fn abort(&self) {
265        self.state.set_read_status(AccessStatus::NotPrimary);
266        self.state.set_write_status(AccessStatus::NotPrimary);
267
268        let _ = self.lifecycle_tx.try_send(LifecycleEvent::Abort);
269        let _ = self.control_tx.try_send(ReplicatorControlEvent::Abort);
270
271        self.token.cancel();
272    }
273
274    // -----------------------------------------------------------------------
275    // Replicator-only control operations (no lifecycle event counterpart)
276    // -----------------------------------------------------------------------
277
278    pub async fn update_epoch(&self, epoch: Epoch) -> Result<()> {
279        self.send_control(|reply| ReplicatorControlEvent::UpdateEpoch { epoch, reply })
280            .await
281    }
282
283    pub async fn update_catch_up_configuration(
284        &self,
285        current: crate::types::ReplicaSetConfig,
286        previous: crate::types::ReplicaSetConfig,
287    ) -> Result<()> {
288        self.send_control(|reply| ReplicatorControlEvent::UpdateCatchUpConfiguration {
289            current,
290            previous,
291            reply,
292        })
293        .await
294    }
295
296    pub async fn update_current_configuration(
297        &self,
298        current: crate::types::ReplicaSetConfig,
299    ) -> Result<()> {
300        self.send_control(|reply| ReplicatorControlEvent::UpdateCurrentConfiguration {
301            current,
302            reply,
303        })
304        .await
305    }
306
307    pub async fn wait_for_catch_up_quorum(
308        &self,
309        mode: crate::types::ReplicaSetQuorumMode,
310    ) -> Result<()> {
311        self.send_control(|reply| ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply })
312            .await
313    }
314
315    pub async fn build_replica(&self, replica: crate::types::ReplicaInfo) -> Result<()> {
316        self.send_control(|reply| ReplicatorControlEvent::BuildReplica { replica, reply })
317            .await
318    }
319
320    pub async fn remove_replica(&self, replica_id: crate::types::ReplicaId) -> Result<()> {
321        self.send_control(|reply| ReplicatorControlEvent::RemoveReplica { replica_id, reply })
322            .await
323    }
324
325    pub async fn on_data_loss(&self) -> Result<crate::types::DataLossAction> {
326        self.send_control(|reply| ReplicatorControlEvent::OnDataLoss { reply })
327            .await
328    }
329
330    // -----------------------------------------------------------------------
331    // Status queries (lock-free reads from PartitionState)
332    // -----------------------------------------------------------------------
333
334    pub fn read_status(&self) -> AccessStatus {
335        self.state.read_status()
336    }
337
338    pub fn write_status(&self) -> AccessStatus {
339        self.state.write_status()
340    }
341
342    pub fn current_progress(&self) -> crate::types::Lsn {
343        self.state.current_progress()
344    }
345
346    pub fn catch_up_capability(&self) -> crate::types::Lsn {
347        self.state.catch_up_capability()
348    }
349
350    pub fn committed_lsn(&self) -> crate::types::Lsn {
351        self.state.committed_lsn()
352    }
353
354    // -----------------------------------------------------------------------
355    // Internal
356    // -----------------------------------------------------------------------
357
358    fn set_status_for_role(&self, role: Role) {
359        match role {
360            Role::Primary => {
361                self.state.set_read_status(AccessStatus::Granted);
362                self.state.set_write_status(AccessStatus::Granted);
363            }
364            _ => {
365                self.state.set_read_status(AccessStatus::NotPrimary);
366                self.state.set_write_status(AccessStatus::NotPrimary);
367            }
368        }
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::LifecycleEvent;
    use crate::noop::NoopReplicator;
    use crate::types::AccessStatus;

    /// Build a throwaway `ReplicatorHandle` for the user's `Open` reply.
    ///
    /// In these tests the runtime has already opened the pre-created
    /// replicator directly, so the handle is never used; its channel
    /// receiver is dropped immediately.
    fn dummy_replicator_handle() -> crate::replicator::ReplicatorHandle {
        crate::replicator::ReplicatorHandle::new(
            mpsc::channel(1).0,
            Arc::new(PartitionState::new()),
            String::new(),
            CancellationToken::new(),
        )
    }

    /// Full lifecycle integration test with NoopReplicator.
    /// Exercises: open → promote → replicate → demote → close.
    #[tokio::test]
    async fn test_runtime_full_lifecycle() {
        let bundle = KubericRuntime::builder()
            .reply_timeout(Duration::from_secs(5))
            .build();

        let state = bundle.state.clone();
        let _partition = bundle.partition.clone();
        let runtime = bundle.runtime;
        let mut lifecycle_rx = bundle.lifecycle_rx;

        // Spawn the noop replicator actor
        let repl_state = state.clone();
        let repl_handle = tokio::spawn(async move {
            NoopReplicator::run(
                bundle.replicator_control_rx,
                bundle.replicator_data_rx,
                repl_state,
            )
            .await;
        });

        // Spawn the user lifecycle event loop
        let user_replicator = bundle.replicator_handle;
        let user_handle = tokio::spawn(async move {
            let mut svc_replicator = None;

            while let Some(event) = lifecycle_rx.recv().await {
                match event {
                    LifecycleEvent::Open { ctx: _, reply } => {
                        // The replicator is pre-created, so keep the bundled
                        // handle for replication and reply with a dummy.
                        svc_replicator = Some(user_replicator.clone());
                        let _ = reply.send(Ok(dummy_replicator_handle()));
                    }
                    LifecycleEvent::ChangeRole { new_role, reply } => {
                        if new_role == Role::Primary {
                            // Verify we can replicate as primary
                            let repl = svc_replicator.as_ref().unwrap();
                            let lsn = repl
                                .replicate(
                                    bytes::Bytes::from("from-user-handler"),
                                    CancellationToken::new(),
                                )
                                .await
                                .unwrap();
                            assert!(lsn > 0);
                        }
                        let _ = reply.send(Ok(String::new()));
                    }
                    LifecycleEvent::Close { reply } => {
                        let _ = reply.send(Ok(()));
                        break;
                    }
                    LifecycleEvent::Abort => break,
                }
            }
        });

        // --- Drive lifecycle as the operator would ---

        // 1. Open
        runtime.open(OpenMode::New).await.unwrap();

        // Status: not primary yet
        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        // 2. ChangeRole to IdleSecondary
        runtime
            .change_role(Epoch::new(0, 1), Role::IdleSecondary, Role::None)
            .await
            .unwrap();

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);

        // 3. ChangeRole to ActiveSecondary (promotion from idle)
        runtime
            .change_role(Epoch::new(0, 1), Role::ActiveSecondary, Role::IdleSecondary)
            .await
            .unwrap();

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);

        // 4. ChangeRole to Primary (promotion)
        runtime
            .change_role(Epoch::new(0, 1), Role::Primary, Role::ActiveSecondary)
            .await
            .unwrap();

        assert_eq!(state.read_status(), AccessStatus::Granted);
        assert_eq!(state.write_status(), AccessStatus::Granted);

        // 5. Replicate via a fresh StateReplicatorHandle built from the
        //    runtime's data sender (outside the user event loop)
        let repl = StateReplicatorHandle::new(runtime.data_tx.clone(), state.clone());
        let lsn = repl
            .replicate(bytes::Bytes::from("from-runtime"), CancellationToken::new())
            .await
            .unwrap();
        assert!(lsn > 0);
        assert!(state.current_progress() > 0);

        // 6. ChangeRole to ActiveSecondary (demotion)
        runtime
            .change_role(Epoch::new(0, 2), Role::ActiveSecondary, Role::Primary)
            .await
            .unwrap();

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        // 7. Close
        runtime.close().await.unwrap();

        // 8. Wait for actors to finish
        repl_handle.await.unwrap();
        user_handle.await.unwrap();
    }

    /// Test that replicate fails with NotPrimary when not promoted.
    #[tokio::test]
    async fn test_runtime_replicate_before_promote() {
        let bundle = KubericRuntime::builder()
            .reply_timeout(Duration::from_secs(5))
            .build();

        let state = bundle.state.clone();
        let runtime = bundle.runtime;

        // Spawn noop replicator
        let repl_state = state.clone();
        let _repl_handle = tokio::spawn(async move {
            NoopReplicator::run(
                bundle.replicator_control_rx,
                bundle.replicator_data_rx,
                repl_state,
            )
            .await;
        });

        // Spawn minimal user lifecycle loop
        let mut lifecycle_rx = bundle.lifecycle_rx;
        let _user_handle = tokio::spawn(async move {
            while let Some(event) = lifecycle_rx.recv().await {
                match event {
                    LifecycleEvent::Open { reply, .. } => {
                        let _ = reply.send(Ok(dummy_replicator_handle()));
                    }
                    LifecycleEvent::ChangeRole { reply, .. } => {
                        let _ = reply.send(Ok(String::new()));
                    }
                    LifecycleEvent::Close { reply } => {
                        let _ = reply.send(Ok(()));
                        break;
                    }
                    LifecycleEvent::Abort => break,
                }
            }
        });

        // Open but don't promote
        runtime.open(OpenMode::New).await.unwrap();

        // Try to replicate — should fail fast
        let repl = StateReplicatorHandle::new(runtime.data_tx.clone(), state.clone());
        let result = repl
            .replicate(bytes::Bytes::from("should-fail"), CancellationToken::new())
            .await;

        assert!(matches!(result, Err(KubericError::NotPrimary)));

        runtime.close().await.unwrap();
    }

    /// Test abort path.
    #[tokio::test]
    async fn test_runtime_abort() {
        let bundle = KubericRuntime::builder()
            .reply_timeout(Duration::from_secs(5))
            .build();

        let state = bundle.state.clone();
        let runtime = bundle.runtime;

        let repl_state = state.clone();
        let repl_handle = tokio::spawn(async move {
            NoopReplicator::run(
                bundle.replicator_control_rx,
                bundle.replicator_data_rx,
                repl_state,
            )
            .await;
        });

        // The user loop ignores everything except Abort.
        let mut lifecycle_rx = bundle.lifecycle_rx;
        let user_handle = tokio::spawn(async move {
            while let Some(event) = lifecycle_rx.recv().await {
                if let LifecycleEvent::Abort = event {
                    break;
                }
            }
        });

        // Abort without open
        runtime.abort();

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        repl_handle.await.unwrap();
        user_handle.await.unwrap();
    }
}