1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
//! mDNS local network discovery for x0x agents.
//!
//! Registers each agent as a `_x0x._udp.local.` DNS-SD service so that
//! other x0x instances on the same LAN can discover and connect without
//! requiring bootstrap nodes or explicit addresses.
//!
//! TXT records carry: `agent_id`, `machine_id`, `words` (four-word
//! speakable identity), and `version`.
use crate::identity::{AgentId, MachineId};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
/// DNS-SD service type for x0x agents.
///
/// The same string is used both when registering this agent's service
/// and when browsing for other agents, and it forms the suffix of every
/// instance fullname (`<instance>.<SERVICE_TYPE>`).
pub const SERVICE_TYPE: &str = "_x0x._udp.local.";
/// A peer discovered via mDNS on the local network.
///
/// Built by the browse task from a resolved service's TXT properties
/// (`agent_id`, `machine_id`, `words`, `version`) together with its
/// advertised addresses and port.
#[derive(Debug, Clone)]
pub struct MdnsDiscoveredPeer {
/// The agent's identifier, decoded from the hex `agent_id` TXT property
/// (must decode to exactly 32 bytes).
pub agent_id: AgentId,
/// The machine identifier, decoded from the hex `machine_id` TXT
/// property (must decode to exactly 32 bytes).
pub machine_id: MachineId,
/// Four-word speakable identity, taken verbatim from the `words` TXT
/// property (empty if the peer did not advertise one).
pub words: String,
/// Routable addresses for the QUIC endpoint: every advertised IP paired
/// with the advertised port, deduplicated, with loopback and link-local
/// addresses filtered out. Never empty for a stored peer.
pub addrs: Vec<SocketAddr>,
/// The crate version reported by the peer in its `version` TXT property
/// (empty if the peer did not advertise one).
pub version: String,
}
/// mDNS service discovery for x0x agents.
///
/// Wraps `mdns_sd::ServiceDaemon` to register this agent on the LAN
/// and browse for other agents. All methods are safe to call from
/// async code — the underlying daemon runs on its own thread.
///
/// Implements `Drop` to ensure the daemon thread and browse task are
/// cleaned up even if `shutdown()` is not called explicitly.
pub struct MdnsDiscovery {
/// Handle to the mDNS daemon used for register, browse, verify,
/// unregister and shutdown requests.
daemon: mdns_sd::ServiceDaemon,
/// Our registered fullname for self-filtering and unregister.
/// Has the form `<instance_name>.<SERVICE_TYPE>`.
instance_fullname: String,
/// Peers discovered via mDNS browse, keyed by instance fullname.
/// Shared with the background browse task, which inserts on resolve
/// and removes on service removal.
discovered: Arc<RwLock<HashMap<String, MdnsDiscoveredPeer>>>,
/// Handle for the background browse task.
/// `None` until `start_browse` succeeds; taken and aborted on shutdown/drop.
browse_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
/// Whether `start_browse` has been called (idempotency guard).
/// Reset to `false` if the browse request itself fails, so it can be retried.
browse_started: AtomicBool,
/// Whether `shutdown` has been called (prevents double-shutdown in Drop).
shut_down: AtomicBool,
}
impl MdnsDiscovery {
/// Create a new mDNS discovery instance and register this agent.
///
/// - `agent_id`: This agent's identifier (hex in TXT record).
/// - `machine_id`: This machine's identifier (hex in TXT record).
/// - `words`: Four-word speakable identity string.
/// - `port`: The QUIC port this agent is listening on.
///
/// The service is registered under [`SERVICE_TYPE`] with TXT properties
/// `agent_id`, `machine_id`, `words` and `version` (the crate version).
///
/// # Errors
///
/// Returns a human-readable `String` when the daemon cannot be created,
/// the service description is rejected, or registration fails. In the
/// latter two cases the freshly created daemon is shut down again before
/// returning.
pub fn new(
    agent_id: &AgentId,
    machine_id: &MachineId,
    words: &str,
    port: u16,
) -> Result<Self, String> {
    let daemon = mdns_sd::ServiceDaemon::new().map_err(|e| format!("mDNS daemon: {e}"))?;

    // Enable multicast loopback so that multiple x0x instances on the
    // same machine can discover each other (mDNS queries are sent to
    // the multicast group, and loopback ensures they reach other
    // processes on the same host). Failure is not fatal — LAN discovery
    // still works — but it is logged instead of silently dropped.
    if let Err(e) = daemon.set_multicast_loop_v4(true) {
        tracing::warn!("mDNS: enabling IPv4 multicast loopback failed: {e}");
    }
    if let Err(e) = daemon.set_multicast_loop_v6(true) {
        tracing::warn!("mDNS: enabling IPv6 multicast loopback failed: {e}");
    }

    // Encode each identifier once; the full hex goes into the TXT record
    // and a prefix of it into the instance name.
    let agent_hex = hex::encode(agent_id.0);
    let machine_hex = hex::encode(machine_id.0);

    // Instance name must be unique per (agent, machine) pair because
    // agent keys are portable across machines. Use 8 bytes of each ID
    // (16 hex chars each) for 128 bits total, well under the 63-byte
    // DNS label limit: "x0x-" (4) + 16 + "-" (1) + 16 = 37 bytes.
    let instance_name = format!("x0x-{}-{}", &agent_hex[..16], &machine_hex[..16]);
    let instance_fullname = format!("{instance_name}.{SERVICE_TYPE}");

    // Hostname: use the instance name with .local. suffix.
    let hostname = format!("{instance_name}.local.");

    let version = env!("CARGO_PKG_VERSION");
    let properties: Vec<(&str, &str)> = vec![
        ("agent_id", agent_hex.as_str()),
        ("machine_id", machine_hex.as_str()),
        ("words", words),
        ("version", version),
    ];

    let registered = mdns_sd::ServiceInfo::new(
        SERVICE_TYPE,
        &instance_name,
        &hostname,
        "", // empty IP = let mdns-sd auto-detect all interfaces
        port,
        properties.as_slice(),
    )
    .map_err(|e| format!("mDNS ServiceInfo: {e}"))
    .map(|info| info.enable_addr_auto())
    .and_then(|info| {
        daemon
            .register(info)
            .map_err(|e| format!("mDNS register: {e}"))
    });

    if let Err(msg) = registered {
        // No `MdnsDiscovery` will exist to run the Drop cleanup, so stop
        // the daemon here (best effort) rather than leave it running.
        let _ = daemon.shutdown();
        return Err(msg);
    }

    tracing::info!(
        "mDNS: registered {instance_name} on port {port} ({})",
        words
    );

    Ok(Self {
        daemon,
        instance_fullname,
        discovered: Arc::new(RwLock::new(HashMap::new())),
        browse_handle: Arc::new(tokio::sync::Mutex::new(None)),
        browse_started: AtomicBool::new(false),
        shut_down: AtomicBool::new(false),
    })
}
/// Start browsing for other x0x agents on the LAN.
///
/// Spawns a background task that processes mDNS browse events and
/// keeps the discovered-peers map up to date, then returns immediately.
/// The task ends when it is aborted (by `shutdown()` or `Drop`) or when
/// the daemon closes the browse channel.
///
/// Only the first successful call starts a task; calling this again is
/// a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Returns a `String` description when the daemon rejects the browse
/// request (the idempotency flag is reset so a later call can retry),
/// or when `shutdown()` ran while the task was being started.
pub async fn start_browse(&self) -> Result<(), String> {
    /// Decode a hex string into a 32-byte identifier; `None` when the
    /// input is not valid hex or does not decode to exactly 32 bytes.
    fn decode_id(hex_str: &str) -> Option<[u8; 32]> {
        let bytes = hex::decode(hex_str).ok()?;
        if bytes.len() != 32 {
            return None;
        }
        let mut id = [0u8; 32];
        id.copy_from_slice(&bytes);
        Some(id)
    }

    // Atomic idempotency: CAS ensures exactly one caller wins.
    if self
        .browse_started
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        return Ok(());
    }

