Skip to main content

couchbase_core/
tracingcomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18use crate::clusterlabels::ClusterLabels;
19use crate::memdx::durability_level::DurabilityLevel;
20use crate::util::get_host_port_from_uri;
21use arc_swap::ArcSwap;
22use std::fmt::Display;
23use std::future::Future;
24use std::net::SocketAddr;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{instrument, span, trace_span, Instrument, Level, Span};
28use url::Url;
29
// ---- Span names -----------------------------------------------------------

pub const SPAN_NAME_DISPATCH_TO_SERVER: &str = "dispatch_to_server";
pub const SPAN_NAME_REQUEST_ENCODING: &str = "request_encoding";

// ---- Span attribute keys and fixed values ---------------------------------

pub const SPAN_ATTRIB_DB_SYSTEM_KEY: &str = "db.system.name";
pub const SPAN_ATTRIB_DB_SYSTEM_VALUE: &str = "couchbase";
pub const SPAN_ATTRIB_OPERATION_ID_KEY: &str = "couchbase.operation_id";
pub const SPAN_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
pub const SPAN_ATTRIB_RETRIES: &str = "couchbase.retries";
pub const SPAN_ATTRIB_LOCAL_ID_KEY: &str = "couchbase.local_id";
pub const SPAN_ATTRIB_NET_TRANSPORT_KEY: &str = "network.transport";
pub const SPAN_ATTRIB_NET_TRANSPORT_VALUE: &str = "tcp";
pub const SPAN_ATTRIB_NET_REMOTE_ADDRESS_KEY: &str = "server.address";
pub const SPAN_ATTRIB_NET_REMOTE_PORT_KEY: &str = "server.port";
pub const SPAN_ATTRIB_NET_PEER_ADDRESS_KEY: &str = "network.peer.address";
pub const SPAN_ATTRIB_NET_PEER_PORT_KEY: &str = "network.peer.port";
pub const SPAN_ATTRIB_SERVER_DURATION_KEY: &str = "couchbase.server_duration";
pub const SPAN_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
pub const SPAN_ATTRIB_DB_NAME_KEY: &str = "db.namespace";
pub const SPAN_ATTRIB_DB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
pub const SPAN_ATTRIB_DB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
pub const SPAN_ATTRIB_DB_DURABILITY: &str = "couchbase.durability";
// Second name for the retries key; kept as an alias so both stay in sync.
pub const SPAN_ATTRIB_NUM_RETRIES: &str = SPAN_ATTRIB_RETRIES;
pub const SPAN_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
pub const SPAN_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";

// ---- Meter name and attribute keys ----------------------------------------
// Keys that carry the same string as a span attribute are defined as aliases
// of the span constant so the two cannot drift apart.

pub const METER_NAME_CB_OPERATION_DURATION: &str = "db.client.operation.duration";
pub const METER_ATTRIB_SERVICE_KEY: &str = SPAN_ATTRIB_SERVICE_KEY;
pub const METER_ATTRIB_OPERATION_KEY: &str = SPAN_ATTRIB_OPERATION_KEY;
pub const METER_ATTRIB_BUCKET_NAME_KEY: &str = SPAN_ATTRIB_DB_NAME_KEY;
pub const METER_ATTRIB_SCOPE_NAME_KEY: &str = SPAN_ATTRIB_DB_SCOPE_NAME_KEY;
pub const METER_ATTRIB_COLLECTION_NAME_KEY: &str = SPAN_ATTRIB_DB_COLLECTION_NAME_KEY;
pub const METER_ATTRIB_ERROR_KEY: &str = "error.type";
pub const METER_ATTRIB_CLUSTER_UUID_KEY: &str = SPAN_ATTRIB_CLUSTER_UUID_KEY;
pub const METER_ATTRIB_CLUSTER_NAME_KEY: &str = SPAN_ATTRIB_CLUSTER_NAME_KEY;

// ---- Values for the `couchbase.service` attribute -------------------------

pub const SERVICE_VALUE_KV: &str = "kv";
pub const SERVICE_VALUE_QUERY: &str = "query";
pub const SERVICE_VALUE_ANALYTICS: &str = "analytics";
pub const SERVICE_VALUE_SEARCH: &str = "search";
pub const SERVICE_VALUE_MANAGEMENT: &str = "management";
pub const SERVICE_VALUE_EVENTING: &str = "eventing";

// ---- Span kind -------------------------------------------------------------

