Skip to main content

cynos_storage/
row_store.rs

1//! Row storage for Cynos database.
2//!
3//! This module provides the `RowStore` struct which manages rows for a single table,
4//! including primary key and secondary index maintenance.
5
6use alloc::collections::BTreeMap;
7use alloc::format;
8use alloc::rc::Rc;
9use alloc::string::{String, ToString};
10use alloc::vec::Vec;
11use cynos_core::schema::Table;
12use cynos_core::{Error, Result, Row, RowId, Value};
13use cynos_incremental::Delta;
14use cynos_index::{BTreeIndex, GinIndex, HashIndex, Index, KeyRange, RangeIndex};
15
/// Row storage backend when the `hash-store` feature is enabled: a
/// `hashbrown::HashMap` keyed by row ID (O(1) lookup, unordered iteration).
#[cfg(feature = "hash-store")]
type RowMap = hashbrown::HashMap<RowId, Rc<Row>>;
/// Default row storage backend: a `BTreeMap` keyed by row ID
/// (O(log n) lookup, iteration in ascending row-ID order).
#[cfg(not(feature = "hash-store"))]
type RowMap = BTreeMap<RowId, Rc<Row>>;
21
/// Trait for index storage that supports both point and range queries.
///
/// Keys are [`Value`]s and the payload is the [`RowId`] of the row that owns
/// the key. Implementations in this file wrap `BTreeIndex` and `HashIndex`.
pub trait IndexStore {
    /// Adds a key-value pair to the index.
    ///
    /// # Errors
    /// Returns the underlying `cynos_index::IndexError` when the wrapped index
    /// rejects the entry (callers in this file treat that as a uniqueness
    /// violation).
    fn add(&mut self, key: Value, row_id: RowId) -> core::result::Result<(), cynos_index::IndexError>;
    /// Sets a key-value pair, replacing any existing values.
    fn set(&mut self, key: Value, row_id: RowId);
    /// Gets all row IDs for a key.
    fn get(&self, key: &Value) -> Vec<RowId>;
    /// Removes a key-value pair.
    ///
    /// Callers in this file always pass `Some(row_id)`.
    /// NOTE(review): passing `None` presumably removes every entry for `key` —
    /// confirm against the wrapped index implementation.
    fn remove(&mut self, key: &Value, row_id: Option<RowId>);
    /// Checks if the index contains a key.
    fn contains_key(&self, key: &Value) -> bool;
    /// Returns the number of entries.
    fn len(&self) -> usize;
    /// Returns true if empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Clears all entries.
    fn clear(&mut self);
    /// Gets range of row IDs.
    ///
    /// `range == None` means "no key restriction"; `reverse`, `limit` and
    /// `skip` are forwarded to the wrapped index by range-capable
    /// implementations.
    fn get_range(&self, range: Option<&KeyRange<Value>>, reverse: bool, limit: Option<usize>, skip: usize) -> Vec<RowId>;
    /// Returns all row IDs in the index.
    fn get_all(&self) -> Vec<RowId>;
}
47
/// Wrapper for BTreeIndex that implements IndexStore.
pub struct BTreeIndexStore {
    /// The wrapped B-tree index keyed by `Value`.
    inner: BTreeIndex<Value>,
}

impl BTreeIndexStore {
    /// Creates a new BTree index store.
    ///
    /// `unique` is forwarded to `BTreeIndex::new`.
    /// NOTE(review): the literal `64` is the first argument of
    /// `BTreeIndex::new` — presumably the node order/capacity; confirm against
    /// `cynos_index`.
    pub fn new(unique: bool) -> Self {
        Self {
            inner: BTreeIndex::new(64, unique),
        }
    }
}
61
/// `IndexStore` implementation that forwards every call to the wrapped
/// `BTreeIndex`.
impl IndexStore for BTreeIndexStore {
    /// Forwards to `BTreeIndex::add`, returning its error unchanged.
    fn add(&mut self, key: Value, row_id: RowId) -> core::result::Result<(), cynos_index::IndexError> {
        self.inner.add(key, row_id)
    }

    /// Forwards to `BTreeIndex::set`.
    fn set(&mut self, key: Value, row_id: RowId) {
        self.inner.set(key, row_id);
    }

    /// Forwards to `BTreeIndex::get`.
    fn get(&self, key: &Value) -> Vec<RowId> {
        self.inner.get(key)
    }

    /// Forwards to `BTreeIndex::remove`.
    fn remove(&mut self, key: &Value, row_id: Option<RowId>) {
        self.inner.remove(key, row_id);
    }

    /// Forwards to `BTreeIndex::contains_key`.
    fn contains_key(&self, key: &Value) -> bool {
        self.inner.contains_key(key)
    }

    /// Forwards to `BTreeIndex::len`.
    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Forwards to `BTreeIndex::clear`.
    fn clear(&mut self) {
        self.inner.clear();
    }

    /// Forwards range, direction, limit and skip to `BTreeIndex::get_range`.
    fn get_range(&self, range: Option<&KeyRange<Value>>, reverse: bool, limit: Option<usize>, skip: usize) -> Vec<RowId> {
        self.inner.get_range(range, reverse, limit, skip)
    }

