use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, Context, Result};
use drain_rs::DrainTree;
use lru::LruCache;
use ort::session::Session;
use ort::value::Tensor;
use tokenizers::Tokenizer;
/// A single log line after template extraction.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged,
/// duplicated and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedLog {
    /// 64-bit hash identifying `template`; `TemplateParser::parse` fills it
    /// with the FNV-1a hash of the template string.
    pub template_id: u64,
    /// Template text the line was matched to (the raw line itself when the
    /// parser produced no cluster).
    pub template: String,
    /// Whole seconds since the Unix epoch, taken when the line was parsed.
    pub timestamp: i64,
    /// Free-form key/value annotations; `TemplateParser::parse` leaves it empty.
    pub metadata: HashMap<String, String>,
    /// The original, unmodified log line.
    pub raw_message: String,
}
/// Computes the 64-bit FNV-1a hash of the UTF-8 bytes of `s`.
///
/// Each byte is XOR-ed into the running hash, which is then multiplied by the
/// FNV prime with wrapping arithmetic. An empty string hashes to the offset
/// basis.
fn fnv1a64(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    s.bytes().fold(OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
/// Turns raw log lines into [`ParsedLog`] records by feeding them to a
/// [`DrainTree`].
pub struct TemplateParser {
    /// Clustering tree that every parsed line is added to.
    tree: DrainTree,
}
impl Default for TemplateParser {
fn default() -> Self {
Self::new()
}
}
impl TemplateParser {
    /// Creates a parser around a freshly constructed [`DrainTree`].
    pub fn new() -> Self {
        let tree = DrainTree::new();
        Self { tree }
    }

    /// Adds `line` to the tree and describes the outcome as a [`ParsedLog`].
    ///
    /// The template is the string form of the cluster returned by the tree;
    /// when no cluster is returned the line itself is used as its own
    /// template. The template id is the FNV-1a hash of that template, the
    /// timestamp is the current wall-clock time in whole seconds since the
    /// Unix epoch (`0` if the clock reads earlier than the epoch), and the
    /// metadata map starts out empty.
    pub fn parse(&mut self, line: &str) -> ParsedLog {
        let template = match self.tree.add_log_line(line) {
            Some(cluster) => cluster.as_string(),
            None => line.to_string(),
        };

        let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs() as i64,
            Err(_) => 0,
        };

        let template_id = fnv1a64(&template);

        ParsedLog {
            template_id,
            template,
            timestamp,
            metadata: HashMap::new(),
            raw_message: line.to_string(),
        }
    }
}
/// Produces embedding vectors for text by tokenizing it and running an ONNX
/// model session.
pub struct Embedder {
    /// ONNX Runtime session holding the loaded model.
    session: Session,
    /// Tokenizer used to encode text into model inputs.
    tokenizer: Tokenizer,
}
impl Embedder {
    /// Loads the ONNX model and the tokenizer definition from files.
    ///
    /// # Errors
    ///
    /// Returns an error if the session builder cannot be created, the model
    /// file cannot be loaded, the tokenizer file cannot be loaded, or the
    /// truncation settings cannot be applied to the tokenizer.
    pub fn new(model_path: impl AsRef<Path>, tokenizer_path: impl AsRef<Path>) -> Result<Self> {
        let session = Session::builder()?
            .commit_from_file(model_path.as_ref())
            .context("Failed to load ONNX model")?;
        let tokenizer = Self::build_tokenizer(
            Tokenizer::from_file(tokenizer_path.as_ref())
                .map_err(|e| anyhow!("Failed to load tokenizer: {e}"))?,
        )?;
        Ok(Self { session, tokenizer })
    }

    /// Loads the ONNX model and the tokenizer definition from in-memory bytes.
    ///
    /// # Errors
    ///
    /// Returns an error under the same conditions as [`Embedder::new`], with
    /// the byte buffers in place of the files.
    pub fn from_bytes(model_bytes: &[u8], tokenizer_bytes: &[u8]) -> Result<Self> {
        let session = Session::builder()?
            .commit_from_memory(model_bytes)
            .context("Failed to load ONNX model from bytes")?;
        let tokenizer = Self::build_tokenizer(
            Tokenizer::from_bytes(tokenizer_bytes)
                .map_err(|e| anyhow!("Failed to load tokenizer from bytes: {e}"))?,
        )?;
        Ok(Self { session, tokenizer })
    }

    /// Applies the shared truncation settings to a freshly loaded tokenizer:
    /// at most 512 tokens, longest-first strategy, no stride, truncating on
    /// the right.
    fn build_tokenizer(mut t: Tokenizer) -> Result<Tokenizer> {
        t.with_truncation(Some(tokenizers::TruncationParams {
            max_length: 512,
            strategy: tokenizers::TruncationStrategy::LongestFirst,
            stride: 0,
            direction: tokenizers::TruncationDirection::Right,
        }))
        .map_err(|e| anyhow!("Failed to configure tokenizer truncation: {e}"))?;
        Ok(t)
    }

    /// Embeds `text` into a unit-length vector.
    ///
    /// The text is tokenized (with special tokens), the model is run with the
    /// `input_ids`, `attention_mask` and `token_type_ids` inputs as a batch of
    /// one, and the `last_hidden_state` output is mean-pooled over the tokens
    /// whose attention mask is `1`. The pooled vector is then L2-normalized
    /// unless its norm is zero.
    ///
    /// # Errors
    ///
    /// Returns an error if tokenization fails, an input tensor cannot be
    /// built, inference fails, the output cannot be read as `f32`, or the
    /// output does not have the expected shape (fewer than three dimensions,
    /// a negative hidden size, or fewer values than the attended tokens
    /// require). The shape problems previously caused a panic.
    pub fn embed(&mut self, text: &str) -> Result<Vec<f32>> {
        /// Widens token-level `u32` values to the `i64` the model inputs use.
        fn widen(values: &[u32]) -> Vec<i64> {
            values.iter().map(|&x| i64::from(x)).collect()
        }

        let encoding = self
            .tokenizer
            .encode(text, true)
            .map_err(|e| anyhow!("Failed to tokenize text: {e}"))?;

        // Keep the borrowed mask for pooling so the owned copy can be moved
        // into the input tensor without cloning it.
        let attention = encoding.get_attention_mask();
        let ids = widen(encoding.get_ids());
        let mask = widen(attention);
        let type_ids = widen(encoding.get_type_ids());
        let len = ids.len();

        let outputs = self.session.run(ort::inputs![
            "input_ids" => Tensor::from_array(([1usize, len], ids))?,
            "attention_mask" => Tensor::from_array(([1usize, len], mask))?,
            "token_type_ids" => Tensor::from_array(([1usize, len], type_ids))?,
        ])?;
        let (shape, data) = outputs["last_hidden_state"].try_extract_tensor::<f32>()?;

        // NOTE(review): the pooling below assumes the output is laid out as
        // [1, tokens, hidden] in row-major order — confirm against the model.
        let hidden = match shape.get(2) {
            Some(&dim) if dim >= 0 => dim as usize,
            _ => {
                return Err(anyhow!(
                    "Unexpected last_hidden_state shape: rank {}, expected a non-negative third dimension",
                    shape.len()
                ))
            }
        };

        let mut vector = vec![0f32; hidden];
        let mut count = 0f32;
        for (token, &m) in attention.iter().enumerate() {
            if m != 1 {
                continue;
            }
            let offset = token * hidden;
            // Checked slice: a short output becomes an error instead of an
            // out-of-bounds panic.
            let row = data.get(offset..offset + hidden).ok_or_else(|| {
                anyhow!(
                    "last_hidden_state has {} values, too few for token {token} with hidden size {hidden}",
                    data.len()
                )
            })?;
            for (acc, &value) in vector.iter_mut().zip(row) {
                *acc += value;
            }
            count += 1.0;
        }

        // Mean over attended tokens; the divisor is clamped so an all-zero
        // mask does not divide by zero.
        let divisor = count.max(1.0);
        for v in vector.iter_mut() {
            *v /= divisor;
        }

        let norm = vector.iter().map(|v| v * v).sum::<f32>().sqrt();
        if norm > 0.0 {
            for v in vector.iter_mut() {
                *v /= norm;
            }
        }
        Ok(vector)
    }
}
/// Pairs a [`TemplateParser`] with an LRU cache of vectors keyed by template
/// id, and counts lookup hits and misses.
pub struct TemplateCache {
    /// Parser used to derive the template id of each incoming log line.
    parser: TemplateParser,
    /// Cached vectors, keyed by `ParsedLog::template_id`.
    cache: LruCache<u64, Arc<[f32]>>,
    /// Number of lookups that found a cached vector.
    hits: u64,
    /// Number of lookups that found nothing.
    misses: u64,
}
impl TemplateCache {
    /// Capacity used by [`TemplateCache::new`].
    pub const DEFAULT_CAPACITY: usize = 10_000;

    /// Creates a cache holding up to [`Self::DEFAULT_CAPACITY`] vectors.
    pub fn new() -> Self {
        Self::with_capacity(Self::DEFAULT_CAPACITY)
    }

    /// Creates a cache holding up to `capacity` vectors.
    ///
    /// A `capacity` of `0` is raised to `1`, so the conversion to
    /// `NonZeroUsize` cannot fail.
    pub fn with_capacity(capacity: usize) -> Self {
        let parser = TemplateParser::new();
        let slots = NonZeroUsize::new(capacity.max(1)).unwrap();
        Self {
            parser,
            cache: LruCache::new(slots),
            hits: 0,
            misses: 0,
        }
    }

    /// Parses `log` and looks up the vector cached for its template id.
    ///
    /// Returns the parsed record together with the cached vector, if any,
    /// and bumps the hit or miss counter accordingly.
    pub fn parse_and_lookup(&mut self, log: &str) -> (ParsedLog, Option<Arc<[f32]>>) {
        let parsed = self.parser.parse(log);
        let found = self.cache.get(&parsed.template_id).cloned();
        if found.is_some() {
            self.hits += 1;
        } else {
            self.misses += 1;
        }
        (parsed, found)
    }

    /// Stores `vector` under `template_id`.
    pub fn insert(&mut self, template_id: u64, vector: Arc<[f32]>) {
        self.cache.put(template_id, vector);
    }

    /// Number of lookups that returned a cached vector.
    pub fn hits(&self) -> u64 {
        self.hits
    }

    /// Number of lookups that returned nothing.
    pub fn misses(&self) -> u64 {
        self.misses
    }

    /// Fraction of lookups that were hits, or `0.0` before any lookup.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }
}
impl Default for TemplateCache {
fn default() -> Self {
Self::new()
}
}
/// Combines a [`TemplateCache`] with an [`Embedder`] so a template is only
/// embedded when its vector is not already cached.
pub struct VectorCache {
    /// Template parser plus the cache of vectors keyed by template id.
    templates: TemplateCache,
    /// Model used to compute vectors on a cache miss.
    embedder: Embedder,
}
impl VectorCache {
    /// Capacity used by [`VectorCache::new`]; mirrors the template cache's
    /// default.
    pub const DEFAULT_CAPACITY: usize = TemplateCache::DEFAULT_CAPACITY;

    /// Wraps `embedder` with a cache of [`Self::DEFAULT_CAPACITY`] entries.
    pub fn new(embedder: Embedder) -> Self {
        Self::with_capacity(embedder, Self::DEFAULT_CAPACITY)
    }

    /// Wraps `embedder` with a cache of `capacity` entries.
    pub fn with_capacity(embedder: Embedder, capacity: usize) -> Self {
        let templates = TemplateCache::with_capacity(capacity);
        Self {
            templates,
            embedder,
        }
    }

    /// Parses `log` and returns its record together with the vector of its
    /// template.
    ///
    /// On a cache hit the cached vector is returned. On a miss the template
    /// text (not the raw line) is embedded, the result is stored under the
    /// template id, and a shared handle to it is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error from the embedder; in that case nothing is
    /// inserted into the cache.
    pub fn get_or_embed(&mut self, log: &str) -> Result<(ParsedLog, Arc<[f32]>)> {
        match self.templates.parse_and_lookup(log) {
            (parsed, Some(vector)) => Ok((parsed, vector)),
            (parsed, None) => {
                let embedding = self.embedder.embed(&parsed.template)?;
                let vector: Arc<[f32]> = Arc::from(embedding);
                self.templates
                    .insert(parsed.template_id, Arc::clone(&vector));
                Ok((parsed, vector))
            }
        }
    }

    /// Embeds `text` directly, bypassing the parser and the cache.
    pub fn embed_uncached(&mut self, text: &str) -> Result<Vec<f32>> {
        self.embedder.embed(text)
    }

    /// Number of cache hits recorded so far.
    pub fn hits(&self) -> u64 {
        self.templates.hits()
    }

    /// Number of cache misses recorded so far.
    pub fn misses(&self) -> u64 {
        self.templates.misses()
    }

    /// Fraction of lookups that were hits, as reported by the template cache.
    pub fn hit_rate(&self) -> f64 {
        self.templates.hit_rate()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same text twice agrees; different text hashes differently.
    #[test]
    fn fnv1a64_deterministic() {
        let online = "Node <*> is online";
        let offline = "Node <*> offline";
        assert_eq!(fnv1a64(online), fnv1a64(online));
        assert_ne!(fnv1a64(online), fnv1a64(offline));
    }

    /// Lines that differ only in a variable token map to one template id,
    /// and the raw message is preserved verbatim.
    #[test]
    fn same_pattern_same_template_id() {
        let mut parser = TemplateParser::new();
        let first = parser.parse("Node 2 is online");
        let second = parser.parse("Node 4 is online");
        assert_eq!(
            first.template_id, second.template_id,
            "logs differing only in variables should share the same template"
        );
        assert_eq!(first.raw_message, "Node 2 is online");
    }

    /// Structurally unrelated lines get distinct template ids.
    #[test]
    fn different_pattern_different_template_id() {
        let mut parser = TemplateParser::new();
        let connection = parser.parse("connection accepted from 10.0.0.1 port 5432");
        let disk = parser.parse("disk usage at 91 percent on /var");
        assert_ne!(connection.template_id, disk.template_id);
    }
}