Skip to main content

netidx_archive/logfile/
mod.rs

1pub mod arraymap;
2mod reader;
3mod writer;
4
5#[cfg(test)]
6mod test;
7
8use self::arraymap::ArrayMap;
9pub use self::{
10    reader::{AlreadyCompressed, ArchiveReader},
11    writer::ArchiveWriter,
12};
13use anyhow::{Context, Error, Result};
14use arcstr::ArcStr;
15use bytes::{Buf, BufMut};
16use chrono::prelude::*;
17use fxhash::{FxBuildHasher, FxHashMap};
18use indexmap::IndexMap;
19use log::warn;
20use memmap2::Mmap;
21use netidx::{
22    pack::{decode_varint, encode_varint, varint_len, Pack, PackError},
23    path::Path,
24    subscriber::{Event, FromValue, Value},
25};
26use netidx_derive::Pack;
27use packed_struct::PackedStruct;
28use poolshark::global::{GPooled, Pool};
29use std::{
30    self,
31    cmp::max,
32    collections::VecDeque,
33    error, fmt,
34    fs::OpenOptions,
35    mem,
36    ops::{Bound, RangeBounds},
37    path::Path as FilePath,
38    str::FromStr,
39    sync::LazyLock,
40};
41
/// Header at the very start of an archive file. It identifies the on
/// disk format version and the flags needed to interpret the rest of
/// the file.
#[derive(Debug, Clone)]
pub struct FileHeader {
    // true if the data batches in the file are compressed
    pub compressed: bool,
    // true if batches carry a RecordIndex — presumably so readers can
    // skip batches without decompressing them; confirm in writer
    pub indexed: bool,
    // the on disk format version read from the file (see FILE_VERSION)
    pub version: u32,
    // position of the last committed byte; records beyond this are
    // ignored by scan_records
    pub committed: u64,
}
49
// magic bytes identifying a netidx archive file
static FILE_MAGIC: &'static [u8] = b"netidx archive";
// byte offset of the committed field within the file header
// (magic bytes followed by the u32 flags/version word)
static COMMITTED_OFFSET: usize = FILE_MAGIC.len() + mem::size_of::<u32>();
// the current on disk format version, stored in the low 30 bits of
// the header's flags/version word
const FILE_VERSION: u32 = 0;
53
54impl Pack for FileHeader {
55    fn const_encoded_len() -> Option<usize> {
56        Some(COMMITTED_OFFSET + mem::size_of::<u64>())
57    }
58
59    fn encoded_len(&self) -> usize {
60        <FileHeader as Pack>::const_encoded_len().unwrap()
61    }
62
63    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
64        buf.put_slice(FILE_MAGIC);
65        buf.put_u32(
66            ((self.compressed as u32) << 31)
67                | ((self.indexed as u32) << 30)
68                | FILE_VERSION,
69        );
70        buf.put_u64(self.committed);
71        Ok(())
72    }
73
74    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
75        for byte in FILE_MAGIC {
76            if buf.get_u8() != *byte {
77                return Err(PackError::InvalidFormat);
78            }
79        }
80        let v = buf.get_u32();
81        let version = v & 0x3FFF_FFFF;
82        let compressed = (v & 0x8000_0000) > 0;
83        let indexed = (v & 0x4000_0000) > 0;
84        let committed = buf.get_u64();
85        Ok(FileHeader { compressed, indexed, version, committed })
86    }
87}
88
/// this is only present if the file has compressed batches
#[derive(Pack)]
pub struct CompressionHeader {
    // the compression dictionary used for the file's batches; codec
    // details are defined by the writer — see the writer module
    pub dictionary: Vec<u8>,
}
94
/// The type tag of a record, stored in the top two bits of each
/// RecordHeader.
#[derive(PrimitiveEnum, Debug, Clone, Copy)]
pub enum RecordTyp {
    /// A time basis record
    Timestamp = 0,
    /// A record mapping paths to ids
    PathMappings = 1,
    /// A data batch containing deltas from the previous batch
    DeltaBatch = 2,
    /// A data batch containing a full image
    ImageBatch = 3,
}
106
// maximum length in bytes of a single record's contents
const MAX_RECORD_LEN: u32 = u32::MAX;
// maximum microsecond offset storable before a new time basis record
// is required. NOTE(review): the header's timestamp field is 30 bits
// wide but this cap is 26 bits — presumably a conservative margin;
// confirm before changing either value
const MAX_TIMESTAMP: u32 = 0x03FFFFFF;
109
// Every record in the archive starts with this header. The layout is
// bit packed into exactly 8 bytes, msb first.
#[derive(PackedStruct, Debug, Clone, Copy)]
#[packed_struct(bit_numbering = "msb0", size_bytes = "8")]
pub struct RecordHeader {
    // the record type
    #[packed_field(bits = "0:1", size_bits = "2", ty = "enum")]
    pub record_type: RecordTyp,
    // the record length, up to MAX_RECORD_LEN, not including this header
    #[packed_field(bits = "2:33", size_bits = "32", endian = "msb")]
    pub record_length: u32,
    // microsecond offset from last timestamp record, up to MAX_TIMESTAMP
    #[packed_field(bits = "34:63", size_bits = "30", endian = "msb")]
    pub timestamp: u32,
}
124
125impl Pack for RecordHeader {
126    fn const_encoded_len() -> Option<usize> {
127        Some(8)
128    }
129
130    fn encoded_len(&self) -> usize {
131        <RecordHeader as Pack>::const_encoded_len().unwrap()
132    }
133
134    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
135        let hdr = RecordHeader::pack(self).map_err(|_| PackError::InvalidFormat)?;
136        Ok(buf.put(&hdr[..]))
137    }
138
139    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
140        let mut v = [0u8; 8];
141        buf.copy_to_slice(&mut v);
142        RecordHeader::unpack(&v).map_err(|_| PackError::InvalidFormat)
143    }
144}
145
/// The ids present in a batch, written alongside indexed batches —
/// presumably so a reader can decide whether a compressed batch is of
/// interest without decompressing it; confirm against the writer
#[derive(Debug, Clone, Pack)]
pub struct RecordIndex {
    pub index: Vec<Id>,
}
150
/// Compact numeric identifier assigned to a path, used in batch items
/// in place of the full path (see PathMapping).
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct Id(u32);
153
154impl Pack for Id {
155    fn encoded_len(&self) -> usize {
156        varint_len(self.0 as u64)
157    }
158
159    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
160        Ok(encode_varint(self.0 as u64, buf))
161    }
162
163    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
164        Ok(Id(decode_varint(buf)? as u32))
165    }
166}
167
/// A mapping from a full path to its compact id, as stored in a
/// PathMappings record.
#[derive(Debug, Clone)]
struct PathMapping(Path, Id);
170
171impl Pack for PathMapping {
172    fn encoded_len(&self) -> usize {
173        <Path as Pack>::encoded_len(&self.0) + <Id as Pack>::encoded_len(&self.1)
174    }
175
176    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
177        <Path as Pack>::encode(&self.0, buf)?;
178        <Id as Pack>::encode(&self.1, buf)
179    }
180
181    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
182        let path = <Path as Pack>::decode(buf)?;
183        let id = <Id as Pack>::decode(buf)?;
184        Ok(PathMapping(path, id))
185    }
186}
187
/// A single update in a batch: the id of the path that changed and
/// the event that occurred.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub struct BatchItem(pub Id, pub Event);
190
191impl Pack for BatchItem {
192    fn encoded_len(&self) -> usize {
193        <Id as Pack>::encoded_len(&self.0) + Pack::encoded_len(&self.1)
194    }
195
196    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
197        <Id as Pack>::encode(&self.0, buf)?;
198        <Event as Pack>::encode(&self.1, buf)
199    }
200
201    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
202        let id = <Id as Pack>::decode(buf)?;
203        Ok(BatchItem(id, <Event as Pack>::decode(buf)?))
204    }
205}
206
/// A position to seek to within an archive.
#[derive(Debug, Clone, Copy, Pack)]
pub enum Seek {
    /// seek to the beginning of the archive
    Beginning,
    /// seek to the end of the archive
    End,
    /// seek to the given absolute time
    Absolute(DateTime<Utc>),
    /// seek forward (positive) or backward (negative) by whole batches
    BatchRelative(i8),
    /// seek by a signed time offset from the current position
    TimeRelative(chrono::Duration),
}
215
216impl ToString for Seek {
217    fn to_string(&self) -> String {
218        match self {
219            Seek::Beginning => "beginning".into(),
220            Seek::End => "end".into(),
221            Seek::Absolute(dt) => dt.to_rfc3339(),
222            Seek::BatchRelative(i) => i.to_string(),
223            Seek::TimeRelative(d) => {
224                if d < &chrono::Duration::zero() {
225                    format!("{}s", d.num_seconds())
226                } else {
227                    format!("+{}s", d.num_seconds())
228                }
229            }
230        }
231    }
232}
233
234impl FromStr for Seek {
235    type Err = Error;
236
237    fn from_str(s: &str) -> Result<Self> {
238        use diligent_date_parser::parse_date;
239        let s = s.trim();
240        if s == "beginning" {
241            Ok(Seek::Beginning)
242        } else if s == "end" {
243            Ok(Seek::End)
244        } else if let Ok(steps) = s.parse::<i8>() {
245            Ok(Seek::BatchRelative(steps))
246        } else if let Some(dt) = parse_date(s) {
247            Ok(Seek::Absolute(dt.with_timezone(&Utc)))
248        } else if s.starts_with(['+', '-'].as_ref())
249            && s.ends_with(['y', 'M', 'd', 'h', 'm', 's', 'u'].as_ref())
250            && s.is_ascii()
251            && s.len() > 2
252        {
253            let dir = s.chars().next().unwrap();
254            let mag = s.chars().next_back().unwrap();
255            match s[1..s.len() - 1].parse::<f64>() {
256                Err(_) => bail!("invalid position expression"),
257                Ok(quantity) => {
258                    let quantity = if mag == 'y' {
259                        quantity * 365.24 * 86400.
260                    } else if mag == 'M' {
261                        quantity * (365.24 / 12.) * 86400.
262                    } else if mag == 'd' {
263                        quantity * 86400.
264                    } else if mag == 'h' {
265                        quantity * 3600.
266                    } else if mag == 'm' {
267                        quantity * 60.
268                    } else if mag == 's' {
269                        quantity
270                    } else {
271                        quantity * 1e-6
272                    };
273                    let offset = chrono::Duration::nanoseconds(if dir == '+' {
274                        (quantity * 1e9).trunc() as i64
275                    } else {
276                        (-1. * quantity * 1e9).trunc() as i64
277                    });
278                    if dir == '+' {
279                        Ok(Seek::TimeRelative(offset))
280                    } else {
281                        Ok(Seek::TimeRelative(offset))
282                    }
283                }
284            }
285        } else {
286            bail!("{} is not a valid seek expression", s)
287        }
288    }
289}
290
291impl Into<Value> for Seek {
292    fn into(self) -> Value {
293        self.to_string().into()
294    }
295}
296
impl FromValue for Seek {
    fn from_value(v: Value) -> Result<Self> {
        match v {
            // a datetime value seeks to that absolute time
            Value::DateTime(ts) => Ok(Seek::Absolute(*ts)),
            // any numeric value is a relative batch count
            v if v.number() => Ok(Seek::BatchRelative(v.cast_to::<i8>()?)),
            // otherwise parse the string form (see FromStr)
            v => v.cast_to::<ArcStr>()?.parse::<Seek>(),
        }
    }

