Skip to main content

batpak_macros/
lib.rs

1//! Proc macros for the batpak event-sourcing runtime.
2//!
3//! This crate is pulled in transitively via `batpak`. Users never add it
4//! to their own `Cargo.toml` — the derives are already in scope via
5//! `use batpak::EventPayload;` or `use batpak::EventSourced;`.
6
7mod event_payload;
8mod operation;
9
10use proc_macro::TokenStream;
11use quote::quote;
12use std::collections::HashSet;
13use syn::{
14    parse_macro_input, spanned::Spanned, Attribute, Data, DeriveInput, Fields, Ident, LitInt, Path,
15};
16
17/// Derives `batpak::event::EventPayload` for a named-field struct.
18///
19/// Requires `#[batpak(category = N, type_id = N)]` on the struct. See
20/// `batpak::event::EventPayload` and ADR-0010 for the full contract.
21#[proc_macro_derive(EventPayload, attributes(batpak))]
22pub fn derive_event_payload(input: TokenStream) -> TokenStream {
23    let input = parse_macro_input!(input as DeriveInput);
24    match event_payload::expand(&input) {
25        Ok(ts) => ts.into(),
26        Err(e) => e.to_compile_error().into(),
27    }
28}
29
30/// `#[operation(...)]` — generate a syncbat operation descriptor + optional
31/// registration fns. Re-exported as `syncbat::operation`; users never name this
32/// crate. (Moved here from the former `syncbat-macros` crate — the family has one
33/// proc-macro crate.)
34#[proc_macro_attribute]
35pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream {
36    let args = parse_macro_input!(attr as operation::OperationArgs);
37    let function = parse_macro_input!(item as syn::ItemFn);
38    match operation::expand_operation(args, &function) {
39        Ok(tokens) => tokens.into(),
40        Err(error) => error.to_compile_error().into(),
41    }
42}
43
44/// Derives `batpak::event::MultiReactive<Input>` for a named-field struct,
45/// for use with `Store::react_loop_multi` (JSON) or
46/// `Store::react_loop_multi_raw` (msgpack).
47///
48/// Syntax mirrors `#[derive(EventSourced)]`:
49///   * `#[batpak(input = <Lane>)]` — required, once. `Lane` is either
50///     `JsonValueInput` or `RawMsgpackInput`.
51///   * `#[batpak(event = <Payload>, handler = <fn>)]` — one per bound
52///     payload type. At least one is required. `event = T` requires `T` to
53///     be a single-segment path (bring the type into scope with `use` if
54///     needed). This ensures the derive can dedupe event bindings without
55///     running full path resolution.
56///
57/// Generates a `MultiReactive<Input>` impl whose `dispatch` body matches
58/// on `event.header.event_kind`, uses `DecodeTyped::route_typed` per arm,
59/// calls the matching handler, and returns `MultiDispatchError::Decode` on
60/// matched-kind decode failure (unified contract with `TypedReactive<T>`).
61/// Unbound kinds fall through as `Ok(())` — silent filter.
62#[proc_macro_derive(MultiEventReactor, attributes(batpak))]
63pub fn derive_multi_event_reactor(input: TokenStream) -> TokenStream {
64    let input = parse_macro_input!(input as DeriveInput);
65    match expand_multi_event_reactor(&input) {
66        Ok(ts) => ts.into(),
67        Err(e) => e.to_compile_error().into(),
68    }
69}
70
71/// Derives `batpak::event::EventSourced` for a named-field struct.
72///
73/// Requires a config attr `#[batpak(input = <Lane>, cache_version = N)]`
74/// (the `cache_version` key is optional and defaults to 0) plus at least
75/// one event-binding attr `#[batpak(event = <Payload>, handler = <fn>)]`.
76/// `event = T` requires `T` to be a single-segment path (bring the type into
77/// scope with `use` if needed). This ensures the derive can dedupe event
78/// bindings without running full path resolution.
79///
80/// Generates:
81///   - `type Input = <Lane>`
82///   - `from_events` — default fold over `Default::default()`
83///   - `apply_event` — dispatch by `P::KIND` via `DecodeTyped::route_typed`,
84///     with the two failure modes kept rigorously distinct:
85///       * wrong-kind event → silent skip (fall-through to next arm)
86///       * matched-kind + decode failure → `panic!` (see "Panics" below)
87///   - `relevant_event_kinds` — `&[T1::KIND, T2::KIND, ...]` generated from
88///     the `event =` list (single source of truth; sync-drift is impossible)
89///   - `schema_version` — from `cache_version` (projection-cache invalidation
90///     only; unrelated to payload wire `type_id`)
91///
92/// # Panics
93///
94/// The generated `apply_event` **panics** when an event's `event_kind` matches
95/// a bound payload's `KIND` but the payload bytes fail to deserialize into
96/// that payload type. This is a deliberate contract:
97///
98/// 1. The raw `EventSourced` trait's `apply_event` returns `()`, not `Result`.
99///    A hand-written implementation must either panic, log-and-skip, or
100///    log-and-ignore on decode failure. The canonical pattern demonstrated
101///    in the pre-derive `examples/event_sourced_counter.rs` used
102///    `.expect(...)`, which is equivalent.
103///
104/// 2. Matched-kind decode failure is a **hard correctness signal** — the
105///    event was written as this kind but the bytes are malformed (schema
106///    drift, `type_id` reuse, corruption). Silently skipping would produce
107///    incorrect projected state.
108///
109/// If you need fallible replay (log-and-skip, fail-the-projection, custom
110/// recovery), implement `EventSourced` manually. The derive does not offer a
111/// fallible mode because the trait signature does not support one.
112///
113/// See `05_TERMINALS.md`, `08_CIRCUITS.md`, and the Dispatch Chapter plan
114/// for the full contract.
115#[proc_macro_derive(EventSourced, attributes(batpak))]
116pub fn derive_event_sourced(input: TokenStream) -> TokenStream {
117    let input = parse_macro_input!(input as DeriveInput);
118    match expand_event_sourced(&input) {
119        Ok(ts) => ts.into(),
120        Err(e) => e.to_compile_error().into(),
121    }
122}
123
124// ─── EventSourced derive expansion ────────────────────────────────────────────
125
/// One `#[batpak(event = X, handler = fn)]` entry parsed from the derive
/// attrs.
///
/// Both keys are mandatory within a binding attribute; a lone `event` or
/// lone `handler` is rejected while the attribute is being classified.
struct EventBinding {
    /// Payload type named by `event = ...`. The derives only accept a
    /// single-segment path here so bindings can be deduplicated by text.
    event: Path,
    /// Name of the method on the deriving struct that handles this payload.
    handler: Ident,
}
132
/// Parsed state for a single `#[batpak(...)]` attribute on an `EventSourced`
/// or `MultiEventReactor` derive. Each attribute is either a `Config` attr
/// (containing `input`, `cache_version`, `error`, or projection state
/// contract fields) or an `EventBinding` attr (containing `event` and
/// `handler`). Mixing keys is a compile-time error.
enum BatpakAttrKind {
    /// A configuration attribute. Every key is optional at this level; each
    /// derive decides which keys it requires and which it rejects.
    Config {
        /// `input = <Lane>` — the input lane type.
        input: Option<Path>,
        /// `cache_version = N` — projection-cache invalidation key.
        cache_version: Option<LitInt>,
        /// `state_max_cardinality = N` — declared projection-state bound.
        state_max_cardinality: Option<LitInt>,
        /// `error = <ErrorType>` — shared handler error type (reactors only).
        error: Option<Path>,
    },
    /// An event-binding attribute carrying a complete `event`/`handler` pair.
    Event(EventBinding),
}
147
148#[derive(Default)]
149struct BatpakAttrParts {
150    input: Option<Path>,
151    cache_version: Option<LitInt>,
152    state_max_cardinality: Option<LitInt>,
153    error_ty: Option<Path>,
154    event: Option<Path>,
155    handler: Option<Ident>,
156}
157
158impl BatpakAttrParts {
159    fn set_nested(&mut self, meta: &syn::meta::ParseNestedMeta<'_>) -> syn::Result<()> {
160        let key = meta.path.get_ident().ok_or_else(|| {
161            meta.error("expected `input`, `cache_version`, `state_max_cardinality`, `error`, `event`, or `handler`")
162        })?;
163        let key_name = key.to_string();
164        if self.set_config_nested(key_name.as_str(), meta)? {
165            return Ok(());
166        }
167        if self.set_event_nested(key_name.as_str(), meta)? {
168            return Ok(());
169        }
170        Err(meta.error(format!(
171            "unknown key `{key_name}`, expected `input`, `cache_version`, `state_max_cardinality`, `error`, `event`, or `handler`"
172        )))
173    }
174
175    fn set_config_nested(
176        &mut self,
177        key: &str,
178        meta: &syn::meta::ParseNestedMeta<'_>,
179    ) -> syn::Result<bool> {
180        match key {
181            "input" => {
182                if self.input.is_some() {
183                    return Err(meta.error("duplicate `input` key within attribute"));
184                }
185                self.input = Some(meta.value()?.parse::<Path>()?);
186            }
187            "cache_version" => {
188                if self.cache_version.is_some() {
189                    return Err(meta.error("duplicate `cache_version` key within attribute"));
190                }
191                self.cache_version = Some(meta.value()?.parse::<LitInt>()?);
192            }
193            "state_max_cardinality" => {
194                if self.state_max_cardinality.is_some() {
195                    return Err(
196                        meta.error("duplicate `state_max_cardinality` key within attribute")
197                    );
198                }
199                self.state_max_cardinality = Some(meta.value()?.parse::<LitInt>()?);
200            }
201            "error" => {
202                if self.error_ty.is_some() {
203                    return Err(meta.error("duplicate `error` key within attribute"));
204                }
205                self.error_ty = Some(meta.value()?.parse::<Path>()?);
206            }
207            _ => return Ok(false),
208        }
209        Ok(true)
210    }
211
212    fn set_event_nested(
213        &mut self,
214        key: &str,
215        meta: &syn::meta::ParseNestedMeta<'_>,
216    ) -> syn::Result<bool> {
217        match key {
218            "event" => {
219                if self.event.is_some() {
220                    return Err(meta.error("duplicate `event` key within attribute"));
221                }
222                self.event = Some(meta.value()?.parse::<Path>()?);
223            }
224            "handler" => {
225                if self.handler.is_some() {
226                    return Err(meta.error("duplicate `handler` key within attribute"));
227                }
228                self.handler = Some(meta.value()?.parse::<Ident>()?);
229            }
230            _ => return Ok(false),
231        }
232        Ok(true)
233    }
234
235    fn finish(self, attr: &Attribute) -> syn::Result<BatpakAttrKind> {
236        let has_config = self.input.is_some()
237            || self.cache_version.is_some()
238            || self.state_max_cardinality.is_some()
239            || self.error_ty.is_some();
240        let has_event = self.event.is_some() || self.handler.is_some();
241
242        if has_config && has_event {
243            return Err(syn::Error::new(
244                attr.span(),
245                "`#[batpak(...)]` attribute must contain either config keys \
246                 (`input`, `cache_version`, `state_max_cardinality`, `error`) or an event-binding pair (`event`, `handler`), not both",
247            ));
248        }
249
250        if has_event {
251            let event = self.event.ok_or_else(|| {
252                syn::Error::new(
253                    attr.span(),
254                    "event-binding attribute is missing `event = <PayloadType>`",
255                )
256            })?;
257            let handler = self.handler.ok_or_else(|| {
258                syn::Error::new(
259                    attr.span(),
260                    "event-binding attribute is missing `handler = <fn_name>`",
261                )
262            })?;
263            return Ok(BatpakAttrKind::Event(EventBinding { event, handler }));
264        }
265
266        if !has_config {
267            return Err(syn::Error::new(
268                attr.span(),
269                "`#[batpak(...)]` must contain at least one key: `input`, `cache_version`, `state_max_cardinality`, `error`, or the `event`/`handler` pair",
270            ));
271        }
272        Ok(BatpakAttrKind::Config {
273            input: self.input,
274            cache_version: self.cache_version,
275            state_max_cardinality: self.state_max_cardinality,
276            error: self.error_ty,
277        })
278    }
279}
280
281fn classify_batpak_attr(attr: &Attribute) -> syn::Result<BatpakAttrKind> {
282    let mut parts = BatpakAttrParts::default();
283    attr.parse_nested_meta(|meta| parts.set_nested(&meta))?;
284    parts.finish(attr)
285}
286
287fn ensure_named_field_struct(input: &DeriveInput, derive_name: &str) -> syn::Result<()> {
288    match &input.data {
289        Data::Struct(s) => match &s.fields {
290            Fields::Named(_) => Ok(()),
291            Fields::Unnamed(f) => Err(syn::Error::new(
292                f.span(),
293                format!(
294                    "#[derive({derive_name})] requires a named-field struct; tuple structs are not supported"
295                ),
296            )),
297            Fields::Unit => Err(syn::Error::new(
298                input.ident.span(),
299                format!(
300                    "#[derive({derive_name})] requires a named-field struct; unit structs are not supported"
301                ),
302            )),
303        },
304        Data::Enum(e) => Err(syn::Error::new(
305            e.enum_token.span,
306            format!("#[derive({derive_name})] requires a named-field struct; enums are not supported"),
307        )),
308        Data::Union(u) => Err(syn::Error::new(
309            u.union_token.span,
310            format!(
311                "#[derive({derive_name})] requires a named-field struct; unions are not supported"
312            ),
313        )),
314    }
315}
316
/// Validated `#[batpak(...)]` configuration for one `#[derive(EventSourced)]`
/// use, as returned by `collect_event_sourced_attrs`.
struct EventSourcedDeriveAttrs {
    /// The mandatory `input = <Lane>` type.
    input_path: Path,
    /// The optional `cache_version = N` literal (not yet range-checked).
    cache_version_lit: Option<LitInt>,
    /// The optional `state_max_cardinality = N` literal (not yet range-checked).
    state_max_cardinality_lit: Option<LitInt>,
    /// Event bindings in declaration order; non-empty and free of duplicates.
    bindings: Vec<EventBinding>,
}
323
324fn collect_event_sourced_attrs(input: &DeriveInput) -> syn::Result<EventSourcedDeriveAttrs> {
325    let batpak_attrs: Vec<&Attribute> = input
326        .attrs
327        .iter()
328        .filter(|a| a.path().is_ident("batpak"))
329        .collect();
330
331    if batpak_attrs.is_empty() {
332        return Err(syn::Error::new(
333            input.ident.span(),
334            "#[derive(EventSourced)] requires at least one `#[batpak(input = <Lane>)]` attribute",
335        ));
336    }
337
338    let mut input_path: Option<Path> = None;
339    let mut cache_version_lit: Option<LitInt> = None;
340    let mut state_max_cardinality_lit: Option<LitInt> = None;
341    let mut bindings: Vec<EventBinding> = Vec::new();
342    let mut seen_events: HashSet<String> = HashSet::new();
343
344    for attr in &batpak_attrs {
345        match classify_batpak_attr(attr)? {
346            BatpakAttrKind::Config {
347                input: attr_input,
348                cache_version: attr_cache,
349                state_max_cardinality: attr_state_max,
350                error: attr_error,
351            } => {
352                collect_event_sourced_config(
353                    &mut input_path,
354                    &mut cache_version_lit,
355                    &mut state_max_cardinality_lit,
356                    attr_input,
357                    attr_cache,
358                    attr_state_max,
359                    attr_error,
360                )?;
361            }
362            BatpakAttrKind::Event(binding) => {
363                collect_unique_event_binding(
364                    &mut bindings,
365                    &mut seen_events,
366                    binding,
367                    "projection",
368                )?;
369            }
370        }
371    }
372
373    let input_path = input_path.ok_or_else(|| {
374        syn::Error::new(
375            input.ident.span(),
376            "#[derive(EventSourced)] requires `#[batpak(input = <Lane>)]` — e.g. `input = JsonValueInput` or `input = RawMsgpackInput`",
377        )
378    })?;
379
380    if bindings.is_empty() {
381        return Err(syn::Error::new(
382            input.ident.span(),
383            "`#[derive(EventSourced)]` requires at least one `#[batpak(event = T, handler = h)]` binding",
384        ));
385    }
386
387    Ok(EventSourcedDeriveAttrs {
388        input_path,
389        cache_version_lit,
390        state_max_cardinality_lit,
391        bindings,
392    })
393}
394
395fn collect_event_sourced_config(
396    input_path: &mut Option<Path>,
397    cache_version_lit: &mut Option<LitInt>,
398    state_max_cardinality_lit: &mut Option<LitInt>,
399    attr_input: Option<Path>,
400    attr_cache: Option<LitInt>,
401    attr_state_max: Option<LitInt>,
402    attr_error: Option<Path>,
403) -> syn::Result<()> {
404    if let Some(path) = attr_error {
405        return Err(syn::Error::new(
406            path.span(),
407            "`error` is not valid on `#[derive(EventSourced)]` — projections do not have an associated error type",
408        ));
409    }
410    if let Some(path) = attr_input {
411        if input_path.is_some() {
412            return Err(syn::Error::new(
413                path.span(),
414                "duplicate `input =` across `#[batpak(...)]` config attributes — `input` must appear exactly once",
415            ));
416        }
417        *input_path = Some(path);
418    }
419    if let Some(lit) = attr_cache {
420        if cache_version_lit.is_some() {
421            return Err(syn::Error::new(
422                lit.span(),
423                "duplicate `cache_version =` across `#[batpak(...)]` config attributes",
424            ));
425        }
426        *cache_version_lit = Some(lit);
427    }
428    if let Some(lit) = attr_state_max {
429        if state_max_cardinality_lit.is_some() {
430            return Err(syn::Error::new(
431                lit.span(),
432                "duplicate `state_max_cardinality =` across `#[batpak(...)]` config attributes",
433            ));
434        }
435        *state_max_cardinality_lit = Some(lit);
436    }
437    Ok(())
438}
439
440fn collect_unique_event_binding(
441    bindings: &mut Vec<EventBinding>,
442    seen_events: &mut HashSet<String>,
443    binding: EventBinding,
444    owner: &str,
445) -> syn::Result<()> {
446    require_single_segment_event_path(&binding.event)?;
447    let key = binding.event.to_token_stream_string();
448    if !seen_events.insert(key) {
449        return Err(syn::Error::new(
450            binding.event.span(),
451            format!(
452                "duplicate `event = X` — each payload type may be bound to exactly one handler per {owner}"
453            ),
454        ));
455    }
456    bindings.push(binding);
457    Ok(())
458}
459
460fn expand_event_sourced(input: &DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
461    ensure_named_field_struct(input, "EventSourced")?;
462
463    let attrs = collect_event_sourced_attrs(input)?;
464    let input_path = attrs.input_path;
465    let cache_version_lit = attrs.cache_version_lit;
466    let state_max_cardinality_lit = attrs.state_max_cardinality_lit;
467    let bindings = attrs.bindings;
468
469    // ─── Validate cache_version fits u64 ────────────────────────────────────
470    let cache_version_value: u64 = match &cache_version_lit {
471        Some(lit) => lit.base10_parse::<u64>()?,
472        None => 0u64,
473    };
474
475    // ─── Codegen ─────────────────────────────────────────────────────────────
476    let ident = &input.ident;
477    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
478
479    let state_contract_impl = match &state_max_cardinality_lit {
480        Some(lit) => {
481            let state_max_cardinality_value = lit.base10_parse::<u64>()?;
482            // The derived `state_extent` reports a fixed cardinality of 1, so a
483            // declared bound above 1 would be vacuous (always 1 <= n). Reject it:
484            // multi-key projections must hand-implement `EventSourced` with a real
485            // `state_extent()` so the P2 bound check stays meaningful.
486            if state_max_cardinality_value != 1 {
487                return Err(syn::Error::new_spanned(
488                    lit,
489                    "#[derive(EventSourced)] supports only single-aggregate state (n = 1); \
490                     implement EventSourced by hand with a real `state_extent()` for multi-key state",
491                ));
492            }
493            quote! {
494                const STATE_CONTRACT: ::batpak::event::ProjectionStateContract =
495                    ::batpak::event::ProjectionStateContract::Bounded {
496                        key_space: ::core::concat!(
497                            ::core::module_path!(),
498                            "::",
499                            ::core::stringify!(#ident)
500                        ),
501                        max_cardinality: #state_max_cardinality_value,
502                        retention_policy: "derive-event-sourced-state-object",
503                        compaction_policy: "projection-cache-overwrite",
504                        checkpoint_policy: "projection-cache",
505                    };
506
507                fn state_extent(&self) -> ::batpak::event::StateExtent {
508                    let _ = self;
509                    ::batpak::event::StateExtent::cardinality(
510                        1,
511                        ::batpak::event::StateExtentCost::ConstantTime,
512                    )
513                }
514            }
515        }
516        None => quote! {},
517    };
518
519    // Build apply_event dispatch arms — one per event-binding. Handlers
520    // take `&T` so users can read fields without being forced to consume
521    // the payload (the clippy::needless_pass_by_value would otherwise fire
522    // on every handler that does not move-use its argument).
523    let arms: Vec<proc_macro2::TokenStream> = bindings
524        .iter()
525        .map(|b| {
526            let event_ty = &b.event;
527            let handler_fn = &b.handler;
528            quote! {
529                // route_typed → Ok(Some(p)): matched kind, decode ok → call handler
530                // route_typed → Ok(None):    wrong kind (normal filter) → fall through
531                // route_typed → Err(e):      matched kind but decode failed → hard
532                //                             correctness signal, panic with source
533                match ::batpak::event::DecodeTyped::route_typed::<#event_ty>(event) {
534                    ::core::result::Result::Ok(::core::option::Option::Some(__p)) => {
535                        self.#handler_fn(&__p);
536                        return;
537                    }
538                    ::core::result::Result::Ok(::core::option::Option::None) => {}
539                    ::core::result::Result::Err(__e) => {
540                        ::core::panic!(
541                            "EventSourced: decode failed for matched kind {}: {}",
542                            ::core::stringify!(#event_ty),
543                            __e
544                        );
545                    }
546                }
547            }
548        })
549        .collect();
550
551    // relevant_event_kinds: compile-time const array from the event= list.
552    let kind_exprs: Vec<proc_macro2::TokenStream> = bindings
553        .iter()
554        .map(|b| {
555            let event_ty = &b.event;
556            quote! {
557                <#event_ty as ::batpak::event::EventPayload>::KIND
558            }
559        })
560        .collect();
561    let kind_count = bindings.len();
562
563    // Handler-signature pins live inside a generic impl so they can reference
564    // `Self`-with-type-params. Module-scope `const _: fn(...)` items can't
565    // reintroduce generics; this pattern does. When the user's
566    // `fn on_x(&mut self, &T)` has the wrong parameter types, rustc spans the
567    // error at the generated fn-pointer coercion rather than inside an opaque
568    // dispatch arm.
569    let handler_checks: Vec<proc_macro2::TokenStream> = bindings
570        .iter()
571        .map(|b| {
572            let event_ty = &b.event;
573            let handler_fn = &b.handler;
574            quote! {
575                let _: fn(&mut Self, &#event_ty) = Self::#handler_fn;
576            }
577        })
578        .collect();
579
580    // C5: pin the `input = T` attribute's type to `ProjectionInput` at
581    // derive-expansion site. A non-`ProjectionInput` `input` errors here with
582    // the attribute's path visible in the trace, rather than bubbling up from
583    // inside generated trait-impl machinery.
584    let input_assertion = {
585        quote! {
586            const _: fn() = || {
587                fn __batpak_assert_projection_input<T: ::batpak::event::ProjectionInput>() {}
588                __batpak_assert_projection_input::<#input_path>();
589            };
590        }
591    };
592
593    Ok(quote! {
594        #input_assertion
595
596        impl #impl_generics ::batpak::event::EventSourced for #ident #ty_generics #where_clause {
597            type Input = #input_path;
598
599            #state_contract_impl
600
601            fn from_events(
602                events: &[::batpak::event::ProjectionEvent<Self>],
603            ) -> ::core::option::Option<Self> {
604                if events.is_empty() {
605                    return ::core::option::Option::None;
606                }
607                let mut state: Self = ::core::default::Default::default();
608                for __ev in events {
609                    state.apply_event(__ev);
610                }
611                ::core::option::Option::Some(state)
612            }
613
614            fn apply_event(&mut self, event: &::batpak::event::ProjectionEvent<Self>) {
615                #(#handler_checks)*
616                // Each arm keeps wrong-kind filtering (Ok(None)) separate from
617                // matched-kind decode failure (Err). A fall-through past all
618                // arms means "kind outside relevant_event_kinds()" — normal
619                // skip, not an error.
620                #(#arms)*
621                // Fall-through: unrelated kind. No-op.
622                let _ = event;
623            }
624
625            fn relevant_event_kinds() -> &'static [::batpak::event::EventKind] {
626                static KINDS: [::batpak::event::EventKind; #kind_count] = [
627                    #(#kind_exprs),*
628                ];
629                &KINDS
630            }
631
632            fn schema_version() -> u64 {
633                // `cache_version` is the projection-cache invalidation key.
634                // Unrelated to payload wire `type_id` — they live in different
635                // layers (ADR-0010 vs this derive).
636                #cache_version_value
637            }
638        }
639    })
640}
641
642trait ToTokenStreamString {
643    fn to_token_stream_string(&self) -> String;
644}
645
646impl ToTokenStreamString for Path {
647    fn to_token_stream_string(&self) -> String {
648        quote!(#self).to_string()
649    }
650}
651
652/// Enforce that an `event = <Path>` attribute value is a single-segment,
653/// unqualified type name (no `crate::`, no `my_mod::`, no leading `::`).
654///
655/// The derive deduplicates event bindings by stringifying the path — if
656/// multi-segment paths were allowed, `Foo` and `crate::Foo` could alias the
657/// same type but compare unequal, producing undetected duplicates. Requiring a
658/// single-segment name lets stringified comparison act as a semantic compare
659/// without running full path resolution. Users who need a type from another
660/// module bring it into scope with `use`.
661fn require_single_segment_event_path(path: &Path) -> syn::Result<()> {
662    if path.leading_colon.is_some() || path.segments.len() != 1 {
663        return Err(syn::Error::new_spanned(
664            path,
665            "event type must be named by its in-scope single-segment name — use a `use` import if the type is in another module",
666        ));
667    }
668    Ok(())
669}
670
671// ─── MultiEventReactor derive expansion ──────────────────────────────────────
672
673fn expand_multi_event_reactor(input: &DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
674    ensure_named_field_struct(input, "MultiEventReactor")?;
675
676    let batpak_attrs: Vec<&Attribute> = input
677        .attrs
678        .iter()
679        .filter(|a| a.path().is_ident("batpak"))
680        .collect();
681
682    if batpak_attrs.is_empty() {
683        return Err(syn::Error::new(
684            input.ident.span(),
685            "#[derive(MultiEventReactor)] requires `#[batpak(input = <Lane>)]` plus at least one `#[batpak(event = <Payload>, handler = <fn>)]` attribute",
686        ));
687    }
688
689    let mut input_path: Option<Path> = None;
690    let mut error_path: Option<Path> = None;
691    let mut bindings: Vec<EventBinding> = Vec::new();
692    let mut seen_events: HashSet<String> = HashSet::new();
693
694    for attr in &batpak_attrs {
695        match classify_batpak_attr(attr)? {
696            BatpakAttrKind::Config {
697                input: attr_input,
698                cache_version,
699                state_max_cardinality,
700                error: attr_error,
701            } => {
702                if let Some(lit) = cache_version {
703                    return Err(syn::Error::new(
704                        lit.span(),
705                        "`cache_version` is not valid on `#[derive(MultiEventReactor)]` — \
706                         `cache_version` is a projection-cache key, not a reactor setting",
707                    ));
708                }
709                if let Some(lit) = state_max_cardinality {
710                    return Err(syn::Error::new(
711                        lit.span(),
712                        "`state_max_cardinality` is not valid on `#[derive(MultiEventReactor)]` — \
713                         state cardinality is a projection contract, not a reactor setting",
714                    ));
715                }
716                if let Some(path) = attr_input {
717                    if input_path.is_some() {
718                        return Err(syn::Error::new(
719                            path.span(),
720                            "duplicate `input =` across `#[batpak(...)]` config attributes — `input` must appear exactly once",
721                        ));
722                    }
723                    input_path = Some(path);
724                }
725                if let Some(path) = attr_error {
726                    if error_path.is_some() {
727                        return Err(syn::Error::new(
728                            path.span(),
729                            "duplicate `error =` across `#[batpak(...)]` config attributes — `error` must appear exactly once",
730                        ));
731                    }
732                    error_path = Some(path);
733                }
734            }
735            BatpakAttrKind::Event(binding) => {
736                collect_unique_event_binding(&mut bindings, &mut seen_events, binding, "reactor")?;
737            }
738        }
739    }
740
741    let input_path = input_path.ok_or_else(|| {
742        syn::Error::new(
743            input.ident.span(),
744            "#[derive(MultiEventReactor)] requires `#[batpak(input = <Lane>)]` — e.g. `input = JsonValueInput` or `input = RawMsgpackInput`",
745        )
746    })?;
747    let error_path = error_path.ok_or_else(|| {
748        syn::Error::new(
749            input.ident.span(),
750            "#[derive(MultiEventReactor)] requires `#[batpak(error = <ErrorType>)]` — the shared error type all handlers return",
751        )
752    })?;
753
754    if bindings.is_empty() {
755        return Err(syn::Error::new(
756            input.ident.span(),
757            "#[derive(MultiEventReactor)] requires at least one `#[batpak(event = <Payload>, handler = <fn>)]`",
758        ));
759    }
760
761    let ident = &input.ident;
762    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
763
764    let kind_exprs: Vec<proc_macro2::TokenStream> = bindings
765        .iter()
766        .map(|b| {
767            let event_ty = &b.event;
768            quote! {
769                <#event_ty as ::batpak::event::EventPayload>::KIND
770            }
771        })
772        .collect();
773    let kind_count = bindings.len();
774
775    // Generate dispatch arms. Each arm uses DecodeTyped::route_typed on
776    // the inner Event to decode matched kinds to the bound type, then
777    // builds &StoredEvent<T> (carrying the source coordinate) for the
778    // handler. Wrong-kind events fall through and return Ok(());
779    // matched-kind decode failure returns MultiDispatchError::Decode.
780    let arms: Vec<proc_macro2::TokenStream> = bindings
781        .iter()
782        .map(|b| {
783            let event_ty = &b.event;
784            let handler_fn = &b.handler;
785            quote! {
786                match ::batpak::event::DecodeTyped::route_typed::<#event_ty>(&event.event) {
787                    ::core::result::Result::Ok(::core::option::Option::Some(__p)) => {
788                        let __typed_event = ::batpak::event::StoredEvent {
789                            coordinate: event.coordinate.clone(),
790                            event: ::batpak::event::Event {
791                                header: event.event.header.clone(),
792                                payload: __p,
793                                hash_chain: event.event.hash_chain.clone(),
794                            },
795                        };
796                        return self
797                            .#handler_fn(&__typed_event, out, at_least_once)
798                            .map_err(::batpak::event::MultiDispatchError::User);
799                    }
800                    ::core::result::Result::Ok(::core::option::Option::None) => {}
801                    ::core::result::Result::Err(__e) => {
802                        return ::core::result::Result::Err(
803                            ::batpak::event::MultiDispatchError::Decode(__e)
804                        );
805                    }
806                }
807            }
808        })
809        .collect();
810
811    // Handler-signature pins live inside a generic impl so they can reference
812    // `Self`-with-type-params. Module-scope `const _: fn(...)` items can't
813    // reintroduce generics; this pattern does. Mismatched handler signatures
814    // surface as span-pointed errors at the user's handler, not inside the
815    // dispatch body.
816    let handler_checks: Vec<proc_macro2::TokenStream> = bindings
817        .iter()
818        .map(|b| {
819            let event_ty = &b.event;
820            let handler_fn = &b.handler;
821            quote! {
822                let _: fn(
823                    &mut Self,
824                    &::batpak::event::StoredEvent<#event_ty>,
825                    &mut ::batpak::store::ReactionBatch,
826                    ::core::option::Option<&::batpak::store::AtLeastOnce>,
827                ) -> ::core::result::Result<(), #error_path> = Self::#handler_fn;
828            }
829        })
830        .collect();
831
832    // C5: pin attribute types at expansion site. A non-`ProjectionInput`
833    // `input` or a non-`std::error::Error + Send + Sync + 'static` `error`
834    // errors here with the attribute's path visible in the trace, rather than
835    // bubbling up from inside generated trait-impl machinery.
836    let attr_assertions = {
837        quote! {
838            const _: fn() = || {
839                fn __batpak_assert_projection_input<T: ::batpak::event::ProjectionInput>() {}
840                __batpak_assert_projection_input::<#input_path>();
841            };
842            const _: fn() = || {
843                fn __batpak_assert_error<
844                    T: ::core::marker::Send
845                        + ::core::marker::Sync
846                        + 'static
847                        + ::std::error::Error,
848                >() {}
849                __batpak_assert_error::<#error_path>();
850            };
851        }
852    };
853
854    Ok(quote! {
855        #attr_assertions
856
857        impl #impl_generics ::batpak::event::MultiReactive<#input_path>
858        for #ident #ty_generics #where_clause
859        {
860            type Error = #error_path;
861
862            fn relevant_event_kinds() -> &'static [::batpak::event::EventKind] {
863                static KINDS: [::batpak::event::EventKind; #kind_count] = [
864                    #(#kind_exprs),*
865                ];
866                &KINDS
867            }
868
869            fn dispatch(
870                &mut self,
871                event: &::batpak::event::StoredEvent<
872                    <#input_path as ::batpak::event::ProjectionInput>::Payload,
873                >,
874                out: &mut ::batpak::store::ReactionBatch,
875                at_least_once: ::core::option::Option<&::batpak::store::AtLeastOnce>,
876            ) -> ::core::result::Result<(), ::batpak::event::MultiDispatchError<Self::Error>> {
877                #(#handler_checks)*
878                #(#arms)*
879                // Wrong kind / no binding matched — silent filter.
880                ::core::result::Result::Ok(())
881            }
882        }
883    })
884}