use futures_util::StreamExt;
use crate::compose::types::ComposeFile;
use crate::error::{ComposeError, Result};
use crate::libpod::types::exec::{
ExecCreateConfig, ExecCreateResponse, ExecInspect, ExecStartConfig,
};
use crate::libpod::{urlencoded, LogOutput, API_PREFIX};
use super::Engine;
mod inspect;
#[derive(Default)]
pub struct ExecOptions {
pub env: Vec<String>,
pub user: Option<String>,
pub workdir: Option<String>,
pub privileged: bool,
pub detach: bool,
pub index: Option<u32>,
}
#[derive(Default)]
pub struct PsOptions {
pub all: bool,
pub quiet: bool,
pub json: bool,
}
#[derive(Default)]
pub struct ImagesOptions {
pub quiet: bool,
pub json: bool,
}
#[derive(Default)]
pub struct LogsOptions {
pub follow: bool,
pub tail: Option<String>,
pub since: Option<String>,
pub until: Option<String>,
pub timestamps: bool,
}
fn log_query(opts: &LogsOptions) -> String {
let mut q = format!(
"stdout=true&stderr=true&follow={}×tamps={}",
opts.follow, opts.timestamps
);
if let Some(tail) = &opts.tail {
q.push_str(&format!("&tail={}", urlencoded(tail)));
}
if let Some(since) = &opts.since {
q.push_str(&format!("&since={}", urlencoded(since)));
}
if let Some(until) = &opts.until {
q.push_str(&format!("&until={}", urlencoded(until)));
}
q
}
impl Engine {
pub async fn ps(&self, file: &ComposeFile) -> Result<()> {
self.ps_with_options(file, PsOptions::default()).await
}
pub async fn ps_with_options(&self, _file: &ComposeFile, opts: PsOptions) -> Result<()> {
let label = format!("podup.project={}", self.project);
let filters = serde_json::json!({ "label": [label] });
let path = format!(
"{API_PREFIX}/containers/json?all={}&filters={}",
opts.all,
urlencoded(&filters.to_string()),
);
let containers = self
.client
.get_json::<Vec<crate::libpod::types::container::ContainerListEntry>>(&path)
.await
.map_err(ComposeError::Podman)?;
let name_of = |c: &crate::libpod::types::container::ContainerListEntry| {
c.names.join(", ").trim_start_matches('/').to_string()
};
if opts.quiet {
for c in &containers {
let id = c.id.get(..12).unwrap_or(&c.id);
println!("{id}");
}
return Ok(());
}
if opts.json {
let rows: Vec<_> = containers
.iter()
.map(|c| {
serde_json::json!({
"Name": name_of(c),
"Image": c.image,
"Status": c.status,
"ID": c.id,
})
})
.collect();
println!(
"{}",
serde_json::to_string_pretty(&rows).unwrap_or_default()
);
return Ok(());
}
println!("{:<40} {:<30} {:<20}", "NAME", "IMAGE", "STATUS");
for c in &containers {
let ports = c
.ports
.iter()
.map(|p| {
let proto = p
.protocol
.as_deref()
.map(|proto| format!("/{proto}"))
.unwrap_or_default();
format!(
"{}:{}->{}{proto}",
p.host_ip.as_deref().unwrap_or(""),
p.host_port.unwrap_or(0),
p.container_port,
)
})
.collect::<Vec<_>>()
.join(", ");
println!(
"{:<40} {:<30} {:<20} {ports}",
name_of(c),
c.image,
c.status
);
}
Ok(())
}
pub async fn logs(
&self,
file: &ComposeFile,
service_name: Option<&str>,
follow: bool,
) -> Result<()> {
self.logs_with_options(
file,
service_name,
LogsOptions {
follow,
..Default::default()
},
)
.await
}
pub async fn logs_with_options(
&self,
file: &ComposeFile,
service_name: Option<&str>,
opts: LogsOptions,
) -> Result<()> {
let follow = opts.follow;
let query = log_query(&opts);
let targets: Vec<(String, bool)> = if let Some(svc) = service_name {
let service = file
.services
.get(svc)
.ok_or_else(|| ComposeError::ServiceNotFound(svc.into()))?;
let is_tty = service.tty.unwrap_or(false);
self.replica_names(svc, service)
.into_iter()
.map(|n| (n, is_tty))
.collect()
} else {
file.services
.iter()
.flat_map(|(n, s)| {
let is_tty = s.tty.unwrap_or(false);
self.replica_names(n, s)
.into_iter()
.map(move |cname| (cname, is_tty))
})
.collect()
};
if follow && targets.len() > 1 {
let futs: Vec<_> = targets
.into_iter()
.map(|(container_name, is_tty)| {
let client = &self.client;
let query = query.clone();
async move {
let path = format!(
"{API_PREFIX}/containers/{}/logs?{query}",
urlencoded(&container_name),
);
let resp = match client.get_stream(&path).await {
Ok(r) => r,
Err(e) => {
tracing::warn!("logs {container_name}: {e}");
return;
}
};
let mut stream = if is_tty {
crate::libpod::parse_raw(resp.into_body())
} else {
crate::libpod::parse_multiplexed(resp.into_body())
};
while let Some(msg) = stream.next().await {
match msg {
Ok(LogOutput::StdOut { message }) => {
print!("{}", String::from_utf8_lossy(&message));
}
Ok(LogOutput::StdErr { message }) => {
eprint!("{}", String::from_utf8_lossy(&message));
}
Err(_) => break,
}
}
}
})
.collect();
futures_util::future::join_all(futs).await;
} else {
for (container_name, is_tty) in targets {
let path = format!(
"{API_PREFIX}/containers/{}/logs?{query}",
urlencoded(&container_name),
);
let resp = self
.client
.get_stream(&path)
.await
.map_err(ComposeError::Podman)?;
let mut stream = if is_tty {
crate::libpod::parse_raw(resp.into_body())
} else {
crate::libpod::parse_multiplexed(resp.into_body())
};
while let Some(msg) = stream.next().await {
match msg.map_err(ComposeError::Podman)? {
LogOutput::StdOut { message } => {
print!("{}", String::from_utf8_lossy(&message));
}
LogOutput::StdErr { message } => {
eprint!("{}", String::from_utf8_lossy(&message));
}
}
}
}
}
Ok(())
}
pub async fn exec(
&self,
file: &ComposeFile,
service_name: &str,
cmd: Vec<String>,
) -> Result<()> {
self.exec_with_options(file, service_name, cmd, ExecOptions::default())
.await
}
pub async fn exec_with_options(
&self,
file: &ComposeFile,
service_name: &str,
cmd: Vec<String>,
opts: ExecOptions,
) -> Result<()> {
let service = file
.services
.get(service_name)
.ok_or_else(|| ComposeError::ServiceNotFound(service_name.into()))?;
let container_name = match opts.index {
Some(i) => {
let names = self.replica_names(service_name, service);
let idx = (i as usize).saturating_sub(1);
names.get(idx).cloned().ok_or_else(|| {
ComposeError::ServiceNotFound(format!("{service_name} (replica index {i})"))
})?
}
None => self.first_replica_name(service_name, service),
};
let exec_cfg = ExecCreateConfig {
cmd: Some(cmd),
attach_stdout: Some(true),
attach_stderr: Some(true),
user: opts.user.clone(),
working_dir: opts.workdir.clone(),
privileged: opts.privileged.then_some(true),
env: (!opts.env.is_empty()).then(|| opts.env.clone()),
..Default::default()
};
let create_path = format!(
"{API_PREFIX}/containers/{}/exec",
urlencoded(&container_name),
);
let resp: ExecCreateResponse = self
.client
.post_json(&create_path, &exec_cfg)
.await
.map_err(ComposeError::Podman)?;
let exec_id = resp.id;
if opts.detach {
let start_cfg = ExecStartConfig {
detach: true,
tty: false,
};
let start_path = format!("{API_PREFIX}/exec/{}/start", urlencoded(&exec_id));
let _ = self
.client
.post_json_stream(&start_path, &start_cfg)
.await
.map_err(ComposeError::Podman)?;
return Ok(());
}
let start_cfg = ExecStartConfig {
detach: false,
tty: false,
};
let start_path = format!("{API_PREFIX}/exec/{}/start", urlencoded(&exec_id));
let start_resp = self
.client
.post_json_stream(&start_path, &start_cfg)
.await
.map_err(ComposeError::Podman)?;
let mut stream = crate::libpod::parse_multiplexed(start_resp.into_body());
while let Some(msg) = stream.next().await {
match msg.map_err(ComposeError::Podman)? {
LogOutput::StdOut { message } => {
print!("{}", String::from_utf8_lossy(&message));
}
LogOutput::StdErr { message } => {
eprint!("{}", String::from_utf8_lossy(&message));
}
}
}
let inspect_path = format!("{API_PREFIX}/exec/{}/json", urlencoded(&exec_id));
let inspect: ExecInspect = self
.client
.get_json(&inspect_path)
.await
.map_err(ComposeError::Podman)?;
if let Some(code) = inspect.exit_code {
if code != 0 {
return Err(ComposeError::RunExited(code));
}
}
Ok(())
}
pub async fn remove_orphans(&self, file: &ComposeFile) -> Result<()> {
let label = format!("podup.project={}", self.project);
let filters = serde_json::json!({ "label": [label] });
let path = format!(
"{API_PREFIX}/containers/json?all=true&filters={}",
urlencoded(&filters.to_string()),
);
let running = self
.client
.get_json::<Vec<crate::libpod::types::container::ContainerListEntry>>(&path)
.await
.map_err(ComposeError::Podman)?;
let known: std::collections::HashSet<String> = file
.services
.iter()
.flat_map(|(n, s)| self.replica_names(n, s))
.collect();
for c in running {
for raw in &c.names {
let name = raw.trim_start_matches('/');
if !known.contains(name) {
tracing::info!("removing orphan container {name}");
let rm_path =
format!("{API_PREFIX}/containers/{}?force=true", urlencoded(name));
if let Err(e) = self.client.delete_ok(&rm_path).await {
tracing::debug!("orphan delete {name}: {e}");
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{log_query, LogsOptions};
#[test]
fn log_query_defaults_to_stdout_stderr_no_follow() {
let q = log_query(&LogsOptions::default());
assert_eq!(q, "stdout=true&stderr=true&follow=false×tamps=false");
}
#[test]
fn log_query_includes_set_options() {
let q = log_query(&LogsOptions {
follow: true,
tail: Some("20".into()),
since: Some("10m".into()),
until: Some("2024-01-01T00:00:00".into()),
timestamps: true,
});
assert!(q.contains("follow=true"));
assert!(q.contains("timestamps=true"));
assert!(q.contains("&tail=20"));
assert!(q.contains("&since=10m"));
assert!(q.contains("&until=2024-01-01T00%3A00%3A00"));
}
}