1use alloc::collections::BTreeMap;
7use alloc::format;
8use alloc::rc::Rc;
9use alloc::string::{String, ToString};
10use alloc::vec::Vec;
11use cynos_core::schema::Table;
12use cynos_core::{Error, Result, Row, RowId, Value};
13use cynos_incremental::Delta;
14use cynos_index::{BTreeIndex, GinIndex, HashIndex, Index, KeyRange, RangeIndex};
15
/// Row storage map (row ID -> shared row) used when the `hash-store` feature is enabled.
#[cfg(feature = "hash-store")]
type RowMap = hashbrown::HashMap<RowId, Rc<Row>>;
/// Row storage map (row ID -> shared row) used when the `hash-store` feature is disabled.
#[cfg(not(feature = "hash-store"))]
type RowMap = BTreeMap<RowId, Rc<Row>>;
21
/// Common interface over the index implementations used by the row store.
///
/// Keys are [`Value`]s and the indexed payload is a [`RowId`].
pub trait IndexStore {
    /// Adds `row_id` under `key`; the implementation may refuse the entry with an `IndexError`.
    fn add(&mut self, key: Value, row_id: RowId) -> core::result::Result<(), cynos_index::IndexError>;
    /// Stores `row_id` under `key` without reporting an error.
    // NOTE(review): presumably replaces any existing entry for `key` — confirm against `cynos_index`.
    fn set(&mut self, key: Value, row_id: RowId);
    /// Returns the row IDs stored under `key`.
    fn get(&self, key: &Value) -> Vec<RowId>;
    /// Removes an entry for `key`, optionally restricted to one `row_id`.
    // NOTE(review): presumably `None` removes every row ID under `key` — confirm against `cynos_index`.
    fn remove(&mut self, key: &Value, row_id: Option<RowId>);
    /// Returns `true` when `key` is present in the index.
    fn contains_key(&self, key: &Value) -> bool;
    /// Returns the size reported by the underlying index.
    fn len(&self) -> usize;
    /// Returns `true` when [`IndexStore::len`] is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Removes every entry from the index.
    fn clear(&mut self);
    /// Returns row IDs for `range`, with optional direction, `limit` and `skip`.
    ///
    /// Implementations are free to ignore arguments they cannot honour.
    fn get_range(&self, range: Option<&KeyRange<Value>>, reverse: bool, limit: Option<usize>, skip: usize) -> Vec<RowId>;
    /// Returns every row ID held by the index.
    fn get_all(&self) -> Vec<RowId>;
}
47
48pub struct BTreeIndexStore {
50 inner: BTreeIndex<Value>,
51}
52
53impl BTreeIndexStore {
54 pub fn new(unique: bool) -> Self {
56 Self {
57 inner: BTreeIndex::new(64, unique),
58 }
59 }
60}
61
62impl IndexStore for BTreeIndexStore {
63 fn add(&mut self, key: Value, row_id: RowId) -> core::result::Result<(), cynos_index::IndexError> {
64 self.inner.add(key, row_id)
65 }
66
67 fn set(&mut self, key: Value, row_id: RowId) {
68 self.inner.set(key, row_id);
69 }
70
71 fn get(&self, key: &Value) -> Vec<RowId> {
72 self.inner.get(key)
73 }
74
75 fn remove(&mut self, key: &Value, row_id: Option<RowId>) {
76 self.inner.remove(key, row_id);
77 }
78
79 fn contains_key(&self, key: &Value) -> bool {
80 self.inner.contains_key(key)
81 }
82
83 fn len(&self) -> usize {
84 self.inner.len()
85 }
86
87 fn clear(&mut self) {
88 self.inner.clear();
89 }
90
91 fn get_range(&self, range: Option<&KeyRange<Value>>, reverse: bool, limit: Option<usize>, skip: usize) -> Vec<RowId> {
92 self.inner.get_range(range, reverse, limit, skip)
93 }
94
95 fn get_all(&self) -> Vec<RowId> {
96 self.inner.get_range(None, false, None, 0)
97 }
98}
99
100pub struct HashIndexStore {
102 inner: HashIndex<Value>,
103}
104
105impl HashIndexStore {
106 pub fn new(unique: bool) -> Self {
108 Self {
109 inner: HashIndex::new(unique),
110 }
111 }
112}
113
114impl IndexStore for HashIndexStore {
115 fn add(&mut self, key: Value, row_id: RowId) -> core::result::Result<(), cynos_index::IndexError> {
116 self.inner.add(key, row_id)
117 }
118
119 fn set(&mut self, key: Value, row_id: RowId) {
120 self.inner.set(key, row_id);
121 }
122
123 fn get(&self, key: &Value) -> Vec<RowId> {
124 self.inner.get(key)
125 }
126
127 fn remove(&mut self, key: &Value, row_id: Option<RowId>) {
128 self.inner.remove(key, row_id);
129 }
130
131 fn contains_key(&self, key: &Value) -> bool {
132 self.inner.contains_key(key)
133 }
134
135 fn len(&self) -> usize {
136 self.inner.len()
137 }
138
139 fn clear(&mut self) {
140 self.inner.clear();
141 }
142
143 fn get_range(&self, _range: Option<&KeyRange<Value>>, _reverse: bool, _limit: Option<usize>, _skip: usize) -> Vec<RowId> {
144 self.get_all()
145 }
146
147 fn get_all(&self) -> Vec<RowId> {
148 self.inner.get_all_row_ids()
149 }
150}
151
152fn extract_key(row: &Row, col_indices: &[usize]) -> Value {
154 if col_indices.len() == 1 {
155 row.get(col_indices[0]).cloned().unwrap_or(Value::Null)
156 } else {
157 let values: Vec<Value> = col_indices
158 .iter()
159 .map(|&i| row.get(i).cloned().unwrap_or(Value::Null))
160 .collect();
161 let key_str: String = values
162 .iter()
163 .map(|v| format!("{:?}", v))
164 .collect::<Vec<_>>()
165 .join("|");
166 Value::String(key_str)
167 }
168}
169
/// In-memory storage for the rows of one table, together with its indices.
pub struct RowStore {
    /// Schema of the table this store belongs to.
    schema: Table,
    /// Row data keyed by row ID.
    rows: RowMap,
    /// Index over row IDs (keys are `Value::Int64(row_id)`).
    row_id_index: BTreeIndexStore,
    /// Primary-key index; `None` when the schema declares no primary key.
    primary_index: Option<BTreeIndexStore>,
    /// Column positions that make up the primary key.
    pk_columns: Vec<usize>,
    /// Non-GIN secondary indices keyed by index name.
    secondary_indices: BTreeMap<String, BTreeIndexStore>,
    /// Column positions covered by each entry of `secondary_indices`.
    index_columns: BTreeMap<String, Vec<usize>>,
    /// GIN indices keyed by index name.
    gin_indices: BTreeMap<String, GinIndex>,
    /// The single column position each GIN index reads from.
    gin_index_columns: BTreeMap<String, usize>,
}
184
185impl RowStore {
186 pub fn new(schema: Table) -> Self {
188 let mut store = Self {
189 schema: schema.clone(),
190 rows: RowMap::default(),
191 row_id_index: BTreeIndexStore::new(true),
192 primary_index: None,
193 pk_columns: Vec::new(),
194 secondary_indices: BTreeMap::new(),
195 index_columns: BTreeMap::new(),
196 gin_indices: BTreeMap::new(),
197 gin_index_columns: BTreeMap::new(),
198 };
199
200 if let Some(pk) = schema.primary_key() {
201 store.primary_index = Some(BTreeIndexStore::new(true));
202 store.pk_columns = pk
203 .columns()
204 .iter()
205 .filter_map(|c| schema.get_column_index(&c.name))
206 .collect();
207 }
208
209 for idx in schema.indices() {
210 let cols: Vec<usize> = idx
211 .columns()
212 .iter()
213 .filter_map(|c| schema.get_column_index(&c.name))
214 .collect();
215
216 if idx.get_index_type() == cynos_core::schema::IndexType::Gin {
218 if let Some(&col_idx) = cols.first() {
219 store.gin_indices.insert(idx.name().to_string(), GinIndex::new());
220 store.gin_index_columns.insert(idx.name().to_string(), col_idx);
221 }
222 } else {
223 store.secondary_indices.insert(
224 idx.name().to_string(),
225 BTreeIndexStore::new(idx.is_unique()),
226 );
227 store.index_columns.insert(idx.name().to_string(), cols);
228 }
229 }
230
231 store
232 }
233
    /// Returns the table schema this store was created with.
    pub fn schema(&self) -> &Table {
        &self.schema
    }
238
    /// Returns the number of rows currently stored.
    pub fn len(&self) -> usize {
        self.rows.len()
    }
243
    /// Returns `true` when no rows are stored.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }
248
249 pub fn insert(&mut self, row: Row) -> Result<RowId> {
251 let row_id = row.id();
252
253 if self.rows.contains_key(&row_id) {
254 return Err(Error::invalid_operation("Row ID already exists"));
255 }
256
257 let pk_value = if !self.pk_columns.is_empty() {
259 let pk = extract_key(&row, &self.pk_columns);
260 if let Some(ref pk_index) = self.primary_index {
261 if pk_index.contains_key(&pk) {
262 return Err(Error::UniqueConstraint {
263 column: "primary_key".into(),
264 value: pk,
265 });
266 }
267 }
268 Some(pk)
269 } else {
270 None
271 };
272
273 self.row_id_index
275 .add(Value::Int64(row_id as i64), row_id)
276 .map_err(|_| Error::invalid_operation("Failed to add to row ID index"))?;
277
278 if let (Some(ref mut pk_index), Some(pk)) = (&mut self.primary_index, pk_value.clone()) {
280 if pk_index.add(pk.clone(), row_id).is_err() {
281 self.row_id_index.remove(&Value::Int64(row_id as i64), Some(row_id));
282 return Err(Error::UniqueConstraint {
283 column: "primary_key".into(),
284 value: pk,
285 });
286 }
287 }
288
289 let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
292 for idx_name in &index_names {
293 let cols = &self.index_columns[idx_name];
294 let key = extract_key(&row, cols);
295 if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
296 if idx.add(key.clone(), row_id).is_err() {
297 self.rollback_insert(row_id, &row);
298 return Err(Error::UniqueConstraint {
299 column: idx_name.clone(),
300 value: key,
301 });
302 }
303 }
304 }
305
306 let gin_index_names: Vec<String> = self.gin_index_columns.keys().cloned().collect();
308 for idx_name in &gin_index_names {
309 let col_idx = self.gin_index_columns[idx_name];
310 if let Some(gin_idx) = self.gin_indices.get_mut(idx_name) {
311 if let Some(value) = row.get(col_idx) {
312 Self::index_jsonb_value(gin_idx, value, row_id);
313 }
314 }
315 }
316
317 self.rows.insert(row_id, Rc::new(row));
318 Ok(row_id)
319 }
320
321 fn rollback_insert(&mut self, row_id: RowId, row: &Row) {
322 self.row_id_index.remove(&Value::Int64(row_id as i64), Some(row_id));
323
324 if let Some(ref mut pk_index) = self.primary_index {
325 let pk_value = extract_key(row, &self.pk_columns);
326 pk_index.remove(&pk_value, Some(row_id));
327 }
328
329 let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
330 for idx_name in &index_names {
331 let cols = &self.index_columns[idx_name];
332 let key = extract_key(row, cols);
333 if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
334 idx.remove(&key, Some(row_id));
335 }
336 }
337 }
338
339 pub fn update(&mut self, row_id: RowId, new_row: Row) -> Result<()> {
341 let old_row = self.rows.get(&row_id).ok_or_else(|| {
342 Error::not_found(self.schema.name(), Value::Int64(row_id as i64))
343 })?.clone();
344
345 if !self.pk_columns.is_empty() {
347 let old_pk = extract_key(&old_row, &self.pk_columns);
348 let new_pk = extract_key(&new_row, &self.pk_columns);
349 if let Some(ref pk_index) = self.primary_index {
350 if old_pk != new_pk && pk_index.contains_key(&new_pk) {
351 return Err(Error::UniqueConstraint {
352 column: "primary_key".into(),
353 value: new_pk,
354 });
355 }
356 }
357 }
358
359 for (idx_name, cols) in &self.index_columns {
361 let old_key = extract_key(&old_row, cols);
362 let new_key = extract_key(&new_row, cols);
363 if let Some(idx) = self.secondary_indices.get(idx_name) {
364 if old_key != new_key && idx.contains_key(&new_key) {
365 return Err(Error::UniqueConstraint {
366 column: idx_name.clone(),
367 value: new_key,
368 });
369 }
370 }
371 }
372
373 if !self.pk_columns.is_empty() {
375 let old_pk = extract_key(&old_row, &self.pk_columns);
376 let new_pk = extract_key(&new_row, &self.pk_columns);
377 if let Some(ref mut pk_index) = self.primary_index {
378 if old_pk != new_pk {
379 pk_index.remove(&old_pk, Some(row_id));
380 let _ = pk_index.add(new_pk, row_id);
381 }
382 }
383 }
384
385 let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
387 for idx_name in &index_names {
388 let cols = &self.index_columns[idx_name];
389 let old_key = extract_key(&old_row, cols);
390 let new_key = extract_key(&new_row, cols);
391 if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
392 if old_key != new_key {
393 idx.remove(&old_key, Some(row_id));
394 let _ = idx.add(new_key, row_id);
395 }
396 }
397 }
398
399 self.rows.insert(row_id, Rc::new(new_row));
400 Ok(())
401 }
402
403 pub fn delete(&mut self, row_id: RowId) -> Result<Rc<Row>> {
405 let row = self.rows.remove(&row_id).ok_or_else(|| {
406 Error::not_found(self.schema.name(), Value::Int64(row_id as i64))
407 })?;
408
409 self.row_id_index.remove(&Value::Int64(row_id as i64), Some(row_id));
410
411 if !self.pk_columns.is_empty() {
412 let pk_value = extract_key(&row, &self.pk_columns);
413 if let Some(ref mut pk_index) = self.primary_index {
414 pk_index.remove(&pk_value, Some(row_id));
415 }
416 }
417
418 let index_names: Vec<String> = self.index_columns.keys().cloned().collect();
419 for idx_name in &index_names {
420 let cols = &self.index_columns[idx_name];
421 let key = extract_key(&row, cols);
422 if let Some(idx) = self.secondary_indices.get_mut(idx_name) {
423 idx.remove(&key, Some(row_id));
424 }
425 }
426
427 Ok(row)
428 }
429
    /// Returns a shared handle to the row stored under `row_id`, if any.
    pub fn get(&self, row_id: RowId) -> Option<Rc<Row>> {
        self.rows.get(&row_id).cloned()
    }
