1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow, bail};
8use greentic_flow::ir::{FlowIR, NodeIR};
9#[cfg(feature = "mcp")]
10use greentic_mcp::{ExecConfig, ExecRequest};
11#[cfg(feature = "mcp")]
12use greentic_types::TenantCtx as TypesTenantCtx;
13use handlebars::Handlebars;
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use serde_json::{Map as JsonMap, Value, json};
17use tokio::task;
18
19use super::mocks::MockLayer;
20use crate::config::{HostConfig, McpRetryConfig};
21use crate::pack::{FlowDescriptor, PackRuntime};
22#[cfg(feature = "mcp")]
23use crate::telemetry::tenant_context;
24use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
25
/// Engine that owns the loaded packs and executes the flows they expose.
pub struct FlowEngine {
    /// Pack runtimes supplied at construction; indexed by the values of `flow_sources`.
    packs: Vec<Arc<PackRuntime>>,
    /// Descriptors of the registered flows, at most one entry per flow id.
    flows: Vec<FlowDescriptor>,
    /// Maps a flow id to the index in `packs` of the pack that provides it.
    flow_sources: HashMap<String, usize>,
    /// Cache of flow IR keyed by flow id; filled at construction and lazily on demand.
    flow_ir: RwLock<HashMap<String, FlowIR>>,
    /// MCP executor configuration; only present when the `mcp` feature is enabled.
    #[cfg(feature = "mcp")]
    exec_config: ExecConfig,
    /// Shared Handlebars registry used for payload expressions and template nodes.
    template_engine: Arc<Handlebars<'static>>,
    /// Environment name read from `GREENTIC_ENV`, defaulting to `"local"`.
    default_env: String,
}
36
/// Serializable capture of a paused flow, sufficient to resume it later.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Id of the flow that was paused; checked against the context on resume.
    pub flow_id: String,
    /// Id of the node at which execution continues when the flow is resumed.
    pub next_node: String,
    /// Execution state recorded at the pause (its egress buffer is cleared first).
    pub state: ExecutionState,
}
43
/// Details of a flow that stopped at a waiting node.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Reason reported by the node that paused the flow, when one is available.
    pub reason: Option<String>,
    /// Snapshot to hand back to `FlowEngine::resume` to continue the flow.
    pub snapshot: FlowSnapshot,
}
49
/// Terminal status of a single `execute`/`resume` call.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow ran until it exited or ran out of routes.
    Completed,
    /// The flow paused and can be continued from the carried snapshot.
    Waiting(FlowWait),
}
55
/// Result of driving a flow: the produced output plus how the run ended.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Value produced by `ExecutionState::finalize_with` for this run.
    pub output: Value,
    /// Whether the flow completed or is waiting to be resumed.
    pub status: FlowStatus,
}
61
62impl FlowExecution {
63 fn completed(output: Value) -> Self {
64 Self {
65 output,
66 status: FlowStatus::Completed,
67 }
68 }
69
70 fn waiting(output: Value, wait: FlowWait) -> Self {
71 Self {
72 output,
73 status: FlowStatus::Waiting(wait),
74 }
75 }
76}
77
78impl FlowEngine {
79 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
80 #[cfg(not(feature = "mcp"))]
81 let _ = &config;
82 let mut flow_sources = HashMap::new();
83 let mut descriptors = Vec::new();
84 for (idx, pack) in packs.iter().enumerate() {
85 let flows = pack.list_flows().await?;
86 for flow in flows {
87 tracing::info!(
88 flow_id = %flow.id,
89 flow_type = %flow.flow_type,
90 pack_index = idx,
91 "registered flow"
92 );
93 flow_sources.insert(flow.id.clone(), idx);
94 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
95 descriptors.push(flow);
96 }
97 }
98
99 #[cfg(feature = "mcp")]
100 let exec_config = config
101 .mcp_exec_config()
102 .context("failed to build MCP executor config")?;
103
104 let mut ir_map = HashMap::new();
105 for flow in &descriptors {
106 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
107 let pack_clone = Arc::clone(&packs[pack_idx]);
108 let flow_id = flow.id.clone();
109 let task_flow_id = flow_id.clone();
110 match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
111 Ok(Ok(ir)) => {
112 ir_map.insert(flow_id, ir);
113 }
114 Ok(Err(err)) => {
115 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
116 }
117 Err(err) => {
118 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
119 }
120 }
121 }
122 }
123
124 let mut handlebars = Handlebars::new();
125 handlebars.set_strict_mode(false);
126
127 Ok(Self {
128 packs,
129 flows: descriptors,
130 flow_sources,
131 flow_ir: RwLock::new(ir_map),
132 #[cfg(feature = "mcp")]
133 exec_config,
134 template_engine: Arc::new(handlebars),
135 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
136 })
137 }
138
139 async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
140 if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
141 return Ok(ir);
142 }
143
144 let pack_idx = *self
145 .flow_sources
146 .get(flow_id)
147 .with_context(|| format!("flow {flow_id} not registered"))?;
148 let pack = Arc::clone(&self.packs[pack_idx]);
149 let flow_id_owned = flow_id.to_string();
150 let task_flow_id = flow_id_owned.clone();
151 let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
152 .await
153 .context("failed to join flow metadata task")??;
154 self.flow_ir
155 .write()
156 .insert(flow_id_owned.clone(), ir.clone());
157 Ok(ir)
158 }
159
160 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
161 let span = tracing::info_span!(
162 "flow.execute",
163 tenant = tracing::field::Empty,
164 flow_id = tracing::field::Empty,
165 node_id = tracing::field::Empty,
166 tool = tracing::field::Empty,
167 action = tracing::field::Empty
168 );
169 annotate_span(
170 &span,
171 &FlowSpanAttributes {
172 tenant: ctx.tenant,
173 flow_id: ctx.flow_id,
174 node_id: ctx.node_id,
175 tool: ctx.tool,
176 action: ctx.action,
177 },
178 );
179 set_flow_context(
180 &self.default_env,
181 ctx.tenant,
182 ctx.flow_id,
183 ctx.node_id,
184 ctx.provider_id,
185 ctx.session_id,
186 );
187 let retry_config = ctx.retry_config;
188 let original_input = input;
189 async move {
190 let mut attempt = 0u32;
191 loop {
192 attempt += 1;
193 match self.execute_once(&ctx, original_input.clone()).await {
194 Ok(value) => return Ok(value),
195 Err(err) => {
196 if attempt >= retry_config.max_attempts || !should_retry(&err) {
197 return Err(err);
198 }
199 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
200 tracing::warn!(
201 tenant = ctx.tenant,
202 flow_id = ctx.flow_id,
203 attempt,
204 max_attempts = retry_config.max_attempts,
205 delay_ms = delay,
206 error = %err,
207 "transient flow execution failure, backing off"
208 );
209 tokio::time::sleep(Duration::from_millis(delay)).await;
210 }
211 }
212 }
213 }
214 .instrument(span)
215 .await
216 }
217
218 pub async fn resume(
219 &self,
220 ctx: FlowContext<'_>,
221 snapshot: FlowSnapshot,
222 input: Value,
223 ) -> Result<FlowExecution> {
224 if snapshot.flow_id != ctx.flow_id {
225 bail!(
226 "snapshot flow {} does not match requested {}",
227 snapshot.flow_id,
228 ctx.flow_id
229 );
230 }
231 let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
232 let mut state = snapshot.state;
233 state.replace_input(input);
234 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
235 .await
236 }
237
238 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
239 let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
240 let state = ExecutionState::new(input);
241 self.drive_flow(ctx, flow_ir, state, None).await
242 }
243
244 async fn drive_flow(
245 &self,
246 ctx: &FlowContext<'_>,
247 flow_ir: FlowIR,
248 mut state: ExecutionState,
249 resume_from: Option<String>,
250 ) -> Result<FlowExecution> {
251 let mut current = flow_ir
252 .start
253 .clone()
254 .or_else(|| flow_ir.nodes.keys().next().cloned())
255 .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
256 if let Some(resume) = resume_from {
257 current = resume;
258 }
259 let mut final_payload = None;
260
261 loop {
262 let node = flow_ir
263 .nodes
264 .get(¤t)
265 .with_context(|| format!("node {current} not found"))?;
266
267 let context_value = state.context();
268 let payload = resolve_template_value(
269 self.template_engine.as_ref(),
270 &node.payload_expr,
271 &context_value,
272 )?;
273 let observed_payload = payload.clone();
274 let node_id = current.clone();
275 let event = NodeEvent {
276 context: ctx,
277 node_id: &node_id,
278 node,
279 payload: &observed_payload,
280 };
281 if let Some(observer) = ctx.observer {
282 observer.on_node_start(&event);
283 }
284 let DispatchOutcome {
285 output,
286 wait_reason,
287 } = self
288 .dispatch_node(ctx, ¤t, node, &mut state, payload)
289 .await?;
290
291 state.nodes.insert(current.clone(), output.clone());
292
293 let mut next = None;
294 let mut should_exit = false;
295 for route in &node.routes {
296 if route.out || matches!(route.to.as_deref(), Some("out")) {
297 final_payload = Some(output.payload.clone());
298 should_exit = true;
299 break;
300 }
301 if let Some(to) = &route.to {
302 next = Some(to.clone());
303 break;
304 }
305 }
306
307 if let Some(wait_reason) = wait_reason {
308 let resume_target = next.clone().ok_or_else(|| {
309 anyhow!("session.wait node {current} requires a non-empty route")
310 })?;
311 let mut snapshot_state = state.clone();
312 snapshot_state.clear_egress();
313 let snapshot = FlowSnapshot {
314 flow_id: ctx.flow_id.to_string(),
315 next_node: resume_target,
316 state: snapshot_state,
317 };
318 let output_value = state.clone().finalize_with(None);
319 return Ok(FlowExecution::waiting(
320 output_value,
321 FlowWait {
322 reason: Some(wait_reason),
323 snapshot,
324 },
325 ));
326 }
327
328 if should_exit {
329 break;
330 }
331
332 match next {
333 Some(n) => current = n,
334 None => {
335 final_payload = Some(output.payload.clone());
336 break;
337 }
338 }
339 }
340
341 let payload = final_payload.unwrap_or(Value::Null);
342 Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
343 }
344
345 async fn dispatch_node(
346 &self,
347 ctx: &FlowContext<'_>,
348 _node_id: &str,
349 node: &NodeIR,
350 state: &mut ExecutionState,
351 payload: Value,
352 ) -> Result<DispatchOutcome> {
353 match node.component.as_str() {
354 "qa.process" => Ok(DispatchOutcome::complete(NodeOutput::new(payload))),
355 "mcp.exec" => self
356 .execute_mcp(ctx, payload)
357 .await
358 .map(DispatchOutcome::complete),
359 "templating.handlebars" => self
360 .execute_template(state, payload)
361 .map(DispatchOutcome::complete),
362 "flow.call" => self
363 .execute_flow_call(ctx, payload)
364 .await
365 .map(DispatchOutcome::complete),
366 component if component.starts_with("emit") => {
367 state.push_egress(payload.clone());
368 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
369 }
370 "session.wait" => {
371 let reason = extract_wait_reason(&payload);
372 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
373 }
374 other => bail!("unsupported node component: {other}"),
375 }
376 }
377
378 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
379 #[derive(Deserialize)]
380 struct FlowCallPayload {
381 #[serde(alias = "flow")]
382 flow_id: String,
383 #[serde(default)]
384 input: Value,
385 }
386
387 let call: FlowCallPayload =
388 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
389 if call.flow_id.trim().is_empty() {
390 bail!("flow.call requires a non-empty flow_id");
391 }
392
393 let sub_input = if call.input.is_null() {
394 Value::Null
395 } else {
396 call.input
397 };
398
399 let flow_id_owned = call.flow_id;
400 let action = "flow.call";
401 let sub_ctx = FlowContext {
402 tenant: ctx.tenant,
403 flow_id: flow_id_owned.as_str(),
404 node_id: None,
405 tool: ctx.tool,
406 action: Some(action),
407 session_id: ctx.session_id,
408 provider_id: ctx.provider_id,
409 retry_config: ctx.retry_config,
410 observer: ctx.observer,
411 mocks: ctx.mocks,
412 };
413
414 let execution = Box::pin(self.execute(sub_ctx, sub_input))
415 .await
416 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
417 match execution.status {
418 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
419 FlowStatus::Waiting(wait) => bail!(
420 "flow.call cannot pause (flow {} waiting {:?})",
421 flow_id_owned,
422 wait.reason
423 ),
424 }
425 }
426
    /// Runs an `mcp.exec` node by invoking an MCP component action.
    ///
    /// The payload must deserialize into `component`, `action` and optional
    /// `args`. When the context carries a mock layer that short-circuits this
    /// component/action, the mocked result is returned and the executor is not
    /// called. Otherwise the request is executed on the blocking thread pool.
    ///
    /// # Errors
    /// Always fails when the crate is built without the `mcp` feature. With the
    /// feature, fails on a malformed payload, a mocked error, a join failure of
    /// the blocking task, or an executor error.
    async fn execute_mcp(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
        // Feature disabled: consume the arguments to avoid unused warnings and reject.
        #[cfg(not(feature = "mcp"))]
        {
            let _ = (ctx, payload);
            bail!("crate built without `mcp` feature; mcp.exec nodes are unavailable");
        }

        #[cfg(feature = "mcp")]
        {
            /// Shape of the payload expected by an `mcp.exec` node.
            #[derive(Deserialize)]
            struct McpPayload {
                component: String,
                action: String,
                #[serde(default)]
                args: Value,
            }

            let payload: McpPayload =
                serde_json::from_value(payload).context("invalid payload for mcp.exec node")?;

            // A mock may answer (or fail) in place of the real tool call.
            if let Some(mocks) = ctx.mocks
                && let Some(result) = mocks.tool_short_circuit(&payload.component, &payload.action)
            {
                let value = result.map_err(|err| anyhow!(err))?;
                return Ok(NodeOutput::new(value));
            }

            let request = ExecRequest {
                component: payload.component,
                action: payload.action,
                args: payload.args,
                tenant: Some(types_tenant_ctx(ctx, &self.default_env)),
            };

            // `greentic_mcp::exec` is a synchronous call, so it is moved off the
            // async executor; the config is cloned to satisfy the `move` closure.
            let exec_config = self.exec_config.clone();
            let exec_result =
                task::spawn_blocking(move || greentic_mcp::exec(request, &exec_config))
                    .await
                    .context("failed to join mcp.exec")?;
            let value = exec_result.map_err(|err| anyhow!(err))?;

            Ok(NodeOutput::new(value))
        }
    }
