use reqwest::Method;
use reqwest_eventsource::EventSource;
use crate::client::Client;
use crate::error::VynFiError;
use crate::types::*;
/// Optional filters for [`Jobs::list`].
///
/// Every field that is `Some` is sent as a query parameter of the same name;
/// `None` fields are omitted from the request entirely.
#[derive(Debug, Default, Clone)]
pub struct ListJobsParams {
    /// Sent as the `status` query parameter when set.
    pub status: Option<String>,
    /// Sent as the `limit` query parameter when set.
    pub limit: Option<i64>,
    /// Sent as the `offset` query parameter when set.
    pub offset: Option<i64>,
}
/// Accessor for the job-related endpoints (`/v1/generate*` and `/v1/jobs*`).
///
/// Borrows the [`Client`] it was created from; every method issues its HTTP
/// request through that client.
pub struct Jobs<'a> {
    client: &'a Client,
}

impl<'a> Jobs<'a> {
    /// Wraps a borrowed [`Client`].
    pub(crate) fn new(client: &'a Client) -> Self {
        Self { client }
    }

    /// Returns `true` for the statuses after which polling stops.
    fn is_terminal(status: &str) -> bool {
        matches!(status, "completed" | "failed" | "cancelled")
    }

    /// Submits `req` via `POST /v1/generate`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn generate(&self, req: &GenerateRequest) -> Result<SubmitJobResponse, VynFiError> {
        self.client
            .request_with_body(Method::POST, "/v1/generate", Some(req))
            .await
    }

    /// Submits a [`GenerateConfigRequest`] via `POST /v1/generate`.
    ///
    /// Uses the same endpoint as [`Jobs::generate`]; only the body type differs.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn generate_config(
        &self,
        req: &GenerateConfigRequest,
    ) -> Result<SubmitJobResponse, VynFiError> {
        self.client
            .request_with_body(Method::POST, "/v1/generate", Some(req))
            .await
    }

    /// Submits `req` via `POST /v1/generate/quick`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn generate_quick(
        &self,
        req: &GenerateRequest,
    ) -> Result<QuickJobResponse, VynFiError> {
        self.client
            .request_with_body(Method::POST, "/v1/generate/quick", Some(req))
            .await
    }

    /// Lists jobs via `GET /v1/jobs`.
    ///
    /// Only the fields of `params` that are `Some` are sent, as the query
    /// parameters `status`, `limit` and `offset`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn list(&self, params: &ListJobsParams) -> Result<JobList, VynFiError> {
        let mut query: Vec<(&str, String)> = Vec::with_capacity(3);
        if let Some(status) = &params.status {
            query.push(("status", status.clone()));
        }
        if let Some(limit) = params.limit {
            query.push(("limit", limit.to_string()));
        }
        if let Some(offset) = params.offset {
            query.push(("offset", offset.to_string()));
        }

        self.client
            .request_with_params(Method::GET, "/v1/jobs", &query)
            .await
    }

    /// Fetches a single job via `GET /v1/jobs/{job_id}`.
    ///
    /// `job_id` is interpolated into the path verbatim (no percent-encoding).
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn get(&self, job_id: &str) -> Result<Job, VynFiError> {
        let path = format!("/v1/jobs/{}", job_id);
        self.client.request(Method::GET, &path).await
    }

    /// Cancels a job via `DELETE /v1/jobs/{job_id}`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn cancel(&self, job_id: &str) -> Result<CancelJobResponse, VynFiError> {
        let path = format!("/v1/jobs/{}", job_id);
        self.client.request(Method::DELETE, &path).await
    }

    /// Opens an [`EventSource`] on `GET /v1/jobs/{job_id}/stream`.
    ///
    /// The request is built from the client's HTTP handle and resolved URL;
    /// nothing is sent until the returned event source is polled.
    ///
    /// # Panics
    /// Panics if `EventSource::new` rejects the request builder.
    pub fn stream(&self, job_id: &str) -> EventSource {
        let endpoint = format!("/v1/jobs/{}/stream", job_id);
        let url = self.client.url(&endpoint);
        let request = self.client.http().get(&url);
        EventSource::new(request).expect("valid request builder")
    }

    /// Downloads the body of `GET /v1/jobs/{job_id}/download` into memory.
    ///
    /// # Errors
    /// Returns an error if the request fails or the body cannot be read.
    pub async fn download(&self, job_id: &str) -> Result<bytes::Bytes, VynFiError> {
        let path = format!("/v1/jobs/{}/download", job_id);
        let resp = self.client.request_raw(Method::GET, &path, &[]).await?;
        Ok(resp.bytes().await?)
    }

    /// Downloads the body of `GET /v1/jobs/{job_id}/download/{file}` into memory.
    ///
    /// Both `job_id` and `file` are interpolated into the path verbatim
    /// (no percent-encoding).
    ///
    /// # Errors
    /// Returns an error if the request fails or the body cannot be read.
    pub async fn download_file(
        &self,
        job_id: &str,
        file: &str,
    ) -> Result<bytes::Bytes, VynFiError> {
        let path = format!("/v1/jobs/{}/download/{}", job_id, file);
        let resp = self.client.request_raw(Method::GET, &path, &[]).await?;
        Ok(resp.bytes().await?)
    }

    /// Downloads the job output and parses it with `JobArchive::from_bytes`.
    ///
    /// # Errors
    /// Returns the download error, or [`VynFiError::Config`] carrying the
    /// parser's error when the bytes are rejected.
    pub async fn download_archive(&self, job_id: &str) -> Result<crate::JobArchive, VynFiError> {
        let data = self.download(job_id).await?;
        crate::JobArchive::from_bytes(&data).map_err(VynFiError::Config)
    }

    /// Downloads the job output and writes it to `path`, returning the path written.
    ///
    /// # Errors
    /// Returns the download error, or [`VynFiError::Config`] with the I/O
    /// error message when writing the file fails.
    pub async fn download_to(
        &self,
        job_id: &str,
        path: impl AsRef<std::path::Path>,
    ) -> Result<std::path::PathBuf, VynFiError> {
        let bytes = self.download(job_id).await?;
        let target = path.as_ref().to_path_buf();
        // NOTE(review): `std::fs::write` is a synchronous call inside an async fn;
        // consider an async/offloaded write if large downloads are expected.
        std::fs::write(&target, &bytes).map_err(|e| VynFiError::Config(e.to_string()))?;
        Ok(target)
    }

    /// Lists a job's files via `GET /v1/jobs/{job_id}/files`, retrying on "not found".
    ///
    /// Up to three attempts are made: immediately, then after 1.5 s, then after
    /// a further 3 s. Only [`VynFiError::NotFound`] triggers a retry; any other
    /// error is returned at once.
    ///
    /// # Errors
    /// Returns the last `NotFound` error if all attempts fail, or the first
    /// error of any other kind.
    pub async fn list_files(&self, job_id: &str) -> Result<JobFileList, VynFiError> {
        // Delay applied *before* each attempt, in milliseconds.
        const RETRY_DELAYS_MS: [u64; 3] = [0, 1_500, 3_000];

        let path = format!("/v1/jobs/{}/files", job_id);
        let mut last_not_found: Option<VynFiError> = None;

        for &delay_ms in &RETRY_DELAYS_MS {
            if delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
            match self.client.request::<JobFileList>(Method::GET, &path).await {
                Ok(files) => return Ok(files),
                Err(err @ VynFiError::NotFound(_)) => last_not_found = Some(err),
                Err(err) => return Err(err),
            }
        }

        Err(last_not_found.unwrap_or_else(|| VynFiError::Config("list_files failed".into())))
    }

    /// Fetches `GET /v1/jobs/{job_id}/analytics`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn analytics(&self, job_id: &str) -> Result<JobAnalytics, VynFiError> {
        let path = format!("/v1/jobs/{}/analytics", job_id);
        self.client.request(Method::GET, &path).await
    }

    /// Fetches `GET /v1/jobs/{job_id}/fraud-split`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn fraud_split(&self, job_id: &str) -> Result<FraudSplit, VynFiError> {
        let path = format!("/v1/jobs/{}/fraud-split", job_id);
        self.client.request(Method::GET, &path).await
    }

    /// Fetches `GET /v1/jobs/{job_id}/audit-artifacts`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn audit_artifacts(&self, job_id: &str) -> Result<AuditArtifacts, VynFiError> {
        let path = format!("/v1/jobs/{}/audit-artifacts", job_id);
        self.client.request(Method::GET, &path).await
    }

    /// Sends `req` via `POST /v1/jobs/{job_id}/tune`.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn tune(
        &self,
        job_id: &str,
        req: &AiTuneRequest,
    ) -> Result<AiTuneResponse, VynFiError> {
        let path = format!("/v1/jobs/{}/tune", job_id);
        self.client
            .request_with_body(Method::POST, &path, Some(req))
            .await
    }

    /// Polls a job until it reaches a terminal status or `timeout` elapses.
    ///
    /// The job is fetched at least once. On timeout the most recently fetched
    /// job is still returned as `Ok`, so callers must inspect its status to
    /// tell completion from timeout. The pause between polls is
    /// `poll_interval`, shortened when less than that remains of `timeout`.
    ///
    /// # Errors
    /// Returns the first error produced by [`Jobs::get`].
    pub async fn wait(
        &self,
        job_id: &str,
        poll_interval: std::time::Duration,
        timeout: std::time::Duration,
    ) -> Result<Job, VynFiError> {
        let started = std::time::Instant::now();
        loop {
            let job = self.get(job_id).await?;
            if Self::is_terminal(job.status.as_str()) {
                return Ok(job);
            }

            let elapsed = started.elapsed();
            if elapsed >= timeout {
                return Ok(job);
            }

            // Never sleep past the deadline: cap the pause at the time left.
            let remaining = timeout.saturating_sub(elapsed);
            tokio::time::sleep(poll_interval.min(remaining)).await;
        }
    }

    /// Polls several jobs until all are terminal or `timeout` elapses.
    ///
    /// Jobs are polled sequentially, one round per `poll_interval` (shortened
    /// when less than that remains of `timeout`). Errors from individual polls
    /// are ignored and the job is retried on the next round. After the loop,
    /// jobs that never produced a result get one final fetch attempt.
    ///
    /// The result holds the latest fetched state of each job, in `job_ids`
    /// order. Jobs that could never be fetched are omitted, and a repeated ID
    /// yields a single entry, so the result may be shorter than `job_ids`.
    ///
    /// # Errors
    /// Currently always returns `Ok`; per-job errors are not surfaced.
    pub async fn wait_for_many(
        &self,
        job_ids: &[String],
        poll_interval: std::time::Duration,
        timeout: std::time::Duration,
    ) -> Result<Vec<Job>, VynFiError> {
        let started = std::time::Instant::now();
        let mut latest: std::collections::HashMap<&str, Job> =
            std::collections::HashMap::with_capacity(job_ids.len());

        // Poll each distinct ID only once per round; order of first appearance is kept.
        let mut seen = std::collections::HashSet::with_capacity(job_ids.len());
        let mut pending: Vec<&str> = job_ids
            .iter()
            .map(String::as_str)
            .filter(|id| seen.insert(*id))
            .collect();

        while !pending.is_empty() && started.elapsed() < timeout {
            let mut still_pending = Vec::with_capacity(pending.len());
            for id in std::mem::take(&mut pending) {
                match self.get(id).await {
                    Ok(job) => {
                        let done = Self::is_terminal(job.status.as_str());
                        latest.insert(id, job);
                        if !done {
                            still_pending.push(id);
                        }
                    }
                    // Best effort: a failed poll just keeps the job in the queue.
                    Err(_) => still_pending.push(id),
                }
            }
            pending = still_pending;

            if !pending.is_empty() {
                // Never sleep past the deadline: cap the pause at the time left.
                let remaining = timeout.saturating_sub(started.elapsed());
                tokio::time::sleep(poll_interval.min(remaining)).await;
            }
        }

        // Last chance for jobs whose every poll failed (or that were never polled
        // because the timeout was already exhausted).
        for id in pending {
            if !latest.contains_key(id) {
                if let Ok(job) = self.get(id).await {
                    latest.insert(id, job);
                }
            }
        }

        Ok(job_ids
            .iter()
            .filter_map(|id| latest.remove(id.as_str()))
            .collect())
    }

    /// Issues `GET /v1/jobs/{job_id}/stream/ndjson` and returns the raw response.
    ///
    /// Only the fields of `params` that are `Some` are sent, as the query
    /// parameters `rate`, `burst`, `progress_interval` and `file`. The body is
    /// not consumed here; the caller reads it from the returned response.
    ///
    /// # Errors
    /// Propagates whatever error the underlying client request returns.
    pub async fn stream_ndjson(
        &self,
        job_id: &str,
        params: &NdjsonStreamParams,
    ) -> Result<reqwest::Response, VynFiError> {
        let mut query: Vec<(&str, String)> = Vec::with_capacity(4);
        if let Some(rate) = params.rate {
            query.push(("rate", rate.to_string()));
        }
        if let Some(burst) = params.burst {
            query.push(("burst", burst.to_string()));
        }
        if let Some(interval) = params.progress_interval {
            query.push(("progress_interval", interval.to_string()));
        }
        if let Some(file) = &params.file {
            query.push(("file", file.clone()));
        }

        let path = format!("/v1/jobs/{}/stream/ndjson", job_id);
        self.client.request_raw(Method::GET, &path, &query).await
    }
}
/// Optional query parameters for [`Jobs::stream_ndjson`].
///
/// Each field that is `Some` is forwarded as a query parameter of the same
/// name; `None` fields are left out of the request.
#[derive(Clone, Debug, Default)]
pub struct NdjsonStreamParams {
    /// Forwarded as `rate`. NOTE(review): unit is not visible here — confirm against the API docs.
    pub rate: Option<u32>,
    /// Forwarded as `burst`.
    pub burst: Option<u32>,
    /// Forwarded as `progress_interval`. NOTE(review): unit is not visible here — confirm.
    pub progress_interval: Option<u32>,
    /// Forwarded as `file`.
    pub file: Option<String>,
}