sabi/tokio/mod.rs
1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5//! This module provides Tokio-specific implementations for asynchronous data access,
6//! including `AsyncGroup` for concurrent task management, `DataConn` for
7//! transactional data connections, `DataSrc` for data source management,
8//! and `DataHub` as a central orchestrator.
9//!
10//! It leverages Rust's asynchronous capabilities with the Tokio runtime
11//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::any;
22use std::collections::HashMap;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30 create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
31 DataSrcError,
32};
33
34/// A convenience macro to easily convert an asynchronous function into a `Pin<Box<dyn Future>>`
35/// closure suitable for `DataHub`'s `run_async` or `txn_async` methods.
36///
37/// This macro simplifies passing async functions by handling the boxing and pinning.
38/// The resulting `Future` implements `Send`.
39///
40/// # Example
41///
42/// ```ignore
43/// async fn my_logic(data: &mut (impl MyData + Send)) -> errs::Result<()> {
44/// // ... some logic using data
45/// Ok(())
46/// }
47///
48/// #[tokio::main]
49/// async fn main() {
50/// let mut hub = DataHub::new();
51/// hub.txn_async(logic!(my_logic)).await.unwrap();
52/// }
53/// ```
54#[doc(inline)]
55pub use crate::_logic as logic;
56
57/// Macro for registering a global data source at the top-level.
58///
59/// # Arguments
60///
61/// * `$name` - The name of the data source (must be a string literal).
62/// * `$data_src` - The data source instance.
63///
64/// # Examples
65///
66/// ```ignore
67/// uses!("my_global_source", MyDataSource::new());
68/// ```
69#[doc(inline)]
70pub use crate::_uses_for_async as uses;
71
72/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
73/// and their results (or errors) collected.
74#[allow(clippy::type_complexity)]
75pub struct AsyncGroup {
76 names: Vec<Arc<str>>,
77 tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
78 pub(crate) _name: Arc<str>,
79}
80
81/// A trait for data connection implementations, providing methods for transaction management.
82///
83/// Implementors of this trait represent a connection to a data source and define
84/// how to commit, rollback, and handle the lifecycle of transactions.
85#[allow(async_fn_in_trait)]
86#[allow(unused_variables)] // rustdoc
87pub trait DataConn {
88 /// Attempts to commit the changes made within this data connection.
89 ///
90 /// This is typically the main commit process, executed after `pre_commit_async`.
91 ///
92 /// # Arguments
93 ///
94 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
95 ///
96 /// # Returns
97 ///
98 /// A `Result` indicating success or failure of the commit operation.
99 async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
100
101 /// Performs preparatory actions before the main commit process.
102 ///
103 /// This method is called before `commit_async` and can be used for tasks like
104 /// validation or preparing data.
105 ///
106 /// # Arguments
107 ///
108 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
109 ///
110 /// # Returns
111 ///
112 /// A `Result` indicating success or failure of the pre-commit operation.
113 async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
114 Ok(())
115 }
116
117 /// Performs actions after the main commit process, only if it succeeds.
118 ///
119 /// This can be used for cleanup or post-transaction logging. Errors returned from
120 /// tasks added to `ag` in this method are ignored.
121 ///
122 /// # Arguments
123 ///
124 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
125 async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {}
126
127 /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
128 ///
129 /// This is typically `true` if `commit_async` has already succeeded for this connection,
130 /// implying that changes need to be undone rather than simply discarded.
131 ///
132 /// # Returns
133 ///
134 /// `true` if `force_back_async` should be called, `false` otherwise.
135 fn should_force_back(&self) -> bool {
136 false
137 }
138
139 /// Rolls back any changes made within this data connection.
140 ///
141 /// This method is called if a transaction fails before `commit_async` completes,
142 /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
143 /// in this method are ignored.
144 ///
145 /// # Arguments
146 ///
147 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
148 async fn rollback_async(&mut self, ag: &mut AsyncGroup);
149
150 /// Forces the data connection to revert changes that have already been committed.
151 ///
152 /// This method is called if a transaction fails after `commit_async` has completed
153 /// for this connection, and `should_force_back` returns `true`. Errors returned from
154 /// tasks added to `ag` in this method are ignored.
155 ///
156 /// # Arguments
157 ///
158 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
159 async fn force_back_async(&mut self, ag: &mut AsyncGroup) {}
160
161 /// Closes the data connection, releasing any associated resources.
162 ///
163 /// This method is always called at the end of a transaction, regardless of its outcome.
164 fn close(&mut self);
165}
166
167pub(crate) struct NoopDataConn {}
168
169impl DataConn for NoopDataConn {
170 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
171 Ok(())
172 }
173 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
174 fn close(&mut self) {}
175}
176
177#[allow(clippy::type_complexity)]
178#[repr(C)]
179pub(crate) struct DataConnContainer<C = NoopDataConn>
180where
181 C: DataConn + 'static,
182{
183 drop_fn: fn(*const DataConnContainer),
184 is_fn: fn(any::TypeId) -> bool,
185
186 commit_fn: for<'ag> fn(
187 *const DataConnContainer,
188 &'ag mut AsyncGroup,
189 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
190
191 pre_commit_fn: for<'ag> fn(
192 *const DataConnContainer,
193 &'ag mut AsyncGroup,
194 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
195
196 post_commit_fn: for<'ag> fn(
197 *const DataConnContainer,
198 &'ag mut AsyncGroup,
199 ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
200
201 should_force_back_fn: fn(*const DataConnContainer) -> bool,
202
203 rollback_fn: for<'ag> fn(
204 *const DataConnContainer,
205 &'ag mut AsyncGroup,
206 ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
207
208 force_back_fn: for<'ag> fn(
209 *const DataConnContainer,
210 &'ag mut AsyncGroup,
211 ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
212
213 close_fn: fn(*const DataConnContainer),
214
215 name: Arc<str>,
216 data_conn: Box<C>,
217}
218
219pub(crate) struct DataConnManager {
220 vec: Vec<Option<SendSyncNonNull<DataConnContainer>>>,
221 index_map: HashMap<Arc<str>, usize>,
222}
223
224/// A trait for data source implementations, responsible for setting up and creating data connections.
225///
226/// Implementors of this trait define how to prepare a data source and how to instantiate
227/// data connections of a specific type `C`.
228#[trait_variant::make(Send)]
229#[allow(async_fn_in_trait)]
230#[allow(unused_variables)] // for rustdoc
231pub trait DataSrc<C>
232where
233 C: DataConn + 'static,
234{
235 /// Performs asynchronous setup operations for the data source.
236 ///
237 /// This method is called to initialize the data source before any data connections
238 /// are created from it.
239 ///
240 /// # Arguments
241 ///
242 /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
243 ///
244 /// # Returns
245 ///
246 /// A `Result` indicating success or failure of the setup operation.
247 async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
248
249 /// Closes the data source, releasing any associated resources.
250 ///
251 /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
252 fn close(&mut self);
253
254 /// Asynchronously creates a new data connection from this data source.
255 ///
256 /// # Returns
257 ///
258 /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
259 /// or an `Err` if the connection creation fails.
260 async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
261}
262
263pub(crate) struct NoopDataSrc {}
264
265impl DataSrc<NoopDataConn> for NoopDataSrc {
266 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
267 Ok(())
268 }
269 fn close(&mut self) {}
270 async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
271 Ok(Box::new(NoopDataConn {}))
272 }
273}
274
275#[allow(clippy::type_complexity)]
276#[repr(C)]
277pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
278where
279 S: DataSrc<C>,
280 C: DataConn + 'static,
281{
282 drop_fn: fn(*const DataSrcContainer),
283 close_fn: fn(*const DataSrcContainer),
284 is_data_conn_fn: fn(any::TypeId) -> bool,
285
286 setup_fn: for<'ag> fn(
287 *const DataSrcContainer,
288 &'ag mut AsyncGroup,
289 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
290
291 create_data_conn_fn: fn(
292 *const DataSrcContainer,
293 ) -> Pin<
294 Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + Send + 'static>,
295 >,
296
297 local: bool,
298 name: Arc<str>,
299 data_src: S,
300}
301
302pub(crate) struct DataSrcManager {
303 vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
304 vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
305 local: bool,
306}
307
308/// A marker struct that can be used to trigger automatic shutdown behavior
309/// for resources managed by the `DataHub` when it goes out of scope.
310pub struct AutoShutdown {}
311
312/// The central hub for managing data sources and their connections within an application.
313///
314/// `DataHub` provides mechanisms to register data sources, acquire data connections,
315/// and execute transactional or non-transactional asynchronous logic.
316///
317/// This structure implements `Send`.
318pub struct DataHub {
319 local_data_src_manager: DataSrcManager,
320 data_src_map: HashMap<Arc<str>, (bool, usize)>,
321 data_conn_manager: DataConnManager,
322 fixed: bool,
323}
324
325/// A trait defining the ability to access data connections.
326///
327/// This trait abstracts the mechanism for retrieving data connections, allowing
328/// different implementations (e.g., `DataHub`) to provide connections.
329#[allow(async_fn_in_trait)]
330pub trait DataAcc {
331 /// Asynchronously retrieves a data connection of a specific type.
332 ///
333 /// # Type Parameters
334 ///
335 /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
336 ///
337 /// # Arguments
338 ///
339 /// * `name` - The name of the data connection to retrieve.
340 ///
341 /// # Returns
342 ///
343 /// A `Result` which is `Ok` containing a mutable reference to the data connection
344 /// if found, or an `Err` if the connection cannot be retrieved or cast.
345 async fn get_data_conn_async<C: DataConn + 'static>(
346 &mut self,
347 name: impl AsRef<str>,
348 ) -> errs::Result<&mut C>;
349}
350
351#[doc(hidden)]
352pub struct StaticDataSrcContainer {
353 pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
354}
355
356#[doc(hidden)]
357pub struct StaticDataSrcRegistration {
358 pub(crate) factory: fn() -> StaticDataSrcContainer,
359}