ugnos/
core.rs

1//! Core database logic: main API, background flush thread, and orchestration of storage, buffer, and persistence.
2
3use crate::buffer::WriteBuffer;
4use crate::error::DbError;
5use crate::persistence::{Snapshotter, WriteAheadLog};
6use crate::query::execute_query;
7use crate::storage::InMemoryStorage;
8use crate::types::{DataPoint, TagSet, Timestamp, Value};
9
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::{mpsc, Arc, Mutex, RwLock};
13use std::thread::{self, JoinHandle};
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
/// Commands understood by the background flush thread.
///
/// The type is a plain tag with no payload, so it is cheap to copy, compare,
/// and print in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushCommand {
    /// Move everything staged in the write buffer into storage.
    Flush,
    /// Perform a final flush, close the WAL (if any), and stop the thread.
    Shutdown,
    /// Flush pending writes and then write a snapshot of storage.
    Snapshot,
}
22
/// Configuration options for [`DbCore`].
///
/// Two configurations compare equal when every field is equal, which makes it
/// easy to assert on the effective configuration in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbConfig {
    /// Interval between automatic buffer flushes.
    pub flush_interval: Duration,
    /// Directory for persistence files; the WAL lives in its `wal`
    /// subdirectory and snapshots in its `snapshots` subdirectory.
    pub data_dir: PathBuf,
    /// Maximum number of entries to buffer in the WAL before writing to disk.
    pub wal_buffer_size: usize,
    /// Whether to enable the Write-Ahead Log.
    pub enable_wal: bool,
    /// Whether to enable snapshots.
    pub enable_snapshots: bool,
    /// Interval between automatic snapshots (only used when snapshots are enabled).
    pub snapshot_interval: Duration,
}
39
40impl Default for DbConfig {
41    fn default() -> Self {
42        DbConfig {
43            flush_interval: Duration::from_secs(1),
44            data_dir: PathBuf::from("./data"),
45            wal_buffer_size: 1000,
46            enable_wal: true,
47            enable_snapshots: true,
48            snapshot_interval: Duration::from_secs(60 * 15), // 15 minutes
49        }
50    }
51}
52
53/// The main concurrent time-series database core struct.
54#[derive(Debug)]
55pub struct DbCore {
56    /// In-memory storage for all time series data.
57    storage: Arc<RwLock<InMemoryStorage>>,
58    /// Buffer for staging writes before flush.
59    write_buffer: Arc<Mutex<WriteBuffer>>,
60    /// Channel sender for flush thread commands.
61    flush_cmd_tx: mpsc::Sender<FlushCommand>,
62    /// Handle for the background flush thread.
63    flush_handle: Option<JoinHandle<()>>,
64    /// Write-Ahead Log for durability (if enabled).
65    wal: Option<Arc<Mutex<WriteAheadLog>>>,
66    /// Snapshot manager (if enabled).
67    snapshotter: Option<Arc<Snapshotter>>,
68    /// Database configuration.
69    config: DbConfig,
70}
71
72impl DbCore {
73    /// Creates a new `DbCore` instance with the provided configuration.
74    ///
75    /// This sets up the in-memory storage, write buffer, and (if enabled) persistence mechanisms
76    /// such as the Write-Ahead Log (WAL) and snapshotting. It also spawns the background flush thread,
77    /// which periodically flushes staged writes to storage and handles snapshot creation.
78    ///
79    /// # Arguments
80    /// * `config` - The database configuration to use.
81    ///
82    /// # Returns
83    /// * `Ok(DbCore)` if initialization succeeds.
84    /// * `Err(DbError)` if any component fails to initialize (e.g., WAL or snapshotter).
85    ///
86    /// # Errors
87    /// Returns an error if persistence components cannot be initialized.
88    pub fn with_config(config: DbConfig) -> Result<Self, DbError> {
89        let storage = Arc::new(RwLock::new(InMemoryStorage::default()));
90        let write_buffer = Arc::new(Mutex::new(WriteBuffer::default()));
91
92        // Initialize persistence components if enabled
93        let wal = if config.enable_wal {
94            let wal_dir = config.data_dir.join("wal");
95            let wal = WriteAheadLog::new(wal_dir, config.wal_buffer_size)?;
96            Some(Arc::new(Mutex::new(wal)))
97        } else {
98            None
99        };
100
101        let snapshotter = if config.enable_snapshots {
102            let snapshot_dir = config.data_dir.join("snapshots");
103            let snapshotter = Snapshotter::new(snapshot_dir)?;
104            Some(Arc::new(snapshotter))
105        } else {
106            None
107        };
108
109        // Create a channel for communication with the flush thread
110        let (flush_cmd_tx, flush_cmd_rx) = mpsc::channel::<FlushCommand>();
111
112        // Clone Arcs for the background thread
113        let buffer_clone = Arc::clone(&write_buffer);
114        let storage_clone = Arc::clone(&storage);
115        let wal_clone = wal.clone();
116        let snapshotter_clone = snapshotter.clone();
117        let config_clone = config.clone();
118        let flush_cmd_tx_clone = flush_cmd_tx.clone(); // Clone the sender for the thread
119
120        // Time tracking for snapshots
121        let mut last_snapshot_time = SystemTime::now();
122
123        // The background flush thread periodically flushes the write buffer to storage,
124        // handles snapshot creation, and responds to explicit flush/snapshot/shutdown commands.
125
126        // Spawn the background flush thread
127        let flush_handle = thread::spawn(move || {
128            println!("Flush thread started.");
129            loop {
130                // Check if it's time for a snapshot
131                if config_clone.enable_snapshots {
132                    let now = SystemTime::now();
133                    if now.duration_since(last_snapshot_time).unwrap_or_default() >= config_clone.snapshot_interval {
134                        // It's time for a snapshot, but we'll do it in the next iteration to avoid blocking here
135                        let _ = flush_cmd_tx_clone.send(FlushCommand::Snapshot);
136                        last_snapshot_time = now;
137                    }
138                }
139                
140                // Wait for a command or timeout
141                match flush_cmd_rx.recv_timeout(config_clone.flush_interval) {
142                    // Received a command to flush or timed out
143                    Ok(FlushCommand::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => {
144                        // Acquire lock on the buffer
145                        let mut buffer_guard = match buffer_clone.lock() {
146                            Ok(guard) => guard,
147                            Err(poisoned) => {
148                                eprintln!(
149                                    "Flush thread: Write buffer lock poisoned: {}. Shutting down.",
150                                    poisoned
151                                );
152                                // If the lock is poisoned, we can't recover, so shut down.
153                                break;
154                            }
155                        };
156
157                        // Drain data from the buffer
158                        let data_to_flush = buffer_guard.drain_all_buffers();
159                        // Release buffer lock *before* acquiring storage lock
160                        drop(buffer_guard);
161
162                        if !data_to_flush.is_empty() {
163                            println!(
164                                "Flush thread: Drained data for {} series. Acquiring storage lock...",
165                                data_to_flush.len()
166                            );
167                            // Acquire write lock on storage
168                            let mut storage_guard = match storage_clone.write() {
169                                Ok(guard) => guard,
170                                Err(poisoned) => {
171                                    eprintln!(
172                                        "Flush thread: Storage lock poisoned: {}. Shutting down.",
173                                        poisoned
174                                    );
175                                    // If the lock is poisoned, we can't recover, so shut down.
176                                    break;
177                                }
178                            };
179
180                            // Append data to storage
181                            match storage_guard.append_batch(data_to_flush) {
182                                Ok(_) => println!("Flush thread: Data flushed successfully."),
183                                Err(e) => eprintln!("Flush thread: Error flushing data: {}", e),
184                            }
185                            // Storage lock released here
186                        }
187                    }
188                    // Received command to create a snapshot
189                    Ok(FlushCommand::Snapshot) => {
190                        if let Some(snapshotter) = &snapshotter_clone {
191                            // First flush any pending data
192                            let mut buffer_guard = match buffer_clone.lock() {
193                                Ok(guard) => guard,
194                                Err(_) => continue, // Skip if poisoned
195                            };
196                            
197                            let data_to_flush = buffer_guard.drain_all_buffers();
198                            drop(buffer_guard);
199                            
200                            if !data_to_flush.is_empty() {
201                                if let Ok(mut storage_guard) = storage_clone.write() {
202                                    let _ = storage_guard.append_batch(data_to_flush);
203                                }
204                            }
205                            
206                            // Now create the snapshot
207                            if let Ok(storage_guard) = storage_clone.read() {
208                                let now = SystemTime::now()
209                                    .duration_since(UNIX_EPOCH)
210                                    .unwrap_or_default()
211                                    .as_nanos() as u64;
212                                
213                                match snapshotter.create_snapshot(storage_guard.get_all_series(), now) {
214                                    Ok(path) => println!("Flush thread: Created snapshot at {:?}", path),
215                                    Err(e) => eprintln!("Flush thread: Error creating snapshot: {}", e),
216                                }
217                                
218                                // Log the snapshot in WAL if enabled
219                                if let Some(wal) = &wal_clone {
220                                    if let Ok(mut wal_guard) = wal.lock() {
221                                        if let Err(e) = wal_guard.log_flush(now) {
222                                            eprintln!("Flush thread: Error logging snapshot to WAL: {}", e);
223                                        }
224                                    }
225                                }
226                            }
227                        }
228                    }
229                    // Received shutdown command
230                    Ok(FlushCommand::Shutdown) => {
231                        println!("Flush thread: Received shutdown command. Flushing one last time...");
232                        // Perform a final flush before shutting down
233                        let mut buffer_guard = match buffer_clone.lock() {
234                            Ok(guard) => guard,
235                            Err(_) => break, // Already poisoned, just exit
236                        };
237                        let data_to_flush = buffer_guard.drain_all_buffers();
238                        drop(buffer_guard);
239                        
240                        if !data_to_flush.is_empty() {
241                            if let Ok(mut storage_guard) = storage_clone.write() {
242                                let _ = storage_guard.append_batch(data_to_flush);
243                            }
244                        }
245                        
246                        // Flush and close WAL if enabled
247                        if let Some(wal) = &wal_clone {
248                            if let Ok(mut wal_guard) = wal.lock() {
249                                let now = SystemTime::now()
250                                    .duration_since(UNIX_EPOCH)
251                                    .unwrap_or_default()
252                                    .as_nanos() as u64;
253                                
254                                // Log the final flush
255                                if let Err(e) = wal_guard.log_flush(now) {
256                                    eprintln!("Flush thread: Error logging final flush to WAL: {}", e);
257                                }
258                                
259                                // Close the WAL
260                                if let Err(e) = wal_guard.close() {
261                                    eprintln!("Flush thread: Error closing WAL: {}", e);
262                                }
263                            }
264                        }
265                        
266                        println!("Flush thread: Final flush complete.");
267                        println!("Flush thread: Exiting.");
268                        break; // Exit the loop
269                    }
270                    // Channel disconnected (DbCore dropped)
271                    Err(mpsc::RecvTimeoutError::Disconnected) => {
272                        println!("Flush thread: Command channel disconnected. Exiting.");
273                        break; // Exit the loop
274                    }
275                }
276            }
277        });
278
279        Ok(DbCore {
280            storage,
281            write_buffer,
282            flush_cmd_tx,
283            flush_handle: Some(flush_handle),
284            wal,
285            snapshotter,
286            config,
287        })
288    }
289
290    /// Creates a new `DbCore` instance with default configuration, but with a custom flush interval.
291    ///
292    /// This is a convenience constructor for quickly creating a database with a specific flush interval.
293    /// All other configuration options use their default values.
294    ///
295    /// # Arguments
296    /// * `flush_interval` - The interval between automatic buffer flushes.
297    ///
298    /// # Panics
299    /// Panics if the database cannot be initialized with the default configuration.
300    pub fn new(flush_interval: Duration) -> Self {
301        let mut config = DbConfig::default();
302        config.flush_interval = flush_interval;
303        Self::with_config(config).expect("Failed to initialize DbCore with default configuration")
304    }
305    
306    /// Recovers the database state from disk using the latest snapshot and any newer WAL entries.
307    ///
308    /// This method should be called after constructing the database if you want to restore
309    /// persisted data. It loads the most recent snapshot (if enabled), then applies any
310    /// WAL entries that occurred after the snapshot.
311    ///
312    /// # Returns
313    /// * `Ok(())` if recovery succeeds or if persistence is not enabled.
314    /// * `Err(DbError)` if recovery fails.
315    ///
316    /// # Errors
317    /// Returns an error if loading the snapshot or WAL fails.
318    pub fn recover(&mut self) -> Result<(), DbError> {
319        if self.snapshotter.is_none() && self.wal.is_none() {
320            // No persistence enabled, nothing to recover
321            return Ok(());
322        }
323        
324        // First try to load from the latest snapshot
325        let mut latest_timestamp = 0;
326        if let Some(snapshotter) = &self.snapshotter {
327            if let Some(data) = snapshotter.load_latest_snapshot()? {
328                // Get latest snapshot timestamp
329                if let Ok(Some(ts)) = snapshotter.get_latest_snapshot_timestamp() {
330                    latest_timestamp = ts;
331                }
332                
333                // Load snapshot data into storage
334                let mut storage_guard = self.storage.write()?;
335                for (series, points) in data {
336                    storage_guard.append_points(&series, points)?;
337                }
338                
339                println!("Recovered from snapshot with timestamp {}", latest_timestamp);
340            }
341        }
342        
343        // Apply any WAL entries that are newer than the snapshot
344        if let Some(wal) = &self.wal {
345            let wal_entries = wal.lock()?.read_all_entries()?;
346            
347            let mut pending_inserts = std::collections::HashMap::new();
348            
349            for entry in wal_entries {
350                match entry {
351                    crate::persistence::WalEntry::Insert { series, timestamp, value, tags } => {
352                        // Only apply if newer than snapshot
353                        if timestamp > latest_timestamp {
354                            let point = DataPoint { timestamp, value, tags };
355                            pending_inserts
356                                .entry(series)
357                                .or_insert_with(Vec::new)
358                                .push(point);
359                        }
360                    },
361                    crate::persistence::WalEntry::Flush { timestamp } => {
362                        // This was a flush or snapshot point
363                        latest_timestamp = timestamp;
364                        
365                        // Apply all pending inserts
366                        if !pending_inserts.is_empty() {
367                            let mut storage_guard = self.storage.write()?;
368                            for (series, points) in pending_inserts.drain() {
369                                storage_guard.append_points(&series, points)?;
370                            }
371                        }
372                    }
373                }
374            }
375            
376            // Apply any remaining pending inserts
377            if !pending_inserts.is_empty() {
378                let mut storage_guard = self.storage.write()?;
379                for (series, points) in pending_inserts {
380                    storage_guard.append_points(&series, points)?;
381                }
382            }
383            
384            println!("Applied WAL entries");
385        }
386        
387        Ok(())
388    }
389
390    /// Inserts a data point into the specified time series.
391    ///
392    /// This method is thread-safe and can be called concurrently from multiple threads.
393    /// The data point is first staged in the write buffer and will be flushed to storage
394    /// either automatically (by the background thread) or manually (via `flush()`).
395    /// If WAL is enabled, the insert is also logged for durability.
396    ///
397    /// # Arguments
398    /// * `series` - Name of the time series.
399    /// * `timestamp` - Timestamp of the data point.
400    /// * `value` - Value to insert.
401    /// * `tags` - Associated tags for the data point.
402    ///
403    /// # Returns
404    /// * `Ok(())` if the data point is staged successfully.
405    /// * `Err(DbError)` if staging or logging fails.
406    ///
407    /// # Errors
408    /// Returns an error if the WAL or write buffer cannot be accessed.
409    pub fn insert(
410        &self,
411        series: &str,
412        timestamp: Timestamp,
413        value: Value,
414        tags: TagSet,
415    ) -> Result<(), DbError> {
416        let point = DataPoint { timestamp, value, tags: tags.clone() };
417        
418        // Log to WAL if enabled
419        if let Some(wal) = &self.wal {
420            let mut wal_guard = wal.lock()?;
421            wal_guard.log_insert(series, timestamp, value, tags)?;
422        }
423        
424        // Acquire lock on the write buffer
425        let mut buffer_guard = self.write_buffer.lock()?; // Propagate PoisonError
426        // Stage the data point
427        buffer_guard.stage(series, point)
428    }
429
430    /// Queries data points from a specific time series within a given time range,
431    /// optionally filtering by a set of tags.
432    ///
433    /// This method is thread-safe and allows concurrent queries. It acquires a read lock
434    /// on the storage and the relevant series chunk, then executes the query in parallel.
435    ///
436    /// # Arguments
437    /// * `series` - The name of the time series to query.
438    /// * `time_range` - The time range for the query (start inclusive, end exclusive).
439    /// * `tag_filter` - An optional set of tags to filter by. Only points matching all tags are returned.
440    ///
441    /// # Returns
442    /// * `Ok(Vec<(Timestamp, Value)>)` with all matching data points.
443    /// * `Err(DbError)` if the series does not exist or a lock cannot be acquired.
444    ///
445    /// # Errors
446    /// Returns an error if the series is not found or if a lock is poisoned.
447    pub fn query(
448        &self,
449        series: &str,
450        time_range: Range<Timestamp>,
451        tag_filter: Option<&TagSet>,
452    ) -> Result<Vec<(Timestamp, Value)>, DbError> {
453        // Acquire read lock on the storage
454        let storage_guard = self.storage.read()?; // Propagate PoisonError
455
456        // Get the specific series chunk (as an Arc<RwLock<TimeSeriesChunk>>)
457        let chunk_arc = storage_guard
458            .get_chunk_for_query(series)
459            .ok_or_else(|| DbError::SeriesNotFound(series.to_string()))?;
460
461        // Acquire read lock on the specific chunk
462        // This allows concurrent queries on the same series
463        let chunk_guard = chunk_arc.read()?; // Propagate PoisonError
464
465        // Execute the query using the query module function
466        execute_query(chunk_guard, time_range, tag_filter)
467    }
468
469    /// Triggers an immediate flush of the write buffer to storage.
470    ///
471    /// This sends a command to the background flush thread to flush all staged data points
472    /// to the in-memory storage. Useful for testing or ensuring data is persisted before shutdown.
473    ///
474    /// # Returns
475    /// * `Ok(())` if the flush command is sent successfully.
476    /// * `Err(DbError)` if the command cannot be sent.
477    ///
478    /// # Errors
479    /// Returns an error if the background thread cannot be reached.
480    pub fn flush(&self) -> Result<(), DbError> {
481        self.flush_cmd_tx.send(FlushCommand::Flush).map_err(|e| {
482            DbError::BackgroundTaskError(format!("Failed to send flush command: {}", e))
483        })
484    }
485    
486    /// Triggers an immediate snapshot of the current database state.
487    ///
488    /// This sends a command to the background flush thread to create a snapshot of all
489    /// in-memory data. Snapshots are only available if enabled in the configuration.
490    ///
491    /// # Returns
492    /// * `Ok(())` if the snapshot command is sent successfully.
493    /// * `Err(DbError)` if snapshots are not enabled or the command cannot be sent.
494    ///
495    /// # Errors
496    /// Returns an error if snapshots are disabled or if the background thread cannot be reached.
497    pub fn snapshot(&self) -> Result<(), DbError> {
498        if self.snapshotter.is_none() {
499            return Err(DbError::ConfigError("Snapshots are not enabled".to_string()));
500        }
501        
502        self.flush_cmd_tx.send(FlushCommand::Snapshot).map_err(|e| {
503            DbError::BackgroundTaskError(format!("Failed to send snapshot command: {}", e))
504        })
505    }
506    
507    /// Returns a reference to the current database configuration.
508    ///
509    /// This allows inspection of the configuration used to initialize the database.
510    ///
511    /// # Returns
512    /// * A reference to the `DbConfig` struct.
513    pub fn get_config(&self) -> &DbConfig {
514        &self.config
515    }
516}
517
518/// Default implementation uses a 1-second flush interval.
519impl Default for DbCore {
520    fn default() -> Self {
521        Self::with_config(DbConfig::default()).expect("Failed to initialize DbCore with default configuration")
522    }
523}
524
525/// Implement Drop to gracefully shut down the background flush thread.
526impl Drop for DbCore {
527    fn drop(&mut self) {
528        println!("DbCore dropping. Sending shutdown command to flush thread...");
529        // Send the shutdown command, ignoring potential errors if the thread already panicked
530        let _ = self.flush_cmd_tx.send(FlushCommand::Shutdown);
531
532        // Wait for the flush thread to finish
533        if let Some(handle) = self.flush_handle.take() {
534            println!("Waiting for flush thread to exit...");
535            if let Err(e) = handle.join() {
536                eprintln!("Flush thread panicked: {:?}", e);
537            }
538            println!("Flush thread joined.");
539        } else {
540            println!("Flush thread handle already taken or thread never started.");
541        }
542        println!("DbCore dropped.");
543    }
544}
545