Skip to main content

reddb_server/storage/unified/
segment.rs

1//! Unified Segment System
2//!
3//! Implements the Growing → Sealed segment lifecycle pattern inspired by
4//! Milvus and ChromaDB. Segments are the fundamental unit of storage
5//! that handle entities of all types.
6//!
7//! # Lifecycle
8//!
9//! ```text
10//! Growing (in-memory, accepts writes)
11//!    ↓ seal() when full or manually triggered
12//! Sealed (immutable, fully indexed)
13//!    ↓ flush() for persistence
14//! Flushed (on disk, can be mmap'd)
15//!    ↓ archive() for cold storage
16//! Archived (compressed, infrequently accessed)
17//! ```
18
19use std::collections::{BTreeMap, HashMap, HashSet};
20use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use super::entity::{CrossRef, EntityData, EntityId, EntityKind, RefType, UnifiedEntity};
24use super::memtable::Memtable;
25use super::metadata::{Metadata, MetadataStorage};
26use crate::storage::primitives::bloom::BloomFilter;
27use crate::storage::query::value_compare::partial_compare_values;
28use crate::storage::schema::{value_to_canonical_key, CanonicalKey, Value};
29
/// Identifier that distinguishes one segment from another.
pub type SegmentId = u64;
32
/// Lifecycle stage a segment is currently in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentState {
    /// Open for writes; indexes are partial or absent.
    Growing,
    /// In transition while indexes are being built.
    Sealing,
    /// Read-only with indexes fully built.
    Sealed,
    /// Written out to disk.
    Flushed,
    /// Compressed and kept in cold storage.
    Archived,
}

impl SegmentState {
    /// Returns `true` only for `Growing`, the single state that accepts writes.
    pub fn is_writable(&self) -> bool {
        *self == Self::Growing
    }

    /// Returns `true` for every state except `Sealing`.
    pub fn is_queryable(&self) -> bool {
        *self != Self::Sealing
    }

    /// Returns `true` once the segment can no longer change
    /// (`Sealed`, `Flushed` or `Archived`).
    pub fn is_immutable(&self) -> bool {
        match self {
            Self::Sealed | Self::Flushed | Self::Archived => true,
            Self::Growing | Self::Sealing => false,
        }
    }
}
64
/// Tunables that control when a segment seals and what is built afterwards.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Entity count at which the segment auto-seals.
    pub max_entities: usize,
    /// Memory footprint (bytes) at which the segment auto-seals.
    pub max_bytes: usize,
    /// Age (seconds) at which the segment auto-seals.
    pub max_age_secs: u64,
    /// Whether a vector index is built when the segment is sealed.
    pub build_vector_index: bool,
    /// Whether a graph index is built when the segment is sealed.
    pub build_graph_index: bool,
    /// Compression level used for archived segments (0-9).
    pub compression_level: u8,
}

impl Default for SegmentConfig {
    /// Defaults: 100k entities, 256 MB, one hour, both indexes on, compression level 6.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const HOUR_SECS: u64 = 3600;

