1use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Instant;
12
13use chrono::Utc;
14use serde_json::Value;
15use tracing::{error, info};
16use uuid::Uuid;
17
18use ironflow_core::provider::AgentProvider;
19use ironflow_store::error::StoreError;
20use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
21use ironflow_store::store::RunStore;
22
23use crate::context::WorkflowContext;
24use crate::error::EngineError;
25use crate::handler::{WorkflowHandler, WorkflowInfo};
26
27pub struct Engine {
68 store: Arc<dyn RunStore>,
69 provider: Arc<dyn AgentProvider>,
70 handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
71}
72
73impl Engine {
74 pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
90 Self {
91 store,
92 provider,
93 handlers: HashMap::new(),
94 }
95 }
96
97 pub fn store(&self) -> &Arc<dyn RunStore> {
99 &self.store
100 }
101
102 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
104 &self.provider
105 }
106
107 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
109 let handlers = self.handlers.clone();
110 let resolver: crate::context::HandlerResolver =
111 Arc::new(move |name: &str| handlers.get(name).cloned());
112 WorkflowContext::with_handler_resolver(
113 run_id,
114 self.store.clone(),
115 self.provider.clone(),
116 resolver,
117 )
118 }
119
120 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
164 let name = handler.name().to_string();
165 if self.handlers.contains_key(&name) {
166 return Err(EngineError::InvalidWorkflow(format!(
167 "handler '{}' already registered",
168 name
169 )));
170 }
171 self.handlers.insert(name, Arc::new(handler));
172 Ok(())
173 }
174
175 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
182 let name = handler.name().to_string();
183 if self.handlers.contains_key(&name) {
184 return Err(EngineError::InvalidWorkflow(format!(
185 "handler '{}' already registered",
186 name
187 )));
188 }
189 self.handlers.insert(name, Arc::from(handler));
190 Ok(())
191 }
192
193 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
195 self.handlers.get(name)
196 }
197
198 pub fn handler_names(&self) -> Vec<&str> {
200 self.handlers.keys().map(|s| s.as_str()).collect()
201 }
202
203 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
205 self.handlers.get(name).map(|h| h.describe())
206 }
207
208 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
238 pub async fn run_handler(
239 &self,
240 handler_name: &str,
241 trigger: TriggerKind,
242 payload: Value,
243 ) -> Result<Run, EngineError> {
244 let handler = self
245 .handlers
246 .get(handler_name)
247 .ok_or_else(|| {
248 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
249 })?
250 .clone();
251
252 let run = self
253 .store
254 .create_run(NewRun {
255 workflow_name: handler_name.to_string(),
256 trigger,
257 payload,
258 max_retries: 0,
259 })
260 .await?;
261
262 let run_id = run.id;
263 info!(run_id = %run_id, "run created");
264
265 self.store
266 .update_run_status(run_id, RunStatus::Running)
267 .await?;
268
269 let run_start = Instant::now();
270 let mut ctx = self.build_context(run_id);
271
272 let result = handler.execute(&mut ctx).await;
273 self.finalize_run(run_id, result, &ctx, run_start).await
274 }
275
276 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
285 pub async fn enqueue_handler(
286 &self,
287 handler_name: &str,
288 trigger: TriggerKind,
289 payload: Value,
290 max_retries: u32,
291 ) -> Result<Run, EngineError> {
292 if !self.handlers.contains_key(handler_name) {
293 return Err(EngineError::InvalidWorkflow(format!(
294 "no handler registered: {handler_name}"
295 )));
296 }
297
298 let run = self
299 .store
300 .create_run(NewRun {
301 workflow_name: handler_name.to_string(),
302 trigger,
303 payload,
304 max_retries,
305 })
306 .await?;
307
308 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
309 Ok(run)
310 }
311
312 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
321 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
322 let run = self
323 .store
324 .get_run(run_id)
325 .await?
326 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
327
328 let handler = self
329 .handlers
330 .get(&run.workflow_name)
331 .ok_or_else(|| {
332 EngineError::InvalidWorkflow(format!(
333 "no handler registered: {}",
334 run.workflow_name
335 ))
336 })?
337 .clone();
338
339 let run_start = Instant::now();
340 let mut ctx = self.build_context(run_id);
341
342 let result = handler.execute(&mut ctx).await;
343 self.finalize_run(run_id, result, &ctx, run_start).await
344 }
345
346 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
354 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
355 self.execute_handler_run(run_id).await
356 }
357
358 async fn finalize_run(
367 &self,
368 run_id: Uuid,
369 result: Result<(), EngineError>,
370 ctx: &WorkflowContext,
371 run_start: Instant,
372 ) -> Result<Run, EngineError> {
373 let total_duration = run_start.elapsed().as_millis() as u64;
374 let completed_at = Utc::now();
375
376 match result {
377 Ok(()) => {
378 self.store
379 .update_run(
380 run_id,
381 RunUpdate {
382 status: Some(RunStatus::Completed),
383 cost_usd: Some(ctx.total_cost_usd()),
384 duration_ms: Some(total_duration),
385 completed_at: Some(completed_at),
386 ..RunUpdate::default()
387 },
388 )
389 .await?;
390
391 info!(
392 run_id = %run_id,
393 cost_usd = %ctx.total_cost_usd(),
394 duration_ms = total_duration,
395 "run completed"
396 );
397 }
398 Err(err) => {
399 if let Err(store_err) = self
400 .store
401 .update_run(
402 run_id,
403 RunUpdate {
404 status: Some(RunStatus::Failed),
405 error: Some(err.to_string()),
406 cost_usd: Some(ctx.total_cost_usd()),
407 duration_ms: Some(total_duration),
408 completed_at: Some(completed_at),
409 ..RunUpdate::default()
410 },
411 )
412 .await
413 {
414 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
415 }
416
417 error!(run_id = %run_id, error = %err, "run failed");
418 return Err(err);
419 }
420 }
421
422 self.store
423 .get_run(run_id)
424 .await?
425 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
426 }
427}
428
429impl std::fmt::Debug for Engine {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 f.debug_struct("Engine")
432 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
433 .finish_non_exhaustive()
434 }
435}
436
437#[cfg(test)]
438mod tests {
439 use super::*;
440 use crate::config::ShellConfig;
441 use crate::handler::{HandlerFuture, WorkflowHandler};
442 use ironflow_core::providers::claude::ClaudeCodeProvider;
443 use ironflow_core::providers::record_replay::RecordReplayProvider;
444 use ironflow_store::memory::InMemoryStore;
445 use serde_json::json;
446
447 struct EchoWorkflow;
449
450 impl WorkflowHandler for EchoWorkflow {
451 fn name(&self) -> &str {
452 "echo-workflow"
453 }
454
455 fn describe(&self) -> WorkflowInfo {
456 WorkflowInfo {
457 description: "A simple workflow that echoes hello".to_string(),
458 source_code: None,
459 sub_workflows: Vec::new(),
460 }
461 }
462
463 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
464 Box::pin(async move {
465 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
466 Ok(())
467 })
468 }
469 }
470
471 struct FailingWorkflow;
473
474 impl WorkflowHandler for FailingWorkflow {
475 fn name(&self) -> &str {
476 "failing-workflow"
477 }
478
479 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
480 Box::pin(async move {
481 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
482 Ok(())
483 })
484 }
485 }
486
487 fn create_test_engine() -> Engine {
488 let store = Arc::new(InMemoryStore::new());
489 let inner = ClaudeCodeProvider::new();
490 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
491 inner,
492 "/tmp/ironflow-fixtures",
493 ));
494 Engine::new(store, provider)
495 }
496
497 #[test]
498 fn engine_new_creates_instance() {
499 let engine = create_test_engine();
500 assert_eq!(engine.handler_names().len(), 0);
501 }
502
503 #[test]
504 fn engine_register_handler() {
505 let mut engine = create_test_engine();
506 let result = engine.register(EchoWorkflow);
507 assert!(result.is_ok());
508 assert_eq!(engine.handler_names().len(), 1);
509 assert!(engine.handler_names().contains(&"echo-workflow"));
510 }
511
512 #[test]
513 fn engine_register_duplicate_returns_error() {
514 let mut engine = create_test_engine();
515 engine.register(EchoWorkflow).unwrap();
516 let result = engine.register(EchoWorkflow);
517 assert!(result.is_err());
518 }
519
520 #[test]
521 fn engine_get_handler_found() {
522 let mut engine = create_test_engine();
523 engine.register(EchoWorkflow).unwrap();
524 let handler = engine.get_handler("echo-workflow");
525 assert!(handler.is_some());
526 }
527
528 #[test]
529 fn engine_get_handler_not_found() {
530 let engine = create_test_engine();
531 let handler = engine.get_handler("nonexistent");
532 assert!(handler.is_none());
533 }
534
535 #[test]
536 fn engine_handler_names_lists_all() {
537 let mut engine = create_test_engine();
538 engine.register(EchoWorkflow).unwrap();
539 engine.register(FailingWorkflow).unwrap();
540 let names = engine.handler_names();
541 assert_eq!(names.len(), 2);
542 assert!(names.contains(&"echo-workflow"));
543 assert!(names.contains(&"failing-workflow"));
544 }
545
546 #[test]
547 fn engine_handler_info_returns_description() {
548 let mut engine = create_test_engine();
549 engine.register(EchoWorkflow).unwrap();
550 let info = engine.handler_info("echo-workflow");
551 assert!(info.is_some());
552 let info = info.unwrap();
553 assert_eq!(info.description, "A simple workflow that echoes hello");
554 }
555
556 #[tokio::test]
557 async fn engine_unknown_workflow_returns_error() {
558 let engine = create_test_engine();
559 let result = engine
560 .run_handler("unknown", TriggerKind::Manual, json!({}))
561 .await;
562 assert!(result.is_err());
563 match result {
564 Err(EngineError::InvalidWorkflow(msg)) => {
565 assert!(msg.contains("no handler registered"));
566 }
567 _ => panic!("expected InvalidWorkflow error"),
568 }
569 }
570
571 #[tokio::test]
572 async fn engine_enqueue_handler_creates_pending_run() {
573 let mut engine = create_test_engine();
574 engine.register(EchoWorkflow).unwrap();
575
576 let run = engine
577 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
578 .await
579 .unwrap();
580 assert_eq!(run.status.state, RunStatus::Pending);
581 assert_eq!(run.workflow_name, "echo-workflow");
582 }
583
584 #[tokio::test]
585 async fn engine_register_boxed() {
586 let mut engine = create_test_engine();
587 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
588 let result = engine.register_boxed(handler);
589 assert!(result.is_ok());
590 assert_eq!(engine.handler_names().len(), 1);
591 }
592
593 #[tokio::test]
594 async fn engine_store_and_provider_accessors() {
595 let store = Arc::new(InMemoryStore::new());
596 let inner = ClaudeCodeProvider::new();
597 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
598 inner,
599 "/tmp/ironflow-fixtures",
600 ));
601 let engine = Engine::new(store.clone(), provider.clone());
602
603 let _ = engine.store();
605 let _ = engine.provider();
606 }
607
608 use crate::operation::Operation;
613 use ironflow_store::models::StepKind;
614 use std::future::Future;
615 use std::pin::Pin;
616
617 struct FakeGitlabOp {
618 project_id: u64,
619 title: String,
620 }
621
622 impl Operation for FakeGitlabOp {
623 fn kind(&self) -> &str {
624 "gitlab"
625 }
626
627 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
628 Box::pin(async move {
629 Ok(json!({
630 "issue_id": 42,
631 "project_id": self.project_id,
632 "title": self.title,
633 }))
634 })
635 }
636
637 fn input(&self) -> Option<Value> {
638 Some(json!({
639 "project_id": self.project_id,
640 "title": self.title,
641 }))
642 }
643 }
644
645 struct FailingOp;
646
647 impl Operation for FailingOp {
648 fn kind(&self) -> &str {
649 "broken-service"
650 }
651
652 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
653 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
654 }
655 }
656
657 struct OperationWorkflow;
658
659 impl WorkflowHandler for OperationWorkflow {
660 fn name(&self) -> &str {
661 "operation-workflow"
662 }
663
664 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
665 Box::pin(async move {
666 let op = FakeGitlabOp {
667 project_id: 123,
668 title: "Bug report".to_string(),
669 };
670 ctx.operation("create-issue", &op).await?;
671 Ok(())
672 })
673 }
674 }
675
676 struct FailingOperationWorkflow;
677
678 impl WorkflowHandler for FailingOperationWorkflow {
679 fn name(&self) -> &str {
680 "failing-operation-workflow"
681 }
682
683 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
684 Box::pin(async move {
685 ctx.operation("broken-call", &FailingOp).await?;
686 Ok(())
687 })
688 }
689 }
690
691 struct MixedWorkflow;
692
693 impl WorkflowHandler for MixedWorkflow {
694 fn name(&self) -> &str {
695 "mixed-workflow"
696 }
697
698 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
699 Box::pin(async move {
700 ctx.shell("build", ShellConfig::new("echo built")).await?;
701 let op = FakeGitlabOp {
702 project_id: 456,
703 title: "Deploy done".to_string(),
704 };
705 let result = ctx.operation("notify-gitlab", &op).await?;
706 assert_eq!(result.output["issue_id"], 42);
707 Ok(())
708 })
709 }
710 }
711
712 #[tokio::test]
713 async fn operation_step_happy_path() {
714 let mut engine = create_test_engine();
715 engine.register(OperationWorkflow).unwrap();
716
717 let run = engine
718 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
719 .await
720 .unwrap();
721
722 assert_eq!(run.status.state, RunStatus::Completed);
723
724 let steps = engine.store().list_steps(run.id).await.unwrap();
725
726 assert_eq!(steps.len(), 1);
727 assert_eq!(steps[0].name, "create-issue");
728 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
729 assert_eq!(
730 steps[0].status.state,
731 ironflow_store::models::StepStatus::Completed
732 );
733
734 let output = steps[0].output.as_ref().unwrap();
735 assert_eq!(output["issue_id"], 42);
736 assert_eq!(output["project_id"], 123);
737
738 let input = steps[0].input.as_ref().unwrap();
739 assert_eq!(input["project_id"], 123);
740 assert_eq!(input["title"], "Bug report");
741 }
742
743 #[tokio::test]
744 async fn operation_step_failure_marks_run_failed() {
745 let mut engine = create_test_engine();
746 engine.register(FailingOperationWorkflow).unwrap();
747
748 let result = engine
749 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
750 .await;
751
752 assert!(result.is_err());
753 }
754
755 #[tokio::test]
756 async fn operation_mixed_with_shell_steps() {
757 let mut engine = create_test_engine();
758 engine.register(MixedWorkflow).unwrap();
759
760 let run = engine
761 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
762 .await
763 .unwrap();
764
765 assert_eq!(run.status.state, RunStatus::Completed);
766
767 let steps = engine.store().list_steps(run.id).await.unwrap();
768
769 assert_eq!(steps.len(), 2);
770 assert_eq!(steps[0].kind, StepKind::Shell);
771 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
772 assert_eq!(steps[0].position, 0);
773 assert_eq!(steps[1].position, 1);
774 }
775}