    /// Returns every row ID via an unbounded, forward, unlimited range scan.
    fn get_all(&self) -> Vec<RowId> {
        self.inner.get_range(None, false, None, 0)
    }
}
99
/// Wrapper for HashIndex that implements IndexStore.
pub struct HashIndexStore {
    /// The wrapped hash index keyed by `Value`.
    inner: HashIndex<Value>,
}

impl HashIndexStore {
    /// Creates a new Hash index store.
    ///
    /// `unique` is forwarded to `HashIndex::new`.
    pub fn new(unique: bool) -> Self {
        Self {
            inner: HashIndex::new(unique),
        }
    }
}
113
/// `IndexStore` implementation that forwards point operations to the wrapped
/// `HashIndex`.
impl IndexStore for HashIndexStore {
    /// Forwards to `HashIndex::add`, returning its error unchanged.
    fn add(&mut self, key: Value, row_id: RowId) -> core::result::Result<(), cynos_index::IndexError> {
        self.inner.add(key, row_id)
    }

    /// Forwards to `HashIndex::set`.
    fn set(&mut self, key: Value, row_id: RowId) {
        self.inner.set(key, row_id);
    }

    /// Forwards to `HashIndex::get`.
    fn get(&self, key: &Value) -> Vec<RowId> {
        self.inner.get(key)
    }

    /// Forwards to `HashIndex::remove`.
    fn remove(&mut self, key: &Value, row_id: Option<RowId>) {
        self.inner.remove(key, row_id);
    }

    /// Forwards to `HashIndex::contains_key`.
    fn contains_key(&self, key: &Value) -> bool {
        self.inner.contains_key(key)
    }

    /// Forwards to `HashIndex::len`.
    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Forwards to `HashIndex::clear`.
    fn clear(&mut self) {
        self.inner.clear();
    }

    /// Returns ALL row IDs in the index.
    ///
    /// NOTE(review): every argument is ignored — the key range is not applied,
    /// and neither are `reverse`, `limit` or `skip`. Callers that need those
    /// semantics must filter/trim the result themselves or use a range-capable
    /// store.
    fn get_range(&self, _range: Option<&KeyRange<Value>>, _reverse: bool, _limit: Option<usize>, _skip: usize) -> Vec<RowId> {
        self.get_all()
    }

