1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use async_channel::{Receiver, bounded};
9use parking_lot::Mutex;
10use tokio::sync::{broadcast, Semaphore};
11use tokio_util::sync::CancellationToken;
12
13use crate::{FromApp, Module, RestartPolicy, Schedule, Service};
14
15pub(crate) type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
18
19pub type SystemFactory = Arc<dyn Fn() -> BoxFuture + Send + Sync>;
22
23pub(crate) type SystemSpawner = Box<dyn FnOnce(&App, usize) -> Vec<SystemFactory> + Send>;
25
26#[derive(Clone)]
28pub struct SystemConfig {
29 pub workers: usize,
30 pub restart: RestartPolicy,
31}
32
33impl Default for SystemConfig {
34 fn default() -> Self {
35 Self { workers: 1, restart: RestartPolicy::default() }
36 }
37}
38
39pub struct SystemDescriptor {
41 pub id: TypeId,
42 pub schedule: Schedule,
43 pub spawner: SystemSpawner,
44 pub config: SystemConfig,
45}
46
47pub trait IntoSystem<Args>: Send + 'static {
50 fn into_system(self) -> SystemSpawner;
51}
52
53pub trait IntoSystemConfigs<Marker> {
57 fn into_descriptors(self, schedule: Schedule) -> Vec<SystemDescriptor>;
58}
59
60pub(crate) struct EventBus<E> {
63 pub(crate) sender: broadcast::Sender<E>,
64}
65
66type Closer = Box<dyn FnOnce() + Send>;
67type ChannelCreator = Box<dyn FnOnce(usize) -> (Arc<dyn Any + Send + Sync>, Arc<dyn Any + Send + Sync>) + Send>;
68type EventCreator = Box<dyn FnOnce(usize) -> Arc<dyn Any + Send + Sync> + Send>;
69type ServiceSpawner = Box<dyn FnOnce(&App, Arc<dyn Any + Send + Sync>, WorkerConfig) -> Vec<BoxFuture> + Send>;
70
71#[derive(Clone)]
72pub(crate) struct WorkerConfig {
73 pub(crate) workers: usize,
74 pub(crate) capacity: usize,
75 pub(crate) concurrent: Option<usize>,
76 pub(crate) restart: RestartPolicy,
77}
78
79impl Default for WorkerConfig {
80 fn default() -> Self {
81 Self { workers: 1, capacity: 64, concurrent: None, restart: RestartPolicy::default() }
82 }
83}
84
85#[derive(Clone)]
86pub(crate) struct EventConfig {
87 pub(crate) capacity: usize,
88}
89
90pub trait AddResources {
93 fn add_to(self, app: &mut App);
94}
95
96pub trait AddServices {
97 fn add_to(app: &mut App);
98}
99
100pub trait AddEvents {
101 fn add_to(app: &mut App);
102}
103
104pub struct ServiceConfigBuilder<'a> {
107 pub(crate) app: &'a mut App,
108 pub(crate) id: TypeId,
109}
110
111impl ServiceConfigBuilder<'_> {
112 pub fn workers(self, n: usize) -> Self {
113 self.app.service_configs.get_mut(&self.id).unwrap().workers = n;
114 self
115 }
116 pub fn capacity(self, n: usize) -> Self {
117 self.app.service_configs.get_mut(&self.id).unwrap().capacity = n;
118 self
119 }
120 pub fn concurrent(self, max_per_worker: usize) -> Self {
122 self.app.service_configs.get_mut(&self.id).unwrap().concurrent = Some(max_per_worker);
123 self
124 }
125 pub fn restart(self, policy: RestartPolicy) -> Self {
126 self.app.service_configs.get_mut(&self.id).unwrap().restart = policy;
127 self
128 }
129}
130
131pub struct SystemConfigBuilder<'a> {
132 pub(crate) app: &'a mut App,
133 pub(crate) id: TypeId,
134}
135
136impl SystemConfigBuilder<'_> {
137 fn get_config_mut(&mut self) -> &mut SystemConfig {
138 &mut self.app.systems.iter_mut()
139 .find(|d| d.id == self.id)
140 .expect("System not found")
141 .config
142 }
143
144 pub fn workers(mut self, n: usize) -> Self {
145 self.get_config_mut().workers = n;
146 self
147 }
148
149 pub fn restart(mut self, policy: RestartPolicy) -> Self {
150 self.get_config_mut().restart = policy;
151 self
152 }
153}
154
155pub struct EventConfigBuilder<'a> {
156 pub(crate) app: &'a mut App,
157 pub(crate) id: TypeId,
158}
159
160impl EventConfigBuilder<'_> {
161 pub fn capacity(self, n: usize) -> Self {
162 self.app.event_configs.get_mut(&self.id).unwrap().capacity = n;
163 self
164 }
165}
166
167pub struct ShutdownConfigBuilder<'a> {
168 pub(crate) app: &'a mut App,
169}
170
171impl ShutdownConfigBuilder<'_> {
172 pub fn timeout(self, duration: Duration) -> Self {
174 self.app.shutdown_timeout = Some(duration);
175 self
176 }
177}
178
179async fn supervise<F, Fut>(name: &str, restart: RestartPolicy, mut factory: F)
183where
184 F: FnMut() -> Fut,
185 Fut: Future<Output = ()> + Send + 'static,
186{
187 match restart {
188 RestartPolicy::Never => {
189 let _ = tokio::spawn(factory()).await;
190 }
191 RestartPolicy::Always => {
192 loop {
193 let result = tokio::spawn(factory()).await;
194 match result {
195 Ok(()) => break,
196 Err(e) if e.is_panic() => {
197 eprintln!("[archy] {} panicked, restarting: {:?}", name, e);
198 }
199 Err(_) => break,
200 }
201 }
202 }
203 RestartPolicy::Attempts { max, reset_after } => {
204 let mut attempts = 0;
205 loop {
206 let start = Instant::now();
207 let result = tokio::spawn(factory()).await;
208 match result {
209 Ok(()) => break,
210 Err(e) if e.is_panic() => {
211 if let Some(duration) = reset_after
212 && start.elapsed() >= duration
213 {
214 attempts = 0;
215 }
216 attempts += 1;
217 if attempts >= max {
218 eprintln!("[archy] {}: max restart attempts ({}) reached", name, max);
219 break;
220 }
221 eprintln!("[archy] {} panicked, restarting ({}/{}): {:?}", name, attempts, max, e);
222 }
223 Err(_) => break,
224 }
225 }
226 }
227 }
228}
229
230pub struct App {
233 pub(crate) resources: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
234 pub(crate) event_buses: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
235 pub(crate) service_senders: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
236
237 event_creators: HashMap<TypeId, EventCreator>,
238 channel_creators: HashMap<TypeId, ChannelCreator>,
239 service_spawners: HashMap<TypeId, ServiceSpawner>,
240
241 pub(crate) systems: Vec<SystemDescriptor>,
242
243 pub(crate) service_configs: HashMap<TypeId, WorkerConfig>,
244 pub(crate) event_configs: HashMap<TypeId, EventConfig>,
245
246 closers: Arc<Mutex<Vec<Closer>>>,
247 pub(crate) shutdown_token: CancellationToken,
248 pub(crate) shutdown_timeout: Option<Duration>,
249 default_capacity: usize,
250}
251
252impl App {
253 pub fn new() -> Self {
254 Self {
255 resources: HashMap::new(),
256 event_buses: HashMap::new(),
257 service_senders: HashMap::new(),
258 event_creators: HashMap::new(),
259 channel_creators: HashMap::new(),
260 service_spawners: HashMap::new(),
261 systems: Vec::new(),
262 service_configs: HashMap::new(),
263 event_configs: HashMap::new(),
264 closers: Arc::new(Mutex::new(Vec::new())),
265 shutdown_token: CancellationToken::new(),
266 shutdown_timeout: None,
267 default_capacity: 64,
268 }
269 }
270
271 pub fn shutdown(&mut self) -> ShutdownConfigBuilder<'_> {
273 ShutdownConfigBuilder { app: self }
274 }
275
276 pub fn extract<T: FromApp>(&self) -> T {
278 T::from_app(self)
279 }
280
281 pub fn add_module<M: Module>(&mut self, module: M) -> &mut Self {
282 module.register(self);
283 self
284 }
285
286 pub fn add_resource<T: Send + Sync + 'static>(&mut self, resource: T) -> &mut Self {
287 let id = TypeId::of::<T>();
288 assert!(!self.resources.contains_key(&id), "Resource {} already registered", std::any::type_name::<T>());
289 self.resources.insert(id, Arc::new(resource));
290 self
291 }
292
293 pub fn add_resources<T: AddResources>(&mut self, resources: T) -> &mut Self {
294 resources.add_to(self);
295 self
296 }
297
298 pub fn add_event<E: Clone + Send + 'static>(&mut self) -> &mut Self {
299 let id = TypeId::of::<E>();
300 assert!(!self.event_creators.contains_key(&id), "Event {} already registered", std::any::type_name::<E>());
301 self.event_configs.insert(id, EventConfig { capacity: self.default_capacity });
302 self.event_creators.insert(id, Box::new(|capacity| {
303 let (tx, _) = broadcast::channel::<E>(capacity);
304 Arc::new(EventBus { sender: tx })
305 }));
306 self
307 }
308
309 pub fn add_events<T: AddEvents>(&mut self) -> &mut Self {
310 T::add_to(self);
311 self
312 }
313
314 pub fn event<E: Clone + Send + 'static>(&mut self) -> EventConfigBuilder<'_> {
315 let id = TypeId::of::<E>();
316 assert!(self.event_configs.contains_key(&id), "Event {} not registered", std::any::type_name::<E>());
317 EventConfigBuilder { app: self, id }
318 }
319
320 pub fn add_service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
321 let id = TypeId::of::<S>();
322 assert!(!self.service_configs.contains_key(&id), "Service {} already registered", std::any::type_name::<S>());
323
324 self.service_configs.insert(id, WorkerConfig::default());
325
326 let closers = self.closers.clone();
327 self.channel_creators.insert(id, Box::new(move |capacity| {
328 let (tx, rx) = bounded::<S::Message>(capacity);
329 let closer_tx = tx.clone();
330 closers.lock().push(Box::new(move || { closer_tx.close(); }));
331 (Arc::new(tx), Arc::new(rx))
332 }));
333
334 self.service_spawners.insert(id, Box::new(|app, rx_any, config| {
335 let rx = rx_any.downcast::<Receiver<S::Message>>().unwrap();
336 let service = Arc::new(S::create(app));
337
338 (0..config.workers).map(|_| {
339 let rx = rx.as_ref().clone();
340 let svc = service.clone();
341 let restart = config.restart;
342
343 match config.concurrent {
344 None => Box::pin(async move {
346 supervise("service worker", restart, || {
347 let rx = rx.clone();
348 let svc = svc.clone();
349 async move {
350 while let Ok(msg) = rx.recv().await {
351 svc.clone().handle(msg).await;
352 }
353 }
354 }).await;
355 }) as BoxFuture,
356
357 Some(max_concurrent) => {
359 let semaphore = Arc::new(Semaphore::new(max_concurrent));
360 Box::pin(async move {
361 supervise("service worker", restart, || {
362 let rx = rx.clone();
363 let svc = svc.clone();
364 let semaphore = semaphore.clone();
365 async move {
366 while let Ok(msg) = rx.recv().await {
367 let permit = semaphore.clone().acquire_owned().await
368 .expect("semaphore closed unexpectedly");
369 let svc = svc.clone();
370 tokio::spawn(async move {
371 svc.handle(msg).await;
372 drop(permit);
373 });
374 }
375 }
376 }).await;
377 }) as BoxFuture
378 }
379 }
380 }).collect()
381 }));
382
383 ServiceConfigBuilder { app: self, id }
384 }
385
386 pub fn add_services<T: AddServices>(&mut self) -> &mut Self {
387 T::add_to(self);
388 self
389 }
390
391 pub fn service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
392 let id = TypeId::of::<S>();
393 assert!(self.service_configs.contains_key(&id), "Service {} not registered", std::any::type_name::<S>());
394 ServiceConfigBuilder { app: self, id }
395 }
396
397 pub fn add_system<F, Args>(&mut self, schedule: Schedule, system: F) -> &mut Self
399 where
400 F: IntoSystem<Args>,
401 {
402 self.systems.push(SystemDescriptor {
403 id: TypeId::of::<F>(),
404 schedule,
405 spawner: system.into_system(),
406 config: SystemConfig::default(),
407 });
408 self
409 }
410
411 pub fn add_systems<M, S>(&mut self, schedule: Schedule, systems: S) -> &mut Self
413 where
414 S: IntoSystemConfigs<M>,
415 {
416 self.systems.extend(systems.into_descriptors(schedule));
417 self
418 }
419
420 pub fn system<F, Args>(&mut self, _system: F) -> SystemConfigBuilder<'_>
422 where
423 F: IntoSystem<Args>,
424 {
425 SystemConfigBuilder { app: self, id: TypeId::of::<F>() }
426 }
427
428 pub async fn run(mut self) {
430 let mut service_handles = Vec::new();
431 let mut system_handles = Vec::new();
432
433 let event_creators = std::mem::take(&mut self.event_creators);
435 for (id, creator) in event_creators {
436 let capacity = self.event_configs.get(&id).map(|c| c.capacity).unwrap_or(self.default_capacity);
437 self.event_buses.insert(id, creator(capacity));
438 }
439
440 let creators = std::mem::take(&mut self.channel_creators);
442 let mut receivers: HashMap<TypeId, Arc<dyn Any + Send + Sync>> = HashMap::new();
443 for (id, creator) in creators {
444 let config = self.service_configs.get(&id).cloned().unwrap_or_default();
445 let (tx, rx) = creator(config.capacity);
446 self.service_senders.insert(id, tx);
447 receivers.insert(id, rx);
448 }
449
450 let first = self.extract_systems(|s| matches!(s, Schedule::First));
452 self.run_phase_blocking(first).await;
453
454 let spawners = std::mem::take(&mut self.service_spawners);
456 for (id, spawner) in spawners {
457 let config = self.service_configs.get(&id).cloned().unwrap_or_default();
458 let rx = receivers.remove(&id).unwrap();
459 for fut in spawner(&self, rx, config) {
460 service_handles.push(tokio::spawn(fut));
461 }
462 }
463
464 let startup = self.extract_systems(|s| matches!(s, Schedule::Startup));
466 self.run_phase_blocking(startup).await;
467
468 let run = self.extract_systems(|s| matches!(s, Schedule::Run));
470 for desc in run {
471 for factory in (desc.spawner)(&self, desc.config.workers) {
472 system_handles.push(tokio::spawn(Self::supervise_system(factory, desc.config.restart)));
473 }
474 }
475
476 let fixed = self.extract_systems(|s| matches!(s, Schedule::Fixed(_)));
478 for desc in fixed {
479 let duration = match desc.schedule {
480 Schedule::Fixed(d) => d,
481 _ => unreachable!(),
482 };
483 let token = self.shutdown_token.clone();
484 let restart = desc.config.restart;
485 for factory in (desc.spawner)(&self, desc.config.workers) {
486 let token = token.clone();
487 system_handles.push(tokio::spawn(async move {
488 supervise("fixed system", restart, || {
489 let factory = factory.clone(); let token = token.clone();
491 async move {
492 let mut interval = tokio::time::interval(duration);
493 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
494 loop {
495 tokio::select! {
496 _ = interval.tick() => {}
497 _ = token.cancelled() => break,
498 }
499 factory().await;
500 }
501 }
502 }).await;
503 }));
504 }
505 }
506
507 self.shutdown_token.cancelled().await;
509
510 let shutdown = self.extract_systems(|s| matches!(s, Schedule::Shutdown));
512 self.run_phase_blocking(shutdown).await;
513
514 let closers = std::mem::take(&mut *self.closers.lock());
516 for closer in closers { closer(); }
517
518 for handle in system_handles { handle.abort(); }
519
520 if let Some(timeout) = self.shutdown_timeout {
521 let abort_handles: Vec<_> = service_handles.iter().map(|h| h.abort_handle()).collect();
522 let drain = async { for h in service_handles { let _ = h.await; } };
523 if tokio::time::timeout(timeout, drain).await.is_err() {
524 eprintln!("[archy] shutdown timeout, force-aborting workers");
525 for h in abort_handles { h.abort(); }
526 }
527 } else {
528 for h in service_handles { let _ = h.await; }
529 }
530
531 let last = self.extract_systems(|s| matches!(s, Schedule::Last));
533 self.run_phase_blocking(last).await;
534 }
535
536 fn extract_systems(&mut self, predicate: impl Fn(&Schedule) -> bool) -> Vec<SystemDescriptor> {
538 let mut extracted = Vec::new();
539 let mut i = 0;
540 while i < self.systems.len() {
541 if predicate(&self.systems[i].schedule) {
542 extracted.push(self.systems.swap_remove(i));
543 } else {
544 i += 1;
545 }
546 }
547 extracted
548 }
549
550 async fn run_phase_blocking(&self, systems: Vec<SystemDescriptor>) {
552 let mut handles = Vec::new();
553 for desc in systems {
554 for factory in (desc.spawner)(self, desc.config.workers) {
555 handles.push(tokio::spawn(Self::supervise_system(factory, desc.config.restart)));
556 }
557 }
558 for h in handles { let _ = h.await; }
559 }
560
561 async fn supervise_system(factory: SystemFactory, restart: RestartPolicy) {
562 supervise("system", restart, || factory()).await;
563 }
564}
565
566impl Default for App {
567 fn default() -> Self { Self::new() }
568}