1pub mod arraymap;
2mod reader;
3mod writer;
4
5#[cfg(test)]
6mod test;
7
8use self::arraymap::ArrayMap;
9pub use self::{
10 reader::{AlreadyCompressed, ArchiveReader},
11 writer::ArchiveWriter,
12};
13use anyhow::{Context, Error, Result};
14use arcstr::ArcStr;
15use bytes::{Buf, BufMut};
16use chrono::prelude::*;
17use fxhash::{FxBuildHasher, FxHashMap};
18use indexmap::IndexMap;
19use log::warn;
20use memmap2::Mmap;
21use netidx::{
22 pack::{decode_varint, encode_varint, varint_len, Pack, PackError},
23 path::Path,
24 subscriber::{Event, FromValue, Value},
25};
26use netidx_derive::Pack;
27use packed_struct::PackedStruct;
28use poolshark::global::{GPooled, Pool};
29use std::{
30 self,
31 cmp::max,
32 collections::VecDeque,
33 error, fmt,
34 fs::OpenOptions,
35 mem,
36 ops::{Bound, RangeBounds},
37 path::Path as FilePath,
38 str::FromStr,
39 sync::LazyLock,
40};
41
/// The fixed size header at the start of every archive file.
#[derive(Debug, Clone)]
pub struct FileHeader {
    // true if record payloads are compressed; a CompressionHeader follows
    // the file header in that case (see scan_file)
    pub compressed: bool,
    // true if the archive carries per-batch indices
    // NOTE(review): inferred from the flag bit and the RecordIndex type —
    // confirm the exact semantics against the writer module
    pub indexed: bool,
    // on disk format version, stored in the low 30 bits of the flags word
    pub version: u32,
    // file offset up to which records are fully written; used as the
    // scan limit in scan_records
    pub committed: u64,
}
49
/// Magic bytes identifying a netidx archive file.
// `'static` is implied on statics; writing it out is the
// clippy::redundant_static_lifetimes anti-pattern.
static FILE_MAGIC: &[u8] = b"netidx archive";
/// Byte offset of the `committed` field in the file header: the magic
/// followed by the packed u32 flags/version word.
static COMMITTED_OFFSET: usize = FILE_MAGIC.len() + mem::size_of::<u32>();
/// Current on disk format version (low 30 bits of the flags word).
const FILE_VERSION: u32 = 0;
53
54impl Pack for FileHeader {
55 fn const_encoded_len() -> Option<usize> {
56 Some(COMMITTED_OFFSET + mem::size_of::<u64>())
57 }
58
59 fn encoded_len(&self) -> usize {
60 <FileHeader as Pack>::const_encoded_len().unwrap()
61 }
62
63 fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
64 buf.put_slice(FILE_MAGIC);
65 buf.put_u32(
66 ((self.compressed as u32) << 31)
67 | ((self.indexed as u32) << 30)
68 | FILE_VERSION,
69 );
70 buf.put_u64(self.committed);
71 Ok(())
72 }
73
74 fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
75 for byte in FILE_MAGIC {
76 if buf.get_u8() != *byte {
77 return Err(PackError::InvalidFormat);
78 }
79 }
80 let v = buf.get_u32();
81 let version = v & 0x3FFF_FFFF;
82 let compressed = (v & 0x8000_0000) > 0;
83 let indexed = (v & 0x4000_0000) > 0;
84 let committed = buf.get_u64();
85 Ok(FileHeader { compressed, indexed, version, committed })
86 }
87}
88
/// Header following the [`FileHeader`] when the archive is compressed,
/// carrying the compression dictionary bytes used to decode record
/// payloads.
#[derive(Pack)]
pub struct CompressionHeader {
    pub dictionary: Vec<u8>,
}
94
/// The type tag of a record, stored in the top 2 bits of [`RecordHeader`].
#[derive(PrimitiveEnum, Debug, Clone, Copy)]
pub enum RecordTyp {
    /// A new time basis; subsequent record timestamps are microsecond
    /// offsets from it.
    Timestamp = 0,
    /// A batch of path to [`Id`] mappings.
    PathMappings = 1,
    /// A batch of updates relative to prior state.
    DeltaBatch = 2,
    /// A self contained snapshot batch of events by id.
    ImageBatch = 3,
}
106
// Maximum encoded record payload length (the full range of the 32 bit
// record_length field in RecordHeader).
const MAX_RECORD_LEN: u32 = u32::MAX;
// Maximum microsecond offset stored relative to the current time basis.
// NOTE(review): this is a 26 bit cap although the packed timestamp field
// is 30 bits wide — presumably a deliberate safety margin; confirm before
// widening.
const MAX_TIMESTAMP: u32 = 0x03FFFFFF;
109
/// Fixed 8 byte bit-packed header preceding every record in the archive.
#[derive(PackedStruct, Debug, Clone, Copy)]
#[packed_struct(bit_numbering = "msb0", size_bytes = "8")]
pub struct RecordHeader {
    /// The record type tag (2 bits).
    #[packed_field(bits = "0:1", size_bits = "2", ty = "enum")]
    pub record_type: RecordTyp,
    /// Length in bytes of the record payload following this header (32 bits).
    #[packed_field(bits = "2:33", size_bits = "32", endian = "msb")]
    pub record_length: u32,
    /// Microsecond offset from the current time basis (30 bits).
    #[packed_field(bits = "34:63", size_bits = "30", endian = "msb")]
    pub timestamp: u32,
}
124
125impl Pack for RecordHeader {
126 fn const_encoded_len() -> Option<usize> {
127 Some(8)
128 }
129
130 fn encoded_len(&self) -> usize {
131 <RecordHeader as Pack>::const_encoded_len().unwrap()
132 }
133
134 fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
135 let hdr = RecordHeader::pack(self).map_err(|_| PackError::InvalidFormat)?;
136 Ok(buf.put(&hdr[..]))
137 }
138
139 fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
140 let mut v = [0u8; 8];
141 buf.copy_to_slice(&mut v);
142 RecordHeader::unpack(&v).map_err(|_| PackError::InvalidFormat)
143 }
144}
145
/// The set of ids appearing in a batch.
/// NOTE(review): presumably written only when the file header's `indexed`
/// flag is set — confirm against the writer module.
#[derive(Debug, Clone, Pack)]
pub struct RecordIndex {
    pub index: Vec<Id>,
}
150
/// A compact numeric id standing in for a full `Path` (see `PathMapping`);
/// varint encoded on disk.
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct Id(u32);
153
154impl Pack for Id {
155 fn encoded_len(&self) -> usize {
156 varint_len(self.0 as u64)
157 }
158
159 fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
160 Ok(encode_varint(self.0 as u64, buf))
161 }
162
163 fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
164 Ok(Id(decode_varint(buf)? as u32))
165 }
166}
167
/// Associates a full path with its compact [`Id`].
#[derive(Debug, Clone)]
struct PathMapping(Path, Id);
170
171impl Pack for PathMapping {
172 fn encoded_len(&self) -> usize {
173 <Path as Pack>::encoded_len(&self.0) + <Id as Pack>::encoded_len(&self.1)
174 }
175
176 fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
177 <Path as Pack>::encode(&self.0, buf)?;
178 <Id as Pack>::encode(&self.1, buf)
179 }
180
181 fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
182 let path = <Path as Pack>::decode(buf)?;
183 let id = <Id as Pack>::decode(buf)?;
184 Ok(PathMapping(path, id))
185 }
186}
187
/// A single update in a batch: the id of the path it applies to and the
/// event that occurred.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub struct BatchItem(pub Id, pub Event);
190
191impl Pack for BatchItem {
192 fn encoded_len(&self) -> usize {
193 <Id as Pack>::encoded_len(&self.0) + Pack::encoded_len(&self.1)
194 }
195
196 fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
197 <Id as Pack>::encode(&self.0, buf)?;
198 <Event as Pack>::encode(&self.1, buf)
199 }
200
201 fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
202 let id = <Id as Pack>::decode(buf)?;
203 Ok(BatchItem(id, <Event as Pack>::decode(buf)?))
204 }
205}
206
/// A position in an archive to seek to.
#[derive(Debug, Clone, Copy, Pack)]
pub enum Seek {
    /// The start of the archive.
    Beginning,
    /// The end of the archive.
    End,
    /// An absolute point in time.
    Absolute(DateTime<Utc>),
    /// A signed number of batches relative to the current position.
    BatchRelative(i8),
    /// A signed time offset.
    TimeRelative(chrono::Duration),
}
215
216impl ToString for Seek {
217 fn to_string(&self) -> String {
218 match self {
219 Seek::Beginning => "beginning".into(),
220 Seek::End => "end".into(),
221 Seek::Absolute(dt) => dt.to_rfc3339(),
222 Seek::BatchRelative(i) => i.to_string(),
223 Seek::TimeRelative(d) => {
224 if d < &chrono::Duration::zero() {
225 format!("{}s", d.num_seconds())
226 } else {
227 format!("+{}s", d.num_seconds())
228 }
229 }
230 }
231 }
232}
233
234impl FromStr for Seek {
235 type Err = Error;
236
237 fn from_str(s: &str) -> Result<Self> {
238 use diligent_date_parser::parse_date;
239 let s = s.trim();
240 if s == "beginning" {
241 Ok(Seek::Beginning)
242 } else if s == "end" {
243 Ok(Seek::End)
244 } else if let Ok(steps) = s.parse::<i8>() {
245 Ok(Seek::BatchRelative(steps))
246 } else if let Some(dt) = parse_date(s) {
247 Ok(Seek::Absolute(dt.with_timezone(&Utc)))
248 } else if s.starts_with(['+', '-'].as_ref())
249 && s.ends_with(['y', 'M', 'd', 'h', 'm', 's', 'u'].as_ref())
250 && s.is_ascii()
251 && s.len() > 2
252 {
253 let dir = s.chars().next().unwrap();
254 let mag = s.chars().next_back().unwrap();
255 match s[1..s.len() - 1].parse::<f64>() {
256 Err(_) => bail!("invalid position expression"),
257 Ok(quantity) => {
258 let quantity = if mag == 'y' {
259 quantity * 365.24 * 86400.
260 } else if mag == 'M' {
261 quantity * (365.24 / 12.) * 86400.
262 } else if mag == 'd' {
263 quantity * 86400.
264 } else if mag == 'h' {
265 quantity * 3600.
266 } else if mag == 'm' {
267 quantity * 60.
268 } else if mag == 's' {
269 quantity
270 } else {
271 quantity * 1e-6
272 };
273 let offset = chrono::Duration::nanoseconds(if dir == '+' {
274 (quantity * 1e9).trunc() as i64
275 } else {
276 (-1. * quantity * 1e9).trunc() as i64
277 });
278 if dir == '+' {
279 Ok(Seek::TimeRelative(offset))
280 } else {
281 Ok(Seek::TimeRelative(offset))
282 }
283 }
284 }
285 } else {
286 bail!("{} is not a valid seek expression", s)
287 }
288 }
289}
290
291impl Into<Value> for Seek {
292 fn into(self) -> Value {
293 self.to_string().into()
294 }
295}
296
297impl FromValue for Seek {
298 fn from_value(v: Value) -> Result<Self> {
299 match v {
300 Value::DateTime(ts) => Ok(Seek::Absolute(*ts)),
301 v if v.number() => Ok(Seek::BatchRelative(v.cast_to::<i8>()?)),
302 v => v.cast_to::<ArcStr>()?.parse::<Seek>(),
303 }
304 }
305
306 fn get(_: Value) -> Option<Self> {
307 None
308 }
309}
310
// Global pools for frequently allocated containers, avoiding a fresh
// allocation per batch/record.
static PM_POOL: LazyLock<Pool<Vec<PathMapping>>> =
    LazyLock::new(|| Pool::new(10, 100_000));
