Skip to main content

hydracache_sqlx/
query_ext.rs

1use async_trait::async_trait;
2use hydracache_core::CacheCodec;
3use hydracache_db::DbQuery;
4use serde::{de::DeserializeOwned, Serialize};
5use sqlx::{query::QueryAs, Database, Executor, FromRow, IntoArguments};
6
7use crate::Result;
8
9/// Convenience SQLx execution methods for [`DbQuery`].
10///
11/// These helpers keep SQLx responsible for query construction and row mapping,
12/// while HydraCache owns keying, tags, TTL, serialization, and local
13/// single-flight. Use [`DbQuery::fetch_with`] when you need a transaction,
14/// custom repository call, or a database client that is not pool-like.
15#[async_trait]
16pub trait SqlxQueryExt<T, C>
17where
18    C: CacheCodec,
19{
20    /// Execute a SQLx query on miss and cache exactly one row.
21    async fn fetch_one<'q, DB, A, E>(self, executor: E, query: QueryAs<'q, DB, T, A>) -> Result<T>
22    where
23        'q: 'static,
24        T: Serialize + DeserializeOwned + Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'static,
25        DB: Database + Send + Sync + 'static,
26        A: IntoArguments<'q, DB> + Send + 'static,
27        E: Send + Sync + 'static,
28        for<'c> &'c E: Executor<'c, Database = DB>;
29
30    /// Execute a SQLx query on miss and cache either one row or `None`.
31    async fn fetch_optional<'q, DB, A, E>(
32        self,
33        executor: E,
34        query: QueryAs<'q, DB, T, A>,
35    ) -> Result<Option<T>>
36    where
37        'q: 'static,
38        T: Serialize + DeserializeOwned + Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'static,
39        DB: Database + Send + Sync + 'static,
40        A: IntoArguments<'q, DB> + Send + 'static,
41        E: Send + Sync + 'static,
42        for<'c> &'c E: Executor<'c, Database = DB>;
43
44    /// Execute a SQLx query on miss and cache all returned rows.
45    async fn fetch_all<'q, DB, A, E>(
46        self,
47        executor: E,
48        query: QueryAs<'q, DB, T, A>,
49    ) -> Result<Vec<T>>
50    where
51        'q: 'static,
52        T: Serialize + DeserializeOwned + Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'static,
53        DB: Database + Send + Sync + 'static,
54        A: IntoArguments<'q, DB> + Send + 'static,
55        E: Send + Sync + 'static,
56        for<'c> &'c E: Executor<'c, Database = DB>;
57}
58
59#[async_trait]
60impl<T, C> SqlxQueryExt<T, C> for DbQuery<T, C>
61where
62    C: CacheCodec,
63{
64    async fn fetch_one<'q, DB, A, E>(self, executor: E, query: QueryAs<'q, DB, T, A>) -> Result<T>
65    where
66        'q: 'static,
67        T: Serialize + DeserializeOwned + Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'static,
68        DB: Database + Send + Sync + 'static,
69        A: IntoArguments<'q, DB> + Send + 'static,
70        E: Send + Sync + 'static,
71        for<'c> &'c E: Executor<'c, Database = DB>,
72    {
73        self.fetch_value_with(move || async move { query.fetch_one(&executor).await })
74            .await
75            .map_err(Into::into)
76    }
77
78    async fn fetch_optional<'q, DB, A, E>(
79        self,
80        executor: E,
81        query: QueryAs<'q, DB, T, A>,
82    ) -> Result<Option<T>>
83    where
84        'q: 'static,
85        T: Serialize + DeserializeOwned + Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'static,
86        DB: Database + Send + Sync + 'static,
87        A: IntoArguments<'q, DB> + Send + 'static,
88        E: Send + Sync + 'static,
89        for<'c> &'c E: Executor<'c, Database = DB>,
90    {
91        self.fetch_value_with(move || async move { query.fetch_optional(&executor).await })
92            .await
93            .map_err(Into::into)
94    }
95
96    async fn fetch_all<'q, DB, A, E>(
97        self,
98        executor: E,
99        query: QueryAs<'q, DB, T, A>,
100    ) -> Result<Vec<T>>
101    where
102        'q: 'static,
103        T: Serialize + DeserializeOwned + Send + Unpin + for<'r> FromRow<'r, DB::Row> + 'static,
104        DB: Database + Send + Sync + 'static,
105        A: IntoArguments<'q, DB> + Send + 'static,
106        E: Send + Sync + 'static,
107        for<'c> &'c E: Executor<'c, Database = DB>,
108    {
109        self.fetch_value_with(move || async move { query.fetch_all(&executor).await })
110            .await
111            .map_err(Into::into)
112    }
113}