wmill/
client.rs

1#[cfg(feature = "async")]
2use futures::FutureExt;
3use windmill_api::{
4    apis::{
5        configuration::Configuration,
6        job_api,
7        resource_api::{self, get_resource_value_interpolated},
8        variable_api,
9    },
10    models::{EditResource, GetCompletedJobResultMaybe200Response},
11};
12
13use serde::Deserialize;
14use serde_json::{Value, json};
15use std::{
16    env::{self, var},
17    fmt::Debug,
18    time::{Duration, Instant},
19};
20
21use crate::{maybe_future::maybe_future::MaybeFuture, ret};
22
/// Client handle for the Windmill API.
///
/// Bundles the resolved connection settings with the generated API client
/// configuration built from them (see [`Windmill::new`]).
// NOTE(review): the derived `Debug` output includes `token` verbatim — confirm
// this is acceptable wherever a `Windmill` value may be logged.
#[derive(Debug, Clone)]
pub struct Windmill {
    /// Workspace passed to the API calls made through this client.
    pub workspace: String,
    /// Bearer token used for authentication.
    pub token: String,
    /// API base URL; `Windmill::new` stores it with the `/api` suffix already appended.
    pub base_internal_url: String,
    /// Configuration handed to the `windmill_api` functions; `Windmill::new` fills its
    /// `base_path` and `bearer_access_token` from the values above.
    pub client_config: Configuration,
}
30
31impl Windmill {
32    /// Creates a new `Windmill` instance with default configuration.
33    ///
34    /// Reads configuration from environment variables:
35    /// - `WM_TOKEN` for authentication token
36    /// - `WM_WORKSPACE` for workspace name  
37    /// - `BASE_INTERNAL_URL` for API base URL (appends `/api` automatically)
38    ///
39    /// # Errors
40    /// Returns `SdkError` if:
41    /// - Required environment variables are missing
42    /// - Environment variables cannot be read
43    pub fn default() -> Result<Self, SdkError> {
44        Windmill::new(None, None, None)
45    }
46
47    /// Creates a new `Windmill` instance with optional overrides.
48    ///
49    /// Falls back to environment variables for any `None` parameters:
50    /// - `WM_TOKEN` if `token` is `None`
51    /// - `WM_WORKSPACE` if `workspace` is `None`  
52    /// - `BASE_INTERNAL_URL` + "/api" if `base_internal_url` is `None`
53    ///
54    /// # Parameters
55    /// - `token`: Optional bearer token override
56    /// - `workspace`: Optional workspace name override  
57    /// - `base_internal_url`: Optional base URL override (without `/api` suffix)
58    ///
59    /// # Errors  
60    /// Returns `SdkError` if:
61    /// - Required environment variables are missing when needed
62    /// - Environment variables cannot be read
63    pub fn new(
64        token: Option<String>,
65        workspace: Option<String>,
66        base_internal_url: Option<String>,
67    ) -> Result<Self, SdkError> {
68        use env::var;
69        let (token, base_internal_url, workspace) = (
70            token.or(var("WM_TOKEN").ok()).ok_or(SdkError::BadValue(
71                "WM_TOKEN is not set nor was provided in constructor".to_owned(),
72            ))?,
73            base_internal_url
74                .or(var("BASE_INTERNAL_URL").ok())
75                .ok_or(SdkError::BadValue(
76                    "BASE_INTERNAL_URL is not set nor was provided in constructor".to_owned(),
77                ))?
78                + "/api",
79            workspace
80                .or(var("WM_WORKSPACE").ok())
81                .ok_or(SdkError::BadValue(
82                    "WM_WORKSPACE is not set nor was provided in constructor".to_owned(),
83                ))?,
84        );
85
86        Ok(Windmill {
87            client_config: Configuration {
88                // TODO: client: reqwest::blocking::Client::new(), // Use blocking client?
89                base_path: base_internal_url.clone(),
90                bearer_access_token: Some(token.clone()),
91                ..Default::default()
92            },
93            workspace,
94            token,
95            base_internal_url,
96        })
97    }
98
99    /// Retrieves a variable from Windmill, automatically parsing it as JSON/YAML.
100    ///
101    /// This is the convenience version that attempts to parse the variable value as:
102    /// 1. JSON (primary attempt)
103    /// 2. YAML (fallback if JSON parsing fails)
104    /// 3. Raw string (final fallback if both parsings fail)
105    ///
106    /// For better performance when you know the format or don't need parsing,
107    /// use [`Self::get_variable_raw`] instead.
108    ///
109    /// # Arguments
110    /// * `path` - Variable path (e.g., "u/user/variable_name")
111    ///
112    /// # Example
113    /// ```no_run
114    ///
115    /// use wmill::Windmill;
116    /// use serde_json::json;
117    ///
118    /// let wm = Windmill::default()?;
119    ///
120    /// // For a variable containing JSON: {"key": "value"}
121    /// let json_var = wm.get_variable("u/admin/config")?;
122    ///
123    /// // For a variable containing plain text
124    /// let text_var = wm.get_variable("u/user/plaintext_note")?;
125    ///
126    /// # Ok::<(), wmill::SdkError>(())
127    /// ```
128    ///
129    /// See also: [`Self::get_variable_raw`] for the unparsed version
    pub fn get_variable<'a>(&'a self, path: &'a str) -> MaybeFuture<'a, Result<Value, SdkError>> {
        // Thin public entry point: the fetch-and-parse logic lives in
        // `get_variable_inner`. `ret!` is a crate macro (imported at the top of the
        // file); presumably it adapts that future to the `MaybeFuture` return
        // type — see the macro definition for the exact sync/async handling.
        ret!(self.get_variable_inner(path));
    }
133
134    async fn get_variable_inner<'a>(&'a self, path: &'a str) -> Result<Value, SdkError> {
135        let raw = self.get_variable_raw_inner(path).await?;
136        Ok(serde_json::from_str(&raw)
137            .or(serde_yaml::from_str(&raw))
138            .unwrap_or(json!(raw)))
139    }
140
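    /// Retrieves a variable from Windmill as a raw string, without any parsing.
    ///
    /// This is the **faster version** of [`Self::get_variable`] when: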
142    /// - You know the variable contains plain text
143    /// - You want to handle parsing yourself
144    /// - You need maximum performance
145    ///
146    /// Performance benefit comes from avoiding:
147    /// 1. JSON parsing attempt
148    /// 2. YAML parsing fallback
149    ///
150    /// # Arguments
151    /// * `path` - Variable path (e.g., "u/user/variable_name")
152    ///
153    /// # Example
154    /// ```no_run
155    /// use wmill::Windmill;
156    /// use serde_json::{json, Value};
157    ///
158    /// let wm = Windmill::default()?;
159    ///
160    /// // When you need the raw content
161    /// let raw_content = wm.get_variable_raw("u/user/custom_format")?;
162    ///
163    /// // When you know it's JSON and want to parse it differently
164    /// let json_value: Value = serde_json::from_str(
165    ///     &wm.get_variable_raw("u/admin/config")?
166    /// )?;
167    ///
168    /// # Ok::<(), wmill::SdkError>(())
169    /// ```
170    ///
171    /// See also: [`Self::get_variable`] for the auto-parsed version
    pub fn get_variable_raw<'a>(
        &'a self,
        path: &'a str,
    ) -> MaybeFuture<'a, Result<String, SdkError>> {
        // Delegates to `get_variable_raw_inner`, which performs the API call.
        // `ret!` is a crate macro not shown in this file; presumably it hands the
        // result back as the declared `MaybeFuture`.
        ret!(self.get_variable_raw_inner(path));
    }
178
179    async fn get_variable_raw_inner<'a>(&'a self, path: &'a str) -> Result<String, SdkError> {
180        let v =
181            variable_api::get_variable_value(&self.client_config, &self.workspace, path).await?;
182
183        v.get(1..v.len() - 1)
184            .ok_or(SdkError::InternalError(
185                "returned value is incorrect".into(),
186            ))
187            .map(|s| s.to_owned())
188    }
189
190    /// Creates or updates a variable in the workspace.
191    ///
    /// This function provides create-or-update ("upsert") variable management
    /// (implemented as two separate API calls, so it is not atomic) that:
193    /// - Creates a new variable if it doesn't exist
194    /// - Updates an existing variable if found
195    /// - Handles both regular and secret variables
196    ///
197    /// # Parameters
198    /// - `value`: The variable value to set
199    /// - `path`: The variable path/identifier
200    /// - `is_secret`: Whether to store as a secret (encrypted) variable
201    ///
202    /// # Errors
203    /// Returns `SdkError` if:
204    /// - Variable fetch fails for reasons other than "not found"
205    /// - Variable creation fails
206    /// - Variable update fails
207    /// - Underlying API calls fail
208    ///
209    /// # Notes
210    /// - For new variables, defaults to empty description
211    /// - Updates only modify the value (preserving other metadata)
212    /// - Secret status can only be set during creation
    pub fn set_variable<'a>(
        &'a self,
        value: String,
        path: &'a str,
        is_secret: bool,
    ) -> MaybeFuture<'a, Result<(), SdkError>> {
        // Public wrapper around `set_variable_inner`, which decides between
        // creating and updating. `ret!` (crate macro, defined elsewhere) is
        // expected to return its outcome as the declared `MaybeFuture`.
        ret!(self.set_variable_inner(value, path, is_secret));
    }
221
222    async fn set_variable_inner<'a>(
223        &'a self,
224        value: String,
225        path: &'a str,
226        is_secret: bool,
227    ) -> Result<(), SdkError> {
228        let res =
229            variable_api::get_variable(&self.client_config, &self.workspace, path, None, None)
230                .await;
231
232        if res.is_err() {
233            variable_api::create_variable(
234                &self.client_config,
235                &self.workspace,
236                windmill_api::models::CreateVariable {
237                    path: path.to_owned(),
238                    value,
239                    is_secret,
240                    description: "".to_owned(),
241                    account: None,
242                    is_oauth: None,
243                    expires_at: None,
244                },
245                None,
246            )
247            .await?;
248        } else {
249            variable_api::update_variable(
250                &self.client_config,
251                &self.workspace,
252                path,
253                windmill_api::models::EditVariable {
254                    path: None,
255                    value: Some(value),
256                    is_secret: None,
257                    description: None,
258                },
259                None,
260            )
261            .await?;
262        }
263        Ok(())
264    }
265
266    /// Fetches and deserializes a resource into a concrete type.
267    ///
268    /// This is the recommended way to access resources when you know the expected type.
269    /// For raw JSON access or dynamic typing, use [`Self::get_resource_any`] instead.
270    ///
271    /// # Type Parameters
272    /// * `T` - Any type implementing `DeserializeOwned` (most structs with `#[derive(Deserialize)]`)
273    ///
274    /// # Arguments
275    /// * `path` - The resource path (e.g., "u/user/resource_name")
276    ///
277    /// # Example
278    /// ```no_run
279    /// use wmill::Windmill;
280    /// use serde_json::{json, Value};
281    /// use serde::Deserialize;
282    ///
283    /// #[derive(Deserialize)]
284    /// struct DbConfig {
285    ///     url: String,
286    ///     pool_size: Option<u32>,
287    /// }
288    ///
289    /// let wm = Windmill::default()?;
290    ///
291    /// // Directly deserialize to your type
292    /// let config: DbConfig = wm.get_resource("u/admin/db_connection")?;
293    /// # Ok::<(), wmill::SdkError>(())
294    /// ```
295    ///
296    /// See also: [`Self::get_resource_any`] for untyped version
    pub fn get_resource<'a, T: serde::de::DeserializeOwned>(
        &'a self,
        path: &'a str,
    ) -> MaybeFuture<'a, Result<T, SdkError>> {
        // Fetching and deserializing into `T` both happen in `get_resource_inner`;
        // this wrapper only passes its result through the crate's `ret!` macro
        // (defined elsewhere; presumably it yields the declared `MaybeFuture`).
        ret!(self.get_resource_inner(path));
    }
303
304    async fn get_resource_inner<'a, T: serde::de::DeserializeOwned>(
305        &'a self,
306        path: &'a str,
307    ) -> Result<T, SdkError> {
308        Ok(serde_json::from_value(
309            self.get_resource_any_inner(path).await?,
310        )?)
311    }
312
313    /// Fetches a raw JSON [`Value`] from Windmill by path.
314    ///
315    /// Use this when you need the raw JSON structure or don't have a concrete type to deserialize into.
316    /// For typed deserialization, prefer [`Self::get_resource`] instead.
317    ///
318    /// # Arguments
319    /// * `path` - The resource path (e.g., "u/user/resource_name")
320    ///
321    /// # Example
322    /// ```no_run
323    /// use wmill::Windmill;
324    ///
325    /// let wm = Windmill::default()?;
326    ///
327    /// // When you need to inspect the raw structure first
328    /// let json = wm.get_resource_any("u/admin/db_connection")?;
329    ///
330    /// println!("Url is: {}", json["url"]);
331    ///
332    /// # Ok::<(), wmill::SdkError>(())
333    /// ```
334    ///
335    /// See also: [`Self::get_resource`] for typed version
    pub fn get_resource_any<'a>(
        &'a self,
        path: &'a str,
    ) -> MaybeFuture<'a, Result<Value, SdkError>> {
        // Delegates to `get_resource_any_inner` (the actual API call). `ret!` is a
        // crate macro not shown here; presumably it returns the result as the
        // declared `MaybeFuture`.
        ret!(self.get_resource_any_inner(path));
    }
342
343    async fn get_resource_any_inner<'a>(&'a self, path: &'a str) -> Result<Value, SdkError> {
344        Ok(
345            get_resource_value_interpolated(&self.client_config, &self.workspace, path, None)
346                .await?,
347        )
348    }
349
350    /// Creates or updates a resource in Windmill.
351    ///
352    /// This function sets a resource's value at the specified path, creating it if it doesn't exist
353    /// or updating it if it does. The resource will be of the specified type.
354    ///
355    /// # Arguments
356    /// * `value` - The value to set for the resource. Use `None` to create an empty resource.
357    /// * `path` - The ownership path of the resource (e.g., "u/user/resource_name").
358    ///            Defines permissions based on Windmill's path prefix system.
359    /// * `resource_type` - The type of resource to create (e.g., "postgresql", "smtp").
360    ///                     Must be a pre-existing resource type.
361    ///
362    /// # Examples
363    /// ```no_run
364    /// use wmill::Windmill;
365    ///
366    /// # fn main() -> anyhow::Result<()> {
367    /// let wm = Windmill::default()?;
368    /// wm.set_resource(
369    ///     Some(serde_json::json!({"host": "localhost", "port": 5432})),
370    ///     "u/admin/database",
371    ///     "postgresql"
372    /// )?;
373    /// # Ok(())
374    /// # }
375    /// ```
    pub fn set_resource<'a>(
        &'a self,
        value: Option<Value>,
        path: &'a str,
        resource_type: &'a str,
    ) -> MaybeFuture<'a, Result<(), SdkError>> {
        // Public wrapper: `set_resource_inner` checks whether the resource exists
        // and creates or updates it accordingly. `ret!` (crate macro, defined
        // elsewhere) is expected to return that outcome as the `MaybeFuture`.
        ret!(self.set_resource_inner(value, path, resource_type));
    }
384
385    async fn set_resource_inner<'a>(
386        &'a self,
387        value: Option<Value>,
388        path: &'a str,
389        resource_type: &'a str,
390    ) -> Result<(), SdkError> {
391        // if resource_api::get_resource(&self.client_config, &self.workspace, path)
392        //     .await
393        //     .is_err()
394        if !resource_api::exists_resource(&self.client_config, &self.workspace, path).await? {
395            resource_api::create_resource(
396                &self.client_config,
397                &self.workspace,
398                windmill_api::models::CreateResource {
399                    path: path.to_owned(),
400                    value,
401                    description: None,
402                    resource_type: resource_type.to_owned(),
403                    // resource_type: "any".to_owned(),
404                },
405                // Some(true),
406                None,
407            )
408            .await?;
409        } else {
410            resource_api::update_resource(
411                &self.client_config,
412                &self.workspace,
413                path,
414                EditResource {
415                    path: None,
416                    description: None,
417                    value: Some(value),
418                    resource_type: Some(resource_type.to_owned()),
419                },
420            )
421            .await?;
422        }
423        Ok(())
424    }
425
426    /// Retrieves and deserializes the current typed state value for a script's execution context.
427    ///
428    /// This is the typed version of [`Self::get_state_any`], automatically deserializing the state
429    /// into the specified type `T` that implements [`serde::de::DeserializeOwned`].
430    ///
431    /// # Type Parameter
432    /// * `T` - The type to deserialize into (must implement `serde::de::DeserializeOwned`)
433    ///
434    /// # Examples
435    /// ```no_run
436    /// use wmill::Windmill;
437    /// use serde_json::{json, Value};
438    /// use serde::Deserialize;
439    ///
440    /// #[derive(Deserialize)]
441    /// struct ScriptState {
442    ///     counter: i32,
443    ///     last_run: String,
444    /// }
445    ///
446    /// let wm = Windmill::default()?;
447    ///
448    /// // Get typed state
449    /// let state: ScriptState = wm.get_state()?;
450    ///
451    /// println!("Counter: {}, Last run: {}", state.counter, state.last_run);
452    ///
453    /// # Ok::<(), wmill::SdkError>(())
454    /// ```
455    ///
456    /// # Behavior Details
457    /// - Uses same state path resolution as [`Self::get_state_any`] (`WM_STATE_PATH_NEW` → `WM_STATE_PATH` fallback)
458    /// - Performs runtime type checking during deserialization
459    ///
460    /// # When to Use vs [`Self::get_state_any`]
461    /// | Use Case               | `get_state<T>`          | `get_state_any`        |
462    /// |------------------------|-------------------------|------------------------|
463    /// | Known state structure  | ✅ Preferred            | ⚠️ Requires manual parsing |
464    /// | Dynamic state          | ❌ Won't compile        | ✅ Works               |
465    /// | Type safety            | ✅ Compile-time checks  | ❌ Runtime checks only |
466    ///
467    /// # Notes
468    /// - For complex types, derive `Deserialize` using Serde attributes
469    /// - Prefer this over [`Self::get_state_any`] when state schema is stable
470    /// - State modifications should use corresponding [`Self::set_state`] with matching type
471    ///
472    /// # See Also
473    /// - [`Self::get_state_any`] - Untyped state access
474    /// - [`Self::set_state`] - For updating typed states
475    /// - [Windmill State Management](https://www.windmill.dev/docs/core_concepts/persistent_storage/within_windmill#states)
    pub fn get_state<'a, T: serde::de::DeserializeOwned>(
        &'a self,
    ) -> MaybeFuture<'a, Result<T, SdkError>> {
        // The state is read like any other resource: `get_state_path()` (not
        // shown in this part of the file) supplies the path, `?` propagates its
        // failure, and `get_resource_inner` deserializes the value into `T`.
        ret!(self.get_resource_inner(&get_state_path()?));
    }
481
482    /// Retrieves the current state value for a script's execution context.
483    ///
484    /// States persist data between runs of the same script by the same trigger (schedule or user).
485    /// This is the untyped version that returns a generic [`Value`].
486    ///
487    /// # Examples
488    /// ```no_run
489    /// use wmill::Windmill;
490    /// use serde_json::Value;
491    /// use serde::Deserialize;
492    ///
493    /// let wm = Windmill::default()?;
494    ///
495    /// // Get state (returns serde_json::Value)
496    /// let state: Value = wm.get_state_any()?;
497    ///
498    /// // Use with default if empty
499    /// let counter = state.as_i64().unwrap_or(0);
500    /// # Ok::<(), wmill::SdkError>(())
501    /// ```
502    ///
503    /// # Behavior Details
504    /// - Automatically uses the script's state path from `WM_STATE_PATH_NEW` (falls back to `WM_STATE_PATH`)
505    /// - Returns the full state object stored as a Windmill resource
506    /// - State resources are hidden from Workspace view but visible under Resources → States
507    ///
508    /// # Typical Use Cases
509    /// 1. Maintaining counters between runs
510    /// 2. Storing last execution timestamps
511    /// 3. Keeping reference data (like previous API responses)
512    ///
513    /// # Notes
514    /// - For typed state access, use `get_state<T>` instead
515    /// - States are isolated per script and trigger combination
516    /// - Maximum state size: 5MB (compressed)
517    ///
518    /// # See Also
519    /// - [`Self::set_state`] - For updating the state
520    /// - [Windmill State Management](https://www.windmill.dev/docs/core_concepts/persistent_storage/within_windmill#states)
    pub fn get_state_any<'a>(&'a self) -> MaybeFuture<'a, Result<Value, SdkError>> {
        // Untyped variant: resolves the path via `get_state_path()` (not shown in
        // this part of the file; `?` propagates its failure) and returns the raw
        // JSON from `get_resource_any_inner`.
        ret!(self.get_resource_any_inner(&get_state_path()?));
    }
524
525    /// Updates or clears the script's persistent state.
526    ///
527    /// # Arguments
528    /// * `value` - New state value (`Some(Value)`) or `None` to clear state
529    ///
530    /// # Examples
531    /// ```no_run
532    /// use wmill::Windmill;
533    /// use serde_json::json;
534    ///
535    /// # async fn example() -> anyhow::Result<()> {
536    /// let wm = Windmill::default()?;
537    ///
538    /// // Set state
539    /// wm.set_state(Some(json!({"count": 42})))?;
540    ///
541    /// // Clear state
542    /// wm.set_state(None)?;
543    /// # Ok(())
544    /// # }
545    /// ```
546    ///
547    /// See also: [`Self::get_state`], [`Self::get_state_any`]
    pub fn set_state<'a>(&'a self, value: Option<Value>) -> MaybeFuture<'a, Result<(), SdkError>> {
        // The state is stored as a resource of type "state" at the path returned
        // by `get_state_path()` (not shown in this part of the file); `?`
        // propagates a failure to resolve that path.
        ret!(self.set_resource_inner(value, &get_state_path()?, "state"));
    }
551
552    /// Executes a script synchronously and waits for its completion.
553    ///
554    /// This is a blocking version of `run_script_async` that handles the entire script execution
555    /// lifecycle including job scheduling and result waiting.
556    ///
557    /// # Parameters
558    /// - `ident`: Script identifier (either path or hash)
559    /// - `ident_is_hash`: Whether the identifier is a hash (true) or path (false)
560    /// - `args`: JSON arguments to pass to the script
561    /// - `scheduled_in_secs`: Optional delay before execution (in seconds)
562    /// - `timeout_secs`: Maximum time to wait for job completion (in seconds)
563    /// - `verbose`: Whether to print execution details
564    /// - `assert_result_is_not_none`: Whether to fail if the result is None
565    ///
566    /// # Errors
567    /// Returns `SdkError` if:
568    /// - Script fails to start
569    /// - Job times out
570    /// - Result assertion fails
571    /// - Underlying API calls fail
572    pub fn run_script_sync<'a>(
573        &'a self,
574        ident: &'a str,
575        ident_is_hash: bool,
576        args: Value,
577        scheduled_in_secs: Option<u32>,
578        timeout_secs: Option<u64>,
579        verbose: bool,
580        assert_result_is_not_none: bool,
581    ) -> MaybeFuture<'a, Result<Value, SdkError>> {
582        if verbose {
583            println!("running `{ident}` synchronously with {:?}", &args);
584        }
585
586        ret!(async move {
587            let job_id = self
588                .run_script_async_inner(ident, ident_is_hash, args, scheduled_in_secs)
589                .await?;
590            self.wait_job_inner(
591                &job_id.to_string(),
592                timeout_secs,
593                verbose,
594                assert_result_is_not_none,
595            )
596            .await
597        });
598    }
599
600    /// Asynchronously executes a script in Windmill and returns its job UUID.
601    ///
602    /// This function runs a script either by path or by hash, with optional:
603    /// - Parent job inheritance (when run within a flow)
604    /// - Scheduled execution delay
605    /// - Argument passing
606    ///
607    /// # Arguments
608    /// * `ident` - Script identifier (path or hash depending on `ident_is_hash`)
609    /// * `ident_is_hash` - If true, `ident` is treated as a script hash; if false, as a path
610    /// * `args` - JSON arguments to pass to the script (must be an object if using path)
611    /// * `scheduled_in_secs` - Optional delay (in seconds) before execution
612    ///
613    /// # Examples
614    /// ```no_run
615    /// use wmill::Windmill;
616    /// use serde_json::json;
617    ///
618    /// let wm = Windmill::default()?;
619    /// let job_id = wm.run_script_async(
620    ///     "u/user/script_path",
621    ///     false,
622    ///     json!({"param1": "value1"}),
623    ///     Some(10) // Run after 10 seconds
624    /// )?;
625    ///
626    /// # Ok::<(), wmill::SdkError>(())
627    /// ```
628    ///
629    /// # Behavior Details
630    /// - **Automatic Job Inheritance**:
631    ///   - Detects `WM_JOB_ID` env var → sets as `parent_job`
632    ///   - Detects `WM_ROOT_FLOW_JOB_ID` env var → sets as `root_job`
633    /// - **Scheduled Execution**:
634    ///   - When `scheduled_in_secs` is provided, sets `scheduled_in_secs` in args
635    /// - **Argument Handling**:
636    ///   - For path-based execution (`ident_is_hash=false`), args must be a JSON object
637    ///   - For hash-based execution, args can be any valid JSON value
638    ///
639    /// # Errors
640    /// - [`SdkError::BadValue`] if path-based execution receives non-object arguments
641    /// - API errors from Windmill's backend
    pub fn run_script_async<'a>(
        &'a self,
        ident: &'a str,
        ident_is_hash: bool,
        args: Value,
        scheduled_in_secs: Option<u32>,
    ) -> MaybeFuture<'a, Result<uuid::Uuid, SdkError>> {
        // Public wrapper: argument augmentation and the API call live in
        // `run_script_async_inner`. `ret!` (crate macro, defined elsewhere) is
        // expected to return its outcome as the declared `MaybeFuture`.
        ret!(self.run_script_async_inner(ident, ident_is_hash, args, scheduled_in_secs));
    }
651
652    async fn run_script_async_inner<'a>(
653        &'a self,
654        ident: &'a str,
655        ident_is_hash: bool,
656        mut args: Value,
657        scheduled_in_secs: Option<u32>,
658    ) -> Result<uuid::Uuid, SdkError> {
659        if let Ok(parent_job) = var("WM_JOB_ID") {
660            args["parent_job"] = json!(parent_job);
661        }
662
663        if let Ok(root_job) = var("WM_ROOT_FLOW_JOB_ID") {
664            args["root_job"] = json!(root_job);
665        }
666
667        if let Some(scheduled_in_secs) = scheduled_in_secs {
668            args["scheduled_in_secs"] = json!(scheduled_in_secs);
669        }
670
671        let uuid = if ident_is_hash {
672            job_api::run_script_by_hash(
673                &self.client_config,
674                &self.workspace,
675                ident,
676                args,
677                None,
678                None,
679                None,
680                None,
681                None,
682                None,
683                None,
684                None,
685                None,
686            )
687            .await?
688        } else {
689            job_api::run_script_by_path(
690                &self.client_config,
691                &self.workspace,
692                ident,
693                args.as_object()
694                    .ok_or(SdkError::BadValue(format!(
695                        "Args should be Object, but it is: {}",
696                        args
697                    )))?
698                    .clone()
699                    .into_iter()
700                    .collect(),
701                None,
702                None,
703                None,
704                None,
705                None,
706                None,
707                None,
708                None,
709            )
710            .await?
711        };
712        Ok(uuid)
713    }
714
715    /// Waits for a job to complete and returns its result.
716    ///
717    /// This function provides both synchronous and asynchronous interfaces for waiting
718    /// on job completion, with timeout handling and result validation.
719    ///
720    /// # Parameters
721    /// - `job_id`: The ID of the job to wait for
722    /// - `timeout_secs`: Maximum time to wait (in seconds) before cancelling the job
723    /// - `verbose`: Whether to print progress information
724    /// - `assert_result_is_not_none`: Whether to fail if the job returns no result
725    ///
726    /// # Errors
727    /// Returns `SdkError` if:
728    /// - Job fails or times out
729    /// - Result assertion fails when `assert_result_is_not_none` is true
730    /// - Underlying API calls fail
731    ///
732    /// # Behavior
733    /// 1. Polls job status at 500ms intervals
734    /// 2. Cancels job if timeout is reached
735    /// 3. Validates success status and optional result presence
736    /// 4. Returns either the result or appropriate error
    pub fn wait_job<'a>(
        &'a self,
        job_id: &'a str,
        timeout_secs: Option<u64>,
        verbose: bool,
        assert_result_is_not_none: bool,
    ) -> MaybeFuture<'a, Result<Value, SdkError>> {
        // Public wrapper: the polling loop lives in `wait_job_inner`. `ret!` is a
        // crate macro not shown here; presumably it returns the loop's outcome as
        // the declared `MaybeFuture`.
        ret!(self.wait_job_inner(job_id, timeout_secs, verbose, assert_result_is_not_none));
    }
746
747    async fn wait_job_inner<'a>(
748        &'a self,
749        job_id: &'a str,
750        timeout_secs: Option<u64>,
751        verbose: bool,
752        assert_result_is_not_none: bool,
753    ) -> Result<Value, SdkError> {
754        let start = Instant::now();
755        loop {
756            if let Some(timeout) = timeout_secs {
757                if start.elapsed().as_secs() > timeout {
758                    println!("WARN: reached timeout for {job_id}. Cancelling the job.");
759                    job_api::cancel_queued_job(
760                        &self.client_config,
761                        &self.workspace,
762                        job_id,
763                        windmill_api::models::CancelQueuedJobRequest {
764                            reason: Some("reached timeout".into()),
765                        },
766                    )
767                    .await?;
768                }
769            }
770            let GetCompletedJobResultMaybe200Response {
771                completed,
772                result,
773                success,
774                started,
775            } = job_api::get_completed_job_result_maybe(
776                &self.client_config,
777                &self.workspace,
778                job_id,
779                Some(true),
780            )
781            .await?;
782
783            if matches!(started, Some(false)) && verbose {
784                println!("INFO: job {job_id} has not started yet");
785            }
786
787            if completed {
788                if matches!(success, Some(true)) {
789                    if result.is_none() && assert_result_is_not_none {
790                        return Err(SdkError::ExecutionError("Result was None".into()));
791                    }
792                    return Ok(result.unwrap_or_default());
793                } else {
794                    return Err(SdkError::ExecutionError(format!(
795                        "Job {job_id} was not successful: {:?}",
796                        result
797                    )));
798                }
799            }
800            if verbose {
801                println!("INFO: sleeping 0.5 seconds for {job_id}");
802            }
803            tokio::time::sleep(Duration::from_millis(500)).await;
804        }
805    }
806
807    /// Retrieves the current status of a Windmill job by its UUID.
808    ///
809    /// This function queries the Windmill backend to determine whether a job is:
810    /// - Waiting to be executed
811    /// - Currently running
812    /// - Already completed
813    ///
814    /// # Arguments
815    /// * `job_id` - The UUID of the job to check (format: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
816    ///
817    /// # Job Lifecycle States
818    /// - **Waiting**: Job is queued but hasn't started execution
819    /// - **Running**: Job is currently being executed
820    /// - **Completed**: Job has finished (successfully or with errors)
    pub fn get_job_status<'a>(
        &'a self,
        job_id: &'a str,
    ) -> MaybeFuture<'a, Result<JobStatus, SdkError>> {
        // Delegates to `get_job_status_inner`, which fetches the job and maps it
        // to a `JobStatus`. `ret!` (crate macro, defined elsewhere) is expected to
        // return that outcome as the declared `MaybeFuture`.
        ret!(self.get_job_status_inner(job_id));
    }
827
828    async fn get_job_status_inner<'a>(&'a self, job_id: &'a str) -> Result<JobStatus, SdkError> {
829        let job = job_api::get_job(
830            &self.client_config,
831            &self.workspace,
832            job_id,
833            Some(true),
834            Some(true),
835        )
836        .await?;
837
838        Ok(match job {
839            windmill_api::models::Job::JobOneOf(..) => JobStatus::Completed,
840            windmill_api::models::Job::JobOneOf1(job_one_of1) => {
841                if job_one_of1.running {
842                    JobStatus::Running
843                } else {
844                    JobStatus::Waiting
845                }
846            }
847        })
848    }
849
850    /// Retrieves the result of a completed job.
851    ///
852    /// This provides both synchronous and asynchronous interfaces for fetching
853    /// the final result of a successfully completed job.
854    ///
855    /// # Parameters
856    /// - `job_id`: The ID of the completed job to fetch results for
857    ///
858    /// # Errors
859    /// Returns `SdkError` if:
860    /// - Job is not found
861    /// - Job failed to complete successfully
862    /// - Underlying API calls fail
863    /// - Result cannot be parsed
864    ///
865    /// # Notes
866    /// - Only works for jobs that have already completed
867    /// - For pending/running jobs, use `wait_job` instead
868    /// - Does not handle timeouts or polling - assumes job is already complete
869    pub fn get_result<'a>(&'a self, job_id: &'a str) -> MaybeFuture<'a, Result<Value, SdkError>> {
870        ret!(self.get_result_inner(job_id));
871    }
872
873    async fn get_result_inner<'a>(&'a self, job_id: &'a str) -> Result<Value, SdkError> {
874        Ok(job_api::get_completed_job_result(
875            &self.client_config,
876            &self.workspace,
877            job_id,
878            None,
879            None,
880            None,
881            None,
882        )
883        .await?)
884    }
885
886    /// Updates the progress percentage of a running Windmill job.
887    ///
888    /// This function allows scripts to report their execution progress (0-100%) back to the Windmill UI.
889    /// Progress updates are visible in both the jobs dashboard and flow visualizations.
890    ///
891    /// # Arguments
892    /// * `value` - Progress percentage (0-100)
893    /// * `job_id` - Optional job UUID. If None, uses current job's ID from `WM_JOB_ID` environment variable
894    ///
895    /// # Examples
896    /// ```no_run
897    /// use wmill::Windmill;
898    /// # fn main () -> anyhow::Result<()>{
899    /// let wm = Windmill::default()?;
900    ///
901    /// // Report progress for current job
902    /// wm.set_progress(25, None)?;
903    /// # Ok(())
904    /// # }
905    ///
906    /// ```
907    ///
908    /// # Behavior Details
909    /// - Automatically handles flow context by detecting parent job ID
910    /// - Progress updates are reflected in real-time in the Windmill UI
911    /// - Typical usage pattern:
912    ///   ```ignore
913    ///   for (i, item) in items.iter().enumerate() {
914    ///       process(item);
915    ///       let progress = ((i + 1) * 100 / items.len()) as i32;
916    ///       wmill.set_progress(progress, None).await?;
917    ///   }
918    ///   ```
919    ///
920    /// # Notes
921    /// - Only affects jobs that are currently running
922    /// - Progress values outside 0-99 range are clamped by the server
923    /// - Progress cannot decrease
924    /// - For flows, updates the progress of both the step and parent flow
925    ///
926    /// # See Also
927    /// - [Flow Progress Tracking](https://www.windmill.dev/docs/advanced/explicit_progress)
928    /// - [`Self::get_progress`] - For reading job progress
929    pub fn set_progress<'a>(
930        &'a self,
931        value: u32,
932        job_id: Option<String>,
933    ) -> MaybeFuture<'a, Result<(), SdkError>> {
934        let f = async move {
935            let job_id = job_id.unwrap_or(var("WM_JOB_ID")?);
936            let job = job_api::get_job(
937                &self.client_config,
938                &self.workspace,
939                &job_id,
940                Some(true),
941                Some(true),
942            )
943            .await?;
944
945            let flow_id = match job {
946                windmill_api::models::Job::JobOneOf(job) => job.parent_job,
947                windmill_api::models::Job::JobOneOf1(job) => job.parent_job,
948            };
949
950            windmill_api::apis::metrics_api::set_job_progress(
951                &self.client_config,
952                &self.workspace,
953                &job_id,
954                windmill_api::models::SetJobProgressRequest {
955                    percent: Some(value as i32),
956                    flow_job_id: flow_id,
957                },
958            )
959            .await?;
960            Ok(())
961        };
962        ret!(f);
963    }
964
965    /// Retrieves the current progress percentage of a Windmill job.
966    ///
967    /// This function queries the Windmill backend to get the execution progress (0-100%)
968    /// of either a specific job or the current job context.
969    ///
970    /// # Arguments
971    /// * `job_id` - Optional job UUID. If `None`, uses current job's ID from `WM_JOB_ID` env var
972    ///
973    /// # See Also
974    /// - [Flow Progress Tracking](https://www.windmill.dev/docs/advanced/explicit_progress)
975    /// - [`Self::set_progress`] - For updating job progress
976    pub fn get_progress<'a>(
977        &'a self,
978        job_id: Option<String>,
979    ) -> MaybeFuture<'a, Result<u32, SdkError>> {
980        let f = async move {
981            let job_id = job_id.unwrap_or(var("WM_JOB_ID")?);
982            Ok(windmill_api::apis::metrics_api::get_job_progress(
983                &self.client_config,
984                &self.workspace,
985                &job_id,
986            )
987            .await
988            .map(|v| v as u32)?)
989        };
990        ret!(f);
991    }
992
993    /// Executes an API call in either asynchronous or synchronous mode based on compilation context.
994    ///
995    /// This function serves as a bridge between async and sync code, automatically adapting its behavior:
996    /// - With `async` feature: Returns a boxed future for later await
997    /// - Without `async` feature: Blocks immediately using the global runtime
998    ///
999    /// # Examples
1000    ///
1001    /// ## Async usage (with `async` feature)
1002    /// ```ignore
1003    /// use wmill::Windmill;
1004    /// # #[tokio::main]
1005    /// # async fn main() -> anyhow::Result<()>{
1006    /// let wm = Windmill::default()?;
1007    /// let user = wm.call_api(wmill::apis::admin_api::get_user(&wm.client_config, &wm.workspace, "Bob"))?;
1008    /// println!("User details: {:?}", user);
1009    /// # Ok(())
1010    /// # }
1011    /// ```
1012    ///
1013    /// ## Sync usage (without `async` feature)
1014    /// ```ignore
1015    /// use wmill::Windmill;
1016    /// let wm = Windmill::new(Some("<TOKEN>".into()), Some("admins".into()), Some("http://localhost:5000/api".into()))?;
1017    /// let user = wm.call_api(wmill::apis::admin_api::get_user(&wm.client_config, &wm.workspace, "Bob"));
1018    /// println!("User details: {:?}", user);
1019    /// # Ok::<(), wmill::SdkError>(())
1020    /// ```
1021    pub fn call_api<'a, T>(
1022        &'a self,
1023        callback: impl Future<Output = T> + std::marker::Send + 'a,
1024    ) -> MaybeFuture<'a, T> {
1025        ret!(callback);
1026    }
1027
1028    // pub fn get_version() {}
1029}
1030
/// Lifecycle state of a Windmill job.
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so callers can log a
/// status, compare it with `==`, and pass it around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// The job is currently being executed.
    Running,
    /// The job is queued but has not started executing.
    Waiting,
    /// The job has finished, successfully or with errors.
    Completed,
}
1036
1037fn get_state_path() -> Result<String, SdkError> {
1038    Ok(var("WM_STATE_PATH_NEW").unwrap_or(var("WM_STATE_PATH")?))
1039}
1040
1041#[derive(thiserror::Error, Debug)]
1042pub enum SdkError {
1043    #[error("Error: {0}")]
1044    Serde(#[from] serde_json::Error),
1045    #[error("Having troubles reading Env Variable: {0}")]
1046    VarError(#[from] std::env::VarError),
1047    #[error("Api error: {0}")]
1048    ApiError(String),
1049    #[error("Internal Error: {0}")]
1050    InternalError(String),
1051    #[error("Specified value is incorrect: {0}")]
1052    BadValue(String),
1053    #[error("{0}")]
1054    ExecutionError(String),
1055}
1056impl<T: for<'a> Deserialize<'a>> From<windmill_api::apis::Error<T>> for SdkError {
1057    fn from(value: windmill_api::apis::Error<T>) -> Self {
1058        Self::ApiError(value.to_string())
1059    }
1060}