// sabi/tokio/mod.rs

// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
// This program is free software under MIT License.
// See the file LICENSE in this distribution for more details.
4
//! This module provides Tokio-specific implementations for asynchronous data access,
//! including `AsyncGroup` for concurrent task management, `DataConn` for
//! transactional data connections, `DataSrc` for data source management,
//! and `DataHub` as a central orchestrator.
//!
//! It leverages Rust's asynchronous capabilities with the Tokio runtime
//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::any;
22use std::collections::HashMap;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30    create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
31    DataSrcError,
32};
33
34/// A convenience macro to easily convert an asynchronous function into a `Pin<Box<dyn Future>>`
35/// closure suitable for `DataHub`'s `run_async` or `txn_async` methods.
36///
37/// This macro simplifies passing async functions by handling the boxing and pinning.
38/// The resulting `Future` implements `Send`.
39///
40/// # Example
41///
42/// ```ignore
43/// async fn my_logic(data: &mut (impl MyData + Send)) -> errs::Result<()> {
44///     // ... some logic using data
45///     Ok(())
46/// }
47///
48/// #[tokio::main]
49/// async fn main() {
50///     let mut hub = DataHub::new();
51///     hub.txn_async(logic!(my_logic)).await.unwrap();
52/// }
53/// ```
54#[doc(inline)]
55pub use crate::_logic as logic;
56
57/// Macro for registering a global data source at the top-level.
58///
59/// # Arguments
60///
61/// * `$name` - The name of the data source (must be a string literal).
62/// * `$data_src` - The data source instance.
63///
64/// # Examples
65///
66/// ```ignore
67/// uses!("my_global_source", MyDataSource::new());
68/// ```
69#[doc(inline)]
70pub use crate::_uses_for_async as uses;
71
72/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
73/// and their results (or errors) collected.
74#[allow(clippy::type_complexity)]
75pub struct AsyncGroup {
76    names: Vec<Arc<str>>,
77    tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
78    pub(crate) _name: Arc<str>,
79}
80
81/// A trait for data connection implementations, providing methods for transaction management.
82///
83/// Implementors of this trait represent a connection to a data source and define
84/// how to commit, rollback, and handle the lifecycle of transactions.
85#[allow(async_fn_in_trait)]
86#[allow(unused_variables)] // rustdoc
87pub trait DataConn {
88    /// Attempts to commit the changes made within this data connection.
89    ///
90    /// This is typically the main commit process, executed after `pre_commit_async`.
91    ///
92    /// # Arguments
93    ///
94    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
95    ///
96    /// # Returns
97    ///
98    /// A `Result` indicating success or failure of the commit operation.
99    fn commit_async(
100        &mut self,
101        ag: &mut AsyncGroup,
102    ) -> impl Future<Output = errs::Result<()>> + Send;
103
104    /// Performs preparatory actions before the main commit process.
105    ///
106    /// This method is called before `commit_async` and can be used for tasks like
107    /// validation or preparing data.
108    ///
109    /// # Arguments
110    ///
111    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
112    ///
113    /// # Returns
114    ///
115    /// A `Result` indicating success or failure of the pre-commit operation.
116    fn pre_commit_async(
117        &mut self,
118        ag: &mut AsyncGroup,
119    ) -> impl Future<Output = errs::Result<()>> + Send {
120        async { Ok(()) }
121    }
122
123    /// Performs actions after the main commit process, only if it succeeds.
124    ///
125    /// This can be used for cleanup or post-transaction logging. Errors returned from
126    /// tasks added to `ag` in this method are ignored.
127    ///
128    /// # Arguments
129    ///
130    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
131    fn post_commit_async(&mut self, ag: &mut AsyncGroup) -> impl Future<Output = ()> + Send {
132        async {}
133    }
134
135    /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
136    ///
137    /// This is typically `true` if `commit_async` has already succeeded for this connection,
138    /// implying that changes need to be undone rather than simply discarded.
139    ///
140    /// # Returns
141    ///
142    /// `true` if `force_back_async` should be called, `false` otherwise.
143    fn should_force_back(&self) -> bool {
144        false
145    }
146
147    /// Rolls back any changes made within this data connection.
148    ///
149    /// This method is called if a transaction fails before `commit_async` completes,
150    /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
151    /// in this method are ignored.
152    ///
153    /// # Arguments
154    ///
155    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
156    fn rollback_async(&mut self, ag: &mut AsyncGroup) -> impl Future<Output = ()> + Send;
157
158    /// Forces the data connection to revert changes that have already been committed.
159    ///
160    /// This method is called if a transaction fails after `commit_async` has completed
161    /// for this connection, and `should_force_back` returns `true`. Errors returned from
162    /// tasks added to `ag` in this method are ignored.
163    ///
164    /// # Arguments
165    ///
166    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
167    fn force_back_async(&mut self, ag: &mut AsyncGroup) -> impl Future<Output = ()> + Send {
168        async {}
169    }
170
171    /// Closes the data connection, releasing any associated resources.
172    ///
173    /// This method is always called at the end of a transaction, regardless of its outcome.
174    fn close(&mut self);
175}
176
/// A field-less, crate-internal data connection type.
///
/// It is the default type argument of `DataConnContainer` and `DataSrcContainer`.
pub(crate) struct NoopDataConn {}
178
179impl DataConn for NoopDataConn {
180    async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
181        Ok(())
182    }
183    async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
184    fn close(&mut self) {}
185}
186
187#[allow(clippy::type_complexity)]
188#[repr(C)]
189pub(crate) struct DataConnContainer<C = NoopDataConn>
190where
191    C: DataConn + 'static,
192{
193    drop_fn: fn(*const DataConnContainer),
194    is_fn: fn(any::TypeId) -> bool,
195
196    commit_fn: for<'ag> fn(
197        *const DataConnContainer,
198        &'ag mut AsyncGroup,
199    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
200
201    pre_commit_fn: for<'ag> fn(
202        *const DataConnContainer,
203        &'ag mut AsyncGroup,
204    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
205
206    post_commit_fn: for<'ag> fn(
207        *const DataConnContainer,
208        &'ag mut AsyncGroup,
209    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'ag>>,
210
211    should_force_back_fn: fn(*const DataConnContainer) -> bool,
212
213    rollback_fn: for<'ag> fn(
214        *const DataConnContainer,
215        &'ag mut AsyncGroup,
216    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'ag>>,
217
218    force_back_fn: for<'ag> fn(
219        *const DataConnContainer,
220        &'ag mut AsyncGroup,
221    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'ag>>,
222
223    close_fn: fn(*const DataConnContainer),
224
225    name: Arc<str>,
226    data_conn: Box<C>,
227}
228
229pub(crate) struct DataConnManager {
230    vec: Vec<Option<SendSyncNonNull<DataConnContainer>>>,
231    index_map: HashMap<Arc<str>, usize>,
232}
233
234/// A trait for data source implementations, responsible for setting up and creating data connections.
235///
236/// Implementors of this trait define how to prepare a data source and how to instantiate
237/// data connections of a specific type `C`.
238#[trait_variant::make(Send)]
239#[allow(unused_variables)] // for rustdoc
240pub trait DataSrc<C>
241where
242    C: DataConn + 'static,
243{
244    /// Performs asynchronous setup operations for the data source.
245    ///
246    /// This method is called to initialize the data source before any data connections
247    /// are created from it.
248    ///
249    /// # Arguments
250    ///
251    /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
252    ///
253    /// # Returns
254    ///
255    /// A `Result` indicating success or failure of the setup operation.
256    async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
257
258    /// Closes the data source, releasing any associated resources.
259    ///
260    /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
261    fn close(&mut self);
262
263    /// Asynchronously creates a new data connection from this data source.
264    ///
265    /// # Returns
266    ///
267    /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
268    /// or an `Err` if the connection creation fails.
269    async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
270}
271
/// A field-less, crate-internal data source type.
///
/// It is the default `S` type argument of `DataSrcContainer`.
pub(crate) struct NoopDataSrc {}
273
274impl DataSrc<NoopDataConn> for NoopDataSrc {
275    async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
276        Ok(())
277    }
278    fn close(&mut self) {}
279    async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
280        Ok(Box::new(NoopDataConn {}))
281    }
282}
283
284#[allow(clippy::type_complexity)]
285#[repr(C)]
286pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
287where
288    S: DataSrc<C>,
289    C: DataConn + 'static,
290{
291    drop_fn: fn(*const DataSrcContainer),
292    close_fn: fn(*const DataSrcContainer),
293    is_data_conn_fn: fn(any::TypeId) -> bool,
294
295    setup_fn: for<'ag> fn(
296        *const DataSrcContainer,
297        &'ag mut AsyncGroup,
298    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
299
300    create_data_conn_fn: fn(
301        *const DataSrcContainer,
302    ) -> Pin<
303        Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + Send + 'static>,
304    >,
305
306    local: bool,
307    name: Arc<str>,
308    data_src: S,
309}
310
311pub(crate) struct DataSrcManager {
312    vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
313    vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
314    local: bool,
315}
316
/// A marker struct that can be used to trigger automatic shutdown behavior
/// for resources managed by the `DataHub` when it goes out of scope.
///
/// The struct itself has no fields.
pub struct AutoShutdown {}
320
321/// The central hub for managing data sources and their connections within an application.
322///
323/// `DataHub` provides mechanisms to register data sources, acquire data connections,
324/// and execute transactional or non-transactional asynchronous logic.
325///
326/// This structure implements `Send`.
327pub struct DataHub {
328    local_data_src_manager: DataSrcManager,
329    data_src_map: HashMap<Arc<str>, (bool, usize)>,
330    data_conn_manager: DataConnManager,
331    fixed: bool,
332}
333
334/// A trait defining the ability to access data connections.
335///
336/// This trait abstracts the mechanism for retrieving data connections, allowing
337/// different implementations (e.g., `DataHub`) to provide connections.
338pub trait DataAcc {
339    /// Asynchronously retrieves a data connection of a specific type.
340    ///
341    /// # Type Parameters
342    ///
343    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
344    ///
345    /// # Arguments
346    ///
347    /// * `name` - The name of the data connection to retrieve.
348    ///
349    /// # Returns
350    ///
351    /// A `Result` which is `Ok` containing a mutable reference to the data connection
352    /// if found, or an `Err` if the connection cannot be retrieved or cast.
353    #[allow(async_fn_in_trait)]
354    fn get_data_conn_async<C: DataConn + 'static>(
355        &mut self,
356        name: &str,
357    ) -> impl Future<Output = errs::Result<&mut C>> + Send;
358}
359
360#[doc(hidden)]
361pub struct StaticDataSrcContainer {
362    pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
363}
364
365#[doc(hidden)]
366pub struct StaticDataSrcRegistration {
367    pub(crate) factory: fn() -> StaticDataSrcContainer,
368}