Skip to main content

reddb_server/storage/unified/
manager.rs

1//! Segment Manager
2//!
3//! Manages the lifecycle of unified segments: creation, sealing, compaction,
4//! and archival. Coordinates writes to growing segments and queries across
5//! all segments.
6//!
7//! # Responsibilities
8//!
9//! - Route writes to the active growing segment
10//! - Auto-seal segments when thresholds are met
11//! - Coordinate queries across multiple segments
12//! - Background compaction of sealed segments
13//! - Archive old segments to cold storage
14
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19
20use super::entity::{EntityId, UnifiedEntity};
21use super::metadata::{Metadata, MetadataFilter};
22use super::segment::{
23    GrowingSegment, SegmentConfig, SegmentError, SegmentId, SegmentState, SegmentStats,
24    UnifiedSegment, ZoneColPred, ZoneColPredKind,
25};
26use crate::storage::btree::visibility_map::VisibilityMap;
27
/// Configuration for the segment manager
///
/// Controls segment sizing (via the embedded [`SegmentConfig`]), when a
/// growing segment is sealed, and whether the background compaction and
/// archival passes run.
#[derive(Debug, Clone)]
pub struct ManagerConfig {
    /// Per-segment configuration (size thresholds, etc.)
    pub segment_config: SegmentConfig,
    /// Maximum number of sealed segments before compaction kicks in
    pub max_sealed_segments: usize,
    /// Idle time (seconds) with no writes before auto-sealing the growing segment
    pub idle_seal_secs: u64,
    /// Enable background compaction of sealed segments
    pub enable_compaction: bool,
    /// Enable background archival to cold storage
    pub enable_archival: bool,
    /// Age threshold (seconds) after which a sealed segment is archived
    pub archive_age_secs: u64,
}
44
45impl Default for ManagerConfig {
46    fn default() -> Self {
47        Self {
48            segment_config: SegmentConfig::default(),
49            max_sealed_segments: 10,
50            idle_seal_secs: 300, // 5 minutes
51            enable_compaction: true,
52            enable_archival: true,
53            archive_age_secs: 86400 * 7, // 7 days
54        }
55    }
56}
57
/// Manager statistics
///
/// NOTE: `total_entities` in this struct is the slow-path copy; the live
/// value is maintained lock-free in `SegmentManager::total_entities_atomic`
/// and patched in by `SegmentManager::stats()`.
#[derive(Debug, Clone, Default)]
pub struct ManagerStats {
    /// Total entities across all segments (see note above — may be stale here)
    pub total_entities: usize,
    /// Number of growing segments
    pub growing_count: usize,
    /// Number of sealed segments
    pub sealed_count: usize,
    /// Number of archived segments
    pub archived_count: usize,
    /// Total memory usage in bytes
    pub total_memory_bytes: usize,
    /// Number of seal operations performed
    pub seal_ops: u64,
    /// Number of compaction operations performed
    pub compact_ops: u64,
}
76
/// Lifecycle events for monitoring
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
    /// A new growing segment was created
    SegmentCreated(SegmentId),
    /// A growing segment was sealed (became immutable)
    SegmentSealed(SegmentId),
    /// One or more sealed segments were compacted into `target`
    SegmentCompacted {
        source: Vec<SegmentId>,
        target: SegmentId,
    },
    /// A sealed segment was moved to cold storage
    SegmentArchived(SegmentId),
    /// An entity was inserted into the given segment
    EntityInserted(EntityId, SegmentId),
    /// An entity was deleted from the given segment
    EntityDeleted(EntityId, SegmentId),
}
90
/// Segment manager for a collection
///
/// Owns one optional growing segment (write target), a list of sealed
/// segments (immutable, queryable), and archived segment IDs. ID counters
/// are lock-free atomics; structural state sits behind `parking_lot`
/// read-write locks.
pub struct SegmentManager {
    /// Collection name
    collection: String,
    /// Configuration (immutable after construction)
    config: ManagerConfig,
    /// Next segment ID counter
    next_segment_id: AtomicU64,
    /// Next entity ID counter
    next_entity_id: AtomicU64,
    /// Per-table auto-increment row ID (1, 2, 3... per collection)
    next_row_id: AtomicU64,
    /// Hot-path entity counter — lock-free, updated by every insert/delete.
    /// Replaces stats.total_entities on the write path to eliminate a lock
    /// acquisition per row (from 4 lock ops per insert → 2).
    total_entities_atomic: AtomicU64,
    /// Currently active growing segment
    growing: RwLock<Option<Arc<RwLock<GrowingSegment>>>>,
    /// Sealed segments (immutable, queryable)
    sealed: RwLock<Vec<Arc<RwLock<GrowingSegment>>>>,
    /// Archived segment IDs (stored externally)
    archived: RwLock<Vec<SegmentId>>,
    /// Entity to segment mapping (for fast lookups by individually-inserted entities).
    /// Bulk-inserted entities skip this map; their segment is found by sequential scan
    /// of growing + sealed segments in get()/update()/delete().
    entity_segment: RwLock<HashMap<EntityId, SegmentId>>,
    /// Shared column schema: column_name → index in Vec<Value>.
    /// Populated on first bulk_insert. Enables columnar storage (Vec instead of HashMap per row).
    column_schema: RwLock<Option<Arc<Vec<String>>>>,
    /// Statistics (slow path — not updated on every insert).
    stats: RwLock<ManagerStats>,
    /// Event listeners (simplified - would be channels in production).
    /// NOTE(review): this Vec grows without bound unless something drains
    /// it — confirm a consumer exists outside this chunk.
    events: RwLock<Vec<LifecycleEvent>>,
    /// Visibility map: sealed segment entity ranges marked as all-visible.
    /// Growing segment is never all-visible (writes are in-flight).
    /// Used by index-only scan decisions.
    visibility_map: VisibilityMap,
}
129
130impl SegmentManager {
131    /// Create a new segment manager
132    pub fn new(collection: impl Into<String>) -> Self {
133        Self::with_config(collection, ManagerConfig::default())
134    }
135
136    /// Create with custom configuration
137    pub fn with_config(collection: impl Into<String>, config: ManagerConfig) -> Self {
138        Self {
139            collection: collection.into(),
140            config,
141            next_segment_id: AtomicU64::new(1),
142            next_entity_id: AtomicU64::new(1),
143            next_row_id: AtomicU64::new(1),
144            total_entities_atomic: AtomicU64::new(0),
145            growing: RwLock::new(None),
146            sealed: RwLock::new(Vec::new()),
147            archived: RwLock::new(Vec::new()),
148            entity_segment: RwLock::new(HashMap::new()),
149            column_schema: RwLock::new(None),
150            stats: RwLock::new(ManagerStats::default()),
151            events: RwLock::new(Vec::new()),
152            visibility_map: VisibilityMap::new(),
153        }
154    }
155
156    /// Get or create the shared column schema from first row's named fields.
157    pub fn get_or_init_schema(
158        &self,
159        named: &HashMap<String, crate::storage::schema::Value>,
160    ) -> Arc<Vec<String>> {
161        {
162            let schema = self.column_schema.read();
163            if let Some(ref s) = *schema {
164                return Arc::clone(s);
165            }
166        }
167        let cols: Vec<String> = named.keys().cloned().collect();
168        let arc = Arc::new(cols);
169        *self.column_schema.write() = Some(Arc::clone(&arc));
170        arc
171    }
172
173    /// Get the column schema if it exists.
174    pub fn column_schema(&self) -> Option<Arc<Vec<String>>> {
175        self.column_schema.read().clone()
176    }
177
    /// Get collection name
    pub fn collection(&self) -> &str {
        &self.collection
    }

    /// Get configuration (immutable after construction)
    pub fn config(&self) -> &ManagerConfig {
        &self.config
    }
187
188    /// Get statistics. total_entities is read from the lock-free atomic;
189    /// other fields come from the slow-path stats struct.
190    pub fn stats(&self) -> ManagerStats {
191        let mut s = self.stats.read().clone();
192        s.total_entities = self.total_entities_atomic.load(Ordering::Relaxed) as usize;
193        s
194    }
195
196    /// Generate a new entity ID
197    pub fn next_entity_id(&self) -> EntityId {
198        EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
199    }
200
    /// Generate a per-table sequential row ID (1, 2, 3... per collection).
    /// Single atomic fetch_add; IDs are never reused.
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id.fetch_add(1, Ordering::SeqCst)
    }
