use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use crate::error::Result;
use crate::types::{EncodeTarget, Schema, SchemaId, SchemaReference, SchemaType, SchemaVersion};
pub trait SchemaRegistryClient: Send + Sync {
fn get_schema_by_id(
&self,
id: SchemaId,
) -> impl Future<Output = Result<Arc<Schema>>> + Send + '_;
fn get_latest_schema<'a>(
&'a self,
subject: &'a str,
) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a;
fn get_schema_by_version<'a>(
&'a self,
subject: &'a str,
version: SchemaVersion,
) -> impl Future<Output = Result<Arc<Schema>>> + Send + 'a;
fn register_schema<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> impl Future<Output = Result<SchemaId>> + Send + 'a;
fn check_compatibility<'a>(
&'a self,
_subject: &'a str,
_schema: &'a str,
_schema_type: SchemaType,
_references: &'a [SchemaReference],
) -> impl Future<Output = Result<bool>> + Send + 'a {
std::future::ready(Err(crate::error::SchemaRegError::not_supported(
"check_compatibility is not supported by this registry",
)))
}
fn delete_subject<'a>(
&'a self,
_subject: &'a str,
_permanent: bool,
) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
std::future::ready(Err(crate::error::SchemaRegError::not_supported(
"delete_subject is not supported by this registry",
)))
}
fn get_subjects(&self) -> impl Future<Output = Result<Vec<String>>> + Send + '_ {
std::future::ready(Err(crate::error::SchemaRegError::not_supported(
"get_subjects is not supported by this registry",
)))
}
fn get_versions<'a>(
&'a self,
_subject: &'a str,
) -> impl Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a {
std::future::ready(Err(crate::error::SchemaRegError::not_supported(
"get_versions is not supported by this registry",
)))
}
}
impl<T: SchemaRegistryClient + ?Sized> SchemaRegistryClient for &T {
fn get_schema_by_id(
&self,
id: crate::types::SchemaId,
) -> impl Future<Output = crate::error::Result<Arc<crate::types::Schema>>> + Send + '_ {
T::get_schema_by_id(self, id)
}
fn get_latest_schema<'a>(
&'a self,
subject: &'a str,
) -> impl Future<Output = crate::error::Result<Arc<crate::types::Schema>>> + Send + 'a {
T::get_latest_schema(self, subject)
}
fn get_schema_by_version<'a>(
&'a self,
subject: &'a str,
version: crate::types::SchemaVersion,
) -> impl Future<Output = crate::error::Result<Arc<crate::types::Schema>>> + Send + 'a {
T::get_schema_by_version(self, subject, version)
}
fn register_schema<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: crate::types::SchemaType,
references: &'a [crate::types::SchemaReference],
) -> impl Future<Output = crate::error::Result<crate::types::SchemaId>> + Send + 'a {
T::register_schema(self, subject, schema, schema_type, references)
}
fn check_compatibility<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: crate::types::SchemaType,
references: &'a [crate::types::SchemaReference],
) -> impl Future<Output = crate::error::Result<bool>> + Send + 'a {
T::check_compatibility(self, subject, schema, schema_type, references)
}
fn delete_subject<'a>(
&'a self,
subject: &'a str,
permanent: bool,
) -> impl Future<Output = crate::error::Result<Vec<crate::types::SchemaVersion>>> + Send + 'a
{
T::delete_subject(self, subject, permanent)
}
fn get_subjects(&self) -> impl Future<Output = crate::error::Result<Vec<String>>> + Send + '_ {
T::get_subjects(self)
}
fn get_versions<'a>(
&'a self,
subject: &'a str,
) -> impl Future<Output = crate::error::Result<Vec<crate::types::SchemaVersion>>> + Send + 'a
{
T::get_versions(self, subject)
}
}
impl<T: SchemaRegistryClient + ?Sized> SchemaRegistryClient for std::sync::Arc<T> {
fn get_schema_by_id(
&self,
id: crate::types::SchemaId,
) -> impl Future<Output = crate::error::Result<Arc<crate::types::Schema>>> + Send + '_ {
T::get_schema_by_id(self, id)
}
fn get_latest_schema<'a>(
&'a self,
subject: &'a str,
) -> impl Future<Output = crate::error::Result<Arc<crate::types::Schema>>> + Send + 'a {
T::get_latest_schema(self, subject)
}
fn get_schema_by_version<'a>(
&'a self,
subject: &'a str,
version: crate::types::SchemaVersion,
) -> impl Future<Output = crate::error::Result<Arc<crate::types::Schema>>> + Send + 'a {
T::get_schema_by_version(self, subject, version)
}
fn register_schema<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: crate::types::SchemaType,
references: &'a [crate::types::SchemaReference],
) -> impl Future<Output = crate::error::Result<crate::types::SchemaId>> + Send + 'a {
T::register_schema(self, subject, schema, schema_type, references)
}
fn check_compatibility<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: crate::types::SchemaType,
references: &'a [crate::types::SchemaReference],
) -> impl Future<Output = crate::error::Result<bool>> + Send + 'a {
T::check_compatibility(self, subject, schema, schema_type, references)
}
fn delete_subject<'a>(
&'a self,
subject: &'a str,
permanent: bool,
) -> impl Future<Output = crate::error::Result<Vec<crate::types::SchemaVersion>>> + Send + 'a
{
T::delete_subject(self, subject, permanent)
}
fn get_subjects(&self) -> impl Future<Output = crate::error::Result<Vec<String>>> + Send + '_ {
T::get_subjects(self)
}
fn get_versions<'a>(
&'a self,
subject: &'a str,
) -> impl Future<Output = crate::error::Result<Vec<crate::types::SchemaVersion>>> + Send + 'a
{
T::get_versions(self, subject)
}
}
pub trait AnySchemaCache: Send + Sync {
type Id: Copy + Send + Sync;
fn cache_len(&self) -> usize;
fn cache_is_empty(&self) -> bool;
fn clear_cache(&self);
fn invalidate(&self, id: Self::Id);
fn invalidate_all(&self);
fn warm_cache<'a>(
&'a self,
ids: &'a [Self::Id],
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}
pub trait SchemaEncoder: Send + Sync {
fn encode(
&self,
payload: Bytes,
topic: &str,
record_name: Option<&str>,
target: EncodeTarget,
) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>>;
}
pub trait SchemaDecoder: Send + Sync {
fn decode(
&self,
payload: Bytes,
topic: &str,
target: EncodeTarget,
) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>>;
}
pub trait DynSchemaRegistryClient: Send + Sync {
fn get_schema_by_id<'a>(
&'a self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>>;
fn get_latest_schema<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>>;
fn get_schema_by_version<'a>(
&'a self,
subject: &'a str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>>;
fn register_schema<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + 'a>>;
fn check_compatibility<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
fn delete_subject<'a>(
&'a self,
subject: &'a str,
permanent: bool,
) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>>;
fn get_subjects<'a>(&'a self)
-> Pin<Box<dyn Future<Output = Result<Vec<String>>> + Send + 'a>>;
fn get_versions<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>>;
}
impl<T: SchemaRegistryClient> DynSchemaRegistryClient for T {
fn get_schema_by_id<'a>(
&'a self,
id: SchemaId,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::get_schema_by_id(self, id))
}
fn get_latest_schema<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::get_latest_schema(self, subject))
}
fn get_schema_by_version<'a>(
&'a self,
subject: &'a str,
version: SchemaVersion,
) -> Pin<Box<dyn Future<Output = Result<Arc<Schema>>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::get_schema_by_version(
self, subject, version,
))
}
fn register_schema<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<SchemaId>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::register_schema(
self,
subject,
schema,
schema_type,
references,
))
}
fn check_compatibility<'a>(
&'a self,
subject: &'a str,
schema: &'a str,
schema_type: SchemaType,
references: &'a [SchemaReference],
) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::check_compatibility(
self,
subject,
schema,
schema_type,
references,
))
}
fn delete_subject<'a>(
&'a self,
subject: &'a str,
permanent: bool,
) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::delete_subject(
self, subject, permanent,
))
}
fn get_subjects<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Result<Vec<String>>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::get_subjects(self))
}
fn get_versions<'a>(
&'a self,
subject: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Vec<SchemaVersion>>> + Send + 'a>> {
Box::pin(SchemaRegistryClient::get_versions(self, subject))
}
}