use super::dispatch::Workers;
use super::failure::FailurePolicies;
use super::handler::Handler;
use super::metadata::HandlerMetadata;
pub trait SubscriberDef: Sized {
type Input;
type Handler: Handler<Self::Input>;
type Source;
fn source(&self) -> Self::Source;
fn description(&self) -> Option<&str> {
None
}
fn input_schema(&self) -> Option<String> {
None
}
fn message_name(&self) -> Option<&'static str> {
None
}
fn message_description(&self) -> Option<&'static str> {
None
}
fn workers(&self) -> Workers {
Workers::sequential()
}
fn failure_policies(&self) -> FailurePolicies {
FailurePolicies::default()
}
fn into_handler(self) -> Self::Handler;
}
pub(crate) fn subscriber_metadata<D: SubscriberDef>(name: String, def: &D) -> HandlerMetadata {
let mut meta = HandlerMetadata::typed::<D::Input>(name);
if let Some(description) = def.description() {
meta = meta.with_description(description.to_owned());
}
if let Some(schema) = def.input_schema() {
meta = meta.with_payload_schema(schema);
}
if let Some(message_name) = def.message_name() {
meta = meta.with_message_name(message_name);
}
if let Some(message_description) = def.message_description() {
meta = meta.with_message_description(message_description);
}
meta
}
#[cfg(test)]
mod tests {
use super::{SubscriberDef, subscriber_metadata};
use crate::Headers;
use crate::Name;
use crate::runtime::context::{Context, State};
use crate::runtime::dispatch::{Delivery, Workers};
use crate::runtime::handler::{Handler, HandlerResult, Settle};
struct Noop;
impl Handler<u32> for Noop {
async fn handle(&self, _msg: &u32, _ctx: &mut Context<'_>) -> Settle {
HandlerResult::Ack.into()
}
}
struct ManualDef;
impl SubscriberDef for ManualDef {
type Input = u32;
type Handler = Noop;
type Source = Name;
fn source(&self) -> Name {
Name::new("manual")
}
fn into_handler(self) -> Noop {
Noop
}
}
#[tokio::test]
async fn defaults_omit_metadata_and_dispatch_sequentially() {
let def = ManualDef;
assert_eq!(def.workers(), Workers::sequential());
assert!(def.description().is_none());
assert!(def.input_schema().is_none());
assert!(def.message_name().is_none());
assert!(def.message_description().is_none());
let _source = def.source();
let meta = subscriber_metadata("manual".to_owned(), &def);
assert_eq!(meta.name, "manual");
assert_eq!(meta.input_type, "u32");
let handler = def.into_handler();
let state = State::default();
let delivery = Delivery::empty();
let headers = Headers::new();
let mut ctx = Context::new("manual", &headers, &state, &delivery);
assert_eq!(
handler.handle(&7u32, &mut ctx).await.outcome(),
HandlerResult::Ack
);
}
}