Skip to main content

aura_agent/
reconfiguration.rs

1//! Runtime reconfiguration controller for protocol link/delegate operations.
2
3use aura_core::{
4    time::ProvenancedTime, AuthorityId, ComposedBundle, DelegationReceipt, SessionFootprint,
5    SessionId,
6};
7use std::collections::{BTreeSet, HashMap};
8
9#[cfg(feature = "choreo-backend-telltale-machine")]
10use telltale_machine::{
11    CanonicalPublicationContinuity, PendingEffectTreatment, ReconfigurationRuntimeSnapshot,
12    RuntimeUpgradeArtifact, RuntimeUpgradeExecution, RuntimeUpgradeRequest,
13    TransitionArtifactPhase,
14};
15
16/// Reconfiguration controller errors.
17#[derive(Debug, thiserror::Error, PartialEq, Eq)]
18pub enum ReconfigurationError {
19    /// Attempted to register a bundle id that already exists.
20    #[error("bundle already exists: {bundle_id}")]
21    DuplicateBundle { bundle_id: String },
22    /// Required bundle id does not exist.
23    #[error("bundle not found: {bundle_id}")]
24    BundleNotFound { bundle_id: String },
25    /// Linked bundles contain overlapping sessions.
26    #[error("cannot link bundles with overlapping sessions")]
27    OverlappingSessions,
28    /// Bundle interfaces are incompatible for linking.
29    #[error("bundle interfaces are incompatible for link")]
30    IncompatibleInterfaces,
31    /// Requested delegation references an unknown session owner.
32    #[error("session {session_id} not owned by authority {authority}")]
33    SessionNotOwned {
34        session_id: SessionId,
35        authority: AuthorityId,
36    },
37    /// Delegation produced a coherence violation.
38    #[error("reconfiguration coherence violation: {reason}")]
39    CoherenceViolation { reason: String },
40    /// Runtime-upgrade request violated the admitted public contract.
41    #[cfg(feature = "choreo-backend-telltale-machine")]
42    #[error("runtime upgrade rejected for bundle `{bundle_id}`: {reason}")]
43    InvalidRuntimeUpgrade { bundle_id: String, reason: String },
44}
45
/// Outcome of a coherence check over the tracked session footprints.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CoherenceStatus {
    /// The check found nothing to report.
    Coherent,
    /// The check found problems; each entry is one human-readable message.
    Violations(Vec<String>),
}
54
/// Ownership class a session is filed under when a footprint is extended.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SessionFootprintClass {
    /// The authority hosts the session natively.
    Native,
    /// The session was delegated to the authority by another one.
    DelegatedIn,
    /// The authority delegated the session away to another one.
    DelegatedOut,
}
65
66/// Mutable runtime controller for link/delegate operations.
67#[derive(Debug, Clone, Default)]
68pub(crate) struct ReconfigurationController {
69    bundles: HashMap<String, ComposedBundle>,
70    footprints: HashMap<AuthorityId, SessionFootprint>,
71    delegation_log: Vec<DelegationReceipt>,
72    #[cfg(feature = "choreo-backend-telltale-machine")]
73    runtime_upgrade_snapshots: HashMap<String, ReconfigurationRuntimeSnapshot>,
74}
75
76impl ReconfigurationController {
77    /// Create an empty controller.
78    #[must_use]
79    pub(crate) fn new() -> Self {
80        Self::default()
81    }
82
83    /// Register an existing bundle before link/delegate operations.
84    pub(crate) fn register_bundle(
85        &mut self,
86        bundle: ComposedBundle,
87    ) -> Result<(), ReconfigurationError> {
88        if self.bundles.contains_key(&bundle.bundle_id) {
89            return Err(ReconfigurationError::DuplicateBundle {
90                bundle_id: bundle.bundle_id,
91            });
92        }
93        #[cfg(feature = "choreo-backend-telltale-machine")]
94        self.runtime_upgrade_snapshots
95            .entry(bundle.bundle_id.clone())
96            .or_insert_with(empty_runtime_upgrade_snapshot);
97        self.bundles.insert(bundle.bundle_id.clone(), bundle);
98        Ok(())
99    }
100
101    /// Snapshot a registered bundle by id.
102    #[must_use]
103    pub(crate) fn bundle(&self, bundle_id: &str) -> Option<&ComposedBundle> {
104        self.bundles.get(bundle_id)
105    }
106
107    /// Snapshot per-authority session footprint.
108    #[must_use]
109    pub(crate) fn footprint(&self, authority: &AuthorityId) -> Option<&SessionFootprint> {
110        self.footprints.get(authority)
111    }
112
113    /// Append/replace one authority footprint.
114    #[cfg(test)]
115    pub(crate) fn set_footprint(&mut self, authority: AuthorityId, footprint: SessionFootprint) {
116        self.footprints.insert(authority, footprint);
117    }
118
119    /// Extend an authority footprint with one session classification.
120    pub(crate) fn footprint_extend(
121        &mut self,
122        authority: AuthorityId,
123        session_id: SessionId,
124        class: SessionFootprintClass,
125    ) {
126        let footprint = self.footprints.entry(authority).or_default();
127        match class {
128            SessionFootprintClass::Native => footprint.add_native(session_id),
129            SessionFootprintClass::DelegatedIn => footprint.add_delegated_in(session_id),
130            SessionFootprintClass::DelegatedOut => footprint.add_delegated_out(session_id),
131        }
132    }
133
134    /// Remove a session from all ownership classes for one authority.
135    pub(crate) fn footprint_remove(&mut self, authority: AuthorityId, session_id: SessionId) {
136        if let Some(footprint) = self.footprints.get_mut(&authority) {
137            footprint.remove(session_id);
138        }
139    }
140
141    /// Read delegation receipts in insertion order.
142    #[cfg(test)]
143    #[must_use]
144    pub(crate) fn delegation_log(&self) -> &[DelegationReceipt] {
145        &self.delegation_log
146    }
147
148    /// Statically compose two bundles into one linked bundle.
149    pub(crate) fn link(
150        &mut self,
151        bundle_a: &str,
152        bundle_b: &str,
153        linked_bundle_id: impl Into<String>,
154    ) -> Result<ComposedBundle, ReconfigurationError> {
155        let left = self.bundles.get(bundle_a).cloned().ok_or_else(|| {
156            ReconfigurationError::BundleNotFound {
157                bundle_id: bundle_a.to_string(),
158            }
159        })?;
160        let right = self.bundles.get(bundle_b).cloned().ok_or_else(|| {
161            ReconfigurationError::BundleNotFound {
162                bundle_id: bundle_b.to_string(),
163            }
164        })?;
165
166        if !left.compatible_with(&right) || !right.compatible_with(&left) {
167            return Err(ReconfigurationError::IncompatibleInterfaces);
168        }
169
170        let left_sessions = left.session_footprint.all_sessions();
171        let right_sessions = right.session_footprint.all_sessions();
172        if !left_sessions.is_disjoint(&right_sessions) {
173            return Err(ReconfigurationError::OverlappingSessions);
174        }
175
176        let linked_bundle_id = linked_bundle_id.into();
177        let mut protocol_ids = left.protocol_ids;
178        protocol_ids.extend(right.protocol_ids);
179
180        let mut exports = left.exports;
181        exports.extend(right.exports);
182        let mut imports = left.imports;
183        imports.extend(right.imports);
184
185        let mut session_footprint = SessionFootprint::new();
186        for session_id in left.session_footprint.native_sessions {
187            session_footprint.add_native(session_id);
188        }
189        for session_id in right.session_footprint.native_sessions {
190            session_footprint.add_native(session_id);
191        }
192        for session_id in left.session_footprint.delegated_in_sessions {
193            session_footprint.add_delegated_in(session_id);
194        }
195        for session_id in right.session_footprint.delegated_in_sessions {
196            session_footprint.add_delegated_in(session_id);
197        }
198        for session_id in left.session_footprint.delegated_out_sessions {
199            session_footprint.add_delegated_out(session_id);
200        }
201        for session_id in right.session_footprint.delegated_out_sessions {
202            session_footprint.add_delegated_out(session_id);
203        }
204
205        let linked = ComposedBundle::new(
206            linked_bundle_id.clone(),
207            protocol_ids,
208            exports,
209            imports,
210            session_footprint,
211        );
212        self.register_bundle(linked.clone())?;
213        Ok(linked)
214    }
215
216    /// Dynamically delegate one session endpoint from `from_authority` to `to_authority`.
217    pub(crate) fn delegate(
218        &mut self,
219        session_id: SessionId,
220        from_authority: AuthorityId,
221        to_authority: AuthorityId,
222        bundle_id: Option<String>,
223        delegated_at: ProvenancedTime,
224    ) -> Result<DelegationReceipt, ReconfigurationError> {
225        if let Some(bundle_id) = &bundle_id {
226            if !self.bundles.contains_key(bundle_id) {
227                return Err(ReconfigurationError::BundleNotFound {
228                    bundle_id: bundle_id.clone(),
229                });
230            }
231        }
232
233        let from_before = self
234            .footprints
235            .get(&from_authority)
236            .cloned()
237            .unwrap_or_else(SessionFootprint::new);
238        if !from_before.contains(session_id) {
239            return Err(ReconfigurationError::SessionNotOwned {
240                session_id,
241                authority: from_authority,
242            });
243        }
244        let to_before = self
245            .footprints
246            .get(&to_authority)
247            .cloned()
248            .unwrap_or_else(SessionFootprint::new);
249
250        let mut candidate = self.clone_without_log();
251
252        // If from_authority had delegated_in for this session, they received it from
253        // a previous delegator. When re-delegating, the previous delegator's
254        // delegated_out should be cleared since the session has moved on.
255        if from_before.delegated_in_sessions.contains(&session_id) {
256            // Find the previous delegator (who has delegated_out for this session)
257            // and clear their delegated_out since the chain is being extended
258            for (authority, footprint) in &self.footprints {
259                if footprint.delegated_out_sessions.contains(&session_id)
260                    && *authority != from_authority
261                {
262                    candidate.footprint_remove(*authority, session_id);
263                }
264            }
265        }
266
267        candidate.footprint_remove(from_authority, session_id);
268        candidate.footprint_extend(
269            from_authority,
270            session_id,
271            SessionFootprintClass::DelegatedOut,
272        );
273        candidate.footprint_extend(to_authority, session_id, SessionFootprintClass::DelegatedIn);
274
275        if let CoherenceStatus::Violations(violations) = verify_coherence_map(&candidate.footprints)
276        {
277            return Err(ReconfigurationError::CoherenceViolation {
278                reason: violations.join("; "),
279            });
280        }
281        let from_after = candidate
282            .footprints
283            .get(&from_authority)
284            .cloned()
285            .unwrap_or_else(SessionFootprint::new);
286        let to_after = candidate
287            .footprints
288            .get(&to_authority)
289            .cloned()
290            .unwrap_or_else(SessionFootprint::new);
291
292        self.footprints = candidate.footprints;
293        let receipt = DelegationReceipt::new(
294            session_id,
295            from_authority,
296            to_authority,
297            bundle_id,
298            from_before,
299            from_after,
300            to_before,
301            to_after,
302            delegated_at,
303        );
304        self.delegation_log.push(receipt.clone());
305        Ok(receipt)
306    }
307
308    /// Verify global reconfiguration coherence across all tracked footprints.
309    #[must_use]
310    pub(crate) fn verify_coherence(&self) -> CoherenceStatus {
311        verify_coherence_map(&self.footprints)
312    }
313
314    fn clone_without_log(&self) -> Self {
315        Self {
316            bundles: self.bundles.clone(),
317            footprints: self.footprints.clone(),
318            delegation_log: Vec::new(),
319            #[cfg(feature = "choreo-backend-telltale-machine")]
320            runtime_upgrade_snapshots: self.runtime_upgrade_snapshots.clone(),
321        }
322    }
323
324    #[cfg(feature = "choreo-backend-telltale-machine")]
325    pub(crate) fn seed_runtime_upgrade_membership<I, S>(
326        &mut self,
327        bundle_id: &str,
328        members: I,
329    ) -> Result<(), ReconfigurationError>
330    where
331        I: IntoIterator<Item = S>,
332        S: Into<String>,
333    {
334        self.ensure_bundle_registered(bundle_id)?;
335        let active_members = canonical_member_list(members);
336        if active_members.is_empty() {
337            return Err(Self::invalid_runtime_upgrade(
338                bundle_id,
339                "active membership may not be empty",
340            ));
341        }
342        let snapshot = self.runtime_upgrade_snapshot_mut(bundle_id)?;
343        snapshot.epoch = 0;
344        snapshot.active_members = active_members;
345        snapshot.history.clear();
346        snapshot.plan_executions.clear();
347        snapshot.runtime_upgrades.clear();
348        Ok(())
349    }
350
351    #[cfg(feature = "choreo-backend-telltale-machine")]
352    pub(crate) fn runtime_upgrade_snapshot(
353        &self,
354        bundle_id: &str,
355    ) -> Result<ReconfigurationRuntimeSnapshot, ReconfigurationError> {
356        self.ensure_bundle_registered(bundle_id)?;
357        let snapshot = self.runtime_upgrade_snapshot_ref(bundle_id)?;
358        if snapshot.active_members.is_empty() {
359            return Err(Self::invalid_runtime_upgrade(
360                bundle_id,
361                "active membership must be seeded before exporting a runtime-upgrade snapshot",
362            ));
363        }
364        Ok(snapshot.clone())
365    }
366
367    #[cfg(feature = "choreo-backend-telltale-machine")]
368    pub(crate) fn execute_runtime_upgrade(
369        &mut self,
370        bundle_id: &str,
371        request: &RuntimeUpgradeRequest,
372    ) -> Result<RuntimeUpgradeExecution, ReconfigurationError> {
373        self.ensure_bundle_registered(bundle_id)?;
374        let snapshot = self.runtime_upgrade_snapshot_mut(bundle_id)?;
375        if snapshot.active_members.is_empty() {
376            return Err(Self::invalid_runtime_upgrade(
377                bundle_id,
378                "active membership must be seeded before executing a runtime upgrade",
379            ));
380        }
381
382        let active_members: BTreeSet<String> = snapshot.active_members.iter().cloned().collect();
383        let initial_members = snapshot.active_members.clone();
384        let next_members = request
385            .plan
386            .steps
387            .last()
388            .map(|step| canonical_member_list(step.next_members.iter().cloned()))
389            .unwrap_or_default();
390        let staged = runtime_upgrade_artifact(
391            request,
392            TransitionArtifactPhase::Staged,
393            initial_members.clone(),
394            next_members.clone(),
395            None,
396        );
397
398        if let Err(error) = validate_runtime_upgrade_request(bundle_id, &active_members, request) {
399            let execution = RuntimeUpgradeExecution {
400                artifact_id: bundle_id.to_string(),
401                upgrade_id: request.upgrade_id.clone(),
402                final_members: initial_members.clone(),
403                artifacts: vec![
404                    staged,
405                    runtime_upgrade_artifact(
406                        request,
407                        TransitionArtifactPhase::Failed,
408                        initial_members.clone(),
409                        initial_members.clone(),
410                        Some(error.to_string()),
411                    ),
412                ],
413            };
414            snapshot.runtime_upgrades.push(execution);
415            snapshot.epoch = snapshot.runtime_upgrades.len() as u64;
416            return Err(error);
417        }
418
419        let admitted = runtime_upgrade_artifact(
420            request,
421            TransitionArtifactPhase::Admitted,
422            initial_members.clone(),
423            next_members.clone(),
424            None,
425        );
426        snapshot.active_members = next_members.clone();
427        let execution = RuntimeUpgradeExecution {
428            artifact_id: bundle_id.to_string(),
429            upgrade_id: request.upgrade_id.clone(),
430            final_members: snapshot.active_members.clone(),
431            artifacts: vec![
432                staged,
433                admitted,
434                runtime_upgrade_artifact(
435                    request,
436                    TransitionArtifactPhase::CommittedCutover,
437                    initial_members,
438                    next_members,
439                    None,
440                ),
441            ],
442        };
443        snapshot.runtime_upgrades.push(execution.clone());
444        snapshot.epoch = snapshot.runtime_upgrades.len() as u64;
445        Ok(execution)
446    }
447
448    #[cfg(feature = "choreo-backend-telltale-machine")]
449    fn ensure_bundle_registered(&self, bundle_id: &str) -> Result<(), ReconfigurationError> {
450        if self.bundles.contains_key(bundle_id) {
451            Ok(())
452        } else {
453            Err(ReconfigurationError::BundleNotFound {
454                bundle_id: bundle_id.to_string(),
455            })
456        }
457    }
458
459    #[cfg(feature = "choreo-backend-telltale-machine")]
460    fn runtime_upgrade_snapshot_ref(
461        &self,
462        bundle_id: &str,
463    ) -> Result<&ReconfigurationRuntimeSnapshot, ReconfigurationError> {
464        self.runtime_upgrade_snapshots
465            .get(bundle_id)
466            .ok_or_else(|| ReconfigurationError::BundleNotFound {
467                bundle_id: bundle_id.to_string(),
468            })
469    }
470
471    #[cfg(feature = "choreo-backend-telltale-machine")]
472    fn runtime_upgrade_snapshot_mut(
473        &mut self,
474        bundle_id: &str,
475    ) -> Result<&mut ReconfigurationRuntimeSnapshot, ReconfigurationError> {
476        self.runtime_upgrade_snapshots
477            .get_mut(bundle_id)
478            .ok_or_else(|| ReconfigurationError::BundleNotFound {
479                bundle_id: bundle_id.to_string(),
480            })
481    }
482
483    #[cfg(feature = "choreo-backend-telltale-machine")]
484    fn invalid_runtime_upgrade(bundle_id: &str, reason: impl Into<String>) -> ReconfigurationError {
485        ReconfigurationError::InvalidRuntimeUpgrade {
486            bundle_id: bundle_id.to_string(),
487            reason: reason.into(),
488        }
489    }
490}
491
/// Pristine runtime-upgrade snapshot: epoch 0, no members, nothing recorded.
#[cfg(feature = "choreo-backend-telltale-machine")]
fn empty_runtime_upgrade_snapshot() -> ReconfigurationRuntimeSnapshot {
    let active_members = Vec::new();
    let history = Vec::new();
    let plan_executions = Vec::new();
    let runtime_upgrades = Vec::new();
    ReconfigurationRuntimeSnapshot {
        epoch: 0,
        active_members,
        history,
        plan_executions,
        runtime_upgrades,
    }
}
502
503#[cfg(feature = "choreo-backend-telltale-machine")]
/// Convert `members` into owned strings, sorted ascending with duplicates
/// removed.
fn canonical_member_list<I, S>(members: I) -> Vec<String>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let mut roster: Vec<String> = members.into_iter().map(Into::into).collect();
    roster.sort_unstable();
    // After sorting, equal names are adjacent, so `dedup` removes them all.
    roster.dedup();
    roster
}
516
/// Check `request` against the admitted runtime-upgrade contract for
/// `bundle_id`, given the currently active members.
///
/// Checks run in this order and the first failure wins:
///
/// 1. the plan has at least one step;
/// 2. no publication id is both carried and invalidated;
/// 3. no obligation id is both carried and invalidated;
/// 4. if ownership continuity is required, the first step keeps at least one
///    currently active member;
/// 5. with `PendingEffectTreatment::InvalidateBlocked`, at least one
///    obligation id is carried or invalidated;
/// 6. with `CanonicalPublicationContinuity::PreserveCanonicalTruth`, no
///    publication id is invalidated;
/// 7. every step names at least one member.
///
/// # Errors
///
/// Returns [`ReconfigurationError::InvalidRuntimeUpgrade`] describing the
/// first failed check.
#[cfg(feature = "choreo-backend-telltale-machine")]
fn validate_runtime_upgrade_request(
    bundle_id: &str,
    active_members: &BTreeSet<String>,
    request: &RuntimeUpgradeRequest,
) -> Result<(), ReconfigurationError> {
    let reject =
        |reason: String| ReconfigurationController::invalid_runtime_upgrade(bundle_id, reason);

    if request.plan.steps.is_empty() {
        return Err(reject(format!(
            "runtime upgrade `{}` must contain at least one plan step",
            request.upgrade_id
        )));
    }

    let publication_conflict = request
        .carried_publication_ids
        .iter()
        .any(|carried| request.invalidated_publication_ids.contains(carried));
    if publication_conflict {
        return Err(reject(format!(
            "runtime upgrade `{}` may not both carry and invalidate the same canonical publication",
            request.upgrade_id
        )));
    }

    let obligation_conflict = request
        .carried_obligation_ids
        .iter()
        .any(|carried| request.invalidated_obligation_ids.contains(carried));
    if obligation_conflict {
        return Err(reject(format!(
            "runtime upgrade `{}` may not both carry and invalidate the same obligation",
            request.upgrade_id
        )));
    }

    if request.compatibility.ownership_continuity_required {
        // Continuity means the first cutover keeps at least one current member.
        let keeps_a_member = request.plan.steps[0]
            .next_members
            .iter()
            .any(|member| active_members.contains(member));
        if !keeps_a_member {
            return Err(reject(format!(
                "runtime upgrade `{}` requires ownership continuity across the first cutover",
                request.upgrade_id
            )));
        }
    }

    let invalidates_blocked_effects = matches!(
        request.compatibility.pending_effect_treatment,
        PendingEffectTreatment::InvalidateBlocked
    );
    let names_no_obligations = request.carried_obligation_ids.is_empty()
        && request.invalidated_obligation_ids.is_empty();
    if invalidates_blocked_effects && names_no_obligations {
        return Err(reject(format!(
            "runtime upgrade `{}` must make pending-effect treatment explicit",
            request.upgrade_id
        )));
    }

    let must_preserve_publications = matches!(
        request.compatibility.canonical_publication_continuity,
        CanonicalPublicationContinuity::PreserveCanonicalTruth
    );
    if must_preserve_publications && !request.invalidated_publication_ids.is_empty() {
        return Err(reject(format!(
            "runtime upgrade `{}` may not invalidate canonical publications when continuity is required",
            request.upgrade_id
        )));
    }

    let memberless_step = request
        .plan
        .steps
        .iter()
        .find(|step| step.next_members.iter().next().is_none());
    if let Some(step) = memberless_step {
        return Err(reject(format!(
            "runtime upgrade `{}` step `{}` must preserve a non-empty membership set",
            request.upgrade_id, step.step_id
        )));
    }

    Ok(())
}
622
/// Assemble the artifact for one `phase` of a runtime upgrade.
///
/// The upgrade id, compatibility settings, and carried/invalidated id lists
/// are copied from `request`. The membership lists and the optional `reason`
/// are taken from the arguments as given.
#[cfg(feature = "choreo-backend-telltale-machine")]
fn runtime_upgrade_artifact(
    request: &RuntimeUpgradeRequest,
    phase: TransitionArtifactPhase,
    previous_members: Vec<String>,
    next_members: Vec<String>,
    reason: Option<String>,
) -> RuntimeUpgradeArtifact {
    let upgrade_id = request.upgrade_id.clone();
    let compatibility = request.compatibility.clone();
    let carried_publication_ids = request.carried_publication_ids.clone();
    let invalidated_publication_ids = request.invalidated_publication_ids.clone();
    let carried_obligation_ids = request.carried_obligation_ids.clone();
    let invalidated_obligation_ids = request.invalidated_obligation_ids.clone();
    RuntimeUpgradeArtifact {
        upgrade_id,
        phase,
        previous_members,
        next_members,
        compatibility,
        carried_publication_ids,
        invalidated_publication_ids,
        carried_obligation_ids,
        invalidated_obligation_ids,
        reason,
    }
}
644
645fn verify_coherence_map(footprints: &HashMap<AuthorityId, SessionFootprint>) -> CoherenceStatus {
646    let mut violations = Vec::new();
647    let mut active_owners: HashMap<SessionId, Vec<AuthorityId>> = HashMap::new();
648    let mut delegated_out: HashMap<SessionId, BTreeSet<AuthorityId>> = HashMap::new();
649    let mut delegated_in: HashMap<SessionId, BTreeSet<AuthorityId>> = HashMap::new();
650
651    for (authority, footprint) in footprints {
652        for session_id in footprint
653            .native_sessions
654            .iter()
655            .chain(footprint.delegated_in_sessions.iter())
656        {
657            active_owners
658                .entry(*session_id)
659                .or_default()
660                .push(*authority);
661        }
662        for session_id in &footprint.delegated_out_sessions {
663            delegated_out
664                .entry(*session_id)
665                .or_default()
666                .insert(*authority);
667        }
668        for session_id in &footprint.delegated_in_sessions {
669            delegated_in
670                .entry(*session_id)
671                .or_default()
672                .insert(*authority);
673        }
674    }
675
676    for (session_id, owners) in active_owners {
677        if owners.len() > 1 {
678            violations.push(format!(
679                "session {session_id} has multiple active owners ({})",
680                owners.len()
681            ));
682        }
683    }
684
685    for (session_id, from_authorities) in delegated_out {
686        match delegated_in.get(&session_id) {
687            Some(to_authorities) if to_authorities.len() == 1 => {
688                if from_authorities.len() != 1 {
689                    violations.push(format!(
690                        "session {session_id} delegated out by {} authorities",
691                        from_authorities.len()
692                    ));
693                }
694            }
695            Some(to_authorities) => {
696                violations.push(format!(
697                    "session {session_id} delegated in to {} authorities",
698                    to_authorities.len()
699                ));
700            }
701            None => violations.push(format!(
702                "session {session_id} delegated out without delegated-in receiver"
703            )),
704        }
705    }
706
707    if violations.is_empty() {
708        CoherenceStatus::Coherent
709    } else {
710        CoherenceStatus::Violations(violations)
711    }
712}
713
#[cfg(test)]
mod tests {
    //! Unit tests for bundle registration, linking, delegation, and the
    //! coherence check, including the error paths of `register_bundle` and
    //! `delegate`.

