1use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19
20use super::entity::{EntityId, UnifiedEntity};
21use super::metadata::{Metadata, MetadataFilter};
22use super::segment::{
23 GrowingSegment, SegmentConfig, SegmentError, SegmentId, SegmentState, SegmentStats,
24 UnifiedSegment, ZoneColPred, ZoneColPredKind,
25};
26use crate::storage::btree::visibility_map::VisibilityMap;
27
28#[derive(Debug, Clone)]
30pub struct ManagerConfig {
31 pub segment_config: SegmentConfig,
33 pub max_sealed_segments: usize,
35 pub idle_seal_secs: u64,
37 pub enable_compaction: bool,
39 pub enable_archival: bool,
41 pub archive_age_secs: u64,
43}
44
45impl Default for ManagerConfig {
46 fn default() -> Self {
47 Self {
48 segment_config: SegmentConfig::default(),
49 max_sealed_segments: 10,
50 idle_seal_secs: 300, enable_compaction: true,
52 enable_archival: true,
53 archive_age_secs: 86400 * 7, }
55 }
56}
57
/// Point-in-time counters describing a segment manager.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct ManagerStats {
    /// Live entity count; `SegmentManager::stats` fills it from the atomic counter.
    pub total_entities: usize,
    /// Growing segments created and not yet sealed.
    pub growing_count: usize,
    /// Segments sealed so far.
    pub sealed_count: usize,
    /// Archived segments (not updated by the manager code shown here).
    pub archived_count: usize,
    /// Memory estimate in bytes (not updated by the manager code shown here).
    pub total_memory_bytes: usize,
    /// Number of completed seal operations.
    pub seal_ops: u64,
    /// Number of compaction operations (not updated by the manager code shown here).
    pub compact_ops: u64,
}
76
77#[derive(Debug, Clone)]
79pub enum LifecycleEvent {
80 SegmentCreated(SegmentId),
81 SegmentSealed(SegmentId),
82 SegmentCompacted {
83 source: Vec<SegmentId>,
84 target: SegmentId,
85 },
86 SegmentArchived(SegmentId),
87 EntityInserted(EntityId, SegmentId),
88 EntityDeleted(EntityId, SegmentId),
89}
90
91pub struct SegmentManager {
93 collection: String,
95 config: ManagerConfig,
97 next_segment_id: AtomicU64,
99 next_entity_id: AtomicU64,
101 next_row_id: AtomicU64,
103 total_entities_atomic: AtomicU64,
107 growing: RwLock<Option<Arc<RwLock<GrowingSegment>>>>,
109 sealed: RwLock<Vec<Arc<RwLock<GrowingSegment>>>>,
111 archived: RwLock<Vec<SegmentId>>,
113 entity_segment: RwLock<HashMap<EntityId, SegmentId>>,
117 column_schema: RwLock<Option<Arc<Vec<String>>>>,
120 stats: RwLock<ManagerStats>,
122 events: RwLock<Vec<LifecycleEvent>>,
124 visibility_map: VisibilityMap,
128}
129
130impl SegmentManager {
131 pub fn new(collection: impl Into<String>) -> Self {
133 Self::with_config(collection, ManagerConfig::default())
134 }
135
136 pub fn with_config(collection: impl Into<String>, config: ManagerConfig) -> Self {
138 Self {
139 collection: collection.into(),
140 config,
141 next_segment_id: AtomicU64::new(1),
142 next_entity_id: AtomicU64::new(1),
143 next_row_id: AtomicU64::new(1),
144 total_entities_atomic: AtomicU64::new(0),
145 growing: RwLock::new(None),
146 sealed: RwLock::new(Vec::new()),
147 archived: RwLock::new(Vec::new()),
148 entity_segment: RwLock::new(HashMap::new()),
149 column_schema: RwLock::new(None),
150 stats: RwLock::new(ManagerStats::default()),
151 events: RwLock::new(Vec::new()),
152 visibility_map: VisibilityMap::new(),
153 }
154 }
155
156 pub fn get_or_init_schema(
158 &self,
159 named: &HashMap<String, crate::storage::schema::Value>,
160 ) -> Arc<Vec<String>> {
161 {
162 let schema = self.column_schema.read();
163 if let Some(ref s) = *schema {
164 return Arc::clone(s);
165 }
166 }
167 let cols: Vec<String> = named.keys().cloned().collect();
168 let arc = Arc::new(cols);
169 *self.column_schema.write() = Some(Arc::clone(&arc));
170 arc
171 }
172
173 pub fn column_schema(&self) -> Option<Arc<Vec<String>>> {
175 self.column_schema.read().clone()
176 }
177
178 pub(crate) fn set_column_schema_if_empty(&self, columns: Vec<String>) {
179 if columns.is_empty() {
180 return;
181 }
182 let mut schema = self.column_schema.write();
183 if schema.is_none() {
184 *schema = Some(Arc::new(columns));
185 }
186 }
187
188 pub fn collection(&self) -> &str {
190 &self.collection
191 }
192
193 pub fn config(&self) -> &ManagerConfig {
195 &self.config
196 }
197
198 pub fn stats(&self) -> ManagerStats {
201 let mut s = self.stats.read().clone();
202 s.total_entities = self.total_entities_atomic.load(Ordering::Relaxed) as usize;
203 s
204 }
205
206 pub fn next_entity_id(&self) -> EntityId {
208 EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
209 }
210
211 pub fn next_row_id(&self) -> u64 {
213 self.next_row_id.fetch_add(1, Ordering::SeqCst)
214 }
215
216 pub fn reserve_row_ids(&self, n: u64) -> std::ops::Range<u64> {
220 let start = self.next_row_id.fetch_add(n, Ordering::SeqCst);
221 start..start + n
222 }
223
224 pub fn register_row_id(&self, id: u64) {
227 let candidate = id.saturating_add(1);
228 let mut current = self.next_row_id.load(Ordering::SeqCst);
229 while candidate > current {
230 match self.next_row_id.compare_exchange(
231 current,
232 candidate,
233 Ordering::SeqCst,
234 Ordering::SeqCst,
235 ) {
236 Ok(_) => break,
237 Err(updated) => current = updated,
238 }
239 }
240 }
241
242 fn get_or_create_growing(&self) -> Arc<RwLock<GrowingSegment>> {
249 {
251 let growing = self.growing.read();
252 if let Some(segment) = growing.as_ref() {
253 return Arc::clone(segment);
254 }
255 }
256
257 let mut growing = self.growing.write();
259 if let Some(segment) = growing.as_ref() {
261 return Arc::clone(segment);
262 }
263
264 let id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
265 let segment = GrowingSegment::new(id, &self.collection);
266 let segment_arc = Arc::new(RwLock::new(segment));
267 *growing = Some(Arc::clone(&segment_arc));
268
269 self.emit(LifecycleEvent::SegmentCreated(id));
270
271 self.stats.write().growing_count += 1;
274
275 segment_arc
276 }
277
278 pub fn insert(&self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
280 self.maybe_seal_growing()?;
282
283 let segment_arc = self.get_or_create_growing();
284 let mut segment = segment_arc.write();
285
286 if entity.id.raw() == 0 {
288 entity.id = self.next_entity_id();
289 }
290
291 let entity_id = entity.id;
292 let segment_id = segment.id();
293
294 segment.insert(entity)?;
295
296 self.total_entities_atomic.fetch_add(1, Ordering::Relaxed);
298
299 self.emit(LifecycleEvent::EntityInserted(entity_id, segment_id));
308
309 Ok(entity_id)
310 }
311
312 pub fn insert_batch(
314 &self,
315 entities: Vec<UnifiedEntity>,
316 ) -> Result<Vec<EntityId>, SegmentError> {
317 let mut ids = Vec::with_capacity(entities.len());
318 for entity in entities {
319 ids.push(self.insert(entity)?);
320 }
321 Ok(ids)
322 }
323
324 pub fn bulk_insert(
327 &self,
328 mut entities: Vec<UnifiedEntity>,
329 ) -> Result<Vec<EntityId>, SegmentError> {
330 for entity in &mut entities {
332 if entity.id.raw() == 0 {
333 entity.id = self.next_entity_id();
334 }
335 if let super::entity::EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
336 if *row_id == 0 {
337 *row_id = self.next_row_id();
338 } else {
339 self.register_row_id(*row_id);
340 }
341 }
342 }
343
344 if let Some(first_row) = entities.first() {
347 if let super::entity::EntityData::Row(ref row) = first_row.data {
348 if let Some(ref named) = row.named {
349 let schema = self.get_or_init_schema(named);
350 for entity in &mut entities {
351 if let super::entity::EntityData::Row(ref mut row) = entity.data {
352 if let Some(named) = row.named.take() {
353 let mut cols = Vec::with_capacity(schema.len());
354 for col_name in schema.iter() {
355 cols.push(
356 named
357 .get(col_name)
358 .cloned()
359 .unwrap_or(crate::storage::schema::Value::Null),
360 );
361 }
362 row.columns = cols;
363 row.schema = Some(Arc::clone(&schema));
364 }
365 }
366 }
367 }
368 }
369 }
370
371 let segment_arc = self.get_or_create_growing();
372 let mut segment = segment_arc.write();
373 let segment_id = segment.id();
374
375 let ids = segment.bulk_insert(entities)?;
377
378 self.total_entities_atomic
383 .fetch_add(ids.len() as u64, Ordering::Relaxed);
384
385 Ok(ids)
386 }
387
388 pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
390 if let Some(growing_arc) = self.growing.read().as_ref() {
392 let growing = growing_arc.read();
393 if let Some(entity) = growing.get(id) {
394 return Some(entity.clone());
395 }
396 }
397
398 let sealed = self.sealed.read();
400 for segment in sealed.iter() {
401 let seg = segment.read();
402 if let Some(entity) = seg.get(id) {
403 return Some(entity.clone());
404 }
405 }
406
407 None
408 }
409
410 pub fn get_many(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
415 let mut out: Vec<Option<UnifiedEntity>> = vec![None; ids.len()];
416 let mut remaining: Vec<usize> = (0..ids.len()).collect(); if let Some(growing_arc) = self.growing.read().as_ref() {
421 let growing = if let Some(g) = growing_arc.try_read() {
422 g
423 } else {
424 growing_arc.read()
425 };
426 remaining.retain(|&i| {
427 if let Some(entity) = growing.get(ids[i]) {
428 out[i] = Some(entity.clone());
429 false } else {
431 true }
433 });
434 }
435
436 if remaining.is_empty() {
437 return out;
438 }
439
440 let sealed = self.sealed.read();
442 for segment in sealed.iter() {
443 if remaining.is_empty() {
444 break;
445 }
446 let seg = segment.read();
447 remaining.retain(|&i| {
448 if let Some(entity) = seg.get(ids[i]) {
449 out[i] = Some(entity.clone());
450 false
451 } else {
452 true
453 }
454 });
455 }
456
457 out
458 }
459
460 pub fn for_each_id<F>(&self, ids: &[EntityId], mut f: F)
473 where
474 F: FnMut(usize, &UnifiedEntity),
475 {
476 thread_local! {
485 static REMAINING_SCRATCH: std::cell::Cell<Vec<usize>> =
486 const { std::cell::Cell::new(Vec::new()) };
487 }
488
489 let mut remaining: Vec<usize> = REMAINING_SCRATCH.with(|cell| cell.take());
490 remaining.clear();
491
492 if let Some(growing_arc) = self.growing.read().as_ref() {
493 let growing = if let Some(g) = growing_arc.try_read() {
494 g
495 } else {
496 growing_arc.read()
497 };
498 for (i, id) in ids.iter().enumerate() {
499 if let Some(entity) = growing.get(*id) {
500 f(i, entity);
501 } else {
502 remaining.push(i);
503 }
504 }
505 } else {
506 remaining.reserve(ids.len());
507 remaining.extend(0..ids.len());
508 }
509
510 if !remaining.is_empty() {
511 let sealed = self.sealed.read();
512 for segment in sealed.iter() {
513 if remaining.is_empty() {
514 break;
515 }
516 let seg = segment.read();
517 remaining.retain(|&i| {
518 if let Some(entity) = seg.get(ids[i]) {
519 f(i, entity);
520 false
521 } else {
522 true
523 }
524 });
525 }
526 }
527
528 REMAINING_SCRATCH.with(|cell| cell.set(remaining));
529 }
530
531 fn scan_for_entity(&self, id: EntityId) -> Option<UnifiedEntity> {
533 if let Some(growing_arc) = self.growing.read().as_ref() {
535 let growing = growing_arc.read();
536 if let Some(entity) = growing.get(id) {
537 return Some(entity.clone());
538 }
539 }
540
541 let sealed = self.sealed.read();
543 for segment in sealed.iter() {
544 if let Some(entity) = segment.get(id) {
545 return Some(entity.clone());
546 }
547 }
548
549 None
550 }
551
552 fn find_sealed_segment_arc(&self, id: EntityId) -> Option<Arc<RwLock<GrowingSegment>>> {
553 let sealed = self.sealed.read();
554 sealed
555 .iter()
556 .find(|segment_arc| segment_arc.read().contains(id))
557 .map(Arc::clone)
558 }
559
560 fn rewrite_sealed_entity_into_growing(
561 &self,
562 entity: UnifiedEntity,
563 metadata: Option<&Metadata>,
564 ) -> Result<(), SegmentError> {
565 let entity_id = entity.id;
566 let sealed_arc = self
567 .find_sealed_segment_arc(entity_id)
568 .ok_or(SegmentError::NotFound(entity_id))?;
569
570 let metadata_to_apply = {
571 let mut sealed = sealed_arc.write();
572 let existing_metadata = sealed.get_metadata(entity_id);
573 if !sealed.force_delete(entity_id) {
574 return Err(SegmentError::NotFound(entity_id));
575 }
576 metadata.cloned().or(existing_metadata)
577 };
578
579 let growing_arc = self.get_or_create_growing();
580 let growing_id = {
581 let mut growing = growing_arc.write();
582 growing.insert(entity)?;
583 if let Some(metadata) = metadata_to_apply {
584 growing.set_metadata(entity_id, metadata)?;
585 }
586 growing.id()
587 };
588
589 self.entity_segment.write().insert(entity_id, growing_id);
590 Ok(())
591 }
592
593 pub fn update(&self, entity: UnifiedEntity) -> Result<(), SegmentError> {
595 let entity_id = entity.id;
596 let mut entity = Some(entity);
597
598 if let Some(growing_arc) = self.growing.read().as_ref() {
600 let mut growing = growing_arc.write();
601 if growing.contains(entity_id) && growing.state().is_writable() {
602 return growing.update(entity.take().expect("entity already moved"));
603 }
604 }
605
606 let segment_id = self.entity_segment.read().get(&entity_id).copied();
608 if let Some(seg_id) = segment_id {
609 if let Some(growing_arc) = self.growing.read().as_ref() {
610 let mut growing = growing_arc.write();
611 if growing.id() == seg_id && growing.state().is_writable() {
612 return growing.update(entity.take().expect("entity already moved"));
613 }
614 }
615 }
616
617 if let Some(entity) = entity.take() {
618 return self.rewrite_sealed_entity_into_growing(entity, None);
619 }
620
621 Err(SegmentError::NotFound(entity_id))
622 }
623
624 pub fn update_with_metadata(
627 &self,
628 entity: UnifiedEntity,
629 metadata: Option<&Metadata>,
630 ) -> Result<(), SegmentError> {
631 let entity_id = entity.id;
632 let mut entity = Some(entity);
633
634 if let Some(growing_arc) = self.growing.read().as_ref() {
636 let mut growing = growing_arc.write();
637 if growing.contains(entity_id) && growing.state().is_writable() {
638 growing.update(entity.take().expect("entity already moved"))?;
639 if let Some(metadata) = metadata {
640 growing.set_metadata(entity_id, metadata.clone())?;
641 }
642 return Ok(());
643 }
644 }
645
646 let segment_id = self.entity_segment.read().get(&entity_id).copied();
648 if let Some(seg_id) = segment_id {
649 if let Some(growing_arc) = self.growing.read().as_ref() {
650 let mut growing = growing_arc.write();
651 if growing.id() == seg_id && growing.state().is_writable() {
652 growing.update(entity.take().expect("entity already moved"))?;
653 if let Some(metadata) = metadata {
654 growing.set_metadata(entity_id, metadata.clone())?;
655 }
656 return Ok(());
657 }
658 }
659 }
660
661 if let Some(entity) = entity.take() {
662 return self.rewrite_sealed_entity_into_growing(entity, metadata);
663 }
664
665 Err(SegmentError::NotFound(entity_id))
666 }
667
668 pub fn update_hot(
672 &self,
673 entity: UnifiedEntity,
674 modified_columns: &[String],
675 ) -> Result<(), SegmentError> {
676 let entity_id = entity.id;
677 let mut entity = Some(entity);
678
679 if let Some(growing_arc) = self.growing.read().as_ref() {
680 let mut growing = growing_arc.write();
681 if growing.contains(entity_id) && growing.state().is_writable() {
682 return growing.update_hot(
683 entity.take().expect("entity already moved"),
684 modified_columns,
685 );
686 }
687 }
688
689 let segment_id = self.entity_segment.read().get(&entity_id).copied();
690 if let Some(seg_id) = segment_id {
691 if let Some(growing_arc) = self.growing.read().as_ref() {
692 let mut growing = growing_arc.write();
693 if growing.id() == seg_id && growing.state().is_writable() {
694 return growing.update_hot(
695 entity.take().expect("entity already moved"),
696 modified_columns,
697 );
698 }
699 }
700 }
701
702 if let Some(entity) = entity.take() {
703 return self.rewrite_sealed_entity_into_growing(entity, None);
704 }
705
706 Err(SegmentError::NotFound(entity_id))
707 }
708
709 pub fn update_hot_with_metadata(
712 &self,
713 entity: UnifiedEntity,
714 modified_columns: &[String],
715 metadata: Option<&Metadata>,
716 ) -> Result<(), SegmentError> {
717 let entity_id = entity.id;
718 let mut entity = Some(entity);
719
720 if let Some(growing_arc) = self.growing.read().as_ref() {
721 let mut growing = growing_arc.write();
722 if growing.contains(entity_id) && growing.state().is_writable() {
723 growing.update_hot(
724 entity.take().expect("entity already moved"),
725 modified_columns,
726 )?;
727 if let Some(metadata) = metadata {
728 growing.set_metadata(entity_id, metadata.clone())?;
729 }
730 return Ok(());
731 }
732 }
733
734 let segment_id = self.entity_segment.read().get(&entity_id).copied();
735 if let Some(seg_id) = segment_id {
736 if let Some(growing_arc) = self.growing.read().as_ref() {
737 let mut growing = growing_arc.write();
738 if growing.id() == seg_id && growing.state().is_writable() {
739 growing.update_hot(
740 entity.take().expect("entity already moved"),
741 modified_columns,
742 )?;
743 if let Some(metadata) = metadata {
744 growing.set_metadata(entity_id, metadata.clone())?;
745 }
746 return Ok(());
747 }
748 }
749 }
750
751 if let Some(entity) = entity.take() {
752 return self.rewrite_sealed_entity_into_growing(entity, metadata);
753 }
754
755 Err(SegmentError::NotFound(entity_id))
756 }
757
758 pub fn update_hot_batch_with_metadata<'a, I>(&self, items: I) -> Result<(), SegmentError>
761 where
762 I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
763 {
764 let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
765 items.into_iter().collect();
766 if items.is_empty() {
767 return Ok(());
768 }
769
770 if let Some(growing_arc) = self.growing.read().as_ref() {
771 let mut growing = growing_arc.write();
772 if growing.state().is_writable() {
773 match growing.update_hot_batch_with_metadata(items.iter().copied()) {
774 Ok(()) => return Ok(()),
775 Err(SegmentError::NotFound(_)) => {}
776 Err(other) => return Err(other),
777 }
778 }
779 }
780
781 for (entity, modified_columns, metadata) in items {
782 self.update_hot_with_metadata(entity.clone(), modified_columns, metadata)?;
783 }
784 Ok(())
785 }
786
787 pub fn delete(&self, id: EntityId) -> Result<bool, SegmentError> {
789 if let Some(growing_arc) = self.growing.read().as_ref() {
792 let mut growing = growing_arc.write();
793 if growing.contains(id) && growing.state().is_writable() {
794 let seg_id = growing.id();
795 let deleted = growing.delete(id)?;
796 if deleted {
797 self.entity_segment.write().remove(&id);
798 self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
799 self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
800 }
801 return Ok(deleted);
802 }
803 }
804
805 let segment_id = self.entity_segment.read().get(&id).copied();
808
809 if let Some(seg_id) = segment_id {
810 if let Some(growing_arc) = self.growing.read().as_ref() {
811 let mut growing = growing_arc.write();
812 if growing.id() == seg_id && growing.state().is_writable() {
813 let deleted = growing.delete(id)?;
814 if deleted {
815 self.entity_segment.write().remove(&id);
816 self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
817 self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
818 }
819 return Ok(deleted);
820 }
821 }
822 }
823
824 {
827 let sealed = self.sealed.read();
828 for segment_arc in sealed.iter() {
829 let mut seg = segment_arc.write();
830 let seg_id = seg.id();
831 if seg.contains(id) {
832 let deleted = seg.force_delete(id);
833 drop(seg);
834 if deleted {
835 self.entity_segment.write().remove(&id);
836 self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
837 self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
838 }
839 return Ok(deleted);
840 }
841 }
842 }
843
844 Ok(false)
845 }
846
847 pub fn delete_batch(&self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
848 if ids.is_empty() {
849 return Ok(Vec::new());
850 }
851
852 let mut deleted_ids = Vec::with_capacity(ids.len());
853
854 if let Some(growing_arc) = self.growing.read().as_ref() {
855 let mut growing = growing_arc.write();
856 if growing.state().is_writable() {
857 let seg_id = growing.id();
858 let deleted = growing.delete_batch(ids)?;
859 if !deleted.is_empty() {
860 {
861 let mut entity_segment = self.entity_segment.write();
862 for id in &deleted {
863 entity_segment.remove(id);
864 }
865 }
866 self.total_entities_atomic
867 .fetch_sub(deleted.len() as u64, Ordering::Relaxed);
868 for id in &deleted {
869 self.emit(LifecycleEvent::EntityDeleted(*id, seg_id));
870 }
871 deleted_ids.extend(deleted);
872 }
873 }
874 }
875
876 if deleted_ids.len() == ids.len() {
877 return Ok(deleted_ids);
878 }
879
880 let deleted_set: std::collections::HashSet<EntityId> =
881 deleted_ids.iter().copied().collect();
882 for &id in ids {
883 if deleted_set.contains(&id) {
884 continue;
885 }
886 if self.delete(id)? {
887 deleted_ids.push(id);
888 }
889 }
890
891 Ok(deleted_ids)
892 }
893
894 pub fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
896 if let Some(growing_arc) = self.growing.read().as_ref() {
898 let growing = growing_arc.read();
899 if growing.contains(id) {
900 return growing.get_metadata(id);
901 }
902 }
903
904 let segment_id = self.entity_segment.read().get(&id).copied();
906
907 if let Some(seg_id) = segment_id {
908 if let Some(growing_arc) = self.growing.read().as_ref() {
909 let growing = growing_arc.read();
910 if growing.id() == seg_id {
911 return growing.get_metadata(id);
912 }
913 }
914
915 let sealed = self.sealed.read();
916 for segment in sealed.iter() {
917 if segment.id() == seg_id {
918 return segment.get_metadata(id);
919 }
920 }
921 }
922
923 if let Some(segment_arc) = self.find_sealed_segment_arc(id) {
924 return segment_arc.read().get_metadata(id);
925 }
926
927 None
928 }
929
930 pub fn set_metadata(&self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
932 if let Some(growing_arc) = self.growing.read().as_ref() {
935 let mut growing = growing_arc.write();
936 if growing.contains(id) && growing.state().is_writable() {
937 return growing.set_metadata(id, metadata);
938 }
939 }
940
941 let segment_id = self.entity_segment.read().get(&id).copied();
943
944 if let Some(seg_id) = segment_id {
945 if let Some(growing_arc) = self.growing.read().as_ref() {
946 let mut growing = growing_arc.write();
947 if growing.id() == seg_id && growing.state().is_writable() {
948 return growing.set_metadata(id, metadata);
949 }
950 }
951 }
952
953 if let Some(entity) = self.get(id) {
954 return self.rewrite_sealed_entity_into_growing(entity, Some(&metadata));
955 }
956
957 Err(SegmentError::NotFound(id))
958 }
959
960 fn maybe_seal_growing(&self) -> Result<(), SegmentError> {
962 let should_seal = {
963 let growing_opt = self.growing.read();
964 if let Some(growing_arc) = growing_opt.as_ref() {
965 let growing = growing_arc.read();
966 growing.should_seal(&self.config.segment_config)
967 || growing.idle_secs() >= self.config.idle_seal_secs
968 } else {
969 false
970 }
971 };
972
973 if should_seal {
974 self.seal_current()?;
975 }
976
977 Ok(())
978 }
979
980 pub fn seal_current(&self) -> Result<SegmentId, SegmentError> {
982 let growing_opt = self.growing.write().take();
983
984 if let Some(growing_arc) = growing_opt {
985 let mut growing = growing_arc.write();
986 let seg_id = growing.id();
987 let entity_count = growing.stats().entity_count as u64;
988
989 growing.seal()?;
991
992 drop(growing); self.sealed.write().push(growing_arc);
998
999 self.mark_sealed_pages_visible(entity_count);
1001
1002 {
1004 let mut stats = self.stats.write();
1005 stats.growing_count = stats.growing_count.saturating_sub(1);
1006 stats.sealed_count += 1;
1007 stats.seal_ops += 1;
1008 }
1009
1010 self.emit(LifecycleEvent::SegmentSealed(seg_id));
1011
1012 return Ok(seg_id);
1013 }
1014
1015 Err(SegmentError::InvalidState(SegmentState::Sealed))
1016 }
1017
1018 pub fn force_seal(&self) -> Result<Option<SegmentId>, SegmentError> {
1020 let has_growing = self.growing.read().is_some();
1021 if has_growing {
1022 Ok(Some(self.seal_current()?))
1023 } else {
1024 Ok(None)
1025 }
1026 }
1027
1028 pub fn all_visible_fraction(&self) -> f64 {
1038 const ROWS_PER_PAGE: u32 = 256;
1039 let sealed = self.sealed.read();
1040 if sealed.is_empty() {
1041 return 0.0;
1042 }
1043 let mut total_pages: u64 = 0;
1044 for seg_arc in sealed.iter() {
1045 let seg = seg_arc.read();
1046 let entity_count = seg.stats().entity_count as u64;
1047 let pages = entity_count.div_ceil(ROWS_PER_PAGE as u64);
1048 total_pages += pages;
1049 }
1050 if total_pages == 0 {
1051 return 0.0;
1052 }
1053 let visible = self.visibility_map.all_visible_count();
1054 (visible as f64 / total_pages as f64).min(1.0)
1055 }
1056
1057 fn mark_sealed_pages_visible(&self, seg_entity_count: u64) {
1060 const ROWS_PER_PAGE: u32 = 256;
1061 let existing_visible = self.visibility_map.all_visible_count();
1062 let start_page = existing_visible as u32;
1064 let new_pages = seg_entity_count.div_ceil(ROWS_PER_PAGE as u64);
1065 let end_page = start_page + new_pages as u32;
1066 self.visibility_map.mark_range_visible(start_page, end_page);
1067 }
1068
1069 pub fn for_each_entity<F>(&self, mut callback: F)
1075 where
1076 F: FnMut(&UnifiedEntity) -> bool,
1077 {
1078 if let Some(growing_arc) = self.growing.read().as_ref() {
1082 let growing = if let Some(g) = growing_arc.try_read() {
1083 g
1084 } else {
1085 growing_arc.read()
1086 };
1087 if !growing.for_each_fast(&mut callback) {
1088 return;
1089 }
1090 }
1091
1092 let sealed = self.sealed.read();
1094 for segment_arc in sealed.iter() {
1095 let segment = segment_arc.read();
1096 if !segment.for_each_fast(&mut callback) {
1097 return;
1098 }
1099 }
1100 }
1101
1102 pub fn fold_entities_parallel<T, FInit, FFold, FReduce>(
1118 &self,
1119 init: FInit,
1120 fold: FFold,
1121 reduce: FReduce,
1122 ) -> T
1123 where
1124 T: Send,
1125 FInit: Fn() -> T + Send + Sync,
1126 FFold: Fn(T, &UnifiedEntity) -> T + Send + Sync,
1127 FReduce: Fn(T, T) -> T + Send + Sync,
1128 {
1129 use rayon::prelude::*;
1130
1131 let mut acc = init();
1134 if let Some(growing_arc) = self.growing.read().as_ref() {
1135 let growing = if let Some(g) = growing_arc.try_read() {
1136 g
1137 } else {
1138 growing_arc.read()
1139 };
1140 growing.for_each_fast(|entity| {
1141 acc = fold(std::mem::replace(&mut acc, init()), entity);
1142 true
1143 });
1144 }
1145
1146 let segments: Vec<_> = {
1150 let sealed = self.sealed.read();
1151 sealed.iter().cloned().collect()
1152 };
1153
1154 if segments.len() <= 1 {
1155 for seg_arc in &segments {
1156 let seg = seg_arc.read();
1157 seg.for_each_fast(|entity| {
1158 acc = fold(std::mem::replace(&mut acc, init()), entity);
1159 true
1160 });
1161 }
1162 return acc;
1163 }
1164
1165 let sealed_acc = segments
1166 .into_par_iter()
1167 .map(|seg_arc| {
1168 let mut local = init();
1169 let seg = seg_arc.read();
1170 seg.for_each_fast(|entity| {
1171 local = fold(std::mem::replace(&mut local, init()), entity);
1172 true
1173 });
1174 local
1175 })
1176 .reduce(&init, &reduce);
1177
1178 reduce(acc, sealed_acc)
1179 }
1180
1181 pub fn for_each_entity_zoned<F>(&self, zone_preds: &[(&str, ZoneColPred<'_>)], mut callback: F)
1190 where
1191 F: FnMut(&UnifiedEntity) -> bool,
1192 {
1193 if let Some(growing_arc) = self.growing.read().as_ref() {
1199 let growing = if let Some(g) = growing_arc.try_read() {
1200 g
1201 } else {
1202 growing_arc.read()
1203 };
1204 if !growing.for_each_fast(&mut callback) {
1205 return;
1206 }
1207 }
1208
1209 let sealed = self.sealed.read();
1211 for segment_arc in sealed.iter() {
1212 let segment = segment_arc.read();
1213 if !zone_preds.is_empty() && segment.can_skip_zone_preds(zone_preds) {
1214 continue; }
1216 if !segment.for_each_fast(&mut callback) {
1217 return;
1218 }
1219 }
1220 }
1221
1222 pub fn query_all_zoned<F>(
1232 &self,
1233 zone_preds: &[(&str, ZoneColPred<'_>)],
1234 filter: F,
1235 ) -> Vec<UnifiedEntity>
1236 where
1237 F: Fn(&UnifiedEntity) -> bool + Sync,
1238 {
1239 let mut results = Vec::new();
1240
1241 if let Some(growing_arc) = self.growing.read().as_ref() {
1244 let growing = if let Some(g) = growing_arc.try_read() {
1245 g
1246 } else {
1247 growing_arc.read()
1248 };
1249 results.extend(growing.iter().filter(|e| filter(e)).cloned());
1250 }
1251
1252 let sealed = self.sealed.read();
1254 let surviving: Vec<_> = sealed
1256 .iter()
1257 .filter(|seg_arc| {
1258 if zone_preds.is_empty() {
1259 return true;
1260 }
1261 let seg = seg_arc.read();
1262 !seg.can_skip_zone_preds(zone_preds)
1263 })
1264 .collect();
1265
1266 let use_parallel = surviving.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1267
1268 if use_parallel {
1269 let filter_ref = &filter;
1270 let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1271 surviving
1272 .iter()
1273 .map(|segment| {
1274 s.spawn(move || {
1275 segment
1276 .read()
1277 .iter()
1278 .filter(|e| filter_ref(e))
1279 .cloned()
1280 .collect::<Vec<_>>()
1281 })
1282 })
1283 .collect::<Vec<_>>()
1284 .into_iter()
1285 .map(|handle| handle.join().unwrap_or_default())
1286 .collect()
1287 });
1288 for batch in segment_results {
1289 results.extend(batch);
1290 }
1291 } else {
1292 for segment_arc in surviving {
1293 let seg = segment_arc.read();
1294 results.extend(seg.iter().filter(|e| filter(e)).cloned());
1295 }
1296 }
1297
1298 results
1299 }
1300
1301 pub fn query_all<F>(&self, filter: F) -> Vec<UnifiedEntity>
1304 where
1305 F: Fn(&UnifiedEntity) -> bool + Sync,
1306 {
1307 let mut results = Vec::new();
1308
1309 if let Some(growing_arc) = self.growing.read().as_ref() {
1312 let growing = if let Some(g) = growing_arc.try_read() {
1313 g
1314 } else {
1315 growing_arc.read()
1316 };
1317 results.extend(growing.iter().filter(|e| filter(e)).cloned());
1318 }
1319
1320 let sealed = self.sealed.read();
1322 let use_parallel = sealed.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1323 if use_parallel {
1324 let filter_ref = &filter;
1325 let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1326 sealed
1327 .iter()
1328 .map(|segment| {
1329 s.spawn(move || {
1330 segment
1331 .read()
1332 .iter()
1333 .filter(|e| filter_ref(e))
1334 .cloned()
1335 .collect::<Vec<_>>()
1336 })
1337 })
1338 .collect::<Vec<_>>()
1339 .into_iter()
1340 .map(|handle| handle.join().unwrap_or_default())
1341 .collect()
1342 });
1343 for batch in segment_results {
1344 results.extend(batch);
1345 }
1346 } else {
1347 for segment in sealed.iter() {
1348 let seg = segment.read();
1349 results.extend(seg.iter().filter(|e| filter(e)).cloned());
1350 }
1351 }
1352
1353 results
1354 }
1355
1356 pub fn query_with_bloom_hint<F>(
1366 &self,
1367 key_hint: Option<&[u8]>,
1368 filter: F,
1369 ) -> (Vec<UnifiedEntity>, bool)
1370 where
1371 F: Fn(&UnifiedEntity) -> bool,
1372 {
1373 let mut results = Vec::new();
1374 let mut bloom_pruned = false;
1375
1376 if let Some(growing_arc) = self.growing.read().as_ref() {
1377 let growing = growing_arc.read();
1378 if let Some(key) = key_hint {
1379 if !growing.bloom_might_contain_key(key) {
1380 bloom_pruned = true;
1381 return (results, bloom_pruned);
1382 }
1383 }
1384 for entity in growing.iter() {
1385 if filter(entity) {
1386 results.push(entity.clone());
1387 }
1388 }
1389 }
1390
1391 let sealed = self.sealed.read();
1393 for segment_arc in sealed.iter() {
1394 let segment = segment_arc.read();
1395 if let Some(key) = key_hint {
1396 if !segment.bloom_might_contain_key(key) {
1397 bloom_pruned = true;
1398 continue;
1399 }
1400 }
1401 for entity in segment.iter() {
1402 if filter(entity) {
1403 results.push(entity.clone());
1404 }
1405 }
1406 }
1407
1408 (results, bloom_pruned)
1409 }
1410
1411 pub fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1413 let mut results = Vec::new();
1414
1415 if let Some(growing_arc) = self.growing.read().as_ref() {
1417 let growing = growing_arc.read();
1418 results.extend(growing.filter_metadata(filters));
1419 }
1420
1421 let sealed = self.sealed.read();
1423 for segment in sealed.iter() {
1424 results.extend(segment.filter_metadata(filters));
1425 }
1426
1427 results
1428 }
1429
1430 pub fn get_by_kind(&self, kind: &str) -> Vec<UnifiedEntity> {
1432 let mut results = Vec::new();
1433
1434 if let Some(growing_arc) = self.growing.read().as_ref() {
1436 let growing = growing_arc.read();
1437 for entity in growing.iter_kind(kind) {
1438 results.push(entity.clone());
1439 }
1440 }
1441
1442 let sealed = self.sealed.read();
1444 for segment in sealed.iter() {
1445 for entity in segment.iter_kind(kind) {
1446 results.push(entity.clone());
1447 }
1448 }
1449
1450 results
1451 }
1452
1453 pub fn count(&self) -> usize {
1455 self.total_entities_atomic.load(Ordering::Relaxed) as usize
1456 }
1457
1458 pub fn segment_ids(&self) -> Vec<SegmentId> {
1460 let mut ids = Vec::new();
1461
1462 if let Some(growing_arc) = self.growing.read().as_ref() {
1463 ids.push(growing_arc.read().id());
1464 }
1465
1466 let sealed = self.sealed.read();
1467 for segment in sealed.iter() {
1468 ids.push(segment.id());
1469 }
1470
1471 ids.extend(self.archived.read().iter().copied());
1472
1473 ids
1474 }
1475
1476 #[inline]
1489 #[allow(clippy::unused_self)]
1490 fn emit(&self, _event: LifecycleEvent) {}
1491
1492 pub fn drain_events(&self) -> Vec<LifecycleEvent> {
1495 std::mem::take(&mut *self.events.write())
1496 }
1497
1498 pub fn run_maintenance(&self) -> Result<(), SegmentError> {
1500 self.maybe_seal_growing()?;
1502
1503 if self.config.enable_compaction {
1505 let sealed_count = self.sealed.read().len();
1506 if sealed_count > self.config.max_sealed_segments {
1507 }
1510 }
1511
1512 Ok(())
1513 }
1514}
1515
1516impl UnifiedSegment for Arc<RwLock<GrowingSegment>> {
1519 fn id(&self) -> SegmentId {
1520 self.read().id()
1521 }
1522
1523 fn state(&self) -> SegmentState {
1524 self.read().state()
1525 }
1526
1527 fn collection(&self) -> &str {
1528 "unknown"
1530 }
1531
1532 fn stats(&self) -> SegmentStats {
1533 self.read().stats()
1534 }
1535
1536 fn entity_count(&self) -> usize {
1537 self.read().entity_count()
1538 }
1539
1540 fn contains(&self, id: EntityId) -> bool {
1541 self.read().contains(id)
1542 }
1543
1544 fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1545 None
1548 }
1549
1550 fn get_mut(&mut self, _id: EntityId) -> Option<&mut UnifiedEntity> {
1551 None
1552 }
1553
1554 fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1555 self.write().insert(entity)
1556 }
1557
1558 fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1559 self.write().update(entity)
1560 }
1561
1562 fn update_hot(
1563 &mut self,
1564 entity: UnifiedEntity,
1565 modified_columns: &[String],
1566 ) -> Result<(), SegmentError> {
1567 self.write().update_hot(entity, modified_columns)
1568 }
1569
1570 fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1571 self.write().delete(id)
1572 }
1573
1574 fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1575 self.read().get_metadata(id)
1576 }
1577
1578 fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1579 self.write().set_metadata(id, metadata)
1580 }
1581
1582 fn seal(&mut self) -> Result<(), SegmentError> {
1583 self.write().seal()
1584 }
1585
1586 fn should_seal(&self, config: &SegmentConfig) -> bool {
1587 self.read().should_seal(config)
1588 }
1589
1590 fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1591 Box::new(std::iter::empty())
1593 }
1594
1595 fn iter_kind(&self, _kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1596 Box::new(std::iter::empty())
1597 }
1598
1599 fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1600 self.read().filter_metadata(filters)
1601 }
1602}
1603
#[cfg(test)]
mod tests {
    use super::super::metadata::MetadataValue;
    use super::*;
    use crate::storage::schema::Value;

