Skip to main content

sabi/tokio/
mod.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5//! This module provides Tokio-specific implementations for asynchronous data access,
6//! including `AsyncGroup` for concurrent task management, `DataConn` for
7//! transactional data connections, `DataSrc` for data source management,
8//! and `DataHub` as a central orchestrator.
9//!
10//! It leverages Rust's asynchronous capabilities with the Tokio runtime
11//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::collections::HashMap;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::{any, ptr};
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30    create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
31    DataSrcError,
32};
33
34/// A convenience macro to easily convert an asynchronous function into a `Pin<Box<dyn Future>>`
35/// closure suitable for `DataHub`'s `run_async` or `txn_async` methods.
36///
37/// This macro simplifies passing async functions by handling the boxing and pinning.
38///
39/// # Example
40///
41/// ```ignore
42/// async fn my_logic(data_hub: &mut DataHub) -> errs::Result<()> {
43///     // ... some logic using data_hub
44///     Ok(())
45/// }
46///
47/// #[tokio::main]
48/// async fn main() {
49///     let mut hub = DataHub::new();
50///     hub.txn_async(logic!(my_logic)).await.unwrap();
51/// }
52/// ```
53#[doc(inline)]
54pub use crate::_logic as logic;
55
56/// Macro for registering a global data source at the top-level.
57///
58/// # Arguments
59///
60/// * `$name` - The name of the data source (must be a string literal).
61/// * `$data_src` - The data source instance.
62///
63/// # Examples
64///
65/// ```ignore
66/// uses!("my_global_source", MyDataSource::new());
67/// ```
68#[doc(inline)]
69pub use crate::_uses_for_async as uses;
70
71/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
72/// and their results (or errors) collected.
73#[allow(clippy::type_complexity)]
74pub struct AsyncGroup {
75    names: Vec<Arc<str>>,
76    tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
77    pub(crate) _name: Arc<str>,
78}
79
80/// A trait for data connection implementations, providing methods for transaction management.
81///
82/// Implementors of this trait represent a connection to a data source and define
83/// how to commit, rollback, and handle the lifecycle of transactions.
84#[allow(async_fn_in_trait)]
85#[allow(unused_variables)] // rustdoc
86pub trait DataConn {
87    /// Attempts to commit the changes made within this data connection.
88    ///
89    /// This is typically the main commit process, executed after `pre_commit_async`.
90    ///
91    /// # Arguments
92    ///
93    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
94    ///
95    /// # Returns
96    ///
97    /// A `Result` indicating success or failure of the commit operation.
98    async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
99
100    /// Performs preparatory actions before the main commit process.
101    ///
102    /// This method is called before `commit_async` and can be used for tasks like
103    /// validation or preparing data.
104    ///
105    /// # Arguments
106    ///
107    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
108    ///
109    /// # Returns
110    ///
111    /// A `Result` indicating success or failure of the pre-commit operation.
112    async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
113        Ok(())
114    }
115
116    /// Performs actions after the main commit process, only if it succeeds.
117    ///
118    /// This can be used for cleanup or post-transaction logging. Errors returned from
119    /// tasks added to `ag` in this method are ignored.
120    ///
121    /// # Arguments
122    ///
123    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
124    async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {}
125
126    /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
127    ///
128    /// This is typically `true` if `commit_async` has already succeeded for this connection,
129    /// implying that changes need to be undone rather than simply discarded.
130    ///
131    /// # Returns
132    ///
133    /// `true` if `force_back_async` should be called, `false` otherwise.
134    fn should_force_back(&self) -> bool {
135        false
136    }
137
138    /// Rolls back any changes made within this data connection.
139    ///
140    /// This method is called if a transaction fails before `commit_async` completes,
141    /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
142    /// in this method are ignored.
143    ///
144    /// # Arguments
145    ///
146    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
147    async fn rollback_async(&mut self, ag: &mut AsyncGroup);
148
149    /// Forces the data connection to revert changes that have already been committed.
150    ///
151    /// This method is called if a transaction fails after `commit_async` has completed
152    /// for this connection, and `should_force_back` returns `true`. Errors returned from
153    /// tasks added to `ag` in this method are ignored.
154    ///
155    /// # Arguments
156    ///
157    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
158    async fn force_back_async(&mut self, ag: &mut AsyncGroup) {}
159
160    /// Closes the data connection, releasing any associated resources.
161    ///
162    /// This method is always called at the end of a transaction, regardless of its outcome.
163    fn close(&mut self);
164}
165
166pub(crate) struct NoopDataConn {}
167
168impl DataConn for NoopDataConn {
169    async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
170        Ok(())
171    }
172    async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
173    fn close(&mut self) {}
174}
175
176#[allow(clippy::type_complexity)]
177#[repr(C)]
178pub(crate) struct DataConnContainer<C = NoopDataConn>
179where
180    C: DataConn + 'static,
181{
182    drop_fn: fn(*const DataConnContainer),
183    is_fn: fn(any::TypeId) -> bool,
184
185    commit_fn: for<'ag> fn(
186        *const DataConnContainer,
187        &'ag mut AsyncGroup,
188    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
189
190    pre_commit_fn: for<'ag> fn(
191        *const DataConnContainer,
192        &'ag mut AsyncGroup,
193    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
194
195    post_commit_fn: for<'ag> fn(
196        *const DataConnContainer,
197        &'ag mut AsyncGroup,
198    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
199
200    should_force_back_fn: fn(*const DataConnContainer) -> bool,
201
202    rollback_fn: for<'ag> fn(
203        *const DataConnContainer,
204        &'ag mut AsyncGroup,
205    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
206
207    force_back_fn: for<'ag> fn(
208        *const DataConnContainer,
209        &'ag mut AsyncGroup,
210    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
211
212    close_fn: fn(*const DataConnContainer),
213
214    name: Arc<str>,
215    data_conn: Box<C>,
216}
217
218pub(crate) struct DataConnManager {
219    vec: Vec<Option<ptr::NonNull<DataConnContainer>>>,
220    index_map: HashMap<Arc<str>, usize>,
221}
222
223/// A trait for data source implementations, responsible for setting up and creating data connections.
224///
225/// Implementors of this trait define how to prepare a data source and how to instantiate
226/// data connections of a specific type `C`.
227#[trait_variant::make(Send)]
228#[allow(async_fn_in_trait)]
229#[allow(unused_variables)] // for rustdoc
230pub trait DataSrc<C>
231where
232    C: DataConn + 'static,
233{
234    /// Performs asynchronous setup operations for the data source.
235    ///
236    /// This method is called to initialize the data source before any data connections
237    /// are created from it.
238    ///
239    /// # Arguments
240    ///
241    /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
242    ///
243    /// # Returns
244    ///
245    /// A `Result` indicating success or failure of the setup operation.
246    async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
247
248    /// Closes the data source, releasing any associated resources.
249    ///
250    /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
251    fn close(&mut self);
252
253    /// Asynchronously creates a new data connection from this data source.
254    ///
255    /// # Returns
256    ///
257    /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
258    /// or an `Err` if the connection creation fails.
259    async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
260}
261
262pub(crate) struct NoopDataSrc {}
263
264impl DataSrc<NoopDataConn> for NoopDataSrc {
265    async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
266        Ok(())
267    }
268    fn close(&mut self) {}
269    async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
270        Ok(Box::new(NoopDataConn {}))
271    }
272}
273
274#[allow(clippy::type_complexity)]
275#[repr(C)]
276pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
277where
278    S: DataSrc<C>,
279    C: DataConn + 'static,
280{
281    drop_fn: fn(*const DataSrcContainer),
282    close_fn: fn(*const DataSrcContainer),
283    is_data_conn_fn: fn(any::TypeId) -> bool,
284
285    setup_fn: for<'ag> fn(
286        *const DataSrcContainer,
287        &'ag mut AsyncGroup,
288    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
289
290    create_data_conn_fn: fn(
291        *const DataSrcContainer,
292    ) -> Pin<
293        Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'static>,
294    >,
295
296    local: bool,
297    name: Arc<str>,
298    data_src: S,
299}
300
301pub(crate) struct DataSrcManager {
302    vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
303    vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
304    local: bool,
305}
306
307/// A marker struct that can be used to trigger automatic shutdown behavior
308/// for resources managed by the `DataHub` when it goes out of scope.
309pub struct AutoShutdown {}
310
311/// The central hub for managing data sources and their connections within an application.
312///
313/// `DataHub` provides mechanisms to register data sources, acquire data connections,
314/// and execute transactional or non-transactional asynchronous logic.
315pub struct DataHub {
316    local_data_src_manager: DataSrcManager,
317    data_src_map: HashMap<Arc<str>, (bool, usize)>,
318    data_conn_manager: DataConnManager,
319    fixed: bool,
320}
321
322/// A trait defining the ability to access data connections.
323///
324/// This trait abstracts the mechanism for retrieving data connections, allowing
325/// different implementations (e.g., `DataHub`) to provide connections.
326#[allow(async_fn_in_trait)]
327pub trait DataAcc {
328    /// Asynchronously retrieves a data connection of a specific type.
329    ///
330    /// # Type Parameters
331    ///
332    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
333    ///
334    /// # Arguments
335    ///
336    /// * `name` - The name of the data connection to retrieve.
337    ///
338    /// # Returns
339    ///
340    /// A `Result` which is `Ok` containing a mutable reference to the data connection
341    /// if found, or an `Err` if the connection cannot be retrieved or cast.
342    async fn get_data_conn_async<C: DataConn + 'static>(
343        &mut self,
344        name: impl AsRef<str>,
345    ) -> errs::Result<&mut C>;
346}
347
348#[doc(hidden)]
349pub struct StaticDataSrcContainer {
350    pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
351}
352
353#[doc(hidden)]
354pub struct StaticDataSrcRegistration {
355    pub(crate) factory: fn() -> StaticDataSrcContainer,
356}