1#![doc = include_str!("../README.md")]
2
3use std::collections::hash_map::Entry;
4use std::collections::{HashMap, HashSet};
5use std::ffi::c_void;
6use std::fs::{metadata, read_dir, rename, File, Metadata, OpenOptions};
7use std::io::{BufRead, BufReader, ErrorKind, Read, Write};
8use std::os::fd::{AsRawFd, RawFd};
9use std::os::unix::fs::MetadataExt;
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread::{sleep, JoinHandle};
15use std::time::Duration;
16
17use chrono::{DateTime, DurationRound, TimeDelta, Utc};
18
19use buffertk::Unpackable;
20use indicio::protobuf::ClueVector;
21use indicio::{Clue, Value};
22use mani::{Edit, Manifest, ManifestOptions};
23use prototk::FieldNumber;
24use scrunch::bit_vector::sparse::BitVector;
25use scrunch::bit_vector::BitVector as BitVectorTrait;
26use scrunch::builder::Builder;
27use scrunch::{CompressedDocument, Document, RecordOffset};
28use zerror::{iotoz, Z};
29use zerror_core::ErrorCore;
30
31mod parser;
32
/// Error type returned by the fallible operations in this file.
///
/// Every variant carries an [`ErrorCore`]; the remaining fields describe what went wrong.
#[derive(zerror_derive::Z)]
pub enum Error {
    // Not constructed anywhere in the visible code; presumably a "no error" placeholder.
    Success {
        core: ErrorCore,
    },
    // An I/O failure, built from `std::io::Error`: its kind plus its rendered message.
    System {
        core: ErrorCore,
        kind: ErrorKind,
        what: String,
    },
    // Returned by `Analogize::new` when the configured logs path is missing or not a directory.
    DirectoryNotFound {
        core: ErrorCore,
        what: String,
    },
    // Not constructed in the visible code.
    DirectoryAlreadyExists {
        core: ErrorCore,
        what: String,
    },
    // Wraps an error from the `mani` manifest crate.
    Manifest {
        core: ErrorCore,
        what: mani::Error,
    },
    // A string that should have parsed as an integer did not; `as_str` is the offending text.
    InvalidNumberLiteral {
        core: ErrorCore,
        as_str: String,
    },
    // A query failed to parse; `what` is the parser's message.
    Parsing {
        core: ErrorCore,
        what: String,
    },
    // A symbol-table line was not of the form `<mangled> <number>`; `line` is the bad line.
    InvalidSymbolTable {
        core: ErrorCore,
        line: String,
    },
    // Not constructed in the visible code.
    InvalidPath {
        core: ErrorCore,
        what: String,
    },
    // A clue timestamp could not be converted to a `DateTime`; `what` is the raw timestamp.
    InvalidTimestamp {
        core: ErrorCore,
        what: i64,
    },
    // Wraps an error from the `scrunch` crate.
    Scrunch {
        core: ErrorCore,
        what: scrunch::Error,
    },
    // Wraps a `prototk` error (e.g. from unpacking a `ClueVector`).
    Indicio {
        core: ErrorCore,
        what: prototk::Error,
    },
    // No clues were provided where at least one is required.
    EmptyClueFile {
        core: ErrorCore,
    },
    // A data file's length does not fit in `usize`, so it cannot be memory mapped.
    FileTooLarge {
        core: ErrorCore,
    },
}
92
// Invokes zerror's `iotoz!` macro for `Error`.  Presumably this provides the `.as_z()` adapter
// that `SymbolTable::from_file` calls on an I/O result -- confirm against zerror's docs.
iotoz! {Error}
94
95impl From<std::io::Error> for Error {
96 fn from(err: std::io::Error) -> Self {
97 Self::System {
98 core: ErrorCore::default(),
99 kind: err.kind(),
100 what: err.to_string(),
101 }
102 }
103}
104
105impl From<mani::Error> for Error {
106 fn from(err: mani::Error) -> Self {
107 Self::Manifest {
108 core: ErrorCore::default(),
109 what: err,
110 }
111 }
112}
113
114impl From<scrunch::Error> for Error {
115 fn from(err: scrunch::Error) -> Self {
116 Self::Scrunch {
117 core: ErrorCore::default(),
118 what: err,
119 }
120 }
121}
122
123impl From<prototk::Error> for Error {
124 fn from(err: prototk::Error) -> Self {
125 Self::Indicio {
126 core: ErrorCore::default(),
127 what: err,
128 }
129 }
130}
131
132impl From<parser::ParseError> for Error {
133 fn from(err: parser::ParseError) -> Self {
134 Self::Parsing {
135 core: ErrorCore::default(),
136 what: err.what().to_string(),
137 }
138 }
139}
140
/// Maps mangled value paths (strings such as `ok3foos`) to numeric symbols.
///
/// Symbols are handed out two at a time: `lookup_symbol` returns `n` and advances the counter
/// by two, and the encoder uses `n` as an opening marker and `n + 1` as the matching closing
/// marker.
#[derive(Debug)]
pub struct SymbolTable {
    /// Mangled path -> opening symbol.
    symbols: HashMap<String, u32>,
    /// The next unallocated opening symbol.
    next_symbol: u32,
}
148
149impl SymbolTable {
150 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
151 if !path.as_ref().exists() {
152 return Ok(Self::default());
153 }
154 let file = File::open(path.as_ref())
155 .as_z()
156 .with_info("path", path.as_ref().to_string_lossy())?;
157 Self::from_reader(file)
158 }
159
160 pub fn from_reader<R: Read>(reader: R) -> Result<Self, Error> {
161 let reader = BufReader::new(reader);
162 let mut syms = SymbolTable::default();
163 for line in reader.lines() {
164 let line = line?;
165 let line = line.trim();
166 let mut pieces: Vec<&str> = line.rsplitn(2, ' ').collect();
167 pieces.reverse();
168 if pieces.len() != 2 {
169 return Err(Error::InvalidSymbolTable {
170 core: ErrorCore::default(),
171 line: line.to_string(),
172 });
173 }
174 let mangled = pieces[0].to_string();
175 let symbol = u32::from_str(pieces[1]).map_err(|_| Error::InvalidNumberLiteral {
176 core: ErrorCore::default(),
177 as_str: pieces[1].to_string(),
178 })?;
179 syms.symbols.insert(mangled, symbol);
180 syms.next_symbol = std::cmp::max(syms.next_symbol, symbol + 2);
181 }
182 Ok(syms)
183 }
184
185 pub fn to_file<P: AsRef<Path>>(&self, path: P) -> Result<(), Error> {
186 let mut output = OpenOptions::new()
187 .create(true)
188 .truncate(true)
189 .write(true)
190 .open(path)?;
191 let mut sorted = self.symbols.iter().collect::<Vec<_>>();
192 sorted.sort();
193 for (mangled, symbol) in sorted.into_iter() {
194 writeln!(output, "{} {}", mangled, symbol)?;
195 }
196 output.flush()?;
197 Ok(())
198 }
199
200 pub fn append_dummy_record(&mut self, text: &mut Vec<u32>) {
201 text.push(u32::MAX);
202 }
203
    /// Appends the symbol encoding of `clue` to `text`.
    ///
    /// The clue's `file`, `line`, `level`, `timestamp`, and `value` fields are gathered into a
    /// single value with `indicio::value!` and encoded recursively starting from the empty
    /// path.  Paths not seen before are assigned new symbols, hence `&mut self`.
    pub fn translate(&mut self, clue: Clue, text: &mut Vec<u32>) {
        let value = indicio::value!({
            file: clue.file,
            line: clue.line,
            level: clue.level,
            timestamp: clue.timestamp,
            value: clue.value,
        });
        self.translate_recursive(&value, "", text);
    }
214
    /// Decodes `text` back into a value, starting from the empty path.
    ///
    /// Returns `None` when `text` is empty or cannot be decoded with this table.
    pub fn reverse_translate(&self, text: &[u32]) -> Option<Value> {
        self.reverse_translate_recursive(text, "")
    }
219
    /// Translates `query` into zero or more symbol sequences, starting from the empty path.
    ///
    /// Panics if the query contains a `Query::Or`, or an array/object with more than one
    /// element (see `translate_query_recursive`).
    pub fn translate_query(&self, query: &Query) -> Vec<Vec<u32>> {
        self.translate_query_recursive(query, "")
    }
