analogize/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::collections::hash_map::Entry;
4use std::collections::{HashMap, HashSet};
5use std::ffi::c_void;
6use std::fs::{metadata, read_dir, rename, File, Metadata, OpenOptions};
7use std::io::{BufRead, BufReader, ErrorKind, Read, Write};
8use std::os::fd::{AsRawFd, RawFd};
9use std::os::unix::fs::MetadataExt;
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread::{sleep, JoinHandle};
15use std::time::Duration;
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18
19use buffertk::Unpackable;
20use indicio::protobuf::ClueVector;
21use indicio::{Clue, Value};
22use mani::{Edit, Manifest, ManifestOptions};
23use prototk::FieldNumber;
24use scrunch::bit_vector::sparse::BitVector;
25use scrunch::bit_vector::BitVector as BitVectorTrait;
26use scrunch::builder::Builder;
27use scrunch::{CompressedDocument, Document, RecordOffset};
28use zerror::{iotoz, Z};
29use zerror_core::ErrorCore;
30
31mod parser;
32
33/////////////////////////////////////////////// Error //////////////////////////////////////////////
34
/// Error type for this crate.
///
/// Every variant carries an [`ErrorCore`] in its `core` field; the remaining fields hold the
/// variant-specific payload.
#[derive(zerror_derive::Z)]
pub enum Error {
    /// NOTE(review): not constructed in the visible code; presumably a "no error" placeholder —
    /// confirm.
    Success {
        core: ErrorCore,
    },
    /// An I/O error, recorded as its [`ErrorKind`] plus its rendered message.
    System {
        core: ErrorCore,
        kind: ErrorKind,
        what: String,
    },
    /// NOTE(review): not constructed in the visible code; presumably a missing directory named by
    /// `what` — confirm.
    DirectoryNotFound {
        core: ErrorCore,
        what: String,
    },
    /// NOTE(review): not constructed in the visible code; presumably a directory named by `what`
    /// that already exists — confirm.
    DirectoryAlreadyExists {
        core: ErrorCore,
        what: String,
    },
    /// An error propagated from the `mani` crate.
    Manifest {
        core: ErrorCore,
        what: mani::Error,
    },
    /// A symbol-table line whose numeric column (`as_str`) did not parse as a `u32`.
    InvalidNumberLiteral {
        core: ErrorCore,
        as_str: String,
    },
    /// A query failed to parse; `what` is the parser's message.
    Parsing {
        core: ErrorCore,
        what: String,
    },
    /// A symbol-table `line` that did not split into a mangled name and a number.
    InvalidSymbolTable {
        core: ErrorCore,
        line: String,
    },
    /// NOTE(review): not constructed in the visible code; presumably an unusable path — confirm.
    InvalidPath {
        core: ErrorCore,
        what: String,
    },
    /// A clue timestamp (`what`) that could not be converted to a `DateTime`.
    InvalidTimestamp {
        core: ErrorCore,
        what: i64,
    },
    /// An error propagated from the `scrunch` crate.
    Scrunch {
        core: ErrorCore,
        what: scrunch::Error,
    },
    /// An error propagated from `prototk`.
    Indicio {
        core: ErrorCore,
        what: prototk::Error,
    },
    /// Returned when a conversion is handed no clues at all.
    EmptyClueFile {
        core: ErrorCore,
    },
    /// NOTE(review): not constructed in the visible code; presumably an input exceeding a size
    /// limit — confirm.
    FileTooLarge {
        core: ErrorCore,
    },
}
92
// NOTE(review): presumably generates the `as_z()` adaptor that `SymbolTable::from_file` calls on
// an `std::io` result — confirm against the `zerror` crate.
iotoz! {Error}
94
95impl From<std::io::Error> for Error {
96    fn from(err: std::io::Error) -> Self {
97        Self::System {
98            core: ErrorCore::default(),
99            kind: err.kind(),
100            what: err.to_string(),
101        }
102    }
103}
104
105impl From<mani::Error> for Error {
106    fn from(err: mani::Error) -> Self {
107        Self::Manifest {
108            core: ErrorCore::default(),
109            what: err,
110        }
111    }
112}
113
114impl From<scrunch::Error> for Error {
115    fn from(err: scrunch::Error) -> Self {
116        Self::Scrunch {
117            core: ErrorCore::default(),
118            what: err,
119        }
120    }
121}
122
123impl From<prototk::Error> for Error {
124    fn from(err: prototk::Error) -> Self {
125        Self::Indicio {
126            core: ErrorCore::default(),
127            what: err,
128        }
129    }
130}
131
132impl From<parser::ParseError> for Error {
133    fn from(err: parser::ParseError) -> Self {
134        Self::Parsing {
135            core: ErrorCore::default(),
136            what: err.what().to_string(),
137        }
138    }
139}
140
141//////////////////////////////////////////// SymbolTable ///////////////////////////////////////////
142
/// Maps mangled symbol strings to the `u32` marker values used in translated text.
#[derive(Debug)]
pub struct SymbolTable {
    /// Mangled symbol string to its assigned marker value.
    symbols: HashMap<String, u32>,
    /// Next unassigned marker value; `lookup_symbol` advances it by two for each new symbol.
    next_symbol: u32,
}
148
149impl SymbolTable {
150    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
151        if !path.as_ref().exists() {
152            return Ok(Self::default());
153        }
154        let file = File::open(path.as_ref())
155            .as_z()
156            .with_info("path", path.as_ref().to_string_lossy())?;
157        Self::from_reader(file)
158    }
159
160    pub fn from_reader<R: Read>(reader: R) -> Result<Self, Error> {
161        let reader = BufReader::new(reader);
162        let mut syms = SymbolTable::default();
163        for line in reader.lines() {
164            let line = line?;
165            let line = line.trim();
166            let mut pieces: Vec<&str> = line.rsplitn(2, ' ').collect();
167            pieces.reverse();
168            if pieces.len() != 2 {
169                return Err(Error::InvalidSymbolTable {
170                    core: ErrorCore::default(),
171                    line: line.to_string(),
172                });
173            }
174            let mangled = pieces[0].to_string();
175            let symbol = u32::from_str(pieces[1]).map_err(|_| Error::InvalidNumberLiteral {
176                core: ErrorCore::default(),
177                as_str: pieces[1].to_string(),
178            })?;
179            syms.symbols.insert(mangled, symbol);
180            syms.next_symbol = std::cmp::max(syms.next_symbol, symbol + 2);
181        }
182        Ok(syms)
183    }
184
185    pub fn to_file<P: AsRef<Path>>(&self, path: P) -> Result<(), Error> {
186        let mut output = OpenOptions::new()
187            .create(true)
188            .truncate(true)
189            .write(true)
190            .open(path)?;
191        let mut sorted = self.symbols.iter().collect::<Vec<_>>();
192        sorted.sort();
193        for (mangled, symbol) in sorted.into_iter() {
194            writeln!(output, "{} {}", mangled, symbol)?;
195        }
196        output.flush()?;
197        Ok(())
198    }
199
200    pub fn append_dummy_record(&mut self, text: &mut Vec<u32>) {
201        text.push(u32::MAX);
202    }
203
    /// Translates `clue` into symbol text, appending the result to `text`.
    ///
    /// The clue's `file`, `line`, `level`, `timestamp`, and `value` fields are packed into one
    /// object value, which `translate_recursive` then encodes starting from the empty symbol
    /// path. Symbols not yet in the table are assigned along the way.
    pub fn translate(&mut self, clue: Clue, text: &mut Vec<u32>) {
        let value = indicio::value!({
            file: clue.file,
            line: clue.line,
            level: clue.level,
            timestamp: clue.timestamp,
            value: clue.value,
        });
        self.translate_recursive(&value, "", text);
    }
