#![allow(clippy::type_complexity)]
use std::{fmt::Debug, hash::Hash};
use futures::stream::{StreamExt, TryStreamExt};
use peace_cfg::ItemId;
use peace_params::ParamsSpecs;
use peace_resources::{
internal::{FlowParamsFile, ProfileParamsFile, WorkspaceParamsFile},
paths::ParamsSpecsFile,
resources::ts::{Empty, SetUp},
Resources,
};
use peace_rt_model::{
fn_graph::resman::Resource,
params::{FlowParams, ProfileParams, WorkspaceParams},
Error, Flow, ItemGraph, ItemParamsTypeReg, ParamsSpecsTypeReg, StatesTypeReg, Storage,
Workspace, WorkspaceInitializer,
};
use serde::{de::DeserializeOwned, Serialize};
pub use self::{
multi_profile_no_flow_builder::MultiProfileNoFlowBuilder,
multi_profile_single_flow_builder::MultiProfileSingleFlowBuilder,
no_profile_no_flow_builder::NoProfileNoFlowBuilder,
single_profile_no_flow_builder::SingleProfileNoFlowBuilder,
single_profile_single_flow_builder::SingleProfileSingleFlowBuilder,
};
mod multi_profile_no_flow_builder;
mod multi_profile_single_flow_builder;
mod no_profile_no_flow_builder;
mod single_profile_no_flow_builder;
mod single_profile_single_flow_builder;
/// Builder state for a command context.
///
/// Generic over the output type `O` and a scope-specific builder type
/// `ScopeBuilder`. Both borrows are tied to the `'ctx` lifetime.
///
/// NOTE(review): `ScopeBuilder` is presumably one of the `*Builder` types
/// re-exported from this module's submodules -- confirm against the `impl`s.
#[derive(Debug)]
pub struct CmdCtxBuilder<'ctx, O, ScopeBuilder> {
/// Exclusive borrow of the output value, held for `'ctx`.
output: &'ctx mut O,
/// Shared borrow of the workspace, held for `'ctx`.
workspace: &'ctx Workspace,
/// Scope-specific builder value, owned by this builder.
scope_builder: ScopeBuilder,
}
/// Serializes the workspace params through `storage`.
///
/// This is a thin wrapper that delegates to
/// `WorkspaceInitializer::workspace_params_serialize`, passing `storage`,
/// `workspace_params`, and `workspace_params_file` through unchanged.
///
/// # Errors
///
/// Propagates any error returned by the delegated call.
async fn workspace_params_serialize<WorkspaceParamsK>(
    workspace_params: &WorkspaceParams<WorkspaceParamsK>,
    storage: &Storage,
    workspace_params_file: &WorkspaceParamsFile,
) -> Result<(), Error>
where
    WorkspaceParamsK:
        Clone + Debug + Eq + Hash + DeserializeOwned + Serialize + Send + Sync + 'static,
{
    let serialization = WorkspaceInitializer::workspace_params_serialize(
        storage,
        workspace_params,
        workspace_params_file,
    );
    serialization.await?;

    Ok(())
}
/// Moves every workspace param out of `workspace_params` and into
/// `resources`.
///
/// Each drained value is unwrapped with `into_inner()`, upcast, and then
/// inserted under the type ID reported by `Resource::type_id`. The map keys
/// are discarded.
fn workspace_params_insert<WorkspaceParamsK>(
    mut workspace_params: WorkspaceParams<WorkspaceParamsK>,
    resources: &mut Resources<Empty>,
) where
    WorkspaceParamsK:
        Clone + Debug + Eq + Hash + DeserializeOwned + Serialize + Send + Sync + 'static,
{
    for (_key, boxed_param) in workspace_params.drain(..) {
        let resource = boxed_param.into_inner().upcast();
        // Explicit reborrow so the type ID is taken from the pointee
        // (presumably a boxed trait object), not from the pointer itself.
        let resource_type_id = Resource::type_id(&*resource);
        resources.insert_raw(resource_type_id, resource);
    }
}
/// Serializes the profile params through `storage`.
///
/// This is a thin wrapper that delegates to
/// `WorkspaceInitializer::profile_params_serialize`, passing `storage`,
/// `profile_params`, and `profile_params_file` through unchanged.
///
/// # Errors
///
/// Propagates any error returned by the delegated call.
async fn profile_params_serialize<ProfileParamsK>(
    profile_params: &ProfileParams<ProfileParamsK>,
    storage: &Storage,
    profile_params_file: &ProfileParamsFile,
) -> Result<(), Error>
where
    ProfileParamsK:
        Clone + Debug + Eq + Hash + DeserializeOwned + Serialize + Send + Sync + 'static,
{
    let serialization = WorkspaceInitializer::profile_params_serialize(
        storage,
        profile_params,
        profile_params_file,
    );
    serialization.await?;

    Ok(())
}
/// Moves every profile param out of `profile_params` and into `resources`.
///
/// Each drained value is unwrapped with `into_inner()`, upcast, and then
/// inserted under the type ID reported by `Resource::type_id`. The map keys
/// are discarded.
fn profile_params_insert<ProfileParamsK>(
    mut profile_params: ProfileParams<ProfileParamsK>,
    resources: &mut Resources<Empty>,
) where
    ProfileParamsK:
        Clone + Debug + Eq + Hash + DeserializeOwned + Serialize + Send + Sync + 'static,
{
    for (_key, boxed_param) in profile_params.drain(..) {
        let resource = boxed_param.into_inner().upcast();
        // Explicit reborrow so the type ID is taken from the pointee
        // (presumably a boxed trait object), not from the pointer itself.
        let resource_type_id = Resource::type_id(&*resource);
        resources.insert_raw(resource_type_id, resource);
    }
}
/// Serializes the flow params through `storage`.
///
/// This is a thin wrapper that delegates to
/// `WorkspaceInitializer::flow_params_serialize`, passing `storage`,
/// `flow_params`, and `flow_params_file` through unchanged.
///
/// # Errors
///
/// Propagates any error returned by the delegated call.
async fn flow_params_serialize<FlowParamsK>(
    flow_params: &FlowParams<FlowParamsK>,
    storage: &Storage,
    flow_params_file: &FlowParamsFile,
) -> Result<(), Error>
where
    FlowParamsK: Clone + Debug + Eq + Hash + DeserializeOwned + Serialize + Send + Sync + 'static,
{
    let serialization =
        WorkspaceInitializer::flow_params_serialize(storage, flow_params, flow_params_file);
    serialization.await?;

    Ok(())
}
/// Serializes `params_specs` to `params_specs_file` via
/// `Storage::serialized_write`.
///
/// On non-WASM targets an extra leading string argument is passed to
/// `serialized_write`; on `wasm32` that argument is compiled out.
///
/// # Errors
///
/// Returns whatever `serialized_write` returns; `Error::ParamsSpecsSerialize`
/// is supplied to it as the error constructor.
async fn params_specs_serialize(
    params_specs: &ParamsSpecs,
    storage: &Storage,
    params_specs_file: &ParamsSpecsFile,
) -> Result<(), Error> {
    // NOTE(review): presumably a label identifying the caller (e.g. a thread
    // name) -- confirm against `Storage::serialized_write`.
    #[cfg(not(target_arch = "wasm32"))]
    let caller_label = "cmd_ctx_builder::params_specs_serialize".to_string();

    let write = storage.serialized_write(
        #[cfg(not(target_arch = "wasm32"))]
        caller_label,
        params_specs_file,
        params_specs,
        Error::ParamsSpecsSerialize,
    );

    write.await
}
/// Moves every flow param out of `flow_params` and into `resources`.
///
/// Each drained value is unwrapped with `into_inner()`, upcast, and then
/// inserted under the type ID reported by `Resource::type_id`. The map keys
/// are discarded.
fn flow_params_insert<FlowParamsK>(
    mut flow_params: FlowParams<FlowParamsK>,
    resources: &mut Resources<Empty>,
) where
    FlowParamsK: Clone + Debug + Eq + Hash + DeserializeOwned + Serialize + Send + Sync + 'static,
{
    for (_key, boxed_param) in flow_params.drain(..) {
        let resource = boxed_param.into_inner().upcast();
        // Explicit reborrow so the type ID is taken from the pointee
        // (presumably a boxed trait object), not from the pointer itself.
        let resource_type_id = Resource::type_id(&*resource);
        resources.insert_raw(resource_type_id, resource);
    }
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn profiles_from_peace_app_dir(
peace_app_dir: &peace_resources::paths::PeaceAppDir,
profiles_filter_fn: Option<&dyn Fn(&peace_core::Profile) -> bool>,
) -> Result<Vec<peace_core::Profile>, peace_rt_model::Error> {
use std::{ffi::OsStr, str::FromStr};
let mut profiles = Vec::new();
let mut peace_app_read_dir = tokio::fs::read_dir(peace_app_dir).await.map_err(|error| {
peace_rt_model::Error::Native(peace_rt_model::NativeError::PeaceAppDirRead {
peace_app_dir: peace_app_dir.to_path_buf(),
error,
})
})?;
while let Some(entry) = peace_app_read_dir.next_entry().await.map_err(|error| {
peace_rt_model::Error::Native(peace_rt_model::NativeError::PeaceAppDirEntryRead {
peace_app_dir: peace_app_dir.to_path_buf(),
error,
})
})? {
let file_type = entry.file_type().await.map_err(|error| {
peace_rt_model::Error::Native(
peace_rt_model::NativeError::PeaceAppDirEntryFileTypeRead {
path: entry.path(),
error,
},
)
})?;
if file_type.is_dir() {
let entry_path = entry.path();
if let Some(dir_name) = entry_path.file_name().and_then(OsStr::to_str) {
let profile = peace_core::Profile::from_str(dir_name).map_err(|error| {
peace_rt_model::Error::Native(
peace_rt_model::NativeError::ProfileDirInvalidName {
dir_name: dir_name.to_string(),
path: entry_path.to_path_buf(),
error,
},
)
})?;
if let Some(profiles_filter_fn) = profiles_filter_fn {
if !profiles_filter_fn(&profile) {
continue;
}
}
profiles.push(profile)
}
}
}
profiles.sort();
Ok(profiles)
}
/// WASM variant of profile discovery: always returns an empty list.
///
/// Both parameters are unused on this target; they exist so the signature
/// matches the non-WASM variant.
#[cfg(target_arch = "wasm32")]
pub(crate) async fn profiles_from_peace_app_dir(
    _peace_app_dir: &peace_resources::paths::PeaceAppDir,
    _profiles_filter_fn: Option<&dyn Fn(&peace_core::Profile) -> bool>,
) -> Result<Vec<peace_core::Profile>, peace_rt_model::Error> {
    Ok(Vec::new())
}
/// Builds the item params, params specs, and states type registries for an
/// item graph.
///
/// Starts from three empty registries and calls `params_and_state_register`
/// on every item yielded by `item_graph.iter()`, letting each item register
/// its types.
///
/// Returns the registries as
/// `(item_params_type_reg, params_specs_type_reg, states_type_reg)`.
fn params_and_states_type_reg<E>(
    item_graph: &ItemGraph<E>,
) -> (ItemParamsTypeReg, ParamsSpecsTypeReg, StatesTypeReg)
where
    E: 'static,
{
    let mut item_params_type_reg = ItemParamsTypeReg::new();
    let mut params_specs_type_reg = ParamsSpecsTypeReg::new();
    let mut states_type_reg = StatesTypeReg::new();

    for item in item_graph.iter() {
        item.params_and_state_register(
            &mut item_params_type_reg,
            &mut params_specs_type_reg,
            &mut states_type_reg,
        );
    }

    (item_params_type_reg, params_specs_type_reg, states_type_reg)
}
fn params_specs_merge<E>(
flow: &Flow<E>,
mut params_specs_provided: ParamsSpecs,
params_specs_stored: Option<ParamsSpecs>,
) -> Result<ParamsSpecs, Error>
where
E: From<Error> + 'static,
{
let item_graph = flow.graph();
let mut params_specs = ParamsSpecs::with_capacity(item_graph.node_count());
let mut item_ids_with_no_params_specs = Vec::<ItemId>::new();
let mut params_specs_stored_mismatches = None;
let mut params_specs_not_usable = Vec::<ItemId>::new();
if let Some(mut params_specs_stored) = params_specs_stored {
item_graph.iter_insertion().for_each(|item_rt| {
let item_id = item_rt.id();
let params_spec_provided = params_specs_provided.remove_entry(item_id);
let params_spec_stored = params_specs_stored.remove_entry(item_id);
let params_spec_to_use = match (params_spec_provided, params_spec_stored) {
(None, None) => None,
(None, Some(params_spec_stored)) => Some(params_spec_stored),
(Some(params_spec_provided), None) => Some(params_spec_provided),
(
Some((item_id, mut params_spec_provided)),
Some((_item_id, params_spec_stored)),
) => {
params_spec_provided.merge(&*params_spec_stored);
Some((item_id, params_spec_provided))
}
};
if let Some((item_id, params_spec_boxed)) = params_spec_to_use {
if params_spec_boxed.is_usable() {
params_specs.insert_raw(item_id, params_spec_boxed);
} else {
params_specs_not_usable.push(item_id);
}
} else {
item_ids_with_no_params_specs.push(item_id.clone());
}
});
params_specs_stored_mismatches = Some(params_specs_stored);
} else {
item_graph.iter_insertion().for_each(|item_rt| {
let item_id = item_rt.id();
if let Some((item_id, params_spec_boxed)) = params_specs_provided.remove_entry(item_id)
{
params_specs.insert_raw(item_id, params_spec_boxed);
} else {
item_ids_with_no_params_specs.push(item_id.clone());
}
});
}
let params_specs_provided_mismatches = params_specs_provided;
let params_no_issues = item_ids_with_no_params_specs.is_empty()
&& params_specs_provided_mismatches.is_empty()
&& params_specs_stored_mismatches
.as_ref()
.map(|params_specs_stored_mismatches| params_specs_stored_mismatches.is_empty())
.unwrap_or(true)
&& params_specs_not_usable.is_empty();
if params_no_issues {
Ok(params_specs)
} else {
Err(Error::ParamsSpecsMismatch {
item_ids_with_no_params_specs,
params_specs_provided_mismatches,
params_specs_stored_mismatches,
params_specs_not_usable,
})
}
}
/// Runs `setup` for every item in the graph, then transitions the resources
/// to the `SetUp` type state.
///
/// Items are taken from `item_graph.stream()` and folded over `resources`
/// with `try_fold`, so each item's `setup` receives the resources as left by
/// the previous item, and the first error stops the fold.
///
/// # Errors
///
/// Returns the first error produced by an item's `setup`.
async fn item_graph_setup<E>(
    item_graph: &ItemGraph<E>,
    resources: Resources<Empty>,
) -> Result<Resources<SetUp>, E>
where
    E: std::error::Error + 'static,
{
    // Lift each item into `Ok` so the stream can be driven by `try_fold`.
    let items = item_graph.stream().map(Ok::<_, E>);

    let resources_after_setup = items
        .try_fold(resources, |mut resources_acc, item| async move {
            item.setup(&mut resources_acc).await?;
            Ok(resources_acc)
        })
        .await?;

    Ok(Resources::<SetUp>::from(resources_after_setup))
}