Skip to main content

reddb_server/ec/
transactions.rs

1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use crate::storage::schema::Value;
5use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
6use crate::storage::unified::store::UnifiedStore;
7
/// Operation carried by an EC transaction record.
///
/// Each variant has a lowercase textual name (see [`EcOperation::as_str`])
/// which is the form written into, and parsed back from, stored rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EcOperation {
    /// Textual name: `"add"`.
    Add,
    /// Textual name: `"sub"`.
    Sub,
    /// Textual name: `"set"`.
    Set,
}

impl EcOperation {
    /// Returns the lowercase textual name of this operation.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Add => "add",
            Self::Sub => "sub",
            Self::Set => "set",
        }
    }

    /// Parses an operation name, ignoring letter case.
    ///
    /// Returns `None` when `s` is not one of `"add"`, `"sub"` or `"set"`.
    /// Surrounding whitespace is not trimmed.
    pub fn from_str(s: &str) -> Option<Self> {
        // All names are pure ASCII, so an ASCII case-insensitive comparison
        // accepts the same inputs as lowercasing first, without allocating a
        // temporary `String` on every call.
        [Self::Add, Self::Sub, Self::Set]
            .iter()
            .copied()
            .find(|op| s.eq_ignore_ascii_case(op.as_str()))
    }
}
33
34#[derive(Debug, Clone)]
35pub struct EcTransaction {
36    pub target_id: u64,
37    pub field: String,
38    pub value: f64,
39    pub operation: EcOperation,
40    pub timestamp: u64,
41    pub cohort_hour: String,
42    pub applied: bool,
43    pub source: Option<String>,
44}
45
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // The `u128 -> u64` narrowing keeps the low 64 bits, as a plain cast does.
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
52
/// Formats a Unix timestamp (milliseconds) as its UTC hour bucket,
/// `YYYY-MM-DDTHH`.
///
/// The date part is the real proleptic Gregorian calendar date (leap years and
/// true month lengths included), so two timestamps share a cohort string if
/// and only if they fall within the same UTC hour.
fn cohort_hour_from_ms(ms: u64) -> String {
    const MS_PER_HOUR: u64 = 3_600_000;

    let hours_since_epoch = ms / MS_PER_HOUR;
    let (year, month, day) = civil_from_epoch_days(hours_since_epoch / 24);

    format!(
        "{:04}-{:02}-{:02}T{:02}",
        year,
        month,
        day,
        hours_since_epoch % 24
    )
}

