Trait DataSource

pub trait DataSource: Send + Sync {
    // Required methods
    fn receive<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = SourceResult<SourceBatch>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn try_receive(&mut self) -> Option<SourceBatch>;
    fn identifier(&self) -> String;

    // Provided methods
    fn supports_try_receive(&self) -> bool { ... }
    fn can_try_receive(&mut self) -> bool { ... }
    fn identifier_ref(&self) -> Cow<'_, str> { ... }
    fn caps(&self) -> SourceCaps { ... }
    fn start<'life0, 'async_trait>(
        &'life0 mut self,
        _ctrl_rx: CtrlRx,
    ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn ack<'life0, 'async_trait>(
        &'life0 mut self,
        _token: Arc<dyn AckToken>,
    ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn seek<'life0, 'async_trait>(
        &'life0 mut self,
        _pos: Arc<dyn SeekPosition>,
    ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
}

Core trait for data sources.

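A data source produces batches of events that the pipeline consumes. Implementations must be Send + Sync so that a source can be moved to another thread and shared by reference across threads (for example, when it is driven by a multi-threaded async runtime). Methods that pull data or change state take &mut self, so they are never called concurrently on the same instance.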
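The Send + Sync supertraits are enforced at compile time: an implementor holding a non-thread-safe field such as Rc does not compile. The std-only sketch below uses a hypothetical cut-down trait, MiniSource, in place of DataSource. It shows a compile-time bound check and a boxed source being moved to a worker thread. CounterSource and the helper functions are illustrative, not part of the crate.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for DataSource: same Send + Sync supertraits,
// but only one method so the sketch compiles with std alone.
trait MiniSource: Send + Sync {
    fn identifier(&self) -> String;
}

// Shared state behind Arc<Mutex<..>> is Send + Sync, so the struct is too.
// Swapping the field to Rc<..> would make the impl below fail to compile.
struct CounterSource {
    polled: Arc<Mutex<u64>>,
}

impl MiniSource for CounterSource {
    fn identifier(&self) -> String {
        format!("counter-{}", *self.polled.lock().unwrap())
    }
}

// Compile-time check: only instantiable for types that are Send + Sync.
fn assert_send_sync<T: Send + Sync>() {}

// Move a boxed trait object to a worker thread and read its identifier there.
// Box<dyn MiniSource> is Send because Send is a supertrait of MiniSource.
fn identifier_from_worker(source: Box<dyn MiniSource>) -> String {
    thread::spawn(move || source.identifier()).join().unwrap()
}
```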

Lifecycle

  1. Create the source instance
  2. Call start(ctrl_rx) to initialize and begin listening for control events
  3. Call receive() repeatedly to pull data batches
  4. Call close() when done to release resources
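The four lifecycle steps can be sketched as a small driver. This is an illustration under assumptions, not the crate's code. Lifecycle is a hypothetical stand-in trait whose methods return Pin<Box<dyn Future<...> + Send + '_>>, the same shape #[async_trait] expands to. SourceError::Eof plays the role of SourceReason::EOF. block_on is a toy executor with a no-op waker that only works because every future here is ready on its first poll; a real pipeline would await these calls on an async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical error type; Eof plays the role of SourceReason::EOF.
#[derive(Debug, PartialEq)]
enum SourceError {
    Eof,
    NotStarted,
}

// The boxed-future shape that #[async_trait] generates for async methods.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, SourceError>> + Send + 'a>>;

// Cut-down stand-in for DataSource (no ctrl_rx; events are plain u32s).
trait Lifecycle: Send + Sync {
    fn start(&mut self) -> BoxFut<'_, ()>;
    fn receive(&mut self) -> BoxFut<'_, Vec<u32>>;
    fn close(&mut self) -> BoxFut<'_, ()>;
}

// Emits `remaining` single-event batches, then reports end of stream.
struct CountingSource {
    started: bool,
    remaining: u32,
}

impl Lifecycle for CountingSource {
    fn start(&mut self) -> BoxFut<'_, ()> {
        Box::pin(async move {
            self.started = true;
            Ok::<(), SourceError>(())
        })
    }
    fn receive(&mut self) -> BoxFut<'_, Vec<u32>> {
        Box::pin(async move {
            if !self.started {
                return Err(SourceError::NotStarted);
            }
            if self.remaining == 0 {
                return Err(SourceError::Eof);
            }
            self.remaining -= 1;
            Ok(vec![self.remaining])
        })
    }
    fn close(&mut self) -> BoxFut<'_, ()> {
        Box::pin(async move {
            self.started = false;
            Ok::<(), SourceError>(())
        })
    }
}

fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

// Toy executor: polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: the vtable functions ignore the data pointer and do nothing.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Steps 2-4: start, pull until end of stream, then close.
fn drive<S: Lifecycle>(source: &mut S) -> Result<Vec<u32>, SourceError> {
    block_on(source.start())?;
    let mut events = Vec::new();
    let outcome = loop {
        match block_on(source.receive()) {
            Ok(batch) => events.extend(batch),
            Err(SourceError::Eof) => break Ok(()),
            Err(e) => break Err(e),
        }
    };
    // Always close, even if the receive loop failed.
    block_on(source.close())?;
    outcome.map(|()| events)
}
```

drive() calls close() on both the success and the error path, so resources are released either way.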

Example

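use std::sync::Arc;
use async_trait::async_trait;
// DataSource, SourceBatch, SourceEvent, SourceResult and Tags come from this crate;
// MySource and its `client` field are placeholders.

#[async_trait]
impl DataSource for MySource {
    async fn receive(&mut self) -> SourceResult<SourceBatch> {
        // Pull data from upstream; `?` needs the client's error type to
        // convert into the source error type.
        let data = self.client.poll().await?;
        Ok(vec![SourceEvent::new(0, "my-source", data, Arc::new(Tags::new()))])
    }

    // No non-blocking path: always report that nothing is available.
    fn try_receive(&mut self) -> Option<SourceBatch> { None }

    fn identifier(&self) -> String { "my-source".into() }
}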

Required Methods

fn receive<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = SourceResult<SourceBatch>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Pull the next batch of events from the source.

Returns a vector of SourceEvent values. An empty vector means no data is available yet; it does not signal end of stream. To signal end of stream, return an error such as SourceReason::EOF.

It must be safe to call repeatedly after start(); each call returns the next batch that is available.
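On the consumer side, this contract means three outcomes must be handled separately: a non-empty batch, an empty batch (nothing yet, keep polling), and an end-of-stream error. The sketch below replaces the async receive() with a plain closure so it runs without a runtime. Reason, Stop, pump and the max_idle limit are hypothetical names chosen for illustration.

```rust
// Hypothetical error reasons; Eof stands in for SourceReason::EOF.
#[derive(Debug, PartialEq)]
enum Reason {
    Eof,
    Upstream(String),
}

#[derive(Debug, PartialEq)]
enum Stop {
    EndOfStream,
    TooManyIdlePolls,
    Failed(Reason),
}

// Pull batches until the source reports end of stream. An empty batch means
// "nothing yet", so it only counts toward an idle limit, never as EOF.
fn pump<F>(mut receive: F, max_idle: u32) -> (Vec<u32>, Stop)
where
    F: FnMut() -> Result<Vec<u32>, Reason>,
{
    let mut events = Vec::new();
    let mut idle = 0;
    loop {
        match receive() {
            Ok(batch) if batch.is_empty() => {
                idle += 1;
                if idle >= max_idle {
                    return (events, Stop::TooManyIdlePolls);
                }
                // A real consumer would back off or await a notification here.
            }
            Ok(batch) => {
                idle = 0;
                events.extend(batch);
            }
            Err(Reason::Eof) => return (events, Stop::EndOfStream),
            Err(other) => return (events, Stop::Failed(other)),
        }
    }
}
```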

fn try_receive(&mut self) -> Option<SourceBatch>

Non-blocking attempt to receive data.

Returns Some(batch) if data is immediately available, None otherwise. Only call this when can_try_receive() returns true.
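A consumer can combine the static check (supports_try_receive) with the dynamic check (can_try_receive) so it takes the non-blocking path only when that path is both available and worthwhile, and falls back to the blocking pull otherwise. In this std-only sketch, Pull is a hypothetical synchronous mirror of those methods, receive_blocking stands in for awaiting receive(), and Buffered is an illustrative source backed by a VecDeque.

```rust
use std::collections::VecDeque;

// Hypothetical synchronous mirror of the relevant DataSource methods.
trait Pull {
    fn receive_blocking(&mut self) -> Vec<u32>; // stands in for receive().await
    fn try_receive(&mut self) -> Option<Vec<u32>>;
    fn supports_try_receive(&self) -> bool { false }
    fn can_try_receive(&mut self) -> bool { false }
}

// Holds events already delivered by some background reader.
struct Buffered {
    buffer: VecDeque<u32>,
    blocking_calls: u32,
}

impl Pull for Buffered {
    fn receive_blocking(&mut self) -> Vec<u32> {
        self.blocking_calls += 1;
        self.buffer.drain(..).collect()
    }
    fn try_receive(&mut self) -> Option<Vec<u32>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(self.buffer.drain(..).collect())
        }
    }
    // Static capability: this type can always offer a non-blocking path.
    fn supports_try_receive(&self) -> bool { true }
    // Dynamic capability: only worthwhile when something is already buffered.
    fn can_try_receive(&mut self) -> bool { !self.buffer.is_empty() }
}

// Consumer: take the cheap path when both checks pass, otherwise block.
fn next_batch<S: Pull>(source: &mut S) -> Vec<u32> {
    if source.supports_try_receive() && source.can_try_receive() {
        if let Some(batch) = source.try_receive() {
            return batch;
        }
    }
    source.receive_blocking()
}
```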

fn identifier(&self) -> String

Returns a unique identifier for this source instance (for logging/metrics).

Provided Methods

fn supports_try_receive(&self) -> bool

Static capability: whether this source ever supports non-blocking receive.

fn can_try_receive(&mut self) -> bool

Dynamic capability: whether non-blocking receive is safe right now.

fn identifier_ref(&self) -> Cow<'_, str>

Borrowed access to the identifier. Implementations can avoid an allocation by returning Cow::Borrowed; the default implementation allocates by calling identifier().
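An implementor that already stores its name can override identifier_ref to lend it out as Cow::Borrowed, while a source that relies on the default pays for a new String on every call. Named, Plain and Cached below are hypothetical types that mirror only these two methods.

```rust
use std::borrow::Cow;

// Hypothetical stand-in with the same pair of identifier methods.
trait Named {
    fn identifier(&self) -> String;
    // Default mirrors the documented behaviour: allocate via identifier().
    fn identifier_ref(&self) -> Cow<'_, str> {
        Cow::Owned(self.identifier())
    }
}

// Relies on the default, so every identifier_ref() call allocates.
struct Plain;

impl Named for Plain {
    fn identifier(&self) -> String { "plain".to_string() }
}

// Stores its name once and lends it out without allocating.
struct Cached {
    name: String,
}

impl Named for Cached {
    fn identifier(&self) -> String { self.name.clone() }
    fn identifier_ref(&self) -> Cow<'_, str> { Cow::Borrowed(&self.name) }
}
```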

fn caps(&self) -> SourceCaps

Returns capability flags for this source.

fn start<'life0, 'async_trait>( &'life0 mut self, _ctrl_rx: CtrlRx, ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Initialize the source and start listening for control events.

Must be called before receive(). The ctrl_rx channel delivers ControlEvents for lifecycle management (stop, isolate, seek).
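One way a source might use ctrl_rx is to keep the receiver handed to start() and drain it without blocking before each pull. In this sketch the channel is std::sync::mpsc rather than the crate's CtrlRx, and the ControlEvent variants (including the u64 payload on Seek) are assumptions modelled on the stop, isolate and seek events named above; the real types may differ.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Hypothetical control events; the real ControlEvent type may differ.
#[derive(Debug)]
enum ControlEvent {
    Stop,
    Isolate,
    Seek(u64),
}

struct ControlledSource {
    ctrl_rx: Option<Receiver<ControlEvent>>, // stored by start()
    offset: u64,
    isolated: bool,
    stopped: bool,
}

impl ControlledSource {
    fn new() -> Self {
        ControlledSource { ctrl_rx: None, offset: 0, isolated: false, stopped: false }
    }

    // Mirrors start(ctrl_rx): keep the receiver for the lifetime of the source.
    fn start(&mut self, ctrl_rx: Receiver<ControlEvent>) {
        self.ctrl_rx = Some(ctrl_rx);
    }

    // Drain pending control events without blocking; call before each pull.
    fn apply_control(&mut self) {
        let Some(rx) = &self.ctrl_rx else { return };
        loop {
            match rx.try_recv() {
                Ok(ControlEvent::Stop) => self.stopped = true,
                Ok(ControlEvent::Isolate) => self.isolated = true,
                Ok(ControlEvent::Seek(pos)) => self.offset = pos,
                Err(TryRecvError::Empty) => break,
                // Sender dropped: treat it as a stop request.
                Err(TryRecvError::Disconnected) => {
                    self.stopped = true;
                    break;
                }
            }
        }
    }
}
```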

fn close<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Stop the source and release all resources.

Must be idempotent: safe to call multiple times, and safe to call before start().
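A common way to make close idempotent is to hold the upstream resource in an Option and take() it: the first call releases it, and later calls, or a call before start(), find None and do nothing. Connection and Source are hypothetical, and close is synchronous here for brevity.

```rust
// Hypothetical upstream handle; dropping it stands for releasing the resource.
struct Connection;

struct Source {
    conn: Option<Connection>, // None before start() and after close()
    shutdowns: u32,           // counts real releases, for the demo
}

impl Source {
    fn start(&mut self) {
        self.conn = Some(Connection);
    }

    // take() leaves None behind, so a second close (or one before start)
    // finds nothing to release and still succeeds.
    fn close(&mut self) -> Result<(), String> {
        if let Some(conn) = self.conn.take() {
            drop(conn);
            self.shutdowns += 1;
        }
        Ok(())
    }
}
```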

fn ack<'life0, 'async_trait>( &'life0 mut self, _token: Arc<dyn AckToken>, ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Acknowledge consumption of events up to the given token.

Only supported when caps().ack is true; the default implementation returns an error.
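Callers are expected to check caps().ack before acknowledging. The sketch below mirrors that contract with hypothetical types. SourceCaps has the ack and seek fields implied by caps().ack and caps().seek. The token carries a u64 offset, which is an assumption because the real AckToken interface is not shown here. The default ack returns an error as documented, and the methods are synchronous for brevity.

```rust
use std::sync::Arc;

// Capability flags; field names follow caps().ack / caps().seek in the docs.
#[derive(Default, Clone, Copy)]
struct SourceCaps {
    ack: bool,
    seek: bool,
}

// Hypothetical token: acknowledges everything up to and including `offset`.
trait AckToken: Send + Sync {
    fn offset(&self) -> u64;
}

struct OffsetToken(u64);

impl AckToken for OffsetToken {
    fn offset(&self) -> u64 { self.0 }
}

trait Ackable {
    fn caps(&self) -> SourceCaps { SourceCaps::default() }
    // Default mirrors the documented behaviour: unsupported unless overridden.
    fn ack(&mut self, _token: Arc<dyn AckToken>) -> Result<(), String> {
        Err("ack not supported".to_string())
    }
}

struct FireAndForget;
impl Ackable for FireAndForget {}

struct Committing {
    committed: Option<u64>,
}

impl Ackable for Committing {
    fn caps(&self) -> SourceCaps { SourceCaps { ack: true, ..SourceCaps::default() } }
    fn ack(&mut self, token: Arc<dyn AckToken>) -> Result<(), String> {
        // Only move the commit point forward; stale acks are ignored.
        let off = token.offset();
        if self.committed.map_or(true, |c| off > c) {
            self.committed = Some(off);
        }
        Ok(())
    }
}

// Caller side: skip the call entirely when the source does not advertise ack.
fn ack_if_supported<S: Ackable>(source: &mut S, offset: u64) -> Result<bool, String> {
    if !source.caps().ack {
        return Ok(false);
    }
    source.ack(Arc::new(OffsetToken(offset)))?;
    Ok(true)
}
```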

fn seek<'life0, 'async_trait>( &'life0 mut self, _pos: Arc<dyn SeekPosition>, ) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Seek to a specific position in the source.

Only supported when caps().seek is true; the default implementation returns an error.
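For a replayable source, seek can be as simple as moving a read cursor. In this sketch SeekPosition is a hypothetical trait exposing an index, Replay is an in-memory source, and out-of-range positions are rejected with an error. A real implementation would also report caps().seek as true and make seek async.

```rust
use std::sync::Arc;

// Hypothetical position type: an index into the source's event sequence.
trait SeekPosition: Send + Sync {
    fn index(&self) -> usize;
}

struct At(usize);

impl SeekPosition for At {
    fn index(&self) -> usize { self.0 }
}

// In-memory replayable source: seeking just moves the read cursor.
struct Replay {
    events: Vec<u32>,
    cursor: usize,
}

impl Replay {
    fn seek(&mut self, pos: Arc<dyn SeekPosition>) -> Result<(), String> {
        let idx = pos.index();
        if idx > self.events.len() {
            return Err(format!("position {idx} is past the end ({})", self.events.len()));
        }
        self.cursor = idx;
        Ok(())
    }

    // Return up to `max` events starting at the cursor and advance it.
    fn receive(&mut self, max: usize) -> Vec<u32> {
        let end = (self.cursor + max).min(self.events.len());
        let batch = self.events[self.cursor..end].to_vec();
        self.cursor = end;
        batch
    }
}
```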

Implementors