1mod event_payload;
8mod operation;
9
10use proc_macro::TokenStream;
11use quote::quote;
12use std::collections::HashSet;
13use syn::{
14 parse_macro_input, spanned::Spanned, Attribute, Data, DeriveInput, Fields, Ident, LitInt, Path,
15};
16
17#[proc_macro_derive(EventPayload, attributes(batpak))]
22pub fn derive_event_payload(input: TokenStream) -> TokenStream {
23 let input = parse_macro_input!(input as DeriveInput);
24 match event_payload::expand(&input) {
25 Ok(ts) => ts.into(),
26 Err(e) => e.to_compile_error().into(),
27 }
28}
29
30#[proc_macro_attribute]
35pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream {
36 let args = parse_macro_input!(attr as operation::OperationArgs);
37 let function = parse_macro_input!(item as syn::ItemFn);
38 match operation::expand_operation(args, &function) {
39 Ok(tokens) => tokens.into(),
40 Err(error) => error.to_compile_error().into(),
41 }
42}
43
44#[proc_macro_derive(MultiEventReactor, attributes(batpak))]
63pub fn derive_multi_event_reactor(input: TokenStream) -> TokenStream {
64 let input = parse_macro_input!(input as DeriveInput);
65 match expand_multi_event_reactor(&input) {
66 Ok(ts) => ts.into(),
67 Err(e) => e.to_compile_error().into(),
68 }
69}
70
71#[proc_macro_derive(EventSourced, attributes(batpak))]
116pub fn derive_event_sourced(input: TokenStream) -> TokenStream {
117 let input = parse_macro_input!(input as DeriveInput);
118 match expand_event_sourced(&input) {
119 Ok(ts) => ts.into(),
120 Err(e) => e.to_compile_error().into(),
121 }
122}
123
124struct EventBinding {
129 event: Path,
130 handler: Ident,
131}
132
133enum BatpakAttrKind {
139 Config {
140 input: Option<Path>,
141 cache_version: Option<LitInt>,
142 state_max_cardinality: Option<LitInt>,
143 error: Option<Path>,
144 },
145 Event(EventBinding),
146}
147
148#[derive(Default)]
149struct BatpakAttrParts {
150 input: Option<Path>,
151 cache_version: Option<LitInt>,
152 state_max_cardinality: Option<LitInt>,
153 error_ty: Option<Path>,
154 event: Option<Path>,
155 handler: Option<Ident>,
156}
157
158impl BatpakAttrParts {
159 fn set_nested(&mut self, meta: &syn::meta::ParseNestedMeta<'_>) -> syn::Result<()> {
160 let key = meta.path.get_ident().ok_or_else(|| {
161 meta.error("expected `input`, `cache_version`, `state_max_cardinality`, `error`, `event`, or `handler`")
162 })?;
163 let key_name = key.to_string();
164 if self.set_config_nested(key_name.as_str(), meta)? {
165 return Ok(());
166 }
167 if self.set_event_nested(key_name.as_str(), meta)? {
168 return Ok(());
169 }
170 Err(meta.error(format!(
171 "unknown key `{key_name}`, expected `input`, `cache_version`, `state_max_cardinality`, `error`, `event`, or `handler`"
172 )))
173 }
174
175 fn set_config_nested(
176 &mut self,
177 key: &str,
178 meta: &syn::meta::ParseNestedMeta<'_>,
179 ) -> syn::Result<bool> {
180 match key {
181 "input" => {
182 if self.input.is_some() {
183 return Err(meta.error("duplicate `input` key within attribute"));
184 }
185 self.input = Some(meta.value()?.parse::<Path>()?);
186 }
187 "cache_version" => {
188 if self.cache_version.is_some() {
189 return Err(meta.error("duplicate `cache_version` key within attribute"));
190 }
191 self.cache_version = Some(meta.value()?.parse::<LitInt>()?);
192 }
193 "state_max_cardinality" => {
194 if self.state_max_cardinality.is_some() {
195 return Err(
196 meta.error("duplicate `state_max_cardinality` key within attribute")
197 );
198 }
199 self.state_max_cardinality = Some(meta.value()?.parse::<LitInt>()?);
200 }
201 "error" => {
202 if self.error_ty.is_some() {
203 return Err(meta.error("duplicate `error` key within attribute"));
204 }
205 self.error_ty = Some(meta.value()?.parse::<Path>()?);
206 }
207 _ => return Ok(false),
208 }
209 Ok(true)
210 }
211
212 fn set_event_nested(
213 &mut self,
214 key: &str,
215 meta: &syn::meta::ParseNestedMeta<'_>,
216 ) -> syn::Result<bool> {
217 match key {
218 "event" => {
219 if self.event.is_some() {
220 return Err(meta.error("duplicate `event` key within attribute"));
221 }
222 self.event = Some(meta.value()?.parse::<Path>()?);
223 }
224 "handler" => {
225 if self.handler.is_some() {
226 return Err(meta.error("duplicate `handler` key within attribute"));
227 }
228 self.handler = Some(meta.value()?.parse::<Ident>()?);
229 }
230 _ => return Ok(false),
231 }
232 Ok(true)
233 }
234
235 fn finish(self, attr: &Attribute) -> syn::Result<BatpakAttrKind> {
236 let has_config = self.input.is_some()
237 || self.cache_version.is_some()
238 || self.state_max_cardinality.is_some()
239 || self.error_ty.is_some();
240 let has_event = self.event.is_some() || self.handler.is_some();
241
242 if has_config && has_event {
243 return Err(syn::Error::new(
244 attr.span(),
245 "`#[batpak(...)]` attribute must contain either config keys \
246 (`input`, `cache_version`, `state_max_cardinality`, `error`) or an event-binding pair (`event`, `handler`), not both",
247 ));
248 }
249
250 if has_event {
251 let event = self.event.ok_or_else(|| {
252 syn::Error::new(
253 attr.span(),
254 "event-binding attribute is missing `event = <PayloadType>`",
255 )
256 })?;
257 let handler = self.handler.ok_or_else(|| {
258 syn::Error::new(
259 attr.span(),
260 "event-binding attribute is missing `handler = <fn_name>`",
261 )
262 })?;
263 return Ok(BatpakAttrKind::Event(EventBinding { event, handler }));
264 }
265
266 if !has_config {
267 return Err(syn::Error::new(
268 attr.span(),
269 "`#[batpak(...)]` must contain at least one key: `input`, `cache_version`, `state_max_cardinality`, `error`, or the `event`/`handler` pair",
270 ));
271 }
272 Ok(BatpakAttrKind::Config {
273 input: self.input,
274 cache_version: self.cache_version,
275 state_max_cardinality: self.state_max_cardinality,
276 error: self.error_ty,
277 })
278 }
279}
280
281fn classify_batpak_attr(attr: &Attribute) -> syn::Result<BatpakAttrKind> {
282 let mut parts = BatpakAttrParts::default();
283 attr.parse_nested_meta(|meta| parts.set_nested(&meta))?;
284 parts.finish(attr)
285}
286
287fn ensure_named_field_struct(input: &DeriveInput, derive_name: &str) -> syn::Result<()> {
288 match &input.data {
289 Data::Struct(s) => match &s.fields {
290 Fields::Named(_) => Ok(()),
291 Fields::Unnamed(f) => Err(syn::Error::new(
292 f.span(),
293 format!(
294 "#[derive({derive_name})] requires a named-field struct; tuple structs are not supported"
295 ),
296 )),
297 Fields::Unit => Err(syn::Error::new(
298 input.ident.span(),
299 format!(
300 "#[derive({derive_name})] requires a named-field struct; unit structs are not supported"
301 ),
302 )),
303 },
304 Data::Enum(e) => Err(syn::Error::new(
305 e.enum_token.span,
306 format!("#[derive({derive_name})] requires a named-field struct; enums are not supported"),
307 )),
308 Data::Union(u) => Err(syn::Error::new(
309 u.union_token.span,
310 format!(
311 "#[derive({derive_name})] requires a named-field struct; unions are not supported"
312 ),
313 )),
314 }
315}
316
317struct EventSourcedDeriveAttrs {
318 input_path: Path,
319 cache_version_lit: Option<LitInt>,
320 state_max_cardinality_lit: Option<LitInt>,
321 bindings: Vec<EventBinding>,
322}
323
324fn collect_event_sourced_attrs(input: &DeriveInput) -> syn::Result<EventSourcedDeriveAttrs> {
325 let batpak_attrs: Vec<&Attribute> = input
326 .attrs
327 .iter()
328 .filter(|a| a.path().is_ident("batpak"))
329 .collect();
330
331 if batpak_attrs.is_empty() {
332 return Err(syn::Error::new(
333 input.ident.span(),
334 "#[derive(EventSourced)] requires at least one `#[batpak(input = <Lane>)]` attribute",
335 ));
336 }
337
338 let mut input_path: Option<Path> = None;
339 let mut cache_version_lit: Option<LitInt> = None;
340 let mut state_max_cardinality_lit: Option<LitInt> = None;
341 let mut bindings: Vec<EventBinding> = Vec::new();
342 let mut seen_events: HashSet<String> = HashSet::new();
343
344 for attr in &batpak_attrs {
345 match classify_batpak_attr(attr)? {
346 BatpakAttrKind::Config {
347 input: attr_input,
348 cache_version: attr_cache,
349 state_max_cardinality: attr_state_max,
350 error: attr_error,
351 } => {
352 collect_event_sourced_config(
353 &mut input_path,
354 &mut cache_version_lit,
355 &mut state_max_cardinality_lit,
356 attr_input,
357 attr_cache,
358 attr_state_max,
359 attr_error,
360 )?;
361 }
362 BatpakAttrKind::Event(binding) => {
363 collect_unique_event_binding(
364 &mut bindings,
365 &mut seen_events,
366 binding,
367 "projection",
368 )?;
369 }
370 }
371 }
372
373 let input_path = input_path.ok_or_else(|| {
374 syn::Error::new(
375 input.ident.span(),
376 "#[derive(EventSourced)] requires `#[batpak(input = <Lane>)]` — e.g. `input = JsonValueInput` or `input = RawMsgpackInput`",
377 )
378 })?;
379
380 if bindings.is_empty() {
381 return Err(syn::Error::new(
382 input.ident.span(),
383 "`#[derive(EventSourced)]` requires at least one `#[batpak(event = T, handler = h)]` binding",
384 ));
385 }
386
387 Ok(EventSourcedDeriveAttrs {
388 input_path,
389 cache_version_lit,
390 state_max_cardinality_lit,
391 bindings,
392 })
393}
394
395fn collect_event_sourced_config(
396 input_path: &mut Option<Path>,
397 cache_version_lit: &mut Option<LitInt>,
398 state_max_cardinality_lit: &mut Option<LitInt>,
399 attr_input: Option<Path>,
400 attr_cache: Option<LitInt>,
401 attr_state_max: Option<LitInt>,
402 attr_error: Option<Path>,
403) -> syn::Result<()> {
404 if let Some(path) = attr_error {
405 return Err(syn::Error::new(
406 path.span(),
407 "`error` is not valid on `#[derive(EventSourced)]` — projections do not have an associated error type",
408 ));
409 }
410 if let Some(path) = attr_input {
411 if input_path.is_some() {
412 return Err(syn::Error::new(
413 path.span(),
414 "duplicate `input =` across `#[batpak(...)]` config attributes — `input` must appear exactly once",
415 ));
416 }
417 *input_path = Some(path);
418 }
419 if let Some(lit) = attr_cache {
420 if cache_version_lit.is_some() {
421 return Err(syn::Error::new(
422 lit.span(),
423 "duplicate `cache_version =` across `#[batpak(...)]` config attributes",
424 ));
425 }
426 *cache_version_lit = Some(lit);
427 }
428 if let Some(lit) = attr_state_max {
429 if state_max_cardinality_lit.is_some() {
430 return Err(syn::Error::new(
431 lit.span(),
432 "duplicate `state_max_cardinality =` across `#[batpak(...)]` config attributes",
433 ));
434 }
435 *state_max_cardinality_lit = Some(lit);
436 }
437 Ok(())
438}
439
440fn collect_unique_event_binding(
441 bindings: &mut Vec<EventBinding>,
442 seen_events: &mut HashSet<String>,
443 binding: EventBinding,
444 owner: &str,
445) -> syn::Result<()> {
446 require_single_segment_event_path(&binding.event)?;
447 let key = binding.event.to_token_stream_string();
448 if !seen_events.insert(key) {
449 return Err(syn::Error::new(
450 binding.event.span(),
451 format!(
452 "duplicate `event = X` — each payload type may be bound to exactly one handler per {owner}"
453 ),
454 ));
455 }
456 bindings.push(binding);
457 Ok(())
458}
459
460fn expand_event_sourced(input: &DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
461 ensure_named_field_struct(input, "EventSourced")?;
462
463 let attrs = collect_event_sourced_attrs(input)?;
464 let input_path = attrs.input_path;
465 let cache_version_lit = attrs.cache_version_lit;
466 let state_max_cardinality_lit = attrs.state_max_cardinality_lit;
467 let bindings = attrs.bindings;
468
469 let cache_version_value: u64 = match &cache_version_lit {
471 Some(lit) => lit.base10_parse::<u64>()?,
472 None => 0u64,
473 };
474
475 let ident = &input.ident;
477 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
478
479 let state_contract_impl = match &state_max_cardinality_lit {
480 Some(lit) => {
481 let state_max_cardinality_value = lit.base10_parse::<u64>()?;
482 if state_max_cardinality_value != 1 {
487 return Err(syn::Error::new_spanned(
488 lit,
489 "#[derive(EventSourced)] supports only single-aggregate state (n = 1); \
490 implement EventSourced by hand with a real `state_extent()` for multi-key state",
491 ));
492 }
493 quote! {
494 const STATE_CONTRACT: ::batpak::event::ProjectionStateContract =
495 ::batpak::event::ProjectionStateContract::Bounded {
496 key_space: ::core::concat!(
497 ::core::module_path!(),
498 "::",
499 ::core::stringify!(#ident)
500 ),
501 max_cardinality: #state_max_cardinality_value,
502 retention_policy: "derive-event-sourced-state-object",
503 compaction_policy: "projection-cache-overwrite",
504 checkpoint_policy: "projection-cache",
505 };
506
507 fn state_extent(&self) -> ::batpak::event::StateExtent {
508 let _ = self;
509 ::batpak::event::StateExtent::cardinality(
510 1,
511 ::batpak::event::StateExtentCost::ConstantTime,
512 )
513 }
514 }
515 }
516 None => quote! {},
517 };
518
519 let arms: Vec<proc_macro2::TokenStream> = bindings
524 .iter()
525 .map(|b| {
526 let event_ty = &b.event;
527 let handler_fn = &b.handler;
528 quote! {
529 match ::batpak::event::DecodeTyped::route_typed::<#event_ty>(event) {
534 ::core::result::Result::Ok(::core::option::Option::Some(__p)) => {
535 self.#handler_fn(&__p);
536 return;
537 }
538 ::core::result::Result::Ok(::core::option::Option::None) => {}
539 ::core::result::Result::Err(__e) => {
540 ::core::panic!(
541 "EventSourced: decode failed for matched kind {}: {}",
542 ::core::stringify!(#event_ty),
543 __e
544 );
545 }
546 }
547 }
548 })
549 .collect();
550
551 let kind_exprs: Vec<proc_macro2::TokenStream> = bindings
553 .iter()
554 .map(|b| {
555 let event_ty = &b.event;
556 quote! {
557 <#event_ty as ::batpak::event::EventPayload>::KIND
558 }
559 })
560 .collect();
561 let kind_count = bindings.len();
562
563 let handler_checks: Vec<proc_macro2::TokenStream> = bindings
570 .iter()
571 .map(|b| {
572 let event_ty = &b.event;
573 let handler_fn = &b.handler;
574 quote! {
575 let _: fn(&mut Self, &#event_ty) = Self::#handler_fn;
576 }
577 })
578 .collect();
579
580 let input_assertion = {
585 quote! {
586 const _: fn() = || {
587 fn __batpak_assert_projection_input<T: ::batpak::event::ProjectionInput>() {}
588 __batpak_assert_projection_input::<#input_path>();
589 };
590 }
591 };
592
593 Ok(quote! {
594 #input_assertion
595
596 impl #impl_generics ::batpak::event::EventSourced for #ident #ty_generics #where_clause {
597 type Input = #input_path;
598
599 #state_contract_impl
600
601 fn from_events(
602 events: &[::batpak::event::ProjectionEvent<Self>],
603 ) -> ::core::option::Option<Self> {
604 if events.is_empty() {
605 return ::core::option::Option::None;
606 }
607 let mut state: Self = ::core::default::Default::default();
608 for __ev in events {
609 state.apply_event(__ev);
610 }
611 ::core::option::Option::Some(state)
612 }
613
614 fn apply_event(&mut self, event: &::batpak::event::ProjectionEvent<Self>) {
615 #(#handler_checks)*
616 #(#arms)*
621 let _ = event;
623 }
624
625 fn relevant_event_kinds() -> &'static [::batpak::event::EventKind] {
626 static KINDS: [::batpak::event::EventKind; #kind_count] = [
627 #(#kind_exprs),*
628 ];
629 &KINDS
630 }
631
632 fn schema_version() -> u64 {
633 #cache_version_value
637 }
638 }
639 })
640}
641
642trait ToTokenStreamString {
643 fn to_token_stream_string(&self) -> String;
644}
645
646impl ToTokenStreamString for Path {
647 fn to_token_stream_string(&self) -> String {
648 quote!(#self).to_string()
649 }
650}
651
652fn require_single_segment_event_path(path: &Path) -> syn::Result<()> {
662 if path.leading_colon.is_some() || path.segments.len() != 1 {
663 return Err(syn::Error::new_spanned(
664 path,
665 "event type must be named by its in-scope single-segment name — use a `use` import if the type is in another module",
666 ));
667 }
668 Ok(())
669}
670
671fn expand_multi_event_reactor(input: &DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
674 ensure_named_field_struct(input, "MultiEventReactor")?;
675
676 let batpak_attrs: Vec<&Attribute> = input
677 .attrs
678 .iter()
679 .filter(|a| a.path().is_ident("batpak"))
680 .collect();
681
682 if batpak_attrs.is_empty() {
683 return Err(syn::Error::new(
684 input.ident.span(),
685 "#[derive(MultiEventReactor)] requires `#[batpak(input = <Lane>)]` plus at least one `#[batpak(event = <Payload>, handler = <fn>)]` attribute",
686 ));
687 }
688
689 let mut input_path: Option<Path> = None;
690 let mut error_path: Option<Path> = None;
691 let mut bindings: Vec<EventBinding> = Vec::new();
692 let mut seen_events: HashSet<String> = HashSet::new();
693
694 for attr in &batpak_attrs {
695 match classify_batpak_attr(attr)? {
696 BatpakAttrKind::Config {
697 input: attr_input,
698 cache_version,
699 state_max_cardinality,
700 error: attr_error,
701 } => {
702 if let Some(lit) = cache_version {
703 return Err(syn::Error::new(
704 lit.span(),
705 "`cache_version` is not valid on `#[derive(MultiEventReactor)]` — \
706 `cache_version` is a projection-cache key, not a reactor setting",
707 ));
708 }
709 if let Some(lit) = state_max_cardinality {
710 return Err(syn::Error::new(
711 lit.span(),
712 "`state_max_cardinality` is not valid on `#[derive(MultiEventReactor)]` — \
713 state cardinality is a projection contract, not a reactor setting",
714 ));
715 }
716 if let Some(path) = attr_input {
717 if input_path.is_some() {
718 return Err(syn::Error::new(
719 path.span(),
720 "duplicate `input =` across `#[batpak(...)]` config attributes — `input` must appear exactly once",
721 ));
722 }
723 input_path = Some(path);
724 }
725 if let Some(path) = attr_error {
726 if error_path.is_some() {
727 return Err(syn::Error::new(
728 path.span(),
729 "duplicate `error =` across `#[batpak(...)]` config attributes — `error` must appear exactly once",
730 ));
731 }
732 error_path = Some(path);
733 }
734 }
735 BatpakAttrKind::Event(binding) => {
736 collect_unique_event_binding(&mut bindings, &mut seen_events, binding, "reactor")?;
737 }
738 }
739 }
740
741 let input_path = input_path.ok_or_else(|| {
742 syn::Error::new(
743 input.ident.span(),
744 "#[derive(MultiEventReactor)] requires `#[batpak(input = <Lane>)]` — e.g. `input = JsonValueInput` or `input = RawMsgpackInput`",
745 )
746 })?;
747 let error_path = error_path.ok_or_else(|| {
748 syn::Error::new(
749 input.ident.span(),
750 "#[derive(MultiEventReactor)] requires `#[batpak(error = <ErrorType>)]` — the shared error type all handlers return",
751 )
752 })?;
753
754 if bindings.is_empty() {
755 return Err(syn::Error::new(
756 input.ident.span(),
757 "#[derive(MultiEventReactor)] requires at least one `#[batpak(event = <Payload>, handler = <fn>)]`",
758 ));
759 }
760
761 let ident = &input.ident;
762 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
763
764 let kind_exprs: Vec<proc_macro2::TokenStream> = bindings
765 .iter()
766 .map(|b| {
767 let event_ty = &b.event;
768 quote! {
769 <#event_ty as ::batpak::event::EventPayload>::KIND
770 }
771 })
772 .collect();
773 let kind_count = bindings.len();
774
775 let arms: Vec<proc_macro2::TokenStream> = bindings
781 .iter()
782 .map(|b| {
783 let event_ty = &b.event;
784 let handler_fn = &b.handler;
785 quote! {
786 match ::batpak::event::DecodeTyped::route_typed::<#event_ty>(&event.event) {
787 ::core::result::Result::Ok(::core::option::Option::Some(__p)) => {
788 let __typed_event = ::batpak::event::StoredEvent {
789 coordinate: event.coordinate.clone(),
790 event: ::batpak::event::Event {
791 header: event.event.header.clone(),
792 payload: __p,
793 hash_chain: event.event.hash_chain.clone(),
794 },
795 };
796 return self
797 .#handler_fn(&__typed_event, out, at_least_once)
798 .map_err(::batpak::event::MultiDispatchError::User);
799 }
800 ::core::result::Result::Ok(::core::option::Option::None) => {}
801 ::core::result::Result::Err(__e) => {
802 return ::core::result::Result::Err(
803 ::batpak::event::MultiDispatchError::Decode(__e)
804 );
805 }
806 }
807 }
808 })
809 .collect();
810
811 let handler_checks: Vec<proc_macro2::TokenStream> = bindings
817 .iter()
818 .map(|b| {
819 let event_ty = &b.event;
820 let handler_fn = &b.handler;
821 quote! {
822 let _: fn(
823 &mut Self,
824 &::batpak::event::StoredEvent<#event_ty>,
825 &mut ::batpak::store::ReactionBatch,
826 ::core::option::Option<&::batpak::store::AtLeastOnce>,
827 ) -> ::core::result::Result<(), #error_path> = Self::#handler_fn;
828 }
829 })
830 .collect();
831
832 let attr_assertions = {
837 quote! {
838 const _: fn() = || {
839 fn __batpak_assert_projection_input<T: ::batpak::event::ProjectionInput>() {}
840 __batpak_assert_projection_input::<#input_path>();
841 };
842 const _: fn() = || {
843 fn __batpak_assert_error<
844 T: ::core::marker::Send
845 + ::core::marker::Sync
846 + 'static
847 + ::std::error::Error,
848 >() {}
849 __batpak_assert_error::<#error_path>();
850 };
851 }
852 };
853
854 Ok(quote! {
855 #attr_assertions
856
857 impl #impl_generics ::batpak::event::MultiReactive<#input_path>
858 for #ident #ty_generics #where_clause
859 {
860 type Error = #error_path;
861
862 fn relevant_event_kinds() -> &'static [::batpak::event::EventKind] {
863 static KINDS: [::batpak::event::EventKind; #kind_count] = [
864 #(#kind_exprs),*
865 ];
866 &KINDS
867 }
868
869 fn dispatch(
870 &mut self,
871 event: &::batpak::event::StoredEvent<
872 <#input_path as ::batpak::event::ProjectionInput>::Payload,
873 >,
874 out: &mut ::batpak::store::ReactionBatch,
875 at_least_once: ::core::option::Option<&::batpak::store::AtLeastOnce>,
876 ) -> ::core::result::Result<(), ::batpak::event::MultiDispatchError<Self::Error>> {
877 #(#handler_checks)*
878 #(#arms)*
879 ::core::result::Result::Ok(())
881 }
882 }
883 })
884}