summavy/directory/
composite_file.rs1use std::collections::HashMap;
2use std::io::{self, Read, Write};
3use std::iter::ExactSizeIterator;
4use std::ops::Range;
5
6use common::{BinarySerializable, CountingWriter, HasLen, VInt};
7
8use crate::directory::{FileSlice, TerminatingWrite, WritePtr};
9use crate::schema::Field;
10use crate::space_usage::{FieldUsage, PerFieldSpaceUsage};
11
12#[derive(Eq, PartialEq, Hash, Copy, Ord, PartialOrd, Clone, Debug)]
13pub struct FileAddr {
14 field: Field,
15 idx: usize,
16}
17
18impl FileAddr {
19 fn new(field: Field, idx: usize) -> FileAddr {
20 FileAddr { field, idx }
21 }
22}
23
24impl BinarySerializable for FileAddr {
25 fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
26 self.field.serialize(writer)?;
27 VInt(self.idx as u64).serialize(writer)?;
28 Ok(())
29 }
30
31 fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
32 let field = Field::deserialize(reader)?;
33 let idx = VInt::deserialize(reader)?.0 as usize;
34 Ok(FileAddr { field, idx })
35 }
36}
37
38pub struct CompositeWrite<W = WritePtr> {
40 write: CountingWriter<W>,
41 offsets: Vec<(FileAddr, u64)>,
42}
43
44impl<W: TerminatingWrite + Write> CompositeWrite<W> {
45 pub fn wrap(w: W) -> CompositeWrite<W> {
48 CompositeWrite {
49 write: CountingWriter::wrap(w),
50 offsets: Vec::new(),
51 }
52 }
53
54 pub fn for_field(&mut self, field: Field) -> &mut CountingWriter<W> {
56 self.for_field_with_idx(field, 0)
57 }
58
59 pub fn for_field_with_idx(&mut self, field: Field, idx: usize) -> &mut CountingWriter<W> {
61 let offset = self.write.written_bytes();
62 let file_addr = FileAddr::new(field, idx);
63 assert!(!self.offsets.iter().any(|el| el.0 == file_addr));
64 self.offsets.push((file_addr, offset));
65 &mut self.write
66 }
67
68 pub fn close(mut self) -> io::Result<()> {
73 let footer_offset = self.write.written_bytes();
74 VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
75
76 let mut prev_offset = 0;
77 for (file_addr, offset) in self.offsets {
78 VInt(offset - prev_offset).serialize(&mut self.write)?;
79 file_addr.serialize(&mut self.write)?;
80 prev_offset = offset;
81 }
82
83 let footer_len = (self.write.written_bytes() - footer_offset) as u32;
84 footer_len.serialize(&mut self.write)?;
85 self.write.terminate()
86 }
87}
88
89#[derive(Clone)]
96pub struct CompositeFile {
97 data: FileSlice,
98 offsets_index: HashMap<FileAddr, Range<usize>>,
99}
100
101impl std::fmt::Debug for CompositeFile {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("CompositeFile")
104 .field("offsets_index", &self.offsets_index)
105 .finish()
106 }
107}
108
109impl CompositeFile {
110 pub fn open(data: &FileSlice) -> io::Result<CompositeFile> {
113 let end = data.len();
114 let footer_len_data = data.slice_from(end - 4).read_bytes()?;
115 let footer_len = u32::deserialize(&mut footer_len_data.as_slice())? as usize;
116 let footer_start = end - 4 - footer_len;
117 let footer_data = data
118 .slice(footer_start..footer_start + footer_len)
119 .read_bytes()?;
120 let mut footer_buffer = footer_data.as_slice();
121 let num_fields = VInt::deserialize(&mut footer_buffer)?.0 as usize;
122
123 let mut file_addrs = vec![];
124 let mut offsets = vec![];
125 let mut field_index = HashMap::new();
126
127 let mut offset = 0;
128 for _ in 0..num_fields {
129 offset += VInt::deserialize(&mut footer_buffer)?.0 as usize;
130 let file_addr = FileAddr::deserialize(&mut footer_buffer)?;
131 offsets.push(offset);
132 file_addrs.push(file_addr);
133 }
134 offsets.push(footer_start);
135 for i in 0..num_fields {
136 let file_addr = file_addrs[i];
137 let start_offset = offsets[i];
138 let end_offset = offsets[i + 1];
139 field_index.insert(file_addr, start_offset..end_offset);
140 }
141
142 Ok(CompositeFile {
143 data: data.slice_to(footer_start),
144 offsets_index: field_index,
145 })
146 }
147
148 pub fn empty() -> CompositeFile {
151 CompositeFile {
152 offsets_index: HashMap::new(),
153 data: FileSlice::empty(),
154 }
155 }
156
157 pub fn open_read(&self, field: Field) -> Option<FileSlice> {
160 self.open_read_with_idx(field, 0)
161 }
162
163 pub fn open_read_with_idx(&self, field: Field, idx: usize) -> Option<FileSlice> {
166 self.offsets_index
167 .get(&FileAddr { field, idx })
168 .map(|byte_range| self.data.slice(byte_range.clone()))
169 }
170
171 pub fn space_usage(&self) -> PerFieldSpaceUsage {
172 let mut fields = HashMap::new();
173 for (&field_addr, byte_range) in &self.offsets_index {
174 fields
175 .entry(field_addr.field)
176 .or_insert_with(|| FieldUsage::empty(field_addr.field))
177 .add_field_idx(field_addr.idx, byte_range.len());
178 }
179 PerFieldSpaceUsage::new(fields)
180 }
181}
182
#[cfg(test)]
mod test {

