use std::collections::HashMap;
use std::env;
use bollard::{
models::ContainerCreateBody,
query_parameters::{CreateContainerOptions, CreateImageOptions, StartContainerOptions},
service::{HostConfig, PortBinding},
};
use futures_util::TryStreamExt;
use crate::{
commands::{
container::shared::{Error as ConnectionError, Network},
global,
},
print,
};
use super::shared::{Args, Name};
/// Default value of `Cmd::ports_mapping`. Entries are split on `:`; the first part is used as
/// the host port and the second as the container port (see `Runner::get_port_mapping`).
const DEFAULT_PORT_MAPPING: &str = "8000:8000";
/// Image repository without a tag; `Runner::get_image_name` appends `:<tag>` to it.
const DOCKER_IMAGE: &str = "docker.io/stellar/quickstart";
/// Errors returned by the container start command.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Connecting to docker failed; wraps the shared container connection error.
#[error("⛔ ️Failed to connect to docker: {0}")]
DockerConnectionFailed(#[from] ConnectionError),
/// Any bollard error converted via `?`/`From` in this module. Besides container creation
/// this also covers a failing `start_container` call, despite the variant name.
#[error("⛔ ️Failed to create container: {0}")]
CreateContainerFailed(#[from] bollard::errors::Error),
/// Container creation was rejected with HTTP status 409; carries the internal container
/// name that was requested.
#[error("⛔ ️ a container named {0:?} already running")]
ContainerAlreadyRunning(String),
}
// Command-line arguments for starting a quickstart container.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns `///` doc
// comments into user-visible help text, so adding them here would change CLI output.
#[derive(Debug, clap::Parser, Clone)]
pub struct Cmd {
// Shared container arguments; used to connect to docker and to build the extra flags
// echoed back in the follow-up instructions.
#[command(flatten)]
pub container_args: Args,
// Network to start. Has no `#[arg]` attribute; `run` falls back to `Network::Local` when
// it is absent.
pub network: Option<Network>,
// Optional container name; when absent the network's string form is used instead.
#[arg(long)]
pub name: Option<String>,
// Forwarded to the container as `--limits <value>`, but only for `Network::Local`.
#[arg(short = 'l', long)]
pub limits: Option<String>,
// Port mappings, each split on `:` into host port (first part) and container port
// (second part). Defaults to `DEFAULT_PORT_MAPPING`.
#[arg(short = 'p', long, num_args = 1.., default_value = DEFAULT_PORT_MAPPING)]
pub ports_mapping: Vec<String>,
// Replaces the image tag that would otherwise be derived from the network.
#[arg(short = 't', long)]
pub image_tag_override: Option<String>,
// Forwarded to the container as `--protocol-version <value>`, but only for
// `Network::Local`.
#[arg(long)]
pub protocol_version: Option<String>,
}
impl Cmd {
    /// Starts the container described by these arguments.
    ///
    /// The network falls back to [`Network::Local`] when none was given, and the printer is
    /// built from `global_args.quiet`. All docker work is delegated to the private `Runner`.
    ///
    /// # Errors
    ///
    /// Propagates any [`Error`] reported by the runner's docker workflow.
    pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
        let args = self.clone();
        let network = self.network.unwrap_or(Network::Local);
        let print = print::Print::new(global_args.quiet);

        let runner = Runner {
            args,
            network,
            print,
        };
        runner.run_docker_command().await
    }
}
/// Internal state for one invocation of the start command, built by `Cmd::run`.
struct Runner {
/// Clone of the parsed command-line arguments.
args: Cmd,
/// Resolved network: `Cmd::network`, or `Network::Local` when it was not given.
network: Network,
/// Printer constructed from the global `quiet` setting.
print: print::Print,
}
impl Runner {
async fn run_docker_command(&self) -> Result<(), Error> {
self.print
.infoln(format!("Starting {} network", &self.network));
let docker = self
.args
.container_args
.connect_to_docker(&self.print)
.await?;
let image = self.get_image_name();
let mut stream = docker.create_image(
Some(CreateImageOptions {
from_image: Some(image.clone()),
..Default::default()
}),
None,
None,
);
while let Some(result) = stream.try_next().await.transpose() {
if let Ok(item) = result {
if let Some(status) = item.status {
if status.contains("Pulling from")
|| status.contains("Digest")
|| status.contains("Status")
{
self.print.infoln(status);
}
}
} else {
self.print
.warnln(format!("Failed to fetch image: {image}."));
self.print.warnln(
"Attempting to start local quickstart image. The image may be out-of-date.",
);
break;
}
}
let config = ContainerCreateBody {
image: Some(image),
cmd: Some(self.get_container_args()),
attach_stdout: Some(true),
attach_stderr: Some(true),
host_config: Some(HostConfig {
auto_remove: Some(true),
port_bindings: Some(self.get_port_mapping()),
..Default::default()
}),
..Default::default()
};
let create_container_response = docker
.create_container(
Some(CreateContainerOptions {
name: Some(self.container_name().get_internal_container_name()),
..Default::default()
}),
config,
)
.await
.map_err(|e| match &e {
bollard::errors::Error::DockerResponseServerError { status_code, .. } => {
if *status_code == 409 {
return Error::ContainerAlreadyRunning(
self.container_name().get_internal_container_name(),
);
}
Error::CreateContainerFailed(e)
}
_ => Error::CreateContainerFailed(e),
})?;
docker
.start_container(&create_container_response.id, None::<StartContainerOptions>)
.await?;
self.print.checkln("Started container");
self.print_instructions();
Ok(())
}
fn get_image_name(&self) -> String {
let mut image_tag = match &self.network {
Network::Pubnet => "latest",
Network::Futurenet => "future",
_ => "testing", };
if let Some(image_override) = &self.args.image_tag_override {
self.print.infoln(format!(
"Overriding docker image tag to use '{image_override}' instead of '{image_tag}'"
));
image_tag = image_override;
}
format!("{DOCKER_IMAGE}:{image_tag}")
}
fn get_container_args(&self) -> Vec<String> {
let args = env::var("STELLAR_CONTAINER_ARGS").unwrap_or("rpc,horizon,lab".to_string());
[
format!("--{}", self.network),
format!("--enable {args}"),
self.get_protocol_version_arg(),
self.get_limits_arg(),
]
.iter()
.filter(|&s| !s.is_empty())
.cloned()
.collect()
}
fn get_port_mapping(&self) -> HashMap<String, Option<Vec<PortBinding>>> {
let mut port_mapping_hash = HashMap::new();
for port_mapping in &self.args.ports_mapping {
let ports_vec: Vec<&str> = port_mapping.split(':').collect();
let from_port = ports_vec[0];
let to_port = ports_vec[1];
port_mapping_hash.insert(
format!("{to_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(from_port.to_string()),
}]),
);
}
port_mapping_hash
}
fn container_name(&self) -> Name {
Name(self.args.name.clone().unwrap_or(self.network.to_string()))
}
fn print_instructions(&self) {
let container_name = self.container_name().get_external_container_name().clone();
let additional_flags = self.args.container_args.get_additional_flags().clone();
let tail = format!("{container_name} {additional_flags}");
self.print.searchln(format!(
"Watch logs with `stellar container logs {}`",
tail.trim()
));
self.print.infoln(format!(
"Stop the container with `stellar container stop {}`",
tail.trim()
));
}
fn get_protocol_version_arg(&self) -> String {
if self.network == Network::Local {
if let Some(version) = self.args.protocol_version.as_ref() {
return format!("--protocol-version {version}");
}
}
String::new()
}
fn get_limits_arg(&self) -> String {
if self.network == Network::Local {
if let Some(limits) = self.args.limits.as_ref() {
return format!("--limits {limits}");
}
}
String::new()
}
}