//! Node service of a Chord ring: successor lookup, joining and periodic maintenance.
use async_recursion::async_recursion;
use error_stack::{Report, Result, ResultExt};
use crate::client::{ClientError, ClientsPool};
use crate::node::store::{Db, NodeStore};
use crate::node::Finger;
use crate::{Client, Node, NodeId};
use std::net::SocketAddr;
use std::sync::Arc;
use std::vec;
#[cfg(test)]
pub(crate) mod tests;
/// Service layer of a single node: holds the node's identity, its local
/// [`NodeStore`] and a pool of clients used to call other nodes.
#[derive(Debug)]
pub struct NodeService<C: Client> {
/// Identifier of this node (returned by `id()`).
id: NodeId,
/// Address this node was created with.
addr: SocketAddr,
/// Local state store; `store()` hands out its `Db` handle.
store: NodeStore,
/// Clients for other nodes, obtained through `ClientsPool::get_or_init`.
clients: ClientsPool<C>,
}
impl<C: Client + Clone + Sync + Send + 'static> NodeService<C> {
/// Build a node service whose id is derived from its socket address.
///
/// # Arguments
///
/// * `socket_addr` - The address of the node; it is also converted into the node's id
/// * `replication_factor` - The number of successors to keep track of
pub fn new(socket_addr: SocketAddr, replication_factor: usize) -> Self {
    // `with_id` accepts anything convertible into a `NodeId`, so the address
    // itself is handed over as the id source and converted there.
    Self::with_id(socket_addr, socket_addr, replication_factor)
}
/// Build a node service with an explicitly chosen id.
///
/// # Arguments
///
/// * `id` - Anything convertible into the node's id
/// * `addr` - The address of the node
/// * `replication_factor` - Forwarded unchanged to `NodeStore::new`
fn with_id(id: impl Into<NodeId>, addr: SocketAddr, replication_factor: usize) -> Self {
    let node_id: NodeId = id.into();
    Self {
        id: node_id,
        addr,
        // The store is seeded with a `Node` describing this very service.
        store: NodeStore::new(Node::with_id(node_id, addr), replication_factor),
        clients: ClientsPool::default(),
    }
}
/// Returns the id of this node.
pub fn id(&self) -> NodeId {
self.id
}
/// Returns the `Db` handle obtained from this node's `NodeStore`.
pub(crate) fn store(&self) -> Db {
self.store.db()
}
/// Look up the successor of `id`.
///
/// The local successor list is consulted first. Only when it yields no match
/// is the lookup delegated to other nodes by way of the finger table.
///
/// # Arguments
///
/// * `id` - The id to find the successor for
///
/// # Errors
///
/// Propagates any error produced by the finger table lookup.
pub async fn find_successor(&self, id: NodeId) -> Result<Node, error::ServiceError> {
    match self.find_immediate_successor(id).await? {
        Some(local_hit) => Ok(local_hit),
        None => self.find_successor_using_finger_table(id, None).await,
    }
}
/// Search the local successor list for the successor of `id`.
///
/// Returns the first entry of the list for which
/// `Node::is_between_on_ring(id, self.id, entry.id)` holds, or `None` when no
/// entry qualifies. This implementation never returns an error.
async fn find_immediate_successor(
    &self,
    id: NodeId,
) -> Result<Option<Node>, error::ServiceError> {
    let own_id = self.id.0;
    let hit = self
        .store()
        .successor_list()
        .into_iter()
        .find(|candidate| Node::is_between_on_ring(id.0, own_id, candidate.id.0));
    Ok(hit)
}
/// Find the successor of the given id using the finger table.
/// This method is called recursively until the successor is found or until the closest preceding node is the current node.
///
/// If a node fails to respond, it's id is used to find new closest preceding node.
/// If all nodes fail to respond, an error is returned.
///
/// # Arguments
///
/// * `id` - The id to find the successor for
/// * `failing_node` - The id of the node that failed to respond. It is used to find the new closest preceding node.
#[async_recursion]
async fn find_successor_using_finger_table(
&self,
id: NodeId,
failing_node: Option<NodeId>,
) -> Result<Node, error::ServiceError> {
let search_id = failing_node.unwrap_or(id);
let n = self.closest_preceding_node(search_id);
if n.id == self.id {
let error = format!("Cannot find successor of id '{}' using finger table", id);
log::error!("{}", error);
return Err(Report::new(error::ServiceError::Unexpected));
}
let client: Arc<C> = self.client(&n).await;
match client.find_successor(id).await {
Ok(successor) => Result::Ok(successor),
Err(report) => match (*report.current_context()).clone() {
ClientError::ConnectionFailed(_) => {
self.find_successor_using_finger_table(id, Some(n.id)).await
}
err => Result::Err(report.change_context(err.into())),
},
}
}
/// Returns the predecessor currently recorded in the store, if any.
///
/// This implementation never returns an error.
pub async fn get_predecessor(&self) -> Result<Option<Node>, error::ServiceError> {
    let predecessor = self.store().predecessor();
    Ok(predecessor)
}
/// Returns the successor currently recorded in the store.
///
/// This implementation never returns an error.
pub async fn get_successor(&self) -> Result<Node, error::ServiceError> {
    let successor = self.store().successor();
    Ok(successor)
}
/// Returns the successor list currently recorded in the store.
///
/// This implementation never returns an error.
pub async fn get_successor_list(&self) -> Result<Vec<Node>, error::ServiceError> {
    let successors = self.store().successor_list();
    Ok(successors)
}
/// Join the chord ring.
///
/// This method is used to join the chord ring. It will find the successor of its own id
/// and set it as the successor.
///
/// # Arguments
///
/// * `node` - The node to join the ring with. It's an existing node in the ring.
pub async fn join(&self, node: Node) -> Result<(), error::ServiceError> {
let client: Arc<C> = self.client(&node).await;
let successor = client
.find_successor(self.id)
.await
.change_context(error::ServiceError::Unexpected)?;
self.store().set_successor(successor);
Ok(())
}
/// Notify the node about a potential new predecessor.
///
/// The given node becomes the predecessor when no predecessor is set, or when
/// `Node::is_between_on_ring(node.id, predecessor.id, self.id)` holds, i.e. the
/// given node lies between the current predecessor and this node on the ring.
/// Otherwise the stored predecessor is left untouched.
///
/// # Arguments
///
/// * `node` - The node which might be the new predecessor
pub fn notify(&self, node: Node) {
    // Match on the option instead of `is_none()` + `unwrap()` so no panic path
    // is left in the code.
    let should_adopt = match self.store().predecessor() {
        None => true,
        Some(current) => Node::is_between_on_ring(node.id.0, current.id.0, self.id.0),
    };
    if should_adopt {
        self.store().set_predecessor(node);
    }
}
/// Stabilize the node
///
/// Asks the current successor for its predecessor. When that predecessor lies
/// between this node and the current successor (per `Node::is_between_on_ring`),
/// it is adopted as the new successor. A failed or empty answer is ignored.
///
/// Afterwards the (possibly updated) successor is notified about this node.
///
/// > **Note**
/// >
/// > This method should be called periodically.
///
/// # Errors
///
/// Returns `ServiceError::Unexpected` when notifying the successor fails.
pub async fn stabilize(&self) -> Result<(), error::ServiceError> {
    // The first client only lives inside this scope; it is released before the
    // successor is re-read below.
    let reported = {
        let current = self.store().successor();
        let client: Arc<C> = self.client(&current).await;
        client.predecessor().await
    };

    if let Ok(Some(candidate)) = reported {
        let current = self.store().successor();
        if Node::is_between_on_ring(candidate.id.0, self.id.0, current.id.0) {
            self.store().set_successor(candidate);
        }
    }

    // Re-read the successor: it may just have been replaced above.
    let target = self.store().successor();
    let client: Arc<C> = self.client(&target).await;
    let this_node = Node {
        id: self.id,
        addr: self.addr,
    };
    client
        .notify(this_node)
        .await
        .change_context(error::ServiceError::Unexpected)?;
    Ok(())
}
pub async fn reconcile_successors(&self) {
let successor = self.store().successor();
let client: Arc<C> = self.client(&successor).await;
match client.successor_list().await {
Ok(successors) => {
let mut new_successors = vec![successor];
new_successors.extend(successors);
self.store().set_successor_list(new_successors);
}
Err(err) => {
log::info!(
"Successor {:?} is down, removing from the successor list",
successor.addr
);
log::debug!("Successor {:?} error: {err:?}", successor.addr);
let successors = self.store().successor_list();
self.store().set_successor_list(successors[1..].to_vec());
}
}
}
/// Check predecessor
///
/// Pings the stored predecessor, if there is one. When the ping fails the
/// failure is logged and the predecessor is unset. Nothing happens when no
/// predecessor is stored.
///
/// This implementation always returns `Ok(())`; a failed ping is handled
/// here rather than reported to the caller.
///
/// > **Note**
/// >
/// > This method should be called periodically.
pub async fn check_predecessor(&self) -> Result<(), error::ServiceError> {
    let predecessor = match self.store().predecessor() {
        Some(node) => node,
        None => return Ok(()),
    };

    let client: Arc<C> = self.client(&predecessor).await;
    if let Err(err) = client.ping().await {
        log::info!(
            "Predecessor {:?} is down, removing. Error: {:?}",
            predecessor.addr,
            err
        );
        self.store().unset_predecessor();
    }
    Ok(())
}
/// Fix fingers
///
/// This method is used to fix the fingers. It iterates over all fingers and re-requests the
/// successor of the finger's id. Then sets the successor of the finger to the retrieved node.
///
/// > **Note**
/// >
/// > This method should be called periodically.
pub async fn fix_fingers(&self) {
for i in 0..Finger::FINGER_TABLE_SIZE {
let finger_id = Finger::finger_id(self.id.0, (i + 1) as u8);
let result = { self.find_successor(NodeId(finger_id)).await };
if let Ok(successor) = result {
self.store().update_finger(i.into(), successor)
} else {
log::error!("Failed to fix finger: {:?}", result.unwrap_err());
}
}
}
/// Get finger table
///
/// Returns the finger table as currently held by this node's store.
pub fn finger_table(&self) -> Vec<Finger> {
self.store().finger_table()
}
/// Get closest preceding node
///
/// Asks the store for the closest preceding node of the given id, as seen
/// from this node. When the store has no answer, a node describing this
/// service itself (own id and address) is returned instead.
///
/// # Arguments
///
/// * `id` - The id to find the closest preceding node for
///
/// # Returns
///
/// The closest preceding node, or this node as a fallback
fn closest_preceding_node(&self, id: NodeId) -> Node {
    self.store()
        .closest_preceding_node(self.id.0, id.0)
        // Lazy fallback: the self-node is only built when the store has no answer.
        .unwrap_or_else(|| Node::with_id(self.id, self.addr))
}
/// Returns the client for `node` from the clients pool.
///
/// NOTE(review): judging by its name, `get_or_init` presumably reuses an
/// existing client or creates one on first use — confirm in `ClientsPool`.
async fn client(&self, node: &Node) -> Arc<C> {
self.clients.get_or_init(node).await
}
}
/// Error types returned by the node service.
pub mod error {
    use crate::client;
    use thiserror::Error;

    /// Failures reported by `NodeService` operations.
    #[derive(Debug, Error)]
    pub enum ServiceError {
        /// Any failure that is not mapped to a more specific variant.
        #[error("Unexpected error")]
        Unexpected,
        /// The connection to another node's client failed.
        #[error("Client disconnected")]
        ClientDisconnected,
    }

    /// Maps client failures onto service failures: a failed connection becomes
    /// `ClientDisconnected`, everything else becomes `Unexpected`.
    impl From<client::ClientError> for ServiceError {
        fn from(err: client::ClientError) -> Self {
            if let client::ClientError::ConnectionFailed(_) = err {
                Self::ClientDisconnected
            } else {
                Self::Unexpected
            }
        }
    }
}