// File: librapidarchive/blocking.rs

//! Buffered writers that work exclusively with one particular block size,
//! including padding out the last block with null bytes.
3
4use std::io;
5use std::io::Write;
6
7use crate::spanning::{RecoverableWrite, DataZone, DataZoneStream};
8use crate::fs::ArchivalSink;
9
/// Write implementation that ensures all data written to it is passed along to
/// its interior writer in identically-sized buffers of 512 * factor bytes.
pub struct BlockingWriter<W, P = u64> where P: Clone + PartialEq {
    // Size of each output block in bytes (factor * 512, see `new_with_factor`).
    blocking_factor: usize,
    // The wrapped writer that receives whole blocks.
    inner: W,
    // Staging buffer for not-yet-forwarded data; holds at most one block.
    block: Vec<u8>,
    // Tracks buffered vs. committed byte counts per data zone for recovery.
    datazone_stream: DataZoneStream<P>
}
18
19impl<W: Write, P> BlockingWriter<W, P> where P: Clone + PartialEq {
20    pub fn new(inner: W) -> BlockingWriter<W, P> {
21        BlockingWriter {
22            inner: inner,
23            blocking_factor: 20 * 512,
24            block: Vec::with_capacity(20 * 512),
25            datazone_stream: DataZoneStream::new()
26        }
27    }
28    
29    pub fn new_with_factor(inner: W, factor: usize) -> BlockingWriter<W, P> {
30        BlockingWriter {
31            inner: inner,
32            blocking_factor: factor * 512,
33            block: Vec::with_capacity(factor * 512),
34            datazone_stream: DataZoneStream::new()
35        }
36    }
37    
38    pub fn as_inner_writer<'a>(&'a self) -> &'a W {
39        &self.inner
40    }
41    
42    /// Attempts to fill the interior block with as much data as possible.
43    /// 
44    /// # Returns
45    /// 
46    /// If the given data buffer causes the interior data block to exceed it's
47    /// capacity, this function returns a slice of the remaining data.
48    /// 
49    /// Otherwise, returns None.
50    fn fill_block<'a>(&mut self, buf: &'a [u8]) -> Option<&'a [u8]> {
51        let block_space = self.blocking_factor - self.block.len();
52        
53        if block_space >= buf.len() {
54            self.block.extend(buf);
55
56            self.datazone_stream.write_buffered(buf.len() as u64);
57
58            return None;
59        }
60
61        self.block.extend(&buf[0..block_space]);
62        self.datazone_stream.write_buffered(block_space as u64);
63        
64        Some(&buf[block_space..])
65    }
66    
67    /// Forward a full block onto the inner writer.
68    /// 
69    /// Is a null-operation if the block is not full.
70    /// 
71    /// # Returns
72    /// 
73    /// Ok if the write completed successfully (or there was none); Err if it
74    /// didn't. If the block buffer was full it will be empty, otherwise it will
75    /// be unchanged.
76    fn empty_block<'a>(&mut self) -> io::Result<()> {
77        if self.block.len() >= self.blocking_factor {
78            self.inner.write_all(&self.block[..self.blocking_factor])?;
79            self.datazone_stream.write_committed(self.blocking_factor as u64);
80
81            //This is actually safe, because this always acts to shrink
82            //the array, failing to drop values properly is safe (though
83            //bad practice), and u8 doesn't implement Drop anyway.
84            unsafe { self.block.set_len(0); }
85        }
86        
87        Ok(())
88    }
89}
90
// Zone-tracking is mirrored into both the local DataZoneStream (accounting
// for bytes still held in the block buffer) and the inner writer.
impl<W:Write, P> RecoverableWrite<P> for BlockingWriter<W, P> where P: Clone + PartialEq, W: RecoverableWrite<P> {
    // Start tracking a new data zone locally, then notify the inner writer.
    fn begin_data_zone(&mut self, ident: P) {
        self.datazone_stream.begin_data_zone(ident.clone());
        self.inner.begin_data_zone(ident);
    }

    // Resume a zone with `committed` bytes already on stable storage;
    // forwarded to both trackers like begin_data_zone.
    fn resume_data_zone(&mut self, ident: P, committed: u64) {
        self.datazone_stream.resume_data_zone(ident.clone(), committed);
        self.inner.resume_data_zone(ident, committed);
    }