    /// Forwards to `HashIndex::get_all_row_ids`.
    fn get_all(&self) -> Vec<RowId> {
        self.inner.get_all_row_ids()
    }
}
151
152/// Extracts the key value from a row for the given column indices.
153fn extract_key(row: &Row, col_indices: &[usize]) -> Value {
154    if col_indices.len() == 1 {
155        row.get(col_indices[0]).cloned().unwrap_or(Value::Null)
156    } else {
157        let values: Vec<Value> = col_indices
158            .iter()
159            .map(|&i| row.get(i).cloned().unwrap_or(Value::Null))
160            .collect();
161        let key_str: String = values
162            .iter()
163            .map(|v| format!("{:?}", v))
164            .collect::<Vec<_>>()
165            .join("|");
166        Value::String(key_str)
167    }
168}
169
/// Row storage for a single table.
///
/// Owns the rows plus every index derived from them: the row-ID index, the
/// optional primary-key index, the B-tree secondary indices and the GIN
/// indices.
pub struct RowStore {
    /// Schema of the table this store belongs to.
    schema: Table,
    /// Rows keyed by row ID; each row is shared via `Rc`.
    rows: RowMap,
    /// Unique index from `Value::Int64(row_id)` to the row ID itself.
    row_id_index: BTreeIndexStore,
    /// Unique primary-key index; `None` when the schema declares no primary key.
    primary_index: Option<BTreeIndexStore>,
    /// Column positions that make up the primary key (empty without a PK).
    pk_columns: Vec<usize>,
    /// Non-GIN secondary indices, keyed by index name.
    secondary_indices: BTreeMap<String, BTreeIndexStore>,
    /// Column positions of each non-GIN secondary index, keyed by index name.
    index_columns: BTreeMap<String, Vec<usize>>,
    /// GIN indexes for JSONB columns
    gin_indices: BTreeMap<String, GinIndex>,
    /// Column indices for GIN indexes
    gin_index_columns: BTreeMap<String, usize>,
}
184
185impl RowStore {
186    /// Creates a new row store for the given table schema.
187    pub fn new(schema: Table) -> Self {
188        let mut store = Self {
189            schema: schema.clone(),
190            rows: RowMap::default(),
191            row_id_index: BTreeIndexStore::new(true),
192            primary_index: None,
193            pk_columns: Vec::new(),
194            secondary_indices: BTreeMap::new(),
195            index_columns: BTreeMap::new(),
196            gin_indices: BTreeMap::new(),
197            gin_index_columns: BTreeMap::new(),
198        };
199
200        if let Some(pk) = schema.primary_key() {
201            store.primary_index = Some(BTreeIndexStore::new(true));
202            store.pk_columns = pk
203                .columns()
204                .iter()
205                .filter_map(|c| schema.get_column_index(&c.name))
206                .collect();
207        }
208
209        for idx in schema.indices() {
210            let cols: Vec<usize> = idx
211                .columns()
212                .iter()
213                .filter_map(|c| schema.get_column_index(&c.name))
214                .collect();
215
216            // Check if this is a GIN index (for JSONB columns)
217            if idx.get_index_type() == cynos_core::schema::IndexType::Gin {
218                if let Some(&col_idx) = cols.first() {
219                    store.gin_indices.insert(idx.name().to_string(), GinIndex::new());
220                    store.gin_index_columns.insert(idx.name().to_string(), col_idx);
221                }
222            } else {
223                store.secondary_indices.insert(
224                    idx.name().to_string(),
225                    BTreeIndexStore::new(idx.is_unique()),
226                );
227                store.index_columns.insert(idx.name().to_string(), cols);
228            }
229        }
230
231        store
232    }
233
    /// Returns the table schema this store was created with.
    pub fn schema(&self) -> &Table {
        &self.schema
    }
238
    /// Returns the number of rows currently stored.
    pub fn len(&self) -> usize {
        self.rows.len()
    }
243
    /// Returns true if the store holds no rows.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }
248
249    /// Inserts a row into the store.
250    pub fn insert(&mut self, row: Row) -> Result<RowId> {
251        let row_id = row.id();
252
253        if self.rows.contains_key(&row_id) {
254            return Err(Error::invalid_operation("Row ID already exists"));
255        }
256
257        // Check primary key uniqueness
258        let pk_value = if !self.pk_columns.is_empty() {
259            let pk = extract_key(&row, &self.pk_columns);
260            if let Some(ref pk_index) = self.primary_index {
261                if pk_index.contains_key(&pk) {
262                    return Err(Error::UniqueConstraint {
263                        column: "primary_key".into(),
264                        value: pk,
265                    });
266                }
267            }
268            Some(pk)
269        } else {
270            None
271        };
272
273        // Add to row ID index
274        self.row_id_index
275            .add(Value::Int64(row_id as i64), row_id)
276            .map_err(|_| Error::invalid_operation("Failed to add to row ID index"))?;
277
278        // Add to primary key index
279        if let (Some(ref mut pk_index), Some(pk)) = (&mut self.primary_index, pk_value.clone()) {
280            if pk_index.add(pk.clone(), row_id).is_err() {
281                self.row_id_index.remove(&Value::Int64(row_id as i64), Some(row_id));
282                return Err(Error::UniqueConstraint {
283                    column: "primary_key".into(),
284                    value: pk,
285                });
286            }
287        }
288
289        // Add to secondary indices
290        // Collect index names first to avoid borrow conflict
291        let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
292        for idx_name in &index_names {
293            let cols = &self.index_columns[idx_name];
294            let key = extract_key(&row, cols);
295            if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
296                if idx.add(key.clone(), row_id).is_err() {
297                    self.rollback_insert(row_id, &row);
298                    return Err(Error::UniqueConstraint {
299                        column: idx_name.clone(),
300                        value: key,
301                    });
302                }
303            }
304        }
305
306        // Add to GIN indices
307        let gin_index_names: Vec<String> = self.gin_index_columns.keys().cloned().collect();
308        for idx_name in &gin_index_names {
309            let col_idx = self.gin_index_columns[idx_name];
310            if let Some(gin_idx) = self.gin_indices.get_mut(idx_name) {
311                if let Some(value) = row.get(col_idx) {
312                    Self::index_jsonb_value(gin_idx, value, row_id);
313                }
314            }
315        }
316
317        self.rows.insert(row_id, Rc::new(row));
318        Ok(row_id)
319    }
320
321    fn rollback_insert(&mut self, row_id: RowId, row: &Row) {
322        self.row_id_index.remove(&Value::Int64(row_id as i64), Some(row_id));
323
324        if let Some(ref mut pk_index) = self.primary_index {
325            let pk_value = extract_key(row, &self.pk_columns);
326            pk_index.remove(&pk_value, Some(row_id));
327        }
328
329        let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
330        for idx_name in &index_names {
331            let cols = &self.index_columns[idx_name];
332            let key = extract_key(row, cols);
333            if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
334                idx.remove(&key, Some(row_id));
335            }
336        }
337    }
338
339    /// Updates a row in the store.
340    pub fn update(&mut self, row_id: RowId, new_row: Row) -> Result<()> {
341        let old_row = self.rows.get(&row_id).ok_or_else(|| {
342            Error::not_found(self.schema.name(), Value::Int64(row_id as i64))
343        })?.clone();
344
345        // Check primary key uniqueness if PK changed
346        if !self.pk_columns.is_empty() {
347            let old_pk = extract_key(&old_row, &self.pk_columns);
348            let new_pk = extract_key(&new_row, &self.pk_columns);
349            if let Some(ref pk_index) = self.primary_index {
350                if old_pk != new_pk && pk_index.contains_key(&new_pk) {
351                    return Err(Error::UniqueConstraint {
352                        column: "primary_key".into(),
353                        value: new_pk,
354                    });
355                }
356            }
357        }
358
359        // Check secondary index uniqueness
360        for (idx_name, cols) in &self.index_columns {
361            let old_key = extract_key(&old_row, cols);
362            let new_key = extract_key(&new_row, cols);
363            if let Some(idx) = self.secondary_indices.get(idx_name) {
364                if old_key != new_key && idx.contains_key(&new_key) {
365                    return Err(Error::UniqueConstraint {
366                        column: idx_name.clone(),
367                        value: new_key,
368                    });
369                }
370            }
371        }
372
373        // Update primary key index
374        if !self.pk_columns.is_empty() {
375            let old_pk = extract_key(&old_row, &self.pk_columns);
376            let new_pk = extract_key(&new_row, &self.pk_columns);
377            if let Some(ref mut pk_index) = self.primary_index {
378                if old_pk != new_pk {
379                    pk_index.remove(&old_pk, Some(row_id));
380                    let _ = pk_index.add(new_pk, row_id);
381                }
382            }
383        }
384
385        // Update secondary indices
386        let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
387        for idx_name in &index_names {
388            let cols = &self.index_columns[idx_name];
389            let old_key = extract_key(&old_row, cols);
390            let new_key = extract_key(&new_row, cols);
391            if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
392                if old_key != new_key {
393                    idx.remove(&old_key, Some(row_id));
394                    let _ = idx.add(new_key, row_id);
395                }
396            }
397        }
398
399        self.rows.insert(row_id, Rc::new(new_row));
400        Ok(())
401    }
402
403    /// Deletes a row from the store.
404    pub fn delete(&mut self, row_id: RowId) -> Result<Rc<Row>> {
405        let row = self.rows.remove(&row_id).ok_or_else(|| {
406            Error::not_found(self.schema.name(), Value::Int64(row_id as i64))
407        })?;
408
409        self.row_id_index.remove(&Value::Int64(row_id as i64), Some(row_id));
410
411        if !self.pk_columns.is_empty() {
412            let pk_value = extract_key(&row, &self.pk_columns);
413            if let Some(ref mut pk_index) = self.primary_index {
414                pk_index.remove(&pk_value, Some(row_id));
415            }
416        }
417
418        let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
419        for idx_name in &index_names {
420            let cols = &self.index_columns[idx_name];
421            let key = extract_key(&row, cols);
422            if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
423                idx.remove(&key, Some(row_id));
424            }
425        }
426
427        Ok(row)
428    }
429
    /// Gets a row by ID.
    ///
    /// Returns a new `Rc` handle to the stored row, or `None` when the ID is
    /// unknown.
    pub fn get(&self, row_id: RowId) -> Option<Rc<Row>> {
        self.rows.get(&row_id).cloned()
    }
