//! Background compaction for the columnar store.
//!
//! The [`Compactor`] periodically scans every series for small,
//! under-filled chunks and merges adjacent runs of them into larger
//! chunks, reducing per-chunk overhead and reclaiming disk space.

use crate::error::{TsdbError, TsdbResult};
use crate::storage::{ChunkEntry, ColumnarStore, TimeChunk};
use chrono::{DateTime, Duration, Utc};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use tokio::time::interval;

#[derive(Debug, Clone)]
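/// Tuning knobs for background compaction.
///
/// Illustrative construction (the overrides below are arbitrary values,
/// not recommendations):
///
/// ```ignore
/// let config = CompactionConfig {
///     min_fill_ratio: 0.25,
///     max_chunk_size: 50_000,
///     ..Default::default()
/// };
/// let compactor = Compactor::with_config(config);
/// ```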
pub struct CompactionConfig {
    /// How often the background loop wakes up to run a compaction pass.
    pub interval: Duration,

    /// Chunks whose fill ratio (`point_count / target_chunk_size`) falls
    /// below this threshold become merge candidates.
    pub min_fill_ratio: f64,

    /// Desired number of points per chunk, used to compute the fill ratio.
    pub target_chunk_size: usize,

    /// Hard cap on the number of points in a merged chunk.
    pub max_chunk_size: usize,

    /// When false, [`Compactor::start`] returns immediately and no
    /// compaction runs.
    pub enabled: bool,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            interval: Duration::hours(1),
            min_fill_ratio: 0.1,
            target_chunk_size: 10_000,
            max_chunk_size: 100_000,
            enabled: true,
        }
    }
}

/// Counters describing compaction activity since startup or the last reset.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Completed compaction passes.
    pub runs: u64,
    /// Source chunks merged away.
    pub chunks_merged: u64,
    /// Merged chunks written out.
    pub chunks_created: u64,
    /// Compressed bytes reclaimed by merging.
    pub bytes_saved: u64,
    /// Wall-clock start of the most recent pass.
    pub last_run: Option<DateTime<Utc>>,
}

#[derive(Debug)]
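/// Merges small, under-filled chunks into larger ones, either on demand via
/// [`Compactor::compact_once`] or periodically via [`Compactor::start`].
///
/// Illustrative background usage (`store` is assumed to be an
/// `Arc<ColumnarStore>` built elsewhere, so this is not compiled as a
/// doctest):
///
/// ```ignore
/// let compactor = Arc::new(Compactor::new());
/// let worker = {
///     let compactor = Arc::clone(&compactor);
///     tokio::spawn(async move { compactor.start(store).await })
/// };
/// // ... later, from another task:
/// compactor.stop();
/// ```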
pub struct Compactor {
    /// Compaction tuning parameters.
    config: CompactionConfig,

    /// Shared, lock-protected activity counters.
    stats: Arc<RwLock<CompactionStats>>,

    /// Set by `start`, cleared by `stop`; gates the background loop.
    running: Arc<AtomicBool>,

    /// Running total of compressed bytes reclaimed.
    bytes_processed: Arc<AtomicU64>,
}

impl Compactor {
    /// Creates a compactor with the default configuration.
    pub fn new() -> Self {
        Self::with_config(CompactionConfig::default())
    }

    /// Creates a compactor with the given configuration.
    pub fn with_config(config: CompactionConfig) -> Self {
        Self {
            config,
            stats: Arc::new(RwLock::new(CompactionStats::default())),
            running: Arc::new(AtomicBool::new(false)),
            bytes_processed: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Runs the compaction loop until [`Compactor::stop`] is called.
    ///
    /// Returns immediately if compaction is disabled. Tokio's `interval`
    /// completes its first tick at once, so the first pass runs right after
    /// startup.
    pub async fn start(&self, store: Arc<ColumnarStore>) -> TsdbResult<()> {
        if !self.config.enabled {
            return Ok(());
        }

        self.running.store(true, Ordering::SeqCst);

        // Clamp to at least one second: `tokio::time::interval` panics on a
        // zero period, and a negative chrono `Duration` would wrap when cast
        // to u64.
        let interval_secs = self.config.interval.num_seconds().max(1) as u64;
        let mut ticker = interval(std::time::Duration::from_secs(interval_secs));

        while self.running.load(Ordering::SeqCst) {
            ticker.tick().await;

            if let Err(e) = self.compact_once(&store).await {
                eprintln!("Compaction error: {e}");
            }
        }

        Ok(())
    }

    /// Signals the background loop to exit after its current iteration.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Runs a single compaction pass over every series in the store.
    pub async fn compact_once(&self, store: &ColumnarStore) -> TsdbResult<()> {
        let start_time = Utc::now();
        let index = store.index();

        let series_ids = index.series_ids()?;

        let mut total_merged = 0;
        let mut total_created = 0;
        let mut bytes_saved = 0;

        for series_id in series_ids {
            let (merged, created, saved) = self.compact_series(store, series_id).await?;
            total_merged += merged;
            total_created += created;
            bytes_saved += saved;
        }

        {
            let mut stats = self
                .stats
                .write()
                .map_err(|e| TsdbError::Query(format!("Lock poisoned: {e}")))?;
            stats.runs += 1;
            stats.chunks_merged += total_merged;
            stats.chunks_created += total_created;
            stats.bytes_saved += bytes_saved;
            stats.last_run = Some(start_time);
        }

        Ok(())
    }

    /// Compacts one series: finds under-filled chunks and merges adjacent
    /// runs of them. Returns `(chunks_merged, chunks_created, bytes_saved)`.
    async fn compact_series(
        &self,
        store: &ColumnarStore,
        series_id: u64,
    ) -> TsdbResult<(u64, u64, u64)> {
        let index = store.index();
        let chunks = index.get_chunks_for_series(series_id)?;

        // Candidates are chunks filled below `min_fill_ratio` of the target.
        let mut small_chunks: Vec<ChunkEntry> = chunks
            .into_iter()
            .filter(|chunk| {
                let fill_ratio = chunk.point_count as f64 / self.config.target_chunk_size as f64;
                fill_ratio < self.config.min_fill_ratio
            })
            .collect();

        if small_chunks.is_empty() {
            return Ok((0, 0, 0));
        }

        small_chunks.sort_by_key(|c| c.start_time);

        let merge_groups = self.group_adjacent_chunks(&small_chunks);

        let mut merged_count = 0;
        let mut created_count = 0;
        let mut bytes_saved = 0;

        for group in merge_groups {
            // A group of one has nothing to merge with.
            if group.len() < 2 {
                continue;
            }

            let (merged, created, saved) = self.merge_chunks(store, series_id, &group).await?;
            merged_count += merged;
            created_count += created;
            bytes_saved += saved;
        }

        Ok((merged_count, created_count, bytes_saved))
    }

    /// Greedily packs time-ordered chunks into groups whose combined point
    /// count stays within `max_chunk_size`. Singleton groups are discarded,
    /// since a lone chunk has nothing to merge with.
    fn group_adjacent_chunks(&self, chunks: &[ChunkEntry]) -> Vec<Vec<ChunkEntry>> {
        let mut groups = Vec::new();
        let mut current_group = Vec::new();
        let mut current_size = 0;

        for chunk in chunks {
            if current_size + chunk.point_count <= self.config.max_chunk_size {
                current_group.push(chunk.clone());
                current_size += chunk.point_count;
            } else {
                if current_group.len() > 1 {
                    groups.push(current_group);
                }
                current_group = vec![chunk.clone()];
                current_size = chunk.point_count;
            }
        }

        if current_group.len() > 1 {
            groups.push(current_group);
        }

        groups
    }

    /// Merges a group of chunks into one: decompress, sort, deduplicate by
    /// timestamp, rewrite, then drop the originals.
    /// Returns `(chunks_merged, chunks_created, bytes_saved)`.
    async fn merge_chunks(
        &self,
        store: &ColumnarStore,
        series_id: u64,
        chunks: &[ChunkEntry],
    ) -> TsdbResult<(u64, u64, u64)> {
        let mut all_points = Vec::new();
        let mut total_compressed_size = 0;

        for chunk_entry in chunks {
            let chunk = store.read_chunk(chunk_entry.chunk_id)?;
            let points = chunk.decompress()?;
            all_points.extend(points);
            total_compressed_size += chunk_entry.compressed_size;
        }

        all_points.sort_by_key(|p| p.timestamp);

        // Overlapping chunks can repeat a timestamp; keep one point each.
        all_points.dedup_by_key(|p| p.timestamp);

        if all_points.is_empty() {
            return Ok((0, 0, 0));
        }

        let start_time = all_points[0].timestamp;
        let chunk_duration = self.config.interval;
        let new_chunk = TimeChunk::new(series_id, start_time, chunk_duration, all_points)?;

        let new_entry = store.write_chunk(&new_chunk)?;

        // Unregister and delete the source chunks. File removal is
        // best-effort: a failure here only leaks disk space.
        let index = store.index();
        for chunk_entry in chunks {
            index.remove_chunk(chunk_entry.chunk_id)?;

            if let Some(path) = &chunk_entry.file_path {
                let _ = std::fs::remove_file(path);
            }
        }

        let bytes_saved = total_compressed_size.saturating_sub(new_entry.compressed_size);

        self.bytes_processed
            .fetch_add(bytes_saved as u64, Ordering::SeqCst);

        Ok((chunks.len() as u64, 1, bytes_saved as u64))
    }

    /// Returns a snapshot of the accumulated compaction statistics.
    pub fn stats(&self) -> TsdbResult<CompactionStats> {
        let stats = self
            .stats
            .read()
            .map_err(|e| TsdbError::Query(format!("Lock poisoned: {e}")))?;
        Ok(stats.clone())
    }

    /// Clears all counters, including the processed-bytes total.
    pub fn reset_stats(&self) -> TsdbResult<()> {
        let mut stats = self
            .stats
            .write()
            .map_err(|e| TsdbError::Query(format!("Lock poisoned: {e}")))?;
        *stats = CompactionStats::default();
        self.bytes_processed.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Reports whether the background loop is currently active.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
}

impl Default for Compactor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::series::DataPoint;
    use std::env;

    /// Builds a chunk of `count` one-second-spaced points starting at
    /// `start_timestamp` (Unix seconds).
    fn create_test_chunk(
        series_id: u64,
        start_timestamp: i64,
        count: usize,
    ) -> TsdbResult<TimeChunk> {
        let start_time = DateTime::from_timestamp(start_timestamp, 0).unwrap();
        let mut points = Vec::new();

        for i in 0..count {
            points.push(DataPoint::new(
                start_time + Duration::seconds(i as i64),
                20.0 + (i as f64 * 0.1),
            ));
        }

        TimeChunk::new(series_id, start_time, Duration::hours(2), points)
    }

    #[test]
    fn test_compaction_config() {
        let config = CompactionConfig::default();
        assert_eq!(config.interval, Duration::hours(1));
        assert_eq!(config.min_fill_ratio, 0.1);
        assert_eq!(config.target_chunk_size, 10_000);
        assert!(config.enabled);
    }

    #[test]
    fn test_compactor_creation() {
        let compactor = Compactor::new();
        assert!(!compactor.is_running());

        let stats = compactor.stats().unwrap();
        assert_eq!(stats.runs, 0);
        assert_eq!(stats.chunks_merged, 0);
    }

    #[test]
    fn test_group_adjacent_chunks() -> TsdbResult<()> {
        let config = CompactionConfig {
            max_chunk_size: 200,
            ..Default::default()
        };
        let compactor = Compactor::with_config(config);

        let chunks = vec![
            ChunkEntry::new(
                1,
                100,
                DateTime::from_timestamp(1000, 0).unwrap(),
                DateTime::from_timestamp(1100, 0).unwrap(),
                50,
            ),
            ChunkEntry::new(
                2,
                100,
                DateTime::from_timestamp(1200, 0).unwrap(),
                DateTime::from_timestamp(1300, 0).unwrap(),
                60,
            ),
            ChunkEntry::new(
                3,
                100,
                DateTime::from_timestamp(1400, 0).unwrap(),
                DateTime::from_timestamp(1500, 0).unwrap(),
                70,
            ),
        ];

        // 50 + 60 + 70 = 180 points fits under the 200-point cap, so all
        // three chunks land in a single group.
        let groups = compactor.group_adjacent_chunks(&chunks);
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].len(), 3);

        Ok(())
    }
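
    // A lone under-filled chunk has no neighbor to merge with, so grouping
    // should produce nothing at all.
    #[test]
    fn test_group_single_chunk_not_merged() -> TsdbResult<()> {
        let compactor = Compactor::new();

        let chunks = vec![ChunkEntry::new(
            1,
            100,
            DateTime::from_timestamp(1000, 0).unwrap(),
            DateTime::from_timestamp(1100, 0).unwrap(),
            50,
        )];

        let groups = compactor.group_adjacent_chunks(&chunks);
        assert!(groups.is_empty());

        Ok(())
    }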

    #[test]
    fn test_group_respects_max_size() -> TsdbResult<()> {
        let config = CompactionConfig {
            max_chunk_size: 100,
            ..Default::default()
        };
        let compactor = Compactor::with_config(config);

        let chunks = vec![
            ChunkEntry::new(
                1,
                100,
                DateTime::from_timestamp(1000, 0).unwrap(),
                DateTime::from_timestamp(1100, 0).unwrap(),
                50,
            ),
            ChunkEntry::new(
                2,
                100,
                DateTime::from_timestamp(1200, 0).unwrap(),
                DateTime::from_timestamp(1300, 0).unwrap(),
                60,
            ),
            ChunkEntry::new(
                3,
                100,
                DateTime::from_timestamp(1400, 0).unwrap(),
                DateTime::from_timestamp(1500, 0).unwrap(),
                70,
            ),
        ];

        // No two adjacent chunks fit under the 100-point cap (50 + 60 and
        // 60 + 70 both exceed it), so no merge group can form here; any
        // group that did form would have to hold at least two chunks.
        let groups = compactor.group_adjacent_chunks(&chunks);
        assert!(groups.is_empty() || groups.iter().all(|g| g.len() >= 2));

        Ok(())
    }

    #[tokio::test]
    async fn test_merge_chunks() -> TsdbResult<()> {
        let temp_dir = env::temp_dir().join("tsdb_compactor_merge_test");
        let _ = std::fs::remove_dir_all(&temp_dir);

        let mut store = ColumnarStore::new(&temp_dir, Duration::hours(2), 100)?;
        store.set_fsync(false);

        let chunk1 = create_test_chunk(100, 1000, 50)?;
        let chunk2 = create_test_chunk(100, 1100, 50)?;

        let entry1 = store.write_chunk(&chunk1)?;
        let entry2 = store.write_chunk(&chunk2)?;

        let compactor = Compactor::new();
        let (merged, created, _saved) = compactor
            .merge_chunks(&store, 100, &[entry1, entry2])
            .await?;

        assert_eq!(merged, 2);
        assert_eq!(created, 1);

        std::fs::remove_dir_all(&temp_dir)?;
        Ok(())
    }
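
    // Overlapping chunks share timestamps; merging should still yield a
    // single chunk, deduplicating by timestamp instead of failing.
    #[tokio::test]
    async fn test_merge_overlapping_chunks() -> TsdbResult<()> {
        let temp_dir = env::temp_dir().join("tsdb_compactor_overlap_test");
        let _ = std::fs::remove_dir_all(&temp_dir);

        let mut store = ColumnarStore::new(&temp_dir, Duration::hours(2), 100)?;
        store.set_fsync(false);

        // Both chunks cover the same 50-second window starting at t = 1000.
        let chunk1 = create_test_chunk(100, 1000, 50)?;
        let chunk2 = create_test_chunk(100, 1000, 50)?;

        let entry1 = store.write_chunk(&chunk1)?;
        let entry2 = store.write_chunk(&chunk2)?;

        let compactor = Compactor::new();
        let (merged, created, _saved) = compactor
            .merge_chunks(&store, 100, &[entry1, entry2])
            .await?;

        assert_eq!(merged, 2);
        assert_eq!(created, 1);

        std::fs::remove_dir_all(&temp_dir)?;
        Ok(())
    }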

    #[test]
    fn test_stats_tracking() -> TsdbResult<()> {
        let compactor = Compactor::new();

        let stats = compactor.stats()?;
        assert_eq!(stats.runs, 0);

        // Mutate the shared stats directly to simulate a completed pass.
        {
            let mut stats = compactor.stats.write().unwrap();
            stats.runs += 1;
            stats.chunks_merged += 5;
            stats.chunks_created += 2;
            stats.bytes_saved += 10_000;
            stats.last_run = Some(Utc::now());
        }

        let stats = compactor.stats()?;
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.chunks_merged, 5);
        assert_eq!(stats.chunks_created, 2);
        assert_eq!(stats.bytes_saved, 10_000);
        assert!(stats.last_run.is_some());

        Ok(())
    }

    #[test]
    fn test_reset_stats() -> TsdbResult<()> {
        let compactor = Compactor::new();

        // Seed some non-zero counters.
        {
            let mut stats = compactor.stats.write().unwrap();
            stats.runs = 10;
            stats.chunks_merged = 50;
        }

        let stats_before = compactor.stats()?;
        assert_eq!(stats_before.runs, 10);

        compactor.reset_stats()?;

        let stats_after = compactor.stats()?;
        assert_eq!(stats_after.runs, 0);
        assert_eq!(stats_after.chunks_merged, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_compactor_disabled() {
        let config = CompactionConfig {
            enabled: false,
            ..Default::default()
        };
        let compactor = Compactor::with_config(config);

        let temp_dir = env::temp_dir().join("tsdb_compactor_disabled_test");
        let store = Arc::new(ColumnarStore::new(&temp_dir, Duration::hours(2), 100).unwrap());

        // With `enabled: false`, start returns immediately instead of looping.
        let result = compactor.start(store).await;
        assert!(result.is_ok());

        let _ = std::fs::remove_dir_all(&temp_dir);
    }
}