homestar_invocation/task/instruction/
input.rs1use crate::{
9 error::ResolveError,
10 pointer::{Await, AwaitResult, ERR_BRANCH, OK_BRANCH, PTR_BRANCH},
11 task, Error, Pointer,
12};
13use async_recursion::async_recursion;
14use futures::{future, future::BoxFuture};
15use libipld::{serde::from_ipld, Cid, Ipld};
16use serde::{Deserialize, Serialize};
17use std::{collections::btree_map::BTreeMap, sync::Arc};
18
19mod parse;
20pub use parse::{Parse, Parsed};
21
22#[derive(Clone, Debug, PartialEq)]
24pub struct Args<T>(Vec<Input<T>>);
25
26impl<T> Args<T>
27where
28 T: std::fmt::Debug,
29{
30 pub fn new(args: Vec<Input<T>>) -> Self {
32 Self(args)
33 }
34
35 pub fn into_inner(self) -> Vec<Input<T>> {
39 self.0
40 }
41
42 pub fn inner(&self) -> &Vec<Input<T>> {
46 &self.0
47 }
48
49 pub fn deferreds(&self) -> impl Iterator<Item = Cid> + '_ {
51 self.0.iter().filter_map(|input| {
52 if let Input::Deferred(awaited_promise) = input {
53 Some(awaited_promise.instruction_cid())
54 } else {
55 None
56 }
57 })
58 }
59
60 pub fn links(&self) -> impl Iterator<Item = Cid> + '_ {
62 self.0.iter().filter_map(|input| {
63 if let Input::Ipld(Ipld::Link(link)) = input {
64 Some(link.to_owned())
65 } else {
66 None
67 }
68 })
69 }
70
71 pub async fn resolve<'a, F>(self, lookup_fn: F) -> Result<Self, ResolveError>
80 where
81 F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
82 Ipld: From<T>,
83 {
84 let inputs = resolve_args(self.0, lookup_fn).await;
85 for input in inputs.iter() {
86 if let Input::Deferred(awaiting) = input {
87 return Err(ResolveError::UnresolvedCid(
88 awaiting.instruction_cid().to_string(),
89 ));
90 }
91 }
92 Ok(Args(inputs))
93 }
94}
95
96impl<T> From<Args<T>> for Ipld
97where
98 Ipld: From<T>,
99{
100 fn from(args: Args<T>) -> Self {
101 let args = args.0.into_iter().map(|v| v.into());
102 Ipld::List(args.collect())
103 }
104}
105
106impl<T> TryFrom<Ipld> for Args<T>
107where
108 task::Result<T>: TryFrom<Ipld>,
109{
110 type Error = Error<T>;
111
112 fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
113 if let Ipld::List(vec) = ipld {
114 let args = vec
115 .into_iter()
116 .fold(Vec::<Input<T>>::new(), |mut acc, ipld| {
117 if let Ok(invocation_result) = task::Result::try_from(ipld.to_owned()) {
118 acc.push(Input::Arg(invocation_result));
119 } else if let Ok(await_result) = Await::try_from(ipld.to_owned()) {
120 acc.push(Input::Deferred(await_result));
121 } else {
122 acc.push(Input::Ipld(ipld))
123 }
124
125 acc
126 });
127 Ok(Args(args))
128 } else {
129 Err(Error::not_an_ipld_list())
130 }
131 }
132}
133
134#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
140pub enum Input<T> {
141 Ipld(Ipld),
143 Deferred(Await),
149 Arg(task::Result<T>),
152}
153
154impl<T> Input<T> {
155 pub async fn resolve<'a, F>(self, lookup_fn: F) -> Input<T>
165 where
166 F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
167 Ipld: From<T>,
168 {
169 match self {
170 Input::Ipld(ipld) => {
171 if let Ok(await_promise) = Await::try_from(&ipld) {
172 if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()).await {
173 Input::Arg(func_ret)
174 } else {
175 Input::Deferred(await_promise)
176 }
177 } else {
178 Input::Ipld(resolve_links(ipld, lookup_fn.into()).await)
179 }
180 }
181 Input::Arg(ref _arg) => self,
182 Input::Deferred(await_promise) => {
183 if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()).await {
184 Input::Arg(func_ret)
185 } else {
186 Input::Deferred(await_promise)
187 }
188 }
189 }
190 }
191}
192
193impl<T> From<Input<T>> for Ipld
194where
195 Ipld: From<T>,
196{
197 fn from(input: Input<T>) -> Self {
198 match input {
199 Input::Ipld(ipld) => ipld,
200 Input::Deferred(promise) => Await::into(promise),
201 Input::Arg(arg) => arg.into(),
202 }
203 }
204}
205
206impl<T> From<Await> for Input<T> {
207 fn from(await_promise: Await) -> Self {
208 Input::Deferred(await_promise)
209 }
210}
211
212impl<T> TryFrom<Ipld> for Input<T>
213where
214 T: From<Ipld>,
215{
216 type Error = Error<String>;
217
218 fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
219 let Ok(map) = from_ipld::<BTreeMap<String, Ipld>>(ipld.to_owned()) else {
220 if let Ok(invocation_result) = ipld.to_owned().try_into() {
221 return Ok(Input::Arg(invocation_result));
222 } else {
223 return Ok(Input::Ipld(ipld));
224 }
225 };
226
227 map.get_key_value(OK_BRANCH)
228 .or_else(|| map.get_key_value(ERR_BRANCH))
229 .or_else(|| map.get_key_value(PTR_BRANCH))
230 .map_or(
231 if let Ok(invocation_result) = task::Result::try_from(ipld.to_owned()) {
232 Ok(Input::Arg(invocation_result))
233 } else {
234 Ok(Input::Ipld(ipld))
235 },
236 |(branch, ipld)| {
237 let instruction = Pointer::try_from(ipld)?;
238 Ok(Input::Deferred(Await::new(
239 instruction,
240 AwaitResult::result(branch)
241 .ok_or_else(|| Error::InvalidDiscriminant(branch.to_string()))?,
242 )))
243 },
244 )
245 }
246}
247
248async fn resolve_args<'a, T, F>(args: Vec<Input<T>>, lookup_fn: F) -> Vec<Input<T>>
249where
250 F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
251 Ipld: From<T>,
252{
253 let args = args.into_iter().map(|v| v.resolve(lookup_fn.clone()));
254 future::join_all(args).await.into_iter().collect()
255}
256
257#[async_recursion]
261pub async fn resolve_links<'a, T, F>(ipld: Ipld, lookup_fn: Arc<F>) -> Ipld
262where
263 F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Sync + Send,
264 Ipld: From<T>,
265{
266 match ipld {
267 Ipld::Map(m) => {
268 let futures = m.into_iter().map(|(k, v)| async {
269 match v {
270 Ipld::Link(cid) => {
271 let mut f = Arc::clone(&lookup_fn);
272 if let Ok(func_ret) = Arc::make_mut(&mut f)(cid).await {
273 if k.eq(PTR_BRANCH) {
274 (k, func_ret.into())
275 } else {
276 (k, func_ret.into_inner().into())
277 }
278 } else {
279 (k, v)
280 }
281 }
282 Ipld::Map(ref m) => {
283 let resolved = resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()).await;
284 (k, resolved)
285 }
286 Ipld::List(ref l) => {
287 let resolved =
288 resolve_links(Ipld::List(l.clone()), lookup_fn.clone()).await;
289 (k, resolved)
290 }
291 _ => (k, v),
292 }
293 });
294 let resolved_results = future::join_all(futures).await;
295 Ipld::Map(
296 resolved_results
297 .into_iter()
298 .collect::<BTreeMap<String, Ipld>>(),
299 )
300 }
301 Ipld::List(l) => {
302 let futures = l.into_iter().map(|v| async {
303 match v {
304 Ipld::Link(cid) => {
305 let mut f = Arc::clone(&lookup_fn);
306 if let Ok(func_ret) = Arc::make_mut(&mut f)(cid).await {
307 func_ret.into_inner().into()
308 } else {
309 v
310 }
311 }
312 Ipld::Map(ref m) => {
313 resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()).await
314 }
315 Ipld::List(ref l) => {
316 resolve_links(Ipld::List(l.clone()), lookup_fn.clone()).await
317 }
318 _ => v,
319 }
320 });
321 let resolved_results = future::join_all(futures).await;
322 Ipld::List(resolved_results)
323 }
324 Ipld::Link(link) => {
325 let mut f = Arc::clone(&lookup_fn);
326 if let Ok(func_ret) = Arc::make_mut(&mut f)(link).await {
327 func_ret.into_inner().into()
328 } else {
329 Ipld::Link(link)
330 }
331 }
332 _ => ipld,
333 }
334}
335
#[cfg(test)]
mod test {
    use super::*;
    use crate::{test_utils, Unit};

