// webrtc-connection 0.2.0
//
// WebRTC connection.
// Documentation

use std::{sync::Arc, task::Poll, pin::Pin, io::{ErrorKind, self}}; 

use bytes::Bytes;
use cs_trace::{Tracer, child};
use tokio::sync::Mutex;
use tokio::io::AsyncWrite;
use webrtc::data_channel::RTCDataChannel;

use super::RtcChannel;

/// Sends `data` over the data channel guarded by `channel`.
///
/// The mutex is acquired first and stays held until the function returns,
/// so the trace message and the send itself both happen under the lock.
///
/// # Errors
///
/// Any error reported by the channel's `send` is wrapped in an
/// [`io::Error`] of kind [`ErrorKind::WriteZero`] whose message contains the
/// `Debug` rendering of the underlying error.
///
/// # Returns
///
/// The value reported by the channel's `send` on success.
pub async fn write_to_channel(
    trace: Box<dyn Tracer>,
    channel: Arc<Mutex<Arc<RTCDataChannel>>>,
    data: Bytes,
) -> Result<usize, std::io::Error> {
    let data_channel = channel.lock().await;

    // Log the payload as a list of upper-case hex bytes before sending.
    trace.trace(&format!("sending data: {:0>2X?}", &data[..]));

    let outcome = data_channel.send(&data).await;

    outcome.map_err(|error| {
        io::Error::new(
            ErrorKind::WriteZero,
            format!("Cannot send data: {:?}.", error),
        )
    })
}

/// [`AsyncWrite`] implementation that forwards writes to the data channel.
///
/// At most one send is in flight at a time. While a send has not completed,
/// its future is parked in `current_write_future` so that a later
/// `poll_write` or `poll_flush` call can resume it.
impl AsyncWrite for RtcChannel {
    /// Starts a new send of `buf`, or resumes the send left pending by an
    /// earlier poll.
    ///
    /// When a new send is started, `buf` is copied into an owned buffer so
    /// the send future does not borrow from the caller.
    ///
    /// When a previously started send is still parked, that send is polled
    /// and `buf` is ignored; the returned byte count then refers to the
    /// earlier buffer.
    // NOTE(review): this presumes callers retry with the same buffer after
    // `Poll::Pending` — confirm against the call sites.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let trace = &self.trace;
        let trace = child!(trace, "async-write");

        // Resume the parked send if there is one, otherwise start a new one.
        let mut write_future = match self.current_write_future.take() {
            Some(future) => future,
            None => Box::pin(write_to_channel(
                child!(trace, "write_to_channel"),
                Arc::clone(&self.channel),
                Bytes::from(buf.to_vec()),
            )),
        };

        let result = write_future.as_mut().poll(cx);
        if result.is_pending() {
            // Not finished yet: park the future so the next poll resumes it
            // instead of starting a second send.
            self.current_write_future = Some(write_future);
        }

        result
    }

    /// Drives the parked send (if any) to completion.
    ///
    /// Returns `Ready(Ok(()))` immediately when no send is parked. If the
    /// parked send fails, its error is returned.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        let mut pending_write = match self.current_write_future.take() {
            // Nothing in flight, so there is nothing to flush.
            None => return Poll::Ready(Ok(())),
            Some(future) => future,
        };

        match pending_write.as_mut().poll(cx) {
            Poll::Pending => {
                // Put the future back. Dropping it here would cancel the
                // in-flight send and make the next flush report success
                // without the data having been sent.
                self.current_write_future = Some(pending_write);

                Poll::Pending
            },
            // The byte count is irrelevant for a flush; keep only success/failure.
            Poll::Ready(outcome) => Poll::Ready(outcome.map(|_| ())),
        }
    }

    /// Flushes the parked send; no other shutdown work is performed here.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), io::Error>> {
        AsyncWrite::poll_flush(self, cx)
    }
}