1use parking_lot::Mutex;
28use rustc_hash::FxHashMap;
29use std::cmp::min;
30use std::convert::TryInto;
31use std::error::Error;
32use std::fmt::Debug;
33use std::fs;
34use std::io::Write;
35use std::sync::Arc;
36
/// The maximum payload size of a single page. A sink's local buffer is
/// flushed to the backing storage as one tagged page once it would exceed
/// this size.
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The smallest payload size worth emitting as a page of its own. In
/// `write_bytes_atomic`, trailing chunks smaller than this are kept in the
/// local buffer (to be topped up by later writes) instead of being written
/// out as an undersized page.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
43
/// Identifies which logical stream a page belongs to. The discriminant is
/// written as the first byte of every page header (see
/// `SerializationSink::write_page`) and decoded again by `split_streams`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PageTag {
    Events = 0,
    StringData = 1,
    StringIndex = 2,
}
51
52impl std::convert::TryFrom<u8> for PageTag {
53 type Error = String;
54
55 fn try_from(value: u8) -> Result<Self, Self::Error> {
56 match value {
57 0 => Ok(PageTag::Events),
58 1 => Ok(PageTag::StringData),
59 2 => Ok(PageTag::StringIndex),
60 _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
61 }
62 }
63}
64
/// A virtual address within a single sink's stream: the offset of a byte
/// counted over that sink's payload bytes, ignoring page headers.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u64);

impl Addr {
    /// Returns the address as a `usize` (an `as` cast, so this truncates on
    /// targets where `usize` is narrower than 64 bits).
    pub fn as_usize(self) -> usize {
        let Addr(raw) = self;
        raw as usize
    }
}
80
/// A sink for one [`PageTag`] stream: bytes are accumulated in a local
/// buffer and written out as tagged pages to storage shared with the other
/// sinks created by the same [`SerializationSinkBuilder`].
#[derive(Debug)]
pub struct SerializationSink {
    // Backing storage shared by all sinks of the same builder; pages from
    // different sinks end up interleaved here.
    shared_state: SharedState,
    // The sink-local buffer plus the running virtual-address counter.
    data: Mutex<SerializationSinkInner>,
    // The tag stamped on every page this sink writes.
    page_tag: PageTag,
}
87
/// Creates [`SerializationSink`]s that all append to the same shared backing
/// storage (a file or an in-memory buffer).
pub struct SerializationSinkBuilder(SharedState);
89
90impl SerializationSinkBuilder {
91 pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> {
92 Ok(Self(SharedState(Arc::new(Mutex::new(
93 BackingStorage::File(file),
94 )))))
95 }
96
97 pub fn new_in_memory() -> SerializationSinkBuilder {
98 Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory(
99 Vec::new(),
100 )))))
101 }
102
103 pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink {
104 SerializationSink {
105 data: Mutex::new(SerializationSinkInner {
106 buffer: Vec::with_capacity(MAX_PAGE_SIZE),
107 addr: 0,
108 }),
109 shared_state: self.0.clone(),
110 page_tag,
111 }
112 }
113}
114
/// Where serialized pages end up: an on-disk file or an in-memory buffer.
#[derive(Debug)]
enum BackingStorage {
    File(fs::File),
    Memory(Vec<u8>),
}

