1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use crate::dht::{ChordStablize, PeerRing, PeerRingAction, PeerRingRemoteAction};
use crate::err::Result;
use crate::message::{
FindSuccessorSend, Message, MessageRelay, MessageRelayMethod, NotifyPredecessorSend,
};
use crate::swarm::Swarm;
use futures::lock::Mutex;
use std::sync::Arc;
pub struct Stabilization {
chord: Arc<Mutex<PeerRing>>,
swarm: Arc<Swarm>,
timeout: usize,
}
impl Stabilization {
pub fn new(chord: Arc<Mutex<PeerRing>>, swarm: Arc<Swarm>, timeout: usize) -> Self {
Self {
chord,
swarm,
timeout,
}
}
pub fn get_timeout(&self) -> usize {
self.timeout
}
async fn notify_predecessor(&self) -> Result<()> {
let chord = self.chord.lock().await;
let message = MessageRelay::new(
Message::NotifyPredecessorSend(NotifyPredecessorSend { id: chord.id }),
&self.swarm.session(),
None,
None,
None,
MessageRelayMethod::SEND,
)?;
if chord.id != chord.successor.min() {
for s in chord.successor.list() {
self.swarm.send_message(&s.into(), message.clone()).await?;
}
Ok(())
} else {
Ok(())
}
}
async fn fix_fingers(&self) -> Result<()> {
let mut chord = self.chord.lock().await;
match chord.fix_fingers() {
Ok(action) => match action {
PeerRingAction::None => {
log::info!("wait to next round");
Ok(())
}
PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::FindSuccessorForFix(current),
) => {
let message = MessageRelay::new(
Message::FindSuccessorSend(FindSuccessorSend {
id: current,
for_fix: true,
}),
&self.swarm.session(),
None,
None,
None,
MessageRelayMethod::SEND,
)?;
self.swarm.send_message(&next.into(), message).await
}
_ => {
log::error!("Invalid PeerRing Action");
unreachable!();
}
},
Err(e) => {
log::error!("{:?}", e);
Err(e)
}
}
}
pub async fn stabilize(&self) -> Result<()> {
self.notify_predecessor().await?;
self.fix_fingers().await?;
Ok(())
}
}