//! Pattern matching over events.
//!
//! [`Quamina`] stores patterns under caller-chosen identifiers and, given an
//! event, returns the identifiers whose patterns match it. Use
//! [`QuaminaBuilder`] to configure pattern limits, a custom [`Flattener`], or
//! automatic rebuilding before constructing a matcher.
#![deny(missing_docs)]
// When the crate is built under Miri, all warnings are allowed.
#![cfg_attr(miri, allow(warnings))]
#[doc(hidden)]
pub mod automaton;
mod case_folding;
#[doc(hidden)]
pub mod flatten_json;
mod flattener;
#[doc(hidden)]
pub mod json;
#[doc(hidden)]
pub mod numbits;
#[doc(hidden)]
pub mod regexp;
#[doc(hidden)]
pub mod segments_tree;
mod unicode_categories;
#[cfg(test)]
mod regexp_samples;
#[cfg(kani)]
mod kani_proofs;
pub use crate::flatten_json::ArrayPos;
pub use crate::flattener::{Flattener, JsonFlattener, OwnedField, SegmentsTreeTracker};
use automaton::{NfaBuffers, ThreadSafeCoreMatcher};
use json::Matcher;
use parking_lot::Mutex;
use rustc_hash::{FxHashMap, FxHashSet};
use segments_tree::SegmentsTree;
use std::cell::RefCell;
use std::fmt;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering};
// Per-thread scratch state. Each thread lazily creates its own instance and
// keeps it for subsequent matching calls made on that thread.
thread_local! {
    /// State of the built-in JSON flattener (used when no custom flattener is set).
    static TL_FLATTENER: RefCell<flatten_json::State> =
        RefCell::new(flatten_json::State::new());
}
thread_local! {
    /// Buffers lent to the automaton for the duration of one matching call.
    static TL_NFA_BUFS: RefCell<NfaBuffers> = RefCell::new(NfaBuffers::new());
}
/// Running counters of match results that were returned to the caller versus
/// results that were dropped because their pattern id had been deleted.
///
/// Both counters are atomics, so they can be updated through a shared
/// reference while events are being matched.
#[derive(Debug, Default)]
pub struct PrunerStats {
    /// Matches that passed the deleted-id filter.
    emitted: AtomicU64,
    /// Matches that were discarded by the deleted-id filter.
    filtered: AtomicU64,
}

impl PrunerStats {
    /// Minimum number of counted matches (emitted + filtered) before a
    /// rebuild is ever suggested.
    const MIN_ACTIVITY: u64 = 1000;
    /// A rebuild is suggested once `filtered * FILTERED_WEIGHT > emitted`,
    /// i.e. once filtered matches exceed one fifth of emitted matches.
    const FILTERED_WEIGHT: u64 = 5;

    /// Creates a zeroed set of counters.
    fn new() -> Self {
        Self::default()
    }

    /// Sets both counters back to zero.
    fn reset(&self) {
        self.emitted.store(0, Ordering::Relaxed);
        self.filtered.store(0, Ordering::Relaxed);
    }

    /// Adds `count` to the emitted counter.
    fn add_emitted(&self, count: u64) {
        self.emitted.fetch_add(count, Ordering::Relaxed);
    }

    /// Adds `count` to the filtered counter.
    fn add_filtered(&self, count: u64) {
        self.filtered.fetch_add(count, Ordering::Relaxed);
    }

    /// Number of matches returned to callers since the last reset.
    pub fn emitted(&self) -> u64 {
        self.emitted.load(Ordering::Relaxed)
    }

    /// Number of matches suppressed since the last reset.
    pub fn filtered(&self) -> u64 {
        self.filtered.load(Ordering::Relaxed)
    }

