homestar_workflow/
workflow.rs

//! Workflow and [Ucan invocation] components for building Homestar pipelines.
2//!
3//! [Ucan invocation]: <https://github.com/ucan-wg/invocation>
4
5use homestar_invocation::{
6    bail,
7    error::Error,
8    ipld::{DagCbor, DagJson},
9    Task, Unit,
10};
11use libipld::{serde::from_ipld, Ipld};
12use schemars::JsonSchema;
13use serde::{Deserialize, Serialize};
14use std::collections::BTreeMap;
15
/// Map key under which a [Workflow]'s task list is stored in its IPLD
/// representation; used both when encoding to and decoding from [Ipld].
const TASKS_KEY: &str = "tasks";
17
/// Workflow composed of [tasks].
///
/// The lifetime `'a` and the type parameter `T` are forwarded unchanged to
/// every contained [Task]. In IPLD form a workflow is a map with a single
/// `tasks` entry holding the list of tasks.
///
/// [tasks]: Task
#[derive(Debug, Clone, JsonSchema, PartialEq, Serialize, Deserialize)]
#[schemars(title = "Workflow", description = "Workflow composed of tasks")]
pub struct Workflow<'a, T> {
    // Tasks in the order they were handed to `Workflow::new` or decoded from
    // the `tasks` list.
    // NOTE(review): deliberately a plain `//` comment. A `///` doc comment on
    // this field would presumably be picked up by the `JsonSchema` derive as
    // the field description and change the generated schema; confirm before
    // converting.
    tasks: Vec<Task<'a, T>>,
}
26
27impl<'a, T> Workflow<'a, T> {
28    /// Create a new [Workflow] given a set of tasks.
29    pub fn new(tasks: Vec<Task<'a, T>>) -> Self {
30        Self { tasks }
31    }
32
33    /// Return a [Workflow]'s [tasks] vector.
34    ///
35    /// [tasks]: Task
36    pub fn tasks(self) -> Vec<Task<'a, T>> {
37        self.tasks
38    }
39
40    /// Return a reference to [Workflow]'s [tasks] vector.
41    ///
42    /// [tasks]: Task
43    pub fn tasks_ref(&self) -> &Vec<Task<'a, T>> {
44        &self.tasks
45    }
46
47    /// Length of workflow given a series of [tasks].
48    ///
49    /// [tasks]: Task
50    pub fn len(&self) -> u32 {
51        self.tasks.len() as u32
52    }
53
54    /// Whether [Workflow] contains [tasks] or not.
55    ///
56    /// [tasks]: Task
57    pub fn is_empty(&self) -> bool {
58        self.tasks.is_empty()
59    }
60}
61
62impl<'a, T> From<Workflow<'a, T>> for Ipld
63where
64    Ipld: From<Task<'a, T>>,
65{
66    fn from(workflow: Workflow<'a, T>) -> Self {
67        Ipld::Map(BTreeMap::from([(
68            TASKS_KEY.into(),
69            Ipld::List(
70                workflow
71                    .tasks
72                    .into_iter()
73                    .map(Ipld::from)
74                    .collect::<Vec<Ipld>>(),
75            ),
76        )]))
77    }
78}
79
80impl<'a, T> TryFrom<Ipld> for Workflow<'a, T>
81where
82    T: From<Ipld>,
83{
84    type Error = Error<Unit>;
85
86    fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
87        let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;
88        let ipld = map
89            .get(TASKS_KEY)
90            .ok_or_else(|| Error::<Unit>::MissingField(TASKS_KEY.to_string()))?;
91
92        let tasks = if let Ipld::List(tasks) = ipld {
93            tasks.iter().try_fold(vec![], |mut acc, ipld| {
94                acc.push(ipld.to_owned().try_into()?);
95                Ok::<_, Self::Error>(acc)
96            })?
97        } else {
98            bail!(Error::not_an_ipld_list());
99        };
100
101        Ok(Self { tasks })
102    }
103}
104
/// Opt [Workflow] into [DagCbor].
///
/// The impl body is empty, so every method comes from the trait's provided
/// implementations. Available whenever `T` is `Clone` and convertible into
/// [Ipld].
impl<'a, T> DagCbor for Workflow<'a, T>
where
    T: Clone,
    Ipld: From<T>,
{
}
111
/// Opt [Workflow] into [DagJson].
///
/// The impl body is empty, so every method comes from the trait's provided
/// implementations. Available whenever `T` is `Clone` and convertible both
/// from and into [Ipld].
impl<'a, T> DagJson for Workflow<'a, T>
where
    T: From<Ipld> + Clone,
    Ipld: From<T>,
{
}
118
#[cfg(test)]
mod test {
    use super::*;
    use homestar_invocation::{
        authority::UcanPrf,
        task::{instruction::RunInstruction, Resources},
        test_utils,
    };

