wmill/client.rs
1#[cfg(feature = "async")]
2use futures::FutureExt;
3use windmill_api::{
4 apis::{
5 configuration::Configuration,
6 job_api,
7 resource_api::{self, get_resource_value_interpolated},
8 variable_api,
9 },
10 models::{EditResource, GetCompletedJobResultMaybe200Response},
11};
12
13use serde::Deserialize;
14use serde_json::{Value, json};
15use std::{
16 env::{self, var},
17 fmt::Debug,
18 time::{Duration, Instant},
19};
20
21use crate::{maybe_future::maybe_future::MaybeFuture, ret};
22
23#[derive(Debug, Clone)]
24pub struct Windmill {
25 pub workspace: String,
26 pub token: String,
27 pub base_internal_url: String,
28 pub client_config: Configuration,
29}
30
31impl Windmill {
32 /// Creates a new `Windmill` instance with default configuration.
33 ///
34 /// Reads configuration from environment variables:
35 /// - `WM_TOKEN` for authentication token
36 /// - `WM_WORKSPACE` for workspace name
37 /// - `BASE_INTERNAL_URL` for API base URL (appends `/api` automatically)
38 ///
39 /// # Errors
40 /// Returns `SdkError` if:
41 /// - Required environment variables are missing
42 /// - Environment variables cannot be read
43 pub fn default() -> Result<Self, SdkError> {
44 Windmill::new(None, None, None)
45 }
46
47 /// Creates a new `Windmill` instance with optional overrides.
48 ///
49 /// Falls back to environment variables for any `None` parameters:
50 /// - `WM_TOKEN` if `token` is `None`
51 /// - `WM_WORKSPACE` if `workspace` is `None`
52 /// - `BASE_INTERNAL_URL` + "/api" if `base_internal_url` is `None`
53 ///
54 /// # Parameters
55 /// - `token`: Optional bearer token override
56 /// - `workspace`: Optional workspace name override
57 /// - `base_internal_url`: Optional base URL override (without `/api` suffix)
58 ///
59 /// # Errors
60 /// Returns `SdkError` if:
61 /// - Required environment variables are missing when needed
62 /// - Environment variables cannot be read
63 pub fn new(
64 token: Option<String>,
65 workspace: Option<String>,
66 base_internal_url: Option<String>,
67 ) -> Result<Self, SdkError> {
68 use env::var;
69 let (token, base_internal_url, workspace) = (
70 token.or(var("WM_TOKEN").ok()).ok_or(SdkError::BadValue(
71 "WM_TOKEN is not set nor was provided in constructor".to_owned(),
72 ))?,
73 base_internal_url
74 .or(var("BASE_INTERNAL_URL").ok())
75 .ok_or(SdkError::BadValue(
76 "BASE_INTERNAL_URL is not set nor was provided in constructor".to_owned(),
77 ))?
78 + "/api",
79 workspace
80 .or(var("WM_WORKSPACE").ok())
81 .ok_or(SdkError::BadValue(
82 "WM_WORKSPACE is not set nor was provided in constructor".to_owned(),
83 ))?,
84 );
85
86 Ok(Windmill {
87 client_config: Configuration {
88 // TODO: client: reqwest::blocking::Client::new(), // Use blocking client?
89 base_path: base_internal_url.clone(),
90 bearer_access_token: Some(token.clone()),
91 ..Default::default()
92 },
93 workspace,
94 token,
95 base_internal_url,
96 })
97 }
98
99 /// Retrieves a variable from Windmill, automatically parsing it as JSON/YAML.
100 ///
101 /// This is the convenience version that attempts to parse the variable value as:
102 /// 1. JSON (primary attempt)
103 /// 2. YAML (fallback if JSON parsing fails)
104 /// 3. Raw string (final fallback if both parsings fail)
105 ///
106 /// For better performance when you know the format or don't need parsing,
107 /// use [`Self::get_variable_raw`] instead.
108 ///
109 /// # Arguments
110 /// * `path` - Variable path (e.g., "u/user/variable_name")
111 ///
112 /// # Example
113 /// ```no_run
114 ///
115 /// use wmill::Windmill;
116 /// use serde_json::json;
117 ///
118 /// let wm = Windmill::default()?;
119 ///
120 /// // For a variable containing JSON: {"key": "value"}
121 /// let json_var = wm.get_variable("u/admin/config")?;
122 ///
123 /// // For a variable containing plain text
124 /// let text_var = wm.get_variable("u/user/plaintext_note")?;
125 ///
126 /// # Ok::<(), wmill::SdkError>(())
127 /// ```
128 ///
129 /// See also: [`Self::get_variable_raw`] for the unparsed version
130 pub fn get_variable<'a>(&'a self, path: &'a str) -> MaybeFuture<'a, Result<Value, SdkError>> {
131 ret!(self.get_variable_inner(path));
132 }
133
134 async fn get_variable_inner<'a>(&'a self, path: &'a str) -> Result<Value, SdkError> {
135 let raw = self.get_variable_raw_inner(path).await?;
136 Ok(serde_json::from_str(&raw)
137 .or(serde_yaml::from_str(&raw))
138 .unwrap_or(json!(raw)))
139 }
140
141 /// This is the **faster version** when:
142 /// - You know the variable contains plain text
143 /// - You want to handle parsing yourself
144 /// - You need maximum performance
145 ///
146 /// Performance benefit comes from avoiding:
147 /// 1. JSON parsing attempt
148 /// 2. YAML parsing fallback
149 ///
150 /// # Arguments
151 /// * `path` - Variable path (e.g., "u/user/variable_name")
152 ///
153 /// # Example
154 /// ```no_run
155 /// use wmill::Windmill;
156 /// use serde_json::{json, Value};
157 ///
158 /// let wm = Windmill::default()?;
159 ///
160 /// // When you need the raw content
161 /// let raw_content = wm.get_variable_raw("u/user/custom_format")?;
162 ///
163 /// // When you know it's JSON and want to parse it differently
164 /// let json_value: Value = serde_json::from_str(
165 /// &wm.get_variable_raw("u/admin/config")?
166 /// )?;
167 ///
168 /// # Ok::<(), wmill::SdkError>(())
169 /// ```
170 ///
171 /// See also: [`Self::get_variable`] for the auto-parsed version
172 pub fn get_variable_raw<'a>(
173 &'a self,
174 path: &'a str,
175 ) -> MaybeFuture<'a, Result<String, SdkError>> {
176 ret!(self.get_variable_raw_inner(path));
177 }
178
179 async fn get_variable_raw_inner<'a>(&'a self, path: &'a str) -> Result<String, SdkError> {
180 let v =
181 variable_api::get_variable_value(&self.client_config, &self.workspace, path).await?;
182
183 v.get(1..v.len() - 1)
184 .ok_or(SdkError::InternalError(
185 "returned value is incorrect".into(),
186 ))
187 .map(|s| s.to_owned())
188 }
189
190 /// Creates or updates a variable in the workspace.
191 ///
192 /// This function provides atomic variable management that:
193 /// - Creates a new variable if it doesn't exist
194 /// - Updates an existing variable if found
195 /// - Handles both regular and secret variables
196 ///
197 /// # Parameters
198 /// - `value`: The variable value to set
199 /// - `path`: The variable path/identifier
200 /// - `is_secret`: Whether to store as a secret (encrypted) variable
201 ///
202 /// # Errors
203 /// Returns `SdkError` if:
204 /// - Variable fetch fails for reasons other than "not found"
205 /// - Variable creation fails
206 /// - Variable update fails
207 /// - Underlying API calls fail
208 ///
209 /// # Notes
210 /// - For new variables, defaults to empty description
211 /// - Updates only modify the value (preserving other metadata)
212 /// - Secret status can only be set during creation
213 pub fn set_variable<'a>(
214 &'a self,
215 value: String,
216 path: &'a str,
217 is_secret: bool,
218 ) -> MaybeFuture<'a, Result<(), SdkError>> {
219 ret!(self.set_variable_inner(value, path, is_secret));
220 }
221
222 async fn set_variable_inner<'a>(
223 &'a self,
224 value: String,
225 path: &'a str,
226 is_secret: bool,
227 ) -> Result<(), SdkError> {
228 let res =
229 variable_api::get_variable(&self.client_config, &self.workspace, path, None, None)
230 .await;
231
232 if res.is_err() {
233 variable_api::create_variable(
234 &self.client_config,
235 &self.workspace,
236 windmill_api::models::CreateVariable {
237 path: path.to_owned(),
238 value,
239 is_secret,
240 description: "".to_owned(),
241 account: None,
242 is_oauth: None,
243 expires_at: None,
244 },
245 None,
246 )
247 .await?;
248 } else {
249 variable_api::update_variable(
250 &self.client_config,
251 &self.workspace,
252 path,
253 windmill_api::models::EditVariable {
254 path: None,
255 value: Some(value),
256 is_secret: None,
257 description: None,
258 },
259 None,
260 )
261 .await?;
262 }
263 Ok(())
264 }
265
266 /// Fetches and deserializes a resource into a concrete type.
267 ///
268 /// This is the recommended way to access resources when you know the expected type.
269 /// For raw JSON access or dynamic typing, use [`Self::get_resource_any`] instead.
270 ///
271 /// # Type Parameters
272 /// * `T` - Any type implementing `DeserializeOwned` (most structs with `#[derive(Deserialize)]`)
273 ///
274 /// # Arguments
275 /// * `path` - The resource path (e.g., "u/user/resource_name")
276 ///
277 /// # Example
278 /// ```no_run
279 /// use wmill::Windmill;
280 /// use serde_json::{json, Value};
281 /// use serde::Deserialize;
282 ///
283 /// #[derive(Deserialize)]
284 /// struct DbConfig {
285 /// url: String,
286 /// pool_size: Option<u32>,
287 /// }
288 ///
289 /// let wm = Windmill::default()?;
290 ///
291 /// // Directly deserialize to your type
292 /// let config: DbConfig = wm.get_resource("u/admin/db_connection")?;
293 /// # Ok::<(), wmill::SdkError>(())
294 /// ```
295 ///
296 /// See also: [`Self::get_resource_any`] for untyped version
297 pub fn get_resource<'a, T: serde::de::DeserializeOwned>(
298 &'a self,
299 path: &'a str,
300 ) -> MaybeFuture<'a, Result<T, SdkError>> {
301 ret!(self.get_resource_inner(path));
302 }
303
304 async fn get_resource_inner<'a, T: serde::de::DeserializeOwned>(
305 &'a self,
306 path: &'a str,
307 ) -> Result<T, SdkError> {
308 Ok(serde_json::from_value(
309 self.get_resource_any_inner(path).await?,
310 )?)
311 }
312
313 /// Fetches a raw JSON [`Value`] from Windmill by path.
314 ///
315 /// Use this when you need the raw JSON structure or don't have a concrete type to deserialize into.
316 /// For typed deserialization, prefer [`Self::get_resource`] instead.
317 ///
318 /// # Arguments
319 /// * `path` - The resource path (e.g., "u/user/resource_name")
320 ///
321 /// # Example
322 /// ```no_run
323 /// use wmill::Windmill;
324 ///
325 /// let wm = Windmill::default()?;
326 ///
327 /// // When you need to inspect the raw structure first
328 /// let json = wm.get_resource_any("u/admin/db_connection")?;
329 ///
330 /// println!("Url is: {}", json["url"]);
331 ///
332 /// # Ok::<(), wmill::SdkError>(())
333 /// ```
334 ///
335 /// See also: [`Self::get_resource`] for typed version
336 pub fn get_resource_any<'a>(
337 &'a self,
338 path: &'a str,
339 ) -> MaybeFuture<'a, Result<Value, SdkError>> {
340 ret!(self.get_resource_any_inner(path));
341 }
342
343 async fn get_resource_any_inner<'a>(&'a self, path: &'a str) -> Result<Value, SdkError> {
344 Ok(
345 get_resource_value_interpolated(&self.client_config, &self.workspace, path, None)
346 .await?,
347 )
348 }
349
350 /// Creates or updates a resource in Windmill.
351 ///
352 /// This function sets a resource's value at the specified path, creating it if it doesn't exist
353 /// or updating it if it does. The resource will be of the specified type.
354 ///
355 /// # Arguments
356 /// * `value` - The value to set for the resource. Use `None` to create an empty resource.
357 /// * `path` - The ownership path of the resource (e.g., "u/user/resource_name").
358 /// Defines permissions based on Windmill's path prefix system.
359 /// * `resource_type` - The type of resource to create (e.g., "postgresql", "smtp").
360 /// Must be a pre-existing resource type.
361 ///
362 /// # Examples
363 /// ```no_run
364 /// use wmill::Windmill;
365 ///
366 /// # fn main() -> anyhow::Result<()> {
367 /// let wm = Windmill::default()?;
368 /// wm.set_resource(
369 /// Some(serde_json::json!({"host": "localhost", "port": 5432})),
370 /// "u/admin/database",
371 /// "postgresql"
372 /// )?;
373 /// # Ok(())
374 /// # }
375 /// ```
376 pub fn set_resource<'a>(
377 &'a self,
378 value: Option<Value>,
379 path: &'a str,
380 resource_type: &'a str,
381 ) -> MaybeFuture<'a, Result<(), SdkError>> {
382 ret!(self.set_resource_inner(value, path, resource_type));
383 }
384
385 async fn set_resource_inner<'a>(
386 &'a self,
387 value: Option<Value>,
388 path: &'a str,
389 resource_type: &'a str,
390 ) -> Result<(), SdkError> {
391 // if resource_api::get_resource(&self.client_config, &self.workspace, path)
392 // .await
393 // .is_err()
394 if !resource_api::exists_resource(&self.client_config, &self.workspace, path).await? {
395 resource_api::create_resource(
396 &self.client_config,
397 &self.workspace,
398 windmill_api::models::CreateResource {
399 path: path.to_owned(),
400 value,
401 description: None,
402 resource_type: resource_type.to_owned(),
403 // resource_type: "any".to_owned(),
404 },
405 // Some(true),
406 None,
407 )
408 .await?;
409 } else {
410 resource_api::update_resource(
411 &self.client_config,
412 &self.workspace,
413 path,
414 EditResource {
415 path: None,
416 description: None,
417 value: Some(value),
418 resource_type: Some(resource_type.to_owned()),
419 },
420 )
421 .await?;
422 }
423 Ok(())
424 }
425
426 /// Retrieves and deserializes the current typed state value for a script's execution context.
427 ///
428 /// This is the typed version of [`Self::get_state_any`], automatically deserializing the state
429 /// into the specified type `T` that implements [`serde::de::DeserializeOwned`].
430 ///
431 /// # Type Parameter
432 /// * `T` - The type to deserialize into (must implement `serde::de::DeserializeOwned`)
433 ///
434 /// # Examples
435 /// ```no_run
436 /// use wmill::Windmill;
437 /// use serde_json::{json, Value};
438 /// use serde::Deserialize;
439 ///
440 /// #[derive(Deserialize)]
441 /// struct ScriptState {
442 /// counter: i32,
443 /// last_run: String,
444 /// }
445 ///
446 /// let wm = Windmill::default()?;
447 ///
448 /// // Get typed state
449 /// let state: ScriptState = wm.get_state()?;
450 ///
451 /// println!("Counter: {}, Last run: {}", state.counter, state.last_run);
452 ///
453 /// # Ok::<(), wmill::SdkError>(())
454 /// ```
455 ///
456 /// # Behavior Details
457 /// - Uses same state path resolution as [`Self::get_state_any`] (`WM_STATE_PATH_NEW` → `WM_STATE_PATH` fallback)
458 /// - Performs runtime type checking during deserialization
459 ///
460 /// # When to Use vs [`Self::get_state_any`]
461 /// | Use Case | `get_state<T>` | `get_state_any` |
462 /// |------------------------|-------------------------|------------------------|
463 /// | Known state structure | ✅ Preferred | ⚠️ Requires manual parsing |
464 /// | Dynamic state | ❌ Won't compile | ✅ Works |
465 /// | Type safety | ✅ Compile-time checks | ❌ Runtime checks only |
466 ///
467 /// # Notes
468 /// - For complex types, derive `Deserialize` using Serde attributes
469 /// - Prefer this over [`Self::get_state_any`] when state schema is stable
470 /// - State modifications should use corresponding [`Self::set_state`] with matching type
471 ///
472 /// # See Also
473 /// - [`Self::get_state_any`] - Untyped state access
474 /// - [`Self::set_state`] - For updating typed states
475 /// - [Windmill State Management](https://www.windmill.dev/docs/core_concepts/persistent_storage/within_windmill#states)
476 pub fn get_state<'a, T: serde::de::DeserializeOwned>(
477 &'a self,
478 ) -> MaybeFuture<'a, Result<T, SdkError>> {
479 ret!(self.get_resource_inner(&get_state_path()?));
480 }
481
482 /// Retrieves the current state value for a script's execution context.
483 ///
484 /// States persist data between runs of the same script by the same trigger (schedule or user).
485 /// This is the untyped version that returns a generic [`Value`].
486 ///
487 /// # Examples
488 /// ```no_run
489 /// use wmill::Windmill;
490 /// use serde_json::Value;
491 /// use serde::Deserialize;
492 ///
493 /// let wm = Windmill::default()?;
494 ///
495 /// // Get state (returns serde_json::Value)
496 /// let state: Value = wm.get_state_any()?;
497 ///
498 /// // Use with default if empty
499 /// let counter = state.as_i64().unwrap_or(0);
500 /// # Ok::<(), wmill::SdkError>(())
501 /// ```
502 ///
503 /// # Behavior Details
504 /// - Automatically uses the script's state path from `WM_STATE_PATH_NEW` (falls back to `WM_STATE_PATH`)
505 /// - Returns the full state object stored as a Windmill resource
506 /// - State resources are hidden from Workspace view but visible under Resources → States
507 ///
508 /// # Typical Use Cases
509 /// 1. Maintaining counters between runs
510 /// 2. Storing last execution timestamps
511 /// 3. Keeping reference data (like previous API responses)
512 ///
513 /// # Notes
514 /// - For typed state access, use `get_state<T>` instead
515 /// - States are isolated per script and trigger combination
516 /// - Maximum state size: 5MB (compressed)
517 ///
518 /// # See Also
519 /// - [`Self::set_state`] - For updating the state
520 /// - [Windmill State Management](https://www.windmill.dev/docs/core_concepts/persistent_storage/within_windmill#states)
521 pub fn get_state_any<'a>(&'a self) -> MaybeFuture<'a, Result<Value, SdkError>> {
522 ret!(self.get_resource_any_inner(&get_state_path()?));
523 }
524
525 /// Updates or clears the script's persistent state.
526 ///
527 /// # Arguments
528 /// * `value` - New state value (`Some(Value)`) or `None` to clear state
529 ///
530 /// # Examples
531 /// ```no_run
532 /// use wmill::Windmill;
533 /// use serde_json::json;
534 ///
535 /// # async fn example() -> anyhow::Result<()> {
536 /// let wm = Windmill::default()?;
537 ///
538 /// // Set state
539 /// wm.set_state(Some(json!({"count": 42})))?;
540 ///
541 /// // Clear state
542 /// wm.set_state(None)?;
543 /// # Ok(())
544 /// # }
545 /// ```
546 ///
547 /// See also: [`Self::get_state`], [`Self::get_state_any`]
548 pub fn set_state<'a>(&'a self, value: Option<Value>) -> MaybeFuture<'a, Result<(), SdkError>> {
549 ret!(self.set_resource_inner(value, &get_state_path()?, "state"));
550 }
551
552 /// Executes a script synchronously and waits for its completion.
553 ///
554 /// This is a blocking version of `run_script_async` that handles the entire script execution
555 /// lifecycle including job scheduling and result waiting.
556 ///
557 /// # Parameters
558 /// - `ident`: Script identifier (either path or hash)
559 /// - `ident_is_hash`: Whether the identifier is a hash (true) or path (false)
560 /// - `args`: JSON arguments to pass to the script
561 /// - `scheduled_in_secs`: Optional delay before execution (in seconds)
562 /// - `timeout_secs`: Maximum time to wait for job completion (in seconds)
563 /// - `verbose`: Whether to print execution details
564 /// - `assert_result_is_not_none`: Whether to fail if the result is None
565 ///
566 /// # Errors
567 /// Returns `SdkError` if:
568 /// - Script fails to start
569 /// - Job times out
570 /// - Result assertion fails
571 /// - Underlying API calls fail
572 pub fn run_script_sync<'a>(
573 &'a self,
574 ident: &'a str,
575 ident_is_hash: bool,
576 args: Value,
577 scheduled_in_secs: Option<u32>,
578 timeout_secs: Option<u64>,
579 verbose: bool,
580 assert_result_is_not_none: bool,
581 ) -> MaybeFuture<'a, Result<Value, SdkError>> {
582 if verbose {
583 println!("running `{ident}` synchronously with {:?}", &args);
584 }
585
586 ret!(async move {
587 let job_id = self
588 .run_script_async_inner(ident, ident_is_hash, args, scheduled_in_secs)
589 .await?;
590 self.wait_job_inner(
591 &job_id.to_string(),
592 timeout_secs,
593 verbose,
594 assert_result_is_not_none,
595 )
596 .await
597 });
598 }
599
600 /// Asynchronously executes a script in Windmill and returns its job UUID.
601 ///
602 /// This function runs a script either by path or by hash, with optional:
603 /// - Parent job inheritance (when run within a flow)
604 /// - Scheduled execution delay
605 /// - Argument passing
606 ///
607 /// # Arguments
608 /// * `ident` - Script identifier (path or hash depending on `ident_is_hash`)
609 /// * `ident_is_hash` - If true, `ident` is treated as a script hash; if false, as a path
610 /// * `args` - JSON arguments to pass to the script (must be an object if using path)
611 /// * `scheduled_in_secs` - Optional delay (in seconds) before execution
612 ///
613 /// # Examples
614 /// ```no_run
615 /// use wmill::Windmill;
616 /// use serde_json::json;
617 ///
618 /// let wm = Windmill::default()?;
619 /// let job_id = wm.run_script_async(
620 /// "u/user/script_path",
621 /// false,
622 /// json!({"param1": "value1"}),
623 /// Some(10) // Run after 10 seconds
624 /// )?;
625 ///
626 /// # Ok::<(), wmill::SdkError>(())
627 /// ```
628 ///
629 /// # Behavior Details
630 /// - **Automatic Job Inheritance**:
631 /// - Detects `WM_JOB_ID` env var → sets as `parent_job`
632 /// - Detects `WM_ROOT_FLOW_JOB_ID` env var → sets as `root_job`
633 /// - **Scheduled Execution**:
634 /// - When `scheduled_in_secs` is provided, sets `scheduled_in_secs` in args
635 /// - **Argument Handling**:
636 /// - For path-based execution (`ident_is_hash=false`), args must be a JSON object
637 /// - For hash-based execution, args can be any valid JSON value
638 ///
639 /// # Errors
640 /// - [`SdkError::BadValue`] if path-based execution receives non-object arguments
641 /// - API errors from Windmill's backend
642 pub fn run_script_async<'a>(
643 &'a self,
644 ident: &'a str,
645 ident_is_hash: bool,
646 args: Value,
647 scheduled_in_secs: Option<u32>,
648 ) -> MaybeFuture<'a, Result<uuid::Uuid, SdkError>> {
649 ret!(self.run_script_async_inner(ident, ident_is_hash, args, scheduled_in_secs));
650 }
651
652 async fn run_script_async_inner<'a>(
653 &'a self,
654 ident: &'a str,
655 ident_is_hash: bool,
656 mut args: Value,
657 scheduled_in_secs: Option<u32>,
658 ) -> Result<uuid::Uuid, SdkError> {
659 if let Ok(parent_job) = var("WM_JOB_ID") {
660 args["parent_job"] = json!(parent_job);
661 }
662
663 if let Ok(root_job) = var("WM_ROOT_FLOW_JOB_ID") {
664 args["root_job"] = json!(root_job);
665 }
666
667 if let Some(scheduled_in_secs) = scheduled_in_secs {
668 args["scheduled_in_secs"] = json!(scheduled_in_secs);
669 }
670
671 let uuid = if ident_is_hash {
672 job_api::run_script_by_hash(
673 &self.client_config,
674 &self.workspace,
675 ident,
676 args,
677 None,
678 None,
679 None,
680 None,
681 None,
682 None,
683 None,
684 None,
685 None,
686 )
687 .await?
688 } else {
689 job_api::run_script_by_path(
690 &self.client_config,
691 &self.workspace,
692 ident,
693 args.as_object()
694 .ok_or(SdkError::BadValue(format!(
695 "Args should be Object, but it is: {}",
696 args
697 )))?
698 .clone()
699 .into_iter()
700 .collect(),
701 None,
702 None,
703 None,
704 None,
705 None,
706 None,
707 None,
708 None,
709 )
710 .await?
711 };
712 Ok(uuid)
713 }
714
715 /// Waits for a job to complete and returns its result.
716 ///
717 /// This function provides both synchronous and asynchronous interfaces for waiting
718 /// on job completion, with timeout handling and result validation.
719 ///
720 /// # Parameters
721 /// - `job_id`: The ID of the job to wait for
722 /// - `timeout_secs`: Maximum time to wait (in seconds) before cancelling the job
723 /// - `verbose`: Whether to print progress information
724 /// - `assert_result_is_not_none`: Whether to fail if the job returns no result
725 ///
726 /// # Errors
727 /// Returns `SdkError` if:
728 /// - Job fails or times out
729 /// - Result assertion fails when `assert_result_is_not_none` is true
730 /// - Underlying API calls fail
731 ///
732 /// # Behavior
733 /// 1. Polls job status at 500ms intervals
734 /// 2. Cancels job if timeout is reached
735 /// 3. Validates success status and optional result presence
736 /// 4. Returns either the result or appropriate error
737 pub fn wait_job<'a>(
738 &'a self,
739 job_id: &'a str,
740 timeout_secs: Option<u64>,
741 verbose: bool,
742 assert_result_is_not_none: bool,
743 ) -> MaybeFuture<'a, Result<Value, SdkError>> {
744 ret!(self.wait_job_inner(job_id, timeout_secs, verbose, assert_result_is_not_none));
745 }
746
747 async fn wait_job_inner<'a>(
748 &'a self,
749 job_id: &'a str,
750 timeout_secs: Option<u64>,
751 verbose: bool,
752 assert_result_is_not_none: bool,
753 ) -> Result<Value, SdkError> {
754 let start = Instant::now();
755 loop {
756 if let Some(timeout) = timeout_secs {
757 if start.elapsed().as_secs() > timeout {
758 println!("WARN: reached timeout for {job_id}. Cancelling the job.");
759 job_api::cancel_queued_job(
760 &self.client_config,
761 &self.workspace,
762 job_id,
763 windmill_api::models::CancelQueuedJobRequest {
764 reason: Some("reached timeout".into()),
765 },
766 )
767 .await?;
768 }
769 }
770 let GetCompletedJobResultMaybe200Response {
771 completed,
772 result,
773 success,
774 started,
775 } = job_api::get_completed_job_result_maybe(
776 &self.client_config,
777 &self.workspace,
778 job_id,
779 Some(true),
780 )
781 .await?;
782
783 if matches!(started, Some(false)) && verbose {
784 println!("INFO: job {job_id} has not started yet");
785 }
786
787 if completed {
788 if matches!(success, Some(true)) {
789 if result.is_none() && assert_result_is_not_none {
790 return Err(SdkError::ExecutionError("Result was None".into()));
791 }
792 return Ok(result.unwrap_or_default());
793 } else {
794 return Err(SdkError::ExecutionError(format!(
795 "Job {job_id} was not successful: {:?}",
796 result
797 )));
798 }
799 }
800 if verbose {
801 println!("INFO: sleeping 0.5 seconds for {job_id}");
802 }
803 tokio::time::sleep(Duration::from_millis(500)).await;
804 }
805 }
806
807 /// Retrieves the current status of a Windmill job by its UUID.
808 ///
809 /// This function queries the Windmill backend to determine whether a job is:
810 /// - Waiting to be executed
811 /// - Currently running
812 /// - Already completed
813 ///
814 /// # Arguments
815 /// * `job_id` - The UUID of the job to check (format: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
816 ///
817 /// # Job Lifecycle States
818 /// - **Waiting**: Job is queued but hasn't started execution
819 /// - **Running**: Job is currently being executed
820 /// - **Completed**: Job has finished (successfully or with errors)
821 pub fn get_job_status<'a>(
822 &'a self,
823 job_id: &'a str,
824 ) -> MaybeFuture<'a, Result<JobStatus, SdkError>> {
825 ret!(self.get_job_status_inner(job_id));
826 }
827
828 async fn get_job_status_inner<'a>(&'a self, job_id: &'a str) -> Result<JobStatus, SdkError> {
829 let job = job_api::get_job(
830 &self.client_config,
831 &self.workspace,
832 job_id,
833 Some(true),
834 Some(true),
835 )
836 .await?;
837
838 Ok(match job {
839 windmill_api::models::Job::JobOneOf(..) => JobStatus::Completed,
840 windmill_api::models::Job::JobOneOf1(job_one_of1) => {
841 if job_one_of1.running {
842 JobStatus::Running
843 } else {
844 JobStatus::Waiting
845 }
846 }
847 })
848 }
849
850 /// Retrieves the result of a completed job.
851 ///
852 /// This provides both synchronous and asynchronous interfaces for fetching
853 /// the final result of a successfully completed job.
854 ///
855 /// # Parameters
856 /// - `job_id`: The ID of the completed job to fetch results for
857 ///
858 /// # Errors
859 /// Returns `SdkError` if:
860 /// - Job is not found
861 /// - Job failed to complete successfully
862 /// - Underlying API calls fail
863 /// - Result cannot be parsed
864 ///
865 /// # Notes
866 /// - Only works for jobs that have already completed
867 /// - For pending/running jobs, use `wait_job` instead
868 /// - Does not handle timeouts or polling - assumes job is already complete
869 pub fn get_result<'a>(&'a self, job_id: &'a str) -> MaybeFuture<'a, Result<Value, SdkError>> {
870 ret!(self.get_result_inner(job_id));
871 }
872
873 async fn get_result_inner<'a>(&'a self, job_id: &'a str) -> Result<Value, SdkError> {
874 Ok(job_api::get_completed_job_result(
875 &self.client_config,
876 &self.workspace,
877 job_id,
878 None,
879 None,
880 None,
881 None,
882 )
883 .await?)
884 }
885
886 /// Updates the progress percentage of a running Windmill job.
887 ///
888 /// This function allows scripts to report their execution progress (0-100%) back to the Windmill UI.
889 /// Progress updates are visible in both the jobs dashboard and flow visualizations.
890 ///
891 /// # Arguments
892 /// * `value` - Progress percentage (0-100)
893 /// * `job_id` - Optional job UUID. If None, uses current job's ID from `WM_JOB_ID` environment variable
894 ///
895 /// # Examples
896 /// ```no_run
897 /// use wmill::Windmill;
898 /// # fn main () -> anyhow::Result<()>{
899 /// let wm = Windmill::default()?;
900 ///
901 /// // Report progress for current job
902 /// wm.set_progress(25, None)?;
903 /// # Ok(())
904 /// # }
905 ///
906 /// ```
907 ///
908 /// # Behavior Details
909 /// - Automatically handles flow context by detecting parent job ID
910 /// - Progress updates are reflected in real-time in the Windmill UI
911 /// - Typical usage pattern:
912 /// ```ignore
913 /// for (i, item) in items.iter().enumerate() {
914 /// process(item);
915 /// let progress = ((i + 1) * 100 / items.len()) as i32;
916 /// wmill.set_progress(progress, None).await?;
917 /// }
918 /// ```
919 ///
920 /// # Notes
921 /// - Only affects jobs that are currently running
922 /// - Progress values outside 0-99 range are clamped by the server
923 /// - Progress cannot decrease
924 /// - For flows, updates the progress of both the step and parent flow
925 ///
926 /// # See Also
927 /// - [Flow Progress Tracking](https://www.windmill.dev/docs/advanced/explicit_progress)
928 /// - [`Self::get_progress`] - For reading job progress
929 pub fn set_progress<'a>(
930 &'a self,
931 value: u32,
932 job_id: Option<String>,
933 ) -> MaybeFuture<'a, Result<(), SdkError>> {
934 let f = async move {
935 let job_id = job_id.unwrap_or(var("WM_JOB_ID")?);
936 let job = job_api::get_job(
937 &self.client_config,
938 &self.workspace,
939 &job_id,
940 Some(true),
941 Some(true),
942 )
943 .await?;
944
945 let flow_id = match job {
946 windmill_api::models::Job::JobOneOf(job) => job.parent_job,
947 windmill_api::models::Job::JobOneOf1(job) => job.parent_job,
948 };
949
950 windmill_api::apis::metrics_api::set_job_progress(
951 &self.client_config,
952 &self.workspace,
953 &job_id,
954 windmill_api::models::SetJobProgressRequest {
955 percent: Some(value as i32),
956 flow_job_id: flow_id,
957 },
958 )
959 .await?;
960 Ok(())
961 };
962 ret!(f);
963 }
964
965 /// Retrieves the current progress percentage of a Windmill job.
966 ///
967 /// This function queries the Windmill backend to get the execution progress (0-100%)
968 /// of either a specific job or the current job context.
969 ///
970 /// # Arguments
971 /// * `job_id` - Optional job UUID. If `None`, uses current job's ID from `WM_JOB_ID` env var
972 ///
973 /// # See Also
974 /// - [Flow Progress Tracking](https://www.windmill.dev/docs/advanced/explicit_progress)
975 /// - [`Self::set_progress`] - For updating job progress
976 pub fn get_progress<'a>(
977 &'a self,
978 job_id: Option<String>,
979 ) -> MaybeFuture<'a, Result<u32, SdkError>> {
980 let f = async move {
981 let job_id = job_id.unwrap_or(var("WM_JOB_ID")?);
982 Ok(windmill_api::apis::metrics_api::get_job_progress(
983 &self.client_config,
984 &self.workspace,
985 &job_id,
986 )
987 .await
988 .map(|v| v as u32)?)
989 };
990 ret!(f);
991 }
992
993 /// Executes an API call in either asynchronous or synchronous mode based on compilation context.
994 ///
995 /// This function serves as a bridge between async and sync code, automatically adapting its behavior:
996 /// - With `async` feature: Returns a boxed future for later await
997 /// - Without `async` feature: Blocks immediately using the global runtime
998 ///
999 /// # Examples
1000 ///
1001 /// ## Async usage (with `async` feature)
1002 /// ```ignore
1003 /// use wmill::Windmill;
1004 /// # #[tokio::main]
1005 /// # async fn main() -> anyhow::Result<()>{
1006 /// let wm = Windmill::default()?;
1007 /// let user = wm.call_api(wmill::apis::admin_api::get_user(&wm.client_config, &wm.workspace, "Bob"))?;
1008 /// println!("User details: {:?}", user);
1009 /// # Ok(())
1010 /// # }
1011 /// ```
1012 ///
1013 /// ## Sync usage (without `async` feature)
1014 /// ```ignore
1015 /// use wmill::Windmill;
1016 /// let wm = Windmill::new(Some("<TOKEN>".into()), Some("admins".into()), Some("http://localhost:5000/api".into()))?;
1017 /// let user = wm.call_api(wmill::apis::admin_api::get_user(&wm.client_config, &wm.workspace, "Bob"));
1018 /// println!("User details: {:?}", user);
1019 /// # Ok::<(), wmill::SdkError>(())
1020 /// ```
1021 pub fn call_api<'a, T>(
1022 &'a self,
1023 callback: impl Future<Output = T> + std::marker::Send + 'a,
1024 ) -> MaybeFuture<'a, T> {
1025 ret!(callback);
1026 }
1027
1028 // pub fn get_version() {}
1029}
1030
/// Lifecycle stage of a Windmill job as reported by `Windmill::get_job_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JobStatus {
    /// The job is queued and marked as running.
    Running,
    /// The job is queued but not marked as running.
    Waiting,
    /// The job has finished, whether it succeeded or failed.
    Completed,
}
1036
1037fn get_state_path() -> Result<String, SdkError> {
1038 Ok(var("WM_STATE_PATH_NEW").unwrap_or(var("WM_STATE_PATH")?))
1039}
1040
1041#[derive(thiserror::Error, Debug)]
1042pub enum SdkError {
1043 #[error("Error: {0}")]
1044 Serde(#[from] serde_json::Error),
1045 #[error("Having troubles reading Env Variable: {0}")]
1046 VarError(#[from] std::env::VarError),
1047 #[error("Api error: {0}")]
1048 ApiError(String),
1049 #[error("Internal Error: {0}")]
1050 InternalError(String),
1051 #[error("Specified value is incorrect: {0}")]
1052 BadValue(String),
1053 #[error("{0}")]
1054 ExecutionError(String),
1055}
1056impl<T: for<'a> Deserialize<'a>> From<windmill_api::apis::Error<T>> for SdkError {
1057 fn from(value: windmill_api::apis::Error<T>) -> Self {
1058 Self::ApiError(value.to_string())
1059 }
1060}