Skip to main content

jamjet_worker/executors/
coordinator.rs

1//! Executor for `Coordinator` workflow nodes.
2//!
3//! Runs the three-phase coordinator pipeline: discovery → scoring → decision,
4//! delegating each phase to the Python strategy bridge via REST.
5
6use crate::executor::{ExecutionResult, NodeExecutor};
7use async_trait::async_trait;
8use jamjet_scheduler::strategy_bridge::{
9    DecideRequest, DiscoverRequest, ScoreRequest, StrategyBridge,
10};
11use jamjet_state::backend::WorkItem;
12use serde_json::{json, Value};
13use tracing::{debug, instrument, warn};
14
15pub struct CoordinatorExecutor {
16    bridge: StrategyBridge,
17}
18
19impl CoordinatorExecutor {
20    pub fn new(strategy_bridge_url: String) -> Self {
21        Self {
22            bridge: StrategyBridge::new(strategy_bridge_url),
23        }
24    }
25}
26
27#[async_trait]
28impl NodeExecutor for CoordinatorExecutor {
29    #[instrument(skip(self, item), fields(node_id = %item.node_id))]
30    async fn execute(&self, item: &WorkItem) -> Result<ExecutionResult, String> {
31        let start = std::time::Instant::now();
32        let p = &item.payload;
33
34        // Extract config from payload
35        let task = p
36            .get("task")
37            .and_then(|v| v.as_str())
38            .unwrap_or("")
39            .to_string();
40        let required_skills: Vec<String> = p
41            .get("required_skills")
42            .and_then(|v| serde_json::from_value(v.clone()).ok())
43            .unwrap_or_default();
44        let preferred_skills: Vec<String> = p
45            .get("preferred_skills")
46            .and_then(|v| serde_json::from_value(v.clone()).ok())
47            .unwrap_or_default();
48        let trust_domain = p
49            .get("trust_domain")
50            .and_then(|v| v.as_str())
51            .map(String::from);
52        let strategy_name = p
53            .get("strategy")
54            .and_then(|v| v.as_str())
55            .unwrap_or("default")
56            .to_string();
57        let output_key = p
58            .get("output_key")
59            .and_then(|v| v.as_str())
60            .unwrap_or("result")
61            .to_string();
62        let threshold = p
63            .get("tiebreaker")
64            .and_then(|t| t.get("threshold"))
65            .and_then(|v| v.as_f64())
66            .unwrap_or(0.1);
67        let tiebreaker_model = p
68            .get("tiebreaker")
69            .and_then(|t| t.get("model"))
70            .and_then(|v| v.as_str())
71            .unwrap_or("claude-sonnet-4-6")
72            .to_string();
73        let weights = p.get("weights").cloned().unwrap_or(json!({}));
74        let context = p.get("input").cloned().unwrap_or(json!({}));
75
76        debug!(task = %task, strategy = %strategy_name, "Coordinator: starting discovery");
77
78        // Phase 1: Discovery
79        let discover_resp = self
80            .bridge
81            .discover(DiscoverRequest {
82                task: task.clone(),
83                required_skills: required_skills.clone(),
84                preferred_skills: preferred_skills.clone(),
85                trust_domain: trust_domain.clone(),
86                strategy_name: strategy_name.clone(),
87                context: context.clone(),
88            })
89            .await
90            .map_err(|e| format!("Discovery failed: {e}"))?;
91
92        let discovery_event = json!({
93            "type": "coordinator_discovery",
94            "node_id": item.node_id,
95            "query_skills": required_skills,
96            "query_trust_domain": trust_domain,
97            "candidates": discover_resp.candidates.iter()
98                .map(|c| json!({"uri": &c.uri, "skills": &c.skills}))
99                .collect::<Vec<_>>(),
100            "filtered_out": discover_resp.filtered_out,
101        });
102
103        // Handle zero candidates
104        if discover_resp.candidates.is_empty() {
105            let decision_event = json!({
106                "type": "coordinator_decision",
107                "node_id": item.node_id,
108                "selected": null,
109                "method": "no_candidates",
110                "confidence": 0.0,
111                "rejected": [],
112            });
113            let duration_ms = start.elapsed().as_millis() as u64;
114            return Ok(ExecutionResult {
115                output: json!(null),
116                state_patch: json!({
117                    "coordinator_events": [discovery_event, decision_event]
118                }),
119                duration_ms,
120                gen_ai_system: None,
121                gen_ai_model: None,
122                input_tokens: None,
123                output_tokens: None,
124                finish_reason: None,
125            });
126        }
127
128        // Handle single candidate — skip scoring
129        if discover_resp.candidates.len() == 1 {
130            let selected = discover_resp.candidates[0].uri.clone();
131            let duration_ms = start.elapsed().as_millis() as u64;
132            return Ok(ExecutionResult {
133                output: json!({ output_key: selected.clone() }),
134                state_patch: json!({
135                    "coordinator_events": [
136                        discovery_event,
137                        {
138                            "type": "coordinator_scoring",
139                            "node_id": &item.node_id,
140                            "rankings": [{"uri": &selected, "composite": 1.0}],
141                            "spread": 1.0,
142                            "weights": &weights
143                        },
144                        {
145                            "type": "coordinator_decision",
146                            "node_id": &item.node_id,
147                            "selected": &selected,
148                            "method": "single_candidate",
149                            "confidence": 1.0,
150                            "rejected": []
151                        },
152                    ]
153                }),
154                duration_ms,
155                gen_ai_system: None,
156                gen_ai_model: None,
157                input_tokens: None,
158                output_tokens: None,
159                finish_reason: None,
160            });
161        }
162
163        // Phase 2: Scoring
164        let score_resp = self
165            .bridge
166            .score(ScoreRequest {
167                task: task.clone(),
168                candidates: discover_resp.candidates,
169                weights: weights.clone(),
170                context: context.clone(),
171            })
172            .await
173            .map_err(|e| format!("Scoring failed: {e}"))?;
174
175        // Guard: empty rankings means scoring produced nothing
176        if score_resp.rankings.is_empty() {
177            let decision_event = json!({
178                "type": "coordinator_decision",
179                "node_id": &item.node_id,
180                "selected": null,
181                "method": "no_candidates",
182                "confidence": 0.0,
183                "rejected": [],
184            });
185            let duration_ms = start.elapsed().as_millis() as u64;
186            return Ok(ExecutionResult {
187                output: json!(null),
188                state_patch: json!({
189                    "coordinator_events": [
190                        json!({
191                            "type": "coordinator_scoring",
192                            "node_id": &item.node_id,
193                            "rankings": [],
194                            "spread": score_resp.spread,
195                            "weights": &weights,
196                        }),
197                        decision_event,
198                    ]
199                }),
200                duration_ms,
201                gen_ai_system: None,
202                gen_ai_model: None,
203                input_tokens: None,
204                output_tokens: None,
205                finish_reason: None,
206            });
207        }
208
209        let scoring_event = json!({
210            "type": "coordinator_scoring",
211            "node_id": item.node_id,
212            "rankings": score_resp.rankings,
213            "spread": score_resp.spread,
214            "weights": weights,
215        });
216
217        // Phase 3: Decision
218        let used_tiebreaker_model: Option<String>;
219        let decide_result: Value = if score_resp.spread > threshold {
220            // Fast path: structured scoring wins
221            let top = &score_resp.rankings[0];
222            let rejected: Vec<Value> = score_resp.rankings[1..]
223                .iter()
224                .map(|r| json!({"uri": &r.uri, "reason": "lower score"}))
225                .collect();
226            used_tiebreaker_model = None;
227            json!({
228                "type": "coordinator_decision",
229                "node_id": item.node_id,
230                "selected": top.uri,
231                "method": "structured",
232                "confidence": top.composite,
233                "rejected": rejected,
234            })
235        } else {
236            // LLM tiebreaker
237            let max_candidates = p
238                .get("tiebreaker")
239                .and_then(|t| t.get("max_candidates"))
240                .and_then(|v| v.as_u64())
241                .unwrap_or(3) as usize;
242            let top_n: Vec<_> = score_resp
243                .rankings
244                .into_iter()
245                .take(max_candidates)
246                .collect();
247
248            match self
249                .bridge
250                .decide(DecideRequest {
251                    task: task.clone(),
252                    top_candidates: top_n.clone(),
253                    threshold,
254                    tiebreaker_model: tiebreaker_model.clone(),
255                    context: context.clone(),
256                })
257                .await
258            {
259                Ok(r) => {
260                    let is_llm = r.method == "llm_tiebreaker";
261                    used_tiebreaker_model = if is_llm {
262                        Some(tiebreaker_model.clone())
263                    } else {
264                        None
265                    };
266                    json!({
267                        "type": "coordinator_decision",
268                        "node_id": item.node_id,
269                        "selected": r.selected_uri,
270                        "method": r.method,
271                        "reasoning": r.reasoning,
272                        "confidence": r.confidence,
273                        "rejected": r.rejected,
274                        "tiebreaker_tokens": r.tiebreaker_tokens,
275                        "tiebreaker_cost": r.tiebreaker_cost,
276                    })
277                }
278                Err(e) => {
279                    warn!("LLM tiebreaker failed, falling back to top scorer: {e}");
280                    used_tiebreaker_model = None;
281                    if let Some(first) = top_n.first() {
282                        json!({
283                            "type": "coordinator_decision",
284                            "node_id": item.node_id,
285                            "selected": first.uri,
286                            "method": "tiebreaker_failed",
287                            "reasoning": format!("Tiebreaker error: {e}"),
288                            "confidence": first.composite,
289                            "rejected": [],
290                        })
291                    } else {
292                        json!({
293                            "type": "coordinator_decision",
294                            "node_id": item.node_id,
295                            "selected": null,
296                            "method": "tiebreaker_failed",
297                            "reasoning": format!("Tiebreaker error: {e}"),
298                            "confidence": 0.0,
299                            "rejected": [],
300                        })
301                    }
302                }
303            }
304        };
305
306        let selected_uri = decide_result
307            .get("selected")
308            .and_then(|v| v.as_str())
309            .map(String::from);
310        let duration_ms = start.elapsed().as_millis() as u64;
311
312        Ok(ExecutionResult {
313            output: match &selected_uri {
314                Some(uri) => json!({ output_key: uri }),
315                None => json!(null),
316            },
317            state_patch: json!({
318                "coordinator_events": [discovery_event, scoring_event, decide_result]
319            }),
320            duration_ms,
321            gen_ai_system: None,
322            gen_ai_model: used_tiebreaker_model,
323            input_tokens: None,
324            output_tokens: None,
325            finish_reason: None,
326        })
327    }
328}