drasi_lib/sources/base.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
39use crate::channels::*;
40use crate::context::SourceRuntimeContext;
41use crate::identity::IdentityProvider;
42use crate::profiling;
43use crate::state_store::StateStoreProvider;
44use drasi_core::models::SourceChange;
45
/// Parameters for creating a `SourceBase` instance.
///
/// This struct contains only the information that `SourceBase` needs to function.
/// Plugin-specific configuration should remain in the plugin crate.
///
/// Only `id` is mandatory. The usual way to build a value is
/// `SourceBaseParams::new(id)` followed by the `with_*` builder methods.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
///
/// let params = SourceBaseParams::new("my-source")
///     .with_dispatch_mode(DispatchMode::Channel)
///     .with_dispatch_buffer_capacity(2000)
///     .with_bootstrap_provider(my_provider);
///
/// let base = SourceBase::new(params)?;
/// ```
pub struct SourceBaseParams {
    /// Unique identifier for the source.
    pub id: String,
    /// Dispatch mode (Broadcast or Channel). `None` means "use the default":
    /// `SourceBase::new` resolves it with `unwrap_or_default()`, which is
    /// documented here as Channel.
    pub dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity. `None` is resolved to 1000 by `SourceBase::new`.
    pub dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider to install during construction.
    /// `None` leaves the source without bootstrap support until one is set later.
    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
    /// Whether this source should auto-start. `SourceBaseParams::new` sets this to `true`.
    pub auto_start: bool,
}
75
76impl std::fmt::Debug for SourceBaseParams {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("SourceBaseParams")
79 .field("id", &self.id)
80 .field("dispatch_mode", &self.dispatch_mode)
81 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
82 .field(
83 "bootstrap_provider",
84 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
85 )
86 .field("auto_start", &self.auto_start)
87 .finish()
88 }
89}
90
91impl SourceBaseParams {
92 /// Create new params with just an ID, using defaults for everything else
93 pub fn new(id: impl Into<String>) -> Self {
94 Self {
95 id: id.into(),
96 dispatch_mode: None,
97 dispatch_buffer_capacity: None,
98 bootstrap_provider: None,
99 auto_start: true,
100 }
101 }
102
103 /// Set the dispatch mode
104 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
105 self.dispatch_mode = Some(mode);
106 self
107 }
108
109 /// Set the dispatch buffer capacity
110 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
111 self.dispatch_buffer_capacity = Some(capacity);
112 self
113 }
114
115 /// Set the bootstrap provider
116 ///
117 /// This provider will be used during source subscription to deliver
118 /// initial data to queries that request bootstrap.
119 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
120 self.bootstrap_provider = Some(Box::new(provider));
121 self
122 }
123
124 /// Set whether this source should auto-start
125 ///
126 /// Default is `true`. Set to `false` if this source should only be
127 /// started manually via `start_source()`.
128 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
129 self.auto_start = auto_start;
130 self
131 }
132}
133
/// Base implementation for common source functionality.
///
/// Every piece of mutable state is held behind `Arc<RwLock<...>>`, so a copy
/// produced by `clone_shared()` observes and mutates the same state as the
/// original.
pub struct SourceBase {
    /// Source identifier
    pub id: String,
    /// Dispatch mode setting (fixed at construction)
    dispatch_mode: DispatchMode,
    /// Dispatch buffer capacity used when creating dispatchers
    dispatch_buffer_capacity: usize,
    /// Whether this source should auto-start
    pub auto_start: bool,
    /// Current component status (starts as `ComponentStatus::Stopped`)
    pub status: Arc<RwLock<ComponentStatus>>,
    /// Dispatchers for sending source events to subscribers
    ///
    /// This is a vector of dispatchers that send source events to all registered
    /// subscribers (queries). When a source produces a change event, it is sent
    /// to every dispatcher in this vector. In Broadcast mode the vector holds a
    /// single dispatcher created at construction; in Channel mode one
    /// dispatcher is appended per `create_streaming_receiver()` call.
    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    /// Runtime context (set by initialize(); `None` before that)
    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
    /// Channel for sending component status events (extracted from context for convenience)
    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
    /// State store provider (extracted from context for convenience)
    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
    /// Handle to the source's main task; taken and awaited by `stop_common()`
    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Sender for shutdown signal; taken and fired by `stop_common()`
    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
    /// Optional bootstrap provider - plugins set this if they support bootstrap
    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
    /// Optional identity provider for credential management.
    /// Set either programmatically (via `set_identity_provider`) or automatically
    /// from the runtime context during `initialize()`.
    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
}
169
170impl SourceBase {
171 /// Create a new SourceBase with the given parameters
172 ///
173 /// The status channel is not required during construction - it will be
174 /// provided via the `SourceRuntimeContext` when `initialize()` is called.
175 ///
176 /// If a bootstrap provider is specified in params, it will be set during
177 /// construction (no async needed since nothing is shared yet).
178 pub fn new(params: SourceBaseParams) -> Result<Self> {
179 // Determine dispatch mode (default to Channel if not specified)
180 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
181 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
182
183 // Set up initial dispatchers based on dispatch mode
184 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
185 Vec::new();
186
187 if dispatch_mode == DispatchMode::Broadcast {
188 // For broadcast mode, create a single broadcast dispatcher
189 let dispatcher =
190 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
191 dispatchers.push(Box::new(dispatcher));
192 }
193 // For channel mode, dispatchers will be created on-demand when subscribing
194
195 // Initialize bootstrap provider if provided (no async needed at construction time)
196 let bootstrap_provider = params
197 .bootstrap_provider
198 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
199
200 Ok(Self {
201 id: params.id,
202 dispatch_mode,
203 dispatch_buffer_capacity,
204 auto_start: params.auto_start,
205 status: Arc::new(RwLock::new(ComponentStatus::Stopped)),
206 dispatchers: Arc::new(RwLock::new(dispatchers)),
207 context: Arc::new(RwLock::new(None)), // Set by initialize()
208 status_tx: Arc::new(RwLock::new(None)), // Extracted from context
209 state_store: Arc::new(RwLock::new(None)), // Extracted from context
210 task_handle: Arc::new(RwLock::new(None)),
211 shutdown_tx: Arc::new(RwLock::new(None)),
212 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
213 identity_provider: Arc::new(RwLock::new(None)),
214 })
215 }
216
217 /// Get whether this source should auto-start
218 pub fn get_auto_start(&self) -> bool {
219 self.auto_start
220 }
221
222 /// Initialize the source with runtime context.
223 ///
224 /// This method is called automatically by DrasiLib's `add_source()` method.
225 /// Plugin developers do not need to call this directly.
226 ///
227 /// The context provides access to:
228 /// - `source_id`: The source's unique identifier
229 /// - `status_tx`: Channel for reporting component status events
230 /// - `state_store`: Optional persistent state storage
231 pub async fn initialize(&self, context: SourceRuntimeContext) {
232 // Store context for later use
233 *self.context.write().await = Some(context.clone());
234
235 // Extract services for convenience
236 *self.status_tx.write().await = Some(context.status_tx.clone());
237
238 if let Some(state_store) = context.state_store.as_ref() {
239 *self.state_store.write().await = Some(state_store.clone());
240 }
241
242 // Store identity provider from context if not already set programmatically
243 if let Some(ip) = context.identity_provider.as_ref() {
244 let mut guard = self.identity_provider.write().await;
245 if guard.is_none() {
246 *guard = Some(ip.clone());
247 }
248 }
249 }
250
251 /// Get the runtime context if initialized.
252 ///
253 /// Returns `None` if `initialize()` has not been called yet.
254 pub async fn context(&self) -> Option<SourceRuntimeContext> {
255 self.context.read().await.clone()
256 }
257
258 /// Get the state store if configured.
259 ///
260 /// Returns `None` if no state store was provided in the context.
261 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
262 self.state_store.read().await.clone()
263 }
264
265 /// Get the identity provider if set.
266 ///
267 /// Returns the identity provider set either programmatically via
268 /// `set_identity_provider()` or from the runtime context during `initialize()`.
269 /// Programmatically-set providers take precedence over context providers.
270 pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
271 self.identity_provider.read().await.clone()
272 }
273
274 /// Set the identity provider programmatically.
275 ///
276 /// This is typically called during source construction when the provider
277 /// is available from configuration (e.g., `with_identity_provider()` builder).
278 /// Providers set this way take precedence over context-injected providers.
279 pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
280 *self.identity_provider.write().await = Some(provider);
281 }
282
283 /// Get the status channel Arc for internal use by spawned tasks
284 ///
285 /// This returns the internal status_tx wrapped in Arc<RwLock<Option<...>>>
286 /// which allows background tasks to send component status events.
287 ///
288 /// Returns a clone of the Arc that can be moved into spawned tasks.
289 pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>> {
290 self.status_tx.clone()
291 }
292
293 /// Clone the SourceBase with shared Arc references
294 ///
295 /// This creates a new SourceBase that shares the same underlying
296 /// data through Arc references. Useful for passing to spawned tasks.
297 pub fn clone_shared(&self) -> Self {
298 Self {
299 id: self.id.clone(),
300 dispatch_mode: self.dispatch_mode,
301 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
302 auto_start: self.auto_start,
303 status: self.status.clone(),
304 dispatchers: self.dispatchers.clone(),
305 context: self.context.clone(),
306 status_tx: self.status_tx.clone(),
307 state_store: self.state_store.clone(),
308 task_handle: self.task_handle.clone(),
309 shutdown_tx: self.shutdown_tx.clone(),
310 bootstrap_provider: self.bootstrap_provider.clone(),
311 identity_provider: self.identity_provider.clone(),
312 }
313 }
314
315 /// Set the bootstrap provider for this source, taking ownership.
316 ///
317 /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
318 /// The bootstrap provider is created by the plugin using its own configuration.
319 ///
320 /// # Example
321 /// ```ignore
322 /// let provider = MyBootstrapProvider::new(config);
323 /// source_base.set_bootstrap_provider(provider).await; // Ownership transferred
324 /// ```
325 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
326 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
327 }
328
329 /// Get the source ID
330 pub fn get_id(&self) -> &str {
331 &self.id
332 }
333
334 /// Create a streaming receiver for a query subscription
335 ///
336 /// This creates the appropriate receiver based on the configured dispatch mode:
337 /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
338 /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
339 ///
340 /// This is a helper method that can be used by sources with custom subscribe logic.
341 pub async fn create_streaming_receiver(
342 &self,
343 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
344 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
345 DispatchMode::Broadcast => {
346 // For broadcast mode, use the single dispatcher
347 let dispatchers = self.dispatchers.read().await;
348 if let Some(dispatcher) = dispatchers.first() {
349 dispatcher.create_receiver().await?
350 } else {
351 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
352 }
353 }
354 DispatchMode::Channel => {
355 // For channel mode, create a new dispatcher for this subscription
356 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
357 self.dispatch_buffer_capacity,
358 );
359 let receiver = dispatcher.create_receiver().await?;
360
361 // Add the new dispatcher to our list
362 let mut dispatchers = self.dispatchers.write().await;
363 dispatchers.push(Box::new(dispatcher));
364
365 receiver
366 }
367 };
368
369 Ok(receiver)
370 }
371
372 /// Subscribe to this source with optional bootstrap
373 ///
374 /// This is the standard subscribe implementation that all sources can use.
375 /// It handles:
376 /// - Creating a receiver for streaming events (based on dispatch mode)
377 /// - Setting up bootstrap if requested and a provider has been set
378 /// - Returning the appropriate SubscriptionResponse
379 pub async fn subscribe_with_bootstrap(
380 &self,
381 settings: &crate::config::SourceSubscriptionSettings,
382 source_type: &str,
383 ) -> Result<SubscriptionResponse> {
384 info!(
385 "Query '{}' subscribing to {} source '{}' (bootstrap: {})",
386 settings.query_id, source_type, self.id, settings.enable_bootstrap
387 );
388
389 // Create streaming receiver using helper method
390 let receiver = self.create_streaming_receiver().await?;
391
392 let query_id_for_response = settings.query_id.clone();
393
394 // Handle bootstrap if requested and bootstrap provider is configured
395 let bootstrap_receiver = if settings.enable_bootstrap {
396 self.handle_bootstrap_subscription(settings, source_type)
397 .await?
398 } else {
399 None
400 };
401
402 Ok(SubscriptionResponse {
403 query_id: query_id_for_response,
404 source_id: self.id.clone(),
405 receiver,
406 bootstrap_receiver,
407 })
408 }
409
410 /// Handle bootstrap subscription logic
411 async fn handle_bootstrap_subscription(
412 &self,
413 settings: &crate::config::SourceSubscriptionSettings,
414 source_type: &str,
415 ) -> Result<Option<BootstrapEventReceiver>> {
416 let provider_guard = self.bootstrap_provider.read().await;
417 if let Some(provider) = provider_guard.clone() {
418 drop(provider_guard); // Release lock before spawning task
419
420 info!(
421 "Creating bootstrap for query '{}' on {} source '{}'",
422 settings.query_id, source_type, self.id
423 );
424
425 // Create bootstrap context
426 let context = BootstrapContext::new_minimal(
427 self.id.clone(), // server_id
428 self.id.clone(), // source_id
429 );
430
431 // Create bootstrap channel
432 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
433
434 // Convert HashSet to Vec for backward compatibility with BootstrapRequest
435 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
436 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
437
438 // Create bootstrap request with request_id
439 let request = BootstrapRequest {
440 query_id: settings.query_id.clone(),
441 node_labels,
442 relation_labels,
443 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
444 };
445
446 // Clone settings for the async task
447 let settings_clone = settings.clone();
448 let source_id = self.id.clone();
449
450 // Get instance_id from context for log routing isolation
451 let instance_id = self
452 .context()
453 .await
454 .map(|c| c.instance_id.clone())
455 .unwrap_or_default();
456
457 // Spawn bootstrap task with tracing span for proper log routing
458 let span = tracing::info_span!(
459 "source_bootstrap",
460 instance_id = %instance_id,
461 component_id = %source_id,
462 component_type = "source"
463 );
464 tokio::spawn(
465 async move {
466 match provider
467 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
468 .await
469 {
470 Ok(count) => {
471 info!(
472 "Bootstrap completed successfully for query '{}', sent {count} events",
473 settings_clone.query_id
474 );
475 }
476 Err(e) => {
477 error!(
478 "Bootstrap failed for query '{}': {e}",
479 settings_clone.query_id
480 );
481 }
482 }
483 }
484 .instrument(span),
485 );
486
487 Ok(Some(bootstrap_rx))
488 } else {
489 info!(
490 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
491 settings.query_id, source_type, self.id
492 );
493 Ok(None)
494 }
495 }
496
497 /// Dispatch a SourceChange event with profiling metadata
498 ///
499 /// This method handles the common pattern of:
500 /// - Creating profiling metadata with timestamp
501 /// - Wrapping the change in a SourceEventWrapper
502 /// - Dispatching to all subscribers
503 /// - Handling the no-subscriber case gracefully
504 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
505 // Create profiling metadata
506 let mut profiling = profiling::ProfilingMetadata::new();
507 profiling.source_send_ns = Some(profiling::timestamp_ns());
508
509 // Create event wrapper
510 let wrapper = SourceEventWrapper::with_profiling(
511 self.id.clone(),
512 SourceEvent::Change(change),
513 chrono::Utc::now(),
514 profiling,
515 );
516
517 // Dispatch event
518 self.dispatch_event(wrapper).await
519 }
520
521 /// Dispatch a SourceEventWrapper to all subscribers
522 ///
523 /// This is a generic method for dispatching any SourceEvent.
524 /// It handles Arc-wrapping for zero-copy sharing and logs
525 /// when there are no subscribers.
526 pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
527 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
528
529 // Arc-wrap for zero-copy sharing across dispatchers
530 let arc_wrapper = Arc::new(wrapper);
531
532 // Send to all dispatchers
533 let dispatchers = self.dispatchers.read().await;
534 for dispatcher in dispatchers.iter() {
535 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
536 debug!("[{}] Failed to dispatch event: {}", self.id, e);
537 }
538 }
539
540 Ok(())
541 }
542
543 /// Broadcast SourceControl events
544 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
545 let wrapper = SourceEventWrapper::new(
546 self.id.clone(),
547 SourceEvent::Control(control),
548 chrono::Utc::now(),
549 );
550 self.dispatch_event(wrapper).await
551 }
552
553 /// Create a test subscription to this source (synchronous wrapper)
554 ///
555 /// This method is intended for use in tests to receive events from the source.
556 /// It properly handles both Broadcast and Channel dispatch modes by delegating
557 /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
558 ///
559 /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
560 /// For async contexts, prefer calling `create_streaming_receiver()` directly.
561 ///
562 /// # Returns
563 /// A receiver that will receive all events dispatched by this source
564 ///
565 /// # Panics
566 /// Panics if the receiver cannot be created (e.g., internal error)
567 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
568 // Use block_in_place to avoid nested executor issues in async tests
569 tokio::task::block_in_place(|| {
570 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
571 })
572 .expect("Failed to create test subscription receiver")
573 }
574
575 /// Helper function to dispatch events from spawned tasks
576 ///
577 /// This is a static helper that can be used from spawned async tasks that don't
578 /// have access to `self`. It manually iterates through dispatchers and sends the event.
579 ///
580 /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
581 ///
582 /// # Arguments
583 /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
584 /// * `wrapper` - The event wrapper to dispatch
585 /// * `source_id` - Source ID for logging
586 pub async fn dispatch_from_task(
587 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
588 wrapper: SourceEventWrapper,
589 source_id: &str,
590 ) -> Result<()> {
591 debug!(
592 "[{}] Dispatching event from task: {:?}",
593 source_id, &wrapper
594 );
595
596 // Arc-wrap for zero-copy sharing across dispatchers
597 let arc_wrapper = Arc::new(wrapper);
598
599 // Send to all dispatchers
600 let dispatchers_guard = dispatchers.read().await;
601 for dispatcher in dispatchers_guard.iter() {
602 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
603 debug!("[{source_id}] Failed to dispatch event from task: {e}");
604 }
605 }
606
607 Ok(())
608 }
609
610 /// Handle common stop functionality
611 pub async fn stop_common(&self) -> Result<()> {
612 info!("Stopping source '{}'", self.id);
613
614 // Send shutdown signal if we have one
615 if let Some(tx) = self.shutdown_tx.write().await.take() {
616 let _ = tx.send(());
617 }
618
619 // Wait for task to complete
620 if let Some(handle) = self.task_handle.write().await.take() {
621 match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
622 Ok(Ok(())) => {
623 info!("Source '{}' task completed successfully", self.id);
624 }
625 Ok(Err(e)) => {
626 error!("Source '{}' task panicked: {}", self.id, e);
627 }
628 Err(_) => {
629 error!("Source '{}' task did not complete within timeout", self.id);
630 }
631 }
632 }
633
634 *self.status.write().await = ComponentStatus::Stopped;
635 info!("Source '{}' stopped", self.id);
636 Ok(())
637 }
638
639 /// Clear the source's state store partition.
640 ///
641 /// This is called during deprovision to remove all persisted state
642 /// associated with this source. Sources that override `deprovision()`
643 /// can call this to clean up their state store.
644 pub async fn deprovision_common(&self) -> Result<()> {
645 info!("Deprovisioning source '{}'", self.id);
646 if let Some(store) = self.state_store().await {
647 let count = store.clear_store(&self.id).await.map_err(|e| {
648 anyhow::anyhow!(
649 "Failed to clear state store for source '{}': {}",
650 self.id,
651 e
652 )
653 })?;
654 info!(
655 "Cleared {} keys from state store for source '{}'",
656 count, self.id
657 );
658 }
659 Ok(())
660 }
661
662 /// Get the current status
663 pub async fn get_status(&self) -> ComponentStatus {
664 self.status.read().await.clone()
665 }
666
667 /// Set the current status
668 pub async fn set_status(&self, status: ComponentStatus) {
669 *self.status.write().await = status;
670 }
671
672 /// Transition to a new status and send event
673 pub async fn set_status_with_event(
674 &self,
675 status: ComponentStatus,
676 message: Option<String>,
677 ) -> Result<()> {
678 *self.status.write().await = status.clone();
679 self.send_component_event(status, message).await
680 }
681
682 /// Set the task handle
683 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
684 *self.task_handle.write().await = Some(handle);
685 }
686
687 /// Set the shutdown sender
688 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
689 *self.shutdown_tx.write().await = Some(tx);
690 }
691
692 /// Send a component event
693 ///
694 /// If the status channel has not been initialized yet, this method silently
695 /// succeeds without sending anything. This allows sources to be used
696 /// in a standalone fashion without DrasiLib if needed.
697 pub async fn send_component_event(
698 &self,
699 status: ComponentStatus,
700 message: Option<String>,
701 ) -> Result<()> {
702 let event = ComponentEvent {
703 component_id: self.id.clone(),
704 component_type: ComponentType::Source,
705 status,
706 timestamp: chrono::Utc::now(),
707 message,
708 };
709
710 if let Some(ref tx) = *self.status_tx.read().await {
711 if let Err(e) = tx.send(event).await {
712 error!("Failed to send component event: {e}");
713 }
714 }
715 // If status_tx is None, silently skip - initialization happens before start()
716 Ok(())
717 }
718}