drasi_bootstrap_platform/
platform.rs1use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::Utc;
24use futures::StreamExt;
25use log::{debug, info, warn};
26use reqwest::Client;
27use serde::{Deserialize, Serialize};
28use serde_json::Map;
29use std::sync::Arc;
30use std::time::Duration;
31
32use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
33use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
34use drasi_lib::sources::manager::convert_json_to_element_properties;
35
36#[derive(Debug, Clone, Serialize)]
38#[serde(rename_all = "camelCase")]
39struct SubscriptionRequest {
40 query_id: String,
41 query_node_id: String,
42 node_labels: Vec<String>,
43 rel_labels: Vec<String>,
44}
45
46#[derive(Debug, Clone, Deserialize)]
48#[serde(rename_all = "camelCase")]
49struct BootstrapElement {
50 id: String,
51 labels: Vec<String>,
52 properties: Map<String, serde_json::Value>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 start_id: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 end_id: Option<String>,
57}
58
59use drasi_lib::bootstrap::PlatformBootstrapConfig;
60
61pub struct PlatformBootstrapProvider {
63 query_api_url: String,
64 client: Client,
65}
66
67impl PlatformBootstrapProvider {
68 pub fn new(config: PlatformBootstrapConfig) -> Result<Self> {
76 let query_api_url = config.query_api_url.ok_or_else(|| {
77 anyhow::anyhow!("query_api_url is required for PlatformBootstrapProvider")
78 })?;
79
80 Self::create_internal(query_api_url, config.timeout_seconds)
81 }
82
83 pub fn with_url(query_api_url: impl Into<String>, timeout_seconds: u64) -> Result<Self> {
92 Self::create_internal(query_api_url.into(), timeout_seconds)
93 }
94
95 pub fn builder() -> PlatformBootstrapProviderBuilder {
97 PlatformBootstrapProviderBuilder::new()
98 }
99
100 fn create_internal(query_api_url: String, timeout_seconds: u64) -> Result<Self> {
102 reqwest::Url::parse(&query_api_url)
104 .context(format!("Invalid query_api_url: {query_api_url}"))?;
105
106 let timeout = Duration::from_secs(timeout_seconds);
107 let client = Client::builder()
108 .timeout(timeout)
109 .build()
110 .context("Failed to build HTTP client")?;
111
112 Ok(Self {
113 query_api_url,
114 client,
115 })
116 }
117
118 async fn make_subscription_request(
123 &self,
124 request: &BootstrapRequest,
125 context: &BootstrapContext,
126 ) -> Result<reqwest::Response> {
127 let subscription_req = SubscriptionRequest {
128 query_id: request.query_id.clone(),
129 query_node_id: context.server_id.clone(),
130 node_labels: request.node_labels.clone(),
131 rel_labels: request.relation_labels.clone(),
132 };
133
134 let url = format!("{}/subscription", self.query_api_url);
135 debug!(
136 "Making bootstrap subscription request to {} for query {}",
137 url, request.query_id
138 );
139
140 let response = self
141 .client
142 .post(&url)
143 .json(&subscription_req)
144 .send()
145 .await
146 .context(format!("Failed to connect to Query API at {url}"))?;
147
148 if !response.status().is_success() {
149 let status = response.status();
150 let error_text = response
151 .text()
152 .await
153 .unwrap_or_else(|_| "Unable to read error response".to_string());
154 return Err(anyhow::anyhow!(
155 "Query API returned error status {status}: {error_text}"
156 ));
157 }
158
159 debug!(
160 "Successfully connected to Query API, preparing to stream bootstrap data for query {}",
161 request.query_id
162 );
163 Ok(response)
164 }
165
166 async fn process_bootstrap_stream(
171 &self,
172 response: reqwest::Response,
173 ) -> Result<Vec<BootstrapElement>> {
174 let mut elements = Vec::new();
175 let mut line_buffer = String::new();
176 let mut byte_stream = response.bytes_stream();
177 let mut element_count = 0;
178
179 while let Some(chunk_result) = byte_stream.next().await {
180 let chunk = chunk_result.context("Error reading stream chunk")?;
181 let chunk_str =
182 std::str::from_utf8(&chunk).context("Invalid UTF-8 in stream response")?;
183
184 line_buffer.push_str(chunk_str);
186
187 while let Some(newline_pos) = line_buffer.find('\n') {
189 let line = line_buffer[..newline_pos].trim().to_string();
190 line_buffer = line_buffer[newline_pos + 1..].to_string();
191
192 if line.is_empty() {
194 continue;
195 }
196
197 match serde_json::from_str::<BootstrapElement>(&line) {
199 Ok(element) => {
200 elements.push(element);
201 element_count += 1;
202 if element_count % 1000 == 0 {
203 debug!("Received {element_count} bootstrap elements from stream");
204 }
205 }
206 Err(e) => {
207 warn!("Failed to parse bootstrap element from JSON: {e} - Line: {line}");
208 }
210 }
211 }
212 }
213
214 let remaining = line_buffer.trim();
216 if !remaining.is_empty() {
217 match serde_json::from_str::<BootstrapElement>(remaining) {
218 Ok(element) => {
219 elements.push(element);
220 element_count += 1;
221 }
222 Err(e) => {
223 warn!(
224 "Failed to parse final bootstrap element from JSON: {e} - Line: {remaining}"
225 );
226 }
227 }
228 }
229
230 info!("Received total of {element_count} bootstrap elements from Query API stream");
231 Ok(elements)
232 }
233}
234
/// Step-by-step builder for [`PlatformBootstrapProvider`].
pub struct PlatformBootstrapProviderBuilder {
    /// Query API base URL; must be set before `build` is called.
    query_api_url: Option<String>,
    /// Request timeout in seconds handed to the HTTP client.
    timeout_seconds: u64,
}
252
253impl PlatformBootstrapProviderBuilder {
254 pub fn new() -> Self {
256 Self {
257 query_api_url: None,
258 timeout_seconds: 300, }
260 }
261
262 pub fn with_query_api_url(mut self, url: impl Into<String>) -> Self {
264 self.query_api_url = Some(url.into());
265 self
266 }
267
268 pub fn with_timeout_seconds(mut self, seconds: u64) -> Self {
270 self.timeout_seconds = seconds;
271 self
272 }
273
274 pub fn build(self) -> Result<PlatformBootstrapProvider> {
278 let query_api_url = self
279 .query_api_url
280 .ok_or_else(|| anyhow::anyhow!("query_api_url is required"))?;
281
282 PlatformBootstrapProvider::create_internal(query_api_url, self.timeout_seconds)
283 }
284}
285
286impl Default for PlatformBootstrapProviderBuilder {
287 fn default() -> Self {
288 Self::new()
289 }
290}
291
292#[async_trait]
293impl BootstrapProvider for PlatformBootstrapProvider {
294 async fn bootstrap(
295 &self,
296 request: BootstrapRequest,
297 context: &BootstrapContext,
298 event_tx: drasi_lib::channels::BootstrapEventSender,
299 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
300 ) -> Result<usize> {
301 info!(
302 "Starting platform bootstrap for query {} from source {}",
303 request.query_id, context.source_id
304 );
305
306 let response = self
308 .make_subscription_request(&request, context)
309 .await
310 .context("Failed to make subscription request to Query API")?;
311
312 let bootstrap_elements = self
314 .process_bootstrap_stream(response)
315 .await
316 .context("Failed to process bootstrap stream from Query API")?;
317
318 debug!(
319 "Processing {} bootstrap elements for query {}",
320 bootstrap_elements.len(),
321 request.query_id
322 );
323
324 let mut sent_count = 0;
325 let mut filtered_nodes = 0;
326 let mut filtered_relations = 0;
327
328 for bootstrap_elem in bootstrap_elements {
329 let is_relation = bootstrap_elem.start_id.is_some() && bootstrap_elem.end_id.is_some();
331
332 let should_process = if is_relation {
334 matches_labels(&bootstrap_elem.labels, &request.relation_labels)
335 } else {
336 matches_labels(&bootstrap_elem.labels, &request.node_labels)
337 };
338
339 if !should_process {
340 if is_relation {
341 filtered_relations += 1;
342 } else {
343 filtered_nodes += 1;
344 }
345 continue;
346 }
347
348 let element = transform_element(&context.source_id, bootstrap_elem)
350 .context("Failed to transform bootstrap element")?;
351
352 let source_change = SourceChange::Insert { element };
354
355 let sequence = context.next_sequence();
357
358 let bootstrap_event = drasi_lib::channels::BootstrapEvent {
360 source_id: context.source_id.clone(),
361 change: source_change,
362 timestamp: Utc::now(),
363 sequence,
364 };
365
366 event_tx
367 .send(bootstrap_event)
368 .await
369 .context("Failed to send bootstrap element via channel")?;
370
371 sent_count += 1;
372 }
373
374 debug!(
375 "Filtered {filtered_nodes} nodes and {filtered_relations} relations based on requested labels"
376 );
377
378 info!(
379 "Completed platform bootstrap for query {}: sent {} elements",
380 request.query_id, sent_count
381 );
382
383 Ok(sent_count)
384 }
385}
386
/// Decides whether an element passes a label filter.
///
/// An empty `requested_labels` list accepts everything; otherwise the element
/// must carry at least one of the requested labels.
fn matches_labels(element_labels: &[String], requested_labels: &[String]) -> bool {
    if requested_labels.is_empty() {
        return true;
    }

    for candidate in element_labels {
        if requested_labels.contains(candidate) {
            return true;
        }
    }

    false
}
397
398fn transform_element(source_id: &str, bootstrap_elem: BootstrapElement) -> Result<Element> {
403 let properties = convert_json_to_element_properties(&bootstrap_elem.properties)
405 .context("Failed to convert element properties")?;
406
407 let labels: Arc<[Arc<str>]> = bootstrap_elem
409 .labels
410 .iter()
411 .map(|l| Arc::from(l.as_str()))
412 .collect::<Vec<_>>()
413 .into();
414
415 if let (Some(start_id), Some(end_id)) = (&bootstrap_elem.start_id, &bootstrap_elem.end_id) {
417 let in_node = ElementReference::new(source_id, start_id);
419 let out_node = ElementReference::new(source_id, end_id);
420
421 Ok(Element::Relation {
422 metadata: ElementMetadata {
423 reference: ElementReference::new(source_id, &bootstrap_elem.id),
424 labels,
425 effective_from: 0,
426 },
427 properties,
428 in_node,
429 out_node,
430 })
431 } else {
432 Ok(Element::Node {
434 metadata: ElementMetadata {
435 reference: ElementReference::new(source_id, &bootstrap_elem.id),
436 labels,
437 effective_from: 0,
438 },
439 properties,
440 })
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into an owned label list.
    fn labels(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    /// Builds a JSON property map from key/value pairs.
    fn props(entries: Vec<(&str, serde_json::Value)>) -> Map<String, serde_json::Value> {
        let mut map = Map::new();
        for (key, value) in entries {
            map.insert(key.to_string(), value);
        }
        map
    }

    /// Builds a single-label element; `endpoints` is `(start_id, end_id)` for relations.
    fn element(
        id: &str,
        label: &str,
        properties: Map<String, serde_json::Value>,
        endpoints: Option<(&str, &str)>,
    ) -> BootstrapElement {
        BootstrapElement {
            id: id.to_string(),
            labels: vec![label.to_string()],
            properties,
            start_id: endpoints.map(|(start, _)| start.to_string()),
            end_id: endpoints.map(|(_, end)| end.to_string()),
        }
    }

    #[test]
    fn test_matches_labels_empty_requested() {
        let element_labels = labels(&["Person", "Employee"]);
        let requested_labels = labels(&[]);

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    #[test]
    fn test_matches_labels_matching() {
        let element_labels = labels(&["Person", "Employee"]);
        let requested_labels = labels(&["Person"]);

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    #[test]
    fn test_matches_labels_non_matching() {
        let element_labels = labels(&["Person", "Employee"]);
        let requested_labels = labels(&["Company"]);

        assert!(!matches_labels(&element_labels, &requested_labels));
    }

    #[test]
    fn test_matches_labels_partial_overlap() {
        let element_labels = labels(&["Person", "Employee"]);
        let requested_labels = labels(&["Employee", "Company"]);

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    #[test]
    fn test_matches_labels_empty_element() {
        let element_labels = labels(&[]);
        let requested_labels = labels(&["Person"]);

        assert!(!matches_labels(&element_labels, &requested_labels));
    }

    #[test]
    fn test_matches_labels_both_empty() {
        let element_labels = labels(&[]);
        let requested_labels = labels(&[]);

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    #[test]
    fn test_transform_element_node() {
        let properties = props(vec![
            ("name", serde_json::json!("Alice")),
            ("age", serde_json::json!(30)),
        ]);
        let bootstrap_elem = element("1", "Person", properties, None);

        let transformed = transform_element("test-source", bootstrap_elem).unwrap();

        if let Element::Node { metadata, .. } = transformed {
            assert_eq!(metadata.reference.element_id.as_ref(), "1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "Person");
        } else {
            panic!("Expected Node element");
        }
    }

    #[test]
    fn test_transform_element_relation() {
        let properties = props(vec![("since", serde_json::json!("2020"))]);
        let bootstrap_elem = element("r1", "WORKS_FOR", properties, Some(("1", "2")));

        let transformed = transform_element("test-source", bootstrap_elem).unwrap();

        if let Element::Relation {
            metadata,
            in_node,
            out_node,
            ..
        } = transformed
        {
            assert_eq!(metadata.reference.element_id.as_ref(), "r1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.labels[0].as_ref(), "WORKS_FOR");
            assert_eq!(in_node.element_id.as_ref(), "1");
            assert_eq!(out_node.element_id.as_ref(), "2");
        } else {
            panic!("Expected Relation element");
        }
    }

    #[test]
    fn test_transform_element_various_property_types() {
        let properties = props(vec![
            ("string_prop", serde_json::json!("text")),
            ("number_prop", serde_json::json!(42)),
            ("float_prop", serde_json::json!(1.23456)),
            ("bool_prop", serde_json::json!(true)),
            ("null_prop", serde_json::json!(null)),
        ]);
        let bootstrap_elem = element("1", "Test", properties, None);

        let transformed = transform_element("test-source", bootstrap_elem).unwrap();

        if let Element::Node { metadata, .. } = transformed {
            assert_eq!(metadata.reference.element_id.as_ref(), "1");
        } else {
            panic!("Expected Node element");
        }
    }

    #[test]
    fn test_transform_element_empty_properties() {
        let bootstrap_elem = element("1", "Empty", Map::new(), None);

        let transformed = transform_element("test-source", bootstrap_elem).unwrap();

        if let Element::Node { metadata, .. } = transformed {
            assert_eq!(metadata.reference.element_id.as_ref(), "1");
        } else {
            panic!("Expected Node element");
        }
    }
}