// cloacina_macros/lib.rs

/*
 *  Copyright 2025-2026 Colliery Software
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
16
#![allow(unexpected_cfgs)]
//! # Cloacina Macros
//!
//! Procedural macros for defining tasks and workflows in the Cloacina framework.
//!
//! ## Key Features
//!
//! - `#[task]` — define tasks with retry policies and trigger rules
//! - `#[workflow]` — define workflows as modules containing `#[task]` functions
//! - Compile-time validation of task dependencies and workflow structure
//! - Automatic task and workflow registration
//! - Code fingerprinting for task versioning
//!
//! ## Example
//!
//! ```rust,ignore
//! use cloacina::{task, workflow, Context, TaskError};
//!
//! #[workflow(name = "my_pipeline", description = "Process data")]
//! pub mod my_pipeline {
//!     use super::*;
//!
//!     #[task(id = "fetch", dependencies = [])]
//!     pub async fn fetch(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
//!
//!     #[task(id = "process", dependencies = ["fetch"])]
//!     pub async fn process(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
//! }
//! ```
// Implementation modules. Only the `#[proc_macro_*]` entry points below are
// exported from this crate; everything else is crate-internal.
pub(crate) mod computation_graph; // `#[computation_graph]` plus the accumulator attribute macros
pub(crate) mod packaged_workflow; // NOTE(review): not referenced in this file; presumably the packaged-mode codegen used by `workflow_attr` — confirm
mod registry; // NOTE(review): not referenced in this file; presumably shared registration codegen helpers — confirm
pub(crate) mod tasks; // `#[task]` implementation
mod trigger_attr; // `#[trigger]` implementation
mod workflow_attr; // `#[workflow]` implementation

use proc_macro::TokenStream;
55
/// Define a task with retry policies and trigger rules.
///
/// Attribute macro applied to an async function, typically inside a
/// `#[workflow]` module (see the crate-level example). Attribute arguments
/// such as `id` and `dependencies` are parsed by the `tasks` module, which
/// owns all parsing and code generation for this macro.
#[proc_macro_attribute]
pub fn task(args: TokenStream, input: TokenStream) -> TokenStream {
    // Thin shim: the real implementation lives in `tasks::task`.
    tasks::task(args, input)
}
61
/// Define a workflow as a module containing `#[task]` functions.
///
/// Applied to a `pub mod` containing `#[task]` functions. Auto-discovers tasks,
/// validates dependencies, and generates registration code based on delivery mode:
///
/// - **Embedded** (default): `#[ctor]` auto-registration
/// - **Packaged** (`features = ["packaged"]`): FFI exports for `.cloacina` packages
///
/// # Example
///
/// ```rust,ignore
/// #[workflow(name = "my_pipeline", description = "Process data")]
/// pub mod my_pipeline {
///     use super::*;
///
///     #[task(id = "fetch", dependencies = [])]
///     pub async fn fetch(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
///
///     #[task(id = "process", dependencies = ["fetch"])]
///     pub async fn process(ctx: &mut Context<Value>) -> Result<(), TaskError> { Ok(()) }
/// }
/// ```
#[proc_macro_attribute]
pub fn workflow(args: TokenStream, input: TokenStream) -> TokenStream {
    // Thin shim: parsing, dependency validation, and codegen live in `workflow_attr`.
    workflow_attr::workflow_attr(args, input)
}
88
/// Define a trigger that fires a workflow on a schedule or condition.
///
/// Attribute arguments (`on`, `poll_interval`, `cron`, `timezone`, …) are
/// parsed by the `trigger_attr` module, which owns all codegen for this macro.
///
/// # Custom poll trigger
///
/// ```rust,ignore
/// #[trigger(on = "my_workflow", poll_interval = "5s")]
/// pub async fn check_inbox() -> Result<TriggerResult, TriggerError> {
///     // check condition, return Fire(ctx) or Skip
/// }
/// ```
///
/// # Cron trigger (T-0305)
///
/// ```rust,ignore
/// #[trigger(on = "my_workflow", cron = "0 2 * * *", timezone = "UTC")]
/// ```
#[proc_macro_attribute]
pub fn trigger(args: TokenStream, input: TokenStream) -> TokenStream {
    // Thin shim: the real implementation lives in `trigger_attr`.
    trigger_attr::trigger_attr(args, input)
}
109
/// Define a computation graph as a module containing async node functions.
///
/// The topology is declared in the macro attribute. Nodes are pure async functions
/// within the module. The macro compiles the topology into a single async function
/// with nested match arms for enum routing.
///
/// # Example
///
/// ```rust,ignore
/// #[computation_graph(
///     react = when_any(alpha, beta),
///     graph = {
///         decision(alpha, beta) => {
///             Signal -> output_handler,
///             NoAction -> audit_logger,
///         },
///     }
/// )]
/// mod my_strategy {
///     async fn decision(alpha: Option<&AlphaData>, beta: Option<&BetaData>) -> DecisionOutcome { ... }
///     async fn output_handler(signal: &Signal) -> OutputConfirmation { ... }
///     async fn audit_logger(reason: &NoActionReason) -> AuditRecord { ... }
/// }
/// ```
#[proc_macro_attribute]
pub fn computation_graph(args: TokenStream, input: TokenStream) -> TokenStream {
    // Thin shim: topology parsing and codegen live in `computation_graph`.
    computation_graph::computation_graph_attr(args, input)
}
138
139/// Define a passthrough accumulator (socket-only, no event loop).
140///
141/// ```rust,ignore
142/// #[passthrough_accumulator]
143/// fn beta(event: PricingUpdate) -> BetaData {
144///     BetaData { estimate: event.mid_price }
145/// }
146/// ```
147#[proc_macro_attribute]
148pub fn passthrough_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
149    match computation_graph::accumulator_macros::passthrough_accumulator_impl(
150        args.into(),
151        input.into(),
152    ) {
153        Ok(output) => output.into(),
154        Err(err) => err.to_compile_error().into(),
155    }
156}
157
158/// Define a stream-backed accumulator.
159///
160/// ```rust,ignore
161/// #[stream_accumulator(type = "kafka", topic = "market.orderbook")]
162/// fn alpha(event: OrderBookUpdate) -> AlphaData {
163///     AlphaData { top_high: event.best_ask, top_low: event.best_bid }
164/// }
165/// ```
166#[proc_macro_attribute]
167pub fn stream_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
168    match computation_graph::accumulator_macros::stream_accumulator_impl(args.into(), input.into())
169    {
170        Ok(output) => output.into(),
171        Err(err) => err.to_compile_error().into(),
172    }
173}
174
175/// Define a batch accumulator (buffers events, flushes on timer or size threshold).
176///
177/// ```rust,ignore
178/// #[batch_accumulator(flush_interval = "1s", max_buffer_size = 100)]
179/// fn aggregate_fills(events: Vec<FillEvent>) -> Option<AggregatedFills> {
180///     if events.is_empty() { return None; }
181///     Some(AggregatedFills { total: events.len(), volume: events.iter().map(|e| e.qty).sum() })
182/// }
183/// ```
184#[proc_macro_attribute]
185pub fn batch_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
186    match computation_graph::accumulator_macros::batch_accumulator_impl(args.into(), input.into()) {
187        Ok(output) => output.into(),
188        Err(err) => err.to_compile_error().into(),
189    }
190}
191
192/// Define a polling accumulator (timer-based, queries pull-based sources).
193///
194/// ```rust,ignore
195/// #[polling_accumulator(interval = "5s")]
196/// async fn config_source() -> Option<ConfigData> {
197///     let data = fetch_config().await;
198///     if data.changed() { Some(data) } else { None }
199/// }
200/// ```
201#[proc_macro_attribute]
202pub fn polling_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
203    match computation_graph::accumulator_macros::polling_accumulator_impl(args.into(), input.into())
204    {
205        Ok(output) => output.into(),
206        Err(err) => err.to_compile_error().into(),
207    }
208}
209
210/// Define a state accumulator (bounded history buffer with DAL persistence).
211///
212/// ```rust,ignore
213/// #[state_accumulator(capacity = 10)]
214/// fn previous_outputs() -> VecDeque<DecisionOutput>;
215/// ```
216#[proc_macro_attribute]
217pub fn state_accumulator(args: TokenStream, input: TokenStream) -> TokenStream {
218    match computation_graph::accumulator_macros::state_accumulator_impl(args.into(), input.into()) {
219        Ok(output) => output.into(),
220        Err(err) => err.to_compile_error().into(),
221    }
222}