use crate::access::*;
use crate::api;
use crate::resources::arch::*;
use crate::resources::*;
use core::time::Duration;
use melodium_core::*;
use melodium_macro::{mel_function, mel_model, mel_treatment};
use std::{
collections::HashMap,
sync::{Arc, RwLock, Weak},
};
use uuid::Uuid;
#[derive(Debug)]
#[mel_model(
param location string "api"
param api_url Option<string> none
param api_token Option<string> none
initialize initialize
)]
pub struct DistantEngine {
model: Weak<DistantEngineModel>,
location: RwLock<Option<String>>,
api_url: RwLock<Option<String>>,
api_token: RwLock<Option<String>>,
}
impl DistantEngine {
fn new(model: Weak<DistantEngineModel>) -> Self {
Self {
model,
location: RwLock::new(None),
api_url: RwLock::new(None),
api_token: RwLock::new(None),
}
}
pub fn initialize(&self) {
let model = self.model.upgrade().unwrap();
let location = model.get_location();
let api_url = model
.get_api_url()
.or_else(|| Some(crate::API_URL.to_string()));
let api_token = model.get_api_token().or_else(|| crate::API_TOKEN.clone());
self.location.write().unwrap().replace(location);
if let Some(api_url) = api_url {
self.api_url.write().unwrap().replace(api_url);
}
if let Some(api_token) = api_token {
self.api_token.write().unwrap().replace(api_token);
}
}
#[cfg(feature = "real")]
pub async fn start(
&self,
request: api::Request,
) -> Result<
(
api::DistributionResponse,
Vec<String>,
Option<Box<dyn core::future::Future<Output = Vec<String>> + Send + Unpin>>,
),
String,
> {
let location = self.location.read().unwrap().clone();
match location.as_ref().map(|loc| loc.as_str()) {
Some("api") => self.distrib_api(request).await,
Some("compose") => self.distrib_compose(request).await,
Some(oth) => Err(format!(
"\"{oth}\" is not a recognized distant execution location"
)),
None => Err("No location set".to_string()),
}
}
#[cfg(feature = "mock")]
pub async fn start(
&self,
request: api::Request,
) -> Result<
(
api::DistributionResponse,
Vec<String>,
Option<Box<dyn core::future::Future<Output = Vec<String>> + Send + Unpin>>,
),
String,
> {
Err("Mock mode, nothing to do".to_string())
}
fn invoke_source(&self, _source: &str, _params: HashMap<String, Value>) {}
#[cfg(feature = "real")]
async fn distrib_compose(
&self,
mut request: api::Request,
) -> Result<
(
api::DistributionResponse,
Vec<String>,
Option<Box<dyn core::future::Future<Output = Vec<String>> + Send + Unpin>>,
),
String,
> {
request.local_exec = true;
let mut run_api_id = None;
let mut api_errors = Vec::new();
let (api_url, api_token) = (
self.api_url.read().unwrap().clone(),
self.api_token.read().unwrap().clone(),
);
if let (Some(api_url), Some(api_token)) = (&api_url, &api_token) {
match generic_async_http_client::Request::post(&format!(
"{api_url}/execution/run/start"
))
.add_header("User-Agent", crate::USER_AGENT)
.map_err(|err| err.to_string())?
.add_header("Authorization", format!("Bearer {api_token}").as_bytes())
.map_err(|err| err.to_string())?
.add_header("Content-Type", "application/json")
.map_err(|err| err.to_string())?
.body(serde_json::to_string(&request).unwrap())
.map_err(|err| err.to_string())?
.exec()
.await
{
Ok(mut response) => {
if response.status_code() == 200 {
match response.json::<api::Response>().await {
Ok(response) => match response {
api::Response::Ok(id) => {
run_api_id = Some(id);
request.id = Some(id);
}
api::Response::Error(errs) => {
api_errors.extend(errs);
}
},
Err(error) => api_errors.push(Self::manage_error(error).await),
}
} else {
match response.text().await {
Ok(body) => api_errors.push(format!(
"Server {} response: {body}",
response.status_code()
)),
Err(error) => api_errors.push(Self::manage_error(error).await),
}
}
}
Err(error) => api_errors.push(Self::manage_error(error).await),
}
} else {
api_errors.push("API address and token missing".into());
}
let response = crate::compose::compose(request).await;
if let (Some(run_api_id), Some(api_url), Some(api_token)) =
(run_api_id.clone(), &api_url, &api_token)
{
match generic_async_http_client::Request::post(&format!(
"{api_url}/execution/run/launched"
))
.add_header("User-Agent", crate::USER_AGENT)
.map_err(|err| err.to_string())?
.add_header("Authorization", format!("Bearer {api_token}").as_bytes())
.map_err(|err| err.to_string())?
.add_header("Content-Type", "application/json")
.map_err(|err| err.to_string())?
.body(
serde_json::to_string(&api::LocalLaunched {
run_id: run_api_id,
response: match &response {
Ok(_) => api::DistributionResponse::Started(None),
Err(errs) => api::DistributionResponse::Error(errs.clone()),
},
})
.unwrap(),
)
.map_err(|err| err.to_string())?
.exec()
.await
{
Ok(mut response) => {
if response.status_code() != 200 {
match response.text().await {
Ok(body) => api_errors.push(format!(
"Server {} response: {body}",
response.status_code()
)),
Err(error) => api_errors.push(Self::manage_error(error).await),
}
}
}
Err(error) => api_errors.push(Self::manage_error(error).await),
}
}
match response {
Ok((access, mut child)) => {
let finish_notification = async move {
let mut possible_errors = Vec::new();
let status =
async_std::future::timeout(Duration::from_secs(10), child.status()).await;
match status {
Ok(Ok(exit)) => {
if let (Some(run_api_id), Some(api_url), Some(api_token)) =
(run_api_id, api_url, api_token)
{
let _ = generic_async_http_client::Request::post(&format!(
"{api_url}/execution/run/ended"
))
.add_header("User-Agent", crate::USER_AGENT)?
.add_header(
"Authorization",
format!("Bearer {api_token}").as_bytes(),
)?
.add_header("Content-Type", "application/json")?
.body(
serde_json::to_string(&api::LocalEnd {
run_id: run_api_id,
result: if exit.success() {
api::DistributionResult::Success(None)
} else {
api::DistributionResult::Failure(Some(vec![format!(
"Compose exit code {}",
exit.code()
.map(|code| code.to_string())
.unwrap_or("undefined".into())
)]))
},
})
.unwrap(),
)?
.exec()
.await;
if !exit.success() {
possible_errors.push(format!(
"Compose exited with code {}",
exit.code()
.map(|code| code.to_string())
.unwrap_or("undefined".into())
));
}
}
}
Ok(Err(err)) => {
if let (Some(run_api_id), Some(api_url), Some(api_token)) =
(run_api_id, api_url, api_token)
{
let _ = generic_async_http_client::Request::post(&format!(
"{api_url}/execution/run/ended"
))
.add_header("User-Agent", crate::USER_AGENT)?
.add_header(
"Authorization",
format!("Bearer {api_token}").as_bytes(),
)?
.add_header("Content-Type", "application/json")?
.body(
serde_json::to_string(&api::LocalEnd {
run_id: run_api_id,
result: api::DistributionResult::Failure(Some(vec![
err.to_string()
])),
})
.unwrap(),
)?
.exec()
.await;
possible_errors.push(err.to_string());
}
}
Err(err) => {
if let (Some(run_api_id), Some(api_url), Some(api_token)) =
(run_api_id, api_url, api_token)
{
let _ = generic_async_http_client::Request::post(&format!(
"{api_url}/execution/run/ended"
))
.add_header("User-Agent", crate::USER_AGENT)?
.add_header(
"Authorization",
format!("Bearer {api_token}").as_bytes(),
)?
.add_header("Content-Type", "application/json")?
.body(
serde_json::to_string(&api::LocalEnd {
run_id: run_api_id,
result: api::DistributionResult::Success(Some(vec![
format!("Compose exit timeout: {}", err.to_string()),
])),
})
.unwrap(),
)?
.exec()
.await;
possible_errors
.push(format!("Compose exit timeout: {}", err.to_string()));
}
}
}
Ok::<Vec<String>, generic_async_http_client::Error>(possible_errors)
};
let finish_notification = async move {
match finish_notification.await {
Ok(possible_errors) => possible_errors,
Err(err) => vec![format!(
"Error while sending run end notification: {}",
err.to_string()
)],
}
};
Ok((
api::DistributionResponse::Started(Some(access)),
api_errors,
Some(Box::new(Box::pin(finish_notification))),
))
}
Err(errs) => Ok((
api::DistributionResponse::Error(errs.clone()),
api_errors,
None,
)),
}
}
#[cfg(feature = "real")]
async fn distrib_api(
&self,
request: api::Request,
) -> Result<
(
api::DistributionResponse,
Vec<String>,
Option<Box<dyn core::future::Future<Output = Vec<String>> + Send + Unpin>>,
),
String,
> {
let (api_url, api_token) = (
self.api_url.read().unwrap().clone(),
self.api_token.read().unwrap().clone(),
);
if let (Some(api_url), Some(api_token)) = (api_url, api_token) {
match generic_async_http_client::Request::post(&format!(
"{api_url}/execution/run/start"
))
.add_header("User-Agent", crate::USER_AGENT)
.map_err(|err| err.to_string())?
.add_header("Authorization", format!("Bearer {api_token}").as_bytes())
.map_err(|err| err.to_string())?
.add_header("Content-Type", "application/json")
.map_err(|err| err.to_string())?
.body(serde_json::to_string(&request).unwrap())
.map_err(|err| err.to_string())?
.exec()
.await
{
Ok(mut response) => {
if response.status_code() == 200 {
match response.json::<api::Response>().await {
Ok(response) => match response {
api::Response::Ok(id) => {
async_std::task::sleep(Duration::from_secs(1)).await;
loop {
match generic_async_http_client::Request::get(&format!(
"{api_url}/execution/run/{id}/access"
))
.add_header("User-Agent", crate::USER_AGENT)
.map_err(|err| err.to_string())?
.add_header(
"Authorization",
format!("Bearer {api_token}").as_bytes(),
)
.map_err(|err| err.to_string())?
.exec()
.await
{
Ok(mut response) => match response.status_code() {
202 => {
async_std::task::sleep(Duration::from_secs(5))
.await
}
200 => match response
.json::<api::DistributionResponse>()
.await
{
Ok(distribution) => {
return Ok((distribution, vec![], None))
}
Err(error) => {
return Err(Self::manage_error(error).await)
}
},
code => {
return Err(format!(
"API {code} response: {response}",
response = match response.text().await {
Ok(response) => response,
Err(error) =>
Box::pin(Self::manage_error(error))
.await,
}
))
}
},
Err(error) => {
return Err(Self::manage_error(error).await)
}
}
}
}
api::Response::Error(errs) => {
Ok((api::DistributionResponse::Error(errs), vec![], None))
}
},
Err(error) => Err(Self::manage_error(error).await),
}
} else {
match response.text().await {
Ok(body) => Err(format!(
"Server {} response: {body}",
response.status_code()
)),
Err(error) => Err(Self::manage_error(error).await),
}
}
}
Err(error) => Err(Self::manage_error(error).await),
}
} else {
Err("API address and token missing".into())
}
}
#[cfg(feature = "real")]
async fn manage_error(error: generic_async_http_client::Error) -> String {
match error {
generic_async_http_client::Error::Io(error) => error.to_string(),
generic_async_http_client::Error::HTTPServerErr(code, mut response) => format!(
"API {code} error: {response}",
response = match response.text().await {
Ok(text) =>
if text.is_empty() {
response.status().to_string()
} else {
format!("{}: {}", response.status(), text)
},
Err(error) => Box::pin(Self::manage_error(error)).await,
}
),
generic_async_http_client::Error::HTTPClientErr(code, mut response) => format!(
"API {code} error: {response}",
response = match response.text().await {
Ok(text) =>
if text.is_empty() {
response.status().to_string()
} else {
format!("{}: {}", response.status(), text)
},
Err(error) => Box::pin(Self::manage_error(error)).await,
}
),
generic_async_http_client::Error::Other(error) => error.to_string(),
}
}
}
#[mel_treatment(
model distant_engine DistantEngine
input trigger Block<void>
output access Block<Access>
output failed Block<void>
output errors Stream<string>
)]
pub async fn distant(
max_duration: u32,
memory: u32,
cpu: u32,
storage: u32,
edition: Option<string>,
arch: Option<Arch>,
volumes: Vec<Volume>,
containers: Vec<Container>,
service_containers: Vec<ServiceContainer>,
tags: Vec<string>,
) {
let model = DistantEngineModel::into(distant_engine);
let distant = model.inner();
let key = Uuid::new_v4();
let start = api::Request {
edition: Some(edition.unwrap_or_else(|| "scratch".to_string())),
max_duration: Some(max_duration),
memory: Some(memory),
cpu: Some(cpu),
mode: api::ModeRequest::DistributionSecretKey { key: key.clone() },
config: None,
id: None,
organization_id: None,
version: env!("CARGO_PKG_VERSION").to_string(),
storage: Some(storage),
arch: arch.map(|arch| arch.0),
volumes: volumes.into_iter().map(|vol| vol.0.clone()).collect(),
containers: containers.into_iter().map(|cont| cont.0.clone()).collect(),
service_containers: service_containers
.into_iter()
.map(|cont| cont.0.clone())
.collect(),
group_id: Some(melodium_engine::execution_group_id().clone()),
parent_id: Some(melodium_engine::execution_run_id().clone()),
tags: tags,
local_exec: false,
};
if let Ok(_) = trigger.recv_one().await {
match distant.start(start).await {
Ok((distrib, api_errors, future)) => {
let _ = errors.send_many(api_errors.into()).await;
match distrib {
api::DistributionResponse::Started(Some(access_info)) => {
let _ = access
.send_one(Value::Data(Arc::new(Access(api::CommonAccess {
addresses: access_info.addresses,
port: access_info.port,
remote_key: access_info.key,
self_key: key,
disable_tls: access_info.disable_tls,
}))))
.await;
let _ = access.close().await;
let _ = failed.close().await;
if let Some(future_errors) = future {
let some_errors = future_errors.await;
if !some_errors.is_empty() {
let _ = errors.send_many(some_errors.into()).await;
}
}
let _ = errors.close().await;
}
api::DistributionResponse::Started(None) => {}
api::DistributionResponse::Error(errs) => {
let _ = failed.send_one(().into()).await;
let _ = errors.send_many(errs.into()).await;
}
}
}
Err(err) => {
let _ = failed.send_one(().into()).await;
let _ = errors.send_many(vec![err].into()).await;
}
}
}
}
#[mel_function]
pub fn default_api_url() -> string {
crate::API_URL.to_string()
}