messaging_thread_pool/samples/
user_session.rs

1//! # UserSession Example
2//!
3//! This module provides a canonical example of using the `#[pool_item]` macro
4//! to create a pool item that leverages the key advantage of this library:
5//! **using `Rc` and `RefCell` for shared state without locks**.
6//!
7//! ## The Problem This Solves
8//!
9//! In traditional thread pools (like `rayon`), any shared state must be wrapped
10//! in `Arc<Mutex<...>>` because work can migrate between threads. This adds:
11//! - Lock contention overhead
12//! - Complexity in reasoning about concurrent access
13//! - Risk of deadlocks
14//!
15//! With `messaging_thread_pool`, each pool item is **pinned to a single thread**
16//! for its entire lifetime. This means you can safely use:
17//! - `Rc<T>` instead of `Arc<T>`
18//! - `RefCell<T>` instead of `Mutex<T>`
19//! - Raw pointers and FFI resources that aren't `Send`/`Sync`
20//!
21//! ## Example Structure
22//!
23//! This example demonstrates:
24//! 1. A `UserSession` pool item that tracks user actions
25//! 2. A `HistoryTracker` helper struct that shares access to the session's log
26//! 3. Both structs share the same `Vec<String>` via `Rc<RefCell<...>>`
27//!
28//! ## Usage
29//!
30//! ```rust
31//! use messaging_thread_pool::{ThreadPool, samples::*};
32//!
33//! // Create a thread pool with 2 threads
34//! let thread_pool = ThreadPool::<UserSession>::new(2);
35//!
36//! // Create a session with ID 1
37//! thread_pool
38//!     .send_and_receive(vec![UserSessionInit(1)].into_iter())
39//!     .expect("session creation")
40//!     .for_each(|_| {});
41//!
42//! // Log some actions - these are processed sequentially by the thread owning Session 1
43//! let counts: Vec<usize> = thread_pool
44//!     .send_and_receive(vec![
45//!         LogActionRequest(1, "Login".to_string()),
46//!         LogActionRequest(1, "ViewProfile".to_string()),
47//!         LogActionRequest(1, "Logout".to_string()),
48//!     ].into_iter())
49//!     .expect("actions")
50//!     .map(|resp| resp.result)
51//!     .collect();
52//!
53//! assert_eq!(counts, vec![1, 2, 3]);
54//!
55//! // Retrieve the full log
56//! let log: Vec<String> = thread_pool
57//!     .send_and_receive(vec![GetLogRequest(1)].into_iter())
58//!     .expect("get log")
59//!     .next()
60//!     .unwrap()
61//!     .result;
62//!
63//! assert_eq!(log[0], "Action: Login");
64//! assert_eq!(log[1], "Action: ViewProfile");
65//! assert_eq!(log[2], "Action: Logout");
66//! ```
67
68use std::cell::RefCell;
69use std::rc::Rc;
70
71use crate::IdTargeted;
72use crate::pool_item;
73
/// A lightweight handle onto a session's action log.
///
/// The handle does not own a private copy of the history: it stores an
/// `Rc<RefCell<Vec<String>>>`, so every clone of that `Rc` (including clones of
/// this struct) reads and writes the very same `Vec`.
///
/// A conventional work-stealing pool would force this to be
/// `Arc<Mutex<Vec<String>>>`. Because a `UserSession` stays on one thread, the
/// cheaper single-threaded pair `Rc` + `RefCell` is sufficient.
///
/// Reach for this shape when:
/// - a helper object has to mutate state owned by its parent
/// - several components inside one pool item look at the same data
/// - the internal structure is awkward to express with `Arc<Mutex<...>>`
#[derive(Clone, Debug)]
pub struct HistoryTracker {
    /// Handle to the shared history; cloning it only bumps a reference count.
    log: Rc<RefCell<Vec<String>>>,
}
88
89impl HistoryTracker {
90    /// Add an entry to the shared log.
91    ///
92    /// Note: No locks needed! Just `borrow_mut()`.
93    pub fn add_entry(&self, entry: String) {
94        self.log.borrow_mut().push(entry);
95    }
96}
97
98/// A user session that tracks actions performed by a user.
99///
100/// This is the main pool item. It demonstrates:
101/// - Owning non-`Send`/`Sync` data (`Rc<RefCell<...>>`)
102/// - Sharing data with helper structs (`HistoryTracker`)
103/// - Sequential message processing (messages to the same session are never concurrent)
104///
105/// ## Generated Types
106///
107/// The `#[pool_item]` macro on the impl block generates:
108/// - `UserSessionInit(u64)` - Request to create a new session
109/// - `UserSessionApi` - Enum of all message types
110/// - `LogActionRequest(u64, String)` / `LogActionResponse` - For the `log_action` method
111/// - `GetLogRequest(u64)` / `GetLogResponse` - For the `get_log` method
112#[derive(Debug)]
113pub struct UserSession {
114    id: u64,
115    /// We hold the data
116    log: Rc<RefCell<Vec<String>>>,
117    /// Our helper also holds a reference to the SAME data
118    tracker: HistoryTracker,
119}
120
121impl IdTargeted for UserSession {
122    fn id(&self) -> u64 {
123        self.id
124    }
125}
126
127/// The `#[pool_item]` macro transforms this impl block to:
128/// 1. Generate the `PoolItem` trait implementation
129/// 2. Generate request/response structs for each `#[messaging(...)]` method
130/// 3. Generate the `UserSessionApi` enum containing all message variants
131/// 4. Generate the `UserSessionInit` struct for creating new sessions
132#[pool_item]
133impl UserSession {
134    /// Creates a new UserSession.
135    ///
136    /// This method is called by the thread pool when a `UserSessionInit` request
137    /// is received. The session is created on the thread determined by `id % thread_count`.
138    ///
139    /// Note how we create an `Rc<RefCell<...>>` and clone it to share with the helper.
140    /// This would be impossible in a traditional thread pool where the item might
141    /// move between threads.
142    pub fn new(id: u64) -> Self {
143        let log = Rc::new(RefCell::new(Vec::new()));
144        let tracker = HistoryTracker { log: log.clone() };
145
146        Self { id, log, tracker }
147    }
148
149    /// Log a user action and return the total number of logged actions.
150    ///
151    /// The `#[messaging(LogActionRequest, LogActionResponse)]` attribute tells the macro to:
152    /// - Generate `LogActionRequest(u64, String)` - the id and the `action` parameter
153    /// - Generate `LogActionResponse { id: u64, result: usize }` - the id and return value
154    /// - Add a variant to `UserSessionApi` for this request/response pair
155    ///
156    /// Messages to the same session (same ID) are processed sequentially,
157    /// so there's no risk of concurrent modification of the log.
158    #[messaging(LogActionRequest, LogActionResponse)]
159    pub fn log_action(&self, action: String) -> usize {
160        // We use the helper to modify the state
161        self.tracker.add_entry(format!("Action: {}", action));
162
163        // We can read the state directly
164        self.log.borrow().len()
165    }
166
167    /// Retrieve the entire action history.
168    ///
169    /// Returns a clone of the log because the original `Vec` can't leave the thread
170    /// (it's behind an `Rc`).
171    #[messaging(GetLogRequest, GetLogResponse)]
172    pub fn get_log(&self) -> Vec<String> {
173        self.log.borrow().clone()
174    }
175}
176
#[cfg(test)]
mod tests {
    use crate::ThreadPool;

