pub trait Source: Send + Sync {
    // Required method
    fn fetch_with_context<'life0, 'life1, 'async_trait>(
        &'life0 self,
        context: &'life1 HashMap<String, Value>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: 'async_trait;

    // Provided methods
    fn fetch_all<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
    fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>(
        &'life0 self,
        context: &'life1 HashMap<String, Value>,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: 'async_trait { ... }
    fn fetch_all_incremental<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
    fn stream_pages<'a>(
        &'a self,
        context: &'a HashMap<String, Value>,
        batch_size: usize,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> { ... }
    fn config_schema(&self) -> Value { ... }
    fn state_key(&self) -> Option<String> { ... }
    fn apply_start_bookmark<'life0, 'async_trait>(
        &'life0 self,
        _bookmark: Value,
    ) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
    fn connector_name(&self) -> &'static str { ... }
    fn check<'life0, 'life1, 'async_trait>(
        &'life0 self,
        ctx: &'life1 CheckContext,
    ) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             'life1: 'async_trait,
             Self: 'async_trait { ... }
}
Available on crate features source-rest and source-s3 only.
A source fetches records from an external system.
Required Methods
fn fetch_with_context<'life0, 'life1, 'async_trait>(
    &'life0 self,
    context: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
Primary fetch method. Receives context from a parent source’s records.
An empty context map means this is a root source (no parent).
Connectors that support being a child should use
substitute_context() to resolve
{placeholder} tokens in their URL path, query parameters, headers,
or body. Connectors that don’t need parent context ignore the map.
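As a rough illustration of the {placeholder} resolution described above, the sketch below substitutes tokens in a template string. It is not the crate's substitute_context(): the function name substitute_placeholders is hypothetical, the context holds plain String values instead of Value, and leaving unknown tokens untouched is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Replace each `{key}` token in `template` with the matching context value.
/// Hypothetical helper; tokens with no matching key are kept verbatim
/// (an assumption of this sketch, not documented behaviour).
fn substitute_placeholders(template: &str, context: &HashMap<String, String>) -> String {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(open) = rest.find('{') {
        // Copy everything before the opening brace.
        out.push_str(&rest[..open]);
        let after_open = &rest[open + 1..];
        match after_open.find('}') {
            Some(close) => {
                let key = &after_open[..close];
                match context.get(key) {
                    Some(value) => out.push_str(value),
                    None => {
                        // Unknown key: keep the token as written.
                        out.push('{');
                        out.push_str(key);
                        out.push('}');
                    }
                }
                rest = &after_open[close + 1..];
            }
            None => {
                // Unbalanced brace: copy the remainder unchanged and stop.
                out.push_str(&rest[open..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

With a context of user_id = "42", the template /users/{user_id}/repos resolves to /users/42/repos. A root source receives an empty map, so a template without tokens passes through unchanged.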
Provided Methods
fn fetch_all<'life0, 'async_trait>(
    &'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
Convenience: fetch with no parent context.
fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>(
    &'life0 self,
    context: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
Incremental fetch with parent context support.
Returns the records and an optional bookmark value for incremental
replication. The default delegates to fetch_with_context and
returns None for the bookmark.
fn fetch_all_incremental<'life0, 'async_trait>(
    &'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
Convenience: incremental fetch with no parent context.
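The relationship between the required method and the fetch conveniences can be pictured with a simplified, synchronous mirror of the trait. Everything here is a stand-in: MiniSource and Fixed are hypothetical names, records and bookmarks are plain strings, errors are String, and the assumption that the no-context variants pass an empty map follows from the note that an empty context means a root source.

```rust
use std::collections::HashMap;

// Stand-ins for the real types (assumptions of this sketch).
type Record = String;
type Bookmark = String;
type Context = HashMap<String, String>;

/// Synchronous, std-only mirror of the fetch methods.
trait MiniSource {
    // Required: the primary fetch.
    fn fetch_with_context(&self, context: &Context) -> Result<Vec<Record>, String>;

    // Provided: a root fetch is a fetch with an empty context.
    fn fetch_all(&self) -> Result<Vec<Record>, String> {
        self.fetch_with_context(&Context::new())
    }

    // Provided: delegate to the plain fetch and report no bookmark.
    fn fetch_with_context_incremental(
        &self,
        context: &Context,
    ) -> Result<(Vec<Record>, Option<Bookmark>), String> {
        Ok((self.fetch_with_context(context)?, None))
    }

    // Provided: incremental fetch with an empty context.
    fn fetch_all_incremental(&self) -> Result<(Vec<Record>, Option<Bookmark>), String> {
        self.fetch_with_context_incremental(&Context::new())
    }
}

/// Hypothetical source that always returns the same records.
struct Fixed(Vec<Record>);

impl MiniSource for Fixed {
    fn fetch_with_context(&self, _context: &Context) -> Result<Vec<Record>, String> {
        Ok(self.0.clone())
    }
}
```

An implementor that only supplies fetch_with_context gets the other three methods for free, with None as the bookmark until it overrides the incremental variant.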
fn stream_pages<'a>(
    &'a self,
    context: &'a HashMap<String, Value>,
    batch_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>
Stream records page-by-page so the pipeline can write to the sink as pages arrive instead of buffering the full result set.
batch_size is the hint the pipeline passes down; sources are free
to use a larger or smaller native chunk (e.g. one page per HTTP
response, one row-group per Parquet file) but should approximate it
where feasible. The special value batch_size = 0 means “do not
batch — emit the entire result set in a single page.” Sources that
stream natively should treat 0 as “skip the chunking layer and
yield one page after the underlying read completes” (useful for
small lookup tables or for sinks like SQL COPY / BigQuery load
jobs that prefer one large request).
The default implementation fetches the full result set via
fetch_with_context_incremental
and chunks it in memory by batch_size. The bookmark (when present)
is attached to the final page so the pipeline only persists after
the entire fetch has been written. Sources that can stream natively
override this method and may emit per-page bookmarks (e.g. CDC).
An empty result with a Some(bookmark) still yields one empty page
carrying the bookmark, so incremental runs that produce no records
still advance their checkpoint.
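The chunking rules of the default implementation can be sketched without any async machinery. Page is a hypothetical stand-in for StreamPage (whose fields are not shown on this page), records and bookmarks are plain strings, and returning no pages for an empty result without a bookmark is an assumption of this sketch.

```rust
/// Hypothetical stand-in for `StreamPage`.
#[derive(Debug, PartialEq)]
struct Page {
    records: Vec<String>,
    bookmark: Option<String>,
}

/// Split a fully fetched result set into pages of `batch_size` records.
fn chunk_into_pages(
    records: Vec<String>,
    bookmark: Option<String>,
    batch_size: usize,
) -> Vec<Page> {
    if records.is_empty() {
        // No records: still emit one empty page if a bookmark must be carried.
        return match bookmark {
            Some(b) => vec![Page { records: Vec::new(), bookmark: Some(b) }],
            None => Vec::new(), // assumption: nothing to emit
        };
    }
    // batch_size == 0 means "one page holding the entire result set".
    let size = if batch_size == 0 { records.len() } else { batch_size };
    let mut pages: Vec<Page> = records
        .chunks(size)
        .map(|chunk| Page { records: chunk.to_vec(), bookmark: None })
        .collect();
    // Attach the bookmark to the final page only, so it is persisted
    // after every earlier page has been written.
    if let Some(last) = pages.last_mut() {
        last.bookmark = bookmark;
    }
    pages
}
```

Five records with batch_size = 2 produce three pages, and only the third carries the bookmark. With batch_size = 0 the same input produces a single page of five records.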
fn config_schema(&self) -> Value
Return a JSON Schema describing the configuration this source accepts.
fn state_key(&self) -> Option<String>
Stable key under which this source’s incremental-replication bookmark
should be persisted in a StateStore.
Returning Some(key) opts this source into resumable runs: when the
pipeline is configured with a state store via
Pipeline::with_state_store, it
reads the bookmark at key before fetching and writes the new
bookmark back only after the sink confirms the batch was written.
The default returns None, meaning no bookmark is persisted for this source.
Keys must satisfy validate_state_key.
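The read-before-fetch, write-after-sink ordering can be sketched with an in-memory map. MemoryStore and run_once are hypothetical names, bookmarks are plain strings, and the closures stand in for the source and the sink. This is not how Pipeline is implemented, only an illustration of the ordering.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a `StateStore`.
#[derive(Default)]
struct MemoryStore {
    bookmarks: HashMap<String, String>,
}

/// One resumable run: `fetch` receives the stored bookmark and returns the
/// records plus a new bookmark; `write` plays the role of the sink.
fn run_once<F, W>(
    store: &mut MemoryStore,
    key: &str,
    fetch: F,
    write: W,
) -> Result<usize, String>
where
    F: FnOnce(Option<&str>) -> Result<(Vec<String>, Option<String>), String>,
    W: FnOnce(&[String]) -> Result<(), String>,
{
    // 1. Read the bookmark at `key` before fetching.
    let start = store.bookmarks.get(key).cloned();
    let (records, new_bookmark) = fetch(start.as_deref())?;
    // 2. Write to the sink; on error we return early and the stored
    //    bookmark stays untouched.
    write(&records)?;
    // 3. Persist the new bookmark only after the sink succeeded.
    if let Some(bookmark) = new_bookmark {
        store.bookmarks.insert(key.to_string(), bookmark);
    }
    Ok(records.len())
}
```

Because step 3 runs only after the sink returns Ok, a failed write leaves the old bookmark in place and the next run fetches the same range again.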
fn apply_start_bookmark<'life0, 'async_trait>(
    &'life0 self,
    _bookmark: Value,
) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
Apply a bookmark loaded from a StateStore
as this run’s starting point.
The default implementation ignores the value, which keeps existing
sources backwards-compatible. Sources that support incremental
replication override this — typically by storing the value behind
interior mutability and consulting it inside
fetch_with_context_incremental.
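Since the method takes &self, an override needs interior mutability to remember the bookmark. The sketch below shows one way this might look, using a Mutex so the type stays Send + Sync. CounterSource is hypothetical, its methods are synchronous mirrors of the trait methods, and the numeric bookmark is an assumption made for brevity.

```rust
use std::sync::Mutex;

/// Hypothetical source over integer row ids with a numeric bookmark.
struct CounterSource {
    rows: Vec<u64>,
    // Interior mutability: written through `&self`, read during fetch.
    start_after: Mutex<Option<u64>>,
}

impl CounterSource {
    fn new(rows: Vec<u64>) -> Self {
        CounterSource { rows, start_after: Mutex::new(None) }
    }

    /// Mirrors `apply_start_bookmark`: store the value for the next fetch.
    fn apply_start_bookmark(&self, bookmark: u64) {
        *self.start_after.lock().unwrap() = Some(bookmark);
    }

    /// Mirrors the incremental fetch: return only rows above the stored
    /// bookmark, plus the highest id seen as the new bookmark.
    fn fetch_incremental(&self) -> (Vec<u64>, Option<u64>) {
        let start = *self.start_after.lock().unwrap();
        let rows: Vec<u64> = self
            .rows
            .iter()
            .copied()
            .filter(|id| start.map_or(true, |s| *id > s))
            .collect();
        // If nothing new arrived, keep reporting the previous bookmark.
        let bookmark = rows.iter().copied().max().or(start);
        (rows, bookmark)
    }
}
```

After apply_start_bookmark(2) on rows 1 through 4, the fetch returns rows 3 and 4 with bookmark 4.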
fn connector_name(&self) -> &'static str
Stable identifier used as the connector label on metrics and the
connector attribute on spans. Defaults to the final segment of
std::any::type_name::<Self>(), e.g. "RestSource". Built-in
connectors override with a short, friendly snake_case name (e.g.
"rest"). Must return a non-empty string; observability decorators
fall back to "unknown" in release builds if it is empty (and
debug_assert! in debug builds).
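The default described above, taking the final segment of std::any::type_name, might be derived roughly as follows. The helper name default_connector_name is hypothetical and the crate's actual implementation is not shown on this page. Note that type_name output is intended for diagnostics and its exact format is not guaranteed by the standard library.

```rust
use std::any::type_name;

/// Final `::`-separated segment of a type's name.
/// Hypothetical helper; works for non-generic types, which is the
/// only case this sketch considers.
fn default_connector_name<T>() -> &'static str {
    let full = type_name::<T>();
    // `rsplit` yields segments from the right; the first is the type name.
    full.rsplit("::").next().unwrap_or(full)
}

/// Stand-in type used only to demonstrate the helper.
struct RestSource;
```

Calling default_connector_name::<RestSource>() yields "RestSource", which a built-in connector would then override with a shorter name such as "rest".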
fn check<'life0, 'life1, 'async_trait>(
    &'life0 self,
    ctx: &'life1 CheckContext,
) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
Run a fast, non-mutating preflight probe (used by faucet doctor).
The default pulls a single page via
stream_pages and reports success/failure — it
exercises the real read path (DNS, TLS, auth, the first request, the
first-record decode) but never paginates the full dataset and never
repeats. The page stream is dropped immediately after the first page.
Sources whose first page blocks waiting for inbound data (webhook,
websocket) or has side effects (CDC consuming WAL) override this with a
cheaper, side-effect-free probe. Probe-level failures are returned as a
ProbeStatus::Fail inside Ok(report).
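The shape of the default probe, pulling exactly one page and folding a read failure into the report, can be sketched with a plain iterator in place of the page stream. The ProbeStatus enum here is a local stand-in: only the Fail variant is named in the text above, while the Pass variant, the String payload, and treating an empty stream as a pass are assumptions of this sketch.

```rust
/// Local stand-in for the crate's `ProbeStatus`.
#[derive(Debug, PartialEq)]
enum ProbeStatus {
    Pass,         // assumed variant name
    Fail(String), // payload type is an assumption
}

/// Pull at most one page from `pages` and report the outcome.
/// A failed read becomes `Ok(ProbeStatus::Fail(..))`; the outer `Err` is
/// left for non-probe errors, which this sketch never produces.
fn probe_first_page<I>(mut pages: I) -> Result<ProbeStatus, String>
where
    I: Iterator<Item = Result<Vec<String>, String>>,
{
    let status = match pages.next() {
        Some(Ok(_page)) => ProbeStatus::Pass,
        Some(Err(reason)) => ProbeStatus::Fail(reason),
        // Assumption: an empty stream still proves the source is reachable.
        None => ProbeStatus::Pass,
    };
    // `pages` is dropped here, so no further pages are requested.
    Ok(status)
}
```

Because the iterator is consumed only once, a source with thousands of pages is probed at the cost of its first request.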
Dyn Compatibility
This trait is dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".