Struct schema_registry_converter::blocking::avro::AvroDecoder

pub struct AvroDecoder { /* fields omitted */ }

A decoder used to transform bytes to a Value object

The main purpose of this struct is to cache schemas. Because they need to be retrieved over HTTP from the schema registry, and the bytes already indicate which schema to use, caching can save a lot of unnecessary calls. Errors are also stored in the cache, because they may not be recoverable. A function is available to remove the errors from the cache. avro_rs is used to obtain the value.

The schema registry can be used for both the key and the payload/value, and this struct supports both. Only when using SubjectNameStrategy::TopicNameStrategy does it have to be made explicit whether the data is actually used as key or value.
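To illustrate why the key/value distinction matters for the topic name strategy: in Confluent's TopicNameStrategy the subject is the topic name with a `-key` or `-value` suffix. The following is a minimal sketch of that naming rule only; `topic_name_subject` is a hypothetical helper, not a function of this crate.

```rust
/// Hypothetical helper: builds the subject name used by the topic name
/// strategy, where the suffix depends on whether the data is a key or a value.
fn topic_name_subject(topic: &str, is_key: bool) -> String {
    let suffix = if is_key { "key" } else { "value" };
    format!("{}-{}", topic, suffix)
}
```

With this rule, the key and the value of the same topic resolve to two different subjects, which is why the strategy has to be told which of the two it is dealing with.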

use mockito::{mock, server_address};
use avro_rs::types::Value;
use schema_registry_converter::blocking::schema_registry::SrSettings;
use schema_registry_converter::blocking::avro::AvroDecoder;

let _m = mock("GET", "/schemas/ids/1?deleted=true")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(&[0,0,0,0,1,6])).unwrap().value;

assert_eq!(heartbeat, Value::Record(vec![("beat".to_string(), Value::Long(3))]))
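
The bytes `[0,0,0,0,1,6]` in the example above follow the Confluent wire format: a magic byte `0`, a four-byte big-endian schema id, and then the Avro-encoded payload. The sketch below shows how those bytes can be taken apart by hand. It is not the crate's implementation; `split_wire_format` and `read_avro_long` are hypothetical helpers that only cover this simple case of a single `long` field.

```rust
/// Hypothetical helper: splits a framed message into (schema id, Avro payload).
/// Returns None if the slice is too short or the magic byte is not 0.
fn split_wire_format(bytes: &[u8]) -> Option<(u32, &[u8])> {
    if bytes.len() < 5 || bytes[0] != 0 {
        return None;
    }
    // The schema id is stored as a big-endian u32 right after the magic byte.
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Some((id, &bytes[5..]))
}

/// Hypothetical helper: reads one Avro `long` (a zigzag-encoded
/// variable-length integer) from the start of `payload`.
fn read_avro_long(payload: &[u8]) -> Option<i64> {
    let mut raw: u64 = 0;
    // A 64-bit value needs at most 10 groups of 7 bits.
    for (i, byte) in payload.iter().enumerate().take(10) {
        raw |= u64::from(byte & 0x7f) << (7 * i);
        if (byte & 0x80) == 0 {
            // Undo zigzag: even raw values are non-negative, odd are negative.
            return Some(((raw >> 1) as i64) ^ -((raw & 1) as i64));
        }
    }
    None
}
```

Applied to the example, the header yields schema id 1 and the remaining byte `6` decodes to the long `3`, which matches the `beat` field in the assertion.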

Implementations

impl AvroDecoder

pub fn new(sr_settings: SrSettings) -> AvroDecoder

Creates a new decoder which uses the supplied settings to fetch schemas. Because the schema needed is encoded in the binary data, no additional information is required, independent of the SubjectNameStrategy. Recoverable errors can remain in the cache; when a result comes back as an error, you can use remove_errors_from_cache to clean the cache while keeping the correctly fetched schemas.

pub fn remove_errors_from_cache(&mut self)

Removes all errors from the cache. You may need or want to run this when a recoverable error is encountered. Errors are also cached to prevent repeated attempts to fetch schemas that either don't exist or can't be parsed.

use mockito::{mock, server_address};
use avro_rs::types::Value;
use schema_registry_converter::blocking::avro::AvroDecoder;
use schema_registry_converter::blocking::schema_registry::SrSettings;
use schema_registry_converter::error::SRCError;

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let bytes = [0,0,0,0,2,6];

let _m = mock("GET", "/schemas/ids/2?deleted=true")
    .with_status(404)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"error_code":40403,"message":"Schema not found"}"#)
    .create();

let heartbeat = decoder.decode(Some(&bytes));

assert_eq!(heartbeat, Err(SRCError::new("Could not get raw schema from response", None, false).into_cache()));

let _m = mock("GET", "/schemas/ids/2?deleted=true")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

let heartbeat = decoder.decode(Some(&bytes));
assert_eq!(heartbeat, Err(SRCError::new("Could not get raw schema from response", None, false).into_cache()));

decoder.remove_errors_from_cache();

let heartbeat = decoder.decode(Some(&bytes)).unwrap().value;
assert_eq!(heartbeat, Value::Record(vec![("beat".to_string(), Value::Long(3))]))
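
The behaviour shown above (a cached error keeps being returned until it is removed) can be sketched with a plain `HashMap` that stores the outcome of each fetch. This is an illustration under assumptions, not the crate's internals: `Schema`, `FetchError`, and `SchemaCache` are hypothetical stand-ins.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed schema.
#[derive(Debug, Clone, PartialEq)]
struct Schema(String);

/// Hypothetical stand-in for a fetch or parse error.
#[derive(Debug, Clone, PartialEq)]
struct FetchError(String);

/// Caches the outcome of fetching a schema by id, including failures.
struct SchemaCache {
    entries: HashMap<u32, Result<Schema, FetchError>>,
}

impl SchemaCache {
    fn new() -> Self {
        SchemaCache { entries: HashMap::new() }
    }

    /// Returns the cached outcome for `id`; `fetch` is only called on a miss,
    /// so a cached error is returned without contacting the registry again.
    fn get_or_fetch<F>(&mut self, id: u32, fetch: F) -> Result<Schema, FetchError>
    where
        F: FnOnce(u32) -> Result<Schema, FetchError>,
    {
        self.entries.entry(id).or_insert_with(|| fetch(id)).clone()
    }

    /// Drops every cached error and keeps the successfully fetched schemas.
    fn remove_errors(&mut self) {
        self.entries.retain(|_, outcome| outcome.is_ok());
    }
}
```

In this sketch, a second lookup of a failed id returns the stored error even if the registry would now succeed; only after `remove_errors` is the fetch attempted again.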

pub fn decode(&mut self, bytes: Option<&[u8]>) -> Result<DecodeResult, SRCError>

Decodes bytes into a value. The parameter type Option<&[u8]> was chosen so that it plays nicely with the BorrowedMessage struct from rdkafka. For example, given m: &'a BorrowedMessage and decoder: &'a mut AvroDecoder, we can use decoder.decode(m.payload()) to decode the payload or decoder.decode(m.key()) to get the decoded key.

use rdkafka::message::{Message, BorrowedMessage};
use avro_rs::types::Value;
use schema_registry_converter::blocking::avro::AvroDecoder;
fn get_value<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a mut AvroDecoder,
) -> Value {
    match decoder.decode(msg.payload()) {
        Ok(r) => r.value,
        Err(e) => panic!("Error getting value: {}", e),
    }
}

Trait Implementations

impl Debug for AvroDecoder

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized

impl<T> Borrow<T> for T where
    T: ?Sized

impl<T> BorrowMut<T> for T where
    T: ?Sized

impl<T> From<T> for T

impl<T> GetTypeId for T where
    T: Any

impl<T> Instrument for T

impl<T> Instrument for T

impl<T, U> Into<U> for T where
    U: From<T>,

impl<T> Same<T> for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T where
    U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<V, T> VZip<V> for T where
    V: MultiLane<T>,