messaging_thread_pool/samples/user_session.rs
1//! # UserSession Example
2//!
3//! This module provides a canonical example of using the `#[pool_item]` macro
4//! to create a pool item that leverages the key advantage of this library:
5//! **using `Rc` and `RefCell` for shared state without locks**.
6//!
7//! ## The Problem This Solves
8//!
9//! In traditional thread pools (like `rayon`), any shared state must be wrapped
10//! in `Arc<Mutex<...>>` because work can migrate between threads. This adds:
11//! - Lock contention overhead
12//! - Complexity in reasoning about concurrent access
13//! - Risk of deadlocks
14//!
15//! With `messaging_thread_pool`, each pool item is **pinned to a single thread**
16//! for its entire lifetime. This means you can safely use:
17//! - `Rc<T>` instead of `Arc<T>`
18//! - `RefCell<T>` instead of `Mutex<T>`
19//! - Raw pointers and FFI resources that aren't `Send`/`Sync`
20//!
21//! ## Example Structure
22//!
23//! This example demonstrates:
24//! 1. A `UserSession` pool item that tracks user actions
25//! 2. A `HistoryTracker` helper struct that shares access to the session's log
26//! 3. Both structs share the same `Vec<String>` via `Rc<RefCell<...>>`
27//!
28//! ## Usage
29//!
30//! ```rust
31//! use messaging_thread_pool::{ThreadPool, samples::*};
32//!
33//! // Create a thread pool with 2 threads
34//! let thread_pool = ThreadPool::<UserSession>::new(2);
35//!
36//! // Create a session with ID 1
37//! thread_pool
38//! .send_and_receive(vec![UserSessionInit(1)].into_iter())
39//! .expect("session creation")
40//! .for_each(|_| {});
41//!
42//! // Log some actions - these are processed sequentially by the thread owning Session 1
43//! let counts: Vec<usize> = thread_pool
44//! .send_and_receive(vec![
45//! LogActionRequest(1, "Login".to_string()),
46//! LogActionRequest(1, "ViewProfile".to_string()),
47//! LogActionRequest(1, "Logout".to_string()),
48//! ].into_iter())
49//! .expect("actions")
50//! .map(|resp| resp.result)
51//! .collect();
52//!
53//! assert_eq!(counts, vec![1, 2, 3]);
54//!
55//! // Retrieve the full log
56//! let log: Vec<String> = thread_pool
57//! .send_and_receive(vec![GetLogRequest(1)].into_iter())
58//! .expect("get log")
59//! .next()
60//! .unwrap()
61//! .result;
62//!
63//! assert_eq!(log[0], "Action: Login");
64//! assert_eq!(log[1], "Action: ViewProfile");
65//! assert_eq!(log[2], "Action: Logout");
66//! ```
67
68use std::cell::RefCell;
69use std::rc::Rc;
70
71use crate::IdTargeted;
72use crate::pool_item;
73
74/// A helper struct that needs access to the session's data.
75///
76/// In a standard thread pool, this would likely need `Arc<Mutex<Vec<String>>>`.
77/// Here, we can use `Rc<RefCell<...>>` because `UserSession` never leaves its thread.
78///
79/// This pattern is useful when you have:
80/// - Helper objects that need to modify shared state
81/// - Multiple components within a pool item that need access to the same data
82/// - Complex internal structures that would be painful to wrap in `Arc<Mutex<...>>`
83#[derive(Debug, Clone)]
84pub struct HistoryTracker {
85 /// Shared access to the history log - no `Arc`, no `Mutex`!
86 log: Rc<RefCell<Vec<String>>>,
87}
88
89impl HistoryTracker {
90 /// Add an entry to the shared log.
91 ///
92 /// Note: No locks needed! Just `borrow_mut()`.
93 pub fn add_entry(&self, entry: String) {
94 self.log.borrow_mut().push(entry);
95 }
96}
97
98/// A user session that tracks actions performed by a user.
99///
100/// This is the main pool item. It demonstrates:
101/// - Owning non-`Send`/`Sync` data (`Rc<RefCell<...>>`)
102/// - Sharing data with helper structs (`HistoryTracker`)
103/// - Sequential message processing (messages to the same session are never concurrent)
104///
105/// ## Generated Types
106///
107/// The `#[pool_item]` macro on the impl block generates:
108/// - `UserSessionInit(u64)` - Request to create a new session
109/// - `UserSessionApi` - Enum of all message types
110/// - `LogActionRequest(u64, String)` / `LogActionResponse` - For the `log_action` method
111/// - `GetLogRequest(u64)` / `GetLogResponse` - For the `get_log` method
112#[derive(Debug)]
113pub struct UserSession {
114 id: u64,
115 /// We hold the data
116 log: Rc<RefCell<Vec<String>>>,
117 /// Our helper also holds a reference to the SAME data
118 tracker: HistoryTracker,
119}
120
121impl IdTargeted for UserSession {
122 fn id(&self) -> u64 {
123 self.id
124 }
125}
126
127/// The `#[pool_item]` macro transforms this impl block to:
128/// 1. Generate the `PoolItem` trait implementation
129/// 2. Generate request/response structs for each `#[messaging(...)]` method
130/// 3. Generate the `UserSessionApi` enum containing all message variants
131/// 4. Generate the `UserSessionInit` struct for creating new sessions
132#[pool_item]
133impl UserSession {
134 /// Creates a new UserSession.
135 ///
136 /// This method is called by the thread pool when a `UserSessionInit` request
137 /// is received. The session is created on the thread determined by `id % thread_count`.
138 ///
139 /// Note how we create an `Rc<RefCell<...>>` and clone it to share with the helper.
140 /// This would be impossible in a traditional thread pool where the item might
141 /// move between threads.
142 pub fn new(id: u64) -> Self {
143 let log = Rc::new(RefCell::new(Vec::new()));
144 let tracker = HistoryTracker { log: log.clone() };
145
146 Self { id, log, tracker }
147 }
148
149 /// Log a user action and return the total number of logged actions.
150 ///
151 /// The `#[messaging(LogActionRequest, LogActionResponse)]` attribute tells the macro to:
152 /// - Generate `LogActionRequest(u64, String)` - the id and the `action` parameter
153 /// - Generate `LogActionResponse { id: u64, result: usize }` - the id and return value
154 /// - Add a variant to `UserSessionApi` for this request/response pair
155 ///
156 /// Messages to the same session (same ID) are processed sequentially,
157 /// so there's no risk of concurrent modification of the log.
158 #[messaging(LogActionRequest, LogActionResponse)]
159 pub fn log_action(&self, action: String) -> usize {
160 // We use the helper to modify the state
161 self.tracker.add_entry(format!("Action: {}", action));
162
163 // We can read the state directly
164 self.log.borrow().len()
165 }
166
167 /// Retrieve the entire action history.
168 ///
169 /// Returns a clone of the log because the original `Vec` can't leave the thread
170 /// (it's behind an `Rc`).
171 #[messaging(GetLogRequest, GetLogResponse)]
172 pub fn get_log(&self) -> Vec<String> {
173 self.log.borrow().clone()
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use crate::ThreadPool;
180
181 use super::*;
182
183 #[test]
184 fn given_user_session_when_logging_actions_then_count_increments() {
185 let thread_pool = ThreadPool::<UserSession>::new(2);
186
187 // Create session
188 thread_pool
189 .send_and_receive(vec![UserSessionInit(1)].into_iter())
190 .expect("session creation")
191 .for_each(|_| {});
192
193 // Log actions and verify counts
194 let counts: Vec<usize> = thread_pool
195 .send_and_receive(
196 vec![
197 LogActionRequest(1, "Login".to_string()),
198 LogActionRequest(1, "ViewProfile".to_string()),
199 LogActionRequest(1, "Logout".to_string()),
200 ]
201 .into_iter(),
202 )
203 .expect("actions")
204 .map(|resp| resp.result)
205 .collect();
206
207 assert_eq!(counts, vec![1, 2, 3]);
208 }
209
210 #[test]
211 fn given_user_session_when_getting_log_then_returns_all_entries() {
212 let thread_pool = ThreadPool::<UserSession>::new(2);
213
214 // Create session
215 thread_pool
216 .send_and_receive(vec![UserSessionInit(1)].into_iter())
217 .expect("session creation")
218 .for_each(|_| {});
219
220 // Log actions
221 thread_pool
222 .send_and_receive(
223 vec![
224 LogActionRequest(1, "Login".to_string()),
225 LogActionRequest(1, "Logout".to_string()),
226 ]
227 .into_iter(),
228 )
229 .expect("actions")
230 .for_each(|_| {});
231
232 // Get log
233 let log = thread_pool
234 .send_and_receive(vec![GetLogRequest(1)].into_iter())
235 .expect("get log")
236 .next()
237 .unwrap()
238 .result;
239
240 assert_eq!(log.len(), 2);
241 assert_eq!(log[0], "Action: Login");
242 assert_eq!(log[1], "Action: Logout");
243 }
244
245 #[test]
246 fn given_multiple_sessions_when_logging_actions_then_each_has_independent_state() {
247 let thread_pool = ThreadPool::<UserSession>::new(2);
248
249 // Create two sessions
250 thread_pool
251 .send_and_receive(vec![UserSessionInit(1), UserSessionInit(2)].into_iter())
252 .expect("session creation")
253 .for_each(|_| {});
254
255 // Log to session 1
256 thread_pool
257 .send_and_receive(vec![LogActionRequest(1, "Action1".to_string())].into_iter())
258 .expect("action")
259 .for_each(|_| {});
260
261 // Log to session 2 (twice)
262 thread_pool
263 .send_and_receive(
264 vec![
265 LogActionRequest(2, "ActionA".to_string()),
266 LogActionRequest(2, "ActionB".to_string()),
267 ]
268 .into_iter(),
269 )
270 .expect("actions")
271 .for_each(|_| {});
272
273 // Verify independent logs
274 let log1 = thread_pool
275 .send_and_receive(vec![GetLogRequest(1)].into_iter())
276 .expect("get log")
277 .next()
278 .unwrap()
279 .result;
280
281 let log2 = thread_pool
282 .send_and_receive(vec![GetLogRequest(2)].into_iter())
283 .expect("get log")
284 .next()
285 .unwrap()
286 .result;
287
288 assert_eq!(log1.len(), 1);
289 assert_eq!(log2.len(), 2);
290 }
291}