// archy/app.rs

use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_channel::{Receiver, bounded};
use parking_lot::Mutex;
use tokio::sync::{broadcast, Semaphore};
use tokio::task::{AbortHandle, JoinError};
use tokio_util::sync::CancellationToken;

use crate::{FromApp, Module, RestartPolicy, Schedule, Service, SystemResult};

15// --- System Types ---
16
17pub(crate) type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
18
19/// Future type for systems - returns SystemResult to enable error-based restarts
20pub(crate) type SystemFuture = Pin<Box<dyn Future<Output = SystemResult> + Send>>;
21
22/// A factory that can create system futures. Can be called multiple times (for Fixed schedules).
23/// Uses Arc for zero-overhead sharing: dereference is same cost as Box, clone only on restart.
24pub type SystemFactory = Arc<dyn Fn() -> SystemFuture + Send + Sync>;
25
26/// Creates system factories from the app context
27pub(crate) type SystemSpawner = Box<dyn FnOnce(&App, usize) -> Vec<SystemFactory> + Send>;
28
29/// Configuration for a system (parallels WorkerConfig for services)
30#[derive(Clone)]
31pub struct SystemConfig {
32    pub workers: usize,
33    pub restart: RestartPolicy,
34}
35
36impl SystemConfig {
37    /// Default number of workers per system.
38    pub const DEFAULT_WORKERS: usize = 1;
39}
40
41impl Default for SystemConfig {
42    fn default() -> Self {
43        Self {
44            workers: Self::DEFAULT_WORKERS,
45            restart: RestartPolicy::default(),
46        }
47    }
48}
49
50/// Descriptor for a registered system
51pub struct SystemDescriptor {
52    pub id: TypeId,
53    pub schedule: Schedule,
54    pub spawner: SystemSpawner,
55    pub(crate) overrides: SystemOverrides,
56}
57
58impl SystemDescriptor {
59    pub(crate) fn new(id: TypeId, schedule: Schedule, spawner: SystemSpawner) -> Self {
60        Self { id, schedule, spawner, overrides: SystemOverrides::default() }
61    }
62}
63
64// --- IntoSystem Trait ---
65
66pub trait IntoSystem<Args>: Send + 'static {
67    fn into_system(self) -> SystemSpawner;
68}
69
70// --- IntoSystemConfigs Trait ---
71
72/// Trait for tuple system registration via add_systems
73pub trait IntoSystemConfigs<Marker> {
74    fn into_descriptors(self, schedule: Schedule) -> Vec<SystemDescriptor>;
75}
76
77// --- Internal Types ---
78
79pub(crate) struct EventBus<E> {
80    pub(crate) sender: broadcast::Sender<E>,
81}
82
83type Closer = Box<dyn FnOnce() + Send>;
84type ChannelCreator = Box<dyn FnOnce(usize) -> (Arc<dyn Any + Send + Sync>, Arc<dyn Any + Send + Sync>) + Send>;
85type EventCreator = Box<dyn FnOnce(usize) -> Arc<dyn Any + Send + Sync> + Send>;
86type LifecycleHook = Box<dyn FnOnce() -> BoxFuture + Send>;
87
88/// Result of spawning a service - includes lifecycle hooks and worker futures
89struct ServiceSpawnResult {
90    startup: LifecycleHook,
91    workers: Vec<BoxFuture>,
92    shutdown: LifecycleHook,
93}
94
95type ServiceSpawner = Box<dyn FnOnce(&App, Arc<dyn Any + Send + Sync>, WorkerConfig) -> ServiceSpawnResult + Send>;
96
97#[derive(Clone)]
98pub(crate) struct WorkerConfig {
99    pub(crate) workers: usize,
100    pub(crate) capacity: usize,
101    pub(crate) concurrent: Option<usize>,
102    pub(crate) restart: RestartPolicy,
103}
104
105impl WorkerConfig {
106    /// Default number of workers per service.
107    pub const DEFAULT_WORKERS: usize = 1;
108    /// Default channel capacity for service message queues.
109    pub const DEFAULT_CAPACITY: usize = 32;
110}
111
112impl Default for WorkerConfig {
113    fn default() -> Self {
114        Self {
115            workers: Self::DEFAULT_WORKERS,
116            capacity: Self::DEFAULT_CAPACITY,
117            concurrent: None,
118            restart: RestartPolicy::default(),
119        }
120    }
121}
122
123// --- Overrides ---
124
125#[derive(Clone, Default)]
126struct ServiceOverrides {
127    workers: Option<usize>,
128    capacity: Option<usize>,
129    concurrent: Option<usize>,
130    restart: Option<RestartPolicy>,
131}
132
133#[derive(Clone, Default)]
134pub(crate) struct SystemOverrides {
135    workers: Option<usize>,
136    restart: Option<RestartPolicy>,
137}
138
/// Optional event settings; `None` fields fall back to the app-wide event defaults.
#[derive(Clone, Default)]
struct EventOverrides {
    capacity: Option<usize>,
}

