ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
//! Common error tracking utilities for RPC providers.
//!
//! This module provides shared error tracking infrastructure used by both
//! the backfill (HTTP) and heads (WebSocket) streaming modules.

use std::time::{Duration, Instant};
use tracing::{info, warn};

use crate::rpc::config;

/// Maximum length for error messages before truncation.
///
/// [`truncate_error`] compares this against `String::len`, so the limit is
/// measured in bytes of the UTF-8 encoded message, not in characters.
pub const MAX_ERROR_MSG_LEN: usize = 512;

/// Maximum errors allowed within the sliding window before suspension.
///
/// Derived from `config::MAX_RETRIES`. A provider is suspended as soon as its
/// in-window error count reaches this value (the check in
/// `ProviderErrorTracker::record_error` is `>=`).
pub const MAX_ERRORS_PER_WINDOW: usize = config::MAX_RETRIES as usize;

/// Duration of the sliding window for error tracking, in seconds (10 minutes).
pub const ERROR_WINDOW_SECONDS: u64 = 600;

/// Truncates error messages that are too long for logging.
///
/// Messages of at most [`MAX_ERROR_MSG_LEN`] bytes are returned unchanged.
/// Longer messages are cut to at most that many bytes and suffixed with
/// `... [truncated N chars]`, where `N` is the number of characters dropped.
///
/// The cut point is moved back to the nearest UTF-8 character boundary, so a
/// message containing multi-byte characters never triggers a slicing panic.
pub fn truncate_error(err: impl std::fmt::Display) -> String {
    let msg = err.to_string();
    if msg.len() <= MAX_ERROR_MSG_LEN {
        return msg;
    }

    // Byte offset `MAX_ERROR_MSG_LEN` may land inside a multi-byte character;
    // back up until the cut sits on a boundary (offset 0 always is one).
    let mut cut = MAX_ERROR_MSG_LEN;
    while !msg.is_char_boundary(cut) {
        cut -= 1;
    }

    let (kept, dropped) = msg.split_at(cut);
    format!(
        "{}... [truncated {} chars]",
        kept,
        dropped.chars().count()
    )
}

/// Truncates an error message to at most `max_len` bytes for inline logging.
///
/// Unlike [`truncate_error`], no marker is appended to a shortened message.
/// If `max_len` falls inside a multi-byte UTF-8 character, the cut is moved
/// back to the previous character boundary, so the result may be slightly
/// shorter than `max_len` but the function never panics.
pub fn truncate_error_short(err: impl std::fmt::Display, max_len: usize) -> String {
    let mut msg = err.to_string();
    if msg.len() > max_len {
        // `max_len < msg.len()` here, and offset 0 is always a boundary, so
        // this loop terminates without underflow.
        let mut cut = max_len;
        while !msg.is_char_boundary(cut) {
            cut -= 1;
        }
        msg.truncate(cut);
    }
    msg
}

/// Categorizes RPC errors for appropriate handling
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCategory {
    /// Response size too large - reduce batch size and retry
    ResponseTooLarge,
    /// Rate limit hit - apply backoff and retry
    RateLimit,
    /// Connection error - switch provider
    Connection,
    /// Other/unknown error - count towards suspension
    Other,
}

impl ErrorCategory {
    /// Categorize an error message into the appropriate category.
    ///
    /// Classification is a substring search, checked in this order (first
    /// match wins): response-size markers, rate-limit markers, connection
    /// markers; anything else is [`ErrorCategory::Other`].
    ///
    /// Textual markers are matched ASCII case-insensitively, so
    /// `"Connection refused"`, `"Rate Limit"` and `"Payload Too Large"` are
    /// recognised alongside their lowercase spellings. The `EOF` marker stays
    /// case-sensitive to avoid matching ordinary words such as "thereof".
    ///
    /// NOTE(review): `"truncated"` is a response-size marker, and
    /// `truncate_error` appends `[truncated N chars]` to long messages.
    /// Categorising an already-truncated message would therefore always yield
    /// `ResponseTooLarge` - callers should presumably pass the raw message.
    pub fn from_error_msg(error_msg: &str) -> Self {
        /// Returns true if `haystack` contains at least one of `needles`.
        fn contains_any(haystack: &str, needles: &[&str]) -> bool {
            needles.iter().any(|needle| haystack.contains(*needle))
        }

        // All markers below are lowercase; they are searched for in a
        // lowercased copy of the message.
        const SIZE_MARKERS: &[&str] = &[
            "32081",
            "too large",
            "max results",
            "deserialization",
            "truncated",
            "decoding response",
        ];
        const RATE_LIMIT_MARKERS: &[&str] = &["429", "too many requests", "rate limit"];
        const CONNECTION_MARKERS: &[&str] = &["connection", "timeout"];

        let lowered = error_msg.to_ascii_lowercase();

        // Size/response too large errors
        if contains_any(&lowered, SIZE_MARKERS) || error_msg.contains("EOF") {
            return Self::ResponseTooLarge;
        }

        // Rate limit errors
        if contains_any(&lowered, RATE_LIMIT_MARKERS) {
            return Self::RateLimit;
        }

        // Connection errors
        if contains_any(&lowered, CONNECTION_MARKERS) {
            return Self::Connection;
        }

        Self::Other
    }
}

