Skip to main content

drasi_lib/sources/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36
37use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
38use crate::channels::*;
39use crate::context::SourceRuntimeContext;
40use crate::profiling;
41use crate::state_store::StateStoreProvider;
42use drasi_core::models::SourceChange;
43
/// Parameters for creating a SourceBase instance.
///
/// This struct contains only the information that SourceBase needs to function.
/// Plugin-specific configuration should remain in the plugin crate.
///
/// Build a value with `SourceBaseParams::new(id)` and refine it with the
/// `with_*` builder methods. `SourceBase::new` consumes the params and
/// resolves every unset (`None`) option to its default.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
///
/// let params = SourceBaseParams::new("my-source")
///     .with_dispatch_mode(DispatchMode::Channel)
///     .with_dispatch_buffer_capacity(2000)
///     .with_bootstrap_provider(my_provider);
///
/// let base = SourceBase::new(params)?;
/// ```
pub struct SourceBaseParams {
    /// Unique identifier for the source
    pub id: String,
    /// Dispatch mode (Broadcast or Channel) - defaults to Channel.
    /// `None` means "use `DispatchMode::default()`".
    pub dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity - defaults to 1000 when `None`
    pub dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider to set during construction.
    /// Boxed as a trait object so any provider implementation can be supplied.
    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
    /// Whether this source should auto-start - defaults to true
    pub auto_start: bool,
}
73
74impl std::fmt::Debug for SourceBaseParams {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("SourceBaseParams")
77            .field("id", &self.id)
78            .field("dispatch_mode", &self.dispatch_mode)
79            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
80            .field(
81                "bootstrap_provider",
82                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
83            )
84            .field("auto_start", &self.auto_start)
85            .finish()
86    }
87}
88
89impl SourceBaseParams {
90    /// Create new params with just an ID, using defaults for everything else
91    pub fn new(id: impl Into<String>) -> Self {
92        Self {
93            id: id.into(),
94            dispatch_mode: None,
95            dispatch_buffer_capacity: None,
96            bootstrap_provider: None,
97            auto_start: true,
98        }
99    }
100
101    /// Set the dispatch mode
102    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
103        self.dispatch_mode = Some(mode);
104        self
105    }
106
107    /// Set the dispatch buffer capacity
108    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
109        self.dispatch_buffer_capacity = Some(capacity);
110        self
111    }
112
113    /// Set the bootstrap provider
114    ///
115    /// This provider will be used during source subscription to deliver
116    /// initial data to queries that request bootstrap.
117    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
118        self.bootstrap_provider = Some(Box::new(provider));
119        self
120    }
121
122    /// Set whether this source should auto-start
123    ///
124    /// Default is `true`. Set to `false` if this source should only be
125    /// started manually via `start_source()`.
126    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
127        self.auto_start = auto_start;
128        self
129    }
130}
131
/// Base implementation for common source functionality
///
/// All mutable state lives behind `Arc<RwLock<...>>`, so a copy produced by
/// `clone_shared()` observes and mutates the same underlying state.
pub struct SourceBase {
    /// Source identifier
    pub id: String,
    /// Dispatch mode setting
    dispatch_mode: DispatchMode,
    /// Dispatch buffer capacity
    dispatch_buffer_capacity: usize,
    /// Whether this source should auto-start
    pub auto_start: bool,
    /// Current component status (initialised to `ComponentStatus::Stopped`)
    pub status: Arc<RwLock<ComponentStatus>>,
    /// Dispatchers for sending source events to subscribers
    ///
    /// This is a vector of dispatchers that send source events to all registered
    /// subscribers (queries). When a source produces a change event, it broadcasts
    /// it to all dispatchers in this vector.
    ///
    /// In Broadcast mode the vector holds the single dispatcher created at
    /// construction; in Channel mode one dispatcher is appended per call to
    /// `create_streaming_receiver()`.
    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    /// Runtime context (set by initialize())
    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
    /// Channel for sending component status events (extracted from context for convenience)
    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
    /// State store provider (extracted from context for convenience)
    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
    /// Handle to the source's main task (taken and awaited by `stop_common()`)
    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Sender for shutdown signal (taken and fired by `stop_common()`)
    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
    /// Optional bootstrap provider - plugins set this if they support bootstrap
    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
}
163
164impl SourceBase {
165    /// Create a new SourceBase with the given parameters
166    ///
167    /// The status channel is not required during construction - it will be
168    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
169    ///
170    /// If a bootstrap provider is specified in params, it will be set during
171    /// construction (no async needed since nothing is shared yet).
172    pub fn new(params: SourceBaseParams) -> Result<Self> {
173        // Determine dispatch mode (default to Channel if not specified)
174        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
175        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
176
177        // Set up initial dispatchers based on dispatch mode
178        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
179            Vec::new();
180
181        if dispatch_mode == DispatchMode::Broadcast {
182            // For broadcast mode, create a single broadcast dispatcher
183            let dispatcher =
184                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
185            dispatchers.push(Box::new(dispatcher));
186        }
187        // For channel mode, dispatchers will be created on-demand when subscribing
188
189        // Initialize bootstrap provider if provided (no async needed at construction time)
190        let bootstrap_provider = params
191            .bootstrap_provider
192            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
193
194        Ok(Self {
195            id: params.id,
196            dispatch_mode,
197            dispatch_buffer_capacity,
198            auto_start: params.auto_start,
199            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
200            dispatchers: Arc::new(RwLock::new(dispatchers)),
201            context: Arc::new(RwLock::new(None)), // Set by initialize()
202            status_tx: Arc::new(RwLock::new(None)), // Extracted from context
203            state_store: Arc::new(RwLock::new(None)), // Extracted from context
204            task_handle: Arc::new(RwLock::new(None)),
205            shutdown_tx: Arc::new(RwLock::new(None)),
206            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
207        })
208    }
209
210    /// Get whether this source should auto-start
211    pub fn get_auto_start(&self) -> bool {
212        self.auto_start
213    }
214
215    /// Initialize the source with runtime context.
216    ///
217    /// This method is called automatically by DrasiLib's `add_source()` method.
218    /// Plugin developers do not need to call this directly.
219    ///
220    /// The context provides access to:
221    /// - `source_id`: The source's unique identifier
222    /// - `status_tx`: Channel for reporting component status events
223    /// - `state_store`: Optional persistent state storage
224    pub async fn initialize(&self, context: SourceRuntimeContext) {
225        // Store context for later use
226        *self.context.write().await = Some(context.clone());
227
228        // Extract services for convenience
229        *self.status_tx.write().await = Some(context.status_tx.clone());
230
231        if let Some(state_store) = context.state_store.as_ref() {
232            *self.state_store.write().await = Some(state_store.clone());
233        }
234    }
235
236    /// Get the runtime context if initialized.
237    ///
238    /// Returns `None` if `initialize()` has not been called yet.
239    pub async fn context(&self) -> Option<SourceRuntimeContext> {
240        self.context.read().await.clone()
241    }
242
243    /// Get the state store if configured.
244    ///
245    /// Returns `None` if no state store was provided in the context.
246    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
247        self.state_store.read().await.clone()
248    }
249
250    /// Get the status channel Arc for internal use by spawned tasks
251    ///
252    /// This returns the internal status_tx wrapped in Arc<RwLock<Option<...>>>
253    /// which allows background tasks to send component status events.
254    ///
255    /// Returns a clone of the Arc that can be moved into spawned tasks.
256    pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>> {
257        self.status_tx.clone()
258    }
259
260    /// Clone the SourceBase with shared Arc references
261    ///
262    /// This creates a new SourceBase that shares the same underlying
263    /// data through Arc references. Useful for passing to spawned tasks.
264    pub fn clone_shared(&self) -> Self {
265        Self {
266            id: self.id.clone(),
267            dispatch_mode: self.dispatch_mode,
268            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
269            auto_start: self.auto_start,
270            status: self.status.clone(),
271            dispatchers: self.dispatchers.clone(),
272            context: self.context.clone(),
273            status_tx: self.status_tx.clone(),
274            state_store: self.state_store.clone(),
275            task_handle: self.task_handle.clone(),
276            shutdown_tx: self.shutdown_tx.clone(),
277            bootstrap_provider: self.bootstrap_provider.clone(),
278        }
279    }
280
281    /// Set the bootstrap provider for this source, taking ownership.
282    ///
283    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
284    /// The bootstrap provider is created by the plugin using its own configuration.
285    ///
286    /// # Example
287    /// ```ignore
288    /// let provider = MyBootstrapProvider::new(config);
289    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
290    /// ```
291    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
292        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
293    }
294
295    /// Get the source ID
296    pub fn get_id(&self) -> &str {
297        &self.id
298    }
299
300    /// Create a streaming receiver for a query subscription
301    ///
302    /// This creates the appropriate receiver based on the configured dispatch mode:
303    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
304    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
305    ///
306    /// This is a helper method that can be used by sources with custom subscribe logic.
307    pub async fn create_streaming_receiver(
308        &self,
309    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
310        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
311            DispatchMode::Broadcast => {
312                // For broadcast mode, use the single dispatcher
313                let dispatchers = self.dispatchers.read().await;
314                if let Some(dispatcher) = dispatchers.first() {
315                    dispatcher.create_receiver().await?
316                } else {
317                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
318                }
319            }
320            DispatchMode::Channel => {
321                // For channel mode, create a new dispatcher for this subscription
322                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
323                    self.dispatch_buffer_capacity,
324                );
325                let receiver = dispatcher.create_receiver().await?;
326
327                // Add the new dispatcher to our list
328                let mut dispatchers = self.dispatchers.write().await;
329                dispatchers.push(Box::new(dispatcher));
330
331                receiver
332            }
333        };
334
335        Ok(receiver)
336    }
337
338    /// Subscribe to this source with optional bootstrap
339    ///
340    /// This is the standard subscribe implementation that all sources can use.
341    /// It handles:
342    /// - Creating a receiver for streaming events (based on dispatch mode)
343    /// - Setting up bootstrap if requested and a provider has been set
344    /// - Returning the appropriate SubscriptionResponse
345    pub async fn subscribe_with_bootstrap(
346        &self,
347        settings: &crate::config::SourceSubscriptionSettings,
348        source_type: &str,
349    ) -> Result<SubscriptionResponse> {
350        info!(
351            "Query '{}' subscribing to {} source '{}' (bootstrap: {})",
352            settings.query_id, source_type, self.id, settings.enable_bootstrap
353        );
354
355        // Create streaming receiver using helper method
356        let receiver = self.create_streaming_receiver().await?;
357
358        let query_id_for_response = settings.query_id.clone();
359
360        // Handle bootstrap if requested and bootstrap provider is configured
361        let bootstrap_receiver = if settings.enable_bootstrap {
362            self.handle_bootstrap_subscription(settings, source_type)
363                .await?
364        } else {
365            None
366        };
367
368        Ok(SubscriptionResponse {
369            query_id: query_id_for_response,
370            source_id: self.id.clone(),
371            receiver,
372            bootstrap_receiver,
373        })
374    }
375
376    /// Handle bootstrap subscription logic
377    async fn handle_bootstrap_subscription(
378        &self,
379        settings: &crate::config::SourceSubscriptionSettings,
380        source_type: &str,
381    ) -> Result<Option<BootstrapEventReceiver>> {
382        let provider_guard = self.bootstrap_provider.read().await;
383        if let Some(provider) = provider_guard.clone() {
384            drop(provider_guard); // Release lock before spawning task
385
386            info!(
387                "Creating bootstrap for query '{}' on {} source '{}'",
388                settings.query_id, source_type, self.id
389            );
390
391            // Create bootstrap context
392            let context = BootstrapContext::new_minimal(
393                self.id.clone(), // server_id
394                self.id.clone(), // source_id
395            );
396
397            // Create bootstrap channel
398            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
399
400            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
401            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
402            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
403
404            // Create bootstrap request with request_id
405            let request = BootstrapRequest {
406                query_id: settings.query_id.clone(),
407                node_labels,
408                relation_labels,
409                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
410            };
411
412            // Clone settings for the async task
413            let settings_clone = settings.clone();
414
415            // Spawn bootstrap task
416            tokio::spawn(async move {
417                match provider
418                    .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
419                    .await
420                {
421                    Ok(count) => {
422                        info!(
423                            "Bootstrap completed successfully for query '{}', sent {count} events",
424                            settings_clone.query_id
425                        );
426                    }
427                    Err(e) => {
428                        error!(
429                            "Bootstrap failed for query '{}': {e}",
430                            settings_clone.query_id
431                        );
432                    }
433                }
434            });
435
436            Ok(Some(bootstrap_rx))
437        } else {
438            info!(
439                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
440                settings.query_id, source_type, self.id
441            );
442            Ok(None)
443        }
444    }
445
446    /// Dispatch a SourceChange event with profiling metadata
447    ///
448    /// This method handles the common pattern of:
449    /// - Creating profiling metadata with timestamp
450    /// - Wrapping the change in a SourceEventWrapper
451    /// - Dispatching to all subscribers
452    /// - Handling the no-subscriber case gracefully
453    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
454        // Create profiling metadata
455        let mut profiling = profiling::ProfilingMetadata::new();
456        profiling.source_send_ns = Some(profiling::timestamp_ns());
457
458        // Create event wrapper
459        let wrapper = SourceEventWrapper::with_profiling(
460            self.id.clone(),
461            SourceEvent::Change(change),
462            chrono::Utc::now(),
463            profiling,
464        );
465
466        // Dispatch event
467        self.dispatch_event(wrapper).await
468    }
469
470    /// Dispatch a SourceEventWrapper to all subscribers
471    ///
472    /// This is a generic method for dispatching any SourceEvent.
473    /// It handles Arc-wrapping for zero-copy sharing and logs
474    /// when there are no subscribers.
475    pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
476        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
477
478        // Arc-wrap for zero-copy sharing across dispatchers
479        let arc_wrapper = Arc::new(wrapper);
480
481        // Send to all dispatchers
482        let dispatchers = self.dispatchers.read().await;
483        for dispatcher in dispatchers.iter() {
484            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
485                debug!("[{}] Failed to dispatch event: {}", self.id, e);
486            }
487        }
488
489        Ok(())
490    }
491
492    /// Broadcast SourceControl events
493    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
494        let wrapper = SourceEventWrapper::new(
495            self.id.clone(),
496            SourceEvent::Control(control),
497            chrono::Utc::now(),
498        );
499        self.dispatch_event(wrapper).await
500    }
501
502    /// Create a test subscription to this source (synchronous wrapper)
503    ///
504    /// This method is intended for use in tests to receive events from the source.
505    /// It properly handles both Broadcast and Channel dispatch modes by delegating
506    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
507    ///
508    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
509    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
510    ///
511    /// # Returns
512    /// A receiver that will receive all events dispatched by this source
513    ///
514    /// # Panics
515    /// Panics if the receiver cannot be created (e.g., internal error)
516    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
517        // Use block_in_place to avoid nested executor issues in async tests
518        tokio::task::block_in_place(|| {
519            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
520        })
521        .expect("Failed to create test subscription receiver")
522    }
523
524    /// Helper function to dispatch events from spawned tasks
525    ///
526    /// This is a static helper that can be used from spawned async tasks that don't
527    /// have access to `self`. It manually iterates through dispatchers and sends the event.
528    ///
529    /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
530    ///
531    /// # Arguments
532    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
533    /// * `wrapper` - The event wrapper to dispatch
534    /// * `source_id` - Source ID for logging
535    pub async fn dispatch_from_task(
536        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
537        wrapper: SourceEventWrapper,
538        source_id: &str,
539    ) -> Result<()> {
540        debug!(
541            "[{}] Dispatching event from task: {:?}",
542            source_id, &wrapper
543        );
544
545        // Arc-wrap for zero-copy sharing across dispatchers
546        let arc_wrapper = Arc::new(wrapper);
547
548        // Send to all dispatchers
549        let dispatchers_guard = dispatchers.read().await;
550        for dispatcher in dispatchers_guard.iter() {
551            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
552                debug!("[{source_id}] Failed to dispatch event from task: {e}");
553            }
554        }
555
556        Ok(())
557    }
558
559    /// Handle common stop functionality
560    pub async fn stop_common(&self) -> Result<()> {
561        info!("Stopping source '{}'", self.id);
562
563        // Send shutdown signal if we have one
564        if let Some(tx) = self.shutdown_tx.write().await.take() {
565            let _ = tx.send(());
566        }
567
568        // Wait for task to complete
569        if let Some(handle) = self.task_handle.write().await.take() {
570            match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
571                Ok(Ok(())) => {
572                    info!("Source '{}' task completed successfully", self.id);
573                }
574                Ok(Err(e)) => {
575                    error!("Source '{}' task panicked: {}", self.id, e);
576                }
577                Err(_) => {
578                    error!("Source '{}' task did not complete within timeout", self.id);
579                }
580            }
581        }
582
583        *self.status.write().await = ComponentStatus::Stopped;
584        info!("Source '{}' stopped", self.id);
585        Ok(())
586    }
587
588    /// Get the current status
589    pub async fn get_status(&self) -> ComponentStatus {
590        self.status.read().await.clone()
591    }
592
593    /// Set the current status
594    pub async fn set_status(&self, status: ComponentStatus) {
595        *self.status.write().await = status;
596    }
597
598    /// Transition to a new status and send event
599    pub async fn set_status_with_event(
600        &self,
601        status: ComponentStatus,
602        message: Option<String>,
603    ) -> Result<()> {
604        *self.status.write().await = status.clone();
605        self.send_component_event(status, message).await
606    }
607
608    /// Set the task handle
609    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
610        *self.task_handle.write().await = Some(handle);
611    }
612
613    /// Set the shutdown sender
614    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
615        *self.shutdown_tx.write().await = Some(tx);
616    }
617
618    /// Send a component event
619    ///
620    /// If the status channel has not been initialized yet, this method silently
621    /// succeeds without sending anything. This allows sources to be used
622    /// in a standalone fashion without DrasiLib if needed.
623    pub async fn send_component_event(
624        &self,
625        status: ComponentStatus,
626        message: Option<String>,
627    ) -> Result<()> {
628        let event = ComponentEvent {
629            component_id: self.id.clone(),
630            component_type: ComponentType::Source,
631            status,
632            timestamp: chrono::Utc::now(),
633            message,
634        };
635
636        if let Some(ref tx) = *self.status_tx.read().await {
637            if let Err(e) = tx.send(event).await {
638                error!("Failed to send component event: {e}");
639            }
640        }
641        // If status_tx is None, silently skip - initialization happens before start()
642        Ok(())
643    }
644}