pub const SPAN_ATTRIB_OTEL_KIND_KEY: &str = "otel.kind";
pub const SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE: &str = "client";
74
// Emits one TRACE-level `tracing` event on the "couchbase::metrics" target that
// carries an operation duration under the `histogram.db.client.operation.duration`
// field, together with the operation name and the fixed `db.system.name` value.
//
// Arguments, in order:
//   $duration      - value recorded as the histogram sample (`otel.unit` is "s").
//   $operation     - value for `db.operation.name`.
//   $cluster_name  - `Option`; recorded as `couchbase.cluster.name` when `Some`.
//   $cluster_uuid  - `Option`; recorded as `couchbase.cluster.uuid` when `Some`.
//   $error         - `Option`; recorded as `error.type` when `Some`.
//   trailing `dotted.field = value` pairs - forwarded unchanged into every arm.
//
// Every combination of present/absent error, cluster name and cluster UUID gets
// its own `tracing::event!` invocation whose field list contains only the
// attributes that are present, which is why the body is eight near-identical arms.
// NOTE(review): the `histogram.` field prefix is presumably what a metrics layer
// keys on to turn the event into a histogram sample - confirm against the
// subscriber configuration.
macro_rules! record_operation_metric_event {
    (
        $duration:expr,
        $operation:expr,
        $cluster_name:expr,
        $cluster_uuid:expr,
        $error:expr
        $(, $($field:ident).+ = $value:expr )*
    ) => {{
        // Bind the optional inputs once so each expression is evaluated a single time.
        let cluster_name = $cluster_name;
        let cluster_uuid = $cluster_uuid;
        let error = $error;
        if let Some(error) = error {
            // Failed operation: every arm below also records `error.type`.
            match (cluster_name, cluster_uuid) {
                (Some(name), Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                    couchbase.cluster.uuid = uuid,
                    error.type = error,
                ),
                (Some(name), None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                    error.type = error,
                ),
                (None, Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.uuid = uuid,
                    error.type = error,
                ),
                (None, None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    error.type = error,
                ),
            }
        } else {
            // Successful operation: same four cluster-label combinations, no `error.type`.
            match (cluster_name, cluster_uuid) {
                (Some(name), Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                    couchbase.cluster.uuid = uuid,
                ),
                (Some(name), None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                ),
                (None, Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.uuid = uuid,
                ),
                (None, None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                ),
            }
        }
    }};
}
180
/// Creates a [`SpanBuilder`](crate::tracingcomponent::SpanBuilder) wrapping a
/// TRACE-level span on the `couchbase::tracing` target.
///
/// `$name` must be a string literal; it is used both as the span name and as
/// the value of `db.operation.name`. The span starts with `otel.kind`,
/// `db.system.name` and `couchbase.retries = 0` filled in. Every attribute that
/// `SpanBuilder` may record later is declared here as `Empty`, because
/// `tracing::Span::record` ignores fields that were not declared when the span
/// was created.
#[macro_export]
macro_rules! create_span {
    ($name:literal) => {
        $crate::tracingcomponent::SpanBuilder::new(
            $name,
            tracing::trace_span!(
                target: "couchbase::tracing",
                $name,
                otel.kind = $crate::tracingcomponent::SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
                otel.status_code = tracing::field::Empty,
                db.operation.name = $name,
                db.system.name = $crate::tracingcomponent::SPAN_ATTRIB_DB_SYSTEM_VALUE,
                couchbase.retries = 0,
                couchbase.cluster.name = tracing::field::Empty,
                couchbase.cluster.uuid = tracing::field::Empty,
                couchbase.service = tracing::field::Empty,
                db.namespace = tracing::field::Empty,
                couchbase.scope.name = tracing::field::Empty,
                couchbase.collection.name = tracing::field::Empty,
                couchbase.durability = tracing::field::Empty,
                // Declared so that `SpanBuilder::with_statement` has a field to
                // record into; without it the statement would be silently dropped.
                db.query.text = tracing::field::Empty,
            ),
        )
    };
}
205
206pub(crate) fn build_keyspace<'a>(
207    bucket: Option<&'a str>,
208    scope: Option<&'a str>,
209    collection_name: Option<&'a str>,
210) -> Keyspace<'a> {
211    match (bucket, scope, collection_name) {
212        (Some(bucket_str), Some(scope_name), Some(collection)) => Keyspace::Collection {
213            bucket: bucket_str,
214            scope: scope_name,
215            collection,
216        },
217        (Some(bucket_str), Some(scope_name), None) => Keyspace::Scope {
218            bucket: bucket_str,
219            scope: scope_name,
220        },
221        (Some(bucket_str), None, None) => Keyspace::Bucket { bucket: bucket_str },
222        _ => Keyspace::Cluster,
223    }
224}
225
226#[derive(Debug)]
227pub(crate) struct TracingComponent {
228    cluster_labels: ArcSwap<Option<ClusterLabels>>,
229}
230
231impl Default for TracingComponent {
232    fn default() -> Self {
233        Self {
234            cluster_labels: ArcSwap::new(Arc::new(None)),
235        }
236    }
237}
238
239impl TracingComponent {
240    pub(crate) fn new(config: TracingComponentConfig) -> Self {
241        Self {
242            cluster_labels: ArcSwap::new(Arc::new(config.cluster_labels)),
243        }
244    }
245
246    pub(crate) fn reconfigure(&self, state: TracingComponentConfig) {
247        self.cluster_labels.store(Arc::new(state.cluster_labels));
248    }
249
250    pub(crate) fn get_cluster_labels(&self) -> Option<ClusterLabels> {
251        (**self.cluster_labels.load()).clone()
252    }
253
254    pub(crate) fn create_dispatch_span(&self, fields: &BeginDispatchFields<'_>) -> Span {
255        let span = trace_span!(
256            target: "couchbase::tracing",
257            SPAN_NAME_DISPATCH_TO_SERVER,
258            otel.kind = SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
259            db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
260            network.transport = SPAN_ATTRIB_NET_TRANSPORT_VALUE,
261            couchbase.cluster.uuid = tracing::field::Empty,
262            couchbase.cluster.name = tracing::field::Empty,
263            couchbase.server_duration = tracing::field::Empty,
264            couchbase.local_id = fields.client_id,
265            network.peer.address = fields.peer_addr.0,
266            network.peer.port = fields.peer_addr.1,
267            server.address = fields.canonical_addr.0,
268            server.port = fields.canonical_addr.1,
269            couchbase.operation_id = tracing::field::Empty,
270        );
271
272        self.record_cluster_labels(&span);
273        span
274    }
275
276    pub(crate) fn record_cluster_labels(&self, span: &Span) {
277        let cluster_labels = self.cluster_labels.load();
278        if let Some(ref cluster_labels) = **cluster_labels {
279            if let Some(ref cluster_uuid) = cluster_labels.cluster_uuid {
280                span.record(SPAN_ATTRIB_CLUSTER_UUID_KEY, cluster_uuid.as_str());
281            }
282            if let Some(ref cluster_name) = cluster_labels.cluster_name {
283                span.record(SPAN_ATTRIB_CLUSTER_NAME_KEY, cluster_name.as_str());
284            }
285        }
286    }
287
288    pub(crate) async fn orchestrate_dispatch_span<Fut, T, F>(
289        &self,
290        begin_fields: BeginDispatchFields<'_>,
291        operation: Fut,
292        end_fields_provider: F,
293    ) -> T
294    where
295        Fut: Future<Output = T> + Send,
296        F: FnOnce(&T) -> EndDispatchFields + Send,
297    {
298        // Fast path: skip tracing overhead when no subscriber is listening
299        if !tracing::span_enabled!(tracing::Level::TRACE) {
300            return operation.await;
301        }
302
303        let span = self.create_dispatch_span(&begin_fields);
304        let result = operation.instrument(span.clone()).await;
305        let end_fields = end_fields_provider(&result);
306        end_dispatch_span(span, end_fields);
307        result
308    }
309}
310
/// Configuration accepted by `TracingComponent::new` and
/// `TracingComponent::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct TracingComponentConfig {
    /// Cluster labels to store on the component; `None` stores no labels.
    pub cluster_labels: Option<ClusterLabels>,
}
315
/// Identifier of an operation, recorded on a dispatch span as
/// `couchbase.operation_id`. It is either textual or numeric.
pub enum OperationId {
    /// Identifier carried as text.
    String(String),
    /// Identifier carried as an unsigned integer.
    Number(u64),
}

