Trait Source 

pub trait Source: Send + Sync {
    // Required method
    fn fetch_with_context<'life0, 'life1, 'async_trait>(
        &'life0 self,
        context: &'life1 HashMap<String, Value>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: 'async_trait;

    // Provided methods
    fn fetch_all<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
    fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>(
        &'life0 self,
        context: &'life1 HashMap<String, Value>,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: 'async_trait { ... }
    fn fetch_all_incremental<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
    fn stream_pages<'a>(
        &'a self,
        context: &'a HashMap<String, Value>,
        batch_size: usize,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> { ... }
    fn config_schema(&self) -> Value { ... }
    fn state_key(&self) -> Option<String> { ... }
    fn apply_start_bookmark<'life0, 'async_trait>(
        &'life0 self,
        _bookmark: Value,
    ) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
    fn connector_name(&self) -> &'static str { ... }
    fn check<'life0, 'life1, 'async_trait>(
        &'life0 self,
        ctx: &'life1 CheckContext,
    ) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: 'async_trait { ... }
}
Available on crate features source-kafka and source-rest only.

A source fetches records from an external system.

Required Methods

fn fetch_with_context<'life0, 'life1, 'async_trait>( &'life0 self, context: &'life1 HashMap<String, Value>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Primary fetch method. Receives context from a parent source’s records.

An empty context map means this is a root source (no parent). Connectors that can run as a child source should use substitute_context() to resolve {placeholder} tokens in their URL path, query parameters, headers, or body. Connectors that don’t need parent context can ignore the map.
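The substitution step can be pictured with a small, self-contained sketch. It is not the crate's substitute_context(): the function name substitute_placeholders is hypothetical, context values are plain Strings rather than serde_json::Value, and leaving unknown tokens untouched is an assumption (the real helper may report an error instead).

```rust
use std::collections::HashMap;

/// Hypothetical helper: replaces every `{key}` token in `template` with the
/// matching value from `context`. Unknown keys are left as-is (an assumption;
/// the real substitute_context() may behave differently).
fn substitute_placeholders(template: &str, context: &HashMap<String, String>) -> String {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(open) = rest.find('{') {
        // Copy everything before the opening brace verbatim.
        out.push_str(&rest[..open]);
        let after_open = &rest[open + 1..];
        match after_open.find('}') {
            Some(close) => {
                let key = &after_open[..close];
                match context.get(key) {
                    Some(value) => out.push_str(value),
                    // Keep the original token when the parent did not supply the key.
                    None => {
                        out.push('{');
                        out.push_str(key);
                        out.push('}');
                    }
                }
                rest = &after_open[close + 1..];
            }
            None => {
                // Unterminated brace: copy the remainder unchanged and stop.
                out.push_str(&rest[open..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

For example, with user_id mapped to "42", the template "/users/{user_id}/orders" becomes "/users/42/orders", while a root source's empty map leaves "/users" unchanged.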

Provided Methods

fn fetch_all<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Convenience: fetch with no parent context.
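One plausible shape for this default, assuming it simply passes an empty map to the required method (an empty map is how fetch_with_context identifies a root source). The trait SimpleSource, the EchoSource connector, and the synchronous String-based signatures are hypothetical simplifications of the async Source trait.

```rust
use std::collections::HashMap;

/// Hypothetical synchronous stand-in for `Source`; records and errors are
/// Strings instead of serde_json::Value and FaucetError.
trait SimpleSource {
    /// Required: fetch using context derived from a parent record.
    fn fetch_with_context(&self, context: &HashMap<String, String>) -> Result<Vec<String>, String>;

    /// Provided: an empty map marks this as a root fetch (no parent).
    fn fetch_all(&self) -> Result<Vec<String>, String> {
        self.fetch_with_context(&HashMap::new())
    }
}

/// Toy connector that reports whether it was called as a root or a child.
struct EchoSource;

impl SimpleSource for EchoSource {
    fn fetch_with_context(&self, context: &HashMap<String, String>) -> Result<Vec<String>, String> {
        if context.is_empty() {
            Ok(vec!["root".to_string()])
        } else {
            // As a child, echo the parent-supplied values back as records.
            Ok(context.values().cloned().collect())
        }
    }
}
```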

fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>( &'life0 self, context: &'life1 HashMap<String, Value>, ) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Incremental fetch with parent context support.

Returns the records and an optional bookmark value for incremental replication. The default delegates to fetch_with_context and returns None for the bookmark.
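The documented default (delegate to fetch_with_context, report no bookmark) can be sketched with a simplified, synchronous stand-in. SimpleSource and FixedSource are hypothetical; records, bookmarks, and errors are Strings instead of serde_json::Value and FaucetError.

```rust
use std::collections::HashMap;

/// Hypothetical synchronous stand-in for `Source`.
trait SimpleSource {
    fn fetch_with_context(&self, context: &HashMap<String, String>) -> Result<Vec<String>, String>;

    /// Provided: delegate to fetch_with_context and report no bookmark.
    fn fetch_with_context_incremental(
        &self,
        context: &HashMap<String, String>,
    ) -> Result<(Vec<String>, Option<String>), String> {
        let records = self.fetch_with_context(context)?;
        Ok((records, None))
    }
}

/// Toy connector that always returns the same records and has no incremental support.
struct FixedSource(Vec<String>);

impl SimpleSource for FixedSource {
    fn fetch_with_context(&self, _context: &HashMap<String, String>) -> Result<Vec<String>, String> {
        Ok(self.0.clone())
    }
}
```

Because the bookmark is None, there is nothing to checkpoint, so a source relying on this default is effectively re-read in full on every run.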

fn fetch_all_incremental<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Convenience: incremental fetch with no parent context.

fn stream_pages<'a>( &'a self, context: &'a HashMap<String, Value>, batch_size: usize, ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>

Stream records page-by-page so the pipeline can write to the sink as pages arrive instead of buffering the full result set.

batch_size is the hint the pipeline passes down; sources are free to use a larger or smaller native chunk (e.g. one page per HTTP response, one row-group per Parquet file) but should approximate it where feasible. The special value batch_size = 0 means “do not batch — emit the entire result set in a single page.” Sources that stream natively should treat 0 as “skip the chunking layer and yield one page after the underlying read completes” (useful for small lookup tables or for sinks like SQL COPY / BigQuery load jobs that prefer one large request).

The default implementation fetches the full result set via fetch_with_context_incremental and chunks it in memory by batch_size. The bookmark (when present) is attached to the final page, so the pipeline persists it only after the entire result set has been written. Sources that can stream natively override this method and may emit per-page bookmarks (e.g. CDC).

An empty result with a Some(bookmark) still yields one empty page carrying the bookmark, so incremental runs that produce no records still advance their checkpoint.
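The in-memory chunking described for the default implementation can be sketched as a plain function over an already-fetched Vec. The Page type and its records/bookmark fields are hypothetical stand-ins for StreamPage, whose fields are not listed here, and returning no pages for an empty result without a bookmark is an assumption. The other rules follow the text above: batch_size = 0 yields one page, the bookmark rides on the last page, and an empty result with a bookmark yields one empty page.

```rust
/// Hypothetical page type standing in for StreamPage.
#[derive(Debug, PartialEq)]
struct Page<T, B> {
    records: Vec<T>,
    bookmark: Option<B>,
}

/// Splits a fully fetched result set into pages of at most `batch_size`
/// records; `batch_size == 0` means "one page for everything".
fn chunk_into_pages<T, B>(records: Vec<T>, bookmark: Option<B>, batch_size: usize) -> Vec<Page<T, B>> {
    // Nothing fetched: still emit one empty page if a bookmark must be
    // checkpointed; emitting no pages otherwise is an assumption.
    if records.is_empty() {
        return match bookmark {
            Some(b) => vec![Page { records: Vec::new(), bookmark: Some(b) }],
            None => Vec::new(),
        };
    }
    // batch_size == 0 disables chunking: the whole set becomes a single chunk.
    // records is non-empty here, so `chunk` is never 0.
    let chunk = if batch_size == 0 { records.len() } else { batch_size };
    let mut pages = Vec::new();
    let mut iter = records.into_iter().peekable();
    while iter.peek().is_some() {
        let page: Vec<T> = iter.by_ref().take(chunk).collect();
        pages.push(Page { records: page, bookmark: None });
    }
    // Attach the bookmark to the final page only, so it is persisted after
    // every earlier page has been written.
    if let Some(last) = pages.last_mut() {
        last.bookmark = bookmark;
    }
    pages
}
```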

fn config_schema(&self) -> Value

Return a JSON Schema describing the configuration this source accepts.

fn state_key(&self) -> Option<String>

Stable key under which this source’s incremental-replication bookmark should be persisted in a StateStore.

Returning Some(key) opts this source into resumable runs: when the pipeline is configured with a state store via Pipeline::with_state_store, it reads the bookmark at key before fetching and writes the new bookmark back only after the sink confirms the batch was written.

The default returns None, meaning the source is not persisted. Keys must satisfy validate_state_key.
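The read-before-fetch, write-after-sink ordering can be illustrated with a toy in-memory store. MemoryStateStore and run_once are hypothetical. The sketch skips apply_start_bookmark (it hands the loaded bookmark straight to the fetch closure), writes the whole batch at once instead of page by page, and does not reproduce validate_state_key.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a StateStore: bookmarks keyed by state_key.
#[derive(Default)]
struct MemoryStateStore {
    bookmarks: HashMap<String, String>,
}

/// Hypothetical single run. `fetch` receives the stored bookmark (if any) and
/// returns (records, new_bookmark); `write` plays the role of the sink.
fn run_once<F, W>(
    store: &mut MemoryStateStore,
    state_key: Option<&str>,
    fetch: F,
    mut write: W,
) -> Result<(), String>
where
    F: FnOnce(Option<String>) -> Result<(Vec<String>, Option<String>), String>,
    W: FnMut(&[String]) -> Result<(), String>,
{
    // 1. Read the previous bookmark only when the source opted in with Some(key).
    let start = state_key.and_then(|k| store.bookmarks.get(k).cloned());
    let (records, new_bookmark) = fetch(start)?;
    // 2. Write to the sink first; an error returns early and keeps the old bookmark.
    write(&records)?;
    // 3. Persist the new bookmark only after the sink confirmed the write.
    if let (Some(key), Some(b)) = (state_key, new_bookmark) {
        store.bookmarks.insert(key.to_string(), b);
    }
    Ok(())
}
```

A failed sink write therefore leaves the previous bookmark in place, so the next run resumes from the last confirmed point.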

fn apply_start_bookmark<'life0, 'async_trait>( &'life0 self, _bookmark: Value, ) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Apply a bookmark loaded from a StateStore as this run’s starting point.

The default implementation ignores the value, which keeps existing sources backwards-compatible. Sources that support incremental replication override this — typically by storing the value behind interior mutability and consulting it inside fetch_with_context_incremental.
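A sketch of the interior-mutability pattern described above, using a Mutex so the type stays Send + Sync. IdCursorSource, its (id, name) rows, and the numeric id bookmark are hypothetical, and the two methods are synchronous stand-ins for apply_start_bookmark and fetch_with_context_incremental rather than implementations of the real async trait.

```rust
use std::sync::Mutex;

/// Hypothetical incremental source over an in-memory table of (id, name) rows.
/// The starting cursor sits behind a Mutex because the bookmark is applied
/// through `&self`.
struct IdCursorSource {
    rows: Vec<(u64, String)>,
    start_after: Mutex<Option<u64>>,
}

impl IdCursorSource {
    /// Stand-in for apply_start_bookmark: remember where this run starts.
    fn apply_start_bookmark(&self, bookmark: u64) {
        *self.start_after.lock().unwrap() = Some(bookmark);
    }

    /// Stand-in for fetch_with_context_incremental: return rows past the
    /// cursor, plus the highest id seen as the next bookmark.
    fn fetch_incremental(&self) -> (Vec<(u64, String)>, Option<u64>) {
        let start = *self.start_after.lock().unwrap();
        let records: Vec<(u64, String)> = self
            .rows
            .iter()
            .filter(|(id, _)| start.map_or(true, |s| *id > s))
            .cloned()
            .collect();
        // Keep the previous cursor when nothing new arrived, so the checkpoint never regresses.
        let bookmark = records.iter().map(|(id, _)| *id).max().or(start);
        (records, bookmark)
    }
}
```

Mutex is used rather than RefCell because Source requires Send + Sync; for a single numeric cursor an AtomicU64 would also work.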

fn connector_name(&self) -> &'static str

Stable identifier used as the connector label on metrics and the connector attribute on spans. Defaults to the final segment of std::any::type_name::<Self>(), e.g. "RestSource". Built-in connectors override it with a short, friendly snake_case name (e.g. "rest"). Must return a non-empty string: if it is empty, observability decorators fall back to "unknown" in release builds and fail a debug_assert! in debug builds.
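The default naming rule and the empty-name fallback can be approximated as below. short_type_name and metric_label are hypothetical helpers, not the crate's code. The standard library documents type_name as a diagnostic string whose exact format is unspecified, and a naive split on "::" can land inside the generic arguments of a generic type; both are reasons to override connector_name with a fixed literal.

```rust
use std::any::type_name;

/// Last `::` segment of a type's name, approximating the described default.
/// Caveat: for generic types the split may land inside the angle brackets.
fn short_type_name<T: ?Sized>() -> &'static str {
    let full = type_name::<T>();
    // rsplit always yields at least one item, so unwrap_or is only a formality.
    full.rsplit("::").next().unwrap_or(full)
}

/// Label used on metrics/spans: an empty name trips a debug assertion in
/// debug builds and falls back to "unknown" in release builds.
fn metric_label(name: &'static str) -> &'static str {
    debug_assert!(!name.is_empty(), "connector_name() must not be empty");
    if name.is_empty() { "unknown" } else { name }
}

/// Hypothetical connector type used only to show the default name.
pub struct RestSource;
```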

fn check<'life0, 'life1, 'async_trait>( &'life0 self, ctx: &'life1 CheckContext, ) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Run a fast, non-mutating preflight probe (used by faucet doctor).

The default pulls a single page via stream_pages and reports success/failure — it exercises the real read path (DNS, TLS, auth, the first request, the first-record decode) but never paginates the full dataset and never repeats. The page stream is dropped immediately after the first page.

Sources whose first page blocks waiting for inbound data (webhook, websocket) or has side effects (CDC consuming WAL) override this with a cheaper, side-effect-free probe. Probe-level failures are returned as a ProbeStatus::Fail inside Ok(report).
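The default probe's "first page only" behavior can be sketched with a lazy Iterator standing in for the page stream. ProbeStatus::Pass, the String payload on Fail, and treating an empty stream as a pass are assumptions; only ProbeStatus::Fail is named above, and probe_first_page is a hypothetical helper.

```rust
/// Hypothetical probe outcome; only the Fail variant is named in the docs.
#[derive(Debug, PartialEq)]
enum ProbeStatus {
    Pass,
    Fail(String),
}

/// Pulls at most one page from a lazy page iterator (standing in for the
/// stream returned by stream_pages). Failures are returned as data, mirroring
/// a ProbeStatus::Fail inside Ok(report) rather than an Err.
fn probe_first_page<I>(mut pages: I) -> ProbeStatus
where
    I: Iterator<Item = Result<Vec<String>, String>>,
{
    // Only one next() call is made; `pages` is dropped on return, so later
    // pages are never requested.
    match pages.next() {
        Some(Ok(_first_page)) => ProbeStatus::Pass,
        Some(Err(e)) => ProbeStatus::Fail(e),
        // Treating an empty stream as a pass is an assumption of this sketch.
        None => ProbeStatus::Pass,
    }
}
```

Because iterators, like streams, are lazy, a single next() call produces exactly one page even when the underlying source has many more.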

Dyn Compatibility

This trait is dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".
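Dyn compatibility is what lets a pipeline hold different connectors behind one type, such as Vec<Box<dyn Source>>. The sketch below mirrors the boxed-future signatures shown above in a trimmed-down, hypothetical MiniSource trait with two hypothetical connectors. block_on is a deliberately minimal executor that only suits futures that never return Pending. On current stable Rust, declaring the method as a native async fn would make the trait not dyn compatible, which is one reason for the Pin<Box<dyn Future + Send>> return types in the expanded signatures (their 'async_trait lifetimes suggest #[async_trait] expansions).

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical trimmed-down trait with the same boxed-future shape as Source.
trait MiniSource: Send + Sync {
    fn fetch_with_context<'a>(&'a self, context: &'a HashMap<String, String>) -> BoxFuture<'a, Vec<String>>;
    fn connector_name(&self) -> &'static str;
}

struct Rest;
struct Kafka;

impl MiniSource for Rest {
    fn fetch_with_context<'a>(&'a self, _context: &'a HashMap<String, String>) -> BoxFuture<'a, Vec<String>> {
        Box::pin(async { vec!["from rest".to_string()] })
    }
    fn connector_name(&self) -> &'static str { "rest" }
}

impl MiniSource for Kafka {
    fn fetch_with_context<'a>(&'a self, _context: &'a HashMap<String, String>) -> BoxFuture<'a, Vec<String>> {
        Box::pin(async { vec!["from kafka".to_string()] })
    }
    fn connector_name(&self) -> &'static str { "kafka" }
}

/// A waker that does nothing; adequate only for futures that complete immediately.
fn noop_raw_waker() -> RawWaker {
    fn no_op(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Minimal executor: polls until Ready (busy-loops if the future ever pends).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

A real pipeline would use an async runtime instead of block_on; the point here is only that both connectors fit in one Vec<Box<dyn MiniSource>>.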

Implementors