    // NOTE(review): the infallible accessor is deliberately
    // unsupported since conversion can fail; only from_value is
    // provided
    fn get(_: Value) -> Option<Self> {
        None
    }
}
310
// pool of buffers for decoding path mapping records
static PM_POOL: LazyLock<Pool<Vec<PathMapping>>> =
    LazyLock::new(|| Pool::new(10, 100_000));
// pool of buffers for decoded data batches
pub static BATCH_POOL: LazyLock<Pool<Vec<BatchItem>>> =
    LazyLock::new(|| Pool::new(10, 100_000));
// pool of (timestamp, batch) queues used by cursor reads
pub(crate) static CURSOR_BATCH_POOL: LazyLock<
    Pool<VecDeque<(DateTime<Utc>, GPooled<Vec<BatchItem>>)>>,
> = LazyLock::new(|| Pool::new(100, 10_000));
// pool of id -> event maps used when building images
pub(crate) static IMG_POOL: LazyLock<Pool<FxHashMap<Id, Event>>> =
    LazyLock::new(|| Pool::new(100, 10_000));
// the smallest representable time step; used by Cursor to nudge
// positions off excluded bounds
static EPSILON: chrono::Duration = chrono::Duration::microseconds(1);
321
/// A timestamp as recorded in the archive: either a new absolute time
/// basis, or a microsecond offset from the current basis.
#[derive(Debug, Clone, Copy)]
enum Timestamp {
    // establishes a new absolute time basis
    NewBasis(DateTime<Utc>),
    // microseconds since the current basis
    Offset(u32),
}
327
328impl Timestamp {
329    pub fn offset(&self) -> u32 {
330        match self {
331            Timestamp::NewBasis(_) => 0,
332            Timestamp::Offset(off) => *off,
333        }
334    }
335}
336
/// The goals of this structure are as follows in order of importance
/// 1. Monotonic. subsequent calls to timestamp() will always be greater than previous calls.
/// 2. Steady. Clock skew should be minimized where possible.
/// 3. Accurate. Time stamps should be close to the actual time
/// 4. Precise. Small differences in time should be representable.
/// 5. Compact. Time stamps should use as little space as possible.
///
/// Unfortunately because system provided time functions are often
/// awful some careful and elaborate logic is required in order to
/// meet the above goals.
#[derive(Debug, Clone, Copy)]
struct MonotonicTimestamper {
    // the `now` passed to the most recent call to timestamp()
    prev: DateTime<Utc>,
    // the current time basis; None until the first timestamp is taken
    basis: Option<DateTime<Utc>>,
    // microseconds past `basis` of the last timestamp handed out
    offset: u32,
}
353
impl MonotonicTimestamper {
    fn new() -> Self {
        MonotonicTimestamper { prev: Utc::now(), basis: None, offset: 0 }
    }

