Skip to main content

veilid_core/
component.rs

1use super::*;
2
3impl_veilid_log_facility!("registry");
4
/// Conversion of an `Arc<Self>` into a type-erased `Arc<dyn Any + Send + Sync>`,
/// so that a shared pointer can later be downcast back to a concrete type.
pub(crate) trait AsAnyArcSendSync {
    /// Consumes the `Arc` and returns it as an `Arc` of `dyn Any + Send + Sync`.
    fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync>;
}

/// Blanket implementation: every `Send + Sync + 'static` type gets the conversion.
impl<T> AsAnyArcSendSync for T
where
    T: Send + Sync + 'static,
{
    fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync> {
        // Unsized coercion from `Arc<T>` to the trait object; the allocation is reused.
        let erased: Arc<dyn core::any::Any + Send + Sync> = self;
        erased
    }
}
14
/// Lifecycle contract for a component stored in a `VeilidComponentRegistry`.
///
/// The registry drives these hooks: `init` and `post_init` are invoked in
/// registration order, while `pre_terminate` and `terminate` are invoked in
/// reverse registration order.
pub(crate) trait VeilidComponent:
    AsAnyArcSendSync + VeilidComponentRegistryAccessor + core::fmt::Debug
{
    /// Static name of the component; the registry uses it in its log messages.
    fn name(&self) -> &'static str;
    /// Log facilities of this component; the registry collects them from all
    /// components and hands them to `VeilidLayerFilter` at the start of `init`.
    fn log_facilities(&self) -> VeilidComponentLogFacilities;
    /// First startup phase. An error aborts registry initialization.
    fn init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
    /// Second startup phase, run by the registry's `post_init`.
    fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
    /// First shutdown phase, run by the registry's `pre_terminate`.
    fn pre_terminate(&self) -> PinBoxFuture<'_, ()>;
    /// Final shutdown phase, run by the registry's `terminate`.
    fn terminate(&self) -> PinBoxFuture<'_, ()>;
}
25
26pub(crate) trait VeilidComponentRegistryAccessor {
27    fn registry(&self) -> VeilidComponentRegistry;
28
29    fn config(&self) -> Arc<VeilidConfig> {
30        self.registry().startup_options.config()
31    }
32    fn update_callback(&self) -> UpdateCallback {
33        self.registry().startup_options.update_callback()
34    }
35    fn event_bus(&self) -> EventBus {
36        self.registry().event_bus.clone()
37    }
38    fn log_key(&self) -> VeilidLogKey {
39        self.registry().log_key()
40    }
41}
42
/// Handle to a registered component, as returned by the registry's `lookup`.
///
/// Dereferences to the component itself. The handle owns a strong reference
/// (`Arc<T>`); the `'a` lifetime is carried only through `PhantomData`.
pub struct VeilidComponentGuard<'a, T: Send + Sync + 'static> {
    // Strong reference to the looked-up component.
    component: Arc<T>,
    // Ties the guard to the `'a` lifetime without storing a borrow.
    _phantom: core::marker::PhantomData<&'a T>,
}
47
48impl<T> core::ops::Deref for VeilidComponentGuard<'_, T>
49where
50    T: Send + Sync + 'static,
51{
52    type Target = T;
53
54    fn deref(&self) -> &Self::Target {
55        &self.component
56    }
57}
58
/// Mutable state of the registry, kept behind the registry's mutex.
#[derive(Debug)]
struct VeilidComponentRegistryInner {
    // Registered components, keyed by the `TypeId` of their concrete type.
    type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
    // Type ids in the order the components were registered.
    init_order: Vec<core::any::TypeId>,
    // Set to `true` by `enable_mock`; only present in test builds.
    #[cfg(any(test, feature = "test-util"))]
    mock: bool,
}
66
/// Cloneable handle to the set of registered components and the state shared
/// between them (startup options, event bus, log key).
#[derive(Clone, Debug)]
pub(crate) struct VeilidComponentRegistry {
    // Component map and registration order, shared by all clones of the handle.
    inner: Arc<Mutex<VeilidComponentRegistryInner>>,
    // Options the registry was created with; source of config and update callback.
    startup_options: VeilidStartupOptions,
    // Namespace read from the config in `new`.
    namespace: &'static str,
    // Program name read from the config in `new`.
    program_name: &'static str,
    // Log key built in `new` from the program name and namespace.
    log_key: &'static str,
    // Event bus started in `init` and shut down in `terminate`.
    event_bus: EventBus,
    // Serializes lifecycle calls; the flag is `true` between a successful
    // `init` and the end of `terminate`.
    init_lock: Arc<AsyncMutex<bool>>,
}
77
78impl VeilidComponentRegistry {
79    pub fn new(startup_options: VeilidStartupOptions) -> Self {
80        let namespace = startup_options.config().namespace.to_static_str();
81        let program_name = startup_options.config().program_name.to_static_str();
82
83        let log_key = VeilidLayerFilter::make_veilid_log_key(program_name, namespace);
84
85        Self {
86            inner: Arc::new(Mutex::new(VeilidComponentRegistryInner {
87                type_map: HashMap::new(),
88                init_order: Vec::new(),
89                #[cfg(any(test, feature = "test-util"))]
90                mock: false,
91            })),
92            startup_options,
93            namespace,
94            program_name,
95            log_key,
96            event_bus: EventBus::new(),
97            init_lock: Arc::new(AsyncMutex::new(false)),
98        }
99    }
100
101    #[cfg(any(test, feature = "test-util"))]
102    pub fn enable_mock(&self) {
103        let mut inner = self.inner.lock();
104        inner.mock = true;
105    }
106    // #[cfg(any(test, feature = "test-util"))]
107    // pub fn is_mock(&self) -> bool {
108    //     let inner = self.inner.lock();
109    //     inner.mock
110    // }
111
112    #[expect(dead_code)]
113    pub fn namespace(&self) -> &'static str {
114        self.namespace
115    }
116
117    #[allow(dead_code)]
118    pub fn program_name(&self) -> &'static str {
119        self.program_name
120    }
121
122    pub fn log_key(&self) -> VeilidLogKey {
123        self.log_key
124    }
125
126    pub fn register<
127        T: VeilidComponent + Send + Sync + 'static,
128        F: FnOnce(VeilidComponentRegistry) -> T,
129    >(
130        &self,
131        component_constructor: F,
132    ) {
133        let component = Arc::new(component_constructor(self.clone()));
134        let component_type_id = core::any::TypeId::of::<T>();
135
136        // Add to type map and initialization order
137        let mut inner = self.inner.lock();
138        assert!(
139            inner
140                .type_map
141                .insert(component_type_id, component)
142                .is_none(),
143            "should not register same component twice"
144        );
145        inner.init_order.push(component_type_id);
146    }
147
148    pub fn register_with_context<
149        C,
150        T: VeilidComponent + Send + Sync + 'static,
151        F: FnOnce(VeilidComponentRegistry, C) -> T,
152    >(
153        &self,
154        component_constructor: F,
155        context: C,
156    ) {
157        let component = Arc::new(component_constructor(self.clone(), context));
158        let component_type_id = core::any::TypeId::of::<T>();
159
160        // Add to type map and initialization order
161        let mut inner = self.inner.lock();
162        assert!(
163            inner
164                .type_map
165                .insert(component_type_id, component)
166                .is_none(),
167            "should not register same component twice"
168        );
169        inner.init_order.push(component_type_id);
170    }
171
172    pub async fn init(&self) -> EyreResult<()> {
173        let Some(mut _init_guard) = self.init_lock.try_lock() else {
174            bail!("init should only happen one at a time");
175        };
176        if *_init_guard {
177            bail!("already initialized");
178        }
179
180        VeilidLayerFilter::init_veilid_component_log_facilities(
181            self.log_key(),
182            self.get_init_order()
183                .into_iter()
184                .map(|x| x.log_facilities())
185                .collect(),
186        )?;
187
188        // Event bus starts up early
189        self.event_bus.startup()?;
190
191        // Process components in initialization order
192        let init_order = self.get_init_order();
193        let mut initialized = vec![];
194        for component in init_order {
195            if let Err(e) = component.init().await {
196                self.terminate_inner(initialized).await;
197                self.event_bus.shutdown().await;
198                return Err(e);
199            }
200            initialized.push(component);
201        }
202
203        *_init_guard = true;
204        Ok(())
205    }
206
207    pub async fn post_init(&self) -> EyreResult<()> {
208        let Some(mut _init_guard) = self.init_lock.try_lock() else {
209            bail!("init should only happen one at a time");
210        };
211        if !*_init_guard {
212            bail!("not initialized");
213        }
214
215        let init_order = self.get_init_order();
216        let mut post_initialized = vec![];
217        for component in init_order {
218            if let Err(e) = component.post_init().await {
219                self.pre_terminate_inner(post_initialized).await;
220                return Err(e);
221            }
222            post_initialized.push(component)
223        }
224        Ok(())
225    }
226
227    pub async fn pre_terminate(&self) {
228        let Some(mut _init_guard) = self.init_lock.try_lock() else {
229            panic!("terminate should only happen one at a time");
230        };
231        if !*_init_guard {
232            panic!("not initialized");
233        }
234
235        let init_order = self.get_init_order();
236        self.pre_terminate_inner(init_order).await;
237    }
238
239    pub async fn terminate(&self) {
240        let Some(mut _init_guard) = self.init_lock.try_lock() else {
241            panic!("terminate should only happen one at a time");
242        };
243        if !*_init_guard {
244            panic!("not initialized");
245        }
246
247        // Terminate components in reverse initialization order
248        let init_order = self.get_init_order();
249        self.terminate_inner(init_order).await;
250
251        // Event bus shuts down last
252        self.event_bus.shutdown().await;
253
254        // Remoave all registered component log facilities from VeilidLayerFilter for this log key
255        if let Err(e) = VeilidLayerFilter::terminate_veilid_component_log_facilities(self.log_key())
256        {
257            eprintln!("Error terminating log facilities: {}", e);
258        }
259
260        *_init_guard = false;
261    }
262
263    async fn pre_terminate_inner(
264        &self,
265        pre_initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>,
266    ) {
267        for component in pre_initialized.iter().rev() {
268            component.pre_terminate().await;
269        }
270    }
271    async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
272        for component in initialized.iter().rev() {
273            let refs = Arc::strong_count(component);
274            if refs > 2 {
275                veilid_log!(self warn
276                    "Terminating component '{}' while still referenced ({} extra references)",
277                    component.name(),
278                    refs - 2
279                );
280            }
281            component.terminate().await;
282        }
283    }
284
285    fn get_init_order(&self) -> Vec<Arc<dyn VeilidComponent + Send + Sync>> {
286        let inner = self.inner.lock();
287        inner
288            .init_order
289            .iter()
290            .map(|id| inner.type_map.get(id).unwrap_or_log().clone())
291            .collect::<Vec<_>>()
292    }
293
294    //////////////////////////////////////////////////////////////
295
296    pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
297        &self,
298    ) -> Option<VeilidComponentGuard<'a, T>> {
299        let inner = self.inner.lock();
300        let component_type_id = core::any::TypeId::of::<T>();
301        let component_dyn = inner.type_map.get(&component_type_id)?.clone();
302        let component = component_dyn
303            .as_any_arc_send_sync()
304            .downcast::<T>()
305            .unwrap_or_log();
306        Some(VeilidComponentGuard {
307            component,
308            _phantom: core::marker::PhantomData {},
309        })
310    }
311}
312
313impl VeilidComponentRegistryAccessor for VeilidComponentRegistry {
314    fn registry(&self) -> VeilidComponentRegistry {
315        self.clone()
316    }
317}
318
319////////////////////////////////////////////////////////////////////
320
// Implements VeilidComponentRegistryAccessor for a type that keeps its
// registry handle in a field named `registry`
macro_rules! impl_veilid_component_accessors {
    ($component_ty:ty) => {
        impl VeilidComponentRegistryAccessor for $component_ty {
            fn registry(&self) -> VeilidComponentRegistry {
                Clone::clone(&self.registry)
            }
        }
    };
}
330
331pub(crate) use impl_veilid_component_accessors;
332
333/////////////////////////////////////////////////////////////////////
334
// Implements VeilidComponent (and the registry accessors) for a component type.
// The type must provide `log_facilities_impl`, `init_async`, `post_init_async`,
// `pre_terminate_async` and `terminate_async`, and keep its registry handle in a
// field named `registry`
macro_rules! impl_veilid_component {
    ($component:ty) => {
        impl_veilid_component_accessors!($component);

        impl VeilidComponent for $component {
            fn name(&self) -> &'static str {
                // The component is named after its type as written at the call site
                stringify!($component)
            }

            fn log_facilities(&self) -> VeilidComponentLogFacilities {
                <$component>::log_facilities_impl(self)
            }

            fn init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
                Box::pin(async move { self.init_async().await })
            }

            fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
                Box::pin(async move { self.post_init_async().await })
            }

            fn pre_terminate(&self) -> PinBoxFuture<'_, ()> {
                Box::pin(async move { self.pre_terminate_async().await })
            }

            fn terminate(&self) -> PinBoxFuture<'_, ()> {
                Box::pin(async move { self.terminate_async().await })
            }
        }
    };
}
366
367pub(crate) use impl_veilid_component;
368
369/////////////////////////////////////////////////////////////////////
370
// Utility macro that wires a background TickTask to a synchronous routine
// Call it while constructing a component that has background tasks,
// before any post-init 'tick' operations are started
// Each run of the task looks the component up in the registry again and
// returns whatever the routine returns
macro_rules! impl_setup_task {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let task_registry = $this.registry();
        $this
            .$task_name
            .set_routine(move |stop_token, last_ts, cur_ts| {
                // Every invocation gets its own registry handle to move into the future
                let task_registry = task_registry.clone();
                Box::pin(async move {
                    let component = task_registry.lookup::<$this_type>().unwrap_or_log();
                    component.$task_routine(
                        stop_token,
                        Timestamp::new(last_ts),
                        Timestamp::new(cur_ts),
                    )
                })
            });
    }};
}
386
387pub(crate) use impl_setup_task;
388
// Async counterpart of the TickTask setup macro: the routine returns a future,
// which is awaited inside the task
// Each run of the task looks the component up in the registry again
macro_rules! impl_setup_task_async {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let task_registry = $this.registry();
        $this
            .$task_name
            .set_routine(move |stop_token, last_ts, cur_ts| {
                // Every invocation gets its own registry handle to move into the future
                let task_registry = task_registry.clone();
                Box::pin(async move {
                    let component = task_registry.lookup::<$this_type>().unwrap_or_log();
                    component
                        .$task_routine(
                            stop_token,
                            Timestamp::new(last_ts),
                            Timestamp::new(cur_ts),
                        )
                        .await
                })
            });
    }};
}
402
403pub(crate) use impl_setup_task_async;
404
// Utility macro that subscribes a synchronous handler method to the event bus
// Call it after init, during post-init or later
// The subscription should be unsubscribed before termination
// Evaluates to whatever `subscribe` returns
macro_rules! impl_subscribe_event_bus {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let handler_registry = $this.registry();
        $this.event_bus().subscribe(move |event| {
            // Every event gets its own registry handle to move into the future
            let handler_registry = handler_registry.clone();
            Box::pin(async move {
                let component = handler_registry.lookup::<$this_type>().unwrap_or_log();
                component.$event_handler(event);
            })
        })
    }};
}
420
421pub(crate) use impl_subscribe_event_bus;
422
// Async counterpart of the event bus subscription macro: the handler returns
// a future, which is awaited for each event
// Evaluates to whatever `subscribe` returns
macro_rules! impl_subscribe_event_bus_async {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let handler_registry = $this.registry();
        $this.event_bus().subscribe(move |event| {
            // Every event gets its own registry handle to move into the future
            let handler_registry = handler_registry.clone();
            Box::pin(async move {
                let component = handler_registry.lookup::<$this_type>().unwrap_or_log();
                component.$event_handler(event).await;
            })
        })
    }};
}
435
436pub(crate) use impl_subscribe_event_bus_async;