// Copyright (c) 2026 bad-antics
// Licensed under the MIT License. See LICENSE file in the project root.
// https://github.com/bad-antics/glowbarn-rs

//! Database module for persistent storage

use anyhow::{anyhow, Result};
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use tracing::{info, warn, debug};

use crate::sensors::SensorReading;
use crate::detection::Detection;
use crate::config::DatabaseConfig;

/// Database manager.
///
/// Thin handle around a single SQLite connection. The connection is wrapped
/// in `Arc<Mutex<..>>` so the handle can be cloned/shared across threads;
/// all access is serialized through the mutex (SQLite connections are not
/// safely usable from multiple threads concurrently).
pub struct Database {
    // The one shared SQLite connection; every method locks this first.
    conn: Arc<Mutex<Connection>>,
    // Copy of the configuration this database was opened with (path, etc.).
    // NOTE(review): not read by the visible methods — presumably used by
    // callers or future retention logic; confirm before removing.
    config: DatabaseConfig,
}

impl Database {
    /// Open or create database
    pub fn open(config: &DatabaseConfig) -> Result<Self> {
        // Create parent directories
        if let Some(parent) = config.path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        
        let conn = Connection::open(&config.path)?;
        
        // Configure SQLite for performance
        conn.execute_batch(r#"
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA cache_size = -64000;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 268435456;
        "#)?;
        
        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            config: config.clone(),
        };
        
        db.create_tables()?;
        
        info!("Database opened at {:?}", config.path);
        Ok(db)
    }
    
    /// Create database tables
    fn create_tables(&self) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        
        conn.execute_batch(r#"
            -- Sensor readings table
            CREATE TABLE IF NOT EXISTS readings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                sensor_id TEXT NOT NULL,
                sensor_type TEXT NOT NULL,
                quality REAL NOT NULL,
                data BLOB NOT NULL,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            
            CREATE INDEX IF NOT EXISTS idx_readings_timestamp ON readings(timestamp);
            CREATE INDEX IF NOT EXISTS idx_readings_sensor ON readings(sensor_id);
            
            -- Detections table
            CREATE TABLE IF NOT EXISTS detections (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                detection_type TEXT NOT NULL,
                confidence REAL NOT NULL,
                severity TEXT NOT NULL,
                sensor_count INTEGER NOT NULL,
                entropy_deviation REAL,
                correlation_score REAL,
                classification TEXT,
                data BLOB NOT NULL,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            
            CREATE INDEX IF NOT EXISTS idx_detections_timestamp ON detections(timestamp);
            CREATE INDEX IF NOT EXISTS idx_detections_type ON detections(detection_type);
            
            -- Sessions table
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                start_time TEXT NOT NULL,
                end_time TEXT,
                location TEXT,
                notes TEXT,
                reading_count INTEGER DEFAULT 0,
                detection_count INTEGER DEFAULT 0
            );
            
            -- Sensors table
            CREATE TABLE IF NOT EXISTS sensors (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                sensor_type TEXT NOT NULL,
                calibration_data BLOB,
                last_seen TEXT,
                status TEXT DEFAULT 'unknown'
            );
            
            -- Settings table
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
        "#)?;
        
        Ok(())
    }
    
    /// Store a sensor reading
    pub fn store_reading(&self, reading: &SensorReading) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        
        let data = bincode::serialize(&reading.data)?;
        
