use std::borrow::Cow;
use futures::Future;
use representation::rep::ContainerDetails;
use Error;
use Result;
use futures::Stream;
use build::LogsOptions;
use util::build_simple_query;
use transport::parse::parse_to_lines;
use transport::parse::parse_to_trait;
use transport::parse::parse_to_stream;
use transport::parse::status_code;
use representation::rep::Change;
use representation::rep::Stats;
use representation::rep::Top;
use http::StatusCode;
use hyper::Body;
use std::time::Duration;
use representation::rep::Exit;
use build::RmContainerOptions;
use build::ExecContainerOptions;
use serde_json::Value;
use errors::ErrorKind as EK;
use transport::tty;
use futures::future;
use hyper::Chunk;
use std::sync::Arc;
use build::ContainerArchivePutOptions;
use tarball::tarball;
use transport::interact::InteractApi;
use transport::interact::InteractApiExt;
use communicate::util::AsSlice;
/// Handle to a single container, addressed by its id.
///
/// Every method builds a request path from `id` and sends it through the
/// shared `interact` transport.
pub struct Container {
    /// Shared transport used to issue the requests made through this handle.
    interact: Arc<InteractApi>,
    /// Identifier interpolated into the `/containers/{id}/...` request paths.
    id: Cow<'static, str>,
}
impl Clone for Container
{
fn clone(&self) -> Container {
Container::new(self.interact.clone(), self.id.clone())
}
}
impl Container
{
/// Creates a handle for the container identified by `id`.
///
/// * `interact` - shared transport through which all requests are sent.
/// * `id` - container identifier used when building request paths.
pub(crate) fn new(interact: Arc<InteractApi>, id: Cow<'static, str>) -> Container {
    // `id` already has the exact type of the field, so it is stored as-is;
    // no `.into()` conversion is needed.
    Container { interact, id }
}
/// Returns the identifier this handle was created with.
pub fn id(&self) -> &str {
    &*self.id
}
pub fn inspect(&self) -> impl Future<Item=ContainerDetails, Error=Error> + Send {
let path = format!("/containers/{}/json", self.id);
let args = path.as_str();
parse_to_trait::<ContainerDetails>(self.interact.get(args))
}
pub fn top(&self, psargs: Option<&str>) -> impl Future<Item=Top, Error=Error> + Send {
let path = format!("/containers/{}/top", self.id);
let query = build_simple_query("ps_args", psargs);
let args = (path.as_ref(), query.as_slice());
parse_to_trait::<Top>(self.interact.get(args))
}
pub fn logs(&self, opts: &LogsOptions) -> impl Stream<Item=String, Error=Error> + Send {
let path = format!("/containers/{}/logs", self.id);
let query = opts.serialize();
let args = (path.as_str(), query.as_slice());
parse_to_lines(self.interact.get(args))
}
pub fn changes(&self) -> impl Future<Item=Vec<Change>, Error=Error> + Send {
let args = format!("/containers/{}/changes", self.id);
parse_to_trait::<Vec<Change>>(self.interact.get(args.as_str()))
}
pub fn export(&self) -> impl Stream<Item=Chunk, Error=Error> + Send {
let path = format!("/containers/{}/export", self.id);
self.interact.get(path.as_str())
.and_then(|a| a.map_err(Error::from))
.and_then(|a| Ok(a.into_body().map_err(Error::from)))
.flatten_stream()
}
pub fn stats(&self) -> impl Stream<Item=Result<Stats>, Error=Error> + Send {
let args = format!("/containers/{}/stats", self.id);
parse_to_stream::<Stats>(self.interact.get(args.as_str()))
}
pub fn start(&self) -> impl Future<Item=StatusCode, Error=Error> + Send {
let args = format!("/containers/{}/start", self.id);
status_code(self.interact.post(args.as_str()))
}
pub fn stop(&self, wait: Option<Duration>) -> impl Future<Item=StatusCode, Error=Error> + Send {
let path = format!("/containers/{}/stop", self.id);
let query =
build_simple_query("t", wait.map(|w| w.as_secs().to_string()));
let args = (path.as_str(), query.as_slice());
status_code(self.interact.post(args))
}
pub fn restart(&self, wait: Option<Duration>) -> impl Future<Item=StatusCode, Error=Error> + Send {
let path = format!("/containers/{}/restart", self.id);
let query =
build_simple_query("t", wait.map(|w| w.as_secs().to_string()));
let args = (path.as_str(), query.as_slice());
status_code(self.interact.post(args))
}
pub fn kill(&self, signal: Option<&str>) -> impl Future<Item=StatusCode, Error=Error> + Send {
let path = format!("/containers/{}/kill", self.id);
let query = build_simple_query("signal", signal.map(|sig| sig));
let args = (path.as_str(), query.as_slice());
status_code(self.interact.post(args))
}
pub fn rename(&self, name: &str) -> impl Future<Item=StatusCode, Error=Error> + Send {
let path = format!("/containers/{}/rename", self.id);
let query = build_simple_query("name", Some(name));
let args = (path.as_str(), query.as_slice());
status_code(self.interact.post(args))
}
pub fn pause(&self) -> impl Future<Item=StatusCode, Error=Error> + Send {
let args = format!("/containers/{}/pause", self.id);
status_code(self.interact.post(args.as_str()))
}
pub fn unpause(&self) -> impl Future<Item=StatusCode, Error=Error> + Send {
let args = format!("/containers/{}/unpause", self.id);
status_code(self.interact.post(args.as_str()))
}
pub fn wait(&self) -> impl Future<Item=Exit, Error=Error> + Send {
let args = format!("/containers/{}/wait", self.id);
parse_to_trait::<Exit>(self.interact.post(args.as_str()))
}
pub fn delete(&self) -> impl Future<Item=StatusCode, Error=Error> + Send {
let args = format!("/containers/{}", self.id);
status_code(self.interact.delete(args.as_str()))
}
pub fn remove(&self, opts: &RmContainerOptions) -> impl Future<Item=StatusCode, Error=Error> + Send {
let path = format!("/containers/{}", self.id);
let query = opts.serialize();
let args = (path.as_str(), query.as_slice());
status_code(self.interact.delete(args))
}
pub fn create_exec(&self, opts: &ExecContainerOptions)
-> impl Future<Item=String, Error=Error> + Send
{
let path = format!("/containers/{}/exec", self.id);
let body = opts.serialize().map(Body::from);
let args = (path.as_str(), body);
parse_to_trait::<Value>(self.interact.post_json(args))
.and_then(|val| {
debug!("{:?}", val);
match val {
Value::Object(obj) => future::result(obj
.get("Id")
.ok_or(Error::from(EK::JsonFieldMissing("Id")))
.and_then(|val| val.as_str()
.ok_or(Error::from(EK::JsonTypeError("Id", "String"))))
.map(|a| {
debug!("String: {}", a.to_string());
a.to_string()
})),
_ => future::err(Error::from(EK::JsonTypeError("<anonymous>", "Object")))
}
})
}
pub fn start_exec(&self, id: String)
-> impl Stream<Item=(u32, Chunk), Error=Error>
{
let path = format!("/exec/{}/start", id);
let body = Some(Body::from("{}".to_string()));
let args = (path.as_str(), body);
let body_future = self.interact.post_json(args)
.and_then(|response| {
response.map_err(Error::from)
})
.and_then(|result| {
Ok(result.into_body().map_err(Error::from))
})
.flatten_stream();
tty::decode(body_future)
}
pub fn exec(&self, opts: &ExecContainerOptions)
-> impl Stream<Item=(u32, Chunk), Error=Error>
{
let copy_self = self.clone();
self.create_exec(opts)
.and_then(move |id| Ok(copy_self.start_exec(id)))
.flatten_stream()
}
pub fn archive_get(&self, pth: &str) -> impl Stream<Item=Chunk, Error=Error>
{
let path = format!("/containers/{}/archive", self.id);
let query = build_simple_query("path", Some(pth));
let args = (path.as_str(), query.as_slice());
self.interact.get(args)
.and_then(|a| a.map_err(Error::from))
.and_then(|a| Ok(a.into_body().map_err(Error::from)))
.flatten_stream()
}
pub fn archive_put(&self, opts: &ContainerArchivePutOptions)
-> impl Future<Item=StatusCode, Error=Error> + Send
{
let mut bytes = vec![];
let path = format!("/containers/{}/archive", self.id);
let query = opts.serialize();
let interact = self.interact.clone();
future::result(tarball::dir(&mut bytes, &opts.local_path))
.and_then(move |_| {
let body = Some(Body::from(bytes));
let args = (path.as_str(), query.as_slice(), body);
status_code(interact.put(args))
})
}
}