205
206    /// Reserve `n` contiguous per-table row IDs with one atomic
207    /// fetch_add. Caller assigns `row_id = start + i` per entity.
208    /// Saves N-1 atomic RMWs on bulk inserts (25k atomics → 1).
209    pub fn reserve_row_ids(&self, n: u64) -> std::ops::Range<u64> {
210        let start = self.next_row_id.fetch_add(n, Ordering::SeqCst);
211        start..start + n
212    }
213
214    /// Advance the per-table row_id counter to at least `id + 1`.
215    /// Called during load to restore the counter from existing data.
216    pub fn register_row_id(&self, id: u64) {
217        let candidate = id.saturating_add(1);
218        let mut current = self.next_row_id.load(Ordering::SeqCst);
219        while candidate > current {
220            match self.next_row_id.compare_exchange(
221                current,
222                candidate,
223                Ordering::SeqCst,
224                Ordering::SeqCst,
225            ) {
226                Ok(_) => break,
227                Err(updated) => current = updated,
228            }
229        }
230    }
231
232    /// Get or create the active growing segment.
233    ///
234    /// Fast path: read lock only — no write contention when the segment already exists.
235    /// Concurrent writers each clone the `Arc` under a shared read lock, then compete
236    /// on the segment's own write lock. This eliminates the global write-lock serialisation
237    /// that previously throttled concurrent inserts to ~238 ops/s.
238    fn get_or_create_growing(&self) -> Arc<RwLock<GrowingSegment>> {
239        // Common case: segment already exists — shared read lock, zero contention.
240        {
241            let growing = self.growing.read();
242            if let Some(segment) = growing.as_ref() {
243                return Arc::clone(segment);
244            }
245        }
246
247        // Slow path: segment missing — take exclusive write lock to create it.
248        let mut growing = self.growing.write();
249        // Double-check: another thread may have created it between the two lock acquisitions.
250        if let Some(segment) = growing.as_ref() {
251            return Arc::clone(segment);
252        }
253
254        let id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
255        let segment = GrowingSegment::new(id, &self.collection);
256        let segment_arc = Arc::new(RwLock::new(segment));
257        *growing = Some(Arc::clone(&segment_arc));
258
259        self.emit(LifecycleEvent::SegmentCreated(id));
260
261        // Update growing_count in the slow-path stats struct.
262        // This is the rare segment-creation path — locking is fine here.
263        self.stats.write().growing_count += 1;
264
265        segment_arc
266    }
267
    /// Insert a new entity into the active growing segment.
    ///
    /// Assigns a fresh entity ID when `entity.id` is 0 (the "unassigned"
    /// sentinel), bumps the lock-free entity counter, and emits an
    /// `EntityInserted` lifecycle event.
    ///
    /// # Errors
    /// Propagates any `SegmentError` from sealing or from the segment insert.
    pub fn insert(&self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
        // Check if we need to seal the current segment first
        self.maybe_seal_growing()?;

        let segment_arc = self.get_or_create_growing();
        let mut segment = segment_arc.write();

        // Assign entity ID if not already set
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        }

        let entity_id = entity.id;
        let segment_id = segment.id();

        segment.insert(entity)?;

        // Lock-free counter update — eliminates the stats write lock on the hot path.
        self.total_entities_atomic.fetch_add(1, Ordering::Relaxed);

        // entity_segment map is intentionally NOT updated here.
        // update() and update_hot() first probe the growing segment directly
        // (growing.contains(entity.id)) before consulting this map, so entities
        // that were just inserted are found without entity_segment. The map is
        // only consulted for entities that may have been moved to sealed segments,
        // which can't be updated anyway (state().is_writable() == false).
        // Skipping this write removes one exclusive HashMap lock per insert.

        self.emit(LifecycleEvent::EntityInserted(entity_id, segment_id));

        Ok(entity_id)
    }
301
302    /// Insert multiple entities (batch) — sequential, one lock per item.
303    pub fn insert_batch(
304        &self,
305        entities: Vec<UnifiedEntity>,
306    ) -> Result<Vec<EntityId>, SegmentError> {
307        let mut ids = Vec::with_capacity(entities.len());
308        for entity in entities {
309            ids.push(self.insert(entity)?);
310        }
311        Ok(ids)
312    }
313
    /// Turbo bulk insert — single lock acquisition for the entire batch.
    /// Skips bloom filter, memtable, and cross-ref indexing for maximum speed.
    ///
    /// NOTE(review): unlike `insert()`, this path does not call
    /// `maybe_seal_growing()` first — presumably intentional so a batch lands
    /// in one segment; confirm seal thresholds tolerate oversized batches.
    pub fn bulk_insert(
        &self,
        mut entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<EntityId>, SegmentError> {
        // Assign IDs and per-table row_ids.
        for entity in &mut entities {
            if entity.id.raw() == 0 {
                entity.id = self.next_entity_id();
            }
            if let super::entity::EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
                if *row_id == 0 {
                    *row_id = self.next_row_id();
                } else {
                    // Pre-assigned row_id (e.g. replayed from disk): keep it
                    // and make sure the counter stays ahead of it.
                    self.register_row_id(*row_id);
                }
            }
        }

        // Convert named HashMap → positional Vec (compact memory representation)
        // The schema (column order) is shared across all rows in the collection.
        // Only the first row is inspected — assumes all rows in the batch carry
        // the same column set; columns absent from a row are padded with Null.
        if let Some(first_row) = entities.first() {
            if let super::entity::EntityData::Row(ref row) = first_row.data {
                if let Some(ref named) = row.named {
                    let schema = self.get_or_init_schema(named);
                    for entity in &mut entities {
                        if let super::entity::EntityData::Row(ref mut row) = entity.data {
                            if let Some(named) = row.named.take() {
                                let mut cols = Vec::with_capacity(schema.len());
                                for col_name in schema.iter() {
                                    cols.push(
                                        named
                                            .get(col_name)
                                            .cloned()
                                            .unwrap_or(crate::storage::schema::Value::Null),
                                    );
                                }
                                row.columns = cols;
                                row.schema = Some(Arc::clone(&schema));
                            }
                        }
                    }
                }
            }
        }

        let segment_arc = self.get_or_create_growing();
        let mut segment = segment_arc.write();
        let segment_id = segment.id();

        // Single call to GrowingSegment.bulk_insert (one lock, no bloom/memtable)
        let ids = segment.bulk_insert(entities)?;

        // Skip entity-segment mapping for bulk inserts (saves ~56 bytes/entity).
        // The get() method scans growing+sealed segments directly.

        // Lock-free batch counter update.
        self.total_entities_atomic
            .fetch_add(ids.len() as u64, Ordering::Relaxed);

        Ok(ids)
    }
