use homestar_invocation::{
bail,
error::Error,
ipld::{DagCbor, DagJson},
Task, Unit,
};
use libipld::{serde::from_ipld, Ipld};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// Key of the entry that holds the task list in a workflow's [`Ipld`] map
/// representation; used both when building that map and when reading it back.
const TASKS_KEY: &str = "tasks";
/// A workflow: a list of [`Task`]s kept in the order they were supplied.
///
/// The `From`/`TryFrom` conversions in this file represent a workflow in IPLD
/// as a map with a single `tasks` entry holding a list of task nodes.
#[derive(Debug, Clone, JsonSchema, PartialEq, Serialize, Deserialize)]
#[schemars(title = "Workflow", description = "Workflow composed of tasks")]
pub struct Workflow<'a, T> {
// Tasks making up the workflow, in insertion order.
// NOTE(review): kept as a plain comment rather than a doc comment, since a
// doc comment here would presumably be picked up by the `JsonSchema` derive
// as the field description and change the generated schema; confirm before
// converting.
tasks: Vec<Task<'a, T>>,
}
impl<'a, T> Workflow<'a, T> {
pub fn new(tasks: Vec<Task<'a, T>>) -> Self {
Self { tasks }
}
pub fn tasks(self) -> Vec<Task<'a, T>> {
self.tasks
}
pub fn tasks_ref(&self) -> &Vec<Task<'a, T>> {
&self.tasks
}
pub fn len(&self) -> u32 {
self.tasks.len() as u32
}
pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
}
}
impl<'a, T> From<Workflow<'a, T>> for Ipld
where
    Ipld: From<Task<'a, T>>,
{
    /// Converts a workflow into an IPLD map with a single `tasks` entry whose
    /// value is the list of the workflow's tasks, each converted to [`Ipld`]
    /// in order.
    fn from(workflow: Workflow<'a, T>) -> Self {
        let task_nodes: Vec<Ipld> = workflow.tasks.into_iter().map(Ipld::from).collect();

        let mut fields = BTreeMap::new();
        fields.insert(TASKS_KEY.to_string(), Ipld::List(task_nodes));
        Ipld::Map(fields)
    }
}
impl<'a, T> TryFrom<Ipld> for Workflow<'a, T>
where
    T: From<Ipld>,
{
    type Error = Error<Unit>;

    /// Rebuilds a workflow from its IPLD map representation.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `ipld` cannot be decoded as a map with string keys,
    /// - the map has no `tasks` entry (`Error::MissingField`),
    /// - the `tasks` entry is not an IPLD list, or
    /// - any list element fails to convert into a [`Task`]; the first such
    ///   failure is returned.
    fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
        let mut map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

        // The decoded map is owned and dropped at the end of this function, so
        // take the entry out of it instead of borrowing it and deep-cloning
        // every task node before conversion.
        let entry = map
            .remove(TASKS_KEY)
            .ok_or_else(|| Error::<Unit>::MissingField(TASKS_KEY.to_string()))?;

        let tasks = if let Ipld::List(items) = entry {
            // Collecting into `Result` stops at the first failing element; the
            // trailing `?` converts that error into `Self::Error`.
            items
                .into_iter()
                .map(TryInto::try_into)
                .collect::<Result<Vec<_>, _>>()?
        } else {
            bail!(Error::not_an_ipld_list());
        };

        Ok(Self { tasks })
    }
}
/// Implements [`DagCbor`] for [`Workflow`] without overriding anything: the
/// impl body is empty, so only the trait's provided items apply.
impl<T> DagCbor for Workflow<'_, T>
where
    T: Clone,
    Ipld: From<T>,
{
}
/// Implements [`DagJson`] for [`Workflow`] without overriding anything: the
/// impl body is empty, so only the trait's provided items apply.
impl<T> DagJson for Workflow<'_, T>
where
    T: From<Ipld> + Clone,
    Ipld: From<T>,
{
}
#[cfg(test)]
mod test {
    use super::*;
    use homestar_invocation::{
        authority::UcanPrf,
        task::{instruction::RunInstruction, Resources},
        test_utils,
    };
    use std::assert_eq;

