drasi_lib/sources/base.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36
37use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
38use crate::channels::*;
39use crate::context::SourceRuntimeContext;
40use crate::profiling;
41use crate::state_store::StateStoreProvider;
42use drasi_core::models::SourceChange;
43
44/// Parameters for creating a SourceBase instance.
45///
46/// This struct contains only the information that SourceBase needs to function.
47/// Plugin-specific configuration should remain in the plugin crate.
48///
49/// # Example
50///
51/// ```ignore
52/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
53///
54/// let params = SourceBaseParams::new("my-source")
55/// .with_dispatch_mode(DispatchMode::Channel)
56/// .with_dispatch_buffer_capacity(2000)
57/// .with_bootstrap_provider(my_provider);
58///
59/// let base = SourceBase::new(params)?;
60/// ```
61pub struct SourceBaseParams {
62 /// Unique identifier for the source
63 pub id: String,
64 /// Dispatch mode (Broadcast or Channel) - defaults to Channel
65 pub dispatch_mode: Option<DispatchMode>,
66 /// Dispatch buffer capacity - defaults to 1000
67 pub dispatch_buffer_capacity: Option<usize>,
68 /// Optional bootstrap provider to set during construction
69 pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
70 /// Whether this source should auto-start - defaults to true
71 pub auto_start: bool,
72}
73
74impl std::fmt::Debug for SourceBaseParams {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("SourceBaseParams")
77 .field("id", &self.id)
78 .field("dispatch_mode", &self.dispatch_mode)
79 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
80 .field(
81 "bootstrap_provider",
82 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
83 )
84 .field("auto_start", &self.auto_start)
85 .finish()
86 }
87}
88
89impl SourceBaseParams {
90 /// Create new params with just an ID, using defaults for everything else
91 pub fn new(id: impl Into<String>) -> Self {
92 Self {
93 id: id.into(),
94 dispatch_mode: None,
95 dispatch_buffer_capacity: None,
96 bootstrap_provider: None,
97 auto_start: true,
98 }
99 }
100
101 /// Set the dispatch mode
102 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
103 self.dispatch_mode = Some(mode);
104 self
105 }
106
107 /// Set the dispatch buffer capacity
108 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
109 self.dispatch_buffer_capacity = Some(capacity);
110 self
111 }
112
113 /// Set the bootstrap provider
114 ///
115 /// This provider will be used during source subscription to deliver
116 /// initial data to queries that request bootstrap.
117 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
118 self.bootstrap_provider = Some(Box::new(provider));
119 self
120 }
121
122 /// Set whether this source should auto-start
123 ///
124 /// Default is `true`. Set to `false` if this source should only be
125 /// started manually via `start_source()`.
126 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
127 self.auto_start = auto_start;
128 self
129 }
130}
131
132/// Base implementation for common source functionality
133pub struct SourceBase {
134 /// Source identifier
135 pub id: String,
136 /// Dispatch mode setting
137 dispatch_mode: DispatchMode,
138 /// Dispatch buffer capacity
139 dispatch_buffer_capacity: usize,
140 /// Whether this source should auto-start
141 pub auto_start: bool,
142 /// Current component status
143 pub status: Arc<RwLock<ComponentStatus>>,
144 /// Dispatchers for sending source events to subscribers
145 ///
146 /// This is a vector of dispatchers that send source events to all registered
147 /// subscribers (queries). When a source produces a change event, it broadcasts
148 /// it to all dispatchers in this vector.
149 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
150 /// Runtime context (set by initialize())
151 context: Arc<RwLock<Option<SourceRuntimeContext>>>,
152 /// Channel for sending component status events (extracted from context for convenience)
153 status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
154 /// State store provider (extracted from context for convenience)
155 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
156 /// Handle to the source's main task
157 pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
158 /// Sender for shutdown signal
159 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
160 /// Optional bootstrap provider - plugins set this if they support bootstrap
161 bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
162}
163
164impl SourceBase {
165 /// Create a new SourceBase with the given parameters
166 ///
167 /// The status channel is not required during construction - it will be
168 /// provided via the `SourceRuntimeContext` when `initialize()` is called.
169 ///
170 /// If a bootstrap provider is specified in params, it will be set during
171 /// construction (no async needed since nothing is shared yet).
172 pub fn new(params: SourceBaseParams) -> Result<Self> {
173 // Determine dispatch mode (default to Channel if not specified)
174 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
175 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
176
177 // Set up initial dispatchers based on dispatch mode
178 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
179 Vec::new();
180
181 if dispatch_mode == DispatchMode::Broadcast {
182 // For broadcast mode, create a single broadcast dispatcher
183 let dispatcher =
184 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
185 dispatchers.push(Box::new(dispatcher));
186 }
187 // For channel mode, dispatchers will be created on-demand when subscribing
188
189 // Initialize bootstrap provider if provided (no async needed at construction time)
190 let bootstrap_provider = params
191 .bootstrap_provider
192 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
193
194 Ok(Self {
195 id: params.id,
196 dispatch_mode,
197 dispatch_buffer_capacity,
198 auto_start: params.auto_start,
199 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
200 dispatchers: Arc::new(RwLock::new(dispatchers)),
201 context: Arc::new(RwLock::new(None)), // Set by initialize()
202 status_tx: Arc::new(RwLock::new(None)), // Extracted from context
203 state_store: Arc::new(RwLock::new(None)), // Extracted from context
204 task_handle: Arc::new(RwLock::new(None)),
205 shutdown_tx: Arc::new(RwLock::new(None)),
206 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
207 })
208 }
209
210 /// Get whether this source should auto-start
211 pub fn get_auto_start(&self) -> bool {
212 self.auto_start
213 }
214
215 /// Initialize the source with runtime context.
216 ///
217 /// This method is called automatically by DrasiLib's `add_source()` method.
218 /// Plugin developers do not need to call this directly.
219 ///
220 /// The context provides access to:
221 /// - `source_id`: The source's unique identifier
222 /// - `status_tx`: Channel for reporting component status events
223 /// - `state_store`: Optional persistent state storage
224 pub async fn initialize(&self, context: SourceRuntimeContext) {
225 // Store context for later use
226 *self.context.write().await = Some(context.clone());
227
228 // Extract services for convenience
229 *self.status_tx.write().await = Some(context.status_tx.clone());
230
231 if let Some(state_store) = context.state_store.as_ref() {
232 *self.state_store.write().await = Some(state_store.clone());
233 }
234 }
235
236 /// Get the runtime context if initialized.
237 ///
238 /// Returns `None` if `initialize()` has not been called yet.
239 pub async fn context(&self) -> Option<SourceRuntimeContext> {
240 self.context.read().await.clone()
241 }
242
243 /// Get the state store if configured.
244 ///
245 /// Returns `None` if no state store was provided in the context.
246 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
247 self.state_store.read().await.clone()
248 }
249
250 /// Get the status channel Arc for internal use by spawned tasks
251 ///
252 /// This returns the internal status_tx wrapped in Arc<RwLock<Option<...>>>
253 /// which allows background tasks to send component status events.
254 ///
255 /// Returns a clone of the Arc that can be moved into spawned tasks.
256 pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>> {
257 self.status_tx.clone()
258 }
259
260 /// Clone the SourceBase with shared Arc references
261 ///
262 /// This creates a new SourceBase that shares the same underlying
263 /// data through Arc references. Useful for passing to spawned tasks.
264 pub fn clone_shared(&self) -> Self {
265 Self {
266 id: self.id.clone(),
267 dispatch_mode: self.dispatch_mode,
268 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
269 auto_start: self.auto_start,
270 status: self.status.clone(),
271 dispatchers: self.dispatchers.clone(),
272 context: self.context.clone(),
273 status_tx: self.status_tx.clone(),
274 state_store: self.state_store.clone(),
275 task_handle: self.task_handle.clone(),
276 shutdown_tx: self.shutdown_tx.clone(),
277 bootstrap_provider: self.bootstrap_provider.clone(),
278 }
279 }
280
281 /// Set the bootstrap provider for this source, taking ownership.
282 ///
283 /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
284 /// The bootstrap provider is created by the plugin using its own configuration.
285 ///
286 /// # Example
287 /// ```ignore
288 /// let provider = MyBootstrapProvider::new(config);
289 /// source_base.set_bootstrap_provider(provider).await; // Ownership transferred
290 /// ```
291 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
292 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
293 }
294
295 /// Get the source ID
296 pub fn get_id(&self) -> &str {
297 &self.id
298 }
299
300 /// Create a streaming receiver for a query subscription
301 ///
302 /// This creates the appropriate receiver based on the configured dispatch mode:
303 /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
304 /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
305 ///
306 /// This is a helper method that can be used by sources with custom subscribe logic.
307 pub async fn create_streaming_receiver(
308 &self,
309 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
310 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
311 DispatchMode::Broadcast => {
312 // For broadcast mode, use the single dispatcher
313 let dispatchers = self.dispatchers.read().await;
314 if let Some(dispatcher) = dispatchers.first() {
315 dispatcher.create_receiver().await?
316 } else {
317 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
318 }
319 }
320 DispatchMode::Channel => {
321 // For channel mode, create a new dispatcher for this subscription
322 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
323 self.dispatch_buffer_capacity,
324 );
325 let receiver = dispatcher.create_receiver().await?;
326
327 // Add the new dispatcher to our list
328 let mut dispatchers = self.dispatchers.write().await;
329 dispatchers.push(Box::new(dispatcher));
330
331 receiver
332 }
333 };
334
335 Ok(receiver)
336 }
337
338 /// Subscribe to this source with optional bootstrap
339 ///
340 /// This is the standard subscribe implementation that all sources can use.
341 /// It handles:
342 /// - Creating a receiver for streaming events (based on dispatch mode)
343 /// - Setting up bootstrap if requested and a provider has been set
344 /// - Returning the appropriate SubscriptionResponse
345 pub async fn subscribe_with_bootstrap(
346 &self,
347 settings: &crate::config::SourceSubscriptionSettings,
348 source_type: &str,
349 ) -> Result<SubscriptionResponse> {
350 info!(
351 "Query '{}' subscribing to {} source '{}' (bootstrap: {})",
352 settings.query_id, source_type, self.id, settings.enable_bootstrap
353 );
354
355 // Create streaming receiver using helper method
356 let receiver = self.create_streaming_receiver().await?;
357
358 let query_id_for_response = settings.query_id.clone();
359
360 // Handle bootstrap if requested and bootstrap provider is configured
361 let bootstrap_receiver = if settings.enable_bootstrap {
362 self.handle_bootstrap_subscription(settings, source_type)
363 .await?
364 } else {
365 None
366 };
367
368 Ok(SubscriptionResponse {
369 query_id: query_id_for_response,
370 source_id: self.id.clone(),
371 receiver,
372 bootstrap_receiver,
373 })
374 }
375
376 /// Handle bootstrap subscription logic
377 async fn handle_bootstrap_subscription(
378 &self,
379 settings: &crate::config::SourceSubscriptionSettings,
380 source_type: &str,
381 ) -> Result<Option<BootstrapEventReceiver>> {
382 let provider_guard = self.bootstrap_provider.read().await;
383 if let Some(provider) = provider_guard.clone() {
384 drop(provider_guard); // Release lock before spawning task
385
386 info!(
387 "Creating bootstrap for query '{}' on {} source '{}'",
388 settings.query_id, source_type, self.id
389 );
390
391 // Create bootstrap context
392 let context = BootstrapContext::new_minimal(
393 self.id.clone(), // server_id
394 self.id.clone(), // source_id
395 );
396
397 // Create bootstrap channel
398 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
399
400 // Convert HashSet to Vec for backward compatibility with BootstrapRequest
401 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
402 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
403
404 // Create bootstrap request with request_id
405 let request = BootstrapRequest {
406 query_id: settings.query_id.clone(),
407 node_labels,
408 relation_labels,
409 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
410 };
411
412 // Clone settings for the async task
413 let settings_clone = settings.clone();
414
415 // Spawn bootstrap task
416 tokio::spawn(async move {
417 match provider
418 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
419 .await
420 {
421 Ok(count) => {
422 info!(
423 "Bootstrap completed successfully for query '{}', sent {count} events",
424 settings_clone.query_id
425 );
426 }
427 Err(e) => {
428 error!(
429 "Bootstrap failed for query '{}': {e}",
430 settings_clone.query_id
431 );
432 }
433 }
434 });
435
436 Ok(Some(bootstrap_rx))
437 } else {
438 info!(
439 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
440 settings.query_id, source_type, self.id
441 );
442 Ok(None)
443 }
444 }
445
446 /// Dispatch a SourceChange event with profiling metadata
447 ///
448 /// This method handles the common pattern of:
449 /// - Creating profiling metadata with timestamp
450 /// - Wrapping the change in a SourceEventWrapper
451 /// - Dispatching to all subscribers
452 /// - Handling the no-subscriber case gracefully
453 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
454 // Create profiling metadata
455 let mut profiling = profiling::ProfilingMetadata::new();
456 profiling.source_send_ns = Some(profiling::timestamp_ns());
457
458 // Create event wrapper
459 let wrapper = SourceEventWrapper::with_profiling(
460 self.id.clone(),
461 SourceEvent::Change(change),
462 chrono::Utc::now(),
463 profiling,
464 );
465
466 // Dispatch event
467 self.dispatch_event(wrapper).await
468 }
469
470 /// Dispatch a SourceEventWrapper to all subscribers
471 ///
472 /// This is a generic method for dispatching any SourceEvent.
473 /// It handles Arc-wrapping for zero-copy sharing and logs
474 /// when there are no subscribers.
475 pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
476 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
477
478 // Arc-wrap for zero-copy sharing across dispatchers
479 let arc_wrapper = Arc::new(wrapper);
480
481 // Send to all dispatchers
482 let dispatchers = self.dispatchers.read().await;
483 for dispatcher in dispatchers.iter() {
484 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
485 debug!("[{}] Failed to dispatch event: {}", self.id, e);
486 }
487 }
488
489 Ok(())
490 }
491
492 /// Broadcast SourceControl events
493 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
494 let wrapper = SourceEventWrapper::new(
495 self.id.clone(),
496 SourceEvent::Control(control),
497 chrono::Utc::now(),
498 );
499 self.dispatch_event(wrapper).await
500 }
501
502 /// Create a test subscription to this source (synchronous wrapper)
503 ///
504 /// This method is intended for use in tests to receive events from the source.
505 /// It properly handles both Broadcast and Channel dispatch modes by delegating
506 /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
507 ///
508 /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
509 /// For async contexts, prefer calling `create_streaming_receiver()` directly.
510 ///
511 /// # Returns
512 /// A receiver that will receive all events dispatched by this source
513 ///
514 /// # Panics
515 /// Panics if the receiver cannot be created (e.g., internal error)
516 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
517 // Use block_in_place to avoid nested executor issues in async tests
518 tokio::task::block_in_place(|| {
519 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
520 })
521 .expect("Failed to create test subscription receiver")
522 }
523
524 /// Helper function to dispatch events from spawned tasks
525 ///
526 /// This is a static helper that can be used from spawned async tasks that don't
527 /// have access to `self`. It manually iterates through dispatchers and sends the event.
528 ///
529 /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
530 ///
531 /// # Arguments
532 /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
533 /// * `wrapper` - The event wrapper to dispatch
534 /// * `source_id` - Source ID for logging
535 pub async fn dispatch_from_task(
536 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
537 wrapper: SourceEventWrapper,
538 source_id: &str,
539 ) -> Result<()> {
540 debug!(
541 "[{}] Dispatching event from task: {:?}",
542 source_id, &wrapper
543 );
544
545 // Arc-wrap for zero-copy sharing across dispatchers
546 let arc_wrapper = Arc::new(wrapper);
547
548 // Send to all dispatchers
549 let dispatchers_guard = dispatchers.read().await;
550 for dispatcher in dispatchers_guard.iter() {
551 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
552 debug!("[{source_id}] Failed to dispatch event from task: {e}");
553 }
554 }
555
556 Ok(())
557 }
558
559 /// Handle common stop functionality
560 pub async fn stop_common(&self) -> Result<()> {
561 info!("Stopping source '{}'", self.id);
562
563 // Send shutdown signal if we have one
564 if let Some(tx) = self.shutdown_tx.write().await.take() {
565 let _ = tx.send(());
566 }
567
568 // Wait for task to complete
569 if let Some(handle) = self.task_handle.write().await.take() {
570 match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
571 Ok(Ok(())) => {
572 info!("Source '{}' task completed successfully", self.id);
573 }
574 Ok(Err(e)) => {
575 error!("Source '{}' task panicked: {}", self.id, e);
576 }
577 Err(_) => {
578 error!("Source '{}' task did not complete within timeout", self.id);
579 }
580 }
581 }
582
583 *self.status.write().await = ComponentStatus::Stopped;
584 info!("Source '{}' stopped", self.id);
585 Ok(())
586 }
587
588 /// Get the current status
589 pub async fn get_status(&self) -> ComponentStatus {
590 self.status.read().await.clone()
591 }
592
593 /// Set the current status
594 pub async fn set_status(&self, status: ComponentStatus) {
595 *self.status.write().await = status;
596 }
597
598 /// Transition to a new status and send event
599 pub async fn set_status_with_event(
600 &self,
601 status: ComponentStatus,
602 message: Option<String>,
603 ) -> Result<()> {
604 *self.status.write().await = status.clone();
605 self.send_component_event(status, message).await
606 }
607
608 /// Set the task handle
609 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
610 *self.task_handle.write().await = Some(handle);
611 }
612
613 /// Set the shutdown sender
614 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
615 *self.shutdown_tx.write().await = Some(tx);
616 }
617
618 /// Send a component event
619 ///
620 /// If the status channel has not been initialized yet, this method silently
621 /// succeeds without sending anything. This allows sources to be used
622 /// in a standalone fashion without DrasiLib if needed.
623 pub async fn send_component_event(
624 &self,
625 status: ComponentStatus,
626 message: Option<String>,
627 ) -> Result<()> {
628 let event = ComponentEvent {
629 component_id: self.id.clone(),
630 component_type: ComponentType::Source,
631 status,
632 timestamp: chrono::Utc::now(),
633 message,
634 };
635
636 if let Some(ref tx) = *self.status_tx.read().await {
637 if let Err(e) = tx.send(event).await {
638 error!("Failed to send component event: {e}");
639 }
640 }
641 // If status_tx is None, silently skip - initialization happens before start()
642 Ok(())
643 }
644}