pub static BATCH_POOL: LazyLock<Pool<Vec<BatchItem>>> =
    LazyLock::new(|| Pool::new(10, 100_000));
pub(crate) static CURSOR_BATCH_POOL: LazyLock<
    Pool<VecDeque<(DateTime<Utc>, GPooled<Vec<BatchItem>>)>>,
> = LazyLock::new(|| Pool::new(100, 10_000));
pub(crate) static IMG_POOL: LazyLock<Pool<FxHashMap<Id, Event>>> =
    LazyLock::new(|| Pool::new(100, 10_000));
// The smallest representable step between archive timestamps (1 us);
// used by Cursor to nudge positions just inside excluded bounds.
static EPSILON: chrono::Duration = chrono::Duration::microseconds(1);
321
/// A timestamp as it will be recorded in the archive.
#[derive(Debug, Clone, Copy)]
enum Timestamp {
    /// A new basis time; recorded as its own Timestamp record.
    NewBasis(DateTime<Utc>),
    /// A microsecond offset from the most recent basis.
    Offset(u32),
}
327
328impl Timestamp {
329 pub fn offset(&self) -> u32 {
330 match self {
331 Timestamp::NewBasis(_) => 0,
332 Timestamp::Offset(off) => *off,
333 }
334 }
335}
336
/// Issues non-decreasing timestamps even if the wall clock steps
/// backwards, by tracking a basis time plus a microsecond offset.
#[derive(Debug, Clone, Copy)]
struct MonotonicTimestamper {
    // the wall clock time passed to the previous `timestamp` call
    prev: DateTime<Utc>,
    // the current basis; None until the first timestamp is issued
    basis: Option<DateTime<Utc>>,
    // microseconds elapsed since `basis`, capped at MAX_TIMESTAMP
    offset: u32,
}
353
impl MonotonicTimestamper {
    fn new() -> Self {
        MonotonicTimestamper { prev: Utc::now(), basis: None, offset: 0 }
    }

