Skip to main content

drasi_bootstrap_application/
application.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Application bootstrap provider for replaying stored insert events
16
17use anyhow::Result;
18use async_trait::async_trait;
19use drasi_core::models::{Element, SourceChange};
20use log::info;
21use std::sync::Arc;
22use tokio::sync::RwLock;
23
24use drasi_lib::bootstrap::BootstrapRequest;
25use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider};
26
27/// Bootstrap provider for application sources that replays stored insert events
28pub struct ApplicationBootstrapProvider {
29    /// Shared reference to bootstrap data (insert events) for replay
30    /// This should be connected to ApplicationSource's bootstrap_data via shared Arc
31    bootstrap_data: Arc<RwLock<Vec<SourceChange>>>,
32}
33
34impl ApplicationBootstrapProvider {
35    /// Create a new provider with its own isolated bootstrap data storage
36    /// Note: This creates an independent storage that won't be connected to any ApplicationSource
37    pub fn new() -> Self {
38        Self {
39            bootstrap_data: Arc::new(RwLock::new(Vec::new())),
40        }
41    }
42
43    /// Create a new provider with a shared reference to ApplicationSource's bootstrap data
44    /// This allows the provider to access the actual data stored by ApplicationSource
45    pub fn with_shared_data(bootstrap_data: Arc<RwLock<Vec<SourceChange>>>) -> Self {
46        Self { bootstrap_data }
47    }
48
49    /// Create a builder for ApplicationBootstrapProvider
50    pub fn builder() -> ApplicationBootstrapProviderBuilder {
51        ApplicationBootstrapProviderBuilder::new()
52    }
53}
54
55impl Default for ApplicationBootstrapProvider {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61/// Builder for ApplicationBootstrapProvider
62///
63/// # Example
64///
65/// ```no_run
66/// use drasi_bootstrap_application::ApplicationBootstrapProvider;
67///
68/// // Create with isolated storage
69/// let provider = ApplicationBootstrapProvider::builder().build();
70///
71/// // Create with shared storage
72/// use std::sync::Arc;
73/// use tokio::sync::RwLock;
74/// use drasi_core::models::SourceChange;
75///
76/// let shared_data = Arc::new(RwLock::new(Vec::<SourceChange>::new()));
77/// let provider = ApplicationBootstrapProvider::builder()
78///     .with_shared_data(shared_data)
79///     .build();
80/// ```
81pub struct ApplicationBootstrapProviderBuilder {
82    shared_data: Option<Arc<RwLock<Vec<SourceChange>>>>,
83}
84
85impl ApplicationBootstrapProviderBuilder {
86    /// Create a new builder
87    pub fn new() -> Self {
88        Self { shared_data: None }
89    }
90
91    /// Set shared bootstrap data
92    ///
93    /// Use this when you want the provider to share bootstrap data with an ApplicationSource.
94    pub fn with_shared_data(mut self, data: Arc<RwLock<Vec<SourceChange>>>) -> Self {
95        self.shared_data = Some(data);
96        self
97    }
98
99    /// Build the ApplicationBootstrapProvider
100    pub fn build(self) -> ApplicationBootstrapProvider {
101        match self.shared_data {
102            Some(data) => ApplicationBootstrapProvider::with_shared_data(data),
103            None => ApplicationBootstrapProvider::new(),
104        }
105    }
106}
107
108impl Default for ApplicationBootstrapProviderBuilder {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114impl ApplicationBootstrapProvider {
115    /// Store an insert event for future bootstrap replay
116    /// This would be called by the application source when it receives insert events
117    pub async fn store_insert_event(&self, change: SourceChange) {
118        if matches!(change, SourceChange::Insert { .. }) {
119            self.bootstrap_data.write().await.push(change);
120        }
121    }
122
123    /// Get all stored insert events (for testing or inspection)
124    pub async fn get_stored_events(&self) -> Vec<SourceChange> {
125        self.bootstrap_data.read().await.clone()
126    }
127
128    /// Clear stored events (for testing or reset)
129    pub async fn clear_stored_events(&self) {
130        self.bootstrap_data.write().await.clear();
131    }
132
133    /// Check if a change matches the requested labels
134    fn matches_labels(&self, change: &SourceChange, request: &BootstrapRequest) -> bool {
135        match change {
136            SourceChange::Insert { element } | SourceChange::Update { element, .. } => {
137                match element {
138                    Element::Node { metadata, .. } => {
139                        request.node_labels.is_empty()
140                            || metadata
141                                .labels
142                                .iter()
143                                .any(|l| request.node_labels.contains(&l.to_string()))
144                    }
145                    Element::Relation { metadata, .. } => {
146                        request.relation_labels.is_empty()
147                            || metadata
148                                .labels
149                                .iter()
150                                .any(|l| request.relation_labels.contains(&l.to_string()))
151                    }
152                }
153            }
154            SourceChange::Delete { metadata } => {
155                request.node_labels.is_empty() && request.relation_labels.is_empty()
156                    || metadata.labels.iter().any(|l| {
157                        request.node_labels.contains(&l.to_string())
158                            || request.relation_labels.contains(&l.to_string())
159                    })
160            }
161            SourceChange::Future { .. } => {
162                // Future events are not supported in bootstrap
163                false
164            }
165        }
166    }
167}
168
169#[async_trait]
170impl BootstrapProvider for ApplicationBootstrapProvider {
171    async fn bootstrap(
172        &self,
173        request: BootstrapRequest,
174        _context: &BootstrapContext,
175        _event_tx: drasi_lib::channels::BootstrapEventSender,
176        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
177    ) -> Result<usize> {
178        info!(
179            "ApplicationBootstrapProvider processing bootstrap request for query '{}' with {} node labels and {} relation labels",
180            request.query_id,
181            request.node_labels.len(),
182            request.relation_labels.len()
183        );
184
185        let bootstrap_data = self.bootstrap_data.read().await;
186        let mut count = 0;
187
188        if bootstrap_data.is_empty() {
189            info!(
190                "ApplicationBootstrapProvider: No stored events to replay for query '{}'",
191                request.query_id
192            );
193            return Ok(0);
194        }
195
196        info!(
197            "ApplicationBootstrapProvider: Replaying {} stored events for query '{}'",
198            bootstrap_data.len(),
199            request.query_id
200        );
201
202        for change in bootstrap_data.iter() {
203            // Filter by requested labels
204            if self.matches_labels(change, &request) {
205                // Note: Sequence numbering and profiling metadata are handled by
206                // ApplicationSource.subscribe() which sends bootstrap events through
207                // dedicated channels. This provider only counts matching events.
208                count += 1;
209            }
210        }
211
212        info!(
213            "ApplicationBootstrapProvider sent {} bootstrap events for query '{}'",
214            count, request.query_id
215        );
216        Ok(count)
217    }
218}
219
220// Implementation Note: Bootstrap Data Connection
221//
222// This provider can be connected to ApplicationSource's actual bootstrap data in two ways:
223//
224// 1. Via with_shared_data() constructor:
225//    When creating an ApplicationSource, pass the bootstrap_data Arc to the provider:
226//    ```rust
227//    let (source, handle) = ApplicationSource::new(config, event_tx);
228//    let provider = ApplicationBootstrapProvider::with_shared_data(
229//        source.get_bootstrap_data()
230//    );
231//    ```
232//
233// 2. Via BootstrapContext properties:
234//    Store the bootstrap_data Arc in the source config properties as a special internal
235//    property that can be retrieved by the provider during bootstrap operations.
236//
237// Currently, ApplicationSource handles bootstrap directly in its subscribe() method
238// (lines 337-384 in sources/application/mod.rs), so this provider is not actively used
239// by ApplicationSource. The provider exists for testing and potential future integration
240// where bootstrap logic might be delegated to the provider system.