use crate::api::{DiscourseClient, VersionInfo};
use crate::commands::common::{ensure_api_credentials, missing_config};
use crate::config::{Config, DiscourseConfig, find_discourse};
use crate::utils::color_discourse_label;
use anyhow::{Context, Result, anyhow};
use indicatif::{ProgressBar, ProgressStyle};
use reqwest::blocking::Client;
use std::collections::VecDeque;
use std::io::{self, Write};
use std::io::{BufRead, BufReader, IsTerminal};
use std::process::Stdio;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
const DEFAULT_PARALLEL_UPDATE_WORKERS: usize = 3;
pub fn update_one(config: &Config, name: &str, post_changelog: bool, yes: bool) -> Result<()> {
let discourse =
find_discourse(config, name).ok_or_else(|| anyhow!("discourse not found: {}", name))?;
let metadata = run_update(discourse)?;
let payload = print_update_summary(discourse, &metadata);
if post_changelog {
handle_changelog_post(discourse, &payload, yes)?;
}
Ok(())
}
pub fn update_all(
config: &Config,
parallel: bool,
max: Option<usize>,
post_changelog: bool,
yes: bool,
) -> Result<()> {
if !parallel {
for discourse in &config.discourse {
if discourse.ssh_host.is_none() {
continue;
}
let metadata = run_update(discourse)?;
let payload = print_update_summary(discourse, &metadata);
if post_changelog {
handle_changelog_post(discourse, &payload, yes)?;
}
}
return Ok(());
}
let updatable: Vec<_> = config
.discourse
.iter()
.filter(|d| d.ssh_host.is_some())
.cloned()
.collect();
let max_threads = parallel_worker_count(max, updatable.len());
let mut handles: Vec<thread::JoinHandle<Result<()>>> = Vec::new();
for discourse in updatable {
if handles.len() >= max_threads {
if let Some(handle) = handles.pop() {
handle.join().expect("thread panicked")?;
}
}
let do_post = post_changelog;
let auto_yes = yes;
handles.push(thread::spawn(move || {
let metadata = run_update(&discourse)?;
let payload = print_update_summary(&discourse, &metadata);
if do_post {
handle_changelog_post(&discourse, &payload, auto_yes)?;
}
Ok::<_, anyhow::Error>(())
}));
}
for handle in handles {
handle.join().expect("thread panicked")?;
}
Ok(())
}
fn parallel_worker_count(max: Option<usize>, discourse_count: usize) -> usize {
let requested = max.unwrap_or(DEFAULT_PARALLEL_UPDATE_WORKERS).max(1);
requested.min(discourse_count.max(1))
}
#[cfg(test)]
mod tests {
use super::parallel_worker_count;
#[test]
fn default_parallel_workers_is_three() {
assert_eq!(parallel_worker_count(None, 10), 3);
}
#[test]
fn max_workers_is_capped_by_discourse_count() {
assert_eq!(parallel_worker_count(Some(8), 2), 2);
}
}
struct UpdateMetadata {
before_version: Option<String>,
before_commit: Option<String>,
after_version: Option<String>,
after_commit: Option<String>,
reclaimed_space: Option<String>,
before_os_version: Option<String>,
after_version_error: Option<String>,
root_disk_usage: Option<String>,
os_updated: bool,
server_rebooted: bool,
}
fn run_update(discourse: &DiscourseConfig) -> Result<UpdateMetadata> {
let client = DiscourseClient::new(discourse)?;
let target = discourse
.ssh_host
.clone()
.unwrap_or_else(|| discourse.name.clone());
let discourse_label = colored_discourse_display(discourse);
println!("\n==> Updating {} ({})", discourse_label, target);
stage(
&discourse_label,
"Fetching Discourse version (before update)",
);
let before_info = match client.fetch_version_info() {
Ok(info) => {
let label = info.version.as_deref().unwrap_or("unknown");
stage(
&discourse_label,
&format!("Initial Discourse Version (before update): {}", label),
);
info
}
Err(err) => {
stage(
&discourse_label,
&format!(
"Initial Discourse Version (before update): unknown (fetch failed: {})",
err
),
);
VersionInfo {
version: None,
commit: None,
}
}
};
stage(&discourse_label, "Fetching OS details");
let before_os_version = match get_os_version(&target) {
Ok(version) => {
let label = version.as_deref().unwrap_or("unknown");
stage(&discourse_label, &format!("OS: {}", label));
version
}
Err(err) => {
stage(
&discourse_label,
&format!(
"Initial OS Version (before update): unknown (fetch failed: {})",
err
),
);
None
}
};
stage(&discourse_label, "Checking root disk free space");
let min_free_gb = std::env::var("DSC_DISCOURSE_MIN_FREE_GB")
.ok()
.and_then(|raw| raw.trim().parse::<u64>().ok())
.filter(|gb| *gb > 0)
.unwrap_or(5);
if let Some(available_gb) = get_root_disk_available_gb(&target)? {
if available_gb < min_free_gb {
return Err(anyhow!(
"insufficient disk space on {}: {}G free (minimum {}G). Please run an interactive update via SSH to clean up space, then retry.",
target,
available_gb,
min_free_gb
));
}
}
let rootless = discourse.docker_rootless.unwrap_or(false);
let os_update_cmd = std::env::var("DSC_SSH_OS_UPDATE_CMD").unwrap_or_else(|_| {
"sudo -n DEBIAN_FRONTEND=noninteractive apt update && sudo -n DEBIAN_FRONTEND=noninteractive apt upgrade -y"
.to_string()
});
let reboot_cmd =
std::env::var("DSC_SSH_REBOOT_CMD").unwrap_or_else(|_| "sudo -n reboot".to_string());
let discourse_update_cmd = std::env::var("DSC_SSH_UPDATE_CMD").unwrap_or_else(|_| {
if rootless {
"cd /var/discourse && ./launcher rebuild app".to_string()
} else {
"cd /var/discourse && sudo -n ./launcher rebuild app".to_string()
}
});
let cleanup_cmd = std::env::var("DSC_SSH_CLEANUP_CMD")
.unwrap_or_else(|_| {
if rootless {
"docker container prune -f && docker image prune -f".to_string()
} else {
"sudo -n docker container prune -f && sudo -n docker image prune -f".to_string()
}
});
let mut server_rebooted = false;
stage(&discourse_label, "Running OS update");
if let Err(err) = run_ssh_command_with_tail(&target, &os_update_cmd, "OS update in progress", 3)
{
if let Some(rollback_cmd) = os_update_rollback_cmd() {
stage(&discourse_label, "Running OS update rollback");
if let Err(rollback_err) = run_ssh_command(&target, &rollback_cmd) {
eprintln!(
"Warning: OS update rollback failed for {}: {}",
target, rollback_err
);
}
}
return Err(anyhow!("OS update failed for {}: {}", target, err));
}
let os_updated = true;
stage(&discourse_label, "Rebooting server");
if run_ssh_command(&target, &reboot_cmd).is_ok() {
server_rebooted = true;
if std::env::var("DSC_SSH_OS_UPDATE_CMD").unwrap_or_default() != "echo OS packages updated"
{
stage(&discourse_label, "Waiting for server to come back online");
std::thread::sleep(std::time::Duration::from_secs(30));
let mut attempts = 0;
let max_attempts = 12;
while attempts < max_attempts {
match ssh_probe(&target) {
Ok(true) => break,
Ok(false) | Err(_) => {
attempts += 1;
if attempts < max_attempts {
println!(
"[{}] Still waiting for SSH (attempt {}/{})",
discourse_label,
attempts + 1,
max_attempts
);
std::thread::sleep(std::time::Duration::from_secs(30));
}
}
}
}
if attempts >= max_attempts {
return Err(anyhow!("Server did not come back online after reboot"));
}
}
}
stage(&discourse_label, "Checking if Discourse update is needed");
let discourse_up_to_date = is_discourse_up_to_date(before_info.commit.as_deref());
if discourse_up_to_date {
stage(
&discourse_label,
"Discourse is already at the latest stable commit — skipping rebuild",
);
} else {
stage(&discourse_label, "Running Discourse update");
run_ssh_command_with_tail(
&target,
&discourse_update_cmd,
"Discourse update in progress",
3,
)?;
}
stage(&discourse_label, "Waiting for Discourse to serve pages");
let wait_secs = std::env::var("DSC_DISCOURSE_BOOT_WAIT_SECS")
.ok()
.and_then(|raw| raw.trim().parse::<u64>().ok())
.filter(|secs| *secs > 0)
.unwrap_or(15);
std::thread::sleep(std::time::Duration::from_secs(wait_secs));
stage(
&discourse_label,
"Fetching Discourse version (after update)",
);
let mut after_version_error = None;
let after_info = match fetch_version_info_with_retry(&client, 6) {
Ok(info) => {
let label = info.version.as_deref().unwrap_or("unknown");
stage(
&discourse_label,
&format!("Final Discourse Version (after update): {}", label),
);
info
}
Err(err) => {
let message = format!("{}", err);
after_version_error = Some(message.clone());
stage(
&discourse_label,
&format!(
"Final Discourse Version (after update): unknown (fetch failed: {})",
message
),
);
VersionInfo {
version: None,
commit: None,
}
}
};
stage(&discourse_label, "Running cleanup");
let cleanup = run_ssh_command_combined(&target, &cleanup_cmd)?;
let reclaimed_space = parse_reclaimed_space(&cleanup);
stage(&discourse_label, "Fetching root disk usage");
let root_disk_usage = match get_root_disk_usage(&target) {
Ok(output) => Some(output),
Err(err) => {
stage(
&discourse_label,
&format!("Root disk usage: unknown (fetch failed: {})", err),
);
None
}
};
Ok(UpdateMetadata {
before_version: before_info.version,
before_commit: before_info.commit,
after_version: after_info.version,
after_commit: after_info.commit,
reclaimed_space,
before_os_version,
after_version_error,
root_disk_usage,
os_updated,
server_rebooted,
})
}
pub(crate) fn run_ssh_command(target: &str, command: &str) -> Result<String> {
let mut cmd = build_ssh_command(target, &[])?;
let output = cmd
.arg(command)
.output()
.with_context(|| format!("running ssh to {}: {}", target, command))?;
if !output.status.success() {
return Err(anyhow!(
"ssh command failed for {}: {}",
target,
String::from_utf8_lossy(&output.stderr)
));
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
fn run_ssh_command_combined(target: &str, command: &str) -> Result<String> {
let mut cmd = build_ssh_command(target, &[])?;
let output = cmd
.arg(command)
.output()
.with_context(|| format!("running ssh to {}: {}", target, command))?;
if !output.status.success() {
return Err(anyhow!(
"ssh command failed for {}: {}",
target,
String::from_utf8_lossy(&output.stderr)
));
}
let mut combined = String::new();
combined.push_str(&String::from_utf8_lossy(&output.stdout));
combined.push_str(&String::from_utf8_lossy(&output.stderr));
Ok(combined)
}
struct LineEvent {
is_stderr: bool,
line: String,
}
fn run_ssh_command_with_tail(
target: &str,
command: &str,
message: &str,
tail_lines: usize,
) -> Result<String> {
let use_progress = io::stderr().is_terminal();
let pb = if use_progress {
ProgressBar::new_spinner()
} else {
ProgressBar::hidden()
};
if use_progress {
let style = ProgressStyle::with_template("{spinner} {msg}")
.unwrap_or_else(|_| ProgressStyle::default_spinner());
pb.set_style(style);
pb.enable_steady_tick(Duration::from_millis(120));
}
let mut cmd = build_ssh_command(target, &[])?;
let mut child = cmd
.arg(command)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| format!("running ssh to {}: {}", target, command))?;
let stdout = child.stdout.take().context("missing stdout")?;
let stderr = child.stderr.take().context("missing stderr")?;
let (tx, rx) = mpsc::channel::<LineEvent>();
let tx_out = tx.clone();
thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines() {
match line {
Ok(line) => {
let _ = tx_out.send(LineEvent {
is_stderr: false,
line,
});
}
Err(_) => break,
}
}
});
let tx_err = tx.clone();
thread::spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines() {
match line {
Ok(line) => {
let _ = tx_err.send(LineEvent {
is_stderr: true,
line,
});
}
Err(_) => break,
}
}
});
drop(tx);
let mut stdout_buf = String::new();
let mut stderr_buf = String::new();
let mut tail: VecDeque<String> = VecDeque::new();
let base = format!("[{}] {}", target, message);
pb.set_message(base.clone());
loop {
match rx.recv_timeout(Duration::from_millis(200)) {
Ok(event) => {
if event.is_stderr {
stderr_buf.push_str(&event.line);
stderr_buf.push('\n');
} else {
stdout_buf.push_str(&event.line);
stdout_buf.push('\n');
}
if tail_lines > 0 {
if tail.len() == tail_lines {
tail.pop_front();
}
tail.push_back(event.line);
let mut msg = base.clone();
for line in &tail {
msg.push('\n');
msg.push_str(" ");
msg.push_str(line);
}
pb.set_message(msg);
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
let status = child.wait().context("waiting for ssh command")?;
pb.finish_and_clear();
if !status.success() {
return Err(anyhow!("ssh command failed for {}: {}", target, stderr_buf));
}
Ok(stdout_buf)
}
fn build_ssh_command(target: &str, extra_options: &[&str]) -> Result<std::process::Command> {
validate_ssh_target(target)?;
let mut cmd = std::process::Command::new("ssh");
cmd.arg("-o").arg("BatchMode=yes");
if let Some(strict) = ssh_strict_host_key_checking() {
cmd.arg("-o")
.arg(format!("StrictHostKeyChecking={}", strict));
}
for option in extra_options {
cmd.arg(option);
}
if let Ok(raw) = std::env::var("DSC_SSH_OPTIONS") {
if !raw.trim().is_empty() {
cmd.args(raw.split_whitespace());
}
}
cmd.arg("--").arg(target);
Ok(cmd)
}
fn ssh_strict_host_key_checking() -> Option<String> {
let value = std::env::var("DSC_SSH_STRICT_HOST_KEY_CHECKING")
.unwrap_or_else(|_| "accept-new".to_string());
let value = value.trim();
if value.is_empty() {
None
} else {
Some(value.to_string())
}
}
fn validate_ssh_target(target: &str) -> Result<()> {
let trimmed = target.trim();
if trimmed.is_empty() {
return Err(anyhow!("ssh target is empty"));
}
if trimmed.starts_with('-') {
return Err(anyhow!("ssh target cannot start with '-': {}", target));
}
if trimmed.chars().any(|ch| ch.is_whitespace()) {
return Err(anyhow!("ssh target cannot contain whitespace: {}", target));
}
Ok(())
}
fn ssh_probe(target: &str) -> Result<bool> {
let mut cmd = build_ssh_command(target, &["-o", "ConnectTimeout=10"])?;
let output = cmd
.arg("echo 'server is up'")
.output()
.with_context(|| format!("running ssh probe to {}", target))?;
Ok(output.status.success())
}
fn stage(target: &str, message: &str) {
println!("[{}] {}", target, message);
}
fn print_update_summary(discourse: &DiscourseConfig, metadata: &UpdateMetadata) -> String {
let payload = build_changelog_payload(metadata);
let discourse_label = colored_discourse_display(discourse);
println!("\nUpdate summary for {}:", discourse_label);
for line in payload.lines() {
println!("{}", line);
}
println!();
payload
}
fn discourse_display_name(discourse: &DiscourseConfig) -> String {
if let Some(fullname) = discourse.fullname.as_deref() {
let trimmed = fullname.trim();
if !trimmed.is_empty() {
return format!("{} [{}]", trimmed, discourse.name);
}
}
discourse.name.clone()
}
fn colored_discourse_display(discourse: &DiscourseConfig) -> String {
let label = discourse_display_name(discourse);
color_discourse_label(&label, &discourse.name)
}
fn get_os_version(target: &str) -> Result<Option<String>> {
let version_cmd = std::env::var("DSC_SSH_OS_VERSION_CMD")
.unwrap_or_else(|_| "lsb_release -d | cut -f2".to_string());
match run_ssh_command(target, &version_cmd) {
Ok(output) => Ok(Some(output.trim().to_string())),
Err(_) => {
let fallback_cmd = "grep PRETTY_NAME /etc/os-release | cut -d'=' -f2 | tr -d '\"'";
match run_ssh_command(target, fallback_cmd) {
Ok(output) => Ok(Some(output.trim().to_string())),
Err(_) => Ok(None),
}
}
}
}
fn parse_reclaimed_space(output: &str) -> Option<String> {
let cleaned = strip_ansi_codes(output);
cleaned
.lines()
.filter_map(|line| {
let lower = line.to_ascii_lowercase();
let idx = lower.find("total reclaimed space:")?;
let (_, rest) = line.split_at(idx);
rest.splitn(2, ':')
.nth(1)
.map(|value| value.trim().to_string())
})
.last()
}
fn get_root_disk_usage(target: &str) -> Result<String> {
let cmd = "df -h / | awk 'NR==2 {print $2 \" total, \" $3 \" used, \" $4 \" available, \" $5 \" used\"}'";
let output = run_ssh_command(target, cmd)?;
Ok(output.trim().to_string())
}
fn get_root_disk_available_gb(target: &str) -> Result<Option<u64>> {
let cmd = "df -BG / | awk 'NR==2 {print $4}'";
let output = run_ssh_command(target, cmd)?;
let trimmed = output.trim();
if trimmed.is_empty() {
return Ok(None);
}
let digits = trimmed.trim_end_matches('G');
Ok(digits.parse::<u64>().ok())
}
fn os_update_rollback_cmd() -> Option<String> {
let raw = std::env::var("DSC_SSH_OS_UPDATE_ROLLBACK_CMD").unwrap_or_default();
let trimmed = raw.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}
fn build_changelog_payload(metadata: &UpdateMetadata) -> String {
let before_version = metadata.before_version.as_deref().unwrap_or("unknown");
let after_version = metadata.after_version.as_deref().unwrap_or("unknown");
let reclaimed = metadata.reclaimed_space.as_deref().unwrap_or("unknown");
let os_version = metadata.before_os_version.as_deref().unwrap_or("unknown");
let root_disk = metadata.root_disk_usage.as_deref().unwrap_or("unknown");
let before_commit = format_commit_link(metadata.before_commit.as_deref());
let after_commit = format_commit_link(metadata.after_commit.as_deref());
let mut body = Vec::new();
if metadata.os_updated {
body.push(format!("- [x] OS updated {}", os_version));
} else {
body.push(format!("- [ ] OS updated {}", os_version));
}
if metadata.server_rebooted {
body.push("- [x] Server rebooted".to_string());
}
body.push("- [x] Updated Discourse:".to_string());
if let Some(commit) = before_commit.as_deref() {
body.push(format!(
" - Initial version: {} {}",
before_version, commit
));
} else {
body.push(format!(" - Initial version: {}", before_version));
}
let after_error = metadata
.after_version_error
.as_deref()
.map(|err| format!(" (fetch failed: {})", err))
.unwrap_or_default();
if let Some(commit) = after_commit.as_deref() {
body.push(format!(
" - Updated version: {}{} {}",
after_version, after_error, commit
));
} else {
body.push(format!(
" - Updated version: {}{}",
after_version, after_error
));
}
if reclaimed == "unknown" {
body.push("- [x] `./launcher cleanup` performed".to_string());
} else {
body.push(format!(
"- [x] `./launcher cleanup` Total reclaimed space: {}",
reclaimed
));
}
body.push(format!("- [x] Root disk usage (df -h /): {}", root_disk));
let test_marker = std::env::var("DSC_TEST_MARKER").ok();
if let Some(marker) = &test_marker {
body.push(format!("- Run-ID: {}", marker));
}
body.join("\n")
}
fn fetch_version_info_with_retry(client: &DiscourseClient, attempts: usize) -> Result<VersionInfo> {
let mut last_err = None;
let total = attempts.max(1);
for attempt in 0..total {
match client.fetch_version_info() {
Ok(info) => return Ok(info),
Err(err) => {
let message = err.to_string();
last_err = Some(err);
if attempt + 1 < total {
if message.contains("502") {
std::thread::sleep(std::time::Duration::from_secs(10));
} else {
std::thread::sleep(std::time::Duration::from_secs(
2 * (attempt + 1) as u64,
));
}
}
}
}
}
Err(last_err.unwrap_or_else(|| anyhow!("fetch version failed")))
}
fn fetch_latest_discourse_commit() -> Option<String> {
let client = Client::builder()
.timeout(Duration::from_secs(10))
.build()
.ok()?;
let resp = client
.get("https://api.github.com/repos/discourse/discourse/commits/stable")
.header("Accept", "application/vnd.github.sha")
.header("User-Agent", "dsc-cli")
.send()
.ok()?;
if !resp.status().is_success() {
return None;
}
let sha = resp.text().ok()?.trim().to_string();
if sha.len() >= 7 && sha.chars().all(|c| c.is_ascii_hexdigit()) {
Some(sha)
} else {
None
}
}
fn is_discourse_up_to_date(running_commit: Option<&str>) -> bool {
let Some(running) = running_commit else {
return false;
};
let running = running.trim();
if running.is_empty() {
return false;
}
let Some(latest) = fetch_latest_discourse_commit() else {
return false;
};
let cmp_len = running.len().min(latest.len());
running[..cmp_len].eq_ignore_ascii_case(&latest[..cmp_len])
}
fn format_commit_link(commit: Option<&str>) -> Option<String> {
let Some(commit) = commit else {
return None;
};
let trimmed = commit.trim();
if trimmed.is_empty() {
return None;
}
let short = trimmed.chars().take(10).collect::<String>();
Some(format!(
"[{}](https://github.com/discourse/discourse/commits/{})",
short, trimmed
))
}
fn strip_ansi_codes(input: &str) -> String {
let mut out = String::with_capacity(input.len());
let mut chars = input.chars().peekable();
while let Some(ch) = chars.next() {
if ch == '\u{1b}' {
if matches!(chars.peek(), Some('[')) {
chars.next();
while let Some(next) = chars.next() {
if next.is_ascii_alphabetic() {
break;
}
}
continue;
}
}
out.push(ch);
}
out
}
fn post_changelog_update(discourse: &DiscourseConfig, payload: &str) -> Result<u64> {
let topic_id = discourse.changelog_topic_id.ok_or_else(|| {
missing_config(
"changelog_topic_id",
&format!("discourse {}", discourse.name),
"changelog_topic_id",
)
})?;
let client = DiscourseClient::new(discourse)?;
let post_id = client.create_post(topic_id, payload)?;
if std::env::var("DSC_TEST_MARKER").is_ok() {
println!("DSC_TEST_POST_ID={}", post_id);
}
Ok(post_id)
}
fn confirm_changelog_post(yes: bool) -> Result<bool> {
if yes {
println!("Post this to changelog? [y/N]: y (--yes)");
return Ok(true);
}
if std::env::var("DSC_TEST_MARKER").is_ok() {
println!("Post this to changelog? [y/N]: y (auto)");
return Ok(true);
}
print!("Post this to changelog? [y/N]: ");
io::stdout().flush()?;
let mut input = String::new();
io::stdin().read_line(&mut input)?;
Ok(matches!(input.trim(), "y" | "Y" | "yes" | "YES"))
}
fn handle_changelog_post(discourse: &DiscourseConfig, payload: &str, yes: bool) -> Result<()> {
let topic_id = discourse.changelog_topic_id;
if topic_id.is_none() {
println!(
"Changelog post skipped: missing changelog_topic_id for {}",
discourse.name
);
return Ok(());
}
if let Err(err) = ensure_api_credentials(discourse) {
println!("Changelog post skipped: {}", err);
return Ok(());
}
if !confirm_changelog_post(yes)? {
println!("Changelog post skipped.");
return Ok(());
}
match post_changelog_update(discourse, payload) {
Ok(post_id) => {
let base = discourse.baseurl.trim_end_matches('/');
println!("Changelog post created: {}/p/{}", base, post_id);
Ok(())
}
Err(err) => {
println!("Changelog post failed: {}", err);
Err(err)
}
}
}