1use std::collections::HashSet;
4
5use axum::extract::{Path, State};
6use axum::response::IntoResponse;
7use ironflow_auth::extractor::Authenticated;
8use serde::Serialize;
9
10use crate::error::ApiError;
11use crate::response::ok;
12use crate::state::AppState;
13
14#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
16#[derive(Debug, Serialize)]
17pub struct SubWorkflowDetail {
18 pub name: String,
20 pub description: String,
22 pub source_code: Option<String>,
24}
25
26#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
28#[derive(Debug, Serialize)]
29pub struct WorkflowDetailResponse {
30 pub name: String,
32 pub description: String,
34 pub source_code: Option<String>,
36 pub sub_workflows: Vec<SubWorkflowDetail>,
38 pub category: Option<String>,
40}
41
42#[cfg_attr(
48 feature = "openapi",
49 utoipa::path(
50 get,
51 path = "/api/v1/workflows/{name}",
52 tags = ["workflows"],
53 params(("name" = String, Path, description = "Workflow name")),
54 responses(
55 (status = 200, description = "Workflow details", body = WorkflowDetailResponse),
56 (status = 401, description = "Unauthorized"),
57 (status = 404, description = "Workflow not found")
58 ),
59 security(("Bearer" = []))
60 )
61)]
62pub async fn get_workflow(
63 _auth: Authenticated,
64 State(state): State<AppState>,
65 Path(name): Path<String>,
66) -> Result<impl IntoResponse, ApiError> {
67 let info = state
68 .engine
69 .handler_info(&name)
70 .ok_or_else(|| ApiError::WorkflowNotFound(name.clone()))?;
71
72 let mut sub_workflows = Vec::new();
73 let mut visited = HashSet::new();
74 visited.insert(name.clone());
75 collect_sub_workflows(
76 &state,
77 &info.sub_workflows,
78 &mut sub_workflows,
79 &mut visited,
80 5,
81 );
82
83 Ok(ok(WorkflowDetailResponse {
84 name,
85 description: info.description,
86 source_code: info.source_code,
87 sub_workflows,
88 category: info.category,
89 }))
90}
91
92fn collect_sub_workflows(
93 state: &AppState,
94 names: &[String],
95 result: &mut Vec<SubWorkflowDetail>,
96 visited: &mut HashSet<String>,
97 depth: usize,
98) {
99 if depth == 0 {
100 return;
101 }
102 for sub_name in names {
103 if !visited.insert(sub_name.clone()) {
104 continue;
105 }
106 if let Some(sub_info) = state.engine.handler_info(sub_name) {
107 collect_sub_workflows(state, &sub_info.sub_workflows, result, visited, depth - 1);
108 result.push(SubWorkflowDetail {
109 name: sub_name.clone(),
110 description: sub_info.description,
111 source_code: sub_info.source_code,
112 });
113 }
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use axum::Router;
120 use axum::body::Body;
121 use axum::http::{Request, StatusCode};
122 use axum::routing::get;
123 use http_body_util::BodyExt;
124 use ironflow_auth::jwt::AccessToken;
125 use ironflow_core::providers::claude::ClaudeCodeProvider;
126 use ironflow_engine::context::WorkflowContext;
127 use ironflow_engine::engine::Engine;
128 use ironflow_engine::handler::{HandlerFuture, WorkflowHandler};
129 use ironflow_engine::notify::Event;
130 use ironflow_store::api_key_store::ApiKeyStore;
131 use ironflow_store::memory::InMemoryStore;
132 use serde_json::Value as JsonValue;
133 use std::sync::Arc;
134 use tokio::sync::broadcast;
135 use tower::ServiceExt;
136 use uuid::Uuid;
137
138 use super::*;
139
140 struct DescribedWorkflow;
141 impl WorkflowHandler for DescribedWorkflow {
142 fn name(&self) -> &str {
143 "my-workflow"
144 }
145 fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
146 Box::pin(async move { Ok(()) })
147 }
148 }
149
150 struct CategorizedWorkflow;
151 impl WorkflowHandler for CategorizedWorkflow {
152 fn name(&self) -> &str {
153 "cat-workflow"
154 }
155 fn category(&self) -> Option<&str> {
156 Some("data/etl")
157 }
158 fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
159 Box::pin(async move { Ok(()) })
160 }
161 }
162
163 fn test_state() -> AppState {
164 let store = Arc::new(InMemoryStore::new());
165 let user_store: Arc<dyn ironflow_store::user_store::UserStore> =
166 Arc::new(InMemoryStore::new());
167 let api_key_store: Arc<dyn ApiKeyStore> = Arc::new(InMemoryStore::new());
168 let provider = Arc::new(ClaudeCodeProvider::new());
169 let mut engine = Engine::new(store.clone(), provider);
170 engine.register(DescribedWorkflow).unwrap();
171 engine.register(CategorizedWorkflow).unwrap();
172 let jwt_config = Arc::new(ironflow_auth::jwt::JwtConfig {
173 secret: "test-secret".to_string(),
174 access_token_ttl_secs: 900,
175 refresh_token_ttl_secs: 604800,
176 cookie_domain: None,
177 cookie_secure: false,
178 });
179 let (event_sender, _) = broadcast::channel::<Event>(1);
180 AppState::new(
181 store,
182 user_store,
183 api_key_store,
184 Arc::new(engine),
185 jwt_config,
186 "test-worker-token".to_string(),
187 event_sender,
188 )
189 }
190
191 fn make_auth_header(state: &AppState) -> String {
192 let user_id = Uuid::now_v7();
193 let token = AccessToken::for_user(user_id, "testuser", false, &state.jwt_config).unwrap();
194 format!("Bearer {}", token.0)
195 }
196
197 #[tokio::test]
198 async fn get_workflow_found() {
199 let state = test_state();
200 let auth_header = make_auth_header(&state);
201 let app = Router::new()
202 .route("/{name}", get(get_workflow))
203 .with_state(state);
204
205 let req = Request::builder()
206 .uri("/my-workflow")
207 .header("authorization", auth_header)
208 .body(Body::empty())
209 .unwrap();
210
211 let resp = app.oneshot(req).await.unwrap();
212 assert_eq!(resp.status(), StatusCode::OK);
213
214 let body = resp.into_body().collect().await.unwrap().to_bytes();
215 let json_val: JsonValue = serde_json::from_slice(&body).unwrap();
216 assert_eq!(json_val["data"]["name"], "my-workflow");
217 }
218
219 #[tokio::test]
220 async fn get_workflow_returns_category_when_set() {
221 let state = test_state();
222 let auth_header = make_auth_header(&state);
223 let app = Router::new()
224 .route("/{name}", get(get_workflow))
225 .with_state(state);
226
227 let req = Request::builder()
228 .uri("/cat-workflow")
229 .header("authorization", auth_header)
230 .body(Body::empty())
231 .unwrap();
232
233 let resp = app.oneshot(req).await.unwrap();
234 assert_eq!(resp.status(), StatusCode::OK);
235 let body = resp.into_body().collect().await.unwrap().to_bytes();
236 let json_val: JsonValue = serde_json::from_slice(&body).unwrap();
237 assert_eq!(json_val["data"]["category"], "data/etl");
238 }
239
240 #[tokio::test]
241 async fn get_workflow_category_null_when_uncategorized() {
242 let state = test_state();
243 let auth_header = make_auth_header(&state);
244 let app = Router::new()
245 .route("/{name}", get(get_workflow))
246 .with_state(state);
247
248 let req = Request::builder()
249 .uri("/my-workflow")
250 .header("authorization", auth_header)
251 .body(Body::empty())
252 .unwrap();
253
254 let resp = app.oneshot(req).await.unwrap();
255 let body = resp.into_body().collect().await.unwrap().to_bytes();
256 let json_val: JsonValue = serde_json::from_slice(&body).unwrap();
257 assert!(json_val["data"]["category"].is_null());
258 }
259
260 #[tokio::test]
261 async fn get_workflow_not_found() {
262 let state = test_state();
263 let auth_header = make_auth_header(&state);
264 let app = Router::new()
265 .route("/{name}", get(get_workflow))
266 .with_state(state);
267
268 let req = Request::builder()
269 .uri("/nonexistent")
270 .header("authorization", auth_header)
271 .body(Body::empty())
272 .unwrap();
273
274 let resp = app.oneshot(req).await.unwrap();
275 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
276 }
277}