use std::future::Future;
use std::pin::Pin;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{
AppInfo, HandlerResult, Outgoing, PublishLayer, PublishMiddleware, PublishNext, RustStream,
TypedPublisher,
};
use ruststream::subscriber;
use serde::{Deserialize, Serialize};
/// Incoming payload consumed by the `respond` and `validate` subscribers.
#[derive(Debug, Deserialize)]
struct Request {
    /// Request identifier; `validate` rejects the value `0`.
    id: u64,
}
/// Reply value returned by the `respond` and `validate` subscribers.
#[derive(Debug, Serialize)]
struct Response {
    /// Success flag; both handlers in this file only ever set it to `true`.
    ok: bool,
}
/// Event payload that is both received (`forward`, `confirm`) and emitted
/// (`confirm`), hence it derives `Deserialize` as well as `Serialize`.
#[derive(Debug, Deserialize, Serialize)]
struct Event {
    /// Event identifier, copied unchanged into the confirmations built by `confirm`.
    id: u64,
}
/// Subscriber declared on `"requests"` with `publish("responses")`.
///
/// Logs the id of the incoming request to stdout and always answers with
/// `Response { ok: true }`.
// NOTE(review): presumably the framework publishes the returned value to
// "responses" — confirm against the `subscriber` macro documentation.
#[subscriber("requests", publish("responses"))]
async fn respond(req: &Request) -> Response {
    let request_id = req.id;
    println!("responding to request {}", request_id);
    Response { ok: true }
}
/// Subscriber declared on `"validated-requests"` with `publish("responses")`.
///
/// A request whose `id` is `0` is rejected with `HandlerResult::drop()`;
/// every other request yields `Response { ok: true }`.
#[subscriber("validated-requests", publish("responses"))]
async fn validate(req: &Request) -> Result<Response, HandlerResult> {
    match req.id {
        // Zero is treated as an invalid id: bail out without a response.
        0 => Err(HandlerResult::drop()),
        _ => Ok(Response { ok: true }),
    }
}
/// Subscriber declared on `"ingress"` that re-publishes each event to `"egress"`.
///
/// Looks up the publisher named `"egress"` on the handler context, serializes
/// the event to JSON bytes and publishes it. Returns `HandlerResult::retry()`
/// when publishing fails and `HandlerResult::Ack` otherwise, including the
/// case where no `"egress"` publisher is available.
///
/// # Panics
///
/// Panics if `serde_json::to_vec` fails to serialize the event.
// NOTE(review): `Context` is not among the `use` items visible at the top of
// this file — confirm how it is brought into scope.
#[subscriber("ingress")]
async fn forward(event: &Event, ctx: &mut Context<'_>) -> HandlerResult {
    // Without an egress publisher there is nothing to forward; acknowledge as-is.
    let egress = match ctx.publisher("egress") {
        Some(publisher) => publisher,
        None => return HandlerResult::Ack,
    };

    let payload = serde_json::to_vec(event).expect("serializable");
    let message = Outgoing::new("egress", payload);

    if egress.publish(message).await.is_err() {
        HandlerResult::retry()
    } else {
        HandlerResult::Ack
    }
}
/// Publish layer that tags outgoing messages with an `x-envelope` header.
struct EnvelopeLayer;

impl PublishLayer for EnvelopeLayer {
    /// Inserts the header `x-envelope` with the single-byte value `1` into `out`.
    fn apply(&self, out: &mut Outgoing) {
        let headers = out.headers_mut();
        headers.insert("x-envelope", b"1".to_vec());
    }
}
/// Publish middleware that logs the name of every outgoing message.
struct AuditPublish;

impl PublishMiddleware for AuditPublish {
    /// Prints `out.name()` to stdout, then hands the message to `next` and
    /// returns whatever result `next.run` produces.
    fn on_publish<'a>(
        &'a self,
        out: &'a mut Outgoing,
        next: PublishNext<'a>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
    > {
        // Build the future first, then box and pin it to match the trait signature.
        let audited = async move {
            println!("publishing to {}", out.name());
            next.run(out).await
        };
        Box::pin(audited)
    }
}
/// Batch subscriber declared on `"orders"` with `publish("confirmations")`.
///
/// An empty batch is rejected with `HandlerResult::drop()`. Otherwise one
/// `Event` carrying the same `id` is produced per received order, in the
/// order the batch was given.
#[subscriber(batch("orders"), publish("confirmations"))]
async fn confirm(orders: &[Event]) -> Result<Vec<Event>, HandlerResult> {
    if orders.is_empty() {
        Err(HandlerResult::drop())
    } else {
        let confirmations: Vec<Event> = orders
            .iter()
            .map(|order| Event { id: order.id })
            .collect();
        Ok(confirmations)
    }
}
/// Assembles the `publishing` application (version `0.1.0`).
///
/// Creates an in-memory broker, installs `AuditPublish` through
/// `publish_layer`, registers a named `"egress"` publisher obtained from that
/// broker, and then wires the four subscribers of this file:
///
/// * `respond` — typed publisher with `EnvelopeLayer` applied,
/// * `validate` — plain typed publisher,
/// * `forward` — no reply publisher,
/// * `confirm` — typed publisher marked `transactional()`, registered as a
///   batch publishing handler.
#[ruststream::app]
fn app() -> RustStream {
    let memory = MemoryBroker::new();
    // Taken before the broker itself is moved into `with_broker` below.
    let egress_publisher = memory.publisher();
    let info = AppInfo::new("publishing", "0.1.0");

    RustStream::new(info)
        .publish_layer(AuditPublish)
        .publisher("egress", egress_publisher)
        .with_broker(memory, |scope| {
            let enveloped_replies =
                TypedPublisher::new(scope.broker().publisher()).layer(EnvelopeLayer);
            scope.include_publishing(respond, enveloped_replies);

            let validated_replies = TypedPublisher::new(scope.broker().publisher());
            scope.include_publishing(validate, validated_replies);

            scope.include(forward);

            let transactional_confirmations =
                TypedPublisher::new(scope.broker().publisher()).transactional();
            scope.include_batch_publishing(confirm, transactional_confirmations);
        })
}