1use crate::analysis::{AnalyzerRegistry, Token};
11use crate::core::{FieldId, LuciError, Result, SegmentId};
12use crate::mapping::{DynamicMode, FieldType, Mapping};
13use crate::storage::Storage;
14
15use crate::columnar::writer::ColumnValue;
16use crate::segment::builder::SegmentBuilder;
17use crate::spatial::geo::GeoPoint;
18use crate::vector::global::GlobalHnsw;
19use crate::vector::hnsw::BuildThreads;
20
/// Default flush threshold for `IndexWriter`: once this many bytes of document
/// source have been buffered (64 MiB = 2^26 bytes), `add` flushes the buffer to
/// a segment.
const DEFAULT_MEMORY_BUDGET: usize = 1 << 26;
23
/// Buffers JSON documents into an in-memory segment and persists segments,
/// commit metadata and per-field vector indexes through a `Storage` backend.
///
/// Field order is significant only for drop order; it is left as-is.
pub struct IndexWriter {
    /// Destination for segments, commit metadata and vector indexes.
    storage: Box<dyn Storage>,
    /// Field mapping used to resolve, analyze and encode document fields.
    schema: Mapping,
    /// Analyzers looked up by name for text and token-count fields.
    analyzers: AnalyzerRegistry,
    /// In-memory segment that receives documents until the next flush.
    builder: SegmentBuilder,
    /// Counter from which new segment ids are assigned.
    next_segment_id: u64,
    /// Flush threshold, measured in buffered document-source bytes.
    memory_budget: usize,
    /// Document-source bytes buffered since the last flush.
    buffer_size: usize,
    /// Policy used to pick segments to merge.
    merge_policy: crate::merge_policy::MergePolicy,
    /// Deleted (segment, doc) pairs; serialized into the commit metadata.
    pending_deletions: crate::deletion::DeletionMap,
    /// Optional analysis settings embedded into the committed mapping JSON.
    analysis_json: Option<serde_json::Value>,
    /// Cross-segment vector index for dense-vector fields.
    global_hnsw: GlobalHnsw,
    /// Thread setting handed to HNSW graph construction at commit time.
    build_threads: BuildThreads,
}
58
59impl IndexWriter {
60 pub fn new(
62 storage: impl Storage + 'static,
63 schema: Mapping,
64 analyzers: AnalyzerRegistry,
65 ) -> Self {
66 let next_id = storage.generation() + 1;
67 let builder = SegmentBuilder::new(SegmentId::new(next_id), &schema);
68 let global_hnsw = GlobalHnsw::new(&schema);
69 Self {
70 storage: Box::new(storage),
71 schema,
72 analyzers,
73 builder,
74 next_segment_id: next_id,
75 memory_budget: DEFAULT_MEMORY_BUDGET,
76 buffer_size: 0,
77 merge_policy: crate::merge_policy::MergePolicy::default(),
78 pending_deletions: crate::deletion::DeletionMap::new(),
79 analysis_json: None,
80 global_hnsw,
81 build_threads: BuildThreads::Ambient,
82 }
83 }
84
85 pub fn set_build_threads(&mut self, threads: BuildThreads) {
91 self.build_threads = threads;
92 }
93
94 pub fn set_analysis_json(&mut self, json: Option<serde_json::Value>) {
96 self.analysis_json = json;
97 }
98
99 pub fn load_deletions(&mut self, deletions: crate::deletion::DeletionMap) {
101 self.pending_deletions = deletions;
102 }
103
104 pub fn load_global_hnsw(&mut self, global_hnsw: GlobalHnsw) {
107 self.global_hnsw = global_hnsw;
108 }
109
110 pub fn global_hnsw(&self) -> &GlobalHnsw {
113 &self.global_hnsw
114 }
115
116 pub fn mark_deleted(&mut self, segment_id: SegmentId, doc_id: crate::core::DocId) {
118 self.pending_deletions.mark_deleted(segment_id, doc_id);
119 }
120
121 pub fn deletions(&self) -> &crate::deletion::DeletionMap {
123 &self.pending_deletions
124 }
125
126 pub fn set_memory_budget(&mut self, budget: usize) {
128 self.memory_budget = budget;
129 }
130
131 pub fn set_write_timeout(&mut self, timeout: std::time::Duration) {
136 self.storage.set_write_timeout(timeout);
137 }
138
139 pub fn add(&mut self, doc: serde_json::Value) -> Result<()> {
144 let obj = doc
145 .as_object()
146 .ok_or_else(|| LuciError::InvalidQuery("document must be a JSON object".into()))?;
147
148 let source = serde_json::to_vec(&doc)
149 .map_err(|e| LuciError::InvalidQuery(format!("JSON serialization failed: {e}")))?;
150
151 let mut analyzed_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
152 let mut column_values: Vec<(FieldId, ColumnValue)> = Vec::new();
153
154 let doc_id_str = match obj.get("_id").and_then(|v| v.as_str()) {
159 Some(id) => id.to_string(),
160 None => uuid::Uuid::new_v4().to_string(),
161 };
162 if let Some(id_field_id) = self.schema.field_id("_id") {
163 analyzed_fields.push((id_field_id, vec![Token::new(doc_id_str.clone(), 0, 0, 0)]));
164 column_values.push((id_field_id, ColumnValue::keyword(doc_id_str)?));
165 }
166 let mut vector_fields: Vec<(FieldId, Vec<f32>)> = Vec::new();
167 let mut geo_points: Vec<(FieldId, GeoPoint)> = Vec::new();
168 let mut geo_shapes: Vec<(FieldId, ::geo::Geometry<f64>)> = Vec::new();
169 let mut copy_to_pairs: Vec<(String, String)> = Vec::new(); for (field_name, value) in obj {
172 if field_name == "_id" {
177 continue;
178 }
179 let field_id = match self.schema.field_id(field_name) {
181 Some(id) => id,
182 None => match self.schema.dynamic_mode() {
183 DynamicMode::False => continue, DynamicMode::True => {
185 continue;
188 }
189 },
190 };
191
192 let mapping = self.schema.field(field_id);
193
194 let tokens = match &mapping.field_type {
196 FieldType::Text => {
197 let text = value.as_str().unwrap_or_default();
198 let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
199 let analyzer = self.analyzers.get(analyzer_name);
200 analyzer.analyze(text)
201 }
202 FieldType::Keyword => {
203 let text = match value {
204 serde_json::Value::String(s) => s.clone(),
205 other => other.to_string(),
206 };
207 vec![Token::new(text.clone(), 0, 0, 0)]
208 }
209 FieldType::Ip => {
210 let text = value.as_str().unwrap_or_default();
212 let normalized = crate::ip::normalize_ip(text);
213 if normalized.is_empty() {
214 Vec::new()
215 } else {
216 vec![Token::new(normalized, 0, 0, 0)]
217 }
218 }
219 _ => {
220 Vec::new()
222 }
223 };
224
225 if !tokens.is_empty() && mapping.indexed {
226 analyzed_fields.push((field_id, tokens));
227 }
228
229 if matches!(mapping.field_type, FieldType::GeoShape) {
231 if let Some(geom) = crate::spatial::shape::parse_geojson(value) {
232 geo_shapes.push((field_id, geom));
233 }
234 }
235
236 if matches!(mapping.field_type, FieldType::GeoPoint) {
238 if let Some(point) = GeoPoint::from_json(value) {
239 geo_points.push((field_id, point));
240 }
241 }
242
243 if mapping.field_type.is_dense_vector() {
245 if let serde_json::Value::Array(arr) = value {
246 let vec: Vec<f32> = arr
247 .iter()
248 .filter_map(|v| v.as_f64().map(|f| f as f32))
249 .collect();
250 if !vec.is_empty() {
251 vector_fields.push((field_id, vec));
252 }
253 }
254 }
255
256 if mapping.doc_values {
258 let col_val = match &mapping.field_type {
259 FieldType::Keyword => match value {
260 serde_json::Value::String(s) => ColumnValue::keyword(s.clone())?,
261 serde_json::Value::Null => ColumnValue::Null,
262 other => ColumnValue::keyword(other.to_string())?,
263 },
264 FieldType::Integer | FieldType::Long => match value {
265 serde_json::Value::Number(n) => ColumnValue::I64(n.as_i64().unwrap_or(0)),
266 _ => ColumnValue::Null,
267 },
268 FieldType::Float | FieldType::Double => match value {
269 serde_json::Value::Number(n) => ColumnValue::F64(n.as_f64().unwrap_or(0.0)),
270 _ => ColumnValue::Null,
271 },
272 FieldType::Boolean => match value {
273 serde_json::Value::Bool(b) => ColumnValue::Bool(*b),
274 _ => ColumnValue::Null,
275 },
276 FieldType::TokenCount => {
277 let text = value.as_str().unwrap_or_default();
278 let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
279 let analyzer = self.analyzers.get(analyzer_name);
280 ColumnValue::I64(analyzer.analyze(text).len() as i64)
281 }
282 FieldType::Ip => {
283 let text = value.as_str().unwrap_or_default();
284 match crate::ip::ip_to_i64(text) {
285 Some(v) => ColumnValue::I64(v),
286 None => ColumnValue::Null,
287 }
288 }
289 _ => ColumnValue::Null, };
291 column_values.push((field_id, col_val));
292 }
293
294 if !mapping.copy_to.is_empty() {
296 let source_text = match value {
297 serde_json::Value::String(s) => s.clone(),
298 other => other.to_string(),
299 };
300 for target in &mapping.copy_to {
301 copy_to_pairs.push((target.clone(), source_text.clone()));
302 }
303 }
304 }
305
306 for (target_name, source_text) in ©_to_pairs {
312 let target_id = self.schema.field_id(target_name).unwrap_or_else(|| {
313 panic!(
314 "copy_to target \"{target_name}\" missing from schema; \
315 Mapping::validate should have rejected this at index \
316 creation. This is an internal wiring bug, not user input."
317 );
318 });
319 let target_mapping = self.schema.field(target_id);
320 if !target_mapping.indexed {
321 continue;
322 }
323
324 let tokens = match &target_mapping.field_type {
325 FieldType::Text => {
326 let analyzer_name = target_mapping.analyzer.as_deref().unwrap_or("standard");
327 let analyzer = self.analyzers.get(analyzer_name);
328 analyzer.analyze(source_text)
329 }
330 FieldType::Keyword => {
331 vec![Token::new(source_text.clone(), 0, 0, 0)]
332 }
333 _ => continue,
334 };
335 if !tokens.is_empty() {
336 analyzed_fields.push((target_id, tokens));
337 }
338 }
339
340 for mapping in self.schema.fields() {
344 if let Some(ref parent_name) = mapping.parent_field {
345 let parent_value = match obj.get(parent_name) {
347 Some(v) => v,
348 None => continue,
349 };
350 let field_id = match self.schema.field_id(&mapping.name) {
351 Some(id) => id,
352 None => continue,
353 };
354
355 if mapping.indexed {
356 let tokens = match &mapping.field_type {
357 FieldType::Text => {
358 let text = parent_value.as_str().unwrap_or_default();
359 let analyzer_name = mapping.analyzer.as_deref().unwrap_or("standard");
360 let analyzer = self.analyzers.get(analyzer_name);
361 analyzer.analyze(text)
362 }
363 FieldType::Keyword => {
364 let text = match parent_value {
365 serde_json::Value::String(s) => s.clone(),
366 other => other.to_string(),
367 };
368 vec![Token::new(text, 0, 0, 0)]
369 }
370 _ => continue,
371 };
372 analyzed_fields.push((field_id, tokens));
373 }
374
375 if mapping.doc_values {
376 let col_val = match &mapping.field_type {
377 FieldType::Keyword => {
378 let text = match parent_value {
379 serde_json::Value::String(s) => s.clone(),
380 other => other.to_string(),
381 };
382 ColumnValue::keyword(text)?
383 }
384 _ => continue,
385 };
386 column_values.push((field_id, col_val));
387 }
388 }
389 }
390
391 self.buffer_size += source.len();
392
393 let has_nested = self
395 .schema
396 .fields()
397 .iter()
398 .any(|f| matches!(f.field_type, FieldType::Nested));
399
400 let segment_id = self.builder.segment_id();
405 let local_doc_id = self.builder.doc_count();
406
407 self.builder.add_document(&analyzed_fields, &source);
408
409 if has_nested {
410 self.builder.mark_parent();
411 }
412
413 for (field_id, col_val) in column_values {
415 self.builder.add_column_value(field_id, col_val);
416 }
417
418 for (field_id, vec) in vector_fields {
427 self.global_hnsw
428 .store_vector(field_id, segment_id, local_doc_id, vec)?;
429 }
430
431 for (field_id, point) in geo_points {
433 self.builder.add_geo_point(field_id, point);
434 }
435
436 for (field_id, geom) in &geo_shapes {
438 self.builder.add_geo_shape(*field_id, geom);
439 }
440
441 for mapping in self.schema.fields() {
443 if !matches!(mapping.field_type, FieldType::Nested) {
444 continue;
445 }
446 let field_name = &mapping.name;
447 if let Some(serde_json::Value::Array(nested_arr)) = obj.get(field_name) {
448 for nested_obj in nested_arr {
449 if let Some(nested_map) = nested_obj.as_object() {
450 let mut nested_fields: Vec<(FieldId, Vec<Token>)> = Vec::new();
452 for (nested_key, nested_val) in nested_map {
453 let prefixed = format!("{field_name}.{nested_key}");
455 if let Some(fid) = self.schema.field_id(&prefixed) {
456 let m = self.schema.field(fid);
457 let tokens = match &m.field_type {
458 FieldType::Text => {
459 let text = nested_val.as_str().unwrap_or_default();
460 let analyzer = self
461 .analyzers
462 .get(m.analyzer.as_deref().unwrap_or("standard"));
463 analyzer.analyze(text)
464 }
465 FieldType::Keyword => {
466 let text = match nested_val {
467 serde_json::Value::String(s) => s.clone(),
468 other => other.to_string(),
469 };
470 vec![Token::new(text, 0, 0, 0)]
471 }
472 _ => continue,
473 };
474 if !tokens.is_empty() {
475 nested_fields.push((fid, tokens));
476 }
477 }
478 }
479 self.builder.add_document(&nested_fields, b"{}");
481 self.builder.mark_nested();
482 }
483 }
484 }
485 }
486
487 if self.buffer_size >= self.memory_budget {
489 self.flush()?;
490 }
491
492 Ok(())
493 }
494
495 fn flush(&mut self) -> Result<()> {
497 if self.builder.is_empty() {
498 return Ok(());
499 }
500
501 let segment_id = SegmentId::new(self.next_segment_id);
502 self.next_segment_id += 1;
503
504 let builder = std::mem::replace(
506 &mut self.builder,
507 SegmentBuilder::new(SegmentId::new(self.next_segment_id), &self.schema),
508 );
509
510 let segment_data = builder.build();
511 self.storage.write_segment(segment_id, &segment_data)?;
512 self.buffer_size = 0;
513
514 Ok(())
515 }
516
517 pub fn commit(&mut self) -> Result<()> {
524 self.flush()?;
525 let mut mapping_json = self.schema.to_json();
527 if let Some(ref analysis) = self.analysis_json {
528 if let Some(obj) = mapping_json.as_object_mut() {
529 let mut settings = serde_json::Map::new();
530 settings.insert("analysis".to_string(), analysis.clone());
531 obj.insert("settings".to_string(), serde_json::Value::Object(settings));
532 }
533 }
534 let mapping_bytes = serde_json::to_vec(&mapping_json).map_err(|e| {
535 LuciError::Io(std::io::Error::new(
536 std::io::ErrorKind::Other,
537 format!("failed to serialize mapping: {e}"),
538 ))
539 })?;
540
541 let deletion_bytes = self.pending_deletions.to_bytes();
546 let mut metadata = Vec::with_capacity(4 + mapping_bytes.len() + 4 + deletion_bytes.len());
547 metadata.extend_from_slice(&(mapping_bytes.len() as u32).to_le_bytes());
548 metadata.extend_from_slice(&mapping_bytes);
549 metadata.extend_from_slice(&(deletion_bytes.len() as u32).to_le_bytes());
550 metadata.extend_from_slice(&deletion_bytes);
551 self.storage.set_user_metadata(metadata);
552
553 for field_id in self.global_hnsw.non_empty_field_ids() {
561 self.global_hnsw
562 .connect_pending(field_id, self.build_threads);
563 }
564
565 for field_id in self.global_hnsw.non_empty_field_ids() {
569 if let Some(bytes) = self.global_hnsw.field_to_bytes(field_id) {
570 self.storage.write_vector_index(field_id, &bytes)?;
571 }
572 }
573
574 self.storage.commit()?;
575 self.maybe_merge()?;
576
577 Ok(())
578 }
579
580 fn maybe_merge(&mut self) -> Result<()> {
582 use crate::merge_policy::{SegmentInfo, find_merge};
583
584 let infos: Vec<SegmentInfo> = self
585 .storage
586 .segments()
587 .iter()
588 .map(|e| {
589 SegmentInfo {
590 segment_id: e.segment_id,
591 size_bytes: e.data_len,
592 doc_count: 0, deletion_count: 0,
594 }
595 })
596 .collect();
597
598 let candidate = match find_merge(&self.merge_policy, &infos) {
599 Some(c) => c,
600 None => return Ok(()),
601 };
602
603 self.execute_merge(&candidate.segment_ids)
604 }
605
606 fn execute_merge(&mut self, source_ids: &[SegmentId]) -> Result<()> {
608 use crate::deletion::DeletionMap;
609 use crate::segment::reader::SegmentReader;
610
611 let mut readers = Vec::new();
613 let mut segment_data = Vec::new();
614 for &seg_id in source_ids {
615 let data = self.storage.read_segment(seg_id)?;
616 segment_data.push(data);
617 }
618 for data in &segment_data {
619 readers.push(SegmentReader::open(data.clone())?);
620 }
621 let reader_refs: Vec<&SegmentReader> = readers.iter().collect();
622
623 let new_id = SegmentId::new(self.next_segment_id);
625 self.next_segment_id += 1;
626
627 let deletions = DeletionMap::new();
628 let merge_output = crate::merge::merge_segments(
629 new_id,
630 &reader_refs,
631 &deletions,
632 &self.schema,
633 &self.analyzers,
634 )?;
635
636 self.storage.write_segment(new_id, &merge_output.bytes)?;
638
639 self.storage.remove_segments(source_ids);
641
642 self.global_hnsw
647 .rewrite_after_merge(&merge_output.ord_remap);
648
649 for field_id in self.global_hnsw.non_empty_field_ids() {
660 if let Some(bytes) = self.global_hnsw.field_to_bytes(field_id) {
661 self.storage.write_vector_index(field_id, &bytes)?;
662 }
663 }
664
665 self.storage.commit()?;
667
668 Ok(())
669 }
670
671 pub fn force_merge(&mut self, max_segments: usize) -> Result<()> {
677 loop {
678 let segments = self.storage.segments();
679 if segments.len() <= max_segments {
680 break;
681 }
682
683 let ids: Vec<SegmentId> = segments
685 .iter()
686 .take(self.merge_policy.max_merge_at_once)
687 .map(|e| e.segment_id)
688 .collect();
689
690 if ids.len() < 2 {
691 break;
692 }
693
694 self.execute_merge(&ids)?;
695 }
696 Ok(())
697 }
698
699 pub fn buffered_doc_count(&self) -> u32 {
701 self.builder.doc_count()
702 }
703
704 pub fn discard_buffer(&mut self) {
708 let seg_id = SegmentId::new(self.next_segment_id);
709 self.builder = SegmentBuilder::new(seg_id, &self.schema);
710 self.buffer_size = 0;
711 }
712}
713
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mapping::FieldType;
    use crate::segment::reader::SegmentReader;
    use crate::storage::SingleFileDirectory;
    use std::path::PathBuf;

    /// Scratch directory for one test, unique per process and test name.
    ///
    /// It is wiped on creation and again on drop, so a failing assertion does
    /// not leave stale index files in the system temp dir.
    struct TestDir {
        path: PathBuf,
    }

    impl TestDir {
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir()
                .join(format!("luci_writer_test_{}_{name}", std::process::id()));
            let _ = std::fs::remove_dir_all(&path);
            Self { path }
        }

        fn path(&self) -> &PathBuf {
            &self.path
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// Writer over a fresh directory with `title` (text, field 0) and
    /// `status` (keyword, field 1).
    fn basic_setup(name: &str) -> (TestDir, IndexWriter) {
        let dir = TestDir::new(name);
        let storage = SingleFileDirectory::create(dir.path()).unwrap();
        let schema = Mapping::builder()
            .field("title", FieldType::Text)
            .field("status", FieldType::Keyword)
            .build();
        let analyzers = AnalyzerRegistry::new();
        let writer = IndexWriter::new(storage, schema, analyzers);
        (dir, writer)
    }

    #[test]
    fn put_single_doc_and_commit() {
        let (dir, mut writer) = basic_setup("single");
        writer
            .add(serde_json::json!({
                "title": "hello world",
                "status": "active"
            }))
            .unwrap();
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        assert_eq!(storage.segments().len(), 1);
    }

    #[test]
    fn put_multiple_docs_and_commit() {
        let (dir, mut writer) = basic_setup("multi");
        for i in 0..10 {
            writer
                .add(serde_json::json!({
                    "title": format!("document {i}"),
                    "status": "published"
                }))
                .unwrap();
        }
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        assert_eq!(storage.segments().len(), 1);
    }

    #[test]
    fn text_fields_analyzed() {
        let (dir, mut writer) = basic_setup("analyzed");
        writer
            .add(serde_json::json!({
                "title": "The Quick Brown Fox",
                "status": "active"
            }))
            .unwrap();
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        let seg_id = storage.segments()[0].segment_id;
        let data = storage.read_segment(seg_id).unwrap();
        let reader = SegmentReader::open(data).unwrap();

        // The standard analyzer lowercases terms.
        assert!(reader.postings(FieldId::new(0), "the").is_some());
        assert!(reader.postings(FieldId::new(0), "quick").is_some());
        assert!(reader.postings(FieldId::new(0), "brown").is_some());
        assert!(reader.postings(FieldId::new(0), "fox").is_some());

        assert!(reader.postings(FieldId::new(0), "The").is_none());
        assert!(reader.postings(FieldId::new(0), "Quick").is_none());
    }

    #[test]
    fn keyword_fields_exact() {
        let (dir, mut writer) = basic_setup("keyword");
        writer
            .add(serde_json::json!({
                "title": "test",
                "status": "Active"
            }))
            .unwrap();
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        let data = storage
            .read_segment(storage.segments()[0].segment_id)
            .unwrap();
        let reader = SegmentReader::open(data).unwrap();

        // Keywords are indexed verbatim, without case folding.
        assert!(reader.postings(FieldId::new(1), "Active").is_some());
        assert!(reader.postings(FieldId::new(1), "active").is_none());
    }

    #[test]
    fn commit_with_no_docs_is_noop() {
        let (dir, mut writer) = basic_setup("empty_commit");
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        assert!(storage.segments().is_empty());
    }

    #[test]
    fn auto_flush_on_memory_budget() {
        let (dir, mut writer) = basic_setup("autoflush");
        writer.set_memory_budget(100);
        for i in 0..5 {
            writer
                .add(serde_json::json!({
                    "title": format!("document number {i} with some extra text to exceed the budget"),
                    "status": "active"
                }))
                .unwrap();
        }
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        assert!(
            storage.segments().len() > 1,
            "expected multiple segments from auto-flush, got {}",
            storage.segments().len()
        );
    }

    #[test]
    fn dynamic_false_ignores_unknown() {
        let dir = TestDir::new("dynamic_false");
        let storage = SingleFileDirectory::create(dir.path()).unwrap();
        let schema = Mapping::builder()
            .field("title", FieldType::Text)
            .dynamic(DynamicMode::False)
            .build();
        let analyzers = AnalyzerRegistry::new();
        let mut writer = IndexWriter::new(storage, schema, analyzers);

        writer
            .add(serde_json::json!({
                "title": "hello",
                "unknown_field": "value"
            }))
            .unwrap();
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        assert_eq!(storage.segments().len(), 1);
        let data = storage
            .read_segment(storage.segments()[0].segment_id)
            .unwrap();
        let reader = SegmentReader::open(data).unwrap();

        // The mapped field is indexed, and the unmapped field's value does not
        // end up in it.
        assert!(reader.postings(FieldId::new(0), "hello").is_some());
        assert!(reader.postings(FieldId::new(0), "value").is_none());
    }

    #[test]
    fn multiple_commits() {
        let (dir, mut writer) = basic_setup("multi_commit");
        writer
            .add(serde_json::json!({"title": "first", "status": "a"}))
            .unwrap();
        writer.commit().unwrap();

        writer
            .add(serde_json::json!({"title": "second", "status": "b"}))
            .unwrap();
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        assert_eq!(storage.segments().len(), 2);
    }

    #[test]
    fn source_stored_correctly() {
        let (dir, mut writer) = basic_setup("source");
        let doc = serde_json::json!({"title": "hello world", "status": "active"});
        writer.add(doc.clone()).unwrap();
        writer.commit().unwrap();

        let storage = SingleFileDirectory::open(dir.path()).unwrap();
        let data = storage
            .read_segment(storage.segments()[0].segment_id)
            .unwrap();
        let reader = SegmentReader::open(data).unwrap();
        let source = reader.doc_store().get(0).unwrap();
        let stored: serde_json::Value = serde_json::from_slice(&source).unwrap();
        assert_eq!(stored, doc);
    }
}