use std::sync::Arc;
use std::collections::HashMap;
use crate::taskflow::Taskflow;
use crate::task::{TaskHandle, TaskId};
/// A shareable unit of work.
///
/// The closure is kept behind an [`Arc`], so cloning a `CloneableWork` yields
/// another handle to the *same* closure instead of duplicating it.
#[derive(Clone)]
pub struct CloneableWork {
    work: Arc<dyn Fn() + Send + Sync>,
}

impl CloneableWork {
    /// Wraps `f` so it can be cloned and invoked any number of times.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        let work: Arc<dyn Fn() + Send + Sync> = Arc::new(f);
        Self { work }
    }

    /// Invokes the wrapped closure once.
    pub fn execute(&self) {
        let run: &(dyn Fn() + Send + Sync) = &*self.work;
        run()
    }
}
/// A string-keyed bag of typed parameters handed to a composition factory.
///
/// Every key holds exactly one [`ParamValue`]. Setting a key again replaces the
/// previous value, even when the new value has a different type.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CompositionParams {
    params: HashMap<String, ParamValue>,
}

impl CompositionParams {
    /// Creates an empty parameter set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `value` under `key`, replacing whatever was there before.
    fn set(&mut self, key: &str, value: ParamValue) -> &mut Self {
        self.params.insert(key.to_owned(), value);
        self
    }

    /// Stores an integer under `key` and returns `self` for chaining.
    pub fn set_int(&mut self, key: &str, value: i64) -> &mut Self {
        self.set(key, ParamValue::Int(value))
    }

    /// Stores a string under `key` and returns `self` for chaining.
    pub fn set_string(&mut self, key: &str, value: String) -> &mut Self {
        self.set(key, ParamValue::String(value))
    }

    /// Stores a float under `key` and returns `self` for chaining.
    pub fn set_float(&mut self, key: &str, value: f64) -> &mut Self {
        self.set(key, ParamValue::Float(value))
    }

    /// Stores a boolean under `key` and returns `self` for chaining.
    pub fn set_bool(&mut self, key: &str, value: bool) -> &mut Self {
        self.set(key, ParamValue::Bool(value))
    }

    /// Returns the integer stored under `key`.
    ///
    /// Yields `None` when the key is absent or holds a non-integer value; no
    /// conversion between value types is attempted.
    pub fn get_int(&self, key: &str) -> Option<i64> {
        match self.params.get(key) {
            Some(ParamValue::Int(v)) => Some(*v),
            _ => None,
        }
    }

    /// Returns the string stored under `key`, borrowed from this parameter set.
    ///
    /// Yields `None` when the key is absent or holds a non-string value.
    pub fn get_string(&self, key: &str) -> Option<&str> {
        match self.params.get(key) {
            Some(ParamValue::String(v)) => Some(v.as_str()),
            _ => None,
        }
    }

    /// Returns the float stored under `key`.
    ///
    /// Yields `None` when the key is absent or holds a non-float value
    /// (an `Int` is *not* widened to a float).
    pub fn get_float(&self, key: &str) -> Option<f64> {
        match self.params.get(key) {
            Some(ParamValue::Float(v)) => Some(*v),
            _ => None,
        }
    }

    /// Returns the boolean stored under `key`.
    ///
    /// Yields `None` when the key is absent or holds a non-boolean value.
    pub fn get_bool(&self, key: &str) -> Option<bool> {
        match self.params.get(key) {
            Some(ParamValue::Bool(v)) => Some(*v),
            _ => None,
        }
    }
}

/// A single typed value stored in a [`CompositionParams`].
#[derive(Clone, Debug, PartialEq)]
pub enum ParamValue {
    Int(i64),
    Float(f64),
    String(String),
    Bool(bool),
}
/// Shared, thread-safe closure that builds a [`Composition`] from parameters.
pub type CompositionFactory = Arc<dyn Fn(&CompositionParams) -> Composition + Send + Sync>;

/// A composition template: a factory plus the parameters to use by default.
pub struct ParameterizedComposition {
    factory: CompositionFactory,
    default_params: CompositionParams,
}

