use super::context::Context;
use crate::error::Result;
use crate::types::RelPtr;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PortDirection {
Input,
Output,
}
#[derive(Debug, Clone)]
pub struct Port {
pub name: String,
pub direction: PortDirection,
pub schema: String,
pub required: bool,
pub description: String,
}
impl Port {
pub fn input(schema: impl Into<String>) -> Self {
Self {
name: "in".to_string(),
direction: PortDirection::Input,
schema: schema.into(),
required: true,
description: "Default input".to_string(),
}
}
pub fn output(schema: impl Into<String>) -> Self {
Self {
name: "out".to_string(),
direction: PortDirection::Output,
schema: schema.into(),
required: false,
description: "Default output".to_string(),
}
}
pub fn error() -> Self {
Self {
name: "error".to_string(),
direction: PortDirection::Output,
schema: "Error@v1".to_string(),
required: false,
description: "Error output".to_string(),
}
}
pub fn named(
name: impl Into<String>,
direction: PortDirection,
schema: impl Into<String>,
) -> Self {
Self {
name: name.into(),
direction,
schema: schema.into(),
required: direction == PortDirection::Input,
description: String::new(),
}
}
pub fn optional(mut self) -> Self {
self.required = false;
self
}
pub fn with_description(mut self, desc: impl Into<String>) -> Self {
self.description = desc.into();
self
}
}
#[derive(Debug, Clone)]
pub struct NodeInfo {
pub name: String,
pub namespace: String,
pub short_name: String,
pub description: String,
pub version: String,
pub inputs: Vec<Port>,
pub outputs: Vec<Port>,
pub effectful: bool,
pub deterministic: bool,
}
impl NodeInfo {
pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
let namespace = namespace.into();
let short_name = name.into();
let full_name = format!("{}::{}", namespace, short_name);
Self {
name: full_name,
namespace,
short_name,
description: String::new(),
version: "1.0.0".to_string(),
inputs: vec![Port::input("Any")],
outputs: vec![Port::output("Any"), Port::error()],
effectful: false,
deterministic: true,
}
}
pub fn with_description(mut self, desc: impl Into<String>) -> Self {
self.description = desc.into();
self
}
pub fn with_version(mut self, version: impl Into<String>) -> Self {
self.version = version.into();
self
}
pub fn with_inputs(mut self, inputs: Vec<Port>) -> Self {
self.inputs = inputs;
self
}
pub fn with_outputs(mut self, outputs: Vec<Port>) -> Self {
self.outputs = outputs;
self
}
pub fn effectful(mut self) -> Self {
self.effectful = true;
self
}
pub fn non_deterministic(mut self) -> Self {
self.deterministic = false;
self
}
pub fn get_input(&self, name: &str) -> Option<&Port> {
self.inputs.iter().find(|p| p.name == name)
}
pub fn get_output(&self, name: &str) -> Option<&Port> {
self.outputs.iter().find(|p| p.name == name)
}
}
#[derive(Debug)]
pub struct NodeOutput {
pub port: String,
pub data: RelPtr<()>,
pub schema_hash: u64,
pub error_message: Option<String>,
}
impl NodeOutput {
pub fn new<T>(port: impl Into<String>, data: RelPtr<T>) -> Self {
Self {
port: port.into(),
data: RelPtr::new(data.offset(), data.size()),
schema_hash: 0,
error_message: None,
}
}
pub fn out<T>(data: RelPtr<T>) -> Self {
Self::new("out", data)
}
pub fn error<T>(data: RelPtr<T>) -> Self {
Self::new("error", data)
}
pub fn error_with_message(message: impl Into<String>) -> Self {
Self {
port: "error".to_string(),
data: RelPtr::null(),
schema_hash: 0,
error_message: Some(message.into()),
}
}
pub fn on_true<T>(data: RelPtr<T>) -> Self {
Self::new("true", data)
}
pub fn on_false<T>(data: RelPtr<T>) -> Self {
Self::new("false", data)
}
pub fn with_schema_hash(mut self, hash: u64) -> Self {
self.schema_hash = hash;
self
}
pub fn has_error_message(&self) -> bool {
self.error_message.is_some()
}
pub fn get_error_message(&self) -> Option<&str> {
self.error_message.as_deref()
}
pub fn arena_location(&self) -> (crate::types::ArenaOffset, u32) {
(self.data.offset(), self.data.size())
}
}
pub type NodeFuture<'a> = Pin<Box<dyn Future<Output = Result<NodeOutput>> + Send + 'a>>;
pub trait Node: Send + Sync {
fn info(&self) -> NodeInfo;
fn execute<'a>(&'a self, ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a>;
fn shutdown(&self) {}
fn output_schema_hash(&self) -> u64 {
0
}
}
pub trait NodeFactory: Send + Sync {
fn node_type(&self) -> &str;
fn create(&self, config: &serde_yaml::Value) -> Result<Box<dyn Node>>;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn port_creation() {
let input = Port::input("OrderInput@v1");
assert_eq!(input.name, "in");
assert_eq!(input.direction, PortDirection::Input);
assert!(input.required);
let output = Port::output("OrderOutput@v1").optional();
assert_eq!(output.name, "out");
assert!(!output.required);
}
#[test]
fn node_info_creation() {
let info = NodeInfo::new("std", "switch")
.with_description("Conditional branching")
.with_inputs(vec![Port::input("Any")])
.with_outputs(vec![
Port::named("true", PortDirection::Output, "Any"),
Port::named("false", PortDirection::Output, "Any"),
Port::error(),
]);
assert_eq!(info.name, "std::switch");
assert_eq!(info.namespace, "std");
assert_eq!(info.short_name, "switch");
assert_eq!(info.inputs.len(), 1);
assert_eq!(info.outputs.len(), 3);
}
}