Struct rings_core::dht::PeerRing
source · pub struct PeerRing {
pub did: Did,
pub finger: Arc<Mutex<FingerTable>>,
pub successor_seq: Arc<Mutex<SuccessorSeq>>,
pub predecessor: Arc<Mutex<Option<Did>>>,
pub storage: Arc<PersistenceStorage>,
pub cache: Arc<MemStorage<Did, VirtualNode>>,
}Expand description
PeerRing is used to help a node interact with other nodes. All nodes in rings network form a clockwise ring in the order of Did. This struct takes its name from that. PeerRing implemented Chord algorithm. PeerRing implemented ChordStorage protocol.
Fields§
§did: DidThe did of current node.
finger: Arc<Mutex<FingerTable>>FingerTable help node to find successor quickly.
successor_seq: Arc<Mutex<SuccessorSeq>>The next node on the ring. The [SuccessorSeq] may contain multiple node dids for fault tolerance. The min did should be same as the first element in finger table.
predecessor: Arc<Mutex<Option<Did>>>The did of previous node on the ring.
storage: Arc<PersistenceStorage>Local storage for ChordStorage.
cache: Arc<MemStorage<Did, VirtualNode>>Local cache for ChordStorage.
Implementations§
source§impl PeerRing
impl PeerRing
sourcepub async fn new_with_config(did: Did, succ_max: u8) -> Result<Self>
pub async fn new_with_config(did: Did, succ_max: u8) -> Result<Self>
Create a new Chord Ring with given successor_seq max num, and finger_size.
sourcepub fn new_with_storage(
did: Did,
succ_max: u8,
storage: PersistenceStorage
) -> Self
pub fn new_with_storage(
did: Did,
succ_max: u8,
storage: PersistenceStorage
) -> Self
Same as new with config, but with a given storage.
Examples found in repository?
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
pub fn build(self) -> Result<Swarm> {
let session_manager = {
if self.session_manager.is_some() {
Ok(self.session_manager.unwrap())
} else if self.key.is_some() {
SessionManager::new_with_seckey(&self.key.unwrap(), self.session_ttl)
} else {
Err(Error::SwarmBuildFailed(
"Should set session_manager or key".into(),
))
}
}?;
let dht_did = self
.dht_did
.ok_or_else(|| Error::SwarmBuildFailed("Should set session_manager or key".into()))?;
let dht = PeerRing::new_with_storage(dht_did, self.dht_succ_max, self.dht_storage);
Ok(Swarm {
pending_transports: Arc::new(Mutex::new(vec![])),
transports: MemStorage::new(),
transport_event_channel: Channel::new(),
ice_servers: self.ice_servers,
external_address: self.external_address,
dht: Arc::new(dht),
services: self.services,
session_manager,
})
}sourcepub fn lock_successor(&self) -> Result<MutexGuard<'_, SuccessorSeq>>
pub fn lock_successor(&self) -> Result<MutexGuard<'_, SuccessorSeq>>
Lock and return MutexGuard of successor sequence.
Examples found in repository?
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
pub fn remove(&self, did: Did) -> Result<()> {
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
let mut predecessor = self.lock_predecessor()?;
if let Some(pid) = *predecessor {
if pid == did {
*predecessor = None;
}
}
finger.remove(did);
successor.remove(did);
if successor.is_empty() {
if let Some(x) = finger.first() {
successor.update(x);
}
}
Ok(())
}
/// Calculate bias of the Did on the ring.
pub fn bias(&self, did: Did) -> BiasId {
BiasId::new(self.did, did)
}
}
impl Chord<PeerRingAction> for PeerRing {
/// Join a ring containing a node identified by `did`.
/// This method is usually invoked to maintain successor sequence and finger table
/// after connect to another node.
///
/// This method will return a [RemoteAction::FindSuccessorForConnect] to the caller.
/// The caller will send it to the node identified by `did`, and let the node find
/// the successor of current node and make current node connect to that successor.
fn join(&self, did: Did) -> Result<PeerRingAction> {
if did == self.did {
return Ok(PeerRingAction::None);
}
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
finger.join(did);
if self.bias(did) < self.bias(successor.max()) || !successor.is_full() {
// 1) id should follows self.id
// 2) #fff should follow #001 because id space is a Finate Ring
// 3) #001 - #fff = #001 + -(#fff) = #001
successor.update(did);
}
Ok(PeerRingAction::RemoteAction(
did,
RemoteAction::FindSuccessorForConnect(self.did),
))
}
/// Find the successor of a Did.
/// May return a remote action for the successor is recorded in another node.
fn find_successor(&self, did: Did) -> Result<PeerRingAction> {
let successor = self.lock_successor()?;
let finger = self.lock_finger()?;
if successor.is_empty() || self.bias(did) <= self.bias(successor.min()) {
// If the did is closer to self than successor, return successor as the
// successor of that did.
Ok(PeerRingAction::Some(successor.min()))
} else {
// Otherwise, find the closest preceding node and ask it to find the successor.
let closest = finger.closest(did);
Ok(PeerRingAction::RemoteAction(
closest,
RemoteAction::FindSuccessor(did),
))
}
}More examples
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
async fn notify_predecessor(&self) -> Result<()> {
let (successor_min, successor_list) = {
let successor = self.chord.lock_successor()?;
(successor.min(), successor.list())
};
let msg = Message::NotifyPredecessorSend(NotifyPredecessorSend {
did: self.chord.did,
});
if self.chord.did != successor_min {
for s in successor_list {
self.swarm
.send_message(msg.clone(), s, self.swarm.did())
.await?;
}
Ok(())
} else {
Ok(())
}
}49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
async fn handle(
&self,
_ctx: &MessagePayload<Message>,
msg: &NotifyPredecessorReport,
) -> Result<()> {
// if successor: predecessor is between (id, successor]
// then update local successor
if self.swarm.get_and_check_transport(msg.did).await.is_none()
&& msg.did != self.swarm.did()
{
self.swarm.connect(msg.did).await?;
} else {
{
self.dht.lock_successor()?.update(msg.did)
}
if let Ok(PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::SyncVNodeWithSuccessor(data),
)) = self.dht.sync_with_successor(msg.did).await
{
self.send_direct_message(
Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }),
next,
)
.await?;
}
}
Ok(())
}228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
async fn handle(&self, ctx: &MessagePayload<Message>, msg: &FindSuccessorReport) -> Result<()> {
let mut relay = ctx.relay.clone();
relay.relay(self.dht.did, None)?;
if relay.next_hop.is_some() {
return self.forward_payload(ctx, relay).await;
}
match &msg.handler {
// TODO: how to prevent `fix_finger_index` before got `FixFingerTable`?
FindSuccessorReportHandler::FixFingerTable => self.dht.lock_finger()?.set_fix(msg.did),
FindSuccessorReportHandler::Connect => {
if self.swarm.get_and_check_transport(msg.did).await.is_none()
&& msg.did != self.swarm.did()
{
self.swarm.connect(msg.did).await?;
}
}
FindSuccessorReportHandler::SyncStorage => {
self.dht.lock_successor()?.update(msg.did);
if let Ok(PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::SyncVNodeWithSuccessor(data),
)) = self.dht.sync_with_successor(msg.did).await
{
self.send_direct_message(
Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }),
next,
)
.await?;
return Ok(());
}
}
_ => {}
}
Ok(())
}sourcepub fn lock_finger(&self) -> Result<MutexGuard<'_, FingerTable>>
pub fn lock_finger(&self) -> Result<MutexGuard<'_, FingerTable>>
Lock and return MutexGuard of finger table.
Examples found in repository?
121 122 123 124 125 126 127 128 129 130 131 132
pub fn from_ring(name: &str, ring: &PeerRing) -> Result<Self> {
let address: HashStr = name.to_owned().into();
let did = Did::from_str(&address.inner())?;
let finger = ring.lock_finger()?;
Ok(Self {
name: name.to_owned(),
did,
finger: (*finger).clone(),
admin: None,
creator: ring.did,
})
}More examples
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
pub fn remove(&self, did: Did) -> Result<()> {
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
let mut predecessor = self.lock_predecessor()?;
if let Some(pid) = *predecessor {
if pid == did {
*predecessor = None;
}
}
finger.remove(did);
successor.remove(did);
if successor.is_empty() {
if let Some(x) = finger.first() {
successor.update(x);
}
}
Ok(())
}
/// Calculate bias of the Did on the ring.
pub fn bias(&self, did: Did) -> BiasId {
BiasId::new(self.did, did)
}
}
impl Chord<PeerRingAction> for PeerRing {
/// Join a ring containing a node identified by `did`.
/// This method is usually invoked to maintain successor sequence and finger table
/// after connect to another node.
///
/// This method will return a [RemoteAction::FindSuccessorForConnect] to the caller.
/// The caller will send it to the node identified by `did`, and let the node find
/// the successor of current node and make current node connect to that successor.
fn join(&self, did: Did) -> Result<PeerRingAction> {
if did == self.did {
return Ok(PeerRingAction::None);
}
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
finger.join(did);
if self.bias(did) < self.bias(successor.max()) || !successor.is_full() {
// 1) id should follows self.id
// 2) #fff should follow #001 because id space is a Finate Ring
// 3) #001 - #fff = #001 + -(#fff) = #001
successor.update(did);
}
Ok(PeerRingAction::RemoteAction(
did,
RemoteAction::FindSuccessorForConnect(self.did),
))
}
/// Find the successor of a Did.
/// May return a remote action for the successor is recorded in another node.
fn find_successor(&self, did: Did) -> Result<PeerRingAction> {
let successor = self.lock_successor()?;
let finger = self.lock_finger()?;
if successor.is_empty() || self.bias(did) <= self.bias(successor.min()) {
// If the did is closer to self than successor, return successor as the
// successor of that did.
Ok(PeerRingAction::Some(successor.min()))
} else {
// Otherwise, find the closest preceding node and ask it to find the successor.
let closest = finger.closest(did);
Ok(PeerRingAction::RemoteAction(
closest,
RemoteAction::FindSuccessor(did),
))
}
}
/// Handle notification from a node that thinks it is the predecessor of current node.
/// The `did` in parameters is the Did of that node.
/// If that node is closer to current node or current node has no predecessor, set it to the did.
/// This method will return that did if it is set to the predecessor.
fn notify(&self, did: Did) -> Result<Option<Did>> {
let mut predecessor = self.lock_predecessor()?;
match *predecessor {
Some(pre) => {
// If the did is closer to self than predecessor, set it to the predecessor.
if self.bias(pre) < self.bias(did) {
*predecessor = Some(did);
Ok(Some(did))
} else {
Ok(None)
}
}
None => {
// Self has no predecessor, set it to the did directly.
*predecessor = Some(did);
Ok(Some(did))
}
}
}
/// Fix finger table by finding the successor for each finger.
/// According to the paper, this method should be called periodically.
/// According to the paper, only one finger should be fixed at a time.
fn fix_fingers(&self) -> Result<PeerRingAction> {
let mut fix_finger_index = self.lock_finger()?.fix_finger_index;
// Only one finger should be fixed at a time.
fix_finger_index += 1;
if fix_finger_index >= 159 {
fix_finger_index = 0;
}
// Get finger did.
let did: BigUint = (BigUint::from(self.did)
+ BigUint::from(2u16).pow(fix_finger_index.into()))
% BigUint::from(2u16).pow(160);
// Caution here that there are also locks in find_successor.
// You cannot lock finger table before calling find_successor.
// Have to lock_finger in each branch of the match.
match self.find_successor(did.into()) {
Ok(res) => match res {
PeerRingAction::Some(v) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
finger.set_fix(v);
Ok(PeerRingAction::None)
}
PeerRingAction::RemoteAction(a, RemoteAction::FindSuccessor(b)) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
Ok(PeerRingAction::RemoteAction(
a,
RemoteAction::FindSuccessorForFix(b),
))
}
_ => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
tracing::error!("Invalid PeerRing Action");
Err(Error::PeerRingInvalidAction)
}
},
Err(e) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
Err(Error::PeerRingFindSuccessor(e.to_string()))
}
}
}228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
async fn handle(&self, ctx: &MessagePayload<Message>, msg: &FindSuccessorReport) -> Result<()> {
let mut relay = ctx.relay.clone();
relay.relay(self.dht.did, None)?;
if relay.next_hop.is_some() {
return self.forward_payload(ctx, relay).await;
}
match &msg.handler {
// TODO: how to prevent `fix_finger_index` before got `FixFingerTable`?
FindSuccessorReportHandler::FixFingerTable => self.dht.lock_finger()?.set_fix(msg.did),
FindSuccessorReportHandler::Connect => {
if self.swarm.get_and_check_transport(msg.did).await.is_none()
&& msg.did != self.swarm.did()
{
self.swarm.connect(msg.did).await?;
}
}
FindSuccessorReportHandler::SyncStorage => {
self.dht.lock_successor()?.update(msg.did);
if let Ok(PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::SyncVNodeWithSuccessor(data),
)) = self.dht.sync_with_successor(msg.did).await
{
self.send_direct_message(
Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }),
next,
)
.await?;
return Ok(());
}
}
_ => {}
}
Ok(())
}sourcepub fn lock_predecessor(&self) -> Result<MutexGuard<'_, Option<Did>>>
pub fn lock_predecessor(&self) -> Result<MutexGuard<'_, Option<Did>>>
Lock and return MutexGuard of predecessor.
Examples found in repository?
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
pub fn remove(&self, did: Did) -> Result<()> {
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
let mut predecessor = self.lock_predecessor()?;
if let Some(pid) = *predecessor {
if pid == did {
*predecessor = None;
}
}
finger.remove(did);
successor.remove(did);
if successor.is_empty() {
if let Some(x) = finger.first() {
successor.update(x);
}
}
Ok(())
}
/// Calculate bias of the Did on the ring.
pub fn bias(&self, did: Did) -> BiasId {
BiasId::new(self.did, did)
}
}
impl Chord<PeerRingAction> for PeerRing {
/// Join a ring containing a node identified by `did`.
/// This method is usually invoked to maintain successor sequence and finger table
/// after connect to another node.
///
/// This method will return a [RemoteAction::FindSuccessorForConnect] to the caller.
/// The caller will send it to the node identified by `did`, and let the node find
/// the successor of current node and make current node connect to that successor.
fn join(&self, did: Did) -> Result<PeerRingAction> {
if did == self.did {
return Ok(PeerRingAction::None);
}
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
finger.join(did);
if self.bias(did) < self.bias(successor.max()) || !successor.is_full() {
// 1) id should follows self.id
// 2) #fff should follow #001 because id space is a Finate Ring
// 3) #001 - #fff = #001 + -(#fff) = #001
successor.update(did);
}
Ok(PeerRingAction::RemoteAction(
did,
RemoteAction::FindSuccessorForConnect(self.did),
))
}
/// Find the successor of a Did.
/// May return a remote action for the successor is recorded in another node.
fn find_successor(&self, did: Did) -> Result<PeerRingAction> {
let successor = self.lock_successor()?;
let finger = self.lock_finger()?;
if successor.is_empty() || self.bias(did) <= self.bias(successor.min()) {
// If the did is closer to self than successor, return successor as the
// successor of that did.
Ok(PeerRingAction::Some(successor.min()))
} else {
// Otherwise, find the closest preceding node and ask it to find the successor.
let closest = finger.closest(did);
Ok(PeerRingAction::RemoteAction(
closest,
RemoteAction::FindSuccessor(did),
))
}
}
/// Handle notification from a node that thinks it is the predecessor of current node.
/// The `did` in parameters is the Did of that node.
/// If that node is closer to current node or current node has no predecessor, set it to the did.
/// This method will return that did if it is set to the predecessor.
fn notify(&self, did: Did) -> Result<Option<Did>> {
let mut predecessor = self.lock_predecessor()?;
match *predecessor {
Some(pre) => {
// If the did is closer to self than predecessor, set it to the predecessor.
if self.bias(pre) < self.bias(did) {
*predecessor = Some(did);
Ok(Some(did))
} else {
Ok(None)
}
}
None => {
// Self has no predecessor, set it to the did directly.
*predecessor = Some(did);
Ok(Some(did))
}
}
}
/// Fix finger table by finding the successor for each finger.
/// According to the paper, this method should be called periodically.
/// According to the paper, only one finger should be fixed at a time.
fn fix_fingers(&self) -> Result<PeerRingAction> {
let mut fix_finger_index = self.lock_finger()?.fix_finger_index;
// Only one finger should be fixed at a time.
fix_finger_index += 1;
if fix_finger_index >= 159 {
fix_finger_index = 0;
}
// Get finger did.
let did: BigUint = (BigUint::from(self.did)
+ BigUint::from(2u16).pow(fix_finger_index.into()))
% BigUint::from(2u16).pow(160);
// Caution here that there are also locks in find_successor.
// You cannot lock finger table before calling find_successor.
// Have to lock_finger in each branch of the match.
match self.find_successor(did.into()) {
Ok(res) => match res {
PeerRingAction::Some(v) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
finger.set_fix(v);
Ok(PeerRingAction::None)
}
PeerRingAction::RemoteAction(a, RemoteAction::FindSuccessor(b)) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
Ok(PeerRingAction::RemoteAction(
a,
RemoteAction::FindSuccessorForFix(b),
))
}
_ => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
tracing::error!("Invalid PeerRing Action");
Err(Error::PeerRingInvalidAction)
}
},
Err(e) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
Err(Error::PeerRingFindSuccessor(e.to_string()))
}
}
}
/// called periodically. checks whether predecessor has failed.
fn check_predecessor(&self) -> Result<PeerRingAction> {
let predecessor = *self.lock_predecessor()?;
Ok(match predecessor {
Some(p) => PeerRingAction::RemoteAction(p, RemoteAction::CheckPredecessor),
None => PeerRingAction::None,
})
}More examples
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &NotifyPredecessorSend,
) -> Result<()> {
let mut relay = ctx.relay.clone();
let predecessor = { *self.dht.lock_predecessor()? };
relay.relay(self.dht.did, None)?;
self.dht.notify(msg.did)?;
if let Some(did) = predecessor {
if did != relay.origin() {
return self
.send_report_message(
Message::NotifyPredecessorReport(NotifyPredecessorReport { did }),
ctx.tx_id,
relay,
)
.await;
}
}
Ok(())
}sourcepub fn remove(&self, did: Did) -> Result<()>
pub fn remove(&self, did: Did) -> Result<()>
Remove a node from finger table. Also remove it from successor sequence. If successor_seq become empty, try setting the closest node to it.
sourcepub fn bias(&self, did: Did) -> BiasId
pub fn bias(&self, did: Did) -> BiasId
Calculate bias of the Did on the ring.
Examples found in repository?
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
fn join(&self, did: Did) -> Result<PeerRingAction> {
if did == self.did {
return Ok(PeerRingAction::None);
}
let mut finger = self.lock_finger()?;
let mut successor = self.lock_successor()?;
finger.join(did);
if self.bias(did) < self.bias(successor.max()) || !successor.is_full() {
// 1) id should follows self.id
// 2) #fff should follow #001 because id space is a Finate Ring
// 3) #001 - #fff = #001 + -(#fff) = #001
successor.update(did);
}
Ok(PeerRingAction::RemoteAction(
did,
RemoteAction::FindSuccessorForConnect(self.did),
))
}
/// Find the successor of a Did.
/// May return a remote action for the successor is recorded in another node.
fn find_successor(&self, did: Did) -> Result<PeerRingAction> {
let successor = self.lock_successor()?;
let finger = self.lock_finger()?;
if successor.is_empty() || self.bias(did) <= self.bias(successor.min()) {
// If the did is closer to self than successor, return successor as the
// successor of that did.
Ok(PeerRingAction::Some(successor.min()))
} else {
// Otherwise, find the closest preceding node and ask it to find the successor.
let closest = finger.closest(did);
Ok(PeerRingAction::RemoteAction(
closest,
RemoteAction::FindSuccessor(did),
))
}
}
/// Handle notification from a node that thinks it is the predecessor of current node.
/// The `did` in parameters is the Did of that node.
/// If that node is closer to current node or current node has no predecessor, set it to the did.
/// This method will return that did if it is set to the predecessor.
fn notify(&self, did: Did) -> Result<Option<Did>> {
let mut predecessor = self.lock_predecessor()?;
match *predecessor {
Some(pre) => {
// If the did is closer to self than predecessor, set it to the predecessor.
if self.bias(pre) < self.bias(did) {
*predecessor = Some(did);
Ok(Some(did))
} else {
Ok(None)
}
}
None => {
// Self has no predecessor, set it to the did directly.
*predecessor = Some(did);
Ok(Some(did))
}
}
}
/// Fix finger table by finding the successor for each finger.
/// According to the paper, this method should be called periodically.
/// According to the paper, only one finger should be fixed at a time.
fn fix_fingers(&self) -> Result<PeerRingAction> {
let mut fix_finger_index = self.lock_finger()?.fix_finger_index;
// Only one finger should be fixed at a time.
fix_finger_index += 1;
if fix_finger_index >= 159 {
fix_finger_index = 0;
}
// Get finger did.
let did: BigUint = (BigUint::from(self.did)
+ BigUint::from(2u16).pow(fix_finger_index.into()))
% BigUint::from(2u16).pow(160);
// Caution here that there are also locks in find_successor.
// You cannot lock finger table before calling find_successor.
// Have to lock_finger in each branch of the match.
match self.find_successor(did.into()) {
Ok(res) => match res {
PeerRingAction::Some(v) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
finger.set_fix(v);
Ok(PeerRingAction::None)
}
PeerRingAction::RemoteAction(a, RemoteAction::FindSuccessor(b)) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
Ok(PeerRingAction::RemoteAction(
a,
RemoteAction::FindSuccessorForFix(b),
))
}
_ => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
tracing::error!("Invalid PeerRing Action");
Err(Error::PeerRingInvalidAction)
}
},
Err(e) => {
let mut finger = self.lock_finger()?;
finger.fix_finger_index = fix_finger_index;
Err(Error::PeerRingFindSuccessor(e.to_string()))
}
}
}
/// called periodically. checks whether predecessor has failed.
fn check_predecessor(&self) -> Result<PeerRingAction> {
let predecessor = *self.lock_predecessor()?;
Ok(match predecessor {
Some(p) => PeerRingAction::RemoteAction(p, RemoteAction::CheckPredecessor),
None => PeerRingAction::None,
})
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ChordStorage<PeerRingAction> for PeerRing {
/// Look up a VirtualNode by its Did.
/// Always finds resource by finger table, ignoring the local cache.
/// If the `vid` is between current node and its successor, its resource should be
/// stored in current node.
async fn lookup(&self, vid: Did) -> Result<PeerRingAction> {
match self.find_successor(vid) {
// Resource should be stored in current node.
Ok(PeerRingAction::Some(_)) => match self.storage.get(&vid).await {
Ok(v) => Ok(PeerRingAction::SomeVNode(v)),
Err(_) => Ok(PeerRingAction::None),
},
// Resource is stored in other nodes.
// Return an action to describe how to find it.
Ok(PeerRingAction::RemoteAction(n, RemoteAction::FindSuccessor(id))) => {
Ok(PeerRingAction::RemoteAction(n, RemoteAction::FindVNode(id)))
}
Ok(a) => Err(Error::PeerRingUnexpectedAction(a)),
Err(e) => Err(e),
}
}
/// Store `vnode` if it's between current node and the successor of current node,
/// otherwise find the responsible node and return as Action.
async fn store(&self, vnode: VirtualNode) -> Result<PeerRingAction> {
let vid = vnode.did;
match self.find_successor(vid) {
// `vnode` should be stored in current node.
Ok(PeerRingAction::Some(_)) => match self.storage.get(&vid).await {
Ok(v) => {
let _ = self
.storage
.put(&vid, &VirtualNode::concat(&v, &vnode)?)
.await?;
Ok(PeerRingAction::None)
}
Err(_) => {
let _ = self.storage.put(&vid, &vnode).await?;
Ok(PeerRingAction::None)
}
},
// `vnode` should be stored in other nodes.
// Return an action to describe how to store it.
Ok(PeerRingAction::RemoteAction(n, RemoteAction::FindSuccessor(_))) => Ok(
PeerRingAction::RemoteAction(n, RemoteAction::FindAndStore(vnode)),
),
Ok(a) => Err(Error::PeerRingUnexpectedAction(a)),
Err(e) => Err(e),
}
}
/// When the successor of a node is updated, it needs to check if there are
/// `VirtualNode`s that are no longer between current node and `new_successor`,
/// and sync them to the new successor.
async fn sync_with_successor(&self, new_successor: Did) -> Result<PeerRingAction> {
let mut data = Vec::<VirtualNode>::new();
let all_items: Vec<(Did, VirtualNode)> = self.storage.get_all().await?;
// Pop out all items that are not between current node and `new_successor`.
for (vid, vnode) in all_items.iter() {
if self.bias(*vid) > self.bias(new_successor) && self.storage.remove(vid).await.is_ok()
{
data.push(vnode.clone());
}
}
if !data.is_empty() {
Ok(PeerRingAction::RemoteAction(
new_successor,
RemoteAction::SyncVNodeWithSuccessor(data), // TODO: This might be too large.
))
} else {
Ok(PeerRingAction::None)
}
}Trait Implementations§
source§impl Chord<PeerRingAction> for PeerRing
impl Chord<PeerRingAction> for PeerRing
source§fn join(&self, did: Did) -> Result<PeerRingAction>
fn join(&self, did: Did) -> Result<PeerRingAction>
Join a ring containing a node identified by did.
This method is usually invoked to maintain successor sequence and finger table
after connect to another node.
This method will return a RemoteAction::FindSuccessorForConnect to the caller.
The caller will send it to the node identified by did, and let the node find
the successor of current node and make current node connect to that successor.
source§fn find_successor(&self, did: Did) -> Result<PeerRingAction>
fn find_successor(&self, did: Did) -> Result<PeerRingAction>
Find the successor of a Did. May return a remote action for the successor is recorded in another node.
source§fn notify(&self, did: Did) -> Result<Option<Did>>
fn notify(&self, did: Did) -> Result<Option<Did>>
Handle notification from a node that thinks it is the predecessor of current node.
The did in parameters is the Did of that node.
If that node is closer to current node or current node has no predecessor, set it to the did.
This method will return that did if it is set to the predecessor.
source§fn fix_fingers(&self) -> Result<PeerRingAction>
fn fix_fingers(&self) -> Result<PeerRingAction>
Fix finger table by finding the successor for each finger. According to the paper, this method should be called periodically. According to the paper, only one finger should be fixed at a time.
source§fn check_predecessor(&self) -> Result<PeerRingAction>
fn check_predecessor(&self) -> Result<PeerRingAction>
called periodically. checks whether predecessor has failed.
source§impl ChordStorage<PeerRingAction> for PeerRing
impl ChordStorage<PeerRingAction> for PeerRing
source§fn lookup<'life0, 'async_trait>(
&'life0 self,
vid: Did
) -> Pin<Box<dyn Future<Output = Result<PeerRingAction>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn lookup<'life0, 'async_trait>(
&'life0 self,
vid: Did
) -> Pin<Box<dyn Future<Output = Result<PeerRingAction>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Look up a VirtualNode by its Did.
Always finds resource by finger table, ignoring the local cache.
If the vid is between current node and its successor, its resource should be
stored in current node.
source§fn store<'life0, 'async_trait>(
&'life0 self,
vnode: VirtualNode
) -> Pin<Box<dyn Future<Output = Result<PeerRingAction>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn store<'life0, 'async_trait>(
&'life0 self,
vnode: VirtualNode
) -> Pin<Box<dyn Future<Output = Result<PeerRingAction>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Store vnode if it’s between current node and the successor of current node,
otherwise find the responsible node and return as Action.
source§fn sync_with_successor<'life0, 'async_trait>(
&'life0 self,
new_successor: Did
) -> Pin<Box<dyn Future<Output = Result<PeerRingAction>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn sync_with_successor<'life0, 'async_trait>(
&'life0 self,
new_successor: Did
) -> Pin<Box<dyn Future<Output = Result<PeerRingAction>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
When the successor of a node is updated, it needs to check if there are
VirtualNodes that are no longer between current node and new_successor,
and sync them to the new successor.
source§fn local_cache_set(&self, vnode: VirtualNode)
fn local_cache_set(&self, vnode: VirtualNode)
Cache fetched vnode locally.
source§fn local_cache_get(&self, vid: Did) -> Option<VirtualNode>
fn local_cache_get(&self, vid: Did) -> Option<VirtualNode>
Get vnode from local cache.