// cloacina_computation_graph/lib.rs

1/*
2 *  Copyright 2026 Colliery Software
3 *
4 *  Licensed under the Apache License, Version 2.0 (the "License");
5 *  you may not use this file except in compliance with the License.
6 *  You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 *  Unless required by applicable law or agreed to in writing, software
11 *  distributed under the License is distributed on an "AS IS" BASIS,
12 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 *  See the License for the specific language governing permissions and
14 *  limitations under the License.
15 */
16
//! Core types for Cloacina computation graph plugins.
//!
//! This crate contains the types that packaged computation graph cdylibs need
//! at compile time. It is the computation-graph equivalent of `cloacina-workflow`:
//! a thin crate that avoids pulling in the full engine.
//!
//! The `#[computation_graph]` macro expands into code that references types from
//! this crate. Embedded-mode users get these types re-exported from `cloacina`.
25
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use std::any::Any;
29use std::collections::HashMap;
30use std::fmt;
31use std::future::Future;
32use std::pin::Pin;
33use std::sync::Arc;
34
35// ---------------------------------------------------------------------------
36// SourceName
37// ---------------------------------------------------------------------------
38
39/// Identifies an accumulator source by name.
40#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
41pub struct SourceName(pub String);
42
43impl SourceName {
44    pub fn new(name: impl Into<String>) -> Self {
45        Self(name.into())
46    }
47
48    pub fn as_str(&self) -> &str {
49        &self.0
50    }
51}
52
53impl fmt::Display for SourceName {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        write!(f, "{}", self.0)
56    }
57}
58
59impl From<&str> for SourceName {
60    fn from(s: &str) -> Self {
61        Self(s.to_string())
62    }
63}
64
65impl From<String> for SourceName {
66    fn from(s: String) -> Self {
67        Self(s)
68    }
69}
70
71// ---------------------------------------------------------------------------
72// Serialization helpers (bincode wire format)
73// ---------------------------------------------------------------------------
74
75/// Serialize a value to bincode bytes.
76///
77/// Bincode is used for all internal wire formats (boundary channels,
78/// checkpoint persistence, accumulator-to-reactor messaging).
79pub fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, GraphError> {
80    bincode::serialize(value).map_err(|e| GraphError::Serialization(e.to_string()))
81}
82
83/// Deserialize bincode bytes to a value.
84pub fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, GraphError> {
85    bincode::deserialize(bytes).map_err(|e| GraphError::Deserialization(e.to_string()))
86}
87
88// ---------------------------------------------------------------------------
89// InputCache
90// ---------------------------------------------------------------------------
91
92/// The input cache holds the last-seen serialized boundary per source.
93///
94/// The reactor's receiver task updates this cache continuously. The executor
95/// takes a snapshot before calling the compiled graph function.
96///
97/// Serialization format: bincode (compact binary). The FFI packaging bridge
98/// converts bincode→JSON at the boundary for plugin compatibility.
99#[derive(Debug, Clone)]
100pub struct InputCache {
101    entries: HashMap<SourceName, Vec<u8>>,
102}
103
104impl InputCache {
105    pub fn new() -> Self {
106        Self {
107            entries: HashMap::new(),
108        }
109    }
110
111    /// Update the cached value for a source.
112    pub fn update(&mut self, source: SourceName, bytes: Vec<u8>) {
113        self.entries.insert(source, bytes);
114    }
115
116    /// Get and deserialize a cached value by source name.
117    pub fn get<T: DeserializeOwned>(&self, name: &str) -> Option<Result<T, GraphError>> {
118        let bytes = self.entries.get(&SourceName::new(name))?;
119        Some(deserialize::<T>(bytes))
120    }
121
122    /// Check if a source has an entry in the cache.
123    pub fn has(&self, name: &str) -> bool {
124        self.entries.contains_key(&SourceName::new(name))
125    }
126
127    /// Get the raw bytes for a source.
128    pub fn get_raw(&self, name: &str) -> Option<&[u8]> {
129        self.entries
130            .get(&SourceName::new(name))
131            .map(|v| v.as_slice())
132    }
133
134    /// Create a snapshot (clone) of the cache.
135    pub fn snapshot(&self) -> InputCache {
136        self.clone()
137    }
138
139    /// Number of sources in the cache.
140    pub fn len(&self) -> usize {
141        self.entries.len()
142    }
143
144    /// Whether the cache is empty.
145    pub fn is_empty(&self) -> bool {
146        self.entries.is_empty()
147    }
148
149    /// Replace all entries.
150    pub fn replace_all(&mut self, other: InputCache) {
151        self.entries = other.entries;
152    }
153
154    /// List all source names in the cache.
155    pub fn sources(&self) -> Vec<&SourceName> {
156        self.entries.keys().collect()
157    }
158
159    /// Get a reference to the raw entries map.
160    pub fn entries_raw(&self) -> &HashMap<SourceName, Vec<u8>> {
161        &self.entries
162    }
163
164    /// Return entries as a JSON-friendly map.
165    pub fn entries_as_json(&self) -> HashMap<String, String> {
166        self.entries
167            .iter()
168            .map(|(name, bytes)| {
169                let value = if cfg!(debug_assertions) {
170                    serde_json::from_slice::<serde_json::Value>(bytes)
171                        .map(|v| v.to_string())
172                        .unwrap_or_else(|_| hex_encode(bytes))
173                } else {
174                    hex_encode(bytes)
175                };
176                (name.as_str().to_string(), value)
177            })
178            .collect()
179    }
180}
181
182impl Default for InputCache {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
/// Encode `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string. The output buffer is sized once up
/// front and filled from a nibble lookup table, so no temporary `String` is
/// allocated per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first so each byte reads left-to-right, matching `{:02x}`.
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
191
192// ---------------------------------------------------------------------------
193// GraphResult / GraphError
194// ---------------------------------------------------------------------------
195
196/// Result of executing a compiled computation graph.
197#[derive(Debug)]
198pub enum GraphResult {
199    /// Graph executed to completion. Contains terminal node outputs.
200    Completed { outputs: Vec<Box<dyn Any + Send>> },
201    /// Graph execution failed.
202    Error(GraphError),
203}
204
205impl GraphResult {
206    pub fn completed(outputs: Vec<Box<dyn Any + Send>>) -> Self {
207        Self::Completed { outputs }
208    }
209
210    pub fn error(err: GraphError) -> Self {
211        Self::Error(err)
212    }
213
214    pub fn is_completed(&self) -> bool {
215        matches!(self, Self::Completed { .. })
216    }
217
218    pub fn is_error(&self) -> bool {
219        matches!(self, Self::Error(_))
220    }
221}
222
223/// Errors that can occur during graph execution.
224#[derive(Debug, thiserror::Error)]
225pub enum GraphError {
226    #[error("serialization failed: {0}")]
227    Serialization(String),
228
229    #[error("deserialization failed: {0}")]
230    Deserialization(String),
231
232    #[error("missing input: source '{0}' not found in cache")]
233    MissingInput(String),
234
235    #[error("node execution failed: {0}")]
236    NodeExecution(String),
237
238    #[error("graph execution failed: {0}")]
239    Execution(String),
240}
241
242// ---------------------------------------------------------------------------
243// CompiledGraphFn
244// ---------------------------------------------------------------------------
245
246/// Type alias for the compiled graph function.
247pub type CompiledGraphFn =
248    Arc<dyn Fn(InputCache) -> Pin<Box<dyn Future<Output = GraphResult> + Send>> + Send + Sync>;
249
250// ---------------------------------------------------------------------------
251// Computation graph constructor types
252// ---------------------------------------------------------------------------
253//
254// The process-global computation-graph registry was removed in CLOACI-T-0509.
255// Constructors are owned by `cloacina::Runtime`, which is seeded from the
256// `inventory` entries emitted by the `#[computation_graph]` macro.
257
258/// Metadata about a registered computation graph.
259///
260/// `accumulator_names` and `reaction_mode` are the canonical fields consumed
261/// by the packaging FFI and the reconciler. Bundled-form graphs populate
262/// these from the local declaration; split-form graphs mirror the
263/// referenced reactor's declaration. Trigger-less graphs carry empty
264/// `accumulator_names` and `trigger_reactor = None`.
265pub struct ComputationGraphRegistration {
266    /// The compiled graph function.
267    pub graph_fn: CompiledGraphFn,
268    /// Name of the reactor this graph is bound to, if any. `None` for
269    /// trigger-less graphs (T-02/T-03 invoke these directly from workflow
270    /// tasks or Python tasks).
271    pub trigger_reactor: Option<String>,
272    /// Accumulator names. For split-form graphs this mirrors the reactor's
273    /// accumulators; for trigger-less graphs it is empty.
274    pub accumulator_names: Vec<String>,
275    /// Reaction mode: `"when_any"`, `"when_all"`, or `"none"` for
276    /// trigger-less graphs.
277    pub reaction_mode: String,
278}
279
280pub type ComputationGraphConstructor = Box<dyn Fn() -> ComputationGraphRegistration + Send + Sync>;
281
282// ---------------------------------------------------------------------------
283// Reactor
284// ---------------------------------------------------------------------------
285//
286// A reactor is a named bundle of accumulators + firing criteria. It fires an
287// `InputCache` whenever its criteria are met and publishes to any graph bound
288// to it by name.
289//
290// The `#[reactor]` attribute macro emits a unit struct + `impl Reactor for X`.
291// The struct is a compile-time handle: `#[computation_graph(trigger =
292// reactor(X))]` references it by type path so the graph macro can const-check
293// that its entry accumulators are a subset of `<X as Reactor>::ACCUMULATORS`.
294
/// Firing policy of a reactor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReactionMode {
    /// Fire as soon as any single accumulator has new input.
    WhenAny,
    /// Fire only once every accumulator has new input since the last firing.
    WhenAll,
}