    /// Install a new time basis, never moving time backwards, and
    /// reset the offset to zero. Returns the basis actually installed.
    fn update_basis(&mut self, new_basis: DateTime<Utc>) -> DateTime<Utc> {
        use chrono::Duration;
        match self.basis {
            None => {
                self.basis = Some(new_basis);
                self.offset = 0;
                new_basis
            }
            Some(old_basis) => {
                // the most recent timestamp handed out under the old basis
                let old_ts = old_basis + Duration::microseconds(self.offset as i64);
                if old_ts > new_basis {
                    // the clock went backwards; preserve monotonicity by
                    // reusing the last handed out timestamp as the basis
                    self.basis = Some(old_ts);
                    self.offset = 0;
                    old_ts
                } else {
                    self.basis = Some(new_basis);
                    self.offset = 0;
                    new_basis
                }
            }
        }
    }

    /// Produce the next timestamp for `now`, guaranteed greater than
    /// every previously returned timestamp.
    fn timestamp(&mut self, now: DateTime<Utc>) -> Timestamp {
        use chrono::Duration;
        let ts = match self.basis {
            // first call: establish a basis
            None => Timestamp::NewBasis(self.update_basis(now)),
            Some(basis) => match (now - self.prev).num_microseconds() {
                // the clock stalled or went backwards: tick forward one
                // microsecond to stay monotonic
                Some(off) if off <= 0 => {
                    if self.offset < MAX_TIMESTAMP {
                        self.offset += 1;
                        Timestamp::Offset(self.offset)
                    } else {
                        // the offset field is exhausted; start a new
                        // basis just past the last timestamp
                        let basis = self.update_basis(basis + Duration::microseconds(1));
                        Timestamp::NewBasis(basis)
                    }
                }
                // normal case: the elapsed time still fits in the
                // record header's offset field
                Some(off) if (self.offset as i64 + off) <= MAX_TIMESTAMP as i64 => {
                    self.offset += off as u32;
                    Timestamp::Offset(self.offset)
                }
                // elapsed time overflowed i64 micros (None) or the
                // offset field: record a new basis at now
                None | Some(_) => Timestamp::NewBasis(self.update_basis(now)),
            },
        };
        self.prev = now;
        ts
    }
}
407
/// A bounded (or unbounded) time range over an archive, with an
/// optional current position inside it.
#[derive(Debug, Clone, Copy)]
pub struct Cursor {
    // lower bound; may be inclusive, exclusive, or unbounded
    start: Bound<DateTime<Utc>>,
    // upper bound; may be inclusive, exclusive, or unbounded
    end: Bound<DateTime<Utc>>,
    // current position; None if the cursor hasn't been positioned
    current: Option<DateTime<Utc>>,
}
414
impl Cursor {
    /// create a cursor with unbounded start and end and no current
    /// position
    pub fn new() -> Self {
        Cursor { start: Bound::Unbounded, end: Bound::Unbounded, current: None }
    }