434
435 pub fn get_mut(&mut self, row_id: RowId) -> Option<&mut Row> {
438 self.rows.get_mut(&row_id).map(|rc| Rc::make_mut(rc))
439 }
440
441 pub fn scan(&self) -> impl Iterator<Item = Rc<Row>> + '_ {
443 self.rows.values().cloned()
444 }
445
    /// Returns the IDs of all stored rows, in the row map's iteration order.
    pub fn row_ids(&self) -> Vec<RowId> {
        self.rows.keys().copied().collect()
    }
450
451 pub fn get_by_pk(&self, pk_value: &Value) -> Vec<Rc<Row>> {
453 if let Some(ref pk_index) = self.primary_index {
454 pk_index
455 .get(pk_value)
456 .iter()
457 .filter_map(|&id| self.rows.get(&id).cloned())
458 .collect()
459 } else {
460 Vec::new()
461 }
462 }
463
464 pub fn find_row_id_by_pk(&self, row: &Row) -> Option<RowId> {
466 if let Some(ref pk_index) = self.primary_index {
467 let pk_value = extract_key(row, &self.pk_columns);
468 pk_index.get(&pk_value).first().copied()
469 } else {
470 None
471 }
472 }
473
474 pub fn pk_exists(&self, pk_value: &Value) -> bool {
476 if let Some(ref pk_index) = self.primary_index {
477 pk_index.contains_key(pk_value)
478 } else {
479 false
480 }
481 }
482
483 pub fn index_scan(&self, index_name: &str, range: Option<&KeyRange<Value>>) -> Vec<Rc<Row>> {
485 if let Some(idx) = self.secondary_indices.get(index_name) {
486 idx.get_range(range, false, None, 0)
487 .iter()
488 .filter_map(|&id| self.rows.get(&id).cloned())
489 .collect()
490 } else {
491 Vec::new()
492 }
493 }
494
495 pub fn index_scan_with_limit(
497 &self,
498 index_name: &str,
499 range: Option<&KeyRange<Value>>,
500 limit: Option<usize>,
501 ) -> Vec<Rc<Row>> {
502 self.index_scan_with_limit_offset(index_name, range, limit, 0)
503 }
504
    /// Forward-order index scan with `limit` and `offset` forwarded to the index lookup.
    ///
    /// Delegates to [`RowStore::index_scan_with_options`] with `reverse = false`.
    pub fn index_scan_with_limit_offset(
        &self,
        index_name: &str,
        range: Option<&KeyRange<Value>>,
        limit: Option<usize>,
        offset: usize,
    ) -> Vec<Rc<Row>> {
        self.index_scan_with_options(index_name, range, limit, offset, false)
    }
