use anyhow::{bail, Result};
use reqwest::Client;
use serde::Deserialize;
use std::io::{self, IsTerminal, Write as _};
use std::time::{Duration, Instant};
use tracing::{debug, info};
use crate::api::models::{Deployment, DeploymentStatus};
use crate::config::Config;
use super::core::{fetch_deployment, open_log_stream, parse_duration, LogStreamError};
#[derive(Deserialize)]
struct ProjectInfo {
#[serde(skip_serializing_if = "Option::is_none")]
primary_url: Option<String>,
}
/// Controller-reported metadata for Docker-based deployments, parsed from the
/// free-form `controller_metadata` JSON on a `Deployment`.
#[derive(Deserialize, Debug, Clone, PartialEq)]
struct DockerMetadata {
    // Falls back to `ReconcilePhase::NotStarted` (its `Default`) when the
    // controller has not written a phase yet.
    #[serde(default)]
    reconcile_phase: ReconcilePhase,
    // Full container id as reported by the controller; truncated for display
    // by `extract_container_id`.
    container_id: Option<String>,
    // NOTE(review): `container_name` and `assigned_port` are deserialized but
    // never read in this file — presumably consumed elsewhere; confirm before
    // removing.
    container_name: Option<String>,
    assigned_port: Option<u16>,
}
/// Lifecycle phase of the controller's reconcile loop for a deployment.
///
/// `NotStarted` is the `Default`, which (combined with `#[serde(default)]` on
/// `DockerMetadata::reconcile_phase`) lets metadata without a phase parse cleanly.
#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
enum ReconcilePhase {
    #[default]
    NotStarted,
    CreatingContainer,
    StartingContainer,
    WaitingForHealth,
    Completed,
}
/// Minimal ANSI escape-sequence helpers for the in-place live status display.
mod ansi {
    /// Erase the entire current line without moving the cursor (EL 2).
    pub const CLEAR_LINE: &str = "\x1B[2K";
    /// Hide the terminal cursor (DECTCEM reset).
    pub const HIDE_CURSOR: &str = "\x1B[?25l";
    /// Show the terminal cursor again (DECTCEM set).
    pub const SHOW_CURSOR: &str = "\x1B[?25h";
    /// Reset all colors and text attributes (SGR 0).
    pub const RESET: &str = "\x1B[0m";
    /// Carriage return: move the cursor to column 0 of the current line.
    pub const CURSOR_TO_START: &str = "\r";

    /// Escape sequence moving the cursor up `n` lines (CUU).
    pub fn move_up(n: usize) -> String {
        format!("\x1B[{n}A")
    }
}
// Braille spinner glyphs, advanced one frame per poll while the deployment is
// still in progress (see `spinner_frame` / `FollowState::spinner_frame`).
const SPINNER_FRAMES: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
/// Snapshot of the most recently observed deployment state, used to decide
/// when a status transition should be logged and to drive the spinner.
struct FollowState {
    // Deployment status seen on the previous poll.
    last_status: DeploymentStatus,
    // Controller reconcile phase seen on the previous poll (None when the
    // controller metadata was absent or unparseable).
    last_controller_phase: Option<ReconcilePhase>,
    // NOTE(review): the next three fields are written by `update()` but never
    // read anywhere in this file — confirm they are still needed.
    last_error: Option<String>,
    last_url: Option<String>,
    last_metadata: serde_json::Value,
    // Index into SPINNER_FRAMES, advanced once per poll by the caller.
    spinner_frame: usize,
    // True until the first poll completes, forcing the initial state to be logged.
    is_first_poll: bool,
}
impl FollowState {
    /// State before any poll has completed: `Pending` status, no controller
    /// phase, and `is_first_poll` set so the very first poll always logs.
    fn new() -> Self {
        Self {
            last_status: DeploymentStatus::Pending,
            last_controller_phase: None,
            last_error: None,
            last_url: None,
            last_metadata: serde_json::Value::Null,
            spinner_frame: 0,
            is_first_poll: true,
        }
    }

