use super::error::SignalError;
use super::signal::Signal;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
/// One recorded item in a signal's history: a payload plus the outcome
/// that was recorded for it.
///
/// Derives serde's `Serialize` and `Deserialize` in addition to `Debug`
/// and `Clone`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoryEntry<T> {
/// Wall-clock time at which the entry was constructed.
pub timestamp: SystemTime,
/// The payload value stored with this entry.
pub payload: T,
/// `true` when the recorded outcome was a success.
pub success: bool,
/// Error text recorded with the entry, if any.
pub error_message: Option<String>,
/// Receiver count recorded with the entry; the simpler constructors
/// leave it at 0.
pub receiver_count: usize,
}
impl<T> HistoryEntry<T> {
    /// Builds an entry stamped with the current time, carrying no error
    /// message and a receiver count of zero.
    pub fn new(payload: T, success: bool) -> Self {
        Self::with_details(payload, success, None, 0)
    }

    /// Builds a failed entry stamped with the current time that carries
    /// `error` as its message; the receiver count is zero.
    pub fn with_error(payload: T, error: String) -> Self {
        Self::with_details(payload, false, Some(error), 0)
    }

    /// Builds an entry stamped with the current time from fully explicit
    /// outcome details.
    pub fn with_details(
        payload: T,
        success: bool,
        error_message: Option<String>,
        receiver_count: usize,
    ) -> Self {
        let timestamp = SystemTime::now();
        Self {
            timestamp,
            payload,
            success,
            error_message,
            receiver_count,
        }
    }

    /// Returns whether this entry was recorded as successful.
    pub fn success(&self) -> bool {
        self.success
    }

    /// Returns the recorded error message, if there is one.
    pub fn error(&self) -> Option<&str> {
        self.error_message.as_deref()
    }

    /// Returns how long ago the entry was created.
    ///
    /// If the timestamp lies in the future relative to the system clock
    /// (for example after a clock adjustment), the age is reported as zero.
    pub fn age(&self) -> Duration {
        match self.timestamp.elapsed() {
            Ok(elapsed) => elapsed,
            Err(_) => Duration::from_secs(0),
        }
    }
}
/// Retention and filtering settings for a signal history.
///
/// Built with [`HistoryConfig::new`] (or `Default`) and refined through the
/// chainable `with_*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HistoryConfig {
    /// Capacity limit applied to the entry log when recording.
    max_entries: usize,
    /// Optional maximum age; entries older than this are dropped when a new
    /// entry is recorded.
    ttl: Option<Duration>,
    /// When `true`, only failed sends are recorded.
    errors_only: bool,
}

impl HistoryConfig {
    /// Returns the default configuration: 1000 entries, no TTL, and both
    /// successes and failures recorded.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with the entry limit replaced by `max`.
    pub fn with_max_entries(self, max: usize) -> Self {
        Self {
            max_entries: max,
            ..self
        }
    }