    // Close the current zone on both trackers.
    fn end_data_zone(&mut self) {
        self.datazone_stream.end_data_zone();
        self.inner.end_data_zone();
    }

    // Merge the inner writer's uncommitted zones with our own buffered data.
    fn uncommitted_writes(&self) -> Vec<DataZone<P>> {
        let inner_ucw = self.inner.uncommitted_writes();
        self.datazone_stream.uncommitted_writes(Some(inner_ucw))
    }
}
112
// Marker impl: a BlockingWriter over a Send + recoverable writer can serve as
// an ArchivalSink. NOTE(review): presumably ArchivalSink has only provided
// methods (none are overridden here) — confirm against crate::fs.
impl<W:Write, P> ArchivalSink<P> for BlockingWriter<W, P> where W: Send + RecoverableWrite<P>, P: Send + Clone + PartialEq {
    
}
116
117impl<W:Write, P> Write for BlockingWriter<W, P> where P: Clone + PartialEq, W: RecoverableWrite<P> {
118    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
119        //Precondition: Ensure the write buffer isn't full.
120        self.empty_block()?;
121        
122        //Precondition: Ensure the incoming buffer isn't empty.
123        if buf.len() == 0 {
124            return Ok(0);
125        }
126        
127        //Optimization: If the block buffer is empty, and the incoming data is
128        //larger than a single block, just hand the inner writer slices off the
129        //buffer without copying.
130        let mut shortcircuit_writes = 0;
131        if self.block.len() == 0 && buf.len() >= self.blocking_factor {
132            while shortcircuit_writes <= (buf.len() - self.blocking_factor) {
133                match self.inner.write(&buf[shortcircuit_writes..(shortcircuit_writes + self.blocking_factor)]) {
134                    Ok(blk_write) => {
135                        shortcircuit_writes += blk_write;
136                        self.datazone_stream.write_through(blk_write as u64);
137                    }
138                    Err(x) => return Err(x)
139                }
140            }
141            
142            assert!(shortcircuit_writes > 0);
143            return Ok(shortcircuit_writes);
144        }
145        
146        //Normal path: Buffer incoming data.
147        let remain = match self.fill_block(buf) {
148            Some(remain) => remain.len(),
149            None => 0
150        };
151        let write_size = buf.len() - remain;
152        
153        assert!(write_size > 0);
154        Ok(write_size)
155    }
156    
157    /// Flush the output stream, ensuring that all intermediately buffered
158    /// contents reach their destination.
159    /// 
160    /// Since this is a blocking-based writer, calling flush() may cause zeroes
161    /// to be inserted into the resulting stream. The alternative was to not
162    /// flush intermediary contents, which would result in some data getting
163    /// lost if the client failed to write a correctly divisible number of bytes
164    /// instead.
165    fn flush(&mut self) -> io::Result<()> {
166        self.end_data_zone();
167
168        if self.block.len() < self.blocking_factor {
169            self.block.resize(self.blocking_factor, 0);
170        }
171        
172        self.empty_block()?;
173        self.inner.flush()?;
174        
175        Ok(())
176    }
177}
178
#[cfg(test)]
mod tests {
    use std::io::{Write, Cursor};
    use crate::blocking::BlockingWriter;
    use crate::spanning::{UnbufferedWriter, RecoverableWrite};
    
    /// Block-aligned writes into an empty buffer should pass straight through.
    #[test]
    fn blocking_factor_1_block_passthrough() {
        //Factor 1 = one tar record = 512 bytes per block.
        let mut writer: BlockingWriter<_, u64> = BlockingWriter::new_with_factor(Cursor::new(vec![]), 1);
        
        writer.write_all(&[0u8; 512]).unwrap();
        writer.write_all(&[1u8; 512]).unwrap();
        
        let sink = writer.as_inner_writer().get_ref();
        assert_eq!(sink.len(), 1024);
        assert_eq!(&sink[0..512], &[0u8; 512][..]);
        assert_eq!(&sink[512..], &[1u8; 512][..]);
    }
    
