1use std::collections::{BTreeMap, HashMap, HashSet};
20use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use super::entity::{CrossRef, EntityData, EntityId, EntityKind, RefType, UnifiedEntity};
24use super::memtable::Memtable;
25use super::metadata::{Metadata, MetadataStorage};
26use crate::storage::primitives::bloom::BloomFilter;
27use crate::storage::query::value_compare::partial_compare_values;
28use crate::storage::schema::{value_to_canonical_key, CanonicalKey, Value};
29
/// Numeric identifier of a segment (passed to `GrowingSegment::new`).
pub type SegmentId = u64;
32
/// Lifecycle phase of a segment.
///
/// `GrowingSegment::seal` moves a segment from `Growing` through the transient
/// `Sealing` phase to `Sealed`; `Flushed` and `Archived` are later phases that
/// are, like `Sealed`, immutable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentState {
    /// Accepts writes.
    Growing,
    /// A seal is in progress; the segment is not queryable.
    Sealing,
    /// Sealed and immutable.
    Sealed,
    /// Immutable; presumably persisted to storage — TODO confirm.
    Flushed,
    /// Immutable; presumably moved to archival storage — TODO confirm.
    Archived,
}

impl SegmentState {
    /// `true` only while the segment is `Growing`.
    pub fn is_writable(&self) -> bool {
        *self == Self::Growing
    }

    /// `true` for every state except the transient `Sealing` phase.
    pub fn is_queryable(&self) -> bool {
        *self != Self::Sealing
    }

    /// `true` for `Sealed`, `Flushed` and `Archived`.
    pub fn is_immutable(&self) -> bool {
        match self {
            Self::Sealed | Self::Flushed | Self::Archived => true,
            Self::Growing | Self::Sealing => false,
        }
    }
}
64
/// Thresholds and build options that govern a segment's lifecycle.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Entity count at which a segment should be sealed.
    pub max_entities: usize,
    /// Estimated memory footprint (bytes) at which a segment should be sealed.
    pub max_bytes: usize,
    /// Age in seconds after which a segment should be sealed.
    pub max_age_secs: u64,
    /// Whether to build a vector index (consumer not visible in this module).
    pub build_vector_index: bool,
    /// Whether to build a graph index (consumer not visible in this module).
    pub build_graph_index: bool,
    /// Compression level to use (consumer not visible in this module).
    pub compression_level: u8,
}

impl Default for SegmentConfig {
    /// 100 000 entities, 256 MiB, one hour, both indexes enabled, level 6.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const ONE_HOUR_SECS: u64 = 60 * 60;

        Self {
            max_entities: 100_000,
            max_bytes: 256 * MIB,
            max_age_secs: ONE_HOUR_SECS,
            build_vector_index: true,
            build_graph_index: true,
            compression_level: 6,
        }
    }
}
94
/// Counters reported by `UnifiedSegment::stats`.
#[derive(Debug, Clone, Default)]
pub struct SegmentStats {
    /// Number of entities counted by the segment.
    pub entity_count: usize,
    /// Number of tombstoned (deleted) entity ids.
    pub deleted_count: usize,
    /// Estimated memory footprint in bytes.
    pub memory_bytes: usize,
    /// Number of vector entities.
    pub vector_count: usize,
    /// Number of graph nodes.
    pub node_count: usize,
    /// Number of graph edges.
    pub edge_count: usize,
    /// Number of table rows; `GrowingSegment` also counts time-series points and
    /// queue messages here.
    pub row_count: usize,
    /// Total number of cross references held by the counted entities.
    pub cross_ref_count: usize,
}
115
116#[derive(Debug, Clone)]
118pub enum SegmentError {
119 NotWritable,
121 NotFound(EntityId),
123 AlreadyExists(EntityId),
125 Full,
127 InvalidState(SegmentState),
129 Internal(String),
131}
132
133impl std::fmt::Display for SegmentError {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 match self {
136 Self::NotWritable => write!(f, "segment is not writable"),
137 Self::NotFound(id) => write!(f, "entity not found: {}", id),
138 Self::AlreadyExists(id) => write!(f, "entity already exists: {}", id),
139 Self::Full => write!(f, "segment is full"),
140 Self::InvalidState(state) => write!(f, "invalid operation for state: {:?}", state),
141 Self::Internal(msg) => write!(f, "internal error: {}", msg),
142 }
143 }
144}
145
146impl std::error::Error for SegmentError {}
147
/// Whole seconds elapsed since the Unix epoch, according to the system clock.
///
/// A clock set before the epoch yields `0` instead of an error.
fn current_unix_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
154
/// Maximum number of min/max intervals requested per column when
/// `rebuild_sealed_col_zones` builds the sealed multi-interval zones
/// (passed to `build_minmax_multi_intervals`).
const SEALED_MULTI_ZONE_MAX_INTERVALS: usize = 4;
156
157#[derive(Debug, Clone)]
158struct UpdateIndexSnapshot {
159 pk_column_name: Option<String>,
160 pk_value: Option<Value>,
161 pk_index_key: Option<(String, String)>,
162 cross_refs: Vec<CrossRef>,
163}
164
165impl UpdateIndexSnapshot {
166 fn from_entity(entity: &UnifiedEntity) -> Self {
167 let (pk_column_name, pk_value) = match &entity.data {
168 EntityData::Row(row) => (
169 row.schema
170 .as_deref()
171 .and_then(|schema| schema.first().cloned()),
172 row.columns.first().cloned(),
173 ),
174 _ => (None, None),
175 };
176 let pk_index_key = pk_value
177 .as_ref()
178 .map(|value| (entity.kind.collection().to_string(), format!("{:?}", value)));
179 Self {
180 pk_column_name,
181 pk_value,
182 pk_index_key,
183 cross_refs: entity.cross_refs().to_vec(),
184 }
185 }
186}
187
/// Common interface of a segment: a container of [`UnifiedEntity`] values for a
/// single collection, with its own lifecycle ([`SegmentState`]).
///
/// Implementors must be `Send + Sync`.
pub trait UnifiedSegment: Send + Sync {
    /// Identifier of this segment.
    fn id(&self) -> SegmentId;

    /// Current lifecycle state.
    fn state(&self) -> SegmentState;

    /// Name of the collection this segment belongs to.
    fn collection(&self) -> &str;

    /// Snapshot of the segment's counters.
    fn stats(&self) -> SegmentStats;

    /// Number of entities in the segment.
    fn entity_count(&self) -> usize;

    /// Whether an entity with `id` is present in the segment.
    fn contains(&self, id: EntityId) -> bool;

    /// Shared reference to the entity with `id`, if present.
    fn get(&self, id: EntityId) -> Option<&UnifiedEntity>;

    /// Mutable reference to the entity with `id`, if present and mutable.
    fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity>;

    /// Adds a new entity and returns its id.
    fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError>;

    /// Replaces the stored entity that has the same id as `entity`.
    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError>;

    /// Replaces an entity of which only `modified_columns` changed.
    ///
    /// The default implementation ignores the column list and delegates to
    /// [`UnifiedSegment::update`]; implementors can use the list to avoid
    /// unnecessary reindexing.
    fn update_hot(
        &mut self,
        entity: UnifiedEntity,
        modified_columns: &[String],
    ) -> Result<(), SegmentError> {
        let _ = modified_columns;
        self.update(entity)
    }

