1use crate::executor::{ExecutionResult, NodeExecutor};
7use async_trait::async_trait;
8use jamjet_scheduler::strategy_bridge::{
9 DecideRequest, DiscoverRequest, ScoreRequest, StrategyBridge,
10};
11use jamjet_state::backend::WorkItem;
12use serde_json::{json, Value};
13use tracing::{debug, instrument, warn};
14
15pub struct CoordinatorExecutor {
16 bridge: StrategyBridge,
17}
18
19impl CoordinatorExecutor {
20 pub fn new(strategy_bridge_url: String) -> Self {
21 Self {
22 bridge: StrategyBridge::new(strategy_bridge_url),
23 }
24 }
25}
26
27#[async_trait]
28impl NodeExecutor for CoordinatorExecutor {
29 #[instrument(skip(self, item), fields(node_id = %item.node_id))]
30 async fn execute(&self, item: &WorkItem) -> Result<ExecutionResult, String> {
31 let start = std::time::Instant::now();
32 let p = &item.payload;
33
34 let task = p
36 .get("task")
37 .and_then(|v| v.as_str())
38 .unwrap_or("")
39 .to_string();
40 let required_skills: Vec<String> = p
41 .get("required_skills")
42 .and_then(|v| serde_json::from_value(v.clone()).ok())
43 .unwrap_or_default();
44 let preferred_skills: Vec<String> = p
45 .get("preferred_skills")
46 .and_then(|v| serde_json::from_value(v.clone()).ok())
47 .unwrap_or_default();
48 let trust_domain = p
49 .get("trust_domain")
50 .and_then(|v| v.as_str())
51 .map(String::from);
52 let strategy_name = p
53 .get("strategy")
54 .and_then(|v| v.as_str())
55 .unwrap_or("default")
56 .to_string();
57 let output_key = p
58 .get("output_key")
59 .and_then(|v| v.as_str())
60 .unwrap_or("result")
61 .to_string();
62 let threshold = p
63 .get("tiebreaker")
64 .and_then(|t| t.get("threshold"))
65 .and_then(|v| v.as_f64())
66 .unwrap_or(0.1);
67 let tiebreaker_model = p
68 .get("tiebreaker")
69 .and_then(|t| t.get("model"))
70 .and_then(|v| v.as_str())
71 .unwrap_or("claude-sonnet-4-6")
72 .to_string();
73 let weights = p.get("weights").cloned().unwrap_or(json!({}));
74 let context = p.get("input").cloned().unwrap_or(json!({}));
75
76 debug!(task = %task, strategy = %strategy_name, "Coordinator: starting discovery");
77
78 let discover_resp = self
80 .bridge
81 .discover(DiscoverRequest {
82 task: task.clone(),
83 required_skills: required_skills.clone(),
84 preferred_skills: preferred_skills.clone(),
85 trust_domain: trust_domain.clone(),
86 strategy_name: strategy_name.clone(),
87 context: context.clone(),
88 })
89 .await
90 .map_err(|e| format!("Discovery failed: {e}"))?;
91
92 let discovery_event = json!({
93 "type": "coordinator_discovery",
94 "node_id": item.node_id,
95 "query_skills": required_skills,
96 "query_trust_domain": trust_domain,
97 "candidates": discover_resp.candidates.iter()
98 .map(|c| json!({"uri": &c.uri, "skills": &c.skills}))
99 .collect::<Vec<_>>(),
100 "filtered_out": discover_resp.filtered_out,
101 });
102
103 if discover_resp.candidates.is_empty() {
105 let decision_event = json!({
106 "type": "coordinator_decision",
107 "node_id": item.node_id,
108 "selected": null,
109 "method": "no_candidates",
110 "confidence": 0.0,
111 "rejected": [],
112 });
113 let duration_ms = start.elapsed().as_millis() as u64;
114 return Ok(ExecutionResult {
115 output: json!(null),
116 state_patch: json!({
117 "coordinator_events": [discovery_event, decision_event]
118 }),
119 duration_ms,
120 gen_ai_system: None,
121 gen_ai_model: None,
122 input_tokens: None,
123 output_tokens: None,
124 finish_reason: None,
125 });
126 }
127
128 if discover_resp.candidates.len() == 1 {
130 let selected = discover_resp.candidates[0].uri.clone();
131 let duration_ms = start.elapsed().as_millis() as u64;
132 return Ok(ExecutionResult {
133 output: json!({ output_key: selected.clone() }),
134 state_patch: json!({
135 "coordinator_events": [
136 discovery_event,
137 {
138 "type": "coordinator_scoring",
139 "node_id": &item.node_id,
140 "rankings": [{"uri": &selected, "composite": 1.0}],
141 "spread": 1.0,
142 "weights": &weights
143 },
144 {
145 "type": "coordinator_decision",
146 "node_id": &item.node_id,
147 "selected": &selected,
148 "method": "single_candidate",
149 "confidence": 1.0,
150 "rejected": []
151 },
152 ]
153 }),
154 duration_ms,
155 gen_ai_system: None,
156 gen_ai_model: None,
157 input_tokens: None,
158 output_tokens: None,
159 finish_reason: None,
160 });
161 }
162
163 let score_resp = self
165 .bridge
166 .score(ScoreRequest {
167 task: task.clone(),
168 candidates: discover_resp.candidates,
169 weights: weights.clone(),
170 context: context.clone(),
171 })
172 .await
173 .map_err(|e| format!("Scoring failed: {e}"))?;
174
175 if score_resp.rankings.is_empty() {
177 let decision_event = json!({
178 "type": "coordinator_decision",
179 "node_id": &item.node_id,
180 "selected": null,
181 "method": "no_candidates",
182 "confidence": 0.0,
183 "rejected": [],
184 });
185 let duration_ms = start.elapsed().as_millis() as u64;
186 return Ok(ExecutionResult {
187 output: json!(null),
188 state_patch: json!({
189 "coordinator_events": [
190 json!({
191 "type": "coordinator_scoring",
192 "node_id": &item.node_id,
193 "rankings": [],
194 "spread": score_resp.spread,
195 "weights": &weights,
196 }),
197 decision_event,
198 ]
199 }),
200 duration_ms,
201 gen_ai_system: None,
202 gen_ai_model: None,
203 input_tokens: None,
204 output_tokens: None,
205 finish_reason: None,
206 });
207 }
208
209 let scoring_event = json!({
210 "type": "coordinator_scoring",
211 "node_id": item.node_id,
212 "rankings": score_resp.rankings,
213 "spread": score_resp.spread,
214 "weights": weights,
215 });
216
217 let used_tiebreaker_model: Option<String>;
219 let decide_result: Value = if score_resp.spread > threshold {
220 let top = &score_resp.rankings[0];
222 let rejected: Vec<Value> = score_resp.rankings[1..]
223 .iter()
224 .map(|r| json!({"uri": &r.uri, "reason": "lower score"}))
225 .collect();
226 used_tiebreaker_model = None;
227 json!({
228 "type": "coordinator_decision",
229 "node_id": item.node_id,
230 "selected": top.uri,
231 "method": "structured",
232 "confidence": top.composite,
233 "rejected": rejected,
234 })
235 } else {
236 let max_candidates = p
238 .get("tiebreaker")
239 .and_then(|t| t.get("max_candidates"))
240 .and_then(|v| v.as_u64())
241 .unwrap_or(3) as usize;
242 let top_n: Vec<_> = score_resp
243 .rankings
244 .into_iter()
245 .take(max_candidates)
246 .collect();
247
248 match self
249 .bridge
250 .decide(DecideRequest {
251 task: task.clone(),
252 top_candidates: top_n.clone(),
253 threshold,
254 tiebreaker_model: tiebreaker_model.clone(),
255 context: context.clone(),
256 })
257 .await
258 {
259 Ok(r) => {
260 let is_llm = r.method == "llm_tiebreaker";
261 used_tiebreaker_model = if is_llm {
262 Some(tiebreaker_model.clone())
263 } else {
264 None
265 };
266 json!({
267 "type": "coordinator_decision",
268 "node_id": item.node_id,
269 "selected": r.selected_uri,
270 "method": r.method,
271 "reasoning": r.reasoning,
272 "confidence": r.confidence,
273 "rejected": r.rejected,
274 "tiebreaker_tokens": r.tiebreaker_tokens,
275 "tiebreaker_cost": r.tiebreaker_cost,
276 })
277 }
278 Err(e) => {
279 warn!("LLM tiebreaker failed, falling back to top scorer: {e}");
280 used_tiebreaker_model = None;
281 if let Some(first) = top_n.first() {
282 json!({
283 "type": "coordinator_decision",
284 "node_id": item.node_id,
285 "selected": first.uri,
286 "method": "tiebreaker_failed",
287 "reasoning": format!("Tiebreaker error: {e}"),
288 "confidence": first.composite,
289 "rejected": [],
290 })
291 } else {
292 json!({
293 "type": "coordinator_decision",
294 "node_id": item.node_id,
295 "selected": null,
296 "method": "tiebreaker_failed",
297 "reasoning": format!("Tiebreaker error: {e}"),
298 "confidence": 0.0,
299 "rejected": [],
300 })
301 }
302 }
303 }
304 };
305
306 let selected_uri = decide_result
307 .get("selected")
308 .and_then(|v| v.as_str())
309 .map(String::from);
310 let duration_ms = start.elapsed().as_millis() as u64;
311
312 Ok(ExecutionResult {
313 output: match &selected_uri {
314 Some(uri) => json!({ output_key: uri }),
315 None => json!(null),
316 },
317 state_patch: json!({
318 "coordinator_events": [discovery_event, scoring_event, decide_result]
319 }),
320 duration_ms,
321 gen_ai_system: None,
322 gen_ai_model: used_tiebreaker_model,
323 input_tokens: None,
324 output_tokens: None,
325 finish_reason: None,
326 })
327 }
328}