pub trait Source: Send + Sync {
Show 14 methods
// Required methods
fn id(&self) -> &str;
fn type_name(&self) -> &str;
fn properties(&self) -> HashMap<String, Value>;
fn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn stop<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn subscribe<'life0, 'async_trait>(
&'life0 self,
settings: SourceSubscriptionSettings,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn as_any(&self) -> &dyn Any;
fn initialize<'life0, 'async_trait>(
&'life0 self,
context: SourceRuntimeContext,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
// Provided methods
fn dispatch_mode(&self) -> DispatchMode { ... }
fn auto_start(&self) -> bool { ... }
fn supports_replay(&self) -> bool { ... }
fn deprovision<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn set_bootstrap_provider<'life0, 'async_trait>(
&'life0 self,
_provider: Box<dyn BootstrapProvider + 'static>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}Expand description
Source trait for implementing source plugins Trait defining the interface for all source implementations.
This is the core abstraction that all source plugins must implement. drasi-lib only interacts with sources through this trait - it has no knowledge of specific plugin types or their configurations.
§Example Implementation
use drasi_lib::Source;
use drasi_lib::sources::{SourceBase, SourceBaseParams};
use drasi_lib::context::SourceRuntimeContext;
pub struct MySource {
base: SourceBase,
// Plugin-specific fields
}
impl MySource {
pub fn new(config: MySourceConfig) -> Result<Self> {
let params = SourceBaseParams::new(&config.id)
.with_dispatch_mode(config.dispatch_mode)
.with_dispatch_buffer_capacity(config.buffer_capacity);
Ok(Self {
base: SourceBase::new(params)?,
})
}
}
#[async_trait]
impl Source for MySource {
fn id(&self) -> &str {
&self.base.id
}
fn type_name(&self) -> &str {
"my-source"
}
fn properties(&self) -> HashMap<String, Value> {
// Return plugin-specific properties
}
async fn initialize(&self, context: SourceRuntimeContext) {
self.base.initialize(context).await;
}
// ... implement other methods
}Required Methods§
Sourcefn properties(&self) -> HashMap<String, Value>
fn properties(&self) -> HashMap<String, Value>
Return all configuration properties for this source, including secrets.
§Persistence contract
This method is the serialization hook used by the host to persist
configuration to disk. When the server saves its config file it calls
snapshot_configuration(), which in turn calls properties() on every
source. The returned map is written to the YAML config so the component
can be recreated on the next startup.
Because there is no separate config cache — the live component is the single source of truth — any key/value omitted here will be lost on the next save and the component will fail to start after a restart.
§⚠ Do not filter secrets
Implementations must include sensitive values (passwords, tokens, connection strings, etc.). Removing them makes the persistence round-trip lossy and breaks restart. The host is responsible for protecting the config file on disk; this method is not an external-facing API.
Sourcefn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Start the source
This begins data ingestion and event generation.
Sourcefn stop<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stop<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Stop the source
This stops data ingestion and cleans up resources.
Sourcefn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Get the current status of the source
Sourcefn subscribe<'life0, 'async_trait>(
&'life0 self,
settings: SourceSubscriptionSettings,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe<'life0, 'async_trait>(
&'life0 self,
settings: SourceSubscriptionSettings,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Subscribe to this source for change events.
This is called by queries to receive data changes from this source. The source should return a receiver for streaming events and optionally a bootstrap receiver for initial data.
§Important
Implementations must call
SourceBase::apply_subscription_settings(&settings)
at the start of their implementation (or delegate to
SourceBase::subscribe_with_bootstrap()
which does it automatically). Failing to do so will break sequence
monotonicity after restarts.
§Arguments
settings- Subscription settings including query ID, text, and labels of interest
§Returns
A SubscriptionResponse containing:
- A receiver for streaming source events
- Optionally a bootstrap receiver for initial data
Sourcefn initialize<'life0, 'async_trait>(
&'life0 self,
context: SourceRuntimeContext,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn initialize<'life0, 'async_trait>(
&'life0 self,
context: SourceRuntimeContext,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Initialize the source with runtime context.
This method is called automatically by DrasiLib when the source is added
via add_source(). Plugin developers do not need to call this directly.
The context provides access to:
source_id: The source’s unique identifierevent_tx: Channel for reporting component lifecycle eventsstate_store: Optional persistent state storage
Implementation should delegate to self.base.initialize(context).await.
Provided Methods§
Sourcefn dispatch_mode(&self) -> DispatchMode
fn dispatch_mode(&self) -> DispatchMode
Get the dispatch mode for this source (Channel or Broadcast)
Default is Channel mode for backpressure support.
Sourcefn auto_start(&self) -> bool
fn auto_start(&self) -> bool
Whether this source should auto-start when DrasiLib starts
Default is true. Override to return false if this source
should only be started manually via start_source().
Sourcefn supports_replay(&self) -> bool
fn supports_replay(&self) -> bool
Whether this source supports positional replay via resume_from.
Sources backed by a persistent log (e.g., Postgres WAL, Kafka) should
override this to return true. The orchestration layer uses this to
validate compatibility with persistent queries and to request position handles.
Sourcefn deprovision<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deprovision<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Permanently clean up internal state when the source is being removed.
This is called when remove_source(id, cleanup: true) is used.
Use this to release external resources that should not persist after
the source is deleted (e.g., drop a replication slot, remove cursors).
The default implementation is a no-op. Override only if your source manages external state that needs explicit teardown.
Errors are logged but do not prevent the source from being removed.
Sourcefn set_bootstrap_provider<'life0, 'async_trait>(
&'life0 self,
_provider: Box<dyn BootstrapProvider + 'static>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn set_bootstrap_provider<'life0, 'async_trait>(
&'life0 self,
_provider: Box<dyn BootstrapProvider + 'static>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Set the bootstrap provider for this source
This method allows setting a bootstrap provider after source construction. It is optional - sources without a bootstrap provider will report that bootstrap is not available.
Implementation should delegate to self.base.set_bootstrap_provider(provider).await.
Trait Implementations§
Source§impl Source for Box<dyn Source + 'static>
Blanket implementation of Source for Box<dyn Source>
impl Source for Box<dyn Source + 'static>
Blanket implementation of Source for Box<dyn Source>
This allows boxed trait objects to be used with methods expecting impl Source.
Source§fn properties(&self) -> HashMap<String, Value>
fn properties(&self) -> HashMap<String, Value>
Source§fn dispatch_mode(&self) -> DispatchMode
fn dispatch_mode(&self) -> DispatchMode
Source§fn auto_start(&self) -> bool
fn auto_start(&self) -> bool
Source§fn supports_replay(&self) -> bool
fn supports_replay(&self) -> bool
resume_from. Read moreSource§fn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn stop<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stop<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn subscribe<'life0, 'async_trait>(
&'life0 self,
settings: SourceSubscriptionSettings,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe<'life0, 'async_trait>(
&'life0 self,
settings: SourceSubscriptionSettings,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn deprovision<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deprovision<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn initialize<'life0, 'async_trait>(
&'life0 self,
context: SourceRuntimeContext,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn initialize<'life0, 'async_trait>(
&'life0 self,
context: SourceRuntimeContext,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Implementations on Foreign Types§
Source§impl Source for Box<dyn Source + 'static>
Blanket implementation of Source for Box<dyn Source>
impl Source for Box<dyn Source + 'static>
Blanket implementation of Source for Box<dyn Source>
This allows boxed trait objects to be used with methods expecting impl Source.