uni_plugin/
observability.rs1use std::time::Duration;
15
16use crate::plugin::PluginId;
17use crate::qname::QName;
18
/// The category of plugin entry point an invocation belongs to.
///
/// Each variant maps to a fixed lowercase label (see
/// [`InvocationKind::as_str`]); that label is what [`record_invocation`]
/// emits as the `plugin.kind` field.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum InvocationKind {
    /// Labelled `scalar`.
    Scalar,
    /// Labelled `aggregate`.
    Aggregate,
    /// Labelled `window`.
    Window,
    /// Labelled `procedure`.
    Procedure,
    /// Labelled `locy_aggregate`.
    LocyAggregate,
    /// Labelled `locy_predicate`.
    LocyPredicate,
    /// Labelled `operator`.
    Operator,
    /// Labelled `index`.
    Index,
    /// Labelled `storage`.
    Storage,
    /// Labelled `algorithm`.
    Algorithm,
    /// Labelled `crdt`.
    Crdt,
    /// Labelled `hook`.
    Hook,
    /// Labelled `trigger`.
    Trigger,
    /// Labelled `background_job`.
    BackgroundJob,
    /// Labelled `type`.
    Type,
    /// Labelled `auth`.
    Auth,
    /// Labelled `authz`.
    Authz,
    /// Labelled `connector`.
    Connector,
}

impl InvocationKind {
    /// Returns the fixed, lowercase `snake_case` label for this kind.
    ///
    /// The function is `const`, so a label can be bound in a `const` or
    /// `static` item as well as computed at run time. The match is
    /// deliberately exhaustive (no `_` arm) so that adding a variant without
    /// a label is a compile error.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Scalar => "scalar",
            Self::Aggregate => "aggregate",
            Self::Window => "window",
            Self::Procedure => "procedure",
            Self::LocyAggregate => "locy_aggregate",
            Self::LocyPredicate => "locy_predicate",
            Self::Operator => "operator",
            Self::Index => "index",
            Self::Storage => "storage",
            Self::Algorithm => "algorithm",
            Self::Crdt => "crdt",
            Self::Hook => "hook",
            Self::Trigger => "trigger",
            Self::BackgroundJob => "background_job",
            Self::Type => "type",
            Self::Auth => "auth",
            Self::Authz => "authz",
            Self::Connector => "connector",
        }
    }
}
87
88pub fn record_invocation(
95 plugin: &PluginId,
96 qname: &QName,
97 kind: InvocationKind,
98 rows: u64,
99 elapsed: Duration,
100 ok: bool,
101) {
102 tracing::debug!(
103 plugin.id = plugin.as_str(),
104 plugin.qname = %qname,
105 plugin.kind = kind.as_str(),
106 batch.rows = rows,
107 duration_ms = elapsed.as_millis() as u64,
108 result.ok = ok,
109 "plugin.invoke"
110 );
111}
112
/// Raw trace-propagation identifiers: a trace id, a span id and a flags byte.
///
/// The `Default` value has empty id vectors and zero flags; it cannot be
/// rendered as a `traceparent` (see [`TraceContext::to_traceparent`]).
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TraceContext {
    /// Trace id bytes. Must be exactly 16 bytes, not all zero, to render.
    pub trace_id: Vec<u8>,
    /// Span id bytes. Must be exactly 8 bytes, not all zero, to render.
    pub span_id: Vec<u8>,
    /// Flags byte, rendered as the final two hex digits of the header.
    pub trace_flags: u8,
}

