homestar_invocation/task/instruction/
input.rs

//! Input parameters for [Instruction] execution and means to
//! generally [parse] and [resolve] them.
//!
//! [Instruction]: super::Instruction
//! [parse]: Parse::parse
//! [resolve]: Args::resolve
7
8use crate::{
9    error::ResolveError,
10    pointer::{Await, AwaitResult, ERR_BRANCH, OK_BRANCH, PTR_BRANCH},
11    task, Error, Pointer,
12};
13use async_recursion::async_recursion;
14use futures::{future, future::BoxFuture};
15use libipld::{serde::from_ipld, Cid, Ipld};
16use serde::{Deserialize, Serialize};
17use std::{collections::btree_map::BTreeMap, sync::Arc};
18
19mod parse;
20pub use parse::{Parse, Parsed};
21
22/// A list of ordered [Input] arguments/parameters.
23#[derive(Clone, Debug, PartialEq)]
24pub struct Args<T>(Vec<Input<T>>);
25
26impl<T> Args<T>
27where
28    T: std::fmt::Debug,
29{
30    /// Create an [Args] [Vec]-type.
31    pub fn new(args: Vec<Input<T>>) -> Self {
32        Self(args)
33    }
34
35    /// Return wrapped [Vec] of [inputs].
36    ///
37    /// [inputs]: Input
38    pub fn into_inner(self) -> Vec<Input<T>> {
39        self.0
40    }
41
42    /// Return refeerence to a wrapped [Vec] of [inputs].
43    ///
44    /// [inputs]: Input
45    pub fn inner(&self) -> &Vec<Input<T>> {
46        &self.0
47    }
48
49    /// Return *only* deferred/awaited inputs.
50    pub fn deferreds(&self) -> impl Iterator<Item = Cid> + '_ {
51        self.0.iter().filter_map(|input| {
52            if let Input::Deferred(awaited_promise) = input {
53                Some(awaited_promise.instruction_cid())
54            } else {
55                None
56            }
57        })
58    }
59
60    /// Return *only* `Ipld::Link` Cids.
61    pub fn links(&self) -> impl Iterator<Item = Cid> + '_ {
62        self.0.iter().filter_map(|input| {
63            if let Input::Ipld(Ipld::Link(link)) = input {
64                Some(link.to_owned())
65            } else {
66                None
67            }
68        })
69    }
70
71    /// Resolve [awaited promises] of [inputs] into task-specific [Input::Arg]'s,
72    /// given a successful lookup function; otherwise, return [Input::Deferred]
73    /// for unresolved promises, or just return [Input::Ipld],
74    /// [resolving Ipld links] if the lookup function expected Ipld input data.
75    ///
76    /// [awaited promises]: Await
77    /// [inputs]: Input
78    /// [resolving Ipld links]: resolve_links
79    pub async fn resolve<'a, F>(self, lookup_fn: F) -> Result<Self, ResolveError>
80    where
81        F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
82        Ipld: From<T>,
83    {
84        let inputs = resolve_args(self.0, lookup_fn).await;
85        for input in inputs.iter() {
86            if let Input::Deferred(awaiting) = input {
87                return Err(ResolveError::UnresolvedCid(
88                    awaiting.instruction_cid().to_string(),
89                ));
90            }
91        }
92        Ok(Args(inputs))
93    }
94}
95
96impl<T> From<Args<T>> for Ipld
97where
98    Ipld: From<T>,
99{
100    fn from(args: Args<T>) -> Self {
101        let args = args.0.into_iter().map(|v| v.into());
102        Ipld::List(args.collect())
103    }
104}
105
106impl<T> TryFrom<Ipld> for Args<T>
107where
108    task::Result<T>: TryFrom<Ipld>,
109{
110    type Error = Error<T>;
111
112    fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
113        if let Ipld::List(vec) = ipld {
114            let args = vec
115                .into_iter()
116                .fold(Vec::<Input<T>>::new(), |mut acc, ipld| {
117                    if let Ok(invocation_result) = task::Result::try_from(ipld.to_owned()) {
118                        acc.push(Input::Arg(invocation_result));
119                    } else if let Ok(await_result) = Await::try_from(ipld.to_owned()) {
120                        acc.push(Input::Deferred(await_result));
121                    } else {
122                        acc.push(Input::Ipld(ipld))
123                    }
124
125                    acc
126                });
127            Ok(Args(args))
128        } else {
129            Err(Error::not_an_ipld_list())
130        }
131    }
132}
133
134/// Contains parameters expected by the URI/[Ability] pair.
135///
136/// Left to the executor to define the shape of this data, per job.
137///
138/// [Ability]: super::Ability
139#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
140pub enum Input<T> {
141    /// Ipld Literals.
142    Ipld(Ipld),
143    /// Promise-[links] awaiting the output of another [Instruction]'s
144    /// invocation, directly.
145    ///
146    /// [links]: Pointer
147    /// [Instruction]: super::Instruction
148    Deferred(Await),
149    /// General argument, wrapping a [task::Result] over a task-specific
150    /// implementation's own input type(s).
151    Arg(task::Result<T>),
152}
153
154impl<T> Input<T> {
155    /// Resolve [awaited promise] of an [Input] into a task-specific
156    /// [Input::Arg], given a successful lookup function; otherwise, return
157    /// [Input::Deferred] for an unresolved promise, or just return
158    /// [Input::Ipld], [resolving Ipld links] if the lookup function expected
159    /// Ipld input data.
160    ///
161    /// [awaited promises]: Await
162    /// [inputs]: Input
163    /// [resolving Ipld links]: resolve_links
164    pub async fn resolve<'a, F>(self, lookup_fn: F) -> Input<T>
165    where
166        F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
167        Ipld: From<T>,
168    {
169        match self {
170            Input::Ipld(ipld) => {
171                if let Ok(await_promise) = Await::try_from(&ipld) {
172                    if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()).await {
173                        Input::Arg(func_ret)
174                    } else {
175                        Input::Deferred(await_promise)
176                    }
177                } else {
178                    Input::Ipld(resolve_links(ipld, lookup_fn.into()).await)
179                }
180            }
181            Input::Arg(ref _arg) => self,
182            Input::Deferred(await_promise) => {
183                if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()).await {
184                    Input::Arg(func_ret)
185                } else {
186                    Input::Deferred(await_promise)
187                }
188            }
189        }
190    }
191}
192
193impl<T> From<Input<T>> for Ipld
194where
195    Ipld: From<T>,
196{
197    fn from(input: Input<T>) -> Self {
198        match input {
199            Input::Ipld(ipld) => ipld,
200            Input::Deferred(promise) => Await::into(promise),
201            Input::Arg(arg) => arg.into(),
202        }
203    }
204}
205
206impl<T> From<Await> for Input<T> {
207    fn from(await_promise: Await) -> Self {
208        Input::Deferred(await_promise)
209    }
210}
211
212impl<T> TryFrom<Ipld> for Input<T>
213where
214    T: From<Ipld>,
215{
216    type Error = Error<String>;
217
218    fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
219        let Ok(map) = from_ipld::<BTreeMap<String, Ipld>>(ipld.to_owned()) else {
220            if let Ok(invocation_result) = ipld.to_owned().try_into() {
221                return Ok(Input::Arg(invocation_result));
222            } else {
223                return Ok(Input::Ipld(ipld));
224            }
225        };
226
227        map.get_key_value(OK_BRANCH)
228            .or_else(|| map.get_key_value(ERR_BRANCH))
229            .or_else(|| map.get_key_value(PTR_BRANCH))
230            .map_or(
231                if let Ok(invocation_result) = task::Result::try_from(ipld.to_owned()) {
232                    Ok(Input::Arg(invocation_result))
233                } else {
234                    Ok(Input::Ipld(ipld))
235                },
236                |(branch, ipld)| {
237                    let instruction = Pointer::try_from(ipld)?;
238                    Ok(Input::Deferred(Await::new(
239                        instruction,
240                        AwaitResult::result(branch)
241                            .ok_or_else(|| Error::InvalidDiscriminant(branch.to_string()))?,
242                    )))
243                },
244            )
245    }
246}
247
248async fn resolve_args<'a, T, F>(args: Vec<Input<T>>, lookup_fn: F) -> Vec<Input<T>>
249where
250    F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
251    Ipld: From<T>,
252{
253    let args = args.into_iter().map(|v| v.resolve(lookup_fn.clone()));
254    future::join_all(args).await.into_iter().collect()
255}
256
257/// Resolve [awaited promises] for *only* Ipld data, given a lookup function.
258///
259/// [awaited promises]: Await
260#[async_recursion]
261pub async fn resolve_links<'a, T, F>(ipld: Ipld, lookup_fn: Arc<F>) -> Ipld
262where
263    F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Sync + Send,
264    Ipld: From<T>,
265{
266    match ipld {
267        Ipld::Map(m) => {
268            let futures = m.into_iter().map(|(k, v)| async {
269                match v {
270                    Ipld::Link(cid) => {
271                        let mut f = Arc::clone(&lookup_fn);
272                        if let Ok(func_ret) = Arc::make_mut(&mut f)(cid).await {
273                            if k.eq(PTR_BRANCH) {
274                                (k, func_ret.into())
275                            } else {
276                                (k, func_ret.into_inner().into())
277                            }
278                        } else {
279                            (k, v)
280                        }
281                    }
282                    Ipld::Map(ref m) => {
283                        let resolved = resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()).await;
284                        (k, resolved)
285                    }
286                    Ipld::List(ref l) => {
287                        let resolved =
288                            resolve_links(Ipld::List(l.clone()), lookup_fn.clone()).await;
289                        (k, resolved)
290                    }
291                    _ => (k, v),
292                }
293            });
294            let resolved_results = future::join_all(futures).await;
295            Ipld::Map(
296                resolved_results
297                    .into_iter()
298                    .collect::<BTreeMap<String, Ipld>>(),
299            )
300        }
301        Ipld::List(l) => {
302            let futures = l.into_iter().map(|v| async {
303                match v {
304                    Ipld::Link(cid) => {
305                        let mut f = Arc::clone(&lookup_fn);
306                        if let Ok(func_ret) = Arc::make_mut(&mut f)(cid).await {
307                            func_ret.into_inner().into()
308                        } else {
309                            v
310                        }
311                    }
312                    Ipld::Map(ref m) => {
313                        resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()).await
314                    }
315                    Ipld::List(ref l) => {
316                        resolve_links(Ipld::List(l.clone()), lookup_fn.clone()).await
317                    }
318                    _ => v,
319                }
320            });
321            let resolved_results = future::join_all(futures).await;
322            Ipld::List(resolved_results)
323        }
324        Ipld::Link(link) => {
325            let mut f = Arc::clone(&lookup_fn);
326            if let Ok(func_ret) = Arc::make_mut(&mut f)(link).await {
327                func_ret.into_inner().into()
328            } else {
329                Ipld::Link(link)
330            }
331        }
332        _ => ipld,
333    }
334}
335
#[cfg(test)]
mod test {
    use super::*;
    use crate::{test_utils, Unit};

