1use bytes::Bytes;
2use std::io::{self, IoSlice};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use tokio::io::{AsyncWrite, AsyncWriteExt};
6use tokio::sync::{Notify, mpsc, oneshot};
7
8use crate::{WriteBackpressure, WriteError};
9
/// Maximum number of queued write requests coalesced into one vectored write.
const MAX_BATCH_ITEMS: usize = 64;
/// Byte cap for a single batch; a lone oversized request still forms its own
/// batch (see `batch_end`).
const MAX_BATCH_BYTES: usize = 1024 * 1024;
12
/// Limits applied to a `WriteHandoff`: how many requests may queue and how
/// many bytes may be pending (queued but not yet written) at once.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WriteHandoffConfig {
    /// Capacity of the request channel between producers and the writer task.
    pub max_items: usize,
    /// Upper bound on the total bytes reserved by in-flight write requests.
    pub max_pending_bytes: usize,
}

impl WriteHandoffConfig {
    /// Build a config from explicit item and byte limits.
    pub fn new(max_items: usize, max_pending_bytes: usize) -> Self {
        Self {
            max_items,
            max_pending_bytes,
        }
    }
}
27
/// Cloneable handle that hands write requests off to a background writer task.
pub struct WriteHandoff {
    // Channel to the spawned `writer_loop` task.
    tx: mpsc::Sender<WriteMessage>,
    // Shared byte budget limiting total pending (unwritten) bytes.
    budget: Arc<Budget>,
    // Latched once the handoff is closed or the writer task has stopped.
    closed: Arc<AtomicBool>,
}
33
/// Receipt for a queued write; await [`WriteTicket::wait`] for the outcome.
#[derive(Debug)]
pub struct WriteTicket {
    // Resolved by the writer task with the final result of the write.
    rx: oneshot::Receiver<WriteCompletion>,
}
38
/// Outcome of a single write, delivered through a [`WriteTicket`].
#[derive(Debug)]
pub struct WriteCompletion {
    result: Result<(), WriteError>,
}
43
/// A unit of work queued for the writer task.
struct WriteRequest {
    // Payload to write.
    bytes: Bytes,
    // `Some` when the producer holds a ticket; `None` for fire-and-forget.
    completion: Option<oneshot::Sender<WriteCompletion>>,
    // Bytes reserved from the budget; zeroed once released so that a second
    // completion of the same request releases nothing.
    budget_bytes: usize,
}
49
/// Messages accepted by the writer task.
enum WriteMessage {
    /// Write the contained request.
    Write(WriteRequest),
    /// Stop after processing messages already received.
    Shutdown,
}
54
/// Byte budget shared between producers and the writer task.
struct Budget {
    // Bytes currently reserved by queued/in-flight requests.
    pending: AtomicUsize,
    // Once set, all acquisitions fail with `Closed`.
    closed: AtomicBool,
    // Wakes tasks blocked in `acquire` when bytes are released or on close.
    notify: Notify,
    // Maximum allowed value of `pending`.
    limit: usize,
}
61
/// Why a budget reservation failed.
#[derive(Debug)]
enum BudgetAcquireError {
    /// The budget (and thus the handoff) was closed.
    Closed,
    /// Reserving would push `pending` past the limit.
    LimitExceeded { attempted: usize, limit: usize },
}
67
68impl WriteHandoff {
69 pub fn spawn<W>(writer: W, config: WriteHandoffConfig) -> Self
70 where
71 W: AsyncWrite + Unpin + Send + 'static,
72 {
73 let (tx, rx) = mpsc::channel(config.max_items);
74 let budget = Arc::new(Budget::new(config.max_pending_bytes));
75 let closed = Arc::new(AtomicBool::new(false));
76 tokio::spawn(writer_loop(writer, rx, closed.clone(), budget.clone()));
77
78 Self { tx, budget, closed }
79 }
80
81 pub fn try_write(&self, bytes: Bytes) -> Result<WriteTicket, WriteBackpressure> {
82 if self.closed.load(Ordering::Acquire) {
83 return Err(WriteBackpressure::closed(bytes));
84 }
85 let permit = match self.budget.try_acquire(bytes.len()) {
86 Ok(permit) => permit,
87 Err(BudgetAcquireError::Closed) => return Err(WriteBackpressure::closed(bytes)),
88 Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
89 return Err(WriteBackpressure::byte_budget_exceeded(
90 bytes, attempted, limit,
91 ));
92 }
93 };
94 if self.closed.load(Ordering::Acquire) {
95 self.budget.release(permit);
96 return Err(WriteBackpressure::closed(bytes));
97 }
98
99 let (completion, rx) = oneshot::channel();
100 let request = WriteRequest {
101 bytes,
102 completion: Some(completion),
103 budget_bytes: permit,
104 };
105 match self.tx.try_send(WriteMessage::Write(request)) {
106 Ok(()) => Ok(WriteTicket { rx }),
107 Err(mpsc::error::TrySendError::Full(WriteMessage::Write(mut request))) => {
108 self.budget.release(request.budget_bytes);
109 request.budget_bytes = 0;
110 Err(WriteBackpressure::queue_full(request.bytes))
111 }
112 Err(mpsc::error::TrySendError::Closed(WriteMessage::Write(mut request))) => {
113 self.budget.release(request.budget_bytes);
114 request.budget_bytes = 0;
115 self.closed.store(true, Ordering::Release);
116 self.budget.close();
117 Err(WriteBackpressure::closed(request.bytes))
118 }
119 Err(mpsc::error::TrySendError::Full(WriteMessage::Shutdown))
120 | Err(mpsc::error::TrySendError::Closed(WriteMessage::Shutdown)) => unreachable!(),
121 }
122 }
123
124 pub fn try_write_fire_and_forget(&self, bytes: Bytes) -> Result<(), WriteBackpressure> {
125 if self.closed.load(Ordering::Acquire) {
126 return Err(WriteBackpressure::closed(bytes));
127 }
128 let permit = match self.budget.try_acquire(bytes.len()) {
129 Ok(permit) => permit,
130 Err(BudgetAcquireError::Closed) => return Err(WriteBackpressure::closed(bytes)),
131 Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
132 return Err(WriteBackpressure::byte_budget_exceeded(
133 bytes, attempted, limit,
134 ));
135 }
136 };
137 if self.closed.load(Ordering::Acquire) {
138 self.budget.release(permit);
139 return Err(WriteBackpressure::closed(bytes));
140 }
141
142 let request = WriteRequest {
143 bytes,
144 completion: None,
145 budget_bytes: permit,
146 };
147 match self.tx.try_send(WriteMessage::Write(request)) {
148 Ok(()) => Ok(()),
149 Err(mpsc::error::TrySendError::Full(WriteMessage::Write(mut request))) => {
150 self.budget.release(request.budget_bytes);
151 request.budget_bytes = 0;
152 Err(WriteBackpressure::queue_full(request.bytes))
153 }
154 Err(mpsc::error::TrySendError::Closed(WriteMessage::Write(mut request))) => {
155 self.budget.release(request.budget_bytes);
156 request.budget_bytes = 0;
157 self.closed.store(true, Ordering::Release);
158 self.budget.close();
159 Err(WriteBackpressure::closed(request.bytes))
160 }
161 Err(mpsc::error::TrySendError::Full(WriteMessage::Shutdown))
162 | Err(mpsc::error::TrySendError::Closed(WriteMessage::Shutdown)) => unreachable!(),
163 }
164 }
165
166 pub async fn write(&self, bytes: Bytes) -> Result<WriteTicket, WriteError> {
167 if self.closed.load(Ordering::Acquire) {
168 return Err(WriteError::Closed);
169 }
170 let permit = match self.budget.acquire(bytes.len()).await {
171 Ok(permit) => permit,
172 Err(BudgetAcquireError::Closed) => return Err(WriteError::Closed),
173 Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
174 return Err(WriteError::ByteBudgetExceeded { attempted, limit });
175 }
176 };
177 if self.closed.load(Ordering::Acquire) {
178 self.budget.release(permit);
179 return Err(WriteError::Closed);
180 }
181
182 let (completion, rx) = oneshot::channel();
183 let request = WriteRequest {
184 bytes,
185 completion: Some(completion),
186 budget_bytes: permit,
187 };
188 if let Err(err) = self.tx.send(WriteMessage::Write(request)).await {
189 let WriteMessage::Write(mut request) = err.0 else {
190 unreachable!();
191 };
192 self.budget.release(request.budget_bytes);
193 request.budget_bytes = 0;
194 self.closed.store(true, Ordering::Release);
195 self.budget.close();
196 return Err(WriteError::Closed);
197 }
198 Ok(WriteTicket { rx })
199 }
200
201 pub async fn write_fire_and_forget(&self, bytes: Bytes) -> Result<(), WriteError> {
202 if self.closed.load(Ordering::Acquire) {
203 return Err(WriteError::Closed);
204 }
205 let permit = match self.budget.acquire(bytes.len()).await {
206 Ok(permit) => permit,
207 Err(BudgetAcquireError::Closed) => return Err(WriteError::Closed),
208 Err(BudgetAcquireError::LimitExceeded { attempted, limit }) => {
209 return Err(WriteError::ByteBudgetExceeded { attempted, limit });
210 }
211 };
212 if self.closed.load(Ordering::Acquire) {
213 self.budget.release(permit);
214 return Err(WriteError::Closed);
215 }
216
217 let request = WriteRequest {
218 bytes,
219 completion: None,
220 budget_bytes: permit,
221 };
222 if let Err(err) = self.tx.send(WriteMessage::Write(request)).await {
223 let WriteMessage::Write(mut request) = err.0 else {
224 unreachable!();
225 };
226 self.budget.release(request.budget_bytes);
227 request.budget_bytes = 0;
228 self.closed.store(true, Ordering::Release);
229 self.budget.close();
230 return Err(WriteError::Closed);
231 }
232 Ok(())
233 }
234
235 pub fn pending_bytes(&self) -> usize {
236 self.budget.pending()
237 }
238
239 pub fn close(&self) {
240 self.closed.store(true, Ordering::Release);
241 self.budget.close();
242 let _ = self.tx.try_send(WriteMessage::Shutdown);
243 }
244}
245
246impl Clone for WriteHandoff {
247 fn clone(&self) -> Self {
248 Self {
249 tx: self.tx.clone(),
250 budget: self.budget.clone(),
251 closed: self.closed.clone(),
252 }
253 }
254}
255
256impl WriteTicket {
257 pub async fn wait(self) -> Result<(), WriteError> {
258 match self.rx.await {
259 Ok(completion) => completion.result,
260 Err(_) => Err(WriteError::Closed),
261 }
262 }
263}
264
impl Budget {
    fn new(limit: usize) -> Self {
        Self {
            pending: AtomicUsize::new(0),
            closed: AtomicBool::new(false),
            notify: Notify::new(),
            limit,
        }
    }