    /// The JSON bytes and JSON string encodings agree with each other, and
    /// decoding either one yields a workflow equal to the original.
    #[test]
    fn workflow_to_json_roundtrip() {
        let resources = Resources::default();
        let first = test_utils::instruction::<Unit>();
        let (second, _) = test_utils::wasm_instruction_with_nonce::<Unit>();

        let workflow = Workflow::new(vec![
            Task::new(
                RunInstruction::Expanded(first),
                resources.clone().into(),
                UcanPrf::default(),
            ),
            Task::new(
                RunInstruction::Expanded(second),
                resources.into(),
                UcanPrf::default(),
            ),
        ]);

        let encoded_bytes = workflow.to_json().unwrap();
        let encoded = workflow.to_json_string().unwrap();
        let reparsed = json::from(encoded.clone());

        assert_eq!(encoded, reparsed.to_string());
        assert_eq!(encoded_bytes, encoded.as_bytes());

        let from_bytes: Workflow<'_, Unit> = DagJson::from_json(encoded.as_bytes()).unwrap();
        assert_eq!(workflow, from_bytes);

        let from_string: Workflow<'_, Unit> = DagJson::from_json_string(encoded).unwrap();
        assert_eq!(workflow, from_string);
    }

    /// A workflow survives a CBOR round trip, and the JSON produced from the
    /// CBOR-decoded value matches the JSON produced from the original.
    #[test]
    fn workflow_to_cbor_to_json_roundtrip() {
        let resources = Resources::default();
        let first = test_utils::instruction::<Unit>();
        let (second, _) = test_utils::wasm_instruction_with_nonce::<Unit>();

        let workflow = Workflow::new(vec![
            Task::new(
                RunInstruction::Expanded(first),
                resources.clone().into(),
                UcanPrf::default(),
            ),
            Task::new(
                RunInstruction::Expanded(second),
                resources.into(),
                UcanPrf::default(),
            ),
        ]);

        let cbor = workflow.clone().to_cbor().unwrap();
        let decoded = Workflow::<Unit>::from_cbor(&cbor).unwrap();
        assert_eq!(workflow, decoded);

        let json_via_cbor = decoded.clone().to_dagjson_string().unwrap();
        let json_direct = workflow.to_json_string().unwrap();
        assert_eq!(json_via_cbor, json_direct);
    }

    /// Converting a workflow to `Ipld` and back yields an equal workflow.
    #[test]
    fn ipld_roundtrip_workflow() {
        let resources = Resources::default();
        let (first, second, _) = test_utils::related_wasm_instructions::<Unit>();

        let workflow = Workflow::new(vec![
            Task::new(
                RunInstruction::Expanded(first),
                resources.clone().into(),
                UcanPrf::default(),
            ),
            Task::new(
                RunInstruction::Expanded(second),
                resources.into(),
                UcanPrf::default(),
            ),
        ]);

        let as_ipld = Ipld::from(workflow.clone());
        let restored = as_ipld.try_into().unwrap();
        assert_eq!(workflow, restored);
    }

    /// Serializing with `serde_json` and deserializing the result yields an
    /// equal workflow.
    #[test]
    fn ser_de() {
        let resources = Resources::default();
        let first = test_utils::instruction::<Unit>();
        let (second, _) = test_utils::wasm_instruction_with_nonce::<Unit>();

        let workflow = Workflow::new(vec![
            Task::new(
                RunInstruction::Expanded(first),
                resources.clone().into(),
                UcanPrf::default(),
            ),
            Task::new(
                RunInstruction::Expanded(second),
                resources.into(),
                UcanPrf::default(),
            ),
        ]);

        let serialized = serde_json::to_string(&workflow).unwrap();
        let deserialized = serde_json::from_str(&serialized).unwrap();
        assert_eq!(workflow, deserialized);
    }
}