434
435    /// Gets a mutable reference to a row by ID (requires exclusive access).
436    /// Note: This clones the Rc and returns a new Row if mutation is needed.
437    pub fn get_mut(&mut self, row_id: RowId) -> Option<&mut Row> {
438        self.rows.get_mut(&row_id).map(|rc| Rc::make_mut(rc))
439    }
440
    /// Returns an iterator over all rows.
    ///
    /// Each item is a cloned `Rc` handle; rows come in the iteration order of
    /// the underlying `RowMap`.
    pub fn scan(&self) -> impl Iterator<Item = Rc<Row>> + '_ {
        self.rows.values().cloned()
    }
445
    /// Returns all row IDs, in the iteration order of the underlying `RowMap`.
    pub fn row_ids(&self) -> Vec<RowId> {
        self.rows.keys().copied().collect()
    }
450
451    /// Gets rows by primary key value.
452    pub fn get_by_pk(&self, pk_value: &Value) -> Vec<Rc<Row>> {
453        if let Some(ref pk_index) = self.primary_index {
454            pk_index
455                .get(pk_value)
456                .iter()
457                .filter_map(|&id| self.rows.get(&id).cloned())
458                .collect()
459        } else {
460            Vec::new()
461        }
462    }
463
464    /// Finds existing row ID by primary key.
465    pub fn find_row_id_by_pk(&self, row: &Row) -> Option<RowId> {
466        if let Some(ref pk_index) = self.primary_index {
467            let pk_value = extract_key(row, &self.pk_columns);
468            pk_index.get(&pk_value).first().copied()
469        } else {
470            None
471        }
472    }
473
474    /// Checks if a primary key value exists.
475    pub fn pk_exists(&self, pk_value: &Value) -> bool {
476        if let Some(ref pk_index) = self.primary_index {
477            pk_index.contains_key(pk_value)
478        } else {
479            false
480        }
481    }
482
483    /// Gets rows by index scan.
484    pub fn index_scan(&self, index_name: &str, range: Option<&KeyRange<Value>>) -> Vec<Rc<Row>> {
485        if let Some(idx) = self.secondary_indices.get(index_name) {
486            idx.get_range(range, false, None, 0)
487                .iter()
488                .filter_map(|&id| self.rows.get(&id).cloned())
489                .collect()
490        } else {
491            Vec::new()
492        }
493    }
494
    /// Gets rows by index scan with limit.
    ///
    /// Equivalent to `index_scan_with_limit_offset` with an offset of `0`.
    pub fn index_scan_with_limit(
        &self,
        index_name: &str,
        range: Option<&KeyRange<Value>>,
        limit: Option<usize>,
    ) -> Vec<Rc<Row>> {
        self.index_scan_with_limit_offset(index_name, range, limit, 0)
    }
504
    /// Gets rows by index scan with limit and offset.
    /// This enables true pushdown of LIMIT/OFFSET to the storage layer.
    ///
    /// Equivalent to `index_scan_with_options` with `reverse = false`.
    pub fn index_scan_with_limit_offset(
        &self,
        index_name: &str,
        range: Option<&KeyRange<Value>>,
        limit: Option<usize>,
        offset: usize,
    ) -> Vec<Rc<Row>> {
        self.index_scan_with_options(index_name, range, limit, offset, false)
    }
