use std::{
any::{Any, type_name},
borrow::{Borrow, Cow},
cell::RefCell,
collections::{HashMap, HashSet},
marker::PhantomData,
sync::Arc,
};
use bevy_ecs::prelude::{Commands, Entity};
use anyhow::Error as Anyhow;
pub use crate::dyn_node::*;
use crate::{
Accessor, AnyBuffer, AsAnyBuffer, BufferIdentifier, BufferMap, BufferMapLayout, BufferSettings,
Builder, DisplayText, IncompatibleLayout, IncrementalScopeBuilder, IncrementalScopeRequest,
IncrementalScopeRequestResult, IncrementalScopeResponse, IncrementalScopeResponseResult,
Joined, JsonBuffer, JsonMessage, MessageTypeHintMap, NamedStream, Node, StreamAvailability,
StreamOf, StreamPack,
};
#[cfg(feature = "trace")]
use crate::Trace;
use schemars::{JsonSchema, Schema, SchemaGenerator, generate::SchemaSettings, json_schema};
use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::SerializeMap};
use serde_json::json;
use super::{
BuilderId, DeserializeMessage, DiagramErrorCode, DynForkClone, DynForkResult, DynSplit,
DynType, JsonRegistration, RegisterJson, RegisterSplit, Section, SectionMetadata,
SectionMetadataProvider, SerializeMessage, SplitSchema, TransformError, TypeInfo,
buffer_schema::BufferAccessRequest, fork_clone_schema::RegisterClone,
fork_result_schema::RegisterForkResult, register_json, supported::*,
unzip_schema::PerformUnzip,
};
// Serializable record of one registered node builder.
//
// NOTE: plain `//` comments are used here on purpose. `///` doc comments on a
// `JsonSchema`-derived type are emitted as schema descriptions, which would
// change the generated schema.
#[derive(Serialize, JsonSchema)]
pub struct NodeRegistration {
// Display text for the node; registration falls back to the builder id when
// the builder options do not provide one.
pub(super) default_display_text: DisplayText,
// Type information of the node's request message.
pub(super) request: TypeInfo,
// Type information of the node's response message.
pub(super) response: TypeInfo,
// Type information of the node's named streams (presumably keyed by stream
// name -- confirm against `StreamAvailability::named_streams`).
pub(super) streams: HashMap<Cow<'static, str>, TypeInfo>,
// JSON schema generated for the builder's `Config` type.
pub(super) config_schema: Schema,
// Optional description copied from the builder options.
pub(super) description: Option<String>,
// Example configurations copied from the builder options.
pub(super) config_examples: Vec<ConfigExample>,
// Closure that builds the node from a JSON config; never serialized.
#[serde(skip)]
create_node_impl: CreateNodeFn,
}
impl NodeRegistration {
    /// Builds a node by invoking the stored builder closure with `config`.
    ///
    /// # Errors
    /// Returns whatever [`DiagramErrorCode`] the stored closure produces.
    ///
    /// # Panics
    /// Panics if the stored closure is already mutably borrowed, e.g. when the
    /// closure re-enters this method on the same registration.
    pub(super) fn create_node(
        &self,
        builder: &mut Builder,
        config: JsonMessage,
    ) -> Result<DynNode, DiagramErrorCode> {
        let mut build_node = self.create_node_impl.borrow_mut();
        build_node(builder, config)
    }
}
// Boxed node-builder closure. The `RefCell` lets an `FnMut` closure be invoked
// through a shared reference to its registration.
type CreateNodeFn =
RefCell<Box<dyn FnMut(&mut Builder, JsonMessage) -> Result<DynNode, DiagramErrorCode> + Send>>;
// Stored in `MessageOperation::deserialize_impl`.
type DeserializeFn = fn(&mut Builder) -> Result<DynForkResult, DiagramErrorCode>;
// Stored in `MessageOperation::serialize_impl`.
type SerializeFn = fn(&mut Builder) -> Result<DynForkResult, DiagramErrorCode>;
// Stored in `MessageOperation::fork_clone_impl`.
type ForkCloneFn = fn(&mut Builder) -> Result<DynForkClone, DiagramErrorCode>;
// Stored in `MessageOperation::fork_result_impl`.
type ForkResultFn = fn(&mut Builder) -> Result<DynForkResult, DiagramErrorCode>;
// Stored in `MessageOperation::split_impl`.
type SplitFn = fn(&SplitSchema, &mut Builder) -> Result<DynSplit, DiagramErrorCode>;
// Stored in `MessageOperation::join_impl`.
type JoinFn = fn(&BufferMap, &mut Builder) -> Result<DynOutput, DiagramErrorCode>;
// Stored in `MessageOperation::buffer_access_impl`.
type BufferAccessFn = fn(&BufferMap, &mut Builder) -> Result<DynNode, DiagramErrorCode>;
// Stored in `MessageOperation::listen_impl`.
type ListenFn = fn(&BufferMap, &mut Builder) -> Result<DynOutput, DiagramErrorCode>;
// Maps a set of buffer identifiers to message type hints, or reports an
// incompatible layout. Used for both accessor and listener hints.
type BufferLayoutTypeHintFn =
fn(HashSet<BufferIdentifier<'static>>) -> Result<MessageTypeHintMap, IncompatibleLayout>;
// Creates a type-erased buffer for a message type.
type CreateBufferFn = fn(BufferSettings, &mut Builder) -> AnyBuffer;
// Creates the node stored in `MessageOperation::create_trigger_impl`.
type CreateTriggerFn = fn(&mut Builder) -> DynNode;
// Creates the node stored in `MessageOperation::to_string_impl`.
type ToStringFn = fn(&mut Builder) -> DynNode;
// Only available with the `trace` feature.
#[cfg(feature = "trace")]
type EnableTraceSerializeFn = fn(&mut Trace);
/// Function pointers used to build scopes for one message type. Each pointer is
/// a generic function instantiated for that type by `BuildScope::new`.
struct BuildScope {
/// Calls `IncrementalScopeBuilder::set_request` for the message type.
set_request: fn(&mut IncrementalScopeBuilder, &mut Commands) -> IncrementalScopeRequestResult,
/// Calls `IncrementalScopeBuilder::set_response` for the message type.
set_response: fn(&mut IncrementalScopeBuilder, &mut Commands) -> IncrementalScopeResponseResult,
/// Spawns a named scope stream of the message type between two scope entities.
spawn_basic_scope_stream: fn(Entity, Entity, &mut Commands) -> (DynInputSlot, DynOutput),
}
impl BuildScope {
    /// Collects the scope-building functions instantiated for message type `T`.
    fn new<T>() -> Self
    where
        T: 'static + Send + Sync,
    {
        Self {
            set_request: Self::impl_set_request::<T>,
            set_response: Self::impl_set_response::<T>,
            spawn_basic_scope_stream: Self::impl_spawn_basic_scope_stream::<T>,
        }
    }

    /// Sets `T` as the request type of the incremental scope builder.
    fn impl_set_request<T>(
        incremental: &mut IncrementalScopeBuilder,
        commands: &mut Commands,
    ) -> IncrementalScopeRequestResult
    where
        T: 'static + Send + Sync,
    {
        incremental.set_request::<T>(commands)
    }

    /// Sets `T` as the response type of the incremental scope builder.
    fn impl_set_response<T>(
        incremental: &mut IncrementalScopeBuilder,
        commands: &mut Commands,
    ) -> IncrementalScopeResponseResult
    where
        T: 'static + Send + Sync,
    {
        incremental.set_response::<T>(commands)
    }

    /// Spawns a `NamedStream<StreamOf<T>>` scope stream and returns its two
    /// ends in type-erased form.
    fn impl_spawn_basic_scope_stream<T>(
        in_scope: Entity,
        out_scope: Entity,
        commands: &mut Commands,
    ) -> (DynInputSlot, DynOutput)
    where
        T: 'static + Send + Sync,
    {
        let (input, output) =
            NamedStream::<StreamOf<T>>::spawn_scope_stream(in_scope, out_scope, commands);
        (input.into(), output.into())
    }
}
/// Handle for registering nodes and messages with a chosen combination of
/// deserialize, serialize and clone strategies.
///
/// The three type parameters are markers only; they are consumed when a message
/// is registered. Note that `Deserialize` and `Serialize` here are generic
/// parameter names, not the serde traits of the same name.
#[must_use]
pub struct CommonOperations<'a, Deserialize, Serialize, Cloneable> {
/// Registry that receives the registrations.
registry: &'a mut DiagramElementRegistry,
/// Carries the marker types without storing any value.
_ignore: PhantomData<(Deserialize, Serialize, Cloneable)>,
}
impl<'a, DeserializeImpl, SerializeImpl, Cloneable>
CommonOperations<'a, DeserializeImpl, SerializeImpl, Cloneable>
{
/// Registers a node builder whose closure cannot fail.
///
/// This wraps `f` so that it always returns `Ok` and forwards to
/// [`Self::register_node_builder_fallible`].
pub fn register_node_builder<Config, Request, Response, Streams>(
    self,
    options: NodeBuilderOptions,
    mut f: impl FnMut(&mut Builder, Config) -> Node<Request, Response, Streams> + Send + 'static,
) -> NodeRegistrationBuilder<'a, Request, Response, Streams>
where
    Config: JsonSchema + DeserializeOwned,
    Request: Send + Sync + 'static,
    Response: Send + Sync + 'static,
    Streams: StreamPack,
    DeserializeImpl: DeserializeMessage<Request>,
    DeserializeImpl: DeserializeMessage<Response>,
    SerializeImpl: SerializeMessage<Request>,
    SerializeImpl: SerializeMessage<Response>,
    Cloneable: RegisterClone<Request>,
    Cloneable: RegisterClone<Response>,
    JsonRegistration<SerializeImpl, DeserializeImpl>: RegisterJson<Request>,
    JsonRegistration<SerializeImpl, DeserializeImpl>: RegisterJson<Response>,
{
    self.register_node_builder_fallible(options, move |node_builder, node_config| {
        Ok(f(node_builder, node_config))
    })
}
/// Registers a node builder whose closure may fail.
///
/// The request and response types are registered as messages first. The node
/// registration is then stored under `options.id`, replacing any earlier
/// registration with the same id.
///
/// The stored closure deserializes the JSON config into `Config` (reporting
/// `DiagramErrorCode::ConfigError` on failure) and wraps any error returned by
/// `f` in `DiagramErrorCode::NodeBuildingError`.
pub fn register_node_builder_fallible<Config, Request, Response, Streams>(
    mut self,
    options: NodeBuilderOptions,
    mut f: impl FnMut(&mut Builder, Config) -> Result<Node<Request, Response, Streams>, Anyhow>
    + Send
    + 'static,
) -> NodeRegistrationBuilder<'a, Request, Response, Streams>
where
    Config: JsonSchema + DeserializeOwned,
    Request: Send + Sync + 'static,
    Response: Send + Sync + 'static,
    Streams: StreamPack,
    DeserializeImpl: DeserializeMessage<Request>,
    DeserializeImpl: DeserializeMessage<Response>,
    SerializeImpl: SerializeMessage<Request>,
    SerializeImpl: SerializeMessage<Response>,
    Cloneable: RegisterClone<Request>,
    Cloneable: RegisterClone<Response>,
    JsonRegistration<SerializeImpl, DeserializeImpl>: RegisterJson<Request>,
    JsonRegistration<SerializeImpl, DeserializeImpl>: RegisterJson<Response>,
{
    self.impl_register_message::<Request>();
    self.impl_register_message::<Response>();

    // Kept by the closure so build errors can name the builder that failed.
    let builder_id = Arc::clone(&options.id);

    let streams = {
        let mut availability = StreamAvailability::default();
        Streams::set_stream_availability(&mut availability);
        availability.named_streams()
    };

    // Fall back to the builder id when no display text was supplied.
    let default_display_text = match options.default_display_text {
        Some(text) => text,
        None => options.id.clone(),
    };

    let registration = NodeRegistration {
        default_display_text,
        request: TypeInfo::of::<Request>(),
        response: TypeInfo::of::<Response>(),
        streams,
        config_schema: self
            .registry
            .messages
            .schema_generator
            .subschema_for::<Config>(),
        create_node_impl: RefCell::new(Box::new(move |node_builder, raw_config| {
            let parsed =
                serde_json::from_value(raw_config).map_err(DiagramErrorCode::ConfigError)?;
            let node = f(node_builder, parsed).map_err(|error| {
                DiagramErrorCode::NodeBuildingError {
                    builder: Arc::clone(&builder_id),
                    error,
                }
            })?;
            Ok(node.into())
        })),
        description: options.description,
        config_examples: options.config_examples,
    };

    self.registry.nodes.insert(options.id.clone(), registration);
    NodeRegistrationBuilder::<Request, Response, Streams>::new(self.registry)
}
/// Registers `Message` with the selected deserialize, serialize and clone
/// strategies, then returns a builder for adding further message operations.
pub fn register_message<Message>(mut self) -> MessageRegistrationBuilder<'a, Message>
where
    Message: Send + Sync + 'static,
    DeserializeImpl: DeserializeMessage<Message>,
    SerializeImpl: SerializeMessage<Message>,
    Cloneable: RegisterClone<Message>,
    JsonRegistration<SerializeImpl, DeserializeImpl>: RegisterJson<Message>,
{
    self.impl_register_message::<Message>();
    let messages = &mut self.registry.messages;
    MessageRegistrationBuilder::<Message>::new(messages)
}
/// Registers deserialize, serialize and clone support for `Message` using the
/// marker types of this handle, then calls `register_json` for it.
fn impl_register_message<Message>(&mut self)
where
    Message: Send + Sync + 'static,
    DeserializeImpl: DeserializeMessage<Message>,
    SerializeImpl: SerializeMessage<Message>,
    Cloneable: RegisterClone<Message>,
    JsonRegistration<SerializeImpl, DeserializeImpl>: RegisterJson<Message>,
{
    let messages = &mut self.registry.messages;
    messages.register_deserialize::<Message, DeserializeImpl>();
    messages.register_serialize::<Message, SerializeImpl>();
    // The returned flag only reports whether a new clone impl was stored.
    messages.register_clone::<Message, Cloneable>();
    register_json::<Message, SerializeImpl, DeserializeImpl>();
}
pub fn no_deserializing(self) -> CommonOperations<'a, NotSupported, SerializeImpl, Cloneable> {
CommonOperations {
registry: self.registry,
_ignore: Default::default(),
}
}
pub fn no_serializing(self) -> CommonOperations<'a, DeserializeImpl, NotSupported, Cloneable> {
CommonOperations {
registry: self.registry,
_ignore: Default::default(),
}
}
pub fn no_cloning(self) -> CommonOperations<'a, DeserializeImpl, SerializeImpl, NotSupported> {
CommonOperations {
registry: self.registry,
_ignore: Default::default(),
}
}
}
/// Builder for adding optional operations (unzip, result, split, join, ...) to
/// the registration of one `Message` type.
pub struct MessageRegistrationBuilder<'a, Message> {
/// Message registry that receives the registrations.
data: &'a mut MessageRegistry,
/// Carries the message type without storing a value.
_ignore: PhantomData<Message>,
}
impl<'a, Message> MessageRegistrationBuilder<'a, Message>
where
Message: Send + Sync + 'static + Any,
{
pub fn new(registry: &'a mut MessageRegistry) -> Self {
registry.register_join::<Vec<Message>>();
Self {
data: registry,
_ignore: Default::default(),
}
}
pub fn with_unzip(&mut self) -> &mut Self
where
Supported<(Message, Supported, Supported)>: PerformUnzip,
{
self.data.register_unzip::<Message, Supported, Supported>();
self
}
pub fn with_unzip_minimal(&mut self) -> &mut Self
where
Supported<(Message, NotSupported, NotSupported)>: PerformUnzip,
{
self.data
.register_unzip::<Message, NotSupported, NotSupported>();
self
}
pub fn with_result(&mut self) -> &mut Self
where
Supported<(Message, Supported, Supported)>: RegisterForkResult,
{
self.data
.register_result::<Supported<(Message, Supported, Supported)>>();
self
}
pub fn with_result_minimal(&mut self) -> &mut Self
where
Supported<(Message, NotSupported, NotSupported)>: RegisterForkResult,
{
self.data
.register_result::<Supported<(Message, NotSupported, NotSupported)>>();
self
}
pub fn with_split(&mut self) -> &mut Self
where
Supported<(Message, Supported, Supported)>: RegisterSplit,
{
self.data.register_split::<Message, Supported, Supported>();
self
}
pub fn with_split_minimal(&mut self) -> &mut Self
where
Supported<(Message, NotSupported, NotSupported)>: RegisterSplit,
{
self.data
.register_split::<Message, NotSupported, NotSupported>();
self
}
pub fn with_join(&mut self) -> &mut Self
where
Message: Joined,
{
self.data.register_join::<Message>();
self
}
pub fn with_buffer_access(&mut self) -> &mut Self
where
Message: BufferAccessRequest,
{
self.data.register_buffer_access::<Message>();
self
}
pub fn with_listen(&mut self) -> &mut Self
where
Message: Accessor,
{
self.data.register_listen::<Message>();
self
}
pub fn with_to_string(&mut self) -> &mut Self
where
Message: ToString,
{
self.data.register_to_string::<Message>();
self
}
}
/// Builder returned after registering a node builder; used to register extra
/// operations for the node's `Request` and `Response` message types.
pub struct NodeRegistrationBuilder<'a, Request, Response, Streams> {
/// Registry that receives the registrations.
registry: &'a mut DiagramElementRegistry,
/// Carries the node's type parameters without storing any value.
_ignore: PhantomData<(Request, Response, Streams)>,
}
impl<'a, Request, Response, Streams> NodeRegistrationBuilder<'a, Request, Response, Streams>
where
Request: Send + Sync + 'static + Any,
Response: Send + Sync + 'static + Any,
{
fn new(registry: &'a mut DiagramElementRegistry) -> Self {
Self {
registry,
_ignore: Default::default(),
}
}
pub fn with_common_request(&mut self) -> &mut Self
where
Request: DynType + DeserializeOwned + Serialize + Clone,
{
self.registry.register_message::<Request>();
self
}
pub fn with_clone_request(&mut self) -> &mut Self
where
Request: Clone,
{
self.registry
.messages
.register_clone::<Request, Supported>();
self
}
pub fn with_deserialize_request(&mut self) -> &mut Self
where
Request: DeserializeOwned + DynType,
{
self.registry
.messages
.register_deserialize::<Request, Supported>();
self
}
pub fn with_common_response(&mut self) -> &mut Self
where
Response: DynType + DeserializeOwned + Serialize + Clone,
{
self.registry.register_message::<Response>();
self
}
pub fn with_clone_response(&mut self) -> &mut Self
where
Response: Clone,
{
self.registry
.messages
.register_clone::<Response, Supported>();
self
}
pub fn with_serialize_response(&mut self) -> &mut Self
where
Response: Serialize + DynType,
{
self.registry
.messages
.register_serialize::<Response, Supported>();
self
}
pub fn with_unzip(&mut self) -> &mut Self
where
Supported<(Response, Supported, Supported)>: PerformUnzip,
{
MessageRegistrationBuilder::new(&mut self.registry.messages).with_unzip();
self
}
pub fn with_unzip_unserializable(&mut self) -> &mut Self
where
Supported<(Response, NotSupported, NotSupported)>: PerformUnzip,
{
MessageRegistrationBuilder::new(&mut self.registry.messages).with_unzip_minimal();
self
}
pub fn with_result(&mut self) -> &mut Self
where
Supported<(Response, Supported, Supported)>: RegisterForkResult,
{
MessageRegistrationBuilder::new(&mut self.registry.messages).with_result();
self
}
pub fn with_result_minimal(&mut self) -> &mut Self
where
Supported<(Response, NotSupported, NotSupported)>: RegisterForkResult,
{
MessageRegistrationBuilder::new(&mut self.registry.messages).with_result_minimal();
self
}
pub fn with_split(&mut self) -> &mut Self
where
Supported<(Response, Supported, Supported)>: RegisterSplit,
{
MessageRegistrationBuilder::new(&mut self.registry.messages).with_split();
self
}
pub fn with_split_unserializable(&mut self) -> &mut Self
where
Supported<(Response, NotSupported, NotSupported)>: RegisterSplit,
{
MessageRegistrationBuilder::new(&mut self.registry.messages).with_split_minimal();
self
}
pub fn with_join(&mut self) -> &mut Self
where
Request: Joined,
{
self.registry.messages.register_join::<Request>();
self
}
pub fn with_buffer_access(&mut self) -> &mut Self
where
Request: BufferAccessRequest,
{
self.registry.messages.register_buffer_access::<Request>();
self
}
pub fn with_listen(&mut self) -> &mut Self
where
Request: Accessor,
{
self.registry.messages.register_listen::<Request>();
self
}
pub fn with_request_to_string(&mut self) -> &mut Self
where
Request: ToString,
{
self.registry.messages.register_to_string::<Request>();
self
}
pub fn with_response_to_string(&mut self) -> &mut Self
where
Response: ToString,
{
self.registry.messages.register_to_string::<Response>();
self
}
}
/// Conversion of a value into a [`NodeRegistration`].
///
/// NOTE(review): no implementation is visible in this part of the file, so the
/// exact use of `id` and `name` should be confirmed against the implementors.
pub trait IntoNodeRegistration {
/// Consumes `self` and produces the registration, with access to the shared
/// schema generator.
fn into_node_registration(
self,
id: BuilderId,
name: String,
schema_generator: &mut SchemaGenerator,
) -> NodeRegistration;
}
/// Unsized signature of a section-builder closure: it receives the builder and
/// a JSON config and returns a boxed [`Section`] or a diagram error.
type CreateSectionFn =
dyn FnMut(&mut Builder, serde_json::Value) -> Result<Box<dyn Section>, DiagramErrorCode> + Send;
// Serializable record of one registered section builder.
//
// NOTE: plain `//` comments are used here on purpose. `///` doc comments on a
// `JsonSchema`-derived type are emitted as schema descriptions, which would
// change the generated schema.
#[derive(Serialize, JsonSchema)]
pub struct SectionRegistration {
// Display text for the section; falls back to the builder id when the options
// do not provide one.
pub(super) default_display_text: DisplayText,
// Metadata obtained from the section type.
pub(super) metadata: SectionMetadata,
// JSON schema stored for the section builder's configuration.
pub(super) config_schema: Schema,
// Optional description copied from the builder options.
pub(super) description: Option<String>,
// Example configurations copied from the builder options.
pub(super) config_examples: Vec<ConfigExample>,
// Closure that builds the section from a JSON config; never serialized.
#[serde(skip)]
create_section_impl: RefCell<Box<CreateSectionFn>>,
}
impl SectionRegistration {
    /// Builds a section by invoking the stored builder closure with `config`.
    ///
    /// # Errors
    /// Returns whatever [`DiagramErrorCode`] the stored closure produces.
    ///
    /// # Panics
    /// Panics if the stored closure is already mutably borrowed, e.g. when the
    /// closure re-enters this method on the same registration.
    pub(super) fn create_section(
        &self,
        builder: &mut Builder,
        config: serde_json::Value,
    ) -> Result<Box<dyn Section>, DiagramErrorCode> {
        let mut build_section = self.create_section_impl.borrow_mut();
        build_section(builder, config)
    }
}
/// Conversion of a section builder into a [`SectionRegistration`].
///
/// `SectionT` is the section type the builder produces and `Config` is the
/// configuration type it accepts.
pub trait IntoSectionRegistration<SectionT, Config>
where
SectionT: Section,
{
/// Consumes `self` and produces the registration, using `options` for the id,
/// display text, description and examples.
fn into_section_registration(
self,
options: &SectionBuilderOptions,
schema_generator: &mut SchemaGenerator,
) -> SectionRegistration;
}
/// Turns any fallible section-builder closure into a [`SectionRegistration`].
impl<F, SectionT, Config> IntoSectionRegistration<SectionT, Config> for F
where
    F: FnMut(&mut Builder, Config) -> Result<SectionT, Anyhow> + Send + 'static,
    SectionT: Section + SectionMetadataProvider + 'static,
    Config: DeserializeOwned + JsonSchema,
{
    /// Builds the registration for this section builder.
    ///
    /// The stored closure deserializes the JSON config into `Config` and
    /// reports `DiagramErrorCode::ConfigError` when that fails, matching the
    /// node builder path. Errors from the builder closure itself are wrapped
    /// in `DiagramErrorCode::NodeBuildingError` together with the builder id.
    fn into_section_registration(
        mut self,
        options: &SectionBuilderOptions,
        schema_generator: &mut SchemaGenerator,
    ) -> SectionRegistration {
        let builder_id = Arc::clone(&options.id);
        SectionRegistration {
            default_display_text: options
                .default_display_text
                .as_ref()
                .unwrap_or(&options.id)
                .clone(),
            metadata: SectionT::metadata().clone(),
            // Describe the builder's actual config type, not the unit type.
            config_schema: schema_generator.subschema_for::<Config>(),
            create_section_impl: RefCell::new(Box::new(move |builder, config| {
                // A malformed config is a user error, so report it instead of
                // panicking.
                let config = serde_json::from_value::<Config>(config)
                    .map_err(DiagramErrorCode::ConfigError)?;
                let section = self(builder, config).map_err(|error| {
                    DiagramErrorCode::NodeBuildingError {
                        builder: Arc::clone(&builder_id),
                        error,
                    }
                })?;
                Ok(Box::new(section))
            })),
            description: options.description.clone(),
            config_examples: options.config_examples.clone(),
        }
    }
}
// Registry of everything a diagram can refer to: node builders, section
// builders and message types.
//
// NOTE: plain `//` comments are used here on purpose. `///` doc comments on a
// `JsonSchema`-derived type are emitted as schema descriptions, which would
// change the generated schema.
#[derive(Serialize, JsonSchema)]
pub struct DiagramElementRegistry {
// Node registrations keyed by builder id.
pub(super) nodes: HashMap<BuilderId, NodeRegistration>,
// Section registrations keyed by builder id.
pub(super) sections: HashMap<BuilderId, SectionRegistration>,
// Value of `crate::trace_supported()` captured when the registry was created.
pub(super) trace_supported: bool,
// Message registry; its fields are flattened into this struct when serialized.
#[serde(flatten)]
pub(super) messages: MessageRegistry,
}
/// Operations available for one message type. Optional entries are `None`
/// until the matching `register_*` call stores an implementation.
pub struct MessageOperation {
/// Deserialize implementation, if registered.
pub(super) deserialize_impl: Option<DeserializeFn>,
/// Serialize implementation, if registered.
pub(super) serialize_impl: Option<SerializeFn>,
/// Fork-clone implementation, if registered.
pub(super) fork_clone_impl: Option<ForkCloneFn>,
/// Unzip implementation, if registered.
pub(super) unzip_impl: Option<Box<dyn PerformUnzip + Send>>,
/// Fork-result implementation, if registered.
pub(super) fork_result_impl: Option<ForkResultFn>,
/// Split implementation, if registered.
pub(super) split_impl: Option<SplitFn>,
/// Join implementation, if registered.
pub(super) join_impl: Option<JoinFn>,
/// Buffer-access implementation, if registered.
pub(super) buffer_access_impl: Option<BufferAccessFn>,
/// Type-hint function for buffer access; set together with `buffer_access_impl`.
pub(super) accessor_hints: Option<BufferLayoutTypeHintFn>,
/// Listen implementation, if registered.
pub(super) listen_impl: Option<ListenFn>,
/// Type-hint function for listening; set together with `listen_impl`.
pub(super) listen_hints: Option<BufferLayoutTypeHintFn>,
/// To-string implementation, if registered.
pub(super) to_string_impl: Option<ToStringFn>,
/// Creates a buffer of the message type; always available.
pub(super) create_buffer_impl: CreateBufferFn,
/// Creates a node that maps the message to `()`; always available.
pub(super) create_trigger_impl: CreateTriggerFn,
/// Scope-building functions for the message type; always available.
build_scope: BuildScope,
/// Trace serialization hook; only present with the `trace` feature.
#[cfg(feature = "trace")]
pub(super) enable_trace_serialization: Option<EnableTraceSerializeFn>,
}
impl MessageOperation {
    /// Creates the operation set for message type `T`.
    ///
    /// Only buffer creation, trigger creation and scope building are filled
    /// in; every optional operation starts out as `None`.
    fn new<T>() -> Self
    where
        T: Send + Sync + 'static + Any,
    {
        Self {
            deserialize_impl: None,
            serialize_impl: None,
            fork_clone_impl: None,
            unzip_impl: None,
            fork_result_impl: None,
            split_impl: None,
            join_impl: None,
            buffer_access_impl: None,
            accessor_hints: None,
            listen_impl: None,
            listen_hints: None,
            to_string_impl: None,
            create_buffer_impl: |buffer_settings, scope| {
                let buffer = scope.create_buffer::<T>(buffer_settings);
                buffer.as_any_buffer()
            },
            // The trigger discards the message and emits `()`.
            create_trigger_impl: |scope| scope.create_map_block(|_: T| ()).into(),
            build_scope: BuildScope::new::<T>(),
            #[cfg(feature = "trace")]
            enable_trace_serialization: None,
        }
    }
}
/// Marker that serializes as an empty map and whose schema is
/// `{ "type": "object" }`.
struct JsEmptyObject;
impl Serialize for JsEmptyObject {
    /// Writes a map with zero entries.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let empty_map = serializer.serialize_map(Some(0))?;
        empty_map.end()
    }
}
impl<'de> Deserialize<'de> for JsEmptyObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(serde::de::IgnoredAny)?;
Ok(JsEmptyObject {})
}
}
impl JsonSchema for JsEmptyObject {
    /// Schema name used for this type.
    fn schema_name() -> Cow<'static, str> {
        Cow::Borrowed("object")
    }

    /// Describes the type as a plain JSON object.
    fn json_schema(_generator: &mut SchemaGenerator) -> Schema {
        json_schema!({ "type": "object" })
    }

    /// Always inlined rather than referenced through a definition.
    fn inline_schema() -> bool {
        true
    }
}
impl Serialize for MessageOperation {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_map(None)?;
let empty_object = JsEmptyObject {};
if self.deserialize_impl.is_some() {
s.serialize_entry("deserialize", &empty_object)?;
}
if self.serialize_impl.is_some() {
s.serialize_entry("serialize", &empty_object)?;
}
if self.fork_clone_impl.is_some() {
s.serialize_entry("fork_clone", &empty_object)?;
}
if let Some(unzip_impl) = &self.unzip_impl {
s.serialize_entry("unzip", &json!({"output_types": unzip_impl.output_types()}))?;
}
if self.fork_result_impl.is_some() {
s.serialize_entry("fork_result", &empty_object)?;
}
if self.split_impl.is_some() {
s.serialize_entry("split", &empty_object)?;
}
if self.join_impl.is_some() {
s.serialize_entry("join", &empty_object)?;
}
s.end()
}
}
// Shape used only to derive the JSON schema reported for `MessageOperation`.
// Its fields are never read, hence `allow(unused)`.
//
// NOTE: plain `//` comments are used here on purpose; `///` doc comments would
// be emitted as schema descriptions.
#[derive(JsonSchema)]
#[allow(unused)] struct MessageOperationSchema {
deserialize: Option<JsEmptyObject>,
serialize: Option<JsEmptyObject>,
fork_clone: Option<JsEmptyObject>,
// NOTE(review): the `Serialize` impl of `MessageOperation` writes this entry as
// an object with an `output_types` key -- confirm the two agree.
unzip: Option<Vec<TypeInfo>>,
fork_result: Option<JsEmptyObject>,
split: Option<JsEmptyObject>,
join: Option<JsEmptyObject>,
}
impl JsonSchema for MessageOperation {
fn schema_name() -> Cow<'static, str> {
"MessageOperation".into()
}
fn json_schema(generator: &mut SchemaGenerator) -> Schema {
<MessageOperationSchema as JsonSchema>::json_schema(generator)
}
}
// Serializable record of one registered message type.
//
// NOTE: plain `//` comments are used here on purpose; `///` doc comments would
// be emitted as schema descriptions by the `JsonSchema` derive.
#[derive(Serialize, JsonSchema)]
pub struct MessageRegistration {
// Result of `std::any::type_name` for the message type.
pub(super) type_name: &'static str,
// JSON schema of the message, if one has been stored; starts as `None`.
pub(super) schema: Option<Schema>,
// Operations registered for the message type.
pub(super) operations: MessageOperation,
}
impl MessageRegistration {
    /// Creates the registration for message type `T` with no schema and only
    /// the always-available operations.
    pub(super) fn new<T>() -> Self
    where
        T: Send + Sync + 'static + Any,
    {
        let name = type_name::<T>();
        let operations = MessageOperation::new::<T>();
        Self {
            type_name: name,
            schema: None,
            operations,
        }
    }
}
// Registry of message types and the schema generator shared by them.
//
// NOTE: plain `//` comments are used here on purpose; `///` doc comments would
// be emitted as schema descriptions by the `JsonSchema` derive.
#[derive(Serialize, JsonSchema)]
pub struct MessageRegistry {
// Registrations keyed by type info; serialized by
// `MessageRegistry::serialize_messages`, keyed by type name instead.
#[serde(serialize_with = "MessageRegistry::serialize_messages")]
pub messages: HashMap<TypeInfo, MessageRegistration>,
// Schema generator; serialized under the name `schemas` as its definitions.
#[serde(rename = "schemas", with = "MessageRegistrySerializeSchemas")]
pub schema_generator: SchemaGenerator,
}
impl MessageRegistry {
/// Creates a registry whose schema definitions live under `#/schemas/` and
/// which already contains a registration for `serde_json::Value`.
fn new() -> Self {
    let mut settings = SchemaSettings::default();
    settings.definitions_path = "#/schemas/".into();
    let schema_generator = SchemaGenerator::new(settings);

    let mut messages = HashMap::new();
    messages.insert(
        TypeInfo::of::<serde_json::Value>(),
        MessageRegistration::new::<serde_json::Value>(),
    );

    Self {
        schema_generator,
        messages,
    }
}
/// Looks up the registration for message type `T`, if any.
fn get<T>(&self) -> Option<&MessageRegistration>
where
    T: Any,
{
    let key = TypeInfo::of::<T>();
    self.messages.get(&key)
}
/// Creates the deserialize operation for `target_type`.
///
/// # Errors
/// Returns `DiagramErrorCode::NotDeserializable` when no deserialize
/// implementation is registered for the type, or the error produced by the
/// implementation itself.
pub fn deserialize(
    &self,
    target_type: &TypeInfo,
    builder: &mut Builder,
) -> Result<DynForkResult, DiagramErrorCode> {
    match self.try_deserialize(target_type, builder)? {
        Some(operation) => Ok(operation),
        None => Err(DiagramErrorCode::NotDeserializable(*target_type)),
    }
}
/// Creates the deserialize operation for `target_type` if one is registered.
///
/// Returns `Ok(None)` when the type is unknown or has no deserialize
/// implementation.
pub fn try_deserialize(
    &self,
    target_type: &TypeInfo,
    builder: &mut Builder,
) -> Result<Option<DynForkResult>, DiagramErrorCode> {
    let registered = self
        .messages
        .get(target_type)
        .and_then(|reg| reg.operations.deserialize_impl);
    match registered {
        Some(deserialize) => deserialize(builder).map(Some),
        None => Ok(None),
    }
}
pub(super) fn register_deserialize<T, Deserializer>(&mut self)
where
T: Send + Sync + 'static + Any,
Deserializer: DeserializeMessage<T>,
{
Deserializer::register_deserialize(&mut self.messages, &mut self.schema_generator);
}
/// Creates the serialize operation for `incoming_type`.
///
/// # Errors
/// Returns `DiagramErrorCode::NotSerializable` when no serialize
/// implementation is registered for the type, or the error produced by the
/// implementation itself.
pub fn serialize(
    &self,
    incoming_type: &TypeInfo,
    builder: &mut Builder,
) -> Result<DynForkResult, DiagramErrorCode> {
    match self.try_serialize(incoming_type, builder)? {
        Some(operation) => Ok(operation),
        None => Err(DiagramErrorCode::NotSerializable(*incoming_type)),
    }
}
/// Creates the serialize operation for `incoming_type` if one is registered.
///
/// Returns `Ok(None)` when the type is unknown or has no serialize
/// implementation.
pub fn try_serialize(
    &self,
    incoming_type: &TypeInfo,
    builder: &mut Builder,
) -> Result<Option<DynForkResult>, DiagramErrorCode> {
    let registered = self
        .messages
        .get(incoming_type)
        .and_then(|reg| reg.operations.serialize_impl);
    match registered {
        Some(serialize) => serialize(builder).map(Some),
        None => Ok(None),
    }
}
/// Creates the to-string node for `incoming_type` if one is registered.
///
/// Returns `Ok(None)` when the type is registered but has no to-string
/// implementation.
///
/// # Errors
/// Returns `DiagramErrorCode::UnregisteredType` when the type is not in the
/// registry at all.
pub fn try_to_string(
    &self,
    incoming_type: &TypeInfo,
    builder: &mut Builder,
) -> Result<Option<DynNode>, DiagramErrorCode> {
    match self.messages.get(incoming_type) {
        Some(reg) => Ok(reg
            .operations
            .to_string_impl
            .map(|to_string| to_string(builder))),
        None => Err(DiagramErrorCode::UnregisteredType(*incoming_type)),
    }
}
pub(super) fn register_serialize<T, Serializer>(&mut self)
where
T: Send + Sync + 'static + Any,
Serializer: SerializeMessage<T>,
{
Serializer::register_serialize(&mut self.messages, &mut self.schema_generator)
}
/// Creates the fork-clone operation for `message_info`.
///
/// # Errors
/// Returns `DiagramErrorCode::NotCloneable` when the type is unknown or has no
/// clone implementation, or the error produced by the implementation itself.
pub fn fork_clone(
    &self,
    message_info: &TypeInfo,
    builder: &mut Builder,
) -> Result<DynForkClone, DiagramErrorCode> {
    let registered = self
        .messages
        .get(message_info)
        .and_then(|reg| reg.operations.fork_clone_impl);
    match registered {
        Some(fork_clone) => fork_clone(builder),
        None => Err(DiagramErrorCode::NotCloneable(*message_info)),
    }
}
/// Registers clone support for `T` through `F`, creating the message
/// registration first if it does not exist yet.
///
/// Returns `true` only when a clone implementation was newly stored, i.e. when
/// `F::CLONEABLE` is set and none was registered before.
pub(super) fn register_clone<T, F>(&mut self) -> bool
where
    T: Send + Sync + 'static + Any,
    F: RegisterClone<T>,
{
    let ops = &mut self
        .messages
        .entry(TypeInfo::of::<T>())
        .or_insert_with(MessageRegistration::new::<T>)
        .operations;
    if F::CLONEABLE && ops.fork_clone_impl.is_none() {
        F::register_clone(ops);
        true
    } else {
        false
    }
}
/// Returns the unzip implementation registered for `message_info`.
///
/// # Errors
/// Returns `DiagramErrorCode::NotUnzippable` when the type is unknown or has
/// no unzip implementation.
pub fn unzip<'a>(
    &'a self,
    message_info: &TypeInfo,
) -> Result<&'a dyn PerformUnzip, DiagramErrorCode> {
    let registered = self
        .messages
        .get(message_info)
        .and_then(|reg| reg.operations.unzip_impl.as_ref());
    match registered {
        Some(boxed) => {
            // Drop the `Send` marker from the stored trait object type.
            let unzip: &'a dyn PerformUnzip = &**boxed;
            Ok(unzip)
        }
        None => Err(DiagramErrorCode::NotUnzippable(*message_info)),
    }
}
/// Registers unzip support for `T`, creating the message registration first
/// if it does not exist yet.
///
/// The implementation's `on_register` hook runs on every call, even when an
/// unzip implementation is already stored. Returns `true` only when the
/// implementation was newly stored.
pub(super) fn register_unzip<T, Serializer, Cloneable>(&mut self) -> bool
where
    T: Send + Sync + 'static + Any,
    Serializer: 'static,
    Cloneable: 'static,
    Supported<(T, Serializer, Cloneable)>: PerformUnzip,
{
    let unzip_impl = Supported::<(T, Serializer, Cloneable)>::new();
    unzip_impl.on_register(self);

    let ops = &mut self
        .messages
        .entry(TypeInfo::of::<T>())
        .or_insert_with(MessageRegistration::new::<T>)
        .operations;
    if ops.unzip_impl.is_none() {
        ops.unzip_impl = Some(Box::new(unzip_impl));
        true
    } else {
        false
    }
}
/// Creates the fork-result operation for `message_info`.
///
/// # Errors
/// Returns `DiagramErrorCode::CannotForkResult` when the type is unknown or
/// has no fork-result implementation, or the error produced by the
/// implementation itself.
pub fn fork_result(
    &self,
    message_info: &TypeInfo,
    builder: &mut Builder,
) -> Result<DynForkResult, DiagramErrorCode> {
    let registered = self
        .messages
        .get(message_info)
        .and_then(|reg| reg.operations.fork_result_impl);
    match registered {
        Some(fork_result) => fork_result(builder),
        None => Err(DiagramErrorCode::CannotForkResult(*message_info)),
    }
}
/// Lets `R` register fork-result support on this registry and returns the
/// flag reported by `R::on_register`.
pub(super) fn register_result<R>(&mut self) -> bool
where
    R: RegisterForkResult,
{
    <R as RegisterForkResult>::on_register(self)
}
/// Creates the split operation for `message_info` described by `split_op`.
///
/// # Errors
/// Returns `DiagramErrorCode::NotSplittable` when the type is unknown or has
/// no split implementation, or the error produced by the implementation
/// itself.
pub fn split(
    &self,
    message_info: &TypeInfo,
    split_op: &SplitSchema,
    builder: &mut Builder,
) -> Result<DynSplit, DiagramErrorCode> {
    let registered = self
        .messages
        .get(message_info)
        .and_then(|reg| reg.operations.split_impl);
    match registered {
        Some(split) => split(split_op, builder),
        None => Err(DiagramErrorCode::NotSplittable(*message_info)),
    }
}
/// Lets `Supported<(T, S, C)>` register split support on this registry.
pub(super) fn register_split<T, S, C>(&mut self)
where
    T: Send + Sync + 'static + Any,
    Supported<(T, S, C)>: RegisterSplit,
{
    <Supported<(T, S, C)> as RegisterSplit>::on_register(self);
}
/// Creates a type-erased buffer for `message_info` with the given settings.
///
/// # Errors
/// Returns `DiagramErrorCode::UnregisteredType` when the type is not in the
/// registry.
pub fn create_buffer(
    &self,
    message_info: &TypeInfo,
    settings: BufferSettings,
    builder: &mut Builder,
) -> Result<AnyBuffer, DiagramErrorCode> {
    match self.messages.get(message_info) {
        Some(reg) => Ok((reg.operations.create_buffer_impl)(settings, builder)),
        None => Err(DiagramErrorCode::UnregisteredType(*message_info)),
    }
}
/// Sets the request type of an incremental scope to `message_info`.
///
/// # Errors
/// Returns `DiagramErrorCode::UnregisteredType` when the type is not in the
/// registry, or the converted error from the scope builder.
pub(crate) fn set_scope_request(
    &self,
    message_info: &TypeInfo,
    incremental: &mut IncrementalScopeBuilder,
    commands: &mut Commands,
) -> Result<IncrementalScopeRequest, DiagramErrorCode> {
    let registration = self
        .messages
        .get(message_info)
        .ok_or_else(|| DiagramErrorCode::UnregisteredType(*message_info))?;
    let set_request = registration.operations.build_scope.set_request;
    set_request(incremental, commands).map_err(Into::into)
}
/// Sets the response type of an incremental scope to `message_info`.
///
/// # Errors
/// Returns `DiagramErrorCode::UnregisteredType` when the type is not in the
/// registry, or the converted error from the scope builder.
pub(crate) fn set_scope_response(
    &self,
    message_info: &TypeInfo,
    incremental: &mut IncrementalScopeBuilder,
    commands: &mut Commands,
) -> Result<IncrementalScopeResponse, DiagramErrorCode> {
    let registration = self
        .messages
        .get(message_info)
        .ok_or_else(|| DiagramErrorCode::UnregisteredType(*message_info))?;
    let set_response = registration.operations.build_scope.set_response;
    set_response(incremental, commands).map_err(Into::into)
}
/// Spawns a scope stream carrying `message_info` between the two scope
/// entities and returns its type-erased input slot and output.
///
/// # Errors
/// Returns `DiagramErrorCode::UnregisteredType` when the type is not in the
/// registry.
pub(crate) fn spawn_basic_scope_stream(
    &self,
    message_info: &TypeInfo,
    in_scope: Entity,
    out_scope: Entity,
    commands: &mut Commands,
) -> Result<(DynInputSlot, DynOutput), DiagramErrorCode> {
    match self.messages.get(message_info) {
        Some(reg) => {
            let spawn = reg.operations.build_scope.spawn_basic_scope_stream;
            Ok(spawn(in_scope, out_scope, commands))
        }
        None => Err(DiagramErrorCode::UnregisteredType(*message_info)),
    }
}
/// Creates the trigger node for `message_info`.
///
/// # Errors
/// Returns `DiagramErrorCode::UnregisteredType` when the type is not in the
/// registry.
pub fn trigger(
    &self,
    message_info: &TypeInfo,
    builder: &mut Builder,
) -> Result<DynNode, DiagramErrorCode> {
    match self.messages.get(message_info) {
        Some(reg) => Ok((reg.operations.create_trigger_impl)(builder)),
        None => Err(DiagramErrorCode::UnregisteredType(*message_info)),
    }
}
/// Joins `buffers` into a message of type `joinable`.
///
/// # Errors
/// Returns `DiagramErrorCode::NotJoinable` when the type is unknown or has no
/// join implementation, or the error produced by the implementation itself.
pub fn join(
    &self,
    joinable: &TypeInfo,
    buffers: &BufferMap,
    builder: &mut Builder,
) -> Result<DynOutput, DiagramErrorCode> {
    let registered = self
        .messages
        .get(joinable)
        .and_then(|reg| reg.operations.join_impl);
    match registered {
        Some(join) => join(buffers, builder),
        None => Err(DiagramErrorCode::NotJoinable(*joinable)),
    }
}
/// Registers join support for `T`, creating the message registration first if
/// it does not exist yet.
///
/// Returns `true` only when the join implementation was newly stored.
pub(super) fn register_join<T>(&mut self) -> bool
where
    T: Send + Sync + 'static + Any + Joined,
{
    let ops = &mut self
        .messages
        .entry(TypeInfo::of::<T>())
        .or_insert_with(MessageRegistration::new::<T>)
        .operations;
    if ops.join_impl.is_none() {
        ops.join_impl =
            Some(|buffers, builder| Ok(builder.try_join::<T>(buffers)?.output().into()));
        true
    } else {
        false
    }
}
/// Creates the buffer-access node for `target_type` over `buffers`.
///
/// # Errors
/// Returns `DiagramErrorCode::CannotAccessBuffers` when the type is unknown or
/// has no buffer-access implementation, or the error produced by the
/// implementation itself.
pub fn with_buffer_access(
    &self,
    target_type: &TypeInfo,
    buffers: &BufferMap,
    builder: &mut Builder,
) -> Result<DynNode, DiagramErrorCode> {
    let registered = self
        .messages
        .get(target_type)
        .and_then(|reg| reg.operations.buffer_access_impl);
    match registered {
        Some(access) => access(buffers, builder),
        None => Err(DiagramErrorCode::CannotAccessBuffers(*target_type)),
    }
}
/// Computes message type hints for the buffers that a buffer-access request
/// of type `message_info` would use.
///
/// # Errors
/// Returns `DiagramErrorCode::CannotAccessBuffers` when the type is unknown or
/// has no accessor hints, or the converted layout error from the hint
/// function.
pub fn accessor_hint(
    &self,
    message_info: &TypeInfo,
    identifiers: HashSet<BufferIdentifier<'static>>,
) -> Result<MessageTypeHintMap, DiagramErrorCode> {
    let hint_fn = self
        .messages
        .get(message_info)
        .and_then(|reg| reg.operations.accessor_hints)
        .ok_or_else(|| DiagramErrorCode::CannotAccessBuffers(*message_info))?;
    hint_fn(identifiers).map_err(Into::into)
}
/// Registers buffer-access support and its type hints for `T`, creating the
/// message registration first if it does not exist yet.
///
/// Returns `true` only when the implementation was newly stored.
pub(super) fn register_buffer_access<T>(&mut self) -> bool
where
    T: Send + Sync + 'static + BufferAccessRequest,
{
    let ops = &mut self
        .messages
        .entry(TypeInfo::of::<T>())
        .or_insert_with(MessageRegistration::new::<T>)
        .operations;
    if ops.buffer_access_impl.is_some() {
        return false;
    }

    ops.buffer_access_impl = Some(|buffers, builder| {
        let access = builder.try_create_buffer_access::<T::Message, T::BufferKeys>(buffers)?;
        Ok(access.into())
    });
    ops.accessor_hints = Some(
        <<T::BufferKeys as Accessor>::Buffers as BufferMapLayout>::get_buffer_message_type_hints,
    );
    true
}
/// Creates a listener of type `target_type` over `buffers`.
///
/// # Errors
/// Returns `DiagramErrorCode::CannotListen` when the type is unknown or has no
/// listen implementation, or the error produced by the implementation itself.
pub fn listen(
    &self,
    target_type: &TypeInfo,
    buffers: &BufferMap,
    builder: &mut Builder,
) -> Result<DynOutput, DiagramErrorCode> {
    let registered = self
        .messages
        .get(target_type)
        .and_then(|reg| reg.operations.listen_impl);
    match registered {
        Some(listen) => listen(buffers, builder),
        None => Err(DiagramErrorCode::CannotListen(*target_type)),
    }
}
/// Computes message type hints for the buffers that a listener of type
/// `message_info` would use.
///
/// # Errors
/// Returns `DiagramErrorCode::CannotListen` when the type is unknown or has no
/// listen hints, or the converted layout error from the hint function.
pub fn listen_hint(
    &self,
    message_info: &TypeInfo,
    identifiers: HashSet<BufferIdentifier<'static>>,
) -> Result<MessageTypeHintMap, DiagramErrorCode> {
    let hint_fn = self
        .messages
        .get(message_info)
        .and_then(|reg| reg.operations.listen_hints)
        .ok_or_else(|| DiagramErrorCode::CannotListen(*message_info))?;
    hint_fn(identifiers).map_err(Into::into)
}
/// Registers listen support and its type hints for `T`, creating the message
/// registration first if it does not exist yet.
///
/// Returns `true` only when the implementation was newly stored.
pub(super) fn register_listen<T>(&mut self) -> bool
where
    T: Send + Sync + 'static + Any + Accessor,
{
    let ops = &mut self
        .messages
        .entry(TypeInfo::of::<T>())
        .or_insert_with(MessageRegistration::new::<T>)
        .operations;
    if ops.listen_impl.is_some() {
        return false;
    }

    ops.listen_impl =
        Some(|buffers, builder| Ok(builder.try_listen::<T>(buffers)?.output().into()));
    ops.listen_hints = Some(<T::Buffers as BufferMapLayout>::get_buffer_message_type_hints);
    true
}
/// Registers a to-string conversion for `T`, creating the message registration
/// first if it does not exist yet.
///
/// Unlike the other `register_*` methods this always overwrites a previously
/// stored implementation and returns nothing.
pub(super) fn register_to_string<T>(&mut self)
where
    T: 'static + Send + Sync + ToString,
{
    let registration = self
        .messages
        .entry(TypeInfo::of::<T>())
        .or_insert_with(MessageRegistration::new::<T>);
    registration.operations.to_string_impl =
        Some(|builder| builder.create_map_block(|msg: T| msg.to_string()).into());
}
/// Serializes the message map keyed by each registration's `type_name`,
/// leaving out the built-in `serde_json::Value` entry.
///
/// The length hint given to `serialize_map` counts only the entries that are
/// actually written, so serializers that rely on the hint receive an accurate
/// value.
fn serialize_messages<S>(
    v: &HashMap<TypeInfo, MessageRegistration>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let json_value = TypeInfo::of::<serde_json::Value>();
    let listed = v
        .iter()
        .filter(|(type_info, _)| **type_info != json_value)
        .count();

    let mut s = serializer.serialize_map(Some(listed))?;
    for (type_info, reg) in v {
        if *type_info == json_value {
            continue;
        }
        s.serialize_entry(reg.type_name, reg)?;
    }
    s.end()
}
}
// Helper used through `#[serde(with = ...)]` on `MessageRegistry::schema_generator`.
// The struct itself only supplies the JSON schema; its `serialize` function
// does the serialization.
//
// NOTE: plain `//` comments are used here on purpose; `///` doc comments would
// be emitted as schema descriptions.
#[derive(JsonSchema)]
#[schemars(rename = "MessageRegistry", inline)]
struct MessageRegistrySerializeSchemas {
// Never read; present only so the derived schema is a flattened JSON map.
#[allow(unused)] #[schemars(flatten)]
schemas: serde_json::Map<String, serde_json::Value>,
}
impl MessageRegistrySerializeSchemas {
    /// Serializes the definitions collected by the schema generator.
    fn serialize<S>(v: &SchemaGenerator, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let definitions = v.definitions();
        definitions.serialize(serializer)
    }
}
impl Default for DiagramElementRegistry {
fn default() -> Self {
JsonBuffer::register_for::<()>();
let mut registry = DiagramElementRegistry {
nodes: Default::default(),
sections: Default::default(),
messages: MessageRegistry::new(),
trace_supported: crate::trace_supported(),
};
registry.register_builtin_messages();
registry
}
}
impl DiagramElementRegistry {
    /// Creates a registry with the builtin message types registered.
    ///
    /// Equivalent to `DiagramElementRegistry::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a registry without calling [`Self::register_builtin_messages`].
    ///
    /// The node and section collections start out as their defaults and the
    /// message registry is a fresh `MessageRegistry::new()`.
    pub fn blank() -> Self {
        JsonBuffer::register_for::<()>();
        DiagramElementRegistry {
            nodes: Default::default(),
            sections: Default::default(),
            messages: MessageRegistry::new(),
            trace_supported: crate::trace_supported(),
        }
    }

