use crate::{
    build_request,
    error::{surf_to_tool_error, ToolError},
    settings::Core,
    workspace, BASE_URL,
};
use async_std::{
    sync::{Arc, Mutex, RwLock},
    task::{self, JoinHandle},
};
use log::{error, info};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
    fmt::{Display, Formatter},
    thread,
    time::Duration,
};
use surf::{http::Method, Client};
use url::Url;

/// Statuses treated by this module as successful run completion.
pub const COMPLETED_STATUSES: [Status; 2] =
    [Status::Applied, Status::PlannedAndFinished];

/// Statuses treated as terminal when the run will not be auto-applied.
pub const NO_APPLY_END_STATUSES: [Status; 2] =
    [Status::Planned, Status::CostEstimated];

/// Statuses treated as terminal failures, cancellations, or otherwise
/// unsuccessful outcomes.
pub const ERROR_STATUSES: [Status; 8] = [
    Status::Canceled,
    Status::ForceCanceled,
    Status::Errored,
    Status::Discarded,
    Status::Failed,
    Status::Unreachable,
    Status::Unknown,
    Status::PolicySoftFailed,
];

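/// Run status as reported by the Terraform Cloud runs API.
///
/// A minimal sketch of converting raw status text, assuming this crate is
/// available as `tfc_toolset` and this module is exposed as `run` (adjust the
/// paths to your setup):
///
/// ```ignore
/// use tfc_toolset::run::Status;
///
/// let status = Status::from("planned_and_saved".to_string());
/// assert_eq!(status, Status::PlannedAndSaved);
/// assert_eq!(status.to_string(), "planned_and_saved");
///
/// // Unrecognized strings fall back to `Status::Unknown`.
/// assert_eq!(Status::from("brand_new_status".to_string()), Status::Unknown);
/// ```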
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum Status {
    ApplyQueued,
    Pending,
    PlanQueued,
    Queuing,
    Queued,
    Fetching,
    CostEstimating,
    CostEstimated,
    FetchingCompleted,
    PrePlanRunning,
    PrePlanCompleted,
    ManagedQueued,
    Running,
    Passed,
    Failed,
    Applying,
    Planning,
    Planned,
    Applied,
    Canceled,
    Errored,
    Discarded,
    PlannedAndFinished,
    PlannedAndSaved,
    PolicyChecking,
    PolicyOverride,
    PolicySoftFailed,
    Unreachable,
    ForceCanceled,
    #[default]
    Unknown,
    Finished,
}

impl Display for Status {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Status::ApplyQueued => write!(f, "apply_queued"),
            Status::Pending => write!(f, "pending"),
            Status::PlanQueued => write!(f, "plan_queued"),
            Status::Queuing => write!(f, "queuing"),
            Status::Queued => write!(f, "queued"),
            Status::Fetching => write!(f, "fetching"),
            Status::CostEstimating => write!(f, "cost_estimating"),
            Status::CostEstimated => write!(f, "cost_estimated"),
            Status::FetchingCompleted => write!(f, "fetching_completed"),
            Status::PrePlanRunning => write!(f, "pre_plan_running"),
            Status::PrePlanCompleted => write!(f, "pre_plan_completed"),
            Status::ManagedQueued => write!(f, "managed_queued"),
            Status::Running => write!(f, "running"),
            Status::Passed => write!(f, "passed"),
            Status::Failed => write!(f, "failed"),
            Status::Applying => write!(f, "applying"),
            Status::Planning => write!(f, "planning"),
            Status::Planned => write!(f, "planned"),
            Status::Applied => write!(f, "applied"),
            Status::Canceled => write!(f, "canceled"),
            Status::Errored => write!(f, "errored"),
            Status::Discarded => write!(f, "discarded"),
            Status::PlannedAndFinished => write!(f, "planned_and_finished"),
            Status::PlannedAndSaved => write!(f, "planned_and_saved"),
            Status::PolicyChecking => write!(f, "policy_checking"),
            Status::PolicyOverride => write!(f, "policy_override"),
            Status::PolicySoftFailed => write!(f, "policy_soft_failed"),
            Status::Unreachable => write!(f, "unreachable"),
            Status::ForceCanceled => write!(f, "force_canceled"),
            Status::Unknown => write!(f, "unknown"),
            Status::Finished => write!(f, "finished"),
        }
    }
}

impl From<String> for Status {
    fn from(item: String) -> Self {
        match item.as_str() {
            "apply_queued" => Status::ApplyQueued,
            "pending" => Status::Pending,
            "plan_queued" => Status::PlanQueued,
            "queuing" => Status::Queuing,
            "queued" => Status::Queued,
            "fetching" => Status::Fetching,
            "cost_estimating" => Status::CostEstimating,
            "cost_estimated" => Status::CostEstimated,
            "fetching_completed" => Status::FetchingCompleted,
            "pre_plan_running" => Status::PrePlanRunning,
            "pre_plan_completed" => Status::PrePlanCompleted,
            "managed_queued" => Status::ManagedQueued,
            "running" => Status::Running,
            "passed" => Status::Passed,
            "failed" => Status::Failed,
            "applying" => Status::Applying,
            "planning" => Status::Planning,
            "planned" => Status::Planned,
            "applied" => Status::Applied,
            "canceled" => Status::Canceled,
            "errored" => Status::Errored,
            "discarded" => Status::Discarded,
            "planned_and_finished" => Status::PlannedAndFinished,
            "planned_and_saved" => Status::PlannedAndSaved,
            "policy_checking" => Status::PolicyChecking,
            "policy_override" => Status::PolicyOverride,
            "policy_soft_failed" => Status::PolicySoftFailed,
            "unreachable" => Status::Unreachable,
            "force_canceled" => Status::ForceCanceled,
            "unknown" => Status::Unknown,
            "finished" => Status::Finished,
            _ => Status::Unknown,
        }
    }
}

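/// Attributes for a run create request, serialized with kebab-case keys to
/// match the JSON:API payload sent to the runs endpoint.
///
/// A minimal sketch of overriding just a few fields, assuming this module is
/// exposed as `run` in the `tfc_toolset` crate:
///
/// ```ignore
/// use tfc_toolset::run::Attributes;
///
/// let attributes = Attributes {
///     message: "Scheduled drift check".to_string(),
///     plan_only: Some(true),
///     target_addrs: vec!["module.network".to_string()],
///     ..Default::default()
/// };
/// ```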
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Attributes {
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub terraform_version: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub plan_only: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub save_plan: Option<bool>,
    pub target_addrs: Vec<String>,
    pub replace_addrs: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh_only: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auto_apply: Option<bool>,
    pub allow_empty_apply: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_destroy: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<Status>,
}

impl Default for Attributes {
    fn default() -> Self {
        Self {
            message: "Run created by tfc-toolset".to_string(),
            terraform_version: None,
            plan_only: None,
            save_plan: None,
            target_addrs: Vec::new(),
            replace_addrs: Vec::new(),
            refresh: None,
            refresh_only: None,
            auto_apply: None,
            allow_empty_apply: false,
            is_destroy: None,
            created_at: None,
            status: None,
        }
    }
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WorkspaceOuter {
    pub data: Workspace,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Workspace {
    #[serde(rename = "type")]
    pub relationship_type: String,
    pub id: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Relationships {
    pub workspace: WorkspaceOuter,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Run {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    pub attributes: Attributes,
    #[serde(rename = "type")]
    pub request_type: String,
    pub relationships: Relationships,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub links: Option<Links>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct RunOuter {
    pub data: Run,
}

impl RunOuter {
    fn new(workspace_id: &str, attributes: Option<Attributes>) -> Self {
        Self {
            data: Run {
                id: None,
                attributes: attributes.unwrap_or_default(),
                request_type: "runs".to_string(),
                relationships: Relationships {
                    workspace: WorkspaceOuter {
                        data: Workspace {
                            relationship_type: "workspaces".to_string(),
                            id: workspace_id.to_string(),
                        },
                    },
                },
                links: None,
            },
        }
    }
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Links {
    #[serde(rename = "self")]
    pub self_link: String,
}

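/// Tuning knobs for [`work_queue`]: how many worker threads to run, how many
/// status polls to make per run, how long to sleep between polls, and whether
/// to cancel a run that is still pending or applying once the poll budget is
/// exhausted.
///
/// A minimal sketch of a configuration, with values chosen purely for
/// illustration:
///
/// ```ignore
/// use tfc_toolset::run::QueueOptions;
///
/// let options = QueueOptions {
///     max_concurrent: 5,
///     max_iterations: 30,
///     status_check_sleep_seconds: 10,
///     cancel_on_timeout: false,
/// };
/// ```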
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct QueueOptions {
    pub max_concurrent: usize,
    pub max_iterations: usize,
    pub status_check_sleep_seconds: u64,
    pub cancel_on_timeout: bool,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct QueueResult {
    pub results: Vec<RunResult>,
    pub errors: Vec<RunResult>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct RunResult {
    pub id: RunId,
    pub status: String,
    pub workspace_id: String,
}

pub type RunId = String;

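/// Creates a run for the given workspace via `POST /runs`, returning the
/// created [`Run`] on success.
///
/// A minimal sketch of calling it, assuming a configured `Core` and a
/// `surf::Client` are already in hand (the workspace ID is a placeholder):
///
/// ```ignore
/// use tfc_toolset::run;
///
/// let created = run::create("ws-XXXXXXXX", None, &config, client.clone()).await?;
/// println!("created run {:?}", created.id);
/// ```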
pub async fn create(
    workspace_id: &str,
    attributes: Option<Attributes>,
    config: &Core,
    client: Client,
) -> Result<Run, ToolError> {
    info!("Creating run for workspace: {}", workspace_id);
    let url = Url::parse(&format!("{}/runs", BASE_URL))?;
    let req = build_request(
        Method::Post,
        url,
        config,
        Some(json!(RunOuter::new(workspace_id, attributes))),
    );
    match client.send(req).await {
        Ok(mut r) => {
            if r.status().is_success() {
                info!("Successfully created run!");
                let res = r.body_string().await.map_err(surf_to_tool_error)?;
                let run: RunOuter = serde_json::from_str(&res)?;
                Ok(run.data)
            } else {
                let error =
                    r.body_string().await.map_err(surf_to_tool_error)?;
                error!("Failed to create run: {}", error);
                Err(ToolError::General(anyhow::anyhow!(error)))
            }
        }
        Err(e) => Err(surf_to_tool_error(e)),
    }
}

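/// Fetches the current state of a run via `GET /runs/{id}`.
///
/// A minimal sketch, under the same assumptions as [`create`]:
///
/// ```ignore
/// use tfc_toolset::run;
///
/// let run = run::status("run-XXXXXXXX", &config, client.clone()).await?;
/// println!("status: {:?}", run.attributes.status);
/// ```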
pub async fn status(
    run_id: &str,
    config: &Core,
    client: Client,
) -> Result<Run, ToolError> {
    info!("Getting status for run: {}", run_id);
    let url = Url::parse(&format!("{}/runs/{}", BASE_URL, run_id))?;
    let req = build_request(Method::Get, url, config, None);
    match client.send(req).await {
        Ok(mut r) => {
            if r.status().is_success() {
                info!("Successfully retrieved run status!");
                let res = r.body_string().await.map_err(surf_to_tool_error)?;
                let run: RunOuter = serde_json::from_str(&res)?;
                Ok(run.data)
            } else {
                error!("Failed to retrieve run status :(");
                let error =
                    r.body_string().await.map_err(surf_to_tool_error)?;
                Err(ToolError::General(anyhow::anyhow!(error)))
            }
        }
        Err(e) => Err(surf_to_tool_error(e)),
    }
}

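/// Requests cancellation of a run via `POST /runs/{id}/actions/cancel`.
/// `Ok(())` only indicates that the API accepted the request, not that the
/// run has already stopped.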
pub async fn cancel(
    run_id: &str,
    config: &Core,
    client: Client,
) -> Result<(), ToolError> {
    info!("Cancelling run: {}", run_id);
    let url =
        Url::parse(&format!("{}/runs/{}/actions/cancel", BASE_URL, run_id))?;
    let req = build_request(Method::Post, url, config, None);
    match client.send(req).await {
        Ok(mut r) => {
            if r.status().is_success() {
                info!("Successfully cancelled run!");
                Ok(())
            } else {
                error!("Failed to cancel run :(");
                let error =
                    r.body_string().await.map_err(surf_to_tool_error)?;
                Err(ToolError::General(anyhow::anyhow!(error)))
            }
        }
        Err(e) => Err(surf_to_tool_error(e)),
    }
}

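/// Discards a run via `POST /runs/{id}/actions/discard`. As with [`cancel`],
/// `Ok(())` only indicates that the API accepted the request.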
pub async fn discard(
    run_id: &str,
    config: &Core,
    client: Client,
) -> Result<(), ToolError> {
    info!("Discarding run: {}", run_id);
    let url =
        Url::parse(&format!("{}/runs/{}/actions/discard", BASE_URL, run_id))?;
    let req = build_request(Method::Post, url, config, None);
    match client.send(req).await {
        Ok(mut r) => {
            if r.status().is_success() {
                info!("Successfully discarded run!");
                Ok(())
            } else {
                error!("Failed to discard run :(");
                let error =
                    r.body_string().await.map_err(surf_to_tool_error)?;
                Err(ToolError::General(anyhow::anyhow!(error)))
            }
        }
        Err(e) => Err(surf_to_tool_error(e)),
    }
}

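/// Returns true when a run has reached a state this module treats as terminal:
/// any completed or error status, a plan-only end state when the run will not
/// auto-apply, or `PlannedAndSaved` when the plan is being saved for later.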
fn run_has_ended(
    status: &Status,
    will_auto_apply: bool,
    will_save_plan: bool,
) -> bool {
    COMPLETED_STATUSES.contains(status)
        || ERROR_STATUSES.contains(status)
        || (!will_auto_apply && NO_APPLY_END_STATUSES.contains(status))
        || (will_save_plan && status == &Status::PlannedAndSaved)
}

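/// Spawns an async task that creates a run for the given workspace, then polls
/// its status every `status_check_sleep_seconds` until the run reaches a
/// terminal state or `max_iterations` polls have elapsed. On timeout the run
/// is cancelled only when `cancel_on_timeout` is set and the run is still
/// pending or applying; the last observed status is returned either way.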
fn build_handle(
    id: String,
    options: QueueOptions,
    attributes: Attributes,
    core: Core,
    client: Client,
) -> JoinHandle<Result<RunResult, ToolError>> {
    task::spawn(async move {
        let will_auto_apply = attributes.auto_apply.unwrap_or(false);
        let will_save_plan = attributes.save_plan.unwrap_or(false);
        let mut iterations = 0;
        let mut run =
            create(&id, Some(attributes), &core, client.clone()).await?;
        let run_id = run.id.clone().unwrap();
        info!("Run {} created for workspace {}", &run_id, &id);
        while !run_has_ended(
            &run.attributes.status.clone().unwrap_or(Status::Unknown),
            will_auto_apply,
            will_save_plan,
        ) {
            run = status(&run_id, &core, client.clone()).await?;
            let status =
                run.attributes.status.clone().unwrap_or(Status::Unknown);
            info!("Run {} status: {}", &run_id, &status);
            if run_has_ended(&status, will_auto_apply, will_save_plan) {
                break;
            }
            iterations += 1;
            if iterations >= options.max_iterations {
                error!(
                    "Run {} for workspace {} has been in status {} too long.",
                    &run_id, &id, &status
                );
                if status == Status::Pending {
                    error!(
                        "There is likely a previous run pending. Please check the workspace in the UI."
                    );
                } else {
                    error!(
                        "This is likely an error. Please check the run in the UI."
                    );
                }
                if options.cancel_on_timeout
                    && (status == Status::Pending || status == Status::Applying)
                {
                    cancel(&run_id, &core, client.clone()).await?;
                }
                break;
            }
            async_std::task::sleep(Duration::from_secs(
                options.status_check_sleep_seconds,
            ))
            .await;
        }
        Ok(RunResult {
            id: run_id,
            status: run
                .attributes
                .status
                .unwrap_or(Status::Unknown)
                .to_string(),
            workspace_id: id,
        })
    })
}

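/// Fans the given workspaces out to `options.max_concurrent` worker threads,
/// creates a run in each workspace with the supplied attributes, and polls the
/// runs to completion. Successfully polled runs land in `QueueResult::results`;
/// workspaces whose run could not be created or polled land in
/// `QueueResult::errors`.
///
/// A minimal sketch of driving it, assuming the workspaces were fetched
/// elsewhere and that `config: Core` and `client: surf::Client` already exist:
///
/// ```ignore
/// use tfc_toolset::run::{self, Attributes, QueueOptions};
///
/// let options = QueueOptions {
///     max_concurrent: 5,
///     max_iterations: 30,
///     status_check_sleep_seconds: 10,
///     cancel_on_timeout: false,
/// };
/// let attributes = Attributes {
///     message: "Bulk plan from tfc-toolset".to_string(),
///     plan_only: Some(true),
///     ..Default::default()
/// };
/// let report =
///     run::work_queue(workspaces, options, attributes, client.clone(), &config)
///         .await?;
/// println!("{} succeeded, {} failed", report.results.len(), report.errors.len());
/// ```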
pub async fn work_queue(
    workspaces: Vec<workspace::Workspace>,
    options: QueueOptions,
    attributes: Attributes,
    client: Client,
    core: &Core,
) -> Result<QueueResult, ToolError> {
    let queue: Arc<Mutex<Vec<workspace::Workspace>>> =
        Arc::new(Mutex::new(Vec::with_capacity(workspaces.len())));
    let results: Arc<Mutex<Vec<RunResult>>> =
        Arc::new(Mutex::new(Vec::with_capacity(workspaces.len())));
    let errors: Arc<Mutex<Vec<RunResult>>> = Arc::new(Mutex::new(Vec::new()));

    let max_concurrent = options.max_concurrent;

    let opts = Arc::new(RwLock::new(options));
    let attrs = Arc::new(RwLock::new(attributes));
    let c = Arc::new(RwLock::new(core.clone()));
    let cl = Arc::new(RwLock::new(client));

    for ws in workspaces {
        queue.lock().await.push(ws);
    }

    let mut handles = vec![];

    for _ in 0..max_concurrent {
        let queue_clone = Arc::clone(&queue);
        let results_clone = Arc::clone(&results);
        let errors_clone = Arc::clone(&errors);
        let opts_clone = Arc::clone(&opts);
        let attrs_clone = Arc::clone(&attrs);
        let core_clone = Arc::clone(&c);
        let client_clone = Arc::clone(&cl);

        let handle = thread::spawn(move || {
            task::block_on(async {
                loop {
                    // Take the next workspace off the shared queue, if any.
                    let work = queue_clone.lock().await.pop();

                    if let Some(work) = work {
                        let run_result = build_handle(
                            work.id.clone(),
                            opts_clone.read().await.clone(),
                            attrs_clone.read().await.clone(),
                            core_clone.read().await.clone(),
                            client_clone.read().await.clone(),
                        )
                        .await;
                        match run_result {
                            Ok(result) => {
                                results_clone.lock().await.push(result);
                            }
                            Err(e) => {
                                errors_clone.lock().await.push(RunResult {
                                    id: "unknown".to_string(),
                                    status: e.to_string(),
                                    workspace_id: work.id.clone(),
                                });
                                error!(
                                    "Error processing workspace {}: {}",
                                    work.id, e
                                );
                            }
                        }
                    } else {
                        info!("No more work to do, thread exiting.");
                        break;
                    }
                }
            });
        });
        handles.push(handle);
    }

    // Wait for every worker thread; a panic in a worker propagates here.
    for handle in handles {
        handle.join().unwrap();
    }

    let return_results = results.lock().await.clone();
    let return_errors = errors.lock().await.clone();

    Ok(QueueResult { results: return_results, errors: return_errors })
}