Skip to main content

k2db_api_client/
client.rs

1// SPDX-FileCopyrightText: 2026 Alexander R. Croft
2// SPDX-License-Identifier: GPL-3.0-only
3
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use k2db_api_contract::{
9    AggregateRequest, CountRequest, CountResult, CreateIndexesRequest, CreateResult, HealthOk,
10    MessageResponse, PatchCollectionRequest, ProblemDetailsPayload, ReadyNotOk, ReadyOk,
11    RestoreRequest, RestoreResult, SearchRequest, UpdateResult, VersionInfo,
12    VersionedUpdateRequest, VersionedUpdateResult,
13};
14use reqwest::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
15use reqwest::{Method, StatusCode};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::Value;
18use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
19
20use crate::error::K2DbApiClientError;
21
22#[derive(Debug, Clone)]
23pub struct K2DbApiClientOptions {
24    pub base_url: String,
25    pub api_key: Option<String>,
26    pub headers: Vec<(String, String)>,
27    pub read_cache_ttl_ms: Option<u64>,
28    pub max_concurrent_requests: Option<usize>,
29}
30
31#[derive(Debug, Clone, Default)]
32pub struct RequestOptions {
33    pub api_key: Option<String>,
34    pub scope: Option<String>,
35    pub headers: Vec<(String, String)>,
36    pub cache_ttl_ms: Option<u64>,
37}
38
39#[derive(Debug, Clone)]
40pub struct K2DbApiClient {
41    base_url: String,
42    api_key: Option<String>,
43    default_headers: HeaderMap,
44    http: reqwest::Client,
45    read_cache_ttl_ms: u64,
46    cache: Arc<Mutex<HashMap<String, CacheEntry>>>,
47    inflight: Arc<Mutex<HashMap<String, Arc<InFlightRequest>>>>,
48    semaphore: Option<Arc<Semaphore>>,
49}
50
51#[derive(Debug, Clone)]
52struct CacheEntry {
53    expires_at: Instant,
54    value: Value,
55}
56
57#[derive(Debug)]
58struct InFlightRequest {
59    notify: Notify,
60    result: Mutex<Option<Result<Value, SharedClientError>>>,
61}
62
63#[derive(Debug, Clone)]
64enum SharedClientError {
65    Transport(String),
66    Problem(ProblemDetailsPayload),
67    Configuration(String),
68    Serialization(String),
69}
70
71impl From<SharedClientError> for K2DbApiClientError {
72    fn from(value: SharedClientError) -> Self {
73        match value {
74            SharedClientError::Transport(message) => Self::Transport(message),
75            SharedClientError::Problem(problem) => Self::Problem(problem),
76            SharedClientError::Configuration(message) => Self::Configuration(message),
77            SharedClientError::Serialization(message) => Self::Serialization(message),
78        }
79    }
80}
81
82impl InFlightRequest {
83    fn new() -> Self {
84        Self {
85            notify: Notify::new(),
86            result: Mutex::new(None),
87        }
88    }
89
90    fn finish(&self, result: Result<Value, SharedClientError>) {
91        *self.result.lock().expect("inflight result lock") = Some(result);
92        self.notify.notify_waiters();
93    }
94
95    async fn wait(&self) -> Result<Value, SharedClientError> {
96        loop {
97            if let Some(result) = self.result.lock().expect("inflight result lock clone").clone() {
98                return result;
99            }
100            self.notify.notified().await;
101        }
102    }
103}
104
105impl K2DbApiClient {
106    pub fn new(options: K2DbApiClientOptions) -> Result<Self, K2DbApiClientError> {
107        let base_url = options.base_url.trim_end_matches('/').to_owned();
108        if base_url.is_empty() {
109            return Err(K2DbApiClientError::Configuration(
110                "base_url is required".to_owned(),
111            ));
112        }
113
114        let mut default_headers = HeaderMap::new();
115        default_headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
116
117        for (name, value) in options.headers {
118            let header_name = HeaderName::try_from(name.as_str()).map_err(|_| {
119                K2DbApiClientError::Configuration(format!("invalid header name: {name}"))
120            })?;
121            let header_value = HeaderValue::try_from(value.as_str()).map_err(|_| {
122                K2DbApiClientError::Configuration(format!("invalid header value for {name}"))
123            })?;
124            default_headers.insert(header_name, header_value);
125        }
126
127        let read_cache_ttl_ms = options.read_cache_ttl_ms.unwrap_or(60_000);
128        let max_concurrent_requests = options.max_concurrent_requests.unwrap_or(0);
129
130        Ok(Self {
131            base_url,
132            api_key: options.api_key,
133            default_headers,
134            http: reqwest::Client::new(),
135            read_cache_ttl_ms,
136            cache: Arc::new(Mutex::new(HashMap::new())),
137            inflight: Arc::new(Mutex::new(HashMap::new())),
138            semaphore: (max_concurrent_requests > 0)
139                .then(|| Arc::new(Semaphore::new(max_concurrent_requests))),
140        })
141    }
142
143    pub async fn health(&self) -> Result<HealthOk, K2DbApiClientError> {
144        self.get_json("/health", &RequestOptions::default(), false).await
145    }
146
147    pub async fn ready(&self) -> Result<Result<ReadyOk, ReadyNotOk>, K2DbApiClientError> {
148        let (status, payload) = self
149            .request_value_with_status(
150                Method::GET,
151                "/ready",
152                &RequestOptions::default(),
153                Option::<&()>::None,
154                None,
155                false,
156            )
157            .await?;
158
159        match status {
160            status if status.is_success() => Ok(Ok(from_value(payload)?)),
161            StatusCode::SERVICE_UNAVAILABLE => Ok(Err(from_value(payload)?)),
162            _ => Err(error_from_status_payload(status, payload)),
163        }
164    }
165
166    pub async fn create<T: Serialize>(
167        &self,
168        collection: &str,
169        document: &T,
170        options: &RequestOptions,
171    ) -> Result<CreateResult, K2DbApiClientError> {
172        let result = self
173            .request_json(
174                Method::POST,
175                &format!("/v1/{}", encode_segment(collection)),
176                options,
177                Some(document),
178                None,
179                true,
180            )
181            .await?;
182        self.invalidate_cache();
183        Ok(result)
184    }
185
186    pub async fn get_by_id<T: DeserializeOwned>(
187        &self,
188        collection: &str,
189        id: &str,
190        options: &RequestOptions,
191    ) -> Result<T, K2DbApiClientError> {
192        self.request_json(
193            Method::GET,
194            &format!("/v1/{}/{}", encode_segment(collection), encode_segment(id)),
195            options,
196            Option::<&()>::None,
197            None,
198            true,
199        )
200        .await
201    }
202
203    pub async fn patch_by_id<T: Serialize>(
204        &self,
205        collection: &str,
206        id: &str,
207        document: &T,
208        options: &RequestOptions,
209    ) -> Result<UpdateResult, K2DbApiClientError> {
210        let result = self
211            .request_json(
212                Method::PATCH,
213                &format!("/v1/{}/{}", encode_segment(collection), encode_segment(id)),
214                options,
215                Some(document),
216                None,
217                true,
218            )
219            .await?;
220        self.invalidate_cache();
221        Ok(result)
222    }
223
224    pub async fn delete_by_id(
225        &self,
226        collection: &str,
227        id: &str,
228        options: &RequestOptions,
229    ) -> Result<(), K2DbApiClientError> {
230        self.request_empty(
231            Method::DELETE,
232            &format!("/v1/{}/{}", encode_segment(collection), encode_segment(id)),
233            options,
234            Option::<&()>::None,
235            None,
236            true,
237        )
238        .await?;
239        self.invalidate_cache();
240        Ok(())
241    }
242
243    pub async fn patch_collection(
244        &self,
245        collection: &str,
246        payload: &PatchCollectionRequest,
247        options: &RequestOptions,
248    ) -> Result<UpdateResult, K2DbApiClientError> {
249        let result = self
250            .request_json(
251                Method::PATCH,
252                &format!("/v1/{}", encode_segment(collection)),
253                options,
254                Some(payload),
255                None,
256                true,
257            )
258            .await?;
259        self.invalidate_cache();
260        Ok(result)
261    }
262
263    pub async fn search<T: DeserializeOwned>(
264        &self,
265        collection: &str,
266        payload: &SearchRequest,
267        options: &RequestOptions,
268    ) -> Result<Vec<T>, K2DbApiClientError> {
269        self.request_json(
270            Method::POST,
271            &format!("/v1/{}/search", encode_segment(collection)),
272            options,
273            Some(payload),
274            None,
275            true,
276        )
277        .await
278    }
279
280    pub async fn aggregate<T: DeserializeOwned>(
281        &self,
282        collection: &str,
283        payload: &AggregateRequest,
284        options: &RequestOptions,
285    ) -> Result<Vec<T>, K2DbApiClientError> {
286        self.request_json(
287            Method::POST,
288            &format!("/v1/{}/aggregate", encode_segment(collection)),
289            options,
290            Some(payload),
291            None,
292            true,
293        )
294        .await
295    }
296
297    pub async fn count(
298        &self,
299        collection: &str,
300        payload: &CountRequest,
301        options: &RequestOptions,
302    ) -> Result<CountResult, K2DbApiClientError> {
303        self.request_json(
304            Method::POST,
305            &format!("/v1/{}/count", encode_segment(collection)),
306            options,
307            Some(payload),
308            None,
309            true,
310        )
311        .await
312    }
313
314    pub async fn restore(
315        &self,
316        collection: &str,
317        payload: &RestoreRequest,
318        options: &RequestOptions,
319    ) -> Result<RestoreResult, K2DbApiClientError> {
320        let result = self
321            .request_json(
322                Method::POST,
323                &format!("/v1/{}/restore", encode_segment(collection)),
324                options,
325                Some(payload),
326                None,
327                true,
328            )
329            .await?;
330        self.invalidate_cache();
331        Ok(result)
332    }
333
334    pub async fn get_versions(
335        &self,
336        collection: &str,
337        id: &str,
338        skip: Option<u64>,
339        limit: Option<u64>,
340        options: &RequestOptions,
341    ) -> Result<Vec<VersionInfo>, K2DbApiClientError> {
342        let query = query_pairs(skip, limit);
343        self.request_json(
344            Method::GET,
345            &format!("/v1/{}/{}/versions", encode_segment(collection), encode_segment(id)),
346            options,
347            Option::<&()>::None,
348            Some(&query),
349            true,
350        )
351        .await
352    }
353
354    pub async fn patch_versions(
355        &self,
356        collection: &str,
357        id: &str,
358        payload: &VersionedUpdateRequest,
359        options: &RequestOptions,
360    ) -> Result<Vec<VersionedUpdateResult>, K2DbApiClientError> {
361        let result = self
362            .request_json(
363                Method::PATCH,
364                &format!("/v1/{}/{}/versions", encode_segment(collection), encode_segment(id)),
365                options,
366                Some(payload),
367                None,
368                true,
369            )
370            .await?;
371        self.invalidate_cache();
372        Ok(result)
373    }
374
375    pub async fn revert_version(
376        &self,
377        collection: &str,
378        id: &str,
379        version: u64,
380        options: &RequestOptions,
381    ) -> Result<UpdateResult, K2DbApiClientError> {
382        let result = self
383            .request_json(
384                Method::POST,
385                &format!(
386                    "/v1/{}/{}/versions/{}/revert",
387                    encode_segment(collection),
388                    encode_segment(id),
389                    version
390                ),
391                options,
392                Option::<&()>::None,
393                None,
394                true,
395            )
396            .await?;
397        self.invalidate_cache();
398        Ok(result)
399    }
400
401    pub async fn admin_delete_collection(
402        &self,
403        collection: &str,
404        options: &RequestOptions,
405    ) -> Result<(), K2DbApiClientError> {
406        self.request_empty(
407            Method::DELETE,
408            &format!("/v1/admin/{}", encode_segment(collection)),
409            options,
410            Option::<&()>::None,
411            None,
412            true,
413        )
414        .await?;
415        self.invalidate_cache();
416        Ok(())
417    }
418
419    pub async fn admin_delete_by_id(
420        &self,
421        collection: &str,
422        id: &str,
423        options: &RequestOptions,
424    ) -> Result<(), K2DbApiClientError> {
425        self.request_empty(
426            Method::DELETE,
427            &format!("/v1/admin/{}/{}", encode_segment(collection), encode_segment(id)),
428            options,
429            Option::<&()>::None,
430            None,
431            true,
432        )
433        .await?;
434        self.invalidate_cache();
435        Ok(())
436    }
437
438    pub async fn admin_create_indexes(
439        &self,
440        collection: &str,
441        payload: &CreateIndexesRequest,
442        options: &RequestOptions,
443    ) -> Result<MessageResponse, K2DbApiClientError> {
444        let result = self
445            .request_json(
446                Method::POST,
447                &format!("/v1/admin/{}/indexes", encode_segment(collection)),
448                options,
449                Some(payload),
450                None,
451                true,
452            )
453            .await?;
454        self.invalidate_cache();
455        Ok(result)
456    }
457
458    pub async fn admin_create_history_indexes(
459        &self,
460        collection: &str,
461        options: &RequestOptions,
462    ) -> Result<MessageResponse, K2DbApiClientError> {
463        let result = self
464            .request_json(
465                Method::POST,
466                &format!("/v1/admin/{}/history-indexes", encode_segment(collection)),
467                options,
468                Some(&serde_json::json!({})),
469                None,
470                true,
471            )
472            .await?;
473        self.invalidate_cache();
474        Ok(result)
475    }
476
477    async fn get_json<T: DeserializeOwned>(
478        &self,
479        path: &str,
480        options: &RequestOptions,
481        include_auth: bool,
482    ) -> Result<T, K2DbApiClientError> {
483        let value = self
484            .request_value(
485                Method::GET,
486                path,
487                options,
488                Option::<&()>::None,
489                None,
490                include_auth,
491            )
492            .await?;
493        from_value(value)
494    }
495
496    async fn request_json<T, B>(
497        &self,
498        method: Method,
499        path: &str,
500        options: &RequestOptions,
501        body: Option<&B>,
502        query: Option<&[(String, String)]>,
503        include_auth: bool,
504    ) -> Result<T, K2DbApiClientError>
505    where
506        T: DeserializeOwned,
507        B: Serialize + ?Sized,
508    {
509        let value = self
510            .request_value(method, path, options, body, query, include_auth)
511            .await?;
512        from_value(value)
513    }
514
515    async fn request_empty<B>(
516        &self,
517        method: Method,
518        path: &str,
519        options: &RequestOptions,
520        body: Option<&B>,
521        query: Option<&[(String, String)]>,
522        include_auth: bool,
523    ) -> Result<(), K2DbApiClientError>
524    where
525        B: Serialize + ?Sized,
526    {
527        let url = self.url(path, query);
528        let headers = self.headers(options, include_auth, body.is_some())?;
529        let body_bytes = self.serialize_body_bytes(body)?;
530        let (status, payload) = self
531            .perform_request(method, url, headers, body_bytes)
532            .await
533            .map_err(K2DbApiClientError::from)?;
534
535        if status.is_success() {
536            return Ok(());
537        }
538        Err(error_from_status_payload(status, payload))
539    }
540
541    async fn request_value<B>(
542        &self,
543        method: Method,
544        path: &str,
545        options: &RequestOptions,
546        body: Option<&B>,
547        query: Option<&[(String, String)]>,
548        include_auth: bool,
549    ) -> Result<Value, K2DbApiClientError>
550    where
551        B: Serialize + ?Sized,
552    {
553        let (status, payload) = self
554            .request_value_with_status(method, path, options, body, query, include_auth)
555            .await?;
556
557        if status.is_success() {
558            return Ok(payload);
559        }
560
561        Err(error_from_status_payload(status, payload))
562    }
563
564    async fn request_value_with_status<B>(
565        &self,
566        method: Method,
567        path: &str,
568        options: &RequestOptions,
569        body: Option<&B>,
570        query: Option<&[(String, String)]>,
571        include_auth: bool,
572    ) -> Result<(StatusCode, Value), K2DbApiClientError>
573    where
574        B: Serialize + ?Sized,
575    {
576        let url = self.url(path, query);
577        let headers = self.headers(options, include_auth, body.is_some())?;
578        let body_value = self.serialize_body_value(body)?;
579        let body_bytes = self.serialize_body_bytes(body)?;
580
581        let ttl_ms = self.cache_ttl_ms(method.as_str(), path, options);
582        if ttl_ms > 0 {
583            let signature = self.request_signature(method.as_str(), &url, &headers, body_value.as_ref())?;
584            if let Some(value) = self.cache_get(&signature) {
585                return Ok((StatusCode::OK, value));
586            }
587
588            let (request, owner) = {
589                let mut inflight = self.inflight.lock().expect("inflight map lock");
590                if let Some(existing) = inflight.get(&signature).cloned() {
591                    (existing, false)
592                } else {
593                    let created = Arc::new(InFlightRequest::new());
594                    inflight.insert(signature.clone(), created.clone());
595                    (created, true)
596                }
597            };
598
599            if owner {
600                let result = self
601                    .perform_request(method, url, headers, body_bytes)
602                    .await;
603                let shared = result
604                    .map_err(SharedClientError::from)
605                    .and_then(|(status, payload)| {
606                        if status.is_success() {
607                            Ok(payload)
608                        } else {
609                            Err(SharedClientError::from(error_from_status_payload(status, payload)))
610                        }
611                    });
612
613                if let Ok(value) = &shared {
614                    self.cache_set(&signature, ttl_ms, value.clone());
615                }
616
617                request.finish(shared.clone());
618                self.inflight
619                    .lock()
620                    .expect("inflight map lock remove")
621                    .remove(&signature);
622
623                return shared.map(|value| (StatusCode::OK, value)).map_err(K2DbApiClientError::from);
624            }
625
626            return request
627                .wait()
628                .await
629                .map(|value| (StatusCode::OK, value))
630                .map_err(K2DbApiClientError::from);
631        }
632
633        self.perform_request(method, url, headers, body_bytes)
634            .await
635            .map_err(K2DbApiClientError::from)
636    }
637
638    async fn perform_request(
639        &self,
640        method: Method,
641        url: String,
642        headers: HeaderMap,
643        body: Option<Vec<u8>>,
644    ) -> Result<(StatusCode, Value), SharedClientError> {
645        let mut request = self.http.request(method, url).headers(headers);
646        if let Some(body) = body {
647            request = request.body(body);
648        }
649
650        let _permit = self.acquire_request_slot().await?;
651        let response = request
652            .send()
653            .await
654            .map_err(|error| SharedClientError::Transport(error.to_string()))?;
655        let status = response.status();
656
657        if status == StatusCode::NO_CONTENT {
658            return Ok((status, Value::Null));
659        }
660
661        let content_type = response
662            .headers()
663            .get(reqwest::header::CONTENT_TYPE)
664            .and_then(|value| value.to_str().ok())
665            .unwrap_or_default()
666            .to_owned();
667        let text = response
668            .text()
669            .await
670            .map_err(|error| SharedClientError::Transport(error.to_string()))?;
671
672        if content_type.contains("application/json") || content_type.contains("application/problem+json") {
673            let payload = serde_json::from_str::<Value>(&text)
674                .map_err(|error| SharedClientError::Serialization(error.to_string()))?;
675            return Ok((status, payload));
676        }
677
678        Ok((status, Value::String(text)))
679    }
680
681    async fn acquire_request_slot(&self) -> Result<Option<OwnedSemaphorePermit>, SharedClientError> {
682        match &self.semaphore {
683            Some(semaphore) => semaphore
684                .clone()
685                .acquire_owned()
686                .await
687                .map(Some)
688                .map_err(|error| SharedClientError::Transport(error.to_string())),
689            None => Ok(None),
690        }
691    }
692
693    fn invalidate_cache(&self) {
694        self.cache.lock().expect("cache lock clear").clear();
695    }
696
697    fn cache_ttl_ms(&self, method: &str, path: &str, options: &RequestOptions) -> u64 {
698        if self.is_cacheable_read(method, path) {
699            options.cache_ttl_ms.unwrap_or(self.read_cache_ttl_ms)
700        } else {
701            0
702        }
703    }
704
705    fn is_cacheable_read(&self, method: &str, path: &str) -> bool {
706        if method == "GET" {
707            return true;
708        }
709        method == "POST"
710            && (path.ends_with("/search") || path.ends_with("/aggregate") || path.ends_with("/count"))
711    }
712
713    fn cache_get(&self, signature: &str) -> Option<Value> {
714        let mut cache = self.cache.lock().expect("cache lock get");
715        let entry = cache.get(signature)?.clone();
716        if entry.expires_at > Instant::now() {
717            Some(entry.value)
718        } else {
719            cache.remove(signature);
720            None
721        }
722    }
723
724    fn cache_set(&self, signature: &str, ttl_ms: u64, value: Value) {
725        self.cache.lock().expect("cache lock set").insert(
726            signature.to_owned(),
727            CacheEntry {
728                expires_at: Instant::now() + Duration::from_millis(ttl_ms),
729                value,
730            },
731        );
732    }
733
734    fn request_signature(
735        &self,
736        method: &str,
737        url: &str,
738        headers: &HeaderMap,
739        body: Option<&Value>,
740    ) -> Result<String, K2DbApiClientError> {
741        let scope = headers
742            .get("x-scope")
743            .and_then(|value| value.to_str().ok())
744            .unwrap_or_default();
745        let auth = headers
746            .get(AUTHORIZATION)
747            .and_then(|value| value.to_str().ok())
748            .unwrap_or_default();
749        let auth_key = if auth.is_empty() { String::new() } else { fnv1a64(auth) };
750        let body_key = body.map(stable_stringify).unwrap_or_default();
751        let raw = format!("{method} {url}\nscope={scope}\nauth={auth_key}\nbody={body_key}");
752        Ok(fnv1a64(&raw))
753    }
754
755    fn serialize_body_value<B: Serialize + ?Sized>(
756        &self,
757        body: Option<&B>,
758    ) -> Result<Option<Value>, K2DbApiClientError> {
759        body.map(|value| {
760            serde_json::to_value(value)
761                .map_err(|error| K2DbApiClientError::Serialization(error.to_string()))
762        })
763        .transpose()
764    }
765
766    fn serialize_body_bytes<B: Serialize + ?Sized>(
767        &self,
768        body: Option<&B>,
769    ) -> Result<Option<Vec<u8>>, K2DbApiClientError> {
770        body.map(|value| {
771            serde_json::to_vec(value)
772                .map_err(|error| K2DbApiClientError::Serialization(error.to_string()))
773        })
774        .transpose()
775    }
776
777    fn url(&self, path: &str, query: Option<&[(String, String)]>) -> String {
778        let mut url = format!("{}{}", self.base_url, path);
779        if let Some(query) = query {
780            if !query.is_empty() {
781                let mut first = true;
782                for (key, value) in query {
783                    url.push(if first { '?' } else { '&' });
784                    first = false;
785                    url.push_str(&urlencoding::encode(key));
786                    url.push('=');
787                    url.push_str(&urlencoding::encode(value));
788                }
789            }
790        }
791        url
792    }
793
794    fn headers(
795        &self,
796        options: &RequestOptions,
797        include_auth: bool,
798        include_content_type: bool,
799    ) -> Result<HeaderMap, K2DbApiClientError> {
800        let mut headers = self.default_headers.clone();
801
802        for (name, value) in &options.headers {
803            let header_name = HeaderName::try_from(name.as_str()).map_err(|_| {
804                K2DbApiClientError::Configuration(format!("invalid header name: {name}"))
805            })?;
806            let header_value = HeaderValue::try_from(value.as_str()).map_err(|_| {
807                K2DbApiClientError::Configuration(format!("invalid header value for {name}"))
808            })?;
809            headers.insert(header_name, header_value);
810        }
811
812        if let Some(scope) = &options.scope {
813            headers.insert(
814                HeaderName::from_static("x-scope"),
815                HeaderValue::try_from(scope.as_str()).map_err(|_| {
816                    K2DbApiClientError::Configuration("invalid x-scope header value".to_owned())
817                })?,
818            );
819        }
820
821        if include_auth {
822            let auth = options.api_key.as_ref().or(self.api_key.as_ref());
823            if let Some(auth) = auth {
824                let value = if auth.starts_with("ApiKey ") {
825                    auth.clone()
826                } else {
827                    format!("ApiKey {auth}")
828                };
829                headers.insert(
830                    AUTHORIZATION,
831                    HeaderValue::try_from(value.as_str()).map_err(|_| {
832                        K2DbApiClientError::Configuration(
833                            "invalid authorization header value".to_owned(),
834                        )
835                    })?,
836                );
837            }
838        }
839
840        if include_content_type {
841            headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
842        }
843
844        Ok(headers)
845    }
846}
847
848impl From<K2DbApiClientError> for SharedClientError {
849    fn from(value: K2DbApiClientError) -> Self {
850        match value {
851            K2DbApiClientError::Http(error) => Self::Transport(error.to_string()),
852            K2DbApiClientError::Transport(message) => Self::Transport(message),
853            K2DbApiClientError::Problem(problem) => Self::Problem(problem),
854            K2DbApiClientError::Configuration(message) => Self::Configuration(message),
855            K2DbApiClientError::Serialization(message) => Self::Serialization(message),
856        }
857    }
858}
859
860fn from_value<T: DeserializeOwned>(value: Value) -> Result<T, K2DbApiClientError> {
861    serde_json::from_value(value).map_err(|error| K2DbApiClientError::Serialization(error.to_string()))
862}
863
864fn error_from_status_payload(status: StatusCode, payload: Value) -> K2DbApiClientError {
865    if let Ok(problem) = serde_json::from_value::<ProblemDetailsPayload>(payload.clone()) {
866        return K2DbApiClientError::Problem(problem);
867    }
868
869    let detail = match payload {
870        Value::String(value) if !value.is_empty() => value,
871        Value::Object(map) => map
872            .get("detail")
873            .and_then(|value| value.as_str())
874            .map(ToOwned::to_owned)
875            .unwrap_or_else(|| format!("request failed: {status}")),
876        _ => format!("request failed: {status}"),
877    };
878    K2DbApiClientError::Transport(detail)
879}
880
881fn stable_stringify(value: &Value) -> String {
882    match value {
883        Value::Null => "null".to_owned(),
884        Value::Bool(value) => value.to_string(),
885        Value::Number(value) => value.to_string(),
886        Value::String(value) => serde_json::to_string(value).unwrap_or_else(|_| "\"\"".to_owned()),
887        Value::Array(values) => format!(
888            "[{}]",
889            values.iter().map(stable_stringify).collect::<Vec<_>>().join(",")
890        ),
891        Value::Object(map) => {
892            let mut keys = map.keys().cloned().collect::<Vec<_>>();
893            keys.sort();
894            format!(
895                "{{{}}}",
896                keys.iter()
897                    .map(|key| {
898                        let encoded = serde_json::to_string(key).unwrap_or_else(|_| "\"\"".to_owned());
899                        let value = map.get(key).expect("stable stringify value");
900                        format!("{encoded}:{}", stable_stringify(value))
901                    })
902                    .collect::<Vec<_>>()
903                    .join(",")
904            )
905        }
906    }
907}
908
909fn fnv1a64(value: &str) -> String {
910    let mut hash = 0xcbf29ce484222325_u64;
911    for byte in value.as_bytes() {
912        hash ^= u64::from(*byte);
913        hash = hash.wrapping_mul(0x100000001b3);
914    }
915    format!("{hash:016x}")
916}
917
918fn encode_segment(value: &str) -> String {
919    urlencoding::encode(value).into_owned()
920}
921
922fn query_pairs(skip: Option<u64>, limit: Option<u64>) -> Vec<(String, String)> {
923    let mut out = Vec::new();
924    if let Some(skip) = skip {
925        out.push(("skip".to_owned(), skip.to_string()));
926    }
927    if let Some(limit) = limit {
928        out.push(("limit".to_owned(), limit.to_string()));
929    }
930    out
931}
932
933#[cfg(test)]
934mod tests {
935    use std::sync::Arc;
936    use std::sync::atomic::{AtomicUsize, Ordering};
937    use std::time::Duration;
938
939    use axum::Json;
940    use axum::extract::Path;
941    use axum::routing::{get, patch};
942    use axum::{Router, serve};
943    use serde_json::json;
944    use tokio::net::TcpListener;
945    use tokio::time::sleep;
946
947    use super::*;
948
949    async fn start_server(router: Router) -> String {
950        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
951        let address = listener.local_addr().expect("local addr");
952        tokio::spawn(async move {
953            serve(listener, router).await.expect("serve");
954        });
955        format!("http://{}", address)
956    }
957
958    #[tokio::test]
959    async fn cache_and_inflight_collapse_share_single_read() {
960        let hits = Arc::new(AtomicUsize::new(0));
961        let router = {
962            let hits = hits.clone();
963            Router::new().route(
964                "/health",
965                get(move || {
966                    let hits = hits.clone();
967                    async move {
968                        hits.fetch_add(1, Ordering::SeqCst);
969                        sleep(Duration::from_millis(50)).await;
970                        Json(json!({ "status": "ok" }))
971                    }
972                }),
973            )
974        };
975        let base_url = start_server(router).await;
976        let client = K2DbApiClient::new(K2DbApiClientOptions {
977            base_url,
978            api_key: None,
979            headers: Vec::new(),
980            read_cache_ttl_ms: Some(1_000),
981            max_concurrent_requests: None,
982        })
983        .expect("client");
984
985        let (left, right) = tokio::join!(client.health(), client.health());
986        assert_eq!(left.expect("left").status, "ok");
987        assert_eq!(right.expect("right").status, "ok");
988        assert_eq!(hits.load(Ordering::SeqCst), 1);
989
990        let third = client.health().await.expect("third");
991        assert_eq!(third.status, "ok");
992        assert_eq!(hits.load(Ordering::SeqCst), 1);
993    }
994
995    #[tokio::test]
996    async fn mutating_requests_invalidate_read_cache() {
997        let reads = Arc::new(AtomicUsize::new(0));
998        let router = {
999            let reads_get = reads.clone();
1000            Router::new()
1001                .route(
1002                    "/v1/widgets/alpha",
1003                    get(move || {
1004                        let reads = reads_get.clone();
1005                        async move {
1006                            reads.fetch_add(1, Ordering::SeqCst);
1007                            Json(json!({ "name": "before" }))
1008                        }
1009                    })
1010                    .patch(|| async { Json(json!({ "updated": 1 })) }),
1011                )
1012        };
1013        let base_url = start_server(router).await;
1014        let client = K2DbApiClient::new(K2DbApiClientOptions {
1015            base_url,
1016            api_key: Some("demo.secret".to_owned()),
1017            headers: Vec::new(),
1018            read_cache_ttl_ms: Some(1_000),
1019            max_concurrent_requests: None,
1020        })
1021        .expect("client");
1022
1023        let options = RequestOptions {
1024            scope: Some("owner:demo".to_owned()),
1025            ..RequestOptions::default()
1026        };
1027
1028        let _: Value = client.get_by_id("widgets", "alpha", &options).await.expect("first read");
1029        let _: Value = client.get_by_id("widgets", "alpha", &options).await.expect("cached read");
1030        assert_eq!(reads.load(Ordering::SeqCst), 1);
1031
1032        client
1033            .patch_by_id("widgets", "alpha", &json!({ "name": "after" }), &options)
1034            .await
1035            .expect("patch");
1036
1037        let _: Value = client.get_by_id("widgets", "alpha", &options).await.expect("after patch read");
1038        assert_eq!(reads.load(Ordering::SeqCst), 2);
1039    }
1040
1041    #[tokio::test]
1042    async fn concurrency_limit_serializes_requests() {
1043        let active = Arc::new(AtomicUsize::new(0));
1044        let max_seen = Arc::new(AtomicUsize::new(0));
1045        let router = {
1046            let active = active.clone();
1047            let max_seen = max_seen.clone();
1048            Router::new().route(
1049                "/v1/widgets/{id}",
1050                get(move |Path(_id): Path<String>| {
1051                    let active = active.clone();
1052                    let max_seen = max_seen.clone();
1053                    async move {
1054                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
1055                        let _ = max_seen.fetch_max(current, Ordering::SeqCst);
1056                        sleep(Duration::from_millis(40)).await;
1057                        active.fetch_sub(1, Ordering::SeqCst);
1058                        Json(json!({ "ok": true }))
1059                    }
1060                }),
1061            )
1062        };
1063        let base_url = start_server(router).await;
1064        let client = K2DbApiClient::new(K2DbApiClientOptions {
1065            base_url,
1066            api_key: Some("demo.secret".to_owned()),
1067            headers: Vec::new(),
1068            read_cache_ttl_ms: Some(0),
1069            max_concurrent_requests: Some(1),
1070        })
1071        .expect("client");
1072
1073        let options = RequestOptions {
1074            scope: Some("owner:demo".to_owned()),
1075            ..RequestOptions::default()
1076        };
1077
1078        let (left, right) = tokio::join!(
1079            client.get_by_id::<Value>("widgets", "a", &options),
1080            client.get_by_id::<Value>("widgets", "b", &options)
1081        );
1082        left.expect("left request");
1083        right.expect("right request");
1084        assert_eq!(max_seen.load(Ordering::SeqCst), 1);
1085    }
1086
1087    #[tokio::test]
1088    async fn patch_collection_calls_collection_patch_endpoint() {
1089        let router = Router::new().route(
1090            "/v1/widgets",
1091            patch(|| async { Json(json!({ "updated": 3 })) }),
1092        );
1093        let base_url = start_server(router).await;
1094        let client = K2DbApiClient::new(K2DbApiClientOptions {
1095            base_url,
1096            api_key: Some("demo.secret".to_owned()),
1097            headers: Vec::new(),
1098            read_cache_ttl_ms: Some(0),
1099            max_concurrent_requests: None,
1100        })
1101        .expect("client");
1102
1103        let result = client
1104            .patch_collection(
1105                "widgets",
1106                &PatchCollectionRequest {
1107                    criteria: json!({ "kind": "demo" }),
1108                    values: json!({ "name": "updated" }),
1109                },
1110                &RequestOptions {
1111                    scope: Some("owner:demo".to_owned()),
1112                    ..RequestOptions::default()
1113                },
1114            )
1115            .await
1116            .expect("patch collection");
1117
1118        assert_eq!(result.updated, 3);
1119    }
1120}