Skip to main content

somatize_worker/
remote_executor.rs

1//! WebSocket-based RemoteExecutor — sends plans to workers and collects results.
2
3use somatize_core::error::{Result, SomaError};
4use somatize_core::filter::RemoteTarget;
5use somatize_core::value::Value;
6use somatize_runtime::executor::RemoteExecutor;
7
8use crate::protocol::*;
9use std::sync::RwLock;
10
11/// A remote executor that dispatches work to workers via WebSocket.
12///
13/// Workers are registered by address + optional token.
14/// When `execute_remote` is called, it finds a matching worker,
15/// connects via WS, sends the plan, and waits for the result.
16pub struct WsRemoteExecutor {
17    /// Registered workers: (address, token, tags)
18    workers: RwLock<Vec<WorkerEntry>>,
19}
20
21#[derive(Clone)]
22struct WorkerEntry {
23    address: String,
24    token: Option<String>,
25    tags: Vec<String>,
26}
27
28impl WsRemoteExecutor {
29    pub fn new() -> Self {
30        Self {
31            workers: RwLock::new(Vec::new()),
32        }
33    }
34
35    /// Register a worker endpoint.
36    pub fn add_worker(&self, address: impl Into<String>, token: Option<String>, tags: Vec<String>) {
37        let mut workers = self.workers.write().unwrap();
38        workers.push(WorkerEntry {
39            address: address.into(),
40            token,
41            tags,
42        });
43    }
44
45    /// Find a worker matching the given target.
46    fn find_worker(&self, target: &RemoteTarget) -> Option<WorkerEntry> {
47        let workers = self.workers.read().unwrap();
48        match target {
49            RemoteTarget::WorkerId(id) => workers.iter().find(|w| w.address.contains(id)).cloned(),
50            RemoteTarget::Tag(tag) => workers.iter().find(|w| w.tags.contains(tag)).cloned(),
51        }
52    }
53
54    /// Send a plan to a worker via WebSocket and wait for the result.
55    fn execute_on_worker(
56        &self,
57        worker: &WorkerEntry,
58        node_id: &str,
59        input: Option<&Value>,
60    ) -> Result<Value> {
61        let rt = tokio::runtime::Builder::new_current_thread()
62            .enable_all()
63            .build()
64            .map_err(|e| SomaError::Other(format!("tokio runtime: {e}")))?;
65
66        rt.block_on(async {
67            let url = if let Some(token) = &worker.token {
68                format!("{}/ws?token={}", worker.address, token)
69            } else {
70                format!("{}/ws", worker.address)
71            };
72
73            let (mut ws, _) = tokio_tungstenite::connect_async(&url)
74                .await
75                .map_err(|e| SomaError::Other(format!("WS connect to {}: {e}", worker.address)))?;
76
77            use futures_util::{SinkExt, StreamExt};
78            use tokio_tungstenite::tungstenite::Message;
79
80            // Build a simple plan: execute this one node
81            let plan = SerializedPlan {
82                plan_id: format!("remote_{node_id}"),
83                plan: somatize_compiler::ExecutionPlan::Execute {
84                    node_id: node_id.to_string(),
85                },
86                input: input.map(|v| InputSource::Inline { value: v.clone() }),
87                filters: vec![],
88                metadata: serde_json::json!({}),
89            };
90
91            let msg = CoordinatorToWorker::AssignPlan { plan };
92            let json = serde_json::to_string(&msg)
93                .map_err(|e| SomaError::Other(format!("serialize: {e}")))?;
94
95            ws.send(Message::Text(json.into()))
96                .await
97                .map_err(|e| SomaError::Other(format!("WS send: {e}")))?;
98
99            // Wait for result
100            while let Some(Ok(Message::Text(response))) = ws.next().await {
101                if let Ok(result) = serde_json::from_str::<WorkerToCoordinator>(&response) {
102                    match result {
103                        WorkerToCoordinator::PlanResult { result, .. } => match result {
104                            PlanResult::Success { output, .. } => {
105                                let _ = ws.close(None).await;
106                                return Ok(output);
107                            }
108                            PlanResult::Failed { error, .. } => {
109                                let _ = ws.close(None).await;
110                                return Err(SomaError::Execution {
111                                    node_id: node_id.to_string(),
112                                    message: error,
113                                });
114                            }
115                        },
116                        // Skip progress/event messages
117                        _ => continue,
118                    }
119                }
120            }
121
122            let _ = ws.close(None).await;
123            Err(SomaError::Other(format!(
124                "worker {} closed without result",
125                worker.address
126            )))
127        })
128    }
129
130    pub fn has_workers(&self) -> bool {
131        !self.workers.read().unwrap().is_empty()
132    }
133}
134
135impl Default for WsRemoteExecutor {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl RemoteExecutor for WsRemoteExecutor {
142    fn execute_remote(
143        &self,
144        node_id: &str,
145        target: &RemoteTarget,
146        input: Option<&Value>,
147    ) -> Result<Value> {
148        let worker = self
149            .find_worker(target)
150            .ok_or_else(|| SomaError::Other(format!("no worker found for target {target:?}")))?;
151
152        tracing::info!(
153            "Dispatching node '{node_id}' to worker at {}",
154            worker.address
155        );
156
157        self.execute_on_worker(&worker, node_id, input)
158    }
159}