    /// create a cursor with pre initialized start, end, and pos. pos
    /// will be adjusted so it falls within the bounds of start and
    /// end.
    pub fn create_from(
        start: Bound<DateTime<Utc>>,
        end: Bound<DateTime<Utc>>,
        pos: Option<DateTime<Utc>>,
    ) -> Self {
        let mut t = Self::new();
        t.set_start(start);
        t.set_end(end);
        if let Some(pos) = pos {
            t.set_current(pos);
        }
        t
    }

    /// clear the current position; the bounds are unchanged
    pub fn reset(&mut self) {
        self.current = None;
    }

    /// return true if the cursor is at the start. If the start is
    /// unbounded then this will always return false.
    ///
    /// if the cursor doesn't have a position then this method will
    /// return false.
    pub fn at_start(&self) -> bool {
        match self.start {
            Bound::Unbounded => false,
            // an excluded bound effectively starts EPSILON after it
            Bound::Excluded(st) => {
                self.current.map(|pos| st + EPSILON == pos).unwrap_or(false)
            }
            Bound::Included(st) => self.current.map(|pos| st == pos).unwrap_or(false),
        }
    }

    /// return true if the cursor is at the end. If the end is
    /// unbounded then this will always return false.
    ///
    /// if the cursor doesn't have a position then this method will
    /// return false.
    pub fn at_end(&self) -> bool {
        match self.end {
            Bound::Unbounded => false,
            // an excluded bound effectively ends EPSILON before it
            Bound::Excluded(en) => {
                self.current.map(|pos| en - EPSILON == pos).unwrap_or(false)
            }
            Bound::Included(en) => self.current.map(|pos| en == pos).unwrap_or(false),
        }
    }

