1use crate::protocol::{
22 KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
23 KademliaProtocolConfig,
24};
25use crate::record::{self, Record};
26use futures::prelude::*;
27use tet_libp2p_swarm::{
28 IntoProtocolsHandler,
29 KeepAlive,
30 NegotiatedSubstream,
31 SubstreamProtocol,
32 ProtocolsHandler,
33 ProtocolsHandlerEvent,
34 ProtocolsHandlerUpgrErr
35};
36use tet_libp2p_core::{
37 ConnectedPoint,
38 PeerId,
39 either::EitherOutput,
40 upgrade::{self, InboundUpgrade, OutboundUpgrade}
41};
42use log::trace;
43use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration};
44use wasm_timer::Instant;
45
46pub struct KademliaHandlerProto<T> {
48 config: KademliaHandlerConfig,
49 _type: PhantomData<T>,
50}
51
52impl<T> KademliaHandlerProto<T> {
53 pub fn new(config: KademliaHandlerConfig) -> Self {
54 KademliaHandlerProto { config, _type: PhantomData }
55 }
56}
57
58impl<T: Clone + Send + 'static> IntoProtocolsHandler for KademliaHandlerProto<T> {
59 type Handler = KademliaHandler<T>;
60
61 fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
62 KademliaHandler::new(self.config, endpoint.clone())
63 }
64
65 fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
66 if self.config.allow_listening {
67 upgrade::EitherUpgrade::A(self.config.protocol_config.clone())
68 } else {
69 upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)
70 }
71 }
72}
73
74pub struct KademliaHandler<TUserData> {
82 config: KademliaHandlerConfig,
84
85 next_connec_unique_id: UniqueConnecId,
87
88 substreams: Vec<SubstreamState<TUserData>>,
90
91 keep_alive: KeepAlive,
93
94 endpoint: ConnectedPoint,
97
98 protocol_status: ProtocolStatus,
100}
101
102enum ProtocolStatus {
105 Unconfirmed,
108 Confirmed,
111 Reported,
114}
115
116#[derive(Debug, Clone)]
118pub struct KademliaHandlerConfig {
119 pub protocol_config: KademliaProtocolConfig,
121
122 pub allow_listening: bool,
124
125 pub idle_timeout: Duration,
127}
128
129enum SubstreamState<TUserData> {
131 OutPendingOpen(KadRequestMsg, Option<TUserData>),
134 OutPendingSend(
136 KadOutStreamSink<NegotiatedSubstream>,
137 KadRequestMsg,
138 Option<TUserData>,
139 ),
140 OutPendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
142 OutWaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
145 OutReportError(KademliaHandlerQueryErr, TUserData),
147 OutClosing(KadOutStreamSink<NegotiatedSubstream>),
149 InWaitingMessage(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
151 InWaitingUser(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
153 InPendingSend(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>, KadResponseMsg),
155 InPendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
157 InClosing(KadInStreamSink<NegotiatedSubstream>),
159}
160
161impl<TUserData> SubstreamState<TUserData> {
162 fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
166 match self {
167 SubstreamState::OutPendingOpen(_, _)
168 | SubstreamState::OutReportError(_, _) => Poll::Ready(()),
169 SubstreamState::OutPendingSend(ref mut stream, _, _)
170 | SubstreamState::OutPendingFlush(ref mut stream, _)
171 | SubstreamState::OutWaitingAnswer(ref mut stream, _)
172 | SubstreamState::OutClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) {
173 Poll::Ready(_) => Poll::Ready(()),
174 Poll::Pending => Poll::Pending,
175 },
176 SubstreamState::InWaitingMessage(_, ref mut stream)
177 | SubstreamState::InWaitingUser(_, ref mut stream)
178 | SubstreamState::InPendingSend(_, ref mut stream, _)
179 | SubstreamState::InPendingFlush(_, ref mut stream)
180 | SubstreamState::InClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) {
181 Poll::Ready(_) => Poll::Ready(()),
182 Poll::Pending => Poll::Pending,
183 },
184 }
185 }
186}
187
188#[derive(Debug)]
190pub enum KademliaHandlerEvent<TUserData> {
191 ProtocolConfirmed { endpoint: ConnectedPoint },
199
200 FindNodeReq {
203 key: Vec<u8>,
205 request_id: KademliaRequestId,
207 },
208
209 FindNodeRes {
211 closer_peers: Vec<KadPeer>,
213 user_data: TUserData,
215 },
216
217 GetProvidersReq {
220 key: record::Key,
222 request_id: KademliaRequestId,
224 },
225
226 GetProvidersRes {
228 closer_peers: Vec<KadPeer>,
230 provider_peers: Vec<KadPeer>,
232 user_data: TUserData,
234 },
235
236 QueryError {
238 error: KademliaHandlerQueryErr,
240 user_data: TUserData,
242 },
243
244 AddProvider {
246 key: record::Key,
248 provider: KadPeer,
250 },
251
252 GetRecord {
254 key: record::Key,
256 request_id: KademliaRequestId,
258 },
259
260 GetRecordRes {
262 record: Option<Record>,
264 closer_peers: Vec<KadPeer>,
266 user_data: TUserData,
268 },
269
270 PutRecord {
272 record: Record,
273 request_id: KademliaRequestId,
275 },
276
277 PutRecordRes {
279 key: record::Key,
281 value: Vec<u8>,
283 user_data: TUserData,
285 }
286}
287
288#[derive(Debug)]
290pub enum KademliaHandlerQueryErr {
291 Upgrade(ProtocolsHandlerUpgrErr<io::Error>),
293 UnexpectedMessage,
295 Io(io::Error),
297}
298
299impl fmt::Display for KademliaHandlerQueryErr {
300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301 match self {
302 KademliaHandlerQueryErr::Upgrade(err) => {
303 write!(f, "Error while performing Kademlia query: {}", err)
304 },
305 KademliaHandlerQueryErr::UnexpectedMessage => {
306 write!(f, "Remote answered our Kademlia RPC query with the wrong message type")
307 },
308 KademliaHandlerQueryErr::Io(err) => {
309 write!(f, "I/O error during a Kademlia RPC query: {}", err)
310 },
311 }
312 }
313}
314
315impl error::Error for KademliaHandlerQueryErr {
316 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
317 match self {
318 KademliaHandlerQueryErr::Upgrade(err) => Some(err),
319 KademliaHandlerQueryErr::UnexpectedMessage => None,
320 KademliaHandlerQueryErr::Io(err) => Some(err),
321 }
322 }
323}
324
325impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
326 fn from(err: ProtocolsHandlerUpgrErr<io::Error>) -> Self {
327 KademliaHandlerQueryErr::Upgrade(err)
328 }
329}
330
331#[derive(Debug)]
333pub enum KademliaHandlerIn<TUserData> {
334 Reset(KademliaRequestId),
342
343 FindNodeReq {
346 key: Vec<u8>,
348 user_data: TUserData,
350 },
351
352 FindNodeRes {
354 closer_peers: Vec<KadPeer>,
356 request_id: KademliaRequestId,
360 },
361
362 GetProvidersReq {
365 key: record::Key,
367 user_data: TUserData,
369 },
370
371 GetProvidersRes {
373 closer_peers: Vec<KadPeer>,
375 provider_peers: Vec<KadPeer>,
377 request_id: KademliaRequestId,
381 },
382
383 AddProvider {
388 key: record::Key,
390 provider: KadPeer,
392 },
393
394 GetRecord {
396 key: record::Key,
398 user_data: TUserData,
400 },
401
402 GetRecordRes {
404 record: Option<Record>,
406 closer_peers: Vec<KadPeer>,
408 request_id: KademliaRequestId,
410 },
411
412 PutRecord {
414 record: Record,
415 user_data: TUserData,
417 },
418
419 PutRecordRes {
421 key: record::Key,
423 value: Vec<u8>,
425 request_id: KademliaRequestId,
427 }
428}
429
430#[derive(Debug, PartialEq, Eq)]
433pub struct KademliaRequestId {
434 connec_unique_id: UniqueConnecId,
436}
437
438#[derive(Debug, Copy, Clone, PartialEq, Eq)]
440struct UniqueConnecId(u64);
441
442impl<TUserData> KademliaHandler<TUserData> {
443 pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self {
445 let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);
446
447 KademliaHandler {
448 config,
449 endpoint,
450 next_connec_unique_id: UniqueConnecId(0),
451 substreams: Vec::new(),
452 keep_alive,
453 protocol_status: ProtocolStatus::Unconfirmed,
454 }
455 }
456}
457
458impl<TUserData> ProtocolsHandler for KademliaHandler<TUserData>
459where
460 TUserData: Clone + Send + 'static,
461{
462 type InEvent = KademliaHandlerIn<TUserData>;
463 type OutEvent = KademliaHandlerEvent<TUserData>;
464 type Error = io::Error; type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
466 type OutboundProtocol = KademliaProtocolConfig;
467 type OutboundOpenInfo = (KadRequestMsg, Option<TUserData>);
469 type InboundOpenInfo = ();
470
471 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
472 if self.config.allow_listening {
473 SubstreamProtocol::new(self.config.protocol_config.clone(), ()).map_upgrade(upgrade::EitherUpgrade::A)
474 } else {
475 SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ())
476 }
477 }
478
479 fn inject_fully_negotiated_outbound(
480 &mut self,
481 protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
482 (msg, user_data): Self::OutboundOpenInfo,
483 ) {
484 self.substreams.push(SubstreamState::OutPendingSend(protocol, msg, user_data));
485 if let ProtocolStatus::Unconfirmed = self.protocol_status {
486 self.protocol_status = ProtocolStatus::Confirmed;
490 }
491 }
492
493 fn inject_fully_negotiated_inbound(
494 &mut self,
495 protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
496 (): Self::InboundOpenInfo
497 ) {
498 let protocol = match protocol {
501 EitherOutput::First(p) => p,
502 EitherOutput::Second(p) => void::unreachable(p),
503 };
504
505 debug_assert!(self.config.allow_listening);
506 let connec_unique_id = self.next_connec_unique_id;
507 self.next_connec_unique_id.0 += 1;
508 self.substreams.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol));
509 if let ProtocolStatus::Unconfirmed = self.protocol_status {
510 self.protocol_status = ProtocolStatus::Confirmed;
514 }
515 }
516
517 fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
518 match message {
519 KademliaHandlerIn::Reset(request_id) => {
520 let pos = self.substreams.iter().position(|state| match state {
521 SubstreamState::InWaitingUser(conn_id, _) =>
522 conn_id == &request_id.connec_unique_id,
523 _ => false,
524 });
525 if let Some(pos) = pos {
526 let waker = futures::task::noop_waker();
528 let mut cx = Context::from_waker(&waker);
529 let _ = self.substreams.remove(pos).try_close(&mut cx);
530 }
531 }
532 KademliaHandlerIn::FindNodeReq { key, user_data } => {
533 let msg = KadRequestMsg::FindNode { key };
534 self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
535 }
536 KademliaHandlerIn::FindNodeRes {
537 closer_peers,
538 request_id,
539 } => {
540 let pos = self.substreams.iter().position(|state| match state {
541 SubstreamState::InWaitingUser(ref conn_id, _) =>
542 conn_id == &request_id.connec_unique_id,
543 _ => false,
544 });
545
546 if let Some(pos) = pos {
547 let (conn_id, substream) = match self.substreams.remove(pos) {
548 SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
549 _ => unreachable!(),
550 };
551
552 let msg = KadResponseMsg::FindNode {
553 closer_peers: closer_peers.clone(),
554 };
555 self.substreams
556 .push(SubstreamState::InPendingSend(conn_id, substream, msg));
557 }
558 }
559 KademliaHandlerIn::GetProvidersReq { key, user_data } => {
560 let msg = KadRequestMsg::GetProviders { key };
561 self.substreams
562 .push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
563 }
564 KademliaHandlerIn::GetProvidersRes {
565 closer_peers,
566 provider_peers,
567 request_id,
568 } => {
569 let pos = self.substreams.iter().position(|state| match state {
570 SubstreamState::InWaitingUser(ref conn_id, _)
571 if conn_id == &request_id.connec_unique_id =>
572 {
573 true
574 }
575 _ => false,
576 });
577
578 if let Some(pos) = pos {
579 let (conn_id, substream) = match self.substreams.remove(pos) {
580 SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
581 _ => unreachable!(),
582 };
583
584 let msg = KadResponseMsg::GetProviders {
585 closer_peers: closer_peers.clone(),
586 provider_peers: provider_peers.clone(),
587 };
588 self.substreams
589 .push(SubstreamState::InPendingSend(conn_id, substream, msg));
590 }
591 }
592 KademliaHandlerIn::AddProvider { key, provider } => {
593 let msg = KadRequestMsg::AddProvider { key, provider };
594 self.substreams.push(SubstreamState::OutPendingOpen(msg, None));
595 }
596 KademliaHandlerIn::GetRecord { key, user_data } => {
597 let msg = KadRequestMsg::GetValue { key };
598 self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
599
600 }
601 KademliaHandlerIn::PutRecord { record, user_data } => {
602 let msg = KadRequestMsg::PutValue { record };
603 self.substreams
604 .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
605 }
606 KademliaHandlerIn::GetRecordRes {
607 record,
608 closer_peers,
609 request_id,
610 } => {
611 let pos = self.substreams.iter().position(|state| match state {
612 SubstreamState::InWaitingUser(ref conn_id, _)
613 => conn_id == &request_id.connec_unique_id,
614 _ => false,
615 });
616
617 if let Some(pos) = pos {
618 let (conn_id, substream) = match self.substreams.remove(pos) {
619 SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
620 _ => unreachable!(),
621 };
622
623 let msg = KadResponseMsg::GetValue {
624 record,
625 closer_peers: closer_peers.clone(),
626 };
627 self.substreams
628 .push(SubstreamState::InPendingSend(conn_id, substream, msg));
629 }
630 }
631 KademliaHandlerIn::PutRecordRes {
632 key,
633 request_id,
634 value,
635 } => {
636 let pos = self.substreams.iter().position(|state| match state {
637 SubstreamState::InWaitingUser(ref conn_id, _)
638 if conn_id == &request_id.connec_unique_id =>
639 {
640 true
641 }
642 _ => false,
643 });
644
645 if let Some(pos) = pos {
646 let (conn_id, substream) = match self.substreams.remove(pos) {
647 SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
648 _ => unreachable!(),
649 };
650
651 let msg = KadResponseMsg::PutValue {
652 key,
653 value,
654 };
655 self.substreams
656 .push(SubstreamState::InPendingSend(conn_id, substream, msg));
657 }
658 }
659 }
660 }
661
662 fn inject_dial_upgrade_error(
663 &mut self,
664 (_, user_data): Self::OutboundOpenInfo,
665 error: ProtocolsHandlerUpgrErr<io::Error>,
666 ) {
667 if let Some(user_data) = user_data {
670 self.substreams
671 .push(SubstreamState::OutReportError(error.into(), user_data));
672 }
673 }
674
675 fn connection_keep_alive(&self) -> KeepAlive {
676 self.keep_alive
677 }
678
679 fn poll(
680 &mut self,
681 cx: &mut Context<'_>,
682 ) -> Poll<
683 ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>,
684 > {
685 if self.substreams.is_empty() {
686 return Poll::Pending;
687 }
688
689 if let ProtocolStatus::Confirmed = self.protocol_status {
690 self.protocol_status = ProtocolStatus::Reported;
691 return Poll::Ready(ProtocolsHandlerEvent::Custom(
692 KademliaHandlerEvent::ProtocolConfirmed {
693 endpoint: self.endpoint.clone()
694 }))
695 }
696
697 for n in (0..self.substreams.len()).rev() {
699 let mut substream = self.substreams.swap_remove(n);
700
701 loop {
702 match advance_substream(substream, self.config.protocol_config.clone(), cx) {
703 (Some(new_state), Some(event), _) => {
704 self.substreams.push(new_state);
705 return Poll::Ready(event);
706 }
707 (None, Some(event), _) => {
708 if self.substreams.is_empty() {
709 self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
710 }
711 return Poll::Ready(event);
712 }
713 (Some(new_state), None, false) => {
714 self.substreams.push(new_state);
715 break;
716 }
717 (Some(new_state), None, true) => {
718 substream = new_state;
719 continue;
720 }
721 (None, None, _) => {
722 break;
723 }
724 }
725 }
726 }
727
728 if self.substreams.is_empty() {
729 self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
731 } else {
732 self.keep_alive = KeepAlive::Yes;
733 }
734
735 Poll::Pending
736 }
737}
738
739impl Default for KademliaHandlerConfig {
740 fn default() -> Self {
741 KademliaHandlerConfig {
742 protocol_config: Default::default(),
743 allow_listening: true,
744 idle_timeout: Duration::from_secs(10),
745 }
746 }
747}
748
749fn advance_substream<TUserData>(
754 state: SubstreamState<TUserData>,
755 upgrade: KademliaProtocolConfig,
756 cx: &mut Context<'_>,
757) -> (
758 Option<SubstreamState<TUserData>>,
759 Option<
760 ProtocolsHandlerEvent<
761 KademliaProtocolConfig,
762 (KadRequestMsg, Option<TUserData>),
763 KademliaHandlerEvent<TUserData>,
764 io::Error,
765 >,
766 >,
767 bool,
768)
769{
770 match state {
771 SubstreamState::OutPendingOpen(msg, user_data) => {
772 let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest {
773 protocol: SubstreamProtocol::new(upgrade, (msg, user_data))
774 };
775 (None, Some(ev), false)
776 }
777 SubstreamState::OutPendingSend(mut substream, msg, user_data) => {
778 match Sink::poll_ready(Pin::new(&mut substream), cx) {
779 Poll::Ready(Ok(())) => {
780 match Sink::start_send(Pin::new(&mut substream), msg) {
781 Ok(()) => (
782 Some(SubstreamState::OutPendingFlush(substream, user_data)),
783 None,
784 true,
785 ),
786 Err(error) => {
787 let event = if let Some(user_data) = user_data {
788 Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
789 error: KademliaHandlerQueryErr::Io(error),
790 user_data
791 }))
792 } else {
793 None
794 };
795
796 (None, event, false)
797 }
798 }
799 },
800 Poll::Pending => (
801 Some(SubstreamState::OutPendingSend(substream, msg, user_data)),
802 None,
803 false,
804 ),
805 Poll::Ready(Err(error)) => {
806 let event = if let Some(user_data) = user_data {
807 Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
808 error: KademliaHandlerQueryErr::Io(error),
809 user_data
810 }))
811 } else {
812 None
813 };
814
815 (None, event, false)
816 }
817 }
818 }
819 SubstreamState::OutPendingFlush(mut substream, user_data) => {
820 match Sink::poll_flush(Pin::new(&mut substream), cx) {
821 Poll::Ready(Ok(())) => {
822 if let Some(user_data) = user_data {
823 (
824 Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
825 None,
826 true,
827 )
828 } else {
829 (Some(SubstreamState::OutClosing(substream)), None, true)
830 }
831 }
832 Poll::Pending => (
833 Some(SubstreamState::OutPendingFlush(substream, user_data)),
834 None,
835 false,
836 ),
837 Poll::Ready(Err(error)) => {
838 let event = if let Some(user_data) = user_data {
839 Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
840 error: KademliaHandlerQueryErr::Io(error),
841 user_data,
842 }))
843 } else {
844 None
845 };
846
847 (None, event, false)
848 }
849 }
850 }
851 SubstreamState::OutWaitingAnswer(mut substream, user_data) => match Stream::poll_next(Pin::new(&mut substream), cx) {
852 Poll::Ready(Some(Ok(msg))) => {
853 let new_state = SubstreamState::OutClosing(substream);
854 let event = process_kad_response(msg, user_data);
855 (
856 Some(new_state),
857 Some(ProtocolsHandlerEvent::Custom(event)),
858 true,
859 )
860 }
861 Poll::Pending => (
862 Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
863 None,
864 false,
865 ),
866 Poll::Ready(Some(Err(error))) => {
867 let event = KademliaHandlerEvent::QueryError {
868 error: KademliaHandlerQueryErr::Io(error),
869 user_data,
870 };
871 (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
872 }
873 Poll::Ready(None) => {
874 let event = KademliaHandlerEvent::QueryError {
875 error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
876 user_data,
877 };
878 (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
879 }
880 },
881 SubstreamState::OutReportError(error, user_data) => {
882 let event = KademliaHandlerEvent::QueryError { error, user_data };
883 (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
884 }
885 SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) {
886 Poll::Ready(Ok(())) => (None, None, false),
887 Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false),
888 Poll::Ready(Err(_)) => (None, None, false),
889 },
890 SubstreamState::InWaitingMessage(id, mut substream) => match Stream::poll_next(Pin::new(&mut substream), cx) {
891 Poll::Ready(Some(Ok(msg))) => {
892 if let Ok(ev) = process_kad_request(msg, id) {
893 (
894 Some(SubstreamState::InWaitingUser(id, substream)),
895 Some(ProtocolsHandlerEvent::Custom(ev)),
896 false,
897 )
898 } else {
899 (Some(SubstreamState::InClosing(substream)), None, true)
900 }
901 }
902 Poll::Pending => (
903 Some(SubstreamState::InWaitingMessage(id, substream)),
904 None,
905 false,
906 ),
907 Poll::Ready(None) => {
908 trace!("Inbound substream: EOF");
909 (None, None, false)
910 }
911 Poll::Ready(Some(Err(e))) => {
912 trace!("Inbound substream error: {:?}", e);
913 (None, None, false)
914 },
915 },
916 SubstreamState::InWaitingUser(id, substream) => (
917 Some(SubstreamState::InWaitingUser(id, substream)),
918 None,
919 false,
920 ),
921 SubstreamState::InPendingSend(id, mut substream, msg) => match Sink::poll_ready(Pin::new(&mut substream), cx) {
922 Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
923 Ok(()) => (
924 Some(SubstreamState::InPendingFlush(id, substream)),
925 None,
926 true,
927 ),
928 Err(_) => (None, None, false),
929 },
930 Poll::Pending => (
931 Some(SubstreamState::InPendingSend(id, substream, msg)),
932 None,
933 false,
934 ),
935 Poll::Ready(Err(_)) => (None, None, false),
936 }
937 SubstreamState::InPendingFlush(id, mut substream) => match Sink::poll_flush(Pin::new(&mut substream), cx) {
938 Poll::Ready(Ok(())) => (
939 Some(SubstreamState::InWaitingMessage(id, substream)),
940 None,
941 true,
942 ),
943 Poll::Pending => (
944 Some(SubstreamState::InPendingFlush(id, substream)),
945 None,
946 false,
947 ),
948 Poll::Ready(Err(_)) => (None, None, false),
949 },
950 SubstreamState::InClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) {
951 Poll::Ready(Ok(())) => (None, None, false),
952 Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false),
953 Poll::Ready(Err(_)) => (None, None, false),
954 },
955 }
956}
957
958fn process_kad_request<TUserData>(
960 event: KadRequestMsg,
961 connec_unique_id: UniqueConnecId,
962) -> Result<KademliaHandlerEvent<TUserData>, io::Error> {
963 match event {
964 KadRequestMsg::Ping => {
965 Err(io::Error::new(
968 io::ErrorKind::InvalidData,
969 "the PING Kademlia message is not implemented",
970 ))
971 }
972 KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq {
973 key,
974 request_id: KademliaRequestId { connec_unique_id },
975 }),
976 KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq {
977 key,
978 request_id: KademliaRequestId { connec_unique_id },
979 }),
980 KadRequestMsg::AddProvider { key, provider } => {
981 Ok(KademliaHandlerEvent::AddProvider { key, provider })
982 }
983 KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord {
984 key,
985 request_id: KademliaRequestId { connec_unique_id },
986 }),
987 KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord {
988 record,
989 request_id: KademliaRequestId { connec_unique_id },
990 })
991 }
992}
993
994fn process_kad_response<TUserData>(
996 event: KadResponseMsg,
997 user_data: TUserData,
998) -> KademliaHandlerEvent<TUserData> {
999 match event {
1001 KadResponseMsg::Pong => {
1002 KademliaHandlerEvent::QueryError {
1004 error: KademliaHandlerQueryErr::UnexpectedMessage,
1005 user_data,
1006 }
1007 }
1008 KadResponseMsg::FindNode { closer_peers } => {
1009 KademliaHandlerEvent::FindNodeRes {
1010 closer_peers,
1011 user_data,
1012 }
1013 },
1014 KadResponseMsg::GetProviders {
1015 closer_peers,
1016 provider_peers,
1017 } => KademliaHandlerEvent::GetProvidersRes {
1018 closer_peers,
1019 provider_peers,
1020 user_data,
1021 },
1022 KadResponseMsg::GetValue {
1023 record,
1024 closer_peers,
1025 } => KademliaHandlerEvent::GetRecordRes {
1026 record,
1027 closer_peers,
1028 user_data,
1029 },
1030 KadResponseMsg::PutValue { key, value, .. } => {
1031 KademliaHandlerEvent::PutRecordRes {
1032 key,
1033 value,
1034 user_data,
1035 }
1036 }
1037 }
1038}