drasi_lib/sources/base.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
39use crate::channels::*;
40use crate::context::SourceRuntimeContext;
41use crate::profiling;
42use crate::state_store::StateStoreProvider;
43use drasi_core::models::SourceChange;
44
45/// Parameters for creating a SourceBase instance.
46///
47/// This struct contains only the information that SourceBase needs to function.
48/// Plugin-specific configuration should remain in the plugin crate.
49///
50/// # Example
51///
52/// ```ignore
53/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
54///
55/// let params = SourceBaseParams::new("my-source")
56/// .with_dispatch_mode(DispatchMode::Channel)
57/// .with_dispatch_buffer_capacity(2000)
58/// .with_bootstrap_provider(my_provider);
59///
60/// let base = SourceBase::new(params)?;
61/// ```
62pub struct SourceBaseParams {
63 /// Unique identifier for the source
64 pub id: String,
65 /// Dispatch mode (Broadcast or Channel) - defaults to Channel
66 pub dispatch_mode: Option<DispatchMode>,
67 /// Dispatch buffer capacity - defaults to 1000
68 pub dispatch_buffer_capacity: Option<usize>,
69 /// Optional bootstrap provider to set during construction
70 pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
71 /// Whether this source should auto-start - defaults to true
72 pub auto_start: bool,
73}
74
75impl std::fmt::Debug for SourceBaseParams {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("SourceBaseParams")
78 .field("id", &self.id)
79 .field("dispatch_mode", &self.dispatch_mode)
80 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
81 .field(
82 "bootstrap_provider",
83 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
84 )
85 .field("auto_start", &self.auto_start)
86 .finish()
87 }
88}
89
90impl SourceBaseParams {
91 /// Create new params with just an ID, using defaults for everything else
92 pub fn new(id: impl Into<String>) -> Self {
93 Self {
94 id: id.into(),
95 dispatch_mode: None,
96 dispatch_buffer_capacity: None,
97 bootstrap_provider: None,
98 auto_start: true,
99 }
100 }
101
102 /// Set the dispatch mode
103 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
104 self.dispatch_mode = Some(mode);
105 self
106 }
107
108 /// Set the dispatch buffer capacity
109 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
110 self.dispatch_buffer_capacity = Some(capacity);
111 self
112 }
113
114 /// Set the bootstrap provider
115 ///
116 /// This provider will be used during source subscription to deliver
117 /// initial data to queries that request bootstrap.
118 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
119 self.bootstrap_provider = Some(Box::new(provider));
120 self
121 }
122
123 /// Set whether this source should auto-start
124 ///
125 /// Default is `true`. Set to `false` if this source should only be
126 /// started manually via `start_source()`.
127 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
128 self.auto_start = auto_start;
129 self
130 }
131}
132
133/// Base implementation for common source functionality
134pub struct SourceBase {
135 /// Source identifier
136 pub id: String,
137 /// Dispatch mode setting
138 dispatch_mode: DispatchMode,
139 /// Dispatch buffer capacity
140 dispatch_buffer_capacity: usize,
141 /// Whether this source should auto-start
142 pub auto_start: bool,
143 /// Current component status
144 pub status: Arc<RwLock<ComponentStatus>>,
145 /// Dispatchers for sending source events to subscribers
146 ///
147 /// This is a vector of dispatchers that send source events to all registered
148 /// subscribers (queries). When a source produces a change event, it broadcasts
149 /// it to all dispatchers in this vector.
150 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
151 /// Runtime context (set by initialize())
152 context: Arc<RwLock<Option<SourceRuntimeContext>>>,
153 /// Channel for sending component status events (extracted from context for convenience)
154 status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
155 /// State store provider (extracted from context for convenience)
156 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
157 /// Handle to the source's main task
158 pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
159 /// Sender for shutdown signal
160 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
161 /// Optional bootstrap provider - plugins set this if they support bootstrap
162 bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
163}
164
165impl SourceBase {
166 /// Create a new SourceBase with the given parameters
167 ///
168 /// The status channel is not required during construction - it will be
169 /// provided via the `SourceRuntimeContext` when `initialize()` is called.
170 ///
171 /// If a bootstrap provider is specified in params, it will be set during
172 /// construction (no async needed since nothing is shared yet).
173 pub fn new(params: SourceBaseParams) -> Result<Self> {
174 // Determine dispatch mode (default to Channel if not specified)
175 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
176 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
177
178 // Set up initial dispatchers based on dispatch mode
179 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
180 Vec::new();
181
182 if dispatch_mode == DispatchMode::Broadcast {
183 // For broadcast mode, create a single broadcast dispatcher
184 let dispatcher =
185 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
186 dispatchers.push(Box::new(dispatcher));
187 }
188 // For channel mode, dispatchers will be created on-demand when subscribing
189
190 // Initialize bootstrap provider if provided (no async needed at construction time)
191 let bootstrap_provider = params
192 .bootstrap_provider
193 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
194
195 Ok(Self {
196 id: params.id,
197 dispatch_mode,
198 dispatch_buffer_capacity,
199 auto_start: params.auto_start,
200 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
201 dispatchers: Arc::new(RwLock::new(dispatchers)),
202 context: Arc::new(RwLock::new(None)), // Set by initialize()
203 status_tx: Arc::new(RwLock::new(None)), // Extracted from context
204 state_store: Arc::new(RwLock::new(None)), // Extracted from context
205 task_handle: Arc::new(RwLock::new(None)),
206 shutdown_tx: Arc::new(RwLock::new(None)),
207 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
208 })
209 }
210
211 /// Get whether this source should auto-start
212 pub fn get_auto_start(&self) -> bool {
213 self.auto_start
214 }
215
216 /// Initialize the source with runtime context.
217 ///
218 /// This method is called automatically by DrasiLib's `add_source()` method.
219 /// Plugin developers do not need to call this directly.
220 ///
221 /// The context provides access to:
222 /// - `source_id`: The source's unique identifier
223 /// - `status_tx`: Channel for reporting component status events
224 /// - `state_store`: Optional persistent state storage
225 pub async fn initialize(&self, context: SourceRuntimeContext) {
226 // Store context for later use
227 *self.context.write().await = Some(context.clone());
228
229 // Extract services for convenience
230 *self.status_tx.write().await = Some(context.status_tx.clone());
231
232 if let Some(state_store) = context.state_store.as_ref() {
233 *self.state_store.write().await = Some(state_store.clone());
234 }
235 }
236
237 /// Get the runtime context if initialized.
238 ///
239 /// Returns `None` if `initialize()` has not been called yet.
240 pub async fn context(&self) -> Option<SourceRuntimeContext> {
241 self.context.read().await.clone()
242 }
243
244 /// Get the state store if configured.
245 ///
246 /// Returns `None` if no state store was provided in the context.
247 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
248 self.state_store.read().await.clone()
249 }
250
251 /// Get the status channel Arc for internal use by spawned tasks
252 ///
253 /// This returns the internal status_tx wrapped in Arc<RwLock<Option<...>>>
254 /// which allows background tasks to send component status events.
255 ///
256 /// Returns a clone of the Arc that can be moved into spawned tasks.
257 pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>> {
258 self.status_tx.clone()
259 }
260
261 /// Clone the SourceBase with shared Arc references
262 ///
263 /// This creates a new SourceBase that shares the same underlying
264 /// data through Arc references. Useful for passing to spawned tasks.
265 pub fn clone_shared(&self) -> Self {
266 Self {
267 id: self.id.clone(),
268 dispatch_mode: self.dispatch_mode,
269 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
270 auto_start: self.auto_start,
271 status: self.status.clone(),
272 dispatchers: self.dispatchers.clone(),
273 context: self.context.clone(),
274 status_tx: self.status_tx.clone(),
275 state_store: self.state_store.clone(),
276 task_handle: self.task_handle.clone(),
277 shutdown_tx: self.shutdown_tx.clone(),
278 bootstrap_provider: self.bootstrap_provider.clone(),
279 }
280 }
281
282 /// Set the bootstrap provider for this source, taking ownership.
283 ///
284 /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
285 /// The bootstrap provider is created by the plugin using its own configuration.
286 ///
287 /// # Example
288 /// ```ignore
289 /// let provider = MyBootstrapProvider::new(config);
290 /// source_base.set_bootstrap_provider(provider).await; // Ownership transferred
291 /// ```
292 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
293 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
294 }
295
296 /// Get the source ID
297 pub fn get_id(&self) -> &str {
298 &self.id
299 }
300
301 /// Create a streaming receiver for a query subscription
302 ///
303 /// This creates the appropriate receiver based on the configured dispatch mode:
304 /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
305 /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
306 ///
307 /// This is a helper method that can be used by sources with custom subscribe logic.
308 pub async fn create_streaming_receiver(
309 &self,
310 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
311 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
312 DispatchMode::Broadcast => {
313 // For broadcast mode, use the single dispatcher
314 let dispatchers = self.dispatchers.read().await;
315 if let Some(dispatcher) = dispatchers.first() {
316 dispatcher.create_receiver().await?
317 } else {
318 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
319 }
320 }
321 DispatchMode::Channel => {
322 // For channel mode, create a new dispatcher for this subscription
323 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
324 self.dispatch_buffer_capacity,
325 );
326 let receiver = dispatcher.create_receiver().await?;
327
328 // Add the new dispatcher to our list
329 let mut dispatchers = self.dispatchers.write().await;
330 dispatchers.push(Box::new(dispatcher));
331
332 receiver
333 }
334 };
335
336 Ok(receiver)
337 }
338
339 /// Subscribe to this source with optional bootstrap
340 ///
341 /// This is the standard subscribe implementation that all sources can use.
342 /// It handles:
343 /// - Creating a receiver for streaming events (based on dispatch mode)
344 /// - Setting up bootstrap if requested and a provider has been set
345 /// - Returning the appropriate SubscriptionResponse
346 pub async fn subscribe_with_bootstrap(
347 &self,
348 settings: &crate::config::SourceSubscriptionSettings,
349 source_type: &str,
350 ) -> Result<SubscriptionResponse> {
351 info!(
352 "Query '{}' subscribing to {} source '{}' (bootstrap: {})",
353 settings.query_id, source_type, self.id, settings.enable_bootstrap
354 );
355
356 // Create streaming receiver using helper method
357 let receiver = self.create_streaming_receiver().await?;
358
359 let query_id_for_response = settings.query_id.clone();
360
361 // Handle bootstrap if requested and bootstrap provider is configured
362 let bootstrap_receiver = if settings.enable_bootstrap {
363 self.handle_bootstrap_subscription(settings, source_type)
364 .await?
365 } else {
366 None
367 };
368
369 Ok(SubscriptionResponse {
370 query_id: query_id_for_response,
371 source_id: self.id.clone(),
372 receiver,
373 bootstrap_receiver,
374 })
375 }
376
377 /// Handle bootstrap subscription logic
378 async fn handle_bootstrap_subscription(
379 &self,
380 settings: &crate::config::SourceSubscriptionSettings,
381 source_type: &str,
382 ) -> Result<Option<BootstrapEventReceiver>> {
383 let provider_guard = self.bootstrap_provider.read().await;
384 if let Some(provider) = provider_guard.clone() {
385 drop(provider_guard); // Release lock before spawning task
386
387 info!(
388 "Creating bootstrap for query '{}' on {} source '{}'",
389 settings.query_id, source_type, self.id
390 );
391
392 // Create bootstrap context
393 let context = BootstrapContext::new_minimal(
394 self.id.clone(), // server_id
395 self.id.clone(), // source_id
396 );
397
398 // Create bootstrap channel
399 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
400
401 // Convert HashSet to Vec for backward compatibility with BootstrapRequest
402 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
403 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
404
405 // Create bootstrap request with request_id
406 let request = BootstrapRequest {
407 query_id: settings.query_id.clone(),
408 node_labels,
409 relation_labels,
410 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
411 };
412
413 // Clone settings for the async task
414 let settings_clone = settings.clone();
415 let source_id = self.id.clone();
416
417 // Get instance_id from context for log routing isolation
418 let instance_id = self
419 .context()
420 .await
421 .map(|c| c.instance_id.clone())
422 .unwrap_or_default();
423
424 // Spawn bootstrap task with tracing span for proper log routing
425 let span = tracing::info_span!(
426 "source_bootstrap",
427 instance_id = %instance_id,
428 component_id = %source_id,
429 component_type = "source"
430 );
431 tokio::spawn(
432 async move {
433 match provider
434 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
435 .await
436 {
437 Ok(count) => {
438 info!(
439 "Bootstrap completed successfully for query '{}', sent {count} events",
440 settings_clone.query_id
441 );
442 }
443 Err(e) => {
444 error!(
445 "Bootstrap failed for query '{}': {e}",
446 settings_clone.query_id
447 );
448 }
449 }
450 }
451 .instrument(span),
452 );
453
454 Ok(Some(bootstrap_rx))
455 } else {
456 info!(
457 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
458 settings.query_id, source_type, self.id
459 );
460 Ok(None)
461 }
462 }
463
464 /// Dispatch a SourceChange event with profiling metadata
465 ///
466 /// This method handles the common pattern of:
467 /// - Creating profiling metadata with timestamp
468 /// - Wrapping the change in a SourceEventWrapper
469 /// - Dispatching to all subscribers
470 /// - Handling the no-subscriber case gracefully
471 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
472 // Create profiling metadata
473 let mut profiling = profiling::ProfilingMetadata::new();
474 profiling.source_send_ns = Some(profiling::timestamp_ns());
475
476 // Create event wrapper
477 let wrapper = SourceEventWrapper::with_profiling(
478 self.id.clone(),
479 SourceEvent::Change(change),
480 chrono::Utc::now(),
481 profiling,
482 );
483
484 // Dispatch event
485 self.dispatch_event(wrapper).await
486 }
487
488 /// Dispatch a SourceEventWrapper to all subscribers
489 ///
490 /// This is a generic method for dispatching any SourceEvent.
491 /// It handles Arc-wrapping for zero-copy sharing and logs
492 /// when there are no subscribers.
493 pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
494 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
495
496 // Arc-wrap for zero-copy sharing across dispatchers
497 let arc_wrapper = Arc::new(wrapper);
498
499 // Send to all dispatchers
500 let dispatchers = self.dispatchers.read().await;
501 for dispatcher in dispatchers.iter() {
502 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
503 debug!("[{}] Failed to dispatch event: {}", self.id, e);
504 }
505 }
506
507 Ok(())
508 }
509
510 /// Broadcast SourceControl events
511 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
512 let wrapper = SourceEventWrapper::new(
513 self.id.clone(),
514 SourceEvent::Control(control),
515 chrono::Utc::now(),
516 );
517 self.dispatch_event(wrapper).await
518 }
519
520 /// Create a test subscription to this source (synchronous wrapper)
521 ///
522 /// This method is intended for use in tests to receive events from the source.
523 /// It properly handles both Broadcast and Channel dispatch modes by delegating
524 /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
525 ///
526 /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
527 /// For async contexts, prefer calling `create_streaming_receiver()` directly.
528 ///
529 /// # Returns
530 /// A receiver that will receive all events dispatched by this source
531 ///
532 /// # Panics
533 /// Panics if the receiver cannot be created (e.g., internal error)
534 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
535 // Use block_in_place to avoid nested executor issues in async tests
536 tokio::task::block_in_place(|| {
537 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
538 })
539 .expect("Failed to create test subscription receiver")
540 }
541
542 /// Helper function to dispatch events from spawned tasks
543 ///
544 /// This is a static helper that can be used from spawned async tasks that don't
545 /// have access to `self`. It manually iterates through dispatchers and sends the event.
546 ///
547 /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
548 ///
549 /// # Arguments
550 /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
551 /// * `wrapper` - The event wrapper to dispatch
552 /// * `source_id` - Source ID for logging
553 pub async fn dispatch_from_task(
554 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
555 wrapper: SourceEventWrapper,
556 source_id: &str,
557 ) -> Result<()> {
558 debug!(
559 "[{}] Dispatching event from task: {:?}",
560 source_id, &wrapper
561 );
562
563 // Arc-wrap for zero-copy sharing across dispatchers
564 let arc_wrapper = Arc::new(wrapper);
565
566 // Send to all dispatchers
567 let dispatchers_guard = dispatchers.read().await;
568 for dispatcher in dispatchers_guard.iter() {
569 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
570 debug!("[{source_id}] Failed to dispatch event from task: {e}");
571 }
572 }
573
574 Ok(())
575 }
576
577 /// Handle common stop functionality
578 pub async fn stop_common(&self) -> Result<()> {
579 info!("Stopping source '{}'", self.id);
580
581 // Send shutdown signal if we have one
582 if let Some(tx) = self.shutdown_tx.write().await.take() {
583 let _ = tx.send(());
584 }
585
586 // Wait for task to complete
587 if let Some(handle) = self.task_handle.write().await.take() {
588 match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
589 Ok(Ok(())) => {
590 info!("Source '{}' task completed successfully", self.id);
591 }
592 Ok(Err(e)) => {
593 error!("Source '{}' task panicked: {}", self.id, e);
594 }
595 Err(_) => {
596 error!("Source '{}' task did not complete within timeout", self.id);
597 }
598 }
599 }
600
601 *self.status.write().await = ComponentStatus::Stopped;
602 info!("Source '{}' stopped", self.id);
603 Ok(())
604 }
605
606 /// Clear the source's state store partition.
607 ///
608 /// This is called during deprovision to remove all persisted state
609 /// associated with this source. Sources that override `deprovision()`
610 /// can call this to clean up their state store.
611 pub async fn deprovision_common(&self) -> Result<()> {
612 info!("Deprovisioning source '{}'", self.id);
613 if let Some(store) = self.state_store().await {
614 let count = store.clear_store(&self.id).await.map_err(|e| {
615 anyhow::anyhow!(
616 "Failed to clear state store for source '{}': {}",
617 self.id,
618 e
619 )
620 })?;
621 info!(
622 "Cleared {} keys from state store for source '{}'",
623 count, self.id
624 );
625 }
626 Ok(())
627 }
628
629 /// Get the current status
630 pub async fn get_status(&self) -> ComponentStatus {
631 self.status.read().await.clone()
632 }
633
634 /// Set the current status
635 pub async fn set_status(&self, status: ComponentStatus) {
636 *self.status.write().await = status;
637 }
638
639 /// Transition to a new status and send event
640 pub async fn set_status_with_event(
641 &self,
642 status: ComponentStatus,
643 message: Option<String>,
644 ) -> Result<()> {
645 *self.status.write().await = status.clone();
646 self.send_component_event(status, message).await
647 }
648
649 /// Set the task handle
650 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
651 *self.task_handle.write().await = Some(handle);
652 }
653
654 /// Set the shutdown sender
655 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
656 *self.shutdown_tx.write().await = Some(tx);
657 }
658
659 /// Send a component event
660 ///
661 /// If the status channel has not been initialized yet, this method silently
662 /// succeeds without sending anything. This allows sources to be used
663 /// in a standalone fashion without DrasiLib if needed.
664 pub async fn send_component_event(
665 &self,
666 status: ComponentStatus,
667 message: Option<String>,
668 ) -> Result<()> {
669 let event = ComponentEvent {
670 component_id: self.id.clone(),
671 component_type: ComponentType::Source,
672 status,
673 timestamp: chrono::Utc::now(),
674 message,
675 };
676
677 if let Some(ref tx) = *self.status_tx.read().await {
678 if let Err(e) = tx.send(event).await {
679 error!("Failed to send component event: {e}");
680 }
681 }
682 // If status_tx is None, silently skip - initialization happens before start()
683 Ok(())
684 }
685}