librapidarchive/
blocking.rs1use std::io;
5use std::io::Write;
6
7use crate::spanning::{RecoverableWrite, DataZone, DataZoneStream};
8use crate::fs::ArchivalSink;
9
10pub struct BlockingWriter<W, P = u64> where P: Clone + PartialEq {
13 blocking_factor: usize,
14 inner: W,
15 block: Vec<u8>,
16 datazone_stream: DataZoneStream<P>
17}
18
19impl<W: Write, P> BlockingWriter<W, P> where P: Clone + PartialEq {
20 pub fn new(inner: W) -> BlockingWriter<W, P> {
21 BlockingWriter {
22 inner: inner,
23 blocking_factor: 20 * 512,
24 block: Vec::with_capacity(20 * 512),
25 datazone_stream: DataZoneStream::new()
26 }
27 }
28
29 pub fn new_with_factor(inner: W, factor: usize) -> BlockingWriter<W, P> {
30 BlockingWriter {
31 inner: inner,
32 blocking_factor: factor * 512,
33 block: Vec::with_capacity(factor * 512),
34 datazone_stream: DataZoneStream::new()
35 }
36 }
37
38 pub fn as_inner_writer<'a>(&'a self) -> &'a W {
39 &self.inner
40 }
41
42 fn fill_block<'a>(&mut self, buf: &'a [u8]) -> Option<&'a [u8]> {
51 let block_space = self.blocking_factor - self.block.len();
52
53 if block_space >= buf.len() {
54 self.block.extend(buf);
55
56 self.datazone_stream.write_buffered(buf.len() as u64);
57
58 return None;
59 }
60
61 self.block.extend(&buf[0..block_space]);
62 self.datazone_stream.write_buffered(block_space as u64);
63
64 Some(&buf[block_space..])
65 }
66
67 fn empty_block<'a>(&mut self) -> io::Result<()> {
77 if self.block.len() >= self.blocking_factor {
78 self.inner.write_all(&self.block[..self.blocking_factor])?;
79 self.datazone_stream.write_committed(self.blocking_factor as u64);
80
81 unsafe { self.block.set_len(0); }
85 }
86
87 Ok(())
88 }
89}
90
91impl<W:Write, P> RecoverableWrite<P> for BlockingWriter<W, P> where P: Clone + PartialEq, W: RecoverableWrite<P> {
92 fn begin_data_zone(&mut self, ident: P) {
93 self.datazone_stream.begin_data_zone(ident.clone());
94 self.inner.begin_data_zone(ident);
95 }
96
97 fn resume_data_zone(&mut self, ident: P, committed: u64) {
98 self.datazone_stream.resume_data_zone(ident.clone(), committed);
99 self.inner.resume_data_zone(ident, committed);
100 }
101
102 fn end_data_zone(&mut self) {
103 self.datazone_stream.end_data_zone();
104 self.inner.end_data_zone();
105 }
106
107 fn uncommitted_writes(&self) -> Vec<DataZone<P>> {
108 let inner_ucw = self.inner.uncommitted_writes();
109 self.datazone_stream.uncommitted_writes(Some(inner_ucw))
110 }
111}
112
113impl<W:Write, P> ArchivalSink<P> for BlockingWriter<W, P> where W: Send + RecoverableWrite<P>, P: Send + Clone + PartialEq {
114
115}
116
117impl<W:Write, P> Write for BlockingWriter<W, P> where P: Clone + PartialEq, W: RecoverableWrite<P> {
118 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
119 self.empty_block()?;
121
122 if buf.len() == 0 {
124 return Ok(0);
125 }
126
127 let mut shortcircuit_writes = 0;
131 if self.block.len() == 0 && buf.len() >= self.blocking_factor {
132 while shortcircuit_writes <= (buf.len() - self.blocking_factor) {
133 match self.inner.write(&buf[shortcircuit_writes..(shortcircuit_writes + self.blocking_factor)]) {
134 Ok(blk_write) => {
135 shortcircuit_writes += blk_write;
136 self.datazone_stream.write_through(blk_write as u64);
137 }
138 Err(x) => return Err(x)
139 }
140 }
141
142 assert!(shortcircuit_writes > 0);
143 return Ok(shortcircuit_writes);
144 }
145
146 let remain = match self.fill_block(buf) {
148 Some(remain) => remain.len(),
149 None => 0
150 };
151 let write_size = buf.len() - remain;
152
153 assert!(write_size > 0);
154 Ok(write_size)
155 }
156
157 fn flush(&mut self) -> io::Result<()> {
166 self.end_data_zone();
167
168 if self.block.len() < self.blocking_factor {
169 self.block.resize(self.blocking_factor, 0);
170 }
171
172 self.empty_block()?;
173 self.inner.flush()?;
174
175 Ok(())
176 }
177}
178
#[cfg(test)]
mod tests {
    use std::io::{Write, Cursor};
    use crate::blocking::BlockingWriter;
    use crate::spanning::{UnbufferedWriter, RecoverableWrite};