214
215    pub fn reverse_translate(&self, text: &[u32]) -> Option<Value> {
216        // TODO(rescrv): Log the rate of failure.
217        self.reverse_translate_recursive(text, "")
218    }
219
220    pub fn translate_query(&self, query: &Query) -> Vec<Vec<u32>> {
221        self.translate_query_recursive(query, "")
222    }
223
224    pub fn reverse_translate_query(&self, text: &[u32]) -> Option<Value> {
225        if text.is_empty() {
226            return None;
227        }
228        let symbol = &self.reverse_lookup(text[0])?;
229        let terminal = self.reverse_translate_recursive(text, &symbol[..symbol.len() - 1])?;
230        fn build_from_symbol(mut symbol: &str, terminal: Value) -> Option<Value> {
231            if symbol.is_empty() {
232                return Some(terminal);
233            }
234            match &symbol[0..1] {
235                "o" => {
236                    if symbol.len() == 1 {
237                        return Some(terminal);
238                    }
239                    if &symbol[1..2] != "k" {
240                        return None;
241                    }
242                    let len: String = symbol[2..]
243                        .chars()
244                        .take_while(|c| c.is_ascii_digit())
245                        .collect();
246                    symbol = &symbol[2 + len.len()..];
247                    let len = usize::from_str(&len).ok()?;
248                    let key = symbol[..len].to_string();
249                    let obj = build_from_symbol(&symbol[len..], terminal)?;
250                    Some(Value::Object(indicio::Map::from_iter(vec![(key, obj)])))
251                }
252                "a" => {
253                    let obj = build_from_symbol(&symbol[1..], terminal)?;
254                    Some(Value::Array(indicio::Values::from(vec![obj])))
255                }
256                _ => Some(terminal),
257            }
258        }
259        build_from_symbol(symbol, terminal)
260    }
261
262    pub fn iter(&self) -> impl Iterator<Item = (&str, u32)> {
263        self.symbols.iter().map(|(k, v)| (k.as_ref(), *v))
264    }
265
266    pub fn markers(&self) -> impl Iterator<Item = (u32, u32)> {
267        let mut markers = vec![];
268        for (sym, text) in self.symbols.iter() {
269            if sym.ends_with('t') || sym.ends_with('f') || sym.ends_with('n') || sym.ends_with('#')
270            {
271                markers.push((*text, *text));
272            } else {
273                markers.push((*text, *text + 1));
274            }
275        }
276        markers.into_iter()
277    }
278
279    fn translate_recursive(&mut self, value: &Value, symbol: &str, text: &mut Vec<u32>) {
280        match value {
281            Value::Bool(b) => {
282                let symbol = symbol.to_string() + if *b { "T" } else { "F" };
283                let sigma = self.lookup_symbol(&symbol);
284                text.push(sigma);
285            }
286            Value::I64(x) => {
287                let symbol = symbol.to_string() + "i";
288                let sigma = self.lookup_symbol(&symbol);
289                text.push(sigma);
290                for b in x.to_be_bytes() {
291                    text.push(b as u32);
292                }
293                text.push(sigma + 1);
294            }
295            Value::U64(x) => {
296                let symbol = symbol.to_string() + "u";
297                let sigma = self.lookup_symbol(&symbol);
298                text.push(sigma);
299                for b in x.to_be_bytes() {
300                    text.push(b as u32);
301                }
302                text.push(sigma + 1);
303            }
304            Value::F64(x) => {
305                let symbol = symbol.to_string() + "f";
306                let sigma = self.lookup_symbol(&symbol);
307                text.push(sigma);
308                for b in x.to_bits().to_be_bytes() {
309                    text.push(b as u32);
310                }
311                text.push(sigma + 1);
312            }
313            Value::String(s) => {
314                let symbol = symbol.to_string() + "s";
315                let sigma = self.lookup_symbol(&symbol);
316                text.push(sigma);
317                for c in s.chars() {
318                    text.push(c as u32);
319                }
320                text.push(sigma + 1);
321            }
322            Value::Array(a) => {
323                let symbol = symbol.to_string() + "a";
324                let sigma = self.lookup_symbol(&symbol);
325                text.push(sigma);
326                for e in a.iter() {
327                    self.translate_recursive(e, &symbol, text);
328                }
329                text.push(sigma + 1);
330            }
331            Value::Object(o) => {
332                let symbol = symbol.to_string() + "o";
333                let sigma = self.lookup_symbol(&symbol);
334                text.push(sigma);
335                for (k, v) in o.iter() {
336                    let len = k.chars().count();
337                    let symbol = format!("{}k{}{}", symbol, len, k);
338                    self.translate_recursive(v, &symbol, text);
339                }
340                text.push(sigma + 1);
341            }
342        };
343    }
344
345    fn translate_query_recursive(&self, query: &Query, symbol: &str) -> Vec<Vec<u32>> {
346        match query {
347            Query::Any => {
348                let symbol = symbol.to_string();
349                let mut result = vec![];
350                for c in &["o", "a", "T", "F", "i", "u", "f"] {
351                    if let Some(sigma) = self.symbols.get(&(symbol.clone() + c)).copied() {
352                        result.push(vec![sigma])
353                    }
354                }
355                result
356            }
357            Query::True => {
358                let symbol = symbol.to_string() + "t";
359                if let Some(sigma) = self.symbols.get(&symbol).copied() {
360                    vec![vec![sigma]]
361                } else {
362                    vec![]
363                }
364            }
365            Query::False => {
366                let symbol = symbol.to_string() + "f";
367                if let Some(sigma) = self.symbols.get(&symbol).copied() {
368                    vec![vec![sigma]]
369                } else {
370                    vec![]
371                }
372            }
373            Query::I64(x) => {
374                let isymbol = symbol.to_string() + "i";
375                let usymbol = symbol.to_string() + "u";
376                if let Some(sigma) = self.symbols.get(&isymbol) {
377                    let mut text = Vec::new();
378                    text.push(*sigma);
379                    for b in x.to_be_bytes() {
380                        text.push(b as u32);
381                    }
382                    text.push(*sigma + 1);
383                    vec![text]
384                } else if let Some(sigma) = self.symbols.get(&usymbol) {
385                    let mut text = Vec::new();
386                    text.push(*sigma);
387                    for b in x.to_be_bytes() {
388                        text.push(b as u32);
389                    }
390                    text.push(*sigma + 1);
391                    vec![text]
392                } else {
393                    vec![]
394                }
395            }
396            Query::U64(x) => {
397                let isymbol = symbol.to_string() + "i";
398                let usymbol = symbol.to_string() + "u";
399                if let Some(sigma) = self.symbols.get(&usymbol) {
400                    let mut text = Vec::new();
401                    text.push(*sigma);
402                    for b in x.to_be_bytes() {
403                        text.push(b as u32);
404                    }
405                    text.push(*sigma + 1);
406                    vec![text]
407                } else if let Some(sigma) = self.symbols.get(&isymbol) {
408                    let mut text = Vec::new();
409                    text.push(*sigma);
410                    for b in x.to_be_bytes() {
411                        text.push(b as u32);
412                    }
413                    text.push(*sigma + 1);
414                    vec![text]
415                } else {
416                    vec![]
417                }
418            }
419            Query::F64(x) => {
420                let symbol = symbol.to_string() + "f";
421                if let Some(sigma) = self.symbols.get(&symbol) {
422                    let mut text = Vec::new();
423                    text.push(*sigma);
424                    for b in x.to_bits().to_be_bytes() {
425                        text.push(b as u32);
426                    }
427                    text.push(*sigma + 1);
428                    vec![text]
429                } else {
430                    vec![]
431                }
432            }
433            Query::String(s) => {
434                let symbol = symbol.to_string() + "s";
435                if let Some(sigma) = self.symbols.get(&symbol).copied() {
436                    let mut text = vec![sigma];
437                    for c in s.chars() {
438                        text.push(c as u32);
439                    }
440                    text.push(sigma + 1);
441                    vec![text]
442                } else {
443                    vec![]
444                }
445            }
446            Query::Array(a) => {
447                assert!(a.len() <= 1);
448                let symbol = symbol.to_string() + "a";
449                if a.len() == 1 {
450                    self.translate_query_recursive(&a[0], &symbol)
451                } else if let Some(sigma) = self.symbols.get(&symbol).copied() {
452                    vec![vec![sigma]]
453                } else {
454                    vec![]
455                }
456            }
457            Query::Object(o) => {
458                assert!(o.len() <= 1);
459                if o.len() == 1 {
460                    let (k, q) = &o[0];
461                    let len = k.chars().count();
462                    let symbol = format!("{}ok{}{}", symbol, len, k);
463                    self.translate_query_recursive(q, &symbol)
464                } else if o.is_empty() {
465                    let symbol = symbol.to_string() + "o";
466                    if let Some(sigma) = self.symbols.get(&symbol).copied() {
467                        vec![vec![sigma]]
468                    } else {
469                        vec![]
470                    }
471                } else {
472                    unreachable!();
473                }
474            }
475            Query::Or(_) => {
476                panic!("do not translate disjunctions");
477            }
478        }
479    }
480
481    fn reverse_translate_keys(&self, mut text: &[u32], path: &str) -> Option<Value> {
482        let mut map = indicio::Map::default();
483        while !text.is_empty() {
484            let symbol = self.reverse_lookup(text[0])?;
485            let relative = symbol.strip_prefix(path)?;
486            if !relative.starts_with('k') {
487                return None;
488            }
489            let key_len: String = relative[1..]
490                .chars()
491                .take_while(char::is_ascii_digit)
492                .collect();
493            let key = &relative[1 + key_len.len()..];
494            let Ok(key_len) = usize::from_str(&key_len) else {
495                return None;
496            };
497            let key = &key[..key.len() - 1];
498            if key_len != key.len() {
499                return None;
500            }
501            if symbol.ends_with('T') || symbol.ends_with('F') {
502                let value =
503                    self.reverse_translate_recursive(&text[..1], &symbol[..symbol.len() - 1])?;
504                text = &text[1..];
505                map.insert(key.to_string(), value);
506            } else {
507                let position = text.iter().position(|t| *t == text[0] + 1)?;
508                let value = self.reverse_translate_recursive(
509                    &text[..position + 1],
510                    &symbol[..symbol.len() - 1],
511                )?;
512                text = &text[position + 1..];
513                map.insert(key.to_string(), value);
514            }
515        }
516        Some(Value::Object(map))
517    }
518
519    fn reverse_translate_array(&self, mut text: &[u32], path: &str) -> Option<Value> {
520        let mut values = vec![];
521        while !text.is_empty() {
522            let symbol = self.reverse_lookup(text[0])?;
523            if !symbol.starts_with(path) {
524                return None;
525            }
526            if symbol.ends_with('T') || symbol.ends_with('F') {
527                let value =
528                    self.reverse_translate_recursive(&text[..1], &symbol[..symbol.len() - 1])?;
529                text = &text[1..];
530                values.push(value);
531            } else {
532                let position = text.iter().position(|t| *t == text[0] + 1)?;
533                let value = self.reverse_translate_recursive(
534                    &text[..position + 1],
535                    &symbol[..symbol.len() - 1],
536                )?;
537                text = &text[position + 1..];
538                values.push(value);
539            }
540        }
541        Some(Value::from(values))
542    }
543
544    fn reverse_translate_recursive(&self, text: &[u32], path: &str) -> Option<Value> {
545        if text.is_empty() {
546            return None;
547        }
548        let symbol = self.reverse_lookup(text[0])?;
549        let relative = symbol.strip_prefix(path)?;
550        match relative {
551            "o" => self.reverse_translate_keys(&text[1..text.len() - 1], &(path.to_string() + "o")),
552            "a" => {
553                self.reverse_translate_array(&text[1..text.len() - 1], &(path.to_string() + "a"))
554            }
555            "s" => Some(Value::String(
556                text.iter().copied().flat_map(char::from_u32).collect(),
557            )),
558            "i" => {
559                if text.len() != 10 {
560                    return None;
561                }
562                let mut buf = [0u8; 8];
563                for (b, t) in std::iter::zip(buf.iter_mut(), text[1..9].iter()) {
564                    *b = *t as u8;
565                }
566                Some(Value::I64(i64::from_be_bytes(buf)))
567            }
568            "u" => {
569                if text.len() != 10 {
570                    return None;
571                }
572                let mut buf = [0u8; 8];
573                for (b, t) in std::iter::zip(buf.iter_mut(), text[1..9].iter()) {
574                    *b = *t as u8;
575                }
576                Some(Value::U64(u64::from_be_bytes(buf)))
577            }
578            "f" => {
579                if text.len() != 10 {
580                    return None;
581                }
582                let mut buf = [0u8; 8];
583                for (b, t) in std::iter::zip(buf.iter_mut(), text[1..9].iter()) {
584                    *b = *t as u8;
585                }
586                Some(Value::F64(f64::from_bits(u64::from_be_bytes(buf))))
587            }
588            "T" => Some(Value::Bool(true)),
589            "F" => Some(Value::Bool(false)),
590            _ => None,
591        }
592    }
593
594    fn lookup_symbol(&mut self, symbol: &str) -> u32 {
595        match self.symbols.entry(symbol.to_string()) {
596            Entry::Occupied(entry) => *entry.get(),
597            Entry::Vacant(entry) => {
598                let sym = self.next_symbol;
599                self.next_symbol += 2;
600                entry.insert(sym);
601                sym
602            }
603        }
604    }
605
606    fn reverse_lookup(&self, sym: u32) -> Option<&str> {
607        for (s, t) in self.symbols.iter() {
608            if *t == sym {
609                return Some(s);
610            }
611        }
612        None
613    }
614}
615
616impl Default for SymbolTable {
617    fn default() -> Self {
618        Self {
619            symbols: HashMap::new(),
620            next_symbol: 0x110000,
621        }
622    }
623}
624
625//////////////////////////////////// convert_clues_to_analogize ////////////////////////////////////
626
627#[allow(clippy::type_complexity)]
628fn group_by_second(
629    mut watermark: DateTime<Utc>,
630    clues: Vec<Clue>,
631) -> Result<Vec<(DateTime<Utc>, Vec<Clue>)>, Error> {
632    if clues.is_empty() {
633        return Err(Error::EmptyClueFile {
634            core: ErrorCore::default(),
635        });
636    }
637    let one_second = TimeDelta::try_seconds(1).expect("one second should always construct");
638    watermark = watermark.duration_trunc(one_second).unwrap();
639    let mut results = vec![];
640    for clue in clues {
641        let Some(ts) = DateTime::from_timestamp_millis(clue.timestamp as i64 / 1_000) else {
642            return Err(Error::InvalidTimestamp {
643                core: ErrorCore::default(),
644                what: clue.timestamp as i64,
645            });
646        };
647        while watermark <= ts {
648            results.push((watermark, vec![]));
649            watermark += one_second;
650        }
651        let len = results.len() - 1;
652        results[len].1.push(clue);
653    }
654    Ok(results)
655}
656
657#[allow(clippy::type_complexity)]
658fn convert_clues_to_analogize_inner(
659    sym_table: &mut SymbolTable,
660    start_time: DateTime<Utc>,
661    clues: Vec<Clue>,
662) -> Result<(Vec<u32>, Vec<usize>, Vec<usize>), Error> {
663    let mut text = vec![];
664    let mut record_boundaries = vec![];
665    let mut second_boundaries = vec![];
666    if clues.is_empty() {
667        second_boundaries.push(record_boundaries.len());
668        record_boundaries.push(text.len());
669        sym_table.append_dummy_record(&mut text);
670    }
671    if clues.is_empty() {
672        return Err(Error::EmptyClueFile {
673            core: ErrorCore::default(),
674        });
675    }
676    for (_, clues) in group_by_second(start_time, clues)? {
677        if clues.is_empty() {
678            second_boundaries.push(record_boundaries.len());
679            record_boundaries.push(text.len());
680            sym_table.append_dummy_record(&mut text);
681        } else {
682            for clue in clues {
683                record_boundaries.push(text.len());
684                sym_table.translate(clue, &mut text);
685            }
686            second_boundaries.push(record_boundaries.len() - 1);
687        }
688    }
689    second_boundaries.push(record_boundaries.len());
690    Ok((text, record_boundaries, second_boundaries))
691}
692
693pub fn convert_clues_to_analogize<P: AsRef<Path>>(
694    sym_table: &mut SymbolTable,
695    start_time: DateTime<Utc>,
696    clues: Vec<Clue>,
697    analogize: P,
698) -> Result<(), Error> {
699    let (text, record_boundaries, second_boundaries) =
700        convert_clues_to_analogize_inner(sym_table, start_time, clues)?;
701    let mut buf = Vec::new();
702    let mut builder = Builder::new(&mut buf);
703    let mut sub = builder.sub(FieldNumber::must(1));
704    CompressedDocument::construct(text, record_boundaries, &mut sub)?;
705    drop(sub);
706    let mut sub = builder.sub(FieldNumber::must(2));
707    BitVector::from_indices(
708        16,
709        second_boundaries[second_boundaries.len() - 1] + 1,
710        &second_boundaries,
711        &mut sub,
712    )
713    .ok_or(scrunch::Error::InvalidBitVector)?;
714    drop(sub);
715    drop(builder);
716    std::fs::write(analogize.as_ref(), buf)?;
717    Ok(())
718}
719
720///////////////////////////////////////// AnalogizeDocument ////////////////////////////////////////
721
/// Wire-level view of an analogize file: the two byte fields, still unparsed.
#[derive(Clone, Debug, Default, prototk_derive::Message)]
struct AnalogizeDocumentStub<'a> {
    /// Field 1: bytes of the compressed document.
    #[prototk(1, bytes)]
    document: &'a [u8],
    /// Field 2: bytes of the per-second boundary bit vector.
    #[prototk(2, bytes)]
    timeline: &'a [u8],
}
729
/// A parsed analogize file borrowing from the buffer it was unpacked from.
struct AnalogizeDocument<'a> {
    /// The searchable compressed document.
    document: CompressedDocument<'a>,
    /// The per-second boundary bit vector; parsed but not read by the visible code.
    #[allow(dead_code)]
    timeline: BitVector<'a>,
}
735
736impl AnalogizeDocument<'_> {
737    fn query(&self, syms: &SymbolTable, query: &Query) -> Result<HashSet<RecordOffset>, Error> {
738        let records = if let Query::Or(subqueries) = query {
739            let mut records = HashSet::new();
740            for query in subqueries {
741                for offset in self.query(syms, query)? {
742                    records.insert(offset);
743                }
744            }
745            records
746        } else {
747            let mut results = HashSet::new();
748            let mut needles = vec![];
749            for conjunction in query.clone().conjunctions() {
750                needles.append(&mut syms.translate_query(&conjunction));
751            }
752            let mut needles = needles.into_iter();
753            if let Some(needle) = needles.next() {
754                for offset in self.document.search(&needle)? {
755                    results.insert(self.document.lookup(offset)?);
756                }
757            }
758            for needle in needles {
759                let inner = std::mem::take(&mut results);
760                for offset in self.document.search(&needle)? {
761                    let offset = self.document.lookup(offset)?;
762                    if inner.contains(&offset) {
763                        results.insert(offset);
764                    }
765                }
766            }
767            results
768        };
769        Ok(records)
770    }
771}
772
773impl<'a> Unpackable<'a> for AnalogizeDocument<'a> {
774    type Error = Error;
775
776    fn unpack<'b: 'a>(buf: &'b [u8]) -> Result<(Self, &'b [u8]), Self::Error> {
777        let (stub, buf) = AnalogizeDocumentStub::unpack(buf).map_err(|_| Error::Scrunch {
778            core: ErrorCore::default(),
779            what: scrunch::Error::InvalidDocument,
780        })?;
781        let document = CompressedDocument::unpack(stub.document)?.0;
782        let timeline = BitVector::parse(stub.timeline)?.0;
783        Ok((AnalogizeDocument { document, timeline }, buf))
784    }
785}
786
787/////////////////////////////////////////////// Query //////////////////////////////////////////////
788
/// A structural query over translated clues.
#[derive(Clone, Debug, Default)]
pub enum Query {
    /// Wildcard; the default. `SymbolTable::translate_query_recursive` expands it to one needle
    /// per type tag it checks.
    #[default]
    Any,
    /// The boolean `true`.
    True,
    /// The boolean `false`.
    False,
    /// A signed integer value.
    I64(i64),
    /// An unsigned integer value.
    U64(u64),
    /// A floating-point value.
    F64(f64),
    /// A string value.
    String(String),
    /// An array whose elements are constrained by the subqueries; empty means "an array exists
    /// here".
    Array(Vec<Query>),
    /// An object constrained by `(key, subquery)` pairs; empty means "an object exists here".
    Object(Vec<(String, Query)>),
    /// A disjunction of subqueries.
    Or(Vec<Query>),
}
803
804impl Query {
805    pub fn parse<S: AsRef<str>>(query: S) -> Result<Self, Error> {
806        Ok(parser::parse_all(parser::query)(query.as_ref())?)
807    }
808
809    pub fn must<S: AsRef<str>>(query: S) -> Self {
810        Query::parse(query).expect("query should parse")
811    }
812
813    pub fn normalize(self) -> Query {
814        match self {
815            Query::Any
816            | Query::True
817            | Query::False
818            | Query::I64(_)
819            | Query::U64(_)
820            | Query::F64(_)
821            | Query::String(_) => self,
822            Query::Array(subqueries) => Self::normalize_array(subqueries),
823            Query::Object(subqueries) => Self::normalize_object(subqueries),
824            Query::Or(subqueries) => Self::normalize_or(subqueries),
825        }
826    }
827
828    fn conjunctions(self) -> impl Iterator<Item = Query> {
829        let answer: Box<dyn Iterator<Item = Query>> = match self {
830            Query::Any
831            | Query::True
832            | Query::False
833            | Query::I64(_)
834            | Query::U64(_)
835            | Query::F64(_)
836            | Query::String(_) => Box::new(vec![self].into_iter()),
837            Query::Array(subqueries) => {
838                if subqueries.is_empty() {
839                    Box::new(vec![Query::Array(vec![])].into_iter())
840                } else {
841                    Box::new(
842                        subqueries
843                            .into_iter()
844                            .flat_map(|q| q.conjunctions())
845                            .map(|q| Query::Array(vec![q])),
846                    )
847                }
848            }
849            Query::Object(subqueries) => {
850                if subqueries.is_empty() {
851                    Box::new(vec![Query::Object(vec![])].into_iter())
852                } else {
853                    let mut results = vec![];
854                    for (s, q) in subqueries.into_iter() {
855                        for q in q.conjunctions() {
856                            results.push(Query::Object(vec![(s.clone(), q)]));
857                        }
858                    }
859                    Box::new(results.into_iter())
860                }
861            }
862            Query::Or(_) => {
863                // TODO(rescrv): Do better here.
864                panic!("calling conjunctions on Or clause");
865            }
866        };
867        answer
868    }
869
870    fn normalize_mut(query: &mut Query) {
871        let q = std::mem::take(query);
872        *query = Self::normalize(q);
873    }
874
875    fn normalize_array(subqueries: Vec<Query>) -> Query {
876        if subqueries.is_empty() {
877            return Query::Array(subqueries);
878        }
879        let mut disjunctions: Vec<Vec<Query>> = vec![vec![]];
880        for subquery in subqueries.into_iter() {
881            let subquery = Self::normalize(subquery);
882            if let Query::Or(subqueries) = subquery {
883                let inner = std::mem::take(&mut disjunctions);
884                for subquery in subqueries.into_iter() {
885                    for inner in inner.iter() {
886                        let mut inner = inner.clone();
887                        inner.push(subquery.clone());
888                        disjunctions.push(inner);
889                    }
890                }
891            } else {
892                for disjunction in disjunctions.iter_mut() {
893                    disjunction.push(subquery.clone());
894                }
895            }
896        }
897        if disjunctions.len() > 1 {
898            Query::Or(disjunctions.into_iter().map(Query::Array).collect())
899        } else if let Some(subqueries) = disjunctions.pop() {
900            Query::Array(subqueries)
901        } else {
902            unreachable!();
903        }
904    }
905
906    fn normalize_object(subqueries: Vec<(String, Query)>) -> Query {
907        if subqueries.is_empty() {
908            return Query::Object(subqueries);
909        }
910        let mut disjunctions: Vec<Vec<(String, Query)>> = vec![vec![]];
911        for (key, subquery) in subqueries.into_iter() {
912            let subquery = Self::normalize(subquery);
913            if let Query::Or(subqueries) = subquery {
914                let inner = std::mem::take(&mut disjunctions);
915                for subquery in subqueries.into_iter() {
916                    for inner in inner.iter() {
917                        let mut inner = inner.clone();
918                        inner.push((key.clone(), subquery.clone()));
919                        disjunctions.push(inner);
920                    }
921                }
922            } else {
923                for disjunction in disjunctions.iter_mut() {
924                    disjunction.push((key.clone(), subquery.clone()));
925                }
926            }
927        }
928        if disjunctions.len() > 1 {
929            Query::Or(disjunctions.into_iter().map(Query::Object).collect())
930        } else if let Some(subqueries) = disjunctions.pop() {
931            Query::Object(subqueries)
932        } else {
933            unreachable!();
934        }
935    }
936
937    fn normalize_or(mut subqueries: Vec<Query>) -> Query {
938        subqueries.iter_mut().for_each(Query::normalize_mut);
939        subqueries.sort_by_key(|x| if let Query::Or(_) = *x { 1 } else { 0 });
940        let partition = subqueries.partition_point(|x| !matches!(x, Query::Or(_)));
941        let disjunctions = subqueries.split_off(partition);
942        for disjunction in disjunctions.into_iter() {
943            if let Query::Or(mut subq) = disjunction {
944                subqueries.append(&mut subq);
945            } else {
946                unreachable!();
947            }
948        }
949        Query::Or(subqueries)
950    }
951}
952
// `Eq` is sound because the hand-written `PartialEq` for `Query` compares `F64` payloads with
// `total_cmp`, so every value (NaN included) compares equal to itself.
impl Eq for Query {}
954
955impl PartialEq for Query {
956    fn eq(&self, query: &Query) -> bool {
957        match (self, query) {
958            (Query::Any, Query::Any) => true,
959            (Query::True, Query::True) => true,
960            (Query::False, Query::False) => true,
961            (Query::I64(x), Query::I64(y)) => x == y,
962            (Query::U64(x), Query::U64(y)) => x == y,
963            (Query::F64(x), Query::F64(y)) => x.total_cmp(y).is_eq(),
964            (Query::String(x), Query::String(y)) => x == y,
965            (Query::Array(x), Query::Array(y)) => x == y,
966            (Query::Object(x), Query::Object(y)) => x == y,
967            (Query::Or(x), Query::Or(y)) => x == y,
968            _ => false,
969        }
970    }
971}
972
973////////////////////////////////////////// DocumentMapping /////////////////////////////////////////
974
/// A memory mapping of one document file.
///
/// The pair (`data`, `size`) is what `doc` exposes as a byte slice and what `Drop` hands to
/// `libc::munmap`.
struct DocumentMapping {
    // Base address returned by the mmap call in `State::get_document`.
    data: *mut c_void,
    // Length of the mapping in bytes (the file length at the time it was mapped).
    size: usize,
}
979
980impl DocumentMapping {
981    fn doc(&self) -> Result<AnalogizeDocument, Error> {
982        // SAFETY(rescrv):  We only ever refer to this region of memory as a slice of u8.
983        let buf = unsafe { std::slice::from_raw_parts(self.data as *const u8, self.size) };
984        Ok(AnalogizeDocument::unpack(buf)?.0)
985    }
986}
987
988impl Drop for DocumentMapping {
989    fn drop(&mut self) {
990        // SAFETY(rescrv): It will always be a valid mapping.
991        unsafe {
992            libc::munmap(self.data, self.size);
993        }
994    }
995}
996
// NOTE(review): the raw pointer makes `DocumentMapping` neither `Send` nor `Sync` by default.
// These impls are presumably justified because the region is mapped `PROT_READ` and is only
// ever exposed as an immutable `&[u8]` -- confirm nothing mutates it through another path.
unsafe impl Send for DocumentMapping {}
unsafe impl Sync for DocumentMapping {}
999
1000/////////////////////////////////////////////// State //////////////////////////////////////////////
1001
/// State shared between an `Analogize` handle and its background thread.
struct State {
    // Configuration supplied at construction; `threads` sizes the worker pool in `background`.
    options: AnalogizeOptions,
    // Shutdown flag: set by `done`, polled by the `background` loop.
    done: AtomicBool,
    // Directory scanned for new log files and read from during conversion.
    logs: PathBuf,
    // Directory holding the `symbols` file and the generated `.analogize` documents.
    data: PathBuf,
    // Manifest tracking pending `log:` entries, sealed `data:` entries, and the `L`/`F` infos.
    mani: Mutex<Manifest>,
    // Symbol table used when converting clues and when translating results back to values.
    syms: Mutex<SymbolTable>,
    // Cache of memory mappings, keyed by document file name.
    docs: Mutex<HashMap<String, Arc<DocumentMapping>>>,
}
1011
1012impl State {
1013    fn new(
1014        options: AnalogizeOptions,
1015        logs: PathBuf,
1016        data: PathBuf,
1017        mani: Manifest,
1018    ) -> Result<Self, Error> {
1019        let done = AtomicBool::new(false);
1020        let syms = if data.join("symbols").exists() {
1021            SymbolTable::from_file(data.join("symbols"))?
1022        } else {
1023            SymbolTable::default()
1024        };
1025        let mani = Mutex::new(mani);
1026        let syms = Mutex::new(syms);
1027        let docs = Mutex::new(HashMap::new());
1028        Ok(Self {
1029            options,
1030            done,
1031            logs,
1032            data,
1033            mani,
1034            syms,
1035            docs,
1036        })
1037    }
1038
1039    fn background(self: Arc<Self>) {
1040        let mut workers = Vec::with_capacity(self.options.threads);
1041        for _ in 0..self.options.threads {
1042            let this = Arc::clone(&self);
1043            workers.push(std::thread::spawn(move || this.worker()));
1044        }
1045        while !self.done.load(Ordering::Relaxed) {
1046            sleep(Duration::from_millis(100));
1047            // TODO(rescrv): Log errors from ingest in a shell-compatible way.
1048            self.try_ingest().expect("try ingest should never fail");
1049        }
1050        for worker in workers.into_iter() {
1051            let _ = worker.join();
1052        }
1053    }
1054
    /// Entry point of each thread spawned by `background`.  The body is currently empty, so
    /// every worker returns immediately.
    fn worker(self: Arc<Self>) {}
1056
1057    fn get_documents(&self) -> Result<Vec<Arc<DocumentMapping>>, Error> {
1058        let mani = self.mani.lock().unwrap();
1059        fn select_data(s: &str) -> Option<String> {
1060            s.strip_prefix("data:").map(String::from)
1061        }
1062        let docs: Vec<_> = mani.strs().filter_map(select_data).collect();
1063        let mut mappings = Vec::with_capacity(docs.len());
1064        for doc in docs {
1065            mappings.push(self.get_document(&doc)?);
1066        }
1067        Ok(mappings)
1068    }
1069
1070    fn get_document(&self, doc: &str) -> Result<Arc<DocumentMapping>, Error> {
1071        let mut docs = self.docs.lock().unwrap();
1072        if let Some(doc) = docs.get(doc) {
1073            return Ok(Arc::clone(doc));
1074        }
1075        let path = self.data.join(doc);
1076        let file = File::open(path)?;
1077        let md = file.metadata()?;
1078        if md.len() > usize::MAX as u64 {
1079            return Err(Error::FileTooLarge {
1080                core: ErrorCore::default(),
1081            });
1082        }
1083        #[cfg(not(target_os = "macos"))]
1084        unsafe fn mmap(len: usize, file: RawFd) -> *mut c_void {
1085            libc::mmap64(
1086                std::ptr::null_mut(),
1087                len,
1088                libc::PROT_READ,
1089                libc::MAP_SHARED,
1090                file,
1091                0,
1092            )
1093        }
1094        #[cfg(target_os = "macos")]
1095        unsafe fn mmap(len: usize, file: RawFd) -> *mut c_void {
1096            libc::mmap(
1097                std::ptr::null_mut(),
1098                len,
1099                libc::PROT_READ,
1100                libc::MAP_SHARED,
1101                file,
1102                0,
1103            )
1104        }
1105        // SAFETY(rescrv):  We treat this mapping with respect and only unmap if it's valid.
1106        let mapping = unsafe { mmap(md.len() as usize, file.as_raw_fd()) };
1107        if mapping == libc::MAP_FAILED {
1108            return Err(std::io::Error::last_os_error().into());
1109        }
1110        let mapping = Arc::new(DocumentMapping {
1111            data: mapping,
1112            size: md.len() as usize,
1113        });
1114        docs.insert(doc.to_string(), Arc::clone(&mapping));
1115        Ok(mapping)
1116    }
1117
1118    fn try_ingest(&self) -> Result<(), Error> {
1119        self.log_to_ingest()?;
1120        self.convert_ingested_logs()?;
1121        Ok(())
1122    }
1123
1124    fn log_to_ingest(&self) -> Result<(), Error> {
1125        let mut mani = self.mani.lock().unwrap();
1126        let threshold_ns = mani.info('L').unwrap_or("0");
1127        let threshold_ns =
1128            i64::from_str(threshold_ns).map_err(|_| Error::InvalidNumberLiteral {
1129                core: ErrorCore::default(),
1130                as_str: threshold_ns.to_string(),
1131            })?;
1132        let (logs_to_ingest, threshold_ns) = take_consistent_cut(&self.logs, threshold_ns)?;
1133        if logs_to_ingest.is_empty() {
1134            return Ok(());
1135        }
1136        let mut edit = Edit::default();
1137        edit.info('L', &format!("{}", threshold_ns))?;
1138        for log in logs_to_ingest.iter() {
1139            edit.add(&format!("log:{}", log))?;
1140        }
1141        mani.apply(edit)?;
1142        Ok(())
1143    }
1144
1145    fn convert_ingested_logs(&self) -> Result<(), Error> {
1146        // First, read the manifest to figure out what JSON needs to be ingested.
1147        let (log_inputs, file_number): (Vec<String>, String) = {
1148            let mani = self.mani.lock().unwrap();
1149            fn select_logs(s: &str) -> Option<String> {
1150                s.strip_prefix("log:").map(String::from)
1151            }
1152            (
1153                mani.strs().filter_map(select_logs).collect(),
1154                mani.info('F').unwrap_or("0").to_string(),
1155            )
1156        };
1157        if log_inputs.is_empty() {
1158            return Ok(());
1159        }
1160        let file_number = u64::from_str(&file_number).map_err(|_| Error::InvalidNumberLiteral {
1161            core: ErrorCore::default(),
1162            as_str: file_number,
1163        })?;
1164        // Second, build the analogize file for all the files at once.
1165        let mut clues = vec![];
1166        for input in log_inputs.iter() {
1167            let buf = std::fs::read(self.logs.join(input))?;
1168            let mut cv = ClueVector::unpack(&buf)?.0;
1169            clues.append(&mut cv.clues);
1170        }
1171        let mut edit = Edit::default();
1172        edit.info('F', &(file_number + 1).to_string())?;
1173        if !clues.is_empty() {
1174            let mut syms = self.syms.lock().unwrap();
1175            let start_time = clues.iter().map(|x| x.timestamp).min().unwrap_or(0);
1176            let end_time = clues.iter().map(|x| x.timestamp).max().unwrap_or(0);
1177            let start_time = DateTime::from_timestamp_millis(start_time as i64 / 1_000).ok_or(
1178                Error::InvalidTimestamp {
1179                    core: ErrorCore::default(),
1180                    what: start_time as i64,
1181                },
1182            )?;
1183            let end_time = DateTime::from_timestamp_millis(end_time as i64 / 1_000).ok_or(
1184                Error::InvalidTimestamp {
1185                    core: ErrorCore::default(),
1186                    what: end_time as i64,
1187                },
1188            )?;
1189            let output_path = format!(
1190                "{}_{}_{}.analogize",
1191                date_time_to_string(start_time),
1192                date_time_to_string(end_time),
1193                file_number
1194            );
1195            convert_clues_to_analogize(&mut syms, start_time, clues, self.data.join(&output_path))?;
1196            let syms_tmp = format!("symbols.{}", Utc::now().timestamp());
1197            let syms_tmp = self.data.join(syms_tmp);
1198            syms.to_file(&syms_tmp)?;
1199            rename(syms_tmp, self.data.join("symbols"))?;
1200            edit.add(&format!("data:{}", output_path))?;
1201        }
1202        for input in log_inputs.iter() {
1203            edit.rm(&format!("log:{}", input))?;
1204        }
1205        let mut mani = self.mani.lock().unwrap();
1206        mani.apply(edit)?;
1207        Ok(())
1208    }
1209
    /// Set the shutdown flag that the `background` loop polls.
    fn done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
1213}
1214
1215///////////////////////////////////////// AnalogizeOptions /////////////////////////////////////////
1216
/// Configuration for `Analogize`.
///
/// The `Default` implementation uses the relative directories `logs` and `data`, default
/// manifest options, and 8 threads.
#[derive(Clone, Debug, Eq, PartialEq, arrrg_derive::CommandLine)]
pub struct AnalogizeOptions {
    #[arrrg(required, "Path to indicio log files.")]
    logs: String,
    #[arrrg(required, "Path to analogize data files.")]
    data: String,
    #[arrrg(nested)]
    mani: ManifestOptions,
    #[arrrg(optional, "Number of background worker threads to spawn.")]
    threads: usize,
}
1228
1229impl AnalogizeOptions {
1230    pub fn data(&self) -> &str {
1231        &self.data
1232    }
1233}
1234
1235impl Default for AnalogizeOptions {
1236    fn default() -> Self {
1237        Self {
1238            logs: "logs".to_string(),
1239            data: "data".to_string(),
1240            mani: ManifestOptions::default(),
1241            threads: 8,
1242        }
1243    }
1244}
1245
1246///////////////////////////////////////////// Analogize ////////////////////////////////////////////
1247
/// Handle to an analogize data directory.
///
/// Constructing one spawns a background thread; dropping it signals that thread to stop and
/// joins it.
pub struct Analogize {
    // State shared with the background thread.
    state: Arc<State>,
    // Join handle of the background thread; taken (leaving `None`) in `Drop`.
    thread: Option<JoinHandle<()>>,
}
1252
1253impl Analogize {
1254    pub fn new(options: AnalogizeOptions) -> Result<Self, Error> {
1255        let logs = PathBuf::from(&options.logs);
1256        if !logs.exists() || !logs.is_dir() {
1257            return Err(Error::DirectoryNotFound {
1258                core: ErrorCore::default(),
1259                what: options.logs,
1260            });
1261        }
1262        let data = PathBuf::from(&options.data);
1263        let mani = Manifest::open(options.mani.clone(), data.clone())?;
1264        let state = Arc::new(State::new(options, logs, data, mani)?);
1265        let state_p = Arc::clone(&state);
1266        let thread = Some(std::thread::spawn(move || state_p.background()));
1267        let this = Self { state, thread };
1268        Ok(this)
1269    }
1270
1271    pub fn query(&self, query: Query) -> Result<Vec<Value>, Error> {
1272        let query = query.normalize();
1273        let docs = self.state.get_documents()?;
1274        let mut values = vec![];
1275        for doc in docs {
1276            let doc = doc.doc()?;
1277            let syms = self.state.syms.lock().unwrap();
1278            let mut records: Vec<RecordOffset> = doc.query(&syms, &query)?.into_iter().collect();
1279            records.sort();
1280            for record in records.into_iter() {
1281                let Ok(record) = doc.document.retrieve(record) else {
1282                    // TODO(rescrv): report error
1283                    continue;
1284                };
1285                let Some(value) = syms.reverse_translate(&record) else {
1286                    // TODO(rescrv): report error
1287                    continue;
1288                };
1289                values.push(value);
1290            }
1291        }
1292        Ok(values)
1293    }
1294
1295    pub fn exemplars(&self, num_results: usize) -> Result<Vec<Value>, Error> {
1296        let doc_ptrs = self.state.get_documents()?;
1297        let mut docs = vec![];
1298        for ptr in doc_ptrs.iter() {
1299            docs.push(ptr.doc()?);
1300        }
1301        let doc_refs: Vec<&CompressedDocument> = docs.iter().map(|d| &d.document).collect();
1302        let syms = self.state.syms.lock().unwrap();
1303        let markers: Vec<_> = syms.markers().collect();
1304        let mut values = vec![];
1305        for exemplar in scrunch::exemplars(&doc_refs, &markers).take(num_results) {
1306            if let Some(exemplar) = syms.reverse_translate_query(exemplar.text()) {
1307                values.push(exemplar);
1308            } else {
1309                // TODO(rescrv): report error
1310            }
1311        }
1312        Ok(values)
1313    }
1314
1315    pub fn correlates(&self, query: Query, num_results: usize) -> Result<Vec<Value>, Error> {
1316        let doc_ptrs = self.state.get_documents()?;
1317        let mut docs = vec![];
1318        for ptr in doc_ptrs.iter() {
1319            docs.push(ptr.doc()?);
1320        }
1321        let doc_refs: Vec<&CompressedDocument> = docs.iter().map(|d| &d.document).collect();
1322        let syms = self.state.syms.lock().unwrap();
1323        let markers: Vec<_> = syms.markers().collect();
1324        let mut offsets: HashMap<usize, HashSet<RecordOffset>> = HashMap::new();
1325        for (idx, doc) in docs.iter().enumerate() {
1326            let records = doc.query(&syms, &query)?;
1327            offsets.insert(idx, records);
1328        }
1329        let mut values = vec![];
1330        for exemplar in scrunch::correlate(&doc_refs, &markers, move |idx, offset| {
1331            offsets
1332                .get(&idx)
1333                .map(|r| r.get(&offset))
1334                .map(|o| o.is_some())
1335                .unwrap_or(false)
1336        })
1337        .take(num_results)
1338        {
1339            if let Some(exemplar) = syms.reverse_translate_query(exemplar.text()) {
1340                values.push(exemplar);
1341            } else {
1342                // TODO(rescrv): report error
1343            }
1344        }
1345        Ok(values)
1346    }
1347}
1348
1349impl Drop for Analogize {
1350    fn drop(&mut self) {
1351        self.state.done();
1352        self.thread.take().map(|t| t.join());
1353    }
1354}
1355
1356/////////////////////////////////////////////// utils //////////////////////////////////////////////
1357
/// Combine the seconds and nanoseconds of a file's ctime into a single nanosecond count.
///
/// Uses wrapping arithmetic, so an out-of-range value wraps instead of panicking.
fn ctime(md: &Metadata) -> i64 {
    const NANOS_PER_SEC: i64 = 1_000_000_000;
    let whole_seconds_ns = md.ctime().wrapping_mul(NANOS_PER_SEC);
    whole_seconds_ns.wrapping_add(md.ctime_nsec())
}
1363
1364fn take_consistent_cut<P: AsRef<Path>>(
1365    dir: P,
1366    threshold_ns: i64,
1367) -> Result<(Vec<String>, i64), Error> {
1368    loop {
1369        let mut new_threshold_ns = threshold_ns;
1370        let md1 = metadata(dir.as_ref())?;
1371        let mut paths = vec![];
1372        for dirent in read_dir(dir.as_ref())? {
1373            let dirent = dirent?;
1374            let md = dirent.metadata()?;
1375            let ts_ns = ctime(&md);
1376            if ts_ns > threshold_ns {
1377                let mut path = dirent.path();
1378                if path.starts_with(dir.as_ref()) {
1379                    path = path
1380                        .strip_prefix(dir.as_ref())
1381                        .map_err(|_| Error::InvalidPath {
1382                            core: ErrorCore::default(),
1383                            what: format!("path {} prefix won't strip", path.to_string_lossy()),
1384                        })?
1385                        .to_path_buf();
1386                }
1387                let display = path.to_string_lossy().to_string();
1388                if PathBuf::from(&display) != path {
1389                    return Err(Error::InvalidPath {
1390                        core: ErrorCore::default(),
1391                        what: format!("path {} contains invalid characters", display),
1392                    });
1393                }
1394                paths.push(display);
1395                new_threshold_ns = std::cmp::max(ts_ns, new_threshold_ns);
1396            }
1397        }
1398        let md2 = metadata(dir.as_ref())?;
1399        let ctime1 = ctime(&md1);
1400        let ctime2 = ctime(&md2);
1401        if ctime1 == ctime2 {
1402            return Ok((paths, new_threshold_ns));
1403        }
1404    }
1405}
1406
1407fn date_time_to_string(when: DateTime<Utc>) -> String {
1408    when.to_rfc3339_opts(chrono::format::SecondsFormat::Secs, true)
1409}
1410
1411/////////////////////////////////////////////// tests //////////////////////////////////////////////
1412
1413#[cfg(test)]
1414mod tests {
1415    use indicio::{value, Clue};
1416
1417    use super::*;
1418
1419    fn test_case(sym_table: &mut SymbolTable, value: Value) -> Vec<u32> {
1420        let mut translated: Vec<u32> = vec![];
1421        sym_table.translate_recursive(&value, "", &mut translated);
1422        translated
1423    }
1424
1425    #[test]
1426    fn bool_true() {
1427        let mut sym_table = SymbolTable::default();
1428        let expected = vec![0x110000u32, 0x110002, 0x110001];
1429        let returned = test_case(&mut sym_table, value!({key: true}));
1430        assert_eq!(expected, returned);
1431        assert_eq!(
1432            HashMap::from([
1433                ("o".to_string(), 0x110000),
1434                ("ok3keyT".to_string(), 0x110002),
1435            ]),
1436            sym_table.symbols
1437        );
1438    }
1439
1440    #[test]
1441    fn bool_false() {
1442        let mut sym_table = SymbolTable::default();
1443        let expected = vec![0x110000u32, 0x110002, 0x110001];
1444        let returned = test_case(&mut sym_table, value!({key: false}));
1445        assert_eq!(expected, returned);
1446        assert_eq!(
1447            HashMap::from([
1448                ("o".to_string(), 0x110000),
1449                ("ok3keyF".to_string(), 0x110002),
1450            ]),
1451            sym_table.symbols
1452        );
1453    }
1454
1455    #[test]
1456    fn number_i64() {
1457        let mut sym_table = SymbolTable::default();
1458        let expected = vec![
1459            0x110000u32,
1460            0x110002,
1461            127,
1462            255,
1463            255,
1464            255,
1465            255,
1466            255,
1467            255,
1468            255,
1469            0x110003,
1470            0x110001,
1471        ];
1472        let returned = test_case(&mut sym_table, value!({key: i64::MAX}));
1473        assert_eq!(expected, returned);
1474        assert_eq!(
1475            HashMap::from([
1476                ("o".to_string(), 0x110000),
1477                ("ok3keyi".to_string(), 0x110002),
1478            ]),
1479            sym_table.symbols
1480        );
1481    }
1482
1483    #[test]
1484    fn number_u64() {
1485        let mut sym_table = SymbolTable::default();
1486        let expected = vec![
1487            0x110000u32,
1488            0x110002,
1489            255,
1490            255,
1491            255,
1492            255,
1493            255,
1494            255,
1495            255,
1496            255,
1497            0x110003,
1498            0x110001,
1499        ];
1500        let returned = test_case(&mut sym_table, value!({key: u64::MAX}));
1501        assert_eq!(expected, returned);
1502        assert_eq!(
1503            HashMap::from([
1504                ("o".to_string(), 0x110000),
1505                ("ok3keyu".to_string(), 0x110002),
1506            ]),
1507            sym_table.symbols
1508        );
1509    }
1510
1511    #[test]
1512    fn number_f64() {
1513        let mut sym_table = SymbolTable::default();
1514        let expected = vec![
1515            0x110000u32,
1516            0x110002,
1517            64,
1518            9,
1519            33,
1520            251,
1521            84,
1522            68,
1523            45,
1524            24,
1525            0x110003,
1526            0x110001,
1527        ];
1528        let returned = test_case(&mut sym_table, value!({key: std::f64::consts::PI}));
1529        assert_eq!(expected, returned);
1530        assert_eq!(
1531            HashMap::from([
1532                ("o".to_string(), 0x110000),
1533                ("ok3keyf".to_string(), 0x110002),
1534            ]),
1535            sym_table.symbols
1536        );
1537    }
1538
1539    #[test]
1540    fn string() {
1541        let mut sym_table = SymbolTable::default();
1542        let expected = vec![
1543            0x110000u32,
1544            0x110002,
1545            'v' as u32,
1546            'a' as u32,
1547            'l' as u32,
1548            'u' as u32,
1549            'e' as u32,
1550            0x110003,
1551            0x110001,
1552        ];
1553        let returned = test_case(&mut sym_table, value!({key: "value"}));
1554        assert_eq!(expected, returned);
1555        assert_eq!(
1556            HashMap::from([
1557                ("o".to_string(), 0x110000),
1558                ("ok3keys".to_string(), 0x110002),
1559            ]),
1560            sym_table.symbols
1561        );
1562    }
1563
1564    #[test]
1565    fn array() {
1566        let mut sym_table = SymbolTable::default();
1567        let expected = vec![
1568            0x110000u32,
1569            0x110002,
1570            0x110004, //
1571            'v' as u32,
1572            'a' as u32,
1573            'l' as u32,
1574            'u' as u32,
1575            'e' as u32,
1576            '1' as u32, //
1577            0x110005,
1578            0x110004, //
1579            'v' as u32,
1580            'a' as u32,
1581            'l' as u32,
1582            'u' as u32,
1583            'e' as u32,
1584            '2' as u32, //
1585            0x110005,
1586            0x110003,
1587            0x110001, //
1588        ];
1589        let returned = test_case(&mut sym_table, value!({key: ["value1", "value2"]}));
1590        assert_eq!(expected, returned);
1591        assert_eq!(
1592            HashMap::from([
1593                ("o".to_string(), 0x110000),
1594                ("ok3keya".to_string(), 0x110002),
1595                ("ok3keyas".to_string(), 0x110004),
1596            ]),
1597            sym_table.symbols
1598        );
1599    }
1600
1601    #[test]
1602    fn object() {
1603        let mut sym_table = SymbolTable::default();
1604        let expected = vec![
1605            0x110000u32,
1606            0x110002,
1607            0x110004,
1608            'v' as u32,
1609            'a' as u32,
1610            'l' as u32,
1611            'u' as u32,
1612            'e' as u32,
1613            0x110005,
1614            0x110003,
1615            0x110001,
1616        ];
1617        let returned = test_case(&mut sym_table, value!({key: {key: "value"}}));
1618        assert_eq!(expected, returned);
1619        assert_eq!(
1620            HashMap::from([
1621                ("o".to_string(), 0x110000),
1622                ("ok3keyo".to_string(), 0x110002),
1623                ("ok3keyok3keys".to_string(), 0x110004),
1624            ]),
1625            sym_table.symbols
1626        );
1627    }
1628
1629    fn parse_dt(dt: &str) -> DateTime<Utc> {
1630        DateTime::parse_from_rfc3339(dt).unwrap().to_utc()
1631    }
1632
1633    fn make_clue(dt: &str, value: Value) -> Clue {
1634        Clue {
1635            file: "test_file".to_string(),
1636            line: 42,
1637            level: 0,
1638            timestamp: parse_dt(dt).timestamp_nanos_opt().unwrap() as u64 / 1_000,
1639            value,
1640        }
1641    }
1642
1643    #[test]
1644    fn seal_empty() {
1645        let mut sym_table = SymbolTable::default();
1646        let start_time = parse_dt("2024-02-16T15:10:00Z");
1647        assert!(convert_clues_to_analogize_inner(&mut sym_table, start_time, vec![]).is_err());
1648    }
1649
1650    #[test]
1651    fn seal_record_in_first_second() {
1652        let mut sym_table = SymbolTable::default();
1653        let start_time = parse_dt("2024-02-16T15:10:00Z");
1654        let clues = vec![make_clue("2024-02-16T15:10:00.01Z", value!({key: "value"}))];
1655        let (_, record_boundaries, second_boundaries) =
1656            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
1657        assert_eq!(vec![0], record_boundaries);
1658        assert_eq!(vec![0, 1], second_boundaries);
1659    }
1660
1661    #[test]
1662    fn seal_one_record_per_second() {
1663        let mut sym_table = SymbolTable::default();
1664        let clues = vec![
1665            make_clue("2024-02-16T15:10:00.01Z", value!({key: "value0"})),
1666            make_clue("2024-02-16T15:10:01.01Z", value!({key: "value1"})),
1667            make_clue("2024-02-16T15:10:02.01Z", value!({key: "value2"})),
1668        ];
1669        let start_time = parse_dt("2024-02-16T15:10:00Z");
1670        let (_, record_boundaries, second_boundaries) =
1671            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
1672        assert_eq!(vec![0, 53, 106], record_boundaries);
1673        assert_eq!(vec![0, 1, 2, 3], second_boundaries);
1674    }
1675
1676    #[test]
1677    fn seal_gap_at_beginning() {
1678        let mut sym_table = SymbolTable::default();
1679        let clues = vec![
1680            make_clue("2024-02-16T15:10:01.01Z", value!({key: "value1"})),
1681            make_clue("2024-02-16T15:10:02.01Z", value!({key: "value2"})),
1682            make_clue("2024-02-16T15:10:03.01Z", value!({key: "value3"})),
1683        ];
1684        let start_time = parse_dt("2024-02-16T15:10:00Z");
1685        let (_, record_boundaries, second_boundaries) =
1686            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
1687        assert_eq!(vec![0, 1, 54, 107], record_boundaries);
1688        assert_eq!(vec![0, 1, 2, 3, 4], second_boundaries);
1689    }
1690
1691    #[test]
1692    fn seal_gap_after_first_record() {
1693        let mut sym_table = SymbolTable::default();
1694        let clues = vec![
1695            make_clue("2024-02-16T15:10:00.01Z", value!({key: "value0"})),
1696            make_clue("2024-02-16T15:10:02.01Z", value!({key: "value2"})),
1697            make_clue("2024-02-16T15:10:03.01Z", value!({key: "value3"})),
1698        ];
1699        let start_time = parse_dt("2024-02-16T15:10:00Z");
1700        let (_, record_boundaries, second_boundaries) =
1701            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
1702        assert_eq!(vec![0, 53, 54, 107], record_boundaries);
1703        assert_eq!(vec![0, 1, 2, 3, 4], second_boundaries);
1704    }
1705
1706    #[test]
1707    fn seal_multiple_records_per_second() {
1708        let mut sym_table = SymbolTable::default();
1709        let clues = vec![
1710            make_clue("2024-02-16T15:10:00.01Z", value!({key: "value1"})),
1711            make_clue("2024-02-16T15:10:00.02Z", value!({key: "value2"})),
1712            make_clue("2024-02-16T15:10:00.03Z", value!({key: "value3"})),
1713            make_clue("2024-02-16T15:10:01.04Z", value!({key: "value4"})),
1714        ];
1715        let start_time = parse_dt("2024-02-16T15:10:00Z");
1716        let (_, record_boundaries, second_boundaries) =
1717            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
1718        assert_eq!(vec![0, 53, 106, 159], record_boundaries);
1719        assert_eq!(vec![2, 3, 4], second_boundaries);
1720    }
1721
1722    #[test]
1723    fn seal_multiple_records_per_second_with_gaps() {
1724        let mut sym_table = SymbolTable::default();
1725        let clues = vec![
1726            make_clue("2024-02-16T15:10:01.01Z", value!({key: "value1"})),
1727            make_clue("2024-02-16T15:10:01.02Z", value!({key: "value2"})),
1728            make_clue("2024-02-16T15:10:01.03Z", value!({key: "value3"})),
1729            make_clue("2024-02-16T15:10:03.04Z", value!({key: "value4"})),
1730        ];
1731        let start_time = parse_dt("2024-02-16T15:10:00Z");
1732        let (_, record_boundaries, second_boundaries) =
1733            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
1734        assert_eq!(vec![0, 1, 54, 107, 160, 161], record_boundaries);
1735        assert_eq!(vec![0, 3, 4, 5, 6], second_boundaries);
1736    }
1737
1738    #[test]
1739    fn query_no_normalization_expected() {
1740        assert_eq!(Query::Any, Query::Any.normalize());
1741        assert_eq!(Query::True, Query::True.normalize());
1742        assert_eq!(Query::False, Query::False.normalize());
1743        assert_eq!(Query::I64(42), Query::I64(42).normalize());
1744        assert_eq!(Query::U64(42), Query::U64(42).normalize());
1745        assert_eq!(Query::F64(42.0), Query::F64(42.0).normalize());
1746        assert_eq!(
1747            Query::String("Hello World".to_string()),
1748            Query::String("Hello World".to_string()).normalize()
1749        );
1750    }
1751
1752    #[test]
1753    fn query_normalize_array() {
1754        assert_eq!(Query::must("[]"), Query::must("[]").normalize());
1755        assert_eq!(
1756            Query::must("[true, false, \"Hello World\"]"),
1757            Query::must("[true, false, \"Hello World\"]").normalize()
1758        );
1759        assert_eq!(
1760            Query::must("[true, \"Hello World\"] or [false, \"Hello World\"]"),
1761            Query::must("[true or false, \"Hello World\"]").normalize()
1762        );
1763        assert_eq!(
1764            Query::must("[true, false] or [true, \"Hello World\"]"),
1765            Query::must("[true, false or \"Hello World\"]").normalize()
1766        );
1767    }
1768
1769    #[test]
1770    fn query_normalize_object() {
1771        assert_eq!(Query::must("{}"), Query::must("{}").normalize());
1772        assert_eq!(
1773            Query::must("{\"a\": true, \"b\": \"Hello World\"}"),
1774            Query::must("{\"a\": true, \"b\": \"Hello World\"}").normalize()
1775        );
1776        assert_eq!(
1777            Query::must(
1778                "{\"a\": true, \"b\": \"Hello World\"} or {\"a\": false, \"b\": \"Hello World\"}"
1779            ),
1780            Query::must("{\"a\": true or false, \"b\": \"Hello World\"}").normalize()
1781        );
1782        assert_eq!(
1783            Query::must("{\"a\": true, \"b\": \"Hello World\"} or {\"a\": true, \"b\": 42}"),
1784            Query::must("{\"a\": true, \"b\": \"Hello World\" or 42}").normalize()
1785        );
1786    }
1787
1788    #[test]
1789    fn query_normalize_nested() {
1790        assert_eq!(
1791            Query::must(
1792                "
1793            {\"a\": 42, \"b\": [\"Hello World\"]} or
1794            {\"a\": 42, \"b\": [13]} or
1795            {\"a\": 42, \"b\": [false]} or
1796            {\"a\": 42, \"b\": [[\"x\"]]} or
1797            {\"a\": 42, \"b\": [[\"y\"]]}
1798        "
1799                .trim()
1800            ),
1801            Query::must(
1802                "{\"a\": 42, \"b\": [\"Hello World\" or 13] or [false or [\"x\" or \"y\"]]}"
1803            )
1804            .normalize()
1805        );
1806    }
1807
1808    #[test]
1809    fn query_no_conjunctions() {
1810        assert_eq!(
1811            vec![Query::Any],
1812            Query::Any.conjunctions().collect::<Vec<_>>()
1813        );
1814        assert_eq!(
1815            vec![Query::True],
1816            Query::True.conjunctions().collect::<Vec<_>>()
1817        );
1818        assert_eq!(
1819            vec![Query::False],
1820            Query::False.conjunctions().collect::<Vec<_>>()
1821        );
1822        assert_eq!(
1823            vec![Query::I64(42)],
1824            Query::I64(42).conjunctions().collect::<Vec<_>>()
1825        );
1826        assert_eq!(
1827            vec![Query::U64(42)],
1828            Query::U64(42).conjunctions().collect::<Vec<_>>()
1829        );
1830        assert_eq!(
1831            vec![Query::F64(42.0)],
1832            Query::F64(42.0).conjunctions().collect::<Vec<_>>()
1833        );
1834        assert_eq!(
1835            vec![Query::String("Hello World".to_string())],
1836            Query::String("Hello World".to_string())
1837                .conjunctions()
1838                .collect::<Vec<_>>()
1839        );
1840    }
1841
1842    #[test]
1843    fn query_conjunctions_array() {
1844        assert_eq!(
1845            vec![Query::must("[]")],
1846            Query::must("[]").conjunctions().collect::<Vec<_>>()
1847        );
1848        assert_eq!(
1849            vec![
1850                Query::must("[true]"),
1851                Query::must("[false]"),
1852                Query::must("[\"Hello World\"]")
1853            ],
1854            Query::must("[true, false, \"Hello World\"]")
1855                .conjunctions()
1856                .collect::<Vec<_>>()
1857        );
1858    }
1859
1860    #[test]
1861    fn query_conjunctions_object() {
1862        assert_eq!(
1863            vec![Query::must("{}")],
1864            Query::must("{}").conjunctions().collect::<Vec<_>>()
1865        );
1866        assert_eq!(
1867            vec![
1868                Query::must("{\"a\": true}"),
1869                Query::must("{\"b\": \"Hello World\"}")
1870            ],
1871            Query::must("{\"a\": true, \"b\": \"Hello World\"}")
1872                .conjunctions()
1873                .collect::<Vec<_>>()
1874        );
1875    }
1876
1877    #[test]
1878    fn query_conjucnions_nested() {
1879        assert_eq!(vec![
1880            Query::must("{\"a\": 42}"),
1881            Query::must("{\"b\": [\"Hello World\"]}"),
1882            Query::must("{\"b\": [\"Goodbye World\"]}"),
1883            Query::must("{\"c\": [false]}"),
1884            Query::must("{\"c\": [[\"x\"]]}"),
1885            Query::must("{\"c\": [[\"y\"]]}"),
1886        ], Query::must("{\"a\": 42, \"b\": [\"Hello World\", \"Goodbye World\"], \"c\": [false, [\"x\", \"y\"]]}").conjunctions().collect::<Vec<_>>());
1887    }
1888
1889    fn do_reverse_translate(value: Value) {
1890        let mut sym_table = SymbolTable::default();
1891        let mut text = vec![];
1892        sym_table.translate_recursive(&value, "", &mut text);
1893        assert_eq!(Some(value), sym_table.reverse_translate(&text));
1894    }
1895
1896    #[test]
1897    fn reverse_translate_bool() {
1898        do_reverse_translate(Value::Bool(true));
1899        do_reverse_translate(Value::Bool(false));
1900    }
1901
1902    #[test]
1903    fn reverse_translate_numbers() {
1904        do_reverse_translate(Value::I64(42));
1905        do_reverse_translate(Value::U64(42));
1906        do_reverse_translate(Value::F64(std::f64::consts::PI));
1907    }
1908
1909    #[test]
1910    fn reverse_translate_string() {
1911        do_reverse_translate(value!("hello world"));
1912    }
1913
1914    #[test]
1915    fn reverse_translate_array() {
1916        do_reverse_translate(value!([]));
1917        do_reverse_translate(value!(["hello world"]));
1918        do_reverse_translate(value!(["hello world", true]));
1919        do_reverse_translate(value!(["hello world", true, 42]));
1920    }
1921
1922    #[test]
1923    fn reverse_translate_object() {
1924        do_reverse_translate(value!({}));
1925        do_reverse_translate(value!({greeting: "hello world"}));
1926        do_reverse_translate(value!({greeting: "hello world", success: true}));
1927        do_reverse_translate(value!({greeting: "hello world", success: true, number: 42}));
1928    }
1929
1930    #[test]
1931    fn reverse_translate_nesting() {
1932        do_reverse_translate(
1933            value!({greetings: ["hi", "howdy", "hello world"], numbers: [std::f64::consts::PI, 42]}),
1934        );
1935    }
1936}