    use super::*;
    use aura_core::time::{PhysicalTime, TimeStamp};
    use std::collections::BTreeSet;
    use uuid::Uuid;

    /// Deterministic authority id derived from a single seed byte.
    fn authority(seed: u8) -> AuthorityId {
        AuthorityId::new_from_entropy([seed; 32])
    }

    /// Deterministic session id derived from a single seed byte.
    fn session(seed: u8) -> SessionId {
        SessionId::from_uuid(Uuid::from_bytes([seed; 16]))
    }

    /// Physical-clock timestamp with no uncertainty, proofs, or origin.
    fn test_time(ts_ms: u64) -> ProvenancedTime {
        ProvenancedTime {
            stamp: TimeStamp::PhysicalClock(PhysicalTime {
                ts_ms,
                uncertainty: None,
            }),
            proofs: vec![],
            origin: None,
        }
    }

    /// Bundle with one protocol id, one export, no imports, and `footprint`.
    fn bundle(id: &str, export: &str, footprint: SessionFootprint) -> ComposedBundle {
        ComposedBundle::new(
            id,
            vec![format!("p.{id}")],
            BTreeSet::from([export.to_string()]),
            BTreeSet::new(),
            footprint,
        )
    }

    #[test]
    fn link_rejects_overlapping_session_footprints() {
        let shared = session(9);
        let mut controller = ReconfigurationController::new();
        let mut left_fp = SessionFootprint::new();
        left_fp.add_native(shared);
        let mut right_fp = SessionFootprint::new();
        right_fp.add_native(shared);

        controller
            .register_bundle(ComposedBundle::new(
                "left",
                vec!["p.left".to_string()],
                BTreeSet::from(["x".to_string()]),
                BTreeSet::new(),
                left_fp,
            ))
            .expect("left bundle should register");
        controller
            .register_bundle(ComposedBundle::new(
                "right",
                vec!["p.right".to_string()],
                BTreeSet::from(["y".to_string()]),
                BTreeSet::new(),
                right_fp,
            ))
            .expect("right bundle should register");

        let err = controller
            .link("left", "right", "linked")
            .expect_err("overlapping sessions must be rejected");
        assert_eq!(err, ReconfigurationError::OverlappingSessions);
    }

