1use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::fs::{self, File, OpenOptions};
18use std::io::{self, BufRead, BufReader, BufWriter, Write};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use tracing::{debug, info, warn};
22
23use super::types::ThreatSignal;
24
25const DEFAULT_MAX_BUFFER_SIZE: u64 = 10 * 1024 * 1024;
27
28const DEFAULT_MAX_SIGNALS: usize = 10_000;
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SignalBufferConfig {
34 pub enabled: bool,
36 pub buffer_path: PathBuf,
38 pub max_buffer_size: u64,
40 pub max_signals: usize,
42}
43
44impl Default for SignalBufferConfig {
45 fn default() -> Self {
46 Self {
47 enabled: false,
48 buffer_path: PathBuf::from("/var/lib/synapse/signals.jsonl"),
49 max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
50 max_signals: DEFAULT_MAX_SIGNALS,
51 }
52 }
53}
54
55impl SignalBufferConfig {
56 pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
58 Self {
59 enabled: true,
60 buffer_path: path.as_ref().to_path_buf(),
61 ..Default::default()
62 }
63 }
64
65 pub fn validate(&self) -> Result<(), SignalBufferError> {
67 if self.enabled && self.buffer_path.as_os_str().is_empty() {
68 return Err(SignalBufferError::InvalidConfig(
69 "buffer_path is required when enabled".into(),
70 ));
71 }
72 if self.max_buffer_size == 0 {
73 return Err(SignalBufferError::InvalidConfig(
74 "max_buffer_size must be > 0".into(),
75 ));
76 }
77 if self.max_signals == 0 {
78 return Err(SignalBufferError::InvalidConfig(
79 "max_signals must be > 0".into(),
80 ));
81 }
82 Ok(())
83 }
84}
85
86#[derive(Debug, thiserror::Error)]
88pub enum SignalBufferError {
89 #[error("I/O error: {0}")]
90 Io(#[from] io::Error),
91
92 #[error("JSON error: {0}")]
93 Json(#[from] serde_json::Error),
94
95 #[error("Invalid configuration: {0}")]
96 InvalidConfig(String),
97
98 #[error("Buffer full: {0}")]
99 BufferFull(String),
100}
101
102#[derive(Debug, Clone, Default, Serialize)]
104pub struct SignalBufferStats {
105 pub buffered_signals: usize,
107 pub signals_written: u64,
109 pub signals_drained: u64,
111 pub signals_dropped: u64,
113 pub buffer_size_bytes: u64,
115}
116
117pub struct SignalBuffer {
122 config: SignalBufferConfig,
123 signals: RwLock<Vec<ThreatSignal>>,
125 signals_written: AtomicU64,
127 signals_drained: AtomicU64,
128 signals_dropped: AtomicU64,
129}
130
131impl SignalBuffer {
132 pub fn new(config: SignalBufferConfig) -> Result<Self, SignalBufferError> {
137 config.validate()?;
138
139 Ok(Self {
140 config,
141 signals: RwLock::new(Vec::new()),
142 signals_written: AtomicU64::new(0),
143 signals_drained: AtomicU64::new(0),
144 signals_dropped: AtomicU64::new(0),
145 })
146 }
147
148 pub fn disabled() -> Self {
150 Self {
151 config: SignalBufferConfig::default(),
152 signals: RwLock::new(Vec::new()),
153 signals_written: AtomicU64::new(0),
154 signals_drained: AtomicU64::new(0),
155 signals_dropped: AtomicU64::new(0),
156 }
157 }
158
159 pub fn load_existing(&self) -> Result<usize, SignalBufferError> {
164 if !self.config.enabled {
165 return Ok(0);
166 }
167
168 let path = &self.config.buffer_path;
169 if !path.exists() {
170 debug!("No existing buffer file at {:?}", path);
171 return Ok(0);
172 }
173
174 let file = File::open(path)?;
175 let reader = BufReader::new(file);
176 let mut signals = self.signals.write();
177 let mut loaded = 0;
178
179 for line in reader.lines() {
180 let line = line?;
181 if line.is_empty() {
182 continue;
183 }
184 match serde_json::from_str::<ThreatSignal>(&line) {
185 Ok(signal) => {
186 if signals.len() < self.config.max_signals {
187 signals.push(signal);
188 loaded += 1;
189 }
190 }
191 Err(e) => {
192 warn!("Skipping invalid signal line: {}", e);
193 }
194 }
195 }
196
197 if loaded > 0 {
198 info!("Loaded {} signals from buffer file {:?}", loaded, path);
199 }
200
201 Ok(loaded)
202 }
203
204 pub fn append(&self, signal: ThreatSignal) -> Result<bool, SignalBufferError> {
213 if !self.config.enabled {
214 return Ok(false);
215 }
216
217 let mut signals = self.signals.write();
219 if signals.len() >= self.config.max_signals {
220 self.signals_dropped.fetch_add(1, Ordering::Relaxed);
221 debug!(
222 "Signal buffer full ({}/{}), dropping signal",
223 signals.len(),
224 self.config.max_signals
225 );
226 return Err(SignalBufferError::BufferFull(format!(
227 "max_signals ({}) reached",
228 self.config.max_signals
229 )));
230 }
231
232 self.append_to_file(&signal)?;
234
235 signals.push(signal);
237 self.signals_written.fetch_add(1, Ordering::Relaxed);
238
239 Ok(true)
240 }
241
242 fn append_to_file(&self, signal: &ThreatSignal) -> Result<(), SignalBufferError> {
244 if let Some(parent) = self.config.buffer_path.parent() {
246 fs::create_dir_all(parent)?;
247 }
248
249 let file = OpenOptions::new()
251 .create(true)
252 .append(true)
253 .open(&self.config.buffer_path)?;
254
255 let metadata = file.metadata()?;
257 if metadata.len() >= self.config.max_buffer_size {
258 return Err(SignalBufferError::BufferFull(format!(
259 "max_buffer_size ({}) reached",
260 self.config.max_buffer_size
261 )));
262 }
263
264 let mut writer = BufWriter::new(file);
266 serde_json::to_writer(&mut writer, signal)?;
267 writeln!(writer)?;
268 writer.flush()?;
269
270 Ok(())
271 }
272
273 pub fn drain(&self) -> Result<Vec<ThreatSignal>, SignalBufferError> {
278 if !self.config.enabled {
279 return Ok(Vec::new());
280 }
281
282 let mut signals = self.signals.write();
283 let drained: Vec<ThreatSignal> = signals.drain(..).collect();
284 let count = drained.len() as u64;
285
286 if self.config.buffer_path.exists() {
288 fs::remove_file(&self.config.buffer_path)?;
289 }
290
291 if count > 0 {
292 self.signals_drained.fetch_add(count, Ordering::Relaxed);
293 info!("Drained {} signals from buffer", count);
294 }
295
296 Ok(drained)
297 }
298
299 pub fn clear(&self) -> Result<(), SignalBufferError> {
301 if !self.config.enabled {
302 return Ok(());
303 }
304
305 let mut signals = self.signals.write();
306 let count = signals.len();
307 signals.clear();
308
309 if self.config.buffer_path.exists() {
311 fs::remove_file(&self.config.buffer_path)?;
312 }
313
314 if count > 0 {
315 debug!("Cleared {} signals from buffer", count);
316 }
317
318 Ok(())
319 }
320
321 pub fn len(&self) -> usize {
323 self.signals.read().len()
324 }
325
326 pub fn is_empty(&self) -> bool {
328 self.signals.read().is_empty()
329 }
330
331 pub fn is_enabled(&self) -> bool {
333 self.config.enabled
334 }
335
336 pub fn stats(&self) -> SignalBufferStats {
338 let signals = self.signals.read();
339 let buffer_size = self
340 .config
341 .buffer_path
342 .metadata()
343 .map(|m| m.len())
344 .unwrap_or(0);
345
346 SignalBufferStats {
347 buffered_signals: signals.len(),
348 signals_written: self.signals_written.load(Ordering::Relaxed),
349 signals_drained: self.signals_drained.load(Ordering::Relaxed),
350 signals_dropped: self.signals_dropped.load(Ordering::Relaxed),
351 buffer_size_bytes: buffer_size,
352 }
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359 use tempfile::tempdir;
360
361 fn create_test_signal() -> ThreatSignal {
362 use super::super::types::{Severity, SignalType};
363 ThreatSignal::new(SignalType::IpThreat, Severity::High)
364 .with_source_ip("192.168.1.100")
365 .with_confidence(0.95)
366 }
367
368 #[test]
369 fn test_disabled_buffer() {
370 let buffer = SignalBuffer::disabled();
371 assert!(!buffer.is_enabled());
372
373 let signal = create_test_signal();
374 let result = buffer.append(signal).unwrap();
375 assert!(!result); assert!(buffer.drain().unwrap().is_empty());
378 }
379
380 #[test]
381 fn test_append_and_drain() {
382 let dir = tempdir().unwrap();
383 let path = dir.path().join("signals.jsonl");
384
385 let config = SignalBufferConfig::with_path(&path);
386 let buffer = SignalBuffer::new(config).unwrap();
387
388 for _ in 0..3 {
390 buffer.append(create_test_signal()).unwrap();
391 }
392
393 assert_eq!(buffer.len(), 3);
394 assert!(path.exists());
395
396 let signals = buffer.drain().unwrap();
398 assert_eq!(signals.len(), 3);
399 assert!(buffer.is_empty());
400 assert!(!path.exists()); }
402
403 #[test]
404 fn test_load_existing() {
405 let dir = tempdir().unwrap();
406 let path = dir.path().join("signals.jsonl");
407
408 {
410 let config = SignalBufferConfig::with_path(&path);
411 let buffer = SignalBuffer::new(config).unwrap();
412 buffer.append(create_test_signal()).unwrap();
413 buffer.append(create_test_signal()).unwrap();
414 }
415
416 let config = SignalBufferConfig::with_path(&path);
418 let buffer = SignalBuffer::new(config).unwrap();
419 let loaded = buffer.load_existing().unwrap();
420
421 assert_eq!(loaded, 2);
422 assert_eq!(buffer.len(), 2);
423 }
424
425 #[test]
426 fn test_max_signals_limit() {
427 let dir = tempdir().unwrap();
428 let path = dir.path().join("signals.jsonl");
429
430 let config = SignalBufferConfig {
431 enabled: true,
432 buffer_path: path,
433 max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
434 max_signals: 2,
435 };
436 let buffer = SignalBuffer::new(config).unwrap();
437
438 assert!(buffer.append(create_test_signal()).is_ok());
440 assert!(buffer.append(create_test_signal()).is_ok());
441
442 let result = buffer.append(create_test_signal());
444 assert!(matches!(result, Err(SignalBufferError::BufferFull(_))));
445
446 let stats = buffer.stats();
447 assert_eq!(stats.signals_dropped, 1);
448 }
449
450 #[test]
451 fn test_stats() {
452 let dir = tempdir().unwrap();
453 let path = dir.path().join("signals.jsonl");
454
455 let config = SignalBufferConfig::with_path(&path);
456 let buffer = SignalBuffer::new(config).unwrap();
457
458 buffer.append(create_test_signal()).unwrap();
459 buffer.append(create_test_signal()).unwrap();
460
461 let stats = buffer.stats();
462 assert_eq!(stats.buffered_signals, 2);
463 assert_eq!(stats.signals_written, 2);
464 assert_eq!(stats.signals_drained, 0);
465
466 buffer.drain().unwrap();
467
468 let stats = buffer.stats();
469 assert_eq!(stats.buffered_signals, 0);
470 assert_eq!(stats.signals_drained, 2);
471 }
472
473 #[test]
474 fn test_config_validation() {
475 let config = SignalBufferConfig {
477 enabled: true,
478 buffer_path: PathBuf::new(),
479 ..Default::default()
480 };
481 assert!(config.validate().is_err());
482
483 let config = SignalBufferConfig {
485 enabled: true,
486 buffer_path: PathBuf::from("/tmp/test.jsonl"),
487 max_signals: 0,
488 ..Default::default()
489 };
490 assert!(config.validate().is_err());
491
492 let config = SignalBufferConfig::with_path("/tmp/test.jsonl");
494 assert!(config.validate().is_ok());
495 }
496
497 #[test]
498 fn test_clear() {
499 let dir = tempdir().unwrap();
500 let path = dir.path().join("signals.jsonl");
501
502 let config = SignalBufferConfig::with_path(&path);
503 let buffer = SignalBuffer::new(config).unwrap();
504
505 buffer.append(create_test_signal()).unwrap();
506 buffer.append(create_test_signal()).unwrap();
507 assert_eq!(buffer.len(), 2);
508 assert!(path.exists());
509
510 buffer.clear().unwrap();
511 assert!(buffer.is_empty());
512 assert!(!path.exists());
513 }
514}