    /// Registers a node builder with none of the common operations opted out.
    ///
    /// This forwards to the `register_node_builder` method of the value
    /// returned by [`Self::opt_out`]. The returned registration builder can be
    /// used to attach further operations to the node's messages.
    pub fn register_node_builder<Config, Request, Response, Streams: StreamPack>(
        &mut self,
        options: NodeBuilderOptions,
        builder: impl FnMut(&mut Builder, Config) -> Node<Request, Response, Streams> + Send + 'static,
    ) -> NodeRegistrationBuilder<'_, Request, Response, Streams>
    where
        Config: JsonSchema + DeserializeOwned,
        Request: Send + Sync + 'static + DynType + Serialize + DeserializeOwned + Clone,
        Response: Send + Sync + 'static + DynType + Serialize + DeserializeOwned + Clone,
    {
        self.opt_out().register_node_builder(options, builder)
    }

    /// Same as [`Self::register_node_builder`], but for a builder that may
    /// fail and reports the failure as an `anyhow` error.
    pub fn register_node_builder_fallible<Config, Request, Response, Streams: StreamPack>(
        &mut self,
        options: NodeBuilderOptions,
        builder: impl FnMut(&mut Builder, Config) -> Result<Node<Request, Response, Streams>, Anyhow>
        + Send
        + 'static,
    ) -> NodeRegistrationBuilder<'_, Request, Response, Streams>
    where
        Config: JsonSchema + DeserializeOwned,
        Request: Send + Sync + 'static + DynType + Serialize + DeserializeOwned + Clone,
        Response: Send + Sync + 'static + DynType + Serialize + DeserializeOwned + Clone,
    {
        self.opt_out()
            .register_node_builder_fallible(options, builder)
    }

