Skip to main content

drasi_lib/sources/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
39use crate::channels::*;
40use crate::context::SourceRuntimeContext;
41use crate::profiling;
42use crate::state_store::StateStoreProvider;
43use drasi_core::models::SourceChange;
44
45/// Parameters for creating a SourceBase instance.
46///
47/// This struct contains only the information that SourceBase needs to function.
48/// Plugin-specific configuration should remain in the plugin crate.
49///
50/// # Example
51///
52/// ```ignore
53/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
54///
55/// let params = SourceBaseParams::new("my-source")
56///     .with_dispatch_mode(DispatchMode::Channel)
57///     .with_dispatch_buffer_capacity(2000)
58///     .with_bootstrap_provider(my_provider);
59///
60/// let base = SourceBase::new(params)?;
61/// ```
62pub struct SourceBaseParams {
63    /// Unique identifier for the source
64    pub id: String,
65    /// Dispatch mode (Broadcast or Channel) - defaults to Channel
66    pub dispatch_mode: Option<DispatchMode>,
67    /// Dispatch buffer capacity - defaults to 1000
68    pub dispatch_buffer_capacity: Option<usize>,
69    /// Optional bootstrap provider to set during construction
70    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
71    /// Whether this source should auto-start - defaults to true
72    pub auto_start: bool,
73}
74
75impl std::fmt::Debug for SourceBaseParams {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("SourceBaseParams")
78            .field("id", &self.id)
79            .field("dispatch_mode", &self.dispatch_mode)
80            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
81            .field(
82                "bootstrap_provider",
83                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
84            )
85            .field("auto_start", &self.auto_start)
86            .finish()
87    }
88}
89
90impl SourceBaseParams {
91    /// Create new params with just an ID, using defaults for everything else
92    pub fn new(id: impl Into<String>) -> Self {
93        Self {
94            id: id.into(),
95            dispatch_mode: None,
96            dispatch_buffer_capacity: None,
97            bootstrap_provider: None,
98            auto_start: true,
99        }
100    }
101
102    /// Set the dispatch mode
103    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
104        self.dispatch_mode = Some(mode);
105        self
106    }
107
108    /// Set the dispatch buffer capacity
109    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
110        self.dispatch_buffer_capacity = Some(capacity);
111        self
112    }
113
114    /// Set the bootstrap provider
115    ///
116    /// This provider will be used during source subscription to deliver
117    /// initial data to queries that request bootstrap.
118    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
119        self.bootstrap_provider = Some(Box::new(provider));
120        self
121    }
122
123    /// Set whether this source should auto-start
124    ///
125    /// Default is `true`. Set to `false` if this source should only be
126    /// started manually via `start_source()`.
127    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
128        self.auto_start = auto_start;
129        self
130    }
131}
132
133/// Base implementation for common source functionality
134pub struct SourceBase {
135    /// Source identifier
136    pub id: String,
137    /// Dispatch mode setting
138    dispatch_mode: DispatchMode,
139    /// Dispatch buffer capacity
140    dispatch_buffer_capacity: usize,
141    /// Whether this source should auto-start
142    pub auto_start: bool,
143    /// Current component status
144    pub status: Arc<RwLock<ComponentStatus>>,
145    /// Dispatchers for sending source events to subscribers
146    ///
147    /// This is a vector of dispatchers that send source events to all registered
148    /// subscribers (queries). When a source produces a change event, it broadcasts
149    /// it to all dispatchers in this vector.
150    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
151    /// Runtime context (set by initialize())
152    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
153    /// Channel for sending component status events (extracted from context for convenience)
154    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
155    /// State store provider (extracted from context for convenience)
156    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
157    /// Handle to the source's main task
158    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
159    /// Sender for shutdown signal
160    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
161    /// Optional bootstrap provider - plugins set this if they support bootstrap
162    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
163}
164
165impl SourceBase {
166    /// Create a new SourceBase with the given parameters
167    ///
168    /// The status channel is not required during construction - it will be
169    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
170    ///
171    /// If a bootstrap provider is specified in params, it will be set during
172    /// construction (no async needed since nothing is shared yet).
173    pub fn new(params: SourceBaseParams) -> Result<Self> {
174        // Determine dispatch mode (default to Channel if not specified)
175        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
176        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
177
178        // Set up initial dispatchers based on dispatch mode
179        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
180            Vec::new();
181
182        if dispatch_mode == DispatchMode::Broadcast {
183            // For broadcast mode, create a single broadcast dispatcher
184            let dispatcher =
185                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
186            dispatchers.push(Box::new(dispatcher));
187        }
188        // For channel mode, dispatchers will be created on-demand when subscribing
189
190        // Initialize bootstrap provider if provided (no async needed at construction time)
191        let bootstrap_provider = params
192            .bootstrap_provider
193            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
194
195        Ok(Self {
196            id: params.id,
197            dispatch_mode,
198            dispatch_buffer_capacity,
199            auto_start: params.auto_start,
200            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
201            dispatchers: Arc::new(RwLock::new(dispatchers)),
202            context: Arc::new(RwLock::new(None)), // Set by initialize()
203            status_tx: Arc::new(RwLock::new(None)), // Extracted from context
204            state_store: Arc::new(RwLock::new(None)), // Extracted from context
205            task_handle: Arc::new(RwLock::new(None)),
206            shutdown_tx: Arc::new(RwLock::new(None)),
207            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
208        })
209    }
210
211    /// Get whether this source should auto-start
212    pub fn get_auto_start(&self) -> bool {
213        self.auto_start
214    }
215
216    /// Initialize the source with runtime context.
217    ///
218    /// This method is called automatically by DrasiLib's `add_source()` method.
219    /// Plugin developers do not need to call this directly.
220    ///
221    /// The context provides access to:
222    /// - `source_id`: The source's unique identifier
223    /// - `status_tx`: Channel for reporting component status events
224    /// - `state_store`: Optional persistent state storage
225    pub async fn initialize(&self, context: SourceRuntimeContext) {
226        // Store context for later use
227        *self.context.write().await = Some(context.clone());
228
229        // Extract services for convenience
230        *self.status_tx.write().await = Some(context.status_tx.clone());
231
232        if let Some(state_store) = context.state_store.as_ref() {
233            *self.state_store.write().await = Some(state_store.clone());
234        }
235    }
236
237    /// Get the runtime context if initialized.
238    ///
239    /// Returns `None` if `initialize()` has not been called yet.
240    pub async fn context(&self) -> Option<SourceRuntimeContext> {
241        self.context.read().await.clone()
242    }
243
244    /// Get the state store if configured.
245    ///
246    /// Returns `None` if no state store was provided in the context.
247    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
248        self.state_store.read().await.clone()
249    }
250
251    /// Get the status channel Arc for internal use by spawned tasks
252    ///
253    /// This returns the internal status_tx wrapped in Arc<RwLock<Option<...>>>
254    /// which allows background tasks to send component status events.
255    ///
256    /// Returns a clone of the Arc that can be moved into spawned tasks.
257    pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>> {
258        self.status_tx.clone()
259    }
260
261    /// Clone the SourceBase with shared Arc references
262    ///
263    /// This creates a new SourceBase that shares the same underlying
264    /// data through Arc references. Useful for passing to spawned tasks.
265    pub fn clone_shared(&self) -> Self {
266        Self {
267            id: self.id.clone(),
268            dispatch_mode: self.dispatch_mode,
269            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
270            auto_start: self.auto_start,
271            status: self.status.clone(),
272            dispatchers: self.dispatchers.clone(),
273            context: self.context.clone(),
274            status_tx: self.status_tx.clone(),
275            state_store: self.state_store.clone(),
276            task_handle: self.task_handle.clone(),
277            shutdown_tx: self.shutdown_tx.clone(),
278            bootstrap_provider: self.bootstrap_provider.clone(),
279        }
280    }
281
282    /// Set the bootstrap provider for this source, taking ownership.
283    ///
284    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
285    /// The bootstrap provider is created by the plugin using its own configuration.
286    ///
287    /// # Example
288    /// ```ignore
289    /// let provider = MyBootstrapProvider::new(config);
290    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
291    /// ```
292    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
293        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
294    }
295
296    /// Get the source ID
297    pub fn get_id(&self) -> &str {
298        &self.id
299    }
300
301    /// Create a streaming receiver for a query subscription
302    ///
303    /// This creates the appropriate receiver based on the configured dispatch mode:
304    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
305    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
306    ///
307    /// This is a helper method that can be used by sources with custom subscribe logic.
308    pub async fn create_streaming_receiver(
309        &self,
310    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
311        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
312            DispatchMode::Broadcast => {
313                // For broadcast mode, use the single dispatcher
314                let dispatchers = self.dispatchers.read().await;
315                if let Some(dispatcher) = dispatchers.first() {
316                    dispatcher.create_receiver().await?
317                } else {
318                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
319                }
320            }
321            DispatchMode::Channel => {
322                // For channel mode, create a new dispatcher for this subscription
323                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
324                    self.dispatch_buffer_capacity,
325                );
326                let receiver = dispatcher.create_receiver().await?;
327
328                // Add the new dispatcher to our list
329                let mut dispatchers = self.dispatchers.write().await;
330                dispatchers.push(Box::new(dispatcher));
331
332                receiver
333            }
334        };
335
336        Ok(receiver)
337    }
338
339    /// Subscribe to this source with optional bootstrap
340    ///
341    /// This is the standard subscribe implementation that all sources can use.
342    /// It handles:
343    /// - Creating a receiver for streaming events (based on dispatch mode)
344    /// - Setting up bootstrap if requested and a provider has been set
345    /// - Returning the appropriate SubscriptionResponse
346    pub async fn subscribe_with_bootstrap(
347        &self,
348        settings: &crate::config::SourceSubscriptionSettings,
349        source_type: &str,
350    ) -> Result<SubscriptionResponse> {
351        info!(
352            "Query '{}' subscribing to {} source '{}' (bootstrap: {})",
353            settings.query_id, source_type, self.id, settings.enable_bootstrap
354        );
355
356        // Create streaming receiver using helper method
357        let receiver = self.create_streaming_receiver().await?;
358
359        let query_id_for_response = settings.query_id.clone();
360
361        // Handle bootstrap if requested and bootstrap provider is configured
362        let bootstrap_receiver = if settings.enable_bootstrap {
363            self.handle_bootstrap_subscription(settings, source_type)
364                .await?
365        } else {
366            None
367        };
368
369        Ok(SubscriptionResponse {
370            query_id: query_id_for_response,
371            source_id: self.id.clone(),
372            receiver,
373            bootstrap_receiver,
374        })
375    }
376
377    /// Handle bootstrap subscription logic
378    async fn handle_bootstrap_subscription(
379        &self,
380        settings: &crate::config::SourceSubscriptionSettings,
381        source_type: &str,
382    ) -> Result<Option<BootstrapEventReceiver>> {
383        let provider_guard = self.bootstrap_provider.read().await;
384        if let Some(provider) = provider_guard.clone() {
385            drop(provider_guard); // Release lock before spawning task
386
387            info!(
388                "Creating bootstrap for query '{}' on {} source '{}'",
389                settings.query_id, source_type, self.id
390            );
391
392            // Create bootstrap context
393            let context = BootstrapContext::new_minimal(
394                self.id.clone(), // server_id
395                self.id.clone(), // source_id
396            );
397
398            // Create bootstrap channel
399            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
400
401            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
402            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
403            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
404
405            // Create bootstrap request with request_id
406            let request = BootstrapRequest {
407                query_id: settings.query_id.clone(),
408                node_labels,
409                relation_labels,
410                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
411            };
412
413            // Clone settings for the async task
414            let settings_clone = settings.clone();
415            let source_id = self.id.clone();
416
417            // Get instance_id from context for log routing isolation
418            let instance_id = self
419                .context()
420                .await
421                .map(|c| c.instance_id.clone())
422                .unwrap_or_default();
423
424            // Spawn bootstrap task with tracing span for proper log routing
425            let span = tracing::info_span!(
426                "source_bootstrap",
427                instance_id = %instance_id,
428                component_id = %source_id,
429                component_type = "source"
430            );
431            tokio::spawn(
432                async move {
433                    match provider
434                        .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
435                        .await
436                    {
437                        Ok(count) => {
438                            info!(
439                                "Bootstrap completed successfully for query '{}', sent {count} events",
440                                settings_clone.query_id
441                            );
442                        }
443                        Err(e) => {
444                            error!(
445                                "Bootstrap failed for query '{}': {e}",
446                                settings_clone.query_id
447                            );
448                        }
449                    }
450                }
451                .instrument(span),
452            );
453
454            Ok(Some(bootstrap_rx))
455        } else {
456            info!(
457                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
458                settings.query_id, source_type, self.id
459            );
460            Ok(None)
461        }
462    }
463
464    /// Dispatch a SourceChange event with profiling metadata
465    ///
466    /// This method handles the common pattern of:
467    /// - Creating profiling metadata with timestamp
468    /// - Wrapping the change in a SourceEventWrapper
469    /// - Dispatching to all subscribers
470    /// - Handling the no-subscriber case gracefully
471    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
472        // Create profiling metadata
473        let mut profiling = profiling::ProfilingMetadata::new();
474        profiling.source_send_ns = Some(profiling::timestamp_ns());
475
476        // Create event wrapper
477        let wrapper = SourceEventWrapper::with_profiling(
478            self.id.clone(),
479            SourceEvent::Change(change),
480            chrono::Utc::now(),
481            profiling,
482        );
483
484        // Dispatch event
485        self.dispatch_event(wrapper).await
486    }
487
488    /// Dispatch a SourceEventWrapper to all subscribers
489    ///
490    /// This is a generic method for dispatching any SourceEvent.
491    /// It handles Arc-wrapping for zero-copy sharing and logs
492    /// when there are no subscribers.
493    pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
494        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
495
496        // Arc-wrap for zero-copy sharing across dispatchers
497        let arc_wrapper = Arc::new(wrapper);
498
499        // Send to all dispatchers
500        let dispatchers = self.dispatchers.read().await;
501        for dispatcher in dispatchers.iter() {
502            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
503                debug!("[{}] Failed to dispatch event: {}", self.id, e);
504            }
505        }
506
507        Ok(())
508    }
509
510    /// Broadcast SourceControl events
511    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
512        let wrapper = SourceEventWrapper::new(
513            self.id.clone(),
514            SourceEvent::Control(control),
515            chrono::Utc::now(),
516        );
517        self.dispatch_event(wrapper).await
518    }
519
520    /// Create a test subscription to this source (synchronous wrapper)
521    ///
522    /// This method is intended for use in tests to receive events from the source.
523    /// It properly handles both Broadcast and Channel dispatch modes by delegating
524    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
525    ///
526    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
527    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
528    ///
529    /// # Returns
530    /// A receiver that will receive all events dispatched by this source
531    ///
532    /// # Panics
533    /// Panics if the receiver cannot be created (e.g., internal error)
534    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
535        // Use block_in_place to avoid nested executor issues in async tests
536        tokio::task::block_in_place(|| {
537            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
538        })
539        .expect("Failed to create test subscription receiver")
540    }
541
542    /// Helper function to dispatch events from spawned tasks
543    ///
544    /// This is a static helper that can be used from spawned async tasks that don't
545    /// have access to `self`. It manually iterates through dispatchers and sends the event.
546    ///
547    /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
548    ///
549    /// # Arguments
550    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
551    /// * `wrapper` - The event wrapper to dispatch
552    /// * `source_id` - Source ID for logging
553    pub async fn dispatch_from_task(
554        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
555        wrapper: SourceEventWrapper,
556        source_id: &str,
557    ) -> Result<()> {
558        debug!(
559            "[{}] Dispatching event from task: {:?}",
560            source_id, &wrapper
561        );
562
563        // Arc-wrap for zero-copy sharing across dispatchers
564        let arc_wrapper = Arc::new(wrapper);
565
566        // Send to all dispatchers
567        let dispatchers_guard = dispatchers.read().await;
568        for dispatcher in dispatchers_guard.iter() {
569            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
570                debug!("[{source_id}] Failed to dispatch event from task: {e}");
571            }
572        }
573
574        Ok(())
575    }
576
577    /// Handle common stop functionality
578    pub async fn stop_common(&self) -> Result<()> {
579        info!("Stopping source '{}'", self.id);
580
581        // Send shutdown signal if we have one
582        if let Some(tx) = self.shutdown_tx.write().await.take() {
583            let _ = tx.send(());
584        }
585
586        // Wait for task to complete
587        if let Some(handle) = self.task_handle.write().await.take() {
588            match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
589                Ok(Ok(())) => {
590                    info!("Source '{}' task completed successfully", self.id);
591                }
592                Ok(Err(e)) => {
593                    error!("Source '{}' task panicked: {}", self.id, e);
594                }
595                Err(_) => {
596                    error!("Source '{}' task did not complete within timeout", self.id);
597                }
598            }
599        }
600
601        *self.status.write().await = ComponentStatus::Stopped;
602        info!("Source '{}' stopped", self.id);
603        Ok(())
604    }
605
606    /// Clear the source's state store partition.
607    ///
608    /// This is called during deprovision to remove all persisted state
609    /// associated with this source. Sources that override `deprovision()`
610    /// can call this to clean up their state store.
611    pub async fn deprovision_common(&self) -> Result<()> {
612        info!("Deprovisioning source '{}'", self.id);
613        if let Some(store) = self.state_store().await {
614            let count = store.clear_store(&self.id).await.map_err(|e| {
615                anyhow::anyhow!(
616                    "Failed to clear state store for source '{}': {}",
617                    self.id,
618                    e
619                )
620            })?;
621            info!(
622                "Cleared {} keys from state store for source '{}'",
623                count, self.id
624            );
625        }
626        Ok(())
627    }
628
629    /// Get the current status
630    pub async fn get_status(&self) -> ComponentStatus {
631        self.status.read().await.clone()
632    }
633
634    /// Set the current status
635    pub async fn set_status(&self, status: ComponentStatus) {
636        *self.status.write().await = status;
637    }
638
639    /// Transition to a new status and send event
640    pub async fn set_status_with_event(
641        &self,
642        status: ComponentStatus,
643        message: Option<String>,
644    ) -> Result<()> {
645        *self.status.write().await = status.clone();
646        self.send_component_event(status, message).await
647    }
648
649    /// Set the task handle
650    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
651        *self.task_handle.write().await = Some(handle);
652    }
653
654    /// Set the shutdown sender
655    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
656        *self.shutdown_tx.write().await = Some(tx);
657    }
658
659    /// Send a component event
660    ///
661    /// If the status channel has not been initialized yet, this method silently
662    /// succeeds without sending anything. This allows sources to be used
663    /// in a standalone fashion without DrasiLib if needed.
664    pub async fn send_component_event(
665        &self,
666        status: ComponentStatus,
667        message: Option<String>,
668    ) -> Result<()> {
669        let event = ComponentEvent {
670            component_id: self.id.clone(),
671            component_type: ComponentType::Source,
672            status,
673            timestamp: chrono::Utc::now(),
674            message,
675        };
676
677        if let Some(ref tx) = *self.status_tx.read().await {
678            if let Err(e) = tx.send(event).await {
679                error!("Failed to send component event: {e}");
680            }
681        }
682        // If status_tx is None, silently skip - initialization happens before start()
683        Ok(())
684    }
685}