impl ParameterizedComposition {
    /// Creates a template from `factory`, starting with empty default parameters.
    pub fn new<F>(factory: F) -> Self
    where
        F: Fn(&CompositionParams) -> Composition + Send + Sync + 'static,
    {
        let factory: CompositionFactory = Arc::new(factory);
        Self {
            factory,
            default_params: CompositionParams::new(),
        }
    }

    /// Replaces the stored default parameters and returns the updated template.
    pub fn with_defaults(self, params: CompositionParams) -> Self {
        Self {
            default_params: params,
            ..self
        }
    }

    /// Runs the factory with exactly `params`.
    ///
    /// The stored defaults are not consulted or merged in here; they are only
    /// used by [`instantiate_default`](Self::instantiate_default).
    pub fn instantiate(&self, params: &CompositionParams) -> Composition {
        let build = &*self.factory;
        build(params)
    }

    /// Runs the factory with the stored default parameters.
    pub fn instantiate_default(&self) -> Composition {
        self.instantiate(&self.default_params)
    }

    /// Instantiates the template with `params` and composes the result into `target`.
    pub fn compose_into(&self, target: &mut Taskflow, params: &CompositionParams) -> ComposedInstance {
        self.instantiate(params).compose_into(target)
    }
}
/// A named group of tasks with designated entry and exit handles.
pub struct ModuleTask {
    name: String,
    entry_points: Vec<TaskHandle>,
    exit_points: Vec<TaskHandle>,
}

impl ModuleTask {
    /// Creates a module called `name` with no entry or exit points.
    pub fn new(name: &str) -> Self {
        Self {
            name: String::from(name),
            entry_points: vec![],
            exit_points: vec![],
        }
    }

    /// Handles recorded as entry points of this module.
    pub fn entries(&self) -> &[TaskHandle] {
        self.entry_points.as_slice()
    }

    /// Handles recorded as exit points of this module.
    pub fn exits(&self) -> &[TaskHandle] {
        self.exit_points.as_slice()
    }

    /// The module's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
/// Incrementally assembles a [`Composition`].
///
/// The builder owns a fresh [`Taskflow`] and remembers which tasks were marked
/// as entries/exits and which tasks were registered with re-runnable work.
pub struct CompositionBuilder {
    taskflow: Taskflow,
    entry_tasks: Vec<TaskHandle>,
    exit_tasks: Vec<TaskHandle>,
    cloneable_works: HashMap<TaskId, CloneableWork>,
}

impl CompositionBuilder {
    /// Creates a builder around a new, empty taskflow.
    pub fn new() -> Self {
        Self {
            taskflow: Taskflow::new(),
            entry_tasks: vec![],
            exit_tasks: vec![],
            cloneable_works: HashMap::new(),
        }
    }

    /// Mutable access to the underlying taskflow.
    ///
    /// Tasks added directly through this handle are not recorded in the
    /// builder's cloneable-work table.
    pub fn taskflow_mut(&mut self) -> &mut Taskflow {
        &mut self.taskflow
    }

    /// Adds a task running `work` and records the work under the task's id.
    ///
    /// Returns the handle of the newly emplaced task.
    pub fn emplace_cloneable<F>(&mut self, work: F) -> TaskHandle
    where
        F: Fn() + Send + Sync + 'static,
    {
        let shared = CloneableWork::new(work);
        // One handle goes into the task closure, the other into the lookup table.
        let runner = shared.clone();
        let handle = self.taskflow.emplace(move || runner.execute());
        self.cloneable_works.insert(handle.id, shared);
        handle
    }

    /// Appends `tasks` to the list of entry tasks.
    pub fn mark_entries(&mut self, tasks: &[TaskHandle]) {
        self.entry_tasks.extend(tasks.iter().cloned());
    }

    /// Appends `tasks` to the list of exit tasks.
    pub fn mark_exits(&mut self, tasks: &[TaskHandle]) {
        self.exit_tasks.extend(tasks.iter().cloned());
    }

