tsubakuro_rust_core/session/
session.rs1use std::{
2 sync::{atomic::AtomicBool, Arc, RwLock},
3 time::Duration,
4};
5
6use log::{error, trace, warn};
7
8use crate::{
9 error::TgError,
10 illegal_argument_error,
11 job::Job,
12 prelude::{
13 r#type::large_object::{LargeObjectRecvPathMapping, LargeObjectSendPathMapping},
14 Endpoint, ShutdownType,
15 },
16 service::{core::core_service::CoreService, ServiceClient},
17};
18
19use super::{option::ConnectionOption, tcp::connector::TcpConnector, wire::Wire};
20
21#[derive(Debug)]
46pub struct Session {
47 wire: Arc<Wire>,
48 lob_send_path_mapping: LargeObjectSendPathMapping,
49 lob_recv_path_mapping: LargeObjectRecvPathMapping,
50 default_timeout: RwLock<Duration>,
51 shutdowned: AtomicBool,
52 fail_on_drop_error: AtomicBool,
53}
54
55impl Session {
56 pub async fn connect(connection_option: &ConnectionOption) -> Result<Arc<Session>, TgError> {
73 let timeout = connection_option.default_timeout();
74 Self::connect_for(connection_option, timeout).await
75 }
76
77 pub async fn connect_for(
81 connection_option: &ConnectionOption,
82 timeout: Duration,
83 ) -> Result<Arc<Session>, TgError> {
84 let endpoint = Self::check_endpoint(connection_option)?;
85 let default_timeout = connection_option.default_timeout();
86
87 match endpoint {
88 Endpoint::Tcp(..) => {
89 TcpConnector::connect(connection_option, timeout, default_timeout).await
90 }
91 _ => Err(illegal_argument_error!("unsupported endpoint")),
92 }
93 }
94
95 pub async fn connect_async(
99 connection_option: &ConnectionOption,
100 ) -> Result<Job<Arc<Session>>, TgError> {
101 let endpoint = Self::check_endpoint(connection_option)?;
102 let default_timeout = connection_option.default_timeout();
103
104 let job = match endpoint {
105 Endpoint::Tcp(..) => {
106 TcpConnector::connect_async(connection_option, default_timeout).await?
107 }
108 _ => return Err(illegal_argument_error!("unsupported endpoint")),
109 };
110
111 Ok(job)
112 }
113
114 fn check_endpoint(option: &ConnectionOption) -> Result<&Endpoint, TgError> {
115 let endpoint = option
116 .endpoint()
117 .ok_or(illegal_argument_error!("endpoint not specified"))?;
118
119 Ok(endpoint)
120 }
121
122 pub fn user_name(&self) -> Option<String> {
126 self.wire.user_name()
127 }
128
129 pub async fn has_encryption_key(&self) -> bool {
135 self.wire.has_encryption_key().await
136 }
137
138 pub fn set_default_timeout(&self, timeout: Duration) {
140 let mut default_timeout = self.default_timeout.write().unwrap();
141 *default_timeout = timeout;
142 }
143
144 pub fn default_timeout(&self) -> Duration {
146 let default_timeout = self.default_timeout.read().unwrap();
147 *default_timeout
148 }
149
150 pub fn make_client<T: ServiceClient>(self: &Arc<Session>) -> T {
162 T::new(self.clone())
163 }
164
165 pub async fn update_expiration_time(
173 &self,
174 expiration_time: Option<Duration>,
175 ) -> Result<(), TgError> {
176 let timeout = self.default_timeout();
177 self.update_expiration_time_for(expiration_time, timeout)
178 .await
179 }
180
181 pub async fn update_expiration_time_for(
189 &self,
190 expiration_time: Option<Duration>,
191 timeout: Duration,
192 ) -> Result<(), TgError> {
193 CoreService::update_expiration_time(&self.wire, expiration_time, timeout).await
194 }
195
196 pub async fn update_expiration_time_async(
204 &self,
205 expiration_time: Option<Duration>,
206 ) -> Result<Job<()>, TgError> {
207 CoreService::update_expiration_time_async(
208 &self.wire,
209 expiration_time,
210 self.default_timeout(),
211 self.fail_on_drop_error(),
212 )
213 .await
214 }
215
216 pub(crate) fn large_object_path_mapping_on_send(&self) -> &LargeObjectSendPathMapping {
217 &self.lob_send_path_mapping
218 }
219
220 pub(crate) fn large_object_path_mapping_on_recv(&self) -> &LargeObjectRecvPathMapping {
221 &self.lob_recv_path_mapping
222 }
223
224 pub async fn shutdown(&self, shutdown_type: ShutdownType) -> Result<(), TgError> {
226 let timeout = self.default_timeout();
227 self.shutdown_for(shutdown_type, timeout).await
228 }
229
230 pub async fn shutdown_for(
232 &self,
233 shutdown_type: ShutdownType,
234 timeout: Duration,
235 ) -> Result<(), TgError> {
236 self.set_shutdown();
237 CoreService::shutdown(&self.wire, shutdown_type, timeout).await
238 }
239
240 pub async fn shutdown_async(&self, shutdown_type: ShutdownType) -> Result<Job<()>, TgError> {
242 self.set_shutdown();
243 CoreService::shutdown_async(
244 &self.wire,
245 shutdown_type,
246 self.default_timeout(),
247 self.fail_on_drop_error(),
248 )
249 .await
250 }
251
252 fn set_shutdown(&self) {
253 self.shutdowned
254 .store(true, std::sync::atomic::Ordering::SeqCst);
255 }
256
257 pub fn is_shutdowned(&self) -> bool {
259 self.shutdowned.load(std::sync::atomic::Ordering::SeqCst)
260 }
261
262 pub async fn close(&self) -> Result<(), TgError> {
269 self.wire.close().await
270 }
271
272 pub fn is_closed(&self) -> bool {
274 self.wire.is_closed()
275 }
276
277 #[doc(hidden)]
279 pub fn set_fail_on_drop_error(&self, value: bool) {
280 self.fail_on_drop_error
281 .store(value, std::sync::atomic::Ordering::SeqCst);
282 }
283
284 pub(crate) fn fail_on_drop_error(&self) -> bool {
285 self.fail_on_drop_error
286 .load(std::sync::atomic::Ordering::SeqCst)
287 }
288}
289
290impl Session {
291 pub(crate) fn new(
292 wire: Arc<Wire>,
293 connection_option: &ConnectionOption,
294 default_timeout: Duration,
295 ) -> Arc<Self> {
296 let session = Arc::new(Session {
297 wire,
298 lob_send_path_mapping: connection_option
299 .large_object_path_mapping_on_send()
300 .clone(),
301 lob_recv_path_mapping: connection_option
302 .large_object_path_mapping_on_recv()
303 .clone(),
304 default_timeout: RwLock::new(default_timeout),
305 shutdowned: AtomicBool::new(false),
306 fail_on_drop_error: AtomicBool::new(false),
307 });
308
309 let keep_alive = connection_option.keep_alive();
310 if !keep_alive.is_zero() {
311 let wire = session.wire();
312 tokio::spawn(async move {
313 trace!("session.keep_alive start");
314 loop {
315 tokio::time::sleep(keep_alive).await;
316
317 if wire.is_closed() {
318 trace!("session.keep_alive end");
319 break;
320 }
321
322 let result =
323 CoreService::update_expiration_time(&wire, None, default_timeout).await;
324 if let Err(error) = result {
325 error!("session.keep_alive end. {}", error);
326 break;
327 }
328 }
329 });
330 }
331
332 session
333 }
334
335 pub(crate) fn wire(&self) -> Arc<Wire> {
336 self.wire.clone()
337 }
338}
339
340impl Drop for Session {
341 fn drop(&mut self) {
342 if self.is_closed() {
343 return;
344 }
345
346 std::thread::scope(|scope| {
347 scope.spawn(move || {
348 trace!("Session.drop() start");
349 let runtime = {
350 match tokio::runtime::Runtime::new() {
351 Ok(runtime) => runtime,
352 Err(e) => {
353 error!("Session.drop() runtime::new error. {}", e);
354 if self.fail_on_drop_error() {
355 panic!("Session.drop() runtime::new error. {}", e);
356 }
357 return;
358 }
359 }
360 };
361 runtime.block_on(async {
362 if let Err(e) = self.close().await {
363 warn!("Session.drop() close error. {}", e);
364 if self.fail_on_drop_error() {
365 panic!("Session.drop() close error. {}", e);
366 }
367 }
368 });
369 trace!("Session.drop() end");
370 });
371 });
372 }
373}