1use crate::clusterlabels::ClusterLabels;
19use crate::memdx::durability_level::DurabilityLevel;
20use crate::util::get_host_port_from_uri;
21use arc_swap::ArcSwap;
22use std::fmt::Display;
23use std::future::Future;
24use std::net::SocketAddr;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{instrument, span, trace_span, Instrument, Level, Span};
28use url::Url;
29
// Span names used by the tracing integration.

/// Name of the span created by `TracingComponent::create_dispatch_span`.
pub const SPAN_NAME_DISPATCH_TO_SERVER: &str = "dispatch_to_server";
/// Span name `request_encoding`; not referenced in this part of the file,
/// presumably used by callers that trace request encoding.
pub const SPAN_NAME_REQUEST_ENCODING: &str = "request_encoding";
32
// Span attribute keys (and a few fixed values). The `*_KEY` strings must match
// the field names declared literally in the span macros in this file, because
// `Span::record` looks fields up by name.

/// Key of the attribute identifying the database system.
pub const SPAN_ATTRIB_DB_SYSTEM_KEY: &str = "db.system.name";
/// Value recorded under [`SPAN_ATTRIB_DB_SYSTEM_KEY`].
pub const SPAN_ATTRIB_DB_SYSTEM_VALUE: &str = "couchbase";
/// Key under which `end_dispatch_span` records the operation id.
pub const SPAN_ATTRIB_OPERATION_ID_KEY: &str = "couchbase.operation_id";
/// Key of the operation name attribute.
pub const SPAN_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
/// Key of the retry counter attribute (same string as [`SPAN_ATTRIB_NUM_RETRIES`]).
pub const SPAN_ATTRIB_RETRIES: &str = "couchbase.retries";
/// Key of the client/connection id attribute set on dispatch spans.
pub const SPAN_ATTRIB_LOCAL_ID_KEY: &str = "couchbase.local_id";
/// Key of the network transport attribute.
pub const SPAN_ATTRIB_NET_TRANSPORT_KEY: &str = "network.transport";
/// Value recorded under [`SPAN_ATTRIB_NET_TRANSPORT_KEY`].
pub const SPAN_ATTRIB_NET_TRANSPORT_VALUE: &str = "tcp";
/// Key of the canonical server host attribute.
pub const SPAN_ATTRIB_NET_REMOTE_ADDRESS_KEY: &str = "server.address";
/// Key of the canonical server port attribute.
pub const SPAN_ATTRIB_NET_REMOTE_PORT_KEY: &str = "server.port";
/// Key of the peer host attribute.
pub const SPAN_ATTRIB_NET_PEER_ADDRESS_KEY: &str = "network.peer.address";
/// Key of the peer port attribute.
pub const SPAN_ATTRIB_NET_PEER_PORT_KEY: &str = "network.peer.port";
/// Key under which `end_dispatch_span` records the server duration (in microseconds).
pub const SPAN_ATTRIB_SERVER_DURATION_KEY: &str = "couchbase.server_duration";
/// Key of the service attribute (see the `SERVICE_VALUE_*` constants).
pub const SPAN_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
/// Key under which the bucket name is recorded.
pub const SPAN_ATTRIB_DB_NAME_KEY: &str = "db.namespace";
/// Key under which the collection name is recorded.
pub const SPAN_ATTRIB_DB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
/// Key under which the scope name is recorded.
pub const SPAN_ATTRIB_DB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
/// Key under which the durability level name is recorded.
pub const SPAN_ATTRIB_DB_DURABILITY: &str = "couchbase.durability";
/// Key of the retry counter attribute (same string as [`SPAN_ATTRIB_RETRIES`]).
pub const SPAN_ATTRIB_NUM_RETRIES: &str = "couchbase.retries";
/// Key under which the cluster UUID label is recorded.
pub const SPAN_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
/// Key under which the cluster name label is recorded.
pub const SPAN_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";
54
// Metric name and attribute keys. These mirror the field names written
// literally in `record_operation_metric_event!` and `record_metrics`.