impl Write for BackingStorage {
    /// Forwards the write to whichever storage variant is active.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match self {
            BackingStorage::File(file) => file.write(buf),
            BackingStorage::Memory(vec) => vec.write(buf),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match self {
            BackingStorage::File(file) => file.flush(),
            // A Vec does no buffering of its own, so there is nothing to
            // flush.
            BackingStorage::Memory(_) => Ok(()),
        }
    }
}
142
/// Adapts a [`SerializationSink`] to the [`std::io::Write`] interface so it
/// can be passed to code that expects a standard writer.
pub struct StdWriteAdapter<'a>(&'a SerializationSink);
145
146impl<'a> Write for StdWriteAdapter<'a> {
147 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
148 self.0.write_bytes_atomic(buf);
149 Ok(buf.len())
150 }
151
152 fn flush(&mut self) -> std::io::Result<()> {
153 let mut data = self.0.data.lock();
154 let SerializationSinkInner {
155 ref mut buffer,
156 addr: _,
157 } = *data;
158
159 self.0.flush(buffer);
161
162 self.0.shared_state.0.lock().flush()?;
164
165 Ok(())
166 }
167}
168
/// The lock-protected, per-sink mutable state.
#[derive(Debug)]
struct SerializationSinkInner {
    // Payload bytes not yet written out as a page.
    buffer: Vec<u8>,
    // Virtual address of the next byte, i.e. the total number of payload
    // bytes this sink has accepted so far.
    addr: u64,
}
174
/// Handle to the backing storage shared by all sinks of one builder. The
/// mutex ensures a page's header and payload are written without another
/// sink's page interleaving (see `SerializationSink::write_page`).
#[derive(Clone, Debug)]
struct SharedState(Arc<Mutex<BackingStorage>>);
179
180impl SharedState {
181 fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> {
186 let data = self.0.lock();
187 let data = match *data {
188 BackingStorage::File(_) => panic!(),
189 BackingStorage::Memory(ref data) => data,
190 };
191
192 split_streams(data).remove(&page_tag).unwrap_or(Vec::new())
193 }
194}
195
196pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> {
216 let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default();
217
218 let mut pos = 0;
219 while pos < paged_data.len() {
220 let tag = TryInto::try_into(paged_data[pos]).unwrap();
221 let page_size =
222 u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize;
223
224 assert!(page_size > 0);
225
226 result
227 .entry(tag)
228 .or_default()
229 .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]);
230
231 pos += page_size + 5;
232 }
233
234 result
235}
236
impl SerializationSink {
    /// Writes `bytes` to the shared backing storage as a single page:
    /// 1 tag byte, a 4-byte little-endian payload length, then the payload.
    /// Empty slices write nothing; `bytes` must not exceed `MAX_PAGE_SIZE`.
    fn write_page(&self, bytes: &[u8]) {
        if bytes.len() > 0 {
            assert!(bytes.len() <= MAX_PAGE_SIZE);

            // Hold the shared-storage lock across header and payload so
            // pages from different sinks cannot interleave mid-page.
            let mut file = self.shared_state.0.lock();

            file.write_all(&[self.page_tag as u8]).unwrap();

            let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();
            file.write_all(&page_size).unwrap();
            file.write_all(&bytes[..]).unwrap();
        }
    }

    /// Emits the local buffer as one page and clears it. Callers must hold
    /// the `data` lock (or otherwise own the buffer exclusively).
    fn flush(&self, buffer: &mut Vec<u8>) {
        self.write_page(&buffer[..]);
        buffer.clear();
    }

    /// Flushes any remaining buffered bytes and returns this sink's entire
    /// stream with page headers stripped. Only supported when the backing
    /// storage is in-memory; panics for file-backed storage (see
    /// `SharedState::copy_bytes_with_page_tag`).
    pub fn into_bytes(mut self) -> Vec<u8> {
        // `SerializationSink` implements `Drop`, so fields cannot be moved
        // out of `self` directly; swap the inner state with a dummy instead.
        let mut data = Mutex::new(SerializationSinkInner {
            buffer: Vec::new(),
            addr: 0,
        });
        std::mem::swap(&mut self.data, &mut data);

        let SerializationSinkInner {
            ref mut buffer,
            addr: _,
        } = data.into_inner();

        // Write out the final (possibly under-sized) page.
        self.flush(buffer);

        self.shared_state.copy_bytes_with_page_tag(self.page_tag)
    }

