1use crate::scheduler::ExecutionGraph;
7use anyhow::{anyhow, bail};
8use core::fmt;
9use dagga::{dot::DagLegend, Node};
10use diesel::{
11 backend::Backend,
12 deserialize::{self, FromSql},
13 serialize::{self, IsNull, Output, ToSql},
14 sql_types::Binary,
15 sqlite::Sqlite,
16 AsExpression, FromSqlRow,
17};
18use homestar_invocation::{
19 task::{
20 instruction::{Parse, Parsed, RunInstruction},
21 Instruction,
22 },
23 Invocation, Pointer,
24};
25use homestar_wasm::io::Arg;
26use homestar_workflow::Workflow;
27use indexmap::IndexMap;
28use itertools::Itertools;
29use libipld::{cbor::DagCborCodec, cid::Cid, prelude::Codec, serde::from_ipld, Ipld};
30use serde::{Deserialize, Serialize};
31use std::{collections::BTreeMap, path::Path};
32use tracing::debug;
33use url::Url;
34
35pub(crate) mod error;
36mod info;
37pub mod settings;
38
39pub(crate) use error::Error;
40pub(crate) use info::{Info, Stored, StoredReceipt};
41pub use info::{Status, StatusMapping, WORKFLOW_TAG};
42#[allow(unused_imports)]
43pub use settings::Settings;
44
/// Directed acyclic graph of workflow [`Vertex`] nodes.
///
/// The edge type is `usize`: each node's result, and the reads it depends on,
/// are task indices (positions of tasks within the workflow).
type Dag<'a> = dagga::Dag<Vertex<'a>, usize>;
46
/// Wraps a [`Workflow`] of tasks (with [`Arg`] inputs) and turns it into a
/// task DAG and, from that, an execution schedule.
#[derive(Debug, Clone, PartialEq)]
pub struct Builder<'a>(Workflow<'a, Arg>);
50
/// A resource referenced by a workflow instruction.
///
/// During the ahead-of-time pass, an instruction's own `resource()` is
/// recorded as [`Resource::Url`], and every CID linked from its arguments is
/// recorded as [`Resource::Cid`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[allow(dead_code)]
pub(crate) enum Resource {
    /// A resource addressed by URL.
    Url(Url),
    /// A resource addressed by content identifier.
    Cid(Cid),
}
63
64impl fmt::Display for Resource {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 match *self {
67 Resource::Cid(cid) => write!(f, "{}", cid),
68 Resource::Url(ref url) => write!(f, "{}", url),
69 }
70 }
71}
72
/// Ahead-of-time context produced by `Builder::aot`.
///
/// It holds the task DAG together with the promises and resources gathered
/// while building it.
#[derive(Debug, Clone)]
pub(crate) struct AOTContext<'a> {
    /// DAG whose nodes are the workflow's task vertices.
    dag: Dag<'a>,
    /// Awaited CIDs, split into in-flow and out-flow promises.
    awaiting: Promises,
    /// Resources referenced by each instruction, keyed by instruction CID.
    indexed_resources: IndexedResources,
}
86
87impl AOTContext<'static> {
88 #[allow(dead_code)]
93 pub(crate) fn dot(&self, name: &str, path: &Path) -> anyhow::Result<()> {
94 DagLegend::new(self.dag.nodes())
95 .with_name(name)
96 .save_to(
97 path.to_str()
98 .ok_or_else(|| anyhow!("path is not correctly formatted"))?,
99 )
100 .map_err(|e| anyhow!(e))
101 }
102}
103
/// Payload of a DAG node: one task's expanded instruction, its parsed input
/// and a pointer to the invocation built from the task.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Vertex<'a> {
    /// The expanded (inlined) instruction to run.
    pub(crate) instruction: Instruction<'a, Arg>,
    /// The instruction's input, parsed into arguments.
    pub(crate) parsed: Parsed<Arg>,
    /// Pointer to the invocation derived from the task.
    pub(crate) invocation: Pointer,
}
113
/// Where an awaited (deferred) CID resolves, relative to the current workflow.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Origin {
    /// The awaited CID is the instruction CID of a task in this workflow.
    InFlow,
    /// The awaited CID matches no task in this workflow.
    OutFlow,
}
122
/// Awaited (deferred) CIDs, split by [`Origin`].
///
/// `Builder::aot` fills `in_flow` with awaited CIDs that match a task in the
/// workflow and `out_flow` with those that do not.
#[derive(Debug, Clone, PartialEq, Default)]
pub(crate) struct Promises {
    /// Awaited CIDs resolved by tasks in this workflow.
    pub(crate) in_flow: Vec<Cid>,
    /// Awaited CIDs with no matching task in this workflow.
    pub(crate) out_flow: Vec<Cid>,
}
129
130impl Promises {
131 pub(crate) fn new(in_flow: Vec<Cid>, out_flow: Vec<Cid>) -> Promises {
134 Promises { in_flow, out_flow }
135 }
136
137 pub(crate) fn iter(&self) -> impl Iterator<Item = (Origin, &Cid)> {
139 let in_iter = self.in_flow.iter().map(|cid| (Origin::InFlow, cid));
140 let out_iter = self.out_flow.iter().map(|cid| (Origin::OutFlow, cid));
141 in_iter.chain(out_iter)
142 }
143}
144
145impl<'a> Vertex<'a> {
146 fn new(
147 instruction: Instruction<'a, Arg>,
148 parsed: Parsed<Arg>,
149 invocation: Pointer,
150 ) -> Vertex<'a> {
151 Vertex {
152 instruction,
153 parsed,
154 invocation,
155 }
156 }
157}
158
159impl<'a> Builder<'a> {
160 pub fn new(workflow: Workflow<'a, Arg>) -> Builder<'a> {
162 Builder(workflow)
163 }
164
165 pub fn into_inner(self) -> Workflow<'a, Arg> {
167 self.0
168 }
169
170 pub fn inner(&self) -> &Workflow<'a, Arg> {
172 &self.0
173 }
174
175 pub(crate) fn graph(self) -> Result<ExecutionGraph<'a>, Error> {
177 let aot = self.aot()?;
178 if let Err(_e) = aot.dag.detect_duplicates() {
179 homestar_invocation::bail!(Error::DuplicateTask)
180 }
181
182 match aot.dag.build_schedule() {
183 Ok(schedule) => Ok(ExecutionGraph {
184 schedule: schedule.batches,
185 awaiting: aot.awaiting,
186 indexed_resources: aot.indexed_resources,
187 }),
188 Err(e) => homestar_invocation::bail!(Error::InvalidSchedule(e.to_string())),
189 }
190 }
191
192 fn aot(self) -> anyhow::Result<AOTContext<'a>> {
193 let lookup_table = self.lookup_table()?;
194 let (mut dag, unawaits, awaited, promised_cids, resources) =
195 self.into_inner().tasks().into_iter().enumerate().try_fold(
196 (
197 Dag::default(),
198 vec![],
199 vec![],
200 (vec![], vec![]),
201 IndexMap::new(),
202 ),
203 |(
204 mut dag,
205 mut unawaits,
206 mut awaited,
207 (mut in_flows, mut out_flows),
208 mut resources,
209 ),
210 (i, task)| {
211 let instr_cid = task.instruction_cid()?;
212 debug!(
213 subject = "task.instruction",
214 category = "aot.information",
215 "instruction cid of task: {}",
216 instr_cid
217 );
218
219 let ptr: Pointer = Invocation::<Arg>::from(task.clone()).try_into()?;
221
222 let RunInstruction::Expanded(instr) = task.into_instruction() else {
223 bail!("workflow tasks/instructions must be expanded / inlined")
224 };
225
226 resources
227 .entry(instr_cid)
228 .or_insert_with(|| vec![Resource::Url(instr.resource().to_owned())]);
229 let parsed = instr.input().parse()?;
230 let deferred = parsed.args().deferreds();
231 let reads = deferred.fold(vec![], |mut in_flow_reads, cid| {
232 if let Some(v) = lookup_table.get(&cid) {
233 in_flows.push(cid);
234 in_flow_reads.push(*v)
235 } else {
236 out_flows.push(cid);
237 }
238 in_flow_reads
241 });
242
243 parsed.args().links().for_each(|cid| {
244 resources
245 .entry(instr_cid)
246 .and_modify(|prev_rscs| {
247 prev_rscs.push(Resource::Cid(cid.to_owned()));
248 })
249 .or_insert_with(|| vec![Resource::Cid(cid.to_owned())]);
250 });
251
252 let node = Node::new(Vertex::new(instr.to_owned(), parsed, ptr))
253 .with_name(instr_cid.to_string())
254 .with_result(i);
255
256 if !reads.is_empty() {
257 dag.add_node(node.with_reads(reads.clone()));
258 awaited.extend(reads);
259 } else {
260 unawaits.push(node);
261 }
262
263 Ok::<_, anyhow::Error>((
264 dag,
265 unawaits,
266 awaited,
267 (in_flows, out_flows),
268 resources,
269 ))
270 },
271 )?;
272
273 for mut node in unawaits.clone().into_iter() {
274 if node.get_results().any(|r| awaited.contains(r)) {
275 dag.add_node(node);
276 } else {
277 node.set_barrier(1);
279 dag.add_node(node);
280 }
281 }
282
283 Ok(AOTContext {
284 dag,
285 awaiting: Promises::new(promised_cids.0, promised_cids.1),
286 indexed_resources: IndexedResources(resources),
287 })
288 }
289
290 fn lookup_table(&self) -> anyhow::Result<IndexMap<Cid, usize>> {
293 self.inner()
294 .tasks_ref()
295 .iter()
296 .enumerate()
297 .try_fold(IndexMap::new(), |mut acc, (i, t)| {
298 acc.insert(t.instruction_cid()?, i);
299 Ok::<_, anyhow::Error>(acc)
300 })
301 }
302}
303
/// Resources referenced by a workflow's instructions, keyed by instruction CID.
///
/// For SQLite, this is stored in a `Binary` column as DAG-CBOR encoded IPLD.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize, AsExpression, FromSqlRow)]
#[diesel(sql_type = Binary)]
pub struct IndexedResources(IndexMap<Cid, Vec<Resource>>);
308
309impl IndexedResources {
310 #[allow(dead_code)]
313 pub(crate) fn new(map: IndexMap<Cid, Vec<Resource>>) -> IndexedResources {
314 IndexedResources(map)
315 }
316
317 #[allow(dead_code)]
319 pub(crate) fn inner(&self) -> &IndexMap<Cid, Vec<Resource>> {
320 &self.0
321 }
322
323 #[allow(dead_code)]
325 pub(crate) fn into_inner(self) -> IndexMap<Cid, Vec<Resource>> {
326 self.0
327 }
328
329 #[allow(dead_code)]
331 pub(crate) fn len(&self) -> usize {
332 self.0.len()
333 }
334
335 #[allow(dead_code)]
337 pub(crate) fn is_empty(&self) -> bool {
338 self.0.is_empty()
339 }
340
341 #[allow(dead_code)]
345 pub(crate) fn get(&self, cid: &Cid) -> Option<&Vec<Resource>> {
346 self.0.get(cid)
347 }
348
349 #[allow(dead_code)]
351 pub(crate) fn iter(&self) -> impl Iterator<Item = &Resource> {
352 self.0.values().flatten().unique()
353 }
354
355 #[allow(dead_code)]
357 pub(crate) fn into_iter(self) -> impl Iterator<Item = Resource> {
358 self.0.into_values().flatten().unique()
359 }
360}
361
362impl From<IndexedResources> for Ipld {
363 fn from(resources: IndexedResources) -> Self {
364 let btreemap: BTreeMap<String, Ipld> = resources
365 .0
366 .into_iter()
367 .map(|(k, v)| {
368 (
369 k.to_string(),
370 Ipld::List(
371 v.into_iter()
372 .map(|v| match v {
373 Resource::Url(url) => Ipld::String(url.to_string()),
374 Resource::Cid(cid) => Ipld::Link(cid),
375 })
376 .collect(),
377 ),
378 )
379 })
380 .collect();
381 Ipld::Map(btreemap)
382 }
383}
384
385impl TryFrom<Ipld> for IndexedResources {
386 type Error = anyhow::Error;
387
388 fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
389 let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?
390 .into_iter()
391 .map(|(k, v)| {
392 let cid = Cid::try_from(k)?;
393 let list = from_ipld::<Vec<Ipld>>(v)?;
394 let rscs = list
395 .into_iter()
396 .map(|v| {
397 Ok(match v {
398 Ipld::String(url) => Resource::Url(Url::parse(&url)?),
399 Ipld::Link(cid) => Resource::Cid(cid),
400 _ => bail!("invalid resource type"),
401 })
402 })
403 .collect::<Result<Vec<Resource>, anyhow::Error>>()?;
404
405 Ok((cid, rscs))
406 })
407 .collect::<Result<IndexMap<Cid, Vec<Resource>>, anyhow::Error>>()?;
408
409 Ok(IndexedResources(map))
410 }
411}
412
413impl TryFrom<IndexedResources> for Vec<u8> {
414 type Error = anyhow::Error;
415
416 fn try_from(resources: IndexedResources) -> Result<Self, Self::Error> {
417 let ipld = Ipld::from(resources);
418 DagCborCodec.encode(&ipld)
419 }
420}
421
422impl ToSql<Binary, Sqlite> for IndexedResources
423where
424 [u8]: ToSql<Binary, Sqlite>,
425{
426 fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
427 let bytes: Vec<u8> = self.to_owned().try_into()?;
428 out.set_value(bytes);
429 Ok(IsNull::No)
430 }
431}
432
433impl<DB> FromSql<Binary, DB> for IndexedResources
434where
435 DB: Backend,
436 *const [u8]: FromSql<Binary, DB>,
437{
438 fn from_sql(bytes: DB::RawValue<'_>) -> deserialize::Result<Self> {
439 let raw_bytes = <*const [u8] as FromSql<Binary, DB>>::from_sql(bytes)?;
440 let raw_bytes: &[u8] = unsafe { &*raw_bytes };
441 let ipld: Ipld = DagCborCodec.decode(raw_bytes)?;
442 let decoded: IndexedResources = ipld.try_into()?;
443 Ok(decoded)
444 }
445}
446
#[cfg(test)]
mod test {
    use super::*;
    use homestar_invocation::{
        authority::UcanPrf,
        ipld::DagCbor,
        pointer::{Await, AwaitResult},
        task::{
            instruction::{Ability, Input},
            Resources,
        },
        test_utils, Task, Unit,
    };

