1#![allow(missing_docs)]
2use super::IndexedResources;
3use crate::{
4 channel::{AsyncChannel, AsyncChannelSender},
5 db::{Connection, Database},
6 event_handler::{
7 event::QueryRecord,
8 swarm_event::{FoundEvent, ResponseEvent},
9 Event,
10 },
11 network::swarm::CapsuleTag,
12 settings, Db, Receipt,
13};
14use anyhow::{anyhow, bail, Result};
15use chrono::{NaiveDateTime, Utc};
16use diesel::{Associations, Identifiable, Insertable, Queryable, Selectable};
17use faststr::FastStr;
18use homestar_invocation::{ipld::DagJson, Pointer};
19use libipld::{cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Cid, Ipld};
20use serde::{Deserialize, Serialize};
21use std::{collections::BTreeMap, fmt, sync::Arc, time::Duration};
22use tokio::{
23 runtime::Handle,
24 time::{timeout_at, Instant},
25};
26use tracing::info;
27
/// Tag key under which workflow info is wrapped when encoded as a
/// DAG-CBOR "capsule" for storage/exchange over the network.
pub const WORKFLOW_TAG: &str = "ipvm/workflow";

// IPLD map keys used by the `Info` <-> `Ipld` conversions below.
const CID_KEY: &str = "cid";
const NAME_KEY: &str = "name";
const NUM_TASKS_KEY: &str = "num_tasks";
const PROGRESS_KEY: &str = "progress";
const PROGRESS_COUNT_KEY: &str = "progress_count";
const RESOURCES_KEY: &str = "resources";
39
/// Status of a workflow as persisted in the database
/// (mapped to a DB enum via `diesel_derive_enum`).
#[derive(Debug, Clone, PartialEq, diesel_derive_enum::DbEnum)]
pub enum Status {
    /// Workflow is stored but has not started running.
    Pending,
    /// Workflow is currently executing.
    Running,
    /// Workflow ran to completion.
    Completed,
    /// Workflow stopped making progress.
    Stuck,
}
54
/// A workflow row as stored in the `workflows` database table.
#[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable)]
#[diesel(table_name = crate::db::schema::workflows, primary_key(cid))]
pub struct Stored {
    // Workflow Cid pointer — the table's primary key.
    pub(crate) cid: Pointer,
    // Optional human-readable local name (defaults to the Cid string
    // in `Stored::default`).
    pub(crate) name: Option<String>,
    // Number of tasks in the workflow.
    pub(crate) num_tasks: i32,
    // Per-instruction resource index for the workflow.
    pub(crate) resources: IndexedResources,
    // Creation timestamp (naive UTC).
    pub(crate) created_at: NaiveDateTime,
    // Completion timestamp, set once the workflow finishes.
    pub(crate) completed_at: Option<NaiveDateTime>,
    // Current workflow status.
    pub(crate) status: Status,
    // Retry counter, starts at 0.
    pub(crate) retries: i32,
}
94
95impl Stored {
96 pub fn new(
100 cid: Pointer,
101 name: Option<String>,
102 num_tasks: i32,
103 resources: IndexedResources,
104 created_at: NaiveDateTime,
105 ) -> Self {
106 Self {
107 cid,
108 name,
109 num_tasks,
110 resources,
111 created_at,
112 completed_at: None,
113 status: Status::Pending,
114 retries: 0,
115 }
116 }
117
118 pub fn new_with_resources(
122 cid: Pointer,
123 name: Option<String>,
124 num_tasks: i32,
125 resources: IndexedResources,
126 ) -> Self {
127 Self {
128 cid,
129 name,
130 num_tasks,
131 resources,
132 created_at: Utc::now().naive_utc(),
133 completed_at: None,
134 status: Status::Pending,
135 retries: 0,
136 }
137 }
138
139 pub fn default(cid: Pointer, num_tasks: i32) -> Self {
143 let name = cid.to_string();
144 Self {
145 cid,
146 name: Some(name),
147 num_tasks,
148 resources: IndexedResources::default(),
149 created_at: Utc::now().naive_utc(),
150 completed_at: None,
151 status: Status::Pending,
152 retries: 0,
153 }
154 }
155}
156
/// A row in the `workflows_receipts` join table, associating a workflow
/// with one of its receipts.
#[derive(
    Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable, Associations, Hash,
)]
#[diesel(belongs_to(Receipt, foreign_key = receipt_cid))]
#[diesel(belongs_to(Stored, foreign_key = workflow_cid))]
#[diesel(table_name = crate::db::schema::workflows_receipts, primary_key(workflow_cid, receipt_cid))]
pub(crate) struct StoredReceipt {
    // Cid of the owning workflow (composite primary key, part 1).
    pub(crate) workflow_cid: Pointer,
    // Cid of the associated receipt (composite primary key, part 2).
    pub(crate) receipt_cid: Pointer,
}
171
172impl StoredReceipt {
173 pub(crate) fn new(workflow_cid: Pointer, receipt_cid: Pointer) -> Self {
174 Self {
175 workflow_cid,
176 receipt_cid,
177 }
178 }
179}
180
/// Workflow information in a form that can be serialized (serde) and
/// converted to/from IPLD for sharing over the network.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Info {
    // Workflow Cid.
    pub(crate) cid: Cid,
    // Optional local name for the workflow.
    pub(crate) name: Option<FastStr>,
    // Total number of tasks in the workflow.
    pub(crate) num_tasks: u32,
    // Cids of tasks/receipts completed so far.
    pub(crate) progress: Vec<Cid>,
    // Cached count of completed tasks.
    pub(crate) progress_count: u32,
    // Per-instruction resource index for the workflow.
    pub(crate) resources: IndexedResources,
}
195
196impl fmt::Display for Info {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 write!(
199 f,
200 "cid: {}, local_name: {}, name, progress: {}/{}",
201 self.cid,
202 self.name.clone().unwrap_or(self.cid.to_string().into()),
203 self.progress_count,
204 self.num_tasks
205 )
206 }
207}
208
209impl Info {
210 pub fn new(stored: Stored, progress: Vec<Cid>) -> Self {
213 let progress_count = progress.len() as u32;
214 let cid = stored.cid.cid();
215 Self {
216 cid,
217 name: stored.name.map(|name| name.into()),
218 num_tasks: stored.num_tasks as u32,
219 progress,
220 progress_count,
221 resources: stored.resources,
222 }
223 }
224
225 pub fn default(stored: Stored) -> Self {
227 let cid = stored.cid.cid();
228 Self {
229 cid,
230 name: stored.name.map(|name| name.into()),
231 num_tasks: stored.num_tasks as u32,
232 progress: vec![],
233 progress_count: 0,
234 resources: stored.resources,
235 }
236 }
237
238 pub fn progress(&self) -> &Vec<Cid> {
240 &self.progress
241 }
242
243 pub fn progress_count(&self) -> u32 {
245 self.progress_count
246 }
247
248 pub fn num_tasks(&self) -> u32 {
252 self.num_tasks
253 }
254
255 pub(crate) fn cid(&self) -> Cid {
259 self.cid
260 }
261
262 #[allow(dead_code)]
266 pub(crate) fn cid_as_string(&self) -> String {
267 self.cid.to_string()
268 }
269
270 pub(crate) fn cid_as_bytes(&self) -> Vec<u8> {
274 self.cid().to_bytes()
275 }
276
277 #[allow(dead_code)]
281 pub(crate) fn set_resources(&mut self, resources: IndexedResources) {
282 self.resources = resources;
283 }
284
285 #[allow(dead_code)]
290 pub(crate) fn set_progress_count(&mut self, progress_count: u32) {
291 self.progress_count = progress_count;
292 }
293
294 pub(crate) fn increment_progress(&mut self, new_cid: Cid) {
296 self.progress.push(new_cid);
297 self.progress_count = self.progress.len() as u32 + 1;
298 }
299
300 pub(crate) fn capsule(&self) -> anyhow::Result<Vec<u8>> {
303 let info_ipld = Ipld::from(self.to_owned());
304 let capsule = if let Ipld::Map(map) = info_ipld {
305 Ok(Ipld::Map(BTreeMap::from([(
306 WORKFLOW_TAG.into(),
307 Ipld::Map(map),
308 )])))
309 } else {
310 Err(anyhow!("workflow info to Ipld conversion is not a map"))
311 }?;
312
313 DagCborCodec.encode(&capsule)
314 }
315
316 pub(crate) async fn init(
321 workflow_cid: Cid,
322 workflow_len: u32,
323 name: FastStr,
324 resources: IndexedResources,
325 network_settings: settings::Dht,
326 event_sender: Arc<AsyncChannelSender<Event>>,
327 mut conn: Connection,
328 ) -> Result<(Self, NaiveDateTime)> {
329 let timestamp = Utc::now().naive_utc();
330 match Db::get_workflow_info(workflow_cid, &mut conn) {
331 Ok((Some(stored_name), info)) if stored_name != name => {
332 Db::update_local_name(&name, &mut conn)?;
333 Ok((info, timestamp))
334 }
335 Ok((_, info)) => Ok((info, timestamp)),
336 Err(_err) => {
337 info!(
338 subject = "workflow.init.db.check",
339 category = "workflow",
340 cid = workflow_cid.to_string(),
341 "workflow information not available in the database"
342 );
343
344 let stored = Stored::new(
345 Pointer::new(workflow_cid),
346 Some(name.to_string()),
347 workflow_len as i32,
348 resources,
349 timestamp,
350 );
351
352 let result = Db::store_workflow(stored.clone(), &mut conn)?;
353 let workflow_info = Self::default(result);
354
355 let handle = Handle::current();
358 handle.spawn(async move {
359 match Self::retrieve_from_dht(
360 workflow_cid,
361 event_sender.clone(),
362 network_settings.p2p_workflow_info_timeout,
363 )
364 .await
365 {
366 Ok(workflow_info) => Ok(workflow_info),
367 Err(_) => {
368 Self::retrieve_from_provider(
369 workflow_cid,
370 event_sender,
371 network_settings.p2p_provider_timeout,
372 )
373 .await
374 }
375 }
376 });
377
378 Ok((workflow_info, timestamp))
379 }
380 }
381 }
382
383 pub(crate) async fn retrieve<'a>(
386 workflow_cid: Cid,
387 #[allow(unused)] event_sender: Arc<AsyncChannelSender<Event>>,
388 mut conn: Option<Connection>,
389 #[allow(unused)] p2p_provider_timeout: Duration,
390 ) -> Result<Self> {
391 let workflow_info = match conn
392 .as_mut()
393 .and_then(|conn| Db::get_workflow_info(workflow_cid, conn).ok())
394 {
395 Some((_name, workflow_info)) => Ok(workflow_info),
396 None => {
397 info!(
398 subject = "workflow.retrieve.db.check",
399 category = "workflow",
400 cid = workflow_cid.to_string(),
401 "workflow information not available in the database"
402 );
403
404 Self::retrieve_from_provider(workflow_cid, event_sender, p2p_provider_timeout).await
405 }
406 }?;
407
408 Ok(workflow_info)
409 }
410
411 async fn retrieve_from_dht<'a>(
414 workflow_cid: Cid,
415 event_sender: Arc<AsyncChannelSender<Event>>,
416 p2p_workflow_info_timeout: Duration,
417 ) -> Result<Info> {
418 let (tx, rx) = AsyncChannel::oneshot();
419 event_sender
420 .send_async(Event::FindRecord(QueryRecord::with(
421 workflow_cid,
422 CapsuleTag::Workflow,
423 Some(tx),
424 )))
425 .await?;
426
427 match timeout_at(Instant::now() + p2p_workflow_info_timeout, rx.recv_async()).await {
428 Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(event))))) => {
429 #[cfg(feature = "websocket-notify")]
430 let _ = event_sender
431 .send_async(Event::StoredRecord(FoundEvent::Workflow(event.clone())))
432 .await;
433
434 Ok(event.workflow_info)
435 }
436 Ok(Ok(ResponseEvent::Found(Err(_err)))) => {
437 bail!("failed to find workflow info with cid {workflow_cid}")
438 }
439 Ok(Ok(event)) => {
440 bail!("received unexpected event {event:?} for workflow {workflow_cid}")
441 }
442 Ok(Err(err)) => {
443 bail!("unexpected error while retrieving workflow info: {err}")
444 }
445 Err(_) => {
446 bail!(
447 "timeout deadline reached while finding workflow info with cid {workflow_cid}"
448 )
449 }
450 }
451 }
452
453 async fn retrieve_from_provider<'a>(
456 workflow_cid: Cid,
457 event_sender: Arc<AsyncChannelSender<Event>>,
458 p2p_provider_timeout: Duration,
459 ) -> Result<Info> {
460 let (tx, rx) = AsyncChannel::oneshot();
461 event_sender
462 .send_async(Event::GetProviders(QueryRecord::with(
463 workflow_cid,
464 CapsuleTag::Workflow,
465 Some(tx),
466 )))
467 .await?;
468
469 match timeout_at(Instant::now() + p2p_provider_timeout, rx.recv_async()).await {
470 Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(event))))) => {
471 #[cfg(feature = "websocket-notify")]
472 let _ = event_sender
473 .send_async(Event::StoredRecord(FoundEvent::Workflow(event.clone())))
474 .await;
475
476 Ok(event.workflow_info)
477 }
478 Ok(Ok(ResponseEvent::Found(Err(err)))) => {
479 bail!("failure in attempting to find event: {err}")
480 }
481 Ok(Ok(event)) => {
482 bail!("received unexpected event {event:?} for workflow {workflow_cid}")
483 }
484 Ok(Err(err)) => {
485 bail!("unexpected error while retrieving workflow info: {err}")
486 }
487 Err(_) => {
488 bail!(
489 "timeout deadline reached while finding workflow info with cid {workflow_cid}"
490 )
491 }
492 }
493 }
494}
495
496impl From<Info> for Ipld {
497 fn from(workflow: Info) -> Self {
498 Ipld::Map(BTreeMap::from([
499 (CID_KEY.into(), Ipld::Link(workflow.cid)),
500 (
501 NAME_KEY.into(),
502 workflow
503 .name
504 .as_ref()
505 .map(|name| name.to_string().into())
506 .unwrap_or(Ipld::Null),
507 ),
508 (
509 NUM_TASKS_KEY.into(),
510 Ipld::Integer(workflow.num_tasks as i128),
511 ),
512 (
513 PROGRESS_KEY.into(),
514 Ipld::List(workflow.progress.into_iter().map(Ipld::Link).collect()),
515 ),
516 (
517 PROGRESS_COUNT_KEY.into(),
518 Ipld::Integer(workflow.progress_count as i128),
519 ),
520 (RESOURCES_KEY.into(), Ipld::from(workflow.resources)),
521 ]))
522 }
523}
524
525impl TryFrom<Ipld> for Info {
526 type Error = anyhow::Error;
527
528 fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
529 let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;
530 let cid = from_ipld(
531 map.get(CID_KEY)
532 .ok_or_else(|| anyhow!("no `cid` set"))?
533 .to_owned(),
534 )?;
535 let name = map
536 .get(NAME_KEY)
537 .and_then(|ipld| match ipld {
538 Ipld::Null => None,
539 ipld => Some(ipld),
540 })
541 .and_then(|ipld| from_ipld(ipld.to_owned()).ok());
542 let num_tasks = from_ipld(
543 map.get(NUM_TASKS_KEY)
544 .ok_or_else(|| anyhow!("no `num_tasks` set"))?
545 .to_owned(),
546 )?;
547 let progress = from_ipld(
548 map.get(PROGRESS_KEY)
549 .ok_or_else(|| anyhow!("no `progress` set"))?
550 .to_owned(),
551 )?;
552 let progress_count = from_ipld(
553 map.get(PROGRESS_COUNT_KEY)
554 .ok_or_else(|| anyhow!("no `progress_count` set"))?
555 .to_owned(),
556 )?;
557 let resources = IndexedResources::try_from(
558 map.get(RESOURCES_KEY)
559 .ok_or_else(|| anyhow!("no `resources` set"))?
560 .to_owned(),
561 )?;
562
563 Ok(Self {
564 cid,
565 name,
566 num_tasks,
567 progress,
568 progress_count,
569 resources,
570 })
571 }
572}
573
574impl TryFrom<Info> for Vec<u8> {
575 type Error = anyhow::Error;
576
577 fn try_from(workflow_info: Info) -> Result<Self, Self::Error> {
578 let info_ipld = Ipld::from(workflow_info);
579 DagCborCodec.encode(&info_ipld)
580 }
581}
582
583impl TryFrom<Vec<u8>> for Info {
584 type Error = anyhow::Error;
585
586 fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
587 let ipld: Ipld = DagCborCodec.decode(&bytes)?;
588 ipld.try_into()
589 }
590}
591
// `Info` gets DAG-JSON (de)serialization for free through its `Ipld` conversion.
impl DagJson for Info where Ipld: From<Info> {}
593
#[cfg(test)]
mod test {
    use super::*;
    use crate::workflow::Resource;
    use homestar_invocation::{
        authority::UcanPrf,
        ipld::DagCbor,
        task::{instruction::RunInstruction, Resources},
        test_utils, Task,
    };
    use homestar_wasm::io::Arg;
    use homestar_workflow::Workflow;
    use indexmap::IndexMap;

