use std::collections::BTreeMap;
use crate::sdk::core::types::TrellisBindingsGetResponseBinding;
use serde::Deserialize;
use serde_json::Value;
/// Jobs configuration for a single namespace.
///
/// `queues` is keyed by queue type; each entry holds the settings for that
/// queue. All fields have total equality, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobsBinding {
    /// Namespace the queues belong to.
    pub namespace: String,
    /// Queue settings keyed by queue type.
    pub queues: BTreeMap<String, JobsQueueBinding>,
}

/// A [`JobsBinding`] together with the `work_stream` value that accompanies it
/// in a bindings response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobsRuntimeBinding {
    /// The namespace and per-queue settings.
    pub jobs: JobsBinding,
    /// Value of the response's jobs `work_stream` field.
    pub work_stream: String,
}

/// Settings for one jobs queue.
///
/// The `_ms` suffixed fields are presumably durations in milliseconds;
/// nothing in this file interprets them, so confirm against the consumer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobsQueueBinding {
    /// Queue type identifier.
    pub queue_type: String,
    pub publish_prefix: String,
    pub work_subject: String,
    pub consumer_name: String,
    pub max_deliver: u64,
    pub backoff_ms: Vec<u64>,
    pub ack_wait_ms: u64,
    /// Optional; `None` when the source did not provide a value.
    pub default_deadline_ms: Option<u64>,
    pub progress: bool,
    pub logs: bool,
    pub concurrency: u32,
}
/// Errors returned while turning bindings data into [`JobsBinding`] or
/// [`JobsRuntimeBinding`].
#[derive(Debug, thiserror::Error)]
pub enum JobsBindingError {
/// The bindings response has no `resources.jobs` entry.
#[error("bindings response is missing resources.jobs")]
MissingJobsResource,
/// The jobs resource is present but its `work_stream` is `None`.
#[error("bindings response is missing resources.jobs.workStream")]
MissingWorkStream,
/// A queue entry could not be deserialized, or one of its integer fields
/// was negative or out of range. `details` carries the specific reason.
#[error("invalid jobs queue binding for queue type '{queue_type}': {details}")]
InvalidQueueBinding { queue_type: String, details: String },
}
/// Deserialization shape for one queue entry supplied as a JSON value.
///
/// Field names are read from camelCase keys (e.g. `publishPrefix`,
/// `ackWaitMs`). There is no `queue_type` field here: the JSON path takes the
/// queue type from the map key the entry is stored under.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct JobsQueueBindingValue {
publish_prefix: String,
work_subject: String,
consumer_name: String,
max_deliver: u64,
backoff_ms: Vec<u64>,
ack_wait_ms: u64,
// `Option` fields may be absent from the JSON object.
default_deadline_ms: Option<u64>,
progress: bool,
logs: bool,
concurrency: u32,
}
/// Common intermediate form for a queue binding.
///
/// Both input paths in this file (raw JSON values and the typed core bindings
/// response) are converted into this struct first, and
/// `jobs_queue_binding_from_normalized` then turns it into the public
/// [`JobsQueueBinding`]. Integer fields are already unsigned at this point.
#[derive(Debug)]
struct NormalizedJobsQueueBinding {
queue_type: String,
publish_prefix: String,
work_subject: String,
consumer_name: String,
max_deliver: u64,
backoff_ms: Vec<u64>,
ack_wait_ms: u64,
default_deadline_ms: Option<u64>,
progress: bool,
logs: bool,
concurrency: u32,
}
/// Builds a [`JobsBinding`] from raw JSON queue entries.
///
/// Each map key is used as the queue type of its entry.
///
/// # Errors
///
/// Returns [`JobsBindingError::InvalidQueueBinding`] for the first entry (in
/// key order) that fails to deserialize.
pub fn parse_jobs_binding(
    namespace: &str,
    queues: &BTreeMap<String, Value>,
) -> Result<JobsBinding, JobsBindingError> {
    let mut normalized_queues = Vec::new();
    for (queue_type, raw_entry) in queues {
        // `?` stops at the first invalid entry, matching a fallible collect.
        normalized_queues.push(normalize_json_queue_binding(queue_type, raw_entry)?);
    }
    Ok(build_jobs_binding(namespace.to_string(), normalized_queues))
}
/// Converts a typed bindings response into a [`JobsRuntimeBinding`].
///
/// Checks happen in this order, and the first failure is returned:
/// 1. the jobs resource must be present (`MissingJobsResource`),
/// 2. every queue must normalize successfully (`InvalidQueueBinding`),
/// 3. `work_stream` must be present (`MissingWorkStream`).
impl TryFrom<&TrellisBindingsGetResponseBinding> for JobsRuntimeBinding {
    type Error = JobsBindingError;