impl TraceContext {
    /// Renders this context as a version-`00` `traceparent` header value:
    /// `00-<32 hex trace id>-<16 hex span id>-<2 hex flags>` (55 characters,
    /// lowercase hex).
    ///
    /// Returns `None` when the context cannot form a valid header:
    ///
    /// * `trace_id` is not exactly 16 bytes or `span_id` is not exactly
    ///   8 bytes (this includes the empty `Default` context), or
    /// * either id consists solely of zero bytes, which the W3C Trace Context
    ///   format defines as an invalid value.
    #[must_use]
    pub fn to_traceparent(&self) -> Option<String> {
        use std::fmt::Write as _;

        const TRACE_ID_BYTES: usize = 16;
        const SPAN_ID_BYTES: usize = 8;
        // "00" + '-' + 32 hex + '-' + 16 hex + '-' + 2 hex.
        const HEADER_LEN: usize = 55;

        if self.trace_id.len() != TRACE_ID_BYTES || self.span_id.len() != SPAN_ID_BYTES {
            return None;
        }

        // An all-zero id is the "invalid" sentinel; receivers must ignore a
        // header carrying one, so do not produce it in the first place.
        let is_all_zero = |bytes: &[u8]| bytes.iter().all(|&b| b == 0);
        if is_all_zero(&self.trace_id) || is_all_zero(&self.span_id) {
            return None;
        }

        let mut header = String::with_capacity(HEADER_LEN);
        header.push_str("00-");
        // Writing into a `String` cannot fail, so the `fmt::Result`s are
        // intentionally discarded.
        for byte in &self.trace_id {
            let _ = write!(header, "{byte:02x}");
        }
        header.push('-');
        for byte in &self.span_id {
            let _ = write!(header, "{byte:02x}");
        }
        let _ = write!(header, "-{:02x}", self.trace_flags);
        Some(header)
    }
}
159
160#[must_use]
169pub fn current_trace_context() -> TraceContext {
170 #[cfg(feature = "otel")]
171 {
172 use opentelemetry::trace::TraceContextExt as _;
173 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
174
175 let span = tracing::Span::current();
176 let ctx = span.context();
177 let span_ref = ctx.span();
178 let sc = span_ref.span_context();
179 if sc.is_valid() {
180 return TraceContext {
181 trace_id: sc.trace_id().to_bytes().to_vec(),
182 span_id: sc.span_id().to_bytes().to_vec(),
183 trace_flags: sc.trace_flags().to_u8(),
184 };
185 }
186 }
187 TraceContext::default()
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant's label is pinned, not just a sample of them.
    #[test]
    fn invocation_kind_strings_are_stable() {
        let expected = [
            (InvocationKind::Scalar, "scalar"),
            (InvocationKind::Aggregate, "aggregate"),
            (InvocationKind::Window, "window"),
            (InvocationKind::Procedure, "procedure"),
            (InvocationKind::LocyAggregate, "locy_aggregate"),
            (InvocationKind::LocyPredicate, "locy_predicate"),
            (InvocationKind::Operator, "operator"),
            (InvocationKind::Index, "index"),
            (InvocationKind::Storage, "storage"),
            (InvocationKind::Algorithm, "algorithm"),
            (InvocationKind::Crdt, "crdt"),
            (InvocationKind::Hook, "hook"),
            (InvocationKind::Trigger, "trigger"),
            (InvocationKind::BackgroundJob, "background_job"),
            (InvocationKind::Type, "type"),
            (InvocationKind::Auth, "auth"),
            (InvocationKind::Authz, "authz"),
            (InvocationKind::Connector, "connector"),
        ];
        for (kind, label) in expected {
            assert_eq!(kind.as_str(), label, "label for {kind:?}");
        }
    }

    /// Calling the recorder with no subscriber installed must simply return.
    #[test]
    fn record_invocation_does_not_panic_without_subscriber() {
        record_invocation(
            &PluginId::new("test"),
            &QName::builtin("identity"),
            InvocationKind::Scalar,
            128,
            Duration::from_micros(50),
            true,
        );
    }

    #[test]
    fn trace_context_empty_without_otel_layer() {
        // This test installs no OpenTelemetry layer, so the lookup is expected
        // to fall back to the empty default context.
        let c = current_trace_context();
        assert!(c.trace_id.is_empty());
        assert!(c.span_id.is_empty());
        assert!(c.to_traceparent().is_none());
    }

    #[test]
    fn empty_context_has_no_traceparent() {
        assert!(TraceContext::default().to_traceparent().is_none());
    }

    /// Ids of the wrong length must never be rendered.
    #[test]
    fn wrong_length_ids_have_no_traceparent() {
        let short_trace = TraceContext {
            trace_id: vec![1; 15],
            span_id: vec![1; 8],
            trace_flags: 0,
        };
        assert!(short_trace.to_traceparent().is_none());

        let long_span = TraceContext {
            trace_id: vec![1; 16],
            span_id: vec![1; 9],
            trace_flags: 0,
        };
        assert!(long_span.to_traceparent().is_none());
    }

    /// Pins the exact header text for a well-formed, non-zero context.
    #[test]
    fn valid_context_renders_exact_traceparent() {
        let ctx = TraceContext {
            trace_id: (1u8..=16).collect(),
            span_id: (1u8..=8).collect(),
            trace_flags: 1,
        };
        assert_eq!(
            ctx.to_traceparent().as_deref(),
            Some("00-0102030405060708090a0b0c0d0e0f10-0102030405060708-01")
        );
    }

    #[cfg(feature = "otel")]
    #[test]
    fn current_trace_context_extracts_valid_context_under_otel_layer() {
        use opentelemetry::trace::TracerProvider as _;
        use tracing_subscriber::prelude::*;

        let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder().build();
        let tracer = provider.tracer("uni-plugin-test");
        let subscriber =
            tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));

        // The subscriber is scoped to this closure so other tests are unaffected.
        tracing::subscriber::with_default(subscriber, || {
            let span = tracing::info_span!("otel-test-span");
            let _enter = span.enter();
            let c = current_trace_context();
            assert_eq!(c.trace_id.len(), 16, "trace id should be 16 bytes");
            assert_eq!(c.span_id.len(), 8, "span id should be 8 bytes");
            let tp = c
                .to_traceparent()
                .expect("a valid context renders a traceparent");
            assert!(tp.starts_with("00-"), "traceparent: {tp}");
            assert_eq!(tp.len(), 55, "traceparent: {tp}");
        });
    }
}