homestar_runtime/
workflow.rs

1//! A [Workflow] is a declarative configuration of a series of
2//! [UCAN Invocation] `Tasks`.
3//!
4//! [UCAN Invocation]: <https://github.com/ucan-wg/invocation>
5
6use crate::scheduler::ExecutionGraph;
7use anyhow::{anyhow, bail};
8use core::fmt;
9use dagga::{dot::DagLegend, Node};
10use diesel::{
11    backend::Backend,
12    deserialize::{self, FromSql},
13    serialize::{self, IsNull, Output, ToSql},
14    sql_types::Binary,
15    sqlite::Sqlite,
16    AsExpression, FromSqlRow,
17};
18use homestar_invocation::{
19    task::{
20        instruction::{Parse, Parsed, RunInstruction},
21        Instruction,
22    },
23    Invocation, Pointer,
24};
25use homestar_wasm::io::Arg;
26use homestar_workflow::Workflow;
27use indexmap::IndexMap;
28use itertools::Itertools;
29use libipld::{cbor::DagCborCodec, cid::Cid, prelude::Codec, serde::from_ipld, Ipld};
30use serde::{Deserialize, Serialize};
31use std::{collections::BTreeMap, path::Path};
32use tracing::debug;
33use url::Url;
34
35pub(crate) mod error;
36mod info;
37pub mod settings;
38
39pub(crate) use error::Error;
40pub(crate) use info::{Info, Stored, StoredReceipt};
41pub use info::{Status, StatusMapping, WORKFLOW_TAG};
42#[allow(unused_imports)]
43pub use settings::Settings;
44
/// Runtime-local alias for a [dagga::Dag] whose nodes carry a [Vertex] and
/// whose results/reads are identified by `usize` indices.
type Dag<'a> = dagga::Dag<Vertex<'a>, usize>;
46
/// A [Workflow] [Builder] wrapper for the runtime.
///
/// Newtype around a [Workflow] whose task arguments are of type [Arg]. The
/// wrapped [Workflow] is the builder's only state.
#[derive(Debug, Clone, PartialEq)]
pub struct Builder<'a>(Workflow<'a, Arg>);
50
/// A resource being accessed, referred to either by [URI] (held as a [Url])
/// or by [Cid].
///
/// [URI]: <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[allow(dead_code)]
pub(crate) enum Resource {
    /// Resource fetched by [Url].
    Url(Url),
    /// Resource fetched by [Cid].
    Cid(Cid),
}
63
64impl fmt::Display for Resource {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        match *self {
67            Resource::Cid(cid) => write!(f, "{}", cid),
68            Resource::Url(ref url) => write!(f, "{}", url),
69        }
70    }
71}
72
/// Ahead-of-time (AOT) context object, which includes the given
/// [Workflow] as an executable [Dag] (directed acyclic graph) and
/// the [Task] resources retrieved through IPFS Client or the DHT directly
/// ahead-of-time.
///
/// [Dag]: dagga::Dag
/// [Task]: homestar_invocation::Task
#[derive(Debug, Clone)]
pub(crate) struct AOTContext<'a> {
    /// Graph of workflow tasks, one node per task.
    dag: Dag<'a>,
    /// Promised [Cid]s awaited by tasks, split by whether they resolve
    /// inside or outside of the workflow.
    awaiting: Promises,
    /// Resources collected per instruction [Cid].
    indexed_resources: IndexedResources,
}
86
87impl AOTContext<'static> {
88    /// Convert [Dag] to a [dot] file, to be read by graphviz, etc.
89    ///
90    /// [Dag]: dagga::Dag
91    /// [dot]: <https://graphviz.org/doc/info/lang.html>
92    #[allow(dead_code)]
93    pub(crate) fn dot(&self, name: &str, path: &Path) -> anyhow::Result<()> {
94        DagLegend::new(self.dag.nodes())
95            .with_name(name)
96            .save_to(
97                path.to_str()
98                    .ok_or_else(|| anyhow!("path is not correctly formatted"))?,
99            )
100            .map_err(|e| anyhow!(e))
101    }
102}
103
/// Vertex information for [Dag] [Node].
///
/// [Dag]: dagga::Dag
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Vertex<'a> {
    /// The task's expanded instruction.
    pub(crate) instruction: Instruction<'a, Arg>,
    /// Result of parsing the instruction's input.
    pub(crate) parsed: Parsed<Arg>,
    /// Pointer derived from the task's invocation.
    pub(crate) invocation: Pointer,
}
113
/// [Origin] of an awaited [Cid]: whether the instruction/task it refers to is
/// in or not in the [Workflow] itself.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Origin {
    /// [Cid] awaits an instruction/task in the [Workflow].
    InFlow,
    /// [Cid] awaits an instruction/task outside of the [Workflow].
    OutFlow,
}
122
/// [Workflow] promises being awaited on.
#[derive(Debug, Clone, PartialEq, Default)]
pub(crate) struct Promises {
    /// Awaited [Cid]s that refer to an instruction/task in the [Workflow].
    pub(crate) in_flow: Vec<Cid>,
    /// Awaited [Cid]s that refer to an instruction/task outside of the
    /// [Workflow].
    pub(crate) out_flow: Vec<Cid>,
}
129
130impl Promises {
131    /// Create a new [Promises] object from a given pair of
132    /// in-flow and out-flow [Cid]s.
133    pub(crate) fn new(in_flow: Vec<Cid>, out_flow: Vec<Cid>) -> Promises {
134        Promises { in_flow, out_flow }
135    }
136
137    /// Return an iterator over the [Promises] in-flow and out-flow [Cid]s.
138    pub(crate) fn iter(&self) -> impl Iterator<Item = (Origin, &Cid)> {
139        let in_iter = self.in_flow.iter().map(|cid| (Origin::InFlow, cid));
140        let out_iter = self.out_flow.iter().map(|cid| (Origin::OutFlow, cid));
141        in_iter.chain(out_iter)
142    }
143}
144
145impl<'a> Vertex<'a> {
146    fn new(
147        instruction: Instruction<'a, Arg>,
148        parsed: Parsed<Arg>,
149        invocation: Pointer,
150    ) -> Vertex<'a> {
151        Vertex {
152            instruction,
153            parsed,
154            invocation,
155        }
156    }
157}
158
159impl<'a> Builder<'a> {
160    /// Create a new [Workflow] [Builder] given a [Workflow].
161    pub fn new(workflow: Workflow<'a, Arg>) -> Builder<'a> {
162        Builder(workflow)
163    }
164
165    /// Return an owned [Workflow] from the [Builder].
166    pub fn into_inner(self) -> Workflow<'a, Arg> {
167        self.0
168    }
169
170    /// Return a referenced [Workflow] from the [Builder].
171    pub fn inner(&self) -> &Workflow<'a, Arg> {
172        &self.0
173    }
174
175    /// Convert the [Workflow] into an batch-separated [ExecutionGraph].
176    pub(crate) fn graph(self) -> Result<ExecutionGraph<'a>, Error> {
177        let aot = self.aot()?;
178        if let Err(_e) = aot.dag.detect_duplicates() {
179            homestar_invocation::bail!(Error::DuplicateTask)
180        }
181
182        match aot.dag.build_schedule() {
183            Ok(schedule) => Ok(ExecutionGraph {
184                schedule: schedule.batches,
185                awaiting: aot.awaiting,
186                indexed_resources: aot.indexed_resources,
187            }),
188            Err(e) => homestar_invocation::bail!(Error::InvalidSchedule(e.to_string())),
189        }
190    }
191
192    fn aot(self) -> anyhow::Result<AOTContext<'a>> {
193        let lookup_table = self.lookup_table()?;
194        let (mut dag, unawaits, awaited, promised_cids, resources) =
195            self.into_inner().tasks().into_iter().enumerate().try_fold(
196                (
197                    Dag::default(),
198                    vec![],
199                    vec![],
200                    (vec![], vec![]),
201                    IndexMap::new(),
202                ),
203                |(
204                    mut dag,
205                    mut unawaits,
206                    mut awaited,
207                    (mut in_flows, mut out_flows),
208                    mut resources,
209                ),
210                 (i, task)| {
211                    let instr_cid = task.instruction_cid()?;
212                    debug!(
213                        subject = "task.instruction",
214                        category = "aot.information",
215                        "instruction cid of task: {}",
216                        instr_cid
217                    );
218
219                    // Clone as we're owning the struct going backward.
220                    let ptr: Pointer = Invocation::<Arg>::from(task.clone()).try_into()?;
221
222                    let RunInstruction::Expanded(instr) = task.into_instruction() else {
223                        bail!("workflow tasks/instructions must be expanded / inlined")
224                    };
225
226                    resources
227                        .entry(instr_cid)
228                        .or_insert_with(|| vec![Resource::Url(instr.resource().to_owned())]);
229                    let parsed = instr.input().parse()?;
230                    let deferred = parsed.args().deferreds();
231                    let reads = deferred.fold(vec![], |mut in_flow_reads, cid| {
232                        if let Some(v) = lookup_table.get(&cid) {
233                            in_flows.push(cid);
234                            in_flow_reads.push(*v)
235                        } else {
236                            out_flows.push(cid);
237                        }
238                        // TODO: else, it's a Promise from another task outside
239                        // of the workflow.
240                        in_flow_reads
241                    });
242
243                    parsed.args().links().for_each(|cid| {
244                        resources
245                            .entry(instr_cid)
246                            .and_modify(|prev_rscs| {
247                                prev_rscs.push(Resource::Cid(cid.to_owned()));
248                            })
249                            .or_insert_with(|| vec![Resource::Cid(cid.to_owned())]);
250                    });
251
252                    let node = Node::new(Vertex::new(instr.to_owned(), parsed, ptr))
253                        .with_name(instr_cid.to_string())
254                        .with_result(i);
255
256                    if !reads.is_empty() {
257                        dag.add_node(node.with_reads(reads.clone()));
258                        awaited.extend(reads);
259                    } else {
260                        unawaits.push(node);
261                    }
262
263                    Ok::<_, anyhow::Error>((
264                        dag,
265                        unawaits,
266                        awaited,
267                        (in_flows, out_flows),
268                        resources,
269                    ))
270                },
271            )?;
272
273        for mut node in unawaits.clone().into_iter() {
274            if node.get_results().any(|r| awaited.contains(r)) {
275                dag.add_node(node);
276            } else {
277                // set barrier for non-awaited nodes
278                node.set_barrier(1);
279                dag.add_node(node);
280            }
281        }
282
283        Ok(AOTContext {
284            dag,
285            awaiting: Promises::new(promised_cids.0, promised_cids.1),
286            indexed_resources: IndexedResources(resources),
287        })
288    }
289
290    /// Generate an [IndexMap] lookup table of task instruction CIDs to a
291    /// unique enumeration.
292    fn lookup_table(&self) -> anyhow::Result<IndexMap<Cid, usize>> {
293        self.inner()
294            .tasks_ref()
295            .iter()
296            .enumerate()
297            .try_fold(IndexMap::new(), |mut acc, (i, t)| {
298                acc.insert(t.instruction_cid()?, i);
299                Ok::<_, anyhow::Error>(acc)
300            })
301    }
302}
303
/// A container for an [IndexMap] from [Cid] to the list of [Resource]s
/// recorded under that [Cid].
///
/// Stored in the database as a binary column.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize, AsExpression, FromSqlRow)]
#[diesel(sql_type = Binary)]
pub struct IndexedResources(IndexMap<Cid, Vec<Resource>>);
308
309impl IndexedResources {
310    /// Create a new [IndexedResources] container from an [IndexMap] of
311    /// [Resource]s.
312    #[allow(dead_code)]
313    pub(crate) fn new(map: IndexMap<Cid, Vec<Resource>>) -> IndexedResources {
314        IndexedResources(map)
315    }
316
317    /// Reutrn a referenced [IndexMap] of [Resource]s.
318    #[allow(dead_code)]
319    pub(crate) fn inner(&self) -> &IndexMap<Cid, Vec<Resource>> {
320        &self.0
321    }
322
323    /// Return an owned [IndexMap] of [Resource]s.
324    #[allow(dead_code)]
325    pub(crate) fn into_inner(self) -> IndexMap<Cid, Vec<Resource>> {
326        self.0
327    }
328
329    /// Get length of [IndexedResources].
330    #[allow(dead_code)]
331    pub(crate) fn len(&self) -> usize {
332        self.0.len()
333    }
334
335    /// Check if [IndexedResources] is empty.
336    #[allow(dead_code)]
337    pub(crate) fn is_empty(&self) -> bool {
338        self.0.is_empty()
339    }
340
341    /// Get a [Resource] by [Instruction] Cid.
342    ///
343    /// [Instruction]: homestar_invocation::task::Instruction
344    #[allow(dead_code)]
345    pub(crate) fn get(&self, cid: &Cid) -> Option<&Vec<Resource>> {
346        self.0.get(cid)
347    }
348
349    /// Iterate over all [Resource]s as references.
350    #[allow(dead_code)]
351    pub(crate) fn iter(&self) -> impl Iterator<Item = &Resource> {
352        self.0.values().flatten().unique()
353    }
354
355    /// Iterate over all [Resource]s.
356    #[allow(dead_code)]
357    pub(crate) fn into_iter(self) -> impl Iterator<Item = Resource> {
358        self.0.into_values().flatten().unique()
359    }
360}
361
362impl From<IndexedResources> for Ipld {
363    fn from(resources: IndexedResources) -> Self {
364        let btreemap: BTreeMap<String, Ipld> = resources
365            .0
366            .into_iter()
367            .map(|(k, v)| {
368                (
369                    k.to_string(),
370                    Ipld::List(
371                        v.into_iter()
372                            .map(|v| match v {
373                                Resource::Url(url) => Ipld::String(url.to_string()),
374                                Resource::Cid(cid) => Ipld::Link(cid),
375                            })
376                            .collect(),
377                    ),
378                )
379            })
380            .collect();
381        Ipld::Map(btreemap)
382    }
383}
384
385impl TryFrom<Ipld> for IndexedResources {
386    type Error = anyhow::Error;
387
388    fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
389        let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?
390            .into_iter()
391            .map(|(k, v)| {
392                let cid = Cid::try_from(k)?;
393                let list = from_ipld::<Vec<Ipld>>(v)?;
394                let rscs = list
395                    .into_iter()
396                    .map(|v| {
397                        Ok(match v {
398                            Ipld::String(url) => Resource::Url(Url::parse(&url)?),
399                            Ipld::Link(cid) => Resource::Cid(cid),
400                            _ => bail!("invalid resource type"),
401                        })
402                    })
403                    .collect::<Result<Vec<Resource>, anyhow::Error>>()?;
404
405                Ok((cid, rscs))
406            })
407            .collect::<Result<IndexMap<Cid, Vec<Resource>>, anyhow::Error>>()?;
408
409        Ok(IndexedResources(map))
410    }
411}
412
413impl TryFrom<IndexedResources> for Vec<u8> {
414    type Error = anyhow::Error;
415
416    fn try_from(resources: IndexedResources) -> Result<Self, Self::Error> {
417        let ipld = Ipld::from(resources);
418        DagCborCodec.encode(&ipld)
419    }
420}
421
422impl ToSql<Binary, Sqlite> for IndexedResources
423where
424    [u8]: ToSql<Binary, Sqlite>,
425{
426    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
427        let bytes: Vec<u8> = self.to_owned().try_into()?;
428        out.set_value(bytes);
429        Ok(IsNull::No)
430    }
431}
432
433impl<DB> FromSql<Binary, DB> for IndexedResources
434where
435    DB: Backend,
436    *const [u8]: FromSql<Binary, DB>,
437{
438    fn from_sql(bytes: DB::RawValue<'_>) -> deserialize::Result<Self> {
439        let raw_bytes = <*const [u8] as FromSql<Binary, DB>>::from_sql(bytes)?;
440        let raw_bytes: &[u8] = unsafe { &*raw_bytes };
441        let ipld: Ipld = DagCborCodec.decode(raw_bytes)?;
442        let decoded: IndexedResources = ipld.try_into()?;
443        Ok(decoded)
444    }
445}
446
#[cfg(test)]
mod test {
    use super::*;
    use homestar_invocation::{
        authority::UcanPrf,
        ipld::DagCbor,
        pointer::{Await, AwaitResult},
        task::{
            instruction::{Ability, Input},
            Resources,
        },
        test_utils, Task, Unit,
    };