    /// Move the current to the specified position in the archive. If
    /// `pos` is outside the bounds of the cursor, then set the
    /// current to the closest value that is in bounds.
    pub fn set_current(&mut self, pos: DateTime<Utc>) {
        if (self.start, self.end).contains(&pos) {
            self.current = Some(pos);
        } else {
            // pos is out of bounds; clamp to the nearest in bounds
            // value, stepping EPSILON inside excluded bounds
            match (self.start, self.end) {
                // a fully unbounded range contains every pos, so this
                // arm cannot be reached from the else branch
                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
                (Bound::Unbounded, Bound::Included(ts)) => {
                    self.current = Some(ts);
                }
                (Bound::Unbounded, Bound::Excluded(ts)) => {
                    self.current = Some(ts - EPSILON);
                }
                (Bound::Included(ts), Bound::Unbounded) => {
                    self.current = Some(ts);
                }
                (Bound::Excluded(ts), Bound::Unbounded) => {
                    self.current = Some(ts + EPSILON);
                }
                (Bound::Included(start), Bound::Included(end)) => {
                    if pos < start {
                        self.current = Some(start);
                    } else {
                        self.current = Some(end);
                    }
                }
                (Bound::Excluded(start), Bound::Excluded(end)) => {
                    if pos <= start {
                        self.current = Some(start + EPSILON);
                    } else {
                        self.current = Some(end - EPSILON);
                    }
                }
                (Bound::Excluded(start), Bound::Included(end)) => {
                    if pos <= start {
                        self.current = Some(start + EPSILON);
                    } else {
                        self.current = Some(end);
                    }
                }
                (Bound::Included(start), Bound::Excluded(end)) => {
                    if pos < start {
                        self.current = Some(start);
                    } else {
                        self.current = Some(end - EPSILON);
                    }
                }
            }
        }
    }

