Skip to main content

faucet_core/observability/
state.rs

//! Decorator wrapping an `Arc<dyn StateStore>` to emit spans + metrics around
//! every get / put / delete call.
3
4use crate::error::FaucetError;
5use crate::observability::decorator::error_kind;
6use crate::observability::labels::Labels;
7use crate::observability::timer::DurationGuard;
8use crate::state::StateStore;
9use async_trait::async_trait;
10use metrics::{Label, SharedString, counter};
11use serde_json::Value;
12use std::sync::Arc;
13use tracing::{Instrument, info_span};
14
15/// Wraps an `Arc<dyn StateStore>` and emits faucet.state.* spans + metrics
16/// around every operation.
17pub struct InstrumentedStateStore {
18    inner: Arc<dyn StateStore>,
19    labels: Labels,
20}
21
22impl InstrumentedStateStore {
23    pub fn new(inner: Arc<dyn StateStore>, labels: Labels) -> Self {
24        Self { inner, labels }
25    }
26
27    fn base_labels(&self) -> Vec<Label> {
28        vec![
29            Label::new(
30                "pipeline",
31                SharedString::from(self.labels.pipeline.to_string()),
32            ),
33            Label::new("row", SharedString::from(self.labels.row.to_string())),
34        ]
35    }
36
37    fn error_labels(&self, op: &'static str, kind: &'static str) -> Vec<Label> {
38        let mut l = self.base_labels();
39        l.push(Label::new("op", SharedString::const_str(op)));
40        l.push(Label::new("kind", SharedString::const_str(kind)));
41        l
42    }
43}
44
45#[async_trait]
46impl StateStore for InstrumentedStateStore {
47    async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
48        let span = info_span!("faucet.state.get",
49            pipeline = %self.labels.pipeline,
50            row = %self.labels.row,
51            run_id = %self.labels.run_id,
52            key = key,
53        );
54        let base = self.base_labels();
55        let _timer = DurationGuard::new("faucet_state_get_duration_seconds", base.clone());
56        let result = self.inner.get(key).instrument(span).await;
57        match result {
58            Ok(Some(v)) => {
59                let mut l = base;
60                l.push(Label::new("outcome", SharedString::const_str("hit")));
61                counter!("faucet_state_get_total", l).increment(1);
62                Ok(Some(v))
63            }
64            Ok(None) => {
65                let mut l = base;
66                l.push(Label::new("outcome", SharedString::const_str("miss")));
67                counter!("faucet_state_get_total", l).increment(1);
68                Ok(None)
69            }
70            Err(e) => {
71                counter!(
72                    "faucet_state_errors_total",
73                    self.error_labels("get", error_kind(&e))
74                )
75                .increment(1);
76                Err(e)
77            }
78        }
79    }
80
81    async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
82        let span = info_span!("faucet.state.put",
83            pipeline = %self.labels.pipeline,
84            row = %self.labels.row,
85            run_id = %self.labels.run_id,
86            key = key,
87        );
88        let base = self.base_labels();
89        let _timer = DurationGuard::new("faucet_state_put_duration_seconds", base.clone());
90        let result = self.inner.put(key, value).instrument(span).await;
91        match result {
92            Ok(()) => {
93                counter!("faucet_state_put_total", base).increment(1);
94                Ok(())
95            }
96            Err(e) => {
97                counter!(
98                    "faucet_state_errors_total",
99                    self.error_labels("put", error_kind(&e))
100                )
101                .increment(1);
102                Err(e)
103            }
104        }
105    }
106
107    async fn delete(&self, key: &str) -> Result<(), FaucetError> {
108        let span = info_span!("faucet.state.delete",
109            pipeline = %self.labels.pipeline,
110            row = %self.labels.row,
111            run_id = %self.labels.run_id,
112            key = key,
113        );
114        let base = self.base_labels();
115        let _timer = DurationGuard::new("faucet_state_delete_duration_seconds", base.clone());
116        let result = self.inner.delete(key).instrument(span).await;
117        match result {
118            Ok(()) => {
119                counter!("faucet_state_delete_total", base).increment(1);
120                Ok(())
121            }
122            Err(e) => {
123                counter!(
124                    "faucet_state_errors_total",
125                    self.error_labels("delete", error_kind(&e))
126                )
127                .increment(1);
128                Err(e)
129            }
130        }
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::decorator::source_tests::{LOCK, labels, snapshotter};
    use crate::state::MemoryStateStore;
    use metrics_util::debugging::DebugValue;
    use serde_json::json;

    /// Returns `true` when `snapshot` contains a `faucet_state_get_total`
    /// counter with a value of at least 1 that carries `outcome = <outcome>`.
    fn get_total_recorded(snapshot: metrics_util::debugging::Snapshot, outcome: &str) -> bool {
        for (composite, _unit, _description, value) in snapshot.into_vec() {
            let metric = composite.key();
            if metric.name() != "faucet_state_get_total" {
                continue;
            }
            let tagged = metric
                .labels()
                .any(|l| l.key() == "outcome" && l.value() == outcome);
            if tagged && matches!(value, DebugValue::Counter(c) if c >= 1) {
                return true;
            }
        }
        false
    }

    /// A `get` for a key that exists must be counted with `outcome = hit`.
    #[tokio::test]
    // The guard is intentionally held across awaits; LOCK presumably
    // serialises tests that share recorder state -- confirm in `source_tests`.
    #[allow(clippy::await_holding_lock)]
    async fn get_records_hit_outcome() {
        let _guard = LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        let snap = snapshotter();

        // Seed the value through the raw store, then read it back through
        // the instrumented wrapper.
        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
        store.put("k", &json!("v")).await.unwrap();
        let instrumented = InstrumentedStateStore::new(Arc::clone(&store), labels());

        let fetched = instrumented.get("k").await.unwrap();
        assert_eq!(fetched, Some(json!("v")));

        assert!(
            get_total_recorded(snap.snapshot(), "hit"),
            "expected faucet_state_get_total{{outcome=hit}}"
        );
    }

    /// A `get` for a key that was never written must be counted with
    /// `outcome = miss`.
    #[tokio::test]
    // The guard is intentionally held across awaits; LOCK presumably
    // serialises tests that share recorder state -- confirm in `source_tests`.
    #[allow(clippy::await_holding_lock)]
    async fn get_records_miss_outcome() {
        let _guard = LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        let snap = snapshotter();

        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
        let instrumented = InstrumentedStateStore::new(store, labels());

        let fetched = instrumented.get("absent").await.unwrap();
        assert!(fetched.is_none());

        assert!(
            get_total_recorded(snap.snapshot(), "miss"),
            "expected faucet_state_get_total{{outcome=miss}}"
        );
    }
}