drasi_bootstrap_platform/
platform.rs1use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::Utc;
24use futures::StreamExt;
25use log::{debug, info, warn};
26use reqwest::Client;
27use serde::{Deserialize, Serialize};
28use serde_json::Map;
29use std::sync::Arc;
30use std::time::Duration;
31
32use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
33use drasi_lib::bootstrap::{
34 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
35};
36use drasi_lib::sources::manager::convert_json_to_element_properties;
37
38#[derive(Debug, Clone, Serialize)]
40#[serde(rename_all = "camelCase")]
41struct SubscriptionRequest {
42 query_id: String,
43 query_node_id: String,
44 node_labels: Vec<String>,
45 rel_labels: Vec<String>,
46}
47
48#[derive(Debug, Clone, Deserialize)]
50#[serde(rename_all = "camelCase")]
51struct BootstrapElement {
52 id: String,
53 labels: Vec<String>,
54 properties: Map<String, serde_json::Value>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 start_id: Option<String>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 end_id: Option<String>,
59}
60
61use drasi_lib::bootstrap::PlatformBootstrapConfig;
62
63pub struct PlatformBootstrapProvider {
65 query_api_url: String,
66 client: Client,
67}
68
69impl PlatformBootstrapProvider {
70 pub fn new(config: PlatformBootstrapConfig) -> Result<Self> {
78 let query_api_url = config.query_api_url.ok_or_else(|| {
79 anyhow::anyhow!("query_api_url is required for PlatformBootstrapProvider")
80 })?;
81
82 Self::create_internal(query_api_url, config.timeout_seconds)
83 }
84
85 pub fn with_url(query_api_url: impl Into<String>, timeout_seconds: u64) -> Result<Self> {
94 Self::create_internal(query_api_url.into(), timeout_seconds)
95 }
96
97 pub fn builder() -> PlatformBootstrapProviderBuilder {
99 PlatformBootstrapProviderBuilder::new()
100 }
101
102 fn create_internal(query_api_url: String, timeout_seconds: u64) -> Result<Self> {
104 reqwest::Url::parse(&query_api_url)
106 .context(format!("Invalid query_api_url: {query_api_url}"))?;
107
108 let timeout = Duration::from_secs(timeout_seconds);
109 let client = Client::builder()
110 .timeout(timeout)
111 .build()
112 .context("Failed to build HTTP client")?;
113
114 Ok(Self {
115 query_api_url,
116 client,
117 })
118 }
119
120 async fn make_subscription_request(
125 &self,
126 request: &BootstrapRequest,
127 context: &BootstrapContext,
128 ) -> Result<reqwest::Response> {
129 let subscription_req = SubscriptionRequest {
130 query_id: request.query_id.clone(),
131 query_node_id: context.server_id.clone(),
132 node_labels: request.node_labels.clone(),
133 rel_labels: request.relation_labels.clone(),
134 };
135
136 let url = format!("{}/subscription", self.query_api_url);
137 debug!(
138 "Making bootstrap subscription request to {} for query {}",
139 url, request.query_id
140 );
141
142 let response = self
143 .client
144 .post(&url)
145 .json(&subscription_req)
146 .send()
147 .await
148 .context(format!("Failed to connect to Query API at {url}"))?;
149
150 if !response.status().is_success() {
151 let status = response.status();
152 let error_text = response
153 .text()
154 .await
155 .unwrap_or_else(|_| "Unable to read error response".to_string());
156 return Err(anyhow::anyhow!(
157 "Query API returned error status {status}: {error_text}"
158 ));
159 }
160
161 debug!(
162 "Successfully connected to Query API, preparing to stream bootstrap data for query {}",
163 request.query_id
164 );
165 Ok(response)
166 }
167
168 async fn process_bootstrap_stream(
173 &self,
174 response: reqwest::Response,
175 ) -> Result<Vec<BootstrapElement>> {
176 let mut elements = Vec::new();
177 let mut line_buffer = String::new();
178 let mut byte_stream = response.bytes_stream();
179 let mut element_count = 0;
180
181 while let Some(chunk_result) = byte_stream.next().await {
182 let chunk = chunk_result.context("Error reading stream chunk")?;
183 let chunk_str =
184 std::str::from_utf8(&chunk).context("Invalid UTF-8 in stream response")?;
185
186 line_buffer.push_str(chunk_str);
188
189 while let Some(newline_pos) = line_buffer.find('\n') {
191 let line = line_buffer[..newline_pos].trim().to_string();
192 line_buffer = line_buffer[newline_pos + 1..].to_string();
193
194 if line.is_empty() {
196 continue;
197 }
198
199 match serde_json::from_str::<BootstrapElement>(&line) {
201 Ok(element) => {
202 elements.push(element);
203 element_count += 1;
204 if element_count % 1000 == 0 {
205 debug!("Received {element_count} bootstrap elements from stream");
206 }
207 }
208 Err(e) => {
209 warn!("Failed to parse bootstrap element from JSON: {e} - Line: {line}");
210 }
212 }
213 }
214 }
215
216 let remaining = line_buffer.trim();
218 if !remaining.is_empty() {
219 match serde_json::from_str::<BootstrapElement>(remaining) {
220 Ok(element) => {
221 elements.push(element);
222 element_count += 1;
223 }
224 Err(e) => {
225 warn!(
226 "Failed to parse final bootstrap element from JSON: {e} - Line: {remaining}"
227 );
228 }
229 }
230 }
231
232 info!("Received total of {element_count} bootstrap elements from Query API stream");
233 Ok(elements)
234 }
235}
236
237pub struct PlatformBootstrapProviderBuilder {
251 query_api_url: Option<String>,
252 timeout_seconds: u64,
253}
254
255impl PlatformBootstrapProviderBuilder {
256 pub fn new() -> Self {
258 Self {
259 query_api_url: None,
260 timeout_seconds: 300, }
262 }
263
264 pub fn with_query_api_url(mut self, url: impl Into<String>) -> Self {
266 self.query_api_url = Some(url.into());
267 self
268 }
269
270 pub fn with_timeout_seconds(mut self, seconds: u64) -> Self {
272 self.timeout_seconds = seconds;
273 self
274 }
275
276 pub fn build(self) -> Result<PlatformBootstrapProvider> {
280 let query_api_url = self
281 .query_api_url
282 .ok_or_else(|| anyhow::anyhow!("query_api_url is required"))?;
283
284 PlatformBootstrapProvider::create_internal(query_api_url, self.timeout_seconds)
285 }
286}
287
288impl Default for PlatformBootstrapProviderBuilder {
289 fn default() -> Self {
290 Self::new()
291 }
292}
293
294#[async_trait]
295impl BootstrapProvider for PlatformBootstrapProvider {
296 async fn bootstrap(
297 &self,
298 request: BootstrapRequest,
299 context: &BootstrapContext,
300 event_tx: drasi_lib::channels::BootstrapEventSender,
301 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
302 ) -> Result<BootstrapResult> {
303 info!(
304 "Starting platform bootstrap for query {} from source {}",
305 request.query_id, context.source_id
306 );
307
308 let response = self
310 .make_subscription_request(&request, context)
311 .await
312 .context("Failed to make subscription request to Query API")?;
313
314 let bootstrap_elements = self
316 .process_bootstrap_stream(response)
317 .await
318 .context("Failed to process bootstrap stream from Query API")?;
319
320 debug!(
321 "Processing {} bootstrap elements for query {}",
322 bootstrap_elements.len(),
323 request.query_id
324 );
325
326 let mut sent_count = 0;
327 let mut filtered_nodes = 0;
328 let mut filtered_relations = 0;
329
330 for bootstrap_elem in bootstrap_elements {
331 let is_relation = bootstrap_elem.start_id.is_some() && bootstrap_elem.end_id.is_some();
333
334 let should_process = if is_relation {
336 matches_labels(&bootstrap_elem.labels, &request.relation_labels)
337 } else {
338 matches_labels(&bootstrap_elem.labels, &request.node_labels)
339 };
340
341 if !should_process {
342 if is_relation {
343 filtered_relations += 1;
344 } else {
345 filtered_nodes += 1;
346 }
347 continue;
348 }
349
350 let element = transform_element(&context.source_id, bootstrap_elem)
352 .context("Failed to transform bootstrap element")?;
353
354 let source_change = SourceChange::Insert { element };
356
357 let sequence = context.next_sequence();
359
360 let bootstrap_event = drasi_lib::channels::BootstrapEvent {
362 source_id: context.source_id.clone(),
363 change: source_change,
364 timestamp: Utc::now(),
365 sequence,
366 };
367
368 event_tx
369 .send(bootstrap_event)
370 .await
371 .context("Failed to send bootstrap element via channel")?;
372
373 sent_count += 1;
374 }
375
376 debug!(
377 "Filtered {filtered_nodes} nodes and {filtered_relations} relations based on requested labels"
378 );
379
380 info!(
381 "Completed platform bootstrap for query {}: sent {} elements",
382 request.query_id, sent_count
383 );
384
385 Ok(BootstrapResult {
386 event_count: sent_count,
387 last_sequence: None,
388 sequences_aligned: false,
389 })
390 }
391}
392
393fn matches_labels(element_labels: &[String], requested_labels: &[String]) -> bool {
398 requested_labels.is_empty()
399 || element_labels
400 .iter()
401 .any(|label| requested_labels.contains(label))
402}
403
404fn transform_element(source_id: &str, bootstrap_elem: BootstrapElement) -> Result<Element> {
409 let properties = convert_json_to_element_properties(&bootstrap_elem.properties);
411
412 let labels: Arc<[Arc<str>]> = bootstrap_elem
414 .labels
415 .iter()
416 .map(|l| Arc::from(l.as_str()))
417 .collect::<Vec<_>>()
418 .into();
419
420 if let (Some(start_id), Some(end_id)) = (&bootstrap_elem.start_id, &bootstrap_elem.end_id) {
422 let in_node = ElementReference::new(source_id, start_id);
424 let out_node = ElementReference::new(source_id, end_id);
425
426 Ok(Element::Relation {
427 metadata: ElementMetadata {
428 reference: ElementReference::new(source_id, &bootstrap_elem.id),
429 labels,
430 effective_from: 0,
431 },
432 properties,
433 in_node,
434 out_node,
435 })
436 } else {
437 Ok(Element::Node {
439 metadata: ElementMetadata {
440 reference: ElementReference::new(source_id, &bootstrap_elem.id),
441 labels,
442 effective_from: 0,
443 },
444 properties,
445 })
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452
453 #[test]
454 fn test_matches_labels_empty_requested() {
455 let element_labels = vec!["Person".to_string(), "Employee".to_string()];
457 let requested_labels = vec![];
458
459 assert!(matches_labels(&element_labels, &requested_labels));
460 }
461
462 #[test]
463 fn test_matches_labels_matching() {
464 let element_labels = vec!["Person".to_string(), "Employee".to_string()];
466 let requested_labels = vec!["Person".to_string()];
467
468 assert!(matches_labels(&element_labels, &requested_labels));
469 }
470
471 #[test]
472 fn test_matches_labels_non_matching() {
473 let element_labels = vec!["Person".to_string(), "Employee".to_string()];
475 let requested_labels = vec!["Company".to_string()];
476
477 assert!(!matches_labels(&element_labels, &requested_labels));
478 }
479
480 #[test]
481 fn test_matches_labels_partial_overlap() {
482 let element_labels = vec!["Person".to_string(), "Employee".to_string()];
484 let requested_labels = vec!["Employee".to_string(), "Company".to_string()];
485
486 assert!(matches_labels(&element_labels, &requested_labels));
487 }
488
489 #[test]
490 fn test_matches_labels_empty_element() {
491 let element_labels = vec![];
493 let requested_labels = vec!["Person".to_string()];
494
495 assert!(!matches_labels(&element_labels, &requested_labels));
496 }
497
498 #[test]
499 fn test_matches_labels_both_empty() {
500 let element_labels = vec![];
502 let requested_labels = vec![];
503
504 assert!(matches_labels(&element_labels, &requested_labels));
505 }
506
507 #[test]
508 fn test_transform_element_node() {
509 let mut properties = Map::new();
511 properties.insert("name".to_string(), serde_json::json!("Alice"));
512 properties.insert("age".to_string(), serde_json::json!(30));
513
514 let bootstrap_elem = BootstrapElement {
515 id: "1".to_string(),
516 labels: vec!["Person".to_string()],
517 properties,
518 start_id: None,
519 end_id: None,
520 };
521
522 let element = transform_element("test-source", bootstrap_elem).unwrap();
523
524 match element {
525 Element::Node { metadata, .. } => {
526 assert_eq!(metadata.reference.element_id.as_ref(), "1");
527 assert_eq!(metadata.labels.len(), 1);
528 assert_eq!(metadata.labels[0].as_ref(), "Person");
529 }
530 _ => panic!("Expected Node element"),
531 }
532 }
533
534 #[test]
535 fn test_transform_element_relation() {
536 let mut properties = Map::new();
538 properties.insert("since".to_string(), serde_json::json!("2020"));
539
540 let bootstrap_elem = BootstrapElement {
541 id: "r1".to_string(),
542 labels: vec!["WORKS_FOR".to_string()],
543 properties,
544 start_id: Some("1".to_string()),
545 end_id: Some("2".to_string()),
546 };
547
548 let element = transform_element("test-source", bootstrap_elem).unwrap();
549
550 match element {
551 Element::Relation {
552 metadata,
553 in_node,
554 out_node,
555 ..
556 } => {
557 assert_eq!(metadata.reference.element_id.as_ref(), "r1");
558 assert_eq!(metadata.labels.len(), 1);
559 assert_eq!(metadata.labels[0].as_ref(), "WORKS_FOR");
560 assert_eq!(in_node.element_id.as_ref(), "1");
561 assert_eq!(out_node.element_id.as_ref(), "2");
562 }
563 _ => panic!("Expected Relation element"),
564 }
565 }
566
567 #[test]
568 fn test_transform_element_various_property_types() {
569 let mut properties = Map::new();
571 properties.insert("string_prop".to_string(), serde_json::json!("text"));
572 properties.insert("number_prop".to_string(), serde_json::json!(42));
573 properties.insert("float_prop".to_string(), serde_json::json!(1.23456));
574 properties.insert("bool_prop".to_string(), serde_json::json!(true));
575 properties.insert("null_prop".to_string(), serde_json::json!(null));
576
577 let bootstrap_elem = BootstrapElement {
578 id: "1".to_string(),
579 labels: vec!["Test".to_string()],
580 properties,
581 start_id: None,
582 end_id: None,
583 };
584
585 let element = transform_element("test-source", bootstrap_elem).unwrap();
586
587 match element {
588 Element::Node { metadata, .. } => {
589 assert_eq!(metadata.reference.element_id.as_ref(), "1");
590 }
592 _ => panic!("Expected Node element"),
593 }
594 }
595
596 #[test]
597 fn test_transform_element_empty_properties() {
598 let bootstrap_elem = BootstrapElement {
600 id: "1".to_string(),
601 labels: vec!["Empty".to_string()],
602 properties: Map::new(),
603 start_id: None,
604 end_id: None,
605 };
606
607 let element = transform_element("test-source", bootstrap_elem).unwrap();
608
609 match element {
610 Element::Node { metadata, .. } => {
611 assert_eq!(metadata.reference.element_id.as_ref(), "1");
612 }
614 _ => panic!("Expected Node element"),
615 }
616 }
617}