1use bytes::BytesMut;
30use codec::UviBytes;
31use crate::dht_proto as proto;
32use crate::record::{self, Record};
33use futures::prelude::*;
34use asynchronous_codec::Framed;
35use libp2p_core::{Multiaddr, PeerId};
36use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
37use prost::Message;
38use std::{borrow::Cow, convert::TryFrom, time::Duration};
39use std::{io, iter};
40use unsigned_varint::codec;
41use wasm_timer::Instant;
42
43use derivative::Derivative;
44use libp2p_core::identity::PublicKey;
45use trust_graph::{Certificate, Trust};
46
47pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";
49
50pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
52
53#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
55pub enum KadConnectionType {
56 NotConnected = 0,
58 Connected = 1,
60 CanConnect = 2,
62 CannotConnect = 3,
64}
65
66impl From<proto::message::ConnectionType> for KadConnectionType {
67 fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
68 use proto::message::ConnectionType::*;
69 match raw {
70 NotConnected => KadConnectionType::NotConnected,
71 Connected => KadConnectionType::Connected,
72 CanConnect => KadConnectionType::CanConnect,
73 CannotConnect => KadConnectionType::CannotConnect,
74 }
75 }
76}
77
78impl Into<proto::message::ConnectionType> for KadConnectionType {
79 fn into(self) -> proto::message::ConnectionType {
80 use proto::message::ConnectionType::*;
81 match self {
82 KadConnectionType::NotConnected => NotConnected,
83 KadConnectionType::Connected => Connected,
84 KadConnectionType::CanConnect => CanConnect,
85 KadConnectionType::CannotConnect => CannotConnect,
86 }
87 }
88}
89
90#[derive(Derivative)]
92#[derivative(Debug, Clone, PartialEq, Eq)]
93pub struct KadPeer {
94 #[derivative(Debug="ignore")]
95 pub public_key: PublicKey,
96 pub node_id: PeerId,
98 pub multiaddrs: Vec<Multiaddr>,
100 pub connection_ty: KadConnectionType,
102 pub certificates: Vec<Certificate>,
103}
104
105impl TryFrom<proto::message::Peer> for KadPeer {
107 type Error = io::Error;
108
109 fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
110 let node_id = PeerId::from_bytes(&peer.id)
113 .map_err(|_| invalid_data("invalid peer id"))?;
114
115 let mut addrs = Vec::with_capacity(peer.addrs.len());
116 for addr in peer.addrs.into_iter() {
117 let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
118 addrs.push(as_ma);
119 }
120 debug_assert_eq!(addrs.len(), addrs.capacity());
121
122 let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
123 .ok_or_else(|| invalid_data("unknown connection type"))?
124 .into();
125
126 let public_key = PublicKey::from_protobuf_encoding(peer.public_key.as_slice())
127 .map_err(|e|
128 invalid_data(format!("invalid public key: {}", e).as_str())
129 )?;
130
131
132 let mut certificates = Vec::with_capacity(peer.certificates.len());
133 for cert in peer.certificates.into_iter() {
134 let mut chain = Vec::with_capacity(cert.chain.len());
135 for trust in cert.chain.into_iter() {
136 let issued_for = fluence_identity::PublicKey::decode(&trust.issued_for)
137 .map_err(|e|
138 invalid_data(format!("invalid issued_for: {}", e).as_str())
139 )?;
140 let expires_at: Duration = Duration::from_secs(trust.expires_at_secs);
141 let issued_at: Duration = Duration::from_secs(trust.issued_at_secs);
142 let signature = fluence_identity::Signature::decode(trust.signature)
143 .map_err(|e|
144 invalid_data(format!("invalid signature: {}", e).as_str())
145 )?;
146
147 let trust = Trust::new(issued_for, expires_at, issued_at, signature);
148 chain.push(trust);
149 }
150 certificates.push(Certificate::new_unverified(chain));
151 }
152
153 Ok(KadPeer {
154 public_key,
155 node_id,
156 multiaddrs: addrs,
157 connection_ty,
158 certificates
159 })
160 }
161}
162
163impl Into<proto::message::Peer> for KadPeer {
164 fn into(self) -> proto::message::Peer {
165 let certificates = self.certificates.into_iter().map(|cert|
166 proto::Certificate {
167 chain: cert.chain.into_iter().map(|trust| {
168 proto::Trust {
169 issued_for: trust.issued_for.encode(),
170 expires_at_secs: trust.expires_at.as_secs(),
171 signature: trust.signature.encode(),
172 issued_at_secs: trust.issued_at.as_secs(),
173 }
174 }).collect(),
175 }
176 ).collect();
177
178 proto::message::Peer {
179 id: self.node_id.to_bytes(),
180 addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
181 connection: {
182 let ct: proto::message::ConnectionType = self.connection_ty.into();
183 ct as i32
184 },
185 public_key: self.public_key.into_protobuf_encoding(),
186 certificates
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
197pub struct KademliaProtocolConfig {
198 protocol_name: Cow<'static, [u8]>,
199 max_packet_size: usize,
201}
202
203impl KademliaProtocolConfig {
204 pub fn protocol_name(&self) -> &[u8] {
206 &self.protocol_name
207 }
208
209 pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
212 self.protocol_name = name.into();
213 }
214
215 pub fn set_max_packet_size(&mut self, size: usize) {
217 self.max_packet_size = size;
218 }
219}
220
221impl Default for KademliaProtocolConfig {
222 fn default() -> Self {
223 KademliaProtocolConfig {
224 protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
225 max_packet_size: DEFAULT_MAX_PACKET_SIZE,
226 }
227 }
228}
229
230impl UpgradeInfo for KademliaProtocolConfig {
231 type Info = Cow<'static, [u8]>;
232 type InfoIter = iter::Once<Self::Info>;
233
234 fn protocol_info(&self) -> Self::InfoIter {
235 iter::once(self.protocol_name.clone())
236 }
237}
238
239impl<C> InboundUpgrade<C> for KademliaProtocolConfig
240where
241 C: AsyncRead + AsyncWrite + Unpin,
242{
243 type Output = KadInStreamSink<C>;
244 type Future = future::Ready<Result<Self::Output, io::Error>>;
245 type Error = io::Error;
246
247 fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
248 let mut codec = UviBytes::default();
249 codec.set_max_len(self.max_packet_size);
250
251 future::ok(
252 Framed::new(incoming, codec)
253 .err_into()
254 .with::<_, _, fn(_) -> _, _>(|response| {
255 let proto_struct = resp_msg_to_proto(response);
256 let mut buf = Vec::with_capacity(proto_struct.encoded_len());
257 proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
258 future::ready(Ok(io::Cursor::new(buf)))
259 })
260 .and_then::<_, fn(_) -> _>(|bytes| {
261 let request = match proto::Message::decode(bytes) {
262 Ok(r) => r,
263 Err(err) => return future::ready(Err(err.into()))
264 };
265 future::ready(proto_to_req_msg(request))
266 }),
267 )
268 }
269}
270
271impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
272where
273 C: AsyncRead + AsyncWrite + Unpin,
274{
275 type Output = KadOutStreamSink<C>;
276 type Future = future::Ready<Result<Self::Output, io::Error>>;
277 type Error = io::Error;
278
279 fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
280 let mut codec = UviBytes::default();
281 codec.set_max_len(self.max_packet_size);
282
283 future::ok(
284 Framed::new(incoming, codec)
285 .err_into()
286 .with::<_, _, fn(_) -> _, _>(|request| {
287 let proto_struct = req_msg_to_proto(request);
288 let mut buf = Vec::with_capacity(proto_struct.encoded_len());
289 proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
290 future::ready(Ok(io::Cursor::new(buf)))
291 })
292 .and_then::<_, fn(_) -> _>(|bytes| {
293 let response = match proto::Message::decode(bytes) {
294 Ok(r) => r,
295 Err(err) => return future::ready(Err(err.into()))
296 };
297 future::ready(proto_to_resp_msg(response))
298 }),
299 )
300 }
301}
302
303pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
305
306pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
308
309pub type KadStreamSink<S, A, B> = stream::AndThen<
310 sink::With<
311 stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
312 io::Cursor<Vec<u8>>,
313 A,
314 future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
315 fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
316 >,
317 future::Ready<Result<B, io::Error>>,
318 fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
319>;
320
321#[derive(Debug, Clone, PartialEq, Eq)]
323pub enum KadRequestMsg {
324 Ping,
326
327 FindNode {
330 key: Vec<u8>,
332 },
333
334 GetProviders {
337 key: record::Key,
339 },
340
341 AddProvider {
343 key: record::Key,
345 provider: KadPeer,
347 },
348
349 GetValue {
351 key: record::Key,
353 },
354
355 PutValue {
357 record: Record,
358 }
359}
360
361#[derive(Debug, Clone, PartialEq, Eq)]
363pub enum KadResponseMsg {
364 Pong,
366
367 FindNode {
369 closer_peers: Vec<KadPeer>,
371 },
372
373 GetProviders {
375 closer_peers: Vec<KadPeer>,
377 provider_peers: Vec<KadPeer>,
379 },
380
381 GetValue {
383 record: Option<Record>,
385 closer_peers: Vec<KadPeer>,
387 },
388
389 PutValue {
391 key: record::Key,
393 value: Vec<u8>,
395 },
396}
397
398fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
400 match kad_msg {
401 KadRequestMsg::Ping => proto::Message {
402 r#type: proto::message::MessageType::Ping as i32,
403 .. proto::Message::default()
404 },
405 KadRequestMsg::FindNode { key } => proto::Message {
406 r#type: proto::message::MessageType::FindNode as i32,
407 key,
408 cluster_level_raw: 10,
409 .. proto::Message::default()
410 },
411 KadRequestMsg::GetProviders { key } => proto::Message {
412 r#type: proto::message::MessageType::GetProviders as i32,
413 key: key.to_vec(),
414 cluster_level_raw: 10,
415 .. proto::Message::default()
416 },
417 KadRequestMsg::AddProvider { key, provider } => proto::Message {
418 r#type: proto::message::MessageType::AddProvider as i32,
419 cluster_level_raw: 10,
420 key: key.to_vec(),
421 provider_peers: vec![provider.into()],
422 .. proto::Message::default()
423 },
424 KadRequestMsg::GetValue { key } => proto::Message {
425 r#type: proto::message::MessageType::GetValue as i32,
426 cluster_level_raw: 10,
427 key: key.to_vec(),
428 .. proto::Message::default()
429 },
430 KadRequestMsg::PutValue { record } => proto::Message {
431 r#type: proto::message::MessageType::PutValue as i32,
432 record: Some(record_to_proto(record)),
433 .. proto::Message::default()
434 }
435 }
436}
437
438fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
440 match kad_msg {
441 KadResponseMsg::Pong => proto::Message {
442 r#type: proto::message::MessageType::Ping as i32,
443 .. proto::Message::default()
444 },
445 KadResponseMsg::FindNode { closer_peers } => proto::Message {
446 r#type: proto::message::MessageType::FindNode as i32,
447 cluster_level_raw: 9,
448 closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
449 .. proto::Message::default()
450 },
451 KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
452 r#type: proto::message::MessageType::GetProviders as i32,
453 cluster_level_raw: 9,
454 closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
455 provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
456 .. proto::Message::default()
457 },
458 KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
459 r#type: proto::message::MessageType::GetValue as i32,
460 cluster_level_raw: 9,
461 closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
462 record: record.map(record_to_proto),
463 .. proto::Message::default()
464 },
465 KadResponseMsg::PutValue { key, value } => proto::Message {
466 r#type: proto::message::MessageType::PutValue as i32,
467 key: key.to_vec(),
468 record: Some(proto::Record {
469 key: key.to_vec(),
470 value,
471 .. proto::Record::default()
472 }),
473 .. proto::Message::default()
474 }
475 }
476}
477
478fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
482 let msg_type = proto::message::MessageType::from_i32(message.r#type)
483 .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
484
485 match msg_type {
486 proto::message::MessageType::Ping => Ok(KadRequestMsg::Ping),
487 proto::message::MessageType::PutValue => {
488 let record = record_from_proto(message.record.unwrap_or_default())?;
489 Ok(KadRequestMsg::PutValue { record })
490 }
491 proto::message::MessageType::GetValue => {
492 Ok(KadRequestMsg::GetValue { key: record::Key::from(message.key) })
493 }
494 proto::message::MessageType::FindNode => {
495 Ok(KadRequestMsg::FindNode { key: message.key })
496 }
497 proto::message::MessageType::GetProviders => {
498 Ok(KadRequestMsg::GetProviders { key: record::Key::from(message.key)})
499 }
500 proto::message::MessageType::AddProvider => {
501 let provider = message.provider_peers
505 .into_iter()
506 .find_map(|peer| KadPeer::try_from(peer).ok());
507
508 if let Some(provider) = provider {
509 let key = record::Key::from(message.key);
510 Ok(KadRequestMsg::AddProvider { key, provider })
511 } else {
512 Err(invalid_data("AddProvider message with no valid peer."))
513 }
514 }
515 }
516}
517
518fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
522 let msg_type = proto::message::MessageType::from_i32(message.r#type)
523 .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
524
525 match msg_type {
526 proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
527 proto::message::MessageType::GetValue => {
528 let record =
529 if let Some(r) = message.record {
530 Some(record_from_proto(r)?)
531 } else {
532 None
533 };
534
535 let closer_peers = message.closer_peers.into_iter()
536 .filter_map(|peer| KadPeer::try_from(peer).ok())
537 .collect();
538
539 Ok(KadResponseMsg::GetValue { record, closer_peers })
540 }
541
542 proto::message::MessageType::FindNode => {
543 let closer_peers = message.closer_peers.into_iter()
544 .filter_map(|peer| KadPeer::try_from(peer).ok())
545 .collect();
546
547 Ok(KadResponseMsg::FindNode { closer_peers })
548 }
549
550 proto::message::MessageType::GetProviders => {
551 let closer_peers = message.closer_peers.into_iter()
552 .filter_map(|peer| KadPeer::try_from(peer).ok())
553 .collect();
554
555 let provider_peers = message.provider_peers.into_iter()
556 .filter_map(|peer| KadPeer::try_from(peer).ok())
557 .collect();
558
559 Ok(KadResponseMsg::GetProviders {
560 closer_peers,
561 provider_peers,
562 })
563 }
564
565 proto::message::MessageType::PutValue => {
566 let key = record::Key::from(message.key);
567 let rec = message.record.ok_or_else(|| {
568 invalid_data("received PutValue message with no record")
569 })?;
570
571 Ok(KadResponseMsg::PutValue {
572 key,
573 value: rec.value
574 })
575 }
576
577 proto::message::MessageType::AddProvider =>
578 Err(invalid_data("received an unexpected AddProvider message"))
579 }
580}
581
582pub fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
583 let key = record::Key::from(record.key);
584 let value = record.value;
585
586 let publisher =
587 if !record.publisher.is_empty() {
588 PeerId::from_bytes(&record.publisher)
589 .map(Some)
590 .map_err(|_| invalid_data("Invalid publisher peer ID."))?
591 } else {
592 None
593 };
594
595 let expires =
596 if record.ttl > 0 {
597 Some(Instant::now() + Duration::from_secs(record.ttl as u64))
598 } else {
599 None
600 };
601
602 Ok(Record { key, value, publisher, expires })
603}
604
605pub fn record_to_proto(record: Record) -> proto::Record {
606 proto::Record {
607 key: record.key.to_vec(),
608 value: record.value,
609 publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
610 ttl: record.expires
611 .map(|t| {
612 let now = Instant::now();
613 if t > now {
614 (t - now).as_secs() as u32
615 } else {
616 1 }
618 })
619 .unwrap_or(0),
620 time_received: String::new()
621 }
622}
623
624fn invalid_data<E>(e: E) -> io::Error
626where
627 E: Into<Box<dyn std::error::Error + Send + Sync>>
628{
629 io::Error::new(io::ErrorKind::InvalidData, e)
630}
631
632#[cfg(test)]
633mod tests {
634
635 }