vld-lapin 0.2.0

Lapin (RabbitMQ) integration for the vld validation library: validate message payloads before publish and after consume.


Overview

vld-lapin exposes a single entry point, a macro:

  • impl_to_lapin!(channel)

The macro rebinds channel under the same name, so from that point on channel is a validating wrapper that provides:

  • auto-conversion helpers: publish, basic_get, decode_bytes, decode_delivery, decode_get
  • ack helpers: ack_decode, nack_decode, reject_decode, ack_decode_get, nack_decode_get, reject_decode_get
  • all other native lapin::Channel methods remain available through deref
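The rebinding and deref behaviour described above can be illustrated with a small std-only sketch. It does not show how vld-lapin is implemented: RawChannel, ValidatingChannel, publish_checked, and wrap_channel! are hypothetical stand-ins invented for this example. It only shows one plausible way a macro can shadow a variable with a wrapper while native methods stay reachable through Deref.

```rust
use std::ops::Deref;

/// Hypothetical stand-in for `lapin::Channel` (the real type talks to a broker).
struct RawChannel {
    id: u16,
}

impl RawChannel {
    /// Plays the role of a "native" method such as `basic_qos`.
    fn native_id(&self) -> u16 {
        self.id
    }
}

/// Hypothetical validating wrapper; not the actual vld-lapin type.
struct ValidatingChannel {
    inner: RawChannel,
}

impl ValidatingChannel {
    /// Wrapper-only helper: rejects empty payloads, otherwise returns the byte length.
    fn publish_checked(&self, payload: &str) -> Result<usize, String> {
        if payload.is_empty() {
            return Err("payload must not be empty".to_string());
        }
        Ok(payload.len())
    }
}

impl Deref for ValidatingChannel {
    type Target = RawChannel;

    fn deref(&self) -> &RawChannel {
        &self.inner
    }
}

/// Rebinds the given identifier to a wrapper around its previous value.
macro_rules! wrap_channel {
    ($name:ident) => {
        let $name = ValidatingChannel { inner: $name };
    };
}

fn demo() -> (u16, Result<usize, String>, Result<usize, String>) {
    let channel = RawChannel { id: 7 };
    wrap_channel!(channel); // `channel` now names the wrapper
    (
        channel.native_id(),           // resolved on RawChannel through Deref
        channel.publish_checked("hi"), // wrapper's own method
        channel.publish_checked(""),   // rejected by the wrapper
    )
}

fn main() {
    let (id, ok, err) = demo();
    println!("{id} {ok:?} {err:?}");
}
```

Because the macro receives the identifier from the call site, the new binding shadows the old one in the caller's scope, which is why no second variable name is needed.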

Installation

[dependencies]
vld = { version = "0.2", features = ["serialize"] }
vld-lapin = "0.2"
lapin = "2"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

Quick start

use lapin::{
    options::{BasicAckOptions, BasicGetOptions, BasicPublishOptions, QueueDeclareOptions},
    types::FieldTable,
    BasicProperties, Connection, ConnectionProperties,
};
use vld_lapin::prelude::*;

vld::schema! {
    #[derive(Debug, serde::Serialize)]
    pub struct EventSchema {
        pub event: String => vld::string().min(1),
        pub retries: i64 => vld::number().int().min(0).max(5),
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let evt = EventSchema {
        event: "user.created".into(),
        retries: 0,
    };

    // Connect to a local broker and open a channel.
    let conn = Connection::connect(
        "amqp://guest:guest@127.0.0.1:5672/%2f",
        ConnectionProperties::default(),
    )
    .await?;
    let channel = conn.create_channel().await?;
    // Rebind `channel` as the validating wrapper.
    impl_to_lapin!(channel);

    // Native method through deref.
    channel
        .basic_qos(10, lapin::options::BasicQosOptions::default())
        .await?;

    channel
        .queue_declare("events.user", QueueDeclareOptions::default(), FieldTable::default())
        .await?;

    // The payload is validated before it is published.
    channel
        .publish(
            "",
            "events.user",
            BasicPublishOptions::default(),
            BasicProperties::default(),
            &evt,
        )
        .await?;

    // Fetch one message, decode it into EventSchema, and ack it.
    if let Some(msg) = channel.basic_get("events.user", BasicGetOptions::default()).await? {
        let parsed: EventSchema = channel.ack_decode_get(&msg, BasicAckOptions::default()).await?;
        println!("event = {}", parsed.event);
    }

    Ok(())
}
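To make the two rules in the EventSchema declaration concrete, here is a std-only sketch of the same checks written by hand. It assumes that string().min(1) means "at least one character" and that int().min(0).max(5) means an integer in the inclusive range 0 to 5; check the vld documentation for the exact semantics. The Event struct and check_event function are hypothetical names used only for this illustration and are not part of vld or vld-lapin.

```rust
/// Hypothetical plain-Rust mirror of the `EventSchema` fields.
#[derive(Debug, PartialEq)]
struct Event {
    event: String,
    retries: i64,
}

/// Collects every rule violation instead of stopping at the first one.
fn check_event(e: &Event) -> Result<(), Vec<String>> {
    let mut issues = Vec::new();

    // Assumed meaning of `vld::string().min(1)`: the string must not be empty.
    if e.event.is_empty() {
        issues.push("event: must contain at least 1 character".to_string());
    }

    // Assumed meaning of `vld::number().int().min(0).max(5)`: 0 <= retries <= 5.
    if !(0..=5).contains(&e.retries) {
        issues.push("retries: must be between 0 and 5".to_string());
    }

    if issues.is_empty() {
        Ok(())
    } else {
        Err(issues)
    }
}

fn main() {
    let ok = Event { event: "user.created".into(), retries: 0 };
    let bad = Event { event: String::new(), retries: 9 };
    println!("{:?}", check_event(&ok));
    println!("{:?}", check_event(&bad));
}
```

A payload like the evt value in the quick start passes both checks, while an empty event name or a retries value outside the range would be rejected before anything reaches the broker.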

Example

cargo run -p vld-lapin --example lapin_basic

License

MIT