1use homestar_invocation::{
6 bail,
7 error::Error,
8 ipld::{DagCbor, DagJson},
9 Task, Unit,
10};
11use libipld::{serde::from_ipld, Ipld};
12use schemars::JsonSchema;
13use serde::{Deserialize, Serialize};
14use std::collections::BTreeMap;
15
/// Key under which a workflow's task list is stored in its IPLD map form.
const TASKS_KEY: &str = "tasks";
17
18#[derive(Debug, Clone, JsonSchema, PartialEq, Serialize, Deserialize)]
22#[schemars(title = "Workflow", description = "Workflow composed of tasks")]
23pub struct Workflow<'a, T> {
24 tasks: Vec<Task<'a, T>>,
25}
26
27impl<'a, T> Workflow<'a, T> {
28 pub fn new(tasks: Vec<Task<'a, T>>) -> Self {
30 Self { tasks }
31 }
32
33 pub fn tasks(self) -> Vec<Task<'a, T>> {
37 self.tasks
38 }
39
40 pub fn tasks_ref(&self) -> &Vec<Task<'a, T>> {
44 &self.tasks
45 }
46
47 pub fn len(&self) -> u32 {
51 self.tasks.len() as u32
52 }
53
54 pub fn is_empty(&self) -> bool {
58 self.tasks.is_empty()
59 }
60}
61
62impl<'a, T> From<Workflow<'a, T>> for Ipld
63where
64 Ipld: From<Task<'a, T>>,
65{
66 fn from(workflow: Workflow<'a, T>) -> Self {
67 Ipld::Map(BTreeMap::from([(
68 TASKS_KEY.into(),
69 Ipld::List(
70 workflow
71 .tasks
72 .into_iter()
73 .map(Ipld::from)
74 .collect::<Vec<Ipld>>(),
75 ),
76 )]))
77 }
78}
79
80impl<'a, T> TryFrom<Ipld> for Workflow<'a, T>
81where
82 T: From<Ipld>,
83{
84 type Error = Error<Unit>;
85
86 fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
87 let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;
88 let ipld = map
89 .get(TASKS_KEY)
90 .ok_or_else(|| Error::<Unit>::MissingField(TASKS_KEY.to_string()))?;
91
92 let tasks = if let Ipld::List(tasks) = ipld {
93 tasks.iter().try_fold(vec![], |mut acc, ipld| {
94 acc.push(ipld.to_owned().try_into()?);
95 Ok::<_, Self::Error>(acc)
96 })?
97 } else {
98 bail!(Error::not_an_ipld_list());
99 };
100
101 Ok(Self { tasks })
102 }
103}
104
105impl<'a, T> DagCbor for Workflow<'a, T>
106where
107 T: Clone,
108 Ipld: From<T>,
109{
110}
111
112impl<'a, T> DagJson for Workflow<'a, T>
113where
114 T: From<Ipld> + Clone,
115 Ipld: From<T>,
116{
117}
118
#[cfg(test)]
mod test {
    use super::*;
    use homestar_invocation::{
        authority::UcanPrf,
        task::{instruction::RunInstruction, Resources},
        test_utils,
    };

    /// Two-task workflow shared by the serialization tests: one task wraps
    /// `test_utils::instruction`, the other `test_utils::wasm_instruction_with_nonce`,
    /// both with default resources and default UCAN proofs.
    fn sample_workflow() -> Workflow<'static, Unit> {
        let config = Resources::default();
        let instruction1 = test_utils::instruction::<Unit>();
        let (instruction2, _) = test_utils::wasm_instruction_with_nonce::<Unit>();

        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        Workflow::new(vec![task1, task2])
    }

    /// JSON bytes and JSON string agree, and both decode back to the workflow.
    #[test]
    fn workflow_to_json_roundtrip() {
        let workflow = sample_workflow();

        let json_bytes = workflow.to_json().unwrap();
        let json_string = workflow.to_json_string().unwrap();
        let json_val = json::from(json_string.clone());

        assert_eq!(json_string, json_val.to_string());
        assert_eq!(json_bytes, json_string.as_bytes());

        let wf_from_json1: Workflow<'_, Unit> =
            DagJson::from_json(json_string.as_bytes()).unwrap();
        assert_eq!(workflow, wf_from_json1);

        let wf_from_json2: Workflow<'_, Unit> = DagJson::from_json_string(json_string).unwrap();
        assert_eq!(workflow, wf_from_json2);
    }

    /// A workflow decoded from CBOR serializes to the same JSON as the original.
    #[test]
    fn workflow_to_cbor_to_json_roundtrip() {
        let workflow = sample_workflow();

        let cbor_bytes = workflow.clone().to_cbor().unwrap();
        let workflow_from_cbor = Workflow::<Unit>::from_cbor(&cbor_bytes).unwrap();
        assert_eq!(workflow, workflow_from_cbor);

        let json_from_cbor_string = workflow_from_cbor.clone().to_dagjson_string().unwrap();
        let json_string = workflow.to_json_string().unwrap();

        assert_eq!(json_from_cbor_string, json_string);
    }

    /// Workflow -> Ipld -> Workflow is lossless.
    #[test]
    fn ipld_roundtrip_workflow() {
        // This test builds its own fixture because it uses the *related*
        // wasm instructions rather than the shared sample pair.
        let config = Resources::default();
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Unit>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let workflow = Workflow::new(vec![task1, task2]);
        let ipld = Ipld::from(workflow.clone());
        let ipld_to_workflow: Workflow<'_, Unit> = ipld.try_into().unwrap();
        assert_eq!(workflow, ipld_to_workflow);
    }

    /// The serde derives round-trip through `serde_json`.
    #[test]
    fn ser_de() {
        let workflow = sample_workflow();

        let ser = serde_json::to_string(&workflow).unwrap();
        let de: Workflow<'_, Unit> = serde_json::from_str(&ser).unwrap();

        assert_eq!(workflow, de);
    }
}