schema_registry_api/service/
mod.rs1use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
2use reqwest::{Client, Response, StatusCode, Url};
3use reqwest_middleware::ClientWithMiddleware;
4use reqwest_tracing::TracingMiddleware;
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7
8mod compatibility;
9mod config;
10mod error;
11mod mode;
12mod schema;
13mod subject;
14
15use crate::ApiError;
16
17pub use self::compatibility::*;
18pub use self::config::*;
19pub use self::error::*;
20pub use self::mode::*;
21pub use self::schema::*;
22pub use self::subject::*;
23
24#[derive(Debug, Clone)]
26pub struct SchemaRegistry {
27 base_url: Url,
28 client: ClientWithMiddleware,
29}
30
31impl SchemaRegistry {
32 #[must_use]
34 pub fn new(base_url: Url, client: ClientWithMiddleware) -> Self {
35 Self { base_url, client }
36 }
37
38 pub fn build_default(base_url: Url) -> Result<Self, SchemaRegistryError> {
44 let mut headers = HeaderMap::new();
45 headers.insert(
46 ACCEPT,
47 HeaderValue::from_static("application/vnd.schemaregistry.v1+json"),
48 );
49 let reqwest_client = Client::builder().default_headers(headers).build()?;
50 let client = reqwest_middleware::ClientBuilder::new(reqwest_client)
51 .with(TracingMiddleware::default())
53 .build();
54 Ok(Self::new(base_url, client))
55 }
56
57 async fn get<B>(&self, url: Url) -> Result<B, SchemaRegistryError>
58 where
59 B: DeserializeOwned,
60 {
61 let response = self.client.get(url).send().await?;
62 handle_response(response).await
63 }
64
65 async fn get_optional<B>(&self, url: Url) -> Result<Option<B>, SchemaRegistryError>
66 where
67 B: DeserializeOwned,
68 {
69 let response = self.client.get(url).send().await?;
70 handle_optional_response(response).await
71 }
72
73 async fn get_optional_string(&self, url: Url) -> Result<Option<String>, SchemaRegistryError> {
74 let response = self.client.get(url).send().await?;
75 handle_optional_string_response(response).await
76 }
77
78 async fn post<R, B>(&self, url: Url, body: &R) -> Result<B, SchemaRegistryError>
79 where
80 R: Serialize,
81 B: DeserializeOwned,
82 {
83 let response = self.client.post(url).json(body).send().await?;
84 handle_response(response).await
85 }
86
87 async fn put<R, B>(&self, url: Url, body: &R) -> Result<B, SchemaRegistryError>
88 where
89 R: Serialize,
90 B: DeserializeOwned,
91 {
92 let response = self.client.put(url).json(body).send().await?;
93 handle_response(response).await
94 }
95
96 async fn delete<B>(&self, url: Url) -> Result<B, SchemaRegistryError>
97 where
98 B: DeserializeOwned,
99 {
100 let response = self.client.delete(url).send().await?;
101 handle_response(response).await
102 }
103
104 async fn delete_option<B>(&self, url: Url) -> Result<Option<B>, SchemaRegistryError>
105 where
106 B: DeserializeOwned,
107 {
108 let response = self.client.delete(url).send().await?;
109 handle_optional_response(response).await
110 }
111
112 #[must_use]
114 pub fn schema(&self) -> SchemaClient {
115 SchemaClient { sr: self }
116 }
117
118 #[must_use]
120 pub fn subject(&self) -> SubjectClient {
121 SubjectClient { sr: self }
122 }
123
124 #[must_use]
126 pub fn mode(&self) -> ModeClient {
127 ModeClient { sr: self }
128 }
129
130 #[must_use]
132 pub fn compatibility(&self) -> CompatibilityClient {
133 CompatibilityClient { sr: self }
134 }
135
136 #[must_use]
138 pub fn config(&self) -> ConfigClient {
139 ConfigClient { sr: self }
140 }
141}
142
143async fn handle_response<T>(response: Response) -> Result<T, SchemaRegistryError>
144where
145 T: DeserializeOwned,
146{
147 if response.status().is_success() {
148 let result = response.json().await?;
149 Ok(result)
150 } else {
151 let err = handle_error(response).await;
152 Err(err)
153 }
154}
155
156async fn handle_optional_response<T>(response: Response) -> Result<Option<T>, SchemaRegistryError>
157where
158 T: DeserializeOwned,
159{
160 let status = response.status();
161 if status.is_success() {
162 let result = response.json().await?;
163 Ok(Some(result))
164 } else if status == StatusCode::NOT_FOUND || status == StatusCode::NO_CONTENT {
165 Ok(None)
166 } else {
167 let err = handle_error(response).await;
168 Err(err)
169 }
170}
171
172async fn handle_optional_string_response(
173 response: Response,
174) -> Result<Option<String>, SchemaRegistryError> {
175 let status = response.status();
176 if status.is_success() {
177 let result = response.text().await?;
178 Ok(Some(result))
179 } else if status == StatusCode::NOT_FOUND || status == StatusCode::NO_CONTENT {
180 Ok(None)
181 } else {
182 let err = handle_error(response).await;
183 Err(err)
184 }
185}
186
187async fn handle_error(response: Response) -> SchemaRegistryError {
188 let body = response.text().await.unwrap_or_default();
189 if let Ok(error) = serde_json::from_str::<ApiError>(&body) {
190 SchemaRegistryError::ApiError(error)
191 } else {
192 SchemaRegistryError::SchemaRegistryError(body)
193 }
194}