1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use async_channel::{Receiver, bounded};
9use parking_lot::Mutex;
10use tokio::sync::{broadcast, Semaphore};
11use tokio_util::sync::CancellationToken;
12
13use crate::{FromApp, Module, RestartPolicy, Schedule, Service, SystemResult};
14
/// A heap-allocated, pinned, `Send` future that produces no value.
pub(crate) type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A heap-allocated, pinned, `Send` future that resolves to a [`SystemResult`].
pub(crate) type SystemFuture = Pin<Box<dyn Future<Output = SystemResult> + Send>>;

/// A shareable, thread-safe closure that builds a new [`SystemFuture`] on every call.
pub type SystemFactory = Arc<dyn Fn() -> SystemFuture + Send + Sync>;

/// A one-shot closure that receives the [`App`] and the resolved worker count and
/// returns the [`SystemFactory`] values to run.
// NOTE(review): presumably one factory is returned per worker — confirm against the
// `IntoSystem` implementations, which are not visible here.
pub(crate) type SystemSpawner = Box<dyn FnOnce(&App, usize) -> Vec<SystemFactory> + Send>;
28
/// Fully resolved configuration for a system, produced by merging per-system
/// overrides, app-wide defaults and built-in defaults.
#[derive(Clone)]
pub struct SystemConfig {
    /// Worker count handed to the system's spawner.
    pub workers: usize,
    /// Policy applied when the system fails or panics.
    pub restart: RestartPolicy,
}
35
impl SystemConfig {
    /// Worker count used when neither a per-system override nor an app-wide default is set.
    pub const DEFAULT_WORKERS: usize = 1;
}
40
41impl Default for SystemConfig {
42 fn default() -> Self {
43 Self {
44 workers: Self::DEFAULT_WORKERS,
45 restart: RestartPolicy::default(),
46 }
47 }
48}
49
/// A registered system together with the schedule it runs in and its configuration overrides.
pub struct SystemDescriptor {
    /// Type identity of the system; used to look the system up again for reconfiguration.
    pub id: TypeId,
    /// Phase in which the system runs.
    pub schedule: Schedule,
    /// One-shot closure that turns the system into runnable factories.
    pub spawner: SystemSpawner,
    /// Per-system settings that take precedence over the app-wide system defaults.
    pub(crate) overrides: SystemOverrides,
}
57
58impl SystemDescriptor {
59 pub(crate) fn new(id: TypeId, schedule: Schedule, spawner: SystemSpawner) -> Self {
60 Self { id, schedule, spawner, overrides: SystemOverrides::default() }
61 }
62}
63
/// Conversion of a value into a [`SystemSpawner`].
///
/// `Args` is a type parameter of the trait.
// NOTE(review): presumably `Args` is a marker for the system's argument list so several
// blanket impls can coexist — confirm against the impls, which are not visible here.
pub trait IntoSystem<Args>: Send + 'static {
    /// Consumes the value and returns the spawner that will create its factories.
    fn into_system(self) -> SystemSpawner;
}
69
/// Conversion of a value into a list of [`SystemDescriptor`]s for a given schedule.
///
/// Used by `App::add_systems` to register several systems at once.
pub trait IntoSystemConfigs<Marker> {
    /// Consumes the value and returns the descriptors to register under `schedule`.
    fn into_descriptors(self, schedule: Schedule) -> Vec<SystemDescriptor>;
}
76
/// Holds the broadcast channel sender for events of type `E`.
pub(crate) struct EventBus<E> {
    /// Sending half of the Tokio broadcast channel created for `E`.
    pub(crate) sender: broadcast::Sender<E>,
}
82
/// One-shot closure run during shutdown; `App::add_service` registers one per service
/// to close that service's channel.
type Closer = Box<dyn FnOnce() + Send>;
/// One-shot closure that, given a capacity, creates a service channel and returns its
/// type-erased `(sender, receiver)` pair.
type ChannelCreator = Box<dyn FnOnce(usize) -> (Arc<dyn Any + Send + Sync>, Arc<dyn Any + Send + Sync>) + Send>;
/// One-shot closure that, given a capacity, creates a type-erased [`EventBus`].
type EventCreator = Box<dyn FnOnce(usize) -> Arc<dyn Any + Send + Sync> + Send>;
/// One-shot closure that produces the future for a service startup or shutdown hook.
type LifecycleHook = Box<dyn FnOnce() -> BoxFuture + Send>;
87
/// Everything a [`ServiceSpawner`] produces for one service.
struct ServiceSpawnResult {
    /// Hook that runs the service's `startup` method.
    startup: LifecycleHook,
    /// One supervised worker future per configured worker.
    workers: Vec<BoxFuture>,
    /// Hook that runs the service's `shutdown` method.
    shutdown: LifecycleHook,
}
94
/// One-shot closure that builds a service from the [`App`], its type-erased channel
/// receiver and its resolved [`WorkerConfig`].
type ServiceSpawner = Box<dyn FnOnce(&App, Arc<dyn Any + Send + Sync>, WorkerConfig) -> ServiceSpawnResult + Send>;
96
/// Fully resolved configuration for a service's workers.
#[derive(Clone)]
pub(crate) struct WorkerConfig {
    /// Number of worker futures created for the service.
    pub(crate) workers: usize,
    /// Capacity of the service's bounded message channel.
    pub(crate) capacity: usize,
    /// `Some(n)`: each worker hands messages to spawned tasks, limited by a semaphore
    /// with `n` permits. `None`: each worker handles one message at a time.
    pub(crate) concurrent: Option<usize>,
    /// Policy applied when a worker task panics.
    pub(crate) restart: RestartPolicy,
}
104
impl WorkerConfig {
    /// Worker count used when no override or app-wide default is set.
    pub const DEFAULT_WORKERS: usize = 1;
    /// Channel capacity used when no override or app-wide default is set.
    pub const DEFAULT_CAPACITY: usize = 32;
}
111
112impl Default for WorkerConfig {
113 fn default() -> Self {
114 Self {
115 workers: Self::DEFAULT_WORKERS,
116 capacity: Self::DEFAULT_CAPACITY,
117 concurrent: None,
118 restart: RestartPolicy::default(),
119 }
120 }
121}
122
/// Optional service settings; a `None` field means "not set at this level".
///
/// Used both for per-service overrides and for the app-wide service defaults.
#[derive(Clone, Default)]
struct ServiceOverrides {
    /// Worker count, if set.
    workers: Option<usize>,
    /// Channel capacity, if set.
    capacity: Option<usize>,
    /// Concurrency limit, if set; `Some(0)` is resolved to sequential handling.
    concurrent: Option<usize>,
    /// Restart policy, if set.
    restart: Option<RestartPolicy>,
}
132
/// Optional system settings; a `None` field means "not set at this level".
///
/// Used both for per-system overrides and for the app-wide system defaults.
#[derive(Clone, Default)]
pub(crate) struct SystemOverrides {
    /// Worker count, if set.
    workers: Option<usize>,
    /// Restart policy, if set.
    restart: Option<RestartPolicy>,
}
138
/// Optional event settings; a `None` field means "not set at this level".
///
/// Used both for per-event overrides and for the app-wide event defaults.
#[derive(Clone, Default)]
struct EventOverrides {
    /// Broadcast channel capacity, if set.
    capacity: Option<usize>,
}
143
/// Fully resolved configuration for one event type.
#[derive(Clone)]
pub(crate) struct EventConfig {
    /// Capacity passed to the event's broadcast channel.
    pub(crate) capacity: usize,
}
148
/// A value that can register itself with an [`App`]; used by `App::add_resources`.
// NOTE(review): presumably implemented for tuples of resources — confirm against the
// impls, which are not visible here.
pub trait AddResources {
    /// Consumes the value and registers it with `app`.
    fn add_to(self, app: &mut App);
}
154
/// A type that can register services with an [`App`]; used by `App::add_services`.
// NOTE(review): presumably implemented for tuples of service types — confirm against
// the impls, which are not visible here.
pub trait AddServices {
    /// Registers the services described by the implementing type with `app`.
    fn add_to(app: &mut App);
}
158
/// A type that can register events with an [`App`]; used by `App::add_events`.
// NOTE(review): presumably implemented for tuples of event types — confirm against the
// impls, which are not visible here.
pub trait AddEvents {
    /// Registers the events described by the implementing type with `app`.
    fn add_to(app: &mut App);
}
162
/// Chainable builder that writes service settings into a borrowed set of overrides.
pub struct ServiceConfigBuilder<'a> {
    /// The overrides being edited: either one service's overrides or the app-wide defaults.
    overrides: &'a mut ServiceOverrides,
}
168
169impl ServiceConfigBuilder<'_> {
170 pub fn workers(self, n: usize) -> Self {
171 self.overrides.workers = Some(n);
172 self
173 }
174
175 pub fn capacity(self, n: usize) -> Self {
176 self.overrides.capacity = Some(n);
177 self
178 }
179
180 pub fn concurrent(self, n: usize) -> Self {
182 self.overrides.concurrent = Some(n);
183 self
184 }
185
186 pub fn sequential(self) -> Self {
187 self.overrides.concurrent = Some(0);
188 self
189 }
190
191 pub fn restart(self, policy: RestartPolicy) -> Self {
192 self.overrides.restart = Some(policy);
193 self
194 }
195}
196
/// Chainable builder that writes system settings into a borrowed set of overrides.
pub struct SystemConfigBuilder<'a> {
    /// The overrides being edited: either one system's overrides or the app-wide defaults.
    overrides: &'a mut SystemOverrides,
}
200
201impl SystemConfigBuilder<'_> {
202 pub fn workers(self, n: usize) -> Self {
203 self.overrides.workers = Some(n);
204 self
205 }
206
207 pub fn restart(self, policy: RestartPolicy) -> Self {
208 self.overrides.restart = Some(policy);
209 self
210 }
211}
212
/// Chainable builder that writes event settings into a borrowed set of overrides.
pub struct EventConfigBuilder<'a> {
    /// The overrides being edited: either one event's overrides or the app-wide defaults.
    overrides: &'a mut EventOverrides,
}
216
217impl EventConfigBuilder<'_> {
218 pub fn capacity(self, n: usize) -> Self {
219 self.overrides.capacity = Some(n);
220 self
221 }
222}
223
/// Chainable builder for the shutdown settings of an [`App`].
pub struct ShutdownConfigBuilder<'a> {
    /// The application whose shutdown settings are being edited.
    pub(crate) app: &'a mut App,
}
227
228impl ShutdownConfigBuilder<'_> {
229 pub fn timeout(self, duration: Duration) -> Self {
231 self.app.shutdown_timeout = Some(duration);
232 self
233 }
234}
235
236async fn supervise_service<F, Fut>(name: &'static str, restart: RestartPolicy, mut factory: F)
240where
241 F: FnMut() -> Fut,
242 Fut: Future<Output = ()> + Send + 'static,
243{
244 match restart {
245 RestartPolicy::Never => {
246 let _ = tokio::spawn(factory()).await;
247 }
248 RestartPolicy::Always => {
249 loop {
250 let result = tokio::spawn(factory()).await;
251 match result {
252 Ok(()) => break,
253 Err(e) if e.is_panic() => {
254 tracing::warn!(target: "archy", task = %name, error = ?e, "task panicked, restarting");
255 }
256 Err(_) => break,
257 }
258 }
259 }
260 RestartPolicy::Attempts { max, reset_after } => {
261 let mut attempts = 0;
262 loop {
263 let start = Instant::now();
264 let result = tokio::spawn(factory()).await;
265 match result {
266 Ok(()) => break,
267 Err(e) if e.is_panic() => {
268 if let Some(duration) = reset_after
269 && start.elapsed() >= duration
270 {
271 attempts = 0;
272 }
273 attempts += 1;
274 if attempts >= max {
275 tracing::error!(target: "archy", task = %name, max_attempts = max, "max restart attempts reached");
276 break;
277 }
278 tracing::warn!(target: "archy", task = %name, attempt = attempts, max_attempts = max, error = ?e, "task panicked, restarting");
279 }
280 Err(_) => break,
281 }
282 }
283 }
284 }
285}
286
287async fn supervise_system<F, Fut>(name: &'static str, restart: RestartPolicy, mut factory: F)
289where
290 F: FnMut() -> Fut,
291 Fut: Future<Output = SystemResult> + Send + 'static,
292{
293 match restart {
294 RestartPolicy::Never => {
295 let result = tokio::spawn(factory()).await;
296 match result {
297 Ok(SystemResult::Ok) => {}
298 Ok(SystemResult::Err(e)) => {
299 tracing::warn!(target: "archy", task = %name, error = %e, "system failed");
300 }
301 Err(e) if e.is_panic() => {
302 tracing::error!(target: "archy", task = %name, error = ?e, "system panicked");
303 }
304 Err(_) => {}
305 }
306 }
307 RestartPolicy::Always => {
308 loop {
309 let result = tokio::spawn(factory()).await;
310 match result {
311 Ok(SystemResult::Ok) => break,
312 Ok(SystemResult::Err(e)) => {
313 tracing::warn!(target: "archy", task = %name, error = %e, "system failed, restarting");
314 }
315 Err(e) if e.is_panic() => {
316 tracing::warn!(target: "archy", task = %name, error = ?e, "system panicked, restarting");
317 }
318 Err(_) => break,
319 }
320 }
321 }
322 RestartPolicy::Attempts { max, reset_after } => {
323 let mut attempts = 0;
324 loop {
325 let start = Instant::now();
326 let result = tokio::spawn(factory()).await;
327
328 let should_restart = match result {
329 Ok(SystemResult::Ok) => false,
330 Ok(SystemResult::Err(e)) => {
331 tracing::warn!(target: "archy", task = %name, error = %e, "system failed");
332 true
333 }
334 Err(e) if e.is_panic() => {
335 tracing::warn!(target: "archy", task = %name, error = ?e, "system panicked");
336 true
337 }
338 Err(_) => false,
339 };
340
341 if !should_restart {
342 break;
343 }
344
345 if let Some(duration) = reset_after
346 && start.elapsed() >= duration
347 {
348 attempts = 0;
349 }
350
351 attempts += 1;
352 if attempts >= max {
353 tracing::error!(target: "archy", task = %name, max_attempts = max, "max restart attempts reached");
354 break;
355 }
356
357 tracing::info!(target: "archy", task = %name, attempt = attempts, max_attempts = max, "restarting");
358 }
359 }
360 }
361}
362
/// The application container: holds registered resources, events, services and systems,
/// and drives them through their lifecycle in [`App::run`].
pub struct App {
    /// Registered resources, type-erased and keyed by their `TypeId`.
    pub(crate) resources: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
    /// Type-erased event buses keyed by event type; filled at the start of `run`.
    pub(crate) event_buses: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
    /// Type-erased service channel senders keyed by service type; filled at the start of `run`.
    pub(crate) service_senders: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,

    /// Deferred constructors for event buses; consumed by `run`.
    event_creators: HashMap<TypeId, EventCreator>,
    /// Deferred constructors for service channels; consumed by `run`.
    channel_creators: HashMap<TypeId, ChannelCreator>,
    /// Deferred constructors for services and their workers; consumed by `run`.
    service_spawners: HashMap<TypeId, ServiceSpawner>,

    /// Registered systems that have not yet been extracted for their phase.
    pub(crate) systems: Vec<SystemDescriptor>,

    /// Per-service configuration overrides.
    service_overrides: HashMap<TypeId, ServiceOverrides>,
    /// Per-event configuration overrides.
    event_overrides: HashMap<TypeId, EventOverrides>,

    /// App-wide service settings used when a service has no override for a field.
    service_defaults: ServiceOverrides,
    /// App-wide system settings used when a system has no override for a field.
    system_defaults: SystemOverrides,
    /// App-wide event settings used when an event has no override for a field.
    event_defaults: EventOverrides,

    /// Closures that close the service channels; run during shutdown.
    closers: Arc<Mutex<Vec<Closer>>>,
    /// Service shutdown hooks collected while services are spawned.
    shutdown_hooks: Vec<LifecycleHook>,
    /// Token whose cancellation makes `run` begin shutting down.
    pub(crate) shutdown_token: CancellationToken,
    /// Optional limit applied to waiting for shutdown hooks and for worker draining.
    pub(crate) shutdown_timeout: Option<Duration>,
}
388
389impl App {
390 pub const DEFAULT_EVENT_CAPACITY: usize = 32;
392
393 pub fn new() -> Self {
394 Self {
395 resources: HashMap::new(),
396 event_buses: HashMap::new(),
397 service_senders: HashMap::new(),
398 event_creators: HashMap::new(),
399 channel_creators: HashMap::new(),
400 service_spawners: HashMap::new(),
401 systems: Vec::new(),
402 service_overrides: HashMap::new(),
403 event_overrides: HashMap::new(),
404 service_defaults: ServiceOverrides::default(),
405 system_defaults: SystemOverrides::default(),
406 event_defaults: EventOverrides::default(),
407 closers: Arc::new(Mutex::new(Vec::new())),
408 shutdown_hooks: Vec::new(),
409 shutdown_token: CancellationToken::new(),
410 shutdown_timeout: None,
411 }
412 }
413
414 pub fn shutdown(&mut self) -> ShutdownConfigBuilder<'_> {
416 ShutdownConfigBuilder { app: self }
417 }
418
419 pub fn service_defaults(&mut self) -> ServiceConfigBuilder<'_> {
421 ServiceConfigBuilder { overrides: &mut self.service_defaults }
422 }
423
424 pub fn system_defaults(&mut self) -> SystemConfigBuilder<'_> {
426 SystemConfigBuilder { overrides: &mut self.system_defaults }
427 }
428
429 pub fn event_defaults(&mut self) -> EventConfigBuilder<'_> {
431 EventConfigBuilder { overrides: &mut self.event_defaults }
432 }
433
434 pub fn extract<T: FromApp>(&self) -> T {
436 T::from_app(self)
437 }
438
439 pub fn add_module<M: Module>(&mut self, module: M) -> &mut Self {
440 module.register(self);
441 self
442 }
443
444 pub fn add_resource<T: Send + Sync + 'static>(&mut self, resource: T) -> &mut Self {
445 let id = TypeId::of::<T>();
446 assert!(!self.resources.contains_key(&id), "Resource {} already registered", std::any::type_name::<T>());
447 self.resources.insert(id, Arc::new(resource));
448 self
449 }
450
451 pub fn add_resources<T: AddResources>(&mut self, resources: T) -> &mut Self {
452 resources.add_to(self);
453 self
454 }
455
456 pub fn add_event<E: Clone + Send + 'static>(&mut self) -> EventConfigBuilder<'_> {
457 let id = TypeId::of::<E>();
458 assert!(!self.event_creators.contains_key(&id), "Event {} already registered", std::any::type_name::<E>());
459 self.event_creators.insert(id, Box::new(|capacity| {
460 let (tx, _) = broadcast::channel::<E>(capacity);
461 Arc::new(EventBus { sender: tx })
462 }));
463 EventConfigBuilder { overrides: self.event_overrides.entry(id).or_default() }
464 }
465
466 pub fn add_events<T: AddEvents>(&mut self) -> &mut Self {
467 T::add_to(self);
468 self
469 }
470
471 pub fn event<E: Clone + Send + 'static>(&mut self) -> EventConfigBuilder<'_> {
472 let id = TypeId::of::<E>();
473 assert!(self.event_creators.contains_key(&id), "Event {} not registered", std::any::type_name::<E>());
474 EventConfigBuilder { overrides: self.event_overrides.entry(id).or_default() }
475 }
476
477 pub fn add_service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
478 let id = TypeId::of::<S>();
479 assert!(!self.channel_creators.contains_key(&id), "Service {} already registered", std::any::type_name::<S>());
480
481 let closers = self.closers.clone();
482 self.channel_creators.insert(id, Box::new(move |capacity| {
483 let (tx, rx) = bounded::<S::Message>(capacity);
484 let closer_tx = tx.clone();
485 closers.lock().push(Box::new(move || { closer_tx.close(); }));
486 (Arc::new(tx), Arc::new(rx))
487 }));
488
489 self.service_spawners.insert(id, Box::new(|app, rx_any, config| {
490 let rx = rx_any.downcast::<Receiver<S::Message>>().unwrap();
491 let service = Arc::new(S::create(app));
492
493 let startup_svc = service.clone();
495 let startup: LifecycleHook = Box::new(move || {
496 Box::pin(async move {
497 startup_svc.startup().await;
498 })
499 });
500
501 let workers: Vec<BoxFuture> = (0..config.workers).map(|_| {
503 let rx = rx.as_ref().clone();
504 let svc = service.clone();
505 let restart = config.restart;
506
507 match config.concurrent {
508 None => Box::pin(async move {
509 supervise_service("service worker", restart, || {
510 let rx = rx.clone();
511 let svc = svc.clone();
512 async move {
513 while let Ok(msg) = rx.recv().await {
514 svc.clone().handle(msg).await;
515 }
516 }
517 }).await;
518 }) as BoxFuture,
519
520 Some(max_concurrent) => {
521 let semaphore = Arc::new(Semaphore::new(max_concurrent));
522 Box::pin(async move {
523 supervise_service("service worker", restart, || {
524 let rx = rx.clone();
525 let svc = svc.clone();
526 let semaphore = semaphore.clone();
527 async move {
528 while let Ok(msg) = rx.recv().await {
529 let permit = semaphore.clone().acquire_owned().await
530 .expect("semaphore closed unexpectedly");
531 let svc = svc.clone();
532 tokio::spawn(async move {
533 svc.handle(msg).await;
534 drop(permit);
535 });
536 }
537 }
538 }).await;
539 }) as BoxFuture
540 }
541 }
542 }).collect();
543
544 let shutdown_svc = service.clone();
546 let shutdown: LifecycleHook = Box::new(move || {
547 Box::pin(async move {
548 shutdown_svc.shutdown().await;
549 })
550 });
551
552 ServiceSpawnResult { startup, workers, shutdown }
553 }));
554
555 ServiceConfigBuilder { overrides: self.service_overrides.entry(id).or_default() }
556 }
557
558 pub fn add_services<T: AddServices>(&mut self) -> &mut Self {
559 T::add_to(self);
560 self
561 }
562
563 pub fn service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
564 let id = TypeId::of::<S>();
565 assert!(self.channel_creators.contains_key(&id), "Service {} not registered", std::any::type_name::<S>());
566 ServiceConfigBuilder { overrides: self.service_overrides.entry(id).or_default() }
567 }
568
569 pub fn add_system<F, Args>(&mut self, schedule: Schedule, system: F) -> SystemConfigBuilder<'_>
571 where
572 F: IntoSystem<Args>,
573 {
574 self.systems.push(SystemDescriptor::new(
575 TypeId::of::<F>(),
576 schedule,
577 system.into_system(),
578 ));
579 SystemConfigBuilder { overrides: &mut self.systems.last_mut().unwrap().overrides }
580 }
581
582 pub fn add_systems<M, S>(&mut self, schedule: Schedule, systems: S) -> &mut Self
584 where
585 S: IntoSystemConfigs<M>,
586 {
587 self.systems.extend(systems.into_descriptors(schedule));
588 self
589 }
590
591 pub fn system<F, Args>(&mut self, _system: F) -> SystemConfigBuilder<'_>
593 where
594 F: IntoSystem<Args>,
595 {
596 let id = TypeId::of::<F>();
597 let desc = self.systems.iter_mut()
598 .find(|d| d.id == id)
599 .expect("System not found");
600 SystemConfigBuilder { overrides: &mut desc.overrides }
601 }
602
603 fn resolve_service(&self, id: TypeId) -> WorkerConfig {
604 let o = self.service_overrides.get(&id);
605 let d = &self.service_defaults;
606 WorkerConfig {
607 workers: o.and_then(|x| x.workers)
608 .or(d.workers)
609 .unwrap_or(WorkerConfig::DEFAULT_WORKERS),
610 capacity: o.and_then(|x| x.capacity)
611 .or(d.capacity)
612 .unwrap_or(WorkerConfig::DEFAULT_CAPACITY),
613 concurrent: o.and_then(|x| x.concurrent).or(d.concurrent).filter(|&n| n > 0),
615 restart: o.and_then(|x| x.restart)
616 .or(d.restart)
617 .unwrap_or_default(),
618 }
619 }
620
621 fn resolve_system(&self, overrides: &SystemOverrides) -> SystemConfig {
622 let d = &self.system_defaults;
623 SystemConfig {
624 workers: overrides.workers
625 .or(d.workers)
626 .unwrap_or(SystemConfig::DEFAULT_WORKERS),
627 restart: overrides.restart
628 .or(d.restart)
629 .unwrap_or_default(),
630 }
631 }
632
633 fn resolve_event(&self, id: TypeId) -> EventConfig {
634 let o = self.event_overrides.get(&id);
635 let d = &self.event_defaults;
636 EventConfig {
637 capacity: o.and_then(|x| x.capacity)
638 .or(d.capacity)
639 .unwrap_or(Self::DEFAULT_EVENT_CAPACITY),
640 }
641 }
642
643 pub async fn run(mut self) {
645 let mut service_handles = Vec::new();
646 let mut system_handles = Vec::new();
647
648 let event_creators = std::mem::take(&mut self.event_creators);
650 for (id, creator) in event_creators {
651 let config = self.resolve_event(id);
652 self.event_buses.insert(id, creator(config.capacity));
653 }
654
655 let creators = std::mem::take(&mut self.channel_creators);
657 let mut receivers: HashMap<TypeId, Arc<dyn Any + Send + Sync>> = HashMap::new();
658 for (id, creator) in creators {
659 let config = self.resolve_service(id);
660 let (tx, rx) = creator(config.capacity);
661 self.service_senders.insert(id, tx);
662 receivers.insert(id, rx);
663 }
664
665 let first = self.extract_systems(|s| matches!(s, Schedule::First));
667 self.run_phase_blocking(first).await;
668
669 let spawners = std::mem::take(&mut self.service_spawners);
671 let mut startup_futures = Vec::new();
672 let mut worker_batches = Vec::new();
673
674 for (id, spawner) in spawners {
675 let config = self.resolve_service(id);
676 let rx = receivers.remove(&id).unwrap();
677 let result = spawner(&self, rx, config);
678 startup_futures.push((result.startup)());
679 worker_batches.push(result.workers);
680 self.shutdown_hooks.push(result.shutdown);
681 }
682
683 let startup_handles: Vec<_> = startup_futures.into_iter()
685 .map(|fut| tokio::spawn(fut))
686 .collect();
687 for handle in startup_handles {
688 handle.await.expect("service startup panicked");
689 }
690
691 for workers in worker_batches {
693 for fut in workers {
694 service_handles.push(tokio::spawn(fut));
695 }
696 }
697
698 let startup = self.extract_systems(|s| matches!(s, Schedule::Startup));
700 self.run_phase_blocking(startup).await;
701
702 let run = self.extract_systems(|s| matches!(s, Schedule::Run));
704 for desc in run {
705 let config = self.resolve_system(&desc.overrides);
706 for factory in (desc.spawner)(&self, config.workers) {
707 let restart = config.restart;
708 system_handles.push(tokio::spawn(async move {
709 supervise_system("system", restart, || factory()).await;
710 }));
711 }
712 }
713
714 let fixed = self.extract_systems(|s| matches!(s, Schedule::Fixed(_)));
716 for desc in fixed {
717 let duration = match desc.schedule {
718 Schedule::Fixed(d) => d,
719 _ => unreachable!(),
720 };
721 let config = self.resolve_system(&desc.overrides);
722 let token = self.shutdown_token.clone();
723 for factory in (desc.spawner)(&self, config.workers) {
724 let token = token.clone();
725 let restart = config.restart;
726 system_handles.push(tokio::spawn(async move {
727 supervise_system("fixed system", restart, || {
728 let factory = factory.clone();
729 let token = token.clone();
730 async move {
731 let mut interval = tokio::time::interval(duration);
732 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
733 loop {
734 tokio::select! {
735 _ = interval.tick() => {}
736 _ = token.cancelled() => return SystemResult::Ok,
737 }
738 match factory().await {
739 SystemResult::Ok => {}
740 err @ SystemResult::Err(_) => return err,
741 }
742 }
743 }
744 }).await;
745 }));
746 }
747 }
748
749 self.shutdown_token.cancelled().await;
751
752 let shutdown = self.extract_systems(|s| matches!(s, Schedule::Shutdown));
754 self.run_phase_blocking(shutdown).await;
755
756 let shutdown_hooks = std::mem::take(&mut self.shutdown_hooks);
758 let shutdown_handles: Vec<_> = shutdown_hooks.into_iter()
759 .map(|hook| tokio::spawn(hook()))
760 .collect();
761
762 let await_shutdowns = async {
763 for handle in shutdown_handles {
764 let _ = handle.await; }
766 };
767
768 if let Some(timeout) = self.shutdown_timeout {
769 if tokio::time::timeout(timeout, await_shutdowns).await.is_err() {
770 tracing::warn!(target: "archy", "service shutdown hooks timed out");
771 }
772 } else {
773 await_shutdowns.await;
774 }
775
776 let closers = std::mem::take(&mut *self.closers.lock());
778 for closer in closers { closer(); }
779
780 for handle in system_handles { handle.abort(); }
781
782 if let Some(timeout) = self.shutdown_timeout {
783 let abort_handles: Vec<_> = service_handles.iter().map(|h| h.abort_handle()).collect();
784 let drain = async { for h in service_handles { let _ = h.await; } };
785 if tokio::time::timeout(timeout, drain).await.is_err() {
786 tracing::warn!(target: "archy", "shutdown timeout, force-aborting workers");
787 for h in abort_handles { h.abort(); }
788 }
789 } else {
790 for h in service_handles { let _ = h.await; }
791 }
792
793 let last = self.extract_systems(|s| matches!(s, Schedule::Last));
795 self.run_phase_blocking(last).await;
796 }
797
798 fn extract_systems(&mut self, predicate: impl Fn(&Schedule) -> bool) -> Vec<SystemDescriptor> {
800 let mut extracted = Vec::new();
801 let mut i = 0;
802 while i < self.systems.len() {
803 if predicate(&self.systems[i].schedule) {
804 extracted.push(self.systems.swap_remove(i));
805 } else {
806 i += 1;
807 }
808 }
809 extracted
810 }
811
812 async fn run_phase_blocking(&self, systems: Vec<SystemDescriptor>) {
814 let mut handles = Vec::new();
815 for desc in systems {
816 let config = self.resolve_system(&desc.overrides);
817 for factory in (desc.spawner)(self, config.workers) {
818 let restart = config.restart;
819 handles.push(tokio::spawn(async move {
820 supervise_system("system", restart, || factory()).await;
821 }));
822 }
823 }
824 for h in handles { let _ = h.await; }
825 }
826}
827
828impl Default for App {
829 fn default() -> Self { Self::new() }
830}