reddb_server/storage/unified/devx/reddb/
impl_ec.rs1use super::*;
2
3use crate::ec::config::{EcFieldConfig, EcMode};
4use crate::ec::consolidation::{self, ConsolidationResult, EcStatus};
5use crate::ec::transactions::{self, EcOperation};
6
7impl RedDB {
8 pub fn ec_add(
11 &self,
12 collection: &str,
13 field: &str,
14 target_id: EntityId,
15 value: f64,
16 ) -> Result<EntityId, Box<dyn std::error::Error>> {
17 self.ec_mutate(collection, field, target_id, value, EcOperation::Add)
18 }
19
20 pub fn ec_sub(
22 &self,
23 collection: &str,
24 field: &str,
25 target_id: EntityId,
26 value: f64,
27 ) -> Result<EntityId, Box<dyn std::error::Error>> {
28 self.ec_mutate(collection, field, target_id, value, EcOperation::Sub)
29 }
30
31 pub fn ec_set(
33 &self,
34 collection: &str,
35 field: &str,
36 target_id: EntityId,
37 value: f64,
38 ) -> Result<EntityId, Box<dyn std::error::Error>> {
39 self.ec_mutate(collection, field, target_id, value, EcOperation::Set)
40 }
41
42 fn ec_mutate(
43 &self,
44 collection: &str,
45 field: &str,
46 target_id: EntityId,
47 value: f64,
48 operation: EcOperation,
49 ) -> Result<EntityId, Box<dyn std::error::Error>> {
50 let config = self.ec_config_or_default(collection, field);
51 let tx_collection = config.tx_collection_name();
52
53 let id = transactions::create_transaction(
54 &self.store,
55 &tx_collection,
56 target_id.raw(),
57 field,
58 value,
59 operation,
60 None,
61 )?;
62
63 if config.mode == EcMode::Sync {
64 consolidation::consolidate(&self.store, &config, Some(target_id.raw()))
65 .map_err(|e| -> Box<dyn std::error::Error> { e.into() })?;
66 }
67
68 Ok(id)
69 }
70
71 pub fn ec_consolidate(
73 &self,
74 collection: &str,
75 field: &str,
76 target_id: Option<u64>,
77 ) -> Result<ConsolidationResult, Box<dyn std::error::Error>> {
78 let config = self.ec_config_or_default(collection, field);
79 consolidation::consolidate(&self.store, &config, target_id)
80 .map_err(|e| -> Box<dyn std::error::Error> { e.into() })
81 }
82
83 pub fn ec_consolidate_all(&self) -> Result<u64, Box<dyn std::error::Error>> {
85 let configs = self.ec_registry.all_configs();
86 let mut total = 0u64;
87 for config in configs {
88 if let Ok(result) = consolidation::consolidate(&self.store, &config, None) {
89 total += result.transactions_applied;
90 }
91 }
92 Ok(total)
93 }
94
95 pub fn ec_status(&self, collection: &str, field: &str, target_id: u64) -> EcStatus {
97 let config = self.ec_config_or_default(collection, field);
98 consolidation::get_ec_status(&self.store, &config, target_id)
99 }
100
101 pub fn ec_register(&self, config: EcFieldConfig) {
103 self.ec_registry.register(config);
104 }
105
106 fn ec_config_or_default(&self, collection: &str, field: &str) -> EcFieldConfig {
107 self.ec_registry
108 .get(collection, field)
109 .unwrap_or_else(|| EcFieldConfig::new(collection, field))
110 }
111}