516
517 pub fn index_scan_with_options(
520 &self,
521 index_name: &str,
522 range: Option<&KeyRange<Value>>,
523 limit: Option<usize>,
524 offset: usize,
525 reverse: bool,
526 ) -> Vec<Rc<Row>> {
527 if let Some(idx) = self.secondary_indices.get(index_name) {
528 idx.get_range(range, reverse, limit, offset)
529 .iter()
530 .filter_map(|&id| self.rows.get(&id).cloned())
531 .collect()
532 } else {
533 Vec::new()
534 }
535 }
536
537 pub fn clear(&mut self) {
539 self.rows.clear();
540 self.row_id_index.clear();
541 if let Some(ref mut pk_index) = self.primary_index {
542 pk_index.clear();
543 }
544 for idx in self.secondary_indices.values_mut() {
545 idx.clear();
546 }
547 }
548
549 pub fn get_many(&self, row_ids: &[RowId]) -> Vec<Option<Rc<Row>>> {
551 row_ids.iter().map(|&id| self.rows.get(&id).cloned()).collect()
552 }
553
554 pub fn insert_or_replace(&mut self, row: Row) -> Result<(RowId, bool)> {
557 if let Some(existing_row_id) = self.find_row_id_by_pk(&row) {
559 let updated_row = Row::new(existing_row_id, row.values().to_vec());
561 self.update(existing_row_id, updated_row)?;
562 Ok((existing_row_id, true))
563 } else {
564 let row_id = self.insert(row)?;
566 Ok((row_id, false))
567 }
568 }
569
570 pub fn secondary_index_contains(&self, index_name: &str, key: &Value) -> bool {
572 if let Some(idx) = self.secondary_indices.get(index_name) {
573 idx.contains_key(key)
574 } else {
575 false
576 }
577 }
578
    /// Returns the column positions that make up the primary key (empty when there is none).
    pub fn pk_columns(&self) -> &[usize] {
        &self.pk_columns
    }
