ugnos/core.rs
1//! Core database logic: main API, background flush thread, and orchestration of storage, buffer, and persistence.
2
3use crate::buffer::WriteBuffer;
4use crate::error::DbError;
5use crate::persistence::{Snapshotter, WriteAheadLog};
6use crate::query::execute_query;
7use crate::storage::InMemoryStorage;
8use crate::types::{DataPoint, TagSet, Timestamp, Value};
9
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::{mpsc, Arc, Mutex, RwLock};
13use std::thread::{self, JoinHandle};
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
/// Messages understood by the background flush thread.
enum FlushCommand {
    /// Move everything currently staged in the write buffer into storage.
    Flush,
    /// Do a final flush, finalize the WAL (when enabled) and stop the thread.
    Shutdown,
    /// Flush the write buffer, then write a snapshot of the in-memory storage.
    Snapshot,
}
22
/// Tunable settings consumed by [`DbCore::with_config`].
#[derive(Debug, Clone)]
pub struct DbConfig {
    /// How long the flush thread waits between automatic flushes of the write buffer.
    pub flush_interval: Duration,
    /// Root directory for persistence files: the WAL lives in `wal/`, snapshots in `snapshots/`.
    pub data_dir: PathBuf,
    /// Maximum number of entries the WAL buffers before writing them to disk.
    pub wal_buffer_size: usize,
    /// Turns the Write-Ahead Log on or off.
    pub enable_wal: bool,
    /// Turns snapshot creation (periodic and on demand) on or off.
    pub enable_snapshots: bool,
    /// Time between automatic snapshots; only relevant when `enable_snapshots` is set.
    pub snapshot_interval: Duration,
}
39
40impl Default for DbConfig {
41 fn default() -> Self {
42 DbConfig {
43 flush_interval: Duration::from_secs(1),
44 data_dir: PathBuf::from("./data"),
45 wal_buffer_size: 1000,
46 enable_wal: true,
47 enable_snapshots: true,
48 snapshot_interval: Duration::from_secs(60 * 15), // 15 minutes
49 }
50 }
51}
52
53/// The main concurrent time-series database core struct.
54#[derive(Debug)]
55pub struct DbCore {
56 /// In-memory storage for all time series data.
57 storage: Arc<RwLock<InMemoryStorage>>,
58 /// Buffer for staging writes before flush.
59 write_buffer: Arc<Mutex<WriteBuffer>>,
60 /// Channel sender for flush thread commands.
61 flush_cmd_tx: mpsc::Sender<FlushCommand>,
62 /// Handle for the background flush thread.
63 flush_handle: Option<JoinHandle<()>>,
64 /// Write-Ahead Log for durability (if enabled).
65 wal: Option<Arc<Mutex<WriteAheadLog>>>,
66 /// Snapshot manager (if enabled).
67 snapshotter: Option<Arc<Snapshotter>>,
68 /// Database configuration.
69 config: DbConfig,
70}
71
72impl DbCore {
73 /// Creates a new `DbCore` instance with the provided configuration.
74 ///
75 /// This sets up the in-memory storage, write buffer, and (if enabled) persistence mechanisms
76 /// such as the Write-Ahead Log (WAL) and snapshotting. It also spawns the background flush thread,
77 /// which periodically flushes staged writes to storage and handles snapshot creation.
78 ///
79 /// # Arguments
80 /// * `config` - The database configuration to use.
81 ///
82 /// # Returns
83 /// * `Ok(DbCore)` if initialization succeeds.
84 /// * `Err(DbError)` if any component fails to initialize (e.g., WAL or snapshotter).
85 ///
86 /// # Errors
87 /// Returns an error if persistence components cannot be initialized.
88 pub fn with_config(config: DbConfig) -> Result<Self, DbError> {
89 let storage = Arc::new(RwLock::new(InMemoryStorage::default()));
90 let write_buffer = Arc::new(Mutex::new(WriteBuffer::default()));
91
92 // Initialize persistence components if enabled
93 let wal = if config.enable_wal {
94 let wal_dir = config.data_dir.join("wal");
95 let wal = WriteAheadLog::new(wal_dir, config.wal_buffer_size)?;
96 Some(Arc::new(Mutex::new(wal)))
97 } else {
98 None
99 };
100
101 let snapshotter = if config.enable_snapshots {
102 let snapshot_dir = config.data_dir.join("snapshots");
103 let snapshotter = Snapshotter::new(snapshot_dir)?;
104 Some(Arc::new(snapshotter))
105 } else {
106 None
107 };
108
109 // Create a channel for communication with the flush thread
110 let (flush_cmd_tx, flush_cmd_rx) = mpsc::channel::<FlushCommand>();
111
112 // Clone Arcs for the background thread
113 let buffer_clone = Arc::clone(&write_buffer);
114 let storage_clone = Arc::clone(&storage);
115 let wal_clone = wal.clone();
116 let snapshotter_clone = snapshotter.clone();
117 let config_clone = config.clone();
118 let flush_cmd_tx_clone = flush_cmd_tx.clone(); // Clone the sender for the thread
119
120 // Time tracking for snapshots
121 let mut last_snapshot_time = SystemTime::now();
122
123 // The background flush thread periodically flushes the write buffer to storage,
124 // handles snapshot creation, and responds to explicit flush/snapshot/shutdown commands.
125
126 // Spawn the background flush thread
127 let flush_handle = thread::spawn(move || {
128 println!("Flush thread started.");
129 loop {
130 // Check if it's time for a snapshot
131 if config_clone.enable_snapshots {
132 let now = SystemTime::now();
133 if now.duration_since(last_snapshot_time).unwrap_or_default() >= config_clone.snapshot_interval {
134 // It's time for a snapshot, but we'll do it in the next iteration to avoid blocking here
135 let _ = flush_cmd_tx_clone.send(FlushCommand::Snapshot);
136 last_snapshot_time = now;
137 }
138 }
139
140 // Wait for a command or timeout
141 match flush_cmd_rx.recv_timeout(config_clone.flush_interval) {
142 // Received a command to flush or timed out
143 Ok(FlushCommand::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => {
144 // Acquire lock on the buffer
145 let mut buffer_guard = match buffer_clone.lock() {
146 Ok(guard) => guard,
147 Err(poisoned) => {
148 eprintln!(
149 "Flush thread: Write buffer lock poisoned: {}. Shutting down.",
150 poisoned
151 );
152 // If the lock is poisoned, we can't recover, so shut down.
153 break;
154 }
155 };
156
157 // Drain data from the buffer
158 let data_to_flush = buffer_guard.drain_all_buffers();
159 // Release buffer lock *before* acquiring storage lock
160 drop(buffer_guard);
161
162 if !data_to_flush.is_empty() {
163 println!(
164 "Flush thread: Drained data for {} series. Acquiring storage lock...",
165 data_to_flush.len()
166 );
167 // Acquire write lock on storage
168 let mut storage_guard = match storage_clone.write() {
169 Ok(guard) => guard,
170 Err(poisoned) => {
171 eprintln!(
172 "Flush thread: Storage lock poisoned: {}. Shutting down.",
173 poisoned
174 );
175 // If the lock is poisoned, we can't recover, so shut down.
176 break;
177 }
178 };
179
180 // Append data to storage
181 match storage_guard.append_batch(data_to_flush) {
182 Ok(_) => println!("Flush thread: Data flushed successfully."),
183 Err(e) => eprintln!("Flush thread: Error flushing data: {}", e),
184 }
185 // Storage lock released here
186 }
187 }
188 // Received command to create a snapshot
189 Ok(FlushCommand::Snapshot) => {
190 if let Some(snapshotter) = &snapshotter_clone {
191 // First flush any pending data
192 let mut buffer_guard = match buffer_clone.lock() {
193 Ok(guard) => guard,
194 Err(_) => continue, // Skip if poisoned
195 };
196
197 let data_to_flush = buffer_guard.drain_all_buffers();
198 drop(buffer_guard);
199
200 if !data_to_flush.is_empty() {
201 if let Ok(mut storage_guard) = storage_clone.write() {
202 let _ = storage_guard.append_batch(data_to_flush);
203 }
204 }
205
206 // Now create the snapshot
207 if let Ok(storage_guard) = storage_clone.read() {
208 let now = SystemTime::now()
209 .duration_since(UNIX_EPOCH)
210 .unwrap_or_default()
211 .as_nanos() as u64;
212
213 match snapshotter.create_snapshot(storage_guard.get_all_series(), now) {
214 Ok(path) => println!("Flush thread: Created snapshot at {:?}", path),
215 Err(e) => eprintln!("Flush thread: Error creating snapshot: {}", e),
216 }
217
218 // Log the snapshot in WAL if enabled
219 if let Some(wal) = &wal_clone {
220 if let Ok(mut wal_guard) = wal.lock() {
221 if let Err(e) = wal_guard.log_flush(now) {
222 eprintln!("Flush thread: Error logging snapshot to WAL: {}", e);
223 }
224 }
225 }
226 }
227 }
228 }
229 // Received shutdown command
230 Ok(FlushCommand::Shutdown) => {
231 println!("Flush thread: Received shutdown command. Flushing one last time...");
232 // Perform a final flush before shutting down
233 let mut buffer_guard = match buffer_clone.lock() {
234 Ok(guard) => guard,
235 Err(_) => break, // Already poisoned, just exit
236 };
237 let data_to_flush = buffer_guard.drain_all_buffers();
238 drop(buffer_guard);
239
240 if !data_to_flush.is_empty() {
241 if let Ok(mut storage_guard) = storage_clone.write() {
242 let _ = storage_guard.append_batch(data_to_flush);
243 }
244 }
245
246 // Flush and close WAL if enabled
247 if let Some(wal) = &wal_clone {
248 if let Ok(mut wal_guard) = wal.lock() {
249 let now = SystemTime::now()
250 .duration_since(UNIX_EPOCH)
251 .unwrap_or_default()
252 .as_nanos() as u64;
253
254 // Log the final flush
255 if let Err(e) = wal_guard.log_flush(now) {
256 eprintln!("Flush thread: Error logging final flush to WAL: {}", e);
257 }
258
259 // Close the WAL
260 if let Err(e) = wal_guard.close() {
261 eprintln!("Flush thread: Error closing WAL: {}", e);
262 }
263 }
264 }
265
266 println!("Flush thread: Final flush complete.");
267 println!("Flush thread: Exiting.");
268 break; // Exit the loop
269 }
270 // Channel disconnected (DbCore dropped)
271 Err(mpsc::RecvTimeoutError::Disconnected) => {
272 println!("Flush thread: Command channel disconnected. Exiting.");
273 break; // Exit the loop
274 }
275 }
276 }
277 });
278
279 Ok(DbCore {
280 storage,
281 write_buffer,
282 flush_cmd_tx,
283 flush_handle: Some(flush_handle),
284 wal,
285 snapshotter,
286 config,
287 })
288 }
289
290 /// Creates a new `DbCore` instance with default configuration, but with a custom flush interval.
291 ///
292 /// This is a convenience constructor for quickly creating a database with a specific flush interval.
293 /// All other configuration options use their default values.
294 ///
295 /// # Arguments
296 /// * `flush_interval` - The interval between automatic buffer flushes.
297 ///
298 /// # Panics
299 /// Panics if the database cannot be initialized with the default configuration.
300 pub fn new(flush_interval: Duration) -> Self {
301 let mut config = DbConfig::default();
302 config.flush_interval = flush_interval;
303 Self::with_config(config).expect("Failed to initialize DbCore with default configuration")
304 }
305
306 /// Recovers the database state from disk using the latest snapshot and any newer WAL entries.
307 ///
308 /// This method should be called after constructing the database if you want to restore
309 /// persisted data. It loads the most recent snapshot (if enabled), then applies any
310 /// WAL entries that occurred after the snapshot.
311 ///
312 /// # Returns
313 /// * `Ok(())` if recovery succeeds or if persistence is not enabled.
314 /// * `Err(DbError)` if recovery fails.
315 ///
316 /// # Errors
317 /// Returns an error if loading the snapshot or WAL fails.
318 pub fn recover(&mut self) -> Result<(), DbError> {
319 if self.snapshotter.is_none() && self.wal.is_none() {
320 // No persistence enabled, nothing to recover
321 return Ok(());
322 }
323
324 // First try to load from the latest snapshot
325 let mut latest_timestamp = 0;
326 if let Some(snapshotter) = &self.snapshotter {
327 if let Some(data) = snapshotter.load_latest_snapshot()? {
328 // Get latest snapshot timestamp
329 if let Ok(Some(ts)) = snapshotter.get_latest_snapshot_timestamp() {
330 latest_timestamp = ts;
331 }
332
333 // Load snapshot data into storage
334 let mut storage_guard = self.storage.write()?;
335 for (series, points) in data {
336 storage_guard.append_points(&series, points)?;
337 }
338
339 println!("Recovered from snapshot with timestamp {}", latest_timestamp);
340 }
341 }
342
343 // Apply any WAL entries that are newer than the snapshot
344 if let Some(wal) = &self.wal {
345 let wal_entries = wal.lock()?.read_all_entries()?;
346
347 let mut pending_inserts = std::collections::HashMap::new();
348
349 for entry in wal_entries {
350 match entry {
351 crate::persistence::WalEntry::Insert { series, timestamp, value, tags } => {
352 // Only apply if newer than snapshot
353 if timestamp > latest_timestamp {
354 let point = DataPoint { timestamp, value, tags };
355 pending_inserts
356 .entry(series)
357 .or_insert_with(Vec::new)
358 .push(point);
359 }
360 },
361 crate::persistence::WalEntry::Flush { timestamp } => {
362 // This was a flush or snapshot point
363 latest_timestamp = timestamp;
364
365 // Apply all pending inserts
366 if !pending_inserts.is_empty() {
367 let mut storage_guard = self.storage.write()?;
368 for (series, points) in pending_inserts.drain() {
369 storage_guard.append_points(&series, points)?;
370 }
371 }
372 }
373 }
374 }
375
376 // Apply any remaining pending inserts
377 if !pending_inserts.is_empty() {
378 let mut storage_guard = self.storage.write()?;
379 for (series, points) in pending_inserts {
380 storage_guard.append_points(&series, points)?;
381 }
382 }
383
384 println!("Applied WAL entries");
385 }
386
387 Ok(())
388 }
389
390 /// Inserts a data point into the specified time series.
391 ///
392 /// This method is thread-safe and can be called concurrently from multiple threads.
393 /// The data point is first staged in the write buffer and will be flushed to storage
394 /// either automatically (by the background thread) or manually (via `flush()`).
395 /// If WAL is enabled, the insert is also logged for durability.
396 ///
397 /// # Arguments
398 /// * `series` - Name of the time series.
399 /// * `timestamp` - Timestamp of the data point.
400 /// * `value` - Value to insert.
401 /// * `tags` - Associated tags for the data point.
402 ///
403 /// # Returns
404 /// * `Ok(())` if the data point is staged successfully.
405 /// * `Err(DbError)` if staging or logging fails.
406 ///
407 /// # Errors
408 /// Returns an error if the WAL or write buffer cannot be accessed.
409 pub fn insert(
410 &self,
411 series: &str,
412 timestamp: Timestamp,
413 value: Value,
414 tags: TagSet,
415 ) -> Result<(), DbError> {
416 let point = DataPoint { timestamp, value, tags: tags.clone() };
417
418 // Log to WAL if enabled
419 if let Some(wal) = &self.wal {
420 let mut wal_guard = wal.lock()?;
421 wal_guard.log_insert(series, timestamp, value, tags)?;
422 }
423
424 // Acquire lock on the write buffer
425 let mut buffer_guard = self.write_buffer.lock()?; // Propagate PoisonError
426 // Stage the data point
427 buffer_guard.stage(series, point)
428 }
429
430 /// Queries data points from a specific time series within a given time range,
431 /// optionally filtering by a set of tags.
432 ///
433 /// This method is thread-safe and allows concurrent queries. It acquires a read lock
434 /// on the storage and the relevant series chunk, then executes the query in parallel.
435 ///
436 /// # Arguments
437 /// * `series` - The name of the time series to query.
438 /// * `time_range` - The time range for the query (start inclusive, end exclusive).
439 /// * `tag_filter` - An optional set of tags to filter by. Only points matching all tags are returned.
440 ///
441 /// # Returns
442 /// * `Ok(Vec<(Timestamp, Value)>)` with all matching data points.
443 /// * `Err(DbError)` if the series does not exist or a lock cannot be acquired.
444 ///
445 /// # Errors
446 /// Returns an error if the series is not found or if a lock is poisoned.
447 pub fn query(
448 &self,
449 series: &str,
450 time_range: Range<Timestamp>,
451 tag_filter: Option<&TagSet>,
452 ) -> Result<Vec<(Timestamp, Value)>, DbError> {
453 // Acquire read lock on the storage
454 let storage_guard = self.storage.read()?; // Propagate PoisonError
455
456 // Get the specific series chunk (as an Arc<RwLock<TimeSeriesChunk>>)
457 let chunk_arc = storage_guard
458 .get_chunk_for_query(series)
459 .ok_or_else(|| DbError::SeriesNotFound(series.to_string()))?;
460
461 // Acquire read lock on the specific chunk
462 // This allows concurrent queries on the same series
463 let chunk_guard = chunk_arc.read()?; // Propagate PoisonError
464
465 // Execute the query using the query module function
466 execute_query(chunk_guard, time_range, tag_filter)
467 }
468
469 /// Triggers an immediate flush of the write buffer to storage.
470 ///
471 /// This sends a command to the background flush thread to flush all staged data points
472 /// to the in-memory storage. Useful for testing or ensuring data is persisted before shutdown.
473 ///
474 /// # Returns
475 /// * `Ok(())` if the flush command is sent successfully.
476 /// * `Err(DbError)` if the command cannot be sent.
477 ///
478 /// # Errors
479 /// Returns an error if the background thread cannot be reached.
480 pub fn flush(&self) -> Result<(), DbError> {
481 self.flush_cmd_tx.send(FlushCommand::Flush).map_err(|e| {
482 DbError::BackgroundTaskError(format!("Failed to send flush command: {}", e))
483 })
484 }
485
486 /// Triggers an immediate snapshot of the current database state.
487 ///
488 /// This sends a command to the background flush thread to create a snapshot of all
489 /// in-memory data. Snapshots are only available if enabled in the configuration.
490 ///
491 /// # Returns
492 /// * `Ok(())` if the snapshot command is sent successfully.
493 /// * `Err(DbError)` if snapshots are not enabled or the command cannot be sent.
494 ///
495 /// # Errors
496 /// Returns an error if snapshots are disabled or if the background thread cannot be reached.
497 pub fn snapshot(&self) -> Result<(), DbError> {
498 if self.snapshotter.is_none() {
499 return Err(DbError::ConfigError("Snapshots are not enabled".to_string()));
500 }
501
502 self.flush_cmd_tx.send(FlushCommand::Snapshot).map_err(|e| {
503 DbError::BackgroundTaskError(format!("Failed to send snapshot command: {}", e))
504 })
505 }
506
507 /// Returns a reference to the current database configuration.
508 ///
509 /// This allows inspection of the configuration used to initialize the database.
510 ///
511 /// # Returns
512 /// * A reference to the `DbConfig` struct.
513 pub fn get_config(&self) -> &DbConfig {
514 &self.config
515 }
516}
517
518/// Default implementation uses a 1-second flush interval.
519impl Default for DbCore {
520 fn default() -> Self {
521 Self::with_config(DbConfig::default()).expect("Failed to initialize DbCore with default configuration")
522 }
523}
524
525/// Implement Drop to gracefully shut down the background flush thread.
526impl Drop for DbCore {
527 fn drop(&mut self) {
528 println!("DbCore dropping. Sending shutdown command to flush thread...");
529 // Send the shutdown command, ignoring potential errors if the thread already panicked
530 let _ = self.flush_cmd_tx.send(FlushCommand::Shutdown);
531
532 // Wait for the flush thread to finish
533 if let Some(handle) = self.flush_handle.take() {
534 println!("Waiting for flush thread to exit...");
535 if let Err(e) = handle.join() {
536 eprintln!("Flush thread panicked: {:?}", e);
537 }
538 println!("Flush thread joined.");
539 } else {
540 println!("Flush thread handle already taken or thread never started.");
541 }
542 println!("DbCore dropped.");
543 }
544}
545