use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use crate::error::{Result, SchemaRegError};
use crate::types::{
CompatibilityLevel, EncodeTarget, Schema, SchemaId, SchemaReference, SchemaType, SchemaVersion,
};
/// Asynchronous client interface to a schema registry.
///
/// The first four methods are required. Every other method ships with a
/// default body: the optional operations resolve to an error built with
/// `SchemaRegError::not_supported`, and [`check_compatible`] delegates to
/// [`check_compatibility`] with an empty reference list. An implementor only
/// overrides the operations its backend actually offers.
///
/// Every returned future is `Send` and may borrow from `self` and from the
/// reference arguments that share its lifetime.
///
/// [`check_compatible`]: SchemaRegistryClient::check_compatible
/// [`check_compatibility`]: SchemaRegistryClient::check_compatibility
pub trait SchemaRegistryClient: Send + Sync {
    /// Resolves to the shared schema associated with `id`.
    fn get_schema_by_id(
        &self,
        id: SchemaId,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + '_;

    /// Resolves to the latest schema stored under `subject`.
    fn get_latest_schema<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a;

    /// Resolves to the schema stored under `subject` at the given `version`.
    fn get_schema_by_version<'a>(
        &'a self,
        subject: &'a str,
        version: SchemaVersion,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a;

    /// Registers `schema` (of kind `schema_type`, with the given `references`)
    /// under `subject` and resolves to the resulting [`SchemaId`].
    fn register_schema<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> impl Future<Output = Result<SchemaId>> + Send + 'a;

    /// Checks whether `schema` is compatible with `subject`.
    ///
    /// # Errors
    ///
    /// The default implementation ignores its arguments and always resolves
    /// to a `not_supported` error.
    fn check_compatibility<'a>(
        &'a self,
        _subject: &'a str,
        _schema: &'a str,
        _schema_type: SchemaType,
        _references: &'a [SchemaReference],
    ) -> impl Future<Output = Result<bool>> + Send + 'a {
        async {
            let unsupported = SchemaRegError::not_supported(
                "check_compatibility is not supported by this registry",
            );
            Err(unsupported)
        }
    }

    /// Convenience form of [`check_compatibility`] for schemas without
    /// references: the default forwards with an empty reference slice.
    ///
    /// [`check_compatibility`]: SchemaRegistryClient::check_compatibility
    fn check_compatible<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
    ) -> impl Future<Output = Result<bool>> + Send + 'a {
        let no_references: &'a [SchemaReference] = &[];
        self.check_compatibility(subject, schema, schema_type, no_references)
    }

    /// Deletes `subject`, resolving to a list of schema versions.
    ///
    /// NOTE(review): the exact meaning of `permanent` is defined by the
    /// implementor and is not visible in this file.
    ///
    /// # Errors
    ///
    /// The default implementation always resolves to a `not_supported` error.
    fn delete_subject<'a>(
        &'a self,
        _subject: &'a str,
        _permanent: bool,
    ) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
        async {
            let unsupported =
                SchemaRegError::not_supported("delete_subject is not supported by this registry");
            Err(unsupported)
        }
    }

    /// Lists subject names.
    ///
    /// # Errors
    ///
    /// The default implementation always resolves to a `not_supported` error.
    fn get_subjects(&self) -> impl Future<Output = Result<Vec<String>>> + Send + '_ {
        async {
            let unsupported =
                SchemaRegError::not_supported("get_subjects is not supported by this registry");
            Err(unsupported)
        }
    }

    /// Lists the versions recorded for `subject`.
    ///
    /// # Errors
    ///
    /// The default implementation always resolves to a `not_supported` error.
    fn get_versions<'a>(
        &'a self,
        _subject: &'a str,
    ) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
        async {
            let unsupported =
                SchemaRegError::not_supported("get_versions is not supported by this registry");
            Err(unsupported)
        }
    }

    /// Health probe; resolves to `Ok(())` on success.
    ///
    /// # Errors
    ///
    /// The default implementation always resolves to a `not_supported` error,
    /// so a client that does not override this method never reports healthy.
    fn health_check(&self) -> impl Future<Output = Result<()>> + Send + '_ {
        async {
            let unsupported =
                SchemaRegError::not_supported("health_check is not supported by this registry");
            Err(unsupported)
        }
    }

    /// Sets the compatibility level for `subject`.
    ///
    /// # Errors
    ///
    /// The default implementation always resolves to a `not_supported` error.
    fn set_compatibility<'a>(
        &'a self,
        _subject: &'a str,
        _level: CompatibilityLevel,
    ) -> impl Future<Output = Result<()>> + Send + 'a {
        async {
            let unsupported = SchemaRegError::not_supported(
                "set_compatibility is not supported by this registry",
            );
            Err(unsupported)
        }
    }

    /// Reads the compatibility level for `subject`.
    ///
    /// # Errors
    ///
    /// The default implementation always resolves to a `not_supported` error.
    fn get_compatibility<'a>(
        &'a self,
        _subject: &'a str,
    ) -> impl Future<Output = Result<CompatibilityLevel>> + Send + 'a {
        async {
            let unsupported = SchemaRegError::not_supported(
                "get_compatibility is not supported by this registry",
            );
            Err(unsupported)
        }
    }
}
/// Lets a shared reference to a client be used wherever a client is expected.
///
/// Every trait method, including the provided ones, is forwarded to the
/// referenced `T`. Forwarding the provided methods matters: if one were left
/// to the trait default, an override on `T` would be bypassed whenever the
/// client is used through `&T`.
impl<T: SchemaRegistryClient + ?Sized> SchemaRegistryClient for &T {
    fn get_schema_by_id(
        &self,
        id: SchemaId,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + '_ {
        (**self).get_schema_by_id(id)
    }