    /// Reports whether enough filtered matches have accumulated, relative to
    /// emitted ones, to make a rebuild worthwhile.
    ///
    /// Returns `false` while fewer than [`Self::MIN_ACTIVITY`] matches have
    /// been counted in total, and also while nothing has been emitted.
    fn should_rebuild(&self) -> bool {
        let emitted = self.emitted();
        let filtered = self.filtered();

        // Saturate instead of `+`: a plain addition would panic in debug
        // builds and wrap in release builds (wrongly reporting low activity)
        // once the counters get close to `u64::MAX`.
        if emitted.saturating_add(filtered) < Self::MIN_ACTIVITY {
            return false;
        }
        // With nothing emitted there is no ratio to compare against.
        if emitted == 0 {
            return false;
        }
        // An overflowing product is certainly larger than `emitted`.
        filtered
            .checked_mul(Self::FILTERED_WEIGHT)
            .is_none_or(|weighted| weighted > emitted)
    }
}

impl Clone for PrunerStats {
    /// Copies the current counter values into fresh atomics. The two loads
    /// are independent, so the copy is not an atomic snapshot of the pair.
    fn clone(&self) -> Self {
        Self {
            emitted: AtomicU64::new(self.emitted()),
            filtered: AtomicU64::new(self.filtered()),
        }
    }
}
/// One parsed pattern as it is kept for later replay: each field path mapped
/// to the matchers registered for that path.
type PatternDef = FxHashMap<String, Vec<Matcher>>;
/// Errors reported by this crate.
#[derive(Debug)]
pub enum QuaminaError {
    /// JSON input was rejected; the payload is a human-readable explanation.
    InvalidJson(String),
    /// A pattern (or a conflicting builder option) was rejected; the payload
    /// explains why.
    InvalidPattern(String),
    /// Input was not valid UTF-8.
    InvalidUtf8,
    /// The media type named in the payload is not supported.
    UnsupportedMediaType(String),
    /// A complexity or memory limit was hit; the payload explains which.
    PatternTooComplex(String),
}

impl fmt::Display for QuaminaError {
    /// Writes a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidUtf8 => f.write_str("invalid UTF-8"),
            Self::InvalidJson(detail) => write!(f, "invalid JSON: {detail}"),
            Self::InvalidPattern(detail) => write!(f, "invalid pattern: {detail}"),
            Self::PatternTooComplex(detail) => write!(f, "pattern too complex: {detail}"),
            Self::UnsupportedMediaType(media_type) => {
                write!(f, "media type \"{media_type}\" is not supported by Quamina")
            }
        }
    }
}
/// Resource limits applied to patterns and to the automaton built from them.
#[derive(Debug, Clone)]
pub struct PatternLimits {
    /// Upper bound on pattern depth; handed to the pattern parser as part of
    /// these limits.
    pub max_pattern_depth: usize,
    /// Upper bound on the number of fields in one pattern; handed to the
    /// pattern parser as part of these limits.
    pub max_fields_per_pattern: usize,
    /// Memory budget, in bytes, given to the automaton when it is created.
    pub arena_byte_budget: usize,
    /// Per-pattern state limit given to the automaton when it is created.
    pub max_states_per_pattern: usize,
}

impl Default for PatternLimits {
    /// Depth 256, 256 fields per pattern, a 10 MiB arena budget and
    /// 1024 states per pattern.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            max_pattern_depth: 256,
            max_fields_per_pattern: 256,
            arena_byte_budget: 10 * MIB,
            max_states_per_pattern: 1024,
        }
    }
}
// `QuaminaError` wraps no underlying error value, so the trait's default
// method implementations are used as-is.
impl std::error::Error for QuaminaError {}
/// Step-by-step configuration for a [`Quamina`] matcher.
///
/// A media type and a custom flattener are mutually exclusive: whichever is
/// set first causes the other setter to return an error.
pub struct QuaminaBuilder<X: Clone + Eq + Hash + Send + Sync = String> {
    /// Whether `maybe_rebuild` is allowed to rebuild on the built matcher.
    auto_rebuild_enabled: bool,
    /// Set once `with_media_type` has accepted a media type.
    media_type_validated: bool,
    /// Flattener supplied through `with_flattener`, if any.
    custom_flattener: Option<Box<dyn flattener::Flattener>>,
    /// Limits forwarded to the built matcher.
    pattern_limits: PatternLimits,
    /// Ties the builder to the pattern-id type without storing one.
    _phantom: std::marker::PhantomData<X>,
}

