1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::{mpsc, oneshot};
5use tracing::warn;
6
7use crate::error::{KubericError, Result};
8use crate::events::{LifecycleEvent, ReplicateRequest, ReplicatorControlEvent};
9use crate::handles::{PartitionHandle, PartitionState, StateReplicatorHandle};
10use crate::types::{AccessStatus, CancellationToken, Epoch, FaultType, OpenMode, Role};
11
/// Default value for the runtime's `reply_timeout` (see `KubericRuntimeBuilder::new`).
const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
/// Default capacity of the replicator control channel.
const DEFAULT_CONTROL_BUFFER: usize = 16;
/// Default capacity of the replicator data channel.
const DEFAULT_DATA_BUFFER: usize = 256;
/// Default capacity of the lifecycle (service) event channel.
const DEFAULT_SERVICE_BUFFER: usize = 16;
/// Capacity of the fault channel created in `KubericRuntimeBuilder::build`.
const DEFAULT_FAULT_BUFFER: usize = 4;
17
18pub struct KubericRuntime {
21 lifecycle_tx: mpsc::Sender<LifecycleEvent>,
22 control_tx: mpsc::Sender<ReplicatorControlEvent>,
23 #[allow(dead_code)]
24 data_tx: mpsc::Sender<ReplicateRequest>,
25 state: Arc<PartitionState>,
26 #[allow(dead_code)]
27 fault_rx: mpsc::Receiver<FaultType>,
28 reply_timeout: Duration,
29 token: CancellationToken,
30}
31
/// Collects the tunables used by `build` to wire up a [`KubericRuntime`]
/// together with its channels.
pub struct KubericRuntimeBuilder {
    /// Timeout handed to the runtime as its `reply_timeout`.
    reply_timeout: Duration,
    /// Capacity of the replicator control channel.
    control_buffer: usize,
    /// Capacity of the replicator data channel.
    data_buffer: usize,
    /// Capacity of the lifecycle (service) event channel.
    service_buffer: usize,
}
39
40impl Default for KubericRuntimeBuilder {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl KubericRuntimeBuilder {
47 pub fn new() -> Self {
48 Self {
49 reply_timeout: DEFAULT_REPLY_TIMEOUT,
50 control_buffer: DEFAULT_CONTROL_BUFFER,
51 data_buffer: DEFAULT_DATA_BUFFER,
52 service_buffer: DEFAULT_SERVICE_BUFFER,
53 }
54 }
55
56 pub fn reply_timeout(mut self, timeout: Duration) -> Self {
57 self.reply_timeout = timeout;
58 self
59 }
60
61 pub fn build(self) -> RuntimeBundle {
64 let (control_tx, control_rx) = mpsc::channel(self.control_buffer);
65 let (data_tx, data_rx) = mpsc::channel(self.data_buffer);
66 let state = Arc::new(PartitionState::new());
67 let (lifecycle_tx, lifecycle_rx) = mpsc::channel(self.service_buffer);
68 let (fault_tx, fault_rx) = mpsc::channel(DEFAULT_FAULT_BUFFER);
69
70 let partition = Arc::new(PartitionHandle::new(state.clone(), fault_tx));
71 let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());
72
73 let runtime = KubericRuntime {
74 lifecycle_tx,
75 control_tx,
76 data_tx,
77 state: state.clone(),
78 fault_rx,
79 reply_timeout: self.reply_timeout,
80 token: CancellationToken::new(),
81 };
82
83 RuntimeBundle {
84 runtime,
85 replicator_control_rx: control_rx,
86 replicator_data_rx: data_rx,
87 lifecycle_rx,
88 partition,
89 replicator_handle,
90 state,
91 }
92 }
93}
94
95pub struct RuntimeBundle {
99 pub runtime: KubericRuntime,
100 pub replicator_control_rx: mpsc::Receiver<ReplicatorControlEvent>,
101 pub replicator_data_rx: mpsc::Receiver<ReplicateRequest>,
102 pub lifecycle_rx: mpsc::Receiver<LifecycleEvent>,
103 pub partition: Arc<PartitionHandle>,
104 pub replicator_handle: StateReplicatorHandle,
105 pub state: Arc<PartitionState>,
106}
107
108impl KubericRuntime {
109 pub fn builder() -> KubericRuntimeBuilder {
110 KubericRuntimeBuilder::new()
111 }
112
113 async fn send_control<T>(
115 &self,
116 make_event: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
117 ) -> Result<T> {
118 let (tx, rx) = oneshot::channel();
119 self.control_tx
120 .send(make_event(tx))
121 .await
122 .map_err(|_| KubericError::Closed)?;
123
124 match tokio::time::timeout(self.reply_timeout, rx).await {
125 Ok(Ok(result)) => result,
126 Ok(Err(_)) => Err(KubericError::Closed),
127 Err(_) => {
128 warn!("replicator control event timed out");
129 Err(KubericError::Internal("replicator reply timeout".into()))
130 }
131 }
132 }
133
134 async fn send_lifecycle<T>(
136 &self,
137 make_event: impl FnOnce(oneshot::Sender<Result<T>>) -> LifecycleEvent,
138 ) -> Result<T> {
139 let (tx, rx) = oneshot::channel();
140 let event = make_event(tx);
141
142 match tokio::time::timeout(self.reply_timeout, self.lifecycle_tx.send(event)).await {
143 Ok(Ok(())) => {}
144 Ok(Err(_)) => return Err(KubericError::Closed),
145 Err(_) => {
146 warn!("lifecycle event channel send timed out (backpressure)");
147 return Err(KubericError::Internal(
148 "lifecycle event send timeout".into(),
149 ));
150 }
151 }
152
153 match tokio::time::timeout(self.reply_timeout, rx).await {
154 Ok(Ok(result)) => result,
155 Ok(Err(_)) => Err(KubericError::Closed),
156 Err(_) => {
157 warn!("lifecycle event reply timed out");
158 Err(KubericError::Internal("lifecycle reply timeout".into()))
159 }
160 }
161 }
162
163 pub async fn open(&self, mode: OpenMode) -> Result<()> {
176 self.send_control(|reply| ReplicatorControlEvent::Open { mode, reply })
178 .await?;
179
180 let _handle: crate::replicator::ReplicatorHandle = self
184 .send_lifecycle(|reply| LifecycleEvent::Open {
185 ctx: crate::replicator::OpenContext {
186 replica_id: 0, open_mode: mode,
188 data_bind: "127.0.0.1:0".to_string(),
189 token: self.token.child_token(),
190 fault_tx: mpsc::channel(4).0,
191 },
192 reply,
193 })
194 .await?;
195
196 Ok(())
197 }
198
199 pub async fn change_role(&self, epoch: Epoch, new_role: Role, old_role: Role) -> Result<()> {
203 let is_promotion = new_role == Role::Primary
204 || (new_role == Role::ActiveSecondary && old_role == Role::IdleSecondary);
205
206 if is_promotion {
207 self.send_control(|reply| ReplicatorControlEvent::ChangeRole {
209 epoch,
210 role: new_role,
211 reply,
212 })
213 .await?;
214
215 self.set_status_for_role(new_role);
216
217 let _addr: String = self
218 .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
219 .await?;
220 } else {
221 self.set_status_for_role(new_role);
223
224 let _addr: String = self
225 .send_lifecycle(|reply| LifecycleEvent::ChangeRole { new_role, reply })
226 .await?;
227
228 self.send_control(|reply| ReplicatorControlEvent::ChangeRole {
229 epoch,
230 role: new_role,
231 reply,
232 })
233 .await?;
234 }
235
236 Ok(())
237 }
238
239 pub async fn close(&self) -> Result<()> {
241 self.state
242 .set_read_status(AccessStatus::ReconfigurationPending);
243 self.state
244 .set_write_status(AccessStatus::ReconfigurationPending);
245
246 let _ = self
248 .send_lifecycle(|reply| LifecycleEvent::Close { reply })
249 .await;
250
251 let _ = self
253 .send_control(|reply| ReplicatorControlEvent::Close { reply })
254 .await;
255
256 self.state.set_read_status(AccessStatus::NotPrimary);
257 self.state.set_write_status(AccessStatus::NotPrimary);
258
259 self.token.cancel();
260 Ok(())
261 }
262
263 pub fn abort(&self) {
265 self.state.set_read_status(AccessStatus::NotPrimary);
266 self.state.set_write_status(AccessStatus::NotPrimary);
267
268 let _ = self.lifecycle_tx.try_send(LifecycleEvent::Abort);
269 let _ = self.control_tx.try_send(ReplicatorControlEvent::Abort);
270
271 self.token.cancel();
272 }
273
274 pub async fn update_epoch(&self, epoch: Epoch) -> Result<()> {
279 self.send_control(|reply| ReplicatorControlEvent::UpdateEpoch { epoch, reply })
280 .await
281 }
282
283 pub async fn update_catch_up_configuration(
284 &self,
285 current: crate::types::ReplicaSetConfig,
286 previous: crate::types::ReplicaSetConfig,
287 ) -> Result<()> {
288 self.send_control(|reply| ReplicatorControlEvent::UpdateCatchUpConfiguration {
289 current,
290 previous,
291 reply,
292 })
293 .await
294 }
295
296 pub async fn update_current_configuration(
297 &self,
298 current: crate::types::ReplicaSetConfig,
299 ) -> Result<()> {
300 self.send_control(|reply| ReplicatorControlEvent::UpdateCurrentConfiguration {
301 current,
302 reply,
303 })
304 .await
305 }
306
307 pub async fn wait_for_catch_up_quorum(
308 &self,
309 mode: crate::types::ReplicaSetQuorumMode,
310 ) -> Result<()> {
311 self.send_control(|reply| ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply })
312 .await
313 }
314
315 pub async fn build_replica(&self, replica: crate::types::ReplicaInfo) -> Result<()> {
316 self.send_control(|reply| ReplicatorControlEvent::BuildReplica { replica, reply })
317 .await
318 }
319
320 pub async fn remove_replica(&self, replica_id: crate::types::ReplicaId) -> Result<()> {
321 self.send_control(|reply| ReplicatorControlEvent::RemoveReplica { replica_id, reply })
322 .await
323 }
324
325 pub async fn on_data_loss(&self) -> Result<crate::types::DataLossAction> {
326 self.send_control(|reply| ReplicatorControlEvent::OnDataLoss { reply })
327 .await
328 }
329
330 pub fn read_status(&self) -> AccessStatus {
335 self.state.read_status()
336 }
337
338 pub fn write_status(&self) -> AccessStatus {
339 self.state.write_status()
340 }
341
342 pub fn current_progress(&self) -> crate::types::Lsn {
343 self.state.current_progress()
344 }
345
346 pub fn catch_up_capability(&self) -> crate::types::Lsn {
347 self.state.catch_up_capability()
348 }
349
350 pub fn committed_lsn(&self) -> crate::types::Lsn {
351 self.state.committed_lsn()
352 }
353
354 fn set_status_for_role(&self, role: Role) {
359 match role {
360 Role::Primary => {
361 self.state.set_read_status(AccessStatus::Granted);
362 self.state.set_write_status(AccessStatus::Granted);
363 }
364 _ => {
365 self.state.set_read_status(AccessStatus::NotPrimary);
366 self.state.set_write_status(AccessStatus::NotPrimary);
367 }
368 }
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::LifecycleEvent;
    use crate::noop::NoopReplicator;
    use crate::types::AccessStatus;

    /// Runs the no-op replicator on its own task, fed by the given channel ends.
    fn spawn_noop_replicator(
        control_rx: mpsc::Receiver<ReplicatorControlEvent>,
        data_rx: mpsc::Receiver<ReplicateRequest>,
        state: Arc<PartitionState>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            NoopReplicator::run(control_rx, data_rx, state).await;
        })
    }

    /// Builds a throwaway `ReplicatorHandle` to answer `LifecycleEvent::Open`.
    fn dummy_replicator_handle() -> crate::replicator::ReplicatorHandle {
        crate::replicator::ReplicatorHandle::new(
            mpsc::channel(1).0,
            Arc::new(PartitionState::new()),
            String::new(),
            CancellationToken::new(),
        )
    }

    /// Walks a replica through open -> idle -> active -> primary -> active ->
    /// close and checks access statuses and replication along the way.
    #[tokio::test]
    async fn test_runtime_full_lifecycle() {
        // Every field is bound (never `..`) so nothing is dropped before the
        // end of the test.
        let RuntimeBundle {
            runtime,
            replicator_control_rx,
            replicator_data_rx,
            mut lifecycle_rx,
            partition: _partition,
            replicator_handle: user_replicator,
            state,
        } = KubericRuntime::builder()
            .reply_timeout(Duration::from_secs(5))
            .build();

        let replicator_task =
            spawn_noop_replicator(replicator_control_rx, replicator_data_rx, state.clone());

        // Stand-in for the user service: answers every lifecycle event and
        // replicates once when promoted to primary.
        let service_task = tokio::spawn(async move {
            let mut svc_replicator = None;

            while let Some(event) = lifecycle_rx.recv().await {
                match event {
                    LifecycleEvent::Open { ctx: _, reply } => {
                        svc_replicator = Some(user_replicator.clone());
                        let _ = reply.send(Ok(dummy_replicator_handle()));
                    }
                    LifecycleEvent::ChangeRole { new_role, reply } => {
                        if new_role == Role::Primary {
                            let handle = svc_replicator.as_ref().unwrap();
                            let lsn = handle
                                .replicate(
                                    bytes::Bytes::from("from-user-handler"),
                                    CancellationToken::new(),
                                )
                                .await
                                .unwrap();
                            assert!(lsn > 0);
                        }
                        let _ = reply.send(Ok(String::new()));
                    }
                    LifecycleEvent::Close { reply } => {
                        let _ = reply.send(Ok(()));
                        break;
                    }
                    LifecycleEvent::Abort => break,
                }
            }
        });

        runtime.open(OpenMode::New).await.unwrap();

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        // None -> IdleSecondary
        runtime
            .change_role(Epoch::new(0, 1), Role::IdleSecondary, Role::None)
            .await
            .unwrap();
        assert_eq!(state.read_status(), AccessStatus::NotPrimary);

        // IdleSecondary -> ActiveSecondary
        runtime
            .change_role(Epoch::new(0, 1), Role::ActiveSecondary, Role::IdleSecondary)
            .await
            .unwrap();
        assert_eq!(state.read_status(), AccessStatus::NotPrimary);

        // ActiveSecondary -> Primary
        runtime
            .change_role(Epoch::new(0, 1), Role::Primary, Role::ActiveSecondary)
            .await
            .unwrap();
        assert_eq!(state.read_status(), AccessStatus::Granted);
        assert_eq!(state.write_status(), AccessStatus::Granted);

        // Replication through a handle built from the runtime's own data sender.
        let runtime_replicator =
            StateReplicatorHandle::new(runtime.data_tx.clone(), state.clone());
        let lsn = runtime_replicator
            .replicate(bytes::Bytes::from("from-runtime"), CancellationToken::new())
            .await
            .unwrap();
        assert!(lsn > 0);
        assert!(state.current_progress() > 0);

        // Primary -> ActiveSecondary
        runtime
            .change_role(Epoch::new(0, 2), Role::ActiveSecondary, Role::Primary)
            .await
            .unwrap();
        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        runtime.close().await.unwrap();

        replicator_task.await.unwrap();
        service_task.await.unwrap();
    }

    /// Replicating before any promotion must fail with `NotPrimary`.
    #[tokio::test]
    async fn test_runtime_replicate_before_promote() {
        let RuntimeBundle {
            runtime,
            replicator_control_rx,
            replicator_data_rx,
            mut lifecycle_rx,
            partition: _partition,
            replicator_handle: _replicator_handle,
            state,
        } = KubericRuntime::builder()
            .reply_timeout(Duration::from_secs(5))
            .build();

        let _replicator_task =
            spawn_noop_replicator(replicator_control_rx, replicator_data_rx, state.clone());

        // Minimal service: acknowledges every lifecycle event.
        let _service_task = tokio::spawn(async move {
            while let Some(event) = lifecycle_rx.recv().await {
                match event {
                    LifecycleEvent::Open { reply, .. } => {
                        let _ = reply.send(Ok(dummy_replicator_handle()));
                    }
                    LifecycleEvent::ChangeRole { reply, .. } => {
                        let _ = reply.send(Ok(String::new()));
                    }
                    LifecycleEvent::Close { reply } => {
                        let _ = reply.send(Ok(()));
                        break;
                    }
                    LifecycleEvent::Abort => break,
                }
            }
        });

        runtime.open(OpenMode::New).await.unwrap();

        let runtime_replicator =
            StateReplicatorHandle::new(runtime.data_tx.clone(), state.clone());
        let outcome = runtime_replicator
            .replicate(bytes::Bytes::from("should-fail"), CancellationToken::new())
            .await;

        assert!(matches!(outcome, Err(KubericError::NotPrimary)));

        runtime.close().await.unwrap();
    }

    /// `abort` must revoke access and let both background tasks finish.
    #[tokio::test]
    async fn test_runtime_abort() {
        let RuntimeBundle {
            runtime,
            replicator_control_rx,
            replicator_data_rx,
            mut lifecycle_rx,
            partition: _partition,
            replicator_handle: _replicator_handle,
            state,
        } = KubericRuntime::builder()
            .reply_timeout(Duration::from_secs(5))
            .build();

        let replicator_task =
            spawn_noop_replicator(replicator_control_rx, replicator_data_rx, state.clone());

        // Service that only reacts to `Abort`.
        let service_task = tokio::spawn(async move {
            while let Some(event) = lifecycle_rx.recv().await {
                if matches!(event, LifecycleEvent::Abort) {
                    break;
                }
            }
        });

        runtime.abort();

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        replicator_task.await.unwrap();
        service_task.await.unwrap();
    }
}