use std::io::Write;
use std::path::Path;
use std::thread;
use std::time::{Duration, Instant};
use crate::api::ApiClient;
use crate::binary_format::remote_endpoints::RemoteJobStorageEndpoint;
use crate::binary_format::stream_seeker::StreamSeeker;
use crate::binary_format::utils::RemoteIndexableBuffer;
use crate::binary_format::{ModuleOutputV2, StdoutAndStderr};
use crate::error::BioLibError;
use crate::multipart_upload;
use crate::resource_uri::parse_resource_uri;
use crate::types::job::{CloudJobDict, JobDict, JobStatusResponse};
/// Handle to a single remote job: its identifiers, the most recently fetched
/// metadata, the API client used for requests, and the lazily-loaded result.
pub struct Job {
    // Public job identifier (UUID string), as returned by the API.
    uuid: String,
    // Per-job token authorizing API requests for this job.
    auth_token: String,
    // Most recently fetched job metadata; refreshed by `refetch_job_dict`.
    job_dict: JobDict,
    // Client used for all API calls made on behalf of this job.
    api_client: ApiClient,
    // Parsed module output; populated on first access via `ensure_result`.
    result: Option<JobResult>,
}
/// Parsed output of a job, backed by a (possibly remote) indexed buffer.
pub struct JobResult {
    // Deserialized module output: stdout, stderr, exit code, and output files.
    module_output: ModuleOutputV2,
}
/// Accessors for the sections of a job's module output. Every method is a
/// thin delegation to the underlying `ModuleOutputV2`.
impl JobResult {
    /// Raw stdout bytes of the job.
    pub fn get_stdout(&mut self) -> crate::Result<Vec<u8>> {
        self.module_output.get_stdout()
    }

    /// Raw stderr bytes of the job.
    pub fn get_stderr(&mut self) -> crate::Result<Vec<u8>> {
        self.module_output.get_stderr()
    }

    /// The job's process exit code.
    pub fn get_exit_code(&mut self) -> crate::Result<u16> {
        self.module_output.get_exit_code()
    }

    /// Metadata for every output file (path plus data offset/length).
    pub fn get_files(
        &mut self,
    ) -> crate::Result<Vec<crate::binary_format::module_output::OutputFile>> {
        self.module_output.get_files()
    }

    /// Contents of one output file previously listed by `get_files`.
    pub fn get_file_data(
        &self,
        file: &crate::binary_format::module_output::OutputFile,
    ) -> crate::Result<Vec<u8>> {
        self.module_output.get_file_data(file)
    }
}
impl Job {
    /// Fetches an existing job by UUID and wraps it in a `Job` handle.
    ///
    /// NOTE(review): builds a fresh `ApiClient` from the loaded config; the
    /// `_api_client` argument is currently unused — confirm this is intended.
    pub fn from_uuid(
        _api_client: &ApiClient,
        uuid: &str,
        auth_token: Option<&str>,
    ) -> crate::Result<Self> {
        let config = crate::Config::load();
        let mut client = ApiClient::new(&config)?;
        let job_dict = client.get_job(uuid, auth_token)?;
        Ok(Self {
            // Use the identifiers echoed back by the API, not the raw inputs.
            uuid: job_dict.uuid.clone(),
            auth_token: job_dict.auth_token.clone(),
            job_dict,
            api_client: client,
            result: None,
        })
    }

    /// The job's UUID.
    pub fn id(&self) -> &str {
        &self.uuid
    }

    /// The per-job authentication token.
    pub fn auth_token(&self) -> &str {
        &self.auth_token
    }

    /// URI of the application this job was created from, if present in the
    /// fetched metadata.
    pub fn app_uri(&self) -> Option<&str> {
        self.job_dict.app_uri.as_deref()
    }

    /// Refetches the job and returns its current state string, or "unknown"
    /// when the API reports no state.
    pub fn get_status(&mut self) -> crate::Result<String> {
        self.refetch_job_dict()?;
        Ok(self
            .job_dict
            .state
            .clone()
            .unwrap_or_else(|| "unknown".to_string()))
    }

    /// Whether the job has ended. A cached `ended_at` short-circuits the API
    /// round trip, since a finished job never becomes unfinished.
    pub fn is_finished(&mut self) -> crate::Result<bool> {
        if self.job_dict.ended_at.is_some() {
            return Ok(true);
        }
        self.refetch_job_dict()?;
        Ok(self.job_dict.ended_at.is_some())
    }

