faucet_core/observability/
state.rs1use crate::error::FaucetError;
5use crate::observability::decorator::error_kind;
6use crate::observability::labels::Labels;
7use crate::observability::timer::DurationGuard;
8use crate::state::StateStore;
9use async_trait::async_trait;
10use metrics::{Label, SharedString, counter};
11use serde_json::Value;
12use std::sync::Arc;
13use tracing::{Instrument, info_span};
14
15pub struct InstrumentedStateStore {
18 inner: Arc<dyn StateStore>,
19 labels: Labels,
20}
21
22impl InstrumentedStateStore {
23 pub fn new(inner: Arc<dyn StateStore>, labels: Labels) -> Self {
24 Self { inner, labels }
25 }
26
27 fn base_labels(&self) -> Vec<Label> {
28 vec![
29 Label::new(
30 "pipeline",
31 SharedString::from(self.labels.pipeline.to_string()),
32 ),
33 Label::new("row", SharedString::from(self.labels.row.to_string())),
34 ]
35 }
36
37 fn error_labels(&self, op: &'static str, kind: &'static str) -> Vec<Label> {
38 let mut l = self.base_labels();
39 l.push(Label::new("op", SharedString::const_str(op)));
40 l.push(Label::new("kind", SharedString::const_str(kind)));
41 l
42 }
43}
44
45#[async_trait]
46impl StateStore for InstrumentedStateStore {
47 async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
48 let span = info_span!("faucet.state.get",
49 pipeline = %self.labels.pipeline,
50 row = %self.labels.row,
51 run_id = %self.labels.run_id,
52 key = key,
53 );
54 let base = self.base_labels();
55 let _timer = DurationGuard::new("faucet_state_get_duration_seconds", base.clone());
56 let result = self.inner.get(key).instrument(span).await;
57 match result {
58 Ok(Some(v)) => {
59 let mut l = base;
60 l.push(Label::new("outcome", SharedString::const_str("hit")));
61 counter!("faucet_state_get_total", l).increment(1);
62 Ok(Some(v))
63 }
64 Ok(None) => {
65 let mut l = base;
66 l.push(Label::new("outcome", SharedString::const_str("miss")));
67 counter!("faucet_state_get_total", l).increment(1);
68 Ok(None)
69 }
70 Err(e) => {
71 counter!(
72 "faucet_state_errors_total",
73 self.error_labels("get", error_kind(&e))
74 )
75 .increment(1);
76 Err(e)
77 }
78 }
79 }
80
81 async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
82 let span = info_span!("faucet.state.put",
83 pipeline = %self.labels.pipeline,
84 row = %self.labels.row,
85 run_id = %self.labels.run_id,
86 key = key,
87 );
88 let base = self.base_labels();
89 let _timer = DurationGuard::new("faucet_state_put_duration_seconds", base.clone());
90 let result = self.inner.put(key, value).instrument(span).await;
91 match result {
92 Ok(()) => {
93 counter!("faucet_state_put_total", base).increment(1);
94 Ok(())
95 }
96 Err(e) => {
97 counter!(
98 "faucet_state_errors_total",
99 self.error_labels("put", error_kind(&e))
100 )
101 .increment(1);
102 Err(e)
103 }
104 }
105 }
106
107 async fn delete(&self, key: &str) -> Result<(), FaucetError> {
108 let span = info_span!("faucet.state.delete",
109 pipeline = %self.labels.pipeline,
110 row = %self.labels.row,
111 run_id = %self.labels.run_id,
112 key = key,
113 );
114 let base = self.base_labels();
115 let _timer = DurationGuard::new("faucet_state_delete_duration_seconds", base.clone());
116 let result = self.inner.delete(key).instrument(span).await;
117 match result {
118 Ok(()) => {
119 counter!("faucet_state_delete_total", base).increment(1);
120 Ok(())
121 }
122 Err(e) => {
123 counter!(
124 "faucet_state_errors_total",
125 self.error_labels("delete", error_kind(&e))
126 )
127 .increment(1);
128 Err(e)
129 }
130 }
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137 use crate::observability::decorator::source_tests::{LOCK, labels, snapshotter};
138 use crate::state::MemoryStateStore;
139 use metrics_util::debugging::DebugValue;
140 use serde_json::json;
141
142 #[tokio::test]
143 #[allow(clippy::await_holding_lock)]
144 async fn get_records_hit_outcome() {
145 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
146 let snap = snapshotter();
147 let inner: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
148 inner.put("k", &json!("v")).await.unwrap();
149 let wrapped = InstrumentedStateStore::new(Arc::clone(&inner), labels());
150 let result = wrapped.get("k").await.unwrap();
151 assert_eq!(result, Some(json!("v")));
152 let snapshot = snap.snapshot();
153 let found = snapshot.into_vec().into_iter().any(|(key, _u, _d, v)| {
154 key.key().name() == "faucet_state_get_total"
155 && key
156 .key()
157 .labels()
158 .any(|l| l.key() == "outcome" && l.value() == "hit")
159 && matches!(v, DebugValue::Counter(c) if c >= 1)
160 });
161 assert!(found, "expected faucet_state_get_total{{outcome=hit}}");
162 }
163
164 #[tokio::test]
165 #[allow(clippy::await_holding_lock)]
166 async fn get_records_miss_outcome() {
167 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
168 let snap = snapshotter();
169 let inner: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
170 let wrapped = InstrumentedStateStore::new(inner, labels());
171 assert!(wrapped.get("absent").await.unwrap().is_none());
172 let snapshot = snap.snapshot();
173 let found = snapshot.into_vec().into_iter().any(|(key, _u, _d, v)| {
174 key.key().name() == "faucet_state_get_total"
175 && key
176 .key()
177 .labels()
178 .any(|l| l.key() == "outcome" && l.value() == "miss")
179 && matches!(v, DebugValue::Counter(c) if c >= 1)
180 });
181 assert!(found, "expected faucet_state_get_total{{outcome=miss}}");
182 }
183}