cloacina_computation_graph/lib.rs
1/*
2 * Copyright 2026 Colliery Software
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
//! Core types for Cloacina computation graph plugins.
//!
//! This crate contains the types that packaged computation graph cdylibs need
//! at compile time. It is the computation-graph equivalent of `cloacina-workflow`
//! — a thin crate that avoids pulling in the full engine.
//!
//! The `#[computation_graph]` macro expands into code that references types from
//! this crate. Embedded-mode users get these types re-exported from `cloacina`.
25
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use std::any::Any;
29use std::collections::HashMap;
30use std::fmt;
31use std::future::Future;
32use std::pin::Pin;
33use std::sync::Arc;
34
35// ---------------------------------------------------------------------------
36// SourceName
37// ---------------------------------------------------------------------------
38
39/// Identifies an accumulator source by name.
40#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
41pub struct SourceName(pub String);
42
43impl SourceName {
44 pub fn new(name: impl Into<String>) -> Self {
45 Self(name.into())
46 }
47
48 pub fn as_str(&self) -> &str {
49 &self.0
50 }
51}
52
53impl fmt::Display for SourceName {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 write!(f, "{}", self.0)
56 }
57}
58
59impl From<&str> for SourceName {
60 fn from(s: &str) -> Self {
61 Self(s.to_string())
62 }
63}
64
65impl From<String> for SourceName {
66 fn from(s: String) -> Self {
67 Self(s)
68 }
69}
70
71// ---------------------------------------------------------------------------
72// Serialization helpers (bincode wire format)
73// ---------------------------------------------------------------------------
74
75/// Serialize a value to bincode bytes.
76///
77/// Bincode is used for all internal wire formats (boundary channels,
78/// checkpoint persistence, accumulator-to-reactor messaging).
79pub fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, GraphError> {
80 bincode::serialize(value).map_err(|e| GraphError::Serialization(e.to_string()))
81}
82
83/// Deserialize bincode bytes to a value.
84pub fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, GraphError> {
85 bincode::deserialize(bytes).map_err(|e| GraphError::Deserialization(e.to_string()))
86}
87
88// ---------------------------------------------------------------------------
89// InputCache
90// ---------------------------------------------------------------------------
91
92/// The input cache holds the last-seen serialized boundary per source.
93///
94/// The reactor's receiver task updates this cache continuously. The executor
95/// takes a snapshot before calling the compiled graph function.
96///
97/// Serialization format: bincode (compact binary). The FFI packaging bridge
98/// converts bincode→JSON at the boundary for plugin compatibility.
99#[derive(Debug, Clone)]
100pub struct InputCache {
101 entries: HashMap<SourceName, Vec<u8>>,
102}
103
104impl InputCache {
105 pub fn new() -> Self {
106 Self {
107 entries: HashMap::new(),
108 }
109 }
110
111 /// Update the cached value for a source.
112 pub fn update(&mut self, source: SourceName, bytes: Vec<u8>) {
113 self.entries.insert(source, bytes);
114 }
115
116 /// Get and deserialize a cached value by source name.
117 pub fn get<T: DeserializeOwned>(&self, name: &str) -> Option<Result<T, GraphError>> {
118 let bytes = self.entries.get(&SourceName::new(name))?;
119 Some(deserialize::<T>(bytes))
120 }
121
122 /// Check if a source has an entry in the cache.
123 pub fn has(&self, name: &str) -> bool {
124 self.entries.contains_key(&SourceName::new(name))
125 }
126
127 /// Get the raw bytes for a source.
128 pub fn get_raw(&self, name: &str) -> Option<&[u8]> {
129 self.entries
130 .get(&SourceName::new(name))
131 .map(|v| v.as_slice())
132 }
133
134 /// Create a snapshot (clone) of the cache.
135 pub fn snapshot(&self) -> InputCache {
136 self.clone()
137 }
138
139 /// Number of sources in the cache.
140 pub fn len(&self) -> usize {
141 self.entries.len()
142 }
143
144 /// Whether the cache is empty.
145 pub fn is_empty(&self) -> bool {
146 self.entries.is_empty()
147 }
148
149 /// Replace all entries.
150 pub fn replace_all(&mut self, other: InputCache) {
151 self.entries = other.entries;
152 }
153
154 /// List all source names in the cache.
155 pub fn sources(&self) -> Vec<&SourceName> {
156 self.entries.keys().collect()
157 }
158
159 /// Get a reference to the raw entries map.
160 pub fn entries_raw(&self) -> &HashMap<SourceName, Vec<u8>> {
161 &self.entries
162 }
163
164 /// Return entries as a JSON-friendly map.
165 pub fn entries_as_json(&self) -> HashMap<String, String> {
166 self.entries
167 .iter()
168 .map(|(name, bytes)| {
169 let value = if cfg!(debug_assertions) {
170 serde_json::from_slice::<serde_json::Value>(bytes)
171 .map(|v| v.to_string())
172 .unwrap_or_else(|_| hex_encode(bytes))
173 } else {
174 hex_encode(bytes)
175 };
176 (name.as_str().to_string(), value)
177 })
178 .collect()
179 }
180}
181
182impl Default for InputCache {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
/// Render `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string. The output buffer is sized once up
/// front and filled from a digit table, so no temporary `String` is allocated
/// per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // Each nibble is in 0..=15, so indexing the 16-entry table cannot panic.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
191
192// ---------------------------------------------------------------------------
193// GraphResult / GraphError
194// ---------------------------------------------------------------------------
195
196/// Result of executing a compiled computation graph.
197#[derive(Debug)]
198pub enum GraphResult {
199 /// Graph executed to completion. Contains terminal node outputs.
200 Completed { outputs: Vec<Box<dyn Any + Send>> },
201 /// Graph execution failed.
202 Error(GraphError),
203}
204
205impl GraphResult {
206 pub fn completed(outputs: Vec<Box<dyn Any + Send>>) -> Self {
207 Self::Completed { outputs }
208 }
209
210 pub fn error(err: GraphError) -> Self {
211 Self::Error(err)
212 }
213
214 pub fn is_completed(&self) -> bool {
215 matches!(self, Self::Completed { .. })
216 }
217
218 pub fn is_error(&self) -> bool {
219 matches!(self, Self::Error(_))
220 }
221}
222
223/// Errors that can occur during graph execution.
224#[derive(Debug, thiserror::Error)]
225pub enum GraphError {
226 #[error("serialization failed: {0}")]
227 Serialization(String),
228
229 #[error("deserialization failed: {0}")]
230 Deserialization(String),
231
232 #[error("missing input: source '{0}' not found in cache")]
233 MissingInput(String),
234
235 #[error("node execution failed: {0}")]
236 NodeExecution(String),
237
238 #[error("graph execution failed: {0}")]
239 Execution(String),
240}
241
242// ---------------------------------------------------------------------------
243// CompiledGraphFn
244// ---------------------------------------------------------------------------
245
246/// Type alias for the compiled graph function.
247pub type CompiledGraphFn =
248 Arc<dyn Fn(InputCache) -> Pin<Box<dyn Future<Output = GraphResult> + Send>> + Send + Sync>;
249
250// ---------------------------------------------------------------------------
251// Computation graph constructor types
252// ---------------------------------------------------------------------------
//
// The process-global computation-graph registry was removed in CLOACI-T-0509.
// Constructors are owned by `cloacina::Runtime`, which is seeded from the
// `inventory` entries emitted by the `#[computation_graph]` macro.
257
258/// Metadata about a registered computation graph.
259///
260/// `accumulator_names` and `reaction_mode` are the canonical fields consumed
261/// by the packaging FFI and the reconciler. Bundled-form graphs populate
262/// these from the local declaration; split-form graphs mirror the
263/// referenced reactor's declaration. Trigger-less graphs carry empty
264/// `accumulator_names` and `trigger_reactor = None`.
265pub struct ComputationGraphRegistration {
266 /// The compiled graph function.
267 pub graph_fn: CompiledGraphFn,
268 /// Name of the reactor this graph is bound to, if any. `None` for
269 /// trigger-less graphs (T-02/T-03 invoke these directly from workflow
270 /// tasks or Python tasks).
271 pub trigger_reactor: Option<String>,
272 /// Accumulator names. For split-form graphs this mirrors the reactor's
273 /// accumulators; for trigger-less graphs it is empty.
274 pub accumulator_names: Vec<String>,
275 /// Reaction mode: `"when_any"`, `"when_all"`, or `"none"` for
276 /// trigger-less graphs.
277 pub reaction_mode: String,
278}
279
280pub type ComputationGraphConstructor = Box<dyn Fn() -> ComputationGraphRegistration + Send + Sync>;
281
282// ---------------------------------------------------------------------------
283// Reactor
284// ---------------------------------------------------------------------------
//
// A reactor is a named bundle of accumulators + firing criteria. It fires an
// `InputCache` whenever its criteria are met and publishes to any graph bound
// to it by name.
//
// The `#[reactor]` attribute macro emits a unit struct + `impl Reactor for X`.
// The struct is a compile-time handle: `#[computation_graph(trigger =
// reactor(X))]` references it by type path so the graph macro can const-check
// that its entry accumulators are a subset of `<X as Reactor>::ACCUMULATORS`.
294
/// Firing policy of a reactor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReactionMode {
    /// Fire as soon as any single accumulator has new input.
    WhenAny,
    /// Fire only once every accumulator has received new input since the
    /// previous firing.
    WhenAll,
}
303
304impl ReactionMode {
305 pub const fn as_str(&self) -> &'static str {
306 match self {
307 ReactionMode::WhenAny => "when_any",
308 ReactionMode::WhenAll => "when_all",
309 }
310 }
311}
312
313impl fmt::Display for ReactionMode {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 f.write_str(self.as_str())
316 }
317}
318
319/// Compile-time handle for a reactor declaration.
320///
321/// Implemented by the unit struct emitted by `#[reactor]`. The constants are
322/// readable at `const` context, which the `#[computation_graph]` macro uses to
323/// const-check the graph's entry accumulators against the reactor's
324/// accumulator set.
325pub trait Reactor {
326 /// Reactor name as declared in `#[reactor(name = "...")]`.
327 const NAME: &'static str;
328 /// Declared accumulator names. Order matches the `accumulators = [...]`
329 /// clause; uniqueness is enforced at macro expansion.
330 const ACCUMULATORS: &'static [&'static str];
331 /// Firing criteria.
332 const REACTION_MODE: ReactionMode;
333}
334
335/// Runtime-side description of a reactor.
336///
337/// Populated by the `#[reactor]` macro's emitted inventory entry.
338#[derive(Debug, Clone)]
339pub struct ReactorRegistration {
340 pub name: String,
341 pub accumulator_names: Vec<String>,
342 pub reaction_mode: ReactionMode,
343}
344
345pub type ReactorConstructor = Box<dyn Fn() -> ReactorRegistration + Send + Sync>;
346
/// Compile-time handle for a computation graph declaration.
///
/// The `__CGHandle_<modname>` unit struct emitted by `#[computation_graph]`
/// implements this trait. It lets other macros (notably
/// `#[task(invokes = computation_graph(H))]`) refer to a graph by type path
/// and const-check invariants such as trigger-less-ness at compile time.
pub trait Graph {
    /// The graph's name (the macro's `mod` name).
    const NAME: &'static str;
    /// `true` when the graph was declared without `trigger = reactor(...)`,
    /// which makes it directly invocable by a workflow task.
    const IS_TRIGGERLESS: bool;
}
360
361// Re-export types module for backward compat path: `cloacina_computation_graph::types::serialize`
362pub mod types {
363 pub use crate::{deserialize, serialize, GraphError, GraphResult, InputCache, SourceName};
364}