1use diesel::associations::HasTable;
2use diesel::connection::{
3 AnsiTransactionManager, CacheSize, Connection, ConnectionSealed, DefaultLoadingMode,
4 Instrumentation, LoadConnection, MultiConnectionHelper, SimpleConnection, TransactionManager,
5};
6use diesel::deserialize::{FromSqlRow, StaticallySizedRow};
7use diesel::dsl::{Find, Update};
8use diesel::expression::{is_aggregate, MixedAggregates, QueryMetadata, ValidGrouping};
9use diesel::migration::{MigrationConnection, CREATE_MIGRATIONS_TABLE};
10use diesel::query_builder::{AsChangeset, IntoUpdateTarget, Query, QueryFragment, QueryId};
11use diesel::query_dsl::methods::{ExecuteDsl, FindDsl};
12use diesel::query_dsl::{LoadQuery, UpdateAndFetchResults};
13use diesel::result::{ConnectionResult, QueryResult};
14use diesel::serialize::ToSql;
15use diesel::sql_types::HasSqlType;
16use diesel::sqlite::{Sqlite, SqliteConnection};
17use diesel::RunQueryDsl;
18use diesel::{sql_query, Identifiable, Table};
19use tracing::{debug, instrument};
20
21#[cfg(feature = "r2d2")]
22use diesel::r2d2::R2D2Connection;
23
24pub struct InstrumentedSqliteConnection {
25 inner: SqliteConnection,
26}
27
28#[cfg(feature = "r2d2")]
29impl R2D2Connection for InstrumentedSqliteConnection {
30 fn ping(&mut self) -> QueryResult<()> {
31 self.inner.batch_execute("SELECT 1")?;
32
33 Ok(())
34 }
35}
36
37impl MultiConnectionHelper for InstrumentedSqliteConnection {
38 fn to_any<'a>(
39 lookup: &mut <Self::Backend as diesel::sql_types::TypeMetadata>::MetadataLookup,
40 ) -> &mut (dyn std::any::Any + 'a) {
41 lookup
42 }
43
44 fn from_any(
45 lookup: &mut dyn std::any::Any,
46 ) -> Option<&mut <Self::Backend as diesel::sql_types::TypeMetadata>::MetadataLookup> {
47 lookup.downcast_mut()
48 }
49}
50
51impl SimpleConnection for InstrumentedSqliteConnection {
52 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, query), err)]
53 fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
54 self.inner.batch_execute(query)?;
55
56 Ok(())
57 }
58}
59
60impl ConnectionSealed for InstrumentedSqliteConnection {}
61
62impl Connection for InstrumentedSqliteConnection {
63 type Backend = Sqlite;
64 type TransactionManager = AnsiTransactionManager;
65
66 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(database_url), err)]
67 fn establish(database_url: &str) -> ConnectionResult<InstrumentedSqliteConnection> {
68 Ok(InstrumentedSqliteConnection {
69 inner: SqliteConnection::establish(database_url)?,
70 })
71 }
72
73 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
74 fn transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
75 where
76 F: FnOnce(&mut Self) -> Result<T, E>,
77 E: From<diesel::result::Error>,
78 {
79 Self::TransactionManager::transaction(self, f)
80 }
81
82 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, source), err)]
83 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
84 where
85 T: QueryFragment<Sqlite> + QueryId,
86 {
87 self.inner.execute_returning_count(source)
88 }
89
90 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self))]
91 fn transaction_state(&mut self) -> &mut Self::TransactionManager {
92 self.inner.transaction_state()
93 }
94
95 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self))]
96 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
97 self.inner.instrumentation()
98 }
99
100 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, instrumentation))]
101 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
102 self.inner.set_instrumentation(instrumentation)
103 }
104
105 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, cache_size))]
106 fn set_prepared_statement_cache_size(&mut self, cache_size: CacheSize) {
107 self.inner.set_prepared_statement_cache_size(cache_size)
108 }
109}
110
111impl LoadConnection<DefaultLoadingMode> for InstrumentedSqliteConnection {
112 type Cursor<'conn, 'query>
113 = <SqliteConnection as LoadConnection<DefaultLoadingMode>>::Cursor<'conn, 'query>
114 where
115 Self: 'conn;
116 type Row<'conn, 'query>
117 = <SqliteConnection as LoadConnection<DefaultLoadingMode>>::Row<'conn, 'query>
118 where
119 Self: 'conn;
120
121 #[cfg_attr(
122 feature = "statement-fields",
123 instrument(
124 fields(
125 db.system="sqlite",
126 otel.kind="client",
127 db.statement=%diesel::debug_query(&source),
128 ),
129 skip(self, source),
130 err,
131 )
132 )]
133 #[cfg_attr(
134 not(feature = "statement-fields"),
135 instrument(
136 fields(
137 db.system="sqlite",
138 otel.kind="client",
139 ),
140 skip(self, source),
141 err,
142 )
143 )]
144 fn load<'conn, 'query, T>(
145 &'conn mut self,
146 source: T,
147 ) -> QueryResult<Self::Cursor<'conn, 'query>>
148 where
149 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
150 Self::Backend: QueryMetadata<T::SqlType>,
151 {
152 self.inner.load(source)
153 }
154}
155
156impl MigrationConnection for InstrumentedSqliteConnection {
157 fn setup(&mut self) -> QueryResult<usize> {
158 sql_query(CREATE_MIGRATIONS_TABLE).execute(self)
159 }
160}
161
162impl InstrumentedSqliteConnection {
163 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
164 pub fn immediate_transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
165 where
166 F: FnOnce(&mut SqliteConnection) -> Result<T, E>,
167 E: From<diesel::result::Error>,
168 {
169 self.inner.immediate_transaction(f)
170 }
171
172 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
173 pub fn exclusive_transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
174 where
175 F: FnOnce(&mut SqliteConnection) -> Result<T, E>,
176 E: From<diesel::result::Error>,
177 {
178 self.inner.exclusive_transaction(f)
179 }
180
181 #[doc(hidden)]
182 #[instrument(fields(db.system="sqlite", otel.kind="client"), skip(self, f))]
183 pub fn register_sql_function<ArgsSqlType, RetSqlType, Args, Ret, F>(
184 &mut self,
185 fn_name: &str,
186 deterministic: bool,
187 f: F,
188 ) -> QueryResult<()>
189 where
190 F: FnMut(Args) -> Ret + std::panic::UnwindSafe + Send + 'static,
191 Args: FromSqlRow<ArgsSqlType, Sqlite> + StaticallySizedRow<ArgsSqlType, Sqlite>,
192 Ret: ToSql<RetSqlType, Sqlite>,
193 Sqlite: HasSqlType<RetSqlType>,
194 {
195 self.inner.register_sql_function(fn_name, deterministic, f)
196 }
197}
198
199impl<'b, Changes, Output> UpdateAndFetchResults<Changes, Output> for InstrumentedSqliteConnection
200where
201 Changes: Copy + Identifiable,
202 Changes: AsChangeset<Target = <Changes as HasTable>::Table> + IntoUpdateTarget,
203 Changes::Table: FindDsl<Changes::Id>,
204 Update<Changes, Changes>: ExecuteDsl<SqliteConnection>,
205 Find<Changes::Table, Changes::Id>: LoadQuery<'b, SqliteConnection, Output>,
206 <Changes::Table as Table>::AllColumns: ValidGrouping<()>,
207 <<Changes::Table as Table>::AllColumns as ValidGrouping<()>>::IsAggregate:
208 MixedAggregates<is_aggregate::No, Output = is_aggregate::No>,
209{
210 fn update_and_fetch(&mut self, changeset: Changes) -> QueryResult<Output> {
211 debug!("updating and fetching changeset");
212 self.inner.update_and_fetch(changeset)
213 }
214}