rtc_mdns/proto/mod.rs
1//! Sans-I/O mDNS Connection implementation.
2//!
3//! This module provides [`Mdns`], a sans-I/O implementation of an mDNS client/server
4//! that implements the [`sansio::Protocol`] trait for integration with any I/O framework.
5//!
6//! # Overview
7//!
8//! The [`Mdns`] struct handles the mDNS protocol logic without performing any I/O.
9//! The caller is responsible for:
10//!
11//! 1. **Network I/O**: Reading/writing UDP packets to/from 224.0.0.251:5353
12//! 2. **Timing**: Calling `handle_timeout()` when `poll_timeout()` expires
13//! 3. **Event Processing**: Handling events from `poll_event()`
14//!
15//! # Client Usage
16//!
17//! To query for a hostname:
18//!
19//! ```rust
20//! use rtc_mdns::{MdnsConfig, Mdns, MdnsEvent};
21//! use sansio::Protocol;
22//! use std::time::Instant;
23//!
24//! let mut mdns_client = Mdns::new(MdnsConfig::default());
25//!
26//! // Start a query - this queues a packet to send
27//! let query_id = mdns_client.query("printer.local");
28//!
29//! // Get the packet to send over the network
30//! if let Some(packet) = mdns_client.poll_write() {
31//! // Send packet.message to packet.transport.peer_addr via UDP
32//! println!("Send {} bytes to {}", packet.message.len(), packet.transport.peer_addr);
33//! }
34//!
35//! // When a response packet arrives, call handle_read()
36//! // Then check poll_event() for QueryAnswered events
37//! ```
38//!
39//! # Server Usage
40//!
41//! To respond to queries:
42//!
43//! ```rust
44//! use rtc_mdns::{MdnsConfig, Mdns};
45//! use std::net::{IpAddr, Ipv4Addr};
46//!
47//! let config = MdnsConfig::default()
48//! .with_local_names(vec!["myserver.local".to_string()])
49//! .with_local_ip(
50//! IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
51//! );
52//!
53//! let mut mdns_client = Mdns::new(config);
54//!
55//! // When a query packet arrives, call handle_read()
56//! // The connection automatically queues responses for configured local_names
57//! // Retrieve them with poll_write()
58//! ```
59//!
60//! # Query Lifecycle
61//!
62//! 1. Call [`Mdns::query()`] with the hostname to resolve
63//! 2. Retrieve the query packet from [`poll_write()`](sansio::Protocol::poll_write)
64//! 3. Send the packet to the mDNS multicast address
65//! 4. When responses arrive, pass them to [`handle_read()`](sansio::Protocol::handle_read)
66//! 5. Check [`poll_event()`](sansio::Protocol::poll_event) for [`MdnsEvent::QueryAnswered`]
67//! 6. If no answer, call [`handle_timeout()`](sansio::Protocol::handle_timeout) to trigger retries
68
69use std::collections::{HashMap, VecDeque};
70use std::net::{IpAddr, Ipv4Addr, SocketAddr};
71use std::time::{Duration, Instant};
72
73use bytes::BytesMut;
74use log::{trace, warn};
75use shared::{TaggedBytesMut, TransportContext, TransportMessage, TransportProtocol};
76
77use crate::config::{DEFAULT_QUERY_INTERVAL, MAX_MESSAGE_RECORDS, MdnsConfig, RESPONSE_TTL};
78use crate::message::header::Header;
79use crate::message::name::Name;
80use crate::message::parser::Parser;
81use crate::message::question::Question;
82use crate::message::resource::a::AResource;
83use crate::message::resource::{Resource, ResourceHeader};
84use crate::message::{DNSCLASS_INET, DnsType, Message};
85use shared::error::{Error, Result};
86
/// IPv4 multicast group used by mDNS (224.0.0.251).
pub const MDNS_MULTICAST_IPV4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);

/// UDP port used by mDNS (5353).
pub const MDNS_PORT: u16 = 5353;

/// Destination socket address for mDNS traffic (224.0.0.251:5353).
///
/// Built from [`MDNS_MULTICAST_IPV4`] and [`MDNS_PORT`]. Every query and
/// answer queued by this module is addressed to this destination.
///
/// # Example
///
/// ```rust
/// use rtc_mdns::MDNS_DEST_ADDR;
///
/// assert_eq!(MDNS_DEST_ADDR.to_string(), "224.0.0.251:5353");
/// ```
pub const MDNS_DEST_ADDR: SocketAddr =
    SocketAddr::V4(std::net::SocketAddrV4::new(MDNS_MULTICAST_IPV4, MDNS_PORT));
