faucet-source-grpc 1.0.0

gRPC API source connector for the faucet-stream ecosystem

A config-driven gRPC source that uses protobuf reflection to call any gRPC service dynamically and return records as JSON.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-grpc = "1.0"
serde_json = "1"  # the examples build requests with serde_json::json!
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-grpc"] }

Prerequisites

This source requires a compiled FileDescriptorSet file. Generate it from your .proto files using protoc:

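protoc --descriptor_set_out=proto/descriptor.bin --include_imports \
    -I proto/ proto/my_service.proto

The descriptor file contains the full schema of your protobuf messages and services, enabling dynamic encoding and decoding without code generation. --include_imports also bundles every imported .proto (for example google/protobuf/timestamp.proto), so the set is self-contained and types referenced through imports can be resolved.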

Quick Start

use faucet_source_grpc::{GrpcStream, GrpcStreamConfig};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = GrpcStreamConfig::new(
        "http://localhost:50051",
        "users.UserService",
        "ListUsers",
        "proto/descriptor.bin",
    )
    .request(json!({"page_size": 100}))
    .records_path("$.users[*]");

    let stream = GrpcStream::new(config)?;
    let records = stream.fetch_all().await?;

    for record in &records {
        println!("{}", record);
    }
    Ok(())
}

Configuration

GrpcStreamConfig

| Field | Type | Default | Description |
|---|---|---|---|
| endpoint | String | (required) | gRPC endpoint URL (e.g. "http://localhost:50051") |
| service_name | String | (required) | Fully qualified service name (e.g. "mypackage.MyService") |
| method_name | String | (required) | Method name (e.g. "ListUsers") |
| descriptor_set_path | PathBuf | (required) | Path to the compiled FileDescriptorSet file |
| request | serde_json::Value | {} | Request message as JSON. Fields are mapped to protobuf fields using the descriptor |
| auth | GrpcAuth | GrpcAuth::None | Authentication method |
| tls | Option<bool> | None | Whether to use TLS. When None, it is auto-detected from an https:// scheme in the endpoint URL |
| records_path | Option<String> | None | JSONPath used to extract records from the response. If not set, the entire response is returned as a single record |
| batch_size | usize | 1000 | Records per StreamPage emitted by Source::stream_pages. 0 is the "no batching" sentinel: the entire result set is emitted as a single page. For unary RPCs, 0 and any positive value have the same memory profile (see Streaming and batching) |
| rpc_kind | RpcKind | Unary | Unary (one request, one response) or ServerStreaming (one request, a stream of responses); written "server_streaming" in config files. See Server-streaming RPCs |
| max_messages | Option<usize> | None | Server-streaming only. Maximum number of streamed messages to consume before terminating. None consumes until the server closes the stream |
| terminate_on_error | bool | false | Server-streaming only. If true, a transient stream error terminates the run. If false, the source reconnects with exponential backoff |
| reconnect_initial_backoff | Duration (seconds in config files) | 1 s | Server-streaming only. Backoff before the first reconnect attempt; doubles after each failure up to reconnect_max_backoff. Must be > 0: a zero backoff would busy-spin reconnects and is rejected at construction |
| reconnect_max_backoff | Duration (seconds in config files) | 30 s | Server-streaming only. Upper bound on the reconnect backoff |
| reconnect_max_attempts | Option<u32> | None | Server-streaming only. Maximum reconnect attempts before the error is surfaced. None means unlimited |
| reconnect_replay_from_start | bool | true | Server-streaming only. Whether the server replays the stream from message 0 when the same request is re-issued on reconnect. true skips already-emitted messages so each is delivered once; false emits every received message (at-least-once). See Reconnect on transient errors |
| max_decoding_message_size | Option<usize> | None | Maximum size in bytes of a single inbound (decoded) message. None keeps tonic's default of 4 MiB. Raise it for sources that return large messages |
| max_encoding_message_size | Option<usize> | None | Maximum size in bytes of a single outbound (encoded) request message. None keeps tonic's default, usize::MAX (effectively unlimited) |
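The tls rule can be written as a one-line function. resolve_tls below is a hypothetical helper, not part of the crate's API; it mirrors the documented behaviour under the assumption that the scheme check is a plain, case-sensitive https:// prefix match, with an explicit Some(...) always taking precedence.

```rust
/// Hypothetical helper mirroring the documented `tls` rule; not the crate's API.
/// An explicit setting wins; otherwise TLS is inferred from the endpoint scheme.
/// Assumption: the scheme check is a case-sensitive `https://` prefix match.
fn resolve_tls(tls: Option<bool>, endpoint: &str) -> bool {
    tls.unwrap_or_else(|| endpoint.starts_with("https://"))
}
```

Under this rule, the "tls": true in the unary JSON example below is redundant with its https:// endpoint, though harmless; setting it explicitly is mainly useful for forcing TLS off or on when the scheme alone would guess wrong.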

Authentication (GrpcAuth)

| Variant | Fields | Description |
|---|---|---|
| None | (none) | No authentication |
| Bearer { token } | token: String | Bearer token sent as authorization metadata |
| Metadata { entries } | entries: Vec<MetadataEntry { key, value }> | Custom metadata pairs attached to the gRPC request. A Vec preserves order and allows duplicate keys, both of which gRPC metadata permits |
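Why a Vec rather than a map: gRPC metadata may legitimately repeat a key, and a map-based representation would silently keep only one value. The sketch below uses a local stand-in struct with the same field names as the documented MetadataEntry (it is not the crate's type) and compares an order-preserving view with a HashMap collapse.

```rust
use std::collections::HashMap;

/// Local stand-in with the documented field names; not the crate's MetadataEntry.
#[derive(Debug, Clone, PartialEq)]
struct MetadataEntry {
    key: String,
    value: String,
}

fn entry(key: &str, value: &str) -> MetadataEntry {
    MetadataEntry { key: key.into(), value: value.into() }
}

/// Order-preserving view: every entry survives, duplicates included.
fn as_pairs(entries: &[MetadataEntry]) -> Vec<(&str, &str)> {
    entries.iter().map(|e| (e.key.as_str(), e.value.as_str())).collect()
}

/// What a map-based config would do: a later duplicate overwrites the earlier one.
fn as_map(entries: &[MetadataEntry]) -> HashMap<&str, &str> {
    entries.iter().map(|e| (e.key.as_str(), e.value.as_str())).collect()
}
```

With entries x-api-key, x-tag=a, x-tag=b, as_pairs keeps all three in order while as_map ends up with two keys and only x-tag=b.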

Streaming and batching

Unary RPCs

Unary gRPC returns one response containing all records, so stream_pages falls back to the default trait implementation: it buffers the full response, then chunks it in memory into batch_size pages. This bounds sink-side memory only; source-side memory is still O(full response).

For unary gRPC, batch_size = 0 and any positive batch_size have the same memory profile: both buffer the full result before the first page is yielded, because a unary RPC has no native paging primitive the source could honour. The only observable difference is where page boundaries fall (a single page for 0, ceil(n / batch_size) pages for n records otherwise). Treat the field as a sink-side chunk size, not a wire-protocol hint.
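A minimal model of that unary fallback, assuming it behaves as described: the whole response already sits in a Vec before the first page exists, and batch_size only decides where the cuts go. chunk_into_pages is a hypothetical illustration, not faucet_core's implementation; treating an empty response as "no pages" is also an assumption.

```rust
/// Hypothetical model of the default `stream_pages` fallback for unary RPCs.
/// `records` is the fully buffered response: source-side memory is already
/// O(response) by the time this runs; pages only bound what the sink sees.
fn chunk_into_pages<T>(records: Vec<T>, batch_size: usize) -> Vec<Vec<T>> {
    // Assumption: an empty response yields no pages at all.
    if records.is_empty() {
        return Vec::new();
    }
    // 0 is the "no batching" sentinel: one page holding everything.
    if batch_size == 0 {
        return vec![records];
    }
    let mut pages: Vec<Vec<T>> = Vec::new();
    let mut iter = records.into_iter().peekable();
    while iter.peek().is_some() {
        // Take up to `batch_size` records; the last page may be shorter.
        pages.push(iter.by_ref().take(batch_size).collect());
    }
    pages
}
```

Five records with batch_size = 2 become pages of 2, 2 and 1; with batch_size = 0 they become one page of 5. Either way, all five were in memory before any page was produced.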

Server-streaming RPCs

When rpc_kind = "server_streaming", the source calls tonic::client::Grpc::server_streaming and consumes the response stream message-by-message. Each streamed DynamicMessage is decoded via prost-reflect and converted to JSON; if records_path is set it is applied to each message individually (so $.events[*] flattens an array nested inside each message into the page record set).

stream_pages flushes a page each time the buffer accumulates batch_size records, bounding both source-side and sink-side memory. batch_size = 0 drains the entire stream into a single page (useful for short streams or sinks that prefer one large write); because nothing is emitted until the stream ends, pair it with max_messages for open-ended feeds. Pages carry bookmark: None because server-streaming has no native cursor; resumption is driven by user-supplied request fields (e.g. an event id), not by a faucet-managed bookmark.
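The flush-as-you-go behaviour can be modelled with a plain iterator standing in for the response stream. paginate_stream below is a hypothetical sketch, not the crate's code: each item is assumed to be the list of records one message yields after records_path is applied, and max_messages is assumed to count messages, not records.

```rust
/// Hypothetical model of the server-streaming page loop; not the crate's code.
/// `messages` stands in for the decoded response stream: each item is the list
/// of records one message yields after `records_path` is applied.
fn paginate_stream<T, I>(
    messages: I,
    batch_size: usize,
    max_messages: Option<usize>,
    mut emit: impl FnMut(Vec<T>),
) where
    I: IntoIterator<Item = Vec<T>>,
{
    let mut buffer: Vec<T> = Vec::new();
    // `take` stops pulling from the stream once the cap is reached, so no
    // message beyond `max_messages` is ever requested.
    let cap = max_messages.unwrap_or(usize::MAX);
    for records in messages.into_iter().take(cap) {
        for record in records {
            buffer.push(record);
            // Flush as soon as a full page is buffered; batch_size 0 disables this.
            if batch_size > 0 && buffer.len() == batch_size {
                emit(std::mem::take(&mut buffer));
            }
        }
    }
    // Stream closed or cap reached: emit the final partial page, if any.
    if !buffer.is_empty() {
        emit(buffer);
    }
}
```

Feeding it messages [1, 2], [3], [4, 5, 6] with batch_size = 2 emits [1, 2], [3, 4], [5, 6], so page boundaries can span message boundaries; with max_messages = Some(2) only the first two messages are consumed, giving [1, 2] and [3].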

Reconnect on transient errors

By default, transient stream errors (server disconnects, transport failures, etc.) trigger a reconnect with exponential backoff starting at reconnect_initial_backoff, doubling each attempt up to reconnect_max_backoff. After reconnect_max_attempts (when set), the error is surfaced. Set terminate_on_error = true to skip the reconnect path entirely and propagate the error on first failure.
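The schedule described above is easy to check by hand. backoff_schedule is a hypothetical sketch of the documented rules (start at reconnect_initial_backoff, double per failure, cap at reconnect_max_backoff, stop after reconnect_max_attempts, reject a zero initial backoff); it is not the crate's implementation, and the crate may add details such as jitter that are not modelled here.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum BackoffError {
    /// A zero initial backoff would busy-spin reconnects.
    ZeroInitialBackoff,
}

/// Hypothetical model of the documented reconnect schedule; not the crate's code.
/// Yields the delay to wait before each reconnect attempt: `initial`, doubling
/// per failure, capped at `max`, for at most `max_attempts` attempts (None = unlimited).
fn backoff_schedule(
    initial: Duration,
    max: Duration,
    max_attempts: Option<u32>,
) -> Result<impl Iterator<Item = Duration>, BackoffError> {
    if initial.is_zero() {
        return Err(BackoffError::ZeroInitialBackoff);
    }
    let mut next = initial.min(max);
    let delays = std::iter::from_fn(move || {
        let current = next;
        // saturating_mul avoids an overflow panic on very long retry runs.
        next = next.saturating_mul(2).min(max);
        Some(current)
    });
    let limit = max_attempts.map_or(usize::MAX, |n| n as usize);
    Ok(delays.take(limit))
}
```

With the defaults (1 s initial, 30 s cap) and reconnect_max_attempts = 7, the waits are 1, 2, 4, 8, 16, 30 and 30 seconds before the error is surfaced.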

Reconnect re-sends the same request from scratch (the request is resolved once per run), so a stateless server starts emitting from the beginning of the stream again. By default (reconnect_replay_from_start = true) the source tracks how many messages it has already emitted and skips that many messages on the reconnected attempt. Provided the server replays the same messages in the same order, each message is delivered downstream exactly once.

Set reconnect_replay_from_start = false only for servers that resume mid-stream on an identical request. This is rare: most resumable feeds require a resume token in the request, e.g. an after_event_id field the user maintains. With it false, every received message is emitted (at-least-once), so a replayed prefix produces duplicates downstream. Conversely, leaving it true against a genuinely resuming server skips legitimate new messages. Match the setting to the server's replay semantics.
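The skip-count bookkeeping for both settings can be simulated without a server. ReplayTracker and deliver are hypothetical names for a sketch of the behaviour described above, not the crate's internals; each inner Vec stands for the messages received on one attempt before it ended.

```rust
/// Hypothetical model of the replay-skip bookkeeping; not the crate's code.
struct ReplayTracker {
    replay_from_start: bool,
    emitted: usize,        // messages delivered downstream so far in this run
    skip_remaining: usize, // replayed messages still to drop on this attempt
}

impl ReplayTracker {
    fn new(replay_from_start: bool) -> Self {
        Self { replay_from_start, emitted: 0, skip_remaining: 0 }
    }

    /// Called before each reconnect attempt re-sends the original request.
    fn on_reconnect(&mut self) {
        // If the server restarts at message 0, drop everything already emitted.
        self.skip_remaining = if self.replay_from_start { self.emitted } else { 0 };
    }

    /// Returns true if the next received message should be emitted.
    fn should_emit(&mut self) -> bool {
        if self.skip_remaining > 0 {
            self.skip_remaining -= 1;
            return false;
        }
        self.emitted += 1;
        true
    }
}

/// Runs a sequence of attempts through the tracker and returns what reaches the sink.
fn deliver<'a>(attempts: &[Vec<&'a str>], replay_from_start: bool) -> Vec<&'a str> {
    let mut tracker = ReplayTracker::new(replay_from_start);
    let mut out = Vec::new();
    for (i, attempt) in attempts.iter().enumerate() {
        if i > 0 {
            tracker.on_reconnect();
        }
        for &msg in attempt {
            if tracker.should_emit() {
                out.push(msg);
            }
        }
    }
    out
}
```

Against a replaying server ([a, b, c], then [a, b, c, d, e]) the default delivers a through e once, while false delivers a, b, c twice. Against a genuinely resuming server ([a, b, c], then [d, e]) the default drops d and e, which is the mismatch the paragraph above warns about.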

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_source_grpc::GrpcStreamConfig;

// Load from a JSON file:
let config: GrpcStreamConfig = load_json("config.json")?;

// Or from a .env file, reading GRPC_-prefixed variables (e.g. GRPC_ENDPOINT):
let config: GrpcStreamConfig = load_env_file(".env", "GRPC")?;

Example JSON config (unary)

{
  "endpoint": "https://grpc.example.com:443",
  "service_name": "inventory.InventoryService",
  "method_name": "ListProducts",
  "descriptor_set_path": "proto/descriptor.bin",
  "request": {
    "category": "electronics",
    "page_size": 100
  },
  "auth": {
    "type": "bearer",
    "config": {
      "token": "your-api-token"
    }
  },
  "tls": true,
  "records_path": "$.products[*]",
  "batch_size": 1000
}

Example JSON config (server-streaming)

{
  "endpoint": "https://grpc.example.com:443",
  "service_name": "events.EventService",
  "method_name": "Tail",
  "descriptor_set_path": "proto/descriptor.bin",
  "request": { "topic": "audit-log" },
  "auth": { "type": "bearer", "config": { "token": "your-api-token" } },
  "tls": true,
  "records_path": null,
  "rpc_kind": "server_streaming",
  "max_messages": 100000,
  "terminate_on_error": false,
  "reconnect_initial_backoff": 1,
  "reconnect_max_backoff": 30,
  "reconnect_max_attempts": null,
  "reconnect_replay_from_start": true,
  "max_decoding_message_size": 16777216,
  "batch_size": 500
}

Example .env file

GRPC_ENDPOINT=http://localhost:50051
GRPC_SERVICE_NAME=users.UserService
GRPC_METHOD_NAME=ListUsers
GRPC_DESCRIPTOR_SET_PATH=proto/descriptor.bin
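The keys above follow a PREFIX_FIELD pattern: the prefix passed to load_env_file ("GRPC"), an underscore, then the upper-cased field name. env_fields below is a hypothetical illustration of that naming convention only, assuming it is the whole rule for top-level fields; it is not faucet_core's parser and does not handle quoting or nested fields such as auth.

```rust
/// Hypothetical illustration of the PREFIX_FIELD convention in the .env example;
/// not faucet_core's `load_env_file`. Blank lines and `#` comments are skipped,
/// and keys without the prefix are ignored.
fn env_fields(contents: &str, prefix: &str) -> Vec<(String, String)> {
    let wanted = format!("{}_", prefix);
    contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| line.split_once('='))
        .filter_map(|(key, value)| {
            key.trim()
                .strip_prefix(wanted.as_str())
                .map(|field| (field.to_lowercase(), value.trim().to_string()))
        })
        .collect()
}
```

Applied to the file above with prefix "GRPC", this yields endpoint, service_name, method_name and descriptor_set_path, matching the GrpcStreamConfig field names.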

Config Schema Introspection

use faucet_core::Source; // brings config_schema() into scope
use faucet_source_grpc::GrpcStream;

let stream = GrpcStream::new(config)?;
let schema = stream.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Examples

Basic unary RPC call

use faucet_source_grpc::{GrpcStream, GrpcStreamConfig};
use serde_json::json;

let config = GrpcStreamConfig::new(
    "http://localhost:50051",
    "orders.OrderService",
    "GetOrder",
    "proto/descriptor.bin",
)
.request(json!({"order_id": "ord-12345"}));

let stream = GrpcStream::new(config)?;
let records = stream.fetch_all().await?;
// Returns the full response as a single JSON record

Authenticated gRPC with TLS and record extraction

use faucet_source_grpc::{GrpcStream, GrpcStreamConfig, GrpcAuth};
use serde_json::json;

let config = GrpcStreamConfig::new(
    "https://grpc.production.example.com",
    "analytics.EventService",
    "QueryEvents",
    "proto/descriptor.bin",
)
.request(json!({
    "start_time": "2025-01-01T00:00:00Z",
    "end_time": "2025-02-01T00:00:00Z",
    "limit": 1000
}))
.auth(GrpcAuth::Bearer {
    token: "your-bearer-token".into(),
})
.tls(true)
.records_path("$.events[*]");

let stream = GrpcStream::new(config)?;
let events = stream.fetch_all().await?;
println!("Fetched {} events", events.len());

Server-streaming RPC

use faucet_source_grpc::{GrpcStream, GrpcStreamConfig, RpcKind};
use serde_json::json;

let config = GrpcStreamConfig::new(
    "http://localhost:50051",
    "events.EventService",
    "Tail",
    "proto/descriptor.bin",
)
.request(json!({ "topic": "audit-log" }))
.rpc_kind(RpcKind::ServerStreaming)
.max_messages(10_000)
.with_batch_size(500);

let stream = GrpcStream::new(config)?;
let events = stream.fetch_all().await?;
println!("Collected {} events", events.len());

For long-lived streams, drive the pipeline directly so pages flush to the sink as they arrive instead of buffering everything in memory:

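use faucet_core::Pipeline;

// `my_sink` is any faucet sink you have already constructed (not shown here).
// Pages are written to it as they arrive rather than collected by fetch_all.
let pipeline = Pipeline::new(&stream, &my_sink);
pipeline.run().await?;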

Custom metadata authentication

use faucet_source_grpc::{GrpcAuth, GrpcStream, GrpcStreamConfig, MetadataEntry};

let config = GrpcStreamConfig::new(
    "http://localhost:50051",
    "mypackage.MyService",
    "ListItems",
    "proto/descriptor.bin",
)
.auth(GrpcAuth::Metadata {
    entries: vec![
        MetadataEntry { key: "x-api-key".into(), value: "my-secret-key".into() },
        MetadataEntry { key: "x-tenant-id".into(), value: "tenant-123".into() },
    ],
});

let stream = GrpcStream::new(config)?;
let items = stream.fetch_all().await?;

License

Licensed under MIT or Apache-2.0.