    /// Install `new_basis` as the time basis and reset the offset to 0,
    /// returning the basis now in effect. If the last issued time
    /// (old basis + offset) is already past `new_basis` (i.e. the wall
    /// clock stepped backwards), that last issued time becomes the basis
    /// instead, preserving monotonicity.
    fn update_basis(&mut self, new_basis: DateTime<Utc>) -> DateTime<Utc> {
        use chrono::Duration;
        match self.basis {
            None => {
                self.basis = Some(new_basis);
                self.offset = 0;
                new_basis
            }
            Some(old_basis) => {
                let old_ts = old_basis + Duration::microseconds(self.offset as i64);
                if old_ts > new_basis {
                    self.basis = Some(old_ts);
                    self.offset = 0;
                    old_ts
                } else {
                    self.basis = Some(new_basis);
                    self.offset = 0;
                    new_basis
                }
            }
        }
    }

    /// Produce the next monotonic timestamp for wall clock time `now`:
    /// an `Offset` from the current basis when the elapsed microseconds
    /// fit under MAX_TIMESTAMP, otherwise a `NewBasis`.
    fn timestamp(&mut self, now: DateTime<Utc>) -> Timestamp {
        use chrono::Duration;
        let ts = match self.basis {
            None => Timestamp::NewBasis(self.update_basis(now)),
            Some(basis) => match (now - self.prev).num_microseconds() {
                // clock did not advance (or went backwards): force the
                // offset forward by 1 us so timestamps stay distinct
                Some(off) if off <= 0 => {
                    if self.offset < MAX_TIMESTAMP {
                        self.offset += 1;
                        Timestamp::Offset(self.offset)
                    } else {
                        // offset field exhausted: start a new basis 1 us
                        // past the current one
                        let basis = self.update_basis(basis + Duration::microseconds(1));
                        Timestamp::NewBasis(basis)
                    }
                }
                // normal forward step that still fits in the offset field
                Some(off) if (self.offset as i64 + off) <= MAX_TIMESTAMP as i64 => {
                    self.offset += off as u32;
                    Timestamp::Offset(self.offset)
                }
                // elapsed time overflows i64 microseconds (None) or the
                // offset field: record a fresh basis
                None | Some(_) => Timestamp::NewBasis(self.update_basis(now)),
            },
        };
        self.prev = now;
        ts
    }
}
407
/// A position within a time-bounded range of the archive. The current
/// position, when set, is always kept inside [start, end].
#[derive(Debug, Clone, Copy)]
pub struct Cursor {
    start: Bound<DateTime<Utc>>,
    end: Bound<DateTime<Utc>>,
    current: Option<DateTime<Utc>>,
}
414
impl Cursor {
    /// Create an unbounded cursor with no current position.
    pub fn new() -> Self {
        Cursor { start: Bound::Unbounded, end: Bound::Unbounded, current: None }
    }

