Skip to main content

SourceBase

Struct SourceBase 

Source
pub struct SourceBase {
    pub id: String,
    pub auto_start: bool,
    pub status: Arc<RwLock<ComponentStatus>>,
    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    pub task_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    pub shutdown_tx: Arc<RwLock<Option<Sender<()>>>>,
    /* private fields */
}
Expand description

Base implementations for source and reaction plugins These are used by plugin developers, not by drasi-lib itself Base implementation for common source functionality

Fields§

§id: String

Source identifier

§auto_start: bool

Whether this source should auto-start

§status: Arc<RwLock<ComponentStatus>>

Current component status

§dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>

Dispatchers for sending source events to subscribers

This is a vector of dispatchers that send source events to all registered subscribers (queries). When a source produces a change event, it broadcasts it to all dispatchers in this vector.

§task_handle: Arc<RwLock<Option<JoinHandle<()>>>>

Handle to the source’s main task

§shutdown_tx: Arc<RwLock<Option<Sender<()>>>>

Sender for shutdown signal

Implementations§

Source§

impl SourceBase

Source

pub fn new(params: SourceBaseParams) -> Result<Self>

Create a new SourceBase with the given parameters

The status channel is not required during construction - it will be provided via the SourceRuntimeContext when initialize() is called.

If a bootstrap provider is specified in params, it will be set during construction (no async needed since nothing is shared yet).

Source

pub fn get_auto_start(&self) -> bool

Get whether this source should auto-start

Source

pub async fn initialize(&self, context: SourceRuntimeContext)

Initialize the source with runtime context.

This method is called automatically by DrasiLib’s add_source() method. Plugin developers do not need to call this directly.

The context provides access to:

  • source_id: The source’s unique identifier
  • status_tx: Channel for reporting component status events
  • state_store: Optional persistent state storage
Source

pub async fn context(&self) -> Option<SourceRuntimeContext>

Get the runtime context if initialized.

Returns None if initialize() has not been called yet.

Source

pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>>

Get the state store if configured.

Returns None if no state store was provided in the context.

Source

pub fn status_tx(&self) -> Arc<RwLock<Option<ComponentEventSender>>>

Get the status channel Arc for internal use by spawned tasks

This returns the internal status_tx wrapped in Arc<RwLock<Option<…>>> which allows background tasks to send component status events.

Returns a clone of the Arc that can be moved into spawned tasks.

Source

pub fn clone_shared(&self) -> Self

Clone the SourceBase with shared Arc references

This creates a new SourceBase that shares the same underlying data through Arc references. Useful for passing to spawned tasks.

Source

pub async fn set_bootstrap_provider( &self, provider: impl BootstrapProvider + 'static, )

Set the bootstrap provider for this source, taking ownership.

Call this after creating the SourceBase if the source plugin supports bootstrapping. The bootstrap provider is created by the plugin using its own configuration.

§Example
let provider = MyBootstrapProvider::new(config);
source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
Source

pub fn get_id(&self) -> &str

Get the source ID

Source

pub async fn create_streaming_receiver( &self, ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>>

Create a streaming receiver for a query subscription

This creates the appropriate receiver based on the configured dispatch mode:

  • Broadcast mode: Returns a receiver from the shared broadcast dispatcher
  • Channel mode: Creates a new dedicated dispatcher and returns its receiver

This is a helper method that can be used by sources with custom subscribe logic.

Source

pub async fn subscribe_with_bootstrap( &self, settings: &SourceSubscriptionSettings, source_type: &str, ) -> Result<SubscriptionResponse>

Subscribe to this source with optional bootstrap

This is the standard subscribe implementation that all sources can use. It handles:

  • Creating a receiver for streaming events (based on dispatch mode)
  • Setting up bootstrap if requested and a provider has been set
  • Returning the appropriate SubscriptionResponse
Source

pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()>

Dispatch a SourceChange event with profiling metadata

This method handles the common pattern of:

  • Creating profiling metadata with timestamp
  • Wrapping the change in a SourceEventWrapper
  • Dispatching to all subscribers
  • Handling the no-subscriber case gracefully
Source

pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()>

Dispatch a SourceEventWrapper to all subscribers

This is a generic method for dispatching any SourceEvent. It handles Arc-wrapping for zero-copy sharing and logs when there are no subscribers.

Source

pub async fn broadcast_control(&self, control: SourceControl) -> Result<()>

Broadcast SourceControl events

Source

pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>>

Create a test subscription to this source (synchronous wrapper)

This method is intended for use in tests to receive events from the source. It properly handles both Broadcast and Channel dispatch modes by delegating to create_streaming_receiver(), making the dispatch mode transparent to tests.

Note: This is a synchronous wrapper that uses tokio::task::block_in_place internally. For async contexts, prefer calling create_streaming_receiver() directly.

§Returns

A receiver that will receive all events dispatched by this source

§Panics

Panics if the receiver cannot be created (e.g., internal error)

Source

pub async fn dispatch_from_task( dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>, wrapper: SourceEventWrapper, source_id: &str, ) -> Result<()>

Helper function to dispatch events from spawned tasks

This is a static helper that can be used from spawned async tasks that don’t have access to self. It manually iterates through dispatchers and sends the event.

For code that has access to &self, prefer using dispatch_event() instead.

§Arguments
  • dispatchers - Arc to the dispatchers list (from self.base.dispatchers.clone())
  • wrapper - The event wrapper to dispatch
  • source_id - Source ID for logging
Source

pub async fn stop_common(&self) -> Result<()>

Handle common stop functionality

Source

pub async fn get_status(&self) -> ComponentStatus

Get the current status

Source

pub async fn set_status(&self, status: ComponentStatus)

Set the current status

Source

pub async fn set_status_with_event( &self, status: ComponentStatus, message: Option<String>, ) -> Result<()>

Transition to a new status and send event

Source

pub async fn set_task_handle(&self, handle: JoinHandle<()>)

Set the task handle

Source

pub async fn set_shutdown_tx(&self, tx: Sender<()>)

Set the shutdown sender

Source

pub async fn send_component_event( &self, status: ComponentStatus, message: Option<String>, ) -> Result<()>

Send a component event

If the status channel has not been initialized yet, this method silently succeeds without sending anything. This allows sources to be used in a standalone fashion without DrasiLib if needed.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more