impl<X: Clone + Eq + Hash + Send + Sync> QuaminaBuilder<X> {
    /// Starts a builder with default limits, no custom flattener and
    /// auto-rebuild enabled.
    #[must_use]
    pub fn new() -> Self {
        Self {
            pattern_limits: PatternLimits::default(),
            custom_flattener: None,
            media_type_validated: false,
            auto_rebuild_enabled: true,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Declares the media type of the events to be matched.
    ///
    /// Only `"application/json"` is accepted.
    ///
    /// # Errors
    ///
    /// * [`QuaminaError::InvalidPattern`] if a custom flattener was already set.
    /// * [`QuaminaError::UnsupportedMediaType`] for any other media type.
    pub fn with_media_type(mut self, media_type: &str) -> Result<Self, QuaminaError> {
        if self.custom_flattener.is_some() {
            return Err(QuaminaError::InvalidPattern(
                "flattener already specified".into(),
            ));
        }
        if media_type != "application/json" {
            return Err(QuaminaError::UnsupportedMediaType(media_type.to_string()));
        }
        self.media_type_validated = true;
        Ok(self)
    }

    /// Installs a custom flattener to be used instead of the built-in one.
    ///
    /// # Errors
    ///
    /// [`QuaminaError::InvalidPattern`] if a media type was already declared
    /// or a flattener was already installed.
    pub fn with_flattener(
        mut self,
        flattener: Box<dyn flattener::Flattener>,
    ) -> Result<Self, QuaminaError> {
        // The media-type conflict is reported in preference to the duplicate.
        match (self.media_type_validated, self.custom_flattener.is_some()) {
            (true, _) => Err(QuaminaError::InvalidPattern(
                "media-type already specified".into(),
            )),
            (false, true) => Err(QuaminaError::InvalidPattern(
                "flattener specified more than once".into(),
            )),
            (false, false) => {
                self.custom_flattener = Some(flattener);
                Ok(self)
            }
        }
    }

    /// Overrides [`PatternLimits::max_pattern_depth`].
    ///
    /// # Panics
    ///
    /// Panics if `depth` is zero.
    #[must_use]
    pub fn with_max_pattern_depth(mut self, depth: usize) -> Self {
        assert!(depth > 0, "max_pattern_depth must be at least 1");
        self.pattern_limits.max_pattern_depth = depth;
        self
    }

    /// Overrides [`PatternLimits::max_fields_per_pattern`].
    ///
    /// # Panics
    ///
    /// Panics if `count` is zero.
    #[must_use]
    pub fn with_max_fields_per_pattern(mut self, count: usize) -> Self {
        assert!(count > 0, "max_fields_per_pattern must be at least 1");
        self.pattern_limits.max_fields_per_pattern = count;
        self
    }

    /// Overrides [`PatternLimits::arena_byte_budget`].
    ///
    /// # Panics
    ///
    /// Panics if `budget` is zero.
    #[must_use]
    pub fn with_arena_byte_budget(mut self, budget: usize) -> Self {
        assert!(budget > 0, "arena_byte_budget must be at least 1");
        self.pattern_limits.arena_byte_budget = budget;
        self
    }

    /// Overrides [`PatternLimits::max_states_per_pattern`].
    ///
    /// # Panics
    ///
    /// Panics if `max_states` is zero.
    #[must_use]
    pub fn with_max_states_per_pattern(mut self, max_states: usize) -> Self {
        assert!(max_states > 0, "max_states_per_pattern must be at least 1");
        self.pattern_limits.max_states_per_pattern = max_states;
        self
    }

    /// Enables or disables automatic rebuilding on the built matcher.
    #[must_use]
    pub const fn with_auto_rebuild(mut self, enabled: bool) -> Self {
        self.auto_rebuild_enabled = enabled;
        self
    }

    /// Consumes the builder and produces an empty matcher with the configured
    /// limits, flattener and auto-rebuild setting.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn build(self) -> Result<Quamina<X>, QuaminaError> {
        let Self {
            auto_rebuild_enabled,
            custom_flattener,
            pattern_limits,
            ..
        } = self;
        let automaton = ThreadSafeCoreMatcher::with_limits(
            pattern_limits.arena_byte_budget,
            pattern_limits.max_states_per_pattern,
        );
        Ok(Quamina {
            automaton,
            pattern_defs: FxHashMap::default(),
            deleted_patterns: FxHashSet::default(),
            segments_tree: SegmentsTree::new(),
            custom_flattener: custom_flattener.map(Mutex::new),
            pruner_stats: PrunerStats::new(),
            auto_rebuild_enabled,
            pattern_limits,
        })
    }
}

impl<X: Clone + Eq + Hash + Send + Sync> Default for QuaminaBuilder<X> {
    /// Same as [`QuaminaBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A set of patterns, each registered under an identifier of type `X`, that
/// can be matched against events.
pub struct Quamina<X: Clone + Eq + Hash + Send + Sync = String> {
    /// Automaton that performs the actual matching.
    automaton: ThreadSafeCoreMatcher<X>,
    /// Parsed patterns per id, kept so the automaton can be rebuilt by replay.
    pattern_defs: FxHashMap<X, Vec<PatternDef>>,
    /// Ids that were deleted but not yet purged; their matches are filtered
    /// out of results.
    deleted_patterns: FxHashSet<X>,
    /// Field paths referenced by the added patterns, passed to the flattener.
    segments_tree: SegmentsTree,
    /// Optional custom flattener; locked for the duration of each flatten call.
    custom_flattener: Option<Mutex<Box<dyn flattener::Flattener>>>,
    /// Emitted/filtered counters that drive the rebuild heuristic.
    pruner_stats: PrunerStats,
    /// Whether `maybe_rebuild` is allowed to rebuild.
    auto_rebuild_enabled: bool,
    /// Limits applied when parsing patterns and creating automatons.
    pattern_limits: PatternLimits,
}

impl<X: Clone + Eq + Hash + Send + Sync> Clone for Quamina<X> {
    /// Produces an independent matcher.
    ///
    /// The automaton is not copied: a new one is created with the current
    /// memory budget and every pattern whose id is not marked deleted is
    /// replayed into it. The remaining bookkeeping (definitions, deleted ids,
    /// segment tree, statistics, settings) is cloned, and a custom flattener,
    /// if present, is duplicated through its `copy` method.
    fn clone(&self) -> Self {
        let state_limit = self.pattern_limits.max_states_per_pattern;
        let automaton =
            ThreadSafeCoreMatcher::with_limits(self.automaton.memory_budget(), state_limit);
        self.replay_patterns_into(&automaton);

        let custom_flattener = self.custom_flattener.as_ref().map(|shared| {
            // The lock is held only while the flattener copies itself.
            let duplicate = shared.lock().copy();
            Mutex::new(duplicate)
        });

        Self {
            automaton,
            pattern_defs: self.pattern_defs.clone(),
            deleted_patterns: self.deleted_patterns.clone(),
            segments_tree: self.segments_tree.clone(),
            custom_flattener,
            pruner_stats: self.pruner_stats.clone(),
            auto_rebuild_enabled: self.auto_rebuild_enabled,
            pattern_limits: self.pattern_limits.clone(),
        }
    }
}
impl<X: Clone + Eq + Hash + Send + Sync> Quamina<X> {
    /// Creates an empty matcher with [`PatternLimits::default`], no custom
    /// flattener and auto-rebuild enabled.
    #[must_use]
    pub fn new() -> Self {
        let limits = PatternLimits::default();
        Self {
            automaton: ThreadSafeCoreMatcher::with_limits(
                limits.arena_byte_budget,
                limits.max_states_per_pattern,
            ),
            pattern_defs: FxHashMap::default(),
            deleted_patterns: FxHashSet::default(),
            segments_tree: SegmentsTree::new(),
            custom_flattener: None,
            pruner_stats: PrunerStats::new(),
            auto_rebuild_enabled: true,
            pattern_limits: limits,
        }
    }