    /// Create a cursor with the given bounds and, optionally, an initial
    /// position (clamped to the bounds by `set_current`).
    pub fn create_from(
        start: Bound<DateTime<Utc>>,
        end: Bound<DateTime<Utc>>,
        pos: Option<DateTime<Utc>>,
    ) -> Self {
        let mut t = Self::new();
        t.set_start(start);
        t.set_end(end);
        if let Some(pos) = pos {
            t.set_current(pos);
        }
        t
    }

    /// Clear the current position, keeping the bounds.
    pub fn reset(&mut self) {
        self.current = None;
    }

    /// True when the current position sits exactly on the start bound
    /// (one EPSILON inside an excluded start). Always false when the
    /// start is unbounded or no position is set.
    pub fn at_start(&self) -> bool {
        match self.start {
            Bound::Unbounded => false,
            Bound::Excluded(st) => {
                self.current.map(|pos| st + EPSILON == pos).unwrap_or(false)
            }
            Bound::Included(st) => self.current.map(|pos| st == pos).unwrap_or(false),
        }
    }

    /// True when the current position sits exactly on the end bound
    /// (one EPSILON inside an excluded end). Always false when the end is
    /// unbounded or no position is set.
    pub fn at_end(&self) -> bool {
        match self.end {
            Bound::Unbounded => false,
            Bound::Excluded(en) => {
                self.current.map(|pos| en - EPSILON == pos).unwrap_or(false)
            }
            Bound::Included(en) => self.current.map(|pos| en == pos).unwrap_or(false),
        }
    }