    /// Build a two-task [Workflow] from the given run instructions, giving
    /// both tasks default [Resources] and a default [UcanPrf].
    fn workflow_of<'a>(
        first: RunInstruction<'a, Unit>,
        second: RunInstruction<'a, Unit>,
    ) -> Workflow<'a, Unit> {
        let config = Resources::default();
        let task1 = Task::new(first, config.clone().into(), UcanPrf::default());
        let task2 = Task::new(second, config.into(), UcanPrf::default());

        Workflow::new(vec![task1, task2])
    }

    /// Fixture shared by the serialization tests: one plain test instruction
    /// followed by one wasm instruction carrying a nonce.
    fn sample_workflow<'a>() -> Workflow<'a, Unit> {
        let instruction1 = test_utils::instruction::<Unit>();
        let (instruction2, _) = test_utils::wasm_instruction_with_nonce::<Unit>();

        workflow_of(
            RunInstruction::Expanded(instruction1),
            RunInstruction::Expanded(instruction2),
        )
    }

    /// JSON bytes and JSON string agree, and both decode back to the same
    /// workflow.
    #[test]
    fn workflow_to_json_roundtrip() {
        let workflow = sample_workflow();

        let json_bytes = workflow.to_json().unwrap();
        let json_string = workflow.to_json_string().unwrap();
        let json_val = json::from(json_string.clone());

        assert_eq!(json_string, json_val.to_string());
        assert_eq!(json_bytes, json_string.as_bytes());
        let wf_from_json1: Workflow<'_, Unit> = DagJson::from_json(json_string.as_bytes()).unwrap();
        assert_eq!(workflow, wf_from_json1);
        let wf_from_json2: Workflow<'_, Unit> = DagJson::from_json_string(json_string).unwrap();
        assert_eq!(workflow, wf_from_json2);
    }

    /// A workflow survives a CBOR roundtrip, and the decoded value renders
    /// the same JSON string as the original.
    #[test]
    fn workflow_to_cbor_to_json_roundtrip() {
        let workflow = sample_workflow();

        // `workflow` is compared again below, so hand a copy to `to_cbor`.
        let cbor_bytes = workflow.clone().to_cbor().unwrap();
        let workflow_from_cbor = Workflow::<Unit>::from_cbor(&cbor_bytes).unwrap();
        assert_eq!(workflow, workflow_from_cbor);

        let json_from_cbor_string = workflow_from_cbor.to_dagjson_string().unwrap();
        let json_string = workflow.to_json_string().unwrap();

        assert_eq!(json_from_cbor_string, json_string);
    }

    /// Converting to [Ipld] and back yields an equal workflow.
    #[test]
    fn ipld_roundtrip_workflow() {
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Unit>();
        let workflow = workflow_of(
            RunInstruction::Expanded(instruction1),
            RunInstruction::Expanded(instruction2),
        );

        let ipld = Ipld::from(workflow.clone());
        let ipld_to_workflow: Workflow<'_, Unit> = ipld.try_into().unwrap();
        assert_eq!(workflow, ipld_to_workflow);
    }

    /// The derived serde impls roundtrip through `serde_json`.
    #[test]
    fn ser_de() {
        let workflow = sample_workflow();

        let ser = serde_json::to_string(&workflow).unwrap();
        let de = serde_json::from_str(&ser).unwrap();

        assert_eq!(workflow, de);
    }
}