    // Round-trips a populated `Info` through `Ipld` and back, asserting
    // equality (exercises both `From<Info> for Ipld` and `TryFrom<Ipld>`).
    #[test]
    fn ipld_roundtrip_workflow_info() {
        // Build two related wasm tasks sharing a default resource config.
        let config = Resources::default();
        let (instruction1, instruction2, _) = test_utils::related_wasm_instructions::<Arg>();
        let task1 = Task::new(
            RunInstruction::Expanded(instruction1.clone()),
            config.clone().into(),
            UcanPrf::default(),
        );
        let task2 = Task::new(
            RunInstruction::Expanded(instruction2.clone()),
            config.into(),
            UcanPrf::default(),
        );

        // Index each instruction's resource by its Cid.
        let mut index_map = IndexMap::new();
        index_map.insert(
            instruction1.clone().to_cid().unwrap(),
            vec![Resource::Url(instruction1.resource().to_owned())],
        );
        index_map.insert(
            instruction2.clone().to_cid().unwrap(),
            vec![Resource::Url(instruction2.resource().to_owned())],
        );

        // Store the workflow and derive `Info` from the stored row.
        let workflow = Workflow::new(vec![task1.clone(), task2.clone()]);
        let stored_info = Stored::new_with_resources(
            Pointer::new(workflow.clone().to_cid().unwrap()),
            None,
            workflow.len() as i32,
            IndexedResources::new(index_map),
        );

        // Mark both tasks complete, then verify Ipld roundtrip fidelity.
        let mut workflow_info = Info::default(stored_info);
        workflow_info.increment_progress(task1.to_cid().unwrap());
        workflow_info.increment_progress(task2.to_cid().unwrap());
        let ipld = Ipld::from(workflow_info.clone());
        assert_eq!(workflow_info, ipld.try_into().unwrap());
    }
}