    /// True when this poll represents a transition worth emitting as a
    /// permanent log line: the first poll ever, a status change, or a change
    /// in the controller's reconcile phase.
    fn should_log_state_change(
        &self,
        deployment: &Deployment,
        controller_phase: &Option<ReconcilePhase>,
    ) -> bool {
        if self.is_first_poll {
            return true;
        }
        let status_changed = self.last_status != deployment.status;
        let phase_changed = self.last_controller_phase != *controller_phase;
        status_changed || phase_changed
    }

    /// Record the freshly fetched deployment as the "last seen" snapshot.
    fn update(&mut self, deployment: &Deployment, controller_phase: Option<ReconcilePhase>) {
        self.is_first_poll = false;
        self.last_controller_phase = controller_phase;
        self.last_status = deployment.status.clone();
        self.last_url = deployment.primary_url.clone();
        self.last_error = deployment.error_message.clone();
        self.last_metadata = deployment.controller_metadata.clone();
    }
}
/// Tracks how many terminal lines the live status section last printed so the
/// section can be erased and redrawn in place on the next poll.
struct LiveStatusSection {
    // Lines emitted by the previous `render()`; 0 means nothing is on screen.
    // Public because the follow loop resets it after logging a transition.
    pub last_line_count: usize,
}
impl LiveStatusSection {
fn new() -> Self {
Self { last_line_count: 0 }
}
fn clear_previous(&self) {
if self.last_line_count > 0 {
for _ in 0..self.last_line_count {
print!(
"{}{}{}",
ansi::move_up(1),
ansi::CURSOR_TO_START,
ansi::CLEAR_LINE
);
}
print!("{}", ansi::CURSOR_TO_START);
io::stdout().flush().unwrap();
}
}
fn render(
&mut self,
deployment: &Deployment,
state: &FollowState,
controller_phase: &Option<ReconcilePhase>,
) -> String {
self.clear_previous();
let mut output = String::new();
let mut line_count = 0;
let icon = status_icon(&deployment.status);
let color = status_color(&deployment.status);
let spinner = if is_in_progress(&deployment.status) {
format!("{} ", spinner_frame(state.spinner_frame))
} else {
String::new()
};
let status_text = if let Some(phase) = controller_phase {
format!("{} ({})", deployment.status, format_controller_phase(phase))
} else {
format!("{}", deployment.status)
};
output.push_str(&format!(
"{}{} Status: {}{}{}\n",
spinner,
icon,
color,
status_text,
ansi::RESET
));
line_count += 1;
if let Some(ref url) = deployment.primary_url {
output.push_str(&format!(" URL: {}\n", url));
line_count += 1;
}
if let Some(ref error) = deployment.error_message {
output.push_str(&format!(
" {}Error:{} {}\n",
"\x1B[31m",
ansi::RESET,
error
));
line_count += 1;
}
if let Some(container_id) = extract_container_id(&deployment.controller_metadata) {
output.push_str(&format!(" Container: {}\n", container_id));
line_count += 1;
}
self.last_line_count = line_count;
output
}
}
/// ANSI foreground color for a status line.
///
/// The match is exhaustive (no `_` arm) so that adding a new
/// `DeploymentStatus` variant forces this mapping to be revisited — the
/// variant set mirrors the exhaustive match in `status_icon`.
fn status_color(status: &DeploymentStatus) -> &'static str {
    match status {
        DeploymentStatus::Healthy => "\x1B[32m", // green
        DeploymentStatus::Failed | DeploymentStatus::Unhealthy => "\x1B[31m", // red
        DeploymentStatus::Deploying => "\x1B[33m", // yellow
        DeploymentStatus::Building | DeploymentStatus::Pushing => "\x1B[36m", // cyan
        DeploymentStatus::Cancelled | DeploymentStatus::Stopped => "\x1B[90m", // gray
        // White for everything else — previously the `_` catch-all.
        DeploymentStatus::Pending
        | DeploymentStatus::Pushed
        | DeploymentStatus::Cancelling
        | DeploymentStatus::Terminating
        | DeploymentStatus::Superseded
        | DeploymentStatus::Expired => "\x1B[37m",
    }
}
/// Single-character glyph displayed next to the deployment status.
fn status_icon(status: &DeploymentStatus) -> &'static str {
    match status {
        // Both "built & pushed" and "healthy" read as success.
        DeploymentStatus::Healthy | DeploymentStatus::Pushed => "✓",
        DeploymentStatus::Failed => "✗",
        DeploymentStatus::Deploying => "⚙",
        DeploymentStatus::Building => "🔨",
        DeploymentStatus::Pushing => "⬆",
        DeploymentStatus::Unhealthy => "⚠",
        // All cancellation-related states share one glyph.
        DeploymentStatus::Cancelled
        | DeploymentStatus::Cancelling
        | DeploymentStatus::Terminating => "⊘",
        DeploymentStatus::Stopped => "■",
        DeploymentStatus::Superseded => "↻",
        DeploymentStatus::Expired => "⏱",
        DeploymentStatus::Pending => "○",
    }
}
/// Spinner glyph for the given tick, wrapping around the frame table.
fn spinner_frame(frame_num: usize) -> &'static str {
    let idx = frame_num % SPINNER_FRAMES.len();
    SPINNER_FRAMES[idx]
}
fn is_in_progress(status: &DeploymentStatus) -> bool {
matches!(
status,
DeploymentStatus::Pending
| DeploymentStatus::Building
| DeploymentStatus::Pushing
| DeploymentStatus::Pushed
| DeploymentStatus::Deploying
| DeploymentStatus::Cancelling
| DeploymentStatus::Terminating
)
}
fn is_terminal_state(status: &DeploymentStatus) -> bool {
matches!(
status,
DeploymentStatus::Healthy
| DeploymentStatus::Failed
| DeploymentStatus::Cancelled
| DeploymentStatus::Stopped
| DeploymentStatus::Superseded
| DeploymentStatus::Expired
)
}
/// Parse the controller's free-form metadata JSON into `DockerMetadata`.
///
/// Returns `None` for `null` or empty-object metadata, or when the JSON does
/// not match the expected shape. Checks emptiness via `as_object()` instead of
/// allocating a throwaway `json!({})` value on every call.
fn parse_controller_metadata(metadata: &serde_json::Value) -> Option<DockerMetadata> {
    let is_empty_object = metadata.as_object().is_some_and(|obj| obj.is_empty());
    if metadata.is_null() || is_empty_object {
        return None;
    }
    serde_json::from_value::<DockerMetadata>(metadata.clone()).ok()
}
/// Short (12-character) container id for display, if present in the metadata.
///
/// Truncates by characters rather than bytes: the old `id[..12]` byte slice
/// would panic if byte 12 fell inside a multi-byte UTF-8 sequence. For the
/// expected hex ids the result is identical.
fn extract_container_id(metadata: &serde_json::Value) -> Option<String> {
    parse_controller_metadata(metadata)
        .and_then(|m| m.container_id)
        .map(|id| id.chars().take(12).collect())
}
/// Human-readable label for a controller reconcile phase.
fn format_controller_phase(phase: &ReconcilePhase) -> String {
    let label = match phase {
        ReconcilePhase::NotStarted => "not started",
        ReconcilePhase::CreatingContainer => "creating container",
        ReconcilePhase::StartingContainer => "starting container",
        ReconcilePhase::WaitingForHealth => "waiting for health",
        // "Completed" reads better to users as "running".
        ReconcilePhase::Completed => "running",
    };
    label.to_string()
}
/// Emit a permanent log line for a deployment status transition, including the
/// controller's reconcile phase when one is available.
fn log_state_change(
    project: &str,
    deployment_id: &str,
    status: &DeploymentStatus,
    controller_phase: &Option<ReconcilePhase>,
) {
    // `to_string()` instead of `format!("{}", ..)` — same Display impl,
    // clearer intent; match expression instead of if-let/else.
    let status_text = match controller_phase {
        Some(phase) => format!("{} ({})", status, format_controller_phase(phase)),
        None => status.to_string(),
    };
    info!("Deployment {}:{} → {}", project, deployment_id, status_text);
}
/// True when stdout is attached to an interactive terminal, i.e. it is safe to
/// emit ANSI escape sequences and redraw lines in place.
fn is_tty() -> bool {
    let stdout = io::stdout();
    stdout.is_terminal()
}
/// Print a one-shot, non-interactive summary of a deployment to stdout:
/// colored status line, identity/timestamps, image info, URL, container id
/// and error message — optional lines are omitted when absent.
pub fn print_deployment_snapshot(deployment: &Deployment) {
    let phase =
        parse_controller_metadata(&deployment.controller_metadata).map(|m| m.reconcile_phase);
    let status_text = match phase {
        Some(ref p) => format!("{} ({})", deployment.status, format_controller_phase(p)),
        None => format!("{}", deployment.status),
    };
    println!(
        "{} Status: {}{}{}",
        status_icon(&deployment.status),
        status_color(&deployment.status),
        status_text,
        ansi::RESET
    );
    println!(" Deployment ID: {}", deployment.deployment_id);
    // The default group is implicit; only surface non-default groups.
    if deployment.deployment_group != "default" {
        println!(" Group: {}", deployment.deployment_group);
    }
    println!(" Created by: {}", deployment.created_by_email);
    println!(" Created: {}", deployment.created);
    // Suppress the updated timestamp when it equals the creation time.
    if deployment.updated != deployment.created {
        println!(" Updated: {}", deployment.updated);
    }
    if let Some(ref expires) = deployment.expires_at {
        println!(" Expires at: {}", expires);
    }
    if let Some(ref image) = deployment.image {
        println!(" Image: {}", image);
    }
    if let Some(ref digest) = deployment.image_digest {
        println!(" Image digest: {}", digest);
    }
    if let Some(ref url) = deployment.primary_url {
        println!(" URL: {}", url);
    }
    if let Some(container_id) = extract_container_id(&deployment.controller_metadata) {
        println!(" Container: {}", container_id);
    }
    if let Some(ref error) = deployment.error_message {
        // Red "Error:" label, reset before the message text.
        println!(" \x1B[31mError:{} {}", ansi::RESET, error);
    }
}
/// Fetch project-level info (currently just the primary URL) from the backend.
///
/// # Errors
/// Fails on a transport error, a non-success HTTP status (the status code is
/// now included in the message for diagnosability), or a malformed body.
async fn fetch_project_info(
    http_client: &Client,
    backend_url: &str,
    token: &str,
    project: &str,
) -> Result<ProjectInfo> {
    let url = format!("{}/api/v1/projects/{}", backend_url, project);
    let response = http_client.get(&url).bearer_auth(token).send().await?;
    let status = response.status();
    if !status.is_success() {
        // Previously the status code was discarded, making failures opaque.
        bail!("Failed to fetch project info: HTTP {}", status);
    }
    let project_info: ProjectInfo = response.json().await?;
    Ok(project_info)
}
fn should_stream_logs(status: &DeploymentStatus) -> bool {
matches!(
status,
DeploymentStatus::Deploying | DeploymentStatus::Unhealthy
)
}
/// Stream deployment logs to stdout while polling deployment status every
/// 3 seconds; returns the deployment once it reaches a terminal state, or
/// errors when `timeout` (measured from `start_time`) elapses.
///
/// Stream connection failures are retried up to `MAX_RETRIES` times with a
/// fixed `RETRY_DELAY` between attempts; once the budget is exhausted it falls
/// back to `status_only_polling`. A `LogStreamError::Gone` (logs permanently
/// unavailable) short-circuits to a final status fetch.
async fn stream_logs_with_status_polling(
    http_client: &Client,
    backend_url: &str,
    token: &str,
    project: &str,
    deployment_id: &str,
    timeout: Duration,
    start_time: Instant,
) -> Result<Deployment> {
    let mut log_stream = None;
    let mut retry_count: usize = 0;
    const MAX_RETRIES: usize = 10;
    const RETRY_DELAY: Duration = Duration::from_secs(2);
    let mut status_interval = tokio::time::interval(Duration::from_secs(3));
    // A tokio interval's first tick completes immediately; consume it so the
    // first status poll happens after a full period.
    status_interval.tick().await;
    // Initial connection attempt; the `100` requests a backlog of recent lines.
    match open_log_stream(http_client, backend_url, token, project, deployment_id, 100).await {
        Ok(s) => log_stream = Some(s),
        Err(LogStreamError::Gone) => {
            // Logs are permanently gone (e.g. deployment already finished):
            // return the current deployment state instead of streaming.
            return fetch_deployment(http_client, backend_url, token, project, deployment_id).await;
        }
        Err(e) => {
            // Transient failure — the reconnect branch below will retry.
            debug!("Initial log stream connection failed: {:?}", e);
        }
    }
    loop {
        if start_time.elapsed() >= timeout {
            bail!(
                "Timeout waiting for deployment to complete after {:?}",
                timeout
            );
        }
        if let Some(ref mut stream) = log_stream {
            tokio::select! {
                // `biased` prefers draining log lines over status ticks so
                // output stays ordered and timely.
                biased;
                line = stream.recv() => {
                    match line {
                        Some(Ok(text)) => println!("{}", text),
                        Some(Err(e)) => {
                            // Drop the stream; the else-branch will reconnect.
                            debug!("Log stream error: {:?}", e);
                            log_stream = None;
                        }
                        None => {
                            debug!("Log stream ended");
                            log_stream = None;
                        }
                    }
                }
                _ = status_interval.tick() => {
                    let deployment = fetch_deployment(
                        http_client, backend_url, token, project, deployment_id,
                    ).await?;
                    if is_terminal_state(&deployment.status) {
                        // Give the stream a short grace period to flush any
                        // remaining lines before returning.
                        drain_log_stream(stream).await;
                        return Ok(deployment);
                    }
                }
            }
        } else {
            // No live stream: retry the connection while still polling status.
            if retry_count >= MAX_RETRIES {
                debug!("Max log stream retries exceeded, falling back to status-only polling");
                return status_only_polling(
                    http_client,
                    backend_url,
                    token,
                    project,
                    deployment_id,
                    timeout,
                    start_time,
                )
                .await;
            }
            tokio::select! {
                _ = tokio::time::sleep(RETRY_DELAY) => {
                    retry_count += 1;
                    match open_log_stream(
                        http_client, backend_url, token, project, deployment_id, 100,
                    ).await {
                        Ok(s) => {
                            log_stream = Some(s);
                            // A successful reconnect resets the retry budget.
                            retry_count = 0;
                        }
                        Err(LogStreamError::Gone) => {
                            return fetch_deployment(
                                http_client, backend_url, token, project, deployment_id,
                            ).await;
                        }
                        Err(e) => {
                            debug!("Log stream reconnect failed (attempt {}): {:?}", retry_count, e);
                        }
                    }
                }
                // Keep polling status while waiting to reconnect so a finished
                // deployment is noticed promptly.
                _ = status_interval.tick() => {
                    let deployment = fetch_deployment(
                        http_client, backend_url, token, project, deployment_id,
                    ).await?;
                    if is_terminal_state(&deployment.status) {
                        return Ok(deployment);
                    }
                }
            }
        }
    }
}
/// Best-effort flush of any remaining log lines after the deployment reached a
/// terminal state, bounded to two seconds so shutdown cannot hang.
async fn drain_log_stream(stream: &mut super::core::LogStream) {
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    // Stop on whichever comes first: deadline elapsed (timeout Err), stream
    // closed (Ok(None)), or a stream error (Ok(Some(Err(_)))).
    while let Ok(Some(Ok(text))) = tokio::time::timeout_at(deadline, stream.recv()).await {
        println!("{}", text);
    }
}
/// Fallback follow mode with no log streaming: poll deployment status every
/// three seconds until a terminal state is reached, or fail once `timeout`
/// (measured from `start_time`) elapses.
async fn status_only_polling(
    http_client: &Client,
    backend_url: &str,
    token: &str,
    project: &str,
    deployment_id: &str,
    timeout: Duration,
    start_time: Instant,
) -> Result<Deployment> {
    let poll_every = Duration::from_secs(3);
    loop {
        let deployment =
            fetch_deployment(http_client, backend_url, token, project, deployment_id).await?;
        if is_terminal_state(&deployment.status) {
            return Ok(deployment);
        }
        // Check the deadline after the fetch so a just-finished deployment is
        // still returned rather than reported as a timeout.
        if start_time.elapsed() >= timeout {
            bail!(
                "Timeout waiting for deployment to complete after {:?}",
                timeout
            );
        }
        tokio::time::sleep(poll_every).await;
    }
}
/// Follow a deployment with a live, in-place status display (TTY only).
///
/// Phase 1 polls status once per second, redrawing a spinner/status section in
/// place and logging each state transition as a permanent line. Once the
/// deployment starts deploying (or goes unhealthy), phase 2 switches to log
/// streaming via `stream_logs_with_status_polling`. When stdout is not a
/// terminal, delegates to `follow_deployment_simple`.
///
/// # Errors
/// Fails when not authenticated, when `timeout_str` does not parse, on API
/// errors, or when the timeout elapses before a terminal state is reached.
pub async fn follow_deployment_with_ui(
    http_client: &Client,
    backend_url: &str,
    config: &Config,
    project: &str,
    deployment_id: &str,
    timeout_str: &str,
) -> Result<Deployment> {
    let token = config
        .get_token()
        .ok_or_else(|| anyhow::anyhow!("Not authenticated"))?;
    let timeout = parse_duration(timeout_str)?;
    let start_time = Instant::now();
    // Non-interactive output (pipes, CI): plain line-oriented following.
    if !is_tty() {
        return follow_deployment_simple(
            http_client,
            backend_url,
            &token,
            project,
            deployment_id,
            timeout,
        )
        .await;
    }
    let mut state = FollowState::new();
    let mut live_section = LiveStatusSection::new();
    // Hide the cursor while redrawing; every exit path below restores it.
    // NOTE(review): a panic or Ctrl-C inside phase 1 would leave the cursor
    // hidden — consider an RAII guard that prints SHOW_CURSOR on drop.
    print!("{}", ansi::HIDE_CURSOR);
    io::stdout().flush().unwrap();
    // Phase 1: 1-second status polls until terminal or ready for logs.
    let phase1_result: Result<Deployment> = async {
        loop {
            let deployment =
                fetch_deployment(http_client, backend_url, &token, project, deployment_id).await?;
            let controller_phase = parse_controller_metadata(&deployment.controller_metadata)
                .map(|m| m.reconcile_phase);
            if state.should_log_state_change(&deployment, &controller_phase) {
                // Transition: erase the live section and emit a permanent log
                // line instead of redrawing.
                live_section.clear_previous();
                log_state_change(
                    project,
                    deployment_id,
                    &deployment.status,
                    &controller_phase,
                );
                // Nothing is on screen now; keep the next clear from erasing
                // the log line we just printed.
                live_section.last_line_count = 0;
            } else {
                // No transition: redraw the in-place status section.
                let output = live_section.render(&deployment, &state, &controller_phase);
                print!("{}", output);
                io::stdout().flush().unwrap();
            }
            state.update(&deployment, controller_phase);
            state.spinner_frame = (state.spinner_frame + 1) % SPINNER_FRAMES.len();
            if is_terminal_state(&deployment.status) {
                return Ok(deployment);
            }
            if should_stream_logs(&deployment.status) {
                // Hand off to phase 2 (log streaming).
                return Ok(deployment);
            }
            if start_time.elapsed() >= timeout {
                bail!(
                    "Timeout waiting for deployment to complete after {:?}",
                    timeout
                );
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
    .await;
    let deployment = match phase1_result {
        Ok(d) => d,
        Err(e) => {
            // Restore the cursor before propagating the error.
            print!("{}", ansi::SHOW_CURSOR);
            io::stdout().flush().unwrap();
            return Err(e);
        }
    };
    // Phase 2: if not yet terminal, stream logs while polling status.
    let final_deployment = if !is_terminal_state(&deployment.status) {
        live_section.clear_previous();
        print!("{}", ansi::SHOW_CURSOR);
        io::stdout().flush().unwrap();
        println!("--- Logs ---");
        stream_logs_with_status_polling(
            http_client,
            backend_url,
            &token,
            project,
            deployment_id,
            timeout,
            start_time,
        )
        .await?
    } else {
        print!("{}", ansi::SHOW_CURSOR);
        io::stdout().flush().unwrap();
        deployment
    };
    // On a healthy default-group deployment, surface the project's public URL
    // (best-effort: fetch failures are silently ignored).
    if final_deployment.status == DeploymentStatus::Healthy
        && final_deployment.deployment_group == "default"
    {
        if let Ok(project_info) =
            fetch_project_info(http_client, backend_url, &token, project).await
        {
            if let Some(url) = project_info.primary_url {
                println!();
                println!("Project URL: {}", url);
            }
        }
    }
    Ok(final_deployment)
}
/// Non-TTY variant of `follow_deployment_with_ui`: logs each state transition
/// as a plain line (no in-place redraw, no cursor control), switches to log
/// streaming once the deployment starts deploying or goes unhealthy, and
/// finally prints the project URL for a healthy default-group deployment.
async fn follow_deployment_simple(
    http_client: &Client,
    backend_url: &str,
    token: &str,
    project: &str,
    deployment_id: &str,
    timeout: Duration,
) -> Result<Deployment> {
    let start_time = Instant::now();
    let mut state = FollowState::new();
    // Phase 1: 1-second status polls until terminal or ready for logs.
    let deployment = loop {
        let deployment =
            fetch_deployment(http_client, backend_url, token, project, deployment_id).await?;
        let controller_phase =
            parse_controller_metadata(&deployment.controller_metadata).map(|m| m.reconcile_phase);
        if state.should_log_state_change(&deployment, &controller_phase) {
            log_state_change(
                project,
                deployment_id,
                &deployment.status,
                &controller_phase,
            );
        }
        state.update(&deployment, controller_phase);
        if is_terminal_state(&deployment.status) {
            break deployment;
        }
        if should_stream_logs(&deployment.status) {
            break deployment;
        }
        if start_time.elapsed() >= timeout {
            bail!("Timeout waiting for deployment");
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    };
    // Phase 2: stream logs while polling status until a terminal state.
    let final_deployment = if !is_terminal_state(&deployment.status) {
        println!("--- Logs ---");
        stream_logs_with_status_polling(
            http_client,
            backend_url,
            token,
            project,
            deployment_id,
            timeout,
            start_time,
        )
        .await?
    } else {
        deployment
    };
    // Best-effort URL lookup: fetch failures are silently ignored.
    if final_deployment.status == DeploymentStatus::Healthy
        && final_deployment.deployment_group == "default"
    {
        if let Ok(project_info) = fetch_project_info(http_client, backend_url, token, project).await
        {
            if let Some(url) = project_info.primary_url {
                println!();
                println!("Project URL: {}", url);
            }
        }
    }
    Ok(final_deployment)
}