#![doc = include_str!("../README.md")]
mod error;
mod graphs;
mod node;
mod registry;
mod stage;
mod types;
pub use directed_stage_macro::stage;
pub use error::*;
pub use graphs::{EdgeInfo, Graph};
pub use node::{AnyNode, Cached, DowncastEq, Node};
pub use registry::{NodeId, Registry};
pub use stage::{EvalStrategy, ReevaluationRule, RefType, Stage};
pub use types::{DataLabel, GraphOutput, NodeOutput};
#[cfg(test)]
mod tests {
extern crate self as directed;
use super::*;
use directed_stage_macro::stage;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn basic_macro_test() {
#[stage(lazy, cache_last)]
fn TinyStage1() -> String {
println!("Running stage 1");
String::from("This is the output!")
}
#[stage(lazy, cache_last)]
fn TinyStage2(input: String, input2: String) -> String {
println!("Running stage 2");
input.to_uppercase() + " [" + &input2.chars().count().to_string() + " chars]"
}
#[stage(cache_last)]
fn TinyStage3(input: String) {
println!("Running stage 3");
assert_eq!("THIS IS THE OUTPUT! [19 chars]", input);
}
let mut registry = Registry::new();
let node_1 = registry.register(TinyStage1::new());
let node_2 = registry.register(TinyStage2::new());
let node_3 = registry.register(TinyStage3::new());
let graph = graph! {
nodes: [node_1, node_2, node_3],
connections: {
node_1 => node_2: input,
node_1 => node_2: input2,
node_2 => node_3: input,
}
}
.unwrap();
graph.execute(&mut registry).unwrap();
}
#[test]
fn get_output_test() {
#[stage]
fn TinyStage1() -> String {
println!("Running stage 1");
String::from("This is the output!")
}
let mut registry = Registry::new();
let node_1 = registry.register(TinyStage1::new());
let graph = graph! {
nodes: [node_1],
connections: {}
}
.unwrap();
graph.execute(&mut registry).unwrap();
let mut outputs = graph.get_output(&mut registry).unwrap();
assert_eq!(
outputs.take_unnamed::<String>(node_1).unwrap(),
String::from("This is the output!")
)
}
#[test]
fn inject_input_test() {
#[stage]
fn TinyStage1(simple_input: String) -> String {
println!("Running stage 1");
simple_input.replace("input", "output")
}
let mut registry = Registry::new();
let node_1 = registry.register(TinyStage1::new());
let graph = graph! {
nodes: [node_1],
connections: {}
}
.unwrap();
graph.set_input(&mut registry, node_1, "simple_input", String::from("This is the simple input!")).unwrap();
graph.execute(&mut registry).unwrap();
let mut outputs = graph.get_output(&mut registry).unwrap();
assert_eq!(
outputs.take_unnamed::<String>(node_1).unwrap(),
String::from("This is the simple output!")
)
}
#[test]
fn multiple_output_stage_test() {
#[stage(out(number: i32, text: String))]
fn MultiOutputStage() -> NodeOutput {
let value1 = 42;
let value2 = String::from("Hello");
output! {
number: value1,
text: value2
}
}
#[stage]
fn ConsumerStage1(number: i32) {
assert_eq!(number, 42);
}
#[stage]
fn ConsumerStage2(text: String) {
assert_eq!(text, "Hello");
}
let mut registry = Registry::new();
let producer = registry.register(MultiOutputStage::new());
let consumer1 = registry.register(ConsumerStage1::new());
let consumer2 = registry.register(ConsumerStage2::new());
let graph = graph! {
nodes: [producer, consumer1, consumer2],
connections: {
producer: number => consumer1: number,
producer: text => consumer2: text,
}
}
.unwrap();
graph.execute(&mut registry).unwrap();
}
#[test]
fn lazy_and_urgent_eval_test() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
#[stage(lazy, cache_last)]
fn LazyStage() -> i32 {
COUNTER.fetch_add(1, Ordering::SeqCst);
42
}
#[stage(cache_last)]
fn UrgentStage(input: i32) {
assert_eq!(input, 42);
assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
}
let mut registry = Registry::new();
let lazy_node = registry.register(LazyStage::new());
let urgent_node = registry.register(UrgentStage::new());
let graph = graph! {
nodes: [lazy_node, urgent_node],
connections: {
lazy_node => urgent_node: input,
}
}
.unwrap();
COUNTER.store(0, Ordering::SeqCst);
graph.execute(&mut registry).unwrap();
}
#[test]
fn transparent_opaque_reevaluation_test() {
static TRANSPARENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
static OPAQUE_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[stage(lazy, cache_last)]
fn SourceStage() -> i32 {
println!("SourceStage");
42
}
#[stage(lazy, cache_last)]
fn TransparentStage(input: i32) -> i32 {
println!("TransparentStage");
TRANSPARENT_COUNTER.fetch_add(1, Ordering::SeqCst);
input * 2
}
#[stage(lazy)]
fn OpaqueStage(input: &i32) -> i32 {
println!("OpaqueStage");
OPAQUE_COUNTER.fetch_add(1, Ordering::SeqCst);
input * 3
}
#[stage]
fn SinkStage(t_input: &i32, o_input: &i32) {
println!("SinkStage");
assert_eq!(*t_input, 84);
assert_eq!(*o_input, 126);
}
let mut registry = Registry::new();
let source = registry.register(SourceStage::new());
let transparent = registry.register(TransparentStage::new());
let opaque = registry.register(OpaqueStage::new());
let sink = registry.register(SinkStage::new());
let graph = graph! {
nodes: [source, transparent, opaque, sink],
connections: {
source => transparent: input,
source => opaque: input,
transparent => sink: t_input,
opaque => sink: o_input,
}
}
.unwrap();
TRANSPARENT_COUNTER.store(0, Ordering::SeqCst);
OPAQUE_COUNTER.store(0, Ordering::SeqCst);
graph.execute(&mut registry).unwrap();
assert_eq!(TRANSPARENT_COUNTER.load(Ordering::SeqCst), 1);
assert_eq!(OPAQUE_COUNTER.load(Ordering::SeqCst), 1);
graph.execute(&mut registry).unwrap();
assert_eq!(TRANSPARENT_COUNTER.load(Ordering::SeqCst), 1); assert_eq!(OPAQUE_COUNTER.load(Ordering::SeqCst), 2); }
#[test]
fn cycle_detection_test() {
#[stage]
fn StageA(input: i32) -> i32 {
input + 1
}
#[stage]
fn StageB(input: i32) -> i32 {
input * 2
}
let mut registry = Registry::new();
let node_a = registry.register(StageA::new());
let node_b = registry.register(StageB::new());
let result = graph! {
nodes: [node_a, node_b],
connections: {
node_a => node_b: input,
node_b => node_a: input,
}
};
assert!(result.is_err());
}
#[test]
fn registry_operations_test() {
#[stage]
fn SimpleStage() -> i32 {
42
}
let mut registry = Registry::new();
let node_id = registry.register(SimpleStage::new());
registry.validate_node_type::<SimpleStage>(node_id).unwrap();
#[stage]
fn OtherStage() -> String {
"hello".to_string()
}
assert!(registry.validate_node_type::<OtherStage>(node_id).is_err());
assert!(registry.get(node_id).is_some());
assert!(registry.get_mut(node_id).is_some());
let node = registry
.unregister::<SimpleStage>(node_id)
.unwrap()
.unwrap();
assert!(node.stage.eval_strategy() == EvalStrategy::Urgent);
assert!(registry.get(node_id).is_none());
}
#[test]
fn nonexistent_node_test() {
let mut registry = Registry::new();
let invalid_id = 9999;
assert!(registry.get(invalid_id).is_none());
assert!(registry.get_mut(invalid_id).is_none());
assert!(registry.unregister_and_drop(invalid_id).is_err());
}
#[test]
fn type_mismatch_test() {
#[stage]
fn StringStage() -> String {
"Hello".to_string()
}
#[stage]
fn IntegerConsumer(_input: i32) {
panic!("Should not execute");
}
let mut registry = Registry::new();
let producer = registry.register(StringStage::new());
let consumer = registry.register(IntegerConsumer::new());
let graph = graph! {
nodes: [producer, consumer],
connections: {
producer => consumer: input,
}
}
.unwrap();
let result = graph.execute(&mut registry);
assert!(result.is_err());
}
#[test]
fn missing_input_test() {
#[stage]
fn ConsumerStage(_input1: i32, _input2: String) {
panic!("Should not execute");
}
#[stage]
fn ProducerStage() -> i32 {
42
}
let mut registry = Registry::new();
let producer = registry.register(ProducerStage::new());
let consumer = registry.register(ConsumerStage::new());
let graph = graph! {
nodes: [producer, consumer],
connections: {
producer => consumer: input1,
}
}
.unwrap();
let result = graph.execute(&mut registry);
assert!(result.is_err());
}
#[test]
fn data_label_test() {
let label1 = DataLabel::new("test");
let label2 = DataLabel::new("test");
let label3 = DataLabel::new("different");
assert_eq!(label1, label2);
assert_ne!(label1, label3);
let const_label = DataLabel::new_const("const");
assert_eq!(const_label.inner(), Some("const"));
let from_str: DataLabel = "string".into();
assert_eq!(from_str.inner(), Some("string"));
}
#[test]
fn diamond_graph_test() {
#[stage]
fn Source() -> i32 {
10
}
#[stage]
fn PathA(input: i32) -> i32 {
input * 2
}
#[stage]
fn PathB(input: i32) -> i32 {
input + 5
}
#[stage]
fn Sink(a: i32, b: i32) {
assert_eq!(a, 20); assert_eq!(b, 15); }
let mut registry = Registry::new();
let source = registry.register(Source::new());
let path_a = registry.register(PathA::new());
let path_b = registry.register(PathB::new());
let sink = registry.register(Sink::new());
let graph = graph! {
nodes: [source, path_a, path_b, sink],
connections: {
source => path_a: input,
source => path_b: input,
path_a => sink: a,
path_b => sink: b,
}
}
.unwrap();
graph.execute(&mut registry).unwrap();
}
#[test]
fn invalid_output_name_test() {
#[stage]
fn MultiOutputStage() -> NodeOutput {
output! {
output1: 42,
output2: "Hello".to_string()
}
}
#[stage]
fn ConsumerStage(_input: i32) {
panic!("Should not execute");
}
let mut registry = Registry::new();
let producer = registry.register(MultiOutputStage::new());
let consumer = registry.register(ConsumerStage::new());
let graph = graph! {
nodes: [producer, consumer],
connections: {
producer: nonexistent => consumer: input,
}
}
.unwrap();
let result = graph.execute(&mut registry);
assert!(result.is_err());
}
#[test]
fn node_with_state_test() {
#[stage(state((u8, u8)))]
fn StateStage() {
assert_eq!(state.1, state.0 * 5);
state.0 += 1;
state.1 += 5;
println!("State is {}", state.1);
}
let mut registry = Registry::new();
let node = registry.register_with_state(StateStage::new(), (1, 5));
let graph = graph! {
nodes: [node],
connections: {}
}
.unwrap();
graph.execute(&mut registry).unwrap();
graph.execute(&mut registry).unwrap();
graph.execute(&mut registry).unwrap();
graph.execute(&mut registry).unwrap();
}
#[test]
fn output_macro_test() {
#[stage(out(number: i32, text: String, vector: Vec<i32>))]
fn ProduceOutput1() -> NodeOutput {
println!("Running ProduceOutput1");
let number = 42;
let text = "hello".to_string();
let vector = vec![1, 2, 3];
output! {
number,
text,
vector
}
}
#[stage]
fn ConsumeOutputs(num: i32, txt: String, vec: Vec<i32>) {
assert_eq!(num, 42);
assert_eq!(txt, "hello");
assert_eq!(vec, vec![1, 2, 3]);
}
let mut registry = Registry::new();
let producer = registry.register(ProduceOutput1::new());
let consumer = registry.register(ConsumeOutputs::new());
let graph = graph! {
nodes: [producer, consumer],
connections: {
producer: number => consumer: num,
producer: text => consumer: txt,
producer: vector => consumer: vec,
}
}
.unwrap();
graph.execute(&mut registry).unwrap();
}
#[test]
fn registry_type_validation_test() {
#[stage]
fn StageA() -> i32 {
42
}
#[stage]
fn StageB() -> String {
"hello".to_string()
}
let mut registry = Registry::new();
let node_a = registry.register(StageA::new());
assert!(registry.validate_node_type::<StageA>(node_a).is_ok());
assert!(registry.validate_node_type::<StageB>(node_a).is_err());
assert!(registry.unregister::<StageB>(node_a).is_err());
assert!(registry.unregister::<StageA>(node_a).is_ok());
}
#[test]
fn basic_cache_all_test() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
#[stage(lazy, cache_all)]
fn CacheStage1() -> String {
println!("Running stage 1");
COUNTER.fetch_add(1, Ordering::SeqCst);
String::from("This is the output!")
}
#[stage(lazy, cache_all)]
fn CacheStage2(input: String, input2: String) -> String {
println!("Running stage 2");
COUNTER.fetch_add(1, Ordering::SeqCst);
input.to_uppercase() + " [" + &input2.chars().count().to_string() + " chars]"
}
#[stage(cache_last)]
fn TinyStage3(input: String) {
println!("Running stage 3");
assert_eq!("THIS IS THE OUTPUT! [19 chars]", input);
}
#[stage(lazy, cache_all)]
fn CacheStage1Alternate() -> String {
println!("Running alt stage 1");
COUNTER.fetch_add(1, Ordering::SeqCst);
String::from("This is a different output!")
}
#[stage(cache_last)]
fn TinyStage3Alternate(input: String) {
println!("Running alt stage 3");
assert_eq!("THIS IS A DIFFERENT OUTPUT! [27 chars]", input);
}
let mut registry = Registry::new();
let node_1 = registry.register(CacheStage1::new());
let node_2 = registry.register(CacheStage2::new());
let node_3 = registry.register(TinyStage3::new());
let node_1_alt = registry.register(CacheStage1Alternate::new());
let node_3_alt = registry.register(TinyStage3Alternate::new());
let graph1 = graph! {
nodes: [node_1, node_2, node_3],
connections: {
node_1 => node_2: input,
node_1 => node_2: input2,
node_2 => node_3: input,
}
}
.unwrap();
graph1.execute(&mut registry).unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 2);
graph1.execute(&mut registry).unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 2);
let graph2 = graph! {
nodes: [node_1_alt, node_2, node_3_alt],
connections: {
node_1_alt => node_2: input,
node_1_alt => node_2: input2,
node_2 => node_3_alt: input,
}
}
.unwrap();
graph2.execute(&mut registry).unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 4);
graph2.execute(&mut registry).unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 4);
graph1.execute(&mut registry).unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 4);
}
#[test]
fn blank_connections_test() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
#[stage(lazy)]
fn TinyStage1() {
println!("Running stage 1");
COUNTER.fetch_add(1, Ordering::SeqCst);
}
#[stage(lazy)]
fn TinyStage2() {
println!("Running stage 2");
assert_eq!(COUNTER.load(Ordering::SeqCst), 2);
COUNTER.fetch_add(1, Ordering::SeqCst);
}
#[stage]
fn TinyStage3() {
println!("Running stage 3");
assert_eq!(COUNTER.load(Ordering::SeqCst), 3);
COUNTER.fetch_add(1, Ordering::SeqCst);
}
let mut registry = Registry::new();
let node_1 = registry.register(TinyStage1::new());
let node_2 = registry.register(TinyStage2::new());
let node_3 = registry.register(TinyStage3::new());
let graph = graph! {
nodes: [node_1, node_2, node_3],
connections: {
node_1 => node_2,
node_2 => node_3,
node_1 => node_3,
}
}
.unwrap();
graph.execute(&mut registry).unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 4);
}
}
#[cfg(all(test, feature = "tokio"))]
mod async_tests {
extern crate self as directed;
use super::*;
use directed_stage_macro::stage;
use std::sync::atomic::{AtomicUsize, Ordering};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn parallel_execution_test() {
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let (tx1, rx1) = unbounded_channel::<u8>();
let (tx2, rx2) = unbounded_channel::<u8>();
#[stage(lazy, state((UnboundedSender<u8>, UnboundedReceiver<u8>)))]
async fn SlowStage1() -> i32 {
println!("Running SlowStage1");
let (tx, rx) = state;
tx.send(1).unwrap();
assert_eq!(rx.recv().await.unwrap(), 2);
COUNTER.fetch_add(1, Ordering::SeqCst);
42
}
#[stage(lazy, state((UnboundedSender<u8>, UnboundedReceiver<u8>)))]
async fn SlowStage2() -> String {
println!("Running SlowStage2");
let (tx, rx) = state;
assert_eq!(rx.recv().await.unwrap(), 1);
tx.send(2).unwrap();
COUNTER.fetch_add(1, Ordering::SeqCst);
"hello".to_string()
}
#[stage]
fn CombineStage(as_num: i32, as_text: String) -> String {
println!("Running CombineStage");
format!("{} {}", as_text, as_num)
}
let mut registry = Registry::new();
let stage1 = registry.register_with_state(SlowStage1::new(), (tx1, rx2));
let stage2 = registry.register_with_state(SlowStage2::new(), (tx2, rx1));
let combine = registry.register(CombineStage::new());
println!("Node {stage1} is SlowStage1");
println!("Node {stage2} is SlowStage2");
println!("Node {combine} is CombineStage");
let graph = graph! {
nodes: [stage1, stage2, combine],
connections: {
stage1 => combine: as_num,
stage2 => combine: as_text,
}
}
.unwrap();
let graph = std::sync::Arc::new(graph);
COUNTER.store(0, Ordering::SeqCst);
graph
.execute_async(tokio::sync::Mutex::new(registry))
.await
.unwrap();
assert_eq!(COUNTER.load(Ordering::SeqCst), 2);
}
}