Skip to main content

somatize_runtime/runner/
remote.rs

1//! RemoteRunner — executes plans on remote workers via a Transport abstraction.
2//!
3//! The Transport trait abstracts HOW to communicate with workers (WS, HTTP, gRPC, etc.).
4//! RemoteRunner implements Runner by serializing fit/forward calls and sending them
5//! through the transport layer.
6
7use super::Runner;
8use crate::EventBus;
9use crate::filter_library::FilterLibrary;
10
11use somatize_compiler::ExecutionPlan;
12use somatize_core::cache::CacheStore;
13use somatize_core::error::Result;
14use somatize_core::value::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18/// Abstraction for communicating with remote workers.
19/// Implemented by WsTransport (WebSocket), but could be HTTP, gRPC, etc.
20pub trait Transport: Send + Sync {
21    /// Send a plan for execution and receive the output + trained states.
22    fn execute(
23        &self,
24        plan: &ExecutionPlan,
25        filters: &FilterLibrary,
26        input: &Value,
27        y: Option<&Value>,
28        fit_mode: bool,
29    ) -> Result<(Value, HashMap<String, Value>)>;
30
31    /// Request trained states from the remote worker.
32    fn get_state(&self, node_ids: &[String]) -> Result<HashMap<String, Value>>;
33
34    /// Load states on the remote worker.
35    fn set_state(&self, states: &HashMap<String, Value>) -> Result<()>;
36
37    /// Request gradients from the remote worker.
38    fn get_gradients(&self, node_ids: &[String]) -> Result<HashMap<String, Value>>;
39
40    /// Apply aggregated gradients on the remote worker.
41    fn apply_gradients(&self, gradients: &HashMap<String, Value>) -> Result<()>;
42
43    /// Convenience: execute a single node remotely (used by the plan executor).
44    fn execute_node(&self, node_id: &str, input: Option<&Value>) -> Result<Value> {
45        let plan = ExecutionPlan::Execute {
46            node_id: node_id.to_string(),
47        };
48        let input_val = input.cloned().unwrap_or(Value::Empty);
49        let filters = crate::filter_library::FilterLibrary::new();
50        let (output, _) = self.execute(&plan, &filters, &input_val, None, false)?;
51        Ok(output)
52    }
53}
54
55/// A Runner that delegates execution to a remote worker via Transport.
56pub struct RemoteRunner {
57    transport: Box<dyn Transport>,
58}
59
60impl RemoteRunner {
61    pub fn new(transport: impl Transport + 'static) -> Self {
62        Self {
63            transport: Box::new(transport),
64        }
65    }
66
67    /// Access the underlying transport (for strategy methods).
68    pub fn transport(&self) -> &dyn Transport {
69        self.transport.as_ref()
70    }
71}
72
73impl Runner for RemoteRunner {
74    fn fit(
75        &self,
76        plan: &ExecutionPlan,
77        filters: &FilterLibrary,
78        _cache: &dyn CacheStore,
79        _event_bus: &Arc<EventBus>,
80        input: &Value,
81        y: Option<&Value>,
82    ) -> Result<(Value, HashMap<String, Value>)> {
83        self.transport.execute(plan, filters, input, y, true)
84    }
85
86    fn forward(
87        &self,
88        plan: &ExecutionPlan,
89        filters: &FilterLibrary,
90        _cache: &dyn CacheStore,
91        _event_bus: &Arc<EventBus>,
92        input: &Value,
93    ) -> Result<Value> {
94        let (output, _states) = self.transport.execute(plan, filters, input, None, false)?;
95        Ok(output)
96    }
97}