    /// Blocks until the job finishes, polling every 2 seconds.
    ///
    /// With `timeout` (seconds) set, returns `BioLibError::WaitTimeout` once
    /// the deadline passes while the job is still running.
    pub fn wait(&mut self, timeout: Option<u64>) -> crate::Result<()> {
        crate::logging::info(&format!("Waiting for job {} to finish...", self.uuid));
        let deadline = timeout.map(|t| Instant::now() + Duration::from_secs(t));
        while !self.is_finished()? {
            if let Some(dl) = deadline {
                if Instant::now() >= dl {
                    // `deadline` is Some only when `timeout` is Some, so this
                    // unwrap cannot panic.
                    return Err(BioLibError::WaitTimeout(format!(
                        "Job {} did not finish within {} seconds",
                        self.uuid,
                        timeout.unwrap()
                    )));
                }
            }
            thread::sleep(Duration::from_secs(2));
        }
        crate::logging::info(&format!("Result {} has finished.", self.uuid));
        Ok(())
    }

    /// Stdout bytes of the job's result (downloads/parses the result lazily).
    pub fn get_stdout(&mut self) -> crate::Result<Vec<u8>> {
        self.ensure_result()?;
        self.result.as_mut().unwrap().get_stdout()
    }

    /// Stderr bytes of the job's result (downloads/parses the result lazily).
    pub fn get_stderr(&mut self) -> crate::Result<Vec<u8>> {
        self.ensure_result()?;
        self.result.as_mut().unwrap().get_stderr()
    }

    /// Exit code of the job's result (downloads/parses the result lazily).
    pub fn get_exit_code(&mut self) -> crate::Result<u16> {
        self.ensure_result()?;
        self.result.as_mut().unwrap().get_exit_code()
    }

    /// Lists the job's output files (downloads/parses the result lazily).
    pub fn list_output_files(
        &mut self,
    ) -> crate::Result<Vec<crate::binary_format::module_output::OutputFile>> {
        self.ensure_result()?;
        self.result.as_mut().unwrap().get_files()
    }

    /// Downloads all output files into `output_dir`, recreating their
    /// relative paths. Refuses to clobber existing files unless `overwrite`.
    ///
    /// To minimize HTTP round trips, consecutive files separated by less than
    /// ~50 KB are fetched as one contiguous range: for each file we compute a
    /// "read-ahead" — how far past its end the stream may keep reading before
    /// the next major gap.
    pub fn save_files(&mut self, output_dir: &str, overwrite: bool) -> crate::Result<()> {
        self.ensure_result()?;
        let result = self.result.as_mut().unwrap();
        let files = result.get_files()?;
        if files.is_empty() {
            crate::logging::debug("No output files to save");
            return Ok(());
        }
        let output_path = Path::new(output_dir);
        std::fs::create_dir_all(output_path)?;
        let n = files.len();
        // Gaps of at least this many bytes between adjacent files end a
        // contiguous download run.
        let major_gap_threshold: u64 = 50_000;
        // next_break_end[i] = end offset (exclusive) of the contiguous run
        // that contains file i, computed by scanning backwards.
        let mut next_break_end = vec![0u64; n];
        next_break_end[n - 1] = files[n - 1].data_start + files[n - 1].data_length;
        for i in (0..n - 1).rev() {
            let end_i = files[i].data_start + files[i].data_length;
            // NOTE(review): assumes files are sorted by data_start and
            // non-overlapping; otherwise this subtraction underflows — confirm
            // the ordering guarantee of `get_files`.
            let gap = files[i + 1].data_start - end_i;
            if gap >= major_gap_threshold {
                next_break_end[i] = end_i;
            } else {
                next_break_end[i] = next_break_end[i + 1];
            }
        }
        let mut total_files_data_to_download: u64 = 0;
        // file_read_ahead[i] = bytes past file i's end the seeker may prefetch
        // while staying inside the same contiguous run.
        let mut file_read_ahead: Vec<u64> = Vec::with_capacity(n);
        for (i, file) in files.iter().enumerate() {
            total_files_data_to_download += file.data_length;
            let end_i = file.data_start + file.data_length;
            let read_ahead = next_break_end[i].saturating_sub(end_i);
            file_read_ahead.push(read_ahead);
        }
        let first_file = &files[0];
        let last_file = &files[n - 1];
        // Stream over the whole span covering all files, with a chunk size
        // capped at 10 MB.
        let mut stream_seeker = StreamSeeker::new(
            result.module_output.buffer(),
            first_file.data_start,
            last_file.data_start + last_file.data_length,
            std::cmp::min(total_files_data_to_download, 10_000_000),
        );
        crate::logging::info(&format!("Saving {} files to {output_dir}...", files.len()));
        for (i, file) in files.iter().enumerate() {
            // Strip the leading '/' so the path joins under output_dir
            // instead of resolving to an absolute path.
            let file_path = output_path.join(file.path.trim_start_matches('/'));
            if file_path.exists() && !overwrite {
                return Err(BioLibError::General(format!(
                    "File {} already exists. Set overwrite=true to overwrite.",
                    file_path.display()
                )));
            }
            if let Some(parent) = file_path.parent() {
                std::fs::create_dir_all(parent)?;
            }
            crate::logging::debug(&format!(
                "Downloading file {}/{}: {} ({} bytes)",
                i + 1,
                files.len(),
                file.path,
                file.data_length
            ));
            let mut f = std::fs::File::create(&file_path)?;
            stream_seeker.seek_and_write(
                file.data_start,
                file.data_length,
                file_read_ahead[i],
                &mut f,
            )?;
        }
        Ok(())
    }

