Struct schema_registry_converter::Encoder

pub struct Encoder { /* fields omitted */ }

An encoder used to transform a Value object to bytes.

The main purpose of this struct is to cache the schemas. Because they need to be retrieved over HTTP from the schema registry, and the subject name strategy already tells us which schema we should use, caching can save a lot of unnecessary calls. Errors are also stored in the cache, because they may not be recoverable. A function is available to remove the errors from the cache. avro_rs is used for the Avro encoding itself.
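The caching behaviour described above can be pictured with a small sketch. It is not the crate's implementation: `SchemaCache`, `get_or_fetch`, `remove_errors` and the plain `String` schema and error types are hypothetical stand-ins, chosen only to show how successes and errors can share one map and how the errors can be dropped again.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a schema cache keyed by subject name.
/// Both successful lookups and failures are stored.
struct SchemaCache {
    entries: HashMap<String, Result<String, String>>,
}

impl SchemaCache {
    fn new() -> Self {
        SchemaCache { entries: HashMap::new() }
    }

    /// Returns the cached result for `subject`, calling `fetch` only on a miss.
    /// `fetch` stands in for the HTTP call to the schema registry.
    fn get_or_fetch<F>(&mut self, subject: &str, fetch: F) -> Result<String, String>
    where
        F: FnOnce(&str) -> Result<String, String>,
    {
        self.entries
            .entry(subject.to_owned())
            .or_insert_with(|| fetch(subject))
            .clone()
    }

    /// Drops every cached error so the next lookup tries the registry again.
    fn remove_errors(&mut self) {
        self.entries.retain(|_, result| result.is_ok());
    }
}
```

With a cache like this, a failed lookup keeps returning the same error without a second call until `remove_errors` is invoked, which mirrors what remove_errors_from_cache is documented to do.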

The schema registry can be used for both the key and the value (payload) of a message, and this struct supports both. Only when using SubjectNameStrategy::TopicNameStrategy does it have to be made explicit whether the schema is used for the key or the value.
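Judging by the subjects requested in the examples on this page, each strategy maps to a subject name roughly as sketched below. The `Strategy` enum and `subject_name` function are defined here purely for illustration; they mirror, but are not, the crate's own `SubjectNameStrategy`.

```rust
/// Local mirror of the three strategies, defined only for illustration;
/// it is not the crate's own `SubjectNameStrategy` type.
enum Strategy<'a> {
    RecordName(&'a str),
    TopicName(&'a str, bool),
    TopicRecordName(&'a str, &'a str),
}

/// Builds the subject that would be looked up in the schema registry.
fn subject_name(strategy: &Strategy) -> String {
    match strategy {
        // The fully qualified record name is used as-is.
        Strategy::RecordName(record) => record.to_string(),
        // Only here does the key/value distinction change the subject.
        Strategy::TopicName(topic, is_key) => {
            let suffix = if *is_key { "key" } else { "value" };
            format!("{}-{}", topic, suffix)
        }
        // Topic and record name are joined with a dash.
        Strategy::TopicRecordName(topic, record) => format!("{}-{}", topic, record),
    }
}
```

Under these assumptions, `Strategy::TopicName("heartbeat", true)` yields `heartbeat-key`, matching the `/subjects/heartbeat-key/versions/latest` path used in the example.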


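// Imports assumed for this example; the rendered docs do not show them.
use avro_rs::types::Value;
use mockito::{mock, SERVER_ADDRESS};
use schema_registry_converter::schema_registry::SubjectNameStrategy;
use schema_registry_converter::Encoder;

let _m = mock("GET", "/subjects/heartbeat-value/versions/latest")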
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"subject":"heartbeat-value","version":1,"id":3,"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

let _m = mock("GET", "/subjects/heartbeat-key/versions/latest")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"subject":"heartbeat-value","version":1,"id":4,"schema":"{\"type\":\"record\",\"name\":\"Name\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"avro.java.string\":\"String\"}]}"}"#)
    .create();

let mut encoder = Encoder::new(SERVER_ADDRESS);

let key_strategy = SubjectNameStrategy::TopicNameStrategy("heartbeat", true);
let bytes = encoder.encode(vec!(("name", Value::String("Some name".to_owned()))), &key_strategy);

assert_eq!(bytes, Ok(vec!(0, 0, 0, 0, 4, 18, 83, 111, 109, 101, 32, 110, 97, 109, 101)));

let value_strategy = SubjectNameStrategy::TopicNameStrategy("heartbeat", false);
let bytes = encoder.encode(vec!(("beat", Value::Long(3))), &value_strategy);

assert_eq!(bytes, Ok(vec!(0,0,0,0,3,6)))
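The expected bytes in the example follow the Confluent wire format: a zero magic byte, the schema id as four big-endian bytes, then the Avro binary encoding of the record. The sketch below rebuilds both asserted byte vectors by hand for single-field records. The helper names `zigzag_varint`, `avro_string` and `frame` are made up for this illustration and are not part of the crate.

```rust
/// Avro encodes a `long` as a zig-zag value written as a variable-length integer.
fn zigzag_varint(n: i64) -> Vec<u8> {
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    let mut out = Vec::new();
    while z >= 0x80 {
        // Low seven bits, with the continuation bit set.
        out.push(((z & 0x7f) as u8) | 0x80);
        z >>= 7;
    }
    out.push(z as u8);
    out
}

/// Avro encodes a string as its byte length (a `long`) followed by the UTF-8 bytes.
fn avro_string(s: &str) -> Vec<u8> {
    let mut out = zigzag_varint(s.len() as i64);
    out.extend_from_slice(s.as_bytes());
    out
}

/// Prefixes an Avro payload with the magic byte and the big-endian schema id.
fn frame(schema_id: u32, avro_payload: &[u8]) -> Vec<u8> {
    let mut out = vec![0u8];
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(avro_payload);
    out
}
```

For instance, `frame(3, &zigzag_varint(3))` gives the six bytes of the value assertion, and `frame(4, &avro_string("Some name"))` gives the bytes of the key assertion.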

Methods

impl Encoder

pub fn new(schema_registry_url: &str) -> Encoder

Creates a new encoder which will use the supplied URL to fetch the schemas. The schemas need to be retrieved together with their id, in order for a consumer to decode the bytes. For the encoding, several subject name strategies are available in the Java client; all three of them are supported. The schemas do have to be present in the schema registry already. This is unlike the Java client, with which it's possible to update/upload the schema when it's not present yet. While this may be added to this library, it's also not hard to do separately. New schemas can be registered by doing a POST to /subjects/{subject}/versions.
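As a rough sketch of registering a schema separately: the Schema Registry REST API accepts a JSON object whose single `schema` field holds the Avro schema as an escaped string, the same shape as the `schema` field in the mocked responses on this page. The helpers below only build the path and the body; sending the request with the content type `application/vnd.schemaregistry.v1+json` is left to whichever HTTP client is in use. `registration_path` and `registration_body` are hypothetical names, not functions of this crate.

```rust
/// Path of the registration endpoint for a subject.
fn registration_path(subject: &str) -> String {
    format!("/subjects/{}/versions", subject)
}

/// Wraps an Avro schema (itself JSON) in the request body `{"schema":"..."}`.
/// Only backslashes and double quotes are escaped, which assumes the schema
/// text contains no control characters such as raw newlines.
fn registration_body(schema_json: &str) -> String {
    let escaped = schema_json.replace('\\', "\\\\").replace('"', "\\\"");
    format!("{{\"schema\":\"{}\"}}", escaped)
}
```

A real program would more likely build the body with a JSON library, which handles every escaping case; the manual version is used here only to keep the sketch free of dependencies.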

pub fn remove_errors_from_cache(&mut self)

Removes all the errors from the cache. You might need or want to run this when a recoverable error is encountered. Errors are cached to prevent repeatedly trying to get schemas that either don't exist or can't be parsed.


let mut encoder = Encoder::new(SERVER_ADDRESS);
let strategy = SubjectNameStrategy::RecordNameStrategy("nl.openweb.data.Heartbeat");

let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest")
    .with_status(404)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"error_code":40403,"message":"Schema not found"}"#)
    .create();

// The registry answers 404, so encoding fails and the error is stored in the cache.
let bytes = encoder.encode(vec!(("beat", Value::Long(3))), &strategy);
assert_eq!(bytes, Err(SRCError::new("Did not get a 200 response code but 404 instead", None, false).into_cache()));

let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"subject":"heartbeat-value","version":1,"id":4,"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

// The schema is available now, but the cached error is still returned.
let bytes = encoder.encode(vec!(("beat", Value::Long(3))), &strategy);
assert_eq!(bytes, Err(SRCError::new("Did not get a 200 response code but 404 instead", None, false).into_cache()));

// After clearing the cached errors, the next call fetches the schema again and succeeds.
encoder.remove_errors_from_cache();

let bytes = encoder.encode(vec!(("beat", Value::Long(3))), &strategy);
assert_eq!(bytes, Ok(vec!(0,0,0,0,4,6)))

pub fn encode(
    &mut self,
    values: Vec<(&'static str, Value)>,
    subject_name_strategy: &SubjectNameStrategy
) -> Result<Vec<u8>, SRCError>

Encodes a vector of values to bytes. The correct 'keys' (the field names in each tuple) depend on the schema fetched at runtime. For example, you might agree on a schema with a consuming party and/or upload a schema to the schema registry before starting the program. In the future an 'encode with schema' function might be added, which would make it easier to ensure the schema is available in the correct form.


let _m = mock("GET", "/subjects/heartbeat-nl.openweb.data.Heartbeat/versions/latest")
    .with_status(200)
    .with_header("content-type", "application/vnd.schemaregistry.v1+json")
    .with_body(r#"{"subject":"heartbeat-value","version":1,"id":3,"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
    .create();

let mut encoder = Encoder::new(SERVER_ADDRESS);
let strategy = SubjectNameStrategy::TopicRecordNameStrategy("heartbeat", "nl.openweb.data.Heartbeat");
let bytes = encoder.encode(vec!(("beat", Value::Long(3))), &strategy);

assert_eq!(bytes, Ok(vec!(0,0,0,0,3,6)))
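On the consuming side, the leading bytes of the result identify the schema. Assuming the usual layout of one zero magic byte followed by a four-byte big-endian schema id, a consumer could split the output as in the sketch below. `split_header` is a hypothetical helper written for this illustration, not an API of this crate.

```rust
/// Splits encoded bytes into the schema id and the Avro payload.
/// Returns `None` when the input is too short or the magic byte is not zero.
fn split_header(bytes: &[u8]) -> Option<(u32, &[u8])> {
    if bytes.len() < 5 || bytes[0] != 0 {
        return None;
    }
    // Bytes 1 through 4 hold the schema id in big-endian order.
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Some((id, &bytes[5..]))
}
```

Applied to the bytes asserted in the example, this yields schema id 3 and a one-byte payload holding the encoded long.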

Trait Implementations

impl Debug for Encoder

Auto Trait Implementations

impl Send for Encoder

impl Sync for Encoder

Blanket Implementations

impl<T, U> Into<U> for T where
    U: From<T>,

impl<T> From<T> for T

impl<T, U> TryFrom<U> for T where
    U: Into<T>,

type Error = !

🔬 This is a nightly-only experimental API. (try_from)

The type returned in the event of a conversion error.

impl<T> Borrow<T> for T where
    T: ?Sized

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

🔬 This is a nightly-only experimental API. (try_from)

The type returned in the event of a conversion error.

impl<T> Any for T where
    T: 'static + ?Sized

impl<T> BorrowMut<T> for T where
    T: ?Sized

impl<T> Same<T> for T

type Output = T

Should always be Self