summavy/directory/composite_file.rs

1use std::collections::HashMap;
2use std::io::{self, Read, Write};
3use std::iter::ExactSizeIterator;
4use std::ops::Range;
5
6use common::{BinarySerializable, CountingWriter, HasLen, VInt};
7
8use crate::directory::{FileSlice, TerminatingWrite, WritePtr};
9use crate::schema::Field;
10use crate::space_usage::{FieldUsage, PerFieldSpaceUsage};
11
12#[derive(Eq, PartialEq, Hash, Copy, Ord, PartialOrd, Clone, Debug)]
13pub struct FileAddr {
14    field: Field,
15    idx: usize,
16}
17
18impl FileAddr {
19    fn new(field: Field, idx: usize) -> FileAddr {
20        FileAddr { field, idx }
21    }
22}
23
24impl BinarySerializable for FileAddr {
25    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
26        self.field.serialize(writer)?;
27        VInt(self.idx as u64).serialize(writer)?;
28        Ok(())
29    }
30
31    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
32        let field = Field::deserialize(reader)?;
33        let idx = VInt::deserialize(reader)?.0 as usize;
34        Ok(FileAddr { field, idx })
35    }
36}
37
38/// A `CompositeWrite` is used to write a `CompositeFile`.
39pub struct CompositeWrite<W = WritePtr> {
40    write: CountingWriter<W>,
41    offsets: Vec<(FileAddr, u64)>,
42}
43
44impl<W: TerminatingWrite + Write> CompositeWrite<W> {
45    /// Crate a new API writer that writes a composite file
46    /// in a given write.
47    pub fn wrap(w: W) -> CompositeWrite<W> {
48        CompositeWrite {
49            write: CountingWriter::wrap(w),
50            offsets: Vec::new(),
51        }
52    }
53
54    /// Start writing a new field.
55    pub fn for_field(&mut self, field: Field) -> &mut CountingWriter<W> {
56        self.for_field_with_idx(field, 0)
57    }
58
59    /// Start writing a new field.
60    pub fn for_field_with_idx(&mut self, field: Field, idx: usize) -> &mut CountingWriter<W> {
61        let offset = self.write.written_bytes();
62        let file_addr = FileAddr::new(field, idx);
63        assert!(!self.offsets.iter().any(|el| el.0 == file_addr));
64        self.offsets.push((file_addr, offset));
65        &mut self.write
66    }
67
68    /// Close the composite file
69    ///
70    /// An index of the different field offsets
71    /// will be written as a footer.
72    pub fn close(mut self) -> io::Result<()> {
73        let footer_offset = self.write.written_bytes();
74        VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
75
76        let mut prev_offset = 0;
77        for (file_addr, offset) in self.offsets {
78            VInt(offset - prev_offset).serialize(&mut self.write)?;
79            file_addr.serialize(&mut self.write)?;
80            prev_offset = offset;
81        }
82
83        let footer_len = (self.write.written_bytes() - footer_offset) as u32;
84        footer_len.serialize(&mut self.write)?;
85        self.write.terminate()
86    }
87}
88
89/// A composite file is an abstraction to store a
90/// file partitioned by field.
91///
92/// The file needs to be written field by field.
93/// A footer describes the start and stop offsets
94/// for each field.
95#[derive(Clone)]
96pub struct CompositeFile {
97    data: FileSlice,
98    offsets_index: HashMap<FileAddr, Range<usize>>,
99}
100
101impl std::fmt::Debug for CompositeFile {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("CompositeFile")
104            .field("offsets_index", &self.offsets_index)
105            .finish()
106    }
107}
108
109impl CompositeFile {
110    /// Opens a composite file stored in a given
111    /// `FileSlice`.
112    pub fn open(data: &FileSlice) -> io::Result<CompositeFile> {
113        let end = data.len();
114        let footer_len_data = data.slice_from(end - 4).read_bytes()?;
115        let footer_len = u32::deserialize(&mut footer_len_data.as_slice())? as usize;
116        let footer_start = end - 4 - footer_len;
117        let footer_data = data
118            .slice(footer_start..footer_start + footer_len)
119            .read_bytes()?;
120        let mut footer_buffer = footer_data.as_slice();
121        let num_fields = VInt::deserialize(&mut footer_buffer)?.0 as usize;
122
123        let mut file_addrs = vec![];
124        let mut offsets = vec![];
125        let mut field_index = HashMap::new();
126
127        let mut offset = 0;
128        for _ in 0..num_fields {
129            offset += VInt::deserialize(&mut footer_buffer)?.0 as usize;
130            let file_addr = FileAddr::deserialize(&mut footer_buffer)?;
131            offsets.push(offset);
132            file_addrs.push(file_addr);
133        }
134        offsets.push(footer_start);
135        for i in 0..num_fields {
136            let file_addr = file_addrs[i];
137            let start_offset = offsets[i];
138            let end_offset = offsets[i + 1];
139            field_index.insert(file_addr, start_offset..end_offset);
140        }
141
142        Ok(CompositeFile {
143            data: data.slice_to(footer_start),
144            offsets_index: field_index,
145        })
146    }
147
148    /// Returns a composite file that stores
149    /// no fields.
150    pub fn empty() -> CompositeFile {
151        CompositeFile {
152            offsets_index: HashMap::new(),
153            data: FileSlice::empty(),
154        }
155    }
156
157    /// Returns the `FileSlice` associated with
158    /// a given `Field` and stored in a `CompositeFile`.
159    pub fn open_read(&self, field: Field) -> Option<FileSlice> {
160        self.open_read_with_idx(field, 0)
161    }
162
163    /// Returns the `FileSlice` associated with
164    /// a given `Field` and stored in a `CompositeFile`.
165    pub fn open_read_with_idx(&self, field: Field, idx: usize) -> Option<FileSlice> {
166        self.offsets_index
167            .get(&FileAddr { field, idx })
168            .map(|byte_range| self.data.slice(byte_range.clone()))
169    }
170
171    pub fn space_usage(&self) -> PerFieldSpaceUsage {
172        let mut fields = HashMap::new();
173        for (&field_addr, byte_range) in &self.offsets_index {
174            fields
175                .entry(field_addr.field)
176                .or_insert_with(|| FieldUsage::empty(field_addr.field))
177                .add_field_idx(field_addr.idx, byte_range.len());
178        }
179        PerFieldSpaceUsage::new(fields)
180    }
181}
182
#[cfg(test)]
mod test {

