1use std::sync::Arc;
2
3use crate::{
4 auth_service::AuthService,
5 connection_manager::ConnectionManager,
6 errors::{DanubeError, Result},
7 schema_types::{CompatibilityMode, SchemaInfo, SchemaType},
8};
9use danube_core::proto::danube_schema::{
10 schema_registry_client::SchemaRegistryClient as GrpcSchemaRegistryClient,
11 CheckCompatibilityRequest, CheckCompatibilityResponse, GetLatestSchemaRequest,
12 GetSchemaRequest, ListVersionsRequest, RegisterSchemaRequest, RegisterSchemaResponse,
13 SetCompatibilityModeRequest, SetCompatibilityModeResponse,
14};
15use tonic::transport::Uri;
16
17#[derive(Debug, Clone)]
25pub struct SchemaRegistryClient {
26 cnx_manager: Arc<ConnectionManager>,
27 auth_service: AuthService,
28 uri: Uri,
29}
30
31impl SchemaRegistryClient {
32 pub(crate) fn new(
34 cnx_manager: Arc<ConnectionManager>,
35 auth_service: AuthService,
36 uri: Uri,
37 ) -> Self {
38 SchemaRegistryClient {
39 cnx_manager,
40 auth_service,
41 uri,
42 }
43 }
44
45 async fn prepare_request<T>(
47 &self,
48 request: T,
49 ) -> Result<(
50 tonic::Request<T>,
51 GrpcSchemaRegistryClient<tonic::transport::Channel>,
52 )> {
53 let grpc_cnx = self
54 .cnx_manager
55 .get_connection(&self.uri, &self.uri)
56 .await?;
57 let client = GrpcSchemaRegistryClient::new(grpc_cnx.grpc_cnx.clone());
58 let mut req = tonic::Request::new(request);
59 self.auth_service
60 .insert_token_if_needed(
61 self.cnx_manager.connection_options.api_key.as_deref(),
62 &mut req,
63 &self.uri,
64 )
65 .await?;
66 Ok((req, client))
67 }
68
69 pub fn register_schema(&self, subject: impl Into<String>) -> SchemaRegistrationBuilder<'_> {
73 SchemaRegistrationBuilder {
74 client: self,
75 subject: subject.into(),
76 schema_type: None,
77 schema_data: None,
78 description: None,
79 created_by: None,
80 tags: None,
81 }
82 }
83
84 pub async fn get_schema_by_id(&self, schema_id: u64) -> Result<SchemaInfo> {
89 let request = GetSchemaRequest {
90 schema_id,
91 version: None,
92 };
93 let (req, mut client) = self.prepare_request(request).await?;
94 let response = client
95 .get_schema(req)
96 .await
97 .map_err(|status| DanubeError::FromStatus(status))?
98 .into_inner();
99 Ok(SchemaInfo::from(response))
100 }
101
102 pub async fn get_schema_version(
106 &self,
107 schema_id: u64,
108 version: Option<u32>,
109 ) -> Result<SchemaInfo> {
110 let request = GetSchemaRequest { schema_id, version };
111 let (req, mut client) = self.prepare_request(request).await?;
112 let response = client
113 .get_schema(req)
114 .await
115 .map_err(|status| DanubeError::FromStatus(status))?
116 .into_inner();
117 Ok(SchemaInfo::from(response))
118 }
119
120 pub async fn get_latest_schema(&self, subject: impl Into<String>) -> Result<SchemaInfo> {
124 let request = GetLatestSchemaRequest {
125 subject: subject.into(),
126 };
127 let (req, mut client) = self.prepare_request(request).await?;
128 let response = client
129 .get_latest_schema(req)
130 .await
131 .map_err(|status| DanubeError::FromStatus(status))?
132 .into_inner();
133 Ok(SchemaInfo::from(response))
134 }
135
136 pub async fn check_compatibility(
144 &self,
145 subject: impl Into<String>,
146 schema_data: Vec<u8>,
147 schema_type: SchemaType,
148 mode: Option<CompatibilityMode>,
149 ) -> Result<CheckCompatibilityResponse> {
150 let request = CheckCompatibilityRequest {
151 subject: subject.into(),
152 new_schema_definition: schema_data,
153 schema_type: schema_type.as_str().to_string(),
154 compatibility_mode: mode.map(|m| m.as_str().to_string()),
155 };
156 let (req, mut client) = self.prepare_request(request).await?;
157 let response = client
158 .check_compatibility(req)
159 .await
160 .map_err(|status| DanubeError::FromStatus(status))?
161 .into_inner();
162 Ok(response)
163 }
164
165 pub async fn set_compatibility_mode(
171 &self,
172 subject: impl Into<String>,
173 mode: CompatibilityMode,
174 ) -> Result<SetCompatibilityModeResponse> {
175 let request = SetCompatibilityModeRequest {
176 subject: subject.into(),
177 compatibility_mode: mode.as_str().to_string(),
178 };
179 let (req, mut client) = self.prepare_request(request).await?;
180 let response = client
181 .set_compatibility_mode(req)
182 .await
183 .map_err(|status| DanubeError::FromStatus(status))?
184 .into_inner();
185 Ok(response)
186 }
187
188 pub async fn list_versions(&self, subject: impl Into<String>) -> Result<Vec<u32>> {
190 let request = ListVersionsRequest {
191 subject: subject.into(),
192 };
193 let (req, mut client) = self.prepare_request(request).await?;
194 let response = client
195 .list_versions(req)
196 .await
197 .map_err(|status| DanubeError::FromStatus(status))?
198 .into_inner();
199 Ok(response.versions.into_iter().map(|v| v.version).collect())
200 }
201
202 async fn register_schema_internal(
204 &self,
205 subject: String,
206 schema_type: String,
207 schema_data: Vec<u8>,
208 description: String,
209 created_by: String,
210 tags: Vec<String>,
211 ) -> Result<RegisterSchemaResponse> {
212 let request = RegisterSchemaRequest {
213 subject,
214 schema_type,
215 schema_definition: schema_data,
216 description,
217 created_by,
218 tags,
219 };
220 let (req, mut client) = self.prepare_request(request).await?;
221 let response = client
222 .register_schema(req)
223 .await
224 .map_err(|status| DanubeError::FromStatus(status))?
225 .into_inner();
226 Ok(response)
227 }
228}
229
230pub struct SchemaRegistrationBuilder<'a> {
260 client: &'a SchemaRegistryClient,
261 subject: String,
262 schema_type: Option<SchemaType>,
263 schema_data: Option<Vec<u8>>,
264 description: Option<String>,
265 created_by: Option<String>,
266 tags: Option<Vec<String>>,
267}
268
269impl<'a> SchemaRegistrationBuilder<'a> {
270 pub fn with_type(mut self, schema_type: SchemaType) -> Self {
272 self.schema_type = Some(schema_type);
273 self
274 }
275
276 pub fn with_schema_data(mut self, data: impl Into<Vec<u8>>) -> Self {
278 self.schema_data = Some(data.into());
279 self
280 }
281
282 pub fn with_description(mut self, description: impl Into<String>) -> Self {
284 self.description = Some(description.into());
285 self
286 }
287
288 pub fn with_created_by(mut self, created_by: impl Into<String>) -> Self {
290 self.created_by = Some(created_by.into());
291 self
292 }
293
294 pub fn with_tags(mut self, tags: Vec<String>) -> Self {
296 self.tags = Some(tags);
297 self
298 }
299
300 pub async fn execute(self) -> Result<u64> {
302 let schema_type = self
303 .schema_type
304 .ok_or_else(|| DanubeError::SchemaError("Schema type is required".into()))?;
305 let schema_data = self
306 .schema_data
307 .ok_or_else(|| DanubeError::SchemaError("Schema data is required".into()))?;
308
309 let response = self
310 .client
311 .register_schema_internal(
312 self.subject,
313 schema_type.as_str().to_string(),
314 schema_data,
315 self.description.unwrap_or_default(),
316 self.created_by.unwrap_or_else(|| "danube-client".into()),
317 self.tags.unwrap_or_default(),
318 )
319 .await?;
320
321 Ok(response.schema_id)
322 }
323}