xerv_core/traits/
node.rs

1//! Node trait and related types.
2
3use super::context::Context;
4use crate::error::Result;
5use crate::types::RelPtr;
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
10/// Direction of a port.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
12pub enum PortDirection {
13    /// Input port.
14    Input,
15    /// Output port.
16    Output,
17}
18
19/// A port on a node.
20#[derive(Debug, Clone)]
21pub struct Port {
22    /// Port name (e.g., "in", "out", "error", "true", "false").
23    pub name: String,
24    /// Port direction.
25    pub direction: PortDirection,
26    /// Schema name for the data type.
27    pub schema: String,
28    /// Whether this port is required.
29    pub required: bool,
30    /// Description of the port.
31    pub description: String,
32}
33
34impl Port {
35    /// Create a standard input port.
36    pub fn input(schema: impl Into<String>) -> Self {
37        Self {
38            name: "in".to_string(),
39            direction: PortDirection::Input,
40            schema: schema.into(),
41            required: true,
42            description: "Default input".to_string(),
43        }
44    }
45
46    /// Create a standard output port.
47    pub fn output(schema: impl Into<String>) -> Self {
48        Self {
49            name: "out".to_string(),
50            direction: PortDirection::Output,
51            schema: schema.into(),
52            required: false,
53            description: "Default output".to_string(),
54        }
55    }
56
57    /// Create an error output port.
58    pub fn error() -> Self {
59        Self {
60            name: "error".to_string(),
61            direction: PortDirection::Output,
62            schema: "Error@v1".to_string(),
63            required: false,
64            description: "Error output".to_string(),
65        }
66    }
67
68    /// Create a named port.
69    pub fn named(
70        name: impl Into<String>,
71        direction: PortDirection,
72        schema: impl Into<String>,
73    ) -> Self {
74        Self {
75            name: name.into(),
76            direction,
77            schema: schema.into(),
78            required: direction == PortDirection::Input,
79            description: String::new(),
80        }
81    }
82
83    /// Set the port as optional.
84    pub fn optional(mut self) -> Self {
85        self.required = false;
86        self
87    }
88
89    /// Set the port description.
90    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
91        self.description = desc.into();
92        self
93    }
94}
95
96/// Metadata about a node type.
97#[derive(Debug, Clone)]
98pub struct NodeInfo {
99    /// Fully qualified name (e.g., "std::switch", "plugins::fraud_model").
100    pub name: String,
101    /// Namespace (e.g., "std", "plugins").
102    pub namespace: String,
103    /// Short name (e.g., "switch", "fraud_model").
104    pub short_name: String,
105    /// Description of what the node does.
106    pub description: String,
107    /// Version of the node implementation.
108    pub version: String,
109    /// Input ports.
110    pub inputs: Vec<Port>,
111    /// Output ports.
112    pub outputs: Vec<Port>,
113    /// Whether this node has side effects.
114    pub effectful: bool,
115    /// Whether this node is deterministic (same input = same output).
116    pub deterministic: bool,
117}
118
119impl NodeInfo {
120    /// Create new node info.
121    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
122        let namespace = namespace.into();
123        let short_name = name.into();
124        let full_name = format!("{}::{}", namespace, short_name);
125
126        Self {
127            name: full_name,
128            namespace,
129            short_name,
130            description: String::new(),
131            version: "1.0.0".to_string(),
132            inputs: vec![Port::input("Any")],
133            outputs: vec![Port::output("Any"), Port::error()],
134            effectful: false,
135            deterministic: true,
136        }
137    }
138
139    /// Set the description.
140    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
141        self.description = desc.into();
142        self
143    }
144
145    /// Set the version.
146    pub fn with_version(mut self, version: impl Into<String>) -> Self {
147        self.version = version.into();
148        self
149    }
150
151    /// Set input ports.
152    pub fn with_inputs(mut self, inputs: Vec<Port>) -> Self {
153        self.inputs = inputs;
154        self
155    }
156
157    /// Set output ports.
158    pub fn with_outputs(mut self, outputs: Vec<Port>) -> Self {
159        self.outputs = outputs;
160        self
161    }
162
163    /// Mark as effectful (has side effects).
164    pub fn effectful(mut self) -> Self {
165        self.effectful = true;
166        self
167    }
168
169    /// Mark as non-deterministic.
170    pub fn non_deterministic(mut self) -> Self {
171        self.deterministic = false;
172        self
173    }
174
175    /// Get an input port by name.
176    pub fn get_input(&self, name: &str) -> Option<&Port> {
177        self.inputs.iter().find(|p| p.name == name)
178    }
179
180    /// Get an output port by name.
181    pub fn get_output(&self, name: &str) -> Option<&Port> {
182        self.outputs.iter().find(|p| p.name == name)
183    }
184}
185
186/// Output from a node execution.
187#[derive(Debug)]
188pub struct NodeOutput {
189    /// The output port that was activated.
190    pub port: String,
191    /// Pointer to the output data in the arena.
192    pub data: RelPtr<()>,
193    /// Schema hash of the output data.
194    pub schema_hash: u64,
195    /// Optional error message (for error outputs without arena data).
196    pub error_message: Option<String>,
197}
198
199impl NodeOutput {
200    /// Create a new node output.
201    pub fn new<T>(port: impl Into<String>, data: RelPtr<T>) -> Self {
202        Self {
203            port: port.into(),
204            data: RelPtr::new(data.offset(), data.size()),
205            schema_hash: 0,
206            error_message: None,
207        }
208    }
209
210    /// Create output on the default "out" port.
211    pub fn out<T>(data: RelPtr<T>) -> Self {
212        Self::new("out", data)
213    }
214
215    /// Create output on the "error" port with arena data.
216    pub fn error<T>(data: RelPtr<T>) -> Self {
217        Self::new("error", data)
218    }
219
220    /// Create output on the "error" port with a message string.
221    ///
222    /// Use this when you have an error message but no arena data to write.
223    /// The error message will be available for logging and debugging.
224    pub fn error_with_message(message: impl Into<String>) -> Self {
225        Self {
226            port: "error".to_string(),
227            data: RelPtr::null(),
228            schema_hash: 0,
229            error_message: Some(message.into()),
230        }
231    }
232
233    /// Create output on the "true" port (for switch nodes).
234    pub fn on_true<T>(data: RelPtr<T>) -> Self {
235        Self::new("true", data)
236    }
237
238    /// Create output on the "false" port (for switch nodes).
239    pub fn on_false<T>(data: RelPtr<T>) -> Self {
240        Self::new("false", data)
241    }
242
243    /// Set the schema hash.
244    pub fn with_schema_hash(mut self, hash: u64) -> Self {
245        self.schema_hash = hash;
246        self
247    }
248
249    /// Check if this output has an error message.
250    pub fn has_error_message(&self) -> bool {
251        self.error_message.is_some()
252    }
253
254    /// Get the error message, if any.
255    pub fn get_error_message(&self) -> Option<&str> {
256        self.error_message.as_deref()
257    }
258
259    /// Get the arena location (offset and size) of the output data.
260    ///
261    /// Returns `(offset, size)` tuple for use in WAL records and crash recovery.
262    /// Returns `(ArenaOffset::NULL, 0)` if the data pointer is null.
263    pub fn arena_location(&self) -> (crate::types::ArenaOffset, u32) {
264        (self.data.offset(), self.data.size())
265    }
266}
267
268/// A boxed future for async node execution.
269pub type NodeFuture<'a> = Pin<Box<dyn Future<Output = Result<NodeOutput>> + Send + 'a>>;
270
271/// The core trait for all XERV nodes.
272///
273/// Nodes are the basic units of computation in a flow. Each node:
274/// - Receives input from upstream nodes via the arena
275/// - Performs some computation
276/// - Writes output to the arena
277/// - Returns which output port to activate
278///
279/// # Example
280///
281/// ```ignore
282/// use xerv_core::prelude::*;
283///
284/// struct MyNode {
285///     config: MyConfig,
286/// }
287///
288/// impl Node for MyNode {
289///     fn info(&self) -> NodeInfo {
290///         NodeInfo::new("custom", "my_node")
291///             .with_description("My custom node")
292///     }
293///
294///     fn execute<'a>(&'a self, ctx: Context, input: RelPtr<()>) -> NodeFuture<'a> {
295///         Box::pin(async move {
296///             // Read input
297///             let data = ctx.read::<MyInput>(input.cast())?;
298///
299///             // Process
300///             let output = MyOutput { value: data.value * 2 };
301///
302///             // Write output
303///             let ptr = ctx.write(&output)?;
304///             Ok(NodeOutput::out(ptr))
305///         })
306///     }
307/// }
308/// ```
309pub trait Node: Send + Sync {
310    /// Get metadata about this node.
311    fn info(&self) -> NodeInfo;
312
313    /// Execute the node.
314    ///
315    /// # Parameters
316    /// - `ctx`: Execution context with access to arena and logging
317    /// - `inputs`: Map of input port names to data pointers
318    ///
319    /// # Returns
320    /// The output port and data pointer to activate.
321    fn execute<'a>(&'a self, ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a>;
322
323    /// Called when the node is being shut down.
324    fn shutdown(&self) {}
325
326    /// Get the schema hash for this node's output type.
327    fn output_schema_hash(&self) -> u64 {
328        0
329    }
330}
331
332/// A node factory that creates node instances from configuration.
333pub trait NodeFactory: Send + Sync {
334    /// Get the node type name this factory creates.
335    fn node_type(&self) -> &str;
336
337    /// Create a new node instance from YAML configuration.
338    fn create(&self, config: &serde_yaml::Value) -> Result<Box<dyn Node>>;
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344
345    #[test]
346    fn port_creation() {
347        let input = Port::input("OrderInput@v1");
348        assert_eq!(input.name, "in");
349        assert_eq!(input.direction, PortDirection::Input);
350        assert!(input.required);
351
352        let output = Port::output("OrderOutput@v1").optional();
353        assert_eq!(output.name, "out");
354        assert!(!output.required);
355    }
356
357    #[test]
358    fn node_info_creation() {
359        let info = NodeInfo::new("std", "switch")
360            .with_description("Conditional branching")
361            .with_inputs(vec![Port::input("Any")])
362            .with_outputs(vec![
363                Port::named("true", PortDirection::Output, "Any"),
364                Port::named("false", PortDirection::Output, "Any"),
365                Port::error(),
366            ]);
367
368        assert_eq!(info.name, "std::switch");
369        assert_eq!(info.namespace, "std");
370        assert_eq!(info.short_name, "switch");
371        assert_eq!(info.inputs.len(), 1);
372        assert_eq!(info.outputs.len(), 3);
373    }
374}