1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use crate::storage::schema::Value;
5use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
6use crate::storage::unified::store::UnifiedStore;
7
/// Kind of change an EC transaction records against a target field.
///
/// The textual forms (`"add"`, `"sub"`, `"set"`) are what gets persisted in
/// the `operation` column of a transaction row; see [`EcOperation::as_str`]
/// and [`EcOperation::from_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EcOperation {
    /// Persisted as `"add"`.
    Add,
    /// Persisted as `"sub"`.
    Sub,
    /// Persisted as `"set"`.
    Set,
}

impl EcOperation {
    /// Every variant, in declaration order. Used by `from_str` so that parsing
    /// and printing share `as_str` as the single source of truth.
    const ALL: [Self; 3] = [Self::Add, Self::Sub, Self::Set];

    /// Returns the canonical lowercase name of this operation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Add => "add",
            Self::Sub => "sub",
            Self::Set => "set",
        }
    }

    /// Parses an operation name, ignoring ASCII case.
    ///
    /// Returns `None` for anything that is not exactly `add`, `sub` or `set`
    /// (no trimming is performed).
    ///
    /// The comparison is done in place with `eq_ignore_ascii_case` instead of
    /// allocating a lowercased copy of `s`. The accepted set is unchanged: no
    /// non-ASCII character lowercases to any of the letters used by the three
    /// names, so Unicode-aware lowercasing could never have matched more.
    pub fn from_str(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|op| s.eq_ignore_ascii_case(op.as_str()))
    }
}
33
/// In-memory view of one transaction row.
///
/// Rows are written by `create_transaction` and decoded back into this struct
/// by `query_pending_transactions`.
#[derive(Debug, Clone)]
pub struct EcTransaction {
    /// Value stored under the `target_id` row key (presumably the id of the
    /// entity the change applies to — confirm against the consumer).
    pub target_id: u64,
    /// Value stored under the `field` row key (presumably the name of the
    /// field on the target that the change applies to — confirm).
    pub field: String,
    /// Numeric operand of the operation.
    pub value: f64,
    /// Which operation this transaction records.
    pub operation: EcOperation,
    /// Creation time; `create_transaction` stores `now_ms()` here, i.e.
    /// milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Hour bucket string derived from `timestamp` at creation time via
    /// `cohort_hour_from_ms`, formatted as `YYYY-MM-DDTHH`.
    pub cohort_hour: String,
    /// Whether the transaction has been marked applied. Rows are created with
    /// `false`; `query_pending_transactions` only yields rows where this is
    /// not `true`, so values it returns always carry `false`.
    pub applied: bool,
    /// Optional free-form origin tag; only written to the row when provided.
    pub source: Option<String>,
}
45
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports an instant before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
52
/// Formats a millisecond Unix timestamp as its UTC hour bucket, `YYYY-MM-DDTHH`.
///
/// The calendar date is computed with the exact days-to-civil conversion for
/// the proleptic Gregorian calendar, so leap years and real month lengths are
/// honoured and every distinct hour maps to a distinct string. (The previous
/// approximation used fixed 365-day years and 30-day months with the day
/// clamped to 28, which produced wrong dates and made different days collide
/// in the same cohort.)
fn cohort_hour_from_ms(ms: u64) -> String {
    const MS_PER_HOUR: u64 = 3_600_000;
    const HOURS_PER_DAY: u64 = 24;
    // Days from 0000-03-01 to 1970-01-01; shifts the epoch so that each
    // 400-year era starts on March 1st and the leap day falls at a year's end.
    const DAYS_TO_UNIX_EPOCH: i64 = 719_468;
    const DAYS_PER_ERA: i64 = 146_097;

    let total_hours = ms / MS_PER_HOUR;
    let hour = total_hours % HOURS_PER_DAY;
    // `u64::MAX` milliseconds is about 2.1e11 days, so this cast cannot overflow.
    let epoch_days = (total_hours / HOURS_PER_DAY) as i64;

    // `shifted` is non-negative because `epoch_days` is, so `/` and `%` floor.
    let shifted = epoch_days + DAYS_TO_UNIX_EPOCH;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let shifted_month = (5 * day_of_year + 2) / 153; // [0, 11], 0 = March
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1; // [1, 31]
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    }; // [1, 12]
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + if month <= 2 { 1 } else { 0 };

    format!("{:04}-{:02}-{:02}T{:02}", year, month, day, hour)
}
66
67pub fn create_transaction(
68 store: &UnifiedStore,
69 tx_collection: &str,
70 target_id: u64,
71 field: &str,
72 value: f64,
73 operation: EcOperation,
74 source: Option<&str>,
75) -> Result<EntityId, String> {
76 let _ = store.get_or_create_collection(tx_collection);
77
78 let timestamp = now_ms();
79 let cohort = cohort_hour_from_ms(timestamp);
80
81 let mut named = std::collections::HashMap::new();
82 named.insert("target_id".to_string(), Value::UnsignedInteger(target_id));
83 named.insert("field".to_string(), Value::text(field.to_string()));
84 named.insert("value".to_string(), Value::Float(value));
85 named.insert(
86 "operation".to_string(),
87 Value::text(operation.as_str().to_string()),
88 );
89 named.insert("timestamp".to_string(), Value::UnsignedInteger(timestamp));
90 named.insert("cohort_hour".to_string(), Value::text(cohort));
91 named.insert("applied".to_string(), Value::Boolean(false));
92 if let Some(src) = source {
93 named.insert("source".to_string(), Value::text(src.to_string()));
94 }
95
96 let entity = UnifiedEntity::new(
97 EntityId::new(0),
98 EntityKind::TableRow {
99 table: Arc::from(tx_collection),
100 row_id: 0,
101 },
102 EntityData::Row(RowData {
103 columns: Vec::new(),
104 named: Some(named),
105 schema: None,
106 }),
107 );
108
109 store
110 .insert_auto(tx_collection, entity)
111 .map_err(|e| format!("ec transaction insert failed: {:?}", e))
112}
113
114pub fn query_pending_transactions(
115 store: &UnifiedStore,
116 tx_collection: &str,
117 target_id: Option<u64>,
118) -> Vec<(EntityId, EcTransaction)> {
119 let manager = match store.get_collection(tx_collection) {
120 Some(m) => m,
121 None => return Vec::new(),
122 };
123
124 let mut results = Vec::new();
125
126 manager.for_each_entity(|entity| {
127 let row = match entity.data.as_row() {
128 Some(r) => r,
129 None => return true,
130 };
131
132 let applied = row
133 .get_field("applied")
134 .and_then(|v| match v {
135 Value::Boolean(b) => Some(*b),
136 _ => None,
137 })
138 .unwrap_or(false);
139
140 if applied {
141 return true;
142 }
143
144 let tid = row
145 .get_field("target_id")
146 .and_then(|v| match v {
147 Value::UnsignedInteger(n) => Some(*n),
148 Value::Integer(n) => Some(*n as u64),
149 _ => None,
150 })
151 .unwrap_or(0);
152
153 if let Some(filter_id) = target_id {
154 if tid != filter_id {
155 return true;
156 }
157 }
158
159 let field = row
160 .get_field("field")
161 .and_then(|v| match v {
162 Value::Text(s) => Some(s.to_string()),
163 _ => None,
164 })
165 .unwrap_or_default();
166
167 let value = row
168 .get_field("value")
169 .and_then(|v| match v {
170 Value::Float(f) => Some(*f),
171 Value::Integer(n) => Some(*n as f64),
172 Value::UnsignedInteger(n) => Some(*n as f64),
173 _ => None,
174 })
175 .unwrap_or(0.0);
176
177 let operation = row
178 .get_field("operation")
179 .and_then(|v| match v {
180 Value::Text(s) => EcOperation::from_str(s.as_ref()),
181 _ => None,
182 })
183 .unwrap_or(EcOperation::Add);
184
185 let timestamp = row
186 .get_field("timestamp")
187 .and_then(|v| match v {
188 Value::UnsignedInteger(n) => Some(*n),
189 Value::Integer(n) => Some(*n as u64),
190 _ => None,
191 })
192 .unwrap_or(0);
193
194 let cohort_hour = row
195 .get_field("cohort_hour")
196 .and_then(|v| match v {
197 Value::Text(s) => Some(s.to_string()),
198 _ => None,
199 })
200 .unwrap_or_default();
201
202 let source = row.get_field("source").and_then(|v| match v {
203 Value::Text(s) => Some(s.to_string()),
204 _ => None,
205 });
206
207 results.push((
208 entity.id,
209 EcTransaction {
210 target_id: tid,
211 field,
212 value,
213 operation,
214 timestamp,
215 cohort_hour,
216 applied: false,
217 source,
218 },
219 ));
220
221 true
222 });
223
224 results.sort_by_key(|(_, tx)| tx.timestamp);
225 results
226}
227
228pub fn mark_transactions_applied(
229 store: &UnifiedStore,
230 tx_collection: &str,
231 entity_ids: &[EntityId],
232) {
233 let manager = match store.get_collection(tx_collection) {
234 Some(m) => m,
235 None => return,
236 };
237
238 for &eid in entity_ids {
239 if let Some(mut entity) = manager.get(eid) {
240 if let EntityData::Row(ref mut row) = entity.data {
241 if let Some(ref mut named) = row.named {
242 named.insert("applied".to_string(), Value::Boolean(true));
243 }
244 }
245 let _ = manager.update(entity);
246 }
247 }
248}