377
378    /// Get an entity by ID — scans growing then sealed segments.
379    pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
380        // Growing segment first (most likely for recent inserts)
381        if let Some(growing_arc) = self.growing.read().as_ref() {
382            let growing = growing_arc.read();
383            if let Some(entity) = growing.get(id) {
384                return Some(entity.clone());
385            }
386        }
387
388        // Then sealed segments
389        let sealed = self.sealed.read();
390        for segment in sealed.iter() {
391            let seg = segment.read();
392            if let Some(entity) = seg.get(id) {
393                return Some(entity.clone());
394            }
395        }
396
397        None
398    }
399
    /// Batch-fetch multiple entities by ID in a single lock acquisition per segment.
    ///
    /// For indexed-scan result sets (up to ~5000 ids from range/bitmap lookup) this
    /// is 2-3 lock acquisitions total vs N×3 with individual `get()` calls.
    ///
    /// Output slot `i` corresponds to `ids[i]`; unresolved ids stay `None`.
    pub fn get_many(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
        let mut out: Vec<Option<UnifiedEntity>> = vec![None; ids.len()];
        let mut remaining: Vec<usize> = (0..ids.len()).collect(); // indices still unfound

        // Growing segment — one read lock for the entire batch.
        // Non-blocking first: if a writer is active, fall back to blocking.
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let growing = if let Some(g) = growing_arc.try_read() {
                g
            } else {
                growing_arc.read()
            };
            remaining.retain(|&i| {
                if let Some(entity) = growing.get(ids[i]) {
                    out[i] = Some(entity.clone());
                    false // remove from remaining
                } else {
                    true // keep — not found yet
                }
            });
        }

        // Everything resolved from the growing segment — skip sealed entirely.
        if remaining.is_empty() {
            return out;
        }

        // Sealed segments — one read lock per segment
        let sealed = self.sealed.read();
        for segment in sealed.iter() {
            if remaining.is_empty() {
                break;
            }
            let seg = segment.read();
            remaining.retain(|&i| {
                if let Some(entity) = seg.get(ids[i]) {
                    out[i] = Some(entity.clone());
                    false
                } else {
                    true
                }
            });
        }

        out
    }
449
    /// Visitor-pattern batch fetch. Invokes `f(&UnifiedEntity, usize_index)`
    /// for each id that resolves, never cloning the entity.
    ///
    /// Used by scan hot paths (select_range, select_filtered) that
    /// materialize each entity into an output record and don't need
    /// an owned `UnifiedEntity`. Eliminates ~20% of scan CPU spent in
    /// `UnifiedEntity::clone` when `get_batch` is followed by
    /// `runtime_table_record_lean(entity)`.
    ///
    /// The closure runs while the segment read lock is held, so it
    /// must be short — avoid doing I/O or taking unrelated locks in
    /// `f`.
    ///
    /// Ids that resolve in no segment are silently skipped — `f` is
    /// simply not called for them.
    pub fn for_each_id<F>(&self, ids: &[EntityId], mut f: F)
    where
        F: FnMut(usize, &UnifiedEntity),
    {
        // Thread-local scratch buffer for the "pending" index list.
        // Previous code allocated a fresh `Vec<usize>` of capacity
        // N on every call — 4200 × 1000 queries / scenario on the
        // select_range bench path. Take-and-restore pattern (vs
        // RefCell::borrow_mut) so user closures that recurse into
        // another `for_each_id` don't panic on a re-borrow; worst
        // case they allocate a fresh buffer and we lose the caching
        // win for that nested call.
        thread_local! {
            static REMAINING_SCRATCH: std::cell::Cell<Vec<usize>> =
                const { std::cell::Cell::new(Vec::new()) };
        }

        let mut remaining: Vec<usize> = REMAINING_SCRATCH.with(|cell| cell.take());
        remaining.clear();

        if let Some(growing_arc) = self.growing.read().as_ref() {
            // Non-blocking read first; fall back to blocking if a writer is active.
            let growing = if let Some(g) = growing_arc.try_read() {
                g
            } else {
                growing_arc.read()
            };
            for (i, id) in ids.iter().enumerate() {
                if let Some(entity) = growing.get(*id) {
                    f(i, entity);
                } else {
                    remaining.push(i);
                }
            }
        } else {
            // No growing segment — every id must come from the sealed segments.
            remaining.reserve(ids.len());
            remaining.extend(0..ids.len());
        }

        if !remaining.is_empty() {
            let sealed = self.sealed.read();
            for segment in sealed.iter() {
                if remaining.is_empty() {
                    break;
                }
                let seg = segment.read();
                remaining.retain(|&i| {
                    if let Some(entity) = seg.get(ids[i]) {
                        f(i, entity);
                        false
                    } else {
                        true
                    }
                });
            }
        }

        // Return the buffer to the thread-local cache for the next call.
        REMAINING_SCRATCH.with(|cell| cell.set(remaining));
    }
520
521    /// Scan all segments for an entity
522    fn scan_for_entity(&self, id: EntityId) -> Option<UnifiedEntity> {
523        // Check growing
524        if let Some(growing_arc) = self.growing.read().as_ref() {
525            let growing = growing_arc.read();
526            if let Some(entity) = growing.get(id) {
527                return Some(entity.clone());
528            }
529        }
530
531        // Check sealed
532        let sealed = self.sealed.read();
533        for segment in sealed.iter() {
534            if let Some(entity) = segment.get(id) {
535                return Some(entity.clone());
536            }
537        }
538
539        None
540    }
541
542    fn find_sealed_segment_arc(&self, id: EntityId) -> Option<Arc<RwLock<GrowingSegment>>> {
543        let sealed = self.sealed.read();
544        sealed
545            .iter()
546            .find(|segment_arc| segment_arc.read().contains(id))
547            .map(Arc::clone)
548    }
549
    /// Move an entity out of its sealed segment and re-insert it into the
    /// growing segment, carrying over metadata. Used as the fallback when
    /// update()/update_hot() target an entity outside the writable segment.
    ///
    /// `metadata`, when `Some`, replaces the entity's metadata; otherwise
    /// the metadata captured from the sealed segment is re-applied.
    ///
    /// NOTE(review): the sealed delete and the growing insert run under
    /// separate segment locks, so a concurrent reader can briefly observe
    /// the entity as absent — confirm callers tolerate this window.
    ///
    /// # Errors
    /// `SegmentError::NotFound` if no sealed segment contains the entity,
    /// or any error from the growing-segment insert/set_metadata.
    fn rewrite_sealed_entity_into_growing(
        &self,
        entity: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        let sealed_arc = self
            .find_sealed_segment_arc(entity_id)
            .ok_or(SegmentError::NotFound(entity_id))?;

        let metadata_to_apply = {
            let mut sealed = sealed_arc.write();
            // Capture existing metadata before the delete removes it.
            let existing_metadata = sealed.get_metadata(entity_id);
            if !sealed.force_delete(entity_id) {
                return Err(SegmentError::NotFound(entity_id));
            }
            metadata.cloned().or(existing_metadata)
        };

        let growing_arc = self.get_or_create_growing();
        let growing_id = {
            let mut growing = growing_arc.write();
            growing.insert(entity)?;
            if let Some(metadata) = metadata_to_apply {
                growing.set_metadata(entity_id, metadata)?;
            }
            growing.id()
        };

        // Record the entity's new home so future updates find it quickly.
        self.entity_segment.write().insert(entity_id, growing_id);
        Ok(())
    }
