use std::sync::Arc;
use crate::{
auth_service::AuthService,
connection_manager::ConnectionManager,
errors::{DanubeError, Result},
schema_types::{CompatibilityMode, SchemaInfo, SchemaType},
};
use danube_core::proto::danube_schema::{
schema_registry_client::SchemaRegistryClient as GrpcSchemaRegistryClient,
CheckCompatibilityRequest, CheckCompatibilityResponse, GetLatestSchemaRequest,
GetSchemaRequest, ListVersionsRequest, RegisterSchemaRequest, RegisterSchemaResponse,
SetCompatibilityModeRequest, SetCompatibilityModeResponse,
};
use tonic::transport::Uri;
/// Client for the Danube schema registry gRPC service.
///
/// For every call the client obtains a connection for `uri` from the
/// connection manager and lets the auth service attach a token to the
/// outgoing request when one is needed.
#[derive(Debug, Clone)]
pub struct SchemaRegistryClient {
/// Shared connection manager used to obtain the gRPC connection; its
/// connection options also supply the optional API key.
cnx_manager: Arc<ConnectionManager>,
/// Inserts an auth token into outgoing requests when needed.
auth_service: AuthService,
/// Address handed to the connection manager and the auth service on every request.
uri: Uri,
}
impl SchemaRegistryClient {
    /// Creates a schema registry client that sends its requests to `uri`.
    pub(crate) fn new(
        cnx_manager: Arc<ConnectionManager>,
        auth_service: AuthService,
        uri: Uri,
    ) -> Self {
        SchemaRegistryClient {
            cnx_manager,
            auth_service,
            uri,
        }
    }

    /// Wraps `request` in a `tonic::Request` and pairs it with a gRPC client
    /// bound to a connection for `self.uri`.
    ///
    /// The auth service gets a chance to insert a token into the request,
    /// using the API key from the connection options when one is configured.
    ///
    /// # Errors
    ///
    /// Propagates any error from obtaining the connection or from inserting
    /// the token.
    async fn prepare_request<T>(
        &self,
        request: T,
    ) -> Result<(
        tonic::Request<T>,
        GrpcSchemaRegistryClient<tonic::transport::Channel>,
    )> {
        let grpc_cnx = self
            .cnx_manager
            .get_connection(&self.uri, &self.uri)
            .await?;
        let client = GrpcSchemaRegistryClient::new(grpc_cnx.grpc_cnx.clone());

        let mut req = tonic::Request::new(request);
        self.auth_service
            .insert_token_if_needed(
                self.cnx_manager.connection_options.api_key.as_deref(),
                &mut req,
                &self.uri,
            )
            .await?;

        Ok((req, client))
    }

