1use std::io::BufRead;
7use std::path::Path;
8
9use crate::directories::{Directory, DirectoryWriter, FsDirectory};
10use crate::dsl::{Document, Schema, SchemaBuilder, parse_single_index};
11use crate::error::{Error, Result};
12use crate::index::{IndexConfig, IndexWriter};
13
/// One field entry of a JSON schema definition (an element of [`SchemaConfig::fields`]).
///
/// Deserialized from objects such as `{"name": "title", "type": "text"}`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SchemaFieldConfig {
    /// Field name passed to the schema builder.
    pub name: String,
    /// Field type, read from the JSON key `"type"`. [`SchemaConfig::build`] accepts
    /// `text`, `u64`, `i64`, `f64`, `bytes`, `json`, `sparse_vector` and
    /// `dense_vector`; any other value makes `build` return an error.
    #[serde(rename = "type")]
    pub field_type: String,
    /// Whether the field is indexed; `true` when omitted from the JSON.
    /// `SchemaConfig::build` does not pass this flag for `bytes` and `json` fields.
    #[serde(default = "default_true")]
    pub indexed: bool,
    /// Whether the field value is stored; `true` when omitted from the JSON.
    #[serde(default = "default_true")]
    pub stored: bool,
    /// Vector dimension; only passed to the builder for `dense_vector` fields.
    /// `0` when omitted from the JSON.
    #[serde(default)]
    pub dimension: usize,
}
32
/// Serde default used for `SchemaFieldConfig::indexed` and `SchemaFieldConfig::stored`,
/// so both flags are enabled unless the JSON explicitly sets them.
fn default_true() -> bool {
    true
}
36
/// JSON schema definition: an object with a `fields` array of [`SchemaFieldConfig`] entries.
///
/// Example: `{"fields": [{"name": "title", "type": "text"}]}`. Turn it into a
/// [`Schema`] with [`SchemaConfig::build`].
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SchemaConfig {
    /// Field definitions; `build` registers them with the builder in this order.
    pub fields: Vec<SchemaFieldConfig>,
}
43
44impl SchemaConfig {
45 pub fn build(&self) -> Result<Schema> {
47 let mut builder = SchemaBuilder::default();
48
49 for field in &self.fields {
50 match field.field_type.as_str() {
51 "text" => {
52 builder.add_text_field(&field.name, field.indexed, field.stored);
53 }
54 "u64" => {
55 builder.add_u64_field(&field.name, field.indexed, field.stored);
56 }
57 "i64" => {
58 builder.add_i64_field(&field.name, field.indexed, field.stored);
59 }
60 "f64" => {
61 builder.add_f64_field(&field.name, field.indexed, field.stored);
62 }
63 "bytes" => {
64 builder.add_bytes_field(&field.name, field.stored);
65 }
66 "json" => {
67 builder.add_json_field(&field.name, field.stored);
68 }
69 "sparse_vector" => {
70 builder.add_sparse_vector_field(&field.name, field.indexed, field.stored);
71 }
72 "dense_vector" => {
73 builder.add_dense_vector_field(
74 &field.name,
75 field.dimension,
76 field.indexed,
77 field.stored,
78 );
79 }
80 other => {
81 return Err(Error::Schema(format!("Unknown field type: {}", other)));
82 }
83 }
84 }
85
86 Ok(builder.build())
87 }
88}
89
90pub fn parse_schema(content: &str) -> Result<Schema> {
92 let trimmed = content.trim();
93
94 if trimmed.starts_with("index ") || trimmed.starts_with('#') {
96 let index_def = parse_single_index(content)
97 .map_err(|e| Error::Schema(format!("Failed to parse SDL: {}", e)))?;
98 Ok(index_def.to_schema())
99 } else {
100 let config: SchemaConfig = serde_json::from_str(content)
102 .map_err(|e| Error::Schema(format!("Failed to parse JSON schema: {}", e)))?;
103 config.build()
104 }
105}
106
107pub async fn create_index_at_path(
109 path: impl AsRef<Path>,
110 schema: Schema,
111 config: IndexConfig,
112) -> Result<IndexWriter<FsDirectory>> {
113 let path = path.as_ref();
114
115 std::fs::create_dir_all(path).map_err(|e| {
116 Error::Io(std::io::Error::new(
117 e.kind(),
118 format!("Failed to create index directory {:?}: {}", path, e),
119 ))
120 })?;
121
122 let dir = FsDirectory::new(path);
123 IndexWriter::create(dir, schema, config).await
124}
125
126pub async fn create_index_from_sdl(
128 path: impl AsRef<Path>,
129 sdl: &str,
130 config: IndexConfig,
131) -> Result<IndexWriter<FsDirectory>> {
132 let schema = parse_schema(sdl)?;
133 create_index_at_path(path, schema, config).await
134}
135
/// Counters reported by [`index_documents_from_reader`].
#[derive(Debug, Clone, Default)]
pub struct IndexingStats {
    /// Documents successfully added to the writer.
    pub indexed: usize,
    /// Non-blank input lines that `index_documents_from_reader` skipped because
    /// they were not valid JSON or could not be converted into a document.
    pub errors: usize,
    /// Wall-clock seconds measured by `index_documents_from_reader`, from the
    /// start of the run until after the final commit.
    pub elapsed_secs: f64,
}

