homestar_runtime/workflow/
info.rs

1#![allow(missing_docs)]
2use super::IndexedResources;
3use crate::{
4    channel::{AsyncChannel, AsyncChannelSender},
5    db::{Connection, Database},
6    event_handler::{
7        event::QueryRecord,
8        swarm_event::{FoundEvent, ResponseEvent},
9        Event,
10    },
11    network::swarm::CapsuleTag,
12    settings, Db, Receipt,
13};
14use anyhow::{anyhow, bail, Result};
15use chrono::{NaiveDateTime, Utc};
16use diesel::{Associations, Identifiable, Insertable, Queryable, Selectable};
17use faststr::FastStr;
18use homestar_invocation::{ipld::DagJson, Pointer};
19use libipld::{cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Cid, Ipld};
20use serde::{Deserialize, Serialize};
21use std::{collections::BTreeMap, fmt, sync::Arc, time::Duration};
22use tokio::{
23    runtime::Handle,
24    time::{timeout_at, Instant},
25};
26use tracing::info;
27
/// Tag under which [Workflow] information is wrapped ("capsuled") before it
/// is shared over libp2p.
///
/// [Workflow]: homestar_workflow::Workflow
pub const WORKFLOW_TAG: &str = "ipvm/workflow";

// Keys of the Ipld map written by `Ipld::from(Info)` and read back by
// `Info::try_from(Ipld)`.
const CID_KEY: &str = "cid";
const NAME_KEY: &str = "name";
const NUM_TASKS_KEY: &str = "num_tasks";
const PROGRESS_KEY: &str = "progress";
const PROGRESS_COUNT_KEY: &str = "progress_count";
const RESOURCES_KEY: &str = "resources";
39
40/// Status of a [Workflow].
41///
42/// [Workflow]: homestar_workflow::Workflow
43#[derive(Debug, Clone, PartialEq, diesel_derive_enum::DbEnum)]
44pub enum Status {
45    /// Workflow is pending - default case.
46    Pending,
47    /// Workflow is currently running.
48    Running,
49    /// Workflow has been completed.
50    Completed,
51    /// Workflow is stuck, awaiting CIDs we can't find on the network.
52    Stuck,
53}
54
55/// [Workflow] information stored in the database.
56///
57/// [Workflow]: homestar_workflow::Workflow
58#[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable)]
59#[diesel(table_name = crate::db::schema::workflows, primary_key(cid))]
60pub struct Stored {
61    /// Wrapped-Cid of [Workflow].
62    ///
63    /// [Workflow]: homestar_workflow::Workflow
64    pub(crate) cid: Pointer,
65    /// Local name of [Workflow].
66    ///
67    /// [Workflow]: homestar_workflow::Workflow
68    pub(crate) name: Option<String>,
69    /// Number of tasks in [Workflow].
70    ///
71    /// [Workflow]: homestar_workflow::Workflow
72    pub(crate) num_tasks: i32,
73    /// Map of [Instruction] Cids to resources.
74    ///
75    /// [Instruction]: homestar_invocation::task::Instruction
76    pub(crate) resources: IndexedResources,
77    /// Local timestamp of [Workflow] creation.
78    ///
79    /// [Workflow]: homestar_workflow::Workflow
80    pub(crate) created_at: NaiveDateTime,
81    /// Local timestamp of [Workflow] completion.
82    ///
83    /// [Workflow]: homestar_workflow::Workflow
84    pub(crate) completed_at: Option<NaiveDateTime>,
85    /// Status of [Workflow].
86    ///
87    /// [Workflow]: homestar_workflow::Workflow
88    pub(crate) status: Status,
89    /// Retries of [Workflow] when checking for provider.
90    ///
91    /// [Workflow]: homestar_workflow::Workflow
92    pub(crate) retries: i32,
93}
94
95impl Stored {
96    /// Create a new [Stored] workflow for the [db].
97    ///
98    /// [db]: Database
99    pub fn new(
100        cid: Pointer,
101        name: Option<String>,
102        num_tasks: i32,
103        resources: IndexedResources,
104        created_at: NaiveDateTime,
105    ) -> Self {
106        Self {
107            cid,
108            name,
109            num_tasks,
110            resources,
111            created_at,
112            completed_at: None,
113            status: Status::Pending,
114            retries: 0,
115        }
116    }
117
118    /// Create a new [Stored] workflow for the [db] with a default timestamp.
119    ///
120    /// [db]: Database
121    pub fn new_with_resources(
122        cid: Pointer,
123        name: Option<String>,
124        num_tasks: i32,
125        resources: IndexedResources,
126    ) -> Self {
127        Self {
128            cid,
129            name,
130            num_tasks,
131            resources,
132            created_at: Utc::now().naive_utc(),
133            completed_at: None,
134            status: Status::Pending,
135            retries: 0,
136        }
137    }
138
139    /// Create a default [Stored] workflow for the [db].
140    ///
141    /// [db]: Database
142    pub fn default(cid: Pointer, num_tasks: i32) -> Self {
143        let name = cid.to_string();
144        Self {
145            cid,
146            name: Some(name),
147            num_tasks,
148            resources: IndexedResources::default(),
149            created_at: Utc::now().naive_utc(),
150            completed_at: None,
151            status: Status::Pending,
152            retries: 0,
153        }
154    }
155}
156
157/// [Workflow] information stored in the database, tied to [receipts].
158///
159/// [Workflow]: homestar_workflow::Workflow
160/// [receipts]: crate::Receipt
161#[derive(
162    Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable, Associations, Hash,
163)]
164#[diesel(belongs_to(Receipt, foreign_key = receipt_cid))]
165#[diesel(belongs_to(Stored, foreign_key = workflow_cid))]
166#[diesel(table_name = crate::db::schema::workflows_receipts, primary_key(workflow_cid, receipt_cid))]
167pub(crate) struct StoredReceipt {
168    pub(crate) workflow_cid: Pointer,
169    pub(crate) receipt_cid: Pointer,
170}
171
172impl StoredReceipt {
173    pub(crate) fn new(workflow_cid: Pointer, receipt_cid: Pointer) -> Self {
174        Self {
175            workflow_cid,
176            receipt_cid,
177        }
178    }
179}
180
181/// Associated [Workflow] information, separated from [Workflow] struct in order
182/// to relate to it as a key-value relationship of (workflow)
183/// cid => [Info].
184///
185/// [Workflow]: homestar_workflow::Workflow
186#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
187pub struct Info {
188    pub(crate) cid: Cid,
189    pub(crate) name: Option<FastStr>,
190    pub(crate) num_tasks: u32,
191    pub(crate) progress: Vec<Cid>,
192    pub(crate) progress_count: u32,
193    pub(crate) resources: IndexedResources,
194}
195
196impl fmt::Display for Info {
197    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198        write!(
199            f,
200            "cid: {}, local_name: {}, name, progress: {}/{}",
201            self.cid,
202            self.name.clone().unwrap_or(self.cid.to_string().into()),
203            self.progress_count,
204            self.num_tasks
205        )
206    }
207}
208
209impl Info {
210    /// Create a workflow information structure from a [Stored] workflow and
211    /// `progress` vector.
212    pub fn new(stored: Stored, progress: Vec<Cid>) -> Self {
213        let progress_count = progress.len() as u32;
214        let cid = stored.cid.cid();
215        Self {
216            cid,
217            name: stored.name.map(|name| name.into()),
218            num_tasks: stored.num_tasks as u32,
219            progress,
220            progress_count,
221            resources: stored.resources,
222        }
223    }
224
225    /// Create a default workflow [Info] given a Cid and number of tasks.
226    pub fn default(stored: Stored) -> Self {
227        let cid = stored.cid.cid();
228        Self {
229            cid,
230            name: stored.name.map(|name| name.into()),
231            num_tasks: stored.num_tasks as u32,
232            progress: vec![],
233            progress_count: 0,
234            resources: stored.resources,
235        }
236    }
237
238    /// Get workflow progress as a vector of Cids.
239    pub fn progress(&self) -> &Vec<Cid> {
240        &self.progress
241    }
242
243    /// Get workflow progress as a number of receipts completed.
244    pub fn progress_count(&self) -> u32 {
245        self.progress_count
246    }
247
248    /// Get the number of tasks in the [Workflow].
249    ///
250    /// [Workflow]: homestar_workflow::Workflow
251    pub fn num_tasks(&self) -> u32 {
252        self.num_tasks
253    }
254
255    /// Get unique identifier, Cid, of [Workflow].
256    ///
257    /// [Workflow]: homestar_workflow::Workflow
258    pub(crate) fn cid(&self) -> Cid {
259        self.cid
260    }
261
262    /// Get the Cid of a [Workflow] as a [String].
263    ///
264    /// [Workflow]: homestar_workflow::Workflow
265    #[allow(dead_code)]
266    pub(crate) fn cid_as_string(&self) -> String {
267        self.cid.to_string()
268    }
269
270    /// Get the Cid of a [Workflow] as bytes.
271    ///
272    /// [Workflow]: homestar_workflow::Workflow
273    pub(crate) fn cid_as_bytes(&self) -> Vec<u8> {
274        self.cid().to_bytes()
275    }
276
277    /// Set map of [Instruction] Cids to resources.
278    ///
279    /// [Instruction]: homestar_invocation::task::Instruction
280    #[allow(dead_code)]
281    pub(crate) fn set_resources(&mut self, resources: IndexedResources) {
282        self.resources = resources;
283    }
284
285    /// Set the progress / step of the [Workflow] completed, which
286    /// may not be the same as the `progress` vector of Cids.
287    ///
288    /// [Workflow]: homestar_workflow::Workflow
289    #[allow(dead_code)]
290    pub(crate) fn set_progress_count(&mut self, progress_count: u32) {
291        self.progress_count = progress_count;
292    }
293
294    /// Set the progress / step of the [Info].
295    pub(crate) fn increment_progress(&mut self, new_cid: Cid) {
296        self.progress.push(new_cid);
297        self.progress_count = self.progress.len() as u32 + 1;
298    }
299
300    /// Capsule-wrapper for [Info] to to be shared over libp2p as
301    /// DagCbor encoded bytes.
302    pub(crate) fn capsule(&self) -> anyhow::Result<Vec<u8>> {
303        let info_ipld = Ipld::from(self.to_owned());
304        let capsule = if let Ipld::Map(map) = info_ipld {
305            Ok(Ipld::Map(BTreeMap::from([(
306                WORKFLOW_TAG.into(),
307                Ipld::Map(map),
308            )])))
309        } else {
310            Err(anyhow!("workflow info to Ipld conversion is not a map"))
311        }?;
312
313        DagCborCodec.encode(&capsule)
314    }
315
316    /// Retrieve available [Info] from the database or libp2p given a
317    /// [Workflow], or return a default/new version of [Info] if none is found.
318    ///
319    /// [Workflow]: homestar_workflow::Workflow
320    pub(crate) async fn init(
321        workflow_cid: Cid,
322        workflow_len: u32,
323        name: FastStr,
324        resources: IndexedResources,
325        network_settings: settings::Dht,
326        event_sender: Arc<AsyncChannelSender<Event>>,
327        mut conn: Connection,
328    ) -> Result<(Self, NaiveDateTime)> {
329        let timestamp = Utc::now().naive_utc();
330        match Db::get_workflow_info(workflow_cid, &mut conn) {
331            Ok((Some(stored_name), info)) if stored_name != name => {
332                Db::update_local_name(&name, &mut conn)?;
333                Ok((info, timestamp))
334            }
335            Ok((_, info)) => Ok((info, timestamp)),
336            Err(_err) => {
337                info!(
338                    subject = "workflow.init.db.check",
339                    category = "workflow",
340                    cid = workflow_cid.to_string(),
341                    "workflow information not available in the database"
342                );
343
344                let stored = Stored::new(
345                    Pointer::new(workflow_cid),
346                    Some(name.to_string()),
347                    workflow_len as i32,
348                    resources,
349                    timestamp,
350                );
351
352                let result = Db::store_workflow(stored.clone(), &mut conn)?;
353                let workflow_info = Self::default(result);
354
355                // spawn a separate task to retrieve workflow info from the
356                // network and store it in the database if it finds it.
357                let handle = Handle::current();
358                handle.spawn(async move {
359                    match Self::retrieve_from_dht(
360                        workflow_cid,
361                        event_sender.clone(),
362                        network_settings.p2p_workflow_info_timeout,
363                    )
364                    .await
365                    {
366                        Ok(workflow_info) => Ok(workflow_info),
367                        Err(_) => {
368                            Self::retrieve_from_provider(
369                                workflow_cid,
370                                event_sender,
371                                network_settings.p2p_provider_timeout,
372                            )
373                            .await
374                        }
375                    }
376                });
377
378                Ok((workflow_info, timestamp))
379            }
380        }
381    }
382
383    /// Retrieve available [Info] from the database or libp2p given a
384    /// workflow Cid.
385    pub(crate) async fn retrieve<'a>(
386        workflow_cid: Cid,
387        #[allow(unused)] event_sender: Arc<AsyncChannelSender<Event>>,
388        mut conn: Option<Connection>,
389        #[allow(unused)] p2p_provider_timeout: Duration,
390    ) -> Result<Self> {
391        let workflow_info = match conn
392            .as_mut()
393            .and_then(|conn| Db::get_workflow_info(workflow_cid, conn).ok())
394        {
395            Some((_name, workflow_info)) => Ok(workflow_info),
396            None => {
397                info!(
398                    subject = "workflow.retrieve.db.check",
399                    category = "workflow",
400                    cid = workflow_cid.to_string(),
401                    "workflow information not available in the database"
402                );
403
404                Self::retrieve_from_provider(workflow_cid, event_sender, p2p_provider_timeout).await
405            }
406        }?;
407
408        Ok(workflow_info)
409    }
410
411    // Retrieve [Info] from the DHT and send a [FoundEvent::Workflow] event
412    // if info is found.
413    async fn retrieve_from_dht<'a>(
414        workflow_cid: Cid,
415        event_sender: Arc<AsyncChannelSender<Event>>,
416        p2p_workflow_info_timeout: Duration,
417    ) -> Result<Info> {
418        let (tx, rx) = AsyncChannel::oneshot();
419        event_sender
420            .send_async(Event::FindRecord(QueryRecord::with(
421                workflow_cid,
422                CapsuleTag::Workflow,
423                Some(tx),
424            )))
425            .await?;
426
427        match timeout_at(Instant::now() + p2p_workflow_info_timeout, rx.recv_async()).await {
428            Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(event))))) => {
429                #[cfg(feature = "websocket-notify")]
430                let _ = event_sender
431                    .send_async(Event::StoredRecord(FoundEvent::Workflow(event.clone())))
432                    .await;
433
434                Ok(event.workflow_info)
435            }
436            Ok(Ok(ResponseEvent::Found(Err(_err)))) => {
437                bail!("failed to find workflow info with cid {workflow_cid}")
438            }
439            Ok(Ok(event)) => {
440                bail!("received unexpected event {event:?} for workflow {workflow_cid}")
441            }
442            Ok(Err(err)) => {
443                bail!("unexpected error while retrieving workflow info: {err}")
444            }
445            Err(_) => {
446                bail!(
447                    "timeout deadline reached while finding workflow info with cid {workflow_cid}"
448                )
449            }
450        }
451    }
452
453    // Retrieve [Info] from a provider and send a [FoundEvent::Workflow] event
454    // if info is found.
455    async fn retrieve_from_provider<'a>(
456        workflow_cid: Cid,
457        event_sender: Arc<AsyncChannelSender<Event>>,
458        p2p_provider_timeout: Duration,
459    ) -> Result<Info> {
460        let (tx, rx) = AsyncChannel::oneshot();
461        event_sender
462            .send_async(Event::GetProviders(QueryRecord::with(
463                workflow_cid,
464                CapsuleTag::Workflow,
465                Some(tx),
466            )))
467            .await?;
468
469        match timeout_at(Instant::now() + p2p_provider_timeout, rx.recv_async()).await {
470            Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(event))))) => {
471                #[cfg(feature = "websocket-notify")]
472                let _ = event_sender
473                    .send_async(Event::StoredRecord(FoundEvent::Workflow(event.clone())))
474                    .await;
475
476                Ok(event.workflow_info)
477            }
478            Ok(Ok(ResponseEvent::Found(Err(err)))) => {
479                bail!("failure in attempting to find event: {err}")
480            }
481            Ok(Ok(event)) => {
482                bail!("received unexpected event {event:?} for workflow {workflow_cid}")
483            }
484            Ok(Err(err)) => {
485                bail!("unexpected error while retrieving workflow info: {err}")
486            }
487            Err(_) => {
488                bail!(
489                    "timeout deadline reached while finding workflow info with cid {workflow_cid}"
490                )
491            }
492        }
493    }
494}
495
496impl From<Info> for Ipld {
497    fn from(workflow: Info) -> Self {
498        Ipld::Map(BTreeMap::from([
499            (CID_KEY.into(), Ipld::Link(workflow.cid)),
500            (
501                NAME_KEY.into(),
502                workflow
503                    .name
504                    .as_ref()
505                    .map(|name| name.to_string().into())
506                    .unwrap_or(Ipld::Null),
507            ),
508            (
509                NUM_TASKS_KEY.into(),
510                Ipld::Integer(workflow.num_tasks as i128),
511            ),
512            (
513                PROGRESS_KEY.into(),
514                Ipld::List(workflow.progress.into_iter().map(Ipld::Link).collect()),
515            ),
516            (
517                PROGRESS_COUNT_KEY.into(),
518                Ipld::Integer(workflow.progress_count as i128),
519            ),
520            (RESOURCES_KEY.into(), Ipld::from(workflow.resources)),
521        ]))
522    }
523}
524
525impl TryFrom<Ipld> for Info {
526    type Error = anyhow::Error;
527
528    fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
529        let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;
530        let cid = from_ipld(
531            map.get(CID_KEY)
532                .ok_or_else(|| anyhow!("no `cid` set"))?
533                .to_owned(),
534        )?;
535        let name = map
536            .get(NAME_KEY)
537            .and_then(|ipld| match ipld {
538                Ipld::Null => None,
539                ipld => Some(ipld),
540            })
541            .and_then(|ipld| from_ipld(ipld.to_owned()).ok());
542        let num_tasks = from_ipld(
543            map.get(NUM_TASKS_KEY)
544                .ok_or_else(|| anyhow!("no `num_tasks` set"))?
545                .to_owned(),
546        )?;
547        let progress = from_ipld(
548            map.get(PROGRESS_KEY)
549                .ok_or_else(|| anyhow!("no `progress` set"))?
550                .to_owned(),
551        )?;
552        let progress_count = from_ipld(
553            map.get(PROGRESS_COUNT_KEY)
554                .ok_or_else(|| anyhow!("no `progress_count` set"))?
555                .to_owned(),
556        )?;
557        let resources = IndexedResources::try_from(
558            map.get(RESOURCES_KEY)
559                .ok_or_else(|| anyhow!("no `resources` set"))?
560                .to_owned(),
561        )?;
562
563        Ok(Self {
564            cid,
565            name,
566            num_tasks,
567            progress,
568            progress_count,
569            resources,
570        })
571    }
572}
573
574impl TryFrom<Info> for Vec<u8> {
575    type Error = anyhow::Error;
576
577    fn try_from(workflow_info: Info) -> Result<Self, Self::Error> {
578        let info_ipld = Ipld::from(workflow_info);
579        DagCborCodec.encode(&info_ipld)
580    }
581}
582
583impl TryFrom<Vec<u8>> for Info {
584    type Error = anyhow::Error;
585
586    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
587        let ipld: Ipld = DagCborCodec.decode(&bytes)?;
588        ipld.try_into()
589    }
590}
591
592impl DagJson for Info where Ipld: From<Info> {}
593
#[cfg(test)]
mod test {
    use super::*;
    use crate::workflow::Resource;
    use homestar_invocation::{
        authority::UcanPrf,
        ipld::DagCbor,
        task::{instruction::RunInstruction, Resources},
        test_utils, Task,
    };
    use homestar_wasm::io::Arg;
    use homestar_workflow::Workflow;
    use indexmap::IndexMap;