    /// Try to reserve `bytes` without waiting. Returns the reserved amount
    /// (always `bytes`) or the reason the reservation failed.
    fn try_acquire(&self, bytes: usize) -> Result<usize, BudgetAcquireError> {
        // A request larger than the whole budget can never succeed.
        if bytes > self.limit {
            return Err(BudgetAcquireError::LimitExceeded {
                attempted: self.pending().saturating_add(bytes),
                limit: self.limit,
            });
        }
        if self.closed.load(Ordering::Acquire) {
            return Err(BudgetAcquireError::Closed);
        }

        // CAS loop: reserve atomically so concurrent producers can never
        // push `pending` past `limit`.
        let mut current = self.pending.load(Ordering::Relaxed);
        loop {
            let Some(next) = current.checked_add(bytes) else {
                // Addition overflowed usize; report saturated "attempted".
                return Err(BudgetAcquireError::LimitExceeded {
                    attempted: usize::MAX,
                    limit: self.limit,
                });
            };
            if next > self.limit {
                return Err(BudgetAcquireError::LimitExceeded {
                    attempted: next,
                    limit: self.limit,
                });
            }
            match self.pending.compare_exchange_weak(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Ok(bytes),
                Err(actual) => {
                    current = actual;
                    // Re-check close on contention so callers don't keep
                    // retrying after close().
                    if self.closed.load(Ordering::Acquire) {
                        return Err(BudgetAcquireError::Closed);
                    }
                }
            }
        }
    }

