Skip to main content

tierkreis_runtime/
lib.rs

1//! tierkreis-runtime implements a tokio-based runtime system for Tierkreis graphs
2//! (i.e., an asynchronous + parallel interpreter).
3use anyhow::anyhow;
4use operations::graph::checkpoint_client::CheckpointClient;
5
6use operations::{RuntimeOperation, TaskHandle};
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use thiserror::Error;
10use tierkreis_core::{
11    graph::TypeScheme,
12    namespace::{FunctionDeclaration, NamespaceItem, Signature, SignatureError},
13    symbol::{Label, Location, LocationName},
14    type_checker::{GraphWithInputs, TypeErrors, Typeable},
15};
16use tierkreis_core::{
17    graph::{Graph, Node, Value},
18    prelude::TryInto,
19    symbol::FunctionName,
20};
21use tierkreis_proto::messages::{Callback, GraphTrace, InferTypeResponse};
22use tierkreis_proto::protos_gen::v1alpha1::signature as ps;
23use workers::{EscapeHatch, LocalWorker, Worker};
24
25#[cfg(feature = "config")]
26use serde::Deserialize;
27
28pub mod operations;
29pub(crate) mod util;
30pub mod workers;
31
/// Builder for [`Runtime`].
///
/// Collects workers, local functions and the runtime type-checking level, and
/// is turned into a [`Runtime`] by [`RuntimeBuilder::start`].
pub struct RuntimeBuilder {
    /// Child workers, keyed by the location name they were registered under.
    workers: HashMap<LocationName, Box<dyn Worker>>,
    /// Functions that are run locally (at [`Location::local`]).
    local_functions: LocalWorker,
    /// Signature merged from the local functions and every registered worker.
    signature: Signature,
    /// Runtime type-checking level passed on to the started [`Runtime`].
    runtime_type_checking: RuntimeTypeChecking,
}
39
/// Error produced by [`RuntimeBuilder::with_worker`] when a worker has already
/// been registered under the requested location name.
#[derive(Error, Debug)]
#[error("Location {0} was defined twice")]
struct LocationConflict(LocationName);
43
44impl RuntimeBuilder {
45    /// Creates a new RuntimeBuilder with a blank initial configuration:
46    /// no workers, only the builtin functions, and [RuntimeTypeChecking::OnlyExternal]
47    pub fn new() -> Self {
48        let mut res = RuntimeBuilder {
49            workers: HashMap::new(),
50            local_functions: LocalWorker::new(),
51            signature: Signature::default(),
52            runtime_type_checking: RuntimeTypeChecking::OnlyExternal,
53        }
54        .with_local_functions(LocalWorker::builtins())
55        .unwrap();
56        res.signature.scopes.insert(Location::local());
57        res
58    }
59
60    /// Add a worker at a specified child location, which must be new in the config.
61    pub async fn with_worker<W>(mut self, worker: W, loc: LocationName) -> anyhow::Result<Self>
62    where
63        W: Worker + 'static,
64    {
65        if self.workers.contains_key(&loc) {
66            return Err(LocationConflict(loc).into());
67        }
68
69        let sig: Signature = TryInto::try_into(worker.signature(Location::local()).await?)?;
70
71        self.signature.merge_signature(sig.in_location(loc))?;
72
73        self.workers.insert(loc, Box::new(worker));
74        Ok(self)
75    }
76
77    /// Add new [LocalWorker] functions at [Location::local].
78    ///
79    /// # Errors
80    ///
81    /// `SignatureError` if any of the new functions have signatures that conflict
82    /// with already-added functions of the same name.
83    pub fn with_local_functions(
84        mut self,
85        local_functions: LocalWorker,
86    ) -> Result<Self, SignatureError> {
87        self.signature.merge_signature(Signature {
88            root: local_functions.declarations().map(|x| NamespaceItem {
89                decl: x,
90                locations: vec![Location::local()],
91            }),
92            aliases: HashMap::new(),
93            scopes: HashSet::new(),
94        })?;
95        self.local_functions.merge(local_functions)?;
96        Ok(self)
97    }
98
99    /// Changes what level of runtime type-checking the [Runtime] will perform
100    /// (overwrites previous setting)
101    pub fn with_checking(mut self, runtime_type_checking: RuntimeTypeChecking) -> Self {
102        self.runtime_type_checking = runtime_type_checking;
103        self
104    }
105
106    /// Constructs a Runtime using the configuration set in this RuntimeBuilder
107    pub fn start(self) -> Runtime {
108        Runtime {
109            workers: Arc::new(self.workers),
110            local_functions: Arc::new(self.local_functions),
111            signature: Arc::new(self.signature),
112            runtime_type_checking: self.runtime_type_checking,
113        }
114    }
115}
116
117impl Default for RuntimeBuilder {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
/// Levels of type checking that can be performed by a [Runtime]
/// during graph execution. (Distinct from the `type_check: bool` parameter
/// to e.g. [start_graph](crate::Runtime::start_graph) which relates to
/// type-checking *before* graph execution begins.)
///
/// The default level is [Self::OnlyExternal].
#[derive(Copy, Clone, Debug, Default)]
#[cfg_attr(feature = "config", derive(Deserialize))]
#[cfg_attr(feature = "config", serde(rename_all = "snake_case"))]
pub enum RuntimeTypeChecking {
    /// No type checking at runtime. If functions produce outputs that do not
    /// match their [outputs](FunctionDeclaration), these values will nonetheless
    /// be placed onto the graph edges and passed as inputs to the edge targets.
    Disabled,
    /// Values from external workers (either running functions, or graph workers
    /// running Boxes at non-local/strict-descendant [Location]s) are checked as
    /// [Self::Enabled]; values from local functions and boxes are not checked
    /// (as [Self::Disabled]).
    #[default]
    OnlyExternal,
    /// Type-check that every value produced at runtime meets the expected/declared
    /// type; any that does not will abort execution with a
    /// `GraphError::UnexpectedOutputType` error.
    // Note that error is private (it gets anyhow!'d), hence can't link to it
    Enabled,
}
147
148impl RuntimeTypeChecking {
149    fn should_type_check(&self, source_node: &Node) -> bool {
150        match self {
151            RuntimeTypeChecking::Disabled => false,
152            RuntimeTypeChecking::OnlyExternal => source_node.is_external(),
153            RuntimeTypeChecking::Enabled => true,
154        }
155    }
156}
157
/// Runtime system that manages all resources necessary to run graphs.
///
/// The runtime system manages a collection of [`Worker`]s and routes requests to run nodes to the
/// appropriate workers depending on which functions they support.
///
/// This struct can be efficiently cloned to be passed around and cleans up its
/// resources after the last copy has been dropped.
#[derive(Clone)]
pub struct Runtime {
    /// Child workers keyed by location name; shared between clones.
    workers: Arc<HashMap<LocationName, Box<dyn Worker>>>,
    /// Functions that are run locally; shared between clones.
    local_functions: Arc<LocalWorker>,
    /// Signature of every function known to this runtime; shared between clones.
    signature: Arc<Signature>,
    /// Level of type checking applied to values produced during execution.
    runtime_type_checking: RuntimeTypeChecking,
}
172
173impl std::fmt::Debug for Runtime {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("Runtime").finish()
176    }
177}
178
179impl Runtime {
180    /// Create a new `RuntimeBuilder`.
181    #[must_use]
182    pub fn builder() -> RuntimeBuilder {
183        RuntimeBuilder::new()
184    }
185
186    /// Returns the signatures of all functions available to the runtime.
187    pub fn function_declarations(&self) -> impl Iterator<Item = &FunctionDeclaration> {
188        self.signature.root.values().map(|entry| &entry.decl)
189    }
190
191    /// Returns the signature for a function.
192    #[must_use]
193    pub fn function_declaration(&self, name: FunctionName) -> Option<&FunctionDeclaration> {
194        self.signature.root.get(&name).map(|entry| &entry.decl)
195    }
196
197    /// Lists all the functions known to this runtime; equivalent to [Self::signature_in_loc]
198    /// with [Location::local]
199    pub fn signature(&self) -> Signature {
200        self.signature.as_ref().clone()
201    }
202
203    /// Lists the functions known in a specified location (and sub-locations thereof).
204    pub async fn signature_in_loc(
205        &self,
206        loc: Location,
207    ) -> anyhow::Result<ps::ListFunctionsResponse> {
208        match loc.pop() {
209            Some((ln, rest)) => match self.workers.get(&ln) {
210                Some(x) => Ok(x.signature(rest).await?),
211                None => Err(anyhow!("Location {} not found", ln)),
212            },
213            None => Ok(self.signature().into()),
214        }
215    }
216
217    /// Infer the type of a graph or value using the signature provided by the runtime's workers.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error in case the type inference fails.
222    pub fn infer_type<T>(&self, to_check: &T) -> Result<(TypeScheme, T::Annotated), TypeErrors>
223    where
224        T: Typeable,
225    {
226        to_check.infer_type(&self.signature().type_schemes())
227    }
228
229    /// Infers the type that a [Value] would have if it were seen in a
230    /// particular location, i.e. a descendant [RuntimeWorker] (able to
231    /// run Graphs).
232    ///
233    /// # Errors
234    /// * If `location` does not identify a worker able to type-check graphs
235    /// * Any type error in `to_check`, for example, using functions not
236    ///   available at `location`.
237    ///
238    /// [RuntimeWorker]: workers::RuntimeWorker
239    pub async fn infer_type_in_loc(
240        &self,
241        to_check: Value,
242        location: Location,
243    ) -> anyhow::Result<ps::InferTypeResponse> {
244        match location.pop() {
245            Some((ln, rest)) => {
246                self.workers
247                    .get(&ln)
248                    .ok_or_else(|| anyhow!("Location {} not found", ln))?
249                    .to_runtime_worker()
250                    .ok_or_else(|| anyhow!("Location {} is not a runtime worker", ln))?
251                    .infer_type(to_check, rest)
252                    .await
253            }
254            None => {
255                let resp: InferTypeResponse = self.infer_type(&to_check).into();
256                Ok(resp.into())
257            }
258        }
259    }
260
261    /// Run a graph to completion with a given collection of inputs,
262    /// at a non-local Location.
263    pub async fn execute_graph_remote(
264        &self,
265        graph: Graph,
266        inputs: HashMap<Label, Value>,
267        type_check: bool,
268        remote_loc: (LocationName, Location),
269        escape: Callback, // back to this server, at least
270    ) -> Result<HashMap<Label, Value>, RunGraphError> {
271        let (ln, rest) = remote_loc;
272        match self
273            .workers
274            .get(&ln)
275            .ok_or_else(|| {
276                RunGraphError::RuntimeError(Arc::new(anyhow!("Location {} not found", ln)))
277            })?
278            .to_runtime_worker()
279        {
280            Some(x) => {
281                x.execute_graph(graph, inputs, rest, type_check, Some(escape))
282                    .await
283            }
284            None => Err(RunGraphError::RuntimeError(Arc::new(anyhow!(
285                "Location {} is not a runtime worker",
286                ln
287            )))),
288        }
289    }
290
291    /// Run a graph to completion with a given collection of inputs, locally,
292    /// optionally after type-checking.
293    ///
294    /// `callback` should be a connection-point (for child workers) to connect to this runtime.
295    ///
296    /// `escape` may be a forwarding chain to the root of a chain of servers,
297    ///    but note that if `type_check` is true, then we will never need to escape
298    ///    above this server (as successful type-checking guarantees the graph uses
299    ///    only functions known to this server).
300    #[allow(clippy::missing_panics_doc)]
301    pub async fn execute_graph_cb(
302        &self,
303        graph: Graph,
304        inputs: HashMap<Label, Value>,
305        type_check: bool,
306        callback: Callback,
307        escape: EscapeHatch,
308    ) -> Result<HashMap<Label, Value>, RunGraphError> {
309        Ok(self
310            .start_graph(graph, inputs, type_check, callback, escape, None)?
311            .complete()
312            .await?
313            .as_ref()
314            .clone())
315    }
316
317    /// Starts running a graph (locally, on this Runtime),
318    /// optionally type-checking the graph and inputs first.
319    ///
320    /// If type-checking succeeds (or is not requested), returns a handle to
321    /// the graph-executing process which will continue in the background.
322    ///
323    /// See [execute_graph_cb](Self::execute_graph_cb) for more detail on `callback` + `escape`
324    pub fn start_graph(
325        &self,
326        graph: Graph,
327        inputs: HashMap<Label, Value>,
328        type_check: bool,
329        callback: Callback,
330        escape: EscapeHatch,
331        checkpoint_client: Option<CheckpointClient>,
332    ) -> Result<TaskHandle, TypeErrors> {
333        let (checked_graph, checked_inputs) = if type_check {
334            let (_, gwi) = GraphWithInputs { graph, inputs }
335                .infer_type(&self.signature.as_ref().clone().type_schemes())?;
336            (gwi.graph, gwi.inputs)
337        } else {
338            (graph, inputs)
339        };
340
341        // ALAN: TODO: pass thru run_graph etc...but is this enough if we don't use scoped execution?
342        let stack_trace = GraphTrace::Root;
343
344        let handle = RuntimeOperation::new_graph(checked_graph)
345            .run_simple(
346                self.clone(),
347                callback,
348                escape,
349                checked_inputs,
350                stack_trace,
351                checkpoint_client,
352            )
353            .into_task();
354
355        Ok(handle)
356    }
357
358    /// Runs a function at a specific location (local or otherwise).
359    /// If the location is non-local, will definitely return Some RuntimeOperation
360    /// (one that fails when run if the indicated remote can't run the function).
361    /// If the location is local, may return None if the function name is not known.
362    #[must_use]
363    pub fn run_function_with_loc(
364        &self,
365        function: &FunctionName,
366        loc: &Location,
367    ) -> Option<RuntimeOperation> {
368        let maybe_remote = loc.clone().pop().or_else(|| {
369            self.signature
370                .root
371                .get(function)
372                // Choosing the first location here is arbitrary and might be optimised
373                .and_then(|entry| entry.locations.first())
374                .and_then(|l| l.clone().pop())
375        });
376        match maybe_remote {
377            Some((ln, rest)) => Some(
378                // We could check in the signature that the named Location is able
379                // to execute the named function, but for (suitably) well-behaved
380                // clients  and type-checked graphs, we shouldn't hit such a case.
381                // A more definitive verdict requires contacting the worker in question,
382                // which we do not have time for until the RuntimeOperation executes,
383                // so execution may fail if the function name isn't recognized.
384                self.workers.get(&ln)?.spawn(function, &rest),
385            ),
386            None => self.local_functions.spawn(function),
387        }
388    }
389
390    /// Runs a graph in a runtime that is a strict descendant of this one.
391    /// (A non-local [Location] with at least one [LocationName].)
392    #[must_use]
393    pub fn run_graph_remote(
394        &self,
395        graph: Graph,
396        loc: (LocationName, Location),
397    ) -> Option<RuntimeOperation> {
398        let (ln, rest) = loc;
399        Some(
400            self.workers
401                .get(&ln)?
402                .to_runtime_worker()?
403                .spawn_graph(graph, &rest),
404        )
405    }
406}
407
408/// Errors for [`Runtime::execute_graph_cb`].
409#[derive(Debug, Clone, Error)]
410pub enum RunGraphError {
411    /// The graph did not pass type-checking
412    #[error("graph failed to type check")]
413    TypeError(
414        #[source]
415        #[from]
416        TypeErrors,
417    ),
418    /// A miscellaneous error during graph execution
419    #[error("error while running a graph")]
420    RuntimeError(
421        #[source]
422        #[from]
423        Arc<anyhow::Error>,
424    ),
425}
426
427impl From<anyhow::Error> for RunGraphError {
428    fn from(e: anyhow::Error) -> Self {
429        RunGraphError::RuntimeError(Arc::new(e))
430    }
431}
432
433#[cfg(test)]
434pub(crate) mod tests {
435    use crate::EscapeHatch;
436    use anyhow::bail;
437    use rstest::{fixture, rstest};
438    use std::{collections::HashMap, time::Duration};
439    use tierkreis_core::{
440        graph::{Graph, GraphBuilder, GraphBuilderError, GraphType, Node, Type, TypeScheme, Value},
441        namespace::FunctionDeclaration,
442        prelude::TryInto,
443        symbol::{FunctionName, Label, Location, LocationName},
444    };
445    use tierkreis_proto::messages::{Callback, GraphTrace};
446
447    use crate::{
448        operations::{OperationContext, RuntimeOperation},
449        workers::{AuthInjector, ClientInterceptor, ExternalWorker, LocalWorker},
450        LocationConflict, Runtime,
451    };
452    pub(super) fn fake_interceptor() -> ClientInterceptor {
453        ClientInterceptor::new(AuthInjector::NoAuth)
454    }
455    pub(super) fn fake_callback() -> Callback {
456        Callback {
457            uri: "https://localhost:8020".parse().unwrap(),
458            loc: Location::local(),
459        }
460    }
461    pub(super) fn fake_escape() -> EscapeHatch {
462        EscapeHatch::this_runtime(fake_callback())
463    }
464    pub(super) fn py_loc() -> LocationName {
465        TryInto::try_into("python").unwrap()
466    }
467    fn python_path() -> String {
468        format!(
469            "{}/../python/tests/test_worker/main.py",
470            env!("CARGO_MANIFEST_DIR")
471        )
472    }
473
474    #[tokio::test]
475    async fn empty_graph() -> anyhow::Result<()> {
476        let graph = GraphBuilder::new().build()?;
477        let runtime = Runtime::builder().start();
478        let outputs = runtime
479            .execute_graph_cb(graph, HashMap::new(), true, fake_callback(), fake_escape())
480            .await?;
481        assert!(outputs.is_empty());
482        Ok(())
483    }
484
485    #[tokio::test]
486    async fn direct_identity() -> anyhow::Result<()> {
487        let graph = {
488            let mut builder = GraphBuilder::new();
489            let [input, output] = Graph::boundary();
490
491            builder.add_edge((input, "value"), (output, "value"), None)?;
492            builder.build()?
493        };
494
495        let runtime = Runtime::builder().start();
496        let mut inputs = HashMap::new();
497        inputs.insert(Label::value(), Value::Int(42));
498        let outputs = runtime
499            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
500            .await?;
501        assert_eq!(outputs[&Label::value()], Value::Int(42));
502        Ok(())
503    }
504
505    #[tokio::test]
506    async fn const_node() -> anyhow::Result<()> {
507        let graph = {
508            let mut builder = GraphBuilder::new();
509            let [_, output] = Graph::boundary();
510
511            let constant = builder.add_node(Value::Int(3))?;
512            builder.add_edge((constant, "value"), (output, "value"), None)?;
513            builder.build()?
514        };
515
516        let runtime = Runtime::builder().start();
517        let outputs = runtime
518            .execute_graph_cb(graph, HashMap::new(), true, fake_callback(), fake_escape())
519            .await?;
520        assert_eq!(outputs[&Label::value()], Value::Int(3));
521        Ok(())
522    }
523
524    #[tokio::test]
525    async fn boxed_const_node() -> anyhow::Result<()> {
526        let subgraph = {
527            let mut builder = GraphBuilder::new();
528            let [_, output] = Graph::boundary();
529
530            let const_ = builder.add_node(Value::Int(3))?;
531            builder.add_edge((const_, "value"), (output, "value"), None)?;
532            builder.build()?
533        };
534
535        let graph = {
536            let mut builder = GraphBuilder::new();
537            let [_, output] = Graph::boundary();
538
539            let subgraph = builder.add_node(subgraph)?;
540            builder.add_edge((subgraph, "value"), (output, "value"), None)?;
541            builder.build()?
542        };
543
544        let runtime = Runtime::builder().start();
545        let outputs = runtime
546            .execute_graph_cb(graph, HashMap::new(), true, fake_callback(), fake_escape())
547            .await
548            .unwrap();
549        assert!(matches!(outputs[&Label::value()], Value::Int(3)));
550        Ok(())
551    }
552
553    #[tokio::test]
554    async fn id_node() -> anyhow::Result<()> {
555        let graph = {
556            let mut builder = GraphBuilder::new();
557            let [input, output] = Graph::boundary();
558
559            let id = builder.add_node("id")?;
560            builder.add_edge((input, "id_in"), (id, "value"), None)?;
561            builder.add_edge((id, "value"), (output, "id_out"), None)?;
562            builder.build()?
563        };
564
565        let mut inputs = HashMap::new();
566        inputs.insert(TryInto::try_into("id_in")?, Value::Int(3));
567
568        let runtime = Runtime::builder().start();
569        let outputs = runtime
570            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
571            .await?;
572
573        assert_eq!(outputs[&TryInto::try_into("id_out")?], Value::Int(3));
574
575        Ok(())
576    }
577
578    #[tokio::test]
579    async fn id_node_pair() -> anyhow::Result<()> {
580        let graph = {
581            let mut builder = GraphBuilder::new();
582            let [input, output] = Graph::boundary();
583
584            let in_ = builder.add_node("id")?;
585            let out = builder.add_node("id")?;
586            builder.add_edge((input, "in"), (in_, "value"), None)?;
587            builder.add_edge((in_, "value"), (out, "value"), None)?;
588            builder.add_edge((out, "value"), (output, "out"), None)?;
589            builder.build()?
590        };
591
592        let mut inputs = HashMap::new();
593        inputs.insert(TryInto::try_into("in")?, Value::Int(3));
594
595        let runtime = Runtime::builder().start();
596        let outputs = runtime
597            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
598            .await?;
599        assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(3));
600        Ok(())
601    }
602
603    #[tokio::test]
604    async fn id_py_node() -> anyhow::Result<()> {
605        let graph = {
606            let mut builder = GraphBuilder::new();
607            let [input, output] = Graph::boundary();
608
609            let id_py = builder.add_node("python_nodes::id_py")?;
610            builder.add_edge((input, "id_in"), (id_py, "value"), None)?;
611            builder.add_edge((id_py, "value"), (output, "id_out"), None)?;
612            builder.build()?
613        };
614
615        let python = ExternalWorker::new_spawn(python_path(), fake_interceptor()).await?;
616        let runtime = Runtime::builder()
617            .with_worker(python, py_loc())
618            .await?
619            .start();
620
621        let mut inputs = HashMap::new();
622        inputs.insert(TryInto::try_into("id_in")?, Value::Int(3));
623
624        let outputs = runtime
625            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
626            .await?;
627        assert_eq!(outputs[&TryInto::try_into("id_out")?], Value::Int(3));
628        Ok(())
629    }
630
631    fn add_entry() -> anyhow::Result<FunctionDeclaration> {
632        let lhs: Label = TryInto::try_into("lhs")?;
633        let rhs: Label = TryInto::try_into("rhs")?;
634        let out: Label = TryInto::try_into("out")?;
635
636        let type_scheme = TypeScheme::from({
637            let mut t = GraphType::new();
638            t.add_input(lhs, Type::Int);
639            t.add_input(rhs, Type::Int);
640            t.add_output(out, Type::Int);
641            t
642        });
643
644        Ok(FunctionDeclaration {
645            type_scheme,
646            description: "Add two numbers".to_string(),
647            input_order: vec![lhs, rhs],
648            output_order: vec![out],
649        })
650    }
651
652    async fn add_function(
653        mut inputs: HashMap<Label, Value>,
654        _context: OperationContext,
655    ) -> anyhow::Result<HashMap<Label, Value>> {
656        let lhs = match inputs.remove(&TryInto::try_into("lhs")?) {
657            Some(Value::Int(lhs)) => lhs,
658            _ => anyhow::bail!("Missing or invalid lhs input."),
659        };
660
661        let rhs = match inputs.remove(&TryInto::try_into("rhs")?) {
662            Some(Value::Int(lhs)) => lhs,
663            _ => anyhow::bail!("Missing or invalid rhs input."),
664        };
665
666        let mut output = HashMap::new();
667        output.insert(TryInto::try_into("out")?, Value::Int(lhs + rhs));
668        Ok(output)
669    }
670
671    #[tokio::test]
672    async fn add() -> anyhow::Result<()> {
673        let graph = {
674            let mut builder = GraphBuilder::new();
675            let [input, output] = Graph::boundary();
676
677            let add = builder.add_node("add")?;
678            builder.add_edge((input, "lhs"), (add, "lhs"), None)?;
679            builder.add_edge((input, "rhs"), (add, "rhs"), None)?;
680            builder.add_edge((add, "out"), (output, "out"), None)?;
681            builder.build()?
682        };
683
684        let mut inputs = HashMap::new();
685        inputs.insert(TryInto::try_into("lhs")?, Value::Int(3));
686        inputs.insert(TryInto::try_into("rhs")?, Value::Int(1));
687
688        let mut worker = LocalWorker::new();
689        let add = add_entry()?;
690        worker.add_function(
691            FunctionName::builtin(TryInto::try_into("add")?),
692            add,
693            || RuntimeOperation::new_fn_async(add_function),
694        );
695
696        let runtime = Runtime::builder().with_local_functions(worker)?.start();
697        let outputs = runtime
698            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
699            .await?;
700        assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(4));
701        Ok(())
702    }
703
704    #[tokio::test]
705    async fn eval_add() -> anyhow::Result<()> {
706        let [input, output] = Graph::boundary();
707        let add_graph = {
708            let mut builder = GraphBuilder::new();
709
710            let add = builder.add_node("add")?;
711            let id = builder.add_node("id")?;
712            builder.add_edge((input, "id_in"), (id, "value"), None)?;
713            builder.add_edge((input, "lhs"), (add, "lhs"), None)?;
714            builder.add_edge((input, "rhs"), (add, "rhs"), None)?;
715            builder.add_edge((id, "value"), (output, "id_out"), None)?;
716            builder.add_edge((add, "out"), (output, "out"), None)?;
717            builder.build()?
718        };
719
720        let eval_graph = {
721            let mut builder = GraphBuilder::new();
722            let add_thunk_const = builder.add_node(Value::Graph(add_graph))?;
723            let eval_add = builder.add_node("eval")?;
724            builder.add_edge((add_thunk_const, "value"), (eval_add, "thunk"), None)?;
725            builder.add_edge((input, "x"), (eval_add, "lhs"), None)?;
726            builder.add_edge((input, "y"), (eval_add, "rhs"), None)?;
727            builder.add_edge((input, "z"), (eval_add, "id_in"), None)?;
728            builder.add_edge((eval_add, "out"), (output, "add_out"), None)?;
729            builder.add_edge((eval_add, "id_out"), (output, "id_out"), None)?;
730            builder.build()?
731        };
732
733        let mut inputs = HashMap::new();
734        inputs.insert(TryInto::try_into("x")?, Value::Int(5));
735        inputs.insert(TryInto::try_into("y")?, Value::Int(7));
736        inputs.insert(TryInto::try_into("z")?, Value::Bool(true));
737
738        let mut worker = LocalWorker::new();
739        let add = add_entry()?;
740        worker.add_function(
741            FunctionName::builtin(TryInto::try_into("add")?),
742            add,
743            || RuntimeOperation::new_fn_async(add_function),
744        );
745
746        let runtime = Runtime::builder().with_local_functions(worker)?.start();
747        let outputs = runtime
748            .execute_graph_cb(eval_graph, inputs, true, fake_callback(), fake_escape())
749            .await?;
750
751        assert_eq!(outputs[&TryInto::try_into("add_out")?], Value::Int(12));
752        assert_eq!(outputs[&TryInto::try_into("id_out")?], Value::Bool(true));
753
754        Ok(())
755    }
756
757    #[tokio::test]
758    async fn test_pair() -> anyhow::Result<()> {
759        let graph = {
760            let mut builder = GraphBuilder::new();
761            let [input, output] = Graph::boundary();
762
763            let make_pair = builder.add_node("make_pair")?;
764            builder.add_edge((input, "one"), (make_pair, "first"), None)?;
765            builder.add_edge((input, "two"), (make_pair, "second"), None)?;
766            builder.add_edge((make_pair, "pair"), (output, "out"), None)?;
767            builder.build()?
768        };
769
770        let mut inputs = HashMap::new();
771        inputs.insert(TryInto::try_into("one")?, Value::Int(5));
772        inputs.insert(TryInto::try_into("two")?, Value::Bool(true));
773
774        let runtime = Runtime::builder().start();
775        let outputs = runtime
776            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
777            .await?;
778
779        assert_eq!(
780            outputs[&TryInto::try_into("out")?],
781            Value::Pair(Box::new((Value::Int(5), Value::Bool(true))))
782        );
783
784        Ok(())
785    }
786
787    /*
788        |   |
789      make_pair
790          |
791        unpair
792        |    |
793    */
794    #[tokio::test]
795    async fn test_pair_unpair() -> anyhow::Result<()> {
796        let graph = {
797            let mut builder = GraphBuilder::new();
798            let [input, output] = Graph::boundary();
799
800            let make_pair = builder.add_node("make_pair")?;
801            let unpack_pair = builder.add_node("unpack_pair")?;
802            builder.add_edge((input, "one"), (make_pair, "first"), None)?;
803            builder.add_edge((input, "two"), (make_pair, "second"), None)?;
804            builder.add_edge((make_pair, "pair"), (unpack_pair, "pair"), None)?;
805            builder.add_edge((unpack_pair, "first"), (output, "one"), None)?;
806            builder.add_edge((unpack_pair, "second"), (output, "two"), None)?;
807            builder.build()?
808        };
809
810        let mut inputs = HashMap::new();
811        inputs.insert(TryInto::try_into("one")?, Value::Int(4));
812        inputs.insert(TryInto::try_into("two")?, Value::Bool(false));
813
814        let runtime = Runtime::builder().start();
815        let outputs = runtime
816            .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
817            .await
818            .unwrap();
819
820        assert_eq!(inputs, outputs);
821
822        Ok(())
823    }
824
825    #[tokio::test]
826    async fn test_vec() -> anyhow::Result<()> {
827        let graph = {
828            let mut builder = GraphBuilder::new();
829            let [input, output] = Graph::boundary();
830
831            let make_list = builder.add_node("push")?;
832            builder.add_edge((input, "item"), (make_list, "item"), None)?;
833            builder.add_edge((input, "vec"), (make_list, "vec"), None)?;
834            builder.add_edge((make_list, "vec"), (output, "out"), None)?;
835            builder.build()?
836        };
837
838        let runtime = Runtime::builder().start();
839        let values = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
840
841        let mut inputs = HashMap::new();
842        inputs.insert(
843            TryInto::try_into("vec")?,
844            Value::Vec(vec![values[0].clone(), values[1].clone()]),
845        );
846        inputs.insert(TryInto::try_into("item")?, values[2].clone());
847
848        let outputs = runtime
849            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
850            .await?;
851
852        assert_eq!(outputs[&TryInto::try_into("out")?], Value::Vec(values));
853        Ok(())
854    }
855
856    /*
857        |  |
858      push
859        |
860       pop
861       |  |
862    */
863    #[tokio::test]
864    async fn test_pop() -> anyhow::Result<()> {
865        let graph = {
866            let mut builder = GraphBuilder::new();
867            let [input, output] = Graph::boundary();
868
869            let make_list = builder.add_node("push")?;
870            let unpack_list = builder.add_node("pop")?;
871            builder.add_edge((input, "item"), (make_list, "item"), None)?;
872            builder.add_edge((input, "vec"), (make_list, "vec"), None)?;
873            builder.add_edge((make_list, "vec"), (unpack_list, "vec"), None)?;
874            builder.add_edge((unpack_list, "item"), (output, "item"), None)?;
875            builder.add_edge((unpack_list, "vec"), (output, "vec"), None)?;
876            builder.build()?
877        };
878
879        let mut inputs = HashMap::new();
880        inputs.insert(TryInto::try_into("item")?, Value::Int(4));
881        inputs.insert(
882            TryInto::try_into("vec")?,
883            Value::Vec(vec![Value::Int(3), Value::Int(2)]),
884        );
885
886        let runtime = Runtime::builder().start();
887        let outputs = runtime
888            .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
889            .await
890            .unwrap();
891
892        assert_eq!(inputs, outputs);
893        Ok(())
894    }
895
896    /*
897        |
898       tag
899        |
900       match
901        |
902    */
903    #[tokio::test]
904    async fn test_match() -> anyhow::Result<()> {
905        let [input, output] = Graph::boundary();
906        // Test a variant < foo: Int | bar: Vec<Int> >
907        let foo_graph = {
908            let mut builder = GraphBuilder::new();
909
910            let inc = builder.add_node("iadd")?;
911            builder.add_edge((input, "value"), (inc, "a"), None)?;
912            builder.add_edge((input, "xtra"), (inc, "b"), None)?;
913            builder.add_edge((inc, "value"), (output, "value"), None)?;
914            builder.build()?
915        };
916        let bar_graph = {
917            let mut builder = GraphBuilder::new();
918            let pop = builder.add_node("pop")?;
919            let d1 = builder.add_node("discard")?;
920            builder.add_edge((input, "value"), (pop, "vec"), None)?;
921            builder.add_edge((pop, "vec"), (d1, "value"), None)?;
922            builder.add_edge((pop, "item"), (output, "value"), None)?;
923            let d2 = builder.add_node("discard")?;
924            builder.add_edge((input, "xtra"), (d2, "value"), None)?;
925            builder.build()?
926        };
927        let match_graph = {
928            let mut builder = GraphBuilder::new();
929            let n = builder.add_node(Node::Match)?;
930            let ev = builder.add_node("eval")?;
931            let foo_g = builder.add_node(Value::Graph(foo_graph))?;
932            let bar_g = builder.add_node(Value::Graph(bar_graph))?;
933            builder.add_edge((input, "arg"), (n, Label::variant_value()), None)?;
934            builder.add_edge((foo_g, "value"), (n, "foo"), None)?;
935            builder.add_edge((bar_g, "value"), (n, "bar"), None)?;
936            builder.add_edge((n, "thunk"), (ev, "thunk"), None)?;
937            builder.add_edge((ev, "value"), (output, "value"), None)?;
938            builder.add_edge((input, "if_foo"), (ev, "xtra"), None)?;
939            builder.build()?
940        };
941
942        let runtime = Runtime::builder().start();
943
944        // Test with a "foo" (providing variant value directly)
945        let arg = Value::Variant(TryInto::try_into("foo")?, Box::new(Value::Int(3)));
946        let inputs = HashMap::from([
947            (TryInto::try_into("arg")?, arg),
948            (TryInto::try_into("if_foo")?, Value::Int(5)),
949        ]);
950        let outputs = runtime
951            .execute_graph_cb(
952                match_graph.clone(),
953                inputs,
954                true,
955                fake_callback(),
956                fake_escape(),
957            )
958            .await?;
959        assert_eq!(outputs, HashMap::from([(Label::value(), Value::Int(8))]));
960
961        // Test with a "bar" (built via a Tag node)
962        let v = Value::Vec(vec![Value::Int(31), Value::Int(42)]);
963        let graph2 = {
964            let mut builder = GraphBuilder::new();
965            let vec = builder.add_node(v)?;
966            let mkv = builder.add_node(Node::Tag(TryInto::try_into("bar")?))?;
967            builder.add_edge((vec, "value"), (mkv, "value"), None)?;
968            let m = builder.add_node(match_graph.clone())?; // Box node
969            builder.add_edge((mkv, "value"), (m, "arg"), None)?;
970            let c = builder.add_node(Value::Int(101))?;
971            builder.add_edge((c, "value"), (m, "if_foo"), None)?;
972            builder.add_edge((m, "value"), (output, "result"), None)?;
973            builder.build()?
974        };
975        let outputs = runtime
976            .execute_graph_cb(
977                graph2,
978                HashMap::from([]),
979                true,
980                fake_callback(),
981                fake_escape(),
982            )
983            .await?;
984        assert_eq!(
985            outputs,
986            HashMap::from([(TryInto::try_into("result")?, Value::Int(42))])
987        );
988
989        Ok(())
990    }
991
992    #[tokio::test]
993    async fn test_switch() -> anyhow::Result<()> {
994        let graph = {
995            let mut builder = GraphBuilder::new();
996            let [input, output] = Graph::boundary();
997
998            let switch = builder.add_node("switch")?;
999            builder.add_edge((input, "true"), (switch, "if_true"), None)?;
1000            builder.add_edge((input, "false"), (switch, "if_false"), None)?;
1001            builder.add_edge((input, "predicate"), (switch, "pred"), None)?;
1002            builder.add_edge((switch, "value"), (output, "out"), None)?;
1003            builder.build()?
1004        };
1005
1006        let runtime = Runtime::builder().start();
1007
1008        // Test 'true' branch
1009        let mut inputs = HashMap::new();
1010        inputs.insert(TryInto::try_into("predicate")?, Value::Bool(true));
1011        inputs.insert(TryInto::try_into("true")?, Value::Int(4));
1012        inputs.insert(TryInto::try_into("false")?, Value::Int(5));
1013
1014        let outputs = runtime
1015            .execute_graph_cb(graph.clone(), inputs, true, fake_callback(), fake_escape())
1016            .await
1017            .unwrap();
1018        assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(4));
1019
1020        // Test 'false' branch
1021        let mut inputs = HashMap::new();
1022        inputs.insert(TryInto::try_into("predicate")?, Value::Bool(false));
1023        inputs.insert(TryInto::try_into("true")?, Value::Int(4));
1024        inputs.insert(TryInto::try_into("false")?, Value::Int(5));
1025
1026        let outputs = runtime
1027            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1028            .await?;
1029        assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(5));
1030        Ok(())
1031    }
1032
1033    #[tokio::test]
1034    async fn test_loop() -> anyhow::Result<()> {
1035        let stopval = 3;
1036
1037        let [input, output] = Graph::boundary();
1038        let lt_graph = {
1039            let mut builder = GraphBuilder::new();
1040
1041            let lt = builder.add_node("ilt")?;
1042            let stop_val = builder.add_node(Value::Int(stopval))?;
1043
1044            builder.add_edge((input, "number"), (lt, "a"), None)?;
1045            builder.add_edge((stop_val, "value"), (lt, "b"), None)?;
1046            builder.add_edge((lt, "value"), (output, "pred"), None)?;
1047
1048            builder.build()?
1049        };
1050
1051        let break_graph = {
1052            let mut builder = GraphBuilder::new();
1053            let makest = builder.add_node("make_struct")?;
1054            builder.add_edge((input, "number"), (makest, "iter"), None)?;
1055            builder.add_edge((input, "accum"), (makest, "final"), None)?;
1056
1057            let brk = builder.add_node(Node::Tag(Label::break_()))?;
1058            builder.add_edge((makest, "struct"), (brk, "value"), None)?;
1059            builder.add_edge((brk, "value"), (output, "value"), None)?;
1060            builder.build()?
1061        };
1062
1063        let update_graph = {
1064            let mut builder = GraphBuilder::new();
1065            let numbers = builder.add_node("copy")?;
1066            builder.add_edge((input, "number"), (numbers, "value"), None)?;
1067            let makest = builder.add_node("make_struct")?;
1068
1069            let add1 = builder.add_node("iadd")?;
1070            let increment = builder.add_node(Value::Int(1))?;
1071            builder.add_edge((numbers, "value_0"), (add1, "b"), None)?;
1072            builder.add_edge((increment, "value"), (add1, "a"), None)?;
1073            builder.add_edge((add1, "value"), (makest, "number"), None)?;
1074
1075            let add2 = builder.add_node("iadd")?;
1076            builder.add_edge((input, "accum"), (add2, "a"), None)?;
1077            builder.add_edge((numbers, "value_1"), (add2, "b"), None)?;
1078            builder.add_edge((add2, "value"), (makest, "accum"), None)?;
1079
1080            let update = builder.add_node(Node::Tag(Label::continue_()))?;
1081            builder.add_edge((makest, "struct"), (update, "value"), None)?;
1082            builder.add_edge((update, "value"), (output, "value"), None)?;
1083
1084            builder.build()?
1085        };
1086
1087        let body_graph = {
1088            let mut builder = GraphBuilder::new();
1089            let inputs = builder.add_node("unpack_struct")?;
1090            builder.add_edge((input, Label::value()), (inputs, "struct"), None)?;
1091            let switch = builder.add_node("switch")?;
1092            let copy = builder.add_node("copy")?;
1093            let eval = builder.add_node("eval")?;
1094            let condition = builder.add_node(Node::local_box(lt_graph))?;
1095            let update = builder.add_node(Value::Graph(update_graph))?;
1096            let return_ = builder.add_node(Value::Graph(break_graph))?;
1097
1098            builder.add_edge((inputs, "number"), (copy, "value"), None)?;
1099            builder.add_edge((copy, "value_0"), (condition, "number"), None)?;
1100
1101            builder.add_edge((update, "value"), (switch, "if_true"), None)?;
1102            builder.add_edge((return_, "value"), (switch, "if_false"), None)?;
1103            builder.add_edge((condition, "pred"), (switch, "pred"), None)?;
1104
1105            builder.add_edge((switch, "value"), (eval, "thunk"), None)?;
1106            builder.add_edge((inputs, "accum"), (eval, "accum"), None)?;
1107            builder.add_edge((copy, "value_1"), (eval, "number"), None)?;
1108
1109            builder.add_edge((eval, "value"), (output, "value"), None)?;
1110
1111            builder.build()?
1112        };
1113        let graph = {
1114            let mut builder = GraphBuilder::new();
1115            let init = builder.add_node("make_struct")?;
1116            builder.add_edge((input, "initial"), (init, "number"), None)?;
1117            builder.add_edge((input, "extra"), (init, "accum"), None)?;
1118
1119            let loop_ = builder.add_node("loop")?;
1120            builder.add_edge((init, "struct"), (loop_, Label::value()), None)?;
1121
1122            let body = builder.add_node(Value::Graph(body_graph))?;
1123
1124            builder.add_edge((body, "value"), (loop_, "body"), None)?;
1125
1126            let outs = builder.add_node("unpack_struct")?;
1127            builder.add_edge((loop_, Label::value()), (outs, "struct"), None)?;
1128
1129            builder.add_edge((outs, "iter"), (output, "iter"), None)?;
1130            builder.add_edge((outs, "final"), (output, "final"), None)?;
1131            builder.build()?
1132        };
1133
1134        let python = ExternalWorker::new_spawn(python_path(), fake_interceptor()).await?;
1135        let runtime = Runtime::builder()
1136            .with_worker(python, py_loc())
1137            .await?
1138            .start();
1139
1140        async fn run_loop(
1141            runtime: &Runtime,
1142            graph: crate::Graph,
1143            initial: i64,
1144            extra: i64,
1145        ) -> anyhow::Result<(i64, i64)> {
1146            let inputs = HashMap::from([
1147                (TryInto::try_into("initial")?, Value::Int(initial)),
1148                (TryInto::try_into("extra")?, Value::Int(extra)),
1149            ]);
1150            let outputs = runtime
1151                .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1152                .await?;
1153            let iter = outputs.get(&TryInto::try_into("iter")?);
1154            let final_ = outputs.get(&TryInto::try_into("final")?);
1155            match (iter, final_, outputs.len()) {
1156                (Some(Value::Int(iter)), Some(Value::Int(final_)), 2) => Ok((*iter, *final_)),
1157                _ => bail!(
1158                    "Should have returned two ints iter&final, not {:?}",
1159                    outputs
1160                ),
1161            }
1162        }
1163
1164        assert_eq!(run_loop(&runtime, graph.clone(), 1, 5).await?, (3, 8));
1165
1166        assert_eq!(run_loop(&runtime, graph.clone(), -1, 0).await?, (3, 2));
1167
1168        assert_eq!(run_loop(&runtime, graph.clone(), 5, 10).await?, (5, 10));
1169
1170        Ok(())
1171    }
1172
1173    // sequence unpair->pair to result in identity graph
1174    #[tokio::test]
1175    async fn test_sequence() -> anyhow::Result<()> {
1176        let [input, output] = Graph::boundary();
1177        let unpack = {
1178            let mut builder = GraphBuilder::new();
1179
1180            let unpack = builder.add_node("unpack_pair")?;
1181
1182            builder.add_edge((input, "p"), (unpack, "pair"), None)?;
1183            builder.add_edge((unpack, "first"), (output, "f"), None)?;
1184            builder.add_edge((unpack, "second"), (output, "s"), None)?;
1185
1186            builder.build()?
1187        };
1188
1189        let pack = {
1190            let mut builder = GraphBuilder::new();
1191            let pack = builder.add_node("make_pair")?;
1192
1193            builder.add_edge((input, "f"), (pack, "first"), None)?;
1194            builder.add_edge((input, "s"), (pack, "second"), None)?;
1195            builder.add_edge((pack, "pair"), (output, "p"), None)?;
1196
1197            builder.build()?
1198        };
1199
1200        let seq_eval = {
1201            let mut builder = GraphBuilder::new();
1202            let c1 = builder.add_node(Node::Const(Value::Graph(unpack)))?;
1203            let c2 = builder.add_node(Node::Const(Value::Graph(pack)))?;
1204            let seq = builder.add_node("sequence")?;
1205
1206            builder.add_edge((c1, "value"), (seq, "first"), None)?;
1207            builder.add_edge((c2, "value"), (seq, "second"), None)?;
1208
1209            let eval = builder.add_node("eval")?;
1210
1211            builder.add_edge((seq, "sequenced"), (eval, "thunk"), None)?;
1212            builder.add_edge((input, "p"), (eval, "p"), None)?;
1213            builder.add_edge((eval, "p"), (output, "p"), None)?;
1214
1215            builder.build()?
1216        };
1217
1218        let runtime = Runtime::builder().start();
1219
1220        let mut inputs = HashMap::new();
1221        inputs.insert(
1222            TryInto::try_into("p")?,
1223            Value::Pair(Box::new((Value::Int(1), Value::Str("word".into())))),
1224        );
1225
1226        let outputs = runtime
1227            .execute_graph_cb(
1228                seq_eval,
1229                inputs.clone(),
1230                true,
1231                fake_callback(),
1232                fake_escape(),
1233            )
1234            .await
1235            .unwrap();
1236        assert_eq!(outputs, inputs);
1237
1238        Ok(())
1239    }
1240
1241    // Multithreaded tests. Prior to PR #160 which introduced the test and fixed the runtime,
1242    // each of these were failing (nondeterministically) about 50% of the time. Putting
1243    // the body of the test inside a loop doesn't seem to change that, one must add a new test
1244    // with '#[tokio::test(flavor="multi_thread")] async fn' to get another roll of the die.
1245    fn discard_return() -> Result<Graph, GraphBuilderError> {
1246        let mut builder = GraphBuilder::new();
1247        let [input, output] = Graph::boundary();
1248
1249        builder.add_edge((input, "keep"), (output, "res"), None)?;
1250        let disc = builder.add_node("discard")?;
1251        builder.add_edge((input, "ignore"), (disc, "value"), None)?;
1252        builder.build()
1253    }
1254
1255    #[tokio::test(flavor = "multi_thread")]
1256    async fn test_box() -> anyhow::Result<()> {
1257        let outer = {
1258            let mut builder = GraphBuilder::new();
1259            let [input, output] = Graph::boundary();
1260
1261            let inner = builder.add_node(Node::local_box(discard_return()?))?;
1262            let cst5 = builder.add_node(Value::Int(5))?;
1263            builder.add_edge((cst5, Label::value()), (inner, "keep"), None)?;
1264            builder.add_edge((input, "arg"), (inner, "ignore"), None)?;
1265            builder.add_edge((inner, "res"), (output, "value"), None)?;
1266            builder.build()?
1267        };
1268
1269        let runtime = Runtime::builder().start();
1270        let inputs = HashMap::from([(TryInto::try_into("arg")?, Value::Float(3.1))]);
1271        let expected_outputs = HashMap::from([(TryInto::try_into("value")?, Value::Int(5))]);
1272
1273        let outputs = runtime
1274            .execute_graph_cb(outer, inputs.clone(), true, fake_callback(), fake_escape())
1275            .await?;
1276        assert_eq!(outputs, expected_outputs);
1277
1278        Ok(())
1279    }
1280
1281    #[tokio::test(flavor = "multi_thread")]
1282    async fn test_partial() -> anyhow::Result<()> {
1283        let outer = {
1284            let mut builder = GraphBuilder::new();
1285            let [input, output] = Graph::boundary();
1286
1287            let inner = builder.add_node(Value::Graph(discard_return()?))?;
1288            let cst5 = builder.add_node(Value::Int(5))?;
1289
1290            let part = builder.add_node("partial")?;
1291            builder.add_edge((inner, Label::value()), (part, Label::thunk()), None)?;
1292            builder.add_edge((cst5, Label::value()), (part, "keep"), None)?;
1293
1294            let ev = builder.add_node("eval")?;
1295            builder.add_edge((part, Label::value()), (ev, "thunk"), None)?;
1296            builder.add_edge((input, "arg"), (ev, "ignore"), None)?;
1297            builder.add_edge((ev, "res"), (output, "value"), None)?;
1298            builder.build()?
1299        };
1300
1301        let runtime = Runtime::builder().start();
1302        let inputs = HashMap::from([(TryInto::try_into("arg")?, Value::Float(3.1))]);
1303        let expected_outputs = HashMap::from([(TryInto::try_into("value")?, Value::Int(5))]);
1304
1305        let outputs = runtime
1306            .execute_graph_cb(outer, inputs, true, fake_callback(), fake_escape())
1307            .await?;
1308        assert_eq!(outputs, expected_outputs);
1309
1310        Ok(())
1311    }
1312
1313    // unpack and repack a Struct
1314    #[tokio::test]
1315    async fn test_struct() -> anyhow::Result<()> {
1316        let graph = {
1317            let mut builder = GraphBuilder::new();
1318            let [input, output] = Graph::boundary();
1319
1320            let unpack = builder.add_node("unpack_struct")?;
1321            let pack = builder.add_node("make_struct")?;
1322
1323            builder.add_edge((input, "st"), (unpack, "struct"), None)?;
1324            builder.add_edge((unpack, "x"), (pack, "x"), None)?;
1325            builder.add_edge((unpack, "y"), (pack, "y"), None)?;
1326            builder.add_edge((pack, "struct"), (output, "st"), None)?;
1327
1328            builder.build()?
1329        };
1330
1331        let runtime = Runtime::builder().start();
1332        let mut fields = HashMap::new();
1333        fields.insert(TryInto::try_into("x")?, Value::Int(1));
1334        fields.insert(TryInto::try_into("y")?, Value::Float(2.3));
1335
1336        let mut inputs = HashMap::new();
1337        inputs.insert(TryInto::try_into("st")?, Value::Struct(fields));
1338
1339        let outputs = runtime
1340            .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
1341            .await
1342            .unwrap();
1343        assert_eq!(outputs, inputs);
1344
1345        Ok(())
1346    }
1347
1348    // unpack and repack a Struct
1349    #[tokio::test]
1350    async fn test_map() -> anyhow::Result<()> {
1351        let graph = {
1352            let mut builder = GraphBuilder::new();
1353            let [input, output] = Graph::boundary();
1354
1355            let insert = builder.add_node("insert_key")?;
1356            let remove = builder.add_node("remove_key")?;
1357            let copy = builder.add_node("copy")?;
1358
1359            builder.add_edge((input, "mp"), (insert, "map"), None)?;
1360            builder.add_edge((input, "v"), (insert, "val"), None)?;
1361            builder.add_edge((input, "k"), (copy, "value"), None)?;
1362            builder.add_edge((copy, "value_0"), (insert, "key"), None)?;
1363
1364            builder.add_edge((insert, "map"), (remove, "map"), None)?;
1365            builder.add_edge((copy, "value_1"), (remove, "key"), None)?;
1366            builder.add_edge((remove, "map"), (output, "mp"), None)?;
1367            builder.add_edge((remove, "val"), (output, "vl"), None)?;
1368
1369            builder.build()?
1370        };
1371
1372        let runtime = Runtime::builder().start();
1373        let mut map = HashMap::new();
1374        map.insert(Value::Str("x".into()), Value::Float(2.3));
1375
1376        let insert_key = Value::Str("y".into());
1377        let insert_val = Value::Float(1.2);
1378
1379        let mut inputs = HashMap::new();
1380        inputs.insert(TryInto::try_into("mp")?, Value::Map(map.clone()));
1381        inputs.insert(TryInto::try_into("k")?, insert_key);
1382        inputs.insert(TryInto::try_into("v")?, insert_val.clone());
1383
1384        let outputs = runtime
1385            .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
1386            .await
1387            .unwrap();
1388
1389        let mut expected_outputs = HashMap::new();
1390        expected_outputs.insert(TryInto::try_into("mp")?, Value::Map(map));
1391        expected_outputs.insert(TryInto::try_into("vl")?, insert_val);
1392        assert_eq!(outputs, expected_outputs);
1393
1394        Ok(())
1395    }
1396
1397    #[tokio::test]
1398    async fn test_equality() -> anyhow::Result<()> {
1399        let graph = {
1400            let mut builder = GraphBuilder::new();
1401            let [input, output] = Graph::boundary();
1402
1403            let equality = builder.add_node("eq")?;
1404
1405            builder.add_edge((input, "v0"), (equality, "value_0"), None)?;
1406            builder.add_edge((input, "v1"), (equality, "value_1"), None)?;
1407
1408            builder.add_edge((equality, "result"), (output, "p"), None)?;
1409
1410            builder.build()?
1411        };
1412
1413        let runtime = Runtime::builder().start();
1414
1415        let mut inputs = HashMap::new();
1416        inputs.insert(TryInto::try_into("v0")?, Value::Int(1));
1417        inputs.insert(TryInto::try_into("v1")?, Value::Int(1));
1418
1419        let outputs = runtime
1420            .execute_graph_cb(
1421                graph.clone(),
1422                inputs.clone(),
1423                true,
1424                fake_callback(),
1425                fake_escape(),
1426            )
1427            .await
1428            .unwrap();
1429
1430        assert_eq!(
1431            outputs.get(&TryInto::try_into("p")?),
1432            Some(&Value::Bool(true))
1433        );
1434
1435        let mut inputs = HashMap::new();
1436        inputs.insert(TryInto::try_into("v0")?, Value::Str("foo".into()));
1437        inputs.insert(TryInto::try_into("v1")?, Value::Str("foof".into()));
1438
1439        let outputs = runtime
1440            .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
1441            .await
1442            .unwrap();
1443
1444        assert_eq!(
1445            outputs.get(&TryInto::try_into("p")?),
1446            Some(&Value::Bool(false))
1447        );
1448        Ok(())
1449    }
1450
1451    // Parallel of simple graphs with node name clash
1452    #[tokio::test]
1453    async fn test_parallel() -> anyhow::Result<()> {
1454        let [input, output] = Graph::boundary();
1455        let graph1 = {
1456            let mut builder = GraphBuilder::new();
1457
1458            let c1 = builder.add_node(Node::Const(Value::Int(1)))?;
1459            let add = builder.add_node("iadd")?;
1460
1461            builder.add_edge((c1, "value"), (add, "b"), None)?;
1462            builder.add_edge((input, "vl"), (add, "a"), None)?;
1463            builder.add_edge((add, "value"), (output, "vl"), None)?;
1464
1465            builder.build()?
1466        };
1467
1468        let graph2 = {
1469            let mut builder = GraphBuilder::new();
1470            let c1 = builder.add_node(Node::Const(Value::Str("word2".into())))?;
1471            builder.add_edge((input, "vr"), (output, "vr_0"), None)?;
1472            builder.add_edge((c1, "value"), (output, "vr_1"), None)?;
1473
1474            builder.build()?
1475        };
1476
1477        let graph_par = {
1478            let mut builder = GraphBuilder::new();
1479
1480            let c1 = builder.add_node(Node::Const(Value::Graph(graph1)))?;
1481            let c2 = builder.add_node(Node::Const(Value::Graph(graph2)))?;
1482            let par = builder.add_node("parallel")?;
1483            let eval = builder.add_node("eval")?;
1484
1485            builder.add_edge((c1, "value"), (par, "left"), None)?;
1486            builder.add_edge((c2, "value"), (par, "right"), None)?;
1487            builder.add_edge((par, "value"), (eval, "thunk"), None)?;
1488            builder.add_edge((input, "value_0"), (eval, "vl"), None)?;
1489            builder.add_edge((input, "value_1"), (eval, "vr"), None)?;
1490            builder.add_edge((eval, "vl"), (output, "value_0"), None)?;
1491            builder.add_edge((eval, "vr_0"), (output, "value_1"), None)?;
1492            builder.add_edge((eval, "vr_1"), (output, "value_2"), None)?;
1493
1494            builder.build()?
1495        };
1496
1497        let runtime = Runtime::builder().start();
1498
1499        let inputs = HashMap::from([
1500            (TryInto::try_into("value_0")?, Value::Int(2)),
1501            (TryInto::try_into("value_1")?, Value::Str("word".into())),
1502        ]);
1503
1504        let outputs = runtime
1505            .execute_graph_cb(graph_par, inputs, true, fake_callback(), fake_escape())
1506            .await?;
1507
1508        assert_eq!(
1509            outputs.get(&TryInto::try_into("value_0")?),
1510            Some(&Value::Int(3))
1511        );
1512        assert_eq!(
1513            outputs.get(&TryInto::try_into("value_1")?),
1514            Some(&Value::Str("word".into()))
1515        );
1516        assert_eq!(
1517            outputs.get(&TryInto::try_into("value_2")?),
1518            Some(&Value::Str("word2".into()))
1519        );
1520
1521        Ok(())
1522    }
1523
1524    #[tokio::test]
1525    async fn test_map_builtin() -> anyhow::Result<()> {
1526        let [input, output] = Graph::boundary();
1527        let thunk = {
1528            let mut builder = GraphBuilder::new();
1529
1530            let c1 = builder.add_node(Node::Const(Value::Int(2)))?;
1531            let mul = builder.add_node("imul")?;
1532
1533            builder.add_edge((c1, Label::value()), (mul, "b"), None)?;
1534            builder.add_edge((input, Label::value()), (mul, "a"), None)?;
1535            builder.add_edge((mul, Label::value()), (output, Label::value()), None)?;
1536
1537            builder.build()?
1538        };
1539
1540        let graph = {
1541            let mut builder = GraphBuilder::new();
1542
1543            let c1 = builder.add_node(Node::Const(Value::Graph(thunk)))?;
1544            let map = builder.add_node("map")?;
1545
1546            builder.add_edge((c1, Label::value()), (map, Label::thunk()), None)?;
1547            builder.add_edge((input, Label::value()), (map, Label::value()), None)?;
1548            builder.add_edge((map, Label::value()), (output, Label::value()), None)?;
1549
1550            builder.build()?
1551        };
1552
1553        let input_vec = Value::Vec(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
1554
1555        let runtime = Runtime::builder().start();
1556
1557        let inputs = HashMap::from([(Label::value(), input_vec)]);
1558
1559        let outputs = runtime
1560            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1561            .await?;
1562
1563        println!("{:?}", outputs);
1564
1565        assert_eq!(
1566            outputs.get(&Label::value()),
1567            Some(&Value::Vec(vec![
1568                Value::Int(2),
1569                Value::Int(4),
1570                Value::Int(6)
1571            ]))
1572        );
1573
1574        Ok(())
1575    }
1576
1577    #[tokio::test]
1578    async fn test_map_concurrent() -> anyhow::Result<()> {
1579        let [input, output] = Graph::boundary();
1580        let thunk = {
1581            let mut builder = GraphBuilder::new();
1582
1583            let c1 = builder.add_node(Value::Float(1.0))?;
1584            let sleep = builder.add_node("sleep")?;
1585
1586            builder.add_edge((c1, Label::value()), (sleep, "delay_secs"), None)?;
1587            builder.add_edge((input, Label::value()), (sleep, Label::value()), None)?;
1588            builder.add_edge((sleep, Label::value()), (output, Label::value()), None)?;
1589
1590            builder.build()?
1591        };
1592
1593        let graph = {
1594            let mut builder = GraphBuilder::new();
1595
1596            let c1 = builder.add_node(Node::Const(Value::Graph(thunk)))?;
1597            let map = builder.add_node("map")?;
1598
1599            builder.add_edge((c1, Label::value()), (map, Label::thunk()), None)?;
1600            builder.add_edge((input, Label::value()), (map, Label::value()), None)?;
1601            builder.add_edge((map, Label::value()), (output, Label::value()), None)?;
1602
1603            builder.build()?
1604        };
1605
1606        let input_vec = Value::Vec(vec![
1607            Value::Int(1),
1608            Value::Int(2),
1609            Value::Int(3),
1610            Value::Int(4),
1611            Value::Int(5),
1612        ]);
1613
1614        let runtime = Runtime::builder()
1615            .with_worker(
1616                ExternalWorker::new_spawn(python_path(), fake_interceptor()).await?,
1617                py_loc(),
1618            )
1619            .await?
1620            .start();
1621
1622        let inputs = HashMap::from([(Label::value(), input_vec)]);
1623
1624        let earlier = std::time::Instant::now();
1625
1626        let outputs = runtime
1627            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1628            .await?;
1629
1630        assert!(earlier.elapsed() < Duration::from_millis(1250));
1631        assert_eq!(
1632            outputs.get(&Label::value()),
1633            Some(&Value::Vec(vec![
1634                Value::Int(1),
1635                Value::Int(2),
1636                Value::Int(3),
1637                Value::Int(4),
1638                Value::Int(5)
1639            ]))
1640        );
1641
1642        Ok(())
1643    }
1644
1645    #[fixture]
1646    fn div_graph() -> anyhow::Result<Graph> {
1647        let mut builder = GraphBuilder::new();
1648        let [input, output] = Graph::boundary();
1649
1650        let div = builder.add_node("idiv")?;
1651        builder.add_edge((input, "a"), (div, "a"), None)?;
1652        builder.add_edge((input, "b"), (div, "b"), None)?;
1653        builder.add_edge((div, "value"), (output, "value"), None)?;
1654        let graph = builder.build()?;
1655        Ok(graph)
1656    }
1657
1658    #[rstest]
1659    #[tokio::test]
1660    async fn test_idiv_op(div_graph: anyhow::Result<Graph>) -> anyhow::Result<()> {
1661        let graph = div_graph?;
1662
1663        let runtime = Runtime::builder().start();
1664
1665        let inputs = HashMap::from([
1666            (TryInto::try_into("a")?, Value::Int(10)),
1667            (TryInto::try_into("b")?, Value::Int(2)),
1668        ]);
1669
1670        let outputs = runtime
1671            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1672            .await?;
1673
1674        assert_eq!(
1675            outputs.get(&TryInto::try_into("value")?),
1676            Some(&Value::Int(5))
1677        );
1678
1679        Ok(())
1680    }
1681
1682    #[rstest]
1683    #[tokio::test]
1684    #[should_panic(expected = "Tried to divide by zero")]
1685    async fn test_idiv_zero(div_graph: anyhow::Result<Graph>) {
1686        let graph = div_graph.unwrap();
1687
1688        let runtime = Runtime::builder().start();
1689
1690        let inputs = HashMap::from([
1691            (TryInto::try_into("a").unwrap(), Value::Int(10)),
1692            (TryInto::try_into("b").unwrap(), Value::Int(0)),
1693        ]);
1694
1695        runtime
1696            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1697            .await
1698            .unwrap();
1699    }
1700
1701    #[fixture]
1702    fn mod_graph() -> anyhow::Result<Graph> {
1703        let mut builder = GraphBuilder::new();
1704        let [input, output] = Graph::boundary();
1705
1706        let mod_ = builder.add_node("imod")?;
1707        builder.add_edge((input, "a"), (mod_, "a"), None)?;
1708        builder.add_edge((input, "b"), (mod_, "b"), None)?;
1709        builder.add_edge((mod_, "value"), (output, "value"), None)?;
1710        let graph = builder.build()?;
1711        Ok(graph)
1712    }
1713
1714    #[rstest]
1715    #[tokio::test]
1716    async fn test_imod_op(mod_graph: anyhow::Result<Graph>) -> anyhow::Result<()> {
1717        let graph = mod_graph?;
1718
1719        let runtime = Runtime::builder().start();
1720
1721        let inputs = HashMap::from([
1722            (TryInto::try_into("a")?, Value::Int(10)),
1723            (TryInto::try_into("b")?, Value::Int(3)),
1724        ]);
1725
1726        let outputs = runtime
1727            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1728            .await?;
1729
1730        assert_eq!(
1731            outputs.get(&TryInto::try_into("value")?),
1732            Some(&Value::Int(1))
1733        );
1734
1735        Ok(())
1736    }
1737
1738    #[rstest]
1739    #[tokio::test]
1740    #[should_panic(expected = "Tried to take modulus with zero.")]
1741    async fn test_imod_zero(mod_graph: anyhow::Result<Graph>) {
1742        let graph = mod_graph.unwrap();
1743
1744        let runtime = Runtime::builder().start();
1745
1746        let inputs = HashMap::from([
1747            (TryInto::try_into("a").unwrap(), Value::Int(10)),
1748            (TryInto::try_into("b").unwrap(), Value::Int(0)),
1749        ]);
1750
1751        runtime
1752            .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1753            .await
1754            .unwrap();
1755    }
1756
1757    #[tokio::test]
1758    async fn test_loc_merge() -> anyhow::Result<()> {
1759        let py_loc_2: LocationName = TryInto::try_into("python2")?;
1760        let python_1 = ExternalWorker::new_spawn(&python_path(), fake_interceptor()).await?;
1761        let python_2 = python_1.clone();
1762        let runtime = Runtime::builder()
1763            .with_worker(python_1, py_loc())
1764            .await?
1765            .with_worker(python_2, py_loc_2)
1766            .await?
1767            .start();
1768
1769        let sig = runtime.signature().root;
1770        assert_eq!(
1771            sig.functions[&(TryInto::try_into("copy")?)].locations,
1772            vec![Location::local()]
1773        );
1774        assert_eq!(
1775            sig.subspaces[&(TryInto::try_into("python_nodes")?)].functions
1776                [&(TryInto::try_into("id_py")?)]
1777                .locations,
1778            vec![Location(vec![py_loc()]), Location(vec![py_loc_2])]
1779        );
1780        Ok(())
1781    }
1782
1783    #[tokio::test]
1784    async fn test_loc_clash() -> anyhow::Result<()> {
1785        let python_1 = ExternalWorker::new_spawn(&python_path(), fake_interceptor()).await?;
1786        let python_2 = python_1.clone();
1787
1788        let builder = Runtime::builder()
1789            .with_worker(python_1, py_loc())
1790            .await?
1791            .with_worker(python_2, py_loc())
1792            .await;
1793        match builder {
1794            Ok(_) => bail!("Builder should fail with two workers on the same location"),
1795            Err(e) => {
1796                assert_eq!(
1797                    e.to_string(),
1798                    anyhow::Error::from(LocationConflict(py_loc())).to_string()
1799                );
1800                Ok(())
1801            }
1802        }
1803    }
1804
1805    #[tokio::test]
1806    async fn test_stack_trace_map_eval() -> anyhow::Result<()> {
1807        use anyhow::anyhow;
1808        let (func_node, func_graph) = {
1809            let mut builder = GraphBuilder::new();
1810            let [input, output] = Graph::boundary();
1811
1812            let nod = builder.add_node("st_test")?;
1813            builder.add_edge((input, "argument"), (nod, "label"), None)?;
1814            builder.add_edge((nod, "labelled_trace"), (output, "result"), None)?;
1815            (nod, builder.build()?)
1816        };
1817        let (eval_node, map_thunk) = {
1818            let mut builder = GraphBuilder::new();
1819            let [input, output] = Graph::boundary();
1820            let func_cst = builder.add_node(Node::Const(Value::Graph(func_graph)))?;
1821            let eval_node = builder.add_node("eval")?;
1822            builder.add_edge((func_cst, "value"), (eval_node, "thunk"), None)?;
1823            builder.add_edge((input, "value"), (eval_node, "argument"), None)?;
1824            builder.add_edge((eval_node, "result"), (output, "value"), None)?;
1825            (eval_node, builder.build()?)
1826        };
1827        let (map_node, graph) = {
1828            let mut builder = GraphBuilder::new();
1829            let [input, output] = Graph::boundary();
1830            let map_func = builder.add_node(Node::Const(Value::Graph(map_thunk)))?;
1831            let map_node = builder.add_node("map")?;
1832            builder.add_edge((map_func, "value"), (map_node, "thunk"), None)?;
1833            builder.add_edge((input, "elems"), (map_node, "value"), None)?;
1834            builder.add_edge((map_node, "value"), (output, "mapped"), None)?;
1835            (map_node, builder.build()?)
1836        };
1837
1838        let arg: Label = TryInto::try_into("label").unwrap();
1839        let res: Label = TryInto::try_into("labelled_trace").unwrap();
1840        let mut t = GraphType::new();
1841        t.add_input(arg, Type::Str);
1842        t.add_output(res, Type::Str);
1843        let mut worker = LocalWorker::new();
1844        let n = tierkreis_core::prelude::TryFrom::try_from("st_test").unwrap();
1845        worker.add_function(
1846            FunctionName::builtin(n),
1847            FunctionDeclaration {
1848                type_scheme: t.into(),
1849                description: "Compare stack trace with expected string".into(),
1850                input_order: vec![arg],
1851                output_order: vec![res],
1852            },
1853            move || {
1854                RuntimeOperation::new_fn_simple(move |mut inputs, context| {
1855                    let Value::Str(s) = inputs.remove_entry(&arg).unwrap().1 else {
1856                        return Err(anyhow!("bad input"));
1857                    };
1858                    Ok(HashMap::from([(
1859                        res,
1860                        Value::Str(format!("{s} {:?}", context.graph_trace)),
1861                    )]))
1862                })
1863            },
1864        );
1865        let runtime = Runtime::builder()
1866            .with_local_functions(worker)
1867            .unwrap()
1868            .start();
1869
1870        runtime.infer_type(&graph).unwrap();
1871        let input_elems = vec![Value::Str("a".into()), Value::Str("b".into())];
1872
1873        let res = runtime
1874            .execute_graph_cb(
1875                graph.clone(),
1876                HashMap::from([(TryInto::try_into("elems").unwrap(), Value::Vec(input_elems))]),
1877                true,
1878                fake_callback(),
1879                fake_escape(),
1880            )
1881            .await
1882            .unwrap();
1883        let trace_for_elem = |i| {
1884            Into::<GraphTrace>::into(
1885                GraphTrace::Root
1886                    .inner_node(map_node)
1887                    .list_elem(i)
1888                    .inner_node(eval_node)
1889                    .inner_node(func_node),
1890            )
1891        };
1892        let expected = HashMap::from([(
1893            TryInto::try_into("mapped").unwrap(),
1894            Value::Vec(vec![
1895                Value::Str(format!("a {:?}", trace_for_elem(0))),
1896                Value::Str(format!("b {:?}", trace_for_elem(1))),
1897            ]),
1898        )]);
1899        assert_eq!(res, expected);
1900        Ok(())
1901    }
1902}