1use crate::convert::{js_array_to_rows, js_to_value};
6use crate::expr::Expr;
7use crate::query_builder::evaluate_predicate;
8use crate::reactive_bridge::QueryRegistry;
9use alloc::rc::Rc;
10use alloc::string::{String, ToString};
11use alloc::vec::Vec;
12use cynos_core::{reserve_row_ids, Row};
13use cynos_reactive::TableId;
14use cynos_storage::{TableCache, Transaction, TransactionState};
15use core::cell::RefCell;
16use hashbrown::HashSet;
17use wasm_bindgen::prelude::*;
18
19#[wasm_bindgen]
21pub struct JsTransaction {
22 cache: Rc<RefCell<TableCache>>,
23 query_registry: Rc<RefCell<QueryRegistry>>,
24 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
25 inner: Option<Transaction>,
26 pending_changes: Vec<(TableId, HashSet<u64>)>,
28}
29
30impl JsTransaction {
31 pub(crate) fn new(
32 cache: Rc<RefCell<TableCache>>,
33 query_registry: Rc<RefCell<QueryRegistry>>,
34 table_id_map: Rc<RefCell<hashbrown::HashMap<String, TableId>>>,
35 ) -> Self {
36 Self {
37 cache,
38 query_registry,
39 table_id_map,
40 inner: Some(Transaction::begin()),
41 pending_changes: Vec::new(),
42 }
43 }
44}
45
46#[wasm_bindgen]
47impl JsTransaction {
48 pub fn insert(&mut self, table: &str, values: &JsValue) -> Result<(), JsValue> {
50 let tx = self
51 .inner
52 .as_mut()
53 .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
54
55 let mut cache = self.cache.borrow_mut();
56 let store = cache
57 .get_table_mut(table)
58 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table)))?;
59
60 let schema = store.schema().clone();
61
62 let arr = js_sys::Array::from(values);
64 let row_count = arr.length() as u64;
65
66 let start_row_id = reserve_row_ids(row_count);
68
69 let rows = js_array_to_rows(values, &schema, start_row_id)?;
70
71 let mut inserted_ids = HashSet::new();
73
74 for row in rows {
76 inserted_ids.insert(row.id());
77 tx.insert(&mut *cache, table, row)
78 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
79 }
80
81 if let Some(table_id) = self.table_id_map.borrow().get(table).copied() {
83 self.pending_changes.push((table_id, inserted_ids));
84 }
85
86 Ok(())
87 }
88
89 pub fn update(
91 &mut self,
92 table: &str,
93 set_values: &JsValue,
94 predicate: Option<Expr>,
95 ) -> Result<usize, JsValue> {
96 let tx = self
97 .inner
98 .as_mut()
99 .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
100
101 let mut cache = self.cache.borrow_mut();
102 let store = cache
103 .get_table_mut(table)
104 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table)))?;
105
106 let schema = store.schema().clone();
107
108 let set_obj = set_values
110 .dyn_ref::<js_sys::Object>()
111 .ok_or_else(|| JsValue::from_str("set_values must be an object"))?;
112
113 let keys = js_sys::Object::keys(set_obj);
114 let mut updates: Vec<(String, JsValue)> = Vec::new();
115 for key in keys.iter() {
116 if let Some(k) = key.as_string() {
117 let val = js_sys::Reflect::get(set_obj, &key).unwrap_or(JsValue::NULL);
118 updates.push((k, val));
119 }
120 }
121
122 let rows_to_update: Vec<Row> = store
124 .scan()
125 .filter(|row| {
126 if let Some(ref pred) = predicate {
127 evaluate_predicate(pred, &**row, &schema)
128 } else {
129 true
130 }
131 })
132 .map(|rc| (*rc).clone())
133 .collect();
134
135 let mut updated_ids = HashSet::new();
136 let mut update_count = 0;
137
138 for old_row in rows_to_update {
139 let mut new_values = old_row.values().to_vec();
140
141 for (col_name, js_val) in &updates {
142 if let Some(col) = schema.get_column(col_name) {
143 let idx = col.index();
144 let value = js_to_value(js_val, col.data_type())?;
145 if idx < new_values.len() {
146 new_values[idx] = value;
147 }
148 }
149 }
150
151 let new_version = old_row.version().wrapping_add(1);
153 let new_row = Row::new_with_version(old_row.id(), new_version, new_values);
154
155 updated_ids.insert(old_row.id());
156
157 tx.update(&mut *cache, table, old_row.id(), new_row)
158 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
159
160 update_count += 1;
161 }
162
163 if let Some(table_id) = self.table_id_map.borrow().get(table).copied() {
164 self.pending_changes.push((table_id, updated_ids));
165 }
166
167 Ok(update_count)
168 }
169
170 pub fn delete(&mut self, table: &str, predicate: Option<Expr>) -> Result<usize, JsValue> {
172 let tx = self
173 .inner
174 .as_mut()
175 .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
176
177 let mut cache = self.cache.borrow_mut();
178 let store = cache
179 .get_table_mut(table)
180 .ok_or_else(|| JsValue::from_str(&alloc::format!("Table not found: {}", table)))?;
181
182 let schema = store.schema().clone();
183
184 let rows_to_delete: Vec<Row> = store
186 .scan()
187 .filter(|row| {
188 if let Some(ref pred) = predicate {
189 evaluate_predicate(pred, &**row, &schema)
190 } else {
191 true
192 }
193 })
194 .map(|rc| (*rc).clone())
195 .collect();
196
197 let delete_count = rows_to_delete.len();
198
199 let mut deleted_ids = HashSet::new();
200 for row in rows_to_delete {
201 deleted_ids.insert(row.id());
202 tx.delete(&mut *cache, table, row.id())
203 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
204 }
205
206 if let Some(table_id) = self.table_id_map.borrow().get(table).copied() {
207 self.pending_changes.push((table_id, deleted_ids));
208 }
209
210 Ok(delete_count)
211 }
212
213 pub fn commit(&mut self) -> Result<(), JsValue> {
215 let tx = self
216 .inner
217 .take()
218 .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
219
220 tx.commit()
221 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
222
223 for (table_id, changed_ids) in self.pending_changes.drain(..) {
225 self.query_registry
226 .borrow_mut()
227 .on_table_change(table_id, &changed_ids);
228 }
229
230 Ok(())
231 }
232
233 pub fn rollback(&mut self) -> Result<(), JsValue> {
235 let tx = self
236 .inner
237 .take()
238 .ok_or_else(|| JsValue::from_str("Transaction already completed"))?;
239
240 let mut cache = self.cache.borrow_mut();
241 tx.rollback(&mut *cache)
242 .map_err(|e| JsValue::from_str(&alloc::format!("{:?}", e)))?;
243
244 for (table_id, changed_ids) in self.pending_changes.drain(..) {
246 self.query_registry
247 .borrow_mut()
248 .on_table_change(table_id, &changed_ids);
249 }
250
251 Ok(())
252 }
253
254 #[wasm_bindgen(getter)]
256 pub fn active(&self) -> bool {
257 self.inner.is_some()
258 }
259
260 #[wasm_bindgen(getter)]
262 pub fn state(&self) -> String {
263 match &self.inner {
264 Some(tx) => match tx.state() {
265 TransactionState::Active => "active".to_string(),
266 TransactionState::Committed => "committed".to_string(),
267 TransactionState::RolledBack => "rolledback".to_string(),
268 },
269 None => "completed".to_string(),
270 }
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277 use crate::database::Database;
278 use crate::table::ColumnOptions;
279 use crate::JsDataType;
280 use wasm_bindgen_test::*;
281
282 wasm_bindgen_test_configure!(run_in_browser);
283
284 fn setup_db() -> Database {
285 let db = Database::new("test");
286 let builder = db
287 .create_table("users")
288 .column(
289 "id",
290 JsDataType::Int64,
291 Some(ColumnOptions::new().set_primary_key(true)),
292 )
293 .column("name", JsDataType::String, None)
294 .column("age", JsDataType::Int32, None);
295 db.register_table(&builder).unwrap();
296 db
297 }
298
299 #[wasm_bindgen_test]
300 fn test_transaction_insert_commit() {
301 let db = setup_db();
302 let mut tx = db.transaction();
303
304 let values = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
305 tx.insert("users", &values).unwrap();
306 tx.commit().unwrap();
307
308 assert_eq!(db.total_row_count(), 1);
309 }
310
311 #[wasm_bindgen_test]
312 fn test_transaction_insert_rollback() {
313 let db = setup_db();
314 let mut tx = db.transaction();
315
316 let values = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
317 tx.insert("users", &values).unwrap();
318 tx.rollback().unwrap();
319
320 assert_eq!(db.total_row_count(), 0);
321 }
322
323 #[wasm_bindgen_test]
324 fn test_transaction_state() {
325 let db = setup_db();
326 let mut tx = db.transaction();
327
328 assert!(tx.active());
329 assert_eq!(tx.state(), "active");
330
331 tx.commit().unwrap();
332
333 assert!(!tx.active());
334 }
335
336 #[wasm_bindgen_test]
337 fn test_transaction_multiple_operations() {
338 let db = setup_db();
339 let mut tx = db.transaction();
340
341 let values1 = js_sys::JSON::parse(r#"[{"id": 1, "name": "Alice", "age": 25}]"#).unwrap();
342 tx.insert("users", &values1).unwrap();
343
344 let values2 = js_sys::JSON::parse(r#"[{"id": 2, "name": "Bob", "age": 30}]"#).unwrap();
345 tx.insert("users", &values2).unwrap();
346
347 tx.commit().unwrap();
348
349 assert_eq!(db.total_row_count(), 2);
350 }
351}