Skip to main content

reddb_server/storage/unified/
manager.rs

//! Segment Manager
//!
//! Manages the lifecycle of unified segments: creation, sealing, compaction,
//! and archival. Coordinates writes to growing segments and queries across
//! all segments.
//!
//! # Responsibilities
//!
//! - Route writes to the active growing segment
//! - Auto-seal segments when thresholds are met
//! - Coordinate queries across multiple segments
//! - Background compaction of sealed segments
//! - Archive old segments to cold storage

15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19
20use super::entity::{EntityId, UnifiedEntity};
21use super::metadata::{Metadata, MetadataFilter};
22use super::segment::{
23    GrowingSegment, SegmentConfig, SegmentError, SegmentId, SegmentState, SegmentStats,
24    UnifiedSegment, ZoneColPred, ZoneColPredKind,
25};
26use crate::storage::btree::visibility_map::VisibilityMap;
27
28/// Configuration for the segment manager
29#[derive(Debug, Clone)]
30pub struct ManagerConfig {
31    /// Segment configuration
32    pub segment_config: SegmentConfig,
33    /// Maximum number of sealed segments before compaction
34    pub max_sealed_segments: usize,
35    /// Idle time (seconds) before auto-sealing
36    pub idle_seal_secs: u64,
37    /// Enable background compaction
38    pub enable_compaction: bool,
39    /// Enable background archival
40    pub enable_archival: bool,
41    /// Age threshold for archival (seconds)
42    pub archive_age_secs: u64,
43}
44
45impl Default for ManagerConfig {
46    fn default() -> Self {
47        Self {
48            segment_config: SegmentConfig::default(),
49            max_sealed_segments: 10,
50            idle_seal_secs: 300, // 5 minutes
51            enable_compaction: true,
52            enable_archival: true,
53            archive_age_secs: 86400 * 7, // 7 days
54        }
55    }
56}
57
/// Aggregate statistics for a [`SegmentManager`], snapshotted by `stats()`.
#[derive(Debug, Clone, Default)]
pub struct ManagerStats {
    /// Entities alive across every segment (served from the lock-free atomic).
    pub total_entities: usize,
    /// Segments currently in the growing state.
    pub growing_count: usize,
    /// Segments currently sealed (immutable, queryable).
    pub sealed_count: usize,
    /// Segments moved to cold storage.
    pub archived_count: usize,
    /// Total in-memory footprint across segments, in bytes.
    pub total_memory_bytes: usize,
    /// How many seal operations have run.
    pub seal_ops: u64,
    /// How many compaction operations have run.
    pub compact_ops: u64,
}
76
77/// Lifecycle events for monitoring
78#[derive(Debug, Clone)]
79pub enum LifecycleEvent {
80    SegmentCreated(SegmentId),
81    SegmentSealed(SegmentId),
82    SegmentCompacted {
83        source: Vec<SegmentId>,
84        target: SegmentId,
85    },
86    SegmentArchived(SegmentId),
87    EntityInserted(EntityId, SegmentId),
88    EntityDeleted(EntityId, SegmentId),
89}
90
91/// Segment manager for a collection
92pub struct SegmentManager {
93    /// Collection name
94    collection: String,
95    /// Configuration
96    config: ManagerConfig,
97    /// Next segment ID counter
98    next_segment_id: AtomicU64,
99    /// Next entity ID counter
100    next_entity_id: AtomicU64,
101    /// Per-table auto-increment row ID (1, 2, 3... per collection)
102    next_row_id: AtomicU64,
103    /// Hot-path entity counter — lock-free, updated by every insert/delete.
104    /// Replaces stats.total_entities on the write path to eliminate a lock
105    /// acquisition per row (from 4 lock ops per insert → 2).
106    total_entities_atomic: AtomicU64,
107    /// Currently active growing segment
108    growing: RwLock<Option<Arc<RwLock<GrowingSegment>>>>,
109    /// Sealed segments (immutable, queryable)
110    sealed: RwLock<Vec<Arc<RwLock<GrowingSegment>>>>,
111    /// Archived segment IDs (stored externally)
112    archived: RwLock<Vec<SegmentId>>,
113    /// Entity to segment mapping (for fast lookups by individually-inserted entities).
114    /// Bulk-inserted entities skip this map; their segment is found by sequential scan
115    /// of growing + sealed segments in get()/update()/delete().
116    entity_segment: RwLock<HashMap<EntityId, SegmentId>>,
117    /// Shared column schema: column_name → index in Vec<Value>.
118    /// Populated on first bulk_insert. Enables columnar storage (Vec instead of HashMap per row).
119    column_schema: RwLock<Option<Arc<Vec<String>>>>,
120    /// Statistics (slow path — not updated on every insert).
121    stats: RwLock<ManagerStats>,
122    /// Event listeners (simplified - would be channels in production)
123    events: RwLock<Vec<LifecycleEvent>>,
124    /// Visibility map: sealed segment entity ranges marked as all-visible.
125    /// Growing segment is never all-visible (writes are in-flight).
126    /// Used by index-only scan decisions.
127    visibility_map: VisibilityMap,
128}
129
130impl SegmentManager {
131    /// Create a new segment manager
132    pub fn new(collection: impl Into<String>) -> Self {
133        Self::with_config(collection, ManagerConfig::default())
134    }
135
136    /// Create with custom configuration
137    pub fn with_config(collection: impl Into<String>, config: ManagerConfig) -> Self {
138        Self {
139            collection: collection.into(),
140            config,
141            next_segment_id: AtomicU64::new(1),
142            next_entity_id: AtomicU64::new(1),
143            next_row_id: AtomicU64::new(1),
144            total_entities_atomic: AtomicU64::new(0),
145            growing: RwLock::new(None),
146            sealed: RwLock::new(Vec::new()),
147            archived: RwLock::new(Vec::new()),
148            entity_segment: RwLock::new(HashMap::new()),
149            column_schema: RwLock::new(None),
150            stats: RwLock::new(ManagerStats::default()),
151            events: RwLock::new(Vec::new()),
152            visibility_map: VisibilityMap::new(),
153        }
154    }
155
156    /// Get or create the shared column schema from first row's named fields.
157    pub fn get_or_init_schema(
158        &self,
159        named: &HashMap<String, crate::storage::schema::Value>,
160    ) -> Arc<Vec<String>> {
161        {
162            let schema = self.column_schema.read();
163            if let Some(ref s) = *schema {
164                return Arc::clone(s);
165            }
166        }
167        let cols: Vec<String> = named.keys().cloned().collect();
168        let arc = Arc::new(cols);
169        *self.column_schema.write() = Some(Arc::clone(&arc));
170        arc
171    }
172
173    /// Get the column schema if it exists.
174    pub fn column_schema(&self) -> Option<Arc<Vec<String>>> {
175        self.column_schema.read().clone()
176    }
177
178    pub(crate) fn set_column_schema_if_empty(&self, columns: Vec<String>) {
179        if columns.is_empty() {
180            return;
181        }
182        let mut schema = self.column_schema.write();
183        if schema.is_none() {
184            *schema = Some(Arc::new(columns));
185        }
186    }
187
188    /// Get collection name
189    pub fn collection(&self) -> &str {
190        &self.collection
191    }
192
193    /// Get configuration
194    pub fn config(&self) -> &ManagerConfig {
195        &self.config
196    }
197
198    /// Get statistics. total_entities is read from the lock-free atomic;
199    /// other fields come from the slow-path stats struct.
200    pub fn stats(&self) -> ManagerStats {
201        let mut s = self.stats.read().clone();
202        s.total_entities = self.total_entities_atomic.load(Ordering::Relaxed) as usize;
203        s
204    }
205
206    /// Generate a new entity ID
207    pub fn next_entity_id(&self) -> EntityId {
208        EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
209    }
210
211    /// Generate a per-table sequential row ID (1, 2, 3... per collection)
212    pub fn next_row_id(&self) -> u64 {
213        self.next_row_id.fetch_add(1, Ordering::SeqCst)
214    }
215
216    /// Reserve `n` contiguous per-table row IDs with one atomic
217    /// fetch_add. Caller assigns `row_id = start + i` per entity.
218    /// Saves N-1 atomic RMWs on bulk inserts (25k atomics → 1).
219    pub fn reserve_row_ids(&self, n: u64) -> std::ops::Range<u64> {
220        let start = self.next_row_id.fetch_add(n, Ordering::SeqCst);
221        start..start + n
222    }
223
224    /// Advance the per-table row_id counter to at least `id + 1`.
225    /// Called during load to restore the counter from existing data.
226    pub fn register_row_id(&self, id: u64) {
227        let candidate = id.saturating_add(1);
228        let mut current = self.next_row_id.load(Ordering::SeqCst);
229        while candidate > current {
230            match self.next_row_id.compare_exchange(
231                current,
232                candidate,
233                Ordering::SeqCst,
234                Ordering::SeqCst,
235            ) {
236                Ok(_) => break,
237                Err(updated) => current = updated,
238            }
239        }
240    }
241
    /// Get or create the active growing segment, returning a clone of its `Arc`.
    ///
    /// Fast path: read lock only — no write contention when the segment already exists.
    /// Concurrent writers each clone the `Arc` under a shared read lock, then compete
    /// on the segment's own write lock. This eliminates the global write-lock serialisation
    /// that previously throttled concurrent inserts to ~238 ops/s.
    fn get_or_create_growing(&self) -> Arc<RwLock<GrowingSegment>> {
        // Common case: segment already exists — shared read lock, zero contention.
        {
            let growing = self.growing.read();
            if let Some(segment) = growing.as_ref() {
                return Arc::clone(segment);
            }
        }

        // Slow path: segment missing — take exclusive write lock to create it.
        let mut growing = self.growing.write();
        // Double-check: another thread may have created it between dropping the
        // read lock above and acquiring the write lock here.
        if let Some(segment) = growing.as_ref() {
            return Arc::clone(segment);
        }

        let id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
        let segment = GrowingSegment::new(id, &self.collection);
        let segment_arc = Arc::new(RwLock::new(segment));
        *growing = Some(Arc::clone(&segment_arc));

        self.emit(LifecycleEvent::SegmentCreated(id));

        // Update growing_count in the slow-path stats struct.
        // This is the rare segment-creation path — locking is fine here.
        // Note: the `growing` write guard is still held, so the stats lock is
        // nested inside it on this path only.
        self.stats.write().growing_count += 1;

        segment_arc
    }
277
    /// Insert a new entity into the active growing segment.
    ///
    /// Assigns a fresh id via `next_entity_id()` when `entity.id.raw() == 0`
    /// (0 is the "unassigned" sentinel). Returns the inserted entity's id.
    ///
    /// # Errors
    /// Propagates `SegmentError` from `maybe_seal_growing()` or from the
    /// segment's own `insert()`.
    pub fn insert(&self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
        // Check if we need to seal the current segment first
        self.maybe_seal_growing()?;

        let segment_arc = self.get_or_create_growing();
        let mut segment = segment_arc.write();

        // Assign entity ID if not already set
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        }

        let entity_id = entity.id;
        let segment_id = segment.id();

        segment.insert(entity)?;

        // Lock-free counter update — eliminates the stats write lock on the hot path.
        self.total_entities_atomic.fetch_add(1, Ordering::Relaxed);

        // entity_segment map is intentionally NOT updated here.
        // update() and update_hot() first probe the growing segment directly
        // (growing.contains(entity.id)) before consulting this map, so entities
        // that were just inserted are found without entity_segment. The map is
        // only consulted for entities that may have been moved to sealed segments,
        // which can't be updated anyway (state().is_writable() == false).
        // Skipping this write removes one exclusive HashMap lock per insert.

        self.emit(LifecycleEvent::EntityInserted(entity_id, segment_id));

        Ok(entity_id)
    }
