1#![allow(clippy::result_large_err)]
14
15use std::time::Duration;
16
17use super::snapshot::{BudgetSnapshot, RegionSnapshot, TaskSnapshot, TaskState};
18use crate::error::{Error, ErrorKind};
19use crate::record::distributed_region::{
20 ConsistencyLevel, DistributedRegionConfig, DistributedRegionRecord, DistributedRegionState,
21 ReplicaInfo, StateTransition, TransitionReason,
22};
23use crate::record::region::{RegionRecord, RegionState};
24use crate::types::budget::Budget;
25use crate::types::cancel::CancelReason;
26use crate::types::{RegionId, TaskId, Time};
27
28#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
38pub enum RegionMode {
39 #[default]
41 Local,
42 Distributed {
44 replication_factor: u32,
46 consistency: ConsistencyLevel,
48 },
49 Hybrid {
51 replication_factor: u32,
53 max_lag: Duration,
55 },
56}
57
58impl RegionMode {
59 #[must_use]
61 pub const fn local() -> Self {
62 Self::Local
63 }
64
65 #[must_use]
67 pub fn distributed(replication_factor: u32) -> Self {
68 Self::Distributed {
69 replication_factor,
70 consistency: ConsistencyLevel::Quorum,
71 }
72 }
73
74 #[must_use]
76 pub fn hybrid(replication_factor: u32) -> Self {
77 Self::Hybrid {
78 replication_factor,
79 max_lag: Duration::from_secs(5),
80 }
81 }
82
83 #[must_use]
85 pub const fn is_replicated(&self) -> bool {
86 !matches!(self, Self::Local)
87 }
88
89 #[must_use]
91 pub const fn is_distributed(&self) -> bool {
92 matches!(self, Self::Distributed { .. })
93 }
94
95 #[must_use]
97 pub const fn replication_factor(&self) -> u32 {
98 match self {
99 Self::Local => 1,
100 Self::Distributed {
101 replication_factor, ..
102 }
103 | Self::Hybrid {
104 replication_factor, ..
105 } => *replication_factor,
106 }
107 }
108}
109
/// Synchronisation strategy selectable in [`BridgeConfig`].
///
/// NOTE(review): the variants are not inspected by the code in this file, so
/// the per-variant descriptions below are inferred from the names — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncMode {
    /// Presumably: every change is synchronised before returning.
    Synchronous,
    /// Presumably: changes are synchronised in the background.
    Asynchronous,
    /// Presumably: only writes are synchronised eagerly.
    WriteSync,
}
124
/// Policy for resolving divergence between local and distributed state.
///
/// NOTE(review): the policy is stored in [`BridgeConfig`] but not consulted by
/// the code in this file; variant descriptions are inferred from the names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Presumably: the distributed state overrides the local one.
    DistributedWins,
    /// Presumably: the local state overrides the distributed one.
    LocalWins,
    /// Presumably: the side with the highest sequence number wins.
    HighestSequence,
    /// Presumably: a conflict is reported as an error.
    Error,
}
137
138#[derive(Debug, Clone)]
140pub struct BridgeConfig {
141 pub allow_upgrade: bool,
143 pub sync_timeout: Duration,
145 pub sync_mode: SyncMode,
147 pub conflict_resolution: ConflictResolution,
149}
150
151impl Default for BridgeConfig {
152 fn default() -> Self {
153 Self {
154 allow_upgrade: true,
155 sync_timeout: Duration::from_secs(5),
156 sync_mode: SyncMode::Synchronous,
157 conflict_resolution: ConflictResolution::DistributedWins,
158 }
159 }
160}
161
162#[derive(Debug, Clone, Default)]
168pub struct SyncState {
169 pub last_synced_sequence: u64,
171 pub sync_pending: bool,
173 pub pending_ops: u32,
175 pub last_sync_time: Option<Time>,
177 pub last_sync_error: Option<String>,
179}
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq)]
187pub enum EffectiveState {
188 Open,
190 Degraded,
192 Recovering,
194 Closing,
196 Closed,
198 Inconsistent {
200 local: RegionState,
202 distributed: DistributedRegionState,
204 },
205}
206
207impl EffectiveState {
208 #[must_use]
210 pub fn compute(local: RegionState, distributed: Option<DistributedRegionState>) -> Self {
211 match (local, distributed) {
212 (local_s, None) => Self::from_local(local_s),
214
215 (
217 RegionState::Open,
218 Some(DistributedRegionState::Active | DistributedRegionState::Initializing),
219 ) => Self::Open,
220 (RegionState::Open, Some(DistributedRegionState::Degraded)) => Self::Degraded,
221 (RegionState::Open, Some(DistributedRegionState::Recovering)) => Self::Recovering,
222
223 (
225 RegionState::Closing | RegionState::Draining | RegionState::Finalizing,
226 Some(DistributedRegionState::Closing),
227 ) => Self::Closing,
228
229 (RegionState::Closed, Some(DistributedRegionState::Closed)) => Self::Closed,
231
232 (local_s, Some(dist_s)) => Self::Inconsistent {
234 local: local_s,
235 distributed: dist_s,
236 },
237 }
238 }
239
240 fn from_local(local: RegionState) -> Self {
241 match local {
242 RegionState::Open => Self::Open,
243 RegionState::Closing | RegionState::Draining | RegionState::Finalizing => Self::Closing,
244 RegionState::Closed => Self::Closed,
245 }
246 }
247
248 #[must_use]
250 pub const fn can_spawn(&self) -> bool {
251 matches!(self, Self::Open)
252 }
253
254 #[must_use]
256 pub const fn is_inconsistent(&self) -> bool {
257 matches!(self, Self::Inconsistent { .. })
258 }
259
260 #[must_use]
262 pub const fn needs_recovery(&self) -> bool {
263 matches!(
264 self,
265 Self::Degraded | Self::Recovering | Self::Inconsistent { .. }
266 )
267 }
268}
269
/// Conversion from a local-side value into its distributed counterpart.
pub trait LocalToDistributed {
    /// Type produced by the conversion.
    type Distributed;

    /// Produces the distributed representation of `self`.
    fn to_distributed(&self) -> Self::Distributed;
}
282
/// Conversion from a distributed-side value back into its local counterpart.
pub trait DistributedToLocal {
    /// Type produced by the conversion.
    type Local;

    /// Produces the local representation of `self`.
    fn to_local(&self) -> Self::Local;

