1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

use std::{sync::Arc, task::Poll, pin::Pin, io::{ErrorKind, self}}; 

use bytes::Bytes;
use cs_trace::{Tracer, child};
use tokio::sync::Mutex;
use tokio::io::AsyncWrite;
use webrtc::data_channel::RTCDataChannel;

use super::RtcChannel;

pub async fn write_to_channel(
    trace: Box<dyn Tracer>,
    channel: Arc<Mutex<Arc<RTCDataChannel>>>,
    data: Bytes,
) -> Result<usize, std::io::Error> {
    let lock = channel
        .lock()
        .await;

    trace.trace(
        &format!("sending data: {:0>2X?}", &data[..]),
    );

    let result = lock
        .send(&data)
        .await;

    match result {
        Err(error) => {
            return Err(
                io::Error::new(
                    ErrorKind::WriteZero,
                    format!("Cannot send data: {:?}.", error),
                ),
            );
        },
        Ok(bytes_written) => return Ok(bytes_written),
    };
}

impl AsyncWrite for RtcChannel {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let trace = &self.trace;
        let trace = child!(trace, "async-write");

        let maybe_future = self.current_write_future.take();
        let mut new_future = match maybe_future {
            // if no future saved, create a new one
            None => {
                Box::pin(
                    write_to_channel(
                        child!(trace, "write_to_channel"),
                        Arc::clone(&self.channel),
                        Bytes::from(buf.to_vec()),
                    ),
                )
            },
            // if future was saved, use it
            Some(future) => future,
        };

        // poll the future
        let result = new_future.as_mut().poll(cx);
        match result {
            // if still pending, save the future and sleep
            Poll::Pending => {
                self.current_write_future = Some(new_future);

                return Poll::Pending;
            },
            // if ready, return the result
            Poll::Ready(maybe_message) => {
                match maybe_message {
                    Err(error) => return Poll::Ready(Err(error)),
                    Ok(bytes_written) => {
                        return Poll::Ready(Ok(bytes_written));
                    },
                };
            }
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        let maybe_future = self.current_write_future.take();
        match maybe_future {
            // if no future saved, create a new one
            None => {
                return Poll::Ready(Ok(()));
            },
            // if future was saved, use it
            Some(mut future) => {
                // poll the future
                let result = future.as_mut().poll(cx);
                match result {
                    // if still pending, save the future and sleep
                    Poll::Pending => {
                        return Poll::Pending;
                    },
                    // if ready, return the result
                    Poll::Ready(maybe_message) => {
                        match maybe_message {
                            Err(error) => return Poll::Ready(Err(error)),
                            Ok(_) => {
                                return Poll::Ready(Ok(()));
                            },
                        };
                    }
                }
            },
        };
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), io::Error>> {
        return AsyncWrite::poll_flush(self, cx);
    }
}