1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::errors::*;
use crate::service_discovery::mdns::discovery_config::MDNSServiceDiscoveryConfig;
use crate::service_discovery::mdns::state::MDNSServiceDiscoveryEvent;
use bastion_executor::blocking::spawn_blocking;
use libp2p::mdns::service::*;
use libp2p::multiaddr::Protocol;
use libp2p::{identity, Multiaddr, PeerId};
use lightproc::proc_stack::ProcStack;
use crossbeam_channel::{unbounded, Receiver};
use kaos::flunk;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Handle to the mDNS-based service discovery.
///
/// It owns the receiving half of the channel on which discovered service
/// addresses are published as [`MDNSServiceDiscoveryEvent`]s.
pub struct MDNSServiceDiscovery {
    /// Receiving end of the discovery event channel, shared through an `Arc`
    /// so that it can be handed out to several consumers.
    events: Arc<Receiver<MDNSServiceDiscoveryEvent>>,
}
// SAFETY(review): the only field of `MDNSServiceDiscovery` is an
// `Arc<Receiver<MDNSServiceDiscoveryEvent>>`. These manual impls are presumably
// sound only as long as the event type itself may be moved and shared across
// threads — TODO confirm, and consider dropping them if the auto traits
// already apply.
unsafe impl Send for MDNSServiceDiscovery {}
unsafe impl Sync for MDNSServiceDiscovery {}
impl MDNSServiceDiscovery {
pub fn new_service_discovery(config: MDNSServiceDiscoveryConfig) -> Result<Self> {
let (event_tx, event_rx) = unbounded::<MDNSServiceDiscoveryEvent>();
let peer_id = PeerId::from(identity::Keypair::generate_ed25519().public());
let _discovery_handle = spawn_blocking(
async move {
let mut service = MdnsService::new().expect("Can't launch the MDNS service");
loop {
let (mut srv, packet) = service.next().await;
match packet {
MdnsPacket::Query(query) => {
debug!("Query from {:?}", query.remote_addr());
let address: Multiaddr = format!(
"/ip4/{}/udp/{}",
config.local_service_addr.ip().to_string(),
config.local_service_addr.port()
)
.parse()
.unwrap();
let resp = build_query_response(
query.query_id(),
peer_id.clone(),
vec![address].into_iter(),
config.reply_ttl,
)
.unwrap();
srv.enqueue_response(resp);
}
MdnsPacket::Response(response) => {
// We detected a libp2p mDNS response on the network. Responses are for
// everyone and not just for the requester, which makes it possible to
// passively listen.
for peer in response.discovered_peers() {
debug!("Discovered peer {:?}", peer.id());
// These are the self-reported addresses of the peer we just discovered.
for addr in peer.addresses() {
debug!(" Address = {:?}", addr);
let components = addr.iter().collect::<Vec<_>>();
flunk!("mdns-protocol-fp");
if let Protocol::Ip4(discovered_ip) = components[0] {
if let Protocol::Udp(discovered_port) = components[1] {
let discovered =
format!("{}:{}", discovered_ip, discovered_port)
.parse()
.unwrap();
event_tx
.send(MDNSServiceDiscoveryEvent(discovered))
.unwrap();
} else {
error!(
"Unexpected protocol received: {}",
components[1]
);
}
} else {
error!("Unexpected IP received: {}", components[0]);
}
}
}
}
MdnsPacket::ServiceDiscovery(query) => {
// The last possibility is a service detection query from DNS-SD.
// Just like `Query`, in a real application you probably want to call
// `query.respond`.
debug!("Detected service query from {:?}", query.remote_addr());
}
}
service = srv
}
},
ProcStack::default(),
);
Ok(Self {
events: Arc::new(event_rx),
})
}
pub fn events(&self) -> Arc<Receiver<MDNSServiceDiscoveryEvent>> {
self.events.clone()
}
}
impl Future for MDNSServiceDiscovery {
type Output = MDNSServiceDiscoveryEvent;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending,
}
}
}