    #[test]
    fn ipld_roundtrip_workflow_info() {
        let resources = Resources::default();
        let (first, second, _) = test_utils::related_wasm_instructions::<Arg>();

        let first_task = Task::new(
            RunInstruction::Expanded(first.clone()),
            resources.clone().into(),
            UcanPrf::default(),
        );
        let second_task = Task::new(
            RunInstruction::Expanded(second.clone()),
            resources.into(),
            UcanPrf::default(),
        );

        // Map each instruction's Cid to the URL resource it runs, preserving
        // insertion order.
        let index_map: IndexMap<_, _> = [&first, &second]
            .into_iter()
            .map(|instruction| {
                (
                    instruction.clone().to_cid().unwrap(),
                    vec![Resource::Url(instruction.resource().to_owned())],
                )
            })
            .collect();

        let workflow = Workflow::new(vec![first_task.clone(), second_task.clone()]);
        let stored = Stored::new_with_resources(
            Pointer::new(workflow.clone().to_cid().unwrap()),
            None,
            workflow.len() as i32,
            IndexedResources::new(index_map),
        );

        let mut info = Info::default(stored);
        for task in [first_task, second_task] {
            info.increment_progress(task.to_cid().unwrap());
        }

        let ipld = Ipld::from(info.clone());
        assert_eq!(info, ipld.try_into().unwrap());
    }
}
647}