// cogo/io/sys/unix/net/socket_write_vectored.rs
use std::io::{self, IoSlice};
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4
5use super::super::{co_io_result, IoData};
6use crate::coroutine_impl::{CoroutineImpl, EventSource};
7use crate::io::AsIoData;
8use crate::scheduler::get_scheduler;
9use crate::yield_now::yield_with;
10
11pub struct SocketWriteVectored<'a> {
12 io_data: &'a IoData,
13 bufs: &'a [IoSlice<'a>],
14 socket: &'a std::net::TcpStream,
15 timeout: Option<Duration>,
16}
17
18impl<'a> SocketWriteVectored<'a> {
19 pub fn new<T: AsIoData>(
20 s: &'a T,
21 socket: &'a std::net::TcpStream,
22 bufs: &'a [IoSlice<'a>],
23 timeout: Option<Duration>,
24 ) -> Self {
25 SocketWriteVectored {
26 io_data: s.as_io_data(),
27 bufs,
28 socket,
29 timeout,
30 }
31 }
32
33 pub fn done(&mut self) -> io::Result<usize> {
34 use std::io::Write;
35
36 loop {
37 co_io_result()?;
38
39 self.io_data.io_flag.store(false, Ordering::Relaxed);
41
42 match self.socket.write_vectored(self.bufs) {
43 Ok(n) => return Ok(n),
44 Err(e) => {
45 let raw_err = e.raw_os_error();
46 if raw_err == Some(libc::EAGAIN) || raw_err == Some(libc::EWOULDBLOCK) {
47 } else {
49 return Err(e);
50 }
51 }
52 }
53
54 if self.io_data.io_flag.swap(false, Ordering::Relaxed) {
55 continue;
56 }
57
58 yield_with(self);
60 }
61 }
62}
63
64impl<'a> EventSource for SocketWriteVectored<'a> {
65 fn subscribe(&mut self, co: CoroutineImpl) {
66 let io_data = (*self.io_data).clone();
67
68 if let Some(dur) = self.timeout {
69 get_scheduler()
70 .get_selector()
71 .add_io_timer(self.io_data, dur);
72 }
73 self.io_data.co.swap(co);
74
75 if io_data.io_flag.load(Ordering::Acquire) {
77 io_data.schedule();
78 }
79 }
80}