1use crate::ec::config::{EcFieldConfig, EcMode};
2use crate::ec::consolidation;
3use crate::ec::transactions::{create_transaction, EcOperation};
4use crate::{RedDBError, RedDBResult};
5
6use super::RedDBRuntime;
7
8impl RedDBRuntime {
9 pub fn ec_add(
10 &self,
11 collection: &str,
12 field: &str,
13 target_id: u64,
14 value: f64,
15 source: Option<&str>,
16 ) -> RedDBResult<u64> {
17 self.ec_mutate(
18 collection,
19 field,
20 target_id,
21 value,
22 EcOperation::Add,
23 source,
24 )
25 }
26
27 pub fn ec_sub(
28 &self,
29 collection: &str,
30 field: &str,
31 target_id: u64,
32 value: f64,
33 source: Option<&str>,
34 ) -> RedDBResult<u64> {
35 self.ec_mutate(
36 collection,
37 field,
38 target_id,
39 value,
40 EcOperation::Sub,
41 source,
42 )
43 }
44
45 pub fn ec_set(
46 &self,
47 collection: &str,
48 field: &str,
49 target_id: u64,
50 value: f64,
51 source: Option<&str>,
52 ) -> RedDBResult<u64> {
53 self.ec_mutate(
54 collection,
55 field,
56 target_id,
57 value,
58 EcOperation::Set,
59 source,
60 )
61 }
62
63 fn ec_mutate(
64 &self,
65 collection: &str,
66 field: &str,
67 target_id: u64,
68 value: f64,
69 operation: EcOperation,
70 source: Option<&str>,
71 ) -> RedDBResult<u64> {
72 let config = self.ec_config_or_default(collection, field);
73 let tx_collection = config.tx_collection_name();
74
75 let id = create_transaction(
76 self.inner.db.store().as_ref(),
77 &tx_collection,
78 target_id,
79 field,
80 value,
81 operation,
82 source,
83 )
84 .map_err(RedDBError::Internal)?;
85
86 if config.mode == EcMode::Sync {
88 consolidation::consolidate(self.inner.db.store().as_ref(), &config, Some(target_id))
89 .map_err(RedDBError::Internal)?;
90 }
91
92 Ok(id.raw())
93 }
94
95 pub fn ec_consolidate(
96 &self,
97 collection: &str,
98 field: &str,
99 target_id: Option<u64>,
100 ) -> RedDBResult<consolidation::ConsolidationResult> {
101 let config = self.ec_config_or_default(collection, field);
102 consolidation::consolidate(self.inner.db.store().as_ref(), &config, target_id)
103 .map_err(RedDBError::Internal)
104 }
105
106 pub fn ec_status(
107 &self,
108 collection: &str,
109 field: &str,
110 target_id: u64,
111 ) -> consolidation::EcStatus {
112 let config = self.ec_config_or_default(collection, field);
113 consolidation::get_ec_status(self.inner.db.store().as_ref(), &config, target_id)
114 }
115
116 pub fn ec_register_field(&self, config: EcFieldConfig) {
117 self.inner.ec_registry.register(config);
118 if !self.inner.ec_worker.is_running() && !self.inner.ec_registry.async_configs().is_empty()
120 {
121 self.inner.ec_worker.start(
122 std::sync::Arc::clone(&self.inner.ec_registry),
123 std::sync::Arc::clone(&self.inner.db.store()),
124 );
125 }
126 }
127
128 pub fn ec_global_status(&self) -> Vec<crate::ec::consolidation::EcStatus> {
129 let configs = self.inner.ec_registry.all_configs();
130 let mut statuses = Vec::new();
131 for config in configs {
132 let tx_collection = config.tx_collection_name();
133 let pending = crate::ec::transactions::query_pending_transactions(
134 self.inner.db.store().as_ref(),
135 &tx_collection,
136 None,
137 );
138 let total_pending = pending.len() as u64;
139 statuses.push(crate::ec::consolidation::EcStatus {
140 consolidated: 0.0,
141 pending_value: 0.0,
142 pending_transactions: total_pending,
143 has_pending_set: false,
144 field: config.field.clone(),
145 collection: config.collection.clone(),
146 reducer: config.reducer.as_str().to_string(),
147 mode: if config.mode == EcMode::Sync {
148 "sync"
149 } else {
150 "async"
151 }
152 .to_string(),
153 });
154 }
155 statuses
156 }
157
158 pub fn ec_shutdown(&self) -> RedDBResult<u64> {
161 self.inner.ec_worker.stop();
162
163 let configs = self.inner.ec_registry.all_configs();
165 let mut total = 0u64;
166 for config in configs {
167 if let Ok(result) =
168 consolidation::consolidate(self.inner.db.store().as_ref(), &config, None)
169 {
170 total += result.transactions_applied;
171 }
172 }
173 Ok(total)
174 }
175
176 fn ec_config_or_default(&self, collection: &str, field: &str) -> EcFieldConfig {
177 self.inner
178 .ec_registry
179 .get(collection, field)
180 .unwrap_or_else(|| EcFieldConfig::new(collection, field))
181 }
182}