    /// Consumes the builder and produces the finished [`Composition`].
    pub fn build(self) -> Composition {
        let Self {
            taskflow,
            entry_tasks,
            exit_tasks,
            cloneable_works,
        } = self;
        Composition {
            taskflow,
            entry_tasks,
            exit_tasks,
            cloneable_works,
        }
    }
}

impl Default for CompositionBuilder {
    /// Equivalent to [`CompositionBuilder::new`].
    fn default() -> Self {
        CompositionBuilder::new()
    }
}
/// A task graph together with its designated entry and exit tasks.
///
/// A composition owns its own [`Taskflow`]; [`compose_into`](Self::compose_into)
/// copies that graph into another taskflow and takes `&self`, so the same
/// composition can be copied repeatedly.
pub struct Composition {
    taskflow: Taskflow,
    entry_tasks: Vec<TaskHandle>,
    exit_tasks: Vec<TaskHandle>,
    /// Re-runnable work keyed by the id of the source task that executes it.
    cloneable_works: HashMap<TaskId, CloneableWork>,
}

impl Composition {
    /// Wraps an existing taskflow with the given entry and exit handles.
    ///
    /// No cloneable work is registered, so when this composition is copied
    /// every task is replaced by a placeholder that prints the task's name.
    pub fn new(taskflow: Taskflow, entries: Vec<TaskHandle>, exits: Vec<TaskHandle>) -> Self {
        Self {
            taskflow,
            entry_tasks: entries,
            exit_tasks: exits,
            cloneable_works: HashMap::new(),
        }
    }

    /// Entry task handles, referring to this composition's own taskflow.
    pub fn entries(&self) -> &[TaskHandle] {
        &self.entry_tasks
    }

    /// Exit task handles, referring to this composition's own taskflow.
    pub fn exits(&self) -> &[TaskHandle] {
        &self.exit_tasks
    }

    /// Copies this composition's graph into `target`.
    ///
    /// Returns the entry and exit handles of the copy, expressed in terms of
    /// `target`'s graph. Entry/exit handles whose id does not belong to the
    /// source graph are silently omitted.
    ///
    /// # Panics
    ///
    /// Panics if locking the source or target graph fails.
    pub fn compose_into(&self, target: &mut Taskflow) -> ComposedInstance {
        let id_mapping = self.clone_graph_into(target);

        // Translate source handles into handles on the target graph.
        let remap = |handles: &[TaskHandle]| -> Vec<TaskHandle> {
            handles
                .iter()
                .filter_map(|h| id_mapping.get(&h.id).copied())
                .map(|id| TaskHandle::new(id, target.get_graph()))
                .collect()
        };

        ComposedInstance {
            entries: remap(&self.entry_tasks),
            exits: remap(&self.exit_tasks),
        }
    }

    /// Recreates every node and edge of the source graph inside `target`.
    ///
    /// Tasks registered with cloneable work run that same work in the copy;
    /// all other tasks become placeholders that print their name. Returns the
    /// mapping from source task id to the id of its copy.
    ///
    /// # Panics
    ///
    /// Panics if locking the source or target graph fails.
    fn clone_graph_into(&self, target: &mut Taskflow) -> HashMap<TaskId, TaskId> {
        let source_graph = self.taskflow.get_graph();
        let source_guard = source_graph
            .lock()
            .expect("failed to lock source graph");

        let mut id_mapping = HashMap::new();
        // Names to apply to the freshly created nodes, keyed by their new id.
        let mut pending_names = HashMap::new();

        // Pass 1: create one task in `target` per source node.
        for node in source_guard.iter() {
            let new_task = if let Some(cloneable) = self.cloneable_works.get(&node.id) {
                let work = cloneable.clone();
                target.emplace(move || {
                    work.execute();
                })
            } else {
                let node_name = node.name.clone();
                target.emplace(move || {
                    println!(" [Composed: {}]", node_name);
                })
            };
            id_mapping.insert(node.id, new_task.id);
            pending_names.insert(new_task.id, node.name.clone());
        }

        // Pass 2: copy the names over in a single sweep of the target graph,
        // instead of re-locking and re-scanning it once per created node.
        if !pending_names.is_empty() {
            let target_graph_arc = target.get_graph();
            let mut target_graph = target_graph_arc
                .lock()
                .expect("failed to lock target graph");
            for new_node in target_graph.iter_mut() {
                if let Some(name) = pending_names.remove(&new_node.id) {
                    new_node.name = name;
                }
            }
            // The guard is released at the end of this scope, before any
            // `precede` call below touches the target graph.
        }

        // Pass 3: replay the source edges between the copied tasks.
        for node in source_guard.iter() {
            // Every source node was inserted into `id_mapping` in pass 1.
            let new_id = id_mapping[&node.id];
            let new_handle = TaskHandle::new(new_id, target.get_graph());
            for succ_id in &node.successors {
                if let Some(&new_succ_id) = id_mapping.get(succ_id) {
                    let succ_handle = TaskHandle::new(new_succ_id, target.get_graph());
                    new_handle.precede(&succ_handle);
                }
            }
        }

        id_mapping
    }

