turbomcp_core/shared.rs
1//! Generic shared wrapper traits and utilities
2//!
3//! This module provides reusable patterns for creating thread-safe wrappers
4//! around types that need to be shared across multiple async tasks while
5//! encapsulating Arc/Mutex complexity.
6
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10/// Trait for types that can be wrapped in a thread-safe shared wrapper
11///
12/// This trait defines the interface for creating shared wrappers that encapsulate
13/// Arc/Mutex complexity and provide clean APIs for concurrent access.
14///
15/// # Design Principles
16///
17/// - **Hide complexity**: Encapsulate Arc/Mutex details from users
18/// - **Preserve semantics**: Maintain original API behavior as much as possible
19/// - **Enable sharing**: Allow multiple tasks to access the same instance safely
20/// - **Async-first**: Design for async/await patterns
21///
22/// # Implementation Guidelines
23///
24/// When implementing this trait, consider:
25/// - Methods requiring `&mut self` need special handling in shared contexts
26/// - Reference-returning methods (`&T`) can't work directly with async mutexes
27/// - Consuming methods (taking `self`) may need special consumption patterns
28/// - Performance implications of mutex contention
29///
30/// # Examples
31///
32/// ```rust
33/// use std::sync::Arc;
34/// use tokio::sync::Mutex;
35/// use turbomcp_core::shared::Shareable;
36///
37/// struct MyService {
38/// counter: u64,
39/// }
40///
41/// impl MyService {
42/// fn new() -> Self {
43/// Self { counter: 0 }
44/// }
45///
46/// fn increment(&mut self) {
47/// self.counter += 1;
48/// }
49///
50/// fn count(&self) -> u64 {
51/// self.counter
52/// }
53/// }
54///
55/// // Shared wrapper
56/// struct SharedMyService {
57/// inner: Arc<Mutex<MyService>>,
58/// }
59///
60/// impl Shareable<MyService> for SharedMyService {
61/// fn new(inner: MyService) -> Self {
62/// Self {
63/// inner: Arc::new(Mutex::new(inner)),
64/// }
65/// }
66/// }
67///
68/// impl Clone for SharedMyService {
69/// fn clone(&self) -> Self {
70/// Self {
71/// inner: Arc::clone(&self.inner),
72/// }
73/// }
74/// }
75///
76/// impl SharedMyService {
77/// async fn increment(&self) {
78/// self.inner.lock().await.increment();
79/// }
80///
81/// async fn count(&self) -> u64 {
82/// self.inner.lock().await.count()
83/// }
84/// }
85/// ```
///
/// # Bounds
///
/// Every implementor must itself be `Clone + Send + Sync + 'static`, as
/// required by the supertrait list on this trait.
pub trait Shareable<T>: Clone + Send + Sync + 'static {
    /// Create a new shared wrapper around the inner type
    ///
    /// Takes ownership of `inner` and returns the wrapper that now holds it.
    /// This is the only method the trait requires; accessors for the wrapped
    /// value are left to each implementor.
    fn new(inner: T) -> Self;
}
90
91/// A generic shared wrapper that implements the Shareable pattern
92///
93/// This provides a concrete implementation of the sharing pattern that can
94/// be used directly for simple cases where no custom behavior is needed.
95///
96/// # Examples
97///
98/// ```rust
99/// use turbomcp_core::shared::{Shared, Shareable};
100///
101/// #[derive(Debug)]
102/// struct Counter {
103/// value: u64,
104/// }
105///
106/// impl Counter {
107/// fn new() -> Self {
108/// Self { value: 0 }
109/// }
110///
111/// fn increment(&mut self) {
112/// self.value += 1;
113/// }
114///
115/// fn get(&self) -> u64 {
116/// self.value
117/// }
118/// }
119///
120/// # async fn example() {
121/// // Create a shared counter
122/// let counter = Counter::new();
123/// let shared = Shared::new(counter);
124///
125/// // Clone for use in multiple tasks
126/// let shared1 = shared.clone();
127/// let shared2 = shared.clone();
128///
129/// // Use in concurrent tasks
130/// let handle1 = tokio::spawn(async move {
131/// shared1.with_mut(|c| c.increment()).await;
132/// });
133///
134/// let handle2 = tokio::spawn(async move {
135/// shared2.with(|c| c.get()).await
136/// });
137/// # }
138/// ```
#[derive(Debug)]
pub struct Shared<T> {
    /// Reference-counted handle to the async mutex guarding the wrapped
    /// value. Kept private so all access goes through the wrapper's methods.
    inner: Arc<Mutex<T>>,
}
143
144impl<T> Shared<T>
145where
146 T: Send + 'static,
147{
148 /// Execute a closure with read access to the inner value
149 pub async fn with<F, R>(&self, f: F) -> R
150 where
151 F: FnOnce(&T) -> R + Send,
152 {
153 let guard = self.inner.lock().await;
154 f(&*guard)
155 }
156
157 /// Execute a closure with mutable access to the inner value
158 pub async fn with_mut<F, R>(&self, f: F) -> R
159 where
160 F: FnOnce(&mut T) -> R + Send,
161 {
162 let mut guard = self.inner.lock().await;
163 f(&mut *guard)
164 }
165
166 /// Execute an async closure with read access to the inner value
167 pub async fn with_async<F, Fut, R>(&self, f: F) -> R
168 where
169 F: FnOnce(&T) -> Fut + Send,
170 Fut: std::future::Future<Output = R> + Send,
171 {
172 let guard = self.inner.lock().await;
173 f(&*guard).await
174 }
175
176 /// Execute an async closure with mutable access to the inner value
177 pub async fn with_mut_async<F, Fut, R>(&self, f: F) -> R
178 where
179 F: FnOnce(&mut T) -> Fut + Send,
180 Fut: std::future::Future<Output = R> + Send,
181 {
182 let mut guard = self.inner.lock().await;
183 f(&mut *guard).await
184 }
185
186 /// Try to execute a closure with read access, returning None if the lock is busy
187 pub fn try_with<F, R>(&self, f: F) -> Option<R>
188 where
189 F: FnOnce(&T) -> R + Send,
190 {
191 let guard = self.inner.try_lock().ok()?;
192 Some(f(&*guard))
193 }
194
195 /// Try to execute a closure with mutable access, returning None if the lock is busy
196 pub fn try_with_mut<F, R>(&self, f: F) -> Option<R>
197 where
198 F: FnOnce(&mut T) -> R + Send,
199 {
200 let mut guard = self.inner.try_lock().ok()?;
201 Some(f(&mut *guard))
202 }
203}
204
205impl<T> Shareable<T> for Shared<T>
206where
207 T: Send + 'static,
208{
209 fn new(inner: T) -> Self {
210 Self {
211 inner: Arc::new(Mutex::new(inner)),
212 }
213 }
214}
215
216impl<T> Clone for Shared<T>
217where
218 T: Send + 'static,
219{
220 fn clone(&self) -> Self {
221 Self {
222 inner: Arc::clone(&self.inner),
223 }
224 }
225}
226
227/// Specialized shared wrapper for types that can be consumed (like servers)
228///
229/// This wrapper allows the inner value to be extracted for consumption
230/// (such as running a server), after which the wrapper becomes unusable.
231///
232/// # Examples
233///
234/// ```rust
235/// use turbomcp_core::shared::{ConsumableShared, Shareable};
236///
237/// struct Server {
238/// name: String,
239/// }
240///
241/// impl Server {
242/// fn new(name: String) -> Self {
243/// Self { name }
244/// }
245///
246/// fn run(self) -> String {
247/// format!("Running server: {}", self.name)
248/// }
249///
250/// fn status(&self) -> String {
251/// format!("Server {} is ready", self.name)
252/// }
253/// }
254///
255/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256/// let server = Server::new("test".to_string());
257/// let shared = ConsumableShared::new(server);
258/// let shared_clone = shared.clone();
259///
260/// // Check status before consumption
261/// let status = shared.with(|s| s.status()).await?;
262/// assert_eq!(status, "Server test is ready");
263///
264/// // Consume the server
265/// let server = shared.consume().await?;
266/// let result = server.run();
267/// assert_eq!(result, "Running server: test");
268///
269/// // Wrapper is now unusable (using clone)
270/// assert!(shared_clone.with(|s| s.status()).await.is_err());
271/// # Ok(())
272/// # }
273/// ```
#[derive(Debug)]
pub struct ConsumableShared<T> {
    /// Slot holding the wrapped value behind a shared async mutex:
    /// `Some(value)` while it is still available, `None` once it has been
    /// taken out by `consume`.
    inner: Arc<Mutex<Option<T>>>,
}
278
279impl<T> ConsumableShared<T>
280where
281 T: Send + 'static,
282{
283 /// Execute a closure with read access to the inner value
284 ///
285 /// Returns an error if the value has been consumed.
286 pub async fn with<F, R>(&self, f: F) -> Result<R, SharedError>
287 where
288 F: FnOnce(&T) -> R + Send,
289 {
290 let guard = self.inner.lock().await;
291 match guard.as_ref() {
292 Some(value) => Ok(f(value)),
293 None => Err(SharedError::Consumed),
294 }
295 }
296
297 /// Execute a closure with mutable access to the inner value
298 ///
299 /// Returns an error if the value has been consumed.
300 pub async fn with_mut<F, R>(&self, f: F) -> Result<R, SharedError>
301 where
302 F: FnOnce(&mut T) -> R + Send,
303 {
304 let mut guard = self.inner.lock().await;
305 match guard.as_mut() {
306 Some(value) => Ok(f(value)),
307 None => Err(SharedError::Consumed),
308 }
309 }
310
311 /// Execute an async closure with read access to the inner value
312 ///
313 /// Returns an error if the value has been consumed.
314 pub async fn with_async<F, Fut, R>(&self, f: F) -> Result<R, SharedError>
315 where
316 F: FnOnce(&T) -> Fut + Send,
317 Fut: std::future::Future<Output = R> + Send,
318 {
319 let guard = self.inner.lock().await;
320 match guard.as_ref() {
321 Some(value) => Ok(f(value).await),
322 None => Err(SharedError::Consumed),
323 }
324 }
325
326 /// Execute an async closure with mutable access to the inner value
327 ///
328 /// Returns an error if the value has been consumed.
329 pub async fn with_mut_async<F, Fut, R>(&self, f: F) -> Result<R, SharedError>
330 where
331 F: FnOnce(&mut T) -> Fut + Send,
332 Fut: std::future::Future<Output = R> + Send,
333 {
334 let mut guard = self.inner.lock().await;
335 match guard.as_mut() {
336 Some(value) => Ok(f(value).await),
337 None => Err(SharedError::Consumed),
338 }
339 }
340
341 /// Consume the inner value, making the wrapper unusable
342 ///
343 /// This extracts the value from the wrapper, after which all other
344 /// operations will return `SharedError::Consumed`.
345 pub async fn consume(self) -> Result<T, SharedError> {
346 let mut guard = self.inner.lock().await;
347 guard.take().ok_or(SharedError::Consumed)
348 }
349
350 /// Check if the value is still available (not consumed)
351 pub async fn is_available(&self) -> bool {
352 self.inner.lock().await.is_some()
353 }
354}
355
356impl<T> Shareable<T> for ConsumableShared<T>
357where
358 T: Send + 'static,
359{
360 fn new(inner: T) -> Self {
361 Self {
362 inner: Arc::new(Mutex::new(Some(inner))),
363 }
364 }
365}
366
367impl<T> Clone for ConsumableShared<T>
368where
369 T: Send + 'static,
370{
371 fn clone(&self) -> Self {
372 Self {
373 inner: Arc::clone(&self.inner),
374 }
375 }
376}
377
378/// Errors that can occur when working with shared wrappers
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum SharedError {
381 /// The wrapped value has been consumed and is no longer available
382 #[error("The shared value has been consumed")]
383 Consumed,
384}
385
#[cfg(test)]
mod tests {
    //! Unit tests for [`Shared`] and [`ConsumableShared`].
    //!
    //! Closures passed to the `*_async` methods return `std::future::ready`
    //! futures: the returned future type cannot borrow from the reference it
    //! was handed, so the value is read eagerly and the future only owns it.