    let receiver = match self.daemon.browse(SERVICE_TYPE) {
        Ok(r) => r,
        Err(e) => {
            // Reset so a future call can retry after transient failure.
            self.browse_started.store(false, Ordering::SeqCst);
            return Err(format!("mDNS browse: {e}"));
        }
    };

    let discovered = Arc::clone(&self.discovered);
    let our_fullname = self.instance_fullname.clone();
    let daemon_clone = self.daemon.clone();

    let handle = tokio::task::spawn(async move {
        // The mdns-sd receiver is sync, so we use spawn_blocking
        // inside a loop to avoid blocking the tokio runtime.
        loop {
            let rx_clone = receiver.clone();
            let event = tokio::task::spawn_blocking(move || {
                // Block for up to 2 seconds waiting for an event.
                rx_clone.recv_timeout(std::time::Duration::from_secs(2))
            })
            .await;

            match event {
                Ok(Ok(mdns_sd::ServiceEvent::ServiceFound(_service_type, fullname))) => {
                    // On macOS, browse() emits ServiceFound but the
                    // automatic SRV/A resolution often never completes.
                    // Explicitly call verify() to force the daemon to
                    // query for SRV + address records, which will then
                    // trigger a ServiceResolved event on this receiver.
                    if fullname != our_fullname {
                        tracing::debug!("mDNS: ServiceFound {fullname}, requesting verify");
                        if let Err(e) =
                            daemon_clone.verify(fullname, std::time::Duration::from_secs(5))
                        {
                            tracing::debug!("mDNS: verify request failed: {e}");
                        }
                    }
                }
                Ok(Ok(mdns_sd::ServiceEvent::ServiceResolved(info))) => {
                    // Skip our own registration (exact fullname match).
                    let full_name = info.get_fullname().to_string();
                    if full_name == our_fullname {
                        continue;
                    }

                    // Read the TXT properties; a missing key yields "".
                    let (agent_hex, machine_hex, words, version) = {
                        let props = info.get_properties();
                        let txt = |key: &str| {
                            props
                                .get(key)
                                .map(|p| p.val_str().to_string())
                                .unwrap_or_default()
                        };
                        (txt("agent_id"), txt("machine_id"), txt("words"), txt("version"))
                    };

                    // Parse agent_id and machine_id from hex.
                    let agent_id = match decode_id(&agent_hex) {
                        Some(bytes) => AgentId(bytes),
                        None => {
                            tracing::warn!("mDNS: invalid agent_id hex from {full_name}");
                            continue;
                        }
                    };
                    let machine_id = match decode_id(&machine_hex) {
                        Some(bytes) => MachineId(bytes),
                        None => {
                            tracing::warn!("mDNS: invalid machine_id hex from {full_name}");
                            continue;
                        }
                    };

                    // Collect the addresses accepted by `is_routable`
                    // (loopback and link-local ones are rejected; IPv6
                    // link-local needs a scope_id SocketAddr lacks).
                    // Deduplicate via collect into a set then back to Vec.
                    let port = info.get_port();
                    let addr_set: std::collections::HashSet<SocketAddr> = info
                        .get_addresses()
                        .iter()
                        .map(|ip| SocketAddr::new(ip.to_ip_addr(), port))
                        .filter(|a| is_routable(a.ip()))
                        .collect();
                    let addrs: Vec<SocketAddr> = addr_set.into_iter().collect();

                    if addrs.is_empty() {
                        tracing::debug!("mDNS: skipping {full_name} — no routable addresses");
                        continue;
                    }

                    let peer = MdnsDiscoveredPeer {
                        agent_id,
                        machine_id,
                        words,
                        addrs,
                        version,
                    };

                    // One write lock covers both the "is this new?" check
                    // and the insert. Only log the first time we discover
                    // this peer. `agent_hex` decoded successfully above,
                    // so it is 64 ASCII hex characters and slicing is safe.
                    let mut peers = discovered.write().await;
                    if !peers.contains_key(&full_name) {
                        tracing::info!(
                            "mDNS: discovered agent {} at {:?} ({})",
                            &agent_hex[..12],
                            peer.addrs,
                            peer.words
                        );
                    }
                    peers.insert(full_name, peer);
                }
                Ok(Ok(mdns_sd::ServiceEvent::ServiceRemoved(_, full_name))) => {
                    tracing::info!("mDNS: agent removed: {full_name}");
                    discovered.write().await.remove(&full_name);
                }
                Ok(Ok(other)) => {
                    tracing::debug!("mDNS: browse event: {other:?}");
                }
                Ok(Err(_)) => {
                    // Usually a plain timeout — just loop. But once the
                    // daemon has gone away the channel is closed and
                    // recv_timeout fails immediately; stop instead of
                    // spinning on spawn_blocking forever.
                    if receiver.is_disconnected() {
                        tracing::debug!("mDNS browse task ended: channel closed");
                        break;
                    }
                }
                Err(e) => {
                    // spawn_blocking join error — task was cancelled.
                    tracing::debug!("mDNS browse task ended: {e}");
                    break;
                }
            }
        }
    });

