archy/
app.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use async_channel::{Receiver, bounded};
9use parking_lot::Mutex;
10use tokio::sync::{broadcast, Semaphore};
11use tokio_util::sync::CancellationToken;
12
13use crate::{FromApp, Module, RestartPolicy, Schedule, Service};
14
15// --- System Types ---
16
/// A heap-allocated, type-erased future that yields `()` and may be moved across threads.
pub(crate) type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Produces a fresh system future every time it is called.
///
/// It must be callable repeatedly: `Fixed` schedules invoke it once per tick, and
/// supervision may invoke it again after a panic. Sharing it through `Arc` means handing it
/// to another task only bumps a reference count.
pub type SystemFactory = Arc<dyn Fn() -> BoxFuture + Send + Sync>;
22
23/// Creates system factories from the app context
24pub(crate) type SystemSpawner = Box<dyn FnOnce(&App, usize) -> Vec<SystemFactory> + Send>;
25
26/// Configuration for a system (parallels WorkerConfig for services)
27#[derive(Clone)]
28pub struct SystemConfig {
29    pub workers: usize,
30    pub restart: RestartPolicy,
31}
32
33impl Default for SystemConfig {
34    fn default() -> Self {
35        Self { workers: 1, restart: RestartPolicy::default() }
36    }
37}
38
39/// Descriptor for a registered system
40pub struct SystemDescriptor {
41    pub id: TypeId,
42    pub schedule: Schedule,
43    pub spawner: SystemSpawner,
44    pub config: SystemConfig,
45}
46
47// --- IntoSystem Trait ---
48
49pub trait IntoSystem<Args>: Send + 'static {
50    fn into_system(self) -> SystemSpawner;
51}
52
53// --- IntoSystemConfigs Trait ---
54
55/// Trait for tuple system registration via add_systems
56pub trait IntoSystemConfigs<Marker> {
57    fn into_descriptors(self, schedule: Schedule) -> Vec<SystemDescriptor>;
58}
59
60// --- Internal Types ---
61
62pub(crate) struct EventBus<E> {
63    pub(crate) sender: broadcast::Sender<E>,
64}
65
66type Closer = Box<dyn FnOnce() + Send>;
67type ChannelCreator = Box<dyn FnOnce(usize) -> (Arc<dyn Any + Send + Sync>, Arc<dyn Any + Send + Sync>) + Send>;
68type EventCreator = Box<dyn FnOnce(usize) -> Arc<dyn Any + Send + Sync> + Send>;
69type ServiceSpawner = Box<dyn FnOnce(&App, Arc<dyn Any + Send + Sync>, WorkerConfig) -> Vec<BoxFuture> + Send>;
70
71#[derive(Clone)]
72pub(crate) struct WorkerConfig {
73    pub(crate) workers: usize,
74    pub(crate) capacity: usize,
75    pub(crate) concurrent: Option<usize>,
76    pub(crate) restart: RestartPolicy,
77}
78
79impl Default for WorkerConfig {
80    fn default() -> Self {
81        Self { workers: 1, capacity: 64, concurrent: None, restart: RestartPolicy::default() }
82    }
83}
84
/// Settings for one event type's broadcast bus.
#[derive(Clone)]
pub(crate) struct EventConfig {
    /// Capacity passed to the broadcast channel when `App::run` creates the bus.
    pub(crate) capacity: usize,
}
89
90// --- Batch Registration Traits ---
91
92pub trait AddResources {
93    fn add_to(self, app: &mut App);
94}
95
96pub trait AddServices {
97    fn add_to(app: &mut App);
98}
99
100pub trait AddEvents {
101    fn add_to(app: &mut App);
102}
103
104// --- Config Builders ---
105
106pub struct ServiceConfigBuilder<'a> {
107    pub(crate) app: &'a mut App,
108    pub(crate) id: TypeId,
109}
110
111impl ServiceConfigBuilder<'_> {
112    pub fn workers(self, n: usize) -> Self {
113        self.app.service_configs.get_mut(&self.id).unwrap().workers = n;
114        self
115    }
116    pub fn capacity(self, n: usize) -> Self {
117        self.app.service_configs.get_mut(&self.id).unwrap().capacity = n;
118        self
119    }
120    /// Allow up to `n` concurrent message handlers per worker (default: sequential).
121    pub fn concurrent(self, max_per_worker: usize) -> Self {
122        self.app.service_configs.get_mut(&self.id).unwrap().concurrent = Some(max_per_worker);
123        self
124    }
125    pub fn restart(self, policy: RestartPolicy) -> Self {
126        self.app.service_configs.get_mut(&self.id).unwrap().restart = policy;
127        self
128    }
129}
130
131pub struct SystemConfigBuilder<'a> {
132    pub(crate) app: &'a mut App,
133    pub(crate) id: TypeId,
134}
135
136impl SystemConfigBuilder<'_> {
137    fn get_config_mut(&mut self) -> &mut SystemConfig {
138        &mut self.app.systems.iter_mut()
139            .find(|d| d.id == self.id)
140            .expect("System not found")
141            .config
142    }
143
144    pub fn workers(mut self, n: usize) -> Self {
145        self.get_config_mut().workers = n;
146        self
147    }
148
149    pub fn restart(mut self, policy: RestartPolicy) -> Self {
150        self.get_config_mut().restart = policy;
151        self
152    }
153}
154
155pub struct EventConfigBuilder<'a> {
156    pub(crate) app: &'a mut App,
157    pub(crate) id: TypeId,
158}
159
160impl EventConfigBuilder<'_> {
161    pub fn capacity(self, n: usize) -> Self {
162        self.app.event_configs.get_mut(&self.id).unwrap().capacity = n;
163        self
164    }
165}
166
167pub struct ShutdownConfigBuilder<'a> {
168    pub(crate) app: &'a mut App,
169}
170
171impl ShutdownConfigBuilder<'_> {
172    /// Force-abort workers if they don't drain within this duration.
173    pub fn timeout(self, duration: Duration) -> Self {
174        self.app.shutdown_timeout = Some(duration);
175        self
176    }
177}
178
179// --- Supervision Helper ---
180
181/// Runs a task with restart policy supervision.
182async fn supervise<F, Fut>(name: &str, restart: RestartPolicy, mut factory: F)
183where
184    F: FnMut() -> Fut,
185    Fut: Future<Output = ()> + Send + 'static,
186{
187    match restart {
188        RestartPolicy::Never => {
189            let _ = tokio::spawn(factory()).await;
190        }
191        RestartPolicy::Always => {
192            loop {
193                let result = tokio::spawn(factory()).await;
194                match result {
195                    Ok(()) => break,
196                    Err(e) if e.is_panic() => {
197                        eprintln!("[archy] {} panicked, restarting: {:?}", name, e);
198                    }
199                    Err(_) => break,
200                }
201            }
202        }
203        RestartPolicy::Attempts { max, reset_after } => {
204            let mut attempts = 0;
205            loop {
206                let start = Instant::now();
207                let result = tokio::spawn(factory()).await;
208                match result {
209                    Ok(()) => break,
210                    Err(e) if e.is_panic() => {
211                        if let Some(duration) = reset_after
212                            && start.elapsed() >= duration
213                        {
214                            attempts = 0;
215                        }
216                        attempts += 1;
217                        if attempts >= max {
218                            eprintln!("[archy] {}: max restart attempts ({}) reached", name, max);
219                            break;
220                        }
221                        eprintln!("[archy] {} panicked, restarting ({}/{}): {:?}", name, attempts, max, e);
222                    }
223                    Err(_) => break,
224                }
225            }
226        }
227    }
228}
229
230// --- App ---
231
232pub struct App {
233    pub(crate) resources: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
234    pub(crate) event_buses: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
235    pub(crate) service_senders: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
236
237    event_creators: HashMap<TypeId, EventCreator>,
238    channel_creators: HashMap<TypeId, ChannelCreator>,
239    service_spawners: HashMap<TypeId, ServiceSpawner>,
240
241    pub(crate) systems: Vec<SystemDescriptor>,
242
243    pub(crate) service_configs: HashMap<TypeId, WorkerConfig>,
244    pub(crate) event_configs: HashMap<TypeId, EventConfig>,
245
246    closers: Arc<Mutex<Vec<Closer>>>,
247    pub(crate) shutdown_token: CancellationToken,
248    pub(crate) shutdown_timeout: Option<Duration>,
249    default_capacity: usize,
250}
251
252impl App {
253    pub fn new() -> Self {
254        Self {
255            resources: HashMap::new(),
256            event_buses: HashMap::new(),
257            service_senders: HashMap::new(),
258            event_creators: HashMap::new(),
259            channel_creators: HashMap::new(),
260            service_spawners: HashMap::new(),
261            systems: Vec::new(),
262            service_configs: HashMap::new(),
263            event_configs: HashMap::new(),
264            closers: Arc::new(Mutex::new(Vec::new())),
265            shutdown_token: CancellationToken::new(),
266            shutdown_timeout: None,
267            default_capacity: 64,
268        }
269    }
270
271    /// Configure shutdown behavior
272    pub fn shutdown(&mut self) -> ShutdownConfigBuilder<'_> {
273        ShutdownConfigBuilder { app: self }
274    }
275
276    /// Extract a dependency from the App (convenience helper for Service::create)
277    pub fn extract<T: FromApp>(&self) -> T {
278        T::from_app(self)
279    }
280
281    pub fn add_module<M: Module>(&mut self, module: M) -> &mut Self {
282        module.register(self);
283        self
284    }
285
286    pub fn add_resource<T: Send + Sync + 'static>(&mut self, resource: T) -> &mut Self {
287        let id = TypeId::of::<T>();
288        assert!(!self.resources.contains_key(&id), "Resource {} already registered", std::any::type_name::<T>());
289        self.resources.insert(id, Arc::new(resource));
290        self
291    }
292
293    pub fn add_resources<T: AddResources>(&mut self, resources: T) -> &mut Self {
294        resources.add_to(self);
295        self
296    }
297
298    pub fn add_event<E: Clone + Send + 'static>(&mut self) -> &mut Self {
299        let id = TypeId::of::<E>();
300        assert!(!self.event_creators.contains_key(&id), "Event {} already registered", std::any::type_name::<E>());
301        self.event_configs.insert(id, EventConfig { capacity: self.default_capacity });
302        self.event_creators.insert(id, Box::new(|capacity| {
303            let (tx, _) = broadcast::channel::<E>(capacity);
304            Arc::new(EventBus { sender: tx })
305        }));
306        self
307    }
308
309    pub fn add_events<T: AddEvents>(&mut self) -> &mut Self {
310        T::add_to(self);
311        self
312    }
313
314    pub fn event<E: Clone + Send + 'static>(&mut self) -> EventConfigBuilder<'_> {
315        let id = TypeId::of::<E>();
316        assert!(self.event_configs.contains_key(&id), "Event {} not registered", std::any::type_name::<E>());
317        EventConfigBuilder { app: self, id }
318    }
319
320    pub fn add_service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
321        let id = TypeId::of::<S>();
322        assert!(!self.service_configs.contains_key(&id), "Service {} already registered", std::any::type_name::<S>());
323
324        self.service_configs.insert(id, WorkerConfig::default());
325
326        let closers = self.closers.clone();
327        self.channel_creators.insert(id, Box::new(move |capacity| {
328            let (tx, rx) = bounded::<S::Message>(capacity);
329            let closer_tx = tx.clone();
330            closers.lock().push(Box::new(move || { closer_tx.close(); }));
331            (Arc::new(tx), Arc::new(rx))
332        }));
333
334        self.service_spawners.insert(id, Box::new(|app, rx_any, config| {
335            let rx = rx_any.downcast::<Receiver<S::Message>>().unwrap();
336            let service = Arc::new(S::create(app));
337
338            (0..config.workers).map(|_| {
339                let rx = rx.as_ref().clone();
340                let svc = service.clone();
341                let restart = config.restart;
342
343                match config.concurrent {
344                    // Sequential mode (default): process one message at a time per worker
345                    None => Box::pin(async move {
346                        supervise("service worker", restart, || {
347                            let rx = rx.clone();
348                            let svc = svc.clone();
349                            async move {
350                                while let Ok(msg) = rx.recv().await {
351                                    svc.clone().handle(msg).await;
352                                }
353                            }
354                        }).await;
355                    }) as BoxFuture,
356
357                    // Concurrent mode: up to N concurrent handlers per worker
358                    Some(max_concurrent) => {
359                        let semaphore = Arc::new(Semaphore::new(max_concurrent));
360                        Box::pin(async move {
361                            supervise("service worker", restart, || {
362                                let rx = rx.clone();
363                                let svc = svc.clone();
364                                let semaphore = semaphore.clone();
365                                async move {
366                                    while let Ok(msg) = rx.recv().await {
367                                        let permit = semaphore.clone().acquire_owned().await
368                                            .expect("semaphore closed unexpectedly");
369                                        let svc = svc.clone();
370                                        tokio::spawn(async move {
371                                            svc.handle(msg).await;
372                                            drop(permit);
373                                        });
374                                    }
375                                }
376                            }).await;
377                        }) as BoxFuture
378                    }
379                }
380            }).collect()
381        }));
382
383        ServiceConfigBuilder { app: self, id }
384    }
385
386    pub fn add_services<T: AddServices>(&mut self) -> &mut Self {
387        T::add_to(self);
388        self
389    }
390
391    pub fn service<S: Service>(&mut self) -> ServiceConfigBuilder<'_> {
392        let id = TypeId::of::<S>();
393        assert!(self.service_configs.contains_key(&id), "Service {} not registered", std::any::type_name::<S>());
394        ServiceConfigBuilder { app: self, id }
395    }
396
397    /// Add a single system with the given schedule
398    pub fn add_system<F, Args>(&mut self, schedule: Schedule, system: F) -> &mut Self
399    where
400        F: IntoSystem<Args>,
401    {
402        self.systems.push(SystemDescriptor {
403            id: TypeId::of::<F>(),
404            schedule,
405            spawner: system.into_system(),
406            config: SystemConfig::default(),
407        });
408        self
409    }
410
411    /// Add multiple systems with the given schedule: `add_systems(Schedule::Run, (sys1, sys2, sys3))`
412    pub fn add_systems<M, S>(&mut self, schedule: Schedule, systems: S) -> &mut Self
413    where
414        S: IntoSystemConfigs<M>,
415    {
416        self.systems.extend(systems.into_descriptors(schedule));
417        self
418    }
419
420    /// Configure a previously registered system
421    pub fn system<F, Args>(&mut self, _system: F) -> SystemConfigBuilder<'_>
422    where
423        F: IntoSystem<Args>,
424    {
425        SystemConfigBuilder { app: self, id: TypeId::of::<F>() }
426    }
427
428    /// Run the application until shutdown
429    pub async fn run(mut self) {
430        let mut service_handles = Vec::new();
431        let mut system_handles = Vec::new();
432
433        // Phase 0: Create event buses
434        let event_creators = std::mem::take(&mut self.event_creators);
435        for (id, creator) in event_creators {
436            let capacity = self.event_configs.get(&id).map(|c| c.capacity).unwrap_or(self.default_capacity);
437            self.event_buses.insert(id, creator(capacity));
438        }
439
440        // Phase 1: Create service channels
441        let creators = std::mem::take(&mut self.channel_creators);
442        let mut receivers: HashMap<TypeId, Arc<dyn Any + Send + Sync>> = HashMap::new();
443        for (id, creator) in creators {
444            let config = self.service_configs.get(&id).cloned().unwrap_or_default();
445            let (tx, rx) = creator(config.capacity);
446            self.service_senders.insert(id, tx);
447            receivers.insert(id, rx);
448        }
449
450        // Phase 2: Run First systems (blocking)
451        let first = self.extract_systems(|s| matches!(s, Schedule::First));
452        self.run_phase_blocking(first).await;
453
454        // Phase 3: Spawn service workers
455        let spawners = std::mem::take(&mut self.service_spawners);
456        for (id, spawner) in spawners {
457            let config = self.service_configs.get(&id).cloned().unwrap_or_default();
458            let rx = receivers.remove(&id).unwrap();
459            for fut in spawner(&self, rx, config) {
460                service_handles.push(tokio::spawn(fut));
461            }
462        }
463
464        // Phase 4: Run Startup systems (blocking)
465        let startup = self.extract_systems(|s| matches!(s, Schedule::Startup));
466        self.run_phase_blocking(startup).await;
467
468        // Phase 5: Spawn Run systems (background)
469        let run = self.extract_systems(|s| matches!(s, Schedule::Run));
470        for desc in run {
471            for factory in (desc.spawner)(&self, desc.config.workers) {
472                system_handles.push(tokio::spawn(Self::supervise_system(factory, desc.config.restart)));
473            }
474        }
475
476        // Phase 6: Spawn Fixed systems (interval loops)
477        let fixed = self.extract_systems(|s| matches!(s, Schedule::Fixed(_)));
478        for desc in fixed {
479            let duration = match desc.schedule {
480                Schedule::Fixed(d) => d,
481                _ => unreachable!(),
482            };
483            let token = self.shutdown_token.clone();
484            let restart = desc.config.restart;
485            for factory in (desc.spawner)(&self, desc.config.workers) {
486                let token = token.clone();
487                system_handles.push(tokio::spawn(async move {
488                    supervise("fixed system", restart, || {
489                        let factory = factory.clone(); // Arc clone - warm path only
490                        let token = token.clone();
491                        async move {
492                            let mut interval = tokio::time::interval(duration);
493                            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
494                            loop {
495                                tokio::select! {
496                                    _ = interval.tick() => {}
497                                    _ = token.cancelled() => break,
498                                }
499                                factory().await;
500                            }
501                        }
502                    }).await;
503                }));
504            }
505        }
506
507        // Wait for shutdown
508        self.shutdown_token.cancelled().await;
509
510        // Phase 7: Run Shutdown systems (blocking)
511        let shutdown = self.extract_systems(|s| matches!(s, Schedule::Shutdown));
512        self.run_phase_blocking(shutdown).await;
513
514        // Phase 8: Close channels, abort systems, wait for services
515        let closers = std::mem::take(&mut *self.closers.lock());
516        for closer in closers { closer(); }
517
518        for handle in system_handles { handle.abort(); }
519
520        if let Some(timeout) = self.shutdown_timeout {
521            let abort_handles: Vec<_> = service_handles.iter().map(|h| h.abort_handle()).collect();
522            let drain = async { for h in service_handles { let _ = h.await; } };
523            if tokio::time::timeout(timeout, drain).await.is_err() {
524                eprintln!("[archy] shutdown timeout, force-aborting workers");
525                for h in abort_handles { h.abort(); }
526            }
527        } else {
528            for h in service_handles { let _ = h.await; }
529        }
530
531        // Phase 9: Run Last systems (blocking)
532        let last = self.extract_systems(|s| matches!(s, Schedule::Last));
533        self.run_phase_blocking(last).await;
534    }
535
536    /// Extract systems matching predicate (removes from self.systems)
537    fn extract_systems(&mut self, predicate: impl Fn(&Schedule) -> bool) -> Vec<SystemDescriptor> {
538        let mut extracted = Vec::new();
539        let mut i = 0;
540        while i < self.systems.len() {
541            if predicate(&self.systems[i].schedule) {
542                extracted.push(self.systems.swap_remove(i));
543            } else {
544                i += 1;
545            }
546        }
547        extracted
548    }
549
550    /// Run systems and wait for completion
551    async fn run_phase_blocking(&self, systems: Vec<SystemDescriptor>) {
552        let mut handles = Vec::new();
553        for desc in systems {
554            for factory in (desc.spawner)(self, desc.config.workers) {
555                handles.push(tokio::spawn(Self::supervise_system(factory, desc.config.restart)));
556            }
557        }
558        for h in handles { let _ = h.await; }
559    }
560
561    async fn supervise_system(factory: SystemFactory, restart: RestartPolicy) {
562        supervise("system", restart, || factory()).await;
563    }
564}
565
566impl Default for App {
567    fn default() -> Self { Self::new() }
568}