        conn.execute(
            "INSERT INTO readings (timestamp, sensor_id, sensor_type, quality, data) VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                reading.timestamp.to_rfc3339(),
                reading.sensor_id,
                format!("{:?}", reading.sensor_type),
                reading.quality,
                data
            ],
        )?;
        
        Ok(())
    }
    
    /// Store multiple readings in batch
    pub fn store_readings_batch(&self, readings: &[SensorReading]) -> Result<usize> {
        let conn = self.conn.lock().unwrap();
        
        let tx = conn.unchecked_transaction()?;
        let mut count = 0;
        
        for reading in readings {
            let data = bincode::serialize(&reading.data)?;
            
            tx.execute(
                "INSERT INTO readings (timestamp, sensor_id, sensor_type, quality, data) VALUES (?1, ?2, ?3, ?4, ?5)",
                params![
                    reading.timestamp.to_rfc3339(),
                    reading.sensor_id,
                    format!("{:?}", reading.sensor_type),
                    reading.quality,
                    data
                ],
            )?;
            count += 1;
        }
        
        tx.commit()?;
        Ok(count)
    }
    
    /// Store a detection
    pub fn store_detection(&self, detection: &Detection) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        
        let data = bincode::serialize(detection)?;
        let classification = detection.classification.as_ref()
            .map(|c| serde_json::to_string(c).ok())
            .flatten();
        
        conn.execute(
            r#"INSERT INTO detections 
               (id, timestamp, detection_type, confidence, severity, sensor_count, 
                entropy_deviation, correlation_score, classification, data)
               VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"#,
            params![
                detection.id,
                detection.timestamp.to_rfc3339(),
                format!("{:?}", detection.detection_type),
                detection.confidence,
                format!("{:?}", detection.severity),
                detection.sensors.len() as i32,
                detection.entropy_deviation,
                detection.correlation_score,
                classification,
                data
            ],
        )?;
        
        Ok(())
    }
    
    /// Query readings by time range
    pub fn query_readings(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        sensor_id: Option<&str>,
        limit: Option<usize>,
    ) -> Result<Vec<StoredReading>> {
        let conn = self.conn.lock().unwrap();
        
        let sql = if let Some(sid) = sensor_id {
            format!(
                "SELECT id, timestamp, sensor_id, sensor_type, quality, data FROM readings 
                 WHERE timestamp >= ?1 AND timestamp <= ?2 AND sensor_id = ?3
                 ORDER BY timestamp DESC LIMIT {}",
                limit.unwrap_or(1000)
            )
        } else {
            format!(
                "SELECT id, timestamp, sensor_id, sensor_type, quality, data FROM readings 
                 WHERE timestamp >= ?1 AND timestamp <= ?2
                 ORDER BY timestamp DESC LIMIT {}",
                limit.unwrap_or(1000)
            )
        };
        
        let mut stmt = conn.prepare(&sql)?;
        
        let mut results = Vec::new();
        
        if let Some(sid) = sensor_id {
            let mut rows = stmt.query(params![start.to_rfc3339(), end.to_rfc3339(), sid])?;
            while let Some(row) = rows.next()? {
                results.push(StoredReading {
                    id: row.get(0)?,
                    timestamp: row.get(1)?,
                    sensor_id: row.get(2)?,
                    sensor_type: row.get(3)?,
                    quality: row.get(4)?,
                    data: row.get(5)?,
                });
            }
        } else {
            let mut rows = stmt.query(params![start.to_rfc3339(), end.to_rfc3339()])?;
            while let Some(row) = rows.next()? {
                results.push(StoredReading {
                    id: row.get(0)?,
                    timestamp: row.get(1)?,
                    sensor_id: row.get(2)?,
                    sensor_type: row.get(3)?,
                    quality: row.get(4)?,
                    data: row.get(5)?,
                });
            }
        }
        
        Ok(results)
    }
    
    /// Query detections by time range
    pub fn query_detections(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        min_confidence: Option<f64>,
        limit: Option<usize>,
    ) -> Result<Vec<StoredDetection>> {
        let conn = self.conn.lock().unwrap();
        
        let min_conf = min_confidence.unwrap_or(0.0);
        
        let sql = format!(
            "SELECT id, timestamp, detection_type, confidence, severity, sensor_count, data 
             FROM detections 
             WHERE timestamp >= ?1 AND timestamp <= ?2 AND confidence >= ?3
             ORDER BY timestamp DESC LIMIT {}",
            limit.unwrap_or(100)
        );
        
        let mut stmt = conn.prepare(&sql)?;
        
        let rows = stmt.query_map(params![start.to_rfc3339(), end.to_rfc3339(), min_conf], |row| {
            Ok(StoredDetection {
                id: row.get(0)?,
                timestamp: row.get(1)?,
                detection_type: row.get(2)?,
                confidence: row.get(3)?,
                severity: row.get(4)?,
                sensor_count: row.get(5)?,
                data: row.get(6)?,
            })
        })?;
        
        let mut results = Vec::new();
        for row in rows {
            results.push(row?);
        }
        
        Ok(results)
    }
    
    /// Get database statistics
    pub fn get_stats(&self) -> Result<DatabaseStats> {
        let conn = self.conn.lock().unwrap();
        
        let reading_count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM readings",
            [],
            |row| row.get(0),
        )?;
        
        let detection_count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM detections",
            [],
            |row| row.get(0),
        )?;
        
        let size_bytes: i64 = conn.query_row(
            "SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()",
            [],
            |row| row.get(0),
        ).unwrap_or(0);
        
        Ok(DatabaseStats {
            reading_count: reading_count as usize,
            detection_count: detection_count as usize,
            size_bytes: size_bytes as u64,
        })
    }
    
    /// Cleanup old data
    pub fn cleanup(&self, retention_days: u32) -> Result<usize> {
        let conn = self.conn.lock().unwrap();
        
        let cutoff = Utc::now() - chrono::Duration::days(retention_days as i64);
        
        let deleted_readings = conn.execute(
            "DELETE FROM readings WHERE timestamp < ?1",
            params![cutoff.to_rfc3339()],
        )?;
        
        let deleted_detections = conn.execute(
            "DELETE FROM detections WHERE timestamp < ?1",
            params![cutoff.to_rfc3339()],
        )?;
        
        // Vacuum to reclaim space
        conn.execute("VACUUM", [])?;
        
        info!("Cleaned up {} readings and {} detections older than {} days",
            deleted_readings, deleted_detections, retention_days);
        
        Ok(deleted_readings + deleted_detections)
    }
    
    /// Store a setting
    pub fn set_setting(&self, key: &str, value: &str) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        
        conn.execute(
            "INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES (?1, ?2, ?3)",
            params![key, value, Utc::now().to_rfc3339()],
        )?;
        
        Ok(())
    }
    
    /// Get a setting
    pub fn get_setting(&self, key: &str) -> Result<Option<String>> {
        let conn = self.conn.lock().unwrap();
        
        let result: Result<String, _> = conn.query_row(
            "SELECT value FROM settings WHERE key = ?1",
            params![key],
            |row| row.get(0),
        );
        
        match result {
            Ok(value) => Ok(Some(value)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }
}

/// A sensor reading row as read back from the `readings` table.
///
/// Column values are returned in their stored (SQL) representations rather
/// than being deserialized back into domain types.
#[derive(Debug, Clone)]
pub struct StoredReading {
    /// Auto-incremented `readings.id`.
    pub id: i64,
    /// Timestamp as stored — an RFC 3339 string (see `store_reading`).
    pub timestamp: String,
    /// Identifier of the sensor that produced the reading.
    pub sensor_id: String,
    /// Debug-formatted sensor type name (written via `format!("{:?}", ..)`).
    pub sensor_type: String,
    /// Signal quality as stored in the `quality` column.
    pub quality: f32,
    /// bincode-serialized reading payload; caller deserializes as needed.
    pub data: Vec<u8>,
}

/// A detection row as read back from the `detections` table.
#[derive(Debug, Clone)]
pub struct StoredDetection {
    /// Detection id (TEXT primary key, supplied by the detector).
    pub id: String,
    /// Timestamp as stored — an RFC 3339 string (see `store_detection`).
    pub timestamp: String,
    /// Debug-formatted detection type name.
    pub detection_type: String,
    /// Detection confidence in the stored REAL column.
    pub confidence: f64,
    /// Debug-formatted severity name.
    pub severity: String,
    /// Number of sensors that contributed to the detection.
    pub sensor_count: i32,
    /// bincode-serialized full `Detection`; caller deserializes as needed.
    pub data: Vec<u8>,
}

/// Summary statistics returned by `Database::get_stats`.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
    /// Total rows in the `readings` table.
    pub reading_count: usize,
    /// Total rows in the `detections` table.
    pub detection_count: usize,
    /// On-disk size in bytes (page_count * page_size); 0 if the size query
    /// failed (best-effort).
    pub size_bytes: u64,
}