    /// Reserve `bytes`, waiting until capacity is released or the budget
    /// closes.
    async fn acquire(&self, bytes: usize) -> Result<usize, BudgetAcquireError> {
        // Fail fast on requests that could never fit; otherwise the loop
        // below would wait forever.
        if bytes > self.limit {
            return Err(BudgetAcquireError::LimitExceeded {
                attempted: self.pending().saturating_add(bytes),
                limit: self.limit,
            });
        }

        loop {
            // Register interest *before* trying, so a release that lands
            // between the failed attempt and the await still wakes us
            // (avoids the lost-wakeup race).
            let notified = self.notify.notified();
            match self.try_acquire(bytes) {
                Ok(acquired) => return Ok(acquired),
                Err(BudgetAcquireError::Closed) => return Err(BudgetAcquireError::Closed),
                Err(BudgetAcquireError::LimitExceeded { .. }) => {}
            }
            notified.await;
        }
    }

    /// Return previously reserved bytes and wake any blocked acquirers.
    fn release(&self, bytes: usize) {
        if bytes == 0 {
            return;
        }
        let previous = self.pending.fetch_sub(bytes, Ordering::AcqRel);
        debug_assert!(previous >= bytes, "released more bytes than acquired");
        self.notify.notify_waiters();
    }

    /// Bytes currently reserved.
    fn pending(&self) -> usize {
        self.pending.load(Ordering::Acquire)
    }