    /// A plain `Ipld` input survives a round trip through `Ipld`.
    #[test]
    fn input_ipld_ipld_roundtrip() {
        let input: Input<Unit> = Input::Ipld(Ipld::List(vec![Ipld::Bool(true)]));
        let ipld = Ipld::from(input.clone());

        assert_eq!(ipld, Ipld::List(vec![Ipld::Bool(true)]));
        assert_eq!(input, ipld.try_into().unwrap());
    }

    /// A deferred input serializes to a single-entry map keyed by
    /// `PTR_BRANCH` and parses back to the same promise.
    #[test]
    fn input_deferred_ipld_roundtrip() {
        let instruction = test_utils::instruction::<Unit>();
        let ptr: Pointer = instruction.try_into().unwrap();
        let input: Input<Unit> = Input::Deferred(Await::new(ptr.clone(), AwaitResult::Ptr));
        let ipld = Ipld::from(input.clone());

        assert_eq!(
            ipld,
            Ipld::Map(BTreeMap::from([(PTR_BRANCH.into(), Ipld::Link(ptr.cid()))]))
        );
        assert_eq!(input, ipld.try_into().unwrap());
    }

    /// A resolved argument serializes to a `["just", value]` list and parses
    /// back to the same result.
    #[test]
    fn input_arg_ipld_roundtrip() {
        let input: Input<Ipld> = Input::Arg(task::Result::Just(Ipld::Bool(false)));
        let ipld = Ipld::from(input.clone());

        assert_eq!(
            ipld,
            Ipld::List(vec![Ipld::String("just".into()), Ipld::Bool(false)])
        );
        assert_eq!(input, ipld.try_into().unwrap());
    }

    /// An argument list survives a round trip through `Ipld`.
    #[test]
    fn args_ipld_roundtrip() {
        let input: Input<Unit> = Input::Ipld(Ipld::Bool(true));
        let args = Args::new(vec![input]);
        let ipld = Ipld::from(args.clone());

        assert_eq!(ipld, Ipld::List(vec![Ipld::Bool(true)]));
        assert_eq!(args, ipld.try_into().unwrap());
    }

    /// A plain `Ipld` input survives a serde JSON round trip.
    #[test]
    fn ser_de_ipld() {
        let input: Input<Unit> = Input::Ipld(Ipld::Bool(true));
        let ser = serde_json::to_string(&input).unwrap();
        let de: Input<Unit> = serde_json::from_str(&ser).unwrap();

        assert_eq!(input, de);
    }

    /// A resolved argument survives a serde JSON round trip.
    #[test]
    fn ser_de_arg_ipld() {
        let input: Input<Ipld> = Input::Arg(task::Result::Just(Ipld::Bool(false)));
        let ser = serde_json::to_string(&input).unwrap();
        let de: Input<Ipld> = serde_json::from_str(&ser).unwrap();

        assert_eq!(input, de);
    }
}