impl IndexingStats {
    /// Average throughput in documents per second (`indexed / elapsed_secs`).
    ///
    /// Returns `0.0` unless `elapsed_secs` is strictly positive, so a zero,
    /// negative, or NaN duration yields `0.0`.
    pub fn docs_per_sec(&self) -> f64 {
        match self.elapsed_secs {
            secs if secs > 0.0 => self.indexed as f64 / secs,
            _ => 0.0,
        }
    }
}
157
158pub async fn index_documents_from_reader<D, R>(
163 writer: &IndexWriter<D>,
164 reader: R,
165 progress_callback: Option<&dyn Fn(usize)>,
166) -> Result<IndexingStats>
167where
168 D: Directory + DirectoryWriter,
169 R: BufRead,
170{
171 let schema = writer.schema().clone();
172 let mut stats = IndexingStats::default();
173 let start_time = std::time::Instant::now();
174
175 for line in reader.lines() {
176 let line = line.map_err(Error::Io)?;
177 if line.trim().is_empty() {
178 continue;
179 }
180
181 let json: serde_json::Value = match serde_json::from_str(&line) {
182 Ok(v) => v,
183 Err(_) => {
184 stats.errors += 1;
185 continue;
186 }
187 };
188
189 let doc = match Document::from_json(&json, &schema) {
190 Some(d) => d,
191 None => {
192 stats.errors += 1;
193 continue;
194 }
195 };
196
197 writer.add_document(doc).await?;
198 stats.indexed += 1;
199
200 if let Some(callback) = progress_callback {
201 callback(stats.indexed);
202 }
203 }
204
205 writer.commit().await?;
206 stats.elapsed_secs = start_time.elapsed().as_secs_f64();
207
208 Ok(stats)
209}
210
211pub async fn index_json_document<D>(writer: &IndexWriter<D>, json: &serde_json::Value) -> Result<()>
213where
214 D: Directory + DirectoryWriter,
215{
216 let schema = writer.schema().clone();
217 let doc = Document::from_json(json, &schema)
218 .ok_or_else(|| Error::Document("Failed to parse JSON document".to_string()))?;
219 writer.add_document(doc).await?;
220 Ok(())
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use std::cell::Cell;

    /// Creates an in-memory writer whose schema has one indexed, stored `title` text field.
    async fn title_writer() -> IndexWriter<RamDirectory> {
        let mut builder = SchemaBuilder::default();
        let _title = builder.add_text_field("title", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        IndexWriter::create(dir, schema, config).await.unwrap()
    }

    #[test]
    fn test_schema_config_json() {
        let json = r#"{
            "fields": [
                {"name": "title", "type": "text", "indexed": true, "stored": true},
                {"name": "body", "type": "text"},
                {"name": "score", "type": "f64", "indexed": false}
            ]
        }"#;

        let config: SchemaConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.fields.len(), 3);

        let schema = config.build().unwrap();
        assert!(schema.get_field("title").is_some());
        assert!(schema.get_field("body").is_some());
        assert!(schema.get_field("score").is_some());
    }

    #[test]
    fn test_schema_field_config_defaults() {
        let config: SchemaConfig =
            serde_json::from_str(r#"{"fields": [{"name": "body", "type": "text"}]}"#).unwrap();
        let field = &config.fields[0];
        assert!(field.indexed, "indexed should default to true");
        assert!(field.stored, "stored should default to true");
        assert_eq!(field.dimension, 0);
    }

    #[test]
    fn test_parse_schema_json() {
        let json = r#"{"fields": [{"name": "text", "type": "text"}]}"#;
        let schema = parse_schema(json).unwrap();
        assert!(schema.get_field("text").is_some());
    }

    #[test]
    fn test_parse_schema_rejects_unknown_field_type() {
        let json = r#"{"fields": [{"name": "x", "type": "not_a_type"}]}"#;
        assert!(parse_schema(json).is_err());
    }

    #[test]
    fn test_parse_schema_rejects_malformed_input() {
        assert!(parse_schema("not a schema").is_err());
    }

    #[test]
    fn test_parse_schema_sdl() {
        let sdl = r#"
            index test {
                field text: text [indexed, stored]
            }
        "#;
        let schema = parse_schema(sdl).unwrap();
        assert!(schema.get_field("text").is_some());
    }

    #[test]
    fn test_docs_per_sec() {
        let stats = IndexingStats {
            indexed: 10,
            errors: 0,
            elapsed_secs: 2.0,
        };
        assert_eq!(stats.docs_per_sec(), 5.0);
        assert_eq!(IndexingStats::default().docs_per_sec(), 0.0);
    }

    #[tokio::test]
    async fn test_index_documents_from_reader() {
        let writer = title_writer().await;

        let jsonl = r#"{"title": "Doc 1"}
{"title": "Doc 2"}
{"title": "Doc 3"}"#;

        let reader = std::io::Cursor::new(jsonl);
        let stats = index_documents_from_reader(&writer, reader, None)
            .await
            .unwrap();

        assert_eq!(stats.indexed, 3);
        assert_eq!(stats.errors, 0);
    }

    #[tokio::test]
    async fn test_index_documents_from_reader_counts_bad_lines() {
        let writer = title_writer().await;

        // One malformed line (counted as an error) plus blank and whitespace-only
        // lines (skipped without being counted).
        let jsonl = "{\"title\": \"Doc 1\"}\nnot json\n\n   \n{\"title\": \"Doc 2\"}\n";

        let last_reported = Cell::new(0usize);
        let report: &dyn Fn(usize) = &|count: usize| last_reported.set(count);

        let stats =
            index_documents_from_reader(&writer, std::io::Cursor::new(jsonl), Some(report))
                .await
                .unwrap();

        assert_eq!(stats.indexed, 2);
        assert_eq!(stats.errors, 1);
        assert_eq!(last_reported.get(), 2);
    }
}