use ansi_term::Colour::{Green, Red, Yellow};
use chrono::DateTime;
use chrono::offset::Utc;
use hyper::{Client, Url};
use hyper::client::{Body, RequestBuilder};
use hyper::client::request::Request;
use hyper::client::response::Response;
use hyper::header::{Authorization, Basic, Bearer};
use hyper::method::Method;
use hyper::status::StatusCode;
use hyper_sync_rustls::TlsClient;
use serde::Deserialize;
use serde_json;
use serde_json::{Map, Value};
use rustls::{self, Certificate, PrivateKey};
use std::fmt;
use std::io::BufReader;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use config::AuthProvider;
use connector::ClickSslConnector;
use error::{KubeErrNo, KubeError};
#[derive(Debug, Deserialize)]
pub struct OwnerReference {
pub controller: bool,
pub kind: String,
pub name: String,
pub uid: String,
}
/// Object metadata shared by the resource types declared in this file.
///
/// Only `name` is required; every `Option` field deserializes to `None` when
/// its JSON key is absent.
#[derive(Debug, Deserialize)]
pub struct Metadata {
/// Object name.
pub name: String,
/// Namespace of the object, if the JSON carries one.
pub namespace: Option<String>,
/// Read from the JSON key `creationTimestamp`.
#[serde(rename = "creationTimestamp")] pub creation_timestamp: Option<DateTime<Utc>>,
/// Read from the JSON key `deletionTimestamp`.
#[serde(rename = "deletionTimestamp")] pub deletion_timestamp: Option<DateTime<Utc>>,
/// Labels, kept as an untyped JSON map.
pub labels: Option<Map<String, Value>>,
/// Annotations, kept as an untyped JSON map.
pub annotations: Option<Map<String, Value>>,
/// Read from the JSON key `ownerReferences`.
#[serde(rename = "ownerReferences")] pub owner_refs: Option<Vec<OwnerReference>>,
}
/// State of a single container, deserialized from an object with exactly one
/// of the keys `running`, `terminated` or `waiting`.
#[derive(Debug, Deserialize)]
pub enum ContainerState {
    /// The container is running.
    #[serde(rename = "running")]
    Running {
        /// Read from the JSON key `startedAt`.
        #[serde(rename = "startedAt")]
        started_at: Option<DateTime<Utc>>,
    },
    /// The container has terminated.
    #[serde(rename = "terminated")]
    Terminated {
        /// Read from the JSON key `containerID`. The Kubernetes API spells
        /// the key with a capital `ID` (the same spelling `ContainerStatus`
        /// uses); with any other spelling this field would always be `None`.
        #[serde(rename = "containerID")]
        container_id: Option<String>,
        /// Read from the JSON key `exitCode`.
        #[serde(rename = "exitCode")]
        exit_code: u32,
        /// Read from the JSON key `finishedAt`.
        #[serde(rename = "finishedAt")]
        finished_at: Option<DateTime<Utc>>,
        message: Option<String>,
        reason: Option<String>,
        signal: Option<u32>,
        /// Read from the JSON key `startedAt`.
        #[serde(rename = "startedAt")]
        started_at: Option<DateTime<Utc>>,
    },
    /// The container is waiting.
    #[serde(rename = "waiting")]
    Waiting {
        message: Option<String>,
        reason: Option<String>,
    },
}
impl fmt::Display for ContainerState {
    /// Writes a one-line, colorized summary of the state: green `running`,
    /// red `terminated` (with exit code) or yellow `waiting` (with reason).
    /// Missing timestamps and reasons are replaced by fixed placeholder text.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            ContainerState::Running {
                started_at: Some(ref started),
            } => write!(f, "{} (started: {})", Green.paint("running"), started),
            ContainerState::Running { started_at: None } => {
                write!(f, "{} (unknown start time)", Green.paint("running"))
            }
            ContainerState::Terminated {
                exit_code,
                finished_at: Some(ref finished),
                ..
            } => write!(
                f,
                "{} at {} (exit code: {})",
                Red.paint("terminated"),
                finished,
                exit_code
            ),
            ContainerState::Terminated {
                exit_code,
                finished_at: None,
                ..
            } => write!(
                f,
                "{} (time unknown) (exit code: {})",
                Red.paint("terminated"),
                exit_code
            ),
            ContainerState::Waiting { ref reason, .. } => {
                let why = reason
                    .as_ref()
                    .map(String::as_str)
                    .unwrap_or("<no reason given>");
                write!(f, "{} ({})", Yellow.paint("waiting"), why)
            }
        }
    }
}
/// Status of one container as listed under a pod's `containerStatuses`.
#[derive(Debug, Deserialize)]
pub struct ContainerStatus {
/// Read from the JSON key `containerID`.
#[serde(rename = "containerID")] pub id: Option<String>,
pub name: String,
pub image: String,
/// Read from the JSON key `restartCount`.
#[serde(rename = "restartCount")] pub restart_count: u32,
pub ready: bool,
/// Current state; required, so deserialization fails if it is missing.
pub state: ContainerState,
}
/// The `status` section of a pod.
#[derive(Debug, Deserialize)]
pub struct PodStatus {
/// Pod phase, kept as the raw string from the JSON.
pub phase: String,
/// Read from the JSON key `containerStatuses`; `None` when absent.
#[serde(rename = "containerStatuses")] pub container_statuses: Option<Vec<ContainerStatus>>,
}
/// One entry of a container's `volumeMounts` list.
#[derive(Debug, Deserialize)]
pub struct VolumeMount {
/// Read from the JSON key `mountPath`.
#[serde(rename = "mountPath")] pub mount_path: String,
pub name: String,
/// Read from the JSON key `readOnly`; `None` when absent.
#[serde(rename = "readOnly")] pub read_only: Option<bool>,
/// Read from the JSON key `subPath`; `None` when absent.
#[serde(rename = "subPath")] pub sub_path: Option<String>,
}
/// The subset of a container's spec that this file deserializes.
#[derive(Debug, Deserialize)]
pub struct ContainerSpec {
pub name: String,
pub args: Option<Vec<String>>,
pub command: Option<Vec<String>>,
/// Read from the JSON key `volumeMounts`; `None` when absent.
#[serde(rename = "volumeMounts")] pub volume_mounts: Option<Vec<VolumeMount>>,
}
/// The subset of a pod's `spec` that this file deserializes.
#[derive(Debug, Deserialize)]
pub struct PodSpec {
pub hostname: Option<String>,
/// Read from the JSON key `nodeName`; `None` when absent.
#[serde(rename = "nodeName")] pub node_name: Option<String>,
/// Container specs; required, so deserialization fails if the key is missing.
pub containers: Vec<ContainerSpec>,
}
/// A pod object: metadata, spec and status. All three sections are required.
#[derive(Debug, Deserialize)]
pub struct Pod {
pub metadata: Metadata,
pub spec: PodSpec,
pub status: PodStatus,
}
/// List response holding pods; only the `items` key is deserialized.
#[derive(Debug, Deserialize)]
pub struct PodList {
pub items: Vec<Pod>,
}
/// An event object, reduced to the fields this file reads.
///
/// Every field is required; an event JSON lacking any of them fails to
/// deserialize.
#[derive(Debug, Deserialize)]
pub struct Event {
pub count: u32,
pub message: String,
pub reason: String,
/// Read from the JSON key `lastTimestamp`.
#[serde(rename = "lastTimestamp")] pub last_timestamp: DateTime<Utc>,
}
/// List response holding events; only the `items` key is deserialized.
#[derive(Debug, Deserialize)]
pub struct EventList {
pub items: Vec<Event>,
}
/// One entry of a node's `conditions` list.
#[derive(Debug, Deserialize)]
pub struct NodeCondition {
/// Read from the JSON key `type` (`type` is a Rust keyword, hence `typ`).
#[serde(rename = "type")] pub typ: String,
/// Condition status, kept as the raw string from the JSON.
pub status: String,
}
/// The `status` section of a node; only `conditions` is deserialized and it
/// is required.
#[derive(Debug, Deserialize)]
pub struct NodeStatus {
pub conditions: Vec<NodeCondition>,
}
/// The `spec` section of a node; only `unschedulable` is deserialized.
#[derive(Debug, Deserialize)]
pub struct NodeSpec {
/// `None` when the key is absent from the JSON.
pub unschedulable: Option<bool>,
}
/// A node object: metadata, spec and status. All three sections are required.
#[derive(Debug, Deserialize)]
pub struct Node {
pub metadata: Metadata,
pub spec: NodeSpec,
pub status: NodeStatus,
}
/// List response holding nodes; only the `items` key is deserialized.
#[derive(Debug, Deserialize)]
pub struct NodeList {
pub items: Vec<Node>,
}
/// Serde default for replica counters that are absent from the JSON: zero.
fn replicas_none() -> u32 {
    const NO_REPLICAS: u32 = 0;
    NO_REPLICAS
}
/// Serde default for a deployment spec's `replicas` when the key is absent: one.
fn replicas_one() -> u32 {
    const SINGLE_REPLICA: u32 = 1;
    SINGLE_REPLICA
}
/// The `spec` section of a deployment; only `replicas` is deserialized.
#[derive(Debug, Deserialize)]
pub struct DeploymentSpec {
/// Falls back to `replicas_one()` (1) when the key is absent.
#[serde(default = "replicas_one")] pub replicas: u32,
}
/// Replica counters from a deployment's `status` section.
///
/// Each counter falls back to `replicas_none()` (0) when its key is absent.
#[derive(Debug, Deserialize)]
pub struct DeploymentStatus {
#[serde(default = "replicas_none")] pub replicas: u32,
/// Read from the JSON key `availableReplicas`.
#[serde(default = "replicas_none", rename = "availableReplicas")] pub available: u32,
/// Read from the JSON key `updatedReplicas`.
#[serde(default = "replicas_none", rename = "updatedReplicas")] pub updated: u32,
}
/// A deployment object: metadata, spec and status. All three sections are
/// required.
#[derive(Debug, Deserialize)]
pub struct Deployment {
pub metadata: Metadata,
pub spec: DeploymentSpec,
pub status: DeploymentStatus,
}
/// List response holding deployments; only the `items` key is deserialized.
#[derive(Debug, Deserialize)]
pub struct DeploymentList {
pub items: Vec<Deployment>,
}
/// Serde default for a service port's `protocol` when the key is absent: `"TCP"`.
fn tcp_str() -> String {
    String::from("TCP")
}
/// One entry of a service's `ports` list.
#[derive(Debug, Deserialize)]
pub struct ServicePort {
pub name: Option<String>,
/// Read from the JSON key `nodePort`; `None` when absent.
#[serde(rename = "nodePort")] pub node_port: Option<u32>,
pub port: u32,
/// Falls back to `tcp_str()` ("TCP") when the key is absent.
#[serde(default = "tcp_str")] pub protocol: String,
/// Read from the JSON key `targetPort` and kept as an untyped JSON value.
/// NOTE(review): the field is named `target_pod` although the key is
/// `targetPort` -- looks like a naming slip; renaming would change the public API.
#[serde(rename = "targetPort")] pub target_pod: Option<Value>,
}
/// The subset of a service's `spec` that this file deserializes.
#[derive(Debug, Deserialize)]
pub struct ServiceSpec {
/// Read from the JSON key `clusterIP`; `None` when absent.
#[serde(rename = "clusterIP")] pub cluster_ip: Option<String>,
/// Read from the JSON key `externalIPs`; `None` when absent.
#[serde(rename = "externalIPs")] pub external_ips: Option<Vec<String>>,
pub ports: Option<Vec<ServicePort>>,
}
/// A service object. `status` is required but kept as an untyped JSON value.
#[derive(Debug, Deserialize)]
pub struct Service {
pub metadata: Metadata,
pub spec: ServiceSpec,
pub status: Value,
}
/// List response holding services; only the `items` key is deserialized.
#[derive(Debug, Deserialize)]
pub struct ServiceList {
pub items: Vec<Service>,
}
/// The `status` section of a namespace; only `phase` is deserialized.
#[derive(Debug, Deserialize)]
pub struct NamespaceStatus {
/// Namespace phase, kept as the raw string from the JSON.
pub phase: String,
}
/// A namespace object: metadata and status, both required.
#[derive(Debug, Deserialize)]
pub struct Namespace {
pub metadata: Metadata,
pub status: NamespaceStatus,
}
/// List response holding namespaces; only the `items` key is deserialized.
/// Used by `Kluster::namespaces_for_context`.
#[derive(Debug, Deserialize)]
pub struct NamespaceList {
pub items: Vec<Namespace>,
}
/// List response for replica sets; items are kept as untyped JSON values.
#[derive(Debug, Deserialize)]
pub struct ReplicaSetList {
pub items: Vec<Value>,
}
/// List response for stateful sets; items are kept as untyped JSON values.
#[derive(Debug, Deserialize)]
pub struct StatefulSetList {
pub items: Vec<Value>,
}
/// List response for config maps; items are kept as untyped JSON values.
#[derive(Debug, Deserialize)]
pub struct ConfigMapList {
pub items: Vec<Value>,
}
/// List response for secrets; items are kept as untyped JSON values.
#[derive(Debug, Deserialize)]
pub struct SecretList {
pub items: Vec<Value>,
}
/// List response for jobs; items are kept as untyped JSON values.
#[derive(Debug, Deserialize)]
pub struct JobList {
pub items: Vec<Value>,
}
/// How a `Kluster` authenticates its requests.
pub enum KlusterAuth {
/// Static token, sent as an `Authorization: Bearer` header.
Token(String),
/// Username and password, sent as an `Authorization: Basic` header.
UserPass(String, String),
/// Provider asked for a token (`ensure_token`) on every request; the token
/// is sent as a bearer token.
AuthProvider(AuthProvider),
}
impl KlusterAuth {
    /// Creates token authentication, copying `token` into an owned string.
    pub fn with_token(token: &str) -> KlusterAuth {
        KlusterAuth::Token(String::from(token))
    }