    /// Unaligned writes get recombined into whole blocks, and flush() pads
    /// the tail with zeroes.
    #[test]
    fn blocking_factor_1_record_splitting() {
        //Factor 1 = one tar record = 512 bytes per block.
        let mut writer: BlockingWriter<_, u64> = BlockingWriter::new_with_factor(Cursor::new(vec![]), 1);
        
        writer.write_all(&[0u8; 384]).unwrap();
        writer.write_all(&[1u8; 384]).unwrap();
        
        let after_two = writer.as_inner_writer().get_ref();
        assert_eq!(after_two.len(), 512);
        assert_eq!(&after_two[0..384], &[0u8; 384][..]);
        assert_eq!(&after_two[384..], &[1u8; 128][..]);
        
        writer.write_all(&[2u8; 384]).unwrap();
        writer.flush().unwrap();
        
        let flushed = writer.as_inner_writer().get_ref();
        assert_eq!(flushed.len(), 1536);
        assert_eq!(&flushed[0..384], &[0u8; 384][..]);
        assert_eq!(&flushed[384..768], &[1u8; 384][..]);
        assert_eq!(&flushed[768..1152], &[2u8; 384][..]);
        assert_eq!(&flushed[1152..], &[0u8; 384][..]);
    }
    
    /// Oversized writes should take the zero-copy short-circuit path while
    /// still lining up with block boundaries.
    #[test]
    fn blocking_factor_1_record_splitting_shortcircuit() {
        //Factor 1 = one tar record = 512 bytes per block.
        let mut writer: BlockingWriter<_, u64> = BlockingWriter::new_with_factor(Cursor::new(vec![]), 1);
        
        writer.write_all(&[0u8; 384]).unwrap();
        writer.write_all(&[1u8; 1024]).unwrap();
        
        let after_two = writer.as_inner_writer().get_ref();
        assert_eq!(after_two.len(), 1024);
        assert_eq!(&after_two[0..384], &[0u8; 384][..]);
        assert_eq!(&after_two[384..], &[1u8; 640][..]);
        
        writer.write_all(&[2u8; 2048]).unwrap();
        writer.flush().unwrap();
        
        let flushed = writer.as_inner_writer().get_ref();
        assert_eq!(flushed.len(), 3584);
        assert_eq!(&flushed[0..384], &[0u8; 384][..]);
        assert_eq!(&flushed[384..1408], &[1u8; 1024][..]);
        assert_eq!(&flushed[1408..3456], &[2u8; 2048][..]);
        assert_eq!(&flushed[3456..], &[0u8; 128][..]);
    }

    /// Data zones should report as uncommitted while buffered, then clear on
    /// flush, with the padding appearing in the output.
    #[test]
    fn blocking_factor_4_block_zone_tracking() {
        //Factor 4 = four tar records = 2048 bytes per block.
        let mut writer = BlockingWriter::new_with_factor(UnbufferedWriter::wrap(Cursor::new(vec![])), 4);
        let zone_a = "ident1";
        let zone_b = "ident2";

        writer.begin_data_zone(zone_a);
        writer.write_all(&[0u8; 512]).unwrap();
        writer.begin_data_zone(zone_b);
        writer.write_all(&[1u8; 512]).unwrap();

        let pending = writer.uncommitted_writes();

        assert_eq!(pending.len(), 2);
        assert_eq!(pending[0].ident, Some(zone_a));
        assert_eq!(pending[0].length, 512);
        assert_eq!(pending[0].uncommitted_length, 512);
        assert_eq!(pending[0].committed_length, 0);
        assert_eq!(pending[1].ident, Some(zone_b));
        assert_eq!(pending[1].length, 512);
        assert_eq!(pending[1].uncommitted_length, 512);
        assert_eq!(pending[1].committed_length, 0);

        writer.flush().unwrap();

        assert_eq!(writer.uncommitted_writes().len(), 0);

        let flushed = writer.as_inner_writer().as_inner_writer().get_ref();
        assert_eq!(flushed.len(), 2048);
        assert_eq!(&flushed[0..512], &[0u8; 512][..]);
        assert_eq!(&flushed[512..1024], &[1u8; 512][..]);
        assert_eq!(&flushed[1024..], &[0u8; 1024][..]);
    }
}
273}