use crate::{
errors::{DanubeError, Result},
retry_manager::RetryManager,
schema_types::{CompatibilityMode, SchemaInfo, SchemaType},
DanubeClient,
};
use danube_core::proto::danube_schema::{
schema_registry_client::SchemaRegistryClient as GrpcSchemaRegistryClient,
CheckCompatibilityRequest, CheckCompatibilityResponse, GetLatestSchemaRequest,
GetSchemaRequest, ListVersionsRequest, RegisterSchemaRequest,
RegisterSchemaResponse, SetCompatibilityModeRequest, SetCompatibilityModeResponse,
};
#[derive(Debug, Clone)]
pub struct SchemaRegistryClient {
client: DanubeClient,
grpc_client: Option<GrpcSchemaRegistryClient<tonic::transport::Channel>>,
}
impl SchemaRegistryClient {
pub async fn new(client: &DanubeClient) -> Result<Self> {
Ok(Self::new_internal(client.clone()))
}
pub(crate) fn new_internal(client: DanubeClient) -> Self {
SchemaRegistryClient {
client,
grpc_client: None,
}
}
async fn connect(&mut self) -> Result<()> {
if self.grpc_client.is_some() {
return Ok(());
}
let grpc_cnx = self
.client
.cnx_manager
.get_connection(&self.client.uri, &self.client.uri)
.await?;
let client = GrpcSchemaRegistryClient::new(grpc_cnx.grpc_cnx.clone());
self.grpc_client = Some(client);
Ok(())
}
pub fn register_schema(&mut self, subject: impl Into<String>) -> SchemaRegistrationBuilder<'_> {
SchemaRegistrationBuilder {
client: self,
subject: subject.into(),
schema_type: None,
schema_data: None,
}
}
pub async fn get_schema_by_id(&mut self, schema_id: u64) -> Result<SchemaInfo> {
self.connect().await?;
let request = GetSchemaRequest {
schema_id,
version: None,
};
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.get_schema(req)
.await
.map_err(|e| DanubeError::Unrecoverable(format!("Failed to get schema: {}", e)))?
.into_inner();
Ok(SchemaInfo::from(response))
}
pub async fn get_schema_version(
&mut self,
schema_id: u64,
version: Option<u32>,
) -> Result<SchemaInfo> {
self.connect().await?;
let request = GetSchemaRequest { schema_id, version };
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.get_schema(req)
.await
.map_err(|e| DanubeError::Unrecoverable(format!("Failed to get schema: {}", e)))?
.into_inner();
Ok(SchemaInfo::from(response))
}
pub async fn get_latest_schema(
&mut self,
subject: impl Into<String>,
) -> Result<SchemaInfo> {
self.connect().await?;
let request = GetLatestSchemaRequest {
subject: subject.into(),
};
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.get_latest_schema(req)
.await
.map_err(|e| DanubeError::Unrecoverable(format!("Failed to get latest schema: {}", e)))?
.into_inner();
Ok(SchemaInfo::from(response))
}
pub async fn check_compatibility(
&mut self,
subject: impl Into<String>,
schema_data: Vec<u8>,
schema_type: SchemaType,
mode: Option<CompatibilityMode>,
) -> Result<CheckCompatibilityResponse> {
self.connect().await?;
let request = CheckCompatibilityRequest {
subject: subject.into(),
new_schema_definition: schema_data,
schema_type: schema_type.as_str().to_string(),
compatibility_mode: mode.map(|m| m.as_str().to_string()),
};
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.check_compatibility(req)
.await
.map_err(|e| {
DanubeError::Unrecoverable(format!("Failed to check compatibility: {}", e))
})?
.into_inner();
Ok(response)
}
pub async fn set_compatibility_mode(
&mut self,
subject: impl Into<String>,
mode: CompatibilityMode,
) -> Result<SetCompatibilityModeResponse> {
self.connect().await?;
let request = SetCompatibilityModeRequest {
subject: subject.into(),
compatibility_mode: mode.as_str().to_string(),
};
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.set_compatibility_mode(req)
.await
.map_err(|e| {
DanubeError::Unrecoverable(format!("Failed to set compatibility mode: {}", e))
})?
.into_inner();
Ok(response)
}
pub async fn list_versions(&mut self, subject: impl Into<String>) -> Result<Vec<u32>> {
self.connect().await?;
let request = ListVersionsRequest {
subject: subject.into(),
};
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.list_versions(req)
.await
.map_err(|e| DanubeError::Unrecoverable(format!("Failed to list versions: {}", e)))?
.into_inner();
Ok(response.versions.into_iter().map(|v| v.version).collect())
}
async fn register_schema_internal(
&mut self,
subject: String,
schema_type: String,
schema_data: Vec<u8>,
) -> Result<RegisterSchemaResponse> {
self.connect().await?;
let request = RegisterSchemaRequest {
subject,
schema_type,
schema_definition: schema_data,
description: String::new(),
created_by: String::from("danube-client"),
tags: vec![],
};
let mut req = tonic::Request::new(request);
RetryManager::insert_auth_token(&self.client, &mut req, &self.client.uri).await?;
let response = self
.grpc_client
.as_mut()
.ok_or_else(|| {
DanubeError::Unrecoverable("Schema registry client not connected".into())
})?
.register_schema(req)
.await
.map_err(|e| DanubeError::Unrecoverable(format!("Failed to register schema: {}", e)))?
.into_inner();
Ok(response)
}
}
pub struct SchemaRegistrationBuilder<'a> {
client: &'a mut SchemaRegistryClient,
subject: String,
schema_type: Option<SchemaType>,
schema_data: Option<Vec<u8>>,
}
impl<'a> SchemaRegistrationBuilder<'a> {
pub fn with_type(mut self, schema_type: SchemaType) -> Self {
self.schema_type = Some(schema_type);
self
}
pub fn with_schema_data(mut self, data: impl Into<Vec<u8>>) -> Self {
self.schema_data = Some(data.into());
self
}
pub async fn execute(self) -> Result<u64> {
let schema_type = self
.schema_type
.ok_or_else(|| DanubeError::Unrecoverable("Schema type is required".into()))?;
let schema_data = self
.schema_data
.ok_or_else(|| DanubeError::Unrecoverable("Schema data is required".into()))?;
let response = self
.client
.register_schema_internal(self.subject, schema_type.as_str().to_string(), schema_data)
.await?;
Ok(response.schema_id)
}
}