    /// [IndexedResources] survive a round trip through [Ipld].
    #[test]
    fn ipld_roundtrip_indexed_resources() {
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Unit>();

        let mut index_map = IndexMap::new();
        index_map.insert(
            instruction1.clone().to_cid().unwrap(),
            vec![Resource::Url(instruction1.resource().to_owned())],
        );
        index_map.insert(
            instruction2.clone().to_cid().unwrap(),
            vec![Resource::Url(instruction2.resource().to_owned())],
        );
        let indexed_resources = IndexedResources::new(index_map);

        let ipld = Ipld::from(indexed_resources.clone());
        let ipld_to_indexed_resources = ipld.try_into().unwrap();
        assert_eq!(indexed_resources, ipld_to_indexed_resources);
    }

    /// The AOT graph can be saved as a dot file.
    #[test]
    fn dag_to_dot() {
        let config = Resources::default();
        let instruction1 = test_utils::wasm_instruction::<Arg>();
        let (instruction2, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let workflow = Workflow::new(vec![task1, task2]);
        let builder = Builder::new(workflow);
        let aot = builder.aot().unwrap();

        // Write into the temp directory, under a per-process name, so the
        // test does not leave artifacts in the working directory.
        let dot_path = std::env::temp_dir().join(format!(
            "homestar-workflow-dag-{}.dot",
            std::process::id()
        ));

        aot.dot("test", &dot_path).unwrap();
        let written = dot_path.exists();
        // Best-effort cleanup before asserting, so a failure does not leak
        // the file either.
        let _ = std::fs::remove_file(&dot_path);
        assert!(written);
    }

    /// Two unrelated tasks both end up as nodes of the graph.
    #[test]
    fn build_parallel_schedule() {
        let config = Resources::default();
        let instruction1 = test_utils::wasm_instruction::<Arg>();
        let (instruction2, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let tasks = vec![task1.clone(), task2.clone()];

        let workflow = Workflow::new(tasks);
        let builder = Builder::new(workflow);
        let dag = builder.aot().unwrap().dag;

        let instr1 = task1.instruction_cid().unwrap().to_string();
        let instr2 = task2.instruction_cid().unwrap().to_string();

        // Each task must be present; checking `any(a || b)` would also pass
        // with only one of them in the graph.
        assert!(dag.nodes().any(|node| node.name() == instr1));
        assert!(dag.nodes().any(|node| node.name() == instr2));
    }

    /// Two related tasks are scheduled in separate, ordered batches.
    #[test]
    fn build_seq_schedule() {
        let config = Resources::default();
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let workflow = Workflow::new(vec![task1.clone(), task2.clone()]);
        let builder = Builder::new(workflow);
        let dag = builder.aot().unwrap().dag;

        let instr1 = task1.instruction_cid().unwrap().to_string();
        let instr2 = task2.instruction_cid().unwrap().to_string();

        // separate
        dagga::assert_batches(&[&instr1, &instr2], dag);
    }

    /// A mix of dependent and independent tasks is split into the expected
    /// batches, irrespective of the order of nodes within a batch.
    #[test]
    fn build_mixed_graph() {
        let config = Resources::default();
        let (instruction1, instruction2, instruction3) =
            test_utils::related_wasm_instructions::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1.clone()),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task3 = Task::new(
            RunInstruction::Expanded(instruction3),
            config.clone().into(),
            UcanPrf::default(),
        );

        let (instruction4, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task4 = Task::new(
            RunInstruction::Expanded(instruction4),
            config.clone().into(),
            UcanPrf::default(),
        );

        let (instruction5, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task5 = Task::new(
            RunInstruction::Expanded(instruction5),
            config.clone().into(),
            UcanPrf::default(),
        );

        let promise1 = Await::new(
            Pointer::new(instruction1.clone().to_cid().unwrap()),
            AwaitResult::Ok,
        );

        let dep_instr = Instruction::new(
            instruction1.resource().to_owned(),
            Ability::from("wasm/run"),
            Input::<Arg>::Ipld(Ipld::Map(BTreeMap::from([
                ("func".into(), Ipld::String("add_two".to_string())),
                (
                    "args".into(),
                    Ipld::List(vec![Ipld::from(promise1.clone())]),
                ),
            ]))),
        );

        let task6 = Task::new(
            RunInstruction::Expanded(dep_instr),
            config.into(),
            UcanPrf::default(),
        );

        let tasks = vec![
            task6.clone(),
            task1.clone(),
            task2.clone(),
            task3.clone(),
            task4.clone(),
            task5.clone(),
        ];
        let workflow = Workflow::new(tasks);

        let instr1 = task1.instruction_cid().unwrap().to_string();
        let instr2 = task2.instruction_cid().unwrap().to_string();
        let instr3 = task3.instruction_cid().unwrap().to_string();
        let instr4 = task4.instruction_cid().unwrap().to_string();
        let instr5 = task5.instruction_cid().unwrap().to_string();
        let instr6 = task6.instruction_cid().unwrap().to_string();

        // Order inside a batch is not significant, so both the actual and
        // the expected batches are compared in sorted form.
        let sorted = |mut names: Vec<String>| {
            names.sort();
            names
        };

        let builder = Builder::new(workflow);
        let schedule = builder.graph().unwrap().schedule;
        let batches: Vec<Vec<String>> = schedule
            .into_iter()
            .map(|batch| {
                sorted(
                    batch
                        .iter()
                        .map(|node| node.name().to_string())
                        .collect(),
                )
            })
            .collect();

        assert_eq!(
            batches,
            vec![
                vec![instr1],
                sorted(vec![instr2, instr6]),
                vec![instr3],
                sorted(vec![instr4, instr5]),
            ]
        );
    }
}