pub struct SourceBase {
pub id: String,
pub auto_start: bool,
pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
pub task_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
pub shutdown_tx: Arc<RwLock<Option<Sender<()>>>>,
/* private fields */
}
Base implementation of functionality common to source plugins.
Fields§
id: String
Source identifier
auto_start: bool
Whether this source should auto-start
dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>
Dispatchers for sending source events to subscribers
This is a vector of dispatchers that send source events to all registered subscribers (queries). When a source produces a change event, it broadcasts it to all dispatchers in this vector.
task_handle: Arc<RwLock<Option<JoinHandle<()>>>>
Handle to the source’s main task
shutdown_tx: Arc<RwLock<Option<Sender<()>>>>
Sender for shutdown signal
Implementations§
impl SourceBase
pub fn new(params: SourceBaseParams) -> Result<Self>
Create a new SourceBase with the given parameters
The status channel is not required during construction - it will be
provided via the SourceRuntimeContext when initialize() is called.
If a bootstrap provider is specified in params, it will be set during construction (no async needed since nothing is shared yet).
pub fn get_auto_start(&self) -> bool
Get whether this source should auto-start
pub async fn initialize(&self, context: SourceRuntimeContext)
Initialize the source with runtime context.
This method is called automatically by DrasiLib’s add_source() method.
Plugin developers do not need to call this directly.
The context provides access to:
- source_id: The source’s unique identifier
- update_tx: mpsc sender for fire-and-forget status updates to the component graph
- state_store: Optional persistent state storage
pub async fn context(&self) -> Option<SourceRuntimeContext>
Get the runtime context if initialized.
Returns None if initialize() has not been called yet.
pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>>
Get the state store if configured.
Returns None if no state store was provided in the context.
pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>>
Get the identity provider if set.
Returns the identity provider set either programmatically via
set_identity_provider() or from the runtime context during initialize().
Programmatically-set providers take precedence over context providers.
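The precedence rule can be illustrated with a small sketch. The `IdentityProvider` trait, the `Named` type, and the `resolve` function below are hypothetical stand-ins, not the crate's real definitions; the point is only that an explicitly set provider shadows the one supplied by the context.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the real `IdentityProvider` trait.
trait IdentityProvider {
    fn name(&self) -> String;
}

/// Hypothetical provider used only for this illustration.
struct Named(&'static str);

impl IdentityProvider for Named {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

/// The programmatically set provider wins; the context provider is the fallback.
fn resolve(
    programmatic: Option<Arc<dyn IdentityProvider>>,
    from_context: Option<Arc<dyn IdentityProvider>>,
) -> Option<Arc<dyn IdentityProvider>> {
    programmatic.or(from_context)
}
```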
pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>)
Set the identity provider programmatically.
This is typically called during source construction when the provider
is available from configuration (e.g., with_identity_provider() builder).
Providers set this way take precedence over context-injected providers.
pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64>
Create and register a position handle for query_id, initialized to u64::MAX.
Returns the shared handle; the same Arc is placed in
SubscriptionResponse::position_handle so the query and the source share
one atomic. If a handle already exists for query_id (re-subscribe after
transient disconnect), the existing handle is returned to preserve any
position the query had previously reported.
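The get-or-create behaviour described here can be sketched with standard-library types only. `PositionHandles` and its `create` method are hypothetical names for illustration; the real registry inside SourceBase is private and async, and may be structured differently.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the handle registry kept by SourceBase.
#[derive(Default)]
struct PositionHandles {
    handles: Mutex<HashMap<String, Arc<AtomicU64>>>,
}

impl PositionHandles {
    /// Return the existing handle for `query_id`, or register a new one
    /// initialized to `u64::MAX` (meaning "no position confirmed yet").
    fn create(&self, query_id: &str) -> Arc<AtomicU64> {
        let mut map = self.handles.lock().unwrap();
        map.entry(query_id.to_string())
            .or_insert_with(|| Arc::new(AtomicU64::new(u64::MAX)))
            .clone()
    }
}
```

Because a re-subscribe receives a clone of the same `Arc`, a position stored before the disconnect is still visible afterwards.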
pub async fn remove_position_handle(&self, query_id: &str)
Remove the position handle for query_id. No-op if absent.
Called from explicit cleanup paths (stop_query/delete_query will be
wired in a follow-up issue). Until then, cleanup_stale_handles()
(invoked inside compute_confirmed_position) catches dropped subscribers.
pub async fn compute_confirmed_position(&self) -> Option<u64>
Compute the minimum confirmed position across all live subscribers.
Returns None if no handles are registered, or if every registered
handle is still u64::MAX (no subscriber has confirmed a position yet —
typically because they are still bootstrapping). Otherwise returns the
minimum non-u64::MAX value, suitable for advancing the source’s
upstream cursor (Postgres flush_lsn, Kafka commit, transient WAL prune
threshold).
Piggy-backs cleanup_stale_handles() so dropped subscribers do not pin
the watermark indefinitely.
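The watermark rule (minimum over all handles, ignoring the `u64::MAX` sentinel) fits in a few lines. This is a minimal sketch: the free function `confirmed_position` and the memory orderings are assumptions, and the stale-handle cleanup step is left out.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Minimum confirmed position across `handles`, skipping the `u64::MAX`
/// sentinel. Returns `None` when no handle has confirmed a position.
fn confirmed_position(handles: &[Arc<AtomicU64>]) -> Option<u64> {
    handles
        .iter()
        .map(|handle| handle.load(Ordering::Acquire))
        .filter(|&position| position != u64::MAX)
        .min()
}
```

With two subscribers at positions 120 and 75, the result is `Some(75)`: the upstream cursor may advance only as far as the slowest subscriber that has confirmed anything.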
pub async fn cleanup_stale_handles(&self)
Drop entries whose Arc::strong_count == 1 (only SourceBase holds a
reference).
This indicates the subscriber dropped its SubscriptionResponse without
calling remove_position_handle — common during query teardown until
explicit cleanup is wired by the query manager.
Safety constraint: this relies on SourceBase being the only long-lived
holder of the Arc besides the subscribing query. If a future periodic
scan task (or any other component) clones the handle, this method must
be revisited or replaced with explicit liveness tracking.
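A minimal sketch of the strong-count check, using a plain `HashMap` as a stand-in for the private registry (the function name `cleanup_stale` and the map type are assumptions):

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

/// Drop every entry whose handle is referenced only by the registry itself.
fn cleanup_stale(handles: &mut HashMap<String, Arc<AtomicU64>>) {
    // A strong count of 1 means the subscriber's clone has been dropped.
    handles.retain(|_query_id, handle| Arc::strong_count(handle) > 1);
}
```

The sketch also shows why the constraint above matters: any extra clone held elsewhere keeps the count above 1, so the entry would never be collected.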
pub fn status_handle(&self) -> ComponentStatusHandle
Returns a clonable ComponentStatusHandle for use in spawned tasks.
The handle can both read and write the component’s status and automatically
notifies the graph on every status change (after initialize()).
Clone the SourceBase with shared Arc references
This creates a new SourceBase that shares the same underlying data through Arc references. Useful for passing to spawned tasks.
pub async fn set_bootstrap_provider(
    &self,
    provider: impl BootstrapProvider + 'static,
)
Set the bootstrap provider for this source, taking ownership.
Call this after creating the SourceBase if the source plugin supports bootstrapping. The bootstrap provider is created by the plugin using its own configuration.
§Example
let provider = MyBootstrapProvider::new(config);
source_base.set_bootstrap_provider(provider).await; // Ownership transferred
pub async fn create_streaming_receiver(
    &self,
) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>>
Create a streaming receiver for a query subscription
This creates the appropriate receiver based on the configured dispatch mode:
- Broadcast mode: Returns a receiver from the shared broadcast dispatcher
- Channel mode: Creates a new dedicated dispatcher and returns its receiver
This is a helper method that can be used by sources with custom subscribe logic.
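The difference between the two modes can be modelled with standard-library channels. Everything below (`DispatchMode`, `Dispatcher`, `Source`, string events) is a simplified, hypothetical model rather than the crate's actual types: in Broadcast mode every subscriber attaches to one shared dispatcher, while in Channel mode each subscription adds its own dispatcher to the list.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical dispatch modes mirroring the two described above.
#[derive(Clone, Copy)]
enum DispatchMode {
    Broadcast,
    Channel,
}

/// A dispatcher that fans each event out to every receiver it created.
#[derive(Default)]
struct Dispatcher {
    outputs: Vec<Sender<String>>,
}

impl Dispatcher {
    fn create_receiver(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.outputs.push(tx);
        rx
    }

    fn dispatch(&self, event: &str) {
        for tx in &self.outputs {
            // A closed receiver is simply ignored in this sketch.
            let _ = tx.send(event.to_string());
        }
    }
}

struct Source {
    mode: DispatchMode,
    dispatchers: Vec<Dispatcher>,
}

impl Source {
    fn new(mode: DispatchMode) -> Self {
        // Broadcast starts with one shared dispatcher; Channel starts empty.
        let dispatchers = match mode {
            DispatchMode::Broadcast => vec![Dispatcher::default()],
            DispatchMode::Channel => Vec::new(),
        };
        Source { mode, dispatchers }
    }

    fn create_streaming_receiver(&mut self) -> Receiver<String> {
        match self.mode {
            DispatchMode::Broadcast => self.dispatchers[0].create_receiver(),
            DispatchMode::Channel => {
                let mut dedicated = Dispatcher::default();
                let rx = dedicated.create_receiver();
                self.dispatchers.push(dedicated);
                rx
            }
        }
    }

    fn dispatch(&self, event: &str) {
        for dispatcher in &self.dispatchers {
            dispatcher.dispatch(event);
        }
    }
}
```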
pub async fn subscribe_with_bootstrap(
    &self,
    settings: &SourceSubscriptionSettings,
    source_type: &str,
) -> Result<SubscriptionResponse>
Subscribe to this source with optional bootstrap
This is the standard subscribe implementation that all sources can use. It handles:
- Creating a receiver for streaming events (based on dispatch mode)
- Setting up bootstrap if requested and a provider has been set
- Returning the appropriate SubscriptionResponse
pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()>
Dispatch a SourceChange event with profiling metadata
This method handles the common pattern of:
- Creating profiling metadata with timestamp
- Wrapping the change in a SourceEventWrapper
- Dispatching to all subscribers
- Handling the no-subscriber case gracefully
pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()>
Dispatch a SourceEventWrapper to all subscribers
This is a generic method for dispatching any SourceEvent. It handles Arc-wrapping for zero-copy sharing and logs when there are no subscribers.
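The dispatch path described for dispatch_source_change() and dispatch_event() (stamp, wrap, share via `Arc`, tolerate zero subscribers) might look roughly like the following. `EventWrapper`, `Dispatcher`, and the field names are hypothetical simplifications, and standard-library channels stand in for the real dispatchers.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical, simplified wrapper: a payload plus a profiling timestamp.
#[derive(Debug)]
struct EventWrapper {
    source_id: String,
    change: String,
    source_send_ns: u128,
}

struct Dispatcher {
    source_id: String,
    subscribers: Vec<Sender<Arc<EventWrapper>>>,
}

impl Dispatcher {
    fn subscribe(&mut self) -> Receiver<Arc<EventWrapper>> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Stamp, wrap, and fan out one change. Returns the number of
    /// subscribers reached; zero subscribers is not treated as an error.
    fn dispatch_change(&mut self, change: &str) -> usize {
        let source_send_ns = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos())
            .unwrap_or(0);
        // One allocation; every subscriber receives a clone of the Arc.
        let event = Arc::new(EventWrapper {
            source_id: self.source_id.clone(),
            change: change.to_string(),
            source_send_ns,
        });
        // Senders whose receiver has been dropped are pruned here.
        self.subscribers
            .retain(|tx| tx.send(Arc::clone(&event)).is_ok());
        self.subscribers.len()
    }
}
```

All receivers observe the same allocation (`Arc::ptr_eq` holds between the values they receive), which is the sense in which the sharing is zero-copy.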
pub async fn broadcast_control(&self, control: SourceControl) -> Result<()>
Broadcast SourceControl events
pub fn try_test_subscribe(
    &self,
) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>>
Create a test subscription to this source (synchronous, fallible)
This method is intended for use in tests to receive events from the source.
It properly handles both Broadcast and Channel dispatch modes by delegating
to create_streaming_receiver(), making the dispatch mode transparent to tests.
Note: This is a synchronous wrapper that uses tokio::task::block_in_place internally.
For async contexts, prefer calling create_streaming_receiver() directly.
§Returns
A receiver that will receive all events dispatched by this source, or an error if the receiver cannot be created.
pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>>
Create a test subscription to this source (synchronous wrapper)
Convenience wrapper around try_test_subscribe
that panics on failure. Prefer try_test_subscribe() in new code.
§Panics
Panics if the receiver cannot be created.
pub async fn dispatch_from_task(
    dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    wrapper: SourceEventWrapper,
    source_id: &str,
) -> Result<()>
Helper function to dispatch events from spawned tasks
This is a static helper that can be used from spawned async tasks that don’t
have access to self. It manually iterates through dispatchers and sends the event.
For code that has access to &self, prefer using dispatch_event() instead.
§Arguments
- dispatchers - Arc to the dispatchers list (from self.base.dispatchers.clone())
- wrapper - The event wrapper to dispatch
- source_id - Source ID for logging
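The shape of this pattern (clone the shared dispatcher list, move the clone into a worker, dispatch without `&self`) can be sketched with threads and standard-library channels in place of async tasks. The `Dispatchers` alias, `dispatch_from_worker`, `run_worker`, and the literal event and source names are hypothetical.

```rust
use std::sync::mpsc::Sender;
use std::sync::{Arc, RwLock};
use std::thread;

/// Simplified stand-in for the shared dispatcher list.
type Dispatchers = Arc<RwLock<Vec<Sender<String>>>>;

/// Free function: it needs only the shared list, not `&self`.
/// Returns how many dispatchers accepted the event.
fn dispatch_from_worker(dispatchers: &Dispatchers, event: &str, source_id: &str) -> usize {
    let guard = dispatchers.read().unwrap();
    let mut delivered = 0;
    for tx in guard.iter() {
        if tx.send(format!("{source_id}: {event}")).is_ok() {
            delivered += 1;
        }
    }
    delivered
}

/// Spawn a worker that owns its clone of the Arc and dispatches through it.
fn run_worker(dispatchers: Dispatchers) -> usize {
    thread::spawn(move || dispatch_from_worker(&dispatchers, "row inserted", "pg-source"))
        .join()
        .expect("worker panicked")
}
```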
pub async fn stop_common(&self) -> Result<()>
Handle common stop functionality
pub async fn deprovision_common(&self) -> Result<()>
Clear the source’s state store partition.
This is called during deprovision to remove all persisted state
associated with this source. Sources that override deprovision()
can call this to clean up their state store.
pub async fn get_status(&self) -> ComponentStatus
Get the current status.
pub async fn set_status(&self, status: ComponentStatus, message: Option<String>)
Set the component’s status — updates local state AND notifies the graph.
This is the single canonical way to change a source’s status.
pub async fn set_task_handle(&self, handle: JoinHandle<()>)
Set the task handle
pub async fn set_shutdown_tx(&self, tx: Sender<()>)
Set the shutdown sender
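A stored task handle and shutdown sender usually work as a pair: the sender asks the task to exit and the handle is then awaited. The page does not document what stop_common() does internally, so the following is only an assumed illustration of that general pattern, using a thread and a standard-library channel instead of an async task. `Worker`, `start`, and `stop` are hypothetical names.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical holder for a running task and its shutdown channel.
struct Worker {
    task_handle: Option<JoinHandle<u32>>,
    shutdown_tx: Option<Sender<()>>,
}

/// Start a worker that ticks until it is told to shut down.
fn start() -> Worker {
    let (shutdown_tx, shutdown_rx) = channel::<()>();
    let task_handle = thread::spawn(move || {
        let mut ticks = 0u32;
        loop {
            match shutdown_rx.recv_timeout(Duration::from_millis(5)) {
                // No signal yet: do one unit of work.
                Err(RecvTimeoutError::Timeout) => ticks += 1,
                // Signal received, or the sender was dropped: exit.
                Ok(()) | Err(RecvTimeoutError::Disconnected) => return ticks,
            }
        }
    });
    Worker {
        task_handle: Some(task_handle),
        shutdown_tx: Some(shutdown_tx),
    }
}

impl Worker {
    /// Signal shutdown, then wait for the task. Returns `true` if a task
    /// was running and exited cleanly; a second call returns `false`.
    fn stop(&mut self) -> bool {
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }
        match self.task_handle.take() {
            Some(handle) => handle.join().is_ok(),
            None => false,
        }
    }
}
```

Taking both fields out of their `Option`s makes a repeated stop harmless, which is one reason such handles are commonly stored as `Option`.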