    /// The taskflow owned by this composition.
    pub fn taskflow(&self) -> &Taskflow {
        &self.taskflow
    }
}
/// Entry and exit handles of one copy of a composition inside a target taskflow.
pub struct ComposedInstance {
    entries: Vec<TaskHandle>,
    exits: Vec<TaskHandle>,
}

impl ComposedInstance {
    /// All entry handles of this instance.
    pub fn entries(&self) -> &[TaskHandle] {
        self.entries.as_slice()
    }

    /// All exit handles of this instance.
    pub fn exits(&self) -> &[TaskHandle] {
        self.exits.as_slice()
    }

    /// The entry handle at `index`, or `None` if `index` is out of range.
    pub fn entry(&self, index: usize) -> Option<&TaskHandle> {
        self.entries.iter().nth(index)
    }

    /// The exit handle at `index`, or `None` if `index` is out of range.
    pub fn exit(&self, index: usize) -> Option<&TaskHandle> {
        self.exits.iter().nth(index)
    }
}
pub trait TaskflowComposable {
fn compose(&mut self, other: &Composition) -> ComposedInstance;
fn compose_after(&mut self, predecessors: &[TaskHandle], other: &Composition) -> ComposedInstance;
fn compose_before(&mut self, other: &Composition, successors: &[TaskHandle]) -> ComposedInstance;
}
impl TaskflowComposable for Taskflow {
fn compose(&mut self, other: &Composition) -> ComposedInstance {
other.compose_into(self)
}
fn compose_after(&mut self, predecessors: &[TaskHandle], other: &Composition) -> ComposedInstance {
let instance = other.compose_into(self);
for pred in predecessors {
for entry in &instance.entries {
pred.precede(entry);
}
}
instance
}
fn compose_before(&mut self, other: &Composition, successors: &[TaskHandle]) -> ComposedInstance {
let instance = other.compose_into(self);
for exit in &instance.exits {
for succ in successors {
exit.precede(succ);
}
}
instance
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A task marked as both entry and exit shows up once in each list.
    #[test]
    fn test_composition_builder() {
        let mut builder = CompositionBuilder::new();
        let task = builder.taskflow_mut().emplace(|| {});
        builder.mark_entries(&[task.clone()]);
        builder.mark_exits(&[task]);
        let composition = builder.build();
        assert_eq!(composition.entries().len(), 1);
        assert_eq!(composition.exits().len(), 1);
    }

    /// The wrapped closure runs once per `execute` call.
    #[test]
    fn test_cloneable_work() {
        let counter = Arc::new(std::sync::Mutex::new(0));
        let c = counter.clone();
        let work = CloneableWork::new(move || {
            *c.lock().unwrap() += 1;
        });
        work.execute();
        work.execute();
        assert_eq!(*counter.lock().unwrap(), 2);
    }

    /// The factory receives the parameters passed to `instantiate`.
    #[test]
    fn test_parameterized_composition() {
        let param_comp = ParameterizedComposition::new(|params| {
            let mut builder = CompositionBuilder::new();
            let count = params.get_int("count").unwrap_or(3);
            let mut tasks = Vec::new();
            for i in 0..count {
                let task = builder.emplace_cloneable(move || {
                    println!("Task {}", i);
                });
                tasks.push(task);
            }
            if !tasks.is_empty() {
                builder.mark_entries(&[tasks[0].clone()]);
                builder.mark_exits(&[tasks.last().unwrap().clone()]);
            }
            builder.build()
        });
        let mut params = CompositionParams::new();
        params.set_int("count", 5);
        // Pass the parameters by reference; the previous text here was a
        // mis-encoded `&params` that did not compile.
        let composition = param_comp.instantiate(&params);
        assert!(!composition.entries().is_empty());
    }
}