sabi/tokio/mod.rs
1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
//! This module provides Tokio-specific implementations for asynchronous data access,
//! including `AsyncGroup` for concurrent task management, `DataConn` for
//! transactional data connections, `DataSrc` for data source management,
//! and `DataHub` as a central orchestrator.
//!
//! It leverages Rust's asynchronous capabilities with the Tokio runtime
//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::any;
22use std::collections::HashMap;
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30 create_static_data_src_container, setup_async, setup_with_order_async, uses, uses_async,
31 DataSrcError,
32};
33
/// A convenience macro that wraps an asynchronous function in a closure returning a
/// `Pin<Box<dyn Future>>`, suitable for `DataHub`'s `run_async` or `txn_async` methods.
///
/// This macro simplifies passing async functions by handling the boxing and pinning.
/// The resulting `Future` implements `Send`.
///
/// # Example
///
/// ```ignore
/// async fn my_logic(data: &mut (impl MyData + Send)) -> errs::Result<()> {
///     // ... some logic using data
///     Ok(())
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let mut hub = DataHub::new();
///     hub.txn_async(logic!(my_logic)).await.unwrap();
/// }
/// ```
54#[doc(inline)]
55pub use crate::_logic as logic;
56
/// Registers a global data source at the top level of a program.
///
/// # Arguments
///
/// * `$name` - The name of the data source (must be a string literal).
/// * `$data_src` - The data source instance.
///
/// # Examples
///
/// ```ignore
/// uses!("my_global_source", MyDataSource::new());
/// ```
69#[doc(inline)]
70pub use crate::_uses_for_async as uses;
71
72/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
73/// and their results (or errors) collected.
74#[allow(clippy::type_complexity)]
75pub struct AsyncGroup {
76 names: Vec<Arc<str>>,
77 tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
78 pub(crate) _name: Arc<str>,
79}
80
81/// A trait for data connection implementations, providing methods for transaction management.
82///
83/// Implementors of this trait represent a connection to a data source and define
84/// how to commit, rollback, and handle the lifecycle of transactions.
85#[allow(async_fn_in_trait)]
86#[allow(unused_variables)] // rustdoc
87pub trait DataConn {
88 /// Attempts to commit the changes made within this data connection.
89 ///
90 /// This is typically the main commit process, executed after `pre_commit_async`.
91 ///
92 /// # Arguments
93 ///
94 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
95 ///
96 /// # Returns
97 ///
98 /// A `Result` indicating success or failure of the commit operation.
99 fn commit_async(
100 &mut self,
101 ag: &mut AsyncGroup,
102 ) -> impl Future<Output = errs::Result<()>> + Send;
103
104 /// Performs preparatory actions before the main commit process.
105 ///
106 /// This method is called before `commit_async` and can be used for tasks like
107 /// validation or preparing data.
108 ///
109 /// # Arguments
110 ///
111 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
112 ///
113 /// # Returns
114 ///
115 /// A `Result` indicating success or failure of the pre-commit operation.
116 fn pre_commit_async(
117 &mut self,
118 ag: &mut AsyncGroup,
119 ) -> impl Future<Output = errs::Result<()>> + Send {
120 async { Ok(()) }
121 }
122
123 /// Performs actions after the main commit process, only if it succeeds.
124 ///
125 /// This can be used for cleanup or post-transaction logging. Errors returned from
126 /// tasks added to `ag` in this method are ignored.
127 ///
128 /// # Arguments
129 ///
130 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
131 fn post_commit_async(&mut self, ag: &mut AsyncGroup) -> impl Future<Output = ()> + Send {
132 async {}
133 }
134
135 /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
136 ///
137 /// This is typically `true` if `commit_async` has already succeeded for this connection,
138 /// implying that changes need to be undone rather than simply discarded.
139 ///
140 /// # Returns
141 ///
142 /// `true` if `force_back_async` should be called, `false` otherwise.
143 fn should_force_back(&self) -> bool {
144 false
145 }
146
147 /// Rolls back any changes made within this data connection.
148 ///
149 /// This method is called if a transaction fails before `commit_async` completes,
150 /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
151 /// in this method are ignored.
152 ///
153 /// # Arguments
154 ///
155 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
156 fn rollback_async(&mut self, ag: &mut AsyncGroup) -> impl Future<Output = ()> + Send;
157
158 /// Forces the data connection to revert changes that have already been committed.
159 ///
160 /// This method is called if a transaction fails after `commit_async` has completed
161 /// for this connection, and `should_force_back` returns `true`. Errors returned from
162 /// tasks added to `ag` in this method are ignored.
163 ///
164 /// # Arguments
165 ///
166 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
167 fn force_back_async(&mut self, ag: &mut AsyncGroup) -> impl Future<Output = ()> + Send {
168 async {}
169 }
170
171 /// Closes the data connection, releasing any associated resources.
172 ///
173 /// This method is always called at the end of a transaction, regardless of its outcome.
174 fn close(&mut self);
175}
176
/// A field-less data connection type; it is the default type parameter of
/// `DataConnContainer` and `DataSrcContainer`.
pub(crate) struct NoopDataConn {}
178
179impl DataConn for NoopDataConn {
180 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
181 Ok(())
182 }
183 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
184 fn close(&mut self) {}
185}
186
187#[allow(clippy::type_complexity)]
188#[repr(C)]
189pub(crate) struct DataConnContainer<C = NoopDataConn>
190where
191 C: DataConn + 'static,
192{
193 drop_fn: fn(*const DataConnContainer),
194 is_fn: fn(any::TypeId) -> bool,
195
196 commit_fn: for<'ag> fn(
197 *const DataConnContainer,
198 &'ag mut AsyncGroup,
199 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
200
201 pre_commit_fn: for<'ag> fn(
202 *const DataConnContainer,
203 &'ag mut AsyncGroup,
204 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
205
206 post_commit_fn: for<'ag> fn(
207 *const DataConnContainer,
208 &'ag mut AsyncGroup,
209 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'ag>>,
210
211 should_force_back_fn: fn(*const DataConnContainer) -> bool,
212
213 rollback_fn: for<'ag> fn(
214 *const DataConnContainer,
215 &'ag mut AsyncGroup,
216 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'ag>>,
217
218 force_back_fn: for<'ag> fn(
219 *const DataConnContainer,
220 &'ag mut AsyncGroup,
221 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'ag>>,
222
223 close_fn: fn(*const DataConnContainer),
224
225 name: Arc<str>,
226 data_conn: Box<C>,
227}
228
229pub(crate) struct DataConnManager {
230 vec: Vec<Option<SendSyncNonNull<DataConnContainer>>>,
231 index_map: HashMap<Arc<str>, usize>,
232}
233
234/// A trait for data source implementations, responsible for setting up and creating data connections.
235///
236/// Implementors of this trait define how to prepare a data source and how to instantiate
237/// data connections of a specific type `C`.
238#[trait_variant::make(Send)]
239#[allow(unused_variables)] // for rustdoc
240pub trait DataSrc<C>
241where
242 C: DataConn + 'static,
243{
244 /// Performs asynchronous setup operations for the data source.
245 ///
246 /// This method is called to initialize the data source before any data connections
247 /// are created from it.
248 ///
249 /// # Arguments
250 ///
251 /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
252 ///
253 /// # Returns
254 ///
255 /// A `Result` indicating success or failure of the setup operation.
256 async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
257
258 /// Closes the data source, releasing any associated resources.
259 ///
260 /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
261 fn close(&mut self);
262
263 /// Asynchronously creates a new data connection from this data source.
264 ///
265 /// # Returns
266 ///
267 /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
268 /// or an `Err` if the connection creation fails.
269 async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
270}
271
/// A field-less data source type; it is the default `S` type parameter of
/// `DataSrcContainer`.
pub(crate) struct NoopDataSrc {}
273
274impl DataSrc<NoopDataConn> for NoopDataSrc {
275 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
276 Ok(())
277 }
278 fn close(&mut self) {}
279 async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
280 Ok(Box::new(NoopDataConn {}))
281 }
282}
283
284#[allow(clippy::type_complexity)]
285#[repr(C)]
286pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
287where
288 S: DataSrc<C>,
289 C: DataConn + 'static,
290{
291 drop_fn: fn(*const DataSrcContainer),
292 close_fn: fn(*const DataSrcContainer),
293 is_data_conn_fn: fn(any::TypeId) -> bool,
294
295 setup_fn: for<'ag> fn(
296 *const DataSrcContainer,
297 &'ag mut AsyncGroup,
298 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
299
300 create_data_conn_fn: fn(
301 *const DataSrcContainer,
302 ) -> Pin<
303 Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + Send + 'static>,
304 >,
305
306 local: bool,
307 name: Arc<str>,
308 data_src: S,
309}
310
311pub(crate) struct DataSrcManager {
312 vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
313 vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
314 local: bool,
315}
316
/// A marker struct that can be used to trigger automatic shutdown behavior
/// for resources managed by the `DataHub` when it goes out of scope.
///
/// The struct itself carries no data.
pub struct AutoShutdown {}
320
321/// The central hub for managing data sources and their connections within an application.
322///
323/// `DataHub` provides mechanisms to register data sources, acquire data connections,
324/// and execute transactional or non-transactional asynchronous logic.
325///
326/// This structure implements `Send`.
327pub struct DataHub {
328 local_data_src_manager: DataSrcManager,
329 data_src_map: HashMap<Arc<str>, (bool, usize)>,
330 data_conn_manager: DataConnManager,
331 fixed: bool,
332}
333
334/// A trait defining the ability to access data connections.
335///
336/// This trait abstracts the mechanism for retrieving data connections, allowing
337/// different implementations (e.g., `DataHub`) to provide connections.
338pub trait DataAcc {
339 /// Asynchronously retrieves a data connection of a specific type.
340 ///
341 /// # Type Parameters
342 ///
343 /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
344 ///
345 /// # Arguments
346 ///
347 /// * `name` - The name of the data connection to retrieve.
348 ///
349 /// # Returns
350 ///
351 /// A `Result` which is `Ok` containing a mutable reference to the data connection
352 /// if found, or an `Err` if the connection cannot be retrieved or cast.
353 #[allow(async_fn_in_trait)]
354 fn get_data_conn_async<C: DataConn + 'static>(
355 &mut self,
356 name: &str,
357 ) -> impl Future<Output = errs::Result<&mut C>> + Send;
358}
359
360#[doc(hidden)]
361pub struct StaticDataSrcContainer {
362 pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
363}
364
365#[doc(hidden)]
366pub struct StaticDataSrcRegistration {
367 pub(crate) factory: fn() -> StaticDataSrcContainer,
368}