    /// Starts building a schema registration for `subject`.
    ///
    /// Nothing is sent until [`SchemaRegistrationBuilder::execute`] is awaited.
    pub fn register_schema(&self, subject: impl Into<String>) -> SchemaRegistrationBuilder<'_> {
        SchemaRegistrationBuilder {
            client: self,
            subject: subject.into(),
            schema_type: None,
            schema_data: None,
            description: None,
            created_by: None,
            tags: None,
        }
    }

    /// Fetches the schema identified by `schema_id` without requesting a
    /// specific version.
    ///
    /// # Errors
    ///
    /// Same as [`Self::get_schema_version`].
    pub async fn get_schema_by_id(&self, schema_id: u64) -> Result<SchemaInfo> {
        // Identical request to `get_schema_version` with the version left unset.
        self.get_schema_version(schema_id, None).await
    }

    /// Fetches the schema identified by `schema_id`, optionally at `version`.
    ///
    /// `None` leaves the version field of the request unset.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be prepared, or
    /// `DanubeError::FromStatus` if the gRPC call fails.
    pub async fn get_schema_version(
        &self,
        schema_id: u64,
        version: Option<u32>,
    ) -> Result<SchemaInfo> {
        let request = GetSchemaRequest { schema_id, version };
        let (req, mut client) = self.prepare_request(request).await?;
        let response = client
            .get_schema(req)
            .await
            .map_err(DanubeError::FromStatus)?
            .into_inner();
        Ok(SchemaInfo::from(response))
    }

    /// Fetches the latest schema registered under `subject`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be prepared, or
    /// `DanubeError::FromStatus` if the gRPC call fails.
    pub async fn get_latest_schema(&self, subject: impl Into<String>) -> Result<SchemaInfo> {
        let request = GetLatestSchemaRequest {
            subject: subject.into(),
        };
        let (req, mut client) = self.prepare_request(request).await?;
        let response = client
            .get_latest_schema(req)
            .await
            .map_err(DanubeError::FromStatus)?
            .into_inner();
        Ok(SchemaInfo::from(response))
    }

    /// Asks the registry whether `schema_data` is compatible with `subject`.
    ///
    /// `mode`, when given, is sent as the compatibility mode of the request;
    /// `None` leaves that field unset.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be prepared, or
    /// `DanubeError::FromStatus` if the gRPC call fails.
    pub async fn check_compatibility(
        &self,
        subject: impl Into<String>,
        schema_data: Vec<u8>,
        schema_type: SchemaType,
        mode: Option<CompatibilityMode>,
    ) -> Result<CheckCompatibilityResponse> {
        let request = CheckCompatibilityRequest {
            subject: subject.into(),
            new_schema_definition: schema_data,
            schema_type: schema_type.as_str().to_string(),
            compatibility_mode: mode.map(|m| m.as_str().to_string()),
        };
        let (req, mut client) = self.prepare_request(request).await?;
        Ok(client
            .check_compatibility(req)
            .await
            .map_err(DanubeError::FromStatus)?
            .into_inner())
    }

    /// Sets the compatibility mode of `subject` to `mode`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be prepared, or
    /// `DanubeError::FromStatus` if the gRPC call fails.
    pub async fn set_compatibility_mode(
        &self,
        subject: impl Into<String>,
        mode: CompatibilityMode,
    ) -> Result<SetCompatibilityModeResponse> {
        let request = SetCompatibilityModeRequest {
            subject: subject.into(),
            compatibility_mode: mode.as_str().to_string(),
        };
        let (req, mut client) = self.prepare_request(request).await?;
        Ok(client
            .set_compatibility_mode(req)
            .await
            .map_err(DanubeError::FromStatus)?
            .into_inner())
    }

    /// Lists the version numbers the registry reports for `subject`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be prepared, or
    /// `DanubeError::FromStatus` if the gRPC call fails.
    pub async fn list_versions(&self, subject: impl Into<String>) -> Result<Vec<u32>> {
        let request = ListVersionsRequest {
            subject: subject.into(),
        };
        let (req, mut client) = self.prepare_request(request).await?;
        let response = client
            .list_versions(req)
            .await
            .map_err(DanubeError::FromStatus)?
            .into_inner();
        // Only the version number of each returned entry is exposed to callers.
        Ok(response.versions.into_iter().map(|v| v.version).collect())
    }

    /// Sends a `RegisterSchemaRequest` built from the given, already
    /// validated, parameters and returns the raw response.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be prepared, or
    /// `DanubeError::FromStatus` if the gRPC call fails.
    async fn register_schema_internal(
        &self,
        subject: String,
        schema_type: String,
        schema_data: Vec<u8>,
        description: String,
        created_by: String,
        tags: Vec<String>,
    ) -> Result<RegisterSchemaResponse> {
        let request = RegisterSchemaRequest {
            subject,
            schema_type,
            schema_definition: schema_data,
            description,
            created_by,
            tags,
        };
        let (req, mut client) = self.prepare_request(request).await?;
        Ok(client
            .register_schema(req)
            .await
            .map_err(DanubeError::FromStatus)?
            .into_inner())
    }
}
/// Builder that collects the parameters of a schema registration for one subject.
///
/// Created by `SchemaRegistryClient::register_schema`; the request is only
/// sent when `execute` is awaited.
pub struct SchemaRegistrationBuilder<'a> {
/// Client used to send the registration request.
client: &'a SchemaRegistryClient,
/// Subject the schema is registered under.
subject: String,
/// Schema type; `execute` returns an error if it was never set.
schema_type: Option<SchemaType>,
/// Raw schema definition bytes; `execute` returns an error if never set.
schema_data: Option<Vec<u8>>,
/// Optional description; an empty string is sent when unset.
description: Option<String>,
/// Optional creator name; `"danube-client"` is sent when unset.
created_by: Option<String>,
/// Optional tags; an empty list is sent when unset.
tags: Option<Vec<String>>,
}
impl<'a> SchemaRegistrationBuilder<'a> {
    /// Sets the schema type. Required before calling [`Self::execute`].
    pub fn with_type(self, schema_type: SchemaType) -> Self {
        Self {
            schema_type: Some(schema_type),
            ..self
        }
    }

    /// Sets the raw schema definition. Required before calling [`Self::execute`].
    pub fn with_schema_data(self, data: impl Into<Vec<u8>>) -> Self {
        Self {
            schema_data: Some(data.into()),
            ..self
        }
    }

    /// Sets the description sent with the registration.
    pub fn with_description(self, description: impl Into<String>) -> Self {
        Self {
            description: Some(description.into()),
            ..self
        }
    }

    /// Sets the creator name sent with the registration.
    pub fn with_created_by(self, created_by: impl Into<String>) -> Self {
        Self {
            created_by: Some(created_by.into()),
            ..self
        }
    }

    /// Sets the tags sent with the registration.
    pub fn with_tags(self, tags: Vec<String>) -> Self {
        Self {
            tags: Some(tags),
            ..self
        }
    }

    /// Validates the collected parameters, registers the schema and returns
    /// the `schema_id` from the registry's response.
    ///
    /// Unset optional fields fall back to an empty description, the creator
    /// `"danube-client"` and an empty tag list.
    ///
    /// # Errors
    ///
    /// Returns `DanubeError::SchemaError` when the schema type or the schema
    /// data was never set (the type is checked first); otherwise propagates
    /// any error from sending the registration request.
    pub async fn execute(self) -> Result<u64> {
        // Take the builder apart once so every field is an independent local.
        let Self {
            client,
            subject,
            schema_type,
            schema_data,
            description,
            created_by,
            tags,
        } = self;

        let kind =
            schema_type.ok_or_else(|| DanubeError::SchemaError("Schema type is required".into()))?;
        let definition =
            schema_data.ok_or_else(|| DanubeError::SchemaError("Schema data is required".into()))?;

        let registered = client
            .register_schema_internal(
                subject,
                kind.as_str().to_string(),
                definition,
                description.unwrap_or_default(),
                created_by.unwrap_or_else(|| "danube-client".into()),
                tags.unwrap_or_default(),
            )
            .await?;

        Ok(registered.schema_id)
    }
}