async_coap/
local_endpoint.rs

1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use super::*;
17
18use super::remote_endpoint::RemoteEndpoint;
19use futures::stream::Collect;
20use std::sync::Arc;
21
22/// Trait representing a local (as opposed to remote) CoAP endpoint. Allows for sending and
23/// receiving CoAP requests.
24///
25/// # Implementations
26///
27/// `LocalEndpoint` is a trait, which allows for multiple back-end implementations.
28/// `async-coap` comes with two: [`NullLocalEndpoint`] and [`DatagramLocalEndpoint`].
29///
30/// [`NullLocalEndpoint`] does what you might expect: nothing. Attempts to send
/// requests always result in [`Error::ResponseTimeout`] and [`LocalEndpoint::receive`]
32/// will block indefinitely. Creating an instance of it is quite straightforward:
33///
34/// [`NullLocalEndpoint`]: crate::null::NullLocalEndpoint
35/// [`DatagramLocalEndpoint`]: crate::datagram::DatagramLocalEndpoint
36///
37/// ```
38/// use std::sync::Arc;
39/// use async_coap::null::NullLocalEndpoint;
40///
41/// let local_endpoint = Arc::new(NullLocalEndpoint);
42/// ```
43///
44/// If you want to do something more useful, then [`DatagramLocalEndpoint`] is likely
45/// what you are looking for. It takes an instance of [`AsyncDatagramSocket`] at construction:
46///
47/// [`AsyncDatagramSocket`]: crate::datagram::AsyncDatagramSocket
48///
49/// ```
50/// use std::sync::Arc;
51/// use async_coap::prelude::*;
52/// use async_coap::datagram::{DatagramLocalEndpoint,AllowStdUdpSocket};
53///
/// // `AllowStdUdpSocket` is an (inefficient) wrapper around the
/// // standard rust `UdpSocket`. It is convenient for testing and for examples
56/// // but should not be used in production code.
57/// let socket = AllowStdUdpSocket::bind("[::]:0").expect("UDP bind failed");
58///
59/// // Create a new local endpoint from the socket instance we just created,
/// // wrapping it in an `Arc<>` to ensure it can live long enough.
61/// let local_endpoint = Arc::new(DatagramLocalEndpoint::new(socket));
62/// ```
63///
64/// # Client Usage
65///
66/// Before you can start sending requests and receiving responses, you
67/// will need to make sure that the [`LocalEndpoint::receive`] method
68/// gets called repeatedly. The easiest way to do that is to add the
69/// [`std::future::Future`] returned by [`LocalEndpointExt::receive_loop_arc`]
70/// to an execution pool:
71///
72/// ```
73/// # use std::sync::Arc;
74/// # use async_coap::prelude::*;
75/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket};
76/// # use async_coap::null::NullLocalEndpoint;
77/// #
78/// # let local_endpoint = Arc::new(NullLocalEndpoint);
79/// #
80/// use futures::{prelude::*,executor::ThreadPool,task::Spawn,task::SpawnExt};
81///
82/// let mut pool = ThreadPool::new().expect("Unable to create thread pool");
83///
84/// // We use a receiver handler of `null_receiver!()` because this instance
85/// // will be used purely as a client, not a server.
86/// pool.spawn(local_endpoint
87///     .clone()
88///     .receive_loop_arc(null_receiver!())
89///     .map(|_|unreachable!())
90/// );
91/// ```
92///
/// Once the `Arc<LocalEndpoint>` has been added to an execution pool, the `run_until` method
/// on the pool can be used to block on the futures emitted by `LocalEndpoint`:
95///
96/// ```
97/// # use std::sync::Arc;
98/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
99/// # use async_coap::prelude::*;
100/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket};
101/// # use async_coap::null::NullLocalEndpoint;
102/// #
103/// # // Using a NullLocalEndpoint since this is just a simple usage example.
104/// # let local_endpoint = Arc::new(NullLocalEndpoint);
105/// # let mut local_pool = LocalPool::new();
106/// #
107/// # local_pool.spawner().spawn_local(local_endpoint
108/// #     .clone()
109/// #     .receive_loop_arc(null_receiver!())
110/// #     .map(|_|unreachable!())
111/// # );
112///
113/// let result = local_pool.run_until(
114///     local_endpoint.send(
115///         "coap.me:5683",
116///         CoapRequest::get()       // This is a CoAP GET request
117///             .emit_any_response() // Return the first response we get
118///     )
119/// );
120///
121/// println!("result: {:?}", result);
122/// ```
123///
124/// Or, more naturally, the returned futures can be used directly in `async` blocks:
125///
126/// ```
127/// # #![feature(async_await)]
128/// # use std::sync::Arc;
129/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
130/// # use async_coap::prelude::*;
131/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket};
132/// # use async_coap::null::NullLocalEndpoint;
133/// #
134/// # // Using a NullLocalEndpoint since this is just a simple usage example.
135/// # let local_endpoint = Arc::new(NullLocalEndpoint);
136/// # let mut pool = LocalPool::new();
137/// #
138/// # pool.spawner().spawn_local(local_endpoint
139/// #     .clone()
140/// #     .receive_loop_arc(null_receiver!())
141/// #     .map(|_|unreachable!())
142/// # );
143/// #
144/// # let future =
145/// async move {
146///     let future = local_endpoint.send(
147///         "coap.me:5683",
148///         CoapRequest::get()       // This is a CoAP GET request
149///             .emit_any_response() // Return the first response we get
150///     );
151///
152///     // Wait for the final result and print it.
153///     println!("result: {:?}", future.await);
154/// }
155/// # ;
156/// #
157/// # pool.run_until(future);
158/// ```
159///
160/// # Server Usage
161///
162/// In order to serve resources for other devices to interact with, you will
163/// need to replace the [`null_receiver!`] we were using earlier with something
164/// more substantial. The method takes a closure as an argument, and the closure
165/// itself has a single argument: a borrowed [`RespondableInboundContext`].
166///
167/// For example, to have our server return a response for a request instead of
168/// just returning an error, we could use the following function as our receive handler:
169///
170/// ```
171/// use async_coap::prelude::*;
172/// use async_coap::{RespondableInboundContext, Error};
173///
174/// fn receive_handler<T: RespondableInboundContext>(context: &T) -> Result<(),Error> {
175///     context.respond(|msg_out|{
176///         msg_out.set_msg_code(MsgCode::SuccessContent);
177///         msg_out.insert_option(option::CONTENT_FORMAT, ContentFormat::TEXT_PLAIN_UTF8)?;
178///         msg_out.append_payload_string("Successfully fetched!")?;
179///         Ok(())
180///     })?;
181///     Ok(())
182/// }
183/// # use std::sync::Arc;
184/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
185/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket, LoopbackSocketAddr};
186/// # use async_coap::null::NullLocalEndpoint;
187/// # use async_coap::message::MessageRead;
188/// #
189/// # let local_endpoint = Arc::new(DatagramLocalEndpoint::new(LoopbackSocket::new()));
190/// # let mut pool = LocalPool::new();
191/// #
192/// # pool.spawner().spawn_local(local_endpoint.clone().receive_loop_arc(receive_handler).map(|_|unreachable!()));
193/// #
194/// # let result = pool.run_until(
195/// #     local_endpoint.send(
196/// #         LoopbackSocketAddr::Unicast,
197/// #         CoapRequest::get()       // This is a CoAP GET request
198/// #             .emit_any_response() // Return the first response we get
199/// #     )
200/// # );
201/// # println!("result: {:?}", result);
202/// # let result = result.unwrap();
203/// # assert_eq!(result.msg_code(), MsgCode::SuccessContent);
204/// # assert_eq!(result.msg_type(), MsgType::Ack);
205/// ```
206///
207/// However, that's actually not super useful: it returns a successful result for
208/// every possible request: including bogus ones. Let's say that we wanted to expose a
209/// resource that lives at "`/test`" on our server, returning a [`4.04 Not Found`](MsgCode::ClientErrorNotFound)
210/// for every other request. That might look something like this:
211///
212/// ```
213/// use async_coap::prelude::*;
214/// use async_coap::{RespondableInboundContext, Error, LinkFormatWrite, LINK_ATTR_TITLE};
215/// use core::fmt::Write; // For `write!()`
216/// use core::borrow::Borrow;
217/// use option::CONTENT_FORMAT;
218///
219/// fn receive_handler<T: RespondableInboundContext>(context: &T) -> Result<(),Error> {
220///     let msg = context.message();
221///     let uri = msg.options().extract_uri()?;
222///     let decoded_path = uri.raw_path().unescape_uri().skip_slashes().to_cow();
223///
224///     match (msg.msg_code(), decoded_path.borrow()) {
225///         // Handle GET /test
226///         (MsgCode::MethodGet, "test") => context.respond(|msg_out| {
227///             msg_out.set_msg_code(MsgCode::SuccessContent);
///             msg_out.insert_option(CONTENT_FORMAT, ContentFormat::TEXT_PLAIN_UTF8)?;
229///             write!(msg_out,"Successfully fetched {:?}!", uri.as_str())?;
230///             Ok(())
231///         }),
232///
233///         // Handle GET /.well-known/core, for service discovery.
234///         (MsgCode::MethodGet, ".well-known/core") => context.respond(|msg_out| {
235///             msg_out.set_msg_code(MsgCode::SuccessContent);
///             msg_out.insert_option(CONTENT_FORMAT, ContentFormat::APPLICATION_LINK_FORMAT)?;
237///             LinkFormatWrite::new(msg_out)
238///                 .link(uri_ref!("/test"))
239///                     .attr(LINK_ATTR_TITLE, "Test Resource")
240///                     .finish()?;
241///             Ok(())
242///         }),
243///
244///         // Handle unsupported methods
245///         (_, "test") | (_, ".well-known/core") => context.respond(|msg_out| {
///             msg_out.set_msg_code(MsgCode::ClientErrorMethodNotAllowed);
247///             write!(msg_out,"Method \"{:?}\" Not Allowed", msg.msg_code())?;
248///             Ok(())
249///         }),
250///
251///         // Everything else is a 4.04
252///         (_, _) => context.respond(|msg_out| {
253///             msg_out.set_msg_code(MsgCode::ClientErrorNotFound);
254///             write!(msg_out,"{:?} Not Found", uri.as_str())?;
255///             Ok(())
256///         }),
257///     }
258/// }
259/// # use std::sync::Arc;
260/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
261/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket, LoopbackSocketAddr};
262/// # use async_coap::null::NullLocalEndpoint;
263/// # use async_coap::message::MessageRead;
264/// # use std::borrow::Cow;
265/// #
266/// # let local_endpoint = Arc::new(DatagramLocalEndpoint::new(LoopbackSocket::new()));
267/// # let mut pool = LocalPool::new();
268/// #
269/// # pool.spawner().spawn_local(local_endpoint
270/// #     .clone()
271/// #     .receive_loop_arc(receive_handler)
272/// #     .map(|_|unreachable!())
273/// # );
274/// #
275/// # let result = pool.run_until(
276/// #     local_endpoint.send(
277/// #         LoopbackSocketAddr::Unicast,
278/// #         CoapRequest::get()       // This is a CoAP GET request
279/// #             .uri_host_path(None, rel_ref!("test")) // Add a path to the message
280/// #             .emit_any_response() // Return the first response we get
281/// #     )
282/// # );
283/// # println!("result: {:?}", result);
284/// # let result = result.unwrap();
285/// # assert_eq!(result.msg_code(), MsgCode::SuccessContent);
286/// # assert_eq!(result.msg_type(), MsgType::Ack);
287/// #
288/// #
289/// # let result = pool.run_until(
290/// #     local_endpoint.send(
291/// #         LoopbackSocketAddr::Unicast,
292/// #         CoapRequest::post()       // This is a CoAP POST request
293/// #             .uri_host_path(None, rel_ref!("test")) // Add a path to the message
294/// #             .emit_successful_response() // Return the first successful response we get
295/// #             .inspect(|cx| {
296/// #                 // Inspect here since we currently can't do
297/// #                 // a detailed check in the return value.
298/// #                 assert_eq!(cx.message().msg_code(), MsgCode::ClientErrorMethodNotAllowed);
299/// #                 assert_eq!(cx.message().msg_type(), MsgType::Ack);
300/// #             })
301/// #     )
302/// # );
303/// # println!("result: {:?}", result);
304/// # assert_eq!(result.err(), Some(Error::ClientRequestError));
305/// #
306/// # let result = pool.run_until(
307/// #     local_endpoint.send(
308/// #         LoopbackSocketAddr::Unicast,
309/// #         CoapRequest::get()       // This is a CoAP GET request
310/// #             .emit_successful_response() // Return the first successful response we get
311/// #             .uri_host_path(None, rel_ref!("/foobar"))
312/// #             .inspect(|cx| {
313/// #                 // Inspect here since we currently can't do
314/// #                 // a detailed check in the return value.
315/// #                 assert_eq!(cx.message().msg_code(), MsgCode::ClientErrorNotFound);
316/// #                 assert_eq!(cx.message().msg_type(), MsgType::Ack);
317/// #             })
318/// #     )
319/// # );
320/// # println!("result: {:?}", result);
321/// # assert_eq!(result.err(), Some(Error::ResourceNotFound));
322/// ```
323///
pub trait LocalEndpoint: Sized {
    /// The `SocketAddr` type to use with this local endpoint. This is usually
    /// simply `std::net::SocketAddr`, but may be different in some cases (like for CoAP-SMS
    /// endpoints).
    type SocketAddr: SocketAddrExt
        + ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError>;

