1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
use async_recursion::async_recursion;
use error_stack::{Report, Result, ResultExt};

use crate::client::{ClientError, ClientsPool};
use crate::node::store::{Db, NodeStore};
use crate::node::Finger;
use crate::{Client, Node, NodeId};
use std::net::SocketAddr;
use std::sync::Arc;
use std::vec;

#[cfg(test)]
pub(crate) mod tests;

/// A Chord node: its identity, its routing store, and the pool of clients it uses to
/// reach other nodes.
#[derive(Debug)]
pub struct NodeService<C>
where
    C: Client,
{
    /// Identifier of this node on the ring.
    id: NodeId,
    /// Socket address of this node (sent to the successor during `stabilize`).
    addr: SocketAddr,
    /// Routing state: predecessor, successor list and finger table.
    store: NodeStore,
    /// Clients for remote nodes, obtained through `ClientsPool::get_or_init`.
    clients: ClientsPool<C>,
}

impl<C: Client + Clone + Sync + Send + 'static> NodeService<C> {
    /// Create a new node service
    ///
    /// # Arguments
    ///
    /// * `socket_addr` - The address of the node
    /// * `replication_factor` - The number of successors to keep track of
    pub fn new(socket_addr: SocketAddr, replication_factor: usize) -> Self {
        let id: NodeId = socket_addr.into();
        Self::with_id(id, socket_addr, replication_factor)
    }

    fn with_id(id: impl Into<NodeId>, addr: SocketAddr, replication_factor: usize) -> Self {
        let id = id.into();
        let store = NodeStore::new(Node::with_id(id, addr), replication_factor);
        Self {
            id,
            addr,
            store,
            clients: ClientsPool::default(),
        }
    }

    pub fn id(&self) -> NodeId {
        self.id
    }

    pub(crate) fn store(&self) -> Db {
        self.store.db()
    }

    /// Find the successor of the given id.
    ///
    /// If the given id is in the range of the current node and its successor, the successor is returned.
    /// Otherwise, the successor of the closest preceding node is returned.
    ///
    /// # Arguments
    ///
    /// * `id` - The id to find the successor for
    pub async fn find_successor(&self, id: NodeId) -> Result<Node, error::ServiceError> {
        if let Some(successor) = self.find_immediate_successor(id).await? {
            Ok(successor)
        } else {
            self.find_successor_using_finger_table(id, None).await
        }
    }

    /// Find the successor of the given id using the successor list.
    async fn find_immediate_successor(
        &self,
        id: NodeId,
    ) -> Result<Option<Node>, error::ServiceError> {
        let successors = self.store().successor_list();
        for successor in successors {
            if Node::is_between_on_ring(id.0, self.id.0, successor.id.0) {
                return Ok(Some(successor));
            }
        }

        Ok(None)
    }

    /// Find the successor of the given id using the finger table.
    /// This method is called recursively until the successor is found or until the closest preceding node is the current node.
    ///
    /// If a node fails to respond, it's id is used to find new closest preceding node.
    /// If all nodes fail to respond, an error is returned.
    ///
    /// # Arguments
    ///
    /// * `id` - The id to find the successor for
    /// * `failing_node` - The id of the node that failed to respond. It is used to find the new closest preceding node.    
    #[async_recursion]
    async fn find_successor_using_finger_table(
        &self,
        id: NodeId,
        failing_node: Option<NodeId>,
    ) -> Result<Node, error::ServiceError> {
        let search_id = failing_node.unwrap_or(id);
        let n = self.closest_preceding_node(search_id);

        if n.id == self.id {
            let error = format!("Cannot find successor of id '{}' using finger table", id);
            log::error!("{}", error);
            return Err(Report::new(error::ServiceError::Unexpected));
        }

        let client: Arc<C> = self.client(&n).await;
        match client.find_successor(id).await {
            Ok(successor) => Result::Ok(successor),
            Err(report) => match (*report.current_context()).clone() {
                ClientError::ConnectionFailed(_) => {
                    self.find_successor_using_finger_table(id, Some(n.id)).await
                }
                err => Result::Err(report.change_context(err.into())),
            },
        }
    }

    pub async fn get_predecessor(&self) -> Result<Option<Node>, error::ServiceError> {
        Ok(self.store().predecessor())
    }

    pub async fn get_successor(&self) -> Result<Node, error::ServiceError> {
        Ok(self.store().successor())
    }

    pub async fn get_successor_list(&self) -> Result<Vec<Node>, error::ServiceError> {
        Ok(self.store().successor_list())
    }

    /// Join the chord ring.
    ///
    /// This method is used to join the chord ring. It will find the successor of its own id
    /// and set it as the successor.
    ///
    /// # Arguments
    ///
    /// * `node` - The node to join the ring with. It's an existing node in the ring.
    pub async fn join(&self, node: Node) -> Result<(), error::ServiceError> {
        let client: Arc<C> = self.client(&node).await;
        let successor = client
            .find_successor(self.id)
            .await
            .change_context(error::ServiceError::Unexpected)?;
        self.store().set_successor(successor);

        Ok(())
    }

    /// Notify the node about a potential new predecessor.
    ///
    /// If the predecessor is not set or the given node is in the range of the current node and the
    /// predecessor, the predecessor is set to the given node.
    ///
    /// # Arguments
    ///
    /// * `node` - The node which might be the new predecessor
    pub fn notify(&self, node: Node) {
        let predecessor = self.store().predecessor();
        if predecessor.is_none()
            || Node::is_between_on_ring(node.id.0, predecessor.unwrap().id.0, self.id.0)
        {
            self.store().set_predecessor(node);
        }
    }

    /// Stabilize the node
    ///
    /// This method is used to stabilize the node. It will check if a predecessor of the successor
    /// is in the range of the current node and its successor. If so, the successor will be set to
    /// the retrieved predecessor.
    ///
    /// It will also notify the successor about the current node.
    ///
    /// > **Note**
    /// >
    /// > This method should be called periodically.
    pub async fn stabilize(&self) -> Result<(), error::ServiceError> {
        let successor = self.store().successor();
        let client: Arc<C> = self.client(&successor).await;
        let result = client.predecessor().await;
        drop(client);

        if let Ok(Some(x)) = result {
            if Node::is_between_on_ring(x.id.0, self.id.0, self.store().successor().id.0) {
                self.store().set_successor(x);
            }
        }

        let successor = self.store().successor();
        let client: Arc<C> = self.client(&successor).await;

        client
            .notify(Node {
                id: self.id,
                addr: self.addr,
            })
            .await
            .change_context(error::ServiceError::Unexpected)?;

        Ok(())
    }

    pub async fn reconcile_successors(&self) {
        let successor = self.store().successor();
        let client: Arc<C> = self.client(&successor).await;

        match client.successor_list().await {
            Ok(successors) => {
                let mut new_successors = vec![successor];
                new_successors.extend(successors);

                self.store().set_successor_list(new_successors);
            }
            Err(err) => {
                log::info!(
                    "Successor {:?} is down, removing from the successor list",
                    successor.addr
                );
                log::debug!("Successor {:?} error: {err:?}", successor.addr);

                let successors = self.store().successor_list();
                self.store().set_successor_list(successors[1..].to_vec());
            }
        }
    }

    /// Check predecessor
    ///
    /// This method is used to check if the predecessor is still alive. If not, the predecessor is
    /// set to `None`.
    ///
    /// > **Note**
    /// >
    /// > This method should be called periodically.
    pub async fn check_predecessor(&self) -> Result<(), error::ServiceError> {
        if let Some(predecessor) = self.store().predecessor() {
            let client: Arc<C> = self.client(&predecessor).await;
            match client.ping().await {
                Ok(_) => Ok(()),
                Err(err) => {
                    log::info!(
                        "Predecessor {:?} is down, removing. Error: {:?}",
                        predecessor.addr,
                        err
                    );
                    self.store().unset_predecessor();
                    Ok(())
                }
            }
        } else {
            Ok(())
        }
    }

    /// Fix fingers
    ///
    /// This method is used to fix the fingers. It iterates over all fingers and re-requests the
    /// successor of the finger's id. Then sets the successor of the finger to the retrieved node.
    ///
    /// > **Note**
    /// >
    /// > This method should be called periodically.
    pub async fn fix_fingers(&self) {
        for i in 0..Finger::FINGER_TABLE_SIZE {
            let finger_id = Finger::finger_id(self.id.0, (i + 1) as u8);
            let result = { self.find_successor(NodeId(finger_id)).await };
            if let Ok(successor) = result {
                self.store().update_finger(i.into(), successor)
            } else {
                log::error!("Failed to fix finger: {:?}", result.unwrap_err());
            }
        }
    }

    /// Get finger table
    ///
    /// This method is used to get the finger table of the node.
    pub fn finger_table(&self) -> Vec<Finger> {
        self.store().finger_table()
    }

    /// Get closest preceding node
    ///
    /// This method is used to get the closest preceding node of the given id.
    /// It will iterate over the finger table and return the closest node to the given id.
    ///
    /// # Arguments
    ///
    /// * `id` - The id to find the closest preceding node for
    ///
    /// # Returns
    ///
    /// The closest preceding node
    fn closest_preceding_node(&self, id: NodeId) -> Node {
        self.store()
            .closest_preceding_node(self.id.0, id.0)
            .unwrap_or(Node::with_id(self.id, self.addr))
    }

    async fn client(&self, node: &Node) -> Arc<C> {
        self.clients.get_or_init(node).await
    }
}

pub mod error {
    use crate::client;
    use thiserror::Error;

    /// Errors produced by the node service.
    #[derive(Debug, Error)]
    pub enum ServiceError {
        /// Any failure other than a lost connection to a remote node.
        #[error("Unexpected error")]
        Unexpected,
        /// A remote node could not be reached.
        #[error("Client disconnected")]
        ClientDisconnected,
    }

    impl From<client::ClientError> for ServiceError {
        /// A connection failure becomes [`ServiceError::ClientDisconnected`]. Every other
        /// client error becomes [`ServiceError::Unexpected`].
        fn from(err: client::ClientError) -> Self {
            if matches!(err, client::ClientError::ConnectionFailed(_)) {
                Self::ClientDisconnected
            } else {
                Self::Unexpected
            }
        }
    }
}