danube_client/
schema_registry_client.rs

1use crate::{
2    errors::{DanubeError, Result},
3    retry_manager::RetryManager,
4    schema_types::{CompatibilityMode, SchemaInfo, SchemaType},
5    DanubeClient,
6};
7use danube_core::proto::danube_schema::{
8    schema_registry_client::SchemaRegistryClient as GrpcSchemaRegistryClient,
9    CheckCompatibilityRequest, CheckCompatibilityResponse, GetLatestSchemaRequest,
10    GetSchemaRequest, ListVersionsRequest, RegisterSchemaRequest,
11    RegisterSchemaResponse, SetCompatibilityModeRequest, SetCompatibilityModeResponse,
12};
13
14/// Client for interacting with the Danube Schema Registry
15///
16/// Provides methods for registering, retrieving, and managing schemas
17/// in the centralized schema registry.
18#[derive(Debug, Clone)]
19pub struct SchemaRegistryClient {
20    client: DanubeClient,
21    grpc_client: Option<GrpcSchemaRegistryClient<tonic::transport::Channel>>,
22}
23
24impl SchemaRegistryClient {
25    /// Create a new SchemaRegistryClient from a DanubeClient
26    pub async fn new(client: &DanubeClient) -> Result<Self> {
27        Ok(Self::new_internal(client.clone()))
28    }
29
30    /// Internal constructor for building schema registry client synchronously
31    pub(crate) fn new_internal(client: DanubeClient) -> Self {
32        SchemaRegistryClient {
33            client,
34            grpc_client: None,
35        }
36    }
37
38    /// Connect to the schema registry service
39    async fn connect(&mut self) -> Result<()> {
40        if self.grpc_client.is_some() {
41            return Ok(());
42        }
43
44        let grpc_cnx = self
45            .client
46            .cnx_manager
47            .get_connection(&self.client.uri, &self.client.uri)
48            .await?;
49
50        let client = GrpcSchemaRegistryClient::new(grpc_cnx.grpc_cnx.clone());
51        self.grpc_client = Some(client);
52        Ok(())
53    }
54
55    /// Register a new schema or get existing schema ID
56    ///
57    /// Returns a builder for configuring schema registration
58    pub fn register_schema(&mut self, subject: impl Into<String>) -> SchemaRegistrationBuilder<'_> {
59        SchemaRegistrationBuilder {
60            client: self,
61            subject: subject.into(),
62            schema_type: None,
63            schema_data: None,
64        }
65    }
66
67    /// Get schema by ID
68    ///
69    /// Returns schema information for the given schema ID.
70    /// Schema ID identifies a subject (not a specific version).
71    pub async fn get_schema_by_id(&mut self, schema_id: u64) -> Result<SchemaInfo> {
72        self.connect().await?;
73
74        let request = GetSchemaRequest {
75            schema_id,
76            version: None,
77        };
78
79        let mut req = tonic::Request::new(request);
80        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
81
82        let response = self
83            .grpc_client
84            .as_mut()
85            .ok_or_else(|| {
86                DanubeError::Unrecoverable("Schema registry client not connected".into())
87            })?
88            .get_schema(req)
89            .await
90            .map_err(|e| DanubeError::Unrecoverable(format!("Failed to get schema: {}", e)))?
91            .into_inner();
92
93        Ok(SchemaInfo::from(response))
94    }
95
96    /// Get specific schema version
97    ///
98    /// Returns schema information for a specific version of a schema subject.
99    pub async fn get_schema_version(
100        &mut self,
101        schema_id: u64,
102        version: Option<u32>,
103    ) -> Result<SchemaInfo> {
104        self.connect().await?;
105
106        let request = GetSchemaRequest { schema_id, version };
107
108        let mut req = tonic::Request::new(request);
109        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
110
111        let response = self
112            .grpc_client
113            .as_mut()
114            .ok_or_else(|| {
115                DanubeError::Unrecoverable("Schema registry client not connected".into())
116            })?
117            .get_schema(req)
118            .await
119            .map_err(|e| DanubeError::Unrecoverable(format!("Failed to get schema: {}", e)))?
120            .into_inner();
121
122        Ok(SchemaInfo::from(response))
123    }
124
125    /// Get latest schema for a subject
126    ///
127    /// Returns the latest schema version for the given subject.
128    pub async fn get_latest_schema(
129        &mut self,
130        subject: impl Into<String>,
131    ) -> Result<SchemaInfo> {
132        self.connect().await?;
133
134        let request = GetLatestSchemaRequest {
135            subject: subject.into(),
136        };
137
138        let mut req = tonic::Request::new(request);
139        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
140
141        let response = self
142            .grpc_client
143            .as_mut()
144            .ok_or_else(|| {
145                DanubeError::Unrecoverable("Schema registry client not connected".into())
146            })?
147            .get_latest_schema(req)
148            .await
149            .map_err(|e| DanubeError::Unrecoverable(format!("Failed to get latest schema: {}", e)))?
150            .into_inner();
151
152        Ok(SchemaInfo::from(response))
153    }
154
155    /// Check if a schema is compatible with existing versions
156    ///
157    /// # Arguments
158    /// * `subject` - Schema subject name
159    /// * `schema_data` - Raw schema content
160    /// * `schema_type` - Schema type (Avro, JsonSchema, Protobuf)
161    /// * `mode` - Optional compatibility mode override (uses subject's default if None)
162    pub async fn check_compatibility(
163        &mut self,
164        subject: impl Into<String>,
165        schema_data: Vec<u8>,
166        schema_type: SchemaType,
167        mode: Option<CompatibilityMode>,
168    ) -> Result<CheckCompatibilityResponse> {
169        self.connect().await?;
170
171        let request = CheckCompatibilityRequest {
172            subject: subject.into(),
173            new_schema_definition: schema_data,
174            schema_type: schema_type.as_str().to_string(),
175            compatibility_mode: mode.map(|m| m.as_str().to_string()),
176        };
177
178        let mut req = tonic::Request::new(request);
179        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
180
181        let response = self
182            .grpc_client
183            .as_mut()
184            .ok_or_else(|| {
185                DanubeError::Unrecoverable("Schema registry client not connected".into())
186            })?
187            .check_compatibility(req)
188            .await
189            .map_err(|e| {
190                DanubeError::Unrecoverable(format!("Failed to check compatibility: {}", e))
191            })?
192            .into_inner();
193
194        Ok(response)
195    }
196
197    /// Set compatibility mode for a subject
198    ///
199    /// # Arguments
200    /// * `subject` - Schema subject name
201    /// * `mode` - Compatibility mode to set
202    ///
203    /// # Example
204    /// ```no_run
205    /// use danube_client::{SchemaRegistryClient, CompatibilityMode};
206    ///
207    /// schema_client.set_compatibility_mode("critical-orders", CompatibilityMode::Full).await?;
208    /// ```
209    pub async fn set_compatibility_mode(
210        &mut self,
211        subject: impl Into<String>,
212        mode: CompatibilityMode,
213    ) -> Result<SetCompatibilityModeResponse> {
214        self.connect().await?;
215
216        let request = SetCompatibilityModeRequest {
217            subject: subject.into(),
218            compatibility_mode: mode.as_str().to_string(),
219        };
220
221        let mut req = tonic::Request::new(request);
222        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
223
224        let response = self
225            .grpc_client
226            .as_mut()
227            .ok_or_else(|| {
228                DanubeError::Unrecoverable("Schema registry client not connected".into())
229            })?
230            .set_compatibility_mode(req)
231            .await
232            .map_err(|e| {
233                DanubeError::Unrecoverable(format!("Failed to set compatibility mode: {}", e))
234            })?
235            .into_inner();
236
237        Ok(response)
238    }
239
240    /// List all versions for a subject
241    pub async fn list_versions(&mut self, subject: impl Into<String>) -> Result<Vec<u32>> {
242        self.connect().await?;
243
244        let request = ListVersionsRequest {
245            subject: subject.into(),
246        };
247
248        let mut req = tonic::Request::new(request);
249        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
250
251        let response = self
252            .grpc_client
253            .as_mut()
254            .ok_or_else(|| {
255                DanubeError::Unrecoverable("Schema registry client not connected".into())
256            })?
257            .list_versions(req)
258            .await
259            .map_err(|e| DanubeError::Unrecoverable(format!("Failed to list versions: {}", e)))?
260            .into_inner();
261
262        Ok(response.versions.into_iter().map(|v| v.version).collect())
263    }
264
265    /// Internal method to register schema via gRPC
266    async fn register_schema_internal(
267        &mut self,
268        subject: String,
269        schema_type: String,
270        schema_data: Vec<u8>,
271    ) -> Result<RegisterSchemaResponse> {
272        self.connect().await?;
273
274        let request = RegisterSchemaRequest {
275            subject,
276            schema_type,
277            schema_definition: schema_data,
278            description: String::new(),
279            created_by: String::from("danube-client"),
280            tags: vec![],
281        };
282
283        let mut req = tonic::Request::new(request);
284        RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
285
286        let response = self
287            .grpc_client
288            .as_mut()
289            .ok_or_else(|| {
290                DanubeError::Unrecoverable("Schema registry client not connected".into())
291            })?
292            .register_schema(req)
293            .await
294            .map_err(|e| DanubeError::Unrecoverable(format!("Failed to register schema: {}", e)))?
295            .into_inner();
296
297        Ok(response)
298    }
299}
300
301/// Builder for schema registration with fluent API
302///
303/// This builder provides a convenient way to register schemas in the Danube Schema Registry.
304/// It supports all schema types (Avro, Protobuf, JSON Schema, etc.) and handles version management.
305///
306/// # Example
307///
308/// ```no_run
309/// use danube_client::{DanubeClient, SchemaType};
310///
311/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
312/// let client = DanubeClient::builder()
313///     .service_url("http://localhost:6650")
314///     .build()
315///     .await?;
316///
317/// let mut schema_client = client.schema_registry_client();
318///
319/// // Register an Avro schema
320/// let schema_id = schema_client
321///     .register_schema("user-events-value")
322///     .with_type(SchemaType::Avro)
323///     .with_schema_data(avro_schema_bytes)
324///     .execute()
325///     .await?;
326///
327/// println!("Schema registered with ID: {}", schema_id);
328/// # Ok(())
329/// # }
330/// ```
331///
332/// # Schema Versioning
333///
334/// The registry automatically handles versioning:
335/// - If the schema definition is new, a new version is created
336/// - If the schema definition already exists, the existing schema_id and version are returned
337/// - Compatibility checks are performed based on the subject's compatibility mode
338pub struct SchemaRegistrationBuilder<'a> {
339    client: &'a mut SchemaRegistryClient,
340    subject: String,
341    schema_type: Option<SchemaType>,
342    schema_data: Option<Vec<u8>>,
343}
344
345impl<'a> SchemaRegistrationBuilder<'a> {
346    /// Set the schema type
347    ///
348    /// # Example
349    /// ```no_run
350    /// use danube_client::SchemaType;
351    ///
352    /// .with_type(SchemaType::Avro)
353    /// ```
354    pub fn with_type(mut self, schema_type: SchemaType) -> Self {
355        self.schema_type = Some(schema_type);
356        self
357    }
358
359    /// Set the schema data (raw schema content)
360    pub fn with_schema_data(mut self, data: impl Into<Vec<u8>>) -> Self {
361        self.schema_data = Some(data.into());
362        self
363    }
364
365    /// Execute the schema registration
366    pub async fn execute(self) -> Result<u64> {
367        let schema_type = self
368            .schema_type
369            .ok_or_else(|| DanubeError::Unrecoverable("Schema type is required".into()))?;
370        let schema_data = self
371            .schema_data
372            .ok_or_else(|| DanubeError::Unrecoverable("Schema data is required".into()))?;
373
374        let response = self
375            .client
376            .register_schema_internal(self.subject, schema_type.as_str().to_string(), schema_data)
377            .await?;
378
379        Ok(response.schema_id)
380    }
381}