Skip to main content

drasi_lib/sources/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
39use crate::channels::*;
40use crate::context::SourceRuntimeContext;
41use crate::identity::IdentityProvider;
42use crate::profiling;
43use crate::state_store::StateStoreProvider;
44use drasi_core::models::SourceChange;
45
46/// Parameters for creating a SourceBase instance.
47///
48/// This struct contains only the information that SourceBase needs to function.
49/// Plugin-specific configuration should remain in the plugin crate.
50///
51/// # Example
52///
53/// ```ignore
54/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
55///
56/// let params = SourceBaseParams::new("my-source")
57///     .with_dispatch_mode(DispatchMode::Channel)
58///     .with_dispatch_buffer_capacity(2000)
59///     .with_bootstrap_provider(my_provider);
60///
61/// let base = SourceBase::new(params)?;
62/// ```
63pub struct SourceBaseParams {
64    /// Unique identifier for the source
65    pub id: String,
66    /// Dispatch mode (Broadcast or Channel) - defaults to Channel
67    pub dispatch_mode: Option<DispatchMode>,
68    /// Dispatch buffer capacity - defaults to 1000
69    pub dispatch_buffer_capacity: Option<usize>,
70    /// Optional bootstrap provider to set during construction
71    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
72    /// Whether this source should auto-start - defaults to true
73    pub auto_start: bool,
74}
75
76impl std::fmt::Debug for SourceBaseParams {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("SourceBaseParams")
79            .field("id", &self.id)
80            .field("dispatch_mode", &self.dispatch_mode)
81            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
82            .field(
83                "bootstrap_provider",
84                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
85            )
86            .field("auto_start", &self.auto_start)
87            .finish()
88    }
89}
90
91impl SourceBaseParams {
92    /// Create new params with just an ID, using defaults for everything else
93    pub fn new(id: impl Into<String>) -> Self {
94        Self {
95            id: id.into(),
96            dispatch_mode: None,
97            dispatch_buffer_capacity: None,
98            bootstrap_provider: None,
99            auto_start: true,
100        }
101    }
102
103    /// Set the dispatch mode
104    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
105        self.dispatch_mode = Some(mode);
106        self
107    }
108
109    /// Set the dispatch buffer capacity
110    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
111        self.dispatch_buffer_capacity = Some(capacity);
112        self
113    }
114
115    /// Set the bootstrap provider
116    ///
117    /// This provider will be used during source subscription to deliver
118    /// initial data to queries that request bootstrap.
119    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
120        self.bootstrap_provider = Some(Box::new(provider));
121        self
122    }
123
124    /// Set whether this source should auto-start
125    ///
126    /// Default is `true`. Set to `false` if this source should only be
127    /// started manually via `start_source()`.
128    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
129        self.auto_start = auto_start;
130        self
131    }
132}
133
134/// Base implementation for common source functionality
135pub struct SourceBase {
136    /// Source identifier
137    pub id: String,
138    /// Dispatch mode setting
139    dispatch_mode: DispatchMode,
140    /// Dispatch buffer capacity
141    dispatch_buffer_capacity: usize,
142    /// Whether this source should auto-start
143    pub auto_start: bool,
144    /// Current component status
145    pub status: Arc<RwLock<ComponentStatus>>,
146    /// Dispatchers for sending source events to subscribers
147    ///
148    /// This is a vector of dispatchers that send source events to all registered
149    /// subscribers (queries). When a source produces a change event, it broadcasts
150    /// it to all dispatchers in this vector.
151    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
152    /// Runtime context (set by initialize())
153    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
154    /// Channel for sending component status events (extracted from context for convenience)
155    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
156    /// State store provider (extracted from context for convenience)
157    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
158    /// Handle to the source's main task
159    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
160    /// Sender for shutdown signal
161    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
162    /// Optional bootstrap provider - plugins set this if they support bootstrap
163    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
164    /// Optional identity provider for credential management.
165    /// Set either programmatically (via `set_identity_provider`) or automatically
166    /// from the runtime context during `initialize()`.
167    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
168}
169
170impl SourceBase {
171    /// Create a new SourceBase with the given parameters
172    ///
173    /// The status channel is not required during construction - it will be
174    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
175    ///
176    /// If a bootstrap provider is specified in params, it will be set during
177    /// construction (no async needed since nothing is shared yet).
178    pub fn new(params: SourceBaseParams) -> Result<Self> {
179        // Determine dispatch mode (default to Channel if not specified)
180        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
181        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
182
183        // Set up initial dispatchers based on dispatch mode
184        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
185            Vec::new();
186
187        if dispatch_mode == DispatchMode::Broadcast {
188            // For broadcast mode, create a single broadcast dispatcher
189            let dispatcher =
190                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
191            dispatchers.push(Box::new(dispatcher));
192        }
193        // For channel mode, dispatchers will be created on-demand when subscribing
194
195        // Initialize bootstrap provider if provided (no async needed at construction time)
196        let bootstrap_provider = params
197            .bootstrap_provider
198            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
199
200        Ok(Self {
201            id: params.id,
202            dispatch_mode,
203            dispatch_buffer_capacity,
204            auto_start: params.auto_start,
205            status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
206            dispatchers: Arc::new(RwLock::new(dispatchers)),
207            context: Arc::new(RwLock::new(None)), // Set by initialize()
208            status_tx: Arc::new(RwLock::new(None)), // Extracted from context
209            state_store: Arc::new(RwLock::new(None)), // Extracted from context
210            task_handle: Arc::new(RwLock::new(None)),
211            shutdown_tx: Arc::new(RwLock::new(None)),
212            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
213            identity_provider: Arc::new(RwLock::new(None)),
214        })
215    }
216
217    /// Get whether this source should auto-start
218    pub fn get_auto_start(&self) -> bool {
219        self.auto_start
220    }
221
222    /// Initialize the source with runtime context.
223    ///
224    /// This method is called automatically by DrasiLib's `add_source()` method.
225    /// Plugin developers do not need to call this directly.
226    ///
227    /// The context provides access to:
228    /// - `source_id`: The source's unique identifier
229    /// - `status_tx`: Channel for reporting component status events
230    /// - `state_store`: Optional persistent state storage
231    pub async fn initialize(&self, context: SourceRuntimeContext) {
232        // Store context for later use
233        *self.context.write().await = Some(context.clone());
234
235        // Extract services for convenience
236        *self.status_tx.write().await = Some(context.status_tx.clone());
237
238        if let Some(state_store) = context.state_store.as_ref() {
239            *self.state_store.write().await = Some(state_store.clone());
240        }
241
242        // Store identity provider from context if not already set programmatically
243        if let Some(ip) = context.identity_provider.as_ref() {
244            let mut guard = self.identity_provider.write().await;
245            if guard.is_none() {
246                *guard = Some(ip.clone());
247            }
248        }
249    }
250
251    /// Get the runtime context if initialized.
252    ///
253    /// Returns `None` if `initialize()` has not been called yet.
254    pub async fn context(&self) -> Option<SourceRuntimeContext> {
255        self.context.read().await.clone()
256    }
257
258    /// Get the state store if configured.
259    ///
260    /// Returns `None` if no state store was provided in the context.
261    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
262        self.state_store.read().await.clone()
263    }
264
265    /// Get the identity provider if set.
266    ///
267    /// Returns the identity provider set either programmatically via
268    /// `set_identity_provider()` or from the runtime context during `initialize()`.
269    /// Programmatically-set providers take precedence over context providers.
270    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
271        self.identity_provider.read().await.clone()
272    }
273
274    /// Set the identity provider programmatically.
275    ///
276    /// This is typically called during source construction when the provider
277    /// is available from configuration (e.g., `with_identity_provider()` builder).
278    /// Providers set this way take precedence over context-injected providers.
279    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
280        *self.identity_provider.write().await = Some(provider);
281    }
282
283    /// Get the status channel Arc for internal use by spawned tasks
284    ///
285    /// This returns the internal status_tx wrapped in Arc<RwLock<Option<...>>>
286    /// which allows background tasks to send component status events.
287    ///
288    /// Returns a clone of the Arc that can be moved into spawned tasks.
289    pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>> {
290        self.status_tx.clone()
291    }
292
293    /// Clone the SourceBase with shared Arc references
294    ///
295    /// This creates a new SourceBase that shares the same underlying
296    /// data through Arc references. Useful for passing to spawned tasks.
297    pub fn clone_shared(&self) -> Self {
298        Self {
299            id: self.id.clone(),
300            dispatch_mode: self.dispatch_mode,
301            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
302            auto_start: self.auto_start,
303            status: self.status.clone(),
304            dispatchers: self.dispatchers.clone(),
305            context: self.context.clone(),
306            status_tx: self.status_tx.clone(),
307            state_store: self.state_store.clone(),
308            task_handle: self.task_handle.clone(),
309            shutdown_tx: self.shutdown_tx.clone(),
310            bootstrap_provider: self.bootstrap_provider.clone(),
311            identity_provider: self.identity_provider.clone(),
312        }
313    }
314
315    /// Set the bootstrap provider for this source, taking ownership.
316    ///
317    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
318    /// The bootstrap provider is created by the plugin using its own configuration.
319    ///
320    /// # Example
321    /// ```ignore
322    /// let provider = MyBootstrapProvider::new(config);
323    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
324    /// ```
325    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
326        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
327    }
328
329    /// Get the source ID
330    pub fn get_id(&self) -> &str {
331        &self.id
332    }
333
334    /// Create a streaming receiver for a query subscription
335    ///
336    /// This creates the appropriate receiver based on the configured dispatch mode:
337    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
338    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
339    ///
340    /// This is a helper method that can be used by sources with custom subscribe logic.
341    pub async fn create_streaming_receiver(
342        &self,
343    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
344        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
345            DispatchMode::Broadcast => {
346                // For broadcast mode, use the single dispatcher
347                let dispatchers = self.dispatchers.read().await;
348                if let Some(dispatcher) = dispatchers.first() {
349                    dispatcher.create_receiver().await?
350                } else {
351                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
352                }
353            }
354            DispatchMode::Channel => {
355                // For channel mode, create a new dispatcher for this subscription
356                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
357                    self.dispatch_buffer_capacity,
358                );
359                let receiver = dispatcher.create_receiver().await?;
360
361                // Add the new dispatcher to our list
362                let mut dispatchers = self.dispatchers.write().await;
363                dispatchers.push(Box::new(dispatcher));
364
365                receiver
366            }
367        };
368
369        Ok(receiver)
370    }
371
372    /// Subscribe to this source with optional bootstrap
373    ///
374    /// This is the standard subscribe implementation that all sources can use.
375    /// It handles:
376    /// - Creating a receiver for streaming events (based on dispatch mode)
377    /// - Setting up bootstrap if requested and a provider has been set
378    /// - Returning the appropriate SubscriptionResponse
379    pub async fn subscribe_with_bootstrap(
380        &self,
381        settings: &crate::config::SourceSubscriptionSettings,
382        source_type: &str,
383    ) -> Result<SubscriptionResponse> {
384        info!(
385            "Query '{}' subscribing to {} source '{}' (bootstrap: {})",
386            settings.query_id, source_type, self.id, settings.enable_bootstrap
387        );
388
389        // Create streaming receiver using helper method
390        let receiver = self.create_streaming_receiver().await?;
391
392        let query_id_for_response = settings.query_id.clone();
393
394        // Handle bootstrap if requested and bootstrap provider is configured
395        let bootstrap_receiver = if settings.enable_bootstrap {
396            self.handle_bootstrap_subscription(settings, source_type)
397                .await?
398        } else {
399            None
400        };
401
402        Ok(SubscriptionResponse {
403            query_id: query_id_for_response,
404            source_id: self.id.clone(),
405            receiver,
406            bootstrap_receiver,
407        })
408    }
409
410    /// Handle bootstrap subscription logic
411    async fn handle_bootstrap_subscription(
412        &self,
413        settings: &crate::config::SourceSubscriptionSettings,
414        source_type: &str,
415    ) -> Result<Option<BootstrapEventReceiver>> {
416        let provider_guard = self.bootstrap_provider.read().await;
417        if let Some(provider) = provider_guard.clone() {
418            drop(provider_guard); // Release lock before spawning task
419
420            info!(
421                "Creating bootstrap for query '{}' on {} source '{}'",
422                settings.query_id, source_type, self.id
423            );
424
425            // Create bootstrap context
426            let context = BootstrapContext::new_minimal(
427                self.id.clone(), // server_id
428                self.id.clone(), // source_id
429            );
430
431            // Create bootstrap channel
432            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
433
434            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
435            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
436            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
437
438            // Create bootstrap request with request_id
439            let request = BootstrapRequest {
440                query_id: settings.query_id.clone(),
441                node_labels,
442                relation_labels,
443                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
444            };
445
446            // Clone settings for the async task
447            let settings_clone = settings.clone();
448            let source_id = self.id.clone();
449
450            // Get instance_id from context for log routing isolation
451            let instance_id = self
452                .context()
453                .await
454                .map(|c| c.instance_id.clone())
455                .unwrap_or_default();
456
457            // Spawn bootstrap task with tracing span for proper log routing
458            let span = tracing::info_span!(
459                "source_bootstrap",
460                instance_id = %instance_id,
461                component_id = %source_id,
462                component_type = "source"
463            );
464            tokio::spawn(
465                async move {
466                    match provider
467                        .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
468                        .await
469                    {
470                        Ok(count) => {
471                            info!(
472                                "Bootstrap completed successfully for query '{}', sent {count} events",
473                                settings_clone.query_id
474                            );
475                        }
476                        Err(e) => {
477                            error!(
478                                "Bootstrap failed for query '{}': {e}",
479                                settings_clone.query_id
480                            );
481                        }
482                    }
483                }
484                .instrument(span),
485            );
486
487            Ok(Some(bootstrap_rx))
488        } else {
489            info!(
490                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
491                settings.query_id, source_type, self.id
492            );
493            Ok(None)
494        }
495    }
496
497    /// Dispatch a SourceChange event with profiling metadata
498    ///
499    /// This method handles the common pattern of:
500    /// - Creating profiling metadata with timestamp
501    /// - Wrapping the change in a SourceEventWrapper
502    /// - Dispatching to all subscribers
503    /// - Handling the no-subscriber case gracefully
504    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
505        // Create profiling metadata
506        let mut profiling = profiling::ProfilingMetadata::new();
507        profiling.source_send_ns = Some(profiling::timestamp_ns());
508
509        // Create event wrapper
510        let wrapper = SourceEventWrapper::with_profiling(
511            self.id.clone(),
512            SourceEvent::Change(change),
513            chrono::Utc::now(),
514            profiling,
515        );
516
517        // Dispatch event
518        self.dispatch_event(wrapper).await
519    }
520
521    /// Dispatch a SourceEventWrapper to all subscribers
522    ///
523    /// This is a generic method for dispatching any SourceEvent.
524    /// It handles Arc-wrapping for zero-copy sharing and logs
525    /// when there are no subscribers.
526    pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
527        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
528
529        // Arc-wrap for zero-copy sharing across dispatchers
530        let arc_wrapper = Arc::new(wrapper);
531
532        // Send to all dispatchers
533        let dispatchers = self.dispatchers.read().await;
534        for dispatcher in dispatchers.iter() {
535            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
536                debug!("[{}] Failed to dispatch event: {}", self.id, e);
537            }
538        }
539
540        Ok(())
541    }
542
543    /// Broadcast SourceControl events
544    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
545        let wrapper = SourceEventWrapper::new(
546            self.id.clone(),
547            SourceEvent::Control(control),
548            chrono::Utc::now(),
549        );
550        self.dispatch_event(wrapper).await
551    }
552
553    /// Create a test subscription to this source (synchronous wrapper)
554    ///
555    /// This method is intended for use in tests to receive events from the source.
556    /// It properly handles both Broadcast and Channel dispatch modes by delegating
557    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
558    ///
559    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
560    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
561    ///
562    /// # Returns
563    /// A receiver that will receive all events dispatched by this source
564    ///
565    /// # Panics
566    /// Panics if the receiver cannot be created (e.g., internal error)
567    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
568        // Use block_in_place to avoid nested executor issues in async tests
569        tokio::task::block_in_place(|| {
570            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
571        })
572        .expect("Failed to create test subscription receiver")
573    }
574
575    /// Helper function to dispatch events from spawned tasks
576    ///
577    /// This is a static helper that can be used from spawned async tasks that don't
578    /// have access to `self`. It manually iterates through dispatchers and sends the event.
579    ///
580    /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
581    ///
582    /// # Arguments
583    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
584    /// * `wrapper` - The event wrapper to dispatch
585    /// * `source_id` - Source ID for logging
586    pub async fn dispatch_from_task(
587        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
588        wrapper: SourceEventWrapper,
589        source_id: &str,
590    ) -> Result<()> {
591        debug!(
592            "[{}] Dispatching event from task: {:?}",
593            source_id, &wrapper
594        );
595
596        // Arc-wrap for zero-copy sharing across dispatchers
597        let arc_wrapper = Arc::new(wrapper);
598
599        // Send to all dispatchers
600        let dispatchers_guard = dispatchers.read().await;
601        for dispatcher in dispatchers_guard.iter() {
602            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
603                debug!("[{source_id}] Failed to dispatch event from task: {e}");
604            }
605        }
606
607        Ok(())
608    }
609
610    /// Handle common stop functionality
611    pub async fn stop_common(&self) -> Result<()> {
612        info!("Stopping source '{}'", self.id);
613
614        // Send shutdown signal if we have one
615        if let Some(tx) = self.shutdown_tx.write().await.take() {
616            let _ = tx.send(());
617        }
618
619        // Wait for task to complete
620        if let Some(handle) = self.task_handle.write().await.take() {
621            match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
622                Ok(Ok(())) => {
623                    info!("Source '{}' task completed successfully", self.id);
624                }
625                Ok(Err(e)) => {
626                    error!("Source '{}' task panicked: {}", self.id, e);
627                }
628                Err(_) => {
629                    error!("Source '{}' task did not complete within timeout", self.id);
630                }
631            }
632        }
633
634        *self.status.write().await = ComponentStatus::Stopped;
635        info!("Source '{}' stopped", self.id);
636        Ok(())
637    }
638
639    /// Clear the source's state store partition.
640    ///
641    /// This is called during deprovision to remove all persisted state
642    /// associated with this source. Sources that override `deprovision()`
643    /// can call this to clean up their state store.
644    pub async fn deprovision_common(&self) -> Result<()> {
645        info!("Deprovisioning source '{}'", self.id);
646        if let Some(store) = self.state_store().await {
647            let count = store.clear_store(&self.id).await.map_err(|e| {
648                anyhow::anyhow!(
649                    "Failed to clear state store for source '{}': {}",
650                    self.id,
651                    e
652                )
653            })?;
654            info!(
655                "Cleared {} keys from state store for source '{}'",
656                count, self.id
657            );
658        }
659        Ok(())
660    }
661
662    /// Get the current status
663    pub async fn get_status(&self) -> ComponentStatus {
664        self.status.read().await.clone()
665    }
666
667    /// Set the current status
668    pub async fn set_status(&self, status: ComponentStatus) {
669        *self.status.write().await = status;
670    }
671
672    /// Transition to a new status and send event
673    pub async fn set_status_with_event(
674        &self,
675        status: ComponentStatus,
676        message: Option<String>,
677    ) -> Result<()> {
678        *self.status.write().await = status.clone();
679        self.send_component_event(status, message).await
680    }
681
682    /// Set the task handle
683    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
684        *self.task_handle.write().await = Some(handle);
685    }
686
687    /// Set the shutdown sender
688    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
689        *self.shutdown_tx.write().await = Some(tx);
690    }
691
692    /// Send a component event
693    ///
694    /// If the status channel has not been initialized yet, this method silently
695    /// succeeds without sending anything. This allows sources to be used
696    /// in a standalone fashion without DrasiLib if needed.
697    pub async fn send_component_event(
698        &self,
699        status: ComponentStatus,
700        message: Option<String>,
701    ) -> Result<()> {
702        let event = ComponentEvent {
703            component_id: self.id.clone(),
704            component_type: ComponentType::Source,
705            status,
706            timestamp: chrono::Utc::now(),
707            message,
708        };
709
710        if let Some(ref tx) = *self.status_tx.read().await {
711            if let Err(e) = tx.send(event).await {
712                error!("Failed to send component event: {e}");
713            }
714        }
715        // If status_tx is None, silently skip - initialization happens before start()
716        Ok(())
717    }
718}