pub struct S3Source { /* private fields */ }Expand description
An S3 source that lists and reads objects from a bucket.
Implementations§
Source§impl S3Source
impl S3Source
Sourcepub async fn new(config: S3SourceConfig) -> Result<Self, FaucetError>
pub async fn new(config: S3SourceConfig) -> Result<Self, FaucetError>
Create a new S3 source from the given configuration.
Builds the S3 client eagerly so it is reused across calls.
Trait Implementations§
Source§impl Source for S3Source
impl Source for S3Source
Source§fn stream_pages<'a>(
&'a self,
context: &'a HashMap<String, Value>,
_batch_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>
fn stream_pages<'a>( &'a self, context: &'a HashMap<String, Value>, _batch_size: usize, ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>
Stream records from listed S3 objects without buffering the full
scan. Each emitted StreamPage holds up to
S3SourceConfig::batch_size records.
The trait-level batch_size argument is ignored in favour of the
config field — the config is the user-facing knob the README
documents, and routing the pipeline-supplied hint through it would
silently override an explicit config value.
Behaviour by format:
JsonLines/RawText: the object body is decoded line-by-line viatokio::io::AsyncBufReadExt::linesso client-side memory is bounded atO(batch_size)per object. Multi-object scans are flattened — a single page may carry lines drawn from any object.JsonArray: each object is buffered fully (the JSON value can only be parsed once the array is complete) and then its records are chunked into pages ofbatch_size. See the README “Streaming and batching” section for the caveat.
batch_size = 0 is the “no batching” sentinel: one StreamPage
is emitted per S3 object (no within-object chunking and no
cross-object accumulation). The S3 source has no
incremental-replication mode today, so every emitted page carries
bookmark: None.
Source§fn fetch_with_context<'life0, 'life1, 'async_trait>(
&'life0 self,
context: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn fetch_with_context<'life0, 'life1, 'async_trait>(
&'life0 self,
context: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn config_schema(&self) -> Value
fn config_schema(&self) -> Value
Source§fn fetch_all<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn fetch_all<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Source§fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>(
&'life0 self,
context: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>(
&'life0 self,
context: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Source§fn fetch_all_incremental<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn fetch_all_incremental<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Source§fn state_key(&self) -> Option<String>
fn state_key(&self) -> Option<String>
StateStore. Read moreSource§fn apply_start_bookmark<'life0, 'async_trait>(
&'life0 self,
_bookmark: Value,
) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn apply_start_bookmark<'life0, 'async_trait>(
&'life0 self,
_bookmark: Value,
) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
StateStore
as this run’s starting point. Read moreSource§fn connector_name(&self) -> &'static str
fn connector_name(&self) -> &'static str
connector label on metrics and the
connector attribute on spans. Defaults to the final segment of
std::any::type_name::<Self>(), e.g. "RestSource". Built-in
connectors override with a short, friendly snake_case name (e.g.
"rest"). Must return a non-empty string; observability decorators
fall back to "unknown" in release builds if it is empty (and
debug_assert! in debug builds).Source§fn check<'life0, 'life1, 'async_trait>(
&'life0 self,
ctx: &'life1 CheckContext,
) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn check<'life0, 'life1, 'async_trait>(
&'life0 self,
ctx: &'life1 CheckContext,
) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
faucet doctor). Read moreAuto Trait Implementations§
impl Freeze for S3Source
impl !RefUnwindSafe for S3Source
impl Send for S3Source
impl Sync for S3Source
impl Unpin for S3Source
impl UnsafeUnpin for S3Source
impl !UnwindSafe for S3Source
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more