    /// Parses `pattern_json` and registers it under the identifier `x`.
    ///
    /// Several patterns may share one identifier. If `x` was previously
    /// deleted and not yet purged, the matcher is rebuilt first so that the
    /// deleted patterns do not come back to life alongside the new one; this
    /// happens regardless of the auto-rebuild setting.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the pattern parser or by the automaton
    /// when it rejects the pattern.
    pub fn add_pattern(&mut self, x: X, pattern_json: &str) -> Result<(), QuaminaError> {
        // Parse before touching any state so a malformed pattern is a no-op.
        let fields = json::parse_pattern(pattern_json, &self.pattern_limits)?;
        let pattern_fields: Vec<(String, Vec<Matcher>)> = fields
            .iter()
            .map(|(path, matchers)| (path.clone(), matchers.clone()))
            .collect();

        // The old definitions for a deleted id are still present in both the
        // automaton and `pattern_defs`. Merely un-marking the id would make
        // them match again, so purge them before adding the new pattern.
        if self.deleted_patterns.contains(&x) {
            self.rebuild();
        }

        self.automaton.add_pattern(x.clone(), &pattern_fields)?;
        for field_path in fields.keys() {
            // The segment tree takes path segments separated by newlines.
            let segment_path = field_path.replace('.', "\n");
            self.segments_tree.add(&segment_path);
        }
        self.pattern_defs.entry(x).or_default().push(fields);
        Ok(())
    }