516
517    /// Gets rows by index scan with limit, offset, and reverse option.
518    /// This enables true pushdown of LIMIT/OFFSET/ORDER to the storage layer.
519    pub fn index_scan_with_options(
520        &self,
521        index_name: &str,
522        range: Option<&KeyRange<Value>>,
523        limit: Option<usize>,
524        offset: usize,
525        reverse: bool,
526    ) -> Vec<Rc<Row>> {
527        if let Some(idx) = self.secondary_indices.get(index_name) {
528            idx.get_range(range, reverse, limit, offset)
529                .iter()
530                .filter_map(|&id| self.rows.get(&id).cloned())
531                .collect()
532        } else {
533            Vec::new()
534        }
535    }
536
537    /// Clears all rows and indices.
538    pub fn clear(&mut self) {
539        self.rows.clear();
540        self.row_id_index.clear();
541        if let Some(ref mut pk_index) = self.primary_index {
542            pk_index.clear();
543        }
544        for idx in self.secondary_indices.values_mut() {
545            idx.clear();
546        }
547    }
548
549    /// Gets multiple rows by IDs.
550    pub fn get_many(&self, row_ids: &[RowId]) -> Vec<Option<Rc<Row>>> {
551        row_ids.iter().map(|&id| self.rows.get(&id).cloned()).collect()
552    }
553
554    /// Inserts a row or replaces an existing row with the same primary key.
555    /// Returns the row ID and whether it was a replacement.
556    pub fn insert_or_replace(&mut self, row: Row) -> Result<(RowId, bool)> {
557        // Check if a row with the same PK already exists
558        if let Some(existing_row_id) = self.find_row_id_by_pk(&row) {
559            // Replace: update the existing row, preserving the original row ID
560            let updated_row = Row::new(existing_row_id, row.values().to_vec());
561            self.update(existing_row_id, updated_row)?;
562            Ok((existing_row_id, true))
563        } else {
564            // Insert: add as new row
565            let row_id = self.insert(row)?;
566            Ok((row_id, false))
567        }
568    }
569
570    /// Checks if a secondary index contains a key (for unique constraint checking).
571    pub fn secondary_index_contains(&self, index_name: &str, key: &Value) -> bool {
572        if let Some(idx) = self.secondary_indices.get(index_name) {
573            idx.contains_key(key)
574        } else {
575            false
576        }
577    }
578
    /// Gets the column positions that make up the primary key
    /// (empty when the table has no primary key).
    pub fn pk_columns(&self) -> &[usize] {
        &self.pk_columns
    }
