// channels_io/transaction/buffered.rs

1use core::pin::Pin;
2use core::task::{ready, Context, Poll};
3
4use alloc::vec::Vec;
5
6use pin_project::pin_project;
7
8use crate::buf::Cursor;
9use crate::transaction::{AsyncWriteTransaction, WriteTransaction};
10use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt};
11
12/// A buffered write transaction.
13///
14/// Data written to this transaction will be stored in an internal `Vec<u8>` and
15/// will be written to the underlying writer when it is finished. Flushing the
16/// transaction will instruct it to also flush the underlying writer after it writes
17/// the data to it.
18#[derive(Debug)]
19#[pin_project]
20pub struct Buffered<'a, W> {
21	#[pin]
22	writer: W,
23	buf: Cursor<&'a mut Vec<u8>>,
24	wants_flush: bool,
25}
26
27impl<'a, W> Buffered<'a, W> {
28	/// Create a new [`Buffered`] transaction.
29	///
30	/// `buf` is where data will be buffered in before it is written out.
31	///
32	/// # Panics
33	///
34	/// If `buf` is not empty.
35	pub fn new(writer: W, buf: &'a mut Vec<u8>) -> Self {
36		assert!(buf.is_empty(), "buf should be empty");
37		Self { writer, buf: Cursor::new(buf), wants_flush: false }
38	}
39
40	/// Get a reference to the underlying writer.
41	#[must_use]
42	pub fn writer(&self) -> &W {
43		&self.writer
44	}
45
46	/// Get a mutable reference to the underlying writer.
47	#[must_use]
48	pub fn writer_mut(&mut self) -> &mut W {
49		&mut self.writer
50	}
51
52	/// Get a pinned mutable reference to the underlying writer.
53	#[must_use]
54	pub fn writer_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
55		self.project().writer
56	}
57}
58
59impl<'a, W> Write for Buffered<'a, W>
60where
61	W: Write,
62{
63	type Error = W::Error;
64
65	fn write_slice(
66		&mut self,
67		buf: &[u8],
68	) -> Result<usize, Self::Error> {
69		self.buf.get_mut().extend_from_slice(buf);
70		Ok(buf.len())
71	}
72
73	fn flush_once(&mut self) -> Result<(), Self::Error> {
74		self.wants_flush = true;
75		Ok(())
76	}
77}
78
79impl<'a, W> WriteTransaction for Buffered<'a, W>
80where
81	W: Write,
82{
83	fn finish(self) -> Result<(), Self::Error> {
84		let Self { buf, wants_flush, mut writer } = self;
85
86		writer.write_buf_all(buf)?;
87
88		if wants_flush {
89			writer.flush()?;
90		}
91
92		Ok(())
93	}
94}
95
96impl<'a, W> AsyncWrite for Buffered<'a, W>
97where
98	W: AsyncWrite,
99{
100	type Error = W::Error;
101
102	fn poll_write_slice(
103		self: Pin<&mut Self>,
104		_: &mut Context,
105		buf: &[u8],
106	) -> Poll<Result<usize, Self::Error>> {
107		let this = self.project();
108		this.buf.get_mut().extend_from_slice(buf);
109		Poll::Ready(Ok(buf.len()))
110	}
111
112	fn poll_flush_once(
113		self: Pin<&mut Self>,
114		_: &mut Context,
115	) -> Poll<Result<(), Self::Error>> {
116		let this = self.project();
117		*this.wants_flush = true;
118		Poll::Ready(Ok(()))
119	}
120}
121
122impl<'a, W> AsyncWriteTransaction for Buffered<'a, W>
123where
124	W: AsyncWrite,
125{
126	fn poll_finish(
127		self: Pin<&mut Self>,
128		cx: &mut Context,
129	) -> Poll<Result<(), Self::Error>> {
130		let mut this = self.project();
131
132		ready!(this
133			.writer
134			.as_mut()
135			.poll_write_buf_all(cx, this.buf))?;
136
137		if *this.wants_flush {
138			ready!(this.writer.poll_flush(cx))?;
139		}
140
141		Poll::Ready(Ok(()))
142	}
143}