1#[cfg(feature = "otel")]
40use opentelemetry::{
41 KeyValue, global,
42 trace::{Span, SpanKind, Status, Tracer},
43};
44
45pub const DB_SYSTEM: &str = "mssql";
47
48pub mod span_names {
50 pub const CONNECT: &str = "mssql.connect";
52 pub const QUERY: &str = "mssql.query";
54 pub const EXECUTE: &str = "mssql.execute";
56 pub const BEGIN_TRANSACTION: &str = "mssql.begin_transaction";
58 pub const COMMIT: &str = "mssql.commit";
60 pub const ROLLBACK: &str = "mssql.rollback";
62 pub const SAVEPOINT: &str = "mssql.savepoint";
64 pub const BULK_INSERT: &str = "mssql.bulk_insert";
66}
67
68pub mod attributes {
70 pub const DB_SYSTEM: &str = "db.system";
72 pub const DB_NAME: &str = "db.name";
74 pub const DB_STATEMENT: &str = "db.statement";
76 pub const DB_OPERATION: &str = "db.operation";
78 pub const SERVER_ADDRESS: &str = "server.address";
80 pub const SERVER_PORT: &str = "server.port";
82 pub const DB_ROWS_AFFECTED: &str = "db.rows_affected";
84 pub const DB_ISOLATION_LEVEL: &str = "db.mssql.isolation_level";
86 pub const DB_CONNECTION_ID: &str = "db.connection_id";
88 pub const ERROR_TYPE: &str = "error.type";
90}
91
92#[derive(Debug, Clone)]
94pub struct SanitizationConfig {
95 pub enabled: bool,
97 pub max_length: usize,
99 pub placeholder: String,
101}
102
103impl Default for SanitizationConfig {
104 fn default() -> Self {
105 Self {
106 enabled: true,
107 max_length: 2048,
108 placeholder: "?".to_string(),
109 }
110 }
111}
112
113impl SanitizationConfig {
114 #[must_use]
116 pub fn no_sanitization() -> Self {
117 Self {
118 enabled: false,
119 max_length: usize::MAX,
120 placeholder: String::new(),
121 }
122 }
123
124 #[must_use]
126 pub fn sanitize(&self, sql: &str) -> String {
127 if !self.enabled {
128 return truncate_string(sql, self.max_length);
129 }
130
131 let sanitized = sanitize_sql(sql, &self.placeholder);
133 truncate_string(&sanitized, self.max_length)
134 }
135}
136
137fn sanitize_sql(sql: &str, placeholder: &str) -> String {
139 let mut result = String::with_capacity(sql.len());
140 let mut chars = sql.chars().peekable();
141 let mut in_string = false;
142 let mut string_char = ' ';
143
144 while let Some(c) = chars.next() {
145 if in_string {
146 if c == string_char {
147 if chars.peek() == Some(&string_char) {
149 chars.next();
150 continue;
151 }
152 in_string = false;
153 result.push_str(placeholder);
154 }
155 continue;
156 }
157
158 if c == '\'' || c == '"' {
159 in_string = true;
160 string_char = c;
161 continue;
162 }
163
164 if c.is_ascii_digit() && !result.ends_with(|ch: char| ch.is_alphanumeric() || ch == '_') {
166 while chars
168 .peek()
169 .is_some_and(|ch| ch.is_ascii_digit() || *ch == '.')
170 {
171 chars.next();
172 }
173 result.push_str(placeholder);
174 continue;
175 }
176
177 result.push(c);
178 }
179
180 if in_string {
182 result.push_str(placeholder);
183 }
184
185 result
186}
187
188fn truncate_string(s: &str, max_len: usize) -> String {
190 if s.len() <= max_len {
191 s.to_string()
192 } else {
193 format!("{}...", &s[..max_len.saturating_sub(3)])
194 }
195}
196
197#[must_use]
199pub fn extract_operation(sql: &str) -> &'static str {
200 let sql_upper = sql.trim().to_uppercase();
201
202 if sql_upper.starts_with("SELECT") {
203 "SELECT"
204 } else if sql_upper.starts_with("INSERT") {
205 "INSERT"
206 } else if sql_upper.starts_with("UPDATE") {
207 "UPDATE"
208 } else if sql_upper.starts_with("DELETE") {
209 "DELETE"
210 } else if sql_upper.starts_with("EXEC") || sql_upper.starts_with("EXECUTE") {
211 "EXECUTE"
212 } else if sql_upper.starts_with("BEGIN TRAN") {
213 "BEGIN"
214 } else if sql_upper.starts_with("COMMIT") {
215 "COMMIT"
216 } else if sql_upper.starts_with("ROLLBACK") {
217 "ROLLBACK"
218 } else if sql_upper.starts_with("CREATE") {
219 "CREATE"
220 } else if sql_upper.starts_with("ALTER") {
221 "ALTER"
222 } else if sql_upper.starts_with("DROP") {
223 "DROP"
224 } else {
225 "OTHER"
226 }
227}
228
229#[cfg(feature = "otel")]
231#[derive(Debug, Clone)]
232pub struct InstrumentationContext {
233 pub server_address: String,
235 pub server_port: u16,
237 pub database: Option<String>,
239 pub sanitization: SanitizationConfig,
241}
242
243#[cfg(feature = "otel")]
244impl InstrumentationContext {
245 #[must_use]
247 pub fn new(server_address: String, server_port: u16) -> Self {
248 Self {
249 server_address,
250 server_port,
251 database: None,
252 sanitization: SanitizationConfig::default(),
253 }
254 }
255
256 #[must_use]
258 pub fn with_database(mut self, database: impl Into<String>) -> Self {
259 self.database = Some(database.into());
260 self
261 }
262
263 #[must_use]
265 pub fn with_sanitization(mut self, config: SanitizationConfig) -> Self {
266 self.sanitization = config;
267 self
268 }
269
270 pub fn base_attributes(&self) -> Vec<KeyValue> {
272 let mut attrs = vec![
273 KeyValue::new(attributes::DB_SYSTEM, DB_SYSTEM),
274 KeyValue::new(attributes::SERVER_ADDRESS, self.server_address.clone()),
275 KeyValue::new(attributes::SERVER_PORT, i64::from(self.server_port)),
276 ];
277
278 if let Some(ref db) = self.database {
279 attrs.push(KeyValue::new(attributes::DB_NAME, db.clone()));
280 }
281
282 attrs
283 }
284
285 pub fn connection_span(&self) -> impl Span {
287 let tracer = global::tracer("mssql-client");
288 let mut attrs = self.base_attributes();
289 attrs.push(KeyValue::new(
290 "db.connection_string.host",
291 self.server_address.clone(),
292 ));
293
294 tracer
295 .span_builder(span_names::CONNECT)
296 .with_kind(SpanKind::Client)
297 .with_attributes(attrs)
298 .start(&tracer)
299 }
300
301 pub fn query_span(&self, sql: &str) -> impl Span {
303 let tracer = global::tracer("mssql-client");
304 let mut attrs = self.base_attributes();
305
306 let operation = extract_operation(sql);
307 attrs.push(KeyValue::new(attributes::DB_OPERATION, operation));
308 attrs.push(KeyValue::new(
309 attributes::DB_STATEMENT,
310 self.sanitization.sanitize(sql),
311 ));
312
313 tracer
314 .span_builder(span_names::QUERY)
315 .with_kind(SpanKind::Client)
316 .with_attributes(attrs)
317 .start(&tracer)
318 }
319
320 pub fn transaction_span(&self, operation: &str) -> impl Span {
322 let tracer = global::tracer("mssql-client");
323 let mut attrs = self.base_attributes();
324 attrs.push(KeyValue::new(
325 attributes::DB_OPERATION,
326 operation.to_string(),
327 ));
328
329 let span_name = match operation {
330 "BEGIN" => span_names::BEGIN_TRANSACTION,
331 "COMMIT" => span_names::COMMIT,
332 "ROLLBACK" => span_names::ROLLBACK,
333 _ => span_names::SAVEPOINT,
334 };
335
336 tracer
337 .span_builder(span_name)
338 .with_kind(SpanKind::Client)
339 .with_attributes(attrs)
340 .start(&tracer)
341 }
342
343 pub fn record_error(span: &mut impl Span, error: &crate::error::Error) {
345 span.set_status(Status::error(error.to_string()));
346 span.record_error(error);
347 }
348
349 pub fn record_success(span: &mut impl Span, rows_affected: Option<u64>) {
351 span.set_status(Status::Ok);
352 if let Some(rows) = rows_affected {
353 span.set_attribute(KeyValue::new(attributes::DB_ROWS_AFFECTED, rows as i64));
354 }
355 }
356}
357
358#[cfg(not(feature = "otel"))]
360#[derive(Debug, Clone, Default)]
361pub struct InstrumentationContext;
362
363#[cfg(not(feature = "otel"))]
364impl InstrumentationContext {
365 #[must_use]
367 pub fn new(_server_address: String, _server_port: u16) -> Self {
368 Self
369 }
370
371 #[must_use]
373 pub fn with_database(self, _database: impl Into<String>) -> Self {
374 self
375 }
376
377 #[must_use]
379 pub fn with_sanitization(self, _config: SanitizationConfig) -> Self {
380 self
381 }
382}
383
384pub mod metric_names {
390 pub const DB_CLIENT_CONNECTIONS_USAGE: &str = "db.client.connections.usage";
392 pub const DB_CLIENT_CONNECTIONS_IDLE: &str = "db.client.connections.idle";
394 pub const DB_CLIENT_CONNECTIONS_MAX: &str = "db.client.connections.max";
396 pub const DB_CLIENT_CONNECTIONS_CREATE_TOTAL: &str = "db.client.connections.create.total";
398 pub const DB_CLIENT_CONNECTIONS_CLOSE_TOTAL: &str = "db.client.connections.close.total";
400 pub const DB_CLIENT_OPERATION_DURATION: &str = "db.client.operation.duration";
402 pub const DB_CLIENT_OPERATIONS_TOTAL: &str = "db.client.operations.total";
404 pub const DB_CLIENT_ERRORS_TOTAL: &str = "db.client.errors.total";
406 pub const DB_CLIENT_CONNECTIONS_WAIT_TIME: &str = "db.client.connections.wait_time";
408}
409
410#[cfg(feature = "otel")]
412pub struct DatabaseMetrics {
413 connections_usage: opentelemetry::metrics::Gauge<u64>,
415 connections_idle: opentelemetry::metrics::Gauge<u64>,
417 connections_max: opentelemetry::metrics::Gauge<u64>,
419 connections_create_total: opentelemetry::metrics::Counter<u64>,
421 connections_close_total: opentelemetry::metrics::Counter<u64>,
423 operation_duration: opentelemetry::metrics::Histogram<f64>,
425 operations_total: opentelemetry::metrics::Counter<u64>,
427 errors_total: opentelemetry::metrics::Counter<u64>,
429 connections_wait_time: opentelemetry::metrics::Histogram<f64>,
431 base_attributes: Vec<opentelemetry::KeyValue>,
433}
434
435#[cfg(feature = "otel")]
436impl DatabaseMetrics {
437 pub fn new(pool_name: Option<&str>, server_address: &str, server_port: u16) -> Self {
445 use opentelemetry::{KeyValue, global};
446
447 let meter = global::meter("mssql-client");
448
449 let connections_usage = meter
450 .u64_gauge(metric_names::DB_CLIENT_CONNECTIONS_USAGE)
451 .with_description("Number of connections currently in use")
452 .with_unit("connections")
453 .build();
454
455 let connections_idle = meter
456 .u64_gauge(metric_names::DB_CLIENT_CONNECTIONS_IDLE)
457 .with_description("Number of idle connections available")
458 .with_unit("connections")
459 .build();
460
461 let connections_max = meter
462 .u64_gauge(metric_names::DB_CLIENT_CONNECTIONS_MAX)
463 .with_description("Maximum number of connections allowed")
464 .with_unit("connections")
465 .build();
466
467 let connections_create_total = meter
468 .u64_counter(metric_names::DB_CLIENT_CONNECTIONS_CREATE_TOTAL)
469 .with_description("Total number of connections created")
470 .with_unit("connections")
471 .build();
472
473 let connections_close_total = meter
474 .u64_counter(metric_names::DB_CLIENT_CONNECTIONS_CLOSE_TOTAL)
475 .with_description("Total number of connections closed")
476 .with_unit("connections")
477 .build();
478
479 let operation_duration = meter
480 .f64_histogram(metric_names::DB_CLIENT_OPERATION_DURATION)
481 .with_description("Duration of database operations")
482 .with_unit("s")
483 .build();
484
485 let operations_total = meter
486 .u64_counter(metric_names::DB_CLIENT_OPERATIONS_TOTAL)
487 .with_description("Total number of database operations")
488 .with_unit("operations")
489 .build();
490
491 let errors_total = meter
492 .u64_counter(metric_names::DB_CLIENT_ERRORS_TOTAL)
493 .with_description("Total number of operation errors")
494 .with_unit("errors")
495 .build();
496
497 let connections_wait_time = meter
498 .f64_histogram(metric_names::DB_CLIENT_CONNECTIONS_WAIT_TIME)
499 .with_description("Time spent waiting for a connection")
500 .with_unit("s")
501 .build();
502
503 let mut base_attributes = vec![
504 KeyValue::new(attributes::DB_SYSTEM, DB_SYSTEM),
505 KeyValue::new(attributes::SERVER_ADDRESS, server_address.to_string()),
506 KeyValue::new(attributes::SERVER_PORT, i64::from(server_port)),
507 ];
508
509 if let Some(name) = pool_name {
510 base_attributes.push(KeyValue::new("db.client.pool.name", name.to_string()));
511 }
512
513 Self {
514 connections_usage,
515 connections_idle,
516 connections_max,
517 connections_create_total,
518 connections_close_total,
519 operation_duration,
520 operations_total,
521 errors_total,
522 connections_wait_time,
523 base_attributes,
524 }
525 }
526
527 pub fn record_pool_status(&self, in_use: u64, idle: u64, max: u64) {
529 self.connections_usage.record(in_use, &self.base_attributes);
530 self.connections_idle.record(idle, &self.base_attributes);
531 self.connections_max.record(max, &self.base_attributes);
532 }
533
534 pub fn record_connection_created(&self) {
536 self.connections_create_total.add(1, &self.base_attributes);
537 }
538
539 pub fn record_connection_closed(&self) {
541 self.connections_close_total.add(1, &self.base_attributes);
542 }
543
544 pub fn record_operation(&self, operation: &str, duration_seconds: f64, success: bool) {
546 use opentelemetry::KeyValue;
547
548 let mut attrs = self.base_attributes.clone();
549 attrs.push(KeyValue::new(
550 attributes::DB_OPERATION,
551 operation.to_string(),
552 ));
553 attrs.push(KeyValue::new("db.operation.success", success));
554
555 self.operations_total.add(1, &attrs);
556 self.operation_duration.record(duration_seconds, &attrs);
557
558 if !success {
559 self.errors_total.add(1, &attrs);
560 }
561 }
562
563 pub fn record_connection_wait(&self, duration_seconds: f64) {
565 self.connections_wait_time
566 .record(duration_seconds, &self.base_attributes);
567 }
568}
569
570#[cfg(not(feature = "otel"))]
572#[derive(Debug, Clone, Default)]
573pub struct DatabaseMetrics;
574
575#[cfg(not(feature = "otel"))]
576impl DatabaseMetrics {
577 #[must_use]
579 pub fn new(_pool_name: Option<&str>, _server_address: &str, _server_port: u16) -> Self {
580 Self
581 }
582
583 pub fn record_pool_status(&self, _in_use: u64, _idle: u64, _max: u64) {}
585
586 pub fn record_connection_created(&self) {}
588
589 pub fn record_connection_closed(&self) {}
591
592 pub fn record_operation(&self, _operation: &str, _duration_seconds: f64, _success: bool) {}
594
595 pub fn record_connection_wait(&self, _duration_seconds: f64) {}
597}
598
599#[derive(Debug, Clone)]
601pub struct OperationTimer {
602 start: std::time::Instant,
603 operation: &'static str,
604}
605
606impl OperationTimer {
607 #[must_use]
609 pub fn start(operation: &'static str) -> Self {
610 Self {
611 start: std::time::Instant::now(),
612 operation,
613 }
614 }
615
616 #[must_use]
618 pub fn elapsed_seconds(&self) -> f64 {
619 self.start.elapsed().as_secs_f64()
620 }
621
622 #[must_use]
624 pub fn operation(&self) -> &'static str {
625 self.operation
626 }
627
628 #[cfg(feature = "otel")]
630 pub fn finish(self, metrics: &DatabaseMetrics, success: bool) {
631 metrics.record_operation(self.operation, self.elapsed_seconds(), success);
632 }
633
634 #[cfg(not(feature = "otel"))]
636 pub fn finish(self, _metrics: &DatabaseMetrics, _success: bool) {}
637}
638
639#[cfg(test)]
640#[allow(clippy::unwrap_used)]
641mod tests {
642 use super::*;
643
644 #[test]
645 fn test_extract_operation() {
646 assert_eq!(extract_operation("SELECT * FROM users"), "SELECT");
647 assert_eq!(extract_operation(" select id from users"), "SELECT");
648 assert_eq!(extract_operation("INSERT INTO users VALUES (1)"), "INSERT");
649 assert_eq!(extract_operation("UPDATE users SET name = 'foo'"), "UPDATE");
650 assert_eq!(extract_operation("DELETE FROM users"), "DELETE");
651 assert_eq!(extract_operation("EXEC sp_help"), "EXECUTE");
652 assert_eq!(extract_operation("BEGIN TRANSACTION"), "BEGIN");
653 assert_eq!(extract_operation("COMMIT"), "COMMIT");
654 assert_eq!(extract_operation("ROLLBACK"), "ROLLBACK");
655 assert_eq!(extract_operation("CREATE TABLE foo"), "CREATE");
656 assert_eq!(extract_operation("unknown stuff"), "OTHER");
657 }
658
659 #[test]
660 fn test_sanitize_sql() {
661 let placeholder = "?";
662
663 assert_eq!(
665 sanitize_sql("SELECT * FROM users WHERE name = 'Alice'", placeholder),
666 "SELECT * FROM users WHERE name = ?"
667 );
668
669 assert_eq!(
671 sanitize_sql("INSERT INTO t VALUES ('a', 'b')", placeholder),
672 "INSERT INTO t VALUES (?, ?)"
673 );
674
675 assert_eq!(
677 sanitize_sql("SELECT * WHERE name = 'O''Brien'", placeholder),
678 "SELECT * WHERE name = ?"
679 );
680
681 assert_eq!(
683 sanitize_sql("SELECT * WHERE id = 123", placeholder),
684 "SELECT * WHERE id = ?"
685 );
686
687 assert_eq!(
689 sanitize_sql("SELECT * WHERE id = 42 AND name = 'test'", placeholder),
690 "SELECT * WHERE id = ? AND name = ?"
691 );
692 }
693
694 #[test]
695 fn test_truncate_string() {
696 assert_eq!(truncate_string("hello", 10), "hello");
697 assert_eq!(truncate_string("hello world", 8), "hello...");
698 assert_eq!(truncate_string("hi", 2), "hi");
699 }
700
701 #[test]
702 fn test_sanitization_config_default() {
703 let config = SanitizationConfig::default();
704 assert!(config.enabled);
705 assert_eq!(config.max_length, 2048);
706 assert_eq!(config.placeholder, "?");
707 }
708
709 #[test]
710 fn test_sanitization_config_no_sanitization() {
711 let config = SanitizationConfig::no_sanitization();
712 assert!(!config.enabled);
713
714 let sql = "SELECT * FROM users WHERE name = 'Alice'";
715 assert_eq!(config.sanitize(sql), sql);
716 }
717}