use crate::cli::RunCommand;
use crate::polling::providers::{InitializationError, Provider};
use crate::shared::{CollectionEvent, CollectionMethod, CollectionTarget};
use crate::shell::Shell;
use crate::util::{self, CgroupManager, CgroupPath, ItemPool};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;
use failure::Error;
use futures_01::future::Future;
use shiplift::builder::ContainerListOptions;
use shiplift::rep::Container;
use tokio_01::runtime::current_thread::Runtime;
const PROVIDER_TYPE: &str = "docker";
pub struct Docker {
container_id_pool: ItemPool<String>,
cgroup_manager: CgroupManager,
client: shiplift::Docker,
shell: Option<Arc<Shell>>,
runtime: RefCell<Runtime>,
}
#[derive(Debug)]
enum DockerInitError {
ConnectionFailed(shiplift::Error),
InvalidCgroupMount,
}
impl Into<InitializationError> for DockerInitError {
fn into(self) -> InitializationError {
match self {
Self::ConnectionFailed(error) => InitializationError {
original: Some(error.into()),
suggestion: String::from(
"Could not connect to the docker socket. Are you running rAdvisor as \
root?\nIf running at a non-standard URL, set DOCKER_HOST to the correct URL.",
),
},
Self::InvalidCgroupMount => InitializationError {
original: None,
suggestion: String::from(util::INVALID_CGROUP_MOUNT_MESSAGE),
},
}
}
}
#[derive(Debug)]
enum StartCollectionError {
MetadataSerializationError(Error),
CgroupNotFound,
}
impl Provider for Docker {
fn initialize(
&mut self,
_opts: &RunCommand,
shell: Arc<Shell>,
) -> Result<(), InitializationError> {
self.shell = Some(Arc::clone(&shell));
self.shell().status("Initializing", "Docker API provider");
match self.try_init() {
Ok(_) => Ok(()),
Err(init_err) => Err(init_err.into()),
}
}
fn poll(&mut self) -> Result<Vec<CollectionEvent>, Error> {
let future = self
.client
.containers()
.list(&ContainerListOptions::default());
let containers = self.exec(future)?;
let original_num = containers.len();
let to_collect: BTreeMap<String, Container> = containers
.into_iter()
.filter_map(|c| {
if should_collect_stats(&c) {
Some((c.id.clone(), c))
} else {
None
}
})
.collect::<BTreeMap<_, _>>();
let ids = to_collect.keys().map(String::clone);
let mut events: Vec<CollectionEvent> = Vec::new();
let (added, removed) = self.container_id_pool.update(ids);
let removed_len = removed.len();
events.reserve_exact(added.len() + removed_len);
events.extend(removed.into_iter().map(CollectionEvent::Stop));
let start_events = added
.into_iter()
.flat_map(|id| {
let container = match to_collect.get(&id) {
Some(container) => container,
None => {
self.shell().error(format!(
"Processed Id from ItemPool added result that was not in fetched \
container list. This is a bug!\nId: {}",
id
));
return None;
},
};
match self.make_start_event(container) {
Ok(start) => Some(start),
Err(error) => {
let container_display = display(container);
match error {
StartCollectionError::CgroupNotFound => {
self.shell().warn(format!(
"Could not create container metadata for container {}: cgroup \
path could not be constructed or does not exist",
container_display
));
},
StartCollectionError::MetadataSerializationError(cause) => {
self.shell().warn(format!(
"Could not serialize container metadata: {}",
cause
));
},
}
None
},
}
})
.collect::<Vec<_>>();
let processed_num = start_events.len();
events.extend(start_events);
if processed_num != 0 || removed_len != 0 {
self.shell().verbose(|sh| {
sh.info(format!(
"Received {} -> {} (+{}, -{}) containers from the Docker API",
original_num,
to_collect.len(),
processed_num,
removed_len
))
});
}
Ok(events)
}
}
impl Default for Docker {
fn default() -> Self { Self::new() }
}
impl Docker {
#[must_use]
pub fn new() -> Self {
let runtime = Runtime::new().unwrap();
Self {
container_id_pool: ItemPool::new(),
cgroup_manager: CgroupManager::new(),
client: shiplift::Docker::new(),
shell: None,
runtime: RefCell::new(runtime),
}
}
fn exec<I, E>(&self, future: impl Future<Item = I, Error = E>) -> Result<I, E> {
let mut rt = self.runtime.borrow_mut();
rt.block_on(future)
}
fn try_init(&mut self) -> Result<(), DockerInitError> {
let future = self.client.ping();
self.exec(future)
.map_err(DockerInitError::ConnectionFailed)?;
if !util::cgroups_mounted_properly() {
return Err(DockerInitError::InvalidCgroupMount);
}
Ok(())
}
fn make_start_event(
&mut self,
container: &Container,
) -> Result<CollectionEvent, StartCollectionError> {
let method = self.get_collection_method(container)?;
let metadata = match serde_yaml::to_value(container) {
Ok(metadata) => metadata,
Err(err) => {
return Err(StartCollectionError::MetadataSerializationError(
Error::from(err),
));
},
};
Ok(CollectionEvent::Start {
method,
target: CollectionTarget {
provider: PROVIDER_TYPE,
metadata: Some(metadata),
name: container.names.get(0).unwrap_or(&container.id).clone(),
poll_time: util::nano_ts(),
id: container.id.clone(),
},
})
}
fn get_collection_method(
&mut self,
container: &Container,
) -> Result<CollectionMethod, StartCollectionError> {
match self.get_cgroup(container) {
Some(cgroup) => Ok(CollectionMethod::LinuxCgroups(cgroup)),
None => Err(StartCollectionError::CgroupNotFound),
}
}
fn get_cgroup(&mut self, c: &Container) -> Option<CgroupPath> {
let had_driver = self.cgroup_manager.driver().is_some();
let cgroup_option: Option<CgroupPath> = self.cgroup_manager.get_cgroup_divided(
&["system.slice", &format!("docker-{}.scope", &c.id)],
&["docker", &c.id],
false,
);
if !had_driver {
if let Some(driver) = self.cgroup_manager.driver() {
self.shell()
.info(format!("Identified {} as cgroup driver", driver));
}
}
cgroup_option
}
fn shell(&self) -> &Shell {
self.shell
.as_ref()
.expect("Shell must be initialized: invariant violated")
}
}
#[allow(clippy::missing_const_for_fn)]
fn should_collect_stats(_c: &Container) -> bool { true }
fn display(container: &Container) -> &str { container.names.get(0).unwrap_or(&container.id) }