use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use variadics_please::all_tuples_with_size;
use crate::Builder;
use super::{
BuildDiagramOperation, BuildStatus, DiagramContext, DiagramErrorCode, DynInputSlot, DynOutput,
MessageRegistry, NextOperation, OperationName, RegisterClone, SerializeMessage, TraceInfo,
TraceSettings, TypeInfo, supported::*,
};
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct UnzipSchema {
pub next: Vec<NextOperation>,
#[serde(flatten)]
pub trace_settings: TraceSettings,
}
impl BuildDiagramOperation for UnzipSchema {
fn build_diagram_operation(
&self,
id: &OperationName,
ctx: &mut DiagramContext,
) -> Result<BuildStatus, DiagramErrorCode> {
let Some(inferred_type) = ctx.infer_input_type_into_target(id)? else {
return Ok(BuildStatus::defer("waiting for an input"));
};
let unzip = ctx.registry.messages.unzip(&inferred_type)?;
let actual_output = unzip.output_types();
if actual_output.len() != self.next.len() {
return Err(DiagramErrorCode::UnzipMismatch {
expected: self.next.len(),
actual: unzip.output_types().len(),
elements: actual_output,
});
}
let unzip = unzip.perform_unzip(ctx.builder)?;
let trace = TraceInfo::new(self, self.trace_settings.trace)?;
ctx.set_input_for_target(id, unzip.input, trace)?;
for (target, output) in self.next.iter().zip(unzip.outputs) {
ctx.add_output_into_target(target, output);
}
Ok(BuildStatus::Finished)
}
}
pub struct DynUnzip {
input: DynInputSlot,
outputs: Vec<DynOutput>,
}
pub trait PerformUnzip {
fn output_types(&self) -> Vec<TypeInfo>;
fn perform_unzip(&self, builder: &mut Builder) -> Result<DynUnzip, DiagramErrorCode>;
fn on_register(&self, registry: &mut MessageRegistry);
}
macro_rules! dyn_unzip_impl {
($len:literal, $(($P:ident, $o:ident)),*) => {
impl<$($P),*, Serializer, Cloneable> PerformUnzip for Supported<(($($P,)*), Serializer, Cloneable)>
where
$($P: Send + Sync + 'static),*,
Serializer: $(SerializeMessage<$P> +)* $(SerializeMessage<Vec<$P>> +)*,
Cloneable: $(RegisterClone<$P> +)* $(RegisterClone<Vec<$P>> +)*,
{
fn output_types(&self) -> Vec<TypeInfo> {
vec![$(
TypeInfo::of::<$P>(),
)*]
}
fn perform_unzip(
&self,
builder: &mut Builder,
) -> Result<DynUnzip, DiagramErrorCode> {
let (input, ($($o,)*)) = builder.create_unzip::<($($P,)*)>();
let mut outputs: Vec<DynOutput> = Vec::with_capacity($len);
$({
outputs.push($o.into());
})*
Ok(DynUnzip {
input: input.into(),
outputs,
})
}
fn on_register(&self, registry: &mut MessageRegistry)
{
$(
registry.register_serialize::<$P, Serializer>();
registry.register_clone::<$P, Cloneable>();
)*
}
}
};
}
all_tuples_with_size!(dyn_unzip_impl, 1, 12, R, o);
#[cfg(test)]
mod tests {
use serde_json::json;
use test_log::test;
use crate::{Diagram, DiagramErrorCode, JsonMessage, diagram::testing::DiagramTestFixture};
#[test]
fn test_unzip_not_unzippable() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3",
"next": "unzip"
},
"unzip": {
"type": "unzip",
"next": [{ "builtin": "terminate" }],
},
},
}))
.unwrap();
let err = fixture.spawn_json_io_workflow(&diagram).unwrap_err();
assert!(
matches!(err.code, DiagramErrorCode::NotUnzippable(_)),
"{}",
err
);
}
#[test]
fn test_unzip_to_too_many_slots() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3_5",
"next": "unzip"
},
"unzip": {
"type": "unzip",
"next": ["op2", "op3", "op4"],
},
"op2": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "terminate" },
},
"op3": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "terminate" },
},
"op4": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let err = fixture.spawn_json_io_workflow(&diagram).unwrap_err();
assert!(matches!(
err.code,
DiagramErrorCode::UnzipMismatch {
expected: 3,
actual: 2,
..
}
));
}
#[test]
fn test_unzip_to_terminate() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3_5",
"next": "unzip"
},
"unzip": {
"type": "unzip",
"next": [{ "builtin": "dispose" }, { "builtin": "terminate" }],
},
},
}))
.unwrap();
let result: JsonMessage = fixture
.spawn_and_run(&diagram, JsonMessage::from(4))
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 20);
}
#[test]
fn test_unzip() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3_5",
"next": "unzip",
},
"unzip": {
"type": "unzip",
"next": [
"op2",
{ "builtin": "dispose" },
],
},
"op2": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let result: JsonMessage = fixture
.spawn_and_run(&diagram, JsonMessage::from(4))
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 36);
}
#[test]
fn test_unzip_with_dispose() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3_5",
"next": "unzip",
},
"unzip": {
"type": "unzip",
"next": [{ "builtin": "dispose" }, "op2"],
},
"op2": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let result: JsonMessage = fixture
.spawn_and_run(&diagram, JsonMessage::from(4))
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 60);
}
}