    /// The error type associated with errors generated by socket and address-lookup operations.
    /// Typically, this is `std::io::Error`, but it may be different if `Self::SocketAddr` isn't
    /// `std::net::SocketAddr`.
    type SocketError: core::fmt::Debug;

    /// The trait representing the default transmission parameters to use.
    type DefaultTransParams: TransParams;

    /// Type used by closure that is passed into `send()`, representing the context for the
    /// response.
    type InboundContext: InboundContext<SocketAddr = Self::SocketAddr>;

    /// Type used by closure that is passed into `receive()`, representing the context for
    /// inbound requests.
    type RespondableInboundContext: RespondableInboundContext<SocketAddr = Self::SocketAddr>;

    /// Returns a string representing the scheme of the underlying transport.
    /// For example, this could return `"coap"`, `"coaps+sms"`, etc.
    fn scheme(&self) -> &str;

    /// Returns the default port to use when the port is unspecified. This value
    /// is typically defined by the scheme. Returns zero if port numbers are ignored
    /// by the underlying technology.
    fn default_port(&self) -> u16;

    /// The concrete return type of the `lookup()` method: a stream that yields
    /// `Self::SocketAddr` items.
    type LookupStream: Stream<Item = Self::SocketAddr> + Unpin;

    /// Method for asynchronously looking up the `Self::SocketAddr` instances for the
    /// given hostname and port.
    ///
    /// On success the addresses are delivered through the returned `Self::LookupStream`.
    fn lookup(&self, hostname: &str, port: u16) -> Result<Self::LookupStream, Error>;

