Skip to main content

Source

Trait Source 

Source
pub trait Source: Send + Sync {
Show 14 methods // Required methods fn id(&self) -> &str; fn type_name(&self) -> &str; fn properties(&self) -> HashMap<String, Value>; fn start<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn stop<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn subscribe<'life0, 'async_trait>( &'life0 self, settings: SourceSubscriptionSettings, ) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn as_any(&self) -> &dyn Any; fn initialize<'life0, 'async_trait>( &'life0 self, context: SourceRuntimeContext, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; // Provided methods fn dispatch_mode(&self) -> DispatchMode { ... } fn auto_start(&self) -> bool { ... } fn supports_replay(&self) -> bool { ... } fn deprovision<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn set_bootstrap_provider<'life0, 'async_trait>( &'life0 self, _provider: Box<dyn BootstrapProvider + 'static>, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... }
}
Expand description

Source trait for implementing source plugins. This trait defines the interface for all source implementations.

This is the core abstraction that all source plugins must implement. drasi-lib only interacts with sources through this trait - it has no knowledge of specific plugin types or their configurations.

§Example Implementation

use drasi_lib::Source;
use drasi_lib::sources::{SourceBase, SourceBaseParams};
use drasi_lib::context::SourceRuntimeContext;

/// Example source plugin that wraps a shared `SourceBase`.
pub struct MySource {
    /// Shared base component; the example's `id()` and `initialize()` delegate to it.
    base: SourceBase,
    // Plugin-specific fields
}

impl MySource {
    /// Builds a `MySource` from its plugin configuration.
    ///
    /// The base parameters are derived from the config's id, dispatch mode,
    /// and buffer capacity.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SourceBase::new`.
    pub fn new(config: MySourceConfig) -> Result<Self> {
        let base_params = SourceBaseParams::new(&config.id)
            .with_dispatch_mode(config.dispatch_mode)
            .with_dispatch_buffer_capacity(config.buffer_capacity);
        let base = SourceBase::new(base_params)?;

        Ok(Self { base })
    }
}

#[async_trait]
impl Source for MySource {
    /// Returns the identifier stored on the shared `SourceBase`.
    fn id(&self) -> &str {
        &self.base.id
    }

    /// Returns the fixed type name of this plugin.
    fn type_name(&self) -> &str {
        "my-source"
    }

    /// Returns the plugin's configuration as a property map for inspection.
    fn properties(&self) -> HashMap<String, Value> {
        // Placeholder so the example type-checks; a real plugin inserts its
        // plugin-specific properties here.
        HashMap::new()
    }

    /// Hands the runtime context to the shared `SourceBase`.
    async fn initialize(&self, context: SourceRuntimeContext) {
        self.base.initialize(context).await;
    }

    // ... implement other methods
}

Required Methods§

Source

fn id(&self) -> &str

Get the source’s unique identifier

Source

fn type_name(&self) -> &str

Get the source type name (e.g., “postgres”, “http”, “mock”)

Source

fn properties(&self) -> HashMap<String, Value>

Get the source’s configuration properties for inspection

This returns a HashMap representation of the source’s configuration for use in APIs and inspection. The actual typed configuration is owned by the plugin - this is just for external visibility.

Source

fn start<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Start the source

This begins data ingestion and event generation.

Source

fn stop<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Stop the source

This stops data ingestion and cleans up resources.

Source

fn status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the current status of the source

Source