    /// Returns the configuration with a time-to-live of `ttl`.
    pub fn with_ttl(self, ttl: Duration) -> Self {
        Self {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Returns the configuration with the errors-only flag set to
    /// `errors_only`.
    pub fn with_errors_only(self, errors_only: bool) -> Self {
        Self {
            errors_only,
            ..self
        }
    }

    /// The configured entry limit.
    pub fn max_entries(&self) -> usize {
        self.max_entries
    }

    /// The configured time-to-live, if any.
    pub fn ttl(&self) -> Option<Duration> {
        self.ttl
    }

    /// Whether only failed sends are recorded.
    pub fn errors_only(&self) -> bool {
        self.errors_only
    }
}

impl Default for HistoryConfig {
    /// Same values as [`HistoryConfig::new`].
    fn default() -> Self {
        Self {
            max_entries: 1000,
            ttl: None,
            errors_only: false,
        }
    }
}
/// Aggregate counters and time bounds describing a set of history entries.
#[derive(Debug, Clone, Default)]
pub struct HistoryStats {
    /// Number of entries counted.
    total_count: usize,
    /// Number of entries recorded as successful.
    success_count: usize,
    /// Number of entries recorded as failed.
    error_count: usize,
    /// Timestamp of the oldest entry, if any.
    oldest_timestamp: Option<SystemTime>,
    /// Timestamp of the newest entry, if any.
    newest_timestamp: Option<SystemTime>,
}

impl HistoryStats {
    /// Returns empty statistics: all counters zero and no timestamps.
    pub fn new() -> Self {
        Self::default()
    }

    /// Total number of entries.
    pub fn total_count(&self) -> usize {
        self.total_count
    }

    /// Number of successful entries.
    pub fn success_count(&self) -> usize {
        self.success_count
    }

    /// Number of failed entries.
    pub fn error_count(&self) -> usize {
        self.error_count
    }

    /// Percentage (0.0 to 100.0) of entries that succeeded.
    ///
    /// With no entries at all the rate is reported as 100.0.
    pub fn success_rate(&self) -> f64 {
        match self.total_count {
            0 => 100.0,
            total => (self.success_count as f64 / total as f64) * 100.0,
        }
    }

    /// Time between the oldest and newest entry.
    ///
    /// Returns `None` when either bound is missing or when the newest
    /// timestamp is earlier than the oldest one.
    pub fn timespan(&self) -> Option<Duration> {
        let oldest = self.oldest_timestamp?;
        let newest = self.newest_timestamp?;
        newest.duration_since(oldest).ok()
    }
}
/// Wraps a [`Signal`] and keeps a log of [`HistoryEntry`] values for sends
/// made through the wrapper.
pub struct SignalHistory<T: Send + Sync + 'static> {
/// The wrapped signal that sends are forwarded to.
signal: Signal<T>,
/// Retention and filtering settings consulted when recording.
config: HistoryConfig,
/// Recorded entries; new entries are pushed at the back and evicted from
/// the front. Held behind an `Arc` so clones share the same log.
entries: Arc<RwLock<VecDeque<HistoryEntry<T>>>>,
}
impl<T: Send + Sync + Clone + 'static> SignalHistory<T> {
    /// Wraps `signal` so that sends made through this wrapper are logged
    /// according to `config`. The log starts out empty.
    pub fn new(signal: Signal<T>, config: HistoryConfig) -> Self {
        Self {
            signal,
            config,
            entries: Arc::new(RwLock::new(VecDeque::new())),
        }
    }

    /// Sends `instance` through the wrapped signal and records the outcome.
    ///
    /// The payload is cloned before dispatch because the signal consumes it,
    /// and the receiver count is sampled before the send. When the
    /// configuration is errors-only, successful sends are not recorded.
    ///
    /// # Errors
    ///
    /// Returns the wrapped signal's error unchanged; the failure is recorded
    /// in the history before it is returned.
    pub async fn send(&self, instance: T) -> Result<(), SignalError> {
        let payload = instance.clone();
        let receiver_count = self.signal.receiver_count();
        let result = self.signal.send(instance).await;
        let success = result.is_ok();
        if !self.config.errors_only || !success {
            let error_message = result.as_ref().err().map(|e| e.message.clone());
            let entry = HistoryEntry::with_details(payload, success, error_message, receiver_count);
            self.add_entry(entry);
        }
        result
    }

    /// Appends `entry` to the log, first dropping entries that have outlived
    /// the TTL and then evicting the oldest entries to stay within
    /// `max_entries`.
    fn add_entry(&self, entry: HistoryEntry<T>) {
        let max_entries = self.config.max_entries;
        if max_entries == 0 {
            // A zero-capacity history retains nothing. Without this guard the
            // eviction below would pop from an empty deque and the entry
            // would still be pushed, leaving one entry in the log.
            return;
        }

        let mut entries = self.entries.write();

        // `SystemTime - Duration` panics when the result is not
        // representable, so use the checked form. If no cutoff can be
        // computed the TTL reaches further back than the clock can express
        // and nothing has expired.
        let cutoff = self
            .config
            .ttl
            .and_then(|ttl| SystemTime::now().checked_sub(ttl));
        if let Some(cutoff) = cutoff {
            entries.retain(|e| e.timestamp >= cutoff);
        }

        // Make room for the new entry; a loop keeps the bound even if the
        // log is ever over capacity.
        while entries.len() >= max_entries {
            entries.pop_front();
        }
        entries.push_back(entry);
    }