311
312    /// Insert multiple entities (batch) — sequential, one lock per item.
313    pub fn insert_batch(
314        &self,
315        entities: Vec<UnifiedEntity>,
316    ) -> Result<Vec<EntityId>, SegmentError> {
317        let mut ids = Vec::with_capacity(entities.len());
318        for entity in entities {
319            ids.push(self.insert(entity)?);
320        }
321        Ok(ids)
322    }
323
324    /// Turbo bulk insert — single lock acquisition for the entire batch.
325    /// Skips bloom filter, memtable, and cross-ref indexing for maximum speed.
326    pub fn bulk_insert(
327        &self,
328        mut entities: Vec<UnifiedEntity>,
329    ) -> Result<Vec<EntityId>, SegmentError> {
330        // Assign IDs and per-table row_ids.
331        for entity in &mut entities {
332            if entity.id.raw() == 0 {
333                entity.id = self.next_entity_id();
334            }
335            if let super::entity::EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
336                if *row_id == 0 {
337                    *row_id = self.next_row_id();
338                } else {
339                    self.register_row_id(*row_id);
340                }
341            }
342        }
343
344        // Convert named HashMap → positional Vec (compact memory representation)
345        // The schema (column order) is shared across all rows in the collection.
346        if let Some(first_row) = entities.first() {
347            if let super::entity::EntityData::Row(ref row) = first_row.data {
348                if let Some(ref named) = row.named {
349                    let schema = self.get_or_init_schema(named);
350                    for entity in &mut entities {
351                        if let super::entity::EntityData::Row(ref mut row) = entity.data {
352                            if let Some(named) = row.named.take() {
353                                let mut cols = Vec::with_capacity(schema.len());
354                                for col_name in schema.iter() {
355                                    cols.push(
356                                        named
357                                            .get(col_name)
358                                            .cloned()
359                                            .unwrap_or(crate::storage::schema::Value::Null),
360                                    );
361                                }
362                                row.columns = cols;
363                                row.schema = Some(Arc::clone(&schema));
364                            }
365                        }
366                    }
367                }
368            }
369        }
370
371        let segment_arc = self.get_or_create_growing();
372        let mut segment = segment_arc.write();
373        let segment_id = segment.id();
374
375        // Single call to GrowingSegment.bulk_insert (one lock, no bloom/memtable)
376        let ids = segment.bulk_insert(entities)?;
377
378        // Skip entity-segment mapping for bulk inserts (saves ~56 bytes/entity).
379        // The get() method scans growing+sealed segments directly.
380
381        // Lock-free batch counter update.
382        self.total_entities_atomic
383            .fetch_add(ids.len() as u64, Ordering::Relaxed);
384
385        Ok(ids)
386    }
387
388    /// Get an entity by ID — scans growing then sealed segments.
389    pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
390        // Growing segment first (most likely for recent inserts)
391        if let Some(growing_arc) = self.growing.read().as_ref() {
392            let growing = growing_arc.read();
393            if let Some(entity) = growing.get(id) {
394                return Some(entity.clone());
395            }
396        }
397
398        // Then sealed segments
399        let sealed = self.sealed.read();
400        for segment in sealed.iter() {
401            let seg = segment.read();
402            if let Some(entity) = seg.get(id) {
403                return Some(entity.clone());
404            }
405        }
406
407        None
408    }
409
    /// Batch-fetch multiple entities by ID in a single lock acquisition per segment.
    ///
    /// Returns a vector parallel to `ids`: `out[i]` is `Some(entity)` when
    /// `ids[i]` resolved in some segment, `None` otherwise.
    ///
    /// For indexed-scan result sets (up to ~5000 ids from range/bitmap lookup) this
    /// is 2-3 lock acquisitions total vs N×3 with individual `get()` calls.
    pub fn get_many(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
        let mut out: Vec<Option<UnifiedEntity>> = vec![None; ids.len()];
        let mut remaining: Vec<usize> = (0..ids.len()).collect(); // indices still unfound

        // Growing segment — one read lock for the entire batch.
        // Non-blocking first: if a writer is active, fall back to blocking.
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let growing = if let Some(g) = growing_arc.try_read() {
                g
            } else {
                growing_arc.read()
            };
            // retain() both fills `out` and drops found indices in one pass.
            remaining.retain(|&i| {
                if let Some(entity) = growing.get(ids[i]) {
                    out[i] = Some(entity.clone());
                    false // remove from remaining
                } else {
                    true // keep — not found yet
                }
            });
        }

        if remaining.is_empty() {
            return out;
        }

        // Sealed segments — one read lock per segment
        let sealed = self.sealed.read();
        for segment in sealed.iter() {
            if remaining.is_empty() {
                break; // everything resolved — skip remaining segments
            }
            let seg = segment.read();
            remaining.retain(|&i| {
                if let Some(entity) = seg.get(ids[i]) {
                    out[i] = Some(entity.clone());
                    false
                } else {
                    true
                }
            });
        }

        out
    }
