1use std::collections::{BTreeMap, HashMap, HashSet};
20use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use super::entity::{CrossRef, EntityData, EntityId, EntityKind, RefType, UnifiedEntity};
24use super::memtable::Memtable;
25use super::metadata::{Metadata, MetadataStorage};
26use crate::storage::primitives::bloom::BloomFilter;
27use crate::storage::query::value_compare::partial_compare_values;
28use crate::storage::schema::{value_to_canonical_key, CanonicalKey, Value};
29
/// Numeric identifier of a segment (see [`UnifiedSegment::id`]).
pub type SegmentId = u64;
32
/// Lifecycle stage of a segment.
///
/// The predicates on this type classify each stage as writable, queryable
/// and/or immutable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentState {
    /// The only stage for which [`SegmentState::is_writable`] is true.
    Growing,
    /// The only stage for which [`SegmentState::is_queryable`] is false.
    Sealing,
    /// Immutable stage.
    Sealed,
    /// Immutable stage.
    Flushed,
    /// Immutable stage.
    Archived,
}

impl SegmentState {
    /// Returns `true` only for [`SegmentState::Growing`].
    pub fn is_writable(&self) -> bool {
        *self == Self::Growing
    }

    /// Returns `true` for every stage except [`SegmentState::Sealing`].
    pub fn is_queryable(&self) -> bool {
        *self != Self::Sealing
    }

    /// Returns `true` for `Sealed`, `Flushed` and `Archived`.
    pub fn is_immutable(&self) -> bool {
        match self {
            Self::Sealed | Self::Flushed | Self::Archived => true,
            Self::Growing | Self::Sealing => false,
        }
    }
}
64
/// Tunables for a segment.
///
/// NOTE(review): the field meanings below are inferred from their names; the
/// code that consumes them (`should_seal`, index builders) is not in this part
/// of the file — confirm against those call sites.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Presumably the entity-count threshold for sealing.
    pub max_entities: usize,
    /// Presumably the memory threshold, in bytes, for sealing.
    pub max_bytes: usize,
    /// Presumably the age threshold, in seconds, for sealing.
    pub max_age_secs: u64,
    /// Whether a vector index should be built.
    pub build_vector_index: bool,
    /// Whether a graph index should be built.
    pub build_graph_index: bool,
    /// Compression level setting.
    pub compression_level: u8,
}

impl Default for SegmentConfig {
    /// 100 000 entities, 256 MiB, one hour, both index flags on, compression level 6.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const SECS_PER_HOUR: u64 = 60 * 60;

        Self {
            max_entities: 100_000,
            max_bytes: 256 * MIB,
            max_age_secs: SECS_PER_HOUR,
            build_vector_index: true,
            build_graph_index: true,
            compression_level: 6,
        }
    }
}
94
/// Counters describing a segment, as returned by [`UnifiedSegment::stats`].
#[derive(Debug, Clone, Default)]
pub struct SegmentStats {
    /// Number of entities counted by the reporting segment.
    pub entity_count: usize,
    /// Number of ids in the segment's deleted set.
    pub deleted_count: usize,
    /// Value of the segment's memory counter (an estimate, not a measurement).
    pub memory_bytes: usize,
    /// Entities whose kind is `Vector`.
    pub vector_count: usize,
    /// Entities whose kind is `GraphNode`.
    pub node_count: usize,
    /// Entities whose kind is `GraphEdge`.
    pub edge_count: usize,
    /// Entities whose kind is `TableRow`, `TimeSeriesPoint` or `QueueMessage`
    /// (`GrowingSegment::stats` folds all three into this counter).
    pub row_count: usize,
    /// Sum of `cross_refs().len()` over the counted entities.
    pub cross_ref_count: usize,
}
115
116#[derive(Debug, Clone)]
118pub enum SegmentError {
119 NotWritable,
121 NotFound(EntityId),
123 AlreadyExists(EntityId),
125 Full,
127 InvalidState(SegmentState),
129 Internal(String),
131}
132
133impl std::fmt::Display for SegmentError {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 match self {
136 Self::NotWritable => write!(f, "segment is not writable"),
137 Self::NotFound(id) => write!(f, "entity not found: {}", id),
138 Self::AlreadyExists(id) => write!(f, "entity already exists: {}", id),
139 Self::Full => write!(f, "segment is full"),
140 Self::InvalidState(state) => write!(f, "invalid operation for state: {:?}", state),
141 Self::Internal(msg) => write!(f, "internal error: {}", msg),
142 }
143 }
144}
145
146impl std::error::Error for SegmentError {}
147
/// Seconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` if the clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
154
/// Limit passed to `build_minmax_multi_intervals` when sealed per-column zones
/// are rebuilt; presumably the maximum number of intervals kept per column.
const SEALED_MULTI_ZONE_MAX_INTERVALS: usize = 4;
156
157#[derive(Debug, Clone)]
158struct UpdateIndexSnapshot {
159 pk_column_name: Option<String>,
160 pk_value: Option<Value>,
161 pk_index_key: Option<(String, String)>,
162 cross_refs: Vec<CrossRef>,
163}
164
165impl UpdateIndexSnapshot {
166 fn from_entity(entity: &UnifiedEntity) -> Self {
167 let (pk_column_name, pk_value) = match &entity.data {
168 EntityData::Row(row) => (
169 row.schema
170 .as_deref()
171 .and_then(|schema| schema.first().cloned()),
172 row.columns.first().cloned(),
173 ),
174 _ => (None, None),
175 };
176 let pk_index_key = pk_value
177 .as_ref()
178 .map(|value| (entity.kind.collection().to_string(), format!("{:?}", value)));
179 Self {
180 pk_column_name,
181 pk_value,
182 pk_index_key,
183 cross_refs: entity.cross_refs().to_vec(),
184 }
185 }
186}
187
/// Common interface of a storage segment holding [`UnifiedEntity`] values.
///
/// Implementors must be `Send + Sync`.
pub trait UnifiedSegment: Send + Sync {
    /// Identifier of this segment.
    fn id(&self) -> SegmentId;

    /// Current lifecycle state.
    fn state(&self) -> SegmentState;

    /// Name of the collection this segment belongs to.
    fn collection(&self) -> &str;

    /// Snapshot of the segment's counters.
    fn stats(&self) -> SegmentStats;

    /// Number of entities in the segment.
    fn entity_count(&self) -> usize;

    /// Whether an entity with `id` is present.
    fn contains(&self, id: EntityId) -> bool;

    /// Shared access to the entity with `id`, if present.
    fn get(&self, id: EntityId) -> Option<&UnifiedEntity>;

    /// Exclusive access to the entity with `id`, if present.
    fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity>;

    /// Stores `entity` and returns its id.
    fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError>;

    /// Replaces the stored entity that has the same id as `entity`.
    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError>;

    /// Like [`UnifiedSegment::update`], with a hint naming the columns that changed.
    ///
    /// The default implementation ignores `modified_columns` and delegates to
    /// `update`.
    fn update_hot(
        &mut self,
        entity: UnifiedEntity,
        modified_columns: &[String],
    ) -> Result<(), SegmentError> {
        let _ = modified_columns;
        self.update(entity)
    }