    /// The concrete type for a `RemoteEndpoint` associated with this local endpoint.
    ///
    /// It shares this endpoint's `SocketAddr` and `InboundContext` types.
    type RemoteEndpoint: RemoteEndpoint<
        SocketAddr = Self::SocketAddr,
        InboundContext = Self::InboundContext,
    >;

    /// Constructs a new [`RemoteEndpoint`] instance for the given address, host, and path.
    ///
    /// * `addr`: anything implementing `ToSocketAddrs` for this endpoint's `SocketAddr`.
    /// * `host`: an optional host, convertible into a `String`.
    /// * `path`: a value convertible into a `RelRefBuf`.
    fn remote_endpoint<S, H, P>(&self, addr: S, host: Option<H>, path: P) -> Self::RemoteEndpoint
    where
        S: ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError>,
        H: Into<String>,
        P: Into<RelRefBuf>;

    /// Constructs a new [`RemoteEndpoint`] instance for the given Uri.
    fn remote_endpoint_from_uri(&self, uri: &Uri) -> Result<Self::RemoteEndpoint, Error>;

    /// Sends a message to `remote_addr` based on the criteria provided by
    /// [`send_desc`][crate::SendDesc].
    ///
    /// `send_desc`, which implements [`SendDesc`][crate::SendDesc], is the real heavy lifter here.
    /// It defines the message content, retransmit timing, resending logic---even the
    /// return type of this method is defined by `send_desc`.
    /// This flexibility allows this method to uniformly perform complex interactions
    /// like [block transfers][IETF-RFC7959] and [resource observing][IETF-RFC7641].
    ///
    /// [IETF-RFC7959]: https://tools.ietf.org/html/rfc7959
    /// [IETF-RFC7641]: https://tools.ietf.org/html/rfc7641
    ///
    /// A variant of this method, [`LocalEndpointExt::send_as_stream`], is used to
    /// handle cases where multiple responses are expected, such as when sending
    /// multicast requests or doing [resource observing][IETF-RFC7641].
    ///
    /// ## Performance Tips
    ///
    /// If you are going to be calling this method frequently for a destination that you are
    /// referencing by a hostname, it will significantly improve performance on some platforms
    /// if you only pass `SocketAddr` types to `remote_addr` and not rely on having `ToSocketAddrs`
    /// do hostname lookups inside of `send`.
    ///
    /// The easiest way to do this is to use either the [`remote_endpoint`] or
    /// [`remote_endpoint_from_uri`] methods to create a [`RemoteEndpoint`] instance
    /// and call the [`send`][crate::RemoteEndpoint::send] method on that instead. From that
    /// instance you can call `send` multiple times: any hostname that needs to be resolved
    /// is calculated and cached when the `RemoteEndpoint` is first created.
    ///
    /// [`RemoteEndpoint`]: crate::RemoteEndpoint
    /// [`remote_endpoint`]: LocalEndpoint::remote_endpoint
    /// [`remote_endpoint_from_uri`]: LocalEndpoint::remote_endpoint_from_uri
    ///
    /// ## Transaction Tracking
    ///
    /// All state tracking the transmission of the message is stored in the returned future.
    /// To cancel retransmits, drop the returned future.
    ///
    /// The returned future is lazily evaluated, so nothing will be transmitted unless the
    /// returned future is polled: you cannot simply fire and forget. Because of this lazy
    /// evaluation, the futures returned by this method do not need to be used immediately and
    /// may be stored for later use if that happens to be useful.
    ///
    #[must_use = "nothing will be sent unless the returned future is polled"]
    fn send<'a, S, R, SD>(
        &'a self,
        remote_addr: S,
        send_desc: SD,
    ) -> BoxFuture<'a, Result<R, Error>>
    where
        S: ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError> + 'a,
        SD: SendDesc<Self::InboundContext, R> + 'a,
        R: Send + 'a;

    /// Receives a single request and runs the given `handler` on it once.
    ///
    /// Each call handles (at most) one single inbound request.
    /// To handle multiple requests, call this function from a loop.
    /// The [`LocalEndpointExt`] trait comes with some helpers to make
    /// implementing such a loop easier: [`receive_as_stream`],
    /// [`receive_loop`], and [`receive_loop_arc`].
    ///
    /// [`receive_as_stream`]: LocalEndpointExt::receive_as_stream
    /// [`receive_loop`]: LocalEndpointExt::receive_loop
    /// [`receive_loop_arc`]: LocalEndpointExt::receive_loop_arc
    ///
    /// *Properly calling this method in the background is absolutely critical to
    /// the correct operation of this trait:* **[`send`] will not work without it**.
    ///
    /// [`send`]: LocalEndpoint::send
    ///
    /// Local endpoints which implement [`Sync`] can have this method called from multiple
    /// threads, allowing multiple requests to be handled concurrently.
    ///
    /// ## Handler
    ///
    /// If you are going to be serving resources using this [`LocalEndpoint`], you
    /// will need to specify a handler to handle inbound requests.
    /// See the section [Server Usage](#server-usage) above for an example.
    ///
    /// If instead you are only using this [`LocalEndpoint`] as a client, then you may pass
    /// `null_receiver!()` as the handler, as shown in [Client Usage](#client-usage).
    #[must_use = "nothing will be received unless the returned future is polled"]
    fn receive<'a, F>(&'a self, handler: F) -> BoxFuture<'a, Result<(), Error>>
    where
        F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Send + Unpin;
}
466
/// A do-nothing handler for [`LocalEndpoint::receive`].
///
/// Expands to a closure that ignores the inbound context it is handed and returns
/// `Ok(())`, leaving the underlying [`LocalEndpoint`] implementation to decide how
/// best to respond (if at all).
#[macro_export]
macro_rules! null_receiver {
    () => {
        |_inbound_context| Ok(())
    };
}
475
476/// Extension trait for [`LocalEndpoint`] which implements additional helper methods.
477pub trait LocalEndpointExt: LocalEndpoint {
478    /// Sends a message where multiple responses are expected, returned as a [`SendAsStream`].
479    ///
480    /// In this version of [`LocalEndpoint::send`], the `send_desc` can return
481    /// [`ResponseStatus::Done`] from its handler multiple times, with the results being emitted
482    /// from the returned [`SendAsStream`].
483    ///
484    /// The stream can be cleanly ended by the handler eventually returning
485    /// [`Error::ResponseTimeout`] or [`Error::Cancelled`], neither of which will be emitted
486    /// as an error.
487    fn send_as_stream<'a, S, R, SD>(&'a self, dest: S, send_desc: SD) -> SendAsStream<'a, R>
488    where
489        S: ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError> + 'a,
490        SD: SendDesc<Self::InboundContext, R> + 'a,
491        R: Send + 'a,
492    {
493        let (sender, receiver) = futures::channel::mpsc::channel::<Result<R, Error>>(10);
494
495        SendAsStream {
496            receiver,
497            send_future: self.send(dest, SendAsStreamDesc::new(send_desc, sender)),
498        }
499    }
500
501    /// Version of [`LocalEndpoint::receive`] that handles more than one inbound message,
502    /// returning a [`crate::ReceiveAsStream`] instead of a future.
503    ///
504    /// This stream will terminate immediately after any of the following errors are emitted by the
505    /// underlying calls to [`LocalEndpoint::receive`]:
506    ///
507    /// * [`Error::IOError`](enum_Error.html#variant.IOError)
508    /// * [`Error::Cancelled`](enum_Error.html#variant.Cancelled)
509    ///
510    /// All other errors are ignored.
511    fn receive_as_stream<'a, F>(&'a self, handler: F) -> ReceiveAsStream<'a, Self, F>
512    where
513        F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
514    {
515        ReceiveAsStream::new(self, handler)
516    }
517
518    /// Convenience method for implementing a [`receive`](LocalEndpoint::receive) loop.
519    ///
520    /// The returned future will terminate when the underlying [`crate::ReceiveAsStream`]
521    /// terminates, returning the error that was emitted before the stream terminated,
522    /// typically either [`Error::IOError`] or [`Error::Cancelled`].
523    fn receive_loop<'a, F>(&'a self, handler: F) -> Collect<ReceiveAsStream<'a, Self, F>, Error>
524    where
525        F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
526    {
527        self.receive_as_stream(handler).collect()
528    }
529
530    /// Version of [`LocalEndpointExt::receive_loop`] which consumes and holds an [`Arc<Self>`].
531    ///
532    /// [`LocalEndpoint`s][LocalEndpoint] are often held inside of an [`Arc<>`], which makes
533    /// using methods like [`LocalEndpointExt::receive_loop`] particularly awkward.
534    ///
535    /// `receive_loop_arc` makes this situation relatively painless by returning the receive loop
536    /// future in an (effectively transparent) [`ArcGuard`] wrapper.
537    ///
538    /// ```
539    /// # #![feature(async_await)]
540    /// #
541    /// # use std::sync::Arc;
542    /// # use futures::prelude::*;
543    /// # use async_coap::prelude::*;
544    /// # use async_coap::null::NullLocalEndpoint;
545    /// # use futures::executor::ThreadPool;
546    /// # use futures::task::SpawnExt;
547    ///
548    /// let local_endpoint = Arc::new(NullLocalEndpoint);
549    /// let mut pool = ThreadPool::new().expect("Unable to start thread pool");
550    ///
551    /// pool.spawn(local_endpoint
552    ///     .clone()
553    ///     .receive_loop_arc(null_receiver!())
554    ///     .map(|err| panic!("Receive loop terminated: {}", err))
555    /// );
556    /// ```
557    fn receive_loop_arc<'a, F>(
558        self: Arc<Self>,
559        handler: F,
560    ) -> ArcGuard<Self, Collect<ReceiveAsStream<'a, Self, F>, Error>>
561    where
562        F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Send + Unpin,
563        Self: 'a,
564    {
565        self.guard(|x| x.receive_loop(handler))
566    }
567}
568
569/// Blanket implementation of `LocalEndpointExt` for all `LocalEndpoint` instances.
570impl<T: LocalEndpoint> LocalEndpointExt for T {}