tsubakuro_rust_core/session/
session.rs

1use std::{
2    sync::{atomic::AtomicBool, Arc, RwLock},
3    time::Duration,
4};
5
6use log::{error, trace, warn};
7
8use crate::{
9    error::TgError,
10    illegal_argument_error,
11    job::Job,
12    prelude::{
13        r#type::large_object::{LargeObjectRecvPathMapping, LargeObjectSendPathMapping},
14        Endpoint, ShutdownType,
15    },
16    service::{core::core_service::CoreService, ServiceClient},
17};
18
19use super::{option::ConnectionOption, tcp::connector::TcpConnector, wire::Wire};
20
21/// Represents a connection to Tsurugi server.
22///
23/// Note: Should invoke [`Self::close`] before [`Self::drop`] to dispose the session.
24///
25/// # Examples
26/// ```
27/// use tsubakuro_rust_core::prelude::*;
28///
29/// async fn example() -> Result<(), TgError> {
30///     let mut connection_option = ConnectionOption::new();
31///     connection_option.set_endpoint_url("tcp://localhost:12345");
32///     connection_option.set_application_name("Tsubakuro/Rust example");
33///     connection_option.set_session_label("example session");
34///     connection_option.set_default_timeout(std::time::Duration::from_secs(10));
35///
36///     let session = Session::connect(&connection_option).await?;
37///     let client: SqlClient = session.make_client();
38///
39///     session.close().await;
40///     Ok(())
41/// }
42/// ```
43///
44/// See [SqlClient](crate::prelude::SqlClient).
45#[derive(Debug)]
46pub struct Session {
47    wire: Arc<Wire>,
48    lob_send_path_mapping: LargeObjectSendPathMapping,
49    lob_recv_path_mapping: LargeObjectRecvPathMapping,
50    default_timeout: RwLock<Duration>,
51    shutdowned: AtomicBool,
52    fail_on_drop_error: AtomicBool,
53}
54
55impl Session {
56    /// Establishes a connection to the Tsurugi server.
57    ///
58    /// Note: Should invoke [`Self::close`] before [`Self::drop`] to dispose the session.
59    ///
60    /// # Examples
61    /// ```
62    /// use tsubakuro_rust_core::prelude::*;
63    ///
64    /// async fn example(connection_option: ConnectionOption) -> Result<(), TgError> {
65    ///     let session = Session::connect(&connection_option).await?;
66    ///     let client: SqlClient = session.make_client();
67    ///
68    ///     session.close().await;
69    ///     Ok(())
70    /// }
71    /// ```
72    pub async fn connect(connection_option: &ConnectionOption) -> Result<Arc<Session>, TgError> {
73        let timeout = connection_option.default_timeout();
74        Self::connect_for(connection_option, timeout).await
75    }
76
77    /// Establishes a connection to the Tsurugi server.
78    ///
79    /// Note: Should invoke [`Self::close`] before [`Self::drop`] to dispose the session.
80    pub async fn connect_for(
81        connection_option: &ConnectionOption,
82        timeout: Duration,
83    ) -> Result<Arc<Session>, TgError> {
84        let endpoint = Self::check_endpoint(connection_option)?;
85        let default_timeout = connection_option.default_timeout();
86
87        match endpoint {
88            Endpoint::Tcp(..) => {
89                TcpConnector::connect(connection_option, timeout, default_timeout).await
90            }
91            _ => Err(illegal_argument_error!("unsupported endpoint")),
92        }
93    }
94
95    /// Establishes a connection to the Tsurugi server.
96    ///
97    /// Note: Should invoke [`Self::close`] before [`Self::drop`] to dispose the session.
98    pub async fn connect_async(
99        connection_option: &ConnectionOption,
100    ) -> Result<Job<Arc<Session>>, TgError> {
101        let endpoint = Self::check_endpoint(connection_option)?;
102        let default_timeout = connection_option.default_timeout();
103
104        let job = match endpoint {
105            Endpoint::Tcp(..) => {
106                TcpConnector::connect_async(connection_option, default_timeout).await?
107            }
108            _ => return Err(illegal_argument_error!("unsupported endpoint")),
109        };
110
111        Ok(job)
112    }
113
114    fn check_endpoint(option: &ConnectionOption) -> Result<&Endpoint, TgError> {
115        let endpoint = option
116            .endpoint()
117            .ok_or(illegal_argument_error!("endpoint not specified"))?;
118
119        Ok(endpoint)
120    }
121
122    /// Get user name.
123    ///
124    /// since 0.5.0
125    pub fn user_name(&self) -> Option<String> {
126        self.wire.user_name()
127    }
128
129    /// Checks if the session has an encryption key.
130    ///
131    /// For internal use.
132    ///
133    /// since 0.5.0
134    pub async fn has_encryption_key(&self) -> bool {
135        self.wire.has_encryption_key().await
136    }
137
138    /// Set default timeout.
139    pub fn set_default_timeout(&self, timeout: Duration) {
140        let mut default_timeout = self.default_timeout.write().unwrap();
141        *default_timeout = timeout;
142    }
143
144    /// Get default timeout.
145    pub fn default_timeout(&self) -> Duration {
146        let default_timeout = self.default_timeout.read().unwrap();
147        *default_timeout
148    }
149
150    /// Creates a service client.
151    ///
152    /// # Examples
153    /// ```
154    /// use std::sync::Arc;
155    /// use tsubakuro_rust_core::prelude::*;
156    ///
157    /// fn example(session: &Arc<Session>) {
158    ///     let client: SqlClient = session.make_client();
159    /// }
160    /// ```
161    pub fn make_client<T: ServiceClient>(self: &Arc<Session>) -> T {
162        T::new(self.clone())
163    }
164
165    /// Requests to update the session expiration time.
166    ///
167    /// The resources underlying this session will be disposed after this session was expired.
168    /// To extend the expiration time, clients should continue to send requests in this session, or update expiration time explicitly by using this method.
169    ///
170    /// If the specified expiration time is too long, the server will automatically shorten it to its limit.
171    /// Or, if the time is too short or less than zero, the server sometimes omits the request.
172    pub async fn update_expiration_time(
173        &self,
174        expiration_time: Option<Duration>,
175    ) -> Result<(), TgError> {
176        let timeout = self.default_timeout();
177        self.update_expiration_time_for(expiration_time, timeout)
178            .await
179    }
180
181    /// Requests to update the session expiration time.
182    ///
183    /// The resources underlying this session will be disposed after this session was expired.
184    /// To extend the expiration time, clients should continue to send requests in this session, or update expiration time explicitly by using this method.
185    ///
186    /// If the specified expiration time is too long, the server will automatically shorten it to its limit.
187    /// Or, if the time is too short or less than zero, the server sometimes omits the request.
188    pub async fn update_expiration_time_for(
189        &self,
190        expiration_time: Option<Duration>,
191        timeout: Duration,
192    ) -> Result<(), TgError> {
193        CoreService::update_expiration_time(&self.wire, expiration_time, timeout).await
194    }
195
196    /// Requests to update the session expiration time.
197    ///
198    /// The resources underlying this session will be disposed after this session was expired.
199    /// To extend the expiration time, clients should continue to send requests in this session, or update expiration time explicitly by using this method.
200    ///
201    /// If the specified expiration time is too long, the server will automatically shorten it to its limit.
202    /// Or, if the time is too short or less than zero, the server sometimes omits the request.
203    pub async fn update_expiration_time_async(
204        &self,
205        expiration_time: Option<Duration>,
206    ) -> Result<Job<()>, TgError> {
207        CoreService::update_expiration_time_async(
208            &self.wire,
209            expiration_time,
210            self.default_timeout(),
211            self.fail_on_drop_error(),
212        )
213        .await
214    }
215
216    pub(crate) fn large_object_path_mapping_on_send(&self) -> &LargeObjectSendPathMapping {
217        &self.lob_send_path_mapping
218    }
219
220    pub(crate) fn large_object_path_mapping_on_recv(&self) -> &LargeObjectRecvPathMapping {
221        &self.lob_recv_path_mapping
222    }
223
224    /// Request to shutdown the current session and wait for the running requests were finished.
225    pub async fn shutdown(&self, shutdown_type: ShutdownType) -> Result<(), TgError> {
226        let timeout = self.default_timeout();
227        self.shutdown_for(shutdown_type, timeout).await
228    }
229
230    /// Request to shutdown the current session and wait for the running requests were finished.
231    pub async fn shutdown_for(
232        &self,
233        shutdown_type: ShutdownType,
234        timeout: Duration,
235    ) -> Result<(), TgError> {
236        self.set_shutdown();
237        CoreService::shutdown(&self.wire, shutdown_type, timeout).await
238    }
239
240    /// Request to shutdown the current session and wait for the running requests were finished.
241    pub async fn shutdown_async(&self, shutdown_type: ShutdownType) -> Result<Job<()>, TgError> {
242        self.set_shutdown();
243        CoreService::shutdown_async(
244            &self.wire,
245            shutdown_type,
246            self.default_timeout(),
247            self.fail_on_drop_error(),
248        )
249        .await
250    }
251
252    fn set_shutdown(&self) {
253        self.shutdowned
254            .store(true, std::sync::atomic::Ordering::SeqCst);
255    }
256
257    /// Check if the session is shut down.
258    pub fn is_shutdowned(&self) -> bool {
259        self.shutdowned.load(std::sync::atomic::Ordering::SeqCst)
260    }
261
262    /// Disposes the current session.
263    ///
264    /// This may not wait for complete the ongoing requests, and it may cause the requests may still be in progress after disconnected from the session.
265    /// You can use [Self::shutdown] to safely close this session with waiting for complete the ongoing requests, if any.
266    ///
267    /// Note: Should invoke `close` before [`Self::drop`] to dispose the session.
268    pub async fn close(&self) -> Result<(), TgError> {
269        self.wire.close().await
270    }
271
272    /// Check if the session is closed.
273    pub fn is_closed(&self) -> bool {
274        self.wire.is_closed()
275    }
276
277    /// for debug
278    #[doc(hidden)]
279    pub fn set_fail_on_drop_error(&self, value: bool) {
280        self.fail_on_drop_error
281            .store(value, std::sync::atomic::Ordering::SeqCst);
282    }
283
284    pub(crate) fn fail_on_drop_error(&self) -> bool {
285        self.fail_on_drop_error
286            .load(std::sync::atomic::Ordering::SeqCst)
287    }
288}
289
290impl Session {
291    pub(crate) fn new(
292        wire: Arc<Wire>,
293        connection_option: &ConnectionOption,
294        default_timeout: Duration,
295    ) -> Arc<Self> {
296        let session = Arc::new(Session {
297            wire,
298            lob_send_path_mapping: connection_option
299                .large_object_path_mapping_on_send()
300                .clone(),
301            lob_recv_path_mapping: connection_option
302                .large_object_path_mapping_on_recv()
303                .clone(),
304            default_timeout: RwLock::new(default_timeout),
305            shutdowned: AtomicBool::new(false),
306            fail_on_drop_error: AtomicBool::new(false),
307        });
308
309        let keep_alive = connection_option.keep_alive();
310        if !keep_alive.is_zero() {
311            let wire = session.wire();
312            tokio::spawn(async move {
313                trace!("session.keep_alive start");
314                loop {
315                    tokio::time::sleep(keep_alive).await;
316
317                    if wire.is_closed() {
318                        trace!("session.keep_alive end");
319                        break;
320                    }
321
322                    let result =
323                        CoreService::update_expiration_time(&wire, None, default_timeout).await;
324                    if let Err(error) = result {
325                        error!("session.keep_alive end. {}", error);
326                        break;
327                    }
328                }
329            });
330        }
331
332        session
333    }
334
335    pub(crate) fn wire(&self) -> Arc<Wire> {
336        self.wire.clone()
337    }
338}
339
340impl Drop for Session {
341    fn drop(&mut self) {
342        if self.is_closed() {
343            return;
344        }
345
346        std::thread::scope(|scope| {
347            scope.spawn(move || {
348                trace!("Session.drop() start");
349                let runtime = {
350                    match tokio::runtime::Runtime::new() {
351                        Ok(runtime) => runtime,
352                        Err(e) => {
353                            error!("Session.drop() runtime::new error. {}", e);
354                            if self.fail_on_drop_error() {
355                                panic!("Session.drop() runtime::new error. {}", e);
356                            }
357                            return;
358                        }
359                    }
360                };
361                runtime.block_on(async {
362                    if let Err(e) = self.close().await {
363                        warn!("Session.drop() close error. {}", e);
364                        if self.fail_on_drop_error() {
365                            panic!("Session.drop() close error. {}", e);
366                        }
367                    }
368                });
369                trace!("Session.drop() end");
370            });
371        });
372    }
373}