use crate::config::Config;
use axum::http::{HeaderValue, StatusCode, header};
use axum::response::{IntoResponse, Json, Response};
use rand::RngExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Builds a [`KumihoClient`] from the application configuration.
///
/// The base URL comes from `config.kumiho.api_url`. The service token is read from the
/// `KUMIHO_SERVICE_TOKEN` environment variable; when the variable cannot be read, an empty
/// token is used instead of failing.
pub fn build_client_from_config(config: &Config) -> KumihoClient {
    let service_token = match std::env::var("KUMIHO_SERVICE_TOKEN") {
        Ok(token) => token,
        Err(_) => String::default(),
    };
    let api_url = config.kumiho.api_url.clone();
    KumihoClient::new(api_url, service_token)
}
/// Converts `name` into a lowercase, dash-separated slug.
///
/// The input is trimmed and lowercased. Every run of characters that are not alphanumeric
/// (including existing dashes) collapses into a single `-`, and no dash is emitted at the
/// start or end of the result.
pub fn slugify(name: &str) -> String {
    let lowered = name.trim().to_lowercase();
    let mut slug = String::with_capacity(lowered.len());
    // Set when at least one separator was seen since the last kept character.
    let mut separator_pending = false;
    for ch in lowered.chars() {
        if !ch.is_alphanumeric() {
            separator_pending = true;
            continue;
        }
        // Only emit a dash between two kept segments, never before the first one.
        if separator_pending && !slug.is_empty() {
            slug.push('-');
        }
        separator_pending = false;
        slug.push(ch);
    }
    slug
}
/// Returns the part of `kref` before the first `?`, or the whole string when there is none.
fn item_kref_without_selectors(kref: &str) -> &str {
    match kref.find('?') {
        Some(selector_start) => &kref[..selector_start],
        None => kref,
    }
}
/// HTTP client for the Kumiho REST API.
///
/// Every request built by the methods on this type targets `{base_url}/api/v1/...` and carries
/// the service token in the `X-Kumiho-Token` header.
#[derive(Clone)]
pub struct KumihoClient {
/// Underlying `reqwest` client used for all requests.
client: Client,
/// API base URL; `new` stores it with trailing `/` characters removed.
base_url: String,
/// Value sent in the `X-Kumiho-Token` header.
service_token: String,
}
/// Item record decoded from Kumiho JSON responses (item listing, creation, lookup, bundles).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ItemResponse {
/// Reference string of the item; passed back to the API as the `kref` query parameter.
pub kref: String,
pub name: String,
pub item_name: String,
pub kind: String,
/// Falls back to `false` when the field is missing from the payload.
#[serde(default)]
pub deprecated: bool,
pub created_at: Option<String>,
pub author: Option<String>,
pub username: Option<String>,
pub author_display: Option<String>,
/// Falls back to an empty map when the field is missing from the payload.
#[serde(default)]
pub metadata: HashMap<String, String>,
}
/// Revision record decoded from Kumiho JSON responses.
///
/// Fields marked `#[serde(default)]` take their type's default value when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevisionResponse {
pub kref: String,
/// Reference of the owning item; used as the map key by `batch_get_revisions`.
pub item_kref: String,
pub number: i32,
#[serde(default)]
pub latest: bool,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub metadata: HashMap<String, String>,
#[serde(default)]
pub deprecated: bool,
pub created_at: Option<String>,
pub author: Option<String>,
pub username: Option<String>,
pub author_display: Option<String>,
}
/// Payload decoded from the `POST /revisions/batch` response in `batch_get_revisions`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchRevisionsResponse {
/// Revisions returned by the API; the only field `batch_get_revisions` consumes.
pub revisions: Vec<RevisionResponse>,
// NOTE(review): presumably the requested item krefs that had no matching revision — confirm
// against the API contract.
pub not_found: Vec<String>,
pub requested_count: i32,
pub found_count: i32,
}
/// One hit decoded from the `/items/fulltext-search` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResult {
/// The matched item.
pub item: ItemResponse,
/// Score reported with the hit; falls back to `0.0` when missing from the payload.
#[serde(default)]
pub score: f64,
}
/// One entry of the `members` list in [`BundleMembersResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BundleMemberInfo {
/// Reference of the member item.
pub item_kref: String,
pub added_at: Option<String>,
pub added_by: Option<String>,
pub added_in_revision: Option<i32>,
}
/// Payload decoded from the `GET /bundles/members` response in `list_bundle_members`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BundleMembersResponse {
pub members: Vec<BundleMemberInfo>,
/// Optional count supplied by the API; `None` when the field is absent or null.
pub total_count: Option<i32>,
}
/// Artifact record decoded from Kumiho JSON responses (`/artifacts` endpoints).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactResponse {
pub kref: String,
pub name: String,
pub location: String,
/// Reference of the revision the artifact belongs to.
pub revision_kref: String,
pub item_kref: Option<String>,
/// Falls back to `false` when the field is missing from the payload.
#[serde(default)]
pub deprecated: bool,
pub created_at: Option<String>,
pub author: Option<String>,
pub username: Option<String>,
pub author_display: Option<String>,
/// Falls back to an empty map when the field is missing from the payload.
#[serde(default)]
pub metadata: HashMap<String, String>,
}
/// Edge record decoded from Kumiho JSON responses (`/edges` endpoints).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeResponse {
pub source_kref: String,
pub target_kref: String,
pub edge_type: String,
pub created_at: Option<String>,
/// `None` when the field is missing or null in the payload.
#[serde(default)]
pub metadata: Option<HashMap<String, String>>,
}
/// Space record decoded from the `GET /spaces` response in `list_spaces`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpaceResponse {
pub path: String,
pub name: String,
pub parent_path: Option<String>,
pub created_at: Option<String>,
pub author: Option<String>,
pub username: Option<String>,
pub author_display: Option<String>,
}
/// Errors produced by [`KumihoClient`] operations.
#[derive(Debug, thiserror::Error)]
pub enum KumihoError {
/// The request failed at the `reqwest` level (no HTTP response was obtained).
#[error("Kumiho service unreachable: {0}")]
Unreachable(#[from] reqwest::Error),
/// The API answered with a non-success status; `body` is the response text, or a
/// placeholder when the body looked like an HTML page.
#[error("Kumiho returned {status}: {body}")]
Api { status: u16, body: String },
/// The upstream kept answering with a status that `is_retryable_status` accepts.
#[error("Kumiho upstream temporarily unavailable (HTTP {status} after {attempts} attempts)")]
UpstreamUnavailable { status: u16, attempts: u32 },
/// A successful response could not be decoded into the expected JSON shape.
#[error("Unexpected response: {0}")]
Decode(String),
}
/// Result alias whose error type is [`KumihoError`].
pub type Result<T> = std::result::Result<T, KumihoError>;
/// Reports whether `status` is one of the HTTP codes this client treats as transient:
/// 502, 503, 504, 520, 522 or 524.
pub(crate) fn is_retryable_status(status: u16) -> bool {
    const TRANSIENT_STATUSES: [u16; 6] = [502, 503, 504, 520, 522, 524];
    TRANSIENT_STATUSES.contains(&status)
}
/// Upper bound applied to each individual request attempt (retrying and non-retrying paths).
const PER_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
/// Overall time budget used to derive the deadline for a retried operation.
const TOTAL_BUDGET: Duration = Duration::from_secs(15);
/// Returns `true` when strictly more than `delay_ms` milliseconds remain before `deadline`.
///
/// A deadline that has already been reached has zero time remaining, so the answer is `false`
/// for every `delay_ms`.
fn deadline_allows(deadline: Instant, delay_ms: u64) -> bool {
    let time_left = deadline.saturating_duration_since(Instant::now());
    let wanted = Duration::from_millis(delay_ms);
    time_left > wanted
}
/// Sleeps for roughly `base_ms` milliseconds, randomised by up to ±20 %.
///
/// When 20 % of `base_ms` truncates to zero no randomness is applied. The final delay is
/// clamped so it is never negative.
async fn sleep_with_jitter(base_ms: u64) {
    let spread = (base_ms as f64 * 0.2) as i64;
    let offset: i64 = match spread {
        s if s > 0 => rand::rng().random_range(-s..=s),
        _ => 0,
    };
    let millis = (base_ms as i64 + offset).max(0) as u64;
    let pause = Duration::from_millis(millis);
    tokio::time::sleep(pause).await;
}
/// Heuristically decides whether a response is an HTML page.
///
/// Returns `true` when `content_type` starts with `text/html` (ASCII case-insensitive), or
/// when `body`, after leading whitespace is skipped, starts with `<!doctype` or `<html`
/// (ASCII case-insensitive).
pub(crate) fn looks_like_html_body(body: &str, content_type: Option<&str>) -> bool {
    /// ASCII case-insensitive prefix test done on bytes, so it never splits a UTF-8 character.
    fn has_prefix_ignoring_ascii_case(text: &str, prefix: &str) -> bool {
        matches!(
            text.as_bytes().get(..prefix.len()),
            Some(head) if head.eq_ignore_ascii_case(prefix.as_bytes())
        )
    }

    if matches!(content_type, Some(ct) if has_prefix_ignoring_ascii_case(ct, "text/html")) {
        return true;
    }
    let start = body.trim_start();
    ["<!doctype", "<html"]
        .iter()
        .any(|marker| has_prefix_ignoring_ascii_case(start, marker))
}
/// Translates a [`KumihoError`] into the HTTP response sent to the caller.
///
/// * `Unreachable` → 503 with `Retry-After: 10`.
/// * `UpstreamUnavailable` → 503 with `Retry-After: 5`.
/// * `Api` with an upstream status of 500 or above → 503 with `Retry-After: 5`.
/// * `Api` with 401/403 → 502; any other `Api` status is passed through (502 when the code is
///   not a valid HTTP status).
/// * `Decode` → 502.
///
/// Every branch returns a JSON body carrying `error` and `error_code`.
pub fn kumiho_error_to_response(err: KumihoError) -> Response {
    /// Builds a 503 response with the given JSON payload and `Retry-After` header value.
    fn unavailable(payload: serde_json::Value, retry_after: &'static str) -> Response {
        let mut response = (StatusCode::SERVICE_UNAVAILABLE, Json(payload)).into_response();
        response
            .headers_mut()
            .insert(header::RETRY_AFTER, HeaderValue::from_static(retry_after));
        response
    }

    match err {
        KumihoError::Unreachable(e) => {
            tracing::warn!(error = %e, "Kumiho unreachable");
            unavailable(
                serde_json::json!({
                    "error": "Kumiho cloud unreachable",
                    "error_code": "kumiho_unreachable",
                    "retry_after_seconds": 10,
                }),
                "10",
            )
        }
        KumihoError::UpstreamUnavailable { status, attempts } => {
            tracing::warn!(
                upstream_status = status,
                attempts = attempts,
                "Kumiho upstream unavailable after retries"
            );
            unavailable(
                serde_json::json!({
                    "error": "Kumiho cloud temporarily unavailable",
                    "error_code": "kumiho_upstream_unavailable",
                    "upstream_status": status,
                    "attempts": attempts,
                    "retry_after_seconds": 5,
                }),
                "5",
            )
        }
        KumihoError::Api { status, body } if status >= 500 => {
            tracing::warn!(upstream_status = status, body = %body, "Kumiho 5xx (non-retried)");
            unavailable(
                serde_json::json!({
                    "error": "Kumiho cloud temporarily unavailable",
                    "error_code": "kumiho_upstream_unavailable",
                    "upstream_status": status,
                    "attempts": 1,
                    "retry_after_seconds": 5,
                }),
                "5",
            )
        }
        KumihoError::Api { status, body } => {
            // Upstream auth failures are reported as a gateway error rather than passed through.
            let code = match status {
                401 | 403 => StatusCode::BAD_GATEWAY,
                other => StatusCode::from_u16(other).unwrap_or(StatusCode::BAD_GATEWAY),
            };
            let payload = serde_json::json!({
                "error": format!("Kumiho upstream: {body}"),
                "error_code": "kumiho_upstream_error",
                "upstream_status": status,
            });
            (code, Json(payload)).into_response()
        }
        KumihoError::Decode(msg) => {
            let payload = serde_json::json!({
                "error": format!("Bad response from Kumiho: {msg}"),
                "error_code": "kumiho_decode_error",
            });
            (StatusCode::BAD_GATEWAY, Json(payload)).into_response()
        }
    }
}
/// JSON body sent by `ensure_project` to `POST /projects`.
#[derive(Serialize)]
struct CreateProjectBody {
name: String,
/// Omitted from the serialized JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
}
/// JSON body sent by `ensure_space` / `ensure_child_space` to `POST /spaces`.
#[derive(Serialize)]
struct CreateSpaceBody {
parent_path: String,
name: String,
}
/// JSON body sent by `create_item` to `POST /items`.
#[derive(Serialize)]
struct CreateItemBody {
space_path: String,
item_name: String,
kind: String,
/// Omitted from the serialized JSON when the map is empty.
#[serde(skip_serializing_if = "HashMap::is_empty")]
metadata: HashMap<String, String>,
}
/// JSON body sent by `create_revision` to `POST /revisions`.
#[derive(Serialize)]
struct CreateRevisionBody {
item_kref: String,
/// Omitted from the serialized JSON when the map is empty.
#[serde(skip_serializing_if = "HashMap::is_empty")]
metadata: HashMap<String, String>,
}
/// JSON body sent by `create_bundle` to `POST /bundles`.
#[derive(Serialize)]
struct CreateBundleBody {
space_path: String,
bundle_name: String,
/// Omitted from the serialized JSON when the map is empty.
#[serde(skip_serializing_if = "HashMap::is_empty")]
metadata: HashMap<String, String>,
}
/// JSON body sent by `add_bundle_member` to `POST /bundles/members/add`.
#[derive(Serialize)]
struct BundleMemberBody {
bundle_kref: String,
item_kref: String,
/// Omitted from the serialized JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
metadata: Option<HashMap<String, String>>,
}
/// JSON body sent by `remove_bundle_member` to `POST /bundles/members/remove`.
#[derive(Serialize)]
struct RemoveBundleMemberBody {
bundle_kref: String,
item_kref: String,
}
/// JSON body sent by `create_edge` to `POST /edges`.
#[derive(Serialize)]
struct CreateEdgeBody {
source_revision_kref: String,
target_revision_kref: String,
edge_type: String,
/// Omitted from the serialized JSON when the map is empty.
#[serde(skip_serializing_if = "HashMap::is_empty")]
metadata: HashMap<String, String>,
}
/// JSON body sent by `create_artifact` to `POST /artifacts`.
#[derive(Serialize)]
struct CreateArtifactBody {
revision_kref: String,
name: String,
location: String,
/// Omitted from the serialized JSON when the map is empty.
#[serde(skip_serializing_if = "HashMap::is_empty")]
metadata: HashMap<String, String>,
}
impl KumihoClient {
pub fn new(base_url: String, service_token: String) -> Self {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(20))
.connect_timeout(std::time::Duration::from_secs(5))
.pool_max_idle_per_host(32)
.build()
.unwrap_or_else(|_| Client::new());
Self {
client,
base_url: base_url.trim_end_matches('/').to_string(),
service_token,
}
}
pub fn client(&self) -> &Client {
&self.client
}
/// Builds the absolute URL for an API `path` by prefixing the base URL and `/api/v1`.
fn url(&self, path: &str) -> String {
    [self.base_url.as_str(), "/api/v1", path].concat()
}
/// Passes successful responses through and converts everything else into `KumihoError::Api`.
///
/// For a non-success status the body text is read (an unreadable body becomes an empty
/// string). When the body looks like an HTML page it is logged as a 256-character preview
/// and replaced by a short placeholder in the returned error.
async fn check_response(&self, resp: reqwest::Response) -> Result<reqwest::Response> {
    if resp.status().is_success() {
        return Ok(resp);
    }

    let code = resp.status().as_u16();
    let content_type: Option<String> = match resp.headers().get(reqwest::header::CONTENT_TYPE)
    {
        Some(value) => value.to_str().ok().map(str::to_owned),
        None => None,
    };
    let raw_body = resp.text().await.unwrap_or_default();

    if !looks_like_html_body(&raw_body, content_type.as_deref()) {
        return Err(KumihoError::Api {
            status: code,
            body: raw_body,
        });
    }

    tracing::warn!(
        status = code,
        content_type = content_type.as_deref().unwrap_or(""),
        body_preview = raw_body.chars().take(256).collect::<String>(),
        "Kumiho returned HTML error body (trimming before propagation)"
    );
    Err(KumihoError::Api {
        status: code,
        body: "<HTML error page — see gateway logs>".to_string(),
    })
}
/// Sends the request produced by `build`, retrying within a fresh `TOTAL_BUDGET` deadline.
async fn send_with_retry<F>(&self, build: F) -> Result<reqwest::Response>
where
    F: Fn() -> reqwest::RequestBuilder,
{
    let deadline = Instant::now() + TOTAL_BUDGET;
    self.send_with_retry_deadline(build, deadline).await
}
async fn send_with_retry_deadline<F>(
&self,
build: F,
deadline: Instant,
) -> Result<reqwest::Response>
where
F: Fn() -> reqwest::RequestBuilder,
{
const MAX_ATTEMPTS: u32 = 3;
const BASE_DELAYS_MS: [u64; 2] = [500, 1500];
let mut last_status: Option<u16> = None;
for attempt in 1..=MAX_ATTEMPTS {
let now = Instant::now();
if now >= deadline {
break;
}
let attempt_cap = PER_ATTEMPT_TIMEOUT.min(deadline.saturating_duration_since(now));
let attempt_request = build().timeout(attempt_cap);
let result = attempt_request.send().await;
match result {
Ok(resp) => {
let status = resp.status().as_u16();
if is_retryable_status(status) {
last_status = Some(status);
if attempt < MAX_ATTEMPTS {
let delay_ms = BASE_DELAYS_MS[(attempt - 1) as usize];
if !deadline_allows(deadline, delay_ms) {
drop(resp);
break;
}
tracing::warn!(
attempt = attempt,
max_attempts = MAX_ATTEMPTS,
upstream_status = status,
"Kumiho returned transient 5xx; retrying"
);
drop(resp);
sleep_with_jitter(delay_ms).await;
continue;
}
drop(resp);
break;
}
return Ok(resp);
}
Err(e) => {
if attempt < MAX_ATTEMPTS {
let delay_ms = BASE_DELAYS_MS[(attempt - 1) as usize];
if !deadline_allows(deadline, delay_ms) {
return Err(KumihoError::Unreachable(e));
}
tracing::warn!(
attempt = attempt,
max_attempts = MAX_ATTEMPTS,
error = %e,
"Kumiho request failed (network); retrying"
);
sleep_with_jitter(delay_ms).await;
continue;
}
return Err(KumihoError::Unreachable(e));
}
}
}
Err(KumihoError::UpstreamUnavailable {
status: last_status.unwrap_or(502),
attempts: MAX_ATTEMPTS,
})
}
/// Sends the request produced by `build` exactly once, capped at `PER_ATTEMPT_TIMEOUT`.
///
/// # Errors
///
/// * `KumihoError::Unreachable` when the request fails at the network level.
/// * `KumihoError::UpstreamUnavailable` (with `attempts: 1`) when the response status is one
///   accepted by `is_retryable_status`.
async fn send_no_retry<F>(&self, build: F) -> Result<reqwest::Response>
where
    F: FnOnce() -> reqwest::RequestBuilder,
{
    let resp = build()
        .timeout(PER_ATTEMPT_TIMEOUT)
        .send()
        .await
        .map_err(KumihoError::Unreachable)?;
    let status = resp.status().as_u16();
    if !is_retryable_status(status) {
        return Ok(resp);
    }
    drop(resp);
    Err(KumihoError::UpstreamUnavailable {
        status,
        attempts: 1,
    })
}
/// Creates the project `project_name` via `POST /projects` (single attempt).
///
/// A 409 response is treated the same as success; any other non-success status is returned
/// as an error.
pub async fn ensure_project(&self, project_name: &str) -> Result<()> {
    let payload = CreateProjectBody {
        name: project_name.to_string(),
        description: None,
    };
    let endpoint = self.url("/projects");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let resp = self.send_no_retry(request).await?;
    if resp.status().as_u16() == 409 {
        return Ok(());
    }
    self.check_response(resp).await.map(|_| ())
}
/// Creates the space `space_name` directly under `/{project}` via `POST /spaces`
/// (single attempt).
///
/// A 409 response is treated the same as success; any other non-success status is returned
/// as an error.
pub async fn ensure_space(&self, project: &str, space_name: &str) -> Result<()> {
    let payload = CreateSpaceBody {
        parent_path: format!("/{project}"),
        name: space_name.to_string(),
    };
    let endpoint = self.url("/spaces");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let resp = self.send_no_retry(request).await?;
    if resp.status().as_u16() == 409 {
        return Ok(());
    }
    self.check_response(resp).await.map(|_| ())
}
/// Creates the space `space_name` under an arbitrary `parent_path` via `POST /spaces`
/// (single attempt). The `_project` argument is not used.
///
/// A 409 response is treated the same as success; any other non-success status is returned
/// as an error.
pub async fn ensure_child_space(
    &self,
    _project: &str,
    parent_path: &str,
    space_name: &str,
) -> Result<()> {
    let payload = CreateSpaceBody {
        parent_path: parent_path.to_string(),
        name: space_name.to_string(),
    };
    let endpoint = self.url("/spaces");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let resp = self.send_no_retry(request).await?;
    if resp.status().as_u16() == 409 {
        return Ok(());
    }
    self.check_response(resp).await.map(|_| ())
}
/// Lists spaces under `parent_path` via `GET /spaces` (with retry), passing `recursive` as
/// the string `"true"` or `"false"`.
pub async fn list_spaces(
    &self,
    parent_path: &str,
    recursive: bool,
) -> Result<Vec<SpaceResponse>> {
    let endpoint = self.url("/spaces");
    let recursive_flag = if recursive { "true" } else { "false" };
    let params = [("parent_path", parent_path), ("recursive", recursive_flag)];
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<SpaceResponse>>().await {
        Ok(spaces) => Ok(spaces),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists items in `space_path`, requesting the first page of 100 items (offset 0).
pub async fn list_items(
    &self,
    space_path: &str,
    include_deprecated: bool,
) -> Result<Vec<ItemResponse>> {
    let (first_page_limit, first_page_offset) = (100, 0);
    self.list_items_paged(
        space_path,
        include_deprecated,
        first_page_limit,
        first_page_offset,
    )
    .await
}
/// Lists items in `space_path` via `GET /items` (with retry), forwarding `limit` and
/// `offset` as query parameters.
pub async fn list_items_paged(
    &self,
    space_path: &str,
    include_deprecated: bool,
    limit: u32,
    offset: u32,
) -> Result<Vec<ItemResponse>> {
    let endpoint = self.url("/items");
    let limit_text = limit.to_string();
    let offset_text = offset.to_string();
    let deprecated_flag = if include_deprecated { "true" } else { "false" };
    let params = [
        ("space_path", space_path),
        ("include_deprecated", deprecated_flag),
        ("limit", limit_text.as_str()),
        ("offset", offset_text.as_str()),
    ];
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<ItemResponse>>().await {
        Ok(items) => Ok(items),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists items in `space_path` via `GET /items` (with retry), forwarding `name_filter` as a
/// query parameter.
pub async fn list_items_filtered(
    &self,
    space_path: &str,
    name_filter: &str,
    include_deprecated: bool,
) -> Result<Vec<ItemResponse>> {
    let endpoint = self.url("/items");
    let deprecated_flag = if include_deprecated { "true" } else { "false" };
    let params = [
        ("space_path", space_path),
        ("name_filter", name_filter),
        ("include_deprecated", deprecated_flag),
    ];
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<ItemResponse>>().await {
        Ok(items) => Ok(items),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Creates an item via `POST /items` (single attempt) and returns the decoded record.
pub async fn create_item(
    &self,
    space_path: &str,
    item_name: &str,
    kind: &str,
    metadata: HashMap<String, String>,
) -> Result<ItemResponse> {
    let payload = CreateItemBody {
        space_path: space_path.to_string(),
        item_name: item_name.to_string(),
        kind: kind.to_string(),
        metadata,
    };
    let endpoint = self.url("/items");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ItemResponse>().await {
        Ok(item) => Ok(item),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Sets the deprecation flag of an item via `POST /items/deprecate` (single attempt).
///
/// Anything after the first `?` in `kref` is dropped before the call. When the endpoint
/// answers 404 while `deprecated` is `true`, the item is instead deleted with `force=false`
/// and then re-fetched, and that fetched record is returned.
pub async fn deprecate_item(&self, kref: &str, deprecated: bool) -> Result<ItemResponse> {
    let item_kref = item_kref_without_selectors(kref);
    let deprecated_flag = if deprecated { "true" } else { "false" };
    let endpoint = self.url("/items/deprecate");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", item_kref), ("deprecated", deprecated_flag)])
    };
    let raw = self.send_no_retry(request).await?;
    match self.check_response(raw).await {
        Ok(checked) => checked
            .json::<ItemResponse>()
            .await
            .map_err(|err| KumihoError::Decode(err.to_string())),
        Err(KumihoError::Api { status: 404, .. }) if deprecated => {
            self.delete_item_with_force(item_kref, false).await?;
            self.get_item_by_kref(item_kref).await
        }
        Err(other) => Err(other),
    }
}
/// Fetches an item via `GET /items/by-kref` (with retry); anything after the first `?` in
/// `kref` is dropped before the call.
pub async fn get_item_by_kref(&self, kref: &str) -> Result<ItemResponse> {
    let item_kref = item_kref_without_selectors(kref);
    let endpoint = self.url("/items/by-kref");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", item_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ItemResponse>().await {
        Ok(item) => Ok(item),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Deletes the item identified by `kref`, always passing `force=true`.
pub async fn delete_item(&self, kref: &str) -> Result<()> {
    let force = true;
    self.delete_item_with_force(kref, force).await
}
/// Deletes an item via `DELETE /items/by-kref` (single attempt), forwarding `force` as the
/// string `"true"` or `"false"`. Anything after the first `?` in `kref` is dropped first.
async fn delete_item_with_force(&self, kref: &str, force: bool) -> Result<()> {
    let item_kref = item_kref_without_selectors(kref);
    let force_flag = if force { "true" } else { "false" };
    let endpoint = self.url("/items/by-kref");
    let request = || {
        self.client
            .delete(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", item_kref), ("force", force_flag)])
    };
    let raw = self.send_no_retry(request).await?;
    self.check_response(raw).await.map(|_| ())
}
/// Runs a full-text search via `GET /items/fulltext-search` (with retry), forwarding
/// `query`, `context`, `kind` and `include_deprecated` as query parameters.
pub async fn search_items(
    &self,
    query: &str,
    context: &str,
    kind: &str,
    include_deprecated: bool,
) -> Result<Vec<SearchResult>> {
    let endpoint = self.url("/items/fulltext-search");
    let deprecated_flag = if include_deprecated { "true" } else { "false" };
    let params = [
        ("query", query),
        ("context", context),
        ("kind", kind),
        ("include_deprecated", deprecated_flag),
    ];
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<SearchResult>>().await {
        Ok(hits) => Ok(hits),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Creates a revision of `item_kref` via `POST /revisions` (single attempt).
pub async fn create_revision(
    &self,
    item_kref: &str,
    metadata: HashMap<String, String>,
) -> Result<RevisionResponse> {
    let payload = CreateRevisionBody {
        item_kref: item_kref.to_string(),
        metadata,
    };
    let endpoint = self.url("/revisions");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<RevisionResponse>().await {
        Ok(revision) => Ok(revision),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists the revisions of `item_kref` via `GET /revisions` (with retry).
pub async fn list_item_revisions(&self, item_kref: &str) -> Result<Vec<RevisionResponse>> {
    let endpoint = self.url("/revisions");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("item_kref", item_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<RevisionResponse>>().await {
        Ok(revisions) => Ok(revisions),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Applies `tag` to a revision via `POST /revisions/tags` (single attempt). The revision is
/// identified by the `kref` query parameter and the tag travels in the JSON body.
pub async fn tag_revision(&self, revision_kref: &str, tag: &str) -> Result<()> {
    let payload = serde_json::json!({ "tag": tag });
    let endpoint = self.url("/revisions/tags");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", revision_kref)])
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    self.check_response(raw).await.map(|_| ())
}
/// Sets the deprecation flag of a revision via `POST /revisions/deprecate` (single attempt).
pub async fn deprecate_revision(
    &self,
    revision_kref: &str,
    deprecated: bool,
) -> Result<RevisionResponse> {
    let endpoint = self.url("/revisions/deprecate");
    let deprecated_flag = if deprecated { "true" } else { "false" };
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", revision_kref), ("deprecated", deprecated_flag)])
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<RevisionResponse>().await {
        Ok(revision) => Ok(revision),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Fetches the revision of `item_kref` carrying `tag` via `GET /revisions/by-kref`
/// (with retry); the tag is sent as the `t` query parameter.
pub async fn get_revision_by_tag(
    &self,
    item_kref: &str,
    tag: &str,
) -> Result<RevisionResponse> {
    let endpoint = self.url("/revisions/by-kref");
    let params = [("kref", item_kref), ("t", tag)];
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<RevisionResponse>().await {
        Ok(revision) => Ok(revision),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Fetches a revision by its own kref via `GET /revisions/by-kref` (with retry).
pub async fn get_revision(&self, revision_kref: &str) -> Result<RevisionResponse> {
    let endpoint = self.url("/revisions/by-kref");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", revision_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<RevisionResponse>().await {
        Ok(revision) => Ok(revision),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Fetches the latest revision of `item_kref` via `GET /revisions/latest` (with retry).
pub async fn get_latest_revision(&self, item_kref: &str) -> Result<RevisionResponse> {
    let endpoint = self.url("/revisions/latest");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("item_kref", item_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<RevisionResponse>().await {
        Ok(revision) => Ok(revision),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
pub async fn get_published_or_latest(&self, item_kref: &str) -> Result<RevisionResponse> {
let deadline = Instant::now() + TOTAL_BUDGET;
let by_tag = self
.send_with_retry_deadline(
|| {
self.client
.get(self.url("/revisions/by-kref"))
.header("X-Kumiho-Token", &self.service_token)
.query(&[("kref", item_kref), ("t", "published")])
},
deadline,
)
.await;
match by_tag {
Ok(resp) => {
let resp = self.check_response(resp).await?;
resp.json::<RevisionResponse>()
.await
.map_err(|e| KumihoError::Decode(e.to_string()))
}
Err(_) => {
let resp = self
.send_with_retry_deadline(
|| {
self.client
.get(self.url("/revisions/latest"))
.header("X-Kumiho-Token", &self.service_token)
.query(&[("item_kref", item_kref)])
},
deadline,
)
.await?;
let resp = self.check_response(resp).await?;
resp.json::<RevisionResponse>()
.await
.map_err(|e| KumihoError::Decode(e.to_string()))
}
}
}
/// Fetches the revisions carrying `tag` for many items in one `POST /revisions/batch` call
/// (single attempt, `allow_partial: true`).
///
/// Returns a map keyed by each revision's `item_kref`. An empty `item_krefs` slice yields an
/// empty map without any request being sent.
pub async fn batch_get_revisions(
    &self,
    item_krefs: &[String],
    tag: &str,
) -> Result<HashMap<String, RevisionResponse>> {
    if item_krefs.is_empty() {
        return Ok(HashMap::new());
    }
    let payload = serde_json::json!({
        "item_krefs": item_krefs,
        "tag": tag,
        "allow_partial": true,
    });
    let endpoint = self.url("/revisions/batch");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    let batch = checked
        .json::<BatchRevisionsResponse>()
        .await
        .map_err(|err| KumihoError::Decode(err.to_string()))?;
    let by_item = batch
        .revisions
        .into_iter()
        .map(|revision| (revision.item_kref.clone(), revision))
        .collect();
    Ok(by_item)
}
/// Lists the items in the `/{project}/Skills` space.
pub async fn list_skills(
    &self,
    project: &str,
    include_deprecated: bool,
) -> Result<Vec<ItemResponse>> {
    let skills_space = format!("/{project}/Skills");
    self.list_items(skills_space.as_str(), include_deprecated)
        .await
}
/// Searches skills in `project`, querying both the current and the legacy skill item kinds
/// through `search_items_with_legacy`.
pub async fn search_skills(
    &self,
    query: &str,
    project: &str,
    include_deprecated: bool,
) -> Result<Vec<SearchResult>> {
    let primary_kind = crate::skills::registration::SKILL_ITEM_KIND;
    let legacy_kind = crate::skills::registration::LEGACY_SKILL_ITEM_KIND;
    self.search_items_with_legacy(query, project, primary_kind, legacy_kind, include_deprecated)
        .await
}
/// Searches for items of kind `primary` and of kind `legacy`, returning the merged hits.
///
/// A failure of the `primary` search is returned as an error. A failure of the `legacy`
/// search is logged and treated as "no legacy hits". Primary hits come first, and a hit whose
/// item `kref` has already been seen is dropped.
pub async fn search_items_with_legacy(
    &self,
    query: &str,
    context: &str,
    primary: &str,
    legacy: &str,
    include_deprecated: bool,
) -> Result<Vec<SearchResult>> {
    let primary_results = self
        .search_items(query, context, primary, include_deprecated)
        .await?;
    let legacy_results = self
        .search_items(query, context, legacy, include_deprecated)
        .await
        .unwrap_or_else(|e| {
            tracing::warn!(
                primary = primary,
                legacy = legacy,
                context = context,
                error = ?e,
                "search_items_with_legacy: legacy-kind query failed; \
                 returning primary results only",
            );
            Vec::new()
        });

    let upper_bound = primary_results.len() + legacy_results.len();
    let mut seen_krefs: std::collections::HashSet<String> =
        std::collections::HashSet::with_capacity(upper_bound);
    let mut merged: Vec<SearchResult> = Vec::with_capacity(upper_bound);
    merged.extend(
        primary_results
            .into_iter()
            .chain(legacy_results)
            .filter(|hit| seen_krefs.insert(hit.item.kref.clone())),
    );
    Ok(merged)
}
pub async fn create_skill(
&self,
project: &str,
name: &str,
metadata: HashMap<String, String>,
) -> Result<(ItemResponse, RevisionResponse)> {
self.ensure_space(project, "Skills").await.ok();
let space_path = format!("/{project}/Skills");
let item = self
.create_item(
&space_path,
name,
crate::skills::registration::SKILL_ITEM_KIND,
HashMap::new(),
)
.await?;
let revision = self.create_revision(&item.kref, metadata).await?;
Ok((item, revision))
}
/// Sets the deprecation flag of a skill; delegates directly to `deprecate_item`.
pub async fn deprecate_skill(&self, kref: &str, deprecated: bool) -> Result<ItemResponse> {
    let outcome = self.deprecate_item(kref, deprecated).await;
    outcome
}
/// Creates a bundle via `POST /bundles` (single attempt) and returns the decoded record.
pub async fn create_bundle(
    &self,
    space_path: &str,
    bundle_name: &str,
    metadata: HashMap<String, String>,
) -> Result<ItemResponse> {
    let payload = CreateBundleBody {
        space_path: space_path.to_string(),
        bundle_name: bundle_name.to_string(),
        metadata,
    };
    let endpoint = self.url("/bundles");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ItemResponse>().await {
        Ok(bundle) => Ok(bundle),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Fetches a bundle via `GET /bundles/by-kref` (with retry).
pub async fn get_bundle(&self, kref: &str) -> Result<ItemResponse> {
    let endpoint = self.url("/bundles/by-kref");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ItemResponse>().await {
        Ok(bundle) => Ok(bundle),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Deletes a bundle via `DELETE /bundles/by-kref` (single attempt), always with `force=true`.
pub async fn delete_bundle(&self, kref: &str) -> Result<()> {
    let endpoint = self.url("/bundles/by-kref");
    let params = [("kref", kref), ("force", "true")];
    let request = || {
        self.client
            .delete(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_no_retry(request).await?;
    self.check_response(raw).await.map(|_| ())
}
/// Adds `item_kref` to `bundle_kref` via `POST /bundles/members/add` (single attempt) and
/// returns the raw JSON reply. Empty `metadata` is left out of the request body.
pub async fn add_bundle_member(
    &self,
    bundle_kref: &str,
    item_kref: &str,
    metadata: HashMap<String, String>,
) -> Result<serde_json::Value> {
    let has_metadata = !metadata.is_empty();
    let payload = BundleMemberBody {
        bundle_kref: bundle_kref.to_string(),
        item_kref: item_kref.to_string(),
        metadata: has_metadata.then_some(metadata),
    };
    let endpoint = self.url("/bundles/members/add");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<serde_json::Value>().await {
        Ok(value) => Ok(value),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Removes `item_kref` from `bundle_kref` via `POST /bundles/members/remove`
/// (single attempt) and returns the raw JSON reply.
pub async fn remove_bundle_member(
    &self,
    bundle_kref: &str,
    item_kref: &str,
) -> Result<serde_json::Value> {
    let payload = RemoveBundleMemberBody {
        bundle_kref: bundle_kref.to_string(),
        item_kref: item_kref.to_string(),
    };
    let endpoint = self.url("/bundles/members/remove");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<serde_json::Value>().await {
        Ok(value) => Ok(value),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists the members of `bundle_kref` via `GET /bundles/members` (with retry).
pub async fn list_bundle_members(&self, bundle_kref: &str) -> Result<BundleMembersResponse> {
    let endpoint = self.url("/bundles/members");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("bundle_kref", bundle_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<BundleMembersResponse>().await {
        Ok(members) => Ok(members),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Creates an edge via `POST /edges` (single attempt). `source_kref` and `target_kref` are
/// sent as `source_revision_kref` and `target_revision_kref` in the body.
pub async fn create_edge(
    &self,
    source_kref: &str,
    target_kref: &str,
    edge_type: &str,
    metadata: HashMap<String, String>,
) -> Result<EdgeResponse> {
    let payload = CreateEdgeBody {
        source_revision_kref: source_kref.to_string(),
        target_revision_kref: target_kref.to_string(),
        edge_type: edge_type.to_string(),
        metadata,
    };
    let endpoint = self.url("/edges");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<EdgeResponse>().await {
        Ok(edge) => Ok(edge),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists the edges of `revision_kref` via `GET /edges` (with retry).
///
/// `edge_type` and `direction` are only sent when provided. The direction names
/// `outgoing`/`out`, `incoming`/`in` and `both` are translated to `0`, `1` and `2`; any other
/// value is forwarded unchanged.
pub async fn list_edges(
    &self,
    revision_kref: &str,
    edge_type: Option<&str>,
    direction: Option<&str>,
) -> Result<Vec<EdgeResponse>> {
    let mut query_params: Vec<(&str, &str)> = Vec::with_capacity(3);
    query_params.push(("kref", revision_kref));
    if let Some(wanted_type) = edge_type {
        query_params.push(("edge_type", wanted_type));
    }
    if let Some(requested) = direction {
        let direction_code = match requested {
            "outgoing" | "out" => "0",
            "incoming" | "in" => "1",
            "both" => "2",
            other => other,
        };
        query_params.push(("direction", direction_code));
    }

    let endpoint = self.url("/edges");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&query_params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<EdgeResponse>>().await {
        Ok(edges) => Ok(edges),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Deletes an edge via `DELETE /edges` (single attempt), identified by its source kref,
/// target kref and edge type.
pub async fn delete_edge(
    &self,
    source_kref: &str,
    target_kref: &str,
    edge_type: &str,
) -> Result<()> {
    let endpoint = self.url("/edges");
    let params = [
        ("source_kref", source_kref),
        ("target_kref", target_kref),
        ("edge_type", edge_type),
    ];
    let request = || {
        self.client
            .delete(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_no_retry(request).await?;
    self.check_response(raw).await.map(|_| ())
}
/// Registers an artifact on `revision_kref` via `POST /artifacts` (single attempt).
pub async fn create_artifact(
    &self,
    revision_kref: &str,
    name: &str,
    location: &str,
    metadata: HashMap<String, String>,
) -> Result<ArtifactResponse> {
    let payload = CreateArtifactBody {
        revision_kref: revision_kref.to_string(),
        name: name.to_string(),
        location: location.to_string(),
        metadata,
    };
    let endpoint = self.url("/artifacts");
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .json(&payload)
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ArtifactResponse>().await {
        Ok(artifact) => Ok(artifact),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists the artifacts of `revision_kref` via `GET /artifacts` (with retry).
pub async fn get_artifacts(&self, revision_kref: &str) -> Result<Vec<ArtifactResponse>> {
    let endpoint = self.url("/artifacts");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("revision_kref", revision_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<Vec<ArtifactResponse>>().await {
        Ok(artifacts) => Ok(artifacts),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Fetches the artifact called `name` on `revision_kref` via `GET /artifacts/by-kref`
/// (with retry).
pub async fn get_artifact_by_name(
    &self,
    revision_kref: &str,
    name: &str,
) -> Result<ArtifactResponse> {
    let endpoint = self.url("/artifacts/by-kref");
    let params = [("revision_kref", revision_kref), ("name", name)];
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&params)
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ArtifactResponse>().await {
        Ok(artifact) => Ok(artifact),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Fetches an artifact by its own kref via `GET /artifacts/by-kref` (with retry).
pub async fn get_artifact(&self, artifact_kref: &str) -> Result<ArtifactResponse> {
    let endpoint = self.url("/artifacts/by-kref");
    let request = || {
        self.client
            .get(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", artifact_kref)])
    };
    let raw = self.send_with_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ArtifactResponse>().await {
        Ok(artifact) => Ok(artifact),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Sets the deprecation flag of an artifact via `POST /artifacts/deprecate` (single attempt).
pub async fn deprecate_artifact(
    &self,
    artifact_kref: &str,
    deprecated: bool,
) -> Result<ArtifactResponse> {
    let endpoint = self.url("/artifacts/deprecate");
    let deprecated_flag = if deprecated { "true" } else { "false" };
    let request = || {
        self.client
            .post(endpoint.as_str())
            .header("X-Kumiho-Token", &self.service_token)
            .query(&[("kref", artifact_kref), ("deprecated", deprecated_flag)])
    };
    let raw = self.send_no_retry(request).await?;
    let checked = self.check_response(raw).await?;
    match checked.json::<ArtifactResponse>().await {
        Ok(artifact) => Ok(artifact),
        Err(err) => Err(KumihoError::Decode(err.to_string())),
    }
}
/// Lists the items in `space_path`; delegates directly to `list_items`.
pub async fn list_teams_in(
    &self,
    space_path: &str,
    include_deprecated: bool,
) -> Result<Vec<ItemResponse>> {
    let teams = self.list_items(space_path, include_deprecated).await?;
    Ok(teams)
}
/// Sets the deprecation flag of a team item via `deprecate_item`, discarding the returned
/// record.
pub async fn deprecate_team(&self, kref: &str, deprecated: bool) -> Result<()> {
    self.deprecate_item(kref, deprecated).await.map(|_| ())
}
}
/// Tests for the Kumiho HTTP client: kref normalisation, retry behaviour for
/// GET vs. POST, and the mapping of upstream failures onto `KumihoError`
/// variants. HTTP traffic is served by a local wiremock `MockServer`.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use wiremock::matchers::{method, path, query_param};
use wiremock::{Mock, MockServer, Respond, ResponseTemplate};
/// Responder that hands out `responses` one per incoming request, in order,
/// and keeps returning the last entry once the list is exhausted.
struct CountingResponder {
/// Scripted replies, consumed front to back. Must not be empty.
responses: Vec<ResponseTemplate>,
/// Number of requests served so far; shared so a test can read it back.
counter: Arc<AtomicUsize>,
}
impl Respond for CountingResponder {
// Picks the reply for this request and bumps the shared counter.
// NOTE(review): an empty `responses` panics here (`len() - 1` / indexing).
fn respond(&self, _request: &wiremock::Request) -> ResponseTemplate {
let idx = self.counter.fetch_add(1, Ordering::SeqCst);
let last = self.responses.len() - 1;
// Clamp so requests beyond the script repeat the final template.
self.responses[idx.min(last)].clone()
}
}
/// Builds a client aimed at `base_url` with the fixed token `"test-token"`.
fn make_client(base_url: &str) -> KumihoClient {
KumihoClient::new(base_url.to_string(), "test-token".to_string())
}
/// `deprecate_item` must send the bare item kref: the mock below only
/// matches a `kref` query parameter without the `?r=2&a=SKILL.md` selectors.
#[tokio::test]
async fn deprecate_item_strips_revision_selectors_from_kref() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/v1/items/deprecate"))
.and(query_param("kref", "kref://Project/Skills/example.skill"))
.and(query_param("deprecated", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"kref": "kref://Project/Skills/example.skill",
"name": "example",
"item_name": "example",
"kind": "skill",
"deprecated": true,
"created_at": null,
"metadata": {}
})))
.mount(&server)
.await;
let client = make_client(&server.uri());
// The caller passes a kref that still carries revision/artifact selectors.
let item = client
.deprecate_item("kref://Project/Skills/example.skill?r=2&a=SKILL.md", true)
.await
.expect("deprecate item should use the base item kref");
assert!(item.deprecated);
assert_eq!(item.kref, "kref://Project/Skills/example.skill");
}
/// When the deprecate endpoint answers 404, `deprecate_item` is expected to
/// still succeed by issuing a `DELETE /items/by-kref` with `force=false`
/// and a `GET /items/by-kref`; each mock is declared with `.expect(1)`.
#[tokio::test]
async fn deprecate_item_falls_back_to_soft_delete_when_upstream_set_deprecated_404s() {
let server = MockServer::start().await;
// Item body served by the GET mock once the fallback has run.
let item = serde_json::json!({
"kref": "kref://Project/Skills/example.skill",
"name": "example",
"item_name": "example",
"kind": "skill",
"deprecated": true,
"created_at": null,
"metadata": {}
});
// Primary path: the deprecate endpoint reports 404.
Mock::given(method("POST"))
.and(path("/api/v1/items/deprecate"))
.and(query_param("kref", "kref://Project/Skills/example.skill"))
.and(query_param("deprecated", "true"))
.respond_with(ResponseTemplate::new(404).set_body_string("not found"))
.expect(1)
.mount(&server)
.await;
// Fallback: non-forced delete of the same item.
Mock::given(method("DELETE"))
.and(path("/api/v1/items/by-kref"))
.and(query_param("kref", "kref://Project/Skills/example.skill"))
.and(query_param("force", "false"))
.respond_with(ResponseTemplate::new(204))
.expect(1)
.mount(&server)
.await;
// Lookup of the item, answered with the deprecated representation above.
Mock::given(method("GET"))
.and(path("/api/v1/items/by-kref"))
.and(query_param("kref", "kref://Project/Skills/example.skill"))
.respond_with(ResponseTemplate::new(200).set_body_json(item))
.expect(1)
.mount(&server)
.await;
let client = make_client(&server.uri());
let item = client
.deprecate_item("kref://Project/Skills/example.skill", true)
.await
.expect("deprecate item should fall back to soft delete");
assert!(item.deprecated);
}
/// A GET that sees 502, 502, then 200 must succeed and must have contacted
/// the upstream exactly three times.
#[tokio::test]
async fn retries_on_502_then_succeeds_on_third_attempt() {
let server = MockServer::start().await;
let counter = Arc::new(AtomicUsize::new(0));
Mock::given(method("GET"))
.and(path("/api/v1/spaces"))
.respond_with(CountingResponder {
responses: vec![
ResponseTemplate::new(502).set_body_string("<html>boom</html>"),
ResponseTemplate::new(502).set_body_string("<html>boom</html>"),
ResponseTemplate::new(200).set_body_json(serde_json::json!([])),
],
counter: counter.clone(),
})
.mount(&server)
.await;
let client = make_client(&server.uri());
let result = client.list_spaces("/foo", false).await;
assert!(result.is_ok(), "expected Ok after retries, got {result:?}");
assert_eq!(
counter.load(Ordering::SeqCst),
3,
"should have hit upstream 3x"
);
}
/// A GET that only ever sees 502 must end in
/// `KumihoError::UpstreamUnavailable { status: 502, attempts: 3 }` (not
/// `Api`) after three requests.
#[tokio::test]
async fn three_502s_returns_upstream_unavailable_not_api() {
let server = MockServer::start().await;
let counter = Arc::new(AtomicUsize::new(0));
Mock::given(method("GET"))
.and(path("/api/v1/spaces"))
.respond_with(CountingResponder {
// Single scripted reply: the responder repeats it for every request.
responses: vec![
ResponseTemplate::new(502)
.insert_header("content-type", "text/html")
.set_body_string("<!DOCTYPE html><html>cloudflare</html>"),
],
counter: counter.clone(),
})
.mount(&server)
.await;
let client = make_client(&server.uri());
let err = client
.list_spaces("/foo", false)
.await
.expect_err("must fail after 3 attempts");
match err {
KumihoError::UpstreamUnavailable { status, attempts } => {
assert_eq!(status, 502);
assert_eq!(attempts, 3);
}
other => panic!("expected UpstreamUnavailable, got {other:?}"),
}
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
/// A 400 reply must surface as `KumihoError::Api` with the status and body
/// intact, after a single request (no retry).
#[tokio::test]
async fn non_retryable_4xx_returns_api_immediately() {
let server = MockServer::start().await;
let counter = Arc::new(AtomicUsize::new(0));
Mock::given(method("GET"))
.and(path("/api/v1/spaces"))
.respond_with(CountingResponder {
responses: vec![ResponseTemplate::new(400).set_body_string("bad request")],
counter: counter.clone(),
})
.mount(&server)
.await;
let client = make_client(&server.uri());
let err = client
.list_spaces("/foo", false)
.await
.expect_err("400 must surface");
match err {
KumihoError::Api { status, body } => {
assert_eq!(status, 400);
assert_eq!(body, "bad request");
}
other => panic!("expected Api, got {other:?}"),
}
assert_eq!(counter.load(Ordering::SeqCst), 1, "no retry on 4xx");
}
/// A large HTML page returned with a 404 must not be copied into the
/// error: the `Api` body may contain neither `<html` nor `<!doctype` and
/// must stay under 200 bytes.
#[tokio::test]
async fn html_body_on_non_retryable_status_is_trimmed() {
let server = MockServer::start().await;
// Roughly 1.6 KB of HTML so any untrimmed copy is clearly over the limit.
let big_html = format!(
"<!doctype html><html><body>{}</body></html>",
"padding ".repeat(200) );
Mock::given(method("GET"))
.and(path("/api/v1/spaces"))
.respond_with(
ResponseTemplate::new(404)
.insert_header("content-type", "text/html; charset=utf-8")
.set_body_string(big_html.clone()),
)
.mount(&server)
.await;
let client = make_client(&server.uri());
let err = client
.list_spaces("/foo", false)
.await
.expect_err("404 must surface");
match err {
KumihoError::Api { status, body } => {
assert_eq!(status, 404);
assert!(
!body.contains("<html") && !body.contains("<!doctype"),
"HTML body leaked into error: {body}"
);
assert!(
body.len() < 200,
"trimmed body should be a short placeholder, got {} bytes",
body.len()
);
}
other => panic!("expected Api, got {other:?}"),
}
}
/// A request to an address that refuses connections must end in
/// `KumihoError::Unreachable`.
#[tokio::test]
async fn connection_refused_eventually_returns_unreachable() {
// Loopback port 1: presumably nothing listens there on the test host.
let client = make_client("http://127.0.0.1:1"); let err = client
.list_spaces("/foo", false)
.await
.expect_err("connection must fail");
assert!(
matches!(err, KumihoError::Unreachable(_)),
"expected Unreachable, got {err:?}"
);
}
/// Pins which status codes `is_retryable_status` accepts: 502, 503, 504,
/// 520, 522 and 524 retry; 200, 4xx samples, 500, 501 and 505 do not.
#[test]
fn is_retryable_status_covers_gateway_codes() {
for s in [502, 503, 504, 520, 522, 524] {
assert!(is_retryable_status(s), "{s} should retry");
}
for s in [200, 400, 401, 404, 409, 500, 501, 505] {
assert!(!is_retryable_status(s), "{s} should NOT retry");
}
}
/// Pins `looks_like_html_body` on representative inputs. The second
/// argument looks like an optional Content-Type header value.
#[test]
fn looks_like_html_body_detects_common_shapes() {
// Detected from the body alone: doctype/html tags, any case, leading space.
assert!(looks_like_html_body("<!DOCTYPE html><html>", None));
assert!(looks_like_html_body("<!doctype html>", None));
assert!(looks_like_html_body("<html><body>x</body></html>", None));
assert!(looks_like_html_body(" <HTML>", None));
// A `text/html` type marks the body as HTML even when it looks like JSON.
assert!(looks_like_html_body(
"{\"ok\":true}",
Some("text/html; charset=utf-8")
));
assert!(!looks_like_html_body(
"{\"error\":\"x\"}",
Some("application/json")
));
assert!(!looks_like_html_body("plain text", None));
}
/// `item_kref_without_selectors` drops everything from the first `?` and
/// returns a kref without a query part unchanged.
#[test]
fn item_kref_without_selectors_drops_revision_and_artifact_query() {
assert_eq!(
item_kref_without_selectors("kref://Project/Skills/example.skill?r=2&a=SKILL.md"),
"kref://Project/Skills/example.skill"
);
assert_eq!(
item_kref_without_selectors("kref://Project/Skills/example.skill"),
"kref://Project/Skills/example.skill"
);
}
/// With an upstream that delays every reply by 10 s, the whole call must
/// fail within 15.5 s, as either `Unreachable` or `UpstreamUnavailable`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hung_upstream_respects_total_budget() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/api/v1/spaces"))
.respond_with(
ResponseTemplate::new(502)
.set_body_string("hang")
.set_delay(Duration::from_secs(10)),
)
.mount(&server)
.await;
let client = make_client(&server.uri());
// Wall-clock time of the whole call, including any retries.
let started = Instant::now();
let err = client
.list_spaces("/foo", false)
.await
.expect_err("hung upstream must fail");
let elapsed = started.elapsed();
assert!(
elapsed <= Duration::from_millis(15_500),
"retries blew past budget: elapsed={elapsed:?}"
);
assert!(
matches!(
err,
KumihoError::Unreachable(_) | KumihoError::UpstreamUnavailable { .. }
),
"expected Unreachable / UpstreamUnavailable, got {err:?}"
);
}
/// A POST that receives 502 must not be retried: the error is
/// `UpstreamUnavailable` with `attempts == 1` and the upstream sees exactly
/// one request.
#[tokio::test]
async fn post_502_does_not_retry_returns_upstream_unavailable() {
let server = MockServer::start().await;
let counter = Arc::new(AtomicUsize::new(0));
Mock::given(method("POST"))
.and(path("/api/v1/items"))
.respond_with(CountingResponder {
responses: vec![
ResponseTemplate::new(502)
.insert_header("content-type", "text/html")
.set_body_string("<!DOCTYPE html><html>cloudflare</html>"),
],
counter: counter.clone(),
})
.mount(&server)
.await;
let client = make_client(&server.uri());
let err = client
.create_item("/foo", "item", "kind", HashMap::new())
.await
.expect_err("POST 502 must surface");
match err {
KumihoError::UpstreamUnavailable { status, attempts } => {
assert_eq!(status, 502);
assert_eq!(
attempts, 1,
"POST must not retry (idempotency-key not honoured by Kumiho)"
);
}
other => panic!("expected UpstreamUnavailable, got {other:?}"),
}
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"POST must hit upstream exactly once"
);
}
/// GET counterpart of the POST test above: a GET on `/api/v1/items` that
/// always receives 502 must reach the upstream three times.
#[tokio::test]
async fn get_502_retries_three_times() {
let server = MockServer::start().await;
let counter = Arc::new(AtomicUsize::new(0));
Mock::given(method("GET"))
.and(path("/api/v1/items"))
.respond_with(CountingResponder {
responses: vec![ResponseTemplate::new(502).set_body_string("<html>x</html>")],
counter: counter.clone(),
})
.mount(&server)
.await;
let client = make_client(&server.uri());
// NOTE(review): only the request count is checked; the error variant is ignored.
let _ = client
.list_items("/foo", false)
.await
.expect_err("3 retries");
assert_eq!(counter.load(Ordering::SeqCst), 3, "GET must retry 3x");
}
}