Skip to main content

diesel_tracing/
sqlite.rs

1use diesel::associations::HasTable;
2use diesel::connection::{
3    AnsiTransactionManager, CacheSize, Connection, ConnectionSealed, DefaultLoadingMode,
4    Instrumentation, LoadConnection, MultiConnectionHelper, SimpleConnection, TransactionManager,
5};
6use diesel::deserialize::{FromSqlRow, StaticallySizedRow};
7use diesel::dsl::{Find, Update};
8use diesel::expression::{is_aggregate, MixedAggregates, QueryMetadata, ValidGrouping};
9use diesel::migration::{MigrationConnection, CREATE_MIGRATIONS_TABLE};
10use diesel::query_builder::{AsChangeset, IntoUpdateTarget, Query, QueryFragment, QueryId};
11use diesel::query_dsl::methods::{ExecuteDsl, FindDsl};
12use diesel::query_dsl::{LoadQuery, UpdateAndFetchResults};
13use diesel::result::{ConnectionResult, QueryResult};
14use diesel::serialize::ToSql;
15use diesel::sql_types::HasSqlType;
16use diesel::sqlite::{Sqlite, SqliteConnection};
17use diesel::RunQueryDsl;
18use diesel::{sql_query, Identifiable, Table};
19use tracing::{debug, instrument};
20
21#[cfg(feature = "r2d2")]
22use diesel::r2d2::R2D2Connection;
23
24pub struct InstrumentedSqliteConnection {
25    inner: SqliteConnection,
26}
27
28#[cfg(feature = "r2d2")]
29impl R2D2Connection for InstrumentedSqliteConnection {
30    fn ping(&mut self) -> QueryResult<()> {
31        self.inner.batch_execute("SELECT 1")?;
32
33        Ok(())
34    }
35}
36
37impl MultiConnectionHelper for InstrumentedSqliteConnection {
38    fn to_any<'a>(
39        lookup: &mut <Self::Backend as diesel::sql_types::TypeMetadata>::MetadataLookup,
40    ) -> &mut (dyn std::any::Any + 'a) {
41        lookup
42    }
43
44    fn from_any(
45        lookup: &mut dyn std::any::Any,
46    ) -> Option<&mut <Self::Backend as diesel::sql_types::TypeMetadata>::MetadataLookup> {
47        lookup.downcast_mut()
48    }
49}
50
51impl SimpleConnection for InstrumentedSqliteConnection {
52    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, query), err)]
53    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
54        self.inner.batch_execute(query)?;
55
56        Ok(())
57    }
58}
59
60impl ConnectionSealed for InstrumentedSqliteConnection {}
61
62impl Connection for InstrumentedSqliteConnection {
63    type Backend = Sqlite;
64    type TransactionManager = AnsiTransactionManager;
65
66    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(database_url), err)]
67    fn establish(database_url: &str) -> ConnectionResult<InstrumentedSqliteConnection> {
68        Ok(InstrumentedSqliteConnection {
69            inner: SqliteConnection::establish(database_url)?,
70        })
71    }
72
73    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
74    fn transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
75    where
76        F: FnOnce(&mut Self) -> Result<T, E>,
77        E: From<diesel::result::Error>,
78    {
79        Self::TransactionManager::transaction(self, f)
80    }
81
82    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, source), err)]
83    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
84    where
85        T: QueryFragment<Sqlite> + QueryId,
86    {
87        self.inner.execute_returning_count(source)
88    }
89
90    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self))]
91    fn transaction_state(&mut self) -> &mut Self::TransactionManager {
92        self.inner.transaction_state()
93    }
94
95    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self))]
96    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
97        self.inner.instrumentation()
98    }
99
100    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, instrumentation))]
101    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
102        self.inner.set_instrumentation(instrumentation)
103    }
104
105    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, cache_size))]
106    fn set_prepared_statement_cache_size(&mut self, cache_size: CacheSize) {
107        self.inner.set_prepared_statement_cache_size(cache_size)
108    }
109}
110
111impl LoadConnection<DefaultLoadingMode> for InstrumentedSqliteConnection {
112    type Cursor<'conn, 'query>
113        = <SqliteConnection as LoadConnection<DefaultLoadingMode>>::Cursor<'conn, 'query>
114    where
115        Self: 'conn;
116    type Row<'conn, 'query>
117        = <SqliteConnection as LoadConnection<DefaultLoadingMode>>::Row<'conn, 'query>
118    where
119        Self: 'conn;
120
121    #[cfg_attr(
122        feature = "statement-fields",
123        instrument(
124            fields(
125                db.system="sqlite",
126                otel.kind="client",
127                db.statement=%diesel::debug_query(&source),
128            ),
129            skip(self, source),
130            err,
131        )
132    )]
133    #[cfg_attr(
134        not(feature = "statement-fields"),
135        instrument(
136            fields(
137                db.system="sqlite",
138                otel.kind="client",
139            ),
140            skip(self, source),
141            err,
142        )
143    )]
144    fn load<'conn, 'query, T>(
145        &'conn mut self,
146        source: T,
147    ) -> QueryResult<Self::Cursor<'conn, 'query>>
148    where
149        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
150        Self::Backend: QueryMetadata<T::SqlType>,
151    {
152        self.inner.load(source)
153    }
154}
155
156impl MigrationConnection for InstrumentedSqliteConnection {
157    fn setup(&mut self) -> QueryResult<usize> {
158        sql_query(CREATE_MIGRATIONS_TABLE).execute(self)
159    }
160}
161
162impl InstrumentedSqliteConnection {
163    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
164    pub fn immediate_transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
165    where
166        F: FnOnce(&mut SqliteConnection) -> Result<T, E>,
167        E: From<diesel::result::Error>,
168    {
169        self.inner.immediate_transaction(f)
170    }
171
172    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
173    pub fn exclusive_transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
174    where
175        F: FnOnce(&mut SqliteConnection) -> Result<T, E>,
176        E: From<diesel::result::Error>,
177    {
178        self.inner.exclusive_transaction(f)
179    }
180
181    #[doc(hidden)]
182    #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
183    pub fn register_sql_function<ArgsSqlType, RetSqlType, Args, Ret, F>(
184        &mut self,
185        fn_name: &str,
186        deterministic: bool,
187        f: F,
188    ) -> QueryResult<()>
189    where
190        F: FnMut(Args) -> Ret + std::panic::UnwindSafe + Send + 'static,
191        Args: FromSqlRow<ArgsSqlType, Sqlite> + StaticallySizedRow<ArgsSqlType, Sqlite>,
192        Ret: ToSql<RetSqlType, Sqlite>,
193        Sqlite: HasSqlType<RetSqlType>,
194    {
195        self.inner.register_sql_function(fn_name, deterministic, f)
196    }
197}
198
199impl<'b, Changes, Output> UpdateAndFetchResults<Changes, Output> for InstrumentedSqliteConnection
200where
201    Changes: Copy + Identifiable,
202    Changes: AsChangeset<Target = <Changes as HasTable>::Table> + IntoUpdateTarget,
203    Changes::Table: FindDsl<Changes::Id>,
204    Update<Changes, Changes>: ExecuteDsl<SqliteConnection>,
205    Find<Changes::Table, Changes::Id>: LoadQuery<'b, SqliteConnection, Output>,
206    <Changes::Table as Table>::AllColumns: ValidGrouping<()>,
207    <<Changes::Table as Table>::AllColumns as ValidGrouping<()>>::IsAggregate:
208        MixedAggregates<is_aggregate::No, Output = is_aggregate::No>,
209{
210    fn update_and_fetch(&mut self, changeset: Changes) -> QueryResult<Output> {
211        debug!("updating and fetching changeset");
212        self.inner.update_and_fetch(changeset)
213    }
214}