async_coap/local_endpoint.rs
1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use super::*;
17
18use super::remote_endpoint::RemoteEndpoint;
19use futures::stream::Collect;
20use std::sync::Arc;
21
22/// Trait representing a local (as opposed to remote) CoAP endpoint. Allows for sending and
23/// receiving CoAP requests.
24///
25/// # Implementations
26///
27/// `LocalEndpoint` is a trait, which allows for multiple back-end implementations.
28/// `async-coap` comes with two: [`NullLocalEndpoint`] and [`DatagramLocalEndpoint`].
29///
30/// [`NullLocalEndpoint`] does what you might expect: nothing. Attempts to send
/// requests always result in [`Error::ResponseTimeout`] and [`LocalEndpoint::receive`]
32/// will block indefinitely. Creating an instance of it is quite straightforward:
33///
34/// [`NullLocalEndpoint`]: crate::null::NullLocalEndpoint
35/// [`DatagramLocalEndpoint`]: crate::datagram::DatagramLocalEndpoint
36///
37/// ```
38/// use std::sync::Arc;
39/// use async_coap::null::NullLocalEndpoint;
40///
41/// let local_endpoint = Arc::new(NullLocalEndpoint);
42/// ```
43///
44/// If you want to do something more useful, then [`DatagramLocalEndpoint`] is likely
45/// what you are looking for. It takes an instance of [`AsyncDatagramSocket`] at construction:
46///
47/// [`AsyncDatagramSocket`]: crate::datagram::AsyncDatagramSocket
48///
49/// ```
50/// use std::sync::Arc;
51/// use async_coap::prelude::*;
52/// use async_coap::datagram::{DatagramLocalEndpoint,AllowStdUdpSocket};
53///
/// // `AllowStdUdpSocket` is an (inefficient) wrapper around the
/// // standard Rust `UdpSocket`. It is convenient for testing and for examples
56/// // but should not be used in production code.
57/// let socket = AllowStdUdpSocket::bind("[::]:0").expect("UDP bind failed");
58///
59/// // Create a new local endpoint from the socket instance we just created,
/// // wrapping it in an `Arc<>` to ensure it can live long enough.
61/// let local_endpoint = Arc::new(DatagramLocalEndpoint::new(socket));
62/// ```
63///
64/// # Client Usage
65///
66/// Before you can start sending requests and receiving responses, you
67/// will need to make sure that the [`LocalEndpoint::receive`] method
68/// gets called repeatedly. The easiest way to do that is to add the
69/// [`std::future::Future`] returned by [`LocalEndpointExt::receive_loop_arc`]
70/// to an execution pool:
71///
72/// ```
73/// # use std::sync::Arc;
74/// # use async_coap::prelude::*;
75/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket};
76/// # use async_coap::null::NullLocalEndpoint;
77/// #
78/// # let local_endpoint = Arc::new(NullLocalEndpoint);
79/// #
80/// use futures::{prelude::*,executor::ThreadPool,task::Spawn,task::SpawnExt};
81///
82/// let mut pool = ThreadPool::new().expect("Unable to create thread pool");
83///
84/// // We use a receiver handler of `null_receiver!()` because this instance
85/// // will be used purely as a client, not a server.
86/// pool.spawn(local_endpoint
87/// .clone()
88/// .receive_loop_arc(null_receiver!())
89/// .map(|_|unreachable!())
90/// );
91/// ```
92///
/// Once the `Arc<LocalEndpoint>` has been added to an execution pool, the `run_until` method
/// on the pool can be used to block on the futures emitted by `LocalEndpoint`:
95///
96/// ```
97/// # use std::sync::Arc;
98/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
99/// # use async_coap::prelude::*;
100/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket};
101/// # use async_coap::null::NullLocalEndpoint;
102/// #
103/// # // Using a NullLocalEndpoint since this is just a simple usage example.
104/// # let local_endpoint = Arc::new(NullLocalEndpoint);
105/// # let mut local_pool = LocalPool::new();
106/// #
107/// # local_pool.spawner().spawn_local(local_endpoint
108/// # .clone()
109/// # .receive_loop_arc(null_receiver!())
110/// # .map(|_|unreachable!())
111/// # );
112///
113/// let result = local_pool.run_until(
114/// local_endpoint.send(
115/// "coap.me:5683",
116/// CoapRequest::get() // This is a CoAP GET request
117/// .emit_any_response() // Return the first response we get
118/// )
119/// );
120///
121/// println!("result: {:?}", result);
122/// ```
123///
124/// Or, more naturally, the returned futures can be used directly in `async` blocks:
125///
126/// ```
127/// # #![feature(async_await)]
128/// # use std::sync::Arc;
129/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
130/// # use async_coap::prelude::*;
131/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket};
132/// # use async_coap::null::NullLocalEndpoint;
133/// #
134/// # // Using a NullLocalEndpoint since this is just a simple usage example.
135/// # let local_endpoint = Arc::new(NullLocalEndpoint);
136/// # let mut pool = LocalPool::new();
137/// #
138/// # pool.spawner().spawn_local(local_endpoint
139/// # .clone()
140/// # .receive_loop_arc(null_receiver!())
141/// # .map(|_|unreachable!())
142/// # );
143/// #
144/// # let future =
145/// async move {
146/// let future = local_endpoint.send(
147/// "coap.me:5683",
148/// CoapRequest::get() // This is a CoAP GET request
149/// .emit_any_response() // Return the first response we get
150/// );
151///
152/// // Wait for the final result and print it.
153/// println!("result: {:?}", future.await);
154/// }
155/// # ;
156/// #
157/// # pool.run_until(future);
158/// ```
159///
160/// # Server Usage
161///
162/// In order to serve resources for other devices to interact with, you will
163/// need to replace the [`null_receiver!`] we were using earlier with something
164/// more substantial. The method takes a closure as an argument, and the closure
165/// itself has a single argument: a borrowed [`RespondableInboundContext`].
166///
167/// For example, to have our server return a response for a request instead of
168/// just returning an error, we could use the following function as our receive handler:
169///
170/// ```
171/// use async_coap::prelude::*;
172/// use async_coap::{RespondableInboundContext, Error};
173///
174/// fn receive_handler<T: RespondableInboundContext>(context: &T) -> Result<(),Error> {
175/// context.respond(|msg_out|{
176/// msg_out.set_msg_code(MsgCode::SuccessContent);
177/// msg_out.insert_option(option::CONTENT_FORMAT, ContentFormat::TEXT_PLAIN_UTF8)?;
178/// msg_out.append_payload_string("Successfully fetched!")?;
179/// Ok(())
180/// })?;
181/// Ok(())
182/// }
183/// # use std::sync::Arc;
184/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
185/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket, LoopbackSocketAddr};
186/// # use async_coap::null::NullLocalEndpoint;
187/// # use async_coap::message::MessageRead;
188/// #
189/// # let local_endpoint = Arc::new(DatagramLocalEndpoint::new(LoopbackSocket::new()));
190/// # let mut pool = LocalPool::new();
191/// #
192/// # pool.spawner().spawn_local(local_endpoint.clone().receive_loop_arc(receive_handler).map(|_|unreachable!()));
193/// #
194/// # let result = pool.run_until(
195/// # local_endpoint.send(
196/// # LoopbackSocketAddr::Unicast,
197/// # CoapRequest::get() // This is a CoAP GET request
198/// # .emit_any_response() // Return the first response we get
199/// # )
200/// # );
201/// # println!("result: {:?}", result);
202/// # let result = result.unwrap();
203/// # assert_eq!(result.msg_code(), MsgCode::SuccessContent);
204/// # assert_eq!(result.msg_type(), MsgType::Ack);
205/// ```
206///
207/// However, that's actually not super useful: it returns a successful result for
/// every possible request, including bogus ones. Let's say that we wanted to expose a
209/// resource that lives at "`/test`" on our server, returning a [`4.04 Not Found`](MsgCode::ClientErrorNotFound)
210/// for every other request. That might look something like this:
211///
212/// ```
213/// use async_coap::prelude::*;
214/// use async_coap::{RespondableInboundContext, Error, LinkFormatWrite, LINK_ATTR_TITLE};
215/// use core::fmt::Write; // For `write!()`
216/// use core::borrow::Borrow;
217/// use option::CONTENT_FORMAT;
218///
219/// fn receive_handler<T: RespondableInboundContext>(context: &T) -> Result<(),Error> {
220/// let msg = context.message();
221/// let uri = msg.options().extract_uri()?;
222/// let decoded_path = uri.raw_path().unescape_uri().skip_slashes().to_cow();
223///
224/// match (msg.msg_code(), decoded_path.borrow()) {
225/// // Handle GET /test
226/// (MsgCode::MethodGet, "test") => context.respond(|msg_out| {
227/// msg_out.set_msg_code(MsgCode::SuccessContent);
228/// msg_out.insert_option(CONTENT_FORMAT, ContentFormat::TEXT_PLAIN_UTF8);
229/// write!(msg_out,"Successfully fetched {:?}!", uri.as_str())?;
230/// Ok(())
231/// }),
232///
233/// // Handle GET /.well-known/core, for service discovery.
234/// (MsgCode::MethodGet, ".well-known/core") => context.respond(|msg_out| {
235/// msg_out.set_msg_code(MsgCode::SuccessContent);
236/// msg_out.insert_option(CONTENT_FORMAT, ContentFormat::APPLICATION_LINK_FORMAT);
237/// LinkFormatWrite::new(msg_out)
238/// .link(uri_ref!("/test"))
239/// .attr(LINK_ATTR_TITLE, "Test Resource")
240/// .finish()?;
241/// Ok(())
242/// }),
243///
244/// // Handle unsupported methods
245/// (_, "test") | (_, ".well-known/core") => context.respond(|msg_out| {
246/// msg_out.set_msg_code(MsgCode::ClientErrorMethodNotAllowed);
247/// write!(msg_out,"Method \"{:?}\" Not Allowed", msg.msg_code())?;
248/// Ok(())
249/// }),
250///
251/// // Everything else is a 4.04
252/// (_, _) => context.respond(|msg_out| {
253/// msg_out.set_msg_code(MsgCode::ClientErrorNotFound);
254/// write!(msg_out,"{:?} Not Found", uri.as_str())?;
255/// Ok(())
256/// }),
257/// }
258/// }
259/// # use std::sync::Arc;
260/// # use futures::{prelude::*,executor::LocalPool,task::LocalSpawnExt};
261/// # use async_coap::datagram::{DatagramLocalEndpoint, AllowStdUdpSocket, LoopbackSocket, LoopbackSocketAddr};
262/// # use async_coap::null::NullLocalEndpoint;
263/// # use async_coap::message::MessageRead;
264/// # use std::borrow::Cow;
265/// #
266/// # let local_endpoint = Arc::new(DatagramLocalEndpoint::new(LoopbackSocket::new()));
267/// # let mut pool = LocalPool::new();
268/// #
269/// # pool.spawner().spawn_local(local_endpoint
270/// # .clone()
271/// # .receive_loop_arc(receive_handler)
272/// # .map(|_|unreachable!())
273/// # );
274/// #
275/// # let result = pool.run_until(
276/// # local_endpoint.send(
277/// # LoopbackSocketAddr::Unicast,
278/// # CoapRequest::get() // This is a CoAP GET request
279/// # .uri_host_path(None, rel_ref!("test")) // Add a path to the message
280/// # .emit_any_response() // Return the first response we get
281/// # )
282/// # );
283/// # println!("result: {:?}", result);
284/// # let result = result.unwrap();
285/// # assert_eq!(result.msg_code(), MsgCode::SuccessContent);
286/// # assert_eq!(result.msg_type(), MsgType::Ack);
287/// #
288/// #
289/// # let result = pool.run_until(
290/// # local_endpoint.send(
291/// # LoopbackSocketAddr::Unicast,
292/// # CoapRequest::post() // This is a CoAP POST request
293/// # .uri_host_path(None, rel_ref!("test")) // Add a path to the message
294/// # .emit_successful_response() // Return the first successful response we get
295/// # .inspect(|cx| {
296/// # // Inspect here since we currently can't do
297/// # // a detailed check in the return value.
298/// # assert_eq!(cx.message().msg_code(), MsgCode::ClientErrorMethodNotAllowed);
299/// # assert_eq!(cx.message().msg_type(), MsgType::Ack);
300/// # })
301/// # )
302/// # );
303/// # println!("result: {:?}", result);
304/// # assert_eq!(result.err(), Some(Error::ClientRequestError));
305/// #
306/// # let result = pool.run_until(
307/// # local_endpoint.send(
308/// # LoopbackSocketAddr::Unicast,
309/// # CoapRequest::get() // This is a CoAP GET request
310/// # .emit_successful_response() // Return the first successful response we get
311/// # .uri_host_path(None, rel_ref!("/foobar"))
312/// # .inspect(|cx| {
313/// # // Inspect here since we currently can't do
314/// # // a detailed check in the return value.
315/// # assert_eq!(cx.message().msg_code(), MsgCode::ClientErrorNotFound);
316/// # assert_eq!(cx.message().msg_type(), MsgType::Ack);
317/// # })
318/// # )
319/// # );
320/// # println!("result: {:?}", result);
321/// # assert_eq!(result.err(), Some(Error::ResourceNotFound));
322/// ```
323///
324pub trait LocalEndpoint: Sized {
325 /// The `SocketAddr` type to use with this local endpoint. This is usually
326 /// simply `std::net::SocketAddr`, but may be different in some cases (like for CoAP-SMS
327 /// endpoints).
328 type SocketAddr: SocketAddrExt
329 + ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError>;
330
331 /// The error type associated with errors generated by socket and address-lookup operations.
332 /// Typically, this is `std::io::Error`, but it may be different if `Self::SocketAddr` isn't
333 /// `std::net::SocketAddr`.
334 type SocketError: core::fmt::Debug;
335
336 /// The trait representing the default transmission parameters to use.
337 type DefaultTransParams: TransParams;
338
339 /// Type used by closure that is passed into `send()`, representing the context for the
340 /// response.
341 type InboundContext: InboundContext<SocketAddr = Self::SocketAddr>;
342
343 /// Type used by closure that is passed into `receive()`, representing the context for
344 /// inbound requests.
345 type RespondableInboundContext: RespondableInboundContext<SocketAddr = Self::SocketAddr>;
346
347 /// Returns a string representing the scheme of the underlying transport.
348 /// For example, this could return `"coap"`, `"coaps+sms"`, etc.
349 fn scheme(&self) -> &str;
350
351 /// Returns the default port to use when the port is unspecified. This value
352 /// is typically defined by the scheme. Returns zero if port numbers are ignored
353 /// by the underlying technology.
354 fn default_port(&self) -> u16;
355
356 /// The concrete return type of the `lookup()` method.
357 type LookupStream: Stream<Item = Self::SocketAddr> + Unpin;
358
359 /// Method for asynchronously looking up the `Self::SocketAddr` instances for the
360 /// given hostname and port.
361 fn lookup(&self, hostname: &str, port: u16) -> Result<Self::LookupStream, Error>;
362
363 /// The concrete type for a `RemoteEndpoint` associated with this local endpoint.
364 type RemoteEndpoint: RemoteEndpoint<
365 SocketAddr = Self::SocketAddr,
366 InboundContext = Self::InboundContext,
367 >;
368
369 /// Constructs a new [`RemoteEndpoint`] instance for the given address, host, and path.
370 fn remote_endpoint<S, H, P>(&self, addr: S, host: Option<H>, path: P) -> Self::RemoteEndpoint
371 where
372 S: ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError>,
373 H: Into<String>,
374 P: Into<RelRefBuf>;
375
376 /// Constructs a new [`RemoteEndpoint`] instance for the given Uri.
377 fn remote_endpoint_from_uri(&self, uri: &Uri) -> Result<Self::RemoteEndpoint, Error>;
378
379 /// Sends a message to `remote_addr` based on the criteria provided by
380 /// [`send_desc`][crate::SendDesc].
381 ///
382 /// `send_desc`, which implements [`SendDesc`][crate::SendDesc], is the real heavy lifter here.
383 /// It defines the message content, retransmit timing, resending logic---even the
384 /// return type of this method if defined by `send_desc`.
385 /// This flexibility allows this method to uniformly perform complex interactions
386 /// like [block transfers][IETF-RFC7959] and [resource observing][IETF-RFC7641].
387 ///
388 /// [IETF-RFC7959]: https://tools.ietf.org/html/rfc7959
389 /// [IETF-RFC7641]: https://tools.ietf.org/html/rfc7641
390 ///
391 /// A variant of this method, [`LocalEndpointExt::send_as_stream`], is used to
392 /// handle cases where multiple responses are expected, such as when sending
393 /// multicast requests or doing [resource observing][IETF-RFC7641].
394 ///
395 /// ## Performance Tips
396 ///
397 /// If you are going to be calling this method frequently for a destination that you are
398 /// referencing by a hostname, it will significantly improve performance on some platforms
399 /// if you only pass `SocketAddr` types to `remote_addr` and not rely on having `ToSocketAddrs`
400 /// do hostname lookups inside of `send`.
401 ///
402 /// The easiest way to do this is to use either the [`remote_endpoint`] or
403 /// [`remote_endpoint_from_uri`] methods to create a [`RemoteEndpoint`] instance
404 /// and call the [`send`][crate::RemoteEndpoint::send] method on that instead. From that
405 /// instance you can call `send` multiple times: any hostname that needs to be resolved
406 /// is calculated and cached when the `RemoteEndpoint` is first created.
407 ///
408 /// [`RemoteEndpoint`]: crate::RemoteEndpoint
409 /// [`remote_endpoint`]: LocalEndpoint::remote_endpoint
410 /// [`remote_endpoint_from_uri`]: LocalEndpoint::remote_endpoint_from_uri
411 ///
412 /// ## Transaction Tracking
413 ///
414 /// All state tracking the transmission of the message is stored in the returned future.
415 /// To cancel retransmits, drop the returned future.
416 ///
417 /// The returned future is lazily evaluated, so nothing will be transmitted unless the
418 /// returned future is polled: you cannot simply fire and forget. Because of this lazy
419 /// evaluation, the futures returned by this method do not need to be used immediately and
420 /// may be stored for later use if that happens to be useful.
421 ///
422 #[must_use = "nothing will be sent unless the returned future is polled"]
423 fn send<'a, S, R, SD>(
424 &'a self,
425 remote_addr: S,
426 send_desc: SD,
427 ) -> BoxFuture<'a, Result<R, Error>>
428 where
429 S: ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError> + 'a,
430 SD: SendDesc<Self::InboundContext, R> + 'a,
431 R: Send + 'a;
432
433 /// Receives a single request and runs the given `handler` on it once.
434 ///
435 /// Each call handles (at most) one single inbound request.
436 /// To handle multiple requests, call this function from a loop.
437 /// The [`LocalEndpointExt`] trait comes with some helpers to make
438 /// implementing such a loop easier: [`receive_as_stream`],
439 /// [`receive_loop`], and [`receive_loop_arc`].
440 ///
441 /// [`receive_as_stream`]: LocalEndpointExt::receive_as_stream
442 /// [`receive_loop`]: LocalEndpointExt::receive_loop
443 /// [`receive_loop_arc`]: LocalEndpointExt::receive_loop_arc
444 ///
445 /// *Properly calling this method in the background is absolutely critical to
446 /// the correct operation of this trait:* **[`send`] will not work without it**.
447 ///
448 /// [`send`]: LocalEndpoint::send
449 ///
450 /// Local endpoints which implement [`Sync`] can have this method called from multiple
451 /// threads, allowing multiple requests to be handled concurrently.
452 ///
453 /// ## Handler
454 ///
455 /// If you are going to be serving resources using this [`LocalEndpoint`], you
456 /// will need specify a handler to handle inbound requests.
457 /// See the section [Server Usage](#server-usage) above for an example.
458 ///
459 /// If instead you are only using this [`LocalEndpoint`] as a client, then you may pass
460 /// `null_receiver!()` as the handler, as shown in [Client Usage](#client-usage).
461 #[must_use = "nothing will be received unless the returned future is polled"]
462 fn receive<'a, F>(&'a self, handler: F) -> BoxFuture<'a, Result<(), Error>>
463 where
464 F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Send + Unpin;
465}
466
/// Handler for [`LocalEndpoint::receive`] that does nothing and lets the underlying
/// [`LocalEndpoint`] implementation decide how best to respond (if at all).
///
/// Expands to a closure that ignores its single argument and returns `Ok(())`.
#[macro_export]
macro_rules! null_receiver {
    () => {
        |_| Ok(())
    };
}
475
476/// Extension trait for [`LocalEndpoint`] which implements additional helper methods.
477pub trait LocalEndpointExt: LocalEndpoint {
478 /// Sends a message where multiple responses are expected, returned as a [`SendAsStream`].
479 ///
480 /// In this version of [`LocalEndpoint::send`], the `send_desc` can return
481 /// [`ResponseStatus::Done`] from its handler multiple times, with the results being emitted
482 /// from the returned [`SendAsStream`].
483 ///
484 /// The stream can be cleanly ended by the handler eventually returning
485 /// [`Error::ResponseTimeout`] or [`Error::Cancelled`], neither of which will be emitted
486 /// as an error.
487 fn send_as_stream<'a, S, R, SD>(&'a self, dest: S, send_desc: SD) -> SendAsStream<'a, R>
488 where
489 S: ToSocketAddrs<SocketAddr = Self::SocketAddr, Error = Self::SocketError> + 'a,
490 SD: SendDesc<Self::InboundContext, R> + 'a,
491 R: Send + 'a,
492 {
493 let (sender, receiver) = futures::channel::mpsc::channel::<Result<R, Error>>(10);
494
495 SendAsStream {
496 receiver,
497 send_future: self.send(dest, SendAsStreamDesc::new(send_desc, sender)),
498 }
499 }
500
501 /// Version of [`LocalEndpoint::receive`] that handles more than one inbound message,
502 /// returning a [`crate::ReceiveAsStream`] instead of a future.
503 ///
504 /// This stream will terminate immediately after any of the following errors are emitted by the
505 /// underlying calls to [`LocalEndpoint::receive`]:
506 ///
507 /// * [`Error::IOError`](enum_Error.html#variant.IOError)
508 /// * [`Error::Cancelled`](enum_Error.html#variant.Cancelled)
509 ///
510 /// All other errors are ignored.
511 fn receive_as_stream<'a, F>(&'a self, handler: F) -> ReceiveAsStream<'a, Self, F>
512 where
513 F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
514 {
515 ReceiveAsStream::new(self, handler)
516 }
517
518 /// Convenience method for implementing a [`receive`](LocalEndpoint::receive) loop.
519 ///
520 /// The returned future will terminate when the underlying [`crate::ReceiveAsStream`]
521 /// terminates, returning the error that was emitted before the stream terminated,
522 /// typically either [`Error::IOError`] or [`Error::Cancelled`].
523 fn receive_loop<'a, F>(&'a self, handler: F) -> Collect<ReceiveAsStream<'a, Self, F>, Error>
524 where
525 F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
526 {
527 self.receive_as_stream(handler).collect()
528 }
529
530 /// Version of [`LocalEndpointExt::receive_loop`] which consumes and holds an [`Arc<Self>`].
531 ///
532 /// [`LocalEndpoint`s][LocalEndpoint] are often held inside of an [`Arc<>`], which makes
533 /// using methods like [`LocalEndpointExt::receive_loop`] particularly awkward.
534 ///
535 /// `receive_loop_arc` makes this situation relatively painless by returning the receive loop
536 /// future in an (effectively transparent) [`ArcGuard`] wrapper.
537 ///
538 /// ```
539 /// # #![feature(async_await)]
540 /// #
541 /// # use std::sync::Arc;
542 /// # use futures::prelude::*;
543 /// # use async_coap::prelude::*;
544 /// # use async_coap::null::NullLocalEndpoint;
545 /// # use futures::executor::ThreadPool;
546 /// # use futures::task::SpawnExt;
547 ///
548 /// let local_endpoint = Arc::new(NullLocalEndpoint);
549 /// let mut pool = ThreadPool::new().expect("Unable to start thread pool");
550 ///
551 /// pool.spawn(local_endpoint
552 /// .clone()
553 /// .receive_loop_arc(null_receiver!())
554 /// .map(|err| panic!("Receive loop terminated: {}", err))
555 /// );
556 /// ```
557 fn receive_loop_arc<'a, F>(
558 self: Arc<Self>,
559 handler: F,
560 ) -> ArcGuard<Self, Collect<ReceiveAsStream<'a, Self, F>, Error>>
561 where
562 F: FnMut(&Self::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Send + Unpin,
563 Self: 'a,
564 {
565 self.guard(|x| x.receive_loop(handler))
566 }
567}
568
569/// Blanket implementation of `LocalEndpointExt` for all `LocalEndpoint` instances.
570impl<T: LocalEndpoint> LocalEndpointExt for T {}