Skip to main content

datalab_cli/commands/
workflows.rs

1use crate::client::DatalabClient;
2use crate::error::{DatalabError, Result};
3use crate::output::Progress;
4use clap::{Args, Subcommand};
5use serde_json::json;
6use std::fs;
7use std::path::PathBuf;
8
// Subcommands handled by this module, parsed via clap's `Subcommand` derive.
// Each variant carries the argument struct for one subcommand; `execute`
// matches on the variant and forwards the arguments to the matching handler.
//
// NOTE: clap's derive uses the `///` comments on the variants as the help text
// shown for each subcommand, so rewording them changes the CLI's output.
#[derive(Subcommand, Debug)]
pub enum WorkflowsCommand {
    /// Create a new workflow
    Create(CreateArgs),
    /// List all workflows
    List(ListArgs),
    /// Get workflow details
    Get(GetArgs),
    /// Execute a workflow
    Execute(ExecuteArgs),
    /// Get execution status
    Execution(ExecutionArgs),
    /// Delete a workflow
    Delete(DeleteArgs),
    /// List available step types
    StepTypes(StepTypesArgs),
}
26
// Arguments for `WorkflowsCommand::Create`, consumed by `create`.
//
// - `name` is sent as the `"name"` field of the request body.
// - `steps` is a path to a file that `create` reads and parses as JSON; the
//   parsed value is sent as the `"steps"` field.
// - `team_id` is optional; `create` only adds `"team_id"` to the body when it
//   is present.
// - `timeout` is handed to `DatalabClient::new`.
//
// NOTE: the `///` comments on the fields are used by clap's derive as the
// per-option help text, so they are part of the CLI's user-facing output.
#[derive(Args, Debug)]
pub struct CreateArgs {
    /// Workflow name
    #[arg(long, value_name = "NAME")]
    pub name: String,

    /// JSON file containing workflow steps
    #[arg(long, value_name = "FILE")]
    pub steps: PathBuf,

    /// Team ID for ownership
    #[arg(long, value_name = "ID")]
    pub team_id: Option<i64>,

    /// Request timeout in seconds
    #[arg(long, default_value = "60", value_name = "SECS")]
    pub timeout: u64,
}
45
// Arguments for `WorkflowsCommand::List`, consumed by `list`.
// The only option is the request timeout, which is handed to
// `DatalabClient::new`.
//
// NOTE: the `///` comment on the field is used by clap's derive as help text.
#[derive(Args, Debug)]
pub struct ListArgs {
    /// Request timeout in seconds
    #[arg(long, default_value = "60", value_name = "SECS")]
    pub timeout: u64,
}
52
// Arguments for `WorkflowsCommand::Get`, consumed by `get`.
//
// - `workflow_id` is a positional argument (no `long` flag); `get` inserts it
//   verbatim into the request path `workflows/workflows/{id}`.
// - `timeout` is handed to `DatalabClient::new`.
//
// NOTE: the `///` comments on the fields are used by clap's derive as help text.
#[derive(Args, Debug)]
pub struct GetArgs {
    /// Workflow ID
    #[arg(value_name = "WORKFLOW_ID")]
    pub workflow_id: String,

    /// Request timeout in seconds
    #[arg(long, default_value = "60", value_name = "SECS")]
    pub timeout: u64,
}
63
// Arguments for `WorkflowsCommand::Execute`, consumed by `execute_workflow`.
//
// - `workflow_id` is a positional argument; it is inserted verbatim into the
//   request path `workflows/workflows/{id}/execute`.
// - `input` is a path to a file that `execute_workflow` reads and parses as
//   JSON; the parsed value is sent as the `"input_config"` field of the body.
// - `timeout` is handed to `DatalabClient::new`. Its default here is 300,
//   whereas the other argument structs in this file default to 60.
//
// NOTE: the `///` comments on the fields are used by clap's derive as help text.
#[derive(Args, Debug)]
pub struct ExecuteArgs {
    /// Workflow ID
    #[arg(value_name = "WORKFLOW_ID")]
    pub workflow_id: String,

    /// JSON file containing input configuration
    #[arg(long, value_name = "FILE")]
    pub input: PathBuf,

    /// Request timeout in seconds
    #[arg(long, default_value = "300", value_name = "SECS")]
    pub timeout: u64,
}
78
// Arguments for `WorkflowsCommand::Execution`, consumed by `execution`.
//
// - `execution_id` is a positional argument; `execution` inserts it verbatim
//   into the request path `workflows/executions/{id}`.
// - `timeout` is handed to `DatalabClient::new`.
//
// NOTE: the `///` comments on the fields are used by clap's derive as help text.
#[derive(Args, Debug)]
pub struct ExecutionArgs {
    /// Execution ID
    #[arg(value_name = "EXECUTION_ID")]
    pub execution_id: String,

    /// Request timeout in seconds
    #[arg(long, default_value = "60", value_name = "SECS")]
    pub timeout: u64,
}
89
// Arguments for `WorkflowsCommand::Delete`, consumed by `delete`.
//
// - `workflow_id` is a positional argument; `delete` inserts it verbatim into
//   the request path `workflows/workflows/{id}` and echoes it back in the
//   confirmation JSON it prints.
// - `timeout` is handed to `DatalabClient::new`.
//
// NOTE: the `///` comments on the fields are used by clap's derive as help text.
#[derive(Args, Debug)]
pub struct DeleteArgs {
    /// Workflow ID
    #[arg(value_name = "WORKFLOW_ID")]
    pub workflow_id: String,

