use super::Analyzer;
use crate::framework::core::Event;
use crate::framework::core::timestamp::boot_ns_to_epoch_ms;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
type EventStream = Pin<Box<dyn Stream<Item = Event> + Send>>;
#[derive(Debug)]
pub struct TimestampNormalizer {}
impl TimestampNormalizer {
pub fn new() -> Self {
Self {}
}
}
impl Default for TimestampNormalizer {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Analyzer for TimestampNormalizer {
async fn process(
&mut self,
stream: EventStream,
) -> Result<EventStream, Box<dyn std::error::Error + Send + Sync>> {
let normalized_stream = stream.map(|mut event| {
let timestamp_ms = boot_ns_to_epoch_ms(event.timestamp);
event.timestamp = timestamp_ms;
event
});
Ok(Box::pin(normalized_stream))
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream;
use serde_json::json;
#[tokio::test]
async fn test_timestamp_normalizer() {
let mut normalizer = TimestampNormalizer::new();
let test_event = Event::new_with_timestamp(
1_000_000_000, "test".to_string(),
1234,
"test_comm".to_string(),
json!({"test": "data"}),
);
let input_stream = stream::iter(vec![test_event]);
let output_stream = normalizer.process(Box::pin(input_stream)).await.unwrap();
let results: Vec<Event> = output_stream.collect().await;
assert_eq!(results.len(), 1);
assert!(results[0].timestamp > 1_000_000_000_000); }
#[tokio::test]
async fn test_timestamp_normalizer_multiple_events() {
let mut normalizer = TimestampNormalizer::new();
let events = vec![
Event::new_with_timestamp(
1_000_000_000, "test".to_string(),
1234,
"test1".to_string(),
json!({"id": 1}),
),
Event::new_with_timestamp(
2_000_000_000, "test".to_string(),
1234,
"test2".to_string(),
json!({"id": 2}),
),
Event::new_with_timestamp(
3_000_000_000, "test".to_string(),
1234,
"test3".to_string(),
json!({"id": 3}),
),
];
let input_stream = stream::iter(events);
let output_stream = normalizer.process(Box::pin(input_stream)).await.unwrap();
let results: Vec<Event> = output_stream.collect().await;
assert_eq!(results.len(), 3);
assert!(results[0].timestamp < results[1].timestamp);
assert!(results[1].timestamp < results[2].timestamp);
for result in &results {
assert!(result.timestamp > 1_000_000_000_000); }
}
}