reddb-io-server 1.1.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use super::config::EcRegistry;
use super::consolidation;
use crate::storage::unified::store::UnifiedStore;

pub struct EcWorker {
    running: Arc<AtomicBool>,
}

impl EcWorker {
    pub fn new() -> Self {
        Self {
            running: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn start(&self, registry: Arc<EcRegistry>, store: Arc<UnifiedStore>) {
        if self.running.load(Ordering::SeqCst) {
            return;
        }
        self.running.store(true, Ordering::SeqCst);

        let running = Arc::clone(&self.running);

        std::thread::Builder::new()
            .name("reddb-ec-worker".into())
            .spawn(move || {
                while running.load(Ordering::SeqCst) {
                    let configs = registry.async_configs();
                    if configs.is_empty() {
                        std::thread::sleep(Duration::from_secs(10));
                        continue;
                    }

                    let min_interval = configs
                        .iter()
                        .map(|c| c.consolidation_interval_secs)
                        .min()
                        .unwrap_or(60);

                    std::thread::sleep(Duration::from_secs(min_interval));

                    if !running.load(Ordering::SeqCst) {
                        break;
                    }

                    for config in &configs {
                        let _ = consolidation::consolidate(&store, config, None);
                    }
                }
            })
            .ok();
    }

    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
}