scepter 0.1.3

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt;
use std::hash::Hash;

/// Errors produced by field-hint excerpt generation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HintError {
    /// Requested excerpt size was zero.
    EmptyExcerpt,
}

impl fmt::Display for HintError {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::EmptyExcerpt => formatter.write_str("excerpt size must be greater than zero"),
        }
    }
}

impl Error for HintError {}

/// Strategy for extracting searchable excerpts from field values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExcerptStrategy {
    /// Padded trigrams.
    Trigram,
    /// Padded n-grams of the given width.
    NGram(usize),
    /// Full field value.
    Full,
}

impl ExcerptStrategy {
    /// Extracts excerpts from `value`.
    pub fn excerpts(&self, value: &str) -> Vec<String> {
        match self {
            Self::Trigram => trigrams(value),
            Self::NGram(size) => ngrams(value, *size),
            Self::Full => vec![value.to_owned()],
        }
    }
}

/// Indexed field excerpt.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FieldHint {
    /// Schema name.
    pub schema: String,
    /// Field name.
    pub field: String,
    /// Searchable excerpt.
    pub excerpt: String,
}

/// Predicate represented as mandatory excerpts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FieldPredicate {
    /// Field value equals this string.
    Equals(String),
    /// Field value contains this string.
    Contains(String),
    /// Field value must contain all provided excerpts.
    HasAllExcerpts(Vec<String>),
}

impl FieldPredicate {
    /// Returns mandatory excerpts implied by this predicate.
    pub fn excerpts(&self, strategy: &ExcerptStrategy) -> Vec<String> {
        match self {
            Self::Equals(value) | Self::Contains(value) => strategy.excerpts(value),
            Self::HasAllExcerpts(excerpts) => excerpts.clone(),
        }
    }
}

/// Approximate field-hint index mapping searchable excerpts to child shards.
#[derive(Debug, Clone)]
pub struct FieldHintIndex<ChildId> {
    strategy: ExcerptStrategy,
    hints: HashMap<String, HashMap<String, HashMap<String, PostingList<ChildId>>>>,
    hint_count: usize,
    postings: usize,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct PostingList<ChildId> {
    children: Vec<ChildId>,
}

impl<ChildId> Default for PostingList<ChildId> {
    fn default() -> Self {
        Self {
            children: Vec::new(),
        }
    }
}

impl<ChildId: Ord> PostingList<ChildId> {
    fn insert(&mut self, child: ChildId) -> bool {
        match self.children.binary_search(&child) {
            Ok(_) => false,
            Err(index) => {
                self.children.insert(index, child);
                true
            }
        }
    }

    fn contains(&self, child: &ChildId) -> bool {
        self.children.binary_search(child).is_ok()
    }

    fn len(&self) -> usize {
        self.children.len()
    }

    fn iter(&self) -> impl Iterator<Item = &ChildId> {
        self.children.iter()
    }
}

impl<ChildId> Default for FieldHintIndex<ChildId> {
    fn default() -> Self {
        Self {
            strategy: ExcerptStrategy::Trigram,
            hints: HashMap::new(),
            hint_count: 0,
            postings: 0,
        }
    }
}

impl<ChildId> FieldHintIndex<ChildId>
where
    ChildId: Clone + Eq + Hash + Ord,
{
    /// Creates a trigram field-hint index.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a field-hint index with a custom excerpt strategy.
    pub fn with_strategy(strategy: ExcerptStrategy) -> Self {
        Self {
            strategy,
            hints: HashMap::new(),
            hint_count: 0,
            postings: 0,
        }
    }

    /// Returns the excerpt strategy used by this index.
    pub fn strategy(&self) -> &ExcerptStrategy {
        &self.strategy
    }

    /// Returns true when the index contains no hints.
    pub fn is_empty(&self) -> bool {
        self.hint_count == 0
    }

    /// Returns the number of distinct field excerpts in the index.
    pub fn hint_count(&self) -> usize {
        self.hint_count
    }

    /// Returns the total number of unique child postings across all hints.
    pub fn posting_count(&self) -> usize {
        self.postings
    }

    /// Indexes all excerpts extracted from `value` for `child`.
    pub fn insert_value(&mut self, schema: &str, field: &str, value: &str, child: ChildId) {
        for excerpt in self.strategy.excerpts(value) {
            self.insert_excerpt(schema, field, excerpt, child.clone());
        }
    }

    /// Indexes one explicit excerpt for `child`.
    pub fn insert_excerpt(
        &mut self,
        schema: &str,
        field: &str,
        excerpt: impl AsRef<str> + Into<String>,
        child: ChildId,
    ) {
        if let Some(fields) = self.hints.get_mut(schema) {
            if let Some(excerpts) = fields.get_mut(field) {
                let posting_list = excerpts.entry(excerpt.into()).or_default();
                let was_empty = posting_list.len() == 0;
                let inserted = posting_list.insert(child);
                if was_empty {
                    self.hint_count += 1;
                }
                if inserted {
                    self.postings += 1;
                }
                return;
            }
        }

        let field_hints = self
            .hints
            .entry(schema.to_owned())
            .or_default()
            .entry(field.to_owned())
            .or_default();
        let posting_list = field_hints.entry(excerpt.into()).or_default();
        let was_empty = posting_list.len() == 0;
        let inserted = posting_list.insert(child);
        if was_empty {
            self.hint_count += 1;
        }
        if inserted {
            self.postings += 1;
        }
    }

    /// Returns children that may match `predicate`.
    pub fn candidates(
        &self,
        schema: &str,
        field: &str,
        predicate: &FieldPredicate,
    ) -> HashSet<ChildId> {
        let excerpts = predicate.excerpts(&self.strategy);
        if excerpts.is_empty() {
            return HashSet::new();
        };

        let mut posting_lists = Vec::with_capacity(excerpts.len());
        for excerpt in &excerpts {
            let Some(children) = self.lookup(schema, field, excerpt) else {
                return HashSet::new();
            };
            posting_lists.push(children);
        }

        posting_lists.sort_unstable_by_key(|children| children.len());
        let Some((smallest, rest)) = posting_lists.split_first() else {
            return HashSet::new();
        };

        let mut result = HashSet::with_capacity(smallest.len());
        'candidate: for child in smallest.iter() {
            for children in rest {
                if !children.contains(child) {
                    continue 'candidate;
                }
            }
            result.insert(child.clone());
        }
        result
    }