    use super::*;

    /// Builds a two-thread pool and creates one session per id in `ids`,
    /// all in a single `send_and_receive` call.
    fn pool_with_sessions(ids: &[u64]) -> ThreadPool<UserSession> {
        let thread_pool = ThreadPool::<UserSession>::new(2);
        let requests: Vec<_> = ids.iter().map(|&id| UserSessionInit(id)).collect();

        // Drain the responses so every session exists before the pool is returned.
        thread_pool
            .send_and_receive(requests.into_iter())
            .expect("session creation")
            .for_each(|_| {});

        thread_pool
    }

    /// Sends one `LogActionRequest` per entry in `actions` to session `id` and
    /// returns the count reported by each response, in response order.
    fn log_actions(pool: &ThreadPool<UserSession>, id: u64, actions: &[&str]) -> Vec<usize> {
        let requests: Vec<_> = actions
            .iter()
            .map(|&action| LogActionRequest(id, action.to_string()))
            .collect();

        pool.send_and_receive(requests.into_iter())
            .expect("actions")
            .map(|response| response.result)
            .collect()
    }

    /// Fetches the full log of session `id`.
    fn fetch_log(pool: &ThreadPool<UserSession>, id: u64) -> Vec<String> {
        pool.send_and_receive(vec![GetLogRequest(id)].into_iter())
            .expect("get log")
            .next()
            .expect("a single request should yield a single response")
            .result
    }

    #[test]
    fn given_user_session_when_logging_actions_then_count_increments() {
        let thread_pool = pool_with_sessions(&[1]);

        let counts = log_actions(&thread_pool, 1, &["Login", "ViewProfile", "Logout"]);

        assert_eq!(counts, vec![1, 2, 3]);
    }

    #[test]
    fn given_user_session_when_getting_log_then_returns_all_entries() {
        let thread_pool = pool_with_sessions(&[1]);
        log_actions(&thread_pool, 1, &["Login", "Logout"]);

        let log = fetch_log(&thread_pool, 1);

        assert_eq!(log, vec!["Action: Login", "Action: Logout"]);
    }

    #[test]
    fn given_multiple_sessions_when_logging_actions_then_each_has_independent_state() {
        let thread_pool = pool_with_sessions(&[1, 2]);

        log_actions(&thread_pool, 1, &["Action1"]);
        log_actions(&thread_pool, 2, &["ActionA", "ActionB"]);

        // Compare contents, not just lengths: an entry landing in the wrong
        // session must fail the test even when the totals happen to match.
        assert_eq!(fetch_log(&thread_pool, 1), vec!["Action: Action1"]);
        assert_eq!(
            fetch_log(&thread_pool, 2),
            vec!["Action: ActionA", "Action: ActionB"]
        );
    }
}