// sabi/tokio/mod.rs

// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
// This program is free software under MIT License.
// See the file LICENSE in this distribution for more details.
4
//! This module provides Tokio-specific implementations for asynchronous data access,
//! including `AsyncGroup` for concurrent task management, `DataConn` for
//! transactional data connections, `DataSrc` for data source management,
//! and `DataHub` as a central orchestrator.
//!
//! It leverages Rust's asynchronous capabilities with the Tokio runtime
//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::any;
22use std::collections::HashMap;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30    create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
31    DataSrcError,
32};
33
34/// A convenience macro to easily convert an asynchronous function into a `Pin<Box<dyn Future>>`
35/// closure suitable for `DataHub`'s `run_async` or `txn_async` methods.
36///
37/// This macro simplifies passing async functions by handling the boxing and pinning.
38/// The resulting `Future` implements `Send`.
39///
40/// # Example
41///
42/// ```ignore
43/// async fn my_logic(data: &mut (impl MyData + Send)) -> errs::Result<()> {
44///     // ... some logic using data
45///     Ok(())
46/// }
47///
48/// #[tokio::main]
49/// async fn main() {
50///     let mut hub = DataHub::new();
51///     hub.txn_async(logic!(my_logic)).await.unwrap();
52/// }
53/// ```
54#[doc(inline)]
55pub use crate::_logic as logic;
56
57/// Macro for registering a global data source at the top-level.
58///
59/// # Arguments
60///
61/// * `$name` - The name of the data source (must be a string literal).
62/// * `$data_src` - The data source instance.
63///
64/// # Examples
65///
66/// ```ignore
67/// uses!("my_global_source", MyDataSource::new());
68/// ```
69#[doc(inline)]
70pub use crate::_uses_for_async as uses;
71
72/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
73/// and their results (or errors) collected.
74#[allow(clippy::type_complexity)]
75pub struct AsyncGroup {
76    names: Vec<Arc<str>>,
77    tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
78    pub(crate) _name: Arc<str>,
79}
80
81/// A trait for data connection implementations, providing methods for transaction management.
82///
83/// Implementors of this trait represent a connection to a data source and define
84/// how to commit, rollback, and handle the lifecycle of transactions.
85#[allow(async_fn_in_trait)]
86#[allow(unused_variables)] // rustdoc
87pub trait DataConn {
88    /// Attempts to commit the changes made within this data connection.
89    ///
90    /// This is typically the main commit process, executed after `pre_commit_async`.
91    ///
92    /// # Arguments
93    ///
94    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
95    ///
96    /// # Returns
97    ///
98    /// A `Result` indicating success or failure of the commit operation.
99    async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
100
101    /// Performs preparatory actions before the main commit process.
102    ///
103    /// This method is called before `commit_async` and can be used for tasks like
104    /// validation or preparing data.
105    ///
106    /// # Arguments
107    ///
108    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
109    ///
110    /// # Returns
111    ///
112    /// A `Result` indicating success or failure of the pre-commit operation.
113    async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
114        Ok(())
115    }
116
117    /// Performs actions after the main commit process, only if it succeeds.
118    ///
119    /// This can be used for cleanup or post-transaction logging. Errors returned from
120    /// tasks added to `ag` in this method are ignored.
121    ///
122    /// # Arguments
123    ///
124    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
125    async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {}
126
127    /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
128    ///
129    /// This is typically `true` if `commit_async` has already succeeded for this connection,
130    /// implying that changes need to be undone rather than simply discarded.
131    ///
132    /// # Returns
133    ///
134    /// `true` if `force_back_async` should be called, `false` otherwise.
135    fn should_force_back(&self) -> bool {
136        false
137    }
138
139    /// Rolls back any changes made within this data connection.
140    ///
141    /// This method is called if a transaction fails before `commit_async` completes,
142    /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
143    /// in this method are ignored.
144    ///
145    /// # Arguments
146    ///
147    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
148    async fn rollback_async(&mut self, ag: &mut AsyncGroup);
149
150    /// Forces the data connection to revert changes that have already been committed.
151    ///
152    /// This method is called if a transaction fails after `commit_async` has completed
153    /// for this connection, and `should_force_back` returns `true`. Errors returned from
154    /// tasks added to `ag` in this method are ignored.
155    ///
156    /// # Arguments
157    ///
158    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
159    async fn force_back_async(&mut self, ag: &mut AsyncGroup) {}
160
161    /// Closes the data connection, releasing any associated resources.
162    ///
163    /// This method is always called at the end of a transaction, regardless of its outcome.
164    fn close(&mut self);
165}
166
/// A field-less placeholder data connection.
///
/// It is the default type parameter of `DataConnContainer`, and its `DataConn`
/// implementation performs no work.
pub(crate) struct NoopDataConn {}
168
169impl DataConn for NoopDataConn {
170    async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
171        Ok(())
172    }
173    async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
174    fn close(&mut self) {}
175}
176
177#[allow(clippy::type_complexity)]
178#[repr(C)]
179pub(crate) struct DataConnContainer<C = NoopDataConn>
180where
181    C: DataConn + 'static,
182{
183    drop_fn: fn(*const DataConnContainer),
184    is_fn: fn(any::TypeId) -> bool,
185
186    commit_fn: for<'ag> fn(
187        *const DataConnContainer,
188        &'ag mut AsyncGroup,
189    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
190
191    pre_commit_fn: for<'ag> fn(
192        *const DataConnContainer,
193        &'ag mut AsyncGroup,
194    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
195
196    post_commit_fn: for<'ag> fn(
197        *const DataConnContainer,
198        &'ag mut AsyncGroup,
199    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
200
201    should_force_back_fn: fn(*const DataConnContainer) -> bool,
202
203    rollback_fn: for<'ag> fn(
204        *const DataConnContainer,
205        &'ag mut AsyncGroup,
206    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
207
208    force_back_fn: for<'ag> fn(
209        *const DataConnContainer,
210        &'ag mut AsyncGroup,
211    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
212
213    close_fn: fn(*const DataConnContainer),
214
215    name: Arc<str>,
216    data_conn: Box<C>,
217}
218
219pub(crate) struct DataConnManager {
220    vec: Vec<Option<SendSyncNonNull<DataConnContainer>>>,
221    index_map: HashMap<Arc<str>, usize>,
222}
223
224/// A trait for data source implementations, responsible for setting up and creating data connections.
225///
226/// Implementors of this trait define how to prepare a data source and how to instantiate
227/// data connections of a specific type `C`.
228#[trait_variant::make(Send)]
229#[allow(async_fn_in_trait)]
230#[allow(unused_variables)] // for rustdoc
231pub trait DataSrc<C>
232where
233    C: DataConn + 'static,
234{
235    /// Performs asynchronous setup operations for the data source.
236    ///
237    /// This method is called to initialize the data source before any data connections
238    /// are created from it.
239    ///
240    /// # Arguments
241    ///
242    /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
243    ///
244    /// # Returns
245    ///
246    /// A `Result` indicating success or failure of the setup operation.
247    async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
248
249    /// Closes the data source, releasing any associated resources.
250    ///
251    /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
252    fn close(&mut self);
253
254    /// Asynchronously creates a new data connection from this data source.
255    ///
256    /// # Returns
257    ///
258    /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
259    /// or an `Err` if the connection creation fails.
260    async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
261}
262
/// A field-less placeholder data source.
///
/// It is the default `S` type parameter of `DataSrcContainer`, and its `DataSrc`
/// implementation only ever produces `NoopDataConn` values.
pub(crate) struct NoopDataSrc {}
264
265impl DataSrc<NoopDataConn> for NoopDataSrc {
266    async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
267        Ok(())
268    }
269    fn close(&mut self) {}
270    async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
271        Ok(Box::new(NoopDataConn {}))
272    }
273}
274
275#[allow(clippy::type_complexity)]
276#[repr(C)]
277pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
278where
279    S: DataSrc<C>,
280    C: DataConn + 'static,
281{
282    drop_fn: fn(*const DataSrcContainer),
283    close_fn: fn(*const DataSrcContainer),
284    is_data_conn_fn: fn(any::TypeId) -> bool,
285
286    setup_fn: for<'ag> fn(
287        *const DataSrcContainer,
288        &'ag mut AsyncGroup,
289    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
290
291    create_data_conn_fn: fn(
292        *const DataSrcContainer,
293    ) -> Pin<
294        Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + Send + 'static>,
295    >,
296
297    local: bool,
298    name: Arc<str>,
299    data_src: S,
300}
301
302pub(crate) struct DataSrcManager {
303    vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
304    vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
305    local: bool,
306}
307
/// A marker struct that can be used to trigger automatic shutdown behavior
/// for resources managed by the `DataHub` when it goes out of scope.
///
/// The struct itself has no fields.
pub struct AutoShutdown {}
311
312/// The central hub for managing data sources and their connections within an application.
313///
314/// `DataHub` provides mechanisms to register data sources, acquire data connections,
315/// and execute transactional or non-transactional asynchronous logic.
316///
317/// This structure implements `Send`.
318pub struct DataHub {
319    local_data_src_manager: DataSrcManager,
320    data_src_map: HashMap<Arc<str>, (bool, usize)>,
321    data_conn_manager: DataConnManager,
322    fixed: bool,
323}
324
325/// A trait defining the ability to access data connections.
326///
327/// This trait abstracts the mechanism for retrieving data connections, allowing
328/// different implementations (e.g., `DataHub`) to provide connections.
329#[allow(async_fn_in_trait)]
330pub trait DataAcc {
331    /// Asynchronously retrieves a data connection of a specific type.
332    ///
333    /// # Type Parameters
334    ///
335    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
336    ///
337    /// # Arguments
338    ///
339    /// * `name` - The name of the data connection to retrieve.
340    ///
341    /// # Returns
342    ///
343    /// A `Result` which is `Ok` containing a mutable reference to the data connection
344    /// if found, or an `Err` if the connection cannot be retrieved or cast.
345    async fn get_data_conn_async<C: DataConn + 'static>(
346        &mut self,
347        name: impl AsRef<str>,
348    ) -> errs::Result<&mut C>;
349}
350
351#[doc(hidden)]
352pub struct StaticDataSrcContainer {
353    pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
354}
355
356#[doc(hidden)]
357pub struct StaticDataSrcRegistration {
358    pub(crate) factory: fn() -> StaticDataSrcContainer,
359}