eventcore/
resource.rs

1//! Resource acquisition and release with phantom type safety
2//!
3//! This module provides compile-time guarantees for resource lifecycle management.
4//! Resources must be acquired before use and cannot be used after release.
5//! The type system prevents use-after-release and double-release errors.
6
7use std::marker::PhantomData;
8use std::time::{Duration, Instant};
9
10use async_trait::async_trait;
11use thiserror::Error;
12
13/// Phantom type markers for resource states
14pub mod states {
15    /// Resource has been acquired and is ready for use
16    pub struct Acquired;
17
18    /// Resource has been released and cannot be used
19    pub struct Released;
20
21    /// Resource is in an intermediate state (e.g., during initialization)
22    pub struct Initializing;
23
24    /// Resource has failed and requires recovery
25    pub struct Failed;
26}
27
28/// Errors that can occur during resource operations
29#[derive(Debug, Error)]
30pub enum ResourceError {
31    /// Resource acquisition failed
32    #[error("Resource acquisition failed: {0}")]
33    AcquisitionFailed(String),
34
35    /// Resource release failed
36    #[error("Resource release failed: {0}")]
37    ReleaseFailed(String),
38
39    /// Resource is in invalid state for operation
40    #[error("Resource is in invalid state: {0}")]
41    InvalidState(String),
42
43    /// Resource operation timed out
44    #[error("Resource operation timed out after {duration:?}")]
45    Timeout {
46        /// The duration after which the operation timed out
47        duration: Duration,
48    },
49
50    /// Resource has been poisoned due to panic
51    #[error("Resource poisoned: {0}")]
52    Poisoned(String),
53}
54
55/// Type alias for resource operation results
56pub type ResourceResult<T> = Result<T, ResourceError>;
57
58/// A resource that enforces acquisition and release through the type system
59///
60/// # Type Parameters
61/// * `T` - The underlying resource type
62/// * `S` - The current state of the resource (phantom type)
63///
64/// # Example
65/// ```rust,no_run
66/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
67/// use eventcore::resource::{Resource, states, ResourceManager};
68/// use std::sync::Arc;
69///
70/// // Example with database pool resource (requires postgres feature)
71/// // This would typically be used with eventcore-postgres crate:
72/// // ```rust,ignore
73/// // use eventcore::resource::database::{DatabaseResourceManager, DatabasePool};
74/// // use sqlx::PgPool;
75/// //
76/// // let pool = Arc::new(PgPool::connect("postgres://localhost/mydb").await?);
77/// // let manager = DatabaseResourceManager::new(pool);
78/// // let db_resource = manager.acquire_pool().await?;
79/// // let result = db_resource.execute_query("SELECT 1").await?;
80/// // let _released = db_resource.release()?;
81/// // ```
82///
83/// // Example with a custom resource type
84/// struct MyResource {
85///     data: String,
86/// }
87///
88/// // Implement ResourceManager for your type
89/// struct MyResourceManager;
90///
91/// #[async_trait::async_trait]
92/// impl ResourceManager<MyResource> for MyResourceManager {
93///     async fn acquire() -> Result<Resource<MyResource, states::Acquired>, eventcore::resource::ResourceError> {
94///         // In practice, you'd acquire from a pool or create the resource
95///         let resource = Self::create_initializing(MyResource { data: "example".to_string() });
96///         Ok(resource.mark_acquired())
97///     }
98/// }
99///
100/// // Use the resource manager to acquire a resource
101/// let resource = MyResourceManager::acquire().await?;
102///
103/// // Access the inner data (only possible in Acquired state)
104/// let data = resource.get();
105/// println!("Resource data: {}", data.data);
106///
107/// // Transition to released state
108/// let _released: Resource<MyResource, states::Released> = resource.release()?;
109/// // released resource cannot be used anymore (compile-time guarantee)
110///
111/// # Ok(())
112/// # }
113/// ```
114#[derive(Debug)]
115pub struct Resource<T, S> {
116    inner: T,
117    _state: PhantomData<S>,
118}
119
120impl<T, S> Resource<T, S> {
121    /// Create a new resource in the given state
122    ///
123    /// # Safety
124    /// This is an internal method and should only be called by the ResourceManager
125    /// with appropriate state validation.
126    const fn new(inner: T) -> Self {
127        Self {
128            inner,
129            _state: PhantomData,
130        }
131    }
132
133    /// Get a reference to the underlying resource
134    ///
135    /// Only available when resource is acquired
136    pub const fn get(&self) -> &T
137    where
138        S: IsAcquired,
139    {
140        &self.inner
141    }
142
143    /// Get a mutable reference to the underlying resource
144    ///
145    /// Only available when resource is acquired
146    pub fn get_mut(&mut self) -> &mut T
147    where
148        S: IsAcquired,
149    {
150        &mut self.inner
151    }
152
153    /// Consume the resource and return the inner value
154    ///
155    /// Only available when resource is acquired
156    pub fn into_inner(self) -> T
157    where
158        S: IsAcquired,
159    {
160        self.inner
161    }
162}
163
164/// Type-level marker trait for acquired resource states
165///
166/// This trait is sealed and can only be implemented for states that represent
167/// an acquired resource (e.g., `Acquired` but not `Released`)
168pub trait IsAcquired: private::Sealed {}
169
170impl IsAcquired for states::Acquired {}
171
172/// Type-level marker trait for releasable resource states
173///
174/// This trait determines which states allow resource release operations
175pub trait IsReleasable: private::Sealed {}
176
177impl IsReleasable for states::Acquired {}
178impl IsReleasable for states::Failed {}
179
180/// Type-level marker trait for recoverable resource states
181///
182/// This trait determines which states allow resource recovery operations
183pub trait IsRecoverable: private::Sealed {}
184
185impl IsRecoverable for states::Failed {}
186
187// Sealed trait pattern to prevent external implementations
188mod private {
189    pub trait Sealed {}
190
191    impl Sealed for super::states::Acquired {}
192    impl Sealed for super::states::Released {}
193    impl Sealed for super::states::Initializing {}
194    impl Sealed for super::states::Failed {}
195}
196
197/// State transitions for resources
198impl<T> Resource<T, states::Initializing> {
199    /// Transition from initializing to acquired state
200    ///
201    /// This represents successful resource acquisition
202    pub fn mark_acquired(self) -> Resource<T, states::Acquired> {
203        Resource::new(self.inner)
204    }
205
206    /// Transition from initializing to failed state
207    ///
208    /// This represents failed resource acquisition
209    pub fn mark_failed(self) -> Resource<T, states::Failed> {
210        Resource::new(self.inner)
211    }
212}
213
214impl<T> Resource<T, states::Acquired> {
215    /// Release the resource, transitioning to released state
216    ///
217    /// After release, the resource cannot be used anymore
218    pub fn release(self) -> ResourceResult<Resource<T, states::Released>> {
219        // Perform any cleanup logic here
220        // For now, we just transition the state
221        Ok(Resource::new(self.inner))
222    }
223
224    /// Mark resource as failed due to an error
225    ///
226    /// Failed resources can be recovered or released
227    pub fn mark_failed(self) -> Resource<T, states::Failed> {
228        Resource::new(self.inner)
229    }
230}
231
232impl<T> Resource<T, states::Failed> {
233    /// Attempt to recover a failed resource
234    ///
235    /// If successful, transitions back to acquired state
236    pub fn recover(self) -> ResourceResult<Resource<T, states::Acquired>> {
237        // Perform recovery logic here
238        // For now, we optimistically assume recovery succeeds
239        Ok(Resource::new(self.inner))
240    }
241
242    /// Release a failed resource
243    ///
244    /// This allows cleanup even when the resource is in a failed state
245    pub fn release(self) -> ResourceResult<Resource<T, states::Released>> {
246        // Perform cleanup of failed resource
247        Ok(Resource::new(self.inner))
248    }
249}
250
251impl<T> Resource<T, states::Released> {
252    /// Check if resource has been released
253    ///
254    /// This is always true for resources in Released state
255    pub const fn is_released(&self) -> bool {
256        true
257    }
258}
259
260/// Trait for types that can manage resource acquisition and release
261#[async_trait]
262pub trait ResourceManager<T> {
263    /// Acquire a resource, returning it in an acquired state
264    async fn acquire() -> ResourceResult<Resource<T, states::Acquired>>;
265
266    /// Create a resource in initializing state
267    ///
268    /// Callers must transition to acquired or failed state
269    fn create_initializing(inner: T) -> Resource<T, states::Initializing> {
270        Resource::new(inner)
271    }
272}
273
274/// Scoped resource management with automatic cleanup
275///
276/// This ensures resources are always released when the scope ends,
277/// even if an error occurs. Supports both async and sync cleanup.
278pub struct ResourceScope<T> {
279    resource: Option<Resource<T, states::Acquired>>,
280    leaked: bool,
281}
282
283impl<T> ResourceScope<T> {
284    /// Create a new resource scope with an acquired resource
285    pub const fn new(resource: Resource<T, states::Acquired>) -> Self {
286        Self {
287            resource: Some(resource),
288            leaked: false,
289        }
290    }
291
292    /// Access the resource within the scope
293    ///
294    /// Panics if the resource has already been released
295    pub fn with_resource<F, R>(&mut self, f: F) -> R
296    where
297        F: FnOnce(&mut Resource<T, states::Acquired>) -> R,
298    {
299        let resource = self
300            .resource
301            .as_mut()
302            .expect("Resource has already been released from scope");
303        f(resource)
304    }
305
306    /// Manually release the resource from the scope
307    ///
308    /// If not called, the resource will be automatically released when dropped
309    pub fn release(mut self) -> ResourceResult<()> {
310        if let Some(resource) = self.resource.take() {
311            resource.release()?;
312        }
313        Ok(())
314    }
315
316    /// Check if the resource has been released
317    pub const fn is_released(&self) -> bool {
318        self.resource.is_none()
319    }
320
321    /// Check if the resource was leaked (dropped without explicit release)
322    pub const fn is_leaked(&self) -> bool {
323        self.leaked
324    }
325}
326
327impl<T> Drop for ResourceScope<T> {
328    fn drop(&mut self) {
329        if self.resource.is_some() {
330            self.leaked = true;
331            tracing::error!("ResourceScope dropped without explicit release - resource may leak");
332
333            // Attempt to perform emergency cleanup if the resource supports it
334            if let Some(resource) = self.resource.take() {
335                // Try to release synchronously if possible
336                // Note: This is a best-effort cleanup for resources that don't require async release
337                drop(resource);
338            }
339        }
340    }
341}
342
343/// Timeout-based resource cleanup guard
344///
345/// Automatically releases resources after a specified timeout if not explicitly released
346pub struct TimedResourceGuard<T>
347where
348    T: Send + 'static,
349{
350    resource: Option<Resource<T, states::Acquired>>,
351    timeout: Duration,
352    acquired_at: Instant,
353    cleanup_task: Option<tokio::task::JoinHandle<()>>,
354}
355
356impl<T> TimedResourceGuard<T>
357where
358    T: Send + 'static,
359{
360    /// Create a new timed resource guard
361    pub fn new(resource: Resource<T, states::Acquired>, timeout: Duration) -> Self {
362        let acquired_at = Instant::now();
363
364        Self {
365            resource: Some(resource),
366            timeout,
367            acquired_at,
368            cleanup_task: None,
369        }
370    }
371
372    /// Create a new timed resource guard with automatic cleanup task
373    pub fn new_with_auto_cleanup(resource: Resource<T, states::Acquired>, timeout: Duration) -> Self
374    where
375        T: Send + Sync + 'static,
376    {
377        let acquired_at = Instant::now();
378
379        // Note: In a real implementation, we'd need a way to cancel the cleanup task
380        // when the resource is manually released. This is a simplified version.
381
382        Self {
383            resource: Some(resource),
384            timeout,
385            acquired_at,
386            cleanup_task: None,
387        }
388    }
389
390    /// Check if the resource has timed out
391    pub fn is_timed_out(&self) -> bool {
392        self.acquired_at.elapsed() > self.timeout
393    }
394
395    /// Get time remaining before timeout
396    pub fn time_remaining(&self) -> Option<Duration> {
397        self.timeout.checked_sub(self.acquired_at.elapsed())
398    }
399
400    /// Access the resource if it hasn't timed out
401    pub fn get(&self) -> Option<&T>
402    where
403        T: Send,
404    {
405        if self.is_timed_out() {
406            None
407        } else {
408            self.resource.as_ref().map(Resource::get)
409        }
410    }
411
412    /// Release the resource manually before timeout
413    pub fn release(mut self) -> ResourceResult<()> {
414        if let Some(cleanup_task) = self.cleanup_task.take() {
415            cleanup_task.abort();
416        }
417
418        if let Some(resource) = self.resource.take() {
419            if self.is_timed_out() {
420                return Err(ResourceError::Timeout {
421                    duration: self.acquired_at.elapsed(),
422                });
423            }
424            resource.release()?;
425        }
426        Ok(())
427    }
428}
429
430impl<T> Drop for TimedResourceGuard<T>
431where
432    T: Send + 'static,
433{
434    fn drop(&mut self) {
435        if let Some(cleanup_task) = self.cleanup_task.take() {
436            cleanup_task.abort();
437        }
438
439        if self.resource.is_some() {
440            if self.is_timed_out() {
441                tracing::error!(
442                    "TimedResourceGuard dropped after timeout of {:?} - resource forcibly cleaned up",
443                    self.timeout
444                );
445            } else {
446                tracing::warn!("TimedResourceGuard dropped before timeout - resource may leak");
447            }
448        }
449    }
450}
451
452/// Resource leak detector for debugging and monitoring
453#[derive(Debug, Default)]
454pub struct ResourceLeakDetector {
455    active_resources: std::sync::Mutex<std::collections::HashMap<String, ResourceInfo>>,
456}
457
458#[derive(Debug, Clone)]
459struct ResourceInfo {
460    resource_type: String,
461    acquired_at: Instant,
462    location: Option<String>,
463}
464
465impl ResourceLeakDetector {
466    /// Create a new leak detector
467    pub fn new() -> Self {
468        Self::default()
469    }
470
471    /// Register a resource acquisition
472    pub fn register_acquisition(
473        &self,
474        resource_id: &str,
475        resource_type: &str,
476        location: Option<String>,
477    ) {
478        if let Ok(mut resources) = self.active_resources.lock() {
479            resources.insert(
480                resource_id.to_string(),
481                ResourceInfo {
482                    resource_type: resource_type.to_string(),
483                    acquired_at: Instant::now(),
484                    location,
485                },
486            );
487        }
488    }
489
490    /// Register a resource release
491    pub fn register_release(&self, resource_id: &str) {
492        if let Ok(mut resources) = self.active_resources.lock() {
493            resources.remove(resource_id);
494        }
495    }
496
497    /// Get statistics about active resources
498    pub fn get_stats(&self) -> ResourceLeakStats {
499        self.active_resources.lock().map_or_else(
500            |_| ResourceLeakStats::default(),
501            |resources| {
502                let total_count = resources.len();
503                let mut by_type = std::collections::HashMap::new();
504                let mut oldest_age = Duration::ZERO;
505
506                for info in resources.values() {
507                    *by_type.entry(info.resource_type.clone()).or_insert(0) += 1;
508                    let age = info.acquired_at.elapsed();
509                    if age > oldest_age {
510                        oldest_age = age;
511                    }
512                }
513
514                ResourceLeakStats {
515                    total_active: total_count,
516                    by_type,
517                    oldest_resource_age: oldest_age,
518                }
519            },
520        )
521    }
522
523    /// Find potentially leaked resources (older than threshold)
524    pub fn find_potential_leaks(&self, threshold: Duration) -> Vec<String> {
525        self.active_resources.lock().map_or_else(
526            |_| Vec::new(),
527            |resources| {
528                resources
529                    .iter()
530                    .filter(|(_, info)| info.acquired_at.elapsed() > threshold)
531                    .map(|(id, _)| id.clone())
532                    .collect()
533            },
534        )
535    }
536}
537
538/// Statistics about resource usage and potential leaks
539#[derive(Debug, Default)]
540pub struct ResourceLeakStats {
541    /// Total number of active resources
542    pub total_active: usize,
543    /// Count of active resources by type
544    pub by_type: std::collections::HashMap<String, usize>,
545    /// Age of the oldest active resource
546    pub oldest_resource_age: Duration,
547}
548
549/// Global resource leak detector instance
550static GLOBAL_LEAK_DETECTOR: std::sync::OnceLock<ResourceLeakDetector> = std::sync::OnceLock::new();
551
552/// Get the global resource leak detector
553pub fn global_leak_detector() -> &'static ResourceLeakDetector {
554    GLOBAL_LEAK_DETECTOR.get_or_init(ResourceLeakDetector::new)
555}
556
557/// Automatic cleanup resource wrapper
558///
559/// This wrapper automatically registers resources for leak detection
560/// and provides cleanup on drop
561pub struct ManagedResource<T, S> {
562    inner: Option<Resource<T, S>>,
563    resource_id: String,
564    cleanup_registered: bool,
565}
566
567impl<T, S> ManagedResource<T, S> {
568    /// Create a new managed resource
569    pub fn new(resource: Resource<T, S>, resource_type: &str) -> Self {
570        let resource_id = format!(
571            "{}_{}",
572            resource_type,
573            uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext))
574        );
575
576        // Register with leak detector
577        global_leak_detector().register_acquisition(
578            &resource_id,
579            resource_type,
580            Some(format!("{}:{}:{}", file!(), line!(), column!())),
581        );
582
583        Self {
584            inner: Some(resource),
585            resource_id,
586            cleanup_registered: true,
587        }
588    }
589
590    /// Get a reference to the inner resource
591    pub const fn get(&self) -> Option<&Resource<T, S>> {
592        self.inner.as_ref()
593    }
594
595    /// Take the inner resource, transferring ownership
596    pub fn take(mut self) -> Option<Resource<T, S>> {
597        if self.cleanup_registered {
598            global_leak_detector().register_release(&self.resource_id);
599            self.cleanup_registered = false;
600        }
601        self.inner.take()
602    }
603}
604
605impl<T, S> Drop for ManagedResource<T, S> {
606    fn drop(&mut self) {
607        if self.cleanup_registered {
608            global_leak_detector().register_release(&self.resource_id);
609        }
610
611        if self.inner.is_some() {
612            tracing::debug!("ManagedResource dropped with resource still present");
613        }
614    }
615}
616
617/// Extension trait for adding automatic cleanup to resources
618pub trait ResourceExt<T, S>: Sized {
619    /// Wrap in a managed resource for automatic leak detection
620    fn managed(self, resource_type: &str) -> ManagedResource<T, S>;
621
622    /// Wrap in a scoped resource for automatic cleanup
623    fn scoped(self) -> ResourceScope<T>
624    where
625        S: IsAcquired;
626
627    /// Wrap in a timed guard for timeout-based cleanup
628    fn with_timeout(self, timeout: Duration) -> TimedResourceGuard<T>
629    where
630        S: IsAcquired,
631        T: Send + 'static;
632}
633
634// Implementation only for Acquired state to avoid unsafe code and conflicts
635impl<T> ResourceExt<T, states::Acquired> for Resource<T, states::Acquired> {
636    fn managed(self, resource_type: &str) -> ManagedResource<T, states::Acquired> {
637        ManagedResource::new(self, resource_type)
638    }
639
640    fn scoped(self) -> ResourceScope<T> {
641        ResourceScope::new(self)
642    }
643
644    fn with_timeout(self, timeout: Duration) -> TimedResourceGuard<T>
645    where
646        T: Send + 'static,
647    {
648        TimedResourceGuard::new(self, timeout)
649    }
650}
651
652// Implementation for other states (can only use managed)
653impl<T, S> Resource<T, S> {
654    /// Create a managed resource for any state
655    pub fn managed(self, resource_type: &str) -> ManagedResource<T, S> {
656        ManagedResource::new(self, resource_type)
657    }
658}
659
660/// Database connection resource implementation
661#[cfg(feature = "postgres")]
662pub mod database {
663    use super::{async_trait, states, Resource, ResourceError, ResourceManager, ResourceResult};
664    use sqlx::{PgPool, Postgres, Transaction};
665    use std::sync::Arc;
666    use std::time::{Duration, Instant};
667
668    /// A database connection pool wrapped in resource management
669    pub type DatabasePool = Resource<Arc<PgPool>, states::Acquired>;
670
671    /// A database transaction wrapped in resource management
672    pub type DatabaseTransaction<'a> = Resource<Transaction<'a, Postgres>, states::Acquired>;
673
674    /// Database connection wrapped in resource management  
675    pub type DatabaseConnection = Resource<sqlx::pool::PoolConnection<Postgres>, states::Acquired>;
676
677    /// Database pool resource manager with health monitoring
678    pub struct DatabaseResourceManager {
679        pool: Arc<PgPool>,
680        last_health_check: std::sync::Mutex<Instant>,
681        health_check_interval: Duration,
682    }
683
684    impl DatabaseResourceManager {
685        /// Create a new database resource manager
686        pub fn new(pool: Arc<PgPool>) -> Self {
687            Self {
688                pool,
689                last_health_check: std::sync::Mutex::new(Instant::now()),
690                health_check_interval: Duration::from_secs(30),
691            }
692        }
693
694        /// Create a new database resource manager with custom health check interval
695        pub fn new_with_health_interval(
696            pool: Arc<PgPool>,
697            health_check_interval: Duration,
698        ) -> Self {
699            Self {
700                pool,
701                last_health_check: std::sync::Mutex::new(Instant::now()),
702                health_check_interval,
703            }
704        }
705
706        /// Check if health check is needed
707        fn needs_health_check(&self) -> bool {
708            self.last_health_check.lock().map_or(true, |last_check| {
709                last_check.elapsed() > self.health_check_interval
710            })
711        }
712
713        /// Perform health check and update timestamp
714        async fn perform_health_check(&self) -> ResourceResult<()> {
715            // Basic connectivity check
716            sqlx::query("SELECT 1")
717                .execute(self.pool.as_ref())
718                .await
719                .map_err(|e| {
720                    ResourceError::AcquisitionFailed(format!("Health check failed: {e}"))
721                })?;
722
723            // Update health check timestamp
724            if let Ok(mut last_check) = self.last_health_check.lock() {
725                *last_check = Instant::now();
726            }
727
728            Ok(())
729        }
730    }
731
732    #[async_trait]
733    impl ResourceManager<Arc<PgPool>> for DatabaseResourceManager {
734        async fn acquire() -> ResourceResult<Resource<Arc<PgPool>, states::Acquired>> {
735            // Static method - cannot access instance data
736            Err(ResourceError::AcquisitionFailed(
737                "DatabaseResourceManager::acquire requires instance method".to_string(),
738            ))
739        }
740    }
741
742    impl DatabaseResourceManager {
743        /// Acquire a database pool resource with health checking
744        pub async fn acquire_pool(&self) -> ResourceResult<DatabasePool> {
745            // Perform health check if needed
746            if self.needs_health_check() {
747                self.perform_health_check().await?;
748            }
749
750            // Verify pool is not closed
751            if self.pool.is_closed() {
752                return Err(ResourceError::AcquisitionFailed(
753                    "Connection pool is closed".to_string(),
754                ));
755            }
756
757            Ok(Resource::new(Arc::clone(&self.pool)))
758        }
759
760        /// Acquire a single database connection resource
761        pub async fn acquire_connection(&self) -> ResourceResult<DatabaseConnection> {
762            // Perform health check if needed
763            if self.needs_health_check() {
764                self.perform_health_check().await?;
765            }
766
767            // Acquire a connection from the pool
768            let connection = self.pool.acquire().await.map_err(|e| {
769                ResourceError::AcquisitionFailed(format!("Failed to acquire connection: {e}"))
770            })?;
771
772            Ok(Resource::new(connection))
773        }
774
775        /// Begin a transaction with resource management
776        pub async fn begin_transaction(&self) -> ResourceResult<DatabaseTransaction<'_>> {
777            // Perform health check if needed
778            if self.needs_health_check() {
779                self.perform_health_check().await?;
780            }
781
782            // Begin transaction
783            let transaction = self.pool.begin().await.map_err(|e| {
784                ResourceError::AcquisitionFailed(format!("Failed to begin transaction: {e}"))
785            })?;
786
787            Ok(Resource::new(transaction))
788        }
789    }
790
791    /// Extension methods for database pool resources
792    impl DatabasePool {
793        /// Execute a query using the resource
794        ///
795        /// Only available when the resource is acquired
796        pub async fn execute_query(
797            &self,
798            query: &str,
799        ) -> ResourceResult<sqlx::postgres::PgQueryResult> {
800            sqlx::query(query)
801                .execute(self.get().as_ref())
802                .await
803                .map_err(|e| ResourceError::InvalidState(format!("Query execution failed: {e}")))
804        }
805
806        /// Fetch one row from a query
807        ///
808        /// Only available when the resource is acquired
809        pub async fn fetch_one(&self, query: &str) -> ResourceResult<sqlx::postgres::PgRow> {
810            sqlx::query(query)
811                .fetch_one(self.get().as_ref())
812                .await
813                .map_err(|e| ResourceError::InvalidState(format!("Query fetch failed: {e}")))
814        }
815
816        /// Get the connection pool
817        ///
818        /// Only available when the resource is acquired
819        pub const fn pool(&self) -> &Arc<PgPool> {
820            self.get()
821        }
822
823        /// Check pool health
824        ///
825        /// Only available when the resource is acquired
826        pub fn pool_stats(&self) -> PoolStats {
827            let pool = self.get();
828            PoolStats {
829                size: pool.size(),
830                idle: u32::try_from(pool.num_idle()).unwrap_or(u32::MAX),
831                is_closed: pool.is_closed(),
832            }
833        }
834    }
835
836    /// Extension methods for database connection resources
837    impl DatabaseConnection {
838        /// Execute a query using the connection
839        ///
840        /// Only available when the resource is acquired
841        pub async fn execute_query(
842            &mut self,
843            query: &str,
844        ) -> ResourceResult<sqlx::postgres::PgQueryResult> {
845            sqlx::query(query)
846                .execute(&mut **self.get_mut())
847                .await
848                .map_err(|e| ResourceError::InvalidState(format!("Query execution failed: {e}")))
849        }
850
851        // Note: To begin a transaction, use DatabasePool::begin_transaction() instead.
852        // Pool connections should not create their own transactions as this can lead
853        // to lifetime issues and is not the recommended SQLx pattern.
854    }
855
856    /// Extension methods for database transaction resources
857    impl DatabaseTransaction<'_> {
858        /// Execute a query within the transaction
859        ///
860        /// Only available when the resource is acquired
861        pub async fn execute_query(
862            &mut self,
863            query: &str,
864        ) -> ResourceResult<sqlx::postgres::PgQueryResult> {
865            sqlx::query(query)
866                .execute(&mut **self.get_mut())
867                .await
868                .map_err(|e| ResourceError::InvalidState(format!("Transaction query failed: {e}")))
869        }
870
871        /// Commit the transaction
872        ///
873        /// Only available when the resource is acquired
874        /// Consumes the transaction and returns a released resource
875        pub async fn commit(self) -> ResourceResult<Resource<(), states::Released>> {
876            self.into_inner().commit().await.map_err(|e| {
877                ResourceError::ReleaseFailed(format!("Transaction commit failed: {e}"))
878            })?;
879
880            Ok(Resource::new(()))
881        }
882
883        /// Rollback the transaction
884        ///
885        /// Only available when the resource is acquired
886        /// Consumes the transaction and returns a released resource
887        pub async fn rollback(self) -> ResourceResult<Resource<(), states::Released>> {
888            self.into_inner().rollback().await.map_err(|e| {
889                ResourceError::ReleaseFailed(format!("Transaction rollback failed: {e}"))
890            })?;
891
892            Ok(Resource::new(()))
893        }
894    }
895
896    /// Pool statistics for monitoring
897    #[derive(Debug, Clone)]
898    pub struct PoolStats {
899        /// Current pool size
900        pub size: u32,
901        /// Number of idle connections
902        pub idle: u32,
903        /// Whether the pool is closed
904        pub is_closed: bool,
905    }
906
907    /// Factory for creating database resource managers
908    pub struct DatabaseResourceFactory;
909
910    impl DatabaseResourceFactory {
911        /// Create a resource manager from an existing pool
912        pub fn from_pool(pool: Arc<PgPool>) -> DatabaseResourceManager {
913            DatabaseResourceManager::new(pool)
914        }
915
916        /// Create a resource manager with custom health check interval
917        pub fn from_pool_with_health_interval(
918            pool: Arc<PgPool>,
919            health_check_interval: Duration,
920        ) -> DatabaseResourceManager {
921            DatabaseResourceManager::new_with_health_interval(pool, health_check_interval)
922        }
923    }
924}
925
926/// Lock resource implementation with phantom types
927pub mod locking {
928    use super::{states, PhantomData, Resource, ResourceError, ResourceResult};
929    use std::sync::{Arc, Mutex, MutexGuard};
930
931    /// A mutex lock wrapped in resource management
932    pub type MutexResource<T> = Resource<Arc<Mutex<T>>, states::Acquired>;
933
934    /// A mutex guard wrapped in resource management
935    ///
936    /// This ensures the guard is only used while the lock is held
937    pub struct MutexGuardResource<'a, T> {
938        guard: MutexGuard<'a, T>,
939        _phantom: PhantomData<states::Acquired>,
940    }
941
942    impl<'a, T> MutexGuardResource<'a, T> {
943        /// Create a new mutex guard resource
944        const fn new(guard: MutexGuard<'a, T>) -> Self {
945            Self {
946                guard,
947                _phantom: PhantomData,
948            }
949        }
950
951        /// Access the protected data
952        pub fn get(&self) -> &T {
953            &self.guard
954        }
955
956        /// Access the protected data mutably
957        pub fn get_mut(&mut self) -> &mut T {
958            &mut self.guard
959        }
960    }
961
962    /// Extension methods for mutex resources
963    impl<T> MutexResource<T> {
964        /// Acquire the mutex lock
965        ///
966        /// Returns a guard resource that enforces lock is held
967        pub fn lock(&self) -> ResourceResult<MutexGuardResource<'_, T>> {
968            let guard = self
969                .get()
970                .lock()
971                .map_err(|e| ResourceError::Poisoned(format!("Mutex poisoned: {e}")))?;
972            Ok(MutexGuardResource::new(guard))
973        }
974
975        /// Try to acquire the mutex lock without blocking
976        ///
977        /// Returns None if the lock is currently held
978        pub fn try_lock(&self) -> ResourceResult<Option<MutexGuardResource<'_, T>>> {
979            match self.get().try_lock() {
980                Ok(guard) => Ok(Some(MutexGuardResource::new(guard))),
981                Err(std::sync::TryLockError::WouldBlock) => Ok(None),
982                Err(std::sync::TryLockError::Poisoned(e)) => {
983                    Err(ResourceError::Poisoned(format!("Mutex poisoned: {e}")))
984                }
985            }
986        }
987    }
988
989    /// Create a mutex resource
990    pub fn create_mutex_resource<T>(data: T) -> MutexResource<T> {
991        Resource::new(Arc::new(Mutex::new(data)))
992    }
993}
994
995#[cfg(test)]
996mod tests {
997    use super::states::*;
998    use super::{
999        global_leak_detector, locking, IsAcquired, IsReleasable, ManagedResource, Resource,
1000        ResourceError, ResourceExt, ResourceLeakDetector, ResourceScope, TimedResourceGuard,
1001    };
1002    use std::sync::atomic::{AtomicUsize, Ordering};
1003    use std::sync::Arc;
1004    use std::time::Duration;
1005    use tokio::time::{sleep, timeout};
1006
1007    #[test]
1008    fn test_resource_state_transitions() {
1009        // Start with initializing resource
1010        let initializing = Resource::<String, Initializing>::new("test".to_string());
1011
1012        // Can transition to acquired
1013        let acquired = initializing.mark_acquired();
1014        assert_eq!(acquired.get(), "test");
1015
1016        // Can transition to failed
1017        let failed = acquired.mark_failed();
1018
1019        // Failed can be recovered
1020        let recovered = failed.recover().unwrap();
1021        assert_eq!(recovered.get(), "test");
1022
1023        // Can be released
1024        let released = recovered.release().unwrap();
1025        assert!(released.is_released());
1026    }
1027
1028    #[test]
1029    fn test_resource_scope() {
1030        let resource = Resource::<String, Acquired>::new("test".to_string());
1031        let mut scope = ResourceScope::new(resource);
1032
1033        // Can access resource in scope
1034        scope.with_resource(|r| {
1035            assert_eq!(r.get(), "test");
1036        });
1037
1038        // Check scope state
1039        assert!(!scope.is_released());
1040        assert!(!scope.is_leaked());
1041
1042        // Manual release works
1043        scope.release().unwrap();
1044    }
1045
1046    #[test]
1047    fn test_resource_scope_automatic_cleanup() {
1048        let leaked_flag = Arc::new(AtomicUsize::new(0));
1049
1050        // Create scope and let it drop without explicit release
1051        {
1052            let resource = Resource::<String, Acquired>::new("test".to_string());
1053            let _scope = ResourceScope::new(resource);
1054
1055            // Increment counter when scope is created
1056            leaked_flag.store(1, Ordering::SeqCst);
1057
1058            // Scope will be dropped here without explicit release
1059        }
1060
1061        // Verify scope was created (this tests that the test setup works)
1062        assert_eq!(leaked_flag.load(Ordering::SeqCst), 1);
1063    }
1064
1065    #[tokio::test]
1066    async fn test_timed_resource_guard() {
1067        let resource = Resource::<String, Acquired>::new("test".to_string());
1068        let timeout_duration = Duration::from_millis(100);
1069        let guard = TimedResourceGuard::new(resource, timeout_duration);
1070
1071        // Initially should be available
1072        assert!(guard.get().is_some());
1073        assert!(!guard.is_timed_out());
1074        assert!(guard.time_remaining().is_some());
1075
1076        // Wait for timeout
1077        sleep(Duration::from_millis(150)).await;
1078
1079        // Should be timed out
1080        assert!(guard.is_timed_out());
1081        assert!(guard.get().is_none());
1082        assert!(guard.time_remaining().is_none());
1083
1084        // Release should fail with timeout error
1085        match guard.release() {
1086            Err(ResourceError::Timeout { .. }) => {
1087                // Expected timeout error
1088            }
1089            other => panic!("Expected timeout error, got: {other:?}"),
1090        }
1091    }
1092
1093    #[tokio::test]
1094    async fn test_timed_resource_guard_early_release() {
1095        let resource = Resource::<String, Acquired>::new("test".to_string());
1096        let guard = TimedResourceGuard::new(resource, Duration::from_secs(10));
1097
1098        // Should be available
1099        assert!(guard.get().is_some());
1100        assert!(!guard.is_timed_out());
1101
1102        // Release before timeout
1103        guard.release().unwrap();
1104    }
1105
1106    #[test]
1107    fn test_mutex_resource() {
1108        let mutex_resource = locking::create_mutex_resource(42i32);
1109
1110        // Can acquire lock
1111        let guard = mutex_resource.lock().unwrap();
1112        assert_eq!(*guard.get(), 42);
1113
1114        // Lock is exclusive
1115        assert!(mutex_resource.try_lock().unwrap().is_none()); // Should fail to acquire
1116
1117        // Release first lock
1118        drop(guard);
1119
1120        // Now should be able to acquire
1121        assert!(mutex_resource.try_lock().unwrap().is_some());
1122    }
1123
1124    #[test]
1125    fn test_mutex_resource_mutable_access() {
1126        let mutex_resource = locking::create_mutex_resource(42i32);
1127
1128        // Acquire lock and modify data
1129        {
1130            let mut guard = mutex_resource.lock().unwrap();
1131            *guard.get_mut() = 100;
1132        }
1133
1134        // Verify modification
1135        assert_eq!(*mutex_resource.lock().unwrap().get(), 100);
1136    }
1137
1138    #[test]
1139    fn test_resource_leak_detector() {
1140        let detector = ResourceLeakDetector::new();
1141
1142        // Initially empty
1143        let initial_stats = detector.get_stats();
1144        assert_eq!(initial_stats.total_active, 0);
1145        assert!(initial_stats.by_type.is_empty());
1146
1147        // Register some resources
1148        detector.register_acquisition("res1", "DatabasePool", Some("test_location".to_string()));
1149        detector.register_acquisition("res2", "DatabasePool", None);
1150        detector.register_acquisition("res3", "MutexLock", None);
1151
1152        let stats = detector.get_stats();
1153        assert_eq!(stats.total_active, 3);
1154        assert_eq!(stats.by_type.get("DatabasePool"), Some(&2));
1155        assert_eq!(stats.by_type.get("MutexLock"), Some(&1));
1156
1157        // Release one resource
1158        detector.register_release("res1");
1159
1160        let stats = detector.get_stats();
1161        assert_eq!(stats.total_active, 2);
1162        assert_eq!(stats.by_type.get("DatabasePool"), Some(&1));
1163
1164        // Find potential leaks (none yet since resources are new)
1165        let leaks = detector.find_potential_leaks(Duration::from_secs(1));
1166        assert!(leaks.is_empty());
1167
1168        // Clean up
1169        detector.register_release("res2");
1170        detector.register_release("res3");
1171
1172        let final_stats = detector.get_stats();
1173        assert_eq!(final_stats.total_active, 0);
1174    }
1175
1176    #[test]
1177    fn test_global_leak_detector() {
1178        let initial_count = global_leak_detector().get_stats().total_active;
1179
1180        // Register a resource
1181        global_leak_detector().register_acquisition("global_test", "TestResource", None);
1182
1183        let stats = global_leak_detector().get_stats();
1184        assert_eq!(stats.total_active, initial_count + 1);
1185
1186        // Release the resource
1187        global_leak_detector().register_release("global_test");
1188
1189        let final_stats = global_leak_detector().get_stats();
1190        assert_eq!(final_stats.total_active, initial_count);
1191    }
1192
1193    #[test]
1194    fn test_managed_resource() {
1195        let initial_count = global_leak_detector().get_stats().total_active;
1196
1197        // Create managed resource
1198        let resource = Resource::<String, Acquired>::new("test".to_string());
1199        let managed = ManagedResource::new(resource, "TestResource");
1200
1201        // Should be tracked
1202        let stats = global_leak_detector().get_stats();
1203        assert!(stats.total_active > initial_count);
1204        assert!(stats.by_type.contains_key("TestResource"));
1205
1206        // Can access inner resource
1207        assert!(managed.get().is_some());
1208
1209        // Taking resource should work and update tracking
1210        let taken = managed.take();
1211        assert!(taken.is_some());
1212
1213        // Should be untracked now
1214        let final_stats = global_leak_detector().get_stats();
1215        assert_eq!(final_stats.total_active, initial_count);
1216    }
1217
1218    #[test]
1219    fn test_managed_resource_drop_cleanup() {
1220        let initial_count = global_leak_detector().get_stats().total_active;
1221
1222        // Create managed resource and drop it
1223        {
1224            let resource = Resource::<String, Acquired>::new("test".to_string());
1225            let _managed = ManagedResource::new(resource, "TestResource");
1226
1227            // Should be tracked
1228            let stats = global_leak_detector().get_stats();
1229            assert!(stats.total_active > initial_count);
1230        } // Drop happens here
1231
1232        // Should be untracked after drop
1233        let final_stats = global_leak_detector().get_stats();
1234        assert_eq!(final_stats.total_active, initial_count);
1235    }
1236
1237    #[test]
1238    fn test_resource_extension_traits() {
1239        let resource = Resource::<String, Acquired>::new("test".to_string());
1240
1241        // Test managed extension
1242        let managed = resource.managed("TestResource");
1243        assert!(managed.get().is_some());
1244
1245        let resource2 = Resource::<String, Acquired>::new("test2".to_string());
1246
1247        // Test scoped extension
1248        let scope = resource2.scoped();
1249        assert!(!scope.is_released());
1250
1251        let resource3 = Resource::<String, Acquired>::new("test3".to_string());
1252
1253        // Test timed extension
1254        let timed = resource3.with_timeout(Duration::from_secs(1));
1255        assert!(timed.get().is_some());
1256    }
1257
1258    #[tokio::test]
1259    async fn test_resource_state_machine_invalid_transitions() {
1260        // Test that certain state transitions are not allowed at compile time
1261
1262        let released = Resource::<String, Released>::new("test".to_string());
1263        assert!(released.is_released());
1264
1265        // These operations should not compile (verified by compiler):
1266        // released.get(); // ❌ Cannot access released resource
1267        // released.release(); // ❌ Cannot release already released resource
1268        // released.mark_failed(); // ❌ Cannot fail released resource
1269    }
1270
1271    #[tokio::test]
1272    async fn test_concurrent_resource_access() {
1273        let resource = locking::create_mutex_resource(0i32);
1274        let resource = Arc::new(resource);
1275
1276        // Spawn multiple tasks that increment the counter
1277        let mut handles = vec![];
1278        for _ in 0..10 {
1279            let resource_clone = resource.clone();
1280            let handle = tokio::spawn(async move {
1281                let mut guard = resource_clone.lock().unwrap();
1282                let current = *guard.get();
1283                *guard.get_mut() = current + 1;
1284                // Lock is released when guard is dropped
1285            });
1286            handles.push(handle);
1287        }
1288
1289        // Wait for all tasks to complete
1290        for handle in handles {
1291            handle.await.unwrap();
1292        }
1293
1294        // Verify final value
1295        assert_eq!(*resource.lock().unwrap().get(), 10);
1296    }
1297
1298    #[tokio::test]
1299    async fn test_resource_timeout_behavior() {
1300        let resource = Resource::<String, Acquired>::new("test".to_string());
1301        let guard = TimedResourceGuard::new(resource, Duration::from_millis(50));
1302
1303        // Test that timeout actually works
1304        let result = timeout(Duration::from_millis(100), async {
1305            while !guard.is_timed_out() {
1306                sleep(Duration::from_millis(10)).await;
1307            }
1308        })
1309        .await;
1310
1311        assert!(result.is_ok(), "Timeout should have occurred within 100ms");
1312        assert!(guard.is_timed_out());
1313    }
1314
1315    #[test]
1316    fn test_resource_error_types() {
1317        // Test different error variants
1318        let acquisition_error = ResourceError::AcquisitionFailed("test".to_string());
1319        assert!(matches!(
1320            acquisition_error,
1321            ResourceError::AcquisitionFailed(_)
1322        ));
1323
1324        let release_error = ResourceError::ReleaseFailed("test".to_string());
1325        assert!(matches!(release_error, ResourceError::ReleaseFailed(_)));
1326
1327        let invalid_state_error = ResourceError::InvalidState("test".to_string());
1328        assert!(matches!(
1329            invalid_state_error,
1330            ResourceError::InvalidState(_)
1331        ));
1332
1333        let timeout_error = ResourceError::Timeout {
1334            duration: Duration::from_secs(1),
1335        };
1336        assert!(matches!(timeout_error, ResourceError::Timeout { .. }));
1337
1338        let poisoned_error = ResourceError::Poisoned("test".to_string());
1339        assert!(matches!(poisoned_error, ResourceError::Poisoned(_)));
1340    }
1341
1342    #[test]
1343    fn test_sealed_trait_pattern() {
1344        // Test that our sealed traits work correctly
1345        // This is mostly a compile-time test
1346
1347        fn test_acquired<S: IsAcquired>(_state: std::marker::PhantomData<S>) {
1348            // This function should only accept acquired states
1349        }
1350
1351        fn test_releasable<S: IsReleasable>(_state: std::marker::PhantomData<S>) {
1352            // This function should only accept releasable states
1353        }
1354
1355        // These should compile
1356        test_acquired(std::marker::PhantomData::<Acquired>);
1357        test_releasable(std::marker::PhantomData::<Acquired>);
1358        test_releasable(std::marker::PhantomData::<Failed>);
1359
1360        // These should NOT compile (verified by external compile tests):
1361        // test_acquired(std::marker::PhantomData::<Released>);
1362        // test_releasable(std::marker::PhantomData::<Released>);
1363    }
1364
1365    #[test]
1366    fn test_compilation_errors() {
1367        // These tests verify that certain operations don't compile
1368        // They are written as compile_fail tests to document the expected behavior
1369
1370        // Cannot use released resource
1371        let _resource = Resource::<String, Released>::new("test".to_string());
1372        // resource.get(); // ❌ This should not compile
1373
1374        // Cannot release an already released resource
1375        // resource.release(); // ❌ This should not compile
1376
1377        // Cannot mark released resource as failed
1378        // resource.mark_failed(); // ❌ This should not compile
1379    }
1380}
1381
1382/// Integration with existing EventCore types
1383pub mod integration {
1384    use super::{states, Resource};
1385
1386    /// Resource wrapper for event stores
1387    pub type EventStoreResource<ES> = Resource<ES, states::Acquired>;
1388
1389    /// Resource wrapper for subscriptions
1390    pub type SubscriptionResource<S> = Resource<S, states::Acquired>;
1391
1392    /// Extension trait for event store resources
1393    pub trait EventStoreResourceExt<ES> {
1394        /// Create an event store resource
1395        fn into_resource(self) -> EventStoreResource<ES>;
1396    }
1397
1398    impl<ES> EventStoreResourceExt<ES> for ES {
1399        fn into_resource(self) -> EventStoreResource<ES> {
1400            Resource::new(self)
1401        }
1402    }
1403
1404    /// Extension trait for subscription resources
1405    pub trait SubscriptionResourceExt<S> {
1406        /// Create a subscription resource
1407        fn into_resource(self) -> SubscriptionResource<S>;
1408    }
1409
1410    impl<S> SubscriptionResourceExt<S> for S {
1411        fn into_resource(self) -> SubscriptionResource<S> {
1412            Resource::new(self)
1413        }
1414    }
1415}