use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{debug, info, warn};
use super::types::ThreatSignal;
const DEFAULT_MAX_BUFFER_SIZE: u64 = 10 * 1024 * 1024;
const DEFAULT_MAX_SIGNALS: usize = 10_000;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignalBufferConfig {
pub enabled: bool,
pub buffer_path: PathBuf,
pub max_buffer_size: u64,
pub max_signals: usize,
}
impl Default for SignalBufferConfig {
fn default() -> Self {
Self {
enabled: false,
buffer_path: PathBuf::from("/var/lib/synapse/signals.jsonl"),
max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
max_signals: DEFAULT_MAX_SIGNALS,
}
}
}
impl SignalBufferConfig {
pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
Self {
enabled: true,
buffer_path: path.as_ref().to_path_buf(),
..Default::default()
}
}
pub fn validate(&self) -> Result<(), SignalBufferError> {
if self.enabled && self.buffer_path.as_os_str().is_empty() {
return Err(SignalBufferError::InvalidConfig(
"buffer_path is required when enabled".into(),
));
}
if self.max_buffer_size == 0 {
return Err(SignalBufferError::InvalidConfig(
"max_buffer_size must be > 0".into(),
));
}
if self.max_signals == 0 {
return Err(SignalBufferError::InvalidConfig(
"max_signals must be > 0".into(),
));
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum SignalBufferError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("Invalid configuration: {0}")]
InvalidConfig(String),
#[error("Buffer full: {0}")]
BufferFull(String),
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct SignalBufferStats {
pub buffered_signals: usize,
pub signals_written: u64,
pub signals_drained: u64,
pub signals_dropped: u64,
pub buffer_size_bytes: u64,
}
pub struct SignalBuffer {
config: SignalBufferConfig,
signals: RwLock<Vec<ThreatSignal>>,
signals_written: AtomicU64,
signals_drained: AtomicU64,
signals_dropped: AtomicU64,
}
impl SignalBuffer {
pub fn new(config: SignalBufferConfig) -> Result<Self, SignalBufferError> {
config.validate()?;
Ok(Self {
config,
signals: RwLock::new(Vec::new()),
signals_written: AtomicU64::new(0),
signals_drained: AtomicU64::new(0),
signals_dropped: AtomicU64::new(0),
})
}
pub fn disabled() -> Self {
Self {
config: SignalBufferConfig::default(),
signals: RwLock::new(Vec::new()),
signals_written: AtomicU64::new(0),
signals_drained: AtomicU64::new(0),
signals_dropped: AtomicU64::new(0),
}
}
pub fn load_existing(&self) -> Result<usize, SignalBufferError> {
if !self.config.enabled {
return Ok(0);
}
let path = &self.config.buffer_path;
if !path.exists() {
debug!("No existing buffer file at {:?}", path);
return Ok(0);
}
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut signals = self.signals.write();
let mut loaded = 0;
for line in reader.lines() {
let line = line?;
if line.is_empty() {
continue;
}
match serde_json::from_str::<ThreatSignal>(&line) {
Ok(signal) => {
if signals.len() < self.config.max_signals {
signals.push(signal);
loaded += 1;
}
}
Err(e) => {
warn!("Skipping invalid signal line: {}", e);
}
}
}
if loaded > 0 {
info!("Loaded {} signals from buffer file {:?}", loaded, path);
}
Ok(loaded)
}
pub fn append(&self, signal: ThreatSignal) -> Result<bool, SignalBufferError> {
if !self.config.enabled {
return Ok(false);
}
let mut signals = self.signals.write();
if signals.len() >= self.config.max_signals {
self.signals_dropped.fetch_add(1, Ordering::Relaxed);
debug!(
"Signal buffer full ({}/{}), dropping signal",
signals.len(),
self.config.max_signals
);
return Err(SignalBufferError::BufferFull(format!(
"max_signals ({}) reached",
self.config.max_signals
)));
}
self.append_to_file(&signal)?;
signals.push(signal);
self.signals_written.fetch_add(1, Ordering::Relaxed);
Ok(true)
}
fn append_to_file(&self, signal: &ThreatSignal) -> Result<(), SignalBufferError> {
if let Some(parent) = self.config.buffer_path.parent() {
fs::create_dir_all(parent)?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.config.buffer_path)?;
let metadata = file.metadata()?;
if metadata.len() >= self.config.max_buffer_size {
return Err(SignalBufferError::BufferFull(format!(
"max_buffer_size ({}) reached",
self.config.max_buffer_size
)));
}
let mut writer = BufWriter::new(file);
serde_json::to_writer(&mut writer, signal)?;
writeln!(writer)?;
writer.flush()?;
Ok(())
}
pub fn drain(&self) -> Result<Vec<ThreatSignal>, SignalBufferError> {
if !self.config.enabled {
return Ok(Vec::new());
}
let mut signals = self.signals.write();
let drained: Vec<ThreatSignal> = signals.drain(..).collect();
let count = drained.len() as u64;
if self.config.buffer_path.exists() {
fs::remove_file(&self.config.buffer_path)?;
}
if count > 0 {
self.signals_drained.fetch_add(count, Ordering::Relaxed);
info!("Drained {} signals from buffer", count);
}
Ok(drained)
}
pub fn clear(&self) -> Result<(), SignalBufferError> {
if !self.config.enabled {
return Ok(());
}
let mut signals = self.signals.write();
let count = signals.len();
signals.clear();
if self.config.buffer_path.exists() {
fs::remove_file(&self.config.buffer_path)?;
}
if count > 0 {
debug!("Cleared {} signals from buffer", count);
}
Ok(())
}
pub fn len(&self) -> usize {
self.signals.read().len()
}
pub fn is_empty(&self) -> bool {
self.signals.read().is_empty()
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
pub fn stats(&self) -> SignalBufferStats {
let signals = self.signals.read();
let buffer_size = self
.config
.buffer_path
.metadata()
.map(|m| m.len())
.unwrap_or(0);
SignalBufferStats {
buffered_signals: signals.len(),
signals_written: self.signals_written.load(Ordering::Relaxed),
signals_drained: self.signals_drained.load(Ordering::Relaxed),
signals_dropped: self.signals_dropped.load(Ordering::Relaxed),
buffer_size_bytes: buffer_size,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn create_test_signal() -> ThreatSignal {
use super::super::types::{Severity, SignalType};
ThreatSignal::new(SignalType::IpThreat, Severity::High)
.with_source_ip("192.168.1.100")
.with_confidence(0.95)
}
#[test]
fn test_disabled_buffer() {
let buffer = SignalBuffer::disabled();
assert!(!buffer.is_enabled());
let signal = create_test_signal();
let result = buffer.append(signal).unwrap();
assert!(!result);
assert!(buffer.drain().unwrap().is_empty());
}
#[test]
fn test_append_and_drain() {
let dir = tempdir().unwrap();
let path = dir.path().join("signals.jsonl");
let config = SignalBufferConfig::with_path(&path);
let buffer = SignalBuffer::new(config).unwrap();
for _ in 0..3 {
buffer.append(create_test_signal()).unwrap();
}
assert_eq!(buffer.len(), 3);
assert!(path.exists());
let signals = buffer.drain().unwrap();
assert_eq!(signals.len(), 3);
assert!(buffer.is_empty());
assert!(!path.exists()); }
#[test]
fn test_load_existing() {
let dir = tempdir().unwrap();
let path = dir.path().join("signals.jsonl");
{
let config = SignalBufferConfig::with_path(&path);
let buffer = SignalBuffer::new(config).unwrap();
buffer.append(create_test_signal()).unwrap();
buffer.append(create_test_signal()).unwrap();
}
let config = SignalBufferConfig::with_path(&path);
let buffer = SignalBuffer::new(config).unwrap();
let loaded = buffer.load_existing().unwrap();
assert_eq!(loaded, 2);
assert_eq!(buffer.len(), 2);
}
#[test]
fn test_max_signals_limit() {
let dir = tempdir().unwrap();
let path = dir.path().join("signals.jsonl");
let config = SignalBufferConfig {
enabled: true,
buffer_path: path,
max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
max_signals: 2,
};
let buffer = SignalBuffer::new(config).unwrap();
assert!(buffer.append(create_test_signal()).is_ok());
assert!(buffer.append(create_test_signal()).is_ok());
let result = buffer.append(create_test_signal());
assert!(matches!(result, Err(SignalBufferError::BufferFull(_))));
let stats = buffer.stats();
assert_eq!(stats.signals_dropped, 1);
}
#[test]
fn test_stats() {
let dir = tempdir().unwrap();
let path = dir.path().join("signals.jsonl");
let config = SignalBufferConfig::with_path(&path);
let buffer = SignalBuffer::new(config).unwrap();
buffer.append(create_test_signal()).unwrap();
buffer.append(create_test_signal()).unwrap();
let stats = buffer.stats();
assert_eq!(stats.buffered_signals, 2);
assert_eq!(stats.signals_written, 2);
assert_eq!(stats.signals_drained, 0);
buffer.drain().unwrap();
let stats = buffer.stats();
assert_eq!(stats.buffered_signals, 0);
assert_eq!(stats.signals_drained, 2);
}
#[test]
fn test_config_validation() {
let config = SignalBufferConfig {
enabled: true,
buffer_path: PathBuf::new(),
..Default::default()
};
assert!(config.validate().is_err());
let config = SignalBufferConfig {
enabled: true,
buffer_path: PathBuf::from("/tmp/test.jsonl"),
max_signals: 0,
..Default::default()
};
assert!(config.validate().is_err());
let config = SignalBufferConfig::with_path("/tmp/test.jsonl");
assert!(config.validate().is_ok());
}
#[test]
fn test_clear() {
let dir = tempdir().unwrap();
let path = dir.path().join("signals.jsonl");
let config = SignalBufferConfig::with_path(&path);
let buffer = SignalBuffer::new(config).unwrap();
buffer.append(create_test_signal()).unwrap();
buffer.append(create_test_signal()).unwrap();
assert_eq!(buffer.len(), 2);
assert!(path.exists());
buffer.clear().unwrap();
assert!(buffer.is_empty());
assert!(!path.exists());
}
}