impl ReactionMode {
    /// Stable snake_case identifier for the mode: `"when_any"` or
    /// `"when_all"`. Usable in `const` context.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::WhenAny => "when_any",
            Self::WhenAll => "when_all",
        }
    }
}

impl fmt::Display for ReactionMode {
    /// Writes the same text as [`ReactionMode::as_str`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
318
319/// Compile-time handle for a reactor declaration.
320///
321/// Implemented by the unit struct emitted by `#[reactor]`. The constants are
322/// readable at `const` context, which the `#[computation_graph]` macro uses to
323/// const-check the graph's entry accumulators against the reactor's
324/// accumulator set.
325pub trait Reactor {
326    /// Reactor name as declared in `#[reactor(name = "...")]`.
327    const NAME: &'static str;
328    /// Declared accumulator names. Order matches the `accumulators = [...]`
329    /// clause; uniqueness is enforced at macro expansion.
330    const ACCUMULATORS: &'static [&'static str];
331    /// Firing criteria.
332    const REACTION_MODE: ReactionMode;
333}
334
335/// Runtime-side description of a reactor.
336///
337/// Populated by the `#[reactor]` macro's emitted inventory entry.
338#[derive(Debug, Clone)]
339pub struct ReactorRegistration {
340    pub name: String,
341    pub accumulator_names: Vec<String>,
342    pub reaction_mode: ReactionMode,
343}
344
345pub type ReactorConstructor = Box<dyn Fn() -> ReactorRegistration + Send + Sync>;
346
/// Compile-time handle for a computation graph declaration.
///
/// The `__CGHandle_<modname>` unit struct emitted by `#[computation_graph]`
/// implements this trait. It lets other macros, notably
/// `#[task(invokes = computation_graph(H))]`, refer to a graph by type path
/// and const-check invariants such as trigger-less-ness at compile time.
pub trait Graph {
    /// Graph name, taken from the macro's `mod` name.
    const NAME: &'static str;

    /// `true` when the graph was declared without `trigger = reactor(...)`,
    /// which makes it directly invocable by a workflow task.
    const IS_TRIGGERLESS: bool;
}
360
361// Re-export types module for backward compat path: `cloacina_computation_graph::types::serialize`
362pub mod types {
363    pub use crate::{deserialize, serialize, GraphError, GraphResult, InputCache, SourceName};
364}