use std::future::Future;
use std::pin::Pin;
use ruststream::codec::{Codec, JsonCodec};
use ruststream::memory::{MemoryBroker, MemoryPublisher};
use ruststream::runtime::{
App, AppInfo, HandlerResult, Outgoing, PublishLayer, PublishNext, PublishTransform, RustStream,
TypedPublisher,
};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
struct AppState {
egress: MemoryPublisher,
}
#[derive(Debug, Deserialize)]
struct Request {
id: u64,
}
#[derive(Debug, Serialize)]
struct Response {
ok: bool,
}
#[derive(Debug, Deserialize, Serialize)]
struct Event {
id: u64,
}
#[subscriber("requests", publish("responses"))]
async fn respond(req: &Request) -> Response {
println!("responding to request {}", req.id);
Response { ok: true }
}
#[subscriber("validated-requests", publish("responses"))]
async fn validate(req: &Request) -> Result<Response, HandlerResult> {
if req.id == 0 {
return Err(HandlerResult::drop());
}
Ok(Response { ok: true })
}
#[subscriber("ingress")]
async fn forward(event: &Event, ctx: &mut Context<'_, (), AppState>) -> HandlerResult {
let payload = JsonCodec.encode(event).expect("serializable");
let out = OutgoingMessage::new("egress", payload.as_ref());
if ctx.state().egress.publish(out).await.is_err() {
return HandlerResult::retry();
}
HandlerResult::Ack
}
struct EnvelopeTransform;
impl<C> PublishTransform<C> for EnvelopeTransform {
fn apply(&self, out: &mut Outgoing<'_>, _cx: &ruststream::runtime::PublishContext<'_, C>) {
out.headers_mut().insert("x-envelope", b"1".to_vec());
}
}
#[derive(Clone)]
struct AuditPublish;
impl PublishLayer for AuditPublish {
fn on_publish<'a, N: ruststream::runtime::PublishPipeline>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishNext<'a, N>,
) -> Pin<
Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
> {
Box::pin(async move {
println!("publishing to {}", out.name());
next.run(out).await
})
}
}
#[subscriber(batch("orders"), publish("confirmations"))]
async fn confirm(orders: &[Event]) -> Result<Vec<Event>, HandlerResult> {
if orders.is_empty() {
return Err(HandlerResult::drop()); }
Ok(orders.iter().map(|o| Event { id: o.id }).collect())
}
#[ruststream::app]
fn app() -> impl App {
let broker = MemoryBroker::new();
let egress = broker.publisher();
RustStream::new(AppInfo::new("publishing", "0.1.0"))
.publish_layer(AuditPublish)
.on_startup(move |()| async move { Ok::<_, std::convert::Infallible>(AppState { egress }) })
.with_broker(broker, |b| {
let replies = TypedPublisher::new(b.broker().publisher()).transform(EnvelopeTransform);
b.include_publishing(respond, replies);
let validated = TypedPublisher::new(b.broker().publisher());
b.include_publishing(validate, validated);
b.include(forward);
let confirmations = TypedPublisher::new(b.broker().publisher()).transactional();
b.include_batch_publishing(confirm, confirmations);
})
}