1use std::collections::{HashMap, HashSet};
4
5use axum::extract::{Path, State};
6use axum::response::IntoResponse;
7use ironflow_auth::extractor::Authenticated;
8use serde::Serialize;
9use serde_json::Value;
10
11use crate::error::ApiError;
12use crate::response::ok;
13use crate::state::AppState;
14
/// Summary of a sub-workflow embedded in a [`WorkflowDetailResponse`].
///
/// One entry is produced for each sub-workflow name that the engine can
/// resolve while walking the sub-workflow references of the requested
/// workflow.
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Serialize)]
pub struct SubWorkflowDetail {
    /// Name under which the sub-workflow was looked up in the engine.
    pub name: String,
    /// Description reported by the engine for this sub-workflow.
    pub description: String,
    /// Source code reported by the engine, if any (serialized as `null` when absent).
    pub source_code: Option<String>,
}
26
/// Payload returned by [`get_workflow`] describing a single workflow.
///
/// Fields are serialized in declaration order. `input_schema` and
/// `default_labels` are omitted from the output when they carry no data;
/// the other optional fields are serialized as `null` when absent.
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Serialize)]
pub struct WorkflowDetailResponse {
    /// Workflow name, as given in the request path.
    pub name: String,
    /// Description reported by the engine for this workflow.
    pub description: String,
    /// Source code reported by the engine, if any.
    pub source_code: Option<String>,
    /// Sub-workflows gathered by following sub-workflow references from this
    /// workflow (see `collect_sub_workflows` for ordering and depth limits).
    pub sub_workflows: Vec<SubWorkflowDetail>,
    /// Category reported by the engine, if the workflow declares one.
    pub category: Option<String>,
    /// Version reported by the engine, if any.
    pub version: Option<String>,
    /// Input schema reported by the engine; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<Value>,
    /// Default labels reported by the engine; left out of the JSON when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub default_labels: HashMap<String, String>,
}
50
51#[cfg_attr(
57 feature = "openapi",
58 utoipa::path(
59 get,
60 path = "/api/v1/workflows/{name}",
61 tags = ["workflows"],
62 params(("name" = String, Path, description = "Workflow name")),
63 responses(
64 (status = 200, description = "Workflow details", body = WorkflowDetailResponse),
65 (status = 401, description = "Unauthorized"),
66 (status = 404, description = "Workflow not found")
67 ),
68 security(("Bearer" = []))
69 )
70)]
71pub async fn get_workflow(
72 _auth: Authenticated,
73 State(state): State<AppState>,
74 Path(name): Path<String>,
75) -> Result<impl IntoResponse, ApiError> {
76 let info = state
77 .engine
78 .handler_info(&name)
79 .ok_or_else(|| ApiError::WorkflowNotFound(name.clone()))?;
80
81 let mut sub_workflows = Vec::new();
82 let mut visited = HashSet::new();
83 visited.insert(name.clone());
84 collect_sub_workflows(
85 &state,
86 &info.sub_workflows,
87 &mut sub_workflows,
88 &mut visited,
89 5,
90 );
91
92 Ok(ok(WorkflowDetailResponse {
93 name,
94 description: info.description,
95 source_code: info.source_code,
96 sub_workflows,
97 category: info.category,
98 version: info.version,
99 input_schema: info.input_schema,
100 default_labels: info.default_labels,
101 }))
102}
103
104fn collect_sub_workflows(
105 state: &AppState,
106 names: &[String],
107 result: &mut Vec<SubWorkflowDetail>,
108 visited: &mut HashSet<String>,
109 depth: usize,
110) {
111 if depth == 0 {
112 return;
113 }
114 for sub_name in names {
115 if !visited.insert(sub_name.clone()) {
116 continue;
117 }
118 if let Some(sub_info) = state.engine.handler_info(sub_name) {
119 collect_sub_workflows(state, &sub_info.sub_workflows, result, visited, depth - 1);
120 result.push(SubWorkflowDetail {
121 name: sub_name.clone(),
122 description: sub_info.description,
123 source_code: sub_info.source_code,
124 });
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use axum::Router;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use axum::routing::get;
    use http_body_util::BodyExt;
    use ironflow_auth::jwt::AccessToken;
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_engine::context::WorkflowContext;
    use ironflow_engine::engine::Engine;
    use ironflow_engine::handler::{HandlerFuture, WorkflowHandler};
    use ironflow_engine::notify::Event;
    use ironflow_store::memory::InMemoryStore;
    use serde_json::Value as JsonValue;
    use std::sync::Arc;
    use tokio::sync::broadcast;
    use tower::ServiceExt;
    use uuid::Uuid;

    use super::*;

    /// Minimal workflow that declares no category.
    struct DescribedWorkflow;
    impl WorkflowHandler for DescribedWorkflow {
        fn name(&self) -> &str {
            "my-workflow"
        }
        fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    /// Minimal workflow that declares the `data/etl` category.
    struct CategorizedWorkflow;
    impl WorkflowHandler for CategorizedWorkflow {
        fn name(&self) -> &str {
            "cat-workflow"
        }
        fn category(&self) -> Option<&str> {
            Some("data/etl")
        }
        fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    /// Builds an `AppState` backed by an in-memory store with both test
    /// workflows registered.
    fn test_state() -> AppState {
        let store = Arc::new(InMemoryStore::new());
        let provider = Arc::new(ClaudeCodeProvider::new());
        let mut engine = Engine::new(store.clone(), provider);
        engine.register(DescribedWorkflow).unwrap();
        engine.register(CategorizedWorkflow).unwrap();
        let jwt_config = Arc::new(ironflow_auth::jwt::JwtConfig {
            secret: "test-secret".to_string(),
            access_token_ttl_secs: 900,
            refresh_token_ttl_secs: 604800,
            cookie_domain: None,
            cookie_secure: false,
        });
        let (event_sender, _) = broadcast::channel::<Event>(1);
        AppState::new(
            store,
            Arc::new(engine),
            jwt_config,
            "test-worker-token".to_string(),
            event_sender,
        )
    }

    /// Creates a `Bearer` authorization header value for a fresh test user,
    /// signed with the JWT configuration held by `state`.
    fn make_auth_header(state: &AppState) -> String {
        let user_id = Uuid::now_v7();
        let token = AccessToken::for_user(user_id, "testuser", false, &state.jwt_config).unwrap();
        format!("Bearer {}", token.0)
    }

    /// Sends an authenticated `GET` for `uri` to a router that mounts
    /// `get_workflow` at `/{name}` and returns the raw response.
    async fn request_workflow(uri: &str) -> axum::response::Response {
        let state = test_state();
        let auth_header = make_auth_header(&state);
        let app = Router::new()
            .route("/{name}", get(get_workflow))
            .with_state(state);

        let req = Request::builder()
            .uri(uri)
            .header("authorization", auth_header)
            .body(Body::empty())
            .unwrap();

        app.oneshot(req).await.unwrap()
    }

    /// Reads the whole response body and parses it as JSON.
    async fn json_body(resp: axum::response::Response) -> JsonValue {
        let body = resp.into_body().collect().await.unwrap().to_bytes();
        serde_json::from_slice(&body).unwrap()
    }

    #[tokio::test]
    async fn get_workflow_found() {
        let resp = request_workflow("/my-workflow").await;
        assert_eq!(resp.status(), StatusCode::OK);

        let json_val = json_body(resp).await;
        assert_eq!(json_val["data"]["name"], "my-workflow");
    }

    #[tokio::test]
    async fn get_workflow_returns_category_when_set() {
        let resp = request_workflow("/cat-workflow").await;
        assert_eq!(resp.status(), StatusCode::OK);

        let json_val = json_body(resp).await;
        assert_eq!(json_val["data"]["category"], "data/etl");
    }

    #[tokio::test]
    async fn get_workflow_category_null_when_uncategorized() {
        let resp = request_workflow("/my-workflow").await;
        // Check the status first: indexing a missing key also yields null, so
        // without this an error response would satisfy the assertion below.
        assert_eq!(resp.status(), StatusCode::OK);

        let json_val = json_body(resp).await;
        assert!(json_val["data"]["category"].is_null());
    }

    #[tokio::test]
    async fn get_workflow_not_found() {
        let resp = request_workflow("/nonexistent").await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }
}