/// Name of the operation duration metric.
pub const METER_NAME_CB_OPERATION_DURATION: &str = "db.client.operation.duration";
/// Metric attribute key for the service.
pub const METER_ATTRIB_SERVICE_KEY: &str = "couchbase.service";
/// Metric attribute key for the operation name.
pub const METER_ATTRIB_OPERATION_KEY: &str = "db.operation.name";
/// Metric attribute key for the bucket name.
pub const METER_ATTRIB_BUCKET_NAME_KEY: &str = "db.namespace";
/// Metric attribute key for the scope name.
pub const METER_ATTRIB_SCOPE_NAME_KEY: &str = "couchbase.scope.name";
/// Metric attribute key for the collection name.
pub const METER_ATTRIB_COLLECTION_NAME_KEY: &str = "couchbase.collection.name";
/// Metric attribute key for the error name of a failed operation.
pub const METER_ATTRIB_ERROR_KEY: &str = "error.type";
/// Metric attribute key for the cluster UUID label.
pub const METER_ATTRIB_CLUSTER_UUID_KEY: &str = "couchbase.cluster.uuid";
/// Metric attribute key for the cluster name label.
pub const METER_ATTRIB_CLUSTER_NAME_KEY: &str = "couchbase.cluster.name";
64
// Values for the `couchbase.service` attribute, one per service.