223
224 pub fn reverse_translate_query(&self, text: &[u32]) -> Option<Value> {
225 if text.is_empty() {
226 return None;
227 }
228 let symbol = &self.reverse_lookup(text[0])?;
229 let terminal = self.reverse_translate_recursive(text, &symbol[..symbol.len() - 1])?;
230 fn build_from_symbol(mut symbol: &str, terminal: Value) -> Option<Value> {
231 if symbol.is_empty() {
232 return Some(terminal);
233 }
234 match &symbol[0..1] {
235 "o" => {
236 if symbol.len() == 1 {
237 return Some(terminal);
238 }
239 if &symbol[1..2] != "k" {
240 return None;
241 }
242 let len: String = symbol[2..]
243 .chars()
244 .take_while(|c| c.is_ascii_digit())
245 .collect();
246 symbol = &symbol[2 + len.len()..];
247 let len = usize::from_str(&len).ok()?;
248 let key = symbol[..len].to_string();
249 let obj = build_from_symbol(&symbol[len..], terminal)?;
250 Some(Value::Object(indicio::Map::from_iter(vec![(key, obj)])))
251 }
252 "a" => {
253 let obj = build_from_symbol(&symbol[1..], terminal)?;
254 Some(Value::Array(indicio::Values::from(vec![obj])))
255 }
256 _ => Some(terminal),
257 }
258 }
259 build_from_symbol(symbol, terminal)
260 }
261
262 pub fn iter(&self) -> impl Iterator<Item = (&str, u32)> {
263 self.symbols.iter().map(|(k, v)| (k.as_ref(), *v))
264 }
265
266 pub fn markers(&self) -> impl Iterator<Item = (u32, u32)> {
267 let mut markers = vec![];
268 for (sym, text) in self.symbols.iter() {
269 if sym.ends_with('t') || sym.ends_with('f') || sym.ends_with('n') || sym.ends_with('#')
270 {
271 markers.push((*text, *text));
272 } else {
273 markers.push((*text, *text + 1));
274 }
275 }
276 markers.into_iter()
277 }
278
279 fn translate_recursive(&mut self, value: &Value, symbol: &str, text: &mut Vec<u32>) {
280 match value {
281 Value::Bool(b) => {
282 let symbol = symbol.to_string() + if *b { "T" } else { "F" };
283 let sigma = self.lookup_symbol(&symbol);
284 text.push(sigma);
285 }
286 Value::I64(x) => {
287 let symbol = symbol.to_string() + "i";
288 let sigma = self.lookup_symbol(&symbol);
289 text.push(sigma);
290 for b in x.to_be_bytes() {
291 text.push(b as u32);
292 }
293 text.push(sigma + 1);
294 }
295 Value::U64(x) => {
296 let symbol = symbol.to_string() + "u";
297 let sigma = self.lookup_symbol(&symbol);
298 text.push(sigma);
299 for b in x.to_be_bytes() {
300 text.push(b as u32);
301 }
302 text.push(sigma + 1);
303 }
304 Value::F64(x) => {
305 let symbol = symbol.to_string() + "f";
306 let sigma = self.lookup_symbol(&symbol);
307 text.push(sigma);
308 for b in x.to_bits().to_be_bytes() {
309 text.push(b as u32);
310 }
311 text.push(sigma + 1);
312 }
313 Value::String(s) => {
314 let symbol = symbol.to_string() + "s";
315 let sigma = self.lookup_symbol(&symbol);
316 text.push(sigma);
317 for c in s.chars() {
318 text.push(c as u32);
319 }
320 text.push(sigma + 1);
321 }
322 Value::Array(a) => {
323 let symbol = symbol.to_string() + "a";
324 let sigma = self.lookup_symbol(&symbol);
325 text.push(sigma);
326 for e in a.iter() {
327 self.translate_recursive(e, &symbol, text);
328 }
329 text.push(sigma + 1);
330 }
331 Value::Object(o) => {
332 let symbol = symbol.to_string() + "o";
333 let sigma = self.lookup_symbol(&symbol);
334 text.push(sigma);
335 for (k, v) in o.iter() {
336 let len = k.chars().count();
337 let symbol = format!("{}k{}{}", symbol, len, k);
338 self.translate_recursive(v, &symbol, text);
339 }
340 text.push(sigma + 1);
341 }
342 };
343 }
344
345 fn translate_query_recursive(&self, query: &Query, symbol: &str) -> Vec<Vec<u32>> {
346 match query {
347 Query::Any => {
348 let symbol = symbol.to_string();
349 let mut result = vec![];
350 for c in &["o", "a", "T", "F", "i", "u", "f"] {
351 if let Some(sigma) = self.symbols.get(&(symbol.clone() + c)).copied() {
352 result.push(vec![sigma])
353 }
354 }
355 result
356 }
357 Query::True => {
358 let symbol = symbol.to_string() + "t";
359 if let Some(sigma) = self.symbols.get(&symbol).copied() {
360 vec![vec![sigma]]
361 } else {
362 vec![]
363 }
364 }
365 Query::False => {
366 let symbol = symbol.to_string() + "f";
367 if let Some(sigma) = self.symbols.get(&symbol).copied() {
368 vec![vec![sigma]]
369 } else {
370 vec![]
371 }
372 }
373 Query::I64(x) => {
374 let isymbol = symbol.to_string() + "i";
375 let usymbol = symbol.to_string() + "u";
376 if let Some(sigma) = self.symbols.get(&isymbol) {
377 let mut text = Vec::new();
378 text.push(*sigma);
379 for b in x.to_be_bytes() {
380 text.push(b as u32);
381 }
382 text.push(*sigma + 1);
383 vec![text]
384 } else if let Some(sigma) = self.symbols.get(&usymbol) {
385 let mut text = Vec::new();
386 text.push(*sigma);
387 for b in x.to_be_bytes() {
388 text.push(b as u32);
389 }
390 text.push(*sigma + 1);
391 vec![text]
392 } else {
393 vec![]
394 }
395 }
396 Query::U64(x) => {
397 let isymbol = symbol.to_string() + "i";
398 let usymbol = symbol.to_string() + "u";
399 if let Some(sigma) = self.symbols.get(&usymbol) {
400 let mut text = Vec::new();
401 text.push(*sigma);
402 for b in x.to_be_bytes() {
403 text.push(b as u32);
404 }
405 text.push(*sigma + 1);
406 vec![text]
407 } else if let Some(sigma) = self.symbols.get(&isymbol) {
408 let mut text = Vec::new();
409 text.push(*sigma);
410 for b in x.to_be_bytes() {
411 text.push(b as u32);
412 }
413 text.push(*sigma + 1);
414 vec![text]
415 } else {
416 vec![]
417 }
418 }
419 Query::F64(x) => {
420 let symbol = symbol.to_string() + "f";
421 if let Some(sigma) = self.symbols.get(&symbol) {
422 let mut text = Vec::new();
423 text.push(*sigma);
424 for b in x.to_bits().to_be_bytes() {
425 text.push(b as u32);
426 }
427 text.push(*sigma + 1);
428 vec![text]
429 } else {
430 vec![]
431 }
432 }
433 Query::String(s) => {
434 let symbol = symbol.to_string() + "s";
435 if let Some(sigma) = self.symbols.get(&symbol).copied() {
436 let mut text = vec![sigma];
437 for c in s.chars() {
438 text.push(c as u32);
439 }
440 text.push(sigma + 1);
441 vec![text]
442 } else {
443 vec![]
444 }
445 }
446 Query::Array(a) => {
447 assert!(a.len() <= 1);
448 let symbol = symbol.to_string() + "a";
449 if a.len() == 1 {
450 self.translate_query_recursive(&a[0], &symbol)
451 } else if let Some(sigma) = self.symbols.get(&symbol).copied() {
452 vec![vec![sigma]]
453 } else {
454 vec![]
455 }
456 }
457 Query::Object(o) => {
458 assert!(o.len() <= 1);
459 if o.len() == 1 {
460 let (k, q) = &o[0];
461 let len = k.chars().count();
462 let symbol = format!("{}ok{}{}", symbol, len, k);
463 self.translate_query_recursive(q, &symbol)
464 } else if o.is_empty() {
465 let symbol = symbol.to_string() + "o";
466 if let Some(sigma) = self.symbols.get(&symbol).copied() {
467 vec![vec![sigma]]
468 } else {
469 vec![]
470 }
471 } else {
472 unreachable!();
473 }
474 }
475 Query::Or(_) => {
476 panic!("do not translate disjunctions");
477 }
478 }
479 }
480
481 fn reverse_translate_keys(&self, mut text: &[u32], path: &str) -> Option<Value> {
482 let mut map = indicio::Map::default();
483 while !text.is_empty() {
484 let symbol = self.reverse_lookup(text[0])?;
485 let relative = symbol.strip_prefix(path)?;
486 if !relative.starts_with('k') {
487 return None;
488 }
489 let key_len: String = relative[1..]
490 .chars()
491 .take_while(char::is_ascii_digit)
492 .collect();
493 let key = &relative[1 + key_len.len()..];
494 let Ok(key_len) = usize::from_str(&key_len) else {
495 return None;
496 };
497 let key = &key[..key.len() - 1];
498 if key_len != key.len() {
499 return None;
500 }
501 if symbol.ends_with('T') || symbol.ends_with('F') {
502 let value =
503 self.reverse_translate_recursive(&text[..1], &symbol[..symbol.len() - 1])?;
504 text = &text[1..];
505 map.insert(key.to_string(), value);
506 } else {
507 let position = text.iter().position(|t| *t == text[0] + 1)?;
508 let value = self.reverse_translate_recursive(
509 &text[..position + 1],
510 &symbol[..symbol.len() - 1],
511 )?;
512 text = &text[position + 1..];
513 map.insert(key.to_string(), value);
514 }
515 }
516 Some(Value::Object(map))
517 }
518
519 fn reverse_translate_array(&self, mut text: &[u32], path: &str) -> Option<Value> {
520 let mut values = vec![];
521 while !text.is_empty() {
522 let symbol = self.reverse_lookup(text[0])?;
523 if !symbol.starts_with(path) {
524 return None;
525 }
526 if symbol.ends_with('T') || symbol.ends_with('F') {
527 let value =
528 self.reverse_translate_recursive(&text[..1], &symbol[..symbol.len() - 1])?;
529 text = &text[1..];
530 values.push(value);
531 } else {
532 let position = text.iter().position(|t| *t == text[0] + 1)?;
533 let value = self.reverse_translate_recursive(
534 &text[..position + 1],
535 &symbol[..symbol.len() - 1],
536 )?;
537 text = &text[position + 1..];
538 values.push(value);
539 }
540 }
541 Some(Value::from(values))
542 }
543
544 fn reverse_translate_recursive(&self, text: &[u32], path: &str) -> Option<Value> {
545 if text.is_empty() {
546 return None;
547 }
548 let symbol = self.reverse_lookup(text[0])?;
549 let relative = symbol.strip_prefix(path)?;
550 match relative {
551 "o" => self.reverse_translate_keys(&text[1..text.len() - 1], &(path.to_string() + "o")),
552 "a" => {
553 self.reverse_translate_array(&text[1..text.len() - 1], &(path.to_string() + "a"))
554 }
555 "s" => Some(Value::String(
556 text.iter().copied().flat_map(char::from_u32).collect(),
557 )),
558 "i" => {
559 if text.len() != 10 {
560 return None;
561 }
562 let mut buf = [0u8; 8];
563 for (b, t) in std::iter::zip(buf.iter_mut(), text[1..9].iter()) {
564 *b = *t as u8;
565 }
566 Some(Value::I64(i64::from_be_bytes(buf)))
567 }
568 "u" => {
569 if text.len() != 10 {
570 return None;
571 }
572 let mut buf = [0u8; 8];
573 for (b, t) in std::iter::zip(buf.iter_mut(), text[1..9].iter()) {
574 *b = *t as u8;
575 }
576 Some(Value::U64(u64::from_be_bytes(buf)))
577 }
578 "f" => {
579 if text.len() != 10 {
580 return None;
581 }
582 let mut buf = [0u8; 8];
583 for (b, t) in std::iter::zip(buf.iter_mut(), text[1..9].iter()) {
584 *b = *t as u8;
585 }
586 Some(Value::F64(f64::from_bits(u64::from_be_bytes(buf))))
587 }
588 "T" => Some(Value::Bool(true)),
589 "F" => Some(Value::Bool(false)),
590 _ => None,
591 }
592 }
593
594 fn lookup_symbol(&mut self, symbol: &str) -> u32 {
595 match self.symbols.entry(symbol.to_string()) {
596 Entry::Occupied(entry) => *entry.get(),
597 Entry::Vacant(entry) => {
598 let sym = self.next_symbol;
599 self.next_symbol += 2;
600 entry.insert(sym);
601 sym
602 }
603 }
604 }
605
606 fn reverse_lookup(&self, sym: u32) -> Option<&str> {
607 for (s, t) in self.symbols.iter() {
608 if *t == sym {
609 return Some(s);
610 }
611 }
612 None
613 }
614}
615
616impl Default for SymbolTable {
617 fn default() -> Self {
618 Self {
619 symbols: HashMap::new(),
620 next_symbol: 0x110000,
621 }
622 }
623}
624
625#[allow(clippy::type_complexity)]
628fn group_by_second(
629 mut watermark: DateTime<Utc>,
630 clues: Vec<Clue>,
631) -> Result<Vec<(DateTime<Utc>, Vec<Clue>)>, Error> {
632 if clues.is_empty() {
633 return Err(Error::EmptyClueFile {
634 core: ErrorCore::default(),
635 });
636 }
637 let one_second = TimeDelta::try_seconds(1).expect("one second should always construct");
638 watermark = watermark.duration_trunc(one_second).unwrap();
639 let mut results = vec![];
640 for clue in clues {
641 let Some(ts) = DateTime::from_timestamp_millis(clue.timestamp as i64 / 1_000) else {
642 return Err(Error::InvalidTimestamp {
643 core: ErrorCore::default(),
644 what: clue.timestamp as i64,
645 });
646 };
647 while watermark <= ts {
648 results.push((watermark, vec![]));
649 watermark += one_second;
650 }
651 let len = results.len() - 1;
652 results[len].1.push(clue);
653 }
654 Ok(results)
655}
656
657#[allow(clippy::type_complexity)]
658fn convert_clues_to_analogize_inner(
659 sym_table: &mut SymbolTable,
660 start_time: DateTime<Utc>,
661 clues: Vec<Clue>,
662) -> Result<(Vec<u32>, Vec<usize>, Vec<usize>), Error> {
663 let mut text = vec![];
664 let mut record_boundaries = vec![];
665 let mut second_boundaries = vec![];
666 if clues.is_empty() {
667 second_boundaries.push(record_boundaries.len());
668 record_boundaries.push(text.len());
669 sym_table.append_dummy_record(&mut text);
670 }
671 if clues.is_empty() {
672 return Err(Error::EmptyClueFile {
673 core: ErrorCore::default(),
674 });
675 }
676 for (_, clues) in group_by_second(start_time, clues)? {
677 if clues.is_empty() {
678 second_boundaries.push(record_boundaries.len());
679 record_boundaries.push(text.len());
680 sym_table.append_dummy_record(&mut text);
681 } else {
682 for clue in clues {
683 record_boundaries.push(text.len());
684 sym_table.translate(clue, &mut text);
685 }
686 second_boundaries.push(record_boundaries.len() - 1);
687 }
688 }
689 second_boundaries.push(record_boundaries.len());
690 Ok((text, record_boundaries, second_boundaries))
691}
692
693pub fn convert_clues_to_analogize<P: AsRef<Path>>(
694 sym_table: &mut SymbolTable,
695 start_time: DateTime<Utc>,
696 clues: Vec<Clue>,
697 analogize: P,
698) -> Result<(), Error> {
699 let (text, record_boundaries, second_boundaries) =
700 convert_clues_to_analogize_inner(sym_table, start_time, clues)?;
701 let mut buf = Vec::new();
702 let mut builder = Builder::new(&mut buf);
703 let mut sub = builder.sub(FieldNumber::must(1));
704 CompressedDocument::construct(text, record_boundaries, &mut sub)?;
705 drop(sub);
706 let mut sub = builder.sub(FieldNumber::must(2));
707 BitVector::from_indices(
708 16,
709 second_boundaries[second_boundaries.len() - 1] + 1,
710 &second_boundaries,
711 &mut sub,
712 )
713 .ok_or(scrunch::Error::InvalidBitVector)?;
714 drop(sub);
715 drop(builder);
716 std::fs::write(analogize.as_ref(), buf)?;
717 Ok(())
718}
719
/// The outer message of an analogize file, with both fields left as undecoded bytes.
///
/// Field 1 is parsed as a `CompressedDocument` and field 2 as a `BitVector` by
/// `AnalogizeDocument::unpack`.
#[derive(Clone, Debug, Default, prototk_derive::Message)]
struct AnalogizeDocumentStub<'a> {
    #[prototk(1, bytes)]
    document: &'a [u8],
    #[prototk(2, bytes)]
    timeline: &'a [u8],
}
729
/// A parsed analogize file borrowing from the underlying byte buffer.
struct AnalogizeDocument<'a> {
    /// The searchable compressed text of all records.
    document: CompressedDocument<'a>,
    /// Bit vector of per-second boundaries; parsed but not read by any code in this view.
    #[allow(dead_code)]
    timeline: BitVector<'a>,
}
735
736impl AnalogizeDocument<'_> {
737 fn query(&self, syms: &SymbolTable, query: &Query) -> Result<HashSet<RecordOffset>, Error> {
738 let records = if let Query::Or(subqueries) = query {
739 let mut records = HashSet::new();
740 for query in subqueries {
741 for offset in self.query(syms, query)? {
742 records.insert(offset);
743 }
744 }
745 records
746 } else {
747 let mut results = HashSet::new();
748 let mut needles = vec![];
749 for conjunction in query.clone().conjunctions() {
750 needles.append(&mut syms.translate_query(&conjunction));
751 }
752 let mut needles = needles.into_iter();
753 if let Some(needle) = needles.next() {
754 for offset in self.document.search(&needle)? {
755 results.insert(self.document.lookup(offset)?);
756 }
757 }
758 for needle in needles {
759 let inner = std::mem::take(&mut results);
760 for offset in self.document.search(&needle)? {
761 let offset = self.document.lookup(offset)?;
762 if inner.contains(&offset) {
763 results.insert(offset);
764 }
765 }
766 }
767 results
768 };
769 Ok(records)
770 }
771}
772
773impl<'a> Unpackable<'a> for AnalogizeDocument<'a> {
774 type Error = Error;
775
776 fn unpack<'b: 'a>(buf: &'b [u8]) -> Result<(Self, &'b [u8]), Self::Error> {
777 let (stub, buf) = AnalogizeDocumentStub::unpack(buf).map_err(|_| Error::Scrunch {
778 core: ErrorCore::default(),
779 what: scrunch::Error::InvalidDocument,
780 })?;
781 let document = CompressedDocument::unpack(stub.document)?.0;
782 let timeline = BitVector::parse(stub.timeline)?.0;
783 Ok((AnalogizeDocument { document, timeline }, buf))
784 }
785}
786
/// A structural query over encoded records.
///
/// Scalars match a value of that type at the current path; `Array` and `Object` descend into
/// containers; `Or` is a disjunction of subqueries; `Any` (the default) matches any of the
/// types listed in `SymbolTable::translate_query_recursive`.
#[derive(Clone, Debug, Default)]
pub enum Query {
    #[default]
    Any,
    True,
    False,
    I64(i64),
    U64(u64),
    F64(f64),
    String(String),
    // Every element must match (conjunction); empty matches the array marker itself.
    Array(Vec<Query>),
    // Every `(key, query)` pair must match (conjunction); empty matches the object marker.
    Object(Vec<(String, Query)>),
    // At least one alternative must match.
    Or(Vec<Query>),
}
803
804impl Query {
    /// Parses `query`, requiring the parser to consume the whole input.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Parsing`] (via `From<parser::ParseError>`) if the text does not parse.
    pub fn parse<S: AsRef<str>>(query: S) -> Result<Self, Error> {
        Ok(parser::parse_all(parser::query)(query.as_ref())?)
    }
808
    /// Parses `query`, panicking if it does not parse.
    ///
    /// # Panics
    ///
    /// Panics with "query should parse" on a parse error.
    pub fn must<S: AsRef<str>>(query: S) -> Self {
        Query::parse(query).expect("query should parse")
    }
812
813 pub fn normalize(self) -> Query {
814 match self {
815 Query::Any
816 | Query::True
817 | Query::False
818 | Query::I64(_)
819 | Query::U64(_)
820 | Query::F64(_)
821 | Query::String(_) => self,
822 Query::Array(subqueries) => Self::normalize_array(subqueries),
823 Query::Object(subqueries) => Self::normalize_object(subqueries),
824 Query::Or(subqueries) => Self::normalize_or(subqueries),
825 }
826 }
827
828 fn conjunctions(self) -> impl Iterator<Item = Query> {
829 let answer: Box<dyn Iterator<Item = Query>> = match self {
830 Query::Any
831 | Query::True
832 | Query::False
833 | Query::I64(_)
834 | Query::U64(_)
835 | Query::F64(_)
836 | Query::String(_) => Box::new(vec![self].into_iter()),
837 Query::Array(subqueries) => {
838 if subqueries.is_empty() {
839 Box::new(vec![Query::Array(vec![])].into_iter())
840 } else {
841 Box::new(
842 subqueries
843 .into_iter()
844 .flat_map(|q| q.conjunctions())
845 .map(|q| Query::Array(vec![q])),
846 )
847 }
848 }
849 Query::Object(subqueries) => {
850 if subqueries.is_empty() {
851 Box::new(vec![Query::Object(vec![])].into_iter())
852 } else {
853 let mut results = vec![];
854 for (s, q) in subqueries.into_iter() {
855 for q in q.conjunctions() {
856 results.push(Query::Object(vec![(s.clone(), q)]));
857 }
858 }
859 Box::new(results.into_iter())
860 }
861 }
862 Query::Or(_) => {
863 panic!("calling conjunctions on Or clause");
865 }
866 };
867 answer
868 }
869
870 fn normalize_mut(query: &mut Query) {
871 let q = std::mem::take(query);
872 *query = Self::normalize(q);
873 }
874
875 fn normalize_array(subqueries: Vec<Query>) -> Query {
876 if subqueries.is_empty() {
877 return Query::Array(subqueries);
878 }
879 let mut disjunctions: Vec<Vec<Query>> = vec![vec![]];
880 for subquery in subqueries.into_iter() {
881 let subquery = Self::normalize(subquery);
882 if let Query::Or(subqueries) = subquery {
883 let inner = std::mem::take(&mut disjunctions);
884 for subquery in subqueries.into_iter() {
885 for inner in inner.iter() {
886 let mut inner = inner.clone();
887 inner.push(subquery.clone());
888 disjunctions.push(inner);
889 }
890 }
891 } else {
892 for disjunction in disjunctions.iter_mut() {
893 disjunction.push(subquery.clone());
894 }
895 }
896 }
897 if disjunctions.len() > 1 {
898 Query::Or(disjunctions.into_iter().map(Query::Array).collect())
899 } else if let Some(subqueries) = disjunctions.pop() {
900 Query::Array(subqueries)
901 } else {
902 unreachable!();
903 }
904 }
905
906 fn normalize_object(subqueries: Vec<(String, Query)>) -> Query {
907 if subqueries.is_empty() {
908 return Query::Object(subqueries);
909 }
910 let mut disjunctions: Vec<Vec<(String, Query)>> = vec![vec![]];
911 for (key, subquery) in subqueries.into_iter() {
912 let subquery = Self::normalize(subquery);
913 if let Query::Or(subqueries) = subquery {
914 let inner = std::mem::take(&mut disjunctions);
915 for subquery in subqueries.into_iter() {
916 for inner in inner.iter() {
917 let mut inner = inner.clone();
918 inner.push((key.clone(), subquery.clone()));
919 disjunctions.push(inner);
920 }
921 }
922 } else {
923 for disjunction in disjunctions.iter_mut() {
924 disjunction.push((key.clone(), subquery.clone()));
925 }
926 }
927 }
928 if disjunctions.len() > 1 {
929 Query::Or(disjunctions.into_iter().map(Query::Object).collect())
930 } else if let Some(subqueries) = disjunctions.pop() {
931 Query::Object(subqueries)
932 } else {
933 unreachable!();
934 }
935 }
936
937 fn normalize_or(mut subqueries: Vec<Query>) -> Query {
938 subqueries.iter_mut().for_each(Query::normalize_mut);
939 subqueries.sort_by_key(|x| if let Query::Or(_) = *x { 1 } else { 0 });
940 let partition = subqueries.partition_point(|x| !matches!(x, Query::Or(_)));
941 let disjunctions = subqueries.split_off(partition);
942 for disjunction in disjunctions.into_iter() {
943 if let Query::Or(mut subq) = disjunction {
944 subqueries.append(&mut subq);
945 } else {
946 unreachable!();
947 }
948 }
949 Query::Or(subqueries)
950 }
951}
952
953impl Eq for Query {}
954
955impl PartialEq for Query {
956 fn eq(&self, query: &Query) -> bool {
957 match (self, query) {
958 (Query::Any, Query::Any) => true,
959 (Query::True, Query::True) => true,
960 (Query::False, Query::False) => true,
961 (Query::I64(x), Query::I64(y)) => x == y,
962 (Query::U64(x), Query::U64(y)) => x == y,
963 (Query::F64(x), Query::F64(y)) => x.total_cmp(y).is_eq(),
964 (Query::String(x), Query::String(y)) => x == y,
965 (Query::Array(x), Query::Array(y)) => x == y,
966 (Query::Object(x), Query::Object(y)) => x == y,
967 (Query::Or(x), Query::Or(y)) => x == y,
968 _ => false,
969 }
970 }
971}
972
/// A memory-mapped data file.
///
/// In the visible code the only constructor is `State::get_document`, which fills both fields
/// from a successful read-only `mmap`.  The mapping is released in `Drop`.
struct DocumentMapping {
    /// Base address returned by `mmap`.
    data: *mut c_void,
    /// Length of the mapping in bytes.
    size: usize,
}
979
980impl DocumentMapping {
981 fn doc(&self) -> Result<AnalogizeDocument, Error> {
982 let buf = unsafe { std::slice::from_raw_parts(self.data as *const u8, self.size) };
984 Ok(AnalogizeDocument::unpack(buf)?.0)
985 }
986}
987
/// Unmaps the file when the last owner goes away.
impl Drop for DocumentMapping {
    fn drop(&mut self) {
        // SAFETY: `data` and `size` are the address and length of a mapping created by `mmap`
        // in `State::get_document` (the only constructor in view), and it is unmapped exactly
        // once, here.  The return value is ignored; there is no way to report it from `drop`.
        unsafe {
            libc::munmap(self.data, self.size);
        }
    }
}
996
// SAFETY: the raw pointer refers to a mapping created with `PROT_READ`, and nothing in the
// visible code writes through it, so sharing or moving it across threads only permits reads.
// NOTE(review): `MAP_SHARED` means writes to the file by other processes are visible through
// the mapping -- confirm data files are never modified in place.
unsafe impl Send for DocumentMapping {}
unsafe impl Sync for DocumentMapping {}
999
/// State shared between the public handle and the background thread.
struct State {
    /// The options the instance was created with.
    options: AnalogizeOptions,
    /// Set by `done()`; the background loop exits once it observes `true`.
    done: AtomicBool,
    /// Directory holding the input log files.
    logs: PathBuf,
    /// Directory holding the symbol table and the generated data files.
    data: PathBuf,
    /// Manifest recording `log:` and `data:` entries plus the `L` and `F` info values.
    mani: Mutex<Manifest>,
    /// The symbol table, persisted to `<data>/symbols` after each conversion.
    syms: Mutex<SymbolTable>,
    /// Cache of memory-mapped data files keyed by file name.
    docs: Mutex<HashMap<String, Arc<DocumentMapping>>>,
}
1011
1012impl State {
1013 fn new(
1014 options: AnalogizeOptions,
1015 logs: PathBuf,
1016 data: PathBuf,
1017 mani: Manifest,
1018 ) -> Result<Self, Error> {
1019 let done = AtomicBool::new(false);
1020 let syms = if data.join("symbols").exists() {
1021 SymbolTable::from_file(data.join("symbols"))?
1022 } else {
1023 SymbolTable::default()
1024 };
1025 let mani = Mutex::new(mani);
1026 let syms = Mutex::new(syms);
1027 let docs = Mutex::new(HashMap::new());
1028 Ok(Self {
1029 options,
1030 done,
1031 logs,
1032 data,
1033 mani,
1034 syms,
1035 docs,
1036 })
1037 }
1038
1039 fn background(self: Arc<Self>) {
1040 let mut workers = Vec::with_capacity(self.options.threads);
1041 for _ in 0..self.options.threads {
1042 let this = Arc::clone(&self);
1043 workers.push(std::thread::spawn(move || this.worker()));
1044 }
1045 while !self.done.load(Ordering::Relaxed) {
1046 sleep(Duration::from_millis(100));
1047 self.try_ingest().expect("try ingest should never fail");
1049 }
1050 for worker in workers.into_iter() {
1051 let _ = worker.join();
1052 }
1053 }
1054
    /// Body of each worker thread spawned by `background`; currently does nothing and returns
    /// immediately.
    fn worker(self: Arc<Self>) {}
1056
1057 fn get_documents(&self) -> Result<Vec<Arc<DocumentMapping>>, Error> {
1058 let mani = self.mani.lock().unwrap();
1059 fn select_data(s: &str) -> Option<String> {
1060 s.strip_prefix("data:").map(String::from)
1061 }
1062 let docs: Vec<_> = mani.strs().filter_map(select_data).collect();
1063 let mut mappings = Vec::with_capacity(docs.len());
1064 for doc in docs {
1065 mappings.push(self.get_document(&doc)?);
1066 }
1067 Ok(mappings)
1068 }
1069
    /// Returns the memory mapping for data file `doc`, creating and caching it on first use.
    ///
    /// The cache lock is held for the whole call, so a given file is mapped at most once.
    ///
    /// # Errors
    ///
    /// - [`Error::System`] if the file cannot be opened or stat'ed, or if `mmap` fails.
    /// - [`Error::FileTooLarge`] if the file length does not fit in `usize`.
    fn get_document(&self, doc: &str) -> Result<Arc<DocumentMapping>, Error> {
        let mut docs = self.docs.lock().unwrap();
        if let Some(doc) = docs.get(doc) {
            return Ok(Arc::clone(doc));
        }
        let path = self.data.join(doc);
        let file = File::open(path)?;
        let md = file.metadata()?;
        if md.len() > usize::MAX as u64 {
            return Err(Error::FileTooLarge {
                core: ErrorCore::default(),
            });
        }
        // Map `len` bytes of `file` read-only from offset 0.  macOS uses plain `mmap`; every
        // other target uses `mmap64`.
        #[cfg(not(target_os = "macos"))]
        unsafe fn mmap(len: usize, file: RawFd) -> *mut c_void {
            libc::mmap64(
                std::ptr::null_mut(),
                len,
                libc::PROT_READ,
                libc::MAP_SHARED,
                file,
                0,
            )
        }
        #[cfg(target_os = "macos")]
        unsafe fn mmap(len: usize, file: RawFd) -> *mut c_void {
            libc::mmap(
                std::ptr::null_mut(),
                len,
                libc::PROT_READ,
                libc::MAP_SHARED,
                file,
                0,
            )
        }
        // SAFETY: `file` is open for the duration of the call, so the descriptor is valid, and
        // the length was just read from its metadata.  The result is checked against
        // `MAP_FAILED` before it is used.
        let mapping = unsafe { mmap(md.len() as usize, file.as_raw_fd()) };
        if mapping == libc::MAP_FAILED {
            return Err(std::io::Error::last_os_error().into());
        }
        // From here on `DocumentMapping::drop` owns the responsibility to unmap.
        let mapping = Arc::new(DocumentMapping {
            data: mapping,
            size: md.len() as usize,
        });
        docs.insert(doc.to_string(), Arc::clone(&mapping));
        Ok(mapping)
    }
1117
1118 fn try_ingest(&self) -> Result<(), Error> {
1119 self.log_to_ingest()?;
1120 self.convert_ingested_logs()?;
1121 Ok(())
1122 }
1123
1124 fn log_to_ingest(&self) -> Result<(), Error> {
1125 let mut mani = self.mani.lock().unwrap();
1126 let threshold_ns = mani.info('L').unwrap_or("0");
1127 let threshold_ns =
1128 i64::from_str(threshold_ns).map_err(|_| Error::InvalidNumberLiteral {
1129 core: ErrorCore::default(),
1130 as_str: threshold_ns.to_string(),
1131 })?;
1132 let (logs_to_ingest, threshold_ns) = take_consistent_cut(&self.logs, threshold_ns)?;
1133 if logs_to_ingest.is_empty() {
1134 return Ok(());
1135 }
1136 let mut edit = Edit::default();
1137 edit.info('L', &format!("{}", threshold_ns))?;
1138 for log in logs_to_ingest.iter() {
1139 edit.add(&format!("log:{}", log))?;
1140 }
1141 mani.apply(edit)?;
1142 Ok(())
1143 }
1144
    /// Converts every log file registered in the manifest into one analogize data file.
    ///
    /// Steps, in order:
    ///
    /// 1. Snapshot the `log:` entries and the file number (info `F`, default `0`) under the
    ///    manifest lock, then release it.
    /// 2. Read and unpack every log file, concatenating their clues.
    /// 3. If there are clues, write `<start>_<end>_<file number>.analogize` into the data
    ///    directory, then persist the symbol table by writing a temporary file and renaming
    ///    it over `symbols`.
    /// 4. Apply one manifest edit that bumps `F`, adds the new `data:` entry (if any), and
    ///    removes the consumed `log:` entries.
    ///
    /// Nothing happens if no logs are registered.
    ///
    /// # Errors
    ///
    /// Returns parse, I/O, encoding, or manifest errors from any step; the manifest is only
    /// modified in the final step.
    fn convert_ingested_logs(&self) -> Result<(), Error> {
        let (log_inputs, file_number): (Vec<String>, String) = {
            // Hold the manifest lock only long enough to copy out what is needed.
            let mani = self.mani.lock().unwrap();
            fn select_logs(s: &str) -> Option<String> {
                s.strip_prefix("log:").map(String::from)
            }
            (
                mani.strs().filter_map(select_logs).collect(),
                mani.info('F').unwrap_or("0").to_string(),
            )
        };
        if log_inputs.is_empty() {
            return Ok(());
        }
        let file_number = u64::from_str(&file_number).map_err(|_| Error::InvalidNumberLiteral {
            core: ErrorCore::default(),
            as_str: file_number,
        })?;
        let mut clues = vec![];
        // Gather the clues of every registered log into one batch.
        for input in log_inputs.iter() {
            let buf = std::fs::read(self.logs.join(input))?;
            let mut cv = ClueVector::unpack(&buf)?.0;
            clues.append(&mut cv.clues);
        }
        let mut edit = Edit::default();
        edit.info('F', &(file_number + 1).to_string())?;
        if !clues.is_empty() {
            let mut syms = self.syms.lock().unwrap();
            // The batch is not sorted, so find its time range explicitly.
            let start_time = clues.iter().map(|x| x.timestamp).min().unwrap_or(0);
            let end_time = clues.iter().map(|x| x.timestamp).max().unwrap_or(0);
            // NOTE(review): dividing by 1_000 before `from_timestamp_millis` suggests the raw
            // timestamps are microseconds -- confirm against indicio.
            let start_time = DateTime::from_timestamp_millis(start_time as i64 / 1_000).ok_or(
                Error::InvalidTimestamp {
                    core: ErrorCore::default(),
                    what: start_time as i64,
                },
            )?;
            let end_time = DateTime::from_timestamp_millis(end_time as i64 / 1_000).ok_or(
                Error::InvalidTimestamp {
                    core: ErrorCore::default(),
                    what: end_time as i64,
                },
            )?;
            let output_path = format!(
                "{}_{}_{}.analogize",
                date_time_to_string(start_time),
                date_time_to_string(end_time),
                file_number
            );
            convert_clues_to_analogize(&mut syms, start_time, clues, self.data.join(&output_path))?;
            // Write the updated symbol table to a temporary name and rename it into place so
            // that readers never see a partially written `symbols` file.
            let syms_tmp = format!("symbols.{}", Utc::now().timestamp());
            let syms_tmp = self.data.join(syms_tmp);
            syms.to_file(&syms_tmp)?;
            rename(syms_tmp, self.data.join("symbols"))?;
            edit.add(&format!("data:{}", output_path))?;
        }
        for input in log_inputs.iter() {
            edit.rm(&format!("log:{}", input))?;
        }
        // Re-acquire the manifest lock only for the final, single edit.
        let mut mani = self.mani.lock().unwrap();
        mani.apply(edit)?;
        Ok(())
    }
1209
1210 fn done(&self) {
1211 self.done.store(true, Ordering::Relaxed);
1212 }
1213}
1214
/// Options used to construct an [`Analogize`]; the `CommandLine` derive exposes them as
/// command-line arguments using the help strings in the `arrrg` attributes.
#[derive(Clone, Debug, Eq, PartialEq, arrrg_derive::CommandLine)]
pub struct AnalogizeOptions {
    // Directory of indicio log files; `Analogize::new` fails with `DirectoryNotFound` when it
    // is not an existing directory.
    #[arrrg(required, "Path to indicio log files.")]
    logs: String,
    // Directory of analogize data files; also the location passed to `Manifest::open`.
    #[arrrg(required, "Path to analogize data files.")]
    data: String,
    // Options forwarded to `Manifest::open`.
    #[arrrg(nested)]
    mani: ManifestOptions,
    // Number of background worker threads to spawn (per the help string; defaults to 8).
    #[arrrg(optional, "Number of background worker threads to spawn.")]
    threads: usize,
}
1228
1229impl AnalogizeOptions {
1230 pub fn data(&self) -> &str {
1231 &self.data
1232 }
1233}
1234
1235impl Default for AnalogizeOptions {
1236 fn default() -> Self {
1237 Self {
1238 logs: "logs".to_string(),
1239 data: "data".to_string(),
1240 mani: ManifestOptions::default(),
1241 threads: 8,
1242 }
1243 }
1244}
1245
/// Handle to an analogize instance: the shared state plus the background thread spawned by
/// `Analogize::new`.
pub struct Analogize {
    /// State shared between this handle and the background thread.
    state: Arc<State>,
    /// Join handle of the background thread; held in an `Option` so `Drop` can take it and
    /// join.
    thread: Option<JoinHandle<()>>,
}
1252
1253impl Analogize {
1254 pub fn new(options: AnalogizeOptions) -> Result<Self, Error> {
1255 let logs = PathBuf::from(&options.logs);
1256 if !logs.exists() || !logs.is_dir() {
1257 return Err(Error::DirectoryNotFound {
1258 core: ErrorCore::default(),
1259 what: options.logs,
1260 });
1261 }
1262 let data = PathBuf::from(&options.data);
1263 let mani = Manifest::open(options.mani.clone(), data.clone())?;
1264 let state = Arc::new(State::new(options, logs, data, mani)?);
1265 let state_p = Arc::clone(&state);
1266 let thread = Some(std::thread::spawn(move || state_p.background()));
1267 let this = Self { state, thread };
1268 Ok(this)
1269 }
1270
1271 pub fn query(&self, query: Query) -> Result<Vec<Value>, Error> {
1272 let query = query.normalize();
1273 let docs = self.state.get_documents()?;
1274 let mut values = vec![];
1275 for doc in docs {
1276 let doc = doc.doc()?;
1277 let syms = self.state.syms.lock().unwrap();
1278 let mut records: Vec<RecordOffset> = doc.query(&syms, &query)?.into_iter().collect();
1279 records.sort();
1280 for record in records.into_iter() {
1281 let Ok(record) = doc.document.retrieve(record) else {
1282 continue;
1284 };
1285 let Some(value) = syms.reverse_translate(&record) else {
1286 continue;
1288 };
1289 values.push(value);
1290 }
1291 }
1292 Ok(values)
1293 }
1294
1295 pub fn exemplars(&self, num_results: usize) -> Result<Vec<Value>, Error> {
1296 let doc_ptrs = self.state.get_documents()?;
1297 let mut docs = vec![];
1298 for ptr in doc_ptrs.iter() {
1299 docs.push(ptr.doc()?);
1300 }
1301 let doc_refs: Vec<&CompressedDocument> = docs.iter().map(|d| &d.document).collect();
1302 let syms = self.state.syms.lock().unwrap();
1303 let markers: Vec<_> = syms.markers().collect();
1304 let mut values = vec![];
1305 for exemplar in scrunch::exemplars(&doc_refs, &markers).take(num_results) {
1306 if let Some(exemplar) = syms.reverse_translate_query(exemplar.text()) {
1307 values.push(exemplar);
1308 } else {
1309 }
1311 }
1312 Ok(values)
1313 }
1314
1315 pub fn correlates(&self, query: Query, num_results: usize) -> Result<Vec<Value>, Error> {
1316 let doc_ptrs = self.state.get_documents()?;
1317 let mut docs = vec![];
1318 for ptr in doc_ptrs.iter() {
1319 docs.push(ptr.doc()?);
1320 }
1321 let doc_refs: Vec<&CompressedDocument> = docs.iter().map(|d| &d.document).collect();
1322 let syms = self.state.syms.lock().unwrap();
1323 let markers: Vec<_> = syms.markers().collect();
1324 let mut offsets: HashMap<usize, HashSet<RecordOffset>> = HashMap::new();
1325 for (idx, doc) in docs.iter().enumerate() {
1326 let records = doc.query(&syms, &query)?;
1327 offsets.insert(idx, records);
1328 }
1329 let mut values = vec![];
1330 for exemplar in scrunch::correlate(&doc_refs, &markers, move |idx, offset| {
1331 offsets
1332 .get(&idx)
1333 .map(|r| r.get(&offset))
1334 .map(|o| o.is_some())
1335 .unwrap_or(false)
1336 })
1337 .take(num_results)
1338 {
1339 if let Some(exemplar) = syms.reverse_translate_query(exemplar.text()) {
1340 values.push(exemplar);
1341 } else {
1342 }
1344 }
1345 Ok(values)
1346 }
1347}
1348
1349impl Drop for Analogize {
1350 fn drop(&mut self) {
1351 self.state.done();
1352 self.thread.take().map(|t| t.join());
1353 }
1354}
1355
/// Combine the seconds and nanoseconds of `md`'s ctime into a single nanosecond count.
///
/// Uses wrapping arithmetic, so out-of-range values wrap instead of panicking.
fn ctime(md: &Metadata) -> i64 {
    const NANOS_PER_SEC: i64 = 1_000_000_000;
    let secs = md.ctime();
    let nanos = md.ctime_nsec();
    secs.wrapping_mul(NANOS_PER_SEC).wrapping_add(nanos)
}
1363
1364fn take_consistent_cut<P: AsRef<Path>>(
1365 dir: P,
1366 threshold_ns: i64,
1367) -> Result<(Vec<String>, i64), Error> {
1368 loop {
1369 let mut new_threshold_ns = threshold_ns;
1370 let md1 = metadata(dir.as_ref())?;
1371 let mut paths = vec![];
1372 for dirent in read_dir(dir.as_ref())? {
1373 let dirent = dirent?;
1374 let md = dirent.metadata()?;
1375 let ts_ns = ctime(&md);
1376 if ts_ns > threshold_ns {
1377 let mut path = dirent.path();
1378 if path.starts_with(dir.as_ref()) {
1379 path = path
1380 .strip_prefix(dir.as_ref())
1381 .map_err(|_| Error::InvalidPath {
1382 core: ErrorCore::default(),
1383 what: format!("path {} prefix won't strip", path.to_string_lossy()),
1384 })?
1385 .to_path_buf();
1386 }
1387 let display = path.to_string_lossy().to_string();
1388 if PathBuf::from(&display) != path {
1389 return Err(Error::InvalidPath {
1390 core: ErrorCore::default(),
1391 what: format!("path {} contains invalid characters", display),
1392 });
1393 }
1394 paths.push(display);
1395 new_threshold_ns = std::cmp::max(ts_ns, new_threshold_ns);
1396 }
1397 }
1398 let md2 = metadata(dir.as_ref())?;
1399 let ctime1 = ctime(&md1);
1400 let ctime2 = ctime(&md2);
1401 if ctime1 == ctime2 {
1402 return Ok((paths, new_threshold_ns));
1403 }
1404 }
1405}
1406
1407fn date_time_to_string(when: DateTime<Utc>) -> String {
1408 when.to_rfc3339_opts(chrono::format::SecondsFormat::Secs, true)
1409}
1410
// Unit tests for symbol translation, clue sealing, query normalization/conjunctions, and
// reverse translation.
#[cfg(test)]
mod tests {
    use indicio::{value, Clue};

