somatize_worker/
remote_executor.rs1use somatize_core::error::{Result, SomaError};
4use somatize_core::filter::RemoteTarget;
5use somatize_core::value::Value;
6use somatize_runtime::executor::RemoteExecutor;
7
8use crate::protocol::*;
9use std::sync::RwLock;
10
11pub struct WsRemoteExecutor {
17 workers: RwLock<Vec<WorkerEntry>>,
19}
20
/// One registered worker: where to reach it and how to select it.
#[derive(Clone)]
struct WorkerEntry {
    /// Base address of the worker; `/ws` is appended to form the connect URL.
    /// Also the string that `RemoteTarget::WorkerId` is matched against.
    address: String,
    /// Optional token, sent as the `token` query parameter when present.
    token: Option<String>,
    /// Tags that `RemoteTarget::Tag` is matched against.
    tags: Vec<String>,
}
27
28impl WsRemoteExecutor {
29 pub fn new() -> Self {
30 Self {
31 workers: RwLock::new(Vec::new()),
32 }
33 }
34
35 pub fn add_worker(&self, address: impl Into<String>, token: Option<String>, tags: Vec<String>) {
37 let mut workers = self.workers.write().unwrap();
38 workers.push(WorkerEntry {
39 address: address.into(),
40 token,
41 tags,
42 });
43 }
44
45 fn find_worker(&self, target: &RemoteTarget) -> Option<WorkerEntry> {
47 let workers = self.workers.read().unwrap();
48 match target {
49 RemoteTarget::WorkerId(id) => workers.iter().find(|w| w.address.contains(id)).cloned(),
50 RemoteTarget::Tag(tag) => workers.iter().find(|w| w.tags.contains(tag)).cloned(),
51 }
52 }
53
54 fn execute_on_worker(
56 &self,
57 worker: &WorkerEntry,
58 node_id: &str,
59 input: Option<&Value>,
60 ) -> Result<Value> {
61 let rt = tokio::runtime::Builder::new_current_thread()
62 .enable_all()
63 .build()
64 .map_err(|e| SomaError::Other(format!("tokio runtime: {e}")))?;
65
66 rt.block_on(async {
67 let url = if let Some(token) = &worker.token {
68 format!("{}/ws?token={}", worker.address, token)
69 } else {
70 format!("{}/ws", worker.address)
71 };
72
73 let (mut ws, _) = tokio_tungstenite::connect_async(&url)
74 .await
75 .map_err(|e| SomaError::Other(format!("WS connect to {}: {e}", worker.address)))?;
76
77 use futures_util::{SinkExt, StreamExt};
78 use tokio_tungstenite::tungstenite::Message;
79
80 let plan = SerializedPlan {
82 plan_id: format!("remote_{node_id}"),
83 plan: somatize_compiler::ExecutionPlan::Execute {
84 node_id: node_id.to_string(),
85 },
86 input: input.map(|v| InputSource::Inline { value: v.clone() }),
87 filters: vec![],
88 mode: ExecutionMode::default(),
89 metadata: serde_json::json!({}),
90 };
91
92 let msg = CoordinatorToWorker::AssignPlan { plan };
93 let json = serde_json::to_string(&msg)
94 .map_err(|e| SomaError::Other(format!("serialize: {e}")))?;
95
96 ws.send(Message::Text(json.into()))
97 .await
98 .map_err(|e| SomaError::Other(format!("WS send: {e}")))?;
99
100 while let Some(Ok(Message::Text(response))) = ws.next().await {
102 if let Ok(result) = serde_json::from_str::<WorkerToCoordinator>(&response) {
103 match result {
104 WorkerToCoordinator::PlanResult { result, .. } => match result {
105 PlanResult::Success { output, .. } => {
106 let _ = ws.close(None).await;
107 return Ok(output);
108 }
109 PlanResult::Failed { error, .. } => {
110 let _ = ws.close(None).await;
111 return Err(SomaError::Execution {
112 node_id: node_id.to_string(),
113 message: error,
114 });
115 }
116 },
117 _ => continue,
119 }
120 }
121 }
122
123 let _ = ws.close(None).await;
124 Err(SomaError::Other(format!(
125 "worker {} closed without result",
126 worker.address
127 )))
128 })
129 }
130
131 pub fn has_workers(&self) -> bool {
132 !self.workers.read().unwrap().is_empty()
133 }
134}
135
136impl Default for WsRemoteExecutor {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl RemoteExecutor for WsRemoteExecutor {
143 fn execute_remote(
144 &self,
145 node_id: &str,
146 target: &RemoteTarget,
147 input: Option<&Value>,
148 ) -> Result<Value> {
149 let worker = self
150 .find_worker(target)
151 .ok_or_else(|| SomaError::Other(format!("no worker found for target {target:?}")))?;
152
153 tracing::info!(
154 "Dispatching node '{node_id}' to worker at {}",
155 worker.address
156 );
157
158 self.execute_on_worker(&worker, node_id, input)
159 }
160}