/// Service value for key-value operations.
pub const SERVICE_VALUE_KV: &str = "kv";
/// Service value for query operations.
pub const SERVICE_VALUE_QUERY: &str = "query";
/// Service value for analytics operations.
pub const SERVICE_VALUE_ANALYTICS: &str = "analytics";
/// Service value for search operations.
pub const SERVICE_VALUE_SEARCH: &str = "search";
/// Service value for management operations.
pub const SERVICE_VALUE_MANAGEMENT: &str = "management";
/// Service value for eventing operations.
pub const SERVICE_VALUE_EVENTING: &str = "eventing";
71
/// Key of the `otel.kind` span field.
pub const SPAN_ATTRIB_OTEL_KIND_KEY: &str = "otel.kind";
/// Value set for `otel.kind` on every span created in this file.
pub const SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE: &str = "client";
74
// Emits one TRACE-level `tracing` event on the `couchbase::metrics` target that
// carries an operation duration plus its attributes.
//
// Arguments, in order:
//   * `$duration`     - value stored in `histogram.db.client.operation.duration`;
//                       the event is tagged `otel.unit = "s"`, so callers are
//                       expected to pass seconds.
//   * `$operation`    - value stored in `db.operation.name`.
//   * `$cluster_name` - `Option`; when `Some`, stored in `couchbase.cluster.name`.
//   * `$cluster_uuid` - `Option`; when `Some`, stored in `couchbase.cluster.uuid`.
//   * `$error`        - `Option`; when `Some`, stored in `error.type`.
//   * zero or more extra `dotted.field = value` pairs, forwarded verbatim.
//
// An attribute whose `Option` is `None` is omitted from the event altogether
// rather than emitted empty. That is why the body expands to one `event!`
// invocation per combination of present/absent optional attributes (2 x 2 x 2);
// exactly one of them runs.
macro_rules! record_operation_metric_event {
    (
        $duration:expr,
        $operation:expr,
        $cluster_name:expr,
        $cluster_uuid:expr,
        $error:expr
        $(, $($field:ident).+ = $value:expr )*
    ) => {{
        // Evaluate the optional inputs exactly once before branching on them.
        let cluster_name = $cluster_name;
        let cluster_uuid = $cluster_uuid;
        let error = $error;
        if let Some(error) = error {
            // Failed operation: every variant below also carries `error.type`.
            match (cluster_name, cluster_uuid) {
                (Some(name), Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                    couchbase.cluster.uuid = uuid,
                    error.type = error,
                ),
                (Some(name), None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                    error.type = error,
                ),
                (None, Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.uuid = uuid,
                    error.type = error,
                ),
                (None, None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    error.type = error,
                ),
            }
        } else {
            // Successful operation: same four variants, without `error.type`.
            match (cluster_name, cluster_uuid) {
                (Some(name), Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                    couchbase.cluster.uuid = uuid,
                ),
                (Some(name), None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.name = name,
                ),
                (None, Some(uuid)) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                    couchbase.cluster.uuid = uuid,
                ),
                (None, None) => tracing::event!(
                    target: "couchbase::metrics",
                    Level::TRACE,
                    histogram.db.client.operation.duration = $duration,
                    otel.unit = "s",
                    db.operation.name = $operation,
                    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
                    $( $($field).+ = $value, )*
                ),
            }
        }
    }};
}
180
/// Creates a [`SpanBuilder`](crate::tracingcomponent::SpanBuilder) around a new
/// TRACE-level span on the `couchbase::tracing` target.
///
/// `$name` must be a string literal; it is used both as the span name and as
/// the value of `db.operation.name`.
///
/// Every attribute that a `SpanBuilder` setter may fill in later is declared
/// here as `tracing::field::Empty`. This matters because a span's field set is
/// fixed when the span is created: `Span::record` silently does nothing for a
/// field that was not declared up front. `db.query.text` is declared for that
/// reason, so that `SpanBuilder::with_statement` actually records the
/// statement. Fields left `Empty` are simply never recorded.
#[macro_export]
macro_rules! create_span {
    ($name:literal) => {
        $crate::tracingcomponent::SpanBuilder::new(
            $name,
            tracing::trace_span!(
                target: "couchbase::tracing",
                $name,
                otel.kind = $crate::tracingcomponent::SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
                otel.status_code = tracing::field::Empty,
                db.operation.name = $name,
                db.system.name = $crate::tracingcomponent::SPAN_ATTRIB_DB_SYSTEM_VALUE,
                couchbase.retries = 0,
                couchbase.cluster.name = tracing::field::Empty,
                couchbase.cluster.uuid = tracing::field::Empty,
                couchbase.service = tracing::field::Empty,
                db.namespace = tracing::field::Empty,
                couchbase.scope.name = tracing::field::Empty,
                couchbase.collection.name = tracing::field::Empty,
                couchbase.durability = tracing::field::Empty,
                db.query.text = tracing::field::Empty,
            ),
        )
    };
}
205
206pub(crate) fn build_keyspace<'a>(
207 bucket: Option<&'a str>,
208 scope: Option<&'a str>,
209 collection_name: Option<&'a str>,
210) -> Keyspace<'a> {
211 match (bucket, scope, collection_name) {
212 (Some(bucket_str), Some(scope_name), Some(collection)) => Keyspace::Collection {
213 bucket: bucket_str,
214 scope: scope_name,
215 collection,
216 },
217 (Some(bucket_str), Some(scope_name), None) => Keyspace::Scope {
218 bucket: bucket_str,
219 scope: scope_name,
220 },
221 (Some(bucket_str), None, None) => Keyspace::Bucket { bucket: bucket_str },
222 _ => Keyspace::Cluster,
223 }
224}
225
/// Holds the cluster labels that are attached to spans and creates the
/// per-request dispatch spans.
#[derive(Debug)]
pub(crate) struct TracingComponent {
    // Current labels (if any). Stored in an `ArcSwap` so `reconfigure` can
    // replace them through `&self`.
    cluster_labels: ArcSwap<Option<ClusterLabels>>,
}
230
231impl Default for TracingComponent {
232 fn default() -> Self {
233 Self {
234 cluster_labels: ArcSwap::new(Arc::new(None)),
235 }
236 }
237}
238
239impl TracingComponent {
240 pub(crate) fn new(config: TracingComponentConfig) -> Self {
241 Self {
242 cluster_labels: ArcSwap::new(Arc::new(config.cluster_labels)),
243 }
244 }
245
246 pub(crate) fn reconfigure(&self, state: TracingComponentConfig) {
247 self.cluster_labels.store(Arc::new(state.cluster_labels));
248 }
249
250 pub(crate) fn get_cluster_labels(&self) -> Option<ClusterLabels> {
251 (**self.cluster_labels.load()).clone()
252 }
253
254 pub(crate) fn create_dispatch_span(&self, fields: &BeginDispatchFields<'_>) -> Span {
255 let span = trace_span!(
256 target: "couchbase::tracing",
257 SPAN_NAME_DISPATCH_TO_SERVER,
258 otel.kind = SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
259 db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
260 network.transport = SPAN_ATTRIB_NET_TRANSPORT_VALUE,
261 couchbase.cluster.uuid = tracing::field::Empty,
262 couchbase.cluster.name = tracing::field::Empty,
263 couchbase.server_duration = tracing::field::Empty,
264 couchbase.local_id = fields.client_id,
265 network.peer.address = fields.peer_addr.0,
266 network.peer.port = fields.peer_addr.1,
267 server.address = fields.canonical_addr.0,
268 server.port = fields.canonical_addr.1,
269 couchbase.operation_id = tracing::field::Empty,
270 );
271
272 self.record_cluster_labels(&span);
273 span
274 }
275
276 pub(crate) fn record_cluster_labels(&self, span: &Span) {
277 let cluster_labels = self.cluster_labels.load();
278 if let Some(ref cluster_labels) = **cluster_labels {
279 if let Some(ref cluster_uuid) = cluster_labels.cluster_uuid {
280 span.record(SPAN_ATTRIB_CLUSTER_UUID_KEY, cluster_uuid.as_str());
281 }
282 if let Some(ref cluster_name) = cluster_labels.cluster_name {
283 span.record(SPAN_ATTRIB_CLUSTER_NAME_KEY, cluster_name.as_str());
284 }
285 }
286 }
287
288 pub(crate) async fn orchestrate_dispatch_span<Fut, T, F>(
289 &self,
290 begin_fields: BeginDispatchFields<'_>,
291 operation: Fut,
292 end_fields_provider: F,
293 ) -> T
294 where
295 Fut: Future<Output = T> + Send,
296 F: FnOnce(&T) -> EndDispatchFields + Send,
297 {
298 if !tracing::span_enabled!(tracing::Level::TRACE) {
300 return operation.await;
301 }
302
303 let span = self.create_dispatch_span(&begin_fields);
304 let result = operation.instrument(span.clone()).await;
305 let end_fields = end_fields_provider(&result);
306 end_dispatch_span(span, end_fields);
307 result
308 }
309}
310
/// Configuration passed to `TracingComponent::new` and
/// `TracingComponent::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct TracingComponentConfig {
    /// Cluster labels to attach to spans; `None` when no labels are available.
    pub cluster_labels: Option<ClusterLabels>,
}
315
/// Identifier recorded on a dispatch span as `couchbase.operation_id`.
///
/// The identifier is either textual or numeric, depending on what the caller
/// has available.
pub enum OperationId {
    /// Textual identifier.
    String(String),
    /// Numeric identifier.
    Number(u64),
}