471
472 fn execute_template(&self, state: &ExecutionState, payload: Value) -> Result<NodeOutput> {
473 let payload: TemplatePayload = serde_json::from_value(payload)
474 .context("invalid payload for templating.handlebars node")?;
475
476 let mut context = state.context();
477 if !payload.data.is_null() {
478 let data =
479 resolve_template_value(self.template_engine.as_ref(), &payload.data, &context)?;
480 merge_values(&mut context, data);
481 }
482
483 let rendered = render_template(
484 self.template_engine.as_ref(),
485 &payload.template,
486 &payload.partials,
487 &context,
488 )?;
489
490 Ok(NodeOutput::new(json!({ "text": rendered })))
491 }
492
493 pub fn flows(&self) -> &[FlowDescriptor] {
494 &self.flows
495 }
496
497 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
498 self.flows
499 .iter()
500 .find(|descriptor| descriptor.flow_type == flow_type)
501 }
502
503 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
504 self.flows
505 .iter()
506 .find(|descriptor| descriptor.id == flow_id)
507 }
508}
509
/// Callbacks through which a caller can watch individual nodes being executed.
///
/// Implementations must be `Send + Sync`; the engine reaches the observer
/// through the shared reference stored in `FlowContext::observer`.
pub trait ExecutionObserver: Send + Sync {
    /// Called before a node is dispatched, with its rendered payload in `event`.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Hook for a node that finished; `output` is the value reported for it.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Hook for a node that failed; `error` is the failure being reported.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
515
/// Borrowed view of a node execution, handed to [`ExecutionObserver`] callbacks.
pub struct NodeEvent<'a> {
    /// Flow context of the run the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node within its flow.
    pub node_id: &'a str,
    /// IR definition of the node.
    pub node: &'a NodeIR,
    /// Payload of the node after template resolution.
    pub payload: &'a Value,
}
522
/// Mutable state of one flow run; serializable so it can be stored in a snapshot.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input of the flow; replaced with the new input when a flow is resumed.
    input: Value,
    /// Outputs of the nodes executed so far, keyed by node id.
    nodes: HashMap<String, NodeOutput>,
    /// Payloads pushed by `emit*` nodes, in emission order.
    egress: Vec<Value>,
}
529
530impl ExecutionState {
531 fn new(input: Value) -> Self {
532 Self {
533 input,
534 nodes: HashMap::new(),
535 egress: Vec::new(),
536 }
537 }
538
539 fn context(&self) -> Value {
540 let mut nodes = JsonMap::new();
541 for (id, output) in &self.nodes {
542 nodes.insert(
543 id.clone(),
544 json!({
545 "ok": output.ok,
546 "payload": output.payload.clone(),
547 "meta": output.meta.clone(),
548 }),
549 );
550 }
551 json!({
552 "input": self.input.clone(),
553 "nodes": nodes,
554 })
555 }
556 fn push_egress(&mut self, payload: Value) {
557 self.egress.push(payload);
558 }
559
560 fn replace_input(&mut self, input: Value) {
561 self.input = input;
562 }
563
564 fn clear_egress(&mut self) {
565 self.egress.clear();
566 }
567
568 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
569 if self.egress.is_empty() {
570 return final_payload.unwrap_or(Value::Null);
571 }
572 let mut emitted = std::mem::take(&mut self.egress);
573 if let Some(value) = final_payload {
574 match value {
575 Value::Null => {}
576 Value::Array(items) => emitted.extend(items),
577 other => emitted.push(other),
578 }
579 }
580 Value::Array(emitted)
581 }
582}
583
/// Recorded result of one executed node, exposed to templates via the state context.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag; `NodeOutput::new` always sets it to `true`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Additional metadata; `NodeOutput::new` sets it to JSON null.
    meta: Value,
}
590
591impl NodeOutput {
592 fn new(payload: Value) -> Self {
593 Self {
594 ok: true,
595 payload,
596 meta: Value::Null,
597 }
598 }
599}
600
/// What dispatching a node produced: its output and, optionally, a request to pause.
struct DispatchOutcome {
    /// Output recorded for the node.
    output: NodeOutput,
    /// `Some` makes the flow driver pause after this node; `None` lets it continue.
    wait_reason: Option<String>,
}
605
606impl DispatchOutcome {
607 fn complete(output: NodeOutput) -> Self {
608 Self {
609 output,
610 wait_reason: None,
611 }
612 }
613
614 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
615 Self {
616 output,
617 wait_reason: reason,
618 }
619 }
620}
621
/// Payload accepted by a `templating.handlebars` node.
#[derive(Deserialize)]
struct TemplatePayload {
    /// Handlebars template source to render.
    template: String,
    /// Named partials registered before rendering; empty when omitted.
    #[serde(default)]
    partials: HashMap<String, String>,
    /// Extra data merged into the render context; JSON null when omitted.
    #[serde(default)]
    data: Value,
}
630
631fn resolve_template_value(
632 engine: &Handlebars<'static>,
633 value: &Value,
634 context: &Value,
635) -> Result<Value> {
636 match value {
637 Value::String(s) => {
638 if s.contains("{{") {
639 let rendered = engine
640 .render_template(s, context)
641 .with_context(|| format!("failed to render template: {s}"))?;
642 Ok(Value::String(rendered))
643 } else {
644 Ok(Value::String(s.clone()))
645 }
646 }
647 Value::Array(items) => {
648 let values = items
649 .iter()
650 .map(|v| resolve_template_value(engine, v, context))
651 .collect::<Result<Vec<_>>>()?;
652 Ok(Value::Array(values))
653 }
654 Value::Object(map) => {
655 let mut resolved = JsonMap::new();
656 for (key, v) in map {
657 resolved.insert(key.clone(), resolve_template_value(engine, v, context)?);
658 }
659 Ok(Value::Object(resolved))
660 }
661 other => Ok(other.clone()),
662 }
663}
664
665fn merge_values(target: &mut Value, addition: Value) {
666 match (target, addition) {
667 (Value::Object(target_map), Value::Object(add_map)) => {
668 for (key, value) in add_map {
669 merge_values(target_map.entry(key).or_insert(Value::Null), value);
670 }
671 }
672 (slot, value) => {
673 *slot = value;
674 }
675 }
676}
677
678fn render_template(
679 base: &Handlebars<'static>,
680 template: &str,
681 partials: &HashMap<String, String>,
682 context: &Value,
683) -> Result<String> {
684 let mut engine = base.clone();
685 for (name, body) in partials {
686 engine
687 .register_template_string(name, body)
688 .with_context(|| format!("failed to register partial {name}"))?;
689 }
690 engine
691 .render_template(template, context)
692 .with_context(|| "failed to render template")
693}
694
695fn extract_wait_reason(payload: &Value) -> Option<String> {
696 match payload {
697 Value::String(s) => Some(s.clone()),
698 Value::Object(map) => map
699 .get("reason")
700 .and_then(Value::as_str)
701 .map(|value| value.to_string()),
702 _ => None,
703 }
704}
705
/// Builds the tenant context attached to MCP requests from the flow context.
#[cfg(feature = "mcp")]
fn types_tenant_ctx(ctx: &FlowContext<'_>, default_env: &str) -> TypesTenantCtx {
    let flow = Some(ctx.flow_id);
    let (node, provider, session) = (ctx.node_id, ctx.provider_id, ctx.session_id);
    tenant_context(default_env, ctx.tenant, flow, node, provider, session)
}
717
/// Unit tests for the template helpers and for output finalization.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A partial can read the flow input, a prior node's payload and merged extra data.
    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let mut partials = HashMap::new();
        partials.insert(
            "line".to_string(),
            "Weather in {{input.city}}: {{nodes.forecast.payload.temp}} {{extra.note}}".to_string(),
        );

        let payload = TemplatePayload {
            template: "{{> line}}".to_string(),
            partials,
            data: json!({ "extra": { "note": "today" } }),
        };

        let mut base = Handlebars::new();
        base.set_strict_mode(false);

        // Mirrors the steps `execute_template` performs, without an engine instance.
        let mut context = state.context();
        let data = resolve_template_value(&base, &payload.data, &context).unwrap();
        merge_values(&mut context, data);
        let rendered =
            render_template(&base, &payload.template, &payload.partials, &context).unwrap();

        assert_eq!(rendered, "Weather in London: 20C today");
    }

    /// Emitted payloads come first and a non-array final payload is appended.
    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    /// A final payload that is an array is flattened into the emitted list.
    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }
}
789
790use tracing::Instrument;
791
/// Per-call context describing who runs which flow and how the run is observed.
pub struct FlowContext<'a> {
    /// Tenant on whose behalf the flow runs; recorded on the span and in logs.
    pub tenant: &'a str,
    /// Id of the flow to execute or resume.
    pub flow_id: &'a str,
    /// Optional node id recorded as a span attribute and in the flow context.
    pub node_id: Option<&'a str>,
    /// Optional tool name recorded as a span attribute.
    pub tool: Option<&'a str>,
    /// Optional action name recorded as a span attribute.
    pub action: Option<&'a str>,
    /// Optional session id forwarded to the telemetry flow context.
    pub session_id: Option<&'a str>,
    /// Optional provider id forwarded to the telemetry flow context.
    pub provider_id: Option<&'a str>,
    /// Retry policy applied by `FlowEngine::execute`.
    pub retry_config: RetryConfig,
    /// Optional observer notified about node execution.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer consulted before real MCP tool calls.
    pub mocks: Option<&'a MockLayer>,
}
804
/// Retry policy for flow execution.
#[derive(Copy, Clone)]
pub struct RetryConfig {
    /// Total number of attempts; execution stops retrying once this count is reached.
    pub max_attempts: u32,
    /// Base delay in milliseconds passed to `backoff_delay_ms` between attempts.
    pub base_delay_ms: u64,
}
810
811fn should_retry(err: &anyhow::Error) -> bool {
812 let lower = err.to_string().to_lowercase();
813 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
814}
815
816impl From<McpRetryConfig> for RetryConfig {
817 fn from(value: McpRetryConfig) -> Self {
818 Self {
819 max_attempts: value.max_attempts.max(1),
820 base_delay_ms: value.base_delay_ms.max(50),
821 }
822 }
823}