    /// Returns a copy of every recorded entry, oldest first.
    pub fn get_all(&self) -> Vec<HistoryEntry<T>> {
        self.entries.read().iter().cloned().collect()
    }

    /// Returns copies of up to `count` of the most recent entries, newest
    /// first.
    pub fn get_recent(&self, count: usize) -> Vec<HistoryEntry<T>> {
        let entries = self.entries.read();
        entries.iter().rev().take(count).cloned().collect()
    }

    /// Returns copies of all entries recorded as failures, oldest first.
    pub fn get_errors(&self) -> Vec<HistoryEntry<T>> {
        self.entries
            .read()
            .iter()
            .filter(|e| !e.success)
            .cloned()
            .collect()
    }

    /// Returns copies of all entries whose timestamp lies in the inclusive
    /// range `start..=end`, oldest first.
    pub fn get_range(&self, start: SystemTime, end: SystemTime) -> Vec<HistoryEntry<T>> {
        self.entries
            .read()
            .iter()
            .filter(|e| e.timestamp >= start && e.timestamp <= end)
            .cloned()
            .collect()
    }

    /// Summarizes the current log: entry counts plus the timestamps of the
    /// front (oldest) and back (newest) entries.
    pub fn stats(&self) -> HistoryStats {
        let entries = self.entries.read();
        let total_count = entries.len();
        let success_count = entries.iter().filter(|e| e.success).count();
        HistoryStats {
            total_count,
            success_count,
            // Every entry is either a success or an error.
            error_count: total_count - success_count,
            oldest_timestamp: entries.front().map(|e| e.timestamp),
            newest_timestamp: entries.back().map(|e| e.timestamp),
        }
    }

    /// Removes every recorded entry.
    pub fn clear(&self) {
        self.entries.write().clear();
    }

