Skip to main content

asupersync/distributed/
bridge.rs

1//! Bridge between local and distributed region operations.
2//!
3//! Provides transparent upgrade paths from local to distributed operation,
4//! lifecycle synchronization between local and distributed state machines,
5//! and type conversions that preserve structured concurrency guarantees.
6//!
7//! # Architecture
8//!
9//! ```text
10//! RegionRecord ↔ RegionBridge ↔ DistributedRegionRecord
11//! ```
12
13#![allow(clippy::result_large_err)]
14
15use std::time::Duration;
16
17use super::snapshot::{BudgetSnapshot, RegionSnapshot, TaskSnapshot, TaskState};
18use crate::error::{Error, ErrorKind};
19use crate::record::distributed_region::{
20    ConsistencyLevel, DistributedRegionConfig, DistributedRegionRecord, DistributedRegionState,
21    ReplicaInfo, StateTransition, TransitionReason,
22};
23use crate::record::region::{RegionRecord, RegionState};
24use crate::types::budget::Budget;
25use crate::types::cancel::CancelReason;
26use crate::types::{RegionId, TaskId, Time};
27
28// ---------------------------------------------------------------------------
29// RegionMode
30// ---------------------------------------------------------------------------
31
32/// Operating mode for a region.
33///
34/// Determines whether a region operates locally or with distributed
35/// replication. Can be promoted (but not demoted) during the region's
36/// lifetime.
37#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
38pub enum RegionMode {
39    /// Local operation only — no replication.
40    #[default]
41    Local,
42    /// Distributed operation with configurable replication.
43    Distributed {
44        /// Number of replicas.
45        replication_factor: u32,
46        /// Consistency level for operations.
47        consistency: ConsistencyLevel,
48    },
49    /// Hybrid mode — local primary with async replication.
50    Hybrid {
51        /// Number of backup replicas.
52        replication_factor: u32,
53        /// Maximum replication lag before blocking.
54        max_lag: Duration,
55    },
56}
57
58impl RegionMode {
59    /// Creates a local-only mode.
60    #[must_use]
61    pub const fn local() -> Self {
62        Self::Local
63    }
64
65    /// Creates a distributed mode with quorum consistency.
66    #[must_use]
67    pub fn distributed(replication_factor: u32) -> Self {
68        Self::Distributed {
69            replication_factor,
70            consistency: ConsistencyLevel::Quorum,
71        }
72    }
73
74    /// Creates a hybrid mode with async replication.
75    #[must_use]
76    pub fn hybrid(replication_factor: u32) -> Self {
77        Self::Hybrid {
78            replication_factor,
79            max_lag: Duration::from_secs(5),
80        }
81    }
82
83    /// Returns true if this mode involves any replication.
84    #[must_use]
85    pub const fn is_replicated(&self) -> bool {
86        !matches!(self, Self::Local)
87    }
88
89    /// Returns true if this mode is fully distributed.
90    #[must_use]
91    pub const fn is_distributed(&self) -> bool {
92        matches!(self, Self::Distributed { .. })
93    }
94
95    /// Returns the replication factor, or 1 for local mode.
96    #[must_use]
97    pub const fn replication_factor(&self) -> u32 {
98        match self {
99            Self::Local => 1,
100            Self::Distributed {
101                replication_factor, ..
102            }
103            | Self::Hybrid {
104                replication_factor, ..
105            } => *replication_factor,
106        }
107    }
108}
109
110// ---------------------------------------------------------------------------
111// BridgeConfig / SyncMode / ConflictResolution
112// ---------------------------------------------------------------------------
113
114/// How synchronization is performed.
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub enum SyncMode {
117    /// Operations block until replicated.
118    Synchronous,
119    /// Operations complete locally, replicate in background.
120    Asynchronous,
121    /// Block only for writes, reads are local.
122    WriteSync,
123}
124
125/// How to resolve conflicts between local and distributed state.
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum ConflictResolution {
128    /// Distributed state wins.
129    DistributedWins,
130    /// Local state wins.
131    LocalWins,
132    /// Use highest sequence number.
133    HighestSequence,
134    /// Report error on conflict.
135    Error,
136}
137
138/// Configuration for bridge behavior.
139#[derive(Debug, Clone)]
140pub struct BridgeConfig {
141    /// Whether to allow mode upgrades during lifetime.
142    pub allow_upgrade: bool,
143    /// Timeout for synchronization operations.
144    pub sync_timeout: Duration,
145    /// How synchronization is performed.
146    pub sync_mode: SyncMode,
147    /// Conflict resolution strategy.
148    pub conflict_resolution: ConflictResolution,
149}
150
151impl Default for BridgeConfig {
152    fn default() -> Self {
153        Self {
154            allow_upgrade: true,
155            sync_timeout: Duration::from_secs(5),
156            sync_mode: SyncMode::Synchronous,
157            conflict_resolution: ConflictResolution::DistributedWins,
158        }
159    }
160}
161
162// ---------------------------------------------------------------------------
163// SyncState
164// ---------------------------------------------------------------------------
165
166/// Current synchronization state between local and distributed.
167#[derive(Debug, Clone, Default)]
168pub struct SyncState {
169    /// Last successfully synchronized sequence number.
170    pub last_synced_sequence: u64,
171    /// Whether synchronization is pending.
172    pub sync_pending: bool,
173    /// Number of pending operations to sync.
174    pub pending_ops: u32,
175    /// Last sync timestamp.
176    pub last_sync_time: Option<Time>,
177    /// Last sync error, if any.
178    pub last_sync_error: Option<String>,
179}
180
181// ---------------------------------------------------------------------------
182// EffectiveState
183// ---------------------------------------------------------------------------
184
185/// Effective state considering both local and distributed status.
186#[derive(Debug, Clone, Copy, PartialEq, Eq)]
187pub enum EffectiveState {
188    /// Region is open and accepting work.
189    Open,
190    /// Region is active but in degraded mode (distributed only).
191    Degraded,
192    /// Region is recovering (distributed only).
193    Recovering,
194    /// Region is closing.
195    Closing,
196    /// Region is closed.
197    Closed,
198    /// States are inconsistent (error condition).
199    Inconsistent {
200        /// Local state.
201        local: RegionState,
202        /// Distributed state.
203        distributed: DistributedRegionState,
204    },
205}
206
207impl EffectiveState {
208    /// Computes effective state from local and optional distributed state.
209    #[must_use]
210    pub fn compute(local: RegionState, distributed: Option<DistributedRegionState>) -> Self {
211        match (local, distributed) {
212            // Local-only mode.
213            (local_s, None) => Self::from_local(local_s),
214
215            // Distributed mode — both must agree.
216            (
217                RegionState::Open,
218                Some(DistributedRegionState::Active | DistributedRegionState::Initializing),
219            ) => Self::Open,
220            (RegionState::Open, Some(DistributedRegionState::Degraded)) => Self::Degraded,
221            (RegionState::Open, Some(DistributedRegionState::Recovering)) => Self::Recovering,
222
223            // Closing states.
224            (
225                RegionState::Closing | RegionState::Draining | RegionState::Finalizing,
226                Some(DistributedRegionState::Closing),
227            ) => Self::Closing,
228
229            // Closed states.
230            (RegionState::Closed, Some(DistributedRegionState::Closed)) => Self::Closed,
231
232            // Inconsistent states.
233            (local_s, Some(dist_s)) => Self::Inconsistent {
234                local: local_s,
235                distributed: dist_s,
236            },
237        }
238    }
239
240    fn from_local(local: RegionState) -> Self {
241        match local {
242            RegionState::Open => Self::Open,
243            RegionState::Closing | RegionState::Draining | RegionState::Finalizing => Self::Closing,
244            RegionState::Closed => Self::Closed,
245        }
246    }
247
248    /// Returns true if work can be spawned.
249    #[must_use]
250    pub const fn can_spawn(&self) -> bool {
251        matches!(self, Self::Open)
252    }
253
254    /// Returns true if the region is in an error state.
255    #[must_use]
256    pub const fn is_inconsistent(&self) -> bool {
257        matches!(self, Self::Inconsistent { .. })
258    }
259
260    /// Returns true if the region needs recovery.
261    #[must_use]
262    pub const fn needs_recovery(&self) -> bool {
263        matches!(
264            self,
265            Self::Degraded | Self::Recovering | Self::Inconsistent { .. }
266        )
267    }
268}
269
270// ---------------------------------------------------------------------------
271// Type Conversion Traits
272// ---------------------------------------------------------------------------
273
274/// Converts local types to their distributed equivalents.
275pub trait LocalToDistributed {
276    /// The distributed equivalent type.
277    type Distributed;
278
279    /// Converts to the distributed equivalent.
280    fn to_distributed(&self) -> Self::Distributed;
281}
282
283/// Converts distributed types to their local equivalents.
284pub trait DistributedToLocal {
285    /// The local equivalent type.
286    type Local;
287
288    /// Converts to the local equivalent.
289    fn to_local(&self) -> Self::Local;
290
291    /// Returns true if lossless conversion is possible.
292    fn is_lossless(&self) -> bool;
293}
294
295impl LocalToDistributed for RegionState {
296    type Distributed = DistributedRegionState;
297
298    fn to_distributed(&self) -> DistributedRegionState {
299        match self {
300            Self::Open => DistributedRegionState::Active,
301            Self::Closing | Self::Draining | Self::Finalizing => DistributedRegionState::Closing,
302            Self::Closed => DistributedRegionState::Closed,
303        }
304    }
305}
306
307impl DistributedToLocal for DistributedRegionState {
308    type Local = RegionState;
309
310    fn to_local(&self) -> RegionState {
311        match self {
312            Self::Initializing | Self::Active | Self::Degraded | Self::Recovering => {
313                RegionState::Open
314            }
315            Self::Closing => RegionState::Closing,
316            Self::Closed => RegionState::Closed,
317        }
318    }
319
320    fn is_lossless(&self) -> bool {
321        matches!(self, Self::Active | Self::Closing | Self::Closed)
322    }
323}
324
325impl LocalToDistributed for Budget {
326    type Distributed = BudgetSnapshot;
327
328    fn to_distributed(&self) -> BudgetSnapshot {
329        BudgetSnapshot {
330            deadline_nanos: self.deadline.map(Time::as_nanos),
331            polls_remaining: if self.poll_quota > 0 {
332                Some(self.poll_quota)
333            } else {
334                None
335            },
336            cost_remaining: self.cost_quota,
337        }
338    }
339}
340
341impl DistributedToLocal for BudgetSnapshot {
342    type Local = Budget;
343
344    fn to_local(&self) -> Budget {
345        let mut budget = Budget::default();
346        if let Some(d) = self.deadline_nanos {
347            budget.deadline = Some(Time::from_nanos(d));
348        }
349        if let Some(p) = self.polls_remaining {
350            budget.poll_quota = p;
351        }
352        if let Some(c) = self.cost_remaining {
353            budget.cost_quota = Some(c);
354        }
355        budget
356    }
357
358    fn is_lossless(&self) -> bool {
359        false // Priority is lost
360    }
361}
362
363// ---------------------------------------------------------------------------
364// CloseResult / UpgradeResult / SyncResult
365// ---------------------------------------------------------------------------
366
367/// Result of a close operation.
368#[derive(Debug)]
369pub struct CloseResult {
370    /// Whether the local state changed.
371    pub local_changed: bool,
372    /// Distributed transition, if any.
373    pub distributed_transition: Option<StateTransition>,
374    /// New effective state.
375    pub effective_state: EffectiveState,
376}
377
378/// Result of a mode upgrade operation.
379#[derive(Debug)]
380pub struct UpgradeResult {
381    /// Previous operating mode.
382    pub previous_mode: RegionMode,
383    /// New operating mode.
384    pub new_mode: RegionMode,
385    /// Sequence number of the snapshot taken during upgrade.
386    pub snapshot_sequence: u64,
387}
388
389/// Result of a sync operation.
390#[derive(Debug)]
391pub enum SyncResult {
392    /// Sync was not needed (local mode or no changes).
393    NotNeeded,
394    /// Sync completed successfully.
395    Synced {
396        /// Synced sequence number.
397        sequence: u64,
398    },
399    /// Sync is pending (async mode).
400    Pending {
401        /// Pending sequence number.
402        sequence: u64,
403    },
404}
405
406// ---------------------------------------------------------------------------
407// RegionBridge
408// ---------------------------------------------------------------------------
409
410/// Coordinates local and distributed region state.
411///
412/// Keeps both state machines synchronized, translates operations between
413/// systems, handles mode upgrades, and manages replication lifecycle.
414#[derive(Debug)]
415pub struct RegionBridge {
416    local: RegionRecord,
417    distributed: Option<DistributedRegionRecord>,
418    mode: RegionMode,
419    /// Current synchronization state (accessible for tests).
420    pub sync_state: SyncState,
421    /// Bridge configuration (accessible for tests).
422    pub config: BridgeConfig,
423    /// Monotonic sequence counter for snapshots.
424    sequence: u64,
425}
426
427impl RegionBridge {
428    fn mark_sync_pending(&mut self) {
429        self.sync_state.sync_pending = true;
430        self.sync_state.pending_ops = self.sync_state.pending_ops.saturating_add(1);
431    }
432
433    /// Creates a new bridge in local-only mode.
434    #[must_use]
435    pub fn new_local(id: RegionId, parent: Option<RegionId>, budget: Budget) -> Self {
436        Self {
437            local: RegionRecord::new(id, parent, budget),
438            distributed: None,
439            mode: RegionMode::Local,
440            sync_state: SyncState::default(),
441            config: BridgeConfig::default(),
442            sequence: 0,
443        }
444    }
445
446    /// Creates a new bridge in distributed mode.
447    #[must_use]
448    pub fn new_distributed(
449        id: RegionId,
450        parent: Option<RegionId>,
451        budget: Budget,
452        config: DistributedRegionConfig,
453    ) -> Self {
454        let replication_factor = config.replication_factor;
455        let consistency = config.write_consistency;
456        let distributed = DistributedRegionRecord::new(id, config, parent, budget);
457        Self {
458            local: RegionRecord::new(id, parent, budget),
459            distributed: Some(distributed),
460            mode: RegionMode::Distributed {
461                replication_factor,
462                consistency,
463            },
464            sync_state: SyncState::default(),
465            config: BridgeConfig::default(),
466            sequence: 0,
467        }
468    }
469
470    /// Creates a new bridge with a specified mode.
471    #[must_use]
472    pub fn with_mode(
473        id: RegionId,
474        parent: Option<RegionId>,
475        budget: Budget,
476        mode: RegionMode,
477    ) -> Self {
478        match mode {
479            RegionMode::Local | RegionMode::Hybrid { .. } => Self {
480                local: RegionRecord::new(id, parent, budget),
481                distributed: None,
482                mode,
483                sync_state: SyncState::default(),
484                config: BridgeConfig::default(),
485                sequence: 0,
486            },
487            RegionMode::Distributed {
488                replication_factor,
489                consistency,
490            } => {
491                let config = DistributedRegionConfig {
492                    replication_factor,
493                    write_consistency: consistency,
494                    ..Default::default()
495                };
496                Self::new_distributed(id, parent, budget, config)
497            }
498        }
499    }
500
501    // =========================================================================
502    // Query Operations
503    // =========================================================================
504
505    /// Returns the region ID.
506    #[must_use]
507    pub fn id(&self) -> RegionId {
508        self.local.id
509    }
510
511    /// Returns the current mode.
512    #[must_use]
513    pub fn mode(&self) -> RegionMode {
514        self.mode
515    }
516
517    /// Returns the local region state.
518    #[must_use]
519    pub fn local_state(&self) -> RegionState {
520        self.local.state()
521    }
522
523    /// Returns the distributed state if in distributed mode.
524    #[must_use]
525    pub fn distributed_state(&self) -> Option<DistributedRegionState> {
526        self.distributed.as_ref().map(|d| d.state)
527    }
528
529    /// Returns the effective state (considering both local and distributed).
530    #[must_use]
531    pub fn effective_state(&self) -> EffectiveState {
532        EffectiveState::compute(self.local_state(), self.distributed_state())
533    }
534
535    /// Returns true if the region can accept new work.
536    #[must_use]
537    pub fn can_spawn(&self) -> bool {
538        self.effective_state().can_spawn()
539    }
540
541    /// Returns true if the region has any active work.
542    #[must_use]
543    pub fn has_live_work(&self) -> bool {
544        self.local.has_live_work()
545    }
546
547    /// Returns the local region record (read-only).
548    #[must_use]
549    pub fn local(&self) -> &RegionRecord {
550        &self.local
551    }
552
553    /// Returns the distributed record if in distributed mode.
554    #[must_use]
555    pub fn distributed(&self) -> Option<&DistributedRegionRecord> {
556        self.distributed.as_ref()
557    }
558
559    // =========================================================================
560    // Lifecycle Operations
561    // =========================================================================
562
563    /// Begins closing the region.
564    ///
565    /// Coordinates between local and distributed close sequences.
566    pub fn begin_close(
567        &mut self,
568        reason: Option<CancelReason>,
569        now: Time,
570    ) -> Result<CloseResult, Error> {
571        // Extract transition reason before consuming the cancel reason.
572        let transition_reason = reason.as_ref().map_or(TransitionReason::LocalClose, |r| {
573            TransitionReason::Cancelled {
574                reason: r.kind.as_str().to_owned(),
575            }
576        });
577
578        let local_changed = self.local.begin_close(reason);
579
580        let distributed_transition = if let Some(ref mut dist) = self.distributed {
581            match dist.state {
582                DistributedRegionState::Closing | DistributedRegionState::Closed => None,
583                _ => Some(dist.begin_close(transition_reason, now)?),
584            }
585        } else {
586            None
587        };
588
589        if local_changed || distributed_transition.is_some() {
590            self.mark_sync_pending();
591        }
592
593        Ok(CloseResult {
594            local_changed,
595            distributed_transition,
596            effective_state: self.effective_state(),
597        })
598    }
599
600    /// Transitions to draining state.
601    pub fn begin_drain(&mut self) -> Result<bool, Error> {
602        let changed = self.local.begin_drain();
603        if changed {
604            self.mark_sync_pending();
605        }
606        Ok(changed)
607    }
608
609    /// Transitions to finalizing state.
610    pub fn begin_finalize(&mut self) -> Result<bool, Error> {
611        let changed = self.local.begin_finalize();
612        if changed {
613            self.mark_sync_pending();
614        }
615        Ok(changed)
616    }
617
618    /// Completes the close operation.
619    pub fn complete_close(&mut self, now: Time) -> Result<CloseResult, Error> {
620        let local_changed = self.local.complete_close();
621
622        let distributed_transition = if let Some(ref mut dist) = self.distributed {
623            match dist.state {
624                DistributedRegionState::Closed => None,
625                _ => Some(dist.complete_close(now)?),
626            }
627        } else {
628            None
629        };
630
631        if local_changed || distributed_transition.is_some() {
632            self.mark_sync_pending();
633        }
634
635        Ok(CloseResult {
636            local_changed,
637            distributed_transition,
638            effective_state: self.effective_state(),
639        })
640    }
641
642    // =========================================================================
643    // Child/Task Management
644    // =========================================================================
645
646    /// Adds a child region.
647    pub fn add_child(&mut self, child: RegionId) -> Result<(), Error> {
648        if !self.can_spawn() {
649            return Err(
650                Error::new(ErrorKind::RegionClosed).with_message("region not accepting new work")
651            );
652        }
653
654        let before = self.local.child_ids().len();
655        self.local
656            .add_child(child)
657            .map_err(|e| Error::new(ErrorKind::AdmissionDenied).with_message(format!("{e:?}")))?;
658        if self.local.child_ids().len() > before {
659            self.mark_sync_pending();
660        }
661        Ok(())
662    }
663
664    /// Removes a child region.
665    pub fn remove_child(&mut self, child: RegionId) {
666        let before = self.local.child_ids().len();
667        self.local.remove_child(child);
668        if self.local.child_ids().len() < before {
669            self.mark_sync_pending();
670        }
671    }
672
673    /// Adds a task to the region.
674    pub fn add_task(&mut self, task: TaskId) -> Result<(), Error> {
675        if !self.can_spawn() {
676            return Err(
677                Error::new(ErrorKind::RegionClosed).with_message("region not accepting new work")
678            );
679        }
680
681        let before = self.local.task_ids().len();
682        self.local
683            .add_task(task)
684            .map_err(|e| Error::new(ErrorKind::AdmissionDenied).with_message(format!("{e:?}")))?;
685        if self.local.task_ids().len() > before {
686            self.mark_sync_pending();
687        }
688        Ok(())
689    }
690
691    /// Removes a task from the region.
692    pub fn remove_task(&mut self, task: TaskId) {
693        let before = self.local.task_ids().len();
694        self.local.remove_task(task);
695        if self.local.task_ids().len() < before {
696            self.mark_sync_pending();
697        }
698    }
699
700    // =========================================================================
701    // Synchronization
702    // =========================================================================
703
704    /// Synchronizes local state to distributed replicas (sync test path).
705    ///
706    /// Returns [`SyncResult::NotNeeded`] if in local mode or no changes pending.
707    pub fn sync(&mut self) -> Result<SyncResult, Error> {
708        if !self.mode.is_replicated() || !self.sync_state.sync_pending || self.distributed.is_none()
709        {
710            return Ok(SyncResult::NotNeeded);
711        }
712
713        let snapshot = self.create_snapshot();
714        let seq = snapshot.sequence;
715
716        self.sync_state.last_synced_sequence = seq;
717        self.sync_state.sync_pending = false;
718        self.sync_state.pending_ops = 0;
719
720        Ok(SyncResult::Synced { sequence: seq })
721    }
722
723    /// Creates a snapshot of current region state.
724    #[must_use]
725    pub fn create_snapshot(&mut self) -> RegionSnapshot {
726        self.sequence += 1;
727
728        let tasks: Vec<TaskSnapshot> = self
729            .local
730            .task_ids()
731            .into_iter()
732            .map(|id| TaskSnapshot {
733                task_id: id,
734                state: TaskState::Running,
735                priority: 0,
736            })
737            .collect();
738
739        RegionSnapshot {
740            region_id: self.local.id,
741            state: self.local.state(),
742            timestamp: Time::ZERO,
743            sequence: self.sequence,
744            tasks,
745            children: self.local.child_ids(),
746            finalizer_count: self.local.finalizer_count() as u32,
747            budget: self.local.budget().to_distributed(),
748            cancel_reason: self
749                .local
750                .cancel_reason()
751                .map(|r| r.kind.as_str().to_owned()),
752            parent: self.local.parent,
753            metadata: vec![],
754        }
755    }
756
757    /// Applies a recovered snapshot to this bridge.
758    pub fn apply_snapshot(&mut self, snapshot: &RegionSnapshot) -> Result<(), Error> {
759        if snapshot.region_id != self.local.id {
760            return Err(Error::new(ErrorKind::ObjectMismatch)
761                .with_message("snapshot region ID does not match bridge"));
762        }
763
764        // Reconstruct Budget
765        let budget = Budget {
766            deadline: snapshot.budget.deadline_nanos.map(Time::from_nanos),
767            poll_quota: snapshot.budget.polls_remaining.unwrap_or(0),
768            cost_quota: snapshot.budget.cost_remaining,
769            priority: 128, // Default priority (not preserved in snapshot)
770        };
771
772        // Reconstruct CancelReason
773        let cancel_reason = snapshot.cancel_reason.as_ref().map(|reason_str| {
774            // Attempt to parse known kinds from the string
775            let kind = match reason_str.as_str() {
776                "Timeout" => crate::types::cancel::CancelKind::Timeout,
777                "Deadline" => crate::types::cancel::CancelKind::Deadline,
778                "PollQuota" => crate::types::cancel::CancelKind::PollQuota,
779                "CostBudget" => crate::types::cancel::CancelKind::CostBudget,
780                "FailFast" => crate::types::cancel::CancelKind::FailFast,
781                "RaceLost" => crate::types::cancel::CancelKind::RaceLost,
782                "ParentCancelled" => crate::types::cancel::CancelKind::ParentCancelled,
783                "ResourceUnavailable" => crate::types::cancel::CancelKind::ResourceUnavailable,
784                "Shutdown" => crate::types::cancel::CancelKind::Shutdown,
785                "LinkedExit" => crate::types::cancel::CancelKind::LinkedExit,
786                _ => crate::types::cancel::CancelKind::User, // Fallback (includes "User")
787            };
788
789            crate::types::cancel::CancelReason::with_origin(
790                kind,
791                snapshot.region_id,
792                snapshot.timestamp,
793            )
794        });
795
796        // Extract tasks IDs
797        let tasks: Vec<TaskId> = snapshot.tasks.iter().map(|t| t.task_id).collect();
798
799        // Apply state from snapshot to local record
800        self.local.apply_distributed_snapshot(
801            snapshot.state,
802            budget,
803            snapshot.children.clone(),
804            tasks,
805            cancel_reason,
806        );
807
808        // Keep future locally created snapshots monotonic after recovery/apply.
809        self.sequence = self.sequence.max(snapshot.sequence);
810        self.sync_state.last_synced_sequence = snapshot.sequence;
811        self.sync_state.sync_pending = false;
812        self.sync_state.pending_ops = 0;
813
814        Ok(())
815    }
816
817    // =========================================================================
818    // Mode Upgrade
819    // =========================================================================
820
821    /// Upgrades from local to distributed mode (sync test path).
822    ///
823    /// Validates preconditions and creates the distributed record.
824    /// In production, this would also encode and distribute the snapshot.
825    pub fn upgrade_to_distributed(
826        &mut self,
827        config: DistributedRegionConfig,
828        _replicas: &[ReplicaInfo],
829    ) -> Result<UpgradeResult, Error> {
830        if !self.config.allow_upgrade {
831            return Err(Error::new(ErrorKind::InvalidStateTransition)
832                .with_message("mode upgrade not allowed"));
833        }
834
835        if self.mode.is_replicated() {
836            return Err(Error::new(ErrorKind::InvalidStateTransition)
837                .with_message("already in distributed mode"));
838        }
839
840        if self.local.state() != RegionState::Open {
841            return Err(Error::new(ErrorKind::InvalidStateTransition)
842                .with_message("can only upgrade open regions"));
843        }
844
845        let snapshot = self.create_snapshot();
846        let snapshot_sequence = snapshot.sequence;
847
848        let replication_factor = config.replication_factor;
849        let consistency = config.write_consistency;
850
851        let distributed = DistributedRegionRecord::new(
852            self.local.id,
853            config,
854            self.local.parent,
855            self.local.budget(),
856        );
857
858        let previous_mode = self.mode;
859        self.distributed = Some(distributed);
860        self.mode = RegionMode::Distributed {
861            replication_factor,
862            consistency,
863        };
864
865        Ok(UpgradeResult {
866            previous_mode,
867            new_mode: self.mode,
868            snapshot_sequence,
869        })
870    }
871}
872
873// ---------------------------------------------------------------------------
874// Tests
875// ---------------------------------------------------------------------------
876
877#[cfg(test)]
878#[allow(clippy::similar_names)]
879mod tests {
880    use super::*;
881
882    // =====================================================================
883    // RegionMode Tests
884    // =====================================================================
885
886    #[test]
887    fn mode_local() {
888        let mode = RegionMode::local();
889        assert!(!mode.is_replicated());
890        assert!(!mode.is_distributed());
891        assert_eq!(mode.replication_factor(), 1);
892    }
893
894    #[test]
895    fn mode_distributed() {
896        let mode = RegionMode::distributed(3);
897        assert!(mode.is_replicated());
898        assert!(mode.is_distributed());
899        assert_eq!(mode.replication_factor(), 3);
900    }
901
902    #[test]
903    fn mode_hybrid() {
904        let mode = RegionMode::hybrid(2);
905        assert!(mode.is_replicated());
906        assert!(!mode.is_distributed());
907        assert_eq!(mode.replication_factor(), 2);
908    }
909
910    #[test]
911    fn mode_default_is_local() {
912        assert_eq!(RegionMode::default(), RegionMode::Local);
913    }
914
915    // =====================================================================
916    // EffectiveState Tests
917    // =====================================================================
918
919    #[test]
920    fn effective_state_local_open() {
921        let state = EffectiveState::compute(RegionState::Open, None);
922        assert_eq!(state, EffectiveState::Open);
923        assert!(state.can_spawn());
924        assert!(!state.needs_recovery());
925    }
926
927    #[test]
928    fn effective_state_local_closing() {
929        let state = EffectiveState::compute(RegionState::Closing, None);
930        assert_eq!(state, EffectiveState::Closing);
931        assert!(!state.can_spawn());
932    }
933
934    #[test]
935    fn effective_state_local_closed() {
936        let state = EffectiveState::compute(RegionState::Closed, None);
937        assert_eq!(state, EffectiveState::Closed);
938    }
939
940    #[test]
941    fn effective_state_distributed_active() {
942        let state =
943            EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Active));
944        assert_eq!(state, EffectiveState::Open);
945        assert!(state.can_spawn());
946    }
947
948    #[test]
949    fn effective_state_distributed_initializing() {
950        let state = EffectiveState::compute(
951            RegionState::Open,
952            Some(DistributedRegionState::Initializing),
953        );
954        assert_eq!(state, EffectiveState::Open);
955    }
956
957    #[test]
958    fn effective_state_degraded() {
959        let state =
960            EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Degraded));
961        assert_eq!(state, EffectiveState::Degraded);
962        assert!(!state.can_spawn());
963        assert!(state.needs_recovery());
964    }
965
966    #[test]
967    fn effective_state_recovering() {
968        let state =
969            EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Recovering));
970        assert_eq!(state, EffectiveState::Recovering);
971        assert!(state.needs_recovery());
972    }
973
974    #[test]
975    fn effective_state_inconsistent() {
976        let state =
977            EffectiveState::compute(RegionState::Closed, Some(DistributedRegionState::Active));
978        assert!(state.is_inconsistent());
979        assert!(state.needs_recovery());
980    }
981
982    #[test]
983    fn effective_state_closing_distributed() {
984        let state =
985            EffectiveState::compute(RegionState::Closing, Some(DistributedRegionState::Closing));
986        assert_eq!(state, EffectiveState::Closing);
987    }
988
989    #[test]
990    fn effective_state_closed_distributed() {
991        let state =
992            EffectiveState::compute(RegionState::Closed, Some(DistributedRegionState::Closed));
993        assert_eq!(state, EffectiveState::Closed);
994    }
995
996    // =====================================================================
997    // Type Conversion Tests
998    // =====================================================================
999
1000    #[test]
1001    fn local_state_to_distributed() {
1002        assert_eq!(
1003            RegionState::Open.to_distributed(),
1004            DistributedRegionState::Active
1005        );
1006        assert_eq!(
1007            RegionState::Closing.to_distributed(),
1008            DistributedRegionState::Closing
1009        );
1010        assert_eq!(
1011            RegionState::Draining.to_distributed(),
1012            DistributedRegionState::Closing
1013        );
1014        assert_eq!(
1015            RegionState::Finalizing.to_distributed(),
1016            DistributedRegionState::Closing
1017        );
1018        assert_eq!(
1019            RegionState::Closed.to_distributed(),
1020            DistributedRegionState::Closed
1021        );
1022    }
1023
1024    #[test]
1025    fn distributed_state_to_local() {
1026        assert_eq!(DistributedRegionState::Active.to_local(), RegionState::Open);
1027        assert_eq!(
1028            DistributedRegionState::Initializing.to_local(),
1029            RegionState::Open
1030        );
1031        assert_eq!(
1032            DistributedRegionState::Degraded.to_local(),
1033            RegionState::Open
1034        );
1035        assert_eq!(
1036            DistributedRegionState::Recovering.to_local(),
1037            RegionState::Open
1038        );
1039        assert_eq!(
1040            DistributedRegionState::Closing.to_local(),
1041            RegionState::Closing
1042        );
1043        assert_eq!(
1044            DistributedRegionState::Closed.to_local(),
1045            RegionState::Closed
1046        );
1047    }
1048
1049    #[test]
1050    fn is_lossless_conversion() {
1051        assert!(DistributedRegionState::Active.is_lossless());
1052        assert!(DistributedRegionState::Closing.is_lossless());
1053        assert!(DistributedRegionState::Closed.is_lossless());
1054        assert!(!DistributedRegionState::Degraded.is_lossless());
1055        assert!(!DistributedRegionState::Recovering.is_lossless());
1056        assert!(!DistributedRegionState::Initializing.is_lossless());
1057    }
1058
1059    #[test]
1060    fn budget_to_distributed() {
1061        let budget = Budget::new().with_poll_quota(100).with_cost_quota(500);
1062        let snapshot = budget.to_distributed();
1063
1064        assert_eq!(snapshot.polls_remaining, Some(100));
1065        assert_eq!(snapshot.cost_remaining, Some(500));
1066    }
1067
1068    // =====================================================================
1069    // Bridge Creation Tests
1070    // =====================================================================
1071
1072    #[test]
1073    fn bridge_new_local() {
1074        let bridge = RegionBridge::new_local(RegionId::new_for_test(1, 0), None, Budget::default());
1075
1076        assert_eq!(bridge.mode(), RegionMode::Local);
1077        assert!(bridge.distributed().is_none());
1078        assert!(bridge.can_spawn());
1079        assert_eq!(bridge.local_state(), RegionState::Open);
1080    }
1081
1082    #[test]
1083    fn bridge_new_distributed() {
1084        let bridge = RegionBridge::new_distributed(
1085            RegionId::new_for_test(1, 0),
1086            None,
1087            Budget::default(),
1088            DistributedRegionConfig::default(),
1089        );
1090
1091        assert!(bridge.mode().is_distributed());
1092        assert!(bridge.distributed().is_some());
1093    }
1094
1095    #[test]
1096    fn bridge_with_mode_local() {
1097        let bridge = RegionBridge::with_mode(
1098            RegionId::new_for_test(1, 0),
1099            None,
1100            Budget::default(),
1101            RegionMode::Local,
1102        );
1103
1104        assert_eq!(bridge.mode(), RegionMode::Local);
1105    }
1106
1107    #[test]
1108    fn bridge_with_mode_distributed() {
1109        let bridge = RegionBridge::with_mode(
1110            RegionId::new_for_test(1, 0),
1111            None,
1112            Budget::default(),
1113            RegionMode::distributed(3),
1114        );
1115
1116        assert!(bridge.mode().is_distributed());
1117        assert!(bridge.distributed().is_some());
1118    }
1119
1120    // =====================================================================
1121    // Lifecycle Coordination Tests
1122    // =====================================================================
1123
1124    #[test]
1125    fn bridge_begin_close_local() {
1126        let mut bridge = create_local_bridge();
1127
1128        let result = bridge.begin_close(None, Time::from_secs(0)).unwrap();
1129
1130        assert!(result.local_changed);
1131        assert!(result.distributed_transition.is_none());
1132        assert_eq!(result.effective_state, EffectiveState::Closing);
1133    }
1134
1135    #[test]
1136    fn bridge_begin_close_distributed() {
1137        let mut bridge = create_distributed_bridge();
1138        // Activate the distributed region first.
1139        if let Some(ref mut dist) = bridge.distributed {
1140            let _ = dist.activate(Time::from_secs(0));
1141        }
1142
1143        let result = bridge.begin_close(None, Time::from_secs(1)).unwrap();
1144
1145        assert!(result.local_changed);
1146        assert!(result.distributed_transition.is_some());
1147        assert_eq!(result.effective_state, EffectiveState::Closing);
1148    }
1149
1150    #[test]
1151    fn bridge_full_lifecycle() {
1152        let mut bridge = create_local_bridge();
1153
1154        // Close.
1155        bridge.begin_close(None, Time::from_secs(0)).unwrap();
1156        assert!(!bridge.can_spawn());
1157
1158        // Drain.
1159        bridge.begin_drain().unwrap();
1160
1161        // Finalize.
1162        bridge.begin_finalize().unwrap();
1163
1164        // Complete.
1165        bridge.complete_close(Time::from_secs(1)).unwrap();
1166        assert_eq!(bridge.effective_state(), EffectiveState::Closed);
1167    }
1168
1169    #[test]
1170    fn bridge_cannot_spawn_when_closed() {
1171        let mut bridge = create_local_bridge();
1172        bridge.begin_close(None, Time::from_secs(0)).unwrap();
1173
1174        let result = bridge.add_task(TaskId::new_for_test(1, 0));
1175        assert!(result.is_err());
1176    }
1177
1178    // =====================================================================
1179    // Child/Task Management Tests
1180    // =====================================================================
1181
1182    #[test]
1183    fn bridge_add_remove_task() {
1184        let mut bridge = create_local_bridge();
1185        let task_id = TaskId::new_for_test(1, 0);
1186
1187        bridge.add_task(task_id).unwrap();
1188        assert!(bridge.has_live_work());
1189        assert!(bridge.sync_state.sync_pending);
1190
1191        bridge.remove_task(task_id);
1192        assert!(!bridge.has_live_work());
1193    }
1194
1195    #[test]
1196    fn bridge_add_remove_child() {
1197        let mut bridge = create_local_bridge();
1198        let child_id = RegionId::new_for_test(2, 0);
1199
1200        bridge.add_child(child_id).unwrap();
1201        assert!(bridge.has_live_work());
1202
1203        bridge.remove_child(child_id);
1204        assert!(!bridge.has_live_work());
1205    }
1206
1207    // =====================================================================
1208    // Sync Tests
1209    // =====================================================================
1210
1211    #[test]
1212    fn sync_not_needed_local() {
1213        let mut bridge = create_local_bridge();
1214        let result = bridge.sync().unwrap();
1215        assert!(matches!(result, SyncResult::NotNeeded));
1216    }
1217
1218    #[test]
1219    fn sync_after_changes() {
1220        let mut bridge = create_distributed_bridge();
1221        bridge.sync_state.sync_pending = true;
1222
1223        let result = bridge.sync().unwrap();
1224        assert!(matches!(result, SyncResult::Synced { .. }));
1225        assert!(!bridge.sync_state.sync_pending);
1226    }
1227
1228    // =====================================================================
1229    // Snapshot Tests
1230    // =====================================================================
1231
1232    #[test]
1233    fn create_snapshot_increments_sequence() {
1234        let mut bridge = create_local_bridge();
1235
1236        let snap1 = bridge.create_snapshot();
1237        let snap2 = bridge.create_snapshot();
1238
1239        assert_eq!(snap1.sequence, 1);
1240        assert_eq!(snap2.sequence, 2);
1241        assert_eq!(snap1.region_id, bridge.id());
1242    }
1243
1244    #[test]
1245    fn snapshot_includes_tasks() {
1246        let mut bridge = create_local_bridge();
1247        bridge.add_task(TaskId::new_for_test(1, 0)).unwrap();
1248        bridge.add_task(TaskId::new_for_test(2, 0)).unwrap();
1249
1250        let snap = bridge.create_snapshot();
1251        assert_eq!(snap.tasks.len(), 2);
1252    }
1253
1254    #[test]
1255    fn apply_snapshot_updates_sync_state() {
1256        let mut bridge = create_local_bridge();
1257        bridge.sync_state.sync_pending = true;
1258        bridge.sync_state.pending_ops = 7;
1259
1260        let snap = RegionSnapshot {
1261            region_id: bridge.id(),
1262            state: RegionState::Open,
1263            timestamp: Time::from_secs(100),
1264            sequence: 42,
1265            tasks: vec![],
1266            children: vec![],
1267            finalizer_count: 0,
1268            budget: BudgetSnapshot {
1269                deadline_nanos: None,
1270                polls_remaining: None,
1271                cost_remaining: None,
1272            },
1273            cancel_reason: None,
1274            parent: None,
1275            metadata: vec![],
1276        };
1277
1278        bridge.apply_snapshot(&snap).unwrap();
1279        assert_eq!(bridge.sync_state.last_synced_sequence, 42);
1280        assert!(!bridge.sync_state.sync_pending);
1281        assert_eq!(bridge.sync_state.pending_ops, 0);
1282    }
1283
1284    #[test]
1285    fn apply_snapshot_advances_local_sequence_counter() {
1286        let mut bridge = create_local_bridge();
1287
1288        let snap = RegionSnapshot {
1289            region_id: bridge.id(),
1290            state: RegionState::Open,
1291            timestamp: Time::from_secs(100),
1292            sequence: 42,
1293            tasks: vec![],
1294            children: vec![],
1295            finalizer_count: 0,
1296            budget: BudgetSnapshot {
1297                deadline_nanos: None,
1298                polls_remaining: None,
1299                cost_remaining: None,
1300            },
1301            cancel_reason: None,
1302            parent: None,
1303            metadata: vec![],
1304        };
1305
1306        bridge.apply_snapshot(&snap).unwrap();
1307
1308        let next = bridge.create_snapshot();
1309        assert_eq!(next.sequence, 43);
1310    }
1311
1312    #[test]
1313    fn apply_snapshot_mismatch() {
1314        let mut bridge = create_local_bridge();
1315
1316        let snap = RegionSnapshot {
1317            region_id: RegionId::new_for_test(999, 0),
1318            state: RegionState::Open,
1319            timestamp: Time::ZERO,
1320            sequence: 1,
1321            tasks: vec![],
1322            children: vec![],
1323            finalizer_count: 0,
1324            budget: BudgetSnapshot {
1325                deadline_nanos: None,
1326                polls_remaining: None,
1327                cost_remaining: None,
1328            },
1329            cancel_reason: None,
1330            parent: None,
1331            metadata: vec![],
1332        };
1333
1334        let result = bridge.apply_snapshot(&snap);
1335        assert!(result.is_err());
1336        assert_eq!(result.unwrap_err().kind(), ErrorKind::ObjectMismatch);
1337    }
1338
1339    // =====================================================================
1340    // Mode Upgrade Tests
1341    // =====================================================================
1342
1343    #[test]
1344    fn upgrade_local_to_distributed() {
1345        let mut bridge = create_local_bridge();
1346
1347        let config = DistributedRegionConfig {
1348            replication_factor: 3,
1349            ..Default::default()
1350        };
1351        let replicas = create_test_replicas(3);
1352
1353        let result = bridge.upgrade_to_distributed(config, &replicas).unwrap();
1354
1355        assert_eq!(result.previous_mode, RegionMode::Local);
1356        assert!(result.new_mode.is_distributed());
1357        assert!(bridge.distributed().is_some());
1358    }
1359
1360    #[test]
1361    fn upgrade_not_allowed() {
1362        let mut bridge = create_local_bridge();
1363        bridge.config.allow_upgrade = false;
1364
1365        let result = bridge
1366            .upgrade_to_distributed(DistributedRegionConfig::default(), &create_test_replicas(3));
1367
1368        assert!(result.is_err());
1369        assert_eq!(
1370            result.unwrap_err().kind(),
1371            ErrorKind::InvalidStateTransition
1372        );
1373    }
1374
1375    #[test]
1376    fn upgrade_already_distributed() {
1377        let mut bridge = create_distributed_bridge();
1378
1379        let result = bridge
1380            .upgrade_to_distributed(DistributedRegionConfig::default(), &create_test_replicas(3));
1381
1382        assert!(result.is_err());
1383    }
1384
1385    #[test]
1386    fn upgrade_only_from_open() {
1387        let mut bridge = create_local_bridge();
1388        bridge.begin_close(None, Time::from_secs(0)).unwrap();
1389
1390        let result = bridge
1391            .upgrade_to_distributed(DistributedRegionConfig::default(), &create_test_replicas(3));
1392
1393        assert!(result.is_err());
1394    }
1395
1396    // =====================================================================
1397    // Helpers
1398    // =====================================================================
1399
1400    fn create_local_bridge() -> RegionBridge {
1401        RegionBridge::new_local(RegionId::new_for_test(1, 0), None, Budget::default())
1402    }
1403
1404    fn create_distributed_bridge() -> RegionBridge {
1405        RegionBridge::new_distributed(
1406            RegionId::new_for_test(1, 0),
1407            None,
1408            Budget::default(),
1409            DistributedRegionConfig::default(),
1410        )
1411    }
1412
1413    fn create_test_replicas(count: usize) -> Vec<ReplicaInfo> {
1414        (0..count)
1415            .map(|i| ReplicaInfo::new(&format!("r{i}"), &format!("addr{i}")))
1416            .collect()
1417    }
1418
1419    // =====================================================================
1420    // Lifecycle Race / Edge Case Tests (bd-fgs0)
1421    // =====================================================================
1422
1423    #[test]
1424    fn upgrade_while_tasks_running() {
1425        // Upgrade Local→Distributed while tasks are active in the region.
1426        let mut bridge = create_local_bridge();
1427        bridge.add_task(TaskId::new_for_test(1, 0)).unwrap();
1428        bridge.add_task(TaskId::new_for_test(2, 0)).unwrap();
1429        assert!(bridge.has_live_work());
1430
1431        let config = DistributedRegionConfig {
1432            replication_factor: 3,
1433            ..Default::default()
1434        };
1435        let result = bridge
1436            .upgrade_to_distributed(config, &create_test_replicas(3))
1437            .unwrap();
1438
1439        assert!(result.new_mode.is_distributed());
1440        // Tasks should still be present after upgrade.
1441        assert!(bridge.has_live_work());
1442        // Snapshot taken during upgrade should include the tasks.
1443        assert!(result.snapshot_sequence > 0);
1444    }
1445
1446    #[test]
1447    fn snapshot_monotonic_under_rapid_changes() {
1448        let mut bridge = create_local_bridge();
1449
1450        let mut prev_seq = 0;
1451        for i in 0..20 {
1452            // Interleave task add/remove with snapshots.
1453            let tid = TaskId::new_for_test(i, 0);
1454            bridge.add_task(tid).unwrap();
1455            let snap = bridge.create_snapshot();
1456            assert!(
1457                snap.sequence > prev_seq,
1458                "sequence must be monotonically increasing"
1459            );
1460            prev_seq = snap.sequence;
1461            bridge.remove_task(tid);
1462        }
1463    }
1464
1465    #[test]
1466    fn double_close_local() {
1467        let mut bridge = create_local_bridge();
1468
1469        let result1 = bridge.begin_close(None, Time::from_secs(0)).unwrap();
1470        assert!(result1.local_changed);
1471
1472        // Second close — should not change state (already closing).
1473        let result2 = bridge.begin_close(None, Time::from_secs(1)).unwrap();
1474        assert!(!result2.local_changed);
1475        assert_eq!(result2.effective_state, EffectiveState::Closing);
1476    }
1477
1478    #[test]
1479    fn double_close_distributed() {
1480        let mut bridge = create_distributed_bridge();
1481
1482        let result1 = bridge.begin_close(None, Time::from_secs(0)).unwrap();
1483        assert!(result1.local_changed);
1484        assert!(result1.distributed_transition.is_some());
1485        assert_eq!(result1.effective_state, EffectiveState::Closing);
1486
1487        let result2 = bridge.begin_close(None, Time::from_secs(1)).unwrap();
1488        assert!(!result2.local_changed);
1489        assert!(result2.distributed_transition.is_none());
1490        assert_eq!(result2.effective_state, EffectiveState::Closing);
1491    }
1492
1493    #[test]
1494    fn double_complete_close_local() {
1495        let mut bridge = create_local_bridge();
1496        bridge.begin_close(None, Time::from_secs(0)).unwrap();
1497        bridge.begin_drain().unwrap();
1498        bridge.begin_finalize().unwrap();
1499
1500        let result1 = bridge.complete_close(Time::from_secs(1)).unwrap();
1501        assert!(result1.local_changed);
1502        assert_eq!(result1.effective_state, EffectiveState::Closed);
1503
1504        // Second complete_close — already closed, no change.
1505        let result2 = bridge.complete_close(Time::from_secs(2)).unwrap();
1506        assert!(!result2.local_changed);
1507    }
1508
1509    #[test]
1510    fn double_complete_close_distributed() {
1511        let mut bridge = create_distributed_bridge();
1512        bridge.begin_close(None, Time::from_secs(0)).unwrap();
1513        bridge.begin_drain().unwrap();
1514        bridge.begin_finalize().unwrap();
1515
1516        let result1 = bridge.complete_close(Time::from_secs(1)).unwrap();
1517        assert!(result1.local_changed);
1518        assert!(result1.distributed_transition.is_some());
1519        assert_eq!(result1.effective_state, EffectiveState::Closed);
1520
1521        let result2 = bridge.complete_close(Time::from_secs(2)).unwrap();
1522        assert!(!result2.local_changed);
1523        assert!(result2.distributed_transition.is_none());
1524        assert_eq!(result2.effective_state, EffectiveState::Closed);
1525    }
1526
1527    #[test]
1528    fn close_with_cancel_reason() {
1529        let mut bridge = create_local_bridge();
1530
1531        let reason = CancelReason::timeout();
1532        let result = bridge
1533            .begin_close(Some(reason), Time::from_secs(0))
1534            .unwrap();
1535
1536        assert!(result.local_changed);
1537        assert_eq!(result.effective_state, EffectiveState::Closing);
1538    }
1539
1540    #[test]
1541    fn add_child_after_close_rejected() {
1542        let mut bridge = create_local_bridge();
1543        bridge.begin_close(None, Time::from_secs(0)).unwrap();
1544
1545        let result = bridge.add_child(RegionId::new_for_test(2, 0));
1546        assert!(result.is_err());
1547    }
1548
1549    #[test]
1550    fn sync_not_needed_when_no_changes() {
1551        let mut bridge = create_distributed_bridge();
1552        // sync_pending is false by default.
1553        assert!(!bridge.sync_state.sync_pending);
1554
1555        let result = bridge.sync().unwrap();
1556        assert!(matches!(result, SyncResult::NotNeeded));
1557    }
1558
1559    #[test]
1560    fn sync_clears_pending_ops() {
1561        let mut bridge = create_distributed_bridge();
1562        bridge.sync_state.sync_pending = true;
1563        bridge.sync_state.pending_ops = 5;
1564
1565        let result = bridge.sync().unwrap();
1566        assert!(matches!(result, SyncResult::Synced { .. }));
1567        assert_eq!(bridge.sync_state.pending_ops, 0);
1568        assert!(!bridge.sync_state.sync_pending);
1569    }
1570
1571    #[test]
1572    fn pending_ops_counts_only_real_mutations() {
1573        let mut bridge = create_distributed_bridge();
1574
1575        bridge.add_task(TaskId::new_for_test(1, 0)).unwrap();
1576        bridge.add_task(TaskId::new_for_test(1, 0)).unwrap(); // duplicate, no mutation
1577        bridge.remove_task(TaskId::new_for_test(999, 0)); // absent, no mutation
1578        bridge.remove_task(TaskId::new_for_test(1, 0)); // present, mutation
1579
1580        bridge.add_child(RegionId::new_for_test(2, 0)).unwrap();
1581        bridge.add_child(RegionId::new_for_test(2, 0)).unwrap(); // duplicate, no mutation
1582        bridge.remove_child(RegionId::new_for_test(777, 0)); // absent, no mutation
1583        bridge.remove_child(RegionId::new_for_test(2, 0)); // present, mutation
1584
1585        assert!(bridge.sync_state.sync_pending);
1586        assert_eq!(bridge.sync_state.pending_ops, 4);
1587    }
1588
1589    #[test]
1590    fn close_transitions_mark_sync_pending() {
1591        let mut bridge = create_distributed_bridge();
1592        if let Some(ref mut dist) = bridge.distributed {
1593            let _ = dist.activate(Time::from_secs(0));
1594        }
1595        assert!(!bridge.sync_state.sync_pending);
1596        assert_eq!(bridge.sync_state.pending_ops, 0);
1597
1598        bridge.begin_close(None, Time::from_secs(1)).unwrap();
1599        assert!(bridge.sync_state.sync_pending);
1600        assert!(bridge.sync_state.pending_ops >= 1);
1601    }
1602
1603    #[test]
1604    fn upgrade_snapshot_sequence_matches() {
1605        let mut bridge = create_local_bridge();
1606
1607        // Create two snapshots first to advance sequence.
1608        let _ = bridge.create_snapshot();
1609        let _ = bridge.create_snapshot();
1610        assert_eq!(bridge.sequence, 2);
1611
1612        let config = DistributedRegionConfig {
1613            replication_factor: 3,
1614            ..Default::default()
1615        };
1616        let result = bridge
1617            .upgrade_to_distributed(config, &create_test_replicas(3))
1618            .unwrap();
1619
1620        // Upgrade creates a snapshot, so sequence should be 3.
1621        assert_eq!(result.snapshot_sequence, 3);
1622    }
1623
1624    #[test]
1625    fn bridge_with_mode_hybrid() {
1626        let bridge = RegionBridge::with_mode(
1627            RegionId::new_for_test(1, 0),
1628            None,
1629            Budget::default(),
1630            RegionMode::hybrid(2),
1631        );
1632
1633        assert!(bridge.mode().is_replicated());
1634        assert!(!bridge.mode().is_distributed());
1635        // Hybrid mode doesn't create distributed record in with_mode.
1636        assert!(bridge.distributed().is_none());
1637    }
1638
1639    #[test]
1640    fn effective_state_draining_with_distributed_closing() {
1641        let state =
1642            EffectiveState::compute(RegionState::Draining, Some(DistributedRegionState::Closing));
1643        assert_eq!(state, EffectiveState::Closing);
1644    }
1645
1646    #[test]
1647    fn effective_state_finalizing_with_distributed_closing() {
1648        let state = EffectiveState::compute(
1649            RegionState::Finalizing,
1650            Some(DistributedRegionState::Closing),
1651        );
1652        assert_eq!(state, EffectiveState::Closing);
1653    }
1654
1655    #[test]
1656    fn bridge_config_defaults() {
1657        let config = BridgeConfig::default();
1658        assert!(config.allow_upgrade);
1659        assert_eq!(config.sync_timeout, Duration::from_secs(5));
1660        assert_eq!(config.sync_mode, SyncMode::Synchronous);
1661        assert_eq!(
1662            config.conflict_resolution,
1663            ConflictResolution::DistributedWins
1664        );
1665    }
1666
1667    #[test]
1668    fn sync_state_default() {
1669        let state = SyncState::default();
1670        assert_eq!(state.last_synced_sequence, 0);
1671        assert!(!state.sync_pending);
1672        assert_eq!(state.pending_ops, 0);
1673        assert!(state.last_sync_time.is_none());
1674        assert!(state.last_sync_error.is_none());
1675    }
1676
1677    #[test]
1678    fn snapshot_includes_children() {
1679        let mut bridge = create_local_bridge();
1680        bridge.add_child(RegionId::new_for_test(2, 0)).unwrap();
1681        bridge.add_child(RegionId::new_for_test(3, 0)).unwrap();
1682
1683        let snap = bridge.create_snapshot();
1684        assert_eq!(snap.children.len(), 2);
1685    }
1686
1687    #[test]
1688    fn region_mode_debug_clone_copy_default_eq() {
1689        let m = RegionMode::default();
1690        assert_eq!(m, RegionMode::Local);
1691        let dbg = format!("{m:?}");
1692        assert!(dbg.contains("Local"), "{dbg}");
1693
1694        let dist = RegionMode::distributed(3);
1695        let copied: RegionMode = dist;
1696        let cloned = dist;
1697        assert_eq!(copied, cloned);
1698        assert_ne!(dist, RegionMode::Local);
1699    }
1700
1701    #[test]
1702    fn sync_mode_debug_clone_copy_eq() {
1703        let s = SyncMode::Synchronous;
1704        let dbg = format!("{s:?}");
1705        assert!(dbg.contains("Synchronous"), "{dbg}");
1706        let copied: SyncMode = s;
1707        let cloned = s;
1708        assert_eq!(copied, cloned);
1709        assert_ne!(s, SyncMode::Asynchronous);
1710    }
1711
1712    #[test]
1713    fn conflict_resolution_debug_clone_copy_eq() {
1714        let c = ConflictResolution::DistributedWins;
1715        let dbg = format!("{c:?}");
1716        assert!(dbg.contains("DistributedWins"), "{dbg}");
1717        let copied: ConflictResolution = c;
1718        let cloned = c;
1719        assert_eq!(copied, cloned);
1720    }
1721
1722    #[test]
1723    fn bridge_config_debug_clone_default() {
1724        let c = BridgeConfig::default();
1725        let dbg = format!("{c:?}");
1726        assert!(dbg.contains("BridgeConfig"), "{dbg}");
1727        assert!(c.allow_upgrade);
1728        let cloned = c;
1729        assert_eq!(format!("{cloned:?}"), dbg);
1730    }
1731
1732    #[test]
1733    fn effective_state_debug_clone_copy_eq() {
1734        let e = EffectiveState::Open;
1735        let dbg = format!("{e:?}");
1736        assert!(dbg.contains("Open"), "{dbg}");
1737        let copied: EffectiveState = e;
1738        let cloned = e;
1739        assert_eq!(copied, cloned);
1740        assert_ne!(e, EffectiveState::Closed);
1741    }
1742
1743    #[test]
1744    fn sync_state_debug_clone_default() {
1745        let s = SyncState::default();
1746        let dbg = format!("{s:?}");
1747        assert!(dbg.contains("SyncState"), "{dbg}");
1748        assert_eq!(s.pending_ops, 0);
1749        let cloned = s;
1750        assert_eq!(format!("{cloned:?}"), dbg);
1751    }
1752
1753    #[test]
1754    fn distributed_close_full_lifecycle() {
1755        let mut bridge = create_distributed_bridge();
1756        // Activate distributed record.
1757        if let Some(ref mut dist) = bridge.distributed {
1758            let _ = dist.activate(Time::from_secs(0));
1759        }
1760
1761        // Begin close — both local and distributed should transition.
1762        let result = bridge.begin_close(None, Time::from_secs(1)).unwrap();
1763        assert!(result.local_changed);
1764        assert!(result.distributed_transition.is_some());
1765
1766        // Drain and finalize.
1767        bridge.begin_drain().unwrap();
1768        bridge.begin_finalize().unwrap();
1769
1770        // Complete close.
1771        let result = bridge.complete_close(Time::from_secs(2)).unwrap();
1772        assert_eq!(result.effective_state, EffectiveState::Closed);
1773    }
1774
1775    // =================================================================
1776    // B6 Invariant Tests (asupersync-3narc.2.6)
1777    // =================================================================
1778
1779    /// Invariant: all state pairs that do NOT match an explicit rule in
1780    /// `EffectiveState::compute` must produce `Inconsistent` with the
1781    /// correct local and distributed states preserved.
1782    #[test]
1783    fn effective_state_inconsistent_pairs_are_exhaustive() {
1784        // These pairs should all produce Inconsistent.
1785        let inconsistent_pairs: &[(RegionState, DistributedRegionState)] = &[
1786            (RegionState::Closed, DistributedRegionState::Active),
1787            (RegionState::Closed, DistributedRegionState::Initializing),
1788            (RegionState::Closed, DistributedRegionState::Degraded),
1789            (RegionState::Closed, DistributedRegionState::Recovering),
1790            (RegionState::Closed, DistributedRegionState::Closing),
1791            (RegionState::Closing, DistributedRegionState::Active),
1792            (RegionState::Closing, DistributedRegionState::Initializing),
1793            (RegionState::Closing, DistributedRegionState::Degraded),
1794            (RegionState::Closing, DistributedRegionState::Recovering),
1795            (RegionState::Closing, DistributedRegionState::Closed),
1796            (RegionState::Draining, DistributedRegionState::Active),
1797            (RegionState::Draining, DistributedRegionState::Initializing),
1798            (RegionState::Draining, DistributedRegionState::Degraded),
1799            (RegionState::Draining, DistributedRegionState::Recovering),
1800            (RegionState::Draining, DistributedRegionState::Closed),
1801            (RegionState::Finalizing, DistributedRegionState::Active),
1802            (
1803                RegionState::Finalizing,
1804                DistributedRegionState::Initializing,
1805            ),
1806            (RegionState::Finalizing, DistributedRegionState::Degraded),
1807            (RegionState::Finalizing, DistributedRegionState::Recovering),
1808            (RegionState::Finalizing, DistributedRegionState::Closed),
1809            (RegionState::Open, DistributedRegionState::Closing),
1810            (RegionState::Open, DistributedRegionState::Closed),
1811        ];
1812
1813        for (local, distributed) in inconsistent_pairs {
1814            let state = EffectiveState::compute(*local, Some(*distributed));
1815            assert!(
1816                state.is_inconsistent(),
1817                "({local:?}, {distributed:?}) should be Inconsistent, got {state:?}"
1818            );
1819            if let EffectiveState::Inconsistent {
1820                local: l,
1821                distributed: d,
1822            } = state
1823            {
1824                assert_eq!(l, *local, "local state not preserved");
1825                assert_eq!(d, *distributed, "distributed state not preserved");
1826            }
1827        }
1828    }
1829
1830    /// Invariant: Hybrid mode bridge with no distributed record reports
1831    /// sync as NotNeeded, even though mode.is_replicated() is true.
1832    #[test]
1833    fn hybrid_mode_sync_not_needed_without_distributed_record() {
1834        let mut bridge = RegionBridge::with_mode(
1835            RegionId::new_for_test(1, 0),
1836            None,
1837            Budget::default(),
1838            RegionMode::hybrid(3),
1839        );
1840        assert!(bridge.mode().is_replicated());
1841        let sync = bridge.sync().unwrap();
1842        assert!(
1843            matches!(sync, SyncResult::NotNeeded),
1844            "hybrid mode without distributed record must report NotNeeded"
1845        );
1846    }
1847
1848    /// Regression: Hybrid mode sync stays NotNeeded when sync_pending is
1849    /// set but there is no distributed record to sync to. Without the
1850    /// distributed record, creating a snapshot is wasteful.
1851    #[test]
1852    fn hybrid_mode_sync_not_needed_with_pending_ops() {
1853        let mut bridge = RegionBridge::with_mode(
1854            RegionId::new_for_test(1, 0),
1855            None,
1856            Budget::default(),
1857            RegionMode::hybrid(3),
1858        );
1859        // Simulate pending ops without going through the full close path.
1860        bridge.sync_state.sync_pending = true;
1861        bridge.sync_state.pending_ops = 3;
1862
1863        let sync = bridge.sync().unwrap();
1864        assert!(
1865            matches!(sync, SyncResult::NotNeeded),
1866            "hybrid mode without distributed record must report NotNeeded even with pending ops"
1867        );
1868    }
1869}