use super::*;
use base64::Engine;
mod call_function;
mod call_http;
mod control_flow;
mod do_task;
mod e2e;
mod emit;
mod export;
mod expression;
mod for_loop;
mod fork;
mod listen;
mod raise;
mod run_shell;
mod schedule;
mod set;
mod switch;
mod task;
mod try_catch;
mod wait;
mod workflow;
use crate::error::WorkflowError;
use crate::events::EventBus;
use serde_json::json;
use std::collections::HashMap;
use std::path::Path;
use warp::Filter;
use warp::Reply;
async fn run_workflow_from_yaml(yaml_path: &str, input: Value) -> WorkflowResult<Value> {
let yaml_str = std::fs::read_to_string(yaml_path).map_err(|e| {
WorkflowError::runtime(format!("failed to read '{}': {}", yaml_path, e), "/", "/")
})?;
let workflow: WorkflowDefinition = serde_yaml::from_str(&yaml_str).map_err(|e| {
WorkflowError::runtime(format!("failed to parse '{}': {}", yaml_path, e), "/", "/")
})?;
let runner = WorkflowRunner::new(workflow)?;
runner.run(input).await
}
async fn run_workflow_from_yaml_with_secrets(
yaml_path: &str,
input: Value,
secret_manager: Arc<dyn SecretManager>,
) -> WorkflowResult<Value> {
let yaml_str = std::fs::read_to_string(yaml_path).map_err(|e| {
WorkflowError::runtime(format!("failed to read '{}': {}", yaml_path, e), "/", "/")
})?;
let workflow: WorkflowDefinition = serde_yaml::from_str(&yaml_str).map_err(|e| {
WorkflowError::runtime(format!("failed to parse '{}': {}", yaml_path, e), "/", "/")
})?;
let runner = WorkflowRunner::new(workflow)?.with_secret_manager(secret_manager);
runner.run(input).await
}
async fn run_workflow_yaml(yaml_str: &str, input: Value) -> WorkflowResult<Value> {
let workflow: WorkflowDefinition = serde_yaml::from_str(yaml_str)
.map_err(|e| WorkflowError::runtime(format!("failed to parse YAML: {}", e), "/", "/"))?;
let runner = WorkflowRunner::new(workflow)?;
runner.run(input).await
}
fn testdata(filename: &str) -> String {
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or(".".to_string());
Path::new(&manifest_dir)
.join("testdata")
.join(filename)
.to_str()
.unwrap()
.to_string()
}
async fn run_workflow_with_mock_server(
yaml_file: &str,
filter: impl warp::Filter<Extract = impl warp::Reply> + Clone + Send + Sync + 'static,
input: Value,
) -> Value {
let (addr, server_fn) = warp::serve(filter).bind_ephemeral(([127, 0, 0, 1], 0));
let port = addr.port();
tokio::spawn(server_fn);
let yaml_str = std::fs::read_to_string(testdata(yaml_file)).unwrap();
let yaml_str = yaml_str.replace("9876", &port.to_string());
let workflow: WorkflowDefinition = serde_yaml::from_str(&yaml_str).unwrap();
let runner = WorkflowRunner::new(workflow).unwrap();
runner.run(input).await.unwrap()
}
fn start_mock_server(
filter: impl warp::Filter<Extract = impl warp::Reply> + Clone + Send + Sync + 'static,
) -> u16 {
let (addr, server_fn) = warp::serve(filter).bind_ephemeral(([127, 0, 0, 1], 0));
let port = addr.port();
tokio::spawn(server_fn);
port
}
#[allow(dead_code)]
fn bearer_protected_endpoint(
expected_token: &str,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone + Send + Sync + 'static
{
let token = expected_token.to_string();
warp::path("protected")
.and(warp::header::optional("Authorization"))
.map(move |auth: Option<String>| {
let token = token.clone();
match auth {
Some(val) if val == format!("Bearer {}", token) => {
warp::reply::json(&serde_json::json!({"data": "secret", "authenticated": true}))
.into_response()
}
_ => warp::reply::with_status(
warp::reply::json(&serde_json::json!({"error": "unauthorized"})),
warp::http::StatusCode::UNAUTHORIZED,
)
.into_response(),
}
})
}
#[allow(dead_code)]
fn basic_auth_protected_endpoint(
expected_b64: &str,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone + Send + Sync + 'static
{
let expected = format!("Basic {}", expected_b64);
warp::path("protected")
.and(warp::header::optional("Authorization"))
.map(move |auth: Option<String>| match auth {
Some(val) if val == expected => {
warp::reply::json(&serde_json::json!({"access": "granted"}))
}
_ => warp::reply::json(&serde_json::json!({"access": "denied"})),
})
}
#[allow(dead_code)]
fn oauth2_token_endpoint(
expected_grant_type: &str,
access_token: &str,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone + Send + Sync + 'static
{
let grant = expected_grant_type.to_string();
let token = access_token.to_string();
warp::path("oauth2")
.and(warp::path("token"))
.and(warp::post())
.and(warp::body::form::<HashMap<String, String>>())
.map(move |params: HashMap<String, String>| {
let gt = params.get("grant_type").map(|s| s.as_str()).unwrap_or("");
if gt == grant {
warp::reply::json(&serde_json::json!({
"access_token": token,
"token_type": "Bearer",
"expires_in": 3600
}))
} else {
warp::reply::json(&serde_json::json!({"error": "invalid_client"}))
}
})
}