sabi/tokio/mod.rs
1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5//! This module provides Tokio-specific implementations for asynchronous data access,
6//! including `AsyncGroup` for concurrent task management, `DataConn` for
7//! transactional data connections, `DataSrc` for data source management,
8//! and `DataHub` as a central orchestrator.
9//!
10//! It leverages Rust's asynchronous capabilities with the Tokio runtime
11//! to enable efficient and concurrent handling of data operations.
12
13mod async_group;
14mod data_acc;
15mod data_conn;
16mod data_hub;
17mod data_src;
18
19use crate::SendSyncNonNull;
20
21use std::collections::HashMap;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::{any, ptr};
26
27pub use data_conn::DataConnError;
28pub use data_hub::DataHubError;
29pub use data_src::{
30 create_static_data_src_container, setup_async, setup_with_order_async, uses_async, DataSrcError,
31};
32
33/// Manages a collection of asynchronous tasks, allowing them to be executed concurrently
34/// and their results (or errors) collected.
35#[allow(clippy::type_complexity)]
36pub struct AsyncGroup {
37 names: Vec<Arc<str>>,
38 tasks: Vec<Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'static>>>,
39 pub(crate) _name: Arc<str>,
40}
41
42/// A trait for data connection implementations, providing methods for transaction management.
43///
44/// Implementors of this trait represent a connection to a data source and define
45/// how to commit, rollback, and handle the lifecycle of transactions.
46#[allow(async_fn_in_trait)]
47#[allow(unused_variables)] // rustdoc
48pub trait DataConn {
49 /// Attempts to commit the changes made within this data connection.
50 ///
51 /// This is typically the main commit process, executed after `pre_commit_async`.
52 ///
53 /// # Arguments
54 ///
55 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to the commit can be added.
56 ///
57 /// # Returns
58 ///
59 /// A `Result` indicating success or failure of the commit operation.
60 async fn commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
61
62 /// Performs preparatory actions before the main commit process.
63 ///
64 /// This method is called before `commit_async` and can be used for tasks like
65 /// validation or preparing data.
66 ///
67 /// # Arguments
68 ///
69 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to pre-commit can be added.
70 ///
71 /// # Returns
72 ///
73 /// A `Result` indicating success or failure of the pre-commit operation.
74 async fn pre_commit_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()> {
75 Ok(())
76 }
77
78 /// Performs actions after the main commit process, only if it succeeds.
79 ///
80 /// This can be used for cleanup or post-transaction logging. Errors returned from
81 /// tasks added to `ag` in this method are ignored.
82 ///
83 /// # Arguments
84 ///
85 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to post-commit can be added.
86 async fn post_commit_async(&mut self, ag: &mut AsyncGroup) {}
87
88 /// Indicates whether `force_back_async` should be called instead of `rollback_async`.
89 ///
90 /// This is typically `true` if `commit_async` has already succeeded for this connection,
91 /// implying that changes need to be undone rather than simply discarded.
92 ///
93 /// # Returns
94 ///
95 /// `true` if `force_back_async` should be called, `false` otherwise.
96 fn should_force_back(&self) -> bool {
97 false
98 }
99
100 /// Rolls back any changes made within this data connection.
101 ///
102 /// This method is called if a transaction fails before `commit_async` completes,
103 /// or if `should_force_back` returns `false`. Errors returned from tasks added to `ag`
104 /// in this method are ignored.
105 ///
106 /// # Arguments
107 ///
108 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to rollback can be added.
109 async fn rollback_async(&mut self, ag: &mut AsyncGroup);
110
111 /// Forces the data connection to revert changes that have already been committed.
112 ///
113 /// This method is called if a transaction fails after `commit_async` has completed
114 /// for this connection, and `should_force_back` returns `true`. Errors returned from
115 /// tasks added to `ag` in this method are ignored.
116 ///
117 /// # Arguments
118 ///
119 /// * `ag` - An `AsyncGroup` to which asynchronous tasks related to force-back can be added.
120 async fn force_back_async(&mut self, ag: &mut AsyncGroup) {}
121
122 /// Closes the data connection, releasing any associated resources.
123 ///
124 /// This method is always called at the end of a transaction, regardless of its outcome.
125 fn close(&mut self);
126}
127
128pub(crate) struct NoopDataConn {}
129
130impl DataConn for NoopDataConn {
131 async fn commit_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
132 Ok(())
133 }
134 async fn rollback_async(&mut self, _ag: &mut AsyncGroup) {}
135 fn close(&mut self) {}
136}
137
138#[allow(clippy::type_complexity)]
139#[repr(C)]
140pub(crate) struct DataConnContainer<C = NoopDataConn>
141where
142 C: DataConn + 'static,
143{
144 drop_fn: fn(*const DataConnContainer),
145 is_fn: fn(any::TypeId) -> bool,
146
147 commit_fn: for<'ag> fn(
148 *const DataConnContainer,
149 &'ag mut AsyncGroup,
150 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
151
152 pre_commit_fn: for<'ag> fn(
153 *const DataConnContainer,
154 &'ag mut AsyncGroup,
155 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + 'ag>>,
156
157 post_commit_fn: for<'ag> fn(
158 *const DataConnContainer,
159 &'ag mut AsyncGroup,
160 ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
161
162 should_force_back_fn: fn(*const DataConnContainer) -> bool,
163
164 rollback_fn: for<'ag> fn(
165 *const DataConnContainer,
166 &'ag mut AsyncGroup,
167 ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
168
169 force_back_fn: for<'ag> fn(
170 *const DataConnContainer,
171 &'ag mut AsyncGroup,
172 ) -> Pin<Box<dyn Future<Output = ()> + 'ag>>,
173
174 close_fn: fn(*const DataConnContainer),
175
176 name: Arc<str>,
177 data_conn: Box<C>,
178}
179
180pub(crate) struct DataConnManager {
181 vec: Vec<Option<ptr::NonNull<DataConnContainer>>>,
182 index_map: HashMap<Arc<str>, usize>,
183}
184
185/// A trait for data source implementations, responsible for setting up and creating data connections.
186///
187/// Implementors of this trait define how to prepare a data source and how to instantiate
188/// data connections of a specific type `C`.
189#[trait_variant::make(Send)]
190#[allow(async_fn_in_trait)]
191#[allow(unused_variables)] // for rustdoc
192pub trait DataSrc<C>
193where
194 C: DataConn + 'static,
195{
196 /// Performs asynchronous setup operations for the data source.
197 ///
198 /// This method is called to initialize the data source before any data connections
199 /// are created from it.
200 ///
201 /// # Arguments
202 ///
203 /// * `ag` - An `AsyncGroup` to which asynchronous setup tasks can be added.
204 ///
205 /// # Returns
206 ///
207 /// A `Result` indicating success or failure of the setup operation.
208 async fn setup_async(&mut self, ag: &mut AsyncGroup) -> errs::Result<()>;
209
210 /// Closes the data source, releasing any associated resources.
211 ///
212 /// This method is always called at the end of the `DataHub`'s lifecycle for this source.
213 fn close(&mut self);
214
215 /// Asynchronously creates a new data connection from this data source.
216 ///
217 /// # Returns
218 ///
219 /// A `Result` which is `Ok` containing a `Box`ed instance of the data connection `C`,
220 /// or an `Err` if the connection creation fails.
221 async fn create_data_conn_async(&mut self) -> errs::Result<Box<C>>;
222}
223
224pub(crate) struct NoopDataSrc {}
225
226impl DataSrc<NoopDataConn> for NoopDataSrc {
227 async fn setup_async(&mut self, _ag: &mut AsyncGroup) -> errs::Result<()> {
228 Ok(())
229 }
230 fn close(&mut self) {}
231 async fn create_data_conn_async(&mut self) -> errs::Result<Box<NoopDataConn>> {
232 Ok(Box::new(NoopDataConn {}))
233 }
234}
235
236#[allow(clippy::type_complexity)]
237#[repr(C)]
238pub(crate) struct DataSrcContainer<S = NoopDataSrc, C = NoopDataConn>
239where
240 S: DataSrc<C>,
241 C: DataConn + 'static,
242{
243 drop_fn: fn(*const DataSrcContainer),
244 close_fn: fn(*const DataSrcContainer),
245 is_data_conn_fn: fn(any::TypeId) -> bool,
246
247 setup_fn: for<'ag> fn(
248 *const DataSrcContainer,
249 &'ag mut AsyncGroup,
250 ) -> Pin<Box<dyn Future<Output = errs::Result<()>> + Send + 'ag>>,
251
252 create_data_conn_fn: fn(
253 *const DataSrcContainer,
254 ) -> Pin<
255 Box<dyn Future<Output = errs::Result<Box<DataConnContainer<C>>>> + 'static>,
256 >,
257
258 local: bool,
259 name: Arc<str>,
260 data_src: S,
261}
262
263pub(crate) struct DataSrcManager {
264 vec_unready: Vec<SendSyncNonNull<DataSrcContainer>>,
265 vec_ready: Vec<SendSyncNonNull<DataSrcContainer>>,
266 local: bool,
267}
268
269/// A marker struct that can be used to trigger automatic shutdown behavior
270/// for resources managed by the `DataHub` when it goes out of scope.
271pub struct AutoShutdown {}
272
273/// The central hub for managing data sources and their connections within an application.
274///
275/// `DataHub` provides mechanisms to register data sources, acquire data connections,
276/// and execute transactional or non-transactional asynchronous logic.
277pub struct DataHub {
278 local_data_src_manager: DataSrcManager,
279 data_src_map: HashMap<Arc<str>, (bool, usize)>,
280 data_conn_manager: DataConnManager,
281 fixed: bool,
282}
283
284/// A trait defining the ability to access data connections.
285///
286/// This trait abstracts the mechanism for retrieving data connections, allowing
287/// different implementations (e.g., `DataHub`) to provide connections.
288#[allow(async_fn_in_trait)]
289pub trait DataAcc {
290 /// Asynchronously retrieves a data connection of a specific type.
291 ///
292 /// # Type Parameters
293 ///
294 /// * `C` - The expected type of the data connection, which must implement `DataConn` and have a `'static` lifetime.
295 ///
296 /// # Arguments
297 ///
298 /// * `name` - The name of the data connection to retrieve.
299 ///
300 /// # Returns
301 ///
302 /// A `Result` which is `Ok` containing a mutable reference to the data connection
303 /// if found, or an `Err` if the connection cannot be retrieved or cast.
304 async fn get_data_conn_async<C: DataConn + 'static>(
305 &mut self,
306 name: impl AsRef<str>,
307 ) -> errs::Result<&mut C>;
308}
309
310#[doc(hidden)]
311pub struct StaticDataSrcContainer {
312 pub(crate) ssnnptr: SendSyncNonNull<DataSrcContainer>,
313}
314
315#[doc(hidden)]
316pub struct StaticDataSrcRegistration {
317 pub(crate) factory: fn() -> StaticDataSrcContainer,
318}