drasi_bootstrap_application/application.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Application bootstrap provider for replaying stored insert events
16
17use anyhow::Result;
18use async_trait::async_trait;
19use drasi_core::models::{Element, SourceChange};
20use log::info;
21use std::sync::Arc;
22use tokio::sync::RwLock;
23
24use drasi_lib::bootstrap::BootstrapRequest;
25use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider};
26
27/// Bootstrap provider for application sources that replays stored insert events
28pub struct ApplicationBootstrapProvider {
29 /// Shared reference to bootstrap data (insert events) for replay
30 /// This should be connected to ApplicationSource's bootstrap_data via shared Arc
31 bootstrap_data: Arc<RwLock<Vec<SourceChange>>>,
32}
33
34impl ApplicationBootstrapProvider {
35 /// Create a new provider with its own isolated bootstrap data storage
36 /// Note: This creates an independent storage that won't be connected to any ApplicationSource
37 pub fn new() -> Self {
38 Self {
39 bootstrap_data: Arc::new(RwLock::new(Vec::new())),
40 }
41 }
42
43 /// Create a new provider with a shared reference to ApplicationSource's bootstrap data
44 /// This allows the provider to access the actual data stored by ApplicationSource
45 pub fn with_shared_data(bootstrap_data: Arc<RwLock<Vec<SourceChange>>>) -> Self {
46 Self { bootstrap_data }
47 }
48
49 /// Create a builder for ApplicationBootstrapProvider
50 pub fn builder() -> ApplicationBootstrapProviderBuilder {
51 ApplicationBootstrapProviderBuilder::new()
52 }
53}
54
55impl Default for ApplicationBootstrapProvider {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61/// Builder for ApplicationBootstrapProvider
62///
63/// # Example
64///
65/// ```no_run
66/// use drasi_bootstrap_application::ApplicationBootstrapProvider;
67///
68/// // Create with isolated storage
69/// let provider = ApplicationBootstrapProvider::builder().build();
70///
71/// // Create with shared storage
72/// use std::sync::Arc;
73/// use tokio::sync::RwLock;
74/// use drasi_core::models::SourceChange;
75///
76/// let shared_data = Arc::new(RwLock::new(Vec::<SourceChange>::new()));
77/// let provider = ApplicationBootstrapProvider::builder()
78/// .with_shared_data(shared_data)
79/// .build();
80/// ```
81pub struct ApplicationBootstrapProviderBuilder {
82 shared_data: Option<Arc<RwLock<Vec<SourceChange>>>>,
83}
84
85impl ApplicationBootstrapProviderBuilder {
86 /// Create a new builder
87 pub fn new() -> Self {
88 Self { shared_data: None }
89 }
90
91 /// Set shared bootstrap data
92 ///
93 /// Use this when you want the provider to share bootstrap data with an ApplicationSource.
94 pub fn with_shared_data(mut self, data: Arc<RwLock<Vec<SourceChange>>>) -> Self {
95 self.shared_data = Some(data);
96 self
97 }
98
99 /// Build the ApplicationBootstrapProvider
100 pub fn build(self) -> ApplicationBootstrapProvider {
101 match self.shared_data {
102 Some(data) => ApplicationBootstrapProvider::with_shared_data(data),
103 None => ApplicationBootstrapProvider::new(),
104 }
105 }
106}
107
108impl Default for ApplicationBootstrapProviderBuilder {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl ApplicationBootstrapProvider {
115 /// Store an insert event for future bootstrap replay
116 /// This would be called by the application source when it receives insert events
117 pub async fn store_insert_event(&self, change: SourceChange) {
118 if matches!(change, SourceChange::Insert { .. }) {
119 self.bootstrap_data.write().await.push(change);
120 }
121 }
122
123 /// Get all stored insert events (for testing or inspection)
124 pub async fn get_stored_events(&self) -> Vec<SourceChange> {
125 self.bootstrap_data.read().await.clone()
126 }
127
128 /// Clear stored events (for testing or reset)
129 pub async fn clear_stored_events(&self) {
130 self.bootstrap_data.write().await.clear();
131 }
132
133 /// Check if a change matches the requested labels
134 fn matches_labels(&self, change: &SourceChange, request: &BootstrapRequest) -> bool {
135 match change {
136 SourceChange::Insert { element } | SourceChange::Update { element, .. } => {
137 match element {
138 Element::Node { metadata, .. } => {
139 request.node_labels.is_empty()
140 || metadata
141 .labels
142 .iter()
143 .any(|l| request.node_labels.contains(&l.to_string()))
144 }
145 Element::Relation { metadata, .. } => {
146 request.relation_labels.is_empty()
147 || metadata
148 .labels
149 .iter()
150 .any(|l| request.relation_labels.contains(&l.to_string()))
151 }
152 }
153 }
154 SourceChange::Delete { metadata } => {
155 request.node_labels.is_empty() && request.relation_labels.is_empty()
156 || metadata.labels.iter().any(|l| {
157 request.node_labels.contains(&l.to_string())
158 || request.relation_labels.contains(&l.to_string())
159 })
160 }
161 SourceChange::Future { .. } => {
162 // Future events are not supported in bootstrap
163 false
164 }
165 }
166 }
167}
168
169#[async_trait]
170impl BootstrapProvider for ApplicationBootstrapProvider {
171 async fn bootstrap(
172 &self,
173 request: BootstrapRequest,
174 _context: &BootstrapContext,
175 _event_tx: drasi_lib::channels::BootstrapEventSender,
176 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
177 ) -> Result<usize> {
178 info!(
179 "ApplicationBootstrapProvider processing bootstrap request for query '{}' with {} node labels and {} relation labels",
180 request.query_id,
181 request.node_labels.len(),
182 request.relation_labels.len()
183 );
184
185 let bootstrap_data = self.bootstrap_data.read().await;
186 let mut count = 0;
187
188 if bootstrap_data.is_empty() {
189 info!(
190 "ApplicationBootstrapProvider: No stored events to replay for query '{}'",
191 request.query_id
192 );
193 return Ok(0);
194 }
195
196 info!(
197 "ApplicationBootstrapProvider: Replaying {} stored events for query '{}'",
198 bootstrap_data.len(),
199 request.query_id
200 );
201
202 for change in bootstrap_data.iter() {
203 // Filter by requested labels
204 if self.matches_labels(change, &request) {
205 // Note: Sequence numbering and profiling metadata are handled by
206 // ApplicationSource.subscribe() which sends bootstrap events through
207 // dedicated channels. This provider only counts matching events.
208 count += 1;
209 }
210 }
211
212 info!(
213 "ApplicationBootstrapProvider sent {} bootstrap events for query '{}'",
214 count, request.query_id
215 );
216 Ok(count)
217 }
218}
219
220// Implementation Note: Bootstrap Data Connection
221//
222// This provider can be connected to ApplicationSource's actual bootstrap data in two ways:
223//
224// 1. Via with_shared_data() constructor:
225// When creating an ApplicationSource, pass the bootstrap_data Arc to the provider:
226// ```rust
227// let (source, handle) = ApplicationSource::new(config, event_tx);
228// let provider = ApplicationBootstrapProvider::with_shared_data(
229// source.get_bootstrap_data()
230// );
231// ```
232//
// 2. Via BootstrapContext properties (a possible approach, not implemented here:
//    bootstrap() currently ignores its context argument):
//    Store the bootstrap_data Arc in the source config properties as a special internal
//    property that can be retrieved by the provider during bootstrap operations.
236//
237// Currently, ApplicationSource handles bootstrap directly in its subscribe() method
238// (lines 337-384 in sources/application/mod.rs), so this provider is not actively used
239// by ApplicationSource. The provider exists for testing and potential future integration
240// where bootstrap logic might be delegated to the provider system.