use crate::types::{Hash, NodeId, Timestamp};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Signal {
pub id: Hash,
pub signal_type: String,
pub target_node: Option<NodeId>,
pub payload: serde_json::Value,
pub source: String,
pub timestamp: Timestamp,
}
impl Signal {
pub fn new(
signal_type: impl Into<String>,
payload: serde_json::Value,
source: impl Into<String>,
) -> Self {
let signal_type = signal_type.into();
let source = source.into();
let content = serde_json::json!({
"type": signal_type,
"payload": payload,
"source": source,
});
let id = Hash::digest(content.to_string().as_bytes());
Self {
id,
signal_type,
target_node: None,
payload,
source,
timestamp: Timestamp::now(),
}
}
pub fn with_target(mut self, node_id: NodeId) -> Self {
self.target_node = Some(node_id);
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignalBatch {
pub signals: Vec<Signal>,
pub created_at: Timestamp,
}
impl SignalBatch {
pub fn new() -> Self {
Self {
signals: Vec::new(),
created_at: Timestamp::now(),
}
}
pub fn add(&mut self, signal: Signal) {
self.signals.push(signal);
}
pub fn len(&self) -> usize {
self.signals.len()
}
pub fn is_empty(&self) -> bool {
self.signals.is_empty()
}
}
impl Default for SignalBatch {
fn default() -> Self {
Self::new()
}
}
pub struct SignalIngestion {
buffer: VecDeque<Signal>,
max_batch_size: usize,
processed_ids: std::collections::HashSet<Hash>,
}
impl SignalIngestion {
pub fn new(max_batch_size: usize) -> Self {
Self {
buffer: VecDeque::new(),
max_batch_size,
processed_ids: std::collections::HashSet::new(),
}
}
pub fn ingest(&mut self, signal: Signal) -> bool {
if self.processed_ids.contains(&signal.id) {
return false;
}
self.processed_ids.insert(signal.id);
self.buffer.push_back(signal);
true
}
pub fn next_batch(&mut self) -> Option<SignalBatch> {
if self.buffer.is_empty() {
return None;
}
let mut batch = SignalBatch::new();
while batch.len() < self.max_batch_size {
if let Some(signal) = self.buffer.pop_front() {
batch.add(signal);
} else {
break;
}
}
if batch.is_empty() {
None
} else {
Some(batch)
}
}
pub fn buffer_size(&self) -> usize {
self.buffer.len()
}
pub fn clear_processed_ids(&mut self) {
self.processed_ids.clear();
}
}
impl Default for SignalIngestion {
fn default() -> Self {
Self::new(100)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_signal_creation() {
let signal = Signal::new(
"observation",
serde_json::json!({"value": 42}),
"test-source",
);
assert_eq!(signal.signal_type, "observation");
assert_eq!(signal.source, "test-source");
}
#[test]
fn test_duplicate_rejection() {
let mut ingestion = SignalIngestion::new(10);
let signal = Signal::new("test", serde_json::json!({}), "source");
let signal_clone = signal.clone();
assert!(ingestion.ingest(signal));
assert!(!ingestion.ingest(signal_clone)); }
#[test]
fn test_batching() {
let mut ingestion = SignalIngestion::new(2);
for i in 0..5 {
let signal = Signal::new("test", serde_json::json!({"i": i}), "source");
ingestion.ingest(signal);
}
let batch1 = ingestion.next_batch().unwrap();
assert_eq!(batch1.len(), 2);
let batch2 = ingestion.next_batch().unwrap();
assert_eq!(batch2.len(), 2);
let batch3 = ingestion.next_batch().unwrap();
assert_eq!(batch3.len(), 1);
assert!(ingestion.next_batch().is_none());
}
}