cloacina_macros/lib.rs
1/*
2 * Copyright 2025-2026 Colliery Software
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#![allow(unexpected_cfgs)]
18//! # Cloacina Macros
19//!
20//! Procedural macros for defining tasks and workflows in the Cloacina framework.
21//!
22//! ## Key Features
23//!
24//! - `#[task]` — define tasks with retry policies and trigger rules
25//! - `#[workflow]` — define workflows as modules containing `#[task]` functions
26//! - Compile-time validation of task dependencies and workflow structure
27//! - Automatic task and workflow registration
28//! - Code fingerprinting for task versioning
29//!
30//! ## Example
31//!
32//! ```rust,ignore
33//! use cloacina::{task, workflow, Context, TaskError};
34//!
35//! #[workflow(name = "my_pipeline", description = "Process data")]
36//! pub mod my_pipeline {
37//! use super::*;
38//!
39//! #[task(id = "fetch", dependencies = [])]
40//! pub async fn fetch(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
41//!
42//! #[task(id = "process", dependencies = ["fetch"])]
43//! pub async fn process(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
44//! }
45//! ```
46
47pub(crate) mod computation_graph;
48pub(crate) mod packaged_workflow;
49mod registry;
50pub(crate) mod tasks;
51mod trigger_attr;
52mod workflow_attr;
53
54use proc_macro::TokenStream;
55
56/// Define a task with retry policies and trigger rules.
57#[proc_macro_attribute]
58pub fn task(args: TokenStream, input: TokenStream) -> TokenStream {
59 tasks::task(args, input)
60}
61
62/// Define a workflow as a module containing `#[task]` functions.
63///
64/// Applied to a `pub mod` containing `#[task]` functions. Auto-discovers tasks,
65/// validates dependencies, and generates registration code based on delivery mode:
66///
67/// - **Embedded** (default): `#[ctor]` auto-registration
68/// - **Packaged** (`features = ["packaged"]`): FFI exports for `.cloacina` packages
69///
70/// # Example
71///
72/// ```rust,ignore
73/// #[workflow(name = "my_pipeline", description = "Process data")]
74/// pub mod my_pipeline {
75/// use super::*;
76///
77/// #[task(id = "fetch", dependencies = [])]
78/// pub async fn fetch(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
79///
80/// #[task(id = "process", dependencies = ["fetch"])]
81/// pub async fn process(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
82/// }
83/// ```
84#[proc_macro_attribute]
85pub fn workflow(args: TokenStream, input: TokenStream) -> TokenStream {
86 workflow_attr::workflow_attr(args, input)
87}
88
89/// Define a trigger that fires a workflow on a schedule or condition.
90///
91/// # Custom poll trigger
92///
93/// ```rust,ignore
94/// #[trigger(on = "my_workflow", poll_interval = "5s")]
95/// pub async fn check_inbox() -> Result<TriggerResult, TriggerError> {
96/// // check condition, return Fire(ctx) or Skip
97/// }
98/// ```
99///
100/// # Cron trigger (T-0305)
101///
102/// ```rust,ignore
103/// #[trigger(on = "my_workflow", cron = "0 2 * * *", timezone = "UTC")]
104/// ```
105#[proc_macro_attribute]
106pub fn trigger(args: TokenStream, input: TokenStream) -> TokenStream {
107 trigger_attr::trigger_attr(args, input)
108}
109
110/// Define a computation graph as a module containing async node functions.
111///
112/// The topology is declared in the macro attribute. Nodes are pure async functions
113/// within the module. The macro compiles the topology into a single async function
114/// with nested match arms for enum routing.
115///
116/// # Example
117///
118/// ```rust,ignore
119/// #[computation_graph(
120/// react = when_any(alpha, beta),
121/// graph = {
122/// decision(alpha, beta) => {
123/// Signal -> output_handler,
124/// NoAction -> audit_logger,
125/// },
126/// }
127/// )]
128/// mod my_strategy {
129/// async fn decision(alpha: Option<&AlphaData>, beta: Option<&BetaData>) -> DecisionOutcome { ... }
130/// async fn output_handler(signal: &Signal) -> OutputConfirmation { ... }
131/// async fn audit_logger(reason: &NoActionReason) -> AuditRecord { ... }
132/// }
133/// ```
134#[proc_macro_attribute]
135pub fn computation_graph(args: TokenStream, input: TokenStream) -> TokenStream {
136 computation_graph::computation_graph_attr(args, input)
137}
138
139/// Define a passthrough accumulator (socket-only, no event loop).
140///
141/// ```rust,ignore
142/// #[passthrough_accumulator]
143/// fn beta(event: PricingUpdate) -> BetaData {
144/// BetaData { estimate: event.mid_price }
145/// }
146/// ```
147#[proc_macro_attribute]
148pub fn passthrough_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
149 match computation_graph::accumulator_macros::passthrough_accumulator_impl(
150 args.into(),
151 input.into(),
152 ) {
153 Ok(output) => output.into(),
154 Err(err) => err.to_compile_error().into(),
155 }
156}
157
158/// Define a stream-backed accumulator.
159///
160/// ```rust,ignore
161/// #[stream_accumulator(type = "kafka", topic = "market.orderbook")]
162/// fn alpha(event: OrderBookUpdate) -> AlphaData {
163/// AlphaData { top_high: event.best_ask, top_low: event.best_bid }
164/// }
165/// ```
166#[proc_macro_attribute]
167pub fn stream_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
168 match computation_graph::accumulator_macros::stream_accumulator_impl(args.into(), input.into())
169 {
170 Ok(output) => output.into(),
171 Err(err) => err.to_compile_error().into(),
172 }
173}
174
175/// Define a batch accumulator (buffers events, flushes on timer or size threshold).
176///
177/// ```rust,ignore
178/// #[batch_accumulator(flush_interval = "1s", max_buffer_size = 100)]
179/// fn aggregate_fills(events: Vec<FillEvent>) -> Option<AggregatedFills> {
180/// if events.is_empty() { return None; }
181/// Some(AggregatedFills { total: events.len(), volume: events.iter().map(|e| e.qty).sum() })
182/// }
183/// ```
184#[proc_macro_attribute]
185pub fn batch_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
186 match computation_graph::accumulator_macros::batch_accumulator_impl(args.into(), input.into()) {
187 Ok(output) => output.into(),
188 Err(err) => err.to_compile_error().into(),
189 }
190}
191
192/// Define a polling accumulator (timer-based, queries pull-based sources).
193///
194/// ```rust,ignore
195/// #[polling_accumulator(interval = "5s")]
196/// async fn config_source() -> Option<ConfigData> {
197/// let data = fetch_config().await;
198/// if data.changed() { Some(data) } else { None }
199/// }
200/// ```
201#[proc_macro_attribute]
202pub fn polling_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
203 match computation_graph::accumulator_macros::polling_accumulator_impl(args.into(), input.into())
204 {
205 Ok(output) => output.into(),
206 Err(err) => err.to_compile_error().into(),
207 }
208}
209
210/// Define a state accumulator (bounded history buffer with DAL persistence).
211///
212/// ```rust,ignore
213/// #[state_accumulator(capacity = 10)]
214/// fn previous_outputs() -> VecDeque<DecisionOutput>;
215/// ```
216#[proc_macro_attribute]
217pub fn state_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
218 match computation_graph::accumulator_macros::state_accumulator_impl(args.into(), input.into()) {
219 Ok(output) => output.into(),
220 Err(err) => err.to_compile_error().into(),
221 }
222}