pub struct GrpcStreamConfig {
    pub endpoint: String,
    pub service_name: String,
    pub method_name: String,
    pub request: Value,
    pub descriptor_set_path: PathBuf,
    pub auth: AuthSpec<GrpcAuth>,
    pub tls: Option<bool>,
    pub records_path: Option<String>,
    pub batch_size: usize,
    pub rpc_kind: RpcKind,
    pub max_messages: Option<usize>,
    pub terminate_on_error: bool,
    pub reconnect_initial_backoff: Duration,
    pub reconnect_max_backoff: Duration,
    pub reconnect_max_attempts: Option<u32>,
    pub reconnect_replay_from_start: bool,
    pub max_decoding_message_size: Option<usize>,
    pub max_encoding_message_size: Option<usize>,
}
Available on crate features source-grpc and source-rest only.
Configuration for the gRPC source.
Fields
endpoint: String
gRPC endpoint URL (e.g. "http://localhost:50051").
service_name: String
Fully qualified service name (e.g. "mypackage.MyService").
method_name: String
Method name (e.g. "ListUsers").
request: Value
Request message as JSON. Fields are mapped to protobuf fields
using the FileDescriptorSet.
descriptor_set_path: PathBuf
Path to the compiled FileDescriptorSet file.
auth: AuthSpec<GrpcAuth>
Authentication: either inline ({ type, config }) or a { ref: <name> }
pointer to a shared provider in the CLI’s top-level auth: catalog.
tls: Option<bool>
Whether to use TLS. By default this is detected from an https:// scheme in endpoint.
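The TLS detection described for tls can be sketched as follows. This is only an illustration: resolve_tls is a hypothetical helper, not part of this crate, and it assumes that an explicit Some(..) value takes precedence over the endpoint scheme.

```rust
/// Decide whether a connection should use TLS.
/// Assumption: an explicit `tls` value wins; `None` falls back to the scheme.
fn resolve_tls(endpoint: &str, tls: Option<bool>) -> bool {
    match tls {
        Some(explicit) => explicit,
        None => endpoint.starts_with("https://"),
    }
}

fn main() {
    // No explicit setting: the scheme decides.
    println!("{}", resolve_tls("https://api.example.com", None));
    println!("{}", resolve_tls("http://localhost:50051", None));
    // Explicit setting: the scheme is ignored in this sketch.
    println!("{}", resolve_tls("http://localhost:50051", Some(true)));
}
```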
records_path: Option<String>
JSONPath to extract records from the response. If not set, the entire response is returned as a single record.
batch_size: usize
Records per emitted StreamPage. Defaults
to DEFAULT_BATCH_SIZE.
For unary RPCs the source has no native paging primitive to honour
this hint: the default
Source::stream_pages impl
buffers the full response and then chunks it in memory, bounding
sink-side memory only.
For server-streaming RPCs (rpc_kind = "server_streaming") the source
overrides stream_pages and flushes a page each time batch_size
messages accumulate, bounding both source-side and sink-side memory.
batch_size = 0 drains the entire stream into a single page.
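The paging rule for server-streaming RPCs can be illustrated with a small standalone sketch. paginate is a hypothetical function, not the crate's stream_pages implementation. It collects all pages into a Vec for easy inspection, whereas the real source is described as flushing each page as soon as it fills. Emitting no page at all for an empty stream is an assumption.

```rust
/// Group incoming messages into pages of `batch_size` items.
/// `batch_size == 0` drains everything into a single page.
fn paginate<T>(messages: impl IntoIterator<Item = T>, batch_size: usize) -> Vec<Vec<T>> {
    let mut pages = Vec::new();
    let mut current = Vec::new();
    for msg in messages {
        current.push(msg);
        // Flush a full page; never flush early when batching is disabled.
        if batch_size != 0 && current.len() == batch_size {
            pages.push(std::mem::take(&mut current));
        }
    }
    // Flush the trailing partial page (or the single page when batch_size == 0).
    if !current.is_empty() {
        pages.push(current);
    }
    pages
}

fn main() {
    println!("{:?}", paginate(1..=5, 2)); // [[1, 2], [3, 4], [5]]
    println!("{:?}", paginate(1..=5, 0)); // [[1, 2, 3, 4, 5]]
}
```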
rpc_kind: RpcKind
Kind of RPC to invoke. Defaults to RpcKind::Unary.
max_messages: Option<usize>
For RpcKind::ServerStreaming: maximum number of messages to consume
before terminating the stream. None means consume until the server
closes the stream (or the run is cancelled).
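As a rough model of the max_messages cap, the sketch below treats the server stream as a plain iterator. consume is a hypothetical name, and the real source works on an asynchronous gRPC stream rather than a std iterator.

```rust
/// Consume a stream, stopping after `max_messages` items when a cap is set.
/// `None` reads until the stream itself ends.
fn consume<T>(stream: impl Iterator<Item = T>, max_messages: Option<usize>) -> Vec<T> {
    match max_messages {
        Some(cap) => stream.take(cap).collect(),
        None => stream.collect(),
    }
}

fn main() {
    // An unbounded stream is cut off by the cap.
    println!("{:?}", consume(0.., Some(3))); // [0, 1, 2]
    // Without a cap, a finite stream is read to the end.
    println!("{:?}", consume(0..5, None)); // [0, 1, 2, 3, 4]
}
```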
terminate_on_error: bool
For RpcKind::ServerStreaming: if true, transient stream errors
(server-side disconnects, transport errors, etc.) terminate the run
with FaucetError::Source. When
false (the default), the source reconnects with exponential backoff
up to reconnect_max_attempts.
Ignored for RpcKind::Unary.
reconnect_initial_backoff: Duration
For RpcKind::ServerStreaming reconnect: initial backoff delay
before the first retry. Doubles after each failure up to
reconnect_max_backoff. Defaults to 1s.
reconnect_max_backoff: Duration
For RpcKind::ServerStreaming reconnect: maximum backoff cap.
Defaults to 30s.
reconnect_max_attempts: Option<u32>
For RpcKind::ServerStreaming reconnect: maximum reconnect attempts
before surfacing the error. None (the default) means unlimited
retries.
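The three reconnect settings combine into a capped doubling schedule. The sketch below is one way to compute it, assuming a 0-based attempt counter and no jitter. backoff_delay is a hypothetical helper and the crate's actual schedule may differ in detail.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based), or `None` once
/// `max_attempts` is exhausted and the error should be surfaced.
fn backoff_delay(
    attempt: u32,
    initial: Duration,
    max: Duration,
    max_attempts: Option<u32>,
) -> Option<Duration> {
    if let Some(limit) = max_attempts {
        if attempt >= limit {
            return None;
        }
    }
    // initial * 2^attempt; any overflow is treated as "at the cap".
    let delay = 2u32
        .checked_pow(attempt)
        .and_then(|factor| initial.checked_mul(factor))
        .unwrap_or(max);
    Some(delay.min(max))
}

fn main() {
    let initial = Duration::from_secs(1); // documented default
    let max = Duration::from_secs(30); // documented default
    for attempt in 0..7 {
        println!("attempt {attempt}: {:?}", backoff_delay(attempt, initial, max, None));
    }
}
```

With the documented defaults this yields 1s, 2s, 4s, 8s, 16s, and then 30s for every later attempt.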
reconnect_replay_from_start: bool
For RpcKind::ServerStreaming reconnect: whether the server replays
the response stream from the beginning when the identical request is
re-issued after a disconnect. Defaults to true.
Because the request is resolved once per run, a reconnect sends the
same request — a stateless server therefore re-streams from message
0. When true the source skips the messages it already emitted before
the disconnect, so consumers see each message once. Set to false
only for servers that resume mid-stream on the same request (rare):
there, skipping would drop genuinely-new messages, so every received
message is emitted (at-least-once; duplicates possible).
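The effect of reconnect_replay_from_start can be modelled with a short sketch. after_reconnect is a hypothetical function: it takes the messages received on the new connection and the number of messages emitted before the disconnect, and returns what would be emitted afterwards.

```rust
/// Messages to emit from a fresh connection, given how many messages
/// were already emitted before the disconnect.
fn after_reconnect<T>(received: Vec<T>, already_emitted: usize, replay_from_start: bool) -> Vec<T> {
    if replay_from_start {
        // The server is assumed to restart at message 0: skip what was seen.
        received.into_iter().skip(already_emitted).collect()
    } else {
        // The server is assumed to resume mid-stream: emit everything.
        received
    }
}

fn main() {
    let replayed = vec!["m0", "m1", "m2", "m3"];
    // Two messages were emitted before the disconnect; a stateless server replays all four.
    println!("{:?}", after_reconnect(replayed.clone(), 2, true)); // ["m2", "m3"]
    // Same server with the flag off: duplicates reach the consumer.
    println!("{:?}", after_reconnect(replayed, 2, false)); // ["m0", "m1", "m2", "m3"]
    // A resuming server with the flag on: new messages are wrongly skipped.
    println!("{:?}", after_reconnect(vec!["m2", "m3"], 2, true)); // []
}
```

The last call shows why the flag has to match the server's behaviour: skipping against a server that resumes mid-stream drops new data.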
max_decoding_message_size: Option<usize>
Maximum size, in bytes, of a single inbound (decoded) gRPC message.
None (the default) keeps tonic’s built-in 4 MiB limit. Raise this
for sources that legitimately return large messages; a too-low limit
surfaces as a decode error and aborts the call. Applies to both unary
and server-streaming RPCs.
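To make the limit concrete, the following sketch checks a message length against the configured value, falling back to the 4 MiB default mentioned above. check_inbound is a hypothetical helper, and treating a message of exactly the limit as acceptable is an assumption about the boundary.

```rust
/// 4 MiB, the built-in default named in the field description.
const DEFAULT_MAX_DECODING: usize = 4 * 1024 * 1024;

/// Reject an inbound message whose length exceeds the effective limit.
fn check_inbound(len: usize, max_decoding_message_size: Option<usize>) -> Result<(), String> {
    let limit = max_decoding_message_size.unwrap_or(DEFAULT_MAX_DECODING);
    if len > limit {
        Err(format!("message of {len} bytes exceeds the limit of {limit} bytes"))
    } else {
        Ok(())
    }
}

fn main() {
    let five_mib = 5 * 1024 * 1024;
    // Rejected under the default limit.
    println!("{:?}", check_inbound(five_mib, None));
    // Accepted once the limit is raised to 16 MiB.
    println!("{:?}", check_inbound(five_mib, Some(16 * 1024 * 1024)));
}
```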
max_encoding_message_size: Option<usize>
Maximum size, in bytes, of a single outbound (encoded) gRPC request
message. None (the default) keeps tonic’s built-in limit. Rarely
needs tuning for a data source, since requests are typically small.
Implementations
impl GrpcStreamConfig
pub fn new(
    endpoint: impl Into<String>,
    service_name: impl Into<String>,
    method_name: impl Into<String>,
    descriptor_set_path: impl Into<PathBuf>,
) -> GrpcStreamConfig
Create a new config with the required fields.
pub fn request(self, request: Value) -> GrpcStreamConfig
Set the request message as JSON.
pub fn auth(self, auth: GrpcAuth) -> GrpcStreamConfig
Set the authentication method (inline).
pub fn tls(self, tls: bool) -> GrpcStreamConfig
Set the TLS mode explicitly.
pub fn records_path(self, path: impl Into<String>) -> GrpcStreamConfig
Set the JSONPath for record extraction from the response.
pub fn with_batch_size(self, batch_size: usize) -> GrpcStreamConfig
Set the per-page record count for
Source::stream_pages.
Pass 0 to opt out of batching — the entire result set is emitted in
a single StreamPage. For unary RPCs this
is observably identical to any positive batch_size, since the full
response is buffered before any page is yielded. For server-streaming
RPCs, 0 drains the entire stream before yielding.
pub fn rpc_kind(self, rpc_kind: RpcKind) -> GrpcStreamConfig
Set the RPC kind (unary or server-streaming).
pub fn max_messages(self, max_messages: usize) -> GrpcStreamConfig
Cap the number of messages to consume from a server-streaming RPC. Ignored for unary RPCs.
pub fn terminate_on_error(self, terminate_on_error: bool) -> GrpcStreamConfig
Whether transient server-streaming errors should terminate the run
(true) or trigger a reconnect with exponential backoff (false,
the default).
pub fn reconnect_initial_backoff(self, backoff: Duration) -> GrpcStreamConfig
Set the initial reconnect backoff for server-streaming RPCs.
pub fn reconnect_max_backoff(self, backoff: Duration) -> GrpcStreamConfig
Set the maximum reconnect backoff for server-streaming RPCs.
pub fn reconnect_max_attempts(self, attempts: u32) -> GrpcStreamConfig
Cap the number of reconnect attempts for server-streaming RPCs.
pub fn reconnect_replay_from_start(self, replay: bool) -> GrpcStreamConfig
Set whether the server replays the stream from the start on reconnect
(default true). See the
reconnect_replay_from_start field for details.
pub fn max_decoding_message_size(self, bytes: usize) -> GrpcStreamConfig
Set the maximum inbound (decoded) gRPC message size in bytes.
pub fn max_encoding_message_size(self, bytes: usize) -> GrpcStreamConfig
Set the maximum outbound (encoded) gRPC message size in bytes.
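The setters above all take self by value and return the config, so they chain. The sketch below shows that pattern on a reduced stand-in type. MiniConfig is hypothetical and carries only a few of the fields. Its defaults follow the field descriptions above, but it is not the crate's implementation.

```rust
use std::time::Duration;

/// Reduced, hypothetical stand-in for the real config type.
#[derive(Debug, Clone, PartialEq)]
struct MiniConfig {
    endpoint: String,
    max_messages: Option<usize>,
    reconnect_initial_backoff: Duration,
    reconnect_max_attempts: Option<u32>,
}

impl MiniConfig {
    /// Required fields go through `new`; everything else gets its default.
    fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            max_messages: None,
            reconnect_initial_backoff: Duration::from_secs(1),
            reconnect_max_attempts: None,
        }
    }

    /// Each setter consumes the config and returns the updated value.
    fn max_messages(mut self, max_messages: usize) -> Self {
        self.max_messages = Some(max_messages);
        self
    }

    fn reconnect_initial_backoff(mut self, backoff: Duration) -> Self {
        self.reconnect_initial_backoff = backoff;
        self
    }

    fn reconnect_max_attempts(mut self, attempts: u32) -> Self {
        self.reconnect_max_attempts = Some(attempts);
        self
    }
}

fn main() {
    let config = MiniConfig::new("http://localhost:50051")
        .max_messages(1_000)
        .reconnect_initial_backoff(Duration::from_millis(250))
        .reconnect_max_attempts(5);
    println!("endpoint: {}", config.endpoint);
    println!("max_messages: {:?}", config.max_messages);
    println!("initial backoff: {:?}", config.reconnect_initial_backoff);
    println!("max attempts: {:?}", config.reconnect_max_attempts);
}
```

Note that the Option-typed fields are set through setters that take the bare value, so a builder call can only set a cap, not clear one.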
Trait Implementations
impl Clone for GrpcStreamConfig
fn clone(&self) -> GrpcStreamConfig
fn clone_from(&mut self, source: &Self)
1.0.0 (const: unstable). Performs copy-assignment from source.
impl Debug for GrpcStreamConfig
impl<'de> Deserialize<'de> for GrpcStreamConfig
fn deserialize<__D>(__deserializer: __D) -> Result<GrpcStreamConfig, <__D as Deserializer<'de>>::Error> where __D: Deserializer<'de>
impl JsonSchema for GrpcStreamConfig
fn schema_id() -> Cow<'static, str>
fn json_schema(generator: &mut SchemaGenerator) -> Schema
fn inline_schema() -> bool
Whether JSON Schemas generated for this type should be included directly in parent schemas, rather than being re-used where possible using the $ref keyword.
impl Serialize for GrpcStreamConfig
fn serialize<__S>(&self, __serializer: __S) -> Result<<__S as Serializer>::Ok, <__S as Serializer>::Error> where __S: Serializer
Auto Trait Implementations
impl Freeze for GrpcStreamConfig
impl RefUnwindSafe for GrpcStreamConfig
impl Send for GrpcStreamConfig
impl Sync for GrpcStreamConfig
impl Unpin for GrpcStreamConfig
impl UnsafeUnpin for GrpcStreamConfig
impl UnwindSafe for GrpcStreamConfig
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> PolicyExt for T where T: ?Sized
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.