459
    /// Visitor-pattern batch fetch. Invokes `f(index, &UnifiedEntity)`
    /// for each id that resolves, never cloning the entity.
    ///
    /// Used by scan hot paths (select_range, select_filtered) that
    /// materialize each entity into an output record and don't need
    /// an owned `UnifiedEntity`. Eliminates ~20% of scan CPU spent in
    /// `UnifiedEntity::clone` when `get_batch` is followed by
    /// `runtime_table_record_lean(entity)`.
    ///
    /// The closure runs while the segment read lock is held, so it
    /// must be short — avoid doing I/O or taking unrelated locks in
    /// `f`.
    pub fn for_each_id<F>(&self, ids: &[EntityId], mut f: F)
    where
        F: FnMut(usize, &UnifiedEntity),
    {
        // Thread-local scratch buffer for the "pending" index list.
        // Previous code allocated a fresh `Vec<usize>` of capacity
        // N on every call — 4200 × 1000 queries / scenario on the
        // select_range bench path. Take-and-restore pattern (vs
        // RefCell::borrow_mut) so user closures that recurse into
        // another `for_each_id` don't panic on a re-borrow; worst
        // case they allocate a fresh buffer and we lose the caching
        // win for that nested call.
        thread_local! {
            static REMAINING_SCRATCH: std::cell::Cell<Vec<usize>> =
                const { std::cell::Cell::new(Vec::new()) };
        }

        // Take the scratch buffer out of the cell (leaves an empty Vec behind).
        let mut remaining: Vec<usize> = REMAINING_SCRATCH.with(|cell| cell.take());
        remaining.clear();

        if let Some(growing_arc) = self.growing.read().as_ref() {
            // Prefer a non-blocking read; fall back to blocking if a writer holds it.
            let growing = if let Some(g) = growing_arc.try_read() {
                g
            } else {
                growing_arc.read()
            };
            for (i, id) in ids.iter().enumerate() {
                if let Some(entity) = growing.get(*id) {
                    f(i, entity);
                } else {
                    remaining.push(i);
                }
            }
        } else {
            // No growing segment: every index is still pending for the sealed scan.
            remaining.reserve(ids.len());
            remaining.extend(0..ids.len());
        }

        if !remaining.is_empty() {
            let sealed = self.sealed.read();
            for segment in sealed.iter() {
                if remaining.is_empty() {
                    break;
                }
                let seg = segment.read();
                remaining.retain(|&i| {
                    if let Some(entity) = seg.get(ids[i]) {
                        f(i, entity);
                        false
                    } else {
                        true
                    }
                });
            }
        }

        // Put the (possibly grown) buffer back so the next call reuses its
        // capacity. Note: skipped if `f` panics — the scratch is then simply
        // re-allocated on the next call.
        REMAINING_SCRATCH.with(|cell| cell.set(remaining));
    }
530
531    /// Scan all segments for an entity
532    fn scan_for_entity(&self, id: EntityId) -> Option<UnifiedEntity> {
533        // Check growing
534        if let Some(growing_arc) = self.growing.read().as_ref() {
535            let growing = growing_arc.read();
536            if let Some(entity) = growing.get(id) {
537                return Some(entity.clone());
538            }
539        }
540
541        // Check sealed
542        let sealed = self.sealed.read();
543        for segment in sealed.iter() {
544            if let Some(entity) = segment.get(id) {
545                return Some(entity.clone());
546            }
547        }
548
549        None
550    }
551
552    fn find_sealed_segment_arc(&self, id: EntityId) -> Option<Arc<RwLock<GrowingSegment>>> {
553        let sealed = self.sealed.read();
554        sealed
555            .iter()
556            .find(|segment_arc| segment_arc.read().contains(id))
557            .map(Arc::clone)
558    }
559
    /// Move an updated entity out of its sealed (read-only) segment and into
    /// the growing segment, carrying its metadata along.
    ///
    /// If `metadata` is `Some` it replaces the entity's metadata; otherwise
    /// the metadata read from the sealed segment (if any) is preserved.
    ///
    /// Lock ordering: the sealed segment's write lock is released (end of the
    /// inner block) before the growing segment's write lock is taken — the two
    /// are never held simultaneously.
    ///
    /// # Errors
    /// `SegmentError::NotFound` when no sealed segment contains the entity or
    /// its `force_delete` reports it missing; otherwise propagates errors from
    /// the growing segment's `insert`/`set_metadata`.
    fn rewrite_sealed_entity_into_growing(
        &self,
        entity: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        let sealed_arc = self
            .find_sealed_segment_arc(entity_id)
            .ok_or(SegmentError::NotFound(entity_id))?;

        // Remove from the sealed segment, capturing existing metadata first.
        let metadata_to_apply = {
            let mut sealed = sealed_arc.write();
            let existing_metadata = sealed.get_metadata(entity_id);
            if !sealed.force_delete(entity_id) {
                return Err(SegmentError::NotFound(entity_id));
            }
            // Caller-supplied metadata wins; otherwise keep what was stored.
            metadata.cloned().or(existing_metadata)
        };

        // Re-insert into the growing segment (sealed lock already released).
        let growing_arc = self.get_or_create_growing();
        let growing_id = {
            let mut growing = growing_arc.write();
            growing.insert(entity)?;
            if let Some(metadata) = metadata_to_apply {
                growing.set_metadata(entity_id, metadata)?;
            }
            growing.id()
        };

        // Record the entity's new home so future updates find it directly.
        self.entity_segment.write().insert(entity_id, growing_id);
        Ok(())
    }
592
    /// Update an entity in place.
    ///
    /// Resolution order:
    /// 1. Probe the growing segment directly (covers bulk-inserted entities
    ///    that were never written to the entity_segment map).
    /// 2. Consult entity_segment for individually-inserted entities whose
    ///    mapped segment is the current growing one.
    /// 3. Otherwise assume the entity lives in a sealed segment and rewrite
    ///    it into the growing segment.
    ///
    /// # Errors
    /// `SegmentError::NotFound` when no segment contains the entity; otherwise
    /// propagates segment errors.
    pub fn update(&self, entity: UnifiedEntity) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Wrapped in Option so the entity moves into whichever path claims it.
        let mut entity = Some(entity);

        // Try growing segment directly (covers bulk-inserted entities without entity_segment map)
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                return growing.update(entity.take().expect("entity already moved"));
            }
        }

        // Try entity_segment mapping for individually inserted entities
        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    return growing.update(entity.take().expect("entity already moved"));
                }
            }
        }

        // Not in growing: move it out of its sealed segment into growing.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, None);
        }

        Err(SegmentError::NotFound(entity_id))
    }