    /// Fail all future acquisitions and wake blocked waiters.
    fn close(&self) {
        self.closed.store(true, Ordering::Release);
        self.notify.notify_waiters();
    }
}
354
/// Background task: drains the request channel in batches and writes them.
///
/// Exits when all senders drop, when a `Shutdown` message arrives, or when a
/// write fails; in every case it latches `closed`, closes the budget, and
/// fails any requests still queued.
async fn writer_loop<W>(
    mut writer: W,
    mut rx: mpsc::Receiver<WriteMessage>,
    closed: Arc<AtomicBool>,
    budget: Arc<Budget>,
) where
    W: AsyncWrite + Unpin,
{
    // Reused across iterations to avoid per-batch allocation.
    let mut messages = Vec::with_capacity(MAX_BATCH_ITEMS);
    let mut requests = Vec::with_capacity(MAX_BATCH_ITEMS);

    loop {
        messages.clear();
        let received = rx.recv_many(&mut messages, MAX_BATCH_ITEMS).await;
        if received == 0 {
            // Channel closed and empty: every sender handle was dropped.
            break;
        }

        let mut shutdown = false;
        for message in messages.drain(..) {
            match message {
                WriteMessage::Write(request) if !shutdown => requests.push(request),
                // Requests received after a Shutdown in the same batch are
                // rejected, not written.
                WriteMessage::Write(mut request) => {
                    complete_request(&budget, &mut request, Err(WriteError::Closed));
                }
                WriteMessage::Shutdown => shutdown = true,
            }
        }

        // NOTE(review): the writer is never flushed; a completion only
        // guarantees bytes reached `writer`, not any buffer beneath it —
        // confirm callers pass an unbuffered sink.
        if write_request_batches(&mut writer, &budget, &mut requests)
            .await
            .is_err()
        {
            // Write failure: tear down and fail everything still queued.
            closed.store(true, Ordering::Release);
            budget.close();
            drain_closed(&budget, &mut rx);
            return;
        }
        requests.clear();

        // Stop once asked to shut down, or once close() was observed and no
        // more messages are waiting.
        if shutdown || (closed.load(Ordering::Acquire) && rx.is_empty()) {
            break;
        }
    }
    closed.store(true, Ordering::Release);
    budget.close();
    drain_closed(&budget, &mut rx);
}
403
/// Write `requests` in budget-sized batches, completing each request as it
/// finishes. On I/O failure the fully written prefix completes `Ok`, the
/// request the error surfaced on gets the I/O error, everything after it is
/// failed as closed, and `Err(())` tells the caller to shut down.
async fn write_request_batches<W>(
    writer: &mut W,
    budget: &Budget,
    requests: &mut [WriteRequest],
) -> Result<(), ()>
where
    W: AsyncWrite + Unpin,
{
    let mut start = 0;
    while start < requests.len() {
        let end = batch_end(requests, start);
        match write_batch(writer, &requests[start..end]).await {
            Ok(written) => {
                // write_batch only returns Ok when the whole batch was written.
                debug_assert_eq!(written, end - start);
                complete_ok(budget, &mut requests[start..end]);
            }
            Err((written, err)) => {
                // `written` requests at the front of the batch completed fully.
                complete_ok(budget, &mut requests[start..start + written]);
                // The failing request itself carries the I/O error.
                if start + written < end {
                    complete_request(
                        budget,
                        &mut requests[start + written],
                        Err(WriteError::Io(err)),
                    );
                }
                // Everything after it (this batch and later ones) never
                // reached the writer.
                if start + written + 1 < requests.len() {
                    complete_closed(budget, &mut requests[start + written + 1..]);
                }
                return Err(());
            }
        }
        start = end;
    }

    Ok(())
}
440
441fn batch_end(requests: &[WriteRequest], start: usize) -> usize {
442 let mut bytes = 0usize;
443 let mut end = start;
444 while end < requests.len() && end - start < MAX_BATCH_ITEMS {
445 let request_len = requests[end].bytes.len();
446 if end > start && bytes.saturating_add(request_len) > MAX_BATCH_BYTES {
447 break;
448 }
449 bytes = bytes.saturating_add(request_len);
450 end += 1;
451 if bytes >= MAX_BATCH_BYTES {
452 break;
453 }
454 }
455 end.max(start + 1)
456}
457
/// Write one batch, preferring a single vectored write for multi-request
/// batches.
///
/// `Ok(n)` means all `n` requests were fully written. `Err((written, err))`
/// reports how many requests at the front were fully written before `err`
/// surfaced.
async fn write_batch<W>(
    writer: &mut W,
    requests: &[WriteRequest],
) -> Result<usize, (usize, io::Error)>
where
    W: AsyncWrite + Unpin,
{
    // Single request: plain write_all avoids building the IoSlice table.
    if let [request] = requests {
        writer
            .write_all(&request.bytes)
            .await
            .map_err(|err| (0, err))?;
        return Ok(1);
    }

    // (request_index, offset) is the resume point: the first unwritten byte.
    let mut request_index = 0;
    let mut offset = 0;
    let mut slices = [IoSlice::new(&[]); MAX_BATCH_ITEMS];

    loop {
        // Skip requests that are already fully written (also skips empties).
        while request_index < requests.len() && offset == requests[request_index].bytes.len() {
            request_index += 1;
            offset = 0;
        }
        if request_index == requests.len() {
            return Ok(requests.len());
        }

        let slice_count = fill_io_slices(&mut slices, requests, request_index, offset);
        let written = writer
            .write_vectored(&slices[..slice_count])
            .await
            .map_err(|err| (request_index, err))?;
        if written == 0 {
            // Zero bytes accepted with data remaining: the sink is stuck.
            return Err((
                request_index,
                io::Error::new(io::ErrorKind::WriteZero, "failed to write batch"),
            ));
        }

        // Advance the resume point over however many bytes were accepted.
        let mut remaining = written;
        while remaining > 0 && request_index < requests.len() {
            let available = requests[request_index].bytes.len() - offset;
            if remaining < available {
                // Partial write within this request; resume here next round.
                offset += remaining;
                break;
            }
            remaining -= available;
            request_index += 1;
            offset = 0;
        }
    }
}
511
512fn fill_io_slices<'a>(
513 slices: &mut [IoSlice<'a>; MAX_BATCH_ITEMS],
514 requests: &'a [WriteRequest],
515 request_index: usize,
516 offset: usize,
517) -> usize {
518 let mut slice_count = 0;
519 for (index, request) in requests[request_index..].iter().enumerate() {
520 let bytes = if index == 0 {
521 &request.bytes[offset..]
522 } else {
523 &request.bytes
524 };
525 slices[slice_count] = IoSlice::new(bytes);
526 slice_count += 1;
527 }
528 slice_count
529}
530
531fn complete_ok(budget: &Budget, requests: &mut [WriteRequest]) {
532 for request in requests {
533 complete_request(budget, request, Ok(()));
534 }
535}
536
537fn complete_closed(budget: &Budget, requests: &mut [WriteRequest]) {
538 for request in requests {
539 complete_request(budget, request, Err(WriteError::Closed));
540 }
541}
542
543fn complete_request(budget: &Budget, request: &mut WriteRequest, result: Result<(), WriteError>) {
544 budget.release(request.budget_bytes);
545 request.budget_bytes = 0;
546 if let Some(completion) = request.completion.take() {
547 let _ = completion.send(WriteCompletion { result });
548 }
549}
550
551fn drain_closed(budget: &Budget, rx: &mut mpsc::Receiver<WriteMessage>) {
552 while let Ok(message) = rx.try_recv() {
553 if let WriteMessage::Write(mut request) = message {
554 complete_request(budget, &mut request, Err(WriteError::Closed));
555 }
556 }
557}
558
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll};
    use tokio::io::AsyncReadExt;

    use super::*;

    // Two ticketed writes arrive on the peer in submission order and the
    // byte budget drains back to zero.
    #[tokio::test]
    async fn writes_owned_bytes_in_order() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 64));

        let first = handoff
            .try_write(Bytes::from_static(b"abc"))
            .expect("first handoff");
        let second = handoff
            .try_write(Bytes::from_static(b"def"))
            .expect("second handoff");

        first.wait().await.expect("first write completes");
        second.wait().await.expect("second write completes");

        let mut out = [0_u8; 6];
        server
            .read_exact(&mut out)
            .await
            .expect("read written bytes");
        assert_eq!(&out, b"abcdef");
        assert_eq!(handoff.pending_bytes(), 0);
    }

    // An over-budget try_write fails, returns the payload intact, and
    // reserves nothing.
    #[tokio::test]
    async fn try_write_reports_byte_backpressure_without_losing_bytes() {
        let (client, _server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 3));

        let err = handoff
            .try_write(Bytes::from_static(b"abcd"))
            .expect_err("over budget");
        assert_eq!(err.into_bytes(), Bytes::from_static(b"abcd"));
        assert_eq!(handoff.pending_bytes(), 0);
    }

    // The async write path reserves bytes from the budget while queued.
    #[tokio::test]
    async fn async_write_reserves_byte_budget() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 8));

        let ticket = handoff
            .write(Bytes::from_static(b"hello"))
            .await
            .expect("handoff");
        assert!(handoff.pending_bytes() <= 8);
        ticket.wait().await.expect("completion");

        let mut out = [0_u8; 5];
        server
            .read_exact(&mut out)
            .await
            .expect("read written bytes");
        assert_eq!(&out, b"hello");
    }

    // Fire-and-forget writes still reach the peer even though no ticket is
    // returned.
    #[tokio::test]
    async fn fire_and_forget_writes_without_completion_ticket() {
        let (client, mut server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 64));

        handoff
            .try_write_fire_and_forget(Bytes::from_static(b"abc"))
            .expect("fire-and-forget handoff");
        handoff
            .try_write_fire_and_forget(Bytes::from_static(b"def"))
            .expect("fire-and-forget handoff");

        let mut out = [0_u8; 6];
        server
            .read_exact(&mut out)
            .await
            .expect("read written bytes");
        assert_eq!(&out, b"abcdef");
    }

    // A second async write blocks on the byte budget until the first write
    // drains, then proceeds.
    #[tokio::test]
    async fn async_fire_and_forget_waits_for_budget_without_completion_ticket() {
        let (client, mut server) = tokio::io::duplex(64);
        // Budget exactly fits one 4-byte write.
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 4));

        handoff
            .write_fire_and_forget(Bytes::from_static(b"abcd"))
            .await
            .expect("first write fits budget");
        assert_eq!(handoff.pending_bytes(), 4);

        // Spawned so its budget wait doesn't block this test task.
        let second = {
            let handoff = handoff.clone();
            tokio::spawn(async move {
                handoff
                    .write_fire_and_forget(Bytes::from_static(b"efgh"))
                    .await
                    .expect("second write waits for budget");
            })
        };

        // Reading the first payload frees budget for the second write.
        let mut first = [0_u8; 4];
        server
            .read_exact(&mut first)
            .await
            .expect("read first write");
        assert_eq!(&first, b"abcd");

        second.await.expect("second write task joins");
        let mut second = [0_u8; 4];
        server
            .read_exact(&mut second)
            .await
            .expect("read second write");
        assert_eq!(&second, b"efgh");
        assert_eq!(handoff.pending_bytes(), 0);
    }

    // After close(), both sync and async write paths reject new writes.
    #[tokio::test]
    async fn close_rejects_new_writes() {
        let (client, _server) = tokio::io::duplex(64);
        let handoff = WriteHandoff::spawn(client, WriteHandoffConfig::new(4, 64));

        handoff.close();

        let err = handoff
            .try_write(Bytes::from_static(b"closed"))
            .expect_err("closed handoff rejects writes");
        assert_eq!(err.into_bytes(), Bytes::from_static(b"closed"));
        assert!(matches!(
            handoff.write(Bytes::from_static(b"closed")).await,
            Err(WriteError::Closed)
        ));
    }

    // Three small requests coalesce into a single vectored write call.
    #[tokio::test]
    async fn request_batches_use_vectored_writes() {
        let writer = CountingWriter::default();
        let calls = writer.vectored_calls.clone();
        let output = writer.output.clone();
        let budget = Budget::new(64);
        let mut requests = vec![
            request(Bytes::from_static(b"abc"), &budget),
            request(Bytes::from_static(b"def"), &budget),
            request(Bytes::from_static(b"ghi"), &budget),
        ];

        let mut writer = writer;
        write_request_batches(&mut writer, &budget, &mut requests)
            .await
            .expect("batch writes");

        assert_eq!(calls.load(Ordering::SeqCst), 1);
        assert_eq!(&*output.lock().expect("output mutex"), b"abcdefghi");
        assert_eq!(budget.pending(), 0);
    }

    // Build a fire-and-forget WriteRequest backed by a real budget permit.
    fn request(bytes: Bytes, budget: &Budget) -> WriteRequest {
        let budget_bytes = budget
            .try_acquire(bytes.len())
            .expect("test budget has capacity");
        WriteRequest {
            bytes,
            completion: None,
            budget_bytes,
        }
    }

    // Test sink that records all bytes written and counts vectored calls.
    #[derive(Default)]
    struct CountingWriter {
        // Everything written, in order.
        output: Arc<Mutex<Vec<u8>>>,
        // Number of poll_write_vectored invocations.
        vectored_calls: Arc<AtomicUsize>,
    }

    impl AsyncWrite for CountingWriter {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            self.output
                .lock()
                .expect("output mutex")
                .extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        // Accepts every slice in full, so a batch completes in one call.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            self.vectored_calls.fetch_add(1, Ordering::SeqCst);
            let mut output = self.output.lock().expect("output mutex");
            let mut written = 0;
            for buf in bufs {
                output.extend_from_slice(buf);
                written += buf.len();
            }
            Poll::Ready(Ok(written))
        }

        // Advertise vectored support so write_batch's vectored path is used.
        fn is_write_vectored(&self) -> bool {
            true
        }
    }
}