impl OperationId {
    /// Wraps a 32-bit identifier, widening it losslessly to `u64`.
    pub fn from_u32(n: u32) -> Self {
        OperationId::Number(u64::from(n))
    }

    /// Wraps a textual identifier, taking ownership of `s`.
    pub fn from_string(s: String) -> Self {
        OperationId::String(s)
    }
}
330
331pub struct EndDispatchFields {
332    pub server_duration: Option<Duration>,
333    pub operation_id: Option<OperationId>,
334}
335
336impl EndDispatchFields {
337    pub fn new(server_duration: Option<Duration>, operation_id: Option<OperationId>) -> Self {
338        Self {
339            server_duration,
340            operation_id,
341        }
342    }
343
344    pub fn server_duration(mut self, server_duration: Option<Duration>) -> Self {
345        self.server_duration = server_duration;
346        self
347    }
348
349    pub fn operation_id(mut self, operation_id: Option<OperationId>) -> Self {
350        self.operation_id = operation_id;
351        self
352    }
353}
354
355pub fn end_dispatch_span(span: Span, fields: EndDispatchFields) {
356    if let Some(server_duration) = fields.server_duration {
357        span.record(SPAN_ATTRIB_SERVER_DURATION_KEY, server_duration.as_micros());
358    }
359
360    if let Some(operation_id) = fields.operation_id {
361        match operation_id {
362            OperationId::String(s) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, s),
363            OperationId::Number(n) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, n),
364        };
365    }
366
367    drop(span);
368}
369
/// Attributes that are known before a request is dispatched.
#[derive(Debug)]
pub(crate) struct BeginDispatchFields<'a> {
    /// `(network.peer.address, network.peer.port)`
    pub peer_addr: (&'a str, &'a str),
    /// Client identifier, when one is available.
    pub client_id: Option<&'a str>,
    /// `(server.address, server.port)`
    pub canonical_addr: (&'a str, &'a str),
}

