1use std::collections::HashMap;
2
3use super::config::{EcFieldConfig, EcReducer};
4use super::transactions::{
5 mark_transactions_applied, query_pending_transactions, EcOperation, EcTransaction,
6};
7use crate::storage::schema::Value;
8use crate::storage::unified::entity::{EntityData, EntityId};
9use crate::storage::unified::store::UnifiedStore;
10
/// Counters describing the outcome of one `consolidate` run.
#[derive(Clone, Debug, Default)]
pub struct ConsolidationResult {
    /// Number of target records whose consolidation succeeded.
    pub records_consolidated: u64,
    /// Sum of the per-record transaction counts reported by successful consolidations.
    pub transactions_applied: u64,
    /// Number of target records whose consolidation returned an error.
    pub errors: u64,
}
17
18pub fn consolidate(
19 store: &UnifiedStore,
20 config: &EcFieldConfig,
21 target_id: Option<u64>,
22) -> Result<ConsolidationResult, String> {
23 let tx_collection = config.tx_collection_name();
24 let pending = query_pending_transactions(store, &tx_collection, target_id);
25
26 if pending.is_empty() {
27 return Ok(ConsolidationResult::default());
28 }
29
30 let mut groups: HashMap<u64, Vec<(EntityId, EcTransaction)>> = HashMap::new();
32 for (eid, tx) in pending {
33 groups.entry(tx.target_id).or_default().push((eid, tx));
34 }
35
36 let mut result = ConsolidationResult::default();
37
38 for (tid, transactions) in groups {
39 let applied_ids: Vec<EntityId> = transactions.iter().map(|(eid, _)| *eid).collect();
43 mark_transactions_applied(store, &tx_collection, &applied_ids);
44
45 match consolidate_record(store, config, tid, &transactions) {
46 Ok(applied_count) => {
47 result.records_consolidated += 1;
48 result.transactions_applied += applied_count;
49 }
50 Err(_) => {
51 result.errors += 1;
52 }
53 }
54 }
55
56 Ok(result)
57}
58
59fn consolidate_record(
60 store: &UnifiedStore,
61 config: &EcFieldConfig,
62 target_id: u64,
63 transactions: &[(EntityId, EcTransaction)],
64) -> Result<u64, String> {
65 if transactions.is_empty() {
66 return Ok(0);
67 }
68
69 let last_set_idx = transactions
71 .iter()
72 .rposition(|(_, tx)| tx.operation == EcOperation::Set);
73
74 let current_value = read_field_value(store, &config.collection, target_id, &config.field);
76 let base_value = if let Some(idx) = last_set_idx {
77 transactions[idx].1.value
78 } else {
79 current_value.unwrap_or(config.initial_value)
80 };
81
82 let start_idx = last_set_idx.map(|i| i + 1).unwrap_or(0);
84 let mut new_value = base_value;
85 let mut count = 0u64;
86
87 for (_, tx) in &transactions[start_idx..] {
88 match tx.operation {
89 EcOperation::Add => {
90 new_value = config.reducer.apply(new_value, tx.value, count);
91 count += 1;
92 }
93 EcOperation::Sub => {
94 let negated = match config.reducer {
95 EcReducer::Sum => new_value - tx.value,
96 EcReducer::Min => new_value.min(tx.value),
97 EcReducer::Max => new_value.max(tx.value),
98 _ => config.reducer.apply(new_value, -tx.value, count),
99 };
100 new_value = negated;
101 count += 1;
102 }
103 EcOperation::Set => {
104 new_value = tx.value;
105 count = 0;
106 }
107 }
108 }
109
110 write_field_value(
112 store,
113 &config.collection,
114 target_id,
115 &config.field,
116 new_value,
117 )?;
118
119 Ok(transactions.len() as u64)
120}
121
122fn read_field_value(
123 store: &UnifiedStore,
124 collection: &str,
125 entity_id: u64,
126 field: &str,
127) -> Option<f64> {
128 let manager = store.get_collection(collection)?;
129 let entity = manager.get(EntityId::new(entity_id))?;
130 let row = entity.data.as_row()?;
131 let value = row.get_field(field)?;
132 match value {
133 Value::Float(f) => Some(*f),
134 Value::Integer(n) => Some(*n as f64),
135 Value::UnsignedInteger(n) => Some(*n as f64),
136 _ => None,
137 }
138}
139
140fn write_field_value(
141 store: &UnifiedStore,
142 collection: &str,
143 entity_id: u64,
144 field: &str,
145 value: f64,
146) -> Result<(), String> {
147 let manager = store
148 .get_collection(collection)
149 .ok_or_else(|| format!("collection '{}' not found", collection))?;
150
151 let mut entity = manager
152 .get(EntityId::new(entity_id))
153 .ok_or_else(|| format!("entity {} not found in '{}'", entity_id, collection))?;
154
155 if let EntityData::Row(ref mut row) = entity.data {
156 if let Some(ref mut named) = row.named {
157 named.insert(field.to_string(), Value::Float(value));
158 }
159 }
160
161 manager
162 .update(entity)
163 .map_err(|e| format!("update failed: {:?}", e))?;
164
165 Ok(())
166}
167
168pub fn get_ec_status(store: &UnifiedStore, config: &EcFieldConfig, target_id: u64) -> EcStatus {
169 let consolidated = read_field_value(store, &config.collection, target_id, &config.field)
170 .unwrap_or(config.initial_value);
171
172 let tx_collection = config.tx_collection_name();
173 let pending = query_pending_transactions(store, &tx_collection, Some(target_id));
174
175 let pending_value: f64 = pending
176 .iter()
177 .map(|(_, tx)| match tx.operation {
178 EcOperation::Add => tx.value,
179 EcOperation::Sub => -tx.value,
180 EcOperation::Set => 0.0,
181 })
182 .sum();
183
184 let has_set = pending
185 .iter()
186 .any(|(_, tx)| tx.operation == EcOperation::Set);
187
188 EcStatus {
189 consolidated,
190 pending_value,
191 pending_transactions: pending.len() as u64,
192 has_pending_set: has_set,
193 field: config.field.clone(),
194 collection: config.collection.clone(),
195 reducer: config.reducer.as_str().to_string(),
196 mode: if config.mode == super::config::EcMode::Sync {
197 "sync"
198 } else {
199 "async"
200 }
201 .to_string(),
202 }
203}
204
/// Snapshot of one record's eventually-consistent field, as produced by
/// `get_ec_status`.
#[derive(Clone, Debug)]
pub struct EcStatus {
    /// Value currently stored in the record (or the configured initial value).
    pub consolidated: f64,
    /// Signed sum of the pending Add/Sub transactions.
    pub pending_value: f64,
    /// Number of pending transactions for the record.
    pub pending_transactions: u64,
    /// Whether at least one pending transaction is a `Set`.
    pub has_pending_set: bool,
    /// Name of the field being tracked.
    pub field: String,
    /// Name of the collection holding the record.
    pub collection: String,
    /// Textual name of the configured reducer.
    pub reducer: String,
    /// Either `"sync"` or `"async"`.
    pub mode: String,
}