    /// Deletes the entity with `id`; the boolean reports whether something was deleted.
    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError>;

    /// Metadata stored for `id`, if available.
    fn get_metadata(&self, id: EntityId) -> Option<Metadata>;

    /// Stores `metadata` for `id`.
    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError>;

    /// Seals the segment.
    fn seal(&mut self) -> Result<(), SegmentError>;

    /// Whether the segment should be sealed under `config`.
    fn should_seal(&self, config: &SegmentConfig) -> bool;

    /// Iterates over the segment's entities.
    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;

    /// Iterates over the entities selected by `kind_filter`.
    fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;

    /// Ids of the entities selected by the given `(key, filter)` metadata filters.
    fn filter_metadata(
        &self,
        filters: &[(String, super::metadata::MetadataFilter)],
    ) -> Vec<EntityId>;
}
259
260#[derive(Debug, Clone)]
267pub struct ColZone {
268 pub min: Value,
269 pub max: Value,
270 min_key: Option<CanonicalKey>,
271 max_key: Option<CanonicalKey>,
272}
273
274impl ColZone {
275 fn new(v: Value) -> Self {
276 Self {
277 min_key: value_to_canonical_key(&v),
278 max_key: value_to_canonical_key(&v),
279 min: v.clone(),
280 max: v,
281 }
282 }
283
284 fn with_bounds(min: Value, max: Value) -> Self {
285 Self {
286 min_key: value_to_canonical_key(&min),
287 max_key: value_to_canonical_key(&max),
288 min,
289 max,
290 }
291 }
292
293 fn update(&mut self, v: &Value) {
294 if compare_zone_values(v, None, &self.min, self.min_key.as_ref())
295 .map(|o| o == std::cmp::Ordering::Less)
296 .unwrap_or(false)
297 {
298 self.min = v.clone();
299 self.min_key = value_to_canonical_key(v);
300 }
301 if compare_zone_values(v, None, &self.max, self.max_key.as_ref())
302 .map(|o| o == std::cmp::Ordering::Greater)
303 .unwrap_or(false)
304 {
305 self.max = v.clone();
306 self.max_key = value_to_canonical_key(v);
307 }
308 }
309}
310
311#[derive(Debug, Clone, Default)]
312pub struct MultiColZone {
313 pub intervals: Vec<ColZone>,
314}
315
316impl MultiColZone {
317 fn can_skip(&self, pred: &ZoneColPred<'_>) -> bool {
318 !self.intervals.is_empty() && self.intervals.iter().all(|zone| pred.can_skip(zone))
319 }
320}
321
322fn compare_zone_values(
323 left: &Value,
324 left_key: Option<&CanonicalKey>,
325 right: &Value,
326 right_key: Option<&CanonicalKey>,
327) -> Option<std::cmp::Ordering> {
328 partial_compare_values(left, right).or_else(|| {
329 let left_key = left_key.cloned().or_else(|| value_to_canonical_key(left))?;
330 let right_key = right_key
331 .cloned()
332 .or_else(|| value_to_canonical_key(right))?;
333 (left_key.family() == right_key.family()).then(|| left_key.cmp(&right_key))
334 })
335}
336
/// Comparison operator of a zone predicate, without the operand.
///
/// The variants mirror those of [`ZoneColPred`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZoneColPredKind {
    /// `column == value`
    Eq,
    /// `column > value`
    Gt,
    /// `column >= value`
    Gte,
    /// `column < value`
    Lt,
    /// `column <= value`
    Lte,
}
346
347#[derive(Debug, Clone)]
349pub enum ZoneColPred<'a> {
350 Eq(&'a Value),
351 Gt(&'a Value),
352 Gte(&'a Value),
353 Lt(&'a Value),
354 Lte(&'a Value),
355}
356
357impl<'a> ZoneColPred<'a> {
358 pub fn can_skip(&self, zone: &ColZone) -> bool {
360 match self {
361 ZoneColPred::Eq(val) => {
363 compare_zone_values(val, None, &zone.min, zone.min_key.as_ref())
364 .map(|o| o == std::cmp::Ordering::Less)
365 .unwrap_or(false)
366 || compare_zone_values(val, None, &zone.max, zone.max_key.as_ref())
367 .map(|o| o == std::cmp::Ordering::Greater)
368 .unwrap_or(false)
369 }
370 ZoneColPred::Gt(val) => {
372 compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
373 .map(|o| o != std::cmp::Ordering::Greater)
374 .unwrap_or(false)
375 }
376 ZoneColPred::Gte(val) => {
378 compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
379 .map(|o| o == std::cmp::Ordering::Less)
380 .unwrap_or(false)
381 }
382 ZoneColPred::Lt(val) => {
384 compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
385 .map(|o| o != std::cmp::Ordering::Less)
386 .unwrap_or(false)
387 }
388 ZoneColPred::Lte(val) => {
390 compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
391 .map(|o| o == std::cmp::Ordering::Greater)
392 .unwrap_or(false)
393 }
394 }
395 }
396}
397
/// In-memory segment that accepts writes while its state is `Growing`.
///
/// Entities live in one of two stores: the `entities` map, or — once
/// `use_flat` is set — the dense `flat_entities` vector, with the map acting
/// as overflow for ids that do not fit the dense layout.
pub struct GrowingSegment {
    /// Segment identifier.
    id: SegmentId,
    /// Collection name.
    collection: String,
    /// Current lifecycle state.
    state: SegmentState,
    /// Unix time (seconds) at construction.
    created_at: u64,
    /// Unix time (seconds) of the most recent recorded write.
    last_write_at: u64,

    /// Map store; in flat mode it holds the entities not placed in `flat_entities`.
    entities: HashMap<EntityId, UnifiedEntity>,
    /// Dense store: slot `i` holds the entity whose raw id is `base_entity_id + i`.
    flat_entities: Vec<UnifiedEntity>,
    /// Raw id corresponding to slot 0 of `flat_entities`.
    base_entity_id: u64,
    /// Whether the dense store is in use (switched on by `bulk_insert` on an empty segment).
    use_flat: bool,
    /// Ids that were deleted; flat slots are only tombstoned through this set.
    deleted: HashSet<EntityId>,
    /// Per-entity metadata.
    metadata: MetadataStorage,

    /// `(collection, Debug rendering of the first column)` -> entity id.
    pk_index: BTreeMap<(String, String), EntityId>,
    /// Storage type name -> ids of that kind.
    kind_index: HashMap<String, HashSet<EntityId>>,
    /// Cross references keyed by source id.
    cross_ref_forward: HashMap<EntityId, Vec<(EntityId, RefType)>>,
    /// Cross references keyed by target id.
    cross_ref_reverse: HashMap<EntityId, Vec<(EntityId, RefType)>>,

    /// Bloom filter fed with raw ids (little-endian bytes) and primary-key strings.
    bloom: BloomFilter,

    /// Memtable owned by this segment.
    memtable: Memtable,

    /// Per-column min/max bounds; only ever widened by writes.
    col_zones: HashMap<String, ColZone>,
    /// Per-column multi-interval zones produced by `rebuild_sealed_col_zones`;
    /// consulted before `col_zones` when pruning.
    sealed_col_zones: HashMap<String, MultiColZone>,

    /// Next sequence number to hand out.
    sequence: AtomicU64,
    /// Running estimate of memory used by inserted entities.
    memory_bytes: AtomicU64,

    /// Length of `flat_entities` as last published by `bulk_insert` (Release store).
    pub(crate) published_flat_len: AtomicUsize,
}
458
459impl GrowingSegment {
460 #[inline]
463 pub fn for_each_fast<F>(&self, mut f: F) -> bool
464 where
465 F: FnMut(&UnifiedEntity) -> bool,
466 {
467 if self.use_flat {
468 if self.deleted.is_empty() {
470 for entity in &self.flat_entities {
471 if !f(entity) {
472 return false;
473 }
474 }
475 for entity in self.entities.values() {
478 if !f(entity) {
479 return false;
480 }
481 }
482 } else {
483 for entity in &self.flat_entities {
484 if self.deleted.contains(&entity.id) {
485 continue;
486 }
487 if !f(entity) {
488 return false;
489 }
490 }
491 for entity in self.entities.values() {
492 if self.deleted.contains(&entity.id) {
493 continue;
494 }
495 if !f(entity) {
496 return false;
497 }
498 }
499 }
500 } else {
501 if self.deleted.is_empty() {
503 for entity in self.entities.values() {
504 if !f(entity) {
505 return false;
506 }
507 }
508 } else {
509 for entity in self.entities.values() {
510 if self.deleted.contains(&entity.id) {
511 continue;
512 }
513 if !f(entity) {
514 return false;
515 }
516 }
517 }
518 }
519 true
520 }
521
    /// Creates an empty, writable segment for `collection`.
    ///
    /// The segment starts in map mode (`use_flat == false`) with creation and
    /// last-write timestamps set to the current time.
    pub fn new(id: SegmentId, collection: impl Into<String>) -> Self {
        let now = current_unix_secs();

        Self {
            id,
            collection: collection.into(),
            state: SegmentState::Growing,
            created_at: now,
            last_write_at: now,
            entities: HashMap::new(),
            flat_entities: Vec::new(),
            base_entity_id: 0,
            use_flat: false,
            deleted: HashSet::new(),
            metadata: MetadataStorage::new(),
            pk_index: BTreeMap::new(),
            kind_index: HashMap::new(),
            cross_ref_forward: HashMap::new(),
            cross_ref_reverse: HashMap::new(),
            // Arguments are presumably (expected items, false-positive rate) — confirm
            // against `BloomFilter::with_capacity`.
            bloom: BloomFilter::with_capacity(100_000, 0.01),
            memtable: Memtable::new(),
            col_zones: HashMap::new(),
            sealed_col_zones: HashMap::new(),
            sequence: AtomicU64::new(0),
            memory_bytes: AtomicU64::new(0),
            published_flat_len: AtomicUsize::new(0),
        }
    }
551
    /// Returns the current sequence number and advances the counter by one.
    fn next_sequence(&self) -> u64 {
        self.sequence.fetch_add(1, Ordering::SeqCst)
    }
556
557 fn has_live_entity(&self, id: EntityId) -> bool {
558 if self.deleted.contains(&id) {
559 return false;
560 }
561 if self.use_flat {
562 let raw = id.raw();
563 if raw >= self.base_entity_id {
564 let idx = (raw - self.base_entity_id) as usize;
565 if self
566 .flat_entities
567 .get(idx)
568 .is_some_and(|entity| entity.id == id)
569 {
570 return true;
571 }
572 }
573 self.entities.contains_key(&id)
576 } else {
577 self.entities.contains_key(&id)
578 }
579 }
580
581 fn update_existing_entity_in_place(
582 &mut self,
583 entity: &UnifiedEntity,
584 ) -> Result<UpdateIndexSnapshot, SegmentError> {
585 if self.use_flat {
586 let raw = entity.id.raw();
587 if raw < self.base_entity_id {
588 return Err(SegmentError::NotFound(entity.id));
589 }
590 let idx = (raw - self.base_entity_id) as usize;
591 let Some(slot) = self.flat_entities.get_mut(idx) else {
592 return Err(SegmentError::NotFound(entity.id));
593 };
594 if slot.id != entity.id {
595 return Err(SegmentError::NotFound(entity.id));
596 }
597 let snapshot = UpdateIndexSnapshot::from_entity(slot);
598 slot.clone_from(entity);
599 Ok(snapshot)
600 } else {
601 let Some(slot) = self.entities.get_mut(&entity.id) else {
602 return Err(SegmentError::NotFound(entity.id));
603 };
604 let snapshot = UpdateIndexSnapshot::from_entity(slot);
605 slot.clone_from(entity);
606 Ok(snapshot)
607 }
608 }
609
    /// Applies one update given a modified-columns hint, without checking the
    /// segment state.
    ///
    /// Overwrites the stored entity, re-indexes it (passing `modified_columns`
    /// to `reindex_for_update`), widens the column zones and, when `metadata`
    /// is given, replaces the entity's metadata.
    ///
    /// # Errors
    /// Propagates `NotFound` from `update_existing_entity_in_place`.
    fn apply_hot_update_with_metadata(
        &mut self,
        entity: &UnifiedEntity,
        modified_columns: &[String],
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let old = self.update_existing_entity_in_place(entity)?;
        self.reindex_for_update(&old, entity, Some(modified_columns));
        self.update_col_zones_from_entity(entity);
        if let Some(metadata) = metadata {
            self.metadata.set_all(entity.id, metadata);
        }
        Ok(())
    }
624
    /// Applies one update without a modified-columns hint and without checking
    /// the segment state.
    ///
    /// Same steps as `apply_hot_update_with_metadata`, except that
    /// `reindex_for_update` receives `None` and therefore decides by comparing
    /// values.
    ///
    /// # Errors
    /// Propagates `NotFound` from `update_existing_entity_in_place`.
    fn apply_update_with_metadata(
        &mut self,
        entity: &UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let old = self.update_existing_entity_in_place(entity)?;
        self.reindex_for_update(&old, entity, None);
        self.update_col_zones_from_entity(entity);
        if let Some(metadata) = metadata {
            self.metadata.set_all(entity.id, metadata);
        }
        Ok(())
    }
638
639 pub fn update_hot_batch_with_metadata<'a, I>(&mut self, items: I) -> Result<(), SegmentError>
640 where
641 I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
642 {
643 if !self.state.is_writable() {
644 return Err(SegmentError::NotWritable);
645 }
646
647 let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
648 items.into_iter().collect();
649 if items.is_empty() {
650 return Ok(());
651 }
652
653 for (entity, _, _) in &items {
654 if !self.has_live_entity(entity.id) {
655 return Err(SegmentError::NotFound(entity.id));
656 }
657 }
658
659 for (entity, modified_columns, metadata) in items {
660 self.apply_hot_update_with_metadata(entity, modified_columns, metadata)?;
661 }
662
663 self.last_write_at = current_unix_secs();
664 Ok(())
665 }
666
667 pub fn delete_batch(&mut self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
668 if !self.state.is_writable() {
669 return Err(SegmentError::NotWritable);
670 }
671 if ids.is_empty() {
672 return Ok(Vec::new());
673 }
674
675 let mut deleted_ids = Vec::with_capacity(ids.len());
676
677 if self.use_flat {
678 for &id in ids {
679 let raw = id.raw();
680 if raw < self.base_entity_id {
681 if let Some(entity) = self.entities.remove(&id) {
682 self.unindex_entity(&entity);
683 self.metadata.remove_all(id);
684 self.deleted.insert(id);
685 deleted_ids.push(id);
686 }
687 continue;
688 }
689 let idx = (raw - self.base_entity_id) as usize;
690 if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
691 if !self.deleted.contains(&id) {
692 self.metadata.remove_all(id);
693 self.deleted.insert(id);
694 deleted_ids.push(id);
695 }
696 } else if let Some(entity) = self.entities.remove(&id) {
697 self.unindex_entity(&entity);
698 self.metadata.remove_all(id);
699 self.deleted.insert(id);
700 deleted_ids.push(id);
701 }
702 }
703 } else {
704 for &id in ids {
705 if let Some(entity) = self.entities.remove(&id) {
706 self.unindex_entity(&entity);
707 self.metadata.remove_all(id);
708 self.deleted.insert(id);
709 deleted_ids.push(id);
710 }
711 }
712 }
713
714 if !deleted_ids.is_empty() {
715 self.last_write_at = current_unix_secs();
716 }
717
718 Ok(deleted_ids)
719 }
720
    /// Adds `bytes` to the segment's memory estimate (relaxed atomic add).
    fn add_memory(&self, bytes: usize) {
        self.memory_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
    }
725
726 fn estimate_entity_size(entity: &UnifiedEntity) -> usize {
728 let mut size = std::mem::size_of::<UnifiedEntity>();
729
730 size += match &entity.data {
732 EntityData::Row(row) => row.columns.len() * 64, EntityData::Node(node) => node.properties.len() * 128,
734 EntityData::Edge(edge) => edge.properties.len() * 128,
735 EntityData::Vector(vec) => {
736 vec.dense.len() * 4 + vec.sparse.as_ref().map_or(0, |s| s.indices.len() * 8)
737 }
738 EntityData::TimeSeries(_) => 64,
739 EntityData::QueueMessage(_) => 128,
740 };
741
742 for emb in entity.embeddings() {
744 size += emb.vector.len() * 4 + emb.name.len() + emb.model.len();
745 }
746
747 size += std::mem::size_of_val(entity.cross_refs());
749
750 size
751 }
752
753 fn update_col_zones_from_entity(&mut self, entity: &UnifiedEntity) {
761 if let EntityData::Row(row) = &entity.data {
762 if let Some(named) = &row.named {
763 for (col, val) in named {
765 if matches!(val, Value::Null) {
766 continue;
767 }
768 self.col_zones
769 .entry(col.clone())
770 .and_modify(|z| z.update(val))
771 .or_insert_with(|| ColZone::new(val.clone()));
772 }
773 } else if let Some(schema) = &row.schema {
774 for (col, val) in schema.iter().zip(row.columns.iter()) {
777 if matches!(val, Value::Null) {
778 continue;
779 }
780 self.col_zones
781 .entry(col.clone())
782 .and_modify(|z| z.update(val))
783 .or_insert_with(|| ColZone::new(val.clone()));
784 }
785 }
786 }
787 }
788
789 fn rebuild_sealed_col_zones(&mut self) {
790 let mut values_by_col: HashMap<String, Vec<(CanonicalKey, Value)>> = HashMap::new();
791 let mut family_by_col: HashMap<String, crate::storage::schema::CanonicalKeyFamily> =
792 HashMap::new();
793 let mut mixed_family_cols = HashSet::new();
794 let mut unsupported_cols = HashSet::new();
795
796 let mut observe_row = |row: &super::entity::RowData| {
797 for (col, value) in row.iter_fields() {
798 if matches!(value, Value::Null) {
799 continue;
800 }
801 let Some(key) = value_to_canonical_key(value) else {
802 unsupported_cols.insert(col.to_string());
803 continue;
804 };
805 match family_by_col.get(col).copied() {
806 Some(existing) if existing != key.family() => {
807 mixed_family_cols.insert(col.to_string());
808 }
809 None => {
810 family_by_col.insert(col.to_string(), key.family());
811 }
812 _ => {}
813 }
814 values_by_col
815 .entry(col.to_string())
816 .or_default()
817 .push((key, value.clone()));
818 }
819 };
820
821 if self.use_flat {
822 for entity in &self.flat_entities {
823 if self.deleted.contains(&entity.id) {
824 continue;
825 }
826 if let EntityData::Row(row) = &entity.data {
827 observe_row(row);
828 }
829 }
830 } else {
831 for entity in self.entities.values() {
832 if self.deleted.contains(&entity.id) {
833 continue;
834 }
835 if let EntityData::Row(row) = &entity.data {
836 observe_row(row);
837 }
838 }
839 }
840
841 let mut sealed_col_zones = HashMap::new();
842 for (col, mut entries) in values_by_col {
843 if mixed_family_cols.contains(&col)
844 || unsupported_cols.contains(&col)
845 || entries.is_empty()
846 {
847 continue;
848 }
849 entries.sort_unstable_by(|left, right| left.0.cmp(&right.0));
850 entries.dedup_by(|left, right| left.0 == right.0);
851
852 let intervals = build_minmax_multi_intervals(&entries, SEALED_MULTI_ZONE_MAX_INTERVALS);
853 if intervals.len() > 1 {
854 sealed_col_zones.insert(col, MultiColZone { intervals });
855 }
856 }
857
858 self.sealed_col_zones = sealed_col_zones;
859 }
860
861 pub fn can_skip_zone_preds(&self, preds: &[(&str, ZoneColPred<'_>)]) -> bool {
865 if preds.is_empty() {
866 return false;
867 }
868 for (col, pred) in preds {
869 if let Some(zone) = self.sealed_col_zones.get(*col) {
870 if zone.can_skip(pred) {
871 return true;
872 }
873 continue;
874 }
875 if let Some(zone) = self.col_zones.get(*col) {
876 if pred.can_skip(zone) {
877 return true; }
879 }
880 }
881 false
882 }
883
884 fn index_entity(&mut self, entity: &UnifiedEntity) {
886 let kind_key = entity.kind.storage_type().to_string();
888 self.kind_index
889 .entry(kind_key)
890 .or_default()
891 .insert(entity.id);
892
893 let id_bytes = entity.id.raw().to_le_bytes();
895 self.bloom.insert(&id_bytes);
896
897 if let EntityData::Row(row) = &entity.data {
899 if let Some(first_col) = row.columns.first() {
900 let pk_str = format!("{:?}", first_col);
901 self.bloom.insert(pk_str.as_bytes());
903 self.pk_index
904 .insert((entity.kind.collection().to_string(), pk_str), entity.id);
905 }
906 }
907
908 for cross_ref in entity.cross_refs() {
910 self.cross_ref_forward
911 .entry(cross_ref.source)
912 .or_default()
913 .push((cross_ref.target, cross_ref.ref_type));
914
915 self.cross_ref_reverse
916 .entry(cross_ref.target)
917 .or_default()
918 .push((cross_ref.source, cross_ref.ref_type));
919 }
920 }
921
922 pub fn bloom_might_contain_id(&self, id: EntityId) -> bool {
925 let id_bytes = id.raw().to_le_bytes();
926 self.bloom.contains(&id_bytes)
927 }
928
    /// Bloom-filter membership test for an arbitrary byte key.
    pub fn bloom_might_contain_key(&self, key: &[u8]) -> bool {
        self.bloom.contains(key)
    }
933
    /// Returns the bloom filter's `(fill_ratio, count_set_bits)` pair.
    pub fn bloom_stats(&self) -> (f64, u32) {
        (self.bloom.fill_ratio(), self.bloom.count_set_bits())
    }
938
939 fn unindex_entity(&mut self, entity: &UnifiedEntity) {
941 let kind_key = entity.kind.storage_type().to_string();
943 if let Some(set) = self.kind_index.get_mut(&kind_key) {
944 set.remove(&entity.id);
945 }
946
947 if let EntityData::Row(row) = &entity.data {
949 if let Some(first_col) = row.columns.first() {
950 let pk_str = format!("{:?}", first_col);
951 self.pk_index
952 .remove(&(entity.kind.collection().to_string(), pk_str));
953 }
954 }
955
956 self.cross_ref_forward.remove(&entity.id);
958 }
960
961 fn reindex_for_update(
973 &mut self,
974 old: &UpdateIndexSnapshot,
975 new: &UnifiedEntity,
976 modified_columns: Option<&[String]>,
977 ) {
978 let pk_changed = match &new.data {
985 EntityData::Row(new_row) => {
986 if let Some(cols) = modified_columns {
987 let pk_col_name = old.pk_column_name.as_deref().or_else(|| {
990 new_row
991 .schema
992 .as_deref()
993 .and_then(|schema| schema.first().map(|name| name.as_str()))
994 });
995 match pk_col_name {
996 Some(pk_name) => cols.iter().any(|c| c.eq_ignore_ascii_case(pk_name)),
997 None => old.pk_value.as_ref() != new_row.columns.first(),
999 }
1000 } else {
1001 old.pk_value.as_ref() != new_row.columns.first()
1002 }
1003 }
1004 _ => false,
1006 };
1007
1008 if pk_changed {
1009 if let Some((collection, pk_str)) = &old.pk_index_key {
1011 self.pk_index.remove(&(collection.clone(), pk_str.clone()));
1012 }
1013 if let EntityData::Row(row) = &new.data {
1015 if let Some(first_col) = row.columns.first() {
1016 let pk_str = format!("{:?}", first_col);
1017 self.bloom.insert(pk_str.as_bytes());
1018 self.pk_index
1019 .insert((new.kind.collection().to_string(), pk_str), new.id);
1020 }
1021 }
1022 }
1023
1024 let new_refs = new.cross_refs();
1026 if old.cross_refs.as_slice() != new_refs {
1027 self.cross_ref_forward.remove(&new.id);
1029 for cross_ref in &old.cross_refs {
1031 if let Some(rev) = self.cross_ref_reverse.get_mut(&cross_ref.target) {
1032 rev.retain(|(src, _)| *src != new.id);
1033 }
1034 }
1035 for cross_ref in new_refs {
1037 self.cross_ref_forward
1038 .entry(cross_ref.source)
1039 .or_default()
1040 .push((cross_ref.target, cross_ref.ref_type));
1041 self.cross_ref_reverse
1042 .entry(cross_ref.target)
1043 .or_default()
1044 .push((cross_ref.source, cross_ref.ref_type));
1045 }
1046 }
1047 }
1048
1049 pub fn get_references_to(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1051 self.cross_ref_reverse.get(&id).cloned().unwrap_or_default()
1052 }
1053
1054 pub fn get_references_from(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1056 self.cross_ref_forward.get(&id).cloned().unwrap_or_default()
1057 }
1058
    /// Statistics reported by the segment's memtable.
    pub fn memtable_stats(&self) -> super::memtable::MemtableStats {
        self.memtable.stats()
    }
1063
    /// Whether the segment's memtable reports that it should be flushed.
    pub fn memtable_should_flush(&self) -> bool {
        self.memtable.should_flush()
    }
1068
1069 pub fn age_secs(&self) -> u64 {
1071 let now = current_unix_secs();
1072 now.saturating_sub(self.created_at)
1073 }
1074
1075 pub fn idle_secs(&self) -> u64 {
1077 let now = current_unix_secs();
1078 now.saturating_sub(self.last_write_at)
1079 }
1080
    /// Inserts a batch of entities and returns their ids in input order.
    ///
    /// Compared with `insert`, this path only maintains the kind index, the
    /// sequence numbers, the column zones and the last-write timestamp; it
    /// does not call `index_entity` or `add_memory`, and it performs no
    /// duplicate-id check.
    ///
    /// All entities are registered under the storage type of the *first*
    /// entity — NOTE(review): this assumes a homogeneous batch; confirm with
    /// callers.
    ///
    /// When the segment is completely empty the call switches it to flat
    /// mode, using the first entity's raw id as the base of the dense vector.
    ///
    /// # Errors
    /// `NotWritable` when the segment is not in a writable state.
    pub fn bulk_insert(
        &mut self,
        entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<EntityId>, SegmentError> {
        if !self.state.is_writable() {
            return Err(SegmentError::NotWritable);
        }

        let n = entities.len();

        // An empty batch returns before any state is touched.
        let kind_key = if let Some(first) = entities.first() {
            first.kind.storage_type().to_string()
        } else {
            return Ok(Vec::new());
        };

        let kind_set = self.kind_index.entry(kind_key).or_default();
        kind_set.reserve(n);

        let now = current_unix_secs();

        // Reserve a contiguous range of `n` sequence numbers in one step.
        let base_seq = self.sequence.fetch_add(n as u64, Ordering::Relaxed);

        let mut ids = Vec::with_capacity(n);

        // First write into an empty segment: adopt the dense layout.
        if self.flat_entities.is_empty() && self.entities.is_empty() {
            self.base_entity_id = entities.first().map(|e| e.id.raw()).unwrap_or(0);
            self.use_flat = true;
        }

        // Zone updates are collected during the insert loop and applied afterwards.
        // `columnar_zone_updates[ci]` gathers the non-null values of column `ci`
        // for rows that carry a schema; other rows go through `named_zone_updates`.
        let mut columnar_zone_updates: Vec<Vec<Value>> = Vec::new();
        let mut columnar_schema: Option<std::sync::Arc<Vec<String>>> = None;
        let mut named_zone_updates: Vec<(String, Value)> = Vec::new();

        if self.use_flat {
            self.flat_entities.reserve(n);
            for (i, mut entity) in entities.into_iter().enumerate() {
                entity.sequence_id = base_seq + i as u64;
                let id = entity.id;
                kind_set.insert(id);
                ids.push(id);
                if let EntityData::Row(row) = &entity.data {
                    if row.schema.is_some() && !row.columns.is_empty() {
                        // The bucket layout and schema are fixed by the first such row.
                        if columnar_zone_updates.is_empty() {
                            columnar_zone_updates = vec![Vec::with_capacity(n); row.columns.len()];
                            columnar_schema = row.schema.clone();
                        }
                        for (ci, val) in row.columns.iter().enumerate() {
                            if !matches!(val, Value::Null) {
                                // Columns beyond the first row's width have no bucket and are skipped.
                                if let Some(bucket) = columnar_zone_updates.get_mut(ci) {
                                    bucket.push(val.clone());
                                }
                            }
                        }
                    } else {
                        for (col, val) in row.iter_fields() {
                            if !matches!(val, Value::Null) {
                                named_zone_updates.push((col.to_string(), val.clone()));
                            }
                        }
                    }
                }
                // The dense vector only accepts the id that maps to its next slot;
                // anything else is kept in the map.
                let expected = self.base_entity_id + self.flat_entities.len() as u64;
                if id.raw() == expected {
                    self.flat_entities.push(entity);
                } else {
                    self.entities.insert(id, entity);
                }
            }
        } else {
            self.entities.reserve(n);
            let mut pairs = Vec::with_capacity(n);
            for (i, mut entity) in entities.into_iter().enumerate() {
                entity.sequence_id = base_seq + i as u64;
                let id = entity.id;
                kind_set.insert(id);
                ids.push(id);
                if let EntityData::Row(row) = &entity.data {
                    for (col, val) in row.iter_fields() {
                        if !matches!(val, Value::Null) {
                            named_zone_updates.push((col.to_string(), val.clone()));
                        }
                    }
                }
                pairs.push((id, entity));
            }
            self.entities.extend(pairs);
        }

        // Last use of the borrow on `kind_index`.
        let _ = kind_set;
        if !columnar_zone_updates.is_empty() {
            let schema = columnar_schema.as_ref();
            for (ci, values) in columnar_zone_updates.into_iter().enumerate() {
                if values.is_empty() {
                    continue;
                }
                // Buckets without a matching schema name are dropped.
                let Some(col_name) = schema.and_then(|s| s.get(ci)) else {
                    continue;
                };
                let mut iter = values.into_iter();
                if let Some(first) = iter.next() {
                    // One map lookup per column; the remaining values update the zone directly.
                    let zone = self
                        .col_zones
                        .entry(col_name.clone())
                        .and_modify(|z| z.update(&first))
                        .or_insert_with(|| ColZone::new(first));
                    for v in iter {
                        zone.update(&v);
                    }
                }
            }
        }
        for (col, val) in named_zone_updates {
            self.col_zones
                .entry(col)
                .and_modify(|z| z.update(&val))
                .or_insert_with(|| ColZone::new(val));
        }

        self.last_write_at = now;

        if self.use_flat {
            // Publish the new dense length after all pushes are done.
            self.published_flat_len
                .store(self.flat_entities.len(), Ordering::Release);
        }

        Ok(ids)
    }
1257
1258 pub(crate) fn force_delete(&mut self, id: EntityId) -> bool {
1261 if self.use_flat {
1262 let raw = id.raw();
1263 if raw >= self.base_entity_id {
1264 let idx = (raw - self.base_entity_id) as usize;
1265 if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1266 self.deleted.insert(id);
1267 self.metadata.remove_all(id);
1268 return true;
1269 }
1270 }
1271 return false;
1272 }
1273
1274 if let Some(entity) = self.entities.remove(&id) {
1275 self.unindex_entity(&entity);
1276 self.metadata.remove_all(id);
1277 self.deleted.insert(id);
1278 true
1279 } else {
1280 false
1281 }
1282 }
1283
    /// Applies a hot update without checking whether the segment is writable
    /// and without refreshing the last-write timestamp.
    ///
    /// Thin wrapper around `apply_hot_update_with_metadata`.
    ///
    /// # Errors
    /// `NotFound` when the entity is not stored.
    pub(crate) fn force_update_with_metadata(
        &mut self,
        entity: &UnifiedEntity,
        modified_columns: &[String],
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        self.apply_hot_update_with_metadata(entity, modified_columns, metadata)
    }
1294}
1295
1296impl UnifiedSegment for GrowingSegment {
    /// Identifier given to the segment at construction.
    fn id(&self) -> SegmentId {
        self.id
    }
1300
    /// Current lifecycle state.
    fn state(&self) -> SegmentState {
        self.state
    }
1304
    /// Collection name given to the segment at construction.
    fn collection(&self) -> &str {
        &self.collection
    }
1308
1309 fn stats(&self) -> SegmentStats {
1310 let mut stats = SegmentStats {
1311 entity_count: self.entities.len(),
1312 deleted_count: self.deleted.len(),
1313 memory_bytes: self.memory_bytes.load(Ordering::Relaxed) as usize,
1314 ..Default::default()
1315 };
1316
1317 for entity in self.entities.values() {
1318 match &entity.kind {
1319 EntityKind::TableRow { .. } => stats.row_count += 1,
1320 EntityKind::GraphNode(_) => stats.node_count += 1,
1321 EntityKind::GraphEdge(_) => stats.edge_count += 1,
1322 EntityKind::Vector { .. } => stats.vector_count += 1,
1323 EntityKind::TimeSeriesPoint(_) => stats.row_count += 1,
1324 EntityKind::QueueMessage { .. } => stats.row_count += 1,
1325 }
1326 stats.cross_ref_count += entity.cross_refs().len();
1327 }
1328
1329 stats
1330 }
1331
1332 fn entity_count(&self) -> usize {
1333 let total = if self.use_flat {
1334 self.flat_entities.len()
1335 } else {
1336 self.entities.len()
1337 };
1338 total.saturating_sub(self.deleted.len())
1339 }
1340
    /// Whether `id` refers to a stored, non-deleted entity (see `has_live_entity`).
    fn contains(&self, id: EntityId) -> bool {
        self.has_live_entity(id)
    }
1344
1345 fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1346 if self.deleted.contains(&id) {
1347 return None;
1348 }
1349 if self.use_flat {
1350 let raw = id.raw();
1351 if raw >= self.base_entity_id {
1352 let idx = (raw - self.base_entity_id) as usize;
1353 if let Some(entity) = self.flat_entities.get(idx).filter(|e| e.id == id) {
1354 return Some(entity);
1355 }
1356 }
1357 self.entities.get(&id)
1363 } else {
1364 self.entities.get(&id)
1365 }
1366 }
1367
1368 fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity> {
1369 if self.deleted.contains(&id) || !self.state.is_writable() {
1370 return None;
1371 }
1372 if self.use_flat {
1373 let raw = id.raw();
1374 if raw >= self.base_entity_id {
1375 let idx = (raw - self.base_entity_id) as usize;
1376 if self
1377 .flat_entities
1378 .get(idx)
1379 .map(|e| e.id == id)
1380 .unwrap_or(false)
1381 {
1382 return self.flat_entities.get_mut(idx);
1383 }
1384 }
1385 self.entities.get_mut(&id)
1388 } else {
1389 self.entities.get_mut(&id)
1390 }
1391 }
1392
1393 fn insert(&mut self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1394 if !self.state.is_writable() {
1395 return Err(SegmentError::NotWritable);
1396 }
1397
1398 if self.entities.contains_key(&entity.id) {
1399 return Err(SegmentError::AlreadyExists(entity.id));
1400 }
1401
1402 entity.sequence_id = self.next_sequence();
1404
1405 let size = Self::estimate_entity_size(&entity);
1407 self.add_memory(size);
1408
1409 self.index_entity(&entity);
1411
1412 self.update_col_zones_from_entity(&entity);
1414
1415 let id = entity.id;
1417 self.entities.insert(id, entity);
1418
1419 self.last_write_at = current_unix_secs();
1421
1422 Ok(id)
1423 }
1424
    /// Replaces the stored entity that has the same id as `entity`, leaving
    /// its metadata untouched, and refreshes the last-write timestamp.
    ///
    /// # Errors
    /// * `NotWritable` when the segment is not in a writable state.
    /// * `NotFound` when the entity is not stored.
    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
        if !self.state.is_writable() {
            return Err(SegmentError::NotWritable);
        }

        self.apply_update_with_metadata(&entity, None)?;
        self.last_write_at = current_unix_secs();

        Ok(())
    }
1435
    /// Like `update`, but forwards `modified_columns` so that re-indexing can
    /// use the hint instead of comparing values.
    ///
    /// # Errors
    /// * `NotWritable` when the segment is not in a writable state.
    /// * `NotFound` when the entity is not stored.
    fn update_hot(
        &mut self,
        entity: UnifiedEntity,
        modified_columns: &[String],
    ) -> Result<(), SegmentError> {
        if !self.state.is_writable() {
            return Err(SegmentError::NotWritable);
        }

        self.apply_hot_update_with_metadata(&entity, modified_columns, None)?;
        self.last_write_at = current_unix_secs();
        Ok(())
    }
1449
1450 fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1451 if !self.state.is_writable() {
1452 return Err(SegmentError::NotWritable);
1453 }
1454
1455 if self.use_flat {
1457 let raw = id.raw();
1458 if raw >= self.base_entity_id {
1459 let idx = (raw - self.base_entity_id) as usize;
1460 if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1461 self.metadata.remove_all(id);
1462 self.deleted.insert(id);
1463 return Ok(true);
1464 }
1465 }
1466 if let Some(entity) = self.entities.remove(&id) {
1467 self.unindex_entity(&entity);
1468 self.metadata.remove_all(id);
1469 self.deleted.insert(id);
1470 return Ok(true);
1471 }
1472 return Ok(false);
1473 }
1474
1475 let entity = self.entities.remove(&id);
1477 if entity.is_none() {
1478 return Ok(false);
1479 }
1480
1481 if let Some(ref e) = entity {
1483 self.unindex_entity(e);
1484 }
1485
1486 self.metadata.remove_all(id);
1488
1489 self.deleted.insert(id);
1491
1492 Ok(true)
1493 }
1494
1495 fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1496 if !self.has_live_entity(id) {
1497 return None;
1498 }
1499 Some(self.metadata.get_all(id))
1500 }
1501
1502 fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1503 if !self.state.is_writable() {
1504 return Err(SegmentError::NotWritable);
1505 }
1506
1507 if !self.has_live_entity(id) {
1508 return Err(SegmentError::NotFound(id));
1509 }
1510
1511 self.metadata.set_all(id, &metadata);
1512 Ok(())
1513 }
1514
1515 fn seal(&mut self) -> Result<(), SegmentError> {
1516 if self.state != SegmentState::Growing {
1517 return Err(SegmentError::InvalidState(self.state));
1518 }
1519
1520 self.state = SegmentState::Sealing;
1521
1522 let memtable_stats = self.memtable.stats();
1524 if memtable_stats.entry_count > 0 {
1525 self.memtable.clear();
1528 }
1529
1530 self.rebuild_sealed_col_zones();
1536
1537 self.state = SegmentState::Sealed;
1538 Ok(())
1539 }
1540
1541 fn should_seal(&self, config: &SegmentConfig) -> bool {
1542 if self.entities.len() >= config.max_entities {
1544 return true;
1545 }
1546
1547 if self.memory_bytes.load(Ordering::Relaxed) as usize >= config.max_bytes {
1549 return true;
1550 }
1551
1552 if self.age_secs() >= config.max_age_secs {
1554 return true;
1555 }
1556
1557 false
1558 }
1559
1560 fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1561 let base: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1562 Box::new(self.flat_entities.iter().chain(self.entities.values()))
1566 } else {
1567 Box::new(self.entities.values())
1568 };
1569 if self.deleted.is_empty() {
1570 base
1571 } else {
1572 Box::new(base.filter(|e| !self.deleted.contains(&e.id)))
1573 }
1574 }
1575
1576 fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1577 let ids = self.kind_index.get(kind_filter).cloned();
1578 let flat: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1581 Box::new(self.flat_entities.iter())
1582 } else {
1583 Box::new(std::iter::empty())
1584 };
1585 Box::new(flat.chain(self.entities.values()).filter(move |e| {
1586 if self.deleted.contains(&e.id) {
1587 return false;
1588 }
1589 if let Some(ref ids) = ids {
1590 ids.contains(&e.id)
1591 } else {
1592 false
1593 }
1594 }))
1595 }
1596
1597 fn filter_metadata(
1598 &self,
1599 filters: &[(String, super::metadata::MetadataFilter)],
1600 ) -> Vec<EntityId> {
1601 let flat_ids: Box<dyn Iterator<Item = EntityId> + '_> = if self.use_flat {
1604 Box::new(self.flat_entities.iter().map(|e| e.id))
1605 } else {
1606 Box::new(std::iter::empty())
1607 };
1608 flat_ids
1609 .chain(self.entities.keys().copied())
1610 .filter(|id| {
1611 if self.deleted.contains(id) {
1612 return false;
1613 }
1614 let metadata = self.metadata.get_all(*id);
1615 metadata.matches_all(filters)
1616 })
1617 .collect()
1618 }
1619}
1620
1621fn build_minmax_multi_intervals(
1622 entries: &[(CanonicalKey, Value)],
1623 max_intervals: usize,
1624) -> Vec<ColZone> {
1625 if entries.is_empty() {
1626 return Vec::new();
1627 }
1628 if entries.len() == 1 || max_intervals <= 1 {
1629 return vec![ColZone::with_bounds(
1630 entries[0].1.clone(),
1631 entries[entries.len() - 1].1.clone(),
1632 )];
1633 }
1634
1635 let mut split_points = if entries.len() <= max_intervals {
1636 (1..entries.len()).collect::<Vec<_>>()
1637 } else {
1638 let target_splits = max_intervals - 1;
1639 let mut selected = select_gap_split_points(entries, target_splits);
1640 if selected.len() < target_splits {
1641 for bucket in 1..max_intervals {
1642 let idx = bucket * entries.len() / max_intervals;
1643 if idx == 0 || idx >= entries.len() || selected.contains(&idx) {
1644 continue;
1645 }
1646 selected.push(idx);
1647 if selected.len() >= target_splits {
1648 break;
1649 }
1650 }
1651 }
1652 selected.sort_unstable();
1653 selected.dedup();
1654 selected
1655 };
1656
1657 split_points.push(entries.len());
1658
1659 let mut out = Vec::with_capacity(split_points.len());
1660 let mut start = 0usize;
1661 for end in split_points {
1662 if end <= start {
1663 continue;
1664 }
1665 out.push(ColZone::with_bounds(
1666 entries[start].1.clone(),
1667 entries[end - 1].1.clone(),
1668 ));
1669 start = end;
1670 }
1671
1672 if out.is_empty() {
1673 out.push(ColZone::with_bounds(
1674 entries[0].1.clone(),
1675 entries[entries.len() - 1].1.clone(),
1676 ));
1677 }
1678
1679 out
1680}
1681
1682fn select_gap_split_points(entries: &[(CanonicalKey, Value)], max_splits: usize) -> Vec<usize> {
1683 let mut gaps = Vec::new();
1684 for idx in 1..entries.len() {
1685 if let Some(score) = canonical_gap_score(&entries[idx - 1].0, &entries[idx].0) {
1686 if score > 0.0 {
1687 gaps.push((score, idx));
1688 }
1689 }
1690 }
1691 gaps.sort_by(|left, right| {
1692 right
1693 .0
1694 .partial_cmp(&left.0)
1695 .unwrap_or(std::cmp::Ordering::Equal)
1696 .then_with(|| left.1.cmp(&right.1))
1697 });
1698 gaps.into_iter()
1699 .take(max_splits)
1700 .map(|(_, idx)| idx)
1701 .collect()
1702}
1703
1704fn canonical_gap_score(left: &CanonicalKey, right: &CanonicalKey) -> Option<f64> {
1705 if left.family() != right.family() {
1706 return None;
1707 }
1708 match (left, right) {
1709 (CanonicalKey::Signed(_, l), CanonicalKey::Signed(_, r)) => {
1710 Some(r.saturating_sub(*l) as f64)
1711 }
1712 (CanonicalKey::Unsigned(_, l), CanonicalKey::Unsigned(_, r)) => {
1713 Some(r.saturating_sub(*l) as f64)
1714 }
1715 (CanonicalKey::Float(l), CanonicalKey::Float(r)) => {
1716 Some((f64::from_bits(*r) - f64::from_bits(*l)).abs())
1717 }
1718 _ => None,
1719 }
1720}
1721
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::schema::Value;
    use crate::storage::unified::entity::RowData;
    use crate::storage::unified::MetadataValue;

    /// Single-column `users` row with entity id 1, shared by the row-based tests.
    fn alice_row() -> UnifiedEntity {
        UnifiedEntity::table_row(
            EntityId::new(1),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        )
    }

    /// Inserting a row makes it visible through `contains` and the stats counters.
    #[test]
    fn test_growing_segment_basic() {
        let mut seg = GrowingSegment::new(1, "test");

        let inserted = seg.insert(alice_row()).unwrap();
        assert_eq!(inserted, EntityId::new(1));
        assert!(seg.contains(inserted));

        let stats = seg.stats();
        assert_eq!(stats.entity_count, 1);
        assert_eq!(stats.row_count, 1);
    }

    /// Metadata written with `set_metadata` is readable through `get_metadata`.
    #[test]
    fn test_segment_metadata() {
        let mut seg = GrowingSegment::new(1, "test");
        seg.insert(alice_row()).unwrap();

        let target = EntityId::new(1);
        let mut meta = Metadata::new();
        meta.set("role", MetadataValue::String("admin".to_string()));
        meta.set("level", MetadataValue::Int(5));
        seg.set_metadata(target, meta).unwrap();

        let stored = seg.get_metadata(target).unwrap();
        let expected_role = MetadataValue::String("admin".to_string());
        assert_eq!(stored.get("role"), Some(&expected_role));
    }

    /// Sealing moves the segment to `Sealed` and rejects further inserts.
    #[test]
    fn test_segment_seal() {
        let mut seg = GrowingSegment::new(1, "test");

        let first = UnifiedEntity::vector(EntityId::new(1), "embeddings", vec![0.1, 0.2, 0.3]);
        seg.insert(first).unwrap();
        assert!(seg.state().is_writable());

        seg.seal().unwrap();
        assert_eq!(seg.state(), SegmentState::Sealed);

        let late = UnifiedEntity::vector(EntityId::new(2), "embeddings", vec![0.4, 0.5, 0.6]);
        assert!(seg.insert(late).is_err());
    }

    /// `should_seal` flips to true once the entity-count limit is reached.
    #[test]
    fn test_should_seal() {
        let mut seg = GrowingSegment::new(1, "test");
        let config = SegmentConfig {
            max_entities: 2,
            ..Default::default()
        };

        assert!(!seg.should_seal(&config));

        let first = UnifiedEntity::vector(EntityId::new(1), "v", vec![0.1]);
        seg.insert(first).unwrap();
        assert!(!seg.should_seal(&config));

        let second = UnifiedEntity::vector(EntityId::new(2), "v", vec![0.2]);
        seg.insert(second).unwrap();
        assert!(seg.should_seal(&config));
    }

    /// A cross-reference attached before insert is queryable from both ends.
    #[test]
    fn test_cross_references() {
        let mut seg = GrowingSegment::new(1, "test");
        let source = EntityId::new(1);
        let target = EntityId::new(2);

        let mut host = UnifiedEntity::table_row(
            source,
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        host.add_cross_ref(CrossRef::new(source, target, "nodes", RefType::RowToNode));
        seg.insert(host).unwrap();

        let outgoing = seg.get_references_from(source);
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0], (target, RefType::RowToNode));

        let incoming = seg.get_references_to(target);
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0], (source, RefType::RowToNode));
    }

    /// An `Eq` probe outside an email zone's bounds is skippable; one inside is not.
    #[test]
    fn test_zone_predicate_uses_canonical_fallback_for_email_values() {
        let email = |address: &str| Value::Email(address.to_string());

        let mut zone = ColZone::new(email("bravo@example.com"));
        zone.update(&email("delta@example.com"));

        let below_range = email("alpha@example.com");
        assert!(ZoneColPred::Eq(&below_range).can_skip(&zone));

        let inside_range = email("charlie@example.com");
        assert!(!ZoneColPred::Eq(&inside_range).can_skip(&zone));
    }

    /// After sealing, a value in the gap between a cluster and an outlier is
    /// pruned while the outlier itself is still reachable.
    #[test]
    fn test_sealed_multi_zone_prunes_numeric_gap_outlier() {
        let mut seg = GrowingSegment::new(1, "test");

        for (row_id, age) in [(1_u64, 1_i64), (2, 2), (3, 3), (4, 1000)] {
            let kind = EntityKind::TableRow {
                table: "users".into(),
                row_id,
            };
            let data = EntityData::Row(RowData::with_names(
                vec![Value::Integer(age)],
                vec!["age".to_string()],
            ));
            seg.insert(UnifiedEntity::new(EntityId::new(row_id), kind, data))
                .unwrap();
        }

        seg.seal().unwrap();

        let in_gap = Value::Integer(500);
        assert!(seg.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&in_gap))]));

        let outlier = Value::Integer(1000);
        assert!(!seg.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&outlier))]));
    }
}