impl OperationId {
    /// Wraps a 32-bit identifier, widening it losslessly to `u64`.
    pub fn from_u32(n: u32) -> Self {
        OperationId::Number(u64::from(n))
    }

    /// Wraps an owned textual identifier.
    pub fn from_string(s: String) -> Self {
        OperationId::String(s)
    }
}
330
331pub struct EndDispatchFields {
332 pub server_duration: Option<Duration>,
333 pub operation_id: Option<OperationId>,
334}
335
336impl EndDispatchFields {
337 pub fn new(server_duration: Option<Duration>, operation_id: Option<OperationId>) -> Self {
338 Self {
339 server_duration,
340 operation_id,
341 }
342 }
343
344 pub fn server_duration(mut self, server_duration: Option<Duration>) -> Self {
345 self.server_duration = server_duration;
346 self
347 }
348
349 pub fn operation_id(mut self, operation_id: Option<OperationId>) -> Self {
350 self.operation_id = operation_id;
351 self
352 }
353}
354
355pub fn end_dispatch_span(span: Span, fields: EndDispatchFields) {
356 if let Some(server_duration) = fields.server_duration {
357 span.record(SPAN_ATTRIB_SERVER_DURATION_KEY, server_duration.as_micros());
358 }
359
360 if let Some(operation_id) = fields.operation_id {
361 match operation_id {
362 OperationId::String(s) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, s),
363 OperationId::Number(n) => span.record(SPAN_ATTRIB_OPERATION_ID_KEY, n),
364 };
365 }
366
367 drop(span);
368}
369
/// Connection details known before a request is dispatched.
///
/// They are set on the dispatch span at creation time.
#[derive(Debug)]
pub(crate) struct BeginDispatchFields<'a> {
    /// Peer `(address, port)`, recorded as `network.peer.address` / `network.peer.port`.
    pub peer_addr: (&'a str, &'a str),
    /// Client identifier recorded as `couchbase.local_id`, when available.
    pub client_id: Option<&'a str>,
    /// Canonical `(address, port)`, recorded as `server.address` / `server.port`.
    pub canonical_addr: (&'a str, &'a str),
}

