use std::time::Duration;
use async_stomp::*;
use client::{Connector, Subscriber};
use futures::future::ok;
use futures::prelude::*;
/// Example client: connects to a broker at `127.0.0.1:61613`, subscribes to
/// the `rusty` destination, sends one message to it, unsubscribes and
/// disconnects, while concurrently printing every frame received from the
/// server.
///
/// # Errors
///
/// Returns the error from connecting, or the error produced by whichever of
/// the two tasks (sender / receiver) finishes first.
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Pause inserted between protocol steps. Presumably this gives the broker
    // time to process each frame before the next one is sent.
    const STEP_PAUSE: Duration = Duration::from_millis(200);
    // Extra wait before disconnecting, on top of the regular step pause.
    const LINGER: Duration = Duration::from_secs(1);

    let conn = Connector::builder()
        .server("127.0.0.1:61613")
        .virtualhost("/")
        .login("guest".to_string())
        .passcode("guest".to_string())
        .connect()
        .await?;

    tokio::time::sleep(STEP_PAUSE).await;

    // Split the connection so sending and receiving can run as two
    // independent futures.
    let (mut sink, stream) = conn.split();

    // Sender side: subscribe, publish one message, unsubscribe, disconnect.
    let sender = async move {
        let subscribe = Subscriber::builder()
            .destination("rusty")
            .id("myid")
            .subscribe();
        sink.send(subscribe).await?;
        println!("Subscribe sent");

        tokio::time::sleep(STEP_PAUSE).await;

        sink.send(
            ToServer::Send {
                destination: "rusty".into(),
                transaction: None,
                headers: None,
                body: Some(b"Hello there rustaceans!".to_vec()),
            }
            .into(),
        )
        .await?;
        println!("Message sent");

        tokio::time::sleep(STEP_PAUSE).await;

        sink.send(ToServer::Unsubscribe { id: "myid".into() }.into())
            .await?;
        println!("Unsubscribe sent");

        // Single sleep covering the former 200 ms + 1 s back-to-back waits.
        tokio::time::sleep(STEP_PAUSE + LINGER).await;

        sink.send(ToServer::Disconnect { receipt: None }.into())
            .await?;
        println!("Disconnect sent");

        Ok(())
    };

    // Receiver side: print message payloads as text, and any other frame via
    // its `Debug` representation.
    let receiver = stream.try_for_each(|item| {
        if let FromServer::Message { body, .. } = item.content {
            // The body is optional; a frame without one is printed as an
            // empty payload instead of panicking on `unwrap()`.
            println!(
                "Message received: {:?}",
                String::from_utf8_lossy(&body.unwrap_or_default())
            );
        } else {
            println!("{item:?}");
        }
        ok(())
    });

    // Race both sides and return the result of whichever completes first;
    // the other future is dropped.
    futures::future::select(Box::pin(sender), Box::pin(receiver))
        .await
        .factor_first()
        .0
}