Skip to main content

danube_client/
schema_registry_client.rs

1use std::sync::Arc;
2
3use crate::{
4    auth_service::AuthService,
5    connection_manager::ConnectionManager,
6    errors::{DanubeError, Result},
7    schema_types::{CompatibilityMode, SchemaInfo, SchemaType},
8};
9use danube_core::proto::danube_schema::{
10    schema_registry_client::SchemaRegistryClient as GrpcSchemaRegistryClient,
11    CheckCompatibilityRequest, CheckCompatibilityResponse, GetLatestSchemaRequest,
12    GetSchemaRequest, ListVersionsRequest, RegisterSchemaRequest, RegisterSchemaResponse,
13    SetCompatibilityModeRequest, SetCompatibilityModeResponse,
14};
15use tonic::transport::Uri;
16
17/// Client for interacting with the Danube Schema Registry.
18///
19/// Obtained via [`DanubeClient::schema()`]. Provides methods for registering,
20/// retrieving, and managing schemas in the centralized schema registry.
21///
22/// Follows the same connection pattern as other Danube services — a fresh gRPC
23/// connection is obtained from the shared `ConnectionManager` on each call.
24#[derive(Debug, Clone)]
25pub struct SchemaRegistryClient {
26    cnx_manager: Arc<ConnectionManager>,
27    auth_service: AuthService,
28    uri: Uri,
29}
30
31impl SchemaRegistryClient {
32    /// Create a new `SchemaRegistryClient` from shared connection infrastructure.
33    pub(crate) fn new(
34        cnx_manager: Arc<ConnectionManager>,
35        auth_service: AuthService,
36        uri: Uri,
37    ) -> Self {
38        SchemaRegistryClient {
39            cnx_manager,
40            auth_service,
41            uri,
42        }
43    }
44
45    /// Get a gRPC client and authenticated request for a schema registry call.
46    async fn prepare_request<T>(
47        &self,
48        request: T,
49    ) -> Result<(
50        tonic::Request<T>,
51        GrpcSchemaRegistryClient<tonic::transport::Channel>,
52    )> {
53        let grpc_cnx = self
54            .cnx_manager
55            .get_connection(&self.uri, &self.uri)
56            .await?;
57        let client = GrpcSchemaRegistryClient::new(grpc_cnx.grpc_cnx.clone());
58        let mut req = tonic::Request::new(request);
59        self.auth_service
60            .insert_token_if_needed(
61                self.cnx_manager.connection_options.api_key.as_deref(),
62                &mut req,
63                &self.uri,
64            )
65            .await?;
66        Ok((req, client))
67    }
68
69    /// Register a new schema or get existing schema ID.
70    ///
71    /// Returns a builder for configuring schema registration.
72    pub fn register_schema(&self, subject: impl Into<String>) -> SchemaRegistrationBuilder<'_> {
73        SchemaRegistrationBuilder {
74            client: self,
75            subject: subject.into(),
76            schema_type: None,
77            schema_data: None,
78            description: None,
79            created_by: None,
80            tags: None,
81        }
82    }
83
84    /// Get schema by ID.
85    ///
86    /// Returns schema information for the given schema ID.
87    /// Schema ID identifies a subject (not a specific version).
88    pub async fn get_schema_by_id(&self, schema_id: u64) -> Result<SchemaInfo> {
89        let request = GetSchemaRequest {
90            schema_id,
91            version: None,
92        };
93        let (req, mut client) = self.prepare_request(request).await?;
94        let response = client
95            .get_schema(req)
96            .await
97            .map_err(|status| DanubeError::FromStatus(status))?
98            .into_inner();
99        Ok(SchemaInfo::from(response))
100    }
101
102    /// Get specific schema version.
103    ///
104    /// Returns schema information for a specific version of a schema subject.
105    pub async fn get_schema_version(
106        &self,
107        schema_id: u64,
108        version: Option<u32>,
109    ) -> Result<SchemaInfo> {
110        let request = GetSchemaRequest { schema_id, version };
111        let (req, mut client) = self.prepare_request(request).await?;
112        let response = client
113            .get_schema(req)
114            .await
115            .map_err(|status| DanubeError::FromStatus(status))?
116            .into_inner();
117        Ok(SchemaInfo::from(response))
118    }
119
120    /// Get latest schema for a subject.
121    ///
122    /// Returns the latest schema version for the given subject.
123    pub async fn get_latest_schema(&self, subject: impl Into<String>) -> Result<SchemaInfo> {
124        let request = GetLatestSchemaRequest {
125            subject: subject.into(),
126        };
127        let (req, mut client) = self.prepare_request(request).await?;
128        let response = client
129            .get_latest_schema(req)
130            .await
131            .map_err(|status| DanubeError::FromStatus(status))?
132            .into_inner();
133        Ok(SchemaInfo::from(response))
134    }
135
136    /// Check if a schema is compatible with existing versions.
137    ///
138    /// # Arguments
139    /// * `subject` - Schema subject name
140    /// * `schema_data` - Raw schema content
141    /// * `schema_type` - Schema type (Avro, JsonSchema, Protobuf)
142    /// * `mode` - Optional compatibility mode override (uses subject's default if None)
143    pub async fn check_compatibility(
144        &self,
145        subject: impl Into<String>,
146        schema_data: Vec<u8>,
147        schema_type: SchemaType,
148        mode: Option<CompatibilityMode>,
149    ) -> Result<CheckCompatibilityResponse> {
150        let request = CheckCompatibilityRequest {
151            subject: subject.into(),
152            new_schema_definition: schema_data,
153            schema_type: schema_type.as_str().to_string(),
154            compatibility_mode: mode.map(|m| m.as_str().to_string()),
155        };
156        let (req, mut client) = self.prepare_request(request).await?;
157        let response = client
158            .check_compatibility(req)
159            .await
160            .map_err(|status| DanubeError::FromStatus(status))?
161            .into_inner();
162        Ok(response)
163    }
164
165    /// Set compatibility mode for a subject.
166    ///
167    /// # Arguments
168    /// * `subject` - Schema subject name
169    /// * `mode` - Compatibility mode to set
170    pub async fn set_compatibility_mode(
171        &self,
172        subject: impl Into<String>,
173        mode: CompatibilityMode,
174    ) -> Result<SetCompatibilityModeResponse> {
175        let request = SetCompatibilityModeRequest {
176            subject: subject.into(),
177            compatibility_mode: mode.as_str().to_string(),
178        };
179        let (req, mut client) = self.prepare_request(request).await?;
180        let response = client
181            .set_compatibility_mode(req)
182            .await
183            .map_err(|status| DanubeError::FromStatus(status))?
184            .into_inner();
185        Ok(response)
186    }
187
188    /// List all versions for a subject.
189    pub async fn list_versions(&self, subject: impl Into<String>) -> Result<Vec<u32>> {
190        let request = ListVersionsRequest {
191            subject: subject.into(),
192        };
193        let (req, mut client) = self.prepare_request(request).await?;
194        let response = client
195            .list_versions(req)
196            .await
197            .map_err(|status| DanubeError::FromStatus(status))?
198            .into_inner();
199        Ok(response.versions.into_iter().map(|v| v.version).collect())
200    }
201
202    /// Internal method to register schema via gRPC.
203    async fn register_schema_internal(
204        &self,
205        subject: String,
206        schema_type: String,
207        schema_data: Vec<u8>,
208        description: String,
209        created_by: String,
210        tags: Vec<String>,
211    ) -> Result<RegisterSchemaResponse> {
212        let request = RegisterSchemaRequest {
213            subject,
214            schema_type,
215            schema_definition: schema_data,
216            description,
217            created_by,
218            tags,
219        };
220        let (req, mut client) = self.prepare_request(request).await?;
221        let response = client
222            .register_schema(req)
223            .await
224            .map_err(|status| DanubeError::FromStatus(status))?
225            .into_inner();
226        Ok(response)
227    }
228}
229
230/// Builder for schema registration with fluent API.
231///
232/// # Example
233///
234/// ```no_run
235/// use danube_client::{DanubeClient, SchemaType};
236///
237/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
238/// let client = DanubeClient::builder()
239///     .service_url("http://localhost:6650")
240///     .build()
241///     .await?;
242///
243/// let schema_id = client.schema()
244///     .register_schema("user-events-value")
245///     .with_type(SchemaType::Avro)
246///     .with_schema_data(avro_schema_bytes)
247///     .execute()
248///     .await?;
249/// # Ok(())
250/// # }
251/// ```
252///
253/// # Schema Versioning
254///
255/// The registry automatically handles versioning:
256/// - If the schema definition is new, a new version is created
257/// - If the schema definition already exists, the existing schema_id and version are returned
258/// - Compatibility checks are performed based on the subject's compatibility mode
259pub struct SchemaRegistrationBuilder<'a> {
260    client: &'a SchemaRegistryClient,
261    subject: String,
262    schema_type: Option<SchemaType>,
263    schema_data: Option<Vec<u8>>,
264    description: Option<String>,
265    created_by: Option<String>,
266    tags: Option<Vec<String>>,
267}
268
269impl<'a> SchemaRegistrationBuilder<'a> {
270    /// Set the schema type
271    pub fn with_type(mut self, schema_type: SchemaType) -> Self {
272        self.schema_type = Some(schema_type);
273        self
274    }
275
276    /// Set the schema data (raw schema content)
277    pub fn with_schema_data(mut self, data: impl Into<Vec<u8>>) -> Self {
278        self.schema_data = Some(data.into());
279        self
280    }
281
282    /// Set an optional description for the schema
283    pub fn with_description(mut self, description: impl Into<String>) -> Self {
284        self.description = Some(description.into());
285        self
286    }
287
288    /// Set who created the schema (defaults to "danube-client")
289    pub fn with_created_by(mut self, created_by: impl Into<String>) -> Self {
290        self.created_by = Some(created_by.into());
291        self
292    }
293
294    /// Set tags for the schema
295    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
296        self.tags = Some(tags);
297        self
298    }
299
300    /// Execute the schema registration
301    pub async fn execute(self) -> Result<u64> {
302        let schema_type = self
303            .schema_type
304            .ok_or_else(|| DanubeError::SchemaError("Schema type is required".into()))?;
305        let schema_data = self
306            .schema_data
307            .ok_or_else(|| DanubeError::SchemaError("Schema data is required".into()))?;
308
309        let response = self
310            .client
311            .register_schema_internal(
312                self.subject,
313                schema_type.as_str().to_string(),
314                schema_data,
315                self.description.unwrap_or_default(),
316                self.created_by.unwrap_or_else(|| "danube-client".into()),
317                self.tags.unwrap_or_default(),
318            )
319            .await?;
320
321        Ok(response.schema_id)
322    }
323}