request_shadow/
shadower.rs1use std::sync::Arc;
5
6use tokio::time::timeout;
7
8use crate::backend::{Backend, ResponseRecord};
9use crate::config::ShadowConfig;
10use crate::divergence::Divergence;
11use crate::error::ShadowError;
12use crate::log::{DivergenceEntry, DivergenceLog};
13
14#[derive(Debug)]
17pub struct ShadowOutcome {
18 pub primary: ResponseRecord,
20 pub shadow: Option<ResponseRecord>,
22 pub divergence: Option<Divergence>,
25 pub skipped_by_sampler: bool,
27 pub shadow_failed: Option<String>,
29}
30
31#[derive(Clone)]
33pub struct Shadower {
34 primary: Arc<dyn Backend>,
35 shadow: Arc<dyn Backend>,
36 config: ShadowConfig,
37 log: Arc<DivergenceLog>,
38}
39
40impl Shadower {
41 pub fn new(primary: Arc<dyn Backend>, shadow: Arc<dyn Backend>, config: ShadowConfig) -> Self {
43 Self {
44 primary,
45 shadow,
46 config,
47 log: Arc::new(DivergenceLog::default()),
48 }
49 }
50
51 #[must_use]
53 pub fn with_log(mut self, log: Arc<DivergenceLog>) -> Self {
54 self.log = log;
55 self
56 }
57
58 pub fn divergences(&self) -> Vec<DivergenceEntry> {
60 self.log.snapshot()
61 }
62
63 pub async fn call(&self, input: &[u8]) -> Result<ShadowOutcome, ShadowError> {
66 let should_shadow = self.config.should_shadow(input);
67
68 if !should_shadow {
69 let primary = self.primary.call(input).await?;
70 return Ok(ShadowOutcome {
71 primary,
72 shadow: None,
73 divergence: None,
74 skipped_by_sampler: true,
75 shadow_failed: None,
76 });
77 }
78
79 let primary_fut = self.primary.call(input);
80 let shadow_fut = timeout(self.config.shadow_timeout, self.shadow.call(input));
81
82 let (primary_res, shadow_res) = tokio::join!(primary_fut, shadow_fut);
83 let primary = primary_res?;
84
85 let (shadow, shadow_failed) = match shadow_res {
86 Ok(Ok(resp)) => (Some(resp), None),
87 Ok(Err(err)) => (None, Some(err.to_string())),
88 Err(_) => (None, Some("timeout".to_string())),
89 };
90
91 let divergence = match &shadow {
92 Some(s) => Divergence::compare(&primary, s, &self.config),
93 None => None,
94 };
95
96 if let Some(d) = &divergence {
97 self.log.push(DivergenceEntry {
98 key: input.to_vec(),
99 divergence: d.clone(),
100 });
101 }
102
103 Ok(ShadowOutcome {
104 primary,
105 shadow,
106 divergence,
107 skipped_by_sampler: false,
108 shadow_failed,
109 })
110 }
111
112 pub fn divergence_count(&self) -> usize {
114 self.log.len()
115 }
116}