netidx-archive 0.32.0

netidx archive file format
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
use super::{
    reader::ArchiveIndex, scan_file, ArchiveReader, BatchItem, CompressionHeader,
    FileHeader, Id, MonotonicTimestamper, PathMapping, RecordHeader, RecordIndex,
    RecordTooLarge, RecordTyp, Timestamp, COMMITTED_OFFSET, FILE_VERSION, MAX_RECORD_LEN,
    PM_POOL,
};
use ahash::AHashMap;
use anyhow::Result;
use bytes::BufMut;
use chrono::prelude::*;
use fs3::{allocation_granularity, FileExt};
use indexmap::IndexMap;
use log::warn;
use memmap2::{Mmap, MmapMut};
use netidx::{pack::Pack, path::Path};
use nohash::IntSet;
use parking_lot::RwLock;
use poolshark::global::GPooled;
use std::{
    self,
    cmp::max,
    fs::{File, OpenOptions},
    iter::IntoIterator,
    mem,
    ops::Drop,
    path::Path as FilePath,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

/// This reads and writes the netidx archive format (as written by the
/// "record" command in the tools). The archive format is intended to
/// be a compact format for storing recordings of netidx data for long
/// term storage and access. It uses memory mapped IO for performance
/// and memory efficiency, and as such file size is limited to
/// `usize::MAX` bytes.
///
/// Files begin with a file header, which consists of the string
/// "netidx archive" followed by the file format
/// version. Currently there is 1 version, and the version number
/// is 0.
///
/// Following the header are a series of records. Every record begins
/// with a [RecordHeader](RecordHeader), which is followed by a data
/// item, except in the case of the end of archive record, which is
/// not followed by a data item.
///
/// Items are written to the file using a two phase commit scheme to
/// allow detection of possibly corrupted data. Initially, items are
/// marked as uncommitted, and only upon a successful flush to disk
/// are they then marked as committed.
///
/// When an archive is opened read-only, an index of its contents is
/// built in memory so that any part of it can be accessed quickly by
/// timestamp. As a result, there is some memory overhead.
///
/// In order to facilitate full reconstruction of the state at any
/// point without requiring to decode the entire file up to that point
/// there are two types of data records, image records contain the
/// entire state of every archived value at a given time, and delta
/// records contain only values that changed since the last delta
/// record. The full state of the values can be constructed at a given
/// time `t` by seeking to the nearest image record that is before
/// `t`, and then processing all the delta records up to `t`.
///
/// Because data sets vary in requirements and size the writing of
/// image records is configurable in the archiver (e.g. write 1 image
/// per 512 MiB of deltas), and it is not required to write any image
/// records, however this will mean that reconstructing the state at
/// any point will require processing the entire file before that
/// point.
///
/// To prevent data corruption the underlying file is locked for
/// exclusive access using the advisory file locking mechanism present
/// in the OS (e.g. flock on unix). If the file is modified
/// independent of advisory locking it could cause data corruption.
///
/// The record header is 8 bytes. A data record starts with a LEB128
/// encoded item counter, and then a number of items. Path ids are
/// also LEB128 encoded. So, for example, in an archive containing 1
/// path, a batch with 1 u64 data item would look like.
///
/// ```text
/// 8 byte header
/// 1 byte item count
/// 1 byte path id
/// 1 byte type tag
/// 8 byte u64
/// ----------------
/// 19 bytes (11 bytes of overhead 57%)
/// ```
///
/// Better overheads can be achieved with larger batches, as should
/// naturally happen on busier systems. For example a batch of 128
/// u64s looks like.
///
/// ```text
/// 8 byte header
/// 1 byte item count
/// (1 byte path id
///  1 byte type tag
///  8 byte u64) * 128
/// ---------------------
/// 1289 bytes (265 bytes of overhead 20%)
/// ```
pub struct ArchiveWriter {
    /// produces the timestamp attached to each batch from the
    /// caller supplied wall clock time
    time: MonotonicTimestamper,
    /// path id -> path, for every path known to this writer
    path_by_id: IndexMap<Id, Path, nohash::BuildNoHashHasher<Id>>,
    /// path -> path id, the inverse of `path_by_id`
    id_by_path: AHashMap<Path, Id>,
    /// the archive file; the same handle is handed to readers
    /// created with `reader`
    file: Arc<File>,
    /// the sentinel lock file, when the writer was opened with one.
    /// It is never read, only kept so the handle stays open.
    _external_lock: Option<Arc<File>>,
    /// offset just past the last record written (committed or not);
    /// the same counter is handed to readers created with `reader`
    end: Arc<AtomicUsize>,
    /// offset up to which records have been flushed and recorded as
    /// committed in the file header
    committed: usize,
    /// the id that will be given to the next new path
    next_id: u32,
    /// the value reported by `allocation_granularity` for the file;
    /// the file is grown in multiples of this
    block_size: usize,
    /// the writable memory map of `file`
    mmap: MmapMut,
    /// when true every batch written by `add_batch` is prefixed with
    /// a `RecordIndex`
    indexed: bool,
    /// scratch set used to deduplicate the ids going into a batch's
    /// record index
    index: IntSet<Id>,
    /// scratch vector reused between batches to hold the record index
    index_vec: Vec<Id>,
}

impl Drop for ArchiveWriter {
    /// Commit whatever is still pending and release the advisory
    /// lock on the archive file. Nothing can be reported from a
    /// destructor, so every step is best effort and its error is
    /// discarded.
    fn drop(&mut self) {
        // two phase commit of any records written since the last flush
        self.flush().ok();
        // make sure the committed offset in the header reaches the disk
        self.mmap.flush().ok();
        // let other writers in
        FileExt::unlock(&*self.file).ok();
    }
}

impl ArchiveWriter {
    pub(super) fn open_full(
        path: impl AsRef<FilePath>,
        indexed: bool,
        compress: Option<Vec<u8>>,
        external_lock: Option<impl AsRef<FilePath>>,
    ) -> Result<Self> {
        if mem::size_of::<usize>() < mem::size_of::<u64>() {
            warn!("archive file size is limited to 4 GiB on this platform")
        }
        let time = MonotonicTimestamper::new();
        let external_lock = if let Some(path) = external_lock {
            let lock = if FilePath::is_file(path.as_ref()) {
                OpenOptions::new().read(true).write(true).open(path.as_ref())?
            } else {
                OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(path.as_ref())?
            };
            lock.try_lock_exclusive()?;
            Some(Arc::new(lock))
        } else {
            None
        };
        if FilePath::is_file(path.as_ref()) {
            if compress.is_some() {
                bail!("can't write to an already compressed file")
            }
            let mut time_basis = DateTime::<Utc>::MIN_UTC;
            let file = OpenOptions::new().read(true).write(true).open(path.as_ref())?;
            if external_lock.is_none() {
                file.try_lock_exclusive()?;
            }
            let block_size = allocation_granularity(path)? as usize;
            let mmap = unsafe { MmapMut::map_mut(&file)? };
            let mut t = ArchiveWriter {
                time,
                path_by_id: IndexMap::default(),
                id_by_path: AHashMap::default(),
                file: Arc::new(file),
                _external_lock: external_lock,
                end: Arc::new(AtomicUsize::new(0)),
                committed: 0,
                next_id: 0,
                block_size,
                mmap,
                indexed: false,
                index: IntSet::default(),
                index_vec: Vec::new(),
            };
            let mut compress = None;
            let end = scan_file(
                &mut t.indexed,
                &mut compress,
                &mut t.path_by_id,
                &mut t.id_by_path,
                None,
                None,
                &mut time_basis,
                &mut t.next_id,
                &mut &*t.mmap,
            )?;
            if compress.is_some() {
                bail!("can't write to an already compressed file")
            }
            t.next_id += 1;
            t.end.store(end, Ordering::Relaxed);
            t.committed = end;
            Ok(t)
        } else {
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(path.as_ref())?;
            if external_lock.is_none() {
                file.try_lock_exclusive()?;
            }
            let block_size = allocation_granularity(path.as_ref())? as usize;
            let fh_len = <FileHeader as Pack>::const_encoded_len().unwrap();
            let rh_len = <RecordHeader as Pack>::const_encoded_len().unwrap();
            let comp_hdr = compress.map(|dictionary| CompressionHeader { dictionary });
            let ch_len = comp_hdr.as_ref().map(|ch| ch.encoded_len()).unwrap_or(0);
            file.set_len(max(block_size, fh_len + rh_len + ch_len) as u64)?;
            let mut mmap = unsafe { MmapMut::map_mut(&file)? };
            let mut buf = &mut *mmap;
            let committed = (fh_len + ch_len) as u64;
            let fh = FileHeader {
                compressed: comp_hdr.is_some(),
                indexed,
                version: FILE_VERSION,
                committed,
            };
            <FileHeader as Pack>::encode(&fh, &mut buf)?;
            if let Some(hdr) = comp_hdr {
                hdr.encode(&mut buf)?;
            }
            mmap.flush()?;
            Ok(ArchiveWriter {
                time,
                path_by_id: IndexMap::default(),
                id_by_path: AHashMap::default(),
                file: Arc::new(file),
                _external_lock: external_lock,
                end: Arc::new(AtomicUsize::new(committed as usize)),
                committed: committed as usize,
                next_id: 0,
                block_size,
                mmap,
                indexed: true,
                index: IntSet::default(),
                index_vec: Vec::new(),
            })
        }
    }

    /// Open the archive at `path` for read/write access, locking the
    /// archive file itself. If the file does not exist a new,
    /// indexed, uncompressed archive is created.
    pub fn open(path: impl AsRef<FilePath>) -> Result<Self> {
        let no_external_lock: Option<&FilePath> = None;
        Self::open_full(path, true, None, no_external_lock)
    }

    /// Open the archive at `path`, taking the exclusive lock on the
    /// separate sentinel file `external_lock` instead of on the
    /// archive itself. This is meant to let an external program
    /// write to an archive while `netidx record` reads and publishes
    /// it at the same time.
    ///
    /// THIS IS POTENTIALLY DANGEROUS! The protection against more
    /// than one writer, and therefore against data corruption, is
    /// weaker than with [open](ArchiveWriter::open), because all
    /// writers have to agree on the same external lock file name.
    pub fn open_external(
        path: impl AsRef<FilePath>,
        external_lock: impl AsRef<FilePath>,
    ) -> Result<Self> {
        let sentinel = Some(external_lock);
        Self::open_full(path, true, None, sentinel)
    }

    // Grow the file and remap it so that at least `additional_capacity`
    // more bytes are available. The file grows by 1/64 of its current
    // size or by the requested amount, whichever is larger, and the
    // result is rounded up to a whole number of blocks.
    fn reserve(&mut self, additional_capacity: usize) -> Result<()> {
        let current = self.mmap.len();
        let wanted = current + max(current >> 6, additional_capacity);
        let blocks = wanted / self.block_size + 1;
        let grown = blocks * self.block_size;
        self.file.set_len(grown as u64)?;
        // the old map is dropped by the assignment, after the new one
        // has been created successfully
        let remapped = unsafe { MmapMut::map_mut(&*self.file)? };
        self.mmap = remapped;
        Ok(())
    }

    // Reject records longer than MAX_RECORD_LEN, then make sure there
    // is room past the current end for a record header plus
    // `record_length` bytes, growing the file if there isn't. Returns
    // the total number of bytes the record will occupy.
    fn check_reserve(&mut self, record_length: usize) -> Result<usize> {
        if record_length > MAX_RECORD_LEN as usize {
            bail!(RecordTooLarge);
        }
        let header_len = <RecordHeader as Pack>::const_encoded_len().unwrap();
        let total = header_len + record_length;
        let available = self.mmap.len() - self.end.load(Ordering::Relaxed);
        if available < total {
            self.reserve(total)?;
        }
        Ok(total)
    }

    /// Commit everything written since the last flush. The records
    /// are flushed to disk first, and only then is the committed
    /// offset in the file header advanced and flushed in turn. Does
    /// nothing if there is nothing uncommitted.
    pub fn flush(&mut self) -> Result<()> {
        let end = self.end.load(Ordering::Relaxed);
        if self.committed >= end {
            return Ok(());
        }
        // first stage commit: the records themselves
        self.mmap.flush()?;
        // second stage commit: the header field that says they are valid
        let mut committed_field = &mut self.mmap[COMMITTED_OFFSET..];
        committed_field.put_u64(end as u64);
        self.mmap.flush()?;
        self.committed = end;
        Ok(())
    }

    /// Assign a fresh id to every path in `paths` that does not have
    /// one yet, and write a single path mappings record holding the
    /// new assignments. Paths that are already known are skipped, and
    /// no record is written if there are no new paths.
    pub fn add_paths<'a>(
        &'a mut self,
        paths: impl IntoIterator<Item = &'a Path>,
    ) -> Result<()> {
        let mut new_mappings = PM_POOL.take();
        for path in paths {
            if self.id_by_path.contains_key(path) {
                continue;
            }
            let id = Id(self.next_id);
            self.next_id += 1;
            self.id_by_path.insert(path.clone(), id);
            self.path_by_id.insert(id, path.clone());
            new_mappings.push(PathMapping(path.clone(), id));
        }
        self.add_raw_pathmappings(new_mappings)
    }

    /// Append a path mappings record containing `pms` to the archive.
    /// The in memory path tables are not touched. Does nothing when
    /// `pms` is empty.
    pub(super) fn add_raw_pathmappings(
        &mut self,
        pms: GPooled<Vec<PathMapping>>,
    ) -> Result<()> {
        if pms.len() == 0 {
            return Ok(());
        }
        let body_len = <GPooled<Vec<PathMapping>> as Pack>::encoded_len(&pms);
        let total = self.check_reserve(body_len)?;
        let header = RecordHeader {
            record_type: RecordTyp::PathMappings,
            record_length: body_len as u32,
            timestamp: 0,
        };
        let start = self.end.load(Ordering::Relaxed);
        let mut out = &mut self.mmap[start..];
        <RecordHeader as Pack>::encode(&header, &mut out)?;
        <GPooled<Vec<PathMapping>> as Pack>::encode(&pms, &mut out)?;
        // only move the end once the whole record is in place
        self.end.fetch_add(total, Ordering::AcqRel);
        Ok(())
    }

    // Append one batch record of `record_length` bytes whose body is
    // produced by `f`. If `timestamp` starts a new basis, a timestamp
    // record carrying the basis is written first. `f` is handed the
    // output buffer positioned just after the record header.
    fn add_batch_f<F: FnOnce(&mut &mut [u8]) -> Result<()>>(
        &mut self,
        image: bool,
        timestamp: Timestamp,
        record_length: usize,
        f: F,
    ) -> Result<()> {
        // checked up front so an oversized batch doesn't leave a
        // stray timestamp record behind
        if record_length > MAX_RECORD_LEN as usize {
            bail!(RecordTooLarge)
        }
        if let Timestamp::NewBasis(basis) = timestamp {
            let basis_len = <DateTime<Utc> as Pack>::encoded_len(&basis);
            let basis_header = RecordHeader {
                record_type: RecordTyp::Timestamp,
                record_length: basis_len as u32,
                timestamp: 0,
            };
            let total = self.check_reserve(basis_len)?;
            let start = self.end.load(Ordering::Relaxed);
            let mut out = &mut self.mmap[start..];
            <RecordHeader as Pack>::encode(&basis_header, &mut out)?;
            <DateTime<Utc> as Pack>::encode(&basis, &mut out)?;
            self.end.fetch_add(total, Ordering::AcqRel);
        }
        let total = self.check_reserve(record_length)?;
        let record_type =
            if image { RecordTyp::ImageBatch } else { RecordTyp::DeltaBatch };
        let header = RecordHeader {
            record_type,
            record_length: record_length as u32,
            timestamp: timestamp.offset(),
        };
        let start = self.end.load(Ordering::Relaxed);
        let mut out = &mut self.mmap[start..];
        <RecordHeader as Pack>::encode(&header, &mut out)?;
        f(&mut out)?;
        self.end.fetch_add(total, Ordering::AcqRel);
        Ok(())
    }

    /// Append a data batch to the archive; an empty batch is ignored.
    /// If `image` is true the record is marked as an image batch and
    /// should hold a value for every subscribed path whether it
    /// changed or not. Otherwise it is marked as a delta batch and
    /// should hold only the values that changed since the last delta
    /// batch. This method will fail if any of the path ids in the
    /// batch are unknown.
    ///
    /// When the archive is indexed the batch is prefixed with a
    /// record index, which lists the distinct ids in a delta batch
    /// and is empty for an image batch.
    ///
    /// Batch timestamps are monotonically increasing, with a
    /// granularity of 1us. As such, one should avoid writing
    /// "spurious" batches, and generally for efficiency and
    /// correctness write as few batches as possible.
    pub fn add_batch(
        &mut self,
        image: bool,
        timestamp: DateTime<Utc>,
        batch: &GPooled<Vec<BatchItem>>,
    ) -> Result<()> {
        if batch.len() == 0 {
            return Ok(());
        }
        let timestamp = self.time.timestamp(timestamp);
        let index = match self.indexed {
            false => None,
            true => {
                if !image {
                    for BatchItem(id, _) in batch.iter() {
                        self.index.insert(*id);
                    }
                }
                self.index_vec.extend(self.index.drain());
                // borrow the scratch vector for the duration of the write
                Some(RecordIndex { index: mem::take(&mut self.index_vec) })
            }
        };
        let index_length = match &index {
            Some(i) => <RecordIndex as Pack>::encoded_len(i),
            None => 0,
        };
        let batch_length = <GPooled<Vec<BatchItem>> as Pack>::encoded_len(&batch);
        let record_length = index_length + batch_length;
        self.add_batch_f(image, timestamp, record_length, |buf| {
            if let Some(index) = &index {
                <RecordIndex as Pack>::encode(index, buf)?
            }
            Ok(<GPooled<Vec<BatchItem>> as Pack>::encode(&batch, buf)?)
        })?;
        // hand the scratch vector back, emptied, so its allocation is reused
        if let Some(RecordIndex { index: mut scratch }) = index {
            scratch.clear();
            self.index_vec = scratch;
        }
        Ok(())
    }

    // Append a batch whose body is already encoded, copying `batch`
    // verbatim after the record header. An empty body is ignored.
    // This is used to build a compressed archive.
    pub(super) fn add_batch_raw(
        &mut self,
        image: bool,
        timestamp: DateTime<Utc>,
        batch: &[u8],
    ) -> Result<()> {
        use std::io::Write;
        if batch.is_empty() {
            return Ok(());
        }
        let timestamp = self.time.timestamp(timestamp);
        self.add_batch_f(image, timestamp, batch.len(), |buf| {
            let mut out = BufMut::writer(buf);
            out.write_all(batch)?;
            Ok(())
        })
    }

    /// Return the id assigned to `path`, or `None` if this writer
    /// has no mapping for it.
    pub fn id_for_path(&self, path: &Path) -> Option<Id> {
        let id = self.id_by_path.get(path)?;
        Some(*id)
    }

    /// Return the path that `id` is assigned to, or `None` if this
    /// writer has no mapping for it.
    pub fn path_for_id(&self, id: &Id) -> Option<&Path> {
        self.path_by_id.get(id)
    }

    /// The current length in bytes of the memory map backing the
    /// archive, including space not yet occupied by records.
    pub fn capacity(&self) -> usize {
        self.mmap.len()
    }

    /// The offset just past the last record written so far, whether
    /// or not it has been committed by `flush` yet.
    pub fn len(&self) -> usize {
        self.end.load(Ordering::Relaxed)
    }

    /// The allocation granularity recorded when the archive was
    /// opened; the file is grown in multiples of this many bytes.
    pub fn block_size(&self) -> usize {
        self.block_size
    }

    /// Build an [ArchiveReader] over the same file by taking a
    /// separate read-only memory map of it. The reader starts with a
    /// fresh, empty index and shares this writer's file handle and
    /// end of archive offset.
    ///
    /// If you need lots of readers it's best to create just one with
    /// this method and then clone it, that way all the readers share
    /// the same memory map.
    pub fn reader(&self) -> Result<ArchiveReader> {
        let index = Arc::new(RwLock::new(ArchiveIndex::new()));
        let file = Arc::clone(&self.file);
        let end = Arc::clone(&self.end);
        let read_only = unsafe { Mmap::map(&*self.file)? };
        Ok(ArchiveReader {
            index,
            compressed: None,
            indexed: self.indexed,
            file,
            end,
            mmap: Arc::new(RwLock::new(read_only)),
        })
    }
}