1use anyhow::anyhow;
4use operations::graph::checkpoint_client::CheckpointClient;
5
6use operations::{RuntimeOperation, TaskHandle};
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use thiserror::Error;
10use tierkreis_core::{
11 graph::TypeScheme,
12 namespace::{FunctionDeclaration, NamespaceItem, Signature, SignatureError},
13 symbol::{Label, Location, LocationName},
14 type_checker::{GraphWithInputs, TypeErrors, Typeable},
15};
16use tierkreis_core::{
17 graph::{Graph, Node, Value},
18 prelude::TryInto,
19 symbol::FunctionName,
20};
21use tierkreis_proto::messages::{Callback, GraphTrace, InferTypeResponse};
22use tierkreis_proto::protos_gen::v1alpha1::signature as ps;
23use workers::{EscapeHatch, LocalWorker, Worker};
24
25#[cfg(feature = "config")]
26use serde::Deserialize;
27
28pub mod operations;
29pub(crate) mod util;
30pub mod workers;
31
/// Builder that collects everything a [`Runtime`] needs before it is started.
///
/// Create one with [`RuntimeBuilder::new`] (or [`Runtime::builder`]), configure it with
/// the `with_*` methods and finish with [`RuntimeBuilder::start`].
pub struct RuntimeBuilder {
    /// Workers registered so far, keyed by the location name they were registered under.
    workers: HashMap<LocationName, Box<dyn Worker>>,
    /// Functions spawned directly from this worker rather than through an entry in `workers`.
    local_functions: LocalWorker,
    /// Signature merged from the local functions and from every registered worker.
    signature: Signature,
    /// Type-checking mode handed on to the started [`Runtime`].
    runtime_type_checking: RuntimeTypeChecking,
}
39
/// Error produced by [`RuntimeBuilder::with_worker`] when a worker is registered under a
/// location name that already has a worker.
#[derive(Error, Debug)]
#[error("Location {0} was defined twice")]
struct LocationConflict(LocationName);
43
44impl RuntimeBuilder {
45 pub fn new() -> Self {
48 let mut res = RuntimeBuilder {
49 workers: HashMap::new(),
50 local_functions: LocalWorker::new(),
51 signature: Signature::default(),
52 runtime_type_checking: RuntimeTypeChecking::OnlyExternal,
53 }
54 .with_local_functions(LocalWorker::builtins())
55 .unwrap();
56 res.signature.scopes.insert(Location::local());
57 res
58 }
59
60 pub async fn with_worker<W>(mut self, worker: W, loc: LocationName) -> anyhow::Result<Self>
62 where
63 W: Worker + 'static,
64 {
65 if self.workers.contains_key(&loc) {
66 return Err(LocationConflict(loc).into());
67 }
68
69 let sig: Signature = TryInto::try_into(worker.signature(Location::local()).await?)?;
70
71 self.signature.merge_signature(sig.in_location(loc))?;
72
73 self.workers.insert(loc, Box::new(worker));
74 Ok(self)
75 }
76
77 pub fn with_local_functions(
84 mut self,
85 local_functions: LocalWorker,
86 ) -> Result<Self, SignatureError> {
87 self.signature.merge_signature(Signature {
88 root: local_functions.declarations().map(|x| NamespaceItem {
89 decl: x,
90 locations: vec![Location::local()],
91 }),
92 aliases: HashMap::new(),
93 scopes: HashSet::new(),
94 })?;
95 self.local_functions.merge(local_functions)?;
96 Ok(self)
97 }
98
99 pub fn with_checking(mut self, runtime_type_checking: RuntimeTypeChecking) -> Self {
102 self.runtime_type_checking = runtime_type_checking;
103 self
104 }
105
106 pub fn start(self) -> Runtime {
108 Runtime {
109 workers: Arc::new(self.workers),
110 local_functions: Arc::new(self.local_functions),
111 signature: Arc::new(self.signature),
112 runtime_type_checking: self.runtime_type_checking,
113 }
114 }
115}
116
117impl Default for RuntimeBuilder {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
/// Selects when type checks are performed at runtime.
///
/// The decision is made per source node by `should_type_check`. With the `config` feature
/// enabled the variants deserialize from their `snake_case` names.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "config", derive(Deserialize))]
#[cfg_attr(feature = "config", serde(rename_all = "snake_case"))]
pub enum RuntimeTypeChecking {
    /// Never type check.
    Disabled,
    /// Type check only when the source node is external. This is the default.
    #[default]
    OnlyExternal,
    /// Always type check.
    Enabled,
}
147
148impl RuntimeTypeChecking {
149 fn should_type_check(&self, source_node: &Node) -> bool {
150 match self {
151 RuntimeTypeChecking::Disabled => false,
152 RuntimeTypeChecking::OnlyExternal => source_node.is_external(),
153 RuntimeTypeChecking::Enabled => true,
154 }
155 }
156}
157
/// A started runtime, produced by [`RuntimeBuilder::start`].
///
/// The workers, local functions and signature are held behind [`Arc`]s, so cloning a
/// `Runtime` only bumps reference counts.
#[derive(Clone)]
pub struct Runtime {
    /// Registered workers, keyed by location name.
    workers: Arc<HashMap<LocationName, Box<dyn Worker>>>,
    /// Functions spawned directly from this worker rather than through `workers`.
    local_functions: Arc<LocalWorker>,
    /// Merged signature of the local functions and all workers.
    signature: Arc<Signature>,
    /// Type-checking mode chosen on the builder.
    runtime_type_checking: RuntimeTypeChecking,
}
172
173impl std::fmt::Debug for Runtime {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("Runtime").finish()
176 }
177}
178
179impl Runtime {
180 #[must_use]
182 pub fn builder() -> RuntimeBuilder {
183 RuntimeBuilder::new()
184 }
185
186 pub fn function_declarations(&self) -> impl Iterator<Item = &FunctionDeclaration> {
188 self.signature.root.values().map(|entry| &entry.decl)
189 }
190
191 #[must_use]
193 pub fn function_declaration(&self, name: FunctionName) -> Option<&FunctionDeclaration> {
194 self.signature.root.get(&name).map(|entry| &entry.decl)
195 }
196
197 pub fn signature(&self) -> Signature {
200 self.signature.as_ref().clone()
201 }
202
203 pub async fn signature_in_loc(
205 &self,
206 loc: Location,
207 ) -> anyhow::Result<ps::ListFunctionsResponse> {
208 match loc.pop() {
209 Some((ln, rest)) => match self.workers.get(&ln) {
210 Some(x) => Ok(x.signature(rest).await?),
211 None => Err(anyhow!("Location {} not found", ln)),
212 },
213 None => Ok(self.signature().into()),
214 }
215 }
216
217 pub fn infer_type<T>(&self, to_check: &T) -> Result<(TypeScheme, T::Annotated), TypeErrors>
223 where
224 T: Typeable,
225 {
226 to_check.infer_type(&self.signature().type_schemes())
227 }
228
229 pub async fn infer_type_in_loc(
240 &self,
241 to_check: Value,
242 location: Location,
243 ) -> anyhow::Result<ps::InferTypeResponse> {
244 match location.pop() {
245 Some((ln, rest)) => {
246 self.workers
247 .get(&ln)
248 .ok_or_else(|| anyhow!("Location {} not found", ln))?
249 .to_runtime_worker()
250 .ok_or_else(|| anyhow!("Location {} is not a runtime worker", ln))?
251 .infer_type(to_check, rest)
252 .await
253 }
254 None => {
255 let resp: InferTypeResponse = self.infer_type(&to_check).into();
256 Ok(resp.into())
257 }
258 }
259 }
260
261 pub async fn execute_graph_remote(
264 &self,
265 graph: Graph,
266 inputs: HashMap<Label, Value>,
267 type_check: bool,
268 remote_loc: (LocationName, Location),
269 escape: Callback, ) -> Result<HashMap<Label, Value>, RunGraphError> {
271 let (ln, rest) = remote_loc;
272 match self
273 .workers
274 .get(&ln)
275 .ok_or_else(|| {
276 RunGraphError::RuntimeError(Arc::new(anyhow!("Location {} not found", ln)))
277 })?
278 .to_runtime_worker()
279 {
280 Some(x) => {
281 x.execute_graph(graph, inputs, rest, type_check, Some(escape))
282 .await
283 }
284 None => Err(RunGraphError::RuntimeError(Arc::new(anyhow!(
285 "Location {} is not a runtime worker",
286 ln
287 )))),
288 }
289 }
290
291 #[allow(clippy::missing_panics_doc)]
301 pub async fn execute_graph_cb(
302 &self,
303 graph: Graph,
304 inputs: HashMap<Label, Value>,
305 type_check: bool,
306 callback: Callback,
307 escape: EscapeHatch,
308 ) -> Result<HashMap<Label, Value>, RunGraphError> {
309 Ok(self
310 .start_graph(graph, inputs, type_check, callback, escape, None)?
311 .complete()
312 .await?
313 .as_ref()
314 .clone())
315 }
316
317 pub fn start_graph(
325 &self,
326 graph: Graph,
327 inputs: HashMap<Label, Value>,
328 type_check: bool,
329 callback: Callback,
330 escape: EscapeHatch,
331 checkpoint_client: Option<CheckpointClient>,
332 ) -> Result<TaskHandle, TypeErrors> {
333 let (checked_graph, checked_inputs) = if type_check {
334 let (_, gwi) = GraphWithInputs { graph, inputs }
335 .infer_type(&self.signature.as_ref().clone().type_schemes())?;
336 (gwi.graph, gwi.inputs)
337 } else {
338 (graph, inputs)
339 };
340
341 let stack_trace = GraphTrace::Root;
343
344 let handle = RuntimeOperation::new_graph(checked_graph)
345 .run_simple(
346 self.clone(),
347 callback,
348 escape,
349 checked_inputs,
350 stack_trace,
351 checkpoint_client,
352 )
353 .into_task();
354
355 Ok(handle)
356 }
357
358 #[must_use]
363 pub fn run_function_with_loc(
364 &self,
365 function: &FunctionName,
366 loc: &Location,
367 ) -> Option<RuntimeOperation> {
368 let maybe_remote = loc.clone().pop().or_else(|| {
369 self.signature
370 .root
371 .get(function)
372 .and_then(|entry| entry.locations.first())
374 .and_then(|l| l.clone().pop())
375 });
376 match maybe_remote {
377 Some((ln, rest)) => Some(
378 self.workers.get(&ln)?.spawn(function, &rest),
385 ),
386 None => self.local_functions.spawn(function),
387 }
388 }
389
390 #[must_use]
393 pub fn run_graph_remote(
394 &self,
395 graph: Graph,
396 loc: (LocationName, Location),
397 ) -> Option<RuntimeOperation> {
398 let (ln, rest) = loc;
399 Some(
400 self.workers
401 .get(&ln)?
402 .to_runtime_worker()?
403 .spawn_graph(graph, &rest),
404 )
405 }
406}
407
408#[derive(Debug, Clone, Error)]
410pub enum RunGraphError {
411 #[error("graph failed to type check")]
413 TypeError(
414 #[source]
415 #[from]
416 TypeErrors,
417 ),
418 #[error("error while running a graph")]
420 RuntimeError(
421 #[source]
422 #[from]
423 Arc<anyhow::Error>,
424 ),
425}
426
427impl From<anyhow::Error> for RunGraphError {
428 fn from(e: anyhow::Error) -> Self {
429 RunGraphError::RuntimeError(Arc::new(e))
430 }
431}
432
433#[cfg(test)]
434pub(crate) mod tests {
435 use crate::EscapeHatch;
436 use anyhow::bail;
437 use rstest::{fixture, rstest};
438 use std::{collections::HashMap, time::Duration};
439 use tierkreis_core::{
440 graph::{Graph, GraphBuilder, GraphBuilderError, GraphType, Node, Type, TypeScheme, Value},
441 namespace::FunctionDeclaration,
442 prelude::TryInto,
443 symbol::{FunctionName, Label, Location, LocationName},
444 };
445 use tierkreis_proto::messages::{Callback, GraphTrace};
446
447 use crate::{
448 operations::{OperationContext, RuntimeOperation},
449 workers::{AuthInjector, ClientInterceptor, ExternalWorker, LocalWorker},
450 LocationConflict, Runtime,
451 };
452 pub(super) fn fake_interceptor() -> ClientInterceptor {
453 ClientInterceptor::new(AuthInjector::NoAuth)
454 }
455 pub(super) fn fake_callback() -> Callback {
456 Callback {
457 uri: "https://localhost:8020".parse().unwrap(),
458 loc: Location::local(),
459 }
460 }
461 pub(super) fn fake_escape() -> EscapeHatch {
462 EscapeHatch::this_runtime(fake_callback())
463 }
464 pub(super) fn py_loc() -> LocationName {
465 TryInto::try_into("python").unwrap()
466 }
467 fn python_path() -> String {
468 format!(
469 "{}/../python/tests/test_worker/main.py",
470 env!("CARGO_MANIFEST_DIR")
471 )
472 }
473
474 #[tokio::test]
475 async fn empty_graph() -> anyhow::Result<()> {
476 let graph = GraphBuilder::new().build()?;
477 let runtime = Runtime::builder().start();
478 let outputs = runtime
479 .execute_graph_cb(graph, HashMap::new(), true, fake_callback(), fake_escape())
480 .await?;
481 assert!(outputs.is_empty());
482 Ok(())
483 }
484
485 #[tokio::test]
486 async fn direct_identity() -> anyhow::Result<()> {
487 let graph = {
488 let mut builder = GraphBuilder::new();
489 let [input, output] = Graph::boundary();
490
491 builder.add_edge((input, "value"), (output, "value"), None)?;
492 builder.build()?
493 };
494
495 let runtime = Runtime::builder().start();
496 let mut inputs = HashMap::new();
497 inputs.insert(Label::value(), Value::Int(42));
498 let outputs = runtime
499 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
500 .await?;
501 assert_eq!(outputs[&Label::value()], Value::Int(42));
502 Ok(())
503 }
504
505 #[tokio::test]
506 async fn const_node() -> anyhow::Result<()> {
507 let graph = {
508 let mut builder = GraphBuilder::new();
509 let [_, output] = Graph::boundary();
510
511 let constant = builder.add_node(Value::Int(3))?;
512 builder.add_edge((constant, "value"), (output, "value"), None)?;
513 builder.build()?
514 };
515
516 let runtime = Runtime::builder().start();
517 let outputs = runtime
518 .execute_graph_cb(graph, HashMap::new(), true, fake_callback(), fake_escape())
519 .await?;
520 assert_eq!(outputs[&Label::value()], Value::Int(3));
521 Ok(())
522 }
523
524 #[tokio::test]
525 async fn boxed_const_node() -> anyhow::Result<()> {
526 let subgraph = {
527 let mut builder = GraphBuilder::new();
528 let [_, output] = Graph::boundary();
529
530 let const_ = builder.add_node(Value::Int(3))?;
531 builder.add_edge((const_, "value"), (output, "value"), None)?;
532 builder.build()?
533 };
534
535 let graph = {
536 let mut builder = GraphBuilder::new();
537 let [_, output] = Graph::boundary();
538
539 let subgraph = builder.add_node(subgraph)?;
540 builder.add_edge((subgraph, "value"), (output, "value"), None)?;
541 builder.build()?
542 };
543
544 let runtime = Runtime::builder().start();
545 let outputs = runtime
546 .execute_graph_cb(graph, HashMap::new(), true, fake_callback(), fake_escape())
547 .await
548 .unwrap();
549 assert!(matches!(outputs[&Label::value()], Value::Int(3)));
550 Ok(())
551 }
552
553 #[tokio::test]
554 async fn id_node() -> anyhow::Result<()> {
555 let graph = {
556 let mut builder = GraphBuilder::new();
557 let [input, output] = Graph::boundary();
558
559 let id = builder.add_node("id")?;
560 builder.add_edge((input, "id_in"), (id, "value"), None)?;
561 builder.add_edge((id, "value"), (output, "id_out"), None)?;
562 builder.build()?
563 };
564
565 let mut inputs = HashMap::new();
566 inputs.insert(TryInto::try_into("id_in")?, Value::Int(3));
567
568 let runtime = Runtime::builder().start();
569 let outputs = runtime
570 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
571 .await?;
572
573 assert_eq!(outputs[&TryInto::try_into("id_out")?], Value::Int(3));
574
575 Ok(())
576 }
577
578 #[tokio::test]
579 async fn id_node_pair() -> anyhow::Result<()> {
580 let graph = {
581 let mut builder = GraphBuilder::new();
582 let [input, output] = Graph::boundary();
583
584 let in_ = builder.add_node("id")?;
585 let out = builder.add_node("id")?;
586 builder.add_edge((input, "in"), (in_, "value"), None)?;
587 builder.add_edge((in_, "value"), (out, "value"), None)?;
588 builder.add_edge((out, "value"), (output, "out"), None)?;
589 builder.build()?
590 };
591
592 let mut inputs = HashMap::new();
593 inputs.insert(TryInto::try_into("in")?, Value::Int(3));
594
595 let runtime = Runtime::builder().start();
596 let outputs = runtime
597 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
598 .await?;
599 assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(3));
600 Ok(())
601 }
602
603 #[tokio::test]
604 async fn id_py_node() -> anyhow::Result<()> {
605 let graph = {
606 let mut builder = GraphBuilder::new();
607 let [input, output] = Graph::boundary();
608
609 let id_py = builder.add_node("python_nodes::id_py")?;
610 builder.add_edge((input, "id_in"), (id_py, "value"), None)?;
611 builder.add_edge((id_py, "value"), (output, "id_out"), None)?;
612 builder.build()?
613 };
614
615 let python = ExternalWorker::new_spawn(python_path(), fake_interceptor()).await?;
616 let runtime = Runtime::builder()
617 .with_worker(python, py_loc())
618 .await?
619 .start();
620
621 let mut inputs = HashMap::new();
622 inputs.insert(TryInto::try_into("id_in")?, Value::Int(3));
623
624 let outputs = runtime
625 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
626 .await?;
627 assert_eq!(outputs[&TryInto::try_into("id_out")?], Value::Int(3));
628 Ok(())
629 }
630
631 fn add_entry() -> anyhow::Result<FunctionDeclaration> {
632 let lhs: Label = TryInto::try_into("lhs")?;
633 let rhs: Label = TryInto::try_into("rhs")?;
634 let out: Label = TryInto::try_into("out")?;
635
636 let type_scheme = TypeScheme::from({
637 let mut t = GraphType::new();
638 t.add_input(lhs, Type::Int);
639 t.add_input(rhs, Type::Int);
640 t.add_output(out, Type::Int);
641 t
642 });
643
644 Ok(FunctionDeclaration {
645 type_scheme,
646 description: "Add two numbers".to_string(),
647 input_order: vec![lhs, rhs],
648 output_order: vec![out],
649 })
650 }
651
652 async fn add_function(
653 mut inputs: HashMap<Label, Value>,
654 _context: OperationContext,
655 ) -> anyhow::Result<HashMap<Label, Value>> {
656 let lhs = match inputs.remove(&TryInto::try_into("lhs")?) {
657 Some(Value::Int(lhs)) => lhs,
658 _ => anyhow::bail!("Missing or invalid lhs input."),
659 };
660
661 let rhs = match inputs.remove(&TryInto::try_into("rhs")?) {
662 Some(Value::Int(lhs)) => lhs,
663 _ => anyhow::bail!("Missing or invalid rhs input."),
664 };
665
666 let mut output = HashMap::new();
667 output.insert(TryInto::try_into("out")?, Value::Int(lhs + rhs));
668 Ok(output)
669 }
670
671 #[tokio::test]
672 async fn add() -> anyhow::Result<()> {
673 let graph = {
674 let mut builder = GraphBuilder::new();
675 let [input, output] = Graph::boundary();
676
677 let add = builder.add_node("add")?;
678 builder.add_edge((input, "lhs"), (add, "lhs"), None)?;
679 builder.add_edge((input, "rhs"), (add, "rhs"), None)?;
680 builder.add_edge((add, "out"), (output, "out"), None)?;
681 builder.build()?
682 };
683
684 let mut inputs = HashMap::new();
685 inputs.insert(TryInto::try_into("lhs")?, Value::Int(3));
686 inputs.insert(TryInto::try_into("rhs")?, Value::Int(1));
687
688 let mut worker = LocalWorker::new();
689 let add = add_entry()?;
690 worker.add_function(
691 FunctionName::builtin(TryInto::try_into("add")?),
692 add,
693 || RuntimeOperation::new_fn_async(add_function),
694 );
695
696 let runtime = Runtime::builder().with_local_functions(worker)?.start();
697 let outputs = runtime
698 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
699 .await?;
700 assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(4));
701 Ok(())
702 }
703
704 #[tokio::test]
705 async fn eval_add() -> anyhow::Result<()> {
706 let [input, output] = Graph::boundary();
707 let add_graph = {
708 let mut builder = GraphBuilder::new();
709
710 let add = builder.add_node("add")?;
711 let id = builder.add_node("id")?;
712 builder.add_edge((input, "id_in"), (id, "value"), None)?;
713 builder.add_edge((input, "lhs"), (add, "lhs"), None)?;
714 builder.add_edge((input, "rhs"), (add, "rhs"), None)?;
715 builder.add_edge((id, "value"), (output, "id_out"), None)?;
716 builder.add_edge((add, "out"), (output, "out"), None)?;
717 builder.build()?
718 };
719
720 let eval_graph = {
721 let mut builder = GraphBuilder::new();
722 let add_thunk_const = builder.add_node(Value::Graph(add_graph))?;
723 let eval_add = builder.add_node("eval")?;
724 builder.add_edge((add_thunk_const, "value"), (eval_add, "thunk"), None)?;
725 builder.add_edge((input, "x"), (eval_add, "lhs"), None)?;
726 builder.add_edge((input, "y"), (eval_add, "rhs"), None)?;
727 builder.add_edge((input, "z"), (eval_add, "id_in"), None)?;
728 builder.add_edge((eval_add, "out"), (output, "add_out"), None)?;
729 builder.add_edge((eval_add, "id_out"), (output, "id_out"), None)?;
730 builder.build()?
731 };
732
733 let mut inputs = HashMap::new();
734 inputs.insert(TryInto::try_into("x")?, Value::Int(5));
735 inputs.insert(TryInto::try_into("y")?, Value::Int(7));
736 inputs.insert(TryInto::try_into("z")?, Value::Bool(true));
737
738 let mut worker = LocalWorker::new();
739 let add = add_entry()?;
740 worker.add_function(
741 FunctionName::builtin(TryInto::try_into("add")?),
742 add,
743 || RuntimeOperation::new_fn_async(add_function),
744 );
745
746 let runtime = Runtime::builder().with_local_functions(worker)?.start();
747 let outputs = runtime
748 .execute_graph_cb(eval_graph, inputs, true, fake_callback(), fake_escape())
749 .await?;
750
751 assert_eq!(outputs[&TryInto::try_into("add_out")?], Value::Int(12));
752 assert_eq!(outputs[&TryInto::try_into("id_out")?], Value::Bool(true));
753
754 Ok(())
755 }
756
757 #[tokio::test]
758 async fn test_pair() -> anyhow::Result<()> {
759 let graph = {
760 let mut builder = GraphBuilder::new();
761 let [input, output] = Graph::boundary();
762
763 let make_pair = builder.add_node("make_pair")?;
764 builder.add_edge((input, "one"), (make_pair, "first"), None)?;
765 builder.add_edge((input, "two"), (make_pair, "second"), None)?;
766 builder.add_edge((make_pair, "pair"), (output, "out"), None)?;
767 builder.build()?
768 };
769
770 let mut inputs = HashMap::new();
771 inputs.insert(TryInto::try_into("one")?, Value::Int(5));
772 inputs.insert(TryInto::try_into("two")?, Value::Bool(true));
773
774 let runtime = Runtime::builder().start();
775 let outputs = runtime
776 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
777 .await?;
778
779 assert_eq!(
780 outputs[&TryInto::try_into("out")?],
781 Value::Pair(Box::new((Value::Int(5), Value::Bool(true))))
782 );
783
784 Ok(())
785 }
786
787 #[tokio::test]
795 async fn test_pair_unpair() -> anyhow::Result<()> {
796 let graph = {
797 let mut builder = GraphBuilder::new();
798 let [input, output] = Graph::boundary();
799
800 let make_pair = builder.add_node("make_pair")?;
801 let unpack_pair = builder.add_node("unpack_pair")?;
802 builder.add_edge((input, "one"), (make_pair, "first"), None)?;
803 builder.add_edge((input, "two"), (make_pair, "second"), None)?;
804 builder.add_edge((make_pair, "pair"), (unpack_pair, "pair"), None)?;
805 builder.add_edge((unpack_pair, "first"), (output, "one"), None)?;
806 builder.add_edge((unpack_pair, "second"), (output, "two"), None)?;
807 builder.build()?
808 };
809
810 let mut inputs = HashMap::new();
811 inputs.insert(TryInto::try_into("one")?, Value::Int(4));
812 inputs.insert(TryInto::try_into("two")?, Value::Bool(false));
813
814 let runtime = Runtime::builder().start();
815 let outputs = runtime
816 .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
817 .await
818 .unwrap();
819
820 assert_eq!(inputs, outputs);
821
822 Ok(())
823 }
824
825 #[tokio::test]
826 async fn test_vec() -> anyhow::Result<()> {
827 let graph = {
828 let mut builder = GraphBuilder::new();
829 let [input, output] = Graph::boundary();
830
831 let make_list = builder.add_node("push")?;
832 builder.add_edge((input, "item"), (make_list, "item"), None)?;
833 builder.add_edge((input, "vec"), (make_list, "vec"), None)?;
834 builder.add_edge((make_list, "vec"), (output, "out"), None)?;
835 builder.build()?
836 };
837
838 let runtime = Runtime::builder().start();
839 let values = vec![Value::Int(1), Value::Int(2), Value::Int(3)];
840
841 let mut inputs = HashMap::new();
842 inputs.insert(
843 TryInto::try_into("vec")?,
844 Value::Vec(vec![values[0].clone(), values[1].clone()]),
845 );
846 inputs.insert(TryInto::try_into("item")?, values[2].clone());
847
848 let outputs = runtime
849 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
850 .await?;
851
852 assert_eq!(outputs[&TryInto::try_into("out")?], Value::Vec(values));
853 Ok(())
854 }
855
856 #[tokio::test]
864 async fn test_pop() -> anyhow::Result<()> {
865 let graph = {
866 let mut builder = GraphBuilder::new();
867 let [input, output] = Graph::boundary();
868
869 let make_list = builder.add_node("push")?;
870 let unpack_list = builder.add_node("pop")?;
871 builder.add_edge((input, "item"), (make_list, "item"), None)?;
872 builder.add_edge((input, "vec"), (make_list, "vec"), None)?;
873 builder.add_edge((make_list, "vec"), (unpack_list, "vec"), None)?;
874 builder.add_edge((unpack_list, "item"), (output, "item"), None)?;
875 builder.add_edge((unpack_list, "vec"), (output, "vec"), None)?;
876 builder.build()?
877 };
878
879 let mut inputs = HashMap::new();
880 inputs.insert(TryInto::try_into("item")?, Value::Int(4));
881 inputs.insert(
882 TryInto::try_into("vec")?,
883 Value::Vec(vec![Value::Int(3), Value::Int(2)]),
884 );
885
886 let runtime = Runtime::builder().start();
887 let outputs = runtime
888 .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
889 .await
890 .unwrap();
891
892 assert_eq!(inputs, outputs);
893 Ok(())
894 }
895
896 #[tokio::test]
904 async fn test_match() -> anyhow::Result<()> {
905 let [input, output] = Graph::boundary();
906 let foo_graph = {
908 let mut builder = GraphBuilder::new();
909
910 let inc = builder.add_node("iadd")?;
911 builder.add_edge((input, "value"), (inc, "a"), None)?;
912 builder.add_edge((input, "xtra"), (inc, "b"), None)?;
913 builder.add_edge((inc, "value"), (output, "value"), None)?;
914 builder.build()?
915 };
916 let bar_graph = {
917 let mut builder = GraphBuilder::new();
918 let pop = builder.add_node("pop")?;
919 let d1 = builder.add_node("discard")?;
920 builder.add_edge((input, "value"), (pop, "vec"), None)?;
921 builder.add_edge((pop, "vec"), (d1, "value"), None)?;
922 builder.add_edge((pop, "item"), (output, "value"), None)?;
923 let d2 = builder.add_node("discard")?;
924 builder.add_edge((input, "xtra"), (d2, "value"), None)?;
925 builder.build()?
926 };
927 let match_graph = {
928 let mut builder = GraphBuilder::new();
929 let n = builder.add_node(Node::Match)?;
930 let ev = builder.add_node("eval")?;
931 let foo_g = builder.add_node(Value::Graph(foo_graph))?;
932 let bar_g = builder.add_node(Value::Graph(bar_graph))?;
933 builder.add_edge((input, "arg"), (n, Label::variant_value()), None)?;
934 builder.add_edge((foo_g, "value"), (n, "foo"), None)?;
935 builder.add_edge((bar_g, "value"), (n, "bar"), None)?;
936 builder.add_edge((n, "thunk"), (ev, "thunk"), None)?;
937 builder.add_edge((ev, "value"), (output, "value"), None)?;
938 builder.add_edge((input, "if_foo"), (ev, "xtra"), None)?;
939 builder.build()?
940 };
941
942 let runtime = Runtime::builder().start();
943
944 let arg = Value::Variant(TryInto::try_into("foo")?, Box::new(Value::Int(3)));
946 let inputs = HashMap::from([
947 (TryInto::try_into("arg")?, arg),
948 (TryInto::try_into("if_foo")?, Value::Int(5)),
949 ]);
950 let outputs = runtime
951 .execute_graph_cb(
952 match_graph.clone(),
953 inputs,
954 true,
955 fake_callback(),
956 fake_escape(),
957 )
958 .await?;
959 assert_eq!(outputs, HashMap::from([(Label::value(), Value::Int(8))]));
960
961 let v = Value::Vec(vec![Value::Int(31), Value::Int(42)]);
963 let graph2 = {
964 let mut builder = GraphBuilder::new();
965 let vec = builder.add_node(v)?;
966 let mkv = builder.add_node(Node::Tag(TryInto::try_into("bar")?))?;
967 builder.add_edge((vec, "value"), (mkv, "value"), None)?;
968 let m = builder.add_node(match_graph.clone())?; builder.add_edge((mkv, "value"), (m, "arg"), None)?;
970 let c = builder.add_node(Value::Int(101))?;
971 builder.add_edge((c, "value"), (m, "if_foo"), None)?;
972 builder.add_edge((m, "value"), (output, "result"), None)?;
973 builder.build()?
974 };
975 let outputs = runtime
976 .execute_graph_cb(
977 graph2,
978 HashMap::from([]),
979 true,
980 fake_callback(),
981 fake_escape(),
982 )
983 .await?;
984 assert_eq!(
985 outputs,
986 HashMap::from([(TryInto::try_into("result")?, Value::Int(42))])
987 );
988
989 Ok(())
990 }
991
992 #[tokio::test]
993 async fn test_switch() -> anyhow::Result<()> {
994 let graph = {
995 let mut builder = GraphBuilder::new();
996 let [input, output] = Graph::boundary();
997
998 let switch = builder.add_node("switch")?;
999 builder.add_edge((input, "true"), (switch, "if_true"), None)?;
1000 builder.add_edge((input, "false"), (switch, "if_false"), None)?;
1001 builder.add_edge((input, "predicate"), (switch, "pred"), None)?;
1002 builder.add_edge((switch, "value"), (output, "out"), None)?;
1003 builder.build()?
1004 };
1005
1006 let runtime = Runtime::builder().start();
1007
1008 let mut inputs = HashMap::new();
1010 inputs.insert(TryInto::try_into("predicate")?, Value::Bool(true));
1011 inputs.insert(TryInto::try_into("true")?, Value::Int(4));
1012 inputs.insert(TryInto::try_into("false")?, Value::Int(5));
1013
1014 let outputs = runtime
1015 .execute_graph_cb(graph.clone(), inputs, true, fake_callback(), fake_escape())
1016 .await
1017 .unwrap();
1018 assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(4));
1019
1020 let mut inputs = HashMap::new();
1022 inputs.insert(TryInto::try_into("predicate")?, Value::Bool(false));
1023 inputs.insert(TryInto::try_into("true")?, Value::Int(4));
1024 inputs.insert(TryInto::try_into("false")?, Value::Int(5));
1025
1026 let outputs = runtime
1027 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1028 .await?;
1029 assert_eq!(outputs[&TryInto::try_into("out")?], Value::Int(5));
1030 Ok(())
1031 }
1032
    /// Exercises the builtin `loop` with a body assembled from several sub-graphs.
    ///
    /// Going by the assertions at the end, the loop carries a struct of `number` and `accum`:
    /// while `number` is below `stopval` it adds `number` to `accum` and increments
    /// `number`; once it is not, it breaks with `iter = number` and `final = accum`.
    /// NOTE(review): this reading assumes `ilt` is integer less-than and `iadd` integer
    /// addition — confirm against the builtin definitions.
    #[tokio::test]
    async fn test_loop() -> anyhow::Result<()> {
        let stopval = 3;

        let [input, output] = Graph::boundary();
        // Condition: compares `number` with the constant `stopval`, result on `pred`.
        let lt_graph = {
            let mut builder = GraphBuilder::new();

            let lt = builder.add_node("ilt")?;
            let stop_val = builder.add_node(Value::Int(stopval))?;

            builder.add_edge((input, "number"), (lt, "a"), None)?;
            builder.add_edge((stop_val, "value"), (lt, "b"), None)?;
            builder.add_edge((lt, "value"), (output, "pred"), None)?;

            builder.build()?
        };

        // Break branch: packs `number`/`accum` as `iter`/`final` and tags the struct `break`.
        let break_graph = {
            let mut builder = GraphBuilder::new();
            let makest = builder.add_node("make_struct")?;
            builder.add_edge((input, "number"), (makest, "iter"), None)?;
            builder.add_edge((input, "accum"), (makest, "final"), None)?;

            let brk = builder.add_node(Node::Tag(Label::break_()))?;
            builder.add_edge((makest, "struct"), (brk, "value"), None)?;
            builder.add_edge((brk, "value"), (output, "value"), None)?;
            builder.build()?
        };

        // Continue branch: new number = 1 + number, new accum = accum + number, tagged
        // `continue`.
        let update_graph = {
            let mut builder = GraphBuilder::new();
            // `number` is needed twice, so it is duplicated with `copy`.
            let numbers = builder.add_node("copy")?;
            builder.add_edge((input, "number"), (numbers, "value"), None)?;
            let makest = builder.add_node("make_struct")?;

            let add1 = builder.add_node("iadd")?;
            let increment = builder.add_node(Value::Int(1))?;
            builder.add_edge((numbers, "value_0"), (add1, "b"), None)?;
            builder.add_edge((increment, "value"), (add1, "a"), None)?;
            builder.add_edge((add1, "value"), (makest, "number"), None)?;

            let add2 = builder.add_node("iadd")?;
            builder.add_edge((input, "accum"), (add2, "a"), None)?;
            builder.add_edge((numbers, "value_1"), (add2, "b"), None)?;
            builder.add_edge((add2, "value"), (makest, "accum"), None)?;

            let update = builder.add_node(Node::Tag(Label::continue_()))?;
            builder.add_edge((makest, "struct"), (update, "value"), None)?;
            builder.add_edge((update, "value"), (output, "value"), None)?;

            builder.build()?
        };

        // Loop body: unpacks the carried struct, evaluates the condition, picks the update
        // or break graph with `switch`, and runs the chosen graph through `eval`.
        let body_graph = {
            let mut builder = GraphBuilder::new();
            let inputs = builder.add_node("unpack_struct")?;
            builder.add_edge((input, Label::value()), (inputs, "struct"), None)?;
            let switch = builder.add_node("switch")?;
            let copy = builder.add_node("copy")?;
            let eval = builder.add_node("eval")?;
            let condition = builder.add_node(Node::local_box(lt_graph))?;
            let update = builder.add_node(Value::Graph(update_graph))?;
            let return_ = builder.add_node(Value::Graph(break_graph))?;

            builder.add_edge((inputs, "number"), (copy, "value"), None)?;
            builder.add_edge((copy, "value_0"), (condition, "number"), None)?;

            builder.add_edge((update, "value"), (switch, "if_true"), None)?;
            builder.add_edge((return_, "value"), (switch, "if_false"), None)?;
            builder.add_edge((condition, "pred"), (switch, "pred"), None)?;

            builder.add_edge((switch, "value"), (eval, "thunk"), None)?;
            builder.add_edge((inputs, "accum"), (eval, "accum"), None)?;
            builder.add_edge((copy, "value_1"), (eval, "number"), None)?;

            builder.add_edge((eval, "value"), (output, "value"), None)?;

            builder.build()?
        };
        // Top level: builds the initial struct from `initial`/`extra`, runs the loop with
        // the body above and unpacks the final struct into `iter` and `final`.
        let graph = {
            let mut builder = GraphBuilder::new();
            let init = builder.add_node("make_struct")?;
            builder.add_edge((input, "initial"), (init, "number"), None)?;
            builder.add_edge((input, "extra"), (init, "accum"), None)?;

            let loop_ = builder.add_node("loop")?;
            builder.add_edge((init, "struct"), (loop_, Label::value()), None)?;

            let body = builder.add_node(Value::Graph(body_graph))?;

            builder.add_edge((body, "value"), (loop_, "body"), None)?;

            let outs = builder.add_node("unpack_struct")?;
            builder.add_edge((loop_, Label::value()), (outs, "struct"), None)?;

            builder.add_edge((outs, "iter"), (output, "iter"), None)?;
            builder.add_edge((outs, "final"), (output, "final"), None)?;
            builder.build()?
        };

        // NOTE(review): none of the node names above carry the python namespace prefix, yet
        // the python worker is still registered — confirm whether it is required here.
        let python = ExternalWorker::new_spawn(python_path(), fake_interceptor()).await?;
        let runtime = Runtime::builder()
            .with_worker(python, py_loc())
            .await?
            .start();

        // Runs the loop graph once and returns its `(iter, final)` outputs; fails unless
        // the outputs are exactly those two integers.
        async fn run_loop(
            runtime: &Runtime,
            graph: crate::Graph,
            initial: i64,
            extra: i64,
        ) -> anyhow::Result<(i64, i64)> {
            let inputs = HashMap::from([
                (TryInto::try_into("initial")?, Value::Int(initial)),
                (TryInto::try_into("extra")?, Value::Int(extra)),
            ]);
            let outputs = runtime
                .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
                .await?;
            let iter = outputs.get(&TryInto::try_into("iter")?);
            let final_ = outputs.get(&TryInto::try_into("final")?);
            match (iter, final_, outputs.len()) {
                (Some(Value::Int(iter)), Some(Value::Int(final_)), 2) => Ok((*iter, *final_)),
                _ => bail!(
                    "Should have returned two ints iter&final, not {:?}",
                    outputs
                ),
            }
        }

        // 1 -> 2 -> 3, accumulating 5 + 1 + 2 = 8.
        assert_eq!(run_loop(&runtime, graph.clone(), 1, 5).await?, (3, 8));

        // -1 -> 0 -> 1 -> 2 -> 3, accumulating 0 - 1 + 0 + 1 + 2 = 2.
        assert_eq!(run_loop(&runtime, graph.clone(), -1, 0).await?, (3, 2));

        // Already at or past the stop value: the loop breaks immediately.
        assert_eq!(run_loop(&runtime, graph.clone(), 5, 10).await?, (5, 10));

        Ok(())
    }