    use std::io::Write;
    use std::path::Path;

    use common::{BinarySerializable, VInt};

    use super::{CompositeFile, CompositeWrite};
    use crate::directory::{Directory, RamDirectory};
    use crate::schema::Field;

    /// Round-trips one single-`VInt` sub-file for each of fields 0 and 4.
    #[test]
    fn test_composite_file() -> crate::Result<()> {
        let path = Path::new("test_path");
        let directory = RamDirectory::create();
        let cases = [(0u32, 32431123u64), (4u32, 2u64)];

        let mut composite_write = CompositeWrite::wrap(directory.open_write(path).unwrap());
        for &(field_id, payload) in &cases {
            let writer = composite_write.for_field(Field::from_field_id(field_id));
            VInt(payload).serialize(writer)?;
            writer.flush()?;
        }
        composite_write.close()?;

        // Each sub-file must decode to exactly its payload, with no bytes left over.
        let composite_file = CompositeFile::open(&directory.open_read(path)?)?;
        for &(field_id, expected) in &cases {
            let bytes = composite_file
                .open_read(Field::from_field_id(field_id))
                .unwrap()
                .read_bytes()?;
            let mut cursor = bytes.as_slice();
            let payload = VInt::deserialize(&mut cursor)?.0;
            assert_eq!(cursor.len(), 0);
            assert_eq!(payload, expected);
        }
        Ok(())
    }

    /// Several sub-files for one field, including an empty one, followed by
    /// another field: every range must be delimited correctly.
    #[test]
    fn test_composite_file_bug() -> crate::Result<()> {
        let path = Path::new("test_path");
        let directory = RamDirectory::create();

        let mut composite_write = CompositeWrite::wrap(directory.open_write(path).unwrap());
        let writer = composite_write.for_field_with_idx(Field::from_field_id(1u32), 0);
        VInt(32431123u64).serialize(writer)?;
        writer.flush()?;
        composite_write
            .for_field_with_idx(Field::from_field_id(1u32), 1)
            .flush()?;
        let writer = composite_write.for_field_with_idx(Field::from_field_id(0u32), 0);
        VInt(1_000_000).serialize(writer)?;
        writer.flush()?;
        composite_write.close()?;

        let composite_file = CompositeFile::open(&directory.open_read(path)?)?;
        let read_sub_file = |field_id: u32, idx: usize| {
            composite_file
                .open_read_with_idx(Field::from_field_id(field_id), idx)
                .unwrap()
                .read_bytes()
        };

        let first = read_sub_file(1, 0)?;
        let mut cursor = first.as_slice();
        let payload = VInt::deserialize(&mut cursor)?.0;
        assert_eq!(cursor.len(), 0);
        assert_eq!(payload, 32431123u64);

        assert_eq!(read_sub_file(1, 1)?.as_slice().len(), 0);
        assert_eq!(read_sub_file(0, 0)?.as_slice().len(), 3);
        Ok(())
    }
}