    /// the start bound of the cursor
    pub fn start(&self) -> &Bound<DateTime<Utc>> {
        &self.start
    }

    /// the end bound of the cursor
    pub fn end(&self) -> &Bound<DateTime<Utc>> {
        &self.end
    }

    /// the current position, if any
    pub fn current(&self) -> Option<DateTime<Utc>> {
        self.current
    }

    /// return true if ts falls within the cursor's bounds
    pub fn contains(&self, ts: &DateTime<Utc>) -> bool {
        (self.start, self.end).contains(ts)
    }

    // drop the current position if it no longer falls within the bounds
    fn maybe_reset(&mut self) {
        if let Some(ref current) = self.current {
            if !(self.start, self.end).contains(current) {
                self.current = None;
            }
        }
    }

    /// set the start bound, clearing the position if it falls out of
    /// bounds
    pub fn set_start(&mut self, start: Bound<DateTime<Utc>>) {
        self.start = start;
        self.maybe_reset();
    }

    /// set the end bound, clearing the position if it falls out of
    /// bounds
    pub fn set_end(&mut self, end: Bound<DateTime<Utc>>) {
        self.end = end;
        self.maybe_reset();
    }
}
558
/// This error will be raised if you try to write a record that is too
/// large to represent in 31 bits to the file. Nothing will be written
/// in that case, so you can just split the record and try again.
#[derive(Debug, Clone, Copy)]
pub struct RecordTooLarge;

impl fmt::Display for RecordTooLarge {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // the Debug rendering doubles as the user facing message
        fmt::Debug::fmt(self, f)
    }
}