    // An `Input::Ipld` literal converts to the bare Ipld and back again.
    #[test]
    fn input_ipld_ipld_rountrip() {
        let literal = Ipld::List(vec![Ipld::Bool(true)]);
        let input: Input<Unit> = Input::Ipld(literal.clone());
        let ipld: Ipld = input.clone().into();

        assert_eq!(ipld, literal);
        assert_eq!(input, Input::<Unit>::try_from(ipld).unwrap());
    }

    // A deferred input is encoded as a single-entry map keyed by its branch.
    #[test]
    fn input_deferred_ipld_rountrip() {
        let ptr: Pointer = test_utils::instruction::<Unit>().try_into().unwrap();
        let input: Input<Unit> = Input::Deferred(Await::new(ptr.clone(), AwaitResult::Ptr));
        let ipld: Ipld = input.clone().into();

        let expected = Ipld::Map(BTreeMap::from([(PTR_BRANCH.into(), Ipld::Link(ptr.cid()))]));
        assert_eq!(ipld, expected);
        assert_eq!(input, Input::<Unit>::try_from(ipld).unwrap());
    }

    // An `Input::Arg` is encoded as a `["just", value]` list.
    #[test]
    fn input_arg_ipld_rountrip() {
        let input: Input<Ipld> = Input::Arg(task::Result::Just(Ipld::Bool(false)));
        let ipld: Ipld = input.clone().into();

        let expected = Ipld::List(vec![Ipld::String("just".into()), Ipld::Bool(false)]);
        assert_eq!(ipld, expected);
        assert_eq!(input, Input::<Ipld>::try_from(ipld).unwrap());
    }

    // `Args` converts to an Ipld list of its inputs and back again.
    #[test]
    fn args_ipld_rountrip() {
        let args = Args::new(vec![Input::<Unit>::Ipld(Ipld::Bool(true))]);
        let ipld: Ipld = args.clone().into();

        assert_eq!(ipld, Ipld::List(vec![Ipld::Bool(true)]));
        assert_eq!(args, Args::<Unit>::try_from(ipld).unwrap());
    }

    // Serde JSON round-trip of an Ipld literal input.
    #[test]
    fn ser_de_ipld() {
        let input: Input<Unit> = Input::Ipld(Ipld::Bool(true));
        let ser = serde_json::to_string(&input).unwrap();
        let de: Input<Unit> = serde_json::from_str(&ser).unwrap();

        assert_eq!(input, de);
    }

    // Serde JSON round-trip of an argument input.
    #[test]
    fn ser_de_arg_ipld() {
        let input: Input<Ipld> = Input::Arg(task::Result::Just(Ipld::Bool(false)));
        let ser = serde_json::to_string(&input).unwrap();
        let de: Input<Ipld> = serde_json::from_str(&ser).unwrap();

        assert_eq!(input, de);
    }
}