    use super::*;

    /// Minimal stateful type used as the wrapped value in every test.
    #[derive(Debug)]
    struct TestCounter {
        value: u64,
    }

    impl TestCounter {
        fn new() -> Self {
            Self { value: 0 }
        }

        fn increment(&mut self) {
            self.value += 1;
        }

        fn get(&self) -> u64 {
            self.value
        }
    }

    #[tokio::test]
    async fn test_shared_basic_operations() {
        let counter = TestCounter::new();
        let shared = Shared::new(counter);

        // Test read access
        let value = shared.with(|c| c.get()).await;
        assert_eq!(value, 0);

        // Test mutable access
        shared.with_mut(|c| c.increment()).await;
        let value = shared.with(|c| c.get()).await;
        assert_eq!(value, 1);
    }

    #[tokio::test]
    async fn test_shared_async_operations() {
        let counter = TestCounter::new();
        let shared = Shared::new(counter);

        // Async read access
        let value = shared.with_async(|c| std::future::ready(c.get())).await;
        assert_eq!(value, 0);

        // Async mutable access: the mutation happens under the lock and the
        // returned future yields the updated value.
        let value = shared
            .with_mut_async(|c| {
                c.increment();
                std::future::ready(c.get())
            })
            .await;
        assert_eq!(value, 1);

        // The mutation is visible through the synchronous-closure API too
        let value = shared.with(|c| c.get()).await;
        assert_eq!(value, 1);
    }

