//! kad/dht/mod.rs

1use std::fmt::{Debug, Display};
2
3use std::collections::HashMap;
4/**
5 * rust-kad
6 * Kademlia core implementation
7 *
8 * https://github.com/ryankurte/rust-kad
9 * Copyright 2018 Ryan Kurte
10 */
11use std::marker::PhantomData;
12use std::time::Instant;
13
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use futures::channel::mpsc::Sender;
18use futures::prelude::*;
19
20use log::{debug, trace, warn};
21
22use crate::Config;
23
24use crate::common::*;
25
26use crate::store::{Datastore, HashMapStore};
27use crate::table::{KNodeTable, NodeTable};
28
29mod operation;
30pub use operation::*;
31
32mod connect;
33pub use connect::*;
34
35mod locate;
36pub use locate::*;
37
38mod search;
39pub use search::*;
40
41mod store;
42pub use store::*;
43
/// Generic Kademlia DHT instance.
///
/// Type parameters: `Id` is the node/key identifier, `Info` carries peer
/// contact information, `Data` is the stored value type, `ReqId` identifies
/// in-flight requests. `Table` and `Store` default to the bundled
/// `KNodeTable` / `HashMapStore` implementations.
pub struct Dht<Id, Info, Data, ReqId, Table = KNodeTable<Id, Info>, Store = HashMapStore<Id, Data>>
{
    // Local node identifier
    id: Id,

    config: Config,
    // Routing table of known peers
    table: Table,
    // Sink used to hand outgoing (req_id, peer, request) tuples to the connection manager
    conn_mgr: Sender<(ReqId, Entry<Id, Info>, Request<Id, Data>)>,
    // Local key/value storage
    datastore: Store,

    // In-flight operations keyed by request id, driven by `Dht::update`
    operations: HashMap<ReqId, Operation<Id, Info, Data, ReqId>>,