    #[test]
    fn ipld_roundtrip_indexed_resources() {
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Unit>();

        let mut index_map = IndexMap::new();
        index_map.insert(
            instruction1.clone().to_cid().unwrap(),
            vec![Resource::Url(instruction1.resource().to_owned())],
        );
        index_map.insert(
            instruction2.clone().to_cid().unwrap(),
            vec![Resource::Url(instruction2.resource().to_owned())],
        );
        let indexed_resources = IndexedResources::new(index_map);

        let ipld = Ipld::from(indexed_resources.clone());
        let ipld_to_indexed_resources = ipld.try_into().unwrap();
        assert_eq!(indexed_resources, ipld_to_indexed_resources);
    }

    #[test]
    fn dag_to_dot() {
        let config = Resources::default();
        let instruction1 = test_utils::wasm_instruction::<Arg>();
        let (instruction2, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let workflow = Workflow::new(vec![task1, task2]);
        let builder = Builder::new(workflow);
        let aot = builder.aot().unwrap();

        // Write into the temp dir (unique per process) instead of the working
        // directory, and remove the file afterwards so no artifact is left.
        let path = std::env::temp_dir().join(format!(
            "homestar-workflow-dag-to-dot-{}.dot",
            std::process::id()
        ));
        aot.dot("test", &path).unwrap();
        assert!(path.exists());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn build_parallel_schedule() {
        let config = Resources::default();
        let instruction1 = test_utils::wasm_instruction::<Arg>();
        let (instruction2, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let tasks = vec![task1.clone(), task2.clone()];

        let workflow = Workflow::new(tasks);
        let builder = Builder::new(workflow);
        let dag = builder.aot().unwrap().dag;

        let instr1 = task1.instruction_cid().unwrap().to_string();
        let instr2 = task2.instruction_cid().unwrap().to_string();

        // Both independent tasks must be present, not just one of them.
        let names: Vec<String> = dag.nodes().map(|node| node.name().to_string()).collect();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&instr1));
        assert!(names.contains(&instr2));
    }

    #[test]
    fn build_seq_schedule() {
        let config = Resources::default();
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.into(),
            UcanPrf::default(),
        );

        let workflow = Workflow::new(vec![task1.clone(), task2.clone()]);
        let builder = Builder::new(workflow);
        let dag = builder.aot().unwrap().dag;

        let instr1 = task1.instruction_cid().unwrap().to_string();
        let instr2 = task2.instruction_cid().unwrap().to_string();

        dagga::assert_batches(&[&instr1, &instr2], dag);
    }