105
/// Identifier assigned to each mDNS query.
///
/// [`Mdns::query()`] hands out a fresh ID on every call. The ID is used to:
/// - Correlate [`MdnsEvent::QueryAnswered`] and [`MdnsEvent::QueryTimeout`] with the query
/// - Stop a pending query via [`Mdns::cancel_query()`]
/// - Ask whether the query is still outstanding via [`Mdns::is_query_pending()`]
pub type QueryId = u64;
113
114/// A pending mDNS query.
115///
116/// This struct tracks the state of an active query, including when it was
117/// started and when the next retry should occur.
118#[derive(Debug, Clone)]
119pub struct Query {
120 /// Unique identifier for this query.
121 pub id: QueryId,
122 /// The name being queried, with trailing dot (e.g., `"myhost.local."`).
123 pub name_with_suffix: String,
124 /// When the query was started.
125 pub start_time: Instant,
126 /// When the next retry should be sent.
127 pub next_retry: Instant,
128}
129
130/// Events emitted by the mDNS connection.
131///
132/// Poll for events using [`poll_event()`](sansio::Protocol::poll_event) after
133/// calling [`handle_read()`](sansio::Protocol::handle_read) or
134/// [`handle_timeout()`](sansio::Protocol::handle_timeout).
135///
136/// # Example
137///
138/// ```rust,ignore
139/// while let Some(event) = mdns.poll_event() {
140/// match event {
141/// MdnsEvent::QueryAnswered(query_id, addr) => {
142/// println!("Query {} resolved to {}", query_id, addr);
143/// }
144/// MdnsEvent::QueryTimeout(query_id) => {
145/// println!("Query {} timed out", query_id);
146/// }
147/// }
148/// }
149/// ```
150#[derive(Debug)]
151pub enum MdnsEvent {
152 /// A query was successfully answered.
153 ///
154 /// Contains the query ID and the resolved IP address.
155 /// The query is automatically removed from the pending list.
156 QueryAnswered(QueryId, IpAddr),
157
158 /// A query timed out without receiving an answer.
159 ///
160 /// This event is emitted when [`MdnsConfig::query_timeout`](crate::MdnsConfig::query_timeout)
161 /// is set and a query exceeds its timeout duration. The query is automatically
162 /// removed from the pending list when this event is emitted.
163 ///
164 /// To enable query timeouts, configure the connection with
165 /// [`MdnsConfig::with_query_timeout`](crate::MdnsConfig::with_query_timeout):
166 ///
167 /// ```rust
168 /// use rtc_mdns::MdnsConfig;
169 /// use std::time::Duration;
170 ///
171 /// let config = MdnsConfig::default()
172 /// .with_query_timeout(Duration::from_secs(5));
173 /// ```
174 QueryTimeout(QueryId),
175}
176
177/// Sans-I/O mDNS Connection.
178///
179/// This implements a sans-I/O mDNS client/server that can:
180/// - Send mDNS queries and receive answers
181/// - Respond to mDNS questions for configured local names
182///
183/// # Sans-I/O Pattern
184///
185/// This struct implements [`sansio::Protocol`], which means it doesn't perform
186/// any I/O itself. Instead, the caller is responsible for:
187///
188/// 1. Calling [`handle_read()`](sansio::Protocol::handle_read) when packets arrive
189/// 2. Sending packets from [`poll_write()`](sansio::Protocol::poll_write)
190/// 3. Calling [`handle_timeout()`](sansio::Protocol::handle_timeout) on schedule
191/// 4. Processing events from [`poll_event()`](sansio::Protocol::poll_event)
192///
193/// # Example: Complete Event Loop
194///
195/// ```rust
196/// use rtc_mdns::{MdnsConfig, Mdns, MdnsEvent};
197/// use sansio::Protocol;
198/// use std::time::{Duration, Instant};
199///
200/// let mut mdns = Mdns::new(MdnsConfig::default());
201///
202/// // Start a query
203/// let query_id = mdns.query("device.local");
204///
205/// // Simulate an event loop iteration
206/// let now = Instant::now();
207///
208/// // 1. Send queued packets (would go to network in real code)
209/// while let Some(packet) = mdns.poll_write() {
210/// println!("Would send {} bytes to {}", packet.message.len(), packet.transport.peer_addr);
211/// }
212///
213/// // 2. Handle timeout if due
214/// if let Some(deadline) = mdns.poll_timeout() {
215/// if deadline <= now {
216/// mdns.handle_timeout(now).ok();
217/// }
218/// }
219///
220/// // 3. Process any events
221/// while let Some(event) = mdns.poll_event() {
222/// match event {
223/// MdnsEvent::QueryAnswered(query_id, addr) => {
224/// println!("Query {} answered: {}", query_id, addr);
225/// }
226/// MdnsEvent::QueryTimeout(id) => {
227/// println!("Query {} timed out", id);
228/// }
229/// }
230/// }
231/// ```
232///
233/// # Example: Multiple Concurrent Queries
234///
235/// ```rust
236/// use rtc_mdns::{MdnsConfig, Mdns};
237/// use sansio::Protocol;
238///
239/// let mut mdns = Mdns::new(MdnsConfig::default());
240///
241/// // Start multiple queries - each gets a unique ID
242/// let id1 = mdns.query("printer.local");
243/// let id2 = mdns.query("server.local");
244/// let id3 = mdns.query("nas.local");
245///
246/// assert_eq!(mdns.pending_query_count(), 3);
247/// assert!(mdns.is_query_pending(id1));
248/// assert!(mdns.is_query_pending(id2));
249/// assert!(mdns.is_query_pending(id3));
250///
251/// // Cancel one query
252/// mdns.cancel_query(id2);
253/// assert_eq!(mdns.pending_query_count(), 2);
254/// assert!(!mdns.is_query_pending(id2));
255/// ```
256pub struct Mdns {
257 /// MdnsConfiguration
258 config: MdnsConfig,
259
260 /// Local names with trailing dots (for matching questions)
261 local_names: Vec<String>,
262
263 /// Pending queries
264 queries: Vec<Query>,
265
266 /// Next query ID to assign
267 next_query_id: QueryId,
268
269 /// Query retry interval
270 query_interval: Duration,
271
272 /// Query timeout (None = no automatic timeout)
273 query_timeout: Option<Duration>,
274
275 /// Outgoing packet queue
276 write_outs: VecDeque<TaggedBytesMut>,
277
278 /// Event queue
279 event_outs: VecDeque<MdnsEvent>,
280
281 /// Next timeout for query retries
282 next_timeout: Option<Instant>,
283
284 /// Whether the connection is closed
285 closed: bool,
286}
287
288impl Mdns {
289 /// Create a new mDNS connection with the given configuration.
290 ///
291 /// # Arguments
292 ///
/// * `config` - Configuration for the connection
294 ///
295 /// # Example
296 ///
297 /// ```rust
298 /// use rtc_mdns::{MdnsConfig, Mdns};
299 /// use std::time::Duration;
300 ///
301 /// // Client-only configuration
302 /// let client = Mdns::new(MdnsConfig::default());
303 ///
304 /// // Server configuration
305 /// use std::net::{IpAddr, Ipv4Addr};
306 /// let server = Mdns::new(
307 /// MdnsConfig::default()
308 /// .with_local_names(vec!["myhost.local".to_string()])
309 /// .with_local_ip(
310 /// IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
311 /// )
312 /// );
313 /// ```
314 pub fn new(config: MdnsConfig) -> Self {
315 let local_names = config
316 .local_names
317 .iter()
318 .map(|name| {
319 if name.ends_with('.') {
320 name.clone()
321 } else {
322 format!("{name}.")
323 }
324 })
325 .collect();
326
327 let query_interval = if config.query_interval == Duration::ZERO {
328 DEFAULT_QUERY_INTERVAL
329 } else {
330 config.query_interval
331 };
332
333 let query_timeout = config.query_timeout;
334
335 Self {
336 config,
337 local_names,
338 queries: Vec::new(),
339 next_query_id: 1,
340 query_interval,
341 query_timeout,
342 write_outs: VecDeque::new(),
343 event_outs: VecDeque::new(),
344 next_timeout: None,
345 closed: false,
346 }
347 }
348
349 /// Start a new mDNS query for the given name.
350 ///
351 /// This method queues an mDNS query packet to be sent. The query will be
352 /// automatically retried at the configured interval until either:
353 /// - An answer is received (emits [`MdnsEvent::QueryAnswered`])
354 /// - The query times out (emits [`MdnsEvent::QueryTimeout`] if `query_timeout` is set)
355 /// - The query is cancelled with [`cancel_query()`](Self::cancel_query)
356 /// - The connection is closed
357 ///
358 /// # Arguments
359 ///
360 /// * `name` - The hostname to query (e.g., `"mydevice.local"`)
361 ///
362 /// # Returns
363 ///
364 /// A unique [`QueryId`] that can be used to track this query.
365 ///
366 /// # Example
367 ///
368 /// ```rust
369 /// use rtc_mdns::{MdnsConfig, Mdns, MdnsEvent};
370 /// use sansio::Protocol;
371 ///
372 /// let mut mdns = Mdns::new(MdnsConfig::default());
373 ///
374 /// // Start a query
375 /// let query_id = mdns.query("printer.local");
376 ///
377 /// // The query packet is now queued
378 /// let packet = mdns.poll_write().expect("query packet should be queued");
379 /// assert_eq!(packet.transport.peer_addr.to_string(), "224.0.0.251:5353");
380 ///
381 /// // Track the query
382 /// assert!(mdns.is_query_pending(query_id));
383 /// ```
384 pub fn query(&mut self, name: &str) -> QueryId {
385 let name_with_suffix = if name.ends_with('.') {
386 name.to_string()
387 } else {
388 format!("{name}.")
389 };
390
391 let id = self.next_query_id;
392 self.next_query_id += 1;
393
394 let now = Instant::now();
395 let query = Query {
396 id,
397 name_with_suffix: name_with_suffix.clone(),
398 start_time: now,
399 next_retry: now + self.query_interval, // Schedule first retry after interval
400 };
401 self.queries.push(query);
402
403 // Send the initial query immediately
404 self.send_question(&name_with_suffix, now);
405
406 // Update timeout
407 self.update_next_timeout();
408
409 id
410 }
411
412 /// Cancel a pending query.
413 ///
414 /// Removes the query from the pending list. No more retry packets will
415 /// be sent and no events will be emitted for this query.
416 ///
417 /// # Arguments
418 ///
419 /// * `query_id` - The ID returned by [`query()`](Self::query)
420 ///
421 /// # Example
422 ///
423 /// ```rust
424 /// use rtc_mdns::{MdnsConfig, Mdns};
425 ///
426 /// let mut mdns = Mdns::new(MdnsConfig::default());
427 /// let query_id = mdns.query("device.local");
428 ///
429 /// assert!(mdns.is_query_pending(query_id));
430 /// mdns.cancel_query(query_id);
431 /// assert!(!mdns.is_query_pending(query_id));
432 /// ```
433 pub fn cancel_query(&mut self, query_id: QueryId) {
434 self.queries.retain(|q| q.id != query_id);
435 self.update_next_timeout();
436 }
437
438 /// Check if a query is still pending.
439 ///
/// A query is pending until it is answered, cancelled, timed out (when a
/// query timeout is configured), or the connection is closed.
441 ///
442 /// # Arguments
443 ///
444 /// * `query_id` - The ID returned by [`query()`](Self::query)
445 ///
446 /// # Returns
447 ///
448 /// `true` if the query is still waiting for an answer, `false` otherwise.
449 ///
450 /// # Example
451 ///
452 /// ```rust
453 /// use rtc_mdns::{MdnsConfig, Mdns};
454 ///
455 /// let mut mdns = Mdns::new(MdnsConfig::default());
456 /// let query_id = mdns.query("device.local");
457 ///
458 /// // Query is pending until answered or cancelled
459 /// assert!(mdns.is_query_pending(query_id));
460 /// ```
461 pub fn is_query_pending(&self, query_id: QueryId) -> bool {
462 self.queries.iter().any(|q| q.id == query_id)
463 }
464
465 /// Get the number of pending queries.
466 ///
467 /// # Returns
468 ///
469 /// The count of queries that are still waiting for answers.
470 ///
471 /// # Example
472 ///
473 /// ```rust
474 /// use rtc_mdns::{MdnsConfig, Mdns};
475 ///
476 /// let mut mdns = Mdns::new(MdnsConfig::default());
477 /// assert_eq!(mdns.pending_query_count(), 0);
478 ///
479 /// mdns.query("device1.local");
480 /// mdns.query("device2.local");
481 /// assert_eq!(mdns.pending_query_count(), 2);
482 /// ```
483 pub fn pending_query_count(&self) -> usize {
484 self.queries.len()
485 }
486
487 fn send_question(&mut self, name: &str, now: Instant) {
488 let packed_name = match Name::new(name) {
489 Ok(pn) => pn,
490 Err(err) => {
491 log::warn!("Failed to construct mDNS packet: {err}");
492 return;
493 }
494 };
495
496 let raw_query = {
497 let mut msg = Message {
498 header: Header::default(),
499 questions: vec![Question {
500 typ: DnsType::A,
501 class: DNSCLASS_INET,
502 name: packed_name,
503 }],
504 ..Default::default()
505 };
506
507 match msg.pack() {
508 Ok(v) => v,
509 Err(err) => {
510 log::error!("Failed to construct mDNS packet {err}");
511 return;
512 }
513 }
514 };
515
516 log::trace!("Queuing mDNS query for {name}");
517 self.write_outs.push_back(TransportMessage {
518 now,
519 transport: TransportContext {
520 local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), MDNS_PORT),
521 peer_addr: MDNS_DEST_ADDR,
522 transport_protocol: TransportProtocol::UDP,
523 ecn: None,
524 },
525 message: BytesMut::from(&raw_query[..]),
526 });
527 }
528
529 fn send_answer(&mut self, local_ip: IpAddr, name: &str, now: Instant) {
530 let packed_name = match Name::new(name) {
531 Ok(n) => n,
532 Err(err) => {
533 log::warn!("Failed to pack name for answer: {err}");
534 return;
535 }
536 };
537
538 let raw_answer = {
539 let mut msg = Message {
540 header: Header {
541 response: true,
542 authoritative: true,
543 ..Default::default()
544 },
545 answers: vec![Resource {
546 header: ResourceHeader {
547 typ: DnsType::A,
548 class: DNSCLASS_INET,
549 name: packed_name,
550 ttl: RESPONSE_TTL,
551 ..Default::default()
552 },
553 body: Some(Box::new(AResource {
554 a: match local_ip {
555 IpAddr::V4(ip) => ip.octets(),
556 IpAddr::V6(_) => {
557 log::warn!("Cannot send IPv6 address in A record");
558 //TODO: How to support IPv6 mDNS?
559 return;
560 }
561 },
562 })),
563 }],
564 ..Default::default()
565 };
566
567 match msg.pack() {
568 Ok(v) => v,
569 Err(err) => {
570 log::error!("Failed to pack answer: {err}");
571 return;
572 }
573 }
574 };
575
576 log::trace!("mDNS Queuing answer for {name} -> {local_ip}");
577 self.write_outs.push_back(TransportMessage {
578 now,
579 transport: TransportContext {
580 local_addr: SocketAddr::new(local_ip, MDNS_PORT),
581 peer_addr: MDNS_DEST_ADDR,
582 transport_protocol: TransportProtocol::UDP,
583 ecn: None,
584 },
585 message: BytesMut::from(&raw_answer[..]),
586 });
587 }
588
589 fn process_message(&mut self, msg: &TaggedBytesMut) {
590 let mut parser = Parser::default();
591 if let Err(err) = parser.start(&msg.message) {
592 log::error!("Failed to parse mDNS packet: {err}");
593 return;
594 }
595
596 let src = msg.transport.peer_addr;
597
598 // Process questions (respond if we have local names)
599 self.process_questions(&mut parser, src, msg.now);
600
601 // Process answers (check if they match pending queries)
602 self.process_answers(&mut parser, src);
603 }
604
605 fn process_questions(&mut self, parser: &mut Parser<'_>, _src: SocketAddr, now: Instant) {
606 // Collect names that need answers first to avoid borrow issues
607 let mut names_to_answer: Vec<String> = Vec::new();
608
609 for _ in 0..=MAX_MESSAGE_RECORDS {
610 let q = match parser.question() {
611 Ok(q) => q,
612 Err(err) => {
613 if err == Error::ErrSectionDone {
614 break;
615 }
616 log::error!("Failed to parse question: {err}");
617 return;
618 }
619 };
620
621 // Check if we should answer this question
622 for local_name in &self.local_names {
623 if *local_name == q.name.data {
624 names_to_answer.push(q.name.data.clone());
625 break;
626 }
627 }
628 }
629
630 // Skip remaining questions
631 let _ = parser.skip_all_questions();
632
633 // Now send answers
634 if let Some(local_ip) = self.config.local_ip {
635 for name in names_to_answer {
636 log::trace!(
637 "mDNS Found question for local name: {}, responding with {}",
638 name,
639 local_ip
640 );
641 self.send_answer(local_ip, &name, now);
642 }
643 } else if !names_to_answer.is_empty() {
644 log::warn!("Received questions for local names but no local_addr configured");
645 }
646 }
647
648 fn process_answers(&mut self, parser: &mut Parser<'_>, src: SocketAddr) {
649 for _ in 0..=MAX_MESSAGE_RECORDS {
650 let answer_header = match parser.answer_header() {
651 Ok(a) => a,
652 Err(err) => {
653 if err != Error::ErrSectionDone {
654 log::warn!("Failed to parse answer header: {err}");
655 }
656 return;
657 }
658 };
659
660 // Only process A and AAAA records
661 if answer_header.typ != DnsType::A && answer_header.typ != DnsType::Aaaa {
662 continue;
663 }
664
665 let answer_resource = match parser.answer() {
666 Ok(a) => a,
667 Err(err) => {
668 if err != Error::ErrSectionDone {
669 log::warn!("Failed to parse answer: {err}");
670 }
671 return;
672 }
673 };
674
675 let local_ip = if let Some(body) = answer_resource.body
676 && let Some(a) = body.as_any().downcast_ref::<AResource>()
677 {
678 let local_ip = Ipv4Addr::from_octets(a.a).into();
679 if local_ip != src.ip() {
680 warn!(
681 "mDNS answers with different local ip on AResource {} vs src ip {} on Socket for query {}",
682 local_ip,
683 src.ip(),
684 answer_header.name.data
685 );
686 } else {
687 trace!(
688 "mDNS answers with the local ip {} on AResource and Socket for query {}",
689 local_ip, answer_header.name.data
690 );
691 }
692
693 local_ip
694 } else {
695 warn!(
696 "mDNS answers without AResource, fallback to use src ip {} on Socket for local ip for query {}",
697 src.ip(),
698 answer_header.name.data
699 );
700 src.ip()
701 };
702
703 // Check if this answer matches any pending queries
704 let mut matched_query_ids = HashMap::new();
705 for query in &self.queries {
706 if query.name_with_suffix == answer_header.name.data {
707 matched_query_ids.insert(query.id, local_ip);
708 }
709 }
710
711 // Emit events and remove matched queries
712 for (query_id, local_ip) in matched_query_ids {
713 self.event_outs
714 .push_back(MdnsEvent::QueryAnswered(query_id, local_ip));
715 self.queries.retain(|q| q.id != query_id);
716 }
717 }
718 }
719
720 fn update_next_timeout(&mut self) {
721 self.next_timeout = self.queries.iter().map(|q| q.next_retry).min();
722 }
723}
724
725impl sansio::Protocol<TaggedBytesMut, (), ()> for Mdns {
726 type Rout = ();
727 type Wout = TaggedBytesMut;
728 type Eout = MdnsEvent;
729 type Error = Error;
730 type Time = Instant;
731
732 /// Process an incoming mDNS packet.
733 ///
734 /// Call this method when a UDP packet is received on the mDNS multicast
735 /// address (224.0.0.251:5353).
736 ///
737 /// The connection will:
738 /// - Parse the packet as an mDNS message
739 /// - If it contains questions for our `local_names`, queue response packets
740 /// - If it contains answers matching pending queries, emit events
741 ///
742 /// # Arguments
743 ///
744 /// * `msg` - The received packet with transport context
745 ///
746 /// # Errors
747 ///
748 /// Returns [`Error::ErrConnectionClosed`] if the connection has been closed.
749 ///
750 /// # Example
751 ///
752 /// ```rust,ignore
753 /// use bytes::BytesMut;
754 /// use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
755 /// use std::time::Instant;
756 ///
757 /// // When a packet arrives from the network:
758 /// let msg = TaggedBytesMut {
759 /// now: Instant::now(),
760 /// transport: TransportContext {
761 /// local_addr: "0.0.0.0:5353".parse().unwrap(),
762 /// peer_addr: src_addr,
763 /// transport_protocol: TransportProtocol::UDP,
764 /// ecn: None,
765 /// },
766 /// message: BytesMut::from(&packet_data[..]),
767 /// };
768 /// mdns.handle_read(msg)?;
769 ///
770 /// // Check for events
771 /// while let Some(event) = mdns.poll_event() {
772 /// // handle event
773 /// }
774 /// ```
775 fn handle_read(&mut self, msg: TaggedBytesMut) -> Result<()> {
776 if self.closed {
777 return Err(Error::ErrConnectionClosed);
778 }
779 self.process_message(&msg);
780 self.update_next_timeout();
781 Ok(())
782 }
783
784 /// mDNS doesn't produce read outputs.
785 ///
786 /// Answers to queries are delivered via `poll_event()`
787 /// as [`MdnsEvent::QueryAnswered`] events instead.
788 ///
789 /// # Returns
790 ///
791 /// Always returns `None`.
792 fn poll_read(&mut self) -> Option<Self::Rout> {
793 None
794 }
795
796 /// Handle write requests (not used).
797 ///
798 /// Queries are initiated via the [`query()`](Mdns::query) method instead
799 /// of through this interface.
800 fn handle_write(&mut self, _msg: ()) -> Result<()> {
801 Ok(())
802 }
803
804 /// Get the next packet to send.
805 ///
806 /// Call this method repeatedly until it returns `None` to retrieve all
807 /// queued packets. Packets should be sent via UDP to the address specified
808 /// in `packet.transport.peer_addr` (typically 224.0.0.251:5353).
809 ///
810 /// Packets are queued when:
811 /// - A query is started with [`query()`](Mdns::query)
812 /// - A query retry is triggered by `handle_timeout()`
813 /// - A response is generated for a matching question
814 ///
815 /// # Returns
816 ///
817 /// The next packet to send, or `None` if the queue is empty.
818 ///
819 /// # Example
820 ///
821 /// ```rust
822 /// use rtc_mdns::{MdnsConfig, Mdns};
823 /// use sansio::Protocol;
824 ///
825 /// let mut mdns = Mdns::new(MdnsConfig::default());
826 /// mdns.query("device.local");
827 ///
828 /// // Send all queued packets
829 /// while let Some(packet) = mdns.poll_write() {
830 /// // socket.send_to(&packet.message, packet.transport.peer_addr).await?;
831 /// println!("Send to {}", packet.transport.peer_addr);
832 /// }
833 /// ```
834 fn poll_write(&mut self) -> Option<Self::Wout> {
835 self.write_outs.pop_front()
836 }
837
838 /// Handle external events (not used).
839 ///
840 /// mDNS does not use external events. This method does nothing.
841 fn handle_event(&mut self, _evt: ()) -> Result<()> {
842 Ok(())
843 }
844
845 /// Get the next event.
846 ///
847 /// Call this method repeatedly until it returns `None` to process all
848 /// queued events. Events are generated when:
/// - An mDNS answer matches a pending query ([`MdnsEvent::QueryAnswered`])
/// - A pending query exceeds the configured query timeout ([`MdnsEvent::QueryTimeout`])
850 ///
851 /// # Returns
852 ///
853 /// The next event, or `None` if the queue is empty.
854 ///
855 /// # Example
856 ///
857 /// ```rust,ignore
858 /// while let Some(event) = mdns.poll_event() {
859 /// match event {
860 /// MdnsEvent::QueryAnswered(query_id, addr) => {
861 /// println!("Query {} resolved to {}", query_id, addr);
862 /// }
863 /// MdnsEvent::QueryTimeout(id) => {
864 /// println!("Query {} timed out", id);
865 /// }
866 /// }
867 /// }
868 /// ```
869 fn poll_event(&mut self) -> Option<Self::Eout> {
870 self.event_outs.pop_front()
871 }
872
873 /// Handle timeout - retry pending queries.
874 ///
875 /// Call this method when the deadline from `poll_timeout()`
876 /// is reached. This triggers retry logic for pending queries.
877 ///
878 /// For each query whose retry time has passed, a new query packet will
879 /// be queued and can be retrieved with `poll_write()`.
880 ///
881 /// # Arguments
882 ///
883 /// * `now` - The current time
884 ///
885 /// # Errors
886 ///
887 /// Returns [`Error::ErrConnectionClosed`] if the connection has been closed.
888 ///
889 /// # Example
890 ///
891 /// ```rust
892 /// use rtc_mdns::{MdnsConfig, Mdns};
893 /// use sansio::Protocol;
894 /// use std::time::{Duration, Instant};
895 ///
896 /// let mut mdns = Mdns::new(
897 /// MdnsConfig::default().with_query_interval(Duration::from_millis(100))
898 /// );
899 /// mdns.query("device.local");
900 ///
901 /// // Consume initial packet
902 /// mdns.poll_write();
903 ///
904 /// // Simulate time passing
905 /// let future = Instant::now() + Duration::from_millis(150);
906 /// mdns.handle_timeout(future).unwrap();
907 ///
908 /// // A retry packet should be queued
909 /// assert!(mdns.poll_write().is_some());
910 /// ```
911 fn handle_timeout(&mut self, now: Self::Time) -> Result<()> {
912 if self.closed {
913 return Err(Error::ErrConnectionClosed);
914 }
915
916 if let Some(next_timeout) = self.next_timeout.as_ref()
917 && next_timeout <= &now
918 {
919 // Check for timed out queries first
920 if let Some(timeout_duration) = self.query_timeout {
921 let mut timed_out_ids = Vec::new();
922 for query in &self.queries {
923 if now.duration_since(query.start_time) >= timeout_duration {
924 timed_out_ids.push(query.id);
925 }
926 }
927
928 // Emit timeout events and remove timed out queries
929 for query_id in timed_out_ids {
930 log::debug!(
931 "mDNS Query {} timed out after {:?}",
932 query_id,
933 timeout_duration
934 );
935 self.event_outs.push_back(MdnsEvent::QueryTimeout(query_id));
936 self.queries.retain(|q| q.id != query_id);
937 }
938 }
939
940 // Retry queries that are due
941 let mut names_to_query = Vec::new();
942 for query in &mut self.queries {
943 if query.next_retry <= now {
944 names_to_query.push(query.name_with_suffix.clone());
945 query.next_retry = now + self.query_interval;
946 }
947 }
948
949 for name in names_to_query {
950 self.send_question(&name, now);
951 }
952
953 self.update_next_timeout();
954 }
955 Ok(())
956 }
957
958 /// Get the next timeout deadline.
959 ///
960 /// Returns the time at which `handle_timeout()` should
961 /// be called next. Use this to schedule your event loop's sleep/wait.
962 ///
963 /// # Returns
964 ///
965 /// - `Some(instant)` if there are pending queries that need retries
966 /// - `None` if there are no pending queries
967 ///
968 /// # Example
969 ///
970 /// ```rust
971 /// use rtc_mdns::{MdnsConfig, Mdns};
972 /// use sansio::Protocol;
973 ///
974 /// let mut mdns = Mdns::new(MdnsConfig::default());
975 ///
976 /// // No queries, no timeout
977 /// assert!(mdns.poll_timeout().is_none());
978 ///
979 /// // Start a query
980 /// mdns.query("device.local");
981 ///
982 /// // Now there's a timeout scheduled
983 /// assert!(mdns.poll_timeout().is_some());
984 /// ```
985 fn poll_timeout(&mut self) -> Option<Self::Time> {
986 self.next_timeout
987 }
988
989 /// Close the connection.
990 ///
991 /// This clears all pending queries and queued packets/events.
992 /// After closing, `handle_read()` and
993 /// `handle_timeout()` will return
994 /// [`Error::ErrConnectionClosed`].
995 ///
996 /// # Example
997 ///
998 /// ```rust
999 /// use rtc_mdns::{MdnsConfig, Mdns};
1000 /// use sansio::Protocol;
1001 ///
1002 /// let mut mdns = Mdns::new(MdnsConfig::default());
1003 /// mdns.query("device.local");
1004 ///
1005 /// assert_eq!(mdns.pending_query_count(), 1);
1006 ///
1007 /// mdns.close().unwrap();
1008 ///
1009 /// // All state is cleared
1010 /// assert_eq!(mdns.pending_query_count(), 0);
1011 /// assert!(mdns.poll_write().is_none());
1012 /// assert!(mdns.poll_timeout().is_none());
1013 /// ```
1014 fn close(&mut self) -> Result<()> {
1015 self.closed = true;
1016 self.queries.clear();
1017 self.write_outs.clear();
1018 self.event_outs.clear();
1019 self.next_timeout = None;
1020 Ok(())
1021 }
1022}
1023
1024#[cfg(test)]
1025mod mdns_test;