    /// Returns the wrapped signal.
    pub fn signal(&self) -> &Signal<T> {
        &self.signal
    }
}
impl<T: Send + Sync + Clone + 'static> Clone for SignalHistory<T> {
    /// Clones the signal and configuration, and shares the entry log: the
    /// clone points at the same `Arc`, so entries recorded through either
    /// handle are visible through both.
    fn clone(&self) -> Self {
        let Self {
            signal,
            config,
            entries,
        } = self;
        Self {
            signal: signal.clone(),
            config: config.clone(),
            entries: Arc::clone(entries),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::signals::SignalName;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal payload type used throughout these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        id: i32,
        message: String,
    }

    /// Builds a `TestEvent` with the given id and message text.
    fn event(id: i32, message: impl Into<String>) -> TestEvent {
        TestEvent {
            id,
            message: message.into(),
        }
    }

    /// The builder methods must each store the value they were given.
    #[test]
    fn test_history_config() {
        let one_hour = Duration::from_secs(3600);
        let config = HistoryConfig::new()
            .with_max_entries(500)
            .with_ttl(one_hour)
            .with_errors_only(true);

        assert_eq!(config.max_entries(), 500);
        assert_eq!(config.ttl(), Some(one_hour));
        assert!(config.errors_only());
    }

    /// A freshly built successful entry has no error and a negligible age.
    #[test]
    fn test_history_entry() {
        let entry = HistoryEntry::new(event(1, "test"), true);

        assert!(entry.success());
        assert!(entry.error().is_none());
        assert!(entry.age() < Duration::from_secs(1));
    }

    /// Sends through the wrapper reach the receiver and are all recorded.
    #[tokio::test]
    async fn test_signal_history_basic() {
        let signal = Signal::<TestEvent>::new(SignalName::custom("test_history"));
        let history = SignalHistory::new(signal.clone(), HistoryConfig::new());

        let calls = Arc::new(AtomicUsize::new(0));
        let calls_for_handler = Arc::clone(&calls);
        signal.connect(move |_event| {
            let calls = Arc::clone(&calls_for_handler);
            async move {
                calls.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });

        for i in 0..5 {
            history
                .send(event(i, format!("Event {}", i)))
                .await
                .unwrap();
        }

        let stats = history.stats();
        assert_eq!(stats.total_count(), 5);
        assert_eq!(stats.success_count(), 5);
        assert_eq!(stats.error_count(), 0);
        assert_eq!(stats.success_rate(), 100.0);
        assert_eq!(calls.load(Ordering::SeqCst), 5);
    }

    /// Once the limit is reached, the oldest entries are evicted first.
    #[tokio::test]
    async fn test_history_max_entries() {
        let signal = Signal::<TestEvent>::new(SignalName::custom("test_max"));
        let history = SignalHistory::new(signal, HistoryConfig::new().with_max_entries(3));

        for i in 0..5 {
            history.send(event(i, "test")).await.unwrap();
        }

        let kept_ids: Vec<i32> = history
            .get_all()
            .iter()
            .map(|entry| entry.payload.id)
            .collect();
        assert_eq!(kept_ids, vec![2, 3, 4]);
    }

    /// In errors-only mode, failing sends are still recorded.
    #[tokio::test]
    async fn test_history_errors_only() {
        let signal = Signal::<TestEvent>::new(SignalName::custom("test_errors"));
        let history = SignalHistory::new(
            signal.clone(),
            HistoryConfig::new().with_errors_only(true),
        );
        signal.connect(|_event| async move { Err(SignalError::new("Test error")) });

        for i in 0..3 {
            let _ = history.send(event(i, "test")).await;
        }

        let stats = history.stats();
        assert_eq!(stats.total_count(), 3);
        assert_eq!(stats.error_count(), 3);
    }

    /// `get_recent` yields the newest entries first.
    #[tokio::test]
    async fn test_history_get_recent() {
        let signal = Signal::<TestEvent>::new(SignalName::custom("test_recent"));
        let history = SignalHistory::new(signal, HistoryConfig::new());

        for i in 0..10 {
            history.send(event(i, "test")).await.unwrap();
        }

        let recent_ids: Vec<i32> = history
            .get_recent(3)
            .iter()
            .map(|entry| entry.payload.id)
            .collect();
        assert_eq!(recent_ids, vec![9, 8, 7]);
    }

    /// `get_errors` returns only the failed sends.
    #[tokio::test]
    async fn test_history_get_errors() {
        let signal = Signal::<TestEvent>::new(SignalName::custom("test_get_errors"));
        let history = SignalHistory::new(signal.clone(), HistoryConfig::new());

        let fail_on_odd = Arc::new(AtomicUsize::new(0));
        let fail_clone = Arc::clone(&fail_on_odd);
        signal.connect(move |event| {
            let fail = Arc::clone(&fail_clone);
            async move {
                if event.id % 2 == 1 {
                    fail.fetch_add(1, Ordering::SeqCst);
                    Err(SignalError::new("Odd number"))
                } else {
                    Ok(())
                }
            }
        });

        for i in 0..10 {
            let _ = history.send(event(i, "test")).await;
        }

        let errors = history.get_errors();
        assert_eq!(errors.len(), 5);
        assert!(errors.iter().all(|e| !e.success));
    }

    /// `clear` empties the log.
    #[tokio::test]
    async fn test_history_clear() {
        let signal = Signal::<TestEvent>::new(SignalName::custom("test_clear"));
        let history = SignalHistory::new(signal, HistoryConfig::new());

        for i in 0..5 {
            history.send(event(i, "test")).await.unwrap();
        }
        assert_eq!(history.stats().total_count(), 5);

        history.clear();
        assert_eq!(history.stats().total_count(), 0);
    }
}