    /// Returns the identifiers of all non-deleted patterns that match `event`.
    ///
    /// The event is flattened with the custom flattener if one was
    /// configured, otherwise with the built-in per-thread flattener. Fields
    /// are sorted by path before being handed to the automaton.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the flattener.
    pub fn matches_for_event(&self, event: &[u8]) -> Result<Vec<X>, QuaminaError> {
        if let Some(ref custom_flattener_mutex) = self.custom_flattener {
            return self.matches_for_event_custom_flattener(event, custom_flattener_mutex);
        }
        TL_FLATTENER.with(|flattener_cell| {
            TL_NFA_BUFS.with(|bufs_cell| {
                let mut flattener = flattener_cell.borrow_mut();
                let mut bufs = bufs_cell.borrow_mut();
                let streaming_fields = flattener.flatten(event, &self.segments_tree)?;
                streaming_fields.sort_unstable_by(|a, b| a.path.cmp(&b.path));
                let raw_matches = self
                    .automaton
                    .matches_for_fields_direct(streaming_fields, &mut bufs);
                Ok(self.filter_deleted_matches(raw_matches))
            })
        })
    }

    /// Matching path used when a custom flattener is configured.
    ///
    /// The flattener lock is released as soon as flattening is done; the
    /// owned fields it produced are then converted into the automaton's field
    /// representation, sorted by path and matched.
    fn matches_for_event_custom_flattener(
        &self,
        event: &[u8],
        custom_flattener_mutex: &Mutex<Box<dyn flattener::Flattener>>,
    ) -> Result<Vec<X>, QuaminaError> {
        use std::sync::Arc;
        let mut custom_flattener = custom_flattener_mutex.lock();
        let owned_fields = custom_flattener.flatten(event, &self.segments_tree)?;
        // Do not hold the flattener lock while the automaton runs.
        drop(custom_flattener);
        let mut streaming_fields: Vec<flatten_json::Field<'static>> = owned_fields
            .into_iter()
            .map(|f| flatten_json::Field {
                path: Arc::from(f.path.as_slice()),
                val: flatten_json::FieldValue::Owned(f.val),
                array_trail: f.array_trail.into(),
                is_number: f.is_number,
            })
            .collect();
        streaming_fields.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        let matches = TL_NFA_BUFS.with(|bufs_cell| {
            let mut bufs = bufs_cell.borrow_mut();
            let raw_matches = self
                .automaton
                .matches_for_fields_direct(&streaming_fields, &mut bufs);
            self.filter_deleted_matches(raw_matches)
        });
        Ok(matches)
    }

    /// Adds every stored pattern whose id is not marked deleted to `automaton`.
    ///
    /// # Panics
    ///
    /// Panics if the automaton rejects a pattern; stored patterns were
    /// accepted once already, so this is treated as a broken invariant.
    fn replay_patterns_into(&self, automaton: &ThreadSafeCoreMatcher<X>) {
        for (id, patterns) in &self.pattern_defs {
            if self.deleted_patterns.contains(id) {
                continue;
            }
            for fields in patterns {
                let pattern_fields: Vec<(String, Vec<Matcher>)> =
                    fields.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
                automaton
                    .add_pattern(id.clone(), &pattern_fields)
                    .expect("pre-validated pattern should not fail on rebuild");
            }
        }
    }

