Skip to main content

drasi_bootstrap_scriptfile/
script_file.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Script file bootstrap provider for reading JSONL bootstrap data
16
17use anyhow::{anyhow, Result};
18use async_trait::async_trait;
19use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
20use log::{debug, error, info};
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use crate::script_reader::BootstrapScriptReader;
25use crate::script_types::{BootstrapScriptRecord, NodeRecord, RelationRecord};
26use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
27use drasi_lib::sources::manager::convert_json_to_element_properties;
28
29use drasi_lib::bootstrap::ScriptFileBootstrapConfig;
30
/// Bootstrap provider that reads data from JSONL script files.
///
/// The provider only stores the list of script file paths; the files are
/// opened when a bootstrap is actually requested. The derived `Default`
/// produces a provider with no file paths configured.
#[derive(Debug, Clone, Default)]
pub struct ScriptFileBootstrapProvider {
    /// JSONL script file paths, kept in the order they were supplied.
    file_paths: Vec<String>,
}
36
37impl ScriptFileBootstrapProvider {
38    /// Create a new script file bootstrap provider from configuration
39    ///
40    /// # Arguments
41    /// * `config` - ScriptFile bootstrap configuration
42    pub fn new(config: ScriptFileBootstrapConfig) -> Self {
43        Self {
44            file_paths: config.file_paths,
45        }
46    }
47
48    /// Create a new script file bootstrap provider with explicit file paths
49    ///
50    /// # Arguments
51    /// * `file_paths` - List of JSONL file paths to read in order
52    pub fn with_paths(file_paths: Vec<String>) -> Self {
53        Self { file_paths }
54    }
55
56    /// Create a builder for ScriptFileBootstrapProvider
57    pub fn builder() -> ScriptFileBootstrapProviderBuilder {
58        ScriptFileBootstrapProviderBuilder::new()
59    }
60}
61
/// Builder for ScriptFileBootstrapProvider.
///
/// Collects script file paths one at a time or as a whole list and then
/// produces the provider with `build()`.
///
/// # Example
///
/// ```no_run
/// use drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider;
///
/// let provider = ScriptFileBootstrapProvider::builder()
///     .with_file("/path/to/data.jsonl")
///     .with_file("/path/to/more_data.jsonl")
///     .build();
/// ```
#[derive(Debug, Clone)]
pub struct ScriptFileBootstrapProviderBuilder {
    /// File paths accumulated so far, in insertion order.
    file_paths: Vec<String>,
}
77
78impl ScriptFileBootstrapProviderBuilder {
79    /// Create a new builder
80    pub fn new() -> Self {
81        Self {
82            file_paths: Vec::new(),
83        }
84    }
85
86    /// Set all file paths at once
87    pub fn with_file_paths(mut self, paths: Vec<String>) -> Self {
88        self.file_paths = paths;
89        self
90    }
91
92    /// Add a single file path
93    pub fn with_file(mut self, path: impl Into<String>) -> Self {
94        self.file_paths.push(path.into());
95        self
96    }
97
98    /// Build the ScriptFileBootstrapProvider
99    pub fn build(self) -> ScriptFileBootstrapProvider {
100        ScriptFileBootstrapProvider::with_paths(self.file_paths)
101    }
102}
103
104impl Default for ScriptFileBootstrapProviderBuilder {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl ScriptFileBootstrapProvider {
111    /// Convert a NodeRecord to an Element::Node
112    fn convert_node_to_element(source_id: &str, node: &NodeRecord) -> Result<Element> {
113        // Convert properties from JSON to ElementPropertyMap
114        let properties = if let serde_json::Value::Object(obj) = &node.properties {
115            convert_json_to_element_properties(obj)?
116        } else if node.properties.is_null() {
117            Default::default()
118        } else {
119            return Err(anyhow!(
120                "ScriptFile bootstrap error: Node '{}' has invalid properties type. \
121                 Properties must be a JSON object or null, found: {}",
122                node.id,
123                node.properties
124            ));
125        };
126
127        // Convert labels to Arc slice
128        let labels: Arc<[Arc<str>]> = node
129            .labels
130            .iter()
131            .map(|l| Arc::from(l.as_str()))
132            .collect::<Vec<_>>()
133            .into();
134
135        Ok(Element::Node {
136            metadata: ElementMetadata {
137                reference: ElementReference::new(source_id, &node.id),
138                labels,
139                effective_from: 0,
140            },
141            properties,
142        })
143    }
144
145    /// Convert a RelationRecord to an Element::Relation
146    fn convert_relation_to_element(source_id: &str, relation: &RelationRecord) -> Result<Element> {
147        // Convert properties from JSON to ElementPropertyMap
148        let properties = if let serde_json::Value::Object(obj) = &relation.properties {
149            convert_json_to_element_properties(obj)?
150        } else if relation.properties.is_null() {
151            Default::default()
152        } else {
153            return Err(anyhow!(
154                "ScriptFile bootstrap error: Relation '{}' has invalid properties type. \
155                 Properties must be a JSON object or null, found: {}",
156                relation.id,
157                relation.properties
158            ));
159        };
160
161        // Convert labels to Arc slice
162        let labels: Arc<[Arc<str>]> = relation
163            .labels
164            .iter()
165            .map(|l| Arc::from(l.as_str()))
166            .collect::<Vec<_>>()
167            .into();
168
169        // Create start and end element references
170        let start_ref = ElementReference::new(source_id, &relation.start_id);
171        let end_ref = ElementReference::new(source_id, &relation.end_id);
172
173        Ok(Element::Relation {
174            metadata: ElementMetadata {
175                reference: ElementReference::new(source_id, &relation.id),
176                labels,
177                effective_from: 0,
178            },
179            properties,
180            in_node: start_ref,
181            out_node: end_ref,
182        })
183    }
184
185    /// Check if a record matches the requested labels
186    fn matches_labels(
187        record_labels: &[String],
188        requested_labels: &[String],
189        _is_node: bool,
190    ) -> bool {
191        // If no labels requested, include all records
192        if requested_labels.is_empty() {
193            return true;
194        }
195
196        // Check if any of the record's labels match the requested labels
197        record_labels
198            .iter()
199            .any(|label| requested_labels.contains(label))
200    }
201
202    /// Process records from the script and send matching elements
203    async fn process_records(
204        &self,
205        reader: &mut BootstrapScriptReader,
206        request: &BootstrapRequest,
207        context: &BootstrapContext,
208        event_tx: drasi_lib::channels::BootstrapEventSender,
209    ) -> Result<usize> {
210        let mut count = 0;
211
212        for record_result in reader {
213            let seq_record = record_result?;
214
215            match seq_record.record {
216                BootstrapScriptRecord::Node(node) => {
217                    // Check if node matches requested labels
218                    if Self::matches_labels(&node.labels, &request.node_labels, true) {
219                        debug!("Processing node: id={}, labels={:?}", node.id, node.labels);
220
221                        // Convert to element
222                        let element = Self::convert_node_to_element(&context.source_id, &node)?;
223
224                        // Send as insert
225                        let source_change = SourceChange::Insert { element };
226
227                        // Get next sequence number for this bootstrap event
228                        let sequence = context.next_sequence();
229
230                        let bootstrap_event = drasi_lib::channels::BootstrapEvent {
231                            source_id: context.source_id.clone(),
232                            change: source_change,
233                            timestamp: chrono::Utc::now(),
234                            sequence,
235                        };
236
237                        event_tx
238                            .send(bootstrap_event)
239                            .await
240                            .map_err(|e| anyhow!("Failed to send node: {e}"))?;
241
242                        count += 1;
243                    }
244                }
245                BootstrapScriptRecord::Relation(relation) => {
246                    // Check if relation matches requested labels
247                    if Self::matches_labels(&relation.labels, &request.relation_labels, false) {
248                        debug!(
249                            "Processing relation: id={}, labels={:?}, start={}, end={}",
250                            relation.id, relation.labels, relation.start_id, relation.end_id
251                        );
252
253                        // Convert to element
254                        let element =
255                            Self::convert_relation_to_element(&context.source_id, &relation)?;
256
257                        // Send as insert
258                        let source_change = SourceChange::Insert { element };
259
260                        // Get next sequence number for this bootstrap event
261                        let sequence = context.next_sequence();
262
263                        let bootstrap_event = drasi_lib::channels::BootstrapEvent {
264                            source_id: context.source_id.clone(),
265                            change: source_change,
266                            timestamp: chrono::Utc::now(),
267                            sequence,
268                        };
269
270                        event_tx
271                            .send(bootstrap_event)
272                            .await
273                            .map_err(|e| anyhow!("Failed to send relation: {e}"))?;
274
275                        count += 1;
276                    }
277                }
278                BootstrapScriptRecord::Finish(finish) => {
279                    debug!("Reached finish record: {}", finish.description);
280                    break;
281                }
282                BootstrapScriptRecord::Label(label) => {
283                    debug!("Skipping label record: {}", label.label);
284                }
285                BootstrapScriptRecord::Comment(_) => {
286                    // Comments are filtered by the reader, but handle just in case
287                    debug!("Skipping comment record");
288                }
289                BootstrapScriptRecord::Header(_) => {
290                    // Header already processed by reader
291                    debug!("Skipping header record in iteration");
292                }
293            }
294        }
295
296        Ok(count)
297    }
298}
299
300#[async_trait]
301impl BootstrapProvider for ScriptFileBootstrapProvider {
302    async fn bootstrap(
303        &self,
304        request: BootstrapRequest,
305        context: &BootstrapContext,
306        event_tx: drasi_lib::channels::BootstrapEventSender,
307        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
308    ) -> Result<usize> {
309        info!(
310            "Starting script file bootstrap for query {} from {} file(s)",
311            request.query_id,
312            self.file_paths.len()
313        );
314
315        // Convert file paths to PathBuf
316        let paths: Vec<PathBuf> = self.file_paths.iter().map(PathBuf::from).collect();
317
318        // Create the script reader
319        let mut reader = BootstrapScriptReader::new(paths).map_err(|e| {
320            error!("Failed to create script reader: {e}");
321            anyhow!("Failed to create script reader: {e}")
322        })?;
323
324        // Get and log header information
325        let header = reader.get_header();
326        info!(
327            "Script header - start_time: {}, description: {}",
328            header.start_time, header.description
329        );
330
331        // Process records and send matching elements
332        let count = self
333            .process_records(&mut reader, &request, context, event_tx)
334            .await?;
335
336        info!(
337            "Completed script file bootstrap for query {}: sent {} elements (requested node labels: {:?}, relation labels: {:?})",
338            request.query_id, count, request.node_labels, request.relation_labels
339        );
340
341        Ok(count)
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::script_types::{NodeRecord, RelationRecord};
    use serde_json::json;

    /// Turn string literals into the owned label list the records expect.
    fn labels(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// A node with object properties converts into an `Element::Node` that
    /// carries the source id, element id, labels and properties.
    #[test]
    fn test_convert_node_to_element() {
        let node = NodeRecord {
            id: "n1".to_string(),
            labels: labels(&["Person"]),
            properties: json!({"name": "Alice", "age": 30}),
        };

        let element =
            ScriptFileBootstrapProvider::convert_node_to_element("test_source", &node).unwrap();

        if let Element::Node {
            metadata,
            properties,
        } = element
        {
            assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
            assert_eq!(metadata.reference.element_id.as_ref(), "n1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "Person");
            assert!(properties.get(&Arc::from("name")).is_some());
        } else {
            panic!("Expected Node element");
        }
    }

    /// JSON `null` properties are accepted and produce no property entries.
    #[test]
    fn test_convert_node_with_null_properties() {
        let node = NodeRecord {
            id: "n1".to_string(),
            labels: labels(&["Person"]),
            properties: serde_json::Value::Null,
        };

        let element =
            ScriptFileBootstrapProvider::convert_node_to_element("test_source", &node).unwrap();

        if let Element::Node { properties, .. } = element {
            // Properties should be empty for null input
            assert!(properties.get(&Arc::from("test")).is_none());
        } else {
            panic!("Expected Node element");
        }
    }

    /// A relation converts into an `Element::Relation` whose `in_node` is the
    /// record's start and whose `out_node` is the record's end.
    #[test]
    fn test_convert_relation_to_element() {
        let relation = RelationRecord {
            id: "r1".to_string(),
            labels: labels(&["KNOWS"]),
            start_id: "n1".to_string(),
            start_label: Some("Person".to_string()),
            end_id: "n2".to_string(),
            end_label: Some("Person".to_string()),
            properties: json!({"since": 2020}),
        };

        let element =
            ScriptFileBootstrapProvider::convert_relation_to_element("test_source", &relation)
                .unwrap();

        if let Element::Relation {
            metadata,
            out_node,
            in_node,
            properties,
        } = element
        {
            assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
            assert_eq!(metadata.reference.element_id.as_ref(), "r1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "KNOWS");
            assert_eq!(in_node.source_id.as_ref(), "test_source");
            assert_eq!(in_node.element_id.as_ref(), "n1");
            assert_eq!(out_node.source_id.as_ref(), "test_source");
            assert_eq!(out_node.element_id.as_ref(), "n2");
            assert!(properties.get(&Arc::from("since")).is_some());
        } else {
            panic!("Expected Relation element");
        }
    }

    /// An empty request list accepts any record.
    #[test]
    fn test_matches_labels_empty_request() {
        let on_record = labels(&["Person"]);
        let requested = labels(&[]);

        let accepted = ScriptFileBootstrapProvider::matches_labels(&on_record, &requested, true);
        assert!(accepted);
    }

    /// One shared label is enough for a match.
    #[test]
    fn test_matches_labels_match() {
        let on_record = labels(&["Person", "Employee"]);
        let requested = labels(&["Person"]);

        let accepted = ScriptFileBootstrapProvider::matches_labels(&on_record, &requested, true);
        assert!(accepted);
    }

    /// Disjoint label sets do not match.
    #[test]
    fn test_matches_labels_no_match() {
        let on_record = labels(&["Person"]);
        let requested = labels(&["Company"]);

        let accepted = ScriptFileBootstrapProvider::matches_labels(&on_record, &requested, true);
        assert!(!accepted);
    }

    // Note: Full integration tests require tokio runtime and channels
    // These are handled in the main test suite
}