582
    /// Update an entity.
    ///
    /// Resolution order: (1) probe the growing segment directly (covers
    /// bulk-inserted entities that skip the entity_segment map), (2) consult
    /// the entity_segment map for individually inserted entities, (3) fall
    /// back to rewriting a sealed entity into the growing segment.
    ///
    /// # Errors
    /// `SegmentError::NotFound` if the entity exists in no segment.
    pub fn update(&self, entity: UnifiedEntity) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Option wrapper lets each branch take ownership of the entity at most once.
        let mut entity = Some(entity);

        // Try growing segment directly (covers bulk-inserted entities without entity_segment map)
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                return growing.update(entity.take().expect("entity already moved"));
            }
        }

        // Try entity_segment mapping for individually inserted entities
        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    return growing.update(entity.take().expect("entity already moved"));
                }
            }
        }

        // Not in the growing segment — migrate it out of its sealed segment.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, None);
        }

        Err(SegmentError::NotFound(entity_id))
    }
613
    /// Update an entity and, optionally, replace its metadata while holding
    /// the segment write lock only once.
    ///
    /// Same resolution order as `update()`: growing segment directly, then
    /// entity_segment map, then sealed-to-growing rewrite.
    ///
    /// # Errors
    /// `SegmentError::NotFound` if the entity exists in no segment.
    pub fn update_with_metadata(
        &self,
        entity: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Option wrapper lets each branch take ownership of the entity at most once.
        let mut entity = Some(entity);

        // Try growing segment directly (covers bulk-inserted entities without entity_segment map)
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                growing.update(entity.take().expect("entity already moved"))?;
                if let Some(metadata) = metadata {
                    growing.set_metadata(entity_id, metadata.clone())?;
                }
                return Ok(());
            }
        }

        // Try entity_segment mapping for individually inserted entities
        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    growing.update(entity.take().expect("entity already moved"))?;
                    if let Some(metadata) = metadata {
                        growing.set_metadata(entity_id, metadata.clone())?;
                    }
                    return Ok(());
                }
            }
        }

        // Not in the growing segment — migrate it out of its sealed segment,
        // applying the new metadata during the rewrite.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, metadata);
        }

        Err(SegmentError::NotFound(entity_id))
    }
657
    /// HOT-update: like update but skips index work for unchanged columns.
    /// `modified_columns` is the list of column names actually changed by the
    /// UPDATE statement — lets us skip pk_index and cross_ref when safe.
    ///
    /// Same resolution order as `update()`: growing segment directly, then
    /// entity_segment map, then sealed-to-growing rewrite (which loses the
    /// HOT optimisation — the rewrite path is a full insert).
    ///
    /// # Errors
    /// `SegmentError::NotFound` if the entity exists in no segment.
    pub fn update_hot(
        &self,
        entity: UnifiedEntity,
        modified_columns: &[String],
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Option wrapper lets each branch take ownership of the entity at most once.
        let mut entity = Some(entity);

        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                return growing.update_hot(
                    entity.take().expect("entity already moved"),
                    modified_columns,
                );
            }
        }

        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    return growing.update_hot(
                        entity.take().expect("entity already moved"),
                        modified_columns,
                    );
                }
            }
        }

        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, None);
        }

        Err(SegmentError::NotFound(entity_id))
    }
698
    /// HOT-update an entity and, optionally, replace its metadata while
    /// holding the segment write lock only once.
    ///
    /// Same resolution order as `update_hot()`: growing segment directly,
    /// then entity_segment map, then sealed-to-growing rewrite.
    ///
    /// # Errors
    /// `SegmentError::NotFound` if the entity exists in no segment.
    pub fn update_hot_with_metadata(
        &self,
        entity: UnifiedEntity,
        modified_columns: &[String],
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Option wrapper lets each branch take ownership of the entity at most once.
        let mut entity = Some(entity);

        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                growing.update_hot(
                    entity.take().expect("entity already moved"),
                    modified_columns,
                )?;
                if let Some(metadata) = metadata {
                    growing.set_metadata(entity_id, metadata.clone())?;
                }
                return Ok(());
            }
        }

        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    growing.update_hot(
                        entity.take().expect("entity already moved"),
                        modified_columns,
                    )?;
                    if let Some(metadata) = metadata {
                        growing.set_metadata(entity_id, metadata.clone())?;
                    }
                    return Ok(());
                }
            }
        }

        // Not in the growing segment — migrate it out of its sealed segment,
        // applying the new metadata during the rewrite.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, metadata);
        }

        Err(SegmentError::NotFound(entity_id))
    }
747
    /// Batch HOT-update multiple entities while holding the growing-segment
    /// write lock only once when possible.
    ///
    /// Tries a single segment-level batch call first; if that reports
    /// `NotFound` (some entities live outside the growing segment), falls
    /// back to per-item `update_hot_with_metadata` calls.
    ///
    /// NOTE(review): if the segment-level batch partially applied before
    /// returning NotFound, the fallback re-applies every item; that looks
    /// idempotent for last-write-wins updates — confirm against
    /// `GrowingSegment::update_hot_batch_with_metadata`'s semantics.
    pub fn update_hot_batch_with_metadata<'a, I>(&self, items: I) -> Result<(), SegmentError>
    where
        I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
    {
        let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
            items.into_iter().collect();
        if items.is_empty() {
            return Ok(());
        }

        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.state().is_writable() {
                match growing.update_hot_batch_with_metadata(items.iter().copied()) {
                    Ok(()) => return Ok(()),
                    // Some entity is not in the growing segment — fall through
                    // to the per-item path, which can reach sealed segments.
                    Err(SegmentError::NotFound(_)) => {}
                    Err(other) => return Err(other),
                }
            }
        }

        for (entity, modified_columns, metadata) in items {
            self.update_hot_with_metadata(entity.clone(), modified_columns, metadata)?;
        }
        Ok(())
    }
776
    /// Delete an entity
    ///
    /// Returns `Ok(true)` when the entity was found and removed, `Ok(false)`
    /// when it does not exist in any segment. On removal, the entity→segment
    /// map and the relaxed entity counter are updated and a lifecycle event
    /// is emitted.
    pub fn delete(&self, id: EntityId) -> Result<bool, SegmentError> {
        // Fast path: probe the growing segment directly — covers entities inserted via
        // insert() which no longer writes to entity_segment, and bulk-inserted entities.
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(id) && growing.state().is_writable() {
                let seg_id = growing.id();
                let deleted = growing.delete(id)?;
                if deleted {
                    // Bookkeeping runs while the segment write lock is still
                    // held, so no reader observes a half-updated state.
                    self.entity_segment.write().remove(&id);
                    self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
                    self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
                }
                return Ok(deleted);
            }
        }

        // Fallback: check entity_segment map (populated for older insert() paths
        // or entities that were in a previous growing segment).
        let segment_id = self.entity_segment.read().get(&id).copied();

        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    let deleted = growing.delete(id)?;
                    if deleted {
                        self.entity_segment.write().remove(&id);
                        self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
                        self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
                    }
                    return Ok(deleted);
                }
            }
        }

        // Fallback: entity is in a sealed segment (bulk-inserted, not in entity_segment map).
        // Single write-lock per segment to avoid TOCTOU race between contains() and force_delete().
        {
            let sealed = self.sealed.read();
            for segment_arc in sealed.iter() {
                let mut seg = segment_arc.write();
                let seg_id = seg.id();
                if seg.contains(id) {
                    let deleted = seg.force_delete(id);
                    // Release the segment lock before touching manager-wide
                    // state to keep the lock hierarchy shallow.
                    drop(seg);
                    if deleted {
                        self.entity_segment.write().remove(&id);
                        self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
                        self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
                    }
                    return Ok(deleted);
                }
            }
        }

        // Not present anywhere.
        Ok(false)
    }
