Skip to main content

drasi_bootstrap_scriptfile/
script_file.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Script file bootstrap provider for reading JSONL bootstrap data
16
17use anyhow::{anyhow, Result};
18use async_trait::async_trait;
19use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
20use log::{debug, error, info};
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use crate::script_reader::BootstrapScriptReader;
25use crate::script_types::{BootstrapScriptRecord, NodeRecord, RelationRecord};
26use drasi_lib::bootstrap::{
27    BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
28};
29use drasi_lib::sources::manager::convert_json_to_element_properties;
30
31use drasi_lib::bootstrap::ScriptFileBootstrapConfig;
32
/// Bootstrap provider that reads data from JSONL script files.
///
/// The provider only stores the list of script file paths; the files are
/// opened when a bootstrap is actually requested.
///
/// `Debug` and `Clone` are derived so the provider can be logged and
/// duplicated like any other plain configuration value.
#[derive(Debug, Clone, Default)]
pub struct ScriptFileBootstrapProvider {
    /// JSONL script file paths, kept in the order they should be read.
    file_paths: Vec<String>,
}
38
39impl ScriptFileBootstrapProvider {
40    /// Create a new script file bootstrap provider from configuration
41    ///
42    /// # Arguments
43    /// * `config` - ScriptFile bootstrap configuration
44    pub fn new(config: ScriptFileBootstrapConfig) -> Self {
45        Self {
46            file_paths: config.file_paths,
47        }
48    }
49
50    /// Create a new script file bootstrap provider with explicit file paths
51    ///
52    /// # Arguments
53    /// * `file_paths` - List of JSONL file paths to read in order
54    pub fn with_paths(file_paths: Vec<String>) -> Self {
55        Self { file_paths }
56    }
57
58    /// Create a builder for ScriptFileBootstrapProvider
59    pub fn builder() -> ScriptFileBootstrapProviderBuilder {
60        ScriptFileBootstrapProviderBuilder::new()
61    }
62}
63
/// Builder for ScriptFileBootstrapProvider
///
/// Collects JSONL script file paths and hands them to the provider when
/// `build` is called. `Debug` and `Clone` are derived so a partially
/// configured builder can be inspected or reused as a template.
///
/// # Example
///
/// ```no_run
/// use drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider;
///
/// let provider = ScriptFileBootstrapProvider::builder()
///     .with_file("/path/to/data.jsonl")
///     .with_file("/path/to/more_data.jsonl")
///     .build();
/// ```
#[derive(Debug, Clone)]
pub struct ScriptFileBootstrapProviderBuilder {
    /// File paths accumulated so far, in insertion order.
    file_paths: Vec<String>,
}
79
80impl ScriptFileBootstrapProviderBuilder {
81    /// Create a new builder
82    pub fn new() -> Self {
83        Self {
84            file_paths: Vec::new(),
85        }
86    }
87
88    /// Set all file paths at once
89    pub fn with_file_paths(mut self, paths: Vec<String>) -> Self {
90        self.file_paths = paths;
91        self
92    }
93
94    /// Add a single file path
95    pub fn with_file(mut self, path: impl Into<String>) -> Self {
96        self.file_paths.push(path.into());
97        self
98    }
99
100    /// Build the ScriptFileBootstrapProvider
101    pub fn build(self) -> ScriptFileBootstrapProvider {
102        ScriptFileBootstrapProvider::with_paths(self.file_paths)
103    }
104}
105
106impl Default for ScriptFileBootstrapProviderBuilder {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl ScriptFileBootstrapProvider {
113    /// Convert a NodeRecord to an Element::Node
114    fn convert_node_to_element(source_id: &str, node: &NodeRecord) -> Result<Element> {
115        // Convert properties from JSON to ElementPropertyMap
116        let properties = if let serde_json::Value::Object(obj) = &node.properties {
117            convert_json_to_element_properties(obj)
118        } else if node.properties.is_null() {
119            Default::default()
120        } else {
121            return Err(anyhow!(
122                "ScriptFile bootstrap error: Node '{}' has invalid properties type. \
123                 Properties must be a JSON object or null, found: {}",
124                node.id,
125                node.properties
126            ));
127        };
128
129        // Convert labels to Arc slice
130        let labels: Arc<[Arc<str>]> = node
131            .labels
132            .iter()
133            .map(|l| Arc::from(l.as_str()))
134            .collect::<Vec<_>>()
135            .into();
136
137        Ok(Element::Node {
138            metadata: ElementMetadata {
139                reference: ElementReference::new(source_id, &node.id),
140                labels,
141                effective_from: 0,
142            },
143            properties,
144        })
145    }
146
147    /// Convert a RelationRecord to an Element::Relation
148    fn convert_relation_to_element(source_id: &str, relation: &RelationRecord) -> Result<Element> {
149        // Convert properties from JSON to ElementPropertyMap
150        let properties = if let serde_json::Value::Object(obj) = &relation.properties {
151            convert_json_to_element_properties(obj)
152        } else if relation.properties.is_null() {
153            Default::default()
154        } else {
155            return Err(anyhow!(
156                "ScriptFile bootstrap error: Relation '{}' has invalid properties type. \
157                 Properties must be a JSON object or null, found: {}",
158                relation.id,
159                relation.properties
160            ));
161        };
162
163        // Convert labels to Arc slice
164        let labels: Arc<[Arc<str>]> = relation
165            .labels
166            .iter()
167            .map(|l| Arc::from(l.as_str()))
168            .collect::<Vec<_>>()
169            .into();
170
171        // Create start and end element references
172        let start_ref = ElementReference::new(source_id, &relation.start_id);
173        let end_ref = ElementReference::new(source_id, &relation.end_id);
174
175        Ok(Element::Relation {
176            metadata: ElementMetadata {
177                reference: ElementReference::new(source_id, &relation.id),
178                labels,
179                effective_from: 0,
180            },
181            properties,
182            in_node: start_ref,
183            out_node: end_ref,
184        })
185    }
186
187    /// Check if a record matches the requested labels
188    fn matches_labels(
189        record_labels: &[String],
190        requested_labels: &[String],
191        _is_node: bool,
192    ) -> bool {
193        // If no labels requested, include all records
194        if requested_labels.is_empty() {
195            return true;
196        }
197
198        // Check if any of the record's labels match the requested labels
199        record_labels
200            .iter()
201            .any(|label| requested_labels.contains(label))
202    }
203
204    /// Process records from the script and send matching elements
205    async fn process_records(
206        &self,
207        reader: &mut BootstrapScriptReader,
208        request: &BootstrapRequest,
209        context: &BootstrapContext,
210        event_tx: drasi_lib::channels::BootstrapEventSender,
211    ) -> Result<usize> {
212        let mut count = 0;
213
214        for record_result in reader {
215            let seq_record = record_result?;
216
217            match seq_record.record {
218                BootstrapScriptRecord::Node(node) => {
219                    // Check if node matches requested labels
220                    if Self::matches_labels(&node.labels, &request.node_labels, true) {
221                        debug!("Processing node: id={}, labels={:?}", node.id, node.labels);
222
223                        // Convert to element
224                        let element = Self::convert_node_to_element(&context.source_id, &node)?;
225
226                        // Send as insert
227                        let source_change = SourceChange::Insert { element };
228
229                        // Get next sequence number for this bootstrap event
230                        let sequence = context.next_sequence();
231
232                        let bootstrap_event = drasi_lib::channels::BootstrapEvent {
233                            source_id: context.source_id.clone(),
234                            change: source_change,
235                            timestamp: chrono::Utc::now(),
236                            sequence,
237                        };
238
239                        event_tx
240                            .send(bootstrap_event)
241                            .await
242                            .map_err(|e| anyhow!("Failed to send node: {e}"))?;
243
244                        count += 1;
245                    }
246                }
247                BootstrapScriptRecord::Relation(relation) => {
248                    // Check if relation matches requested labels
249                    if Self::matches_labels(&relation.labels, &request.relation_labels, false) {
250                        debug!(
251                            "Processing relation: id={}, labels={:?}, start={}, end={}",
252                            relation.id, relation.labels, relation.start_id, relation.end_id
253                        );
254
255                        // Convert to element
256                        let element =
257                            Self::convert_relation_to_element(&context.source_id, &relation)?;
258
259                        // Send as insert
260                        let source_change = SourceChange::Insert { element };
261
262                        // Get next sequence number for this bootstrap event
263                        let sequence = context.next_sequence();
264
265                        let bootstrap_event = drasi_lib::channels::BootstrapEvent {
266                            source_id: context.source_id.clone(),
267                            change: source_change,
268                            timestamp: chrono::Utc::now(),
269                            sequence,
270                        };
271
272                        event_tx
273                            .send(bootstrap_event)
274                            .await
275                            .map_err(|e| anyhow!("Failed to send relation: {e}"))?;
276
277                        count += 1;
278                    }
279                }
280                BootstrapScriptRecord::Finish(finish) => {
281                    debug!("Reached finish record: {}", finish.description);
282                    break;
283                }
284                BootstrapScriptRecord::Label(label) => {
285                    debug!("Skipping label record: {}", label.label);
286                }
287                BootstrapScriptRecord::Comment(_) => {
288                    // Comments are filtered by the reader, but handle just in case
289                    debug!("Skipping comment record");
290                }
291                BootstrapScriptRecord::Header(_) => {
292                    // Header already processed by reader
293                    debug!("Skipping header record in iteration");
294                }
295            }
296        }
297
298        Ok(count)
299    }
300}
301
302#[async_trait]
303impl BootstrapProvider for ScriptFileBootstrapProvider {
304    async fn bootstrap(
305        &self,
306        request: BootstrapRequest,
307        context: &BootstrapContext,
308        event_tx: drasi_lib::channels::BootstrapEventSender,
309        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
310    ) -> Result<BootstrapResult> {
311        info!(
312            "Starting script file bootstrap for query {} from {} file(s)",
313            request.query_id,
314            self.file_paths.len()
315        );
316
317        // Convert file paths to PathBuf
318        let paths: Vec<PathBuf> = self.file_paths.iter().map(PathBuf::from).collect();
319
320        // Create the script reader
321        let mut reader = BootstrapScriptReader::new(paths).map_err(|e| {
322            error!("Failed to create script reader: {e}");
323            anyhow!("Failed to create script reader: {e}")
324        })?;
325
326        // Get and log header information
327        let header = reader.get_header();
328        info!(
329            "Script header - start_time: {}, description: {}",
330            header.start_time, header.description
331        );
332
333        // Process records and send matching elements
334        let count = self
335            .process_records(&mut reader, &request, context, event_tx)
336            .await?;
337
338        info!(
339            "Completed script file bootstrap for query {}: sent {} elements (requested node labels: {:?}, relation labels: {:?})",
340            request.query_id, count, request.node_labels, request.relation_labels
341        );
342
343        Ok(BootstrapResult {
344            event_count: count,
345            last_sequence: None,
346            sequences_aligned: false,
347        })
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use crate::script_types::{NodeRecord, RelationRecord};
    use serde_json::json;

    /// Turn string literals into the owned label list the records expect.
    fn labels(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// A node with object properties converts to a node element with the
    /// expected reference, labels and properties.
    #[test]
    fn test_convert_node_to_element() {
        let record = NodeRecord {
            id: "n1".to_string(),
            labels: labels(&["Person"]),
            properties: json!({"name": "Alice", "age": 30}),
        };

        let converted =
            ScriptFileBootstrapProvider::convert_node_to_element("test_source", &record).unwrap();

        if let Element::Node {
            metadata,
            properties,
        } = converted
        {
            assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
            assert_eq!(metadata.reference.element_id.as_ref(), "n1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "Person");
            assert!(properties.get(&Arc::from("name")).is_some());
        } else {
            panic!("Expected Node element");
        }
    }

    /// Null properties are accepted and produce a property map without the
    /// probed key.
    #[test]
    fn test_convert_node_with_null_properties() {
        let record = NodeRecord {
            id: "n1".to_string(),
            labels: labels(&["Person"]),
            properties: serde_json::Value::Null,
        };

        let converted =
            ScriptFileBootstrapProvider::convert_node_to_element("test_source", &record).unwrap();

        if let Element::Node { properties, .. } = converted {
            // Properties should be empty for null input
            assert!(properties.get(&Arc::from("test")).is_none());
        } else {
            panic!("Expected Node element");
        }
    }

    /// A relation converts with its start id as `in_node` and its end id as
    /// `out_node`.
    #[test]
    fn test_convert_relation_to_element() {
        let record = RelationRecord {
            id: "r1".to_string(),
            labels: labels(&["KNOWS"]),
            start_id: "n1".to_string(),
            start_label: Some("Person".to_string()),
            end_id: "n2".to_string(),
            end_label: Some("Person".to_string()),
            properties: json!({"since": 2020}),
        };

        let converted =
            ScriptFileBootstrapProvider::convert_relation_to_element("test_source", &record)
                .unwrap();

        if let Element::Relation {
            metadata,
            out_node,
            in_node,
            properties,
        } = converted
        {
            assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
            assert_eq!(metadata.reference.element_id.as_ref(), "r1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "KNOWS");
            assert_eq!(in_node.source_id.as_ref(), "test_source");
            assert_eq!(in_node.element_id.as_ref(), "n1");
            assert_eq!(out_node.source_id.as_ref(), "test_source");
            assert_eq!(out_node.element_id.as_ref(), "n2");
            assert!(properties.get(&Arc::from("since")).is_some());
        } else {
            panic!("Expected Relation element");
        }
    }

    /// An empty request list acts as "no filter".
    #[test]
    fn test_matches_labels_empty_request() {
        let record_labels = labels(&["Person"]);
        let requested_labels = labels(&[]);

        let matched =
            ScriptFileBootstrapProvider::matches_labels(&record_labels, &requested_labels, true);
        assert!(matched);
    }

    /// One shared label is enough to match.
    #[test]
    fn test_matches_labels_match() {
        let record_labels = labels(&["Person", "Employee"]);
        let requested_labels = labels(&["Person"]);

        let matched =
            ScriptFileBootstrapProvider::matches_labels(&record_labels, &requested_labels, true);
        assert!(matched);
    }

    /// Disjoint label sets do not match.
    #[test]
    fn test_matches_labels_no_match() {
        let record_labels = labels(&["Person"]);
        let requested_labels = labels(&["Company"]);

        let matched =
            ScriptFileBootstrapProvider::matches_labels(&record_labels, &requested_labels, true);
        assert!(!matched);
    }

    // Note: Full integration tests require tokio runtime and channels
    // These are handled in the main test suite
}