    /// Builds `len` copies of `byte`, used both as test input and as the
    /// expected contents of an output range.
    fn run(byte: u8, len: usize) -> Vec<u8> {
        vec![byte; len]
    }

    /// Writes that are exactly one block long reach the inner writer without
    /// needing a flush.
    #[test]
    fn blocking_factor_1_block_passthrough() {
        let mut blk: BlockingWriter<_, u64> =
            BlockingWriter::new_with_factor(Cursor::new(vec![]), 1);

        blk.write_all(&run(0, 512)).unwrap();
        blk.write_all(&run(1, 512)).unwrap();

        let written = blk.as_inner_writer().get_ref();
        assert_eq!(written.len(), 1024);
        assert_eq!(&written[0..512], run(0, 512).as_slice());
        assert_eq!(&written[512..], run(1, 512).as_slice());
    }

    /// Writes shorter than a block are regrouped into whole blocks, and
    /// `flush` zero-pads the final partial block.
    #[test]
    fn blocking_factor_1_record_splitting() {
        let mut blk: BlockingWriter<_, u64> =
            BlockingWriter::new_with_factor(Cursor::new(vec![]), 1);

        blk.write_all(&run(0, 384)).unwrap();
        blk.write_all(&run(1, 384)).unwrap();

        // Only the first complete block has been emitted so far.
        let first_block = blk.as_inner_writer().get_ref();
        assert_eq!(first_block.len(), 512);
        assert_eq!(&first_block[0..384], run(0, 384).as_slice());
        assert_eq!(&first_block[384..], run(1, 128).as_slice());

        blk.write_all(&run(2, 384)).unwrap();
        blk.flush().unwrap();

        let flushed = blk.as_inner_writer().get_ref();
        assert_eq!(flushed.len(), 1536);
        assert_eq!(&flushed[0..384], run(0, 384).as_slice());
        assert_eq!(&flushed[384..768], run(1, 384).as_slice());
        assert_eq!(&flushed[768..1152], run(2, 384).as_slice());
        assert_eq!(&flushed[1152..], run(0, 384).as_slice());
    }

    /// Same as record splitting, but with writes large enough to take the
    /// direct whole-block path once the pending block has been emptied.
    #[test]
    fn blocking_factor_1_record_splitting_shortcircuit() {
        let mut blk: BlockingWriter<_, u64> =
            BlockingWriter::new_with_factor(Cursor::new(vec![]), 1);

        blk.write_all(&run(0, 384)).unwrap();
        blk.write_all(&run(1, 1024)).unwrap();

        let partial = blk.as_inner_writer().get_ref();
        assert_eq!(partial.len(), 1024);
        assert_eq!(&partial[0..384], run(0, 384).as_slice());
        assert_eq!(&partial[384..], run(1, 640).as_slice());

        blk.write_all(&run(2, 2048)).unwrap();
        blk.flush().unwrap();

        let flushed = blk.as_inner_writer().get_ref();
        assert_eq!(flushed.len(), 3584);
        assert_eq!(&flushed[0..384], run(0, 384).as_slice());
        assert_eq!(&flushed[384..1408], run(1, 1024).as_slice());
        assert_eq!(&flushed[1408..3456], run(2, 2048).as_slice());
        assert_eq!(&flushed[3456..], run(0, 128).as_slice());
    }

    /// Data zones are reported as uncommitted while their bytes are still
    /// pending, and no zones remain after a flush.
    #[test]
    fn blocking_factor_4_block_zone_tracking() {
        let mut blk = BlockingWriter::new_with_factor(UnbufferedWriter::wrap(Cursor::new(vec![])), 4);
        let ident1 = "ident1";
        let ident2 = "ident2";

        blk.begin_data_zone(ident1);
        blk.write_all(&run(0, 512)).unwrap();
        blk.begin_data_zone(ident2);
        blk.write_all(&run(1, 512)).unwrap();

        let zones = blk.uncommitted_writes();
        assert_eq!(zones.len(), 2);

        // Both zones hold 512 bytes, none of which has been committed yet.
        for (zone, expected_ident) in zones.iter().zip([ident1, ident2].iter()) {
            assert_eq!(zone.ident, Some(*expected_ident));
            assert_eq!(zone.length, 512);
            assert_eq!(zone.uncommitted_length, 512);
            assert_eq!(zone.committed_length, 0);
        }

        blk.flush().unwrap();

        let zones_2 = blk.uncommitted_writes();
        assert_eq!(zones_2.len(), 0);

        let written = blk.as_inner_writer().as_inner_writer().get_ref();
        assert_eq!(written.len(), 2048);
        assert_eq!(&written[0..512], run(0, 512).as_slice());
        assert_eq!(&written[512..1024], run(1, 512).as_slice());
        assert_eq!(&written[1024..], run(0, 1024).as_slice());
    }
}