webrtc_connection/rtc_channel/
async_write.rs

1
2use std::{sync::Arc, task::Poll, pin::Pin, io::{ErrorKind, self}}; 
3
4use bytes::Bytes;
5use cs_trace::{Tracer, child};
6use tokio::sync::Mutex;
7use tokio::io::AsyncWrite;
8use webrtc::data_channel::RTCDataChannel;
9
10use super::RtcChannel;
11
12pub async fn write_to_channel(
13    trace: Box<dyn Tracer>,
14    channel: Arc<Mutex<Arc<RTCDataChannel>>>,
15    data: Bytes,
16) -> Result<usize, std::io::Error> {
17    let lock = channel
18        .lock()
19        .await;
20
21    trace.trace(
22        &format!("sending data: {:0>2X?}", &data[..]),
23    );
24
25    let result = lock
26        .send(&data)
27        .await;
28
29    match result {
30        Err(error) => {
31            return Err(
32                io::Error::new(
33                    ErrorKind::WriteZero,
34                    format!("Cannot send data: {:?}.", error),
35                ),
36            );
37        },
38        Ok(bytes_written) => return Ok(bytes_written),
39    };
40}
41
42impl AsyncWrite for RtcChannel {
43    fn poll_write(
44        mut self: Pin<&mut Self>,
45        cx: &mut std::task::Context<'_>,
46        buf: &[u8],
47    ) -> Poll<Result<usize, io::Error>> {
48        let trace = &self.trace;
49        let trace = child!(trace, "async-write");
50
51        let maybe_future = self.current_write_future.take();
52        let mut new_future = match maybe_future {
53            // if no future saved, create a new one
54            None => {
55                Box::pin(
56                    write_to_channel(
57                        child!(trace, "write_to_channel"),
58                        Arc::clone(&self.channel),
59                        Bytes::from(buf.to_vec()),
60                    ),
61                )
62            },
63            // if future was saved, use it
64            Some(future) => future,
65        };
66
67        // poll the future
68        let result = new_future.as_mut().poll(cx);
69        match result {
70            // if still pending, save the future and sleep
71            Poll::Pending => {
72                self.current_write_future = Some(new_future);
73
74                return Poll::Pending;
75            },
76            // if ready, return the result
77            Poll::Ready(maybe_message) => {
78                match maybe_message {
79                    Err(error) => return Poll::Ready(Err(error)),
80                    Ok(bytes_written) => {
81                        return Poll::Ready(Ok(bytes_written));
82                    },
83                };
84            }
85        }
86    }
87
88    fn poll_flush(
89        mut self: Pin<&mut Self>,
90        cx: &mut std::task::Context<'_>,
91    ) -> Poll<Result<(), io::Error>> {
92        let maybe_future = self.current_write_future.take();
93        match maybe_future {
94            // if no future saved, create a new one
95            None => {
96                return Poll::Ready(Ok(()));
97            },
98            // if future was saved, use it
99            Some(mut future) => {
100                // poll the future
101                let result = future.as_mut().poll(cx);
102                match result {
103                    // if still pending, save the future and sleep
104                    Poll::Pending => {
105                        return Poll::Pending;
106                    },
107                    // if ready, return the result
108                    Poll::Ready(maybe_message) => {
109                        match maybe_message {
110                            Err(error) => return Poll::Ready(Err(error)),
111                            Ok(_) => {
112                                return Poll::Ready(Ok(()));
113                            },
114                        };
115                    }
116                }
117            },
118        };
119    }
120
121    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), io::Error>> {
122        return AsyncWrite::poll_flush(self, cx);
123    }
124}