sabi/tokio/data_src/
global_setup.rs1use super::DataSrcError;
6
7use super::super::{
8 AutoShutdown, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
9 StaticDataSrcContainer, StaticDataSrcRegistration,
10};
11use crate::SendSyncNonNull;
12
13use setup_read_cleanup::{PhasedCellAsync, PhasedError, PhasedErrorKind};
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::{any, ptr};
18use tokio::sync::Mutex;
19
20pub(crate) static DS_MANAGER: PhasedCellAsync<DataSrcManager> =
21 PhasedCellAsync::new(DataSrcManager::new(false));
22
23const NOOP: fn(&mut DataSrcManager) -> Result<(), PhasedError> = |_| Ok(());
24
25impl Drop for AutoShutdown {
26 fn drop(&mut self) {
27 let _ = DS_MANAGER.force_to_cleanup(|ds_m| {
28 ds_m.close();
29 Ok::<(), PhasedError>(())
30 });
31 }
32}
33
34pub async fn uses_async<S, C>(name: impl Into<Arc<str>>, ds: S) -> errs::Result<()>
55where
56 S: DataSrc<C> + 'static,
57 C: DataConn + 'static,
58{
59 match DS_MANAGER.lock_async().await {
60 Ok(mut dsm) => {
61 dsm.add(name, ds);
62 Ok(())
63 }
64 Err(e) => Err(errs::Err::with_source(
65 DataSrcError::FailToRegisterGlobalDataSrc { name: name.into() },
66 e,
67 )),
68 }
69}
70
71pub fn uses<S, C>(name: impl Into<Arc<str>>, ds: S) -> errs::Result<()>
93where
94 S: DataSrc<C> + 'static,
95 C: DataConn + 'static,
96{
97 match DS_MANAGER.try_lock() {
98 Ok(mut dsm) => {
99 dsm.add(name, ds);
100 Ok(())
101 }
102 Err(e) => Err(errs::Err::with_source(
103 DataSrcError::FailToRegisterGlobalDataSrc { name: name.into() },
104 e,
105 )),
106 }
107}
108
109fn collect_static_data_src_containers(dsm: &mut DataSrcManager) {
110 let regs: Vec<_> = inventory::iter::<StaticDataSrcRegistration>
111 .into_iter()
112 .collect();
113
114 let mut static_vec: Vec<SendSyncNonNull<DataSrcContainer>> = Vec::with_capacity(regs.len());
115 for reg in regs {
116 let any_container = (reg.factory)();
117 static_vec.push(any_container.ssnnptr);
118 }
119
120 dsm.prepend(static_vec);
121}
122
123pub async fn setup_async() -> errs::Result<AutoShutdown> {
136 let errors = Arc::new(Mutex::new(Vec::new()));
137 let errors_for_closure = Arc::clone(&errors);
138
139 if let Err(e) = DS_MANAGER
140 .transition_to_read_async(move |ds_m| {
141 collect_static_data_src_containers(ds_m);
142 let errors_for_future = Arc::clone(&errors_for_closure);
143 Box::pin(async move {
144 let mut lock = errors_for_future.lock().await;
145 ds_m.setup_async(&mut lock).await;
146 Ok::<(), PhasedError>(())
147 })
148 })
149 .await
150 {
151 if e.kind() == PhasedErrorKind::DuringTransitionToRead {
152 return Err(errs::Err::new(DataSrcError::DuringSetupGlobalDataSrcs));
153 } else {
154 return Err(errs::Err::new(DataSrcError::AlreadySetupGlobalDataSrcs));
155 }
156 }
157
158 let errors = Arc::try_unwrap(errors).unwrap().into_inner();
159 if errors.is_empty() {
160 Ok(AutoShutdown {})
161 } else {
162 Err(errs::Err::new(DataSrcError::FailToSetupGlobalDataSrcs {
163 errors,
164 }))
165 }
166}
167
168pub async fn setup_with_order_async(names: &'static [&str]) -> errs::Result<AutoShutdown> {
186 let errors = Arc::new(Mutex::new(Vec::new()));
187 let errors_for_closure = Arc::clone(&errors);
188
189 if let Err(e) = DS_MANAGER
190 .transition_to_read_async(move |ds_m| {
191 collect_static_data_src_containers(ds_m);
192 let errors_for_future = Arc::clone(&errors_for_closure);
193 Box::pin(async move {
194 let mut lock = errors_for_future.lock().await;
195 ds_m.setup_with_order_async(names, &mut lock).await;
196 Ok::<(), PhasedError>(())
197 })
198 })
199 .await
200 {
201 if e.kind() == PhasedErrorKind::DuringTransitionToRead {
202 return Err(errs::Err::new(DataSrcError::DuringSetupGlobalDataSrcs));
203 } else {
204 return Err(errs::Err::new(DataSrcError::AlreadySetupGlobalDataSrcs));
205 }
206 }
207
208 let errors = Arc::try_unwrap(errors).unwrap().into_inner();
209 if errors.is_empty() {
210 Ok(AutoShutdown {})
211 } else {
212 Err(errs::Err::new(DataSrcError::FailToSetupGlobalDataSrcs {
213 errors,
214 }))
215 }
216}
217
218pub(crate) fn copy_global_data_srcs_to_map(index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
219 if let Ok(ds_m) = DS_MANAGER.read_relaxed() {
220 ds_m.copy_ds_ready_to_map(index_map);
221 } else if (match DS_MANAGER.force_to_read(NOOP) {
222 Ok(_) => Ok(()),
223 Err(e) => match e.kind() {
224 PhasedErrorKind::PhaseIsAlreadyCleanup => Ok(()),
225 PhasedErrorKind::DuringTransitionToRead => Ok(()),
226 _ => Err(()),
227 },
228 })
229 .is_ok()
230 {
231 if let Ok(ds_m) = DS_MANAGER.read_relaxed() {
232 ds_m.copy_ds_ready_to_map(index_map);
233 }
234 }
235}
236
237#[doc(hidden)]
238pub fn create_static_data_src_container<S, C>(
239 name: &'static str,
240 data_src: S,
241) -> StaticDataSrcContainer
242where
243 S: DataSrc<C> + 'static,
244 C: DataConn + 'static,
245{
246 let boxed = Box::new(DataSrcContainer::<S, C>::new(name, data_src, false));
247 let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
248 StaticDataSrcContainer {
249 ssnnptr: SendSyncNonNull::new(ptr),
250 }
251}
252
253impl StaticDataSrcRegistration {
254 pub const fn new(factory: fn() -> StaticDataSrcContainer) -> Self {
255 Self { factory }
256 }
257}
258inventory::collect!(StaticDataSrcRegistration);
259
260#[macro_export]
261#[doc(hidden)]
262macro_rules! _uses_for_async {
263 ($name:tt, $data_src:expr) => {
264 const _: () = {
265 inventory::submit! {
266 $crate::tokio::StaticDataSrcRegistration::new(|| {
267 $crate::tokio::create_static_data_src_container($name, $data_src)
268 })
269 }
270 };
271 };
272}
273
274pub(crate) async fn create_data_conn_from_global_data_src_async<C>(
275 index: usize,
276 name: impl AsRef<str>,
277) -> errs::Result<Box<DataConnContainer>>
278where
279 C: DataConn + 'static,
280{
281 match DS_MANAGER.read_relaxed() {
282 Ok(ds_manager) => ds_manager.create_data_conn_async::<C>(index, name).await,
283 Err(e) => Err(errs::Err::with_source(
284 DataSrcError::FailToCreateDataConn {
285 name: name.as_ref().into(),
286 data_conn_type: any::type_name::<C>(),
287 },
288 e,
289 )),
290 }
291}