1use std::marker::PhantomData;
8use std::time::{Duration, Instant};
9
10use async_trait::async_trait;
11use thiserror::Error;
12
13pub mod states {
15 pub struct Acquired;
17
18 pub struct Released;
20
21 pub struct Initializing;
23
24 pub struct Failed;
26}
27
28#[derive(Debug, Error)]
30pub enum ResourceError {
31 #[error("Resource acquisition failed: {0}")]
33 AcquisitionFailed(String),
34
35 #[error("Resource release failed: {0}")]
37 ReleaseFailed(String),
38
39 #[error("Resource is in invalid state: {0}")]
41 InvalidState(String),
42
43 #[error("Resource operation timed out after {duration:?}")]
45 Timeout {
46 duration: Duration,
48 },
49
50 #[error("Resource poisoned: {0}")]
52 Poisoned(String),
53}
54
55pub type ResourceResult<T> = Result<T, ResourceError>;
57
58#[derive(Debug)]
115pub struct Resource<T, S> {
116 inner: T,
117 _state: PhantomData<S>,
118}
119
120impl<T, S> Resource<T, S> {
121 const fn new(inner: T) -> Self {
127 Self {
128 inner,
129 _state: PhantomData,
130 }
131 }
132
133 pub const fn get(&self) -> &T
137 where
138 S: IsAcquired,
139 {
140 &self.inner
141 }
142
143 pub fn get_mut(&mut self) -> &mut T
147 where
148 S: IsAcquired,
149 {
150 &mut self.inner
151 }
152
153 pub fn into_inner(self) -> T
157 where
158 S: IsAcquired,
159 {
160 self.inner
161 }
162}
163
164pub trait IsAcquired: private::Sealed {}
169
170impl IsAcquired for states::Acquired {}
171
172pub trait IsReleasable: private::Sealed {}
176
177impl IsReleasable for states::Acquired {}
178impl IsReleasable for states::Failed {}
179
180pub trait IsRecoverable: private::Sealed {}
184
185impl IsRecoverable for states::Failed {}
186
187mod private {
189 pub trait Sealed {}
190
191 impl Sealed for super::states::Acquired {}
192 impl Sealed for super::states::Released {}
193 impl Sealed for super::states::Initializing {}
194 impl Sealed for super::states::Failed {}
195}
196
197impl<T> Resource<T, states::Initializing> {
199 pub fn mark_acquired(self) -> Resource<T, states::Acquired> {
203 Resource::new(self.inner)
204 }
205
206 pub fn mark_failed(self) -> Resource<T, states::Failed> {
210 Resource::new(self.inner)
211 }
212}
213
214impl<T> Resource<T, states::Acquired> {
215 pub fn release(self) -> ResourceResult<Resource<T, states::Released>> {
219 Ok(Resource::new(self.inner))
222 }
223
224 pub fn mark_failed(self) -> Resource<T, states::Failed> {
228 Resource::new(self.inner)
229 }
230}
231
232impl<T> Resource<T, states::Failed> {
233 pub fn recover(self) -> ResourceResult<Resource<T, states::Acquired>> {
237 Ok(Resource::new(self.inner))
240 }
241
242 pub fn release(self) -> ResourceResult<Resource<T, states::Released>> {
246 Ok(Resource::new(self.inner))
248 }
249}
250
251impl<T> Resource<T, states::Released> {
252 pub const fn is_released(&self) -> bool {
256 true
257 }
258}
259
260#[async_trait]
262pub trait ResourceManager<T> {
263 async fn acquire() -> ResourceResult<Resource<T, states::Acquired>>;
265
266 fn create_initializing(inner: T) -> Resource<T, states::Initializing> {
270 Resource::new(inner)
271 }
272}
273
274pub struct ResourceScope<T> {
279 resource: Option<Resource<T, states::Acquired>>,
280 leaked: bool,
281}
282
283impl<T> ResourceScope<T> {
284 pub const fn new(resource: Resource<T, states::Acquired>) -> Self {
286 Self {
287 resource: Some(resource),
288 leaked: false,
289 }
290 }
291
292 pub fn with_resource<F, R>(&mut self, f: F) -> R
296 where
297 F: FnOnce(&mut Resource<T, states::Acquired>) -> R,
298 {
299 let resource = self
300 .resource
301 .as_mut()
302 .expect("Resource has already been released from scope");
303 f(resource)
304 }
305
306 pub fn release(mut self) -> ResourceResult<()> {
310 if let Some(resource) = self.resource.take() {
311 resource.release()?;
312 }
313 Ok(())
314 }
315
316 pub const fn is_released(&self) -> bool {
318 self.resource.is_none()
319 }
320
321 pub const fn is_leaked(&self) -> bool {
323 self.leaked
324 }
325}
326
327impl<T> Drop for ResourceScope<T> {
328 fn drop(&mut self) {
329 if self.resource.is_some() {
330 self.leaked = true;
331 tracing::error!("ResourceScope dropped without explicit release - resource may leak");
332
333 if let Some(resource) = self.resource.take() {
335 drop(resource);
338 }
339 }
340 }
341}
342
343pub struct TimedResourceGuard<T>
347where
348 T: Send + 'static,
349{
350 resource: Option<Resource<T, states::Acquired>>,
351 timeout: Duration,
352 acquired_at: Instant,
353 cleanup_task: Option<tokio::task::JoinHandle<()>>,
354}
355
356impl<T> TimedResourceGuard<T>
357where
358 T: Send + 'static,
359{
360 pub fn new(resource: Resource<T, states::Acquired>, timeout: Duration) -> Self {
362 let acquired_at = Instant::now();
363
364 Self {
365 resource: Some(resource),
366 timeout,
367 acquired_at,
368 cleanup_task: None,
369 }
370 }
371
372 pub fn new_with_auto_cleanup(resource: Resource<T, states::Acquired>, timeout: Duration) -> Self
374 where
375 T: Send + Sync + 'static,
376 {
377 let acquired_at = Instant::now();
378
379 Self {
383 resource: Some(resource),
384 timeout,
385 acquired_at,
386 cleanup_task: None,
387 }
388 }
389
390 pub fn is_timed_out(&self) -> bool {
392 self.acquired_at.elapsed() > self.timeout
393 }
394
395 pub fn time_remaining(&self) -> Option<Duration> {
397 self.timeout.checked_sub(self.acquired_at.elapsed())
398 }
399
400 pub fn get(&self) -> Option<&T>
402 where
403 T: Send,
404 {
405 if self.is_timed_out() {
406 None
407 } else {
408 self.resource.as_ref().map(Resource::get)
409 }
410 }
411
412 pub fn release(mut self) -> ResourceResult<()> {
414 if let Some(cleanup_task) = self.cleanup_task.take() {
415 cleanup_task.abort();
416 }
417
418 if let Some(resource) = self.resource.take() {
419 if self.is_timed_out() {
420 return Err(ResourceError::Timeout {
421 duration: self.acquired_at.elapsed(),
422 });
423 }
424 resource.release()?;
425 }
426 Ok(())
427 }
428}
429
430impl<T> Drop for TimedResourceGuard<T>
431where
432 T: Send + 'static,
433{
434 fn drop(&mut self) {
435 if let Some(cleanup_task) = self.cleanup_task.take() {
436 cleanup_task.abort();
437 }
438
439 if self.resource.is_some() {
440 if self.is_timed_out() {
441 tracing::error!(
442 "TimedResourceGuard dropped after timeout of {:?} - resource forcibly cleaned up",
443 self.timeout
444 );
445 } else {
446 tracing::warn!("TimedResourceGuard dropped before timeout - resource may leak");
447 }
448 }
449 }
450}
451
452#[derive(Debug, Default)]
454pub struct ResourceLeakDetector {
455 active_resources: std::sync::Mutex<std::collections::HashMap<String, ResourceInfo>>,
456}
457
458#[derive(Debug, Clone)]
459struct ResourceInfo {
460 resource_type: String,
461 acquired_at: Instant,
462 location: Option<String>,
463}
464
465impl ResourceLeakDetector {
466 pub fn new() -> Self {
468 Self::default()
469 }
470
471 pub fn register_acquisition(
473 &self,
474 resource_id: &str,
475 resource_type: &str,
476 location: Option<String>,
477 ) {
478 if let Ok(mut resources) = self.active_resources.lock() {
479 resources.insert(
480 resource_id.to_string(),
481 ResourceInfo {
482 resource_type: resource_type.to_string(),
483 acquired_at: Instant::now(),
484 location,
485 },
486 );
487 }
488 }
489
490 pub fn register_release(&self, resource_id: &str) {
492 if let Ok(mut resources) = self.active_resources.lock() {
493 resources.remove(resource_id);
494 }
495 }
496
497 pub fn get_stats(&self) -> ResourceLeakStats {
499 self.active_resources.lock().map_or_else(
500 |_| ResourceLeakStats::default(),
501 |resources| {
502 let total_count = resources.len();
503 let mut by_type = std::collections::HashMap::new();
504 let mut oldest_age = Duration::ZERO;
505
506 for info in resources.values() {
507 *by_type.entry(info.resource_type.clone()).or_insert(0) += 1;
508 let age = info.acquired_at.elapsed();
509 if age > oldest_age {
510 oldest_age = age;
511 }
512 }
513
514 ResourceLeakStats {
515 total_active: total_count,
516 by_type,
517 oldest_resource_age: oldest_age,
518 }
519 },
520 )
521 }
522
523 pub fn find_potential_leaks(&self, threshold: Duration) -> Vec<String> {
525 self.active_resources.lock().map_or_else(
526 |_| Vec::new(),
527 |resources| {
528 resources
529 .iter()
530 .filter(|(_, info)| info.acquired_at.elapsed() > threshold)
531 .map(|(id, _)| id.clone())
532 .collect()
533 },
534 )
535 }
536}
537
538#[derive(Debug, Default)]
540pub struct ResourceLeakStats {
541 pub total_active: usize,
543 pub by_type: std::collections::HashMap<String, usize>,
545 pub oldest_resource_age: Duration,
547}
548
549static GLOBAL_LEAK_DETECTOR: std::sync::OnceLock<ResourceLeakDetector> = std::sync::OnceLock::new();
551
552pub fn global_leak_detector() -> &'static ResourceLeakDetector {
554 GLOBAL_LEAK_DETECTOR.get_or_init(ResourceLeakDetector::new)
555}
556
557pub struct ManagedResource<T, S> {
562 inner: Option<Resource<T, S>>,
563 resource_id: String,
564 cleanup_registered: bool,
565}
566
567impl<T, S> ManagedResource<T, S> {
568 pub fn new(resource: Resource<T, S>, resource_type: &str) -> Self {
570 let resource_id = format!(
571 "{}_{}",
572 resource_type,
573 uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext))
574 );
575
576 global_leak_detector().register_acquisition(
578 &resource_id,
579 resource_type,
580 Some(format!("{}:{}:{}", file!(), line!(), column!())),
581 );
582
583 Self {
584 inner: Some(resource),
585 resource_id,
586 cleanup_registered: true,
587 }
588 }
589
590 pub const fn get(&self) -> Option<&Resource<T, S>> {
592 self.inner.as_ref()
593 }
594
595 pub fn take(mut self) -> Option<Resource<T, S>> {
597 if self.cleanup_registered {
598 global_leak_detector().register_release(&self.resource_id);
599 self.cleanup_registered = false;
600 }
601 self.inner.take()
602 }
603}
604
605impl<T, S> Drop for ManagedResource<T, S> {
606 fn drop(&mut self) {
607 if self.cleanup_registered {
608 global_leak_detector().register_release(&self.resource_id);
609 }
610
611 if self.inner.is_some() {
612 tracing::debug!("ManagedResource dropped with resource still present");
613 }
614 }
615}
616
617pub trait ResourceExt<T, S>: Sized {
619 fn managed(self, resource_type: &str) -> ManagedResource<T, S>;
621
622 fn scoped(self) -> ResourceScope<T>
624 where
625 S: IsAcquired;
626
627 fn with_timeout(self, timeout: Duration) -> TimedResourceGuard<T>
629 where
630 S: IsAcquired,
631 T: Send + 'static;
632}
633
634impl<T> ResourceExt<T, states::Acquired> for Resource<T, states::Acquired> {
636 fn managed(self, resource_type: &str) -> ManagedResource<T, states::Acquired> {
637 ManagedResource::new(self, resource_type)
638 }
639
640 fn scoped(self) -> ResourceScope<T> {
641 ResourceScope::new(self)
642 }
643
644 fn with_timeout(self, timeout: Duration) -> TimedResourceGuard<T>
645 where
646 T: Send + 'static,
647 {
648 TimedResourceGuard::new(self, timeout)
649 }
650}
651
652impl<T, S> Resource<T, S> {
654 pub fn managed(self, resource_type: &str) -> ManagedResource<T, S> {
656 ManagedResource::new(self, resource_type)
657 }
658}
659
660#[cfg(feature = "postgres")]
662pub mod database {
663 use super::{async_trait, states, Resource, ResourceError, ResourceManager, ResourceResult};
664 use sqlx::{PgPool, Postgres, Transaction};
665 use std::sync::Arc;
666 use std::time::{Duration, Instant};
667
668 pub type DatabasePool = Resource<Arc<PgPool>, states::Acquired>;
670
671 pub type DatabaseTransaction<'a> = Resource<Transaction<'a, Postgres>, states::Acquired>;
673
674 pub type DatabaseConnection = Resource<sqlx::pool::PoolConnection<Postgres>, states::Acquired>;
676
677 pub struct DatabaseResourceManager {
679 pool: Arc<PgPool>,
680 last_health_check: std::sync::Mutex<Instant>,
681 health_check_interval: Duration,
682 }
683
684 impl DatabaseResourceManager {
685 pub fn new(pool: Arc<PgPool>) -> Self {
687 Self {
688 pool,
689 last_health_check: std::sync::Mutex::new(Instant::now()),
690 health_check_interval: Duration::from_secs(30),
691 }
692 }
693
694 pub fn new_with_health_interval(
696 pool: Arc<PgPool>,
697 health_check_interval: Duration,
698 ) -> Self {
699 Self {
700 pool,
701 last_health_check: std::sync::Mutex::new(Instant::now()),
702 health_check_interval,
703 }
704 }
705
706 fn needs_health_check(&self) -> bool {
708 self.last_health_check.lock().map_or(true, |last_check| {
709 last_check.elapsed() > self.health_check_interval
710 })
711 }
712
713 async fn perform_health_check(&self) -> ResourceResult<()> {
715 sqlx::query("SELECT 1")
717 .execute(self.pool.as_ref())
718 .await
719 .map_err(|e| {
720 ResourceError::AcquisitionFailed(format!("Health check failed: {e}"))
721 })?;
722
723 if let Ok(mut last_check) = self.last_health_check.lock() {
725 *last_check = Instant::now();
726 }
727
728 Ok(())
729 }
730 }
731
732 #[async_trait]
733 impl ResourceManager<Arc<PgPool>> for DatabaseResourceManager {
734 async fn acquire() -> ResourceResult<Resource<Arc<PgPool>, states::Acquired>> {
735 Err(ResourceError::AcquisitionFailed(
737 "DatabaseResourceManager::acquire requires instance method".to_string(),
738 ))
739 }
740 }
741
742 impl DatabaseResourceManager {
743 pub async fn acquire_pool(&self) -> ResourceResult<DatabasePool> {
745 if self.needs_health_check() {
747 self.perform_health_check().await?;
748 }
749
750 if self.pool.is_closed() {
752 return Err(ResourceError::AcquisitionFailed(
753 "Connection pool is closed".to_string(),
754 ));
755 }
756
757 Ok(Resource::new(Arc::clone(&self.pool)))
758 }
759
760 pub async fn acquire_connection(&self) -> ResourceResult<DatabaseConnection> {
762 if self.needs_health_check() {
764 self.perform_health_check().await?;
765 }
766
767 let connection = self.pool.acquire().await.map_err(|e| {
769 ResourceError::AcquisitionFailed(format!("Failed to acquire connection: {e}"))
770 })?;
771
772 Ok(Resource::new(connection))
773 }
774
775 pub async fn begin_transaction(&self) -> ResourceResult<DatabaseTransaction<'_>> {
777 if self.needs_health_check() {
779 self.perform_health_check().await?;
780 }
781
782 let transaction = self.pool.begin().await.map_err(|e| {
784 ResourceError::AcquisitionFailed(format!("Failed to begin transaction: {e}"))
785 })?;
786
787 Ok(Resource::new(transaction))
788 }
789 }
790
791 impl DatabasePool {
793 pub async fn execute_query(
797 &self,
798 query: &str,
799 ) -> ResourceResult<sqlx::postgres::PgQueryResult> {
800 sqlx::query(query)
801 .execute(self.get().as_ref())
802 .await
803 .map_err(|e| ResourceError::InvalidState(format!("Query execution failed: {e}")))
804 }
805
806 pub async fn fetch_one(&self, query: &str) -> ResourceResult<sqlx::postgres::PgRow> {
810 sqlx::query(query)
811 .fetch_one(self.get().as_ref())
812 .await
813 .map_err(|e| ResourceError::InvalidState(format!("Query fetch failed: {e}")))
814 }
815
816 pub const fn pool(&self) -> &Arc<PgPool> {
820 self.get()
821 }
822
823 pub fn pool_stats(&self) -> PoolStats {
827 let pool = self.get();
828 PoolStats {
829 size: pool.size(),
830 idle: u32::try_from(pool.num_idle()).unwrap_or(u32::MAX),
831 is_closed: pool.is_closed(),
832 }
833 }
834 }
835
836 impl DatabaseConnection {
838 pub async fn execute_query(
842 &mut self,
843 query: &str,
844 ) -> ResourceResult<sqlx::postgres::PgQueryResult> {
845 sqlx::query(query)
846 .execute(&mut **self.get_mut())
847 .await
848 .map_err(|e| ResourceError::InvalidState(format!("Query execution failed: {e}")))
849 }
850
851 }
855
856 impl DatabaseTransaction<'_> {
858 pub async fn execute_query(
862 &mut self,
863 query: &str,
864 ) -> ResourceResult<sqlx::postgres::PgQueryResult> {
865 sqlx::query(query)
866 .execute(&mut **self.get_mut())
867 .await
868 .map_err(|e| ResourceError::InvalidState(format!("Transaction query failed: {e}")))
869 }
870
871 pub async fn commit(self) -> ResourceResult<Resource<(), states::Released>> {
876 self.into_inner().commit().await.map_err(|e| {
877 ResourceError::ReleaseFailed(format!("Transaction commit failed: {e}"))
878 })?;
879
880 Ok(Resource::new(()))
881 }
882
883 pub async fn rollback(self) -> ResourceResult<Resource<(), states::Released>> {
888 self.into_inner().rollback().await.map_err(|e| {
889 ResourceError::ReleaseFailed(format!("Transaction rollback failed: {e}"))
890 })?;
891
892 Ok(Resource::new(()))
893 }
894 }
895
896 #[derive(Debug, Clone)]
898 pub struct PoolStats {
899 pub size: u32,
901 pub idle: u32,
903 pub is_closed: bool,
905 }
906
907 pub struct DatabaseResourceFactory;
909
910 impl DatabaseResourceFactory {
911 pub fn from_pool(pool: Arc<PgPool>) -> DatabaseResourceManager {
913 DatabaseResourceManager::new(pool)
914 }
915
916 pub fn from_pool_with_health_interval(
918 pool: Arc<PgPool>,
919 health_check_interval: Duration,
920 ) -> DatabaseResourceManager {
921 DatabaseResourceManager::new_with_health_interval(pool, health_check_interval)
922 }
923 }
924}
925
926pub mod locking {
928 use super::{states, PhantomData, Resource, ResourceError, ResourceResult};
929 use std::sync::{Arc, Mutex, MutexGuard};
930
931 pub type MutexResource<T> = Resource<Arc<Mutex<T>>, states::Acquired>;
933
934 pub struct MutexGuardResource<'a, T> {
938 guard: MutexGuard<'a, T>,
939 _phantom: PhantomData<states::Acquired>,
940 }
941
942 impl<'a, T> MutexGuardResource<'a, T> {
943 const fn new(guard: MutexGuard<'a, T>) -> Self {
945 Self {
946 guard,
947 _phantom: PhantomData,
948 }
949 }
950
951 pub fn get(&self) -> &T {
953 &self.guard
954 }
955
956 pub fn get_mut(&mut self) -> &mut T {
958 &mut self.guard
959 }
960 }
961
962 impl<T> MutexResource<T> {
964 pub fn lock(&self) -> ResourceResult<MutexGuardResource<'_, T>> {
968 let guard = self
969 .get()
970 .lock()
971 .map_err(|e| ResourceError::Poisoned(format!("Mutex poisoned: {e}")))?;
972 Ok(MutexGuardResource::new(guard))
973 }
974
975 pub fn try_lock(&self) -> ResourceResult<Option<MutexGuardResource<'_, T>>> {
979 match self.get().try_lock() {
980 Ok(guard) => Ok(Some(MutexGuardResource::new(guard))),
981 Err(std::sync::TryLockError::WouldBlock) => Ok(None),
982 Err(std::sync::TryLockError::Poisoned(e)) => {
983 Err(ResourceError::Poisoned(format!("Mutex poisoned: {e}")))
984 }
985 }
986 }
987 }
988
989 pub fn create_mutex_resource<T>(data: T) -> MutexResource<T> {
991 Resource::new(Arc::new(Mutex::new(data)))
992 }
993}
994
995#[cfg(test)]
996mod tests {
997 use super::states::*;
998 use super::{
999 global_leak_detector, locking, IsAcquired, IsReleasable, ManagedResource, Resource,
1000 ResourceError, ResourceExt, ResourceLeakDetector, ResourceScope, TimedResourceGuard,
1001 };
1002 use std::sync::atomic::{AtomicUsize, Ordering};
1003 use std::sync::Arc;
1004 use std::time::Duration;
1005 use tokio::time::{sleep, timeout};
1006
1007 #[test]
1008 fn test_resource_state_transitions() {
1009 let initializing = Resource::<String, Initializing>::new("test".to_string());
1011
1012 let acquired = initializing.mark_acquired();
1014 assert_eq!(acquired.get(), "test");
1015
1016 let failed = acquired.mark_failed();
1018
1019 let recovered = failed.recover().unwrap();
1021 assert_eq!(recovered.get(), "test");
1022
1023 let released = recovered.release().unwrap();
1025 assert!(released.is_released());
1026 }
1027
1028 #[test]
1029 fn test_resource_scope() {
1030 let resource = Resource::<String, Acquired>::new("test".to_string());
1031 let mut scope = ResourceScope::new(resource);
1032
1033 scope.with_resource(|r| {
1035 assert_eq!(r.get(), "test");
1036 });
1037
1038 assert!(!scope.is_released());
1040 assert!(!scope.is_leaked());
1041
1042 scope.release().unwrap();
1044 }
1045
1046 #[test]
1047 fn test_resource_scope_automatic_cleanup() {
1048 let leaked_flag = Arc::new(AtomicUsize::new(0));
1049
1050 {
1052 let resource = Resource::<String, Acquired>::new("test".to_string());
1053 let _scope = ResourceScope::new(resource);
1054
1055 leaked_flag.store(1, Ordering::SeqCst);
1057
1058 }
1060
1061 assert_eq!(leaked_flag.load(Ordering::SeqCst), 1);
1063 }
1064
1065 #[tokio::test]
1066 async fn test_timed_resource_guard() {
1067 let resource = Resource::<String, Acquired>::new("test".to_string());
1068 let timeout_duration = Duration::from_millis(100);
1069 let guard = TimedResourceGuard::new(resource, timeout_duration);
1070
1071 assert!(guard.get().is_some());
1073 assert!(!guard.is_timed_out());
1074 assert!(guard.time_remaining().is_some());
1075
1076 sleep(Duration::from_millis(150)).await;
1078
1079 assert!(guard.is_timed_out());
1081 assert!(guard.get().is_none());
1082 assert!(guard.time_remaining().is_none());
1083
1084 match guard.release() {
1086 Err(ResourceError::Timeout { .. }) => {
1087 }
1089 other => panic!("Expected timeout error, got: {other:?}"),
1090 }
1091 }
1092
1093 #[tokio::test]
1094 async fn test_timed_resource_guard_early_release() {
1095 let resource = Resource::<String, Acquired>::new("test".to_string());
1096 let guard = TimedResourceGuard::new(resource, Duration::from_secs(10));
1097
1098 assert!(guard.get().is_some());
1100 assert!(!guard.is_timed_out());
1101
1102 guard.release().unwrap();
1104 }
1105
1106 #[test]
1107 fn test_mutex_resource() {
1108 let mutex_resource = locking::create_mutex_resource(42i32);
1109
1110 let guard = mutex_resource.lock().unwrap();
1112 assert_eq!(*guard.get(), 42);
1113
1114 assert!(mutex_resource.try_lock().unwrap().is_none()); drop(guard);
1119
1120 assert!(mutex_resource.try_lock().unwrap().is_some());
1122 }
1123
1124 #[test]
1125 fn test_mutex_resource_mutable_access() {
1126 let mutex_resource = locking::create_mutex_resource(42i32);
1127
1128 {
1130 let mut guard = mutex_resource.lock().unwrap();
1131 *guard.get_mut() = 100;
1132 }
1133
1134 assert_eq!(*mutex_resource.lock().unwrap().get(), 100);
1136 }
1137
1138 #[test]
1139 fn test_resource_leak_detector() {
1140 let detector = ResourceLeakDetector::new();
1141
1142 let initial_stats = detector.get_stats();
1144 assert_eq!(initial_stats.total_active, 0);
1145 assert!(initial_stats.by_type.is_empty());
1146
1147 detector.register_acquisition("res1", "DatabasePool", Some("test_location".to_string()));
1149 detector.register_acquisition("res2", "DatabasePool", None);
1150 detector.register_acquisition("res3", "MutexLock", None);
1151
1152 let stats = detector.get_stats();
1153 assert_eq!(stats.total_active, 3);
1154 assert_eq!(stats.by_type.get("DatabasePool"), Some(&2));
1155 assert_eq!(stats.by_type.get("MutexLock"), Some(&1));
1156
1157 detector.register_release("res1");
1159
1160 let stats = detector.get_stats();
1161 assert_eq!(stats.total_active, 2);
1162 assert_eq!(stats.by_type.get("DatabasePool"), Some(&1));
1163
1164 let leaks = detector.find_potential_leaks(Duration::from_secs(1));
1166 assert!(leaks.is_empty());
1167
1168 detector.register_release("res2");
1170 detector.register_release("res3");
1171
1172 let final_stats = detector.get_stats();
1173 assert_eq!(final_stats.total_active, 0);
1174 }
1175
1176 #[test]
1177 fn test_global_leak_detector() {
1178 let initial_count = global_leak_detector().get_stats().total_active;
1179
1180 global_leak_detector().register_acquisition("global_test", "TestResource", None);
1182
1183 let stats = global_leak_detector().get_stats();
1184 assert_eq!(stats.total_active, initial_count + 1);
1185
1186 global_leak_detector().register_release("global_test");
1188
1189 let final_stats = global_leak_detector().get_stats();
1190 assert_eq!(final_stats.total_active, initial_count);
1191 }
1192
1193 #[test]
1194 fn test_managed_resource() {
1195 let initial_count = global_leak_detector().get_stats().total_active;
1196
1197 let resource = Resource::<String, Acquired>::new("test".to_string());
1199 let managed = ManagedResource::new(resource, "TestResource");
1200
1201 let stats = global_leak_detector().get_stats();
1203 assert!(stats.total_active > initial_count);
1204 assert!(stats.by_type.contains_key("TestResource"));
1205
1206 assert!(managed.get().is_some());
1208
1209 let taken = managed.take();
1211 assert!(taken.is_some());
1212
1213 let final_stats = global_leak_detector().get_stats();
1215 assert_eq!(final_stats.total_active, initial_count);
1216 }
1217
1218 #[test]
1219 fn test_managed_resource_drop_cleanup() {
1220 let initial_count = global_leak_detector().get_stats().total_active;
1221
1222 {
1224 let resource = Resource::<String, Acquired>::new("test".to_string());
1225 let _managed = ManagedResource::new(resource, "TestResource");
1226
1227 let stats = global_leak_detector().get_stats();
1229 assert!(stats.total_active > initial_count);
1230 } let final_stats = global_leak_detector().get_stats();
1234 assert_eq!(final_stats.total_active, initial_count);
1235 }
1236
1237 #[test]
1238 fn test_resource_extension_traits() {
1239 let resource = Resource::<String, Acquired>::new("test".to_string());
1240
1241 let managed = resource.managed("TestResource");
1243 assert!(managed.get().is_some());
1244
1245 let resource2 = Resource::<String, Acquired>::new("test2".to_string());
1246
1247 let scope = resource2.scoped();
1249 assert!(!scope.is_released());
1250
1251 let resource3 = Resource::<String, Acquired>::new("test3".to_string());
1252
1253 let timed = resource3.with_timeout(Duration::from_secs(1));
1255 assert!(timed.get().is_some());
1256 }
1257
1258 #[tokio::test]
1259 async fn test_resource_state_machine_invalid_transitions() {
1260 let released = Resource::<String, Released>::new("test".to_string());
1263 assert!(released.is_released());
1264
1265 }
1270
1271 #[tokio::test]
1272 async fn test_concurrent_resource_access() {
1273 let resource = locking::create_mutex_resource(0i32);
1274 let resource = Arc::new(resource);
1275
1276 let mut handles = vec![];
1278 for _ in 0..10 {
1279 let resource_clone = resource.clone();
1280 let handle = tokio::spawn(async move {
1281 let mut guard = resource_clone.lock().unwrap();
1282 let current = *guard.get();
1283 *guard.get_mut() = current + 1;
1284 });
1286 handles.push(handle);
1287 }
1288
1289 for handle in handles {
1291 handle.await.unwrap();
1292 }
1293
1294 assert_eq!(*resource.lock().unwrap().get(), 10);
1296 }
1297
1298 #[tokio::test]
1299 async fn test_resource_timeout_behavior() {
1300 let resource = Resource::<String, Acquired>::new("test".to_string());
1301 let guard = TimedResourceGuard::new(resource, Duration::from_millis(50));
1302
1303 let result = timeout(Duration::from_millis(100), async {
1305 while !guard.is_timed_out() {
1306 sleep(Duration::from_millis(10)).await;
1307 }
1308 })
1309 .await;
1310
1311 assert!(result.is_ok(), "Timeout should have occurred within 100ms");
1312 assert!(guard.is_timed_out());
1313 }
1314
1315 #[test]
1316 fn test_resource_error_types() {
1317 let acquisition_error = ResourceError::AcquisitionFailed("test".to_string());
1319 assert!(matches!(
1320 acquisition_error,
1321 ResourceError::AcquisitionFailed(_)
1322 ));
1323
1324 let release_error = ResourceError::ReleaseFailed("test".to_string());
1325 assert!(matches!(release_error, ResourceError::ReleaseFailed(_)));
1326
1327 let invalid_state_error = ResourceError::InvalidState("test".to_string());
1328 assert!(matches!(
1329 invalid_state_error,
1330 ResourceError::InvalidState(_)
1331 ));
1332
1333 let timeout_error = ResourceError::Timeout {
1334 duration: Duration::from_secs(1),
1335 };
1336 assert!(matches!(timeout_error, ResourceError::Timeout { .. }));
1337
1338 let poisoned_error = ResourceError::Poisoned("test".to_string());
1339 assert!(matches!(poisoned_error, ResourceError::Poisoned(_)));
1340 }
1341
1342 #[test]
1343 fn test_sealed_trait_pattern() {
1344 fn test_acquired<S: IsAcquired>(_state: std::marker::PhantomData<S>) {
1348 }
1350
1351 fn test_releasable<S: IsReleasable>(_state: std::marker::PhantomData<S>) {
1352 }
1354
1355 test_acquired(std::marker::PhantomData::<Acquired>);
1357 test_releasable(std::marker::PhantomData::<Acquired>);
1358 test_releasable(std::marker::PhantomData::<Failed>);
1359
1360 }
1364
1365 #[test]
1366 fn test_compilation_errors() {
1367 let _resource = Resource::<String, Released>::new("test".to_string());
1372 }
1380}
1381
1382pub mod integration {
1384 use super::{states, Resource};
1385
1386 pub type EventStoreResource<ES> = Resource<ES, states::Acquired>;
1388
1389 pub type SubscriptionResource<S> = Resource<S, states::Acquired>;
1391
1392 pub trait EventStoreResourceExt<ES> {
1394 fn into_resource(self) -> EventStoreResource<ES>;
1396 }
1397
1398 impl<ES> EventStoreResourceExt<ES> for ES {
1399 fn into_resource(self) -> EventStoreResource<ES> {
1400 Resource::new(self)
1401 }
1402 }
1403
1404 pub trait SubscriptionResourceExt<S> {
1406 fn into_resource(self) -> SubscriptionResource<S>;
1408 }
1409
1410 impl<S> SubscriptionResourceExt<S> for S {
1411 fn into_resource(self) -> SubscriptionResource<S> {
1412 Resource::new(self)
1413 }
1414 }
1415}