    /// Requests cancellation of the job by patching its state.
    pub fn cancel(&mut self) -> crate::Result<()> {
        let data = serde_json::json!({"state": "cancelled"});
        self.api_client
            .patch_job(&self.uuid, &data, Some(&self.auth_token))?;
        crate::logging::info(&format!("Result {} canceled", self.uuid));
        Ok(())
    }

    /// Deletes the job on the server.
    pub fn delete(&mut self) -> crate::Result<()> {
        self.api_client.delete_job(&self.uuid)?;
        crate::logging::info(&format!("Result {} deleted", self.uuid));
        Ok(())
    }

    /// Renames the job's main result to `name`.
    pub fn rename(&mut self, name: &str) -> crate::Result<()> {
        let data = serde_json::json!({"result_name_prefix": name});
        self.api_client.patch_job(
            &format!("{}/main_result", self.uuid),
            &data,
            Some(&self.auth_token),
        )?;
        crate::logging::info(&format!("Result {} renamed to \"{name}\"", self.uuid));
        Ok(())
    }

    /// Builds a URL that grants access to this result via its auth token.
    pub fn get_shareable_link(&self) -> String {
        format!(
            "{}/results/{}/?token={}",
            self.api_client.base_url(),
            self.uuid,
            self.auth_token
        )
    }