    // Store the handle so shutdown() can abort the task. shutdown() sets
    // `shut_down` before it takes the handle, so if the flag is already
    // set here it may have found the slot empty — abort the task
    // ourselves rather than leave it running with nobody to stop it.
    {
        let mut slot = self.browse_handle.lock().await;
        if self.shut_down.load(Ordering::SeqCst) {
            handle.abort();
            return Err("mDNS browse: discovery was shut down".to_string());
        }
        *slot = Some(handle);
    }

    tracing::info!("mDNS: browsing for LAN agents on {SERVICE_TYPE}");
    Ok(())
}
/// Return a snapshot of all currently discovered LAN peers.
///
/// The peers are cloned out of the shared map under a read lock, so the
/// returned vector is independent of later discovery updates.
pub async fn discovered_peers(&self) -> Vec<MdnsDiscoveredPeer> {
    let peers = self.discovered.read().await;
    let mut snapshot = Vec::with_capacity(peers.len());
    snapshot.extend(peers.values().cloned());
    snapshot
}
/// Shut down mDNS — unregister the service and stop browsing.
///
/// Only the first call does any work; later calls (and a later `Drop`)
/// see the `shut_down` flag and return immediately. Failures to
/// unregister or to stop the daemon are logged, not returned.
pub async fn shutdown(&self) {
    // Claim the shutdown; whoever flips the flag first does the work.
    let already_shut_down = self.shut_down.swap(true, Ordering::SeqCst);
    if already_shut_down {
        return;
    }

    // Stop the background browse task, if one was ever started.
    let browse_task = self.browse_handle.lock().await.take();
    if let Some(task) = browse_task {
        task.abort();
    }

    // Withdraw our service registration.
    match self.daemon.unregister(&self.instance_fullname) {
        Ok(_) => {}
        Err(e) => tracing::warn!("mDNS: unregister failed: {e}"),
    }

    // Ask the daemon thread to stop.
    match self.daemon.shutdown() {
        Ok(_) => {}
        Err(e) => tracing::warn!("mDNS: shutdown failed: {e}"),
    }

    tracing::info!("mDNS: shut down");
}
}
/// Best-effort cleanup when the value is dropped without `shutdown()`:
/// aborts the browse task, unregisters the service and stops the daemon.
impl Drop for MdnsDiscovery {
    fn drop(&mut self) {
        // Nothing to do if shutdown() already ran.
        let needs_cleanup = !self.shut_down.load(Ordering::SeqCst);
        if needs_cleanup {
            // Drop cannot await, so only try the lock; if it is held the
            // browse task is left alone.
            let browse_task = self
                .browse_handle
                .try_lock()
                .ok()
                .and_then(|mut slot| slot.take());
            if let Some(task) = browse_task {
                task.abort();
            }

            // Errors are ignored: there is nobody to report them to here.
            let _ = self.daemon.unregister(&self.instance_fullname);
            let _ = self.daemon.shutdown();
        }
    }
}
/// Returns true if the IP is usable as a unicast connect target on a LAN.
///
/// Rejected addresses:
/// - loopback (`127.0.0.0/8`, `::1`)
/// - link-local IPv4 / APIPA (`169.254.0.0/16`)
/// - link-local IPv6 (`fe80::/10`) — needs a scope_id this check cannot see
/// - unspecified (`0.0.0.0`, `::`)
/// - multicast (`224.0.0.0/4`, `ff00::/8`) and IPv4 broadcast
///
/// Everything else, including private ranges such as `192.168.0.0/16`
/// and IPv6 unique-local `fc00::/7`, is accepted.
fn is_routable(ip: std::net::IpAddr) -> bool {
    match ip {
        std::net::IpAddr::V4(v4) => {
            let unusable = v4.is_loopback()
                || v4.is_link_local() // filters 169.254.x.x
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast();
            !unusable
        }
        std::net::IpAddr::V6(v6) => {
            // fe80::/10 = link-local: compare the top 10 bits of the
            // first 16-bit segment.
            let is_link_local = (v6.segments()[0] & 0xffc0) == 0xfe80;
            let unusable =
                v6.is_loopback() || is_link_local || v6.is_unspecified() || v6.is_multicast();
            !unusable
        }
    }
}