use ruststream::memory::{MemoryBroker, MemoryPublisher};
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::testing::TestApp;
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
/// Incoming order message.
///
/// `handle_order` receives a reference to one of these from the `"orders"`
/// subscription, and `main` publishes one there to exercise the handler.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Order {
    /// Order identifier; `handle_order` copies it into `Receipt::order_id`.
    id: u64,
}
/// Outgoing receipt message.
///
/// `handle_order` serializes one of these with `serde_json` and publishes the
/// bytes to `"receipts"`; `main` asserts on the published value.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Receipt {
    /// Identifier of the order this receipt was produced for.
    order_id: u64,
}
/// Application state made available to handlers through `ctx.state()`.
///
/// Built by the `on_startup` callback registered in `service`.
struct AppState {
/// Publisher that `handle_order` uses to send messages to `"receipts"`.
receipts: MemoryPublisher,
}
/// Subscriber for `"orders"`: turns each [`Order`] into a [`Receipt`] and
/// publishes it to `"receipts"` through the publisher held in [`AppState`].
///
/// Returns `HandlerResult::Ack` when the publish call does not report an
/// error, and `HandlerResult::retry()` when it does.
///
/// # Panics
///
/// Panics with the message `"serialize"` if `serde_json` fails to encode the
/// receipt.
#[subscriber("orders")]
async fn handle_order(
    order: &Order,
    ctx: &mut ruststream::runtime::Context<'_, (), AppState>,
) -> HandlerResult {
    // Encode the receipt as JSON bytes; this is the body of the outgoing message.
    let body = serde_json::to_vec(&Receipt { order_id: order.id }).expect("serialize");

    let publish_failed = ctx
        .state()
        .receipts
        .publish(OutgoingMessage::new("receipts", &body))
        .await
        .is_err();

    // The publish error itself is not inspected; any failure maps to a retry.
    if publish_failed {
        HandlerResult::retry()
    } else {
        HandlerResult::Ack
    }
}
/// Assembles the application under test.
///
/// Creates a [`MemoryBroker`], takes a publisher from it, and builds a
/// [`RustStream`] named `"orders"` (version `"0.1.0"`) whose startup callback
/// yields an [`AppState`] holding that publisher. `handle_order` is then
/// registered on the broker.
fn service() -> RustStream<ruststream::runtime::Identity, AppState> {
    let memory_broker = MemoryBroker::new();
    // Taken before the broker is moved into `with_broker` below.
    let receipts = memory_broker.publisher();
    let info = AppInfo::new("orders", "0.1.0");

    RustStream::new(info)
        // The callback cannot fail, hence the `Infallible` error type.
        .on_startup(move |()| async move {
            Ok::<AppState, std::convert::Infallible>(AppState { receipts })
        })
        .with_broker(memory_broker, |registry| registry.include(handle_order))
}
/// Runs the service under [`TestApp`], publishes a single order, and asserts
/// that the handler was invoked once with that order, settled with
/// `HandlerResult::Ack`, and published exactly one matching receipt.
///
/// # Errors
///
/// Propagates any error returned by starting the test app or by publishing
/// the order.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app = TestApp::start(service()).await?;

    // Fixture values shared by the publish call and the assertions below.
    let order = Order { id: 42 };
    let expected_receipt = Receipt { order_id: 42 };

    // Drive the handler with one order.
    app.broker::<MemoryBroker>()
        .publish("orders", &order)
        .await?;

    // The subscriber saw exactly that order and acknowledged it.
    app.broker::<MemoryBroker>()
        .subscriber("orders")
        .assert_called_once()
        .with(&order)
        .settled(HandlerResult::Ack);

    // Exactly one receipt for the same order id was published.
    app.broker::<MemoryBroker>()
        .published::<Receipt>("receipts")
        .assert_called_once()
        .with(&expected_receipt);

    println!("ok: handler validated, acked, and published a receipt");
    Ok(())
}