    /// Reserves `num_bytes` contiguous bytes in the stream and lets `write`
    /// fill them in, all under the sink's lock. Returns the virtual address
    /// of the first reserved byte.
    ///
    /// Requests larger than `MAX_PAGE_SIZE` are staged in a temporary
    /// allocation and routed through `write_bytes_atomic` instead.
    pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
    where
        W: FnOnce(&mut [u8]),
    {
        if num_bytes > MAX_PAGE_SIZE {
            let mut bytes = vec![0u8; num_bytes];
            write(&mut bytes[..]);
            return self.write_bytes_atomic(&bytes[..]);
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        // If this write would overflow the current page, start a fresh one.
        if buffer.len() + num_bytes > MAX_PAGE_SIZE {
            self.flush(buffer);
            assert!(buffer.is_empty());
        }

        let curr_addr = *addr;

        // Zero-extend the buffer, then let the closure overwrite the slice.
        let buf_start = buffer.len();
        let buf_end = buf_start + num_bytes;
        buffer.resize(buf_end, 0u8);
        write(&mut buffer[buf_start..buf_end]);

        *addr += num_bytes as u64;

        Addr(curr_addr)
    }

    /// Writes `bytes` contiguously to the stream and returns the virtual
    /// address of the first byte. Behaves like `write_atomic` with a
    /// `copy_from_slice` closure, but writes full pages straight to the
    /// backing storage instead of staging everything in the local buffer.
    pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
        // Small writes: the extra copy through the buffer is cheap, so take
        // the simple path.
        if bytes.len() <= 128 {
            return self.write_atomic(bytes.len(), |sink| {
                sink.copy_from_slice(bytes);
            });
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        let curr_addr = Addr(*addr);
        *addr += bytes.len() as u64;

        let mut bytes_left = bytes;

        // Top the buffer up to at least MIN_PAGE_SIZE so the flush below
        // does not emit an unnecessarily small page.
        if buffer.len() < MIN_PAGE_SIZE {
            let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());
            buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);
            bytes_left = &bytes_left[num_bytes_to_take..];
        }

        if bytes_left.is_empty() {
            return curr_addr;
        }

        // Emit what is buffered so the remaining bytes can be written in
        // order, directly from `bytes`.
        self.flush(buffer);

        for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {
            if chunk.len() == MAX_PAGE_SIZE {
                // A full page: write it directly, bypassing the buffer.
                self.write_page(chunk);
            } else {
                if chunk.len() >= MIN_PAGE_SIZE {
                    // Big enough to stand as its own page.
                    self.write_page(chunk);
                } else {
                    // Too small for its own page: keep it buffered for later
                    // writes to top up. Only the final chunk can land here.
                    debug_assert!(buffer.is_empty());
                    buffer.extend_from_slice(chunk);
                }
            }
        }

        curr_addr
    }

    /// Returns a `std::io::Write` adapter borrowing this sink.
    pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {
        StdWriteAdapter(self)
    }
}
406
407impl Drop for SerializationSink {
408 fn drop(&mut self) {
409 let mut data = self.data.lock();
410 let SerializationSinkInner {
411 ref mut buffer,
412 addr: _,
413 } = *data;
414
415 self.flush(buffer);
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `chunk_count` identical chunks of `chunk_size` bytes to one
    /// sink per page tag, then checks that each tag's stream can be split
    /// back out and round-trips exactly.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let sink_builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        // 239 is just a pattern period; any byte pattern would do.
        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        {
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                // Addresses are counted per sink, so every sink reports the
                // same address for the same chunk index.
                let expected_addr = Addr((chunk_index * chunk_size) as u64);
                for sink in sinks.iter() {
                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
                }
            }
        }
        // The sinks were dropped at the end of the scope above, which
        // flushed their final pages.

        let streams: Vec<Vec<u8>> = tags
            .iter()
            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
            .collect();

        for stream in streams {
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, expected_chunk);
            }
        }
    }

    // Writes via the closure-based `write_atomic` entry point.
    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    // Writes via the slice-based `write_bytes_atomic` entry point.
    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Generates a test module exercising both write entry points for one
    // chunk-size/chunk-count combination.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);

    // Boundary cases around the page-size thresholds.
    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);

    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}