1use crate::clusterlabels::ClusterLabels;
19use crate::memdx::durability_level::DurabilityLevel;
20use crate::util::get_host_port_from_uri;
21use arc_swap::ArcSwap;
22use std::fmt::Display;
23use std::future::Future;
24use std::net::SocketAddr;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{instrument, span, trace_span, Instrument, Level, Span};
28use url::Url;
29
30pub const SPAN_NAME_DISPATCH_TO_SERVER: &str = "dispatch_to_server";
31pub const SPAN_NAME_REQUEST_ENCODING: &str = "request_encoding";
32
33pub const SPAN_ATTRIB_DB_SYSTEM_KEY: &str = "db.system.name";
34pub const SPAN_ATTRIB_DB_SYSTEM_VALUE: &str = "couchbase";
35pub const SPAN_ATTRIB_OPERATION_ID_KEY: &str = "couchbase.operation_id";
36pub const SPAN_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
37pub const SPAN_ATTRIB_RETRIES: &str = "couchbase.retries";
38pub const SPAN_ATTRIB_LOCAL_ID_KEY: &str = "couchbase.local_id";
39pub const SPAN_ATTRIB_NET_TRANSPORT_KEY: &str = "network.transport";
40pub const SPAN_ATTRIB_NET_TRANSPORT_VALUE: &str = "tcp";
41pub const SPAN_ATTRIB_NET_REMOTE_ADDRESS_KEY: &str = "server.address";
42pub const SPAN_ATTRIB_NET_REMOTE_PORT_KEY: &str = "server.port";
43pub const SPAN_ATTRIB_NET_PEER_ADDRESS_KEY: &str = "network.peer.address";
44pub const SPAN_ATTRIB_NET_PEER_PORT_KEY: &str = "network.peer.port";
45pub const SPAN_ATTRIB_SERVER_DURATION_KEY: &str = "couchbase.server_duration";
46pub const SPAN_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
47pub const SPAN_ATTRIB_DB_NAME_KEY: &str = "db.namespace";
48pub const SPAN_ATTRIB_DB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
49pub const SPAN_ATTRIB_DB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
50pub const SPAN_ATTRIB_DB_DURABILITY: &str = "couchbase.durability";
51pub const SPAN_ATTRIB_NUM_RETRIES: &str = "couchbase.retries";
52pub const SPAN_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
53pub const SPAN_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";
54
55pub const METER_NAME_CB_OPERATION_DURATION: &str = "db.client.operation.duration";
56pub const METER_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
57pub const METER_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
58pub const METER_ATTRIB_BUCKET_NAME_KEY: &str = "db.namespace";
59pub const METER_ATTRIB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
60pub const METER_ATTRIB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
61pub const METER_ATTRIB_ERROR_KEY: &str = "error.type";
62pub const METER_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
63pub const METER_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";
64
65pub const SERVICE_VALUE_KV: &str = "kv";
66pub const SERVICE_VALUE_QUERY: &str = "query";
67pub const SERVICE_VALUE_ANALYTICS: &str = "analytics";
68pub const SERVICE_VALUE_SEARCH: &str = "search";
69pub const SERVICE_VALUE_MANAGEMENT: &str = "management";
70pub const SERVICE_VALUE_EVENTING: &str = "eventing";
71
72pub const SPAN_ATTRIB_OTEL_KIND_KEY: &str = "otel.kind";
73pub const SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE: &str = "client";
74
75macro_rules! record_operation_metric_event {
76 (
77 $duration:expr,
78 $operation:expr,
79 $cluster_name:expr,
80 $cluster_uuid:expr,
81 $error:expr
82 $(, $($field:ident).+ = $value:expr )*
83 ) => {{
84 let cluster_name = $cluster_name;
85 let cluster_uuid = $cluster_uuid;
86 let error = $error;
87 if let Some(error) = error {
88 match (cluster_name, cluster_uuid) {
89 (Some(name), Some(uuid)) => tracing::event!(
90 target: "couchbase::metrics",
91 Level::TRACE,
92 histogram.db.client.operation.duration = $duration,
93 otel.unit = "s",
94 db.operation.name = $operation,
95 $( $($field).+ = $value, )*
96 couchbase.cluster.name = name,
97 couchbase.cluster.uuid = uuid,
98 error.type = error,
99 ),
100 (Some(name), None) => tracing::event!(
101 target: "couchbase::metrics",
102 Level::TRACE,
103 histogram.db.client.operation.duration = $duration,
104 otel.unit = "s",
105 db.operation.name = $operation,
106 $( $($field).+ = $value, )*
107 couchbase.cluster.name = name,
108 error.type = error,
109 ),
110 (None, Some(uuid)) => tracing::event!(
111 target: "couchbase::metrics",
112 Level::TRACE,
113 histogram.db.client.operation.duration = $duration,
114 otel.unit = "s",
115 db.operation.name = $operation,
116 $( $($field).+ = $value, )*
117 couchbase.cluster.uuid = uuid,
118 error.type = error,
119 ),
120 (None, None) => tracing::event!(
121 target: "couchbase::metrics",
122 Level::TRACE,
123 histogram.db.client.operation.duration = $duration,
124 otel.unit = "s",
125 db.operation.name = $operation,
126 $( $($field).+ = $value, )*
127 error.type = error,
128 ),
129 }
130 } else {
131 match (cluster_name, cluster_uuid) {
132 (Some(name), Some(uuid)) => tracing::event!(
133 target: "couchbase::metrics",
134 Level::TRACE,
135 histogram.db.client.operation.duration = $duration,
136 otel.unit = "s",
137 db.operation.name = $operation,
138 $( $($field).+ = $value, )*
139 couchbase.cluster.name = name,
140 couchbase.cluster.uuid = uuid,
141 ),
142 (Some(name), None) => tracing::event!(
143 target: "couchbase::metrics",
144 Level::TRACE,
145 histogram.db.client.operation.duration = $duration,
146 otel.unit = "s",
147 db.operation.name = $operation,
148 $( $($field).+ = $value, )*
149 couchbase.cluster.name = name,
150 ),
151 (None, Some(uuid)) => tracing::event!(
152 target: "couchbase::metrics",
153 Level::TRACE,
154 histogram.db.client.operation.duration = $duration,
155 otel.unit = "s",
156 db.operation.name = $operation,
157 $( $($field).+ = $value, )*
158 couchbase.cluster.uuid = uuid,
159 ),
160 (None, None) => tracing::event!(
161 target: "couchbase::metrics",
162 Level::TRACE,
163 histogram.db.client.operation.duration = $duration,
164 otel.unit = "s",
165 db.operation.name = $operation,
166 $( $($field).+ = $value, )*
167 ),
168 }
169 }
170 }};
171}
172
173#[macro_export]
174macro_rules! create_span {
175 ($name:literal) => {
176 $crate::tracingcomponent::SpanBuilder::new(
177 $name,
178 tracing::trace_span!(
179 target: "couchbase::tracing",
180 $name,
181 otel.kind = $crate::tracingcomponent::SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
182 otel.status_code = tracing::field::Empty,
183 db.operation.name = $name,
184 db.system.name = $crate::tracingcomponent::SPAN_ATTRIB_DB_SYSTEM_VALUE,
185 couchbase.retries = 0,
186 couchbase.cluster.name = tracing::field::Empty,
187 couchbase.cluster.uuid = tracing::field::Empty,
188 couchbase.service = tracing::field::Empty,
189 db.namespace = tracing::field::Empty,
190 couchbase.scope.name = tracing::field::Empty,
191 couchbase.collection.name = tracing::field::Empty,
192 couchbase.durability = tracing::field::Empty,
193 ),
194 )
195 };
196}
197
198pub(crate) fn build_keyspace<'a>(
199 bucket: Option<&'a str>,
200 scope: Option<&'a str>,
201 collection_name: Option<&'a str>,
202) -> Keyspace<'a> {
203 match (bucket, scope, collection_name) {
204 (Some(bucket_str), Some(scope_name), Some(collection)) => Keyspace::Collection {
205 bucket: bucket_str,
206 scope: scope_name,
207 collection,
208 },
209 (Some(bucket_str), Some(scope_name), None) => Keyspace::Scope {
210 bucket: bucket_str,
211 scope: scope_name,
212 },
213 (Some(bucket_str), None, None) => Keyspace::Bucket { bucket: bucket_str },
214 _ => Keyspace::Cluster,
215 }
216}
217
218#[derive(Debug)]
219pub(crate) struct TracingComponent {
220 cluster_labels: ArcSwap<Option<ClusterLabels>>,
221}
222
223impl Default for TracingComponent {
224 fn default() -> Self {
225 Self {
226 cluster_labels: ArcSwap::new(Arc::new(None)),
227 }
228 }
229}
230
231impl TracingComponent {
232 pub(crate) fn new(config: TracingComponentConfig) -> Self {
233 Self {
234 cluster_labels: ArcSwap::new(Arc::new(config.cluster_labels)),
235 }
236 }
237
238 pub(crate) fn reconfigure(&self, state: TracingComponentConfig) {
239 self.cluster_labels.store(Arc::new(state.cluster_labels));
240 }
241
242 pub(crate) fn get_cluster_labels(&self) -> Option<ClusterLabels> {
243 (**self.cluster_labels.load()).clone()
244 }
245
246 pub(crate) fn create_dispatch_span(&self, fields: &BeginDispatchFields<'_>) -> Span {
247 let span = trace_span!(
248 target: "couchbase::tracing",
249 SPAN_NAME_DISPATCH_TO_SERVER,
250 otel.kind = SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
251 db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
252 network.transport = SPAN_ATTRIB_NET_TRANSPORT_VALUE,
253 couchbase.cluster.uuid = tracing::field::Empty,
254 couchbase.cluster.name = tracing::field::Empty,
255 couchbase.server_duration = tracing::field::Empty,
256 couchbase.local_id = fields.client_id,
257 network.peer.address = fields.peer_addr.0,
258 network.peer.port = fields.peer_addr.1,
259 server.address = fields.canonical_addr.0,
260 server.port = fields.canonical_addr.1,
261 couchbase.operation_id = tracing::field::Empty,
262 );
263
264 self.record_cluster_labels(&span);
265 span
266 }
267
268 pub(crate) fn record_cluster_labels(&self, span: &Span) {
269 let cluster_labels = self.cluster_labels.load();
270 if let Some(ref cluster_labels) = **cluster_labels {
271 if let Some(ref cluster_uuid) = cluster_labels.cluster_uuid {
272 span.record(SPAN_ATTRIB_CLUSTER_UUID_KEY, cluster_uuid.as_str());
273 }
274 if let Some(ref cluster_name) = cluster_labels.cluster_name {
275 span.record(SPAN_ATTRIB_CLUSTER_NAME_KEY, cluster_name.as_str());
276 }
277 }
278 }
279
280 pub(crate) async fn orchestrate_dispatch_span<Fut, T, F>(
281 &self,
282 begin_fields: BeginDispatchFields<'_>,
283 operation: Fut,
284 end_fields_provider: F,
285 ) -> T
286 where
287 Fut: Future<Output = T> + Send,
288 F: FnOnce(&T) -> EndDispatchFields + Send,
289 {
290 if !tracing::span_enabled!(tracing::Level::TRACE) {
292 return operation.await;
293 }
294
295 let span = self.create_dispatch_span(&begin_fields);
296 let result = operation.instrument(span.clone()).await;
297 let end_fields = end_fields_provider(&result);
298 end_dispatch_span(span, end_fields);
299 result
300 }
301}
302
303#[derive(Debug, Clone, Default)]
304pub struct TracingComponentConfig {
305 pub cluster_labels: Option<ClusterLabels>,
306}
307
308pub enum OperationId {
309 String(String),
310 Number(u64),
311}
312
313impl OperationId {
314 pub fn from_u32(n: u32) -> Self {
315 Self::Number(n as u64)
316 }
317
318 pub fn from_string(s: String) -> Self {
319 Self::String(s)
320 }
321}
322
323pub struct EndDispatchFields {
324 pub server_duration: Option<Duration>,
325 pub operation_id: Option<OperationId>,
326}
327
328impl EndDispatchFields {
329 pub fn new(server_duration: Option<Duration>, operation_id: Option<OperationId>) -> Self {
330 Self {
331 server_duration,
332 operation_id,
333 }
334 }
335
336 pub fn server_duration(mut self, server_duration: Option<Duration>) -> Self {
337 self.server_duration = server_duration;
338 self
339 }
340
341 pub fn operation_id(mut self, operation_id: Option<OperationId>) -> Self {
342 self.operation_id = operation_id;
343 self
344 }
345}
346
347pub fn end_dispatch_span(span: Span, fields: EndDispatchFields) {
348 if let Some(server_duration) = fields.server_duration {
349 span.record(SPAN_ATTRIB_SERVER_DURATION_KEY, server_duration.as_micros());
350 }
351
352 if let Some(operation_id) = fields.operation_id {
353 match operation_id {
354 OperationId::String(s) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, s),
355 OperationId::Number(n) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, n),
356 };
357 }
358
359 drop(span);
360}
361
362#[derive(Debug)]
363pub(crate) struct BeginDispatchFields<'a> {
364 pub peer_addr: (&'a str, &'a str),
366 pub client_id: Option<&'a str>,
367 pub canonical_addr: (&'a str, &'a str),
369}
370
371impl<'a> BeginDispatchFields<'a> {
372 pub fn new(
373 peer_addr: (&'a str, &'a str),
374 canonical_addr: (&'a str, &'a str),
375 client_id: Option<&'a str>,
376 ) -> Self {
377 Self {
378 peer_addr,
379 client_id,
380 canonical_addr,
381 }
382 }
383}
384
385pub struct SpanBuilder {
386 name: &'static str,
387 span: tracing::Span,
388}
389
390#[derive(Clone, Debug)]
391pub enum Keyspace<'a> {
392 Cluster,
393 Bucket {
394 bucket: &'a str,
395 },
396 Scope {
397 bucket: &'a str,
398 scope: &'a str,
399 },
400 Collection {
401 bucket: &'a str,
402 scope: &'a str,
403 collection: &'a str,
404 },
405}
406
407impl Display for Keyspace<'_> {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 match self {
410 Keyspace::Cluster => write!(f, "cluster"),
411 Keyspace::Bucket { bucket } => write!(f, "bucket:{}", bucket),
412 Keyspace::Scope { bucket, scope } => write!(f, "bucket:{}/scope:{}", bucket, scope),
413 Keyspace::Collection {
414 bucket,
415 scope,
416 collection,
417 } => write!(
418 f,
419 "bucket:{}/scope:{}/collection:{}",
420 bucket, scope, collection
421 ),
422 }
423 }
424}
425
426impl SpanBuilder {
427 pub fn new(name: &'static str, span: tracing::Span) -> Self {
428 Self { span, name }
429 }
430
431 pub fn span(&self) -> &tracing::Span {
432 &self.span
433 }
434
435 pub fn with_cluster_labels(self, cluster_labels: &Option<ClusterLabels>) -> Self {
436 if let Some(labels) = cluster_labels {
437 if let Some(uuid) = &labels.cluster_uuid {
438 self.span
439 .record(SPAN_ATTRIB_CLUSTER_UUID_KEY, uuid.as_str());
440 }
441 if let Some(name) = &labels.cluster_name {
442 self.span
443 .record(SPAN_ATTRIB_CLUSTER_NAME_KEY, name.as_str());
444 }
445 }
446 self
447 }
448
449 pub fn with_durability<D>(self, durability: Option<&D>) -> Self
450 where
451 for<'a> &'a D: Into<u8>,
452 {
453 let durability_str = if let Some(durability) = durability {
454 match durability.into() {
455 1 => "majority",
456 2 => "majority_and_persist_active",
457 3 => "persist_majority",
458 _ => return self,
459 }
460 } else {
461 return self;
462 };
463
464 self.span.record(SPAN_ATTRIB_DB_DURABILITY, durability_str);
465 self
466 }
467
468 pub fn with_keyspace(self, keyspace: &Keyspace<'_>) -> Self {
469 match keyspace {
470 Keyspace::Cluster => {}
471 Keyspace::Bucket { bucket } => {
472 self.span.record("db.namespace", *bucket);
473 }
474 Keyspace::Scope { bucket, scope } => {
475 self.span.record("db.namespace", *bucket);
476 self.span.record("couchbase.scope.name", *scope);
477 }
478 Keyspace::Collection {
479 bucket,
480 scope,
481 collection,
482 } => {
483 self.span.record("db.namespace", *bucket);
484 self.span.record("couchbase.scope.name", *scope);
485 self.span.record("couchbase.collection.name", *collection);
486 }
487 }
488 self
489 }
490
491 pub fn with_service(self, service: Option<&'static str>) -> Self {
492 if let Some(service) = service {
493 self.span.record(SPAN_ATTRIB_SERVICE_KEY, service);
494 }
495 self
496 }
497
498 pub fn with_statement(self, statement: &str) -> Self {
499 self.span.record("db.query.text", statement);
500 self
501 }
502
503 pub fn build(self) -> tracing::Span {
504 self.span
505 }
506
507 pub fn name(&self) -> &'static str {
508 self.name
509 }
510}
511
512pub trait MetricsName {
513 fn metrics_name(&self) -> &'static str;
514}
515
516pub fn record_metrics<E>(
517 operation_name: &str,
518 service: Option<&str>,
519 keyspace: &Keyspace<'_>,
520 cluster_labels: &Option<ClusterLabels>,
521 start: Instant,
522 error: Option<&E>,
523) where
524 E: MetricsName,
525{
526 let duration = start.elapsed().as_secs_f64();
527
528 let cluster_name = cluster_labels
529 .as_ref()
530 .and_then(|labels| labels.cluster_name.as_deref());
531 let cluster_uuid = cluster_labels
532 .as_ref()
533 .and_then(|labels| labels.cluster_uuid.as_deref());
534 let error_name = error.map(|err| err.metrics_name());
535
536 match keyspace {
537 Keyspace::Cluster => {
538 if let Some(service) = service {
539 record_operation_metric_event!(
540 duration,
541 operation_name,
542 cluster_name,
543 cluster_uuid,
544 error_name,
545 couchbase.service = service
546 );
547 } else {
548 record_operation_metric_event!(
549 duration,
550 operation_name,
551 cluster_name,
552 cluster_uuid,
553 error_name
554 );
555 }
556 }
557 Keyspace::Bucket { bucket } => {
558 if let Some(service) = service {
559 record_operation_metric_event!(
560 duration,
561 operation_name,
562 cluster_name,
563 cluster_uuid,
564 error_name,
565 couchbase.service = service,
566 db.namespace = bucket
567 );
568 } else {
569 record_operation_metric_event!(
570 duration,
571 operation_name,
572 cluster_name,
573 cluster_uuid,
574 error_name,
575 db.namespace = bucket
576 );
577 }
578 }
579 Keyspace::Scope { bucket, scope } => {
580 if let Some(service) = service {
581 record_operation_metric_event!(
582 duration,
583 operation_name,
584 cluster_name,
585 cluster_uuid,
586 error_name,
587 couchbase.service = service,
588 db.namespace = bucket,
589 couchbase.scope.name = scope
590 );
591 } else {
592 record_operation_metric_event!(
593 duration,
594 operation_name,
595 cluster_name,
596 cluster_uuid,
597 error_name,
598 db.namespace = bucket,
599 couchbase.scope.name = scope
600 );
601 }
602 }
603 Keyspace::Collection {
604 bucket,
605 scope,
606 collection,
607 } => {
608 if let Some(service) = service {
609 record_operation_metric_event!(
610 duration,
611 operation_name,
612 cluster_name,
613 cluster_uuid,
614 error_name,
615 couchbase.service = service,
616 db.namespace = bucket,
617 couchbase.scope.name = scope,
618 couchbase.collection.name = collection
619 );
620 } else {
621 record_operation_metric_event!(
622 duration,
623 operation_name,
624 cluster_name,
625 cluster_uuid,
626 error_name,
627 db.namespace = bucket,
628 couchbase.scope.name = scope,
629 couchbase.collection.name = collection
630 );
631 }
632 }
633 }
634}