impl<'a> BeginDispatchFields<'a> {
    /// Bundles the borrowed address pairs and optional client id.
    ///
    /// Note the argument order: peer address, canonical address, client id.
    pub fn new(
        peer_addr: (&'a str, &'a str),
        canonical_addr: (&'a str, &'a str),
        client_id: Option<&'a str>,
    ) -> Self {
        BeginDispatchFields {
            canonical_addr,
            client_id,
            peer_addr,
        }
    }
}
392
/// Pairs a `tracing::Span` with its static name so that attributes can be
/// recorded through chained `with_*` calls before `build` hands the span out.
pub struct SpanBuilder {
    // Name supplied at construction; returned by `name()`.
    name: &'static str,
    // Span on which the `with_*` methods record attributes.
    span: tracing::Span,
}
397
/// The level of the cluster hierarchy that an operation addresses.
#[derive(Clone, Debug)]
pub enum Keyspace<'a> {
    /// No bucket: the operation targets the cluster as a whole.
    Cluster,
    /// A single bucket.
    Bucket {
        bucket: &'a str,
    },
    /// A scope within a bucket.
    Scope {
        bucket: &'a str,
        scope: &'a str,
    },
    /// A collection within a scope of a bucket.
    Collection {
        bucket: &'a str,
        scope: &'a str,
        collection: &'a str,
    },
}

