Skip to main content

drasi_bootstrap_platform/
platform.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform bootstrap provider for remote Drasi sources
16//!
17//! This provider bootstraps data from a Query API service running in a remote
18//! Drasi environment. It makes HTTP requests to the Query API service and
19//! processes streaming JSON-NL responses.
20
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::Utc;
24use futures::StreamExt;
25use log::{debug, info, warn};
26use reqwest::Client;
27use serde::{Deserialize, Serialize};
28use serde_json::Map;
29use std::sync::Arc;
30use std::time::Duration;
31
32use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
33use drasi_lib::bootstrap::{
34    BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
35};
36use drasi_lib::sources::manager::convert_json_to_element_properties;
37
38/// Request format for Query API subscription
39#[derive(Debug, Clone, Serialize)]
40#[serde(rename_all = "camelCase")]
41struct SubscriptionRequest {
42    query_id: String,
43    query_node_id: String,
44    node_labels: Vec<String>,
45    rel_labels: Vec<String>,
46}
47
48/// Bootstrap element format from Query API service (matches platform SDK)
49#[derive(Debug, Clone, Deserialize)]
50#[serde(rename_all = "camelCase")]
51struct BootstrapElement {
52    id: String,
53    labels: Vec<String>,
54    properties: Map<String, serde_json::Value>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    start_id: Option<String>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    end_id: Option<String>,
59}
60
61use drasi_lib::bootstrap::PlatformBootstrapConfig;
62
63/// Platform bootstrap provider that connects to Query API service
64pub struct PlatformBootstrapProvider {
65    query_api_url: String,
66    client: Client,
67}
68
69impl PlatformBootstrapProvider {
70    /// Create a new platform bootstrap provider from configuration
71    ///
72    /// # Arguments
73    /// * `config` - Platform bootstrap configuration
74    ///
75    /// # Returns
76    /// Returns a new instance of PlatformBootstrapProvider or an error if configuration is invalid
77    pub fn new(config: PlatformBootstrapConfig) -> Result<Self> {
78        let query_api_url = config.query_api_url.ok_or_else(|| {
79            anyhow::anyhow!("query_api_url is required for PlatformBootstrapProvider")
80        })?;
81
82        Self::create_internal(query_api_url, config.timeout_seconds)
83    }
84
85    /// Create a new platform bootstrap provider with explicit parameters
86    ///
87    /// # Arguments
88    /// * `query_api_url` - Base URL of the Query API service (e.g., "http://my-source-query-api:8080")
89    /// * `timeout_seconds` - Timeout for HTTP requests in seconds
90    ///
91    /// # Returns
92    /// Returns a new instance of PlatformBootstrapProvider or an error if the URL is invalid
93    pub fn with_url(query_api_url: impl Into<String>, timeout_seconds: u64) -> Result<Self> {
94        Self::create_internal(query_api_url.into(), timeout_seconds)
95    }
96
97    /// Create a builder for PlatformBootstrapProvider
98    pub fn builder() -> PlatformBootstrapProviderBuilder {
99        PlatformBootstrapProviderBuilder::new()
100    }
101
102    /// Internal constructor
103    fn create_internal(query_api_url: String, timeout_seconds: u64) -> Result<Self> {
104        // Validate URL format
105        reqwest::Url::parse(&query_api_url)
106            .context(format!("Invalid query_api_url: {query_api_url}"))?;
107
108        let timeout = Duration::from_secs(timeout_seconds);
109        let client = Client::builder()
110            .timeout(timeout)
111            .build()
112            .context("Failed to build HTTP client")?;
113
114        Ok(Self {
115            query_api_url,
116            client,
117        })
118    }
119
120    /// Make HTTP subscription request to Query API service
121    ///
122    /// Constructs a subscription request and sends it to the Query API service's
123    /// subscription endpoint.
124    async fn make_subscription_request(
125        &self,
126        request: &BootstrapRequest,
127        context: &BootstrapContext,
128    ) -> Result<reqwest::Response> {
129        let subscription_req = SubscriptionRequest {
130            query_id: request.query_id.clone(),
131            query_node_id: context.server_id.clone(),
132            node_labels: request.node_labels.clone(),
133            rel_labels: request.relation_labels.clone(),
134        };
135
136        let url = format!("{}/subscription", self.query_api_url);
137        debug!(
138            "Making bootstrap subscription request to {} for query {}",
139            url, request.query_id
140        );
141
142        let response = self
143            .client
144            .post(&url)
145            .json(&subscription_req)
146            .send()
147            .await
148            .context(format!("Failed to connect to Query API at {url}"))?;
149
150        if !response.status().is_success() {
151            let status = response.status();
152            let error_text = response
153                .text()
154                .await
155                .unwrap_or_else(|_| "Unable to read error response".to_string());
156            return Err(anyhow::anyhow!(
157                "Query API returned error status {status}: {error_text}"
158            ));
159        }
160
161        debug!(
162            "Successfully connected to Query API, preparing to stream bootstrap data for query {}",
163            request.query_id
164        );
165        Ok(response)
166    }
167
168    /// Process streaming JSON-NL response from Query API
169    ///
170    /// Reads the response body as a stream of bytes, splits by newlines,
171    /// and parses each line as a BootstrapElement.
172    async fn process_bootstrap_stream(
173        &self,
174        response: reqwest::Response,
175    ) -> Result<Vec<BootstrapElement>> {
176        let mut elements = Vec::new();
177        let mut line_buffer = String::new();
178        let mut byte_stream = response.bytes_stream();
179        let mut element_count = 0;
180
181        while let Some(chunk_result) = byte_stream.next().await {
182            let chunk = chunk_result.context("Error reading stream chunk")?;
183            let chunk_str =
184                std::str::from_utf8(&chunk).context("Invalid UTF-8 in stream response")?;
185
186            // Add chunk to buffer
187            line_buffer.push_str(chunk_str);
188
189            // Process complete lines
190            while let Some(newline_pos) = line_buffer.find('\n') {
191                let line = line_buffer[..newline_pos].trim().to_string();
192                line_buffer = line_buffer[newline_pos + 1..].to_string();
193
194                // Skip empty lines
195                if line.is_empty() {
196                    continue;
197                }
198
199                // Parse JSON line
200                match serde_json::from_str::<BootstrapElement>(&line) {
201                    Ok(element) => {
202                        elements.push(element);
203                        element_count += 1;
204                        if element_count % 1000 == 0 {
205                            debug!("Received {element_count} bootstrap elements from stream");
206                        }
207                    }
208                    Err(e) => {
209                        warn!("Failed to parse bootstrap element from JSON: {e} - Line: {line}");
210                        // Continue processing other elements
211                    }
212                }
213            }
214        }
215
216        // Process any remaining data in buffer
217        let remaining = line_buffer.trim();
218        if !remaining.is_empty() {
219            match serde_json::from_str::<BootstrapElement>(remaining) {
220                Ok(element) => {
221                    elements.push(element);
222                    element_count += 1;
223                }
224                Err(e) => {
225                    warn!(
226                        "Failed to parse final bootstrap element from JSON: {e} - Line: {remaining}"
227                    );
228                }
229            }
230        }
231
232        info!("Received total of {element_count} bootstrap elements from Query API stream");
233        Ok(elements)
234    }
235}
236
/// Fluent builder for [`PlatformBootstrapProvider`].
///
/// The Query API URL must be supplied before calling `build`; the timeout
/// falls back to 300 seconds when not set.
///
/// # Example
///
/// ```no_run
/// use drasi_bootstrap_platform::PlatformBootstrapProvider;
///
/// let provider = PlatformBootstrapProvider::builder()
///     .with_query_api_url("http://remote-drasi:8080")
///     .with_timeout_seconds(600)
///     .build()
///     .expect("Failed to create platform bootstrap provider");
/// ```
pub struct PlatformBootstrapProviderBuilder {
    /// Query API base URL; `None` until `with_query_api_url` is called.
    query_api_url: Option<String>,
    /// HTTP request timeout, in seconds.
    timeout_seconds: u64,
}
254
255impl PlatformBootstrapProviderBuilder {
256    /// Create a new builder
257    pub fn new() -> Self {
258        Self {
259            query_api_url: None,
260            timeout_seconds: 300, // Default timeout
261        }
262    }
263
264    /// Set the Query API URL (required)
265    pub fn with_query_api_url(mut self, url: impl Into<String>) -> Self {
266        self.query_api_url = Some(url.into());
267        self
268    }
269
270    /// Set the timeout in seconds (default: 300)
271    pub fn with_timeout_seconds(mut self, seconds: u64) -> Self {
272        self.timeout_seconds = seconds;
273        self
274    }
275
276    /// Build the PlatformBootstrapProvider
277    ///
278    /// Returns an error if query_api_url is not set or is invalid
279    pub fn build(self) -> Result<PlatformBootstrapProvider> {
280        let query_api_url = self
281            .query_api_url
282            .ok_or_else(|| anyhow::anyhow!("query_api_url is required"))?;
283
284        PlatformBootstrapProvider::create_internal(query_api_url, self.timeout_seconds)
285    }
286}
287
288impl Default for PlatformBootstrapProviderBuilder {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
294#[async_trait]
295impl BootstrapProvider for PlatformBootstrapProvider {
296    async fn bootstrap(
297        &self,
298        request: BootstrapRequest,
299        context: &BootstrapContext,
300        event_tx: drasi_lib::channels::BootstrapEventSender,
301        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
302    ) -> Result<BootstrapResult> {
303        info!(
304            "Starting platform bootstrap for query {} from source {}",
305            request.query_id, context.source_id
306        );
307
308        // Make HTTP request to Query API service
309        let response = self
310            .make_subscription_request(&request, context)
311            .await
312            .context("Failed to make subscription request to Query API")?;
313
314        // Process streaming response
315        let bootstrap_elements = self
316            .process_bootstrap_stream(response)
317            .await
318            .context("Failed to process bootstrap stream from Query API")?;
319
320        debug!(
321            "Processing {} bootstrap elements for query {}",
322            bootstrap_elements.len(),
323            request.query_id
324        );
325
326        let mut sent_count = 0;
327        let mut filtered_nodes = 0;
328        let mut filtered_relations = 0;
329
330        for bootstrap_elem in bootstrap_elements {
331            // Determine if this is a node or relation based on start_id/end_id
332            let is_relation = bootstrap_elem.start_id.is_some() && bootstrap_elem.end_id.is_some();
333
334            // Filter by appropriate labels
335            let should_process = if is_relation {
336                matches_labels(&bootstrap_elem.labels, &request.relation_labels)
337            } else {
338                matches_labels(&bootstrap_elem.labels, &request.node_labels)
339            };
340
341            if !should_process {
342                if is_relation {
343                    filtered_relations += 1;
344                } else {
345                    filtered_nodes += 1;
346                }
347                continue;
348            }
349
350            // Transform to drasi-core Element
351            let element = transform_element(&context.source_id, bootstrap_elem)
352                .context("Failed to transform bootstrap element")?;
353
354            // Wrap in SourceChange::Insert
355            let source_change = SourceChange::Insert { element };
356
357            // Get next sequence number for this bootstrap event
358            let sequence = context.next_sequence();
359
360            // Send via channel
361            let bootstrap_event = drasi_lib::channels::BootstrapEvent {
362                source_id: context.source_id.clone(),
363                change: source_change,
364                timestamp: Utc::now(),
365                sequence,
366            };
367
368            event_tx
369                .send(bootstrap_event)
370                .await
371                .context("Failed to send bootstrap element via channel")?;
372
373            sent_count += 1;
374        }
375
376        debug!(
377            "Filtered {filtered_nodes} nodes and {filtered_relations} relations based on requested labels"
378        );
379
380        info!(
381            "Completed platform bootstrap for query {}: sent {} elements",
382            request.query_id, sent_count
383        );
384
385        Ok(BootstrapResult {
386            event_count: sent_count,
387            last_sequence: None,
388            sequences_aligned: false,
389        })
390    }
391}
392
/// Decide whether an element passes the label filter.
///
/// An empty `requested_labels` acts as a wildcard and accepts every element.
/// Otherwise the element is accepted when at least one of its labels appears
/// in `requested_labels`.
fn matches_labels(element_labels: &[String], requested_labels: &[String]) -> bool {
    // No filter requested: everything matches.
    if requested_labels.is_empty() {
        return true;
    }

    for label in element_labels {
        if requested_labels.contains(label) {
            return true;
        }
    }
    false
}
403
404/// Transform BootstrapElement to drasi-core Element
405///
406/// Converts platform SDK format to drasi-core format. Determines if element is
407/// a Node or Relation based on presence of start_id/end_id fields.
408fn transform_element(source_id: &str, bootstrap_elem: BootstrapElement) -> Result<Element> {
409    // Convert properties from JSON Map to ElementPropertyMap
410    let properties = convert_json_to_element_properties(&bootstrap_elem.properties);
411
412    // Convert labels to Arc slice
413    let labels: Arc<[Arc<str>]> = bootstrap_elem
414        .labels
415        .iter()
416        .map(|l| Arc::from(l.as_str()))
417        .collect::<Vec<_>>()
418        .into();
419
420    // Check if this is a relation (has start_id and end_id)
421    if let (Some(start_id), Some(end_id)) = (&bootstrap_elem.start_id, &bootstrap_elem.end_id) {
422        // Create start and end element references
423        let in_node = ElementReference::new(source_id, start_id);
424        let out_node = ElementReference::new(source_id, end_id);
425
426        Ok(Element::Relation {
427            metadata: ElementMetadata {
428                reference: ElementReference::new(source_id, &bootstrap_elem.id),
429                labels,
430                effective_from: 0,
431            },
432            properties,
433            in_node,
434            out_node,
435        })
436    } else {
437        // This is a node
438        Ok(Element::Node {
439            metadata: ElementMetadata {
440                reference: ElementReference::new(source_id, &bootstrap_elem.id),
441                labels,
442                effective_from: 0,
443            },
444            properties,
445        })
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned label list from string literals.
    fn labels(names: &[&str]) -> Vec<String> {
        names.iter().copied().map(String::from).collect()
    }

    /// Build a JSON property map from `(key, value)` pairs.
    fn props(entries: Vec<(&str, serde_json::Value)>) -> Map<String, serde_json::Value> {
        let mut map = Map::new();
        for (key, value) in entries {
            map.insert(key.to_string(), value);
        }
        map
    }

    /// Build a node-shaped bootstrap element (no start/end ids) with one label.
    fn node_element(
        id: &str,
        label: &str,
        properties: Map<String, serde_json::Value>,
    ) -> BootstrapElement {
        BootstrapElement {
            id: id.to_string(),
            labels: labels(&[label]),
            properties,
            start_id: None,
            end_id: None,
        }
    }

    #[test]
    fn test_matches_labels_empty_requested() {
        // An empty request is a wildcard.
        assert!(matches_labels(&labels(&["Person", "Employee"]), &labels(&[])));
    }

    #[test]
    fn test_matches_labels_matching() {
        // One of the element's labels is requested.
        assert!(matches_labels(
            &labels(&["Person", "Employee"]),
            &labels(&["Person"])
        ));
    }

    #[test]
    fn test_matches_labels_non_matching() {
        // None of the element's labels is requested.
        assert!(!matches_labels(
            &labels(&["Person", "Employee"]),
            &labels(&["Company"])
        ));
    }

    #[test]
    fn test_matches_labels_partial_overlap() {
        // Any overlap is enough.
        assert!(matches_labels(
            &labels(&["Person", "Employee"]),
            &labels(&["Employee", "Company"])
        ));
    }

    #[test]
    fn test_matches_labels_empty_element() {
        // An unlabeled element cannot satisfy a non-empty request.
        assert!(!matches_labels(&labels(&[]), &labels(&["Person"])));
    }

    #[test]
    fn test_matches_labels_both_empty() {
        // Wildcard request also accepts an unlabeled element.
        assert!(matches_labels(&labels(&[]), &labels(&[])));
    }

    #[test]
    fn test_transform_element_node() {
        // No start_id/end_id: expect a Node.
        let properties = props(vec![
            ("name", serde_json::json!("Alice")),
            ("age", serde_json::json!(30)),
        ]);
        let bootstrap_elem = node_element("1", "Person", properties);

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Node { metadata, .. } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "1");
                assert_eq!(metadata.labels.len(), 1);
                assert_eq!(metadata.labels[0].as_ref(), "Person");
            }
            _ => panic!("Expected Node element"),
        }
    }

    #[test]
    fn test_transform_element_relation() {
        // Both start_id and end_id present: expect a Relation.
        let bootstrap_elem = BootstrapElement {
            id: "r1".to_string(),
            labels: labels(&["WORKS_FOR"]),
            properties: props(vec![("since", serde_json::json!("2020"))]),
            start_id: Some("1".to_string()),
            end_id: Some("2".to_string()),
        };

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Relation {
                metadata,
                in_node,
                out_node,
                ..
            } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "r1");
                assert_eq!(metadata.labels.len(), 1);
                assert_eq!(metadata.labels[0].as_ref(), "WORKS_FOR");
                assert_eq!(in_node.element_id.as_ref(), "1");
                assert_eq!(out_node.element_id.as_ref(), "2");
            }
            _ => panic!("Expected Relation element"),
        }
    }

    #[test]
    fn test_transform_element_various_property_types() {
        // Conversion must succeed for a mix of JSON value types.
        let properties = props(vec![
            ("string_prop", serde_json::json!("text")),
            ("number_prop", serde_json::json!(42)),
            ("float_prop", serde_json::json!(1.23456)),
            ("bool_prop", serde_json::json!(true)),
            ("null_prop", serde_json::json!(null)),
        ]);
        let bootstrap_elem = node_element("1", "Test", properties);

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Node { metadata, .. } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "1");
                // Properties were successfully converted
            }
            _ => panic!("Expected Node element"),
        }
    }

    #[test]
    fn test_transform_element_empty_properties() {
        // An empty property map is acceptable.
        let bootstrap_elem = node_element("1", "Empty", Map::new());

        let element = transform_element("test-source", bootstrap_elem).unwrap();

        match element {
            Element::Node { metadata, .. } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "1");
                // Properties transformed successfully (even if empty)
            }
            _ => panic!("Expected Node element"),
        }
    }
}
617}