1use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use k2db_api_contract::{
9 AggregateRequest, CountRequest, CountResult, CreateIndexesRequest, CreateResult, HealthOk,
10 MessageResponse, PatchCollectionRequest, ProblemDetailsPayload, ReadyNotOk, ReadyOk,
11 RestoreRequest, RestoreResult, SearchRequest, UpdateResult, VersionInfo,
12 VersionedUpdateRequest, VersionedUpdateResult,
13};
14use reqwest::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
15use reqwest::{Method, StatusCode};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::Value;
18use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
19
20use crate::error::K2DbApiClientError;
21
/// Construction-time settings consumed by `K2DbApiClient::new`.
// NOTE: the derived `Debug` output includes `api_key` verbatim.
#[derive(Debug, Clone)]
pub struct K2DbApiClientOptions {
    /// Root URL of the API. Trailing `/` characters are trimmed by the constructor and an
    /// empty result is rejected with a configuration error.
    pub base_url: String,
    /// Default credential. It is sent as `Authorization: ApiKey <key>` unless the value
    /// already starts with `ApiKey `; a per-request key takes precedence.
    pub api_key: Option<String>,
    /// Extra `(name, value)` headers attached to every request; validated by the constructor.
    pub headers: Vec<(String, String)>,
    /// Default lifetime of cached reads in milliseconds (`None` means 60 000).
    pub read_cache_ttl_ms: Option<u64>,
    /// Upper bound on simultaneously running HTTP requests; `None` or `0` means unlimited.
    pub max_concurrent_requests: Option<usize>,
}
30
/// Per-call overrides applied on top of the client-level configuration.
// NOTE: the derived `Debug` output includes `api_key` verbatim.
#[derive(Debug, Clone, Default)]
pub struct RequestOptions {
    /// Credential used instead of the client's default key for this call.
    pub api_key: Option<String>,
    /// Value sent in the `x-scope` header; it is also part of the cache signature.
    pub scope: Option<String>,
    /// Additional `(name, value)` headers; they replace client defaults with the same name.
    pub headers: Vec<(String, String)>,
    /// Cache lifetime in milliseconds for this call when it is a cacheable read;
    /// `Some(0)` bypasses the cache, `None` falls back to the client default.
    pub cache_ttl_ms: Option<u64>,
}
38
/// HTTP client for the k2db API with a read cache, in-flight request coalescing and an
/// optional concurrency limit. Clones share the cache, the in-flight table and the semaphore
/// because those fields are reference-counted.
// NOTE: the derived `Debug` output includes `api_key` verbatim.
#[derive(Debug, Clone)]
pub struct K2DbApiClient {
    /// Base URL with trailing slashes removed.
    base_url: String,
    /// Default credential used when a request does not supply its own.
    api_key: Option<String>,
    /// Headers attached to every request (`Accept: application/json` plus configured extras).
    default_headers: HeaderMap,
    /// Underlying reqwest client.
    http: reqwest::Client,
    /// Default cache lifetime for cacheable reads, in milliseconds.
    read_cache_ttl_ms: u64,
    /// Cached successful read responses keyed by request signature.
    cache: Arc<Mutex<HashMap<String, CacheEntry>>>,
    /// Reads currently being fetched, keyed by request signature, so identical concurrent
    /// reads share one HTTP round trip.
    inflight: Arc<Mutex<HashMap<String, Arc<InFlightRequest>>>>,
    /// Concurrency limiter; `None` when no limit was configured.
    semaphore: Option<Arc<Semaphore>>,
}
50
/// One cached read response together with its expiry deadline.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// Instant after which the entry is treated as stale and removed on lookup.
    expires_at: Instant,
    /// Response payload as returned by the server.
    value: Value,
}
56
/// Rendezvous point for callers that issue the same cacheable read concurrently.
#[derive(Debug)]
struct InFlightRequest {
    /// Wakes waiting callers once the result has been stored.
    notify: Notify,
    /// `None` while the request is running; afterwards the outcome, cloned out to each waiter.
    result: Mutex<Option<Result<Value, SharedClientError>>>,
}
62
/// Cloneable counterpart of `K2DbApiClientError`, used where one outcome has to be handed to
/// several callers (the in-flight table clones the stored result for every waiter).
#[derive(Debug, Clone)]
enum SharedClientError {
    /// Network-level failure or a non-success response without problem details.
    Transport(String),
    /// Structured problem-details payload returned by the server.
    Problem(ProblemDetailsPayload),
    /// Invalid client or request configuration.
    Configuration(String),
    /// JSON encoding or decoding failure.
    Serialization(String),
}
70
71impl From<SharedClientError> for K2DbApiClientError {
72 fn from(value: SharedClientError) -> Self {
73 match value {
74 SharedClientError::Transport(message) => Self::Transport(message),
75 SharedClientError::Problem(problem) => Self::Problem(problem),
76 SharedClientError::Configuration(message) => Self::Configuration(message),
77 SharedClientError::Serialization(message) => Self::Serialization(message),
78 }
79 }
80}
81
82impl InFlightRequest {
83 fn new() -> Self {
84 Self {
85 notify: Notify::new(),
86 result: Mutex::new(None),
87 }
88 }
89
90 fn finish(&self, result: Result<Value, SharedClientError>) {
91 *self.result.lock().expect("inflight result lock") = Some(result);
92 self.notify.notify_waiters();
93 }
94
95 async fn wait(&self) -> Result<Value, SharedClientError> {
96 loop {
97 if let Some(result) = self.result.lock().expect("inflight result lock clone").clone() {
98 return result;
99 }
100 self.notify.notified().await;
101 }
102 }
103}
104
105impl K2DbApiClient {
106 pub fn new(options: K2DbApiClientOptions) -> Result<Self, K2DbApiClientError> {
107 let base_url = options.base_url.trim_end_matches('/').to_owned();
108 if base_url.is_empty() {
109 return Err(K2DbApiClientError::Configuration(
110 "base_url is required".to_owned(),
111 ));
112 }
113
114 let mut default_headers = HeaderMap::new();
115 default_headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
116
117 for (name, value) in options.headers {
118 let header_name = HeaderName::try_from(name.as_str()).map_err(|_| {
119 K2DbApiClientError::Configuration(format!("invalid header name: {name}"))
120 })?;
121 let header_value = HeaderValue::try_from(value.as_str()).map_err(|_| {
122 K2DbApiClientError::Configuration(format!("invalid header value for {name}"))
123 })?;
124 default_headers.insert(header_name, header_value);
125 }
126
127 let read_cache_ttl_ms = options.read_cache_ttl_ms.unwrap_or(60_000);
128 let max_concurrent_requests = options.max_concurrent_requests.unwrap_or(0);
129
130 Ok(Self {
131 base_url,
132 api_key: options.api_key,
133 default_headers,
134 http: reqwest::Client::new(),
135 read_cache_ttl_ms,
136 cache: Arc::new(Mutex::new(HashMap::new())),
137 inflight: Arc::new(Mutex::new(HashMap::new())),
138 semaphore: (max_concurrent_requests > 0)
139 .then(|| Arc::new(Semaphore::new(max_concurrent_requests))),
140 })
141 }
142
143 pub async fn health(&self) -> Result<HealthOk, K2DbApiClientError> {
144 self.get_json("/health", &RequestOptions::default(), false).await
145 }
146
147 pub async fn ready(&self) -> Result<Result<ReadyOk, ReadyNotOk>, K2DbApiClientError> {
148 let (status, payload) = self
149 .request_value_with_status(
150 Method::GET,
151 "/ready",
152 &RequestOptions::default(),
153 Option::<&()>::None,
154 None,
155 false,
156 )
157 .await?;
158
159 match status {
160 status if status.is_success() => Ok(Ok(from_value(payload)?)),
161 StatusCode::SERVICE_UNAVAILABLE => Ok(Err(from_value(payload)?)),
162 _ => Err(error_from_status_payload(status, payload)),
163 }
164 }
165
166 pub async fn create<T: Serialize>(
167 &self,
168 collection: &str,
169 document: &T,
170 options: &RequestOptions,
171 ) -> Result<CreateResult, K2DbApiClientError> {
172 let result = self
173 .request_json(
174 Method::POST,
175 &format!("/v1/{}", encode_segment(collection)),
176 options,
177 Some(document),
178 None,
179 true,
180 )
181 .await?;
182 self.invalidate_cache();
183 Ok(result)
184 }
185
186 pub async fn get_by_id<T: DeserializeOwned>(
187 &self,
188 collection: &str,
189 id: &str,
190 options: &RequestOptions,
191 ) -> Result<T, K2DbApiClientError> {
192 self.request_json(
193 Method::GET,
194 &format!("/v1/{}/{}", encode_segment(collection), encode_segment(id)),
195 options,
196 Option::<&()>::None,
197 None,
198 true,
199 )
200 .await
201 }
202
203 pub async fn patch_by_id<T: Serialize>(
204 &self,
205 collection: &str,
206 id: &str,
207 document: &T,
208 options: &RequestOptions,
209 ) -> Result<UpdateResult, K2DbApiClientError> {
210 let result = self
211 .request_json(
212 Method::PATCH,
213 &format!("/v1/{}/{}", encode_segment(collection), encode_segment(id)),
214 options,
215 Some(document),
216 None,
217 true,
218 )
219 .await?;
220 self.invalidate_cache();
221 Ok(result)
222 }
223
224 pub async fn delete_by_id(
225 &self,
226 collection: &str,
227 id: &str,
228 options: &RequestOptions,
229 ) -> Result<(), K2DbApiClientError> {
230 self.request_empty(
231 Method::DELETE,
232 &format!("/v1/{}/{}", encode_segment(collection), encode_segment(id)),
233 options,
234 Option::<&()>::None,
235 None,
236 true,
237 )
238 .await?;
239 self.invalidate_cache();
240 Ok(())
241 }
242
243 pub async fn patch_collection(
244 &self,
245 collection: &str,
246 payload: &PatchCollectionRequest,
247 options: &RequestOptions,
248 ) -> Result<UpdateResult, K2DbApiClientError> {
249 let result = self
250 .request_json(
251 Method::PATCH,
252 &format!("/v1/{}", encode_segment(collection)),
253 options,
254 Some(payload),
255 None,
256 true,
257 )
258 .await?;
259 self.invalidate_cache();
260 Ok(result)
261 }
262
263 pub async fn search<T: DeserializeOwned>(
264 &self,
265 collection: &str,
266 payload: &SearchRequest,
267 options: &RequestOptions,
268 ) -> Result<Vec<T>, K2DbApiClientError> {
269 self.request_json(
270 Method::POST,
271 &format!("/v1/{}/search", encode_segment(collection)),
272 options,
273 Some(payload),
274 None,
275 true,
276 )
277 .await
278 }
279
280 pub async fn aggregate<T: DeserializeOwned>(
281 &self,
282 collection: &str,
283 payload: &AggregateRequest,
284 options: &RequestOptions,
285 ) -> Result<Vec<T>, K2DbApiClientError> {
286 self.request_json(
287 Method::POST,
288 &format!("/v1/{}/aggregate", encode_segment(collection)),
289 options,
290 Some(payload),
291 None,
292 true,
293 )
294 .await
295 }
296
297 pub async fn count(
298 &self,
299 collection: &str,
300 payload: &CountRequest,
301 options: &RequestOptions,
302 ) -> Result<CountResult, K2DbApiClientError> {
303 self.request_json(
304 Method::POST,
305 &format!("/v1/{}/count", encode_segment(collection)),
306 options,
307 Some(payload),
308 None,
309 true,
310 )
311 .await
312 }
313
314 pub async fn restore(
315 &self,
316 collection: &str,
317 payload: &RestoreRequest,
318 options: &RequestOptions,
319 ) -> Result<RestoreResult, K2DbApiClientError> {
320 let result = self
321 .request_json(
322 Method::POST,
323 &format!("/v1/{}/restore", encode_segment(collection)),
324 options,
325 Some(payload),
326 None,
327 true,
328 )
329 .await?;
330 self.invalidate_cache();
331 Ok(result)
332 }
333
334 pub async fn get_versions(
335 &self,
336 collection: &str,
337 id: &str,
338 skip: Option<u64>,
339 limit: Option<u64>,
340 options: &RequestOptions,
341 ) -> Result<Vec<VersionInfo>, K2DbApiClientError> {
342 let query = query_pairs(skip, limit);
343 self.request_json(
344 Method::GET,
345 &format!("/v1/{}/{}/versions", encode_segment(collection), encode_segment(id)),
346 options,
347 Option::<&()>::None,
348 Some(&query),
349 true,
350 )
351 .await
352 }
353
354 pub async fn patch_versions(
355 &self,
356 collection: &str,
357 id: &str,
358 payload: &VersionedUpdateRequest,
359 options: &RequestOptions,
360 ) -> Result<Vec<VersionedUpdateResult>, K2DbApiClientError> {
361 let result = self
362 .request_json(
363 Method::PATCH,
364 &format!("/v1/{}/{}/versions", encode_segment(collection), encode_segment(id)),
365 options,
366 Some(payload),
367 None,
368 true,
369 )
370 .await?;
371 self.invalidate_cache();
372 Ok(result)
373 }
374
375 pub async fn revert_version(
376 &self,
377 collection: &str,
378 id: &str,
379 version: u64,
380 options: &RequestOptions,
381 ) -> Result<UpdateResult, K2DbApiClientError> {
382 let result = self
383 .request_json(
384 Method::POST,
385 &format!(
386 "/v1/{}/{}/versions/{}/revert",
387 encode_segment(collection),
388 encode_segment(id),
389 version
390 ),
391 options,
392 Option::<&()>::None,
393 None,
394 true,
395 )
396 .await?;
397 self.invalidate_cache();
398 Ok(result)
399 }
400
401 pub async fn admin_delete_collection(
402 &self,
403 collection: &str,
404 options: &RequestOptions,
405 ) -> Result<(), K2DbApiClientError> {
406 self.request_empty(
407 Method::DELETE,
408 &format!("/v1/admin/{}", encode_segment(collection)),
409 options,
410 Option::<&()>::None,
411 None,
412 true,
413 )
414 .await?;
415 self.invalidate_cache();
416 Ok(())
417 }
418
419 pub async fn admin_delete_by_id(
420 &self,
421 collection: &str,
422 id: &str,
423 options: &RequestOptions,
424 ) -> Result<(), K2DbApiClientError> {
425 self.request_empty(
426 Method::DELETE,
427 &format!("/v1/admin/{}/{}", encode_segment(collection), encode_segment(id)),
428 options,
429 Option::<&()>::None,
430 None,
431 true,
432 )
433 .await?;
434 self.invalidate_cache();
435 Ok(())
436 }
437
438 pub async fn admin_create_indexes(
439 &self,
440 collection: &str,
441 payload: &CreateIndexesRequest,
442 options: &RequestOptions,
443 ) -> Result<MessageResponse, K2DbApiClientError> {
444 let result = self
445 .request_json(
446 Method::POST,
447 &format!("/v1/admin/{}/indexes", encode_segment(collection)),
448 options,
449 Some(payload),
450 None,
451 true,
452 )
453 .await?;
454 self.invalidate_cache();
455 Ok(result)
456 }
457
458 pub async fn admin_create_history_indexes(
459 &self,
460 collection: &str,
461 options: &RequestOptions,
462 ) -> Result<MessageResponse, K2DbApiClientError> {
463 let result = self
464 .request_json(
465 Method::POST,
466 &format!("/v1/admin/{}/history-indexes", encode_segment(collection)),
467 options,
468 Some(&serde_json::json!({})),
469 None,
470 true,
471 )
472 .await?;
473 self.invalidate_cache();
474 Ok(result)
475 }
476
477 async fn get_json<T: DeserializeOwned>(
478 &self,
479 path: &str,
480 options: &RequestOptions,
481 include_auth: bool,
482 ) -> Result<T, K2DbApiClientError> {
483 let value = self
484 .request_value(
485 Method::GET,
486 path,
487 options,
488 Option::<&()>::None,
489 None,
490 include_auth,
491 )
492 .await?;
493 from_value(value)
494 }
495
496 async fn request_json<T, B>(
497 &self,
498 method: Method,
499 path: &str,
500 options: &RequestOptions,
501 body: Option<&B>,
502 query: Option<&[(String, String)]>,
503 include_auth: bool,
504 ) -> Result<T, K2DbApiClientError>
505 where
506 T: DeserializeOwned,
507 B: Serialize + ?Sized,
508 {
509 let value = self
510 .request_value(method, path, options, body, query, include_auth)
511 .await?;
512 from_value(value)
513 }
514
515 async fn request_empty<B>(
516 &self,
517 method: Method,
518 path: &str,
519 options: &RequestOptions,
520 body: Option<&B>,
521 query: Option<&[(String, String)]>,
522 include_auth: bool,
523 ) -> Result<(), K2DbApiClientError>
524 where
525 B: Serialize + ?Sized,
526 {
527 let url = self.url(path, query);
528 let headers = self.headers(options, include_auth, body.is_some())?;
529 let body_bytes = self.serialize_body_bytes(body)?;
530 let (status, payload) = self
531 .perform_request(method, url, headers, body_bytes)
532 .await
533 .map_err(K2DbApiClientError::from)?;
534
535 if status.is_success() {
536 return Ok(());
537 }
538 Err(error_from_status_payload(status, payload))
539 }
540
541 async fn request_value<B>(
542 &self,
543 method: Method,
544 path: &str,
545 options: &RequestOptions,
546 body: Option<&B>,
547 query: Option<&[(String, String)]>,
548 include_auth: bool,
549 ) -> Result<Value, K2DbApiClientError>
550 where
551 B: Serialize + ?Sized,
552 {
553 let (status, payload) = self
554 .request_value_with_status(method, path, options, body, query, include_auth)
555 .await?;
556
557 if status.is_success() {
558 return Ok(payload);
559 }
560
561 Err(error_from_status_payload(status, payload))
562 }
563
564 async fn request_value_with_status<B>(
565 &self,
566 method: Method,
567 path: &str,
568 options: &RequestOptions,
569 body: Option<&B>,
570 query: Option<&[(String, String)]>,
571 include_auth: bool,
572 ) -> Result<(StatusCode, Value), K2DbApiClientError>
573 where
574 B: Serialize + ?Sized,
575 {
576 let url = self.url(path, query);
577 let headers = self.headers(options, include_auth, body.is_some())?;
578 let body_value = self.serialize_body_value(body)?;
579 let body_bytes = self.serialize_body_bytes(body)?;
580
581 let ttl_ms = self.cache_ttl_ms(method.as_str(), path, options);
582 if ttl_ms > 0 {
583 let signature = self.request_signature(method.as_str(), &url, &headers, body_value.as_ref())?;
584 if let Some(value) = self.cache_get(&signature) {
585 return Ok((StatusCode::OK, value));
586 }
587
588 let (request, owner) = {
589 let mut inflight = self.inflight.lock().expect("inflight map lock");
590 if let Some(existing) = inflight.get(&signature).cloned() {
591 (existing, false)
592 } else {
593 let created = Arc::new(InFlightRequest::new());
594 inflight.insert(signature.clone(), created.clone());
595 (created, true)
596 }
597 };
598
599 if owner {
600 let result = self
601 .perform_request(method, url, headers, body_bytes)
602 .await;
603 let shared = result
604 .map_err(SharedClientError::from)
605 .and_then(|(status, payload)| {
606 if status.is_success() {
607 Ok(payload)
608 } else {
609 Err(SharedClientError::from(error_from_status_payload(status, payload)))
610 }
611 });
612
613 if let Ok(value) = &shared {
614 self.cache_set(&signature, ttl_ms, value.clone());
615 }
616
617 request.finish(shared.clone());
618 self.inflight
619 .lock()
620 .expect("inflight map lock remove")
621 .remove(&signature);
622
623 return shared.map(|value| (StatusCode::OK, value)).map_err(K2DbApiClientError::from);
624 }
625
626 return request
627 .wait()
628 .await
629 .map(|value| (StatusCode::OK, value))
630 .map_err(K2DbApiClientError::from);
631 }
632
633 self.perform_request(method, url, headers, body_bytes)
634 .await
635 .map_err(K2DbApiClientError::from)
636 }
637
638 async fn perform_request(
639 &self,
640 method: Method,
641 url: String,
642 headers: HeaderMap,
643 body: Option<Vec<u8>>,
644 ) -> Result<(StatusCode, Value), SharedClientError> {
645 let mut request = self.http.request(method, url).headers(headers);
646 if let Some(body) = body {
647 request = request.body(body);
648 }
649
650 let _permit = self.acquire_request_slot().await?;
651 let response = request
652 .send()
653 .await
654 .map_err(|error| SharedClientError::Transport(error.to_string()))?;
655 let status = response.status();
656
657 if status == StatusCode::NO_CONTENT {
658 return Ok((status, Value::Null));
659 }
660
661 let content_type = response
662 .headers()
663 .get(reqwest::header::CONTENT_TYPE)
664 .and_then(|value| value.to_str().ok())
665 .unwrap_or_default()
666 .to_owned();
667 let text = response
668 .text()
669 .await
670 .map_err(|error| SharedClientError::Transport(error.to_string()))?;
671
672 if content_type.contains("application/json") || content_type.contains("application/problem+json") {
673 let payload = serde_json::from_str::<Value>(&text)
674 .map_err(|error| SharedClientError::Serialization(error.to_string()))?;
675 return Ok((status, payload));
676 }
677
678 Ok((status, Value::String(text)))
679 }
680
681 async fn acquire_request_slot(&self) -> Result<Option<OwnedSemaphorePermit>, SharedClientError> {
682 match &self.semaphore {
683 Some(semaphore) => semaphore
684 .clone()
685 .acquire_owned()
686 .await
687 .map(Some)
688 .map_err(|error| SharedClientError::Transport(error.to_string())),
689 None => Ok(None),
690 }
691 }
692
693 fn invalidate_cache(&self) {
694 self.cache.lock().expect("cache lock clear").clear();
695 }
696
697 fn cache_ttl_ms(&self, method: &str, path: &str, options: &RequestOptions) -> u64 {
698 if self.is_cacheable_read(method, path) {
699 options.cache_ttl_ms.unwrap_or(self.read_cache_ttl_ms)
700 } else {
701 0
702 }
703 }
704
705 fn is_cacheable_read(&self, method: &str, path: &str) -> bool {
706 if method == "GET" {
707 return true;
708 }
709 method == "POST"
710 && (path.ends_with("/search") || path.ends_with("/aggregate") || path.ends_with("/count"))
711 }
712
713 fn cache_get(&self, signature: &str) -> Option<Value> {
714 let mut cache = self.cache.lock().expect("cache lock get");
715 let entry = cache.get(signature)?.clone();
716 if entry.expires_at > Instant::now() {
717 Some(entry.value)
718 } else {
719 cache.remove(signature);
720 None
721 }
722 }
723
724 fn cache_set(&self, signature: &str, ttl_ms: u64, value: Value) {
725 self.cache.lock().expect("cache lock set").insert(
726 signature.to_owned(),
727 CacheEntry {
728 expires_at: Instant::now() + Duration::from_millis(ttl_ms),
729 value,
730 },
731 );
732 }
733
734 fn request_signature(
735 &self,
736 method: &str,
737 url: &str,
738 headers: &HeaderMap,
739 body: Option<&Value>,
740 ) -> Result<String, K2DbApiClientError> {
741 let scope = headers
742 .get("x-scope")
743 .and_then(|value| value.to_str().ok())
744 .unwrap_or_default();
745 let auth = headers
746 .get(AUTHORIZATION)
747 .and_then(|value| value.to_str().ok())
748 .unwrap_or_default();
749 let auth_key = if auth.is_empty() { String::new() } else { fnv1a64(auth) };
750 let body_key = body.map(stable_stringify).unwrap_or_default();
751 let raw = format!("{method} {url}\nscope={scope}\nauth={auth_key}\nbody={body_key}");
752 Ok(fnv1a64(&raw))
753 }
754
755 fn serialize_body_value<B: Serialize + ?Sized>(
756 &self,
757 body: Option<&B>,
758 ) -> Result<Option<Value>, K2DbApiClientError> {
759 body.map(|value| {
760 serde_json::to_value(value)
761 .map_err(|error| K2DbApiClientError::Serialization(error.to_string()))
762 })
763 .transpose()
764 }
765
766 fn serialize_body_bytes<B: Serialize + ?Sized>(
767 &self,
768 body: Option<&B>,
769 ) -> Result<Option<Vec<u8>>, K2DbApiClientError> {
770 body.map(|value| {
771 serde_json::to_vec(value)
772 .map_err(|error| K2DbApiClientError::Serialization(error.to_string()))
773 })
774 .transpose()
775 }
776
777 fn url(&self, path: &str, query: Option<&[(String, String)]>) -> String {
778 let mut url = format!("{}{}", self.base_url, path);
779 if let Some(query) = query {
780 if !query.is_empty() {
781 let mut first = true;
782 for (key, value) in query {
783 url.push(if first { '?' } else { '&' });
784 first = false;
785 url.push_str(&urlencoding::encode(key));
786 url.push('=');
787 url.push_str(&urlencoding::encode(value));
788 }
789 }
790 }
791 url
792 }
793
794 fn headers(
795 &self,
796 options: &RequestOptions,
797 include_auth: bool,
798 include_content_type: bool,
799 ) -> Result<HeaderMap, K2DbApiClientError> {
800 let mut headers = self.default_headers.clone();
801
802 for (name, value) in &options.headers {
803 let header_name = HeaderName::try_from(name.as_str()).map_err(|_| {
804 K2DbApiClientError::Configuration(format!("invalid header name: {name}"))
805 })?;
806 let header_value = HeaderValue::try_from(value.as_str()).map_err(|_| {
807 K2DbApiClientError::Configuration(format!("invalid header value for {name}"))
808 })?;
809 headers.insert(header_name, header_value);
810 }
811
812 if let Some(scope) = &options.scope {
813 headers.insert(
814 HeaderName::from_static("x-scope"),
815 HeaderValue::try_from(scope.as_str()).map_err(|_| {
816 K2DbApiClientError::Configuration("invalid x-scope header value".to_owned())
817 })?,
818 );
819 }
820
821 if include_auth {
822 let auth = options.api_key.as_ref().or(self.api_key.as_ref());
823 if let Some(auth) = auth {
824 let value = if auth.starts_with("ApiKey ") {
825 auth.clone()
826 } else {
827 format!("ApiKey {auth}")
828 };
829 headers.insert(
830 AUTHORIZATION,
831 HeaderValue::try_from(value.as_str()).map_err(|_| {
832 K2DbApiClientError::Configuration(
833 "invalid authorization header value".to_owned(),
834 )
835 })?,
836 );
837 }
838 }
839
840 if include_content_type {
841 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
842 }
843
844 Ok(headers)
845 }
846}
847
848impl From<K2DbApiClientError> for SharedClientError {
849 fn from(value: K2DbApiClientError) -> Self {
850 match value {
851 K2DbApiClientError::Http(error) => Self::Transport(error.to_string()),
852 K2DbApiClientError::Transport(message) => Self::Transport(message),
853 K2DbApiClientError::Problem(problem) => Self::Problem(problem),
854 K2DbApiClientError::Configuration(message) => Self::Configuration(message),
855 K2DbApiClientError::Serialization(message) => Self::Serialization(message),
856 }
857 }
858}
859
860fn from_value<T: DeserializeOwned>(value: Value) -> Result<T, K2DbApiClientError> {
861 serde_json::from_value(value).map_err(|error| K2DbApiClientError::Serialization(error.to_string()))
862}
863
864fn error_from_status_payload(status: StatusCode, payload: Value) -> K2DbApiClientError {
865 if let Ok(problem) = serde_json::from_value::<ProblemDetailsPayload>(payload.clone()) {
866 return K2DbApiClientError::Problem(problem);
867 }
868
869 let detail = match payload {
870 Value::String(value) if !value.is_empty() => value,
871 Value::Object(map) => map
872 .get("detail")
873 .and_then(|value| value.as_str())
874 .map(ToOwned::to_owned)
875 .unwrap_or_else(|| format!("request failed: {status}")),
876 _ => format!("request failed: {status}"),
877 };
878 K2DbApiClientError::Transport(detail)
879}
880
881fn stable_stringify(value: &Value) -> String {
882 match value {
883 Value::Null => "null".to_owned(),
884 Value::Bool(value) => value.to_string(),
885 Value::Number(value) => value.to_string(),
886 Value::String(value) => serde_json::to_string(value).unwrap_or_else(|_| "\"\"".to_owned()),
887 Value::Array(values) => format!(
888 "[{}]",
889 values.iter().map(stable_stringify).collect::<Vec<_>>().join(",")
890 ),
891 Value::Object(map) => {
892 let mut keys = map.keys().cloned().collect::<Vec<_>>();
893 keys.sort();
894 format!(
895 "{{{}}}",
896 keys.iter()
897 .map(|key| {
898 let encoded = serde_json::to_string(key).unwrap_or_else(|_| "\"\"".to_owned());
899 let value = map.get(key).expect("stable stringify value");
900 format!("{encoded}:{}", stable_stringify(value))
901 })
902 .collect::<Vec<_>>()
903 .join(",")
904 )
905 }
906 }
907}
908
/// Hashes the UTF-8 bytes of `value` with 64-bit FNV-1a and returns the digest as
/// 16 lowercase hexadecimal digits.
fn fnv1a64(value: &str) -> String {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0100_0000_01b3;

    let digest = value.bytes().fold(OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    });
    format!("{digest:016x}")
}
917
918fn encode_segment(value: &str) -> String {
919 urlencoding::encode(value).into_owned()
920}
921
/// Builds the paging query parameters: `skip` first, then `limit`, each only when present.
fn query_pairs(skip: Option<u64>, limit: Option<u64>) -> Vec<(String, String)> {
    [("skip", skip), ("limit", limit)]
        .iter()
        .filter_map(|&(name, setting)| {
            setting.map(|number| (name.to_owned(), number.to_string()))
        })
        .collect()
}
932
933#[cfg(test)]
934mod tests {
935 use std::sync::Arc;
936 use std::sync::atomic::{AtomicUsize, Ordering};
937 use std::time::Duration;
938
939 use axum::Json;
940 use axum::extract::Path;
941 use axum::routing::{get, patch};
942 use axum::{Router, serve};
943 use serde_json::json;
944 use tokio::net::TcpListener;
945 use tokio::time::sleep;
946
947 use super::*;
948
949 async fn start_server(router: Router) -> String {
950 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
951 let address = listener.local_addr().expect("local addr");
952 tokio::spawn(async move {
953 serve(listener, router).await.expect("serve");
954 });
955 format!("http://{}", address)
956 }
957
958 #[tokio::test]
959 async fn cache_and_inflight_collapse_share_single_read() {
960 let hits = Arc::new(AtomicUsize::new(0));
961 let router = {
962 let hits = hits.clone();
963 Router::new().route(
964 "/health",
965 get(move || {
966 let hits = hits.clone();
967 async move {
968 hits.fetch_add(1, Ordering::SeqCst);
969 sleep(Duration::from_millis(50)).await;
970 Json(json!({ "status": "ok" }))
971 }
972 }),
973 )
974 };
975 let base_url = start_server(router).await;
976 let client = K2DbApiClient::new(K2DbApiClientOptions {
977 base_url,
978 api_key: None,
979 headers: Vec::new(),
980 read_cache_ttl_ms: Some(1_000),
981 max_concurrent_requests: None,
982 })
983 .expect("client");
984
985 let (left, right) = tokio::join!(client.health(), client.health());
986 assert_eq!(left.expect("left").status, "ok");
987 assert_eq!(right.expect("right").status, "ok");
988 assert_eq!(hits.load(Ordering::SeqCst), 1);
989
990 let third = client.health().await.expect("third");
991 assert_eq!(third.status, "ok");
992 assert_eq!(hits.load(Ordering::SeqCst), 1);
993 }
994
995 #[tokio::test]
996 async fn mutating_requests_invalidate_read_cache() {
997 let reads = Arc::new(AtomicUsize::new(0));
998 let router = {
999 let reads_get = reads.clone();
1000 Router::new()
1001 .route(
1002 "/v1/widgets/alpha",
1003 get(move || {
1004 let reads = reads_get.clone();
1005 async move {
1006 reads.fetch_add(1, Ordering::SeqCst);
1007 Json(json!({ "name": "before" }))
1008 }
1009 })
1010 .patch(|| async { Json(json!({ "updated": 1 })) }),
1011 )
1012 };
1013 let base_url = start_server(router).await;
1014 let client = K2DbApiClient::new(K2DbApiClientOptions {
1015 base_url,
1016 api_key: Some("demo.secret".to_owned()),
1017 headers: Vec::new(),
1018 read_cache_ttl_ms: Some(1_000),
1019 max_concurrent_requests: None,
1020 })
1021 .expect("client");
1022
1023 let options = RequestOptions {
1024 scope: Some("owner:demo".to_owned()),
1025 ..RequestOptions::default()
1026 };
1027
1028 let _: Value = client.get_by_id("widgets", "alpha", &options).await.expect("first read");
1029 let _: Value = client.get_by_id("widgets", "alpha", &options).await.expect("cached read");
1030 assert_eq!(reads.load(Ordering::SeqCst), 1);
1031
1032 client
1033 .patch_by_id("widgets", "alpha", &json!({ "name": "after" }), &options)
1034 .await
1035 .expect("patch");
1036
1037 let _: Value = client.get_by_id("widgets", "alpha", &options).await.expect("after patch read");
1038 assert_eq!(reads.load(Ordering::SeqCst), 2);
1039 }
1040
1041 #[tokio::test]
1042 async fn concurrency_limit_serializes_requests() {
1043 let active = Arc::new(AtomicUsize::new(0));
1044 let max_seen = Arc::new(AtomicUsize::new(0));
1045 let router = {
1046 let active = active.clone();
1047 let max_seen = max_seen.clone();
1048 Router::new().route(
1049 "/v1/widgets/{id}",
1050 get(move |Path(_id): Path<String>| {
1051 let active = active.clone();
1052 let max_seen = max_seen.clone();
1053 async move {
1054 let current = active.fetch_add(1, Ordering::SeqCst) + 1;
1055 let _ = max_seen.fetch_max(current, Ordering::SeqCst);
1056 sleep(Duration::from_millis(40)).await;
1057 active.fetch_sub(1, Ordering::SeqCst);
1058 Json(json!({ "ok": true }))
1059 }
1060 }),
1061 )
1062 };
1063 let base_url = start_server(router).await;
1064 let client = K2DbApiClient::new(K2DbApiClientOptions {
1065 base_url,
1066 api_key: Some("demo.secret".to_owned()),
1067 headers: Vec::new(),
1068 read_cache_ttl_ms: Some(0),
1069 max_concurrent_requests: Some(1),
1070 })
1071 .expect("client");
1072
1073 let options = RequestOptions {
1074 scope: Some("owner:demo".to_owned()),
1075 ..RequestOptions::default()
1076 };
1077
1078 let (left, right) = tokio::join!(
1079 client.get_by_id::<Value>("widgets", "a", &options),
1080 client.get_by_id::<Value>("widgets", "b", &options)
1081 );
1082 left.expect("left request");
1083 right.expect("right request");
1084 assert_eq!(max_seen.load(Ordering::SeqCst), 1);
1085 }
1086
1087 #[tokio::test]
1088 async fn patch_collection_calls_collection_patch_endpoint() {
1089 let router = Router::new().route(
1090 "/v1/widgets",
1091 patch(|| async { Json(json!({ "updated": 3 })) }),
1092 );
1093 let base_url = start_server(router).await;
1094 let client = K2DbApiClient::new(K2DbApiClientOptions {
1095 base_url,
1096 api_key: Some("demo.secret".to_owned()),
1097 headers: Vec::new(),
1098 read_cache_ttl_ms: Some(0),
1099 max_concurrent_requests: None,
1100 })
1101 .expect("client");
1102
1103 let result = client
1104 .patch_collection(
1105 "widgets",
1106 &PatchCollectionRequest {
1107 criteria: json!({ "kind": "demo" }),
1108 values: json!({ "name": "updated" }),
1109 },
1110 &RequestOptions {
1111 scope: Some("owner:demo".to_owned()),
1112 ..RequestOptions::default()
1113 },
1114 )
1115 .await
1116 .expect("patch collection");
1117
1118 assert_eq!(result.updated, 3);
1119 }
1120}