836
837    pub fn delete_batch(&self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
838        if ids.is_empty() {
839            return Ok(Vec::new());
840        }
841
842        let mut deleted_ids = Vec::with_capacity(ids.len());
843
844        if let Some(growing_arc) = self.growing.read().as_ref() {
845            let mut growing = growing_arc.write();
846            if growing.state().is_writable() {
847                let seg_id = growing.id();
848                let deleted = growing.delete_batch(ids)?;
849                if !deleted.is_empty() {
850                    {
851                        let mut entity_segment = self.entity_segment.write();
852                        for id in &deleted {
853                            entity_segment.remove(id);
854                        }
855                    }
856                    self.total_entities_atomic
857                        .fetch_sub(deleted.len() as u64, Ordering::Relaxed);
858                    for id in &deleted {
859                        self.emit(LifecycleEvent::EntityDeleted(*id, seg_id));
860                    }
861                    deleted_ids.extend(deleted);
862                }
863            }
864        }
865
866        if deleted_ids.len() == ids.len() {
867            return Ok(deleted_ids);
868        }
869
870        let deleted_set: std::collections::HashSet<EntityId> =
871            deleted_ids.iter().copied().collect();
872        for &id in ids {
873            if deleted_set.contains(&id) {
874                continue;
875            }
876            if self.delete(id)? {
877                deleted_ids.push(id);
878            }
879        }
880
881        Ok(deleted_ids)
882    }
883
884    /// Get metadata for an entity
885    pub fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
886        // Fast path: probe growing segment directly (no entity_segment needed).
887        if let Some(growing_arc) = self.growing.read().as_ref() {
888            let growing = growing_arc.read();
889            if growing.contains(id) {
890                return growing.get_metadata(id);
891            }
892        }
893
894        // Fallback: entity_segment map (for pre-existing or sealed entities)
895        let segment_id = self.entity_segment.read().get(&id).copied();
896
897        if let Some(seg_id) = segment_id {
898            if let Some(growing_arc) = self.growing.read().as_ref() {
899                let growing = growing_arc.read();
900                if growing.id() == seg_id {
901                    return growing.get_metadata(id);
902                }
903            }
904
905            let sealed = self.sealed.read();
906            for segment in sealed.iter() {
907                if segment.id() == seg_id {
908                    return segment.get_metadata(id);
909                }
910            }
911        }
912
913        if let Some(segment_arc) = self.find_sealed_segment_arc(id) {
914            return segment_arc.read().get_metadata(id);
915        }
916
917        None
918    }
919
920    /// Set metadata for an entity
921    pub fn set_metadata(&self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
922        // Fast path: probe growing segment directly — covers entities inserted via
923        // insert() which no longer writes to entity_segment.
924        if let Some(growing_arc) = self.growing.read().as_ref() {
925            let mut growing = growing_arc.write();
926            if growing.contains(id) && growing.state().is_writable() {
927                return growing.set_metadata(id, metadata);
928            }
929        }
930
931        // Fallback: entity_segment map (sealed or pre-atomic-path entities)
932        let segment_id = self.entity_segment.read().get(&id).copied();
933
934        if let Some(seg_id) = segment_id {
935            if let Some(growing_arc) = self.growing.read().as_ref() {
936                let mut growing = growing_arc.write();
937                if growing.id() == seg_id && growing.state().is_writable() {
938                    return growing.set_metadata(id, metadata);
939                }
940            }
941        }
942
943        if let Some(entity) = self.get(id) {
944            return self.rewrite_sealed_entity_into_growing(entity, Some(&metadata));
945        }
946
947        Err(SegmentError::NotFound(id))
948    }
949
950    /// Check if growing segment should be sealed
951    fn maybe_seal_growing(&self) -> Result<(), SegmentError> {
952        let should_seal = {
953            let growing_opt = self.growing.read();
954            if let Some(growing_arc) = growing_opt.as_ref() {
955                let growing = growing_arc.read();
956                growing.should_seal(&self.config.segment_config)
957                    || growing.idle_secs() >= self.config.idle_seal_secs
958            } else {
959                false
960            }
961        };
962
963        if should_seal {
964            self.seal_current()?;
965        }
966
967        Ok(())
968    }
969
970    /// Seal the current growing segment
971    pub fn seal_current(&self) -> Result<SegmentId, SegmentError> {
972        let growing_opt = self.growing.write().take();
973
974        if let Some(growing_arc) = growing_opt {
975            let mut growing = growing_arc.write();
976            let seg_id = growing.id();
977            let entity_count = growing.stats().entity_count as u64;
978
979            // Seal it
980            growing.seal()?;
981
982            // Move to sealed list (we need to extract it from the Arc)
983            drop(growing); // Release write lock
984
985            // In a real implementation, we'd convert to SealedSegment here
986            // For now, we keep it as-is since GrowingSegment implements UnifiedSegment
987            self.sealed.write().push(growing_arc);
988
989            // Mark sealed segment pages all-visible — they're now immutable
990            self.mark_sealed_pages_visible(entity_count);
991
992            // Update stats
993            {
994                let mut stats = self.stats.write();
995                stats.growing_count = stats.growing_count.saturating_sub(1);
996                stats.sealed_count += 1;
997                stats.seal_ops += 1;
998            }
999
1000            self.emit(LifecycleEvent::SegmentSealed(seg_id));
1001
1002            return Ok(seg_id);
1003        }
1004
1005        Err(SegmentError::InvalidState(SegmentState::Sealed))
1006    }
1007
1008    /// Force seal (for testing/manual control)
1009    pub fn force_seal(&self) -> Result<Option<SegmentId>, SegmentError> {
1010        let has_growing = self.growing.read().is_some();
1011        if has_growing {
1012            Ok(Some(self.seal_current()?))
1013        } else {
1014            Ok(None)
1015        }
1016    }
1017
1018    /// Fraction of "pages" in sealed segments that are marked all-visible.
1019    ///
1020    /// Sealed segments are immutable so all their rows are safe for
1021    /// index-only scans. The growing segment is never counted (writes
1022    /// may be in-flight). Uses `rows_per_page = 256` (matching 8 KB pages
1023    /// with ~32-byte rows).
1024    ///
1025    /// Returns a value in `[0.0, 1.0]`. 1.0 when all sealed rows are
1026    /// visible; 0.0 when there are no sealed segments.
1027    pub fn all_visible_fraction(&self) -> f64 {
1028        const ROWS_PER_PAGE: u32 = 256;
1029        let sealed = self.sealed.read();
1030        if sealed.is_empty() {
1031            return 0.0;
1032        }
1033        let mut total_pages: u64 = 0;
1034        for seg_arc in sealed.iter() {
1035            let seg = seg_arc.read();
1036            let entity_count = seg.stats().entity_count as u64;
1037            let pages = entity_count.div_ceil(ROWS_PER_PAGE as u64);
1038            total_pages += pages;
1039        }
1040        if total_pages == 0 {
1041            return 0.0;
1042        }
1043        let visible = self.visibility_map.all_visible_count();
1044        (visible as f64 / total_pages as f64).min(1.0)
1045    }
1046
1047    /// Mark all pages of newly sealed segments as all-visible in the
1048    /// visibility map. Called internally after `seal_current`.
1049    fn mark_sealed_pages_visible(&self, seg_entity_count: u64) {
1050        const ROWS_PER_PAGE: u32 = 256;
1051        let existing_visible = self.visibility_map.all_visible_count();
1052        // Append pages starting after the last known visible page
1053        let start_page = existing_visible as u32;
1054        let new_pages = seg_entity_count.div_ceil(ROWS_PER_PAGE as u64);
1055        let end_page = start_page + new_pages as u32;
1056        self.visibility_map.mark_range_visible(start_page, end_page);
1057    }
1058
1059    /// Iterate over all entities in-place without collecting into a Vec.
1060    ///
1061    /// The callback receives a reference to each entity. Return `true` to
1062    /// continue iteration, `false` to stop early (e.g. when a LIMIT is reached).
1063    /// This avoids the allocation and cloning overhead of `query_all`.
1064    pub fn for_each_entity<F>(&self, mut callback: F)
1065    where
1066        F: FnMut(&UnifiedEntity) -> bool,
1067    {
1068        // Growing segment — direct iteration (no Box<dyn>)
1069        // Try non-blocking read first; fall back to blocking only when a writer
1070        // is actively holding the write lock (rare in read-heavy workloads).
1071        if let Some(growing_arc) = self.growing.read().as_ref() {
1072            let growing = if let Some(g) = growing_arc.try_read() {
1073                g
1074            } else {
1075                growing_arc.read()
1076            };
1077            if !growing.for_each_fast(&mut callback) {
1078                return;
1079            }
1080        }
1081
1082        // Sealed segments
1083        let sealed = self.sealed.read();
1084        for segment_arc in sealed.iter() {
1085            let segment = segment_arc.read();
1086            if !segment.for_each_fast(&mut callback) {
1087                return;
1088            }
1089        }
1090    }
1091
1092    /// Parallel fold across all entities. Each sealed segment is
1093    /// processed on its own rayon task; the growing segment stays on
1094    /// the caller thread (its read lock is briefly held).
1095    ///
1096    /// - `init` builds a fresh accumulator per thread.
1097    /// - `fold` mutates an accumulator with one entity at a time.
1098    /// - `reduce` combines two accumulators into one.
1099    ///
1100    /// The returned value is the reduction of every per-thread
1101    /// accumulator. Use this for aggregate-shape workloads (GROUP BY)
1102    /// where per-thread partial state can be merged cheaply.
1103    ///
1104    /// NOTE: when there are 0 or 1 sealed segments, the parallel path
1105    /// is skipped and the work runs sequentially to avoid rayon
1106    /// overhead on tiny tables.
1107    pub fn fold_entities_parallel<T, FInit, FFold, FReduce>(
1108        &self,
1109        init: FInit,
1110        fold: FFold,
1111        reduce: FReduce,
1112    ) -> T
1113    where
1114        T: Send,
1115        FInit: Fn() -> T + Send + Sync,
1116        FFold: Fn(T, &UnifiedEntity) -> T + Send + Sync,
1117        FReduce: Fn(T, T) -> T + Send + Sync,
1118    {
1119        use rayon::prelude::*;
1120
1121        // Growing segment — always sequential (single writer lock,
1122        // usually small working set).
1123        let mut acc = init();
1124        if let Some(growing_arc) = self.growing.read().as_ref() {
1125            let growing = if let Some(g) = growing_arc.try_read() {
1126                g
1127            } else {
1128                growing_arc.read()
1129            };
1130            growing.for_each_fast(|entity| {
1131                acc = fold(std::mem::replace(&mut acc, init()), entity);
1132                true
1133            });
1134        }
1135
1136        // Sealed segments — snapshot the Arc list under the read lock,
1137        // then drop the lock so rayon workers can fan out without
1138        // blocking writers.
1139        let segments: Vec<_> = {
1140            let sealed = self.sealed.read();
1141            sealed.iter().cloned().collect()
1142        };
1143
1144        if segments.len() <= 1 {
1145            for seg_arc in &segments {
1146                let seg = seg_arc.read();
1147                seg.for_each_fast(|entity| {
1148                    acc = fold(std::mem::replace(&mut acc, init()), entity);
1149                    true
1150                });
1151            }
1152            return acc;
1153        }
1154
1155        let sealed_acc = segments
1156            .into_par_iter()
1157            .map(|seg_arc| {
1158                let mut local = init();
1159                let seg = seg_arc.read();
1160                seg.for_each_fast(|entity| {
1161                    local = fold(std::mem::replace(&mut local, init()), entity);
1162                    true
1163                });
1164                local
1165            })
1166            .reduce(&init, &reduce);
1167
1168        reduce(acc, sealed_acc)
1169    }
1170
1171    /// Zone-map-aware iteration across all segments.
1172    ///
1173    /// Like `for_each_entity`, but checks `zone_preds` against each segment's
1174    /// column zone maps before iterating. Segments where any predicate can
1175    /// definitively prove no rows match are skipped entirely.
1176    ///
1177    /// `zone_preds`: slice of `(column_name, ZoneColPred)` extracted from the WHERE clause.
1178    /// Empty slice → same behaviour as `for_each_entity` (no pruning).
1179    pub fn for_each_entity_zoned<F>(&self, zone_preds: &[(&str, ZoneColPred<'_>)], mut callback: F)
1180    where
1181        F: FnMut(&UnifiedEntity) -> bool,
1182    {
1183        // Growing segment — never skip (it's receiving writes, zones are partial).
1184        // Try a non-blocking read first: if a writer is currently inserting
1185        // (holding the write lock), try_read() returns None and we fall back to
1186        // the blocking read.  In low-contention workloads (reads far outnumber
1187        // writes) the try_read() almost always succeeds and readers never stall.
1188        if let Some(growing_arc) = self.growing.read().as_ref() {
1189            let growing = if let Some(g) = growing_arc.try_read() {
1190                g
1191            } else {
1192                growing_arc.read()
1193            };
1194            if !growing.for_each_fast(&mut callback) {
1195                return;
1196            }
1197        }
1198
1199        // Sealed segments — check zone maps before iterating
1200        let sealed = self.sealed.read();
1201        for segment_arc in sealed.iter() {
1202            let segment = segment_arc.read();
1203            if !zone_preds.is_empty() && segment.can_skip_zone_preds(zone_preds) {
1204                continue; // entire segment pruned
1205            }
1206            if !segment.for_each_fast(&mut callback) {
1207                return;
1208            }
1209        }
1210    }
1211
    /// Zone-map-aware parallel query.
    ///
    /// Like `query_all` but applies `zone_preds` on the main thread to
    /// prune sealed segments before spawning workers — segments that
    /// provably contain no matching rows are skipped entirely.
    ///
    /// Zone check runs single-threaded (it reads per-segment metadata,
    /// not row data), so it's cheap. Surviving segments are then scanned
    /// in parallel using `std::thread::scope` when there are > 1 of them.
    pub fn query_all_zoned<F>(
        &self,
        zone_preds: &[(&str, ZoneColPred<'_>)],
        filter: F,
    ) -> Vec<UnifiedEntity>
    where
        F: Fn(&UnifiedEntity) -> bool + Sync,
    {
        let mut results = Vec::new();

        // Growing segment — always scan, no zone skip (zones are partial).
        // Non-blocking try_read() avoids stalling behind in-progress inserts.
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let growing = if let Some(g) = growing_arc.try_read() {
                g
            } else {
                growing_arc.read()
            };
            results.extend(growing.iter().filter(|e| filter(e)).cloned());
        }

        // Sealed segments: zone-prune on main thread, then scan in parallel.
        // NOTE: `sealed` (the list read lock) stays held for the rest of the
        // function because `surviving` borrows the Arcs out of it; new seals
        // are blocked until the scan finishes.
        let sealed = self.sealed.read();
        // Collect only the segments that survive zone-predicate pruning.
        let surviving: Vec<_> = sealed
            .iter()
            .filter(|seg_arc| {
                if zone_preds.is_empty() {
                    return true;
                }
                let seg = seg_arc.read();
                !seg.can_skip_zone_preds(zone_preds)
            })
            .collect();

        // Parallelize only when it can pay off (several segments, multi-core).
        let use_parallel = surviving.len() > 1 && crate::runtime::SystemInfo::should_parallelize();

        if use_parallel {
            let filter_ref = &filter;
            // Scoped threads: one worker per surviving segment; the scope
            // guarantees every worker joins before `sealed` is released.
            let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
                surviving
                    .iter()
                    .map(|segment| {
                        s.spawn(move || {
                            segment
                                .read()
                                .iter()
                                .filter(|e| filter_ref(e))
                                .cloned()
                                .collect::<Vec<_>>()
                        })
                    })
                    .collect::<Vec<_>>() // spawn ALL workers before joining any
                    .into_iter()
                    .map(|handle| handle.join().unwrap_or_default())
                    .collect()
            });
            for batch in segment_results {
                results.extend(batch);
            }
        } else {
            // Few segments (or single-core host): scan inline.
            for segment_arc in surviving {
                let seg = segment_arc.read();
                results.extend(seg.iter().filter(|e| filter(e)).cloned());
            }
        }

        results
    }
1290
1291    /// Query across all segments. Uses parallel scanning for sealed segments
1292    /// when more than one sealed segment exists.
1293    pub fn query_all<F>(&self, filter: F) -> Vec<UnifiedEntity>
1294    where
1295        F: Fn(&UnifiedEntity) -> bool + Sync,
1296    {
1297        let mut results = Vec::new();
1298
1299        // Query growing segment — try non-blocking read first (avoids stalling
1300        // behind an in-progress insert; falls back to blocking if writer is active).
1301        if let Some(growing_arc) = self.growing.read().as_ref() {
1302            let growing = if let Some(g) = growing_arc.try_read() {
1303                g
1304            } else {
1305                growing_arc.read()
1306            };
1307            results.extend(growing.iter().filter(|e| filter(e)).cloned());
1308        }
1309
1310        // Query sealed segments — parallel when multiple exist AND multi-core
1311        let sealed = self.sealed.read();
1312        let use_parallel = sealed.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1313        if use_parallel {
1314            let filter_ref = &filter;
1315            let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1316                sealed
1317                    .iter()
1318                    .map(|segment| {
1319                        s.spawn(move || {
1320                            segment
1321                                .read()
1322                                .iter()
1323                                .filter(|e| filter_ref(e))
1324                                .cloned()
1325                                .collect::<Vec<_>>()
1326                        })
1327                    })
1328                    .collect::<Vec<_>>()
1329                    .into_iter()
1330                    .map(|handle| handle.join().unwrap_or_default())
1331                    .collect()
1332            });
1333            for batch in segment_results {
1334                results.extend(batch);
1335            }
1336        } else {
1337            for segment in sealed.iter() {
1338                let seg = segment.read();
1339                results.extend(seg.iter().filter(|e| filter(e)).cloned());
1340            }
1341        }
1342
1343        results
1344    }
1345
1346    /// Query with bloom filter hint: skip the growing segment when bloom says key is absent.
1347    ///
1348    /// This is the integration point for bloom filter pruning.
1349    /// When a query has an equality predicate on a known key, the executor
1350    /// can call this instead of `query_all` to avoid scanning when the
1351    /// bloom filter proves the key doesn't exist.
1352    ///
1353    /// Returns (results, bloom_pruned) where bloom_pruned indicates if the
1354    /// segment was skipped.
1355    pub fn query_with_bloom_hint<F>(
1356        &self,
1357        key_hint: Option<&[u8]>,
1358        filter: F,
1359    ) -> (Vec<UnifiedEntity>, bool)
1360    where
1361        F: Fn(&UnifiedEntity) -> bool,
1362    {
1363        let mut results = Vec::new();
1364        let mut bloom_pruned = false;
1365
1366        if let Some(growing_arc) = self.growing.read().as_ref() {
1367            let growing = growing_arc.read();
1368            if let Some(key) = key_hint {
1369                if !growing.bloom_might_contain_key(key) {
1370                    bloom_pruned = true;
1371                    return (results, bloom_pruned);
1372                }
1373            }
1374            for entity in growing.iter() {
1375                if filter(entity) {
1376                    results.push(entity.clone());
1377                }
1378            }
1379        }
1380
1381        // Sealed segments (currently empty iter, but future-proofed)
1382        let sealed = self.sealed.read();
1383        for segment_arc in sealed.iter() {
1384            let segment = segment_arc.read();
1385            if let Some(key) = key_hint {
1386                if !segment.bloom_might_contain_key(key) {
1387                    bloom_pruned = true;
1388                    continue;
1389                }
1390            }
1391            for entity in segment.iter() {
1392                if filter(entity) {
1393                    results.push(entity.clone());
1394                }
1395            }
1396        }
1397
1398        (results, bloom_pruned)
1399    }
1400
1401    /// Filter by metadata across all segments
1402    pub fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1403        let mut results = Vec::new();
1404
1405        // Growing segment
1406        if let Some(growing_arc) = self.growing.read().as_ref() {
1407            let growing = growing_arc.read();
1408            results.extend(growing.filter_metadata(filters));
1409        }
1410
1411        // Sealed segments
1412        let sealed = self.sealed.read();
1413        for segment in sealed.iter() {
1414            results.extend(segment.filter_metadata(filters));
1415        }
1416
1417        results
1418    }
1419
1420    /// Get entities by kind
1421    pub fn get_by_kind(&self, kind: &str) -> Vec<UnifiedEntity> {
1422        let mut results = Vec::new();
1423
1424        // Growing segment
1425        if let Some(growing_arc) = self.growing.read().as_ref() {
1426            let growing = growing_arc.read();
1427            for entity in growing.iter_kind(kind) {
1428                results.push(entity.clone());
1429            }
1430        }
1431
1432        // Sealed segments
1433        let sealed = self.sealed.read();
1434        for segment in sealed.iter() {
1435            for entity in segment.iter_kind(kind) {
1436                results.push(entity.clone());
1437            }
1438        }
1439
1440        results
1441    }
1442
    /// Count entities across all segments.
    ///
    /// Reads the relaxed atomic counter maintained by the insert/delete
    /// paths rather than summing per-segment counts, so it is O(1) but only
    /// eventually consistent with concurrent writers.
    pub fn count(&self) -> usize {
        self.total_entities_atomic.load(Ordering::Relaxed) as usize
    }
