faucet-source-grpc
A config-driven gRPC source that uses protobuf reflection to call any gRPC service dynamically and return records as JSON.
Part of the faucet-stream ecosystem.
Installation
```toml
[dependencies]
faucet-source-grpc = "1.0"
tokio = { version = "1", features = ["full"] }
```
Or via the umbrella crate:
```toml
faucet-stream = { version = "1.0", features = ["source-grpc"] }
```
Prerequisites
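This source requires a compiled FileDescriptorSet file. Generate it from your .proto files with protoc's --descriptor_set_out flag, and add --include_imports so that messages from any imported .proto files are bundled into the same file.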
The descriptor file contains the full schema of your protobuf messages and services, enabling dynamic encoding and decoding without code generation.
Quick Start
```rust
use …;
use serde_json::json;

#[tokio::main]
async fn main() {
    …
}
```
Configuration
GrpcStreamConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `endpoint` | `String` | (required) | gRPC endpoint URL (e.g. `"http://localhost:50051"`) |
| `service_name` | `String` | (required) | Fully qualified service name (e.g. `"mypackage.MyService"`) |
| `method_name` | `String` | (required) | Method name (e.g. `"ListUsers"`) |
| `descriptor_set_path` | `PathBuf` | (required) | Path to the compiled `FileDescriptorSet` file |
| `request` | `Value` | `{}` | Request message as JSON. Fields are mapped to protobuf fields using the descriptor |
| `auth` | `GrpcAuth` | `GrpcAuth::None` | Authentication method |
| `tls` | `Option<bool>` | `None` | Whether to use TLS. When `None`, TLS is auto-detected from an `https://` scheme in the endpoint URL |
| `records_path` | `Option<String>` | `None` | JSONPath used to extract records from the response (from each message, for server-streaming). If unset, the entire response is returned as a single record |
| `batch_size` | `usize` | `1000` | Records per emitted `StreamPage` for `Source::stream_pages`. `0` is the "no batching" sentinel: the entire result set is emitted in a single page. For unary RPCs, `0` and any positive value behave identically; see Streaming and batching below |
| `rpc_kind` | `RpcKind` | `Unary` | `Unary` (one request → one response) or `ServerStreaming` (one request → stream of responses). See Server-streaming RPCs below |
| `max_messages` | `Option<usize>` | `None` | Server-streaming only. Maximum number of streamed messages to consume before terminating. `None` consumes until the server closes the stream |
| `terminate_on_error` | `bool` | `false` | Server-streaming only. If `true`, a transient stream error terminates the run. If `false`, the source reconnects with exponential backoff |
| `reconnect_initial_backoff` | `Duration` (secs) | `1` | Server-streaming only. Backoff before the first reconnect attempt; doubles after each failure up to `reconnect_max_backoff`. Must be > 0: a zero backoff would busy-spin reconnects and is rejected at construction |
| `reconnect_max_backoff` | `Duration` (secs) | `30` | Server-streaming only. Upper bound on the reconnect backoff |
| `reconnect_max_attempts` | `Option<u32>` | `None` | Server-streaming only. Maximum reconnect attempts before the error is surfaced. `None` means unlimited |
| `reconnect_replay_from_start` | `bool` | `true` | Server-streaming only. Whether the server replays the stream from message 0 when the same request is re-issued on reconnect. `true` (default) skips already-emitted messages so each is delivered once; `false` emits every received message (at-least-once). See Reconnect on transient errors |
| `max_decoding_message_size` | `Option<usize>` | `None` | Maximum size in bytes of a single inbound (decoded) message. `None` keeps tonic's built-in 4 MiB limit; raise it for services that return large messages |
| `max_encoding_message_size` | `Option<usize>` | `None` | Maximum size in bytes of a single outbound (encoded) request message. `None` keeps tonic's default (`usize::MAX`, effectively unlimited) |
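The `tls` and `reconnect_initial_backoff` rules in the table above can be sketched as two small checks. This is a minimal illustration, not the crate's code: `resolve_tls` and `check_initial_backoff` are hypothetical helpers, and treating "auto-detect" as a prefix test on `https://` is an assumption.

```rust
use std::time::Duration;

/// Hypothetical helper: an explicit `Some(b)` always wins; `None` means
/// "use TLS if the endpoint URL uses the https:// scheme" (assumed prefix test).
fn resolve_tls(tls: Option<bool>, endpoint: &str) -> bool {
    tls.unwrap_or_else(|| endpoint.starts_with("https://"))
}

/// Hypothetical check mirroring the documented constraint that
/// `reconnect_initial_backoff` must be > 0, since a zero backoff would
/// turn the reconnect loop into a busy-spin.
fn check_initial_backoff(initial: Duration) -> Result<Duration, String> {
    if initial.is_zero() {
        Err("reconnect_initial_backoff must be > 0".to_string())
    } else {
        Ok(initial)
    }
}
```

Rejecting the zero value up front, rather than clamping it silently, surfaces a misconfiguration before any connection is attempted.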
Authentication (GrpcAuth)
| Variant | Fields | Description |
|---|---|---|
| `None` | - | No authentication |
| `Bearer { token }` | `token: String` | Bearer token sent as `authorization` metadata |
| `Metadata { entries }` | `entries: Vec<MetadataEntry { key, value }>` | Custom metadata pairs attached to the gRPC request. `Vec` preserves order and allows duplicate keys (gRPC permits both) |
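To see why `Metadata` takes a `Vec` rather than a map, here is a sketch that turns an auth setting into the ordered list of metadata pairs attached to a request. `Auth` is a hypothetical stand-in for `GrpcAuth` (with `(key, value)` tuples in place of `MetadataEntry`), and the `Bearer <token>` value format is the usual convention, assumed here rather than taken from the crate.

```rust
/// Hypothetical mirror of the documented `GrpcAuth` variants.
enum Auth {
    None,
    Bearer { token: String },
    Metadata { entries: Vec<(String, String)> },
}

/// Produces the metadata pairs to attach, in order. A Vec (not a HashMap)
/// keeps insertion order and lets the same key appear more than once.
fn auth_metadata(auth: &Auth) -> Vec<(String, String)> {
    match auth {
        Auth::None => Vec::new(),
        // Assumption: the conventional "Bearer <token>" value format.
        Auth::Bearer { token } => {
            vec![("authorization".to_string(), format!("Bearer {token}"))]
        }
        Auth::Metadata { entries } => entries.clone(),
    }
}
```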
Streaming and batching
Unary RPCs
Unary gRPC returns one response containing all records; stream_pages falls back to the default trait impl, which buffers the full response and then chunks it in memory into batch_size pages. This bounds sink-side memory only — source-side memory is still O(full response).
batch_size = 0 and any positive batch_size are observably identical for unary gRPC — both buffer the full result before yielding, since the unary RPC has no native paging primitive the source could honour. Treat the field as a sink-side chunk size, not a wire-protocol hint.
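The in-memory chunking can be sketched with `slice::chunks`. `chunk_pages` is a hypothetical helper, not the trait's default implementation; it shows that `0` has to be special-cased (because `chunks(0)` panics) and that the records handed to the sink are the same either way, only grouped differently.

```rust
/// Splits an already-buffered response into pages of `batch_size` records.
/// `batch_size == 0` is the "no batching" sentinel: one page with everything.
fn chunk_pages<T: Clone>(records: &[T], batch_size: usize) -> Vec<Vec<T>> {
    if batch_size == 0 {
        // `slice::chunks(0)` would panic, so the sentinel is handled first.
        return vec![records.to_vec()];
    }
    records.chunks(batch_size).map(|page| page.to_vec()).collect()
}
```

In both branches the full `records` slice already exists before the first page is produced, which is why source-side memory stays O(full response) for unary calls.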
Server-streaming RPCs
When rpc_kind = "server_streaming", the source calls tonic::client::Grpc::server_streaming and consumes the response stream message-by-message. Each streamed DynamicMessage is decoded via prost-reflect and converted to JSON; if records_path is set it is applied to each message individually (so $.events[*] flattens an array nested inside each message into the page record set).
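Applying `records_path` per message amounts to a flat-map over the stream. In this sketch `Message` is a hypothetical stand-in for a decoded message whose JSON form is `{"events": [...]}`, and `flatten_events` mimics what `$.events[*]` selects; the real source evaluates the JSONPath against JSON, not against a Rust struct.

```rust
/// Hypothetical stand-in for one decoded server-streaming message.
struct Message {
    events: Vec<String>,
}

/// Mimics `$.events[*]` applied to each message individually: the matches
/// from every message are concatenated into one record set, and a message
/// with an empty array simply contributes nothing.
fn flatten_events(messages: &[Message]) -> Vec<String> {
    messages
        .iter()
        .flat_map(|m| m.events.iter().cloned())
        .collect()
}
```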
stream_pages flushes a page each time the buffer accumulates batch_size records, bounding both source-side and sink-side memory. batch_size = 0 drains the entire stream into a single page (useful for short streams or sinks that prefer one large write). Pages carry bookmark: None — server-streaming has no native cursor, so resumption is driven by user-supplied request fields (e.g. an event id), not a faucet-managed bookmark.
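A minimal sketch of the flushing loop, assuming an iterator stands in for the tonic response stream and that each message yields exactly one record (no `records_path`). `stream_pages` here is a hypothetical free function with a callback sink, not the trait method; it also applies `max_messages` as a cap on consumed messages.

```rust
/// Consumes `messages` lazily, stopping after `max_messages` if set, and
/// hands a page to `sink` each time `batch_size` records have accumulated.
/// With `batch_size == 0` nothing is flushed early, so the whole stream ends
/// up in a single final page.
fn stream_pages<I, T, F>(messages: I, batch_size: usize, max_messages: Option<usize>, mut sink: F)
where
    I: IntoIterator<Item = T>,
    F: FnMut(Vec<T>),
{
    let cap = max_messages.unwrap_or(usize::MAX);
    let mut buf = Vec::new();
    for msg in messages.into_iter().take(cap) {
        buf.push(msg);
        if batch_size > 0 && buf.len() == batch_size {
            // Flush a full page; the buffer never holds more than batch_size records.
            sink(std::mem::take(&mut buf));
        }
    }
    if !buf.is_empty() {
        // Trailing partial page, or the only page when batch_size == 0.
        sink(buf);
    }
}
```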
Reconnect on transient errors
By default, transient stream errors (server disconnects, transport failures, etc.) trigger a reconnect with exponential backoff starting at reconnect_initial_backoff, doubling each attempt up to reconnect_max_backoff. After reconnect_max_attempts (when set), the error is surfaced. Set terminate_on_error = true to skip the reconnect path entirely and propagate the error on first failure.
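The backoff schedule can be written as a pure function of the attempt number. `reconnect_delay` is a hypothetical helper; counting attempts from 0 and returning `None` to mean "surface the error" are choices made for this sketch. With `terminate_on_error = true` the source would never consult it.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based): `initial * 2^attempt`,
/// capped at `max`. Returns `None` once `max_attempts` (if set) is used up,
/// meaning the error should be surfaced instead of retried.
fn reconnect_delay(
    attempt: u32,
    initial: Duration,
    max: Duration,
    max_attempts: Option<u32>,
) -> Option<Duration> {
    if let Some(limit) = max_attempts {
        if attempt >= limit {
            return None;
        }
    }
    // Checked arithmetic so a long outage saturates at `max` instead of overflowing.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let delay = initial.checked_mul(factor).unwrap_or(max);
    Some(delay.min(max))
}
```

With the defaults (1 s initial, 30 s max) the delays run 1, 2, 4, 8, 16, 30, 30, … seconds.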
Reconnect re-sends the same request from scratch (the request is resolved once per run), so a stateless server begins emitting from the start of the stream again. By default (reconnect_replay_from_start = true) the source tracks how many messages it has already emitted and skips that replayed prefix on the reconnected attempt, so each message is delivered downstream exactly once.
Set reconnect_replay_from_start = false only for servers that resume mid-stream on an identical request (rare — most resumable feeds require a resume token in the request, e.g. an after_event_id field the user maintains). With it false, every received message is emitted (at-least-once), so a replayed prefix produces duplicates downstream; conversely, leaving it true against a genuinely-resuming server would skip legitimate new messages. Match the setting to the server's replay semantics.
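Skipping the replayed prefix needs only two counters. `ReplayFilter` is a hypothetical illustration, not the crate's type; it assumes the server replays messages in the same order on every attempt, which is what makes counting (rather than comparing message contents) sufficient.

```rust
/// Hypothetical per-run dedup state for reconnects.
struct ReplayFilter {
    replay_from_start: bool,
    /// Messages delivered downstream so far, across all attempts.
    emitted: usize,
    /// Messages received on the current attempt.
    seen_this_attempt: usize,
}

impl ReplayFilter {
    fn new(replay_from_start: bool) -> Self {
        Self { replay_from_start, emitted: 0, seen_this_attempt: 0 }
    }

    /// Call when a reconnect re-issues the original request.
    fn on_reconnect(&mut self) {
        self.seen_this_attempt = 0;
    }

    /// Returns true if the next received message should be emitted.
    fn accept(&mut self) -> bool {
        self.seen_this_attempt += 1;
        if self.replay_from_start && self.seen_this_attempt <= self.emitted {
            return false; // part of the replayed prefix, already delivered
        }
        self.emitted += 1;
        true
    }
}
```

With `replay_from_start = false` every message passes, which reproduces the at-least-once behaviour described above.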
Config Loading
```rust
use …::{load_env_file, load_json};
use …::GrpcStreamConfig;

// From a JSON file
let config: GrpcStreamConfig = load_json(…)?;
// From a .env file
let config: GrpcStreamConfig = load_env_file(…)?;
```
Example JSON config (unary)
Example JSON config (server-streaming)
Example .env file
```env
GRPC_ENDPOINT=http://localhost:50051
GRPC_SERVICE_NAME=users.UserService
GRPC_METHOD_NAME=ListUsers
GRPC_DESCRIPTOR_SET_PATH=proto/descriptor.bin
```
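The variable names in the .env example follow a visible pattern: a `GRPC_` prefix in front of the upper-cased field name from the configuration table. The sketch below only illustrates that mapping; `env_to_fields` is hypothetical, and how `load_env_file` actually handles prefixes, quoting, or typed fields such as `batch_size` is not modelled here.

```rust
use std::collections::BTreeMap;

/// Hypothetical illustration: `GRPC_<FIELD>=value` becomes `<field> -> value`.
/// Blank lines and `#` comments are skipped; keys without the prefix are ignored.
fn env_to_fields(contents: &str, prefix: &str) -> BTreeMap<String, String> {
    let mut fields = BTreeMap::new();
    for line in contents.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split on the first '=' only, so values may themselves contain '='.
        if let Some((key, value)) = line.split_once('=') {
            if let Some(field) = key.trim().strip_prefix(prefix) {
                fields.insert(field.to_ascii_lowercase(), value.trim().to_string());
            }
        }
    }
    fields
}
```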
Config Schema Introspection
```rust
use …::Source;

let stream = …::new(config)?;
// Inspect the configuration schema the source exposes
let schema = stream.config_schema();
println!(…);
```
Examples
Basic unary RPC call
```rust
use …;
use serde_json::json;

let config = GrpcStreamConfig::new(…)
    .request(json!({ … }));
let stream = …::new(config)?;
let records = stream.fetch_all().await?;
// Returns the full response as a single JSON record
```
Authenticated gRPC with TLS and record extraction
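```rust
use …;
use serde_json::json;

let config = GrpcStreamConfig::new(…)
    .request(json!({ … }))
    .auth(…)          // GrpcAuth variant
    .tls(…)           // force TLS on or off
    .records_path(…); // JSONPath selecting the records
let stream = …::new(config)?;
let events = stream.fetch_all().await?;
println!(…);
```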
Server-streaming RPC
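```rust
use …;
use serde_json::json;

let config = GrpcStreamConfig::new(…)
    .request(json!({ … }))
    .rpc_kind(RpcKind::ServerStreaming)
    .max_messages(…)     // stop after this many streamed messages
    .with_batch_size(…); // records per emitted page
let stream = …::new(config)?;
let events = stream.fetch_all().await?;
println!(…);
```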
For long-lived streams, drive the pipeline directly so pages flush to the sink as they arrive instead of buffering everything in memory:
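```rust
use …::Pipeline;

// Pages are written to the sink as they arrive rather than collected in memory.
let pipeline = Pipeline::new(…);
pipeline.run().await?;
```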
Custom metadata authentication
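```rust
use …;

let config = GrpcStreamConfig::new(…)
    .auth(GrpcAuth::Metadata {
        // Order is preserved and duplicate keys are allowed
        entries: vec![…],
    });
let stream = …::new(config)?;
let items = stream.fetch_all().await?;
```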
License
Licensed under MIT or Apache-2.0.