    #[test]
    fn register_bundle_rejects_duplicate_ids() {
        let mut controller = ReconfigurationController::new();
        controller
            .register_bundle(bundle("dup", "x", SessionFootprint::new()))
            .expect("first registration should succeed");

        let err = controller
            .register_bundle(bundle("dup", "y", SessionFootprint::new()))
            .expect_err("second registration under the same id must fail");
        assert_eq!(
            err,
            ReconfigurationError::DuplicateBundle {
                bundle_id: "dup".to_string(),
            }
        );
        assert!(controller.bundle("dup").is_some());
    }

    #[test]
    fn delegate_updates_footprints_and_appends_receipt() {
        let from = authority(1);
        let to = authority(2);
        let sid = session(7);
        let mut from_fp = SessionFootprint::new();
        from_fp.add_native(sid);

        let mut controller = ReconfigurationController::new();
        controller.set_footprint(from, from_fp);

        let receipt = controller
            .delegate(sid, from, to, None, test_time(100))
            .expect("delegation should succeed");

        assert!(receipt.from_after.delegated_out_sessions.contains(&sid));
        assert!(receipt.to_after.delegated_in_sessions.contains(&sid));
        assert_eq!(controller.delegation_log().len(), 1);
        assert_eq!(controller.verify_coherence(), CoherenceStatus::Coherent);
    }

