Skip to main content

hermes_core/index/
helpers.rs

1//! Indexing helper functions
2//!
3//! This module provides high-level helper functions for creating indexes
4//! and indexing documents, used by hermes-tool, hermes-server, and hermes-core-python.
5
6use std::io::BufRead;
7use std::path::Path;
8
9use crate::directories::{Directory, DirectoryWriter, FsDirectory};
10use crate::dsl::{Document, Schema, SchemaBuilder, parse_single_index};
11use crate::error::{Error, Result};
12use crate::index::{IndexConfig, IndexWriter};
13
14/// Schema configuration from JSON format
15#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
16pub struct SchemaFieldConfig {
17    /// Field name
18    pub name: String,
19    /// Field type: text, u64, i64, f64, bytes, json, sparse_vector, dense_vector
20    #[serde(rename = "type")]
21    pub field_type: String,
22    /// Whether field is indexed (default: true)
23    #[serde(default = "default_true")]
24    pub indexed: bool,
25    /// Whether field is stored (default: true)
26    #[serde(default = "default_true")]
27    pub stored: bool,
28    /// Dimension for dense_vector fields
29    #[serde(default)]
30    pub dimension: usize,
31}
32
/// Serde default provider for boolean flags that are on unless the JSON
/// explicitly turns them off.
fn default_true() -> bool {
    true
}
36
37/// JSON schema configuration
38#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
39pub struct SchemaConfig {
40    /// List of field definitions
41    pub fields: Vec<SchemaFieldConfig>,
42}
43
44impl SchemaConfig {
45    /// Build a Schema from this configuration
46    pub fn build(&self) -> Result<Schema> {
47        let mut builder = SchemaBuilder::default();
48
49        for field in &self.fields {
50            match field.field_type.as_str() {
51                "text" => {
52                    builder.add_text_field(&field.name, field.indexed, field.stored);
53                }
54                "u64" => {
55                    builder.add_u64_field(&field.name, field.indexed, field.stored);
56                }
57                "i64" => {
58                    builder.add_i64_field(&field.name, field.indexed, field.stored);
59                }
60                "f64" => {
61                    builder.add_f64_field(&field.name, field.indexed, field.stored);
62                }
63                "bytes" => {
64                    builder.add_bytes_field(&field.name, field.stored);
65                }
66                "json" => {
67                    builder.add_json_field(&field.name, field.stored);
68                }
69                "sparse_vector" => {
70                    builder.add_sparse_vector_field(&field.name, field.indexed, field.stored);
71                }
72                "dense_vector" => {
73                    builder.add_dense_vector_field(
74                        &field.name,
75                        field.dimension,
76                        field.indexed,
77                        field.stored,
78                    );
79                }
80                other => {
81                    return Err(Error::Schema(format!("Unknown field type: {}", other)));
82                }
83            }
84        }
85
86        Ok(builder.build())
87    }
88}
89
90/// Parse schema from a string (auto-detects JSON or SDL format)
91pub fn parse_schema(content: &str) -> Result<Schema> {
92    let trimmed = content.trim();
93
94    // Detect SDL format (starts with "index " or "#" for comments)
95    if trimmed.starts_with("index ") || trimmed.starts_with('#') {
96        let index_def = parse_single_index(content)
97            .map_err(|e| Error::Schema(format!("Failed to parse SDL: {}", e)))?;
98        Ok(index_def.to_schema())
99    } else {
100        // Try JSON format
101        let config: SchemaConfig = serde_json::from_str(content)
102            .map_err(|e| Error::Schema(format!("Failed to parse JSON schema: {}", e)))?;
103        config.build()
104    }
105}
106
107/// Create a new index at the given path with the provided schema
108pub async fn create_index_at_path(
109    path: impl AsRef<Path>,
110    schema: Schema,
111    config: IndexConfig,
112) -> Result<IndexWriter<FsDirectory>> {
113    let path = path.as_ref();
114
115    std::fs::create_dir_all(path).map_err(|e| {
116        Error::Io(std::io::Error::new(
117            e.kind(),
118            format!("Failed to create index directory {:?}: {}", path, e),
119        ))
120    })?;
121
122    let dir = FsDirectory::new(path);
123    IndexWriter::create(dir, schema, config).await
124}
125
126/// Create a new index from an SDL schema string
127pub async fn create_index_from_sdl(
128    path: impl AsRef<Path>,
129    sdl: &str,
130    config: IndexConfig,
131) -> Result<IndexWriter<FsDirectory>> {
132    let schema = parse_schema(sdl)?;
133    create_index_at_path(path, schema, config).await
134}
135
/// Counters and timing collected while indexing a batch of documents.
#[derive(Debug, Clone, Default)]
pub struct IndexingStats {
    /// Number of documents indexed
    pub indexed: usize,
    /// Number of documents that failed to parse
    pub errors: usize,
    /// Total time in seconds
    pub elapsed_secs: f64,
}