    #[test]
    fn build_mixed_graph() {
        let config = Resources::default();
        let (instruction1, instruction2, instruction3) =
            test_utils::related_wasm_instructions::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1.clone()),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task3 = Task::new(
            RunInstruction::Expanded(instruction3),
            config.clone().into(),
            UcanPrf::default(),
        );

        let (instruction4, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task4 = Task::new(
            RunInstruction::Expanded(instruction4),
            config.clone().into(),
            UcanPrf::default(),
        );

        let (instruction5, _) = test_utils::wasm_instruction_with_nonce::<Arg>();
        let task5 = Task::new(
            RunInstruction::Expanded(instruction5),
            config.clone().into(),
            UcanPrf::default(),
        );

        let promise1 = Await::new(
            Pointer::new(instruction1.clone().to_cid().unwrap()),
            AwaitResult::Ok,
        );

        let dep_instr = Instruction::new(
            instruction1.resource().to_owned(),
            Ability::from("wasm/run"),
            Input::<Arg>::Ipld(Ipld::Map(BTreeMap::from([
                ("func".into(), Ipld::String("add_two".to_string())),
                (
                    "args".into(),
                    Ipld::List(vec![Ipld::from(promise1.clone())]),
                ),
            ]))),
        );

        let task6 = Task::new(
            RunInstruction::Expanded(dep_instr),
            config.into(),
            UcanPrf::default(),
        );

        let tasks = vec![
            task6.clone(),
            task1.clone(),
            task2.clone(),
            task3.clone(),
            task4.clone(),
            task5.clone(),
        ];
        let workflow = Workflow::new(tasks);

        let instr1 = task1.instruction_cid().unwrap().to_string();
        let instr2 = task2.instruction_cid().unwrap().to_string();
        let instr3 = task3.instruction_cid().unwrap().to_string();
        let instr4 = task4.instruction_cid().unwrap().to_string();
        let instr5 = task5.instruction_cid().unwrap().to_string();
        let instr6 = task6.instruction_cid().unwrap().to_string();

        let builder = Builder::new(workflow);
        let schedule = builder.graph().unwrap().schedule;

        // Nodes within a batch may come out in any order, so compare each
        // batch as a sorted list of node names.
        let sorted = |mut names: Vec<String>| {
            names.sort();
            names
        };
        let batches: Vec<Vec<String>> = schedule
            .into_iter()
            .map(|batch| sorted(batch.iter().map(|node| node.name().to_string()).collect()))
            .collect();

        assert_eq!(
            batches,
            vec![
                vec![instr1],
                sorted(vec![instr6, instr2]),
                vec![instr3],
                sorted(vec![instr4, instr5]),
            ]
        );
    }
}