impl<'a> BeginDispatchFields<'a> {
    /// Bundles the three pieces of connection information.
    ///
    /// Note the argument order: peer address, canonical address, client id.
    pub fn new(
        peer_addr: (&'a str, &'a str),
        canonical_addr: (&'a str, &'a str),
        client_id: Option<&'a str>,
    ) -> Self {
        BeginDispatchFields {
            canonical_addr,
            client_id,
            peer_addr,
        }
    }
}
392
/// Fluent wrapper around a `tracing::Span` that fills in optional attributes
/// before the span is handed out via `build`.
pub struct SpanBuilder {
    // Name the builder was created with; returned by `name()`.
    name: &'static str,
    // The span being populated.
    span: tracing::Span,
}
397
/// The part of the cluster an operation is addressed to, from the whole
/// cluster down to a single collection.
#[derive(Clone, Debug)]
pub enum Keyspace<'a> {
    /// No bucket: the operation targets the cluster as a whole.
    Cluster,
    /// A bucket.
    Bucket {
        bucket: &'a str,
    },
    /// A scope inside a bucket.
    Scope {
        bucket: &'a str,
        scope: &'a str,
    },
    /// A collection inside a scope.
    Collection {
        bucket: &'a str,
        scope: &'a str,
        collection: &'a str,
    },
}

