Skip to main content

rings_core/dht/
topology.rs

1#![warn(missing_docs)]
2//! Pure topology transition model for Chord.
3//!
4//! This module is the production home of the algebraic operators previously
5//! mirrored only in convergence tests. The mutable [`PeerRing`](crate::dht::PeerRing)
6//! shell interprets these pure transitions by writing successor/predecessor
7//! fields and by turning [`TopologyAction`](crate::dht::topology::TopologyAction)
8//! values into transport actions.
9//!
10//! State variables:
11//! - `R = Z / 2^160`, represented by [`Did`](crate::dht::Did).
12//! - `succ[n]` is the bounded successor sequence for node `n`.
13//! - `pred[n]` is the optional predecessor for node `n`.
14//! - `finger[n][i]` is the optional sparse/no-wrap finger-table entry at slot `i`.
15//!
16//! Law: join, remove, notify, stabilize, and finger maintenance are pure
17//! transitions over this state. Stabilize/notify/finger refinement are monotone
18//! over the finite known topology set; their least fixpoint is the converged
19//! Chord state plus a finger table derived from that topology.
20
21use num_bigint::BigUint;
22
23use super::Did;
24
25/// Ring bit-width; `Did` is `Z/2^160`.
26pub const RING_BITS: usize = 160;
27
28/// Default successor-list capacity used by the production builder and tests.
29pub const DEFAULT_SUCCESSOR_CAPACITY: usize = 3;
30
31/// Pure per-node topology state.
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub struct TopologyState {
34    /// Local node identifier.
35    pub local: Did,
36    /// Known successors, ordered by clockwise distance from `local`.
37    pub successors: Vec<Did>,
38    /// Known predecessor.
39    pub predecessor: Option<Did>,
40    /// Sparse/no-wrap finger table.
41    pub fingers: Vec<Option<Did>>,
42    /// Next finger index maintained by the periodic finger fixer.
43    pub fix_finger_index: usize,
44}
45
46impl TopologyState {
47    /// Construct a pure topology state.
48    pub fn new(
49        local: Did,
50        successors: Vec<Did>,
51        predecessor: Option<Did>,
52        fingers: Vec<Option<Did>>,
53        fix_finger_index: usize,
54    ) -> Self {
55        Self {
56            local,
57            successors,
58            predecessor,
59            fingers,
60            fix_finger_index,
61        }
62    }
63}
64
65/// Pure result of looking up the owner of a DID in local topology state.
66#[derive(Clone, Copy, Debug, PartialEq, Eq)]
67pub enum FindSuccessorStep {
68    /// The local state can answer with this successor.
69    Local(Did),
70    /// The query must be forwarded to `next`.
71    Remote {
72        /// Next hop.
73        next: Did,
74        /// DID whose successor is being searched.
75        did: Did,
76    },
77}
78
79/// Pure topology input event.
80#[derive(Clone, Debug, PartialEq, Eq)]
81pub enum TopologyEvent {
82    /// A connected peer is introduced to the topology state.
83    Join {
84        /// Peer learned by the local node.
85        peer: Did,
86    },
87    /// A peer is removed from successor, predecessor, and finger state.
88    Remove {
89        /// Peer that left or failed.
90        peer: Did,
91    },
92    /// A successor candidate was accepted by the liveness/interpreter boundary.
93    UpdateSuccessor {
94        /// Candidate successor.
95        successor: Did,
96    },
97    /// HMCC/Zave notify input: one candidate predecessor notified this node.
98    Notify {
99        /// Candidate predecessor.
100        predecessor: Did,
101    },
102    /// HMCC/Zave stabilize input: topological information returned by the
103    /// current successor.
104    Stabilize {
105        /// Successor list reported by the successor.
106        successors: Vec<Did>,
107        /// Predecessor reported by the successor.
108        predecessor: Option<Did>,
109    },
110    /// Periodic finger-fix transition.
111    FixFinger,
112    /// Apply a reported successor to a fixed finger slot.
113    ApplyFinger {
114        /// Finger slot to update.
115        index: usize,
116        /// Successor reported for that slot.
117        successor: Did,
118    },
119}
120
121/// Pure topology side effect emitted by a transition.
122#[derive(Clone, Copy, Debug, PartialEq, Eq)]
123pub enum TopologyAction {
124    /// Ask `next` to find `did` and report with the connect handler.
125    FindSuccessorForConnect {
126        /// Next hop.
127        next: Did,
128        /// DID being searched.
129        did: Did,
130    },
131    /// Ask `next` to find `did` and report with the finger-fix handler.
132    FindSuccessorForFix {
133        /// Next hop.
134        next: Did,
135        /// DID being searched.
136        did: Did,
137        /// Finger slot to update when the report returns.
138        index: usize,
139    },
140    /// Query this improved successor for its successor list.
141    QuerySuccessorList(Did),
142    /// Notify this successor that `local` is its predecessor candidate.
143    Notify(Did),
144}
145
146/// Result of applying one pure topology transition.
147#[derive(Clone, Debug, PartialEq, Eq)]
148pub struct TopologyStep {
149    /// Next topology state.
150    pub state: TopologyState,
151    /// Actions to be interpreted by the effect layer.
152    pub actions: Vec<TopologyAction>,
153}
154
155/// `dist(a,b) == (b - a) mod 2^160`, the clockwise distance from `a` to `b`.
156pub fn dist(a: Did, b: Did) -> BigUint {
157    BigUint::from(b - a)
158}
159
160fn push_unique(xs: &mut Vec<Did>, x: Did) {
161    if !xs.contains(&x) {
162        xs.push(x);
163    }
164}
165
166fn sorted_successors(mut candidates: Vec<Did>, local: Did, capacity: usize) -> Vec<Did> {
167    candidates.retain(|&did| did != local);
168    candidates.sort_by_key(|&did| dist(local, did));
169    candidates.dedup();
170    candidates.truncate(capacity);
171    candidates
172}
173
174/// `Successors(n)`: the nearest forward nodes, ordered by clockwise distance.
175pub fn successors(all: &[Did], n: Did, capacity: usize) -> Vec<Did> {
176    sorted_successors(all.to_vec(), n, capacity)
177}
178
179/// `Predecessor(n)`: the nearest node behind `n`.
180pub fn predecessor(all: &[Did], n: Did) -> Option<Did> {
181    all.iter()
182        .copied()
183        .filter(|&did| did != n)
184        .max_by_key(|&did| dist(n, did))
185}
186
187/// `Finger(n, bit)`: nearest forward node at distance `>= 2^bit`, else `None`.
188///
189/// This mirrors Rings' sparse/no-wrap finger table, not the Chord paper's
190/// wrapping finger definition.
191pub fn finger(all: &[Did], n: Did, bit: usize) -> Option<Did> {
192    let threshold = BigUint::from(1u8) << bit;
193    all.iter()
194        .copied()
195        .filter(|&did| did != n && dist(n, did) >= threshold)
196        .min_by_key(|&did| dist(n, did))
197}
198
199/// Full sparse/no-wrap finger table predicted by the topology operator.
200pub fn finger_table(all: &[Did], n: Did) -> Vec<Option<Did>> {
201    (0..RING_BITS).map(|bit| finger(all, n, bit)).collect()
202}
203
204/// Correct successor list after introducing one candidate successor.
205pub fn update_successors(local: Did, current: &[Did], candidate: Did, capacity: usize) -> Vec<Did> {
206    let mut candidates = current.to_vec();
207    push_unique(&mut candidates, candidate);
208    sorted_successors(candidates, local, capacity)
209}
210
211fn finger_join(local: Did, current: &[Option<Did>], peer: Did) -> Vec<Option<Did>> {
212    let bias = dist(local, peer);
213    current
214        .iter()
215        .copied()
216        .enumerate()
217        .map(|(slot, old)| {
218            let pos = BigUint::from(Did::power_of_two(slot));
219            if bias < pos || peer == local {
220                old
221            } else {
222                match old {
223                    Some(existing) if dist(local, existing) < bias => old,
224                    _ => Some(peer),
225                }
226            }
227        })
228        .collect()
229}
230
231fn finger_remove(current: &[Option<Did>], peer: Did) -> Vec<Option<Did>> {
232    let mut next = current.to_vec();
233    let indexes = next
234        .iter()
235        .enumerate()
236        .filter(|(_, did)| **did == Some(peer))
237        .map(|(index, _)| index)
238        .collect::<Vec<_>>();
239    let (Some(first), Some(last)) = (indexes.first().copied(), indexes.last().copied()) else {
240        return next;
241    };
242    let replacement = next.get(last.saturating_add(1)).copied().flatten();
243    for slot in next.iter_mut().take(last.saturating_add(1)).skip(first) {
244        *slot = replacement;
245    }
246    next
247}
248
249fn finger_set(
250    local: Did,
251    current: &[Option<Did>],
252    index: usize,
253    successor: Did,
254) -> Vec<Option<Did>> {
255    let mut next = current.to_vec();
256    if successor != local {
257        if let Some(slot) = next.get_mut(index) {
258            *slot = Some(successor);
259        }
260    }
261    next
262}
263
264/// Pure Chord successor lookup against one topology state.
265pub fn find_successor(state: &TopologyState, did: Did) -> FindSuccessorStep {
266    let head = state.successors.first().copied().unwrap_or(state.local);
267    if state.successors.is_empty() || dist(state.local, did) <= dist(state.local, head) {
268        FindSuccessorStep::Local(head)
269    } else {
270        let next = state
271            .fingers
272            .iter()
273            .rev()
274            .flatten()
275            .copied()
276            .find(|peer| dist(state.local, *peer) < dist(state.local, did))
277            .unwrap_or(state.local);
278        FindSuccessorStep::Remote { next, did }
279    }
280}
281
282/// Correct predecessor value after one HMCC/Zave rectify transition.
283pub fn rectify_predecessor(local: Did, current: Option<Did>, candidate: Did) -> Option<Did> {
284    match current {
285        Some(cur) if dist(local, cur) >= dist(local, candidate) => Some(cur),
286        _ => Some(candidate),
287    }
288}
289
290/// Correct successor list after one HMCC/Zave stabilize transition.
291pub fn stabilize_successors(
292    local: Did,
293    current: &[Did],
294    topo_successors: &[Did],
295    topo_predecessor: Option<Did>,
296    capacity: usize,
297) -> Vec<Did> {
298    let mut known = vec![local];
299    for &did in current {
300        push_unique(&mut known, did);
301    }
302    if let Some(pred) = topo_predecessor {
303        push_unique(&mut known, pred);
304    }
305    for &did in topo_successors
306        .iter()
307        .take(topo_successors.len().saturating_sub(1))
308    {
309        push_unique(&mut known, did);
310    }
311    successors(&known, local, capacity)
312}
313
314/// Improved-successor query emitted by one HMCC/Zave stabilize transition.
315pub fn stabilize_query(local: Did, current: &[Did], topo_predecessor: Option<Did>) -> Option<Did> {
316    let pred = topo_predecessor?;
317    if pred == local {
318        return None;
319    }
320    let old_head = current.iter().copied().min_by_key(|&did| dist(local, did));
321    match old_head {
322        Some(head) if dist(local, pred) >= dist(local, head) => None,
323        _ => Some(pred),
324    }
325}
326
327/// Notify action emitted after one HMCC/Zave stabilize transition.
328pub fn stabilize_notify(local: Did, next_successors: &[Did]) -> Option<Did> {
329    next_successors.first().copied().filter(|&did| did != local)
330}
331
332fn step_join(state: &TopologyState, peer: Did, capacity: usize) -> TopologyStep {
333    if peer == state.local {
334        return TopologyStep {
335            state: state.clone(),
336            actions: Vec::new(),
337        };
338    }
339    TopologyStep {
340        state: TopologyState {
341            successors: update_successors(state.local, &state.successors, peer, capacity),
342            fingers: finger_join(state.local, &state.fingers, peer),
343            ..state.clone()
344        },
345        actions: vec![TopologyAction::FindSuccessorForConnect {
346            next: peer,
347            did: state.local,
348        }],
349    }
350}
351
352fn step_remove(state: &TopologyState, peer: Did, capacity: usize) -> TopologyStep {
353    let mut next_successors = state
354        .successors
355        .iter()
356        .copied()
357        .filter(|&did| did != peer)
358        .collect::<Vec<_>>();
359    let fingers = finger_remove(&state.fingers, peer);
360    if next_successors.is_empty() {
361        if let Some(first_finger) = fingers.iter().flatten().copied().next() {
362            next_successors =
363                update_successors(state.local, &next_successors, first_finger, capacity);
364        }
365    }
366    TopologyStep {
367        state: TopologyState {
368            successors: next_successors,
369            predecessor: state.predecessor.filter(|&did| did != peer),
370            fingers,
371            ..state.clone()
372        },
373        actions: Vec::new(),
374    }
375}
376
377fn step_update_successor(state: &TopologyState, successor: Did, capacity: usize) -> TopologyStep {
378    let next_successors = update_successors(state.local, &state.successors, successor, capacity);
379    let inserted = !state.successors.contains(&successor) && next_successors.contains(&successor);
380    TopologyStep {
381        state: TopologyState {
382            successors: next_successors,
383            ..state.clone()
384        },
385        actions: if inserted {
386            vec![TopologyAction::QuerySuccessorList(successor)]
387        } else {
388            Vec::new()
389        },
390    }
391}
392
393fn step_fix_finger(state: &TopologyState) -> TopologyStep {
394    if state.fingers.is_empty() {
395        return TopologyStep {
396            state: state.clone(),
397            actions: Vec::new(),
398        };
399    }
400    let index = (state.fix_finger_index + 1) % state.fingers.len();
401    let did = state.local + Did::power_of_two(index);
402    match find_successor(state, did) {
403        FindSuccessorStep::Local(successor) => TopologyStep {
404            state: TopologyState {
405                fingers: finger_set(state.local, &state.fingers, index, successor),
406                fix_finger_index: index,
407                ..state.clone()
408            },
409            actions: Vec::new(),
410        },
411        FindSuccessorStep::Remote { next, did } => TopologyStep {
412            state: TopologyState {
413                fix_finger_index: index,
414                ..state.clone()
415            },
416            actions: vec![TopologyAction::FindSuccessorForFix { next, did, index }],
417        },
418    }
419}
420
421/// Apply one pure topology transition.
422///
423/// Post: the returned state depends only on `state` and `event`; no locks,
424/// storage, clocks, randomness, or transport effects are read here.
425pub fn step(state: &TopologyState, event: TopologyEvent, capacity: usize) -> TopologyStep {
426    match event {
427        TopologyEvent::Join { peer } => step_join(state, peer, capacity),
428        TopologyEvent::Remove { peer } => step_remove(state, peer, capacity),
429        TopologyEvent::UpdateSuccessor { successor } => {
430            step_update_successor(state, successor, capacity)
431        }
432        TopologyEvent::Notify { predecessor } => TopologyStep {
433            state: TopologyState {
434                predecessor: rectify_predecessor(state.local, state.predecessor, predecessor),
435                ..state.clone()
436            },
437            actions: Vec::new(),
438        },
439        TopologyEvent::Stabilize {
440            successors: topo_successors,
441            predecessor: topo_predecessor,
442        } => {
443            let next_successors = stabilize_successors(
444                state.local,
445                &state.successors,
446                &topo_successors,
447                topo_predecessor,
448                capacity,
449            );
450            let mut actions = Vec::new();
451            if let Some(query) = stabilize_query(state.local, &state.successors, topo_predecessor) {
452                actions.push(TopologyAction::QuerySuccessorList(query));
453            }
454            if let Some(notify) = stabilize_notify(state.local, &next_successors) {
455                actions.push(TopologyAction::Notify(notify));
456            }
457            TopologyStep {
458                state: TopologyState {
459                    successors: next_successors,
460                    ..state.clone()
461                },
462                actions,
463            }
464        }
465        TopologyEvent::FixFinger => step_fix_finger(state),
466        TopologyEvent::ApplyFinger { index, successor } => TopologyStep {
467            state: TopologyState {
468                fingers: finger_set(state.local, &state.fingers, index, successor),
469                ..state.clone()
470            },
471            actions: Vec::new(),
472        },
473    }
474}
475
476#[cfg(test)]
477mod tests {
478    use num_bigint::BigUint;
479
480    use super::*;
481
482    fn did(value: u32) -> Did {
483        Did::from(value)
484    }
485
486    fn state(
487        local: Did,
488        successors: Vec<Did>,
489        predecessor: Option<Did>,
490        fingers: Vec<Option<Did>>,
491        fix_finger_index: usize,
492    ) -> TopologyState {
493        TopologyState::new(local, successors, predecessor, fingers, fix_finger_index)
494    }
495
496    fn successor_distances(local: Did, successors: &[Did], capacity: usize) -> Vec<BigUint> {
497        let infinity = BigUint::from(1u8) << RING_BITS;
498        (0..capacity)
499            .map(|index| {
500                successors
501                    .get(index)
502                    .map(|successor| dist(local, *successor))
503                    .unwrap_or_else(|| infinity.clone())
504            })
505            .collect()
506    }
507
508    fn refines_successor_distances(before: &TopologyState, after: &TopologyState) -> bool {
509        let before_distances =
510            successor_distances(before.local, &before.successors, DEFAULT_SUCCESSOR_CAPACITY);
511        let after_distances =
512            successor_distances(after.local, &after.successors, DEFAULT_SUCCESSOR_CAPACITY);
513        before_distances
514            .iter()
515            .zip(after_distances.iter())
516            .all(|(before, after)| after <= before)
517    }
518
519    #[test]
520    fn join_step_updates_successors_fingers_and_connect_action() {
521        let local = did(0);
522        let peer = did(8);
523        let next = step(
524            &state(local, vec![], None, vec![None; 5], 0),
525            TopologyEvent::Join { peer },
526            DEFAULT_SUCCESSOR_CAPACITY,
527        );
528
529        assert_eq!(next.state.successors, vec![peer]);
530        assert_eq!(next.state.fingers, vec![
531            Some(peer),
532            Some(peer),
533            Some(peer),
534            Some(peer),
535            None
536        ]);
537        assert_eq!(next.actions, vec![
538            TopologyAction::FindSuccessorForConnect {
539                next: peer,
540                did: local
541            }
542        ]);
543    }
544
545    #[test]
546    fn join_step_refines_successor_distance_vector() {
547        let local = did(0);
548        let current = state(local, vec![did(20), did(40)], None, vec![None; 5], 0);
549        let next = step(
550            &current,
551            TopologyEvent::Join { peer: did(10) },
552            DEFAULT_SUCCESSOR_CAPACITY,
553        );
554
555        assert!(refines_successor_distances(&current, &next.state));
556    }
557
558    #[test]
559    fn stabilize_step_refines_successor_distance_vector() {
560        let local = did(0);
561        let current = state(local, vec![did(40)], None, vec![None; 5], 0);
562        let next = step(
563            &current,
564            TopologyEvent::Stabilize {
565                successors: vec![did(50), did(60)],
566                predecessor: Some(did(10)),
567            },
568            DEFAULT_SUCCESSOR_CAPACITY,
569        );
570
571        assert!(refines_successor_distances(&current, &next.state));
572    }
573
574    #[test]
575    fn remove_step_removes_peer_from_every_topology_slot() {
576        let local = did(0);
577        let peer = did(8);
578        let next = step(
579            &state(
580                local,
581                vec![peer],
582                Some(peer),
583                vec![Some(peer), Some(peer)],
584                0,
585            ),
586            TopologyEvent::Remove { peer },
587            DEFAULT_SUCCESSOR_CAPACITY,
588        );
589
590        assert!(next.state.successors.is_empty());
591        assert_eq!(next.state.predecessor, None);
592        assert_eq!(next.state.fingers, vec![None, None]);
593        assert!(next.actions.is_empty());
594    }
595
596    #[test]
597    fn fix_finger_step_updates_local_successor_slot() {
598        let local = did(0);
599        let successor = did(8);
600        let next = step(
601            &state(local, vec![successor], None, vec![None; 4], 2),
602            TopologyEvent::FixFinger,
603            DEFAULT_SUCCESSOR_CAPACITY,
604        );
605
606        assert_eq!(next.state.fix_finger_index, 3);
607        assert_eq!(next.state.fingers, vec![None, None, None, Some(successor)]);
608        assert!(next.actions.is_empty());
609    }
610
611    #[test]
612    fn fix_finger_step_emits_indexed_remote_action() {
613        let local = did(0);
614        let successor = did(4);
615        let next_hop = did(6);
616        let next = step(
617            &state(
618                local,
619                vec![successor],
620                None,
621                vec![None, None, Some(next_hop), None],
622                2,
623            ),
624            TopologyEvent::FixFinger,
625            DEFAULT_SUCCESSOR_CAPACITY,
626        );
627
628        assert_eq!(next.state.fix_finger_index, 3);
629        assert_eq!(next.actions, vec![TopologyAction::FindSuccessorForFix {
630            next: next_hop,
631            did: Did::power_of_two(3),
632            index: 3
633        }]);
634    }
635
636    #[test]
637    fn fix_finger_step_queries_local_relative_probe() {
638        let local = did(100);
639        let successor = did(104);
640        let next_hop = did(106);
641        let next = step(
642            &state(
643                local,
644                vec![successor],
645                None,
646                vec![None, None, Some(next_hop), None],
647                2,
648            ),
649            TopologyEvent::FixFinger,
650            DEFAULT_SUCCESSOR_CAPACITY,
651        );
652
653        assert_eq!(next.state.fix_finger_index, 3);
654        assert_eq!(next.actions, vec![TopologyAction::FindSuccessorForFix {
655            next: next_hop,
656            did: local + Did::power_of_two(3),
657            index: 3
658        }]);
659    }
660
661    #[test]
662    fn apply_finger_step_updates_exact_slot() {
663        let local = did(0);
664        let successor = did(8);
665        let next = step(
666            &state(local, vec![], None, vec![None; 4], 0),
667            TopologyEvent::ApplyFinger {
668                index: 2,
669                successor,
670            },
671            DEFAULT_SUCCESSOR_CAPACITY,
672        );
673
674        assert_eq!(next.state.fingers, vec![None, None, Some(successor), None]);
675        assert!(next.actions.is_empty());
676    }
677
678    #[test]
679    fn apply_finger_step_ignores_self_and_out_of_range_slot() {
680        let local = did(0);
681        let current = state(local, vec![], None, vec![None; 2], 0);
682        let self_update = step(
683            &current,
684            TopologyEvent::ApplyFinger {
685                index: 1,
686                successor: local,
687            },
688            DEFAULT_SUCCESSOR_CAPACITY,
689        );
690        let out_of_range = step(
691            &current,
692            TopologyEvent::ApplyFinger {
693                index: 9,
694                successor: did(9),
695            },
696            DEFAULT_SUCCESSOR_CAPACITY,
697        );
698
699        assert_eq!(self_update.state, current);
700        assert_eq!(out_of_range.state, current);
701    }
702}