1use std::fmt::{Debug, Display};
2
3use std::collections::HashMap;
4use std::marker::PhantomData;
12use std::time::Instant;
13
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use futures::channel::mpsc::Sender;
18use futures::prelude::*;
19
20use log::{debug, trace, warn};
21
22use crate::Config;
23
24use crate::common::*;
25
26use crate::store::{Datastore, HashMapStore};
27use crate::table::{KNodeTable, NodeTable};
28
29mod operation;
30pub use operation::*;
31
32mod connect;
33pub use connect::*;
34
35mod locate;
36pub use locate::*;
37
38mod search;
39pub use search::*;
40
41mod store;
42pub use store::*;
43
/// DHT instance tying together a node table, a datastore, an outgoing request
/// channel and the set of in-flight operations.
///
/// `Table` defaults to [`KNodeTable`] and `Store` defaults to [`HashMapStore`].
pub struct Dht<Id, Info, Data, ReqId, Table = KNodeTable<Id, Info>, Store = HashMapStore<Id, Data>>
{
    /// Identifier of the local node; entries carrying this id are skipped when
    /// processing `NodesFound` responses and when selecting peers to query.
    id: Id,

    /// Tunables read by the request handlers and the operation state machine
    /// (`k`, `concurrency`, `search_timeout`, `max_recursion`).
    config: Config,
    /// Table of known peers, queried with `nearest` and refreshed with
    /// `create_or_update`.
    table: Table,
    /// Outgoing channel on which `(request id, destination, request)` tuples are
    /// queued for whatever component owns the receiving end.
    conn_mgr: Sender<(ReqId, Entry<Id, Info>, Request<Id, Data>)>,
    /// Local value storage used to answer `FindValue` / `Store` requests.
    datastore: Store,

    /// In-flight operations keyed by the request id they were registered under.
    operations: HashMap<ReqId, Operation<Id, Info, Data, ReqId>>,

