sqlx_datadog/
lib.rs

1#![doc=include_str!("../README.md")]
2
3use proc_macro::TokenStream;
4use quote::{ToTokens, quote};
5use syn::{Meta, parse_macro_input, punctuated::Punctuated};
6
7/// Specialized version of `tracing::instrument` for recording SQLx queries to Datadog.
8///
9/// Accepts all arguments `tracing::instrument` accepts, but patches in extra fields.
10///
11/// By default, expects a function argument called `db` that has a reference to the connection, but
12/// accepts a `db` parameter with an alternative identifier.
13///
14/// For optimal results, the `db.statement` span tag should be set to the text of the SQL query
15/// executed.
16///
17/// ```
18/// # #[macro_use] extern crate sqlx_datadog;
19/// # use sqlx::Execute;
20/// #
21/// # #[derive(Debug, sqlx::FromRow)]
22/// # struct User { name: String, email: String }
23/// #
24/// #[instrument_query(skip(db))]
25/// async fn fetch_user(db: &sqlx::MySqlPool, user_id: i64) -> Result<User, sqlx::Error> {
26///     let query = sqlx::query_as("SELECT name, email FROM users WHERE id = ? LIMIT 1");
27///     tracing::Span::current().record("db.statement", query.sql().trim());
28///     query.bind(user_id).fetch_one(db).await
29/// }
30/// ```
31#[proc_macro_attribute]
32pub fn instrument_query(args: TokenStream, item: TokenStream) -> TokenStream {
33    let args = parse_macro_input!(args with Punctuated::<Meta, syn::Token![,]>::parse_terminated);
34    let mut input_fn = parse_macro_input!(item as syn::ItemFn);
35
36    let mut instrument_args: Vec<Meta> = vec![];
37    let mut fields = vec![];
38    let mut db_ident = quote! { db };
39
40    for arg in args {
41        if let Meta::NameValue(name_value) = arg.clone() {
42            if name_value.path.get_ident().unwrap() == "db" {
43                db_ident = name_value.value.into_token_stream();
44            } else {
45                instrument_args.push(arg);
46            }
47        } else if let Meta::List(list_value) = arg.clone() {
48            if list_value.path.get_ident().unwrap() == "fields" {
49                fields.extend(list_value.tokens);
50            } else {
51                instrument_args.push(arg);
52            }
53        } else {
54            instrument_args.push(arg);
55        }
56    }
57
58    // These are in reverse.
59    let injected_tags = vec![
60        quote! { ::tracing::Span::current().record("peer.service", #db_ident.connect_options().get_database()); },
61        quote! { ::tracing::Span::current().record("peer.hostname", #db_ident.connect_options().get_host()); },
62        quote! { ::tracing::Span::current().record("out.host", #db_ident.connect_options().get_host()); },
63        quote! { ::tracing::Span::current().record("out.port", #db_ident.connect_options().get_port()); },
64        quote! { ::tracing::Span::current().record("db.instance", #db_ident.connect_options().get_database()); },
65        quote! { ::tracing::Span::current().record("db.name", #db_ident.connect_options().get_database()); },
66        quote! { ::tracing::Span::current().record("db.system", #db_ident.connect_options().to_url_lossy().scheme().replace("postgres", "postgresql")); },
67        quote! { use ::sqlx::ConnectOptions; },
68    ];
69    for tag in injected_tags {
70        input_fn
71            .block
72            .stmts
73            .insert(0, syn::parse(tag.into()).unwrap());
74    }
75
76    let instrument_attr = quote! {
77        #[::tracing::instrument(
78            fields(
79                span.kind = "client",
80                span.type = "sql",
81                operation = "query",
82                peer.hostname,
83                peer.service,
84                out.host,
85                out.port,
86                db.system,
87                db.instance,
88                db.name,
89                db.statement,
90                #(#fields),*
91            )
92            #(#instrument_args),*
93        )]
94    };
95
96    // TODO Inject propagation comment into query.
97    // Format is /*key=value,key=value*/
98    // keys are:
99    // dde (environment)
100    // ddps (parent service)
101    // ddpv (parent version)
102    // ddh (db peer host)
103    // dddb (db instance)
104    // traceparent (span id)
105
106    let output = quote! {
107        #instrument_attr
108        #input_fn
109    };
110
111    TokenStream::from(output)
112}