1172
1173 #[tokio::test]
1175 async fn test_sequence() -> anyhow::Result<()> {
1176 let [input, output] = Graph::boundary();
1177 let unpack = {
1178 let mut builder = GraphBuilder::new();
1179
1180 let unpack = builder.add_node("unpack_pair")?;
1181
1182 builder.add_edge((input, "p"), (unpack, "pair"), None)?;
1183 builder.add_edge((unpack, "first"), (output, "f"), None)?;
1184 builder.add_edge((unpack, "second"), (output, "s"), None)?;
1185
1186 builder.build()?
1187 };
1188
1189 let pack = {
1190 let mut builder = GraphBuilder::new();
1191 let pack = builder.add_node("make_pair")?;
1192
1193 builder.add_edge((input, "f"), (pack, "first"), None)?;
1194 builder.add_edge((input, "s"), (pack, "second"), None)?;
1195 builder.add_edge((pack, "pair"), (output, "p"), None)?;
1196
1197 builder.build()?
1198 };
1199
1200 let seq_eval = {
1201 let mut builder = GraphBuilder::new();
1202 let c1 = builder.add_node(Node::Const(Value::Graph(unpack)))?;
1203 let c2 = builder.add_node(Node::Const(Value::Graph(pack)))?;
1204 let seq = builder.add_node("sequence")?;
1205
1206 builder.add_edge((c1, "value"), (seq, "first"), None)?;
1207 builder.add_edge((c2, "value"), (seq, "second"), None)?;
1208
1209 let eval = builder.add_node("eval")?;
1210
1211 builder.add_edge((seq, "sequenced"), (eval, "thunk"), None)?;
1212 builder.add_edge((input, "p"), (eval, "p"), None)?;
1213 builder.add_edge((eval, "p"), (output, "p"), None)?;
1214
1215 builder.build()?
1216 };
1217
1218 let runtime = Runtime::builder().start();
1219
1220 let mut inputs = HashMap::new();
1221 inputs.insert(
1222 TryInto::try_into("p")?,
1223 Value::Pair(Box::new((Value::Int(1), Value::Str("word".into())))),
1224 );
1225
1226 let outputs = runtime
1227 .execute_graph_cb(
1228 seq_eval,
1229 inputs.clone(),
1230 true,
1231 fake_callback(),
1232 fake_escape(),
1233 )
1234 .await
1235 .unwrap();
1236 assert_eq!(outputs, inputs);
1237
1238 Ok(())
1239 }
1240
1241 fn discard_return() -> Result<Graph, GraphBuilderError> {
1246 let mut builder = GraphBuilder::new();
1247 let [input, output] = Graph::boundary();
1248
1249 builder.add_edge((input, "keep"), (output, "res"), None)?;
1250 let disc = builder.add_node("discard")?;
1251 builder.add_edge((input, "ignore"), (disc, "value"), None)?;
1252 builder.build()
1253 }
1254
1255 #[tokio::test(flavor = "multi_thread")]
1256 async fn test_box() -> anyhow::Result<()> {
1257 let outer = {
1258 let mut builder = GraphBuilder::new();
1259 let [input, output] = Graph::boundary();
1260
1261 let inner = builder.add_node(Node::local_box(discard_return()?))?;
1262 let cst5 = builder.add_node(Value::Int(5))?;
1263 builder.add_edge((cst5, Label::value()), (inner, "keep"), None)?;
1264 builder.add_edge((input, "arg"), (inner, "ignore"), None)?;
1265 builder.add_edge((inner, "res"), (output, "value"), None)?;
1266 builder.build()?
1267 };
1268
1269 let runtime = Runtime::builder().start();
1270 let inputs = HashMap::from([(TryInto::try_into("arg")?, Value::Float(3.1))]);
1271 let expected_outputs = HashMap::from([(TryInto::try_into("value")?, Value::Int(5))]);
1272
1273 let outputs = runtime
1274 .execute_graph_cb(outer, inputs.clone(), true, fake_callback(), fake_escape())
1275 .await?;
1276 assert_eq!(outputs, expected_outputs);
1277
1278 Ok(())
1279 }
1280
1281 #[tokio::test(flavor = "multi_thread")]
1282 async fn test_partial() -> anyhow::Result<()> {
1283 let outer = {
1284 let mut builder = GraphBuilder::new();
1285 let [input, output] = Graph::boundary();
1286
1287 let inner = builder.add_node(Value::Graph(discard_return()?))?;
1288 let cst5 = builder.add_node(Value::Int(5))?;
1289
1290 let part = builder.add_node("partial")?;
1291 builder.add_edge((inner, Label::value()), (part, Label::thunk()), None)?;
1292 builder.add_edge((cst5, Label::value()), (part, "keep"), None)?;
1293
1294 let ev = builder.add_node("eval")?;
1295 builder.add_edge((part, Label::value()), (ev, "thunk"), None)?;
1296 builder.add_edge((input, "arg"), (ev, "ignore"), None)?;
1297 builder.add_edge((ev, "res"), (output, "value"), None)?;
1298 builder.build()?
1299 };
1300
1301 let runtime = Runtime::builder().start();
1302 let inputs = HashMap::from([(TryInto::try_into("arg")?, Value::Float(3.1))]);
1303 let expected_outputs = HashMap::from([(TryInto::try_into("value")?, Value::Int(5))]);
1304
1305 let outputs = runtime
1306 .execute_graph_cb(outer, inputs, true, fake_callback(), fake_escape())
1307 .await?;
1308 assert_eq!(outputs, expected_outputs);
1309
1310 Ok(())
1311 }
1312
1313 #[tokio::test]
1315 async fn test_struct() -> anyhow::Result<()> {
1316 let graph = {
1317 let mut builder = GraphBuilder::new();
1318 let [input, output] = Graph::boundary();
1319
1320 let unpack = builder.add_node("unpack_struct")?;
1321 let pack = builder.add_node("make_struct")?;
1322
1323 builder.add_edge((input, "st"), (unpack, "struct"), None)?;
1324 builder.add_edge((unpack, "x"), (pack, "x"), None)?;
1325 builder.add_edge((unpack, "y"), (pack, "y"), None)?;
1326 builder.add_edge((pack, "struct"), (output, "st"), None)?;
1327
1328 builder.build()?
1329 };
1330
1331 let runtime = Runtime::builder().start();
1332 let mut fields = HashMap::new();
1333 fields.insert(TryInto::try_into("x")?, Value::Int(1));
1334 fields.insert(TryInto::try_into("y")?, Value::Float(2.3));
1335
1336 let mut inputs = HashMap::new();
1337 inputs.insert(TryInto::try_into("st")?, Value::Struct(fields));
1338
1339 let outputs = runtime
1340 .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
1341 .await
1342 .unwrap();
1343 assert_eq!(outputs, inputs);
1344
1345 Ok(())
1346 }
1347
1348 #[tokio::test]
1350 async fn test_map() -> anyhow::Result<()> {
1351 let graph = {
1352 let mut builder = GraphBuilder::new();
1353 let [input, output] = Graph::boundary();
1354
1355 let insert = builder.add_node("insert_key")?;
1356 let remove = builder.add_node("remove_key")?;
1357 let copy = builder.add_node("copy")?;
1358
1359 builder.add_edge((input, "mp"), (insert, "map"), None)?;
1360 builder.add_edge((input, "v"), (insert, "val"), None)?;
1361 builder.add_edge((input, "k"), (copy, "value"), None)?;
1362 builder.add_edge((copy, "value_0"), (insert, "key"), None)?;
1363
1364 builder.add_edge((insert, "map"), (remove, "map"), None)?;
1365 builder.add_edge((copy, "value_1"), (remove, "key"), None)?;
1366 builder.add_edge((remove, "map"), (output, "mp"), None)?;
1367 builder.add_edge((remove, "val"), (output, "vl"), None)?;
1368
1369 builder.build()?
1370 };
1371
1372 let runtime = Runtime::builder().start();
1373 let mut map = HashMap::new();
1374 map.insert(Value::Str("x".into()), Value::Float(2.3));
1375
1376 let insert_key = Value::Str("y".into());
1377 let insert_val = Value::Float(1.2);
1378
1379 let mut inputs = HashMap::new();
1380 inputs.insert(TryInto::try_into("mp")?, Value::Map(map.clone()));
1381 inputs.insert(TryInto::try_into("k")?, insert_key);
1382 inputs.insert(TryInto::try_into("v")?, insert_val.clone());
1383
1384 let outputs = runtime
1385 .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
1386 .await
1387 .unwrap();
1388
1389 let mut expected_outputs = HashMap::new();
1390 expected_outputs.insert(TryInto::try_into("mp")?, Value::Map(map));
1391 expected_outputs.insert(TryInto::try_into("vl")?, insert_val);
1392 assert_eq!(outputs, expected_outputs);
1393
1394 Ok(())
1395 }
1396
1397 #[tokio::test]
1398 async fn test_equality() -> anyhow::Result<()> {
1399 let graph = {
1400 let mut builder = GraphBuilder::new();
1401 let [input, output] = Graph::boundary();
1402
1403 let equality = builder.add_node("eq")?;
1404
1405 builder.add_edge((input, "v0"), (equality, "value_0"), None)?;
1406 builder.add_edge((input, "v1"), (equality, "value_1"), None)?;
1407
1408 builder.add_edge((equality, "result"), (output, "p"), None)?;
1409
1410 builder.build()?
1411 };
1412
1413 let runtime = Runtime::builder().start();
1414
1415 let mut inputs = HashMap::new();
1416 inputs.insert(TryInto::try_into("v0")?, Value::Int(1));
1417 inputs.insert(TryInto::try_into("v1")?, Value::Int(1));
1418
1419 let outputs = runtime
1420 .execute_graph_cb(
1421 graph.clone(),
1422 inputs.clone(),
1423 true,
1424 fake_callback(),
1425 fake_escape(),
1426 )
1427 .await
1428 .unwrap();
1429
1430 assert_eq!(
1431 outputs.get(&TryInto::try_into("p")?),
1432 Some(&Value::Bool(true))
1433 );
1434
1435 let mut inputs = HashMap::new();
1436 inputs.insert(TryInto::try_into("v0")?, Value::Str("foo".into()));
1437 inputs.insert(TryInto::try_into("v1")?, Value::Str("foof".into()));
1438
1439 let outputs = runtime
1440 .execute_graph_cb(graph, inputs.clone(), true, fake_callback(), fake_escape())
1441 .await
1442 .unwrap();
1443
1444 assert_eq!(
1445 outputs.get(&TryInto::try_into("p")?),
1446 Some(&Value::Bool(false))
1447 );
1448 Ok(())
1449 }
1450
1451 #[tokio::test]
1453 async fn test_parallel() -> anyhow::Result<()> {
1454 let [input, output] = Graph::boundary();
1455 let graph1 = {
1456 let mut builder = GraphBuilder::new();
1457
1458 let c1 = builder.add_node(Node::Const(Value::Int(1)))?;
1459 let add = builder.add_node("iadd")?;
1460
1461 builder.add_edge((c1, "value"), (add, "b"), None)?;
1462 builder.add_edge((input, "vl"), (add, "a"), None)?;
1463 builder.add_edge((add, "value"), (output, "vl"), None)?;
1464
1465 builder.build()?
1466 };
1467
1468 let graph2 = {
1469 let mut builder = GraphBuilder::new();
1470 let c1 = builder.add_node(Node::Const(Value::Str("word2".into())))?;
1471 builder.add_edge((input, "vr"), (output, "vr_0"), None)?;
1472 builder.add_edge((c1, "value"), (output, "vr_1"), None)?;
1473
1474 builder.build()?
1475 };
1476
1477 let graph_par = {
1478 let mut builder = GraphBuilder::new();
1479
1480 let c1 = builder.add_node(Node::Const(Value::Graph(graph1)))?;
1481 let c2 = builder.add_node(Node::Const(Value::Graph(graph2)))?;
1482 let par = builder.add_node("parallel")?;
1483 let eval = builder.add_node("eval")?;
1484
1485 builder.add_edge((c1, "value"), (par, "left"), None)?;
1486 builder.add_edge((c2, "value"), (par, "right"), None)?;
1487 builder.add_edge((par, "value"), (eval, "thunk"), None)?;
1488 builder.add_edge((input, "value_0"), (eval, "vl"), None)?;
1489 builder.add_edge((input, "value_1"), (eval, "vr"), None)?;
1490 builder.add_edge((eval, "vl"), (output, "value_0"), None)?;
1491 builder.add_edge((eval, "vr_0"), (output, "value_1"), None)?;
1492 builder.add_edge((eval, "vr_1"), (output, "value_2"), None)?;
1493
1494 builder.build()?
1495 };
1496
1497 let runtime = Runtime::builder().start();
1498
1499 let inputs = HashMap::from([
1500 (TryInto::try_into("value_0")?, Value::Int(2)),
1501 (TryInto::try_into("value_1")?, Value::Str("word".into())),
1502 ]);
1503
1504 let outputs = runtime
1505 .execute_graph_cb(graph_par, inputs, true, fake_callback(), fake_escape())
1506 .await?;
1507
1508 assert_eq!(
1509 outputs.get(&TryInto::try_into("value_0")?),
1510 Some(&Value::Int(3))
1511 );
1512 assert_eq!(
1513 outputs.get(&TryInto::try_into("value_1")?),
1514 Some(&Value::Str("word".into()))
1515 );
1516 assert_eq!(
1517 outputs.get(&TryInto::try_into("value_2")?),
1518 Some(&Value::Str("word2".into()))
1519 );
1520
1521 Ok(())
1522 }
1523
1524 #[tokio::test]
1525 async fn test_map_builtin() -> anyhow::Result<()> {
1526 let [input, output] = Graph::boundary();
1527 let thunk = {
1528 let mut builder = GraphBuilder::new();
1529
1530 let c1 = builder.add_node(Node::Const(Value::Int(2)))?;
1531 let mul = builder.add_node("imul")?;
1532
1533 builder.add_edge((c1, Label::value()), (mul, "b"), None)?;
1534 builder.add_edge((input, Label::value()), (mul, "a"), None)?;
1535 builder.add_edge((mul, Label::value()), (output, Label::value()), None)?;
1536
1537 builder.build()?
1538 };
1539
1540 let graph = {
1541 let mut builder = GraphBuilder::new();
1542
1543 let c1 = builder.add_node(Node::Const(Value::Graph(thunk)))?;
1544 let map = builder.add_node("map")?;
1545
1546 builder.add_edge((c1, Label::value()), (map, Label::thunk()), None)?;
1547 builder.add_edge((input, Label::value()), (map, Label::value()), None)?;
1548 builder.add_edge((map, Label::value()), (output, Label::value()), None)?;
1549
1550 builder.build()?
1551 };
1552
1553 let input_vec = Value::Vec(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
1554
1555 let runtime = Runtime::builder().start();
1556
1557 let inputs = HashMap::from([(Label::value(), input_vec)]);
1558
1559 let outputs = runtime
1560 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1561 .await?;
1562
1563 println!("{:?}", outputs);
1564
1565 assert_eq!(
1566 outputs.get(&Label::value()),
1567 Some(&Value::Vec(vec![
1568 Value::Int(2),
1569 Value::Int(4),
1570 Value::Int(6)
1571 ]))
1572 );
1573
1574 Ok(())
1575 }
1576
1577 #[tokio::test]
1578 async fn test_map_concurrent() -> anyhow::Result<()> {
1579 let [input, output] = Graph::boundary();
1580 let thunk = {
1581 let mut builder = GraphBuilder::new();
1582
1583 let c1 = builder.add_node(Value::Float(1.0))?;
1584 let sleep = builder.add_node("sleep")?;
1585
1586 builder.add_edge((c1, Label::value()), (sleep, "delay_secs"), None)?;
1587 builder.add_edge((input, Label::value()), (sleep, Label::value()), None)?;
1588 builder.add_edge((sleep, Label::value()), (output, Label::value()), None)?;
1589
1590 builder.build()?
1591 };
1592
1593 let graph = {
1594 let mut builder = GraphBuilder::new();
1595
1596 let c1 = builder.add_node(Node::Const(Value::Graph(thunk)))?;
1597 let map = builder.add_node("map")?;
1598
1599 builder.add_edge((c1, Label::value()), (map, Label::thunk()), None)?;
1600 builder.add_edge((input, Label::value()), (map, Label::value()), None)?;
1601 builder.add_edge((map, Label::value()), (output, Label::value()), None)?;
1602
1603 builder.build()?
1604 };
1605
1606 let input_vec = Value::Vec(vec![
1607 Value::Int(1),
1608 Value::Int(2),
1609 Value::Int(3),
1610 Value::Int(4),
1611 Value::Int(5),
1612 ]);
1613
1614 let runtime = Runtime::builder()
1615 .with_worker(
1616 ExternalWorker::new_spawn(python_path(), fake_interceptor()).await?,
1617 py_loc(),
1618 )
1619 .await?
1620 .start();
1621
1622 let inputs = HashMap::from([(Label::value(), input_vec)]);
1623
1624 let earlier = std::time::Instant::now();
1625
1626 let outputs = runtime
1627 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1628 .await?;
1629
1630 assert!(earlier.elapsed() < Duration::from_millis(1250));
1631 assert_eq!(
1632 outputs.get(&Label::value()),
1633 Some(&Value::Vec(vec![
1634 Value::Int(1),
1635 Value::Int(2),
1636 Value::Int(3),
1637 Value::Int(4),
1638 Value::Int(5)
1639 ]))
1640 );
1641
1642 Ok(())
1643 }
1644
1645 #[fixture]
1646 fn div_graph() -> anyhow::Result<Graph> {
1647 let mut builder = GraphBuilder::new();
1648 let [input, output] = Graph::boundary();
1649
1650 let div = builder.add_node("idiv")?;
1651 builder.add_edge((input, "a"), (div, "a"), None)?;
1652 builder.add_edge((input, "b"), (div, "b"), None)?;
1653 builder.add_edge((div, "value"), (output, "value"), None)?;
1654 let graph = builder.build()?;
1655 Ok(graph)
1656 }
1657
1658 #[rstest]
1659 #[tokio::test]
1660 async fn test_idiv_op(div_graph: anyhow::Result<Graph>) -> anyhow::Result<()> {
1661 let graph = div_graph?;
1662
1663 let runtime = Runtime::builder().start();
1664
1665 let inputs = HashMap::from([
1666 (TryInto::try_into("a")?, Value::Int(10)),
1667 (TryInto::try_into("b")?, Value::Int(2)),
1668 ]);
1669
1670 let outputs = runtime
1671 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1672 .await?;
1673
1674 assert_eq!(
1675 outputs.get(&TryInto::try_into("value")?),
1676 Some(&Value::Int(5))
1677 );
1678
1679 Ok(())
1680 }
1681
1682 #[rstest]
1683 #[tokio::test]
1684 #[should_panic(expected = "Tried to divide by zero")]
1685 async fn test_idiv_zero(div_graph: anyhow::Result<Graph>) {
1686 let graph = div_graph.unwrap();
1687
1688 let runtime = Runtime::builder().start();
1689
1690 let inputs = HashMap::from([
1691 (TryInto::try_into("a").unwrap(), Value::Int(10)),
1692 (TryInto::try_into("b").unwrap(), Value::Int(0)),
1693 ]);
1694
1695 runtime
1696 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1697 .await
1698 .unwrap();
1699 }
1700
1701 #[fixture]
1702 fn mod_graph() -> anyhow::Result<Graph> {
1703 let mut builder = GraphBuilder::new();
1704 let [input, output] = Graph::boundary();
1705
1706 let mod_ = builder.add_node("imod")?;
1707 builder.add_edge((input, "a"), (mod_, "a"), None)?;
1708 builder.add_edge((input, "b"), (mod_, "b"), None)?;
1709 builder.add_edge((mod_, "value"), (output, "value"), None)?;
1710 let graph = builder.build()?;
1711 Ok(graph)
1712 }
1713
1714 #[rstest]
1715 #[tokio::test]
1716 async fn test_imod_op(mod_graph: anyhow::Result<Graph>) -> anyhow::Result<()> {
1717 let graph = mod_graph?;
1718
1719 let runtime = Runtime::builder().start();
1720
1721 let inputs = HashMap::from([
1722 (TryInto::try_into("a")?, Value::Int(10)),
1723 (TryInto::try_into("b")?, Value::Int(3)),
1724 ]);
1725
1726 let outputs = runtime
1727 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1728 .await?;
1729
1730 assert_eq!(
1731 outputs.get(&TryInto::try_into("value")?),
1732 Some(&Value::Int(1))
1733 );
1734
1735 Ok(())
1736 }
1737
1738 #[rstest]
1739 #[tokio::test]
1740 #[should_panic(expected = "Tried to take modulus with zero.")]
1741 async fn test_imod_zero(mod_graph: anyhow::Result<Graph>) {
1742 let graph = mod_graph.unwrap();
1743
1744 let runtime = Runtime::builder().start();
1745
1746 let inputs = HashMap::from([
1747 (TryInto::try_into("a").unwrap(), Value::Int(10)),
1748 (TryInto::try_into("b").unwrap(), Value::Int(0)),
1749 ]);
1750
1751 runtime
1752 .execute_graph_cb(graph, inputs, true, fake_callback(), fake_escape())
1753 .await
1754 .unwrap();
1755 }
1756
1757 #[tokio::test]
1758 async fn test_loc_merge() -> anyhow::Result<()> {
1759 let py_loc_2: LocationName = TryInto::try_into("python2")?;
1760 let python_1 = ExternalWorker::new_spawn(&python_path(), fake_interceptor()).await?;
1761 let python_2 = python_1.clone();
1762 let runtime = Runtime::builder()
1763 .with_worker(python_1, py_loc())
1764 .await?
1765 .with_worker(python_2, py_loc_2)
1766 .await?
1767 .start();
1768
1769 let sig = runtime.signature().root;
1770 assert_eq!(
1771 sig.functions[&(TryInto::try_into("copy")?)].locations,
1772 vec![Location::local()]
1773 );
1774 assert_eq!(
1775 sig.subspaces[&(TryInto::try_into("python_nodes")?)].functions
1776 [&(TryInto::try_into("id_py")?)]
1777 .locations,
1778 vec![Location(vec![py_loc()]), Location(vec![py_loc_2])]
1779 );
1780 Ok(())
1781 }
1782
1783 #[tokio::test]
1784 async fn test_loc_clash() -> anyhow::Result<()> {
1785 let python_1 = ExternalWorker::new_spawn(&python_path(), fake_interceptor()).await?;
1786 let python_2 = python_1.clone();
1787
1788 let builder = Runtime::builder()
1789 .with_worker(python_1, py_loc())
1790 .await?
1791 .with_worker(python_2, py_loc())
1792 .await;
1793 match builder {
1794 Ok(_) => bail!("Builder should fail with two workers on the same location"),
1795 Err(e) => {
1796 assert_eq!(
1797 e.to_string(),
1798 anyhow::Error::from(LocationConflict(py_loc())).to_string()
1799 );
1800 Ok(())
1801 }
1802 }
1803 }
1804
1805 #[tokio::test]
1806 async fn test_stack_trace_map_eval() -> anyhow::Result<()> {
1807 use anyhow::anyhow;
1808 let (func_node, func_graph) = {
1809 let mut builder = GraphBuilder::new();
1810 let [input, output] = Graph::boundary();
1811
1812 let nod = builder.add_node("st_test")?;
1813 builder.add_edge((input, "argument"), (nod, "label"), None)?;
1814 builder.add_edge((nod, "labelled_trace"), (output, "result"), None)?;
1815 (nod, builder.build()?)
1816 };
1817 let (eval_node, map_thunk) = {
1818 let mut builder = GraphBuilder::new();
1819 let [input, output] = Graph::boundary();
1820 let func_cst = builder.add_node(Node::Const(Value::Graph(func_graph)))?;
1821 let eval_node = builder.add_node("eval")?;
1822 builder.add_edge((func_cst, "value"), (eval_node, "thunk"), None)?;
1823 builder.add_edge((input, "value"), (eval_node, "argument"), None)?;
1824 builder.add_edge((eval_node, "result"), (output, "value"), None)?;
1825 (eval_node, builder.build()?)
1826 };
1827 let (map_node, graph) = {
1828 let mut builder = GraphBuilder::new();
1829 let [input, output] = Graph::boundary();
1830 let map_func = builder.add_node(Node::Const(Value::Graph(map_thunk)))?;
1831 let map_node = builder.add_node("map")?;
1832 builder.add_edge((map_func, "value"), (map_node, "thunk"), None)?;
1833 builder.add_edge((input, "elems"), (map_node, "value"), None)?;
1834 builder.add_edge((map_node, "value"), (output, "mapped"), None)?;
1835 (map_node, builder.build()?)
1836 };
1837
1838 let arg: Label = TryInto::try_into("label").unwrap();
1839 let res: Label = TryInto::try_into("labelled_trace").unwrap();
1840 let mut t = GraphType::new();
1841 t.add_input(arg, Type::Str);
1842 t.add_output(res, Type::Str);
1843 let mut worker = LocalWorker::new();
1844 let n = tierkreis_core::prelude::TryFrom::try_from("st_test").unwrap();
1845 worker.add_function(
1846 FunctionName::builtin(n),
1847 FunctionDeclaration {
1848 type_scheme: t.into(),
1849 description: "Compare stack trace with expected string".into(),
1850 input_order: vec![arg],
1851 output_order: vec![res],
1852 },
1853 move || {
1854 RuntimeOperation::new_fn_simple(move |mut inputs, context| {
1855 let Value::Str(s) = inputs.remove_entry(&arg).unwrap().1 else {
1856 return Err(anyhow!("bad input"));
1857 };
1858 Ok(HashMap::from([(
1859 res,
1860 Value::Str(format!("{s} {:?}", context.graph_trace)),
1861 )]))
1862 })
1863 },
1864 );
1865 let runtime = Runtime::builder()
1866 .with_local_functions(worker)
1867 .unwrap()
1868 .start();
1869
1870 runtime.infer_type(&graph).unwrap();
1871 let input_elems = vec![Value::Str("a".into()), Value::Str("b".into())];
1872
1873 let res = runtime
1874 .execute_graph_cb(
1875 graph.clone(),
1876 HashMap::from([(TryInto::try_into("elems").unwrap(), Value::Vec(input_elems))]),
1877 true,
1878 fake_callback(),
1879 fake_escape(),
1880 )
1881 .await
1882 .unwrap();
1883 let trace_for_elem = |i| {
1884 Into::<GraphTrace>::into(
1885 GraphTrace::Root
1886 .inner_node(map_node)
1887 .list_elem(i)
1888 .inner_node(eval_node)
1889 .inner_node(func_node),
1890 )
1891 };
1892 let expected = HashMap::from([(
1893 TryInto::try_into("mapped").unwrap(),
1894 Value::Vec(vec![
1895 Value::Str(format!("a {:?}", trace_for_elem(0))),
1896 Value::Str(format!("b {:?}", trace_for_elem(1))),
1897 ]),
1898 )]);
1899 assert_eq!(res, expected);
1900 Ok(())
1901 }
1902}