terraform_wrapper/
streaming.rs1use std::process::Stdio;
28
29use serde::{Deserialize, Serialize};
30use tokio::io::{AsyncBufReadExt, BufReader};
31use tokio::process::Command as TokioCommand;
32use tracing::{debug, trace, warn};
33
34use crate::Terraform;
35use crate::command::TerraformCommand;
36use crate::error::{Error, Result};
37use crate::exec::CommandOutput;
38
39#[derive(Debug, Clone, Deserialize, Serialize)]
44pub struct JsonLogLine {
45 #[serde(rename = "@level")]
47 pub level: String,
48 #[serde(rename = "@message")]
50 pub message: String,
51 #[serde(rename = "@module", default)]
53 pub module: String,
54 #[serde(rename = "@timestamp", default)]
56 pub timestamp: String,
57 #[serde(rename = "type", default)]
61 pub log_type: String,
62 #[serde(default)]
64 pub change: serde_json::Value,
65 #[serde(default)]
67 pub hook: serde_json::Value,
68 #[serde(default)]
70 pub changes: serde_json::Value,
71 #[serde(default)]
73 pub outputs: serde_json::Value,
74}
75
76pub async fn stream_terraform<C, F>(
89 tf: &Terraform,
90 command: C,
91 mut handler: F,
92) -> Result<CommandOutput>
93where
94 C: TerraformCommand,
95 F: FnMut(JsonLogLine),
96{
97 let mut args = command.args();
98
99 if tf.no_input {
101 let subcmd = args.first().map(String::as_str).unwrap_or("");
102 if matches!(subcmd, "init" | "plan" | "apply" | "destroy" | "import") {
103 args.insert(1, "-input=false".to_string());
104 }
105 }
106
107 let mut cmd = TokioCommand::new(&tf.binary);
108
109 if let Some(ref working_dir) = tf.working_dir {
110 cmd.arg(format!("-chdir={}", working_dir.display()));
111 }
112
113 for arg in &args {
114 cmd.arg(arg);
115 }
116
117 for arg in &tf.global_args {
119 cmd.arg(arg);
120 }
121
122 for (key, value) in &tf.env {
123 cmd.env(key, value);
124 }
125
126 cmd.stdout(Stdio::piped());
127 cmd.stderr(Stdio::piped());
128
129 trace!(binary = ?tf.binary, args = ?args, "streaming terraform command");
130
131 let mut child = cmd.spawn().map_err(|e| {
132 if e.kind() == std::io::ErrorKind::NotFound {
133 Error::NotFound
134 } else {
135 Error::Io {
136 message: format!("failed to spawn terraform: {e}"),
137 source: e,
138 }
139 }
140 })?;
141
142 let stdout = child.stdout.take().ok_or_else(|| Error::Io {
143 message: "failed to capture stdout".to_string(),
144 source: std::io::Error::other("no stdout"),
145 })?;
146
147 let mut reader = BufReader::new(stdout).lines();
148
149 while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
150 message: format!("failed to read stdout line: {e}"),
151 source: e,
152 })? {
153 trace!(%line, "stream line");
154 match serde_json::from_str::<JsonLogLine>(&line) {
155 Ok(log_line) => handler(log_line),
156 Err(e) => {
157 warn!(%line, error = %e, "failed to parse streaming json line, skipping");
158 }
159 }
160 }
161
162 let output = child.wait_with_output().await.map_err(|e| Error::Io {
163 message: format!("failed to wait for terraform: {e}"),
164 source: e,
165 })?;
166
167 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
168 let exit_code = output.status.code().unwrap_or(-1);
169 let success = output.status.success();
170
171 debug!(exit_code, success, "streaming terraform command completed");
172
173 if !success {
174 return Err(Error::CommandFailed {
175 command: args.first().cloned().unwrap_or_default(),
176 exit_code,
177 stdout: String::new(),
178 stderr,
179 });
180 }
181
182 Ok(CommandOutput {
183 stdout: String::new(), stderr,
185 exit_code,
186 success,
187 })
188}