    fn try_from(binding: &TrellisBindingsGetResponseBinding) -> Result<Self, Self::Error> {
        let jobs_resource = match binding.resources.jobs.as_ref() {
            Some(resource) => resource,
            None => return Err(JobsBindingError::MissingJobsResource),
        };

        let mut normalized_queues = Vec::new();
        for (queue_type, queue) in jobs_resource.queues.iter() {
            normalized_queues.push(normalize_core_queue_binding(queue_type, queue)?);
        }

        // Queues are validated before the work stream so that an invalid
        // queue is reported in preference to a missing work stream.
        let work_stream = match jobs_resource.work_stream.clone() {
            Some(stream) => stream,
            None => return Err(JobsBindingError::MissingWorkStream),
        };

        let jobs = build_jobs_binding(jobs_resource.namespace.clone(), normalized_queues);
        Ok(Self { jobs, work_stream })
    }
}
fn build_jobs_binding(namespace: String, queues: Vec<NormalizedJobsQueueBinding>) -> JobsBinding {
JobsBinding {
namespace,
queues: queues
.into_iter()
.map(|queue| {
let queue_type = queue.queue_type.clone();
(queue_type, jobs_queue_binding_from_normalized(queue))
})
.collect(),
}
}
fn jobs_queue_binding_from_normalized(queue: NormalizedJobsQueueBinding) -> JobsQueueBinding {
JobsQueueBinding {
queue_type: queue.queue_type,
publish_prefix: queue.publish_prefix,
work_subject: queue.work_subject,
consumer_name: queue.consumer_name,
max_deliver: queue.max_deliver,
backoff_ms: queue.backoff_ms,
ack_wait_ms: queue.ack_wait_ms,
default_deadline_ms: queue.default_deadline_ms,
progress: queue.progress,
logs: queue.logs,
concurrency: queue.concurrency,
}
}
fn normalize_json_queue_binding(
queue_type: &str,
value: &Value,
) -> Result<NormalizedJobsQueueBinding, JobsBindingError> {
let parsed: JobsQueueBindingValue = serde_json::from_value(value.clone()).map_err(|error| {
JobsBindingError::InvalidQueueBinding {
queue_type: queue_type.to_string(),
details: error.to_string(),
}
})?;
Ok(NormalizedJobsQueueBinding {
queue_type: queue_type.to_string(),
publish_prefix: parsed.publish_prefix,
work_subject: parsed.work_subject,
consumer_name: parsed.consumer_name,
max_deliver: parsed.max_deliver,
backoff_ms: parsed.backoff_ms,
ack_wait_ms: parsed.ack_wait_ms,
default_deadline_ms: parsed.default_deadline_ms,
progress: parsed.progress,
logs: parsed.logs,
concurrency: parsed.concurrency,
})
}
/// Converts one typed queue entry from the core bindings response into the
/// normalized representation, turning its signed integers into unsigned ones.
///
/// The `queue_type` argument is used only to label errors; the resulting
/// binding's queue type is copied from `queue.queue_type`.
/// NOTE(review): the two are presumably always equal — confirm against the
/// producer of the bindings response.
///
/// # Errors
///
/// Returns [`JobsBindingError::InvalidQueueBinding`] for the first field, in
/// the order `maxDeliver`, `backoffMs`, `ackWaitMs`, `defaultDeadlineMs`,
/// `concurrency`, that is negative (or, for `concurrency`, exceeds `u32`).
fn normalize_core_queue_binding(
    queue_type: &str,
    queue: &crate::sdk::core::types::TrellisBindingsGetResponseBindingResourcesJobsQueuesValue,
) -> Result<NormalizedJobsQueueBinding, JobsBindingError> {
    // Conversions run in the same order as the struct fields so the first
    // reported error is unchanged.
    let max_deliver = i64_to_u64(queue.max_deliver, queue_type, "maxDeliver")?;

    let mut backoff_ms = Vec::new();
    for delay in queue.backoff_ms.iter().copied() {
        backoff_ms.push(i64_to_u64(delay, queue_type, "backoffMs")?);
    }

    let ack_wait_ms = i64_to_u64(queue.ack_wait_ms, queue_type, "ackWaitMs")?;

    let default_deadline_ms = match queue.default_deadline_ms {
        Some(deadline) => Some(i64_to_u64(deadline, queue_type, "defaultDeadlineMs")?),
        None => None,
    };

    let concurrency = i64_to_u32(queue.concurrency, queue_type, "concurrency")?;

    Ok(NormalizedJobsQueueBinding {
        queue_type: queue.queue_type.clone(),
        publish_prefix: queue.publish_prefix.clone(),
        work_subject: queue.work_subject.clone(),
        consumer_name: queue.consumer_name.clone(),
        max_deliver,
        backoff_ms,
        ack_wait_ms,
        default_deadline_ms,
        progress: queue.progress,
        logs: queue.logs,
        concurrency,
    })
}
fn i64_to_u64(value: i64, queue_type: &str, field: &str) -> Result<u64, JobsBindingError> {
if value < 0 {
return Err(JobsBindingError::InvalidQueueBinding {
queue_type: queue_type.to_string(),
details: format!("{field} must be a non-negative integer"),
});
}
Ok(value as u64)
}
/// Converts a signed integer field to `u32`.
///
/// `queue_type` and `field` are used only to build the error.
///
/// # Errors
///
/// Returns [`JobsBindingError::InvalidQueueBinding`] when `value` is negative
/// (reported as "must be a non-negative integer") or larger than `u32::MAX`
/// (reported as "exceeds u32 range").
fn i64_to_u32(value: i64, queue_type: &str, field: &str) -> Result<u32, JobsBindingError> {
    // The sign check runs first so negative input gets the non-negative
    // message, not the range message.
    let non_negative = i64_to_u64(value, queue_type, field)?;
    match u32::try_from(non_negative) {
        Ok(narrowed) => Ok(narrowed),
        Err(_) => Err(JobsBindingError::InvalidQueueBinding {
            queue_type: queue_type.to_string(),
            details: format!("{field} exceeds u32 range"),
        }),
    }
}