    // Zero-sized markers for the generic parameters.
    // NOTE(review): `Info`, `Data` and `ReqId` already appear in `conn_mgr` and
    // `operations`, so these markers look redundant - confirm before removing.
    _addr: PhantomData<Info>,
    _data: PhantomData<Data>,
    _req_id: PhantomData<ReqId>,
}
59
60impl<Id, Info, Data, ReqId, Table, Store> Dht<Id, Info, Data, ReqId, Table, Store>
61where
62 Id: DatabaseId + Clone + Sized + Send + 'static,
63 Info: PartialEq + Clone + Sized + Debug + Send + 'static,
64 Data: PartialEq + Clone + Sized + Debug + Send + 'static,
65 ReqId: RequestId + Clone + Sized + Display + Debug + Send + 'static,
66 Table: NodeTable<Id, Info> + Send + 'static,
67 Store: Datastore<Id, Data> + Send + 'static,
68{
69 pub fn custom(
71 id: Id,
72 config: Config,
73 conn_mgr: Sender<(ReqId, Entry<Id, Info>, Request<Id, Data>)>,
74 table: Table,
75 datastore: Store,
76 ) -> Dht<Id, Info, Data, ReqId, Table, Store> {
77 Dht {
78 id,
79 config,
80 table,
81 conn_mgr,
82 datastore,
83
84 operations: HashMap::new(),
85
86 _addr: PhantomData,
87 _data: PhantomData,
88 _req_id: PhantomData,
89 }
90 }
91
92 pub fn handle_req(
94 &mut self,
95 _req_id: ReqId,
96 from: &Entry<Id, Info>,
97 req: &Request<Id, Data>,
98 ) -> Result<Response<Id, Info, Data>, Error> {
99 let resp = match req {
101 Request::Ping => Response::NoResult,
102 Request::FindNode(id) => {
103 let nodes = self.table.nearest(id, 0..self.config.k);
104 Response::NodesFound(id.clone(), nodes)
105 },
106 Request::FindValue(id) => {
107 if let Some(values) = self.datastore.find(id) {
109 debug!("Found {} values for id: {:?}", values.len(), id);
110 Response::ValuesFound(id.clone(), values)
111 } else {
112 debug!("No values found, returning closer nodes for id: {:?}", id);
113 let nodes = self.table.nearest(id, 0..self.config.k);
114 Response::NodesFound(id.clone(), nodes)
115 }
116 },
117 Request::Store(id, value) => {
118 let values = self.datastore.store(id, value);
120 if values.len() != 0 {
122 debug!("Stored {} values for id: {:?}", values.len(), id);
123 Response::ValuesFound(id.clone(), values)
124 } else {
125 debug!("Ignored values for id: {:?}", id);
126 Response::NoResult
127 }
128 },
129 };
130
131 self.table.create_or_update(from);
133
134 Ok(resp)
135 }
136
137 pub fn handle_resp(
139 &mut self,
140 req_id: ReqId,
141 from: &Entry<Id, Info>,
142 resp: &Response<Id, Info, Data>,
143 ) -> Result<(), Error> {
144 let mut op = match self.operations.remove(&req_id) {
146 Some(v) => v,
147 None => {
148 warn!("No matching operation for request id: {}", req_id);
149 return Ok(());
150 }
151 };
152
153 debug!(
154 "Operation {} response from {:?}: {}",
155 req_id, from.id(), resp
156 );
157 trace!("Response: {:?}", resp);
158
159 match op.nodes.get_mut(from.id()) {
161 Some((_e, s)) if *s != RequestState::Active => {
162 warn!("Operation {} unexpected response from: {:?}", req_id, from.id());
163 *s = RequestState::InvalidResponse;
164 return Ok(());
165 }
166 Some((_e, s)) if *s == RequestState::Active => {
167 *s = RequestState::Complete;
168 }
169 _ => (),
170 }
171
172 self.table.create_or_update(from);
174
175 op.nodes.entry(from.id().clone())
177 .or_insert((from.clone(), RequestState::Complete));
178
179 let v = match &resp {
181 Response::NodesFound(id, entries) if id == &op.target => {
182 debug!("Operation {}, adding {} nodes to map", req_id, entries.len());
183 trace!("Entries: {:?}", entries);
184
185 for e in entries {
186 debug!("Operation {}, add node {:?}", req_id, e.id());
187
188 if e.id() == &self.id {
190 continue;
191 }
192
193 self.table.create_or_update(e);
195
196 if e.id() == from.id() {
199 continue;
200 }
201
202 op.nodes
205 .entry(e.id().clone())
206 .or_insert((e.clone(), RequestState::Pending));
207 }
208
209 RequestState::Complete
210 }
211 Response::NodesFound(id, _entries) => {
212 debug!(
213 "Operation {}, invalid nodes response: {:?} from: {:?} (id: {:?})",
214 req_id, resp, from.id(), id
215 );
216 RequestState::InvalidResponse
217 }
218 Response::ValuesFound(id, values) if id == &op.target => {
219 debug!("Operation {}, adding {} values to map", req_id, values.len());
220 trace!("Values: {:?}", values);
221
222 op.data.insert(from.id().clone(), values.clone());
224
225 RequestState::Complete
226 }
227 Response::ValuesFound(id, _values) => {
228 debug!(
229 "Operation {}, invalid values response from: {:?} (id: {:?})",
230 req_id, from.id(), id
231 );
232 trace!("Invalid response: {:?}", resp);
233 RequestState::InvalidResponse
234 }
235 Response::NoResult => {
236 debug!("Operation {}, empty response from: {:?}", req_id, from);
237 RequestState::Complete
238 }
239 };
240
241 let e = op
243 .nodes
244 .entry(from.id().clone())
245 .and_modify(|(_e, s)| *s = v)
246 .or_insert((from.clone(), v));
247
248 debug!("update node {:?} state: {:?}", e.0.id(), e.1);
249
250 trace!("Operation state: {:?}", op);
251
252 self.operations.insert(req_id, op);
254
255 return Ok(());
256 }
257
258 pub(crate) fn exec(
260 &mut self,
261 req_id: ReqId,
262 target: Id,
263 kind: OperationKind<Id, Info, Data>,
264 ) -> Result<(), Error> {
265 debug!("Registering operation id: {}", req_id);
266
267 let op = Operation::new(req_id.clone(), target.clone(), kind);
269
270 self.operations.insert(req_id.clone(), op);
273
274 Ok(())
276 }
277
278 pub fn update(&mut self) -> Result<(), Error> {
281 let mut req_sink = self.conn_mgr.clone();
282 let mut done = vec![];
283
284 for (req_id, op) in self.operations.iter_mut() {
286 let req = match &op.kind {
288 OperationKind::Connect(_tx) => Request::FindNode(op.target.clone()),
289 OperationKind::FindNode(_tx) => Request::FindNode(op.target.clone()),
290 OperationKind::FindValues(_tx) => Request::FindNode(op.target.clone()),
291 OperationKind::Store(_v, _tx) => Request::FindNode(op.target.clone()),
292 };
293
294 match op.state.clone() {
296 OperationState::Init => {
298 debug!("Operation {} ({}) start", req_id, &op.kind);
299
300 let nearest: Vec<_> =
302 self.table.nearest(&op.target, 0..self.config.concurrency);
303 for e in &nearest {
304 op.nodes
305 .insert(e.id().clone(), (e.clone(), RequestState::Active));
306 }
307
308 let mut nodes: Vec<_> =
310 op.nodes.iter().map(|(_k, (n, _s))| n.clone()).collect();
311 nodes.sort_by_key(|n| Id::xor(&op.target, n.id()));
312
313 debug!(
314 "Initiating {} operation ({}), sending {} request to {} peers",
315 &op.kind, req_id, req, nodes.len()
316 );
317
318 for e in nodes.iter() {
320 trace!("Operation {} issuing {} to: {:?}", req_id, req, e.id());
321
322 let _ = req_sink.try_send((req_id.clone(), e.clone(), req.clone()));
324 }
325
326 debug!("Operation {} entering searching state", req_id);
328 op.state = OperationState::Searching(0)
329 }
330 OperationState::Connecting => {
332 let nodes = op.nodes.clone();
334 let mut known: Vec<_> = nodes.iter().collect();
335 known.sort_by_key(|(k, _)| Id::xor(&op.target, k));
336
337 let pending = known
338 .iter()
339 .filter(|(_key, (_e, s))| *s == RequestState::Pending)
340 .count();
341
342 let search_timeout = self.config.search_timeout;
344 let expired = Instant::now()
345 .checked_duration_since(op.last_update)
346 .map(|d| d > search_timeout)
347 .unwrap_or(false);
348
349 if nodes.len() > 0 {
350 debug!(
351 "Operation {} connect response received ({} peers)",
352 req_id, pending
353 );
354
355 if pending > 0 {
357 op.state = OperationState::Search(0);
358 } else {
359 op.state = OperationState::Done;
360 }
361 } else if expired {
362 debug!("Operation {} connect timeout", req_id);
363 op.state = OperationState::Done;
364 }
365 }
366 OperationState::Searching(n) => {
368 let nodes = op.nodes.clone();
370 let mut known: Vec<_> = nodes.iter().collect();
371 known.sort_by_key(|(k, _)| Id::xor(&op.target, k));
372
373 let active: Vec<_> = (&known[0..usize::min(known.len(), self.config.k)])
375 .iter()
376 .filter(|(_key, (_e, s))| *s == RequestState::Active)
377 .collect();
378
379 let pending: Vec<_> = (&known[0..usize::min(known.len(), self.config.k)])
381 .iter()
382 .filter(|(_key, (_e, s))| *s == RequestState::Pending)
383 .collect();
384
385 let search_timeout = self.config.search_timeout;
387 let expired = Instant::now()
388 .checked_duration_since(op.last_update)
389 .map(|d| d > search_timeout)
390 .unwrap_or(false);
391
392 trace!("active: {:?}", active);
393 trace!("pending: {:?}", pending);
394 trace!("data: {:?}", op.data);
395
396 match (active.len(), pending.len(), expired, &op.kind) {
397 (0, 0, _, _) => {
399 debug!("Operation {} search complete!", req_id);
400 op.state = OperationState::Request;
401 }
402 (0, _, _, _) => {
404 debug!(
405 "Operation {}, all responses received, re-starting search",
406 req_id
407 );
408 op.state = OperationState::Search(n + 1);
409 }
410 (_, _, true, _) => {
412 debug!("Operation {} timeout at iteration {}", req_id, n);
413
414 for (id, _n) in active {
416 op.nodes
417 .entry((*id).clone())
418 .and_modify(|(_n, s)| *s = RequestState::Timeout);
419 }
420
421 op.state = OperationState::Search(n + 1);
422 }
423 _ => (),
424 }
425 }
426 OperationState::Search(n) => {
428 if n > self.config.max_recursion {
430 debug!("Reached recursion limit, aborting search {}", req_id);
431 op.state = OperationState::Pending;
432 continue;
433 }
434
435 debug!("Operation {} ({}) search iteration {}", req_id, &op.kind, n);
436
437 let own_id = self.id.clone();
439 let mut nearest: Vec<_> = op
440 .nodes
441 .iter()
442 .filter(|(k, (_n, s))| (*s == RequestState::Pending) & (*k != &own_id))
443 .map(|(_k, (n, _s))| n.clone())
444 .collect();
445
446 debug!("Operation {} nearest: {:?}", req_id, nearest);
447
448 nearest.sort_by_key(|n| Id::xor(&op.target, n.id()));
450 let n = usize::min(self.config.concurrency, nearest.len());
451
452 if n == 0 {
454 op.state = OperationState::Request;
455 }
456
457 debug!(
459 "Operation {} issuing search request: {:?} to: {:?}",
460 req_id,
461 req,
462 &nearest[0..n]
463 );
464 for n in &nearest[0..n] {
465 op.nodes
466 .entry(n.id().clone())
467 .and_modify(|(_n, s)| *s = RequestState::Active);
468
469 let _ = req_sink.try_send((req_id.clone(), n.clone(), req.clone()));
471 }
472
473 debug!("Operation {} entering searching state", req_id);
475 op.state = OperationState::Searching(n);
476 }
477 OperationState::Request => {
479 let req = match &op.kind {
481 OperationKind::Store(v, _) => Request::Store(op.target.clone(), v.clone()),
482 OperationKind::FindValues(_) => Request::FindValue(op.target.clone()),
483 _ => {
484 debug!("Operation {} entering done state", req_id);
485 op.state = OperationState::Done;
486 continue;
487 }
488 };
489
490 let own_id = self.id.clone();
493 let mut nearest: Vec<_> = op
494 .nodes
495 .iter()
496 .filter(|(k, (_n, s))| (*s == RequestState::Complete) & (*k != &own_id))
497 .map(|(_k, (n, _s))| n.clone())
498 .collect();
499
500 nearest.sort_by_key(|n| Id::xor(&op.target, n.id()));
502 let range = 0..usize::min(self.config.concurrency, nearest.len());
503
504 debug!(
505 "Operation {} ({}) issuing {} request to {} peers",
506 req_id,
507 &op.kind,
508 req,
509 nearest[range.clone()].len()
510 );
511
512 for n in &nearest[range] {
513 op.nodes
514 .entry(n.id().clone())
515 .and_modify(|(_n, s)| *s = RequestState::Active);
516
517 let _ = req_sink.try_send((req_id.clone(), n.clone(), req.clone()));
519 }
520
521 op.state = OperationState::Pending;
522 }
523 OperationState::Pending => {
525 let nodes = op.nodes.clone();
527 let mut known: Vec<_> = nodes.iter().collect();
528 known.sort_by_key(|(k, _)| Id::xor(&op.target, k));
529
530 let active: Vec<_> = (&known[0..usize::min(known.len(), self.config.k)])
532 .iter()
533 .filter(|(_key, (_e, s))| *s == RequestState::Active)
534 .collect();
535
536 let search_timeout = self.config.search_timeout;
538 let expired = Instant::now()
539 .checked_duration_since(op.last_update)
540 .map(|d| d > search_timeout)
541 .unwrap_or(false);
542
543 if active.len() == 0 || expired {
544 debug!("Operation {} ({}) entering done state", req_id, &op.kind);
545 op.state = OperationState::Done;
546 }
547 }
548 OperationState::Done => {
550 debug!("Operating {} ({}) done", req_id, &op.kind);
551
552 match &op.kind {
553 OperationKind::Connect(tx) => {
554 let mut peers: Vec<_> = op
555 .nodes
556 .iter()
557 .filter(|(_k, (_n, s))| *s == RequestState::Complete)
558 .map(|(_k, (e, _s))| e.clone())
559 .collect();
560
561 let own_id = self.id.clone();
562 peers.sort_by_key(|n| Id::xor(&own_id, n.id()));
563 if peers.len() > 0 {
564 tx.clone().try_send(Ok(peers)).unwrap();
565 } else {
566 tx.clone().try_send(Err(Error::NotFound)).unwrap();
567 }
568 }
569 OperationKind::FindNode(tx) => {
570 trace!("Found nodes: {:?}", op.nodes);
571 match op.nodes.get(&op.target) {
572 Some((n, _s)) => tx.clone().try_send(Ok(n.clone())).unwrap(),
573 None => tx.clone().try_send(Err(Error::NotFound)).unwrap(),
574 };
575 }
576 OperationKind::FindValues(tx) => {
577 if op.data.len() > 0 {
578 let mut flat_data: Vec<Data> =
580 op.data.iter().flat_map(|(_k, v)| v.clone()).collect();
581
582 if let Some(mut existing) = self.datastore.find(&op.target) {
584 flat_data.append(&mut existing);
585 }
586
587 debug!("Operation {} values found: {:?}", req_id, flat_data);
590
591 tx.clone().try_send(Ok(flat_data)).unwrap();
592 } else {
593 tx.clone().try_send(Err(Error::NotFound)).unwrap();
594 }
595 }
596 OperationKind::Store(_values, tx) => {
597 if op.data.len() > 0 {
601 let flat_ids: Vec<_> =
602 op.data.iter().map(|(k, _v)| k.clone()).collect();
603 let mut flat_nodes: Vec<_> = flat_ids
604 .iter()
605 .filter_map(|id| op.nodes.get(id).map(|(e, _s)| e.clone()))
606 .collect();
607 flat_nodes.sort_by_key(|n| Id::xor(&op.target, n.id()));
608
609 debug!("Operation {} stored at {} peers", req_id, flat_ids.len());
612
613 tx.clone().try_send(Ok(flat_nodes)).unwrap();
614 } else {
615 debug!("Operation {} store failed", req_id);
616 tx.clone().try_send(Err(Error::NotFound)).unwrap();
617 }
618 }
619 }
620
621 done.push(req_id.clone());
623 }
624 };
625 }
626
627 for req_id in done {
628 self.operations.remove(&req_id);
629 }
630
631 Ok(())
632 }
633
    /// Pings nodes that have not been seen within `config.node_timeout` (or
    /// never), refreshing those that answer and removing those that do not.
    ///
    /// NOTE(review): this function is compiled out by `#[cfg(nope)]`. It calls
    /// `conn.request(..)` on the cloned `conn_mgr`, which is not something this
    /// file shows the channel sender providing - it presumably predates the
    /// channel-based connection manager and needs porting before re-enabling.
    #[cfg(nope)]
    pub async fn refresh(&mut self) -> Result<(), ()> {
        let timeout = self.config.node_timeout;
        // Select entries whose last-seen time plus the timeout is in the past,
        // along with entries that have never been seen.
        let oldest: Vec<_> = self
            .table
            .iter_oldest()
            .filter(move |o| {
                if let Some(seen) = o.seen() {
                    seen.add(timeout) < Instant::now()
                } else {
                    true
                }
            })
            .collect();

        let mut pings = Vec::with_capacity(oldest.len());

        for o in oldest {
            // Each ping future gets its own clones of the table, entry and
            // connection so it can be `async move`.
            // NOTE(review): updates go to the cloned table `t`; whether they are
            // visible through `self.table` depends on `Table::clone` - confirm.
            let mut t = self.table.clone();
            let mut o = o.clone();

            let mut conn = self.conn_mgr.clone();

            let p = async move {
                let res = conn
                    .request(ReqId::generate(), o.clone(), Request::Ping)
                    .await;
                match res {
                    Ok(_resp) => {
                        // Node answered: bump its last-seen time.
                        debug!("[DHT refresh] updating node: {:?}", o);
                        o.set_seen(Instant::now());
                        t.create_or_update(&o);
                    }
                    Err(_e) => {
                        // Node did not answer: drop it from the table.
                        debug!("[DHT refresh] expiring node: {:?}", o);
                        t.remove_entry(o.id());
                    }
                }

                ()
            };

            pings.push(p);
        }

        // Run all pings concurrently and wait for every one to finish.
        future::join_all(pings).await;

        Ok(())
    }
690
    /// Returns a shared reference to the node table.
    pub fn nodetable(&self) -> &Table {
        &self.table
    }
694
    /// Returns a mutable reference to the node table.
    pub fn nodetable_mut(&mut self) -> &mut Table {
        &mut self.table
    }
698
    /// Returns a shared reference to the datastore.
    pub fn datastore(&self) -> &Store {
        &self.datastore
    }
702
    /// Returns a mutable reference to the datastore.
    pub fn datastore_mut(&mut self) -> &mut Store {
        &mut self.datastore
    }
706
    /// Test-only helper: looks `id` up in the node table and returns the
    /// matching entry, if any.
    #[cfg(test)]
    pub fn contains(&mut self, id: &Id) -> Option<Entry<Id, Info>> {
        self.table.contains(id)
    }
711}
712
/// `Dht` is declared `Unpin` for every choice of type parameters; the `Future`
/// impl relies on this to call `update` through `Pin<&mut Self>`.
impl<Id, Info, Data, ReqId, Table, Store> Unpin for Dht<Id, Info, Data, ReqId, Table, Store> {}
714
/// Drives the DHT by running one `update` pass each time the future is polled.
///
/// The future never completes: `poll` always returns `Poll::Pending`.
impl<Id, Info, Data, ReqId, Table, Store> Future for Dht<Id, Info, Data, ReqId, Table, Store>
where
    Id: DatabaseId + Clone + Sized + Send + 'static,
    Info: PartialEq + Clone + Sized + Debug + Send + 'static,
    Data: PartialEq + Clone + Sized + Debug + Send + 'static,
    ReqId: RequestId + Clone + Sized + Display + Debug + Send + 'static,
    Table: NodeTable<Id, Info> + Clone + Send + 'static,
    Store: Datastore<Id, Data> + Clone + Send + 'static,
{
    type Output = Result<(), Error>;

    /// Runs a single `update` pass, discarding its result, and reports
    /// `Pending`.
    ///
    /// NOTE(review): the context (and therefore its waker) is unused, so this
    /// future is only re-polled if something else causes the task to be polled
    /// again - confirm that callers poll it explicitly or wake the task.
    fn poll(mut self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let _ = self.update();

        Poll::Pending
    }
}
733
/// Test helper that declares a mutable `Dht` binding named `$dht`.
///
/// * `$connector` - sender half of the request channel (cloned into the DHT).
/// * `$root` - entry whose id becomes the DHT's own id.
/// * `$dht` - name of the binding to create.
/// * `$config` - optional configuration; when omitted, `Config::default()`
///   with `concurrency` set to 2 is used.
///
/// The DHT is built with `[u8; 1]` ids and `u64` data and request ids.
#[cfg(test)]
#[macro_export]
macro_rules! mock_dht {
    ($connector: ident, $root: ident, $dht:ident) => {
        let mut config = Config::default();
        config.concurrency = 2;
        mock_dht!($connector, $root, $dht, config);
    };
    ($connector: ident, $root: ident, $dht:ident, $config:ident) => {
        let mut $dht =
            Dht::<[u8; 1], _, u64, u64>::standard($root.id().clone(), $config, $connector.clone());
    };
}
749
750#[cfg(test)]
751mod tests {
752 use std::clone::Clone;
753
754 extern crate futures;
755 use futures::channel::mpsc;
756
757 use super::*;
758 use crate::store::Datastore;
759
760 #[test]
761 fn test_receive_common() {
762 let root = Entry::new([0], 001);
763 let friend = Entry::new([1], 002);
764
765 let (tx, _rx) = mpsc::channel(10);
766 mock_dht!(tx, root, dht);
767
768 assert!(dht.table.contains(friend.id()).is_none());
770
771 assert_eq!(
773 dht.handle_req(1, &friend, &Request::Ping).unwrap(),
774 Response::NoResult,
775 );
776
777 let friend1 = dht.table.contains(friend.id()).unwrap();
779
780 assert_eq!(
782 dht.handle_req(2, &friend, &Request::Ping).unwrap(),
783 Response::NoResult,
784 );
785
786 let friend2 = dht.table.contains(friend.id()).unwrap();
788 assert_ne!(friend1.seen(), friend2.seen());
789 }
790
791 #[test]
792 fn test_receive_ping() {
793 let root = Entry::new([0], 001);
794 let friend = Entry::new([1], 002);
795
796 let (tx, _rx) = mpsc::channel(10);
797 mock_dht!(tx, root, dht);
798
799 assert_eq!(
801 dht.handle_req(1, &friend, &Request::Ping).unwrap(),
802 Response::NoResult,
803 );
804 }
805
806 #[test]
807 fn test_receive_find_nodes() {
808 let root = Entry::new([0], 001);
809 let friend = Entry::new([1], 002);
810 let other = Entry::new([2], 003);
811
812 let (tx, _rx) = mpsc::channel(10);
813 mock_dht!(tx, root, dht);
814
815 dht.table.create_or_update(&friend);
817
818 assert_eq!(
820 dht.handle_req(1, &friend, &Request::FindNode(other.id().clone()))
821 .unwrap(),
822 Response::NodesFound(other.id().clone(), vec![friend.clone()]),
823 );
824 }
825
826 #[test]
827 fn test_receive_find_values() {
828 let root = Entry::new([0], 001);
829 let friend = Entry::new([1], 002);
830 let other = Entry::new([2], 003);
831
832 let (tx, _rx) = mpsc::channel(10);
833 mock_dht!(tx, root, dht);
834
835 dht.table.create_or_update(&friend);
837
838 assert_eq!(
840 dht.handle_req(1, &other, &Request::FindValue([201]))
841 .unwrap(),
842 Response::NodesFound([201], vec![friend.clone()]),
843 );
844
845 dht.datastore.store(&[201], &vec![1337]);
847
848 assert_eq!(
850 dht.handle_req(2, &other, &Request::FindValue([201]))
851 .unwrap(),
852 Response::ValuesFound([201], vec![1337]),
853 );
854 }
855
856 #[test]
857 fn test_receive_store() {
858 let root = Entry::new([0], 001);
859 let friend = Entry::new([1], 002);
860
861 let (tx, _rx) = mpsc::channel(10);
862 mock_dht!(tx, root, dht);
863
864 assert_eq!(
866 dht.handle_req(1, &friend, &Request::Store([2], vec![1234]))
867 .unwrap(),
868 Response::ValuesFound([2], vec![1234]),
869 );
870
871 let v = dht.datastore.find(&[2]).expect("missing value");
872 assert_eq!(v, vec![1234]);
873 }
874
    // Exercises `refresh`: nodes that answer pings are kept, nodes that time out
    // are removed.
    //
    // NOTE(review): disabled by `#[cfg(nope)]`. As written it references names
    // that are not in scope in this module (`connector`, `Mt`, `block_on`,
    // `Duration`) and calls `refresh(())` with an argument the visible
    // `refresh` does not take - it needs porting before it can be re-enabled.
    #[cfg(nope)]
    #[test]
    fn test_expire() {
        let mut config = Config::default();
        config.node_timeout = Duration::from_millis(200);

        let root = Entry::new([0], 001);
        let n1 = Entry::new([1], 002);
        let n2 = Entry::new([2], 003);

        let (tx, _rx) = mpsc::channel(10);
        let c = config.clone();
        mock_dht!(tx, root, dht, config);

        dht.table.create_or_update(&n1);
        dht.table.create_or_update(&n2);

        // First refresh right after insertion: no requests are expected.
        block_on(dht.refresh(())).unwrap();
        connector.finalise();

        // Let both nodes age past the timeout.
        std::thread::sleep(config.node_timeout * 2);

        // Both nodes answer the ping and must remain in the table.
        connector.expect(vec![
            Mt::request(n1.clone(), Request::Ping, Ok((Response::NoResult, ()))),
            Mt::request(n2.clone(), Request::Ping, Ok((Response::NoResult, ()))),
        ]);
        block_on(dht.refresh(())).unwrap();

        assert!(dht.table.contains(n1.id()).is_some());
        assert!(dht.table.contains(n2.id()).is_some());

        connector.finalise();

        std::thread::sleep(config.node_timeout * 2);

        // Only n1 answers this time; n2 times out and must be removed.
        connector.expect(vec![
            Mt::request(n1.clone(), Request::Ping, Ok((Response::NoResult, ()))),
            Mt::request(n2.clone(), Request::Ping, Err(Error::Timeout)),
        ]);
        block_on(dht.refresh(())).unwrap();

        assert!(dht.table.contains(n1.id()).is_some());
        assert!(dht.table.contains(n2.id()).is_none());

        connector.finalise();
    }
926}