terraform_wrapper/
streaming.rs1use std::process::Stdio;
28
29use serde::{Deserialize, Serialize};
30use tokio::io::{AsyncBufReadExt, BufReader};
31use tokio::process::Command as TokioCommand;
32use tracing::{debug, trace, warn};
33
34use crate::Terraform;
35use crate::command::TerraformCommand;
36use crate::error::{Error, Result};
37use crate::exec::CommandOutput;
38
39#[derive(Debug, Clone, Deserialize, Serialize)]
44pub struct JsonLogLine {
45 #[serde(rename = "@level")]
47 pub level: String,
48 #[serde(rename = "@message")]
50 pub message: String,
51 #[serde(rename = "@module", default)]
53 pub module: String,
54 #[serde(rename = "@timestamp", default)]
56 pub timestamp: String,
57 #[serde(rename = "type", default)]
61 pub log_type: String,
62 #[serde(default)]
64 pub change: serde_json::Value,
65 #[serde(default)]
67 pub hook: serde_json::Value,
68 #[serde(default)]
70 pub changes: serde_json::Value,
71 #[serde(default)]
73 pub outputs: serde_json::Value,
74}
75
76pub async fn stream_terraform<C, F>(
96 tf: &Terraform,
97 command: C,
98 allowed_exit_codes: &[i32],
99 mut handler: F,
100) -> Result<CommandOutput>
101where
102 C: TerraformCommand,
103 F: FnMut(JsonLogLine),
104{
105 let args = command.prepare_args(tf);
106
107 let mut cmd = TokioCommand::new(&tf.binary);
108
109 if let Some(ref working_dir) = tf.working_dir {
110 cmd.arg(format!("-chdir={}", working_dir.display()));
111 }
112
113 for arg in &args {
114 cmd.arg(arg);
115 }
116
117 for arg in &tf.global_args {
119 cmd.arg(arg);
120 }
121
122 for (key, value) in &tf.env {
123 cmd.env(key, value);
124 }
125
126 cmd.stdout(Stdio::piped());
127 cmd.stderr(Stdio::piped());
128
129 trace!(binary = ?tf.binary, args = ?args, timeout_secs = ?tf.timeout.map(|t| t.as_secs()), "streaming terraform command");
130
131 let mut child = cmd.spawn().map_err(|e| {
132 if e.kind() == std::io::ErrorKind::NotFound {
133 Error::NotFound
134 } else {
135 Error::Io {
136 message: format!("failed to spawn terraform: {e}"),
137 source: e,
138 }
139 }
140 })?;
141
142 let stdout = child.stdout.take().ok_or_else(|| Error::Io {
143 message: "failed to capture stdout".to_string(),
144 source: std::io::Error::other("no stdout"),
145 })?;
146
147 let stream_and_wait = async {
148 let mut reader = BufReader::new(stdout).lines();
149
150 while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
151 message: format!("failed to read stdout line: {e}"),
152 source: e,
153 })? {
154 trace!(%line, "stream line");
155 match serde_json::from_str::<JsonLogLine>(&line) {
156 Ok(log_line) => handler(log_line),
157 Err(e) => {
158 warn!(%line, error = %e, "failed to parse streaming json line, skipping");
159 }
160 }
161 }
162
163 child.wait_with_output().await.map_err(|e| Error::Io {
164 message: format!("failed to wait for terraform: {e}"),
165 source: e,
166 })
167 };
168
169 let output = if let Some(duration) = tf.timeout {
170 match tokio::time::timeout(duration, stream_and_wait).await {
171 Ok(result) => result?,
172 Err(_) => {
173 warn!(
174 timeout_seconds = duration.as_secs(),
175 "streaming terraform command timed out"
176 );
177 return Err(Error::Timeout {
178 timeout_seconds: duration.as_secs(),
179 });
180 }
181 }
182 } else {
183 stream_and_wait.await?
184 };
185
186 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
187 let exit_code = output.status.code().unwrap_or(-1);
188 let success = allowed_exit_codes.contains(&exit_code);
189
190 debug!(exit_code, success, "streaming terraform command completed");
191
192 if !success {
193 return Err(Error::CommandFailed {
194 command: args.first().cloned().unwrap_or_default(),
195 exit_code,
196 stdout: String::new(),
197 stderr,
198 });
199 }
200
201 Ok(CommandOutput {
202 stdout: String::new(), stderr,
204 exit_code,
205 success,
206 })
207}