use bytes::Bytes;
use crabka_schema_serde::SchemaCache;
use crabka_schema_serde::format::{SchemaDeserializer, SchemaSerializer, SchemaSubject};
use crate::error::StreamsClientError;
use crate::membership::SchemaPrewarm;
use crate::processor::serde::{Serde, SerdeAssociate, SerdeError, SerdeRole};
pub struct SchemaSerde<T, S> {
inner: S,
_marker: std::marker::PhantomData<fn() -> T>,
}
impl<T, S: Clone> Clone for SchemaSerde<T, S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_marker: std::marker::PhantomData,
}
}
}
impl<T, S> SchemaSerde<T, S> {
pub fn new(inner: S) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
}
impl<T, S: Default> Default for SchemaSerde<T, S> {
fn default() -> Self {
Self::new(S::default())
}
}
impl<T: Send + Sync + 'static, S> SerdeAssociate for SchemaSerde<T, S> {
type Target = T;
}
impl<T, S> Serde<T> for SchemaSerde<T, S>
where
T: Send + Sync + 'static,
S: SchemaSerializer<T> + SchemaDeserializer<T> + SchemaSubject,
{
fn serialize(&self, topic: &str, value: &T) -> Bytes {
self.inner
.serialize(topic, value)
.expect("schema serialize failed (did membership prewarm run?)")
}
fn deserialize(&self, topic: &str, bytes: &[u8]) -> Result<T, SerdeError> {
self.inner
.deserialize(topic, bytes)
.map_err(|e| SerdeError(e.to_string()))
}
fn prepare(&self, topic: &str, _role: SerdeRole) {
self.inner.register_subject(topic);
}
}
#[async_trait::async_trait]
impl SchemaPrewarm for SchemaCache {
async fn prewarm(&self) -> Result<(), StreamsClientError> {
SchemaCache::prewarm(self)
.await
.map_err(|e| StreamsClientError::Runtime(e.to_string()))
}
}