Skip to main content

terraform_wrapper/
streaming.rs

1//! Streaming JSON output from `terraform plan` and `terraform apply`.
2//!
3//! When run with `-json`, Terraform produces one JSON object per line (NDJSON),
4//! each representing an event like resource creation, progress, or completion.
5//!
6//! This module provides [`stream_terraform`] which yields [`JsonLogLine`] events
7//! as they arrive, useful for progress reporting in orchestration tools.
8//!
9//! # Example
10//!
11//! ```rust,no_run
12//! use terraform_wrapper::prelude::*;
13//! use terraform_wrapper::streaming::{stream_terraform, JsonLogLine};
14//!
15//! # async fn example() -> terraform_wrapper::error::Result<()> {
16//! let tf = Terraform::builder().working_dir("./infra").build()?;
17//!
18//! let result = stream_terraform(&tf, ApplyCommand::new().auto_approve().json(), &[0], |line| {
19//!     println!("[{}] {}", line.log_type, line.message);
20//! }).await?;
21//!
22//! assert!(result.success);
23//! # Ok(())
24//! # }
25//! ```
26
27use std::process::Stdio;
28
29use serde::{Deserialize, Serialize};
30use tokio::io::{AsyncBufReadExt, BufReader};
31use tokio::process::Command as TokioCommand;
32use tracing::{debug, trace, warn};
33
34use crate::Terraform;
35use crate::command::TerraformCommand;
36use crate::error::{Error, Result};
37use crate::exec::CommandOutput;
38
39/// A single JSON log line from Terraform's streaming output.
40///
41/// Terraform emits these as NDJSON (one per line) when commands are run
42/// with the `-json` flag.
43#[derive(Debug, Clone, Deserialize, Serialize)]
44pub struct JsonLogLine {
45    /// Log level: "info", "warn", "error".
46    #[serde(rename = "@level")]
47    pub level: String,
48    /// Human-readable message.
49    #[serde(rename = "@message")]
50    pub message: String,
51    /// Module that emitted the message (e.g., "terraform.ui").
52    #[serde(rename = "@module", default)]
53    pub module: String,
54    /// ISO 8601 timestamp.
55    #[serde(rename = "@timestamp", default)]
56    pub timestamp: String,
57    /// Event type: "version", "planned_change", "change_summary",
58    /// "apply_start", "apply_progress", "apply_complete",
59    /// "apply_errored", "outputs", etc.
60    #[serde(rename = "type", default)]
61    pub log_type: String,
62    /// Change details (for planned_change events).
63    #[serde(default)]
64    pub change: serde_json::Value,
65    /// Hook details (for apply_start/apply_complete/apply_progress events).
66    #[serde(default)]
67    pub hook: serde_json::Value,
68    /// Change summary (for change_summary events).
69    #[serde(default)]
70    pub changes: serde_json::Value,
71    /// Output values (for outputs events).
72    #[serde(default)]
73    pub outputs: serde_json::Value,
74}
75
76/// Execute a Terraform command with streaming JSON output.
77///
78/// Spawns the terraform process and calls `handler` with each [`JsonLogLine`]
79/// as it arrives on stdout. Lines that fail to parse as JSON are logged and
80/// skipped.
81///
82/// `allowed_exit_codes` controls which exit codes are treated as success.
83/// Pass `&[0]` for most commands, or `&[0, 2]` for `plan -detailed-exitcode`
84/// where exit code 2 means "changes present".
85///
86/// Respects the client's timeout setting. If the command exceeds the timeout,
87/// the child process is killed and [`Error::Timeout`] is returned.
88///
89/// Returns a [`CommandOutput`] with the complete stderr and exit code after
90/// the process finishes. Stdout is empty since lines were consumed by the
91/// handler.
92///
93/// The command should have `.json()` enabled. If not, lines won't parse and
94/// will be skipped.
95pub async fn stream_terraform<C, F>(
96    tf: &Terraform,
97    command: C,
98    allowed_exit_codes: &[i32],
99    mut handler: F,
100) -> Result<CommandOutput>
101where
102    C: TerraformCommand,
103    F: FnMut(JsonLogLine),
104{
105    let args = command.prepare_args(tf);
106
107    let mut cmd = TokioCommand::new(&tf.binary);
108
109    if let Some(ref working_dir) = tf.working_dir {
110        cmd.arg(format!("-chdir={}", working_dir.display()));
111    }
112
113    for arg in &args {
114        cmd.arg(arg);
115    }
116
117    // Global args at end (same ordering as exec.rs)
118    for arg in &tf.global_args {
119        cmd.arg(arg);
120    }
121
122    for (key, value) in &tf.env {
123        cmd.env(key, value);
124    }
125
126    cmd.stdout(Stdio::piped());
127    cmd.stderr(Stdio::piped());
128
129    trace!(binary = ?tf.binary, args = ?args, timeout_secs = ?tf.timeout.map(|t| t.as_secs()), "streaming terraform command");
130
131    let mut child = cmd.spawn().map_err(|e| {
132        if e.kind() == std::io::ErrorKind::NotFound {
133            Error::NotFound
134        } else {
135            Error::Io {
136                message: format!("failed to spawn terraform: {e}"),
137                source: e,
138            }
139        }
140    })?;
141
142    let stdout = child.stdout.take().ok_or_else(|| Error::Io {
143        message: "failed to capture stdout".to_string(),
144        source: std::io::Error::other("no stdout"),
145    })?;
146
147    let stream_and_wait = async {
148        let mut reader = BufReader::new(stdout).lines();
149
150        while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
151            message: format!("failed to read stdout line: {e}"),
152            source: e,
153        })? {
154            trace!(%line, "stream line");
155            match serde_json::from_str::<JsonLogLine>(&line) {
156                Ok(log_line) => handler(log_line),
157                Err(e) => {
158                    warn!(%line, error = %e, "failed to parse streaming json line, skipping");
159                }
160            }
161        }
162
163        child.wait_with_output().await.map_err(|e| Error::Io {
164            message: format!("failed to wait for terraform: {e}"),
165            source: e,
166        })
167    };
168
169    let output = if let Some(duration) = tf.timeout {
170        match tokio::time::timeout(duration, stream_and_wait).await {
171            Ok(result) => result?,
172            Err(_) => {
173                warn!(
174                    timeout_seconds = duration.as_secs(),
175                    "streaming terraform command timed out"
176                );
177                return Err(Error::Timeout {
178                    timeout_seconds: duration.as_secs(),
179                });
180            }
181        }
182    } else {
183        stream_and_wait.await?
184    };
185
186    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
187    let exit_code = output.status.code().unwrap_or(-1);
188    let success = allowed_exit_codes.contains(&exit_code);
189
190    debug!(exit_code, success, "streaming terraform command completed");
191
192    if !success {
193        return Err(Error::CommandFailed {
194            command: args.first().cloned().unwrap_or_default(),
195            exit_code,
196            stdout: String::new(),
197            stderr,
198        });
199    }
200
201    Ok(CommandOutput {
202        stdout: String::new(), // consumed by handler
203        stderr,
204        exit_code,
205        success,
206    })
207}