623
    /// Update an entity and, optionally, replace its metadata while holding
    /// the segment write lock only once.
    ///
    /// Same resolution order as `update()`: growing segment probe, then the
    /// entity_segment map, then the sealed-segment rewrite path (which also
    /// carries `metadata` along).
    ///
    /// # Errors
    /// `SegmentError::NotFound` when no segment contains the entity; otherwise
    /// propagates segment errors from `update`/`set_metadata`.
    pub fn update_with_metadata(
        &self,
        entity: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Wrapped in Option so the entity moves into whichever path claims it.
        let mut entity = Some(entity);

        // Try growing segment directly (covers bulk-inserted entities without entity_segment map)
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                growing.update(entity.take().expect("entity already moved"))?;
                if let Some(metadata) = metadata {
                    // Same write guard still held — no second lock acquisition.
                    growing.set_metadata(entity_id, metadata.clone())?;
                }
                return Ok(());
            }
        }

        // Try entity_segment mapping for individually inserted entities
        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    growing.update(entity.take().expect("entity already moved"))?;
                    if let Some(metadata) = metadata {
                        growing.set_metadata(entity_id, metadata.clone())?;
                    }
                    return Ok(());
                }
            }
        }

        // Not in growing: move it out of its sealed segment, metadata included.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, metadata);
        }

        Err(SegmentError::NotFound(entity_id))
    }
667
    /// HOT-update: like update but skips index work for unchanged columns.
    /// `modified_columns` is the list of column names actually changed by the
    /// UPDATE statement — lets us skip pk_index and cross_ref when safe.
    ///
    /// Same resolution order as `update()`: growing segment probe, then the
    /// entity_segment map, then the sealed-segment rewrite path (which falls
    /// back to a full rewrite, not a HOT path).
    ///
    /// # Errors
    /// `SegmentError::NotFound` when no segment contains the entity; otherwise
    /// propagates segment errors.
    pub fn update_hot(
        &self,
        entity: UnifiedEntity,
        modified_columns: &[String],
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Wrapped in Option so the entity moves into whichever path claims it.
        let mut entity = Some(entity);

        // Probe the growing segment directly (covers bulk-inserted entities).
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                return growing.update_hot(
                    entity.take().expect("entity already moved"),
                    modified_columns,
                );
            }
        }

        // Fall back to the entity_segment map for individually-inserted entities.
        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    return growing.update_hot(
                        entity.take().expect("entity already moved"),
                        modified_columns,
                    );
                }
            }
        }

        // Not in growing: move it out of its sealed segment into growing.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, None);
        }

        Err(SegmentError::NotFound(entity_id))
    }
708
    /// HOT-update an entity and, optionally, replace its metadata while
    /// holding the segment write lock only once.
    ///
    /// Same resolution order as `update_hot()`: growing segment probe, then
    /// the entity_segment map, then the sealed-segment rewrite path (which
    /// carries `metadata` along).
    ///
    /// # Errors
    /// `SegmentError::NotFound` when no segment contains the entity; otherwise
    /// propagates segment errors from `update_hot`/`set_metadata`.
    pub fn update_hot_with_metadata(
        &self,
        entity: UnifiedEntity,
        modified_columns: &[String],
        metadata: Option<&Metadata>,
    ) -> Result<(), SegmentError> {
        let entity_id = entity.id;
        // Wrapped in Option so the entity moves into whichever path claims it.
        let mut entity = Some(entity);

        // Probe the growing segment directly (covers bulk-inserted entities).
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(entity_id) && growing.state().is_writable() {
                growing.update_hot(
                    entity.take().expect("entity already moved"),
                    modified_columns,
                )?;
                if let Some(metadata) = metadata {
                    // Same write guard still held — no second lock acquisition.
                    growing.set_metadata(entity_id, metadata.clone())?;
                }
                return Ok(());
            }
        }

        // Fall back to the entity_segment map for individually-inserted entities.
        let segment_id = self.entity_segment.read().get(&entity_id).copied();
        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    growing.update_hot(
                        entity.take().expect("entity already moved"),
                        modified_columns,
                    )?;
                    if let Some(metadata) = metadata {
                        growing.set_metadata(entity_id, metadata.clone())?;
                    }
                    return Ok(());
                }
            }
        }

        // Not in growing: move it out of its sealed segment, metadata included.
        if let Some(entity) = entity.take() {
            return self.rewrite_sealed_entity_into_growing(entity, metadata);
        }

        Err(SegmentError::NotFound(entity_id))
    }
757
    /// Batch HOT-update multiple entities while holding the growing-segment
    /// write lock only once when possible.
    ///
    /// Fast path: one `update_hot_batch_with_metadata` call on the growing
    /// segment. If that reports `NotFound` (some entity is not in growing),
    /// falls back to item-by-item `update_hot_with_metadata`, which handles
    /// the entity_segment map and sealed-segment rewrite paths. Note the
    /// fallback re-processes the whole batch, so items already applied by the
    /// fast path may be updated again with identical data.
    ///
    /// # Errors
    /// Propagates the first non-`NotFound` error from the batch call, or any
    /// error from the per-item fallback.
    pub fn update_hot_batch_with_metadata<'a, I>(&self, items: I) -> Result<(), SegmentError>
    where
        I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
    {
        // Materialize so the batch can be retried item-by-item if needed.
        let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
            items.into_iter().collect();
        if items.is_empty() {
            return Ok(());
        }

        // Fast path: single lock, whole batch against the growing segment.
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.state().is_writable() {
                match growing.update_hot_batch_with_metadata(items.iter().copied()) {
                    Ok(()) => return Ok(()),
                    // Some entity lives elsewhere — fall through to per-item path.
                    Err(SegmentError::NotFound(_)) => {}
                    Err(other) => return Err(other),
                }
            }
        }

        // Slow path: resolve each item through the full update machinery.
        for (entity, modified_columns, metadata) in items {
            self.update_hot_with_metadata(entity.clone(), modified_columns, metadata)?;
        }
        Ok(())
    }
786
    /// Delete an entity
    ///
    /// Returns `Ok(true)` when the entity was found and removed, `Ok(false)`
    /// when no segment contains it. Lookup order: growing segment, then the
    /// `entity_segment` map, then a scan of every sealed segment.
    pub fn delete(&self, id: EntityId) -> Result<bool, SegmentError> {
        // Fast path: probe the growing segment directly — covers entities inserted via
        // insert() which no longer writes to entity_segment, and bulk-inserted entities.
        if let Some(growing_arc) = self.growing.read().as_ref() {
            let mut growing = growing_arc.write();
            if growing.contains(id) && growing.state().is_writable() {
                let seg_id = growing.id();
                let deleted = growing.delete(id)?;
                if deleted {
                    // Keep bookkeeping in sync: drop the id→segment mapping and
                    // decrement the O(1) counter that backs count().
                    self.entity_segment.write().remove(&id);
                    self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
                    self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
                }
                return Ok(deleted);
            }
        }

        // Fallback: check entity_segment map (populated for older insert() paths
        // or entities that were in a previous growing segment).
        let segment_id = self.entity_segment.read().get(&id).copied();

        if let Some(seg_id) = segment_id {
            if let Some(growing_arc) = self.growing.read().as_ref() {
                let mut growing = growing_arc.write();
                if growing.id() == seg_id && growing.state().is_writable() {
                    let deleted = growing.delete(id)?;
                    if deleted {
                        self.entity_segment.write().remove(&id);
                        self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
                        self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
                    }
                    return Ok(deleted);
                }
            }
        }

        // Fallback: entity is in a sealed segment (bulk-inserted, not in entity_segment map).
        // Single write-lock per segment to avoid TOCTOU race between contains() and force_delete().
        {
            let sealed = self.sealed.read();
            for segment_arc in sealed.iter() {
                let mut seg = segment_arc.write();
                let seg_id = seg.id();
                if seg.contains(id) {
                    let deleted = seg.force_delete(id);
                    // Release the per-segment lock before touching shared bookkeeping.
                    drop(seg);
                    if deleted {
                        self.entity_segment.write().remove(&id);
                        self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
                        self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
                    }
                    return Ok(deleted);
                }
            }
        }

        // Not found anywhere.
        Ok(false)
    }
