Skip to main content

reddb_server/storage/unified/
segment.rs

//! Unified Segment System
//!
//! Implements the Growing → Sealed segment lifecycle pattern inspired by
//! Milvus and ChromaDB. Segments are the fundamental unit of storage
//! and handle entities of all types.
//!
//! # Lifecycle
//!
//! ```text
//! Growing (in-memory, accepts writes)
//!    ↓ seal() when full or manually triggered
//! Sealed (immutable, fully indexed)
//!    ↓ flush() for persistence
//! Flushed (on disk, can be mmap'd)
//!    ↓ archive() for cold storage
//! Archived (compressed, infrequently accessed)
//! ```
18
19use std::collections::{BTreeMap, HashMap, HashSet};
20use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use super::entity::{CrossRef, EntityData, EntityId, EntityKind, RefType, UnifiedEntity};
24use super::memtable::Memtable;
25use super::metadata::{Metadata, MetadataStorage};
26use crate::storage::primitives::bloom::BloomFilter;
27use crate::storage::query::value_compare::partial_compare_values;
28use crate::storage::schema::{value_to_canonical_key, CanonicalKey, Value};
29
/// Unique identifier for a segment.
///
/// This is a plain alias for `u64`, so it is interchangeable with any other
/// `u64` at the type level.
pub type SegmentId = u64;
32
/// Stage of a segment within its lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentState {
    /// Accepts writes, partial/no index
    Growing,
    /// Transitioning, building indices
    Sealing,
    /// Immutable, fully indexed
    Sealed,
    /// Persisted to disk
    Flushed,
    /// Compressed, cold storage
    Archived,
}

impl SegmentState {
    /// Returns `true` only for [`SegmentState::Growing`], the single state
    /// in which writes are accepted.
    pub fn is_writable(&self) -> bool {
        *self == Self::Growing
    }

    /// Returns `true` for every state except [`SegmentState::Sealing`].
    pub fn is_queryable(&self) -> bool {
        *self != Self::Sealing
    }

    /// Returns `true` once the segment has reached `Sealed`, `Flushed`
    /// or `Archived`.
    pub fn is_immutable(&self) -> bool {
        match self {
            Self::Sealed | Self::Flushed | Self::Archived => true,
            Self::Growing | Self::Sealing => false,
        }
    }
}
64
/// Tunable limits and options that govern a segment.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Maximum entities before auto-sealing
    pub max_entities: usize,
    /// Maximum memory bytes before auto-sealing
    pub max_bytes: usize,
    /// Maximum age in seconds before auto-sealing
    pub max_age_secs: u64,
    /// Enable vector indexing when sealed
    pub build_vector_index: bool,
    /// Enable graph indexing when sealed
    pub build_graph_index: bool,
    /// Compression level for archived segments (0-9)
    pub compression_level: u8,
}

impl Default for SegmentConfig {
    /// Defaults: 100 000 entities, 256 MiB, one hour, both index builds
    /// enabled, compression level 6.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const ONE_HOUR_SECS: u64 = 60 * 60;

        Self {
            max_entities: 100_000,
            max_bytes: 256 * MIB,
            max_age_secs: ONE_HOUR_SECS,
            build_vector_index: true,
            build_graph_index: true,
            compression_level: 6,
        }
    }
}
94
/// Segment statistics.
///
/// A plain snapshot of counters; the derived `Default` yields all zeros.
#[derive(Debug, Clone, Default)]
pub struct SegmentStats {
    /// Number of entities
    pub entity_count: usize,
    /// Number of deleted entities
    pub deleted_count: usize,
    /// Approximate memory usage in bytes
    pub memory_bytes: usize,
    /// Number of vectors
    pub vector_count: usize,
    /// Number of graph nodes
    pub node_count: usize,
    /// Number of graph edges
    pub edge_count: usize,
    /// Number of table rows
    pub row_count: usize,
    /// Number of cross-references
    pub cross_ref_count: usize,
}
115
116/// Segment error types
117#[derive(Debug, Clone)]
118pub enum SegmentError {
119    /// Segment is not writable
120    NotWritable,
121    /// Entity not found
122    NotFound(EntityId),
123    /// Entity already exists
124    AlreadyExists(EntityId),
125    /// Segment is full
126    Full,
127    /// Invalid operation for current state
128    InvalidState(SegmentState),
129    /// Internal error
130    Internal(String),
131}
132
133impl std::fmt::Display for SegmentError {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        match self {
136            Self::NotWritable => write!(f, "segment is not writable"),
137            Self::NotFound(id) => write!(f, "entity not found: {}", id),
138            Self::AlreadyExists(id) => write!(f, "entity already exists: {}", id),
139            Self::Full => write!(f, "segment is full"),
140            Self::InvalidState(state) => write!(f, "invalid operation for state: {:?}", state),
141            Self::Internal(msg) => write!(f, "internal error: {}", msg),
142        }
143    }
144}
145
146impl std::error::Error for SegmentError {}
147
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn current_unix_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
154
/// Interval budget handed to `build_minmax_multi_intervals` when the sealed
/// per-column zone summaries are rebuilt.
const SEALED_MULTI_ZONE_MAX_INTERVALS: usize = 4;
156
157#[derive(Debug, Clone)]
158struct UpdateIndexSnapshot {
159    pk_column_name: Option<String>,
160    pk_value: Option<Value>,
161    pk_index_key: Option<(String, String)>,
162    cross_refs: Vec<CrossRef>,
163}
164
165impl UpdateIndexSnapshot {
166    fn from_entity(entity: &UnifiedEntity) -> Self {
167        let (pk_column_name, pk_value) = match &entity.data {
168            EntityData::Row(row) => (
169                row.schema
170                    .as_deref()
171                    .and_then(|schema| schema.first().cloned()),
172                row.columns.first().cloned(),
173            ),
174            _ => (None, None),
175        };
176        let pk_index_key = pk_value
177            .as_ref()
178            .map(|value| (entity.kind.collection().to_string(), format!("{:?}", value)));
179        Self {
180            pk_column_name,
181            pk_value,
182            pk_index_key,
183            cross_refs: entity.cross_refs().to_vec(),
184        }
185    }
186}
187
/// A unified segment that stores all entity types.
///
/// Implementors must be `Send + Sync`. Read accessors take `&self`; every
/// mutating operation takes `&mut self`.
pub trait UnifiedSegment: Send + Sync {
    /// Get segment ID
    fn id(&self) -> SegmentId;

    /// Get current state
    fn state(&self) -> SegmentState;

    /// Get collection/namespace name
    fn collection(&self) -> &str;

    /// Get statistics
    fn stats(&self) -> SegmentStats;

    /// O(1) live entity count (entities minus tombstones)
    fn entity_count(&self) -> usize;

    /// Check if entity exists
    fn contains(&self, id: EntityId) -> bool;

    /// Get an entity by ID; `None` when the segment has no such entity.
    fn get(&self, id: EntityId) -> Option<&UnifiedEntity>;

    /// Get mutable reference to entity; `None` when the segment has no such entity.
    fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity>;

    /// Insert a new entity, returning its ID on success.
    fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError>;

    /// Update an existing entity
    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError>;

    /// HOT-update: like update but receives the set of field names that actually
    /// changed. Allows skipping index work when indexed columns are unaffected.
    /// Default: falls back to full update.
    fn update_hot(
        &mut self,
        entity: UnifiedEntity,
        modified_columns: &[String],
    ) -> Result<(), SegmentError> {
        // The default implementation has no use for the column hint.
        let _ = modified_columns;
        self.update(entity)
    }

    /// Delete an entity.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether an entity
    /// was actually removed — confirm against the implementors.
    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError>;

    /// Get metadata for an entity
    fn get_metadata(&self, id: EntityId) -> Option<Metadata>;

    /// Set metadata for an entity
    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError>;

    /// Seal the segment (make immutable)
    fn seal(&mut self) -> Result<(), SegmentError>;

    /// Check if should auto-seal based on config
    fn should_seal(&self, config: &SegmentConfig) -> bool;

    /// Iterate over all entities
    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;

    /// Iterate over entities of a specific kind
    fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;