    /// Streams the job's cloud logs to this process's stdout/stderr until the
    /// job completes, then waits for the job to be marked finished.
    ///
    /// If the job already completed, its saved stdout/stderr are printed once
    /// and the method returns without streaming.
    pub fn stream_logs(&mut self) -> crate::Result<()> {
        let cloud_job = match self.get_cloud_job_awaiting_started() {
            Ok(cj) => cj,
            Err(BioLibError::CloudJobFinished) => {
                crate::logging::info(&format!(
                    "--- The result {} has already completed (no streaming will take place) ---",
                    self.uuid
                ));
                crate::logging::info("--- The stdout log is printed below: ---");
                // Best-effort replay of the stored logs; I/O errors ignored.
                if let Ok(stdout) = self.get_stdout() {
                    let _ = std::io::stdout().write_all(&stdout);
                    let _ = std::io::stdout().flush();
                }
                crate::logging::info("--- The stderr log is printed below: ---");
                if let Ok(stderr) = self.get_stderr() {
                    let _ = std::io::stderr().write_all(&stderr);
                    let _ = std::io::stderr().flush();
                }
                crate::logging::info(&format!(
                    "--- The job {} has already completed. Its output was printed above. ---",
                    self.uuid
                ));
                return Ok(());
            }
            Err(e) => return Err(e),
        };
        let mut compute_node_url = cloud_job
            .compute_node_url
            .as_ref()
            .ok_or_else(|| BioLibError::CloudJobError("No compute node URL".to_string()))?
            .clone();
        // When a cloud proxy base URL is configured, keep only the path part
        // of the compute node URL and graft it onto the proxy base.
        if let Some(cloud_base) = self.api_client.cloud_base_url() {
            // Find the first '/' after the "scheme://" prefix, i.e. where the
            // URL path begins.
            if let Some(path_start) = compute_node_url
                .find("://")
                .and_then(|s| compute_node_url[s + 3..].find('/').map(|p| s + 3 + p))
            {
                compute_node_url = format!("{}{}", cloud_base, &compute_node_url[path_start..]);
            } else {
                compute_node_url = cloud_base.to_string();
            }
            crate::logging::debug(&format!(
                "Using cloud proxy URL from BIOLIB_CLOUD_BASE_URL: {compute_node_url}"
            ));
        } else {
            crate::logging::debug(&format!("Using compute node URL \"{compute_node_url}\""));
        }
        // Initial fetch with full history: replay past status updates and any
        // log packages streamed before we connected.
        let status_response = self.api_client.get_job_status_from_compute_node(
            &compute_node_url,
            &self.uuid,
            Some("full"),
        )?;
        for update in &status_response.previous_status_updates {
            crate::logging::info(&format!("Cloud: {}", update.log_message));
        }
        for pkg_b64 in &status_response.streamed_logs_packages_b64 {
            print_log_package(pkg_b64);
        }
        // Status messages that arrive with the final (completed) poll are
        // deferred so they print after the last log output.
        let mut final_status_messages = Vec::new();
        loop {
            thread::sleep(Duration::from_secs(2));
            let status_json = match self.get_job_status_with_retry(&compute_node_url) {
                Ok(Some(s)) => s,
                // None means the job finished and left the compute node.
                Ok(None) => break,
                Err(e) => return Err(e),
            };
            for update in &status_json.status_updates {
                if status_json.is_completed {
                    final_status_messages.push(update.log_message.clone());
                } else {
                    crate::logging::info(&format!("Cloud: {}", update.log_message));
                }
            }
            for pkg_b64 in &status_json.stdout_and_stderr_packages_b64 {
                print_log_package(pkg_b64);
            }
            if let Some(error_code) = status_json.error_code {
                crate::logging::error(&format!(
                    "Could not get full streamed logs due to error code: {error_code}"
                ));
                return Err(BioLibError::CloudJobError(format!(
                    "Cloud error code: {error_code}"
                )));
            }
            if status_json.is_completed {
                break;
            }
        }
        for message in final_status_messages {
            crate::logging::info(&format!("Cloud: {message}"));
        }
        // Ensure the job is fully marked as ended before returning.
        self.wait(None)?;
        Ok(())
    }

    /// Polls the compute node for status, retrying up to 15 times (2 s apart).
    ///
    /// On each failure, re-fetches the job from the main API: if the cloud
    /// job meanwhile finished, returns `Ok(None)` on success or a
    /// `CloudJobError` when it recorded a non-zero error code.
    fn get_job_status_with_retry(
        &self,
        compute_node_url: &str,
    ) -> crate::Result<Option<JobStatusResponse>> {
        for _ in 0..15 {
            match self.api_client.get_job_status_from_compute_node(
                compute_node_url,
                &self.uuid,
                None,
            ) {
                Ok(status) => return Ok(Some(status)),
                Err(_) => {
                    crate::logging::debug("Failed to get status from compute node, retrying...");
                    let mut client = ApiClient::new(&crate::Config::load())?;
                    let job_dict = client.get_job(&self.uuid, Some(&self.auth_token))?;
                    if let Some(ref cloud_job) = job_dict.cloud_job {
                        if cloud_job.finished_at.is_some() {
                            crate::logging::debug(
                                "Result no longer exists on compute node, checking for error...",
                            );
                            // NOTE(review): a missing error_code defaults to -1
                            // and is therefore reported as a failure — confirm
                            // the API always sets error_code=0 on success.
                            if cloud_job.error_code.unwrap_or(-1) != 0 {
                                let code = cloud_job.error_code.unwrap_or(-1);
                                return Err(BioLibError::CloudJobError(format!(
                                    "Cloud error code: {code}"
                                )));
                            } else {
                                crate::logging::info(&format!(
                                    "The job {} is finished.",
                                    self.uuid
                                ));
                                return Ok(None);
                            }
                        }
                    }
                    thread::sleep(Duration::from_secs(2));
                }
            }
        }
        Err(BioLibError::General(
            "Failed to stream logs, did you lose internet connection?\n\
             Call `.stream_logs()` on your job to resume streaming logs."
                .to_string(),
        ))
    }

