use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use anyhow::{bail, Context, Result};
use colored::Colorize;
use indicatif::{ProgressBar, ProgressStyle};
/// Runs `docker compose` commands for the project that lives in a given directory.
///
/// All commands issued by this type use `dir` as their working directory.
#[derive(Debug, Clone)]
pub struct ComposeRunner {
    /// Working directory handed to every spawned `docker compose` process.
    dir: PathBuf,
}
impl ComposeRunner {
pub fn new(dir: PathBuf) -> Self {
Self { dir }
}
/// Pulls the project's images by running `docker compose pull`.
///
/// Prints a heading before the pull and a confirmation line once it succeeded.
///
/// # Errors
/// Returns whatever error `run_compose` reports for the `pull` command.
pub async fn pull(&self) -> Result<()> {
    let heading = "Pulling Docker images (this may take a while)...".bold();
    println!();
    println!("{}", heading);

    self.run_compose(&["pull"])?;

    let tick = "✓".green();
    println!(" {} Images pulled", tick);
    Ok(())
}
/// Starts the services without selecting a compose profile.
///
/// Shorthand for `up_with_profile(None)`.
pub async fn up(&self) -> Result<()> {
    let no_profile: Option<&str> = None;
    self.up_with_profile(no_profile).await
}
/// Starts the services with `docker compose up -d --wait`, optionally under `profile`.
///
/// On success a confirmation line is printed to stdout. On failure the output of
/// `collect_logs` is written to stderr between two separator lines, followed by a
/// tip showing how to follow the logs manually, and the original error is returned.
///
/// # Errors
/// Returns the error produced by `run_compose_with_profile` for the `up` command.
pub async fn up_with_profile(&self, profile: Option<&str>) -> Result<()> {
    let heading = match profile {
        Some(name) => format!("Starting services in profile `{name}`...").bold(),
        None => "Starting services...".bold(),
    };
    println!();
    println!("{}", heading);

    // Happy path leaves early; everything below is the failure report.
    let failure = match self.run_compose_with_profile(&["up", "-d", "--wait"], profile) {
        Ok(()) => {
            println!(" {} Services started", "✓".green());
            return Ok(());
        }
        Err(err) => err,
    };

    let opening_rule = "── Container logs ──────────────────────────────".dimmed();
    eprintln!();
    eprintln!("{}", opening_rule);

    let logs = self.collect_logs();
    match logs.trim().is_empty() {
        true => eprintln!("{}", " (no container logs captured)".dimmed()),
        false => logs.lines().for_each(|entry| eprintln!(" {entry}")),
    }

    let closing_rule = "────────────────────────────────────────────────".dimmed();
    eprintln!("{}", closing_rule);
    eprintln!();

    let compose_file = self.dir.join("docker-compose.yml");
    eprintln!("{} To inspect logs at any time run:", "tip:".cyan().bold());
    eprintln!(
        " docker compose -f {} logs --follow",
        compose_file.display()
    );
    eprintln!();

    Err(failure)
}
pub async fn pull_ollama_model(&self, model: &str) -> Result<()> {
println!();
println!("{} {}", "Pulling Ollama model:".bold(), model.bold().cyan());
let status = Command::new("docker")
.arg("compose")
.args(["exec", "-T", "ollama", "ollama", "pull", model])
.current_dir(&self.dir)
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.context("Failed to run `docker compose exec ollama ollama pull`")?;
if !status.success() {
bail!(
"Failed to pull Ollama model '{}' (exit {})",
model,
status.code().unwrap_or(-1)
);
}
let deadline = Instant::now() + Duration::from_secs(300);
loop {
let show_status = Command::new("docker")
.arg("compose")
.args(["exec", "-T", "ollama", "ollama", "show", model])
.current_dir(&self.dir)
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.context("Failed to verify pulled Ollama model")?;
if show_status.success() {
break;
}
if Instant::now() >= deadline {
bail!("Timed out waiting for Ollama model '{model}' to become available");
}
thread::sleep(Duration::from_secs(2));
}
println!(" {} Ollama model ready: {}", "✓".green(), model);
Ok(())
}
/// Restarts the services by running `docker compose down` followed by
/// `docker compose up -d --wait`, both under `profile` when one is given.
///
/// # Errors
/// Returns the first error reported by `run_compose_with_profile`; `up` is not
/// attempted when `down` fails.
pub async fn restart(&self, profile: Option<&str>) -> Result<()> {
    let heading = match profile {
        Some(name) => format!("Restarting services in profile `{name}`...").bold(),
        None => "Restarting all services...".bold(),
    };
    println!();
    println!("{}", heading);

    // The same two commands run in both cases; only the profile flag differs.
    self.run_compose_with_profile(&["down"], profile)?;
    self.run_compose_with_profile(&["up", "-d", "--wait"], profile)?;

    println!(" {} Services restarted", "✓".green());
    Ok(())
}
/// Captures the output of `docker compose logs --tail=60 --no-color`.
///
/// Returns the command's stdout followed by its stderr, both decoded lossily as
/// UTF-8. The exit status is not inspected. If the command cannot be run at all,
/// an empty string is returned.
fn collect_logs(&self) -> String {
    let mut logs_cmd = Command::new("docker");
    logs_cmd
        .arg("compose")
        .args(["logs", "--tail=60", "--no-color"])
        .current_dir(&self.dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    logs_cmd
        .output()
        .map(|captured| {
            let mut combined = String::from_utf8_lossy(&captured.stdout).into_owned();
            combined.push_str(&String::from_utf8_lossy(&captured.stderr));
            combined
        })
        .unwrap_or_default()
}
/// Runs `docker compose <args...>` with no profile selected.
///
/// Thin wrapper around `run_compose_with_profile` that passes `None` as the profile.
fn run_compose(&self, args: &[&str]) -> Result<()> {
    let no_profile: Option<&str> = None;
    self.run_compose_with_profile(args, no_profile)
}
fn run_compose_with_profile(&self, args: &[&str], profile: Option<&str>) -> Result<()> {
let spinner = ProgressBar::new_spinner();
spinner.set_style(
ProgressStyle::with_template(" {spinner:.cyan} {msg}")
.unwrap()
.tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"),
);
spinner.enable_steady_tick(std::time::Duration::from_millis(80));
let mut cmd = Command::new("docker");
cmd.arg("compose");
if let Some(profile) = profile {
cmd.arg("--profile");
cmd.arg(profile);
}
cmd.args(args);
cmd.current_dir(&self.dir);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd
.spawn()
.context("Failed to spawn `docker compose` — is Docker installed?")?;
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let last_stderr: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
let spinner_stdout = spinner.clone();
let stdout_thread = thread::spawn(move || {
for line in BufReader::new(stdout).lines().map_while(Result::ok) {
let trimmed = line.trim().to_string();
if !trimmed.is_empty() {
spinner_stdout.set_message(trimmed);
}
}
});
let spinner_stderr = spinner.clone();
let last_stderr_clone = Arc::clone(&last_stderr);
let stderr_thread = thread::spawn(move || {
for line in BufReader::new(stderr).lines().map_while(Result::ok) {
let trimmed = line.trim().to_string();
if !trimmed.is_empty() {
spinner_stderr.set_message(trimmed.clone());
*last_stderr_clone.lock().unwrap() = trimmed;
}
}
});
stdout_thread.join().ok();
stderr_thread.join().ok();
let status = child
.wait()
.context("Failed to wait for `docker compose`")?;
spinner.finish_and_clear();
if !status.success() {
let msg = last_stderr.lock().unwrap().clone();
let rendered_args = match profile {
Some(profile) => format!("--profile {profile} {}", args.join(" ")),
None => args.join(" "),
};
bail!(
"`docker compose {}` failed (exit {}): {}",
rendered_args,
status.code().unwrap_or(-1),
msg
);
}
Ok(())
}
}