    fn get_latest_schema<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a {
        (**self).get_latest_schema(subject)
    }

    fn get_schema_by_version<'a>(
        &'a self,
        subject: &'a str,
        version: SchemaVersion,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a {
        (**self).get_schema_by_version(subject, version)
    }

    fn register_schema<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> impl Future<Output = Result<SchemaId>> + Send + 'a {
        (**self).register_schema(subject, schema, schema_type, references)
    }

    fn check_compatibility<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> impl Future<Output = Result<bool>> + Send + 'a {
        (**self).check_compatibility(subject, schema, schema_type, references)
    }

    // Forwarded explicitly so that an override of `check_compatible` on `T`
    // is honoured; the trait default would route through
    // `check_compatibility` instead.
    fn check_compatible<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
    ) -> impl Future<Output = Result<bool>> + Send + 'a {
        (**self).check_compatible(subject, schema, schema_type)
    }

    fn delete_subject<'a>(
        &'a self,
        subject: &'a str,
        permanent: bool,
    ) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
        (**self).delete_subject(subject, permanent)
    }

    fn get_subjects(&self) -> impl Future<Output = Result<Vec<String>>> + Send + '_ {
        (**self).get_subjects()
    }

    fn get_versions<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
        (**self).get_versions(subject)
    }

    fn health_check(&self) -> impl Future<Output = Result<()>> + Send + '_ {
        (**self).health_check()
    }

    fn set_compatibility<'a>(
        &'a self,
        subject: &'a str,
        level: CompatibilityLevel,
    ) -> impl Future<Output = Result<()>> + Send + 'a {
        (**self).set_compatibility(subject, level)
    }

    fn get_compatibility<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<CompatibilityLevel>> + Send + 'a {
        (**self).get_compatibility(subject)
    }
}
/// Lets an `Arc`-shared client be used wherever a client is expected.
///
/// Every trait method, including the provided ones, is forwarded to the
/// pointee `T`. Forwarding the provided methods matters: if one were left to
/// the trait default, an override on `T` would be bypassed whenever the
/// client is used through `Arc<T>`.
impl<T: SchemaRegistryClient + ?Sized> SchemaRegistryClient for Arc<T> {
    fn get_schema_by_id(
        &self,
        id: SchemaId,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + '_ {
        (**self).get_schema_by_id(id)
    }

    fn get_latest_schema<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a {
        (**self).get_latest_schema(subject)
    }

