Skip to main content

rtc_mdns/proto/
mod.rs

1//! Sans-I/O mDNS Connection implementation.
2//!
3//! This module provides [`Mdns`], a sans-I/O implementation of an mDNS client/server
4//! that implements the [`sansio::Protocol`] trait for integration with any I/O framework.
5//!
6//! # Overview
7//!
8//! The [`Mdns`] struct handles the mDNS protocol logic without performing any I/O.
9//! The caller is responsible for:
10//!
11//! 1. **Network I/O**: Reading/writing UDP packets to/from 224.0.0.251:5353
12//! 2. **Timing**: Calling `handle_timeout()` when `poll_timeout()` expires
13//! 3. **Event Processing**: Handling events from `poll_event()`
14//!
15//! # Client Usage
16//!
17//! To query for a hostname:
18//!
19//! ```rust
20//! use rtc_mdns::{MdnsConfig, Mdns, MdnsEvent};
21//! use sansio::Protocol;
22//! use std::time::Instant;
23//!
24//! let mut mdns_client = Mdns::new(MdnsConfig::default());
25//!
26//! // Start a query - this queues a packet to send
27//! let query_id = mdns_client.query("printer.local");
28//!
29//! // Get the packet to send over the network
30//! if let Some(packet) = mdns_client.poll_write() {
31//!     // Send packet.message to packet.transport.peer_addr via UDP
32//!     println!("Send {} bytes to {}", packet.message.len(), packet.transport.peer_addr);
33//! }
34//!
35//! // When a response packet arrives, call handle_read()
36//! // Then check poll_event() for QueryAnswered events
37//! ```
38//!
39//! # Server Usage
40//!
41//! To respond to queries:
42//!
43//! ```rust
44//! use rtc_mdns::{MdnsConfig, Mdns};
45//! use std::net::{IpAddr, Ipv4Addr};
46//!
47//! let config = MdnsConfig::default()
48//!     .with_local_names(vec!["myserver.local".to_string()])
49//!     .with_local_ip(
50//!         IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
51//!     );
52//!
53//! let mut mdns_client = Mdns::new(config);
54//!
55//! // When a query packet arrives, call handle_read()
56//! // The connection automatically queues responses for configured local_names
57//! // Retrieve them with poll_write()
58//! ```
59//!
60//! # Query Lifecycle
61//!
62//! 1. Call [`Mdns::query()`] with the hostname to resolve
63//! 2. Retrieve the query packet from [`poll_write()`](sansio::Protocol::poll_write)
64//! 3. Send the packet to the mDNS multicast address
65//! 4. When responses arrive, pass them to [`handle_read()`](sansio::Protocol::handle_read)
66//! 5. Check [`poll_event()`](sansio::Protocol::poll_event) for [`MdnsEvent::QueryAnswered`]
67//! 6. If no answer, call [`handle_timeout()`](sansio::Protocol::handle_timeout) to trigger retries
68
69use std::collections::{HashMap, VecDeque};
70use std::net::{IpAddr, Ipv4Addr, SocketAddr};
71use std::time::{Duration, Instant};
72
73use bytes::BytesMut;
74use log::{trace, warn};
75use shared::{TaggedBytesMut, TransportContext, TransportMessage, TransportProtocol};
76
77use crate::config::{DEFAULT_QUERY_INTERVAL, MAX_MESSAGE_RECORDS, MdnsConfig, RESPONSE_TTL};
78use crate::message::header::Header;
79use crate::message::name::Name;
80use crate::message::parser::Parser;
81use crate::message::question::Question;
82use crate::message::resource::a::AResource;
83use crate::message::resource::{Resource, ResourceHeader};
84use crate::message::{DNSCLASS_INET, DnsType, Message};
85use shared::error::{Error, Result};
86
87/// The mDNS multicast group address (224.0.0.251).
88pub const MDNS_MULTICAST_IPV4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
89
90/// The standard mDNS port (5353).
91pub const MDNS_PORT: u16 = 5353;
92
93/// mDNS multicast destination address (224.0.0.251:5353).
94///
95/// All mDNS queries and responses should be sent to this address.
96///
97/// # Example
98///
99/// ```rust
100/// use rtc_mdns::MDNS_DEST_ADDR;
101///
102/// assert_eq!(MDNS_DEST_ADDR.to_string(), "224.0.0.251:5353");
103/// ```
104pub const MDNS_DEST_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(MDNS_MULTICAST_IPV4), MDNS_PORT);
105
106/// Unique identifier for tracking mDNS queries.
107///
108/// Each call to [`Mdns::query()`] returns a unique ID that can be used to:
109/// - Track which query was answered in [`MdnsEvent::QueryAnswered`]
110/// - Cancel a pending query with [`Mdns::cancel_query()`]
111/// - Check if a query is still pending with [`Mdns::is_query_pending()`]
112pub type QueryId = u64;
113
114/// A pending mDNS query.
115///
116/// This struct tracks the state of an active query, including when it was
117/// started and when the next retry should occur.
118#[derive(Debug, Clone)]
119pub struct Query {
120    /// Unique identifier for this query.
121    pub id: QueryId,
122    /// The name being queried, with trailing dot (e.g., `"myhost.local."`).
123    pub name_with_suffix: String,
124    /// When the query was started.
125    pub start_time: Instant,
126    /// When the next retry should be sent.
127    pub next_retry: Instant,
128}
129
130/// Events emitted by the mDNS connection.
131///
132/// Poll for events using [`poll_event()`](sansio::Protocol::poll_event) after
133/// calling [`handle_read()`](sansio::Protocol::handle_read) or
134/// [`handle_timeout()`](sansio::Protocol::handle_timeout).
135///
136/// # Example
137///
138/// ```rust,ignore
139/// while let Some(event) = mdns.poll_event() {
140///     match event {
141///         MdnsEvent::QueryAnswered(query_id, addr) => {
142///             println!("Query {} resolved to {}", query_id, addr);
143///         }
144///         MdnsEvent::QueryTimeout(query_id) => {
145///             println!("Query {} timed out", query_id);
146///         }
147///     }
148/// }
149/// ```
150#[derive(Debug)]
151pub enum MdnsEvent {
152    /// A query was successfully answered.
153    ///
154    /// Contains the query ID and the resolved IP address.
155    /// The query is automatically removed from the pending list.
156    QueryAnswered(QueryId, IpAddr),
157
158    /// A query timed out without receiving an answer.
159    ///
160    /// This event is emitted when [`MdnsConfig::query_timeout`](crate::MdnsConfig::query_timeout)
161    /// is set and a query exceeds its timeout duration. The query is automatically
162    /// removed from the pending list when this event is emitted.
163    ///
164    /// To enable query timeouts, configure the connection with
165    /// [`MdnsConfig::with_query_timeout`](crate::MdnsConfig::with_query_timeout):
166    ///
167    /// ```rust
168    /// use rtc_mdns::MdnsConfig;
169    /// use std::time::Duration;
170    ///
171    /// let config = MdnsConfig::default()
172    ///     .with_query_timeout(Duration::from_secs(5));
173    /// ```
174    QueryTimeout(QueryId),
175}
176
177/// Sans-I/O mDNS Connection.
178///
179/// This implements a sans-I/O mDNS client/server that can:
180/// - Send mDNS queries and receive answers
181/// - Respond to mDNS questions for configured local names
182///
183/// # Sans-I/O Pattern
184///
185/// This struct implements [`sansio::Protocol`], which means it doesn't perform
186/// any I/O itself. Instead, the caller is responsible for:
187///
188/// 1. Calling [`handle_read()`](sansio::Protocol::handle_read) when packets arrive
189/// 2. Sending packets from [`poll_write()`](sansio::Protocol::poll_write)
190/// 3. Calling [`handle_timeout()`](sansio::Protocol::handle_timeout) on schedule
191/// 4. Processing events from [`poll_event()`](sansio::Protocol::poll_event)
192///
193/// # Example: Complete Event Loop
194///
195/// ```rust
196/// use rtc_mdns::{MdnsConfig, Mdns, MdnsEvent};
197/// use sansio::Protocol;
198/// use std::time::{Duration, Instant};
199///
200/// let mut mdns = Mdns::new(MdnsConfig::default());
201///
202/// // Start a query
203/// let query_id = mdns.query("device.local");
204///
205/// // Simulate an event loop iteration
206/// let now = Instant::now();
207///
208/// // 1. Send queued packets (would go to network in real code)
209/// while let Some(packet) = mdns.poll_write() {
210///     println!("Would send {} bytes to {}", packet.message.len(), packet.transport.peer_addr);
211/// }
212///
213/// // 2. Handle timeout if due
214/// if let Some(deadline) = mdns.poll_timeout() {
215///     if deadline <= now {
216///         mdns.handle_timeout(now).ok();
217///     }
218/// }
219///
220/// // 3. Process any events
221/// while let Some(event) = mdns.poll_event() {
222///     match event {
223///         MdnsEvent::QueryAnswered(query_id, addr) => {
224///             println!("Query {} answered: {}", query_id, addr);
225///         }
226///         MdnsEvent::QueryTimeout(id) => {
227///             println!("Query {} timed out", id);
228///         }
229///     }
230/// }
231/// ```
232///
233/// # Example: Multiple Concurrent Queries
234///
235/// ```rust
236/// use rtc_mdns::{MdnsConfig, Mdns};
237/// use sansio::Protocol;
238///
239/// let mut mdns = Mdns::new(MdnsConfig::default());
240///
241/// // Start multiple queries - each gets a unique ID
242/// let id1 = mdns.query("printer.local");
243/// let id2 = mdns.query("server.local");
244/// let id3 = mdns.query("nas.local");
245///
246/// assert_eq!(mdns.pending_query_count(), 3);
247/// assert!(mdns.is_query_pending(id1));
248/// assert!(mdns.is_query_pending(id2));
249/// assert!(mdns.is_query_pending(id3));
250///
251/// // Cancel one query
252/// mdns.cancel_query(id2);
253/// assert_eq!(mdns.pending_query_count(), 2);
254/// assert!(!mdns.is_query_pending(id2));
255/// ```
256pub struct Mdns {
257    /// MdnsConfiguration
258    config: MdnsConfig,
259
260    /// Local names with trailing dots (for matching questions)
261    local_names: Vec<String>,
262
263    /// Pending queries
264    queries: Vec<Query>,
265
266    /// Next query ID to assign
267    next_query_id: QueryId,
268
269    /// Query retry interval
270    query_interval: Duration,
271
272    /// Query timeout (None = no automatic timeout)
273    query_timeout: Option<Duration>,
274
275    /// Outgoing packet queue
276    write_outs: VecDeque<TaggedBytesMut>,
277
278    /// Event queue
279    event_outs: VecDeque<MdnsEvent>,
280
281    /// Next timeout for query retries
282    next_timeout: Option<Instant>,
283
284    /// Whether the connection is closed
285    closed: bool,
286}
287
288impl Mdns {
289    /// Create a new mDNS connection with the given configuration.
290    ///
291    /// # Arguments
292    ///
293    /// * `config` - MdnsConfiguration for the connection
294    ///
295    /// # Example
296    ///
297    /// ```rust
298    /// use rtc_mdns::{MdnsConfig, Mdns};
299    /// use std::time::Duration;
300    ///
301    /// // Client-only configuration
302    /// let client = Mdns::new(MdnsConfig::default());
303    ///
304    /// // Server configuration
305    /// use std::net::{IpAddr, Ipv4Addr};
306    /// let server = Mdns::new(
307    ///     MdnsConfig::default()
308    ///         .with_local_names(vec!["myhost.local".to_string()])
309    ///         .with_local_ip(
310    ///             IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
311    ///         )
312    /// );
313    /// ```
314    pub fn new(config: MdnsConfig) -> Self {
315        let local_names = config
316            .local_names
317            .iter()
318            .map(|name| {
319                if name.ends_with('.') {
320                    name.clone()
321                } else {
322                    format!("{name}.")
323                }
324            })
325            .collect();
326
327        let query_interval = if config.query_interval == Duration::ZERO {
328            DEFAULT_QUERY_INTERVAL
329        } else {
330            config.query_interval
331        };
332
333        let query_timeout = config.query_timeout;
334
335        Self {
336            config,
337            local_names,
338            queries: Vec::new(),
339            next_query_id: 1,
340            query_interval,
341            query_timeout,
342            write_outs: VecDeque::new(),
343            event_outs: VecDeque::new(),
344            next_timeout: None,
345            closed: false,
346        }
347    }
348
349    /// Start a new mDNS query for the given name.
350    ///
351    /// This method queues an mDNS query packet to be sent. The query will be
352    /// automatically retried at the configured interval until either:
353    /// - An answer is received (emits [`MdnsEvent::QueryAnswered`])
354    /// - The query times out (emits [`MdnsEvent::QueryTimeout`] if `query_timeout` is set)
355    /// - The query is cancelled with [`cancel_query()`](Self::cancel_query)
356    /// - The connection is closed
357    ///
358    /// # Arguments
359    ///
360    /// * `name` - The hostname to query (e.g., `"mydevice.local"`)
361    ///
362    /// # Returns
363    ///
364    /// A unique [`QueryId`] that can be used to track this query.
365    ///
366    /// # Example
367    ///
368    /// ```rust
369    /// use rtc_mdns::{MdnsConfig, Mdns, MdnsEvent};
370    /// use sansio::Protocol;
371    ///
372    /// let mut mdns = Mdns::new(MdnsConfig::default());
373    ///
374    /// // Start a query
375    /// let query_id = mdns.query("printer.local");
376    ///
377    /// // The query packet is now queued
378    /// let packet = mdns.poll_write().expect("query packet should be queued");
379    /// assert_eq!(packet.transport.peer_addr.to_string(), "224.0.0.251:5353");
380    ///
381    /// // Track the query
382    /// assert!(mdns.is_query_pending(query_id));
383    /// ```
384    pub fn query(&mut self, name: &str) -> QueryId {
385        let name_with_suffix = if name.ends_with('.') {
386            name.to_string()
387        } else {
388            format!("{name}.")
389        };
390
391        let id = self.next_query_id;
392        self.next_query_id += 1;
393
394        let now = Instant::now();
395        let query = Query {
396            id,
397            name_with_suffix: name_with_suffix.clone(),
398            start_time: now,
399            next_retry: now + self.query_interval, // Schedule first retry after interval
400        };
401        self.queries.push(query);
402
403        // Send the initial query immediately
404        self.send_question(&name_with_suffix, now);
405
406        // Update timeout
407        self.update_next_timeout();
408
409        id
410    }
411
412    /// Cancel a pending query.
413    ///
414    /// Removes the query from the pending list. No more retry packets will
415    /// be sent and no events will be emitted for this query.
416    ///
417    /// # Arguments
418    ///
419    /// * `query_id` - The ID returned by [`query()`](Self::query)
420    ///
421    /// # Example
422    ///
423    /// ```rust
424    /// use rtc_mdns::{MdnsConfig, Mdns};
425    ///
426    /// let mut mdns = Mdns::new(MdnsConfig::default());
427    /// let query_id = mdns.query("device.local");
428    ///
429    /// assert!(mdns.is_query_pending(query_id));
430    /// mdns.cancel_query(query_id);
431    /// assert!(!mdns.is_query_pending(query_id));
432    /// ```
433    pub fn cancel_query(&mut self, query_id: QueryId) {
434        self.queries.retain(|q| q.id != query_id);
435        self.update_next_timeout();
436    }
437
438    /// Check if a query is still pending.
439    ///
440    /// A query is pending until it is either answered or cancelled.
441    ///
442    /// # Arguments
443    ///
444    /// * `query_id` - The ID returned by [`query()`](Self::query)
445    ///
446    /// # Returns
447    ///
448    /// `true` if the query is still waiting for an answer, `false` otherwise.
449    ///
450    /// # Example
451    ///
452    /// ```rust
453    /// use rtc_mdns::{MdnsConfig, Mdns};
454    ///
455    /// let mut mdns = Mdns::new(MdnsConfig::default());
456    /// let query_id = mdns.query("device.local");
457    ///
458    /// // Query is pending until answered or cancelled
459    /// assert!(mdns.is_query_pending(query_id));
460    /// ```
461    pub fn is_query_pending(&self, query_id: QueryId) -> bool {
462        self.queries.iter().any(|q| q.id == query_id)
463    }
464
465    /// Get the number of pending queries.
466    ///
467    /// # Returns
468    ///
469    /// The count of queries that are still waiting for answers.
470    ///
471    /// # Example
472    ///
473    /// ```rust
474    /// use rtc_mdns::{MdnsConfig, Mdns};
475    ///
476    /// let mut mdns = Mdns::new(MdnsConfig::default());
477    /// assert_eq!(mdns.pending_query_count(), 0);
478    ///
479    /// mdns.query("device1.local");
480    /// mdns.query("device2.local");
481    /// assert_eq!(mdns.pending_query_count(), 2);
482    /// ```
483    pub fn pending_query_count(&self) -> usize {
484        self.queries.len()
485    }
486
487    fn send_question(&mut self, name: &str, now: Instant) {
488        let packed_name = match Name::new(name) {
489            Ok(pn) => pn,
490            Err(err) => {
491                log::warn!("Failed to construct mDNS packet: {err}");
492                return;
493            }
494        };
495
496        let raw_query = {
497            let mut msg = Message {
498                header: Header::default(),
499                questions: vec![Question {
500                    typ: DnsType::A,
501                    class: DNSCLASS_INET,
502                    name: packed_name,
503                }],
504                ..Default::default()
505            };
506
507            match msg.pack() {
508                Ok(v) => v,
509                Err(err) => {
510                    log::error!("Failed to construct mDNS packet {err}");
511                    return;
512                }
513            }
514        };
515
516        log::trace!("Queuing mDNS query for {name}");
517        self.write_outs.push_back(TransportMessage {
518            now,
519            transport: TransportContext {
520                local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), MDNS_PORT),
521                peer_addr: MDNS_DEST_ADDR,
522                transport_protocol: TransportProtocol::UDP,
523                ecn: None,
524            },
525            message: BytesMut::from(&raw_query[..]),
526        });
527    }
528
529    fn send_answer(&mut self, local_ip: IpAddr, name: &str, now: Instant) {
530        let packed_name = match Name::new(name) {
531            Ok(n) => n,
532            Err(err) => {
533                log::warn!("Failed to pack name for answer: {err}");
534                return;
535            }
536        };
537
538        let raw_answer = {
539            let mut msg = Message {
540                header: Header {
541                    response: true,
542                    authoritative: true,
543                    ..Default::default()
544                },
545                answers: vec![Resource {
546                    header: ResourceHeader {
547                        typ: DnsType::A,
548                        class: DNSCLASS_INET,
549                        name: packed_name,
550                        ttl: RESPONSE_TTL,
551                        ..Default::default()
552                    },
553                    body: Some(Box::new(AResource {
554                        a: match local_ip {
555                            IpAddr::V4(ip) => ip.octets(),
556                            IpAddr::V6(_) => {
557                                log::warn!("Cannot send IPv6 address in A record");
558                                //TODO: How to support IPv6 mDNS?
559                                return;
560                            }
561                        },
562                    })),
563                }],
564                ..Default::default()
565            };
566
567            match msg.pack() {
568                Ok(v) => v,
569                Err(err) => {
570                    log::error!("Failed to pack answer: {err}");
571                    return;
572                }
573            }
574        };
575
576        log::trace!("mDNS Queuing answer for {name} -> {local_ip}");
577        self.write_outs.push_back(TransportMessage {
578            now,
579            transport: TransportContext {
580                local_addr: SocketAddr::new(local_ip, MDNS_PORT),
581                peer_addr: MDNS_DEST_ADDR,
582                transport_protocol: TransportProtocol::UDP,
583                ecn: None,
584            },
585            message: BytesMut::from(&raw_answer[..]),
586        });
587    }
588
589    fn process_message(&mut self, msg: &TaggedBytesMut) {
590        let mut parser = Parser::default();
591        if let Err(err) = parser.start(&msg.message) {
592            log::error!("Failed to parse mDNS packet: {err}");
593            return;
594        }
595
596        let src = msg.transport.peer_addr;
597
598        // Process questions (respond if we have local names)
599        self.process_questions(&mut parser, src, msg.now);
600
601        // Process answers (check if they match pending queries)
602        self.process_answers(&mut parser, src);
603    }
604
605    fn process_questions(&mut self, parser: &mut Parser<'_>, _src: SocketAddr, now: Instant) {
606        // Collect names that need answers first to avoid borrow issues
607        let mut names_to_answer: Vec<String> = Vec::new();
608
609        for _ in 0..=MAX_MESSAGE_RECORDS {
610            let q = match parser.question() {
611                Ok(q) => q,
612                Err(err) => {
613                    if err == Error::ErrSectionDone {
614                        break;
615                    }
616                    log::error!("Failed to parse question: {err}");
617                    return;
618                }
619            };
620
621            // Check if we should answer this question
622            for local_name in &self.local_names {
623                if *local_name == q.name.data {
624                    names_to_answer.push(q.name.data.clone());
625                    break;
626                }
627            }
628        }
629
630        // Skip remaining questions
631        let _ = parser.skip_all_questions();
632
633        // Now send answers
634        if let Some(local_ip) = self.config.local_ip {
635            for name in names_to_answer {
636                log::trace!(
637                    "mDNS Found question for local name: {}, responding with {}",
638                    name,
639                    local_ip
640                );
641                self.send_answer(local_ip, &name, now);
642            }
643        } else if !names_to_answer.is_empty() {
644            log::warn!("Received questions for local names but no local_addr configured");
645        }
646    }
647
648    fn process_answers(&mut self, parser: &mut Parser<'_>, src: SocketAddr) {
649        for _ in 0..=MAX_MESSAGE_RECORDS {
650            let answer_header = match parser.answer_header() {
651                Ok(a) => a,
652                Err(err) => {
653                    if err != Error::ErrSectionDone {
654                        log::warn!("Failed to parse answer header: {err}");
655                    }
656                    return;
657                }
658            };
659
660            // Only process A and AAAA records
661            if answer_header.typ != DnsType::A && answer_header.typ != DnsType::Aaaa {
662                continue;
663            }
664
665            let answer_resource = match parser.answer() {
666                Ok(a) => a,
667                Err(err) => {
668                    if err != Error::ErrSectionDone {
669                        log::warn!("Failed to parse answer: {err}");
670                    }
671                    return;
672                }
673            };
674
675            let local_ip = if let Some(body) = answer_resource.body
676                && let Some(a) = body.as_any().downcast_ref::<AResource>()
677            {
678                let local_ip = Ipv4Addr::from_octets(a.a).into();
679                if local_ip != src.ip() {
680                    warn!(
681                        "mDNS answers with different local ip on AResource {} vs src ip {} on Socket for query {}",
682                        local_ip,
683                        src.ip(),
684                        answer_header.name.data
685                    );
686                } else {
687                    trace!(
688                        "mDNS answers with the local ip {} on AResource and Socket for query {}",
689                        local_ip, answer_header.name.data
690                    );
691                }
692
693                local_ip
694            } else {
695                warn!(
696                    "mDNS answers without AResource, fallback to use src ip {} on Socket for local ip for query {}",
697                    src.ip(),
698                    answer_header.name.data
699                );
700                src.ip()
701            };
702
703            // Check if this answer matches any pending queries
704            let mut matched_query_ids = HashMap::new();
705            for query in &self.queries {
706                if query.name_with_suffix == answer_header.name.data {
707                    matched_query_ids.insert(query.id, local_ip);
708                }
709            }
710
711            // Emit events and remove matched queries
712            for (query_id, local_ip) in matched_query_ids {
713                self.event_outs
714                    .push_back(MdnsEvent::QueryAnswered(query_id, local_ip));
715                self.queries.retain(|q| q.id != query_id);
716            }
717        }
718    }
719
720    fn update_next_timeout(&mut self) {
721        self.next_timeout = self.queries.iter().map(|q| q.next_retry).min();
722    }
723}
724
725impl sansio::Protocol<TaggedBytesMut, (), ()> for Mdns {
726    type Rout = ();
727    type Wout = TaggedBytesMut;
728    type Eout = MdnsEvent;
729    type Error = Error;
730    type Time = Instant;
731
732    /// Process an incoming mDNS packet.
733    ///
734    /// Call this method when a UDP packet is received on the mDNS multicast
735    /// address (224.0.0.251:5353).
736    ///
737    /// The connection will:
738    /// - Parse the packet as an mDNS message
739    /// - If it contains questions for our `local_names`, queue response packets
740    /// - If it contains answers matching pending queries, emit events
741    ///
742    /// # Arguments
743    ///
744    /// * `msg` - The received packet with transport context
745    ///
746    /// # Errors
747    ///
748    /// Returns [`Error::ErrConnectionClosed`] if the connection has been closed.
749    ///
750    /// # Example
751    ///
752    /// ```rust,ignore
753    /// use bytes::BytesMut;
754    /// use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
755    /// use std::time::Instant;
756    ///
757    /// // When a packet arrives from the network:
758    /// let msg = TaggedBytesMut {
759    ///     now: Instant::now(),
760    ///     transport: TransportContext {
761    ///         local_addr: "0.0.0.0:5353".parse().unwrap(),
762    ///         peer_addr: src_addr,
763    ///         transport_protocol: TransportProtocol::UDP,
764    ///         ecn: None,
765    ///     },
766    ///     message: BytesMut::from(&packet_data[..]),
767    /// };
768    /// mdns.handle_read(msg)?;
769    ///
770    /// // Check for events
771    /// while let Some(event) = mdns.poll_event() {
772    ///     // handle event
773    /// }
774    /// ```
775    fn handle_read(&mut self, msg: TaggedBytesMut) -> Result<()> {
776        if self.closed {
777            return Err(Error::ErrConnectionClosed);
778        }
779        self.process_message(&msg);
780        self.update_next_timeout();
781        Ok(())
782    }
783
784    /// mDNS doesn't produce read outputs.
785    ///
786    /// Answers to queries are delivered via `poll_event()`
787    /// as [`MdnsEvent::QueryAnswered`] events instead.
788    ///
789    /// # Returns
790    ///
791    /// Always returns `None`.
792    fn poll_read(&mut self) -> Option<Self::Rout> {
793        None
794    }
795
796    /// Handle write requests (not used).
797    ///
798    /// Queries are initiated via the [`query()`](Mdns::query) method instead
799    /// of through this interface.
800    fn handle_write(&mut self, _msg: ()) -> Result<()> {
801        Ok(())
802    }
803
804    /// Get the next packet to send.
805    ///
806    /// Call this method repeatedly until it returns `None` to retrieve all
807    /// queued packets. Packets should be sent via UDP to the address specified
808    /// in `packet.transport.peer_addr` (typically 224.0.0.251:5353).
809    ///
810    /// Packets are queued when:
811    /// - A query is started with [`query()`](Mdns::query)
812    /// - A query retry is triggered by `handle_timeout()`
813    /// - A response is generated for a matching question
814    ///
815    /// # Returns
816    ///
817    /// The next packet to send, or `None` if the queue is empty.
818    ///
819    /// # Example
820    ///
821    /// ```rust
822    /// use rtc_mdns::{MdnsConfig, Mdns};
823    /// use sansio::Protocol;
824    ///
825    /// let mut mdns = Mdns::new(MdnsConfig::default());
826    /// mdns.query("device.local");
827    ///
828    /// // Send all queued packets
829    /// while let Some(packet) = mdns.poll_write() {
830    ///     // socket.send_to(&packet.message, packet.transport.peer_addr).await?;
831    ///     println!("Send to {}", packet.transport.peer_addr);
832    /// }
833    /// ```
834    fn poll_write(&mut self) -> Option<Self::Wout> {
835        self.write_outs.pop_front()
836    }
837
838    /// Handle external events (not used).
839    ///
840    /// mDNS does not use external events. This method does nothing.
841    fn handle_event(&mut self, _evt: ()) -> Result<()> {
842        Ok(())
843    }
844
845    /// Get the next event.
846    ///
847    /// Call this method repeatedly until it returns `None` to process all
848    /// queued events. Events are generated when:
849    /// - An mDNS answer matches a pending query ([`MdnsEvent::QueryAnswered`])
850    ///
851    /// # Returns
852    ///
853    /// The next event, or `None` if the queue is empty.
854    ///
855    /// # Example
856    ///
857    /// ```rust,ignore
858    /// while let Some(event) = mdns.poll_event() {
859    ///     match event {
860    ///         MdnsEvent::QueryAnswered(query_id, addr) => {
861    ///             println!("Query {} resolved to {}", query_id, addr);
862    ///         }
863    ///         MdnsEvent::QueryTimeout(id) => {
864    ///             println!("Query {} timed out", id);
865    ///         }
866    ///     }
867    /// }
868    /// ```
869    fn poll_event(&mut self) -> Option<Self::Eout> {
870        self.event_outs.pop_front()
871    }
872
873    /// Handle timeout - retry pending queries.
874    ///
875    /// Call this method when the deadline from `poll_timeout()`
876    /// is reached. This triggers retry logic for pending queries.
877    ///
878    /// For each query whose retry time has passed, a new query packet will
879    /// be queued and can be retrieved with `poll_write()`.
880    ///
881    /// # Arguments
882    ///
883    /// * `now` - The current time
884    ///
885    /// # Errors
886    ///
887    /// Returns [`Error::ErrConnectionClosed`] if the connection has been closed.
888    ///
889    /// # Example
890    ///
891    /// ```rust
892    /// use rtc_mdns::{MdnsConfig, Mdns};
893    /// use sansio::Protocol;
894    /// use std::time::{Duration, Instant};
895    ///
896    /// let mut mdns = Mdns::new(
897    ///     MdnsConfig::default().with_query_interval(Duration::from_millis(100))
898    /// );
899    /// mdns.query("device.local");
900    ///
901    /// // Consume initial packet
902    /// mdns.poll_write();
903    ///
904    /// // Simulate time passing
905    /// let future = Instant::now() + Duration::from_millis(150);
906    /// mdns.handle_timeout(future).unwrap();
907    ///
908    /// // A retry packet should be queued
909    /// assert!(mdns.poll_write().is_some());
910    /// ```
911    fn handle_timeout(&mut self, now: Self::Time) -> Result<()> {
912        if self.closed {
913            return Err(Error::ErrConnectionClosed);
914        }
915
916        if let Some(next_timeout) = self.next_timeout.as_ref()
917            && next_timeout <= &now
918        {
919            // Check for timed out queries first
920            if let Some(timeout_duration) = self.query_timeout {
921                let mut timed_out_ids = Vec::new();
922                for query in &self.queries {
923                    if now.duration_since(query.start_time) >= timeout_duration {
924                        timed_out_ids.push(query.id);
925                    }
926                }
927
928                // Emit timeout events and remove timed out queries
929                for query_id in timed_out_ids {
930                    log::debug!(
931                        "mDNS Query {} timed out after {:?}",
932                        query_id,
933                        timeout_duration
934                    );
935                    self.event_outs.push_back(MdnsEvent::QueryTimeout(query_id));
936                    self.queries.retain(|q| q.id != query_id);
937                }
938            }
939
940            // Retry queries that are due
941            let mut names_to_query = Vec::new();
942            for query in &mut self.queries {
943                if query.next_retry <= now {
944                    names_to_query.push(query.name_with_suffix.clone());
945                    query.next_retry = now + self.query_interval;
946                }
947            }
948
949            for name in names_to_query {
950                self.send_question(&name, now);
951            }
952
953            self.update_next_timeout();
954        }
955        Ok(())
956    }
957
958    /// Get the next timeout deadline.
959    ///
960    /// Returns the time at which `handle_timeout()` should
961    /// be called next. Use this to schedule your event loop's sleep/wait.
962    ///
963    /// # Returns
964    ///
965    /// - `Some(instant)` if there are pending queries that need retries
966    /// - `None` if there are no pending queries
967    ///
968    /// # Example
969    ///
970    /// ```rust
971    /// use rtc_mdns::{MdnsConfig, Mdns};
972    /// use sansio::Protocol;
973    ///
974    /// let mut mdns = Mdns::new(MdnsConfig::default());
975    ///
976    /// // No queries, no timeout
977    /// assert!(mdns.poll_timeout().is_none());
978    ///
979    /// // Start a query
980    /// mdns.query("device.local");
981    ///
982    /// // Now there's a timeout scheduled
983    /// assert!(mdns.poll_timeout().is_some());
984    /// ```
985    fn poll_timeout(&mut self) -> Option<Self::Time> {
986        self.next_timeout
987    }
988
989    /// Close the connection.
990    ///
991    /// This clears all pending queries and queued packets/events.
992    /// After closing, `handle_read()` and
993    /// `handle_timeout()` will return
994    /// [`Error::ErrConnectionClosed`].
995    ///
996    /// # Example
997    ///
998    /// ```rust
999    /// use rtc_mdns::{MdnsConfig, Mdns};
1000    /// use sansio::Protocol;
1001    ///
1002    /// let mut mdns = Mdns::new(MdnsConfig::default());
1003    /// mdns.query("device.local");
1004    ///
1005    /// assert_eq!(mdns.pending_query_count(), 1);
1006    ///
1007    /// mdns.close().unwrap();
1008    ///
1009    /// // All state is cleared
1010    /// assert_eq!(mdns.pending_query_count(), 0);
1011    /// assert!(mdns.poll_write().is_none());
1012    /// assert!(mdns.poll_timeout().is_none());
1013    /// ```
1014    fn close(&mut self) -> Result<()> {
1015        self.closed = true;
1016        self.queries.clear();
1017        self.write_outs.clear();
1018        self.event_outs.clear();
1019        self.next_timeout = None;
1020        Ok(())
1021    }
1022}
1023
1024#[cfg(test)]
1025mod mdns_test;