use flow_graph_interpreter::{HandlerMap, NamespaceHandler};
use wick_config::config::ComponentDefinition;
use wick_config::Resolver;
use super::error::ConstraintFailure;
use super::ChildInit;
use crate::components::{init_hlc_component, init_manifest_component, init_wasmrs_component};
use crate::dev::prelude::*;
use crate::runtime::RuntimeConstraint;
pub(super) fn assert_constraints(constraints: &[RuntimeConstraint], components: &HandlerMap) -> Result<(), ScopeError> {
for constraint in constraints {
#[allow(irrefutable_let_patterns)]
if let RuntimeConstraint::Operation { entity, signature } = constraint {
let handler = components
.get(entity.component_id())
.ok_or_else(|| ScopeError::InvalidConstraint(ConstraintFailure::ComponentNotFound(entity.clone())))?;
let sig = handler.component().signature();
let op = sig.get_operation(entity.operation_id()).ok_or_else(|| {
ScopeError::InvalidConstraint(ConstraintFailure::OperationNotFound(
entity.clone(),
sig.operations.iter().map(|o| o.name().to_owned()).collect(),
))
})?;
for field in &signature.inputs {
op.inputs
.iter()
.find(|sig_field| sig_field.name == field.name)
.ok_or_else(|| {
ScopeError::InvalidConstraint(ConstraintFailure::InputNotFound(entity.clone(), field.name.clone()))
})?;
}
for field in &signature.outputs {
op.outputs
.iter()
.find(|sig_field| sig_field.name == field.name)
.ok_or_else(|| {
ScopeError::InvalidConstraint(ConstraintFailure::OutputNotFound(entity.clone(), field.name.clone()))
})?;
}
}
}
Ok(())
}
pub(crate) async fn instantiate_import(
binding: &config::Binding<config::ImportDefinition>,
opts: ChildInit,
resolver: Box<Resolver>,
) -> Result<Option<NamespaceHandler>, ScopeError> {
opts.span.in_scope(|| {
debug!(id = binding.id(), ?opts, "init options");
});
let id = binding.id().to_owned();
let start = std::time::Instant::now();
let span = opts.span.clone();
let config::ImportDefinition::Component(config) = binding.kind() else {
return Ok(None);
};
let result = instantiate_imported_component(id, config, opts, resolver).await;
let end = std::time::Instant::now();
span.in_scope(|| {
info!(id = binding.id(), duration_ms = %end.duration_since(start).as_millis(), "initialized");
});
result
}
pub(crate) async fn instantiate_imported_component(
id: String,
kind: &ComponentDefinition,
opts: ChildInit,
resolver: Box<Resolver>,
) -> Result<Option<NamespaceHandler>, ScopeError> {
match kind {
#[allow(deprecated)]
config::ComponentDefinition::Wasm(def) => Ok(Some(
init_wasmrs_component(
def.reference(),
id,
opts,
None,
None,
Default::default(),
Default::default(),
)
.await?,
)),
config::ComponentDefinition::Manifest(def) => Ok(Some(init_manifest_component(def, id, opts).await?)),
config::ComponentDefinition::Reference(_) => unreachable!(),
config::ComponentDefinition::GrpcUrl(_) => todo!(), config::ComponentDefinition::HighLevelComponent(hlc) => {
init_hlc_component(id, opts.root_config.clone(), None, hlc.clone(), resolver)
.await
.map(Some)
}
config::ComponentDefinition::Native(_) => Ok(None),
}
}
#[cfg(test)]
mod test {
use anyhow::Result;
use flow_component::Component;
use wick_interface_types::{component, operation, ComponentSignature};
use wick_packet::{Entity, Invocation, PacketStream, RuntimeConfig};
use super::*;
struct TestComponent {
signature: ComponentSignature,
}
impl TestComponent {
fn new() -> Self {
Self {
signature: component! {
name: "test",
version: Some("0.0.1"),
operations: {
"testop" => {
inputs: {
"in" => "object",
},
outputs: {
"out" => "object",
},
},
}
},
}
}
}
impl Component for TestComponent {
fn handle(
&self,
_invocation: Invocation,
_data: Option<RuntimeConfig>,
_callback: LocalScope,
) -> flow_component::BoxFuture<std::result::Result<PacketStream, flow_component::ComponentError>> {
todo!()
}
fn signature(&self) -> &ComponentSignature {
&self.signature
}
}
#[test]
fn test_constraints() -> Result<()> {
let mut components = HandlerMap::default();
components.add(NamespaceHandler::new("test", Box::new(TestComponent::new())))?;
let constraints = vec![RuntimeConstraint::Operation {
entity: Entity::operation("test", "testop"),
signature: operation!(
"testop" => {
inputs: {
"in" => "object",
},
outputs: {
"out" => "object",
},
}
),
}];
assert_constraints(&constraints, &components)?;
let constraints = vec![RuntimeConstraint::Operation {
entity: Entity::operation("test", "testop"),
signature: operation!(
"testop" => {
inputs: {
"otherin" => "object",
},
outputs: {
"otherout" => "object",
},
}
),
}];
let result = assert_constraints(&constraints, &components);
assert!(result.is_err());
Ok(())
}
}