Skip to main content

drasi_bootstrap_platform/
platform.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform bootstrap provider for remote Drasi sources
16//!
17//! This provider bootstraps data from a Query API service running in a remote
18//! Drasi environment. It makes HTTP requests to the Query API service and
19//! processes streaming JSON-NL responses.
20
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::Utc;
24use futures::StreamExt;
25use log::{debug, info, warn};
26use reqwest::Client;
27use serde::{Deserialize, Serialize};
28use serde_json::Map;
29use std::sync::Arc;
30use std::time::Duration;
31
32use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
33use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
34use drasi_lib::sources::manager::convert_json_to_element_properties;
35
/// Request format for Query API subscription
///
/// Sent as the JSON body of the `POST {query_api_url}/subscription` call made by
/// `PlatformBootstrapProvider::make_subscription_request`. Field names are
/// serialized in camelCase (`queryId`, `queryNodeId`, `nodeLabels`, `relLabels`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct SubscriptionRequest {
    /// Identifier of the query being bootstrapped (copied from `BootstrapRequest::query_id`).
    query_id: String,
    /// Identifier of the requesting node (populated from `BootstrapContext::server_id`).
    query_node_id: String,
    /// Node labels requested by the query (copied from `BootstrapRequest::node_labels`).
    node_labels: Vec<String>,
    /// Relation labels requested by the query (copied from `BootstrapRequest::relation_labels`).
    rel_labels: Vec<String>,
}
45
46/// Bootstrap element format from Query API service (matches platform SDK)
47#[derive(Debug, Clone, Deserialize)]
48#[serde(rename_all = "camelCase")]
49struct BootstrapElement {
50    id: String,
51    labels: Vec<String>,
52    properties: Map<String, serde_json::Value>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    start_id: Option<String>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    end_id: Option<String>,
57}
58
59use drasi_lib::bootstrap::PlatformBootstrapConfig;
60
/// Platform bootstrap provider that connects to Query API service
///
/// Holds the validated base URL of the Query API and a reusable HTTP client
/// whose request timeout is fixed at construction time.
pub struct PlatformBootstrapProvider {
    /// Base URL of the Query API; `/subscription` is appended when requesting bootstrap data.
    query_api_url: String,
    /// HTTP client built with the configured request timeout.
    client: Client,
}
66
67impl PlatformBootstrapProvider {
68    /// Create a new platform bootstrap provider from configuration
69    ///
70    /// # Arguments
71    /// * `config` - Platform bootstrap configuration
72    ///
73    /// # Returns
74    /// Returns a new instance of PlatformBootstrapProvider or an error if configuration is invalid
75    pub fn new(config: PlatformBootstrapConfig) -> Result<Self> {
76        let query_api_url = config.query_api_url.ok_or_else(|| {
77            anyhow::anyhow!("query_api_url is required for PlatformBootstrapProvider")
78        })?;
79
80        Self::create_internal(query_api_url, config.timeout_seconds)
81    }
82
83    /// Create a new platform bootstrap provider with explicit parameters
84    ///
85    /// # Arguments
86    /// * `query_api_url` - Base URL of the Query API service (e.g., "http://my-source-query-api:8080")
87    /// * `timeout_seconds` - Timeout for HTTP requests in seconds
88    ///
89    /// # Returns
90    /// Returns a new instance of PlatformBootstrapProvider or an error if the URL is invalid
91    pub fn with_url(query_api_url: impl Into<String>, timeout_seconds: u64) -> Result<Self> {
92        Self::create_internal(query_api_url.into(), timeout_seconds)
93    }
94
    /// Create a builder for PlatformBootstrapProvider
    ///
    /// Equivalent to calling `PlatformBootstrapProviderBuilder::new()`; the returned
    /// builder has no URL set and uses the builder's default timeout.
    pub fn builder() -> PlatformBootstrapProviderBuilder {
        PlatformBootstrapProviderBuilder::new()
    }
99
100    /// Internal constructor
101    fn create_internal(query_api_url: String, timeout_seconds: u64) -> Result<Self> {
102        // Validate URL format
103        reqwest::Url::parse(&query_api_url)
104            .context(format!("Invalid query_api_url: {query_api_url}"))?;
105
106        let timeout = Duration::from_secs(timeout_seconds);
107        let client = Client::builder()
108            .timeout(timeout)
109            .build()
110            .context("Failed to build HTTP client")?;
111
112        Ok(Self {
113            query_api_url,
114            client,
115        })
116    }
117
118    /// Make HTTP subscription request to Query API service
119    ///
120    /// Constructs a subscription request and sends it to the Query API service's
121    /// subscription endpoint.
122    async fn make_subscription_request(
123        &self,
124        request: &BootstrapRequest,
125        context: &BootstrapContext,
126    ) -> Result<reqwest::Response> {
127        let subscription_req = SubscriptionRequest {
128            query_id: request.query_id.clone(),
129            query_node_id: context.server_id.clone(),
130            node_labels: request.node_labels.clone(),
131            rel_labels: request.relation_labels.clone(),
132        };
133
134        let url = format!("{}/subscription", self.query_api_url);
135        debug!(
136            "Making bootstrap subscription request to {} for query {}",
137            url, request.query_id
138        );
139
140        let response = self
141            .client
142            .post(&url)
143            .json(&subscription_req)
144            .send()
145            .await
146            .context(format!("Failed to connect to Query API at {url}"))?;
147
148        if !response.status().is_success() {
149            let status = response.status();
150            let error_text = response
151                .text()
152                .await
153                .unwrap_or_else(|_| "Unable to read error response".to_string());
154            return Err(anyhow::anyhow!(
155                "Query API returned error status {status}: {error_text}"
156            ));
157        }
158
159        debug!(
160            "Successfully connected to Query API, preparing to stream bootstrap data for query {}",
161            request.query_id
162        );
163        Ok(response)
164    }
165
166    /// Process streaming JSON-NL response from Query API
167    ///
168    /// Reads the response body as a stream of bytes, splits by newlines,
169    /// and parses each line as a BootstrapElement.
170    async fn process_bootstrap_stream(
171        &self,
172        response: reqwest::Response,
173    ) -> Result<Vec<BootstrapElement>> {
174        let mut elements = Vec::new();
175        let mut line_buffer = String::new();
176        let mut byte_stream = response.bytes_stream();
177        let mut element_count = 0;
178
179        while let Some(chunk_result) = byte_stream.next().await {
180            let chunk = chunk_result.context("Error reading stream chunk")?;
181            let chunk_str =
182                std::str::from_utf8(&chunk).context("Invalid UTF-8 in stream response")?;
183
184            // Add chunk to buffer
185            line_buffer.push_str(chunk_str);
186
187            // Process complete lines
188            while let Some(newline_pos) = line_buffer.find('\n') {
189                let line = line_buffer[..newline_pos].trim().to_string();
190                line_buffer = line_buffer[newline_pos + 1..].to_string();
191
192                // Skip empty lines
193                if line.is_empty() {
194                    continue;
195                }
196
197                // Parse JSON line
198                match serde_json::from_str::<BootstrapElement>(&line) {
199                    Ok(element) => {
200                        elements.push(element);
201                        element_count += 1;
202                        if element_count % 1000 == 0 {
203                            debug!("Received {element_count} bootstrap elements from stream");
204                        }
205                    }
206                    Err(e) => {
207                        warn!("Failed to parse bootstrap element from JSON: {e} - Line: {line}");
208                        // Continue processing other elements
209                    }
210                }
211            }
212        }
213
214        // Process any remaining data in buffer
215        let remaining = line_buffer.trim();
216        if !remaining.is_empty() {
217            match serde_json::from_str::<BootstrapElement>(remaining) {
218                Ok(element) => {
219                    elements.push(element);
220                    element_count += 1;
221                }
222                Err(e) => {
223                    warn!(
224                        "Failed to parse final bootstrap element from JSON: {e} - Line: {remaining}"
225                    );
226                }
227            }
228        }
229
230        info!("Received total of {element_count} bootstrap elements from Query API stream");
231        Ok(elements)
232    }
233}
234
/// Builder for PlatformBootstrapProvider
///
/// The Query API URL is required; the timeout is optional and defaults to
/// 300 seconds when created through `new()`.
///
/// # Example
///
/// ```no_run
/// use drasi_bootstrap_platform::PlatformBootstrapProvider;
///
/// let provider = PlatformBootstrapProvider::builder()
///     .with_query_api_url("http://remote-drasi:8080")
///     .with_timeout_seconds(600)
///     .build()
///     .expect("Failed to create platform bootstrap provider");
/// ```
pub struct PlatformBootstrapProviderBuilder {
    /// Base URL of the Query API; `None` until `with_query_api_url` is called.
    query_api_url: Option<String>,
    /// HTTP request timeout in seconds.
    timeout_seconds: u64,
}
252
253impl PlatformBootstrapProviderBuilder {
254    /// Create a new builder
255    pub fn new() -> Self {
256        Self {
257            query_api_url: None,
258            timeout_seconds: 300, // Default timeout
259        }
260    }
261
262    /// Set the Query API URL (required)
263    pub fn with_query_api_url(mut self, url: impl Into<String>) -> Self {
264        self.query_api_url = Some(url.into());
265        self
266    }
267
268    /// Set the timeout in seconds (default: 300)
269    pub fn with_timeout_seconds(mut self, seconds: u64) -> Self {
270        self.timeout_seconds = seconds;
271        self
272    }
273
274    /// Build the PlatformBootstrapProvider
275    ///
276    /// Returns an error if query_api_url is not set or is invalid
277    pub fn build(self) -> Result<PlatformBootstrapProvider> {
278        let query_api_url = self
279            .query_api_url
280            .ok_or_else(|| anyhow::anyhow!("query_api_url is required"))?;
281
282        PlatformBootstrapProvider::create_internal(query_api_url, self.timeout_seconds)
283    }
284}
285
286impl Default for PlatformBootstrapProviderBuilder {
287    fn default() -> Self {
288        Self::new()
289    }
290}
291
292#[async_trait]
293impl BootstrapProvider for PlatformBootstrapProvider {
294    async fn bootstrap(
295        &self,
296        request: BootstrapRequest,
297        context: &BootstrapContext,
298        event_tx: drasi_lib::channels::BootstrapEventSender,
299        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
300    ) -> Result<usize> {
301        info!(
302            "Starting platform bootstrap for query {} from source {}",
303            request.query_id, context.source_id
304        );
305
306        // Make HTTP request to Query API service
307        let response = self
308            .make_subscription_request(&request, context)
309            .await
310            .context("Failed to make subscription request to Query API")?;
311
312        // Process streaming response
313        let bootstrap_elements = self
314            .process_bootstrap_stream(response)
315            .await
316            .context("Failed to process bootstrap stream from Query API")?;
317
318        debug!(
319            "Processing {} bootstrap elements for query {}",
320            bootstrap_elements.len(),
321            request.query_id
322        );
323
324        let mut sent_count = 0;
325        let mut filtered_nodes = 0;
326        let mut filtered_relations = 0;
327
328        for bootstrap_elem in bootstrap_elements {
329            // Determine if this is a node or relation based on start_id/end_id
330            let is_relation = bootstrap_elem.start_id.is_some() && bootstrap_elem.end_id.is_some();
331
332            // Filter by appropriate labels
333            let should_process = if is_relation {
334                matches_labels(&bootstrap_elem.labels, &request.relation_labels)
335            } else {
336                matches_labels(&bootstrap_elem.labels, &request.node_labels)
337            };
338
339            if !should_process {
340                if is_relation {
341                    filtered_relations += 1;
342                } else {
343                    filtered_nodes += 1;
344                }
345                continue;
346            }
347
348            // Transform to drasi-core Element
349            let element = transform_element(&context.source_id, bootstrap_elem)
350                .context("Failed to transform bootstrap element")?;
351
352            // Wrap in SourceChange::Insert
353            let source_change = SourceChange::Insert { element };
354
355            // Get next sequence number for this bootstrap event
356            let sequence = context.next_sequence();
357
358            // Send via channel
359            let bootstrap_event = drasi_lib::channels::BootstrapEvent {
360                source_id: context.source_id.clone(),
361                change: source_change,
362                timestamp: Utc::now(),
363                sequence,
364            };
365
366            event_tx
367                .send(bootstrap_event)
368                .await
369                .context("Failed to send bootstrap element via channel")?;
370
371            sent_count += 1;
372        }
373
374        debug!(
375            "Filtered {filtered_nodes} nodes and {filtered_relations} relations based on requested labels"
376        );
377
378        info!(
379            "Completed platform bootstrap for query {}: sent {} elements",
380            request.query_id, sent_count
381        );
382
383        Ok(sent_count)
384    }
385}
386
/// Decide whether an element passes the label filter.
///
/// An empty `requested_labels` slice means "no filter" and matches every
/// element, including one without labels. Otherwise the element matches when at
/// least one of its labels appears in `requested_labels`.
fn matches_labels(element_labels: &[String], requested_labels: &[String]) -> bool {
    if requested_labels.is_empty() {
        return true;
    }

    for label in element_labels {
        if requested_labels.contains(label) {
            return true;
        }
    }
    false
}
397
398/// Transform BootstrapElement to drasi-core Element
399///
400/// Converts platform SDK format to drasi-core format. Determines if element is
401/// a Node or Relation based on presence of start_id/end_id fields.
402fn transform_element(source_id: &str, bootstrap_elem: BootstrapElement) -> Result<Element> {
403    // Convert properties from JSON Map to ElementPropertyMap
404    let properties = convert_json_to_element_properties(&bootstrap_elem.properties)
405        .context("Failed to convert element properties")?;
406
407    // Convert labels to Arc slice
408    let labels: Arc<[Arc<str>]> = bootstrap_elem
409        .labels
410        .iter()
411        .map(|l| Arc::from(l.as_str()))
412        .collect::<Vec<_>>()
413        .into();
414
415    // Check if this is a relation (has start_id and end_id)
416    if let (Some(start_id), Some(end_id)) = (&bootstrap_elem.start_id, &bootstrap_elem.end_id) {
417        // Create start and end element references
418        let in_node = ElementReference::new(source_id, start_id);
419        let out_node = ElementReference::new(source_id, end_id);
420
421        Ok(Element::Relation {
422            metadata: ElementMetadata {
423                reference: ElementReference::new(source_id, &bootstrap_elem.id),
424                labels,
425                effective_from: 0,
426            },
427            properties,
428            in_node,
429            out_node,
430        })
431    } else {
432        // This is a node
433        Ok(Element::Node {
434            metadata: ElementMetadata {
435                reference: ElementReference::new(source_id, &bootstrap_elem.id),
436                labels,
437                effective_from: 0,
438            },
439            properties,
440        })
441    }
442}
443
// Unit tests for the pure helpers in this file: `matches_labels` (label
// filtering) and `transform_element` (BootstrapElement -> Element conversion).
// Nothing here performs network I/O.
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty filter accepts any labeled element.
    #[test]
    fn test_matches_labels_empty_requested() {
        // Empty requested labels should match all
        let element_labels = vec!["Person".to_string(), "Employee".to_string()];
        let requested_labels = vec![];

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    /// A single requested label that the element carries is a match.
    #[test]
    fn test_matches_labels_matching() {
        // Should return true when element has one of the requested labels
        let element_labels = vec!["Person".to_string(), "Employee".to_string()];
        let requested_labels = vec!["Person".to_string()];

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    /// A requested label the element does not carry is not a match.
    #[test]
    fn test_matches_labels_non_matching() {
        // Should return false when element has none of the requested labels
        let element_labels = vec!["Person".to_string(), "Employee".to_string()];
        let requested_labels = vec!["Company".to_string()];

        assert!(!matches_labels(&element_labels, &requested_labels));
    }

    /// One shared label among several requested ones is enough to match.
    #[test]
    fn test_matches_labels_partial_overlap() {
        // Should return true when there's any overlap
        let element_labels = vec!["Person".to_string(), "Employee".to_string()];
        let requested_labels = vec!["Employee".to_string(), "Company".to_string()];

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    /// An unlabeled element cannot satisfy a non-empty filter.
    #[test]
    fn test_matches_labels_empty_element() {
        // Empty element labels with non-empty requested should not match
        let element_labels = vec![];
        let requested_labels = vec!["Person".to_string()];

        assert!(!matches_labels(&element_labels, &requested_labels));
    }

    /// An empty filter accepts even an unlabeled element.
    #[test]
    fn test_matches_labels_both_empty() {
        // Both empty should match (empty requested matches all)
        let element_labels = vec![];
        let requested_labels = vec![];

        assert!(matches_labels(&element_labels, &requested_labels));
    }

    /// An element without start/end ids becomes `Element::Node` with its id and labels preserved.
    #[test]
    fn test_transform_element_node() {
        // Test transforming a node (no start_id/end_id)
        let mut properties = Map::new();
        properties.insert("name".to_string(), serde_json::json!("Alice"));
        properties.insert("age".to_string(), serde_json::json!(30));

        let bootstrap_elem = BootstrapElement {
            id: "1".to_string(),
            labels: vec!["Person".to_string()],
            properties,
            start_id: None,
            end_id: None,
        };

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Node { metadata, .. } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "1");
                assert_eq!(metadata.labels.len(), 1);
                assert_eq!(metadata.labels[0].as_ref(), "Person");
            }
            _ => panic!("Expected Node element"),
        }
    }

    /// An element with both ids becomes `Element::Relation`; start maps to `in_node`, end to `out_node`.
    #[test]
    fn test_transform_element_relation() {
        // Test transforming a relation (with start_id and end_id)
        let mut properties = Map::new();
        properties.insert("since".to_string(), serde_json::json!("2020"));

        let bootstrap_elem = BootstrapElement {
            id: "r1".to_string(),
            labels: vec!["WORKS_FOR".to_string()],
            properties,
            start_id: Some("1".to_string()),
            end_id: Some("2".to_string()),
        };

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Relation {
                metadata,
                in_node,
                out_node,
                ..
            } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "r1");
                assert_eq!(metadata.labels.len(), 1);
                assert_eq!(metadata.labels[0].as_ref(), "WORKS_FOR");
                assert_eq!(in_node.element_id.as_ref(), "1");
                assert_eq!(out_node.element_id.as_ref(), "2");
            }
            _ => panic!("Expected Relation element"),
        }
    }

    /// Conversion succeeds for string, integer, float, boolean and null property values.
    /// Only success and the element id are checked; the converted values are not inspected.
    #[test]
    fn test_transform_element_various_property_types() {
        // Test property conversion with various JSON value types
        let mut properties = Map::new();
        properties.insert("string_prop".to_string(), serde_json::json!("text"));
        properties.insert("number_prop".to_string(), serde_json::json!(42));
        properties.insert("float_prop".to_string(), serde_json::json!(1.23456));
        properties.insert("bool_prop".to_string(), serde_json::json!(true));
        properties.insert("null_prop".to_string(), serde_json::json!(null));

        let bootstrap_elem = BootstrapElement {
            id: "1".to_string(),
            labels: vec!["Test".to_string()],
            properties,
            start_id: None,
            end_id: None,
        };

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Node { metadata, .. } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "1");
                // Properties were successfully converted
            }
            _ => panic!("Expected Node element"),
        }
    }

    /// Conversion succeeds when the element has no properties at all.
    #[test]
    fn test_transform_element_empty_properties() {
        // Test with empty properties
        let bootstrap_elem = BootstrapElement {
            id: "1".to_string(),
            labels: vec!["Empty".to_string()],
            properties: Map::new(),
            start_id: None,
            end_id: None,
        };

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Node { metadata, .. } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "1");
                // Properties transformed successfully (even if empty)
            }
            _ => panic!("Expected Node element"),
        }
    }
}