Skip to main content

reddb_server/ec/
worker.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use super::config::EcRegistry;
6use super::consolidation;
7use crate::storage::unified::store::UnifiedStore;
8
/// Handle controlling the background worker thread that periodically runs
/// consolidation for the configurations reported by the registry.
///
/// The handle only owns the shared run flag; the thread itself is detached
/// and exits once it observes the flag cleared.
///
/// `Default` yields an idle handle (flag cleared), the same state `new()`
/// produces.
#[derive(Debug, Default)]
pub struct EcWorker {
    /// `true` while a worker thread is (or is about to be) running. Shared
    /// with the spawned thread, which polls it to know when to exit.
    running: Arc<AtomicBool>,
}
12
13impl EcWorker {
14    pub fn new() -> Self {
15        Self {
16            running: Arc::new(AtomicBool::new(false)),
17        }
18    }
19
20    pub fn start(&self, registry: Arc<EcRegistry>, store: Arc<UnifiedStore>) {
21        if self.running.load(Ordering::SeqCst) {
22            return;
23        }
24        self.running.store(true, Ordering::SeqCst);
25
26        let running = Arc::clone(&self.running);
27
28        std::thread::Builder::new()
29            .name("reddb-ec-worker".into())
30            .spawn(move || {
31                while running.load(Ordering::SeqCst) {
32                    let configs = registry.async_configs();
33                    if configs.is_empty() {
34                        std::thread::sleep(Duration::from_secs(10));
35                        continue;
36                    }
37
38                    let min_interval = configs
39                        .iter()
40                        .map(|c| c.consolidation_interval_secs)
41                        .min()
42                        .unwrap_or(60);
43
44                    std::thread::sleep(Duration::from_secs(min_interval));
45
46                    if !running.load(Ordering::SeqCst) {
47                        break;
48                    }
49
50                    for config in &configs {
51                        let _ = consolidation::consolidate(&store, config, None);
52                    }
53                }
54            })
55            .ok();
56    }
57
58    pub fn stop(&self) {
59        self.running.store(false, Ordering::SeqCst);
60    }
61
62    pub fn is_running(&self) -> bool {
63        self.running.load(Ordering::SeqCst)
64    }
65}