1use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19
20use super::entity::{EntityId, UnifiedEntity};
21use super::metadata::{Metadata, MetadataFilter};
22use super::segment::{
23 GrowingSegment, SegmentConfig, SegmentError, SegmentId, SegmentState, SegmentStats,
24 UnifiedSegment, ZoneColPred, ZoneColPredKind,
25};
26use crate::storage::btree::visibility_map::VisibilityMap;
27
28#[derive(Debug, Clone)]
30pub struct ManagerConfig {
31 pub segment_config: SegmentConfig,
33 pub max_sealed_segments: usize,
35 pub idle_seal_secs: u64,
37 pub enable_compaction: bool,
39 pub enable_archival: bool,
41 pub archive_age_secs: u64,
43}
44
45impl Default for ManagerConfig {
46 fn default() -> Self {
47 Self {
48 segment_config: SegmentConfig::default(),
49 max_sealed_segments: 10,
50 idle_seal_secs: 300, enable_compaction: true,
52 enable_archival: true,
53 archive_age_secs: 86400 * 7, }
55 }
56}
57
/// Point-in-time counters describing a [`SegmentManager`].
///
/// Obtained via `SegmentManager::stats`, which returns a copy, so values do not update live.
#[derive(Debug, Clone, Default)]
pub struct ManagerStats {
    /// Live entity count across all segments.
    pub total_entities: usize,
    /// Number of growing (writable) segments.
    pub growing_count: usize,
    /// Number of sealed segments.
    pub sealed_count: usize,
    /// Number of archived segments.
    pub archived_count: usize,
    /// Approximate memory footprint in bytes.
    pub total_memory_bytes: usize,
    /// How many seal operations have completed.
    pub seal_ops: u64,
    /// How many compaction operations have completed.
    pub compact_ops: u64,
}
76
77#[derive(Debug, Clone)]
79pub enum LifecycleEvent {
80 SegmentCreated(SegmentId),
81 SegmentSealed(SegmentId),
82 SegmentCompacted {
83 source: Vec<SegmentId>,
84 target: SegmentId,
85 },
86 SegmentArchived(SegmentId),
87 EntityInserted(EntityId, SegmentId),
88 EntityDeleted(EntityId, SegmentId),
89}
90
91pub struct SegmentManager {
93 collection: String,
95 config: ManagerConfig,
97 next_segment_id: AtomicU64,
99 next_entity_id: AtomicU64,
101 next_row_id: AtomicU64,
103 total_entities_atomic: AtomicU64,
107 growing: RwLock<Option<Arc<RwLock<GrowingSegment>>>>,
109 sealed: RwLock<Vec<Arc<RwLock<GrowingSegment>>>>,
111 archived: RwLock<Vec<SegmentId>>,
113 entity_segment: RwLock<HashMap<EntityId, SegmentId>>,
117 column_schema: RwLock<Option<Arc<Vec<String>>>>,
120 stats: RwLock<ManagerStats>,
122 events: RwLock<Vec<LifecycleEvent>>,
124 visibility_map: VisibilityMap,
128}
129
130impl SegmentManager {
131 pub fn new(collection: impl Into<String>) -> Self {
133 Self::with_config(collection, ManagerConfig::default())
134 }
135
136 pub fn with_config(collection: impl Into<String>, config: ManagerConfig) -> Self {
138 Self {
139 collection: collection.into(),
140 config,
141 next_segment_id: AtomicU64::new(1),
142 next_entity_id: AtomicU64::new(1),
143 next_row_id: AtomicU64::new(1),
144 total_entities_atomic: AtomicU64::new(0),
145 growing: RwLock::new(None),
146 sealed: RwLock::new(Vec::new()),
147 archived: RwLock::new(Vec::new()),
148 entity_segment: RwLock::new(HashMap::new()),
149 column_schema: RwLock::new(None),
150 stats: RwLock::new(ManagerStats::default()),
151 events: RwLock::new(Vec::new()),
152 visibility_map: VisibilityMap::new(),
153 }
154 }
155
156 pub fn get_or_init_schema(
158 &self,
159 named: &HashMap<String, crate::storage::schema::Value>,
160 ) -> Arc<Vec<String>> {
161 {
162 let schema = self.column_schema.read();
163 if let Some(ref s) = *schema {
164 return Arc::clone(s);
165 }
166 }
167 let cols: Vec<String> = named.keys().cloned().collect();
168 let arc = Arc::new(cols);
169 *self.column_schema.write() = Some(Arc::clone(&arc));
170 arc
171 }
172
173 pub fn column_schema(&self) -> Option<Arc<Vec<String>>> {
175 self.column_schema.read().clone()
176 }
177
178 pub fn collection(&self) -> &str {
180 &self.collection
181 }
182
183 pub fn config(&self) -> &ManagerConfig {
185 &self.config
186 }
187
188 pub fn stats(&self) -> ManagerStats {
191 let mut s = self.stats.read().clone();
192 s.total_entities = self.total_entities_atomic.load(Ordering::Relaxed) as usize;
193 s
194 }
195
196 pub fn next_entity_id(&self) -> EntityId {
198 EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
199 }
200
201 pub fn next_row_id(&self) -> u64 {
203 self.next_row_id.fetch_add(1, Ordering::SeqCst)
204 }
205
206 pub fn reserve_row_ids(&self, n: u64) -> std::ops::Range<u64> {
210 let start = self.next_row_id.fetch_add(n, Ordering::SeqCst);
211 start..start + n
212 }
213
214 pub fn register_row_id(&self, id: u64) {
217 let candidate = id.saturating_add(1);
218 let mut current = self.next_row_id.load(Ordering::SeqCst);
219 while candidate > current {
220 match self.next_row_id.compare_exchange(
221 current,
222 candidate,
223 Ordering::SeqCst,
224 Ordering::SeqCst,
225 ) {
226 Ok(_) => break,
227 Err(updated) => current = updated,
228 }
229 }
230 }
231
232 fn get_or_create_growing(&self) -> Arc<RwLock<GrowingSegment>> {
239 {
241 let growing = self.growing.read();
242 if let Some(segment) = growing.as_ref() {
243 return Arc::clone(segment);
244 }
245 }
246
247 let mut growing = self.growing.write();
249 if let Some(segment) = growing.as_ref() {
251 return Arc::clone(segment);
252 }
253
254 let id = self.next_segment_id.fetch_add(1, Ordering::SeqCst);
255 let segment = GrowingSegment::new(id, &self.collection);
256 let segment_arc = Arc::new(RwLock::new(segment));
257 *growing = Some(Arc::clone(&segment_arc));
258
259 self.emit(LifecycleEvent::SegmentCreated(id));
260
261 self.stats.write().growing_count += 1;
264
265 segment_arc
266 }
267
268 pub fn insert(&self, mut entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
270 self.maybe_seal_growing()?;
272
273 let segment_arc = self.get_or_create_growing();
274 let mut segment = segment_arc.write();
275
276 if entity.id.raw() == 0 {
278 entity.id = self.next_entity_id();
279 }
280
281 let entity_id = entity.id;
282 let segment_id = segment.id();
283
284 segment.insert(entity)?;
285
286 self.total_entities_atomic.fetch_add(1, Ordering::Relaxed);
288
289 self.emit(LifecycleEvent::EntityInserted(entity_id, segment_id));
298
299 Ok(entity_id)
300 }
301
302 pub fn insert_batch(
304 &self,
305 entities: Vec<UnifiedEntity>,
306 ) -> Result<Vec<EntityId>, SegmentError> {
307 let mut ids = Vec::with_capacity(entities.len());
308 for entity in entities {
309 ids.push(self.insert(entity)?);
310 }
311 Ok(ids)
312 }
313
314 pub fn bulk_insert(
317 &self,
318 mut entities: Vec<UnifiedEntity>,
319 ) -> Result<Vec<EntityId>, SegmentError> {
320 for entity in &mut entities {
322 if entity.id.raw() == 0 {
323 entity.id = self.next_entity_id();
324 }
325 if let super::entity::EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
326 if *row_id == 0 {
327 *row_id = self.next_row_id();
328 } else {
329 self.register_row_id(*row_id);
330 }
331 }
332 }
333
334 if let Some(first_row) = entities.first() {
337 if let super::entity::EntityData::Row(ref row) = first_row.data {
338 if let Some(ref named) = row.named {
339 let schema = self.get_or_init_schema(named);
340 for entity in &mut entities {
341 if let super::entity::EntityData::Row(ref mut row) = entity.data {
342 if let Some(named) = row.named.take() {
343 let mut cols = Vec::with_capacity(schema.len());
344 for col_name in schema.iter() {
345 cols.push(
346 named
347 .get(col_name)
348 .cloned()
349 .unwrap_or(crate::storage::schema::Value::Null),
350 );
351 }
352 row.columns = cols;
353 row.schema = Some(Arc::clone(&schema));
354 }
355 }
356 }
357 }
358 }
359 }
360
361 let segment_arc = self.get_or_create_growing();
362 let mut segment = segment_arc.write();
363 let segment_id = segment.id();
364
365 let ids = segment.bulk_insert(entities)?;
367
368 self.total_entities_atomic
373 .fetch_add(ids.len() as u64, Ordering::Relaxed);
374
375 Ok(ids)
376 }
377
378 pub fn get(&self, id: EntityId) -> Option<UnifiedEntity> {
380 if let Some(growing_arc) = self.growing.read().as_ref() {
382 let growing = growing_arc.read();
383 if let Some(entity) = growing.get(id) {
384 return Some(entity.clone());
385 }
386 }
387
388 let sealed = self.sealed.read();
390 for segment in sealed.iter() {
391 let seg = segment.read();
392 if let Some(entity) = seg.get(id) {
393 return Some(entity.clone());
394 }
395 }
396
397 None
398 }
399
400 pub fn get_many(&self, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
405 let mut out: Vec<Option<UnifiedEntity>> = vec![None; ids.len()];
406 let mut remaining: Vec<usize> = (0..ids.len()).collect(); if let Some(growing_arc) = self.growing.read().as_ref() {
411 let growing = if let Some(g) = growing_arc.try_read() {
412 g
413 } else {
414 growing_arc.read()
415 };
416 remaining.retain(|&i| {
417 if let Some(entity) = growing.get(ids[i]) {
418 out[i] = Some(entity.clone());
419 false } else {
421 true }
423 });
424 }
425
426 if remaining.is_empty() {
427 return out;
428 }
429
430 let sealed = self.sealed.read();
432 for segment in sealed.iter() {
433 if remaining.is_empty() {
434 break;
435 }
436 let seg = segment.read();
437 remaining.retain(|&i| {
438 if let Some(entity) = seg.get(ids[i]) {
439 out[i] = Some(entity.clone());
440 false
441 } else {
442 true
443 }
444 });
445 }
446
447 out
448 }
449
450 pub fn for_each_id<F>(&self, ids: &[EntityId], mut f: F)
463 where
464 F: FnMut(usize, &UnifiedEntity),
465 {
466 thread_local! {
475 static REMAINING_SCRATCH: std::cell::Cell<Vec<usize>> =
476 const { std::cell::Cell::new(Vec::new()) };
477 }
478
479 let mut remaining: Vec<usize> = REMAINING_SCRATCH.with(|cell| cell.take());
480 remaining.clear();
481
482 if let Some(growing_arc) = self.growing.read().as_ref() {
483 let growing = if let Some(g) = growing_arc.try_read() {
484 g
485 } else {
486 growing_arc.read()
487 };
488 for (i, id) in ids.iter().enumerate() {
489 if let Some(entity) = growing.get(*id) {
490 f(i, entity);
491 } else {
492 remaining.push(i);
493 }
494 }
495 } else {
496 remaining.reserve(ids.len());
497 remaining.extend(0..ids.len());
498 }
499
500 if !remaining.is_empty() {
501 let sealed = self.sealed.read();
502 for segment in sealed.iter() {
503 if remaining.is_empty() {
504 break;
505 }
506 let seg = segment.read();
507 remaining.retain(|&i| {
508 if let Some(entity) = seg.get(ids[i]) {
509 f(i, entity);
510 false
511 } else {
512 true
513 }
514 });
515 }
516 }
517
518 REMAINING_SCRATCH.with(|cell| cell.set(remaining));
519 }
520
521 fn scan_for_entity(&self, id: EntityId) -> Option<UnifiedEntity> {
523 if let Some(growing_arc) = self.growing.read().as_ref() {
525 let growing = growing_arc.read();
526 if let Some(entity) = growing.get(id) {
527 return Some(entity.clone());
528 }
529 }
530
531 let sealed = self.sealed.read();
533 for segment in sealed.iter() {
534 if let Some(entity) = segment.get(id) {
535 return Some(entity.clone());
536 }
537 }
538
539 None
540 }
541
542 fn find_sealed_segment_arc(&self, id: EntityId) -> Option<Arc<RwLock<GrowingSegment>>> {
543 let sealed = self.sealed.read();
544 sealed
545 .iter()
546 .find(|segment_arc| segment_arc.read().contains(id))
547 .map(Arc::clone)
548 }
549
550 fn rewrite_sealed_entity_into_growing(
551 &self,
552 entity: UnifiedEntity,
553 metadata: Option<&Metadata>,
554 ) -> Result<(), SegmentError> {
555 let entity_id = entity.id;
556 let sealed_arc = self
557 .find_sealed_segment_arc(entity_id)
558 .ok_or(SegmentError::NotFound(entity_id))?;
559
560 let metadata_to_apply = {
561 let mut sealed = sealed_arc.write();
562 let existing_metadata = sealed.get_metadata(entity_id);
563 if !sealed.force_delete(entity_id) {
564 return Err(SegmentError::NotFound(entity_id));
565 }
566 metadata.cloned().or(existing_metadata)
567 };
568
569 let growing_arc = self.get_or_create_growing();
570 let growing_id = {
571 let mut growing = growing_arc.write();
572 growing.insert(entity)?;
573 if let Some(metadata) = metadata_to_apply {
574 growing.set_metadata(entity_id, metadata)?;
575 }
576 growing.id()
577 };
578
579 self.entity_segment.write().insert(entity_id, growing_id);
580 Ok(())
581 }
582
583 pub fn update(&self, entity: UnifiedEntity) -> Result<(), SegmentError> {
585 let entity_id = entity.id;
586 let mut entity = Some(entity);
587
588 if let Some(growing_arc) = self.growing.read().as_ref() {
590 let mut growing = growing_arc.write();
591 if growing.contains(entity_id) && growing.state().is_writable() {
592 return growing.update(entity.take().expect("entity already moved"));
593 }
594 }
595
596 let segment_id = self.entity_segment.read().get(&entity_id).copied();
598 if let Some(seg_id) = segment_id {
599 if let Some(growing_arc) = self.growing.read().as_ref() {
600 let mut growing = growing_arc.write();
601 if growing.id() == seg_id && growing.state().is_writable() {
602 return growing.update(entity.take().expect("entity already moved"));
603 }
604 }
605 }
606
607 if let Some(entity) = entity.take() {
608 return self.rewrite_sealed_entity_into_growing(entity, None);
609 }
610
611 Err(SegmentError::NotFound(entity_id))
612 }
613
614 pub fn update_with_metadata(
617 &self,
618 entity: UnifiedEntity,
619 metadata: Option<&Metadata>,
620 ) -> Result<(), SegmentError> {
621 let entity_id = entity.id;
622 let mut entity = Some(entity);
623
624 if let Some(growing_arc) = self.growing.read().as_ref() {
626 let mut growing = growing_arc.write();
627 if growing.contains(entity_id) && growing.state().is_writable() {
628 growing.update(entity.take().expect("entity already moved"))?;
629 if let Some(metadata) = metadata {
630 growing.set_metadata(entity_id, metadata.clone())?;
631 }
632 return Ok(());
633 }
634 }
635
636 let segment_id = self.entity_segment.read().get(&entity_id).copied();
638 if let Some(seg_id) = segment_id {
639 if let Some(growing_arc) = self.growing.read().as_ref() {
640 let mut growing = growing_arc.write();
641 if growing.id() == seg_id && growing.state().is_writable() {
642 growing.update(entity.take().expect("entity already moved"))?;
643 if let Some(metadata) = metadata {
644 growing.set_metadata(entity_id, metadata.clone())?;
645 }
646 return Ok(());
647 }
648 }
649 }
650
651 if let Some(entity) = entity.take() {
652 return self.rewrite_sealed_entity_into_growing(entity, metadata);
653 }
654
655 Err(SegmentError::NotFound(entity_id))
656 }
657
658 pub fn update_hot(
662 &self,
663 entity: UnifiedEntity,
664 modified_columns: &[String],
665 ) -> Result<(), SegmentError> {
666 let entity_id = entity.id;
667 let mut entity = Some(entity);
668
669 if let Some(growing_arc) = self.growing.read().as_ref() {
670 let mut growing = growing_arc.write();
671 if growing.contains(entity_id) && growing.state().is_writable() {
672 return growing.update_hot(
673 entity.take().expect("entity already moved"),
674 modified_columns,
675 );
676 }
677 }
678
679 let segment_id = self.entity_segment.read().get(&entity_id).copied();
680 if let Some(seg_id) = segment_id {
681 if let Some(growing_arc) = self.growing.read().as_ref() {
682 let mut growing = growing_arc.write();
683 if growing.id() == seg_id && growing.state().is_writable() {
684 return growing.update_hot(
685 entity.take().expect("entity already moved"),
686 modified_columns,
687 );
688 }
689 }
690 }
691
692 if let Some(entity) = entity.take() {
693 return self.rewrite_sealed_entity_into_growing(entity, None);
694 }
695
696 Err(SegmentError::NotFound(entity_id))
697 }
698
699 pub fn update_hot_with_metadata(
702 &self,
703 entity: UnifiedEntity,
704 modified_columns: &[String],
705 metadata: Option<&Metadata>,
706 ) -> Result<(), SegmentError> {
707 let entity_id = entity.id;
708 let mut entity = Some(entity);
709
710 if let Some(growing_arc) = self.growing.read().as_ref() {
711 let mut growing = growing_arc.write();
712 if growing.contains(entity_id) && growing.state().is_writable() {
713 growing.update_hot(
714 entity.take().expect("entity already moved"),
715 modified_columns,
716 )?;
717 if let Some(metadata) = metadata {
718 growing.set_metadata(entity_id, metadata.clone())?;
719 }
720 return Ok(());
721 }
722 }
723
724 let segment_id = self.entity_segment.read().get(&entity_id).copied();
725 if let Some(seg_id) = segment_id {
726 if let Some(growing_arc) = self.growing.read().as_ref() {
727 let mut growing = growing_arc.write();
728 if growing.id() == seg_id && growing.state().is_writable() {
729 growing.update_hot(
730 entity.take().expect("entity already moved"),
731 modified_columns,
732 )?;
733 if let Some(metadata) = metadata {
734 growing.set_metadata(entity_id, metadata.clone())?;
735 }
736 return Ok(());
737 }
738 }
739 }
740
741 if let Some(entity) = entity.take() {
742 return self.rewrite_sealed_entity_into_growing(entity, metadata);
743 }
744
745 Err(SegmentError::NotFound(entity_id))
746 }
747
748 pub fn update_hot_batch_with_metadata<'a, I>(&self, items: I) -> Result<(), SegmentError>
751 where
752 I: IntoIterator<Item = (&'a UnifiedEntity, &'a [String], Option<&'a Metadata>)>,
753 {
754 let items: Vec<(&UnifiedEntity, &[String], Option<&Metadata>)> =
755 items.into_iter().collect();
756 if items.is_empty() {
757 return Ok(());
758 }
759
760 if let Some(growing_arc) = self.growing.read().as_ref() {
761 let mut growing = growing_arc.write();
762 if growing.state().is_writable() {
763 match growing.update_hot_batch_with_metadata(items.iter().copied()) {
764 Ok(()) => return Ok(()),
765 Err(SegmentError::NotFound(_)) => {}
766 Err(other) => return Err(other),
767 }
768 }
769 }
770
771 for (entity, modified_columns, metadata) in items {
772 self.update_hot_with_metadata(entity.clone(), modified_columns, metadata)?;
773 }
774 Ok(())
775 }
776
777 pub fn delete(&self, id: EntityId) -> Result<bool, SegmentError> {
779 if let Some(growing_arc) = self.growing.read().as_ref() {
782 let mut growing = growing_arc.write();
783 if growing.contains(id) && growing.state().is_writable() {
784 let seg_id = growing.id();
785 let deleted = growing.delete(id)?;
786 if deleted {
787 self.entity_segment.write().remove(&id);
788 self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
789 self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
790 }
791 return Ok(deleted);
792 }
793 }
794
795 let segment_id = self.entity_segment.read().get(&id).copied();
798
799 if let Some(seg_id) = segment_id {
800 if let Some(growing_arc) = self.growing.read().as_ref() {
801 let mut growing = growing_arc.write();
802 if growing.id() == seg_id && growing.state().is_writable() {
803 let deleted = growing.delete(id)?;
804 if deleted {
805 self.entity_segment.write().remove(&id);
806 self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
807 self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
808 }
809 return Ok(deleted);
810 }
811 }
812 }
813
814 {
817 let sealed = self.sealed.read();
818 for segment_arc in sealed.iter() {
819 let mut seg = segment_arc.write();
820 let seg_id = seg.id();
821 if seg.contains(id) {
822 let deleted = seg.force_delete(id);
823 drop(seg);
824 if deleted {
825 self.entity_segment.write().remove(&id);
826 self.total_entities_atomic.fetch_sub(1, Ordering::Relaxed);
827 self.emit(LifecycleEvent::EntityDeleted(id, seg_id));
828 }
829 return Ok(deleted);
830 }
831 }
832 }
833
834 Ok(false)
835 }
836
837 pub fn delete_batch(&self, ids: &[EntityId]) -> Result<Vec<EntityId>, SegmentError> {
838 if ids.is_empty() {
839 return Ok(Vec::new());
840 }
841
842 let mut deleted_ids = Vec::with_capacity(ids.len());
843
844 if let Some(growing_arc) = self.growing.read().as_ref() {
845 let mut growing = growing_arc.write();
846 if growing.state().is_writable() {
847 let seg_id = growing.id();
848 let deleted = growing.delete_batch(ids)?;
849 if !deleted.is_empty() {
850 {
851 let mut entity_segment = self.entity_segment.write();
852 for id in &deleted {
853 entity_segment.remove(id);
854 }
855 }
856 self.total_entities_atomic
857 .fetch_sub(deleted.len() as u64, Ordering::Relaxed);
858 for id in &deleted {
859 self.emit(LifecycleEvent::EntityDeleted(*id, seg_id));
860 }
861 deleted_ids.extend(deleted);
862 }
863 }
864 }
865
866 if deleted_ids.len() == ids.len() {
867 return Ok(deleted_ids);
868 }
869
870 let deleted_set: std::collections::HashSet<EntityId> =
871 deleted_ids.iter().copied().collect();
872 for &id in ids {
873 if deleted_set.contains(&id) {
874 continue;
875 }
876 if self.delete(id)? {
877 deleted_ids.push(id);
878 }
879 }
880
881 Ok(deleted_ids)
882 }
883
884 pub fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
886 if let Some(growing_arc) = self.growing.read().as_ref() {
888 let growing = growing_arc.read();
889 if growing.contains(id) {
890 return growing.get_metadata(id);
891 }
892 }
893
894 let segment_id = self.entity_segment.read().get(&id).copied();
896
897 if let Some(seg_id) = segment_id {
898 if let Some(growing_arc) = self.growing.read().as_ref() {
899 let growing = growing_arc.read();
900 if growing.id() == seg_id {
901 return growing.get_metadata(id);
902 }
903 }
904
905 let sealed = self.sealed.read();
906 for segment in sealed.iter() {
907 if segment.id() == seg_id {
908 return segment.get_metadata(id);
909 }
910 }
911 }
912
913 if let Some(segment_arc) = self.find_sealed_segment_arc(id) {
914 return segment_arc.read().get_metadata(id);
915 }
916
917 None
918 }
919
920 pub fn set_metadata(&self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
922 if let Some(growing_arc) = self.growing.read().as_ref() {
925 let mut growing = growing_arc.write();
926 if growing.contains(id) && growing.state().is_writable() {
927 return growing.set_metadata(id, metadata);
928 }
929 }
930
931 let segment_id = self.entity_segment.read().get(&id).copied();
933
934 if let Some(seg_id) = segment_id {
935 if let Some(growing_arc) = self.growing.read().as_ref() {
936 let mut growing = growing_arc.write();
937 if growing.id() == seg_id && growing.state().is_writable() {
938 return growing.set_metadata(id, metadata);
939 }
940 }
941 }
942
943 if let Some(entity) = self.get(id) {
944 return self.rewrite_sealed_entity_into_growing(entity, Some(&metadata));
945 }
946
947 Err(SegmentError::NotFound(id))
948 }
949
950 fn maybe_seal_growing(&self) -> Result<(), SegmentError> {
952 let should_seal = {
953 let growing_opt = self.growing.read();
954 if let Some(growing_arc) = growing_opt.as_ref() {
955 let growing = growing_arc.read();
956 growing.should_seal(&self.config.segment_config)
957 || growing.idle_secs() >= self.config.idle_seal_secs
958 } else {
959 false
960 }
961 };
962
963 if should_seal {
964 self.seal_current()?;
965 }
966
967 Ok(())
968 }
969
970 pub fn seal_current(&self) -> Result<SegmentId, SegmentError> {
972 let growing_opt = self.growing.write().take();
973
974 if let Some(growing_arc) = growing_opt {
975 let mut growing = growing_arc.write();
976 let seg_id = growing.id();
977 let entity_count = growing.stats().entity_count as u64;
978
979 growing.seal()?;
981
982 drop(growing); self.sealed.write().push(growing_arc);
988
989 self.mark_sealed_pages_visible(entity_count);
991
992 {
994 let mut stats = self.stats.write();
995 stats.growing_count = stats.growing_count.saturating_sub(1);
996 stats.sealed_count += 1;
997 stats.seal_ops += 1;
998 }
999
1000 self.emit(LifecycleEvent::SegmentSealed(seg_id));
1001
1002 return Ok(seg_id);
1003 }
1004
1005 Err(SegmentError::InvalidState(SegmentState::Sealed))
1006 }
1007
1008 pub fn force_seal(&self) -> Result<Option<SegmentId>, SegmentError> {
1010 let has_growing = self.growing.read().is_some();
1011 if has_growing {
1012 Ok(Some(self.seal_current()?))
1013 } else {
1014 Ok(None)
1015 }
1016 }
1017
1018 pub fn all_visible_fraction(&self) -> f64 {
1028 const ROWS_PER_PAGE: u32 = 256;
1029 let sealed = self.sealed.read();
1030 if sealed.is_empty() {
1031 return 0.0;
1032 }
1033 let mut total_pages: u64 = 0;
1034 for seg_arc in sealed.iter() {
1035 let seg = seg_arc.read();
1036 let entity_count = seg.stats().entity_count as u64;
1037 let pages = entity_count.div_ceil(ROWS_PER_PAGE as u64);
1038 total_pages += pages;
1039 }
1040 if total_pages == 0 {
1041 return 0.0;
1042 }
1043 let visible = self.visibility_map.all_visible_count();
1044 (visible as f64 / total_pages as f64).min(1.0)
1045 }
1046
1047 fn mark_sealed_pages_visible(&self, seg_entity_count: u64) {
1050 const ROWS_PER_PAGE: u32 = 256;
1051 let existing_visible = self.visibility_map.all_visible_count();
1052 let start_page = existing_visible as u32;
1054 let new_pages = seg_entity_count.div_ceil(ROWS_PER_PAGE as u64);
1055 let end_page = start_page + new_pages as u32;
1056 self.visibility_map.mark_range_visible(start_page, end_page);
1057 }
1058
1059 pub fn for_each_entity<F>(&self, mut callback: F)
1065 where
1066 F: FnMut(&UnifiedEntity) -> bool,
1067 {
1068 if let Some(growing_arc) = self.growing.read().as_ref() {
1072 let growing = if let Some(g) = growing_arc.try_read() {
1073 g
1074 } else {
1075 growing_arc.read()
1076 };
1077 if !growing.for_each_fast(&mut callback) {
1078 return;
1079 }
1080 }
1081
1082 let sealed = self.sealed.read();
1084 for segment_arc in sealed.iter() {
1085 let segment = segment_arc.read();
1086 if !segment.for_each_fast(&mut callback) {
1087 return;
1088 }
1089 }
1090 }
1091
1092 pub fn fold_entities_parallel<T, FInit, FFold, FReduce>(
1108 &self,
1109 init: FInit,
1110 fold: FFold,
1111 reduce: FReduce,
1112 ) -> T
1113 where
1114 T: Send,
1115 FInit: Fn() -> T + Send + Sync,
1116 FFold: Fn(T, &UnifiedEntity) -> T + Send + Sync,
1117 FReduce: Fn(T, T) -> T + Send + Sync,
1118 {
1119 use rayon::prelude::*;
1120
1121 let mut acc = init();
1124 if let Some(growing_arc) = self.growing.read().as_ref() {
1125 let growing = if let Some(g) = growing_arc.try_read() {
1126 g
1127 } else {
1128 growing_arc.read()
1129 };
1130 growing.for_each_fast(|entity| {
1131 acc = fold(std::mem::replace(&mut acc, init()), entity);
1132 true
1133 });
1134 }
1135
1136 let segments: Vec<_> = {
1140 let sealed = self.sealed.read();
1141 sealed.iter().cloned().collect()
1142 };
1143
1144 if segments.len() <= 1 {
1145 for seg_arc in &segments {
1146 let seg = seg_arc.read();
1147 seg.for_each_fast(|entity| {
1148 acc = fold(std::mem::replace(&mut acc, init()), entity);
1149 true
1150 });
1151 }
1152 return acc;
1153 }
1154
1155 let sealed_acc = segments
1156 .into_par_iter()
1157 .map(|seg_arc| {
1158 let mut local = init();
1159 let seg = seg_arc.read();
1160 seg.for_each_fast(|entity| {
1161 local = fold(std::mem::replace(&mut local, init()), entity);
1162 true
1163 });
1164 local
1165 })
1166 .reduce(&init, &reduce);
1167
1168 reduce(acc, sealed_acc)
1169 }
1170
1171 pub fn for_each_entity_zoned<F>(&self, zone_preds: &[(&str, ZoneColPred<'_>)], mut callback: F)
1180 where
1181 F: FnMut(&UnifiedEntity) -> bool,
1182 {
1183 if let Some(growing_arc) = self.growing.read().as_ref() {
1189 let growing = if let Some(g) = growing_arc.try_read() {
1190 g
1191 } else {
1192 growing_arc.read()
1193 };
1194 if !growing.for_each_fast(&mut callback) {
1195 return;
1196 }
1197 }
1198
1199 let sealed = self.sealed.read();
1201 for segment_arc in sealed.iter() {
1202 let segment = segment_arc.read();
1203 if !zone_preds.is_empty() && segment.can_skip_zone_preds(zone_preds) {
1204 continue; }
1206 if !segment.for_each_fast(&mut callback) {
1207 return;
1208 }
1209 }
1210 }
1211
1212 pub fn query_all_zoned<F>(
1222 &self,
1223 zone_preds: &[(&str, ZoneColPred<'_>)],
1224 filter: F,
1225 ) -> Vec<UnifiedEntity>
1226 where
1227 F: Fn(&UnifiedEntity) -> bool + Sync,
1228 {
1229 let mut results = Vec::new();
1230
1231 if let Some(growing_arc) = self.growing.read().as_ref() {
1234 let growing = if let Some(g) = growing_arc.try_read() {
1235 g
1236 } else {
1237 growing_arc.read()
1238 };
1239 results.extend(growing.iter().filter(|e| filter(e)).cloned());
1240 }
1241
1242 let sealed = self.sealed.read();
1244 let surviving: Vec<_> = sealed
1246 .iter()
1247 .filter(|seg_arc| {
1248 if zone_preds.is_empty() {
1249 return true;
1250 }
1251 let seg = seg_arc.read();
1252 !seg.can_skip_zone_preds(zone_preds)
1253 })
1254 .collect();
1255
1256 let use_parallel = surviving.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1257
1258 if use_parallel {
1259 let filter_ref = &filter;
1260 let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1261 surviving
1262 .iter()
1263 .map(|segment| {
1264 s.spawn(move || {
1265 segment
1266 .read()
1267 .iter()
1268 .filter(|e| filter_ref(e))
1269 .cloned()
1270 .collect::<Vec<_>>()
1271 })
1272 })
1273 .collect::<Vec<_>>()
1274 .into_iter()
1275 .map(|handle| handle.join().unwrap_or_default())
1276 .collect()
1277 });
1278 for batch in segment_results {
1279 results.extend(batch);
1280 }
1281 } else {
1282 for segment_arc in surviving {
1283 let seg = segment_arc.read();
1284 results.extend(seg.iter().filter(|e| filter(e)).cloned());
1285 }
1286 }
1287
1288 results
1289 }
1290
1291 pub fn query_all<F>(&self, filter: F) -> Vec<UnifiedEntity>
1294 where
1295 F: Fn(&UnifiedEntity) -> bool + Sync,
1296 {
1297 let mut results = Vec::new();
1298
1299 if let Some(growing_arc) = self.growing.read().as_ref() {
1302 let growing = if let Some(g) = growing_arc.try_read() {
1303 g
1304 } else {
1305 growing_arc.read()
1306 };
1307 results.extend(growing.iter().filter(|e| filter(e)).cloned());
1308 }
1309
1310 let sealed = self.sealed.read();
1312 let use_parallel = sealed.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1313 if use_parallel {
1314 let filter_ref = &filter;
1315 let segment_results: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
1316 sealed
1317 .iter()
1318 .map(|segment| {
1319 s.spawn(move || {
1320 segment
1321 .read()
1322 .iter()
1323 .filter(|e| filter_ref(e))
1324 .cloned()
1325 .collect::<Vec<_>>()
1326 })
1327 })
1328 .collect::<Vec<_>>()
1329 .into_iter()
1330 .map(|handle| handle.join().unwrap_or_default())
1331 .collect()
1332 });
1333 for batch in segment_results {
1334 results.extend(batch);
1335 }
1336 } else {
1337 for segment in sealed.iter() {
1338 let seg = segment.read();
1339 results.extend(seg.iter().filter(|e| filter(e)).cloned());
1340 }
1341 }
1342
1343 results
1344 }
1345
1346 pub fn query_with_bloom_hint<F>(
1356 &self,
1357 key_hint: Option<&[u8]>,
1358 filter: F,
1359 ) -> (Vec<UnifiedEntity>, bool)
1360 where
1361 F: Fn(&UnifiedEntity) -> bool,
1362 {
1363 let mut results = Vec::new();
1364 let mut bloom_pruned = false;
1365
1366 if let Some(growing_arc) = self.growing.read().as_ref() {
1367 let growing = growing_arc.read();
1368 if let Some(key) = key_hint {
1369 if !growing.bloom_might_contain_key(key) {
1370 bloom_pruned = true;
1371 return (results, bloom_pruned);
1372 }
1373 }
1374 for entity in growing.iter() {
1375 if filter(entity) {
1376 results.push(entity.clone());
1377 }
1378 }
1379 }
1380
1381 let sealed = self.sealed.read();
1383 for segment_arc in sealed.iter() {
1384 let segment = segment_arc.read();
1385 if let Some(key) = key_hint {
1386 if !segment.bloom_might_contain_key(key) {
1387 bloom_pruned = true;
1388 continue;
1389 }
1390 }
1391 for entity in segment.iter() {
1392 if filter(entity) {
1393 results.push(entity.clone());
1394 }
1395 }
1396 }
1397
1398 (results, bloom_pruned)
1399 }
1400
1401 pub fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1403 let mut results = Vec::new();
1404
1405 if let Some(growing_arc) = self.growing.read().as_ref() {
1407 let growing = growing_arc.read();
1408 results.extend(growing.filter_metadata(filters));
1409 }
1410
1411 let sealed = self.sealed.read();
1413 for segment in sealed.iter() {
1414 results.extend(segment.filter_metadata(filters));
1415 }
1416
1417 results
1418 }
1419
1420 pub fn get_by_kind(&self, kind: &str) -> Vec<UnifiedEntity> {
1422 let mut results = Vec::new();
1423
1424 if let Some(growing_arc) = self.growing.read().as_ref() {
1426 let growing = growing_arc.read();
1427 for entity in growing.iter_kind(kind) {
1428 results.push(entity.clone());
1429 }
1430 }
1431
1432 let sealed = self.sealed.read();
1434 for segment in sealed.iter() {
1435 for entity in segment.iter_kind(kind) {
1436 results.push(entity.clone());
1437 }
1438 }
1439
1440 results
1441 }
1442
1443 pub fn count(&self) -> usize {
1445 self.total_entities_atomic.load(Ordering::Relaxed) as usize
1446 }
1447
1448 pub fn segment_ids(&self) -> Vec<SegmentId> {
1450 let mut ids = Vec::new();
1451
1452 if let Some(growing_arc) = self.growing.read().as_ref() {
1453 ids.push(growing_arc.read().id());
1454 }
1455
1456 let sealed = self.sealed.read();
1457 for segment in sealed.iter() {
1458 ids.push(segment.id());
1459 }
1460
1461 ids.extend(self.archived.read().iter().copied());
1462
1463 ids
1464 }
1465
1466 #[inline]
1479 #[allow(clippy::unused_self)]
1480 fn emit(&self, _event: LifecycleEvent) {}
1481
1482 pub fn drain_events(&self) -> Vec<LifecycleEvent> {
1485 std::mem::take(&mut *self.events.write())
1486 }
1487
1488 pub fn run_maintenance(&self) -> Result<(), SegmentError> {
1490 self.maybe_seal_growing()?;
1492
1493 if self.config.enable_compaction {
1495 let sealed_count = self.sealed.read().len();
1496 if sealed_count > self.config.max_sealed_segments {
1497 }
1500 }
1501
1502 Ok(())
1503 }
1504}
1505
1506impl UnifiedSegment for Arc<RwLock<GrowingSegment>> {
1509 fn id(&self) -> SegmentId {
1510 self.read().id()
1511 }
1512
1513 fn state(&self) -> SegmentState {
1514 self.read().state()
1515 }
1516
1517 fn collection(&self) -> &str {
1518 "unknown"
1520 }
1521
1522 fn stats(&self) -> SegmentStats {
1523 self.read().stats()
1524 }
1525
1526 fn entity_count(&self) -> usize {
1527 self.read().entity_count()
1528 }
1529
1530 fn contains(&self, id: EntityId) -> bool {
1531 self.read().contains(id)
1532 }
1533
1534 fn get(&self, id: EntityId) -> Option<&UnifiedEntity> {
1535 None
1538 }
1539
1540 fn get_mut(&mut self, _id: EntityId) -> Option<&mut UnifiedEntity> {
1541 None
1542 }
1543
1544 fn insert(&mut self, entity: UnifiedEntity) -> Result<EntityId, SegmentError> {
1545 self.write().insert(entity)
1546 }
1547
1548 fn update(&mut self, entity: UnifiedEntity) -> Result<(), SegmentError> {
1549 self.write().update(entity)
1550 }
1551
1552 fn update_hot(
1553 &mut self,
1554 entity: UnifiedEntity,
1555 modified_columns: &[String],
1556 ) -> Result<(), SegmentError> {
1557 self.write().update_hot(entity, modified_columns)
1558 }
1559
1560 fn delete(&mut self, id: EntityId) -> Result<bool, SegmentError> {
1561 self.write().delete(id)
1562 }
1563
1564 fn get_metadata(&self, id: EntityId) -> Option<Metadata> {
1565 self.read().get_metadata(id)
1566 }
1567
1568 fn set_metadata(&mut self, id: EntityId, metadata: Metadata) -> Result<(), SegmentError> {
1569 self.write().set_metadata(id, metadata)
1570 }
1571
1572 fn seal(&mut self) -> Result<(), SegmentError> {
1573 self.write().seal()
1574 }
1575
1576 fn should_seal(&self, config: &SegmentConfig) -> bool {
1577 self.read().should_seal(config)
1578 }
1579
1580 fn iter(&self) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1581 Box::new(std::iter::empty())
1583 }
1584
1585 fn iter_kind(&self, _kind_filter: &str) -> Box<dyn Iterator<Item = &UnifiedEntity> + '_> {
1586 Box::new(std::iter::empty())
1587 }
1588
1589 fn filter_metadata(&self, filters: &[(String, MetadataFilter)]) -> Vec<EntityId> {
1590 self.read().filter_metadata(filters)
1591 }
1592}
1593
#[cfg(test)]
mod tests {
    use super::super::metadata::MetadataValue;
    use super::*;
    use crate::storage::schema::Value;