    fn get_schema_by_version<'a>(
        &'a self,
        subject: &'a str,
        version: SchemaVersion,
    ) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a {
        (**self).get_schema_by_version(subject, version)
    }

    fn register_schema<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> impl Future<Output = Result<SchemaId>> + Send + 'a {
        (**self).register_schema(subject, schema, schema_type, references)
    }

    fn check_compatibility<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> impl Future<Output = Result<bool>> + Send + 'a {
        (**self).check_compatibility(subject, schema, schema_type, references)
    }

    // Forwarded explicitly so that an override of `check_compatible` on `T`
    // is honoured; the trait default would route through
    // `check_compatibility` instead.
    fn check_compatible<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
    ) -> impl Future<Output = Result<bool>> + Send + 'a {
        (**self).check_compatible(subject, schema, schema_type)
    }

    fn delete_subject<'a>(
        &'a self,
        subject: &'a str,
        permanent: bool,
    ) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
        (**self).delete_subject(subject, permanent)
    }

    fn get_subjects(&self) -> impl Future<Output = Result<Vec<String>>> + Send + '_ {
        (**self).get_subjects()
    }

    fn get_versions<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
        (**self).get_versions(subject)
    }

    fn health_check(&self) -> impl Future<Output = Result<()>> + Send + '_ {
        (**self).health_check()
    }

    fn set_compatibility<'a>(
        &'a self,
        subject: &'a str,
        level: CompatibilityLevel,
    ) -> impl Future<Output = Result<()>> + Send + 'a {
        (**self).set_compatibility(subject, level)
    }

    fn get_compatibility<'a>(
        &'a self,
        subject: &'a str,
    ) -> impl Future<Output = Result<CompatibilityLevel>> + Send + 'a {
        (**self).get_compatibility(subject)
    }
}
/// Boxed-future counterpart of `SchemaRegistryClient`.
///
/// The method set mirrors `SchemaRegistryClient` one-to-one, but each method
/// returns `Pin<Box<dyn Future<..> + Send + 'a>>` instead of an opaque
/// `impl Future`, which is what allows this trait to be used as
/// `dyn DynSchemaRegistryClient`.
///
/// Each returned future may borrow from `self` and from the reference
/// arguments for the lifetime `'a`.
pub trait DynSchemaRegistryClient: Send + Sync {
/// Boxed form of `SchemaRegistryClient::get_schema_by_id`.
fn get_schema_by_id<'a>(
&'a self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::get_latest_schema`.
fn get_latest_schema<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::get_schema_by_version`.
fn get_schema_by_version<'a>(
&'a self,
subject: &'a str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::register_schema`.
fn register_schema<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::check_compatibility`.
fn check_compatibility<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::check_compatible` (no references argument).
fn check_compatible<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::delete_subject`.
fn delete_subject<'a>(
&'a self,
subject: &'a str,
permanent: bool,
) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::get_subjects`.
fn get_subjects<'a>(&'a self)
-> Pin<Box<dyn Future<Output = Result<Vec<String>>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::get_versions`.
fn get_versions<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::health_check`.
fn health_check<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::set_compatibility`.
fn set_compatibility<'a>(
&'a self,
subject: &'a str,
level: CompatibilityLevel,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
/// Boxed form of `SchemaRegistryClient::get_compatibility`.
fn get_compatibility<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<CompatibilityLevel>> + Send + 'a>>;
}
// Compile-time guard: the build fails if `dyn DynSchemaRegistryClient` ever
// stops being a valid trait-object type or loses `Send`/`Sync`.
// Nothing in this block runs at runtime.
const _: () = {
    /// Accepts only (possibly unsized) types that are both `Send` and `Sync`.
    fn _require_send_sync<T: ?Sized + Send + Sync>() {}

    // Naming the instantiation is enough to make the compiler prove the
    // bounds; the function is never called.
    let _ = _require_send_sync::<dyn DynSchemaRegistryClient>;
};
/// Blanket adapter: every sized `SchemaRegistryClient` is also a
/// `DynSchemaRegistryClient`.
///
/// Each method obtains the future from the corresponding
/// `SchemaRegistryClient` method and pins it on the heap. Both traits expose
/// identically named methods, so the calls are written in fully qualified
/// form to make the target explicit.
impl<T: SchemaRegistryClient> DynSchemaRegistryClient for T {
    fn get_schema_by_id<'a>(
        &'a self,
        id: SchemaId,
    ) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::get_schema_by_id(self, id);
        Box::pin(fut)
    }

    fn get_latest_schema<'a>(
        &'a self,
        subject: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::get_latest_schema(self, subject);
        Box::pin(fut)
    }

    fn get_schema_by_version<'a>(
        &'a self,
        subject: &'a str,
        version: SchemaVersion,
    ) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::get_schema_by_version(self, subject, version);
        Box::pin(fut)
    }

    fn register_schema<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::register_schema(
            self,
            subject,
            schema,
            schema_type,
            references,
        );
        Box::pin(fut)
    }

    fn check_compatibility<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
        references: &'a [SchemaReference],
    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::check_compatibility(
            self,
            subject,
            schema,
            schema_type,
            references,
        );
        Box::pin(fut)
    }

    fn check_compatible<'a>(
        &'a self,
        subject: &'a str,
        schema: &'a str,
        schema_type: SchemaType,
    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::check_compatible(self, subject, schema, schema_type);
        Box::pin(fut)
    }

    fn delete_subject<'a>(
        &'a self,
        subject: &'a str,
        permanent: bool,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::delete_subject(self, subject, permanent);
        Box::pin(fut)
    }

    fn get_subjects<'a>(
        &'a self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<String>>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::get_subjects(self);
        Box::pin(fut)
    }

    fn get_versions<'a>(
        &'a self,
        subject: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::get_versions(self, subject);
        Box::pin(fut)
    }

    fn health_check<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::health_check(self);
        Box::pin(fut)
    }

    fn set_compatibility<'a>(
        &'a self,
        subject: &'a str,
        level: CompatibilityLevel,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::set_compatibility(self, subject, level);
        Box::pin(fut)
    }

    fn get_compatibility<'a>(
        &'a self,
        subject: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<CompatibilityLevel>> + Send + 'a>> {
        let fut = <T as SchemaRegistryClient>::get_compatibility(self, subject);
        Box::pin(fut)
    }
}
/// Management interface for a schema cache.
///
/// Every method takes `&self`, so an implementor that mutates its contents
/// through this interface needs interior mutability.
pub trait AnySchemaCache: Send + Sync {
/// Identifier type accepted by `invalidate` and `warm_cache`.
type Id: Copy + Send + Sync;
/// Reports a size for the cache as a `usize`.
/// NOTE(review): presumably the number of cached entries — confirm with implementors.
fn cache_len(&self) -> usize;
/// Reports whether the cache is empty.
/// NOTE(review): presumably equivalent to `cache_len() == 0`; there is no
/// default body enforcing that — confirm with implementors.
fn cache_is_empty(&self) -> bool;
/// Clears the cache.
/// NOTE(review): how this differs from `invalidate_all` is not visible here.
fn clear_cache(&self);
/// Invalidates whatever is cached for `id`.
fn invalidate(&self, id: Self::Id);
/// Invalidates everything in the cache.
fn invalidate_all(&self);
/// Asynchronously warms the cache for the given `ids`.
///
/// The returned boxed future is `Send` and may borrow from `self` and from
/// `ids` for the lifetime `'a`. It resolves to `Ok(())` or an error.
/// NOTE(review): whether a failure on one id aborts the rest is up to the
/// implementor.
fn warm_cache<'a>(
&'a self,
ids: &'a [Self::Id],
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
/// Asynchronous payload encoder.
pub trait SchemaEncoder: Send + Sync {
/// Encodes `payload` and resolves to the encoded bytes.
///
/// `topic`, the optional `record_name` and `target` are inputs to the
/// encoding.
/// NOTE(review): presumably they select the subject/schema to encode
/// against — confirm with implementors.
///
/// The `'_` in the return type is the elided lifetime of `&self`, so the
/// returned future may borrow from `self` only. An implementor that needs
/// `topic` or `record_name` inside the future must copy them into owned
/// values before building it.
fn encode(
&self,
payload: Bytes,
topic: &str,
record_name: Option<&str>,
target: EncodeTarget,
) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>>;
}
/// Asynchronous payload decoder.
pub trait SchemaDecoder: Send + Sync {
/// Decodes `payload` and resolves to the decoded bytes.
///
/// `topic` and `target` are inputs to the decoding.
/// NOTE(review): `target` reuses the `EncodeTarget` type; its meaning on the
/// decode side is not visible here — confirm with implementors.
///
/// The `'_` in the return type is the elided lifetime of `&self`, so the
/// returned future may borrow from `self` only. An implementor that needs
/// `topic` inside the future must copy it into an owned value before
/// building it.
fn decode(
&self,
payload: Bytes,
topic: &str,
target: EncodeTarget,
) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>>;
}