Skip to main content

brokkr_client/
wrapper.rs

1/*
2 * Copyright (c) 2025-2026 Dylan Storey
3 * Licensed under the Elastic License 2.0.
4 * See LICENSE file in the project root for full license text.
5 */
6
7//! Ergonomic wrapper around the progenitor-generated [`crate::Client`].
8//!
9//! The generated [`Client`] is a faithful 1:1 of the OpenAPI spec — every
10//! operation is a per-call builder, auth is supplied via the underlying
11//! `reqwest::Client`, and errors come back as `progenitor_client::Error<E>`.
12//! That surface is correct but verbose. This module adds:
13//!
14//! - A single-credential constructor that injects the `Authorization` header
15//!   on every request. Hides the fact that the spec declares three security
16//!   schemes (`admin_pak`, `agent_pak`, `generator_pak`) — they all map to
17//!   the same header and the broker disambiguates at runtime.
18//! - A typed [`BrokkrError`] that wraps the generated error enum and exposes
19//!   the `code` string from [`crate::types::ErrorResponse`] for pattern
20//!   matching.
21//! - A [`BrokkrClient::retry`] helper that re-invokes a fallible operation
22//!   with exponential backoff on transient failures. Retry is opt-in —
23//!   callers wrap individual ops they consider safe to retry.
24//!
25//! Pagination iterators are intentionally not provided: the v1 broker API
26//! returns full collections without cursor tokens (per the audit). If
27//! pagination is introduced later, add `Stream` adapters here.
28//!
29//! The module is intentionally small. If it grows past a few hundred lines,
30//! that is a signal to push complexity back into the spec rather than the
31//! wrapper.
32
33use std::time::Duration;
34
35use progenitor_client::Error as RawError;
36use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
37
38use crate::Client;
39use crate::types::{
40    CreateDeploymentObjectRequest, DeploymentObject, ErrorResponse, K8sEventHistoryResponse,
41    NewStack, PodLogHistoryResponse, Stack, WsConnectionsResponse,
42};
43use chrono::{DateTime, Utc};
44use std::path::Path;
45use uuid::Uuid;
46
47/// Top-level error returned by every wrapper method. Mirrors
48/// [`progenitor_client::Error`] but specialises `E` to [`ErrorResponse`] so
49/// callers can match on [`ErrorResponse::code`] directly.
50#[derive(Debug)]
51pub enum BrokkrError {
52    /// A documented 4xx/5xx response body. Match on `.code` for stable
53    /// machine-readable error categorisation (e.g. `agent_not_found`).
54    Api(ErrorResponse, reqwest::StatusCode),
55    /// Local or transport error (DNS, TLS, timeout, connection reset, etc).
56    Transport(reqwest::Error),
57    /// Server returned a response shape that did not match the spec. Usually
58    /// a sign of spec drift; investigate with the raw bytes attached.
59    UnexpectedResponse {
60        status: Option<reqwest::StatusCode>,
61        detail: String,
62    },
63    /// Request rejected before transmission (bad input).
64    InvalidRequest(String),
65}
66
67impl BrokkrError {
68    /// HTTP status, when known.
69    pub fn status(&self) -> Option<reqwest::StatusCode> {
70        match self {
71            Self::Api(_, status) => Some(*status),
72            Self::Transport(e) => e.status(),
73            Self::UnexpectedResponse { status, .. } => *status,
74            Self::InvalidRequest(_) => None,
75        }
76    }
77
78    /// Stable, machine-readable error code from the wire response, if any.
79    /// Pattern-match on this rather than the human-readable message.
80    pub fn code(&self) -> Option<&str> {
81        match self {
82            Self::Api(body, _) => Some(&body.code),
83            _ => None,
84        }
85    }
86
87    /// Whether this error is appropriate to retry. Mirrors
88    /// [`progenitor_client::Error::is_retryable`]: transport errors and
89    /// 408/429/502/503/504 responses qualify.
90    pub fn is_retryable(&self) -> bool {
91        match self {
92            Self::Transport(_) => true,
93            Self::Api(_, status) => is_retryable_status(*status),
94            Self::UnexpectedResponse {
95                status: Some(status),
96                ..
97            } => is_retryable_status(*status),
98            _ => false,
99        }
100    }
101}
102
103impl std::fmt::Display for BrokkrError {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        match self {
106            Self::Api(body, status) => {
107                write!(f, "{} {}: {}", status.as_u16(), body.code, body.message)
108            }
109            Self::Transport(e) => write!(f, "transport error: {e}"),
110            Self::UnexpectedResponse { status, detail } => match status {
111                Some(s) => write!(f, "unexpected response ({}): {}", s.as_u16(), detail),
112                None => write!(f, "unexpected response: {detail}"),
113            },
114            Self::InvalidRequest(msg) => write!(f, "invalid request: {msg}"),
115        }
116    }
117}
118
119impl std::error::Error for BrokkrError {}
120
121impl From<RawError<ErrorResponse>> for BrokkrError {
122    fn from(err: RawError<ErrorResponse>) -> Self {
123        match err {
124            RawError::ErrorResponse(rv) => {
125                let status = rv.status();
126                Self::Api(rv.into_inner(), status)
127            }
128            RawError::CommunicationError(e)
129            | RawError::InvalidUpgrade(e)
130            | RawError::ResponseBodyError(e) => Self::Transport(e),
131            RawError::InvalidRequest(msg) => Self::InvalidRequest(msg),
132            RawError::InvalidResponsePayload(bytes, e) => Self::UnexpectedResponse {
133                status: None,
134                detail: format!(
135                    "payload deserialization failed: {e} ({} bytes)",
136                    bytes.len()
137                ),
138            },
139            RawError::UnexpectedResponse(resp) => Self::UnexpectedResponse {
140                status: Some(resp.status()),
141                detail: "response not described in OpenAPI spec".to_string(),
142            },
143            RawError::Custom(s) => Self::InvalidRequest(s),
144        }
145    }
146}
147
148fn is_retryable_status(status: reqwest::StatusCode) -> bool {
149    matches!(status.as_u16(), 408 | 429 | 502 | 503 | 504)
150}
151
152/// Builder for [`BrokkrClient`]. Use [`BrokkrClient::builder`] to start.
153#[derive(Debug)]
154pub struct BrokkrClientBuilder {
155    base_url: String,
156    token: Option<String>,
157    request_timeout: Duration,
158    connect_timeout: Duration,
159    max_retries: u32,
160    initial_backoff: Duration,
161}
162
163impl BrokkrClientBuilder {
164    fn new(base_url: impl Into<String>) -> Self {
165        Self {
166            base_url: base_url.into(),
167            token: None,
168            request_timeout: Duration::from_secs(30),
169            connect_timeout: Duration::from_secs(10),
170            max_retries: 3,
171            initial_backoff: Duration::from_millis(200),
172        }
173    }
174
175    /// PAK credential (admin, agent, or generator). The wrapper sends this as
176    /// the `Authorization` header on every request; the broker inspects the
177    /// PAK prefix to determine which security scheme applies.
178    pub fn token(mut self, token: impl Into<String>) -> Self {
179        self.token = Some(token.into());
180        self
181    }
182
183    /// Total per-request timeout. Default: 30 seconds.
184    pub fn request_timeout(mut self, timeout: Duration) -> Self {
185        self.request_timeout = timeout;
186        self
187    }
188
189    /// TCP connect timeout. Default: 10 seconds.
190    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
191        self.connect_timeout = timeout;
192        self
193    }
194
195    /// Maximum retry attempts for [`BrokkrClient::retry`]. Default: 3.
196    /// A value of 0 disables retry; the first attempt always runs.
197    pub fn max_retries(mut self, max: u32) -> Self {
198        self.max_retries = max;
199        self
200    }
201
202    /// Initial backoff between retry attempts. Doubles on each subsequent
203    /// failure (capped at 10s). Default: 200ms.
204    pub fn initial_backoff(mut self, initial: Duration) -> Self {
205        self.initial_backoff = initial;
206        self
207    }
208
209    pub fn build(self) -> Result<BrokkrClient, BrokkrError> {
210        let mut headers = HeaderMap::new();
211        if let Some(token) = &self.token {
212            let value = HeaderValue::from_str(token).map_err(|e| {
213                BrokkrError::InvalidRequest(format!("invalid token header value: {e}"))
214            })?;
215            headers.insert(AUTHORIZATION, value);
216        }
217
218        let reqwest_client = reqwest::Client::builder()
219            .default_headers(headers)
220            .connect_timeout(self.connect_timeout)
221            .timeout(self.request_timeout)
222            .build()
223            .map_err(BrokkrError::Transport)?;
224
225        let inner = Client::new_with_client(&self.base_url, reqwest_client);
226        Ok(BrokkrClient {
227            inner,
228            max_retries: self.max_retries,
229            initial_backoff: self.initial_backoff,
230        })
231    }
232}
233
234/// Ergonomic client for the Brokkr broker API.
235///
236/// Construct via [`BrokkrClient::builder`]. The wrapper holds a configured
237/// [`Client`] (the generated low-level client) and a retry policy. Access the
238/// generated builders through [`BrokkrClient::api`].
239#[derive(Debug, Clone)]
240pub struct BrokkrClient {
241    inner: Client,
242    max_retries: u32,
243    initial_backoff: Duration,
244}
245
246impl BrokkrClient {
247    /// Start building a client. `base_url` should include the version prefix
248    /// (e.g. `https://broker.example.com/api/v1`).
249    pub fn builder(base_url: impl Into<String>) -> BrokkrClientBuilder {
250        BrokkrClientBuilder::new(base_url)
251    }
252
253    /// Access the underlying generated client. Every spec operation is
254    /// available as a builder method on it: e.g.
255    /// `client.api().list_agents().send().await`.
256    pub fn api(&self) -> &Client {
257        &self.inner
258    }
259
260    // -------------------------------------------------------------------
261    // Ergonomic methods for the internal-WS-channel surface
262    // (BROKKR-I-0019). These wrap the generated builders so callers can
263    // skip the `.api().<op>().<param>().<param>().send().await` chain
264    // for the most common cases. The retention metadata is part of the
265    // typed response — callers should surface it (or at least not hide
266    // it) per ADR-0008 / project_log_retention_stance.
267    // -------------------------------------------------------------------
268
269    /// Paginated kube-event history for a stack, scoped to the 6h
270    /// retention window. The response carries the `retention` block —
271    /// surface it in any UI built on this SDK so users aren't surprised
272    /// by missing rows.
273    pub async fn list_telemetry_events(
274        &self,
275        stack_id: Uuid,
276        since: Option<DateTime<Utc>>,
277        limit: Option<i64>,
278    ) -> Result<K8sEventHistoryResponse, BrokkrError> {
279        let mut req = self.inner.list_telemetry_events().id(stack_id);
280        if let Some(since) = since {
281            req = req.since(since);
282        }
283        if let Some(limit) = limit {
284            req = req.limit(limit);
285        }
286        let resp = req.send().await?;
287        Ok(resp.into_inner())
288    }
289
290    /// Paginated pod-log history for a stack within the 6h retention
291    /// window. See [`Self::list_telemetry_events`] for retention
292    /// semantics — same response shape modulo the row type.
293    pub async fn list_telemetry_logs(
294        &self,
295        stack_id: Uuid,
296        since: Option<DateTime<Utc>>,
297        limit: Option<i64>,
298    ) -> Result<PodLogHistoryResponse, BrokkrError> {
299        let mut req = self.inner.list_telemetry_logs().id(stack_id);
300        if let Some(since) = since {
301            req = req.since(since);
302        }
303        if let Some(limit) = limit {
304            req = req.limit(limit);
305        }
306        let resp = req.send().await?;
307        Ok(resp.into_inner())
308    }
309
310    /// Snapshot of currently-connected agents on the internal WS
311    /// channel (admin-only). Useful for fleet diagnostics — for
312    /// continuous monitoring prefer scraping the
313    /// `brokkr_ws_connected_agents` Prometheus gauge.
314    pub async fn list_ws_connections(&self) -> Result<WsConnectionsResponse, BrokkrError> {
315        let resp = self.inner.list_ws_connections().send().await?;
316        Ok(resp.into_inner())
317    }
318
319    // -------------------------------------------------------------------
320    // Manifest submission helpers (BROKKR-I-0021). The control-plane
321    // friction is taking a folder of Kubernetes manifests and getting it
322    // submitted as a stack's desired state. These read the folder,
323    // concatenate it into one multi-document YAML stream, validate it, and
324    // submit — so callers hand over a directory, not a hand-built blob.
325    // "1 stack = 1 rendered bundle"; the latest deployment object is the
326    // stack's desired state, and the agent reconciles + prunes.
327    // -------------------------------------------------------------------
328
329    /// Read a folder (or file/list of files) of `*.yaml`/`*.yml` manifests,
330    /// concatenate them into one multi-document stream, validate that each
331    /// document parses and carries `apiVersion`+`kind`, and submit it as a
332    /// new deployment object on an existing stack.
333    ///
334    /// Files in a directory are concatenated in sorted-name order. Ordering
335    /// is forgiving: the agent front-loads `Namespace`/`CustomResourceDefinition`
336    /// objects during apply.
337    pub async fn submit_manifests(
338        &self,
339        stack_id: Uuid,
340        path: impl AsRef<Path>,
341    ) -> Result<DeploymentObject, BrokkrError> {
342        let yaml_content = read_manifests(path.as_ref())?;
343        let resp = self
344            .inner
345            .create_deployment_object()
346            .id(stack_id)
347            .body(CreateDeploymentObjectRequest {
348                yaml_content,
349                is_deletion_marker: Some(false),
350            })
351            .send()
352            .await?;
353        Ok(resp.into_inner())
354    }
355
356    /// Idempotently make a folder of manifests the desired state of the stack
357    /// named `stack_name`, creating the stack if it does not exist and
358    /// applying any `targeting` labels (for fan-out). A new revision is
359    /// submitted only when the bundle differs from the stack's current latest
360    /// deployment object, so this drops straight into a reconcile loop.
361    ///
362    /// Requires a generator PAK (the new stack is owned by that generator);
363    /// admin callers should create the stack explicitly and use
364    /// [`Self::submit_manifests`].
365    pub async fn apply(
366        &self,
367        stack_name: &str,
368        path: impl AsRef<Path>,
369        targeting: &[String],
370    ) -> Result<ApplyOutcome, BrokkrError> {
371        let yaml_content = read_manifests(path.as_ref())?;
372        let checksum = sha256_hex(&yaml_content);
373
374        // Resolve the caller's generator identity (needed to own a new stack).
375        let auth = self.inner.verify_pak().send().await?.into_inner();
376        let generator_id = auth
377            .generator
378            .ok_or_else(|| {
379                BrokkrError::InvalidRequest(
380                    "apply by name requires a generator PAK; admin callers should create the \
381                     stack explicitly and use submit_manifests"
382                        .to_string(),
383                )
384            })
385            .and_then(|g| {
386                Uuid::parse_str(&g).map_err(|e| BrokkrError::UnexpectedResponse {
387                    status: None,
388                    detail: format!("auth response generator id is not a UUID: {e}"),
389                })
390            })?;
391
392        // Find-or-create the stack by name.
393        let stacks: Vec<Stack> = self.inner.list_stacks().send().await?.into_inner();
394        let stack = match stacks.into_iter().find(|s| s.name == stack_name) {
395            Some(s) => s,
396            None => self
397                .inner
398                .create_stack()
399                .body(NewStack {
400                    name: stack_name.to_string(),
401                    generator_id,
402                    description: None,
403                })
404                .send()
405                .await?
406                .into_inner(),
407        };
408
409        // Apply targeting labels; a label that already exists is not an error.
410        for label in targeting {
411            if let Err(e) = self
412                .inner
413                .stacks_add_label()
414                .id(stack.id)
415                .body(label.clone())
416                .send()
417                .await
418            {
419                let err = BrokkrError::from(e);
420                if err.status() != Some(reqwest::StatusCode::CONFLICT) {
421                    return Err(err);
422                }
423            }
424        }
425
426        // Idempotency: skip submission when the latest bundle already matches.
427        let objects: Vec<DeploymentObject> = self
428            .inner
429            .list_deployment_objects()
430            .id(stack.id)
431            .send()
432            .await?
433            .into_inner();
434        let had_prior = !objects.is_empty();
435        let already_current = objects
436            .iter()
437            .max_by_key(|o| o.sequence_id)
438            .map(|latest| latest.yaml_checksum == checksum)
439            .unwrap_or(false);
440        if already_current {
441            return Ok(ApplyOutcome::Unchanged);
442        }
443
444        let object = self
445            .inner
446            .create_deployment_object()
447            .id(stack.id)
448            .body(CreateDeploymentObjectRequest {
449                yaml_content,
450                is_deletion_marker: Some(false),
451            })
452            .send()
453            .await?
454            .into_inner();
455
456        Ok(if had_prior {
457            ApplyOutcome::Updated(object)
458        } else {
459            ApplyOutcome::Created(object)
460        })
461    }
462
463    /// Run `op` with exponential backoff on retryable errors.
464    ///
465    /// The closure is invoked at most `max_retries + 1` times (configured via
466    /// [`BrokkrClientBuilder::max_retries`]). Between attempts, the wrapper
467    /// sleeps for `initial_backoff * 2^(attempt - 1)`, capped at 10 seconds.
468    /// Non-retryable errors return immediately on the first attempt.
469    ///
470    /// Callers are responsible for choosing safe operations to retry. POSTs
471    /// that are not idempotent should generally not be wrapped.
472    pub async fn retry<F, Fut, T>(&self, mut op: F) -> Result<T, BrokkrError>
473    where
474        F: FnMut(&Client) -> Fut,
475        Fut: std::future::Future<Output = Result<T, BrokkrError>>,
476    {
477        let mut attempt: u32 = 0;
478        loop {
479            match op(&self.inner).await {
480                Ok(value) => return Ok(value),
481                Err(err) if !err.is_retryable() || attempt >= self.max_retries => {
482                    return Err(err);
483                }
484                Err(_) => {
485                    let backoff = self
486                        .initial_backoff
487                        .saturating_mul(1u32 << attempt)
488                        .min(Duration::from_secs(10));
489                    tokio::time::sleep(backoff).await;
490                    attempt += 1;
491                }
492            }
493        }
494    }
495}
496
497/// Outcome of [`BrokkrClient::apply`].
498#[derive(Debug)]
499pub enum ApplyOutcome {
500    /// The stack had no prior deployment object; this bundle is its first.
501    Created(DeploymentObject),
502    /// A new revision was submitted (the bundle differed from the latest).
503    Updated(DeploymentObject),
504    /// The stack's latest bundle already matched; nothing was submitted.
505    Unchanged,
506}
507
508/// Read a manifest path into one validated multi-document YAML stream.
509///
510/// `path` may be a single file or a directory; for a directory, top-level
511/// `*.yaml`/`*.yml` files are concatenated in sorted-name order. Each
512/// document must parse and carry `apiVersion` and `kind`.
513fn read_manifests(path: &Path) -> Result<String, BrokkrError> {
514    let files = collect_manifest_files(path)?;
515    if files.is_empty() {
516        return Err(BrokkrError::InvalidRequest(format!(
517            "no .yaml/.yml manifests found in {}",
518            path.display()
519        )));
520    }
521    let mut parts: Vec<String> = Vec::with_capacity(files.len());
522    for file in &files {
523        let content = std::fs::read_to_string(file).map_err(|e| {
524            BrokkrError::InvalidRequest(format!("cannot read {}: {e}", file.display()))
525        })?;
526        validate_manifest_documents(&content, file)?;
527        parts.push(content.trim_end().to_string());
528    }
529    Ok(format!("{}\n", parts.join("\n---\n")))
530}
531
532/// Resolve a manifest path to the concrete list of files to read.
533fn collect_manifest_files(path: &Path) -> Result<Vec<std::path::PathBuf>, BrokkrError> {
534    if path.is_file() {
535        return Ok(vec![path.to_path_buf()]);
536    }
537    if !path.is_dir() {
538        return Err(BrokkrError::InvalidRequest(format!(
539            "path not found: {}",
540            path.display()
541        )));
542    }
543    let mut files: Vec<std::path::PathBuf> = std::fs::read_dir(path)
544        .map_err(|e| {
545            BrokkrError::InvalidRequest(format!("cannot read directory {}: {e}", path.display()))
546        })?
547        .filter_map(|entry| entry.ok().map(|e| e.path()))
548        .filter(|p| {
549            p.is_file()
550                && matches!(
551                    p.extension().and_then(|s| s.to_str()),
552                    Some("yaml") | Some("yml")
553                )
554        })
555        .collect();
556    files.sort();
557    Ok(files)
558}
559
560/// Validate that every non-empty document in `content` parses and carries
561/// `apiVersion` and `kind`.
562fn validate_manifest_documents(content: &str, file: &Path) -> Result<(), BrokkrError> {
563    use serde::Deserialize;
564    for doc in serde_yaml::Deserializer::from_str(content) {
565        let value = serde_yaml::Value::deserialize(doc).map_err(|e| {
566            BrokkrError::InvalidRequest(format!("{}: invalid YAML: {e}", file.display()))
567        })?;
568        if value.is_null() {
569            continue;
570        }
571        let has = |key: &str| value.get(key).and_then(|v| v.as_str()).is_some();
572        if !has("apiVersion") || !has("kind") {
573            return Err(BrokkrError::InvalidRequest(format!(
574                "{}: every manifest document must have apiVersion and kind",
575                file.display()
576            )));
577        }
578    }
579    Ok(())
580}
581
582/// Lowercase hex SHA-256, matching the broker's deployment-object checksum so
583/// [`BrokkrClient::apply`] can detect an unchanged bundle.
584fn sha256_hex(content: &str) -> String {
585    use sha2::{Digest, Sha256};
586    let mut hasher = Sha256::new();
587    hasher.update(content.as_bytes());
588    format!("{:x}", hasher.finalize())
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594
595    #[test]
596    fn builder_constructs_without_token() {
597        use progenitor_client::ClientInfo;
598        let c = BrokkrClient::builder("http://localhost:3000/api/v1")
599            .build()
600            .expect("builder should succeed");
601        assert_eq!(c.api().baseurl(), "http://localhost:3000/api/v1");
602    }
603
604    #[test]
605    fn builder_accepts_token_and_timeouts() {
606        let c = BrokkrClient::builder("http://localhost:3000/api/v1")
607            .token("bk_admin_test_token")
608            .request_timeout(Duration::from_secs(5))
609            .connect_timeout(Duration::from_secs(2))
610            .max_retries(5)
611            .initial_backoff(Duration::from_millis(50))
612            .build()
613            .expect("builder should succeed");
614        assert_eq!(c.max_retries, 5);
615        assert_eq!(c.initial_backoff, Duration::from_millis(50));
616    }
617
618    #[test]
619    fn invalid_token_header_is_rejected() {
620        let result = BrokkrClient::builder("http://localhost:3000/api/v1")
621            .token("invalid\nheader\rvalue")
622            .build();
623        assert!(matches!(result, Err(BrokkrError::InvalidRequest(_))));
624    }
625
626    #[test]
627    fn error_code_extracted_from_api_response() {
628        let err = BrokkrError::Api(
629            ErrorResponse {
630                code: "agent_not_found".to_string(),
631                message: "agent not found".to_string(),
632                details: None,
633            },
634            reqwest::StatusCode::NOT_FOUND,
635        );
636        assert_eq!(err.code(), Some("agent_not_found"));
637        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));
638        assert!(!err.is_retryable());
639    }
640
641    #[test]
642    fn retryable_classification() {
643        for status in [408u16, 429, 502, 503, 504] {
644            let err = BrokkrError::Api(
645                ErrorResponse {
646                    code: "transient".to_string(),
647                    message: "x".to_string(),
648                    details: None,
649                },
650                reqwest::StatusCode::from_u16(status).unwrap(),
651            );
652            assert!(err.is_retryable(), "{status} should be retryable");
653        }
654        for status in [400u16, 401, 403, 404, 409, 422, 500, 501] {
655            let err = BrokkrError::Api(
656                ErrorResponse {
657                    code: "non_transient".to_string(),
658                    message: "x".to_string(),
659                    details: None,
660                },
661                reqwest::StatusCode::from_u16(status).unwrap(),
662            );
663            assert!(!err.is_retryable(), "{status} should NOT be retryable");
664        }
665    }
666
667    #[tokio::test(start_paused = true)]
668    async fn retry_stops_after_max_attempts() {
669        let client = BrokkrClient::builder("http://localhost:3000/api/v1")
670            .max_retries(2)
671            .initial_backoff(Duration::from_millis(1))
672            .build()
673            .unwrap();
674        let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
675        let calls_clone = calls.clone();
676        let result: Result<(), BrokkrError> = client
677            .retry(|_| {
678                let calls = calls_clone.clone();
679                async move {
680                    calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
681                    // A retryable error per is_retryable_status (503).
682                    Err(BrokkrError::Api(
683                        ErrorResponse {
684                            code: "transient".to_string(),
685                            message: "service unavailable".to_string(),
686                            details: None,
687                        },
688                        reqwest::StatusCode::SERVICE_UNAVAILABLE,
689                    ))
690                }
691            })
692            .await;
693        assert!(result.is_err());
694        // Initial attempt + 2 retries = 3 calls total.
695        assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 3);
696    }
697
698    // -----------------------------------------------------------------
699    // WS-10 / WS-13 ergonomic-method surface (BROKKR-I-0019).
700    //
701    // We don't run a real broker here; the contract under test is that
702    // the method exists with the right signature and returns the
703    // declared response type. End-to-end coverage is in
704    // `crates/brokkr-broker/tests/integration/api/ws.rs`.
705    // -----------------------------------------------------------------
706    #[test]
707    fn ws_wrapper_methods_compile_with_expected_signatures() {
708        fn _assert_signatures() {
709            async fn _types_check() {
710                let c = BrokkrClient::builder("http://localhost:3000/api/v1")
711                    .build()
712                    .unwrap();
713                let id = uuid::Uuid::nil();
714                let _ev: K8sEventHistoryResponse =
715                    c.list_telemetry_events(id, None, None).await.unwrap();
716                let _lo: PodLogHistoryResponse =
717                    c.list_telemetry_logs(id, None, Some(100)).await.unwrap();
718                let _co: WsConnectionsResponse = c.list_ws_connections().await.unwrap();
719            }
720            let _ = _types_check;
721        }
722    }
723
724    #[tokio::test(start_paused = true)]
725    async fn retry_returns_immediately_on_non_retryable() {
726        let client = BrokkrClient::builder("http://localhost:3000/api/v1")
727            .max_retries(5)
728            .build()
729            .unwrap();
730        let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
731        let calls_clone = calls.clone();
732        let result: Result<(), BrokkrError> = client
733            .retry(|_| {
734                let calls = calls_clone.clone();
735                async move {
736                    calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
737                    Err(BrokkrError::Api(
738                        ErrorResponse {
739                            code: "agent_not_found".to_string(),
740                            message: "x".to_string(),
741                            details: None,
742                        },
743                        reqwest::StatusCode::NOT_FOUND,
744                    ))
745                }
746            })
747            .await;
748        assert!(result.is_err());
749        assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 1);
750    }
751
752    // --- BROKKR-T-0195: manifest folder helpers ---
753
754    fn write(dir: &std::path::Path, name: &str, content: &str) {
755        std::fs::write(dir.join(name), content).unwrap();
756    }
757
758    #[test]
759    fn read_manifests_concatenates_folder_in_sorted_order() {
760        let dir = tempfile::tempdir().unwrap();
761        // intentionally out of order on disk; sorted by name on read
762        write(dir.path(), "02-deploy.yaml", "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n  name: d\n");
763        write(dir.path(), "01-namespace.yaml", "apiVersion: v1\nkind: Namespace\nmetadata:\n  name: ns\n");
764        write(dir.path(), "notes.txt", "ignored");
765        let stream = read_manifests(dir.path()).unwrap();
766        let ns_at = stream.find("kind: Namespace").unwrap();
767        let dep_at = stream.find("kind: Deployment").unwrap();
768        assert!(ns_at < dep_at, "01-namespace should come before 02-deploy");
769        assert!(stream.contains("\n---\n"), "documents joined with a separator");
770        assert!(!stream.contains("ignored"), "non-yaml files are skipped");
771    }
772
773    #[test]
774    fn read_manifests_accepts_single_file_and_multidoc() {
775        let dir = tempfile::tempdir().unwrap();
776        write(dir.path(), "all.yaml", "apiVersion: v1\nkind: Namespace\nmetadata:\n  name: a\n---\napiVersion: v1\nkind: ConfigMap\nmetadata:\n  name: b\n");
777        let stream = read_manifests(&dir.path().join("all.yaml")).unwrap();
778        assert!(stream.contains("kind: Namespace") && stream.contains("kind: ConfigMap"));
779    }
780
781    #[test]
782    fn read_manifests_rejects_missing_apiversion_or_kind() {
783        let dir = tempfile::tempdir().unwrap();
784        write(dir.path(), "bad.yaml", "kind: ConfigMap\nmetadata:\n  name: x\n");
785        let err = read_manifests(dir.path()).unwrap_err();
786        assert!(matches!(err, BrokkrError::InvalidRequest(_)), "got {err:?}");
787    }
788
789    #[test]
790    fn read_manifests_rejects_malformed_yaml() {
791        let dir = tempfile::tempdir().unwrap();
792        write(dir.path(), "bad.yaml", "kind: : : [unbalanced");
793        assert!(read_manifests(dir.path()).is_err());
794    }
795
796    #[test]
797    fn read_manifests_errors_on_empty_dir_and_missing_path() {
798        let dir = tempfile::tempdir().unwrap();
799        assert!(read_manifests(dir.path()).is_err(), "empty dir");
800        assert!(read_manifests(&dir.path().join("nope")).is_err(), "missing path");
801    }
802
803    #[test]
804    fn sha256_hex_is_stable_and_matches_known_vector() {
805        // Matches the broker's `format!("{:x}", Sha256::digest(...))`.
806        assert_eq!(
807            sha256_hex(""),
808            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
809        );
810        let a = "apiVersion: v1\nkind: ConfigMap\n";
811        assert_eq!(sha256_hex(a), sha256_hex(a));
812        assert_ne!(sha256_hex(a), sha256_hex("apiVersion: v1\nkind: Secret\n"));
813    }
814
815}