use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use super::{
BufferSelection, BuildDiagramOperation, BuildStatus, DiagramContext, DiagramErrorCode,
JsonMessage, NextOperation, OperationName,
};
use crate::{BufferIdentifier, TraceSettings, default_as_false, is_default, is_false};
// Schema of a `join` diagram operation: the messages held by a selection of
// buffers are joined into a single message which is sent to `next`.
//
// NOTE(review): these are plain `//` comments rather than `///` because
// schemars is documented to copy doc comments into the generated JSON schema
// as descriptions, which would change the schema output. Promote them to
// rustdoc only if that change is wanted.
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct JoinSchema {
// Target that receives the joined message.
pub next: NextOperation,
// The buffers to join. Building the operation fails with
// `DiagramErrorCode::EmptyJoin` when this selection is empty. The tests in
// this file supply it either as an object mapping join keys to buffer names
// or as an array of buffer names.
pub buffers: BufferSelection,
// Identifiers of entries in `buffers` whose buffer is replaced by the result
// of `join_by_cloning()` before the join is built. An identifier that is not
// in the buffer map yields `DiagramErrorCode::UnknownJoinField`; a buffer for
// which `join_by_cloning()` returns `None` yields
// `DiagramErrorCode::NotCloneable`.
// Presumably a cloned entry stays in its buffer after the join fires
// (see `test_diagram_join_by_clone`) — TODO confirm.
// Serde: falls back to `Default` when absent and is skipped on serialization
// when `is_default` holds.
#[serde(default, skip_serializing_if = "is_default")]
pub clone: Vec<BufferIdentifier<'static>>,
// When true the buffers are joined into a `JsonMessage`; otherwise the joined
// message type is inferred from the input type of `next`.
// Serde: defaults via `default_as_false` (presumably `false`) and is skipped
// on serialization when `is_false` holds.
#[serde(default = "default_as_false", skip_serializing_if = "is_false")]
pub serialize: bool,
// Trace settings; flattened, so its fields appear directly on the join
// operation object rather than under a nested key.
#[serde(flatten)]
pub trace_settings: TraceSettings,
}
impl BuildDiagramOperation for JoinSchema {
    /// Builds this join into the workflow that `ctx` is constructing.
    ///
    /// The steps run in this order:
    /// 1. An empty `buffers` selection is rejected immediately.
    /// 2. `buffers` is resolved into a buffer map; if that fails the build is
    ///    deferred with the reported reason.
    /// 3. Every buffer named in `clone` is swapped for the result of its
    ///    `join_by_cloning()` call.
    /// 4. With `serialize` set, the buffers are joined into a [`JsonMessage`].
    ///    Otherwise the message type is inferred from the input of `next`, and
    ///    the build is deferred while that type is still unknown.
    ///
    /// # Errors
    /// - [`DiagramErrorCode::EmptyJoin`] when `buffers` is empty.
    /// - [`DiagramErrorCode::UnknownJoinField`] when an identifier in `clone`
    ///   is not present in the buffer map.
    /// - [`DiagramErrorCode::NotCloneable`] when `join_by_cloning()` returns
    ///   `None` for a buffer named in `clone`.
    /// - Any error propagated from `try_join`, `infer_input_type_into_target`
    ///   or the message registry's `join`.
    fn build_diagram_operation(
        &self,
        _: &OperationName,
        ctx: &mut DiagramContext,
    ) -> Result<BuildStatus, DiagramErrorCode> {
        if self.buffers.is_empty() {
            return Err(DiagramErrorCode::EmptyJoin);
        }

        // A failure here is reported as a deferral, not as a hard error.
        let mut joinable = match ctx.create_buffer_map(&self.buffers) {
            Err(reason) => return Ok(BuildStatus::defer(reason)),
            Ok(resolved) => resolved,
        };

        for identifier in &self.clone {
            let slot = match joinable.get_mut(identifier) {
                Some(slot) => slot,
                None => {
                    return Err(DiagramErrorCode::UnknownJoinField {
                        unknown: identifier.clone(),
                        available: joinable.keys().cloned().collect(),
                    });
                }
            };

            // Replace the entry in place with its join-by-cloning counterpart.
            let Some(cloning) = (*slot).join_by_cloning() else {
                return Err(DiagramErrorCode::NotCloneable(slot.message_type()));
            };
            *slot = cloning;
        }

        if self.serialize {
            let json_output = ctx.builder.try_join::<JsonMessage>(&joinable)?.output();
            ctx.add_output_into_target(&self.next, json_output.into());
            return Ok(BuildStatus::Finished);
        }

        // Without serialization the joined type has to come from the target.
        let target_type = match ctx.infer_input_type_into_target(&self.next)? {
            Some(inferred) => inferred,
            None => {
                return Ok(BuildStatus::defer(
                    "waiting to find out target message type",
                ));
            }
        };
        let typed_output = ctx
            .registry
            .messages
            .join(&target_type, &joinable, ctx.builder)?;
        ctx.add_output_into_target(&self.next, typed_output);

        Ok(BuildStatus::Finished)
    }
}
#[cfg(test)]
mod tests {
use crossflow_derive::Joined;
use serde_json::json;
use test_log::test;
use super::*;
use crate::{
Diagram, DiagramElementRegistry, DiagramErrorCode, NodeBuilderOptions,
diagram::testing::DiagramTestFixture,
};
/// Test node body: discards its JSON input and always produces `"foo"`.
fn foo(_: serde_json::Value) -> String {
    String::from("foo")
}
/// Test node body: discards its JSON input and always produces `"bar"`.
fn bar(_: serde_json::Value) -> String {
    String::from("bar")
}
// Test message with two string fields. It derives `Joined`; `test_join` joins
// buffers keyed `foo` and `bar` into the `foobar` node, which takes this type
// as its input.
#[derive(Serialize, Deserialize, JsonSchema, Joined)]
struct FooBar {
// Value supplied under the `foo` join key.
foo: String,
// Value supplied under the `bar` join key.
bar: String,
}
impl Default for FooBar {
fn default() -> Self {
FooBar {
foo: "foo".to_owned(),
bar: "bar".to_owned(),
}
}
}
fn foobar(foobar: FooBar) -> String {
format!("{}{}", foobar.foo, foobar.bar)
}
/// Test node body: concatenates the first two entries of `foobar`.
///
/// Any further entries are ignored.
///
/// # Panics
/// Panics on an out-of-bounds index when `foobar` has fewer than two entries.
fn foobar_array(foobar: Vec<String>) -> String {
    // Index the first entry before the second, so a short input panics on the
    // same index as a left-to-right `format!` of both entries would.
    let first = &foobar[0];
    let second = &foobar[1];

    let mut joined = String::with_capacity(first.len() + second.len());
    joined.push_str(first);
    joined.push_str(second);
    joined
}
/// Registers the node builders shared by the join tests:
///
/// - `foo` / `bar`: map blocks around [`foo`] and [`bar`].
/// - `foobar`: map block around [`foobar`], registered with cloning opted out
///   and with join support.
/// - `foobar_array`: map block around [`foobar_array`], with join support.
/// - `create_foobar`: takes a [`FooBar`] config and emits a copy of it for
///   every JSON input, registered with cloning opted out.
fn register_join_nodes(registry: &mut DiagramElementRegistry) {
    registry.register_node_builder(NodeBuilderOptions::new("foo"), |scope, _: ()| {
        scope.create_map_block(foo)
    });

    registry.register_node_builder(NodeBuilderOptions::new("bar"), |scope, _: ()| {
        scope.create_map_block(bar)
    });

    registry
        .opt_out()
        .no_cloning()
        .register_node_builder(NodeBuilderOptions::new("foobar"), |scope, _: ()| {
            scope.create_map_block(foobar)
        })
        .with_join();

    registry
        .register_node_builder(NodeBuilderOptions::new("foobar_array"), |scope, _: ()| {
            scope.create_map_block(foobar_array)
        })
        .with_join();

    registry.opt_out().no_cloning().register_node_builder(
        NodeBuilderOptions::new("create_foobar"),
        |scope, config: FooBar| {
            // Keep the configured strings in the block and clone them per message.
            let FooBar { foo, bar } = config;
            scope.create_map_block(move |_: JsonMessage| FooBar {
                foo: foo.clone(),
                bar: bar.clone(),
            })
        },
    );
}
// Joins the buffered outputs of the `foo` and `bar` nodes by key and feeds the
// result into the `foobar` node, which is expected to emit "foobar".
#[test]
fn test_join() {
    let mut harness = DiagramTestFixture::new();
    register_join_nodes(&mut harness.registry);

    // The input is cloned to `foo` and `bar`; each output is buffered and the
    // two buffers are joined under the keys `foo` and `bar`.
    let spec = json!({
        "version": "0.1.0",
        "start": "fork_clone",
        "ops": {
            "fork_clone": {
                "type": "fork_clone",
                "next": ["foo", "bar"],
            },
            "foo": {
                "type": "node",
                "builder": "foo",
                "next": "foo_buffer",
            },
            "foo_buffer": {
                "type": "buffer",
            },
            "bar": {
                "type": "node",
                "builder": "bar",
                "next": "bar_buffer",
            },
            "bar_buffer": {
                "type": "buffer",
            },
            "join": {
                "type": "join",
                "buffers": {
                    "foo": "foo_buffer",
                    "bar": "bar_buffer",
                },
                "next": "foobar",
            },
            "foobar": {
                "type": "node",
                "builder": "foobar",
                "next": { "builtin": "terminate" },
            },
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let outcome: JsonMessage = harness
        .spawn_and_run(&workflow, JsonMessage::Null)
        .unwrap();

    assert!(harness.context.no_unhandled_errors());
    assert_eq!(outcome, "foobar");
}
// The join declares no message type of its own, so the joined type has to be
// inferred from the input of the `foobar` node.
//
// NOTE(review): this diagram and its assertions are identical to `test_join`.
// Confirm whether one of the two was meant to exercise a different setup.
#[test]
fn test_join_infer_type() {
    let mut harness = DiagramTestFixture::new();
    register_join_nodes(&mut harness.registry);

    let spec = json!({
        "version": "0.1.0",
        "start": "fork_clone",
        "ops": {
            "fork_clone": {
                "type": "fork_clone",
                "next": ["foo", "bar"],
            },
            "foo": {
                "type": "node",
                "builder": "foo",
                "next": "foo_buffer",
            },
            "foo_buffer": {
                "type": "buffer",
            },
            "bar": {
                "type": "node",
                "builder": "bar",
                "next": "bar_buffer",
            },
            "bar_buffer": {
                "type": "buffer",
            },
            "join": {
                "type": "join",
                "buffers": {
                    "foo": "foo_buffer",
                    "bar": "bar_buffer",
                },
                "next": "foobar",
            },
            "foobar": {
                "type": "node",
                "builder": "foobar",
                "next": { "builtin": "terminate" },
            },
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let outcome: JsonMessage = harness
        .spawn_and_run(&workflow, JsonMessage::Null)
        .unwrap();

    assert!(harness.context.no_unhandled_errors());
    assert_eq!(outcome, "foobar");
}
// The join feeds a `fork_clone` whose only target is the builtin terminate, so
// no node input type is available downstream of the join. The workflow is run
// with a `JsonMessage` response and is expected to yield a JSON object keyed
// by the join keys.
#[test]
fn test_join_infer_from_terminate() {
    let mut harness = DiagramTestFixture::new();
    register_join_nodes(&mut harness.registry);

    let spec = json!({
        "version": "0.1.0",
        "start": "fork_clone",
        "ops": {
            "fork_clone": {
                "type": "fork_clone",
                "next": ["foo", "bar"],
            },
            "foo": {
                "type": "node",
                "builder": "foo",
                "next": "foo_buffer",
            },
            "foo_buffer": {
                "type": "buffer",
            },
            "bar": {
                "type": "node",
                "builder": "bar",
                "next": "bar_buffer",
            },
            "bar_buffer": {
                "type": "buffer",
            },
            "join": {
                "type": "join",
                "buffers": {
                    "foo": "foo_buffer",
                    "bar": "bar_buffer",
                },
                "next": "fork_clone2",
            },
            "fork_clone2": {
                "type": "fork_clone",
                "next": [{ "builtin": "terminate" }],
            },
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let outcome: JsonMessage = harness
        .spawn_and_run(&workflow, JsonMessage::Null)
        .unwrap();

    // One string entry per joined buffer, keyed by its join key.
    let expected = json!({
        "bar": "bar",
        "foo": "foo",
    });
    assert_eq!(outcome, expected);
}
// Same pipeline as the keyed join tests, but `buffers` is given as an array
// and the joined message goes to the `foobar_array` node, which takes a
// `Vec<String>`.
#[test]
fn test_join_buffer_array() {
    let mut harness = DiagramTestFixture::new();
    register_join_nodes(&mut harness.registry);

    let spec = json!({
        "version": "0.1.0",
        "start": "fork_clone",
        "ops": {
            "fork_clone": {
                "type": "fork_clone",
                "next": ["foo", "bar"],
            },
            "foo": {
                "type": "node",
                "builder": "foo",
                "next": "foo_buffer",
            },
            "foo_buffer": {
                "type": "buffer",
            },
            "bar": {
                "type": "node",
                "builder": "bar",
                "next": "bar_buffer",
            },
            "bar_buffer": {
                "type": "buffer",
            },
            "join": {
                "type": "join",
                "buffers": ["foo_buffer", "bar_buffer"],
                "next": "foobar_array",
            },
            "foobar_array": {
                "type": "node",
                "builder": "foobar_array",
                "next": { "builtin": "terminate" },
            },
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let outcome: JsonMessage = harness
        .spawn_and_run(&workflow, JsonMessage::Null)
        .unwrap();

    assert!(harness.context.no_unhandled_errors());
    assert_eq!(outcome, "foobar");
}
// A join with an empty `buffers` array must make workflow creation fail with
// `DiagramErrorCode::EmptyJoin`. Note that the `join` op is not on the path
// from `start` in this diagram.
#[test]
fn test_empty_join() {
    let mut harness = DiagramTestFixture::new();
    register_join_nodes(&mut harness.registry);

    let spec = json!({
        "version": "0.1.0",
        "start": "foo",
        "ops": {
            "foo": {
                "type": "node",
                "builder": "foo",
                "next": { "builtin": "terminate" },
            },
            "join": {
                "type": "join",
                "buffers": [],
                "next": "foobar",
            },
            "foobar": {
                "type": "node",
                "builder": "foobar",
                "next": { "builtin": "terminate" },
            },
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let failure = harness.spawn_json_io_workflow(&workflow).unwrap_err();
    assert!(matches!(failure.code, DiagramErrorCode::EmptyJoin));
}
// Exercises the `clone` setting of a join inside a loop.
//
// The `(String, i64)` input is unzipped into a message buffer and a count
// buffer. The join lists `message` under `clone`. The `check` node returns
// `Err(count + 1)` until the count reaches its config (10), and the error
// branch pushes that value back into the count buffer. The test expects the
// loop to end with count 10 and the original message; presumably the cloned
// message stays in its buffer across iterations — TODO confirm.
#[test]
fn test_diagram_join_by_clone() {
    let mut harness = DiagramTestFixture::new();

    harness
        .registry
        .register_node_builder(NodeBuilderOptions::new("check"), |scope, config: i64| {
            scope.create_map_block(move |joined: JoinByCloneTest| {
                let reached_limit = joined.count >= config;
                if reached_limit {
                    Ok(joined)
                } else {
                    Err(joined.count + 1)
                }
            })
        })
        .with_join()
        .with_result();

    harness
        .registry
        .register_message::<(String, i64)>()
        .with_unzip();

    let spec = json!({
        "version": "0.1.0",
        "start": "unzip",
        "ops": {
            "unzip": {
                "type": "unzip",
                "next": [
                    "message_buffer",
                    "count_buffer"
                ]
            },
            "message_buffer": { "type": "buffer" },
            "count_buffer": { "type": "buffer" },
            "join": {
                "type": "join",
                "buffers": {
                    "count": "count_buffer",
                    "message": "message_buffer"
                },
                "clone": [ "message" ],
                "next": "check"
            },
            "check": {
                "type": "node",
                "builder": "check",
                "config": 10,
                "next": "fork_result"
            },
            "fork_result": {
                "type": "fork_result",
                "ok": { "builtin": "terminate" },
                "err": "count_buffer"
            }
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let request = ("hello".to_string(), 0_i64);
    let outcome: JoinByCloneTest = harness.spawn_and_run(&workflow, request).unwrap();

    assert_eq!(outcome.count, 10);
    assert_eq!(outcome.message, "hello");
}
// Joined message used by `test_diagram_join_by_clone`: the join there maps the
// `count` and `message` keys to the count and message buffers.
#[derive(Joined, Clone, Serialize, Deserialize, JsonSchema)]
struct JoinByCloneTest {
// Loop counter; the `check` node compares it against its configured limit.
count: i64,
// Text payload; the test lists this key under the join's `clone` setting.
message: String,
}
// Combines split, join-by-clone (by array index) and a retry loop.
//
// The `mul` and `less_than` builders are not registered in this file;
// presumably `DiagramTestFixture` provides them, with `mul` multiplying by its
// config and `less_than` returning `Ok` once element 0 is less than element 1
// — TODO confirm. Under that reading:
//   - the input [200, 10] is split into x100 (giving 20000) and x10 (giving 100);
//   - the join clones index 0, the x100 buffer;
//   - every failed comparison sends the second element through x10 again until
//     it exceeds 20000, ending at [20000, 100000].
#[test]
fn test_split_clone_join() {
    let mut harness = DiagramTestFixture::new();
    harness.registry.register_message::<[f64; 2]>().with_split();

    let spec = json!({
        "version": "0.1.0",
        "start": "initial_split",
        "ops": {
            "failure_split": {
                "type": "split",
                "sequential": [
                    { "builtin": "dispose" },
                    "x10"
                ]
            },
            "evaluate": {
                "type": "node",
                "builder": "less_than",
                "next": "fork_result"
            },
            "fork_result": {
                "type": "fork_result",
                "err": "failure_split",
                "ok": {
                    "builtin": "terminate"
                }
            },
            "x100": {
                "type": "node",
                "builder": "mul",
                "next": "x100-buffer",
                "config": 100,
                "display_text": "x100"
            },
            "join": {
                "type": "join",
                "buffers": [
                    "x100-buffer",
                    "x10-buffer"
                ],
                "next": "evaluate",
                "clone": [
                    0
                ]
            },
            "x10": {
                "type": "node",
                "builder": "mul",
                "next": "x10-buffer",
                "config": 10,
                "display_text": "x10"
            },
            "initial_split": {
                "type": "split",
                "sequential": [
                    "x100",
                    "x10"
                ]
            },
            "x100-buffer": {
                "type": "buffer"
            },
            "x10-buffer": {
                "type": "buffer"
            }
        }
    });
    let workflow = Diagram::from_json(spec).unwrap();

    let outcome: [f64; 2] = harness.spawn_and_run(&workflow, [200.0, 10.0]).unwrap();
    harness.context.assert_no_errors();

    let [first, second] = outcome;
    assert_eq!(first, 20000.0);
    assert_eq!(second, 100000.0);
}
}