1447
1448    /// Get all segment IDs
1449    pub fn segment_ids(&self) -> Vec<SegmentId> {
1450        let mut ids = Vec::new();
1451
1452        if let Some(growing_arc) = self.growing.read().as_ref() {
1453            ids.push(growing_arc.read().id());
1454        }
1455
1456        let sealed = self.sealed.read();
1457        for segment in sealed.iter() {
1458            ids.push(segment.id());
1459        }
1460
1461        ids.extend(self.archived.read().iter().copied());
1462
1463        ids
1464    }
1465
    /// Emit a lifecycle event.
    ///
    /// Perf: this used to push onto a `RwLock<Vec<LifecycleEvent>>`
    /// on every insert / delete / seal. Nobody consumes that vec
    /// today (no subscription API, `drain_events` has no callers),
    /// so the write lock + push was pure tax — and the vec grew
    /// unbounded in long-running processes.
    ///
    /// Current behaviour: no-op. If we ever want the hooks back,
    /// replace this with a bounded channel or an actual subscriber
    /// registry; the callers (`insert`, `delete`, `maybe_seal_growing`)
    /// already pass well-typed events.
    #[inline]
    #[allow(clippy::unused_self)] // kept as a method so call sites stay stable
    fn emit(&self, _event: LifecycleEvent) {}