    /// Registers a message type with none of the common operations opted out.
    ///
    /// This forwards to the `register_message` method of the value returned by
    /// [`Self::opt_out`].
    pub fn register_message<Message>(&mut self) -> MessageRegistrationBuilder<'_, Message>
    where
        Message: Send + Sync + 'static + DynType + DeserializeOwned + Serialize + Clone,
    {
        self.opt_out().register_message()
    }

    /// Registers a section builder that cannot fail.
    ///
    /// The builder is wrapped so that it always returns `Ok` and is then
    /// handed to [`Self::register_section_builder_fallible`].
    pub fn register_section_builder<Config, SectionT>(
        &mut self,
        options: SectionBuilderOptions,
        mut section_builder: impl FnMut(&mut Builder, Config) -> SectionT + Send + 'static,
    ) where
        SectionT: Section + SectionMetadataProvider + 'static,
        Config: DeserializeOwned + JsonSchema,
    {
        self.register_section_builder_fallible(options, move |builder, config| {
            Ok(section_builder(builder, config))
        });
    }

    /// Registers a section builder that may fail.
    ///
    /// The registration is stored under `options.id`. When no default display
    /// text is given, the id is used as the display text. After the
    /// registration is stored, `SectionT::on_register` is invoked with this
    /// registry.
    ///
    /// # Errors
    ///
    /// Nothing fails at registration time. When the stored builder is later
    /// invoked, a config that cannot be deserialized into `Config` and an
    /// error returned by `section_builder` are both reported as
    /// `DiagramErrorCode::NodeBuildingError` carrying this builder's id.
    pub fn register_section_builder_fallible<Config, SectionT>(
        &mut self,
        options: SectionBuilderOptions,
        mut section_builder: impl FnMut(&mut Builder, Config) -> Result<SectionT, Anyhow>
        + Send
        + 'static,
    ) where
        SectionT: Section + SectionMetadataProvider + 'static,
        Config: DeserializeOwned + JsonSchema,
    {
        let builder_id = Arc::clone(&options.id);
        let registration = SectionRegistration {
            default_display_text: options
                .default_display_text
                .as_ref()
                .unwrap_or(&options.id)
                .clone(),
            metadata: SectionT::metadata().clone(),
            // Advertise the schema of the config type the builder actually
            // accepts, so consumers of the registry can validate configs.
            config_schema: self.messages.schema_generator.subschema_for::<Config>(),
            create_section_impl: RefCell::new(Box::new(move |builder, config| {
                // A malformed config is user input, so it is reported as an
                // error rather than causing a panic.
                let config = serde_json::from_value::<Config>(config).map_err(|error| {
                    DiagramErrorCode::NodeBuildingError {
                        builder: Arc::clone(&builder_id),
                        error: Anyhow::from(error),
                    }
                })?;
                let section = section_builder(builder, config).map_err(|error| {
                    DiagramErrorCode::NodeBuildingError {
                        builder: Arc::clone(&builder_id),
                        error,
                    }
                })?;
                Ok(Box::new(section))
            })),
            description: options.description.clone(),
            config_examples: options.config_examples.clone(),
        };
        self.sections.insert(options.id, registration);
        SectionT::on_register(self);
    }

