Skip to main content

sabi/tokio/data_src/
global_setup.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::DataSrcError;
6
7use super::super::{
8    AutoShutdown, DataConn, DataConnContainer, DataSrc, DataSrcContainer, DataSrcManager,
9    StaticDataSrcContainer, StaticDataSrcRegistration,
10};
11use crate::SendSyncNonNull;
12
13use setup_read_cleanup::{PhasedCellAsync, PhasedError, PhasedErrorKind};
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::{any, ptr};
18use tokio::sync::Mutex;
19
20pub(crate) static DS_MANAGER: PhasedCellAsync<DataSrcManager> =
21    PhasedCellAsync::new(DataSrcManager::new(false));
22
23const NOOP: fn(&mut DataSrcManager) -> Result<(), PhasedError> = |_| Ok(());
24
25impl Drop for AutoShutdown {
26    fn drop(&mut self) {
27        let _ = DS_MANAGER.force_to_cleanup(|ds_m| {
28            ds_m.close();
29            Ok::<(), PhasedError>(())
30        });
31    }
32}
33
34/// Registers a global data source, making it available throughout the application.
35///
36/// Global data sources are managed by a singleton and can be set up once for the application's lifetime.
37/// If `setup_async` or `setup_with_order_async` has already been called, this function will return an `errs::Err`.
38/// If another Tokio task holds the lock of the global data source manager, this function will wait until the lock is released.
39///
40/// # Arguments
41///
42/// * `name` - The name to associate with this data source.
43/// * `ds` - The data source instance, which must implement `DataSrc` and have a `'static` lifetime.
44///
45/// # Type Parameters
46///
47/// * `S` - The type of the data source.
48/// * `C` - The type of the data connection provided by the data source.
49///
50/// # Returns
51///
52/// * `errs::Result<()>`: [`Ok`] if the data source is successfully registered, or an [`errs::Err`] if
53///   the global data source manager is in an invalid state or setup has already occurred.
54pub async fn uses_async<S, C>(name: impl Into<Arc<str>>, ds: S) -> errs::Result<()>
55where
56    S: DataSrc<C> + 'static,
57    C: DataConn + 'static,
58{
59    match DS_MANAGER.lock_async().await {
60        Ok(mut dsm) => {
61            dsm.add(name, ds);
62            Ok(())
63        }
64        Err(e) => Err(errs::Err::with_source(
65            DataSrcError::FailToRegisterGlobalDataSrc { name: name.into() },
66            e,
67        )),
68    }
69}
70
71/// Registers a global data source, making it available throughout the application.
72///
73/// This is the synchronous version of `uses_async`.
74/// Global data sources are managed by a singleton and can be set up once for the application's lifetime.
75/// If `setup_async` or `setup_with_order_async` has already been called, this function will return an `errs::Err`.
76/// If another Tokio task holds the lock of the global data source manager, this function will return an error immediately without waiting.
77///
78/// # Arguments
79///
80/// * `name` - The name to associate with this data source.
81/// * `ds` - The data source instance, which must implement `DataSrc` and have a `'static` lifetime.
82///
83/// # Type Parameters
84///
85/// * `S` - The type of the data source.
86/// * `C` - The type of the data connection provided by the data source.
87///
88/// # Returns
89///
90/// * `errs::Result<()>`: [`Ok`] if the data source is successfully registered, or an [`errs::Err`] if
91///   the global data source manager is in an invalid state or setup has already occurred.
92pub fn uses<S, C>(name: impl Into<Arc<str>>, ds: S) -> errs::Result<()>
93where
94    S: DataSrc<C> + 'static,
95    C: DataConn + 'static,
96{
97    match DS_MANAGER.try_lock() {
98        Ok(mut dsm) => {
99            dsm.add(name, ds);
100            Ok(())
101        }
102        Err(e) => Err(errs::Err::with_source(
103            DataSrcError::FailToRegisterGlobalDataSrc { name: name.into() },
104            e,
105        )),
106    }
107}
108
109fn collect_static_data_src_containers(dsm: &mut DataSrcManager) {
110    let regs: Vec<_> = inventory::iter::<StaticDataSrcRegistration>
111        .into_iter()
112        .collect();
113
114    let mut static_vec: Vec<SendSyncNonNull<DataSrcContainer>> = Vec::with_capacity(regs.len());
115    for reg in regs {
116        let any_container = (reg.factory)();
117        static_vec.push(any_container.ssnnptr);
118    }
119
120    dsm.prepend(static_vec);
121}
122
123/// Asynchronously sets up all globally registered data sources.
124///
125/// This function sets up all data sources that have been registered via `uses_async` function or
126/// `uses!` macro. It collects any setup errors.
127///
128/// # Returns
129///
130/// A `Result` which is `Ok` containing an `AutoShutdown` guard if all data sources
131/// are set up successfully. If setup fails for any data source, it returns an `Err`
132/// with `DataSrcError::FailToSetupGlobalDataSrcs`. If called when data sources are
133/// already set up or in transition, it returns `DataSrcError::AlreadySetupGlobalDataSrcs`
134/// or `DataSrcError::DuringSetupGlobalDataSrcs` respectively.
135pub async fn setup_async() -> errs::Result<AutoShutdown> {
136    let errors = Arc::new(Mutex::new(Vec::new()));
137    let errors_for_closure = Arc::clone(&errors);
138
139    if let Err(e) = DS_MANAGER
140        .transition_to_read_async(move |ds_m| {
141            collect_static_data_src_containers(ds_m);
142            let errors_for_future = Arc::clone(&errors_for_closure);
143            Box::pin(async move {
144                let mut lock = errors_for_future.lock().await;
145                ds_m.setup_async(&mut lock).await;
146                Ok::<(), PhasedError>(())
147            })
148        })
149        .await
150    {
151        if e.kind() == PhasedErrorKind::DuringTransitionToRead {
152            return Err(errs::Err::new(DataSrcError::DuringSetupGlobalDataSrcs));
153        } else {
154            return Err(errs::Err::new(DataSrcError::AlreadySetupGlobalDataSrcs));
155        }
156    }
157
158    let errors = Arc::try_unwrap(errors).unwrap().into_inner();
159    if errors.is_empty() {
160        Ok(AutoShutdown {})
161    } else {
162        Err(errs::Err::new(DataSrcError::FailToSetupGlobalDataSrcs {
163            errors,
164        }))
165    }
166}
167
168/// Asynchronously sets up all globally registered data sources with a specified order.
169///
170/// Similar to `setup_async`, but allows defining the order in which data sources
171/// are set up. Data sources not specified in `names` will be set up after the
172/// specified ones, in an undefined order.
173///
174/// # Arguments
175///
176/// * `names` - An array of string slices specifying the desired setup order by data source name.
177///
178/// # Returns
179///
180/// A `Result` which is `Ok` containing an `AutoShutdown` guard if all data sources
181/// are set up successfully. If setup fails for any data source, it returns an `Err`
182/// with `DataSrcError::FailToSetupGlobalDataSrcs`. If called when data sources are
183/// already set up or in transition, it returns `DataSrcError::AlreadySetupGlobalDataSrcs`
184/// or `DataSrcError::DuringSetupGlobalDataSrcs` respectively.
185pub async fn setup_with_order_async(names: &'static [&str]) -> errs::Result<AutoShutdown> {
186    let errors = Arc::new(Mutex::new(Vec::new()));
187    let errors_for_closure = Arc::clone(&errors);
188
189    if let Err(e) = DS_MANAGER
190        .transition_to_read_async(move |ds_m| {
191            collect_static_data_src_containers(ds_m);
192            let errors_for_future = Arc::clone(&errors_for_closure);
193            Box::pin(async move {
194                let mut lock = errors_for_future.lock().await;
195                ds_m.setup_with_order_async(names, &mut lock).await;
196                Ok::<(), PhasedError>(())
197            })
198        })
199        .await
200    {
201        if e.kind() == PhasedErrorKind::DuringTransitionToRead {
202            return Err(errs::Err::new(DataSrcError::DuringSetupGlobalDataSrcs));
203        } else {
204            return Err(errs::Err::new(DataSrcError::AlreadySetupGlobalDataSrcs));
205        }
206    }
207
208    let errors = Arc::try_unwrap(errors).unwrap().into_inner();
209    if errors.is_empty() {
210        Ok(AutoShutdown {})
211    } else {
212        Err(errs::Err::new(DataSrcError::FailToSetupGlobalDataSrcs {
213            errors,
214        }))
215    }
216}
217
218pub(crate) fn copy_global_data_srcs_to_map(index_map: &mut HashMap<Arc<str>, (bool, usize)>) {
219    if let Ok(ds_m) = DS_MANAGER.read_relaxed() {
220        ds_m.copy_ds_ready_to_map(index_map);
221    } else if (match DS_MANAGER.force_to_read(NOOP) {
222        Ok(_) => Ok(()),
223        Err(e) => match e.kind() {
224            PhasedErrorKind::PhaseIsAlreadyCleanup => Ok(()),
225            PhasedErrorKind::DuringTransitionToRead => Ok(()),
226            _ => Err(()),
227        },
228    })
229    .is_ok()
230    {
231        if let Ok(ds_m) = DS_MANAGER.read_relaxed() {
232            ds_m.copy_ds_ready_to_map(index_map);
233        }
234    }
235}
236
237#[doc(hidden)]
238pub fn create_static_data_src_container<S, C>(
239    name: &'static str,
240    data_src: S,
241) -> StaticDataSrcContainer
242where
243    S: DataSrc<C> + 'static,
244    C: DataConn + 'static,
245{
246    let boxed = Box::new(DataSrcContainer::<S, C>::new(name, data_src, false));
247    let ptr = ptr::NonNull::from(Box::leak(boxed)).cast::<DataSrcContainer>();
248    StaticDataSrcContainer {
249        ssnnptr: SendSyncNonNull::new(ptr),
250    }
251}
252
253impl StaticDataSrcRegistration {
254    pub const fn new(factory: fn() -> StaticDataSrcContainer) -> Self {
255        Self { factory }
256    }
257}
258inventory::collect!(StaticDataSrcRegistration);
259
260#[macro_export]
261#[doc(hidden)]
262macro_rules! _uses_for_async {
263    ($name:tt, $data_src:expr) => {
264        const _: () = {
265            inventory::submit! {
266                $crate::tokio::StaticDataSrcRegistration::new(|| {
267                    $crate::tokio::create_static_data_src_container($name, $data_src)
268                })
269            }
270        };
271    };
272}
273
274pub(crate) async fn create_data_conn_from_global_data_src_async<C>(
275    index: usize,
276    name: impl AsRef<str>,
277) -> errs::Result<Box<DataConnContainer>>
278where
279    C: DataConn + 'static,
280{
281    match DS_MANAGER.read_relaxed() {
282        Ok(ds_manager) => ds_manager.create_data_conn_async::<C>(index, name).await,
283        Err(e) => Err(errs::Err::with_source(
284            DataSrcError::FailToCreateDataConn {
285                name: name.as_ref().into(),
286                data_conn_type: any::type_name::<C>(),
287            },
288            e,
289        )),
290    }
291}