    /// Reports whether converting `self` to the local form loses no information.
    fn is_lossless(&self) -> bool;
}
294
295impl LocalToDistributed for RegionState {
296 type Distributed = DistributedRegionState;
297
298 fn to_distributed(&self) -> DistributedRegionState {
299 match self {
300 Self::Open => DistributedRegionState::Active,
301 Self::Closing | Self::Draining | Self::Finalizing => DistributedRegionState::Closing,
302 Self::Closed => DistributedRegionState::Closed,
303 }
304 }
305}
306
307impl DistributedToLocal for DistributedRegionState {
308 type Local = RegionState;
309
310 fn to_local(&self) -> RegionState {
311 match self {
312 Self::Initializing | Self::Active | Self::Degraded | Self::Recovering => {
313 RegionState::Open
314 }
315 Self::Closing => RegionState::Closing,
316 Self::Closed => RegionState::Closed,
317 }
318 }
319
320 fn is_lossless(&self) -> bool {
321 matches!(self, Self::Active | Self::Closing | Self::Closed)
322 }
323}
324
325impl LocalToDistributed for Budget {
326 type Distributed = BudgetSnapshot;
327
328 fn to_distributed(&self) -> BudgetSnapshot {
329 BudgetSnapshot {
330 deadline_nanos: self.deadline.map(Time::as_nanos),
331 polls_remaining: if self.poll_quota > 0 {
332 Some(self.poll_quota)
333 } else {
334 None
335 },
336 cost_remaining: self.cost_quota,
337 }
338 }
339}
340
341impl DistributedToLocal for BudgetSnapshot {
342 type Local = Budget;
343
344 fn to_local(&self) -> Budget {
345 let mut budget = Budget::default();
346 if let Some(d) = self.deadline_nanos {
347 budget.deadline = Some(Time::from_nanos(d));
348 }
349 if let Some(p) = self.polls_remaining {
350 budget.poll_quota = p;
351 }
352 if let Some(c) = self.cost_remaining {
353 budget.cost_quota = Some(c);
354 }
355 budget
356 }
357
358 fn is_lossless(&self) -> bool {
359 false }
361}
362
363#[derive(Debug)]
369pub struct CloseResult {
370 pub local_changed: bool,
372 pub distributed_transition: Option<StateTransition>,
374 pub effective_state: EffectiveState,
376}
377
378#[derive(Debug)]
380pub struct UpgradeResult {
381 pub previous_mode: RegionMode,
383 pub new_mode: RegionMode,
385 pub snapshot_sequence: u64,
387}
388
/// Outcome of `RegionBridge::sync`.
#[derive(Debug)]
pub enum SyncResult {
    /// Nothing had to be synchronised.
    NotNeeded,
    /// State was synchronised at the given sequence.
    Synced {
        /// Sequence number of the synchronised snapshot.
        sequence: u64,
    },
    /// Synchronisation is pending at the given sequence (never produced by
    /// the code in this file).
    Pending {
        /// Sequence number of the pending snapshot.
        sequence: u64,
    },
}
405
406#[derive(Debug)]
415pub struct RegionBridge {
416 local: RegionRecord,
417 distributed: Option<DistributedRegionRecord>,
418 mode: RegionMode,
419 pub sync_state: SyncState,
421 pub config: BridgeConfig,
423 sequence: u64,
425}
426
427impl RegionBridge {
428 fn mark_sync_pending(&mut self) {
429 self.sync_state.sync_pending = true;
430 self.sync_state.pending_ops = self.sync_state.pending_ops.saturating_add(1);
431 }
432
433 #[must_use]
435 pub fn new_local(id: RegionId, parent: Option<RegionId>, budget: Budget) -> Self {
436 Self {
437 local: RegionRecord::new(id, parent, budget),
438 distributed: None,
439 mode: RegionMode::Local,
440 sync_state: SyncState::default(),
441 config: BridgeConfig::default(),
442 sequence: 0,
443 }
444 }
445
446 #[must_use]
448 pub fn new_distributed(
449 id: RegionId,
450 parent: Option<RegionId>,
451 budget: Budget,
452 config: DistributedRegionConfig,
453 ) -> Self {
454 let replication_factor = config.replication_factor;
455 let consistency = config.write_consistency;
456 let distributed = DistributedRegionRecord::new(id, config, parent, budget);
457 Self {
458 local: RegionRecord::new(id, parent, budget),
459 distributed: Some(distributed),
460 mode: RegionMode::Distributed {
461 replication_factor,
462 consistency,
463 },
464 sync_state: SyncState::default(),
465 config: BridgeConfig::default(),
466 sequence: 0,
467 }
468 }
469
470 #[must_use]
472 pub fn with_mode(
473 id: RegionId,
474 parent: Option<RegionId>,
475 budget: Budget,
476 mode: RegionMode,
477 ) -> Self {
478 match mode {
479 RegionMode::Local | RegionMode::Hybrid { .. } => Self {
480 local: RegionRecord::new(id, parent, budget),
481 distributed: None,
482 mode,
483 sync_state: SyncState::default(),
484 config: BridgeConfig::default(),
485 sequence: 0,
486 },
487 RegionMode::Distributed {
488 replication_factor,
489 consistency,
490 } => {
491 let config = DistributedRegionConfig {
492 replication_factor,
493 write_consistency: consistency,
494 ..Default::default()
495 };
496 Self::new_distributed(id, parent, budget, config)
497 }
498 }
499 }
500
501 #[must_use]
507 pub fn id(&self) -> RegionId {
508 self.local.id
509 }
510
511 #[must_use]
513 pub fn mode(&self) -> RegionMode {
514 self.mode
515 }
516
517 #[must_use]
519 pub fn local_state(&self) -> RegionState {
520 self.local.state()
521 }
522
523 #[must_use]
525 pub fn distributed_state(&self) -> Option<DistributedRegionState> {
526 self.distributed.as_ref().map(|d| d.state)
527 }
528
529 #[must_use]
531 pub fn effective_state(&self) -> EffectiveState {
532 EffectiveState::compute(self.local_state(), self.distributed_state())
533 }
534
535 #[must_use]
537 pub fn can_spawn(&self) -> bool {
538 self.effective_state().can_spawn()
539 }
540
541 #[must_use]
543 pub fn has_live_work(&self) -> bool {
544 self.local.has_live_work()
545 }
546
547 #[must_use]
549 pub fn local(&self) -> &RegionRecord {
550 &self.local
551 }
552
553 #[must_use]
555 pub fn distributed(&self) -> Option<&DistributedRegionRecord> {
556 self.distributed.as_ref()
557 }
558
559 pub fn begin_close(
567 &mut self,
568 reason: Option<CancelReason>,
569 now: Time,
570 ) -> Result<CloseResult, Error> {
571 let transition_reason = reason.as_ref().map_or(TransitionReason::LocalClose, |r| {
573 TransitionReason::Cancelled {
574 reason: r.kind.as_str().to_owned(),
575 }
576 });
577
578 let local_changed = self.local.begin_close(reason);
579
580 let distributed_transition = if let Some(ref mut dist) = self.distributed {
581 match dist.state {
582 DistributedRegionState::Closing | DistributedRegionState::Closed => None,
583 _ => Some(dist.begin_close(transition_reason, now)?),
584 }
585 } else {
586 None
587 };
588
589 if local_changed || distributed_transition.is_some() {
590 self.mark_sync_pending();
591 }
592
593 Ok(CloseResult {
594 local_changed,
595 distributed_transition,
596 effective_state: self.effective_state(),
597 })
598 }
599
600 pub fn begin_drain(&mut self) -> Result<bool, Error> {
602 let changed = self.local.begin_drain();
603 if changed {
604 self.mark_sync_pending();
605 }
606 Ok(changed)
607 }
608
609 pub fn begin_finalize(&mut self) -> Result<bool, Error> {
611 let changed = self.local.begin_finalize();
612 if changed {
613 self.mark_sync_pending();
614 }
615 Ok(changed)
616 }
617
618 pub fn complete_close(&mut self, now: Time) -> Result<CloseResult, Error> {
620 let local_changed = self.local.complete_close();
621
622 let distributed_transition = if let Some(ref mut dist) = self.distributed {
623 match dist.state {
624 DistributedRegionState::Closed => None,
625 _ => Some(dist.complete_close(now)?),
626 }
627 } else {
628 None
629 };
630
631 if local_changed || distributed_transition.is_some() {
632 self.mark_sync_pending();
633 }
634
635 Ok(CloseResult {
636 local_changed,
637 distributed_transition,
638 effective_state: self.effective_state(),
639 })
640 }
641
642 pub fn add_child(&mut self, child: RegionId) -> Result<(), Error> {
648 if !self.can_spawn() {
649 return Err(
650 Error::new(ErrorKind::RegionClosed).with_message("region not accepting new work")
651 );
652 }
653
654 let before = self.local.child_ids().len();
655 self.local
656 .add_child(child)
657 .map_err(|e| Error::new(ErrorKind::AdmissionDenied).with_message(format!("{e:?}")))?;
658 if self.local.child_ids().len() > before {
659 self.mark_sync_pending();
660 }
661 Ok(())
662 }
663
664 pub fn remove_child(&mut self, child: RegionId) {
666 let before = self.local.child_ids().len();
667 self.local.remove_child(child);
668 if self.local.child_ids().len() < before {
669 self.mark_sync_pending();
670 }
671 }
672
673 pub fn add_task(&mut self, task: TaskId) -> Result<(), Error> {
675 if !self.can_spawn() {
676 return Err(
677 Error::new(ErrorKind::RegionClosed).with_message("region not accepting new work")
678 );
679 }
680
681 let before = self.local.task_ids().len();
682 self.local
683 .add_task(task)
684 .map_err(|e| Error::new(ErrorKind::AdmissionDenied).with_message(format!("{e:?}")))?;
685 if self.local.task_ids().len() > before {
686 self.mark_sync_pending();
687 }
688 Ok(())
689 }
690
691 pub fn remove_task(&mut self, task: TaskId) {
693 let before = self.local.task_ids().len();
694 self.local.remove_task(task);
695 if self.local.task_ids().len() < before {
696 self.mark_sync_pending();
697 }
698 }
699
700 pub fn sync(&mut self) -> Result<SyncResult, Error> {
708 if !self.mode.is_replicated() || !self.sync_state.sync_pending || self.distributed.is_none()
709 {
710 return Ok(SyncResult::NotNeeded);
711 }
712
713 let snapshot = self.create_snapshot();
714 let seq = snapshot.sequence;
715
716 self.sync_state.last_synced_sequence = seq;
717 self.sync_state.sync_pending = false;
718 self.sync_state.pending_ops = 0;
719
720 Ok(SyncResult::Synced { sequence: seq })
721 }
722
723 #[must_use]
725 pub fn create_snapshot(&mut self) -> RegionSnapshot {
726 self.sequence += 1;
727
728 let tasks: Vec<TaskSnapshot> = self
729 .local
730 .task_ids()
731 .into_iter()
732 .map(|id| TaskSnapshot {
733 task_id: id,
734 state: TaskState::Running,
735 priority: 0,
736 })
737 .collect();
738
739 RegionSnapshot {
740 region_id: self.local.id,
741 state: self.local.state(),
742 timestamp: Time::ZERO,
743 sequence: self.sequence,
744 tasks,
745 children: self.local.child_ids(),
746 finalizer_count: self.local.finalizer_count() as u32,
747 budget: self.local.budget().to_distributed(),
748 cancel_reason: self
749 .local
750 .cancel_reason()
751 .map(|r| r.kind.as_str().to_owned()),
752 parent: self.local.parent,
753 metadata: vec![],
754 }
755 }
756
757 pub fn apply_snapshot(&mut self, snapshot: &RegionSnapshot) -> Result<(), Error> {
759 if snapshot.region_id != self.local.id {
760 return Err(Error::new(ErrorKind::ObjectMismatch)
761 .with_message("snapshot region ID does not match bridge"));
762 }
763
764 let budget = Budget {
766 deadline: snapshot.budget.deadline_nanos.map(Time::from_nanos),
767 poll_quota: snapshot.budget.polls_remaining.unwrap_or(0),
768 cost_quota: snapshot.budget.cost_remaining,
769 priority: 128, };
771
772 let cancel_reason = snapshot.cancel_reason.as_ref().map(|reason_str| {
774 let kind = match reason_str.as_str() {
776 "Timeout" => crate::types::cancel::CancelKind::Timeout,
777 "Deadline" => crate::types::cancel::CancelKind::Deadline,
778 "PollQuota" => crate::types::cancel::CancelKind::PollQuota,
779 "CostBudget" => crate::types::cancel::CancelKind::CostBudget,
780 "FailFast" => crate::types::cancel::CancelKind::FailFast,
781 "RaceLost" => crate::types::cancel::CancelKind::RaceLost,
782 "ParentCancelled" => crate::types::cancel::CancelKind::ParentCancelled,
783 "ResourceUnavailable" => crate::types::cancel::CancelKind::ResourceUnavailable,
784 "Shutdown" => crate::types::cancel::CancelKind::Shutdown,
785 "LinkedExit" => crate::types::cancel::CancelKind::LinkedExit,
786 _ => crate::types::cancel::CancelKind::User, };
788
789 crate::types::cancel::CancelReason::with_origin(
790 kind,
791 snapshot.region_id,
792 snapshot.timestamp,
793 )
794 });
795
796 let tasks: Vec<TaskId> = snapshot.tasks.iter().map(|t| t.task_id).collect();
798
799 self.local.apply_distributed_snapshot(
801 snapshot.state,
802 budget,
803 snapshot.children.clone(),
804 tasks,
805 cancel_reason,
806 );
807
808 self.sequence = self.sequence.max(snapshot.sequence);
810 self.sync_state.last_synced_sequence = snapshot.sequence;
811 self.sync_state.sync_pending = false;
812 self.sync_state.pending_ops = 0;
813
814 Ok(())
815 }
816
817 pub fn upgrade_to_distributed(
826 &mut self,
827 config: DistributedRegionConfig,
828 _replicas: &[ReplicaInfo],
829 ) -> Result<UpgradeResult, Error> {
830 if !self.config.allow_upgrade {
831 return Err(Error::new(ErrorKind::InvalidStateTransition)
832 .with_message("mode upgrade not allowed"));
833 }
834
835 if self.mode.is_replicated() {
836 return Err(Error::new(ErrorKind::InvalidStateTransition)
837 .with_message("already in distributed mode"));
838 }
839
840 if self.local.state() != RegionState::Open {
841 return Err(Error::new(ErrorKind::InvalidStateTransition)
842 .with_message("can only upgrade open regions"));
843 }
844
845 let snapshot = self.create_snapshot();
846 let snapshot_sequence = snapshot.sequence;
847
848 let replication_factor = config.replication_factor;
849 let consistency = config.write_consistency;
850
851 let distributed = DistributedRegionRecord::new(
852 self.local.id,
853 config,
854 self.local.parent,
855 self.local.budget(),
856 );
857
858 let previous_mode = self.mode;
859 self.distributed = Some(distributed);
860 self.mode = RegionMode::Distributed {
861 replication_factor,
862 consistency,
863 };
864
865 Ok(UpgradeResult {
866 previous_mode,
867 new_mode: self.mode,
868 snapshot_sequence,
869 })
870 }
871}
872
873#[cfg(test)]
878#[allow(clippy::similar_names)]
879mod tests {
880 use super::*;
881
882 #[test]
887 fn mode_local() {
888 let mode = RegionMode::local();
889 assert!(!mode.is_replicated());
890 assert!(!mode.is_distributed());
891 assert_eq!(mode.replication_factor(), 1);
892 }
893
894 #[test]
895 fn mode_distributed() {
896 let mode = RegionMode::distributed(3);
897 assert!(mode.is_replicated());
898 assert!(mode.is_distributed());
899 assert_eq!(mode.replication_factor(), 3);
900 }
901
902 #[test]
903 fn mode_hybrid() {
904 let mode = RegionMode::hybrid(2);
905 assert!(mode.is_replicated());
906 assert!(!mode.is_distributed());
907 assert_eq!(mode.replication_factor(), 2);
908 }
909
910 #[test]
911 fn mode_default_is_local() {
912 assert_eq!(RegionMode::default(), RegionMode::Local);
913 }
914
915 #[test]
920 fn effective_state_local_open() {
921 let state = EffectiveState::compute(RegionState::Open, None);
922 assert_eq!(state, EffectiveState::Open);
923 assert!(state.can_spawn());
924 assert!(!state.needs_recovery());
925 }
926
927 #[test]
928 fn effective_state_local_closing() {
929 let state = EffectiveState::compute(RegionState::Closing, None);
930 assert_eq!(state, EffectiveState::Closing);
931 assert!(!state.can_spawn());
932 }
933
934 #[test]
935 fn effective_state_local_closed() {
936 let state = EffectiveState::compute(RegionState::Closed, None);
937 assert_eq!(state, EffectiveState::Closed);
938 }
939
940 #[test]
941 fn effective_state_distributed_active() {
942 let state =
943 EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Active));
944 assert_eq!(state, EffectiveState::Open);
945 assert!(state.can_spawn());
946 }
947
948 #[test]
949 fn effective_state_distributed_initializing() {
950 let state = EffectiveState::compute(
951 RegionState::Open,
952 Some(DistributedRegionState::Initializing),
953 );
954 assert_eq!(state, EffectiveState::Open);
955 }
956
957 #[test]
958 fn effective_state_degraded() {
959 let state =
960 EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Degraded));
961 assert_eq!(state, EffectiveState::Degraded);
962 assert!(!state.can_spawn());
963 assert!(state.needs_recovery());
964 }
965
966 #[test]
967 fn effective_state_recovering() {
968 let state =
969 EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Recovering));
970 assert_eq!(state, EffectiveState::Recovering);
971 assert!(state.needs_recovery());
972 }
973
974 #[test]
975 fn effective_state_inconsistent() {
976 let state =
977 EffectiveState::compute(RegionState::Closed, Some(DistributedRegionState::Active));
978 assert!(state.is_inconsistent());
979 assert!(state.needs_recovery());
980 }
981
982 #[test]
983 fn effective_state_closing_distributed() {
984 let state =
985 EffectiveState::compute(RegionState::Closing, Some(DistributedRegionState::Closing));
986 assert_eq!(state, EffectiveState::Closing);
987 }
988
989 #[test]
990 fn effective_state_closed_distributed() {
991 let state =
992 EffectiveState::compute(RegionState::Closed, Some(DistributedRegionState::Closed));
993 assert_eq!(state, EffectiveState::Closed);
994 }
995
996 #[test]
1001 fn local_state_to_distributed() {
1002 assert_eq!(
1003 RegionState::Open.to_distributed(),
1004 DistributedRegionState::Active
1005 );
1006 assert_eq!(
1007 RegionState::Closing.to_distributed(),
1008 DistributedRegionState::Closing
1009 );
1010 assert_eq!(
1011 RegionState::Draining.to_distributed(),
1012 DistributedRegionState::Closing
1013 );
1014 assert_eq!(
1015 RegionState::Finalizing.to_distributed(),
1016 DistributedRegionState::Closing
1017 );
1018 assert_eq!(
1019 RegionState::Closed.to_distributed(),
1020 DistributedRegionState::Closed
1021 );
1022 }
1023
1024 #[test]
1025 fn distributed_state_to_local() {
1026 assert_eq!(DistributedRegionState::Active.to_local(), RegionState::Open);
1027 assert_eq!(
1028 DistributedRegionState::Initializing.to_local(),
1029 RegionState::Open
1030 );
1031 assert_eq!(
1032 DistributedRegionState::Degraded.to_local(),
1033 RegionState::Open
1034 );
1035 assert_eq!(
1036 DistributedRegionState::Recovering.to_local(),
1037 RegionState::Open
1038 );
1039 assert_eq!(
1040 DistributedRegionState::Closing.to_local(),
1041 RegionState::Closing
1042 );
1043 assert_eq!(
1044 DistributedRegionState::Closed.to_local(),
1045 RegionState::Closed
1046 );
1047 }
1048
1049 #[test]
1050 fn is_lossless_conversion() {
1051 assert!(DistributedRegionState::Active.is_lossless());
1052 assert!(DistributedRegionState::Closing.is_lossless());
1053 assert!(DistributedRegionState::Closed.is_lossless());
1054 assert!(!DistributedRegionState::Degraded.is_lossless());
1055 assert!(!DistributedRegionState::Recovering.is_lossless());
1056 assert!(!DistributedRegionState::Initializing.is_lossless());
1057 }
1058
1059 #[test]
1060 fn budget_to_distributed() {
1061 let budget = Budget::new().with_poll_quota(100).with_cost_quota(500);
1062 let snapshot = budget.to_distributed();
1063
1064 assert_eq!(snapshot.polls_remaining, Some(100));
1065 assert_eq!(snapshot.cost_remaining, Some(500));
1066 }
1067
1068 #[test]
1073 fn bridge_new_local() {
1074 let bridge = RegionBridge::new_local(RegionId::new_for_test(1, 0), None, Budget::default());
1075
1076 assert_eq!(bridge.mode(), RegionMode::Local);
1077 assert!(bridge.distributed().is_none());
1078 assert!(bridge.can_spawn());
1079 assert_eq!(bridge.local_state(), RegionState::Open);
1080 }
1081
1082 #[test]
1083 fn bridge_new_distributed() {
1084 let bridge = RegionBridge::new_distributed(
1085 RegionId::new_for_test(1, 0),
1086 None,
1087 Budget::default(),
1088 DistributedRegionConfig::default(),
1089 );
1090
1091 assert!(bridge.mode().is_distributed());
1092 assert!(bridge.distributed().is_some());
1093 }
1094
1095 #[test]
1096 fn bridge_with_mode_local() {
1097 let bridge = RegionBridge::with_mode(
1098 RegionId::new_for_test(1, 0),
1099 None,
1100 Budget::default(),
1101 RegionMode::Local,
1102 );
1103
1104 assert_eq!(bridge.mode(), RegionMode::Local);
1105 }
1106
1107 #[test]
1108 fn bridge_with_mode_distributed() {
1109 let bridge = RegionBridge::with_mode(
1110 RegionId::new_for_test(1, 0),
1111 None,
1112 Budget::default(),
1113 RegionMode::distributed(3),
1114 );
1115
1116 assert!(bridge.mode().is_distributed());
1117 assert!(bridge.distributed().is_some());
1118 }
1119
1120 #[test]
1125 fn bridge_begin_close_local() {
1126 let mut bridge = create_local_bridge();
1127
1128 let result = bridge.begin_close(None, Time::from_secs(0)).unwrap();
1129
1130 assert!(result.local_changed);
1131 assert!(result.distributed_transition.is_none());
1132 assert_eq!(result.effective_state, EffectiveState::Closing);
1133 }
1134
1135 #[test]
1136 fn bridge_begin_close_distributed() {
1137 let mut bridge = create_distributed_bridge();
1138 if let Some(ref mut dist) = bridge.distributed {
1140 let _ = dist.activate(Time::from_secs(0));
1141 }
1142
1143 let result = bridge.begin_close(None, Time::from_secs(1)).unwrap();
1144
1145 assert!(result.local_changed);
1146 assert!(result.distributed_transition.is_some());
1147 assert_eq!(result.effective_state, EffectiveState::Closing);
1148 }
1149
1150 #[test]
1151 fn bridge_full_lifecycle() {
1152 let mut bridge = create_local_bridge();
1153
1154 bridge.begin_close(None, Time::from_secs(0)).unwrap();
1156 assert!(!bridge.can_spawn());
1157
1158 bridge.begin_drain().unwrap();
1160
1161 bridge.begin_finalize().unwrap();
1163
1164 bridge.complete_close(Time::from_secs(1)).unwrap();
1166 assert_eq!(bridge.effective_state(), EffectiveState::Closed);
1167 }
1168
1169 #[test]
1170 fn bridge_cannot_spawn_when_closed() {
1171 let mut bridge = create_local_bridge();
1172 bridge.begin_close(None, Time::from_secs(0)).unwrap();
1173
1174 let result = bridge.add_task(TaskId::new_for_test(1, 0));
1175 assert!(result.is_err());
1176 }
1177
1178 #[test]
1183 fn bridge_add_remove_task() {
1184 let mut bridge = create_local_bridge();
1185 let task_id = TaskId::new_for_test(1, 0);
1186
1187 bridge.add_task(task_id).unwrap();
1188 assert!(bridge.has_live_work());
1189 assert!(bridge.sync_state.sync_pending);
1190
1191 bridge.remove_task(task_id);
1192 assert!(!bridge.has_live_work());
1193 }
1194
1195 #[test]
1196 fn bridge_add_remove_child() {
1197 let mut bridge = create_local_bridge();
1198 let child_id = RegionId::new_for_test(2, 0);
1199
1200 bridge.add_child(child_id).unwrap();
1201 assert!(bridge.has_live_work());
1202
1203 bridge.remove_child(child_id);
1204 assert!(!bridge.has_live_work());
1205 }
1206
1207 #[test]
1212 fn sync_not_needed_local() {
1213 let mut bridge = create_local_bridge();
1214 let result = bridge.sync().unwrap();
1215 assert!(matches!(result, SyncResult::NotNeeded));
1216 }
1217
1218 #[test]
1219 fn sync_after_changes() {
1220 let mut bridge = create_distributed_bridge();
1221 bridge.sync_state.sync_pending = true;
1222
1223 let result = bridge.sync().unwrap();
1224 assert!(matches!(result, SyncResult::Synced { .. }));
1225 assert!(!bridge.sync_state.sync_pending);
1226 }
1227
1228 #[test]
1233 fn create_snapshot_increments_sequence() {
1234 let mut bridge = create_local_bridge();
1235
1236 let snap1 = bridge.create_snapshot();
1237 let snap2 = bridge.create_snapshot();
1238
1239 assert_eq!(snap1.sequence, 1);
1240 assert_eq!(snap2.sequence, 2);
1241 assert_eq!(snap1.region_id, bridge.id());
1242 }
1243
1244 #[test]
1245 fn snapshot_includes_tasks() {
1246 let mut bridge = create_local_bridge();
1247 bridge.add_task(TaskId::new_for_test(1, 0)).unwrap();
1248 bridge.add_task(TaskId::new_for_test(2, 0)).unwrap();
1249
1250 let snap = bridge.create_snapshot();
1251 assert_eq!(snap.tasks.len(), 2);
1252 }
1253
1254 #[test]
1255 fn apply_snapshot_updates_sync_state() {
1256 let mut bridge = create_local_bridge();
1257 bridge.sync_state.sync_pending = true;
1258 bridge.sync_state.pending_ops = 7;
1259
1260 let snap = RegionSnapshot {
1261 region_id: bridge.id(),
1262 state: RegionState::Open,
1263 timestamp: Time::from_secs(100),
1264 sequence: 42,
1265 tasks: vec![],
1266 children: vec![],
1267 finalizer_count: 0,
1268 budget: BudgetSnapshot {
1269 deadline_nanos: None,
1270 polls_remaining: None,
1271 cost_remaining: None,
1272 },
1273 cancel_reason: None,
1274 parent: None,
1275 metadata: vec![],
1276 };
1277
1278 bridge.apply_snapshot(&snap).unwrap();
1279 assert_eq!(bridge.sync_state.last_synced_sequence, 42);
1280 assert!(!bridge.sync_state.sync_pending);
1281 assert_eq!(bridge.sync_state.pending_ops, 0);
1282 }
1283
1284 #[test]
1285 fn apply_snapshot_advances_local_sequence_counter() {
1286 let mut bridge = create_local_bridge();
1287
1288 let snap = RegionSnapshot {
1289 region_id: bridge.id(),
1290 state: RegionState::Open,
1291 timestamp: Time::from_secs(100),
1292 sequence: 42,
1293 tasks: vec![],
1294 children: vec![],
1295 finalizer_count: 0,
1296 budget: BudgetSnapshot {
1297 deadline_nanos: None,
1298 polls_remaining: None,
1299 cost_remaining: None,
1300 },
1301 cancel_reason: None,
1302 parent: None,
1303 metadata: vec![],
1304 };
1305
1306 bridge.apply_snapshot(&snap).unwrap();
1307
1308 let next = bridge.create_snapshot();
1309 assert_eq!(next.sequence, 43);
1310 }
1311
1312 #[test]
1313 fn apply_snapshot_mismatch() {
1314 let mut bridge = create_local_bridge();
1315
1316 let snap = RegionSnapshot {
1317 region_id: RegionId::new_for_test(999, 0),
1318 state: RegionState::Open,
1319 timestamp: Time::ZERO,
1320 sequence: 1,
1321 tasks: vec![],
1322 children: vec![],
1323 finalizer_count: 0,
1324 budget: BudgetSnapshot {
1325 deadline_nanos: None,
1326 polls_remaining: None,
1327 cost_remaining: None,
1328 },
1329 cancel_reason: None,
1330 parent: None,
1331 metadata: vec![],
1332 };
1333
1334 let result = bridge.apply_snapshot(&snap);
1335 assert!(result.is_err());
1336 assert_eq!(result.unwrap_err().kind(), ErrorKind::ObjectMismatch);
1337 }
1338
1339 #[test]
1344 fn upgrade_local_to_distributed() {
1345 let mut bridge = create_local_bridge();
1346
1347 let config = DistributedRegionConfig {
1348 replication_factor: 3,
1349 ..Default::default()
1350 };
1351 let replicas = create_test_replicas(3);
1352
1353 let result = bridge.upgrade_to_distributed(config, &replicas).unwrap();
1354
1355 assert_eq!(result.previous_mode, RegionMode::Local);
1356 assert!(result.new_mode.is_distributed());
1357 assert!(bridge.distributed().is_some());
1358 }
1359
1360 #[test]
1361 fn upgrade_not_allowed() {
1362 let mut bridge = create_local_bridge();
1363 bridge.config.allow_upgrade = false;
1364
1365 let result = bridge
1366 .upgrade_to_distributed(DistributedRegionConfig::default(), &create_test_replicas(3));
1367
1368 assert!(result.is_err());
1369 assert_eq!(
1370 result.unwrap_err().kind(),
1371 ErrorKind::InvalidStateTransition
1372 );
1373 }
1374
1375 #[test]
1376 fn upgrade_already_distributed() {
1377 let mut bridge = create_distributed_bridge();
1378
1379 let result = bridge
1380 .upgrade_to_distributed(DistributedRegionConfig::default(), &create_test_replicas(3));
1381
1382 assert!(result.is_err());
1383 }
1384
1385 #[test]
1386 fn upgrade_only_from_open() {
1387 let mut bridge = create_local_bridge();
1388 bridge.begin_close(None, Time::from_secs(0)).unwrap();
1389
1390 let result = bridge
1391 .upgrade_to_distributed(DistributedRegionConfig::default(), &create_test_replicas(3));
1392
1393 assert!(result.is_err());
1394 }
1395
1396 fn create_local_bridge() -> RegionBridge {
1401 RegionBridge::new_local(RegionId::new_for_test(1, 0), None, Budget::default())
1402 }
1403
1404 fn create_distributed_bridge() -> RegionBridge {
1405 RegionBridge::new_distributed(
1406 RegionId::new_for_test(1, 0),
1407 None,
1408 Budget::default(),
1409 DistributedRegionConfig::default(),
1410 )
1411 }
1412
1413 fn create_test_replicas(count: usize) -> Vec<ReplicaInfo> {
1414 (0..count)
1415 .map(|i| ReplicaInfo::new(&format!("r{i}"), &format!("addr{i}")))
1416 .collect()
1417 }
1418
/// Upgrading a local bridge succeeds while tasks are registered; the bridge
/// still reports live work afterwards and the upgrade carries a non-zero
/// snapshot sequence.
#[test]
fn upgrade_while_tasks_running() {
    let mut bridge = create_local_bridge();

    // Register two tasks so the bridge has live work before the upgrade.
    for task_no in [1, 2] {
        bridge.add_task(TaskId::new_for_test(task_no, 0)).unwrap();
    }
    assert!(bridge.has_live_work());

    let upgrade_config = DistributedRegionConfig {
        replication_factor: 3,
        ..Default::default()
    };
    let replicas = create_test_replicas(3);
    let upgrade = bridge
        .upgrade_to_distributed(upgrade_config, &replicas)
        .unwrap();

    assert!(upgrade.new_mode.is_distributed());
    assert!(bridge.has_live_work());
    assert!(upgrade.snapshot_sequence > 0);
}
1445
/// Each snapshot taken across twenty add/remove cycles must carry a
/// sequence strictly greater than the previous one (starting above zero).
#[test]
fn snapshot_monotonic_under_rapid_changes() {
    let mut bridge = create_local_bridge();
    let mut last_seen = 0;

    for task_no in 0..20 {
        let task = TaskId::new_for_test(task_no, 0);
        bridge.add_task(task).unwrap();

        let current = bridge.create_snapshot().sequence;
        assert!(
            current > last_seen,
            "sequence must be monotonically increasing"
        );
        last_seen = current;

        bridge.remove_task(task);
    }
}
1464
/// A second `begin_close` on a local bridge succeeds without reporting a
/// local change, and the effective state is `Closing`.
#[test]
fn double_close_local() {
    let mut bridge = create_local_bridge();

    // First request performs the transition.
    let first = bridge.begin_close(None, Time::from_secs(0)).unwrap();
    assert!(first.local_changed);

    // Repeating the request must be accepted but change nothing locally.
    let second = bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(!second.local_changed);
    assert_eq!(second.effective_state, EffectiveState::Closing);
}
1477
/// A second `begin_close` on a distributed bridge succeeds but reports
/// neither a local change nor a distributed transition; the effective state
/// is `Closing` after both calls.
#[test]
fn double_close_distributed() {
    let mut bridge = create_distributed_bridge();

    // First request: local change plus a distributed transition.
    let first = bridge.begin_close(None, Time::from_secs(0)).unwrap();
    assert!(first.local_changed);
    assert!(first.distributed_transition.is_some());
    assert_eq!(first.effective_state, EffectiveState::Closing);

    // Second request: accepted, nothing changes.
    let second = bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(!second.local_changed);
    assert!(second.distributed_transition.is_none());
    assert_eq!(second.effective_state, EffectiveState::Closing);
}
1492
/// A second `complete_close` on a local bridge succeeds without reporting a
/// local change, and the effective state stays `Closed`.
///
/// The final `effective_state` assertion mirrors the checks made by
/// `double_complete_close_distributed` and `double_close_local`, so the
/// repeated call is verified the same way in every variant.
#[test]
fn double_complete_close_local() {
    let mut bridge = create_local_bridge();

    // Walk the bridge through close -> drain -> finalize before completing.
    bridge.begin_close(None, Time::from_secs(0)).unwrap();
    bridge.begin_drain().unwrap();
    bridge.begin_finalize().unwrap();

    let first = bridge.complete_close(Time::from_secs(1)).unwrap();
    assert!(first.local_changed);
    assert_eq!(first.effective_state, EffectiveState::Closed);

    // Repeating the completion must be accepted but change nothing.
    let second = bridge.complete_close(Time::from_secs(2)).unwrap();
    assert!(!second.local_changed);
    assert_eq!(second.effective_state, EffectiveState::Closed);
}
1508
/// A second `complete_close` on a distributed bridge succeeds but reports
/// neither a local change nor a distributed transition; the effective state
/// is `Closed` after both calls.
#[test]
fn double_complete_close_distributed() {
    let mut bridge = create_distributed_bridge();

    // Walk the bridge through close -> drain -> finalize before completing.
    bridge.begin_close(None, Time::from_secs(0)).unwrap();
    bridge.begin_drain().unwrap();
    bridge.begin_finalize().unwrap();

    let first = bridge.complete_close(Time::from_secs(1)).unwrap();
    assert!(first.local_changed);
    assert!(first.distributed_transition.is_some());
    assert_eq!(first.effective_state, EffectiveState::Closed);

    let second = bridge.complete_close(Time::from_secs(2)).unwrap();
    assert!(!second.local_changed);
    assert!(second.distributed_transition.is_none());
    assert_eq!(second.effective_state, EffectiveState::Closed);
}
1526
/// `begin_close` accepts a cancel reason (`CancelReason::timeout()`) and
/// still reports a local change with effective state `Closing`.
#[test]
fn close_with_cancel_reason() {
    let mut bridge = create_local_bridge();

    let outcome = bridge
        .begin_close(Some(CancelReason::timeout()), Time::from_secs(0))
        .unwrap();

    assert!(outcome.local_changed);
    assert_eq!(outcome.effective_state, EffectiveState::Closing);
}
1539
/// After `begin_close` has succeeded, `add_child` must return an error.
#[test]
fn add_child_after_close_rejected() {
    let mut bridge = create_local_bridge();
    bridge.begin_close(None, Time::from_secs(0)).unwrap();

    let late_child = RegionId::new_for_test(2, 0);
    let outcome = bridge.add_child(late_child);

    assert!(outcome.is_err());
}
1548
/// A freshly created distributed bridge has no sync pending, and `sync`
/// reports `SyncResult::NotNeeded`.
#[test]
fn sync_not_needed_when_no_changes() {
    let mut bridge = create_distributed_bridge();
    assert!(!bridge.sync_state.sync_pending);

    let outcome = bridge.sync().unwrap();

    assert!(matches!(outcome, SyncResult::NotNeeded));
}
1558
/// With `sync_pending` set and five pending operations, `sync` returns
/// `SyncResult::Synced` and resets both the counter and the flag.
#[test]
fn sync_clears_pending_ops() {
    let mut bridge = create_distributed_bridge();

    // Force the bridge into a "needs sync" state by hand.
    bridge.sync_state.sync_pending = true;
    bridge.sync_state.pending_ops = 5;

    let outcome = bridge.sync().unwrap();

    assert!(matches!(outcome, SyncResult::Synced { .. }));
    assert_eq!(bridge.sync_state.pending_ops, 0);
    assert!(!bridge.sync_state.sync_pending);
}
1570
/// Eight task/child calls are issued against a distributed bridge, yet the
/// test expects `pending_ops` to end at exactly 4 and `sync_pending` to be
/// set. Per the test's name, repeated adds and removals of unknown ids are
/// the calls expected not to count.
#[test]
fn pending_ops_counts_only_real_mutations() {
    let mut bridge = create_distributed_bridge();

    let task = TaskId::new_for_test(1, 0);
    let unknown_task = TaskId::new_for_test(999, 0);
    let child = RegionId::new_for_test(2, 0);
    let unknown_child = RegionId::new_for_test(777, 0);

    // Task operations: add, repeated add, remove of an id never added, remove.
    bridge.add_task(task).unwrap();
    bridge.add_task(task).unwrap();
    bridge.remove_task(unknown_task);
    bridge.remove_task(task);

    // Child operations, in the same pattern as the task operations above.
    bridge.add_child(child).unwrap();
    bridge.add_child(child).unwrap();
    bridge.remove_child(unknown_child);
    bridge.remove_child(child);

    assert!(bridge.sync_state.sync_pending);
    assert_eq!(bridge.sync_state.pending_ops, 4);
}
1588
/// `begin_close` on a distributed bridge must set `sync_pending` and record
/// at least one pending operation; neither is set beforehand.
#[test]
fn close_transitions_mark_sync_pending() {
    let mut bridge = create_distributed_bridge();

    // Activate the distributed record when present; the result of
    // `activate` is intentionally discarded, as in the original test.
    if let Some(dist) = bridge.distributed.as_mut() {
        let _ = dist.activate(Time::from_secs(0));
    }
    assert!(!bridge.sync_state.sync_pending);
    assert_eq!(bridge.sync_state.pending_ops, 0);

    bridge.begin_close(None, Time::from_secs(1)).unwrap();

    assert!(bridge.sync_state.sync_pending);
    assert!(bridge.sync_state.pending_ops >= 1);
}
1602
/// After two snapshots the bridge sequence is 2, and the upgrade that
/// follows reports `snapshot_sequence == 3`.
#[test]
fn upgrade_snapshot_sequence_matches() {
    let mut bridge = create_local_bridge();

    // Take two throwaway snapshots to advance the sequence counter.
    for _ in 0..2 {
        let _ = bridge.create_snapshot();
    }
    assert_eq!(bridge.sequence, 2);

    let upgrade_config = DistributedRegionConfig {
        replication_factor: 3,
        ..Default::default()
    };
    let replicas = create_test_replicas(3);
    let upgrade = bridge
        .upgrade_to_distributed(upgrade_config, &replicas)
        .unwrap();

    assert_eq!(upgrade.snapshot_sequence, 3);
}
1623
/// A bridge built with `RegionMode::hybrid(2)` reports a replicated but not
/// distributed mode, and exposes no distributed record.
#[test]
fn bridge_with_mode_hybrid() {
    let region = RegionId::new_for_test(1, 0);
    let budget = Budget::default();
    let hybrid = RegionMode::hybrid(2);

    let bridge = RegionBridge::with_mode(region, None, budget, hybrid);

    assert!(bridge.mode().is_replicated());
    assert!(!bridge.mode().is_distributed());
    assert!(bridge.distributed().is_none());
}
1638
/// Local `Draining` combined with distributed `Closing` computes to
/// `EffectiveState::Closing`.
#[test]
fn effective_state_draining_with_distributed_closing() {
    let local = RegionState::Draining;
    let distributed = Some(DistributedRegionState::Closing);

    let state = EffectiveState::compute(local, distributed);

    assert_eq!(state, EffectiveState::Closing);
}
1645
/// Local `Finalizing` combined with distributed `Closing` computes to
/// `EffectiveState::Closing`.
#[test]
fn effective_state_finalizing_with_distributed_closing() {
    let local = RegionState::Finalizing;
    let distributed = Some(DistributedRegionState::Closing);

    let state = EffectiveState::compute(local, distributed);

    assert_eq!(state, EffectiveState::Closing);
}
1654
/// Pins the default `BridgeConfig`: upgrades allowed, a five second sync
/// timeout, synchronous sync mode and distributed-wins conflict resolution.
#[test]
fn bridge_config_defaults() {
    let defaults = BridgeConfig::default();
    let expected_timeout = Duration::from_secs(5);

    assert!(defaults.allow_upgrade);
    assert_eq!(defaults.sync_timeout, expected_timeout);
    assert_eq!(defaults.sync_mode, SyncMode::Synchronous);
    assert_eq!(
        defaults.conflict_resolution,
        ConflictResolution::DistributedWins
    );
}
1666
/// Pins the default `SyncState`: sequence 0, nothing pending, and no
/// recorded sync time or error.
#[test]
fn sync_state_default() {
    let fresh = SyncState::default();

    assert_eq!(fresh.last_synced_sequence, 0);
    assert!(!fresh.sync_pending);
    assert_eq!(fresh.pending_ops, 0);
    assert!(fresh.last_sync_time.is_none());
    assert!(fresh.last_sync_error.is_none());
}
1676
/// A snapshot taken after adding two children lists exactly two children.
#[test]
fn snapshot_includes_children() {
    let mut bridge = create_local_bridge();
    for child_no in [2, 3] {
        bridge
            .add_child(RegionId::new_for_test(child_no, 0))
            .unwrap();
    }

    let snapshot = bridge.create_snapshot();

    assert_eq!(snapshot.children.len(), 2);
}
1686
/// Exercises the derived traits on `RegionMode`: `Default` yields `Local`,
/// `Debug` names the variant, copies compare equal, and distinct variants
/// compare unequal.
#[test]
fn region_mode_debug_clone_copy_default_eq() {
    // Default + Debug.
    let m = RegionMode::default();
    assert_eq!(m, RegionMode::Local);
    let dbg = format!("{m:?}");
    assert!(dbg.contains("Local"), "{dbg}");

    // Two by-value copies of the same mode must be equal to each other.
    let replicated = RegionMode::distributed(3);
    let first_copy: RegionMode = replicated;
    let second_copy = replicated;
    assert_eq!(first_copy, second_copy);
    assert_ne!(replicated, RegionMode::Local);
}
1700
/// Exercises the derived traits on `SyncMode`: `Debug` names the variant,
/// copies compare equal, and distinct variants compare unequal.
#[test]
fn sync_mode_debug_clone_copy_eq() {
    let s = SyncMode::Synchronous;

    let dbg = format!("{s:?}");
    assert!(dbg.contains("Synchronous"), "{dbg}");

    let first_copy: SyncMode = s;
    let second_copy = s;
    assert_eq!(first_copy, second_copy);
    assert_ne!(s, SyncMode::Asynchronous);
}
1711
/// Exercises the derived traits on `ConflictResolution`: `Debug` names the
/// variant and copies compare equal.
#[test]
fn conflict_resolution_debug_clone_copy_eq() {
    let c = ConflictResolution::DistributedWins;

    let dbg = format!("{c:?}");
    assert!(dbg.contains("DistributedWins"), "{dbg}");

    let first_copy: ConflictResolution = c;
    let second_copy = c;
    assert_eq!(first_copy, second_copy);
}
1721
/// The default `BridgeConfig` has a `Debug` rendering containing its type
/// name, allows upgrades, and renders identically after being rebound.
#[test]
fn bridge_config_debug_clone_default() {
    let c = BridgeConfig::default();

    let dbg = format!("{c:?}");
    assert!(dbg.contains("BridgeConfig"), "{dbg}");
    assert!(c.allow_upgrade);

    // Rebinding the value must not alter its Debug output.
    let cloned = c;
    let cloned_dbg = format!("{cloned:?}");
    assert_eq!(cloned_dbg, dbg);
}
1731
/// Exercises the derived traits on `EffectiveState`: `Debug` names the
/// variant, copies compare equal, and distinct variants compare unequal.
#[test]
fn effective_state_debug_clone_copy_eq() {
    let e = EffectiveState::Open;

    let dbg = format!("{e:?}");
    assert!(dbg.contains("Open"), "{dbg}");

    let first_copy: EffectiveState = e;
    let second_copy = e;
    assert_eq!(first_copy, second_copy);
    assert_ne!(e, EffectiveState::Closed);
}
1742
/// The default `SyncState` has a `Debug` rendering containing its type
/// name, zero pending operations, and renders identically after being
/// rebound.
#[test]
fn sync_state_debug_clone_default() {
    let s = SyncState::default();

    let dbg = format!("{s:?}");
    assert!(dbg.contains("SyncState"), "{dbg}");
    assert_eq!(s.pending_ops, 0);

    // Rebinding the value must not alter its Debug output.
    let cloned = s;
    let cloned_dbg = format!("{cloned:?}");
    assert_eq!(cloned_dbg, dbg);
}
1752
/// Drives a distributed bridge through the whole close sequence
/// (`begin_close` -> `begin_drain` -> `begin_finalize` -> `complete_close`)
/// and expects the effective state to end at `Closed`.
#[test]
fn distributed_close_full_lifecycle() {
    let mut bridge = create_distributed_bridge();

    // Activate the distributed record when present; the result of
    // `activate` is intentionally discarded, as in the original test.
    if let Some(dist) = bridge.distributed.as_mut() {
        let _ = dist.activate(Time::from_secs(0));
    }

    let close_started = bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(close_started.local_changed);
    assert!(close_started.distributed_transition.is_some());

    bridge.begin_drain().unwrap();
    bridge.begin_finalize().unwrap();

    let close_finished = bridge.complete_close(Time::from_secs(2)).unwrap();
    assert_eq!(close_finished.effective_state, EffectiveState::Closed);
}
1774
/// Every listed (local, distributed) pair must compute to
/// `EffectiveState::Inconsistent`, and the variant must carry both input
/// states unchanged.
#[test]
fn effective_state_inconsistent_pairs_are_exhaustive() {
    let inconsistent_pairs: &[(RegionState, DistributedRegionState)] = &[
        // Local already closed while the distributed side is not.
        (RegionState::Closed, DistributedRegionState::Active),
        (RegionState::Closed, DistributedRegionState::Initializing),
        (RegionState::Closed, DistributedRegionState::Degraded),
        (RegionState::Closed, DistributedRegionState::Recovering),
        (RegionState::Closed, DistributedRegionState::Closing),
        // Local closing.
        (RegionState::Closing, DistributedRegionState::Active),
        (RegionState::Closing, DistributedRegionState::Initializing),
        (RegionState::Closing, DistributedRegionState::Degraded),
        (RegionState::Closing, DistributedRegionState::Recovering),
        (RegionState::Closing, DistributedRegionState::Closed),
        // Local draining.
        (RegionState::Draining, DistributedRegionState::Active),
        (RegionState::Draining, DistributedRegionState::Initializing),
        (RegionState::Draining, DistributedRegionState::Degraded),
        (RegionState::Draining, DistributedRegionState::Recovering),
        (RegionState::Draining, DistributedRegionState::Closed),
        // Local finalizing.
        (RegionState::Finalizing, DistributedRegionState::Active),
        (
            RegionState::Finalizing,
            DistributedRegionState::Initializing,
        ),
        (RegionState::Finalizing, DistributedRegionState::Degraded),
        (RegionState::Finalizing, DistributedRegionState::Recovering),
        (RegionState::Finalizing, DistributedRegionState::Closed),
        // Local still open while the distributed side is shutting down.
        (RegionState::Open, DistributedRegionState::Closing),
        (RegionState::Open, DistributedRegionState::Closed),
    ];

    for &(local, distributed) in inconsistent_pairs {
        let state = EffectiveState::compute(local, Some(distributed));
        assert!(
            state.is_inconsistent(),
            "({local:?}, {distributed:?}) should be Inconsistent, got {state:?}"
        );

        // The variant must echo back exactly the states it was built from.
        if let EffectiveState::Inconsistent {
            local: l,
            distributed: d,
        } = state
        {
            assert_eq!(l, local, "local state not preserved");
            assert_eq!(d, distributed, "distributed state not preserved");
        }
    }
}
1829
/// A bridge built with `RegionMode::hybrid(3)` reports a replicated mode,
/// yet `sync` returns `SyncResult::NotNeeded`.
#[test]
fn hybrid_mode_sync_not_needed_without_distributed_record() {
    let region = RegionId::new_for_test(1, 0);
    let budget = Budget::default();
    let mut bridge = RegionBridge::with_mode(region, None, budget, RegionMode::hybrid(3));
    assert!(bridge.mode().is_replicated());

    let outcome = bridge.sync().unwrap();

    assert!(
        matches!(outcome, SyncResult::NotNeeded),
        "hybrid mode without distributed record must report NotNeeded"
    );
}
1847
/// Even with `sync_pending` set and three pending operations forced by
/// hand, `sync` on a `RegionMode::hybrid(3)` bridge returns
/// `SyncResult::NotNeeded`.
#[test]
fn hybrid_mode_sync_not_needed_with_pending_ops() {
    let region = RegionId::new_for_test(1, 0);
    let budget = Budget::default();
    let mut bridge = RegionBridge::with_mode(region, None, budget, RegionMode::hybrid(3));

    // Simulate outstanding work that would otherwise call for a sync.
    bridge.sync_state.sync_pending = true;
    bridge.sync_state.pending_ops = 3;

    let outcome = bridge.sync().unwrap();

    assert!(
        matches!(outcome, SyncResult::NotNeeded),
        "hybrid mode without distributed record must report NotNeeded even with pending ops"
    );
}
1869}