// sabi/tokio/mod.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5//! This module provides Tokio-specific implementations for asynchronous data access,
6//! including `AsyncGroup` for concurrent task management, `DataConn` for
7//! transactional data connections, `DataSrc` for data source management,
8//! and `DataHub` as a central orchestrator.
9//!
10//! It leverages Rust's asynchronous capabilities with the Tokio runtime
11//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::collections::HashMap;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::{any, ptr};
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30    create_static_data_src_container, setup_async, setup_with_order_async, uses_async, DataSrcError,
31};
32
/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
/// and their results (or errors) collected.
///
/// Each stored task is a pinned, boxed future that is `Send + 'static` and resolves to
/// `errs::Result<()>`.
#[allow(clippy::type_complexity)] // the boxed-future type of `tasks` is spelled out inline
pub struct AsyncGroup {
    // Names recorded alongside the tasks; presumably `names[i]` labels `tasks[i]` —
    // TODO confirm against the implementation in `async_group`.
    names: Vec<Arc<str>>,
    // The pending tasks, type-erased behind `dyn Future` so that futures of different
    // concrete types can live in the same `Vec`.
    tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
    // NOTE(review): crate-visible name; looks like the label the crate sets before handing
    // the group to a data source/connection so that added tasks can be attributed to it —
    // confirm in `async_group`.
    pub(crate) _name: Arc<str>,
}
41
/// A trait for data connection implementations, providing methods for transaction management.
///
/// Implementors of this trait represent a connection to a data source and define
/// how to commit, rollback, and handle the lifecycle of transactions.
///
/// `commit_async`, `rollback_async` and `close` have no default and must be provided by
/// every implementor. `pre_commit_async`, `post_commit_async`, `should_force_back` and
/// `force_back_async` come with default implementations that do nothing (returning
/// `Ok(())` or `false` where a value is required).
#[allow(async_fn_in_trait)]
#[allow(unused_variables)] // for rustdoc: the default method bodies below leave `ag` unused
pub trait DataConn {
    /// Attempts to commit the changes made within this data connection.
    ///
    /// This is typically the main commit process, executed after `pre_commit_async`.
    ///
    /// # Arguments
    ///
    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
    ///
    /// # Returns
    ///
    /// A `Result` indicating success or failure of the commit operation.
    async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;

