sqlx_core/
sync.rs

1use cfg_if::cfg_if;
2
// For types with identical signatures that don't require runtime support,
// we can just arbitrarily pick one to use based on what's enabled.
//
// We'll generally lean towards Tokio's types as those are more featureful
// (including `tokio-console` support) and more widely deployed.
8
/// Runtime-agnostic asynchronous semaphore.
///
/// The backing implementation is selected at compile time:
/// * with `_rt-tokio` enabled, `tokio::sync::Semaphore`;
/// * otherwise, with any of `_rt-async-global-executor`, `_rt-async-std` or `_rt-smol`
///   enabled, `futures_intrusive::sync::Semaphore`;
/// * with none of those features enabled, the struct has no fields at all.
pub struct AsyncSemaphore {
    // Tokio's semaphore wins whenever its feature is on, even if another runtime
    // feature is enabled at the same time.
    #[cfg(feature = "_rt-tokio")]
    inner: tokio::sync::Semaphore,

    // For the remaining runtimes we rely on the futures-intrusive semaphore, because
    // the one from async-lock cannot add arbitrary permits and is not guaranteed to be fair:
    // * https://github.com/smol-rs/async-lock/issues/22
    // * https://github.com/smol-rs/async-lock/issues/23
    //
    // A replacement is still wanted, though: futures-intrusive is not maintained
    // and there are some soundness concerns (although it turns out any intrusive future
    // is unsound in MIRI due to the necessitated mutable aliasing):
    // https://github.com/launchbadge/sqlx/issues/1668
    #[cfg(all(
        not(feature = "_rt-tokio"),
        any(
            feature = "_rt-async-global-executor",
            feature = "_rt-async-std",
            feature = "_rt-smol"
        )
    ))]
    inner: futures_intrusive::sync::Semaphore,
}
32
33impl AsyncSemaphore {
34    #[track_caller]
35    pub fn new(fair: bool, permits: usize) -> Self {
36        if cfg!(not(any(
37            feature = "_rt-async-global-executor",
38            feature = "_rt-async-std",
39            feature = "_rt-smol",
40            feature = "_rt-tokio"
41        ))) {
42            crate::rt::missing_rt((fair, permits));
43        }
44
45        AsyncSemaphore {
46            #[cfg(all(
47                any(
48                    feature = "_rt-async-global-executor",
49                    feature = "_rt-async-std",
50                    feature = "_rt-smol"
51                ),
52                not(feature = "_rt-tokio")
53            ))]
54            inner: futures_intrusive::sync::Semaphore::new(fair, permits),
55            #[cfg(feature = "_rt-tokio")]
56            inner: {
57                debug_assert!(fair, "Tokio only has fair permits");
58                tokio::sync::Semaphore::new(permits)
59            },
60        }
61    }
62
63    pub fn permits(&self) -> usize {
64        cfg_if! {
65            if #[cfg(all(
66                any(
67                    feature = "_rt-async-global-executor",
68                    feature = "_rt-async-std",
69                    feature = "_rt-smol"
70                ),
71                not(feature = "_rt-tokio")
72            ))] {
73                self.inner.permits()
74            } else if #[cfg(feature = "_rt-tokio")] {
75                self.inner.available_permits()
76            } else {
77                crate::rt::missing_rt(())
78            }
79        }
80    }
81
82    pub async fn acquire(&self, permits: u32) -> AsyncSemaphoreReleaser<'_> {
83        cfg_if! {
84            if #[cfg(all(
85                any(
86                    feature = "_rt-async-global-executor",
87                    feature = "_rt-async-std",
88                    feature = "_rt-smol"
89                ),
90                not(feature = "_rt-tokio")
91            ))] {
92                AsyncSemaphoreReleaser {
93                    inner: self.inner.acquire(permits as usize).await,
94                }
95            } else if #[cfg(feature = "_rt-tokio")] {
96                AsyncSemaphoreReleaser {
97                    inner: self
98                        .inner
99                        // Weird quirk: `tokio::sync::Semaphore` mostly uses `usize` for permit counts,
100                        // but `u32` for this and `try_acquire_many()`.
101                        .acquire_many(permits)
102                        .await
103                        .expect("BUG: we do not expose the `.close()` method"),
104                }
105            } else {
106                crate::rt::missing_rt(permits)
107            }
108        }
109    }
110
111    pub fn try_acquire(&self, permits: u32) -> Option<AsyncSemaphoreReleaser<'_>> {
112        cfg_if! {
113            if #[cfg(all(
114                any(
115                    feature = "_rt-async-global-executor",
116                    feature = "_rt-async-std",
117                    feature = "_rt-smol"
118                ),
119                not(feature = "_rt-tokio")
120            ))] {
121                Some(AsyncSemaphoreReleaser {
122                    inner: self.inner.try_acquire(permits as usize)?,
123                })
124            } else if #[cfg(feature = "_rt-tokio")] {
125                Some(AsyncSemaphoreReleaser {
126                    inner: self.inner.try_acquire_many(permits).ok()?,
127                })
128            } else {
129                crate::rt::missing_rt(permits)
130            }
131        }
132    }
133
134    pub fn release(&self, permits: usize) {
135        cfg_if! {
136            if #[cfg(all(
137                any(
138                    feature = "_rt-async-global-executor",
139                    feature = "_rt-async-std",
140                    feature = "_rt-smol"
141                ),
142                not(feature = "_rt-tokio")
143            ))] {
144                self.inner.release(permits);
145            } else if #[cfg(feature = "_rt-tokio")] {
146                self.inner.add_permits(permits);
147            } else {
148                crate::rt::missing_rt(permits);
149            }
150        }
151    }
152}
153
/// Handle for permits obtained from an [`AsyncSemaphore`].
///
/// Wraps the guard type of whichever semaphore backs [`AsyncSemaphore`]:
/// `tokio::sync::SemaphorePermit` under `_rt-tokio`, otherwise
/// `futures_intrusive::sync::SemaphoreReleaser`. With no runtime feature enabled it
/// only carries a marker so that the `'a` lifetime parameter is still used.
pub struct AsyncSemaphoreReleaser<'a> {
    // Tokio's permit type wins whenever its feature is on.
    #[cfg(feature = "_rt-tokio")]
    inner: tokio::sync::SemaphorePermit<'a>,

    // For the remaining runtimes we rely on the futures-intrusive semaphore, because
    // the one from async-lock cannot add arbitrary permits and is not guaranteed to be fair:
    // * https://github.com/smol-rs/async-lock/issues/22
    // * https://github.com/smol-rs/async-lock/issues/23
    //
    // A replacement is still wanted, though: futures-intrusive is not maintained
    // and there are some soundness concerns (although it turns out any intrusive future
    // is unsound in MIRI due to the necessitated mutable aliasing):
    // https://github.com/launchbadge/sqlx/issues/1668
    #[cfg(all(
        not(feature = "_rt-tokio"),
        any(
            feature = "_rt-async-global-executor",
            feature = "_rt-async-std",
            feature = "_rt-smol"
        )
    ))]
    inner: futures_intrusive::sync::SemaphoreReleaser<'a>,

    // No runtime at all: keep the lifetime parameter in use.
    #[cfg(not(any(
        feature = "_rt-tokio",
        feature = "_rt-async-global-executor",
        feature = "_rt-async-std",
        feature = "_rt-smol"
    )))]
    _phantom: std::marker::PhantomData<&'a ()>,
}
185
186impl AsyncSemaphoreReleaser<'_> {
187    pub fn disarm(self) {
188        cfg_if! {
189            if #[cfg(all(
190                any(
191                    feature = "_rt-async-global-executor",
192                    feature = "_rt-async-std",
193                    feature = "_rt-smol"
194                ),
195                not(feature = "_rt-tokio")
196            ))] {
197                let mut this = self;
198                this.inner.disarm();
199            } else if #[cfg(feature = "_rt-tokio")] {
200                self.inner.forget();
201            } else {
202                crate::rt::missing_rt(());
203            }
204        }
205    }
206}