    #[test]
    fn delegate_rejects_session_not_owned_by_source() {
        let from = authority(1);
        let to = authority(2);
        let sid = session(8);
        let mut controller = ReconfigurationController::new();

        let err = controller
            .delegate(sid, from, to, None, test_time(1))
            .expect_err("delegating an unowned session must fail");
        assert_eq!(
            err,
            ReconfigurationError::SessionNotOwned {
                session_id: sid,
                authority: from,
            }
        );
        // A rejected delegation must not leave a receipt behind.
        assert!(controller.delegation_log().is_empty());
    }

    #[test]
    fn delegate_rejects_unknown_bundle_before_touching_footprints() {
        let from = authority(1);
        let to = authority(2);
        let sid = session(11);
        let mut from_fp = SessionFootprint::new();
        from_fp.add_native(sid);

        let mut controller = ReconfigurationController::new();
        controller.set_footprint(from, from_fp);

        let err = controller
            .delegate(sid, from, to, Some("missing".to_string()), test_time(1))
            .expect_err("unknown bundle id must be rejected");
        assert_eq!(
            err,
            ReconfigurationError::BundleNotFound {
                bundle_id: "missing".to_string(),
            }
        );
        assert!(controller.delegation_log().is_empty());
        assert!(controller.footprint(&to).is_none());
    }