    use super::*;

    // Translate `value` with an empty starting prefix and return the emitted symbols.
    fn test_case(sym_table: &mut SymbolTable, value: Value) -> Vec<u32> {
        let mut translated: Vec<u32> = vec![];
        sym_table.translate_recursive(&value, "", &mut translated);
        translated
    }

    // `{key: true}` emits three symbols and registers the keys `o` and `ok3keyT`.
    #[test]
    fn bool_true() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![0x110000u32, 0x110002, 0x110001];
        let returned = test_case(&mut sym_table, value!({key: true}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keyT".to_string(), 0x110002),
            ]),
            sym_table.symbols
        );
    }

    // Same shape as `bool_true`, but the registered key ends in `F`.
    #[test]
    fn bool_false() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![0x110000u32, 0x110002, 0x110001];
        let returned = test_case(&mut sym_table, value!({key: false}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keyF".to_string(), 0x110002),
            ]),
            sym_table.symbols
        );
    }

    // The eight small values are the big-endian bytes of `i64::MAX`.
    #[test]
    fn number_i64() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![
            0x110000u32,
            0x110002,
            127,
            255,
            255,
            255,
            255,
            255,
            255,
            255,
            0x110003,
            0x110001,
        ];
        let returned = test_case(&mut sym_table, value!({key: i64::MAX}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keyi".to_string(), 0x110002),
            ]),
            sym_table.symbols
        );
    }

    // The eight small values are the bytes of `u64::MAX` (all 255).
    #[test]
    fn number_u64() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![
            0x110000u32,
            0x110002,
            255,
            255,
            255,
            255,
            255,
            255,
            255,
            255,
            0x110003,
            0x110001,
        ];
        let returned = test_case(&mut sym_table, value!({key: u64::MAX}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keyu".to_string(), 0x110002),
            ]),
            sym_table.symbols
        );
    }

    // The eight small values are the big-endian bytes of the IEEE-754 bits of pi
    // (0x400921FB54442D18).
    #[test]
    fn number_f64() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![
            0x110000u32,
            0x110002,
            64,
            9,
            33,
            251,
            84,
            68,
            45,
            24,
            0x110003,
            0x110001,
        ];
        let returned = test_case(&mut sym_table, value!({key: std::f64::consts::PI}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keyf".to_string(), 0x110002),
            ]),
            sym_table.symbols
        );
    }

    // A string value is emitted as the code points of its characters.
    #[test]
    fn string() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![
            0x110000u32,
            0x110002,
            'v' as u32,
            'a' as u32,
            'l' as u32,
            'u' as u32,
            'e' as u32,
            0x110003,
            0x110001,
        ];
        let returned = test_case(&mut sym_table, value!({key: "value"}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keys".to_string(), 0x110002),
            ]),
            sym_table.symbols
        );
    }

    // Each array element is bracketed by 0x110004 / 0x110005, and the table gains the
    // `ok3keya` and `ok3keyas` keys.
    #[test]
    fn array() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![
            0x110000u32,
            0x110002,
            0x110004, 'v' as u32,
            'a' as u32,
            'l' as u32,
            'u' as u32,
            'e' as u32,
            '1' as u32, 0x110005,
            0x110004, 'v' as u32,
            'a' as u32,
            'l' as u32,
            'u' as u32,
            'e' as u32,
            '2' as u32, 0x110005,
            0x110003,
            0x110001, ];
        let returned = test_case(&mut sym_table, value!({key: ["value1", "value2"]}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keya".to_string(), 0x110002),
                ("ok3keyas".to_string(), 0x110004),
            ]),
            sym_table.symbols
        );
    }

    // A nested object registers a key for the inner field that is prefixed by the outer key
    // (`ok3keyok3keys`).
    #[test]
    fn object() {
        let mut sym_table = SymbolTable::default();
        let expected = vec![
            0x110000u32,
            0x110002,
            0x110004,
            'v' as u32,
            'a' as u32,
            'l' as u32,
            'u' as u32,
            'e' as u32,
            0x110005,
            0x110003,
            0x110001,
        ];
        let returned = test_case(&mut sym_table, value!({key: {key: "value"}}));
        assert_eq!(expected, returned);
        assert_eq!(
            HashMap::from([
                ("o".to_string(), 0x110000),
                ("ok3keyo".to_string(), 0x110002),
                ("ok3keyok3keys".to_string(), 0x110004),
            ]),
            sym_table.symbols
        );
    }

    // Parse an RFC 3339 timestamp into UTC; panics on malformed input.
    fn parse_dt(dt: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(dt).unwrap().to_utc()
    }

    // Build a clue at `dt` carrying `value`; the timestamp is nanoseconds / 1_000, i.e.
    // microseconds since the epoch.
    fn make_clue(dt: &str, value: Value) -> Clue {
        Clue {
            file: "test_file".to_string(),
            line: 42,
            level: 0,
            timestamp: parse_dt(dt).timestamp_nanos_opt().unwrap() as u64 / 1_000,
            value,
        }
    }

    // Converting an empty clue list is an error.
    #[test]
    fn seal_empty() {
        let mut sym_table = SymbolTable::default();
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        assert!(convert_clues_to_analogize_inner(&mut sym_table, start_time, vec![]).is_err());
    }

    // One clue, 10ms after `start_time`.
    #[test]
    fn seal_record_in_first_second() {
        let mut sym_table = SymbolTable::default();
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        let clues = vec![make_clue("2024-02-16T15:10:00.01Z", value!({key: "value"}))];
        let (_, record_boundaries, second_boundaries) =
            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
        assert_eq!(vec![0], record_boundaries);
        assert_eq!(vec![0, 1], second_boundaries);
    }

    // Three clues in three consecutive seconds, starting in the second of `start_time`.
    #[test]
    fn seal_one_record_per_second() {
        let mut sym_table = SymbolTable::default();
        let clues = vec![
            make_clue("2024-02-16T15:10:00.01Z", value!({key: "value0"})),
            make_clue("2024-02-16T15:10:01.01Z", value!({key: "value1"})),
            make_clue("2024-02-16T15:10:02.01Z", value!({key: "value2"})),
        ];
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        let (_, record_boundaries, second_boundaries) =
            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
        assert_eq!(vec![0, 53, 106], record_boundaries);
        assert_eq!(vec![0, 1, 2, 3], second_boundaries);
    }

    // The first clue arrives one second after `start_time`; an extra boundary appears at
    // offset 1 for the empty first second.
    #[test]
    fn seal_gap_at_beginning() {
        let mut sym_table = SymbolTable::default();
        let clues = vec![
            make_clue("2024-02-16T15:10:01.01Z", value!({key: "value1"})),
            make_clue("2024-02-16T15:10:02.01Z", value!({key: "value2"})),
            make_clue("2024-02-16T15:10:03.01Z", value!({key: "value3"})),
        ];
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        let (_, record_boundaries, second_boundaries) =
            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
        assert_eq!(vec![0, 1, 54, 107], record_boundaries);
        assert_eq!(vec![0, 1, 2, 3, 4], second_boundaries);
    }

    // No clue falls in the second after the first record; an extra boundary appears at
    // offset 54.
    #[test]
    fn seal_gap_after_first_record() {
        let mut sym_table = SymbolTable::default();
        let clues = vec![
            make_clue("2024-02-16T15:10:00.01Z", value!({key: "value0"})),
            make_clue("2024-02-16T15:10:02.01Z", value!({key: "value2"})),
            make_clue("2024-02-16T15:10:03.01Z", value!({key: "value3"})),
        ];
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        let (_, record_boundaries, second_boundaries) =
            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
        assert_eq!(vec![0, 53, 54, 107], record_boundaries);
        assert_eq!(vec![0, 1, 2, 3, 4], second_boundaries);
    }

    // Three clues share the first second and a fourth falls in the next one.
    #[test]
    fn seal_multiple_records_per_second() {
        let mut sym_table = SymbolTable::default();
        let clues = vec![
            make_clue("2024-02-16T15:10:00.01Z", value!({key: "value1"})),
            make_clue("2024-02-16T15:10:00.02Z", value!({key: "value2"})),
            make_clue("2024-02-16T15:10:00.03Z", value!({key: "value3"})),
            make_clue("2024-02-16T15:10:01.04Z", value!({key: "value4"})),
        ];
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        let (_, record_boundaries, second_boundaries) =
            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
        assert_eq!(vec![0, 53, 106, 159], record_boundaries);
        assert_eq!(vec![2, 3, 4], second_boundaries);
    }

    // Three clues share the second after `start_time`, then one second with no clues, then a
    // fourth clue.
    #[test]
    fn seal_multiple_records_per_second_with_gaps() {
        let mut sym_table = SymbolTable::default();
        let clues = vec![
            make_clue("2024-02-16T15:10:01.01Z", value!({key: "value1"})),
            make_clue("2024-02-16T15:10:01.02Z", value!({key: "value2"})),
            make_clue("2024-02-16T15:10:01.03Z", value!({key: "value3"})),
            make_clue("2024-02-16T15:10:03.04Z", value!({key: "value4"})),
        ];
        let start_time = parse_dt("2024-02-16T15:10:00Z");
        let (_, record_boundaries, second_boundaries) =
            convert_clues_to_analogize_inner(&mut sym_table, start_time, clues).unwrap();
        assert_eq!(vec![0, 1, 54, 107, 160, 161], record_boundaries);
        assert_eq!(vec![0, 3, 4, 5, 6], second_boundaries);
    }

    // Scalar queries are returned unchanged by `normalize`.
    #[test]
    fn query_no_normalization_expected() {
        assert_eq!(Query::Any, Query::Any.normalize());
        assert_eq!(Query::True, Query::True.normalize());
        assert_eq!(Query::False, Query::False.normalize());
        assert_eq!(Query::I64(42), Query::I64(42).normalize());
        assert_eq!(Query::U64(42), Query::U64(42).normalize());
        assert_eq!(Query::F64(42.0), Query::F64(42.0).normalize());
        assert_eq!(
            Query::String("Hello World".to_string()),
            Query::String("Hello World".to_string()).normalize()
        );
    }

    // An `or` inside an array is lifted into an `or` of whole arrays.
    #[test]
    fn query_normalize_array() {
        assert_eq!(Query::must("[]"), Query::must("[]").normalize());
        assert_eq!(
            Query::must("[true, false, \"Hello World\"]"),
            Query::must("[true, false, \"Hello World\"]").normalize()
        );
        assert_eq!(
            Query::must("[true, \"Hello World\"] or [false, \"Hello World\"]"),
            Query::must("[true or false, \"Hello World\"]").normalize()
        );
        assert_eq!(
            Query::must("[true, false] or [true, \"Hello World\"]"),
            Query::must("[true, false or \"Hello World\"]").normalize()
        );
    }

    // An `or` inside an object field is lifted into an `or` of whole objects.
    #[test]
    fn query_normalize_object() {
        assert_eq!(Query::must("{}"), Query::must("{}").normalize());
        assert_eq!(
            Query::must("{\"a\": true, \"b\": \"Hello World\"}"),
            Query::must("{\"a\": true, \"b\": \"Hello World\"}").normalize()
        );
        assert_eq!(
            Query::must(
                "{\"a\": true, \"b\": \"Hello World\"} or {\"a\": false, \"b\": \"Hello World\"}"
            ),
            Query::must("{\"a\": true or false, \"b\": \"Hello World\"}").normalize()
        );
        assert_eq!(
            Query::must("{\"a\": true, \"b\": \"Hello World\"} or {\"a\": true, \"b\": 42}"),
            Query::must("{\"a\": true, \"b\": \"Hello World\" or 42}").normalize()
        );
    }

    // Nested `or`s at several depths all lift to a single top-level `or`.
    #[test]
    fn query_normalize_nested() {
        assert_eq!(
            Query::must(
                "
                {\"a\": 42, \"b\": [\"Hello World\"]} or
                {\"a\": 42, \"b\": [13]} or
                {\"a\": 42, \"b\": [false]} or
                {\"a\": 42, \"b\": [[\"x\"]]} or
                {\"a\": 42, \"b\": [[\"y\"]]}
                "
                .trim()
            ),
            Query::must(
                "{\"a\": 42, \"b\": [\"Hello World\" or 13] or [false or [\"x\" or \"y\"]]}"
            )
            .normalize()
        );
    }

    // A scalar query is its own single conjunction.
    #[test]
    fn query_no_conjunctions() {
        assert_eq!(
            vec![Query::Any],
            Query::Any.conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![Query::True],
            Query::True.conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![Query::False],
            Query::False.conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![Query::I64(42)],
            Query::I64(42).conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![Query::U64(42)],
            Query::U64(42).conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![Query::F64(42.0)],
            Query::F64(42.0).conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![Query::String("Hello World".to_string())],
            Query::String("Hello World".to_string())
                .conjunctions()
                .collect::<Vec<_>>()
        );
    }

    // An array splits into one single-element array per element.
    #[test]
    fn query_conjunctions_array() {
        assert_eq!(
            vec![Query::must("[]")],
            Query::must("[]").conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![
                Query::must("[true]"),
                Query::must("[false]"),
                Query::must("[\"Hello World\"]")
            ],
            Query::must("[true, false, \"Hello World\"]")
                .conjunctions()
                .collect::<Vec<_>>()
        );
    }

    // An object splits into one single-field object per field.
    #[test]
    fn query_conjunctions_object() {
        assert_eq!(
            vec![Query::must("{}")],
            Query::must("{}").conjunctions().collect::<Vec<_>>()
        );
        assert_eq!(
            vec![
                Query::must("{\"a\": true}"),
                Query::must("{\"b\": \"Hello World\"}")
            ],
            Query::must("{\"a\": true, \"b\": \"Hello World\"}")
                .conjunctions()
                .collect::<Vec<_>>()
        );
    }

    // Nested arrays and objects split recursively, one leaf per conjunction.
    #[test]
    fn query_conjucnions_nested() {
        assert_eq!(vec![
            Query::must("{\"a\": 42}"),
            Query::must("{\"b\": [\"Hello World\"]}"),
            Query::must("{\"b\": [\"Goodbye World\"]}"),
            Query::must("{\"c\": [false]}"),
            Query::must("{\"c\": [[\"x\"]]}"),
            Query::must("{\"c\": [[\"y\"]]}"),
        ], Query::must("{\"a\": 42, \"b\": [\"Hello World\", \"Goodbye World\"], \"c\": [false, [\"x\", \"y\"]]}").conjunctions().collect::<Vec<_>>());
    }

    // Translate `value` with a fresh symbol table and assert that reverse translation
    // returns the original value.
    fn do_reverse_translate(value: Value) {
        let mut sym_table = SymbolTable::default();
        let mut text = vec![];
        sym_table.translate_recursive(&value, "", &mut text);
        assert_eq!(Some(value), sym_table.reverse_translate(&text));
    }

    #[test]
    fn reverse_translate_bool() {
        do_reverse_translate(Value::Bool(true));
        do_reverse_translate(Value::Bool(false));
    }

    #[test]
    fn reverse_translate_numbers() {
        do_reverse_translate(Value::I64(42));
        do_reverse_translate(Value::U64(42));
        do_reverse_translate(Value::F64(std::f64::consts::PI));
    }

    #[test]
    fn reverse_translate_string() {
        do_reverse_translate(value!("hello world"));
    }

    // Arrays of growing length, including the empty array and mixed element types.
    #[test]
    fn reverse_translate_array() {
        do_reverse_translate(value!([]));
        do_reverse_translate(value!(["hello world"]));
        do_reverse_translate(value!(["hello world", true]));
        do_reverse_translate(value!(["hello world", true, 42]));
    }

    // Objects of growing size, including the empty object and mixed field types.
    #[test]
    fn reverse_translate_object() {
        do_reverse_translate(value!({}));
        do_reverse_translate(value!({greeting: "hello world"}));
        do_reverse_translate(value!({greeting: "hello world", success: true}));
        do_reverse_translate(value!({greeting: "hello world", success: true, number: 42}));
    }

    // Arrays nested inside an object round-trip as well.
    #[test]
    fn reverse_translate_nesting() {
        do_reverse_translate(
            value!({greetings: ["hi", "howdy", "hello world"], numbers: [std::f64::consts::PI, 42]}),
        );
    }
}