impl error::Error for RecordTooLarge {}
572
/// Scan the records in `buf`, which begins at file position
/// `start_pos`, up to the committed position `end`. Path/id mappings
/// are accumulated into the two maps, and when `imagemap`/`deltamap`
/// are provided the file position of each image/delta batch is
/// recorded keyed by its timestamp. Returns the file position where
/// scanning stopped.
fn scan_records(
    path_by_id: &mut IndexMap<Id, Path, FxBuildHasher>,
    id_by_path: &mut FxHashMap<Path, Id>,
    mut imagemap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
    mut deltamap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
    time_basis: &mut DateTime<Utc>,
    max_id: &mut u32,
    end: usize,
    start_pos: usize,
    buf: &mut impl Buf,
) -> Result<usize> {
    let total_size = buf.remaining();
    let res = loop {
        // absolute file position of the next record header
        let pos = start_pos + (total_size - buf.remaining());
        if pos >= end {
            // reached the committed end of the archive
            break Ok(pos);
        }
        if buf.remaining() < <RecordHeader as Pack>::const_encoded_len().unwrap() {
            // not enough bytes left for even a header
            break Ok(pos);
        }
        let rh = RecordHeader::decode(buf)
            .map_err(Error::from)
            .context("read record header")?;
        if buf.remaining() < rh.record_length as usize {
            warn!("truncated record at {}", pos);
            break Ok(pos);
        }
        use chrono::Duration;
        match rh.record_type {
            RecordTyp::DeltaBatch => {
                if let Some(deltamap) = &mut deltamap {
                    // record timestamps are microsecond offsets from
                    // the most recent time basis record
                    let timestamp =
                        *time_basis + Duration::microseconds(rh.timestamp as i64);
                    deltamap.insert(timestamp, pos);
                }
                buf.advance(rh.record_length as usize); // skip the contents
            }
            RecordTyp::Timestamp => {
                // establishes a new time basis for subsequent records
                *time_basis = <DateTime<Utc> as Pack>::decode(buf)?;
            }
            RecordTyp::ImageBatch => {
                if let Some(imagemap) = &mut imagemap {
                    let timestamp =
                        *time_basis + Duration::microseconds(rh.timestamp as i64);
                    imagemap.insert(timestamp, pos);
                }
                buf.advance(rh.record_length as usize); // skip the contents
            }
            RecordTyp::PathMappings => {
                let mut m = <GPooled<Vec<PathMapping>> as Pack>::decode(buf)
                    .map_err(Error::from)
                    .context("invalid path mappings record")?;
                for pm in m.drain(..) {
                    // duplicate mappings are tolerated (last one wins)
                    // but logged for diagnosis
                    if let Some(old) = id_by_path.insert(pm.0.clone(), pm.1) {
                        warn!(
                            "duplicate path mapping for {}, {:?}, {:?}",
                            &*pm.0, old, pm.1
                        );
                    }
                    if let Some(old) = path_by_id.insert(pm.1, pm.0.clone()) {
                        warn!("duplicate id mapping for {}, {}, {:?}", &*pm.0, old, pm.1)
                    }
                    // track the largest id seen so new ids don't collide
                    *max_id = max(pm.1 .0, *max_id);
                }
            }
        }
    };
    if let Some(deltamap) = deltamap {
        deltamap.shrink_to_fit();
    }
    if let Some(imagemap) = imagemap {
        imagemap.shrink_to_fit();
    }
    res
}
648
649fn scan_header(buf: &mut impl Buf) -> Result<FileHeader> {
650    // check the file header
651    if buf.remaining() < <FileHeader as Pack>::const_encoded_len().unwrap() {
652        bail!("invalid file header: too short")
653    }
654    let header = <FileHeader as Pack>::decode(buf)
655        .map_err(Error::from)
656        .context("read file header")?;
657    // this is the first version, so no upgrading can be done yet
658    if header.version != FILE_VERSION {
659        bail!("file version mismatch, expected {} got {}", header.version, FILE_VERSION)
660    }
661    Ok(header)
662}
663
/// just read the file header directly from the file, bypass locking,
/// and don't touch any other part of the file.
pub fn read_file_header(path: impl AsRef<FilePath>) -> Result<FileHeader> {
    let file = OpenOptions::new().read(true).open(path.as_ref()).context("open file")?;
    // SAFETY: Mmap::map is unsafe because the mapping is undefined if
    // the underlying file is modified/truncated while mapped; here the
    // header is read immediately and the map dropped, and this helper
    // explicitly bypasses the archive's locking — callers accept that
    // window
    let mmap = unsafe { Mmap::map(&file)? };
    scan_header(&mut &mmap[..])
}
671
/// Scan an entire archive buffer: the file header, the optional
/// compression header, then every record up to the committed
/// position (see scan_records). The out parameters are filled in
/// from what is found; returns the position where scanning stopped.
fn scan_file(
    indexed: &mut bool,
    compressed: &mut Option<CompressionHeader>,
    path_by_id: &mut IndexMap<Id, Path, FxBuildHasher>,
    id_by_path: &mut FxHashMap<Path, Id>,
    imagemap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
    deltamap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
    time_basis: &mut DateTime<Utc>,
    max_id: &mut u32,
    buf: &mut impl Buf,
) -> Result<usize> {
    let total_bytes = buf.remaining();
    let header = scan_header(buf).context("scan header")?;
    if header.compressed {
        // the compression header immediately follows the file header
        *compressed =
            Some(CompressionHeader::decode(buf).context("read compression header")?);
    }
    *indexed = header.indexed;
    scan_records(
        path_by_id,
        id_by_path,
        imagemap,
        deltamap,
        time_basis,
        max_id,
        header.committed as usize,
        // records begin at the current offset into the buffer
        total_bytes - buf.remaining(),
        buf,
    )
}