583
584    /// Extracts the primary key value from a row.
585    pub fn extract_pk(&self, row: &Row) -> Option<Value> {
586        if self.pk_columns.is_empty() {
587            None
588        } else {
589            Some(extract_key(row, &self.pk_columns))
590        }
591    }
592
593    /// Inserts a row and returns a Delta for IVM propagation.
594    pub fn insert_with_delta(&mut self, row: Row) -> Result<Delta<Row>> {
595        let row_clone = row.clone();
596        self.insert(row)?;
597        Ok(Delta::insert(row_clone))
598    }
599
600    /// Deletes a row and returns a Delta for IVM propagation.
601    pub fn delete_with_delta(&mut self, row_id: RowId) -> Result<Delta<Row>> {
602        let row = self.delete(row_id)?;
603        Ok(Delta::delete((*row).clone()))
604    }
605
606    /// Updates a row and returns Deltas for IVM propagation (delete old + insert new).
607    pub fn update_with_delta(&mut self, row_id: RowId, new_row: Row) -> Result<(Delta<Row>, Delta<Row>)> {
608        let old_row = self.rows.get(&row_id).ok_or_else(|| {
609            Error::not_found(self.schema.name(), Value::Int64(row_id as i64))
610        })?.clone();
611        let new_row_clone = new_row.clone();
612        self.update(row_id, new_row)?;
613        Ok((Delta::delete((*old_row).clone()), Delta::insert(new_row_clone)))
614    }
615
616    // ========== GIN Index Methods ==========
617
618    /// Indexes a JSONB value into the GIN index.
619    fn index_jsonb_value(gin_idx: &mut GinIndex, value: &Value, row_id: RowId) {
620        if let Value::Jsonb(jsonb) = value {
621            // Parse JSON and extract key-value pairs
622            if let Ok(json_str) = core::str::from_utf8(&jsonb.0) {
623                Self::extract_and_index_json(gin_idx, json_str, row_id);
624            }
625        }
626    }
627
628    /// Extracts key-value pairs from JSON and adds them to the GIN index.
629    fn extract_and_index_json(gin_idx: &mut GinIndex, json_str: &str, row_id: RowId) {
630        // Simple JSON parsing for top-level key-value pairs
631        let trimmed = json_str.trim();
632        if !trimmed.starts_with('{') || !trimmed.ends_with('}') {
633            return;
634        }
635
636        let inner = &trimmed[1..trimmed.len() - 1];
637        let mut depth = 0;
638        let mut in_string = false;
639        let mut escape = false;
640        let mut start = 0;
641
642        for (i, c) in inner.char_indices() {
643            if escape {
644                escape = false;
645                continue;
646            }
647            match c {
648                '\\' if in_string => escape = true,
649                '"' => in_string = !in_string,
650                '{' | '[' if !in_string => depth += 1,
651                '}' | ']' if !in_string => depth -= 1,
652                ',' if !in_string && depth == 0 => {
653                    Self::parse_and_index_pair(gin_idx, &inner[start..i], row_id);
654                    start = i + 1;
655                }
656                _ => {}
657            }
658        }
659        // Parse last pair
660        if start < inner.len() {
661            Self::parse_and_index_pair(gin_idx, &inner[start..], row_id);
662        }
663    }
664
665    /// Parses a key-value pair and adds it to the GIN index.
666    fn parse_and_index_pair(gin_idx: &mut GinIndex, pair: &str, row_id: RowId) {
667        let pair = pair.trim();
668        if let Some(colon_pos) = pair.find(':') {
669            let key = pair[..colon_pos].trim().trim_matches('"');
670            let value = pair[colon_pos + 1..].trim();
671
672            // Add key to index
673            gin_idx.add_key(key.into(), row_id);
674
675            // Add key-value pair to index (for string values)
676            let value_str = if value.starts_with('"') && value.ends_with('"') {
677                &value[1..value.len() - 1]
678            } else {
679                value
680            };
681            gin_idx.add_key_value(key.into(), value_str.into(), row_id);
682        }
683    }
684
685    /// Removes JSONB value from the GIN index.
686    #[allow(dead_code)]
687    fn remove_jsonb_from_gin(gin_idx: &mut GinIndex, value: &Value, row_id: RowId) {
688        if let Value::Jsonb(jsonb) = value {
689            if let Ok(json_str) = core::str::from_utf8(&jsonb.0) {
690                Self::extract_and_remove_json(gin_idx, json_str, row_id);
691            }
692        }
693    }
694
695    /// Extracts key-value pairs from JSON and removes them from the GIN index.
696    fn extract_and_remove_json(gin_idx: &mut GinIndex, json_str: &str, row_id: RowId) {
697        let trimmed = json_str.trim();
698        if !trimmed.starts_with('{') || !trimmed.ends_with('}') {
699            return;
700        }
701
702        let inner = &trimmed[1..trimmed.len() - 1];
703        let mut depth = 0;
704        let mut in_string = false;
705        let mut escape = false;
706        let mut start = 0;
707
708        for (i, c) in inner.char_indices() {
709            if escape {
710                escape = false;
711                continue;
712            }
713            match c {
714                '\\' if in_string => escape = true,
715                '"' => in_string = !in_string,
716                '{' | '[' if !in_string => depth += 1,
717                '}' | ']' if !in_string => depth -= 1,
718                ',' if !in_string && depth == 0 => {
719                    Self::parse_and_remove_pair(gin_idx, &inner[start..i], row_id);
720                    start = i + 1;
721                }
722                _ => {}
723            }
724        }
725        if start < inner.len() {
726            Self::parse_and_remove_pair(gin_idx, &inner[start..], row_id);
727        }
728    }
729
730    /// Parses a key-value pair and removes it from the GIN index.
731    fn parse_and_remove_pair(gin_idx: &mut GinIndex, pair: &str, row_id: RowId) {
732        let pair = pair.trim();
733        if let Some(colon_pos) = pair.find(':') {
734            let key = pair[..colon_pos].trim().trim_matches('"');
735            let value = pair[colon_pos + 1..].trim();
736
737            gin_idx.remove_key(key, row_id);
738
739            let value_str = if value.starts_with('"') && value.ends_with('"') {
740                &value[1..value.len() - 1]
741            } else {
742                value
743            };
744            gin_idx.remove_key_value(key, value_str, row_id);
745        }
746    }
747
748    /// Queries the GIN index by key-value pair.
749    pub fn gin_index_get_by_key_value(&self, index_name: &str, key: &str, value: &str) -> Vec<Rc<Row>> {
750        if let Some(gin_idx) = self.gin_indices.get(index_name) {
751            let row_ids = gin_idx.get_by_key_value(key, value);
752            row_ids.iter().filter_map(|&id| self.rows.get(&id).cloned()).collect()
753        } else {
754            Vec::new()
755        }
756    }
757
758    /// Queries the GIN index by key existence.
759    pub fn gin_index_get_by_key(&self, index_name: &str, key: &str) -> Vec<Rc<Row>> {
760        if let Some(gin_idx) = self.gin_indices.get(index_name) {
761            let row_ids = gin_idx.get_by_key(key);
762            row_ids.iter().filter_map(|&id| self.rows.get(&id).cloned()).collect()
763        } else {
764            Vec::new()
765        }
766    }
767
768    /// Queries the GIN index by multiple key-value pairs (AND query).
769    /// Returns rows that match ALL of the given key-value pairs.
770    pub fn gin_index_get_by_key_values_all(&self, index_name: &str, pairs: &[(&str, &str)]) -> Vec<Rc<Row>> {
771        if let Some(gin_idx) = self.gin_indices.get(index_name) {
772            let row_ids = gin_idx.get_by_key_values_all(pairs);
773            row_ids.iter().filter_map(|&id| self.rows.get(&id).cloned()).collect()
774        } else {
775            Vec::new()
776        }
777    }
778}
779
780#[cfg(test)]
781mod tests {
782    use super::*;
783    use cynos_core::schema::TableBuilder;
784    use cynos_core::DataType;
785    use alloc::vec;
786
    /// Builds the table `test` with columns `id` (Int64) and `name` (String)
    /// and a primary key on `id`.
    fn test_schema() -> Table {
        TableBuilder::new("test")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("name", DataType::String)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .build()
            .unwrap()
    }
799
    /// Builds the table `test` with columns `id` and `value` (both Int64), a
    /// primary key on `id` and the secondary index `idx_value` on `value`.
    fn test_schema_with_index() -> Table {
        TableBuilder::new("test")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("value", DataType::Int64)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .add_index("idx_value", &["value"], false)
            .unwrap()
            .build()
            .unwrap()
    }
814
    /// Inserting one row succeeds and the store then reports one row.
    #[test]
    fn test_row_store_insert() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        assert!(store.insert(row).is_ok());
        assert_eq!(store.len(), 1);
    }