/// Tracks error timestamps for a single provider using a sliding window.
///
/// Each recorded error is stamped with [`Instant::now`]. Stamps older than
/// [`ERROR_WINDOW_SECONDS`] are pruned lazily - whenever `record_error`,
/// `is_suspended`, `error_count` or `backoff_duration` is called - and the
/// provider is flagged as suspended once at least [`MAX_ERRORS_PER_WINDOW`]
/// errors sit inside the window.
pub struct ProviderErrorTracker {
    /// Timestamps of recent errors within the sliding window
    error_timestamps: Vec<Instant>,
    /// Whether this provider is currently suspended
    suspended: bool,
    /// Provider identifier for logging (hostname or endpoint name)
    identifier: String,
}

impl ProviderErrorTracker {
    /// Creates a new error tracker for a provider, with no recorded errors
    /// and not suspended.
    pub fn new(identifier: impl Into<String>) -> Self {
        Self {
            error_timestamps: Vec::new(),
            suspended: false,
            identifier: identifier.into(),
        }
    }

    /// Records an error and returns true if the provider should be suspended.
    ///
    /// Expired timestamps are pruned first, then the new error is added. A
    /// warning is logged every time the in-window count is at or above
    /// [`MAX_ERRORS_PER_WINDOW`] after the insertion.
    pub fn record_error(&mut self) -> bool {
        let now = Instant::now();
        self.cleanup_old_errors(now);
        self.error_timestamps.push(now);

        if self.error_timestamps.len() >= MAX_ERRORS_PER_WINDOW {
            self.suspended = true;
            warn!(
                "Provider {} suspended: {} errors in {} seconds",
                self.identifier,
                self.error_timestamps.len(),
                ERROR_WINDOW_SECONDS
            );
        }

        self.suspended
    }

    /// Clears error history on successful request.
    ///
    /// The `suspended` flag itself is not touched here; it is lifted by the
    /// next window cleanup that finds the count below the threshold.
    pub fn record_success(&mut self) {
        self.error_timestamps.clear();
    }

    /// Removes errors older than the sliding window, measured from `now`.
    fn cleanup_old_errors(&mut self, now: Instant) {
        let window = Duration::from_secs(ERROR_WINDOW_SECONDS);
        self.error_timestamps
            .retain(|&ts| now.duration_since(ts) < window);

        // Un-suspend if errors have aged out
        if self.suspended && self.error_timestamps.len() < MAX_ERRORS_PER_WINDOW {
            info!("Provider {} un-suspended: errors aged out", self.identifier);
            self.suspended = false;
        }
    }

    /// Returns true if the provider is currently suspended.
    ///
    /// Takes `&mut self` because expired errors are pruned first, which may
    /// lift an existing suspension.
    pub fn is_suspended(&mut self) -> bool {
        self.cleanup_old_errors(Instant::now());
        self.suspended
    }

    /// Returns the current error count within the sliding window, after
    /// pruning expired entries.
    pub fn error_count(&mut self) -> usize {
        self.cleanup_old_errors(Instant::now());
        self.error_timestamps.len()
    }

    /// Returns the provider identifier
    pub fn identifier(&self) -> &str {
        &self.identifier
    }

