Skip to main content

couchbase_core/
tracingcomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18use crate::clusterlabels::ClusterLabels;
19use crate::memdx::durability_level::DurabilityLevel;
20use crate::util::get_host_port_from_uri;
21use arc_swap::ArcSwap;
22use std::fmt::Display;
23use std::future::Future;
24use std::net::SocketAddr;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{instrument, span, trace_span, Instrument, Level, Span};
28use url::Url;
29
30pub const SPAN_NAME_DISPATCH_TO_SERVER: &str = "dispatch_to_server";
31pub const SPAN_NAME_REQUEST_ENCODING: &str = "request_encoding";
32
33pub const SPAN_ATTRIB_DB_SYSTEM_KEY: &str = "db.system.name";
34pub const SPAN_ATTRIB_DB_SYSTEM_VALUE: &str = "couchbase";
35pub const SPAN_ATTRIB_OPERATION_ID_KEY: &str = "couchbase.operation_id";
36pub const SPAN_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
37pub const SPAN_ATTRIB_RETRIES: &str = "couchbase.retries";
38pub const SPAN_ATTRIB_LOCAL_ID_KEY: &str = "couchbase.local_id";
39pub const SPAN_ATTRIB_NET_TRANSPORT_KEY: &str = "network.transport";
40pub const SPAN_ATTRIB_NET_TRANSPORT_VALUE: &str = "tcp";
41pub const SPAN_ATTRIB_NET_REMOTE_ADDRESS_KEY: &str = "server.address";
42pub const SPAN_ATTRIB_NET_REMOTE_PORT_KEY: &str = "server.port";
43pub const SPAN_ATTRIB_NET_PEER_ADDRESS_KEY: &str = "network.peer.address";
44pub const SPAN_ATTRIB_NET_PEER_PORT_KEY: &str = "network.peer.port";
45pub const SPAN_ATTRIB_SERVER_DURATION_KEY: &str = "couchbase.server_duration";
46pub const SPAN_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
47pub const SPAN_ATTRIB_DB_NAME_KEY: &str = "db.namespace";
48pub const SPAN_ATTRIB_DB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
49pub const SPAN_ATTRIB_DB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
50pub const SPAN_ATTRIB_DB_DURABILITY: &str = "couchbase.durability";
51pub const SPAN_ATTRIB_NUM_RETRIES: &str = "couchbase.retries";
52pub const SPAN_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
53pub const SPAN_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";
54
55pub const METER_NAME_CB_OPERATION_DURATION: &str = "db.client.operation.duration";
56pub const METER_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
57pub const METER_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
58pub const METER_ATTRIB_BUCKET_NAME_KEY: &str = "db.namespace";
59pub const METER_ATTRIB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
60pub const METER_ATTRIB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
61pub const METER_ATTRIB_ERROR_KEY: &str = "error.type";
62pub const METER_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
63pub const METER_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";
64
65pub const SERVICE_VALUE_KV: &str = "kv";
66pub const SERVICE_VALUE_QUERY: &str = "query";
67pub const SERVICE_VALUE_ANALYTICS: &str = "analytics";
68pub const SERVICE_VALUE_SEARCH: &str = "search";
69pub const SERVICE_VALUE_MANAGEMENT: &str = "management";
70pub const SERVICE_VALUE_EVENTING: &str = "eventing";
71
72pub const SPAN_ATTRIB_OTEL_KIND_KEY: &str = "otel.kind";
73pub const SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE: &str = "client";
74
75macro_rules! record_operation_metric_event {
76    (
77        $duration:expr,
78        $operation:expr,
79        $cluster_name:expr,
80        $cluster_uuid:expr,
81        $error:expr
82        $(, $($field:ident).+ = $value:expr )*
83    ) => {{
84        let cluster_name = $cluster_name;
85        let cluster_uuid = $cluster_uuid;
86        let error = $error;
87        if let Some(error) = error {
88            match (cluster_name, cluster_uuid) {
89                (Some(name), Some(uuid)) => tracing::event!(
90                    target: "couchbase::metrics",
91                    Level::TRACE,
92                    histogram.db.client.operation.duration = $duration,
93                    otel.unit = "s",
94                    db.operation.name = $operation,
95                    $( $($field).+ = $value, )*
96                    couchbase.cluster.name = name,
97                    couchbase.cluster.uuid = uuid,
98                    error.type = error,
99                ),
100                (Some(name), None) => tracing::event!(
101                    target: "couchbase::metrics",
102                    Level::TRACE,
103                    histogram.db.client.operation.duration = $duration,
104                    otel.unit = "s",
105                    db.operation.name = $operation,
106                    $( $($field).+ = $value, )*
107                    couchbase.cluster.name = name,
108                    error.type = error,
109                ),
110                (None, Some(uuid)) => tracing::event!(
111                    target: "couchbase::metrics",
112                    Level::TRACE,
113                    histogram.db.client.operation.duration = $duration,
114                    otel.unit = "s",
115                    db.operation.name = $operation,
116                    $( $($field).+ = $value, )*
117                    couchbase.cluster.uuid = uuid,
118                    error.type = error,
119                ),
120                (None, None) => tracing::event!(
121                    target: "couchbase::metrics",
122                    Level::TRACE,
123                    histogram.db.client.operation.duration = $duration,
124                    otel.unit = "s",
125                    db.operation.name = $operation,
126                    $( $($field).+ = $value, )*
127                    error.type = error,
128                ),
129            }
130        } else {
131            match (cluster_name, cluster_uuid) {
132                (Some(name), Some(uuid)) => tracing::event!(
133                    target: "couchbase::metrics",
134                    Level::TRACE,
135                    histogram.db.client.operation.duration = $duration,
136                    otel.unit = "s",
137                    db.operation.name = $operation,
138                    $( $($field).+ = $value, )*
139                    couchbase.cluster.name = name,
140                    couchbase.cluster.uuid = uuid,
141                ),
142                (Some(name), None) => tracing::event!(
143                    target: "couchbase::metrics",
144                    Level::TRACE,
145                    histogram.db.client.operation.duration = $duration,
146                    otel.unit = "s",
147                    db.operation.name = $operation,
148                    $( $($field).+ = $value, )*
149                    couchbase.cluster.name = name,
150                ),
151                (None, Some(uuid)) => tracing::event!(
152                    target: "couchbase::metrics",
153                    Level::TRACE,
154                    histogram.db.client.operation.duration = $duration,
155                    otel.unit = "s",
156                    db.operation.name = $operation,
157                    $( $($field).+ = $value, )*
158                    couchbase.cluster.uuid = uuid,
159                ),
160                (None, None) => tracing::event!(
161                    target: "couchbase::metrics",
162                    Level::TRACE,
163                    histogram.db.client.operation.duration = $duration,
164                    otel.unit = "s",
165                    db.operation.name = $operation,
166                    $( $($field).+ = $value, )*
167                ),
168            }
169        }
170    }};
171}
172
173#[macro_export]
174macro_rules! create_span {
175    ($name:literal) => {
176        $crate::tracingcomponent::SpanBuilder::new(
177            $name,
178            tracing::trace_span!(
179                target: "couchbase::tracing",
180                $name,
181                otel.kind = $crate::tracingcomponent::SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
182                otel.status_code = tracing::field::Empty,
183                db.operation.name = $name,
184                db.system.name = $crate::tracingcomponent::SPAN_ATTRIB_DB_SYSTEM_VALUE,
185                couchbase.retries = 0,
186                couchbase.cluster.name = tracing::field::Empty,
187                couchbase.cluster.uuid = tracing::field::Empty,
188                couchbase.service = tracing::field::Empty,
189                db.namespace = tracing::field::Empty,
190                couchbase.scope.name = tracing::field::Empty,
191                couchbase.collection.name = tracing::field::Empty,
192                couchbase.durability = tracing::field::Empty,
193            ),
194        )
195    };
196}
197
198pub(crate) fn build_keyspace<'a>(
199    bucket: Option<&'a str>,
200    scope: Option<&'a str>,
201    collection_name: Option<&'a str>,
202) -> Keyspace<'a> {
203    match (bucket, scope, collection_name) {
204        (Some(bucket_str), Some(scope_name), Some(collection)) => Keyspace::Collection {
205            bucket: bucket_str,
206            scope: scope_name,
207            collection,
208        },
209        (Some(bucket_str), Some(scope_name), None) => Keyspace::Scope {
210            bucket: bucket_str,
211            scope: scope_name,
212        },
213        (Some(bucket_str), None, None) => Keyspace::Bucket { bucket: bucket_str },
214        _ => Keyspace::Cluster,
215    }
216}
217
218#[derive(Debug)]
219pub(crate) struct TracingComponent {
220    cluster_labels: ArcSwap<Option<ClusterLabels>>,
221}
222
223impl Default for TracingComponent {
224    fn default() -> Self {
225        Self {
226            cluster_labels: ArcSwap::new(Arc::new(None)),
227        }
228    }
229}
230
231impl TracingComponent {
232    pub(crate) fn new(config: TracingComponentConfig) -> Self {
233        Self {
234            cluster_labels: ArcSwap::new(Arc::new(config.cluster_labels)),
235        }
236    }
237
238    pub(crate) fn reconfigure(&self, state: TracingComponentConfig) {
239        self.cluster_labels.store(Arc::new(state.cluster_labels));
240    }
241
242    pub(crate) fn get_cluster_labels(&self) -> Option<ClusterLabels> {
243        (**self.cluster_labels.load()).clone()
244    }
245
246    pub(crate) fn create_dispatch_span(&self, fields: &BeginDispatchFields<'_>) -> Span {
247        let span = trace_span!(
248            target: "couchbase::tracing",
249            SPAN_NAME_DISPATCH_TO_SERVER,
250            otel.kind = SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
251            db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
252            network.transport = SPAN_ATTRIB_NET_TRANSPORT_VALUE,
253            couchbase.cluster.uuid = tracing::field::Empty,
254            couchbase.cluster.name = tracing::field::Empty,
255            couchbase.server_duration = tracing::field::Empty,
256            couchbase.local_id = fields.client_id,
257            network.peer.address = fields.peer_addr.0,
258            network.peer.port = fields.peer_addr.1,
259            server.address = fields.canonical_addr.0,
260            server.port = fields.canonical_addr.1,
261            couchbase.operation_id = tracing::field::Empty,
262        );
263
264        self.record_cluster_labels(&span);
265        span
266    }
267
268    pub(crate) fn record_cluster_labels(&self, span: &Span) {
269        let cluster_labels = self.cluster_labels.load();
270        if let Some(ref cluster_labels) = **cluster_labels {
271            if let Some(ref cluster_uuid) = cluster_labels.cluster_uuid {
272                span.record(SPAN_ATTRIB_CLUSTER_UUID_KEY, cluster_uuid.as_str());
273            }
274            if let Some(ref cluster_name) = cluster_labels.cluster_name {
275                span.record(SPAN_ATTRIB_CLUSTER_NAME_KEY, cluster_name.as_str());
276            }
277        }
278    }
279
280    pub(crate) async fn orchestrate_dispatch_span<Fut, T, F>(
281        &self,
282        begin_fields: BeginDispatchFields<'_>,
283        operation: Fut,
284        end_fields_provider: F,
285    ) -> T
286    where
287        Fut: Future<Output = T> + Send,
288        F: FnOnce(&T) -> EndDispatchFields + Send,
289    {
290        // Fast path: skip tracing overhead when no subscriber is listening
291        if !tracing::span_enabled!(tracing::Level::TRACE) {
292            return operation.await;
293        }
294
295        let span = self.create_dispatch_span(&begin_fields);
296        let result = operation.instrument(span.clone()).await;
297        let end_fields = end_fields_provider(&result);
298        end_dispatch_span(span, end_fields);
299        result
300    }
301}
302
303#[derive(Debug, Clone, Default)]
304pub struct TracingComponentConfig {
305    pub cluster_labels: Option<ClusterLabels>,
306}
307
308pub enum OperationId {
309    String(String),
310    Number(u64),
311}
312
313impl OperationId {
314    pub fn from_u32(n: u32) -> Self {
315        Self::Number(n as u64)
316    }
317
318    pub fn from_string(s: String) -> Self {
319        Self::String(s)
320    }
321}
322
323pub struct EndDispatchFields {
324    pub server_duration: Option<Duration>,
325    pub operation_id: Option<OperationId>,
326}
327
328impl EndDispatchFields {
329    pub fn new(server_duration: Option<Duration>, operation_id: Option<OperationId>) -> Self {
330        Self {
331            server_duration,
332            operation_id,
333        }
334    }
335
336    pub fn server_duration(mut self, server_duration: Option<Duration>) -> Self {
337        self.server_duration = server_duration;
338        self
339    }
340
341    pub fn operation_id(mut self, operation_id: Option<OperationId>) -> Self {
342        self.operation_id = operation_id;
343        self
344    }
345}
346
347pub fn end_dispatch_span(span: Span, fields: EndDispatchFields) {
348    if let Some(server_duration) = fields.server_duration {
349        span.record(SPAN_ATTRIB_SERVER_DURATION_KEY, server_duration.as_micros());
350    }
351
352    if let Some(operation_id) = fields.operation_id {
353        match operation_id {
354            OperationId::String(s) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, s),
355            OperationId::Number(n) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, n),
356        };
357    }
358
359    drop(span);
360}
361
362#[derive(Debug)]
363pub(crate) struct BeginDispatchFields<'a> {
364    // (network.peer.address, network.peer.port)
365    pub peer_addr: (&'a str, &'a str),
366    pub client_id: Option<&'a str>,
367    // (server.address / server.port)
368    pub canonical_addr: (&'a str, &'a str),
369}
370
371impl<'a> BeginDispatchFields<'a> {
372    pub fn new(
373        peer_addr: (&'a str, &'a str),
374        canonical_addr: (&'a str, &'a str),
375        client_id: Option<&'a str>,
376    ) -> Self {
377        Self {
378            peer_addr,
379            client_id,
380            canonical_addr,
381        }
382    }
383}
384
385pub struct SpanBuilder {
386    name: &'static str,
387    span: tracing::Span,
388}
389
390#[derive(Clone, Debug)]
391pub enum Keyspace<'a> {
392    Cluster,
393    Bucket {
394        bucket: &'a str,
395    },
396    Scope {
397        bucket: &'a str,
398        scope: &'a str,
399    },
400    Collection {
401        bucket: &'a str,
402        scope: &'a str,
403        collection: &'a str,
404    },
405}
406
407impl Display for Keyspace<'_> {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        match self {
410            Keyspace::Cluster => write!(f, "cluster"),
411            Keyspace::Bucket { bucket } => write!(f, "bucket:{}", bucket),
412            Keyspace::Scope { bucket, scope } => write!(f, "bucket:{}/scope:{}", bucket, scope),
413            Keyspace::Collection {
414                bucket,
415                scope,
416                collection,
417            } => write!(
418                f,
419                "bucket:{}/scope:{}/collection:{}",
420                bucket, scope, collection
421            ),
422        }
423    }
424}
425
426impl SpanBuilder {
427    pub fn new(name: &'static str, span: tracing::Span) -> Self {
428        Self { span, name }
429    }
430
431    pub fn span(&self) -> &tracing::Span {
432        &self.span
433    }
434
435    pub fn with_cluster_labels(self, cluster_labels: &Option<ClusterLabels>) -> Self {
436        if let Some(labels) = cluster_labels {
437            if let Some(uuid) = &labels.cluster_uuid {
438                self.span
439                    .record(SPAN_ATTRIB_CLUSTER_UUID_KEY, uuid.as_str());
440            }
441            if let Some(name) = &labels.cluster_name {
442                self.span
443                    .record(SPAN_ATTRIB_CLUSTER_NAME_KEY, name.as_str());
444            }
445        }
446        self
447    }
448
449    pub fn with_durability<D>(self, durability: Option<&D>) -> Self
450    where
451        for<'a> &'a D: Into<u8>,
452    {
453        let durability_str = if let Some(durability) = durability {
454            match durability.into() {
455                1 => "majority",
456                2 => "majority_and_persist_active",
457                3 => "persist_majority",
458                _ => return self,
459            }
460        } else {
461            return self;
462        };
463
464        self.span.record(SPAN_ATTRIB_DB_DURABILITY, durability_str);
465        self
466    }
467
468    pub fn with_keyspace(self, keyspace: &Keyspace<'_>) -> Self {
469        match keyspace {
470            Keyspace::Cluster => {}
471            Keyspace::Bucket { bucket } => {
472                self.span.record("db.namespace", *bucket);
473            }
474            Keyspace::Scope { bucket, scope } => {
475                self.span.record("db.namespace", *bucket);
476                self.span.record("couchbase.scope.name", *scope);
477            }
478            Keyspace::Collection {
479                bucket,
480                scope,
481                collection,
482            } => {
483                self.span.record("db.namespace", *bucket);
484                self.span.record("couchbase.scope.name", *scope);
485                self.span.record("couchbase.collection.name", *collection);
486            }
487        }
488        self
489    }
490
491    pub fn with_service(self, service: Option<&'static str>) -> Self {
492        if let Some(service) = service {
493            self.span.record(SPAN_ATTRIB_SERVICE_KEY, service);
494        }
495        self
496    }
497
498    pub fn with_statement(self, statement: &str) -> Self {
499        self.span.record("db.query.text", statement);
500        self
501    }
502
503    pub fn build(self) -> tracing::Span {
504        self.span
505    }
506
507    pub fn name(&self) -> &'static str {
508        self.name
509    }
510}
511
512pub trait MetricsName {
513    fn metrics_name(&self) -> &'static str;
514}
515
516pub fn record_metrics<E>(
517    operation_name: &str,
518    service: Option<&str>,
519    keyspace: &Keyspace<'_>,
520    cluster_labels: &Option<ClusterLabels>,
521    start: Instant,
522    error: Option<&E>,
523) where
524    E: MetricsName,
525{
526    let duration = start.elapsed().as_secs_f64();
527
528    let cluster_name = cluster_labels
529        .as_ref()
530        .and_then(|labels| labels.cluster_name.as_deref());
531    let cluster_uuid = cluster_labels
532        .as_ref()
533        .and_then(|labels| labels.cluster_uuid.as_deref());
534    let error_name = error.map(|err| err.metrics_name());
535
536    match keyspace {
537        Keyspace::Cluster => {
538            if let Some(service) = service {
539                record_operation_metric_event!(
540                    duration,
541                    operation_name,
542                    cluster_name,
543                    cluster_uuid,
544                    error_name,
545                    couchbase.service = service
546                );
547            } else {
548                record_operation_metric_event!(
549                    duration,
550                    operation_name,
551                    cluster_name,
552                    cluster_uuid,
553                    error_name
554                );
555            }
556        }
557        Keyspace::Bucket { bucket } => {
558            if let Some(service) = service {
559                record_operation_metric_event!(
560                    duration,
561                    operation_name,
562                    cluster_name,
563                    cluster_uuid,
564                    error_name,
565                    couchbase.service = service,
566                    db.namespace = bucket
567                );
568            } else {
569                record_operation_metric_event!(
570                    duration,
571                    operation_name,
572                    cluster_name,
573                    cluster_uuid,
574                    error_name,
575                    db.namespace = bucket
576                );
577            }
578        }
579        Keyspace::Scope { bucket, scope } => {
580            if let Some(service) = service {
581                record_operation_metric_event!(
582                    duration,
583                    operation_name,
584                    cluster_name,
585                    cluster_uuid,
586                    error_name,
587                    couchbase.service = service,
588                    db.namespace = bucket,
589                    couchbase.scope.name = scope
590                );
591            } else {
592                record_operation_metric_event!(
593                    duration,
594                    operation_name,
595                    cluster_name,
596                    cluster_uuid,
597                    error_name,
598                    db.namespace = bucket,
599                    couchbase.scope.name = scope
600                );
601            }
602        }
603        Keyspace::Collection {
604            bucket,
605            scope,
606            collection,
607        } => {
608            if let Some(service) = service {
609                record_operation_metric_event!(
610                    duration,
611                    operation_name,
612                    cluster_name,
613                    cluster_uuid,
614                    error_name,
615                    couchbase.service = service,
616                    db.namespace = bucket,
617                    couchbase.scope.name = scope,
618                    couchbase.collection.name = collection
619                );
620            } else {
621                record_operation_metric_event!(
622                    duration,
623                    operation_name,
624                    cluster_name,
625                    cluster_uuid,
626                    error_name,
627                    db.namespace = bucket,
628                    couchbase.scope.name = scope,
629                    couchbase.collection.name = collection
630                );
631            }
632        }
633    }
634}