use std::time::Duration;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, Identity, RustStream};
use ruststream::subscriber;
use serde::Deserialize;
/// Message payload accepted by the `handle` subscriber.
///
/// Derives `Deserialize` so it can be built from an incoming message body.
#[derive(Debug, Deserialize)]
struct Order {
    /// Identifier of the order; this is the value passed to `Database::insert_order`.
    id: u64,
}
/// Stand-in database handle used as the application state type.
///
/// It is a unit struct: it holds no connection or other data, and the
/// methods in its `impl` block only print a message and yield.
#[derive(Debug, Clone)]
struct Database;
/// Error type used in the `Result`s of the fallible `Database` methods and
/// of the shutdown hook.
///
/// The `#[error]` attribute sets its display message to "database error".
/// None of the `Database` methods in this file currently constructs it.
#[derive(Debug, thiserror::Error)]
#[error("database error")]
struct DbError;
impl Database {
    /// Pretends to open a connection to `url`.
    ///
    /// Prints the target URL, yields once to the async runtime and then
    /// returns a new handle.
    ///
    /// # Errors
    ///
    /// The signature allows a [`DbError`], but this implementation always
    /// returns `Ok`.
    async fn connect(url: &str) -> Result<Self, DbError> {
        println!("connecting to {url}");
        tokio::task::yield_now().await;
        Ok(Self)
    }

    /// Pretends to store the order identified by `id`.
    ///
    /// Prints the id and yields once to the async runtime.
    ///
    /// # Errors
    ///
    /// The signature allows a [`DbError`], but this implementation always
    /// returns `Ok(())`.
    async fn insert_order(&self, id: u64) -> Result<(), DbError> {
        println!("insert order {id}");
        tokio::task::yield_now().await;
        Ok(())
    }

    /// Pretends to close the connection: prints a message and yields once
    /// to the async runtime.
    async fn close(&self) {
        println!("closing the database");
        tokio::task::yield_now().await;
    }
}
/// Subscriber registered under the name "orders".
///
/// Takes the value returned by `ctx.state()` (the `Database`, judging by the
/// `Context` type parameters) and records the order id with it.
///
/// Returns `HandlerResult::Ack` when the insert succeeds and
/// `HandlerResult::retry()` when it fails; the error value itself is
/// discarded.
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context<'_, (), Database>) -> HandlerResult {
    let outcome = ctx.state().insert_order(order.id).await;
    match outcome {
        Ok(()) => HandlerResult::Ack,
        Err(_) => HandlerResult::retry(),
    }
}
/// Builds the application.
///
/// The builder is configured with:
/// - app info `"orders"` / `"0.1.0"`;
/// - an `on_startup` hook that connects the [`Database`];
/// - an `after_shutdown` hook that closes it;
/// - a 10-second shutdown timeout;
/// - an in-memory broker that includes the `handle` subscriber.
#[ruststream::app]
fn app() -> RustStream<Identity, Database> {
    let info = AppInfo::new("orders", "0.1.0");
    let grace_period = Duration::from_secs(10);

    RustStream::new(info)
        .on_startup(|()| async move {
            Database::connect("postgres://localhost/orders").await
        })
        .after_shutdown(|database: std::sync::Arc<Database>| async move {
            database.close().await;
            Ok::<_, DbError>(())
        })
        .shutdown_timeout(grace_period)
        .with_broker(MemoryBroker::new(), |broker| broker.include(handle))
}