Skip to main content

faucet_source_grpc/
config.rs

1//! gRPC source configuration.
2
3use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::path::PathBuf;
8use std::time::Duration;
9
/// A single piece of gRPC request metadata.
///
/// Use a `Vec<MetadataEntry>` rather than a map because gRPC allows duplicate
/// keys and order is occasionally observable.
///
/// Both fields are plain strings and are (de)serialized under their own
/// names, `key` and `value`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MetadataEntry {
    /// Metadata key. Duplicates across entries are allowed (see above).
    pub key: String,
    /// Metadata value paired with [`key`](Self::key).
    pub value: String,
}
19
/// Authentication for gRPC endpoints.
///
/// Serialized adjacently tagged: the variant name is written in `snake_case`
/// under `type` (e.g. `{ "type": "none" }`) and any variant fields are nested
/// under `config`.
///
/// NOTE(review): `Debug` is derived, so formatting a [`GrpcAuth::Bearer`]
/// (or credential-bearing [`GrpcAuth::Metadata`]) value with `{:?}` prints
/// the secret verbatim. Avoid debug-logging configs that carry credentials,
/// or consider a redacting `Debug` impl.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", content = "config", rename_all = "snake_case")]
pub enum GrpcAuth {
    /// No authentication. This is the [`Default`] variant.
    #[default]
    None,
    /// Bearer token sent as `authorization` metadata.
    Bearer { token: String },
    /// Custom metadata key-value pairs, kept as an ordered list so duplicate
    /// keys survive.
    Metadata { entries: Vec<MetadataEntry> },
}
32
/// Kind of gRPC RPC to invoke.
///
/// Selected via the `rpc_kind` config field; defaults to [`RpcKind::Unary`]
/// so existing configs are backward-compatible.
///
/// Variant names are (de)serialized in `snake_case`, i.e. `"unary"` and
/// `"server_streaming"`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum RpcKind {
    /// One request, one response (default).
    #[default]
    Unary,
    /// One request, a server-driven stream of responses. The source consumes
    /// each streamed message and emits records as they arrive. Useful for
    /// event feeds, change feeds, log tails, and any long-lived gRPC stream.
    ServerStreaming,
}
48
/// Configuration for the gRPC source.
///
/// When deserializing, `endpoint`, `service_name`, `method_name`, `request`,
/// `descriptor_set_path` and `auth` carry no serde default. The `Option`
/// fields without an explicit default (`tls`, `records_path`) are presumably
/// optional too, since serde treats a missing `Option` field as `None` —
/// TODO confirm against the config loader. Every remaining field is marked
/// `#[serde(default…)]` and falls back to the value documented on it, which
/// is the same value [`GrpcStreamConfig::new`] starts from.
///
/// Most fields from `rpc_kind` downwards only matter for
/// [`RpcKind::ServerStreaming`]; the individual field docs say which ones.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct GrpcStreamConfig {
    /// gRPC endpoint URL (e.g. `"http://localhost:50051"`).
    pub endpoint: String,
    /// Fully qualified service name (e.g. `"mypackage.MyService"`).
    pub service_name: String,
    /// Method name (e.g. `"ListUsers"`).
    pub method_name: String,
    /// Request message as JSON. Fields are mapped to protobuf fields
    /// using the `FileDescriptorSet`.
    pub request: Value,
    /// Path to the compiled `FileDescriptorSet` file.
    pub descriptor_set_path: PathBuf,
    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
    pub auth: AuthSpec<GrpcAuth>,
    /// Whether to use TLS (detected from `https://` in endpoint by default).
    /// `None` leaves the decision to that detection; `Some(_)` forces it.
    pub tls: Option<bool>,
    /// JSONPath to extract records from the response.
    /// If not set, the entire response is returned as a single record.
    pub records_path: Option<String>,
    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Defaults
    /// to [`DEFAULT_BATCH_SIZE`].
    ///
    /// For unary RPCs the source has no native paging primitive to honour
    /// this hint: the default
    /// [`Source::stream_pages`](faucet_core::Source::stream_pages) impl
    /// buffers the full response and then chunks it in memory, bounding
    /// **sink-side** memory only.
    ///
    /// For server-streaming RPCs (`rpc_kind = "server_streaming"`) the source
    /// overrides `stream_pages` and flushes a page each time `batch_size`
    /// messages accumulate, bounding both source-side and sink-side memory.
    /// `batch_size = 0` drains the entire stream into a single page.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Kind of RPC to invoke. Defaults to [`RpcKind::Unary`].
    #[serde(default)]
    pub rpc_kind: RpcKind,
    /// For [`RpcKind::ServerStreaming`]: maximum number of messages to consume
    /// before terminating the stream. `None` means consume until the server
    /// closes the stream (or the run is cancelled).
    #[serde(default)]
    pub max_messages: Option<usize>,
    /// For [`RpcKind::ServerStreaming`]: if `true`, transient stream errors
    /// (server-side disconnects, transport errors, etc.) terminate the run
    /// with [`FaucetError::Source`](faucet_core::FaucetError::Source). When
    /// `false` (the default), the source reconnects with exponential backoff
    /// up to [`reconnect_max_attempts`](Self::reconnect_max_attempts).
    ///
    /// Ignored for [`RpcKind::Unary`].
    #[serde(default)]
    pub terminate_on_error: bool,
    /// For [`RpcKind::ServerStreaming`] reconnect: initial backoff delay
    /// before the first retry. Doubles after each failure up to
    /// [`reconnect_max_backoff`](Self::reconnect_max_backoff). Defaults to 1s.
    ///
    /// (De)serialized as a whole number of seconds (e.g. `2` for two seconds).
    #[serde(
        default = "default_reconnect_initial_backoff",
        with = "faucet_core::config::duration_secs"
    )]
    #[schemars(with = "u64")]
    pub reconnect_initial_backoff: Duration,
    /// For [`RpcKind::ServerStreaming`] reconnect: maximum backoff cap.
    /// Defaults to 30s.
    ///
    /// (De)serialized as a whole number of seconds, like
    /// [`reconnect_initial_backoff`](Self::reconnect_initial_backoff).
    #[serde(
        default = "default_reconnect_max_backoff",
        with = "faucet_core::config::duration_secs"
    )]
    #[schemars(with = "u64")]
    pub reconnect_max_backoff: Duration,
    /// For [`RpcKind::ServerStreaming`] reconnect: maximum reconnect attempts
    /// before surfacing the error. `None` (the default) means unlimited
    /// retries.
    #[serde(default)]
    pub reconnect_max_attempts: Option<u32>,
    /// For [`RpcKind::ServerStreaming`] reconnect: whether the server replays
    /// the response stream from the beginning when the identical request is
    /// re-issued after a disconnect. Defaults to `true`.
    ///
    /// Because the request is resolved once per run, a reconnect sends the
    /// *same* request — a stateless server therefore re-streams from message
    /// 0. When `true` the source skips the messages it already emitted before
    /// the disconnect, so consumers see each message once. Set to `false`
    /// only for servers that resume mid-stream on the same request (rare):
    /// there, skipping would drop genuinely-new messages, so every received
    /// message is emitted (at-least-once; duplicates possible).
    #[serde(default = "default_reconnect_replay_from_start")]
    pub reconnect_replay_from_start: bool,
    /// Maximum size, in bytes, of a single inbound (decoded) gRPC message.
    /// `None` (the default) keeps tonic's built-in 4 MiB limit. Raise this
    /// for sources that legitimately return large messages; a too-low limit
    /// surfaces as a decode error and aborts the call. Applies to both unary
    /// and server-streaming RPCs.
    #[serde(default)]
    pub max_decoding_message_size: Option<usize>,
    /// Maximum size, in bytes, of a single outbound (encoded) gRPC request
    /// message. `None` (the default) keeps tonic's built-in limit. Rarely
    /// needs tuning for a data source, since requests are typically small.
    #[serde(default)]
    pub max_encoding_message_size: Option<usize>,
}
151
/// Serde default for [`GrpcStreamConfig::batch_size`]: forwards
/// [`DEFAULT_BATCH_SIZE`] so a config that omits `batch_size` gets the
/// crate-wide default page size.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
155
/// Serde default for [`GrpcStreamConfig::reconnect_replay_from_start`].
///
/// Replay-from-start is assumed unless the config says otherwise, so
/// messages already emitted before a disconnect are skipped on reconnect.
fn default_reconnect_replay_from_start() -> bool {
    const ASSUME_SERVER_REPLAYS: bool = true;
    ASSUME_SERVER_REPLAYS
}
159
/// Serde default for [`GrpcStreamConfig::reconnect_initial_backoff`]: wait
/// one second before the first reconnect attempt.
fn default_reconnect_initial_backoff() -> Duration {
    const INITIAL_BACKOFF_SECS: u64 = 1;
    Duration::from_secs(INITIAL_BACKOFF_SECS)
}
163
/// Serde default for [`GrpcStreamConfig::reconnect_max_backoff`]: the
/// reconnect delay is capped at thirty seconds.
fn default_reconnect_max_backoff() -> Duration {
    const MAX_BACKOFF_SECS: u64 = 30;
    Duration::from_secs(MAX_BACKOFF_SECS)
}
167
168impl GrpcStreamConfig {
169    /// Create a new config with the required fields.
170    pub fn new(
171        endpoint: impl Into<String>,
172        service_name: impl Into<String>,
173        method_name: impl Into<String>,
174        descriptor_set_path: impl Into<PathBuf>,
175    ) -> Self {
176        Self {
177            endpoint: endpoint.into(),
178            service_name: service_name.into(),
179            method_name: method_name.into(),
180            request: Value::Object(Default::default()),
181            descriptor_set_path: descriptor_set_path.into(),
182            auth: AuthSpec::Inline(GrpcAuth::None),
183            tls: None,
184            records_path: None,
185            batch_size: DEFAULT_BATCH_SIZE,
186            rpc_kind: RpcKind::Unary,
187            max_messages: None,
188            terminate_on_error: false,
189            reconnect_initial_backoff: default_reconnect_initial_backoff(),
190            reconnect_max_backoff: default_reconnect_max_backoff(),
191            reconnect_max_attempts: None,
192            reconnect_replay_from_start: default_reconnect_replay_from_start(),
193            max_decoding_message_size: None,
194            max_encoding_message_size: None,
195        }
196    }
197
198    /// Set the request message as JSON.
199    pub fn request(mut self, request: Value) -> Self {
200        self.request = request;
201        self
202    }
203
204    /// Set the authentication method (inline).
205    pub fn auth(mut self, auth: GrpcAuth) -> Self {
206        self.auth = AuthSpec::Inline(auth);
207        self
208    }
209
210    /// Set the TLS mode explicitly.
211    pub fn tls(mut self, tls: bool) -> Self {
212        self.tls = Some(tls);
213        self
214    }
215
216    /// Set the JSONPath for record extraction from the response.
217    pub fn records_path(mut self, path: impl Into<String>) -> Self {
218        self.records_path = Some(path.into());
219        self
220    }
221
222    /// Set the per-page record count for
223    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
224    ///
225    /// Pass `0` to opt out of batching — the entire result set is emitted in
226    /// a single [`StreamPage`](faucet_core::StreamPage). For unary RPCs this
227    /// is observably identical to any positive `batch_size`, since the full
228    /// response is buffered before any page is yielded. For server-streaming
229    /// RPCs, `0` drains the entire stream before yielding.
230    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
231        self.batch_size = batch_size;
232        self
233    }
234
235    /// Set the RPC kind (unary or server-streaming).
236    pub fn rpc_kind(mut self, rpc_kind: RpcKind) -> Self {
237        self.rpc_kind = rpc_kind;
238        self
239    }
240
241    /// Cap the number of messages to consume from a server-streaming RPC.
242    /// Ignored for unary RPCs.
243    pub fn max_messages(mut self, max_messages: usize) -> Self {
244        self.max_messages = Some(max_messages);
245        self
246    }
247
248    /// Whether transient server-streaming errors should terminate the run
249    /// (`true`) or trigger a reconnect with exponential backoff (`false`,
250    /// the default).
251    pub fn terminate_on_error(mut self, terminate_on_error: bool) -> Self {
252        self.terminate_on_error = terminate_on_error;
253        self
254    }
255
256    /// Set the initial reconnect backoff for server-streaming RPCs.
257    pub fn reconnect_initial_backoff(mut self, backoff: Duration) -> Self {
258        self.reconnect_initial_backoff = backoff;
259        self
260    }
261
262    /// Set the maximum reconnect backoff for server-streaming RPCs.
263    pub fn reconnect_max_backoff(mut self, backoff: Duration) -> Self {
264        self.reconnect_max_backoff = backoff;
265        self
266    }
267
268    /// Cap the number of reconnect attempts for server-streaming RPCs.
269    pub fn reconnect_max_attempts(mut self, attempts: u32) -> Self {
270        self.reconnect_max_attempts = Some(attempts);
271        self
272    }
273
274    /// Set whether the server replays the stream from the start on reconnect
275    /// (default `true`). See
276    /// [`reconnect_replay_from_start`](Self::reconnect_replay_from_start).
277    pub fn reconnect_replay_from_start(mut self, replay: bool) -> Self {
278        self.reconnect_replay_from_start = replay;
279        self
280    }
281
282    /// Set the maximum inbound (decoded) gRPC message size in bytes.
283    pub fn max_decoding_message_size(mut self, bytes: usize) -> Self {
284        self.max_decoding_message_size = Some(bytes);
285        self
286    }
287
288    /// Set the maximum outbound (encoded) gRPC message size in bytes.
289    pub fn max_encoding_message_size(mut self, bytes: usize) -> Self {
290        self.max_encoding_message_size = Some(bytes);
291        self
292    }
293}
294
#[cfg(test)]
mod tests {
    //! Unit tests for the config builders and their serde defaults.

