somatize_runtime/runner/
remote.rs1use super::Runner;
8use crate::EventBus;
9use crate::filter_library::FilterLibrary;
10
11use somatize_compiler::ExecutionPlan;
12use somatize_core::cache::CacheStore;
13use somatize_core::error::Result;
14use somatize_core::value::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18pub trait Transport: Send + Sync {
21 fn execute(
23 &self,
24 plan: &ExecutionPlan,
25 filters: &FilterLibrary,
26 input: &Value,
27 y: Option<&Value>,
28 fit_mode: bool,
29 ) -> Result<(Value, HashMap<String, Value>)>;
30
31 fn get_state(&self, node_ids: &[String]) -> Result<HashMap<String, Value>>;
33
34 fn set_state(&self, states: &HashMap<String, Value>) -> Result<()>;
36
37 fn get_gradients(&self, node_ids: &[String]) -> Result<HashMap<String, Value>>;
39
40 fn apply_gradients(&self, gradients: &HashMap<String, Value>) -> Result<()>;
42
43 fn execute_node(&self, node_id: &str, input: Option<&Value>) -> Result<Value> {
45 let plan = ExecutionPlan::Execute {
46 node_id: node_id.to_string(),
47 };
48 let input_val = input.cloned().unwrap_or(Value::Empty);
49 let filters = crate::filter_library::FilterLibrary::new();
50 let (output, _) = self.execute(&plan, &filters, &input_val, None, false)?;
51 Ok(output)
52 }
53}
54
55pub struct RemoteRunner {
57 transport: Box<dyn Transport>,
58}
59
60impl RemoteRunner {
61 pub fn new(transport: impl Transport + 'static) -> Self {
62 Self {
63 transport: Box::new(transport),
64 }
65 }
66
67 pub fn transport(&self) -> &dyn Transport {
69 self.transport.as_ref()
70 }
71}
72
73impl Runner for RemoteRunner {
74 fn fit(
75 &self,
76 plan: &ExecutionPlan,
77 filters: &FilterLibrary,
78 _cache: &dyn CacheStore,
79 _event_bus: &Arc<EventBus>,
80 input: &Value,
81 y: Option<&Value>,
82 ) -> Result<(Value, HashMap<String, Value>)> {
83 self.transport.execute(plan, filters, input, y, true)
84 }
85
86 fn forward(
87 &self,
88 plan: &ExecutionPlan,
89 filters: &FilterLibrary,
90 _cache: &dyn CacheStore,
91 _event_bus: &Arc<EventBus>,
92 input: &Value,
93 ) -> Result<Value> {
94 let (output, _states) = self.transport.execute(plan, filters, input, None, false)?;
95 Ok(output)
96 }
97}