impl IndexingStats {
    /// Indexing throughput in documents per second.
    ///
    /// Yields `0.0` whenever `elapsed_secs` is not strictly positive, which
    /// covers zero, negative and NaN durations.
    pub fn docs_per_sec(&self) -> f64 {
        match self.elapsed_secs {
            secs if secs > 0.0 => self.indexed as f64 / secs,
            _ => 0.0,
        }
    }
}
157
158/// Index documents from a JSONL reader
159///
160/// Each line should be a valid JSON object. Documents are parsed according
161/// to the schema and indexed. Returns statistics about the indexing operation.
162pub async fn index_documents_from_reader<D, R>(
163    writer: &IndexWriter<D>,
164    reader: R,
165    progress_callback: Option<&dyn Fn(usize)>,
166) -> Result<IndexingStats>
167where
168    D: Directory + DirectoryWriter,
169    R: BufRead,
170{
171    let schema = writer.schema().clone();
172    let mut stats = IndexingStats::default();
173    let start_time = std::time::Instant::now();
174
175    for line in reader.lines() {
176        let line = line.map_err(Error::Io)?;
177        if line.trim().is_empty() {
178            continue;
179        }
180
181        let json: serde_json::Value = match serde_json::from_str(&line) {
182            Ok(v) => v,
183            Err(_) => {
184                stats.errors += 1;
185                continue;
186            }
187        };
188
189        let doc = match Document::from_json(&json, &schema) {
190            Some(d) => d,
191            None => {
192                stats.errors += 1;
193                continue;
194            }
195        };
196
197        writer.add_document(doc).await?;
198        stats.indexed += 1;
199
200        if let Some(callback) = progress_callback {
201            callback(stats.indexed);
202        }
203    }
204
205    writer.commit().await?;
206    stats.elapsed_secs = start_time.elapsed().as_secs_f64();
207
208    Ok(stats)
209}
210
211/// Index a single document from JSON
212pub async fn index_json_document<D>(writer: &IndexWriter<D>, json: &serde_json::Value) -> Result<()>
213where
214    D: Directory + DirectoryWriter,
215{
216    let schema = writer.schema().clone();
217    let doc = Document::from_json(json, &schema)
218        .ok_or_else(|| Error::Document("Failed to parse JSON document".to_string()))?;
219    writer.add_document(doc).await?;
220    Ok(())
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;

    /// A JSON field list deserializes (with defaults for omitted keys) and
    /// builds into a schema exposing every declared field.
    #[test]
    fn test_schema_config_json() {
        let raw = r#"{
            "fields": [
                {"name": "title", "type": "text", "indexed": true, "stored": true},
                {"name": "body", "type": "text"},
                {"name": "score", "type": "f64", "indexed": false}
            ]
        }"#;

        let config = serde_json::from_str::<SchemaConfig>(raw).unwrap();
        assert_eq!(config.fields.len(), 3);

        let schema = config.build().unwrap();
        for name in ["title", "body", "score"] {
            assert!(schema.get_field(name).is_some());
        }
    }

    /// Input that does not look like SDL is parsed as JSON.
    #[test]
    fn test_parse_schema_json() {
        let raw = r#"{"fields": [{"name": "text", "type": "text"}]}"#;
        let schema = parse_schema(raw).unwrap();
        assert!(schema.get_field("text").is_some());
    }

    /// Input whose trimmed text starts with `index ` is parsed as SDL.
    #[test]
    fn test_parse_schema_sdl() {
        let source = r#"
            index test {
                field text: text [indexed, stored]
            }
        "#;
        let schema = parse_schema(source).unwrap();
        assert!(schema.get_field("text").is_some());
    }

    /// Three well-formed JSONL lines are all indexed with no errors counted.
    #[tokio::test]
    async fn test_index_documents_from_reader() {
        let mut schema_builder = SchemaBuilder::default();
        let _title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let writer = IndexWriter::create(RamDirectory::new(), schema, IndexConfig::default())
            .await
            .unwrap();

        let jsonl = r#"{"title": "Doc 1"}
{"title": "Doc 2"}
{"title": "Doc 3"}"#;

        let input = std::io::Cursor::new(jsonl);
        let stats = index_documents_from_reader(&writer, input, None)
            .await
            .unwrap();

        assert_eq!(stats.indexed, 3);
        assert_eq!(stats.errors, 0);
    }
}