846
847    pub fn delete_batch(&self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
848        if ids.is_empty() {
849            return Ok(Vec::new());
850        }
851
852        let mut deleted_ids = Vec::with_capacity(ids.len());
853
854        if let Some(growing_arc) = self.growing.read().as_ref() {
855            let mut growing = growing_arc.write();
856            if growing.state().is_writable() {
857                let seg_id = growing.id();
858                let deleted = growing.delete_batch(ids)?;
859                if !deleted.is_empty() {
860                    {
861                        let mut entity_segment = self.entity_segment.write();
862                        for id in &deleted {
863                            entity_segment.remove(id);
864                        }
865                    }
866                    self.total_entities_atomic
867                        .fetch_sub(deleted.len() as u64, Ordering::Relaxed);
868                    for id in &deleted {
869                        self.emit(LifecycleEvent::EntityDeleted(*id, seg_id));
870                    }
871                    deleted_ids.extend(deleted);
872                }
873            }
874        }
875
876        if deleted_ids.len() == ids.len() {
877            return Ok(deleted_ids);
878        }
879
880        let deleted_set: std::collections::HashSet<EntityId> =
881            deleted_ids.iter().copied().collect();
882        for &id in ids {
883            if deleted_set.contains(&id) {
884                continue;
885            }
886            if self.delete(id)? {
887                deleted_ids.push(id);
888            }
889        }
890
891        Ok(deleted_ids)
892    }
893
894    /// Get metadata for an entity
895    pub fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
896        // Fast path: probe growing segment directly (no entity_segment needed).
897        if let Some(growing_arc) = self.growing.read().as_ref() {
898            let growing = growing_arc.read();
899            if growing.contains(id) {
900                return growing.get_metadata(id);
901            }
902        }
903
904        // Fallback: entity_segment map (for pre-existing or sealed entities)
905        let segment_id = self.entity_segment.read().get(&id).copied();
906
907        if let Some(seg_id) = segment_id {
908            if let Some(growing_arc) = self.growing.read().as_ref() {
909                let growing = growing_arc.read();
910                if growing.id() == seg_id {
911                    return growing.get_metadata(id);
912                }
913            }
914
915            let sealed = self.sealed.read();
916            for segment in sealed.iter() {
917                if segment.id() == seg_id {
918                    return segment.get_metadata(id);
919                }
920            }
921        }
922
923        if let Some(segment_arc) = self.find_sealed_segment_arc(id) {
924            return segment_arc.read().get_metadata(id);
925        }
926
927        None
928    }
929
930    /// Set metadata for an entity
931    pub fn set_metadata(&self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
932        // Fast path: probe growing segment directly — covers entities inserted via
933        // insert() which no longer writes to entity_segment.
934        if let Some(growing_arc) = self.growing.read().as_ref() {
935            let mut growing = growing_arc.write();
936            if growing.contains(id) && growing.state().is_writable() {
937                return growing.set_metadata(id, metadata);
938            }
939        }
940
941        // Fallback: entity_segment map (sealed or pre-atomic-path entities)
942        let segment_id = self.entity_segment.read().get(&id).copied();
943
944        if let Some(seg_id) = segment_id {
945            if let Some(growing_arc) = self.growing.read().as_ref() {
946                let mut growing = growing_arc.write();
947                if growing.id() == seg_id && growing.state().is_writable() {
948                    return growing.set_metadata(id, metadata);
949                }
950            }
951        }
952
953        if let Some(entity) = self.get(id) {
954            return self.rewrite_sealed_entity_into_growing(entity, Some(&metadata));
955        }
956
957        Err(SegmentError::NotFound(id))
958    }
959
960    /// Check if growing segment should be sealed
961    fn maybe_seal_growing(&self) -> Result<(), SegmentError> {
962        let should_seal = {
963            let growing_opt = self.growing.read();
964            if let Some(growing_arc) = growing_opt.as_ref() {
965                let growing = growing_arc.read();
966                growing.should_seal(&self.config.segment_config)
967                    || growing.idle_secs() >= self.config.idle_seal_secs
968            } else {
969                false
970            }
971        };
972
973        if should_seal {
974            self.seal_current()?;
975        }
976
977        Ok(())
978    }
979
    /// Seal the current growing segment
    ///
    /// Takes ownership of the growing segment (leaving `None` in its slot),
    /// seals it, and moves it onto the sealed list. Returns the sealed
    /// segment's id, or `Err(InvalidState(Sealed))` when there was no growing
    /// segment to seal.
    pub fn seal_current(&self) -> Result<SegmentId, SegmentError> {
        // take() empties the slot under the write lock; the lock is released
        // right after, so a concurrent insert can install a fresh segment.
        let growing_opt = self.growing.write().take();

        if let Some(growing_arc) = growing_opt {
            let mut growing = growing_arc.write();
            let seg_id = growing.id();
            let entity_count = growing.stats().entity_count as u64;

            // Seal it
            growing.seal()?;

            // Move to sealed list (we need to extract it from the Arc)
            drop(growing); // Release write lock

            // In a real implementation, we'd convert to SealedSegment here
            // For now, we keep it as-is since GrowingSegment implements UnifiedSegment
            self.sealed.write().push(growing_arc);

            // Mark sealed segment pages all-visible — they're now immutable
            self.mark_sealed_pages_visible(entity_count);

            // Update stats
            {
                let mut stats = self.stats.write();
                stats.growing_count = stats.growing_count.saturating_sub(1);
                stats.sealed_count += 1;
                stats.seal_ops += 1;
            }

            self.emit(LifecycleEvent::SegmentSealed(seg_id));

            return Ok(seg_id);
        }

        Err(SegmentError::InvalidState(SegmentState::Sealed))
    }
