1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
6use serde_json::json;
7use tracing::{debug, instrument, trace, warn};
8
9use crate::endpoints::Endpoint;
10use crate::event::BatchRequest;
11use crate::feature_flag_evaluations::{
12 EvaluateFlagsOptions, EvaluatedFlagRecord, FeatureFlagEvaluations, FeatureFlagEvaluationsHost,
13 FlagCalledEventParams,
14};
15use crate::feature_flags::{
16 match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagDetail, FlagValue,
17};
18use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
19use crate::{event::InnerEvent, Error, Event};
20
21use super::ClientOptions;
22
/// Size limit for the `$feature_flag_called` deduplication cache.
///
/// The limit is compared against the number of distinct IDs held in the
/// cache; once it is reached the cache is cleared before the next insert.
const MAX_FLAG_CALLED_CACHE_SIZE: usize = 50_000;
26
27async fn check_response(response: reqwest::Response) -> Result<(), Error> {
28 let status = response.status().as_u16();
29 let body = response
30 .text()
31 .await
32 .unwrap_or_else(|_| "Unknown error".to_string());
33
34 match Error::from_http_response(status, body) {
35 Some(err) => Err(err),
36 None => Ok(()),
37 }
38}
39
40pub struct Client {
42 options: ClientOptions,
43 client: HttpClient,
44 local_evaluator: Option<LocalEvaluator>,
45 _flag_poller: Option<AsyncFlagPoller>,
46 flag_event_host: OnceLock<Arc<dyn FeatureFlagEvaluationsHost>>,
47}
48
49struct AsyncFlagEventHost {
54 http_client: HttpClient,
55 api_key: String,
56 capture_url: String,
57 disabled: bool,
58 disable_geoip: bool,
59 dedup_cache: Mutex<HashMap<String, HashSet<String>>>,
60 runtime: tokio::runtime::Handle,
66}
67
68impl AsyncFlagEventHost {
69 fn from_options(options: &ClientOptions, http_client: HttpClient) -> Self {
70 let capture_url = options.endpoints().build_url(Endpoint::Capture);
71 Self {
72 http_client,
73 api_key: options.api_key.clone(),
74 capture_url,
75 disabled: options.is_disabled(),
76 disable_geoip: options.disable_geoip,
77 dedup_cache: Mutex::new(HashMap::new()),
78 runtime: tokio::runtime::Handle::current(),
79 }
80 }
81
82 fn already_reported(&self, distinct_id: &str, dedup_key: &str) -> bool {
85 let mut cache = self.dedup_cache.lock().unwrap_or_else(|p| p.into_inner());
86 if let Some(seen) = cache.get(distinct_id) {
87 if seen.contains(dedup_key) {
88 return true;
89 }
90 }
91 if cache.len() >= MAX_FLAG_CALLED_CACHE_SIZE {
92 cache.clear();
93 }
94 cache
95 .entry(distinct_id.to_string())
96 .or_default()
97 .insert(dedup_key.to_string());
98 false
99 }
100
101 fn spawn_ship(&self, event: Event) {
102 if self.disabled {
103 return;
104 }
105 let inner_event = InnerEvent::new(event, self.api_key.clone());
106 let payload = match serde_json::to_string(&inner_event) {
107 Ok(p) => p,
108 Err(e) => {
109 debug!(error = %e, "failed to serialize $feature_flag_called event");
110 return;
111 }
112 };
113 let http_client = self.http_client.clone();
114 let url = self.capture_url.clone();
115 self.runtime.spawn(async move {
116 let response = match http_client
117 .post(&url)
118 .header(CONTENT_TYPE, "application/json")
119 .body(payload)
120 .send()
121 .await
122 {
123 Ok(r) => r,
124 Err(send_err) => {
125 let message = send_err.to_string();
126 debug!("failed to send $feature_flag_called event: {message}");
127 return;
128 }
129 };
130 if let Err(check_err) = check_response(response).await {
131 let message = check_err.to_string();
132 debug!("$feature_flag_called event rejected by server: {message}");
133 }
134 });
135 }
136}
137
138impl FeatureFlagEvaluationsHost for AsyncFlagEventHost {
139 fn capture_flag_called_event_if_needed(&self, params: FlagCalledEventParams) {
140 let dedup_key = build_dedup_key(¶ms.key, params.response.as_ref());
141 if self.already_reported(¶ms.distinct_id, &dedup_key) {
142 return;
143 }
144
145 let mut event = Event::new(
146 "$feature_flag_called".to_string(),
147 params.distinct_id.clone(),
148 );
149 for (k, v) in params.properties {
150 if event.insert_prop(k, v).is_err() {
151 return;
152 }
153 }
154 for (group_name, group_id) in ¶ms.groups {
155 event.add_group(group_name, group_id);
156 }
157 if params.disable_geoip.unwrap_or(self.disable_geoip) {
158 let _ = event.insert_prop("$geoip_disable", true);
159 }
160 self.spawn_ship(event);
161 }
162
163 fn log_warning(&self, message: &str) {
164 warn!("{message}");
167 }
168}
169
170fn build_dedup_key(flag_key: &str, response: Option<&FlagValue>) -> String {
171 let response_repr = match response {
172 Some(FlagValue::Boolean(true)) => "true".to_string(),
173 Some(FlagValue::Boolean(false)) => "false".to_string(),
174 Some(FlagValue::String(s)) => s.clone(),
175 None => "::null::".to_string(),
176 };
177 format!("{flag_key}_{response_repr}")
178}
179
180pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
182 let options = options.into().sanitize();
183 let client = HttpClient::builder()
184 .timeout(Duration::from_secs(options.request_timeout_seconds))
185 .build()
186 .unwrap(); let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
189 if let Some(ref personal_key) = options.personal_api_key {
190 let cache = FlagCache::new();
191
192 let config = LocalEvaluationConfig {
193 personal_api_key: personal_key.clone(),
194 project_api_key: options.api_key.clone(),
195 api_host: options.endpoints().api_host(),
196 poll_interval: Duration::from_secs(options.poll_interval_seconds),
197 request_timeout: Duration::from_secs(options.request_timeout_seconds),
198 };
199
200 let mut poller = AsyncFlagPoller::new(config, cache.clone());
201 poller.start().await;
202
203 (Some(LocalEvaluator::new(cache)), Some(poller))
204 } else {
205 warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
206 (None, None)
207 }
208 } else {
209 (None, None)
210 };
211
212 Client {
213 options,
214 client,
215 local_evaluator,
216 _flag_poller: flag_poller,
217 flag_event_host: OnceLock::new(),
218 }
219}
220
221impl Client {
222 #[instrument(skip(self, event), level = "debug")]
224 pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
225 if self.options.is_disabled() {
226 trace!("Client is disabled, skipping capture");
227 return Ok(());
228 }
229
230 if self.options.disable_geoip {
232 event.insert_prop("$geoip_disable", true).ok();
233 }
234
235 let inner_event = InnerEvent::new(event, self.options.api_key.clone());
236
237 let payload =
238 serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
239
240 let url = self.options.endpoints().build_url(Endpoint::Capture);
241
242 let response = self
243 .client
244 .post(&url)
245 .header(CONTENT_TYPE, "application/json")
246 .body(payload)
247 .send()
248 .await
249 .map_err(|e| Error::Connection(e.to_string()))?;
250
251 check_response(response).await
252 }
253
254 pub async fn capture_batch(
258 &self,
259 events: Vec<Event>,
260 historical_migration: bool,
261 ) -> Result<(), Error> {
262 if self.options.is_disabled() {
263 return Ok(());
264 }
265
266 let disable_geoip = self.options.disable_geoip;
267 let inner_events: Vec<InnerEvent> = events
268 .into_iter()
269 .map(|mut event| {
270 if disable_geoip {
271 event.insert_prop("$geoip_disable", true).ok();
272 }
273 InnerEvent::new(event, self.options.api_key.clone())
274 })
275 .collect();
276
277 let batch_request = BatchRequest {
278 api_key: self.options.api_key.clone(),
279 historical_migration,
280 batch: inner_events,
281 };
282 let payload = serde_json::to_string(&batch_request)
283 .map_err(|e| Error::Serialization(e.to_string()))?;
284 let url = self.options.endpoints().build_url(Endpoint::Batch);
285
286 let response = self
287 .client
288 .post(&url)
289 .header(CONTENT_TYPE, "application/json")
290 .body(payload)
291 .send()
292 .await
293 .map_err(|e| Error::Connection(e.to_string()))?;
294
295 check_response(response).await
296 }
297
298 #[must_use = "feature flags result should be used"]
300 pub async fn get_feature_flags<S: Into<String>>(
301 &self,
302 distinct_id: S,
303 groups: Option<HashMap<String, String>>,
304 person_properties: Option<HashMap<String, serde_json::Value>>,
305 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
306 ) -> Result<
307 (
308 HashMap<String, FlagValue>,
309 HashMap<String, serde_json::Value>,
310 ),
311 Error,
312 > {
313 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
314
315 let mut payload = json!({
316 "api_key": self.options.api_key,
317 "distinct_id": distinct_id.into(),
318 });
319
320 if let Some(groups) = groups {
321 payload["groups"] = json!(groups);
322 }
323
324 if let Some(person_properties) = person_properties {
325 payload["person_properties"] = json!(person_properties);
326 }
327
328 if let Some(group_properties) = group_properties {
329 payload["group_properties"] = json!(group_properties);
330 }
331
332 if self.options.disable_geoip {
334 payload["disable_geoip"] = json!(true);
335 }
336
337 let response = self
338 .client
339 .post(&flags_endpoint)
340 .header(CONTENT_TYPE, "application/json")
341 .json(&payload)
342 .timeout(Duration::from_secs(
343 self.options.feature_flags_request_timeout_seconds,
344 ))
345 .send()
346 .await
347 .map_err(|e| Error::Connection(e.to_string()))?;
348
349 if !response.status().is_success() {
350 let status = response.status();
351 let text = response
352 .text()
353 .await
354 .unwrap_or_else(|_| "Unknown error".to_string());
355 return Err(Error::Connection(format!(
356 "API request failed with status {status}: {text}"
357 )));
358 }
359
360 let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
361 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
362 })?;
363
364 Ok(flags_response.normalize())
365 }
366
367 #[must_use = "feature flag result should be used"]
369 #[instrument(skip_all, level = "debug")]
370 #[deprecated(
371 since = "0.6.0",
372 note = "Use Client::evaluate_flags() to fetch a snapshot, then call .get_flag(key) on it. \
373 The snapshot deduplicates $feature_flag_called events and supports attaching \
374 rich metadata to captured events via Event::with_flags()."
375 )]
376 pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
377 &self,
378 key: K,
379 distinct_id: D,
380 groups: Option<HashMap<String, String>>,
381 person_properties: Option<HashMap<String, serde_json::Value>>,
382 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
383 ) -> Result<Option<FlagValue>, Error> {
384 let key_str = key.into();
385 let distinct_id_str = distinct_id.into();
386
387 if let Some(ref evaluator) = self.local_evaluator {
389 let empty_props = HashMap::new();
390 let empty_groups: HashMap<String, String> = HashMap::new();
391 let empty_group_props: HashMap<String, HashMap<String, serde_json::Value>> =
392 HashMap::new();
393 let props = person_properties.as_ref().unwrap_or(&empty_props);
394 let groups_ref = groups.as_ref().unwrap_or(&empty_groups);
395 let group_props_ref = group_properties.as_ref().unwrap_or(&empty_group_props);
396 match evaluator.evaluate_flag(
397 &key_str,
398 &distinct_id_str,
399 props,
400 groups_ref,
401 group_props_ref,
402 ) {
403 Ok(Some(value)) => {
404 debug!(flag = %key_str, ?value, "Flag evaluated locally");
405 return Ok(Some(value));
406 }
407 Ok(None) => {
408 if self.options.local_evaluation_only {
409 debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
410 return Ok(None);
411 }
412 debug!(flag = %key_str, "Flag not found locally, falling back to API");
413 }
414 Err(e) => {
415 if self.options.local_evaluation_only {
416 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
417 return Ok(None);
418 }
419 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
420 }
421 }
422 }
423
424 trace!(flag = %key_str, "Fetching flag from API");
426 let (feature_flags, _payloads) = self
427 .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
428 .await?;
429 Ok(feature_flags.get(&key_str).cloned())
430 }
431
432 #[must_use = "feature flag enabled check result should be used"]
434 #[deprecated(
435 since = "0.6.0",
436 note = "Use Client::evaluate_flags() to fetch a snapshot, then call .is_enabled(key) \
437 on it. The snapshot deduplicates $feature_flag_called events and supports \
438 attaching rich metadata to captured events via Event::with_flags()."
439 )]
440 #[allow(deprecated)] pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
442 &self,
443 key: K,
444 distinct_id: D,
445 groups: Option<HashMap<String, String>>,
446 person_properties: Option<HashMap<String, serde_json::Value>>,
447 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
448 ) -> Result<bool, Error> {
449 let flag_value = self
450 .get_feature_flag(
451 key.into(),
452 distinct_id.into(),
453 groups,
454 person_properties,
455 group_properties,
456 )
457 .await?;
458 Ok(match flag_value {
459 Some(FlagValue::Boolean(b)) => b,
460 Some(FlagValue::String(_)) => true, None => false,
462 })
463 }
464
465 #[must_use = "feature flag payload result should be used"]
467 #[deprecated(
468 since = "0.6.0",
469 note = "Use Client::evaluate_flags() to fetch a snapshot, then call \
470 .get_flag_payload(key) on it. Reading the payload from a snapshot is \
471 event-free, matching this method's behavior, and avoids the per-call \
472 /flags request."
473 )]
474 pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
475 &self,
476 key: K,
477 distinct_id: D,
478 ) -> Result<Option<serde_json::Value>, Error> {
479 let key_str = key.into();
480 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
481
482 let mut payload = json!({
483 "api_key": self.options.api_key,
484 "distinct_id": distinct_id.into(),
485 });
486
487 if self.options.disable_geoip {
489 payload["disable_geoip"] = json!(true);
490 }
491
492 let response = self
493 .client
494 .post(&flags_endpoint)
495 .header(CONTENT_TYPE, "application/json")
496 .json(&payload)
497 .timeout(Duration::from_secs(
498 self.options.feature_flags_request_timeout_seconds,
499 ))
500 .send()
501 .await
502 .map_err(|e| Error::Connection(e.to_string()))?;
503
504 if !response.status().is_success() {
505 return Ok(None);
506 }
507
508 let flags_response: FeatureFlagsResponse = response
509 .json()
510 .await
511 .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
512
513 let (_flags, payloads) = flags_response.normalize();
514 Ok(payloads.get(&key_str).cloned())
515 }
516
517 #[allow(clippy::too_many_arguments)]
522 pub fn evaluate_feature_flag_locally(
523 &self,
524 flag: &FeatureFlag,
525 distinct_id: &str,
526 person_properties: &HashMap<String, serde_json::Value>,
527 groups: &HashMap<String, String>,
528 group_properties: &HashMap<String, HashMap<String, serde_json::Value>>,
529 ) -> Result<FlagValue, Error> {
530 let group_type_mapping = self
531 .local_evaluator
532 .as_ref()
533 .map(|ev| ev.cache().get_group_type_mapping())
534 .unwrap_or_default();
535 match_feature_flag(
536 flag,
537 distinct_id,
538 person_properties,
539 groups,
540 group_properties,
541 &group_type_mapping,
542 )
543 .map_err(|e| Error::InconclusiveMatch(e.message))
544 }
545
546 pub async fn evaluate_flags<S: Into<String>>(
557 &self,
558 distinct_id: S,
559 options: EvaluateFlagsOptions,
560 ) -> Result<FeatureFlagEvaluations, Error> {
561 let distinct_id: String = distinct_id.into();
562 let host = self.flag_event_host();
563
564 if distinct_id.is_empty() || self.options.is_disabled() {
565 return Ok(FeatureFlagEvaluations::empty(host));
566 }
567
568 let mut records: HashMap<String, EvaluatedFlagRecord> = HashMap::new();
569 let mut locally_evaluated_keys: HashSet<String> = HashSet::new();
570
571 if let Some(evaluator) = &self.local_evaluator {
572 let person_props_owned = options.person_properties.clone().unwrap_or_default();
573 let groups_owned = options.groups.clone().unwrap_or_default();
574 let group_props_owned = options.group_properties.clone().unwrap_or_default();
575 let local_results = evaluator.evaluate_all_flags(
576 &distinct_id,
577 &person_props_owned,
578 &groups_owned,
579 &group_props_owned,
580 );
581 for (key, result) in local_results {
582 if let Some(filter) = &options.flag_keys {
583 if !filter.iter().any(|k| k == &key) {
584 continue;
585 }
586 }
587 if let Ok(value) = result {
588 records.insert(key.clone(), local_record(value));
589 locally_evaluated_keys.insert(key);
590 }
591 }
592 }
593
594 let mut request_id: Option<String> = None;
595 let mut errors_while_computing = false;
596 let mut quota_limited = false;
597
598 let local_covers_request = options
603 .flag_keys
604 .as_ref()
605 .is_some_and(|keys| keys.iter().all(|k| locally_evaluated_keys.contains(k)));
606
607 if !options.only_evaluate_locally && !local_covers_request {
608 match self.fetch_flag_details(&distinct_id, &options).await {
613 Ok(response) => {
614 request_id = response.request_id;
615 errors_while_computing = response.errors_while_computing_flags;
616 quota_limited = response.quota_limited;
617 for (key, detail) in response.flags {
618 if locally_evaluated_keys.contains(&key) {
619 continue;
620 }
621 records.insert(key, remote_record_from_detail(detail));
622 }
623 }
624 Err(e) => {
625 if records.is_empty() {
626 return Err(e);
627 }
628 debug!(
629 error = e.to_string(),
630 local_count = records.len(),
631 "/flags fetch failed; returning snapshot from local results only"
632 );
633 errors_while_computing = true;
634 }
635 }
636 }
637
638 Ok(FeatureFlagEvaluations::new(
639 host,
640 distinct_id,
641 records,
642 options.groups.unwrap_or_default(),
643 options.disable_geoip,
644 request_id,
645 None,
646 errors_while_computing,
647 quota_limited,
648 ))
649 }
650
651 fn flag_event_host(&self) -> Arc<dyn FeatureFlagEvaluationsHost> {
652 self.flag_event_host
653 .get_or_init(|| {
654 Arc::new(AsyncFlagEventHost::from_options(
655 &self.options,
656 self.client.clone(),
657 )) as Arc<dyn FeatureFlagEvaluationsHost>
658 })
659 .clone()
660 }
661
662 async fn fetch_flag_details(
663 &self,
664 distinct_id: &str,
665 options: &EvaluateFlagsOptions,
666 ) -> Result<DetailedFlagsResponse, Error> {
667 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
668
669 let mut payload = json!({
670 "api_key": self.options.api_key,
671 "distinct_id": distinct_id,
672 });
673 if let Some(groups) = &options.groups {
674 payload["groups"] = json!(groups);
675 }
676 if let Some(person_properties) = &options.person_properties {
677 payload["person_properties"] = json!(person_properties);
678 }
679 if let Some(group_properties) = &options.group_properties {
680 payload["group_properties"] = json!(group_properties);
681 }
682 let effective_disable_geoip = options.disable_geoip.unwrap_or(self.options.disable_geoip);
683 if effective_disable_geoip {
684 payload["disable_geoip"] = json!(true);
685 }
686 if let Some(flag_keys) = &options.flag_keys {
687 payload["flag_keys_to_evaluate"] = json!(flag_keys);
688 }
689
690 let response = self
691 .client
692 .post(&flags_endpoint)
693 .header(CONTENT_TYPE, "application/json")
694 .json(&payload)
695 .timeout(Duration::from_secs(
696 self.options.feature_flags_request_timeout_seconds,
697 ))
698 .send()
699 .await
700 .map_err(|e| Error::Connection(e.to_string()))?;
701
702 if !response.status().is_success() {
703 let status = response.status();
704 let text = response
705 .text()
706 .await
707 .unwrap_or_else(|_| "Unknown error".to_string());
708 return Err(Error::Connection(format!(
709 "API request failed with status {status}: {text}"
710 )));
711 }
712
713 let parsed = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
714 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
715 })?;
716 Ok(extract_flag_details(parsed))
717 }
718}
719
720struct DetailedFlagsResponse {
723 flags: HashMap<String, FlagDetail>,
724 request_id: Option<String>,
725 errors_while_computing_flags: bool,
726 quota_limited: bool,
727}
728
729fn extract_flag_details(response: FeatureFlagsResponse) -> DetailedFlagsResponse {
730 match response {
731 FeatureFlagsResponse::V2 {
732 flags,
733 request_id,
734 errors_while_computing_flags,
735 quota_limited,
736 } => DetailedFlagsResponse {
737 flags,
738 request_id,
739 errors_while_computing_flags,
740 quota_limited,
741 },
742 FeatureFlagsResponse::Legacy {
743 feature_flags,
744 feature_flag_payloads,
745 errors,
746 } => {
747 let mut flags = HashMap::new();
748 for (key, value) in feature_flags {
749 let (enabled, variant) = match value {
750 FlagValue::Boolean(b) => (b, None),
751 FlagValue::String(s) => (true, Some(s)),
752 };
753 let payload = feature_flag_payloads.get(&key).cloned();
754 flags.insert(
755 key.clone(),
756 FlagDetail {
757 key,
758 enabled,
759 variant,
760 reason: None,
761 metadata: payload.map(|payload| crate::feature_flags::FlagMetadata {
762 id: 0,
763 version: 0,
764 description: None,
765 payload: Some(payload),
766 }),
767 },
768 );
769 }
770 DetailedFlagsResponse {
771 flags,
772 request_id: None,
773 errors_while_computing_flags: errors.is_some_and(|e| !e.is_empty()),
774 quota_limited: false,
775 }
776 }
777 }
778}
779
780fn local_record(value: FlagValue) -> EvaluatedFlagRecord {
781 let (enabled, variant) = match value {
782 FlagValue::Boolean(b) => (b, None),
783 FlagValue::String(s) => (true, Some(s)),
784 };
785 EvaluatedFlagRecord {
786 enabled,
787 variant,
788 payload: None,
790 id: None,
791 version: None,
792 reason: Some("Evaluated locally".to_string()),
793 locally_evaluated: true,
794 }
795}
796
797fn remote_record_from_detail(detail: FlagDetail) -> EvaluatedFlagRecord {
798 let metadata = detail.metadata;
799 let reason = detail
800 .reason
801 .and_then(|r| r.description.or(Some(r.code)))
802 .filter(|s| !s.is_empty());
803 let id = metadata.as_ref().map(|m| m.id);
804 let version = metadata.as_ref().map(|m| m.version);
805 let payload = metadata.and_then(|m| m.payload).map(normalize_payload);
806 EvaluatedFlagRecord {
807 enabled: detail.enabled,
808 variant: detail.variant,
809 payload,
810 id,
811 version,
812 reason,
813 locally_evaluated: false,
814 }
815}
816
817fn normalize_payload(payload: serde_json::Value) -> serde_json::Value {
822 match payload {
823 serde_json::Value::String(raw) => {
824 serde_json::from_str(&raw).unwrap_or(serde_json::Value::String(raw))
825 }
826 other => other,
827 }
828}