/// Converts a count of days since 1970-01-01 into `(year, month, day)`.
///
/// Uses the era-based "civil from days" algorithm: the calendar repeats every
/// 400 years (146 097 days), and counting years from 1 March puts the leap day
/// at the end of the year so month lengths follow a fixed 153-day/5-month
/// pattern.
fn civil_from_epoch_days(days: u64) -> (u64, u64, u64) {
    // 719 468 = days from 0000-03-01 to 1970-01-01.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let march_based_month = (5 * day_of_year + 2) / 153; // [0, 11], 0 = March
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1; // [1, 31]
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };
    // January and February belong to the next civil year in the March-based count.
    let year = era * 400 + year_of_era + if month <= 2 { 1 } else { 0 };

    (year, month, day)
}
66
67pub fn create_transaction(
68    store: &UnifiedStore,
69    tx_collection: &str,
70    target_id: u64,
71    field: &str,
72    value: f64,
73    operation: EcOperation,
74    source: Option<&str>,
75) -> Result<EntityId, String> {
76    let _ = store.get_or_create_collection(tx_collection);
77
78    let timestamp = now_ms();
79    let cohort = cohort_hour_from_ms(timestamp);
80
81    let mut named = std::collections::HashMap::new();
82    named.insert("target_id".to_string(), Value::UnsignedInteger(target_id));
83    named.insert("field".to_string(), Value::text(field.to_string()));
84    named.insert("value".to_string(), Value::Float(value));
85    named.insert(
86        "operation".to_string(),
87        Value::text(operation.as_str().to_string()),
88    );
89    named.insert("timestamp".to_string(), Value::UnsignedInteger(timestamp));
90    named.insert("cohort_hour".to_string(), Value::text(cohort));
91    named.insert("applied".to_string(), Value::Boolean(false));
92    if let Some(src) = source {
93        named.insert("source".to_string(), Value::text(src.to_string()));
94    }
95
96    let entity = UnifiedEntity::new(
97        EntityId::new(0),
98        EntityKind::TableRow {
99            table: Arc::from(tx_collection),
100            row_id: 0,
101        },
102        EntityData::Row(RowData {
103            columns: Vec::new(),
104            named: Some(named),
105            schema: None,
106        }),
107    );
108
109    store
110        .insert_auto(tx_collection, entity)
111        .map_err(|e| format!("ec transaction insert failed: {:?}", e))
112}
113
114pub fn query_pending_transactions(
115    store: &UnifiedStore,
116    tx_collection: &str,
117    target_id: Option<u64>,
118) -> Vec<(EntityId, EcTransaction)> {
119    let manager = match store.get_collection(tx_collection) {
120        Some(m) => m,
121        None => return Vec::new(),
122    };
123
124    let mut results = Vec::new();
125
126    manager.for_each_entity(|entity| {
127        let row = match entity.data.as_row() {
128            Some(r) => r,
129            None => return true,
130        };
131
132        let applied = row
133            .get_field("applied")
134            .and_then(|v| match v {
135                Value::Boolean(b) => Some(*b),
136                _ => None,
137            })
138            .unwrap_or(false);
139
140        if applied {
141            return true;
142        }
143
144        let tid = row
145            .get_field("target_id")
146            .and_then(|v| match v {
147                Value::UnsignedInteger(n) => Some(*n),
148                Value::Integer(n) => Some(*n as u64),
149                _ => None,
150            })
151            .unwrap_or(0);
152
153        if let Some(filter_id) = target_id {
154            if tid != filter_id {
155                return true;
156            }
157        }
158
159        let field = row
160            .get_field("field")
161            .and_then(|v| match v {
162                Value::Text(s) => Some(s.to_string()),
163                _ => None,
164            })
165            .unwrap_or_default();
166
167        let value = row
168            .get_field("value")
169            .and_then(|v| match v {
170                Value::Float(f) => Some(*f),
171                Value::Integer(n) => Some(*n as f64),
172                Value::UnsignedInteger(n) => Some(*n as f64),
173                _ => None,
174            })
175            .unwrap_or(0.0);
176
177        let operation = row
178            .get_field("operation")
179            .and_then(|v| match v {
180                Value::Text(s) => EcOperation::from_str(s.as_ref()),
181                _ => None,
182            })
183            .unwrap_or(EcOperation::Add);
184
185        let timestamp = row
186            .get_field("timestamp")
187            .and_then(|v| match v {
188                Value::UnsignedInteger(n) => Some(*n),
189                Value::Integer(n) => Some(*n as u64),
190                _ => None,
191            })
192            .unwrap_or(0);
193
194        let cohort_hour = row
195            .get_field("cohort_hour")
196            .and_then(|v| match v {
197                Value::Text(s) => Some(s.to_string()),
198                _ => None,
199            })
200            .unwrap_or_default();
201
202        let source = row.get_field("source").and_then(|v| match v {
203            Value::Text(s) => Some(s.to_string()),
204            _ => None,
205        });
206
207        results.push((
208            entity.id,
209            EcTransaction {
210                target_id: tid,
211                field,
212                value,
213                operation,
214                timestamp,
215                cohort_hour,
216                applied: false,
217                source,
218            },
219        ));
220
221        true
222    });
223
224    results.sort_by_key(|(_, tx)| tx.timestamp);
225    results
226}
227
228pub fn mark_transactions_applied(
229    store: &UnifiedStore,
230    tx_collection: &str,
231    entity_ids: &[EntityId],
232) {
233    let manager = match store.get_collection(tx_collection) {
234        Some(m) => m,
235        None => return,
236    };
237
238    for &eid in entity_ids {
239        if let Some(mut entity) = manager.get(eid) {
240            if let EntityData::Row(ref mut row) = entity.data {
241                if let Some(ref mut named) = row.named {
242                    named.insert("applied".to_string(), Value::Boolean(true));
243                }
244            }
245            let _ = manager.update(entity);
246        }
247    }
248}