    /// Deletes the entity with `id`; the flag reports whether an entity was
    /// deleted (exact semantics are up to the implementation — confirm per impl).
    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError>;

    /// Metadata attached to the entity with `id`, if present.
    fn get_metadata(&self, id: EntityId) -> Option<Metadata>;

    /// Replaces the metadata attached to the entity with `id`.
    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError>;

    /// Transitions the segment out of its writable state.
    fn seal(&mut self) -> Result<(), SegmentError>;

    /// Whether the segment has crossed one of the thresholds in `config`.
    fn should_seal(&self, config: &SegmentConfig) -> bool;

    /// Boxed iterator over the segment's entities.
    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;

    /// Boxed iterator over the entities matching `kind_filter` (format of the
    /// filter string is defined by the implementation — confirm).
    fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;

    /// Ids of entities whose metadata matches `filters` (how multiple filters
    /// combine is defined by the implementation — confirm).
    fn filter_metadata(
        &self,
        filters: &[(String, super::metadata::MetadataFilter)],
    ) -> Vec<EntityId>;
}
259
260#[derive(Debug, Clone)]
267pub struct ColZone {
268 pub min: Value,
269 pub max: Value,
270 min_key: Option<CanonicalKey>,
271 max_key: Option<CanonicalKey>,
272}
273
274impl ColZone {
275 fn new(v: Value) -> Self {
276 Self {
277 min_key: value_to_canonical_key(&v),
278 max_key: value_to_canonical_key(&v),
279 min: v.clone(),
280 max: v,
281 }
282 }
283
284 fn with_bounds(min: Value, max: Value) -> Self {
285 Self {
286 min_key: value_to_canonical_key(&min),
287 max_key: value_to_canonical_key(&max),
288 min,
289 max,
290 }
291 }
292
293 fn update(&mut self, v: &Value) {
294 if compare_zone_values(v, None, &self.min, self.min_key.as_ref())
295 .map(|o| o == std::cmp::Ordering::Less)
296 .unwrap_or(false)
297 {
298 self.min = v.clone();
299 self.min_key = value_to_canonical_key(v);
300 }
301 if compare_zone_values(v, None, &self.max, self.max_key.as_ref())
302 .map(|o| o == std::cmp::Ordering::Greater)
303 .unwrap_or(false)
304 {
305 self.max = v.clone();
306 self.max_key = value_to_canonical_key(v);
307 }
308 }
309}
310
311#[derive(Debug, Clone, Default)]
312pub struct MultiColZone {
313 pub intervals: Vec<ColZone>,
314}
315
316impl MultiColZone {
317 fn can_skip(&self, pred: &ZoneColPred<'_>) -> bool {
318 !self.intervals.is_empty() && self.intervals.iter().all(|zone| pred.can_skip(zone))
319 }
320}
321
322fn compare_zone_values(
323 left: &Value,
324 left_key: Option<&CanonicalKey>,
325 right: &Value,
326 right_key: Option<&CanonicalKey>,
327) -> Option<std::cmp::Ordering> {
328 partial_compare_values(left, right).or_else(|| {
329 let left_key = left_key.cloned().or_else(|| value_to_canonical_key(left))?;
330 let right_key = right_key
331 .cloned()
332 .or_else(|| value_to_canonical_key(right))?;
333 (left_key.family() == right_key.family()).then(|| left_key.cmp(&right_key))
334 })
335}
336
/// Fieldless counterpart of the [`ZoneColPred`] variants: the comparison
/// operator alone, without the operand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZoneColPredKind {
    /// `column = value`
    Eq,
    /// `column > value`
    Gt,
    /// `column >= value`
    Gte,
    /// `column < value`
    Lt,
    /// `column <= value`
    Lte,
}
346
347#[derive(Debug, Clone)]
349pub enum ZoneColPred<'a> {
350 Eq(&'a Value),
351 Gt(&'a Value),
352 Gte(&'a Value),
353 Lt(&'a Value),
354 Lte(&'a Value),
355}
356
357impl<'a> ZoneColPred<'a> {
358 pub fn can_skip(&self, zone: &ColZone) -> bool {
360 match self {
361 ZoneColPred::Eq(val) => {
363 compare_zone_values(val, None, &zone.min, zone.min_key.as_ref())
364 .map(|o| o == std::cmp::Ordering::Less)
365 .unwrap_or(false)
366 || compare_zone_values(val, None, &zone.max, zone.max_key.as_ref())
367 .map(|o| o == std::cmp::Ordering::Greater)
368 .unwrap_or(false)
369 }
370 ZoneColPred::Gt(val) => {
372 compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
373 .map(|o| o != std::cmp::Ordering::Greater)
374 .unwrap_or(false)
375 }
376 ZoneColPred::Gte(val) => {
378 compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
379 .map(|o| o == std::cmp::Ordering::Less)
380 .unwrap_or(false)
381 }
382 ZoneColPred::Lt(val) => {
384 compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
385 .map(|o| o != std::cmp::Ordering::Less)
386 .unwrap_or(false)
387 }
388 ZoneColPred::Lte(val) => {
390 compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
391 .map(|o| o == std::cmp::Ordering::Greater)
392 .unwrap_or(false)
393 }
394 }
395 }
396}
397
/// A mutable, in-memory segment that accepts writes until it is sealed.
///
/// Entities are kept in one of two layouts:
/// * map layout (`use_flat == false`): every entity lives in `entities`;
/// * flat layout (`use_flat == true`, enabled by `bulk_insert` on an empty
///   segment): an entity whose raw id equals `base_entity_id + position` is
///   appended to `flat_entities`; every other entity (including those added via
///   `insert`) lives in `entities`.
///
/// Deleting a flat entity leaves it in `flat_entities` and records its id in
/// `deleted`; deleting a map entity removes it from `entities` and also records
/// its id in `deleted`.
pub struct GrowingSegment {
    /// Segment identifier.
    id: SegmentId,
    /// Collection this segment belongs to.
    collection: String,
    /// Current lifecycle state.
    state: SegmentState,
    /// Creation time, in seconds since the Unix epoch.
    created_at: u64,
    /// Time of the last recorded write, in seconds since the Unix epoch.
    last_write_at: u64,

    /// Map-layout storage, and overflow storage in the flat layout.
    entities: HashMap<EntityId, UnifiedEntity>,
    /// Flat-layout storage: `flat_entities[i].id.raw() == base_entity_id + i`.
    flat_entities: Vec<UnifiedEntity>,
    /// Raw id of `flat_entities[0]`.
    base_entity_id: u64,
    /// Whether the flat layout is in use.
    use_flat: bool,
    /// Tombstones of deleted entity ids.
    deleted: HashSet<EntityId>,
    /// Per-entity metadata.
    metadata: MetadataStorage,

    /// `(collection, Debug-formatted first column value)` -> id, for row entities.
    pk_index: BTreeMap<(String, String), EntityId>,
    /// `EntityKind::storage_type()` string -> ids of entities of that kind.
    kind_index: HashMap<String, HashSet<EntityId>>,
    /// Cross references keyed by source: `source -> [(target, ref type)]`.
    cross_ref_forward: HashMap<EntityId, Vec<(EntityId, RefType)>>,
    /// Cross references keyed by target: `target -> [(source, ref type)]`.
    cross_ref_reverse: HashMap<EntityId, Vec<(EntityId, RefType)>>,

    /// Bloom filter fed with little-endian raw ids and Debug-formatted pk strings
    /// (by `index_entity` and pk changes in `reindex_for_update`).
    bloom: BloomFilter,

    /// Write buffer; cleared when the segment is sealed.
    memtable: Memtable,

    /// Running single-interval min/max per column; only ever widened.
    col_zones: HashMap<String, ColZone>,
    /// Multi-interval zones rebuilt from live rows when the segment is sealed.
    sealed_col_zones: HashMap<String, MultiColZone>,

    /// Source of per-entity `sequence_id`s.
    sequence: AtomicU64,
    /// Estimated memory footprint in bytes (see `estimate_entity_size`).
    memory_bytes: AtomicU64,

    /// `flat_entities.len()` as of the end of the last flat-mode `bulk_insert`,
    /// stored with `Release` ordering — presumably for readers of the flat
    /// storage elsewhere in the crate; confirm against those readers.
    pub(crate) published_flat_len: AtomicUsize,
}
458
459impl GrowingSegment {
460 #[inline]
463 pub fn for_each_fast<F>(&self, mut f: F) -> bool
464 where
465 F: FnMut(&UnifiedEntity) -> bool,
466 {
467 if self.use_flat {
468 if self.deleted.is_empty() {
470 for entity in &self.flat_entities {
471 if !f(entity) {
472 return false;
473 }
474 }
475 for entity in self.entities.values() {
478 if !f(entity) {
479 return false;
480 }
481 }
482 } else {
483 for entity in &self.flat_entities {
484 if self.deleted.contains(&entity.id) {
485 continue;
486 }
487 if !f(entity) {
488 return false;
489 }
490 }
491 for entity in self.entities.values() {
492 if self.deleted.contains(&entity.id) {
493 continue;
494 }
495 if !f(entity) {
496 return false;
497 }
498 }
499 }
500 } else {
501 if self.deleted.is_empty() {
503 for entity in self.entities.values() {
504 if !f(entity) {
505 return false;
506 }
507 }
508 } else {
509 for entity in self.entities.values() {
510 if self.deleted.contains(&entity.id) {
511 continue;
512 }
513 if !f(entity) {
514 return false;
515 }
516 }
517 }
518 }
519 true
520 }
521
522 pub fn new(id: SegmentId, collection: impl Into<String>) -> Self {
524 let now = current_unix_secs();
525
526 Self {
527 id,
528 collection: collection.into(),
529 state: SegmentState::Growing,
530 created_at: now,
531 last_write_at: now,
532 entities: HashMap::new(),
533 flat_entities: Vec::new(),
534 base_entity_id: 0,
535 use_flat: false,
536 deleted: HashSet::new(),
537 metadata: MetadataStorage::new(),
538 pk_index: BTreeMap::new(),
539 kind_index: HashMap::new(),
540 cross_ref_forward: HashMap::new(),
541 cross_ref_reverse: HashMap::new(),
542 bloom: BloomFilter::with_capacity(100_000, 0.01),
543 memtable: Memtable::new(),
544 col_zones: HashMap::new(),
545 sealed_col_zones: HashMap::new(),
546 sequence: AtomicU64::new(0),
547 memory_bytes: AtomicU64::new(0),
548 published_flat_len: AtomicUsize::new(0),
549 }
550 }
551
552 fn next_sequence(&self) -> u64 {
554 self.sequence.fetch_add(1, Ordering::SeqCst)
555 }
556
557 fn has_live_entity(&self, id: EntityId) -> bool {
558 if self.deleted.contains(&id) {
559 return false;
560 }
561 if self.use_flat {
562 let raw = id.raw();
563 if raw >= self.base_entity_id {
564 let idx = (raw - self.base_entity_id) as usize;
565 if self
566 .flat_entities
567 .get(idx)
568 .is_some_and(|entity| entity.id == id)
569 {
570 return true;
571 }
572 }
573 self.entities.contains_key(&id)
576 } else {
577 self.entities.contains_key(&id)
578 }
579 }
580
581 fn update_existing_entity_in_place(
582 &mut self,
583 entity: &UnifiedEntity,
584 ) -> Result<UpdateIndexSnapshot, SegmentError> {
585 if self.use_flat {
586 let raw = entity.id.raw();
587 if raw < self.base_entity_id {
588 return Err(SegmentError::NotFound(entity.id));
589 }
590 let idx = (raw - self.base_entity_id) as usize;
591 let Some(slot) = self.flat_entities.get_mut(idx) else {
592 return Err(SegmentError::NotFound(entity.id));
593 };
594 if slot.id != entity.id {
595 return Err(SegmentError::NotFound(entity.id));
596 }
597 let snapshot = UpdateIndexSnapshot::from_entity(slot);
598 slot.clone_from(entity);
599 Ok(snapshot)
600 } else {
601 let Some(slot) = self.entities.get_mut(&entity.id) else {
602 return Err(SegmentError::NotFound(entity.id));
603 };
604 let snapshot = UpdateIndexSnapshot::from_entity(slot);
605 slot.clone_from(entity);
606 Ok(snapshot)
607 }
608 }
609
610 fn apply_hot_update_with_metadata(
611 &mut self,
612 entity: &UnifiedEntity,
613 modified_columns: &[String],
614 metadata: Option<&Metadata>,
615 ) -> Result<(), SegmentError> {
616 let old = self.update_existing_entity_in_place(entity)?;
617 self.reindex_for_update(&old, entity, Some(modified_columns));
618 self.update_col_zones_from_entity(entity);
619 if let Some(metadata) = metadata {
620 self.metadata.set_all(entity.id, metadata);
621 }
622 Ok(())
623 }
624
625 fn apply_update_with_metadata(
626 &mut self,
627 entity: &UnifiedEntity,
628 metadata: Option<&Metadata>,
629 ) -> Result<(), SegmentError> {
630 let old = self.update_existing_entity_in_place(entity)?;
631 self.reindex_for_update(&old, entity, None);
632 self.update_col_zones_from_entity(entity);
633 if let Some(metadata) = metadata {
634 self.metadata.set_all(entity.id, metadata);
635 }
636 Ok(())
637 }
638
639 pub fn update_hot_batch_with_metadata<'a, I>(&mut self, items: I) -> Result<(), SegmentError>
640 where
641 I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
642 {
643 if !self.state.is_writable() {
644 return Err(SegmentError::NotWritable);
645 }
646
647 let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
648 items.into_iter().collect();
649 if items.is_empty() {
650 return Ok(());
651 }
652
653 for (entity, _, _) in &items {
654 if !self.has_live_entity(entity.id) {
655 return Err(SegmentError::NotFound(entity.id));
656 }
657 }
658
659 for (entity, modified_columns, metadata) in items {
660 self.apply_hot_update_with_metadata(entity, modified_columns, metadata)?;
661 }
662
663 self.last_write_at = current_unix_secs();
664 Ok(())
665 }
666
667 pub fn delete_batch(&mut self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
668 if !self.state.is_writable() {
669 return Err(SegmentError::NotWritable);
670 }
671 if ids.is_empty() {
672 return Ok(Vec::new());
673 }
674
675 let mut deleted_ids = Vec::with_capacity(ids.len());
676
677 if self.use_flat {
678 for &id in ids {
679 let raw = id.raw();
680 if raw < self.base_entity_id {
681 continue;
682 }
683 let idx = (raw - self.base_entity_id) as usize;
684 if idx < self.flat_entities.len()
685 && self.flat_entities[idx].id == id
686 && !self.deleted.contains(&id)
687 {
688 self.metadata.remove_all(id);
689 self.deleted.insert(id);
690 deleted_ids.push(id);
691 }
692 }
693 } else {
694 for &id in ids {
695 if let Some(entity) = self.entities.remove(&id) {
696 self.unindex_entity(&entity);
697 self.metadata.remove_all(id);
698 self.deleted.insert(id);
699 deleted_ids.push(id);
700 }
701 }
702 }
703
704 if !deleted_ids.is_empty() {
705 self.last_write_at = current_unix_secs();
706 }
707
708 Ok(deleted_ids)
709 }
710
711 fn add_memory(&self, bytes: usize) {
713 self.memory_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
714 }
715
716 fn estimate_entity_size(entity: &UnifiedEntity) -> usize {
718 let mut size = std::mem::size_of::<UnifiedEntity>();
719
720 size += match &entity.data {
722 EntityData::Row(row) => row.columns.len() * 64, EntityData::Node(node) => node.properties.len() * 128,
724 EntityData::Edge(edge) => edge.properties.len() * 128,
725 EntityData::Vector(vec) => {
726 vec.dense.len() * 4 + vec.sparse.as_ref().map_or(0, |s| s.indices.len() * 8)
727 }
728 EntityData::TimeSeries(_) => 64,
729 EntityData::QueueMessage(_) => 128,
730 };
731
732 for emb in entity.embeddings() {
734 size += emb.vector.len() * 4 + emb.name.len() + emb.model.len();
735 }
736
737 size += std::mem::size_of_val(entity.cross_refs());
739
740 size
741 }
742
743 fn update_col_zones_from_entity(&mut self, entity: &UnifiedEntity) {
751 if let EntityData::Row(row) = &entity.data {
752 if let Some(named) = &row.named {
753 for (col, val) in named {
755 if matches!(val, Value::Null) {
756 continue;
757 }
758 self.col_zones
759 .entry(col.clone())
760 .and_modify(|z| z.update(val))
761 .or_insert_with(|| ColZone::new(val.clone()));
762 }
763 } else if let Some(schema) = &row.schema {
764 for (col, val) in schema.iter().zip(row.columns.iter()) {
767 if matches!(val, Value::Null) {
768 continue;
769 }
770 self.col_zones
771 .entry(col.clone())
772 .and_modify(|z| z.update(val))
773 .or_insert_with(|| ColZone::new(val.clone()));
774 }
775 }
776 }
777 }
778
779 fn rebuild_sealed_col_zones(&mut self) {
780 let mut values_by_col: HashMap<String, Vec<(CanonicalKey, Value)>> = HashMap::new();
781 let mut family_by_col: HashMap<String, crate::storage::schema::CanonicalKeyFamily> =
782 HashMap::new();
783 let mut mixed_family_cols = HashSet::new();
784 let mut unsupported_cols = HashSet::new();
785
786 let mut observe_row = |row: &super::entity::RowData| {
787 for (col, value) in row.iter_fields() {
788 if matches!(value, Value::Null) {
789 continue;
790 }
791 let Some(key) = value_to_canonical_key(value) else {
792 unsupported_cols.insert(col.to_string());
793 continue;
794 };
795 match family_by_col.get(col).copied() {
796 Some(existing) if existing != key.family() => {
797 mixed_family_cols.insert(col.to_string());
798 }
799 None => {
800 family_by_col.insert(col.to_string(), key.family());
801 }
802 _ => {}
803 }
804 values_by_col
805 .entry(col.to_string())
806 .or_default()
807 .push((key, value.clone()));
808 }
809 };
810
811 if self.use_flat {
812 for entity in &self.flat_entities {
813 if self.deleted.contains(&entity.id) {
814 continue;
815 }
816 if let EntityData::Row(row) = &entity.data {
817 observe_row(row);
818 }
819 }
820 } else {
821 for entity in self.entities.values() {
822 if self.deleted.contains(&entity.id) {
823 continue;
824 }
825 if let EntityData::Row(row) = &entity.data {
826 observe_row(row);
827 }
828 }
829 }
830
831 let mut sealed_col_zones = HashMap::new();
832 for (col, mut entries) in values_by_col {
833 if mixed_family_cols.contains(&col)
834 || unsupported_cols.contains(&col)
835 || entries.is_empty()
836 {
837 continue;
838 }
839 entries.sort_unstable_by(|left, right| left.0.cmp(&right.0));
840 entries.dedup_by(|left, right| left.0 == right.0);
841
842 let intervals = build_minmax_multi_intervals(&entries, SEALED_MULTI_ZONE_MAX_INTERVALS);
843 if intervals.len() > 1 {
844 sealed_col_zones.insert(col, MultiColZone { intervals });
845 }
846 }
847
848 self.sealed_col_zones = sealed_col_zones;
849 }
850
851 pub fn can_skip_zone_preds(&self, preds: &[(&str, ZoneColPred<'_>)]) -> bool {
855 if preds.is_empty() {
856 return false;
857 }
858 for (col, pred) in preds {
859 if let Some(zone) = self.sealed_col_zones.get(*col) {
860 if zone.can_skip(pred) {
861 return true;
862 }
863 continue;
864 }
865 if let Some(zone) = self.col_zones.get(*col) {
866 if pred.can_skip(zone) {
867 return true; }
869 }
870 }
871 false
872 }
873
874 fn index_entity(&mut self, entity: &UnifiedEntity) {
876 let kind_key = entity.kind.storage_type().to_string();
878 self.kind_index
879 .entry(kind_key)
880 .or_default()
881 .insert(entity.id);
882
883 let id_bytes = entity.id.raw().to_le_bytes();
885 self.bloom.insert(&id_bytes);
886
887 if let EntityData::Row(row) = &entity.data {
889 if let Some(first_col) = row.columns.first() {
890 let pk_str = format!("{:?}", first_col);
891 self.bloom.insert(pk_str.as_bytes());
893 self.pk_index
894 .insert((entity.kind.collection().to_string(), pk_str), entity.id);
895 }
896 }
897
898 for cross_ref in entity.cross_refs() {
900 self.cross_ref_forward
901 .entry(cross_ref.source)
902 .or_default()
903 .push((cross_ref.target, cross_ref.ref_type));
904
905 self.cross_ref_reverse
906 .entry(cross_ref.target)
907 .or_default()
908 .push((cross_ref.source, cross_ref.ref_type));
909 }
910 }
911
912 pub fn bloom_might_contain_id(&self, id: EntityId) -> bool {
915 let id_bytes = id.raw().to_le_bytes();
916 self.bloom.contains(&id_bytes)
917 }
918
919 pub fn bloom_might_contain_key(&self, key: &[u8]) -> bool {
921 self.bloom.contains(key)
922 }
923
924 pub fn bloom_stats(&self) -> (f64, u32) {
926 (self.bloom.fill_ratio(), self.bloom.count_set_bits())
927 }
928
929 fn unindex_entity(&mut self, entity: &UnifiedEntity) {
931 let kind_key = entity.kind.storage_type().to_string();
933 if let Some(set) = self.kind_index.get_mut(&kind_key) {
934 set.remove(&entity.id);
935 }
936
937 if let EntityData::Row(row) = &entity.data {
939 if let Some(first_col) = row.columns.first() {
940 let pk_str = format!("{:?}", first_col);
941 self.pk_index
942 .remove(&(entity.kind.collection().to_string(), pk_str));
943 }
944 }
945
946 self.cross_ref_forward.remove(&entity.id);
948 }
950
    /// Brings the secondary indexes up to date after an entity was overwritten in place.
    ///
    /// `old` is the snapshot taken before the overwrite; `new` is the stored entity.
    ///
    /// Primary key: for a hot update (`modified_columns` is `Some`), the pk counts as
    /// changed iff one of the modified column names equals the pk column name
    /// (ASCII case-insensitive). When no pk column name is known, or for a full
    /// update (`None`), the old and new first-column values are compared instead.
    /// On a change, the old `pk_index` entry is removed and the new key is added to
    /// both `pk_index` and the bloom filter.
    ///
    /// Cross references: the maps are rebuilt for this entity only when its
    /// reference list differs from the snapshot.
    fn reindex_for_update(
        &mut self,
        old: &UpdateIndexSnapshot,
        new: &UnifiedEntity,
        modified_columns: Option<&[String]>,
    ) {
        let pk_changed = match &new.data {
            EntityData::Row(new_row) => {
                if let Some(cols) = modified_columns {
                    // Prefer the pk column name captured before the update and fall
                    // back to the first name in the new row's schema.
                    let pk_col_name = old.pk_column_name.as_deref().or_else(|| {
                        new_row
                            .schema
                            .as_deref()
                            .and_then(|schema| schema.first().map(|name| name.as_str()))
                    });
                    match pk_col_name {
                        Some(pk_name) => cols.iter().any(|c| c.eq_ignore_ascii_case(pk_name)),
                        None => old.pk_value.as_ref() != new_row.columns.first(),
                    }
                } else {
                    old.pk_value.as_ref() != new_row.columns.first()
                }
            }
            // Only row entities have a `pk_index` entry.
            _ => false,
        };

        if pk_changed {
            if let Some((collection, pk_str)) = &old.pk_index_key {
                self.pk_index.remove(&(collection.clone(), pk_str.clone()));
            }
            if let EntityData::Row(row) = &new.data {
                if let Some(first_col) = row.columns.first() {
                    let pk_str = format!("{:?}", first_col);
                    // Only the new key is added to the bloom filter; the old key stays.
                    self.bloom.insert(pk_str.as_bytes());
                    self.pk_index
                        .insert((new.kind.collection().to_string(), pk_str), new.id);
                }
            }
        }

        let new_refs = new.cross_refs();
        if old.cross_refs.as_slice() != new_refs {
            // NOTE(review): `index_entity` keys forward entries by `cross_ref.source`,
            // while they are removed by `new.id` here; this assumes source == entity id.
            self.cross_ref_forward.remove(&new.id);
            for cross_ref in &old.cross_refs {
                if let Some(rev) = self.cross_ref_reverse.get_mut(&cross_ref.target) {
                    rev.retain(|(src, _)| *src != new.id);
                }
            }
            for cross_ref in new_refs {
                self.cross_ref_forward
                    .entry(cross_ref.source)
                    .or_default()
                    .push((cross_ref.target, cross_ref.ref_type));
                self.cross_ref_reverse
                    .entry(cross_ref.target)
                    .or_default()
                    .push((cross_ref.source, cross_ref.ref_type));
            }
        }
    }
1038
1039 pub fn get_references_to(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1041 self.cross_ref_reverse.get(&id).cloned().unwrap_or_default()
1042 }
1043
1044 pub fn get_references_from(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1046 self.cross_ref_forward.get(&id).cloned().unwrap_or_default()
1047 }
1048
1049 pub fn memtable_stats(&self) -> super::memtable::MemtableStats {
1051 self.memtable.stats()
1052 }
1053
1054 pub fn memtable_should_flush(&self) -> bool {
1056 self.memtable.should_flush()
1057 }
1058
1059 pub fn age_secs(&self) -> u64 {
1061 let now = current_unix_secs();
1062 now.saturating_sub(self.created_at)
1063 }
1064
1065 pub fn idle_secs(&self) -> u64 {
1067 let now = current_unix_secs();
1068 now.saturating_sub(self.last_write_at)
1069 }
1070
    /// Inserts a batch of entities in one pass and returns their ids in input order.
    ///
    /// This is a fast path compared with `insert`. It assigns consecutive sequence
    /// ids, records the ids in `kind_index` and widens `col_zones`. It does not
    /// check for existing ids, and it does not touch the bloom filter, `pk_index`,
    /// the cross-reference maps or `memory_bytes`.
    ///
    /// If the segment is completely empty, the flat layout is enabled with
    /// `base_entity_id` set to the first entity's raw id. In the flat layout an
    /// entity whose raw id equals `base_entity_id + flat_entities.len()` is appended
    /// to `flat_entities`; any other entity goes to the `entities` map.
    ///
    /// # Errors
    /// `SegmentError::NotWritable` if the segment is not `Growing`.
    pub fn bulk_insert(
        &mut self,
        entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<EntityId>, SegmentError> {
        if !self.state.is_writable() {
            return Err(SegmentError::NotWritable);
        }

        let n = entities.len();

        // NOTE(review): every entity of the batch is filed under the *first* entity's
        // kind in `kind_index`; this assumes a homogeneous batch — confirm with callers.
        let kind_key = if let Some(first) = entities.first() {
            first.kind.storage_type().to_string()
        } else {
            return Ok(Vec::new());
        };

        let kind_set = self.kind_index.entry(kind_key).or_default();
        kind_set.reserve(n);

        let now = current_unix_secs();

        // Reserve `n` sequence numbers at once; entity i gets `base_seq + i`.
        let base_seq = self.sequence.fetch_add(n as u64, Ordering::Relaxed);

        let mut ids = Vec::with_capacity(n);

        if self.flat_entities.is_empty() && self.entities.is_empty() {
            self.base_entity_id = entities.first().map(|e| e.id.raw()).unwrap_or(0);
            self.use_flat = true;
        }

        // Zone updates are collected first and applied after the storage loop:
        // positional (schema-carrying) rows are bucketed per column index, all other
        // rows contribute `(column name, value)` pairs.
        let mut columnar_zone_updates: Vec<Vec<Value>> = Vec::new();
        let mut columnar_schema: Option<std::sync::Arc<Vec<String>>> = None;
        let mut named_zone_updates: Vec<(String, Value)> = Vec::new();

        if self.use_flat {
            self.flat_entities.reserve(n);
            for (i, mut entity) in entities.into_iter().enumerate() {
                entity.sequence_id = base_seq + i as u64;
                let id = entity.id;
                kind_set.insert(id);
                ids.push(id);
                if let EntityData::Row(row) = &entity.data {
                    if row.schema.is_some() && !row.columns.is_empty() {
                        // NOTE(review): buckets are sized and named from the *first*
                        // such row's schema; later rows are assumed to share it.
                        // Also, `vec![v; k]` clones `v` and `Vec::clone` does not keep
                        // spare capacity, so only one bucket is actually pre-sized.
                        if columnar_zone_updates.is_empty() {
                            columnar_zone_updates = vec![Vec::with_capacity(n); row.columns.len()];
                            columnar_schema = row.schema.clone();
                        }
                        for (ci, val) in row.columns.iter().enumerate() {
                            if !matches!(val, Value::Null) {
                                if let Some(bucket) = columnar_zone_updates.get_mut(ci) {
                                    bucket.push(val.clone());
                                }
                            }
                        }
                    } else {
                        for (col, val) in row.iter_fields() {
                            if !matches!(val, Value::Null) {
                                named_zone_updates.push((col.to_string(), val.clone()));
                            }
                        }
                    }
                }
                // Keep the flat invariant `flat_entities[i].id == base + i`; anything
                // that does not fit goes to the overflow map.
                let expected = self.base_entity_id + self.flat_entities.len() as u64;
                if id.raw() == expected {
                    self.flat_entities.push(entity);
                } else {
                    self.entities.insert(id, entity);
                }
            }
        } else {
            self.entities.reserve(n);
            let mut pairs = Vec::with_capacity(n);
            for (i, mut entity) in entities.into_iter().enumerate() {
                entity.sequence_id = base_seq + i as u64;
                let id = entity.id;
                kind_set.insert(id);
                ids.push(id);
                if let EntityData::Row(row) = &entity.data {
                    for (col, val) in row.iter_fields() {
                        if !matches!(val, Value::Null) {
                            named_zone_updates.push((col.to_string(), val.clone()));
                        }
                    }
                }
                pairs.push((id, entity));
            }
            self.entities.extend(pairs);
        }

        let _ = kind_set;
        if !columnar_zone_updates.is_empty() {
            let schema = columnar_schema.as_ref();
            for (ci, values) in columnar_zone_updates.into_iter().enumerate() {
                if values.is_empty() {
                    continue;
                }
                let Some(col_name) = schema.and_then(|s| s.get(ci)) else {
                    continue;
                };
                let mut iter = values.into_iter();
                // One map lookup per column; the remaining values widen the zone directly.
                if let Some(first) = iter.next() {
                    let zone = self
                        .col_zones
                        .entry(col_name.clone())
                        .and_modify(|z| z.update(&first))
                        .or_insert_with(|| ColZone::new(first));
                    for v in iter {
                        zone.update(&v);
                    }
                }
            }
        }
        for (col, val) in named_zone_updates {
            self.col_zones
                .entry(col)
                .and_modify(|z| z.update(&val))
                .or_insert_with(|| ColZone::new(val));
        }

        // NOTE(review): unlike `insert`, no `add_memory` call is made here, so
        // size-based sealing does not see bulk-loaded data — confirm this is intended.
        self.last_write_at = now;

        if self.use_flat {
            self.published_flat_len
                .store(self.flat_entities.len(), Ordering::Release);
        }

        Ok(ids)
    }
1247
1248 pub(crate) fn force_delete(&mut self, id: EntityId) -> bool {
1251 if self.use_flat {
1252 let raw = id.raw();
1253 if raw >= self.base_entity_id {
1254 let idx = (raw - self.base_entity_id) as usize;
1255 if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1256 self.deleted.insert(id);
1257 self.metadata.remove_all(id);
1258 return true;
1259 }
1260 }
1261 return false;
1262 }
1263
1264 if let Some(entity) = self.entities.remove(&id) {
1265 self.unindex_entity(&entity);
1266 self.metadata.remove_all(id);
1267 self.deleted.insert(id);
1268 true
1269 } else {
1270 false
1271 }
1272 }
1273
1274 pub(crate) fn force_update_with_metadata(
1277 &mut self,
1278 entity: &UnifiedEntity,
1279 modified_columns: &[String],
1280 metadata: Option<&Metadata>,
1281 ) -> Result<(), SegmentError> {
1282 self.apply_hot_update_with_metadata(entity, modified_columns, metadata)
1283 }
1284}
1285
1286impl UnifiedSegment for GrowingSegment {
1287 fn id(&self) -> SegmentId {
1288 self.id
1289 }
1290
1291 fn state(&self) -> SegmentState {
1292 self.state
1293 }
1294
1295 fn collection(&self) -> &str {
1296 &self.collection
1297 }
1298
1299 fn stats(&self) -> SegmentStats {
1300 let mut stats = SegmentStats {
1301 entity_count: self.entities.len(),
1302 deleted_count: self.deleted.len(),
1303 memory_bytes: self.memory_bytes.load(Ordering::Relaxed) as usize,
1304 ..Default::default()
1305 };
1306
1307 for entity in self.entities.values() {
1308 match &entity.kind {
1309 EntityKind::TableRow { .. } => stats.row_count += 1,
1310 EntityKind::GraphNode(_) => stats.node_count += 1,
1311 EntityKind::GraphEdge(_) => stats.edge_count += 1,
1312 EntityKind::Vector { .. } => stats.vector_count += 1,
1313 EntityKind::TimeSeriesPoint(_) => stats.row_count += 1,
1314 EntityKind::QueueMessage { .. } => stats.row_count += 1,
1315 }
1316 stats.cross_ref_count += entity.cross_refs().len();
1317 }
1318
1319 stats
1320 }
1321
1322 fn entity_count(&self) -> usize {
1323 let total = if self.use_flat {
1324 self.flat_entities.len()
1325 } else {
1326 self.entities.len()
1327 };
1328 total.saturating_sub(self.deleted.len())
1329 }
1330
1331 fn contains(&self, id: EntityId) -> bool {
1332 self.has_live_entity(id)
1333 }
1334
1335 fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1336 if self.deleted.contains(&id) {
1337 return None;
1338 }
1339 if self.use_flat {
1340 let raw = id.raw();
1341 if raw >= self.base_entity_id {
1342 let idx = (raw - self.base_entity_id) as usize;
1343 if let Some(entity) = self.flat_entities.get(idx).filter(|e| e.id == id) {
1344 return Some(entity);
1345 }
1346 }
1347 self.entities.get(&id)
1353 } else {
1354 self.entities.get(&id)
1355 }
1356 }
1357
1358 fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity> {
1359 if self.deleted.contains(&id) || !self.state.is_writable() {
1360 return None;
1361 }
1362 if self.use_flat {
1363 let raw = id.raw();
1364 if raw >= self.base_entity_id {
1365 let idx = (raw - self.base_entity_id) as usize;
1366 if self
1367 .flat_entities
1368 .get(idx)
1369 .map(|e| e.id == id)
1370 .unwrap_or(false)
1371 {
1372 return self.flat_entities.get_mut(idx);
1373 }
1374 }
1375 self.entities.get_mut(&id)
1378 } else {
1379 self.entities.get_mut(&id)
1380 }
1381 }
1382
1383 fn insert(&mut self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1384 if !self.state.is_writable() {
1385 return Err(SegmentError::NotWritable);
1386 }
1387
1388 if self.entities.contains_key(&entity.id) {
1389 return Err(SegmentError::AlreadyExists(entity.id));
1390 }
1391
1392 entity.sequence_id = self.next_sequence();
1394
1395 let size = Self::estimate_entity_size(&entity);
1397 self.add_memory(size);
1398
1399 self.index_entity(&entity);
1401
1402 self.update_col_zones_from_entity(&entity);
1404
1405 let id = entity.id;
1407 self.entities.insert(id, entity);
1408
1409 self.last_write_at = current_unix_secs();
1411
1412 Ok(id)
1413 }
1414
1415 fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1416 if !self.state.is_writable() {
1417 return Err(SegmentError::NotWritable);
1418 }
1419
1420 self.apply_update_with_metadata(&entity, None)?;
1421 self.last_write_at = current_unix_secs();
1422
1423 Ok(())
1424 }
1425
1426 fn update_hot(
1427 &mut self,
1428 entity: UnifiedEntity,
1429 modified_columns: &[String],
1430 ) -> Result<(), SegmentError> {
1431 if !self.state.is_writable() {
1432 return Err(SegmentError::NotWritable);
1433 }
1434
1435 self.apply_hot_update_with_metadata(&entity, modified_columns, None)?;
1436 self.last_write_at = current_unix_secs();
1437 Ok(())
1438 }
1439
1440 fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1441 if !self.state.is_writable() {
1442 return Err(SegmentError::NotWritable);
1443 }
1444
1445 if self.use_flat {
1447 let raw = id.raw();
1448 if raw >= self.base_entity_id {
1449 let idx = (raw - self.base_entity_id) as usize;
1450 if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1451 self.metadata.remove_all(id);
1452 self.deleted.insert(id);
1453 return Ok(true);
1454 }
1455 }
1456 return Ok(false);
1457 }
1458
1459 let entity = self.entities.remove(&id);
1461 if entity.is_none() {
1462 return Ok(false);
1463 }
1464
1465 if let Some(ref e) = entity {
1467 self.unindex_entity(e);
1468 }
1469
1470 self.metadata.remove_all(id);
1472
1473 self.deleted.insert(id);
1475
1476 Ok(true)
1477 }
1478
1479 fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1480 if !self.has_live_entity(id) {
1481 return None;
1482 }
1483 Some(self.metadata.get_all(id))
1484 }
1485
1486 fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1487 if !self.state.is_writable() {
1488 return Err(SegmentError::NotWritable);
1489 }
1490
1491 if !self.has_live_entity(id) {
1492 return Err(SegmentError::NotFound(id));
1493 }
1494
1495 self.metadata.set_all(id, &metadata);
1496 Ok(())
1497 }
1498
1499 fn seal(&mut self) -> Result<(), SegmentError> {
1500 if self.state != SegmentState::Growing {
1501 return Err(SegmentError::InvalidState(self.state));
1502 }
1503
1504 self.state = SegmentState::Sealing;
1505
1506 let memtable_stats = self.memtable.stats();
1508 if memtable_stats.entry_count > 0 {
1509 self.memtable.clear();
1512 }
1513
1514 self.rebuild_sealed_col_zones();
1520
1521 self.state = SegmentState::Sealed;
1522 Ok(())
1523 }
1524
1525 fn should_seal(&self, config: &SegmentConfig) -> bool {
1526 if self.entities.len() >= config.max_entities {
1528 return true;
1529 }
1530
1531 if self.memory_bytes.load(Ordering::Relaxed) as usize >= config.max_bytes {
1533 return true;
1534 }
1535
1536 if self.age_secs() >= config.max_age_secs {
1538 return true;
1539 }
1540
1541 false
1542 }
1543
1544 fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1545 let base: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1546 Box::new(self.flat_entities.iter().chain(self.entities.values()))
1550 } else {
1551 Box::new(self.entities.values())
1552 };
1553 if self.deleted.is_empty() {
1554 base
1555 } else {
1556 Box::new(base.filter(|e| !self.deleted.contains(&e.id)))
1557 }
1558 }
1559
1560 fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1561 let ids = self.kind_index.get(kind_filter).cloned();
1562 let flat: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1565 Box::new(self.flat_entities.iter())
1566 } else {
1567 Box::new(std::iter::empty())
1568 };
1569 Box::new(flat.chain(self.entities.values()).filter(move |e| {
1570 if self.deleted.contains(&e.id) {
1571 return false;
1572 }
1573 if let Some(ref ids) = ids {
1574 ids.contains(&e.id)
1575 } else {
1576 false
1577 }
1578 }))
1579 }
1580
1581 fn filter_metadata(
1582 &self,
1583 filters: &[(String, super::metadata::MetadataFilter)],
1584 ) -> Vec<EntityId> {
1585 let flat_ids: Box<dyn Iterator<Item = EntityId> + '_> = if self.use_flat {
1588 Box::new(self.flat_entities.iter().map(|e| e.id))
1589 } else {
1590 Box::new(std::iter::empty())
1591 };
1592 flat_ids
1593 .chain(self.entities.keys().copied())
1594 .filter(|id| {
1595 if self.deleted.contains(id) {
1596 return false;
1597 }
1598 let metadata = self.metadata.get_all(*id);
1599 metadata.matches_all(filters)
1600 })
1601 .collect()
1602 }
1603}
1604
1605fn build_minmax_multi_intervals(
1606 entries: &[(CanonicalKey, Value)],
1607 max_intervals: usize,
1608) -> Vec<ColZone> {
1609 if entries.is_empty() {
1610 return Vec::new();
1611 }
1612 if entries.len() == 1 || max_intervals <= 1 {
1613 return vec![ColZone::with_bounds(
1614 entries[0].1.clone(),
1615 entries[entries.len() - 1].1.clone(),
1616 )];
1617 }
1618
1619 let mut split_points = if entries.len() <= max_intervals {
1620 (1..entries.len()).collect::<Vec<_>>()
1621 } else {
1622 let target_splits = max_intervals - 1;
1623 let mut selected = select_gap_split_points(entries, target_splits);
1624 if selected.len() < target_splits {
1625 for bucket in 1..max_intervals {
1626 let idx = bucket * entries.len() / max_intervals;
1627 if idx == 0 || idx >= entries.len() || selected.contains(&idx) {
1628 continue;
1629 }
1630 selected.push(idx);
1631 if selected.len() >= target_splits {
1632 break;
1633 }
1634 }
1635 }
1636 selected.sort_unstable();
1637 selected.dedup();
1638 selected
1639 };
1640
1641 split_points.push(entries.len());
1642
1643 let mut out = Vec::with_capacity(split_points.len());
1644 let mut start = 0usize;
1645 for end in split_points {
1646 if end <= start {
1647 continue;
1648 }
1649 out.push(ColZone::with_bounds(
1650 entries[start].1.clone(),
1651 entries[end - 1].1.clone(),
1652 ));
1653 start = end;
1654 }
1655
1656 if out.is_empty() {
1657 out.push(ColZone::with_bounds(
1658 entries[0].1.clone(),
1659 entries[entries.len() - 1].1.clone(),
1660 ));
1661 }
1662
1663 out
1664}
1665
1666fn select_gap_split_points(entries: &[(CanonicalKey, Value)], max_splits: usize) -> Vec<usize> {
1667 let mut gaps = Vec::new();
1668 for idx in 1..entries.len() {
1669 if let Some(score) = canonical_gap_score(&entries[idx - 1].0, &entries[idx].0) {
1670 if score > 0.0 {
1671 gaps.push((score, idx));
1672 }
1673 }
1674 }
1675 gaps.sort_by(|left, right| {
1676 right
1677 .0
1678 .partial_cmp(&left.0)
1679 .unwrap_or(std::cmp::Ordering::Equal)
1680 .then_with(|| left.1.cmp(&right.1))
1681 });
1682 gaps.into_iter()
1683 .take(max_splits)
1684 .map(|(_, idx)| idx)
1685 .collect()
1686}
1687
1688fn canonical_gap_score(left: &CanonicalKey, right: &CanonicalKey) -> Option<f64> {
1689 if left.family() != right.family() {
1690 return None;
1691 }
1692 match (left, right) {
1693 (CanonicalKey::Signed(_, l), CanonicalKey::Signed(_, r)) => {
1694 Some(r.saturating_sub(*l) as f64)
1695 }
1696 (CanonicalKey::Unsigned(_, l), CanonicalKey::Unsigned(_, r)) => {
1697 Some(r.saturating_sub(*l) as f64)
1698 }
1699 (CanonicalKey::Float(l), CanonicalKey::Float(r)) => {
1700 Some((f64::from_bits(*r) - f64::from_bits(*l)).abs())
1701 }
1702 _ => None,
1703 }
1704}
1705
// Unit tests for `GrowingSegment`, column-zone predicates and sealed multi-zone pruning.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::schema::Value;
    use crate::storage::unified::entity::RowData;
    use crate::storage::unified::MetadataValue;

    // Inserting one table row returns its id and makes it visible via `contains`;
    // stats then report exactly one entity, which is also a row.
    #[test]
    fn test_growing_segment_basic() {
        let mut segment = GrowingSegment::new(1, "test");

        let entity = UnifiedEntity::table_row(
            EntityId::new(1),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );

        let id = segment.insert(entity).unwrap();
        assert_eq!(id, EntityId::new(1));
        assert!(segment.contains(id));

        let stats = segment.stats();
        assert_eq!(stats.entity_count, 1);
        assert_eq!(stats.row_count, 1);
    }

    // Metadata written with `set_metadata` can be read back with `get_metadata`.
    #[test]
    fn test_segment_metadata() {
        let mut segment = GrowingSegment::new(1, "test");

        let entity = UnifiedEntity::table_row(
            EntityId::new(1),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );
        segment.insert(entity).unwrap();

        let mut meta = Metadata::new();
        meta.set("role", MetadataValue::String("admin".to_string()));
        meta.set("level", MetadataValue::Int(5));

        segment.set_metadata(EntityId::new(1), meta).unwrap();

        let retrieved = segment.get_metadata(EntityId::new(1)).unwrap();
        assert_eq!(
            retrieved.get("role"),
            Some(&MetadataValue::String("admin".to_string()))
        );
    }

    // Sealing moves a writable segment to `Sealed`, after which inserts are rejected.
    #[test]
    fn test_segment_seal() {
        let mut segment = GrowingSegment::new(1, "test");

        let entity = UnifiedEntity::vector(EntityId::new(1), "embeddings", vec![0.1, 0.2, 0.3]);
        segment.insert(entity).unwrap();

        // A freshly created segment accepts writes.
        assert!(segment.state().is_writable());

        segment.seal().unwrap();
        assert_eq!(segment.state(), SegmentState::Sealed);

        // Any write after sealing must fail.
        let entity2 = UnifiedEntity::vector(EntityId::new(2), "embeddings", vec![0.4, 0.5, 0.6]);
        assert!(segment.insert(entity2).is_err());
    }

    // `should_seal` flips to true once the entity count reaches `max_entities`.
    #[test]
    fn test_should_seal() {
        let mut segment = GrowingSegment::new(1, "test");

        let config = SegmentConfig {
            max_entities: 2,
            ..Default::default()
        };

        assert!(!segment.should_seal(&config));

        segment
            .insert(UnifiedEntity::vector(EntityId::new(1), "v", vec![0.1]))
            .unwrap();
        assert!(!segment.should_seal(&config));

        segment
            .insert(UnifiedEntity::vector(EntityId::new(2), "v", vec![0.2]))
            .unwrap();
        assert!(segment.should_seal(&config));
    }

    // A cross reference attached to an inserted entity is visible from both ends.
    #[test]
    fn test_cross_references() {
        let mut segment = GrowingSegment::new(1, "test");

        let mut entity1 = UnifiedEntity::table_row(
            EntityId::new(1),
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        entity1.add_cross_ref(CrossRef::new(
            EntityId::new(1),
            EntityId::new(2),
            "nodes",
            RefType::RowToNode,
        ));
        segment.insert(entity1).unwrap();

        let refs_from = segment.get_references_from(EntityId::new(1));
        assert_eq!(refs_from.len(), 1);
        assert_eq!(refs_from[0], (EntityId::new(2), RefType::RowToNode));

        let refs_to = segment.get_references_to(EntityId::new(2));
        assert_eq!(refs_to.len(), 1);
        assert_eq!(refs_to[0], (EntityId::new(1), RefType::RowToNode));
    }

    // Email values spanning "bravo".."delta": an equality probe outside the range
    // ("alpha") is skippable, while one inside it ("charlie") is not.
    #[test]
    fn test_zone_predicate_uses_canonical_fallback_for_email_values() {
        let mut zone = ColZone::new(Value::Email("bravo@example.com".to_string()));
        zone.update(&Value::Email("delta@example.com".to_string()));

        let probe = Value::Email("alpha@example.com".to_string());
        assert!(ZoneColPred::Eq(&probe).can_skip(&zone));

        let in_range = Value::Email("charlie@example.com".to_string());
        assert!(!ZoneColPred::Eq(&in_range).can_skip(&zone));
    }

    // Ages 1, 2, 3 and an outlier 1000: after sealing, an equality probe that falls
    // inside the gap (500) is skippable, while the outlier value itself is not.
    #[test]
    fn test_sealed_multi_zone_prunes_numeric_gap_outlier() {
        let mut segment = GrowingSegment::new(1, "test");

        for (row_id, age) in [(1_u64, 1_i64), (2, 2), (3, 3), (4, 1000)] {
            let entity = UnifiedEntity::new(
                EntityId::new(row_id),
                EntityKind::TableRow {
                    table: "users".into(),
                    row_id,
                },
                EntityData::Row(RowData::with_names(
                    vec![Value::Integer(age)],
                    vec!["age".to_string()],
                )),
            );
            segment.insert(entity).unwrap();
        }

        segment.seal().unwrap();

        let miss = Value::Integer(500);
        assert!(segment.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&miss))]));

        let hit = Value::Integer(1000);
        assert!(!segment.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&hit))]));
    }
}