somatize_worker/
remote_executor.rs1use somatize_core::error::{Result, SomaError};
4use somatize_core::filter::RemoteTarget;
5use somatize_core::value::Value;
6use somatize_runtime::executor::RemoteExecutor;
7
8use crate::protocol::*;
9use std::sync::RwLock;
10
11pub struct WsRemoteExecutor {
17 workers: RwLock<Vec<WorkerEntry>>,
19}
20
/// Connection details for one registered worker.
#[derive(Clone)]
struct WorkerEntry {
    /// Base URL of the worker; `/ws` is appended when a connection is opened.
    address: String,
    /// Optional value sent as the `token` query parameter on connect.
    token: Option<String>,
    /// Labels used to select this worker by tag.
    tags: Vec<String>,
}
27
28impl WsRemoteExecutor {
29 pub fn new() -> Self {
30 Self {
31 workers: RwLock::new(Vec::new()),
32 }
33 }
34
35 pub fn add_worker(&self, address: impl Into<String>, token: Option<String>, tags: Vec<String>) {
37 let mut workers = self.workers.write().unwrap();
38 workers.push(WorkerEntry {
39 address: address.into(),
40 token,
41 tags,
42 });
43 }
44
45 fn find_worker(&self, target: &RemoteTarget) -> Option<WorkerEntry> {
47 let workers = self.workers.read().unwrap();
48 match target {
49 RemoteTarget::WorkerId(id) => workers.iter().find(|w| w.address.contains(id)).cloned(),
50 RemoteTarget::Tag(tag) => workers.iter().find(|w| w.tags.contains(tag)).cloned(),
51 }
52 }
53
54 fn execute_on_worker(
56 &self,
57 worker: &WorkerEntry,
58 node_id: &str,
59 input: Option<&Value>,
60 ) -> Result<Value> {
61 let rt = tokio::runtime::Builder::new_current_thread()
62 .enable_all()
63 .build()
64 .map_err(|e| SomaError::Other(format!("tokio runtime: {e}")))?;
65
66 rt.block_on(async {
67 let url = if let Some(token) = &worker.token {
68 format!("{}/ws?token={}", worker.address, token)
69 } else {
70 format!("{}/ws", worker.address)
71 };
72
73 let (mut ws, _) = tokio_tungstenite::connect_async(&url)
74 .await
75 .map_err(|e| SomaError::Other(format!("WS connect to {}: {e}", worker.address)))?;
76
77 use futures_util::{SinkExt, StreamExt};
78 use tokio_tungstenite::tungstenite::Message;
79
80 let plan = SerializedPlan {
82 plan_id: format!("remote_{node_id}"),
83 plan: somatize_compiler::ExecutionPlan::Execute {
84 node_id: node_id.to_string(),
85 },
86 input: input.map(|v| InputSource::Inline { value: v.clone() }),
87 filters: vec![],
88 metadata: serde_json::json!({}),
89 };
90
91 let msg = CoordinatorToWorker::AssignPlan { plan };
92 let json = serde_json::to_string(&msg)
93 .map_err(|e| SomaError::Other(format!("serialize: {e}")))?;
94
95 ws.send(Message::Text(json.into()))
96 .await
97 .map_err(|e| SomaError::Other(format!("WS send: {e}")))?;
98
99 while let Some(Ok(Message::Text(response))) = ws.next().await {
101 if let Ok(result) = serde_json::from_str::<WorkerToCoordinator>(&response) {
102 match result {
103 WorkerToCoordinator::PlanResult { result, .. } => match result {
104 PlanResult::Success { output, .. } => {
105 let _ = ws.close(None).await;
106 return Ok(output);
107 }
108 PlanResult::Failed { error, .. } => {
109 let _ = ws.close(None).await;
110 return Err(SomaError::Execution {
111 node_id: node_id.to_string(),
112 message: error,
113 });
114 }
115 },
116 _ => continue,
118 }
119 }
120 }
121
122 let _ = ws.close(None).await;
123 Err(SomaError::Other(format!(
124 "worker {} closed without result",
125 worker.address
126 )))
127 })
128 }
129
130 pub fn has_workers(&self) -> bool {
131 !self.workers.read().unwrap().is_empty()
132 }
133}
134
135impl Default for WsRemoteExecutor {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl RemoteExecutor for WsRemoteExecutor {
142 fn execute_remote(
143 &self,
144 node_id: &str,
145 target: &RemoteTarget,
146 input: Option<&Value>,
147 ) -> Result<Value> {
148 let worker = self
149 .find_worker(target)
150 .ok_or_else(|| SomaError::Other(format!("no worker found for target {target:?}")))?;
151
152 tracing::info!(
153 "Dispatching node '{node_id}' to worker at {}",
154 worker.address
155 );
156
157 self.execute_on_worker(&worker, node_id, input)
158 }
159}