Skip to main content

cynos_database/
transaction.rs

1//! Transaction API for atomic database operations.
2//!
3//! This module provides transaction support with commit and rollback capabilities.
4
5use crate::convert::{js_array_to_rows, js_to_value};
6use crate::expr::Expr;
7use crate::query_builder::evaluate_predicate;
8use crate::reactive_bridge::QueryRegistry;
9use alloc::rc::Rc;
10use alloc::string::{String, ToString};
11use alloc::vec::Vec;
12use cynos_core::{reserve_row_ids, Row};
13use cynos_reactive::TableId;
14use cynos_storage::{TableCache, Transaction, TransactionState};
15use core::cell::RefCell;
16use hashbrown::HashSet;
17use wasm_bindgen::prelude::*;
18
/// JavaScript-friendly transaction wrapper.
///
/// Wraps a storage-level [`Transaction`] together with the shared handles it
/// needs to apply changes to the table cache and to notify the query registry
/// once the transaction is committed or rolled back.
#[wasm_bindgen]
pub struct JsTransaction {
    /// Shared table cache that `insert`/`update`/`delete`/`rollback` operate on.
    cache: Rc<RefCell<TableCache>>,
    /// Registry notified via `on_table_change` when the transaction completes.
    query_registry: Rc<RefCell<QueryRegistry>>,
    /// Lookup from table name to the [`TableId`] used in notifications.
    table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
    /// The underlying transaction; `None` once `commit` or `rollback` has taken it.
    inner: Option<Transaction>,
    /// Pending changes: (table_id, changed_row_ids)
    pending_changes: Vec<(TableId, HashSet<u64>)>,
}
29
30impl JsTransaction {
31    pub(crate) fn new(
32        cache: Rc<RefCell<TableCache>>,
33        query_registry: Rc<RefCell<QueryRegistry>>,
34        table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
35    ) -> Self {
36        Self {
37            cache,
38            query_registry,
39            table_id_map,
40            inner: Some(Transaction::begin()),
41            pending_changes: Vec::new(),
42        }
43    }
44}
45
46#[wasm_bindgen]
47impl JsTransaction {
48    /// Inserts rows into a table within the transaction.
49    pub fn insert(&mut self, table: &str, values: &JsValue) -> Result<(), JsValue> {
50        let tx = self
51            .inner
52            .as_mut()
53            .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
54
55        let mut cache = self.cache.borrow_mut();
56        let store = cache
57            .get_table_mut(table)
58            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table)))?;
59
60        let schema = store.schema().clone();
61
62        // Get the count of rows to insert first
63        let arr = js_sys::Array::from(values);
64        let row_count = arr.length() as u64;
65
66        // Reserve row IDs for all rows at once to avoid ID conflicts
67        let start_row_id = reserve_row_ids(row_count);
68
69        let rows = js_array_to_rows(values, &schema, start_row_id)?;
70
71        // Collect inserted row IDs
72        let mut inserted_ids = HashSet::new();
73
74        // Insert through transaction
75        for row in rows {
76            inserted_ids.insert(row.id());
77            tx.insert(&mut *cache, table, row)
78                .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
79        }
80
81        // Store pending changes
82        if let Some(table_id) = self.table_id_map.borrow().get(table).copied() {
83            self.pending_changes.push((table_id, inserted_ids));
84        }
85
86        Ok(())
87    }
88
89    /// Updates rows in a table within the transaction.
90    pub fn update(
91        &mut self,
92        table: &str,
93        set_values: &JsValue,
94        predicate: Option<Expr>,
95    ) -> Result<usize, JsValue> {
96        let tx = self
97            .inner
98            .as_mut()
99            .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
100
101        let mut cache = self.cache.borrow_mut();
102        let store = cache
103            .get_table_mut(table)
104            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table)))?;
105
106        let schema = store.schema().clone();
107
108        // Parse set values
109        let set_obj = set_values
110            .dyn_ref::<js_sys::Object>()
111            .ok_or_else(|| JsValue::from_str("set_values must be an object"))?;
112
113        let keys = js_sys::Object::keys(set_obj);
114        let mut updates: Vec<(String, JsValue)> = Vec::new();
115        for key in keys.iter() {
116            if let Some(k) = key.as_string() {
117                let val = js_sys::Reflect::get(set_obj, &key).unwrap_or(JsValue::NULL);
118                updates.push((k, val));
119            }
120        }
121
122        // Find rows to update
123        let rows_to_update: Vec<Row> = store
124            .scan()
125            .filter(|row| {
126                if let Some(ref pred) = predicate {
127                    evaluate_predicate(pred, &**row, &schema)
128                } else {
129                    true
130                }
131            })
132            .map(|rc| (*rc).clone())
133            .collect();
134
135        let mut updated_ids = HashSet::new();
136        let mut update_count = 0;
137
138        for old_row in rows_to_update {
139            let mut new_values = old_row.values().to_vec();
140
141            for (col_name, js_val) in &updates {
142                if let Some(col) = schema.get_column(col_name) {
143                    let idx = col.index();
144                    let value = js_to_value(js_val, col.data_type())?;
145                    if idx < new_values.len() {
146                        new_values[idx] = value;
147                    }
148                }
149            }
150
151            // Create new row with incremented version
152            let new_version = old_row.version().wrapping_add(1);
153            let new_row = Row::new_with_version(old_row.id(), new_version, new_values);
154
155            updated_ids.insert(old_row.id());
156
157            tx.update(&mut *cache, table, old_row.id(), new_row)
158                .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
159
160            update_count += 1;
161        }
162
163        if let Some(table_id) = self.table_id_map.borrow().get(table).copied() {
164            self.pending_changes.push((table_id, updated_ids));
165        }
166
167        Ok(update_count)
168    }
169
170    /// Deletes rows from a table within the transaction.
171    pub fn delete(&mut self, table: &str, predicate: Option<Expr>) -> Result<usize, JsValue> {
172        let tx = self
173            .inner
174            .as_mut()
175            .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
176
177        let mut cache = self.cache.borrow_mut();
178        let store = cache
179            .get_table_mut(table)
180            .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table)))?;
181
182        let schema = store.schema().clone();
183
184        // Find rows to delete
185        let rows_to_delete: Vec<Row> = store
186            .scan()
187            .filter(|row| {
188                if let Some(ref pred) = predicate {
189                    evaluate_predicate(pred, &**row, &schema)
190                } else {
191                    true
192                }
193            })
194            .map(|rc| (*rc).clone())
195            .collect();
196
197        let delete_count = rows_to_delete.len();
198
199        let mut deleted_ids = HashSet::new();
200        for row in rows_to_delete {
201            deleted_ids.insert(row.id());
202            tx.delete(&mut *cache, table, row.id())
203                .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
204        }
205
206        if let Some(table_id) = self.table_id_map.borrow().get(table).copied() {
207            self.pending_changes.push((table_id, deleted_ids));
208        }
209
210        Ok(delete_count)
211    }
212
213    /// Commits the transaction.
214    pub fn commit(&mut self) -> Result<(), JsValue> {
215        let tx = self
216            .inner
217            .take()
218            .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
219
220        tx.commit()
221            .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
222
223        // Notify query registry of all changes
224        for (table_id, changed_ids) in self.pending_changes.drain(..) {
225            self.query_registry
226                .borrow_mut()
227                .on_table_change(table_id, &changed_ids);
228        }
229
230        Ok(())
231    }
232
233    /// Rolls back the transaction.
234    pub fn rollback(&mut self) -> Result<(), JsValue> {
235        let tx = self
236            .inner
237            .take()
238            .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
239
240        let mut cache = self.cache.borrow_mut();
241        tx.rollback(&mut *cache)
242            .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
243
244        // Notify Live Query of rollback changes (data was restored)
245        for (table_id, changed_ids) in self.pending_changes.drain(..) {
246            self.query_registry
247                .borrow_mut()
248                .on_table_change(table_id, &changed_ids);
249        }
250
251        Ok(())
252    }
253
    /// Returns whether the transaction is still active.
    ///
    /// This is `true` until `commit` or `rollback` is called; both take the
    /// underlying transaction out of the wrapper, after which this is `false`.
    #[wasm_bindgen(getter)]
    pub fn active(&self) -> bool {
        self.inner.is_some()
    }
