Skip to main content

reddb_server/storage/unified/devx/reddb/
impl_ec.rs

1use super::*;
2
3use crate::ec::config::{EcFieldConfig, EcMode};
4use crate::ec::consolidation::{self, ConsolidationResult, EcStatus};
5use crate::ec::transactions::{self, EcOperation};
6
7impl RedDB {
8    /// Add a value to a field via eventual consistency.
9    /// In Sync mode, consolidates immediately. In Async mode, queues for background consolidation.
10    pub fn ec_add(
11        &self,
12        collection: &str,
13        field: &str,
14        target_id: EntityId,
15        value: f64,
16    ) -> Result<EntityId, Box<dyn std::error::Error>> {
17        self.ec_mutate(collection, field, target_id, value, EcOperation::Add)
18    }
19
20    /// Subtract a value from a field via eventual consistency.
21    pub fn ec_sub(
22        &self,
23        collection: &str,
24        field: &str,
25        target_id: EntityId,
26        value: f64,
27    ) -> Result<EntityId, Box<dyn std::error::Error>> {
28        self.ec_mutate(collection, field, target_id, value, EcOperation::Sub)
29    }
30
31    /// Set a field to a specific value via eventual consistency (overrides previous adds/subs).
32    pub fn ec_set(
33        &self,
34        collection: &str,
35        field: &str,
36        target_id: EntityId,
37        value: f64,
38    ) -> Result<EntityId, Box<dyn std::error::Error>> {
39        self.ec_mutate(collection, field, target_id, value, EcOperation::Set)
40    }
41
42    fn ec_mutate(
43        &self,
44        collection: &str,
45        field: &str,
46        target_id: EntityId,
47        value: f64,
48        operation: EcOperation,
49    ) -> Result<EntityId, Box<dyn std::error::Error>> {
50        let config = self.ec_config_or_default(collection, field);
51        let tx_collection = config.tx_collection_name();
52
53        let id = transactions::create_transaction(
54            &self.store,
55            &tx_collection,
56            target_id.raw(),
57            field,
58            value,
59            operation,
60            None,
61        )?;
62
63        if config.mode == EcMode::Sync {
64            consolidation::consolidate(&self.store, &config, Some(target_id.raw()))
65                .map_err(|e| -> Box<dyn std::error::Error> { e.into() })?;
66        }
67
68        Ok(id)
69    }
70
71    /// Consolidate all pending transactions for a field (or a specific entity).
72    pub fn ec_consolidate(
73        &self,
74        collection: &str,
75        field: &str,
76        target_id: Option<u64>,
77    ) -> Result<ConsolidationResult, Box<dyn std::error::Error>> {
78        let config = self.ec_config_or_default(collection, field);
79        consolidation::consolidate(&self.store, &config, target_id)
80            .map_err(|e| -> Box<dyn std::error::Error> { e.into() })
81    }
82
83    /// Consolidate ALL registered EC fields. Useful before flush().
84    pub fn ec_consolidate_all(&self) -> Result<u64, Box<dyn std::error::Error>> {
85        let configs = self.ec_registry.all_configs();
86        let mut total = 0u64;
87        for config in configs {
88            if let Ok(result) = consolidation::consolidate(&self.store, &config, None) {
89                total += result.transactions_applied;
90            }
91        }
92        Ok(total)
93    }
94
95    /// Get the consolidation status for a specific entity's field.
96    pub fn ec_status(&self, collection: &str, field: &str, target_id: u64) -> EcStatus {
97        let config = self.ec_config_or_default(collection, field);
98        consolidation::get_ec_status(&self.store, &config, target_id)
99    }
100
101    /// Register a field for eventual consistency.
102    pub fn ec_register(&self, config: EcFieldConfig) {
103        self.ec_registry.register(config);
104    }
105
106    fn ec_config_or_default(&self, collection: &str, field: &str) -> EcFieldConfig {
107        self.ec_registry
108            .get(collection, field)
109            .unwrap_or_else(|| EcFieldConfig::new(collection, field))
110    }
111}