channels_io/transaction/
buffered.rs1use core::pin::Pin;
2use core::task::{ready, Context, Poll};
3
4use alloc::vec::Vec;
5
6use pin_project::pin_project;
7
8use crate::buf::Cursor;
9use crate::transaction::{AsyncWriteTransaction, WriteTransaction};
10use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt};
11
12#[derive(Debug)]
19#[pin_project]
20pub struct Buffered<'a, W> {
21 #[pin]
22 writer: W,
23 buf: Cursor<&'a mut Vec<u8>>,
24 wants_flush: bool,
25}
26
27impl<'a, W> Buffered<'a, W> {
28 pub fn new(writer: W, buf: &'a mut Vec<u8>) -> Self {
36 assert!(buf.is_empty(), "buf should be empty");
37 Self { writer, buf: Cursor::new(buf), wants_flush: false }
38 }
39
40 #[must_use]
42 pub fn writer(&self) -> &W {
43 &self.writer
44 }
45
46 #[must_use]
48 pub fn writer_mut(&mut self) -> &mut W {
49 &mut self.writer
50 }
51
52 #[must_use]
54 pub fn writer_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
55 self.project().writer
56 }
57}
58
59impl<'a, W> Write for Buffered<'a, W>
60where
61 W: Write,
62{
63 type Error = W::Error;
64
65 fn write_slice(
66 &mut self,
67 buf: &[u8],
68 ) -> Result<usize, Self::Error> {
69 self.buf.get_mut().extend_from_slice(buf);
70 Ok(buf.len())
71 }
72
73 fn flush_once(&mut self) -> Result<(), Self::Error> {
74 self.wants_flush = true;
75 Ok(())
76 }
77}
78
79impl<'a, W> WriteTransaction for Buffered<'a, W>
80where
81 W: Write,
82{
83 fn finish(self) -> Result<(), Self::Error> {
84 let Self { buf, wants_flush, mut writer } = self;
85
86 writer.write_buf_all(buf)?;
87
88 if wants_flush {
89 writer.flush()?;
90 }
91
92 Ok(())
93 }
94}
95
96impl<'a, W> AsyncWrite for Buffered<'a, W>
97where
98 W: AsyncWrite,
99{
100 type Error = W::Error;
101
102 fn poll_write_slice(
103 self: Pin<&mut Self>,
104 _: &mut Context,
105 buf: &[u8],
106 ) -> Poll<Result<usize, Self::Error>> {
107 let this = self.project();
108 this.buf.get_mut().extend_from_slice(buf);
109 Poll::Ready(Ok(buf.len()))
110 }
111
112 fn poll_flush_once(
113 self: Pin<&mut Self>,
114 _: &mut Context,
115 ) -> Poll<Result<(), Self::Error>> {
116 let this = self.project();
117 *this.wants_flush = true;
118 Poll::Ready(Ok(()))
119 }
120}
121
122impl<'a, W> AsyncWriteTransaction for Buffered<'a, W>
123where
124 W: AsyncWrite,
125{
126 fn poll_finish(
127 self: Pin<&mut Self>,
128 cx: &mut Context,
129 ) -> Poll<Result<(), Self::Error>> {
130 let mut this = self.project();
131
132 ready!(this
133 .writer
134 .as_mut()
135 .poll_write_buf_all(cx, this.buf))?;
136
137 if *this.wants_flush {
138 ready!(this.writer.poll_flush(cx))?;
139 }
140
141 Poll::Ready(Ok(()))
142 }
143}