Skip to main content

synapse_pingora/horizon/
signal_buffer.rs

//! Persistent signal buffer for offline signal storage.
//!
//! Provides durable storage for threat signals when the WebSocket connection
//! is unavailable. Signals are persisted to disk using an append-only JSONL
//! format and replayed when the connection is restored.
//!
//! # Architecture
//!
//! ```text
//! ThreatSignal → SignalBuffer → signals.jsonl
//!                      │
//!                      └── On reconnect: drain() → WebSocket
//! ```
14
15use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::fs::{self, File, OpenOptions};
18use std::io::{self, BufRead, BufReader, BufWriter, Write};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use tracing::{debug, info, warn};
22
23use super::types::ThreatSignal;
24
/// Fallback cap on the buffer file size: 10 MiB, expressed in bytes.
const DEFAULT_MAX_BUFFER_SIZE: u64 = 10 << 20;

/// Fallback cap on how many signals may be buffered at once.
const DEFAULT_MAX_SIGNALS: usize = 10_000;
30
31/// Configuration for the signal buffer.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SignalBufferConfig {
34    /// Whether persistent buffering is enabled
35    pub enabled: bool,
36    /// Path to the buffer file
37    pub buffer_path: PathBuf,
38    /// Maximum buffer file size in bytes
39    pub max_buffer_size: u64,
40    /// Maximum number of signals to buffer
41    pub max_signals: usize,
42}
43
44impl Default for SignalBufferConfig {
45    fn default() -> Self {
46        Self {
47            enabled: false,
48            buffer_path: PathBuf::from("/var/lib/synapse/signals.jsonl"),
49            max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
50            max_signals: DEFAULT_MAX_SIGNALS,
51        }
52    }
53}
54
55impl SignalBufferConfig {
56    /// Create a new config with the given buffer path.
57    pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
58        Self {
59            enabled: true,
60            buffer_path: path.as_ref().to_path_buf(),
61            ..Default::default()
62        }
63    }
64
65    /// Validate the configuration.
66    pub fn validate(&self) -> Result<(), SignalBufferError> {
67        if self.enabled && self.buffer_path.as_os_str().is_empty() {
68            return Err(SignalBufferError::InvalidConfig(
69                "buffer_path is required when enabled".into(),
70            ));
71        }
72        if self.max_buffer_size == 0 {
73            return Err(SignalBufferError::InvalidConfig(
74                "max_buffer_size must be > 0".into(),
75            ));
76        }
77        if self.max_signals == 0 {
78            return Err(SignalBufferError::InvalidConfig(
79                "max_signals must be > 0".into(),
80            ));
81        }
82        Ok(())
83    }
84}
85
86/// Errors from signal buffer operations.
87#[derive(Debug, thiserror::Error)]
88pub enum SignalBufferError {
89    #[error("I/O error: {0}")]
90    Io(#[from] io::Error),
91
92    #[error("JSON error: {0}")]
93    Json(#[from] serde_json::Error),
94
95    #[error("Invalid configuration: {0}")]
96    InvalidConfig(String),
97
98    #[error("Buffer full: {0}")]
99    BufferFull(String),
100}
101
102/// Statistics for the signal buffer.
103#[derive(Debug, Clone, Default, Serialize)]
104pub struct SignalBufferStats {
105    /// Number of signals currently buffered
106    pub buffered_signals: usize,
107    /// Total signals written to buffer
108    pub signals_written: u64,
109    /// Total signals drained from buffer
110    pub signals_drained: u64,
111    /// Total signals dropped due to buffer full
112    pub signals_dropped: u64,
113    /// Current buffer file size in bytes
114    pub buffer_size_bytes: u64,
115}
116
117/// Persistent signal buffer backed by a JSONL file.
118///
119/// Provides append-only storage for signals during WebSocket disconnection.
120/// Thread-safe and designed for concurrent access.
121pub struct SignalBuffer {
122    config: SignalBufferConfig,
123    /// In-memory buffer for fast access
124    signals: RwLock<Vec<ThreatSignal>>,
125    /// Statistics
126    signals_written: AtomicU64,
127    signals_drained: AtomicU64,
128    signals_dropped: AtomicU64,
129}
130
131impl SignalBuffer {
132    /// Create a new signal buffer with the given configuration.
133    ///
134    /// # Errors
135    /// Returns an error if the configuration is invalid.
136    pub fn new(config: SignalBufferConfig) -> Result<Self, SignalBufferError> {
137        config.validate()?;
138
139        Ok(Self {
140            config,
141            signals: RwLock::new(Vec::new()),
142            signals_written: AtomicU64::new(0),
143            signals_drained: AtomicU64::new(0),
144            signals_dropped: AtomicU64::new(0),
145        })
146    }
147
148    /// Create a disabled buffer (no-op operations).
149    pub fn disabled() -> Self {
150        Self {
151            config: SignalBufferConfig::default(),
152            signals: RwLock::new(Vec::new()),
153            signals_written: AtomicU64::new(0),
154            signals_drained: AtomicU64::new(0),
155            signals_dropped: AtomicU64::new(0),
156        }
157    }
158
159    /// Load existing signals from the buffer file on startup.
160    ///
161    /// Call this when the client starts to recover any signals that
162    /// were buffered before a restart.
163    pub fn load_existing(&self) -> Result<usize, SignalBufferError> {
164        if !self.config.enabled {
165            return Ok(0);
166        }
167
168        let path = &self.config.buffer_path;
169        if !path.exists() {
170            debug!("No existing buffer file at {:?}", path);
171            return Ok(0);
172        }
173
174        let file = File::open(path)?;
175        let reader = BufReader::new(file);
176        let mut signals = self.signals.write();
177        let mut loaded = 0;
178
179        for line in reader.lines() {
180            let line = line?;
181            if line.is_empty() {
182                continue;
183            }
184            match serde_json::from_str::<ThreatSignal>(&line) {
185                Ok(signal) => {
186                    if signals.len() < self.config.max_signals {
187                        signals.push(signal);
188                        loaded += 1;
189                    }
190                }
191                Err(e) => {
192                    warn!("Skipping invalid signal line: {}", e);
193                }
194            }
195        }
196
197        if loaded > 0 {
198            info!("Loaded {} signals from buffer file {:?}", loaded, path);
199        }
200
201        Ok(loaded)
202    }
203
204    /// Append a signal to the buffer.
205    ///
206    /// Signals are stored both in memory and persisted to disk for durability.
207    ///
208    /// # Returns
209    /// - `Ok(true)` if the signal was buffered
210    /// - `Ok(false)` if buffering is disabled
211    /// - `Err` if buffering failed
212    pub fn append(&self, signal: ThreatSignal) -> Result<bool, SignalBufferError> {
213        if !self.config.enabled {
214            return Ok(false);
215        }
216
217        // Check capacity
218        let mut signals = self.signals.write();
219        if signals.len() >= self.config.max_signals {
220            self.signals_dropped.fetch_add(1, Ordering::Relaxed);
221            debug!(
222                "Signal buffer full ({}/{}), dropping signal",
223                signals.len(),
224                self.config.max_signals
225            );
226            return Err(SignalBufferError::BufferFull(format!(
227                "max_signals ({}) reached",
228                self.config.max_signals
229            )));
230        }
231
232        // Persist to disk first for durability
233        self.append_to_file(&signal)?;
234
235        // Then add to memory
236        signals.push(signal);
237        self.signals_written.fetch_add(1, Ordering::Relaxed);
238
239        Ok(true)
240    }
241
242    /// Append a signal to the buffer file (atomic operation).
243    fn append_to_file(&self, signal: &ThreatSignal) -> Result<(), SignalBufferError> {
244        // Ensure parent directory exists
245        if let Some(parent) = self.config.buffer_path.parent() {
246            fs::create_dir_all(parent)?;
247        }
248
249        // Open file in append mode
250        let file = OpenOptions::new()
251            .create(true)
252            .append(true)
253            .open(&self.config.buffer_path)?;
254
255        // Check file size
256        let metadata = file.metadata()?;
257        if metadata.len() >= self.config.max_buffer_size {
258            return Err(SignalBufferError::BufferFull(format!(
259                "max_buffer_size ({}) reached",
260                self.config.max_buffer_size
261            )));
262        }
263
264        // Write signal as JSONL
265        let mut writer = BufWriter::new(file);
266        serde_json::to_writer(&mut writer, signal)?;
267        writeln!(writer)?;
268        writer.flush()?;
269
270        Ok(())
271    }
272
273    /// Drain all buffered signals.
274    ///
275    /// Returns the signals and clears both the in-memory buffer and the file.
276    /// Use this when the WebSocket connection is restored.
277    pub fn drain(&self) -> Result<Vec<ThreatSignal>, SignalBufferError> {
278        if !self.config.enabled {
279            return Ok(Vec::new());
280        }
281
282        let mut signals = self.signals.write();
283        let drained: Vec<ThreatSignal> = signals.drain(..).collect();
284        let count = drained.len() as u64;
285
286        // Clear the file
287        if self.config.buffer_path.exists() {
288            fs::remove_file(&self.config.buffer_path)?;
289        }
290
291        if count > 0 {
292            self.signals_drained.fetch_add(count, Ordering::Relaxed);
293            info!("Drained {} signals from buffer", count);
294        }
295
296        Ok(drained)
297    }
298
299    /// Clear all buffered signals without returning them.
300    pub fn clear(&self) -> Result<(), SignalBufferError> {
301        if !self.config.enabled {
302            return Ok(());
303        }
304
305        let mut signals = self.signals.write();
306        let count = signals.len();
307        signals.clear();
308
309        // Clear the file
310        if self.config.buffer_path.exists() {
311            fs::remove_file(&self.config.buffer_path)?;
312        }
313
314        if count > 0 {
315            debug!("Cleared {} signals from buffer", count);
316        }
317
318        Ok(())
319    }
320
321    /// Get the number of buffered signals.
322    pub fn len(&self) -> usize {
323        self.signals.read().len()
324    }
325
326    /// Check if the buffer is empty.
327    pub fn is_empty(&self) -> bool {
328        self.signals.read().is_empty()
329    }
330
331    /// Check if buffering is enabled.
332    pub fn is_enabled(&self) -> bool {
333        self.config.enabled
334    }
335
336    /// Get buffer statistics.
337    pub fn stats(&self) -> SignalBufferStats {
338        let signals = self.signals.read();
339        let buffer_size = self
340            .config
341            .buffer_path
342            .metadata()
343            .map(|m| m.len())
344            .unwrap_or(0);
345
346        SignalBufferStats {
347            buffered_signals: signals.len(),
348            signals_written: self.signals_written.load(Ordering::Relaxed),
349            signals_drained: self.signals_drained.load(Ordering::Relaxed),
350            signals_dropped: self.signals_dropped.load(Ordering::Relaxed),
351            buffer_size_bytes: buffer_size,
352        }
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A representative high-severity IP threat signal.
    fn create_test_signal() -> ThreatSignal {
        use super::super::types::{Severity, SignalType};
        ThreatSignal::new(SignalType::IpThreat, Severity::High)
            .with_source_ip("192.168.1.100")
            .with_confidence(0.95)
    }

    /// Fresh temp directory plus the buffer file path inside it.
    /// The directory guard must be kept alive for the duration of the test.
    fn scratch_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("signals.jsonl");
        (dir, path)
    }

    /// Enabled buffer with default limits writing to `path`.
    fn enabled_buffer(path: &Path) -> SignalBuffer {
        SignalBuffer::new(SignalBufferConfig::with_path(path)).unwrap()
    }

    /// Append `count` test signals, panicking on the first failure.
    fn append_signals(buffer: &SignalBuffer, count: usize) {
        for _ in 0..count {
            buffer.append(create_test_signal()).unwrap();
        }
    }

    #[test]
    fn test_disabled_buffer() {
        let buffer = SignalBuffer::disabled();
        assert!(!buffer.is_enabled());

        // Appending to a disabled buffer is a no-op that reports `false`.
        let accepted = buffer.append(create_test_signal()).unwrap();
        assert!(!accepted);

        assert!(buffer.drain().unwrap().is_empty());
    }

    #[test]
    fn test_append_and_drain() {
        let (_dir, path) = scratch_path();
        let buffer = enabled_buffer(&path);

        append_signals(&buffer, 3);
        assert_eq!(buffer.len(), 3);
        assert!(path.exists());

        let drained = buffer.drain().unwrap();
        assert_eq!(drained.len(), 3);
        assert!(buffer.is_empty());
        // Draining deletes the backing file.
        assert!(!path.exists());
    }

    #[test]
    fn test_load_existing() {
        let (_dir, path) = scratch_path();

        // First buffer instance writes two signals, then goes away.
        {
            let writer = enabled_buffer(&path);
            append_signals(&writer, 2);
        }

        // Second instance recovers them from disk.
        let reader = enabled_buffer(&path);
        let loaded = reader.load_existing().unwrap();

        assert_eq!(loaded, 2);
        assert_eq!(reader.len(), 2);
    }

    #[test]
    fn test_max_signals_limit() {
        let (_dir, path) = scratch_path();

        let config = SignalBufferConfig {
            max_signals: 2,
            ..SignalBufferConfig::with_path(path)
        };
        let buffer = SignalBuffer::new(config).unwrap();

        // Up to the limit, appends succeed.
        assert!(buffer.append(create_test_signal()).is_ok());
        assert!(buffer.append(create_test_signal()).is_ok());

        // One past the limit is rejected and counted as dropped.
        let overflow = buffer.append(create_test_signal());
        assert!(matches!(overflow, Err(SignalBufferError::BufferFull(_))));

        assert_eq!(buffer.stats().signals_dropped, 1);
    }

    #[test]
    fn test_stats() {
        let (_dir, path) = scratch_path();
        let buffer = enabled_buffer(&path);

        append_signals(&buffer, 2);

        let before = buffer.stats();
        assert_eq!(before.buffered_signals, 2);
        assert_eq!(before.signals_written, 2);
        assert_eq!(before.signals_drained, 0);

        buffer.drain().unwrap();

        let after = buffer.stats();
        assert_eq!(after.buffered_signals, 0);
        assert_eq!(after.signals_drained, 2);
    }

    #[test]
    fn test_config_validation() {
        // Invalid: empty path when enabled
        let empty_path = SignalBufferConfig {
            enabled: true,
            buffer_path: PathBuf::new(),
            ..Default::default()
        };
        assert!(empty_path.validate().is_err());

        // Invalid: zero max_signals
        let zero_signals = SignalBufferConfig {
            enabled: true,
            buffer_path: PathBuf::from("/tmp/test.jsonl"),
            max_signals: 0,
            ..Default::default()
        };
        assert!(zero_signals.validate().is_err());

        // Valid
        let valid = SignalBufferConfig::with_path("/tmp/test.jsonl");
        assert!(valid.validate().is_ok());
    }

    #[test]
    fn test_clear() {
        let (_dir, path) = scratch_path();
        let buffer = enabled_buffer(&path);

        append_signals(&buffer, 2);
        assert_eq!(buffer.len(), 2);
        assert!(path.exists());

        buffer.clear().unwrap();
        assert!(buffer.is_empty());
        assert!(!path.exists());
    }
}
514}