pub struct SourceBase {
    pub id: String,
    pub auto_start: bool,
    pub status: Arc<RwLock<ComponentStatus>>,
    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    pub task_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    pub shutdown_tx: Arc<RwLock<Option<Sender<()>>>>,
    /* private fields */
}
Base implementations for source and reaction plugins. These are used by plugin developers, not by drasi-lib itself.
Base implementation for common source functionality.
Fields
id: String
Source identifier
auto_start: bool
Whether this source should auto-start
status: Arc<RwLock<ComponentStatus>>
Current component status
dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>
Dispatchers for sending source events to subscribers
This is a vector of dispatchers that send source events to all registered subscribers (queries). When a source produces a change event, it broadcasts it to all dispatchers in this vector.
task_handle: Arc<RwLock<Option<JoinHandle<()>>>>
Handle to the source’s main task
shutdown_tx: Arc<RwLock<Option<Sender<()>>>>
Sender for shutdown signal
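The fan-out described for the dispatchers field can be illustrated with a minimal, std-only sketch. Everything here is an assumption made for illustration: the `Dispatcher` trait, `ChannelDispatcher`, `add_subscriber`, and `broadcast` are hypothetical stand-ins, not drasi-lib types, and the event is a plain `String` rather than a `SourceEventWrapper`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical stand-in for `ChangeDispatcher<SourceEventWrapper>`.
trait Dispatcher: Send + Sync {
    fn dispatch(&self, event: Arc<String>) -> Result<(), String>;
}

/// One dispatcher per subscriber, backed by a std channel.
struct ChannelDispatcher {
    tx: Mutex<Sender<Arc<String>>>,
}

impl Dispatcher for ChannelDispatcher {
    fn dispatch(&self, event: Arc<String>) -> Result<(), String> {
        let tx = self.tx.lock().map_err(|e| e.to_string())?;
        tx.send(event).map_err(|e| e.to_string())
    }
}

/// Same shape as the `dispatchers` field: a shared, lockable vector.
type Dispatchers = Arc<RwLock<Vec<Box<dyn Dispatcher>>>>;

/// Registers a new subscriber and returns its receiving end.
fn add_subscriber(dispatchers: &Dispatchers) -> Receiver<Arc<String>> {
    let (tx, rx) = channel();
    dispatchers
        .write()
        .unwrap()
        .push(Box::new(ChannelDispatcher { tx: Mutex::new(tx) }));
    rx
}

/// Sends one event to every registered dispatcher.
/// Returns how many dispatchers accepted it.
fn broadcast(dispatchers: &Dispatchers, event: String) -> usize {
    // The event is wrapped in an Arc once and the pointer is cloned per subscriber.
    let shared = Arc::new(event);
    let guard = dispatchers.read().unwrap();
    guard
        .iter()
        .filter(|d| d.dispatch(Arc::clone(&shared)).is_ok())
        .count()
}
```

The sketch uses a read lock for dispatching and a write lock only when a subscriber is added, which mirrors why a vector of this kind would sit behind an `RwLock`.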
Implementations
impl SourceBase
pub fn new(params: SourceBaseParams) -> Result<Self>
Create a new SourceBase with the given parameters
The status channel is not required during construction - it will be
provided via the SourceRuntimeContext when initialize() is called.
If a bootstrap provider is specified in params, it will be set during construction (no async needed since nothing is shared yet).
pub fn get_auto_start(&self) -> bool
Get whether this source should auto-start
pub async fn initialize(&self, context: SourceRuntimeContext)
Initialize the source with runtime context.
This method is called automatically by DrasiLib’s add_source() method.
Plugin developers do not need to call this directly.
The context provides access to:
- source_id: The source’s unique identifier
- status_tx: Channel for reporting component status events
- state_store: Optional persistent state storage
pub async fn context(&self) -> Option<SourceRuntimeContext>
Get the runtime context if initialized.
Returns None if initialize() has not been called yet.
pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>>
Get the state store if configured.
Returns None if no state store was provided in the context.
pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>>
Get the status channel Arc for internal use by spawned tasks
This returns the internal status_tx wrapped in Arc<RwLock<Option<…>>> which allows background tasks to send component status events.
Returns a clone of the Arc that can be moved into spawned tasks.
Clone the SourceBase with shared Arc references
This creates a new SourceBase that shares the same underlying data through Arc references. Useful for passing to spawned tasks.
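A minimal sketch of what sharing through Arc references means in practice, using only the standard library. The names `MiniBase`, `Status`, `share`, and `run_in_background` are hypothetical and chosen for illustration; `std::sync::Mutex` and `std::thread` stand in for whatever async lock and task runtime the real type uses.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Stopped,
    Running,
}

/// Hypothetical miniature of a base struct whose state lives behind `Arc`.
struct MiniBase {
    id: String,
    status: Arc<Mutex<Status>>,
}

impl MiniBase {
    fn new(id: &str) -> Self {
        Self {
            id: id.to_string(),
            status: Arc::new(Mutex::new(Status::Stopped)),
        }
    }

    /// Copies the id and clones the `Arc`, so both values point at the same status.
    fn share(&self) -> Self {
        Self {
            id: self.id.clone(),
            status: Arc::clone(&self.status),
        }
    }

    fn status(&self) -> Status {
        *self.status.lock().unwrap()
    }
}

/// Moves a shared clone into a worker thread that flips the status,
/// then waits for the worker to finish.
fn run_in_background(base: &MiniBase) {
    let shared = base.share();
    let worker = thread::spawn(move || {
        *shared.status.lock().unwrap() = Status::Running;
    });
    worker.join().unwrap();
}
```

Because only the `Arc` pointer is cloned, a status change made by the worker is visible through the original value once the worker has run.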
pub async fn set_bootstrap_provider(
    &self,
    provider: impl BootstrapProvider + 'static,
)
Set the bootstrap provider for this source, taking ownership.
Call this after creating the SourceBase if the source plugin supports bootstrapping. The bootstrap provider is created by the plugin using its own configuration.
Example
let provider = MyBootstrapProvider::new(config);
source_base.set_bootstrap_provider(provider).await; // Ownership transferred
pub async fn create_streaming_receiver(
    &self,
) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>>
Create a streaming receiver for a query subscription
This creates the appropriate receiver based on the configured dispatch mode:
- Broadcast mode: Returns a receiver from the shared broadcast dispatcher
- Channel mode: Creates a new dedicated dispatcher and returns its receiver
This is a helper method that can be used by sources with custom subscribe logic.
pub async fn subscribe_with_bootstrap(
    &self,
    settings: &SourceSubscriptionSettings,
    source_type: &str,
) -> Result<SubscriptionResponse>
Subscribe to this source with optional bootstrap
This is the standard subscribe implementation that all sources can use. It handles:
- Creating a receiver for streaming events (based on dispatch mode)
- Setting up bootstrap if requested and a provider has been set
- Returning the appropriate SubscriptionResponse
pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()>
Dispatch a SourceChange event with profiling metadata
This method handles the common pattern of:
- Creating profiling metadata with timestamp
- Wrapping the change in a SourceEventWrapper
- Dispatching to all subscribers
- Handling the no-subscriber case gracefully
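The four steps above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's implementation: `Profiling`, `Wrapper`, `now_ns`, and `dispatch_change` are hypothetical names, the change payload is a plain string, and std channels replace the real dispatchers.

```rust
use std::sync::mpsc::Sender;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for profiling metadata.
#[derive(Debug, Clone)]
struct Profiling {
    source_send_ns: u128,
}

/// Hypothetical stand-in for `SourceEventWrapper`.
#[derive(Debug, Clone)]
struct Wrapper {
    source_id: String,
    change: String,
    profiling: Profiling,
}

/// Nanoseconds since the Unix epoch, or 0 if the clock is before the epoch.
fn now_ns() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0)
}

/// Stamps, wraps, and sends a change to every subscriber.
/// Returns the number of subscribers reached.
/// An empty subscriber list is not an error; it yields `Ok(0)`.
fn dispatch_change(
    source_id: &str,
    change: &str,
    subscribers: &[Sender<Wrapper>],
) -> Result<usize, String> {
    // Steps 1 and 2: create the timestamped metadata and wrap the change.
    let wrapper = Wrapper {
        source_id: source_id.to_string(),
        change: change.to_string(),
        profiling: Profiling { source_send_ns: now_ns() },
    };

    // Step 4: nothing to do when nobody is listening.
    if subscribers.is_empty() {
        return Ok(0);
    }

    // Step 3: deliver a copy to each subscriber.
    let mut delivered = 0;
    for tx in subscribers {
        tx.send(wrapper.clone()).map_err(|e| e.to_string())?;
        delivered += 1;
    }
    Ok(delivered)
}
```

Returning `Ok(0)` instead of an error for the empty case lets a source keep producing changes before any query has subscribed.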
pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()>
Dispatch a SourceEventWrapper to all subscribers
This is a generic method for dispatching any SourceEvent. It handles Arc-wrapping for zero-copy sharing and logs when there are no subscribers.
pub async fn broadcast_control(&self, control: SourceControl) -> Result<()>
Broadcast SourceControl events
pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>>
Create a test subscription to this source (synchronous wrapper)
This method is intended for use in tests to receive events from the source.
It properly handles both Broadcast and Channel dispatch modes by delegating
to create_streaming_receiver(), making the dispatch mode transparent to tests.
Note: This is a synchronous wrapper that uses tokio::task::block_in_place internally.
For async contexts, prefer calling create_streaming_receiver() directly.
Returns
A receiver that will receive all events dispatched by this source.
Panics
Panics if the receiver cannot be created (e.g., internal error).
pub async fn dispatch_from_task(
    dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    wrapper: SourceEventWrapper,
    source_id: &str,
) -> Result<()>
Helper function to dispatch events from spawned tasks
This is a static helper that can be used from spawned async tasks that don’t
have access to self. It manually iterates through dispatchers and sends the event.
For code that has access to &self, prefer using dispatch_event() instead.
Arguments
- dispatchers - Arc to the dispatchers list (from self.base.dispatchers.clone())
- wrapper - The event wrapper to dispatch
- source_id - Source ID for logging
pub async fn stop_common(&self) -> Result<()>
Handle common stop functionality
pub async fn get_status(&self) -> ComponentStatus
Get the current status
pub async fn set_status(&self, status: ComponentStatus)
Set the current status
pub async fn set_status_with_event(
    &self,
    status: ComponentStatus,
    message: Option<String>,
) -> Result<()>
Transition to a new status and send event
pub async fn set_task_handle(&self, handle: JoinHandle<()>)
Set the task handle
pub async fn set_shutdown_tx(&self, tx: Sender<()>)
Set the shutdown sender
pub async fn send_component_event(
    &self,
    status: ComponentStatus,
    message: Option<String>,
) -> Result<()>
Send a component event
If the status channel has not been initialized yet, this method silently succeeds without sending anything. This allows sources to be used in a standalone fashion without DrasiLib if needed.
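The silent-success behaviour can be sketched as an optional sender that is only populated on initialization. This is a std-only illustration under stated assumptions: `StatusReporter`, `initialize`, and `send_event` are hypothetical names, the event is a plain string, and `std::sync::Mutex` replaces the async lock.

```rust
use std::sync::mpsc::Sender;
use std::sync::Mutex;

/// Hypothetical miniature of a component with an optional status channel.
struct StatusReporter {
    /// `None` until some runtime wires in a channel.
    status_tx: Mutex<Option<Sender<String>>>,
}

impl StatusReporter {
    fn new() -> Self {
        Self { status_tx: Mutex::new(None) }
    }

    /// Stores the channel, as a runtime would do once during initialization.
    fn initialize(&self, tx: Sender<String>) {
        *self.status_tx.lock().unwrap() = Some(tx);
    }

    /// Sends the message if a channel is present.
    /// Without a channel this is a no-op that still returns `Ok(())`.
    fn send_event(&self, message: &str) -> Result<(), String> {
        let guard = self.status_tx.lock().unwrap();
        let result = match guard.as_ref() {
            Some(tx) => tx.send(message.to_string()).map_err(|e| e.to_string()),
            None => Ok(()),
        };
        result
    }
}
```

With this shape, callers never need to check whether the component is attached to a runtime before reporting status; events sent before initialization are dropped rather than queued.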