1use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::path::PathBuf;
8use std::time::Duration;
9
/// A single key/value pair of strings.
///
/// Used as the element type of the `entries` list in [`GrpcAuth::Metadata`].
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MetadataEntry {
    /// Entry name.
    /// NOTE(review): presumably the gRPC metadata (header) key — confirm against the client code.
    pub key: String,
    /// Value paired with `key`.
    pub value: String,
}
19
/// Authentication settings selectable for a gRPC source.
///
/// Serialized in serde's adjacently tagged form: the variant name is written
/// under `"type"` in snake_case (`none`, `bearer`, `metadata`) and the
/// variant's fields, if any, under `"config"`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", content = "config", rename_all = "snake_case")]
pub enum GrpcAuth {
    /// No credentials. This is the `Default` variant.
    #[default]
    None,
    /// A bearer token.
    /// NOTE(review): how the token is attached to calls is not visible in
    /// this file — presumably as `authorization` metadata; confirm.
    Bearer { token: String },
    /// An explicit list of key/value entries.
    /// NOTE(review): presumably attached as request metadata — confirm.
    Metadata { entries: Vec<MetadataEntry> },
}
32
/// The kind of RPC a [`GrpcStreamConfig`] targets.
///
/// Serialized as a snake_case string (`"unary"` / `"server_streaming"`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum RpcKind {
    /// Unary call. This is the `Default` variant, so it is what a config gets
    /// when `rpc_kind` is omitted.
    #[default]
    Unary,
    /// Server-streaming call.
    ServerStreaming,
}
48
/// Configuration for a gRPC-backed stream.
///
/// Field order is the serialization order, so keep it stable. Every field
/// from `batch_size` downward carries a serde default and may be omitted from
/// the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct GrpcStreamConfig {
    /// Server endpoint, e.g. `http://localhost:50051`.
    pub endpoint: String,
    /// Service name, e.g. `users.UserService`.
    pub service_name: String,
    /// Method name on the service, e.g. `ListUsers`.
    pub method_name: String,
    /// Request payload as JSON. [`GrpcStreamConfig::new`] starts it as an
    /// empty object.
    pub request: Value,
    /// Path to the descriptor set file, e.g. `proto/descriptor.bin`.
    /// NOTE(review): presumably a compiled protobuf `FileDescriptorSet` — confirm.
    pub descriptor_set_path: PathBuf,
    /// Authentication settings, wrapped in `faucet_core::AuthSpec`.
    pub auth: AuthSpec<GrpcAuth>,
    /// Explicit TLS switch.
    /// NOTE(review): what `None` means (e.g. infer from the endpoint scheme)
    /// is decided outside this file — confirm.
    pub tls: Option<bool>,
    /// Optional path selecting records inside a response, e.g. `$.users[*]`.
    /// NOTE(review): looks like a JSONPath expression — confirm.
    pub records_path: Option<String>,
    /// Batch size; falls back to `faucet_core::DEFAULT_BATCH_SIZE` when absent.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Which RPC shape to use; defaults to [`RpcKind::Unary`].
    #[serde(default)]
    pub rpc_kind: RpcKind,
    /// Optional message count limit; defaults to `None`.
    /// NOTE(review): presumably caps the messages consumed from a stream — confirm.
    #[serde(default)]
    pub max_messages: Option<usize>,
    /// Whether to terminate on error; defaults to `false`.
    /// NOTE(review): the exact error-handling behavior lives in the consumer of this config.
    #[serde(default)]
    pub terminate_on_error: bool,
    /// Initial reconnect backoff. (De)serialized through
    /// `faucet_core::config::duration_secs` and described as a `u64` in the
    /// JSON schema; defaults to 1 second.
    #[serde(
        default = "default_reconnect_initial_backoff",
        with = "faucet_core::config::duration_secs"
    )]
    #[schemars(with = "u64")]
    pub reconnect_initial_backoff: Duration,
    /// Maximum reconnect backoff. (De)serialized through
    /// `faucet_core::config::duration_secs` and described as a `u64` in the
    /// JSON schema; defaults to 30 seconds.
    #[serde(
        default = "default_reconnect_max_backoff",
        with = "faucet_core::config::duration_secs"
    )]
    #[schemars(with = "u64")]
    pub reconnect_max_backoff: Duration,
    /// Optional limit on reconnect attempts; defaults to `None`.
    /// NOTE(review): `None` presumably means "no limit" — confirm.
    #[serde(default)]
    pub reconnect_max_attempts: Option<u32>,
    /// Whether a reconnect replays from the start; defaults to `true`.
    #[serde(default = "default_reconnect_replay_from_start")]
    pub reconnect_replay_from_start: bool,
    /// Optional maximum decoded message size, in bytes; defaults to `None`.
    #[serde(default)]
    pub max_decoding_message_size: Option<usize>,
    /// Optional maximum encoded message size, in bytes; defaults to `None`.
    #[serde(default)]
    pub max_encoding_message_size: Option<usize>,
}
151
152fn default_batch_size() -> usize {
153 DEFAULT_BATCH_SIZE
154}
155
/// Serde fallback for `GrpcStreamConfig::reconnect_replay_from_start`:
/// replay is enabled unless the config says otherwise.
fn default_reconnect_replay_from_start() -> bool {
    const REPLAY_FROM_START: bool = true;
    REPLAY_FROM_START
}
159
/// Serde fallback for `GrpcStreamConfig::reconnect_initial_backoff`: one second.
fn default_reconnect_initial_backoff() -> Duration {
    const INITIAL_BACKOFF_SECS: u64 = 1;
    Duration::from_secs(INITIAL_BACKOFF_SECS)
}
163
/// Serde fallback for `GrpcStreamConfig::reconnect_max_backoff`: thirty seconds.
fn default_reconnect_max_backoff() -> Duration {
    const MAX_BACKOFF_SECS: u64 = 30;
    Duration::from_secs(MAX_BACKOFF_SECS)
}
167
168impl GrpcStreamConfig {
169 pub fn new(
171 endpoint: impl Into<String>,
172 service_name: impl Into<String>,
173 method_name: impl Into<String>,
174 descriptor_set_path: impl Into<PathBuf>,
175 ) -> Self {
176 Self {
177 endpoint: endpoint.into(),
178 service_name: service_name.into(),
179 method_name: method_name.into(),
180 request: Value::Object(Default::default()),
181 descriptor_set_path: descriptor_set_path.into(),
182 auth: AuthSpec::Inline(GrpcAuth::None),
183 tls: None,
184 records_path: None,
185 batch_size: DEFAULT_BATCH_SIZE,
186 rpc_kind: RpcKind::Unary,
187 max_messages: None,
188 terminate_on_error: false,
189 reconnect_initial_backoff: default_reconnect_initial_backoff(),
190 reconnect_max_backoff: default_reconnect_max_backoff(),
191 reconnect_max_attempts: None,
192 reconnect_replay_from_start: default_reconnect_replay_from_start(),
193 max_decoding_message_size: None,
194 max_encoding_message_size: None,
195 }
196 }
197
198 pub fn request(mut self, request: Value) -> Self {
200 self.request = request;
201 self
202 }
203
204 pub fn auth(mut self, auth: GrpcAuth) -> Self {
206 self.auth = AuthSpec::Inline(auth);
207 self
208 }
209
210 pub fn tls(mut self, tls: bool) -> Self {
212 self.tls = Some(tls);
213 self
214 }
215
216 pub fn records_path(mut self, path: impl Into<String>) -> Self {
218 self.records_path = Some(path.into());
219 self
220 }
221
222 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
231 self.batch_size = batch_size;
232 self
233 }
234
235 pub fn rpc_kind(mut self, rpc_kind: RpcKind) -> Self {
237 self.rpc_kind = rpc_kind;
238 self
239 }
240
241 pub fn max_messages(mut self, max_messages: usize) -> Self {
244 self.max_messages = Some(max_messages);
245 self
246 }
247
248 pub fn terminate_on_error(mut self, terminate_on_error: bool) -> Self {
252 self.terminate_on_error = terminate_on_error;
253 self
254 }
255
256 pub fn reconnect_initial_backoff(mut self, backoff: Duration) -> Self {
258 self.reconnect_initial_backoff = backoff;
259 self
260 }
261
262 pub fn reconnect_max_backoff(mut self, backoff: Duration) -> Self {
264 self.reconnect_max_backoff = backoff;
265 self
266 }
267
268 pub fn reconnect_max_attempts(mut self, attempts: u32) -> Self {
270 self.reconnect_max_attempts = Some(attempts);
271 self
272 }
273
274 pub fn reconnect_replay_from_start(mut self, replay: bool) -> Self {
278 self.reconnect_replay_from_start = replay;
279 self
280 }
281
282 pub fn max_decoding_message_size(mut self, bytes: usize) -> Self {
284 self.max_decoding_message_size = Some(bytes);
285 self
286 }
287
288 pub fn max_encoding_message_size(mut self, bytes: usize) -> Self {
290 self.max_encoding_message_size = Some(bytes);
291 self
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// JSON containing only the fields that have no serde default; every
    /// optional knob is left out so the defaults can be observed.
    const MINIMAL_JSON: &str = r#"{
        "endpoint": "http://localhost:50051",
        "service_name": "users.UserService",
        "method_name": "ListUsers",
        "request": {},
        "descriptor_set_path": "proto/descriptor.bin",
        "auth": { "type": "none" },
        "tls": null,
        "records_path": null
    }"#;

    /// Baseline config built through `new`, shared by the builder tests.
    fn local_config() -> GrpcStreamConfig {
        GrpcStreamConfig::new(
            "http://localhost:50051",
            "users.UserService",
            "ListUsers",
            "proto/descriptor.bin",
        )
    }

    #[test]
    fn default_config() {
        let config = local_config();
        assert_eq!(config.endpoint, "http://localhost:50051");
        assert_eq!(config.service_name, "users.UserService");
        assert_eq!(config.method_name, "ListUsers");
        assert_eq!(
            config.descriptor_set_path,
            PathBuf::from("proto/descriptor.bin")
        );
        assert_eq!(config.request, json!({}));
        assert!(config.tls.is_none());
        assert!(config.records_path.is_none());
        assert!(matches!(config.auth, AuthSpec::Inline(GrpcAuth::None)));
        assert_eq!(config.rpc_kind, RpcKind::Unary);
        assert!(config.max_messages.is_none());
        assert!(!config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(1));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(30));
        assert!(config.reconnect_max_attempts.is_none());
        assert!(config.reconnect_replay_from_start);
        assert!(config.max_decoding_message_size.is_none());
        assert!(config.max_encoding_message_size.is_none());
    }

    #[test]
    fn message_size_and_replay_builders() {
        let config = local_config()
            .reconnect_replay_from_start(false)
            .max_decoding_message_size(16 * 1024 * 1024)
            .max_encoding_message_size(1024);
        assert!(!config.reconnect_replay_from_start);
        assert_eq!(config.max_decoding_message_size, Some(16 * 1024 * 1024));
        assert_eq!(config.max_encoding_message_size, Some(1024));
    }

    #[test]
    fn builder_methods() {
        let config =
            GrpcStreamConfig::new("https://grpc.example.com", "svc.Svc", "Get", "desc.bin")
                .request(json!({"page_size": 100}))
                .auth(GrpcAuth::Bearer {
                    token: "tok".into(),
                })
                .tls(true)
                .records_path("$.users[*]");
        assert_eq!(config.request["page_size"], 100);
        assert!(matches!(
            config.auth,
            AuthSpec::Inline(GrpcAuth::Bearer { .. })
        ));
        assert_eq!(config.tls, Some(true));
        assert_eq!(config.records_path.as_deref(), Some("$.users[*]"));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = local_config();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = local_config().with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = local_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        // The builder stores the value unchecked; rejection happens in
        // `validate_batch_size`.
        let config = local_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "users.UserService",
            "method_name": "ListUsers",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null,
            "batch_size": 250
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_from_json() {
        let config: GrpcStreamConfig = serde_json::from_str(MINIMAL_JSON).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn server_streaming_fields_deserialize_from_json() {
        let json = r#"{
            "endpoint": "http://localhost:50051",
            "service_name": "events.EventService",
            "method_name": "Tail",
            "request": {},
            "descriptor_set_path": "proto/descriptor.bin",
            "auth": { "type": "none" },
            "tls": null,
            "records_path": null,
            "rpc_kind": "server_streaming",
            "max_messages": 100,
            "terminate_on_error": true,
            "reconnect_initial_backoff": 2,
            "reconnect_max_backoff": 60,
            "reconnect_max_attempts": 5
        }"#;
        let config: GrpcStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.rpc_kind, RpcKind::ServerStreaming);
        assert_eq!(config.max_messages, Some(100));
        assert!(config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(2));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(60));
        assert_eq!(config.reconnect_max_attempts, Some(5));
    }

    #[test]
    fn server_streaming_fields_default_when_absent_from_json() {
        let config: GrpcStreamConfig = serde_json::from_str(MINIMAL_JSON).unwrap();
        assert_eq!(config.rpc_kind, RpcKind::Unary);
        assert!(config.max_messages.is_none());
        assert!(!config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(1));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(30));
        assert!(config.reconnect_max_attempts.is_none());
        assert!(config.reconnect_replay_from_start);
        assert!(config.max_decoding_message_size.is_none());
        assert!(config.max_encoding_message_size.is_none());
    }

    #[test]
    fn rpc_kind_builder() {
        let config = GrpcStreamConfig::new(
            "http://localhost:50051",
            "events.EventService",
            "Tail",
            "proto/descriptor.bin",
        )
        .rpc_kind(RpcKind::ServerStreaming)
        .max_messages(50)
        .terminate_on_error(true)
        .reconnect_initial_backoff(Duration::from_secs(5))
        .reconnect_max_backoff(Duration::from_secs(120))
        .reconnect_max_attempts(10);
        assert_eq!(config.rpc_kind, RpcKind::ServerStreaming);
        assert_eq!(config.max_messages, Some(50));
        assert!(config.terminate_on_error);
        assert_eq!(config.reconnect_initial_backoff, Duration::from_secs(5));
        assert_eq!(config.reconnect_max_backoff, Duration::from_secs(120));
        assert_eq!(config.reconnect_max_attempts, Some(10));
    }
}