        Self {
            max_entities: 100_000,
            max_bytes: 256 * MIB,
            max_age_secs: HOUR_SECS,
            build_vector_index: true,
            build_graph_index: true,
            compression_level: 6,
        }
    }
}
94
/// Point-in-time counters describing a segment's contents.
#[derive(Debug, Clone, Default)]
pub struct SegmentStats {
    /// Entities stored in the segment.
    pub entity_count: usize,
    /// Entities that have been deleted.
    pub deleted_count: usize,
    /// Approximate memory footprint, in bytes.
    pub memory_bytes: usize,
    /// Vectors stored.
    pub vector_count: usize,
    /// Graph nodes stored.
    pub node_count: usize,
    /// Graph edges stored.
    pub edge_count: usize,
    /// Table rows stored.
    pub row_count: usize,
    /// Cross-references stored.
    pub cross_ref_count: usize,
}
115
116/// Segment error types
117#[derive(Debug, Clone)]
118pub enum SegmentError {
119    /// Segment is not writable
120    NotWritable,
121    /// Entity not found
122    NotFound(EntityId),
123    /// Entity already exists
124    AlreadyExists(EntityId),
125    /// Segment is full
126    Full,
127    /// Invalid operation for current state
128    InvalidState(SegmentState),
129    /// Internal error
130    Internal(String),
131}
132
133impl std::fmt::Display for SegmentError {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        match self {
136            Self::NotWritable => write!(f, "segment is not writable"),
137            Self::NotFound(id) => write!(f, "entity not found: {}", id),
138            Self::AlreadyExists(id) => write!(f, "entity already exists: {}", id),
139            Self::Full => write!(f, "segment is full"),
140            Self::InvalidState(state) => write!(f, "invalid operation for state: {:?}", state),
141            Self::Internal(msg) => write!(f, "internal error: {}", msg),
142        }
143    }
144}
145
146impl std::error::Error for SegmentError {}
147
/// Whole seconds since the Unix epoch, or `0` when the system clock reads
/// earlier than the epoch.
fn current_unix_secs() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_secs())
}
154
/// Upper bound on the number of min/max intervals kept per column in a sealed
/// multi-zone summary.
const SEALED_MULTI_ZONE_MAX_INTERVALS: usize = 4;
156
157#[derive(Debug, Clone)]
158struct UpdateIndexSnapshot {
159    pk_column_name: Option<String>,
160    pk_value: Option<Value>,
161    pk_index_key: Option<(String, String)>,
162    cross_refs: Vec<CrossRef>,
163}
164
165impl UpdateIndexSnapshot {
166    fn from_entity(entity: &UnifiedEntity) -> Self {
167        let (pk_column_name, pk_value) = match &entity.data {
168            EntityData::Row(row) => (
169                row.schema
170                    .as_deref()
171                    .and_then(|schema| schema.first().cloned()),
172                row.columns.first().cloned(),
173            ),
174            _ => (None, None),
175        };
176        let pk_index_key = pk_value
177            .as_ref()
178            .map(|value| (entity.kind.collection().to_string(), format!("{:?}", value)));
179        Self {
180            pk_column_name,
181            pk_value,
182            pk_index_key,
183            cross_refs: entity.cross_refs().to_vec(),
184        }
185    }
186}
187
188/// A unified segment that stores all entity types
189pub trait UnifiedSegment: Send + Sync {
190    /// Get segment ID
191    fn id(&self) -> SegmentId;
192
193    /// Get current state
194    fn state(&self) -> SegmentState;
195
196    /// Get collection/namespace name
197    fn collection(&self) -> &str;
198
199    /// Get statistics
200    fn stats(&self) -> SegmentStats;
201
202    /// O(1) live entity count (entities minus tombstones)
203    fn entity_count(&self) -> usize;
204
205    /// Check if entity exists
206    fn contains(&self, id: EntityId) -> bool;
207
208    /// Get an entity by ID
209    fn get(&self, id: EntityId) -> Option<&UnifiedEntity>;
210
211    /// Get mutable reference to entity
212    fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity>;
213
214    /// Insert a new entity
215    fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError>;
216
217    /// Update an existing entity
218    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError>;
219
220    /// HOT-update: like update but receives the set of field names that actually
221    /// changed. Allows skipping index work when indexed columns are unaffected.
222    /// Default: falls back to full update.
223    fn update_hot(
224        &mut self,
225        entity: UnifiedEntity,
226        modified_columns: &[String],
227    ) -> Result<(), SegmentError> {
228        let _ = modified_columns;
229        self.update(entity)
230    }
231
232    /// Delete an entity
233    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError>;
234
235    /// Get metadata for an entity
236    fn get_metadata(&self, id: EntityId) -> Option<Metadata>;
237
238    /// Set metadata for an entity
239    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError>;
240
241    /// Seal the segment (make immutable)
242    fn seal(&mut self) -> Result<(), SegmentError>;
243
244    /// Check if should auto-seal based on config
245    fn should_seal(&self, config: &SegmentConfig) -> bool;
246
247    /// Iterate over all entities
248    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;
249
250    /// Iterate over entities of a specific kind
251    fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_>;
252
253    /// Search entities by metadata filter
254    fn filter_metadata(
255        &self,
256        filters: &[(String, super::metadata::MetadataFilter)],
257    ) -> Vec<EntityId>;
258}
259
260// ─────────────────────────────────────────────────────────────────────────────
261// Zone map: per-column min/max for segment pruning
262// ─────────────────────────────────────────────────────────────────────────────
263
264/// Tracks the min and max observed `Value` for one column in a segment.
265/// Used to skip segments that cannot satisfy a range or equality predicate.
266#[derive(Debug, Clone)]
267pub struct ColZone {
268    pub min: Value,
269    pub max: Value,
270    min_key: Option<CanonicalKey>,
271    max_key: Option<CanonicalKey>,
272}
273
274impl ColZone {
275    fn new(v: Value) -> Self {
276        Self {
277            min_key: value_to_canonical_key(&v),
278            max_key: value_to_canonical_key(&v),
279            min: v.clone(),
280            max: v,
281        }
282    }
283
284    fn with_bounds(min: Value, max: Value) -> Self {
285        Self {
286            min_key: value_to_canonical_key(&min),
287            max_key: value_to_canonical_key(&max),
288            min,
289            max,
290        }
291    }
292
293    fn update(&mut self, v: &Value) {
294        if compare_zone_values(v, None, &self.min, self.min_key.as_ref())
295            .map(|o| o == std::cmp::Ordering::Less)
296            .unwrap_or(false)
297        {
298            self.min = v.clone();
299            self.min_key = value_to_canonical_key(v);
300        }
301        if compare_zone_values(v, None, &self.max, self.max_key.as_ref())
302            .map(|o| o == std::cmp::Ordering::Greater)
303            .unwrap_or(false)
304        {
305            self.max = v.clone();
306            self.max_key = value_to_canonical_key(v);
307        }
308    }
309}
310
311#[derive(Debug, Clone, Default)]
312pub struct MultiColZone {
313    pub intervals: Vec<ColZone>,
314}
315
316impl MultiColZone {
317    fn can_skip(&self, pred: &ZoneColPred<'_>) -> bool {
318        !self.intervals.is_empty() && self.intervals.iter().all(|zone| pred.can_skip(zone))
319    }
320}
321
322fn compare_zone_values(
323    left: &Value,
324    left_key: Option<&CanonicalKey>,
325    right: &Value,
326    right_key: Option<&CanonicalKey>,
327) -> Option<std::cmp::Ordering> {
328    partial_compare_values(left, right).or_else(|| {
329        let left_key = left_key.cloned().or_else(|| value_to_canonical_key(left))?;
330        let right_key = right_key
331            .cloned()
332            .or_else(|| value_to_canonical_key(right))?;
333        (left_key.family() == right_key.family()).then(|| left_key.cmp(&right_key))
334    })
335}
336
/// Operator tag of a `ZoneColPred`, for call sites that keep the `Value`
/// elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZoneColPredKind {
    /// `col = value`
    Eq,
    /// `col > value`
    Gt,
    /// `col >= value`
    Gte,
    /// `col < value`
    Lt,
    /// `col <= value`
    Lte,
}
346
347/// A predicate on a single column that can be checked against a `ColZone`.
348#[derive(Debug, Clone)]
349pub enum ZoneColPred<'a> {
350    Eq(&'a Value),
351    Gt(&'a Value),
352    Gte(&'a Value),
353    Lt(&'a Value),
354    Lte(&'a Value),
355}
356
357impl<'a> ZoneColPred<'a> {
358    /// Returns `true` when the entire segment can be skipped (no row can match).
359    pub fn can_skip(&self, zone: &ColZone) -> bool {
360        match self {
361            // Equality: skip if val < min OR val > max
362            ZoneColPred::Eq(val) => {
363                compare_zone_values(val, None, &zone.min, zone.min_key.as_ref())
364                    .map(|o| o == std::cmp::Ordering::Less)
365                    .unwrap_or(false)
366                    || compare_zone_values(val, None, &zone.max, zone.max_key.as_ref())
367                        .map(|o| o == std::cmp::Ordering::Greater)
368                        .unwrap_or(false)
369            }
370            // col > val: skip if max <= val (all rows have col ≤ val, none > val)
371            ZoneColPred::Gt(val) => {
372                compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
373                    .map(|o| o != std::cmp::Ordering::Greater)
374                    .unwrap_or(false)
375            }
376            // col >= val: skip if max < val
377            ZoneColPred::Gte(val) => {
378                compare_zone_values(&zone.max, zone.max_key.as_ref(), val, None)
379                    .map(|o| o == std::cmp::Ordering::Less)
380                    .unwrap_or(false)
381            }
382            // col < val: skip if min >= val
383            ZoneColPred::Lt(val) => {
384                compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
385                    .map(|o| o != std::cmp::Ordering::Less)
386                    .unwrap_or(false)
387            }
388            // col <= val: skip if min > val
389            ZoneColPred::Lte(val) => {
390                compare_zone_values(&zone.min, zone.min_key.as_ref(), val, None)
391                    .map(|o| o == std::cmp::Ordering::Greater)
392                    .unwrap_or(false)
393            }
394        }
395    }
396}
397
398/// Growing segment implementation (in-memory, writable)
399pub struct GrowingSegment {
400    /// Segment ID
401    id: SegmentId,
402    /// Collection/namespace name
403    collection: String,
404    /// Current state
405    state: SegmentState,
406    /// Creation timestamp
407    created_at: u64,
408    /// Last write timestamp
409    last_write_at: u64,
410
411    /// Entity storage (HashMap for random access)
412    entities: HashMap<EntityId, UnifiedEntity>,
413    /// Flat entity storage for bulk inserts (no HashMap overhead, O(1) by offset)
414    /// Used when entity IDs are sequential from base_entity_id
415    flat_entities: Vec<UnifiedEntity>,
416    /// Base entity ID for flat_entities (flat_entities[0].id == base_entity_id)
417    base_entity_id: u64,
418    /// Whether flat storage is active (bulk insert mode)
419    use_flat: bool,
420    /// Deleted entity IDs (tombstones)
421    deleted: HashSet<EntityId>,
422    /// Metadata storage (type-aware)
423    metadata: MetadataStorage,
424
425    /// Primary key index: (collection, pk_value) → EntityId
426    pk_index: BTreeMap<(String, String), EntityId>,
427    /// Type index: kind → EntityIds
428    kind_index: HashMap<String, HashSet<EntityId>>,
429    /// Cross-reference index: source → Vec<(target, ref_type)>
430    cross_ref_forward: HashMap<EntityId, Vec<(EntityId, RefType)>>,
431    /// Reverse cross-reference index: target → Vec<(source, ref_type)>
432    cross_ref_reverse: HashMap<EntityId, Vec<(EntityId, RefType)>>,
433
434    /// Bloom filter for fast negative key lookups
435    bloom: BloomFilter,
436
437    /// Write buffer for absorbing write spikes (sorted by key)
438    memtable: Memtable,
439
440    /// Per-column zone maps: col_name → (min, max) for segment pruning
441    col_zones: HashMap<String, ColZone>,
442    /// Sealed-only minmax-multi summaries built from canonical ordering.
443    sealed_col_zones: HashMap<String, MultiColZone>,
444
445    /// Sequence counter for ordering
446    sequence: AtomicU64,
447    /// Approximate memory usage
448    memory_bytes: AtomicU64,
449
450    /// Epoch counter for lock-free reads of `flat_entities`.
451    ///
452    /// Updated with `Release` ordering after every flat-mode insert so that
453    /// readers can safely access `flat_entities[0..published_flat_len]` by
454    /// loading with `Acquire` ordering, without holding the segment RwLock.
455    /// Only meaningful when `use_flat == true`; always 0 in HashMap mode.
456    pub(crate) published_flat_len: AtomicUsize,
457}
458
459impl GrowingSegment {
460    /// Direct iteration without Box<dyn> trait dispatch. Returns false to stop early.
461    /// Uses concrete iterator types to avoid heap allocation per call.
462    #[inline]
463    pub fn for_each_fast<F>(&self, mut f: F) -> bool
464    where
465        F: FnMut(&UnifiedEntity) -> bool,
466    {
467        if self.use_flat {
468            // Sequential Vec — best cache locality
469            if self.deleted.is_empty() {
470                for entity in &self.flat_entities {
471                    if !f(entity) {
472                        return false;
473                    }
474                }
475                // Also walk the HashMap — entities added via per-row
476                // `insert()` after flat mode was initialized land there.
477                for entity in self.entities.values() {
478                    if !f(entity) {
479                        return false;
480                    }
481                }
482            } else {
483                for entity in &self.flat_entities {
484                    if self.deleted.contains(&entity.id) {
485                        continue;
486                    }
487                    if !f(entity) {
488                        return false;
489                    }
490                }
491                for entity in self.entities.values() {
492                    if self.deleted.contains(&entity.id) {
493                        continue;
494                    }
495                    if !f(entity) {
496                        return false;
497                    }
498                }
499            }
500        } else {
501            // HashMap values — random order, no boxing
502            if self.deleted.is_empty() {
503                for entity in self.entities.values() {
504                    if !f(entity) {
505                        return false;
506                    }
507                }
508            } else {
509                for entity in self.entities.values() {
510                    if self.deleted.contains(&entity.id) {
511                        continue;
512                    }
513                    if !f(entity) {
514                        return false;
515                    }
516                }
517            }
518        }
519        true
520    }
521
522    /// Create a new growing segment
523    pub fn new(id: SegmentId, collection: impl Into<String>) -> Self {
524        let now = current_unix_secs();
525
526        Self {
527            id,
528            collection: collection.into(),
529            state: SegmentState::Growing,
530            created_at: now,
531            last_write_at: now,
532            entities: HashMap::new(),
533            flat_entities: Vec::new(),
534            base_entity_id: 0,
535            use_flat: false,
536            deleted: HashSet::new(),
537            metadata: MetadataStorage::new(),
538            pk_index: BTreeMap::new(),
539            kind_index: HashMap::new(),
540            cross_ref_forward: HashMap::new(),
541            cross_ref_reverse: HashMap::new(),
542            bloom: BloomFilter::with_capacity(100_000, 0.01),
543            memtable: Memtable::new(),
544            col_zones: HashMap::new(),
545            sealed_col_zones: HashMap::new(),
546            sequence: AtomicU64::new(0),
547            memory_bytes: AtomicU64::new(0),
548            published_flat_len: AtomicUsize::new(0),
549        }
550    }
551
552    /// Get next sequence number
553    fn next_sequence(&self) -> u64 {
554        self.sequence.fetch_add(1, Ordering::SeqCst)
555    }
556
557    fn has_live_entity(&self, id: EntityId) -> bool {
558        if self.deleted.contains(&id) {
559            return false;
560        }
561        if self.use_flat {
562            let raw = id.raw();
563            if raw >= self.base_entity_id {
564                let idx = (raw - self.base_entity_id) as usize;
565                if self
566                    .flat_entities
567                    .get(idx)
568                    .is_some_and(|entity| entity.id == id)
569                {
570                    return true;
571                }
572            }
573            // Same fallback as `get()` — entities added via `insert()`
574            // after flat mode was initialized live only in the HashMap.
575            self.entities.contains_key(&id)
576        } else {
577            self.entities.contains_key(&id)
578        }
579    }
580
581    fn update_existing_entity_in_place(
582        &mut self,
583        entity: &UnifiedEntity,
584    ) -> Result<UpdateIndexSnapshot, SegmentError> {
585        if self.use_flat {
586            let raw = entity.id.raw();
587            if raw < self.base_entity_id {
588                return Err(SegmentError::NotFound(entity.id));
589            }
590            let idx = (raw - self.base_entity_id) as usize;
591            let Some(slot) = self.flat_entities.get_mut(idx) else {
592                return Err(SegmentError::NotFound(entity.id));
593            };
594            if slot.id != entity.id {
595                return Err(SegmentError::NotFound(entity.id));
596            }
597            let snapshot = UpdateIndexSnapshot::from_entity(slot);
598            slot.clone_from(entity);
599            Ok(snapshot)
600        } else {
601            let Some(slot) = self.entities.get_mut(&entity.id) else {
602                return Err(SegmentError::NotFound(entity.id));
603            };
604            let snapshot = UpdateIndexSnapshot::from_entity(slot);
605            slot.clone_from(entity);
606            Ok(snapshot)
607        }
608    }
609
610    fn apply_hot_update_with_metadata(
611        &mut self,
612        entity: &UnifiedEntity,
613        modified_columns: &[String],
614        metadata: Option<&Metadata>,
615    ) -> Result<(), SegmentError> {
616        let old = self.update_existing_entity_in_place(entity)?;
617        self.reindex_for_update(&old, entity, Some(modified_columns));
618        self.update_col_zones_from_entity(entity);
619        if let Some(metadata) = metadata {
620            self.metadata.set_all(entity.id, metadata);
621        }
622        Ok(())
623    }
624
625    fn apply_update_with_metadata(
626        &mut self,
627        entity: &UnifiedEntity,
628        metadata: Option<&Metadata>,
629    ) -> Result<(), SegmentError> {
630        let old = self.update_existing_entity_in_place(entity)?;
631        self.reindex_for_update(&old, entity, None);
632        self.update_col_zones_from_entity(entity);
633        if let Some(metadata) = metadata {
634            self.metadata.set_all(entity.id, metadata);
635        }
636        Ok(())
637    }
638
639    pub fn update_hot_batch_with_metadata<'a, I>(&mut self, items: I) -> Result<(), SegmentError>
640    where
641        I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
642    {
643        if !self.state.is_writable() {
644            return Err(SegmentError::NotWritable);
645        }
646
647        let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
648            items.into_iter().collect();
649        if items.is_empty() {
650            return Ok(());
651        }
652
653        for (entity, _, _) in &items {
654            if !self.has_live_entity(entity.id) {
655                return Err(SegmentError::NotFound(entity.id));
656            }
657        }
658
659        for (entity, modified_columns, metadata) in items {
660            self.apply_hot_update_with_metadata(entity, modified_columns, metadata)?;
661        }
662
663        self.last_write_at = current_unix_secs();
664        Ok(())
665    }
666
667    pub fn delete_batch(&mut self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
668        if !self.state.is_writable() {
669            return Err(SegmentError::NotWritable);
670        }
671        if ids.is_empty() {
672            return Ok(Vec::new());
673        }
674
675        let mut deleted_ids = Vec::with_capacity(ids.len());
676
677        if self.use_flat {
678            for &id in ids {
679                let raw = id.raw();
680                if raw < self.base_entity_id {
681                    continue;
682                }
683                let idx = (raw - self.base_entity_id) as usize;
684                if idx < self.flat_entities.len()
685                    && self.flat_entities[idx].id == id
686                    && !self.deleted.contains(&id)
687                {
688                    self.metadata.remove_all(id);
689                    self.deleted.insert(id);
690                    deleted_ids.push(id);
691                }
692            }
693        } else {
694            for &id in ids {
695                if let Some(entity) = self.entities.remove(&id) {
696                    self.unindex_entity(&entity);
697                    self.metadata.remove_all(id);
698                    self.deleted.insert(id);
699                    deleted_ids.push(id);
700                }
701            }
702        }
703
704        if !deleted_ids.is_empty() {
705            self.last_write_at = current_unix_secs();
706        }
707
708        Ok(deleted_ids)
709    }
710
711    /// Update memory estimate
712    fn add_memory(&self, bytes: usize) {
713        self.memory_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
714    }
715
716    /// Estimate memory for an entity
717    fn estimate_entity_size(entity: &UnifiedEntity) -> usize {
718        let mut size = std::mem::size_of::<UnifiedEntity>();
719
720        // Add data size
721        size += match &entity.data {
722            EntityData::Row(row) => row.columns.len() * 64, // Rough estimate
723            EntityData::Node(node) => node.properties.len() * 128,
724            EntityData::Edge(edge) => edge.properties.len() * 128,
725            EntityData::Vector(vec) => {
726                vec.dense.len() * 4 + vec.sparse.as_ref().map_or(0, |s| s.indices.len() * 8)
727            }
728            EntityData::TimeSeries(_) => 64,
729            EntityData::QueueMessage(_) => 128,
730        };
731
732        // Add embeddings
733        for emb in entity.embeddings() {
734            size += emb.vector.len() * 4 + emb.name.len() + emb.model.len();
735        }
736
737        // Add cross-refs
738        size += std::mem::size_of_val(entity.cross_refs());
739
740        size
741    }
742
743    /// Update per-column zone maps from a newly inserted entity's fields.
744    ///
745    /// Handles both insert paths:
746    /// - **Named** (`row.named`): individual inserts where fields are a `HashMap<String, Value>`
747    /// - **Positional** (`row.columns` + `row.schema`): bulk-inserted entities stored as `Vec<Value>`
748    ///   keyed by the shared schema. Previously this path was silently skipped, meaning zone maps
749    ///   were always empty for bulk-loaded tables and segment pruning never fired.
750    fn update_col_zones_from_entity(&mut self, entity: &UnifiedEntity) {
751        if let EntityData::Row(row) = &entity.data {
752            if let Some(named) = &row.named {
753                // Individual insert path — HashMap fields
754                for (col, val) in named {
755                    if matches!(val, Value::Null) {
756                        continue;
757                    }
758                    self.col_zones
759                        .entry(col.clone())
760                        .and_modify(|z| z.update(val))
761                        .or_insert_with(|| ColZone::new(val.clone()));
762                }
763            } else if let Some(schema) = &row.schema {
764                // Bulk-insert (columnar) path — positional Vec<Value> + shared schema.
765                // Previously skipped: zone maps were always empty for bulk-loaded tables.
766                for (col, val) in schema.iter().zip(row.columns.iter()) {
767                    if matches!(val, Value::Null) {
768                        continue;
769                    }
770                    self.col_zones
771                        .entry(col.clone())
772                        .and_modify(|z| z.update(val))
773                        .or_insert_with(|| ColZone::new(val.clone()));
774                }
775            }
776        }
777    }
778
779    fn rebuild_sealed_col_zones(&mut self) {
780        let mut values_by_col: HashMap<String, Vec<(CanonicalKey, Value)>> = HashMap::new();
781        let mut family_by_col: HashMap<String, crate::storage::schema::CanonicalKeyFamily> =
782            HashMap::new();
783        let mut mixed_family_cols = HashSet::new();
784        let mut unsupported_cols = HashSet::new();
785
786        let mut observe_row = |row: &super::entity::RowData| {
787            for (col, value) in row.iter_fields() {
788                if matches!(value, Value::Null) {
789                    continue;
790                }
791                let Some(key) = value_to_canonical_key(value) else {
792                    unsupported_cols.insert(col.to_string());
793                    continue;
794                };
795                match family_by_col.get(col).copied() {
796                    Some(existing) if existing != key.family() => {
797                        mixed_family_cols.insert(col.to_string());
798                    }
799                    None => {
800                        family_by_col.insert(col.to_string(), key.family());
801                    }
802                    _ => {}
803                }
804                values_by_col
805                    .entry(col.to_string())
806                    .or_default()
807                    .push((key, value.clone()));
808            }
809        };
810
811        if self.use_flat {
812            for entity in &self.flat_entities {
813                if self.deleted.contains(&entity.id) {
814                    continue;
815                }
816                if let EntityData::Row(row) = &entity.data {
817                    observe_row(row);
818                }
819            }
820        } else {
821            for entity in self.entities.values() {
822                if self.deleted.contains(&entity.id) {
823                    continue;
824                }
825                if let EntityData::Row(row) = &entity.data {
826                    observe_row(row);
827                }
828            }
829        }
830
831        let mut sealed_col_zones = HashMap::new();
832        for (col, mut entries) in values_by_col {
833            if mixed_family_cols.contains(&col)
834                || unsupported_cols.contains(&col)
835                || entries.is_empty()
836            {
837                continue;
838            }
839            entries.sort_unstable_by(|left, right| left.0.cmp(&right.0));
840            entries.dedup_by(|left, right| left.0 == right.0);
841
842            let intervals = build_minmax_multi_intervals(&entries, SEALED_MULTI_ZONE_MAX_INTERVALS);
843            if intervals.len() > 1 {
844                sealed_col_zones.insert(col, MultiColZone { intervals });
845            }
846        }
847
848        self.sealed_col_zones = sealed_col_zones;
849    }
850
851    /// Returns `true` when this segment can be entirely skipped for the given predicates.
852    /// A segment is skipped only if ALL predicates say so (conservative: any non-skippable
853    /// predicate forces the scan to proceed).
854    pub fn can_skip_zone_preds(&self, preds: &[(&str, ZoneColPred<'_>)]) -> bool {
855        if preds.is_empty() {
856            return false;
857        }
858        for (col, pred) in preds {
859            if let Some(zone) = self.sealed_col_zones.get(*col) {
860                if zone.can_skip(pred) {
861                    return true;
862                }
863                continue;
864            }
865            if let Some(zone) = self.col_zones.get(*col) {
866                if pred.can_skip(zone) {
867                    return true; // ONE predicate suffices to skip
868                }
869            }
870        }
871        false
872    }
873
874    /// Index an entity
875    fn index_entity(&mut self, entity: &UnifiedEntity) {
876        // Kind index
877        let kind_key = entity.kind.storage_type().to_string();
878        self.kind_index
879            .entry(kind_key)
880            .or_default()
881            .insert(entity.id);
882
883        // Bloom filter: insert entity ID bytes for fast negative lookups
884        let id_bytes = entity.id.raw().to_le_bytes();
885        self.bloom.insert(&id_bytes);
886
887        // Primary key index (if applicable)
888        if let EntityData::Row(row) = &entity.data {
889            if let Some(first_col) = row.columns.first() {
890                let pk_str = format!("{:?}", first_col);
891                // Also add PK to bloom filter
892                self.bloom.insert(pk_str.as_bytes());
893                self.pk_index
894                    .insert((entity.kind.collection().to_string(), pk_str), entity.id);
895            }
896        }
897
898        // Cross-reference indices
899        for cross_ref in entity.cross_refs() {
900            self.cross_ref_forward
901                .entry(cross_ref.source)
902                .or_default()
903                .push((cross_ref.target, cross_ref.ref_type));
904
905            self.cross_ref_reverse
906                .entry(cross_ref.target)
907                .or_default()
908                .push((cross_ref.source, cross_ref.ref_type));
909        }
910    }
911
912    /// Check if an entity ID might exist in this segment via bloom filter.
913    /// Returns `false` means *definitely not here*. `true` means *maybe here*.
914    pub fn bloom_might_contain_id(&self, id: EntityId) -> bool {
915        let id_bytes = id.raw().to_le_bytes();
916        self.bloom.contains(&id_bytes)
917    }
918
919    /// Check if a primary key value might exist in this segment via bloom filter.
920    pub fn bloom_might_contain_key(&self, key: &[u8]) -> bool {
921        self.bloom.contains(key)
922    }
923
924    /// Get bloom filter statistics
925    pub fn bloom_stats(&self) -> (f64, u32) {
926        (self.bloom.fill_ratio(), self.bloom.count_set_bits())
927    }
928
929    /// Remove entity from indices
930    fn unindex_entity(&mut self, entity: &UnifiedEntity) {
931        // Kind index
932        let kind_key = entity.kind.storage_type().to_string();
933        if let Some(set) = self.kind_index.get_mut(&kind_key) {
934            set.remove(&entity.id);
935        }
936
937        // Primary key index
938        if let EntityData::Row(row) = &entity.data {
939            if let Some(first_col) = row.columns.first() {
940                let pk_str = format!("{:?}", first_col);
941                self.pk_index
942                    .remove(&(entity.kind.collection().to_string(), pk_str));
943            }
944        }
945
946        // Cross-reference indices
947        self.cross_ref_forward.remove(&entity.id);
948        // Note: reverse refs from this entity still need cleanup
949    }
950
951    /// Selective re-index for updates.
952    ///
953    /// Skips index work that is not needed:
954    /// - `kind_index`: entity kind is immutable, so remove+reinsert is always
955    ///   a no-op; we skip it entirely and keep the existing entry.
956    /// - `pk_index`: only updated when the primary-key column (first column of
957    ///   a Row entity) actually changed. When `modified_columns` is provided,
958    ///   we check membership; otherwise we compare old vs new pk value.
959    /// - `bloom`: add-only by design, so we only insert the new pk when it
960    ///   genuinely changes (old entry is a benign false positive).
961    /// - `cross_ref`: only rebuilt when the refs actually differ.
962    fn reindex_for_update(
963        &mut self,
964        old: &UpdateIndexSnapshot,
965        new: &UnifiedEntity,
966        modified_columns: Option<&[String]>,
967    ) {
968        // kind_index: kind is immutable — the existing entry is already correct.
969        // No remove + reinsert needed.
970
971        // bloom: entity ID never changes; already present from insert.
972
973        // pk_index: only update when pk column is touched
974        let pk_changed = match &new.data {
975            EntityData::Row(new_row) => {
976                if let Some(cols) = modified_columns {
977                    // Caller told us exactly what changed — check if first schema column modified
978                    // pk is the first column; check by name against the schema or by position 0
979                    let pk_col_name = old.pk_column_name.as_deref().or_else(|| {
980                        new_row
981                            .schema
982                            .as_deref()
983                            .and_then(|schema| schema.first().map(|name| name.as_str()))
984                    });
985                    match pk_col_name {
986                        Some(pk_name) => cols.iter().any(|c| c.eq_ignore_ascii_case(pk_name)),
987                        // No schema — fall back to value comparison
988                        None => old.pk_value.as_ref() != new_row.columns.first(),
989                    }
990                } else {
991                    old.pk_value.as_ref() != new_row.columns.first()
992                }
993            }
994            // Non-row types don't use pk_index
995            _ => false,
996        };
997
998        if pk_changed {
999            // Remove old pk entry
1000            if let Some((collection, pk_str)) = &old.pk_index_key {
1001                self.pk_index.remove(&(collection.clone(), pk_str.clone()));
1002            }
1003            // Insert new pk entry
1004            if let EntityData::Row(row) = &new.data {
1005                if let Some(first_col) = row.columns.first() {
1006                    let pk_str = format!("{:?}", first_col);
1007                    self.bloom.insert(pk_str.as_bytes());
1008                    self.pk_index
1009                        .insert((new.kind.collection().to_string(), pk_str), new.id);
1010                }
1011            }
1012        }
1013
1014        // cross_ref: only rebuild when refs actually changed
1015        let new_refs = new.cross_refs();
1016        if old.cross_refs.as_slice() != new_refs {
1017            // Remove stale forward refs
1018            self.cross_ref_forward.remove(&new.id);
1019            // Prune stale entries from reverse index
1020            for cross_ref in &old.cross_refs {
1021                if let Some(rev) = self.cross_ref_reverse.get_mut(&cross_ref.target) {
1022                    rev.retain(|(src, _)| *src != new.id);
1023                }
1024            }
1025            // Add new refs
1026            for cross_ref in new_refs {
1027                self.cross_ref_forward
1028                    .entry(cross_ref.source)
1029                    .or_default()
1030                    .push((cross_ref.target, cross_ref.ref_type));
1031                self.cross_ref_reverse
1032                    .entry(cross_ref.target)
1033                    .or_default()
1034                    .push((cross_ref.source, cross_ref.ref_type));
1035            }
1036        }
1037    }
1038
1039    /// Get entities referencing the given entity
1040    pub fn get_references_to(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1041        self.cross_ref_reverse.get(&id).cloned().unwrap_or_default()
1042    }
1043
1044    /// Get entities referenced by the given entity
1045    pub fn get_references_from(&self, id: EntityId) -> Vec<(EntityId, RefType)> {
1046        self.cross_ref_forward.get(&id).cloned().unwrap_or_default()
1047    }
1048
1049    /// Get memtable statistics
1050    pub fn memtable_stats(&self) -> super::memtable::MemtableStats {
1051        self.memtable.stats()
1052    }
1053
1054    /// Check if memtable should be flushed
1055    pub fn memtable_should_flush(&self) -> bool {
1056        self.memtable.should_flush()
1057    }
1058
1059    /// Get age in seconds
1060    pub fn age_secs(&self) -> u64 {
1061        let now = current_unix_secs();
1062        now.saturating_sub(self.created_at)
1063    }
1064
1065    /// Get time since last write
1066    pub fn idle_secs(&self) -> u64 {
1067        let now = current_unix_secs();
1068        now.saturating_sub(self.last_write_at)
1069    }
1070
1071    /// Turbo bulk insert — minimal allocations per entity.
1072    ///
1073    /// Optimizations vs normal insert:
1074    /// - Skips bloom filter, memtable, cross-refs, memory tracking
1075    /// - Computes kind_key ONCE (not per entity)
1076    /// - Pre-allocates kind_index HashSet
1077    /// - Skips contains_key check (caller guarantees unique IDs)
1078    /// - Uses Relaxed ordering for sequence counter
1079    pub fn bulk_insert(
1080        &mut self,
1081        entities: Vec<UnifiedEntity>,
1082    ) -> Result<Vec<EntityId>, SegmentError> {
1083        if !self.state.is_writable() {
1084            return Err(SegmentError::NotWritable);
1085        }
1086
1087        let n = entities.len();
1088
1089        // Compute kind_key ONCE
1090        let kind_key = if let Some(first) = entities.first() {
1091            first.kind.storage_type().to_string()
1092        } else {
1093            return Ok(Vec::new());
1094        };
1095
1096        let kind_set = self.kind_index.entry(kind_key).or_default();
1097        kind_set.reserve(n);
1098
1099        let now = current_unix_secs();
1100
1101        let base_seq = self.sequence.fetch_add(n as u64, Ordering::Relaxed);
1102
1103        let mut ids = Vec::with_capacity(n);
1104
1105        // Use flat storage (Vec) instead of HashMap — saves ~80 bytes/entity overhead
1106        if self.flat_entities.is_empty() && self.entities.is_empty() {
1107            // First bulk insert: initialize flat storage
1108            self.base_entity_id = entities.first().map(|e| e.id.raw()).unwrap_or(0);
1109            self.use_flat = true;
1110        }
1111
1112        // Collect zone-update values per column position, not per
1113        // (row, col). The old code did `col.to_string()` + HashMap
1114        // probe per cell — 15 cols × 25k rows = 375K String
1115        // allocations per bulk_insert call for the typed_insert bench.
1116        //
1117        // New shape: accumulate `Vec<Value>` per column index; after
1118        // the scan, walk each non-empty column exactly once, resolve
1119        // its name via the shared schema, and apply all observations
1120        // under a single HashMap entry. 375K allocs → ~ncols allocs.
1121        let mut columnar_zone_updates: Vec<Vec<Value>> = Vec::new();
1122        let mut columnar_schema: Option<std::sync::Arc<Vec<String>>> = None;
1123        // Named-row fallback (non-prevalidated path) still uses the
1124        // per-cell vec — these rows don't share a schema, so there's
1125        // no columnar shortcut.
1126        let mut named_zone_updates: Vec<(String, Value)> = Vec::new();
1127
1128        if self.use_flat {
1129            self.flat_entities.reserve(n);
1130            for (i, mut entity) in entities.into_iter().enumerate() {
1131                entity.sequence_id = base_seq + i as u64;
1132                let id = entity.id;
1133                kind_set.insert(id);
1134                ids.push(id);
1135                if let EntityData::Row(row) = &entity.data {
1136                    if row.schema.is_some() && !row.columns.is_empty() {
1137                        if columnar_zone_updates.is_empty() {
1138                            columnar_zone_updates = vec![Vec::with_capacity(n); row.columns.len()];
1139                            columnar_schema = row.schema.clone();
1140                        }
1141                        for (ci, val) in row.columns.iter().enumerate() {
1142                            if !matches!(val, Value::Null) {
1143                                if let Some(bucket) = columnar_zone_updates.get_mut(ci) {
1144                                    bucket.push(val.clone());
1145                                }
1146                            }
1147                        }
1148                    } else {
1149                        for (col, val) in row.iter_fields() {
1150                            if !matches!(val, Value::Null) {
1151                                named_zone_updates.push((col.to_string(), val.clone()));
1152                            }
1153                        }
1154                    }
1155                }
1156                // Position-based `flat_entities` access in `get(id)`
1157                // assumes IDs are contiguous starting at
1158                // `base_entity_id`. When IDs ARE contiguous (the bulk
1159                // ingest hot path) this saves a HashMap probe per
1160                // lookup. But the global ID counter can jump between
1161                // bulk calls — CREATE INDEX, catalog DDL, even other
1162                // collections in the same store reserve IDs from the
1163                // same allocator. If we blindly `push` a gap entity
1164                // its actual position in the Vec no longer matches
1165                // `id - base`, and `flat_entities[id - base].id == id`
1166                // fails — the entity becomes invisible to `get(id)`.
1167                //
1168                // Route gap entities into the HashMap fallback
1169                // (`self.entities`). `get(id)` already falls through
1170                // there when the flat probe misses.
1171                let expected = self.base_entity_id + self.flat_entities.len() as u64;
1172                if id.raw() == expected {
1173                    self.flat_entities.push(entity);
1174                } else {
1175                    self.entities.insert(id, entity);
1176                }
1177            }
1178        } else {
1179            // Fallback to HashMap for non-sequential inserts
1180            self.entities.reserve(n);
1181            let mut pairs = Vec::with_capacity(n);
1182            for (i, mut entity) in entities.into_iter().enumerate() {
1183                entity.sequence_id = base_seq + i as u64;
1184                let id = entity.id;
1185                kind_set.insert(id);
1186                ids.push(id);
1187                if let EntityData::Row(row) = &entity.data {
1188                    for (col, val) in row.iter_fields() {
1189                        if !matches!(val, Value::Null) {
1190                            named_zone_updates.push((col.to_string(), val.clone()));
1191                        }
1192                    }
1193                }
1194                pairs.push((id, entity));
1195            }
1196            self.entities.extend(pairs);
1197        }
1198
1199        // Apply zone updates now that kind_set borrow is released.
1200        // Columnar path: one `col_zones.entry` call per column (not
1201        // per cell). Named-fallback path: unchanged.
1202        let _ = kind_set;
1203        if !columnar_zone_updates.is_empty() {
1204            let schema = columnar_schema.as_ref();
1205            for (ci, values) in columnar_zone_updates.into_iter().enumerate() {
1206                if values.is_empty() {
1207                    continue;
1208                }
1209                let Some(col_name) = schema.and_then(|s| s.get(ci)) else {
1210                    continue;
1211                };
1212                // Enter the HashMap once per column. `raw_entry_mut`
1213                // avoids the String::clone when the column already
1214                // exists in the map — but that's nightly-only, so we
1215                // pay one `clone()` per call-per-column (ncols total)
1216                // instead of the old ncols×nrows.
1217                let mut iter = values.into_iter();
1218                if let Some(first) = iter.next() {
1219                    let zone = self
1220                        .col_zones
1221                        .entry(col_name.clone())
1222                        .and_modify(|z| z.update(&first))
1223                        .or_insert_with(|| ColZone::new(first));
1224                    for v in iter {
1225                        zone.update(&v);
1226                    }
1227                }
1228            }
1229        }
1230        for (col, val) in named_zone_updates {
1231            self.col_zones
1232                .entry(col)
1233                .and_modify(|z| z.update(&val))
1234                .or_insert_with(|| ColZone::new(val));
1235        }
1236
1237        self.last_write_at = now;
1238
1239        // Publish the new flat length so lock-free readers can see the new entities.
1240        if self.use_flat {
1241            self.published_flat_len
1242                .store(self.flat_entities.len(), Ordering::Release);
1243        }
1244
1245        Ok(ids)
1246    }
1247
1248    /// Delete from this segment regardless of its seal state.
1249    /// Used to mutate sealed segments when DELETE touches bulk-inserted entities.
1250    pub(crate) fn force_delete(&mut self, id: EntityId) -> bool {
1251        if self.use_flat {
1252            let raw = id.raw();
1253            if raw >= self.base_entity_id {
1254                let idx = (raw - self.base_entity_id) as usize;
1255                if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1256                    self.deleted.insert(id);
1257                    self.metadata.remove_all(id);
1258                    return true;
1259                }
1260            }
1261            return false;
1262        }
1263
1264        if let Some(entity) = self.entities.remove(&id) {
1265            self.unindex_entity(&entity);
1266            self.metadata.remove_all(id);
1267            self.deleted.insert(id);
1268            true
1269        } else {
1270            false
1271        }
1272    }
1273
1274    /// Update an entity in this segment regardless of its seal state.
1275    /// Used to mutate sealed segments when UPDATE touches bulk-inserted entities.
1276    pub(crate) fn force_update_with_metadata(
1277        &mut self,
1278        entity: &UnifiedEntity,
1279        modified_columns: &[String],
1280        metadata: Option<&Metadata>,
1281    ) -> Result<(), SegmentError> {
1282        self.apply_hot_update_with_metadata(entity, modified_columns, metadata)
1283    }
1284}
1285
1286impl UnifiedSegment for GrowingSegment {
1287    fn id(&self) -> SegmentId {
1288        self.id
1289    }
1290
1291    fn state(&self) -> SegmentState {
1292        self.state
1293    }
1294
1295    fn collection(&self) -> &str {
1296        &self.collection
1297    }
1298
1299    fn stats(&self) -> SegmentStats {
1300        let mut stats = SegmentStats {
1301            entity_count: self.entities.len(),
1302            deleted_count: self.deleted.len(),
1303            memory_bytes: self.memory_bytes.load(Ordering::Relaxed) as usize,
1304            ..Default::default()
1305        };
1306
1307        for entity in self.entities.values() {
1308            match &entity.kind {
1309                EntityKind::TableRow { .. } => stats.row_count += 1,
1310                EntityKind::GraphNode(_) => stats.node_count += 1,
1311                EntityKind::GraphEdge(_) => stats.edge_count += 1,
1312                EntityKind::Vector { .. } => stats.vector_count += 1,
1313                EntityKind::TimeSeriesPoint(_) => stats.row_count += 1,
1314                EntityKind::QueueMessage { .. } => stats.row_count += 1,
1315            }
1316            stats.cross_ref_count += entity.cross_refs().len();
1317        }
1318
1319        stats
1320    }
1321
1322    fn entity_count(&self) -> usize {
1323        let total = if self.use_flat {
1324            self.flat_entities.len()
1325        } else {
1326            self.entities.len()
1327        };
1328        total.saturating_sub(self.deleted.len())
1329    }
1330
1331    fn contains(&self, id: EntityId) -> bool {
1332        self.has_live_entity(id)
1333    }
1334
1335    fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1336        if self.deleted.contains(&id) {
1337            return None;
1338        }
1339        if self.use_flat {
1340            let raw = id.raw();
1341            if raw >= self.base_entity_id {
1342                let idx = (raw - self.base_entity_id) as usize;
1343                if let Some(entity) = self.flat_entities.get(idx).filter(|e| e.id == id) {
1344                    return Some(entity);
1345                }
1346            }
1347            // Fall through: once flat mode is initialized by a bulk_insert,
1348            // subsequent per-row `insert()` calls still write into the
1349            // HashMap (see the impl below) so reads must check both. Without
1350            // this fallback, a post-bulk single-row insert silently
1351            // disappears from `get()` / `query_all()`.
1352            self.entities.get(&id)
1353        } else {
1354            self.entities.get(&id)
1355        }
1356    }
1357
1358    fn get_mut(&mut self, id: EntityId) -> Option<&mut UnifiedEntity> {
1359        if self.deleted.contains(&id) || !self.state.is_writable() {
1360            return None;
1361        }
1362        if self.use_flat {
1363            let raw = id.raw();
1364            if raw >= self.base_entity_id {
1365                let idx = (raw - self.base_entity_id) as usize;
1366                if self
1367                    .flat_entities
1368                    .get(idx)
1369                    .map(|e| e.id == id)
1370                    .unwrap_or(false)
1371                {
1372                    return self.flat_entities.get_mut(idx);
1373                }
1374            }
1375            // Fall through to HashMap for entities inserted via
1376            // per-row `insert()` after flat mode was activated.
1377            self.entities.get_mut(&id)
1378        } else {
1379            self.entities.get_mut(&id)
1380        }
1381    }
1382
1383    fn insert(&mut self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1384        if !self.state.is_writable() {
1385            return Err(SegmentError::NotWritable);
1386        }
1387
1388        if self.entities.contains_key(&entity.id) {
1389            return Err(SegmentError::AlreadyExists(entity.id));
1390        }
1391
1392        // Assign sequence ID
1393        entity.sequence_id = self.next_sequence();
1394
1395        // Estimate and track memory
1396        let size = Self::estimate_entity_size(&entity);
1397        self.add_memory(size);
1398
1399        // Index the entity
1400        self.index_entity(&entity);
1401
1402        // Update column zone maps for range-based segment pruning
1403        self.update_col_zones_from_entity(&entity);
1404
1405        // Store
1406        let id = entity.id;
1407        self.entities.insert(id, entity);
1408
1409        // Update write timestamp
1410        self.last_write_at = current_unix_secs();
1411
1412        Ok(id)
1413    }
1414
1415    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1416        if !self.state.is_writable() {
1417            return Err(SegmentError::NotWritable);
1418        }
1419
1420        self.apply_update_with_metadata(&entity, None)?;
1421        self.last_write_at = current_unix_secs();
1422
1423        Ok(())
1424    }
1425
1426    fn update_hot(
1427        &mut self,
1428        entity: UnifiedEntity,
1429        modified_columns: &[String],
1430    ) -> Result<(), SegmentError> {
1431        if !self.state.is_writable() {
1432            return Err(SegmentError::NotWritable);
1433        }
1434
1435        self.apply_hot_update_with_metadata(&entity, modified_columns, None)?;
1436        self.last_write_at = current_unix_secs();
1437        Ok(())
1438    }
1439
1440    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1441        if !self.state.is_writable() {
1442            return Err(SegmentError::NotWritable);
1443        }
1444
1445        // For flat storage, use tombstone (don't remove from Vec to keep indices valid)
1446        if self.use_flat {
1447            let raw = id.raw();
1448            if raw >= self.base_entity_id {
1449                let idx = (raw - self.base_entity_id) as usize;
1450                if idx < self.flat_entities.len() && self.flat_entities[idx].id == id {
1451                    self.metadata.remove_all(id);
1452                    self.deleted.insert(id);
1453                    return Ok(true);
1454                }
1455            }
1456            return Ok(false);
1457        }
1458
1459        // Remove entity from HashMap
1460        let entity = self.entities.remove(&id);
1461        if entity.is_none() {
1462            return Ok(false);
1463        }
1464
1465        // Unindex
1466        if let Some(ref e) = entity {
1467            self.unindex_entity(e);
1468        }
1469
1470        // Remove metadata
1471        self.metadata.remove_all(id);
1472
1473        // Mark as deleted (tombstone)
1474        self.deleted.insert(id);
1475
1476        Ok(true)
1477    }
1478
1479    fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1480        if !self.has_live_entity(id) {
1481            return None;
1482        }
1483        Some(self.metadata.get_all(id))
1484    }
1485
1486    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1487        if !self.state.is_writable() {
1488            return Err(SegmentError::NotWritable);
1489        }
1490
1491        if !self.has_live_entity(id) {
1492            return Err(SegmentError::NotFound(id));
1493        }
1494
1495        self.metadata.set_all(id, &metadata);
1496        Ok(())
1497    }
1498
1499    fn seal(&mut self) -> Result<(), SegmentError> {
1500        if self.state != SegmentState::Growing {
1501            return Err(SegmentError::InvalidState(self.state));
1502        }
1503
1504        self.state = SegmentState::Sealing;
1505
1506        // Flush memtable: drain sorted entries for potential B-tree bulk insert
1507        let memtable_stats = self.memtable.stats();
1508        if memtable_stats.entry_count > 0 {
1509            // The memtable entries are entity ID keys in sorted order.
1510            // This ordering enables efficient sequential I/O for persistence.
1511            self.memtable.clear();
1512        }
1513
1514        // Build indices on the sealed data:
1515        // - Bloom filter is already populated from insert()
1516        // - HNSW/IVF for vectors (future)
1517        // - B-tree for sorted access (future)
1518        // - Inverted index for text search (future)
1519        self.rebuild_sealed_col_zones();
1520
1521        self.state = SegmentState::Sealed;
1522        Ok(())
1523    }
1524
1525    fn should_seal(&self, config: &SegmentConfig) -> bool {
1526        // Check entity count
1527        if self.entities.len() >= config.max_entities {
1528            return true;
1529        }
1530
1531        // Check memory usage
1532        if self.memory_bytes.load(Ordering::Relaxed) as usize >= config.max_bytes {
1533            return true;
1534        }
1535
1536        // Check age
1537        if self.age_secs() >= config.max_age_secs {
1538            return true;
1539        }
1540
1541        false
1542    }
1543
1544    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1545        let base: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1546            // Chain flat entities (from bulk_insert) with any entities
1547            // that landed in the HashMap via per-row `insert()` after
1548            // flat mode was activated.
1549            Box::new(self.flat_entities.iter().chain(self.entities.values()))
1550        } else {
1551            Box::new(self.entities.values())
1552        };
1553        if self.deleted.is_empty() {
1554            base
1555        } else {
1556            Box::new(base.filter(|e| !self.deleted.contains(&e.id)))
1557        }
1558    }
1559
1560    fn iter_kind(&self, kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1561        let ids = self.kind_index.get(kind_filter).cloned();
1562        // In flat mode entities live in `flat_entities`, not `entities` —
1563        // chain both so iter_kind doesn't drop bulk-inserted entities.
1564        let flat: Box<dyn Iterator<Item = &UnifiedEntity>> = if self.use_flat {
1565            Box::new(self.flat_entities.iter())
1566        } else {
1567            Box::new(std::iter::empty())
1568        };
1569        Box::new(flat.chain(self.entities.values()).filter(move |e| {
1570            if self.deleted.contains(&e.id) {
1571                return false;
1572            }
1573            if let Some(ref ids) = ids {
1574                ids.contains(&e.id)
1575            } else {
1576                false
1577            }
1578        }))
1579    }
1580
1581    fn filter_metadata(
1582        &self,
1583        filters: &[(String, super::metadata::MetadataFilter)],
1584    ) -> Vec<EntityId> {
1585        // For growing segments, we iterate and filter. In flat mode the
1586        // IDs live in `flat_entities`, not `entities`, so chain both.
1587        let flat_ids: Box<dyn Iterator<Item = EntityId> + '_> = if self.use_flat {
1588            Box::new(self.flat_entities.iter().map(|e| e.id))
1589        } else {
1590            Box::new(std::iter::empty())
1591        };
1592        flat_ids
1593            .chain(self.entities.keys().copied())
1594            .filter(|id| {
1595                if self.deleted.contains(id) {
1596                    return false;
1597                }
1598                let metadata = self.metadata.get_all(*id);
1599                metadata.matches_all(filters)
1600            })
1601            .collect()
1602    }
1603}
1604
1605fn build_minmax_multi_intervals(
1606    entries: &[(CanonicalKey, Value)],
1607    max_intervals: usize,
1608) -> Vec<ColZone> {
1609    if entries.is_empty() {
1610        return Vec::new();
1611    }
1612    if entries.len() == 1 || max_intervals <= 1 {
1613        return vec![ColZone::with_bounds(
1614            entries[0].1.clone(),
1615            entries[entries.len() - 1].1.clone(),
1616        )];
1617    }
1618
1619    let mut split_points = if entries.len() <= max_intervals {
1620        (1..entries.len()).collect::<Vec<_>>()
1621    } else {
1622        let target_splits = max_intervals - 1;
1623        let mut selected = select_gap_split_points(entries, target_splits);
1624        if selected.len() < target_splits {
1625            for bucket in 1..max_intervals {
1626                let idx = bucket * entries.len() / max_intervals;
1627                if idx == 0 || idx >= entries.len() || selected.contains(&idx) {
1628                    continue;
1629                }
1630                selected.push(idx);
1631                if selected.len() >= target_splits {
1632                    break;
1633                }
1634            }
1635        }
1636        selected.sort_unstable();
1637        selected.dedup();
1638        selected
1639    };
1640
1641    split_points.push(entries.len());
1642
1643    let mut out = Vec::with_capacity(split_points.len());
1644    let mut start = 0usize;
1645    for end in split_points {
1646        if end <= start {
1647            continue;
1648        }
1649        out.push(ColZone::with_bounds(
1650            entries[start].1.clone(),
1651            entries[end - 1].1.clone(),
1652        ));
1653        start = end;
1654    }
1655
1656    if out.is_empty() {
1657        out.push(ColZone::with_bounds(
1658            entries[0].1.clone(),
1659            entries[entries.len() - 1].1.clone(),
1660        ));
1661    }
1662
1663    out
1664}
1665
1666fn select_gap_split_points(entries: &[(CanonicalKey, Value)], max_splits: usize) -> Vec<usize> {
1667    let mut gaps = Vec::new();
1668    for idx in 1..entries.len() {
1669        if let Some(score) = canonical_gap_score(&entries[idx - 1].0, &entries[idx].0) {
1670            if score > 0.0 {
1671                gaps.push((score, idx));
1672            }
1673        }
1674    }
1675    gaps.sort_by(|left, right| {
1676        right
1677            .0
1678            .partial_cmp(&left.0)
1679            .unwrap_or(std::cmp::Ordering::Equal)
1680            .then_with(|| left.1.cmp(&right.1))
1681    });
1682    gaps.into_iter()
1683        .take(max_splits)
1684        .map(|(_, idx)| idx)
1685        .collect()
1686}
1687
1688fn canonical_gap_score(left: &CanonicalKey, right: &CanonicalKey) -> Option<f64> {
1689    if left.family() != right.family() {
1690        return None;
1691    }
1692    match (left, right) {
1693        (CanonicalKey::Signed(_, l), CanonicalKey::Signed(_, r)) => {
1694            Some(r.saturating_sub(*l) as f64)
1695        }
1696        (CanonicalKey::Unsigned(_, l), CanonicalKey::Unsigned(_, r)) => {
1697            Some(r.saturating_sub(*l) as f64)
1698        }
1699        (CanonicalKey::Float(l), CanonicalKey::Float(r)) => {
1700            Some((f64::from_bits(*r) - f64::from_bits(*l)).abs())
1701        }
1702        _ => None,
1703    }
1704}
1705
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::schema::Value;
    use crate::storage::unified::entity::RowData;
    use crate::storage::unified::MetadataValue;

    /// A freshly inserted table row is reported by `contains` and counted
    /// in both the entity and row statistics.
    #[test]
    fn test_growing_segment_basic() {
        let mut seg = GrowingSegment::new(1, "test");

        let alice_row = UnifiedEntity::table_row(
            EntityId::new(1),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );

        let assigned = seg.insert(alice_row).unwrap();
        assert_eq!(assigned, EntityId::new(1));
        assert!(seg.contains(assigned));

        let counters = seg.stats();
        assert_eq!(counters.entity_count, 1);
        assert_eq!(counters.row_count, 1);
    }

    /// Metadata attached to an inserted entity can be read back by key.
    #[test]
    fn test_segment_metadata() {
        let mut seg = GrowingSegment::new(1, "test");

        let alice_row = UnifiedEntity::table_row(
            EntityId::new(1),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );
        seg.insert(alice_row).unwrap();

        let mut attrs = Metadata::new();
        attrs.set("role", MetadataValue::String("admin".to_string()));
        attrs.set("level", MetadataValue::Int(5));

        seg.set_metadata(EntityId::new(1), attrs).unwrap();

        let stored = seg.get_metadata(EntityId::new(1)).unwrap();
        let expected_role = MetadataValue::String("admin".to_string());
        assert_eq!(stored.get("role"), Some(&expected_role));
    }

    /// A segment is writable until `seal()` is called; afterwards it reports
    /// `Sealed` and rejects further inserts.
    #[test]
    fn test_segment_seal() {
        let mut seg = GrowingSegment::new(1, "test");

        let first = UnifiedEntity::vector(EntityId::new(1), "embeddings", vec![0.1, 0.2, 0.3]);
        seg.insert(first).unwrap();

        // Still growing: writes are accepted.
        assert!(seg.state().is_writable());

        // Transition to the sealed state.
        seg.seal().unwrap();
        assert_eq!(seg.state(), SegmentState::Sealed);

        // Sealed segments must refuse new entities.
        let late = UnifiedEntity::vector(EntityId::new(2), "embeddings", vec![0.4, 0.5, 0.6]);
        assert!(seg.insert(late).is_err());
    }

    /// `should_seal` flips to true once the entity count reaches the
    /// configured `max_entities`.
    #[test]
    fn test_should_seal() {
        let mut seg = GrowingSegment::new(1, "test");

        let limits = SegmentConfig {
            max_entities: 2,
            ..Default::default()
        };

        // Empty segment: nothing to seal yet.
        assert!(!seg.should_seal(&limits));

        // One entity: still below the limit.
        let first = UnifiedEntity::vector(EntityId::new(1), "v", vec![0.1]);
        seg.insert(first).unwrap();
        assert!(!seg.should_seal(&limits));

        // Second entity reaches the limit.
        let second = UnifiedEntity::vector(EntityId::new(2), "v", vec![0.2]);
        seg.insert(second).unwrap();
        assert!(seg.should_seal(&limits));
    }

    /// A cross-reference stored on an entity is visible from both the
    /// source side and the target side.
    #[test]
    fn test_cross_references() {
        let mut seg = GrowingSegment::new(1, "test");

        let mut host_row = UnifiedEntity::table_row(
            EntityId::new(1),
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        let link = CrossRef::new(
            EntityId::new(1),
            EntityId::new(2),
            "nodes",
            RefType::RowToNode,
        );
        host_row.add_cross_ref(link);
        seg.insert(host_row).unwrap();

        let outgoing = seg.get_references_from(EntityId::new(1));
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0], (EntityId::new(2), RefType::RowToNode));

        let incoming = seg.get_references_to(EntityId::new(2));
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0], (EntityId::new(1), RefType::RowToNode));
    }

    /// Equality predicates on `Email` values can skip a zone when the probe
    /// lies outside the zone's bounds, and cannot when it lies inside.
    #[test]
    fn test_zone_predicate_uses_canonical_fallback_for_email_values() {
        let mut email_zone = ColZone::new(Value::Email("bravo@example.com".to_string()));
        email_zone.update(&Value::Email("delta@example.com".to_string()));

        let below_range = Value::Email("alpha@example.com".to_string());
        assert!(ZoneColPred::Eq(&below_range).can_skip(&email_zone));

        let inside_range = Value::Email("charlie@example.com".to_string());
        assert!(!ZoneColPred::Eq(&inside_range).can_skip(&email_zone));
    }

    /// After sealing, a column holding 1, 2, 3 and 1000 can be skipped for a
    /// probe of 500 but not for a probe of 1000.
    #[test]
    fn test_sealed_multi_zone_prunes_numeric_gap_outlier() {
        let mut seg = GrowingSegment::new(1, "test");

        for (row_id, age) in (1_u64..).zip([1_i64, 2, 3, 1000]) {
            let kind = EntityKind::TableRow {
                table: "users".into(),
                row_id,
            };
            let payload = EntityData::Row(RowData::with_names(
                vec![Value::Integer(age)],
                vec!["age".to_string()],
            ));
            let row = UnifiedEntity::new(EntityId::new(row_id), kind, payload);
            seg.insert(row).unwrap();
        }

        seg.seal().unwrap();

        let absent_age = Value::Integer(500);
        assert!(seg.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&absent_age))]));

        let present_age = Value::Integer(1000);
        assert!(!seg.can_skip_zone_preds(&[("age", ZoneColPred::Eq(&present_age))]));
    }
}