1use super::command::{StateTransition, WorldCommand};
46use super::traits::{ChannelCore, ChannelMut};
47use super::World;
48use std::sync::Arc;
49use tokio::sync::{mpsc, RwLock};
50use tracing::{debug, info, warn};
51
/// Capacity of the bounded `mpsc` channel that carries [`WorldCommand`]s to the
/// [`WorldManager`] (see [`WorldManager::with_world`]).
const COMMAND_BUFFER_SIZE: usize = 256;
58
/// Sending half of the command channel; used to submit [`WorldCommand`]s to a
/// running [`WorldManager`].
pub type WorldCommandSender = mpsc::Sender<WorldCommand>;
61
/// Applies [`WorldCommand`]s received over a channel to a shared [`World`].
///
/// The world is kept behind an `Arc<RwLock<_>>`; a handle to it can be obtained
/// with [`WorldManager::world`].
pub struct WorldManager {
    /// Shared world state; locked for writing while a mutating command is applied.
    world: Arc<RwLock<World>>,
    /// Receiving half of the command channel drained by [`WorldManager::run`].
    command_rx: mpsc::Receiver<WorldCommand>,
}
95
96impl WorldManager {
97 #[must_use]
108 pub fn new() -> (Self, WorldCommandSender) {
109 Self::with_world(World::new())
110 }
111
112 #[must_use]
126 pub fn with_world(world: World) -> (Self, WorldCommandSender) {
127 let (tx, rx) = mpsc::channel(COMMAND_BUFFER_SIZE);
128 let manager = Self {
129 world: Arc::new(RwLock::new(world)),
130 command_rx: rx,
131 };
132 (manager, tx)
133 }
134
135 #[must_use]
140 pub fn world(&self) -> Arc<RwLock<World>> {
141 Arc::clone(&self.world)
142 }
143
144 pub async fn run(mut self) {
162 info!("WorldManager started");
163
164 while let Some(cmd) = self.command_rx.recv().await {
165 if matches!(cmd, WorldCommand::Shutdown) {
166 info!("WorldManager received shutdown");
167 break;
168 }
169 self.apply_command(cmd).await;
170 }
171
172 info!("WorldManager stopped");
173 }
174
175 async fn apply_command(&self, cmd: WorldCommand) {
177 match cmd {
178 WorldCommand::Spawn {
179 parent,
180 config,
181 reply,
182 } => {
183 let result = {
184 let mut world = self.world.write().await;
185 world.spawn_with(parent, config)
186 };
187 debug!("Spawn command: parent={}, result={:?}", parent, result);
188 let _ = reply.send(result);
189 }
190
191 WorldCommand::SpawnWithId {
192 parent,
193 id,
194 config,
195 reply,
196 } => {
197 let result = {
198 let mut world = self.world.write().await;
199 world.spawn_with_id(parent, id, config)
200 };
201 debug!(
202 "SpawnWithId command: parent={}, id={}, success={}",
203 parent,
204 id,
205 result.is_some()
206 );
207 let _ = reply.send(result.is_some());
208 }
209
210 WorldCommand::Kill { id, reason } => {
211 let mut world = self.world.write().await;
212 world.kill(id, reason.clone());
213 debug!("Kill command: id={}, reason={}", id, reason);
214 }
215
216 WorldCommand::Complete { id, reply } => {
217 let result = {
218 let mut world = self.world.write().await;
219 world.complete(id)
220 };
221 debug!("Complete command: id={}, result={}", id, result);
222 let _ = reply.send(result);
223 }
224
225 WorldCommand::UpdateState {
226 id,
227 transition,
228 reply,
229 } => {
230 let result = {
231 let mut world = self.world.write().await;
232 if let Some(channel) = world.get_mut(&id) {
233 match transition {
234 StateTransition::Pause => channel.pause(),
235 StateTransition::Resume => channel.resume(),
236 StateTransition::AwaitApproval { request_id } => {
237 channel.await_approval(request_id)
238 }
239 StateTransition::ResolveApproval { approval_id } => {
240 channel.resolve_approval(&approval_id).is_some()
241 }
242 StateTransition::Abort { reason } => channel.abort(reason),
243 }
244 } else {
245 warn!("UpdateState: channel {} not found", id);
246 false
247 }
248 };
249 debug!("UpdateState command: id={}, result={}", id, result);
250 let _ = reply.send(result);
251 }
252
253 WorldCommand::GetState { id, reply } => {
254 let result = {
255 let world = self.world.read().await;
256 world.get(&id).map(|ch| ch.state().clone())
257 };
258 debug!("GetState command: id={}, found={}", id, result.is_some());
259 let _ = reply.send(result);
260 }
261
262 WorldCommand::Shutdown => {
263 }
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    //! Tests for `WorldManager`: construction, each command variant, shutdown, and
    //! concurrent read access to the shared world.

    use super::*;
    use crate::channel::{ChannelConfig, ChannelState};

    /// A freshly created manager wraps an empty world.
    #[tokio::test]
    async fn manager_creation() {
        let (manager, _tx) = WorldManager::new();
        let world = manager.world();
        let w = world.read().await;
        assert_eq!(w.channel_count(), 0);
    }

    /// Channels created before the manager takes over remain visible through it.
    #[tokio::test]
    async fn manager_with_existing_world() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());

        let (manager, _tx) = WorldManager::with_world(world);
        let w = manager.world();
        let r = w.read().await;
        assert!(r.get(&io).is_some());
    }

    /// `Spawn` creates a child channel and replies with its id.
    #[tokio::test]
    async fn spawn_command() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());

        let (manager, tx) = WorldManager::with_world(world);
        let world_handle = manager.world();

        let manager_handle = tokio::spawn(manager.run());

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::Spawn {
            parent: io,
            config: ChannelConfig::background(),
            reply: reply_tx,
        })
        .await
        .expect("send Spawn command to WorldManager");

        let child_id = reply_rx
            .await
            .expect("receive Spawn reply from WorldManager");
        assert!(child_id.is_some());

        {
            let w = world_handle.read().await;
            assert_eq!(w.channel_count(), 2);
        }

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command");
        manager_handle
            .await
            .expect("WorldManager task should complete cleanly");
    }

    /// `Kill` removes the targeted channel from the world.
    #[tokio::test]
    async fn kill_command() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());
        let child = world.spawn(io).expect("spawn child under IO channel");

        let (manager, tx) = WorldManager::with_world(world);
        let world_handle = manager.world();

        let manager_handle = tokio::spawn(manager.run());

        tx.send(WorldCommand::Kill {
            id: child,
            reason: "test".into(),
        })
        .await
        .expect("send Kill command to WorldManager");

        // Kill carries no reply channel. The manager applies commands strictly in
        // the order it receives them, so the reply to a follow-up GetState proves
        // the Kill has already been applied, without relying on scheduler timing.
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::GetState {
            id: child,
            reply: reply_tx,
        })
        .await
        .expect("send GetState barrier after Kill");
        let state = reply_rx
            .await
            .expect("receive GetState reply after Kill");
        assert!(state.is_none());

        {
            let w = world_handle.read().await;
            assert_eq!(w.channel_count(), 1);
            assert!(w.get(&child).is_none());
        }

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command");
        manager_handle
            .await
            .expect("WorldManager task should complete cleanly after kill");
    }

    /// `Complete` moves the channel into the `Completed` state and replies `true`.
    #[tokio::test]
    async fn complete_command() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());

        let (manager, tx) = WorldManager::with_world(world);
        let world_handle = manager.world();

        let manager_handle = tokio::spawn(manager.run());

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::Complete {
            id: io,
            reply: reply_tx,
        })
        .await
        .expect("send Complete command to WorldManager");

        let result = reply_rx
            .await
            .expect("receive Complete reply from WorldManager");
        assert!(result);

        {
            let w = world_handle.read().await;
            let ch = w.get(&io).expect("IO channel should exist after complete");
            assert_eq!(ch.state(), &ChannelState::Completed);
        }

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command");
        manager_handle
            .await
            .expect("WorldManager task should complete cleanly after complete");
    }

    /// `UpdateState` with `Pause` then `Resume` toggles the channel state.
    #[tokio::test]
    async fn update_state_pause_resume() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());

        let (manager, tx) = WorldManager::with_world(world);
        let world_handle = manager.world();

        let manager_handle = tokio::spawn(manager.run());

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::UpdateState {
            id: io,
            transition: StateTransition::Pause,
            reply: reply_tx,
        })
        .await
        .expect("send Pause command to WorldManager");

        assert!(reply_rx
            .await
            .expect("receive Pause reply from WorldManager"));

        {
            let w = world_handle.read().await;
            assert!(w
                .get(&io)
                .expect("IO channel should exist after pause")
                .is_paused());
        }

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::UpdateState {
            id: io,
            transition: StateTransition::Resume,
            reply: reply_tx,
        })
        .await
        .expect("send Resume command to WorldManager");

        assert!(reply_rx
            .await
            .expect("receive Resume reply from WorldManager"));

        {
            let w = world_handle.read().await;
            assert!(w
                .get(&io)
                .expect("IO channel should exist after resume")
                .is_running());
        }

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command");
        manager_handle
            .await
            .expect("WorldManager task should complete cleanly after pause/resume");
    }

    /// `GetState` replies with the state of a known channel and `None` for an
    /// unknown id.
    #[tokio::test]
    async fn get_state_command() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());

        let (manager, tx) = WorldManager::with_world(world);

        let manager_handle = tokio::spawn(manager.run());

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::GetState {
            id: io,
            reply: reply_tx,
        })
        .await
        .expect("send GetState command for existing channel");

        let state = reply_rx
            .await
            .expect("receive GetState reply for existing channel");
        assert_eq!(state, Some(ChannelState::Running));

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        tx.send(WorldCommand::GetState {
            id: orcs_types::ChannelId::new(),
            reply: reply_tx,
        })
        .await
        .expect("send GetState command for non-existing channel");

        let state = reply_rx
            .await
            .expect("receive GetState reply for non-existing channel");
        assert!(state.is_none());

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command");
        manager_handle
            .await
            .expect("WorldManager task should complete cleanly after get_state");
    }

    /// `Shutdown` makes the run loop exit promptly and without panicking.
    #[tokio::test]
    async fn shutdown_command() {
        let (manager, tx) = WorldManager::new();
        let manager_handle = tokio::spawn(manager.run());

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command");

        // Outer layer: the timeout must not elapse. Inner layer: the task itself
        // must not have panicked; checking only the timeout would hide a JoinError.
        let joined = tokio::time::timeout(std::time::Duration::from_millis(100), manager_handle)
            .await
            .expect("WorldManager should stop promptly after Shutdown");
        joined.expect("WorldManager task should complete cleanly after shutdown");
    }

    /// Two readers can hold the world read lock at the same time while the manager
    /// is running.
    #[tokio::test]
    async fn concurrent_read_access() {
        let mut world = World::new();
        let io = world.create_channel(ChannelConfig::interactive());

        let (manager, tx) = WorldManager::with_world(world);
        let world1 = manager.world();
        let world2 = manager.world();

        let manager_handle = tokio::spawn(manager.run());

        let (r1, r2) = tokio::join!(
            async {
                let w = world1.read().await;
                w.get(&io).is_some()
            },
            async {
                let w = world2.read().await;
                w.channel_count()
            }
        );

        assert!(r1);
        assert_eq!(r2, 1);

        tx.send(WorldCommand::Shutdown)
            .await
            .expect("send Shutdown command for concurrent_read test");
        manager_handle
            .await
            .expect("WorldManager task should complete cleanly after concurrent reads");
    }
}
567}