Struct schema_registry_converter::Decoder

pub struct Decoder { /* fields omitted */ }

A decoder used to transform bytes to a Value object

The main purpose of this struct is to cache the schemas. Because they need to be retrieved over HTTP from the schema registry, and the bytes already tell us which schema to use, caching can save a lot of unnecessary calls. Errors are also stored in the cache, because they may not be recoverable. A function is available to remove the errors from the cache. avro_rs is used to get the value.

The schema registry can be used for both the key and the value (payload) of a message, and this struct supports both. Only when using SubjectNameStrategy::TopicNameStrategy does it have to be made explicit whether the struct is actually used for the key or the value.


let _m = mock("GET", "/schemas/ids/1")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

let mut decoder = Decoder::new(SERVER_ADDRESS);
let heartbeat = decoder.decode(Some(&[0,0,0,0,1,6]));

assert_eq!(heartbeat, Ok(Value::Record(vec!(("beat".to_string(), Value::Long(3))))))
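The bytes in the example above can be read by hand, which shows why the decoder needs no extra information to pick a schema. The following is a minimal sketch, not the crate's internal code. It assumes the Confluent wire format (a magic byte 0, a 4-byte big-endian schema id, then the Avro-encoded data), and split_header and zigzag_decode are hypothetical helper names introduced only for illustration.

```rust
/// Splits a message into its schema id and Avro payload.
/// Assumed layout: magic byte 0, 4-byte big-endian schema id, Avro data.
fn split_header(bytes: &[u8]) -> Option<(u32, &[u8])> {
    if bytes.len() < 5 || bytes[0] != 0 {
        return None;
    }
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Some((id, &bytes[5..]))
}

/// Reverses Avro's zigzag mapping for a long that has already been
/// read from its variable-length encoding.
fn zigzag_decode(n: u64) -> i64 {
    ((n >> 1) as i64) ^ -((n & 1) as i64)
}

fn main() {
    // Same bytes as in the example above: schema id 1, then one payload byte.
    let (id, payload) = split_header(&[0, 0, 0, 0, 1, 6]).expect("valid header");
    // A single byte below 128 is a complete varint, so it can be decoded directly.
    let beat = zigzag_decode(u64::from(payload[0]));
    println!("schema id {}, beat {}", id, beat);
}
```

Under these assumptions the header yields schema id 1, matching the mocked /schemas/ids/1 request, and the remaining byte 6 is the zigzag encoding of the long 3.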

Methods

impl Decoder

pub fn new(schema_registry_url: &str) -> Decoder

Creates a new decoder which will use the supplied URL to fetch the schemas. Since the schema needed is encoded in the binary, independent of the SubjectNameStrategy, we don't need any additional data. It's possible for recoverable errors to stay in the cache; when a result comes back as an error, you can use remove_errors_from_cache to clean the cache, keeping the correctly fetched schemas.

pub fn remove_errors_from_cache(&mut self)

Removes all the errors from the cache. You might need or want to run this when a recoverable error is met. Errors are also cached to prevent repeatedly trying to get schemas that either don't exist or can't be parsed.


let mut decoder = Decoder::new(SERVER_ADDRESS);
let bytes = [0,0,0,0,2,6];

let _m = mock("GET", "/schemas/ids/2")
    .with_status(404)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"error_code":40403,"message":"Schema not found"}"#)
    .create();
let heartbeat = decoder.decode(Some(&bytes));
assert_eq!(heartbeat, Err(SRCError::new("Did not get a 200 response code but 404 instead", None, false).into_cache()));
let _m = mock("GET", "/schemas/ids/2")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

let heartbeat = decoder.decode(Some(&bytes));
assert_eq!(heartbeat, Err(SRCError::new("Did not get a 200 response code but 404 instead", None, false).into_cache()));

decoder.remove_errors_from_cache();

let heartbeat = decoder.decode(Some(&bytes));
assert_eq!(heartbeat, Ok(Value::Record(vec!(("beat".to_string(), Value::Long(3))))))
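The behaviour shown above (a cached error keeps being returned until it is explicitly removed) can be illustrated with a small stand-alone cache. This is only a sketch under assumptions: SchemaCache, get_or_fetch and remove_errors are hypothetical names, schemas and errors are plain strings, and the crate's actual internals may look different.

```rust
use std::collections::HashMap;

/// Hypothetical cache keyed by schema id; successes and failures are both stored.
struct SchemaCache {
    entries: HashMap<u32, Result<String, String>>,
}

impl SchemaCache {
    fn new() -> Self {
        SchemaCache { entries: HashMap::new() }
    }

    /// Returns the cached result, calling `fetch` only on a cache miss.
    fn get_or_fetch<F>(&mut self, id: u32, fetch: F) -> &Result<String, String>
    where
        F: FnOnce(u32) -> Result<String, String>,
    {
        self.entries.entry(id).or_insert_with(|| fetch(id))
    }

    /// Drops cached errors while keeping successfully fetched schemas.
    fn remove_errors(&mut self) {
        self.entries.retain(|_, result| result.is_ok());
    }
}

fn main() {
    let mut cache = SchemaCache::new();
    // First lookup fails and the failure is cached.
    let first = cache.get_or_fetch(2, |_| Err("404".to_string())).clone();
    // The registry would now succeed, but the cached error is returned instead.
    let second = cache.get_or_fetch(2, |_| Ok("heartbeat".to_string())).clone();
    // After clearing the errors, the fetch runs again and succeeds.
    cache.remove_errors();
    let third = cache.get_or_fetch(2, |_| Ok("heartbeat".to_string())).clone();
    println!("{:?}, {:?}, {:?}", first, second, third);
}
```

Storing the whole Result means one retain call is enough to drop failures without touching schemas that were fetched correctly.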

pub fn decode(&mut self, bytes: Option<&[u8]>) -> Result<Value, SRCError>

Decodes bytes into a value. Option<&[u8]> was chosen as the type so it plays nicely with the BorrowedMessage struct from rdkafka. For example, if we have m: &'a BorrowedMessage and decoder: &'a mut Decoder, we can use decoder.decode(m.payload()) to decode the payload or decoder.decode(m.key()) to get the decoded key.

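use avro_rs::types::Value;
use rdkafka::message::{BorrowedMessage, Message};
use schema_registry_converter::Decoder;

// Decodes the payload of a Kafka message, panicking if decoding fails.
fn get_value<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a mut Decoder,
) -> Value {
    // `payload()` comes from the `Message` trait and returns Option<&[u8]>.
    match decoder.decode(msg.payload()) {
        Ok(v) => v,
        Err(e) => panic!("Error getting value: {}", e),
    }
}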

Trait Implementations

impl Debug for Decoder

Auto Trait Implementations

impl Send for Decoder

impl Sync for Decoder

Blanket Implementations

impl<T, U> Into<U> for T where
    U: From<T>,

impl<T> From<T> for T

impl<T, U> TryFrom<U> for T where
    U: Into<T>,

type Error = !

🔬 This is a nightly-only experimental API. (try_from)

The type returned in the event of a conversion error.

impl<T> Borrow<T> for T where
    T: ?Sized

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

🔬 This is a nightly-only experimental API. (try_from)

The type returned in the event of a conversion error.

impl<T> Any for T where
    T: 'static + ?Sized

impl<T> BorrowMut<T> for T where
    T: ?Sized

impl<T> Same<T> for T

type Output = T

Should always be Self