583
584 pub fn extract_pk(&self, row: &Row) -> Option<Value> {
586 if self.pk_columns.is_empty() {
587 None
588 } else {
589 Some(extract_key(row, &self.pk_columns))
590 }
591 }
592
593 pub fn insert_with_delta(&mut self, row: Row) -> Result<Delta<Row>> {
595 let row_clone = row.clone();
596 self.insert(row)?;
597 Ok(Delta::insert(row_clone))
598 }
599
600 pub fn delete_with_delta(&mut self, row_id: RowId) -> Result<Delta<Row>> {
602 let row = self.delete(row_id)?;
603 Ok(Delta::delete((*row).clone()))
604 }
605
606 pub fn update_with_delta(&mut self, row_id: RowId, new_row: Row) -> Result<(Delta<Row>, Delta<Row>)> {
608 let old_row = self.rows.get(&row_id).ok_or_else(|| {
609 Error::not_found(self.schema.name(), Value::Int64(row_id as i64))
610 })?.clone();
611 let new_row_clone = new_row.clone();
612 self.update(row_id, new_row)?;
613 Ok((Delta::delete((*old_row).clone()), Delta::insert(new_row_clone)))
614 }
615
616 fn index_jsonb_value(gin_idx: &mut GinIndex, value: &Value, row_id: RowId) {
620 if let Value::Jsonb(jsonb) = value {
621 if let Ok(json_str) = core::str::from_utf8(&jsonb.0) {
623 Self::extract_and_index_json(gin_idx, json_str, row_id);
624 }
625 }
626 }
627
628 fn extract_and_index_json(gin_idx: &mut GinIndex, json_str: &str, row_id: RowId) {
630 let trimmed = json_str.trim();
632 if !trimmed.starts_with('{') || !trimmed.ends_with('}') {
633 return;
634 }
635
636 let inner = &trimmed[1..trimmed.len() - 1];
637 let mut depth = 0;
638 let mut in_string = false;
639 let mut escape = false;
640 let mut start = 0;
641
642 for (i, c) in inner.char_indices() {
643 if escape {
644 escape = false;
645 continue;
646 }
647 match c {
648 '\\' if in_string => escape = true,
649 '"' => in_string = !in_string,
650 '{' | '[' if !in_string => depth += 1,
651 '}' | ']' if !in_string => depth -= 1,
652 ',' if !in_string && depth == 0 => {
653 Self::parse_and_index_pair(gin_idx, &inner[start..i], row_id);
654 start = i + 1;
655 }
656 _ => {}
657 }
658 }
659 if start < inner.len() {
661 Self::parse_and_index_pair(gin_idx, &inner[start..], row_id);
662 }
663 }
664
665 fn parse_and_index_pair(gin_idx: &mut GinIndex, pair: &str, row_id: RowId) {
667 let pair = pair.trim();
668 if let Some(colon_pos) = pair.find(':') {
669 let key = pair[..colon_pos].trim().trim_matches('"');
670 let value = pair[colon_pos + 1..].trim();
671
672 gin_idx.add_key(key.into(), row_id);
674
675 let value_str = if value.starts_with('"') && value.ends_with('"') {
677 &value[1..value.len() - 1]
678 } else {
679 value
680 };
681 gin_idx.add_key_value(key.into(), value_str.into(), row_id);
682 }
683 }
684
685 #[allow(dead_code)]
687 fn remove_jsonb_from_gin(gin_idx: &mut GinIndex, value: &Value, row_id: RowId) {
688 if let Value::Jsonb(jsonb) = value {
689 if let Ok(json_str) = core::str::from_utf8(&jsonb.0) {
690 Self::extract_and_remove_json(gin_idx, json_str, row_id);
691 }
692 }
693 }
694
695 fn extract_and_remove_json(gin_idx: &mut GinIndex, json_str: &str, row_id: RowId) {
697 let trimmed = json_str.trim();
698 if !trimmed.starts_with('{') || !trimmed.ends_with('}') {
699 return;
700 }
701
702 let inner = &trimmed[1..trimmed.len() - 1];
703 let mut depth = 0;
704 let mut in_string = false;
705 let mut escape = false;
706 let mut start = 0;
707
708 for (i, c) in inner.char_indices() {
709 if escape {
710 escape = false;
711 continue;
712 }
713 match c {
714 '\\' if in_string => escape = true,
715 '"' => in_string = !in_string,
716 '{' | '[' if !in_string => depth += 1,
717 '}' | ']' if !in_string => depth -= 1,
718 ',' if !in_string && depth == 0 => {
719 Self::parse_and_remove_pair(gin_idx, &inner[start..i], row_id);
720 start = i + 1;
721 }
722 _ => {}
723 }
724 }
725 if start < inner.len() {
726 Self::parse_and_remove_pair(gin_idx, &inner[start..], row_id);
727 }
728 }
729
730 fn parse_and_remove_pair(gin_idx: &mut GinIndex, pair: &str, row_id: RowId) {
732 let pair = pair.trim();
733 if let Some(colon_pos) = pair.find(':') {
734 let key = pair[..colon_pos].trim().trim_matches('"');
735 let value = pair[colon_pos + 1..].trim();
736
737 gin_idx.remove_key(key, row_id);
738
739 let value_str = if value.starts_with('"') && value.ends_with('"') {
740 &value[1..value.len() - 1]
741 } else {
742 value
743 };
744 gin_idx.remove_key_value(key, value_str, row_id);
745 }
746 }
747
748 pub fn gin_index_get_by_key_value(&self, index_name: &str, key: &str, value: &str) -> Vec<Rc<Row>> {
750 if let Some(gin_idx) = self.gin_indices.get(index_name) {
751 let row_ids = gin_idx.get_by_key_value(key, value);
752 row_ids.iter().filter_map(|&id| self.rows.get(&id).cloned()).collect()
753 } else {
754 Vec::new()
755 }
756 }
757
758 pub fn gin_index_get_by_key(&self, index_name: &str, key: &str) -> Vec<Rc<Row>> {
760 if let Some(gin_idx) = self.gin_indices.get(index_name) {
761 let row_ids = gin_idx.get_by_key(key);
762 row_ids.iter().filter_map(|&id| self.rows.get(&id).cloned()).collect()
763 } else {
764 Vec::new()
765 }
766 }
767
768 pub fn gin_index_get_by_key_values_all(&self, index_name: &str, pairs: &[(&str, &str)]) -> Vec<Rc<Row>> {
771 if let Some(gin_idx) = self.gin_indices.get(index_name) {
772 let row_ids = gin_idx.get_by_key_values_all(pairs);
773 row_ids.iter().filter_map(|&id| self.rows.get(&id).cloned()).collect()
774 } else {
775 Vec::new()
776 }
777 }
778}
779
// Unit tests for `RowStore`: basic CRUD, constraint enforcement, index
// maintenance and delta generation.
#[cfg(test)]
mod tests {
    use super::*;
    use cynos_core::schema::TableBuilder;
    use cynos_core::DataType;
    use alloc::vec;

