1use bytes::BytesMut;
30use codec::UviBytes;
31use crate::dht_proto as proto;
32use crate::record::{self, Record};
33use futures::prelude::*;
34use asynchronous_codec::Framed;
35use tet_libp2p_core::{Multiaddr, PeerId};
36use tet_libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
37use prost::Message;
38use std::{borrow::Cow, convert::TryFrom, time::Duration};
39use std::{io, iter};
40use unsigned_varint::codec;
41use wasm_timer::Instant;
42
/// Protocol name used by `KademliaProtocolConfig::default()`; it is the value
/// the configuration reports from `UpgradeInfo::protocol_info`.
pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";

/// Default maximum frame length handed to `UviBytes::set_max_len` by
/// `KademliaProtocolConfig::default()`.
///
/// NOTE(review): presumably measured in bytes (16 KiB) — confirm against the codec.
pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
48
/// Connection status of a peer, as carried in the `connection` field of a
/// protobuf `Peer`.
///
/// Every variant has an identically named counterpart in
/// `proto::message::ConnectionType`, and this module converts in both directions.
///
/// NOTE(review): the exact meaning of each state is defined by the wire protocol
/// and is not visible in this file; the per-variant descriptions follow the
/// variant names and should be confirmed against the protobuf schema.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum KadConnectionType {
    /// No connection to the peer.
    NotConnected = 0,
    /// Currently connected to the peer.
    Connected = 1,
    /// A connection to the peer is believed to be possible.
    CanConnect = 2,
    /// A connection to the peer is believed to be impossible.
    CannotConnect = 3,
}
61
62impl From<proto::message::ConnectionType> for KadConnectionType {
63 fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
64 use proto::message::ConnectionType::*;
65 match raw {
66 NotConnected => KadConnectionType::NotConnected,
67 Connected => KadConnectionType::Connected,
68 CanConnect => KadConnectionType::CanConnect,
69 CannotConnect => KadConnectionType::CannotConnect,
70 }
71 }
72}
73
74impl Into<proto::message::ConnectionType> for KadConnectionType {
75 fn into(self) -> proto::message::ConnectionType {
76 use proto::message::ConnectionType::*;
77 match self {
78 KadConnectionType::NotConnected => NotConnected,
79 KadConnectionType::Connected => Connected,
80 KadConnectionType::CanConnect => CanConnect,
81 KadConnectionType::CannotConnect => CannotConnect,
82 }
83 }
84}
85
/// Information about a peer, as exchanged in Kademlia messages.
///
/// Converts from a protobuf `Peer` via `TryFrom` (fallible) and back into one
/// (infallible).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KadPeer {
    /// Identifier of the peer, parsed from the `id` bytes of the protobuf peer.
    pub node_id: PeerId,
    /// Addresses of the peer, parsed from the `addrs` byte strings of the protobuf peer.
    pub multiaddrs: Vec<Multiaddr>,
    /// Connection status carried in the `connection` field of the protobuf peer.
    pub connection_ty: KadConnectionType,
}
96
97impl TryFrom<proto::message::Peer> for KadPeer {
99 type Error = io::Error;
100
101 fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
102 let node_id = PeerId::from_bytes(&peer.id)
105 .map_err(|_| invalid_data("invalid peer id"))?;
106
107 let mut addrs = Vec::with_capacity(peer.addrs.len());
108 for addr in peer.addrs.into_iter() {
109 let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
110 addrs.push(as_ma);
111 }
112 debug_assert_eq!(addrs.len(), addrs.capacity());
113
114 let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
115 .ok_or_else(|| invalid_data("unknown connection type"))?
116 .into();
117
118 Ok(KadPeer {
119 node_id,
120 multiaddrs: addrs,
121 connection_ty
122 })
123 }
124}
125
126impl Into<proto::message::Peer> for KadPeer {
127 fn into(self) -> proto::message::Peer {
128 proto::message::Peer {
129 id: self.node_id.to_bytes(),
130 addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
131 connection: {
132 let ct: proto::message::ConnectionType = self.connection_ty.into();
133 ct as i32
134 }
135 }
136 }
137}
138
/// Configuration for the Kademlia substream upgrade.
///
/// Holds the protocol name reported during negotiation and the frame size
/// limit applied to the length-prefixed codec.
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
    /// Name reported by `UpgradeInfo::protocol_info`.
    protocol_name: Cow<'static, [u8]>,
    /// Limit passed to the codec's `set_max_len` when a substream is upgraded.
    max_packet_size: usize,
}

impl KademliaProtocolConfig {
    /// Returns the currently configured protocol name.
    pub fn protocol_name(&self) -> &[u8] {
        &*self.protocol_name
    }

