webrtc_connection/rtc_channel/
async_write.rs1
2use std::{sync::Arc, task::Poll, pin::Pin, io::{ErrorKind, self}};
3
4use bytes::Bytes;
5use cs_trace::{Tracer, child};
6use tokio::sync::Mutex;
7use tokio::io::AsyncWrite;
8use webrtc::data_channel::RTCDataChannel;
9
10use super::RtcChannel;
11
12pub async fn write_to_channel(
13 trace: Box<dyn Tracer>,
14 channel: Arc<Mutex<Arc<RTCDataChannel>>>,
15 data: Bytes,
16) -> Result<usize, std::io::Error> {
17 let lock = channel
18 .lock()
19 .await;
20
21 trace.trace(
22 &format!("sending data: {:0>2X?}", &data[..]),
23 );
24
25 let result = lock
26 .send(&data)
27 .await;
28
29 match result {
30 Err(error) => {
31 return Err(
32 io::Error::new(
33 ErrorKind::WriteZero,
34 format!("Cannot send data: {:?}.", error),
35 ),
36 );
37 },
38 Ok(bytes_written) => return Ok(bytes_written),
39 };
40}
41
42impl AsyncWrite for RtcChannel {
43 fn poll_write(
44 mut self: Pin<&mut Self>,
45 cx: &mut std::task::Context<'_>,
46 buf: &[u8],
47 ) -> Poll<Result<usize, io::Error>> {
48 let trace = &self.trace;
49 let trace = child!(trace, "async-write");
50
51 let maybe_future = self.current_write_future.take();
52 let mut new_future = match maybe_future {
53 None => {
55 Box::pin(
56 write_to_channel(
57 child!(trace, "write_to_channel"),
58 Arc::clone(&self.channel),
59 Bytes::from(buf.to_vec()),
60 ),
61 )
62 },
63 Some(future) => future,
65 };
66
67 let result = new_future.as_mut().poll(cx);
69 match result {
70 Poll::Pending => {
72 self.current_write_future = Some(new_future);
73
74 return Poll::Pending;
75 },
76 Poll::Ready(maybe_message) => {
78 match maybe_message {
79 Err(error) => return Poll::Ready(Err(error)),
80 Ok(bytes_written) => {
81 return Poll::Ready(Ok(bytes_written));
82 },
83 };
84 }
85 }
86 }
87
88 fn poll_flush(
89 mut self: Pin<&mut Self>,
90 cx: &mut std::task::Context<'_>,
91 ) -> Poll<Result<(), io::Error>> {
92 let maybe_future = self.current_write_future.take();
93 match maybe_future {
94 None => {
96 return Poll::Ready(Ok(()));
97 },
98 Some(mut future) => {
100 let result = future.as_mut().poll(cx);
102 match result {
103 Poll::Pending => {
105 return Poll::Pending;
106 },
107 Poll::Ready(maybe_message) => {
109 match maybe_message {
110 Err(error) => return Poll::Ready(Err(error)),
111 Ok(_) => {
112 return Poll::Ready(Ok(()));
113 },
114 };
115 }
116 }
117 },
118 };
119 }
120
121 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), io::Error>> {
122 return AsyncWrite::poll_flush(self, cx);
123 }
124}