Skip to main content

runkon_flow/traits/
action_executor.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::engine_error::EngineError;
6use crate::traits::run_context::RunContext;
7
8/// Trait for pluggable action execution.
9pub trait ActionExecutor: Send + Sync {
10    #[allow(dead_code)]
11    fn name(&self) -> &str;
12    fn execute(
13        &self,
14        ctx: &dyn RunContext,
15        info: &StepInfo,
16        params: &ActionParams,
17    ) -> Result<ActionOutput, EngineError>;
18    #[allow(dead_code)]
19    fn cancel(&self, execution_id: &str) -> Result<(), EngineError> {
20        let _ = execution_id;
21        Ok(())
22    }
23}
24
/// Per-call information about a workflow step, filled in by the engine and
/// passed to `ActionExecutor::execute`.
pub struct StepInfo {
    /// Identifier of the step this call belongs to.
    pub step_id: String,
    /// Timeout configured for the step.
    ///
    /// NOTE(review): nothing in this file enforces it — presumably the
    /// executor or the engine does; confirm against the callers.
    pub step_timeout: Duration,
}
30
31/// Per-invocation inputs passed to an `ActionExecutor`.
32pub struct ActionParams {
33    pub name: String,
34    pub inputs: Arc<HashMap<String, String>>,
35    #[allow(dead_code)]
36    pub retries_remaining: u32,
37    pub retry_error: Option<String>,
38    pub snippets: Vec<String>,
39    pub dry_run: bool,
40    #[allow(dead_code)]
41    pub gate_feedback: Option<String>,
42    pub extensions: crate::extensions::Extensions,
43    /// Optional named variant of the executor's underlying tool/agent.
44    ///
45    /// Cross-runtime convention — every major terminal AI tool uses `--model`:
46    /// Claude (`claude-opus-4-7`), OpenAI Codex (`gpt-5.4`, `gpt-5.3-codex`),
47    /// Moonshot Kimi (`k2.5`), Google Gemini (`gemini-2.5-flash`), Simon
48    /// Willison's `llm` CLI, `aichat`, image-gen tools, etc.
49    ///
50    /// Conductor's `CliRuntime` substitutes `{{model}}` into a configurable arg
51    /// template so any of these tools can be driven by a runkon-flow workflow
52    /// (`runkon-runtimes/src/runtime/cli.rs`).
53    ///
54    /// Executors that have no concept of named variants (e.g. `SendEmailExecutor`,
55    /// `HttpRequestExecutor`) ignore the field. `None` means "use the executor's
56    /// default."
57    pub model: Option<String>,
58    /// When `Some`, names the identity this step's action should act as.
59    /// Executor implementations resolve it into harness-defined auth
60    /// material — typically credentials threaded into the spawned agent.
61    /// Examples:
62    ///
63    /// - GitHub App installation name → `GH_TOKEN`
64    /// - AWS service account ID → `AWS_ACCESS_KEY_ID` / related vars
65    /// - Slack bot user ID → `SLACK_BOT_TOKEN`
66    /// - Agent persona key → API key scoped to that persona
67    ///
68    /// Executors that don't model named identities ignore the field.
69    pub as_identity: Option<String>,
70    pub plugin_dirs: Vec<String>,
71}
72
/// Successful result returned by `ActionExecutor::execute`.
///
/// `Default` yields an output with empty collections and every optional
/// field set to `None`, so callers can fill in only what they produce.
#[derive(Debug, Default)]
pub struct ActionOutput {
    /// Marker strings emitted by the action.
    pub markers: Vec<String>,
    /// Optional context text produced by the action.
    pub context: Option<String>,
    /// Optional result text.
    pub result_text: Option<String>,
    /// Optional structured output, carried as a string.
    ///
    /// NOTE(review): the encoding (e.g. JSON) is not established in this file.
    pub structured_output: Option<String>,
    /// Executor-specific key/value metadata. Claude executors populate the
    /// seven metric keys defined in `runkon_flow::constants::metadata_keys`.
    pub metadata: HashMap<String, String>,
    /// Optional id of a child run.
    ///
    /// NOTE(review): presumably set when the action spawned one — confirm.
    pub child_run_id: Option<String>,
}
85
86/// Holds named and fallback `ActionExecutor` implementations.
87pub struct ActionRegistry {
88    named: HashMap<String, Box<dyn ActionExecutor>>,
89    fallback: Option<Box<dyn ActionExecutor>>,
90}
91
92impl ActionRegistry {
93    /// Construct a registry from pre-built maps (called only by `FlowEngineBuilder`).
94    pub(crate) fn new(
95        named: HashMap<String, Box<dyn ActionExecutor>>,
96        fallback: Option<Box<dyn ActionExecutor>>,
97    ) -> Self {
98        Self { named, fallback }
99    }
100
101    /// Construct a registry for external consumers that build registries outside the
102    /// `FlowEngineBuilder` pipeline — such as bridge adapters or integration-test harnesses
103    /// — that cannot use the builder's fluent API.
104    pub fn from_executors(
105        named: HashMap<String, Box<dyn ActionExecutor>>,
106        fallback: Option<Box<dyn ActionExecutor>>,
107    ) -> Self {
108        Self::new(named, fallback)
109    }
110
111    /// Returns `true` if the named executor is registered OR a fallback is configured.
112    ///
113    /// Mirrors the fallback semantics of `dispatch()`: a harness that registers only
114    /// a fallback executor passes all action name checks.
115    pub fn has_action(&self, name: &str) -> bool {
116        self.named.contains_key(name) || self.fallback.is_some()
117    }
118
119    fn find_executor(&self, name: &str) -> Option<&dyn ActionExecutor> {
120        self.named
121            .get(name)
122            .map(|e| e.as_ref())
123            .or(self.fallback.as_deref())
124    }
125
126    /// Find the executor for `name` and run it.
127    pub fn dispatch(
128        &self,
129        name: &str,
130        ctx: &dyn RunContext,
131        info: &StepInfo,
132        params: &ActionParams,
133    ) -> Result<ActionOutput, EngineError> {
134        match self.find_executor(name) {
135            Some(e) => e.execute(ctx, info, params),
136            None => Err(EngineError::Workflow(format!(
137                "no registered ActionExecutor for '{}' and no fallback configured",
138                name
139            ))),
140        }
141    }
142
143    /// Call `cancel()` on the executor for `name`, if registered.
144    /// Used by `FlowEngine::cancel_run()` to fire-and-forget executor-level cancellation.
145    pub fn cancel(&self, name: &str, execution_id: &str) -> Result<(), EngineError> {
146        match self.find_executor(name) {
147            Some(e) => e.cancel(execution_id),
148            None => Ok(()),
149        }
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::{make_params, make_run_ctx, make_step_info};
    use std::sync::Mutex;

    /// Executor that always succeeds with a single `"done"` marker and relies
    /// on the trait's default `cancel`.
    struct NoopExecutor;

    impl ActionExecutor for NoopExecutor {
        fn name(&self) -> &str {
            "noop"
        }
        fn execute(
            &self,
            _ctx: &dyn RunContext,
            _info: &StepInfo,
            _params: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            Ok(ActionOutput {
                markers: vec!["done".to_string()],
                context: Some("noop ran".to_string()),
                ..Default::default()
            })
        }
    }

    /// Executor that succeeds with a `"recorded"` marker and logs every
    /// `execution_id` passed to `cancel`, so tests can observe delegation.
    struct RecordingExecutor {
        cancelled: Arc<Mutex<Vec<String>>>,
    }

    impl ActionExecutor for RecordingExecutor {
        fn name(&self) -> &str {
            "recording"
        }
        fn execute(
            &self,
            _ctx: &dyn RunContext,
            _info: &StepInfo,
            _params: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            Ok(ActionOutput {
                markers: vec!["recorded".to_string()],
                ..Default::default()
            })
        }
        fn cancel(&self, execution_id: &str) -> Result<(), EngineError> {
            self.cancelled
                .lock()
                .unwrap()
                .push(execution_id.to_string());
            Ok(())
        }
    }

    /// Executor map containing only `NoopExecutor` under the key `"noop"`.
    fn noop_map() -> HashMap<String, Box<dyn ActionExecutor>> {
        let mut named: HashMap<String, Box<dyn ActionExecutor>> = HashMap::new();
        named.insert("noop".to_string(), Box::new(NoopExecutor));
        named
    }

    /// Registry with no named executors and no fallback.
    fn empty_registry() -> ActionRegistry {
        ActionRegistry::new(HashMap::new(), None)
    }

    #[test]
    fn dispatch_named_executor() {
        let registry = ActionRegistry::new(noop_map(), None);
        let ctx = make_run_ctx();
        let info = make_step_info();
        let params = make_params("noop");
        let output = registry
            .dispatch("noop", ctx.as_ref(), &info, &params)
            .unwrap();
        assert_eq!(output.markers, vec!["done"]);
    }

    #[test]
    fn dispatch_fallback_when_no_named_match() {
        let registry = ActionRegistry::new(HashMap::new(), Some(Box::new(NoopExecutor)));
        let ctx = make_run_ctx();
        let info = make_step_info();
        let params = make_params("anything");
        let output = registry
            .dispatch("anything", ctx.as_ref(), &info, &params)
            .unwrap();
        assert_eq!(output.markers, vec!["done"]);
    }

    #[test]
    fn dispatch_prefers_named_over_fallback() {
        let log = Arc::new(Mutex::new(Vec::new()));
        let registry = ActionRegistry::new(
            noop_map(),
            Some(Box::new(RecordingExecutor {
                cancelled: Arc::clone(&log),
            })),
        );
        let ctx = make_run_ctx();
        let info = make_step_info();

        // A registered name must hit its own executor, not the fallback.
        let params = make_params("noop");
        let output = registry
            .dispatch("noop", ctx.as_ref(), &info, &params)
            .unwrap();
        assert_eq!(output.markers, vec!["done"]);

        // An unregistered name must be routed to the fallback.
        let params = make_params("other");
        let output = registry
            .dispatch("other", ctx.as_ref(), &info, &params)
            .unwrap();
        assert_eq!(output.markers, vec!["recorded"]);
    }

    #[test]
    fn dispatch_error_when_no_executor_or_fallback() {
        let registry = empty_registry();
        let ctx = make_run_ctx();
        let info = make_step_info();
        let params = make_params("missing");
        let err = registry
            .dispatch("missing", ctx.as_ref(), &info, &params)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("no registered ActionExecutor for 'missing'"),
            "got: {err}"
        );
    }

    #[test]
    fn cancel_default_impl_is_noop() {
        let executor = NoopExecutor;
        assert!(executor.cancel("any-id").is_ok());
    }

    #[test]
    fn registry_cancel_delegates_to_named_executor() {
        let log = Arc::new(Mutex::new(Vec::new()));
        let mut named: HashMap<String, Box<dyn ActionExecutor>> = HashMap::new();
        named.insert(
            "recording".to_string(),
            Box::new(RecordingExecutor {
                cancelled: Arc::clone(&log),
            }),
        );
        let registry = ActionRegistry::new(named, None);

        assert!(registry.cancel("recording", "exec-1").is_ok());
        assert_eq!(*log.lock().unwrap(), vec!["exec-1".to_string()]);
    }

    #[test]
    fn registry_cancel_uses_fallback_for_unknown_name() {
        let log = Arc::new(Mutex::new(Vec::new()));
        let registry = ActionRegistry::new(
            noop_map(),
            Some(Box::new(RecordingExecutor {
                cancelled: Arc::clone(&log),
            })),
        );

        // Registered name: handled by NoopExecutor's default cancel, so the
        // fallback must not observe it.
        assert!(registry.cancel("noop", "exec-named").is_ok());
        assert!(log.lock().unwrap().is_empty());

        // Unregistered name: resolved to the fallback.
        assert!(registry.cancel("unknown", "exec-fallback").is_ok());
        assert_eq!(*log.lock().unwrap(), vec!["exec-fallback".to_string()]);
    }

    #[test]
    fn registry_cancel_is_ok_when_nothing_matches() {
        let registry = empty_registry();
        assert!(registry.cancel("missing", "any-id").is_ok());
    }

    #[test]
    fn from_executors_behaves_like_new() {
        let registry = ActionRegistry::from_executors(noop_map(), None);
        assert!(registry.has_action("noop"));
        assert!(!registry.has_action("missing"));

        let with_fallback =
            ActionRegistry::from_executors(HashMap::new(), Some(Box::new(NoopExecutor)));
        assert!(with_fallback.has_action("anything"));
    }

    #[test]
    fn has_action_named_executor_found() {
        let registry = ActionRegistry::new(noop_map(), None);
        assert!(registry.has_action("noop"));
        assert!(!registry.has_action("missing"));
    }

    #[test]
    fn has_action_true_with_fallback_regardless_of_name() {
        let registry = ActionRegistry::new(HashMap::new(), Some(Box::new(NoopExecutor)));
        assert!(registry.has_action("anything"));
        assert!(registry.has_action("also_this"));
    }

    #[test]
    fn has_action_false_when_empty() {
        let registry = empty_registry();
        assert!(!registry.has_action("noop"));
    }
}