    /// Creates username/password authentication from owned copies of both
    /// arguments.
    pub fn with_userpass(user: &str, pass: &str) -> KlusterAuth {
        let (username, password) = (String::from(user), String::from(pass));
        KlusterAuth::UserPass(username, password)
    }

    /// Creates authentication that defers to `auth_provider` for its token.
    pub fn with_auth_provider(auth_provider: AuthProvider) -> KlusterAuth {
        KlusterAuth::AuthProvider(auth_provider)
    }
}
/// Client certificate chain and private key handed to the TLS configuration
/// in `Kluster::make_tlsclient`.
pub struct ClientCertKey {
// Certificate chain passed to `set_single_client_cert`.
certs: Vec<Certificate>,
// Private key passed alongside `certs`.
key: PrivateKey,
}
impl ClientCertKey {
    /// Wraps a single certificate and its private key; the certificate
    /// becomes the only entry of the stored chain.
    pub fn with_cert_and_key(cert: Certificate, private_key: PrivateKey) -> ClientCertKey {
        let certs = vec![cert];
        ClientCertKey {
            certs,
            key: private_key,
        }
    }
}
/// A connection to one cluster API endpoint, plus the credentials used to
/// talk to it.
pub struct Kluster {
/// Name given to this cluster by the caller of `Kluster::new`.
pub name: String,
// Base URL; request paths are joined onto it.
endpoint: Url,
// Credentials attached to every request; `None` sends no Authorization header.
auth: Option<KlusterAuth>,
// hyper client used by `send_req` and `delete` (20 s read/write timeouts set in `new`).
client: Client,
// Separate connector used by `get_read` when the caller supplies its own read timeout.
connector: ClickSslConnector<TlsClient>,
}
/// Certificate verifier that accepts every server certificate. Installed by
/// `Kluster::make_tlsclient` only when its `insecure` flag is set.
pub struct NoCertificateVerification {}
impl rustls::ServerCertVerifier for NoCertificateVerification {
/// Unconditionally reports the server certificate as verified.
///
/// All arguments are ignored, so no check of the chain, the DNS name or the
/// OCSP response takes place. This disables TLS server authentication.
fn verify_server_cert(
&self,
_roots: &rustls::RootCertStore,
_presented_certs: &[rustls::Certificate],
_dns_name: webpki::DNSNameRef,
_ocsp_response: &[u8],
) -> Result<rustls::ServerCertVerified, rustls::TLSError> {
Ok(rustls::ServerCertVerified::assertion())
}
}
impl Kluster {
/// Builds a `TlsClient` configured for this cluster.
///
/// * `cert_opt` - PEM text of the server/CA certificate(s) to trust, if any.
/// * `client_cert_key` - client certificate chain and key to present, if any.
/// * `insecure` - when `true`, installs `NoCertificateVerification`, which
///   accepts any server certificate.
///
/// Configuration problems are reported as `[WARNING]` lines on stdout and
/// never abort: the (possibly unconfigured) client is always returned.
fn make_tlsclient(
    cert_opt: &Option<String>,
    client_cert_key: &Option<ClientCertKey>,
    insecure: bool,
) -> TlsClient {
    let mut tlsclient = TlsClient::new();
    // The config lives behind an `Arc`; it can only be mutated while this is
    // the sole reference to it.
    if let Some(cfg) = Arc::get_mut(&mut tlsclient.cfg) {
        if let Some(ref cert_data) = *cert_opt {
            let mut reader = BufReader::new(cert_data.as_bytes());
            match cfg.root_store.add_pem_file(&mut reader) {
                Ok((valid, invalid)) => {
                    // Warn when a certificate was rejected, and also when the
                    // PEM text contained no certificate at all: in both cases
                    // the server cert the user supplied is not trusted.
                    if invalid > 0 || valid == 0 {
                        println!(
                            "[WARNING] Couldn't add your server cert, connection will probably \
                             fail"
                        );
                    }
                }
                Err(e) => println!(
                    "[WARNING] Couldn't add your server cert, connection will probably \
                     fail. Error was: {:?}",
                    e
                ),
            }
        }
        if let Some(ref cert_key) = *client_cert_key {
            cfg.set_single_client_cert(cert_key.certs.clone(), cert_key.key.clone());
        }
        if insecure {
            cfg.dangerous()
                .set_certificate_verifier(Arc::new(NoCertificateVerification {}));
        }
    } else {
        println!(
            "[WARNING] Failed to configure tlsclient, connection will probably fail. \
             Please restart click"
        );
    }
    tlsclient
}
/// Swaps a literal IP-address host in `endpoint` for a DNS name, when
/// `::certs::try_ip_to_name` can supply one.
///
/// Returns `(dns_host, ip)`:
/// * `(None, None)` when the endpoint has no host or the host is not an IP
///   address; `endpoint` is left untouched.
/// * `(None, Some(ip))` when no name was found, or the name could not be
///   installed as the URL host; `endpoint` keeps the IP address.
/// * `(Some(name), Some(ip))` when `endpoint`'s host was replaced by `name`.
///
/// A name that the URL rejects as a host is reported with a `[WARNING]` line
/// rather than a panic.
fn get_host_ip(endpoint: &mut Url) -> (Option<String>, Option<String>) {
    // Copy the host out so the borrow of `endpoint` ends before it is mutated.
    let (addr, ip) = match endpoint
        .host_str()
        .map(|host| (IpAddr::from_str(host), host.to_owned()))
    {
        Some((Ok(addr), host)) => (addr, host),
        _ => return (None, None),
    };
    // `port()` is `None` when the URL carries no explicit port; assume 443.
    let port = endpoint.port().unwrap_or(443);
    let dns_host = match ::certs::try_ip_to_name(&addr, port) {
        Some(name) => {
            if endpoint.set_host(Some(name.as_str())).is_ok() {
                Some(name)
            } else {
                // Keep the IP endpoint and report no DNS name, so the caller
                // does not set up a name-to-address mapping that is never used.
                println!(
                    "[WARNING] Couldn't use name {} in place of address {}, connection will \
                     probably fail",
                    name, ip
                );
                None
            }
        }
        None => None,
    };
    (dns_host, Some(ip))
}
/// Wraps `tlsclient` in a `ClickSslConnector`.
///
/// The connector receives a `(host, ip)` pair only when both `dns_host` and
/// `ip` are present; otherwise it receives `None`.
fn make_connector(
    tlsclient: TlsClient,
    dns_host: Option<String>,
    ip: Option<String>,
) -> ClickSslConnector<TlsClient> {
    let host_mapping = match (dns_host, ip) {
        (Some(host), Some(ip_addr)) => Some((host, ip_addr)),
        _ => None,
    };
    ClickSslConnector::new(tlsclient, host_mapping)
}
/// Attaches the `Authorization` header matching `self.auth` to `req`.
///
/// Tokens (static or obtained from the auth provider) are sent as bearer
/// tokens, username/password as basic auth. With no configured auth, or when
/// the provider yields no token (an error message is printed), `req` is
/// returned unchanged.
fn add_auth_header<'a>(&self, req: RequestBuilder<'a>) -> RequestBuilder<'a> {
    let auth = match self.auth {
        Some(ref configured) => configured,
        None => return req,
    };
    match *auth {
        KlusterAuth::Token(ref token) => req.header(Authorization(Bearer {
            token: token.clone(),
        })),
        KlusterAuth::UserPass(ref user, ref pass) => req.header(Authorization(Basic {
            username: user.clone(),
            password: Some(pass.clone()),
        })),
        KlusterAuth::AuthProvider(ref provider) => {
            if let Some(token) = provider.ensure_token() {
                req.header(Authorization(Bearer { token: token }))
            } else {
                print_token_err();
                req
            }
        }
    }
}
pub fn new(
name: &str,
cert_opt: Option<String>,
server: &str,
auth: Option<KlusterAuth>,
client_cert_key: Option<ClientCertKey>,
insecure: bool,
) -> Result<Kluster, KubeError> {
let tlsclient = Kluster::make_tlsclient(&cert_opt, &client_cert_key, insecure);
let tlsclient2 = Kluster::make_tlsclient(&cert_opt, &client_cert_key, insecure);
let mut endpoint = try!(Url::parse(server));
let (dns_host, ip) = Kluster::get_host_ip(&mut endpoint);
let mut client = Client::with_connector(Kluster::make_connector(
tlsclient,
dns_host.clone(),
ip.clone(),
));
client.set_read_timeout(Some(Duration::new(20, 0)));
client.set_write_timeout(Some(Duration::new(20, 0)));
Ok(Kluster {
name: name.to_owned(),
endpoint: endpoint,
auth: auth,
client: client,
connector: Kluster::make_connector(tlsclient2, dns_host, ip),
})
}
/// Sends an authenticated GET for `path` (joined onto the endpoint URL) and
/// returns the raw response without inspecting its status.
///
/// # Errors
/// Fails when `path` cannot be joined onto the endpoint or when sending the
/// request fails.
fn send_req(&self, path: &str) -> Result<Response, KubeError> {
    let target = self.endpoint.join(path)?;
    let request = self.add_auth_header(self.client.get(target));
    request.send().map_err(KubeError::from)
}
fn check_resp(&self, resp: Response) -> Result<Response, KubeError> {
if resp.status == StatusCode::Ok {
Ok(resp)
} else if resp.status == StatusCode::Unauthorized {
Err(KubeError::Kube(KubeErrNo::Unauthorized))
} else {
let val: Value = try!(serde_json::from_reader(resp));
match ::values::val_str_opt("/message", &val) {
Some(msg) => Err(KubeError::KubeServerError(msg)),
None => Err(KubeError::Kube(KubeErrNo::Unknown)),
}
}
}
/// GETs `path` and deserializes the JSON response body into `T`.
///
/// # Errors
/// Fails when the request cannot be sent, when the status is not 200 (see
/// `check_resp`), or when the body does not deserialize into `T`.
pub fn get<T>(&self, path: &str) -> Result<T, KubeError>
where
    for<'de> T: Deserialize<'de>,
{
    let checked = self
        .send_req(path)
        .and_then(|raw| self.check_resp(raw))?;
    serde_json::from_reader(checked).map_err(KubeError::from)
}
pub fn get_read(&self, path: &str, timeout: Option<Duration>) -> Result<Response, KubeError> {
if timeout.is_some() {
let url = try!(self.endpoint.join(path));
let mut req = try!(Request::with_connector(Method::Get, url, &self.connector,));
{
let headers = req.headers_mut();
match self.auth {
Some(KlusterAuth::Token(ref token)) => {
headers.set(Authorization(Bearer {
token: token.clone(),
}));
}
Some(KlusterAuth::AuthProvider(ref auth_provider)) => {
match auth_provider.ensure_token() {
Some(token) => headers.set(
Authorization(Bearer { token: token })),
None => print_token_err(),
}
}
Some(KlusterAuth::UserPass(ref user, ref pass)) => {
headers.set(Authorization(Basic {
username: user.clone(),
password: Some(pass.clone()),
}));
}
None => {},
}
}
try!(req.set_read_timeout(timeout));
let next = try!(req.start());
let resp = try!(next.send().map_err(|he| KubeError::from(he)));
self.check_resp(resp)
} else {
let resp = try!(self.send_req(path));
self.check_resp(resp)
}
}
pub fn get_value(&self, path: &str) -> Result<Value, KubeError> {
let resp = try!(self.send_req(path));
let resp = try!(self.check_resp(resp));
serde_json::from_reader(resp).map_err(|sje| KubeError::from(sje))
}
/// Sends an authenticated DELETE for `path`, with `body` as the request
/// body when one is given.
///
/// The response is returned as received; its status is not checked here.
///
/// # Errors
/// Fails when `path` cannot be joined onto the endpoint or when sending the
/// request fails.
pub fn delete(&self, path: &str, body: Option<String>) -> Result<Response, KubeError> {
    let target = self.endpoint.join(path)?;
    let builder = self.client.delete(target);
    let builder = if let Some(ref payload) = body {
        builder.body(Body::BufBody(payload.as_bytes(), payload.len()))
    } else {
        builder
    };
    self.add_auth_header(builder)
        .send()
        .map_err(KubeError::from)
}
pub fn namespaces_for_context(&self) -> Result<Vec<String>, KubeError> {
let mut vec = Vec::new();
let res = try!(self.get::<NamespaceList>("/api/v1/namespaces"));
for ns in res.items.iter() {
vec.push(ns.metadata.name.clone());
}
Ok(vec)
}
}
/// Prints advice to stdout for the case where the auth provider could not
/// supply a token.
fn print_token_err() {
    let advice = concat!(
        "Couldn't get an authentication token. You can try exiting Click and ",
        "running a kubectl command against the cluster to refresh it. ",
        "Also please report this error on the Click github page."
    );
    println!("{}", advice);
}