Danube-client
An async Rust client library for interacting with Danube Pub/Sub messaging platform.
Danube is an open-source distributed Pub/Sub messaging platform written in Rust.
⚠️ This library is currently under active development and may have missing or incomplete functionalities. Use with caution.
I'm working on improving and adding new features. Please feel free to contribute or report any issues you encounter.
Example usage
Check out the example files.
Producer
use anyhow::Result;
use danube_client::{DanubeClient, SchemaType};
use serde_json::json;
use std::thread;
use std::time::Duration;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let client = DanubeClient::builder()
.service_url("http://[::1]:6650")
.build()
.unwrap();
let topic = "/default/test_topic".to_string();
let json_schema = r#"{"type": "object", "properties": {"field1": {"type": "string"}, "field2": {"type": "integer"}}}"#.to_string();
let mut producer = client
.new_producer()
.with_topic(topic)
.with_name("test_producer1")
.with_schema("my_app".into(), SchemaType::Json(json_schema))
.build();
let prod_id = producer.create().await?;
info!("The Producer was created with ID: {:?}", prod_id);
let mut i = 0;
while i < 20 {
let data = json!({
"field1": format!{"value{}", i},
"field2": 2020+i,
});
let json_string = serde_json::to_string(&data).unwrap();
let encoded_data = json_string.as_bytes().to_vec();
let message_id = producer.send(encoded_data).await?;
println!("The Message with id {} was sent", message_id);
thread::sleep(Duration::from_secs(1));
i += 1;
}
Ok(())
}
Consumer
use anyhow::Result;
use danube_client::{DanubeClient, SubType};
use futures_util::stream::StreamExt;
use serde::Deserialize;
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
struct MyMessage {
field1: String,
field2: i32,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let client = DanubeClient::builder()
.service_url("http://[::1]:6650")
.build()
.unwrap();
let topic = "/default/test_topic".to_string();
let mut consumer = client
.new_consumer()
.with_topic(topic.clone())
.with_consumer_name("test_consumer")
.with_subscription("test_subscription")
.with_subscription_type(SubType::Exclusive)
.build();
let consumer_id = consumer.subscribe().await?;
println!("The Consumer with ID: {:?} was created", consumer_id);
let _schema = client.get_schema(topic).await.unwrap();
let mut message_stream = consumer.receive().await?;
while let Some(message) = message_stream.next().await {
match message {
Ok(stream_message) => {
let payload = stream_message.messages;
match serde_json::from_slice::<MyMessage>(&payload) {
Ok(decoded_message) => {
println!("Received message: {:?}", decoded_message);
}
Err(e) => {
eprintln!("Failed to decode message: {}", e);
}
}
}
Err(e) => {
eprintln!("Error receiving message: {}", e);
break;
}
}
}
Ok(())
}