use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use rustc_hash::FxHashMap;

use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Maximum number of documents buffered in the indexing channel before
/// `add_document` starts returning `Error::QueueFull`.
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

/// Multi-threaded writer that indexes documents into immutable segments.
///
/// Documents flow through a bounded channel to a pool of worker threads;
/// each worker fills an in-memory `SegmentBuilder` and flushes it to the
/// directory when its share of the memory budget is reached.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// Producer side of the bounded document pipeline.
    doc_sender: async_channel::Sender<Document>,
    /// Indexing worker threads; joined on drop.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// State shared with all worker threads.
    worker_state: Arc<WorkerState<D>>,
    /// Manages the committed segment set and background merges.
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built since the last commit: `(segment_id_hex, num_docs)`.
    flushed_segments: Vec<(String, u32)>,
    /// Optional primary-key deduplication index; see `init_primary_key_dedup`.
    primary_key_index: Option<super::primary_key::PrimaryKeyIndex>,
}

/// State shared between the writer and its indexing worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers, cloned into each new segment builder.
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Each worker's share of `IndexConfig::max_indexing_memory_bytes`.
    memory_budget_per_worker: usize,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built but not yet committed: `(segment_id_hex, num_docs)`.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,
    /// Number of workers that have finished the current indexing cycle;
    /// `prepare_commit` waits for this to reach `num_workers`.
    flush_count: AtomicUsize,
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    /// Fresh receiver installed by `resume_workers` for the next cycle.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    /// Bumped on every resume; parked workers compare it to their local epoch.
    resume_epoch: AtomicUsize,
    resume_cvar: parking_lot::Condvar,
    /// Set to tell parked workers to exit instead of resuming.
    shutdown: AtomicBool,
    num_workers: usize,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
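    /// Creates a new index in `directory` and spawns the indexing workers.
    ///
    /// Must be called from within a tokio runtime: the workers capture the
    /// current runtime handle to drive async segment builds.
    ///
    /// A minimal sketch of the write path; `MyDirectory` and the schema
    /// construction are placeholders for whatever your build provides:
    ///
    /// ```ignore
    /// let mut writer = IndexWriter::create(MyDirectory::new(), schema, IndexConfig::default()).await?;
    /// writer.add_document(doc)?;   // non-blocking enqueue
    /// writer.commit().await?;      // flush workers and publish segments
    /// ```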
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

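    /// Like [`create`](Self::create), but with an explicit segment builder
    /// configuration instead of `SegmentBuilderConfig::default()`.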
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // No-op update to persist the initial metadata.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

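    /// Opens an existing index, restoring the schema from the metadata
    /// persisted in `directory`.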
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

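    /// Like [`open`](Self::open), but with an explicit segment builder
    /// configuration.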
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

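    /// Creates a writer that shares the directory, schema, and segment
    /// manager of an existing [`Index`](super::Index) handle.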
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }

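    /// Shared constructor: resolves the default tokenizers, builds the
    /// worker state, and spawns the indexing threads.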
    fn new_with_parts(
        directory: Arc<D>,
        schema: Arc<Schema>,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
    ) -> Self {
        // Pre-resolve tokenizers for text fields so workers never consult
        // the registry on the hot path.
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if matches!(entry.field_type, crate::dsl::FieldType::Text)
                && let Some(ref tok_name) = entry.tokenizer
                && let Some(tok) = registry.get(tok_name)
            {
                tokenizers.insert(field, tok);
            }
        }

        let num_workers = config.num_indexing_threads.max(1);
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            builder_config,
            tokenizers: parking_lot::RwLock::new(tokenizers),
            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
            segment_manager: Arc::clone(&segment_manager),
            built_segments: parking_lot::Mutex::new(Vec::new()),
            flush_count: AtomicUsize::new(0),
            flush_mutex: parking_lot::Mutex::new(()),
            flush_cvar: parking_lot::Condvar::new(),
            resume_receiver: parking_lot::Mutex::new(None),
            resume_epoch: AtomicUsize::new(0),
            resume_cvar: parking_lot::Condvar::new(),
            shutdown: AtomicBool::new(false),
            num_workers,
        });
        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);

        Self {
            directory,
            schema,
            config,
            doc_sender,
            workers,
            worker_state,
            segment_manager,
            flushed_segments: Vec::new(),
            primary_key_index: None,
        }
    }

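    /// Spawns `num_workers` OS threads running [`worker_loop`](Self::worker_loop)
    /// and returns the sender side of the shared document channel. The
    /// current tokio runtime handle is captured so workers can `block_on`
    /// async segment builds.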
    fn spawn_workers(
        worker_state: &Arc<WorkerState<D>>,
        num_workers: usize,
    ) -> (
        async_channel::Sender<Document>,
        Vec<std::thread::JoinHandle<()>>,
    ) {
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        let handle = tokio::runtime::Handle::current();
        let mut workers = Vec::with_capacity(num_workers);
        for i in 0..num_workers {
            let state = Arc::clone(worker_state);
            let rx = receiver.clone();
            let rt = handle.clone();
            workers.push(
                std::thread::Builder::new()
                    .name(format!("index-worker-{}", i))
                    .spawn(move || Self::worker_loop(state, rx, rt))
                    .expect("failed to spawn index worker thread"),
            );
        }
        (sender, workers)
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

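    /// Overrides the tokenizer used for `field` in all subsequently created
    /// segment builders; builders already running keep the tokenizers they
    /// were created with.
    ///
    /// A sketch, assuming a `WhitespaceTokenizer` type exists in your build:
    ///
    /// ```ignore
    /// writer.set_tokenizer(title_field, WhitespaceTokenizer::default());
    /// ```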
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.worker_state
            .tokenizers
            .write()
            .insert(field, Box::new(tokenizer));
    }

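    /// Initializes primary-key deduplication for [`add_document`](Self::add_document).
    ///
    /// No-op if the schema declares no primary field. Otherwise the bloom
    /// filter is restored from the persisted cache when possible and
    /// extended with keys from any segments the cache has not seen; a full
    /// rebuild happens only when no usable cache exists. Heavy work runs on
    /// a blocking thread.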
    pub async fn init_primary_key_dedup(&mut self) -> Result<()> {
        use super::primary_key::{PK_BLOOM_FILE, deserialize_pk_bloom};

        let field = match self.schema.primary_field() {
            Some(f) => f,
            None => return Ok(()),
        };

        let snapshot = self.segment_manager.acquire_snapshot().await;
        let current_seg_ids: Vec<String> = snapshot.segment_ids().to_vec();

        // Try the persisted bloom cache first; any read or decode failure
        // simply falls back to a full rebuild.
        let cached = match self
            .directory
            .open_read(std::path::Path::new(PK_BLOOM_FILE))
            .await
        {
            Ok(handle) => {
                let data = handle.read_bytes_range(0..handle.len()).await;
                match data {
                    Ok(bytes) => deserialize_pk_bloom(bytes.as_slice()),
                    Err(_) => None,
                }
            }
            Err(_) => None,
        };

        // Load the primary-key fast-field data for every live segment.
        let load_futures: Vec<_> = current_seg_ids
            .iter()
            .map(|seg_id_str| {
                let seg_id_str = seg_id_str.clone();
                let dir = self.directory.as_ref();
                let schema = Arc::clone(&self.schema);
                async move { load_pk_segment_data(dir, &seg_id_str, &schema).await }
            })
            .collect();
        let all_data = futures::future::try_join_all(load_futures).await?;

        if let Some((persisted_seg_ids, bloom)) = cached {
            // Split segments into those already covered by the cached bloom
            // and those indexed since it was written.
            let mut pk_data = Vec::with_capacity(all_data.len());
            let mut new_data = Vec::new();
            for d in all_data {
                if persisted_seg_ids.contains(&d.segment_id) {
                    pk_data.push(d);
                } else {
                    new_data.push(d);
                }
            }
            let needs_persist = !new_data.is_empty();
            let new_start = pk_data.len();
            pk_data.extend(new_data);

            let pk_index = if new_start == pk_data.len() {
                super::primary_key::PrimaryKeyIndex::from_persisted(
                    field,
                    bloom,
                    pk_data,
                    &[],
                    snapshot,
                )
            } else {
                // Extend the cached bloom with keys from the new segments on
                // a blocking thread.
                tokio::task::spawn_blocking(move || {
                    let mut bloom = bloom;
                    let mut added = 0usize;
                    let num_new = pk_data.len() - new_start;
                    for data in &pk_data[new_start..] {
                        if let Some(ff) = data.fast_fields.get(&field.0)
                            && let Some(dict) = ff.text_dict()
                        {
                            for key in dict.iter() {
                                bloom.insert(key.as_bytes());
                                added += 1;
                            }
                        }
                    }
                    if added > 0 {
                        log::info!(
                            "[primary_key] bloom: added {} keys from {} new segment(s)",
                            added,
                            num_new,
                        );
                    }
                    super::primary_key::PrimaryKeyIndex::from_persisted(
                        field,
                        bloom,
                        pk_data,
                        &[],
                        snapshot,
                    )
                })
                .await
                .map_err(|e| Error::Internal(format!("spawn_blocking failed: {}", e)))?
            };

            if needs_persist {
                self.persist_pk_bloom(&pk_index, &current_seg_ids).await;
            }

            self.primary_key_index = Some(pk_index);
        } else {
            // No usable cache: rebuild the primary-key index from scratch.
            let pk_index = tokio::task::spawn_blocking(move || {
                super::primary_key::PrimaryKeyIndex::new(field, all_data, snapshot)
            })
            .await
            .map_err(|e| Error::Internal(format!("spawn_blocking failed: {}", e)))?;

            self.persist_pk_bloom(&pk_index, &current_seg_ids).await;
            self.primary_key_index = Some(pk_index);
        }

        Ok(())
    }

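    /// Best-effort persistence of the primary-key bloom cache; failures are
    /// logged and otherwise ignored.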
    async fn persist_pk_bloom(
        &self,
        pk_index: &super::primary_key::PrimaryKeyIndex,
        segment_ids: &[String],
    ) {
        use super::primary_key::{PK_BLOOM_FILE, serialize_pk_bloom};

        let bloom_bytes = pk_index.bloom_to_bytes();
        let data = serialize_pk_bloom(segment_ids, &bloom_bytes);
        if let Err(e) = self
            .directory
            .write(std::path::Path::new(PK_BLOOM_FILE), &data)
            .await
        {
            log::warn!("[primary_key] failed to persist bloom cache: {}", e);
        }
    }

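    /// Enqueues a single document for indexing without blocking.
    ///
    /// Returns `Error::QueueFull` when the pipeline channel is at capacity;
    /// the rejected document is dropped, so keep a copy if you intend to
    /// retry. If primary-key deduplication is enabled, the key is reserved
    /// before the send and rolled back when the send fails.
    ///
    /// A retry sketch, assuming `Document: Clone` and an async caller (the
    /// sleep interval is illustrative):
    ///
    /// ```ignore
    /// loop {
    ///     match writer.add_document(doc.clone()) {
    ///         Err(Error::QueueFull) => {
    ///             tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    ///         }
    ///         res => break res?,
    ///     }
    /// }
    /// ```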
    pub fn add_document(&self, doc: Document) -> Result<()> {
        if let Some(ref pk_index) = self.primary_key_index {
            pk_index.check_and_insert(&doc)?;
        }
        match self.doc_sender.try_send(doc) {
            Ok(()) => Ok(()),
            Err(async_channel::TrySendError::Full(doc)) => {
                // The key was optimistically reserved above; release it.
                if let Some(ref pk_index) = self.primary_key_index {
                    pk_index.rollback_uncommitted_key(&doc);
                }
                Err(Error::QueueFull)
            }
            Err(async_channel::TrySendError::Closed(doc)) => {
                if let Some(ref pk_index) = self.primary_key_index {
                    pk_index.rollback_uncommitted_key(&doc);
                }
                Err(Error::Internal("Document channel closed".into()))
            }
        }
    }

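    /// Adds a batch of documents, returning how many were accepted.
    ///
    /// Stops at the first full queue (returning `Ok(accepted)`) or at the
    /// first hard error. The batch is consumed by this call, so retain a
    /// copy when retries are needed. A hypothetical batching loop, assuming
    /// `Document: Clone`:
    ///
    /// ```ignore
    /// let mut batch: Vec<Document> = make_batch(); // placeholder source
    /// loop {
    ///     let accepted = writer.add_documents(batch.clone())?;
    ///     if accepted == batch.len() {
    ///         break;
    ///     }
    ///     batch.drain(..accepted); // keep only the rejected tail
    ///     // back off here before resubmitting `batch`
    /// }
    /// ```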
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let total = documents.len();
        for (i, doc) in documents.into_iter().enumerate() {
            match self.add_document(doc) {
                Ok(()) => {}
                Err(Error::QueueFull) => return Ok(i),
                Err(e) => return Err(e),
            }
        }
        Ok(total)
    }

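    /// Body of one indexing worker thread.
    ///
    /// Each cycle drains documents from the channel into a `SegmentBuilder`,
    /// flushing a segment whenever the memory budget is reached. When the
    /// channel closes (commit or drop), the worker flushes what it has,
    /// reports in via `flush_count`, and parks until it is either shut down
    /// or handed a fresh receiver for the next epoch.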
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        let mut my_epoch = 0usize;

        loop {
            // Catch panics so a poisoned cycle cannot take down the thread;
            // the rendezvous with prepare_commit below still happens.
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                while let Ok(doc) = receiver.recv_blocking() {
                    // Create the builder lazily so idle workers hold no memory.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

                    // Log progress every 16384 documents.
                    if b.num_docs() & 0x3FFF == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    // Flush at 80% of the per-worker budget to leave headroom
                    // for the build itself.
                    let effective_budget = state.memory_budget_per_worker * 4 / 5;

                    if builder_memory >= effective_budget && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed: flush whatever is still buffered.
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Report this worker as flushed; the last one wakes prepare_commit.
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Park until shutdown, or until resume_workers installs a new
            // receiver and bumps the epoch.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }

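    /// Builds one segment synchronously on the calling worker thread,
    /// blocking on the runtime handle for the async build, and records the
    /// result in `built_segments`.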
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            // An empty segment is not an error; there is just nothing to record.
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }

    /// Asks the segment manager to evaluate the merge policy and start any
    /// warranted merges.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Aborts any in-flight merges.
    pub async fn abort_merges(&self) {
        self.segment_manager.abort_merges().await;
    }

    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Removes orphaned segment files, returning how many were cleaned up.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

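    /// Flushes all in-flight documents and returns a [`PreparedCommit`]
    /// that must be resolved with `commit()` or `abort()`.
    ///
    /// Closing the channel makes each worker finish and flush its current
    /// segment; this method then waits (off the async runtime) for every
    /// worker to report in, with a 300-second safety deadline. The
    /// two-phase flow looks like:
    ///
    /// ```ignore
    /// let prepared = writer.prepare_commit().await?;
    /// // ...any external validation before publishing...
    /// let published = prepared.commit().await?; // false if nothing to commit
    /// ```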
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // Closing the channel tells workers to flush and report in.
        self.doc_sender.close();

        self.worker_state.resume_cvar.notify_all();

        // Wait on a blocking thread for all workers to finish the cycle.
        let state = Arc::clone(&self.worker_state);
        let all_flushed = tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(300);
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    log::error!(
                        "[prepare_commit] timed out waiting for workers: {}/{} flushed",
                        state.flush_count.load(Ordering::Acquire),
                        state.num_workers
                    );
                    return false;
                }
                state.flush_cvar.wait_for(&mut lock, remaining);
            }
            true
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        if !all_flushed {
            // Bring the pipeline back up before surfacing the timeout.
            self.resume_workers();
            return Err(Error::Internal(format!(
                "prepare_commit timed out: {}/{} workers flushed",
                self.worker_state.flush_count.load(Ordering::Acquire),
                self.worker_state.num_workers
            )));
        }

        // Collect the segments built by the workers during this cycle.
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }

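    /// Convenience wrapper for `prepare_commit().await?.commit().await`.
    /// Returns `true` if any segments were published.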
    pub async fn commit(&mut self) -> Result<bool> {
        self.prepare_commit().await?.commit().await
    }

    /// Commits any pending documents, then forces a merge pass.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Commits any pending documents, then reorders the committed segments.
    pub async fn reorder(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.reorder_segments().await
    }

    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

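    /// Re-arms the pipeline after a commit or abort: resets the flush
    /// counter, installs a fresh bounded channel, bumps the resume epoch,
    /// and wakes the parked workers. When called without a tokio runtime
    /// (e.g. from a synchronous drop path), it signals shutdown instead.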
    fn resume_workers(&mut self) {
        if tokio::runtime::Handle::try_current().is_err() {
            // No runtime to resume into: tell parked workers to exit.
            self.worker_state.shutdown.store(true, Ordering::Release);
            self.worker_state.resume_cvar.notify_all();
            return;
        }

        self.worker_state.flush_count.store(0, Ordering::Release);

        // Fresh channel for the next indexing cycle.
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        self.doc_sender = sender;

        // Publish the receiver first, then advance the epoch and wake workers.
        {
            let mut lock = self.worker_state.resume_receiver.lock();
            *lock = Some(receiver);
        }
        self.worker_state
            .resume_epoch
            .fetch_add(1, Ordering::Release);
        self.worker_state.resume_cvar.notify_all();
    }
}

impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
    fn drop(&mut self) {
        // Closing the channel ends the workers' recv loops; the shutdown
        // flag plus a broadcast releases any workers parked between cycles.
        self.worker_state.shutdown.store(true, Ordering::Release);
        self.doc_sender.close();
        self.worker_state.resume_cvar.notify_all();
        for w in std::mem::take(&mut self.workers) {
            let _ = w.join();
        }
    }
}

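/// A commit that has been prepared (all workers flushed and their segments
/// collected) but not yet published.
///
/// Must be resolved by [`commit`](Self::commit) or [`abort`](Self::abort);
/// dropping it unresolved logs a warning and auto-aborts.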
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    is_resolved: bool,
}

impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Publishes the flushed segments and restarts the indexing pipeline.
    /// Returns `false` when there was nothing to commit.
    pub async fn commit(mut self) -> Result<bool> {
        self.is_resolved = true;
        let segments = std::mem::take(&mut self.writer.flushed_segments);

        if segments.is_empty() {
            log::debug!("[commit] no segments to commit, skipping");
            self.writer.resume_workers();
            return Ok(false);
        }

        self.writer.segment_manager.commit(segments).await?;

        // Refresh the primary-key index with any segments it has not seen
        // yet, then re-persist the bloom cache.
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            let snapshot = self.writer.segment_manager.acquire_snapshot().await;
            let existing_ids: std::collections::HashSet<&str> =
                pk_index.committed_segment_ids().collect();

            let load_futures: Vec<_> = snapshot
                .segment_ids()
                .iter()
                .filter(|id| !existing_ids.contains(id.as_str()))
                .map(|seg_id_str| {
                    let seg_id_str = seg_id_str.clone();
                    let dir = self.writer.directory.as_ref();
                    let schema = Arc::clone(&self.writer.schema);
                    async move { load_pk_segment_data(dir, &seg_id_str, &schema).await }
                })
                .collect();
            let new_data = futures::future::try_join_all(load_futures).await?;

            let seg_ids: Vec<String> = snapshot.segment_ids().to_vec();
            pk_index.refresh_incremental(new_data, snapshot);

            let bloom_bytes = pk_index.bloom_to_bytes();
            let data = super::primary_key::serialize_pk_bloom(&seg_ids, &bloom_bytes);
            if let Err(e) = self
                .writer
                .directory
                .write(
                    std::path::Path::new(super::primary_key::PK_BLOOM_FILE),
                    &data,
                )
                .await
            {
                log::warn!("[primary_key] failed to persist bloom cache: {}", e);
            }
        }

        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(true)
    }

    /// Discards the prepared segments and restarts the indexing pipeline.
    pub fn abort(mut self) {
        self.is_resolved = true;
        self.writer.flushed_segments.clear();
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            pk_index.clear_uncommitted();
        }
        self.writer.resume_workers();
    }
}

impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
    fn drop(&mut self) {
        if !self.is_resolved {
            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
            self.writer.flushed_segments.clear();
            if let Some(ref mut pk_index) = self.writer.primary_key_index {
                pk_index.clear_uncommitted();
            }
            self.writer.resume_workers();
        }
    }
}

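/// Loads the fast-field data needed to rebuild primary-key state for one
/// committed segment.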
async fn load_pk_segment_data<D: crate::directories::Directory>(
    dir: &D,
    seg_id_str: &str,
    schema: &Arc<crate::dsl::Schema>,
) -> Result<super::primary_key::PkSegmentData> {
    let seg_id = crate::segment::SegmentId::from_hex(seg_id_str)
        .ok_or_else(|| Error::Internal(format!("Invalid segment id: {}", seg_id_str)))?;
    let files = crate::segment::SegmentFiles::new(seg_id.0);
    let fast_fields =
        crate::segment::reader::loader::load_fast_fields_file(dir, &files, schema).await?;
    Ok(super::primary_key::PkSegmentData {
        segment_id: seg_id_str.to_string(),
        fast_fields,
    })
}