    /// A freshly inserted table row is retrievable and counted.
    #[test]
    fn test_manager_basic() {
        let manager = SegmentManager::new("test_collection");

        let alice_row = UnifiedEntity::table_row(
            manager.next_entity_id(),
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );
        let alice_id = manager.insert(alice_row).unwrap();

        assert!(manager.get(alice_id).is_some());
        assert_eq!(manager.count(), 1);
    }

    /// Inserting past the per-segment entity limit keeps every entity accounted for.
    #[test]
    fn test_manager_auto_seal() {
        let segment_config = SegmentConfig {
            max_entities: 2,
            ..Default::default()
        };
        let config = ManagerConfig {
            segment_config,
            ..Default::default()
        };
        let manager = SegmentManager::with_config("test", config);

        for component in [0.1, 0.2, 0.3] {
            manager
                .insert(UnifiedEntity::vector(
                    manager.next_entity_id(),
                    "v",
                    vec![component],
                ))
                .unwrap();
        }

        assert_eq!(manager.stats().total_entities, 3);
    }

    /// Deleting an entity reports success and makes it unreachable.
    #[test]
    fn test_manager_delete() {
        let manager = SegmentManager::new("test");

        let vector = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![0.1]);
        let id = manager.insert(vector).unwrap();
        assert!(manager.get(id).is_some());

