drasi_bootstrap_scriptfile/
script_file.rs1use anyhow::{anyhow, Result};
18use async_trait::async_trait;
19use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
20use log::{debug, error, info};
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use crate::script_reader::BootstrapScriptReader;
25use crate::script_types::{BootstrapScriptRecord, NodeRecord, RelationRecord};
26use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
27use drasi_lib::sources::manager::convert_json_to_element_properties;
28
29use drasi_lib::bootstrap::ScriptFileBootstrapConfig;
30
/// Bootstrap provider that is configured with a list of script file paths.
///
/// The struct only stores the paths; the `BootstrapProvider::bootstrap`
/// implementation in this file builds a `BootstrapScriptReader` over them on
/// every call. `Default` yields a provider with no paths.
#[derive(Default)]
pub struct ScriptFileBootstrapProvider {
    /// Script file paths, handed to the script reader in this order.
    file_paths: Vec<String>,
}
36
37impl ScriptFileBootstrapProvider {
38 pub fn new(config: ScriptFileBootstrapConfig) -> Self {
43 Self {
44 file_paths: config.file_paths,
45 }
46 }
47
48 pub fn with_paths(file_paths: Vec<String>) -> Self {
53 Self { file_paths }
54 }
55
56 pub fn builder() -> ScriptFileBootstrapProviderBuilder {
58 ScriptFileBootstrapProviderBuilder::new()
59 }
60}
61
/// Builder that accumulates script file paths and then produces a
/// `ScriptFileBootstrapProvider`.
pub struct ScriptFileBootstrapProviderBuilder {
    /// Paths collected so far; passed to the provider unchanged by `build`.
    file_paths: Vec<String>,
}
77
78impl ScriptFileBootstrapProviderBuilder {
79 pub fn new() -> Self {
81 Self {
82 file_paths: Vec::new(),
83 }
84 }
85
86 pub fn with_file_paths(mut self, paths: Vec<String>) -> Self {
88 self.file_paths = paths;
89 self
90 }
91
92 pub fn with_file(mut self, path: impl Into<String>) -> Self {
94 self.file_paths.push(path.into());
95 self
96 }
97
98 pub fn build(self) -> ScriptFileBootstrapProvider {
100 ScriptFileBootstrapProvider::with_paths(self.file_paths)
101 }
102}
103
104impl Default for ScriptFileBootstrapProviderBuilder {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl ScriptFileBootstrapProvider {
111 fn convert_node_to_element(source_id: &str, node: &NodeRecord) -> Result<Element> {
113 let properties = if let serde_json::Value::Object(obj) = &node.properties {
115 convert_json_to_element_properties(obj)?
116 } else if node.properties.is_null() {
117 Default::default()
118 } else {
119 return Err(anyhow!(
120 "ScriptFile bootstrap error: Node '{}' has invalid properties type. \
121 Properties must be a JSON object or null, found: {}",
122 node.id,
123 node.properties
124 ));
125 };
126
127 let labels: Arc<[Arc<str>]> = node
129 .labels
130 .iter()
131 .map(|l| Arc::from(l.as_str()))
132 .collect::<Vec<_>>()
133 .into();
134
135 Ok(Element::Node {
136 metadata: ElementMetadata {
137 reference: ElementReference::new(source_id, &node.id),
138 labels,
139 effective_from: 0,
140 },
141 properties,
142 })
143 }
144
145 fn convert_relation_to_element(source_id: &str, relation: &RelationRecord) -> Result<Element> {
147 let properties = if let serde_json::Value::Object(obj) = &relation.properties {
149 convert_json_to_element_properties(obj)?
150 } else if relation.properties.is_null() {
151 Default::default()
152 } else {
153 return Err(anyhow!(
154 "ScriptFile bootstrap error: Relation '{}' has invalid properties type. \
155 Properties must be a JSON object or null, found: {}",
156 relation.id,
157 relation.properties
158 ));
159 };
160
161 let labels: Arc<[Arc<str>]> = relation
163 .labels
164 .iter()
165 .map(|l| Arc::from(l.as_str()))
166 .collect::<Vec<_>>()
167 .into();
168
169 let start_ref = ElementReference::new(source_id, &relation.start_id);
171 let end_ref = ElementReference::new(source_id, &relation.end_id);
172
173 Ok(Element::Relation {
174 metadata: ElementMetadata {
175 reference: ElementReference::new(source_id, &relation.id),
176 labels,
177 effective_from: 0,
178 },
179 properties,
180 in_node: end_ref,
181 out_node: start_ref,
182 })
183 }
184
/// Decides whether a record passes the label filter of a bootstrap request.
///
/// An empty `requested_labels` slice accepts every record. Otherwise the
/// record is accepted when at least one of its labels appears in
/// `requested_labels`. `_is_node` is accepted for callers but is not used.
fn matches_labels(
    record_labels: &[String],
    requested_labels: &[String],
    _is_node: bool,
) -> bool {
    requested_labels.is_empty()
        || record_labels
            .iter()
            .any(|candidate| requested_labels.contains(candidate))
}
201
202 async fn process_records(
204 &self,
205 reader: &mut BootstrapScriptReader,
206 request: &BootstrapRequest,
207 context: &BootstrapContext,
208 event_tx: drasi_lib::channels::BootstrapEventSender,
209 ) -> Result<usize> {
210 let mut count = 0;
211
212 for record_result in reader {
213 let seq_record = record_result?;
214
215 match seq_record.record {
216 BootstrapScriptRecord::Node(node) => {
217 if Self::matches_labels(&node.labels, &request.node_labels, true) {
219 debug!("Processing node: id={}, labels={:?}", node.id, node.labels);
220
221 let element = Self::convert_node_to_element(&context.source_id, &node)?;
223
224 let source_change = SourceChange::Insert { element };
226
227 let sequence = context.next_sequence();
229
230 let bootstrap_event = drasi_lib::channels::BootstrapEvent {
231 source_id: context.source_id.clone(),
232 change: source_change,
233 timestamp: chrono::Utc::now(),
234 sequence,
235 };
236
237 event_tx
238 .send(bootstrap_event)
239 .await
240 .map_err(|e| anyhow!("Failed to send node: {e}"))?;
241
242 count += 1;
243 }
244 }
245 BootstrapScriptRecord::Relation(relation) => {
246 if Self::matches_labels(&relation.labels, &request.relation_labels, false) {
248 debug!(
249 "Processing relation: id={}, labels={:?}, start={}, end={}",
250 relation.id, relation.labels, relation.start_id, relation.end_id
251 );
252
253 let element =
255 Self::convert_relation_to_element(&context.source_id, &relation)?;
256
257 let source_change = SourceChange::Insert { element };
259
260 let sequence = context.next_sequence();
262
263 let bootstrap_event = drasi_lib::channels::BootstrapEvent {
264 source_id: context.source_id.clone(),
265 change: source_change,
266 timestamp: chrono::Utc::now(),
267 sequence,
268 };
269
270 event_tx
271 .send(bootstrap_event)
272 .await
273 .map_err(|e| anyhow!("Failed to send relation: {e}"))?;
274
275 count += 1;
276 }
277 }
278 BootstrapScriptRecord::Finish(finish) => {
279 debug!("Reached finish record: {}", finish.description);
280 break;
281 }
282 BootstrapScriptRecord::Label(label) => {
283 debug!("Skipping label record: {}", label.label);
284 }
285 BootstrapScriptRecord::Comment(_) => {
286 debug!("Skipping comment record");
288 }
289 BootstrapScriptRecord::Header(_) => {
290 debug!("Skipping header record in iteration");
292 }
293 }
294 }
295
296 Ok(count)
297 }
298}
299
300#[async_trait]
301impl BootstrapProvider for ScriptFileBootstrapProvider {
302 async fn bootstrap(
303 &self,
304 request: BootstrapRequest,
305 context: &BootstrapContext,
306 event_tx: drasi_lib::channels::BootstrapEventSender,
307 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
308 ) -> Result<usize> {
309 info!(
310 "Starting script file bootstrap for query {} from {} file(s)",
311 request.query_id,
312 self.file_paths.len()
313 );
314
315 let paths: Vec<PathBuf> = self.file_paths.iter().map(PathBuf::from).collect();
317
318 let mut reader = BootstrapScriptReader::new(paths).map_err(|e| {
320 error!("Failed to create script reader: {e}");
321 anyhow!("Failed to create script reader: {e}")
322 })?;
323
324 let header = reader.get_header();
326 info!(
327 "Script header - start_time: {}, description: {}",
328 header.start_time, header.description
329 );
330
331 let count = self
333 .process_records(&mut reader, &request, context, event_tx)
334 .await?;
335
336 info!(
337 "Completed script file bootstrap for query {}: sent {} elements (requested node labels: {:?}, relation labels: {:?})",
338 request.query_id, count, request.node_labels, request.relation_labels
339 );
340
341 Ok(count)
342 }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::script_types::{NodeRecord, RelationRecord};
    use serde_json::json;

    /// Builds the `Person` node record with id `n1` used by the node tests.
    fn person_node(properties: serde_json::Value) -> NodeRecord {
        NodeRecord {
            id: "n1".to_string(),
            labels: vec!["Person".to_string()],
            properties,
        }
    }

    /// Turns string literals into the owned label vectors `matches_labels` takes.
    fn owned_labels(labels: &[&str]) -> Vec<String> {
        labels.iter().map(|label| label.to_string()).collect()
    }

    /// An object-valued `properties` field yields a node carrying the record's
    /// id, labels and properties.
    #[test]
    fn test_convert_node_to_element() {
        let node = person_node(json!({"name": "Alice", "age": 30}));

        let element =
            ScriptFileBootstrapProvider::convert_node_to_element("test_source", &node).unwrap();

        if let Element::Node {
            metadata,
            properties,
        } = element
        {
            assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
            assert_eq!(metadata.reference.element_id.as_ref(), "n1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "Person");
            assert!(properties.get(&Arc::from("name")).is_some());
        } else {
            panic!("Expected Node element");
        }
    }

    /// A null `properties` field is accepted and yields no `test` property.
    #[test]
    fn test_convert_node_with_null_properties() {
        let node = person_node(serde_json::Value::Null);

        let element =
            ScriptFileBootstrapProvider::convert_node_to_element("test_source", &node).unwrap();

        if let Element::Node { properties, .. } = element {
            assert!(properties.get(&Arc::from("test")).is_none());
        } else {
            panic!("Expected Node element");
        }
    }

    /// A relation record keeps its id and labels; its start id lands in
    /// `out_node` and its end id in `in_node`.
    #[test]
    fn test_convert_relation_to_element() {
        let relation = RelationRecord {
            id: "r1".to_string(),
            labels: vec!["KNOWS".to_string()],
            start_id: "n1".to_string(),
            start_label: Some("Person".to_string()),
            end_id: "n2".to_string(),
            end_label: Some("Person".to_string()),
            properties: json!({"since": 2020}),
        };

        let element =
            ScriptFileBootstrapProvider::convert_relation_to_element("test_source", &relation)
                .unwrap();

        if let Element::Relation {
            metadata,
            out_node,
            in_node,
            properties,
        } = element
        {
            assert_eq!(metadata.reference.source_id.as_ref(), "test_source");
            assert_eq!(metadata.reference.element_id.as_ref(), "r1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "KNOWS");
            assert_eq!(out_node.source_id.as_ref(), "test_source");
            assert_eq!(out_node.element_id.as_ref(), "n1");
            assert_eq!(in_node.source_id.as_ref(), "test_source");
            assert_eq!(in_node.element_id.as_ref(), "n2");
            assert!(properties.get(&Arc::from("since")).is_some());
        } else {
            panic!("Expected Relation element");
        }
    }

    /// An empty request filter accepts any record.
    #[test]
    fn test_matches_labels_empty_request() {
        let record = owned_labels(&["Person"]);
        let requested: Vec<String> = Vec::new();

        let accepted = ScriptFileBootstrapProvider::matches_labels(&record, &requested, true);
        assert!(accepted);
    }

    /// One shared label is enough for a match.
    #[test]
    fn test_matches_labels_match() {
        let record = owned_labels(&["Person", "Employee"]);
        let requested = owned_labels(&["Person"]);

        let accepted = ScriptFileBootstrapProvider::matches_labels(&record, &requested, true);
        assert!(accepted);
    }

    /// Disjoint label sets do not match.
    #[test]
    fn test_matches_labels_no_match() {
        let record = owned_labels(&["Person"]);
        let requested = owned_labels(&["Company"]);

        let accepted = ScriptFileBootstrapProvider::matches_labels(&record, &requested, true);
        assert!(!accepted);
    }
}