/// Resolved event settings.
#[derive(Clone)]
pub(crate) struct EventConfig {
    /// Capacity of the event's broadcast channel.
    pub(crate) capacity: usize,
}

149// --- Batch Registration Traits ---
150
151pub trait AddResources {
152    fn add_to(self, app: &mut App);
153}
154
155pub trait AddServices {
156    fn add_to(app: &mut App);
157}
158
159pub trait AddEvents {
160    fn add_to(app: &mut App);
161}
162
163// --- Config Builders ---
164
165pub struct ServiceConfigBuilder<'a> {
166    overrides: &'a mut ServiceOverrides,
167}
168
169impl ServiceConfigBuilder<'_> {
170    pub fn workers(self, n: usize) -> Self {
171        self.overrides.workers = Some(n);
172        self
173    }
174
175    pub fn capacity(self, n: usize) -> Self {
176        self.overrides.capacity = Some(n);
177        self
178    }
179
180    // Note: Setting concurrent to 0 is equivalent to `.sequential()`
181    pub fn concurrent(self, n: usize) -> Self {
182        self.overrides.concurrent = Some(n);
183        self
184    }
185
186    pub fn sequential(self) -> Self {
187        self.overrides.concurrent = Some(0);
188        self
189    }
190
191    pub fn restart(self, policy: RestartPolicy) -> Self {
192        self.overrides.restart = Some(policy);
193        self
194    }
195}
196
197pub struct SystemConfigBuilder<'a> {
198    overrides: &'a mut SystemOverrides,
199}
200
201impl SystemConfigBuilder<'_> {
202    pub fn workers(self, n: usize) -> Self {
203        self.overrides.workers = Some(n);
204        self
205    }
206
207    pub fn restart(self, policy: RestartPolicy) -> Self {
208        self.overrides.restart = Some(policy);
209        self
210    }
211}
212
213pub struct EventConfigBuilder<'a> {
214    overrides: &'a mut EventOverrides,
215}
216
217impl EventConfigBuilder<'_> {
218    pub fn capacity(self, n: usize) -> Self {
219        self.overrides.capacity = Some(n);
220        self
221    }
222}
223
224pub struct ShutdownConfigBuilder<'a> {
225    pub(crate) app: &'a mut App,
226}
227
228impl ShutdownConfigBuilder<'_> {
229    /// Force-abort workers if they don't drain within this duration.
230    pub fn timeout(self, duration: Duration) -> Self {
231        self.app.shutdown_timeout = Some(duration);
232        self
233    }
234}
235
236// --- Supervision Helpers ---
237
238/// Supervise service workers - restarts on panic only
239async fn supervise_service<F, Fut>(name: &'static str, restart: RestartPolicy, mut factory: F)
240where
241    F: FnMut() -> Fut,
242    Fut: Future<Output = ()> + Send + 'static,
243{
244    match restart {
245        RestartPolicy::Never => {
246            let _ = tokio::spawn(factory()).await;
247        }
248        RestartPolicy::Always => {
249            loop {
250                let result = tokio::spawn(factory()).await;
251                match result {
252                    Ok(()) => break,
253                    Err(e) if e.is_panic() => {
254                        tracing::warn!(target: "archy", task = %name, error = ?e, "task panicked, restarting");
255                    }
256                    Err(_) => break,
257                }
258            }
259        }
260        RestartPolicy::Attempts { max, reset_after } => {
261            let mut attempts = 0;
262            loop {
263                let start = Instant::now();
264                let result = tokio::spawn(factory()).await;
265                match result {
266                    Ok(()) => break,
267                    Err(e) if e.is_panic() => {
268                        if let Some(duration) = reset_after
269                            && start.elapsed() >= duration
270                        {
271                            attempts = 0;
272                        }
273                        attempts += 1;
274                        if attempts >= max {
275                            tracing::error!(target: "archy", task = %name, max_attempts = max, "max restart attempts reached");
276                            break;
277                        }
278                        tracing::warn!(target: "archy", task = %name, attempt = attempts, max_attempts = max, error = ?e, "task panicked, restarting");
279                    }
280                    Err(_) => break,
281                }
282            }
283        }
284    }
285}
286
287/// Supervise systems - restarts on panic OR error return
288async fn supervise_system<F, Fut>(name: &'static str, restart: RestartPolicy, mut factory: F)
289where
290    F: FnMut() -> Fut,
291    Fut: Future<Output = SystemResult> + Send + 'static,
292{
293    match restart {
294        RestartPolicy::Never => {
295            let result = tokio::spawn(factory()).await;
296            match result {
297                Ok(SystemResult::Ok) => {}
298                Ok(SystemResult::Err(e)) => {
299                    tracing::warn!(target: "archy", task = %name, error = %e, "system failed");
300                }
301                Err(e) if e.is_panic() => {
302                    tracing::error!(target: "archy", task = %name, error = ?e, "system panicked");
303                }
304                Err(_) => {}
305            }
306        }
307        RestartPolicy::Always => {
308            loop {
309                let result = tokio::spawn(factory()).await;
310                match result {
311                    Ok(SystemResult::Ok) => break,
312                    Ok(SystemResult::Err(e)) => {
313                        tracing::warn!(target: "archy", task = %name, error = %e, "system failed, restarting");
314                    }
315                    Err(e) if e.is_panic() => {
316                        tracing::warn!(target: "archy", task = %name, error = ?e, "system panicked, restarting");
317                    }
318                    Err(_) => break,
319                }
320            }
321        }
322        RestartPolicy::Attempts { max, reset_after } => {
323            let mut attempts = 0;
324            loop {
325                let start = Instant::now();
326                let result = tokio::spawn(factory()).await;
327
328                let should_restart = match result {
329                    Ok(SystemResult::Ok) => false,
330                    Ok(SystemResult::Err(e)) => {
331                        tracing::warn!(target: "archy", task = %name, error = %e, "system failed");
332                        true
333                    }
334                    Err(e) if e.is_panic() => {
335                        tracing::warn!(target: "archy", task = %name, error = ?e, "system panicked");
336                        true
337                    }
338                    Err(_) => false,
339                };
340
341                if !should_restart {
342                    break;
343                }
344
345                if let Some(duration) = reset_after
346                    && start.elapsed() >= duration
347                {
348                    attempts = 0;
349                }
350
351                attempts += 1;
352                if attempts >= max {
353                    tracing::error!(target: "archy", task = %name, max_attempts = max, "max restart attempts reached");
354                    break;
355                }
356
357                tracing::info!(target: "archy", task = %name, attempt = attempts, max_attempts = max, "restarting");
358            }
359        }
360    }
361}
362
363// --- App ---
364
365pub struct App {
366    pub(crate) resources: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
367    pub(crate) event_buses: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
368    pub(crate) service_senders: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
369
370    event_creators: HashMap<TypeId, EventCreator>,
371    channel_creators: HashMap<TypeId, ChannelCreator>,
372    service_spawners: HashMap<TypeId, ServiceSpawner>,
373
374    pub(crate) systems: Vec<SystemDescriptor>,
375
376    service_overrides: HashMap<TypeId, ServiceOverrides>,
377    event_overrides: HashMap<TypeId, EventOverrides>,
378
379    service_defaults: ServiceOverrides,
380    system_defaults: SystemOverrides,
381    event_defaults: EventOverrides,
382
383    closers: Arc<Mutex<Vec<Closer>>>,
384    shutdown_hooks: Vec<LifecycleHook>,
385    pub(crate) shutdown_token: CancellationToken,
386    pub(crate) shutdown_timeout: Option<Duration>,
387}
388
389impl App {
390    /// Default channel capacity for event buses.
391    pub const DEFAULT_EVENT_CAPACITY: usize = 32;
392
393    pub fn new() -> Self {
394        Self {
395            resources: HashMap::new(),
396            event_buses: HashMap::new(),
397            service_senders: HashMap::new(),
398            event_creators: HashMap::new(),
399            channel_creators: HashMap::new(),
400            service_spawners: HashMap::new(),
401            systems: Vec::new(),
402            service_overrides: HashMap::new(),
403            event_overrides: HashMap::new(),
404            service_defaults: ServiceOverrides::default(),
405            system_defaults: SystemOverrides::default(),
406            event_defaults: EventOverrides::default(),
407            closers: Arc::new(Mutex::new(Vec::new())),
408            shutdown_hooks: Vec::new(),
409            shutdown_token: CancellationToken::new(),
410            shutdown_timeout: None,
411        }
412    }
413
414    /// Configure shutdown behavior
415    pub fn shutdown(&mut self) -> ShutdownConfigBuilder<'_> {
416        ShutdownConfigBuilder { app: self }
417    }
418
419    /// Configure default settings for all services
420    pub fn service_defaults(&mut self) -> ServiceConfigBuilder<'_> {
421        ServiceConfigBuilder { overrides: &mut self.service_defaults }
422    }
423
424    /// Configure default settings for all systems
425    pub fn system_defaults(&mut self) -> SystemConfigBuilder<'_> {
426        SystemConfigBuilder { overrides: &mut self.system_defaults }
427    }
428
429    /// Configure default settings for all events
430    pub fn event_defaults(&mut self) -> EventConfigBuilder<'_> {
431        EventConfigBuilder { overrides: &mut self.event_defaults }
432    }
433
434    /// Extract a dependency from the App (convenience helper for Service::create)
435    pub fn extract<T: FromApp>(&self) -> T {
436        T::from_app(self)
437    }
438
439    pub fn add_module<M: Module>(&mut self, module: M) -> &mut Self {
440        module.register(self);
441        self
442    }
443
444    pub fn add_resource<T: Send + Sync + 'static>(&mut self, resource: T) -> &mut Self {
445        let id = TypeId::of::<T>();
446        assert!(!self.resources.contains_key(&id), "Resource {} already registered", std::any::type_name::<T>());
447        self.resources.insert(id, Arc::new(resource));
448        self
449    }
450
451    pub fn add_resources<T: AddResources>(&mut self, resources: T) -> &mut Self {
452        resources.add_to(self);
453        self
454    }
455
456    pub fn add_event<E: Clone + Send + 'static>(&mut self) -> EventConfigBuilder<'_> {
457        let id = TypeId::of::<E>();
458        assert!(!self.event_creators.contains_key(&id), "Event {} already registered", std::any::type_name::<E>());
459        self.event_creators.insert(id, Box::new(|capacity| {
460            let (tx, _) = broadcast::channel::<E>(capacity);
461            Arc::new(EventBus { sender: tx })
462        }));
463        EventConfigBuilder { overrides: self.event_overrides.entry(id).or_default() }
464    }
465
466    pub fn add_events<T: AddEvents>(&mut self) -> &mut Self {
467        T::add_to(self);
468        self
469    }
470
471    pub fn event<E: Clone + Send + 'static>(&mut self) -> EventConfigBuilder<'_> {
472        let id = TypeId::of::<E>();
473        assert!(self.event_creators.contains_key(&id), "Event {} not registered", std::any::type_name::<E>());
474        EventConfigBuilder { overrides: self.event_overrides.entry(id).or_default() }
475    }
476
477    pub fn add_service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
478        let id = TypeId::of::<S>();
479        assert!(!self.channel_creators.contains_key(&id), "Service {} already registered", std::any::type_name::<S>());
480
481        let closers = self.closers.clone();
482        self.channel_creators.insert(id, Box::new(move |capacity| {
483            let (tx, rx) = bounded::<S::Message>(capacity);
484            let closer_tx = tx.clone();
485            closers.lock().push(Box::new(move || { closer_tx.close(); }));
486            (Arc::new(tx), Arc::new(rx))
487        }));
488
489        self.service_spawners.insert(id, Box::new(|app, rx_any, config| {
490            let rx = rx_any.downcast::<Receiver<S::Message>>().unwrap();
491            let service = Arc::new(S::create(app));
492
493            // Create startup hook
494            let startup_svc = service.clone();
495            let startup: LifecycleHook = Box::new(move || {
496                Box::pin(async move {
497                    startup_svc.startup().await;
498                })
499            });
500
501            // Create worker futures
502            let workers: Vec<BoxFuture> = (0..config.workers).map(|_| {
503                let rx = rx.as_ref().clone();
504                let svc = service.clone();
505                let restart = config.restart;
506
507                match config.concurrent {
508                    None => Box::pin(async move {
509                        supervise_service("service worker", restart, || {
510                            let rx = rx.clone();
511                            let svc = svc.clone();
512                            async move {
513                                while let Ok(msg) = rx.recv().await {
514                                    svc.clone().handle(msg).await;
515                                }
516                            }
517                        }).await;
518                    }) as BoxFuture,
519
520                    Some(max_concurrent) => {
521                        let semaphore = Arc::new(Semaphore::new(max_concurrent));
522                        Box::pin(async move {
523                            supervise_service("service worker", restart, || {
524                                let rx = rx.clone();
525                                let svc = svc.clone();
526                                let semaphore = semaphore.clone();
527                                async move {
528                                    while let Ok(msg) = rx.recv().await {
529                                        let permit = semaphore.clone().acquire_owned().await
530                                            .expect("semaphore closed unexpectedly");
531                                        let svc = svc.clone();
532                                        tokio::spawn(async move {
533                                            svc.handle(msg).await;
534                                            drop(permit);
535                                        });
536                                    }
537                                }
538                            }).await;
539                        }) as BoxFuture
540                    }
541                }
542            }).collect();
543
544            // Create shutdown hook
545            let shutdown_svc = service.clone();
546            let shutdown: LifecycleHook = Box::new(move || {
547                Box::pin(async move {
548                    shutdown_svc.shutdown().await;
549                })
550            });
551
552            ServiceSpawnResult { startup, workers, shutdown }
553        }));
554
555        ServiceConfigBuilder { overrides: self.service_overrides.entry(id).or_default() }
556    }
557
558    pub fn add_services<T: AddServices>(&mut self) -> &mut Self {
559        T::add_to(self);
560        self
561    }
562
563    pub fn service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
564        let id = TypeId::of::<S>();
565        assert!(self.channel_creators.contains_key(&id), "Service {} not registered", std::any::type_name::<S>());
566        ServiceConfigBuilder { overrides: self.service_overrides.entry(id).or_default() }
567    }
568
569    /// Add a single system with the given schedule
570    pub fn add_system<F, Args>(&mut self, schedule: Schedule, system: F) -> SystemConfigBuilder<'_>
571    where
572        F: IntoSystem<Args>,
573    {
574        self.systems.push(SystemDescriptor::new(
575            TypeId::of::<F>(),
576            schedule,
577            system.into_system(),
578        ));
579        SystemConfigBuilder { overrides: &mut self.systems.last_mut().unwrap().overrides }
580    }
581
582    /// Add multiple systems with the given schedule: `add_systems(Schedule::Run, (sys1, sys2, sys3))`
583    pub fn add_systems<M, S>(&mut self, schedule: Schedule, systems: S) -> &mut Self
584    where
585        S: IntoSystemConfigs<M>,
586    {
587        self.systems.extend(systems.into_descriptors(schedule));
588        self
589    }
590
591    /// Configure a previously registered system
592    pub fn system<F, Args>(&mut self, _system: F) -> SystemConfigBuilder<'_>
593    where
594        F: IntoSystem<Args>,
595    {
596        let id = TypeId::of::<F>();
597        let desc = self.systems.iter_mut()
598            .find(|d| d.id == id)
599            .expect("System not found");
600        SystemConfigBuilder { overrides: &mut desc.overrides }
601    }
602
603    fn resolve_service(&self, id: TypeId) -> WorkerConfig {
604        let o = self.service_overrides.get(&id);
605        let d = &self.service_defaults;
606        WorkerConfig {
607            workers: o.and_then(|x| x.workers)
608                .or(d.workers)
609                .unwrap_or(WorkerConfig::DEFAULT_WORKERS),
610            capacity: o.and_then(|x| x.capacity)
611                .or(d.capacity)
612                .unwrap_or(WorkerConfig::DEFAULT_CAPACITY),
613            // concurrent(0) forces sequential mode (filters to None)
614            concurrent: o.and_then(|x| x.concurrent).or(d.concurrent).filter(|&n| n > 0),
615            restart: o.and_then(|x| x.restart)
616                .or(d.restart)
617                .unwrap_or_default(),
618        }
619    }
620
621    fn resolve_system(&self, overrides: &SystemOverrides) -> SystemConfig {
622        let d = &self.system_defaults;
623        SystemConfig {
624            workers: overrides.workers
625                .or(d.workers)
626                .unwrap_or(SystemConfig::DEFAULT_WORKERS),
627            restart: overrides.restart
628                .or(d.restart)
629                .unwrap_or_default(),
630        }
631    }
632
633    fn resolve_event(&self, id: TypeId) -> EventConfig {
634        let o = self.event_overrides.get(&id);
635        let d = &self.event_defaults;
636        EventConfig {
637            capacity: o.and_then(|x| x.capacity)
638                .or(d.capacity)
639                .unwrap_or(Self::DEFAULT_EVENT_CAPACITY),
640        }
641    }
642
643    /// Run the application until shutdown
644    pub async fn run(mut self) {
645        let mut service_handles = Vec::new();
646        let mut system_handles = Vec::new();
647
648        // Phase 0: Create event buses
649        let event_creators = std::mem::take(&mut self.event_creators);
650        for (id, creator) in event_creators {
651            let config = self.resolve_event(id);
652            self.event_buses.insert(id, creator(config.capacity));
653        }
654
655        // Phase 1: Create service channels
656        let creators = std::mem::take(&mut self.channel_creators);
657        let mut receivers: HashMap<TypeId, Arc<dyn Any + Send + Sync>> = HashMap::new();
658        for (id, creator) in creators {
659            let config = self.resolve_service(id);
660            let (tx, rx) = creator(config.capacity);
661            self.service_senders.insert(id, tx);
662            receivers.insert(id, rx);
663        }
664
665        // Phase 2: Run First systems (blocking)
666        let first = self.extract_systems(|s| matches!(s, Schedule::First));
667        self.run_phase_blocking(first).await;
668
669        // Phase 3: Create services and run startup hooks
670        let spawners = std::mem::take(&mut self.service_spawners);
671        let mut startup_futures = Vec::new();
672        let mut worker_batches = Vec::new();
673
674        for (id, spawner) in spawners {
675            let config = self.resolve_service(id);
676            let rx = receivers.remove(&id).unwrap();
677            let result = spawner(&self, rx, config);
678            startup_futures.push((result.startup)());
679            worker_batches.push(result.workers);
680            self.shutdown_hooks.push(result.shutdown);
681        }
682
683        // Run all service startups in parallel (panic = abort)
684        let startup_handles: Vec<_> = startup_futures.into_iter()
685            .map(|fut| tokio::spawn(fut))
686            .collect();
687        for handle in startup_handles {
688            handle.await.expect("service startup panicked");
689        }
690
691        // Phase 4: Spawn service workers
692        for workers in worker_batches {
693            for fut in workers {
694                service_handles.push(tokio::spawn(fut));
695            }
696        }
697
698        // Phase 5: Run Startup systems (blocking)
699        let startup = self.extract_systems(|s| matches!(s, Schedule::Startup));
700        self.run_phase_blocking(startup).await;
701
702        // Phase 6: Spawn Run systems (background)
703        let run = self.extract_systems(|s| matches!(s, Schedule::Run));
704        for desc in run {
705            let config = self.resolve_system(&desc.overrides);
706            for factory in (desc.spawner)(&self, config.workers) {
707                let restart = config.restart;
708                system_handles.push(tokio::spawn(async move {
709                    supervise_system("system", restart, || factory()).await;
710                }));
711            }
712        }
713
714        // Phase 7: Spawn Fixed systems (interval loops)
715        let fixed = self.extract_systems(|s| matches!(s, Schedule::Fixed(_)));
716        for desc in fixed {
717            let duration = match desc.schedule {
718                Schedule::Fixed(d) => d,
719                _ => unreachable!(),
720            };
721            let config = self.resolve_system(&desc.overrides);
722            let token = self.shutdown_token.clone();
723            for factory in (desc.spawner)(&self, config.workers) {
724                let token = token.clone();
725                let restart = config.restart;
726                system_handles.push(tokio::spawn(async move {
727                    supervise_system("fixed system", restart, || {
728                        let factory = factory.clone();
729                        let token = token.clone();
730                        async move {
731                            let mut interval = tokio::time::interval(duration);
732                            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
733                            loop {
734                                tokio::select! {
735                                    _ = interval.tick() => {}
736                                    _ = token.cancelled() => return SystemResult::Ok,
737                                }
738                                match factory().await {
739                                    SystemResult::Ok => {}
740                                    err @ SystemResult::Err(_) => return err,
741                                }
742                            }
743                        }
744                    }).await;
745                }));
746            }
747        }
748
749        // Wait for shutdown
750        self.shutdown_token.cancelled().await;
751
752        // Phase 8: Run Shutdown systems (blocking)
753        let shutdown = self.extract_systems(|s| matches!(s, Schedule::Shutdown));
754        self.run_phase_blocking(shutdown).await;
755
756        // Phase 9: Run service shutdown hooks (parallel, with timeout)
757        let shutdown_hooks = std::mem::take(&mut self.shutdown_hooks);
758        let shutdown_handles: Vec<_> = shutdown_hooks.into_iter()
759            .map(|hook| tokio::spawn(hook()))
760            .collect();
761
762        let await_shutdowns = async {
763            for handle in shutdown_handles {
764                let _ = handle.await; // Don't panic on shutdown errors
765            }
766        };
767
768        if let Some(timeout) = self.shutdown_timeout {
769            if tokio::time::timeout(timeout, await_shutdowns).await.is_err() {
770                tracing::warn!(target: "archy", "service shutdown hooks timed out");
771            }
772        } else {
773            await_shutdowns.await;
774        }
775
776        // Phase 10: Close channels, abort systems, wait for services
777        let closers = std::mem::take(&mut *self.closers.lock());
778        for closer in closers { closer(); }
779
780        for handle in system_handles { handle.abort(); }
781
782        if let Some(timeout) = self.shutdown_timeout {
783            let abort_handles: Vec<_> = service_handles.iter().map(|h| h.abort_handle()).collect();
784            let drain = async { for h in service_handles { let _ = h.await; } };
785            if tokio::time::timeout(timeout, drain).await.is_err() {
786                tracing::warn!(target: "archy", "shutdown timeout, force-aborting workers");
787                for h in abort_handles { h.abort(); }
788            }
789        } else {
790            for h in service_handles { let _ = h.await; }
791        }
792
793        // Phase 11: Run Last systems (blocking)
794        let last = self.extract_systems(|s| matches!(s, Schedule::Last));
795        self.run_phase_blocking(last).await;
796    }
797
798    /// Extract systems matching predicate (removes from self.systems)
799    fn extract_systems(&mut self, predicate: impl Fn(&Schedule) -> bool) -> Vec<SystemDescriptor> {
800        let mut extracted = Vec::new();
801        let mut i = 0;
802        while i < self.systems.len() {
803            if predicate(&self.systems[i].schedule) {
804                extracted.push(self.systems.swap_remove(i));
805            } else {
806                i += 1;
807            }
808        }
809        extracted
810    }
811
812    /// Run systems and wait for completion
813    async fn run_phase_blocking(&self, systems: Vec<SystemDescriptor>) {
814        let mut handles = Vec::new();
815        for desc in systems {
816            let config = self.resolve_system(&desc.overrides);
817            for factory in (desc.spawner)(self, config.workers) {
818                let restart = config.restart;
819                handles.push(tokio::spawn(async move {
820                    supervise_system("system", restart, || factory()).await;
821                }));
822            }
823        }
824        for h in handles { let _ = h.await; }
825    }
826}
827
828impl Default for App {
829    fn default() -> Self { Self::new() }
830}