Skip to main content

reddb_server/ec/
consolidation.rs

1use std::collections::HashMap;
2
3use super::config::{EcFieldConfig, EcReducer};
4use super::transactions::{
5    mark_transactions_applied, query_pending_transactions, EcOperation, EcTransaction,
6};
7use crate::storage::schema::Value;
8use crate::storage::unified::entity::{EntityData, EntityId};
9use crate::storage::unified::store::UnifiedStore;
10
/// Counters describing the outcome of one [`consolidate`] run.
#[derive(Clone, Debug, Default)]
pub struct ConsolidationResult {
    /// Number of target records whose consolidation succeeded.
    pub records_consolidated: u64,
    /// Sum of the per-record transaction counts reported by successful consolidations.
    pub transactions_applied: u64,
    /// Number of target records whose consolidation returned an error.
    pub errors: u64,
}
17
18pub fn consolidate(
19    store: &UnifiedStore,
20    config: &EcFieldConfig,
21    target_id: Option<u64>,
22) -> Result<ConsolidationResult, String> {
23    let tx_collection = config.tx_collection_name();
24    let pending = query_pending_transactions(store, &tx_collection, target_id);
25
26    if pending.is_empty() {
27        return Ok(ConsolidationResult::default());
28    }
29
30    // Group by target_id
31    let mut groups: HashMap<u64, Vec<(EntityId, EcTransaction)>> = HashMap::new();
32    for (eid, tx) in pending {
33        groups.entry(tx.target_id).or_default().push((eid, tx));
34    }
35
36    let mut result = ConsolidationResult::default();
37
38    for (tid, transactions) in groups {
39        // Mark applied FIRST — if crash occurs after this but before write,
40        // transactions won't be re-processed (no double-counting).
41        // The value update is idempotent and will be recomputed on next consolidation.
42        let applied_ids: Vec<EntityId> = transactions.iter().map(|(eid, _)| *eid).collect();
43        mark_transactions_applied(store, &tx_collection, &applied_ids);
44
45        match consolidate_record(store, config, tid, &transactions) {
46            Ok(applied_count) => {
47                result.records_consolidated += 1;
48                result.transactions_applied += applied_count;
49            }
50            Err(_) => {
51                result.errors += 1;
52            }
53        }
54    }
55
56    Ok(result)
57}
58
59fn consolidate_record(
60    store: &UnifiedStore,
61    config: &EcFieldConfig,
62    target_id: u64,
63    transactions: &[(EntityId, EcTransaction)],
64) -> Result<u64, String> {
65    if transactions.is_empty() {
66        return Ok(0);
67    }
68
69    // Find the last SET operation (if any)
70    let last_set_idx = transactions
71        .iter()
72        .rposition(|(_, tx)| tx.operation == EcOperation::Set);
73
74    // Determine base value
75    let current_value = read_field_value(store, &config.collection, target_id, &config.field);
76    let base_value = if let Some(idx) = last_set_idx {
77        transactions[idx].1.value
78    } else {
79        current_value.unwrap_or(config.initial_value)
80    };
81
82    // Apply subsequent operations
83    let start_idx = last_set_idx.map(|i| i + 1).unwrap_or(0);
84    let mut new_value = base_value;
85    let mut count = 0u64;
86
87    for (_, tx) in &transactions[start_idx..] {
88        match tx.operation {
89            EcOperation::Add => {
90                new_value = config.reducer.apply(new_value, tx.value, count);
91                count += 1;
92            }
93            EcOperation::Sub => {
94                let negated = match config.reducer {
95                    EcReducer::Sum => new_value - tx.value,
96                    EcReducer::Min => new_value.min(tx.value),
97                    EcReducer::Max => new_value.max(tx.value),
98                    _ => config.reducer.apply(new_value, -tx.value, count),
99                };
100                new_value = negated;
101                count += 1;
102            }
103            EcOperation::Set => {
104                new_value = tx.value;
105                count = 0;
106            }
107        }
108    }
109
110    // Write the consolidated value back to the target entity
111    write_field_value(
112        store,
113        &config.collection,
114        target_id,
115        &config.field,
116        new_value,
117    )?;
118
119    Ok(transactions.len() as u64)
120}
121
122fn read_field_value(
123    store: &UnifiedStore,
124    collection: &str,
125    entity_id: u64,
126    field: &str,
127) -> Option<f64> {
128    let manager = store.get_collection(collection)?;
129    let entity = manager.get(EntityId::new(entity_id))?;
130    let row = entity.data.as_row()?;
131    let value = row.get_field(field)?;
132    match value {
133        Value::Float(f) => Some(*f),
134        Value::Integer(n) => Some(*n as f64),
135        Value::UnsignedInteger(n) => Some(*n as f64),
136        _ => None,
137    }
138}
139
140fn write_field_value(
141    store: &UnifiedStore,
142    collection: &str,
143    entity_id: u64,
144    field: &str,
145    value: f64,
146) -> Result<(), String> {
147    let manager = store
148        .get_collection(collection)
149        .ok_or_else(|| format!("collection '{}' not found", collection))?;
150
151    let mut entity = manager
152        .get(EntityId::new(entity_id))
153        .ok_or_else(|| format!("entity {} not found in '{}'", entity_id, collection))?;
154
155    if let EntityData::Row(ref mut row) = entity.data {
156        if let Some(ref mut named) = row.named {
157            named.insert(field.to_string(), Value::Float(value));
158        }
159    }
160
161    manager
162        .update(entity)
163        .map_err(|e| format!("update failed: {:?}", e))?;
164
165    Ok(())
166}
167
168pub fn get_ec_status(store: &UnifiedStore, config: &EcFieldConfig, target_id: u64) -> EcStatus {
169    let consolidated = read_field_value(store, &config.collection, target_id, &config.field)
170        .unwrap_or(config.initial_value);
171
172    let tx_collection = config.tx_collection_name();
173    let pending = query_pending_transactions(store, &tx_collection, Some(target_id));
174
175    let pending_value: f64 = pending
176        .iter()
177        .map(|(_, tx)| match tx.operation {
178            EcOperation::Add => tx.value,
179            EcOperation::Sub => -tx.value,
180            EcOperation::Set => 0.0,
181        })
182        .sum();
183
184    let has_set = pending
185        .iter()
186        .any(|(_, tx)| tx.operation == EcOperation::Set);
187
188    EcStatus {
189        consolidated,
190        pending_value,
191        pending_transactions: pending.len() as u64,
192        has_pending_set: has_set,
193        field: config.field.clone(),
194        collection: config.collection.clone(),
195        reducer: config.reducer.as_str().to_string(),
196        mode: if config.mode == super::config::EcMode::Sync {
197            "sync"
198        } else {
199            "async"
200        }
201        .to_string(),
202    }
203}
204
/// Status snapshot of an eventually-consistent field for a single target record,
/// as produced by `get_ec_status`.
#[derive(Clone, Debug)]
pub struct EcStatus {
    /// Value currently stored in the field (or the configured initial value).
    pub consolidated: f64,
    /// Signed sum of pending `Add`/`Sub` transaction values.
    pub pending_value: f64,
    /// Number of pending transactions for the target.
    pub pending_transactions: u64,
    /// Whether at least one pending transaction is a `Set`.
    pub has_pending_set: bool,
    /// Name of the field.
    pub field: String,
    /// Name of the collection holding the target record.
    pub collection: String,
    /// Textual name of the configured reducer.
    pub reducer: String,
    /// `"sync"` or `"async"`.
    pub mode: String,
}