#![allow(unused_imports)]
#![cfg(feature = "as_switch")]
#![cfg(feature = "sys_hotwings")]
use std::path::Path;
use std::ffi::OsStr;
use handlebars::{Handlebars, RenderError};
use ymlctx::context::{Context, CtxObj};
use crate::{TaskError, TaskErrorSource};
use super::Infrastructure;
/// Infrastructure backend that renders Kubernetes resource definitions for a
/// task and hands them to the k8s provisioner (see the `Infrastructure` impl).
pub struct Hotwings;
impl Infrastructure for Hotwings {
    /// Renders the Kubernetes resources for `cmd` and submits them to the provisioner.
    ///
    /// The docker context is extended with `hotwings_user` (always `"hotwings"`)
    /// and `hotwings_task_id` (derived from the `playbook-from` entry through
    /// `get_task_id`) before it is passed to `k8s_api`.
    ///
    /// # Returns
    /// The rendered resource documents joined with newlines, in the order
    /// produced by `k8s_api`.
    ///
    /// # Errors
    /// * A rendering failure is reported as `TaskErrorSource::Internal`, carrying
    ///   the renderer's description as the message.
    /// * Any error returned by `k8s_provisioner` is propagated unchanged.
    ///
    /// # Panics
    /// Panics when the context has no usable `playbook-from` entry, or when
    /// `get_task_id` cannot derive a directory name from it.
    fn start<I>(&self, ctx_docker: Context, cmd: I) -> Result<String, TaskError>
        where I: IntoIterator, I::Item: AsRef<std::ffi::OsStr>
    {
        let username = "hotwings";
        let playbook_from: String = ctx_docker.unpack("playbook-from")
            .expect("the docker context must carry a `playbook-from` entry");
        let ctx_modded = ctx_docker
            .set("hotwings_user", CtxObj::Str(username.to_owned()))
            .set("hotwings_task_id", CtxObj::Str(get_task_id(&playbook_from)));

        let resources = k8s_api(ctx_modded, cmd)
            .map_err(|e| TaskError { msg: e.desc, src: TaskErrorSource::Internal })?;
        k8s_provisioner(&resources)?;

        // Only the rendered documents are reported back; the API labels are dropped.
        Ok(resources.iter()
            .map(|(_, res)| res.as_str())
            .collect::<Vec<&str>>()
            .join("\n"))
    }
}
/// Derives the task id from a playbook path: the name of the directory that
/// directly contains the playbook file.
///
/// Non-UTF-8 bytes in the directory name are replaced lossily.
///
/// # Panics
/// Panics if the path has no parent, or if that parent has no final
/// component (for example a bare file name such as `playbook.yml`).
fn get_task_id<P: AsRef<Path>>(playbook: P) -> String {
    let playbook_dir = playbook.as_ref().parent().unwrap();
    let dir_name = playbook_dir.file_name().unwrap();
    dir_name.to_string_lossy().into_owned()
}
/// Builds a Handlebars registry preloaded with the hotwings templates.
///
/// The templates are embedded at compile time and registered under the names
/// `batch-job`, `pv-current-ro` and `pvc-current-ro`.
///
/// # Panics
/// Panics if any embedded template fails to register.
fn get_renderer() -> Handlebars {
    // (registered name, embedded template source), in registration order.
    let templates = [
        ("batch-job", include_str!("templates-hotwings/batch.hbs")),
        ("pv-current-ro", include_str!("templates-hotwings/pv.hbs")),
        ("pvc-current-ro", include_str!("templates-hotwings/pvc.hbs")),
    ];

    let mut registry = Handlebars::new();
    for &(name, source) in templates.iter() {
        registry.register_template_string(name, source).unwrap();
    }
    registry
}
pub fn k8s_api<I, S>(ctx_docker: Context, cmd: I) -> Result<Vec<(String, String)>, RenderError>
where I: IntoIterator<Item = S>, S: AsRef<OsStr>
{
let renderer = get_renderer();
let cmd_str: Vec<String> = cmd.into_iter().map(|s| s.as_ref().to_str().unwrap().to_owned()).collect();
#[cfg(not(feature = "ci_only"))]
let env_nfs_server = std::env::var("HOTWINGS_NFS_SERVER").expect("Missing environment variable HOTWINGS_NFS_SERVER?");
#[cfg(feature = "ci_only")]
let env_nfs_server = "NFS_SERVER";
#[cfg(not(feature = "ci_only"))]
let env_currentro_quota = std::env::var("HOTWINGS_CURRENTRO_QUOTA").expect("Missing environment variable HOTWINGS_CURRENTRO_QUOTA?");
#[cfg(feature = "ci_only")]
let env_currentro_quota = "100MiB";
let ctx_modded = ctx_docker
.set("command_str", CtxObj::Str(format!("[{}]", cmd_str.iter().map(|s| format!("'{}'", s)).collect::<Vec<String>>().join(","))))
.set("hotwings_nfs_server", CtxObj::Str(env_nfs_server.to_owned()))
.set("hotwings_currentro_quota", CtxObj::Str(env_currentro_quota.to_owned())) .set("hotwings_nvidia", CtxObj::Bool(ctx_docker.unpack("runtime").unwrap_or(String::from("")) == String::from("nvidia")))
.set("hotwings_gpus", CtxObj::Int(
if ctx_docker.unpack("runtime").unwrap_or(String::from("")) == String::from("nvidia") {
println!("nvidia-runtime!");
ctx_docker.unpack("gpus").unwrap_or(1i64)
}
else { 0 }
));
Ok(vec![
(String::from("api_pv"), renderer.render("pv-current-ro", &ctx_modded)?),
(String::from("api_pvc"), renderer.render("pvc-current-ro", &ctx_modded)?),
(String::from("api_job"), renderer.render("batch-job", &ctx_modded)?),
])
}
#[cfg(feature = "lang_python")]
use pyo3::prelude::*;
/// Submits rendered resources through the embedded Python provisioner script.
///
/// The script `hotwings_k8s_api.py` is embedded at compile time and executed
/// first. Its `k8s_provisioner` callable is then invoked with `(label, document)`
/// for every entry of `resources`, in order. For the entry labelled `api_job`,
/// the value returned by the provisioner is additionally passed to the script's
/// `join_job` callable.
///
/// Python exceptions are printed through the interpreter before the
/// corresponding error is returned.
///
/// # Errors
/// * `TaskErrorSource::Internal` - the script failed to execute, or it does not
///   expose `k8s_provisioner` / `join_job`.
/// * `TaskErrorSource::ExternalAPIError` - the provisioner or `join_job` raised.
#[cfg(feature = "lang_python")]
pub fn k8s_provisioner(resources: &Vec<(String, String)>) -> Result<(), TaskError> {
    let gil = Python::acquire_gil();
    let py = gil.python();

    let src_k8s_provisioner = include_str!("hotwings_k8s_api.py");
    if let Err(provisioner_err) = py.run(src_k8s_provisioner, None, None) {
        provisioner_err.print_and_set_sys_last_vars(py);
        return Err(TaskError {
            msg: String::from("An internal error has occurred sourcing the k8s provisioner script."),
            src: TaskErrorSource::Internal
        });
    }

    // Resolve a callable defined by the script; a missing name is reported as an
    // internal error instead of panicking.
    let lookup = |name: &str| {
        py.eval(name, None, None).map_err(|lookup_err| {
            lookup_err.print_and_set_sys_last_vars(py);
            TaskError {
                msg: format!("An internal error has occurred resolving `{}` from the k8s provisioner script.", name),
                src: TaskErrorSource::Internal
            }
        })
    };
    let provisioner = lookup("k8s_provisioner")?;
    let join_job = lookup("join_job")?;

    for (api, res) in resources {
        info!("Creating kubernetes resource:");
        info!("{}", res);

        let api_return = match provisioner.call1((api, res)) {
            Ok(value) => value,
            Err(api_exception) => {
                api_exception.print_and_set_sys_last_vars(py);
                return Err(TaskError {
                    msg: String::from("An exception has occurred in the k8s provisioner script."),
                    src: TaskErrorSource::ExternalAPIError
                });
            }
        };

        // Only the job resource is joined after it has been created.
        if api == "api_job" {
            if let Err(join_exception) = join_job.call1((api_return, )) {
                join_exception.print_and_set_sys_last_vars(py);
                // Best effort: a failed flush must not mask the join failure.
                let _ = py.run("sys.stderr.flush()", None, None);
                return Err(TaskError {
                    msg: String::from("An exception has occurred while joining the job execution."),
                    src: TaskErrorSource::ExternalAPIError
                });
            }
        }
    }

    Ok(())
}