1481
    /// Drain events. Kept for API compatibility; always returns
    /// empty because `emit` no longer buffers into `self.events`.
    ///
    /// `mem::take` swaps the buffer for an empty Vec under the write lock,
    /// so any legacy contents are handed to the caller exactly once.
    pub fn drain_events(&self) -> Vec<LifecycleEvent> {
        std::mem::take(&mut *self.events.write())
    }
1487
1488    /// Run maintenance (would be called periodically in production)
1489    pub fn run_maintenance(&self) -> Result<(), SegmentError> {
1490        // Auto-seal idle segments
1491        self.maybe_seal_growing()?;
1492
1493        // Compact if too many sealed segments
1494        if self.config.enable_compaction {
1495            let sealed_count = self.sealed.read().len();
1496            if sealed_count > self.config.max_sealed_segments {
1497                // In production, we'd trigger background compaction here
1498                // For now, just log that compaction is needed
1499            }
1500        }
1501
1502        Ok(())
1503    }
1504}
1505
1506// Implement the Arc<RwLock<GrowingSegment>> as UnifiedSegment
1507// This is needed because we store growing segments in the sealed list after sealing
1508impl UnifiedSegment for Arc<RwLock<GrowingSegment>> {
1509    fn id(&self) -> SegmentId {
1510        self.read().id()
1511    }
1512
1513    fn state(&self) -> SegmentState {
1514        self.read().state()
1515    }
1516
1517    fn collection(&self) -> &str {
1518        // This is a limitation - we'd need to store collection in the Arc wrapper
1519        "unknown"
1520    }
1521
1522    fn stats(&self) -> SegmentStats {
1523        self.read().stats()
1524    }
1525
1526    fn entity_count(&self) -> usize {
1527        self.read().entity_count()
1528    }
1529
1530    fn contains(&self, id: EntityId) -> bool {
1531        self.read().contains(id)
1532    }
1533
1534    fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1535        // This is tricky with RwLock - we can't return a reference
1536        // In production, we'd use a different approach
1537        None
1538    }
1539
1540    fn get_mut(&mut self, _id: EntityId) -> Option<&mut UnifiedEntity> {
1541        None
1542    }
1543
1544    fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1545        self.write().insert(entity)
1546    }
1547
1548    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1549        self.write().update(entity)
1550    }
1551
1552    fn update_hot(
1553        &mut self,
1554        entity: UnifiedEntity,
1555        modified_columns: &[String],
1556    ) -> Result<(), SegmentError> {
1557        self.write().update_hot(entity, modified_columns)
1558    }
1559
1560    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1561        self.write().delete(id)
1562    }
1563
1564    fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1565        self.read().get_metadata(id)
1566    }
1567
1568    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1569        self.write().set_metadata(id, metadata)
1570    }
1571
1572    fn seal(&mut self) -> Result<(), SegmentError> {
1573        self.write().seal()
1574    }
1575
1576    fn should_seal(&self, config: &SegmentConfig) -> bool {
1577        self.read().should_seal(config)
1578    }
1579
1580    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1581        // Cannot return iterator with RwLock
1582        Box::new(std::iter::empty())
1583    }
1584
1585    fn iter_kind(&self, _kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1586        Box::new(std::iter::empty())
1587    }
1588
1589    fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1590        self.read().filter_metadata(filters)
1591    }
1592}
1593
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::schema::Value;

    // Insert a single table row and confirm it is retrievable and counted.
    #[test]
    fn test_manager_basic() {
        let manager = SegmentManager::new("test_collection");

        let row = UnifiedEntity::table_row(
            manager.next_entity_id(),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );
        let id = manager.insert(row).unwrap();

        assert!(manager.get(id).is_some());
        assert_eq!(manager.count(), 1);
    }

    // With a 2-entity cap per segment, a third insert should roll the
    // growing segment over without losing any entities.
    #[test]
    fn test_manager_auto_seal() {
        let mut config = ManagerConfig::default();
        config.segment_config.max_entities = 2;

        let manager = SegmentManager::with_config("test", config);

        // Third insert should trigger auto-seal of the full first segment.
        for component in [0.1, 0.2, 0.3] {
            let v = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![component]);
            manager.insert(v).unwrap();
        }

        // All three entities stay visible across segments.
        assert_eq!(manager.stats().total_entities, 3);
    }

    // Deleting an entity makes subsequent lookups miss.
    #[test]
    fn test_manager_delete() {
        let manager = SegmentManager::new("test");

        let v = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![0.1]);
        let id = manager.insert(v).unwrap();
        assert!(manager.get(id).is_some());

        assert!(manager.delete(id).unwrap());
        assert!(manager.get(id).is_none());
    }

    // Metadata set on an entity can be read back by id.
    #[test]
    fn test_manager_metadata() {
        let manager = SegmentManager::new("test");

        let row = UnifiedEntity::table_row(
            manager.next_entity_id(),
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        let id = manager.insert(row).unwrap();

        let mut meta = Metadata::new();
        let os = super::super::metadata::MetadataValue::String("linux".to_string());
        meta.set("os", os);
        manager.set_metadata(id, meta).unwrap();

        let retrieved = manager.get_metadata(id).unwrap();
        assert!(retrieved.has("os"));
    }

    // Kind queries partition interleaved rows and vectors correctly.
    #[test]
    fn test_manager_query_by_kind() {
        let manager = SegmentManager::new("test");

        let first_row = UnifiedEntity::table_row(manager.next_entity_id(), "hosts", 1, vec![]);
        manager.insert(first_row).unwrap();

        let embedding = UnifiedEntity::vector(manager.next_entity_id(), "embeddings", vec![0.1]);
        manager.insert(embedding).unwrap();

        let second_row = UnifiedEntity::table_row(manager.next_entity_id(), "hosts", 2, vec![]);
        manager.insert(second_row).unwrap();

        assert_eq!(manager.get_by_kind("table").len(), 2);
        assert_eq!(manager.get_by_kind("vector").len(), 1);
    }

    #[test]
    #[ignore = "lifecycle events intentionally no-op since the emit-channel refactor; drain_events returns empty — see SegmentManager::emit"]
    fn test_lifecycle_events() {
        let manager = SegmentManager::new("test");

        let v = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![0.1]);
        manager.insert(v).unwrap();

        // Expected (pre-refactor): SegmentCreated then EntityInserted.
        let events = manager.drain_events();
        let saw_created = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::SegmentCreated(_)));
        let saw_inserted = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::EntityInserted(_, _)));
        assert!(saw_created);
        assert!(saw_inserted);
    }
}