use async_trait::async_trait;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::sync::Arc;
use std::sync::OnceLock;
use crate::context::Context;
use crate::errors::{ErrorCode, ModuleError};
use crate::middleware::base::Middleware;
use super::storage::StorageBackend;
use super::store::{InMemoryObservabilityStore, ObservabilityStore};
/// Renders an [`ErrorCode`] as the string used in entries and fingerprints.
///
/// The code is serialized with `serde_json`; when that yields a JSON string,
/// the string is returned as-is. For any other outcome (serialization error
/// or a non-string JSON value) the `Debug` rendering of the code is used.
fn error_code_string(code: ErrorCode) -> String {
    match serde_json::to_value(code) {
        Ok(serde_json::Value::String(name)) => name,
        _ => format!("{code:?}"),
    }
}
/// One deduplicated error record.
///
/// Occurrences that share a fingerprint are folded into a single entry:
/// `count` is incremented and the "last seen" timestamps are refreshed,
/// while `message` and `ai_guidance` keep the values of the first occurrence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorEntry {
/// Identifier of the module the error was recorded against.
pub module_id: String,
/// Error code in string form. Serialized under the key `code`; when
/// deserializing, `error_code` is accepted as an alias.
#[serde(rename = "code", alias = "error_code")]
pub error_code: String,
/// Raw (non-normalized) message of the first occurrence.
pub message: String,
/// Optional guidance copied from the error; omitted from the serialized
/// form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub ai_guidance: Option<String>,
/// Time of the most recent occurrence; written together with
/// `last_occurred` whenever the entry is created or updated.
pub timestamp: DateTime<Utc>,
/// Number of occurrences folded into this entry (starts at 1).
pub count: u64,
/// Time of the first occurrence.
pub first_occurred: DateTime<Utc>,
/// Time of the most recent occurrence.
pub last_occurred: DateTime<Utc>,
/// Deduplication key produced by `compute_fingerprint`. Defaults to an
/// empty string when the field is absent from deserialized input.
#[serde(default)]
pub fingerprint: String,
}
/// Returns the lazily compiled, process-wide matcher for hyphenated UUIDs
/// (8-4-4-4-12 hex digits, upper or lower case).
///
/// The regex is compiled once on first use and shared afterwards.
fn uuid_re() -> &'static Regex {
    const PATTERN: &str =
        r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}";
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| Regex::new(PATTERN).expect("UUID regex is valid"))
}
/// Returns the lazily compiled, process-wide matcher for ISO 8601 style
/// dates (`YYYY-MM-DD`) with an optional `T`-separated time, optional
/// fractional seconds and optional `Z` / `±HH:MM` offset.
///
/// The regex is compiled once on first use and shared afterwards.
fn iso8601_re() -> &'static Regex {
    const PATTERN: &str =
        r"\d{4}-\d{2}-\d{2}(T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?)?";
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| Regex::new(PATTERN).expect("ISO8601 regex is valid"))
}
/// Returns the lazily compiled, process-wide matcher for standalone runs of
/// four or more decimal digits (delimited by word boundaries).
///
/// The regex is compiled once on first use and shared afterwards.
fn integer_re() -> &'static Regex {
    const PATTERN: &str = r"\b\d{4,}\b";
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| Regex::new(PATTERN).expect("integer regex is valid"))
}
#[must_use]
pub fn normalize_message(msg: &str) -> String {
let s = uuid_re().replace_all(msg, "<UUID>");
let s = iso8601_re().replace_all(&s, "<TIMESTAMP>");
let s = integer_re().replace_all(&s, "<ID>");
s.trim().to_lowercase()
}
/// Computes the deduplication fingerprint of an error.
///
/// The fingerprint is the lower-case hex SHA-256 digest of
/// `"{error_code}:{module_id}:{normalized message}"`, where the message is
/// passed through [`normalize_message`] first.
#[must_use]
pub fn compute_fingerprint(error_code: &str, module_id: &str, message: &str) -> String {
    let normalized = normalize_message(message);
    let mut hasher = Sha256::new();
    // Feeding the pieces one by one hashes exactly the same byte stream as
    // the concatenated `code:module:message` string.
    for piece in [error_code, ":", module_id, ":", normalized.as_str()] {
        hasher.update(piece.as_bytes());
    }
    format!("{:x}", hasher.finalize())
}
#[must_use]
pub fn compute_fingerprint_from_error(error: &ModuleError, module_id: &str) -> String {
let error_code = error_code_string(error.code);
compute_fingerprint(&error_code, module_id, &error.message)
}
/// Element of the eviction heap: `(timestamp, sequence number, fingerprint)`.
///
/// `BinaryHeap` is a max-heap, so wrapping the tuple in `Reverse` makes it
/// yield the smallest tuple first: the oldest timestamp, with ties broken by
/// the lowest sequence number.
type HeapEntry = Reverse<(DateTime<Utc>, u64, String)>;
/// Mutable bookkeeping of an `ErrorHistory`, guarded by its mutex.
#[derive(Debug, Default)]
struct ErrorHistoryState {
// Live entries keyed by fingerprint; this map is the source of truth.
fp_index: HashMap<String, ErrorEntry>,
// Per module: fingerprints in first-occurrence order (appended at the back,
// evicted from the front).
module_index: HashMap<String, VecDeque<String>>,
// Oldest-first queue used for global eviction. It may hold stale items
// (fingerprint gone, or timestamp no longer matching the entry's
// `last_occurred`); `pop_oldest` skips those.
heap: BinaryHeap<HeapEntry>,
// Counter bumped for every heap push; acts as tie-breaker between equal
// timestamps.
seq: u64,
}
/// Bounded, deduplicating in-memory history of module errors.
///
/// The state lives behind an `Arc<Mutex<_>>`, so clones of an `ErrorHistory`
/// share the same underlying history.
#[derive(Debug, Clone)]
pub struct ErrorHistory {
// Shared mutable bookkeeping (entries, per-module order, eviction heap).
state: Arc<Mutex<ErrorHistoryState>>,
// Maximum number of distinct entries kept per module.
max_entries_per_module: usize,
// Maximum number of distinct entries kept across all modules.
max_total_entries: usize,
// Receives every recorded/updated entry via `record_error`.
store: Arc<dyn ObservabilityStore>,
// Optional backend; when set, each recorded entry is also passed to
// `save("error_history", <fingerprint>, <entry as JSON>)`.
storage_backend: Option<Arc<dyn StorageBackend>>,
}
impl ErrorHistory {
#[must_use]
pub fn new(max_entries_per_module: usize) -> Self {
Self::with_store_and_limits(
max_entries_per_module,
max_entries_per_module * 100,
Arc::new(InMemoryObservabilityStore::new()),
)
}
#[must_use]
pub fn with_limits(max_entries_per_module: usize, max_total_entries: usize) -> Self {
Self::with_store_and_limits(
max_entries_per_module,
max_total_entries,
Arc::new(InMemoryObservabilityStore::new()),
)
}
#[must_use]
pub fn with_store(store: Arc<dyn ObservabilityStore>) -> Self {
Self::with_store_and_limits(50, 1000, store)
}
#[must_use]
pub fn with_store_and_limits(
max_entries_per_module: usize,
max_total_entries: usize,
store: Arc<dyn ObservabilityStore>,
) -> Self {
Self {
state: Arc::new(Mutex::new(ErrorHistoryState::default())),
max_entries_per_module,
max_total_entries,
store,
storage_backend: None,
}
}
#[must_use]
pub fn with_storage_backend(
max_entries_per_module: usize,
max_total_entries: usize,
storage_backend: Option<Arc<dyn StorageBackend>>,
) -> Self {
Self {
state: Arc::new(Mutex::new(ErrorHistoryState::default())),
max_entries_per_module,
max_total_entries,
store: Arc::new(InMemoryObservabilityStore::new()),
storage_backend,
}
}
#[must_use]
pub fn with_storage(mut self, storage_backend: Option<Arc<dyn StorageBackend>>) -> Self {
self.storage_backend = storage_backend;
self
}
#[must_use]
pub fn store(&self) -> Arc<dyn ObservabilityStore> {
self.store.clone()
}
pub fn record(&self, module_id: &str, error: &ModuleError) {
self.record_at(module_id, error, Utc::now());
}
pub fn record_at(&self, module_id: &str, error: &ModuleError, when: DateTime<Utc>) {
let error_code = error_code_string(error.code);
let fp = compute_fingerprint(&error_code, module_id, &error.message);
let entry_to_notify: ErrorEntry;
{
let mut state = self.state.lock();
if let Some(existing) = state.fp_index.get_mut(&fp) {
existing.count += 1;
existing.last_occurred = when;
existing.timestamp = when;
entry_to_notify = existing.clone();
state.seq += 1;
let seq = state.seq;
state.heap.push(Reverse((when, seq, fp.clone())));
} else {
let entry = ErrorEntry {
module_id: module_id.to_string(),
error_code,
message: error.message.clone(),
ai_guidance: error.ai_guidance.clone(),
timestamp: when,
count: 1,
first_occurred: when,
last_occurred: when,
fingerprint: fp.clone(),
};
entry_to_notify = entry.clone();
state.fp_index.insert(fp.clone(), entry);
state
.module_index
.entry(module_id.to_string())
.or_default()
.push_back(fp.clone());
state.seq += 1;
let seq = state.seq;
state.heap.push(Reverse((when, seq, fp)));
evict_per_module(&mut state, module_id, self.max_entries_per_module);
evict_total(&mut state, self.max_total_entries);
}
}
let store = self.store.clone();
let backend = self.storage_backend.clone();
let entry_clone = entry_to_notify.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
store.record_error(entry_to_notify).await;
});
if let Some(backend) = backend {
let fp = entry_clone.fingerprint.clone();
handle.spawn(async move {
if let Ok(value) = serde_json::to_value(&entry_clone) {
let _ = backend.save("error_history", &fp, value).await;
}
});
}
} else {
tracing::debug!(
"ErrorHistory::record called outside a tokio runtime; \
store notification skipped"
);
}
}
#[must_use]
pub fn get(&self, module_id: &str, limit: Option<usize>) -> Vec<ErrorEntry> {
let state = self.state.lock();
let Some(fps) = state.module_index.get(module_id) else {
return Vec::new();
};
let mut entries: Vec<ErrorEntry> = fps
.iter()
.filter_map(|fp| state.fp_index.get(fp).cloned())
.collect();
entries.sort_by_key(|e| Reverse(e.last_occurred));
if let Some(n) = limit {
entries.truncate(n);
}
entries
}
#[must_use]
pub fn get_all(&self, limit: Option<usize>) -> Vec<ErrorEntry> {
let state = self.state.lock();
let mut all: Vec<ErrorEntry> = state.fp_index.values().cloned().collect();
all.sort_by_key(|e| Reverse(e.last_occurred));
if let Some(n) = limit {
all.truncate(n);
}
all
}
#[must_use]
pub fn count(&self) -> usize {
self.state.lock().fp_index.len()
}
pub fn clear(&self, module_id: Option<&str>) {
let mut state = self.state.lock();
if let Some(id) = module_id {
if let Some(fps) = state.module_index.remove(id) {
for fp in fps {
state.fp_index.remove(&fp);
}
}
} else {
state.fp_index.clear();
state.module_index.clear();
state.heap.clear();
state.seq = 0;
}
}
}
fn evict_per_module(state: &mut ErrorHistoryState, module_id: &str, max_per_module: usize) {
let Some(fps) = state.module_index.get_mut(module_id) else {
return;
};
while fps.len() > max_per_module {
if let Some(evicted_fp) = fps.pop_front() {
state.fp_index.remove(&evicted_fp);
} else {
break;
}
}
if state
.module_index
.get(module_id)
.is_some_and(VecDeque::is_empty)
{
state.module_index.remove(module_id);
}
}
/// Enforces the global cap: evicts the least recently seen entry until at
/// most `max_total` entries remain, or until nothing more can be evicted.
fn evict_total(state: &mut ErrorHistoryState, max_total: usize) {
    // `pop_oldest` does the work; the loop stops as soon as the cap is met
    // or the heap has no live entry left to evict.
    while state.fp_index.len() > max_total && pop_oldest(state) {}
}
/// Evicts the entry with the oldest `last_occurred` timestamp.
///
/// Heap items are popped oldest-first. An item is stale, and skipped, when
/// its fingerprint is no longer indexed or its timestamp differs from the
/// entry's current `last_occurred`. The first live item removes its entry
/// from the fingerprint index and from its module's queue (dropping the
/// module when the queue becomes empty).
///
/// Returns `true` if an entry was evicted, `false` if the heap ran out.
fn pop_oldest(state: &mut ErrorHistoryState) -> bool {
    while let Some(Reverse((heap_ts, _, fp))) = state.heap.pop() {
        let is_current = state
            .fp_index
            .get(&fp)
            .is_some_and(|entry| entry.last_occurred == heap_ts);
        if !is_current {
            continue;
        }
        let Some(evicted) = state.fp_index.remove(&fp) else {
            continue;
        };

        if let Some(queue) = state.module_index.get_mut(&evicted.module_id) {
            queue.retain(|candidate| candidate != &fp);
            if queue.is_empty() {
                state.module_index.remove(&evicted.module_id);
            }
        }
        return true;
    }
    false
}
/// Middleware that records every module error into an `ErrorHistory`.
#[derive(Debug)]
pub struct ErrorHistoryMiddleware {
// History that `on_error` writes to.
history: ErrorHistory,
}
impl ErrorHistoryMiddleware {
#[must_use]
pub fn new(history: ErrorHistory) -> Self {
Self { history }
}
#[must_use]
pub fn history(&self) -> &ErrorHistory {
&self.history
}
}
#[async_trait]
impl Middleware for ErrorHistoryMiddleware {
    /// Name under which this middleware identifies itself.
    fn name(&self) -> &'static str {
        "error_history"
    }

    /// No-op: returns `Ok(None)` without inspecting its arguments.
    async fn before(
        &self,
        _module_id: &str,
        _inputs: serde_json::Value,
        _ctx: &Context<serde_json::Value>,
    ) -> Result<Option<serde_json::Value>, ModuleError> {
        Ok(None)
    }

    /// No-op: returns `Ok(None)` without inspecting its arguments.
    async fn after(
        &self,
        _module_id: &str,
        _inputs: serde_json::Value,
        _output: serde_json::Value,
        _ctx: &Context<serde_json::Value>,
    ) -> Result<Option<serde_json::Value>, ModuleError> {
        Ok(None)
    }

    /// Records `error` for `module_id` in the wrapped history, then returns
    /// `Ok(None)`.
    async fn on_error(
        &self,
        module_id: &str,
        _inputs: serde_json::Value,
        error: &ModuleError,
        _ctx: &Context<serde_json::Value>,
    ) -> Result<Option<serde_json::Value>, ModuleError> {
        let history = &self.history;
        history.record(module_id, error);
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A UUID in the message collapses to the (lower-cased) placeholder.
    #[test]
    fn normalize_replaces_uuid() {
        assert_eq!(
            normalize_message("token a1b2c3d4-e5f6-7890-abcd-ef1234567890 is invalid"),
            "token <uuid> is invalid"
        );
    }

    /// Digit runs of four or more characters become the id placeholder.
    #[test]
    fn normalize_replaces_integers_over_3_digits() {
        assert_eq!(
            normalize_message("retry after 30000 ms"),
            "retry after <id> ms"
        );
    }

    /// A full ISO 8601 timestamp becomes the timestamp placeholder.
    #[test]
    fn normalize_replaces_iso8601() {
        assert_eq!(
            normalize_message("at 2026-01-01T10:00:00Z something failed"),
            "at <timestamp> something failed"
        );
    }

    /// Fingerprints are hex-encoded SHA-256 digests: 64 hex characters.
    #[test]
    fn fingerprint_is_64_char_hex() {
        let digest = compute_fingerprint("DB_TIMEOUT", "executor.db", "connection timed out");
        assert_eq!(digest.len(), 64);
        assert!(digest.bytes().all(|byte| byte.is_ascii_hexdigit()));
    }

    /// Messages that differ only in a UUID share one fingerprint.
    #[test]
    fn fingerprint_dedup_normalized_messages() {
        let [first, second] = [
            "token a1b2c3d4-e5f6-7890-abcd-ef1234567890 is invalid",
            "token 00000000-0000-0000-0000-000000000001 is invalid",
        ]
        .map(|message| compute_fingerprint("TOKEN_INVALID", "executor.auth", message));
        assert_eq!(first, second);
    }
}