cogo/io/sys/unix/net/
socket_write_vectored.rs

1use std::io::{self, IoSlice};
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4
5use super::super::{co_io_result, IoData};
6use crate::coroutine_impl::{CoroutineImpl, EventSource};
7use crate::io::AsIoData;
8use crate::scheduler::get_scheduler;
9use crate::yield_now::yield_with;
10
11pub struct SocketWriteVectored<'a> {
12    io_data: &'a IoData,
13    bufs: &'a [IoSlice<'a>],
14    socket: &'a std::net::TcpStream,
15    timeout: Option<Duration>,
16}
17
18impl<'a> SocketWriteVectored<'a> {
19    pub fn new<T: AsIoData>(
20        s: &'a T,
21        socket: &'a std::net::TcpStream,
22        bufs: &'a [IoSlice<'a>],
23        timeout: Option<Duration>,
24    ) -> Self {
25        SocketWriteVectored {
26            io_data: s.as_io_data(),
27            bufs,
28            socket,
29            timeout,
30        }
31    }
32
33    pub fn done(&mut self) -> io::Result<usize> {
34        use std::io::Write;
35
36        loop {
37            co_io_result()?;
38
39            // clear the io_flag
40            self.io_data.io_flag.store(false, Ordering::Relaxed);
41
42            match self.socket.write_vectored(self.bufs) {
43                Ok(n) => return Ok(n),
44                Err(e) => {
45                    let raw_err = e.raw_os_error();
46                    if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
47                        // do nothing here
48                    } else {
49                        return Err(e);
50                    }
51                }
52            }
53
54            if self.io_data.io_flag.swap(false, Ordering::Relaxed) {
55                continue;
56            }
57
58            // the result is still WouldBlock, need to try again
59            yield_with(self);
60        }
61    }
62}
63
/// Event-source hook used when a vectored write has to wait.
///
/// `done` passes `self` to `yield_with`. NOTE(review): presumably that is what
/// ends up calling `subscribe` with the suspended coroutine -- confirm against
/// `yield_with`.
impl<'a> EventSource for SocketWriteVectored<'a> {
    /// Stores `co` in the socket's `IoData`, optionally arms an I/O timer, and
    /// calls `schedule()` immediately if `io_flag` is already set.
    fn subscribe(&mut self, co: CoroutineImpl) {
        // Cloned before `co` is stored below. NOTE(review): presumably so the
        // flag check at the end works on an owned handle rather than through
        // `self`, which may no longer be safe to touch once the coroutine is
        // visible to other threads -- confirm. Keep this ahead of the swap.
        let io_data = (*self.io_data).clone();

        // Register an I/O timer with the scheduler's selector when a timeout
        // was requested.
        if let Some(dur) = self.timeout {
            get_scheduler()
                .get_selector()
                .add_io_timer(self.io_data, dur);
        }
        // Put the coroutine into the `co` slot of the I/O data.
        self.io_data.co.swap(co);

        // `io_flag` may already be set by the time the coroutine is stored; in
        // that case call `schedule()` right away instead of waiting for a later
        // notification.
        if io_data.io_flag.load(Ordering::Acquire) {
            io_data.schedule();
        }
    }
}