schema_registry_api/service/
mod.rs

1use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
2use reqwest::{Client, Response, StatusCode, Url};
3use reqwest_middleware::ClientWithMiddleware;
4use reqwest_tracing::TracingMiddleware;
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7
8mod compatibility;
9mod config;
10mod error;
11mod mode;
12mod schema;
13mod subject;
14
15use crate::ApiError;
16
17pub use self::compatibility::*;
18pub use self::config::*;
19pub use self::error::*;
20pub use self::mode::*;
21pub use self::schema::*;
22pub use self::subject::*;
23
24/// A schema registry client
25#[derive(Debug, Clone)]
26pub struct SchemaRegistry {
27    base_url: Url,
28    client: ClientWithMiddleware,
29}
30
31impl SchemaRegistry {
32    /// Create a schema registry client
33    #[must_use]
34    pub fn new(base_url: Url, client: ClientWithMiddleware) -> Self {
35        Self { base_url, client }
36    }
37
38    /// Create a schema registry client
39    ///
40    /// # Errors
41    ///
42    /// Fail if we cannot build the reqwest client
43    pub fn build_default(base_url: Url) -> Result<Self, SchemaRegistryError> {
44        let mut headers = HeaderMap::new();
45        headers.insert(
46            ACCEPT,
47            HeaderValue::from_static("application/vnd.schemaregistry.v1+json"),
48        );
49        let reqwest_client = Client::builder().default_headers(headers).build()?;
50        let client = reqwest_middleware::ClientBuilder::new(reqwest_client)
51            // Insert the tracing middleware
52            .with(TracingMiddleware::default())
53            .build();
54        Ok(Self::new(base_url, client))
55    }
56
57    async fn get<B>(&self, url: Url) -> Result<B, SchemaRegistryError>
58    where
59        B: DeserializeOwned,
60    {
61        let response = self.client.get(url).send().await?;
62        handle_response(response).await
63    }
64
65    async fn get_optional<B>(&self, url: Url) -> Result<Option<B>, SchemaRegistryError>
66    where
67        B: DeserializeOwned,
68    {
69        let response = self.client.get(url).send().await?;
70        handle_optional_response(response).await
71    }
72
73    async fn get_optional_string(&self, url: Url) -> Result<Option<String>, SchemaRegistryError> {
74        let response = self.client.get(url).send().await?;
75        handle_optional_string_response(response).await
76    }
77
78    async fn post<R, B>(&self, url: Url, body: &R) -> Result<B, SchemaRegistryError>
79    where
80        R: Serialize,
81        B: DeserializeOwned,
82    {
83        let response = self.client.post(url).json(body).send().await?;
84        handle_response(response).await
85    }
86
87    async fn put<R, B>(&self, url: Url, body: &R) -> Result<B, SchemaRegistryError>
88    where
89        R: Serialize,
90        B: DeserializeOwned,
91    {
92        let response = self.client.put(url).json(body).send().await?;
93        handle_response(response).await
94    }
95
96    async fn delete<B>(&self, url: Url) -> Result<B, SchemaRegistryError>
97    where
98        B: DeserializeOwned,
99    {
100        let response = self.client.delete(url).send().await?;
101        handle_response(response).await
102    }
103
104    async fn delete_option<B>(&self, url: Url) -> Result<Option<B>, SchemaRegistryError>
105    where
106        B: DeserializeOwned,
107    {
108        let response = self.client.delete(url).send().await?;
109        handle_optional_response(response).await
110    }
111
112    /// Schema client
113    #[must_use]
114    pub fn schema(&self) -> SchemaClient {
115        SchemaClient { sr: self }
116    }
117
118    /// Subject client
119    #[must_use]
120    pub fn subject(&self) -> SubjectClient {
121        SubjectClient { sr: self }
122    }
123
124    /// Mode client
125    #[must_use]
126    pub fn mode(&self) -> ModeClient {
127        ModeClient { sr: self }
128    }
129
130    /// Compatibility client
131    #[must_use]
132    pub fn compatibility(&self) -> CompatibilityClient {
133        CompatibilityClient { sr: self }
134    }
135
136    /// Configuration client
137    #[must_use]
138    pub fn config(&self) -> ConfigClient {
139        ConfigClient { sr: self }
140    }
141}
142
143async fn handle_response<T>(response: Response) -> Result<T, SchemaRegistryError>
144where
145    T: DeserializeOwned,
146{
147    if response.status().is_success() {
148        let result = response.json().await?;
149        Ok(result)
150    } else {
151        let err = handle_error(response).await;
152        Err(err)
153    }
154}
155
156async fn handle_optional_response<T>(response: Response) -> Result<Option<T>, SchemaRegistryError>
157where
158    T: DeserializeOwned,
159{
160    let status = response.status();
161    if status.is_success() {
162        let result = response.json().await?;
163        Ok(Some(result))
164    } else if status == StatusCode::NOT_FOUND || status == StatusCode::NO_CONTENT {
165        Ok(None)
166    } else {
167        let err = handle_error(response).await;
168        Err(err)
169    }
170}
171
172async fn handle_optional_string_response(
173    response: Response,
174) -> Result<Option<String>, SchemaRegistryError> {
175    let status = response.status();
176    if status.is_success() {
177        let result = response.text().await?;
178        Ok(Some(result))
179    } else if status == StatusCode::NOT_FOUND || status == StatusCode::NO_CONTENT {
180        Ok(None)
181    } else {
182        let err = handle_error(response).await;
183        Err(err)
184    }
185}
186
187async fn handle_error(response: Response) -> SchemaRegistryError {
188    let body = response.text().await.unwrap_or_default();
189    if let Ok(error) = serde_json::from_str::<ApiError>(&body) {
190        SchemaRegistryError::ApiError(error)
191    } else {
192        SchemaRegistryError::SchemaRegistryError(body)
193    }
194}