    /// Calculates the backoff duration based on current error count.
    ///
    /// The result is `config::BASE_BACKOFF_SECS` raised to the number of
    /// errors currently in the window, interpreted as seconds. The power
    /// saturates at `u64::MAX` instead of overflowing: errors keep being
    /// recorded after suspension, so the exponent is not bounded by
    /// [`MAX_ERRORS_PER_WINDOW`], and a plain `pow` would panic in debug
    /// builds or wrap (possibly to a zero backoff) in release builds.
    pub fn backoff_duration(&mut self) -> Duration {
        // Clamp before narrowing so a huge count cannot truncate to a small
        // exponent.
        let exponent = self.error_count().min(u32::MAX as usize) as u32;
        Duration::from_secs(config::BASE_BACKOFF_SECS.saturating_pow(exponent))
    }
}

/// Locates the first provider that is not suspended, probing in round-robin
/// order beginning at `start_index` and wrapping around the end of `trackers`.
///
/// Every tracker is probed at most once. Yields `None` when every provider is
/// suspended (or when `trackers` is empty).
pub fn find_active_provider(
    trackers: &mut [ProviderErrorTracker],
    start_index: usize,
) -> Option<usize> {
    let total = trackers.len();
    (0..total)
        .map(|offset| (start_index + offset) % total)
        .find(|&candidate| !trackers[candidate].is_suspended())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every message in `messages` is classified as `expected`.
    fn assert_all_categorized(messages: &[&str], expected: ErrorCategory) {
        for message in messages {
            assert_eq!(
                ErrorCategory::from_error_msg(message),
                expected,
                "unexpected category for {:?}",
                message
            );
        }
    }

    /// Records enough errors on `tracker` to reach the suspension threshold.
    fn record_threshold_errors(tracker: &mut ProviderErrorTracker) {
        for _ in 0..MAX_ERRORS_PER_WINDOW {
            tracker.record_error();
        }
    }

    #[test]
    fn test_error_category_size_errors() {
        assert_all_categorized(
            &[
                "Response 32081 too large",
                "max results exceeded",
                "deserialization failed",
                "error decoding response body",
            ],
            ErrorCategory::ResponseTooLarge,
        );
    }

    #[test]
    fn test_error_category_rate_limit() {
        assert_all_categorized(
            &["HTTP error: 429 Too Many Requests", "rate limit exceeded"],
            ErrorCategory::RateLimit,
        );
    }

    #[test]
    fn test_error_category_connection() {
        assert_all_categorized(
            &[
                "Connection reset without closing handshake",
                "request timeout",
            ],
            ErrorCategory::Connection,
        );
    }

    #[test]
    fn test_provider_error_tracker() {
        let mut tracker = ProviderErrorTracker::new("test-provider");
        assert!(!tracker.is_suspended());
        assert_eq!(tracker.error_count(), 0);

        // Stay one error short of the threshold: no suspension yet.
        let just_below = MAX_ERRORS_PER_WINDOW - 1;
        for _ in 0..just_below {
            assert!(!tracker.record_error());
        }
        assert!(!tracker.is_suspended());
        assert_eq!(tracker.error_count(), just_below);

        // The error that reaches the threshold triggers suspension.
        assert!(tracker.record_error());
        assert!(tracker.is_suspended());

        // A success wipes the recorded errors.
        tracker.record_success();
        assert_eq!(tracker.error_count(), 0);
        // Note: suspended state is only cleared by cleanup_old_errors, not record_success
    }

    #[test]
    fn test_truncate_error() {
        let short_msg = "short error";
        assert_eq!(truncate_error(short_msg), short_msg);

        let long_msg = "x".repeat(1000);
        let truncated = truncate_error(&long_msg);
        assert!(truncated.len() < long_msg.len());
        assert!(truncated.contains("[truncated"));
    }

    #[test]
    fn test_find_active_provider() {
        let mut trackers: Vec<ProviderErrorTracker> = (0..3)
            .map(|n| ProviderErrorTracker::new(format!("provider-{}", n)))
            .collect();

        // With every provider active, the start index itself is returned.
        assert_eq!(find_active_provider(&mut trackers, 0), Some(0));
        assert_eq!(find_active_provider(&mut trackers, 1), Some(1));

        // Once the first provider is suspended, the search moves past it.
        record_threshold_errors(&mut trackers[0]);
        assert_eq!(find_active_provider(&mut trackers, 0), Some(1));

        // With every provider suspended there is nothing to return.
        trackers.iter_mut().for_each(record_threshold_errors);
        assert_eq!(find_active_provider(&mut trackers, 0), None);
    }
}