    /// Set the current position. If `pos` lies outside the bounds it is
    /// clamped to the nearest position just inside the violated bound
    /// (one EPSILON inside an excluded bound).
    pub fn set_current(&mut self, pos: DateTime<Utc>) {
        if (self.start, self.end).contains(&pos) {
            self.current = Some(pos);
        } else {
            match (self.start, self.end) {
                // a fully unbounded range contains every pos, so this
                // branch cannot be reached with it
                (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
                (Bound::Unbounded, Bound::Included(ts)) => {
                    self.current = Some(ts);
                }
                (Bound::Unbounded, Bound::Excluded(ts)) => {
                    self.current = Some(ts - EPSILON);
                }
                (Bound::Included(ts), Bound::Unbounded) => {
                    self.current = Some(ts);
                }
                (Bound::Excluded(ts), Bound::Unbounded) => {
                    self.current = Some(ts + EPSILON);
                }
                (Bound::Included(start), Bound::Included(end)) => {
                    if pos < start {
                        self.current = Some(start);
                    } else {
                        self.current = Some(end);
                    }
                }
                (Bound::Excluded(start), Bound::Excluded(end)) => {
                    if pos <= start {
                        self.current = Some(start + EPSILON);
                    } else {
                        self.current = Some(end - EPSILON);
                    }
                }
                (Bound::Excluded(start), Bound::Included(end)) => {
                    if pos <= start {
                        self.current = Some(start + EPSILON);
                    } else {
                        self.current = Some(end);
                    }
                }
                (Bound::Included(start), Bound::Excluded(end)) => {
                    if pos < start {
                        self.current = Some(start);
                    } else {
                        self.current = Some(end - EPSILON);
                    }
                }
            }
        }
    }

    /// The start bound.
    pub fn start(&self) -> &Bound<DateTime<Utc>> {
        &self.start
    }

    /// The end bound.
    pub fn end(&self) -> &Bound<DateTime<Utc>> {
        &self.end
    }

    /// The current position, if one has been set.
    pub fn current(&self) -> Option<DateTime<Utc>> {
        self.current
    }

    /// True if `ts` lies within the cursor's bounds.
    pub fn contains(&self, ts: &DateTime<Utc>) -> bool {
        (self.start, self.end).contains(ts)
    }

    // drop the current position if a bounds change pushed it outside
    fn maybe_reset(&mut self) {
        if let Some(ref current) = self.current {
            if !(self.start, self.end).contains(current) {
                self.current = None;
            }
        }
    }

    /// Set the start bound, clearing the current position if it falls
    /// outside the new range.
    pub fn set_start(&mut self, start: Bound<DateTime<Utc>>) {
        self.start = start;
        self.maybe_reset();
    }

    /// Set the end bound, clearing the current position if it falls
    /// outside the new range.
    pub fn set_end(&mut self, end: Bound<DateTime<Utc>>) {
        self.end = end;
        self.maybe_reset();
    }
}
558
/// Error indicating a record's encoded size exceeds the maximum the
/// archive format can store.
#[derive(Debug, Clone, Copy)]
pub struct RecordTooLarge;
564
565impl fmt::Display for RecordTooLarge {
566 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
567 write!(f, "{:?}", self)
568 }
569}
570
571impl error::Error for RecordTooLarge {}
572
/// Scan the record stream in `buf`, which begins at absolute file offset
/// `start_pos`, stopping at offset `end` or at the first truncated record.
/// Path mappings are accumulated into `path_by_id`/`id_by_path` and
/// `max_id` is raised to the largest id seen; when `imagemap`/`deltamap`
/// are provided, the absolute offset of each image/delta batch is indexed
/// by its absolute timestamp. Returns the file offset of the first
/// unscanned byte.
fn scan_records(
    path_by_id: &mut IndexMap<Id, Path, FxBuildHasher>,
    id_by_path: &mut FxHashMap<Path, Id>,
    mut imagemap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
    mut deltamap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
    time_basis: &mut DateTime<Utc>,
    max_id: &mut u32,
    end: usize,
    start_pos: usize,
    buf: &mut impl Buf,
) -> Result<usize> {
    let total_size = buf.remaining();
    let res = loop {
        // absolute file offset of the record about to be read
        let pos = start_pos + (total_size - buf.remaining());
        if pos >= end {
            break Ok(pos);
        }
        if buf.remaining() < <RecordHeader as Pack>::const_encoded_len().unwrap() {
            break Ok(pos);
        }
        let rh = RecordHeader::decode(buf)
            .map_err(Error::from)
            .context("read record header")?;
        if buf.remaining() < rh.record_length as usize {
            // the payload was cut off mid write; stop here so the caller
            // can resume/repair from this offset
            warn!("truncated record at {}", pos);
            break Ok(pos);
        }
        use chrono::Duration;
        match rh.record_type {
            RecordTyp::DeltaBatch => {
                // index the batch offset by its absolute timestamp, then
                // skip the payload without decoding it
                if let Some(deltamap) = &mut deltamap {
                    let timestamp =
                        *time_basis + Duration::microseconds(rh.timestamp as i64);
                    deltamap.insert(timestamp, pos);
                }
                buf.advance(rh.record_length as usize);
            }
            RecordTyp::Timestamp => {
                // subsequent record timestamps are microsecond offsets
                // from this new basis
                *time_basis = <DateTime<Utc> as Pack>::decode(buf)?;
            }
            RecordTyp::ImageBatch => {
                if let Some(imagemap) = &mut imagemap {
                    let timestamp =
                        *time_basis + Duration::microseconds(rh.timestamp as i64);
                    imagemap.insert(timestamp, pos);
                }
                buf.advance(rh.record_length as usize);
            }
            RecordTyp::PathMappings => {
                let mut m = <GPooled<Vec<PathMapping>> as Pack>::decode(buf)
                    .map_err(Error::from)
                    .context("invalid path mappings record")?;
                for pm in m.drain(..) {
                    // duplicates are tolerated (last mapping wins) but
                    // logged, since they indicate a writer anomaly
                    if let Some(old) = id_by_path.insert(pm.0.clone(), pm.1) {
                        warn!(
                            "duplicate path mapping for {}, {:?}, {:?}",
                            &*pm.0, old, pm.1
                        );
                    }
                    if let Some(old) = path_by_id.insert(pm.1, pm.0.clone()) {
                        warn!("duplicate id mapping for {}, {}, {:?}", &*pm.0, old, pm.1)
                    }
                    // track the largest id seen so fresh ids won't collide
                    *max_id = max(pm.1 .0, *max_id);
                }
            }
        }
    };
    if let Some(deltamap) = deltamap {
        deltamap.shrink_to_fit();
    }
    if let Some(imagemap) = imagemap {
        imagemap.shrink_to_fit();
    }
    res
}
648
649fn scan_header(buf: &mut impl Buf) -> Result<FileHeader> {
650 if buf.remaining() < <FileHeader as Pack>::const_encoded_len().unwrap() {
652 bail!("invalid file header: too short")
653 }
654 let header = <FileHeader as Pack>::decode(buf)
655 .map_err(Error::from)
656 .context("read file header")?;
657 if header.version != FILE_VERSION {
659 bail!("file version mismatch, expected {} got {}", header.version, FILE_VERSION)
660 }
661 Ok(header)
662}
663
/// Open the archive at `path` read-only, memory map it, and read just its
/// [`FileHeader`] without scanning any records.
pub fn read_file_header(path: impl AsRef<FilePath>) -> Result<FileHeader> {
    let file = OpenOptions::new().read(true).open(path.as_ref()).context("open file")?;
    // SAFETY-NOTE(review): mapping assumes the file is not truncated or
    // modified concurrently while mapped — confirm callers uphold this.
    let mmap = unsafe { Mmap::map(&file)? };
    scan_header(&mut &mmap[..])
}
671
672fn scan_file(
673 indexed: &mut bool,
674 compressed: &mut Option<CompressionHeader>,
675 path_by_id: &mut IndexMap<Id, Path, FxBuildHasher>,
676 id_by_path: &mut FxHashMap<Path, Id>,
677 imagemap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
678 deltamap: Option<&mut ArrayMap<DateTime<Utc>, usize>>,
679 time_basis: &mut DateTime<Utc>,
680 max_id: &mut u32,
681 buf: &mut impl Buf,
682) -> Result<usize> {
683 let total_bytes = buf.remaining();
684 let header = scan_header(buf).context("scan header")?;
685 if header.compressed {
686 *compressed =
687 Some(CompressionHeader::decode(buf).context("read compression header")?);
688 }
689 *indexed = header.indexed;
690 scan_records(
691 path_by_id,
692 id_by_path,
693 imagemap,
694 deltamap,
695 time_basis,
696 max_id,
697 header.committed as usize,
698 total_bytes - buf.remaining(),
699 buf,
700 )
701}