//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
//!
//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
//! on this crate directly.
use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
use ;
use ;
/// Turns an `async fn` handler into a mountable subscriber definition.
///
/// ```ignore
/// /// Processes incoming orders.
/// #[subscriber("orders")]
/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
/// // later: broker_scope.include(handle);
///
/// // reply form: the return value is encoded and published to "responses" through the
/// // TypedPublisher (broker + reply codec) passed at wiring time.
/// #[subscriber("requests", publish("responses"))]
/// async fn reply(req: &Request) -> Response { /* ... */ }
/// // later: broker_scope.include_publishing(reply, typed_publisher);
///
/// // reply form with explicit ack control: `Ok` publishes the reply, `Err` skips it and the
/// // dispatcher acts on the returned HandlerResult.
/// #[subscriber("requests", publish("responses"))]
/// async fn confirm(req: &Request) -> Result<Response, HandlerResult> { /* ... */ }
///
/// // batch form: the handler takes the whole decoded batch as a slice; the source's
/// // subscriber must implement BatchSubscriber. Mounted with include_batch.
/// #[subscriber(batch("orders"))]
/// async fn bill(orders: &[Order]) -> HandlerResult { /* settles the whole batch */ }
/// ```
///
/// Without `publish(..)` the handler returns any `IntoHandlerResult` (a `HandlerResult`, `()`, or
/// `Result<_, E>`). With `publish(..)` it returns the reply value to publish, or
/// `Result<Reply, HandlerResult>` to control acknowledgement: `Err(result)` publishes nothing and
/// returns `result` to the dispatcher. The `Result` form is detected syntactically, so spell it
/// out in the signature (a type alias is treated as a plain reply type).
///
/// Wrapping the source in `batch(..)` switches the definition to a `BatchDef`: the handler takes
/// `&[T]` and runs once per batch pulled from the broker's `BatchSubscriber` (use the `Buffered`
/// adapter for brokers without native batching). It returns any `IntoBatchResult`: either one
/// outcome for the whole batch (`HandlerResult`, `()`, `Result<_, E>`), or a `Vec<HandlerResult>`
/// that settles element `i` of the slice with outcome `i`. The source type is recovered from the
/// constructor path, so a generic source must spell out its type parameters:
/// `batch(Buffered::<Name>::new(Name::new("orders")))`.
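/**
To make the per-element form concrete, here is a minimal sketch of the index pairing it relies
on. It is not RustStream code: `Outcome` is a local stand-in for `HandlerResult`, and `Retry`
and `settle_each` are hypothetical names chosen for the illustration.

```rust
/// Local stand-in for `HandlerResult` (hypothetical variants).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Ack,
    Retry,
}

/// Returns one outcome per element, in slice order, so outcome `i` settles element `i`.
fn settle_each(amounts: &[u32]) -> Vec<Outcome> {
    amounts
        .iter()
        .map(|&amount| if amount > 0 { Outcome::Ack } else { Outcome::Retry })
        .collect()
}
```

With `settle_each(&[5, 0, 7])`, only the second element is marked `Retry`, and the returned
vector has the same length as the input slice.
*/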
///
/// Combining `batch(..)` with `publish(..)` produces a `BatchPublishingDef` (mounted with
/// `include_batch_publishing`): the handler returns `Vec<Reply>`, or
/// `Result<Vec<Reply>, HandlerResult>` for explicit ack control. Ack control here is
/// all-or-nothing, because selective outcomes do not compose with a transaction. Every reply is
/// published to the reply name, and the whole batch is acked afterwards. Hand the mount a
/// `TypedPublisher` for independent reply publishes, or `.transactional()` for one transaction
/// per batch.
///
/// A `workers(n)` clause processes up to `n` deliveries (or batches) of this subscriber
/// concurrently, each in its own task; global processing order is lost by design, and
/// back-pressure holds at `n` in-flight deliveries. `workers(n, by_key)` switches to `n`
/// sequential lanes keyed by the message's partition key, preserving per-key ordering
/// (single-message forms only). The default is the sequential loop.
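/**
The per-key ordering guarantee of `workers(n, by_key)` can be pictured with a small routing
sketch. It assumes the common technique of hashing the partition key and taking it modulo the
lane count; it does not describe RustStream's internals, and `lane_for` and `route` are
hypothetical helpers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks one of `lanes` sequential lanes for a partition key.
fn lane_for(key: &str, lanes: usize) -> usize {
    assert!(lanes > 0, "at least one lane is required");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % lanes as u64) as usize
}

/// Distributes `(key, sequence)` deliveries over the lanes in arrival order.
fn route<'a>(deliveries: &[(&'a str, u32)], lanes: usize) -> Vec<Vec<(&'a str, u32)>> {
    let mut out = vec![Vec::new(); lanes];
    for &(key, seq) in deliveries {
        out[lane_for(key, lanes)].push((key, seq));
    }
    out
}
```

Because equal keys always map to the same lane and each lane is drained sequentially, deliveries
that share a key keep their relative order, while different keys may be processed in parallel.
*/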
///
/// In both forms the handler may declare an optional second parameter, the per-delivery
/// `&mut Context`, to read app state or publish manually.
/// Generates a `main` entry point for a `RustStream` service.
///
/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
/// application. The expansion keeps the function and adds a `main` that hands it to
/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
/// `asyncapi gen` commands with no hand-written runtime boilerplate.
///
/// ```ignore
/// #[ruststream::app]
/// fn app() -> RustStream {
///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
/// }
/// ```
/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
/// comment.
///
/// ```ignore
/// /// An order placed by a customer.
/// #[derive(Message)]
/// struct Order { id: u32 }
/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
/// ```
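/**
For orientation, the metadata in the example above corresponds to a hand-written impl along the
following lines. The trait shown is a local stand-in named `MessageMeta`; the real `Message`
trait may declare these items differently, and the derive's actual output is not reproduced here.

```rust
/// Local stand-in for the metadata surface of `Message` (assumed shape).
trait MessageMeta {
    const NAME: &'static str;
    const DESCRIPTION: Option<&'static str>;
}

/// An order placed by a customer.
struct Order {
    id: u32,
}

// What a derive would be expected to fill in: the type name and the doc comment text.
impl MessageMeta for Order {
    const NAME: &'static str = "Order";
    const DESCRIPTION: Option<&'static str> = Some("An order placed by a customer.");
}
```
*/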