webrtc_connection/rtc_channel/
async_read.rs

1
2use std::task::Poll;
3use std::sync::Arc;
4use std::io::{self, ErrorKind};
5
6use super::RtcChannel;
7
8use webrtc::data_channel::data_channel_message::DataChannelMessage;
9
10use tokio::sync::{mpsc, Mutex};
11use tokio::io::AsyncRead;
12
13use cs_trace::{Tracer, child};
14
15pub async fn read_from_queue(
16    trace: Box<dyn Tracer>,
17    queue: Arc<Mutex<mpsc::Receiver<DataChannelMessage>>>,
18) -> Result<DataChannelMessage, std::io::Error> {
19    let maybe_message = queue
20        .lock()
21        .await
22        .recv()
23        .await;
24
25    let message = match maybe_message {
26        None => {
27            return Err(io::Error::new(
28                ErrorKind::ConnectionReset,
29                "No message found.",
30            ));
31        },
32        Some(msg) => msg,
33    };
34
35    trace.trace(
36        &format!("got message: {:0>2X?}", &message.data[..]),
37    );
38
39    return Ok(message);
40}
41
42impl AsyncRead for RtcChannel {
43    fn poll_read(
44        mut self: std::pin::Pin<&mut Self>,
45        cx: &mut std::task::Context<'_>,
46        buf: &mut tokio::io::ReadBuf<'_>,
47    ) -> Poll<io::Result<()>> {
48        let trace = &self.trace;
49        let trace = child!(trace, "async-read");
50
51        let maybe_future = self.current_read_future.take();
52        let mut new_future = match maybe_future {
53            // if no future saved, create a new one
54            None => {
55                Box::pin(
56                    read_from_queue(
57                        child!(trace, "read_from_queue"),
58                        Arc::clone(&self.read_message_queue),
59                    ),
60                )
61            },
62            // if future was saved, use it
63            Some(future) => future,
64        };
65
66        // poll the future
67        let result = new_future.as_mut().poll(cx);
68        match result {
69            // if still pending, save the future and sleep
70            Poll::Pending => {
71                self.current_read_future = Some(new_future);
72
73                return Poll::Pending;
74            },
75            // if ready, return the result
76            Poll::Ready(maybe_message) => {
77                match maybe_message {
78                    Err(error) => return Poll::Ready(Err(error)),
79                    Ok(message) => {
80                        buf.put_slice(&message.data[..]);
81
82                        return Poll::Ready(Ok(()));
83                    },
84                };
85            }
86        }
87    }
88}