    /// Request timeout in seconds
    #[arg(long, default_value = "60", value_name = "SECS")]
    pub timeout: u64,
}
100
// Arguments for `WorkflowsCommand::StepTypes`, consumed by `step_types`.
// The only option is the request timeout, which is handed to
// `DatalabClient::new`.
//
// NOTE: the `///` comment on the field is used by clap's derive as help text.
#[derive(Args, Debug)]
pub struct StepTypesArgs {
    /// Request timeout in seconds
    #[arg(long, default_value = "60", value_name = "SECS")]
    pub timeout: u64,
}
107
108pub async fn execute(cmd: WorkflowsCommand, progress: &Progress) -> Result<()> {
109    match cmd {
110        WorkflowsCommand::Create(args) => create(args, progress).await,
111        WorkflowsCommand::List(args) => list(args, progress).await,
112        WorkflowsCommand::Get(args) => get(args, progress).await,
113        WorkflowsCommand::Execute(args) => execute_workflow(args, progress).await,
114        WorkflowsCommand::Execution(args) => execution(args, progress).await,
115        WorkflowsCommand::Delete(args) => delete(args, progress).await,
116        WorkflowsCommand::StepTypes(args) => step_types(args, progress).await,
117    }
118}
119
120async fn create(args: CreateArgs, progress: &Progress) -> Result<()> {
121    let client = DatalabClient::new(Some(args.timeout))?;
122
123    if !args.steps.exists() {
124        return Err(DatalabError::FileNotFound(args.steps.clone()));
125    }
126
127    progress.start("create-workflow", Some(&args.name));
128
129    let steps_content = fs::read_to_string(&args.steps)?;
130    let steps: serde_json::Value = serde_json::from_str(&steps_content)
131        .map_err(|e| DatalabError::InvalidInput(format!("Invalid JSON in steps file: {}", e)))?;
132
133    let mut body = json!({
134        "name": args.name,
135        "steps": steps,
136    });
137
138    if let Some(team_id) = args.team_id {
139        body["team_id"] = json!(team_id);
140    }
141
142    let response = client.post_json("workflows/workflows", &body).await?;
143
144    println!("{}", serde_json::to_string_pretty(&response)?);
145
146    Ok(())
147}
148
149async fn list(args: ListArgs, progress: &Progress) -> Result<()> {
150    let client = DatalabClient::new(Some(args.timeout))?;
151
152    progress.start("list-workflows", None);
153
154    let response = client.get("workflows/workflows").await?;
155
156    println!("{}", serde_json::to_string_pretty(&response)?);
157
158    Ok(())
159}
160
161async fn get(args: GetArgs, progress: &Progress) -> Result<()> {
162    let client = DatalabClient::new(Some(args.timeout))?;
163
164    progress.start("get-workflow", Some(&args.workflow_id));
165
166    let path = format!("workflows/workflows/{}", args.workflow_id);
167    let response = client.get(&path).await?;
168
169    println!("{}", serde_json::to_string_pretty(&response)?);
170
171    Ok(())
172}
173
174async fn execute_workflow(args: ExecuteArgs, progress: &Progress) -> Result<()> {
175    let client = DatalabClient::new(Some(args.timeout))?;
176
177    if !args.input.exists() {
178        return Err(DatalabError::FileNotFound(args.input.clone()));
179    }
180
181    progress.start("execute-workflow", Some(&args.workflow_id));
182
183    let input_content = fs::read_to_string(&args.input)?;
184    let input_config: serde_json::Value = serde_json::from_str(&input_content)
185        .map_err(|e| DatalabError::InvalidInput(format!("Invalid JSON in input file: {}", e)))?;
186
187    let body = json!({
188        "input_config": input_config,
189    });
190
191    let path = format!("workflows/workflows/{}/execute", args.workflow_id);
192    let response = client.post_json(&path, &body).await?;
193
194    println!("{}", serde_json::to_string_pretty(&response)?);
195
196    Ok(())
197}
198
199async fn execution(args: ExecutionArgs, progress: &Progress) -> Result<()> {
200    let client = DatalabClient::new(Some(args.timeout))?;
201
202    progress.start("get-execution", Some(&args.execution_id));
203
204    let path = format!("workflows/executions/{}", args.execution_id);
205    let response = client.get(&path).await?;
206
207    println!("{}", serde_json::to_string_pretty(&response)?);
208
209    Ok(())
210}
211
212async fn delete(args: DeleteArgs, progress: &Progress) -> Result<()> {
213    let client = DatalabClient::new(Some(args.timeout))?;
214
215    progress.start("delete-workflow", Some(&args.workflow_id));
216
217    let path = format!("workflows/workflows/{}", args.workflow_id);
218    client.delete(&path).await?;
219
220    println!(
221        "{}",
222        serde_json::to_string_pretty(&json!({
223            "deleted": true,
224            "workflow_id": args.workflow_id,
225        }))?
226    );
227
228    Ok(())
229}
230
231async fn step_types(args: StepTypesArgs, progress: &Progress) -> Result<()> {
232    let client = DatalabClient::new(Some(args.timeout))?;
233
234    progress.start("list-step-types", None);
235
236    let response = client.get("workflows/step-types").await?;
237
238    println!("{}", serde_json::to_string_pretty(&response)?);
239
240    Ok(())
241}