    /// Starts a registration in which common operations can be opted out of.
    ///
    /// The returned value begins with all three operation markers set to
    /// `Supported`.
    pub fn opt_out(&mut self) -> CommonOperations<'_, Supported, Supported, Supported> {
        CommonOperations {
            registry: self,
            _ignore: Default::default(),
        }
    }

    /// Looks up the node registration stored under `id`.
    ///
    /// # Errors
    ///
    /// Returns `DiagramErrorCode::BuilderNotFound` carrying the requested id
    /// when no node builder is registered under it.
    pub fn get_node_registration<Q>(&self, id: &Q) -> Result<&NodeRegistration, DiagramErrorCode>
    where
        Q: Borrow<str> + ?Sized,
    {
        let k = id.borrow();
        // Only allocate the error payload when the lookup actually fails.
        self.nodes
            .get(k)
            .ok_or_else(|| DiagramErrorCode::BuilderNotFound(k.to_string().into()))
    }

    /// Looks up the section registration stored under `id`.
    ///
    /// # Errors
    ///
    /// Returns `DiagramErrorCode::BuilderNotFound` carrying the requested id
    /// when no section builder is registered under it.
    pub fn get_section_registration<Q>(
        &self,
        id: &Q,
    ) -> Result<&SectionRegistration, DiagramErrorCode>
    where
        Q: Borrow<str> + ?Sized,
    {
        self.sections
            .get(id.borrow())
            .ok_or_else(|| DiagramErrorCode::BuilderNotFound(id.borrow().to_string().into()))
    }

    /// Returns the registration of message type `T`, if there is one.
    pub fn get_message_registration<T>(&self) -> Option<&MessageRegistration>
    where
        T: Any,
    {
        self.messages.get::<T>()
    }

    /// Registers the builtin message types.
    ///
    /// `JsonMessage` is registered with join and split support.
    /// `TransformError` is registered without serializing, deserializing, or
    /// cloning, but with a to-string operation. `String`, the integer and
    /// float primitives, `bool`, `char`, and `()` are registered with the
    /// default operations.
    pub fn register_builtin_messages(&mut self) {
        self.register_message::<JsonMessage>()
            .with_join()
            .with_split();
        self.opt_out()
            .no_serializing()
            .no_deserializing()
            .no_cloning()
            .register_message::<TransformError>()
            .with_to_string();
        self.register_message::<String>();
        self.register_message::<u8>();
        self.register_message::<u16>();
        self.register_message::<u32>();
        self.register_message::<u64>();
        self.register_message::<usize>();
        self.register_message::<i8>();
        self.register_message::<i16>();
        self.register_message::<i32>();
        self.register_message::<i64>();
        self.register_message::<isize>();
        self.register_message::<f32>();
        self.register_message::<f64>();
        self.register_message::<bool>();
        self.register_message::<char>();
        self.register_message::<()>();
    }
}
/// Options supplied when registering a node builder.
///
/// Create one with `NodeBuilderOptions::new` and refine it with the `with_*`
/// methods.
#[derive(Clone)]
#[non_exhaustive]
pub struct NodeBuilderOptions {
/// Identifier of the builder; registrations are looked up by this id.
pub id: BuilderId,
/// Optional display text for the node; `None` unless explicitly set.
// NOTE(review): presumably falls back to `id` when `None`, as the section
// registration does — confirm in the node registration path.
pub default_display_text: Option<BuilderId>,
/// Optional human-readable description of the node.
pub description: Option<String>,
/// Example configurations for the node; empty by default.
pub config_examples: Vec<ConfigExample>,
}
/// An example configuration paired with a description of it.
#[derive(Clone, Serialize, JsonSchema)]
pub struct ConfigExample {
/// Text describing the example.
pub description: String,
/// The example configuration, stored as a JSON message.
pub config: JsonMessage,
}
impl ConfigExample {
    /// Builds an example from a description and any serializable config value.
    ///
    /// The description is converted with `to_string` and the config is
    /// converted into a JSON value.
    ///
    /// # Panics
    ///
    /// Panics if `config` cannot be converted into a JSON value.
    pub fn new(description: impl ToString, config: impl Serialize) -> Self {
        let description = description.to_string();
        let config = serde_json::to_value(config).expect("failed to serialize example config");
        Self {
            description,
            config,
        }
    }
}
impl NodeBuilderOptions {
    /// Creates options for the builder identified by `id`.
    ///
    /// The display text and description start out unset and the list of
    /// config examples starts out empty.
    pub fn new(id: impl Into<BuilderId>) -> Self {
        let id = id.into();
        Self {
            id,
            default_display_text: None,
            description: None,
            config_examples: Vec::new(),
        }
    }

