Skip to main content

orcs_runtime/channel/
manager.rs

1//! WorldManager - Concurrent World state management.
2//!
3//! [`WorldManager`] provides safe concurrent access to the [`World`](super::World)
4//! by processing state modifications through a command queue.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────────┐
10//! │                        WorldManager                             │
11//! │                                                                 │
12//! │  ┌──────────────────┐    ┌─────────────────────────────────┐  │
13//! │  │ command_rx       │───►│ apply_command() loop            │  │
14//! │  │ (mpsc::Receiver) │    │ - Spawn/Kill/Complete/Update    │  │
15//! │  └──────────────────┘    └─────────────────────────────────┘  │
16//! │                                         │                      │
17//! │                                         ▼                      │
18//! │                          ┌─────────────────────────────┐      │
19//! │                          │ Arc<RwLock<World>>          │      │
20//! │                          │ - Sequential writes         │      │
21//! │                          │ - Parallel reads via clone  │      │
22//! │                          └─────────────────────────────┘      │
23//! └─────────────────────────────────────────────────────────────────┘
24//! ```
25//!
26//! # Usage
27//!
28//! ```ignore
29//! use orcs_runtime::channel::{WorldManager, WorldCommand};
30//!
31//! // Create manager
32//! let (manager, cmd_tx) = WorldManager::new();
33//!
34//! // Get read-only World access
35//! let world = manager.world();
36//!
37//! // Spawn manager task
38//! tokio::spawn(manager.run());
39//!
40//! // Send commands from any task
41//! let (reply_tx, reply_rx) = oneshot::channel();
42//! cmd_tx.send(WorldCommand::Spawn { ... }).await?;
43//! ```
44
45use super::command::{StateTransition, WorldCommand};
46use super::traits::{ChannelCore, ChannelMut};
47use super::World;
48use std::sync::Arc;
49use tokio::sync::{mpsc, RwLock};
50use tracing::{debug, info, warn};
51
52/// Default command channel buffer size.
53///
54/// 256 commands provides sufficient buffering for concurrent channel operations.
55/// Higher values increase memory usage; lower values may cause backpressure
56/// during burst spawn/kill operations.
57const COMMAND_BUFFER_SIZE: usize = 256;
58
59/// Handle for sending commands to [`WorldManager`].
60pub type WorldCommandSender = mpsc::Sender<WorldCommand>;
61
62/// Manages World state with concurrent access support.
63///
64/// `WorldManager` owns the `World` and processes state modifications
65/// through a command queue. This enables multiple [`ChannelRunner`]s
66/// to modify the World without holding locks.
67///
68/// [`ChannelRunner`]: super::ChannelRunner
69///
70/// # Thread Safety
71///
72/// - **Reads**: Via `Arc<RwLock<World>>` clone (parallel reads allowed)
73/// - **Writes**: Via command queue (sequential, no contention)
74///
75/// # Example
76///
77/// ```ignore
78/// let (manager, cmd_tx) = WorldManager::new();
79///
80/// // Clone for read access
81/// let world_read = manager.world();
82///
83/// // Spawn the manager loop
84/// tokio::spawn(manager.run());
85///
86/// // Send commands
87/// cmd_tx.send(WorldCommand::Kill { id, reason }).await?;
88/// ```
89pub struct WorldManager {
90    /// The managed World instance.
91    world: Arc<RwLock<World>>,
92    /// Receiver for incoming commands.
93    command_rx: mpsc::Receiver<WorldCommand>,
94}
95
96impl WorldManager {
97    /// Creates a new WorldManager with an empty World.
98    ///
99    /// Returns the manager and a sender for commands.
100    ///
101    /// # Example
102    ///
103    /// ```ignore
104    /// let (manager, cmd_tx) = WorldManager::new();
105    /// tokio::spawn(manager.run());
106    /// ```
107    #[must_use]
108    pub fn new() -> (Self, WorldCommandSender) {
109        Self::with_world(World::new())
110    }
111
112    /// Creates a WorldManager with an existing World.
113    ///
114    /// Use this when you need to initialize the World before
115    /// starting the manager (e.g., creating the IO channel).
116    ///
117    /// # Example
118    ///
119    /// ```ignore
120    /// let mut world = World::new();
121    /// let io = world.create_channel(ChannelConfig::interactive());
122    ///
123    /// let (manager, cmd_tx) = WorldManager::with_world(world);
124    /// ```
125    #[must_use]
126    pub fn with_world(world: World) -> (Self, WorldCommandSender) {
127        let (tx, rx) = mpsc::channel(COMMAND_BUFFER_SIZE);
128        let manager = Self {
129            world: Arc::new(RwLock::new(world)),
130            command_rx: rx,
131        };
132        (manager, tx)
133    }
134
135    /// Returns a clone of the World handle for read access.
136    ///
137    /// The returned `Arc<RwLock<World>>` can be shared across tasks
138    /// for concurrent read access.
139    #[must_use]
140    pub fn world(&self) -> Arc<RwLock<World>> {
141        Arc::clone(&self.world)
142    }
143
144    /// Runs the command processing loop.
145    ///
146    /// This method consumes the manager and runs until:
147    /// - A `Shutdown` command is received
148    /// - All command senders are dropped (channel closed)
149    ///
150    /// # Example
151    ///
152    /// ```ignore
153    /// let (manager, cmd_tx) = WorldManager::new();
154    /// let handle = tokio::spawn(manager.run());
155    ///
156    /// // ... use cmd_tx ...
157    ///
158    /// cmd_tx.send(WorldCommand::Shutdown).await?;
159    /// handle.await?;
160    /// ```
161    pub async fn run(mut self) {
162        info!("WorldManager started");
163
164        while let Some(cmd) = self.command_rx.recv().await {
165            if matches!(cmd, WorldCommand::Shutdown) {
166                info!("WorldManager received shutdown");
167                break;
168            }
169            self.apply_command(cmd).await;
170        }
171
172        info!("WorldManager stopped");
173    }
174
175    /// Applies a single command to the World.
176    async fn apply_command(&self, cmd: WorldCommand) {
177        match cmd {
178            WorldCommand::Spawn {
179                parent,
180                config,
181                reply,
182            } => {
183                let result = {
184                    let mut world = self.world.write().await;
185                    world.spawn_with(parent, config)
186                };
187                debug!("Spawn command: parent={}, result={:?}", parent, result);
188                let _ = reply.send(result);
189            }
190
191            WorldCommand::SpawnWithId {
192                parent,
193                id,
194                config,
195                reply,
196            } => {
197                let result = {
198                    let mut world = self.world.write().await;
199                    world.spawn_with_id(parent, id, config)
200                };
201                debug!(
202                    "SpawnWithId command: parent={}, id={}, success={}",
203                    parent,
204                    id,
205                    result.is_some()
206                );
207                let _ = reply.send(result.is_some());
208            }
209
210            WorldCommand::Kill { id, reason } => {
211                let mut world = self.world.write().await;
212                world.kill(id, reason.clone());
213                debug!("Kill command: id={}, reason={}", id, reason);
214            }
215
216            WorldCommand::Complete { id, reply } => {
217                let result = {
218                    let mut world = self.world.write().await;
219                    world.complete(id)
220                };
221                debug!("Complete command: id={}, result={}", id, result);
222                let _ = reply.send(result);
223            }
224
225            WorldCommand::UpdateState {
226                id,
227                transition,
228                reply,
229            } => {
230                let result = {
231                    let mut world = self.world.write().await;
232                    if let Some(channel) = world.get_mut(&id) {
233                        match transition {
234                            StateTransition::Pause => channel.pause(),
235                            StateTransition::Resume => channel.resume(),
236                            StateTransition::AwaitApproval { request_id } => {
237                                channel.await_approval(request_id)
238                            }
239                            StateTransition::ResolveApproval { approval_id } => {
240                                channel.resolve_approval(&approval_id).is_some()
241                            }
242                            StateTransition::Abort { reason } => channel.abort(reason),
243                        }
244                    } else {
245                        warn!("UpdateState: channel {} not found", id);
246                        false
247                    }
248                };
249                debug!("UpdateState command: id={}, result={}", id, result);
250                let _ = reply.send(result);
251            }
252
253            WorldCommand::GetState { id, reply } => {
254                let result = {
255                    let world = self.world.read().await;
256                    world.get(&id).map(|ch| ch.state().clone())
257                };
258                debug!("GetState command: id={}, found={}", id, result.is_some());
259                let _ = reply.send(result);
260            }
261
262            WorldCommand::Shutdown => {
263                // Handled in run() loop
264            }
265        }
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272    use crate::channel::{ChannelConfig, ChannelState};
273
274    #[tokio::test]
275    async fn manager_creation() {
276        let (manager, _tx) = WorldManager::new();
277        let world = manager.world();
278        let w = world.read().await;
279        assert_eq!(w.channel_count(), 0);
280    }
281
282    #[tokio::test]
283    async fn manager_with_existing_world() {
284        let mut world = World::new();
285        let io = world.create_channel(ChannelConfig::interactive());
286
287        let (manager, _tx) = WorldManager::with_world(world);
288        let w = manager.world();
289        let r = w.read().await;
290        assert!(r.get(&io).is_some());
291    }
292
293    #[tokio::test]
294    async fn spawn_command() {
295        let mut world = World::new();
296        let io = world.create_channel(ChannelConfig::interactive());
297
298        let (manager, tx) = WorldManager::with_world(world);
299        let world_handle = manager.world();
300
301        // Start manager
302        let manager_handle = tokio::spawn(manager.run());
303
304        // Send spawn command
305        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
306        tx.send(WorldCommand::Spawn {
307            parent: io,
308            config: ChannelConfig::background(),
309            reply: reply_tx,
310        })
311        .await
312        .expect("send Spawn command to WorldManager");
313
314        // Wait for reply
315        let child_id = reply_rx
316            .await
317            .expect("receive Spawn reply from WorldManager");
318        assert!(child_id.is_some());
319
320        // Verify in world
321        {
322            let w = world_handle.read().await;
323            assert_eq!(w.channel_count(), 2);
324        }
325
326        // Shutdown
327        tx.send(WorldCommand::Shutdown)
328            .await
329            .expect("send Shutdown command");
330        manager_handle
331            .await
332            .expect("WorldManager task should complete cleanly");
333    }
334
335    #[tokio::test]
336    async fn kill_command() {
337        let mut world = World::new();
338        let io = world.create_channel(ChannelConfig::interactive());
339        let child = world.spawn(io).expect("spawn child under IO channel");
340
341        let (manager, tx) = WorldManager::with_world(world);
342        let world_handle = manager.world();
343
344        let manager_handle = tokio::spawn(manager.run());
345
346        // Kill child
347        tx.send(WorldCommand::Kill {
348            id: child,
349            reason: "test".into(),
350        })
351        .await
352        .expect("send Kill command to WorldManager");
353
354        // Give time to process
355        tokio::task::yield_now().await;
356
357        // Verify
358        {
359            let w = world_handle.read().await;
360            assert_eq!(w.channel_count(), 1);
361            assert!(w.get(&child).is_none());
362        }
363
364        tx.send(WorldCommand::Shutdown)
365            .await
366            .expect("send Shutdown command");
367        manager_handle
368            .await
369            .expect("WorldManager task should complete cleanly after kill");
370    }
371
372    #[tokio::test]
373    async fn complete_command() {
374        let mut world = World::new();
375        let io = world.create_channel(ChannelConfig::interactive());
376
377        let (manager, tx) = WorldManager::with_world(world);
378        let world_handle = manager.world();
379
380        let manager_handle = tokio::spawn(manager.run());
381
382        // Complete IO channel
383        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
384        tx.send(WorldCommand::Complete {
385            id: io,
386            reply: reply_tx,
387        })
388        .await
389        .expect("send Complete command to WorldManager");
390
391        let result = reply_rx
392            .await
393            .expect("receive Complete reply from WorldManager");
394        assert!(result);
395
396        // Verify state
397        {
398            let w = world_handle.read().await;
399            let ch = w.get(&io).expect("IO channel should exist after complete");
400            assert_eq!(ch.state(), &ChannelState::Completed);
401        }
402
403        tx.send(WorldCommand::Shutdown)
404            .await
405            .expect("send Shutdown command");
406        manager_handle
407            .await
408            .expect("WorldManager task should complete cleanly after complete");
409    }
410
411    #[tokio::test]
412    async fn update_state_pause_resume() {
413        let mut world = World::new();
414        let io = world.create_channel(ChannelConfig::interactive());
415
416        let (manager, tx) = WorldManager::with_world(world);
417        let world_handle = manager.world();
418
419        let manager_handle = tokio::spawn(manager.run());
420
421        // Pause
422        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
423        tx.send(WorldCommand::UpdateState {
424            id: io,
425            transition: StateTransition::Pause,
426            reply: reply_tx,
427        })
428        .await
429        .expect("send Pause command to WorldManager");
430
431        assert!(reply_rx
432            .await
433            .expect("receive Pause reply from WorldManager"));
434
435        {
436            let w = world_handle.read().await;
437            assert!(w
438                .get(&io)
439                .expect("IO channel should exist after pause")
440                .is_paused());
441        }
442
443        // Resume
444        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
445        tx.send(WorldCommand::UpdateState {
446            id: io,
447            transition: StateTransition::Resume,
448            reply: reply_tx,
449        })
450        .await
451        .expect("send Resume command to WorldManager");
452
453        assert!(reply_rx
454            .await
455            .expect("receive Resume reply from WorldManager"));
456
457        {
458            let w = world_handle.read().await;
459            assert!(w
460                .get(&io)
461                .expect("IO channel should exist after resume")
462                .is_running());
463        }
464
465        tx.send(WorldCommand::Shutdown)
466            .await
467            .expect("send Shutdown command");
468        manager_handle
469            .await
470            .expect("WorldManager task should complete cleanly after pause/resume");
471    }
472
473    #[tokio::test]
474    async fn get_state_command() {
475        let mut world = World::new();
476        let io = world.create_channel(ChannelConfig::interactive());
477
478        let (manager, tx) = WorldManager::with_world(world);
479
480        let manager_handle = tokio::spawn(manager.run());
481
482        // Get existing
483        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
484        tx.send(WorldCommand::GetState {
485            id: io,
486            reply: reply_tx,
487        })
488        .await
489        .expect("send GetState command for existing channel");
490
491        let state = reply_rx
492            .await
493            .expect("receive GetState reply for existing channel");
494        assert_eq!(state, Some(ChannelState::Running));
495
496        // Get non-existing
497        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
498        tx.send(WorldCommand::GetState {
499            id: orcs_types::ChannelId::new(),
500            reply: reply_tx,
501        })
502        .await
503        .expect("send GetState command for non-existing channel");
504
505        let state = reply_rx
506            .await
507            .expect("receive GetState reply for non-existing channel");
508        assert!(state.is_none());
509
510        tx.send(WorldCommand::Shutdown)
511            .await
512            .expect("send Shutdown command");
513        manager_handle
514            .await
515            .expect("WorldManager task should complete cleanly after get_state");
516    }
517
518    #[tokio::test]
519    async fn shutdown_command() {
520        let (manager, tx) = WorldManager::new();
521        let manager_handle = tokio::spawn(manager.run());
522
523        tx.send(WorldCommand::Shutdown)
524            .await
525            .expect("send Shutdown command");
526
527        // Should complete without hanging
528        let result =
529            tokio::time::timeout(std::time::Duration::from_millis(100), manager_handle).await;
530
531        assert!(result.is_ok());
532    }
533
534    #[tokio::test]
535    async fn concurrent_read_access() {
536        let mut world = World::new();
537        let io = world.create_channel(ChannelConfig::interactive());
538
539        let (manager, tx) = WorldManager::with_world(world);
540        let world1 = manager.world();
541        let world2 = manager.world();
542
543        let manager_handle = tokio::spawn(manager.run());
544
545        // Concurrent reads should not block
546        let (r1, r2) = tokio::join!(
547            async {
548                let w = world1.read().await;
549                w.get(&io).is_some()
550            },
551            async {
552                let w = world2.read().await;
553                w.channel_count()
554            }
555        );
556
557        assert!(r1);
558        assert_eq!(r2, 1);
559
560        tx.send(WorldCommand::Shutdown)
561            .await
562            .expect("send Shutdown command for concurrent_read test");
563        manager_handle
564            .await
565            .expect("WorldManager task should complete cleanly after concurrent reads");
566    }
567}