pub trait DataSource: Send + Sync {
// Required methods
fn receive<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = SourceResult<SourceBatch>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn try_receive(&mut self) -> Option<SourceBatch>;
fn identifier(&self) -> String;
// Provided methods
fn supports_try_receive(&self) -> bool { ... }
fn can_try_receive(&mut self) -> bool { ... }
fn identifier_ref(&self) -> Cow<'_, str> { ... }
fn caps(&self) -> SourceCaps { ... }
fn start<'life0, 'async_trait>(
&'life0 mut self,
_ctrl_rx: CtrlRx,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ack<'life0, 'async_trait>(
&'life0 mut self,
_token: Arc<dyn AckToken>,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn seek<'life0, 'async_trait>(
&'life0 mut self,
_pos: Arc<dyn SeekPosition>,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}
Expand description
Core trait for data sources.
A data source produces batches of events that can be consumed by the pipeline.
Implementations must be Send + Sync to allow concurrent access.
§Lifecycle
- Create the source instance
- Call start(ctrl_rx) to initialize and begin listening for control events
- Call receive() repeatedly to pull data batches
- Call close() when done to release resources
§Example
#[async_trait]
impl DataSource for MySource {
async fn receive(&mut self) -> SourceResult<SourceBatch> {
// Pull data from upstream
let data = self.client.poll().await?;
Ok(vec![SourceEvent::new(0, "my-source", data, Arc::new(Tags::new()))])
}
fn try_receive(&mut self) -> Option<SourceBatch> { None }
fn identifier(&self) -> String { "my-source".into() }
}
Required Methods§
Source
fn receive<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = SourceResult<SourceBatch>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
fn receive<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = SourceResult<SourceBatch>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Pull the next batch of events from the source.
Returns a vector of [SourceEvent]s. An empty vector indicates no data
is currently available (not EOF). Use error variants like SourceReason::EOF
to signal end of stream.
This method should be idempotent and safe to call repeatedly after start().
Source
fn try_receive(&mut self) -> Option<SourceBatch>
fn try_receive(&mut self) -> Option<SourceBatch>
Non-blocking attempt to receive data.
Returns Some(batch) if data is immediately available, None otherwise.
Only call this when can_try_receive() returns true.
Source
fn identifier(&self) -> String
fn identifier(&self) -> String
Returns a unique identifier for this source instance (for logging/metrics).
Provided Methods§
Source
fn supports_try_receive(&self) -> bool
fn supports_try_receive(&self) -> bool
Static capability: whether this source ever supports non-blocking receive.
Source
fn can_try_receive(&mut self) -> bool
fn can_try_receive(&mut self) -> bool
Dynamic capability: whether non-blocking receive is safe right now.
Source
fn identifier_ref(&self) -> Cow<'_, str>
fn identifier_ref(&self) -> Cow<'_, str>
Identifier access intended to avoid allocation (returns a Cow). The default implementation allocates via identifier(); override it to return a borrowed value.
Source
fn caps(&self) -> SourceCaps
fn caps(&self) -> SourceCaps
Returns capability flags for this source.
Source
fn start<'life0, 'async_trait>(
&'life0 mut self,
_ctrl_rx: CtrlRx,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
fn start<'life0, 'async_trait>(
&'life0 mut self,
_ctrl_rx: CtrlRx,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Initialize the source and start listening for control events.
Must be called before receive(). The ctrl_rx channel delivers
ControlEvents for lifecycle management (stop, isolate, seek).
Source
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Stop the source and release all resources.
Must be idempotent - safe to call multiple times or before start().
Source
fn ack<'life0, 'async_trait>(
&'life0 mut self,
_token: Arc<dyn AckToken>,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
fn ack<'life0, 'async_trait>(
&'life0 mut self,
_token: Arc<dyn AckToken>,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Acknowledge consumption of events up to the given token.
Only supported when caps().ack == true. Returns an error by default.
Source
fn seek<'life0, 'async_trait>(
&'life0 mut self,
_pos: Arc<dyn SeekPosition>,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
fn seek<'life0, 'async_trait>(
&'life0 mut self,
_pos: Arc<dyn SeekPosition>,
) -> Pin<Box<dyn Future<Output = SourceResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Seek to a specific position in the source.
Only supported when caps().seek == true. Returns an error by default.