    /// Removes ids marked deleted from `raw_matches` and updates the
    /// emitted/filtered counters accordingly.
    fn filter_deleted_matches(&self, raw_matches: Vec<X>) -> Vec<X> {
        if self.deleted_patterns.is_empty() {
            // Nothing can be filtered; skip the per-id lookups.
            self.pruner_stats.add_emitted(raw_matches.len() as u64);
            raw_matches
        } else {
            let raw_count = raw_matches.len();
            let filtered: Vec<X> = raw_matches
                .into_iter()
                .filter(|x| !self.deleted_patterns.contains(x))
                .collect();
            let filtered_count = raw_count - filtered.len();
            self.pruner_stats.add_emitted(filtered.len() as u64);
            self.pruner_stats.add_filtered(filtered_count as u64);
            filtered
        }
    }

    /// Borrows the underlying automaton.
    #[doc(hidden)]
    pub const fn automaton(&self) -> &ThreadSafeCoreMatcher<X> {
        &self.automaton
    }

    /// Borrows the segment tree built from the added patterns' field paths.
    #[doc(hidden)]
    pub const fn segments_tree(&self) -> &SegmentsTree {
        &self.segments_tree
    }

    /// Flattens `event` with the built-in per-thread flattener and returns
    /// the number of fields produced, without matching. A configured custom
    /// flattener is not consulted.
    #[doc(hidden)]
    pub fn flatten_only(&self, event: &[u8]) -> Result<usize, QuaminaError> {
        TL_FLATTENER.with(|flattener_cell| {
            let mut flattener = flattener_cell.borrow_mut();
            let fields = flattener.flatten(event, &self.segments_tree)?;
            Ok(fields.len())
        })
    }

    /// Marks every pattern registered under `x` as deleted.
    ///
    /// The patterns stay in the automaton until the next rebuild; until then
    /// their matches are filtered out of results. Unknown or already deleted
    /// ids are ignored.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn delete_patterns(&mut self, x: &X) -> Result<(), QuaminaError> {
        if !self.pattern_defs.contains_key(x) || self.deleted_patterns.contains(x) {
            return Ok(());
        }
        self.deleted_patterns.insert(x.clone());
        Ok(())
    }

    /// Returns `true` if at least one non-deleted pattern matches `event`.
    ///
    /// # Errors
    ///
    /// Same as [`Quamina::matches_for_event`].
    pub fn has_matches(&self, event: &[u8]) -> Result<bool, QuaminaError> {
        Ok(!self.matches_for_event(event)?.is_empty())
    }

    /// Returns the number of entries [`Quamina::matches_for_event`] would
    /// return for `event`.
    ///
    /// # Errors
    ///
    /// Same as [`Quamina::matches_for_event`].
    pub fn count_matches(&self, event: &[u8]) -> Result<usize, QuaminaError> {
        Ok(self.matches_for_event(event)?.len())
    }

    /// Number of distinct identifiers that have patterns and are not marked
    /// deleted.
    pub fn pattern_count(&self) -> usize {
        self.pattern_defs
            .keys()
            .filter(|k| !self.deleted_patterns.contains(*k))
            .count()
    }

    /// Returns `true` if [`Quamina::pattern_count`] is zero.
    pub fn is_empty(&self) -> bool {
        self.pattern_count() == 0
    }

    /// Borrows the emitted/filtered counters.
    pub const fn pruner_stats(&self) -> &PrunerStats {
        &self.pruner_stats
    }

    /// Returns the arena statistics reported by the automaton.
    pub fn arena_stats(&self) -> automaton::arena::Stats {
        self.automaton.arena_stats()
    }

    /// Returns `(memory budget, current memory usage)` as reported by the
    /// automaton.
    #[must_use]
    pub fn get_memory_budget(&self) -> (usize, usize) {
        (
            self.automaton.memory_budget(),
            self.automaton.current_memory_usage(),
        )
    }

