use std::time::Duration;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use serde::Deserialize;
/// Message payload accepted by the `handle` subscriber.
///
/// Derives `Deserialize`, so it can be built from serialized input through serde.
#[derive(Debug, Deserialize)]
struct Order {
/// Identifier of the order; `handle` forwards it to `Database::insert_order`.
id: u64,
}
/// Database handle that the application keeps in its state.
///
/// This is a unit struct: it holds no connection data. Its methods (see the
/// `impl Database` block) print a message and yield to the Tokio scheduler.
#[derive(Debug, Clone)]
struct Database;
/// Error type used in the `Result`s returned by `Database::connect` and
/// `Database::insert_order`.
///
/// A unit struct; the `thiserror` derive gives it the fixed display message
/// `database error`.
#[derive(Debug, thiserror::Error)]
#[error("database error")]
struct DbError;
impl Database {
async fn connect(url: &str) -> Result<Self, DbError> {
println!("connecting to {url}");
tokio::task::yield_now().await; Ok(Self)
}
async fn insert_order(&self, id: u64) -> Result<(), DbError> {
println!("insert order {id}");
tokio::task::yield_now().await;
Ok(())
}
async fn close(&self) {
println!("closing the database");
tokio::task::yield_now().await;
}
}
// NOTE(review): `Context` is not among the imports visible at the top of this
// file — confirm it is brought into scope (by an import or by the macro).
/// Subscriber for `"orders"`: stores the received order in the database.
///
/// Fetches the [`Database`] from the context state and inserts `order.id`.
/// Returns `HandlerResult::Ack` when the insert succeeds and
/// `HandlerResult::retry()` when it fails.
///
/// # Panics
///
/// Panics if the context state holds no [`Database`].
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    // The startup hook in `app` is expected to have stored the handle.
    let database = ctx
        .state()
        .get::<Database>()
        .expect("database inserted in on_startup");

    match database.insert_order(order.id).await {
        Ok(_) => HandlerResult::Ack,
        Err(_) => HandlerResult::retry(),
    }
}
/// Assembles the `orders` application (version `0.1.0`).
///
/// The returned [`RustStream`] is configured with:
/// - an `on_startup` hook that connects to the database and inserts the
///   handle into the state it was given, then hands that state back;
/// - an `after_shutdown` hook that closes the database handle if one is
///   present in the state;
/// - a shutdown timeout of 10 seconds;
/// - a [`MemoryBroker`] that includes the `handle` subscriber.
#[ruststream::app]
fn app() -> RustStream {
    let info = AppInfo::new("orders", "0.1.0");
    let grace_period = Duration::from_secs(10);

    RustStream::new(info)
        .on_startup(|mut store| async move {
            let conn = Database::connect("postgres://localhost/orders").await?;
            store.insert(conn);
            Ok::<_, DbError>(store)
        })
        .after_shutdown(|store| async move {
            if let Some(conn) = store.get::<Database>() {
                conn.close().await;
            }
            Ok::<_, DbError>(())
        })
        .shutdown_timeout(grace_period)
        .with_broker(MemoryBroker::new(), |broker| broker.include(handle))
}