Skip to main content

faucet_common_kafka/schema_registry/
mod.rs

1//! Confluent Schema Registry client and wire-envelope handling.
2//!
3//! All four moving pieces live behind the `schema-registry` Cargo feature:
4//!
5//! - [`SchemaRegistryConfig`] — connection settings.
6//! - [`envelope`] — encode/decode the Confluent wire envelope
7//!   `[magic_byte: 0x00][schema_id: be u32][payload bytes...]`.
8//! - [`client::SchemaRegistryClient`] — HTTP client with an LRU schema cache.
9//! - The per-format codecs (`avro`, `protobuf`, `json_schema`) live in
10//!   sibling modules.
11
12use schemars::JsonSchema;
13use serde::{Deserialize, Serialize};
14use std::time::Duration;
15
16use crate::BasicAuth;
17
18pub mod avro;
19pub mod client;
20pub mod envelope;
21pub mod json_schema;
22pub mod protobuf;
23
24/// Confluent Schema Registry connection settings.
25#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
26pub struct SchemaRegistryConfig {
27    pub url: String,
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub auth: Option<BasicAuth>,
30    #[serde(default = "default_cache_capacity")]
31    pub cache_capacity: usize,
32    #[serde(
33        default = "default_request_timeout",
34        with = "faucet_core::config::duration_secs"
35    )]
36    #[schemars(with = "u64")]
37    pub request_timeout: Duration,
38}
39
40fn default_cache_capacity() -> usize {
41    1024
42}
43
44fn default_request_timeout() -> Duration {
45    Duration::from_secs(10)
46}
47
48impl SchemaRegistryConfig {
49    pub fn new(url: impl Into<String>) -> Self {
50        Self {
51            url: url.into(),
52            auth: None,
53            cache_capacity: default_cache_capacity(),
54            request_timeout: default_request_timeout(),
55        }
56    }
57
58    pub fn validate(&self) -> Result<(), faucet_core::FaucetError> {
59        let parsed = url::Url::parse(&self.url).map_err(|e| {
60            faucet_core::FaucetError::Config(format!(
61                "schema_registry.url '{}' is not a valid URL: {e}",
62                self.url
63            ))
64        })?;
65        match parsed.scheme() {
66            "http" | "https" => {}
67            other => {
68                return Err(faucet_core::FaucetError::Config(format!(
69                    "schema_registry.url scheme must be http or https, got '{other}'"
70                )));
71            }
72        }
73        if self.cache_capacity == 0 {
74            return Err(faucet_core::FaucetError::Config(
75                "schema_registry.cache_capacity must be at least 1".into(),
76            ));
77        }
78        Ok(())
79    }
80}