1use crate::{
2 errors::{DanubeError, Result},
3 retry_manager::RetryManager,
4 schema_types::{CompatibilityMode, SchemaInfo, SchemaType},
5 DanubeClient,
6};
7use danube_core::proto::danube_schema::{
8 schema_registry_client::SchemaRegistryClient as GrpcSchemaRegistryClient,
9 CheckCompatibilityRequest, CheckCompatibilityResponse, GetLatestSchemaRequest,
10 GetSchemaRequest, ListVersionsRequest, RegisterSchemaRequest,
11 RegisterSchemaResponse, SetCompatibilityModeRequest, SetCompatibilityModeResponse,
12};
13
14#[derive(Debug, Clone)]
19pub struct SchemaRegistryClient {
20 client: DanubeClient,
21 grpc_client: Option<GrpcSchemaRegistryClient<tonic::transport::Channel>>,
22}
23
24impl SchemaRegistryClient {
25 pub async fn new(client: &DanubeClient) -> Result<Self> {
27 Ok(Self::new_internal(client.clone()))
28 }
29
30 pub(crate) fn new_internal(client: DanubeClient) -> Self {
32 SchemaRegistryClient {
33 client,
34 grpc_client: None,
35 }
36 }
37
38 async fn connect(&mut self) -> Result<()> {
40 if self.grpc_client.is_some() {
41 return Ok(());
42 }
43
44 let grpc_cnx = self
45 .client
46 .cnx_manager
47 .get_connection(&self.client.uri, &self.client.uri)
48 .await?;
49
50 let client = GrpcSchemaRegistryClient::new(grpc_cnx.grpc_cnx.clone());
51 self.grpc_client = Some(client);
52 Ok(())
53 }
54
55 pub fn register_schema(&mut self, subject: impl Into<String>) -> SchemaRegistrationBuilder<'_> {
59 SchemaRegistrationBuilder {
60 client: self,
61 subject: subject.into(),
62 schema_type: None,
63 schema_data: None,
64 }
65 }
66
67 pub async fn get_schema_by_id(&mut self, schema_id: u64) -> Result<SchemaInfo> {
72 self.connect().await?;
73
74 let request = GetSchemaRequest {
75 schema_id,
76 version: None,
77 };
78
79 let mut req = tonic::Request::new(request);
80 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
81
82 let response = self
83 .grpc_client
84 .as_mut()
85 .ok_or_else(|| {
86 DanubeError::Unrecoverable("Schema registry client not connected".into())
87 })?
88 .get_schema(req)
89 .await
90 .map_err(|e| DanubeError::Unrecoverable(format!("Failed to get schema: {}", e)))?
91 .into_inner();
92
93 Ok(SchemaInfo::from(response))
94 }
95
96 pub async fn get_schema_version(
100 &mut self,
101 schema_id: u64,
102 version: Option<u32>,
103 ) -> Result<SchemaInfo> {
104 self.connect().await?;
105
106 let request = GetSchemaRequest { schema_id, version };
107
108 let mut req = tonic::Request::new(request);
109 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
110
111 let response = self
112 .grpc_client
113 .as_mut()
114 .ok_or_else(|| {
115 DanubeError::Unrecoverable("Schema registry client not connected".into())
116 })?
117 .get_schema(req)
118 .await
119 .map_err(|e| DanubeError::Unrecoverable(format!("Failed to get schema: {}", e)))?
120 .into_inner();
121
122 Ok(SchemaInfo::from(response))
123 }
124
125 pub async fn get_latest_schema(
129 &mut self,
130 subject: impl Into<String>,
131 ) -> Result<SchemaInfo> {
132 self.connect().await?;
133
134 let request = GetLatestSchemaRequest {
135 subject: subject.into(),
136 };
137
138 let mut req = tonic::Request::new(request);
139 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
140
141 let response = self
142 .grpc_client
143 .as_mut()
144 .ok_or_else(|| {
145 DanubeError::Unrecoverable("Schema registry client not connected".into())
146 })?
147 .get_latest_schema(req)
148 .await
149 .map_err(|e| DanubeError::Unrecoverable(format!("Failed to get latest schema: {}", e)))?
150 .into_inner();
151
152 Ok(SchemaInfo::from(response))
153 }
154
155 pub async fn check_compatibility(
163 &mut self,
164 subject: impl Into<String>,
165 schema_data: Vec<u8>,
166 schema_type: SchemaType,
167 mode: Option<CompatibilityMode>,
168 ) -> Result<CheckCompatibilityResponse> {
169 self.connect().await?;
170
171 let request = CheckCompatibilityRequest {
172 subject: subject.into(),
173 new_schema_definition: schema_data,
174 schema_type: schema_type.as_str().to_string(),
175 compatibility_mode: mode.map(|m| m.as_str().to_string()),
176 };
177
178 let mut req = tonic::Request::new(request);
179 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
180
181 let response = self
182 .grpc_client
183 .as_mut()
184 .ok_or_else(|| {
185 DanubeError::Unrecoverable("Schema registry client not connected".into())
186 })?
187 .check_compatibility(req)
188 .await
189 .map_err(|e| {
190 DanubeError::Unrecoverable(format!("Failed to check compatibility: {}", e))
191 })?
192 .into_inner();
193
194 Ok(response)
195 }
196
197 pub async fn set_compatibility_mode(
210 &mut self,
211 subject: impl Into<String>,
212 mode: CompatibilityMode,
213 ) -> Result<SetCompatibilityModeResponse> {
214 self.connect().await?;
215
216 let request = SetCompatibilityModeRequest {
217 subject: subject.into(),
218 compatibility_mode: mode.as_str().to_string(),
219 };
220
221 let mut req = tonic::Request::new(request);
222 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
223
224 let response = self
225 .grpc_client
226 .as_mut()
227 .ok_or_else(|| {
228 DanubeError::Unrecoverable("Schema registry client not connected".into())
229 })?
230 .set_compatibility_mode(req)
231 .await
232 .map_err(|e| {
233 DanubeError::Unrecoverable(format!("Failed to set compatibility mode: {}", e))
234 })?
235 .into_inner();
236
237 Ok(response)
238 }
239
240 pub async fn list_versions(&mut self, subject: impl Into<String>) -> Result<Vec<u32>> {
242 self.connect().await?;
243
244 let request = ListVersionsRequest {
245 subject: subject.into(),
246 };
247
248 let mut req = tonic::Request::new(request);
249 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
250
251 let response = self
252 .grpc_client
253 .as_mut()
254 .ok_or_else(|| {
255 DanubeError::Unrecoverable("Schema registry client not connected".into())
256 })?
257 .list_versions(req)
258 .await
259 .map_err(|e| DanubeError::Unrecoverable(format!("Failed to list versions: {}", e)))?
260 .into_inner();
261
262 Ok(response.versions.into_iter().map(|v| v.version).collect())
263 }
264
265 async fn register_schema_internal(
267 &mut self,
268 subject: String,
269 schema_type: String,
270 schema_data: Vec<u8>,
271 ) -> Result<RegisterSchemaResponse> {
272 self.connect().await?;
273
274 let request = RegisterSchemaRequest {
275 subject,
276 schema_type,
277 schema_definition: schema_data,
278 description: String::new(),
279 created_by: String::from("danube-client"),
280 tags: vec![],
281 };
282
283 let mut req = tonic::Request::new(request);
284 RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
285
286 let response = self
287 .grpc_client
288 .as_mut()
289 .ok_or_else(|| {
290 DanubeError::Unrecoverable("Schema registry client not connected".into())
291 })?
292 .register_schema(req)
293 .await
294 .map_err(|e| DanubeError::Unrecoverable(format!("Failed to register schema: {}", e)))?
295 .into_inner();
296
297 Ok(response)
298 }
299}
300
301pub struct SchemaRegistrationBuilder<'a> {
339 client: &'a mut SchemaRegistryClient,
340 subject: String,
341 schema_type: Option<SchemaType>,
342 schema_data: Option<Vec<u8>>,
343}
344
345impl<'a> SchemaRegistrationBuilder<'a> {
346 pub fn with_type(mut self, schema_type: SchemaType) -> Self {
355 self.schema_type = Some(schema_type);
356 self
357 }
358
359 pub fn with_schema_data(mut self, data: impl Into<Vec<u8>>) -> Self {
361 self.schema_data = Some(data.into());
362 self
363 }
364
365 pub async fn execute(self) -> Result<u64> {
367 let schema_type = self
368 .schema_type
369 .ok_or_else(|| DanubeError::Unrecoverable("Schema type is required".into()))?;
370 let schema_data = self
371 .schema_data
372 .ok_or_else(|| DanubeError::Unrecoverable("Schema data is required".into()))?;
373
374 let response = self
375 .client
376 .register_schema_internal(self.subject, schema_type.as_str().to_string(), schema_data)
377 .await?;
378
379 Ok(response.schema_id)
380 }
381}