pub struct SourceBase {
    pub id: String,
    pub auto_start: bool,
    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    pub task_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    pub shutdown_tx: Arc<RwLock<Option<Sender<()>>>>,
    /* private fields */
}
Base implementations for source plugins.
Base implementation for common source functionality.
Fields
id: String
Source identifier
auto_start: bool
Whether this source should auto-start
dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>
Dispatchers for sending source events to subscribers
This is a vector of dispatchers that send source events to all registered subscribers (queries). When a source produces a change event, it broadcasts it to all dispatchers in this vector.
task_handle: Arc<RwLock<Option<JoinHandle<()>>>>
Handle to the source’s main task
shutdown_tx: Arc<RwLock<Option<Sender<()>>>>
Sender for shutdown signal
Implementations
impl SourceBase
pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536
Maximum allowed size for source position bytes (64 KiB, i.e. 65,536 bytes). Positions exceeding this limit are skipped at checkpoint time to prevent memory issues, preserving the last good position.
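The skip-and-preserve behaviour described above can be sketched roughly as follows. This is a standalone illustration using only the standard library; the `Checkpoint` type and its `record` method are hypothetical names, not part of the SourceBase API.

```rust
/// Mirrors the documented 64 KiB limit.
const MAX_SOURCE_POSITION_BYTES: usize = 65_536;

/// Hypothetical checkpoint state: keeps only the last position that
/// passed the size check.
#[derive(Debug, Default)]
struct Checkpoint {
    last_good_position: Option<Vec<u8>>,
}

impl Checkpoint {
    /// Records `position` unless it exceeds the limit.
    /// Returns `true` if the position was recorded.
    fn record(&mut self, position: Vec<u8>) -> bool {
        if position.len() > MAX_SOURCE_POSITION_BYTES {
            // Oversized: skip it and keep the previous good position.
            return false;
        }
        self.last_good_position = Some(position);
        true
    }
}
```

In this sketch a position of exactly 65,536 bytes is still accepted; only larger positions are skipped.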
pub fn new(params: SourceBaseParams) -> Result<Self>
Create a new SourceBase with the given parameters
The status channel is not required during construction - it will be
provided via the SourceRuntimeContext when initialize() is called.
If a bootstrap provider is specified in params, it will be set during construction (no async needed since nothing is shared yet).
pub fn get_auto_start(&self) -> bool
Get whether this source should auto-start
pub fn get_dispatch_mode(&self) -> DispatchMode
Get this source’s dispatch mode.
pub fn set_raw_config(&mut self, config: Value)
Set the original raw config JSON for lossless persistence roundtrips.
Called by plugin descriptors to preserve the original config JSON
(including ConfigValue envelopes for secrets and env vars) before
resolution to plain values.
pub fn raw_config(&self) -> Option<&Value>
Get the original raw config JSON, if set by a descriptor.
Returns None for builder-created components that don’t have
a raw config JSON (they use DTO reconstruction in properties()).
pub fn properties_or_serialize<D: Serialize>(
    &self,
    fallback_dto: &D,
) -> HashMap<String, Value>
Build the properties map for this source.
If raw_config was set (descriptor path), returns its top-level keys.
Otherwise, serializes fallback_dto (the DTO reconstructed from typed
config) to produce camelCase output.
This eliminates the duplicated if-let + serialize pattern from plugins.
pub async fn initialize(&self, context: SourceRuntimeContext)
Initialize the source with runtime context.
This method is called automatically by DrasiLib’s add_source() method.
Plugin developers do not need to call this directly.
The context provides access to:
- source_id: The source’s unique identifier
- update_tx: mpsc sender for fire-and-forget status updates to the component graph
- state_store: Optional persistent state storage
pub async fn context(&self) -> Option<SourceRuntimeContext>
Get the runtime context if initialized.
Returns None if initialize() has not been called yet.
pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>>
Get the state store if configured.
Returns None if no state store was provided in the context.
pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>>
Get the identity provider if set.
Returns the identity provider set either programmatically via
set_identity_provider() or from the runtime context during initialize().
Programmatically-set providers take precedence over context providers.
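The precedence rule can be pictured as a simple `Option::or` fallback. The sketch below is only an illustration: the `IdentityProvider` trait here is a hypothetical stand-in with a made-up `name` method, and `resolve` is not a function on SourceBase.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the real IdentityProvider trait.
trait IdentityProvider {
    fn name(&self) -> String;
}

/// Trivial provider used only for this illustration.
struct Named(&'static str);

impl IdentityProvider for Named {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

/// Picks the programmatically-set provider if present, otherwise the
/// one injected through the runtime context.
fn resolve(
    programmatic: Option<Arc<dyn IdentityProvider>>,
    from_context: Option<Arc<dyn IdentityProvider>>,
) -> Option<Arc<dyn IdentityProvider>> {
    programmatic.or(from_context)
}
```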
pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>)
Set the identity provider programmatically.
This is typically called during source construction when the provider
is available from configuration (e.g., with_identity_provider() builder).
Providers set this way take precedence over context-injected providers.
pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64>
Create and register a position handle for query_id, initialized to u64::MAX.
Returns the shared handle; the same Arc is placed in
SubscriptionResponse::position_handle so the query and the source share
one atomic. If a handle already exists for query_id (re-subscribe after
transient disconnect), the existing handle is returned to preserve any
position the query had previously reported.
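A minimal sketch of the create-or-reuse behaviour, assuming the handles live in a map keyed by query ID. `PositionHandles` is a hypothetical synchronous stand-in; the real method is async and its internal storage is not documented here.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

/// Hypothetical registry of per-query position handles.
#[derive(Default)]
struct PositionHandles {
    handles: HashMap<String, Arc<AtomicU64>>,
}

impl PositionHandles {
    /// Returns the existing handle for `query_id`, or registers a new
    /// one initialized to u64::MAX ("nothing confirmed yet").
    fn create(&mut self, query_id: &str) -> Arc<AtomicU64> {
        Arc::clone(
            self.handles
                .entry(query_id.to_string())
                .or_insert_with(|| Arc::new(AtomicU64::new(u64::MAX))),
        )
    }

    /// Removes the handle for `query_id`; a no-op if it is absent.
    fn remove(&mut self, query_id: &str) {
        self.handles.remove(query_id);
    }
}
```

Because a re-subscribe gets the same `Arc` back, a position stored before a transient disconnect is still visible afterwards.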
pub async fn remove_position_handle(&self, query_id: &str)
Remove the position handle for query_id. No-op if absent.
Called from explicit cleanup paths (stop_query/delete_query will be
wired in a follow-up issue). Until then, cleanup_stale_handles()
(invoked inside compute_confirmed_position) catches dropped subscribers.
pub async fn compute_confirmed_position(&self) -> Option<u64>
Compute the minimum confirmed position across all live subscribers.
Returns None if no handles are registered, or if every registered
handle is still u64::MAX (no subscriber has confirmed a position yet —
typically because they are still bootstrapping). Otherwise returns the
minimum non-u64::MAX value, suitable for advancing the source’s
upstream cursor (Postgres flush_lsn, Kafka commit, transient WAL prune
threshold).
Piggy-backs cleanup_stale_handles() so dropped subscribers do not pin
the watermark indefinitely.
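The minimum-over-confirmed-handles rule can be sketched as below. `confirmed_position` is a hypothetical free function operating on a slice of handles; it omits the stale-handle cleanup that the real method piggy-backs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Sentinel meaning "no position confirmed yet".
const UNCONFIRMED: u64 = u64::MAX;

/// Returns the smallest confirmed position, ignoring handles that are
/// still at the sentinel. Yields None when there are no handles or
/// when every handle is unconfirmed.
fn confirmed_position(handles: &[Arc<AtomicU64>]) -> Option<u64> {
    handles
        .iter()
        .map(|h| h.load(Ordering::Acquire))
        .filter(|&p| p != UNCONFIRMED)
        .min()
}
```

Taking the minimum means the slowest confirmed subscriber determines how far the upstream cursor may advance.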
pub async fn cleanup_stale_handles(&self)
Drop entries whose Arc::strong_count == 1 (only SourceBase holds a
reference).
This indicates the subscriber dropped its SubscriptionResponse without
calling remove_position_handle — common during query teardown until
explicit cleanup is wired by the query manager.
Safety constraint: this relies on SourceBase being the only long-lived
holder of the Arc besides the subscribing query. If a future periodic
scan task (or any other component) clones the handle, this method must
be revisited or replaced with explicit liveness tracking.
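The `Arc::strong_count` check can be illustrated with a plain map of handles. `cleanup_stale` is a hypothetical free function, and the map layout is an assumption made for the example.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

/// Drops every entry whose handle is referenced only by the map
/// itself, i.e. the subscriber side has dropped its clone.
fn cleanup_stale(handles: &mut HashMap<String, Arc<AtomicU64>>) {
    handles.retain(|_, handle| Arc::strong_count(handle) > 1);
}
```

As the safety constraint above notes, this only works while the map and the subscriber are the sole holders: any extra clone held elsewhere would keep the count above 1 and the entry would never be collected.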
pub async fn compute_confirmed_source_position(&self) -> Option<Bytes>
Translate the confirmed framework sequence into the corresponding source-native position (e.g. Postgres WAL LSN, Kafka offset).
Returns None when no confirmed position exists (no subscribers, all
at u64::MAX, or the sequence map has been pruned past the confirmed
point).
This does not prune the internal map — call
prune_position_map() after the source
has successfully acknowledged the position to its upstream.
pub async fn prune_position_map(&self, up_to_seq: u64)
Prune sequence→position entries that are no longer needed.
Removes all entries with sequence ≤ up_to_seq. Call this after the
source has successfully sent feedback/committed the confirmed position
to its upstream, so re-send on failure is still possible.
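The look-up-then-prune flow can be sketched with an ordered map from sequence to position bytes. `PositionMap` and its methods are hypothetical names. The lookup rule used here (greatest recorded sequence at or below the confirmed one) is an assumption, since the exact rule is not stated above.

```rust
use std::collections::BTreeMap;

/// Hypothetical sequence -> source-native position map.
#[derive(Default)]
struct PositionMap {
    entries: BTreeMap<u64, Vec<u8>>,
}

impl PositionMap {
    /// Remembers the source position that produced `seq`.
    fn record(&mut self, seq: u64, position: Vec<u8>) {
        self.entries.insert(seq, position);
    }

    /// Assumed lookup rule: the entry with the greatest sequence that
    /// is <= `confirmed`. Does not modify the map.
    fn lookup(&self, confirmed: u64) -> Option<&[u8]> {
        self.entries
            .range(..=confirmed)
            .next_back()
            .map(|(_, position)| position.as_slice())
    }

    /// Removes all entries with sequence <= `up_to_seq`.
    fn prune(&mut self, up_to_seq: u64) {
        self.entries.retain(|&seq, _| seq > up_to_seq);
    }
}
```

Keeping lookup and prune separate lets a source look up the position, send it upstream, and prune only once the upstream acknowledges, so a failed send can be retried with the same entry.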
pub fn set_next_sequence(&self, sequence: u64)
Reset the sequence counter, typically after recovering from a checkpoint.
The next dispatched event will receive sequence + 1.
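The "next event receives sequence + 1" behaviour matches a counter that stores the last issued value. `SequenceCounter` is a hypothetical type, and starting the counter at 0 is an assumption made for the example.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter holding the last sequence number issued.
struct SequenceCounter {
    last: AtomicU64,
}

impl SequenceCounter {
    /// Assumption for this sketch: a fresh counter starts at 0.
    fn new() -> Self {
        Self { last: AtomicU64::new(0) }
    }

    /// Resets the counter, e.g. after recovering from a checkpoint.
    fn set_next_sequence(&self, sequence: u64) {
        self.last.store(sequence, Ordering::SeqCst);
    }

    /// Issues the next sequence number: previous value + 1.
    fn stamp(&self) -> u64 {
        self.last.fetch_add(1, Ordering::SeqCst) + 1
    }
}
```

After `set_next_sequence(100)`, the next `stamp()` call in this sketch returns 101.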
pub fn apply_subscription_settings(&self, settings: &SourceSubscriptionSettings)
Apply subscription settings that affect the source base.
Should be called at the start of Source::subscribe() implementations.
Handles:
- Recovering the sequence counter from last_sequence to maintain monotonicity across restarts.
pub fn status_handle(&self) -> ComponentStatusHandle
Returns a cloneable ComponentStatusHandle for use in spawned tasks.
The handle can both read and write the component’s status and automatically
notifies the graph on every status change (after initialize()).
clone_shared(): Clone the SourceBase with shared Arc references
This creates a new SourceBase that shares the same underlying data through Arc references. Useful for passing to spawned tasks.
pub async fn set_bootstrap_provider(
    &self,
    provider: impl BootstrapProvider + 'static,
)
Set the bootstrap provider for this source, taking ownership.
Call this after creating the SourceBase if the source plugin supports bootstrapping. The bootstrap provider is created by the plugin using its own configuration.
Example
let provider = MyBootstrapProvider::new(config);
source_base.set_bootstrap_provider(provider).await; // Ownership transferred
pub async fn set_position_comparator(
    &self,
    comparator: impl PositionComparator + 'static,
)
Set the position comparator for per-subscriber replay filtering.
Sources that support replay should call this during construction to
enable per-subscriber position gating. Without a comparator, all events
are delivered to all subscribers regardless of their resume_from position.
pub async fn create_streaming_receiver(
    &self,
) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>>
Create a streaming receiver for a query subscription
This creates the appropriate receiver based on the configured dispatch mode:
- Broadcast mode: Returns a receiver from the shared broadcast dispatcher
- Channel mode: Creates a new dedicated dispatcher and returns its receiver
This is a helper method that can be used by sources with custom subscribe logic.
pub async fn wait_for_subscribers(&self)
Wait until at least one subscriber has registered.
Sources that start a background polling loop (e.g. CDC) should call
this before entering their poll loop on a restart cycle. Without this,
events dispatched before subscribe() creates a new dispatcher would
be silently dropped, advancing the checkpoint past changes that no
subscriber ever received.
Returns immediately if at least one dispatcher already exists (fresh start with bootstrap, or broadcast mode which always has one).
pub async fn subscribe_with_bootstrap(
    &self,
    settings: &SourceSubscriptionSettings,
    source_type: &str,
) -> Result<SubscriptionResponse>
Subscribe to this source with optional bootstrap
This is the standard subscribe implementation that all sources can use. It handles:
- Creating a receiver for streaming events (based on dispatch mode)
- Setting up bootstrap if requested and a provider has been set
- Returning the appropriate SubscriptionResponse
pub async fn subscribe_with_bootstrap_context(
    &self,
    settings: &SourceSubscriptionSettings,
    source_type: &str,
    bootstrap_properties: HashMap<String, Value>,
) -> Result<SubscriptionResponse>
Subscribe to this source with optional bootstrap context properties.
pub async fn create_bootstrap_receiver(
    &self,
    settings: &SourceSubscriptionSettings,
    source_type: &str,
    bootstrap_properties: HashMap<String, Value>,
) -> Result<Option<BootstrapEventReceiver>>
Create only the bootstrap receiver for a subscription.
pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()>
Dispatch a SourceChange event with profiling metadata
This method handles the common pattern of:
- Creating profiling metadata with timestamp
- Wrapping the change in a SourceEventWrapper
- Dispatching to all subscribers
- Handling the no-subscriber case gracefully
pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()>
Dispatch a SourceEventWrapper to all subscribers.
This is a generic method for dispatching any SourceEvent. It handles Arc-wrapping for zero-copy sharing and logs when there are no subscribers. The framework stamps every event with a monotonic sequence number.
pub async fn dispatch_events_batch(
    &self,
    events: Vec<SourceEventWrapper>,
) -> Result<()>
Dispatch a batch of events, acquiring the dispatchers lock once for
the entire batch. This is more efficient than calling
dispatch_event() per-event when the source
processes multiple rows per poll cycle.
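The lock-once idea can be illustrated with standard-library channels behind an `RwLock`. Everything below is a synchronous stand-in: `Dispatchers`, `Event` and the use of `std::sync::mpsc` are assumptions for the sketch, not the real dispatcher types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

/// Events are Arc-wrapped so every subscriber shares one allocation.
type Event = Arc<String>;

/// Hypothetical dispatcher list guarded by a read-write lock.
struct Dispatchers {
    senders: RwLock<Vec<Sender<Event>>>,
}

impl Dispatchers {
    fn new() -> Self {
        Self { senders: RwLock::new(Vec::new()) }
    }

    /// Registers a subscriber and returns its receiving end.
    fn subscribe(&self) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.senders.write().unwrap().push(tx);
        rx
    }

    /// Sends every event to every subscriber while holding the read
    /// lock once for the whole batch. Returns the number of
    /// successful sends.
    fn dispatch_batch(&self, events: Vec<String>) -> usize {
        let senders = self.senders.read().unwrap(); // single acquisition
        let mut delivered = 0;
        for event in events {
            let shared: Event = Arc::new(event);
            for tx in senders.iter() {
                if tx.send(Arc::clone(&shared)).is_ok() {
                    delivered += 1;
                }
            }
        }
        delivered
    }
}
```

A per-event variant would take the read lock inside the outer loop instead, paying one acquisition per event rather than one per batch.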
pub async fn broadcast_control(&self, control: SourceControl) -> Result<()>
Broadcast SourceControl events
pub fn try_test_subscribe(
    &self,
) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>>
Create a test subscription to this source (synchronous, fallible)
This method is intended for use in tests to receive events from the source.
It properly handles both Broadcast and Channel dispatch modes by delegating
to create_streaming_receiver(), making the dispatch mode transparent to tests.
Note: This is a synchronous wrapper that uses tokio::task::block_in_place internally.
For async contexts, prefer calling create_streaming_receiver() directly.
Returns
A receiver that will receive all events dispatched by this source, or an error if the receiver cannot be created.
pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>>
Create a test subscription to this source (synchronous wrapper)
Convenience wrapper around try_test_subscribe() that panics on failure. Prefer try_test_subscribe() in new code.
Panics
Panics if the receiver cannot be created.
pub async fn dispatch_from_task(
    dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    wrapper: SourceEventWrapper,
    source_id: &str,
) -> Result<()>
Helper function to dispatch events from spawned tasks (unstamped).
This is a static helper that can be used from spawned async tasks that don’t
have access to self. It manually iterates through dispatchers and sends the event.
Important: This method does NOT stamp a monotonic sequence number and
does NOT validate source_position size. Events dispatched through this
method will not be checkpoint-tracked. This is acceptable for sources that
do not support replay (supports_replay() == false).
For recoverable/checkpointed sources
Use clone_shared() to obtain a SourceBase that
can be moved into spawned tasks, then call dispatch_event()
which stamps sequences and validates positions:
let base = self.base.clone_shared();
tokio::spawn(async move {
    base.dispatch_event(wrapper).await.ok();
});
Arguments
- dispatchers: Arc to the dispatchers list (from self.base.dispatchers.clone())
- wrapper: The event wrapper to dispatch
- source_id: Source ID for logging
pub async fn stop_common(&self) -> Result<()>
Handle common stop functionality
pub async fn clear_dispatchers(&self)
Clear stale dispatchers from a prior lifecycle.
Sources that manage their own stop/start lifecycle (instead of using
stop_common()) must call this at the end of their stop()
implementation. Without this, a subsequent start() + subscribe()
cycle can race: the polling loop dispatches events to the old (dead)
channel receivers before subscribe() creates fresh dispatchers,
silently dropping events while still advancing the checkpoint.
Only channel-mode dispatchers are cleared — broadcast mode keeps a single persistent dispatcher.
pub async fn deprovision_common(&self) -> Result<()>
Clear the source’s state store partition.
This is called during deprovision to remove all persisted state
associated with this source. Sources that override deprovision()
can call this to clean up their state store.
pub async fn get_status(&self) -> ComponentStatus
Get the current status.
pub async fn set_status(&self, status: ComponentStatus, message: Option<String>)
Set the component’s status — updates local state AND notifies the graph.
This is the single canonical way to change a source’s status.
pub async fn set_task_handle(&self, handle: JoinHandle<()>)
Set the task handle
pub async fn set_shutdown_tx(&self, tx: Sender<()>)
Set the shutdown sender