use amiquip::{
AmqpProperties, Channel, Connection, Consumer, ConsumerMessage, ConsumerOptions, Exchange,
Publish, Queue, QueueDeclareOptions, Result,
};
use uuid::Uuid;
struct FibonacciRpcClient<'a> {
exchange: Exchange<'a>,
queue: Queue<'a>,
consumer: Consumer<'a>,
}
impl<'a> FibonacciRpcClient<'a> {
fn new(channel: &Channel) -> Result<FibonacciRpcClient> {
let exchange = Exchange::direct(&channel);
let queue = channel.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..QueueDeclareOptions::default()
},
)?;
let consumer = queue.consume(ConsumerOptions {
no_ack: true,
..ConsumerOptions::default()
})?;
Ok(FibonacciRpcClient {
exchange,
queue,
consumer,
})
}
fn call(&self, n: u64) -> Result<String> {
let correlation_id = format!("{}", Uuid::new_v4());
self.exchange.publish(Publish::with_properties(
format!("{}", n).as_bytes(),
"rpc_queue",
AmqpProperties::default()
.with_reply_to(self.queue.name().to_string())
.with_correlation_id(correlation_id.clone()),
))?;
for message in self.consumer.receiver().iter() {
match message {
ConsumerMessage::Delivery(delivery) => {
if delivery.properties.correlation_id().as_ref() == Some(&correlation_id) {
return Ok(String::from_utf8_lossy(&delivery.body).into());
}
}
other => {
println!("Consumer ended: {:?}", other);
break;
}
}
}
Ok("ERROR: server failed to respond to RPC call".to_string())
}
}
fn main() -> Result<()> {
let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
let channel = connection.open_channel(None)?;
let rpc_client = FibonacciRpcClient::new(&channel)?;
println!("Requesting fib(30)");
let result = rpc_client.call(30)?;
println!("Got {}", result);
connection.close()
}