1017
1018    /// Force seal (for testing/manual control)
1019    pub fn force_seal(&self) -> Result<Option<SegmentId>, SegmentError> {
1020        let has_growing = self.growing.read().is_some();
1021        if has_growing {
1022            Ok(Some(self.seal_current()?))
1023        } else {
1024            Ok(None)
1025        }
1026    }
1027
1028    /// Fraction of "pages" in sealed segments that are marked all-visible.
1029    ///
1030    /// Sealed segments are immutable so all their rows are safe for
1031    /// index-only scans. The growing segment is never counted (writes
1032    /// may be in-flight). Uses `rows_per_page = 256` (matching 8 KB pages
1033    /// with ~32-byte rows).
1034    ///
1035    /// Returns a value in `[0.0, 1.0]`. 1.0 when all sealed rows are
1036    /// visible; 0.0 when there are no sealed segments.
1037    pub fn all_visible_fraction(&self) -> f64 {
1038        const ROWS_PER_PAGE: u32 = 256;
1039        let sealed = self.sealed.read();
1040        if sealed.is_empty() {
1041            return 0.0;
1042        }
1043        let mut total_pages: u64 = 0;
1044        for seg_arc in sealed.iter() {
1045            let seg = seg_arc.read();
1046            let entity_count = seg.stats().entity_count as u64;
1047            let pages = entity_count.div_ceil(ROWS_PER_PAGE as u64);
1048            total_pages += pages;
1049        }
1050        if total_pages == 0 {
1051            return 0.0;
1052        }
1053        let visible = self.visibility_map.all_visible_count();
1054        (visible as f64 / total_pages as f64).min(1.0)
1055    }
1056
1057    /// Mark all pages of newly sealed segments as all-visible in the
1058    /// visibility map. Called internally after `seal_current`.
1059    fn mark_sealed_pages_visible(&self, seg_entity_count: u64) {
1060        const ROWS_PER_PAGE: u32 = 256;
1061        let existing_visible = self.visibility_map.all_visible_count();
1062        // Append pages starting after the last known visible page
1063        let start_page = existing_visible as u32;
1064        let new_pages = seg_entity_count.div_ceil(ROWS_PER_PAGE as u64);
1065        let end_page = start_page + new_pages as u32;
1066        self.visibility_map.mark_range_visible(start_page, end_page);
1067    }
1068
1069    /// Iterate over all entities in-place without collecting into a Vec.
1070    ///
1071    /// The callback receives a reference to each entity. Return `true` to
1072    /// continue iteration, `false` to stop early (e.g. when a LIMIT is reached).
1073    /// This avoids the allocation and cloning overhead of `query_all`.
1074    pub fn for_each_entity<F>(&self, mut callback: F)
1075    where
1076        F: FnMut(&UnifiedEntity) -> bool,
1077    {
1078        // Growing segment — direct iteration (no Box<dyn>)
1079        // Try non-blocking read first; fall back to blocking only when a writer
1080        // is actively holding the write lock (rare in read-heavy workloads).
1081        if let Some(growing_arc) = self.growing.read().as_ref() {
1082            let growing = if let Some(g) = growing_arc.try_read() {
1083                g
1084            } else {
1085                growing_arc.read()
1086            };
1087            if !growing.for_each_fast(&mut callback) {
1088                return;
1089            }
1090        }
1091
1092        // Sealed segments
1093        let sealed = self.sealed.read();
1094        for segment_arc in sealed.iter() {
1095            let segment = segment_arc.read();
1096            if !segment.for_each_fast(&mut callback) {
1097                return;
1098            }
1099        }
1100    }
1101
1102    /// Parallel fold across all entities. Each sealed segment is
1103    /// processed on its own rayon task; the growing segment stays on
1104    /// the caller thread (its read lock is briefly held).
1105    ///
1106    /// - `init` builds a fresh accumulator per thread.
1107    /// - `fold` mutates an accumulator with one entity at a time.
1108    /// - `reduce` combines two accumulators into one.
1109    ///
1110    /// The returned value is the reduction of every per-thread
1111    /// accumulator. Use this for aggregate-shape workloads (GROUP BY)
1112    /// where per-thread partial state can be merged cheaply.
1113    ///
1114    /// NOTE: when there are 0 or 1 sealed segments, the parallel path
1115    /// is skipped and the work runs sequentially to avoid rayon
1116    /// overhead on tiny tables.
1117    pub fn fold_entities_parallel<T, FInit, FFold, FReduce>(
1118        &self,
1119        init: FInit,
1120        fold: FFold,
1121        reduce: FReduce,
1122    ) -> T
1123    where
1124        T: Send,
1125        FInit: Fn() -> T + Send + Sync,
1126        FFold: Fn(T, &UnifiedEntity) -> T + Send + Sync,
1127        FReduce: Fn(T, T) -> T + Send + Sync,
1128    {
1129        use rayon::prelude::*;
1130
1131        // Growing segment — always sequential (single writer lock,
1132        // usually small working set).
1133        let mut acc = init();
1134        if let Some(growing_arc) = self.growing.read().as_ref() {
1135            let growing = if let Some(g) = growing_arc.try_read() {
1136                g
1137            } else {
1138                growing_arc.read()
1139            };
1140            growing.for_each_fast(|entity| {
1141                acc = fold(std::mem::replace(&mut acc, init()), entity);
1142                true
1143            });
1144        }
1145
1146        // Sealed segments — snapshot the Arc list under the read lock,
1147        // then drop the lock so rayon workers can fan out without
1148        // blocking writers.
1149        let segments: Vec<_> = {
1150            let sealed = self.sealed.read();
1151            sealed.iter().cloned().collect()
1152        };
1153
1154        if segments.len() <= 1 {
1155            for seg_arc in &segments {
1156                let seg = seg_arc.read();
1157                seg.for_each_fast(|entity| {
1158                    acc = fold(std::mem::replace(&mut acc, init()), entity);
1159                    true
1160                });
1161            }
1162            return acc;
1163        }
1164
1165        let sealed_acc = segments
1166            .into_par_iter()
1167            .map(|seg_arc| {
1168                let mut local = init();
1169                let seg = seg_arc.read();
1170                seg.for_each_fast(|entity| {
1171                    local = fold(std::mem::replace(&mut local, init()), entity);
1172                    true
1173                });
1174                local
1175            })
1176            .reduce(&init, &reduce);
1177
1178        reduce(acc, sealed_acc)
1179    }
1180
1181    /// Zone-map-aware iteration across all segments.
1182    ///
1183    /// Like `for_each_entity`, but checks `zone_preds` against each segment's
1184    /// column zone maps before iterating. Segments where any predicate can
1185    /// definitively prove no rows match are skipped entirely.
1186    ///
1187    /// `zone_preds`: slice of `(column_name, ZoneColPred)` extracted from the WHERE clause.
1188    /// Empty slice → same behaviour as `for_each_entity` (no pruning).
1189    pub fn for_each_entity_zoned<F>(&self, zone_preds: &[(&str, ZoneColPred<'_>)], mut callback: F)
1190    where
1191        F: FnMut(&UnifiedEntity) -> bool,
1192    {
1193        // Growing segment — never skip (it's receiving writes, zones are partial).
1194        // Try a non-blocking read first: if a writer is currently inserting
1195        // (holding the write lock), try_read() returns None and we fall back to
1196        // the blocking read.  In low-contention workloads (reads far outnumber
1197        // writes) the try_read() almost always succeeds and readers never stall.
1198        if let Some(growing_arc) = self.growing.read().as_ref() {
1199            let growing = if let Some(g) = growing_arc.try_read() {
1200                g
1201            } else {
1202                growing_arc.read()
1203            };
1204            if !growing.for_each_fast(&mut callback) {
1205                return;
1206            }
1207        }
1208
1209        // Sealed segments — check zone maps before iterating
1210        let sealed = self.sealed.read();
1211        for segment_arc in sealed.iter() {
1212            let segment = segment_arc.read();
1213            if !zone_preds.is_empty() && segment.can_skip_zone_preds(zone_preds) {
1214                continue; // entire segment pruned
1215            }
1216            if !segment.for_each_fast(&mut callback) {
1217                return;
1218            }
1219        }
1220    }
1221
1222    /// Zone-map-aware parallel query.
1223    ///
1224    /// Like `query_all` but applies `zone_preds` on the main thread to
1225    /// prune sealed segments before spawning workers — segments that
1226    /// provably contain no matching rows are skipped entirely.
1227    ///
1228    /// Zone check runs single-threaded (it reads per-segment metadata,
1229    /// not row data), so it's cheap. Surviving segments are then scanned
1230    /// in parallel using `std::thread::scope` when there are > 1 of them.
1231    pub fn query_all_zoned<F>(
1232        &self,
1233        zone_preds: &[(&str, ZoneColPred<'_>)],
1234        filter: F,
1235    ) -> Vec<UnifiedEntity>
1236    where
1237        F: Fn(&UnifiedEntity) -> bool + Sync,
1238    {
1239        let mut results = Vec::new();
1240
1241        // Growing segment — always scan, no zone skip (zones are partial).
1242        // Non-blocking try_read() avoids stalling behind in-progress inserts.
1243        if let Some(growing_arc) = self.growing.read().as_ref() {
1244            let growing = if let Some(g) = growing_arc.try_read() {
1245                g
1246            } else {
1247                growing_arc.read()
1248            };
1249            results.extend(growing.iter().filter(|e| filter(e)).cloned());
1250        }
1251
1252        // Sealed segments: zone-prune on main thread, then scan in parallel.
1253        let sealed = self.sealed.read();
1254        // Collect only the segments that survive zone-predicate pruning.
1255        let surviving: Vec<_> = sealed
1256            .iter()
1257            .filter(|seg_arc| {
1258                if zone_preds.is_empty() {
1259                    return true;
1260                }
1261                let seg = seg_arc.read();
1262                !seg.can_skip_zone_preds(zone_preds)
1263            })
1264            .collect();
1265
1266        let use_parallel = surviving.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1267
1268        if use_parallel {
1269            let filter_ref = &filter;
1270            let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1271                surviving
1272                    .iter()
1273                    .map(|segment| {
1274                        s.spawn(move || {
1275                            segment
1276                                .read()
1277                                .iter()
1278                                .filter(|e| filter_ref(e))
1279                                .cloned()
1280                                .collect::<Vec<_>>()
1281                        })
1282                    })
1283                    .collect::<Vec<_>>()
1284                    .into_iter()
1285                    .map(|handle| handle.join().unwrap_or_default())
1286                    .collect()
1287            });
1288            for batch in segment_results {
1289                results.extend(batch);
1290            }
1291        } else {
1292            for segment_arc in surviving {
1293                let seg = segment_arc.read();
1294                results.extend(seg.iter().filter(|e| filter(e)).cloned());
1295            }
1296        }
1297
1298        results
1299    }
1300
1301    /// Query across all segments. Uses parallel scanning for sealed segments
1302    /// when more than one sealed segment exists.
1303    pub fn query_all<F>(&self, filter: F) -> Vec<UnifiedEntity>
1304    where
1305        F: Fn(&UnifiedEntity) -> bool + Sync,
1306    {
1307        let mut results = Vec::new();
1308
1309        // Query growing segment — try non-blocking read first (avoids stalling
1310        // behind an in-progress insert; falls back to blocking if writer is active).
1311        if let Some(growing_arc) = self.growing.read().as_ref() {
1312            let growing = if let Some(g) = growing_arc.try_read() {
1313                g
1314            } else {
1315                growing_arc.read()
1316            };
1317            results.extend(growing.iter().filter(|e| filter(e)).cloned());
1318        }
1319
1320        // Query sealed segments — parallel when multiple exist AND multi-core
1321        let sealed = self.sealed.read();
1322        let use_parallel = sealed.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1323        if use_parallel {
1324            let filter_ref = &filter;
1325            let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1326                sealed
1327                    .iter()
1328                    .map(|segment| {
1329                        s.spawn(move || {
1330                            segment
1331                                .read()
1332                                .iter()
1333                                .filter(|e| filter_ref(e))
1334                                .cloned()
1335                                .collect::<Vec<_>>()
1336                        })
1337                    })
1338                    .collect::<Vec<_>>()
1339                    .into_iter()
1340                    .map(|handle| handle.join().unwrap_or_default())
1341                    .collect()
1342            });
1343            for batch in segment_results {
1344                results.extend(batch);
1345            }
1346        } else {
1347            for segment in sealed.iter() {
1348                let seg = segment.read();
1349                results.extend(seg.iter().filter(|e| filter(e)).cloned());
1350            }
1351        }
1352
1353        results
1354    }
1355
1356    /// Query with bloom filter hint: skip the growing segment when bloom says key is absent.
1357    ///
1358    /// This is the integration point for bloom filter pruning.
1359    /// When a query has an equality predicate on a known key, the executor
1360    /// can call this instead of `query_all` to avoid scanning when the
1361    /// bloom filter proves the key doesn't exist.
1362    ///
1363    /// Returns (results, bloom_pruned) where bloom_pruned indicates if the
1364    /// segment was skipped.
1365    pub fn query_with_bloom_hint<F>(
1366        &self,
1367        key_hint: Option<&[u8]>,
1368        filter: F,
1369    ) -> (Vec<UnifiedEntity>, bool)
1370    where
1371        F: Fn(&UnifiedEntity) -> bool,
1372    {
1373        let mut results = Vec::new();
1374        let mut bloom_pruned = false;
1375
1376        if let Some(growing_arc) = self.growing.read().as_ref() {
1377            let growing = growing_arc.read();
1378            if let Some(key) = key_hint {
1379                if !growing.bloom_might_contain_key(key) {
1380                    bloom_pruned = true;
1381                    return (results, bloom_pruned);
1382                }
1383            }
1384            for entity in growing.iter() {
1385                if filter(entity) {
1386                    results.push(entity.clone());
1387                }
1388            }
1389        }
1390
1391        // Sealed segments (currently empty iter, but future-proofed)
1392        let sealed = self.sealed.read();
1393        for segment_arc in sealed.iter() {
1394            let segment = segment_arc.read();
1395            if let Some(key) = key_hint {
1396                if !segment.bloom_might_contain_key(key) {
1397                    bloom_pruned = true;
1398                    continue;
1399                }
1400            }
1401            for entity in segment.iter() {
1402                if filter(entity) {
1403                    results.push(entity.clone());
1404                }
1405            }
1406        }
1407
1408        (results, bloom_pruned)
1409    }
1410
1411    /// Filter by metadata across all segments
1412    pub fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1413        let mut results = Vec::new();
1414
1415        // Growing segment
1416        if let Some(growing_arc) = self.growing.read().as_ref() {
1417            let growing = growing_arc.read();
1418            results.extend(growing.filter_metadata(filters));
1419        }
1420
1421        // Sealed segments
1422        let sealed = self.sealed.read();
1423        for segment in sealed.iter() {
1424            results.extend(segment.filter_metadata(filters));
1425        }
1426
1427        results
1428    }
1429
1430    /// Get entities by kind
1431    pub fn get_by_kind(&self, kind: &str) -> Vec<UnifiedEntity> {
1432        let mut results = Vec::new();
1433
1434        // Growing segment
1435        if let Some(growing_arc) = self.growing.read().as_ref() {
1436            let growing = growing_arc.read();
1437            for entity in growing.iter_kind(kind) {
1438                results.push(entity.clone());
1439            }
1440        }
1441
1442        // Sealed segments
1443        let sealed = self.sealed.read();
1444        for segment in sealed.iter() {
1445            for entity in segment.iter_kind(kind) {
1446                results.push(entity.clone());
1447            }
1448        }
1449
1450        results
1451    }
1452
    /// Count entities
    ///
    /// O(1): reads the atomic counter maintained by the insert/delete paths.
    /// A Relaxed load, so the value may be momentarily stale relative to
    /// in-flight writers.
    pub fn count(&self) -> usize {
        self.total_entities_atomic.load(Ordering::Relaxed) as usize
    }