fn subscribe<'life0, 'async_trait>( &'life0 self, settings: SourceSubscriptionSettings, ) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to this source for change events

This is called by queries to receive data changes from this source. The source should return a receiver for streaming events and optionally a bootstrap receiver for initial data.

§Arguments
  • settings - Subscription settings including query ID, text, and labels of interest
§Returns

A SubscriptionResponse containing:

  • A receiver for streaming source events
  • Optionally a bootstrap receiver for initial data
Source

fn as_any(&self) -> &dyn Any

Downcast helper for testing - allows access to concrete types

Source

fn initialize<'life0, 'async_trait>( &'life0 self, context: SourceRuntimeContext, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Initialize the source with runtime context.

This method is called automatically by DrasiLib when the source is added via add_source(). Plugin developers do not need to call this directly.

The context provides access to:

  • source_id: The source’s unique identifier
  • event_tx: Channel for reporting component lifecycle events
  • state_store: Optional persistent state storage

Implementation should delegate to self.base.initialize(context).await.

Provided Methods§

Source

fn dispatch_mode(&self) -> DispatchMode

Get the dispatch mode for this source (Channel or Broadcast)

Default is Channel mode for backpressure support.

Source

fn auto_start(&self) -> bool

Whether this source should auto-start when DrasiLib starts

Default is true. Override to return false if this source should only be started manually via start_source().

Source

fn supports_replay(&self) -> bool

Whether this source supports positional replay via resume_from.

Sources backed by a persistent log (e.g., Postgres WAL, Kafka) should override this to return true. The orchestration layer uses this to validate compatibility with persistent queries and to request position handles.

Source

fn deprovision<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Permanently clean up internal state when the source is being removed.

This is called when remove_source(id, cleanup: true) is used. Use this to release external resources that should not persist after the source is deleted (e.g., drop a replication slot, remove cursors).

The default implementation is a no-op. Override only if your source manages external state that needs explicit teardown.

Errors are logged but do not prevent the source from being removed.

Source

fn set_bootstrap_provider<'life0, 'async_trait>( &'life0 self, _provider: Box<dyn BootstrapProvider + 'static>, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Set the bootstrap provider for this source

This method allows setting a bootstrap provider after source construction. It is optional - sources without a bootstrap provider will report that bootstrap is not available.

Implementation should delegate to self.base.set_bootstrap_provider(provider).await.

Trait Implementations§

Source§

impl Source for Box<dyn Source + 'static>

Blanket implementation of Source for Box<dyn Source>

This allows boxed trait objects to be used with methods expecting impl Source.

Source§

fn id(&self) -> &str

Get the source’s unique identifier
Source§

fn type_name(&self) -> &str

Get the source type name (e.g., “postgres”, “http”, “mock”)
Source§

fn properties(&self) -> HashMap<String, Value>

Get the source’s configuration properties for inspection Read more
Source§

fn dispatch_mode(&self) -> DispatchMode

Get the dispatch mode for this source (Channel or Broadcast) Read more
Source§

fn auto_start(&self) -> bool

Whether this source should auto-start when DrasiLib starts Read more
Source§

fn supports_replay(&self) -> bool

Whether this source supports positional replay via resume_from. Read more
Source§

fn start<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Start the source Read more
Source§

fn stop<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Stop the source Read more
Source§

fn status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the current status of the source
Source§

fn subscribe<'life0, 'async_trait>( &'life0 self, settings: SourceSubscriptionSettings, ) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to this source for change events Read more
Source§

fn as_any(&self) -> &dyn Any

Downcast helper for testing - allows access to concrete types
Source§

fn deprovision<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Permanently clean up internal state when the source is being removed. Read more
Source§

fn initialize<'life0, 'async_trait>( &'life0 self, context: SourceRuntimeContext, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Initialize the source with runtime context. Read more
Source§

fn set_bootstrap_provider<'life0, 'async_trait>( &'life0 self, provider: Box<dyn BootstrapProvider + 'static>, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Set the bootstrap provider for this source Read more

Implementations on Foreign Types§

Source§

impl Source for Box<dyn Source + 'static>

Blanket implementation of Source for Box<dyn Source>

This allows boxed trait objects to be used with methods expecting impl Source.

Source§

fn id(&self) -> &str

Source§

fn type_name(&self) -> &str

Source§

fn properties(&self) -> HashMap<String, Value>

Source§

fn dispatch_mode(&self) -> DispatchMode

Source§

fn auto_start(&self) -> bool

Source§

fn supports_replay(&self) -> bool

Source§

fn start<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn stop<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn status<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = ComponentStatus> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn subscribe<'life0, 'async_trait>( &'life0 self, settings: SourceSubscriptionSettings, ) -> Pin<Box<dyn Future<Output = Result<SubscriptionResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn as_any(&self) -> &dyn Any

Source§

fn deprovision<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn initialize<'life0, 'async_trait>( &'life0 self, context: SourceRuntimeContext, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn set_bootstrap_provider<'life0, 'async_trait>( &'life0 self, provider: Box<dyn BootstrapProvider + 'static>, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Implementors§