    use std::io::Write;
    use std::path::Path;

    use common::{BinarySerializable, VInt};

    use super::{CompositeFile, CompositeWrite};
    use crate::directory::{Directory, RamDirectory};
    use crate::schema::Field;

    /// Decodes `bytes` as exactly one VInt and asserts that no trailing bytes remain.
    fn decode_single_vint(bytes: &[u8]) -> crate::Result<u64> {
        let mut remaining = bytes;
        let VInt(value) = VInt::deserialize(&mut remaining)?;
        assert_eq!(remaining.len(), 0);
        Ok(value)
    }

    #[test]
    fn test_composite_file() -> crate::Result<()> {
        let path = Path::new("test_path");
        let directory = RamDirectory::create();
        let payloads = [(0u32, 32431123u64), (4u32, 2u64)];

        // Write one VInt payload per field: field 0 first, then field 4.
        let mut composite_write = CompositeWrite::wrap(directory.open_write(path).unwrap());
        for &(field_id, payload) in payloads.iter() {
            let mut writer = composite_write.for_field(Field::from_field_id(field_id));
            VInt(payload).serialize(&mut writer)?;
            writer.flush()?;
        }
        composite_write.close()?;

        // Each field must read back exactly its own payload and nothing else.
        let composite_file = CompositeFile::open(&directory.open_read(path)?)?;
        for &(field_id, expected) in payloads.iter() {
            let bytes = composite_file
                .open_read(Field::from_field_id(field_id))
                .unwrap()
                .read_bytes()?;
            assert_eq!(decode_single_vint(bytes.as_slice())?, expected);
        }
        Ok(())
    }

    #[test]
    fn test_composite_file_bug() -> crate::Result<()> {
        let path = Path::new("test_path");
        let directory = RamDirectory::create();

        let mut composite_write = CompositeWrite::wrap(directory.open_write(path).unwrap());
        {
            let mut writer = composite_write.for_field_with_idx(Field::from_field_id(1u32), 0);
            VInt(32431123u64).serialize(&mut writer)?;
            writer.flush()?;
        }
        // Registered but never written to: must read back as an empty sub-file.
        composite_write
            .for_field_with_idx(Field::from_field_id(1u32), 1)
            .flush()?;
        {
            let mut writer = composite_write.for_field_with_idx(Field::from_field_id(0u32), 0);
            VInt(1_000_000).serialize(&mut writer)?;
            writer.flush()?;
        }
        composite_write.close()?;

        let composite_file = CompositeFile::open(&directory.open_read(path)?)?;
        let read_sub_file = |field_id: u32, idx: usize| {
            composite_file
                .open_read_with_idx(Field::from_field_id(field_id), idx)
                .unwrap()
                .read_bytes()
        };

        let field_1_idx_0 = read_sub_file(1, 0)?;
        assert_eq!(decode_single_vint(field_1_idx_0.as_slice())?, 32431123u64);

        assert_eq!(read_sub_file(1, 1)?.as_slice().len(), 0);

        // The VInt encoding of 1_000_000 takes three bytes.
        assert_eq!(read_sub_file(0, 0)?.as_slice().len(), 3);
        Ok(())
    }
}