    use super::*;
    use serde_json::json;

    /// `new` must start every optional field at its documented default.
    #[test]
    fn default_config() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "users.UserService",
            "ListUsers",
            "proto/descriptor.bin",
        );
        assert_eq!(config.endpoint, "http://localhost:50051");
        assert_eq!(config.service_name, "users.UserService");
        assert_eq!(config.method_name, "ListUsers");
        assert!(config.records_path.is_none());
        assert!(matches!(config.auth, AuthSpec::Inline(GrpcAuth::None)));
        assert_eq!(config.rpc_kind, RpcKind::Unary);
        assert!(config.max_messages.is_none());
        assert!(!config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(1));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(30));
        assert!(config.reconnect_max_attempts.is_none());
        assert!(config.reconnect_replay_from_start);
        assert!(config.max_decoding_message_size.is_none());
        assert!(config.max_encoding_message_size.is_none());
    }

    #[test]
    fn message_size_and_replay_builders() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "svc.Svc",
            "Tail",
            "proto/descriptor.bin",
        )
        .reconnect_replay_from_start(false)
        .max_decoding_message_size(16 * 1024 * 1024)
        .max_encoding_message_size(1024);
        assert!(!config.reconnect_replay_from_start);
        assert_eq!(config.max_decoding_message_size, Some(16 * 1024 * 1024));
        assert_eq!(config.max_encoding_message_size, Some(1024));
    }

    #[test]
    fn builder_methods() {
        let config =
            GrpcStreamConfig::new("https://grpc.example.com", "svc.Svc", "Get", "desc.bin")
                .request(json!({"page_size": 100}))
                .auth(GrpcAuth::Bearer {
                    token: "tok".into(),
                })
                .tls(true)
                .records_path("$.users[*]");
        assert_eq!(config.request["page_size"], 100);
        assert!(matches!(
            config.auth,
            AuthSpec::Inline(GrpcAuth::Bearer { .. })
        ));
        assert_eq!(config.tls, Some(true));
        // Compare through `as_deref` so a missing path fails the assertion
        // with a readable diff instead of panicking inside `unwrap`.
        assert_eq!(config.records_path.as_deref(), Some("$.users[*]"));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "users.UserService",
            "ListUsers",
            "proto/descriptor.bin",
        );
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "users.UserService",
            "ListUsers",
            "proto/descriptor.bin",
        )
        .with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "users.UserService",
            "ListUsers",
            "proto/descriptor.bin",
        )
        .with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "users.UserService",
            "ListUsers",
            "proto/descriptor.bin",
        )
        .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "users.UserService",
            "method_name": "ListUsers",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null,
            "batch_size": 250
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "users.UserService",
            "method_name": "ListUsers",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn server_streaming_fields_deserialize_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "events.EventService",
            "method_name": "Tail",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null,
            "rpc_kind": "server_streaming",
            "max_messages": 100,
            "terminate_on_error": true,
            "reconnect_initial_backoff": 2,
            "reconnect_max_backoff": 60,
            "reconnect_max_attempts": 5
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.rpc_kind, RpcKind::ServerStreaming);
        assert_eq!(config.max_messages, Some(100));
        assert!(config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(2));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(60));
        assert_eq!(config.reconnect_max_attempts, Some(5));
    }

    /// The replay flag and both message-size limits must be readable from
    /// JSON, not only settable through the builders.
    #[test]
    fn replay_and_message_size_fields_deserialize_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "events.EventService",
            "method_name": "Tail",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null,
            "reconnect_replay_from_start": false,
            "max_decoding_message_size": 16777216,
            "max_encoding_message_size": 1024
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert!(!config.reconnect_replay_from_start);
        assert_eq!(config.max_decoding_message_size, Some(16 * 1024 * 1024));
        assert_eq!(config.max_encoding_message_size, Some(1024));
    }

    #[test]
    fn server_streaming_fields_default_when_absent_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "users.UserService",
            "method_name": "ListUsers",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.rpc_kind, RpcKind::Unary);
        assert!(config.max_messages.is_none());
        assert!(!config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(1));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(30));
        assert!(config.reconnect_max_attempts.is_none());
        assert!(config.reconnect_replay_from_start);
        assert!(config.max_decoding_message_size.is_none());
        // Mirrors `default_config`: the outbound limit defaults to `None` too.
        assert!(config.max_encoding_message_size.is_none());
    }

    #[test]
    fn rpc_kind_builder() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "events.EventService",
            "Tail",
            "proto/descriptor.bin",
        )
        .rpc_kind(RpcKind::ServerStreaming)
        .max_messages(50)
        .terminate_on_error(true)
        .reconnect_initial_backoff(Duration::from_secs(5))
        .reconnect_max_backoff(Duration::from_secs(120))
        .reconnect_max_attempts(10);
        assert_eq!(config.rpc_kind, RpcKind::ServerStreaming);
        assert_eq!(config.max_messages, Some(50));
        assert!(config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(5));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(120));
        assert_eq!(config.reconnect_max_attempts, Some(10));
    }
}
511}