use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use schemars::JsonSchema;
use serde::Serialize;
use serde_json::Value;
use crate::context::WorkflowContext;
use crate::error::EngineError;
pub fn input_schema_for<T: JsonSchema>() -> Value {
let schema = schemars::schema_for!(T);
serde_json::to_value(schema).expect("schema serialization cannot fail")
}
pub type HandlerFuture<'a> = Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'a>>;
#[derive(Debug, Clone, Serialize)]
pub struct WorkflowInfo {
pub description: String,
pub source_code: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub sub_workflows: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub category: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input_schema: Option<Value>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub default_labels: HashMap<String, String>,
}
pub trait WorkflowHandler: Send + Sync {
fn name(&self) -> &str;
fn version(&self) -> Option<&str> {
None
}
fn category(&self) -> Option<&str> {
None
}
fn input_schema(&self) -> Option<Value> {
None
}
fn default_labels(&self) -> HashMap<String, String> {
HashMap::new()
}
fn describe(&self) -> WorkflowInfo {
WorkflowInfo {
description: String::new(),
source_code: None,
sub_workflows: Vec::new(),
category: self.category().map(str::to_string),
version: self.version().map(str::to_string),
input_schema: self.input_schema(),
default_labels: self.default_labels(),
}
}
fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a>;
}
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct TestInput {
environment: String,
#[serde(default)]
dry_run: bool,
}
struct MinimalHandler;
impl WorkflowHandler for MinimalHandler {
fn name(&self) -> &str {
"minimal"
}
fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
Box::pin(async { Ok(()) })
}
}
struct FullFeaturedHandler;
impl WorkflowHandler for FullFeaturedHandler {
fn name(&self) -> &str {
"full"
}
fn version(&self) -> Option<&str> {
Some("1.2.0")
}
fn category(&self) -> Option<&str> {
Some("data/etl")
}
fn input_schema(&self) -> Option<Value> {
Some(input_schema_for::<TestInput>())
}
fn default_labels(&self) -> HashMap<String, String> {
HashMap::from([
("team".to_string(), "platform".to_string()),
("env".to_string(), "prod".to_string()),
])
}
fn describe(&self) -> WorkflowInfo {
WorkflowInfo {
description: "Full-featured test handler".to_string(),
source_code: Some("fn test() {}".to_string()),
sub_workflows: vec!["helper".to_string()],
category: self.category().map(str::to_string),
version: self.version().map(str::to_string),
input_schema: self.input_schema(),
default_labels: self.default_labels(),
}
}
fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
Box::pin(async { Ok(()) })
}
}
#[test]
fn minimal_handler_has_required_name() {
let handler = MinimalHandler;
assert_eq!(handler.name(), "minimal");
}
#[test]
fn minimal_handler_defaults_to_no_version() {
let handler = MinimalHandler;
assert_eq!(handler.version(), None);
}
#[test]
fn minimal_handler_defaults_to_no_category() {
let handler = MinimalHandler;
assert_eq!(handler.category(), None);
}
#[test]
fn minimal_handler_defaults_to_no_schema() {
let handler = MinimalHandler;
assert_eq!(handler.input_schema(), None);
}
#[test]
fn minimal_handler_defaults_to_empty_labels() {
let handler = MinimalHandler;
let labels = handler.default_labels();
assert!(labels.is_empty());
}
#[test]
fn minimal_handler_describe_reflects_defaults() {
let handler = MinimalHandler;
let info = handler.describe();
assert_eq!(info.description, "");
assert_eq!(info.source_code, None);
assert_eq!(info.sub_workflows, Vec::<String>::new());
assert_eq!(info.category, None);
assert_eq!(info.version, None);
assert_eq!(info.input_schema, None);
assert!(info.default_labels.is_empty());
}
#[test]
fn full_handler_returns_all_metadata() {
let handler = FullFeaturedHandler;
assert_eq!(handler.name(), "full");
assert_eq!(handler.version(), Some("1.2.0"));
assert_eq!(handler.category(), Some("data/etl"));
assert!(handler.input_schema().is_some());
}
#[test]
fn full_handler_default_labels_are_set() {
let handler = FullFeaturedHandler;
let labels = handler.default_labels();
assert_eq!(labels.get("team"), Some(&"platform".to_string()));
assert_eq!(labels.get("env"), Some(&"prod".to_string()));
}
#[test]
fn full_handler_describe_includes_all_fields() {
let handler = FullFeaturedHandler;
let info = handler.describe();
assert_eq!(info.description, "Full-featured test handler");
assert_eq!(info.source_code, Some("fn test() {}".to_string()));
assert_eq!(info.sub_workflows, vec!["helper".to_string()]);
assert_eq!(info.category, Some("data/etl".to_string()));
assert_eq!(info.version, Some("1.2.0".to_string()));
assert!(info.input_schema.is_some());
assert_eq!(info.default_labels.len(), 2);
}
#[test]
fn input_schema_for_generates_json_schema() {
let schema = input_schema_for::<TestInput>();
assert_eq!(schema["type"], "object");
assert!(schema["properties"]["environment"].is_object());
assert!(schema["properties"]["dry_run"].is_object());
}
#[test]
fn input_schema_for_preserves_serde_attributes() {
let schema = input_schema_for::<TestInput>();
let properties = &schema["properties"];
assert!(properties.is_object());
assert!(properties.get("environment").is_some());
assert!(properties.get("dry_run").is_some());
}
#[test]
fn workflow_info_serializes_with_skip_empty() {
let info = WorkflowInfo {
description: "test".to_string(),
source_code: None,
sub_workflows: Vec::new(),
category: None,
version: None,
input_schema: None,
default_labels: HashMap::new(),
};
let json = serde_json::to_value(&info).expect("serialize");
assert_eq!(json["description"], "test");
assert!(json.is_object());
}
#[test]
fn workflow_info_serializes_with_values() {
let info = WorkflowInfo {
description: "test".to_string(),
source_code: Some("code".to_string()),
sub_workflows: vec!["sub".to_string()],
category: Some("cat".to_string()),
version: Some("1.0.0".to_string()),
input_schema: Some(serde_json::json!({"type": "object"})),
default_labels: HashMap::from([("key".to_string(), "value".to_string())]),
};
let json = serde_json::to_value(&info).expect("serialize");
assert_eq!(json["description"], "test");
assert_eq!(json["source_code"], "code");
assert_eq!(json["sub_workflows"][0], "sub");
assert_eq!(json["category"], "cat");
assert_eq!(json["version"], "1.0.0");
assert_eq!(json["default_labels"]["key"], "value");
}
}