    /// Changes the automaton's memory budget and returns the memory usage
    /// that was measured before the change.
    ///
    /// A `budget` of 0 is exempt from the usage check.
    /// NOTE(review): 0 presumably means "no limit" — confirm against the
    /// automaton implementation.
    ///
    /// # Errors
    ///
    /// [`QuaminaError::PatternTooComplex`] if a non-zero `budget` is smaller
    /// than the memory currently in use; the budget is left unchanged.
    pub fn set_memory_budget(&self, budget: usize) -> Result<usize, QuaminaError> {
        let current = self.automaton.current_memory_usage();
        if budget != 0 && budget < current {
            return Err(QuaminaError::PatternTooComplex(format!(
                "set_memory_budget: requested budget ({budget} bytes) is smaller \
than the memory currently in use ({current} bytes)"
            )));
        }
        self.automaton.set_memory_budget(budget);
        Ok(current)
    }

    /// Enables or disables rebuilding from [`Quamina::maybe_rebuild`].
    pub const fn set_auto_rebuild(&mut self, enabled: bool) {
        self.auto_rebuild_enabled = enabled;
    }

    /// Reports whether [`Quamina::maybe_rebuild`] is allowed to rebuild.
    pub const fn auto_rebuild_enabled(&self) -> bool {
        self.auto_rebuild_enabled
    }

    /// Purges all deleted patterns by building a new automaton from the
    /// surviving definitions, and resets the statistics.
    ///
    /// Returns the number of deleted identifiers that were purged; returns 0
    /// without doing any work when nothing is marked deleted. The segment
    /// tree is not pruned by this call.
    ///
    /// # Panics
    ///
    /// Panics if the new automaton rejects a stored pattern.
    pub fn rebuild(&mut self) -> usize {
        let purged = self.deleted_patterns.len();
        if purged == 0 {
            return 0;
        }
        let new_automaton = ThreadSafeCoreMatcher::with_limits(
            self.automaton.memory_budget(),
            self.pattern_limits.max_states_per_pattern,
        );
        // Replay skips deleted ids, so it must run before the set is cleared.
        self.replay_patterns_into(&new_automaton);
        self.pattern_defs
            .retain(|id, _| !self.deleted_patterns.contains(id));
        self.deleted_patterns.clear();
        self.pruner_stats.reset();
        self.automaton = new_automaton;
        purged
    }

    /// Reports whether the statistics suggest that a rebuild is worthwhile.
    pub fn should_rebuild(&self) -> bool {
        self.pruner_stats.should_rebuild()
    }

    /// Rebuilds if auto-rebuild is enabled and the statistics suggest it.
    ///
    /// Returns the number of purged identifiers, or 0 if no rebuild happened.
    pub fn maybe_rebuild(&mut self) -> usize {
        if self.auto_rebuild_enabled && self.pruner_stats.should_rebuild() {
            self.rebuild()
        } else {
            0
        }
    }

    /// Removes every pattern and resets the statistics.
    ///
    /// The current memory budget, the limits, the flattener and the
    /// auto-rebuild setting are kept.
    pub fn clear(&mut self) {
        self.automaton = ThreadSafeCoreMatcher::with_limits(
            self.automaton.memory_budget(),
            self.pattern_limits.max_states_per_pattern,
        );
        self.pattern_defs.clear();
        self.deleted_patterns.clear();
        // No patterns remain, so no field path is of interest any more.
        self.segments_tree = SegmentsTree::new();
        self.pruner_stats.reset();
    }

    /// Lists the identifiers that have patterns and are not marked deleted,
    /// in unspecified order.
    pub fn list_pattern_ids(&self) -> Vec<&X> {
        self.pattern_defs
            .keys()
            .filter(|id| !self.deleted_patterns.contains(*id))
            .collect()
    }

    /// Returns `true` if `id` has patterns and is not marked deleted.
    pub fn contains_pattern(&self, id: &X) -> bool {
        self.pattern_defs.contains_key(id) && !self.deleted_patterns.contains(id)
    }
}

impl<X: Clone + Eq + Hash + Send + Sync> Default for Quamina<X> {
    /// Same as [`Quamina::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
#[macro_use]
mod test_helpers;
#[cfg(test)]
mod tests_core;
#[cfg(test)]
mod tests_operators;
#[cfg(test)]
mod tests_stress;