turbomcp_core/
shared.rs

1//! Generic shared wrapper traits and utilities
2//!
3//! This module provides reusable patterns for creating thread-safe wrappers
4//! around types that need to be shared across multiple async tasks while
5//! encapsulating Arc/Mutex complexity.
6
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10/// Trait for types that can be wrapped in a thread-safe shared wrapper
11///
12/// This trait defines the interface for creating shared wrappers that encapsulate
13/// Arc/Mutex complexity and provide clean APIs for concurrent access.
14///
15/// # Design Principles
16///
17/// - **Hide complexity**: Encapsulate Arc/Mutex details from users
18/// - **Preserve semantics**: Maintain original API behavior as much as possible
19/// - **Enable sharing**: Allow multiple tasks to access the same instance safely
20/// - **Async-first**: Design for async/await patterns
21///
22/// # Implementation Guidelines
23///
24/// When implementing this trait, consider:
25/// - Methods requiring `&mut self` need special handling in shared contexts
26/// - Reference-returning methods (`&T`) can't work directly with async mutexes
27/// - Consuming methods (taking `self`) may need special consumption patterns
28/// - Performance implications of mutex contention
29///
30/// # Examples
31///
32/// ```rust
33/// use std::sync::Arc;
34/// use tokio::sync::Mutex;
35/// use turbomcp_core::shared::Shareable;
36///
37/// struct MyService {
38///     counter: u64,
39/// }
40///
41/// impl MyService {
42///     fn new() -> Self {
43///         Self { counter: 0 }
44///     }
45///
46///     fn increment(&mut self) {
47///         self.counter += 1;
48///     }
49///
50///     fn count(&self) -> u64 {
51///         self.counter
52///     }
53/// }
54///
55/// // Shared wrapper
56/// struct SharedMyService {
57///     inner: Arc<Mutex<MyService>>,
58/// }
59///
60/// impl Shareable<MyService> for SharedMyService {
61///     fn new(inner: MyService) -> Self {
62///         Self {
63///             inner: Arc::new(Mutex::new(inner)),
64///         }
65///     }
66/// }
67///
68/// impl Clone for SharedMyService {
69///     fn clone(&self) -> Self {
70///         Self {
71///             inner: Arc::clone(&self.inner),
72///         }
73///     }
74/// }
75///
76/// impl SharedMyService {
77///     async fn increment(&self) {
78///         self.inner.lock().await.increment();
79///     }
80///
81///     async fn count(&self) -> u64 {
82///         self.inner.lock().await.count()
83///     }
84/// }
85/// ```
86pub trait Shareable<T>: Clone + Send + Sync + 'static {
87    /// Create a new shared wrapper around the inner type
88    fn new(inner: T) -> Self;
89}
90
91/// A generic shared wrapper that implements the Shareable pattern
92///
93/// This provides a concrete implementation of the sharing pattern that can
94/// be used directly for simple cases where no custom behavior is needed.
95///
96/// # Examples
97///
98/// ```rust
99/// use turbomcp_core::shared::{Shared, Shareable};
100///
101/// #[derive(Debug)]
102/// struct Counter {
103///     value: u64,
104/// }
105///
106/// impl Counter {
107///     fn new() -> Self {
108///         Self { value: 0 }
109///     }
110///
111///     fn increment(&mut self) {
112///         self.value += 1;
113///     }
114///
115///     fn get(&self) -> u64 {
116///         self.value
117///     }
118/// }
119///
120/// # async fn example() {
121/// // Create a shared counter
122/// let counter = Counter::new();
123/// let shared = Shared::new(counter);
124///
125/// // Clone for use in multiple tasks
126/// let shared1 = shared.clone();
127/// let shared2 = shared.clone();
128///
129/// // Use in concurrent tasks
130/// let handle1 = tokio::spawn(async move {
131///     shared1.with_mut(|c| c.increment()).await;
132/// });
133///
134/// let handle2 = tokio::spawn(async move {
135///     shared2.with(|c| c.get()).await
136/// });
137/// # }
138/// ```
139#[derive(Debug)]
140pub struct Shared<T> {
141    inner: Arc<Mutex<T>>,
142}
143
144impl<T> Shared<T>
145where
146    T: Send + 'static,
147{
148    /// Execute a closure with read access to the inner value
149    pub async fn with<F, R>(&self, f: F) -> R
150    where
151        F: FnOnce(&T) -> R + Send,
152    {
153        let guard = self.inner.lock().await;
154        f(&*guard)
155    }
156
157    /// Execute a closure with mutable access to the inner value
158    pub async fn with_mut<F, R>(&self, f: F) -> R
159    where
160        F: FnOnce(&mut T) -> R + Send,
161    {
162        let mut guard = self.inner.lock().await;
163        f(&mut *guard)
164    }
165
166    /// Execute an async closure with read access to the inner value
167    pub async fn with_async<F, Fut, R>(&self, f: F) -> R
168    where
169        F: FnOnce(&T) -> Fut + Send,
170        Fut: std::future::Future<Output = R> + Send,
171    {
172        let guard = self.inner.lock().await;
173        f(&*guard).await
174    }
175
176    /// Execute an async closure with mutable access to the inner value
177    pub async fn with_mut_async<F, Fut, R>(&self, f: F) -> R
178    where
179        F: FnOnce(&mut T) -> Fut + Send,
180        Fut: std::future::Future<Output = R> + Send,
181    {
182        let mut guard = self.inner.lock().await;
183        f(&mut *guard).await
184    }
185
186    /// Try to execute a closure with read access, returning None if the lock is busy
187    pub fn try_with<F, R>(&self, f: F) -> Option<R>
188    where
189        F: FnOnce(&T) -> R + Send,
190    {
191        let guard = self.inner.try_lock().ok()?;
192        Some(f(&*guard))
193    }
194
195    /// Try to execute a closure with mutable access, returning None if the lock is busy
196    pub fn try_with_mut<F, R>(&self, f: F) -> Option<R>
197    where
198        F: FnOnce(&mut T) -> R + Send,
199    {
200        let mut guard = self.inner.try_lock().ok()?;
201        Some(f(&mut *guard))
202    }
203}
204
205impl<T> Shareable<T> for Shared<T>
206where
207    T: Send + 'static,
208{
209    fn new(inner: T) -> Self {
210        Self {
211            inner: Arc::new(Mutex::new(inner)),
212        }
213    }
214}
215
216impl<T> Clone for Shared<T>
217where
218    T: Send + 'static,
219{
220    fn clone(&self) -> Self {
221        Self {
222            inner: Arc::clone(&self.inner),
223        }
224    }
225}
226
227/// Specialized shared wrapper for types that can be consumed (like servers)
228///
229/// This wrapper allows the inner value to be extracted for consumption
230/// (such as running a server), after which the wrapper becomes unusable.
231///
232/// # Examples
233///
234/// ```rust
235/// use turbomcp_core::shared::{ConsumableShared, Shareable};
236///
237/// struct Server {
238///     name: String,
239/// }
240///
241/// impl Server {
242///     fn new(name: String) -> Self {
243///         Self { name }
244///     }
245///
246///     fn run(self) -> String {
247///         format!("Running server: {}", self.name)
248///     }
249///
250///     fn status(&self) -> String {
251///         format!("Server {} is ready", self.name)
252///     }
253/// }
254///
255/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256/// let server = Server::new("test".to_string());
257/// let shared = ConsumableShared::new(server);
258/// let shared_clone = shared.clone();
259///
260/// // Check status before consumption
261/// let status = shared.with(|s| s.status()).await?;
262/// assert_eq!(status, "Server test is ready");
263///
264/// // Consume the server
265/// let server = shared.consume().await?;
266/// let result = server.run();
267/// assert_eq!(result, "Running server: test");
268///
269/// // Wrapper is now unusable (using clone)
270/// assert!(shared_clone.with(|s| s.status()).await.is_err());
271/// # Ok(())
272/// # }
273/// ```
274#[derive(Debug)]
275pub struct ConsumableShared<T> {
276    inner: Arc<Mutex<Option<T>>>,
277}
278
279impl<T> ConsumableShared<T>
280where
281    T: Send + 'static,
282{
283    /// Execute a closure with read access to the inner value
284    ///
285    /// Returns an error if the value has been consumed.
286    pub async fn with<F, R>(&self, f: F) -> Result<R, SharedError>
287    where
288        F: FnOnce(&T) -> R + Send,
289    {
290        let guard = self.inner.lock().await;
291        match guard.as_ref() {
292            Some(value) => Ok(f(value)),
293            None => Err(SharedError::Consumed),
294        }
295    }
296
297    /// Execute a closure with mutable access to the inner value
298    ///
299    /// Returns an error if the value has been consumed.
300    pub async fn with_mut<F, R>(&self, f: F) -> Result<R, SharedError>
301    where
302        F: FnOnce(&mut T) -> R + Send,
303    {
304        let mut guard = self.inner.lock().await;
305        match guard.as_mut() {
306            Some(value) => Ok(f(value)),
307            None => Err(SharedError::Consumed),
308        }
309    }
310
311    /// Execute an async closure with read access to the inner value
312    ///
313    /// Returns an error if the value has been consumed.
314    pub async fn with_async<F, Fut, R>(&self, f: F) -> Result<R, SharedError>
315    where
316        F: FnOnce(&T) -> Fut + Send,
317        Fut: std::future::Future<Output = R> + Send,
318    {
319        let guard = self.inner.lock().await;
320        match guard.as_ref() {
321            Some(value) => Ok(f(value).await),
322            None => Err(SharedError::Consumed),
323        }
324    }
325
326    /// Execute an async closure with mutable access to the inner value
327    ///
328    /// Returns an error if the value has been consumed.
329    pub async fn with_mut_async<F, Fut, R>(&self, f: F) -> Result<R, SharedError>
330    where
331        F: FnOnce(&mut T) -> Fut + Send,
332        Fut: std::future::Future<Output = R> + Send,
333    {
334        let mut guard = self.inner.lock().await;
335        match guard.as_mut() {
336            Some(value) => Ok(f(value).await),
337            None => Err(SharedError::Consumed),
338        }
339    }
340
341    /// Consume the inner value, making the wrapper unusable
342    ///
343    /// This extracts the value from the wrapper, after which all other
344    /// operations will return `SharedError::Consumed`.
345    pub async fn consume(self) -> Result<T, SharedError> {
346        let mut guard = self.inner.lock().await;
347        guard.take().ok_or(SharedError::Consumed)
348    }
349
350    /// Check if the value is still available (not consumed)
351    pub async fn is_available(&self) -> bool {
352        self.inner.lock().await.is_some()
353    }
354}
355
356impl<T> Shareable<T> for ConsumableShared<T>
357where
358    T: Send + 'static,
359{
360    fn new(inner: T) -> Self {
361        Self {
362            inner: Arc::new(Mutex::new(Some(inner))),
363        }
364    }
365}
366
367impl<T> Clone for ConsumableShared<T>
368where
369    T: Send + 'static,
370{
371    fn clone(&self) -> Self {
372        Self {
373            inner: Arc::clone(&self.inner),
374        }
375    }
376}
377
378/// Errors that can occur when working with shared wrappers
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum SharedError {
381    /// The wrapped value has been consumed and is no longer available
382    #[error("The shared value has been consumed")]
383    Consumed,
384}
385
386#[cfg(test)]
387mod tests {
388    use super::*;
389
390    #[derive(Debug)]
391    struct TestCounter {
392        value: u64,
393    }
394
395    impl TestCounter {
396        fn new() -> Self {
397            Self { value: 0 }
398        }
399
400        fn increment(&mut self) {
401            self.value += 1;
402        }
403
404        fn get(&self) -> u64 {
405            self.value
406        }
407
408        #[allow(dead_code)]
409        async fn get_async(&self) -> u64 {
410            self.value
411        }
412    }
413
414    #[tokio::test]
415    async fn test_shared_basic_operations() {
416        let counter = TestCounter::new();
417        let shared = Shared::new(counter);
418
419        // Test read access
420        let value = shared.with(|c| c.get()).await;
421        assert_eq!(value, 0);
422
423        // Test mutable access
424        shared.with_mut(|c| c.increment()).await;
425        let value = shared.with(|c| c.get()).await;
426        assert_eq!(value, 1);
427    }
428
429    #[tokio::test]
430    async fn test_shared_async_operations() {
431        let counter = TestCounter::new();
432        let shared = Shared::new(counter);
433
434        // Test async operations by performing a synchronous operation
435        // that we can wrap in a future
436        let value = shared.with(|c| c.get()).await;
437        assert_eq!(value, 0);
438    }
439
440    #[tokio::test]
441    async fn test_shared_cloning() {
442        let counter = TestCounter::new();
443        let shared = Shared::new(counter);
444
445        // Clone multiple times
446        let clones: Vec<_> = (0..10).map(|_| shared.clone()).collect();
447        assert_eq!(clones.len(), 10);
448
449        // All clones should work
450        for (i, shared_clone) in clones.into_iter().enumerate() {
451            shared_clone.with_mut(|c| c.increment()).await;
452            let value = shared_clone.with(|c| c.get()).await;
453            assert_eq!(value, i as u64 + 1);
454        }
455    }
456
457    #[tokio::test]
458    async fn test_shared_concurrent_access() {
459        let counter = TestCounter::new();
460        let shared = Shared::new(counter);
461
462        // Spawn multiple concurrent tasks
463        let handles: Vec<_> = (0..10)
464            .map(|_| {
465                let shared_clone = shared.clone();
466                tokio::spawn(async move {
467                    shared_clone.with_mut(|c| c.increment()).await;
468                })
469            })
470            .collect();
471
472        // Wait for all tasks
473        for handle in handles {
474            handle.await.unwrap();
475        }
476
477        // Value should be 10
478        let value = shared.with(|c| c.get()).await;
479        assert_eq!(value, 10);
480    }
481
482    #[tokio::test]
483    async fn test_consumable_shared() {
484        let counter = TestCounter::new();
485        let shared = ConsumableShared::new(counter);
486        let shared_clone = shared.clone();
487
488        // Test operations before consumption
489        assert!(shared.is_available().await);
490        let value = shared.with(|c| c.get()).await.unwrap();
491        assert_eq!(value, 0);
492
493        shared.with_mut(|c| c.increment()).await.unwrap();
494        let value = shared.with(|c| c.get()).await.unwrap();
495        assert_eq!(value, 1);
496
497        // Consume the value
498        let counter = shared.consume().await.unwrap();
499        assert_eq!(counter.get(), 1);
500
501        // Operations should fail after consumption (using the clone)
502        assert!(!shared_clone.is_available().await);
503        assert!(matches!(
504            shared_clone.with(|c| c.get()).await,
505            Err(SharedError::Consumed)
506        ));
507    }
508
509    #[tokio::test]
510    async fn test_consumable_shared_cloning() {
511        let counter = TestCounter::new();
512        let shared = ConsumableShared::new(counter);
513        let shared_clone = shared.clone();
514
515        // Both should work initially
516        assert!(shared.is_available().await);
517        assert!(shared_clone.is_available().await);
518
519        // Consume from one
520        let _counter = shared.consume().await.unwrap();
521
522        // Both should be consumed
523        assert!(!shared_clone.is_available().await);
524        assert!(matches!(
525            shared_clone.with(|c| c.get()).await,
526            Err(SharedError::Consumed)
527        ));
528    }
529
530    #[tokio::test]
531    async fn test_try_operations() {
532        let counter = TestCounter::new();
533        let shared = Shared::new(counter);
534
535        // Try operations should work when lock is available
536        let value = shared.try_with(|c| c.get()).unwrap();
537        assert_eq!(value, 0);
538
539        shared.try_with_mut(|c| c.increment()).unwrap();
540        let value = shared.try_with(|c| c.get()).unwrap();
541        assert_eq!(value, 1);
542    }
543}