use std::task::Poll;
use std::sync::Arc;
use std::io::{self, ErrorKind};
use super::RtcChannel;
use webrtc::data_channel::data_channel_message::DataChannelMessage;
use tokio::sync::{mpsc, Mutex};
use tokio::io::AsyncRead;
use cs_trace::{Tracer, child};
/// Waits for the next message on the shared receive queue and returns it.
///
/// The queue mutex is held only while waiting on `recv`; it is released
/// before the payload is traced.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`ErrorKind::ConnectionReset`] when
/// `recv` yields `None`, i.e. the queue will not produce any more messages.
pub async fn read_from_queue(
    trace: Box<dyn Tracer>,
    queue: Arc<Mutex<mpsc::Receiver<DataChannelMessage>>>,
) -> Result<DataChannelMessage, std::io::Error> {
    // Take the lock, wait for one message, then release the lock explicitly
    // so it is not held while formatting the trace output.
    let mut receiver = queue.lock().await;
    let received = receiver.recv().await;
    drop(receiver);

    let message = received.ok_or_else(|| {
        io::Error::new(ErrorKind::ConnectionReset, "No message found.")
    })?;

    // Dump the payload as zero-padded upper-case hex bytes.
    let rendered = format!("got message: {:0>2X?}", &message.data[..]);
    trace.trace(&rendered);

    Ok(message)
}
/// Exposes the channel's incoming message queue as an [`AsyncRead`] byte
/// stream.
impl AsyncRead for RtcChannel {
    /// Polls for incoming data and copies it into `buf`.
    ///
    /// A single in-flight read is kept in `self.current_read_future` so that a
    /// `Pending` poll can be resumed on the next call instead of starting a
    /// new queue read.
    ///
    /// When a received message is larger than the space left in `buf`, only
    /// the part that fits is copied; the remainder is kept and handed out by
    /// subsequent calls. (Previously the whole message was passed to
    /// `ReadBuf::put_slice`, which panics when the slice exceeds
    /// `buf.remaining()`.)
    ///
    /// # Errors
    ///
    /// Propagates any error produced by [`read_from_queue`].
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Resume the stored read if there is one; otherwise start a new read
        // from the queue. The tracers are only built when a new read starts.
        let mut pending_read = match self.current_read_future.take() {
            Some(future) => future,
            None => {
                let trace = &self.trace;
                let trace = child!(trace, "async-read");
                Box::pin(read_from_queue(
                    child!(trace, "read_from_queue"),
                    Arc::clone(&self.read_message_queue),
                ))
            }
        };

        match pending_read.as_mut().poll(cx) {
            Poll::Pending => {
                // Not finished yet: keep the future so the next poll resumes it.
                self.current_read_future = Some(pending_read);
                Poll::Pending
            }
            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
            Poll::Ready(Ok(mut message)) => {
                // NOTE(review): an empty message completes the read with zero
                // bytes written, which `AsyncRead` consumers interpret as
                // end-of-stream — confirm this is intended.
                let fits = buf.remaining().min(message.data.len());

                // After the split `message.data` holds the first `fits` bytes
                // and `leftover` holds everything that did not fit.
                let leftover = message.data.split_off(fits);
                buf.put_slice(&message.data[..]);

                if !leftover.is_empty() {
                    // Park the unread tail as an already-resolved future; the
                    // next poll takes it through the same path as a fresh
                    // message, so no bytes are lost and no extra state is
                    // needed.
                    message.data = leftover;
                    self.current_read_future = Some(Box::pin(std::future::ready(
                        Ok::<_, io::Error>(message),
                    )));
                }

                Poll::Ready(Ok(()))
            }
        }
    }
}