wmill/client.rs
1#[cfg(feature = "async")]
2use futures::FutureExt;
3use windmill_api::{
4 apis::{
5 configuration::Configuration,
6 job_api,
7 resource_api::{self, get_resource_value_interpolated},
8 variable_api,
9 },
10 models::{EditResource, GetCompletedJobResultMaybe200Response},
11};
12
13use serde::Deserialize;
14use serde_json::{Value, json};
15use std::{
16 env::{self, var},
17 fmt::Debug,
18 time::{Duration, Instant},
19};
20
21use crate::{maybe_future::maybe_future::MaybeFuture, ret};
22
23#[derive(Debug, Clone)]
24pub struct Windmill {
25 pub workspace: String,
26 pub token: String,
27 pub base_internal_url: String,
28 pub client_config: Configuration,
29}
30
31impl Windmill {
32 /// Creates a new `Windmill` instance with default configuration.
33 ///
34 /// Reads configuration from environment variables:
35 /// - `WM_TOKEN` for authentication token
36 /// - `WM_WORKSPACE` for workspace name
37 /// - `BASE_INTERNAL_URL` for API base URL (appends `/api` automatically)
38 ///
39 /// # Errors
40 /// Returns `SdkError` if:
41 /// - Required environment variables are missing
42 /// - Environment variables cannot be read
43 pub fn default() -> Result<Self, SdkError> {
44 Windmill::new(None, None, None)
45 }
46
47 /// Creates a new `Windmill` instance with optional overrides.
48 ///
49 /// Falls back to environment variables for any `None` parameters:
50 /// - `WM_TOKEN` if `token` is `None`
51 /// - `WM_WORKSPACE` if `workspace` is `None`
52 /// - `BASE_INTERNAL_URL` + "/api" if `base_internal_url` is `None`
53 ///
54 /// # Parameters
55 /// - `token`: Optional bearer token override
56 /// - `workspace`: Optional workspace name override
57 /// - `base_internal_url`: Optional base URL override (without `/api` suffix)
58 ///
59 /// # Errors
60 /// Returns `SdkError` if:
61 /// - Required environment variables are missing when needed
62 /// - Environment variables cannot be read
63 pub fn new(
64 token: Option<String>,
65 workspace: Option<String>,
66 base_internal_url: Option<String>,
67 ) -> Result<Self, SdkError> {
68 use env::var;
69 let (token, base_internal_url, workspace) = (
70 token.or(var("WM_TOKEN").ok()).ok_or(SdkError::BadValue(
71 "WM_TOKEN is not set nor was provided in constructor".to_owned(),
72 ))?,
73 base_internal_url
74 .or(var("BASE_INTERNAL_URL").ok())
75 .ok_or(SdkError::BadValue(
76 "BASE_INTERNAL_URL is not set nor was provided in constructor".to_owned(),
77 ))?
78 + "/api",
79 workspace
80 .or(var("WM_WORKSPACE").ok())
81 .ok_or(SdkError::BadValue(
82 "WM_WORKSPACE is not set nor was provided in constructor".to_owned(),
83 ))?,
84 );
85
86 Ok(Windmill {
87 client_config: Configuration {
88 // TODO: client: reqwest::blocking::Client::new(), // Use blocking client?
89 base_path: base_internal_url.clone(),
90 bearer_access_token: Some(token.clone()),
91 ..Default::default()
92 },
93 workspace,
94 token,
95 base_internal_url,
96 })
97 }
98
99 /// Retrieves a variable from Windmill, automatically parsing it as JSON/YAML.
100 ///
101 /// This is the convenience version that attempts to parse the variable value as:
102 /// 1. JSON (primary attempt)
103 /// 2. YAML (fallback if JSON parsing fails)
104 /// 3. Raw string (final fallback if both parsings fail)
105 ///
106 /// For better performance when you know the format or don't need parsing,
107 /// use [`Self::get_variable_raw`] instead.
108 ///
109 /// # Arguments
110 /// * `path` - Variable path (e.g., "u/user/variable_name")
111 ///
112 /// # Example
113 /// ```no_run
114 ///
115 /// use wmill::Windmill;
116 /// use serde_json::json;
117 ///
118 /// let wm = Windmill::default()?;
119 ///
120 /// // For a variable containing JSON: {"key": "value"}
121 /// let json_var = wm.get_variable("u/admin/config")?;
122 ///
123 /// // For a variable containing plain text
124 /// let text_var = wm.get_variable("u/user/plaintext_note")?;
125 ///
126 /// # Ok::<(), wmill::SdkError>(())
127 /// ```
128 ///
129 /// See also: [`Self::get_variable_raw`] for the unparsed version
130 pub fn get_variable<'a>(&'a self, path: &'a str) -> MaybeFuture<'a, Result<Value, SdkError>> {
131 ret!(self.get_variable_inner(path));
132 }
133
134 async fn get_variable_inner<'a>(&'a self, path: &'a str) -> Result<Value, SdkError> {
135 let raw = self.get_variable_raw_inner(path).await?;
136 Ok(serde_json::from_str(&raw)
137 .or(serde_yaml::from_str(&raw))
138 .unwrap_or(json!(raw)))
139 }
140
141 /// This is the **faster version** when:
142 /// - You know the variable contains plain text
143 /// - You want to handle parsing yourself
144 /// - You need maximum performance
145 ///
146 /// Performance benefit comes from avoiding:
147 /// 1. JSON parsing attempt
148 /// 2. YAML parsing fallback
149 ///
150 /// # Arguments
151 /// * `path` - Variable path (e.g., "u/user/variable_name")
152 ///
153 /// # Example
154 /// ```no_run
155 /// use wmill::Windmill;
156 /// use serde_json::{json, Value};
157 ///
158 /// let wm = Windmill::default()?;
159 ///
160 /// // When you need the raw content
161 /// let raw_content = wm.get_variable_raw("u/user/custom_format")?;
162 ///
163 /// // When you know it's JSON and want to parse it differently
164 /// let json_value: Value = serde_json::from_str(
165 /// &wm.get_variable_raw("u/admin/config")?
166 /// )?;
167 ///
168 /// # Ok::<(), wmill::SdkError>(())
169 /// ```
170 ///
171 /// See also: [`Self::get_variable`] for the auto-parsed version
172 pub fn get_variable_raw<'a>(
173 &'a self,
174 path: &'a str,
175 ) -> MaybeFuture<'a, Result<String, SdkError>> {
176 ret!(self.get_variable_raw_inner(path));
177 }
178
179 async fn get_variable_raw_inner<'a>(&'a self, path: &'a str) -> Result<String, SdkError> {
180 let v =
181 variable_api::get_variable_value(&self.client_config, &self.workspace, path).await?;
182
183 v.get(1..v.len() - 1)
184 .ok_or(SdkError::InternalError(
185 "returned value is incorrect".into(),
186 ))
187 .map(|s| s.to_owned())
188 }
189
190 /// Creates or updates a variable in the workspace.
191 ///
192 /// This function provides atomic variable management that:
193 /// - Creates a new variable if it doesn't exist
194 /// - Updates an existing variable if found
195 /// - Handles both regular and secret variables
196 ///
197 /// # Parameters
198 /// - `value`: The variable value to set
199 /// - `path`: The variable path/identifier
200 /// - `is_secret`: Whether to store as a secret (encrypted) variable
201 ///
202 /// # Errors
203 /// Returns `SdkError` if:
204 /// - Variable fetch fails for reasons other than "not found"
205 /// - Variable creation fails
206 /// - Variable update fails
207 /// - Underlying API calls fail
208 ///
209 /// # Notes
210 /// - For new variables, defaults to empty description
211 /// - Updates only modify the value (preserving other metadata)
212 /// - Secret status can only be set during creation
213 pub fn set_variable<'a>(
214 &'a self,
215 value: String,
216 path: &'a str,
217 is_secret: bool,
218 ) -> MaybeFuture<'a, Result<(), SdkError>> {
219 ret!(self.set_variable_inner(value, path, is_secret));
220 }
221
222 async fn set_variable_inner<'a>(
223 &'a self,
224 value: String,
225 path: &'a str,
226 is_secret: bool,
227 ) -> Result<(), SdkError> {
228 let res =
229 variable_api::get_variable(&self.client_config, &self.workspace, path, None, None)
230 .await;
231
232 if res.is_err() {
233 variable_api::create_variable(
234 &self.client_config,
235 &self.workspace,
236 windmill_api::models::CreateVariable {
237 path: path.to_owned(),
238 value,
239 is_secret,
240 description: "".to_owned(),
241 account: None,
242 is_oauth: None,
243 expires_at: None,
244 },
245 None,
246 )
247 .await?;
248 } else {
249 variable_api::update_variable(
250 &self.client_config,
251 &self.workspace,
252 path,
253 windmill_api::models::EditVariable {
254 path: None,
255 value: Some(value),
256 is_secret: None,
257 description: None,
258 },
259 None,
260 )
261 .await?;
262 }
263 Ok(())
264 }
265
266 /// Fetches and deserializes a resource into a concrete type.
267 ///
268 /// This is the recommended way to access resources when you know the expected type.
269 /// For raw JSON access or dynamic typing, use [`Self::get_resource_any`] instead.
270 ///
271 /// # Type Parameters
272 /// * `T` - Any type implementing `DeserializeOwned` (most structs with `#[derive(Deserialize)]`)
273 ///
274 /// # Arguments
275 /// * `path` - The resource path (e.g., "u/user/resource_name")
276 ///
277 /// # Example
278 /// ```no_run
279 /// use wmill::Windmill;
280 /// use serde_json::{json, Value};
281 /// use serde::Deserialize;
282 ///
283 /// #[derive(Deserialize)]
284 /// struct DbConfig {
285 /// url: String,
286 /// pool_size: Option<u32>,
287 /// }
288 ///
289 /// let wm = Windmill::default()?;
290 ///
291 /// // Directly deserialize to your type
292 /// let config: DbConfig = wm.get_resource("u/admin/db_connection")?;
293 /// # Ok::<(), wmill::SdkError>(())
294 /// ```
295 ///
296 /// See also: [`Self::get_resource_any`] for untyped version
297 pub fn get_resource<'a, T: serde::de::DeserializeOwned>(
298 &'a self,
299 path: &'a str,
300 ) -> MaybeFuture<'a, Result<T, SdkError>> {
301 ret!(self.get_resource_inner(path));
302 }
303
304 async fn get_resource_inner<'a, T: serde::de::DeserializeOwned>(
305 &'a self,
306 path: &'a str,
307 ) -> Result<T, SdkError> {
308 Ok(serde_json::from_value(
309 self.get_resource_any_inner(path).await?,
310 )?)
311 }
312
313 /// Fetches a raw JSON [`Value`] from Windmill by path.
314 ///
315 /// Use this when you need the raw JSON structure or don't have a concrete type to deserialize into.
316 /// For typed deserialization, prefer [`Self::get_resource`] instead.
317 ///
318 /// # Arguments
319 /// * `path` - The resource path (e.g., "u/user/resource_name")
320 ///
321 /// # Example
322 /// ```no_run
323 /// use wmill::Windmill;
324 ///
325 /// let wm = Windmill::default()?;
326 ///
327 /// // When you need to inspect the raw structure first
328 /// let json = wm.get_resource_any("u/admin/db_connection")?;
329 ///
330 /// println!("Url is: {}", json["url"]);
331 ///
332 /// # Ok::<(), wmill::SdkError>(())
333 /// ```
334 ///
335 /// See also: [`Self::get_resource`] for typed version
336 pub fn get_resource_any<'a>(
337 &'a self,
338 path: &'a str,
339 ) -> MaybeFuture<'a, Result<Value, SdkError>> {
340 ret!(self.get_resource_any_inner(path));
341 }
342
343 async fn get_resource_any_inner<'a>(&'a self, path: &'a str) -> Result<Value, SdkError> {
344 Ok(
345 get_resource_value_interpolated(&self.client_config, &self.workspace, path, None)
346 .await?,
347 )
348 }
349
350 /// Creates or updates a resource in Windmill.
351 ///
352 /// This function sets a resource's value at the specified path, creating it if it doesn't exist
353 /// or updating it if it does. The resource will be of the specified type.
354 ///
355 /// # Arguments
356 /// * `value` - The value to set for the resource. Use `None` to create an empty resource.
357 /// * `path` - The ownership path of the resource (e.g., "u/user/resource_name").
358 /// Defines permissions based on Windmill's path prefix system.
359 /// * `resource_type` - The type of resource to create (e.g., "postgresql", "smtp").
360 /// Must be a pre-existing resource type.
361 ///
362 /// # Examples
363 /// ```no_run
364 /// use wmill::Windmill;
365 ///
366 /// # fn main() -> anyhow::Result<()> {
367 /// let wm = Windmill::default()?;
368 /// wm.set_resource(
369 /// Some(serde_json::json!({"host": "localhost", "port": 5432})),
370 /// "u/admin/database",
371 /// "postgresql"
372 /// )?;
373 /// # Ok(())
374 /// # }
375 /// ```
376 pub fn set_resource<'a>(
377 &'a self,
378 value: Option<Value>,
379 path: &'a str,
380 resource_type: &'a str,
381 ) -> MaybeFuture<'a, Result<(), SdkError>> {
382 ret!(self.set_resource_inner(value, path, resource_type));
383 }
384
385 async fn set_resource_inner<'a>(
386 &'a self,
387 value: Option<Value>,
388 path: &'a str,
389 resource_type: &'a str,
390 ) -> Result<(), SdkError> {
391 // if resource_api::get_resource(&self.client_config, &self.workspace, path)
392 // .await
393 // .is_err()
394 if !resource_api::exists_resource(&self.client_config, &self.workspace, path).await? {
395 resource_api::create_resource(
396 &self.client_config,
397 &self.workspace,
398 windmill_api::models::CreateResource {
399 path: path.to_owned(),
400 value,
401 description: None,
402 resource_type: resource_type.to_owned(),
403 // resource_type: "any".to_owned(),
404 },
405 // Some(true),
406 None,
407 )
408 .await?;
409 } else {
410 resource_api::update_resource(
411 &self.client_config,
412 &self.workspace,
413 path,
414 EditResource {
415 path: None,
416 description: None,
417 value: Some(value),
418 },
419 )
420 .await?;
421 }
422 Ok(())
423 }
424
425 /// Retrieves and deserializes the current typed state value for a script's execution context.
426 ///
427 /// This is the typed version of [`Self::get_state_any`], automatically deserializing the state
428 /// into the specified type `T` that implements [`serde::de::DeserializeOwned`].
429 ///
430 /// # Type Parameter
431 /// * `T` - The type to deserialize into (must implement `serde::de::DeserializeOwned`)
432 ///
433 /// # Examples
434 /// ```no_run
435 /// use wmill::Windmill;
436 /// use serde_json::{json, Value};
437 /// use serde::Deserialize;
438 ///
439 /// #[derive(Deserialize)]
440 /// struct ScriptState {
441 /// counter: i32,
442 /// last_run: String,
443 /// }
444 ///
445 /// let wm = Windmill::default()?;
446 ///
447 /// // Get typed state
448 /// let state: ScriptState = wm.get_state()?;
449 ///
450 /// println!("Counter: {}, Last run: {}", state.counter, state.last_run);
451 ///
452 /// # Ok::<(), wmill::SdkError>(())
453 /// ```
454 ///
455 /// # Behavior Details
456 /// - Uses same state path resolution as [`Self::get_state_any`] (`WM_STATE_PATH_NEW` → `WM_STATE_PATH` fallback)
457 /// - Performs runtime type checking during deserialization
458 ///
459 /// # When to Use vs [`Self::get_state_any`]
460 /// | Use Case | `get_state<T>` | `get_state_any` |
461 /// |------------------------|-------------------------|------------------------|
462 /// | Known state structure | ✅ Preferred | ⚠️ Requires manual parsing |
463 /// | Dynamic state | ❌ Won't compile | ✅ Works |
464 /// | Type safety | ✅ Compile-time checks | ❌ Runtime checks only |
465 ///
466 /// # Notes
467 /// - For complex types, derive `Deserialize` using Serde attributes
468 /// - Prefer this over [`Self::get_state_any`] when state schema is stable
469 /// - State modifications should use corresponding [`Self::set_state`] with matching type
470 ///
471 /// # See Also
472 /// - [`Self::get_state_any`] - Untyped state access
473 /// - [`Self::set_state`] - For updating typed states
474 /// - [Windmill State Management](https://www.windmill.dev/docs/core_concepts/persistent_storage/within_windmill#states)
475 pub fn get_state<'a, T: serde::de::DeserializeOwned>(
476 &'a self,
477 ) -> MaybeFuture<'a, Result<T, SdkError>> {
478 ret!(self.get_resource_inner(&get_state_path()?));
479 }
480
481 /// Retrieves the current state value for a script's execution context.
482 ///
483 /// States persist data between runs of the same script by the same trigger (schedule or user).
484 /// This is the untyped version that returns a generic [`Value`].
485 ///
486 /// # Examples
487 /// ```no_run
488 /// use wmill::Windmill;
489 /// use serde_json::Value;
490 /// use serde::Deserialize;
491 ///
492 /// let wm = Windmill::default()?;
493 ///
494 /// // Get state (returns serde_json::Value)
495 /// let state: Value = wm.get_state_any()?;
496 ///
497 /// // Use with default if empty
498 /// let counter = state.as_i64().unwrap_or(0);
499 /// # Ok::<(), wmill::SdkError>(())
500 /// ```
501 ///
502 /// # Behavior Details
503 /// - Automatically uses the script's state path from `WM_STATE_PATH_NEW` (falls back to `WM_STATE_PATH`)
504 /// - Returns the full state object stored as a Windmill resource
505 /// - State resources are hidden from Workspace view but visible under Resources → States
506 ///
507 /// # Typical Use Cases
508 /// 1. Maintaining counters between runs
509 /// 2. Storing last execution timestamps
510 /// 3. Keeping reference data (like previous API responses)
511 ///
512 /// # Notes
513 /// - For typed state access, use `get_state<T>` instead
514 /// - States are isolated per script and trigger combination
515 /// - Maximum state size: 5MB (compressed)
516 ///
517 /// # See Also
518 /// - [`Self::set_state`] - For updating the state
519 /// - [Windmill State Management](https://www.windmill.dev/docs/core_concepts/persistent_storage/within_windmill#states)
520 pub fn get_state_any<'a>(&'a self) -> MaybeFuture<'a, Result<Value, SdkError>> {
521 ret!(self.get_resource_any_inner(&get_state_path()?));
522 }
523
524 /// Updates or clears the script's persistent state.
525 ///
526 /// # Arguments
527 /// * `value` - New state value (`Some(Value)`) or `None` to clear state
528 ///
529 /// # Examples
530 /// ```no_run
531 /// use wmill::Windmill;
532 /// use serde_json::json;
533 ///
534 /// # async fn example() -> anyhow::Result<()> {
535 /// let wm = Windmill::default()?;
536 ///
537 /// // Set state
538 /// wm.set_state(Some(json!({"count": 42})))?;
539 ///
540 /// // Clear state
541 /// wm.set_state(None)?;
542 /// # Ok(())
543 /// # }
544 /// ```
545 ///
546 /// See also: [`Self::get_state`], [`Self::get_state_any`]
547 pub fn set_state<'a>(&'a self, value: Option<Value>) -> MaybeFuture<'a, Result<(), SdkError>> {
548 ret!(self.set_resource_inner(value, &get_state_path()?, "state"));
549 }
550
551 /// Executes a script synchronously and waits for its completion.
552 ///
553 /// This is a blocking version of `run_script_async` that handles the entire script execution
554 /// lifecycle including job scheduling and result waiting.
555 ///
556 /// # Parameters
557 /// - `ident`: Script identifier (either path or hash)
558 /// - `ident_is_hash`: Whether the identifier is a hash (true) or path (false)
559 /// - `args`: JSON arguments to pass to the script
560 /// - `scheduled_in_secs`: Optional delay before execution (in seconds)
561 /// - `timeout_secs`: Maximum time to wait for job completion (in seconds)
562 /// - `verbose`: Whether to print execution details
563 /// - `assert_result_is_not_none`: Whether to fail if the result is None
564 ///
565 /// # Errors
566 /// Returns `SdkError` if:
567 /// - Script fails to start
568 /// - Job times out
569 /// - Result assertion fails
570 /// - Underlying API calls fail
571 pub fn run_script_sync<'a>(
572 &'a self,
573 ident: &'a str,
574 ident_is_hash: bool,
575 args: Value,
576 scheduled_in_secs: Option<u32>,
577 timeout_secs: Option<u64>,
578 verbose: bool,
579 assert_result_is_not_none: bool,
580 ) -> MaybeFuture<'a, Result<Value, SdkError>> {
581 if verbose {
582 println!("running `{ident}` synchronously with {:?}", &args);
583 }
584
585 ret!(async move {
586 let job_id = self
587 .run_script_async_inner(ident, ident_is_hash, args, scheduled_in_secs)
588 .await?;
589 self.wait_job_inner(
590 &job_id.to_string(),
591 timeout_secs,
592 verbose,
593 assert_result_is_not_none,
594 )
595 .await
596 });
597 }
598
599 /// Asynchronously executes a script in Windmill and returns its job UUID.
600 ///
601 /// This function runs a script either by path or by hash, with optional:
602 /// - Parent job inheritance (when run within a flow)
603 /// - Scheduled execution delay
604 /// - Argument passing
605 ///
606 /// # Arguments
607 /// * `ident` - Script identifier (path or hash depending on `ident_is_hash`)
608 /// * `ident_is_hash` - If true, `ident` is treated as a script hash; if false, as a path
609 /// * `args` - JSON arguments to pass to the script (must be an object if using path)
610 /// * `scheduled_in_secs` - Optional delay (in seconds) before execution
611 ///
612 /// # Examples
613 /// ```no_run
614 /// use wmill::Windmill;
615 /// use serde_json::json;
616 ///
617 /// let wm = Windmill::default()?;
618 /// let job_id = wm.run_script_async(
619 /// "u/user/script_path",
620 /// false,
621 /// json!({"param1": "value1"}),
622 /// Some(10) // Run after 10 seconds
623 /// )?;
624 ///
625 /// # Ok::<(), wmill::SdkError>(())
626 /// ```
627 ///
628 /// # Behavior Details
629 /// - **Automatic Job Inheritance**:
630 /// - Detects `WM_JOB_ID` env var → sets as `parent_job`
631 /// - Detects `WM_ROOT_FLOW_JOB_ID` env var → sets as `root_job`
632 /// - **Scheduled Execution**:
633 /// - When `scheduled_in_secs` is provided, sets `scheduled_in_secs` in args
634 /// - **Argument Handling**:
635 /// - For path-based execution (`ident_is_hash=false`), args must be a JSON object
636 /// - For hash-based execution, args can be any valid JSON value
637 ///
638 /// # Errors
639 /// - [`SdkError::BadValue`] if path-based execution receives non-object arguments
640 /// - API errors from Windmill's backend
641 pub fn run_script_async<'a>(
642 &'a self,
643 ident: &'a str,
644 ident_is_hash: bool,
645 args: Value,
646 scheduled_in_secs: Option<u32>,
647 ) -> MaybeFuture<'a, Result<uuid::Uuid, SdkError>> {
648 ret!(self.run_script_async_inner(ident, ident_is_hash, args, scheduled_in_secs));
649 }
650
651 async fn run_script_async_inner<'a>(
652 &'a self,
653 ident: &'a str,
654 ident_is_hash: bool,
655 mut args: Value,
656 scheduled_in_secs: Option<u32>,
657 ) -> Result<uuid::Uuid, SdkError> {
658 if let Ok(parent_job) = var("WM_JOB_ID") {
659 args["parent_job"] = json!(parent_job);
660 }
661
662 if let Ok(root_job) = var("WM_ROOT_FLOW_JOB_ID") {
663 args["root_job"] = json!(root_job);
664 }
665
666 if let Some(scheduled_in_secs) = scheduled_in_secs {
667 args["scheduled_in_secs"] = json!(scheduled_in_secs);
668 }
669
670 let uuid = if ident_is_hash {
671 job_api::run_script_by_hash(
672 &self.client_config,
673 &self.workspace,
674 ident,
675 args,
676 None,
677 None,
678 None,
679 None,
680 None,
681 None,
682 None,
683 None,
684 None,
685 )
686 .await?
687 } else {
688 job_api::run_script_by_path(
689 &self.client_config,
690 &self.workspace,
691 ident,
692 args.as_object()
693 .ok_or(SdkError::BadValue(format!(
694 "Args should be Object, but it is: {}",
695 args
696 )))?
697 .clone()
698 .into_iter()
699 .collect(),
700 None,
701 None,
702 None,
703 None,
704 None,
705 None,
706 None,
707 None,
708 )
709 .await?
710 };
711 Ok(uuid)
712 }
713
714 /// Waits for a job to complete and returns its result.
715 ///
716 /// This function provides both synchronous and asynchronous interfaces for waiting
717 /// on job completion, with timeout handling and result validation.
718 ///
719 /// # Parameters
720 /// - `job_id`: The ID of the job to wait for
721 /// - `timeout_secs`: Maximum time to wait (in seconds) before cancelling the job
722 /// - `verbose`: Whether to print progress information
723 /// - `assert_result_is_not_none`: Whether to fail if the job returns no result
724 ///
725 /// # Errors
726 /// Returns `SdkError` if:
727 /// - Job fails or times out
728 /// - Result assertion fails when `assert_result_is_not_none` is true
729 /// - Underlying API calls fail
730 ///
731 /// # Behavior
732 /// 1. Polls job status at 500ms intervals
733 /// 2. Cancels job if timeout is reached
734 /// 3. Validates success status and optional result presence
735 /// 4. Returns either the result or appropriate error
736 pub fn wait_job<'a>(
737 &'a self,
738 job_id: &'a str,
739 timeout_secs: Option<u64>,
740 verbose: bool,
741 assert_result_is_not_none: bool,
742 ) -> MaybeFuture<'a, Result<Value, SdkError>> {
743 ret!(self.wait_job_inner(job_id, timeout_secs, verbose, assert_result_is_not_none));
744 }
745
746 async fn wait_job_inner<'a>(
747 &'a self,
748 job_id: &'a str,
749 timeout_secs: Option<u64>,
750 verbose: bool,
751 assert_result_is_not_none: bool,
752 ) -> Result<Value, SdkError> {
753 let start = Instant::now();
754 loop {
755 if let Some(timeout) = timeout_secs {
756 if start.elapsed().as_secs() > timeout {
757 println!("WARN: reached timeout for {job_id}. Cancelling the job.");
758 job_api::cancel_queued_job(
759 &self.client_config,
760 &self.workspace,
761 job_id,
762 windmill_api::models::CancelQueuedJobRequest {
763 reason: Some("reached timeout".into()),
764 },
765 )
766 .await?;
767 }
768 }
769 let GetCompletedJobResultMaybe200Response {
770 completed,
771 result,
772 success,
773 started,
774 } = job_api::get_completed_job_result_maybe(
775 &self.client_config,
776 &self.workspace,
777 job_id,
778 Some(true),
779 )
780 .await?;
781
782 if matches!(started, Some(false)) && verbose {
783 println!("INFO: job {job_id} has not started yet");
784 }
785
786 if completed {
787 if matches!(success, Some(true)) {
788 if result.is_none() && assert_result_is_not_none {
789 return Err(SdkError::ExecutionError("Result was None".into()));
790 }
791 return Ok(result.unwrap_or_default());
792 } else {
793 return Err(SdkError::ExecutionError(format!(
794 "Job {job_id} was not successful: {:?}",
795 result
796 )));
797 }
798 }
799 if verbose {
800 println!("INFO: sleeping 0.5 seconds for {job_id}");
801 }
802 tokio::time::sleep(Duration::from_millis(500)).await;
803 }
804 }
805
806 /// Retrieves the current status of a Windmill job by its UUID.
807 ///
808 /// This function queries the Windmill backend to determine whether a job is:
809 /// - Waiting to be executed
810 /// - Currently running
811 /// - Already completed
812 ///
813 /// # Arguments
814 /// * `job_id` - The UUID of the job to check (format: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
815 ///
816 /// # Job Lifecycle States
817 /// - **Waiting**: Job is queued but hasn't started execution
818 /// - **Running**: Job is currently being executed
819 /// - **Completed**: Job has finished (successfully or with errors)
820 pub fn get_job_status<'a>(
821 &'a self,
822 job_id: &'a str,
823 ) -> MaybeFuture<'a, Result<JobStatus, SdkError>> {
824 ret!(self.get_job_status_inner(job_id));
825 }
826
827 async fn get_job_status_inner<'a>(&'a self, job_id: &'a str) -> Result<JobStatus, SdkError> {
828 let job =
829 job_api::get_job(&self.client_config, &self.workspace, job_id, Some(true)).await?;
830
831 Ok(match job {
832 windmill_api::models::Job::JobOneOf(..) => JobStatus::Completed,
833 windmill_api::models::Job::JobOneOf1(job_one_of1) => {
834 if job_one_of1.running {
835 JobStatus::Running
836 } else {
837 JobStatus::Waiting
838 }
839 }
840 })
841 }
842
843 /// Retrieves the result of a completed job.
844 ///
845 /// This provides both synchronous and asynchronous interfaces for fetching
846 /// the final result of a successfully completed job.
847 ///
848 /// # Parameters
849 /// - `job_id`: The ID of the completed job to fetch results for
850 ///
851 /// # Errors
852 /// Returns `SdkError` if:
853 /// - Job is not found
854 /// - Job failed to complete successfully
855 /// - Underlying API calls fail
856 /// - Result cannot be parsed
857 ///
858 /// # Notes
859 /// - Only works for jobs that have already completed
860 /// - For pending/running jobs, use `wait_job` instead
861 /// - Does not handle timeouts or polling - assumes job is already complete
862 pub fn get_result<'a>(&'a self, job_id: &'a str) -> MaybeFuture<'a, Result<Value, SdkError>> {
863 ret!(self.get_result_inner(job_id));
864 }
865
866 async fn get_result_inner<'a>(&'a self, job_id: &'a str) -> Result<Value, SdkError> {
867 Ok(job_api::get_completed_job_result(
868 &self.client_config,
869 &self.workspace,
870 job_id,
871 None,
872 None,
873 None,
874 None,
875 )
876 .await?)
877 }
878
879 /// Updates the progress percentage of a running Windmill job.
880 ///
881 /// This function allows scripts to report their execution progress (0-100%) back to the Windmill UI.
882 /// Progress updates are visible in both the jobs dashboard and flow visualizations.
883 ///
884 /// # Arguments
885 /// * `value` - Progress percentage (0-100)
886 /// * `job_id` - Optional job UUID. If None, uses current job's ID from `WM_JOB_ID` environment variable
887 ///
888 /// # Examples
889 /// ```no_run
890 /// use wmill::Windmill;
891 /// # fn main () -> anyhow::Result<()>{
892 /// let wm = Windmill::default()?;
893 ///
894 /// // Report progress for current job
895 /// wm.set_progress(25, None)?;
896 /// # Ok(())
897 /// # }
898 ///
899 /// ```
900 ///
901 /// # Behavior Details
902 /// - Automatically handles flow context by detecting parent job ID
903 /// - Progress updates are reflected in real-time in the Windmill UI
904 /// - Typical usage pattern:
905 /// ```ignore
906 /// for (i, item) in items.iter().enumerate() {
907 /// process(item);
908 /// let progress = ((i + 1) * 100 / items.len()) as i32;
909 /// wmill.set_progress(progress, None).await?;
910 /// }
911 /// ```
912 ///
913 /// # Notes
914 /// - Only affects jobs that are currently running
915 /// - Progress values outside 0-99 range are clamped by the server
916 /// - Progress cannot decrease
917 /// - For flows, updates the progress of both the step and parent flow
918 ///
919 /// # See Also
920 /// - [Flow Progress Tracking](https://www.windmill.dev/docs/advanced/explicit_progress)
921 /// - [`Self::get_progress`] - For reading job progress
922 pub fn set_progress<'a>(
923 &'a self,
924 value: u32,
925 job_id: Option<String>,
926 ) -> MaybeFuture<'a, Result<(), SdkError>> {
927 let f = async move {
928 let job_id = job_id.unwrap_or(var("WM_JOB_ID")?);
929 let job =
930 job_api::get_job(&self.client_config, &self.workspace, &job_id, Some(true)).await?;
931
932 let flow_id = match job {
933 windmill_api::models::Job::JobOneOf(job) => job.parent_job,
934 windmill_api::models::Job::JobOneOf1(job) => job.parent_job,
935 };
936
937 windmill_api::apis::metrics_api::set_job_progress(
938 &self.client_config,
939 &self.workspace,
940 &job_id,
941 windmill_api::models::SetJobProgressRequest {
942 percent: Some(value as i32),
943 flow_job_id: flow_id,
944 },
945 )
946 .await?;
947 Ok(())
948 };
949 ret!(f);
950 }
951
952 /// Retrieves the current progress percentage of a Windmill job.
953 ///
954 /// This function queries the Windmill backend to get the execution progress (0-100%)
955 /// of either a specific job or the current job context.
956 ///
957 /// # Arguments
958 /// * `job_id` - Optional job UUID. If `None`, uses current job's ID from `WM_JOB_ID` env var
959 ///
960 /// # See Also
961 /// - [Flow Progress Tracking](https://www.windmill.dev/docs/advanced/explicit_progress)
962 /// - [`Self::set_progress`] - For updating job progress
963 pub fn get_progress<'a>(
964 &'a self,
965 job_id: Option<String>,
966 ) -> MaybeFuture<'a, Result<u32, SdkError>> {
967 let f = async move {
968 let job_id = job_id.unwrap_or(var("WM_JOB_ID")?);
969 Ok(windmill_api::apis::metrics_api::get_job_progress(
970 &self.client_config,
971 &self.workspace,
972 &job_id,
973 )
974 .await
975 .map(|v| v as u32)?)
976 };
977 ret!(f);
978 }
979
980 /// Executes an API call in either asynchronous or synchronous mode based on compilation context.
981 ///
982 /// This function serves as a bridge between async and sync code, automatically adapting its behavior:
983 /// - With `async` feature: Returns a boxed future for later await
984 /// - Without `async` feature: Blocks immediately using the global runtime
985 ///
986 /// # Examples
987 ///
988 /// ## Async usage (with `async` feature)
989 /// ```ignore
990 /// use wmill::Windmill;
991 /// # #[tokio::main]
992 /// # async fn main() -> anyhow::Result<()>{
993 /// let wm = Windmill::default()?;
994 /// let user = wm.call_api(wmill::apis::admin_api::get_user(&wm.client_config, &wm.workspace, "Bob"))?;
995 /// println!("User details: {:?}", user);
996 /// # Ok(())
997 /// # }
998 /// ```
999 ///
1000 /// ## Sync usage (without `async` feature)
1001 /// ```ignore
1002 /// use wmill::Windmill;
1003 /// let wm = Windmill::new(Some("<TOKEN>".into()), Some("admins".into()), Some("http://localhost:5000/api".into()))?;
1004 /// let user = wm.call_api(wmill::apis::admin_api::get_user(&wm.client_config, &wm.workspace, "Bob"));
1005 /// println!("User details: {:?}", user);
1006 /// # Ok::<(), wmill::SdkError>(())
1007 /// ```
1008 pub fn call_api<'a, T>(
1009 &'a self,
1010 callback: impl Future<Output = T> + std::marker::Send + 'a,
1011 ) -> MaybeFuture<'a, T> {
1012 ret!(callback);
1013 }
1014
1015 // pub fn get_version() {}
1016}
1017
1018pub enum JobStatus {
1019 Running,
1020 Waiting,
1021 Completed,
1022}
1023
1024fn get_state_path() -> Result<String, SdkError> {
1025 Ok(var("WM_STATE_PATH_NEW").unwrap_or(var("WM_STATE_PATH")?))
1026}
1027
1028#[derive(thiserror::Error, Debug)]
1029pub enum SdkError {
1030 #[error("Error: {0}")]
1031 Serde(#[from] serde_json::Error),
1032 #[error("Having troubles reading Env Variable: {0}")]
1033 VarError(#[from] std::env::VarError),
1034 #[error("Api error: {0}")]
1035 ApiError(String),
1036 #[error("Internal Error: {0}")]
1037 InternalError(String),
1038 #[error("Specified value is incorrect: {0}")]
1039 BadValue(String),
1040 #[error("{0}")]
1041 ExecutionError(String),
1042}
1043impl<T: for<'a> Deserialize<'a>> From<windmill_api::apis::Error<T>> for SdkError {
1044 fn from(value: windmill_api::apis::Error<T>) -> Self {
1045 Self::ApiError(value.to_string())
1046 }
1047}