drasi_bootstrap_scriptfile/
script_file.rs1use anyhow::{anyhow, Result};
18use async_trait::async_trait;
19use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
20use log::{debug, error, info};
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use crate::script_reader::BootstrapScriptReader;
25use crate::script_types::{BootstrapScriptRecord, NodeRecord, RelationRecord};
26use drasi_lib::bootstrap::{
27 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
28};
29use drasi_lib::sources::manager::convert_json_to_element_properties;
30
31use drasi_lib::bootstrap::ScriptFileBootstrapConfig;
32
33#[derive(Default)]
35pub struct ScriptFileBootstrapProvider {
36 file_paths: Vec<String>,
37}
38
39impl ScriptFileBootstrapProvider {
40 pub fn new(config: ScriptFileBootstrapConfig) -> Self {
45 Self {
46 file_paths: config.file_paths,
47 }
48 }
49
50 pub fn with_paths(file_paths: Vec<String>) -> Self {
55 Self { file_paths }
56 }
57
58 pub fn builder() -> ScriptFileBootstrapProviderBuilder {
60 ScriptFileBootstrapProviderBuilder::new()
61 }
62}
63
64pub struct ScriptFileBootstrapProviderBuilder {
77 file_paths: Vec<String>,
78}
79
80impl ScriptFileBootstrapProviderBuilder {
81 pub fn new() -> Self {
83 Self {
84 file_paths: Vec::new(),
85 }
86 }
87
88 pub fn with_file_paths(mut self, paths: Vec<String>) -> Self {
90 self.file_paths = paths;
91 self
92 }
93
94 pub fn with_file(mut self, path: impl Into<String>) -> Self {
96 self.file_paths.push(path.into());
97 self
98 }
99
100 pub fn build(self) -> ScriptFileBootstrapProvider {
102 ScriptFileBootstrapProvider::with_paths(self.file_paths)
103 }
104}
105
106impl Default for ScriptFileBootstrapProviderBuilder {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl ScriptFileBootstrapProvider {
113 fn convert_node_to_element(source_id: &str, node: &NodeRecord) -> Result<Element> {
115 let properties = if let serde_json::Value::Object(obj) = &node.properties {
117 convert_json_to_element_properties(obj)
118 } else if node.properties.is_null() {
119 Default::default()
120 } else {
121 return Err(anyhow!(
122 "ScriptFile bootstrap error: Node '{}' has invalid properties type. \
123 Properties must be a JSON object or null, found: {}",
124 node.id,
125 node.properties
126 ));
127 };
128
129 let labels: Arc<[Arc<str>]> = node
131 .labels
132 .iter()
133 .map(|l| Arc::from(l.as_str()))
134 .collect::<Vec<_>>()
135 .into();
136
137 Ok(Element::Node {
138 metadata: ElementMetadata {
139 reference: ElementReference::new(source_id, &node.id),
140 labels,
141 effective_from: 0,
142 },
143 properties,
144 })
145 }
146
147 fn convert_relation_to_element(source_id: &str, relation: &RelationRecord) -> Result<Element> {
149 let properties = if let serde_json::Value::Object(obj) = &relation.properties {
151 convert_json_to_element_properties(obj)
152 } else if relation.properties.is_null() {
153 Default::default()
154 } else {
155 return Err(anyhow!(
156 "ScriptFile bootstrap error: Relation '{}' has invalid properties type. \
157 Properties must be a JSON object or null, found: {}",
158 relation.id,
159 relation.properties
160 ));
161 };
162
163 let labels: Arc<[Arc<str>]> = relation
165 .labels
166 .iter()
167 .map(|l| Arc::from(l.as_str()))
168 .collect::<Vec<_>>()
169 .into();
170
171 let start_ref = ElementReference::new(source_id, &relation.start_id);
173 let end_ref = ElementReference::new(source_id, &relation.end_id);
174
175 Ok(Element::Relation {
176 metadata: ElementMetadata {
177 reference: ElementReference::new(source_id, &relation.id),
178 labels,
179 effective_from: 0,
180 },
181 properties,
182 in_node: start_ref,
183 out_node: end_ref,
184 })
185 }
186
187 fn matches_labels(
189 record_labels: &[String],
190 requested_labels: &[String],
191 _is_node: bool,
192 ) -> bool {
193 if requested_labels.is_empty() {
195 return true;
196 }
197
198 record_labels
200 .iter()
201 .any(|label| requested_labels.contains(label))
202 }
203
204 async fn process_records(
206 &self,
207 reader: &mut BootstrapScriptReader,
208 request: &BootstrapRequest,
209 context: &BootstrapContext,
210 event_tx: drasi_lib::channels::BootstrapEventSender,
211 ) -> Result<usize> {
212 let mut count = 0;
213
214 for record_result in reader {
215 let seq_record = record_result?;
216
217 match seq_record.record {
218 BootstrapScriptRecord::Node(node) => {
219 if Self::matches_labels(&node.labels, &request.node_labels, true) {
221 debug!("Processing node: id={}, labels={:?}", node.id, node.labels);
222
223 let element = Self::convert_node_to_element(&context.source_id, &node)?;
225
226 let source_change = SourceChange::Insert { element };
228
229 let sequence = context.next_sequence();
231
232 let bootstrap_event = drasi_lib::channels::BootstrapEvent {
233 source_id: context.source_id.clone(),
234 change: source_change,
235 timestamp: chrono::Utc::now(),
236 sequence,
237 };
238
239 event_tx
240 .send(bootstrap_event)
241 .await
242 .map_err(|e| anyhow!("Failed to send node: {e}"))?;
243
244 count += 1;
245 }
246 }
247 BootstrapScriptRecord::Relation(relation) => {
248 if Self::matches_labels(&relation.labels, &request.relation_labels, false) {
250 debug!(
251 "Processing relation: id={}, labels={:?}, start={}, end={}",
252 relation.id, relation.labels, relation.start_id, relation.end_id
253 );
254
255 let element =
257 Self::convert_relation_to_element(&context.source_id, &relation)?;
258
259 let source_change = SourceChange::Insert { element };
261
262 let sequence = context.next_sequence();
264
265 let bootstrap_event = drasi_lib::channels::BootstrapEvent {
266 source_id: context.source_id.clone(),
267 change: source_change,
268 timestamp: chrono::Utc::now(),
269 sequence,
270 };
271
272 event_tx
273 .send(bootstrap_event)
274 .await
275 .map_err(|e| anyhow!("Failed to send relation: {e}"))?;
276
277 count += 1;
278 }
279 }
280 BootstrapScriptRecord::Finish(finish) => {
281 debug!("Reached finish record: {}", finish.description);
282 break;
283 }
284 BootstrapScriptRecord::Label(label) => {
285 debug!("Skipping label record: {}", label.label);
286 }
287 BootstrapScriptRecord::Comment(_) => {
288 debug!("Skipping comment record");
290 }
291 BootstrapScriptRecord::Header(_) => {
292 debug!("Skipping header record in iteration");
294 }
295 }
296 }
297
298 Ok(count)
299 }
300}
301
302#[async_trait]
303impl BootstrapProvider for ScriptFileBootstrapProvider {
304 async fn bootstrap(
305 &self,
306 request: BootstrapRequest,
307 context: &BootstrapContext,
308 event_tx: drasi_lib::channels::BootstrapEventSender,
309 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
310 ) -> Result<BootstrapResult> {
311 info!(
312 "Starting script file bootstrap for query {} from {} file(s)",
313 request.query_id,
314 self.file_paths.len()
315 );
316
317 let paths: Vec<PathBuf> = self.file_paths.iter().map(PathBuf::from).collect();
319
320 let mut reader = BootstrapScriptReader::new(paths).map_err(|e| {
322 error!("Failed to create script reader: {e}");
323 anyhow!("Failed to create script reader: {e}")
324 })?;
325
326 let header = reader.get_header();
328 info!(
329 "Script header - start_time: {}, description: {}",
330 header.start_time, header.description
331 );
332
333 let count = self
335 .process_records(&mut reader, &request, context, event_tx)
336 .await?;
337
338 info!(
339 "Completed script file bootstrap for query {}: sent {} elements (requested node labels: {:?}, relation labels: {:?})",
340 request.query_id, count, request.node_labels, request.relation_labels
341 );
342
343 Ok(BootstrapResult {
344 event_count: count,
345 last_sequence: None,
346 sequences_aligned: false,
347 })
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354 use crate::script_types::{NodeRecord, RelationRecord};
355 use serde_json::json;
356
357 #[test]
358 fn test_convert_node_to_element() {
359 let node = NodeRecord {
360 id: "n1".to_string(),
361 labels: vec!["Person".to_string()],
362 properties: json!({"name": "Alice", "age": 30}),
363 };
364
365 let element =
366 ScriptFileBootstrapProvider::convert_node_to_element("test_source", &node).unwrap();
367
368 match element {
369 Element::Node {
370 metadata,
371 properties,
372 } => {
373 assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
374 assert_eq!(metadata.reference.element_id.as_ref(), "n1");
375 assert_eq!(metadata.labels.len(), 1);
376 assert_eq!(metadata.labels[0].as_ref(), "Person");
377 assert!(properties.get(&Arc::from("name")).is_some());
378 }
379 _ => panic!("Expected Node element"),
380 }
381 }
382
383 #[test]
384 fn test_convert_node_with_null_properties() {
385 let node = NodeRecord {
386 id: "n1".to_string(),
387 labels: vec!["Person".to_string()],
388 properties: serde_json::Value::Null,
389 };
390
391 let element =
392 ScriptFileBootstrapProvider::convert_node_to_element("test_source", &node).unwrap();
393
394 match element {
395 Element::Node { properties, .. } => {
396 assert!(properties.get(&Arc::from("test")).is_none());
398 }
399 _ => panic!("Expected Node element"),
400 }
401 }
402
403 #[test]
404 fn test_convert_relation_to_element() {
405 let relation = RelationRecord {
406 id: "r1".to_string(),
407 labels: vec!["KNOWS".to_string()],
408 start_id: "n1".to_string(),
409 start_label: Some("Person".to_string()),
410 end_id: "n2".to_string(),
411 end_label: Some("Person".to_string()),
412 properties: json!({"since": 2020}),
413 };
414
415 let element =
416 ScriptFileBootstrapProvider::convert_relation_to_element("test_source", &relation)
417 .unwrap();
418
419 match element {
420 Element::Relation {
421 metadata,
422 out_node,
423 in_node,
424 properties,
425 } => {
426 assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
427 assert_eq!(metadata.reference.element_id.as_ref(), "r1");
428 assert_eq!(metadata.labels.len(), 1);
429 assert_eq!(metadata.labels[0].as_ref(), "KNOWS");
430 assert_eq!(in_node.source_id.as_ref(), "test_source");
431 assert_eq!(in_node.element_id.as_ref(), "n1");
432 assert_eq!(out_node.source_id.as_ref(), "test_source");
433 assert_eq!(out_node.element_id.as_ref(), "n2");
434 assert!(properties.get(&Arc::from("since")).is_some());
435 }
436 _ => panic!("Expected Relation element"),
437 }
438 }
439
440 #[test]
441 fn test_matches_labels_empty_request() {
442 let record_labels = vec!["Person".to_string()];
443 let requested_labels = vec![];
444
445 assert!(ScriptFileBootstrapProvider::matches_labels(
446 &record_labels,
447 &requested_labels,
448 true
449 ));
450 }
451
452 #[test]
453 fn test_matches_labels_match() {
454 let record_labels = vec!["Person".to_string(), "Employee".to_string()];
455 let requested_labels = vec!["Person".to_string()];
456
457 assert!(ScriptFileBootstrapProvider::matches_labels(
458 &record_labels,
459 &requested_labels,
460 true
461 ));
462 }
463
464 #[test]
465 fn test_matches_labels_no_match() {
466 let record_labels = vec!["Person".to_string()];
467 let requested_labels = vec!["Company".to_string()];
468
469 assert!(!ScriptFileBootstrapProvider::matches_labels(
470 &record_labels,
471 &requested_labels,
472 true
473 ));
474 }
475
476 }