greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow, bail};
8use greentic_flow::ir::{FlowIR, NodeIR};
9#[cfg(feature = "mcp")]
10use greentic_mcp::{ExecConfig, ExecRequest};
11#[cfg(feature = "mcp")]
12use greentic_types::TenantCtx as TypesTenantCtx;
13use handlebars::Handlebars;
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use serde_json::{Map as JsonMap, Value, json};
17use tokio::task;
18
19use super::mocks::MockLayer;
20use crate::config::{HostConfig, McpRetryConfig};
21use crate::pack::{FlowDescriptor, PackRuntime};
22#[cfg(feature = "mcp")]
23use crate::telemetry::tenant_context;
24use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
25
26pub struct FlowEngine {
27    packs: Vec<Arc<PackRuntime>>,
28    flows: Vec<FlowDescriptor>,
29    flow_sources: HashMap<String, usize>,
30    flow_ir: RwLock<HashMap<String, FlowIR>>,
31    #[cfg(feature = "mcp")]
32    exec_config: ExecConfig,
33    template_engine: Arc<Handlebars<'static>>,
34    default_env: String,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct FlowSnapshot {
39    pub flow_id: String,
40    pub next_node: String,
41    pub state: ExecutionState,
42}
43
44#[derive(Clone, Debug)]
45pub struct FlowWait {
46    pub reason: Option<String>,
47    pub snapshot: FlowSnapshot,
48}
49
50#[derive(Clone, Debug)]
51pub enum FlowStatus {
52    Completed,
53    Waiting(FlowWait),
54}
55
56#[derive(Clone, Debug)]
57pub struct FlowExecution {
58    pub output: Value,
59    pub status: FlowStatus,
60}
61
62impl FlowExecution {
63    fn completed(output: Value) -> Self {
64        Self {
65            output,
66            status: FlowStatus::Completed,
67        }
68    }
69
70    fn waiting(output: Value, wait: FlowWait) -> Self {
71        Self {
72            output,
73            status: FlowStatus::Waiting(wait),
74        }
75    }
76}
77
78impl FlowEngine {
79    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
80        #[cfg(not(feature = "mcp"))]
81        let _ = &config;
82        let mut flow_sources = HashMap::new();
83        let mut descriptors = Vec::new();
84        for (idx, pack) in packs.iter().enumerate() {
85            let flows = pack.list_flows().await?;
86            for flow in flows {
87                tracing::info!(
88                    flow_id = %flow.id,
89                    flow_type = %flow.flow_type,
90                    pack_index = idx,
91                    "registered flow"
92                );
93                flow_sources.insert(flow.id.clone(), idx);
94                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
95                descriptors.push(flow);
96            }
97        }
98
99        #[cfg(feature = "mcp")]
100        let exec_config = config
101            .mcp_exec_config()
102            .context("failed to build MCP executor config")?;
103
104        let mut ir_map = HashMap::new();
105        for flow in &descriptors {
106            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
107                let pack_clone = Arc::clone(&packs[pack_idx]);
108                let flow_id = flow.id.clone();
109                let task_flow_id = flow_id.clone();
110                match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
111                    Ok(Ok(ir)) => {
112                        ir_map.insert(flow_id, ir);
113                    }
114                    Ok(Err(err)) => {
115                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
116                    }
117                    Err(err) => {
118                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
119                    }
120                }
121            }
122        }
123
124        let mut handlebars = Handlebars::new();
125        handlebars.set_strict_mode(false);
126
127        Ok(Self {
128            packs,
129            flows: descriptors,
130            flow_sources,
131            flow_ir: RwLock::new(ir_map),
132            #[cfg(feature = "mcp")]
133            exec_config,
134            template_engine: Arc::new(handlebars),
135            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
136        })
137    }
138
139    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
140        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
141            return Ok(ir);
142        }
143
144        let pack_idx = *self
145            .flow_sources
146            .get(flow_id)
147            .with_context(|| format!("flow {flow_id} not registered"))?;
148        let pack = Arc::clone(&self.packs[pack_idx]);
149        let flow_id_owned = flow_id.to_string();
150        let task_flow_id = flow_id_owned.clone();
151        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
152            .await
153            .context("failed to join flow metadata task")??;
154        self.flow_ir
155            .write()
156            .insert(flow_id_owned.clone(), ir.clone());
157        Ok(ir)
158    }
159
160    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
161        let span = tracing::info_span!(
162            "flow.execute",
163            tenant = tracing::field::Empty,
164            flow_id = tracing::field::Empty,
165            node_id = tracing::field::Empty,
166            tool = tracing::field::Empty,
167            action = tracing::field::Empty
168        );
169        annotate_span(
170            &span,
171            &FlowSpanAttributes {
172                tenant: ctx.tenant,
173                flow_id: ctx.flow_id,
174                node_id: ctx.node_id,
175                tool: ctx.tool,
176                action: ctx.action,
177            },
178        );
179        set_flow_context(
180            &self.default_env,
181            ctx.tenant,
182            ctx.flow_id,
183            ctx.node_id,
184            ctx.provider_id,
185            ctx.session_id,
186        );
187        let retry_config = ctx.retry_config;
188        let original_input = input;
189        async move {
190            let mut attempt = 0u32;
191            loop {
192                attempt += 1;
193                match self.execute_once(&ctx, original_input.clone()).await {
194                    Ok(value) => return Ok(value),
195                    Err(err) => {
196                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
197                            return Err(err);
198                        }
199                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
200                        tracing::warn!(
201                            tenant = ctx.tenant,
202                            flow_id = ctx.flow_id,
203                            attempt,
204                            max_attempts = retry_config.max_attempts,
205                            delay_ms = delay,
206                            error = %err,
207                            "transient flow execution failure, backing off"
208                        );
209                        tokio::time::sleep(Duration::from_millis(delay)).await;
210                    }
211                }
212            }
213        }
214        .instrument(span)
215        .await
216    }
217
218    pub async fn resume(
219        &self,
220        ctx: FlowContext<'_>,
221        snapshot: FlowSnapshot,
222        input: Value,
223    ) -> Result<FlowExecution> {
224        if snapshot.flow_id != ctx.flow_id {
225            bail!(
226                "snapshot flow {} does not match requested {}",
227                snapshot.flow_id,
228                ctx.flow_id
229            );
230        }
231        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
232        let mut state = snapshot.state;
233        state.replace_input(input);
234        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
235            .await
236    }
237
238    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
239        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
240        let state = ExecutionState::new(input);
241        self.drive_flow(ctx, flow_ir, state, None).await
242    }
243
244    async fn drive_flow(
245        &self,
246        ctx: &FlowContext<'_>,
247        flow_ir: FlowIR,
248        mut state: ExecutionState,
249        resume_from: Option<String>,
250    ) -> Result<FlowExecution> {
251        let mut current = flow_ir
252            .start
253            .clone()
254            .or_else(|| flow_ir.nodes.keys().next().cloned())
255            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
256        if let Some(resume) = resume_from {
257            current = resume;
258        }
259        let mut final_payload = None;
260
261        loop {
262            let node = flow_ir
263                .nodes
264                .get(&current)
265                .with_context(|| format!("node {current} not found"))?;
266
267            let context_value = state.context();
268            let payload = resolve_template_value(
269                self.template_engine.as_ref(),
270                &node.payload_expr,
271                &context_value,
272            )?;
273            let observed_payload = payload.clone();
274            let node_id = current.clone();
275            let event = NodeEvent {
276                context: ctx,
277                node_id: &node_id,
278                node,
279                payload: &observed_payload,
280            };
281            if let Some(observer) = ctx.observer {
282                observer.on_node_start(&event);
283            }
284            let DispatchOutcome {
285                output,
286                wait_reason,
287            } = self
288                .dispatch_node(ctx, &current, node, &mut state, payload)
289                .await?;
290
291            state.nodes.insert(current.clone(), output.clone());
292
293            let mut next = None;
294            let mut should_exit = false;
295            for route in &node.routes {
296                if route.out || matches!(route.to.as_deref(), Some("out")) {
297                    final_payload = Some(output.payload.clone());
298                    should_exit = true;
299                    break;
300                }
301                if let Some(to) = &route.to {
302                    next = Some(to.clone());
303                    break;
304                }
305            }
306
307            if let Some(wait_reason) = wait_reason {
308                let resume_target = next.clone().ok_or_else(|| {
309                    anyhow!("session.wait node {current} requires a non-empty route")
310                })?;
311                let mut snapshot_state = state.clone();
312                snapshot_state.clear_egress();
313                let snapshot = FlowSnapshot {
314                    flow_id: ctx.flow_id.to_string(),
315                    next_node: resume_target,
316                    state: snapshot_state,
317                };
318                let output_value = state.clone().finalize_with(None);
319                return Ok(FlowExecution::waiting(
320                    output_value,
321                    FlowWait {
322                        reason: Some(wait_reason),
323                        snapshot,
324                    },
325                ));
326            }
327
328            if should_exit {
329                break;
330            }
331
332            match next {
333                Some(n) => current = n,
334                None => {
335                    final_payload = Some(output.payload.clone());
336                    break;
337                }
338            }
339        }
340
341        let payload = final_payload.unwrap_or(Value::Null);
342        Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
343    }
344
345    async fn dispatch_node(
346        &self,
347        ctx: &FlowContext<'_>,
348        _node_id: &str,
349        node: &NodeIR,
350        state: &mut ExecutionState,
351        payload: Value,
352    ) -> Result<DispatchOutcome> {
353        match node.component.as_str() {
354            "qa.process" => Ok(DispatchOutcome::complete(NodeOutput::new(payload))),
355            "mcp.exec" => self
356                .execute_mcp(ctx, payload)
357                .await
358                .map(DispatchOutcome::complete),
359            "templating.handlebars" => self
360                .execute_template(state, payload)
361                .map(DispatchOutcome::complete),
362            "flow.call" => self
363                .execute_flow_call(ctx, payload)
364                .await
365                .map(DispatchOutcome::complete),
366            component if component.starts_with("emit") => {
367                state.push_egress(payload.clone());
368                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
369            }
370            "session.wait" => {
371                let reason = extract_wait_reason(&payload);
372                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
373            }
374            other => bail!("unsupported node component: {other}"),
375        }
376    }
377
378    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
379        #[derive(Deserialize)]
380        struct FlowCallPayload {
381            #[serde(alias = "flow")]
382            flow_id: String,
383            #[serde(default)]
384            input: Value,
385        }
386
387        let call: FlowCallPayload =
388            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
389        if call.flow_id.trim().is_empty() {
390            bail!("flow.call requires a non-empty flow_id");
391        }
392
393        let sub_input = if call.input.is_null() {
394            Value::Null
395        } else {
396            call.input
397        };
398
399        let flow_id_owned = call.flow_id;
400        let action = "flow.call";
401        let sub_ctx = FlowContext {
402            tenant: ctx.tenant,
403            flow_id: flow_id_owned.as_str(),
404            node_id: None,
405            tool: ctx.tool,
406            action: Some(action),
407            session_id: ctx.session_id,
408            provider_id: ctx.provider_id,
409            retry_config: ctx.retry_config,
410            observer: ctx.observer,
411            mocks: ctx.mocks,
412        };
413
414        let execution = Box::pin(self.execute(sub_ctx, sub_input))
415            .await
416            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
417        match execution.status {
418            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
419            FlowStatus::Waiting(wait) => bail!(
420                "flow.call cannot pause (flow {} waiting {:?})",
421                flow_id_owned,
422                wait.reason
423            ),
424        }
425    }
426
427    async fn execute_mcp(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
428        #[cfg(not(feature = "mcp"))]
429        {
430            let _ = (ctx, payload);
431            bail!("crate built without `mcp` feature; mcp.exec nodes are unavailable");
432        }
433
434        #[cfg(feature = "mcp")]
435        {
436            #[derive(Deserialize)]
437            struct McpPayload {
438                component: String,
439                action: String,
440                #[serde(default)]
441                args: Value,
442            }
443
444            let payload: McpPayload =
445                serde_json::from_value(payload).context("invalid payload for mcp.exec node")?;
446
447            if let Some(mocks) = ctx.mocks
448                && let Some(result) = mocks.tool_short_circuit(&payload.component, &payload.action)
449            {
450                let value = result.map_err(|err| anyhow!(err))?;
451                return Ok(NodeOutput::new(value));
452            }
453
454            let request = ExecRequest {
455                component: payload.component,
456                action: payload.action,
457                args: payload.args,
458                tenant: Some(types_tenant_ctx(ctx, &self.default_env)),
459            };
460
461            let exec_config = self.exec_config.clone();
462            let exec_result =
463                task::spawn_blocking(move || greentic_mcp::exec(request, &exec_config))
464                    .await
465                    .context("failed to join mcp.exec")?;
466            let value = exec_result.map_err(|err| anyhow!(err))?;
467
468            Ok(NodeOutput::new(value))
469        }
470    }
471
472    fn execute_template(&self, state: &ExecutionState, payload: Value) -> Result<NodeOutput> {
473        let payload: TemplatePayload = serde_json::from_value(payload)
474            .context("invalid payload for templating.handlebars node")?;
475
476        let mut context = state.context();
477        if !payload.data.is_null() {
478            let data =
479                resolve_template_value(self.template_engine.as_ref(), &payload.data, &context)?;
480            merge_values(&mut context, data);
481        }
482
483        let rendered = render_template(
484            self.template_engine.as_ref(),
485            &payload.template,
486            &payload.partials,
487            &context,
488        )?;
489
490        Ok(NodeOutput::new(json!({ "text": rendered })))
491    }
492
493    pub fn flows(&self) -> &[FlowDescriptor] {
494        &self.flows
495    }
496
497    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
498        self.flows
499            .iter()
500            .find(|descriptor| descriptor.flow_type == flow_type)
501    }
502
503    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
504        self.flows
505            .iter()
506            .find(|descriptor| descriptor.id == flow_id)
507    }
508}
509
510pub trait ExecutionObserver: Send + Sync {
511    fn on_node_start(&self, event: &NodeEvent<'_>);
512    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
513    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
514}
515
516pub struct NodeEvent<'a> {
517    pub context: &'a FlowContext<'a>,
518    pub node_id: &'a str,
519    pub node: &'a NodeIR,
520    pub payload: &'a Value,
521}
522
523#[derive(Clone, Debug, Serialize, Deserialize)]
524pub struct ExecutionState {
525    input: Value,
526    nodes: HashMap<String, NodeOutput>,
527    egress: Vec<Value>,
528}
529
530impl ExecutionState {
531    fn new(input: Value) -> Self {
532        Self {
533            input,
534            nodes: HashMap::new(),
535            egress: Vec::new(),
536        }
537    }
538
539    fn context(&self) -> Value {
540        let mut nodes = JsonMap::new();
541        for (id, output) in &self.nodes {
542            nodes.insert(
543                id.clone(),
544                json!({
545                    "ok": output.ok,
546                    "payload": output.payload.clone(),
547                    "meta": output.meta.clone(),
548                }),
549            );
550        }
551        json!({
552            "input": self.input.clone(),
553            "nodes": nodes,
554        })
555    }
556    fn push_egress(&mut self, payload: Value) {
557        self.egress.push(payload);
558    }
559
560    fn replace_input(&mut self, input: Value) {
561        self.input = input;
562    }
563
564    fn clear_egress(&mut self) {
565        self.egress.clear();
566    }
567
568    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
569        if self.egress.is_empty() {
570            return final_payload.unwrap_or(Value::Null);
571        }
572        let mut emitted = std::mem::take(&mut self.egress);
573        if let Some(value) = final_payload {
574            match value {
575                Value::Null => {}
576                Value::Array(items) => emitted.extend(items),
577                other => emitted.push(other),
578            }
579        }
580        Value::Array(emitted)
581    }
582}
583
584#[derive(Clone, Debug, Serialize, Deserialize)]
585struct NodeOutput {
586    ok: bool,
587    payload: Value,
588    meta: Value,
589}
590
591impl NodeOutput {
592    fn new(payload: Value) -> Self {
593        Self {
594            ok: true,
595            payload,
596            meta: Value::Null,
597        }
598    }
599}
600
601struct DispatchOutcome {
602    output: NodeOutput,
603    wait_reason: Option<String>,
604}
605
606impl DispatchOutcome {
607    fn complete(output: NodeOutput) -> Self {
608        Self {
609            output,
610            wait_reason: None,
611        }
612    }
613
614    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
615        Self {
616            output,
617            wait_reason: reason,
618        }
619    }
620}
621
622#[derive(Deserialize)]
623struct TemplatePayload {
624    template: String,
625    #[serde(default)]
626    partials: HashMap<String, String>,
627    #[serde(default)]
628    data: Value,
629}
630
631fn resolve_template_value(
632    engine: &Handlebars<'static>,
633    value: &Value,
634    context: &Value,
635) -> Result<Value> {
636    match value {
637        Value::String(s) => {
638            if s.contains("{{") {
639                let rendered = engine
640                    .render_template(s, context)
641                    .with_context(|| format!("failed to render template: {s}"))?;
642                Ok(Value::String(rendered))
643            } else {
644                Ok(Value::String(s.clone()))
645            }
646        }
647        Value::Array(items) => {
648            let values = items
649                .iter()
650                .map(|v| resolve_template_value(engine, v, context))
651                .collect::<Result<Vec<_>>>()?;
652            Ok(Value::Array(values))
653        }
654        Value::Object(map) => {
655            let mut resolved = JsonMap::new();
656            for (key, v) in map {
657                resolved.insert(key.clone(), resolve_template_value(engine, v, context)?);
658            }
659            Ok(Value::Object(resolved))
660        }
661        other => Ok(other.clone()),
662    }
663}
664
665fn merge_values(target: &mut Value, addition: Value) {
666    match (target, addition) {
667        (Value::Object(target_map), Value::Object(add_map)) => {
668            for (key, value) in add_map {
669                merge_values(target_map.entry(key).or_insert(Value::Null), value);
670            }
671        }
672        (slot, value) => {
673            *slot = value;
674        }
675    }
676}
677
678fn render_template(
679    base: &Handlebars<'static>,
680    template: &str,
681    partials: &HashMap<String, String>,
682    context: &Value,
683) -> Result<String> {
684    let mut engine = base.clone();
685    for (name, body) in partials {
686        engine
687            .register_template_string(name, body)
688            .with_context(|| format!("failed to register partial {name}"))?;
689    }
690    engine
691        .render_template(template, context)
692        .with_context(|| "failed to render template")
693}
694
695fn extract_wait_reason(payload: &Value) -> Option<String> {
696    match payload {
697        Value::String(s) => Some(s.clone()),
698        Value::Object(map) => map
699            .get("reason")
700            .and_then(Value::as_str)
701            .map(|value| value.to_string()),
702        _ => None,
703    }
704}
705
706#[cfg(feature = "mcp")]
707fn types_tenant_ctx(ctx: &FlowContext<'_>, default_env: &str) -> TypesTenantCtx {
708    tenant_context(
709        default_env,
710        ctx.tenant,
711        Some(ctx.flow_id),
712        ctx.node_id,
713        ctx.provider_id,
714        ctx.session_id,
715    )
716}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use serde_json::json;
722
723    #[test]
724    fn templating_renders_with_partials_and_data() {
725        let mut state = ExecutionState::new(json!({ "city": "London" }));
726        state.nodes.insert(
727            "forecast".to_string(),
728            NodeOutput::new(json!({ "temp": "20C" })),
729        );
730
731        let mut partials = HashMap::new();
732        partials.insert(
733            "line".to_string(),
734            "Weather in {{input.city}}: {{nodes.forecast.payload.temp}} {{extra.note}}".to_string(),
735        );
736
737        let payload = TemplatePayload {
738            template: "{{> line}}".to_string(),
739            partials,
740            data: json!({ "extra": { "note": "today" } }),
741        };
742
743        let mut base = Handlebars::new();
744        base.set_strict_mode(false);
745
746        let mut context = state.context();
747        let data = resolve_template_value(&base, &payload.data, &context).unwrap();
748        merge_values(&mut context, data);
749        let rendered =
750            render_template(&base, &payload.template, &payload.partials, &context).unwrap();
751
752        assert_eq!(rendered, "Weather in London: 20C today");
753    }
754
755    #[test]
756    fn finalize_wraps_emitted_payloads() {
757        let mut state = ExecutionState::new(json!({}));
758        state.push_egress(json!({ "text": "first" }));
759        state.push_egress(json!({ "text": "second" }));
760        let result = state.finalize_with(Some(json!({ "text": "final" })));
761        assert_eq!(
762            result,
763            json!([
764                { "text": "first" },
765                { "text": "second" },
766                { "text": "final" }
767            ])
768        );
769    }
770
771    #[test]
772    fn finalize_flattens_final_array() {
773        let mut state = ExecutionState::new(json!({}));
774        state.push_egress(json!({ "text": "only" }));
775        let result = state.finalize_with(Some(json!([
776            { "text": "extra-1" },
777            { "text": "extra-2" }
778        ])));
779        assert_eq!(
780            result,
781            json!([
782                { "text": "only" },
783                { "text": "extra-1" },
784                { "text": "extra-2" }
785            ])
786        );
787    }
788}
789
790use tracing::Instrument;
791
792pub struct FlowContext<'a> {
793    pub tenant: &'a str,
794    pub flow_id: &'a str,
795    pub node_id: Option<&'a str>,
796    pub tool: Option<&'a str>,
797    pub action: Option<&'a str>,
798    pub session_id: Option<&'a str>,
799    pub provider_id: Option<&'a str>,
800    pub retry_config: RetryConfig,
801    pub observer: Option<&'a dyn ExecutionObserver>,
802    pub mocks: Option<&'a MockLayer>,
803}
804
805#[derive(Copy, Clone)]
806pub struct RetryConfig {
807    pub max_attempts: u32,
808    pub base_delay_ms: u64,
809}
810
811fn should_retry(err: &anyhow::Error) -> bool {
812    let lower = err.to_string().to_lowercase();
813    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
814}
815
816impl From<McpRetryConfig> for RetryConfig {
817    fn from(value: McpRetryConfig) -> Self {
818        Self {
819            max_attempts: value.max_attempts.max(1),
820            base_delay_ms: value.base_delay_ms.max(50),
821        }
822    }
823}