    /// Replaces the protocol name with `name`.
    ///
    /// Accepts anything convertible into a `Cow<'static, [u8]>`, such as a
    /// `&'static [u8]` or an owned `Vec<u8>`.
    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
        let replacement: Cow<'static, [u8]> = name.into();
        self.protocol_name = replacement;
    }

    /// Replaces the maximum packet size with `size`.
    pub fn set_max_packet_size(&mut self, size: usize) {
        self.max_packet_size = size;
    }
}
168
169impl Default for KademliaProtocolConfig {
170 fn default() -> Self {
171 KademliaProtocolConfig {
172 protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
173 max_packet_size: DEFAULT_MAX_PACKET_SIZE,
174 }
175 }
176}
177
178impl UpgradeInfo for KademliaProtocolConfig {
179 type Info = Cow<'static, [u8]>;
180 type InfoIter = iter::Once<Self::Info>;
181
182 fn protocol_info(&self) -> Self::InfoIter {
183 iter::once(self.protocol_name.clone())
184 }
185}
186
187impl<C> InboundUpgrade<C> for KademliaProtocolConfig
188where
189 C: AsyncRead + AsyncWrite + Unpin,
190{
191 type Output = KadInStreamSink<C>;
192 type Future = future::Ready<Result<Self::Output, io::Error>>;
193 type Error = io::Error;
194
195 fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
196 let mut codec = UviBytes::default();
197 codec.set_max_len(self.max_packet_size);
198
199 future::ok(
200 Framed::new(incoming, codec)
201 .err_into()
202 .with::<_, _, fn(_) -> _, _>(|response| {
203 let proto_struct = resp_msg_to_proto(response);
204 let mut buf = Vec::with_capacity(proto_struct.encoded_len());
205 proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
206 future::ready(Ok(io::Cursor::new(buf)))
207 })
208 .and_then::<_, fn(_) -> _>(|bytes| {
209 let request = match proto::Message::decode(bytes) {
210 Ok(r) => r,
211 Err(err) => return future::ready(Err(err.into()))
212 };
213 future::ready(proto_to_req_msg(request))
214 }),
215 )
216 }
217}
218
219impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
220where
221 C: AsyncRead + AsyncWrite + Unpin,
222{
223 type Output = KadOutStreamSink<C>;
224 type Future = future::Ready<Result<Self::Output, io::Error>>;
225 type Error = io::Error;
226
227 fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
228 let mut codec = UviBytes::default();
229 codec.set_max_len(self.max_packet_size);
230
231 future::ok(
232 Framed::new(incoming, codec)
233 .err_into()
234 .with::<_, _, fn(_) -> _, _>(|request| {
235 let proto_struct = req_msg_to_proto(request);
236 let mut buf = Vec::with_capacity(proto_struct.encoded_len());
237 proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
238 future::ready(Ok(io::Cursor::new(buf)))
239 })
240 .and_then::<_, fn(_) -> _>(|bytes| {
241 let response = match proto::Message::decode(bytes) {
242 Ok(r) => r,
243 Err(err) => return future::ready(Err(err.into()))
244 };
245 future::ready(proto_to_resp_msg(response))
246 }),
247 )
248 }
249}
250
/// Sink of responses and stream of requests; the output of the inbound upgrade.
pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;

/// Sink of requests and stream of responses; the output of the outbound upgrade.
pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;

/// Combined sink of `A` and stream of `B` over a substream `S`.
///
/// Layers, innermost first:
/// - `Framed<S, UviBytes<..>>`: length-prefixed frames over `S`;
/// - `stream::ErrInto<.., io::Error>`: converts the codec's errors into `io::Error`;
/// - `sink::With`: turns each `A` into an `io::Cursor<Vec<u8>>` before sending;
/// - `stream::AndThen`: turns each received `BytesMut` into a `B`.
///
/// The mapping steps are plain `fn` pointers so that the whole type can be named.
pub type KadStreamSink<S, A, B> = stream::AndThen<
    sink::With<
        stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
        io::Cursor<Vec<u8>>,
        A,
        future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
        fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
    >,
    future::Ready<Result<B, io::Error>>,
    fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