259
260    /// Returns the transaction state.
261    #[wasm_bindgen(getter)]
262    pub fn state(&self) -> String {
263        match &self.inner {
264            Some(tx) => match tx.state() {
265                TransactionState::Active => "active".to_string(),
266                TransactionState::Committed => "committed".to_string(),
267                TransactionState::RolledBack => "rolledback".to_string(),
268            },
269            None => "completed".to_string(),
270        }
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::table::ColumnOptions;
    use crate::JsDataType;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Builds a database with a single `users` table (`id`, `name`, `age`).
    fn setup_db() -> Database {
        let db = Database::new("test");
        let builder = db
            .create_table("users")
            .column(
                "id",
                JsDataType::Int64,
                Some(ColumnOptions::new().set_primary_key(true)),
            )
            .column("name", JsDataType::String, None)
            .column("age", JsDataType::Int32, None);
        db.register_table(&builder).unwrap();
        db
    }

    /// A committed insert is visible in the database.
    #[wasm_bindgen_test]
    fn test_transaction_insert_commit() {
        let db = setup_db();
        let mut tx = db.transaction();

        let values = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
        tx.insert("users", &values).unwrap();
        tx.commit().unwrap();

        assert_eq!(db.total_row_count(), 1);
    }

    /// A rolled-back insert leaves the database empty.
    #[wasm_bindgen_test]
    fn test_transaction_insert_rollback() {
        let db = setup_db();
        let mut tx = db.transaction();

        let values = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
        tx.insert("users", &values).unwrap();
        tx.rollback().unwrap();

        assert_eq!(db.total_row_count(), 0);
    }

    /// `active` and `state` reflect the transaction lifecycle.
    #[wasm_bindgen_test]
    fn test_transaction_state() {
        let db = setup_db();
        let mut tx = db.transaction();

        assert!(tx.active());
        assert_eq!(tx.state(), "active");

        tx.commit().unwrap();

        assert!(!tx.active());
        // Once finished, the wrapper no longer holds the inner transaction.
        assert_eq!(tx.state(), "completed");
    }

    /// Several operations in one transaction are all applied on commit.
    #[wasm_bindgen_test]
    fn test_transaction_multiple_operations() {
        let db = setup_db();
        let mut tx = db.transaction();

        let values1 = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
        tx.insert("users", &values1).unwrap();

        let values2 = js_sys::JSON::parse(r#"[{"id": 2, "name": "Bob", "age": 30}]"#).unwrap();
        tx.insert("users", &values2).unwrap();

        tx.commit().unwrap();

        assert_eq!(db.total_row_count(), 2);
    }

    /// A finished transaction rejects every further operation.
    #[wasm_bindgen_test]
    fn test_transaction_rejects_use_after_completion() {
        let db = setup_db();
        let mut tx = db.transaction();
        tx.commit().unwrap();

        assert!(tx.commit().is_err());
        assert!(tx.rollback().is_err());

        let values = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
        assert!(tx.insert("users", &values).is_err());
        assert!(tx.update("users", &values, None).is_err());
        assert!(tx.delete("users", None).is_err());

        assert_eq!(db.total_row_count(), 0);
    }
}
351}