    #[test]
    fn test_manager_basic() {
        let manager = SegmentManager::new("test_collection");

        let entity_id = manager.next_entity_id();
        let alice = UnifiedEntity::table_row(
            entity_id,
            "users",
            1,
            vec![Value::text("Alice".to_string())],
        );
        let id = manager.insert(alice).unwrap();

        assert!(manager.get(id).is_some());
        assert_eq!(manager.count(), 1);
    }

    #[test]
    fn test_manager_auto_seal() {
        // Segments capped at two entities; all three inserts must still be counted.
        let segment_config = SegmentConfig {
            max_entities: 2,
            ..Default::default()
        };
        let config = ManagerConfig {
            segment_config,
            ..Default::default()
        };
        let manager = SegmentManager::with_config("test", config);

        for component in [0.1, 0.2, 0.3] {
            let entity = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![component]);
            manager.insert(entity).unwrap();
        }

        assert_eq!(manager.stats().total_entities, 3);
    }

    #[test]
    fn test_manager_delete() {
        let manager = SegmentManager::new("test");

        let entity = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![0.1]);
        let id = manager.insert(entity).unwrap();

        assert!(manager.get(id).is_some());
        let deleted = manager.delete(id).unwrap();
        assert!(deleted);
        assert!(manager.get(id).is_none());
    }

    #[test]
    fn test_manager_metadata() {
        let manager = SegmentManager::new("test");

        let host = UnifiedEntity::table_row(
            manager.next_entity_id(),
            "hosts",
            1,
            vec![Value::text("192.168.1.1".to_string())],
        );
        let id = manager.insert(host).unwrap();

        let mut meta = Metadata::new();
        meta.set("os", MetadataValue::String("linux".to_string()));
        manager.set_metadata(id, meta).unwrap();

        let stored = manager.get_metadata(id).unwrap();
        assert!(stored.has("os"));
    }

    #[test]
    fn test_manager_query_by_kind() {
        let manager = SegmentManager::new("test");

        // Inserted in this order: table row, vector, table row.
        let first_host = UnifiedEntity::table_row(manager.next_entity_id(), "hosts", 1, vec![]);
        manager.insert(first_host).unwrap();

        let embedding = UnifiedEntity::vector(manager.next_entity_id(), "embeddings", vec![0.1]);
        manager.insert(embedding).unwrap();

        let second_host = UnifiedEntity::table_row(manager.next_entity_id(), "hosts", 2, vec![]);
        manager.insert(second_host).unwrap();

        assert_eq!(manager.get_by_kind("table").len(), 2);
        assert_eq!(manager.get_by_kind("vector").len(), 1);
    }

    #[test]
    #[ignore = "lifecycle events intentionally no-op since the emit-channel refactor; drain_events returns empty — see SegmentManager::emit"]
    fn test_lifecycle_events() {
        let manager = SegmentManager::new("test");

        let entity = UnifiedEntity::vector(manager.next_entity_id(), "v", vec![0.1]);
        manager.insert(entity).unwrap();

        let events = manager.drain_events();
        let saw_created = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::SegmentCreated(_)));
        let saw_inserted = events
            .iter()
            .any(|e| matches!(e, LifecycleEvent::EntityInserted(_, _)));

        assert!(saw_created);
        assert!(saw_inserted);
    }
}