    /// Returns the options with the default display text set to `text`.
    pub fn with_default_display_text(self, text: impl Into<DisplayText>) -> Self {
        Self {
            default_display_text: Some(text.into()),
            ..self
        }
    }

    /// Returns the options with the description set to `text`.
    pub fn with_description(self, text: impl Into<String>) -> Self {
        Self {
            description: Some(text.into()),
            ..self
        }
    }

    /// Returns the options with the config examples replaced by the given
    /// items.
    pub fn with_config_examples(
        self,
        config_examples: impl IntoIterator<Item = ConfigExample>,
    ) -> Self {
        Self {
            config_examples: config_examples.into_iter().collect(),
            ..self
        }
    }
}
/// Options supplied when registering a section builder.
///
/// Create one with `SectionBuilderOptions::new` and refine it with the
/// `with_*` methods. Cloneable, like `NodeBuilderOptions`.
#[derive(Clone)]
#[non_exhaustive]
pub struct SectionBuilderOptions {
    /// Identifier under which the section registration is stored.
    pub id: BuilderId,
    /// Optional display text for the section; the id is used when this is
    /// `None`.
    pub default_display_text: Option<BuilderId>,
    /// Optional human-readable description of the section.
    pub description: Option<String>,
    /// Example configurations for the section; empty by default.
    pub config_examples: Vec<ConfigExample>,
}
impl SectionBuilderOptions {
    /// Creates options for the section builder identified by `id`.
    ///
    /// The display text and description start out unset and the list of
    /// config examples starts out empty.
    pub fn new(id: impl Into<BuilderId>) -> Self {
        let id = id.into();
        Self {
            id,
            default_display_text: None,
            description: None,
            config_examples: Vec::new(),
        }
    }

