1#![warn(missing_docs)]
2use num_bigint::BigUint;
22
23use super::Did;
24
/// Number of finger slots produced by [`finger_table`], one per bit position
/// in `0..RING_BITS`.
///
/// NOTE(review): presumably this equals the bit width of [`Did`] — confirm.
pub const RING_BITS: usize = 160;
27
/// Default successor-list capacity.
///
/// The functions in this module take the capacity as an explicit argument;
/// this constant is the value the unit tests pass to [`step`].
pub const DEFAULT_SUCCESSOR_CAPACITY: usize = 3;
30
31#[derive(Clone, Debug, PartialEq, Eq)]
33pub struct TopologyState {
34 pub local: Did,
36 pub successors: Vec<Did>,
38 pub predecessor: Option<Did>,
40 pub fingers: Vec<Option<Did>>,
42 pub fix_finger_index: usize,
44}
45
46impl TopologyState {
47 pub fn new(
49 local: Did,
50 successors: Vec<Did>,
51 predecessor: Option<Did>,
52 fingers: Vec<Option<Did>>,
53 fix_finger_index: usize,
54 ) -> Self {
55 Self {
56 local,
57 successors,
58 predecessor,
59 fingers,
60 fix_finger_index,
61 }
62 }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq)]
67pub enum FindSuccessorStep {
68 Local(Did),
70 Remote {
72 next: Did,
74 did: Did,
76 },
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
81pub enum TopologyEvent {
82 Join {
84 peer: Did,
86 },
87 Remove {
89 peer: Did,
91 },
92 UpdateSuccessor {
94 successor: Did,
96 },
97 Notify {
99 predecessor: Did,
101 },
102 Stabilize {
105 successors: Vec<Did>,
107 predecessor: Option<Did>,
109 },
110 FixFinger,
112 ApplyFinger {
114 index: usize,
116 successor: Did,
118 },
119}
120
121#[derive(Clone, Copy, Debug, PartialEq, Eq)]
123pub enum TopologyAction {
124 FindSuccessorForConnect {
126 next: Did,
128 did: Did,
130 },
131 FindSuccessorForFix {
133 next: Did,
135 did: Did,
137 index: usize,
139 },
140 QuerySuccessorList(Did),
142 Notify(Did),
144}
145
146#[derive(Clone, Debug, PartialEq, Eq)]
148pub struct TopologyStep {
149 pub state: TopologyState,
151 pub actions: Vec<TopologyAction>,
153}
154
155pub fn dist(a: Did, b: Did) -> BigUint {
157 BigUint::from(b - a)
158}
159
160fn push_unique(xs: &mut Vec<Did>, x: Did) {
161 if !xs.contains(&x) {
162 xs.push(x);
163 }
164}
165
166fn sorted_successors(mut candidates: Vec<Did>, local: Did, capacity: usize) -> Vec<Did> {
167 candidates.retain(|&did| did != local);
168 candidates.sort_by_key(|&did| dist(local, did));
169 candidates.dedup();
170 candidates.truncate(capacity);
171 candidates
172}
173
174pub fn successors(all: &[Did], n: Did, capacity: usize) -> Vec<Did> {
176 sorted_successors(all.to_vec(), n, capacity)
177}
178
179pub fn predecessor(all: &[Did], n: Did) -> Option<Did> {
181 all.iter()
182 .copied()
183 .filter(|&did| did != n)
184 .max_by_key(|&did| dist(n, did))
185}
186
187pub fn finger(all: &[Did], n: Did, bit: usize) -> Option<Did> {
192 let threshold = BigUint::from(1u8) << bit;
193 all.iter()
194 .copied()
195 .filter(|&did| did != n && dist(n, did) >= threshold)
196 .min_by_key(|&did| dist(n, did))
197}
198
199pub fn finger_table(all: &[Did], n: Did) -> Vec<Option<Did>> {
201 (0..RING_BITS).map(|bit| finger(all, n, bit)).collect()
202}
203
204pub fn update_successors(local: Did, current: &[Did], candidate: Did, capacity: usize) -> Vec<Did> {
206 let mut candidates = current.to_vec();
207 push_unique(&mut candidates, candidate);
208 sorted_successors(candidates, local, capacity)
209}
210
211fn finger_join(local: Did, current: &[Option<Did>], peer: Did) -> Vec<Option<Did>> {
212 let bias = dist(local, peer);
213 current
214 .iter()
215 .copied()
216 .enumerate()
217 .map(|(slot, old)| {
218 let pos = BigUint::from(Did::power_of_two(slot));
219 if bias < pos || peer == local {
220 old
221 } else {
222 match old {
223 Some(existing) if dist(local, existing) < bias => old,
224 _ => Some(peer),
225 }
226 }
227 })
228 .collect()
229}
230
231fn finger_remove(current: &[Option<Did>], peer: Did) -> Vec<Option<Did>> {
232 let mut next = current.to_vec();
233 let indexes = next
234 .iter()
235 .enumerate()
236 .filter(|(_, did)| **did == Some(peer))
237 .map(|(index, _)| index)
238 .collect::<Vec<_>>();
239 let (Some(first), Some(last)) = (indexes.first().copied(), indexes.last().copied()) else {
240 return next;
241 };
242 let replacement = next.get(last.saturating_add(1)).copied().flatten();
243 for slot in next.iter_mut().take(last.saturating_add(1)).skip(first) {
244 *slot = replacement;
245 }
246 next
247}
248
249fn finger_set(
250 local: Did,
251 current: &[Option<Did>],
252 index: usize,
253 successor: Did,
254) -> Vec<Option<Did>> {
255 let mut next = current.to_vec();
256 if successor != local {
257 if let Some(slot) = next.get_mut(index) {
258 *slot = Some(successor);
259 }
260 }
261 next
262}
263
264pub fn find_successor(state: &TopologyState, did: Did) -> FindSuccessorStep {
266 let head = state.successors.first().copied().unwrap_or(state.local);
267 if state.successors.is_empty() || dist(state.local, did) <= dist(state.local, head) {
268 FindSuccessorStep::Local(head)
269 } else {
270 let next = state
271 .fingers
272 .iter()
273 .rev()
274 .flatten()
275 .copied()
276 .find(|peer| dist(state.local, *peer) < dist(state.local, did))
277 .unwrap_or(state.local);
278 FindSuccessorStep::Remote { next, did }
279 }
280}
281
282pub fn rectify_predecessor(local: Did, current: Option<Did>, candidate: Did) -> Option<Did> {
284 match current {
285 Some(cur) if dist(local, cur) >= dist(local, candidate) => Some(cur),
286 _ => Some(candidate),
287 }
288}
289
290pub fn stabilize_successors(
292 local: Did,
293 current: &[Did],
294 topo_successors: &[Did],
295 topo_predecessor: Option<Did>,
296 capacity: usize,
297) -> Vec<Did> {
298 let mut known = vec![local];
299 for &did in current {
300 push_unique(&mut known, did);
301 }
302 if let Some(pred) = topo_predecessor {
303 push_unique(&mut known, pred);
304 }
305 for &did in topo_successors
306 .iter()
307 .take(topo_successors.len().saturating_sub(1))
308 {
309 push_unique(&mut known, did);
310 }
311 successors(&known, local, capacity)
312}
313
314pub fn stabilize_query(local: Did, current: &[Did], topo_predecessor: Option<Did>) -> Option<Did> {
316 let pred = topo_predecessor?;
317 if pred == local {
318 return None;
319 }
320 let old_head = current.iter().copied().min_by_key(|&did| dist(local, did));
321 match old_head {
322 Some(head) if dist(local, pred) >= dist(local, head) => None,
323 _ => Some(pred),
324 }
325}
326
327pub fn stabilize_notify(local: Did, next_successors: &[Did]) -> Option<Did> {
329 next_successors.first().copied().filter(|&did| did != local)
330}
331
332fn step_join(state: &TopologyState, peer: Did, capacity: usize) -> TopologyStep {
333 if peer == state.local {
334 return TopologyStep {
335 state: state.clone(),
336 actions: Vec::new(),
337 };
338 }
339 TopologyStep {
340 state: TopologyState {
341 successors: update_successors(state.local, &state.successors, peer, capacity),
342 fingers: finger_join(state.local, &state.fingers, peer),
343 ..state.clone()
344 },
345 actions: vec![TopologyAction::FindSuccessorForConnect {
346 next: peer,
347 did: state.local,
348 }],
349 }
350}
351
352fn step_remove(state: &TopologyState, peer: Did, capacity: usize) -> TopologyStep {
353 let mut next_successors = state
354 .successors
355 .iter()
356 .copied()
357 .filter(|&did| did != peer)
358 .collect::<Vec<_>>();
359 let fingers = finger_remove(&state.fingers, peer);
360 if next_successors.is_empty() {
361 if let Some(first_finger) = fingers.iter().flatten().copied().next() {
362 next_successors =
363 update_successors(state.local, &next_successors, first_finger, capacity);
364 }
365 }
366 TopologyStep {
367 state: TopologyState {
368 successors: next_successors,
369 predecessor: state.predecessor.filter(|&did| did != peer),
370 fingers,
371 ..state.clone()
372 },
373 actions: Vec::new(),
374 }
375}
376
377fn step_update_successor(state: &TopologyState, successor: Did, capacity: usize) -> TopologyStep {
378 let next_successors = update_successors(state.local, &state.successors, successor, capacity);
379 let inserted = !state.successors.contains(&successor) && next_successors.contains(&successor);
380 TopologyStep {
381 state: TopologyState {
382 successors: next_successors,
383 ..state.clone()
384 },
385 actions: if inserted {
386 vec![TopologyAction::QuerySuccessorList(successor)]
387 } else {
388 Vec::new()
389 },
390 }
391}
392
393fn step_fix_finger(state: &TopologyState) -> TopologyStep {
394 if state.fingers.is_empty() {
395 return TopologyStep {
396 state: state.clone(),
397 actions: Vec::new(),
398 };
399 }
400 let index = (state.fix_finger_index + 1) % state.fingers.len();
401 let did = state.local + Did::power_of_two(index);
402 match find_successor(state, did) {
403 FindSuccessorStep::Local(successor) => TopologyStep {
404 state: TopologyState {
405 fingers: finger_set(state.local, &state.fingers, index, successor),
406 fix_finger_index: index,
407 ..state.clone()
408 },
409 actions: Vec::new(),
410 },
411 FindSuccessorStep::Remote { next, did } => TopologyStep {
412 state: TopologyState {
413 fix_finger_index: index,
414 ..state.clone()
415 },
416 actions: vec![TopologyAction::FindSuccessorForFix { next, did, index }],
417 },
418 }
419}
420
421pub fn step(state: &TopologyState, event: TopologyEvent, capacity: usize) -> TopologyStep {
426 match event {
427 TopologyEvent::Join { peer } => step_join(state, peer, capacity),
428 TopologyEvent::Remove { peer } => step_remove(state, peer, capacity),
429 TopologyEvent::UpdateSuccessor { successor } => {
430 step_update_successor(state, successor, capacity)
431 }
432 TopologyEvent::Notify { predecessor } => TopologyStep {
433 state: TopologyState {
434 predecessor: rectify_predecessor(state.local, state.predecessor, predecessor),
435 ..state.clone()
436 },
437 actions: Vec::new(),
438 },
439 TopologyEvent::Stabilize {
440 successors: topo_successors,
441 predecessor: topo_predecessor,
442 } => {
443 let next_successors = stabilize_successors(
444 state.local,
445 &state.successors,
446 &topo_successors,
447 topo_predecessor,
448 capacity,
449 );
450 let mut actions = Vec::new();
451 if let Some(query) = stabilize_query(state.local, &state.successors, topo_predecessor) {
452 actions.push(TopologyAction::QuerySuccessorList(query));
453 }
454 if let Some(notify) = stabilize_notify(state.local, &next_successors) {
455 actions.push(TopologyAction::Notify(notify));
456 }
457 TopologyStep {
458 state: TopologyState {
459 successors: next_successors,
460 ..state.clone()
461 },
462 actions,
463 }
464 }
465 TopologyEvent::FixFinger => step_fix_finger(state),
466 TopologyEvent::ApplyFinger { index, successor } => TopologyStep {
467 state: TopologyState {
468 fingers: finger_set(state.local, &state.fingers, index, successor),
469 ..state.clone()
470 },
471 actions: Vec::new(),
472 },
473 }
474}
475
#[cfg(test)]
mod tests {
    use num_bigint::BigUint;

    use super::*;

    /// Shorthand for building an identifier from a small integer.
    fn did(value: u32) -> Did {
        Did::from(value)
    }

    /// Shorthand for `TopologyState::new`.
    fn state(
        local: Did,
        successors: Vec<Did>,
        predecessor: Option<Did>,
        fingers: Vec<Option<Did>>,
        fix_finger_index: usize,
    ) -> TopologyState {
        TopologyState::new(local, successors, predecessor, fingers, fix_finger_index)
    }

    /// Distance of each of the first `capacity` successor slots from `local`;
    /// missing slots count as `2^RING_BITS`, which stands in for "infinitely far".
    fn successor_distances(local: Did, successors: &[Did], capacity: usize) -> Vec<BigUint> {
        let infinity = BigUint::from(1u8) << RING_BITS;
        (0..capacity)
            .map(|index| {
                successors
                    .get(index)
                    .map(|successor| dist(local, *successor))
                    .unwrap_or_else(|| infinity.clone())
            })
            .collect()
    }

    /// True when no successor slot of `after` is farther away than the same
    /// slot of `before`.
    fn refines_successor_distances(before: &TopologyState, after: &TopologyState) -> bool {
        let before_distances =
            successor_distances(before.local, &before.successors, DEFAULT_SUCCESSOR_CAPACITY);
        let after_distances =
            successor_distances(after.local, &after.successors, DEFAULT_SUCCESSOR_CAPACITY);
        before_distances
            .iter()
            .zip(after_distances.iter())
            .all(|(before, after)| after <= before)
    }

    #[test]
    fn join_step_updates_successors_fingers_and_connect_action() {
        let local = did(0);
        let peer = did(8);
        let next = step(
            &state(local, vec![], None, vec![None; 5], 0),
            TopologyEvent::Join { peer },
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert_eq!(next.state.successors, vec![peer]);
        assert_eq!(next.state.fingers, vec![
            Some(peer),
            Some(peer),
            Some(peer),
            Some(peer),
            None
        ]);
        assert_eq!(next.actions, vec![
            TopologyAction::FindSuccessorForConnect {
                next: peer,
                did: local
            }
        ]);
    }

    #[test]
    fn join_step_refines_successor_distance_vector() {
        let local = did(0);
        let current = state(local, vec![did(20), did(40)], None, vec![None; 5], 0);
        let next = step(
            &current,
            TopologyEvent::Join { peer: did(10) },
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert!(refines_successor_distances(&current, &next.state));
    }

    #[test]
    fn stabilize_step_refines_successor_distance_vector() {
        let local = did(0);
        let current = state(local, vec![did(40)], None, vec![None; 5], 0);
        let next = step(
            &current,
            TopologyEvent::Stabilize {
                successors: vec![did(50), did(60)],
                predecessor: Some(did(10)),
            },
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert!(refines_successor_distances(&current, &next.state));
    }

    #[test]
    fn remove_step_removes_peer_from_every_topology_slot() {
        let local = did(0);
        let peer = did(8);
        let next = step(
            &state(
                local,
                vec![peer],
                Some(peer),
                vec![Some(peer), Some(peer)],
                0,
            ),
            TopologyEvent::Remove { peer },
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert!(next.state.successors.is_empty());
        assert_eq!(next.state.predecessor, None);
        assert_eq!(next.state.fingers, vec![None, None]);
        assert!(next.actions.is_empty());
    }

    #[test]
    fn fix_finger_step_updates_local_successor_slot() {
        let local = did(0);
        let successor = did(8);
        let next = step(
            &state(local, vec![successor], None, vec![None; 4], 2),
            TopologyEvent::FixFinger,
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert_eq!(next.state.fix_finger_index, 3);
        assert_eq!(next.state.fingers, vec![None, None, None, Some(successor)]);
        assert!(next.actions.is_empty());
    }

    #[test]
    fn fix_finger_step_emits_indexed_remote_action() {
        let local = did(0);
        let successor = did(4);
        let next_hop = did(6);
        let next = step(
            &state(
                local,
                vec![successor],
                None,
                vec![None, None, Some(next_hop), None],
                2,
            ),
            TopologyEvent::FixFinger,
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert_eq!(next.state.fix_finger_index, 3);
        assert_eq!(next.actions, vec![TopologyAction::FindSuccessorForFix {
            next: next_hop,
            did: Did::power_of_two(3),
            index: 3
        }]);
    }

    #[test]
    fn fix_finger_step_queries_local_relative_probe() {
        let local = did(100);
        let successor = did(104);
        let next_hop = did(106);
        let next = step(
            &state(
                local,
                vec![successor],
                None,
                vec![None, None, Some(next_hop), None],
                2,
            ),
            TopologyEvent::FixFinger,
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert_eq!(next.state.fix_finger_index, 3);
        assert_eq!(next.actions, vec![TopologyAction::FindSuccessorForFix {
            next: next_hop,
            did: local + Did::power_of_two(3),
            index: 3
        }]);
    }

    #[test]
    fn apply_finger_step_updates_exact_slot() {
        let local = did(0);
        let successor = did(8);
        let next = step(
            &state(local, vec![], None, vec![None; 4], 0),
            TopologyEvent::ApplyFinger {
                index: 2,
                successor,
            },
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert_eq!(next.state.fingers, vec![None, None, Some(successor), None]);
        assert!(next.actions.is_empty());
    }

    #[test]
    fn apply_finger_step_ignores_self_and_out_of_range_slot() {
        let local = did(0);
        let current = state(local, vec![], None, vec![None; 2], 0);
        let self_update = step(
            &current,
            TopologyEvent::ApplyFinger {
                index: 1,
                successor: local,
            },
            DEFAULT_SUCCESSOR_CAPACITY,
        );
        let out_of_range = step(
            &current,
            TopologyEvent::ApplyFinger {
                index: 9,
                successor: did(9),
            },
            DEFAULT_SUCCESSOR_CAPACITY,
        );

        assert_eq!(self_update.state, current);
        assert_eq!(out_of_range.state, current);
    }
}