Skip to main content

bytes_handoff/
write.rs

1use bytes::Bytes;
2use std::io::{self, IoSlice};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use tokio::io::{AsyncWrite, AsyncWriteExt};
6use tokio::sync::{Notify, mpsc, oneshot};
7
8use crate::{WriteBackpressure, WriteError};
9
/// Maximum number of messages pulled from the queue per `recv_many` call and
/// maximum number of requests grouped into one batch by `batch_end`; it is
/// also the length of the `IoSlice` array used for vectored writes.
const MAX_BATCH_ITEMS: usize = 64;
/// Byte threshold for one batch: `batch_end` stops growing a batch once adding
/// the next request would exceed this, or once the batch has reached it.
/// A single request larger than this still forms a batch of its own.
const MAX_BATCH_BYTES: usize = 1024 * 1024;
12
/// Limits applied to a [`WriteHandoff`] when it is spawned.
#[derive(Clone, Copy, Debug)]
pub struct WriteHandoffConfig {
    /// Capacity of the bounded message queue created by `WriteHandoff::spawn`.
    pub max_items: usize,
    /// Limit handed to the byte budget: the most bytes that may be reserved
    /// by not-yet-completed writes at any one time.
    pub max_pending_bytes: usize,
}

impl WriteHandoffConfig {
    /// Builds a configuration from the queue capacity (`max_items`) and the
    /// byte budget (`max_pending_bytes`). No validation is performed here.
    pub fn new(max_items: usize, max_pending_bytes: usize) -> Self {
        WriteHandoffConfig {
            max_pending_bytes,
            max_items,
        }
    }
}
27
/// Handle used to submit byte buffers to the background writer task started
/// by `WriteHandoff::spawn`. Clones share the same queue, budget and flag.
pub struct WriteHandoff {
    /// Sending side of the bounded queue consumed by `writer_loop`.
    tx: mpsc::Sender<WriteMessage>,
    /// Shared accounting of bytes reserved by writes that have not completed.
    budget: Arc<Budget>,
    /// Set once the handoff is closed, either by `close()`, by a failed send,
    /// or by the writer task when it stops.
    closed: Arc<AtomicBool>,
}
33
/// Handle returned by the ticketed write methods; `wait()` resolves with the
/// outcome the writer task reported for that one request.
#[derive(Debug)]
pub struct WriteTicket {
    /// Receives the single `WriteCompletion` sent by `complete_request`.
    rx: oneshot::Receiver<WriteCompletion>,
}
38
/// Message delivered through a `WriteTicket` when its request is finished.
#[derive(Debug)]
pub struct WriteCompletion {
    /// `Ok(())` when the request was completed successfully, otherwise the
    /// error recorded for it (`WriteError::Io` or `WriteError::Closed`).
    result: Result<(), WriteError>,
}
43
/// One queued write, as carried through the channel to the writer task.
struct WriteRequest {
    /// Payload to write.
    bytes: Bytes,
    /// Where to report the outcome; `None` for fire-and-forget writes.
    completion: Option<oneshot::Sender<WriteCompletion>>,
    /// Bytes reserved from the budget for this request; reset to 0 once the
    /// reservation has been released so it is not released twice.
    budget_bytes: usize,
}
49
/// Messages consumed by `writer_loop`.
enum WriteMessage {
    /// A payload to write.
    Write(WriteRequest),
    /// Stop marker sent by `close()`: writes received after it in the same
    /// drained group are completed with `WriteError::Closed`, and the loop
    /// exits after writing what preceded it.
    Shutdown,
}
54
/// Byte budget shared by all handles and the writer task.
struct Budget {
    /// Bytes currently reserved by writes that have not been completed.
    pending: AtomicUsize,
    /// Once set, every further acquisition attempt fails with `Closed`.
    closed: AtomicBool,
    /// Wakes tasks waiting in `acquire` whenever bytes are released or the
    /// budget is closed.
    notify: Notify,
    /// Upper bound for `pending`.
    limit: usize,
}
61
/// Reasons a budget reservation can be refused.
#[derive(Debug)]
enum BudgetAcquireError {
    /// The budget has been closed.
    Closed,
    /// Reserving would push the pending total to `attempted`, above `limit`.
    LimitExceeded { attempted: usize, limit: usize },
}
67
68impl WriteHandoff {
69    pub fn spawn<W>(writer: W, config: WriteHandoffConfig) -> Self
70    where
71        W: AsyncWrite + Unpin + Send + 'static,
72    {
73        let (tx, rx) = mpsc::channel(config.max_items);
74        let budget = Arc::new(Budget::new(config.max_pending_bytes));
75        let closed = Arc::new(AtomicBool::new(false));
76        tokio::spawn(writer_loop(writer, rx, closed.clone(), budget.clone()));
77
78        Self { tx, budget, closed }
79    }
80
81    pub fn try_write(&self, bytes: Bytes) -> Result<WriteTicket, WriteBackpressure> {
82        if self.closed.load(Ordering::Acquire) {
83            return Err(WriteBackpressure::closed(bytes));
84        }
85        let permit = match self.budget.try_acquire(bytes.len()) {
86            Ok(permit) => permit,
87            Err(BudgetAcquireError::Closed) => return Err(WriteBackpressure::closed(bytes)),
88            Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
89                return Err(WriteBackpressure::byte_budget_exceeded(
90                    bytes, attempted, limit,
91                ));
92            }
93        };
94        if self.closed.load(Ordering::Acquire) {
95            self.budget.release(permit);
96            return Err(WriteBackpressure::closed(bytes));
97        }
98
99        let (completion, rx) = oneshot::channel();
100        let request = WriteRequest {
101            bytes,
102            completion: Some(completion),
103            budget_bytes: permit,
104        };
105        match self.tx.try_send(WriteMessage::Write(request)) {
106            Ok(()) => Ok(WriteTicket { rx }),
107            Err(mpsc::error::TrySendError::Full(WriteMessage::Write(mut request))) => {
108                self.budget.release(request.budget_bytes);
109                request.budget_bytes = 0;
110                Err(WriteBackpressure::queue_full(request.bytes))
111            }
112            Err(mpsc::error::TrySendError::Closed(WriteMessage::Write(mut request))) => {
113                self.budget.release(request.budget_bytes);
114                request.budget_bytes = 0;
115                self.closed.store(true, Ordering::Release);
116                self.budget.close();
117                Err(WriteBackpressure::closed(request.bytes))
118            }
119            Err(mpsc::error::TrySendError::Full(WriteMessage::Shutdown))
120            | Err(mpsc::error::TrySendError::Closed(WriteMessage::Shutdown)) => unreachable!(),
121        }
122    }
123
124    pub fn try_write_fire_and_forget(&self, bytes: Bytes) -> Result<(), WriteBackpressure> {
125        if self.closed.load(Ordering::Acquire) {
126            return Err(WriteBackpressure::closed(bytes));
127        }
128        let permit = match self.budget.try_acquire(bytes.len()) {
129            Ok(permit) => permit,
130            Err(BudgetAcquireError::Closed) => return Err(WriteBackpressure::closed(bytes)),
131            Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
132                return Err(WriteBackpressure::byte_budget_exceeded(
133                    bytes, attempted, limit,
134                ));
135            }
136        };
137        if self.closed.load(Ordering::Acquire) {
138            self.budget.release(permit);
139            return Err(WriteBackpressure::closed(bytes));
140        }
141
142        let request = WriteRequest {
143            bytes,
144            completion: None,
145            budget_bytes: permit,
146        };
147        match self.tx.try_send(WriteMessage::Write(request)) {
148            Ok(()) => Ok(()),
149            Err(mpsc::error::TrySendError::Full(WriteMessage::Write(mut request))) => {
150                self.budget.release(request.budget_bytes);
151                request.budget_bytes = 0;
152                Err(WriteBackpressure::queue_full(request.bytes))
153            }
154            Err(mpsc::error::TrySendError::Closed(WriteMessage::Write(mut request))) => {
155                self.budget.release(request.budget_bytes);
156                request.budget_bytes = 0;
157                self.closed.store(true, Ordering::Release);
158                self.budget.close();
159                Err(WriteBackpressure::closed(request.bytes))
160            }
161            Err(mpsc::error::TrySendError::Full(WriteMessage::Shutdown))
162            | Err(mpsc::error::TrySendError::Closed(WriteMessage::Shutdown)) => unreachable!(),
163        }
164    }
165
166    pub async fn write(&self, bytes: Bytes) -> Result<WriteTicket, WriteError> {
167        if self.closed.load(Ordering::Acquire) {
168            return Err(WriteError::Closed);
169        }
170        let permit = match self.budget.acquire(bytes.len()).await {
171            Ok(permit) => permit,
172            Err(BudgetAcquireError::Closed) => return Err(WriteError::Closed),
173            Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
174                return Err(WriteError::ByteBudgetExceeded { attempted, limit });
175            }
176        };
177        if self.closed.load(Ordering::Acquire) {
178            self.budget.release(permit);
179            return Err(WriteError::Closed);
180        }
181
182        let (completion, rx) = oneshot::channel();
183        let request = WriteRequest {
184            bytes,
185            completion: Some(completion),
186            budget_bytes: permit,
187        };
188        if let Err(err) = self.tx.send(WriteMessage::Write(request)).await {
189            let WriteMessage::Write(mut request) = err.0 else {
190                unreachable!();
191            };
192            self.budget.release(request.budget_bytes);
193            request.budget_bytes = 0;
194            self.closed.store(true, Ordering::Release);
195            self.budget.close();
196            return Err(WriteError::Closed);
197        }
198        Ok(WriteTicket { rx })
199    }
200
201    pub async fn write_fire_and_forget(&self, bytes: Bytes) -> Result<(), WriteError> {
202        if self.closed.load(Ordering::Acquire) {
203            return Err(WriteError::Closed);
204        }
205        let permit = match self.budget.acquire(bytes.len()).await {
206            Ok(permit) => permit,
207            Err(BudgetAcquireError::Closed) => return Err(WriteError::Closed),
208            Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
209                return Err(WriteError::ByteBudgetExceeded { attempted, limit });
210            }
211        };
212        if self.closed.load(Ordering::Acquire) {
213            self.budget.release(permit);
214            return Err(WriteError::Closed);
215        }
216
217        let request = WriteRequest {
218            bytes,
219            completion: None,
220            budget_bytes: permit,
221        };
222        if let Err(err) = self.tx.send(WriteMessage::Write(request)).await {
223            let WriteMessage::Write(mut request) = err.0 else {
224                unreachable!();
225            };
226            self.budget.release(request.budget_bytes);
227            request.budget_bytes = 0;
228            self.closed.store(true, Ordering::Release);
229            self.budget.close();
230            return Err(WriteError::Closed);
231        }
232        Ok(())
233    }
234
235    pub fn pending_bytes(&self) -> usize {
236        self.budget.pending()
237    }
238
239    pub fn close(&self) {
240        self.closed.store(true, Ordering::Release);
241        self.budget.close();
242        let _ = self.tx.try_send(WriteMessage::Shutdown);
243    }
244}
245
246impl Clone for WriteHandoff {
247    fn clone(&self) -> Self {
248        Self {
249            tx: self.tx.clone(),
250            budget: self.budget.clone(),
251            closed: self.closed.clone(),
252        }
253    }
254}
255
256impl WriteTicket {
257    pub async fn wait(self) -> Result<(), WriteError> {
258        match self.rx.await {
259            Ok(completion) => completion.result,
260            Err(_) => Err(WriteError::Closed),
261        }
262    }
263}
264
265impl Budget {
266    fn new(limit: usize) -> Self {
267        Self {
268            pending: AtomicUsize::new(0),
269            closed: AtomicBool::new(false),
270            notify: Notify::new(),
271            limit,
272        }
273    }
274
275    fn try_acquire(&self, bytes: usize) -> Result<usize, BudgetAcquireError> {
276        if bytes > self.limit {
277            return Err(BudgetAcquireError::LimitExceeded {
278                attempted: self.pending().saturating_add(bytes),
279                limit: self.limit,
280            });
281        }
282        if self.closed.load(Ordering::Acquire) {
283            return Err(BudgetAcquireError::Closed);
284        }
285
286        let mut current = self.pending.load(Ordering::Relaxed);
287        loop {
288            let Some(next) = current.checked_add(bytes) else {
289                return Err(BudgetAcquireError::LimitExceeded {
290                    attempted: usize::MAX,
291                    limit: self.limit,
292                });
293            };
294            if next > self.limit {
295                return Err(BudgetAcquireError::LimitExceeded {
296                    attempted: next,
297                    limit: self.limit,
298                });
299            }
300            match self.pending.compare_exchange_weak(
301                current,
302                next,
303                Ordering::AcqRel,
304                Ordering::Relaxed,
305            ) {
306                Ok(_) => return Ok(bytes),
307                Err(actual) => {
308                    current = actual;
309                    if self.closed.load(Ordering::Acquire) {
310                        return Err(BudgetAcquireError::Closed);
311                    }
312                }
313            }
314        }
315    }
316
317    async fn acquire(&self, bytes: usize) -> Result<usize, BudgetAcquireError> {
318        if bytes > self.limit {
319            return Err(BudgetAcquireError::LimitExceeded {
320                attempted: self.pending().saturating_add(bytes),
321                limit: self.limit,
322            });
323        }
324
325        loop {
326            let notified = self.notify.notified();
327            match self.try_acquire(bytes) {
328                Ok(acquired) => return Ok(acquired),
329                Err(BudgetAcquireError::Closed) => return Err(BudgetAcquireError::Closed),
330                Err(BudgetAcquireError::LimitExceeded { .. }) => {}
331            }
332            notified.await;
333        }
334    }
335
336    fn release(&self, bytes: usize) {
337        if bytes == 0 {
338            return;
339        }
340        let previous = self.pending.fetch_sub(bytes, Ordering::AcqRel);
341        debug_assert!(previous >= bytes, "released more bytes than acquired");
342        self.notify.notify_waiters();
343    }
344
345    fn pending(&self) -> usize {
346        self.pending.load(Ordering::Acquire)
347    }
348
349    fn close(&self) {
350        self.closed.store(true, Ordering::Release);
351        self.notify.notify_waiters();
352    }
353}
354
355async fn writer_loop<W>(
356    mut writer: W,
357    mut rx: mpsc::Receiver<WriteMessage>,
358    closed: Arc<AtomicBool>,
359    budget: Arc<Budget>,
360) where
361    W: AsyncWrite + Unpin,
362{
363    let mut messages = Vec::with_capacity(MAX_BATCH_ITEMS);
364    let mut requests = Vec::with_capacity(MAX_BATCH_ITEMS);
365
366    loop {
367        messages.clear();
368        let received = rx.recv_many(&mut messages, MAX_BATCH_ITEMS).await;
369        if received == 0 {
370            break;
371        }
372
373        let mut shutdown = false;
374        for message in messages.drain(..) {
375            match message {
376                WriteMessage::Write(request) if !shutdown => requests.push(request),
377                WriteMessage::Write(mut request) => {
378                    complete_request(&budget, &mut request, Err(WriteError::Closed));
379                }
380                WriteMessage::Shutdown => shutdown = true,
381            }
382        }
383
384        if write_request_batches(&mut writer, &budget, &mut requests)
385            .await
386            .is_err()
387        {
388            closed.store(true, Ordering::Release);
389            budget.close();
390            drain_closed(&budget, &mut rx);
391            return;
392        }
393        requests.clear();
394
395        if shutdown || (closed.load(Ordering::Acquire) && rx.is_empty()) {
396            break;
397        }
398    }
399    closed.store(true, Ordering::Release);
400    budget.close();
401    drain_closed(&budget, &mut rx);
402}
403
404async fn write_request_batches<W>(
405    writer: &mut W,
406    budget: &Budget,
407    requests: &mut [WriteRequest],
408) -> Result<(), ()>
409where
410    W: AsyncWrite + Unpin,
411{
412    let mut start = 0;
413    while start < requests.len() {
414        let end = batch_end(requests, start);
415        match write_batch(writer, &requests[start..end]).await {
416            Ok(written) => {
417                debug_assert_eq!(written, end - start);
418                complete_ok(budget, &mut requests[start..end]);
419            }
420            Err((written, err)) => {
421                complete_ok(budget, &mut requests[start..start + written]);
422                if start + written < end {
423                    complete_request(
424                        budget,
425                        &mut requests[start + written],
426                        Err(WriteError::Io(err)),
427                    );
428                }
429                if start + written + 1 < requests.len() {
430                    complete_closed(budget, &mut requests[start + written + 1..]);
431                }
432                return Err(());
433            }
434        }
435        start = end;
436    }
437
438    Ok(())
439}
440
441fn batch_end(requests: &[WriteRequest], start: usize) -> usize {
442    let mut bytes = 0usize;
443    let mut end = start;
444    while end < requests.len() && end - start < MAX_BATCH_ITEMS {
445        let request_len = requests[end].bytes.len();
446        if end > start && bytes.saturating_add(request_len) > MAX_BATCH_BYTES {
447            break;
448        }
449        bytes = bytes.saturating_add(request_len);
450        end += 1;
451        if bytes >= MAX_BATCH_BYTES {
452            break;
453        }
454    }
455    end.max(start + 1)
456}
457
458async fn write_batch<W>(
459    writer: &mut W,
460    requests: &[WriteRequest],
461) -> Result<usize, (usize, io::Error)>
462where
463    W: AsyncWrite + Unpin,
464{
465    if let [request] = requests {
466        writer
467            .write_all(&request.bytes)
468            .await
469            .map_err(|err| (0, err))?;
470        return Ok(1);
471    }
472
473    let mut request_index = 0;
474    let mut offset = 0;
475    let mut slices = [IoSlice::new(&[]); MAX_BATCH_ITEMS];
476
477    loop {
478        while request_index < requests.len() && offset == requests[request_index].bytes.len() {
479            request_index += 1;
480            offset = 0;
481        }
482        if request_index == requests.len() {
483            return Ok(requests.len());
484        }
485
486        let slice_count = fill_io_slices(&mut slices, requests, request_index, offset);
487        let written = writer
488            .write_vectored(&slices[..slice_count])
489            .await
490            .map_err(|err| (request_index, err))?;
491        if written == 0 {
492            return Err((
493                request_index,
494                io::Error::new(io::ErrorKind::WriteZero, "failed to write batch"),
495            ));
496        }
497
498        let mut remaining = written;
499        while remaining > 0 && request_index < requests.len() {
500            let available = requests[request_index].bytes.len() - offset;
501            if remaining < available {
502                offset += remaining;
503                break;
504            }
505            remaining -= available;
506            request_index += 1;
507            offset = 0;
508        }
509    }
510}
511
512fn fill_io_slices<'a>(
513    slices: &mut [IoSlice<'a>; MAX_BATCH_ITEMS],
514    requests: &'a [WriteRequest],
515    request_index: usize,
516    offset: usize,
517) -> usize {
518    let mut slice_count = 0;
519    for (index, request) in requests[request_index..].iter().enumerate() {
520        let bytes = if index == 0 {
521            &request.bytes[offset..]
522        } else {
523            &request.bytes
524        };
525        slices[slice_count] = IoSlice::new(bytes);
526        slice_count += 1;
527    }
528    slice_count
529}
530
531fn complete_ok(budget: &Budget, requests: &mut [WriteRequest]) {
532    for request in requests {
533        complete_request(budget, request, Ok(()));
534    }
535}
536
537fn complete_closed(budget: &Budget, requests: &mut [WriteRequest]) {
538    for request in requests {
539        complete_request(budget, request, Err(WriteError::Closed));
540    }
541}
542
543fn complete_request(budget: &Budget, request: &mut WriteRequest, result: Result<(), WriteError>) {
544    budget.release(request.budget_bytes);
545    request.budget_bytes = 0;
546    if let Some(completion) = request.completion.take() {
547        let _ = completion.send(WriteCompletion { result });
548    }
549}
550
551fn drain_closed(budget: &Budget, rx: &mut mpsc::Receiver<WriteMessage>) {
552    while let Ok(message) = rx.try_recv() {
553        if let WriteMessage::Write(mut request) = message {
554            complete_request(budget, &mut request, Err(WriteError::Closed));
555        }
556    }
557}
558
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll};
    use tokio::io::AsyncReadExt;

    use super::*;

    // Two ticketed writes must reach the peer in submission order, and the
    // budget must be fully released once both have completed.
    #[tokio::test]
    async fn writes_owned_bytes_in_order() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 64));

        let first = handoff
            .try_write(Bytes::from_static(b"abc"))
            .expect("first handoff");
        let second = handoff
            .try_write(Bytes::from_static(b"def"))
            .expect("second handoff");

        first.wait().await.expect("first write completes");
        second.wait().await.expect("second write completes");

        let mut out = [0_u8; 6];
        server
            .read_exact(&mut out)
            .await
            .expect("read written bytes");
        assert_eq!(&out, b"abcdef");
        assert_eq!(handoff.pending_bytes(), 0);
    }

    // A 4-byte write against a 3-byte budget is rejected, the bytes are
    // handed back in the error, and nothing stays reserved.
    #[tokio::test]
    async fn try_write_reports_byte_backpressure_without_losing_bytes() {
        let (client, _server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 3));

        let err = handoff
            .try_write(Bytes::from_static(b"abcd"))
            .expect_err("over budget");
        assert_eq!(err.into_bytes(), Bytes::from_static(b"abcd"));
        assert_eq!(handoff.pending_bytes(), 0);
    }

    // The waiting `write` path never reserves more than the configured
    // budget and delivers the payload.
    #[tokio::test]
    async fn async_write_reserves_byte_budget() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 8));

        let ticket = handoff
            .write(Bytes::from_static(b"hello"))
            .await
            .expect("handoff");
        assert!(handoff.pending_bytes() <= 8);
        ticket.wait().await.expect("completion");

        let mut out = [0_u8; 5];
        server
            .read_exact(&mut out)
            .await
            .expect("read written bytes");
        assert_eq!(&out, b"hello");
    }

    // Fire-and-forget writes are delivered in order even though no ticket is
    // awaited.
    #[tokio::test]
    async fn fire_and_forget_writes_without_completion_ticket() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 64));

        handoff
            .try_write_fire_and_forget(Bytes::from_static(b"abc"))
            .expect("fire-and-forget handoff");
        handoff
            .try_write_fire_and_forget(Bytes::from_static(b"def"))
            .expect("fire-and-forget handoff");

        let mut out = [0_u8; 6];
        server
            .read_exact(&mut out)
            .await
            .expect("read written bytes");
        assert_eq!(&out, b"abcdef");
    }

    // With a 4-byte budget the second 4-byte write can only be admitted
    // after the first one has been completed and its budget released.
    #[tokio::test]
    async fn async_fire_and_forget_waits_for_budget_without_completion_ticket() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 4));

        handoff
            .write_fire_and_forget(Bytes::from_static(b"abcd"))
            .await
            .expect("first write fits budget");
        // NOTE(review): this assumes the writer task has not completed the
        // first write yet at this point — presumably true on the default
        // single-threaded test runtime; confirm before changing the flavor.
        assert_eq!(handoff.pending_bytes(), 4);

        let second = {
            let handoff = handoff.clone();
            tokio::spawn(async move {
                handoff
                    .write_fire_and_forget(Bytes::from_static(b"efgh"))
                    .await
                    .expect("second write waits for budget");
            })
        };

        let mut first = [0_u8; 4];
        server
            .read_exact(&mut first)
            .await
            .expect("read first write");
        assert_eq!(&first, b"abcd");

        second.await.expect("second write task joins");
        let mut second = [0_u8; 4];
        server
            .read_exact(&mut second)
            .await
            .expect("read second write");
        assert_eq!(&second, b"efgh");
        assert_eq!(handoff.pending_bytes(), 0);
    }

    // After `close()` both the non-waiting and the waiting write paths
    // reject new data, and the non-waiting path hands the bytes back.
    #[tokio::test]
    async fn close_rejects_new_writes() {
        let (client, _server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 64));

        handoff.close();

        let err = handoff
            .try_write(Bytes::from_static(b"closed"))
            .expect_err("closed handoff rejects writes");
        assert_eq!(err.into_bytes(), Bytes::from_static(b"closed"));
        assert!(matches!(
            handoff.write(Bytes::from_static(b"closed")).await,
            Err(WriteError::Closed)
        ));
    }

    // Three small requests form one batch and are written with a single
    // vectored call; the budget reserved for them is released afterwards.
    #[tokio::test]
    async fn request_batches_use_vectored_writes() {
        let writer = CountingWriter::default();
        let calls = writer.vectored_calls.clone();
        let output = writer.output.clone();
        let budget = Budget::new(64);
        let mut requests = vec![
            request(Bytes::from_static(b"abc"), &budget),
            request(Bytes::from_static(b"def"), &budget),
            request(Bytes::from_static(b"ghi"), &budget),
        ];

        let mut writer = writer;
        write_request_batches(&mut writer, &budget, &mut requests)
            .await
            .expect("batch writes");

        assert_eq!(calls.load(Ordering::SeqCst), 1);
        assert_eq!(&*output.lock().expect("output mutex"), b"abcdefghi");
        assert_eq!(budget.pending(), 0);
    }

    // Builds a fire-and-forget request whose size is reserved from `budget`.
    fn request(bytes: Bytes, budget: &Budget) -> WriteRequest {
        let budget_bytes = budget
            .try_acquire(bytes.len())
            .expect("test budget has capacity");
        WriteRequest {
            bytes,
            completion: None,
            budget_bytes,
        }
    }

    // In-memory writer that records everything written and counts how many
    // times the vectored write path is used.
    #[derive(Default)]
    struct CountingWriter {
        // All bytes accepted so far, in order.
        output: Arc<Mutex<Vec<u8>>>,
        // Number of `poll_write_vectored` invocations.
        vectored_calls: Arc<AtomicUsize>,
    }

    impl AsyncWrite for CountingWriter {
        // Accepts the whole buffer immediately.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            self.output
                .lock()
                .expect("output mutex")
                .extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        // Accepts every buffer in full and counts the call.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            self.vectored_calls.fetch_add(1, Ordering::SeqCst);
            let mut output = self.output.lock().expect("output mutex");
            let mut written = 0;
            for buf in bufs {
                output.extend_from_slice(buf);
                written += buf.len();
            }
            Poll::Ready(Ok(written))
        }

        fn is_write_vectored(&self) -> bool {
            true
        }
    }
}