    /// Returns the options with the default display text set to `text`.
    pub fn with_default_display_text(self, text: impl Into<DisplayText>) -> Self {
        Self {
            default_display_text: Some(text.into()),
            ..self
        }
    }

    /// Returns the options with the description set to `text`.
    pub fn with_description(self, text: impl Into<String>) -> Self {
        Self {
            description: Some(text.into()),
            ..self
        }
    }

    /// Returns the options with the config examples replaced by the given
    /// items.
    pub fn with_config_examples(
        self,
        config_examples: impl IntoIterator<Item = ConfigExample>,
    ) -> Self {
        Self {
            config_examples: config_examples.into_iter().collect(),
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
    use schemars::JsonSchema;
    use serde::Deserialize;
    use super::*;
    use crate::*;

    /// Simple map function used as the body of the test nodes.
    fn multiply3(i: i64) -> i64 {
        i * 3
    }

    /// Stream pack with three differently typed streams, used to check how
    /// stream types show up in the serialized registry.
    #[derive(StreamPack)]
    struct TestStreamRegistration {
        foo_stream: i64,
        bar_stream: f64,
        baz_stream: String,
    }

    /// Test-only predicates reporting which operations have an implementation
    /// registered.
    impl MessageOperation {
        fn deserializable(&self) -> bool {
            self.deserialize_impl.is_some()
        }
        fn serializable(&self) -> bool {
            self.serialize_impl.is_some()
        }
        fn cloneable(&self) -> bool {
            self.fork_clone_impl.is_some()
        }
        fn unzippable(&self) -> bool {
            self.unzip_impl.is_some()
        }
        fn can_fork_result(&self) -> bool {
            self.fork_result_impl.is_some()
        }
        fn splittable(&self) -> bool {
            self.split_impl.is_some()
        }
        fn joinable(&self) -> bool {
            self.join_impl.is_some()
        }
    }

    /// A node registered through `opt_out()` with nothing opted out gets the
    /// common operations but none of the optional ones.
    #[test]
    fn test_register_node_builder() {
        let mut registry = DiagramElementRegistry::new();
        registry.opt_out().register_node_builder(
            NodeBuilderOptions::new("multiply3").with_default_display_text("Test Name"),
            |builder, _config: ()| builder.create_map_block(multiply3),
        );
        // Request and response are both `i64`, so both lookups hit the same entry.
        let req_ops = &registry.messages.get::<i64>().unwrap().operations;
        let resp_ops = &registry.messages.get::<i64>().unwrap().operations;
        assert!(req_ops.deserializable());
        assert!(resp_ops.serializable());
        assert!(resp_ops.cloneable());
        assert!(!resp_ops.unzippable());
        assert!(!resp_ops.can_fork_result());
        assert!(!resp_ops.splittable());
        assert!(!resp_ops.joinable());
    }

    /// The plain `register_node_builder` entry point yields cloneable messages.
    #[test]
    fn test_register_cloneable_node() {
        let mut registry = DiagramElementRegistry::new();
        registry.register_node_builder(
            NodeBuilderOptions::new("multiply3").with_default_display_text("Test Name"),
            |builder, _config: ()| builder.create_map_block(multiply3),
        );
        let req_ops = &registry.messages.get::<i64>().unwrap().operations;
        let resp_ops = &registry.messages.get::<i64>().unwrap().operations;
        assert!(req_ops.deserializable());
        assert!(resp_ops.serializable());
        assert!(resp_ops.cloneable());
    }

    /// `with_unzip` marks a tuple response as unzippable.
    #[test]
    fn test_register_unzippable_node() {
        let mut registry = DiagramElementRegistry::new();
        let tuple_resp = |_: ()| -> (i64,) { (1,) };
        registry
            .opt_out()
            .no_cloning()
            .register_node_builder(
                NodeBuilderOptions::new("multiply3_uncloneable")
                    .with_default_display_text("Test Name"),
                move |builder: &mut Builder, _config: ()| builder.create_map_block(tuple_resp),
            )
            .with_unzip();
        let req_ops = &registry.messages.get::<()>().unwrap().operations;
        let resp_ops = &registry.messages.get::<(i64,)>().unwrap().operations;
        assert!(req_ops.deserializable());
        assert!(resp_ops.serializable());
        assert!(resp_ops.unzippable());
    }

    /// `with_split` marks list and map responses as splittable.
    #[test]
    fn test_register_splittable_node() {
        let mut registry = DiagramElementRegistry::new();
        let vec_resp = |_: ()| -> Vec<i64> { vec![1, 2] };
        registry
            .register_node_builder(
                NodeBuilderOptions::new("vec_resp").with_default_display_text("Test Name"),
                move |builder: &mut Builder, _config: ()| builder.create_map_block(vec_resp),
            )
            .with_split();
        assert!(
            registry
                .messages
                .get::<Vec<i64>>()
                .unwrap()
                .operations
                .splittable()
        );
        let map_resp = |_: ()| -> HashMap<String, i64> { HashMap::new() };
        registry
            .register_node_builder(
                NodeBuilderOptions::new("map_resp").with_default_display_text("Test Name"),
                move |builder: &mut Builder, _config: ()| builder.create_map_block(map_resp),
            )
            .with_split();
        assert!(
            registry
                .messages
                .get::<HashMap<String, i64>>()
                .unwrap()
                .operations
                .splittable()
        );
        // Registering another node with the same response type but without
        // `with_split` leaves the message splittable: the operations are
        // looked up per message type and the split was registered above.
        registry.register_node_builder(
            NodeBuilderOptions::new("not_splittable").with_default_display_text("Test Name"),
            move |builder: &mut Builder, _config: ()| builder.create_map_block(map_resp),
        );
        assert!(
            registry
                .messages
                .get::<HashMap<String, i64>>()
                .unwrap()
                .operations
                .splittable()
        );
    }

    /// A node builder may take a custom, deserializable config type.
    #[test]
    fn test_register_with_config() {
        let mut registry = DiagramElementRegistry::new();
        #[derive(Deserialize, JsonSchema)]
        struct TestConfig {
            by: i64,
        }
        registry.register_node_builder(
            NodeBuilderOptions::new("multiply").with_default_display_text("Test Name"),
            move |builder: &mut Builder, config: TestConfig| {
                builder.create_map_block(move |operand: i64| operand * config.by)
            },
        );
        assert!(registry.get_node_registration("multiply").is_ok());
    }

    /// Message type that implements none of the serde traits.
    struct NonSerializableRequest {}

    /// Opaque (non-serde) request and response types can be registered by
    /// opting out of the common operations, then opting back in per side.
    #[test]
    fn test_register_opaque_node() {
        // Opaque request, serializable response.
        let opaque_request_map = |_: NonSerializableRequest| {};
        let mut registry = DiagramElementRegistry::new();
        registry
            .opt_out()
            .no_serializing()
            .no_deserializing()
            .no_cloning()
            .register_node_builder(
                NodeBuilderOptions::new("opaque_request_map")
                    .with_default_display_text("Test Name"),
                move |builder, _config: ()| builder.create_map_block(opaque_request_map),
            )
            .with_serialize_response();
        assert!(registry.get_node_registration("opaque_request_map").is_ok());
        let req_ops = &registry
            .messages
            .get::<NonSerializableRequest>()
            .unwrap()
            .operations;
        let resp_ops = &registry.messages.get::<()>().unwrap().operations;
        assert!(!req_ops.deserializable());
        assert!(resp_ops.serializable());

        // Deserializable request, opaque response.
        let opaque_response_map = |_: ()| NonSerializableRequest {};
        registry
            .opt_out()
            .no_serializing()
            .no_deserializing()
            .no_cloning()
            .register_node_builder(
                NodeBuilderOptions::new("opaque_response_map")
                    .with_default_display_text("Test Name"),
                move |builder: &mut Builder, _config: ()| {
                    builder.create_map_block(opaque_response_map)
                },
            )
            .with_deserialize_request();
        assert!(
            registry
                .get_node_registration("opaque_response_map")
                .is_ok()
        );
        let req_ops = &registry.messages.get::<()>().unwrap().operations;
        let resp_ops = &registry
            .messages
            .get::<NonSerializableRequest>()
            .unwrap()
            .operations;
        assert!(req_ops.deserializable());
        assert!(!resp_ops.serializable());

        // Opaque request and opaque response.
        let opaque_req_resp_map = |_: NonSerializableRequest| NonSerializableRequest {};
        registry
            .opt_out()
            .no_deserializing()
            .no_serializing()
            .no_cloning()
            .register_node_builder(
                NodeBuilderOptions::new("opaque_req_resp_map")
                    .with_default_display_text("Test Name"),
                move |builder: &mut Builder, _config: ()| {
                    builder.create_map_block(opaque_req_resp_map)
                },
            );
        assert!(
            registry
                .get_node_registration("opaque_req_resp_map")
                .is_ok()
        );
        let req_ops = &registry
            .messages
            .get::<NonSerializableRequest>()
            .unwrap()
            .operations;
        let resp_ops = &registry
            .messages
            .get::<NonSerializableRequest>()
            .unwrap()
            .operations;
        assert!(!req_ops.deserializable());
        assert!(!resp_ops.serializable());
    }

    /// Registering a message directly gives it the common operations only.
    #[test]
    fn test_register_message() {
        let mut registry = DiagramElementRegistry::new();
        #[derive(Deserialize, Serialize, JsonSchema, Clone)]
        struct TestMessage;
        registry.opt_out().register_message::<TestMessage>();
        let ops = &registry
            .get_message_registration::<TestMessage>()
            .unwrap()
            .operations;
        assert!(ops.deserializable());
        assert!(ops.serializable());
        assert!(ops.cloneable());
        assert!(!ops.unzippable());
        assert!(!ops.can_fork_result());
        assert!(!ops.splittable());
        assert!(!ops.joinable());
    }

    /// The serialized registry exposes message schemas by reference and lists
    /// the stream types of each node.
    #[test]
    fn test_serialize_registry() {
        let mut reg = DiagramElementRegistry::new();
        #[derive(Deserialize, Serialize, JsonSchema, Clone)]
        struct Foo {
            hello: String,
        }
        #[derive(Deserialize, Serialize, JsonSchema, Clone)]
        struct Bar {
            foo: Foo,
        }
        struct Opaque;
        reg.opt_out()
            .no_serializing()
            .no_deserializing()
            .no_cloning()
            .register_node_builder(NodeBuilderOptions::new("test"), |builder, _config: ()| {
                builder.create_map_block(|_: Opaque| {
                    (
                        Foo {
                            hello: "hello".to_string(),
                        },
                        Bar {
                            foo: Foo {
                                hello: "world".to_string(),
                            },
                        },
                    )
                })
            })
            .with_unzip();
        reg.register_node_builder(
            NodeBuilderOptions::new("stream_test"),
            |builder, _config: ()| {
                builder.create_map(|input: BlockingMap<f64, TestStreamRegistration>| {
                    let value = input.request;
                    input.streams.foo_stream.send(value as i64);
                    input.streams.bar_stream.send(value);
                    input.streams.baz_stream.send(value.to_string());
                })
            },
        );
        println!("{}", serde_json::to_string_pretty(&reg).unwrap());
        let value = serde_json::to_value(&reg).unwrap();
        let messages = &value["messages"];
        let schemas = &value["schemas"];
        // The message entry refers to the shared schema instead of inlining it.
        let bar_schema = &messages[type_name::<Bar>()]["schema"];
        assert_eq!(bar_schema["$ref"].as_str().unwrap(), "#/schemas/Bar");
        assert!(schemas.get("Bar").is_some());
        assert!(schemas.get("Foo").is_some());
        let nodes = &value["nodes"];
        let stream_test_schema = &nodes["stream_test"];
        let streams = &stream_test_schema["streams"];
        assert_eq!(
            streams["foo_stream"].as_str().unwrap(),
            TypeInfo::of::<i64>().type_name
        );
        assert_eq!(
            streams["bar_stream"].as_str().unwrap(),
            TypeInfo::of::<f64>().type_name
        );
        assert_eq!(
            streams["baz_stream"].as_str().unwrap(),
            TypeInfo::of::<String>().type_name
        );
    }

    /// `JsEmptyObject` serializes to an empty JSON object.
    #[test]
    fn test_serialize_js_empty_object() {
        let json = serde_json::to_string(&JsEmptyObject {}).unwrap();
        assert_eq!(json, "{}");
    }

    /// `JsEmptyObject` accepts any JSON object, including one with extra
    /// fields, and rejects every non-object value.
    #[test]
    fn test_deserialize_js_empty_object() {
        serde_json::from_str::<JsEmptyObject>("{}").unwrap();
        serde_json::from_str::<JsEmptyObject>(r#"{ "extra": "fields" }"#).unwrap();
        assert!(serde_json::from_str::<JsEmptyObject>(r#""some string""#).is_err());
        assert!(serde_json::from_str::<JsEmptyObject>("123").is_err());
        assert!(serde_json::from_str::<JsEmptyObject>("true").is_err());
        assert!(serde_json::from_str::<JsEmptyObject>("null").is_err());
    }
}