1mod event_payload;
8
9use proc_macro::TokenStream;
10use quote::quote;
11use std::collections::HashSet;
12use syn::{
13 parse_macro_input, spanned::Spanned, Attribute, Data, DeriveInput, Fields, Ident, LitInt, Path,
14};
15
16#[proc_macro_derive(EventPayload, attributes(batpak))]
21pub fn derive_event_payload(input: TokenStream) -> TokenStream {
22 let input = parse_macro_input!(input as DeriveInput);
23 match event_payload::expand(&input) {
24 Ok(ts) => ts.into(),
25 Err(e) => e.to_compile_error().into(),
26 }
27}
28
29#[proc_macro_derive(MultiEventReactor, attributes(batpak))]
48pub fn derive_multi_event_reactor(input: TokenStream) -> TokenStream {
49 let input = parse_macro_input!(input as DeriveInput);
50 match expand_multi_event_reactor(&input) {
51 Ok(ts) => ts.into(),
52 Err(e) => e.to_compile_error().into(),
53 }
54}
55
56#[proc_macro_derive(EventSourced, attributes(batpak))]
101pub fn derive_event_sourced(input: TokenStream) -> TokenStream {
102 let input = parse_macro_input!(input as DeriveInput);
103 match expand_event_sourced(&input) {
104 Ok(ts) => ts.into(),
105 Err(e) => e.to_compile_error().into(),
106 }
107}
108
109struct EventBinding {
114 event: Path,
115 handler: Ident,
116}
117
118enum BatpakAttrKind {
124 Config {
125 input: Option<Path>,
126 cache_version: Option<LitInt>,
127 error: Option<Path>,
128 },
129 Event(EventBinding),
130}
131
132fn classify_batpak_attr(attr: &Attribute) -> syn::Result<BatpakAttrKind> {
133 let mut input: Option<Path> = None;
135 let mut cache_version: Option<LitInt> = None;
136 let mut error_ty: Option<Path> = None;
137 let mut event: Option<Path> = None;
138 let mut handler: Option<Ident> = None;
139
140 attr.parse_nested_meta(|meta| {
141 let key = meta.path.get_ident().ok_or_else(|| {
142 meta.error("expected `input`, `cache_version`, `error`, `event`, or `handler`")
143 })?;
144 match key.to_string().as_str() {
145 "input" => {
146 if input.is_some() {
147 return Err(meta.error("duplicate `input` key within attribute"));
148 }
149 input = Some(meta.value()?.parse::<Path>()?);
150 }
151 "cache_version" => {
152 if cache_version.is_some() {
153 return Err(meta.error("duplicate `cache_version` key within attribute"));
154 }
155 cache_version = Some(meta.value()?.parse::<LitInt>()?);
156 }
157 "error" => {
158 if error_ty.is_some() {
159 return Err(meta.error("duplicate `error` key within attribute"));
160 }
161 error_ty = Some(meta.value()?.parse::<Path>()?);
162 }
163 "event" => {
164 if event.is_some() {
165 return Err(meta.error("duplicate `event` key within attribute"));
166 }
167 event = Some(meta.value()?.parse::<Path>()?);
168 }
169 "handler" => {
170 if handler.is_some() {
171 return Err(meta.error("duplicate `handler` key within attribute"));
172 }
173 handler = Some(meta.value()?.parse::<Ident>()?);
174 }
175 other => {
176 return Err(meta.error(format!(
177 "unknown key `{other}`, expected `input`, `cache_version`, `error`, `event`, or `handler`"
178 )));
179 }
180 }
181 Ok(())
182 })?;
183
184 let has_config = input.is_some() || cache_version.is_some() || error_ty.is_some();
185 let has_event = event.is_some() || handler.is_some();
186
187 if has_config && has_event {
188 return Err(syn::Error::new(
189 attr.span(),
190 "`#[batpak(...)]` attribute must contain either config keys \
191 (`input`, `cache_version`, `error`) or an event-binding pair (`event`, `handler`), not both",
192 ));
193 }
194
195 if has_event {
196 let event = event.ok_or_else(|| {
197 syn::Error::new(
198 attr.span(),
199 "event-binding attribute is missing `event = <PayloadType>`",
200 )
201 })?;
202 let handler = handler.ok_or_else(|| {
203 syn::Error::new(
204 attr.span(),
205 "event-binding attribute is missing `handler = <fn_name>`",
206 )
207 })?;
208 return Ok(BatpakAttrKind::Event(EventBinding { event, handler }));
209 }
210
211 if !has_config {
213 return Err(syn::Error::new(
214 attr.span(),
215 "`#[batpak(...)]` must contain at least one key: `input`, `cache_version`, `error`, or the `event`/`handler` pair",
216 ));
217 }
218 Ok(BatpakAttrKind::Config {
219 input,
220 cache_version,
221 error: error_ty,
222 })
223}
224
225fn expand_event_sourced(input: &DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
226 match &input.data {
228 Data::Struct(s) => match &s.fields {
229 Fields::Named(_) => {}
230 Fields::Unnamed(f) => {
231 return Err(syn::Error::new(
232 f.span(),
233 "#[derive(EventSourced)] requires a named-field struct; tuple structs are not supported",
234 ));
235 }
236 Fields::Unit => {
237 return Err(syn::Error::new(
238 input.ident.span(),
239 "#[derive(EventSourced)] requires a named-field struct; unit structs are not supported",
240 ));
241 }
242 },
243 Data::Enum(e) => {
244 return Err(syn::Error::new(
245 e.enum_token.span,
246 "#[derive(EventSourced)] requires a named-field struct; enums are not supported",
247 ));
248 }
249 Data::Union(u) => {
250 return Err(syn::Error::new(
251 u.union_token.span,
252 "#[derive(EventSourced)] requires a named-field struct; unions are not supported",
253 ));
254 }
255 }
256
257 let batpak_attrs: Vec<&Attribute> = input
259 .attrs
260 .iter()
261 .filter(|a| a.path().is_ident("batpak"))
262 .collect();
263
264 if batpak_attrs.is_empty() {
265 return Err(syn::Error::new(
266 input.ident.span(),
267 "#[derive(EventSourced)] requires at least one `#[batpak(input = <Lane>)]` attribute",
268 ));
269 }
270
271 let mut input_path: Option<Path> = None;
272 let mut cache_version_lit: Option<LitInt> = None;
273 let mut bindings: Vec<EventBinding> = Vec::new();
274 let mut seen_events: HashSet<String> = HashSet::new();
275
276 for attr in &batpak_attrs {
277 match classify_batpak_attr(attr)? {
278 BatpakAttrKind::Config {
279 input: attr_input,
280 cache_version: attr_cache,
281 error: attr_error,
282 } => {
283 if let Some(path) = attr_error {
284 return Err(syn::Error::new(
285 path.span(),
286 "`error` is not valid on `#[derive(EventSourced)]` — projections do not have an associated error type",
287 ));
288 }
289 if let Some(path) = attr_input {
290 if input_path.is_some() {
291 return Err(syn::Error::new(
292 path.span(),
293 "duplicate `input =` across `#[batpak(...)]` config attributes — `input` must appear exactly once",
294 ));
295 }
296 input_path = Some(path);
297 }
298 if let Some(lit) = attr_cache {
299 if cache_version_lit.is_some() {
300 return Err(syn::Error::new(
301 lit.span(),
302 "duplicate `cache_version =` across `#[batpak(...)]` config attributes",
303 ));
304 }
305 cache_version_lit = Some(lit);
306 }
307 }
308 BatpakAttrKind::Event(binding) => {
309 require_single_segment_event_path(&binding.event)?;
310 let key = binding.event.to_token_stream_string();
311 if !seen_events.insert(key) {
312 return Err(syn::Error::new(
313 binding.event.span(),
314 "duplicate `event = X` — each payload type may be bound to exactly one handler per projection",
315 ));
316 }
317 bindings.push(binding);
318 }
319 }
320 }
321
322 let input_path = input_path.ok_or_else(|| {
323 syn::Error::new(
324 input.ident.span(),
325 "#[derive(EventSourced)] requires `#[batpak(input = <Lane>)]` — e.g. `input = JsonValueInput` or `input = RawMsgpackInput`",
326 )
327 })?;
328
329 if bindings.is_empty() {
330 return Err(syn::Error::new(
331 input.ident.span(),
332 "`#[derive(EventSourced)]` requires at least one `#[batpak(event = T, handler = h)]` binding",
333 ));
334 }
335
336 let cache_version_value: u64 = match &cache_version_lit {
338 Some(lit) => lit.base10_parse::<u64>()?,
339 None => 0u64,
340 };
341
342 let ident = &input.ident;
344 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
345
346 let arms: Vec<proc_macro2::TokenStream> = bindings
351 .iter()
352 .map(|b| {
353 let event_ty = &b.event;
354 let handler_fn = &b.handler;
355 quote! {
356 match ::batpak::event::DecodeTyped::route_typed::<#event_ty>(event) {
361 ::core::result::Result::Ok(::core::option::Option::Some(__p)) => {
362 self.#handler_fn(&__p);
363 return;
364 }
365 ::core::result::Result::Ok(::core::option::Option::None) => {}
366 ::core::result::Result::Err(__e) => {
367 ::core::panic!(
368 "EventSourced: decode failed for matched kind {}: {}",
369 ::core::stringify!(#event_ty),
370 __e
371 );
372 }
373 }
374 }
375 })
376 .collect();
377
378 let kind_exprs: Vec<proc_macro2::TokenStream> = bindings
380 .iter()
381 .map(|b| {
382 let event_ty = &b.event;
383 quote! {
384 <#event_ty as ::batpak::event::EventPayload>::KIND
385 }
386 })
387 .collect();
388 let kind_count = bindings.len();
389
390 let handler_checks: Vec<proc_macro2::TokenStream> = bindings
397 .iter()
398 .map(|b| {
399 let event_ty = &b.event;
400 let handler_fn = &b.handler;
401 quote! {
402 let _: fn(&mut Self, &#event_ty) = Self::#handler_fn;
403 }
404 })
405 .collect();
406
407 let input_assertion = {
412 quote! {
413 const _: fn() = || {
414 fn __batpak_assert_projection_input<T: ::batpak::event::ProjectionInput>() {}
415 __batpak_assert_projection_input::<#input_path>();
416 };
417 }
418 };
419
420 Ok(quote! {
421 #input_assertion
422
423 impl #impl_generics ::batpak::event::EventSourced for #ident #ty_generics #where_clause {
424 type Input = #input_path;
425
426 fn from_events(
427 events: &[::batpak::event::ProjectionEvent<Self>],
428 ) -> ::core::option::Option<Self> {
429 if events.is_empty() {
430 return ::core::option::Option::None;
431 }
432 let mut state: Self = ::core::default::Default::default();
433 for __ev in events {
434 state.apply_event(__ev);
435 }
436 ::core::option::Option::Some(state)
437 }
438
439 fn apply_event(&mut self, event: &::batpak::event::ProjectionEvent<Self>) {
440 #(#handler_checks)*
441 #(#arms)*
446 let _ = event;
448 }
449
450 fn relevant_event_kinds() -> &'static [::batpak::event::EventKind] {
451 static KINDS: [::batpak::event::EventKind; #kind_count] = [
452 #(#kind_exprs),*
453 ];
454 &KINDS
455 }
456
457 fn schema_version() -> u64 {
458 #cache_version_value
462 }
463 }
464 })
465}
466
/// Renders a syntax node as the string form of its token stream.
///
/// The derive expanders use the result as a set key when looking for
/// duplicate `event = X` bindings.
trait ToTokenStreamString {
    /// Returns the node's tokens rendered as a `String`.
    fn to_token_stream_string(&self) -> String;
}
470
471impl ToTokenStreamString for Path {
472 fn to_token_stream_string(&self) -> String {
473 quote!(#self).to_string()
474 }
475}
476
477fn require_single_segment_event_path(path: &Path) -> syn::Result<()> {
487 if path.leading_colon.is_some() || path.segments.len() != 1 {
488 return Err(syn::Error::new_spanned(
489 path,
490 "event type must be named by its in-scope single-segment name — use a `use` import if the type is in another module",
491 ));
492 }
493 Ok(())
494}
495
496fn expand_multi_event_reactor(input: &DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
499 match &input.data {
501 Data::Struct(s) => match &s.fields {
502 Fields::Named(_) => {}
503 Fields::Unnamed(f) => {
504 return Err(syn::Error::new(
505 f.span(),
506 "#[derive(MultiEventReactor)] requires a named-field struct; tuple structs are not supported",
507 ));
508 }
509 Fields::Unit => {
510 return Err(syn::Error::new(
511 input.ident.span(),
512 "#[derive(MultiEventReactor)] requires a named-field struct; unit structs are not supported",
513 ));
514 }
515 },
516 Data::Enum(e) => {
517 return Err(syn::Error::new(
518 e.enum_token.span,
519 "#[derive(MultiEventReactor)] requires a named-field struct; enums are not supported",
520 ));
521 }
522 Data::Union(u) => {
523 return Err(syn::Error::new(
524 u.union_token.span,
525 "#[derive(MultiEventReactor)] requires a named-field struct; unions are not supported",
526 ));
527 }
528 }
529
530 let batpak_attrs: Vec<&Attribute> = input
531 .attrs
532 .iter()
533 .filter(|a| a.path().is_ident("batpak"))
534 .collect();
535
536 if batpak_attrs.is_empty() {
537 return Err(syn::Error::new(
538 input.ident.span(),
539 "#[derive(MultiEventReactor)] requires `#[batpak(input = <Lane>)]` plus at least one `#[batpak(event = <Payload>, handler = <fn>)]` attribute",
540 ));
541 }
542
543 let mut input_path: Option<Path> = None;
544 let mut error_path: Option<Path> = None;
545 let mut bindings: Vec<EventBinding> = Vec::new();
546 let mut seen_events: HashSet<String> = HashSet::new();
547
548 for attr in &batpak_attrs {
549 match classify_batpak_attr(attr)? {
550 BatpakAttrKind::Config {
551 input: attr_input,
552 cache_version,
553 error: attr_error,
554 } => {
555 if let Some(lit) = cache_version {
556 return Err(syn::Error::new(
557 lit.span(),
558 "`cache_version` is not valid on `#[derive(MultiEventReactor)]` — \
559 `cache_version` is a projection-cache key, not a reactor setting",
560 ));
561 }
562 if let Some(path) = attr_input {
563 if input_path.is_some() {
564 return Err(syn::Error::new(
565 path.span(),
566 "duplicate `input =` across `#[batpak(...)]` config attributes — `input` must appear exactly once",
567 ));
568 }
569 input_path = Some(path);
570 }
571 if let Some(path) = attr_error {
572 if error_path.is_some() {
573 return Err(syn::Error::new(
574 path.span(),
575 "duplicate `error =` across `#[batpak(...)]` config attributes — `error` must appear exactly once",
576 ));
577 }
578 error_path = Some(path);
579 }
580 }
581 BatpakAttrKind::Event(binding) => {
582 require_single_segment_event_path(&binding.event)?;
583 let key = binding.event.to_token_stream_string();
584 if !seen_events.insert(key) {
585 return Err(syn::Error::new(
586 binding.event.span(),
587 "duplicate `event = X` — each payload type may be bound to exactly one handler per reactor",
588 ));
589 }
590 bindings.push(binding);
591 }
592 }
593 }
594
595 let input_path = input_path.ok_or_else(|| {
596 syn::Error::new(
597 input.ident.span(),
598 "#[derive(MultiEventReactor)] requires `#[batpak(input = <Lane>)]` — e.g. `input = JsonValueInput` or `input = RawMsgpackInput`",
599 )
600 })?;
601 let error_path = error_path.ok_or_else(|| {
602 syn::Error::new(
603 input.ident.span(),
604 "#[derive(MultiEventReactor)] requires `#[batpak(error = <ErrorType>)]` — the shared error type all handlers return",
605 )
606 })?;
607
608 if bindings.is_empty() {
609 return Err(syn::Error::new(
610 input.ident.span(),
611 "#[derive(MultiEventReactor)] requires at least one `#[batpak(event = <Payload>, handler = <fn>)]`",
612 ));
613 }
614
615 let ident = &input.ident;
616 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
617
618 let kind_exprs: Vec<proc_macro2::TokenStream> = bindings
619 .iter()
620 .map(|b| {
621 let event_ty = &b.event;
622 quote! {
623 <#event_ty as ::batpak::event::EventPayload>::KIND
624 }
625 })
626 .collect();
627 let kind_count = bindings.len();
628
629 let arms: Vec<proc_macro2::TokenStream> = bindings
635 .iter()
636 .map(|b| {
637 let event_ty = &b.event;
638 let handler_fn = &b.handler;
639 quote! {
640 match ::batpak::event::DecodeTyped::route_typed::<#event_ty>(&event.event) {
641 ::core::result::Result::Ok(::core::option::Option::Some(__p)) => {
642 let __typed_event = ::batpak::event::StoredEvent {
643 coordinate: event.coordinate.clone(),
644 event: ::batpak::event::Event {
645 header: event.event.header.clone(),
646 payload: __p,
647 hash_chain: event.event.hash_chain.clone(),
648 },
649 };
650 return self
651 .#handler_fn(&__typed_event, out, at_least_once)
652 .map_err(::batpak::event::MultiDispatchError::User);
653 }
654 ::core::result::Result::Ok(::core::option::Option::None) => {}
655 ::core::result::Result::Err(__e) => {
656 return ::core::result::Result::Err(
657 ::batpak::event::MultiDispatchError::Decode(__e)
658 );
659 }
660 }
661 }
662 })
663 .collect();
664
665 let handler_checks: Vec<proc_macro2::TokenStream> = bindings
671 .iter()
672 .map(|b| {
673 let event_ty = &b.event;
674 let handler_fn = &b.handler;
675 quote! {
676 let _: fn(
677 &mut Self,
678 &::batpak::event::StoredEvent<#event_ty>,
679 &mut ::batpak::store::ReactionBatch,
680 ::core::option::Option<&::batpak::store::AtLeastOnce>,
681 ) -> ::core::result::Result<(), #error_path> = Self::#handler_fn;
682 }
683 })
684 .collect();
685
686 let attr_assertions = {
691 quote! {
692 const _: fn() = || {
693 fn __batpak_assert_projection_input<T: ::batpak::event::ProjectionInput>() {}
694 __batpak_assert_projection_input::<#input_path>();
695 };
696 const _: fn() = || {
697 fn __batpak_assert_error<
698 T: ::core::marker::Send
699 + ::core::marker::Sync
700 + 'static
701 + ::std::error::Error,
702 >() {}
703 __batpak_assert_error::<#error_path>();
704 };
705 }
706 };
707
708 Ok(quote! {
709 #attr_assertions
710
711 impl #impl_generics ::batpak::event::MultiReactive<#input_path>
712 for #ident #ty_generics #where_clause
713 {
714 type Error = #error_path;
715
716 fn relevant_event_kinds() -> &'static [::batpak::event::EventKind] {
717 static KINDS: [::batpak::event::EventKind; #kind_count] = [
718 #(#kind_exprs),*
719 ];
720 &KINDS
721 }
722
723 fn dispatch(
724 &mut self,
725 event: &::batpak::event::StoredEvent<
726 <#input_path as ::batpak::event::ProjectionInput>::Payload,
727 >,
728 out: &mut ::batpak::store::ReactionBatch,
729 at_least_once: ::core::option::Option<&::batpak::store::AtLeastOnce>,
730 ) -> ::core::result::Result<(), ::batpak::event::MultiDispatchError<Self::Error>> {
731 #(#handler_checks)*
732 #(#arms)*
733 ::core::result::Result::Ok(())
735 }
736 }
737 })
738}