oxirs_tsdb/write/compactor.rs

//! Background Compaction for Time-Series Chunks
//!
//! This module provides automatic chunk compaction to improve compression ratios
//! and reduce storage overhead.
//!
//! ## Compaction Strategies
//!
//! 1. **Merge small chunks** - Combine multiple small chunks into larger ones
//! 2. **Recompress old chunks** - Rewrite chunks with better compression
//! 3. **Remove duplicates** - Deduplicate identical data points
//!
//! ## Compaction Policy
//!
//! - Runs every hour (configurable)
//! - Target: merge chunks that are less than 10% full relative to the target size
//! - Minimum chunk size: 1,000 points
//! - Maximum chunk size: 100,000 points
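//!
//! ## Example
//!
//! A minimal usage sketch (not a verbatim API reference: the `ColumnarStore::new`
//! signature is taken from this module's tests, and the surrounding setup is
//! assumed):
//!
//! ```ignore
//! use std::sync::Arc;
//! use chrono::Duration;
//!
//! let store = Arc::new(ColumnarStore::new("/tmp/tsdb", Duration::hours(2), 100_000)?);
//! let compactor = Compactor::new();
//!
//! // Run a single compaction cycle on demand...
//! compactor.compact_once(&store).await?;
//!
//! // ...or drive the periodic background loop (stop it later via `stop()`).
//! compactor.start(Arc::clone(&store)).await?;
//! ```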

use crate::error::{TsdbError, TsdbResult};
use crate::storage::{ChunkEntry, ColumnarStore, TimeChunk};
use chrono::{DateTime, Duration, Utc};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use tokio::time::interval;

/// Compaction configuration
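///
/// # Example
///
/// A sketch of overriding a couple of fields with struct-update syntax (as the
/// tests below do); the values are illustrative:
///
/// ```ignore
/// let config = CompactionConfig {
///     min_fill_ratio: 0.2,
///     max_chunk_size: 50_000,
///     ..Default::default()
/// };
/// let compactor = Compactor::with_config(config);
/// ```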
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Compaction interval (default: 1 hour)
    pub interval: Duration,

    /// Minimum chunk fill ratio to trigger compaction (default: 0.1 = 10%)
    pub min_fill_ratio: f64,

    /// Target chunk size in points (default: 10,000)
    pub target_chunk_size: usize,

    /// Maximum chunk size in points (default: 100,000)
    pub max_chunk_size: usize,

    /// Enable automatic compaction
    pub enabled: bool,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            interval: Duration::hours(1),
            min_fill_ratio: 0.1,
            target_chunk_size: 10_000,
            max_chunk_size: 100_000,
            enabled: true,
        }
    }
}

/// Compaction statistics
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Number of compaction runs
    pub runs: u64,
    /// Number of chunks merged
    pub chunks_merged: u64,
    /// Number of chunks created
    pub chunks_created: u64,
    /// Total bytes saved
    pub bytes_saved: u64,
    /// Last compaction time
    pub last_run: Option<DateTime<Utc>>,
}

/// Background compactor for time-series chunks
#[derive(Debug)]
pub struct Compactor {
    /// Compaction configuration
    config: CompactionConfig,

    /// Compaction statistics
    stats: Arc<RwLock<CompactionStats>>,

    /// Running flag
    running: Arc<AtomicBool>,

    /// Total bytes processed
    bytes_processed: Arc<AtomicU64>,
}

impl Compactor {
    /// Create a new compactor with default configuration
    pub fn new() -> Self {
        Self::with_config(CompactionConfig::default())
    }

    /// Create a new compactor with custom configuration
    pub fn with_config(config: CompactionConfig) -> Self {
        Self {
            config,
            stats: Arc::new(RwLock::new(CompactionStats::default())),
            running: Arc::new(AtomicBool::new(false)),
            bytes_processed: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Start background compaction task
    ///
    /// Runs compaction at configured intervals until stopped.
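    ///
    /// # Example
    ///
    /// A sketch of driving the loop from a spawned task (store setup assumed;
    /// see the module-level example):
    ///
    /// ```ignore
    /// let compactor = Arc::new(Compactor::new());
    /// let task = {
    ///     let compactor = Arc::clone(&compactor);
    ///     tokio::spawn(async move { compactor.start(store).await })
    /// };
    /// // ... later, signal shutdown; the loop exits after its next tick.
    /// compactor.stop();
    /// ```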
    pub async fn start(&self, store: Arc<ColumnarStore>) -> TsdbResult<()> {
        if !self.config.enabled {
            return Ok(());
        }

        self.running.store(true, Ordering::SeqCst);

        // Guard against zero or negative intervals, which would make
        // `tokio::time::interval` panic.
        let interval_secs = self.config.interval.num_seconds().max(1) as u64;
        let mut ticker = interval(std::time::Duration::from_secs(interval_secs));

        // The first tick of `tokio::time::interval` completes immediately;
        // consume it so the first compaction runs one full interval after start.
        ticker.tick().await;

        while self.running.load(Ordering::SeqCst) {
            ticker.tick().await;

            if let Err(e) = self.compact_once(&store).await {
                eprintln!("Compaction error: {e}");
            }
        }

        Ok(())
    }

    /// Stop background compaction
    ///
    /// The flag is checked by the compaction loop, so shutdown takes effect at
    /// its next tick.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Run a single compaction cycle
    pub async fn compact_once(&self, store: &ColumnarStore) -> TsdbResult<()> {
        let start_time = Utc::now();
        let index = store.index();

        // Get all series
        let series_ids = index.series_ids()?;

        let mut total_merged = 0;
        let mut total_created = 0;
        let mut bytes_saved = 0;

        for series_id in series_ids {
            let (merged, created, saved) = self.compact_series(store, series_id).await?;
            total_merged += merged;
            total_created += created;
            bytes_saved += saved;
        }

        // Update statistics
        {
            let mut stats = self
                .stats
                .write()
                .map_err(|e| TsdbError::Query(format!("Lock poisoned: {e}")))?;
            stats.runs += 1;
            stats.chunks_merged += total_merged;
            stats.chunks_created += total_created;
            stats.bytes_saved += bytes_saved;
            stats.last_run = Some(start_time);
        }

        Ok(())
    }

    /// Compact chunks for a single series
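    ///
    /// Returns `(chunks_merged, chunks_created, bytes_saved)` for the series.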
    async fn compact_series(
        &self,
        store: &ColumnarStore,
        series_id: u64,
    ) -> TsdbResult<(u64, u64, u64)> {
        let index = store.index();
        let chunks = index.get_chunks_for_series(series_id)?;

        // Find small chunks that need compaction. The fill ratio is measured
        // against the target size, so with the defaults (0.1 * 10,000) this
        // selects chunks under 1,000 points.
        let mut small_chunks: Vec<ChunkEntry> = chunks
            .into_iter()
            .filter(|chunk| {
                let fill_ratio = chunk.point_count as f64 / self.config.target_chunk_size as f64;
                fill_ratio < self.config.min_fill_ratio
            })
            .collect();

        if small_chunks.is_empty() {
            return Ok((0, 0, 0));
        }

        // Sort by time
        small_chunks.sort_by_key(|c| c.start_time);

        // Group adjacent chunks for merging
        let merge_groups = self.group_adjacent_chunks(&small_chunks);

        let mut merged_count = 0;
        let mut created_count = 0;
        let mut bytes_saved = 0;

        for group in merge_groups {
            if group.len() < 2 {
                continue; // Need at least 2 chunks to merge
            }

            let (merged, created, saved) = self.merge_chunks(store, series_id, &group).await?;
            merged_count += merged;
            created_count += created;
            bytes_saved += saved;
        }

        Ok((merged_count, created_count, bytes_saved))
    }

    /// Group adjacent chunks for merging
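    ///
    /// Greedy, single-pass grouping: chunks are appended to the current group
    /// while the combined point count stays within `max_chunk_size`; a chunk
    /// that would overflow starts a new group. Singleton groups are dropped,
    /// since there is nothing to merge. For example, with `max_chunk_size = 100`
    /// and chunk sizes `[50, 60, 70]`, every pair overflows, so no groups are
    /// produced.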
    fn group_adjacent_chunks(&self, chunks: &[ChunkEntry]) -> Vec<Vec<ChunkEntry>> {
        let mut groups = Vec::new();
        let mut current_group = Vec::new();
        let mut current_size = 0;

        for chunk in chunks {
            if current_size + chunk.point_count <= self.config.max_chunk_size {
                current_group.push(chunk.clone());
                current_size += chunk.point_count;
            } else {
                if current_group.len() > 1 {
                    groups.push(current_group);
                }
                current_group = vec![chunk.clone()];
                current_size = chunk.point_count;
            }
        }

        if current_group.len() > 1 {
            groups.push(current_group);
        }

        groups
    }

    /// Merge a group of chunks into a single chunk
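    ///
    /// Reads and decompresses every chunk in the group, sorts and de-duplicates
    /// the combined points by timestamp, writes them back as a single chunk,
    /// then drops the old chunks from the index and deletes their files.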
    async fn merge_chunks(
        &self,
        store: &ColumnarStore,
        series_id: u64,
        chunks: &[ChunkEntry],
    ) -> TsdbResult<(u64, u64, u64)> {
        // Read all chunks and collect data points
        let mut all_points = Vec::new();
        let mut total_compressed_size = 0;

        for chunk_entry in chunks {
            let chunk = store.read_chunk(chunk_entry.chunk_id)?;
            let points = chunk.decompress()?;
            all_points.extend(points);
            total_compressed_size += chunk_entry.compressed_size;
        }

        // Sort points by timestamp (should already be sorted, but ensure)
        all_points.sort_by_key(|p| p.timestamp);

        // Remove duplicates
        all_points.dedup_by_key(|p| p.timestamp);

        if all_points.is_empty() {
            return Ok((0, 0, 0));
        }

        // Create new merged chunk. Note: the compaction interval is reused as
        // the merged chunk's nominal duration.
        let start_time = all_points[0].timestamp;
        let chunk_duration = self.config.interval;
        let new_chunk = TimeChunk::new(series_id, start_time, chunk_duration, all_points)?;

        // Write new chunk
        let new_entry = store.write_chunk(&new_chunk)?;

        // Remove old chunks from index
        let index = store.index();
        for chunk_entry in chunks {
            index.remove_chunk(chunk_entry.chunk_id)?;

            // Delete old chunk file
            if let Some(path) = &chunk_entry.file_path {
                let _ = std::fs::remove_file(path); // Ignore errors
            }
        }

        // Calculate bytes saved
        let bytes_saved = total_compressed_size.saturating_sub(new_entry.compressed_size);

        // Track the compressed bytes read during this merge (the counter's name
        // and doc say "processed", so count input bytes rather than savings).
        self.bytes_processed
            .fetch_add(total_compressed_size as u64, Ordering::SeqCst);

        Ok((chunks.len() as u64, 1, bytes_saved as u64))
    }

    /// Get compaction statistics
    pub fn stats(&self) -> TsdbResult<CompactionStats> {
        let stats = self
            .stats
            .read()
            .map_err(|e| TsdbError::Query(format!("Lock poisoned: {e}")))?;
        Ok(stats.clone())
    }

    /// Reset compaction statistics
    pub fn reset_stats(&self) -> TsdbResult<()> {
        let mut stats = self
            .stats
            .write()
            .map_err(|e| TsdbError::Query(format!("Lock poisoned: {e}")))?;
        *stats = CompactionStats::default();
        self.bytes_processed.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Check if compactor is running
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
}

impl Default for Compactor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::series::DataPoint;
    use std::env;

    fn create_test_chunk(
        series_id: u64,
        start_timestamp: i64,
        count: usize,
    ) -> TsdbResult<TimeChunk> {
        let start_time = DateTime::from_timestamp(start_timestamp, 0).unwrap();
        let mut points = Vec::new();

        for i in 0..count {
            points.push(DataPoint::new(
                start_time + Duration::seconds(i as i64),
                20.0 + (i as f64 * 0.1),
            ));
        }

        TimeChunk::new(series_id, start_time, Duration::hours(2), points)
    }

    #[tokio::test]
    async fn test_compaction_config() {
        let config = CompactionConfig::default();
        assert_eq!(config.interval, Duration::hours(1));
        assert_eq!(config.min_fill_ratio, 0.1);
        assert_eq!(config.target_chunk_size, 10_000);
        assert!(config.enabled);
    }

    #[tokio::test]
    async fn test_compactor_creation() {
        let compactor = Compactor::new();
        assert!(!compactor.is_running());

        let stats = compactor.stats().unwrap();
        assert_eq!(stats.runs, 0);
        assert_eq!(stats.chunks_merged, 0);
    }

    #[tokio::test]
    async fn test_group_adjacent_chunks() -> TsdbResult<()> {
        let config = CompactionConfig {
            max_chunk_size: 200,
            ..Default::default()
        };
        let compactor = Compactor::with_config(config);

        let chunks = vec![
            ChunkEntry::new(
                1,
                100,
                DateTime::from_timestamp(1000, 0).unwrap(),
                DateTime::from_timestamp(1100, 0).unwrap(),
                50,
            ),
            ChunkEntry::new(
                2,
                100,
                DateTime::from_timestamp(1200, 0).unwrap(),
                DateTime::from_timestamp(1300, 0).unwrap(),
                60,
            ),
            ChunkEntry::new(
                3,
                100,
                DateTime::from_timestamp(1400, 0).unwrap(),
                DateTime::from_timestamp(1500, 0).unwrap(),
                70,
            ),
        ];

        let groups = compactor.group_adjacent_chunks(&chunks);
        assert_eq!(groups.len(), 1); // All fit in one group (50 + 60 + 70 = 180 < 200)
        assert_eq!(groups[0].len(), 3);

        Ok(())
    }

    #[tokio::test]
    async fn test_group_respects_max_size() -> TsdbResult<()> {
        let config = CompactionConfig {
            max_chunk_size: 100,
            ..Default::default()
        };
        let compactor = Compactor::with_config(config);

        let chunks = vec![
            ChunkEntry::new(
                1,
                100,
                DateTime::from_timestamp(1000, 0).unwrap(),
                DateTime::from_timestamp(1100, 0).unwrap(),
                50,
            ),
            ChunkEntry::new(
                2,
                100,
                DateTime::from_timestamp(1200, 0).unwrap(),
                DateTime::from_timestamp(1300, 0).unwrap(),
                60,
            ),
            ChunkEntry::new(
                3,
                100,
                DateTime::from_timestamp(1400, 0).unwrap(),
                DateTime::from_timestamp(1500, 0).unwrap(),
                70,
            ),
        ];

        let groups = compactor.group_adjacent_chunks(&chunks);
        // Every pair overflows max_chunk_size (50 + 60 = 110 > 100, 60 + 70 = 130 > 100),
        // so each chunk lands in a singleton group, and singleton groups are
        // dropped. No groups should be produced.
        assert!(groups.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_merge_chunks() -> TsdbResult<()> {
        let temp_dir = env::temp_dir().join("tsdb_compactor_merge_test");
        let _ = std::fs::remove_dir_all(&temp_dir);

        let mut store = ColumnarStore::new(&temp_dir, Duration::hours(2), 100)?;
        store.set_fsync(false);

        // Create two small chunks
        let chunk1 = create_test_chunk(100, 1000, 50)?;
        let chunk2 = create_test_chunk(100, 1100, 50)?;

        let entry1 = store.write_chunk(&chunk1)?;
        let entry2 = store.write_chunk(&chunk2)?;

        // Compact them
        let compactor = Compactor::new();
        let (merged, created, _saved) = compactor
            .merge_chunks(&store, 100, &[entry1, entry2])
            .await?;

        assert_eq!(merged, 2);
        assert_eq!(created, 1);

        std::fs::remove_dir_all(&temp_dir)?;
        Ok(())
    }

    #[tokio::test]
    async fn test_stats_tracking() -> TsdbResult<()> {
        let compactor = Compactor::new();

        let stats = compactor.stats()?;
        assert_eq!(stats.runs, 0);

        // Simulate a compaction run
        {
            let mut stats = compactor.stats.write().unwrap();
            stats.runs += 1;
            stats.chunks_merged += 5;
            stats.chunks_created += 2;
            stats.bytes_saved += 10_000;
            stats.last_run = Some(Utc::now());
        }

        let stats = compactor.stats()?;
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.chunks_merged, 5);
        assert_eq!(stats.chunks_created, 2);
        assert_eq!(stats.bytes_saved, 10_000);
        assert!(stats.last_run.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_reset_stats() -> TsdbResult<()> {
        let compactor = Compactor::new();

        // Set some stats
        {
            let mut stats = compactor.stats.write().unwrap();
            stats.runs = 10;
            stats.chunks_merged = 50;
        }

        let stats_before = compactor.stats()?;
        assert_eq!(stats_before.runs, 10);

        compactor.reset_stats()?;

        let stats_after = compactor.stats()?;
        assert_eq!(stats_after.runs, 0);
        assert_eq!(stats_after.chunks_merged, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_compactor_disabled() {
        let config = CompactionConfig {
            enabled: false,
            ..Default::default()
        };
        let compactor = Compactor::with_config(config);

        // Should return immediately without error
        let temp_dir = env::temp_dir().join("tsdb_compactor_disabled_test");
        let store = Arc::new(ColumnarStore::new(&temp_dir, Duration::hours(2), 100).unwrap());

        let result = compactor.start(store).await;
        assert!(result.is_ok());

        let _ = std::fs::remove_dir_all(&temp_dir);
    }
}