    /// Performs preparatory actions before the main commit process.
    ///
    /// This method is called before `commit_async` and can be used for tasks like
    /// validation or preparing data.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Arguments
    ///
    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
    ///
    /// # Returns
    ///
    /// A `Result` indicating success or failure of the pre-commit operation.
    async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
        Ok(())
    }

    /// Performs actions after the main commit process, only if it succeeds.
    ///
    /// This can be used for cleanup or post-transaction logging. Errors returned from
    /// tasks added to `ag` in this method are ignored.
    ///
    /// The default implementation does nothing.
    ///
    /// # Arguments
    ///
    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
    async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {}

    /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
    ///
    /// This is typically `true` if `commit_async` has already succeeded for this connection,
    /// implying that changes need to be undone rather than simply discarded.
    ///
    /// The default implementation always returns `false`.
    ///
    /// # Returns
    ///
    /// `true` if `force_back_async` should be called, `false` otherwise.
    fn should_force_back(&self) -> bool {
        false
    }

    /// Rolls back any changes made within this data connection.
    ///
    /// This method is called if a transaction fails before `commit_async` completes,
    /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
    /// in this method are ignored.
    ///
    /// # Arguments
    ///
    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
    async fn rollback_async(&mut self, ag: &mut AsyncGroup);

    /// Forces the data connection to revert changes that have already been committed.
    ///
    /// This method is called if a transaction fails after `commit_async` has completed
    /// for this connection, and `should_force_back` returns `true`. Errors returned from
    /// tasks added to `ag` in this method are ignored.
    ///
    /// The default implementation does nothing.
    ///
    /// # Arguments
    ///
    /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
    async fn force_back_async(&mut self, ag: &mut AsyncGroup) {}

    /// Closes the data connection, releasing any associated resources.
    ///
    /// This method is always called at the end of a transaction, regardless of its outcome.
    fn close(&mut self);
}
127
/// A field-less `DataConn` whose operations all do nothing.
///
/// It is the default for the `C` type parameter of `DataConnContainer` and
/// `DataSrcContainer`, so a bare `DataConnContainer` names the `NoopDataConn` instantiation.
pub(crate) struct NoopDataConn {}
129
130impl DataConn for NoopDataConn {
131    async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
132        Ok(())
133    }
134    async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
135    fn close(&mut self) {}
136}
137
/// Pairs a boxed `DataConn` of type `C` with its name and a table of plain function
/// pointers: one per `DataConn` operation, plus a drop hook and a type-check hook.
///
/// Every function pointer takes a `*const DataConnContainer` — that is, a pointer to the
/// default `NoopDataConn` instantiation — rather than a pointer to `DataConnContainer<C>`.
/// The futures returned by the async hooks borrow the `AsyncGroup` for `'ag` and carry no
/// `Send` bound.
// NOTE(review): presumably containers are stored type-erased (`DataConnManager` holds
// `NonNull<DataConnContainer>`) and each function pointer casts back to the concrete
// `DataConnContainer<C>`; `#[repr(C)]` would then be what keeps the leading fields at the
// same offsets for every `C`. If so, field order here must not change — confirm in `data_conn`.
#[allow(clippy::type_complexity)]
#[repr(C)]
pub(crate) struct DataConnContainer<C = NoopDataConn>
where
    C: DataConn + 'static,
{
    // Hook taking the container pointer; by its name, destroys the container — TODO confirm.
    drop_fn: fn(*const DataConnContainer),
    // Maps a `TypeId` to a `bool`; presumably "is the contained connection of this type?" —
    // TODO confirm.
    is_fn: fn(any::TypeId) -> bool,

    // Presumably forwards to `DataConn::commit_async` on the contained connection.
    commit_fn: for<'ag> fn(
        *const DataConnContainer,
        &'ag mut AsyncGroup,
    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,

    // Presumably forwards to `DataConn::pre_commit_async`.
    pre_commit_fn: for<'ag> fn(
        *const DataConnContainer,
        &'ag mut AsyncGroup,
    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,

    // Presumably forwards to `DataConn::post_commit_async`; the future yields `()`.
    post_commit_fn: for<'ag> fn(
        *const DataConnContainer,
        &'ag mut AsyncGroup,
    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,

    // Presumably forwards to `DataConn::should_force_back`.
    should_force_back_fn: fn(*const DataConnContainer) -> bool,

    // Presumably forwards to `DataConn::rollback_async`; the future yields `()`.
    rollback_fn: for<'ag> fn(
        *const DataConnContainer,
        &'ag mut AsyncGroup,
    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,

    // Presumably forwards to `DataConn::force_back_async`; the future yields `()`.
    force_back_fn: for<'ag> fn(
        *const DataConnContainer,
        &'ag mut AsyncGroup,
    ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,

    // Presumably forwards to `DataConn::close`.
    close_fn: fn(*const DataConnContainer),

    // Name associated with this connection.
    name: Arc<str>,
    // The concrete connection; the only field whose type depends on `C`, and the last one.
    data_conn: Box<C>,
}
179
/// Holds pointers to `DataConnContainer`s together with a name-keyed index table.
///
/// The pointers are typed as the default (`NoopDataConn`) instantiation of
/// `DataConnContainer`, whatever connection type each container actually wraps.
pub(crate) struct DataConnManager {
    // Slots of optional non-null container pointers; what a `None` slot means is not
    // visible here — TODO confirm in `data_conn`.
    vec: Vec<Option<ptr::NonNull<DataConnContainer>>>,
    // Name -> `usize`; presumably the index of that connection's slot in `vec` — TODO confirm.
    index_map: HashMap<Arc<str>, usize>,
}
184
/// A trait for data source implementations, responsible for setting up and creating data connections.
///
/// Implementors of this trait define how to prepare a data source and how to instantiate
/// data connections of a specific type `C`.
///
/// All three methods are required; none has a default implementation.
// NOTE(review): per the `trait_variant` crate, `make(Send)` is expected to require the futures
// returned by the `async fn`s below to be `Send` — confirm against the crate version in use.
#[trait_variant::make(Send)]
#[allow(async_fn_in_trait)]
#[allow(unused_variables)] // for rustdoc
pub trait DataSrc<C>
where
    C: DataConn + 'static,
{
    /// Performs asynchronous setup operations for the data source.
    ///
    /// This method is called to initialize the data source before any data connections
    /// are created from it.
    ///
    /// # Arguments
    ///
    /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
    ///
    /// # Returns
    ///
    /// A `Result` indicating success or failure of the setup operation.
    async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;

    /// Closes the data source, releasing any associated resources.
    ///
    /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
    fn close(&mut self);

    /// Asynchronously creates a new data connection from this data source.
    ///
    /// # Returns
    ///
    /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
    /// or an `Err` if the connection creation fails.
    async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
}
223
/// A field-less `DataSrc` that produces `NoopDataConn`s.
///
/// It is the default for the `S` type parameter of `DataSrcContainer`, so a bare
/// `DataSrcContainer` names the `DataSrcContainer<NoopDataSrc, NoopDataConn>` instantiation.
pub(crate) struct NoopDataSrc {}
225
226impl DataSrc<NoopDataConn> for NoopDataSrc {
227    async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
228        Ok(())
229    }
230    fn close(&mut self) {}
231    async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
232        Ok(Box::new(NoopDataConn {}))
233    }
234}
235
/// Pairs a data source of type `S` with its name, a `local` flag and a table of plain
/// function pointers for dropping, closing, setting up and creating connections.
///
/// Every function pointer takes a `*const DataSrcContainer` — a pointer to the default
/// `DataSrcContainer<NoopDataSrc, NoopDataConn>` instantiation — rather than a pointer to
/// `DataSrcContainer<S, C>`.
// NOTE(review): presumably containers are handled type-erased (`DataSrcManager` stores
// `SendSyncNonNull<DataSrcContainer>`) and each function pointer casts back to the concrete
// `DataSrcContainer<S, C>`; `#[repr(C)]` would then be what keeps the leading fields at the
// same offsets for every `S`/`C`. If so, field order here must not change — confirm in `data_src`.
#[allow(clippy::type_complexity)]
#[repr(C)]
pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
where
    S: DataSrc<C>,
    C: DataConn + 'static,
{
    // Hook taking the container pointer; by its name, destroys the container — TODO confirm.
    drop_fn: fn(*const DataSrcContainer),
    // Presumably forwards to `DataSrc::close` on the contained data source.
    close_fn: fn(*const DataSrcContainer),
    // Maps a `TypeId` to a `bool`; presumably "does this source create connections of this
    // type?" — TODO confirm.
    is_data_conn_fn: fn(any::TypeId) -> bool,

    // Returns a boxed `Send` future borrowing the `AsyncGroup` for `'ag`; presumably forwards
    // to `DataSrc::setup_async`.
    setup_fn: for<'ag> fn(
        *const DataSrcContainer,
        &'ag mut AsyncGroup,
    ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,

    // Returns a boxed `'static` future (no `Send` bound, unlike `setup_fn`) that yields a boxed
    // `DataConnContainer<C>`; presumably wraps the result of `DataSrc::create_data_conn_async`.
    create_data_conn_fn: fn(
        *const DataSrcContainer,
    ) -> Pin<
        Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'static>,
    >,

    // NOTE(review): looks like it distinguishes sources registered on a `DataHub` from
    // statically/globally registered ones — confirm in `data_src`.
    local: bool,
    // Name associated with this data source.
    name: Arc<str>,
    // The concrete data source; the only field whose type depends on `S`, and the last one.
    data_src: S,
}
262
/// Keeps two lists of `DataSrcContainer` pointers — "unready" and "ready" — plus a `local` flag.
///
/// The pointers are wrapped in the crate's `SendSyncNonNull` and typed as the default
/// instantiation of `DataSrcContainer`.
pub(crate) struct DataSrcManager {
    // Presumably data sources that have been registered but not yet set up — TODO confirm.
    vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
    // Presumably data sources whose setup has completed — TODO confirm.
    vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
    // NOTE(review): `DataHub` names its instance `local_data_src_manager`, so this likely marks
    // a hub-owned manager as opposed to a global one — confirm in `data_src`.
    local: bool,
}
268
/// A marker struct that can be used to trigger automatic shutdown behavior
/// for resources managed by the `DataHub` when it goes out of scope.
///
/// The struct itself carries no data.
pub struct AutoShutdown {}
272
/// The central hub for managing data sources and their connections within an application.
///
/// `DataHub` provides mechanisms to register data sources, acquire data connections,
/// and execute transactional or non-transactional asynchronous logic.
pub struct DataHub {
    // Manager for the data sources owned by this hub.
    local_data_src_manager: DataSrcManager,
    // Data source name -> `(bool, usize)`; presumably (is-local flag, index within the
    // corresponding manager's list) — TODO confirm in `data_hub`.
    data_src_map: HashMap<Arc<str>, (bool, usize)>,
    // Manager for the data connections created through this hub.
    data_conn_manager: DataConnManager,
    // NOTE(review): looks like a flag that freezes the set of data sources (e.g. while a
    // transaction is running) — confirm in `data_hub`.
    fixed: bool,
}
283
/// A trait defining the ability to access data connections.
///
/// This trait abstracts the mechanism for retrieving data connections, allowing
/// different implementations (e.g., `DataHub`) to provide connections.
#[allow(async_fn_in_trait)]
pub trait DataAcc {
    /// Asynchronously retrieves a data connection of a specific type.
    ///
    /// The returned reference borrows `self` mutably, so it must be released before
    /// another connection can be requested through the same accessor.
    ///
    /// # Type Parameters
    ///
    /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the data connection to retrieve.
    ///
    /// # Returns
    ///
    /// A `Result` which is `Ok` containing a mutable reference to the data connection
    /// if found, or an `Err` if the connection cannot be retrieved or cast.
    async fn get_data_conn_async<C: DataConn + 'static>(
        &mut self,
        name: impl AsRef<str>,
    ) -> errs::Result<&mut C>;
}
309
/// Wrapper around a `SendSyncNonNull` pointer to a `DataSrcContainer`.
///
/// Public but hidden from the generated documentation; it is the value produced by
/// `StaticDataSrcRegistration::factory`.
#[doc(hidden)]
pub struct StaticDataSrcContainer {
    // Pointer to the wrapped container; only visible inside the crate.
    pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
}
314
/// A registration record holding a factory function that builds a `StaticDataSrcContainer`.
///
/// Public but hidden from the generated documentation.
// NOTE(review): how and when registrations are collected and their factories invoked is
// not visible in this file — see `data_src`.
#[doc(hidden)]
pub struct StaticDataSrcRegistration {
    // Plain function pointer (no captured state) that creates the container on demand.
    pub(crate) factory: fn() -> StaticDataSrcContainer,
}