impl Display for Keyspace<'_> {
    /// Renders the keyspace as a `/`-separated path of `kind:name` segments,
    /// or the literal `cluster` for [`Keyspace::Cluster`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // All payload fields are `&str`, so they can be copied out of `*self`.
        match *self {
            Keyspace::Cluster => f.write_str("cluster"),
            Keyspace::Bucket { bucket: b } => write!(f, "bucket:{}", b),
            Keyspace::Scope {
                bucket: b,
                scope: s,
            } => write!(f, "bucket:{}/scope:{}", b, s),
            Keyspace::Collection {
                bucket: b,
                scope: s,
                collection: c,
            } => write!(f, "bucket:{}/scope:{}/collection:{}", b, s, c),
        }
    }
}
433
434impl SpanBuilder {
435    pub fn new(name: &'static str, span: tracing::Span) -> Self {
436        Self { span, name }
437    }
438
439    pub fn span(&self) -> &tracing::Span {
440        &self.span
441    }
442
443    pub fn with_cluster_labels(self, cluster_labels: &Option<ClusterLabels>) -> Self {
444        if let Some(labels) = cluster_labels {
445            if let Some(uuid) = &labels.cluster_uuid {
446                self.span
447                    .record(SPAN_ATTRIB_CLUSTER_UUID_KEY, uuid.as_str());
448            }
449            if let Some(name) = &labels.cluster_name {
450                self.span
451                    .record(SPAN_ATTRIB_CLUSTER_NAME_KEY, name.as_str());
452            }
453        }
454        self
455    }
456
457    pub fn with_durability<D>(self, durability: Option<&D>) -> Self
458    where
459        for<'a> &'a D: Into<u8>,
460    {
461        let durability_str = if let Some(durability) = durability {
462            match durability.into() {
463                1 => "majority",
464                2 => "majority_and_persist_active",
465                3 => "persist_majority",
466                _ => return self,
467            }
468        } else {
469            return self;
470        };
471
472        self.span.record(SPAN_ATTRIB_DB_DURABILITY, durability_str);
473        self
474    }
475
476    pub fn with_keyspace(self, keyspace: &Keyspace<'_>) -> Self {
477        match keyspace {
478            Keyspace::Cluster => {}
479            Keyspace::Bucket { bucket } => {
480                self.span.record("db.namespace", *bucket);
481            }
482            Keyspace::Scope { bucket, scope } => {
483                self.span.record("db.namespace", *bucket);
484                self.span.record("couchbase.scope.name", *scope);
485            }
486            Keyspace::Collection {
487                bucket,
488                scope,
489                collection,
490            } => {
491                self.span.record("db.namespace", *bucket);
492                self.span.record("couchbase.scope.name", *scope);
493                self.span.record("couchbase.collection.name", *collection);
494            }
495        }
496        self
497    }
498
499    pub fn with_service(self, service: Option<&'static str>) -> Self {
500        if let Some(service) = service {
501            self.span.record(SPAN_ATTRIB_SERVICE_KEY, service);
502        }
503        self
504    }
505
506    pub fn with_statement(self, statement: &str) -> Self {
507        self.span.record("db.query.text", statement);
508        self
509    }
510
511    pub fn build(self) -> tracing::Span {
512        self.span
513    }
514
515    pub fn name(&self) -> &'static str {
516        self.name
517    }
518}
519
/// Supplies the static label that identifies a value in metric events.
///
/// `record_metrics` records the returned string under the `error.type`
/// attribute when an error is passed to it.
pub trait MetricsName {
    /// Returns the static name used for this value in metrics.
    fn metrics_name(&self) -> &'static str;
}
523
524pub fn record_metrics<E>(
525    operation_name: &str,
526    service: Option<&str>,
527    keyspace: &Keyspace<'_>,
528    cluster_labels: &Option<ClusterLabels>,
529    start: Instant,
530    error: Option<&E>,
531) where
532    E: MetricsName,
533{
534    let duration = start.elapsed().as_secs_f64();
535
536    let cluster_name = cluster_labels
537        .as_ref()
538        .and_then(|labels| labels.cluster_name.as_deref());
539    let cluster_uuid = cluster_labels
540        .as_ref()
541        .and_then(|labels| labels.cluster_uuid.as_deref());
542    let error_name = error.map(|err| err.metrics_name());
543
544    match keyspace {
545        Keyspace::Cluster => {
546            if let Some(service) = service {
547                record_operation_metric_event!(
548                    duration,
549                    operation_name,
550                    cluster_name,
551                    cluster_uuid,
552                    error_name,
553                    couchbase.service = service
554                );
555            } else {
556                record_operation_metric_event!(
557                    duration,
558                    operation_name,
559                    cluster_name,
560                    cluster_uuid,
561                    error_name
562                );
563            }
564        }
565        Keyspace::Bucket { bucket } => {
566            if let Some(service) = service {
567                record_operation_metric_event!(
568                    duration,
569                    operation_name,
570                    cluster_name,
571                    cluster_uuid,
572                    error_name,
573                    couchbase.service = service,
574                    db.namespace = bucket
575                );
576            } else {
577                record_operation_metric_event!(
578                    duration,
579                    operation_name,
580                    cluster_name,
581                    cluster_uuid,
582                    error_name,
583                    db.namespace = bucket
584                );
585            }
586        }
587        Keyspace::Scope { bucket, scope } => {
588            if let Some(service) = service {
589                record_operation_metric_event!(
590                    duration,
591                    operation_name,
592                    cluster_name,
593                    cluster_uuid,
594                    error_name,
595                    couchbase.service = service,
596                    db.namespace = bucket,
597                    couchbase.scope.name = scope
598                );
599            } else {
600                record_operation_metric_event!(
601                    duration,
602                    operation_name,
603                    cluster_name,
604                    cluster_uuid,
605                    error_name,
606                    db.namespace = bucket,
607                    couchbase.scope.name = scope
608                );
609            }
610        }
611        Keyspace::Collection {
612            bucket,
613            scope,
614            collection,
615        } => {
616            if let Some(service) = service {
617                record_operation_metric_event!(
618                    duration,
619                    operation_name,
620                    cluster_name,
621                    cluster_uuid,
622                    error_name,
623                    couchbase.service = service,
624                    db.namespace = bucket,
625                    couchbase.scope.name = scope,
626                    couchbase.collection.name = collection
627                );
628            } else {
629                record_operation_metric_event!(
630                    duration,
631                    operation_name,
632                    cluster_name,
633                    cluster_uuid,
634                    error_name,
635                    db.namespace = bucket,
636                    couchbase.scope.name = scope,
637                    couchbase.collection.name = collection
638                );
639            }
640        }
641    }
642}