    /// Returns the union of candidates for several predicates.
    pub fn candidate_union(
        &self,
        schema: &str,
        field: &str,
        predicates: &[FieldPredicate],
    ) -> HashSet<ChildId> {
        let mut result = HashSet::new();
        for predicate in predicates {
            result.extend(self.candidates(schema, field, predicate));
        }
        result
    }

    fn lookup(&self, schema: &str, field: &str, excerpt: &str) -> Option<&PostingList<ChildId>> {
        self.hints.get(schema)?.get(field)?.get(excerpt)
    }
}

/// Returns padded trigrams for `value`.
pub fn trigrams(value: &str) -> Vec<String> {
    ngrams_with_padding(value, 3)
}

/// Returns padded n-grams, or an empty vector if `size` is zero.
pub fn ngrams(value: &str, size: usize) -> Vec<String> {
    try_ngrams(value, size).unwrap_or_default()
}

/// Returns padded n-grams and reports zero-width requests as an error.
pub fn try_ngrams(value: &str, size: usize) -> Result<Vec<String>, HintError> {
    if size == 0 {
        return Err(HintError::EmptyExcerpt);
    }
    Ok(ngrams_with_padding(value, size))
}

fn ngrams_with_padding(value: &str, size: usize) -> Vec<String> {
    let padding = "^".repeat(size.saturating_sub(1));
    let suffix = "$".repeat(size.saturating_sub(1));
    let padded = format!("{padding}{value}{suffix}");
    let chars: Vec<char> = padded.chars().collect();
    chars
        .windows(size)
        .map(|window| window.iter().collect())
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn field_hint_index_intersects_trigram_candidates() {
        let mut index = FieldHintIndex::new();
        index.insert_value("ComputeTask", "job", "monarch", "leaf-1");
        index.insert_value("ComputeTask", "job", "monitor", "leaf-2");

        let candidates = index.candidates(
            "ComputeTask",
            "job",
            &FieldPredicate::Equals("monarch".to_owned()),
        );

        assert!(candidates.contains("leaf-1"));
        assert!(!candidates.contains("leaf-2"));
    }

    #[test]
    fn full_string_strategy_indexes_metric_names() {
        let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
        index.insert_value("Metric", "name", "/rpc/server/latency", "zone-1");

        let candidates = index.candidates(
            "Metric",
            "name",
            &FieldPredicate::Equals("/rpc/server/latency".to_owned()),
        );

        assert_eq!(candidates.len(), 1);
    }

    #[test]
    fn field_hint_index_deduplicates_child_postings() {
        let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
        index.insert_value("Metric", "name", "/rpc/server/latency", "zone-1");
        index.insert_value("Metric", "name", "/rpc/server/latency", "zone-1");

        assert_eq!(index.hint_count(), 1);
        assert_eq!(index.posting_count(), 1);
    }

    #[test]
    fn field_hint_index_counts_new_hints_and_shared_postings() {
        let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
        index.insert_value("Metric", "name", "/rpc/server/latency", "zone-1");
        index.insert_value("Metric", "name", "/rpc/client/latency", "zone-1");
        index.insert_value("Metric", "name", "/rpc/client/latency", "zone-2");

        assert_eq!(index.hint_count(), 2);
        assert_eq!(index.posting_count(), 3);
    }

    #[test]
    fn field_hint_index_requires_every_requested_excerpt() {
        let mut index = FieldHintIndex::with_strategy(ExcerptStrategy::Full);
        index.insert_excerpt("Metric", "name", "latency", "zone-1");
        index.insert_excerpt("Metric", "name", "latency", "zone-2");
        index.insert_excerpt("Metric", "name", "rpc", "zone-1");
        index.insert_excerpt("Metric", "name", "rpc", "zone-3");
        index.insert_excerpt("Metric", "name", "rpc", "zone-4");

        let candidates = index.candidates(
            "Metric",
            "name",
            &FieldPredicate::HasAllExcerpts(vec!["latency".to_owned(), "rpc".to_owned()]),
        );

        assert_eq!(candidates.len(), 1);
        assert!(candidates.contains("zone-1"));
        assert!(!candidates.contains("zone-2"));
        assert!(!candidates.contains("zone-3"));
        assert!(!candidates.contains("zone-4"));
    }

    #[test]
    fn field_hint_index_reports_strategy_and_empty_state() {
        let mut index = FieldHintIndex::<u32>::with_strategy(ExcerptStrategy::NGram(4));

        assert!(index.is_empty());
        assert_eq!(index.strategy(), &ExcerptStrategy::NGram(4));

        index.insert_value("ComputeTask", "job", "monarch", 7);

        assert!(!index.is_empty());
        assert!(index.hint_count() > 0);
        assert!(index.posting_count() >= index.hint_count());
    }
}