use std::marker::PhantomData;
use futures::{
stream::{StreamExt, TryStreamExt},
TryStream,
};
use peace_cfg::OpCheckStatus;
use peace_resources::{
internal::OpCheckStatuses,
resources::ts::{Cleaned, CleanedDry, SetUp, WithStatesCurrent},
states::{StatesCleaned, StatesCleanedDry},
Resources,
};
use peace_rt_model::{
cmd::CmdContext, output::OutputWrite, Error, FnRef, ItemSpecBoxed, ItemSpecGraph,
};
use crate::cmds::sub::StatesCurrentDiscoverCmd;
#[derive(Debug)]
pub struct CleanCmd<E, O>(PhantomData<(E, O)>);
impl<E, O> CleanCmd<E, O>
where
E: std::error::Error + From<Error> + Send,
O: OutputWrite<E>,
{
pub async fn exec_dry(
cmd_context: CmdContext<'_, E, O, SetUp>,
) -> Result<CmdContext<'_, E, O, CleanedDry>, E> {
let CmdContext {
workspace,
item_spec_graph,
output,
resources,
states_type_regs,
#[cfg(feature = "output_progress")]
cmd_progress_tracker,
..
} = cmd_context;
let resources_result = Self::exec_dry_internal(item_spec_graph, resources).await;
match resources_result {
Ok(resources) => {
{
let states_cleaned_dry = resources.borrow::<StatesCleanedDry>();
output.write_states_cleaned_dry(&states_cleaned_dry).await?;
}
let cmd_context = CmdContext::from((
workspace,
item_spec_graph,
output,
resources,
states_type_regs,
#[cfg(feature = "output_progress")]
cmd_progress_tracker,
));
Ok(cmd_context)
}
Err(e) => {
output.write_err(&e).await?;
Err(e)
}
}
}
pub(crate) async fn exec_dry_internal(
item_spec_graph: &ItemSpecGraph<E>,
mut resources: Resources<SetUp>,
) -> Result<Resources<CleanedDry>, E> {
#[allow(clippy::needless_borrow)]
let states_current =
StatesCurrentDiscoverCmd::<E, O>::exec_internal(item_spec_graph, &mut resources)
.await?;
let resources = Resources::<WithStatesCurrent>::from((resources, states_current));
let op_check_statuses = Self::clean_op_spec_check(item_spec_graph, &resources).await?;
Self::clean_op_spec_exec_dry(item_spec_graph, &resources, &op_check_statuses).await?;
let states_current = StatesCurrentDiscoverCmd::<E, O>::exec_internal_for_clean_dry(
item_spec_graph,
&resources,
)
.await?;
let states_cleaned_dry = StatesCleanedDry::from((states_current, &resources));
let resources = Resources::<CleanedDry>::from((resources, states_cleaned_dry));
Ok(resources)
}
async fn clean_op_spec_exec_dry(
item_spec_graph: &ItemSpecGraph<E>,
resources: &Resources<WithStatesCurrent>,
op_check_statuses: &OpCheckStatuses,
) -> Result<(), E> {
Self::clean_op_spec_stream(item_spec_graph, op_check_statuses)
.try_for_each(|item_spec| async move { item_spec.clean_op_exec_dry(resources).await })
.await?;
Ok(())
}
pub async fn exec(
cmd_context: CmdContext<'_, E, O, SetUp>,
) -> Result<CmdContext<'_, E, O, Cleaned>, E> {
let CmdContext {
workspace,
item_spec_graph,
output,
resources,
states_type_regs,
#[cfg(feature = "output_progress")]
cmd_progress_tracker,
..
} = cmd_context;
#[allow(clippy::needless_borrow)]
let resources_result = Self::exec_internal(item_spec_graph, resources).await;
match resources_result {
Ok(resources) => {
{
let states_cleaned = resources.borrow::<StatesCleaned>();
output.write_states_cleaned(&states_cleaned).await?;
}
let cmd_context = CmdContext::from((
workspace,
item_spec_graph,
output,
resources,
states_type_regs,
#[cfg(feature = "output_progress")]
cmd_progress_tracker,
));
Ok(cmd_context)
}
Err(e) => {
output.write_err(&e).await?;
Err(e)
}
}
}
pub(crate) async fn exec_internal(
item_spec_graph: &ItemSpecGraph<E>,
mut resources: Resources<SetUp>,
) -> Result<Resources<Cleaned>, E> {
#[allow(clippy::needless_borrow)]
let states =
StatesCurrentDiscoverCmd::<E, O>::exec_internal(item_spec_graph, &mut resources)
.await?;
let mut resources = Resources::<WithStatesCurrent>::from((resources, states));
let op_check_statuses = Self::clean_op_spec_check(item_spec_graph, &resources).await?;
Self::clean_op_spec_exec(item_spec_graph, &resources, &op_check_statuses).await?;
let states_current = StatesCurrentDiscoverCmd::<E, O>::exec_internal_for_clean(
item_spec_graph,
&mut resources,
)
.await?;
let states_cleaned = StatesCleaned::from((states_current, &resources));
let resources = Resources::<Cleaned>::from((resources, states_cleaned));
Ok(resources)
}
async fn clean_op_spec_check(
item_spec_graph: &ItemSpecGraph<E>,
resources: &Resources<WithStatesCurrent>,
) -> Result<OpCheckStatuses, E> {
let op_check_statuses = item_spec_graph
.stream()
.map(Result::<_, E>::Ok)
.and_then(|item_spec| async move {
let op_check_status = item_spec.clean_op_check(resources).await?;
Ok((item_spec.id().clone(), op_check_status))
})
.try_collect::<OpCheckStatuses>()
.await?;
Ok(op_check_statuses)
}
async fn clean_op_spec_exec(
item_spec_graph: &ItemSpecGraph<E>,
resources: &Resources<WithStatesCurrent>,
op_check_statuses: &OpCheckStatuses,
) -> Result<(), E> {
Self::clean_op_spec_stream(item_spec_graph, op_check_statuses)
.try_for_each(|item_spec| async move { item_spec.clean_op_exec(resources).await })
.await?;
Ok(())
}
fn clean_op_spec_stream<'f>(
item_spec_graph: &'f ItemSpecGraph<E>,
op_check_statuses: &'f OpCheckStatuses,
) -> impl TryStream<Ok = FnRef<'f, ItemSpecBoxed<E>>, Error = E> {
item_spec_graph
.stream()
.filter(|item_spec| {
let exec_required = op_check_statuses
.get(item_spec.id())
.map(|op_check_status| {
matches!(op_check_status, OpCheckStatus::ExecRequired { .. })
})
.unwrap_or(true);
async move { exec_required }
})
.map(Result::Ok)
}
}