reddb_server/ec/
worker.rs1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use super::config::EcRegistry;
6use super::consolidation;
7use crate::storage::unified::store::UnifiedStore;
8
/// Handle for the background EC consolidation worker thread.
///
/// The handle owns only the shared `running` flag. The worker thread holds a
/// clone of the same flag and exits its loop once it observes `false`.
/// The derived `Default` yields a stopped worker, equivalent to `EcWorker::new()`.
#[derive(Debug, Default)]
pub struct EcWorker {
    /// `true` from a successful `start` until `stop` is called; shared with
    /// the spawned thread so either side can observe a state change.
    running: Arc<AtomicBool>,
}
12
13impl EcWorker {
14 pub fn new() -> Self {
15 Self {
16 running: Arc::new(AtomicBool::new(false)),
17 }
18 }
19
20 pub fn start(&self, registry: Arc<EcRegistry>, store: Arc<UnifiedStore>) {
21 if self.running.load(Ordering::SeqCst) {
22 return;
23 }
24 self.running.store(true, Ordering::SeqCst);
25
26 let running = Arc::clone(&self.running);
27
28 std::thread::Builder::new()
29 .name("reddb-ec-worker".into())
30 .spawn(move || {
31 while running.load(Ordering::SeqCst) {
32 let configs = registry.async_configs();
33 if configs.is_empty() {
34 std::thread::sleep(Duration::from_secs(10));
35 continue;
36 }
37
38 let min_interval = configs
39 .iter()
40 .map(|c| c.consolidation_interval_secs)
41 .min()
42 .unwrap_or(60);
43
44 std::thread::sleep(Duration::from_secs(min_interval));
45
46 if !running.load(Ordering::SeqCst) {
47 break;
48 }
49
50 for config in &configs {
51 let _ = consolidation::consolidate(&store, config, None);
52 }
53 }
54 })
55 .ok();
56 }
57
58 pub fn stop(&self) {
59 self.running.store(false, Ordering::SeqCst);
60 }
61
62 pub fn is_running(&self) -> bool {
63 self.running.load(Ordering::SeqCst)
64 }
65}