822
    /// A row that was inserted can be fetched by ID with its values intact.
    #[test]
    fn test_row_store_get() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row).unwrap();
        let retrieved = store.get(1);
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().get(1), Some(&Value::String("Alice".into())));
    }
832
    /// Updating a row (primary key unchanged) replaces its stored values.
    #[test]
    fn test_row_store_update() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row).unwrap();
        let new_row = Row::new(1, vec![Value::Int64(1), Value::String("Bob".into())]);
        assert!(store.update(1, new_row).is_ok());
        let retrieved = store.get(1);
        assert_eq!(retrieved.unwrap().get(1), Some(&Value::String("Bob".into())));
    }
843
    /// Deleting the only row empties the store and makes the ID unknown.
    #[test]
    fn test_row_store_delete() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row).unwrap();
        assert!(store.delete(1).is_ok());
        assert_eq!(store.len(), 0);
        assert!(store.get(1).is_none());
    }
853
    /// A second row with a different row ID but the same primary key is
    /// rejected.
    #[test]
    fn test_row_store_pk_uniqueness() {
        let mut store = RowStore::new(test_schema());
        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        let row2 = Row::new(2, vec![Value::Int64(1), Value::String("Bob".into())]);
        store.insert(row1).unwrap();
        assert!(store.insert(row2).is_err());
    }