impl Display for Keyspace<'_> {
    /// Renders the keyspace as `cluster`, or as a `/`-separated path of
    /// `bucket:<name>`, `scope:<name>` and `collection:<name>` segments.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // All payloads are `&str`, so matching by value copies the references.
        match *self {
            Keyspace::Cluster => f.write_str("cluster"),
            Keyspace::Bucket { bucket } => write!(f, "bucket:{}", bucket),
            Keyspace::Scope { bucket, scope } => {
                write!(f, "bucket:{}/scope:{}", bucket, scope)
            }
            Keyspace::Collection {
                bucket,
                scope,
                collection,
            } => write!(
                f,
                "bucket:{}/scope:{}/collection:{}",
                bucket, scope, collection
            ),
        }
    }
}
433
434impl SpanBuilder {
435 pub fn new(name: &'static str, span: tracing::Span) -> Self {
436 Self { span, name }
437 }
438
439 pub fn span(&self) -> &tracing::Span {
440 &self.span
441 }
442
443 pub fn with_cluster_labels(self, cluster_labels: &Option<ClusterLabels>) -> Self {
444 if let Some(labels) = cluster_labels {
445 if let Some(uuid) = &labels.cluster_uuid {
446 self.span
447 .record(SPAN_ATTRIB_CLUSTER_UUID_KEY, uuid.as_str());
448 }
449 if let Some(name) = &labels.cluster_name {
450 self.span
451 .record(SPAN_ATTRIB_CLUSTER_NAME_KEY, name.as_str());
452 }
453 }
454 self
455 }
456
457 pub fn with_durability<D>(self, durability: Option<&D>) -> Self
458 where
459 for<'a> &'a D: Into<u8>,
460 {
461 let durability_str = if let Some(durability) = durability {
462 match durability.into() {
463 1 => "majority",
464 2 => "majority_and_persist_active",
465 3 => "persist_majority",
466 _ => return self,
467 }
468 } else {
469 return self;
470 };
471
472 self.span.record(SPAN_ATTRIB_DB_DURABILITY, durability_str);
473 self
474 }
475
476 pub fn with_keyspace(self, keyspace: &Keyspace<'_>) -> Self {
477 match keyspace {
478 Keyspace::Cluster => {}
479 Keyspace::Bucket { bucket } => {
480 self.span.record("db.namespace", *bucket);
481 }
482 Keyspace::Scope { bucket, scope } => {
483 self.span.record("db.namespace", *bucket);
484 self.span.record("couchbase.scope.name", *scope);
485 }
486 Keyspace::Collection {
487 bucket,
488 scope,
489 collection,
490 } => {
491 self.span.record("db.namespace", *bucket);
492 self.span.record("couchbase.scope.name", *scope);
493 self.span.record("couchbase.collection.name", *collection);
494 }
495 }
496 self
497 }
498
499 pub fn with_service(self, service: Option<&'static str>) -> Self {
500 if let Some(service) = service {
501 self.span.record(SPAN_ATTRIB_SERVICE_KEY, service);
502 }
503 self
504 }
505
506 pub fn with_statement(self, statement: &str) -> Self {
507 self.span.record("db.query.text", statement);
508 self
509 }
510
511 pub fn build(self) -> tracing::Span {
512 self.span
513 }
514
515 pub fn name(&self) -> &'static str {
516 self.name
517 }
518}
519
/// Implemented by error types that can be reported in metrics.
pub trait MetricsName {
    /// Returns the static name that `record_metrics` attaches as `error.type`.
    fn metrics_name(&self) -> &'static str;
}
523
/// Emits the operation-duration metric event for one finished operation.
///
/// * `operation_name` - recorded as `db.operation.name`.
/// * `service` - recorded as `couchbase.service` when `Some`.
/// * `keyspace` - decides which of `db.namespace`, `couchbase.scope.name` and
///   `couchbase.collection.name` are attached.
/// * `cluster_labels` - source of the optional cluster name / UUID attributes.
/// * `start` - instant the operation began; the time elapsed since then, in
///   fractional seconds, is the recorded duration.
/// * `error` - when `Some`, its `metrics_name()` is attached as `error.type`.
///
/// The event itself is produced by `record_operation_metric_event!`. Field
/// names have to be written literally at each macro call site, which is why
/// every keyspace variant / service presence combination has its own call.
pub fn record_metrics<E>(
    operation_name: &str,
    service: Option<&str>,
    keyspace: &Keyspace<'_>,
    cluster_labels: &Option<ClusterLabels>,
    start: Instant,
    error: Option<&E>,
) where
    E: MetricsName,
{
    let duration = start.elapsed().as_secs_f64();

    // Borrow the optional labels as `Option<&str>` so the macro can match on them.
    let cluster_name = cluster_labels
        .as_ref()
        .and_then(|labels| labels.cluster_name.as_deref());
    let cluster_uuid = cluster_labels
        .as_ref()
        .and_then(|labels| labels.cluster_uuid.as_deref());
    let error_name = error.map(|err| err.metrics_name());

    match keyspace {
        // Cluster-level operation: no namespace attributes at all.
        Keyspace::Cluster => {
            if let Some(service) = service {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    couchbase.service = service
                );
            } else {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name
                );
            }
        }
        // Bucket-level operation: bucket name only.
        Keyspace::Bucket { bucket } => {
            if let Some(service) = service {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    couchbase.service = service,
                    db.namespace = bucket
                );
            } else {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    db.namespace = bucket
                );
            }
        }
        // Scope-level operation: bucket and scope names.
        Keyspace::Scope { bucket, scope } => {
            if let Some(service) = service {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    couchbase.service = service,
                    db.namespace = bucket,
                    couchbase.scope.name = scope
                );
            } else {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    db.namespace = bucket,
                    couchbase.scope.name = scope
                );
            }
        }
        // Collection-level operation: bucket, scope and collection names.
        Keyspace::Collection {
            bucket,
            scope,
            collection,
        } => {
            if let Some(service) = service {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    couchbase.service = service,
                    db.namespace = bucket,
                    couchbase.scope.name = scope,
                    couchbase.collection.name = collection
                );
            } else {
                record_operation_metric_event!(
                    duration,
                    operation_name,
                    cluster_name,
                    cluster_uuid,
                    error_name,
                    db.namespace = bucket,
                    couchbase.scope.name = scope,
                    couchbase.collection.name = collection
                );
            }
        }
    }
}