Skip to main content

terraform_wrapper/
streaming.rs

1//! Streaming JSON output from `terraform plan` and `terraform apply`.
2//!
3//! When run with `-json`, Terraform produces one JSON object per line (NDJSON),
4//! each representing an event like resource creation, progress, or completion.
5//!
6//! This module provides [`stream_terraform`] which yields [`JsonLogLine`] events
7//! as they arrive, useful for progress reporting in orchestration tools.
8//!
9//! # Example
10//!
11//! ```rust,no_run
12//! use terraform_wrapper::prelude::*;
13//! use terraform_wrapper::streaming::{stream_terraform, JsonLogLine};
14//!
15//! # async fn example() -> terraform_wrapper::error::Result<()> {
16//! let tf = Terraform::builder().working_dir("./infra").build()?;
17//!
18//! let result = stream_terraform(&tf, ApplyCommand::new().auto_approve().json(), |line| {
19//!     println!("[{}] {}", line.log_type, line.message);
20//! }).await?;
21//!
22//! assert!(result.success);
23//! # Ok(())
24//! # }
25//! ```
26
27use std::process::Stdio;
28
29use serde::{Deserialize, Serialize};
30use tokio::io::{AsyncBufReadExt, BufReader};
31use tokio::process::Command as TokioCommand;
32use tracing::{debug, trace, warn};
33
34use crate::Terraform;
35use crate::command::TerraformCommand;
36use crate::error::{Error, Result};
37use crate::exec::CommandOutput;
38
39/// A single JSON log line from Terraform's streaming output.
40///
41/// Terraform emits these as NDJSON (one per line) when commands are run
42/// with the `-json` flag.
43#[derive(Debug, Clone, Deserialize, Serialize)]
44pub struct JsonLogLine {
45    /// Log level: "info", "warn", "error".
46    #[serde(rename = "@level")]
47    pub level: String,
48    /// Human-readable message.
49    #[serde(rename = "@message")]
50    pub message: String,
51    /// Module that emitted the message (e.g., "terraform.ui").
52    #[serde(rename = "@module", default)]
53    pub module: String,
54    /// ISO 8601 timestamp.
55    #[serde(rename = "@timestamp", default)]
56    pub timestamp: String,
57    /// Event type: "version", "planned_change", "change_summary",
58    /// "apply_start", "apply_progress", "apply_complete",
59    /// "apply_errored", "outputs", etc.
60    #[serde(rename = "type", default)]
61    pub log_type: String,
62    /// Change details (for planned_change events).
63    #[serde(default)]
64    pub change: serde_json::Value,
65    /// Hook details (for apply_start/apply_complete/apply_progress events).
66    #[serde(default)]
67    pub hook: serde_json::Value,
68    /// Change summary (for change_summary events).
69    #[serde(default)]
70    pub changes: serde_json::Value,
71    /// Output values (for outputs events).
72    #[serde(default)]
73    pub outputs: serde_json::Value,
74}
75
76/// Execute a Terraform command with streaming JSON output.
77///
78/// Spawns the terraform process and calls `handler` with each [`JsonLogLine`]
79/// as it arrives on stdout. Lines that fail to parse as JSON are logged and
80/// skipped.
81///
82/// Returns a [`CommandOutput`] with the complete stderr and exit code after
83/// the process finishes. Stdout is empty since lines were consumed by the
84/// handler.
85///
86/// The command should have `.json()` enabled. If not, lines won't parse and
87/// will be skipped.
88pub async fn stream_terraform<C, F>(
89    tf: &Terraform,
90    command: C,
91    mut handler: F,
92) -> Result<CommandOutput>
93where
94    C: TerraformCommand,
95    F: FnMut(JsonLogLine),
96{
97    let mut args = command.args();
98
99    // Inject -input=false for commands that support it (same as the command's execute())
100    if tf.no_input {
101        let subcmd = args.first().map(String::as_str).unwrap_or("");
102        if matches!(subcmd, "init" | "plan" | "apply" | "destroy" | "import") {
103            args.insert(1, "-input=false".to_string());
104        }
105    }
106
107    let mut cmd = TokioCommand::new(&tf.binary);
108
109    if let Some(ref working_dir) = tf.working_dir {
110        cmd.arg(format!("-chdir={}", working_dir.display()));
111    }
112
113    for arg in &args {
114        cmd.arg(arg);
115    }
116
117    // Global args at end (same ordering as exec.rs)
118    for arg in &tf.global_args {
119        cmd.arg(arg);
120    }
121
122    for (key, value) in &tf.env {
123        cmd.env(key, value);
124    }
125
126    cmd.stdout(Stdio::piped());
127    cmd.stderr(Stdio::piped());
128
129    trace!(binary = ?tf.binary, args = ?args, "streaming terraform command");
130
131    let mut child = cmd.spawn().map_err(|e| {
132        if e.kind() == std::io::ErrorKind::NotFound {
133            Error::NotFound
134        } else {
135            Error::Io {
136                message: format!("failed to spawn terraform: {e}"),
137                source: e,
138            }
139        }
140    })?;
141
142    let stdout = child.stdout.take().ok_or_else(|| Error::Io {
143        message: "failed to capture stdout".to_string(),
144        source: std::io::Error::other("no stdout"),
145    })?;
146
147    let mut reader = BufReader::new(stdout).lines();
148
149    while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
150        message: format!("failed to read stdout line: {e}"),
151        source: e,
152    })? {
153        trace!(%line, "stream line");
154        match serde_json::from_str::<JsonLogLine>(&line) {
155            Ok(log_line) => handler(log_line),
156            Err(e) => {
157                warn!(%line, error = %e, "failed to parse streaming json line, skipping");
158            }
159        }
160    }
161
162    let output = child.wait_with_output().await.map_err(|e| Error::Io {
163        message: format!("failed to wait for terraform: {e}"),
164        source: e,
165    })?;
166
167    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
168    let exit_code = output.status.code().unwrap_or(-1);
169    let success = output.status.success();
170
171    debug!(exit_code, success, "streaming terraform command completed");
172
173    if !success {
174        return Err(Error::CommandFailed {
175            command: args.first().cloned().unwrap_or_default(),
176            exit_code,
177            stdout: String::new(),
178            stderr,
179        });
180    }
181
182    Ok(CommandOutput {
183        stdout: String::new(), // consumed by handler
184        stderr,
185        exit_code,
186        success,
187    })
188}