    #[tokio::test]
    async fn test_shared_cloning() {
        let counter = TestCounter::new();
        let shared = Shared::new(counter);

        // Clone multiple times
        let clones: Vec<_> = (0..10).map(|_| shared.clone()).collect();
        assert_eq!(clones.len(), 10);

        // All clones should work
        for (i, shared_clone) in clones.into_iter().enumerate() {
            shared_clone.with_mut(|c| c.increment()).await;
            let value = shared_clone.with(|c| c.get()).await;
            assert_eq!(value, i as u64 + 1);
        }
    }

    #[tokio::test]
    async fn test_shared_concurrent_access() {
        let counter = TestCounter::new();
        let shared = Shared::new(counter);

        // Spawn multiple concurrent tasks
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let shared_clone = shared.clone();
                tokio::spawn(async move {
                    shared_clone.with_mut(|c| c.increment()).await;
                })
            })
            .collect();

        // Wait for all tasks
        for handle in handles {
            handle.await.unwrap();
        }

        // Value should be 10
        let value = shared.with(|c| c.get()).await;
        assert_eq!(value, 10);
    }

    #[tokio::test]
    async fn test_consumable_shared() {
        let counter = TestCounter::new();
        let shared = ConsumableShared::new(counter);
        let shared_clone = shared.clone();

        // Test operations before consumption
        assert!(shared.is_available().await);
        let value = shared.with(|c| c.get()).await.unwrap();
        assert_eq!(value, 0);

        shared.with_mut(|c| c.increment()).await.unwrap();
        let value = shared.with(|c| c.get()).await.unwrap();
        assert_eq!(value, 1);

        // Consume the value
        let counter = shared.consume().await.unwrap();
        assert_eq!(counter.get(), 1);

        // Operations should fail after consumption (using the clone)
        assert!(!shared_clone.is_available().await);
        assert!(matches!(
            shared_clone.with(|c| c.get()).await,
            Err(SharedError::Consumed)
        ));
    }

    #[tokio::test]
    async fn test_consumable_shared_async_operations() {
        let counter = TestCounter::new();
        let shared = ConsumableShared::new(counter);
        let shared_clone = shared.clone();

        // Async accessors work while the value is available
        let value = shared
            .with_mut_async(|c| {
                c.increment();
                std::future::ready(c.get())
            })
            .await
            .unwrap();
        assert_eq!(value, 1);

        let value = shared
            .with_async(|c| std::future::ready(c.get()))
            .await
            .unwrap();
        assert_eq!(value, 1);

        // ...and report `Consumed` afterwards
        let _counter = shared.consume().await.unwrap();
        assert!(matches!(
            shared_clone
                .with_async(|c| std::future::ready(c.get()))
                .await,
            Err(SharedError::Consumed)
        ));
        assert!(matches!(
            shared_clone
                .with_mut_async(|c| {
                    c.increment();
                    std::future::ready(())
                })
                .await,
            Err(SharedError::Consumed)
        ));
        assert!(matches!(
            shared_clone.with_mut(|c| c.increment()).await,
            Err(SharedError::Consumed)
        ));
    }

    #[tokio::test]
    async fn test_consumable_shared_cloning() {
        let counter = TestCounter::new();
        let shared = ConsumableShared::new(counter);
        let shared_clone = shared.clone();

        // Both should work initially
        assert!(shared.is_available().await);
        assert!(shared_clone.is_available().await);

        // Consume from one
        let _counter = shared.consume().await.unwrap();

        // Both should be consumed
        assert!(!shared_clone.is_available().await);
        assert!(matches!(
            shared_clone.with(|c| c.get()).await,
            Err(SharedError::Consumed)
        ));
    }

    #[tokio::test]
    async fn test_consumable_shared_double_consume() {
        let counter = TestCounter::new();
        let shared = ConsumableShared::new(counter);
        let shared_clone = shared.clone();

        // The first consume wins; the second sees an empty slot
        assert!(shared.consume().await.is_ok());
        assert!(matches!(
            shared_clone.consume().await,
            Err(SharedError::Consumed)
        ));
    }

    #[tokio::test]
    async fn test_try_operations() {
        let counter = TestCounter::new();
        let shared = Shared::new(counter);

        // Try operations should work when lock is available
        let value = shared.try_with(|c| c.get()).unwrap();
        assert_eq!(value, 0);

        shared.try_with_mut(|c| c.increment()).unwrap();
        let value = shared.try_with(|c| c.get()).unwrap();
        assert_eq!(value, 1);
    }

    #[tokio::test]
    async fn test_try_operations_when_locked() {
        let counter = TestCounter::new();
        let shared = Shared::new(counter);

        // Hold the lock directly so the try_* calls observe contention
        let guard = shared.inner.lock().await;
        assert!(shared.try_with(|c| c.get()).is_none());
        assert!(shared.try_with_mut(|c| c.increment()).is_none());
        drop(guard);

        // Neither closure ran while the lock was busy
        assert_eq!(shared.try_with(|c| c.get()), Some(0));
    }
}
543}