    /// Polls the main API until the cloud job has started, returning its
    /// `CloudJobDict`. Backoff grows 1 s per attempt, capped at 10 s.
    ///
    /// Errors with `CloudJobFinished` if the job already ended, or with
    /// `CloudJobError` if it started without a compute node URL.
    fn get_cloud_job_awaiting_started(&self) -> crate::Result<CloudJobDict> {
        let mut retry_count = 0;
        loop {
            retry_count += 1;
            thread::sleep(Duration::from_secs(std::cmp::min(10, retry_count)));
            let mut client = ApiClient::new(&crate::Config::load())?;
            let job_dict = client.get_job(&self.uuid, Some(&self.auth_token))?;
            if let Some(ref cloud_job) = job_dict.cloud_job {
                if cloud_job.finished_at.is_some() {
                    return Err(BioLibError::CloudJobFinished);
                }
                if cloud_job.started_at.is_some() {
                    if cloud_job.compute_node_url.is_none() {
                        return Err(BioLibError::CloudJobError(format!(
                            "Failed to get URL to compute node for job {}",
                            self.uuid
                        )));
                    }
                    return Ok(cloud_job.clone());
                }
            }
            crate::logging::info("Cloud: The job has been queued. Please wait...");
        }
    }

    /// Replaces the cached job metadata with a fresh copy from the API.
    fn refetch_job_dict(&mut self) -> crate::Result<()> {
        self.job_dict = self
            .api_client
            .get_job(&self.uuid, Some(&self.auth_token))?;
        Ok(())
    }

    /// Lazily constructs `self.result` over the remote "results" storage
    /// endpoint. Idempotent: a no-op once the result exists.
    fn ensure_result(&mut self) -> crate::Result<()> {
        if self.result.is_some() {
            return Ok(());
        }
        let endpoint = RemoteJobStorageEndpoint::new(
            self.uuid.clone(),
            self.auth_token.clone(),
            "results".to_string(),
        );
        let buffer = RemoteIndexableBuffer::new(Box::new(endpoint));
        let module_output = ModuleOutputV2::new(Box::new(buffer));
        self.result = Some(JobResult { module_output });
        Ok(())
    }

    /// Creates and starts a job in the cloud for the given app version.
    ///
    /// Inputs under 500 KB are sent inline with job creation; larger inputs
    /// go through a multipart upload followed by explicit cloud-job creation.
    ///
    /// NOTE(review): `_api_client` is unused; a fresh client is built from
    /// the loaded config — confirm this is intended.
    #[allow(clippy::too_many_arguments)]
    pub fn start_in_cloud(
        _api_client: &ApiClient,
        app_uri: &str,
        app_version_uuid: &str,
        module_input: &[u8],
        override_command: bool,
        machine: Option<&str>,
        experiment_id: Option<&str>,
        result_prefix: Option<&str>,
        timeout: Option<u64>,
        notify: bool,
        requested_machine_count: Option<u32>,
    ) -> crate::Result<Self> {
        let config = crate::Config::load();
        let mut client = ApiClient::new(&config)?;
        let parsed_uri = parse_resource_uri(app_uri)?;
        let resource_prefix = parsed_uri.resource_prefix.as_deref();
        crate::logging::debug(&format!("Cloud: Creating job for {app_uri}"));
        // Small inputs: single request carrying the module input inline.
        if module_input.len() < 500_000 {
            let job_dict = client.create_job_with_data(
                app_version_uuid,
                resource_prefix,
                module_input,
                override_command,
                experiment_id,
                machine,
                result_prefix,
                timeout,
                notify,
                requested_machine_count,
            )?;
            return Ok(Self {
                uuid: job_dict.uuid.clone(),
                auth_token: job_dict.auth_token.clone(),
                job_dict,
                api_client: client,
                result: None,
            });
        }
        // Large inputs: create the job first, upload input in parts, then
        // trigger the cloud job and refetch the final job metadata.
        let created = client.create_job(
            app_version_uuid,
            resource_prefix,
            override_command,
            machine,
            experiment_id,
            timeout,
            notify,
            requested_machine_count,
        )?;
        multipart_upload::upload_module_input(
            &mut client,
            &created.public_id,
            &created.auth_token,
            module_input,
        )?;
        client.create_cloud_job(&created.public_id, result_prefix)?;
        let job_dict = client.get_job(&created.uuid, Some(&created.auth_token))?;
        Ok(Self {
            uuid: job_dict.uuid.clone(),
            auth_token: job_dict.auth_token.clone(),
            job_dict,
            api_client: client,
            result: None,
        })
    }

