mod buffer_schema;
mod fork_clone_schema;
mod fork_result_schema;
mod join_schema;
mod node_schema;
mod operation_ref;
mod registration;
mod scope_schema;
mod section_schema;
mod serialization;
mod split_schema;
mod stream_out_schema;
mod supported;
mod transform_schema;
mod unzip_schema;
mod workflow_builder;
#[cfg(feature = "grpc")]
pub mod grpc;
#[cfg(feature = "zenoh")]
pub mod zenoh;
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::system::Commands;
pub use buffer_schema::{BufferAccessSchema, BufferSchema, ListenSchema};
pub use fork_clone_schema::{DynForkClone, ForkCloneSchema, RegisterClone};
pub use fork_result_schema::{DynForkResult, ForkResultSchema};
pub use join_schema::JoinSchema;
pub use node_schema::NodeSchema;
pub use operation_ref::*;
pub use registration::*;
pub use scope_schema::*;
pub use section_schema::*;
pub use serialization::*;
pub use split_schema::*;
pub use stream_out_schema::*;
use tracing::debug;
pub use transform_schema::{TransformError, TransformSchema};
pub use unzip_schema::UnzipSchema;
pub use workflow_builder::*;
use anyhow::Error as Anyhow;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt::Display,
io::Read,
sync::Arc,
};
pub use crate::type_info::TypeInfo;
use crate::{
BufferIdentifier, Builder, IncompatibleLayout, IncrementalScopeError, JsonMessage,
MessageTypeHint, Scope, Service, SpawnWorkflowExt, SplitConnectionError, StreamPack,
is_default,
};
use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema};
use serde::{
Deserialize, Deserializer, Serialize, Serializer,
de::{Error, Visitor},
ser::SerializeMap,
};
use thiserror::Error as ThisError;
const CURRENT_DIAGRAM_VERSION: &str = "0.1.0";
const SUPPORTED_DIAGRAM_VERSION: &str = ">=0.1.0, <0.2.0";
const RESERVED_OPERATION_NAMES: [&'static str; 2] = ["", "builtin"];
pub type BuilderId = Arc<str>;
pub type OperationName = Arc<str>;
pub type DisplayText = Arc<str>;
#[derive(
Debug, Clone, Serialize, Deserialize, JsonSchema, Hash, PartialEq, Eq, PartialOrd, Ord,
)]
#[serde(untagged, rename_all = "snake_case")]
pub enum NextOperation {
Name(OperationName),
Builtin {
builtin: BuiltinTarget,
},
Namespace(NamespacedOperation),
}
impl NextOperation {
pub fn dispose() -> Self {
NextOperation::Builtin {
builtin: BuiltinTarget::Dispose,
}
}
pub fn terminate() -> Self {
NextOperation::Builtin {
builtin: BuiltinTarget::Terminate,
}
}
}
impl Display for NextOperation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Name(operation_id) => f.write_str(operation_id),
Self::Namespace(NamespacedOperation {
namespace,
operation,
}) => write!(f, "{namespace}:{operation}"),
Self::Builtin { builtin } => write!(f, "builtin:{builtin}"),
}
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct NamespacedOperation {
pub namespace: OperationName,
pub operation: OperationName,
}
impl Serialize for NamespacedOperation {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_entry(&self.namespace, &self.operation)?;
map.end()
}
}
struct NamespacedOperationVisitor;
impl<'de> Visitor<'de> for NamespacedOperationVisitor {
type Value = NamespacedOperation;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str(
"a map with exactly one entry of { \"<namespace>\" : \"<operation>\" } \
whose key is the namespace string and whose value is the operation string",
)
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let (key, value) = map.next_entry::<String, String>()?.ok_or_else(|| {
A::Error::custom(
"namespaced operation must be a map from the namespace to the operation name",
)
})?;
if !map.next_key::<String>()?.is_none() {
return Err(A::Error::custom(
"namespaced operation must contain exactly one entry",
));
}
Ok(NamespacedOperation {
namespace: key.into(),
operation: value.into(),
})
}
}
impl<'de> Deserialize<'de> for NamespacedOperation {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_map(NamespacedOperationVisitor)
}
}
impl JsonSchema for NamespacedOperation {
fn json_schema(_generator: &mut SchemaGenerator) -> Schema {
json_schema!({
"title": "NamespacedOperation",
"description": "Refer to an operation inside of a namespace, e.g. { \"<namespace>\": \"<operation>\"",
"type": "object",
"maxProperties": 1,
"minProperties": 1,
"additionalProperties": {
"type": "string"
}
})
}
fn schema_name() -> Cow<'static, str> {
"NamespacedOperation".into()
}
}
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case", untagged)]
pub enum BufferSelection {
Dict(HashMap<String, NextOperation>),
Array(Vec<NextOperation>),
}
impl BufferSelection {
pub fn is_empty(&self) -> bool {
match self {
Self::Dict(d) => d.is_empty(),
Self::Array(a) => a.is_empty(),
}
}
}
#[derive(
Debug,
Clone,
Serialize,
Deserialize,
JsonSchema,
Hash,
PartialEq,
Eq,
PartialOrd,
Ord,
strum::Display,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum BuiltinTarget {
Terminate,
Dispose,
Cancel,
}
#[derive(
Debug,
Clone,
Serialize,
Deserialize,
JsonSchema,
Hash,
PartialEq,
Eq,
PartialOrd,
Ord,
strum::Display,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum BuiltinSource {
Start,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct TerminateSchema {}
#[derive(Clone, strum::Display, Debug, JsonSchema, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
#[strum(serialize_all = "snake_case")]
pub enum DiagramOperation {
Node(NodeSchema),
Section(SectionSchema),
Scope(ScopeSchema),
StreamOut(StreamOutSchema),
ForkClone(ForkCloneSchema),
Unzip(UnzipSchema),
ForkResult(ForkResultSchema),
Split(SplitSchema),
Join(JoinSchema),
Transform(TransformSchema),
Buffer(BufferSchema),
BufferAccess(BufferAccessSchema),
Listen(ListenSchema),
}
impl BuildDiagramOperation for DiagramOperation {
fn build_diagram_operation(
&self,
id: &OperationName,
ctx: &mut DiagramContext,
) -> Result<BuildStatus, DiagramErrorCode> {
match self {
Self::Buffer(op) => op.build_diagram_operation(id, ctx),
Self::BufferAccess(op) => op.build_diagram_operation(id, ctx),
Self::ForkClone(op) => op.build_diagram_operation(id, ctx),
Self::ForkResult(op) => op.build_diagram_operation(id, ctx),
Self::Join(op) => op.build_diagram_operation(id, ctx),
Self::Listen(op) => op.build_diagram_operation(id, ctx),
Self::Node(op) => op.build_diagram_operation(id, ctx),
Self::Scope(op) => op.build_diagram_operation(id, ctx),
Self::Section(op) => op.build_diagram_operation(id, ctx),
Self::Split(op) => op.build_diagram_operation(id, ctx),
Self::StreamOut(op) => op.build_diagram_operation(id, ctx),
Self::Transform(op) => op.build_diagram_operation(id, ctx),
Self::Unzip(op) => op.build_diagram_operation(id, ctx),
}
}
}
fn schema_with_string(generator: &mut SchemaGenerator) -> Schema {
generator.subschema_for::<String>()
}
fn deserialize_semver<'de, D>(de: D) -> Result<semver::Version, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(de)?;
let ver_req = semver::VersionReq::parse(SUPPORTED_DIAGRAM_VERSION).unwrap();
let ver = semver::Version::parse(&s).map_err(|_| {
serde::de::Error::invalid_value(serde::de::Unexpected::Str(&s), &SUPPORTED_DIAGRAM_VERSION)
})?;
if !ver_req.matches(&ver) {
return Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(&s),
&SUPPORTED_DIAGRAM_VERSION,
));
}
Ok(ver)
}
fn serialize_semver<S>(o: &semver::Version, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
o.to_string().serialize(ser)
}
#[derive(Default, Debug, Clone, JsonSchema, Serialize, Deserialize)]
pub struct ExtensionSettings {
#[serde(default)]
pub extensions: HashMap<String, serde_json::Value>,
}
#[derive(Default, Debug, Clone, JsonSchema, PartialEq, Serialize, Deserialize)]
pub struct InputExample {
pub value: JsonMessage,
pub description: String,
}
#[derive(Debug, Clone, JsonSchema, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct Diagram {
#[serde(
deserialize_with = "deserialize_semver",
serialize_with = "serialize_semver"
)]
#[schemars(schema_with = "schema_with_string")]
version: semver::Version,
#[serde(default)]
pub templates: Templates,
pub start: NextOperation,
#[serde(default, skip_serializing_if = "is_default")]
pub on_implicit_error: Option<NextOperation>,
pub ops: Operations,
#[serde(default, skip_serializing_if = "is_default")]
pub default_trace: TraceToggle,
#[serde(flatten)]
pub extensions: Option<ExtensionSettings>,
#[serde(default, skip_serializing_if = "is_default")]
pub description: String,
#[serde(default, skip_serializing_if = "is_default")]
input_examples: Vec<InputExample>,
}
#[derive(Default, Debug, Clone, Copy, JsonSchema, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TraceToggle {
#[default]
Off,
On,
Messages,
}
impl TraceToggle {
pub fn is_on(&self) -> bool {
!matches!(self, Self::Off)
}
pub fn with_messages(&self) -> bool {
matches!(self, Self::Messages)
}
}
#[derive(Default, Debug, Clone, JsonSchema, Serialize, Deserialize)]
pub struct TraceSettings {
#[serde(default, skip_serializing_if = "is_default")]
pub display_text: Option<DisplayText>,
#[serde(default, skip_serializing_if = "is_default")]
pub trace: Option<TraceToggle>,
#[serde(flatten)]
pub extensions: Option<ExtensionSettings>,
}
impl Diagram {
pub fn new(start: NextOperation) -> Self {
Self {
version: semver::Version::parse(CURRENT_DIAGRAM_VERSION).unwrap(),
start,
templates: Default::default(),
on_implicit_error: Default::default(),
ops: Default::default(),
default_trace: Default::default(),
extensions: None,
description: Default::default(),
input_examples: Default::default(),
}
}
pub fn spawn_workflow<Request, Response, Streams>(
&self,
cmds: &mut Commands,
registry: &DiagramElementRegistry,
) -> Result<Service<Request, Response, Streams>, DiagramError>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
let mut err: Option<DiagramError> = None;
let w = cmds.spawn_workflow(
|scope: Scope<Request, Response, Streams>, builder: &mut Builder| {
debug!(
"spawn workflow, scope input: {:?}, terminate: {:?}",
scope.start.id(),
scope.terminate.id()
);
if let Err(had_err) = create_workflow(scope, builder, registry, self) {
err = Some(had_err);
}
},
);
if let Some(err) = err {
cmds.entity(w.provider()).despawn();
return Err(err);
}
Ok(w)
}
pub fn spawn_io_workflow<Request, Response>(
&self,
cmds: &mut Commands,
registry: &DiagramElementRegistry,
) -> Result<Service<Request, Response, ()>, DiagramError>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
{
self.spawn_workflow::<Request, Response, ()>(cmds, registry)
}
pub fn from_json(value: serde_json::Value) -> Result<Self, serde_json::Error> {
serde_json::from_value(value)
}
pub fn from_json_str(s: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(s)
}
pub fn from_reader<R>(r: R) -> Result<Self, serde_json::Error>
where
R: Read,
{
serde_json::from_reader(r)
}
pub fn validate_operation_names(&self) -> Result<(), DiagramErrorCode> {
self.ops.validate_operation_names()?;
self.templates.validate_operation_names()?;
Ok(())
}
pub fn validate_template_usage(&self) -> Result<(), DiagramErrorCode> {
for op in self.ops.values() {
match op.as_ref() {
DiagramOperation::Section(section) => match §ion.provider {
SectionProvider::Template(template) => {
self.templates.validate_template(template)?;
}
_ => continue,
},
_ => continue,
}
}
Ok(())
}
}
#[derive(Debug, Clone, Default, JsonSchema, Serialize, Deserialize, Deref, DerefMut)]
#[serde(transparent, rename_all = "snake_case")]
pub struct Operations(Arc<HashMap<OperationName, Arc<DiagramOperation>>>);
impl Operations {
pub fn get_op(&self, op_id: &Arc<str>) -> Result<&Arc<DiagramOperation>, DiagramErrorCode> {
self.get(op_id)
.ok_or_else(|| DiagramErrorCode::operation_name_not_found(op_id.clone()))
}
pub fn validate_operation_names(&self) -> Result<(), DiagramErrorCode> {
validate_operation_names(&self.0)
}
}
#[derive(Debug, Clone, Default, JsonSchema, Serialize, Deserialize, Deref, DerefMut)]
#[serde(transparent, rename_all = "snake_case")]
pub struct Templates(HashMap<OperationName, SectionTemplate>);
impl Templates {
pub fn get_template(
&self,
template_id: &OperationName,
) -> Result<&SectionTemplate, DiagramErrorCode> {
self.get(template_id)
.ok_or_else(|| DiagramErrorCode::TemplateNotFound(template_id.clone()))
}
pub fn validate_operation_names(&self) -> Result<(), DiagramErrorCode> {
for (name, template) in &self.0 {
validate_operation_name(name)?;
validate_operation_names(&template.ops)?;
}
Ok(())
}
pub fn validate_template(&self, template_id: &OperationName) -> Result<(), DiagramErrorCode> {
check_circular_template_dependency(template_id, &self.0)?;
Ok(())
}
}
fn validate_operation_names(
ops: &HashMap<OperationName, Arc<DiagramOperation>>,
) -> Result<(), DiagramErrorCode> {
for name in ops.keys() {
validate_operation_name(name)?;
}
Ok(())
}
fn validate_operation_name(name: &str) -> Result<(), DiagramErrorCode> {
for reserved in &RESERVED_OPERATION_NAMES {
if name == *reserved {
return Err(DiagramErrorCode::InvalidUseOfReservedName(*reserved));
}
}
Ok(())
}
fn check_circular_template_dependency(
start_from: &OperationName,
templates: &HashMap<OperationName, SectionTemplate>,
) -> Result<(), DiagramErrorCode> {
let mut queue = Vec::new();
queue.push(TemplateStack::new(start_from));
while let Some(top) = queue.pop() {
let Some(template) = templates.get(&top.next) else {
return Err(DiagramErrorCode::UnknownTemplate(top.next));
};
for op in template.ops.0.values() {
match op.as_ref() {
DiagramOperation::Section(section) => match §ion.provider {
SectionProvider::Template(template) => {
queue.push(top.child(template)?);
}
_ => continue,
},
_ => continue,
}
}
}
Ok(())
}
struct TemplateStack {
used: HashSet<OperationName>,
next: OperationName,
}
impl TemplateStack {
fn new(op: &OperationName) -> Self {
TemplateStack {
used: HashSet::from_iter([Arc::clone(op)]),
next: Arc::clone(op),
}
}
fn child(&self, next: &OperationName) -> Result<Self, DiagramErrorCode> {
let mut used = self.used.clone();
if !used.insert(Arc::clone(next)) {
return Err(DiagramErrorCode::CircularTemplateDependency(
used.into_iter().collect(),
));
}
Ok(Self {
used,
next: Arc::clone(next),
})
}
}
#[derive(ThisError, Debug)]
#[error("{context} {code}")]
pub struct DiagramError {
pub context: DiagramErrorContext,
#[source]
pub code: DiagramErrorCode,
}
impl DiagramError {
pub fn in_operation(op_id: impl Into<OperationRef>, code: DiagramErrorCode) -> Self {
Self {
context: DiagramErrorContext {
op_id: Some(op_id.into()),
},
code,
}
}
}
#[derive(Debug)]
pub struct DiagramErrorContext {
op_id: Option<OperationRef>,
}
impl Display for DiagramErrorContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(op_id) = &self.op_id {
write!(f, "in operation [{}],", op_id)?;
}
Ok(())
}
}
#[derive(ThisError, Debug)]
pub enum DiagramErrorCode {
#[error("node builder [{0}] is not registered")]
BuilderNotFound(BuilderId),
#[error("node builder [{builder}] encountered an error: {error}")]
NodeBuildingError { builder: BuilderId, error: Anyhow },
#[error("section builder [{builder}] encountered an error: {error}")]
SectionBuildingError { builder: BuilderId, error: Anyhow },
#[error("operation [{0}] not found")]
OperationNotFound(NextOperation),
#[error("section template [{0}] does not exist")]
TemplateNotFound(OperationName),
#[error("{0}")]
TypeMismatch(#[from] TypeMismatch),
#[error("{0}")]
MissingStream(#[from] MissingStream),
#[error("Operation [{0}] attempted to instantiate a duplicate of itself.")]
DuplicateInputsCreated(OperationRef),
#[error("Operation [{0}] attempted to instantiate a duplicate buffer.")]
DuplicateBuffersCreated(OperationRef),
#[error(
"Missing a connection to start or terminate. A workflow cannot run with a valid connection to each."
)]
MissingStartOrTerminate,
#[error("Serialization was not registered for the target message type.")]
NotSerializable(TypeInfo),
#[error("Deserialization was not registered for the target message type.")]
NotDeserializable(TypeInfo),
#[error("Cloning was not registered for the target message type. Type: {0}")]
NotCloneable(TypeInfo),
#[error("The target message type does not support unzipping. Type: {0}")]
NotUnzippable(TypeInfo),
#[error(
"The number of elements in the unzip expected by the diagram [{expected}] is different from the real number [{actual}]"
)]
UnzipMismatch {
expected: usize,
actual: usize,
elements: Vec<TypeInfo>,
},
#[error(
"Call .with_result() on your node to be able to fork its Result-type output. Type: {0}"
)]
CannotForkResult(TypeInfo),
#[error(
"Response cannot be split. Make sure to use .with_split() when building the node. Type: {0}"
)]
NotSplittable(TypeInfo),
#[error(
"Message cannot be joined. Make sure to use .with_join() when building the target node. Type: {0}"
)]
NotJoinable(TypeInfo),
#[error("Empty join is not allowed.")]
EmptyJoin,
#[error("Unknown buffer identifier [{unknown}] used for join containing {}", format_list(.available))]
UnknownJoinField {
unknown: BufferIdentifier<'static>,
available: Vec<BufferIdentifier<'static>>,
},
#[error(
"Target type cannot be determined from [next] and [target_node] is not provided or cannot be inferred from."
)]
UnknownTarget,
#[error("There was an attempt to connect to an unknown operation: [{0}]")]
UnknownOperation(OperationRef),
#[error("There was an attempt to use an unknown section template: [{0}]")]
UnknownTemplate(OperationName),
#[error("There was an attempt to use an operation in an invalid way: [{0}]")]
InvalidOperation(OperationRef),
#[error(transparent)]
CannotTransform(#[from] TransformError),
#[error("box/unbox operation for the message is not registered")]
CannotBoxOrUnbox,
#[error("buffer access is not registered for {0}")]
CannotAccessBuffers(TypeInfo),
#[error("listening is not registered for {0}")]
CannotListen(TypeInfo),
#[error(transparent)]
IncompatibleBuffers(#[from] IncompatibleLayout),
#[error("inconsistent type hints for the buffer message: {}", format_list(&.0))]
InconsistentBufferHints(Vec<MessageTypeHint>),
#[error(
"This error should not happen, it means the implementation of buffer hints is broken. Identifier of missing hint: {0}"
)]
BrokenBufferMessageTypeHint(BufferIdentifier<'static>),
#[error(transparent)]
SectionError(#[from] SectionError),
#[error("one or more operation is missing inputs")]
IncompleteDiagram,
#[error("the config of the operation has an error: {0}")]
ConfigError(serde_json::Error),
#[error("failed to create trace info for the operation: {0}")]
TraceInfoError(serde_json::Error),
#[error(transparent)]
ConnectionError(#[from] SplitConnectionError),
#[error("a type being used in the diagram was not registered {0}")]
UnregisteredType(TypeInfo),
#[error("The build of the workflow came to a halt, reasons:\n{reasons:?}")]
BuildHalted {
reasons: HashMap<OperationRef, Cow<'static, str>>,
},
#[error(
"The workflow building process has had an excessive number of iterations. \
This may indicate an implementation bug or an extraordinarily complex diagram."
)]
ExcessiveIterations,
#[error("An operation was given a reserved name [{0}]")]
InvalidUseOfReservedName(&'static str),
#[error("an error happened while building a nested diagram: {0}")]
NestedError(Box<DiagramError>),
#[error("A circular redirection exists between operations: {}", format_list(&.0))]
CircularRedirect(Vec<OperationRef>),
#[error("A circular dependency exists between templates: {}", format_list(&.0))]
CircularTemplateDependency(Vec<OperationName>),
#[error("An error occurred while finishing the workflow build: {0}")]
FinishingErrors(FinishingErrors),
#[error("An error occurred while creating a scope: {0}")]
IncrementalScopeError(#[from] IncrementalScopeError),
}
fn format_list<T: std::fmt::Display>(list: &[T]) -> String {
let mut output = String::new();
for op in list {
output += &format!("[{op}]");
}
output
}
impl From<DiagramErrorCode> for DiagramError {
fn from(code: DiagramErrorCode) -> Self {
DiagramError {
context: DiagramErrorContext { op_id: None },
code,
}
}
}
impl DiagramErrorCode {
pub fn operation_name_not_found(name: OperationName) -> Self {
DiagramErrorCode::OperationNotFound(NextOperation::Name(name))
}
pub fn in_operation(self, op_id: OperationRef) -> DiagramError {
DiagramError::in_operation(op_id, self)
}
}
#[derive(ThisError, Debug)]
#[error("An expected stream is not provided by this node: {missing_name}. Available stream names: {}", format_list(&available_names))]
pub struct MissingStream {
pub missing_name: OperationName,
pub available_names: Vec<OperationName>,
}
#[derive(ThisError, Debug, Default)]
pub struct FinishingErrors {
pub errors: HashMap<OperationRef, DiagramErrorCode>,
}
impl FinishingErrors {
pub fn as_result(self) -> Result<(), Self> {
if self.errors.is_empty() {
Ok(())
} else {
Err(self)
}
}
}
impl std::fmt::Display for FinishingErrors {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (op, code) in &self.errors {
write!(f, " - [{op}]: {code}")?;
}
Ok(())
}
}
#[cfg(test)]
pub(crate) mod testing;
#[cfg(test)]
mod tests {
use crate::{Cancellation, CancellationCause};
use serde_json::json;
use test_log::test;
use testing::DiagramTestFixture;
use super::*;
#[test]
fn test_no_terminate() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "dispose" },
},
},
}))
.unwrap();
let err = fixture
.spawn_and_run::<_, JsonMessage>(&diagram, JsonMessage::from(4))
.unwrap_err();
assert!(fixture.context.no_unhandled_errors());
assert!(matches!(
*err.downcast_ref::<Cancellation>().unwrap().cause,
CancellationCause::Unreachable(_)
));
}
#[test]
fn test_unserializable_start() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "opaque_request",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let err = fixture.spawn_json_io_workflow(&diagram).unwrap_err();
assert!(
matches!(err.code, DiagramErrorCode::TypeMismatch { .. }),
"{:?}",
err
);
}
#[test]
fn test_unserializable_terminate() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "opaque_response",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let err = fixture.spawn_json_io_workflow(&diagram).unwrap_err();
assert!(
matches!(err.code, DiagramErrorCode::NotSerializable(_)),
"{:?}",
err
);
}
#[test]
fn test_mismatch_types() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3",
"next": "op2",
},
"op2": {
"type": "node",
"builder": "opaque_request",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let err = fixture.spawn_json_io_workflow(&diagram).unwrap_err();
assert!(
matches!(
err.code,
DiagramErrorCode::TypeMismatch(TypeMismatch {
target_type: _,
source_type: _
})
),
"{:?}",
err
);
}
#[test]
fn test_disconnected() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3",
"next": "op2",
},
"op2": {
"type": "node",
"builder": "multiply3",
"next": "op1",
},
},
}))
.unwrap();
let err = fixture
.spawn_and_run::<_, JsonMessage>(&diagram, JsonMessage::from(4))
.unwrap_err();
assert!(fixture.context.no_unhandled_errors());
assert!(matches!(
*err.downcast_ref::<Cancellation>().unwrap().cause,
CancellationCause::Unreachable(_),
));
}
#[test]
fn test_looping_diagram() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3",
"next": "fork_clone",
},
"fork_clone": {
"type": "fork_clone",
"next": ["op1", "op2"],
},
"op2": {
"type": "node",
"builder": "multiply3",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let result: JsonMessage = fixture
.spawn_and_run(&diagram, JsonMessage::from(4))
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 36);
}
#[test]
fn test_noop_diagram() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": { "builtin": "terminate" },
"ops": {},
}))
.unwrap();
let result: JsonMessage = fixture
.spawn_and_run(&diagram, JsonMessage::from(4))
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 4);
}
#[test]
fn test_serialized_diagram() {
let mut fixture = DiagramTestFixture::new();
let json_str = r#"
{
"version": "0.1.0",
"start": "multiply3",
"ops": {
"multiply3": {
"type": "node",
"builder": "multiply_by",
"config": 7,
"next": { "builtin": "terminate" }
}
}
}
"#;
let result: JsonMessage = fixture
.spawn_and_run(
&Diagram::from_json_str(json_str).unwrap(),
JsonMessage::from(4),
)
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 28);
}
#[test]
fn test_transform_unzip() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3_5",
"next": "unzip",
},
"unzip": {
"type": "unzip",
"next": [
"transform",
{ "builtin": "dispose" },
],
},
"transform": {
"type": "transform",
"cel": "777",
"next": { "builtin": "terminate" },
},
},
}))
.unwrap();
let result: JsonMessage = fixture
.spawn_and_run(&diagram, JsonMessage::from(4))
.unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 777);
}
#[test]
fn test_unknown_operation_detection() {
let mut fixture = DiagramTestFixture::new();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "op1",
"ops": {
"op1": {
"type": "node",
"builder": "multiply3_5",
"next": "clone",
},
"clone": {
"type": "fork_clone",
"next": [
"unknown",
{ "builtin": "terminate" },
],
},
},
}))
.unwrap();
let result = fixture.spawn_json_io_workflow(&diagram).unwrap_err();
assert!(matches!(result.code, DiagramErrorCode::UnknownOperation(_),));
}
#[test]
fn test_fork_result_termination() {
let mut fixture = DiagramTestFixture::new();
fixture
.registry
.register_message::<Result<f32, ()>>()
.with_result();
let diagram = Diagram::from_json(json!({
"version": "0.1.0",
"start": "fork",
"ops": {
"fork": {
"type": "fork_result",
"ok": { "builtin": "terminate" },
"err": { "builtin": "dispose" }
}
}
}))
.unwrap();
let result: f32 = fixture.spawn_and_run(&diagram, Ok::<_, ()>(5_f32)).unwrap();
assert!(fixture.context.no_unhandled_errors());
assert_eq!(result, 5.0);
}
}