    #[test]
    fn footprint_extend_and_remove_updates_classification() {
        let authority = authority(4);
        let sid = session(6);
        let mut controller = ReconfigurationController::new();

        controller.footprint_extend(authority, sid, SessionFootprintClass::Native);
        let footprint = controller
            .footprint(&authority)
            .expect("footprint should exist after extend");
        assert!(footprint.native_sessions.contains(&sid));

        controller.footprint_remove(authority, sid);
        let footprint = controller
            .footprint(&authority)
            .expect("footprint should remain allocated");
        assert!(!footprint.contains(sid));
    }

    #[test]
    fn coherence_detects_orphaned_delegated_out_session() {
        let sid = session(5);
        let authority = authority(3);
        let mut footprint = SessionFootprint::new();
        footprint.add_delegated_out(sid);

        let mut controller = ReconfigurationController::new();
        controller.set_footprint(authority, footprint);

        match controller.verify_coherence() {
            CoherenceStatus::Coherent => panic!("orphaned delegated_out must be flagged"),
            CoherenceStatus::Violations(violations) => assert!(!violations.is_empty()),
        }
    }

    #[test]
    fn repeated_delegation_under_churn_preserves_coherence() {
        let a = authority(1);
        let b = authority(2);
        let c = authority(3);
        let sid = session(10);
        let mut controller = ReconfigurationController::new();
        let mut footprint = SessionFootprint::new();
        footprint.add_native(sid);
        controller.set_footprint(a, footprint);

        controller
            .delegate(sid, a, b, None, test_time(1))
            .expect("a->b delegation must succeed");
        assert_eq!(controller.verify_coherence(), CoherenceStatus::Coherent);

        controller
            .delegate(sid, b, c, None, test_time(2))
            .expect("b->c delegation must succeed");
        assert_eq!(controller.verify_coherence(), CoherenceStatus::Coherent);

        controller
            .delegate(sid, c, a, None, test_time(3))
            .expect("c->a delegation must succeed");
        assert_eq!(controller.verify_coherence(), CoherenceStatus::Coherent);
    }
}