tfc_toolset/
run.rs

1use crate::{
2    build_request,
3    error::{surf_to_tool_error, ToolError},
4    settings::Core,
5    workspace, BASE_URL,
6};
7use async_std::{
8    sync::{Arc, Mutex, RwLock},
9    task::{self, JoinHandle},
10};
11use log::{error, info};
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use std::{
15    fmt::{Display, Formatter},
16    thread,
17    time::Duration,
18};
19use surf::{http::Method, Client};
20use url::Url;
21
22// Statuses in Terraform Cloud that indicate a run is in a completed state
23pub const COMPLETED_STATUSES: [Status; 2] =
24    [Status::Applied, Status::PlannedAndFinished];
25
26pub const NO_APPLY_END_STATUSES: [Status; 2] =
27    [Status::Planned, Status::CostEstimated];
28
29// Statuses in Terraform Cloud that indicate a run is in an error state
30pub const ERROR_STATUSES: [Status; 8] = [
31    Status::Canceled,
32    Status::ForceCanceled,
33    Status::Errored,
34    Status::Discarded,
35    Status::Failed,
36    Status::Unreachable,
37    Status::Unknown,
38    Status::PolicySoftFailed,
39];
40
41// Statuses in Terraform Cloud
42#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
43#[serde(rename_all = "snake_case")]
44#[non_exhaustive]
45pub enum Status {
46    ApplyQueued,
47    Pending,
48    PlanQueued,
49    Queuing,
50    Queued,
51    Fetching,
52    CostEstimating,
53    CostEstimated,
54    FetchingCompleted,
55    PrePlanRunning,
56    PrePlanCompleted,
57    ManagedQueued,
58    Running,
59    Passed,
60    Failed,
61    Applying,
62    Planning,
63    Planned,
64    Applied,
65    Canceled,
66    Errored,
67    Discarded,
68    PlannedAndFinished,
69    PlannedAndSaved,
70    PolicyChecking,
71    PolicyOverride,
72    PolicySoftFailed,
73    Unreachable,
74    ForceCanceled,
75    #[default]
76    Unknown,
77    Finished,
78}
79
80impl Display for Status {
81    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82        match self {
83            Status::ApplyQueued => write!(f, "apply_queued"),
84            Status::Pending => write!(f, "pending"),
85            Status::PlanQueued => write!(f, "plan_queued"),
86            Status::Queuing => write!(f, "queuing"),
87            Status::Queued => write!(f, "queued"),
88            Status::Fetching => write!(f, "fetching"),
89            Status::CostEstimating => write!(f, "cost_estimating"),
90            Status::CostEstimated => write!(f, "cost_estimated"),
91            Status::FetchingCompleted => write!(f, "fetching_completed"),
92            Status::PrePlanRunning => write!(f, "pre_plan_running"),
93            Status::PrePlanCompleted => write!(f, "pre_plan_completed"),
94            Status::ManagedQueued => write!(f, "managed_queued"),
95            Status::Running => write!(f, "running"),
96            Status::Passed => write!(f, "passed"),
97            Status::Failed => write!(f, "failed"),
98            Status::Applying => write!(f, "applying"),
99            Status::Planning => write!(f, "planning"),
100            Status::Planned => write!(f, "planned"),
101            Status::Applied => write!(f, "applied"),
102            Status::Canceled => write!(f, "canceled"),
103            Status::Errored => write!(f, "errored"),
104            Status::Discarded => write!(f, "discarded"),
105            Status::PlannedAndFinished => write!(f, "planned_and_finished"),
106            Status::PlannedAndSaved => write!(f, "planned_and_saved"),
107            Status::PolicyChecking => write!(f, "policy_checking"),
108            Status::PolicyOverride => write!(f, "policy_override"),
109            Status::PolicySoftFailed => write!(f, "policy_soft_failed"),
110            Status::Unreachable => write!(f, "unreachable"),
111            Status::ForceCanceled => write!(f, "force_canceled"),
112            Status::Unknown => write!(f, "unknown"),
113            Status::Finished => write!(f, "finished"),
114        }
115    }
116}
117
118impl From<String> for Status {
119    fn from(item: String) -> Self {
120        match item.as_str() {
121            "apply_queued" => Status::ApplyQueued,
122            "pending" => Status::Pending,
123            "plan_queued" => Status::PlanQueued,
124            "queuing" => Status::Queuing,
125            "queued" => Status::Queued,
126            "fetching" => Status::Fetching,
127            "cost_estimating" => Status::CostEstimating,
128            "cost_estimated" => Status::CostEstimated,
129            "fetching_completed" => Status::FetchingCompleted,
130            "pre_plan_running" => Status::PrePlanRunning,
131            "pre_plan_completed" => Status::PrePlanCompleted,
132            "managed_queued" => Status::ManagedQueued,
133            "running" => Status::Running,
134            "passed" => Status::Passed,
135            "failed" => Status::Failed,
136            "applying" => Status::Applying,
137            "planning" => Status::Planning,
138            "planned" => Status::Planned,
139            "applied" => Status::Applied,
140            "canceled" => Status::Canceled,
141            "errored" => Status::Errored,
142            "discarded" => Status::Discarded,
143            "planned_and_finished" => Status::PlannedAndFinished,
144            "planned_and_saved" => Status::PlannedAndSaved,
145            "policy_checking" => Status::PolicyChecking,
146            "policy_override" => Status::PolicyOverride,
147            "policy_soft_failed" => Status::PolicySoftFailed,
148            "unreachable" => Status::Unreachable,
149            "force_canceled" => Status::ForceCanceled,
150            "unknown" => Status::Unknown,
151            "finished" => Status::Finished,
152            _ => Status::Unknown,
153        }
154    }
155}
156
157#[derive(Clone, Debug, Deserialize, Serialize)]
158#[serde(rename_all = "kebab-case")]
159pub struct Attributes {
160    pub message: String,
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub terraform_version: Option<String>,
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub plan_only: Option<bool>,
165    #[serde(skip_serializing_if = "Option::is_none")]
166    pub save_plan: Option<bool>,
167    pub target_addrs: Vec<String>,
168    pub replace_addrs: Vec<String>,
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub refresh: Option<bool>,
171    #[serde(skip_serializing_if = "Option::is_none")]
172    pub refresh_only: Option<bool>,
173    #[serde(skip_serializing_if = "Option::is_none")]
174    pub auto_apply: Option<bool>,
175    pub allow_empty_apply: bool,
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub is_destroy: Option<bool>,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub created_at: Option<String>,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub status: Option<Status>,
182}
183
184impl Default for Attributes {
185    fn default() -> Self {
186        Self {
187            message: "Run created by tfc-toolset".to_string(),
188            terraform_version: None,
189            plan_only: None,
190            save_plan: None,
191            target_addrs: Vec::new(),
192            replace_addrs: Vec::new(),
193            refresh: None,
194            refresh_only: None,
195            auto_apply: None,
196            allow_empty_apply: false,
197            is_destroy: None,
198            created_at: None,
199            status: None,
200        }
201    }
202}
203
204#[derive(Clone, Debug, Deserialize, Serialize)]
205pub struct WorkspaceOuter {
206    pub data: Workspace,
207}
208
209#[derive(Clone, Debug, Deserialize, Serialize)]
210pub struct Workspace {
211    #[serde(rename = "type")]
212    pub relationship_type: String,
213    pub id: String,
214}
215
216#[derive(Clone, Debug, Deserialize, Serialize)]
217pub struct Relationships {
218    pub workspace: WorkspaceOuter,
219}
220
221#[derive(Clone, Debug, Deserialize, Serialize)]
222pub struct Run {
223    #[serde(skip_serializing_if = "Option::is_none")]
224    pub id: Option<String>,
225    pub attributes: Attributes,
226    #[serde(rename = "type")]
227    pub request_type: String,
228    pub relationships: Relationships,
229    #[serde(skip_serializing_if = "Option::is_none")]
230    pub links: Option<Links>,
231}
232#[derive(Clone, Debug, Deserialize, Serialize)]
233struct RunOuter {
234    pub data: Run,
235}
236
237impl RunOuter {
238    fn new(workspace_id: &str, attributes: Option<Attributes>) -> Self {
239        Self {
240            data: Run {
241                id: None,
242                attributes: attributes.unwrap_or_default(),
243                request_type: "runs".to_string(),
244                relationships: Relationships {
245                    workspace: WorkspaceOuter {
246                        data: Workspace {
247                            relationship_type: "workspaces".to_string(),
248                            id: workspace_id.to_string(),
249                        },
250                    },
251                },
252                links: None,
253            },
254        }
255    }
256}
257
258#[derive(Clone, Debug, Deserialize, Serialize)]
259pub struct Links {
260    #[serde(rename = "self")]
261    pub self_link: String,
262}
263
264#[derive(Clone, Debug, Deserialize, Serialize)]
265#[serde(rename_all = "kebab-case")]
266pub struct QueueOptions {
267    pub max_concurrent: usize,
268    pub max_iterations: usize,
269    pub status_check_sleep_seconds: u64,
270    pub cancel_on_timeout: bool,
271}
272
273#[derive(Clone, Debug, Deserialize, Serialize)]
274pub struct QueueResult {
275    pub results: Vec<RunResult>,
276    pub errors: Vec<RunResult>,
277}
278
279#[derive(Clone, Debug, Deserialize, Serialize)]
280#[serde(rename_all = "kebab-case")]
281pub struct RunResult {
282    pub id: RunId,
283    pub status: String,
284    pub workspace_id: String,
285}
286
287pub type RunId = String;
288
289pub async fn create(
290    workspace_id: &str,
291    attributes: Option<Attributes>,
292    config: &Core,
293    client: Client,
294) -> Result<Run, ToolError> {
295    info!("Creating run for workspace: {}", workspace_id);
296    let url = Url::parse(&format!("{}/runs", BASE_URL))?;
297    let req = build_request(
298        Method::Post,
299        url,
300        config,
301        Some(json!(RunOuter::new(workspace_id, attributes))),
302    );
303    match client.send(req).await {
304        Ok(mut r) => {
305            if r.status().is_success() {
306                info!("Successfully created run!");
307                let res = r.body_string().await.map_err(surf_to_tool_error)?;
308                let run: RunOuter = serde_json::from_str(&res)?;
309                Ok(run.data)
310            } else {
311                let error =
312                    r.body_string().await.map_err(surf_to_tool_error)?;
313                error!("Failed to create run: {}", error);
314                Err(ToolError::General(anyhow::anyhow!(error)))
315            }
316        }
317        Err(e) => Err(surf_to_tool_error(e)),
318    }
319}
320
321pub async fn status(
322    run_id: &str,
323    config: &Core,
324    client: Client,
325) -> Result<Run, ToolError> {
326    info!("Getting status for run: {}", run_id);
327    let url = Url::parse(&format!("{}/runs/{}", BASE_URL, run_id))?;
328    let req = build_request(Method::Get, url, config, None);
329    match client.send(req).await {
330        Ok(mut r) => {
331            if r.status().is_success() {
332                info!("Successfully retrieved run status!");
333                let res = r.body_string().await.map_err(surf_to_tool_error)?;
334                let run: RunOuter = serde_json::from_str(&res)?;
335                Ok(run.data)
336            } else {
337                error!("Failed to retrieve run status :(");
338                let error =
339                    r.body_string().await.map_err(surf_to_tool_error)?;
340                Err(ToolError::General(anyhow::anyhow!(error)))
341            }
342        }
343        Err(e) => Err(surf_to_tool_error(e)),
344    }
345}
346
347pub async fn cancel(
348    run_id: &str,
349    config: &Core,
350    client: Client,
351) -> Result<(), ToolError> {
352    info!("Cancelling run: {}", run_id);
353    let url =
354        Url::parse(&format!("{}/runs/{}/actions/cancel", BASE_URL, run_id))?;
355    let req = build_request(Method::Post, url, config, None);
356    match client.send(req).await {
357        Ok(mut r) => {
358            if r.status().is_success() {
359                info!("Successfully cancelled run!");
360                Ok(())
361            } else {
362                error!("Failed to cancel run :(");
363                let error =
364                    r.body_string().await.map_err(surf_to_tool_error)?;
365                Err(ToolError::General(anyhow::anyhow!(error)))
366            }
367        }
368        Err(e) => Err(surf_to_tool_error(e)),
369    }
370}
371
372pub async fn discard(
373    run_id: &str,
374    config: &Core,
375    client: Client,
376) -> Result<(), ToolError> {
377    info!("Discarding run: {}", run_id);
378    let url =
379        Url::parse(&format!("{}/runs/{}/actions/discard", BASE_URL, run_id))?;
380    let req = build_request(Method::Post, url, config, None);
381    match client.send(req).await {
382        Ok(mut r) => {
383            if r.status().is_success() {
384                info!("Successfully discarded run!");
385                Ok(())
386            } else {
387                error!("Failed to discard run :(");
388                let error =
389                    r.body_string().await.map_err(surf_to_tool_error)?;
390                Err(ToolError::General(anyhow::anyhow!(error)))
391            }
392        }
393        Err(e) => Err(surf_to_tool_error(e)),
394    }
395}
396
397fn run_has_ended(
398    status: &Status,
399    will_auto_apply: bool,
400    will_save_plan: bool,
401) -> bool {
402    COMPLETED_STATUSES.contains(status)
403        || ERROR_STATUSES.contains(status)
404        || !will_auto_apply && NO_APPLY_END_STATUSES.contains(status)
405        || will_save_plan && status == &Status::PlannedAndSaved
406}
407
408fn build_handle(
409    id: String,
410    options: QueueOptions,
411    attributes: Attributes,
412    core: Core,
413    client: Client,
414) -> JoinHandle<Result<RunResult, ToolError>> {
415    task::spawn(async move {
416        let will_auto_apply = attributes.auto_apply.unwrap_or(false);
417        let will_save_plan = attributes.save_plan.unwrap_or(false);
418        let mut iterations = 0;
419        let mut run =
420            create(&id.clone(), Some(attributes), &core, client.clone())
421                .await?;
422        let run_id = run.id.clone().unwrap();
423        info!("Run {} created for workspace {}", &run_id, &id.clone());
424        while !run_has_ended(
425            &run.attributes.status.clone().unwrap_or(Status::Unknown),
426            will_auto_apply,
427            will_save_plan,
428        ) {
429            run = status(&run_id, &core, client.clone()).await?;
430            let status =
431                run.attributes.status.clone().unwrap_or(Status::Unknown);
432            info!("Run {} status: {}", &run_id, &status);
433            if run_has_ended(&status, will_auto_apply, will_save_plan) {
434                break;
435            }
436            iterations += 1;
437            if iterations >= options.max_iterations {
438                error!(
439                    "Run {} for workspace {} has been in status {} too long.",
440                    &run_id,
441                    &id.clone(),
442                    &status.clone()
443                );
444                if status == Status::Pending {
445                    error!(
446                        "There is likely previous run pending. Please check the workspace in the UI."
447                    );
448                } else {
449                    error!(
450                        "This is likely some error. Please check the run in the UI."
451                    );
452                }
453                if options.cancel_on_timeout
454                    && (status == Status::Pending || status == Status::Applying)
455                {
456                    cancel(&run_id, &core, client.clone()).await?;
457                }
458                break;
459            }
460            async_std::task::sleep(Duration::from_secs(
461                options.status_check_sleep_seconds,
462            ))
463            .await;
464        }
465        Ok(RunResult {
466            id: run_id,
467            status: run
468                .attributes
469                .status
470                .unwrap_or(Status::Unknown)
471                .to_string(),
472            workspace_id: id,
473        })
474    })
475}
476
477pub async fn work_queue(
478    workspaces: Vec<workspace::Workspace>,
479    options: QueueOptions,
480    attributes: Attributes,
481    client: Client,
482    core: &Core,
483) -> Result<QueueResult, ToolError> {
484    let queue: Arc<Mutex<Vec<workspace::Workspace>>> =
485        Arc::new(Mutex::new(Vec::with_capacity(workspaces.len())));
486    let results: Arc<Mutex<Vec<RunResult>>> =
487        Arc::new(Mutex::new(Vec::with_capacity(workspaces.len())));
488    let errors: Arc<Mutex<Vec<RunResult>>> = Arc::new(Mutex::new(Vec::new()));
489
490    let max_concurrent = options.max_concurrent;
491
492    let opts = Arc::new(RwLock::new(options));
493    let attrs = Arc::new(RwLock::new(attributes));
494    let c = Arc::new(RwLock::new(core.clone()));
495    let cl = Arc::new(RwLock::new(client));
496
497    for ws in workspaces {
498        queue.lock().await.push(ws);
499    }
500
501    let mut handles = vec![];
502
503    for _ in 0..max_concurrent {
504        let queue_clone = Arc::clone(&queue);
505        let results_clone = Arc::clone(&results);
506        let errors_clone = Arc::clone(&errors);
507        let opts_clone = Arc::clone(&opts);
508        let attrs_clone = Arc::clone(&attrs);
509        let core_clone = Arc::clone(&c);
510        let client_clone = Arc::clone(&cl);
511
512        let handle = thread::spawn(move || {
513            task::block_on(async {
514                loop {
515                    let work = {
516                        let mut queue = queue_clone.lock().await;
517                        if queue.is_empty() {
518                            None
519                        } else {
520                            Some(queue.pop().unwrap())
521                        }
522                    };
523
524                    if let Some(work) = work {
525                        let run_result = build_handle(
526                            work.id.clone(),
527                            opts_clone.read().await.clone(),
528                            attrs_clone.read().await.clone(),
529                            core_clone.read().await.clone(),
530                            client_clone.read().await.clone(),
531                        )
532                        .await;
533                        match run_result {
534                            Ok(result) => {
535                                results_clone.lock().await.push(result);
536                            }
537                            Err(e) => {
538                                errors_clone.lock().await.push(RunResult {
539                                    id: "unknown".to_string(),
540                                    status: e.to_string(),
541                                    workspace_id: work.id.clone(),
542                                });
543                                error!(
544                                    "Error processing workspace {}: {}",
545                                    work.id, e
546                                );
547                            }
548                        }
549                    } else {
550                        info!("No more work to do, thread exiting.");
551                        break;
552                    }
553                }
554            });
555        });
556        handles.push(handle);
557    }
558
559    for handle in handles {
560        handle.join().unwrap();
561    }
562
563    let return_results = results.lock().await.clone();
564    let return_errors = errors.lock().await.clone();
565
566    Ok(QueueResult { results: return_results, errors: return_errors })
567}