Skip to main content

reddb_server/runtime/
impl_ec.rs

1use crate::ec::config::{EcFieldConfig, EcMode};
2use crate::ec::consolidation;
3use crate::ec::transactions::{create_transaction, EcOperation};
4use crate::{RedDBError, RedDBResult};
5
6use super::RedDBRuntime;
7
8impl RedDBRuntime {
9    pub fn ec_add(
10        &self,
11        collection: &str,
12        field: &str,
13        target_id: u64,
14        value: f64,
15        source: Option<&str>,
16    ) -> RedDBResult<u64> {
17        self.ec_mutate(
18            collection,
19            field,
20            target_id,
21            value,
22            EcOperation::Add,
23            source,
24        )
25    }
26
27    pub fn ec_sub(
28        &self,
29        collection: &str,
30        field: &str,
31        target_id: u64,
32        value: f64,
33        source: Option<&str>,
34    ) -> RedDBResult<u64> {
35        self.ec_mutate(
36            collection,
37            field,
38            target_id,
39            value,
40            EcOperation::Sub,
41            source,
42        )
43    }
44
45    pub fn ec_set(
46        &self,
47        collection: &str,
48        field: &str,
49        target_id: u64,
50        value: f64,
51        source: Option<&str>,
52    ) -> RedDBResult<u64> {
53        self.ec_mutate(
54            collection,
55            field,
56            target_id,
57            value,
58            EcOperation::Set,
59            source,
60        )
61    }
62
63    fn ec_mutate(
64        &self,
65        collection: &str,
66        field: &str,
67        target_id: u64,
68        value: f64,
69        operation: EcOperation,
70        source: Option<&str>,
71    ) -> RedDBResult<u64> {
72        let config = self.ec_config_or_default(collection, field);
73        let tx_collection = config.tx_collection_name();
74
75        let id = create_transaction(
76            self.inner.db.store().as_ref(),
77            &tx_collection,
78            target_id,
79            field,
80            value,
81            operation,
82            source,
83        )
84        .map_err(RedDBError::Internal)?;
85
86        // Sync mode: consolidate immediately
87        if config.mode == EcMode::Sync {
88            consolidation::consolidate(self.inner.db.store().as_ref(), &config, Some(target_id))
89                .map_err(RedDBError::Internal)?;
90        }
91
92        Ok(id.raw())
93    }
94
95    pub fn ec_consolidate(
96        &self,
97        collection: &str,
98        field: &str,
99        target_id: Option<u64>,
100    ) -> RedDBResult<consolidation::ConsolidationResult> {
101        let config = self.ec_config_or_default(collection, field);
102        consolidation::consolidate(self.inner.db.store().as_ref(), &config, target_id)
103            .map_err(RedDBError::Internal)
104    }
105
106    pub fn ec_status(
107        &self,
108        collection: &str,
109        field: &str,
110        target_id: u64,
111    ) -> consolidation::EcStatus {
112        let config = self.ec_config_or_default(collection, field);
113        consolidation::get_ec_status(self.inner.db.store().as_ref(), &config, target_id)
114    }
115
116    pub fn ec_register_field(&self, config: EcFieldConfig) {
117        self.inner.ec_registry.register(config);
118        // Restart worker if needed
119        if !self.inner.ec_worker.is_running() && !self.inner.ec_registry.async_configs().is_empty()
120        {
121            self.inner.ec_worker.start(
122                std::sync::Arc::clone(&self.inner.ec_registry),
123                std::sync::Arc::clone(&self.inner.db.store()),
124            );
125        }
126    }
127
128    pub fn ec_global_status(&self) -> Vec<crate::ec::consolidation::EcStatus> {
129        let configs = self.inner.ec_registry.all_configs();
130        let mut statuses = Vec::new();
131        for config in configs {
132            let tx_collection = config.tx_collection_name();
133            let pending = crate::ec::transactions::query_pending_transactions(
134                self.inner.db.store().as_ref(),
135                &tx_collection,
136                None,
137            );
138            let total_pending = pending.len() as u64;
139            statuses.push(crate::ec::consolidation::EcStatus {
140                consolidated: 0.0,
141                pending_value: 0.0,
142                pending_transactions: total_pending,
143                has_pending_set: false,
144                field: config.field.clone(),
145                collection: config.collection.clone(),
146                reducer: config.reducer.as_str().to_string(),
147                mode: if config.mode == EcMode::Sync {
148                    "sync"
149                } else {
150                    "async"
151                }
152                .to_string(),
153            });
154        }
155        statuses
156    }
157
158    /// Graceful EC shutdown: consolidate all pending, stop worker, flush.
159    /// Call during serverless reclaim or application shutdown.
160    pub fn ec_shutdown(&self) -> RedDBResult<u64> {
161        self.inner.ec_worker.stop();
162
163        // Consolidate all async fields
164        let configs = self.inner.ec_registry.all_configs();
165        let mut total = 0u64;
166        for config in configs {
167            if let Ok(result) =
168                consolidation::consolidate(self.inner.db.store().as_ref(), &config, None)
169            {
170                total += result.transactions_applied;
171            }
172        }
173        Ok(total)
174    }
175
176    fn ec_config_or_default(&self, collection: &str, field: &str) -> EcFieldConfig {
177        self.inner
178            .ec_registry
179            .get(collection, field)
180            .unwrap_or_else(|| EcFieldConfig::new(collection, field))
181    }
182}