1457
1458    /// Get all segment IDs
1459    pub fn segment_ids(&self) -> Vec<SegmentId> {
1460        let mut ids = Vec::new();
1461
1462        if let Some(growing_arc) = self.growing.read().as_ref() {
1463            ids.push(growing_arc.read().id());
1464        }
1465
1466        let sealed = self.sealed.read();
1467        for segment in sealed.iter() {
1468            ids.push(segment.id());
1469        }
1470
1471        ids.extend(self.archived.read().iter().copied());
1472
1473        ids
1474    }
1475
    /// Emit a lifecycle event. Intentionally a no-op today.
    ///
    /// Perf: this used to push onto a `RwLock<Vec<LifecycleEvent>>`
    /// on every insert / delete / seal. Nobody consumes that vec
    /// today (no subscription API, `drain_events` has no callers),
    /// so the write lock + push was pure tax — and the vec grew
    /// unbounded in long-running processes.
    ///
    /// If we ever want the hooks back, replace this with a bounded
    /// channel or an actual subscriber registry; the callers
    /// (`insert`, `delete`, `maybe_seal_growing`) already pass
    /// well-typed events, so no call sites need to change.
    #[inline]
    #[allow(clippy::unused_self)]
    fn emit(&self, _event: LifecycleEvent) {}
