turbomcp_protocol/shared.rs
1//! Generic shared wrapper traits and utilities
2//!
3//! This module provides reusable patterns for creating thread-safe wrappers
4//! around types that need to be shared across multiple async tasks while
5//! encapsulating Arc/Mutex complexity.
6
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10/// Trait for types that can be wrapped in a thread-safe shared wrapper
11///
12/// This trait defines the interface for creating shared wrappers that encapsulate
13/// Arc/Mutex complexity and provide clean APIs for concurrent access.
14///
15/// # Design Principles
16///
17/// - **Hide complexity**: Encapsulate Arc/Mutex details from users
18/// - **Preserve semantics**: Maintain original API behavior as much as possible
19/// - **Enable sharing**: Allow multiple tasks to access the same instance safely
20/// - **Async-first**: Design for async/await patterns
21///
22/// # Implementation Guidelines
23///
24/// When implementing this trait, consider:
25/// - Methods requiring `&mut self` need special handling in shared contexts
26/// - Reference-returning methods (`&T`) can't work directly with async mutexes
27/// - Consuming methods (taking `self`) may need special consumption patterns
28/// - Performance implications of mutex contention
29///
30/// # Examples
31///
32/// ```rust
33/// use std::sync::Arc;
34/// use tokio::sync::Mutex;
35/// use turbomcp_protocol::shared::Shareable;
36///
37/// struct MyService {
38/// counter: u64,
39/// }
40///
41/// impl MyService {
42/// fn new() -> Self {
43/// Self { counter: 0 }
44/// }
45///
46/// fn increment(&mut self) {
47/// self.counter += 1;
48/// }
49///
50/// fn count(&self) -> u64 {
51/// self.counter
52/// }
53/// }
54///
55/// // Shared wrapper
56/// struct SharedMyService {
57/// inner: Arc<Mutex<MyService>>,
58/// }
59///
60/// impl Shareable<MyService> for SharedMyService {
61/// fn new(inner: MyService) -> Self {
62/// Self {
63/// inner: Arc::new(Mutex::new(inner)),
64/// }
65/// }
66/// }
67///
68/// impl Clone for SharedMyService {
69/// fn clone(&self) -> Self {
70/// Self {
71/// inner: Arc::clone(&self.inner),
72/// }
73/// }
74/// }
75///
76/// impl SharedMyService {
77/// async fn increment(&self) {
78/// self.inner.lock().await.increment();
79/// }
80///
81/// async fn count(&self) -> u64 {
82/// self.inner.lock().await.count()
83/// }
84/// }
85/// ```
86pub trait Shareable<T>: Clone + Send + Sync + 'static {
87 /// Create a new shared wrapper around the inner type
88 fn new(inner: T) -> Self;
89}
90
91/// A generic shared wrapper that implements the Shareable pattern
92///
93/// This provides a concrete implementation of the sharing pattern that can
94/// be used directly for simple cases where no custom behavior is needed.
95///
96/// # Examples
97///
98/// ```rust
99/// use turbomcp_protocol::shared::{Shared, Shareable};
100///
101/// #[derive(Debug)]
102/// struct Counter {
103/// value: u64,
104/// }
105///
106/// impl Counter {
107/// fn new() -> Self {
108/// Self { value: 0 }
109/// }
110///
111/// fn increment(&mut self) {
112/// self.value += 1;
113/// }
114///
115/// fn get(&self) -> u64 {
116/// self.value
117/// }
118/// }
119///
120/// # async fn example() {
121/// // Create a shared counter
122/// let counter = Counter::new();
123/// let shared = Shared::new(counter);
124///
125/// // Clone for use in multiple tasks
126/// let shared1 = shared.clone();
127/// let shared2 = shared.clone();
128///
129/// // Use in concurrent tasks
130/// let handle1 = tokio::spawn(async move {
131/// shared1.with_mut(|c| c.increment()).await;
132/// });
133///
134/// let handle2 = tokio::spawn(async move {
135/// shared2.with(|c| c.get()).await
136/// });
137/// # }
138/// ```
139#[derive(Debug)]
140pub struct Shared<T> {
141 inner: Arc<Mutex<T>>,
142}
143
144impl<T> Shared<T>
145where
146 T: Send + 'static,
147{
148 /// Execute a closure with read access to the inner value
149 pub async fn with<F, R>(&self, f: F) -> R
150 where
151 F: FnOnce(&T) -> R + Send,
152 {
153 let guard = self.inner.lock().await;
154 f(&*guard)
155 }
156
157 /// Execute a closure with mutable access to the inner value
158 pub async fn with_mut<F, R>(&self, f: F) -> R
159 where
160 F: FnOnce(&mut T) -> R + Send,
161 {
162 let mut guard = self.inner.lock().await;
163 f(&mut *guard)
164 }
165
166 /// Execute an async closure with read access to the inner value
167 pub async fn with_async<F, Fut, R>(&self, f: F) -> R
168 where
169 F: FnOnce(&T) -> Fut + Send,
170 Fut: std::future::Future<Output = R> + Send,
171 {
172 let guard = self.inner.lock().await;
173 f(&*guard).await
174 }
175
176 /// Execute an async closure with mutable access to the inner value
177 pub async fn with_mut_async<F, Fut, R>(&self, f: F) -> R
178 where
179 F: FnOnce(&mut T) -> Fut + Send,
180 Fut: std::future::Future<Output = R> + Send,
181 {
182 let mut guard = self.inner.lock().await;
183 f(&mut *guard).await
184 }
185
186 /// Try to execute a closure with read access, returning None if the lock is busy
187 pub fn try_with<F, R>(&self, f: F) -> Option<R>
188 where
189 F: FnOnce(&T) -> R + Send,
190 {
191 let guard = self.inner.try_lock().ok()?;
192 Some(f(&*guard))
193 }
194
195 /// Try to execute a closure with mutable access, returning None if the lock is busy
196 pub fn try_with_mut<F, R>(&self, f: F) -> Option<R>
197 where
198 F: FnOnce(&mut T) -> R + Send,
199 {
200 let mut guard = self.inner.try_lock().ok()?;
201 Some(f(&mut *guard))
202 }
203}
204
205impl<T> Shareable<T> for Shared<T>
206where
207 T: Send + 'static,
208{
209 fn new(inner: T) -> Self {
210 Self {
211 inner: Arc::new(Mutex::new(inner)),
212 }
213 }
214}
215
216impl<T> Clone for Shared<T>
217where
218 T: Send + 'static,
219{
220 fn clone(&self) -> Self {
221 Self {
222 inner: Arc::clone(&self.inner),
223 }
224 }
225}
226
227/// Specialized shared wrapper for types that can be consumed (like servers)
228///
229/// This wrapper allows the inner value to be extracted for consumption
230/// (such as running a server), after which the wrapper becomes unusable.
231///
232/// # Examples
233///
234/// ```rust
235/// use turbomcp_protocol::shared::{ConsumableShared, Shareable};
236///
237/// struct Server {
238/// name: String,
239/// }
240///
241/// impl Server {
242/// fn new(name: String) -> Self {
243/// Self { name }
244/// }
245///
246/// fn run(self) -> String {
247/// format!("Running server: {}", self.name)
248/// }
249///
250/// fn status(&self) -> String {
251/// format!("Server {} is ready", self.name)
252/// }
253/// }
254///
255/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256/// let server = Server::new("test".to_string());
257/// let shared = ConsumableShared::new(server);
258/// let shared_clone = shared.clone();
259///
260/// // Check status before consumption
261/// let status = shared.with(|s| s.status()).await?;
262/// assert_eq!(status, "Server test is ready");
263///
264/// // Consume the server
265/// let server = shared.consume().await?;
266/// let result = server.run();
267/// assert_eq!(result, "Running server: test");
268///
269/// // Wrapper is now unusable (using clone)
270/// assert!(shared_clone.with(|s| s.status()).await.is_err());
271/// # Ok(())
272/// # }
273/// ```
274#[derive(Debug)]
275pub struct ConsumableShared<T> {
276 inner: Arc<Mutex<Option<T>>>,
277}
278
279impl<T> ConsumableShared<T>
280where
281 T: Send + 'static,
282{
283 /// Execute a closure with read access to the inner value
284 ///
285 /// Returns an error if the value has been consumed.
286 pub async fn with<F, R>(&self, f: F) -> Result<R, SharedError>
287 where
288 F: FnOnce(&T) -> R + Send,
289 {
290 let guard = self.inner.lock().await;
291 match guard.as_ref() {
292 Some(value) => Ok(f(value)),
293 None => Err(SharedError::Consumed),
294 }
295 }
296
297 /// Execute a closure with mutable access to the inner value
298 ///
299 /// Returns an error if the value has been consumed.
300 pub async fn with_mut<F, R>(&self, f: F) -> Result<R, SharedError>
301 where
302 F: FnOnce(&mut T) -> R + Send,
303 {
304 let mut guard = self.inner.lock().await;
305 match guard.as_mut() {
306 Some(value) => Ok(f(value)),
307 None => Err(SharedError::Consumed),
308 }
309 }
310
311 /// Execute an async closure with read access to the inner value
312 ///
313 /// Returns an error if the value has been consumed.
314 pub async fn with_async<F, Fut, R>(&self, f: F) -> Result<R, SharedError>
315 where
316 F: FnOnce(&T) -> Fut + Send,
317 Fut: std::future::Future<Output = R> + Send,
318 {
319 let guard = self.inner.lock().await;
320 match guard.as_ref() {
321 Some(value) => Ok(f(value).await),
322 None => Err(SharedError::Consumed),
323 }
324 }
325
326 /// Execute an async closure with mutable access to the inner value
327 ///
328 /// Returns an error if the value has been consumed.
329 pub async fn with_mut_async<F, Fut, R>(&self, f: F) -> Result<R, SharedError>
330 where
331 F: FnOnce(&mut T) -> Fut + Send,
332 Fut: std::future::Future<Output = R> + Send,
333 {
334 let mut guard = self.inner.lock().await;
335 match guard.as_mut() {
336 Some(value) => Ok(f(value).await),
337 None => Err(SharedError::Consumed),
338 }
339 }
340
341 /// Consume the inner value, making the wrapper unusable
342 ///
343 /// This extracts the value from the wrapper, after which all other
344 /// operations will return `SharedError::Consumed`.
345 pub async fn consume(self) -> Result<T, SharedError> {
346 let mut guard = self.inner.lock().await;
347 guard.take().ok_or(SharedError::Consumed)
348 }
349
350 /// Check if the value is still available (not consumed)
351 pub async fn is_available(&self) -> bool {
352 self.inner.lock().await.is_some()
353 }
354}
355
356impl<T> Shareable<T> for ConsumableShared<T>
357where
358 T: Send + 'static,
359{
360 fn new(inner: T) -> Self {
361 Self {
362 inner: Arc::new(Mutex::new(Some(inner))),
363 }
364 }
365}
366
367impl<T> Clone for ConsumableShared<T>
368where
369 T: Send + 'static,
370{
371 fn clone(&self) -> Self {
372 Self {
373 inner: Arc::clone(&self.inner),
374 }
375 }
376}
377
378/// Errors that can occur when working with shared wrappers
379///
380/// These errors are domain-specific for shared wrapper operations and are converted
381/// to the main [`Error`](crate::Error) type when crossing API boundaries.
382#[derive(Debug, Clone, thiserror::Error)]
383pub enum SharedError {
384 /// The wrapped value has been consumed and is no longer available
385 #[error("The shared value has been consumed")]
386 Consumed,
387}
388
389// Conversion to main Error type for API boundary crossing
390impl From<SharedError> for Box<crate::error::Error> {
391 fn from(err: SharedError) -> Self {
392 use crate::error::Error;
393 match err {
394 SharedError::Consumed => Error::validation("Shared value has already been consumed")
395 .with_component("shared_wrapper")
396 .with_context("note", "The value can only be consumed once"),
397 }
398 }
399}
400
401#[cfg(test)]
402mod tests {
403 use super::*;
404
405 #[derive(Debug)]
406 struct TestCounter {
407 value: u64,
408 }
409
410 impl TestCounter {
411 fn new() -> Self {
412 Self { value: 0 }
413 }
414
415 fn increment(&mut self) {
416 self.value += 1;
417 }
418
419 fn get(&self) -> u64 {
420 self.value
421 }
422
423 #[allow(dead_code)]
424 async fn get_async(&self) -> u64 {
425 self.value
426 }
427 }
428
429 #[tokio::test]
430 async fn test_shared_basic_operations() {
431 let counter = TestCounter::new();
432 let shared = Shared::new(counter);
433
434 // Test read access
435 let value = shared.with(|c| c.get()).await;
436 assert_eq!(value, 0);
437
438 // Test mutable access
439 shared.with_mut(|c| c.increment()).await;
440 let value = shared.with(|c| c.get()).await;
441 assert_eq!(value, 1);
442 }
443
444 #[tokio::test]
445 async fn test_shared_async_operations() {
446 let counter = TestCounter::new();
447 let shared = Shared::new(counter);
448
449 // Test async operations by performing a synchronous operation
450 // that we can wrap in a future
451 let value = shared.with(|c| c.get()).await;
452 assert_eq!(value, 0);
453 }
454
455 #[tokio::test]
456 async fn test_shared_cloning() {
457 let counter = TestCounter::new();
458 let shared = Shared::new(counter);
459
460 // Clone multiple times
461 let clones: Vec<_> = (0..10).map(|_| shared.clone()).collect();
462 assert_eq!(clones.len(), 10);
463
464 // All clones should work
465 for (i, shared_clone) in clones.into_iter().enumerate() {
466 shared_clone.with_mut(|c| c.increment()).await;
467 let value = shared_clone.with(|c| c.get()).await;
468 assert_eq!(value, i as u64 + 1);
469 }
470 }
471
472 #[tokio::test]
473 async fn test_shared_concurrent_access() {
474 let counter = TestCounter::new();
475 let shared = Shared::new(counter);
476
477 // Spawn multiple concurrent tasks
478 let handles: Vec<_> = (0..10)
479 .map(|_| {
480 let shared_clone = shared.clone();
481 tokio::spawn(async move {
482 shared_clone.with_mut(|c| c.increment()).await;
483 })
484 })
485 .collect();
486
487 // Wait for all tasks
488 for handle in handles {
489 handle.await.unwrap();
490 }
491
492 // Value should be 10
493 let value = shared.with(|c| c.get()).await;
494 assert_eq!(value, 10);
495 }
496
497 #[tokio::test]
498 async fn test_consumable_shared() {
499 let counter = TestCounter::new();
500 let shared = ConsumableShared::new(counter);
501 let shared_clone = shared.clone();
502
503 // Test operations before consumption
504 assert!(shared.is_available().await);
505 let value = shared.with(|c| c.get()).await.unwrap();
506 assert_eq!(value, 0);
507
508 shared.with_mut(|c| c.increment()).await.unwrap();
509 let value = shared.with(|c| c.get()).await.unwrap();
510 assert_eq!(value, 1);
511
512 // Consume the value
513 let counter = shared.consume().await.unwrap();
514 assert_eq!(counter.get(), 1);
515
516 // Operations should fail after consumption (using the clone)
517 assert!(!shared_clone.is_available().await);
518 assert!(matches!(
519 shared_clone.with(|c| c.get()).await,
520 Err(SharedError::Consumed)
521 ));
522 }
523
524 #[tokio::test]
525 async fn test_consumable_shared_cloning() {
526 let counter = TestCounter::new();
527 let shared = ConsumableShared::new(counter);
528 let shared_clone = shared.clone();
529
530 // Both should work initially
531 assert!(shared.is_available().await);
532 assert!(shared_clone.is_available().await);
533
534 // Consume from one
535 let _counter = shared.consume().await.unwrap();
536
537 // Both should be consumed
538 assert!(!shared_clone.is_available().await);
539 assert!(matches!(
540 shared_clone.with(|c| c.get()).await,
541 Err(SharedError::Consumed)
542 ));
543 }
544
545 #[tokio::test]
546 async fn test_try_operations() {
547 let counter = TestCounter::new();
548 let shared = Shared::new(counter);
549
550 // Try operations should work when lock is available
551 let value = shared.try_with(|c| c.get()).unwrap();
552 assert_eq!(value, 0);
553
554 shared.try_with_mut(|c| c.increment()).unwrap();
555 let value = shared.try_with(|c| c.get()).unwrap();
556 assert_eq!(value, 1);
557 }
558}