Skip to main content

somatize_worker/
remote_executor.rs

1//! WebSocket-based RemoteExecutor — sends plans to workers and collects results.
2
3use somatize_core::error::{Result, SomaError};
4use somatize_core::filter::RemoteTarget;
5use somatize_core::value::Value;
6use somatize_runtime::executor::RemoteExecutor;
7
8use crate::protocol::*;
9use std::sync::RwLock;
10
11/// A remote executor that dispatches work to workers via WebSocket.
12///
13/// Workers are registered by address + optional token.
14/// When `execute_remote` is called, it finds a matching worker,
15/// connects via WS, sends the plan, and waits for the result.
16pub struct WsRemoteExecutor {
17    /// Registered workers: (address, token, tags)
18    workers: RwLock<Vec<WorkerEntry>>,
19}
20
/// A single registered worker endpoint.
#[derive(Clone)]
struct WorkerEntry {
    /// Base address of the worker; `/ws` is appended to form the WebSocket URL.
    address: String,
    /// Optional token, sent as the `token` query parameter when connecting.
    token: Option<String>,
    /// Tags used to select this worker for `RemoteTarget::Tag` targets.
    tags: Vec<String>,
}
27
28impl WsRemoteExecutor {
29    pub fn new() -> Self {
30        Self {
31            workers: RwLock::new(Vec::new()),
32        }
33    }
34
35    /// Register a worker endpoint.
36    pub fn add_worker(&self, address: impl Into<String>, token: Option<String>, tags: Vec<String>) {
37        let mut workers = self.workers.write().unwrap();
38        workers.push(WorkerEntry {
39            address: address.into(),
40            token,
41            tags,
42        });
43    }
44
45    /// Find a worker matching the given target.
46    fn find_worker(&self, target: &RemoteTarget) -> Option<WorkerEntry> {
47        let workers = self.workers.read().unwrap();
48        match target {
49            RemoteTarget::WorkerId(id) => workers.iter().find(|w| w.address.contains(id)).cloned(),
50            RemoteTarget::Tag(tag) => workers.iter().find(|w| w.tags.contains(tag)).cloned(),
51        }
52    }
53
54    /// Send a plan to a worker via WebSocket and wait for the result.
55    fn execute_on_worker(
56        &self,
57        worker: &WorkerEntry,
58        node_id: &str,
59        input: Option<&Value>,
60    ) -> Result<Value> {
61        let rt = tokio::runtime::Builder::new_current_thread()
62            .enable_all()
63            .build()
64            .map_err(|e| SomaError::Other(format!("tokio runtime: {e}")))?;
65
66        rt.block_on(async {
67            let url = if let Some(token) = &worker.token {
68                format!("{}/ws?token={}", worker.address, token)
69            } else {
70                format!("{}/ws", worker.address)
71            };
72
73            let (mut ws, _) = tokio_tungstenite::connect_async(&url)
74                .await
75                .map_err(|e| SomaError::Other(format!("WS connect to {}: {e}", worker.address)))?;
76
77            use futures_util::{SinkExt, StreamExt};
78            use tokio_tungstenite::tungstenite::Message;
79
80            // Build a simple plan: execute this one node
81            let plan = SerializedPlan {
82                plan_id: format!("remote_{node_id}"),
83                plan: somatize_compiler::ExecutionPlan::Execute {
84                    node_id: node_id.to_string(),
85                },
86                input: input.map(|v| InputSource::Inline { value: v.clone() }),
87                filters: vec![],
88                mode: ExecutionMode::default(),
89                metadata: serde_json::json!({}),
90            };
91
92            let msg = CoordinatorToWorker::AssignPlan { plan };
93            let json = serde_json::to_string(&msg)
94                .map_err(|e| SomaError::Other(format!("serialize: {e}")))?;
95
96            ws.send(Message::Text(json.into()))
97                .await
98                .map_err(|e| SomaError::Other(format!("WS send: {e}")))?;
99
100            // Wait for result
101            while let Some(Ok(Message::Text(response))) = ws.next().await {
102                if let Ok(result) = serde_json::from_str::<WorkerToCoordinator>(&response) {
103                    match result {
104                        WorkerToCoordinator::PlanResult { result, .. } => match result {
105                            PlanResult::Success { output, .. } => {
106                                let _ = ws.close(None).await;
107                                return Ok(output);
108                            }
109                            PlanResult::Failed { error, .. } => {
110                                let _ = ws.close(None).await;
111                                return Err(SomaError::Execution {
112                                    node_id: node_id.to_string(),
113                                    message: error,
114                                });
115                            }
116                        },
117                        // Skip progress/event messages
118                        _ => continue,
119                    }
120                }
121            }
122
123            let _ = ws.close(None).await;
124            Err(SomaError::Other(format!(
125                "worker {} closed without result",
126                worker.address
127            )))
128        })
129    }
130
131    pub fn has_workers(&self) -> bool {
132        !self.workers.read().unwrap().is_empty()
133    }
134}
135
136impl Default for WsRemoteExecutor {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl RemoteExecutor for WsRemoteExecutor {
143    fn execute_remote(
144        &self,
145        node_id: &str,
146        target: &RemoteTarget,
147        input: Option<&Value>,
148    ) -> Result<Value> {
149        let worker = self
150            .find_worker(target)
151            .ok_or_else(|| SomaError::Other(format!("no worker found for target {target:?}")))?;
152
153        tracing::info!(
154            "Dispatching node '{node_id}' to worker at {}",
155            worker.address
156        );
157
158        self.execute_on_worker(&worker, node_id, input)
159    }
160}