    /// Fetches up to `count` jobs, optionally filtered by `status`, paging
    /// through results (page size capped at 1000).
    ///
    /// NOTE(review): `_api_client` is unused, and each returned `Job` carries
    /// an unauthenticated client (auth relies on the per-job token) — confirm
    /// downstream calls only use token-authorized endpoints.
    pub fn fetch_jobs(
        _api_client: &ApiClient,
        count: usize,
        status: Option<&str>,
    ) -> crate::Result<Vec<Job>> {
        let config = crate::Config::load();
        let mut client = ApiClient::new(&config)?;
        let page_size = std::cmp::min(count, 1_000);
        let response = client.get_jobs(page_size, status, None)?;
        let mut jobs: Vec<Job> = response
            .results
            .into_iter()
            .map(|jd| Job {
                uuid: jd.uuid.clone(),
                auth_token: jd.auth_token.clone(),
                job_dict: jd,
                api_client: ApiClient::new_unauthenticated(client.base_url()),
                result: None,
            })
            .collect();
        for page_number in 2..=response.page_count {
            if jobs.len() >= count {
                break;
            }
            let page_response = client.get_jobs(page_size, status, Some(page_number))?;
            for jd in page_response.results {
                jobs.push(Job {
                    uuid: jd.uuid.clone(),
                    auth_token: jd.auth_token.clone(),
                    job_dict: jd,
                    api_client: ApiClient::new_unauthenticated(client.base_url()),
                    result: None,
                });
            }
        }
        // Trim any overshoot from the last page.
        jobs.truncate(count);
        Ok(jobs)
    }
}
/// Decodes a standard-alphabet (`A-Z a-z 0-9 + /`) base64 string.
///
/// Trailing `=` padding is ignored; any other byte outside the alphabet
/// (including non-ASCII and whitespace) yields an error. Leftover bits that
/// do not fill a whole byte are silently discarded.
fn base64_decode_standard(input: &str) -> std::result::Result<Vec<u8>, String> {
    // Maps an ASCII byte to its 6-bit value, or None if not in the alphabet.
    fn sextet(byte: u8) -> Option<u32> {
        match byte {
            b'A'..=b'Z' => Some((byte - b'A') as u32),
            b'a'..=b'z' => Some((byte - b'a') as u32 + 26),
            b'0'..=b'9' => Some((byte - b'0') as u32 + 52),
            b'+' => Some(62),
            b'/' => Some(63),
            _ => None,
        }
    }
    let trimmed = input.trim_end_matches('=');
    let mut decoded = Vec::with_capacity(trimmed.len() * 3 / 4);
    // Bit accumulator: collect 6 bits per character, emit a byte per 8 bits.
    let mut accumulator: u32 = 0;
    let mut bit_count: u32 = 0;
    for &byte in trimmed.as_bytes() {
        let value = match sextet(byte) {
            Some(v) => v,
            None => return Err("invalid base64".to_string()),
        };
        accumulator = (accumulator << 6) | value;
        bit_count += 6;
        if bit_count >= 8 {
            bit_count -= 8;
            decoded.push((accumulator >> bit_count) as u8);
            accumulator &= (1 << bit_count) - 1;
        }
    }
    Ok(decoded)
}
/// Decodes a base64-encoded stdout/stderr package and writes its raw bytes to
/// this process's stdout. Malformed packages and write errors are ignored
/// (best-effort log streaming).
fn print_log_package(b64_package: &str) {
    let decoded = match base64_decode_standard(b64_package) {
        Ok(bytes) => bytes,
        Err(_) => return,
    };
    if let Ok(stdout_stderr) = StdoutAndStderr::deserialize(&decoded) {
        let mut out = std::io::stdout();
        let _ = out.write_all(&stdout_stderr);
        let _ = out.flush();
    }
}