faucet_common_kafka/schema_registry/
mod.rs1use schemars::JsonSchema;
13use serde::{Deserialize, Serialize};
14use std::time::Duration;
15
16use crate::BasicAuth;
17
18pub mod avro;
19pub mod client;
20pub mod envelope;
21pub mod json_schema;
22pub mod protobuf;
23
24#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
26pub struct SchemaRegistryConfig {
27 pub url: String,
28 #[serde(default, skip_serializing_if = "Option::is_none")]
29 pub auth: Option<BasicAuth>,
30 #[serde(default = "default_cache_capacity")]
31 pub cache_capacity: usize,
32 #[serde(
33 default = "default_request_timeout",
34 with = "faucet_core::config::duration_secs"
35 )]
36 #[schemars(with = "u64")]
37 pub request_timeout: Duration,
38}
39
/// Serde default for `SchemaRegistryConfig::cache_capacity`: 1024.
///
/// Also used by `SchemaRegistryConfig::new`, so programmatic and deserialized
/// configs start from the same value.
fn default_cache_capacity() -> usize {
    1024
}
43
/// Serde default for `SchemaRegistryConfig::request_timeout`: 10 seconds.
///
/// Also used by `SchemaRegistryConfig::new`, so programmatic and deserialized
/// configs start from the same value.
fn default_request_timeout() -> Duration {
    Duration::from_secs(10)
}
47
48impl SchemaRegistryConfig {
49 pub fn new(url: impl Into<String>) -> Self {
50 Self {
51 url: url.into(),
52 auth: None,
53 cache_capacity: default_cache_capacity(),
54 request_timeout: default_request_timeout(),
55 }
56 }
57
58 pub fn validate(&self) -> Result<(), faucet_core::FaucetError> {
59 let parsed = url::Url::parse(&self.url).map_err(|e| {
60 faucet_core::FaucetError::Config(format!(
61 "schema_registry.url '{}' is not a valid URL: {e}",
62 self.url
63 ))
64 })?;
65 match parsed.scheme() {
66 "http" | "https" => {}
67 other => {
68 return Err(faucet_core::FaucetError::Config(format!(
69 "schema_registry.url scheme must be http or https, got '{other}'"
70 )));
71 }
72 }
73 if self.cache_capacity == 0 {
74 return Err(faucet_core::FaucetError::Config(
75 "schema_registry.cache_capacity must be at least 1".into(),
76 ));
77 }
78 Ok(())
79 }
80}