    // Table `test(id Int64, name String)` with primary key `id`.
    fn test_schema() -> Table {
        TableBuilder::new("test")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("name", DataType::String)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .build()
            .unwrap()
    }

    // Table `test(id Int64, value Int64)` with primary key `id` and a
    // non-unique index `idx_value` on `value`.
    fn test_schema_with_index() -> Table {
        TableBuilder::new("test")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("value", DataType::Int64)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .add_index("idx_value", &["value"], false)
            .unwrap()
            .build()
            .unwrap()
    }

    #[test]
    fn test_row_store_insert() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        assert!(store.insert(row).is_ok());
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_row_store_get() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row).unwrap();
        let retrieved = store.get(1);
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().get(1), Some(&Value::String("Alice".into())));
    }

    #[test]
    fn test_row_store_update() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row).unwrap();
        let new_row = Row::new(1, vec![Value::Int64(1), Value::String("Bob".into())]);
        assert!(store.update(1, new_row).is_ok());
        let retrieved = store.get(1);
        assert_eq!(retrieved.unwrap().get(1), Some(&Value::String("Bob".into())));
    }

    #[test]
    fn test_row_store_delete() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row).unwrap();
        assert!(store.delete(1).is_ok());
        assert_eq!(store.len(), 0);
        assert!(store.get(1).is_none());
    }

    // Two different row IDs with the same primary key: the second insert must fail.
    #[test]
    fn test_row_store_pk_uniqueness() {
        let mut store = RowStore::new(test_schema());
        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        let row2 = Row::new(2, vec![Value::Int64(1), Value::String("Bob".into())]);
        store.insert(row1).unwrap();
        assert!(store.insert(row2).is_err());
    }

    #[test]
    fn test_row_store_scan() {
        let mut store = RowStore::new(test_schema());
        store.insert(Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())])).unwrap();
        store.insert(Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())])).unwrap();
        let rows: Vec<_> = store.scan().collect();
        assert_eq!(rows.len(), 2);
    }

    // The secondary index must gain the entry on insert and lose it on delete.
    #[test]
    fn test_row_store_index_maintenance() {
        let mut store = RowStore::new(test_schema_with_index());
        let row = Row::new(1, vec![Value::Int64(1), Value::Int64(100)]);
        store.insert(row).unwrap();
        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
        assert_eq!(results.len(), 1);
        store.delete(1).unwrap();
        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_row_store_clear() {
        let mut store = RowStore::new(test_schema());
        store.insert(Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())])).unwrap();
        store.insert(Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())])).unwrap();
        store.clear();
        assert!(store.is_empty());
    }

    // Table `test(id1 String, id2 Int64, name String)` with composite primary key `(id1, id2)`.
    fn test_schema_composite_pk() -> Table {
        TableBuilder::new("test")
            .unwrap()
            .add_column("id1", DataType::String)
            .unwrap()
            .add_column("id2", DataType::Int64)
            .unwrap()
            .add_column("name", DataType::String)
            .unwrap()
            .add_primary_key(&["id1", "id2"], false)
            .unwrap()
            .build()
            .unwrap()
    }

    // Table `test(id Int64, email String)` with primary key `id` and a
    // unique index `idx_email` on `email`.
    fn test_schema_with_unique_index() -> Table {
        TableBuilder::new("test")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("email", DataType::String)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            // `true` marks the index as unique.
            .add_index("idx_email", &["email"], true)
            .unwrap()
            .build()
            .unwrap()
    }

    #[test]
    fn test_composite_primary_key() {
        let mut store = RowStore::new(test_schema_composite_pk());

        let row1 = Row::new(1, vec![
            Value::String("pk1".into()),
            Value::Int64(100),
            Value::String("Name1".into())
        ]);
        assert!(store.insert(row1).is_ok());

        // Same `id1`, different `id2`: a distinct composite key, so this succeeds.
        let row2 = Row::new(2, vec![
            Value::String("pk1".into()),
            Value::Int64(200),
            Value::String("Name2".into())
        ]);
        assert!(store.insert(row2).is_ok());

        // Same `id1` and `id2` as row 1: duplicate composite key, so this fails.
        let row3 = Row::new(3, vec![
            Value::String("pk1".into()),
            Value::Int64(100),
            Value::String("Name3".into())
        ]);
        assert!(store.insert(row3).is_err());
    }

    #[test]
    fn test_insert_or_replace_new() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);

        let (row_id, replaced) = store.insert_or_replace(row).unwrap();
        assert_eq!(row_id, 1);
        assert!(!replaced);
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_insert_or_replace_existing() {
        let mut store = RowStore::new(test_schema());

        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row1).unwrap();

        // Different row ID but the same primary key: replaces row 1 in place.
        let row2 = Row::new(2, vec![Value::Int64(1), Value::String("Updated".into())]);
        let (row_id, replaced) = store.insert_or_replace(row2).unwrap();

        // The existing row ID is kept.
        assert_eq!(row_id, 1);
        assert!(replaced);
        assert_eq!(store.len(), 1);

        let stored = store.get(1).unwrap();
        assert_eq!(stored.get(1), Some(&Value::String("Updated".into())));
    }

    #[test]
    fn test_update_pk_violation() {
        let mut store = RowStore::new(test_schema());

        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        let row2 = Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())]);
        store.insert(row1).unwrap();
        store.insert(row2).unwrap();

        // Changing row 2's primary key to row 1's key must be rejected.
        let row2_updated = Row::new(2, vec![Value::Int64(1), Value::String("Bob Updated".into())]);
        let result = store.update(2, row2_updated);
        assert!(result.is_err());
    }

    #[test]
    fn test_unique_index_violation() {
        let mut store = RowStore::new(test_schema_with_unique_index());

        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("alice@test.com".into())]);
        store.insert(row1).unwrap();

        // Same email as row 1: rejected by the unique index.
        let row2 = Row::new(2, vec![Value::Int64(2), Value::String("alice@test.com".into())]);
        let result = store.insert(row2);
        assert!(result.is_err());
    }

    #[test]
    fn test_unique_index_update_violation() {
        let mut store = RowStore::new(test_schema_with_unique_index());

        let row1 = Row::new(1, vec![Value::Int64(1), Value::String("alice@test.com".into())]);
        let row2 = Row::new(2, vec![Value::Int64(2), Value::String("bob@test.com".into())]);
        store.insert(row1).unwrap();
        store.insert(row2).unwrap();

        // Updating row 2 to row 1's email must be rejected by the unique index.
        let row2_updated = Row::new(2, vec![Value::Int64(2), Value::String("alice@test.com".into())]);
        let result = store.update(2, row2_updated);
        assert!(result.is_err());
    }

    #[test]
    fn test_delete_then_insert_same_pk() {
        let mut store = RowStore::new(test_schema());

        let row1 = Row::new(1, vec![Value::Int64(100), Value::String("Alice".into())]);
        store.insert(row1).unwrap();
        store.delete(1).unwrap();

        // The primary key is free again after the delete.
        let row2 = Row::new(2, vec![Value::Int64(100), Value::String("Bob".into())]);
        assert!(store.insert(row2).is_ok());
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_index_update_maintenance() {
        let mut store = RowStore::new(test_schema_with_index());

        let row = Row::new(1, vec![Value::Int64(1), Value::Int64(100)]);
        store.insert(row).unwrap();

        // The row is reachable through its original indexed value.
        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
        assert_eq!(results.len(), 1);

        // Change the indexed value.
        let updated = Row::new(1, vec![Value::Int64(1), Value::Int64(200)]);
        store.update(1, updated).unwrap();

        // The old key no longer matches.
        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(100))));
        assert_eq!(results.len(), 0);

        // The new key matches.
        let results = store.index_scan("idx_value", Some(&KeyRange::only(Value::Int64(200))));
        assert_eq!(results.len(), 1);
    }

    // Delta-producing variants of insert, delete and update.
    #[test]
    fn test_insert_with_delta() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        let delta = store.insert_with_delta(row.clone()).unwrap();

        assert_eq!(delta.diff(), 1);
        assert_eq!(delta.data(), &row);
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_delete_with_delta() {
        let mut store = RowStore::new(test_schema());
        let row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(row.clone()).unwrap();

        let delta = store.delete_with_delta(1).unwrap();
        assert_eq!(delta.diff(), -1);
        assert_eq!(delta.data(), &row);
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn test_update_with_delta() {
        let mut store = RowStore::new(test_schema());
        let old_row = Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())]);
        store.insert(old_row.clone()).unwrap();

        let new_row = Row::new(1, vec![Value::Int64(1), Value::String("Bob".into())]);
        let (delete_delta, insert_delta) = store.update_with_delta(1, new_row.clone()).unwrap();

        assert_eq!(delete_delta.diff(), -1);
        assert_eq!(delete_delta.data(), &old_row);
        assert_eq!(insert_delta.diff(), 1);
        assert_eq!(insert_delta.data(), &new_row);
        assert_eq!(store.len(), 1);
    }
}