        let removed = manager.delete(id).unwrap();
        assert!(removed);
        assert!(manager.get(id).is_none());
    }

    /// Metadata set on an entity can be read back.
    #[test]
    fn test_manager_metadata() {
        let manager = SegmentManager::new("test");

        let host = UnifiedEntity::table_row(
            manager.next_entity_id(),
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        let id = manager.insert(host).unwrap();

        let mut meta = Metadata::new();
        meta.set("os", MetadataValue::String("linux".to_string()));
        manager.set_metadata(id, meta).unwrap();

        let retrieved = manager.get_metadata(id).unwrap();
        assert!(retrieved.has("os"));
    }

    /// `get_by_kind` separates table rows from vectors.
    #[test]
    fn test_manager_query_by_kind() {
        let manager = SegmentManager::new("test");

        let first_host = UnifiedEntity::table_row(manager.next_entity_id(), "hosts", 1, vec![]);
        manager.insert(first_host).unwrap();

        let embedding = UnifiedEntity::vector(manager.next_entity_id(), "embeddings", vec![0.1]);
        manager.insert(embedding).unwrap();

        let second_host = UnifiedEntity::table_row(manager.next_entity_id(), "hosts", 2, vec![]);
        manager.insert(second_host).unwrap();

        let table_rows = manager.get_by_kind("table");
        assert_eq!(table_rows.len(), 2);

        let vectors = manager.get_by_kind("vector");
        assert_eq!(vectors.len(), 1);
    }

    /// Lifecycle events would report segment creation and entity insertion (currently disabled).
    #[test]
    #[ignore = "lifecycle events intentionally no-op since the emit-channel refactor; drain_events returns empty — see SegmentManager::emit"]
    fn test_lifecycle_events() {
        let manager = SegmentManager::new("test");

        let vector = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![0.1]);
        manager.insert(vector).unwrap();

        let events = manager.drain_events();
        let saw_created = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::SegmentCreated(_)));
        let saw_inserted = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::EntityInserted(_, _)));

        assert!(saw_created);
        assert!(saw_inserted);
    }
}