>;
268
/// A Kademlia request.
///
/// Encoded to the wire format by `req_msg_to_proto` and decoded from it by
/// `proto_to_req_msg`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadRequestMsg {
    /// Ping request; carries no payload.
    Ping,

    /// `FindNode` request.
    FindNode {
        /// Raw key bytes, sent unchanged in the message's `key` field.
        /// NOTE(review): presumably the lookup target — confirm against the caller.
        key: Vec<u8>,
    },

    /// `GetProviders` request.
    GetProviders {
        /// Record key sent in the message's `key` field.
        /// NOTE(review): presumably the key whose providers are wanted — confirm.
        key: record::Key,
    },

    /// `AddProvider` request.
    AddProvider {
        /// Record key sent in the message's `key` field.
        key: record::Key,
        /// The single peer sent in the message's `provider_peers` list. When
        /// decoding, the first entry that parses successfully is used.
        provider: KadPeer,
    },

    /// `GetValue` request.
    GetValue {
        /// Record key sent in the message's `key` field.
        key: record::Key,
    },

    /// `PutValue` request.
    PutValue {
        /// Record sent in the message's `record` field.
        record: Record,
    }
}
308
/// A Kademlia response.
///
/// Encoded to the wire format by `resp_msg_to_proto` and decoded from it by
/// `proto_to_resp_msg`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadResponseMsg {
    /// Ping response; encoded as a message of type `Ping` with no payload.
    Pong,

    /// `FindNode` response.
    FindNode {
        /// Peers carried in the message's `closer_peers` list. Entries that fail
        /// to parse are dropped when decoding.
        closer_peers: Vec<KadPeer>,
    },

    /// `GetProviders` response.
    GetProviders {
        /// Peers carried in the message's `closer_peers` list. Entries that fail
        /// to parse are dropped when decoding.
        closer_peers: Vec<KadPeer>,
        /// Peers carried in the message's `provider_peers` list. Entries that
        /// fail to parse are dropped when decoding.
        provider_peers: Vec<KadPeer>,
    },

    /// `GetValue` response.
    GetValue {
        /// The record carried in the message, if any.
        record: Option<Record>,
        /// Peers carried in the message's `closer_peers` list. Entries that fail
        /// to parse are dropped when decoding.
        closer_peers: Vec<KadPeer>,
    },

    /// `PutValue` response.
    PutValue {
        /// Key carried in the message's `key` field (and, when encoding, also in
        /// the embedded record).
        key: record::Key,
        /// Value carried in the embedded record.
        value: Vec<u8>,
    },
}
345
346fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
348 match kad_msg {
349 KadRequestMsg::Ping => proto::Message {
350 r#type: proto::message::MessageType::Ping as i32,
351 .. proto::Message::default()
352 },
353 KadRequestMsg::FindNode { key } => proto::Message {
354 r#type: proto::message::MessageType::FindNode as i32,
355 key,
356 cluster_level_raw: 10,
357 .. proto::Message::default()
358 },
359 KadRequestMsg::GetProviders { key } => proto::Message {
360 r#type: proto::message::MessageType::GetProviders as i32,
361 key: key.to_vec(),
362 cluster_level_raw: 10,
363 .. proto::Message::default()
364 },
365 KadRequestMsg::AddProvider { key, provider } => proto::Message {
366 r#type: proto::message::MessageType::AddProvider as i32,
367 cluster_level_raw: 10,
368 key: key.to_vec(),
369 provider_peers: vec![provider.into()],
370 .. proto::Message::default()
371 },
372 KadRequestMsg::GetValue { key } => proto::Message {
373 r#type: proto::message::MessageType::GetValue as i32,
374 cluster_level_raw: 10,
375 key: key.to_vec(),
376 .. proto::Message::default()
377 },
378 KadRequestMsg::PutValue { record } => proto::Message {
379 r#type: proto::message::MessageType::PutValue as i32,
380 record: Some(record_to_proto(record)),
381 .. proto::Message::default()
382 }
383 }
384}
385
386fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
388 match kad_msg {
389 KadResponseMsg::Pong => proto::Message {
390 r#type: proto::message::MessageType::Ping as i32,
391 .. proto::Message::default()
392 },
393 KadResponseMsg::FindNode { closer_peers } => proto::Message {
394 r#type: proto::message::MessageType::FindNode as i32,
395 cluster_level_raw: 9,
396 closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
397 .. proto::Message::default()
398 },
399 KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
400 r#type: proto::message::MessageType::GetProviders as i32,
401 cluster_level_raw: 9,
402 closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
403 provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
404 .. proto::Message::default()
405 },
406 KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
407 r#type: proto::message::MessageType::GetValue as i32,
408 cluster_level_raw: 9,
409 closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
410 record: record.map(record_to_proto),
411 .. proto::Message::default()
412 },
413 KadResponseMsg::PutValue { key, value } => proto::Message {
414 r#type: proto::message::MessageType::PutValue as i32,
415 key: key.to_vec(),
416 record: Some(proto::Record {
417 key: key.to_vec(),
418 value,
419 .. proto::Record::default()
420 }),
421 .. proto::Message::default()
422 }
423 }
424}
425
426fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
430 let msg_type = proto::message::MessageType::from_i32(message.r#type)
431 .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
432
433 match msg_type {
434 proto::message::MessageType::Ping => Ok(KadRequestMsg::Ping),
435 proto::message::MessageType::PutValue => {
436 let record = record_from_proto(message.record.unwrap_or_default())?;
437 Ok(KadRequestMsg::PutValue { record })
438 }
439 proto::message::MessageType::GetValue => {
440 Ok(KadRequestMsg::GetValue { key: record::Key::from(message.key) })
441 }
442 proto::message::MessageType::FindNode => {
443 Ok(KadRequestMsg::FindNode { key: message.key })
444 }
445 proto::message::MessageType::GetProviders => {
446 Ok(KadRequestMsg::GetProviders { key: record::Key::from(message.key)})
447 }
448 proto::message::MessageType::AddProvider => {
449 let provider = message.provider_peers
453 .into_iter()
454 .find_map(|peer| KadPeer::try_from(peer).ok());
455
456 if let Some(provider) = provider {
457 let key = record::Key::from(message.key);
458 Ok(KadRequestMsg::AddProvider { key, provider })
459 } else {
460 Err(invalid_data("AddProvider message with no valid peer."))
461 }
462 }
463 }
464}
465
466fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
470 let msg_type = proto::message::MessageType::from_i32(message.r#type)
471 .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
472
473 match msg_type {
474 proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
475 proto::message::MessageType::GetValue => {
476 let record =
477 if let Some(r) = message.record {
478 Some(record_from_proto(r)?)
479 } else {
480 None
481 };
482
483 let closer_peers = message.closer_peers.into_iter()
484 .filter_map(|peer| KadPeer::try_from(peer).ok())
485 .collect();
486
487 Ok(KadResponseMsg::GetValue { record, closer_peers })
488 }
489
490 proto::message::MessageType::FindNode => {
491 let closer_peers = message.closer_peers.into_iter()
492 .filter_map(|peer| KadPeer::try_from(peer).ok())
493 .collect();
494
495 Ok(KadResponseMsg::FindNode { closer_peers })
496 }
497
498 proto::message::MessageType::GetProviders => {
499 let closer_peers = message.closer_peers.into_iter()
500 .filter_map(|peer| KadPeer::try_from(peer).ok())
501 .collect();
502
503 let provider_peers = message.provider_peers.into_iter()
504 .filter_map(|peer| KadPeer::try_from(peer).ok())
505 .collect();
506
507 Ok(KadResponseMsg::GetProviders {
508 closer_peers,
509 provider_peers,
510 })
511 }
512
513 proto::message::MessageType::PutValue => {
514 let key = record::Key::from(message.key);
515 let rec = message.record.ok_or_else(|| {
516 invalid_data("received PutValue message with no record")
517 })?;
518
519 Ok(KadResponseMsg::PutValue {
520 key,
521 value: rec.value
522 })
523 }
524
525 proto::message::MessageType::AddProvider =>
526 Err(invalid_data("received an unexpected AddProvider message"))
527 }
528}
529
530fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
531 let key = record::Key::from(record.key);
532 let value = record.value;
533
534 let publisher =
535 if !record.publisher.is_empty() {
536 PeerId::from_bytes(&record.publisher)
537 .map(Some)
538 .map_err(|_| invalid_data("Invalid publisher peer ID."))?
539 } else {
540 None
541 };
542
543 let expires =
544 if record.ttl > 0 {
545 Some(Instant::now() + Duration::from_secs(record.ttl as u64))
546 } else {
547 None
548 };
549
550 Ok(Record { key, value, publisher, expires })
551}
552
553fn record_to_proto(record: Record) -> proto::Record {
554 proto::Record {
555 key: record.key.to_vec(),
556 value: record.value,
557 publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
558 ttl: record.expires
559 .map(|t| {
560 let now = Instant::now();
561 if t > now {
562 (t - now).as_secs() as u32
563 } else {
564 1 }
566 })
567 .unwrap_or(0),
568 time_received: String::new()
569 }
570}
571
/// Wraps `e` in an `io::Error` of kind `InvalidData`.
///
/// Accepts anything convertible into a boxed error, for example a `&str`, a
/// `String` or another error value.
fn invalid_data<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>
{
    let source: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, source)
}
579
#[cfg(test)]
mod tests {
    // NOTE(review): this module contains no tests as shown here. Round-trip
    // checks for the request/response and record conversions would be a
    // natural fit — confirm whether tests were removed or live elsewhere.
}