1use std::{
2 collections::{HashMap},
3 sync::{Arc, RwLock}
4};
5use rand::{Rng, SeedableRng};
6use tarpc::{
7 context,
8 tokio_serde::formats::Bincode,
9 server::Channel,
10 serde::Serialize,
11 serde::Deserialize
12};
13use futures::{future, prelude::*};
14use log::{info, warn, debug};
15use super::{
16 ring::*,
17 config::*,
18 data_store::*,
19 error::{
20 *,
21 DhtError::*
22 }
23};
24use crate::{rpc::*, server::ServerManager};
25use super::calculate_hash;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Node {
30 pub id: Digest,
31 pub addr: String
32}
33
34impl std::fmt::Display for Node {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "Node({}, {})", self.id, self.addr)
37 }
38}
39
40#[derive(Clone)]
41pub struct NodeServer {
42 node: Node,
43 store: DataStore,
44 config: Config,
45 predecessor: Arc<RwLock<Option<Node>>>,
46 finger_table: Arc<RwLock<Vec<Node>>>,
48 successor_list: Arc<RwLock<Vec<Node>>>,
50 connection_map: Arc<RwLock<HashMap<Digest, NodeServiceClient>>>
52}
53
54impl NodeServer {
55 pub fn new(node: Node, config: Config) -> Self {
56 assert!(config.replication_factor != 0, "replication_factor equal to 0");
57 assert!(config.replication_factor <= config.fault_tolerance + 1, "replication_factor greater than fault_tolerance + 1");
58
59 let finger_table = vec![node.clone(); NUM_BITS];
62 let successor_list = vec![node.clone(); config.fault_tolerance as usize + 1];
63
64 NodeServer {
65 node: node.clone(),
66 store: DataStore::new(),
67 config: config,
68 predecessor: Arc::new(RwLock::new(Some(node.clone()))),
69 finger_table: Arc::new(RwLock::new(finger_table)),
70 successor_list: Arc::new(RwLock::new(successor_list)),
71 connection_map: Arc::new(RwLock::new(HashMap::new()))
72 }
73 }
74
75 pub fn get_successor(&self) -> Node {
76 self.successor_list.read().unwrap()[0].clone()
77 }
78
79 pub fn get_successor_list(&self) -> Vec<Node> {
80 self.successor_list.read().unwrap().clone()
81 }
82
83 pub fn set_successor_list(&self, succ_list: Vec<Node>) {
84 *self.successor_list.write().unwrap() = succ_list;
85 }
86
87 pub fn get_predecessor(&self) -> Option<Node> {
88 self.predecessor.read().unwrap().clone()
89 }
90
91 pub fn set_predecessor(&self, node: Option<Node>) {
92 *self.predecessor.write().unwrap() = node;
93 }
94
95 pub async fn start(&mut self, join_node: Option<Node>) -> DhtResult<ServerManager> {
98 let (tx, rx) = tokio::sync::watch::channel(false);
100
101 let mut listener = tarpc::serde_transport::tcp::listen(&self.node.addr, Bincode::default).await?;
103 let server = self.clone();
104 let mut listener_rx = rx.clone();
105 let listener_handle = tokio::spawn(async move {
107 listener.config_mut().max_frame_length(usize::MAX);
108 let listener_fut = listener
109 .filter_map(|r| future::ready(r.ok()))
110 .map(tarpc::server::BaseChannel::with_defaults)
111 .map(|channel| async {
112 channel.execute(server.clone().serve()).await;
114 })
115 .buffer_unordered(server.config.max_connections as usize)
116 .for_each(|_| async {});
117
118 debug!("{}: listening", server.node);
120
121 tokio::select! {
122 _ = listener_fut => {
123 warn!("{}: listener terminated", server.node);
124 },
125 _ = listener_rx.changed() => {
126 debug!("{}: listener stopped gracefully", server.node);
127 }
128 };
129 });
130
131 if let Some(n) = join_node.as_ref() {
133 match self.join(&n).await {
134 Ok(_) => (),
135 Err(e) => {
136 return Err(JoinFailure {
137 node: n.clone(),
138 message: e.to_string()
139 });
140 }
141 };
142 }
143
144 let mut server = self.clone();
146 let mut stabilize_rx = rx.clone();
147 let stabilize_interval = self.config.stabilize_interval;
148 let stabilize_handle = tokio::spawn(async move {
149 if stabilize_interval > 0 {
150 let mut interval = tokio::time::interval(
151 tokio::time::Duration::from_millis(stabilize_interval)
152 );
153
154 tokio::select! {
155 _ = async {
156 interval.tick().await;
157 server.stabilize().await;
158 } => (),
159 _ = stabilize_rx.changed() => {
160 debug!("{}: stabilize task stopped gracefully", server.node);
161 }
162 };
163 }
164 });
165
166 let mut server = self.clone();
168 let mut fix_finger_rx = rx.clone();
169 let fix_finger_interval = self.config.fix_finger_interval;
170 let fix_finger_handle = tokio::spawn(async move {
171 if fix_finger_interval > 0 {
172 let mut interval = tokio::time::interval(
173 tokio::time::Duration::from_millis(fix_finger_interval)
174 );
175 let mut rng = rand::prelude::StdRng::from_entropy();
177
178 tokio::select! {
179 _ = async {
180 interval.tick().await;
181 let index = rng.gen_range(1..NUM_BITS);
182 server.fix_finger(index).await;
183 } => (),
184 _ = fix_finger_rx.changed() => {
185 debug!("{}: fix_finger task stopped gracefully", server.node);
186 }
187 };
188 }
189 });
190
191 info!("{}: listening at {}", self.node, self.node.addr);
192 let joined_handle = future::join_all(vec![
194 listener_handle,
195 stabilize_handle,
196 fix_finger_handle
197 ]);
198
199 Ok(ServerManager {
200 handle: joined_handle,
201 tx: tx
202 })
203 }
204
205 pub fn finger_table_start(&self, k: usize) -> u64 {
208 self.node.id.wrapping_add(1 << k)
209 }
210
211 async fn get_connection(&mut self, node: &Node) -> DhtResult<NodeServiceClient> {
212 {
214 let map = self.connection_map.read().unwrap();
215 if let Some(c) = map.get(&node.id) {
216 return Ok(c.clone());
218 }
219 }
220 {
221 debug!("{}: connecting to {}", self.node, node);
222 let c = crate::client::setup_client(&node.addr).await?;
223 debug!("{}: connected to {}", self.node, node);
224 let mut map = self.connection_map.write().unwrap();
225 map.insert(node.id, c.clone());
226 return Ok(c);
227 }
228 }
229
230 pub fn remove_connection(&self, node: &Node) {
232 let mut map = self.connection_map.write().unwrap();
233 map.remove(&node.id);
234 }
235
236 pub async fn join(&mut self, node: &Node) -> DhtResult<()> {
238 debug!("{}: joining {}", self.node, node);
239 self.set_predecessor(None);
240 let ctx = context::current();
241 let n = self.get_connection(node).await?;
242 let succ_list = n.find_successor_list_rpc(ctx, self.node.id).await?;
243 self.set_successor_list(succ_list);
244 debug!("{}: joined {}", self.node, node);
245 Ok(())
246 }
247
248 pub async fn stabilize(&mut self) {
250 let ctx = context::current();
251
252 let successor_list = self.get_successor_list();
253 for mut succ in successor_list.into_iter() {
254 let mut n = match self.get_connection(&succ).await {
255 Ok(v) => v,
256 Err(e) => {
257 warn!("{}: failed to connect to {}: {}", self.node, succ, e);
258 continue;
260 }
261 };
262
263 match n.get_predecessor_rpc(ctx).await {
264 Ok(pred) => {
265 let x = match pred {
267 Some(v) => v,
268 None => {
269 warn!("{}: empty predecessor of successor {}", self.node, succ);
270 return;
271 }
272 };
273 if in_range(x.id, self.node.id, succ.id) {
274 n = match self.get_connection(&x).await {
276 Ok(v) => v,
277 Err(e) => {
278 warn!("{}: failed to connect to {}: {}", self.node, succ, e);
279 continue;
281 }
282 };
283 succ = x;
285 }
286
287 if let Ok(mut new_succ_list) = n.get_successor_list_rpc(ctx).await {
290 new_succ_list.pop();
291 new_succ_list.insert(0, succ);
292 self.set_successor_list(new_succ_list);
293 n.notify_rpc(ctx, self.node.clone()).await.unwrap_or(());
295 }
296
297 return;
298 },
299 Err(e) => {
300 warn!("{}: fail to stabilize: {}", self.node, e);
301 self.remove_connection(&succ);
303 }
304 }
305 }
306 panic!("{}: no live successors!", self.node);
307 }
308
309 pub async fn fix_finger(&mut self, index: usize) {
311 match self.find_successor_list(self.finger_table_start(index)).await {
312 Ok(succ) => {
313 let mut table = self.finger_table.write().unwrap();
314 table[index] = succ[0].clone();
315 },
316 Err(e) => {
317 warn!("{}: failed to fix finger: {}", self.node, e);
318 }
319 };
320 }
321
322 async fn find_successor_list(&mut self, id: Digest) -> DhtResult<Vec<Node>> {
325 let n = self.find_predecessor(id).await?;
326 let c = self.get_connection(&n).await?;
327 let succ_list = c.get_successor_list_rpc(context::current()).await?;
328 Ok(succ_list)
329 }
330
331 async fn find_predecessor(&mut self, id: Digest) -> DhtResult<Node> {
333 debug!("{}: find_predecessor({})", self.node, id);
334 let mut n = self.node.clone();
335 let mut succ = self.get_successor();
336 let mut conn = self.get_connection(&n).await?;
337 let ctx = context::current();
338
339 while !(in_range(id, n.id, succ.id) || id == succ.id) {
341 debug!("{}: find_predecessor range ({}, {}]", self.node, n.id, succ.id);
342 n = conn.closest_preceding_finger_rpc(ctx, id).await?;
343 conn = self.get_connection(&n).await?;
344 succ = conn.get_successor_rpc(ctx).await?;
345 }
346 debug!("{}: find_predecessor({}) returns {}", self.node, id, n);
347 Ok(n)
348 }
349
350 async fn closest_preceding_finger(&mut self, id: Digest) -> Node {
352 let table = self.finger_table.read().unwrap();
353 for i in (0..NUM_BITS).rev() {
354 let f = if i > 0 {
355 table[i].clone()
356 } else {
357 self.get_successor()
359 };
360 if in_range(f.id, self.node.id, id) {
361 return f;
362 };
363 }
364 self.node.clone()
365 }
366
367 async fn notify(&mut self, node: Node) {
369 let pred = self.get_predecessor();
370 if let Some(p) = pred {
371 if !in_range(node.id, p.id, self.node.id) {
372 return;
373 }
374 }
375
376 debug!("{}: new predecessor set in notify: {}", self.node, node);
377 self.set_predecessor(Some(node));
378 }
379
380 async fn get(&mut self, key: Key) -> DhtResult<Option<Value>> {
382 match self.store.get(&key) {
384 Some(v) => return Ok(Some(v)),
385 None => ()
386 };
387
388 let id = calculate_hash(&key);
390 let succ_list = self.find_successor_list(id).await?;
391 for succ in succ_list.iter() {
392 let c = self.get_connection(&succ).await?;
393 match c.get_local_rpc(context::current(), key.clone()).await {
394 Ok(value) => return Ok(value),
395 Err(e) => {
396 warn!("{}: fail to get key digest {} from {}: {}", self.node, id, succ, e);
397 }
399 };
400 }
401
402 Err(NoLiveReplica(id))
403 }
404
405 async fn set(&mut self, key: Key, value: Option<Value>) -> DhtResult<()> {
407 let id = calculate_hash(&key);
408 let succ_list = self.find_successor_list(id).await?;
409 let c = self.get_connection(&succ_list[0]).await?;
410
411 c.replicate_rpc(context::current(), key, value).await?;
412 Ok(())
413 }
414
415 async fn replicate(&mut self, key: Key, value: Option<Value>) -> DhtResult<()> {
417 self.store.set(key.clone(), value.clone());
419
420 let num = (self.config.replication_factor - 1) as usize;
422 if num > 0 {
423 let ctx = context::current();
424 let mut conn_list = Vec::new();
426 let mut fut_list = Vec::new();
427 for i in 0..num {
428 let node = self.successor_list.read().unwrap()[i].clone();
429 let c = self.get_connection(&node).await?;
430 conn_list.push(c);
431 }
432
433 for c in conn_list.iter() {
434 let k = key.clone();
435 let v = value.clone();
436 fut_list.push(c.set_local_rpc(ctx, k, v));
437 }
438
439 future::join_all(fut_list)
441 .await
442 .into_iter()
443 .collect::<Result<Vec<_>, _>>()?;
444 }
445 Ok(())
446 }
447}
448
449#[tarpc::server]
450impl NodeService for NodeServer {
451 async fn get_node_rpc(self, _: context::Context) -> Node {
452 self.node.clone()
453 }
454
455 async fn get_predecessor_rpc(self, _: context::Context) -> Option<Node> {
456 self.get_predecessor()
457 }
458
459 async fn get_successor_rpc(self, _: context::Context) -> Node {
460 self.get_successor()
461 }
462
463 async fn get_successor_list_rpc(self, _: context::Context) -> Vec<Node> {
464 self.get_successor_list()
465 }
466
467 async fn find_successor_list_rpc(mut self, _: context::Context, id: Digest) -> Vec<Node> {
468 loop {
469 for i in 0..(self.config.retry_limit+1) {
470 match self.find_successor_list(id).await {
471 Ok(succ_list) => return succ_list,
472 Err(e) => {
473 warn!("{}: find_successor_list_rpc failed (retry {}): {}", self.node, i, e);
474 tokio::time::sleep(
475 tokio::time::Duration::from_millis(self.config.retry_interval)
476 ).await;
477 }
478 };
479 }
480
481 warn!("{}: find_successor_list_rpc retry limit reached", self.node);
482 self.stabilize().await;
484 }
485 }
486
487 async fn find_predecessor_rpc(mut self, _: context::Context, id: Digest) -> Node {
488 loop {
489 for i in 0..(self.config.retry_limit+1) {
490 match self.find_predecessor(id).await {
491 Ok(succ_list) => return succ_list,
492 Err(e) => {
493 warn!("{}: find_predecessor_rpc failed (retry {}): {}", self.node, i, e);
494 tokio::time::sleep(
495 tokio::time::Duration::from_millis(self.config.retry_interval)
496 ).await;
497 }
498 };
499 }
500
501 warn!("{}: find_predecessor_rpc retry limit reached", self.node);
502 self.stabilize().await;
504 }
505 }
506
507 async fn closest_preceding_finger_rpc(mut self, _: context::Context, id: Digest) -> Node {
508 self.closest_preceding_finger(id).await
509 }
510
511 async fn notify_rpc(mut self, _: context::Context, node: Node) {
512 self.notify(node).await
513 }
514
515 async fn stabilize_rpc(mut self, _: context::Context) {
516 self.stabilize().await
517 }
518
519 async fn get_local_rpc(self, _: context::Context, key: Key) -> Option<Value> {
520 self.store.get(&key)
521 }
522
523 async fn set_local_rpc(self, _: context::Context, key: Key, value: Option<Value>) {
524 self.store.set(key, value)
525 }
526
527 async fn get_rpc(mut self, _: context::Context, key: Key) -> Option<Value> {
528 loop {
529 for i in 0..(self.config.retry_limit+1) {
530 match self.get(key.clone()).await {
531 Ok(value) => return value,
532 Err(e) => {
533 warn!("{}: get_rpc failed (retry {}): {}", self.node, i, e);
534 tokio::time::sleep(
535 tokio::time::Duration::from_millis(self.config.retry_interval)
536 ).await;
537 }
538 };
539 }
540
541 warn!("{}: get_rpc retry limit reached", self.node);
542 self.stabilize().await;
544 }
545 }
546
547 async fn set_rpc(mut self, _: context::Context, key: Key, value: Option<Value>) {
548 loop {
549 for i in 0..(self.config.retry_limit+1) {
550 match self.set(key.clone(), value.clone()).await {
551 Ok(_) => return,
552 Err(e) => {
553 warn!("{}: set_rpc failed (retry {}): {}", self.node, i, e);
554 tokio::time::sleep(
555 tokio::time::Duration::from_millis(self.config.retry_interval)
556 ).await;
557 }
558 };
559 }
560
561 warn!("{}: set_rpc retry limit reached", self.node);
562 self.stabilize().await;
564 }
565 }
566
567 async fn replicate_rpc(mut self, _: context::Context, key: Key, value: Option<Value>) {
568 loop {
569 for i in 0..(self.config.retry_limit+1) {
570 match self.replicate(key.clone(), value.clone()).await {
571 Ok(_) => return,
572 Err(e) => {
573 warn!("{}: replicate_rpc failed (retry {}): {}", self.node, i, e);
574 tokio::time::sleep(
575 tokio::time::Duration::from_millis(self.config.retry_interval)
576 ).await;
577 }
578 };
579 }
580
581 warn!("{}: replicate_rpc retry limit reached", self.node);
582 self.stabilize().await;
584 }
585 }
586}
587
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Refreshes every finger except slot 0, which the successor stands in for.
    async fn fix_all_fingers(server: &mut NodeServer) {
        for i in 1..NUM_BITS {
            server.fix_finger(i).await;
        }
    }

    /// Grows a ring of nodes {0, 1, 3, 6} one join at a time.
    ///
    /// Stabilization and finger repair are driven by hand (both periodic tasks are
    /// disabled via 0 intervals). After each join the test checks predecessors,
    /// successors and the low fingers.
    #[tokio::test]
    async fn test_node_metadata() -> DhtResult<()> {
        // `init()` panics if another test in this binary already installed the
        // global logger; `try_init()` just reports that.
        let _ = env_logger::try_init();

        let n0 = Node {
            addr: "localhost:9800".to_string(),
            id: 0,
        };
        let n1 = Node {
            addr: "localhost:9801".to_string(),
            id: 1,
        };
        let n3 = Node {
            addr: "localhost:9803".to_string(),
            id: 3,
        };
        let n6 = Node {
            addr: "localhost:9806".to_string(),
            id: 6,
        };

        let config = Config {
            fix_finger_interval: 0,
            stabilize_interval: 0,
            ..Config::default()
        };

        // Single-node ring.
        let mut s0 = NodeServer::new(n0.clone(), config.clone());
        let m0 = s0.start(None).await?;
        s0.stabilize().await;
        assert_eq!(s0.get_predecessor().unwrap().id, 0);
        assert_eq!(s0.get_successor().id, 0);

        // Node 1 joins via node 0.
        let mut s1 = NodeServer::new(n1.clone(), config.clone());
        let m1 = s1.start(Some(n0.clone())).await?;
        assert_eq!(s1.get_successor().id, 0);

        s1.stabilize().await;
        assert_eq!(s0.get_predecessor().unwrap().id, 1);
        s0.stabilize().await;
        assert_eq!(s0.get_predecessor().unwrap().id, 1);
        assert_eq!(s0.get_successor().id, 1);
        assert_eq!(s1.get_predecessor().unwrap().id, 0);
        assert_eq!(s1.get_successor().id, 0);

        fix_all_fingers(&mut s0).await;
        {
            let table = s0.finger_table.read().unwrap();
            assert_eq!(table[1].id, 0);
        }
        fix_all_fingers(&mut s1).await;
        {
            let table = s1.finger_table.read().unwrap();
            assert_eq!(table[1].id, 0);
            assert_eq!(table[2].id, 0);
        }

        // Node 3 joins via node 1.
        let mut s3 = NodeServer::new(n3.clone(), config.clone());
        let m3 = s3.start(Some(n1.clone())).await?;
        s3.stabilize().await;
        s1.stabilize().await;
        s0.stabilize().await;

        assert_eq!(s3.get_predecessor().unwrap().id, 1);
        assert_eq!(s1.get_predecessor().unwrap().id, 0);
        assert_eq!(s0.get_predecessor().unwrap().id, 3);

        fix_all_fingers(&mut s0).await;
        {
            let table = s0.finger_table.read().unwrap();
            assert_eq!(s0.get_successor().id, 1);
            assert_eq!(table[1].id, 3);
            assert_eq!(table[2].id, 0);
        }
        fix_all_fingers(&mut s1).await;
        {
            let table = s1.finger_table.read().unwrap();
            assert_eq!(s1.get_successor().id, 3);
            assert_eq!(table[1].id, 3);
            assert_eq!(table[2].id, 0);
        }
        fix_all_fingers(&mut s3).await;
        {
            let table = s3.finger_table.read().unwrap();
            assert_eq!(s3.get_successor().id, 0);
            assert_eq!(table[1].id, 0);
            assert_eq!(table[2].id, 0);
        }

        // Node 6 joins via node 0.
        let mut s6 = NodeServer::new(n6.clone(), config.clone());
        let m6 = s6.start(Some(n0.clone())).await?;
        s6.stabilize().await;
        s3.stabilize().await;
        s1.stabilize().await;
        s0.stabilize().await;

        assert_eq!(s6.get_predecessor().unwrap().id, 3);
        assert_eq!(s0.get_predecessor().unwrap().id, 6);
        assert_eq!(s1.get_predecessor().unwrap().id, 0);
        assert_eq!(s3.get_predecessor().unwrap().id, 1);

        fix_all_fingers(&mut s0).await;
        {
            let table = s0.finger_table.read().unwrap();
            assert_eq!(s0.get_successor().id, 1);
            assert_eq!(table[1].id, 3);
            assert_eq!(table[2].id, 6);
        }
        fix_all_fingers(&mut s1).await;
        {
            let table = s1.finger_table.read().unwrap();
            assert_eq!(s1.get_successor().id, 3);
            assert_eq!(table[1].id, 3);
            assert_eq!(table[2].id, 6);
        }
        fix_all_fingers(&mut s3).await;
        {
            let table = s3.finger_table.read().unwrap();
            assert_eq!(s3.get_successor().id, 6);
            assert_eq!(table[1].id, 6);
            assert_eq!(table[2].id, 0);
        }
        fix_all_fingers(&mut s6).await;
        {
            let table = s6.finger_table.read().unwrap();
            assert_eq!(s6.get_successor().id, 0);
            assert_eq!(table[1].id, 0);
            assert_eq!(table[2].id, 0);
        }

        m0.stop().await?;
        m1.stop().await?;
        m3.stop().await?;
        m6.stop().await?;
        Ok(())
    }
}