webrtc_connection/rtc_channel/
async_read.rs1
2use std::task::Poll;
3use std::sync::Arc;
4use std::io::{self, ErrorKind};
5
6use super::RtcChannel;
7
8use webrtc::data_channel::data_channel_message::DataChannelMessage;
9
10use tokio::sync::{mpsc, Mutex};
11use tokio::io::AsyncRead;
12
13use cs_trace::{Tracer, child};
14
15pub async fn read_from_queue(
16 trace: Box<dyn Tracer>,
17 queue: Arc<Mutex<mpsc::Receiver<DataChannelMessage>>>,
18) -> Result<DataChannelMessage, std::io::Error> {
19 let maybe_message = queue
20 .lock()
21 .await
22 .recv()
23 .await;
24
25 let message = match maybe_message {
26 None => {
27 return Err(io::Error::new(
28 ErrorKind::ConnectionReset,
29 "No message found.",
30 ));
31 },
32 Some(msg) => msg,
33 };
34
35 trace.trace(
36 &format!("got message: {:0>2X?}", &message.data[..]),
37 );
38
39 return Ok(message);
40}
41
42impl AsyncRead for RtcChannel {
43 fn poll_read(
44 mut self: std::pin::Pin<&mut Self>,
45 cx: &mut std::task::Context<'_>,
46 buf: &mut tokio::io::ReadBuf<'_>,
47 ) -> Poll<io::Result<()>> {
48 let trace = &self.trace;
49 let trace = child!(trace, "async-read");
50
51 let maybe_future = self.current_read_future.take();
52 let mut new_future = match maybe_future {
53 None => {
55 Box::pin(
56 read_from_queue(
57 child!(trace, "read_from_queue"),
58 Arc::clone(&self.read_message_queue),
59 ),
60 )
61 },
62 Some(future) => future,
64 };
65
66 let result = new_future.as_mut().poll(cx);
68 match result {
69 Poll::Pending => {
71 self.current_read_future = Some(new_future);
72
73 return Poll::Pending;
74 },
75 Poll::Ready(maybe_message) => {
77 match maybe_message {
78 Err(error) => return Poll::Ready(Err(error)),
79 Ok(message) => {
80 buf.put_slice(&message.data[..]);
81
82 return Poll::Ready(Ok(()));
83 },
84 };
85 }
86 }
87 }
88}