862
863    #[test]
864    fn test_row_store_scan() {
865        let mut store = RowStore::new(test_schema());
866        store.insert(Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())])).unwrap();
867        store.insert(Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())])).unwrap();
868        let rows: Vec<_> = store.scan().collect();
869        assert_eq!(rows.len(), 2);
870    }
871
872    #[test]
873    fn test_row_store_index_maintenance() {
874        let mut store = RowStore::new(test_schema_with_index());
875        let row = Row::new(1, vec![Value::Int64(1), Value::Int64(100)]);
876        store.insert(row).unwrap();
877        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
878        assert_eq!(results.len(), 1);
879        store.delete(1).unwrap();
880        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
881        assert_eq!(results.len(), 0);
882    }
883
884    #[test]
885    fn test_row_store_clear() {
886        let mut store = RowStore::new(test_schema());
887        store.insert(Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())])).unwrap();
888        store.insert(Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())])).unwrap();
889        store.clear();
890        assert!(store.is_empty());
891    }
892
893    // === Additional tests for better coverage ===
894
895    fn test_schema_composite_pk() -> Table {
896        TableBuilder::new("test")
897            .unwrap()
898            .add_column("id1", DataType::String)
899            .unwrap()
900            .add_column("id2", DataType::Int64)
901            .unwrap()
902            .add_column("name", DataType::String)
903            .unwrap()
904            .add_primary_key(&["id1", "id2"], false)
905            .unwrap()
906            .build()
907            .unwrap()
908    }
909
910    fn test_schema_with_unique_index() -> Table {
911        TableBuilder::new("test")
912            .unwrap()
913            .add_column("id", DataType::Int64)
914            .unwrap()
915            .add_column("email", DataType::String)
916            .unwrap()
917            .add_primary_key(&["id"], false)
918            .unwrap()
919            .add_index("idx_email", &["email"], true) // unique index
920            .unwrap()
921            .build()
922            .unwrap()
923    }
924
925    #[test]
926    fn test_composite_primary_key() {
927        let mut store = RowStore::new(test_schema_composite_pk());
928
929        let row1 = Row::new(1, vec![
930            Value::String("pk1".into()),
931            Value::Int64(100),
932            Value::String("Name1".into())
933        ]);
934        assert!(store.insert(row1).is_ok());
935
936        // Same id1, different id2 - should succeed
937        let row2 = Row::new(2, vec![
938            Value::String("pk1".into()),
939            Value::Int64(200),
940            Value::String("Name2".into())
941        ]);
942        assert!(store.insert(row2).is_ok());
943
944        // Same composite key - should fail
945        let row3 = Row::new(3, vec![
946            Value::String("pk1".into()),
947            Value::Int64(100),
948            Value::String("Name3".into())
949        ]);
950        assert!(store.insert(row3).is_err());
951    }
952
953    #[test]
954    fn test_insert_or_replace_new() {
955        let mut store = RowStore::new(test_schema());
956        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
957
958        let (row_id, replaced) = store.insert_or_replace(row).unwrap();
959        assert_eq!(row_id, 1);
960        assert!(!replaced);
961        assert_eq!(store.len(), 1);
962    }
963
964    #[test]
965    fn test_insert_or_replace_existing() {
966        let mut store = RowStore::new(test_schema());
967
968        // Insert first row
969        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
970        store.insert(row1).unwrap();
971
972        // Replace with same PK but different row ID
973        let row2 = Row::new(2, vec![Value::Int64(1), Value::String("Updated".into())]);
974        let (row_id, replaced) = store.insert_or_replace(row2).unwrap();
975
976        assert_eq!(row_id, 1); // Should preserve original row ID
977        assert!(replaced);
978        assert_eq!(store.len(), 1);
979
980        let stored = store.get(1).unwrap();
981        assert_eq!(stored.get(1), Some(&Value::String("Updated".into())));
982    }
983
984    #[test]
985    fn test_update_pk_violation() {
986        let mut store = RowStore::new(test_schema());
987
988        // Insert two rows
989        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
990        let row2 = Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())]);
991        store.insert(row1).unwrap();
992        store.insert(row2).unwrap();
993
994        // Try to update row2 to have the same PK as row1
995        let row2_updated = Row::new(2, vec![Value::Int64(1), Value::String("Bob Updated".into())]);
996        let result = store.update(2, row2_updated);
997        assert!(result.is_err());
998    }
999
1000    #[test]
1001    fn test_unique_index_violation() {
1002        let mut store = RowStore::new(test_schema_with_unique_index());
1003
1004        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("alice@test.com".into())]);
1005        store.insert(row1).unwrap();
1006
1007        // Try to insert with same email (unique index violation)
1008        let row2 = Row::new(2, vec![Value::Int64(2), Value::String("alice@test.com".into())]);
1009        let result = store.insert(row2);
1010        assert!(result.is_err());
1011    }
1012
1013    #[test]
1014    fn test_unique_index_update_violation() {
1015        let mut store = RowStore::new(test_schema_with_unique_index());
1016
1017        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("alice@test.com".into())]);
1018        let row2 = Row::new(2, vec![Value::Int64(2), Value::String("bob@test.com".into())]);
1019        store.insert(row1).unwrap();
1020        store.insert(row2).unwrap();
1021
1022        // Try to update row2 to have the same email as row1
1023        let row2_updated = Row::new(2, vec![Value::Int64(2), Value::String("alice@test.com".into())]);
1024        let result = store.update(2, row2_updated);
1025        assert!(result.is_err());
1026    }
1027
1028    #[test]
1029    fn test_delete_then_insert_same_pk() {
1030        let mut store = RowStore::new(test_schema());
1031
1032        // Insert and delete
1033        let row1 = Row::new(1, vec![Value::Int64(100), Value::String("Alice".into())]);
1034        store.insert(row1).unwrap();
1035        store.delete(1).unwrap();
1036
1037        // Insert with same PK should succeed
1038        let row2 = Row::new(2, vec![Value::Int64(100), Value::String("Bob".into())]);
1039        assert!(store.insert(row2).is_ok());
1040        assert_eq!(store.len(), 1);
1041    }
1042
1043    #[test]
1044    fn test_index_update_maintenance() {
1045        let mut store = RowStore::new(test_schema_with_index());
1046
1047        // Insert row
1048        let row = Row::new(1, vec![Value::Int64(1), Value::Int64(100)]);
1049        store.insert(row).unwrap();
1050
1051        // Verify index has the value
1052        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
1053        assert_eq!(results.len(), 1);
1054
1055        // Update the indexed value
1056        let updated = Row::new(1, vec![Value::Int64(1), Value::Int64(200)]);
1057        store.update(1, updated).unwrap();
1058
1059        // Old value should not be in index
1060        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
1061        assert_eq!(results.len(), 0);
1062
1063        // New value should be in index
1064        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(200))));
1065        assert_eq!(results.len(), 1);
1066    }
1067
1068    // === Delta integration tests ===
1069
1070    #[test]
1071    fn test_insert_with_delta() {
1072        let mut store = RowStore::new(test_schema());
1073        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
1074        let delta = store.insert_with_delta(row.clone()).unwrap();
1075
1076        assert_eq!(delta.diff(), 1);
1077        assert_eq!(delta.data(), &row);
1078        assert_eq!(store.len(), 1);
1079    }
1080
1081    #[test]
1082    fn test_delete_with_delta() {
1083        let mut store = RowStore::new(test_schema());
1084        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
1085        store.insert(row.clone()).unwrap();
1086
1087        let delta = store.delete_with_delta(1).unwrap();
1088        assert_eq!(delta.diff(), -1);
1089        assert_eq!(delta.data(), &row);
1090        assert_eq!(store.len(), 0);
1091    }
1092
1093    #[test]
1094    fn test_update_with_delta() {
1095        let mut store = RowStore::new(test_schema());
1096        let old_row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
1097        store.insert(old_row.clone()).unwrap();
1098
1099        let new_row = Row::new(1, vec![Value::Int64(1), Value::String("Bob".into())]);
1100        let (delete_delta, insert_delta) = store.update_with_delta(1, new_row.clone()).unwrap();
1101
1102        assert_eq!(delete_delta.diff(), -1);
1103        assert_eq!(delete_delta.data(), &old_row);
1104        assert_eq!(insert_delta.diff(), 1);
1105        assert_eq!(insert_delta.data(), &new_row);
1106        assert_eq!(store.len(), 1);
1107    }
1108}