    // NOTE(review): Info, Data and ReqId all appear in concrete fields above
    // (conn_mgr, operations), so these PhantomData markers look redundant —
    // confirm before removing (constructor also initialises them).
    _addr: PhantomData<Info>,
    _data: PhantomData<Data>,
    _req_id: PhantomData<ReqId>,
}
59
60impl<Id, Info, Data, ReqId, Table, Store> Dht<Id, Info, Data, ReqId, Table, Store>
61where
62    Id: DatabaseId + Clone + Sized + Send + 'static,
63    Info: PartialEq + Clone + Sized + Debug + Send + 'static,
64    Data: PartialEq + Clone + Sized + Debug + Send + 'static,
65    ReqId: RequestId + Clone + Sized + Display + Debug + Send + 'static,
66    Table: NodeTable<Id, Info> + Send + 'static,
67    Store: Datastore<Id, Data> + Send + 'static,
68{
69    /// Create a new DHT with custom node table / data store implementation
70    pub fn custom(
71        id: Id,
72        config: Config,
73        conn_mgr: Sender<(ReqId, Entry<Id, Info>, Request<Id, Data>)>,
74        table: Table,
75        datastore: Store,
76    ) -> Dht<Id, Info, Data, ReqId, Table, Store> {
77        Dht {
78            id,
79            config,
80            table,
81            conn_mgr,
82            datastore,
83
84            operations: HashMap::new(),
85
86            _addr: PhantomData,
87            _data: PhantomData,
88            _req_id: PhantomData,
89        }
90    }
91
92    /// Receive and reply to requests
93    pub fn handle_req(
94        &mut self,
95        _req_id: ReqId,
96        from: &Entry<Id, Info>,
97        req: &Request<Id, Data>,
98    ) -> Result<Response<Id, Info, Data>, Error> {
99        // Build response
100        let resp = match req {
101            Request::Ping => Response::NoResult,
102            Request::FindNode(id) => {
103                let nodes = self.table.nearest(id, 0..self.config.k);
104                Response::NodesFound(id.clone(), nodes)
105            },
106            Request::FindValue(id) => {
107                // Lookup the value
108                if let Some(values) = self.datastore.find(id) {
109                    debug!("Found {} values for id: {:?}", values.len(), id);
110                    Response::ValuesFound(id.clone(), values)
111                } else {
112                    debug!("No values found, returning closer nodes for id: {:?}", id);
113                    let nodes = self.table.nearest(id, 0..self.config.k);
114                    Response::NodesFound(id.clone(), nodes)
115                }
116            },
117            Request::Store(id, value) => {
118                // Write value to local storage
119                let values = self.datastore.store(id, value);
120                // Reply to confirm write was completed
121                if values.len() != 0 {
122                    debug!("Stored {} values for id: {:?}", values.len(), id);
123                    Response::ValuesFound(id.clone(), values)
124                } else {
125                    debug!("Ignored values for id: {:?}", id);
126                    Response::NoResult
127                }
128            },
129        };
130
131        // Update record for sender
132        self.table.create_or_update(from);
133
134        Ok(resp)
135    }
136
137    // Receive responses and update internal state
138    pub fn handle_resp(
139        &mut self,
140        req_id: ReqId,
141        from: &Entry<Id, Info>,
142        resp: &Response<Id, Info, Data>,
143    ) -> Result<(), Error> {
144        // Locate matching operation
145        let mut op = match self.operations.remove(&req_id) {
146            Some(v) => v,
147            None => {
148                warn!("No matching operation for request id: {}", req_id);
149                return Ok(());
150            }
151        };
152
153        debug!(
154            "Operation {} response from {:?}: {}",
155            req_id, from.id(), resp
156        );
157        trace!("Response: {:?}", resp);
158
159        // Check request is expected / from a valid peer and update state
160        match op.nodes.get_mut(from.id()) {
161            Some((_e, s)) if *s != RequestState::Active => {
162                warn!("Operation {} unexpected response from: {:?}", req_id, from.id());
163                *s = RequestState::InvalidResponse;
164                return Ok(());
165            }
166            Some((_e, s)) if *s == RequestState::Active => {
167                *s = RequestState::Complete;
168            }
169            _ => (),
170        }
171
172        // Update global / DHT peer table
173        self.table.create_or_update(from);
174
175        // Add responding node to nodes
176        op.nodes.entry(from.id().clone())
177            .or_insert((from.clone(), RequestState::Complete));
178
179        // Handle incoming response message
180        let v = match &resp {
181            Response::NodesFound(id, entries) if id == &op.target => {
182                debug!("Operation {}, adding {} nodes to map", req_id, entries.len());
183                trace!("Entries: {:?}", entries);
184
185                for e in entries {
186                    debug!("Operation {}, add node {:?}", req_id, e.id());
187
188                    // Skip entries relating to ourself
189                    if e.id() == &self.id {
190                        continue;
191                    }
192
193                    // Update global nodetable
194                    self.table.create_or_update(e);
195
196                    // Skip adding responding node to operation again
197                    // (state is updated above)
198                    if e.id() == from.id() {
199                        continue;
200                    }
201
202                    // Insert new nodes into tracking
203                    // TODO: should we update op node table with responding node info?
204                    op.nodes
205                        .entry(e.id().clone())
206                        .or_insert((e.clone(), RequestState::Pending));
207                }
208
209                RequestState::Complete
210            }
211            Response::NodesFound(id, _entries) => {
212                debug!(
213                    "Operation {}, invalid nodes response: {:?} from: {:?} (id: {:?})",
214                    req_id, resp, from.id(), id
215                );
216                RequestState::InvalidResponse
217            }
218            Response::ValuesFound(id, values) if id == &op.target => {
219                debug!("Operation {}, adding {} values to map", req_id, values.len());
220                trace!("Values: {:?}", values);
221
222                // Add data to data list
223                op.data.insert(from.id().clone(), values.clone());
224
225                RequestState::Complete
226            }
227            Response::ValuesFound(id, _values) => {
228                debug!(
229                    "Operation {}, invalid values response from: {:?} (id: {:?})",
230                    req_id, from.id(), id
231                );
232                trace!("Invalid response: {:?}", resp);
233                RequestState::InvalidResponse
234            }
235            Response::NoResult => {
236                debug!("Operation {}, empty response from: {:?}", req_id, from);
237                RequestState::Complete
238            }
239        };
240
241        // Update operation node state
242        let e = op
243            .nodes
244            .entry(from.id().clone())
245            .and_modify(|(_e, s)| *s = v)
246            .or_insert((from.clone(), v));
247
248        debug!("update node {:?} state: {:?}", e.0.id(), e.1);
249
250        trace!("Operation state: {:?}", op);
251
252        // Replace operation
253        self.operations.insert(req_id, op);
254
255        return Ok(());
256    }
257
258    // Create a new operation
259    pub(crate) fn exec(
260        &mut self,
261        req_id: ReqId,
262        target: Id,
263        kind: OperationKind<Id, Info, Data>,
264    ) -> Result<(), Error> {
265        debug!("Registering operation id: {}", req_id);
266
267        // Create operation object
268        let op = Operation::new(req_id.clone(), target.clone(), kind);
269
270        // Register operation in tracking
271        // Actual execution happens in `Dht::update` methods.
272        self.operations.insert(req_id.clone(), op);
273
274        // Return OK
275        Ok(())
276    }
277
    /// Update internal state
    /// Usually this should be called via future `Dht::poll()`
    ///
    /// Drives every tracked operation through its state machine
    /// (`Init -> Searching -> Search -> Request -> Pending -> Done`,
    /// with `Connecting` used by connect operations), issuing requests via
    /// the connection manager sink and completing operations by sending
    /// results on their response channels.
    pub fn update(&mut self) -> Result<(), Error> {
        // Clone the request sink so we can send while iterating operations
        let mut req_sink = self.conn_mgr.clone();
        // Request ids of operations completed in this pass
        let mut done = vec![];

        //  For each currently tracked operation
        for (req_id, op) in self.operations.iter_mut() {
            // Generate request objects
            // (every operation kind begins with a FindNode search phase)
            let req = match &op.kind {
                OperationKind::Connect(_tx) => Request::FindNode(op.target.clone()),
                OperationKind::FindNode(_tx) => Request::FindNode(op.target.clone()),
                OperationKind::FindValues(_tx) => Request::FindNode(op.target.clone()),
                OperationKind::Store(_v, _tx) => Request::FindNode(op.target.clone()),
            };

            // Match on states
            match op.state.clone() {
                // Initialise new operation
                OperationState::Init => {
                    debug!("Operation {} ({}) start", req_id, &op.kind);

                    // Identify nearest nodes if available
                    let nearest: Vec<_> =
                        self.table.nearest(&op.target, 0..self.config.concurrency);
                    for e in &nearest {
                        op.nodes
                            .insert(e.id().clone(), (e.clone(), RequestState::Active));
                    }

                    // Build node array (required to include existing op.nodes)
                    let mut nodes: Vec<_> =
                        op.nodes.iter().map(|(_k, (n, _s))| n.clone()).collect();
                    nodes.sort_by_key(|n| Id::xor(&op.target, n.id()));

                    debug!(
                        "Initiating {} operation ({}), sending {} request to {} peers",
                        &op.kind, req_id, req, nodes.len()
                    );

                    // Issue appropriate request
                    for e in nodes.iter() {
                        trace!("Operation {} issuing {} to: {:?}", req_id, req, e.id());

                        // TODO: handle sink errors?
                        let _ = req_sink.try_send((req_id.clone(), e.clone(), req.clone()));
                    }

                    // Set to search state
                    debug!("Operation {} entering searching state", req_id);
                    op.state = OperationState::Searching(0)
                }
                // Awaiting response to connect message
                OperationState::Connecting => {
                    // Check for known nodes (connect responses)
                    let nodes = op.nodes.clone();
                    let mut known: Vec<_> = nodes.iter().collect();
                    known.sort_by_key(|(k, _)| Id::xor(&op.target, k));

                    // Count peers we have heard about but not yet queried
                    let pending = known
                        .iter()
                        .filter(|(_key, (_e, s))| *s == RequestState::Pending)
                        .count();

                    // Check for expiry
                    let search_timeout = self.config.search_timeout;
                    let expired = Instant::now()
                        .checked_duration_since(op.last_update)
                        .map(|d| d > search_timeout)
                        .unwrap_or(false);

                    if nodes.len() > 0 {
                        debug!(
                            "Operation {} connect response received ({} peers)",
                            req_id, pending
                        );

                        // Short-circuit if we didn't receive other peer information
                        if pending > 0 {
                            op.state = OperationState::Search(0);
                        } else {
                            op.state = OperationState::Done;
                        }
                    } else if expired {
                        debug!("Operation {} connect timeout", req_id);
                        op.state = OperationState::Done;
                    }
                }
                // Currently searching / awaiting responses
                OperationState::Searching(n) => {
                    // Fetch known nodes
                    let nodes = op.nodes.clone();
                    let mut known: Vec<_> = nodes.iter().collect();
                    known.sort_by_key(|(k, _)| Id::xor(&op.target, k));

                    // Short-circuit to next search if active nodes have completed operations
                    let active: Vec<_> = (&known[0..usize::min(known.len(), self.config.k)])
                        .iter()
                        .filter(|(_key, (_e, s))| *s == RequestState::Active)
                        .collect();

                    // Exit when no pending nodes remain within K bucket
                    let pending: Vec<_> = (&known[0..usize::min(known.len(), self.config.k)])
                        .iter()
                        .filter(|(_key, (_e, s))| *s == RequestState::Pending)
                        .collect();

                    // Check for expiry
                    let search_timeout = self.config.search_timeout;
                    let expired = Instant::now()
                        .checked_duration_since(op.last_update)
                        .map(|d| d > search_timeout)
                        .unwrap_or(false);

                    trace!("active: {:?}", active);
                    trace!("pending: {:?}", pending);
                    trace!("data: {:?}", op.data);

                    match (active.len(), pending.len(), expired, &op.kind) {
                        // No active or pending nodes, all complete (this will basically never happen)
                        (0, 0, _, _) => {
                            debug!("Operation {} search complete!", req_id);
                            op.state = OperationState::Request;
                        }
                        // No active nodes, all replies received, re-start search
                        (0, _, _, _) => {
                            debug!(
                                "Operation {}, all responses received, re-starting search",
                                req_id
                            );
                            op.state = OperationState::Search(n + 1);
                        }
                        // Search iteration timeout
                        (_, _, true, _) => {
                            debug!("Operation {} timeout at iteration {}", req_id, n);

                            // TODO: Update active nodes to timed-out
                            for (id, _n) in active {
                                op.nodes
                                    .entry((*id).clone())
                                    .and_modify(|(_n, s)| *s = RequestState::Timeout);
                            }

                            op.state = OperationState::Search(n + 1);
                        }
                        // Otherwise keep waiting for responses
                        _ => (),
                    }
                }
                // Update a search iteration
                OperationState::Search(n) => {
                    // Check for max recursion
                    if n > self.config.max_recursion {
                        debug!("Reached recursion limit, aborting search {}", req_id);
                        op.state = OperationState::Pending;
                        continue;
                    }

                    debug!("Operation {} ({}) search iteration {}", req_id, &op.kind, n);

                    // Locate next nearest nodes
                    // (pending peers only, excluding ourself)
                    let own_id = self.id.clone();
                    let mut nearest: Vec<_> = op
                        .nodes
                        .iter()
                        .filter(|(k, (_n, s))| (*s == RequestState::Pending) & (*k != &own_id))
                        .map(|(_k, (n, _s))| n.clone())
                        .collect();

                    debug!("Operation {} nearest: {:?}", req_id, nearest);

                    // Sort and limit
                    // NOTE(review): this `n` shadows the iteration counter bound
                    // above, so `Searching(n)` at the bottom of this arm stores
                    // the request count rather than the iteration index — the
                    // recursion-limit check above then operates on that value.
                    // Verify this is intended.
                    nearest.sort_by_key(|n| Id::xor(&op.target, n.id()));
                    let n = usize::min(self.config.concurrency, nearest.len());

                    // Exit search when we have no more requests to make
                    // NOTE(review): there is no `continue` here, so the Request
                    // state set below is immediately overwritten by
                    // `Searching(n)`; the operation only recovers on the next
                    // pass via the (0, 0) case of the Searching arm — confirm.
                    if n == 0 {
                        op.state = OperationState::Request;
                    }

                    // Launch next set of requests
                    debug!(
                        "Operation {} issuing search request: {:?} to: {:?}",
                        req_id,
                        req,
                        &nearest[0..n]
                    );
                    for n in &nearest[0..n] {
                        op.nodes
                            .entry(n.id().clone())
                            .and_modify(|(_n, s)| *s = RequestState::Active);

                        // TODO: handle sink errors?
                        let _ = req_sink.try_send((req_id.clone(), n.clone(), req.clone()));
                    }

                    // Update search state
                    debug!("Operation {} entering searching state", req_id);
                    op.state = OperationState::Searching(n);
                }
                // Issue find / store operation following search
                OperationState::Request => {
                    // TODO: should a search be a find all then query, or a find values with short-circuits?
                    let req = match &op.kind {
                        OperationKind::Store(v, _) => Request::Store(op.target.clone(), v.clone()),
                        OperationKind::FindValues(_) => Request::FindValue(op.target.clone()),
                        _ => {
                            // Connect / FindNode have no follow-up request phase
                            debug!("Operation {} entering done state", req_id);
                            op.state = OperationState::Done;
                            continue;
                        }
                    };

                    // Locate nearest responding nodes
                    // TODO: this doesn't _need_ to be per search, could be from the global table?
                    let own_id = self.id.clone();
                    let mut nearest: Vec<_> = op
                        .nodes
                        .iter()
                        .filter(|(k, (_n, s))| (*s == RequestState::Complete) & (*k != &own_id))
                        .map(|(_k, (n, _s))| n.clone())
                        .collect();

                    // Sort and limit
                    nearest.sort_by_key(|n| Id::xor(&op.target, n.id()));
                    let range = 0..usize::min(self.config.concurrency, nearest.len());

                    debug!(
                        "Operation {} ({}) issuing {} request to {} peers",
                        req_id,
                        &op.kind,
                        req,
                        nearest[range.clone()].len()
                    );

                    for n in &nearest[range] {
                        op.nodes
                            .entry(n.id().clone())
                            .and_modify(|(_n, s)| *s = RequestState::Active);

                        // TODO: handle sink errors?
                        let _ = req_sink.try_send((req_id.clone(), n.clone(), req.clone()));
                    }

                    // Await responses to the store / find-value requests
                    op.state = OperationState::Pending;
                }
                // Currently awaiting request responses
                OperationState::Pending => {
                    // Fetch known nodes
                    let nodes = op.nodes.clone();
                    let mut known: Vec<_> = nodes.iter().collect();
                    known.sort_by_key(|(k, _)| Id::xor(&op.target, k));

                    // Exit when no pending nodes remain
                    let active: Vec<_> = (&known[0..usize::min(known.len(), self.config.k)])
                        .iter()
                        .filter(|(_key, (_e, s))| *s == RequestState::Active)
                        .collect();

                    // Check for expiry
                    let search_timeout = self.config.search_timeout;
                    let expired = Instant::now()
                        .checked_duration_since(op.last_update)
                        .map(|d| d > search_timeout)
                        .unwrap_or(false);

                    if active.len() == 0 || expired {
                        debug!("Operation {} ({}) entering done state", req_id, &op.kind);
                        op.state = OperationState::Done;
                    }
                }
                // Update completion state
                OperationState::Done => {
                    debug!("Operating {} ({}) done", req_id, &op.kind);

                    // Report results back on the operation's response channel.
                    // NOTE(review): the `try_send(..).unwrap()` calls below panic
                    // if the receiver was dropped or the channel is full —
                    // confirm this is acceptable for all callers.
                    match &op.kind {
                        OperationKind::Connect(tx) => {
                            // Return peers that responded, nearest-first
                            let mut peers: Vec<_> = op
                                .nodes
                                .iter()
                                .filter(|(_k, (_n, s))| *s == RequestState::Complete)
                                .map(|(_k, (e, _s))| e.clone())
                                .collect();

                            let own_id = self.id.clone();
                            peers.sort_by_key(|n| Id::xor(&own_id, n.id()));
                            if peers.len() > 0 {
                                tx.clone().try_send(Ok(peers)).unwrap();
                            } else {
                                tx.clone().try_send(Err(Error::NotFound)).unwrap();
                            }
                        }
                        OperationKind::FindNode(tx) => {
                            trace!("Found nodes: {:?}", op.nodes);
                            // Succeed only if the exact target id was located
                            match op.nodes.get(&op.target) {
                                Some((n, _s)) => tx.clone().try_send(Ok(n.clone())).unwrap(),
                                None => tx.clone().try_send(Err(Error::NotFound)).unwrap(),
                            };
                        }
                        OperationKind::FindValues(tx) => {
                            if op.data.len() > 0 {
                                // Flatten out response data
                                let mut flat_data: Vec<Data> =
                                    op.data.iter().flat_map(|(_k, v)| v.clone()).collect();

                                // Append any already known data
                                if let Some(mut existing) = self.datastore.find(&op.target) {
                                    flat_data.append(&mut existing);
                                }

                                // TODO: apply reducer?

                                debug!("Operation {} values found: {:?}", req_id, flat_data);

                                tx.clone().try_send(Ok(flat_data)).unwrap();
                            } else {
                                tx.clone().try_send(Err(Error::NotFound)).unwrap();
                            }
                        }
                        OperationKind::Store(_values, tx) => {
                            // `Store` responds with a `FoundData` object containing the stored data
                            // this allows us to check `op.data` for the nodes at which data has been stored

                            if op.data.len() > 0 {
                                // Ids of peers that acknowledged the store
                                let flat_ids: Vec<_> =
                                    op.data.iter().map(|(k, _v)| k.clone()).collect();
                                let mut flat_nodes: Vec<_> = flat_ids
                                    .iter()
                                    .filter_map(|id| op.nodes.get(id).map(|(e, _s)| e.clone()))
                                    .collect();
                                flat_nodes.sort_by_key(|n| Id::xor(&op.target, n.id()));

                                // TODO: check the stored _values_ too (as this may be reduced on the upstream side)

                                debug!("Operation {} stored at {} peers", req_id, flat_ids.len());

                                tx.clone().try_send(Ok(flat_nodes)).unwrap();
                            } else {
                                debug!("Operation {} store failed", req_id);
                                tx.clone().try_send(Err(Error::NotFound)).unwrap();
                            }
                        }
                    }

                    // TODO: remove from tracking
                    done.push(req_id.clone());
                }
            };
        }

        // Drop completed operations from tracking
        for req_id in done {
            self.operations.remove(&req_id);
        }

        Ok(())
    }
633
    /// Refresh buckets and node table entries
    ///
    /// NOTE(review): gated behind `#[cfg(nope)]` so this is never compiled.
    /// It references `conn.request(..)` and `self.table.clone()` which may
    /// have drifted from the current trait APIs — verify before re-enabling.
    #[cfg(nope)]
    pub async fn refresh(&mut self) -> Result<(), ()> {
        // TODO: send refresh to buckets that haven't been looked up recently
        // How to track recently looked up / contacted buckets..?

        // Evict "expired" nodes from buckets
        // Message oldest node, if no response evict
        // Maybe this could be implemented as a periodic ping and timeout instead?
        let timeout = self.config.node_timeout;
        // Collect nodes whose last-seen timestamp is older than the timeout
        // (nodes with no timestamp at all are also candidates)
        let oldest: Vec<_> = self
            .table
            .iter_oldest()
            .filter(move |o| {
                if let Some(seen) = o.seen() {
                    seen.add(timeout) < Instant::now()
                } else {
                    true
                }
            })
            .collect();

        let mut pings = Vec::with_capacity(oldest.len());

        for o in oldest {
            let mut t = self.table.clone();
            let mut o = o.clone();

            let mut conn = self.conn_mgr.clone();

            // Ping the stale node; refresh its entry on reply, evict on error
            let p = async move {
                let res = conn
                    .request(ReqId::generate(), o.clone(), Request::Ping)
                    .await;
                match res {
                    Ok(_resp) => {
                        debug!("[DHT refresh] updating node: {:?}", o);
                        o.set_seen(Instant::now());
                        t.create_or_update(&o);
                    }
                    Err(_e) => {
                        debug!("[DHT refresh] expiring node: {:?}", o);
                        t.remove_entry(o.id());
                    }
                }

                ()
            };

            pings.push(p);
        }

        // Run all pings concurrently
        future::join_all(pings).await;

        Ok(())
    }
690
    /// Borrow the underlying node table
    pub fn nodetable(&self) -> &Table {
        &self.table
    }

    /// Mutably borrow the underlying node table
    pub fn nodetable_mut(&mut self) -> &mut Table {
        &mut self.table
    }

    /// Borrow the underlying data store
    pub fn datastore(&self) -> &Store {
        &self.datastore
    }

    /// Mutably borrow the underlying data store
    pub fn datastore_mut(&mut self) -> &mut Store {
        &mut self.datastore
    }

    /// Fetch the node table entry for `id` if known (test helper)
    #[cfg(test)]
    pub fn contains(&mut self, id: &Id) -> Option<Entry<Id, Info>> {
        self.table.contains(id)
    }
711}
712
// Dht is polled via `Pin<&mut Self>` in its Future impl below; this blanket
// Unpin impl allows that without pin projection.
// NOTE(review): implemented unconditionally (no bounds on the type params) —
// fine while every field type is Unpin, but worth confirming.
impl<Id, Info, Data, ReqId, Table, Store> Unpin for Dht<Id, Info, Data, ReqId, Table, Store> {}
714
/// Future impl allowing the DHT to be driven by an executor: each poll runs
/// one pass of `Dht::update` over all tracked operations.
impl<Id, Info, Data, ReqId, Table, Store> Future for Dht<Id, Info, Data, ReqId, Table, Store>
where
    Id: DatabaseId + Clone + Sized + Send + 'static,
    Info: PartialEq + Clone + Sized + Debug + Send + 'static,
    Data: PartialEq + Clone + Sized + Debug + Send + 'static,
    ReqId: RequestId + Clone + Sized + Display + Debug + Send + 'static,
    Table: NodeTable<Id, Info> + Clone + Send + 'static,
    Store: Datastore<Id, Data> + Clone + Send + 'static,
{
    type Output = Result<(), Error>;

    // Poll calls internal update function
    // NOTE(review): always returns Pending and never registers the waker from
    // `_ctx`, so nothing will re-poll this future on its own — it relies on
    // the caller / executor polling it again for external reasons. Confirm.
    fn poll(mut self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // Errors from update are deliberately ignored here
        let _ = self.update();

        Poll::Pending
    }
}
733
/// Helper macro to setup DHT instances for testing
///
/// Three-argument form builds a default `Config` with reduced concurrency;
/// four-argument form accepts a caller-provided config binding.
/// (Fix: removed the duplicated `#[cfg(test)]` attribute.)
#[cfg(test)]
#[macro_export]
macro_rules! mock_dht {
    ($connector: ident, $root: ident, $dht:ident) => {
        // Reduced concurrency keeps request ordering deterministic in tests
        let mut config = Config::default();
        config.concurrency = 2;
        mock_dht!($connector, $root, $dht, config);
    };
    ($connector: ident, $root: ident, $dht:ident, $config:ident) => {
        let mut $dht =
            Dht::<[u8; 1], _, u64, u64>::standard($root.id().clone(), $config, $connector.clone());
    };
}
749
750#[cfg(test)]
751mod tests {
752    use std::clone::Clone;
753
754    extern crate futures;
755    use futures::channel::mpsc;
756
757    use super::*;
758    use crate::store::Datastore;
759
    /// Any handled request should insert the sender into the node table and
    /// refresh its last-seen timestamp on subsequent requests.
    #[test]
    fn test_receive_common() {
        // Local node and a peer
        let root = Entry::new([0], 001);
        let friend = Entry::new([1], 002);

        let (tx, _rx) = mpsc::channel(10);
        mock_dht!(tx, root, dht);

        // Check node is unknown
        assert!(dht.table.contains(friend.id()).is_none());

        // Ping
        assert_eq!(
            dht.handle_req(1, &friend, &Request::Ping).unwrap(),
            Response::NoResult,
        );

        // Adds node to appropriate k bucket
        let friend1 = dht.table.contains(friend.id()).unwrap();

        // Second ping
        assert_eq!(
            dht.handle_req(2, &friend, &Request::Ping).unwrap(),
            Response::NoResult,
        );

        // Updates node in appropriate k bucket
        // (the second request must refresh the peer's last-seen timestamp)
        let friend2 = dht.table.contains(friend.id()).unwrap();
        assert_ne!(friend1.seen(), friend2.seen());
    }
790
791    #[test]
792    fn test_receive_ping() {
793        let root = Entry::new([0], 001);
794        let friend = Entry::new([1], 002);
795
796        let (tx, _rx) = mpsc::channel(10);
797        mock_dht!(tx, root, dht);
798
799        // Ping
800        assert_eq!(
801            dht.handle_req(1, &friend, &Request::Ping).unwrap(),
802            Response::NoResult,
803        );
804    }
805
806    #[test]
807    fn test_receive_find_nodes() {
808        let root = Entry::new([0], 001);
809        let friend = Entry::new([1], 002);
810        let other = Entry::new([2], 003);
811
812        let (tx, _rx) = mpsc::channel(10);
813        mock_dht!(tx, root, dht);
814
815        // Add friend to known table
816        dht.table.create_or_update(&friend);
817
818        // FindNodes
819        assert_eq!(
820            dht.handle_req(1, &friend, &Request::FindNode(other.id().clone()))
821                .unwrap(),
822            Response::NodesFound(other.id().clone(), vec![friend.clone()]),
823        );
824    }
825
826    #[test]
827    fn test_receive_find_values() {
828        let root = Entry::new([0], 001);
829        let friend = Entry::new([1], 002);
830        let other = Entry::new([2], 003);
831
832        let (tx, _rx) = mpsc::channel(10);
833        mock_dht!(tx, root, dht);
834
835        // Add friend to known table
836        dht.table.create_or_update(&friend);
837
838        // FindValues (unknown, returns NodesFound)
839        assert_eq!(
840            dht.handle_req(1, &other, &Request::FindValue([201]))
841                .unwrap(),
842            Response::NodesFound([201], vec![friend.clone()]),
843        );
844
845        // Add value to store
846        dht.datastore.store(&[201], &vec![1337]);
847
848        // FindValues
849        assert_eq!(
850            dht.handle_req(2, &other, &Request::FindValue([201]))
851                .unwrap(),
852            Response::ValuesFound([201], vec![1337]),
853        );
854    }
855
856    #[test]
857    fn test_receive_store() {
858        let root = Entry::new([0], 001);
859        let friend = Entry::new([1], 002);
860
861        let (tx, _rx) = mpsc::channel(10);
862        mock_dht!(tx, root, dht);
863
864        // Store
865        assert_eq!(
866            dht.handle_req(1, &friend, &Request::Store([2], vec![1234]))
867                .unwrap(),
868            Response::ValuesFound([2], vec![1234]),
869        );
870
871        let v = dht.datastore.find(&[2]).expect("missing value");
872        assert_eq!(v, vec![1234]);
873    }
874
875    #[cfg(nope)]
876    #[test]
877    fn test_expire() {
878        let mut config = Config::default();
879        config.node_timeout = Duration::from_millis(200);
880
881        let root = Entry::new([0], 001);
882        let n1 = Entry::new([1], 002);
883        let n2 = Entry::new([2], 003);
884
885        let (tx, _rx) = mpsc::channel(10);
886        let c = config.clone();
887        mock_dht!(tx, root, dht, config);
888
889        // Add known nodes
890        dht.table.create_or_update(&n1);
891        dht.table.create_or_update(&n2);
892
893        // No timed out nodes
894        block_on(dht.refresh(())).unwrap();
895        connector.finalise();
896
897        std::thread::sleep(config.node_timeout * 2);
898
899        // Ok response
900        connector.expect(vec![
901            Mt::request(n1.clone(), Request::Ping, Ok((Response::NoResult, ()))),
902            Mt::request(n2.clone(), Request::Ping, Ok((Response::NoResult, ()))),
903        ]);
904        block_on(dht.refresh(())).unwrap();
905
906        assert!(dht.table.contains(n1.id()).is_some());
907        assert!(dht.table.contains(n2.id()).is_some());
908
909        connector.finalise();
910
911        std::thread::sleep(config.node_timeout * 2);
912
913        // No response (evict)
914        connector.expect(vec![
915            Mt::request(n1.clone(), Request::Ping, Ok((Response::NoResult, ()))),
916            Mt::request(n2.clone(), Request::Ping, Err(Error::Timeout)),
917        ]);
918        block_on(dht.refresh(())).unwrap();
919
920        assert!(dht.table.contains(n1.id()).is_some());
921        assert!(dht.table.contains(n2.id()).is_none());
922
923        // Check expectations are done
924        connector.finalise();
925    }
926}