Skip to main content

ironflow_api/routes/
get_workflow.rs

1//! `GET /api/v1/workflows/:name` — Get workflow details.
2
3use std::collections::{HashMap, HashSet};
4
5use axum::extract::{Path, State};
6use axum::response::IntoResponse;
7use ironflow_auth::extractor::Authenticated;
8use serde::Serialize;
9use serde_json::Value;
10
11use crate::error::ApiError;
12use crate::response::ok;
13use crate::state::AppState;
14
15/// Sub-workflow detail included in the workflow response.
16#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
17#[derive(Debug, Serialize)]
18pub struct SubWorkflowDetail {
19    /// Sub-workflow name.
20    pub name: String,
21    /// Human-readable description.
22    pub description: String,
23    /// Optional Rust source code of the handler.
24    pub source_code: Option<String>,
25}
26
27/// Workflow detail response.
28#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
29#[derive(Debug, Serialize)]
30pub struct WorkflowDetailResponse {
31    /// Workflow name.
32    pub name: String,
33    /// Human-readable description.
34    pub description: String,
35    /// Optional Rust source code of the handler.
36    pub source_code: Option<String>,
37    /// Sub-workflows invoked by this handler (recursive, depth-limited).
38    pub sub_workflows: Vec<SubWorkflowDetail>,
39    /// Optional `/`-separated category path used to group workflows.
40    pub category: Option<String>,
41    /// Current handler version.
42    pub version: Option<String>,
43    /// JSON Schema describing the expected input payload.
44    #[serde(skip_serializing_if = "Option::is_none")]
45    pub input_schema: Option<Value>,
46    /// Labels automatically applied to every run of this workflow.
47    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48    pub default_labels: HashMap<String, String>,
49}
50
51/// Get details about a registered workflow.
52///
53/// # Errors
54///
55/// - 404 if the workflow is not registered
56#[cfg_attr(
57    feature = "openapi",
58    utoipa::path(
59        get,
60        path = "/api/v1/workflows/{name}",
61        tags = ["workflows"],
62        params(("name" = String, Path, description = "Workflow name")),
63        responses(
64            (status = 200, description = "Workflow details", body = WorkflowDetailResponse),
65            (status = 401, description = "Unauthorized"),
66            (status = 404, description = "Workflow not found")
67        ),
68        security(("Bearer" = []))
69    )
70)]
71pub async fn get_workflow(
72    _auth: Authenticated,
73    State(state): State<AppState>,
74    Path(name): Path<String>,
75) -> Result<impl IntoResponse, ApiError> {
76    let info = state
77        .engine
78        .handler_info(&name)
79        .ok_or_else(|| ApiError::WorkflowNotFound(name.clone()))?;
80
81    let mut sub_workflows = Vec::new();
82    let mut visited = HashSet::new();
83    visited.insert(name.clone());
84    collect_sub_workflows(
85        &state,
86        &info.sub_workflows,
87        &mut sub_workflows,
88        &mut visited,
89        5,
90    );
91
92    Ok(ok(WorkflowDetailResponse {
93        name,
94        description: info.description,
95        source_code: info.source_code,
96        sub_workflows,
97        category: info.category,
98        version: info.version,
99        input_schema: info.input_schema,
100        default_labels: info.default_labels,
101    }))
102}
103
104fn collect_sub_workflows(
105    state: &AppState,
106    names: &[String],
107    result: &mut Vec<SubWorkflowDetail>,
108    visited: &mut HashSet<String>,
109    depth: usize,
110) {
111    if depth == 0 {
112        return;
113    }
114    for sub_name in names {
115        if !visited.insert(sub_name.clone()) {
116            continue;
117        }
118        if let Some(sub_info) = state.engine.handler_info(sub_name) {
119            collect_sub_workflows(state, &sub_info.sub_workflows, result, visited, depth - 1);
120            result.push(SubWorkflowDetail {
121                name: sub_name.clone(),
122                description: sub_info.description,
123                source_code: sub_info.source_code,
124            });
125        }
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use axum::Router;
132    use axum::body::Body;
133    use axum::http::{Request, StatusCode};
134    use axum::routing::get;
135    use http_body_util::BodyExt;
136    use ironflow_auth::jwt::AccessToken;
137    use ironflow_core::providers::claude::ClaudeCodeProvider;
138    use ironflow_engine::context::WorkflowContext;
139    use ironflow_engine::engine::Engine;
140    use ironflow_engine::handler::{HandlerFuture, WorkflowHandler};
141    use ironflow_engine::notify::Event;
142    use ironflow_store::memory::InMemoryStore;
143    use serde_json::Value as JsonValue;
144    use std::sync::Arc;
145    use tokio::sync::broadcast;
146    use tower::ServiceExt;
147    use uuid::Uuid;
148
149    use super::*;
150
151    struct DescribedWorkflow;
152    impl WorkflowHandler for DescribedWorkflow {
153        fn name(&self) -> &str {
154            "my-workflow"
155        }
156        fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
157            Box::pin(async move { Ok(()) })
158        }
159    }
160
161    struct CategorizedWorkflow;
162    impl WorkflowHandler for CategorizedWorkflow {
163        fn name(&self) -> &str {
164            "cat-workflow"
165        }
166        fn category(&self) -> Option<&str> {
167            Some("data/etl")
168        }
169        fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
170            Box::pin(async move { Ok(()) })
171        }
172    }
173
174    fn test_state() -> AppState {
175        let store = Arc::new(InMemoryStore::new());
176        Arc::new(InMemoryStore::new());
177        let provider = Arc::new(ClaudeCodeProvider::new());
178        let mut engine = Engine::new(store.clone(), provider);
179        engine.register(DescribedWorkflow).unwrap();
180        engine.register(CategorizedWorkflow).unwrap();
181        let jwt_config = Arc::new(ironflow_auth::jwt::JwtConfig {
182            secret: "test-secret".to_string(),
183            access_token_ttl_secs: 900,
184            refresh_token_ttl_secs: 604800,
185            cookie_domain: None,
186            cookie_secure: false,
187        });
188        let (event_sender, _) = broadcast::channel::<Event>(1);
189        AppState::new(
190            store,
191            Arc::new(engine),
192            jwt_config,
193            "test-worker-token".to_string(),
194            event_sender,
195        )
196    }
197
198    fn make_auth_header(state: &AppState) -> String {
199        let user_id = Uuid::now_v7();
200        let token = AccessToken::for_user(user_id, "testuser", false, &state.jwt_config).unwrap();
201        format!("Bearer {}", token.0)
202    }
203
204    #[tokio::test]
205    async fn get_workflow_found() {
206        let state = test_state();
207        let auth_header = make_auth_header(&state);
208        let app = Router::new()
209            .route("/{name}", get(get_workflow))
210            .with_state(state);
211
212        let req = Request::builder()
213            .uri("/my-workflow")
214            .header("authorization", auth_header)
215            .body(Body::empty())
216            .unwrap();
217
218        let resp = app.oneshot(req).await.unwrap();
219        assert_eq!(resp.status(), StatusCode::OK);
220
221        let body = resp.into_body().collect().await.unwrap().to_bytes();
222        let json_val: JsonValue = serde_json::from_slice(&body).unwrap();
223        assert_eq!(json_val["data"]["name"], "my-workflow");
224    }
225
226    #[tokio::test]
227    async fn get_workflow_returns_category_when_set() {
228        let state = test_state();
229        let auth_header = make_auth_header(&state);
230        let app = Router::new()
231            .route("/{name}", get(get_workflow))
232            .with_state(state);
233
234        let req = Request::builder()
235            .uri("/cat-workflow")
236            .header("authorization", auth_header)
237            .body(Body::empty())
238            .unwrap();
239
240        let resp = app.oneshot(req).await.unwrap();
241        assert_eq!(resp.status(), StatusCode::OK);
242        let body = resp.into_body().collect().await.unwrap().to_bytes();
243        let json_val: JsonValue = serde_json::from_slice(&body).unwrap();
244        assert_eq!(json_val["data"]["category"], "data/etl");
245    }
246
247    #[tokio::test]
248    async fn get_workflow_category_null_when_uncategorized() {
249        let state = test_state();
250        let auth_header = make_auth_header(&state);
251        let app = Router::new()
252            .route("/{name}", get(get_workflow))
253            .with_state(state);
254
255        let req = Request::builder()
256            .uri("/my-workflow")
257            .header("authorization", auth_header)
258            .body(Body::empty())
259            .unwrap();
260
261        let resp = app.oneshot(req).await.unwrap();
262        let body = resp.into_body().collect().await.unwrap().to_bytes();
263        let json_val: JsonValue = serde_json::from_slice(&body).unwrap();
264        assert!(json_val["data"]["category"].is_null());
265    }
266
267    #[tokio::test]
268    async fn get_workflow_not_found() {
269        let state = test_state();
270        let auth_header = make_auth_header(&state);
271        let app = Router::new()
272            .route("/{name}", get(get_workflow))
273            .with_state(state);
274
275        let req = Request::builder()
276            .uri("/nonexistent")
277            .header("authorization", auth_header)
278            .body(Body::empty())
279            .unwrap();
280
281        let resp = app.oneshot(req).await.unwrap();
282        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
283    }
284}