1491
1492    /// Drain events. Kept for API compatibility; always returns
1493    /// empty because `emit` no longer buffers.
1494    pub fn drain_events(&self) -> Vec<LifecycleEvent> {
1495        std::mem::take(&mut *self.events.write())
1496    }
1497
1498    /// Run maintenance (would be called periodically in production)
1499    pub fn run_maintenance(&self) -> Result<(), SegmentError> {
1500        // Auto-seal idle segments
1501        self.maybe_seal_growing()?;
1502
1503        // Compact if too many sealed segments
1504        if self.config.enable_compaction {
1505            let sealed_count = self.sealed.read().len();
1506            if sealed_count > self.config.max_sealed_segments {
1507                // In production, we'd trigger background compaction here
1508                // For now, just log that compaction is needed
1509            }
1510        }
1511
1512        Ok(())
1513    }
1514}
1515
1516// Implement the Arc<RwLock<GrowingSegment>> as UnifiedSegment
1517// This is needed because we store growing segments in the sealed list after sealing
1518impl UnifiedSegment for Arc<RwLock<GrowingSegment>> {
1519    fn id(&self) -> SegmentId {
1520        self.read().id()
1521    }
1522
1523    fn state(&self) -> SegmentState {
1524        self.read().state()
1525    }
1526
1527    fn collection(&self) -> &str {
1528        // This is a limitation - we'd need to store collection in the Arc wrapper
1529        "unknown"
1530    }
1531
1532    fn stats(&self) -> SegmentStats {
1533        self.read().stats()
1534    }
1535
1536    fn entity_count(&self) -> usize {
1537        self.read().entity_count()
1538    }
1539
1540    fn contains(&self, id: EntityId) -> bool {
1541        self.read().contains(id)
1542    }
1543
1544    fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1545        // This is tricky with RwLock - we can't return a reference
1546        // In production, we'd use a different approach
1547        None
1548    }
1549
1550    fn get_mut(&mut self, _id: EntityId) -> Option<&mut UnifiedEntity> {
1551        None
1552    }
1553
1554    fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1555        self.write().insert(entity)
1556    }
1557
1558    fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1559        self.write().update(entity)
1560    }
1561
1562    fn update_hot(
1563        &mut self,
1564        entity: UnifiedEntity,
1565        modified_columns: &[String],
1566    ) -> Result<(), SegmentError> {
1567        self.write().update_hot(entity, modified_columns)
1568    }
1569
1570    fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1571        self.write().delete(id)
1572    }
1573
1574    fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1575        self.read().get_metadata(id)
1576    }
1577
1578    fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1579        self.write().set_metadata(id, metadata)
1580    }
1581
1582    fn seal(&mut self) -> Result<(), SegmentError> {
1583        self.write().seal()
1584    }
1585
1586    fn should_seal(&self, config: &SegmentConfig) -> bool {
1587        self.read().should_seal(config)
1588    }
1589
1590    fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1591        // Cannot return iterator with RwLock
1592        Box::new(std::iter::empty())
1593    }
1594
1595    fn iter_kind(&self, _kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1596        Box::new(std::iter::empty())
1597    }
1598
1599    fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1600        self.read().filter_metadata(filters)
1601    }
1602}
1603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::schema::Value;

    /// Insert a single table row and confirm it is retrievable and counted.
    #[test]
    fn test_manager_basic() {
        let mgr = SegmentManager::new("test_collection");

        let row = UnifiedEntity::table_row(
            mgr.next_entity_id(),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );

        let id = mgr.insert(row).unwrap();
        assert!(mgr.get(id).is_some());
        assert_eq!(mgr.count(), 1);
    }

    /// Overfill a 2-entity segment and confirm no entities are lost when
    /// the manager rolls over to a new growing segment.
    #[test]
    fn test_manager_auto_seal() {
        let cfg = ManagerConfig {
            segment_config: SegmentConfig {
                max_entities: 2,
                ..Default::default()
            },
            ..Default::default()
        };
        let mgr = SegmentManager::with_config("test", cfg);

        // The third insert exceeds max_entities and should auto-seal the
        // first segment.
        for component in [0.1, 0.2, 0.3] {
            let v = UnifiedEntity::vector(mgr.next_entity_id(), "v", vec![component]);
            mgr.insert(v).unwrap();
        }

        // Entities stay visible across sealed + growing segments.
        assert_eq!(mgr.stats().total_entities, 3);
    }

    /// Delete removes an entity and subsequent lookups miss.
    #[test]
    fn test_manager_delete() {
        let mgr = SegmentManager::new("test");

        let id = mgr
            .insert(UnifiedEntity::vector(mgr.next_entity_id(), "v", vec![0.1]))
            .unwrap();

        assert!(mgr.get(id).is_some());
        assert!(mgr.delete(id).unwrap());
        assert!(mgr.get(id).is_none());
    }

    /// Metadata set on an entity round-trips through the manager.
    #[test]
    fn test_manager_metadata() {
        let mgr = SegmentManager::new("test");

        let id = mgr
            .insert(UnifiedEntity::table_row(
                mgr.next_entity_id(),
                "hosts",
                1,
                vec![Value::text("192.168.1.1".to_string())],
            ))
            .unwrap();

        let mut md = Metadata::new();
        md.set(
            "os",
            super::super::metadata::MetadataValue::String("linux".to_string()),
        );
        mgr.set_metadata(id, md).unwrap();

        assert!(mgr.get_metadata(id).unwrap().has("os"));
    }

    /// Kind-based queries separate table rows from vectors.
    #[test]
    fn test_manager_query_by_kind() {
        let mgr = SegmentManager::new("test");

        // Two table rows and one vector, interleaved.
        mgr.insert(UnifiedEntity::table_row(
            mgr.next_entity_id(),
            "hosts",
            1,
            vec![],
        ))
        .unwrap();
        mgr.insert(UnifiedEntity::vector(
            mgr.next_entity_id(),
            "embeddings",
            vec![0.1],
        ))
        .unwrap();
        mgr.insert(UnifiedEntity::table_row(
            mgr.next_entity_id(),
            "hosts",
            2,
            vec![],
        ))
        .unwrap();

        assert_eq!(mgr.get_by_kind("table").len(), 2);
        assert_eq!(mgr.get_by_kind("vector").len(), 1);
    }

    /// Inserting should emit creation + insertion lifecycle events.
    #[test]
    #[ignore = "lifecycle events intentionally no-op since the emit-channel refactor; drain_events returns empty — see SegmentManager::emit"]
    fn test_lifecycle_events() {
        let mgr = SegmentManager::new("test");

        mgr.insert(UnifiedEntity::vector(mgr.next_entity_id(), "v", vec![0.1]))
            .unwrap();

        let events = mgr.drain_events();

        // Expected: SegmentCreated followed by EntityInserted.
        let saw_created = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::SegmentCreated(_)));
        let saw_inserted = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::EntityInserted(_, _)));
        assert!(saw_created);
        assert!(saw_inserted);
    }
}