use std::sync::Arc;
use std::time::Duration;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::config::{CyclesClientBuilder, CyclesConfig};
use crate::constants::{
API_KEY_HEADER, BALANCES_PATH, DECIDE_PATH, EVENTS_PATH, IDEMPOTENCY_KEY_HEADER,
RESERVATIONS_PATH,
};
use crate::error::Error;
use crate::guard::ReservationGuard;
use crate::models::enums::Unit;
use crate::models::request::{
BalanceParams, CommitRequest, DecisionRequest, EventCreateRequest, ExtendRequest,
ListReservationsParams, ReleaseRequest, ReservationCreateRequest,
};
use crate::models::response::{
BalanceResponse, CommitResponse, DecisionResponse, ErrorResponse, EventCreateResponse,
ExtendResponse, ReleaseResponse, ReservationCreateResponse, ReservationDetail,
ReservationListResponse,
};
use crate::models::{ErrorCode, ReservationId};
use crate::response::ApiResponse;
use crate::validation;
const BUDGET_NOT_FOUND_MARKER: &str = "Budget not found for provided scope";
fn enrich_budget_not_found(err: Error, unit: Unit) -> Error {
match err {
Error::Api {
status: 404,
code: Some(ErrorCode::NotFound),
message,
request_id,
retry_after,
details,
} if message.starts_with(BUDGET_NOT_FOUND_MARKER) => {
let unit_wire = serde_json::to_string(&unit)
.ok()
.map(|s| s.trim_matches('"').to_string())
.unwrap_or_else(|| "UNKNOWN".to_string());
let enriched = format!(
"{message} (request was sent with unit={unit_wire}; \
verify an ACTIVE budget exists at this scope AND unit — \
the server indexes budgets by (scope, unit), so a mismatched \
unit surfaces as a 404 NOT_FOUND)"
);
Error::Api {
status: 404,
code: Some(ErrorCode::NotFound),
message: enriched,
request_id,
retry_after,
details,
}
}
other => other,
}
}
#[derive(Clone)]
pub struct CyclesClient {
inner: Arc<ClientInner>,
}
struct ClientInner {
http: reqwest::Client,
config: CyclesConfig,
}
impl std::fmt::Debug for CyclesClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CyclesClient")
.field("base_url", &self.inner.config.base_url)
.finish()
}
}
impl CyclesClient {
pub fn builder(api_key: impl Into<String>, base_url: impl Into<String>) -> CyclesClientBuilder {
CyclesClientBuilder::new(api_key, base_url)
}
pub fn new(config: CyclesConfig) -> Self {
Self::from_builder(config, None)
}
pub(crate) fn from_builder(config: CyclesConfig, http_client: Option<reqwest::Client>) -> Self {
let http = http_client.unwrap_or_else(|| {
reqwest::Client::builder()
.connect_timeout(config.connect_timeout)
.timeout(config.connect_timeout + config.read_timeout)
.build()
.expect("failed to build HTTP client")
});
Self {
inner: Arc::new(ClientInner { http, config }),
}
}
pub fn config(&self) -> &CyclesConfig {
&self.inner.config
}
#[tracing::instrument(skip(self, req), fields(cycles.reservation_id, cycles.decision))]
pub async fn reserve(&self, req: ReservationCreateRequest) -> Result<ReservationGuard, Error> {
validation::validate_subject(&req.subject)?;
validation::validate_ttl_ms(req.ttl_ms)?;
validation::validate_grace_period_ms(req.grace_period_ms)?;
validation::validate_non_negative(req.estimate.amount, "estimate.amount")?;
let resp = self.create_reservation(&req).await?;
if resp.decision.is_denied() {
return Err(Error::BudgetExceeded {
message: resp
.reason_code
.clone()
.unwrap_or_else(|| "budget exceeded".to_string()),
affected_scopes: resp.affected_scopes.clone(),
retry_after: resp.retry_after_ms.map(Duration::from_millis),
request_id: None,
});
}
let reservation_id = resp
.reservation_id
.clone()
.expect("reservation_id must be present when decision is ALLOW");
let span = tracing::Span::current();
span.record("cycles.reservation_id", reservation_id.as_str());
span.record("cycles.decision", tracing::field::debug(&resp.decision));
Ok(ReservationGuard::new(
self.clone(),
reservation_id,
resp.decision,
resp.caps.clone(),
resp.expires_at_ms,
resp.affected_scopes.clone(),
req.ttl_ms,
))
}
pub async fn create_reservation(
&self,
req: &ReservationCreateRequest,
) -> Result<ReservationCreateResponse, Error> {
self.post_json(RESERVATIONS_PATH, req, Some(req.idempotency_key.as_str()))
.await
.map_err(|e| enrich_budget_not_found(e, req.estimate.unit))
}
pub async fn create_reservation_with_metadata(
&self,
req: &ReservationCreateRequest,
) -> Result<ApiResponse<ReservationCreateResponse>, Error> {
self.post_json_with_metadata(RESERVATIONS_PATH, req, Some(req.idempotency_key.as_str()))
.await
.map_err(|e| enrich_budget_not_found(e, req.estimate.unit))
}
pub async fn commit_reservation(
&self,
id: &ReservationId,
req: &CommitRequest,
) -> Result<CommitResponse, Error> {
let path = format!("{RESERVATIONS_PATH}/{}/commit", id.as_str());
self.post_json(&path, req, Some(req.idempotency_key.as_str()))
.await
}
pub async fn release_reservation(
&self,
id: &ReservationId,
req: &ReleaseRequest,
) -> Result<ReleaseResponse, Error> {
let path = format!("{RESERVATIONS_PATH}/{}/release", id.as_str());
self.post_json(&path, req, Some(req.idempotency_key.as_str()))
.await
}
pub async fn extend_reservation(
&self,
id: &ReservationId,
req: &ExtendRequest,
) -> Result<ExtendResponse, Error> {
let path = format!("{RESERVATIONS_PATH}/{}/extend", id.as_str());
self.post_json(&path, req, Some(req.idempotency_key.as_str()))
.await
}
pub async fn decide(&self, req: &DecisionRequest) -> Result<DecisionResponse, Error> {
self.post_json(DECIDE_PATH, req, Some(req.idempotency_key.as_str()))
.await
.map_err(|e| enrich_budget_not_found(e, req.estimate.unit))
}
pub async fn create_event(
&self,
req: &EventCreateRequest,
) -> Result<EventCreateResponse, Error> {
self.post_json(EVENTS_PATH, req, Some(req.idempotency_key.as_str()))
.await
.map_err(|e| enrich_budget_not_found(e, req.actual.unit))
}
pub async fn list_reservations(
&self,
params: &ListReservationsParams,
) -> Result<ReservationListResponse, Error> {
self.get_json(RESERVATIONS_PATH, Some(params)).await
}
pub async fn get_reservation(&self, id: &ReservationId) -> Result<ReservationDetail, Error> {
let path = format!("{RESERVATIONS_PATH}/{}", id.as_str());
self.get_json::<(), _>(&path, None).await
}
pub async fn get_balances(&self, params: &BalanceParams) -> Result<BalanceResponse, Error> {
if !params.has_filter() {
return Err(Error::Validation(
"getBalances requires at least one subject filter".to_string(),
));
}
self.get_json(BALANCES_PATH, Some(params)).await
}
async fn post_json<B: Serialize, R: DeserializeOwned>(
&self,
path: &str,
body: &B,
idempotency_key: Option<&str>,
) -> Result<R, Error> {
let resp: ApiResponse<R> = self
.post_json_with_metadata(path, body, idempotency_key)
.await?;
Ok(resp.into_inner())
}
async fn post_json_with_metadata<B: Serialize, R: DeserializeOwned>(
&self,
path: &str,
body: &B,
idempotency_key: Option<&str>,
) -> Result<ApiResponse<R>, Error> {
let url = format!("{}{}", self.inner.config.base_url, path);
let mut headers = HeaderMap::new();
headers.insert(
API_KEY_HEADER,
HeaderValue::from_str(&self.inner.config.api_key)
.map_err(|e| Error::Config(format!("invalid API key header value: {e}")))?,
);
if let Some(key) = idempotency_key {
if let Ok(val) = HeaderValue::from_str(key) {
headers.insert(IDEMPOTENCY_KEY_HEADER, val);
}
}
let resp = self
.inner
.http
.post(&url)
.headers(headers)
.json(body)
.send()
.await?;
let response_headers = resp.headers().clone();
let status = resp.status().as_u16();
if (200..300).contains(&status) {
let data: R = resp
.json()
.await
.map_err(|e| Error::Deserialization(serde::de::Error::custom(e.to_string())))?;
Ok(ApiResponse::from_response(data, &response_headers))
} else {
Err(self
.parse_error_response(status, resp, &response_headers)
.await)
}
}
async fn get_json<Q: Serialize, R: DeserializeOwned>(
&self,
path: &str,
query: Option<&Q>,
) -> Result<R, Error> {
let url = format!("{}{}", self.inner.config.base_url, path);
let mut request = self
.inner
.http
.get(&url)
.header(API_KEY_HEADER, &self.inner.config.api_key);
if let Some(q) = query {
request = request.query(q);
}
let resp = request.send().await?;
let response_headers = resp.headers().clone();
let status = resp.status().as_u16();
if (200..300).contains(&status) {
resp.json()
.await
.map_err(|e| Error::Deserialization(serde::de::Error::custom(e.to_string())))
} else {
Err(self
.parse_error_response(status, resp, &response_headers)
.await)
}
}
async fn parse_error_response(
&self,
status: u16,
resp: reqwest::Response,
headers: &HeaderMap,
) -> Error {
let header_request_id = headers
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.map(String::from);
let body: Option<ErrorResponse> = resp.json().await.ok();
let message = body
.as_ref()
.map(|b| b.message.clone())
.unwrap_or_else(|| format!("HTTP {status}"));
let error_code: Option<ErrorCode> = body
.as_ref()
.and_then(|b| serde_json::from_value(serde_json::Value::String(b.error.clone())).ok());
let details = body.as_ref().and_then(|b| b.details.clone());
let request_id = body
.as_ref()
.and_then(|b| b.request_id.clone())
.or(header_request_id);
if status == 409
&& matches!(
error_code,
Some(ErrorCode::BudgetExceeded)
| Some(ErrorCode::OverdraftLimitExceeded)
| Some(ErrorCode::DebtOutstanding)
)
{
return Error::BudgetExceeded {
message,
affected_scopes: vec![],
retry_after: None,
request_id,
};
}
Error::Api {
status,
code: error_code,
message,
request_id,
retry_after: None,
details,
}
}
}
const _: fn() = || {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<CyclesClient>();
};
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn enrich_budget_not_found_adds_unit_hint() {
let err = Error::Api {
status: 404,
code: Some(ErrorCode::NotFound),
message: "Budget not found for provided scope: tenant:rider".to_string(),
request_id: Some("req-1".to_string()),
retry_after: None,
details: None,
};
let enriched = enrich_budget_not_found(err, Unit::Tokens);
match enriched {
Error::Api {
status,
code,
message,
request_id,
..
} => {
assert_eq!(status, 404);
assert_eq!(code, Some(ErrorCode::NotFound));
assert!(message.starts_with("Budget not found for provided scope: tenant:rider"));
assert!(message.contains("unit=TOKENS"));
assert!(message.contains("(scope, unit)"));
assert_eq!(request_id.as_deref(), Some("req-1"));
}
other => panic!("expected Api error, got {other:?}"),
}
}
#[test]
fn enrich_budget_not_found_uses_wire_format_for_unit() {
let err = Error::Api {
status: 404,
code: Some(ErrorCode::NotFound),
message: "Budget not found for provided scope: tenant:acme".to_string(),
request_id: None,
retry_after: None,
details: None,
};
let enriched = enrich_budget_not_found(err, Unit::UsdMicrocents);
if let Error::Api { message, .. } = enriched {
assert!(message.contains("unit=USD_MICROCENTS"));
} else {
panic!("expected Api error");
}
}
#[test]
fn enrich_budget_not_found_ignores_non_matching_messages() {
let err = Error::Api {
status: 404,
code: Some(ErrorCode::NotFound),
message: "Reservation not found: rsv_xyz".to_string(),
request_id: None,
retry_after: None,
details: None,
};
let enriched = enrich_budget_not_found(err, Unit::Tokens);
if let Error::Api { message, .. } = enriched {
assert_eq!(message, "Reservation not found: rsv_xyz");
assert!(!message.contains("unit="));
} else {
panic!("expected Api error");
}
}
#[test]
fn enrich_budget_not_found_ignores_non_404_errors() {
let err = Error::Api {
status: 409,
code: Some(ErrorCode::NotFound),
message: "Budget not found for provided scope: tenant:rider".to_string(),
request_id: None,
retry_after: None,
details: None,
};
let enriched = enrich_budget_not_found(err, Unit::Tokens);
if let Error::Api { message, .. } = enriched {
assert_eq!(message, "Budget not found for provided scope: tenant:rider");
} else {
panic!("expected Api error");
}
}
#[test]
fn enrich_budget_not_found_passes_through_other_error_kinds() {
let err = Error::Validation("bad input".to_string());
let enriched = enrich_budget_not_found(err, Unit::Tokens);
assert!(matches!(enriched, Error::Validation(msg) if msg == "bad input"));
}
}