    /// Search entities by metadata filter, returning the IDs of the matches.
    fn filter_metadata(
        &self,
        filters: &[(String, super::metadata::MetadataFilter)],
    ) -> Vec<EntityId>;
}
259
260// ─────────────────────────────────────────────────────────────────────────────
261// Zone map: per-column min/max for segment pruning
262// ─────────────────────────────────────────────────────────────────────────────
263
264/// Tracks the min and max observed `Value` for one column in a segment.
265/// Used to skip segments that cannot satisfy a range or equality predicate.
266#[derive(Debug, Clone)]
267pub struct ColZone {
268    pub min: Value,
269    pub max: Value,
270    min_key: Option<CanonicalKey>,
271    max_key: Option<CanonicalKey>,
272}
273
274impl ColZone {
275    fn new(v: Value) -> Self {
276        Self {
277            min_key: value_to_canonical_key(&v),
278            max_key: value_to_canonical_key(&v),
279            min: v.clone(),
280            max: v,
281        }
282    }
283
284    fn with_bounds(min: Value, max: Value) -> Self {
285        Self {
286            min_key: value_to_canonical_key(&min),
287            max_key: value_to_canonical_key(&max),
288            min,
289            max,
290        }
291    }
292
293    fn update(&mut self, v: &Value) {
294        if compare_zone_values(v, None, &self.min, self.min_key.as_ref())
295            .map(|o| o == std::cmp::Ordering::Less)
296            .unwrap_or(false)
297        {
298            self.min = v.clone();
299            self.min_key = value_to_canonical_key(v);
300        }
301        if compare_zone_values(v, None, &self.max, self.max_key.as_ref())
302            .map(|o| o == std::cmp::Ordering::Greater)
303            .unwrap_or(false)
304        {
305            self.max = v.clone();
306            self.max_key = value_to_canonical_key(v);
307        }
308    }
309}
310
311#[derive(Debug, Clone, Default)]
312pub struct MultiColZone {
313    pub intervals: Vec<ColZone>,
314}
315
316impl MultiColZone {
317    fn can_skip(&self, pred: &ZoneColPred<'_>) -> bool {
318        !self.intervals.is_empty() && self.intervals.iter().all(|zone| pred.can_skip(zone))
319    }
320}
321
322fn compare_zone_values(
323    left: &Value,
324    left_key: Option<&CanonicalKey>,
325    right: &Value,
326    right_key: Option<&CanonicalKey>,
327) -> Option<std::cmp::Ordering> {
328    partial_compare_values(left, right).or_else(|| {
329        let left_key = left_key.cloned().or_else(|| value_to_canonical_key(left))?;
330        let right_key = right_key
331            .cloned()
332            .or_else(|| value_to_canonical_key(right))?;
333        (left_key.family() == right_key.family()).then(|| left_key.cmp(&right_key))
334    })
335}
336
/// Tag-only variant of `ZoneColPred` — used where the Value is stored separately.
///
/// The variants mirror those of `ZoneColPred` one-to-one by name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZoneColPredKind {
    /// Counterpart of `ZoneColPred::Eq`.
    Eq,
    /// Counterpart of `ZoneColPred::Gt`.
    Gt,
    /// Counterpart of `ZoneColPred::Gte`.
    Gte,
    /// Counterpart of `ZoneColPred::Lt`.
    Lt,
    /// Counterpart of `ZoneColPred::Lte`.
    Lte,
}
346
347/// A predicate on a single column that can be checked against a `ColZone`.
348#[derive(Debug, Clone)]
349pub enum ZoneColPred<'a> {
350    Eq(&'a Value),
351    Gt(&'a Value),
352    Gte(&'a Value),
353    Lt(&'a Value),
354    Lte(&'a Value),
355}
356
357impl<'a> ZoneColPred<'a> {
358    /// Returns `true` when the entire segment can be skipped (no row can match).
359    pub fn can_skip(&self, zone: &ColZone) -> bool {
360        match self {
361            // Equality: skip if val < min OR val > max
362            ZoneColPred::Eq(val) => {
363                compare_zone_values(val, None, &zone.min, zone.min_key.as_ref())
364                    .map(|o| o == std::cmp::Ordering::Less)
365                    .unwrap_or(false)
366                    || compare_zone_values(val, None, &zone.max, zone.max_key.as_ref())
367                        .map(|o| o == std::cmp::Ordering::Greater)
368                        .unwrap_or(false)
369            }
370            // col > val: skip if max <= val (all rows have col ≤ val, none > val)
371            ZoneColPred::Gt(val) => {
372                compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
373                    .map(|o| o != std::cmp::Ordering::Greater)
374                    .unwrap_or(false)
375            }
376            // col >= val: skip if max < val
377            ZoneColPred::Gte(val) => {
378                compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
379                    .map(|o| o == std::cmp::Ordering::Less)
380                    .unwrap_or(false)
381            }
382            // col < val: skip if min >= val
383            ZoneColPred::Lt(val) => {
384                compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
385                    .map(|o| o != std::cmp::Ordering::Less)
386                    .unwrap_or(false)
387            }
388            // col <= val: skip if min > val
389            ZoneColPred::Lte(val) => {
390                compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
391                    .map(|o| o == std::cmp::Ordering::Greater)
392                    .unwrap_or(false)
393            }
394        }
395    }
396}
397
/// Growing segment implementation (in-memory, writable).
///
/// Entities live in one of two stores: the `entities` HashMap, or — when
/// `use_flat` is set — the `flat_entities` Vec addressed by
/// `id.raw() - base_entity_id`. Deletions are recorded as tombstones in
/// `deleted`.
pub struct GrowingSegment {
    /// Segment ID
    id: SegmentId,
    /// Collection/namespace name
    collection: String,
    /// Current state
    state: SegmentState,
    /// Creation timestamp (Unix seconds)
    created_at: u64,
    /// Last write timestamp (Unix seconds)
    last_write_at: u64,

    /// Entity storage (HashMap for random access)
    entities: HashMap<EntityId, UnifiedEntity>,
    /// Flat entity storage for bulk inserts (no HashMap overhead, O(1) by offset)
    /// Used when entity IDs are sequential from base_entity_id
    flat_entities: Vec<UnifiedEntity>,
    /// Base entity ID for flat_entities (flat_entities[0].id == base_entity_id)
    base_entity_id: u64,
    /// Whether flat storage is active (bulk insert mode)
    use_flat: bool,
    /// Deleted entity IDs (tombstones)
    deleted: HashSet<EntityId>,
    /// Metadata storage (type-aware)
    metadata: MetadataStorage,

    /// Primary key index: (collection, pk_value) → EntityId
    pk_index: BTreeMap<(String, String), EntityId>,
    /// Type index: kind → EntityIds
    kind_index: HashMap<String, HashSet<EntityId>>,
    /// Cross-reference index: source → Vec<(target, ref_type)>
    cross_ref_forward: HashMap<EntityId, Vec<(EntityId, RefType)>>,
    /// Reverse cross-reference index: target → Vec<(source, ref_type)>
    cross_ref_reverse: HashMap<EntityId, Vec<(EntityId, RefType)>>,

    /// Bloom filter for fast negative key lookups
    bloom: BloomFilter,

    /// Write buffer for absorbing write spikes (sorted by key)
    memtable: Memtable,

    /// Per-column zone maps: col_name → (min, max) for segment pruning
    col_zones: HashMap<String, ColZone>,
    /// Sealed-only minmax-multi summaries built from canonical ordering.
    /// When a column has an entry here it is consulted instead of `col_zones`.
    sealed_col_zones: HashMap<String, MultiColZone>,

    /// Sequence counter for ordering
    sequence: AtomicU64,
    /// Approximate memory usage
    memory_bytes: AtomicU64,

    /// Epoch counter for lock-free reads of `flat_entities`.
    ///
    /// Updated with `Release` ordering after every flat-mode insert so that
    /// readers can safely access `flat_entities[0..published_flat_len]` by
    /// loading with `Acquire` ordering, without holding the segment RwLock.
    /// Only meaningful when `use_flat == true`; always 0 in HashMap mode.
    pub(crate) published_flat_len: AtomicUsize,
}
458
459impl GrowingSegment {
460    /// Direct iteration without Box<dyn> trait dispatch. Returns false to stop early.
461    /// Uses concrete iterator types to avoid heap allocation per call.
462    #[inline]
463    pub fn for_each_fast<F>(&self, mut f: F) -> bool
464    where
465        F: FnMut(&UnifiedEntity) -> bool,
466    {
467        if self.use_flat {
468            // Sequential Vec — best cache locality
469            if self.deleted.is_empty() {
470                for entity in &self.flat_entities {
471                    if !f(entity) {
472                        return false;
473                    }
474                }
475                // Also walk the HashMap — entities added via per-row
476                // `insert()` after flat mode was initialized land there.
477                for entity in self.entities.values() {
478                    if !f(entity) {
479                        return false;
480                    }
481                }
482            } else {
483                for entity in &self.flat_entities {
484                    if self.deleted.contains(&entity.id) {
485                        continue;
486                    }
487                    if !f(entity) {
488                        return false;
489                    }
490                }
491                for entity in self.entities.values() {
492                    if self.deleted.contains(&entity.id) {
493                        continue;
494                    }
495                    if !f(entity) {
496                        return false;
497                    }
498                }
499            }
500        } else {
501            // HashMap values — random order, no boxing
502            if self.deleted.is_empty() {
503                for entity in self.entities.values() {
504                    if !f(entity) {
505                        return false;
506                    }
507                }
508            } else {
509                for entity in self.entities.values() {
510                    if self.deleted.contains(&entity.id) {
511                        continue;
512                    }
513                    if !f(entity) {
514                        return false;
515                    }
516                }
517            }
518        }
519        true
520    }
521
522    /// Create a new growing segment
523    pub fn new(id: SegmentId, collection: impl Into<String>) -> Self {
524        let now = current_unix_secs();
525
526        Self {
527            id,
528            collection: collection.into(),
529            state: SegmentState::Growing,
530            created_at: now,
531            last_write_at: now,
532            entities: HashMap::new(),
533            flat_entities: Vec::new(),
534            base_entity_id: 0,
535            use_flat: false,
536            deleted: HashSet::new(),
537            metadata: MetadataStorage::new(),
538            pk_index: BTreeMap::new(),
539            kind_index: HashMap::new(),
540            cross_ref_forward: HashMap::new(),
541            cross_ref_reverse: HashMap::new(),
542            bloom: BloomFilter::with_capacity(100_000, 0.01),
543            memtable: Memtable::new(),
544            col_zones: HashMap::new(),
545            sealed_col_zones: HashMap::new(),
546            sequence: AtomicU64::new(0),
547            memory_bytes: AtomicU64::new(0),
548            published_flat_len: AtomicUsize::new(0),
549        }
550    }
551
552    /// Get next sequence number
553    fn next_sequence(&self) -> u64 {
554        self.sequence.fetch_add(1, Ordering::SeqCst)
555    }
556
557    fn has_live_entity(&self, id: EntityId) -> bool {
558        if self.deleted.contains(&id) {
559            return false;
560        }
561        if self.use_flat {
562            let raw = id.raw();
563            if raw >= self.base_entity_id {
564                let idx = (raw - self.base_entity_id) as usize;
565                if self
566                    .flat_entities
567                    .get(idx)
568                    .is_some_and(|entity| entity.id == id)
569                {
570                    return true;
571                }
572            }
573            // Same fallback as `get()` — entities added via `insert()`
574            // after flat mode was initialized live only in the HashMap.
575            self.entities.contains_key(&id)
576        } else {
577            self.entities.contains_key(&id)
578        }
579    }
580
581    fn update_existing_entity_in_place(
582        &mut self,
583        entity: &UnifiedEntity,
584    ) -> Result<UpdateIndexSnapshot, SegmentError> {
585        if self.use_flat {
586            let raw = entity.id.raw();
587            if raw < self.base_entity_id {
588                return Err(SegmentError::NotFound(entity.id));
589            }
590            let idx = (raw - self.base_entity_id) as usize;
591            let Some(slot) = self.flat_entities.get_mut(idx) else {
592                return Err(SegmentError::NotFound(entity.id));
593            };
594            if slot.id != entity.id {
595                return Err(SegmentError::NotFound(entity.id));
596            }
597            let snapshot = UpdateIndexSnapshot::from_entity(slot);
598            slot.clone_from(entity);
599            Ok(snapshot)
600        } else {
601            let Some(slot) = self.entities.get_mut(&entity.id) else {
602                return Err(SegmentError::NotFound(entity.id));
603            };
604            let snapshot = UpdateIndexSnapshot::from_entity(slot);
605            slot.clone_from(entity);
606            Ok(snapshot)
607        }
608    }
609
610    fn apply_hot_update_with_metadata(
611        &mut self,
612        entity: &UnifiedEntity,
613        modified_columns: &[String],
614        metadata: Option<&Metadata>,
615    ) -> Result<(), SegmentError> {
616        let old = self.update_existing_entity_in_place(entity)?;
617        self.reindex_for_update(&old, entity, Some(modified_columns));
618        self.update_col_zones_from_entity(entity);
619        if let Some(metadata) = metadata {
620            self.metadata.set_all(entity.id, metadata);
621        }
622        Ok(())
623    }
624
625    fn apply_update_with_metadata(
626        &mut self,
627        entity: &UnifiedEntity,
628        metadata: Option<&Metadata>,
629    ) -> Result<(), SegmentError> {
630        let old = self.update_existing_entity_in_place(entity)?;
631        self.reindex_for_update(&old, entity, None);
632        self.update_col_zones_from_entity(entity);
633        if let Some(metadata) = metadata {
634            self.metadata.set_all(entity.id, metadata);
635        }
636        Ok(())
637    }
638
639    pub fn update_hot_batch_with_metadata<'a, I>(&mut self, items: I) -> Result<(), SegmentError>
640    where
641        I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
642    {
643        if !self.state.is_writable() {
644            return Err(SegmentError::NotWritable);
645        }
646
647        let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
648            items.into_iter().collect();
649        if items.is_empty() {
650            return Ok(());
651        }
652
653        for (entity, _, _) in &items {
654            if !self.has_live_entity(entity.id) {
655                return Err(SegmentError::NotFound(entity.id));
656            }
657        }
658
659        for (entity, modified_columns, metadata) in items {
660            self.apply_hot_update_with_metadata(entity, modified_columns, metadata)?;
661        }
662
663        self.last_write_at = current_unix_secs();
664        Ok(())
665    }
666
667    pub fn delete_batch(&mut self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
668        if !self.state.is_writable() {
669            return Err(SegmentError::NotWritable);
670        }
671        if ids.is_empty() {
672            return Ok(Vec::new());
673        }
674
675        let mut deleted_ids = Vec::with_capacity(ids.len());
676
677        if self.use_flat {
678            for &id in ids {
679                let raw = id.raw();
680                if raw < self.base_entity_id {
681                    if let Some(entity) = self.entities.remove(&id) {
682                        self.unindex_entity(&entity);
683                        self.metadata.remove_all(id);
684                        self.deleted.insert(id);
685                        deleted_ids.push(id);
686                    }
687                    continue;
688                }
689                let idx = (raw - self.base_entity_id) as usize;
690                if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
691                    if !self.deleted.contains(&id) {
692                        self.metadata.remove_all(id);
693                        self.deleted.insert(id);
694                        deleted_ids.push(id);
695                    }
696                } else if let Some(entity) = self.entities.remove(&id) {
697                    self.unindex_entity(&entity);
698                    self.metadata.remove_all(id);
699                    self.deleted.insert(id);
700                    deleted_ids.push(id);
701                }
702            }
703        } else {
704            for &id in ids {
705                if let Some(entity) = self.entities.remove(&id) {
706                    self.unindex_entity(&entity);
707                    self.metadata.remove_all(id);
708                    self.deleted.insert(id);
709                    deleted_ids.push(id);
710                }
711            }
712        }
713
714        if !deleted_ids.is_empty() {
715            self.last_write_at = current_unix_secs();
716        }
717
718        Ok(deleted_ids)
719    }
720
721    /// Update memory estimate
722    fn add_memory(&self, bytes: usize) {
723        self.memory_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
724    }
725
726    /// Estimate memory for an entity
727    fn estimate_entity_size(entity: &UnifiedEntity) -> usize {
728        let mut size = std::mem::size_of::<UnifiedEntity>();
729
730        // Add data size
731        size += match &entity.data {
732            EntityData::Row(row) => row.columns.len() * 64, // Rough estimate
733            EntityData::Node(node) => node.properties.len() * 128,
734            EntityData::Edge(edge) => edge.properties.len() * 128,
735            EntityData::Vector(vec) => {
736                vec.dense.len() * 4 + vec.sparse.as_ref().map_or(0, |s| s.indices.len() * 8)
737            }
738            EntityData::TimeSeries(_) => 64,
739            EntityData::QueueMessage(_) => 128,
740        };
741
742        // Add embeddings
743        for emb in entity.embeddings() {
744            size += emb.vector.len() * 4 + emb.name.len() + emb.model.len();
745        }
746
747        // Add cross-refs
748        size += std::mem::size_of_val(entity.cross_refs());
749
750        size
751    }
752
753    /// Update per-column zone maps from a newly inserted entity's fields.
754    ///
755    /// Handles both insert paths:
756    /// - **Named** (`row.named`): individual inserts where fields are a `HashMap<String, Value>`
757    /// - **Positional** (`row.columns` + `row.schema`): bulk-inserted entities stored as `Vec<Value>`
758    ///   keyed by the shared schema. Previously this path was silently skipped, meaning zone maps
759    ///   were always empty for bulk-loaded tables and segment pruning never fired.
760    fn update_col_zones_from_entity(&mut self, entity: &UnifiedEntity) {
761        if let EntityData::Row(row) = &entity.data {
762            if let Some(named) = &row.named {
763                // Individual insert path — HashMap fields
764                for (col, val) in named {
765                    if matches!(val, Value::Null) {
766                        continue;
767                    }
768                    self.col_zones
769                        .entry(col.clone())
770                        .and_modify(|z| z.update(val))
771                        .or_insert_with(|| ColZone::new(val.clone()));
772                }
773            } else if let Some(schema) = &row.schema {
774                // Bulk-insert (columnar) path — positional Vec<Value> + shared schema.
775                // Previously skipped: zone maps were always empty for bulk-loaded tables.
776                for (col, val) in schema.iter().zip(row.columns.iter()) {
777                    if matches!(val, Value::Null) {
778                        continue;
779                    }
780                    self.col_zones
781                        .entry(col.clone())
782                        .and_modify(|z| z.update(val))
783                        .or_insert_with(|| ColZone::new(val.clone()));
784                }
785            }
786        }
787    }
788
789    fn rebuild_sealed_col_zones(&mut self) {
790        let mut values_by_col: HashMap<String, Vec<(CanonicalKey, Value)>> = HashMap::new();
791        let mut family_by_col: HashMap<String, crate::storage::schema::CanonicalKeyFamily> =
792            HashMap::new();
793        let mut mixed_family_cols = HashSet::new();
794        let mut unsupported_cols = HashSet::new();
795
796        let mut observe_row = |row: &super::entity::RowData| {
797            for (col, value) in row.iter_fields() {
798                if matches!(value, Value::Null) {
799                    continue;
800                }
801                let Some(key) = value_to_canonical_key(value) else {
802                    unsupported_cols.insert(col.to_string());
803                    continue;
804                };
805                match family_by_col.get(col).copied() {
806                    Some(existing) if existing != key.family() => {
807                        mixed_family_cols.insert(col.to_string());
808                    }
809                    None => {
810                        family_by_col.insert(col.to_string(), key.family());
811                    }
812                    _ => {}
813                }
814                values_by_col
815                    .entry(col.to_string())
816                    .or_default()
817                    .push((key, value.clone()));
818            }
819        };
820
821        if self.use_flat {
822            for entity in &self.flat_entities {
823                if self.deleted.contains(&entity.id) {
824                    continue;
825                }
826                if let EntityData::Row(row) = &entity.data {
827                    observe_row(row);
828                }
829            }
830        } else {
831            for entity in self.entities.values() {
832                if self.deleted.contains(&entity.id) {
833                    continue;
834                }
835                if let EntityData::Row(row) = &entity.data {
836                    observe_row(row);
837                }
838            }
839        }
840
841        let mut sealed_col_zones = HashMap::new();
842        for (col, mut entries) in values_by_col {
843            if mixed_family_cols.contains(&col)
844                || unsupported_cols.contains(&col)
845                || entries.is_empty()
846            {
847                continue;
848            }
849            entries.sort_unstable_by(|left, right| left.0.cmp(&right.0));
850            entries.dedup_by(|left, right| left.0 == right.0);
851
852            let intervals = build_minmax_multi_intervals(&entries, SEALED_MULTI_ZONE_MAX_INTERVALS);
853            if intervals.len() > 1 {
854                sealed_col_zones.insert(col, MultiColZone { intervals });
855            }
856        }
857
858        self.sealed_col_zones = sealed_col_zones;
859    }
860
861    /// Returns `true` when this segment can be entirely skipped for the given predicates.
862    /// A segment is skipped only if ALL predicates say so (conservative: any non-skippable
863    /// predicate forces the scan to proceed).
864    pub fn can_skip_zone_preds(&self, preds: &[(&str, ZoneColPred<'_>)]) -> bool {
865        if preds.is_empty() {
866            return false;
867        }
868        for (col, pred) in preds {
869            if let Some(zone) = self.sealed_col_zones.get(*col) {
870                if zone.can_skip(pred) {
871                    return true;
872                }
873                continue;
874            }
875            if let Some(zone) = self.col_zones.get(*col) {
876                if pred.can_skip(zone) {
877                    return true; // ONE predicate suffices to skip
878                }
879            }
880        }
881        false
882    }
883
884    /// Index an entity
885    fn index_entity(&mut self, entity: &UnifiedEntity) {
886        // Kind index
887        let kind_key = entity.kind.storage_type().to_string();
888        self.kind_index
889            .entry(kind_key)
890            .or_default()
891            .insert(entity.id);
892
893        // Bloom filter: insert entity ID bytes for fast negative lookups
894        let id_bytes = entity.id.raw().to_le_bytes();
895        self.bloom.insert(&id_bytes);
896
897        // Primary key index (if applicable)
898        if let EntityData::Row(row) = &entity.data {
899            if let Some(first_col) = row.columns.first() {
900                let pk_str = format!("{:?}", first_col);
901                // Also add PK to bloom filter
902                self.bloom.insert(pk_str.as_bytes());
903                self.pk_index
904                    .insert((entity.kind.collection().to_string(), pk_str), entity.id);
905            }
906        }
907
908        // Cross-reference indices
909        for cross_ref in entity.cross_refs() {
910            self.cross_ref_forward
911                .entry(cross_ref.source)
912                .or_default()
913                .push((cross_ref.target, cross_ref.ref_type));
914
915            self.cross_ref_reverse
916                .entry(cross_ref.target)
917                .or_default()
918                .push((cross_ref.source, cross_ref.ref_type));
919        }
920    }
921
922    /// Check if an entity ID might exist in this segment via bloom filter.
923    /// Returns `false` means *definitely not here*. `true` means *maybe here*.
924    pub fn bloom_might_contain_id(&self, id: EntityId) -> bool {
925        let id_bytes = id.raw().to_le_bytes();
926        self.bloom.contains(&id_bytes)
927    }
928
    /// Check if a primary key value might exist in this segment via bloom filter.
    ///
    /// This is a plain byte-membership probe on the same filter that also holds
    /// entity-ID bytes. The primary-key bytes added by `index_entity` are the
    /// UTF-8 bytes of the `Debug` rendering (`format!("{:?}", ..)`) of a row's
    /// first column, so `key` must be encoded the same way to match.
    pub fn bloom_might_contain_key(&self, key: &[u8]) -> bool {
        self.bloom.contains(key)
    }
933
    /// Get bloom filter statistics.
    ///
    /// Returns a pair: the filter's `fill_ratio()` followed by its
    /// `count_set_bits()`.
    pub fn bloom_stats(&self) -> (f64, u32) {
        (self.bloom.fill_ratio(), self.bloom.count_set_bits())
    }
938
939    /// Remove entity from indices
940    fn unindex_entity(&mut self, entity: &UnifiedEntity) {
941        // Kind index
942        let kind_key = entity.kind.storage_type().to_string();
943        if let Some(set) = self.kind_index.get_mut(&kind_key) {
944            set.remove(&entity.id);
945        }
946
947        // Primary key index
948        if let EntityData::Row(row) = &entity.data {
949            if let Some(first_col) = row.columns.first() {
950                let pk_str = format!("{:?}", first_col);
951                self.pk_index
952                    .remove(&(entity.kind.collection().to_string(), pk_str));
953            }
954        }
955
956        // Cross-reference indices
957        self.cross_ref_forward.remove(&entity.id);
958        // Note: reverse refs from this entity still need cleanup
959    }
960
961    /// Selective re-index for updates.
962    ///
963    /// Skips index work that is not needed:
964    /// - `kind_index`: entity kind is immutable, so remove+reinsert is always
965    ///   a no-op; we skip it entirely and keep the existing entry.
966    /// - `pk_index`: only updated when the primary-key column (first column of
967    ///   a Row entity) actually changed. When `modified_columns` is provided,
968    ///   we check membership; otherwise we compare old vs new pk value.
969    /// - `bloom`: add-only by design, so we only insert the new pk when it
970    ///   genuinely changes (old entry is a benign false positive).
971    /// - `cross_ref`: only rebuilt when the refs actually differ.
972    fn reindex_for_update(
973        &mut self,
974        old: &UpdateIndexSnapshot,
975        new: &UnifiedEntity,
976        modified_columns: Option<&[String]>,
977    ) {
978        // kind_index: kind is immutable — the existing entry is already correct.
979        // No remove + reinsert needed.
980
981        // bloom: entity ID never changes; already present from insert.
982
983        // pk_index: only update when pk column is touched
984        let pk_changed = match &new.data {
985            EntityData::Row(new_row) => {
986                if let Some(cols) = modified_columns {
987                    // Caller told us exactly what changed — check if first schema column modified
988                    // pk is the first column; check by name against the schema or by position 0
989                    let pk_col_name = old.pk_column_name.as_deref().or_else(|| {
990                        new_row
991                            .schema
992                            .as_deref()
993                            .and_then(|schema| schema.first().map(|name| name.as_str()))
994                    });
995                    match pk_col_name {
996                        Some(pk_name) => cols.iter().any(|c| c.eq_ignore_ascii_case(pk_name)),
997                        // No schema — fall back to value comparison
998                        None => old.pk_value.as_ref() != new_row.columns.first(),
999                    }
1000                } else {
1001                    old.pk_value.as_ref() != new_row.columns.first()
1002                }
1003            }
1004            // Non-row types don't use pk_index
1005            _ => false,
1006        };
1007
1008        if pk_changed {
1009            // Remove old pk entry
1010            if let Some((collection, pk_str)) = &old.pk_index_key {
1011                self.pk_index.remove(&(collection.clone(), pk_str.clone()));
1012            }
1013            // Insert new pk entry
1014            if let EntityData::Row(row) = &new.data {
1015                if let Some(first_col) = row.columns.first() {
1016                    let pk_str = format!("{:?}", first_col);
1017                    self.bloom.insert(pk_str.as_bytes());
1018                    self.pk_index
1019                        .insert((new.kind.collection().to_string(), pk_str), new.id);
1020                }
1021            }
1022        }
1023
1024        // cross_ref: only rebuild when refs actually changed
1025        let new_refs = new.cross_refs();
1026        if old.cross_refs.as_slice() != new_refs {
1027            // Remove stale forward refs
1028            self.cross_ref_forward.remove(&new.id);
1029            // Prune stale entries from reverse index
1030            for cross_ref in &old.cross_refs {
1031                if let Some(rev) = self.cross_ref_reverse.get_mut(&cross_ref.target) {
1032                    rev.retain(|(src, _)| *src != new.id);
1033                }
1034            }
1035            // Add new refs
1036            for cross_ref in new_refs {
1037                self.cross_ref_forward
1038                    .entry(cross_ref.source)
1039                    .or_default()
1040                    .push((cross_ref.target, cross_ref.ref_type));
1041                self.cross_ref_reverse
1042                    .entry(cross_ref.target)
1043                    .or_default()
1044                    .push((cross_ref.source, cross_ref.ref_type));
1045            }
1046        }
1047    }
1048
1049    /// Get entities referencing the given entity
1050    pub fn get_references_to(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1051        self.cross_ref_reverse.get(&id).cloned().unwrap_or_default()
1052    }
1053
1054    /// Get entities referenced by the given entity
1055    pub fn get_references_from(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1056        self.cross_ref_forward.get(&id).cloned().unwrap_or_default()
1057    }
1058
    /// Get memtable statistics.
    ///
    /// Returns whatever `stats()` reports for this segment's memtable.
    pub fn memtable_stats(&self) -> super::memtable::MemtableStats {
        self.memtable.stats()
    }
1063
    /// Check if memtable should be flushed.
    ///
    /// The decision is made entirely by the memtable's own `should_flush()`.
    pub fn memtable_should_flush(&self) -> bool {
        self.memtable.should_flush()
    }
1068
1069    /// Get age in seconds
1070    pub fn age_secs(&self) -> u64 {
1071        let now = current_unix_secs();
1072        now.saturating_sub(self.created_at)
1073    }
1074
1075    /// Get time since last write
1076    pub fn idle_secs(&self) -> u64 {
1077        let now = current_unix_secs();
1078        now.saturating_sub(self.last_write_at)
1079    }
1080
1081    /// Turbo bulk insert — minimal allocations per entity.
1082    ///
1083    /// Optimizations vs normal insert:
1084    /// - Skips bloom filter, memtable, cross-refs, memory tracking
1085    /// - Computes kind_key ONCE (not per entity)
1086    /// - Pre-allocates kind_index HashSet
1087    /// - Skips contains_key check (caller guarantees unique IDs)
1088    /// - Uses Relaxed ordering for sequence counter
1089    pub fn bulk_insert(
1090        &mut self,
1091        entities: Vec<UnifiedEntity>,
1092    ) -> Result<Vec<EntityId>, SegmentError> {
1093        if !self.state.is_writable() {
1094            return Err(SegmentError::NotWritable);
1095        }
1096
1097        let n = entities.len();
1098
1099        // Compute kind_key ONCE
1100        let kind_key = if let Some(first) = entities.first() {
1101            first.kind.storage_type().to_string()
1102        } else {
1103            return Ok(Vec::new());
1104        };
1105
1106        let kind_set = self.kind_index.entry(kind_key).or_default();
1107        kind_set.reserve(n);
1108
1109        let now = current_unix_secs();
1110
1111        let base_seq = self.sequence.fetch_add(n as u64, Ordering::Relaxed);
1112
1113        let mut ids = Vec::with_capacity(n);
1114
1115        // Use flat storage (Vec) instead of HashMap — saves ~80 bytes/entity overhead
1116        if self.flat_entities.is_empty() && self.entities.is_empty() {
1117            // First bulk insert: initialize flat storage
1118            self.base_entity_id = entities.first().map(|e| e.id.raw()).unwrap_or(0);
1119            self.use_flat = true;
1120        }
1121
1122        // Collect zone-update values per column position, not per
1123        // (row, col). The old code did `col.to_string()` + HashMap
1124        // probe per cell — 15 cols × 25k rows = 375K String
1125        // allocations per bulk_insert call for the typed_insert bench.
1126        //
1127        // New shape: accumulate `Vec<Value>` per column index; after
1128        // the scan, walk each non-empty column exactly once, resolve
1129        // its name via the shared schema, and apply all observations
1130        // under a single HashMap entry. 375K allocs → ~ncols allocs.
1131        let mut columnar_zone_updates: Vec<Vec<Value>> = Vec::new();
1132        let mut columnar_schema: Option<std::sync::Arc<Vec<String>>> = None;
1133        // Named-row fallback (non-prevalidated path) still uses the
1134        // per-cell vec — these rows don't share a schema, so there's
1135        // no columnar shortcut.
1136        let mut named_zone_updates: Vec<(String, Value)> = Vec::new();
1137
1138        if self.use_flat {
1139            self.flat_entities.reserve(n);
1140            for (i, mut entity) in entities.into_iter().enumerate() {
1141                entity.sequence_id = base_seq + i as u64;
1142                let id = entity.id;
1143                kind_set.insert(id);
1144                ids.push(id);
1145                if let EntityData::Row(row) = &entity.data {
1146                    if row.schema.is_some() && !row.columns.is_empty() {
1147                        if columnar_zone_updates.is_empty() {
1148                            columnar_zone_updates = vec![Vec::with_capacity(n); row.columns.len()];
1149                            columnar_schema = row.schema.clone();
1150                        }
1151                        for (ci, val) in row.columns.iter().enumerate() {
1152                            if !matches!(val, Value::Null) {
1153                                if let Some(bucket) = columnar_zone_updates.get_mut(ci) {
1154                                    bucket.push(val.clone());
1155                                }
1156                            }
1157                        }
1158                    } else {
1159                        for (col, val) in row.iter_fields() {
1160                            if !matches!(val, Value::Null) {
1161                                named_zone_updates.push((col.to_string(), val.clone()));
1162                            }
1163                        }
1164                    }
1165                }
1166                // Position-based `flat_entities` access in `get(id)`
1167                // assumes IDs are contiguous starting at
1168                // `base_entity_id`. When IDs ARE contiguous (the bulk
1169                // ingest hot path) this saves a HashMap probe per
1170                // lookup. But the global ID counter can jump between
1171                // bulk calls — CREATE INDEX, catalog DDL, even other
1172                // collections in the same store reserve IDs from the
1173                // same allocator. If we blindly `push` a gap entity
1174                // its actual position in the Vec no longer matches
1175                // `id - base`, and `flat_entities[id - base].id == id`
1176                // fails — the entity becomes invisible to `get(id)`.
1177                //
1178                // Route gap entities into the HashMap fallback
1179                // (`self.entities`). `get(id)` already falls through
1180                // there when the flat probe misses.
1181                let expected = self.base_entity_id + self.flat_entities.len() as u64;
1182                if id.raw() == expected {
1183                    self.flat_entities.push(entity);
1184                } else {
1185                    self.entities.insert(id, entity);
1186                }
1187            }
1188        } else {
1189            // Fallback to HashMap for non-sequential inserts
1190            self.entities.reserve(n);
1191            let mut pairs = Vec::with_capacity(n);
1192            for (i, mut entity) in entities.into_iter().enumerate() {
1193                entity.sequence_id = base_seq + i as u64;
1194                let id = entity.id;
1195                kind_set.insert(id);
1196                ids.push(id);
1197                if let EntityData::Row(row) = &entity.data {
1198                    for (col, val) in row.iter_fields() {
1199                        if !matches!(val, Value::Null) {
1200                            named_zone_updates.push((col.to_string(), val.clone()));
1201                        }
1202                    }
1203                }
1204                pairs.push((id, entity));
1205            }
1206            self.entities.extend(pairs);
1207        }
1208
1209        // Apply zone updates now that kind_set borrow is released.
1210        // Columnar path: one `col_zones.entry` call per column (not
1211        // per cell). Named-fallback path: unchanged.
1212        let _ = kind_set;
1213        if !columnar_zone_updates.is_empty() {
1214            let schema = columnar_schema.as_ref();
1215            for (ci, values) in columnar_zone_updates.into_iter().enumerate() {
1216                if values.is_empty() {
1217                    continue;
1218                }
1219                let Some(col_name) = schema.and_then(|s| s.get(ci)) else {
1220                    continue;
1221                };
1222                // Enter the HashMap once per column. `raw_entry_mut`
1223                // avoids the String::clone when the column already
1224                // exists in the map — but that's nightly-only, so we
1225                // pay one `clone()` per call-per-column (ncols total)
1226                // instead of the old ncols×nrows.
1227                let mut iter = values.into_iter();
1228                if let Some(first) = iter.next() {
1229                    let zone = self
1230                        .col_zones
1231                        .entry(col_name.clone())
1232                        .and_modify(|z| z.update(&first))
1233                        .or_insert_with(|| ColZone::new(first));
1234                    for v in iter {
1235                        zone.update(&v);
1236                    }
1237                }
1238            }
1239        }
1240        for (col, val) in named_zone_updates {
1241            self.col_zones
1242                .entry(col)
1243                .and_modify(|z| z.update(&val))
1244                .or_insert_with(|| ColZone::new(val));
1245        }
1246
1247        self.last_write_at = now;
1248
1249        // Publish the new flat length so lock-free readers can see the new entities.
1250        if self.use_flat {
1251            self.published_flat_len
1252                .store(self.flat_entities.len(), Ordering::Release);
1253        }
1254
1255        Ok(ids)
1256    }
1257
1258    /// Delete from this segment regardless of its seal state.
1259    /// Used to mutate sealed segments when DELETE touches bulk-inserted entities.
1260    pub(crate) fn force_delete(&mut self, id: EntityId) -> bool {
1261        if self.use_flat {
1262            let raw = id.raw();
1263            if raw >= self.base_entity_id {
1264                let idx = (raw - self.base_entity_id) as usize;
1265                if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1266                    self.deleted.insert(id);
1267                    self.metadata.remove_all(id);
1268                    return true;
1269                }
1270            }
1271            return false;
1272        }
1273
1274        if let Some(entity) = self.entities.remove(&id) {
1275            self.unindex_entity(&entity);
1276            self.metadata.remove_all(id);
1277            self.deleted.insert(id);
1278            true
1279        } else {
1280            false
1281        }
1282    }
1283
    /// Update an entity in this segment regardless of its seal state.
    /// Used to mutate sealed segments when UPDATE touches bulk-inserted entities.
    ///
    /// Forwards all arguments to `apply_hot_update_with_metadata` and returns
    /// its result. Unlike `update_hot`, this wrapper performs no writability
    /// check of its own before delegating.
    pub(crate) fn force_update_with_metadata(
        &mut self,
        entity: &UnifiedEntity,
        modified_columns: &[String],
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        self.apply_hot_update_with_metadata(entity, modified_columns, metadata)
    }
1294}
1295
1296impl UnifiedSegment for GrowingSegment {
    /// Returns this segment's identifier.
    fn id(&self) -> SegmentId {
        self.id
    }
1300
    /// Returns the segment's current lifecycle state.
    fn state(&self) -> SegmentState {
        self.state
    }
1304
    /// Returns the name stored in this segment's `collection` field.
    fn collection(&self) -> &str {
        &self.collection
    }
1308
1309    fn stats(&self) -> SegmentStats {
1310        let mut stats = SegmentStats {
1311            entity_count: self.entities.len(),
1312            deleted_count: self.deleted.len(),
1313            memory_bytes: self.memory_bytes.load(Ordering::Relaxed) as usize,
1314            ..Default::default()
1315        };
1316
1317        for entity in self.entities.values() {
1318            match &entity.kind {
1319                EntityKind::TableRow { .. } => stats.row_count += 1,
1320                EntityKind::GraphNode(_) => stats.node_count += 1,
1321                EntityKind::GraphEdge(_) => stats.edge_count += 1,
1322                EntityKind::Vector { .. } => stats.vector_count += 1,
1323                EntityKind::TimeSeriesPoint(_) => stats.row_count += 1,
1324                EntityKind::QueueMessage { .. } => stats.row_count += 1,
1325            }
1326            stats.cross_ref_count += entity.cross_refs().len();
1327        }
1328
1329        stats
1330    }
1331
1332    fn entity_count(&self) -> usize {
1333        let total = if self.use_flat {
1334            self.flat_entities.len()
1335        } else {
1336            self.entities.len()
1337        };
1338        total.saturating_sub(self.deleted.len())
1339    }
1340
    /// Returns whether `id` refers to a live entity in this segment, as
    /// decided by `has_live_entity`.
    fn contains(&self, id: EntityId) -> bool {
        self.has_live_entity(id)
    }
1344
1345    fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1346        if self.deleted.contains(&id) {
1347            return None;
1348        }
1349        if self.use_flat {
1350            let raw = id.raw();
1351            if raw >= self.base_entity_id {
1352                let idx = (raw - self.base_entity_id) as usize;
1353                if let Some(entity) = self.flat_entities.get(idx).filter(|e| e.id == id) {
1354                    return Some(entity);
1355                }
1356            }
1357            // Fall through: once flat mode is initialized by a bulk_insert,
1358            // subsequent per-row `insert()` calls still write into the
1359            // HashMap (see the impl below) so reads must check both. Without
1360            // this fallback, a post-bulk single-row insert silently
1361            // disappears from `get()` / `query_all()`.
1362            self.entities.get(&id)
1363        } else {
1364            self.entities.get(&id)
1365        }
1366    }
1367
1368    fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity> {
1369        if self.deleted.contains(&id) || !self.state.is_writable() {
1370            return None;
1371        }
1372        if self.use_flat {
1373            let raw = id.raw();
1374            if raw >= self.base_entity_id {
1375                let idx = (raw - self.base_entity_id) as usize;
1376                if self
1377                    .flat_entities
1378                    .get(idx)
1379                    .map(|e| e.id == id)
1380                    .unwrap_or(false)
1381                {
1382                    return self.flat_entities.get_mut(idx);
1383                }
1384            }
1385            // Fall through to HashMap for entities inserted via
1386            // per-row `insert()` after flat mode was activated.
1387            self.entities.get_mut(&id)
1388        } else {
1389            self.entities.get_mut(&id)
1390        }
1391    }
1392
1393    fn insert(&mut self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1394        if !self.state.is_writable() {
1395            return Err(SegmentError::NotWritable);
1396        }
1397
1398        if self.entities.contains_key(&entity.id) {
1399            return Err(SegmentError::AlreadyExists(entity.id));
1400        }
1401
1402        // Assign sequence ID
1403        entity.sequence_id = self.next_sequence();
1404
1405        // Estimate and track memory
1406        let size = Self::estimate_entity_size(&entity);
1407        self.add_memory(size);
1408
1409        // Index the entity
1410        self.index_entity(&entity);
1411
1412        // Update column zone maps for range-based segment pruning
1413        self.update_col_zones_from_entity(&entity);
1414
1415        // Store
1416        let id = entity.id;
1417        self.entities.insert(id, entity);
1418
1419        // Update write timestamp
1420        self.last_write_at = current_unix_secs();
1421
1422        Ok(id)
1423    }
1424
1425    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1426        if !self.state.is_writable() {
1427            return Err(SegmentError::NotWritable);
1428        }
1429
1430        self.apply_update_with_metadata(&entity, None)?;
1431        self.last_write_at = current_unix_secs();
1432
1433        Ok(())
1434    }
1435
1436    fn update_hot(
1437        &mut self,
1438        entity: UnifiedEntity,
1439        modified_columns: &[String],
1440    ) -> Result<(), SegmentError> {
1441        if !self.state.is_writable() {
1442            return Err(SegmentError::NotWritable);
1443        }
1444
1445        self.apply_hot_update_with_metadata(&entity, modified_columns, None)?;
1446        self.last_write_at = current_unix_secs();
1447        Ok(())
1448    }
1449
1450    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1451        if !self.state.is_writable() {
1452            return Err(SegmentError::NotWritable);
1453        }
1454
1455        // For flat storage, use tombstone (don't remove from Vec to keep indices valid)
1456        if self.use_flat {
1457            let raw = id.raw();
1458            if raw >= self.base_entity_id {
1459                let idx = (raw - self.base_entity_id) as usize;
1460                if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1461                    self.metadata.remove_all(id);
1462                    self.deleted.insert(id);
1463                    return Ok(true);
1464                }
1465            }
1466            if let Some(entity) = self.entities.remove(&id) {
1467                self.unindex_entity(&entity);
1468                self.metadata.remove_all(id);
1469                self.deleted.insert(id);
1470                return Ok(true);
1471            }
1472            return Ok(false);
1473        }
1474
1475        // Remove entity from HashMap
1476        let entity = self.entities.remove(&id);
1477        if entity.is_none() {
1478            return Ok(false);
1479        }
1480
1481        // Unindex
1482        if let Some(ref e) = entity {
1483            self.unindex_entity(e);
1484        }
1485
1486        // Remove metadata
1487        self.metadata.remove_all(id);
1488
1489        // Mark as deleted (tombstone)
1490        self.deleted.insert(id);
1491
1492        Ok(true)
1493    }
1494
1495    fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1496        if !self.has_live_entity(id) {
1497            return None;
1498        }
1499        Some(self.metadata.get_all(id))
1500    }
1501
1502    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1503        if !self.state.is_writable() {
1504            return Err(SegmentError::NotWritable);
1505        }
1506
1507        if !self.has_live_entity(id) {
1508            return Err(SegmentError::NotFound(id));
1509        }
1510
1511        self.metadata.set_all(id, &metadata);
1512        Ok(())
1513    }
1514
1515    fn seal(&mut self) -> Result<(), SegmentError> {
1516        if self.state != SegmentState::Growing {
1517            return Err(SegmentError::InvalidState(self.state));
1518        }
1519
1520        self.state = SegmentState::Sealing;
1521
1522        // Flush memtable: drain sorted entries for potential B-tree bulk insert
1523        let memtable_stats = self.memtable.stats();
1524        if memtable_stats.entry_count > 0 {
1525            // The memtable entries are entity ID keys in sorted order.
1526            // This ordering enables efficient sequential I/O for persistence.
1527            self.memtable.clear();
1528        }
1529
1530        // Build indices on the sealed data:
1531        // - Bloom filter is already populated from insert()
1532        // - HNSW/IVF for vectors (future)
1533        // - B-tree for sorted access (future)
1534        // - Inverted index for text search (future)
1535        self.rebuild_sealed_col_zones();
1536
1537        self.state = SegmentState::Sealed;
1538        Ok(())
1539    }
1540
1541    fn should_seal(&self, config: &SegmentConfig) -> bool {
1542        // Check entity count
1543        if self.entities.len() >= config.max_entities {
1544            return true;
1545        }
1546
1547        // Check memory usage
1548        if self.memory_bytes.load(Ordering::Relaxed) as usize >= config.max_bytes {
1549            return true;
1550        }
1551
1552        // Check age
1553        if self.age_secs() >= config.max_age_secs {
1554            return true;
1555        }
1556
1557        false
1558    }
1559
1560    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1561        let base: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1562            // Chain flat entities (from bulk_insert) with any entities
1563            // that landed in the HashMap via per-row `insert()` after
1564            // flat mode was activated.
1565            Box::new(self.flat_entities.iter().chain(self.entities.values()))
1566        } else {
1567            Box::new(self.entities.values())
1568        };
1569        if self.deleted.is_empty() {
1570            base
1571        } else {
1572            Box::new(base.filter(|e| !self.deleted.contains(&e.id)))
1573        }
1574    }
1575
1576    fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1577        let ids = self.kind_index.get(kind_filter).cloned();
1578        // In flat mode entities live in `flat_entities`, not `entities` —
1579        // chain both so iter_kind doesn't drop bulk-inserted entities.
1580        let flat: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1581            Box::new(self.flat_entities.iter())
1582        } else {
1583            Box::new(std::iter::empty())
1584        };
1585        Box::new(flat.chain(self.entities.values()).filter(move |e| {
1586            if self.deleted.contains(&e.id) {
1587                return false;
1588            }
1589            if let Some(ref ids) = ids {
1590                ids.contains(&e.id)
1591            } else {
1592                false
1593            }
1594        }))
1595    }
1596
1597    fn filter_metadata(
1598        &self,
1599        filters: &[(String, super::metadata::MetadataFilter)],
1600    ) -> Vec<EntityId> {
1601        // For growing segments, we iterate and filter. In flat mode the
1602        // IDs live in `flat_entities`, not `entities`, so chain both.
1603        let flat_ids: Box<dyn Iterator<Item = EntityId> + '_> = if self.use_flat {
1604            Box::new(self.flat_entities.iter().map(|e| e.id))
1605        } else {
1606            Box::new(std::iter::empty())
1607        };
1608        flat_ids
1609            .chain(self.entities.keys().copied())
1610            .filter(|id| {
1611                if self.deleted.contains(id) {
1612                    return false;
1613                }
1614                let metadata = self.metadata.get_all(*id);
1615                metadata.matches_all(filters)
1616            })
1617            .collect()
1618    }
1619}
1620
1621fn build_minmax_multi_intervals(
1622    entries: &[(CanonicalKey, Value)],
1623    max_intervals: usize,
1624) -> Vec<ColZone> {
1625    if entries.is_empty() {
1626        return Vec::new();
1627    }
1628    if entries.len() == 1 || max_intervals <= 1 {
1629        return vec![ColZone::with_bounds(
1630            entries[0].1.clone(),
1631            entries[entries.len() - 1].1.clone(),
1632        )];
1633    }
1634
1635    let mut split_points = if entries.len() <= max_intervals {
1636        (1..entries.len()).collect::<Vec<_>>()
1637    } else {
1638        let target_splits = max_intervals - 1;
1639        let mut selected = select_gap_split_points(entries, target_splits);
1640        if selected.len() < target_splits {
1641            for bucket in 1..max_intervals {
1642                let idx = bucket * entries.len() / max_intervals;
1643                if idx == 0 || idx >= entries.len() || selected.contains(&idx) {
1644                    continue;
1645                }
1646                selected.push(idx);
1647                if selected.len() >= target_splits {
1648                    break;
1649                }
1650            }
1651        }
1652        selected.sort_unstable();
1653        selected.dedup();
1654        selected
1655    };
1656
1657    split_points.push(entries.len());
1658
1659    let mut out = Vec::with_capacity(split_points.len());
1660    let mut start = 0usize;
1661    for end in split_points {
1662        if end <= start {
1663            continue;
1664        }
1665        out.push(ColZone::with_bounds(
1666            entries[start].1.clone(),
1667            entries[end - 1].1.clone(),
1668        ));
1669        start = end;
1670    }
1671
1672    if out.is_empty() {
1673        out.push(ColZone::with_bounds(
1674            entries[0].1.clone(),
1675            entries[entries.len() - 1].1.clone(),
1676        ));
1677    }
1678
1679    out
1680}
1681
1682fn select_gap_split_points(entries: &[(CanonicalKey, Value)], max_splits: usize) -> Vec<usize> {
1683    let mut gaps = Vec::new();
1684    for idx in 1..entries.len() {
1685        if let Some(score) = canonical_gap_score(&entries[idx - 1].0, &entries[idx].0) {
1686            if score > 0.0 {
1687                gaps.push((score, idx));
1688            }
1689        }
1690    }
1691    gaps.sort_by(|left, right| {
1692        right
1693            .0
1694            .partial_cmp(&left.0)
1695            .unwrap_or(std::cmp::Ordering::Equal)
1696            .then_with(|| left.1.cmp(&right.1))
1697    });
1698    gaps.into_iter()
1699        .take(max_splits)
1700        .map(|(_, idx)| idx)
1701        .collect()
1702}
1703
1704fn canonical_gap_score(left: &CanonicalKey, right: &CanonicalKey) -> Option<f64> {
1705    if left.family() != right.family() {
1706        return None;
1707    }
1708    match (left, right) {
1709        (CanonicalKey::Signed(_, l), CanonicalKey::Signed(_, r)) => {
1710            Some(r.saturating_sub(*l) as f64)
1711        }
1712        (CanonicalKey::Unsigned(_, l), CanonicalKey::Unsigned(_, r)) => {
1713            Some(r.saturating_sub(*l) as f64)
1714        }
1715        (CanonicalKey::Float(l), CanonicalKey::Float(r)) => {
1716            Some((f64::from_bits(*r) - f64::from_bits(*l)).abs())
1717        }
1718        _ => None,
1719    }
1720}
1721
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::schema::Value;
    use crate::storage::unified::entity::RowData;
    use crate::storage::unified::MetadataValue;

    /// Fixture: the `users` row (entity 1, row 1) holding the text "Alice".
    fn alice_row() -> UnifiedEntity {
        UnifiedEntity::table_row(
            EntityId::new(1),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        )
    }

    /// Inserting a row returns its ID, makes it visible and updates the stats.
    #[test]
    fn test_growing_segment_basic() {
        let mut seg = GrowingSegment::new(1, "test");

        let assigned = seg.insert(alice_row()).unwrap();
        assert_eq!(assigned, EntityId::new(1));
        assert!(seg.contains(assigned));

        let summary = seg.stats();
        assert_eq!(summary.entity_count, 1);
        assert_eq!(summary.row_count, 1);
    }

    /// Metadata attached to an entity can be read back.
    #[test]
    fn test_segment_metadata() {
        let mut seg = GrowingSegment::new(1, "test");
        seg.insert(alice_row()).unwrap();

        let target = EntityId::new(1);

        let mut attrs = Metadata::new();
        attrs.set("role", MetadataValue::String("admin".to_string()));
        attrs.set("level", MetadataValue::Int(5));
        seg.set_metadata(target, attrs).unwrap();

        let stored = seg.get_metadata(target).unwrap();
        let expected_role = MetadataValue::String("admin".to_string());
        assert_eq!(stored.get("role"), Some(&expected_role));
    }

    /// A segment is writable until sealed and rejects inserts afterwards.
    #[test]
    fn test_segment_seal() {
        let mut seg = GrowingSegment::new(1, "test");

        let first = UnifiedEntity::vector(EntityId::new(1), "embeddings", vec![0.1, 0.2, 0.3]);
        seg.insert(first).unwrap();

        // Still growing: writes are accepted.
        assert!(seg.state().is_writable());

        seg.seal().unwrap();
        assert_eq!(seg.state(), SegmentState::Sealed);

        // Sealed: further writes must fail.
        let late = UnifiedEntity::vector(EntityId::new(2), "embeddings", vec![0.4, 0.5, 0.6]);
        assert!(seg.insert(late).is_err());
    }

    /// `should_seal` flips to true once `max_entities` is reached.
    #[test]
    fn test_should_seal() {
        let limits = SegmentConfig {
            max_entities: 2,
            ..Default::default()
        };
        let mut seg = GrowingSegment::new(1, "test");

        assert!(!seg.should_seal(&limits));

        // (entity id, vector component, expected `should_seal` after insert)
        for (raw_id, component, expect_full) in [(1, 0.1, false), (2, 0.2, true)] {
            seg.insert(UnifiedEntity::vector(
                EntityId::new(raw_id),
                "v",
                vec![component],
            ))
            .unwrap();
            assert_eq!(seg.should_seal(&limits), expect_full);
        }
    }

    /// Cross references are indexed in both directions on insert.
    #[test]
    fn test_cross_references() {
        let mut seg = GrowingSegment::new(1, "test");

        let mut host = UnifiedEntity::table_row(
            EntityId::new(1),
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        let link = CrossRef::new(
            EntityId::new(1),
            EntityId::new(2),
            "nodes",
            RefType::RowToNode,
        );
        host.add_cross_ref(link);
        seg.insert(host).unwrap();

        let outgoing = seg.get_references_from(EntityId::new(1));
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0], (EntityId::new(2), RefType::RowToNode));

        let incoming = seg.get_references_to(EntityId::new(2));
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0], (EntityId::new(1), RefType::RowToNode));
    }

    /// Equality probes on email values are pruned only outside the zone range.
    #[test]
    fn test_zone_predicate_uses_canonical_fallback_for_email_values() {
        let mut email_zone = ColZone::new(Value::Email("bravo@example.com".to_string()));
        email_zone.update(&Value::Email("delta@example.com".to_string()));

        let below_range = Value::Email("alpha@example.com".to_string());
        assert!(ZoneColPred::Eq(&below_range).can_skip(&email_zone));

        let inside_range = Value::Email("charlie@example.com".to_string());
        assert!(!ZoneColPred::Eq(&inside_range).can_skip(&email_zone));
    }

    /// After sealing, a value inside the gap before a numeric outlier is
    /// prunable while the outlier itself is not.
    #[test]
    fn test_sealed_multi_zone_prunes_numeric_gap_outlier() {
        let mut seg = GrowingSegment::new(1, "test");

        let ages = [(1_u64, 1_i64), (2, 2), (3, 3), (4, 1000)];
        for (row_id, age) in ages {
            let kind = EntityKind::TableRow {
                table: "users".into(),
                row_id,
            };
            let data = EntityData::Row(RowData::with_names(
                vec![Value::Integer(age)],
                vec!["age".to_string()],
            ));
            seg.insert(UnifiedEntity::new(EntityId::new(row_id), kind, data))
                .unwrap();
        }

        seg.seal().unwrap();

        let in_gap = Value::Integer(500);
        assert!(seg.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&in_gap))]));

        let outlier = Value::Integer(1000);
        assert!(!seg.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&outlier))]));
    }
}