Skip to main content

veilid_core/
component.rs

1use super::*;
2
3impl_veilid_log_facility!("registry");
4
/// Converts a shared pointer to a concrete type into a type-erased
/// `Arc<dyn Any + Send + Sync>`, so that it can be downcast again later.
pub(crate) trait AsAnyArcSendSync {
    /// Consumes the `Arc` and hands it back as a type-erased `Any` pointer.
    fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync>;
}

/// Blanket implementation covering every sized `Send + Sync + 'static` type.
impl<T> AsAnyArcSendSync for T
where
    T: Send + Sync + 'static,
{
    fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync> {
        // Unsized coercion from `Arc<T>` to the trait object; no data is copied.
        let erased: Arc<dyn core::any::Any + Send + Sync> = self;
        erased
    }
}
14
/// Lifecycle interface implemented by every component stored in the
/// `VeilidComponentRegistry`.
///
/// The registry drives these hooks: `init` and `post_init` are called in
/// registration order, while `pre_terminate` and `terminate` are called in the
/// reverse of that order.
pub(crate) trait VeilidComponent:
    AsAnyArcSendSync + VeilidComponentRegistryAccessor + core::fmt::Debug
{
    /// Name of the component; the registry uses it in its log messages.
    fn name(&self) -> &'static str;
    /// Log facilities of this component; collected by the registry during `init`.
    fn log_facilities(&self) -> VeilidComponentLogFacilities;
    /// Initialization hook. Returning an error aborts registry initialization.
    fn init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
    /// Hook run by the registry's `post_init`, after initialization succeeded.
    fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
    /// Hook run by the registry's `pre_terminate`; cannot fail.
    fn pre_terminate(&self) -> PinBoxFuture<'_, ()>;
    /// Hook run by the registry's `terminate`; cannot fail.
    fn terminate(&self) -> PinBoxFuture<'_, ()>;
}
25
26pub(crate) trait VeilidComponentRegistryAccessor {
27    fn registry(&self) -> VeilidComponentRegistry;
28
29    fn config(&self) -> Arc<VeilidConfig> {
30        self.registry().unlocked_inner.startup_options.config()
31    }
32    fn update_callback(&self) -> UpdateCallback {
33        self.registry()
34            .unlocked_inner
35            .startup_options
36            .update_callback()
37    }
38    fn event_bus(&self) -> EventBus {
39        self.registry().event_bus()
40    }
41    fn log_key(&self) -> VeilidLogKey {
42        self.registry().log_key()
43    }
44}
45
/// Handle to a registered component of type `T`.
///
/// The guard owns a strong `Arc` reference to the component and dereferences
/// to it. The lifetime `'a` is carried only by a `PhantomData` marker.
pub struct VeilidComponentGuard<'a, T: Send + Sync + 'static> {
    component: Arc<T>,
    _phantom: core::marker::PhantomData<&'a T>,
}

impl<'a, T: Send + Sync + 'static> core::ops::Deref for VeilidComponentGuard<'a, T> {
    type Target = T;

    /// Borrows the component held by this guard.
    fn deref(&self) -> &T {
        self.component.as_ref()
    }
}
61
/// Mutable registry state, kept behind the mutex stored in
/// `VeilidComponentRegistryUnlockedInner::inner`.
#[derive(Debug)]
struct VeilidComponentRegistryInner {
    /// Registered components, keyed by the `TypeId` of their concrete type.
    type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
    /// Type ids in the order the components were registered; this is the
    /// order in which the registry initializes them.
    init_order: Vec<core::any::TypeId>,
    /// Set to `true` by `enable_mock`; only exists in test builds or with the
    /// `test-util` feature.
    #[cfg(any(test, feature = "test-util"))]
    mock: bool,
}
69
/// Registry state shared between all clones of a `VeilidComponentRegistry`.
///
/// Everything except `inner` and `init_lock` is set once at construction.
#[derive(Debug)]
struct VeilidComponentRegistryUnlockedInner {
    /// Component map and initialization order, guarded by a mutex.
    inner: Mutex<VeilidComponentRegistryInner>,
    /// Startup options the registry was created with (source of config and update callback).
    startup_options: VeilidStartupOptions,
    /// Namespace read from the configuration at construction.
    namespace: &'static str,
    /// Program name read from the configuration at construction.
    program_name: &'static str,
    /// Log key derived from the program name and namespace at construction.
    log_key: &'static str,
    /// Event bus that is started in `init` and shut down in `terminate`.
    event_bus: EventBus,
    /// Lifecycle lock: the lifecycle methods `try_lock` it so only one runs at
    /// a time. The guarded flag is `true` between a successful `init` and the
    /// end of `terminate`.
    init_lock: AsyncMutex<bool>,
}
80
/// Registry of components, looked up by their concrete type.
///
/// This is a handle: cloning it only clones the inner `Arc`, so all clones
/// refer to the same registry state.
#[derive(Clone, Debug)]
pub(crate) struct VeilidComponentRegistry {
    /// Shared state behind the handle.
    unlocked_inner: Arc<VeilidComponentRegistryUnlockedInner>,
}
85
86impl VeilidComponentRegistry {
87    pub fn new(startup_options: VeilidStartupOptions) -> Self {
88        let namespace = startup_options.config().namespace.to_static_str();
89        let program_name = startup_options.config().program_name.to_static_str();
90
91        let log_key = VeilidLayerFilter::make_veilid_log_key(program_name, namespace);
92
93        Self {
94            unlocked_inner: Arc::new(VeilidComponentRegistryUnlockedInner {
95                inner: Mutex::new(VeilidComponentRegistryInner {
96                    type_map: HashMap::new(),
97                    init_order: Vec::new(),
98                    #[cfg(any(test, feature = "test-util"))]
99                    mock: false,
100                }),
101                startup_options,
102                namespace,
103                program_name,
104                log_key,
105                event_bus: EventBus::new(),
106                init_lock: AsyncMutex::new(false),
107            }),
108        }
109    }
110
111    #[cfg(any(test, feature = "test-util"))]
112    pub fn enable_mock(&self) {
113        let mut inner = self.unlocked_inner.inner.lock();
114        inner.mock = true;
115    }
116    // #[cfg(any(test, feature = "test-util"))]
117    // pub fn is_mock(&self) -> bool {
118    //     let inner = self.unlocked_inner.inner.lock();
119    //     inner.mock
120    // }
121
122    #[expect(dead_code)]
123    pub fn namespace(&self) -> &'static str {
124        self.unlocked_inner.namespace
125    }
126
127    #[allow(dead_code)]
128    pub fn program_name(&self) -> &'static str {
129        self.unlocked_inner.program_name
130    }
131
132    pub fn log_key(&self) -> VeilidLogKey {
133        self.unlocked_inner.log_key
134    }
135
136    pub fn event_bus(&self) -> EventBus {
137        self.unlocked_inner.event_bus.clone()
138    }
139
140    pub fn register<
141        T: VeilidComponent + Send + Sync + 'static,
142        F: FnOnce(VeilidComponentRegistry) -> T,
143    >(
144        &self,
145        component_constructor: F,
146    ) {
147        let component = Arc::new(component_constructor(self.clone()));
148        let component_type_id = core::any::TypeId::of::<T>();
149
150        // Add to type map and initialization order
151        let mut inner = self.unlocked_inner.inner.lock();
152        assert!(
153            inner
154                .type_map
155                .insert(component_type_id, component)
156                .is_none(),
157            "should not register same component twice"
158        );
159        inner.init_order.push(component_type_id);
160    }
161
162    pub fn register_with_context<
163        C,
164        T: VeilidComponent + Send + Sync + 'static,
165        F: FnOnce(VeilidComponentRegistry, C) -> T,
166    >(
167        &self,
168        component_constructor: F,
169        context: C,
170    ) {
171        let component = Arc::new(component_constructor(self.clone(), context));
172        let component_type_id = core::any::TypeId::of::<T>();
173
174        // Add to type map and initialization order
175        let mut inner = self.unlocked_inner.inner.lock();
176        assert!(
177            inner
178                .type_map
179                .insert(component_type_id, component)
180                .is_none(),
181            "should not register same component twice"
182        );
183        inner.init_order.push(component_type_id);
184    }
185
186    pub async fn init(&self) -> EyreResult<()> {
187        let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
188            bail!("init should only happen one at a time");
189        };
190        if *_init_guard {
191            bail!("already initialized");
192        }
193
194        VeilidLayerFilter::init_veilid_component_log_facilities(
195            self.log_key(),
196            self.get_init_order()
197                .into_iter()
198                .map(|x| x.log_facilities())
199                .collect(),
200        )?;
201
202        // Event bus starts up early
203        self.unlocked_inner.event_bus.startup()?;
204
205        // Process components in initialization order
206        let init_order = self.get_init_order();
207        let mut initialized = vec![];
208        for component in init_order {
209            if let Err(e) = component.init().await {
210                veilid_log!(self error "Error initializing component '{}': {}", component.name(), e);
211                self.terminate_inner(initialized).await;
212                self.unlocked_inner.event_bus.shutdown().await;
213                return Err(e);
214            }
215            initialized.push(component);
216        }
217
218        *_init_guard = true;
219        Ok(())
220    }
221
222    pub async fn post_init(&self) -> EyreResult<()> {
223        let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
224            bail!("init should only happen one at a time");
225        };
226        if !*_init_guard {
227            bail!("not initialized");
228        }
229
230        let init_order = self.get_init_order();
231        let mut post_initialized = vec![];
232        for component in init_order {
233            if let Err(e) = component.post_init().await {
234                self.pre_terminate_inner(post_initialized).await;
235                return Err(e);
236            }
237            post_initialized.push(component)
238        }
239        Ok(())
240    }
241
242    pub async fn pre_terminate(&self) {
243        let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
244            panic!("terminate should only happen one at a time");
245        };
246        if !*_init_guard {
247            panic!("not initialized");
248        }
249
250        let init_order = self.get_init_order();
251        self.pre_terminate_inner(init_order).await;
252    }
253
254    pub async fn terminate(&self) {
255        let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
256            panic!("terminate should only happen one at a time");
257        };
258        if !*_init_guard {
259            panic!("not initialized");
260        }
261
262        // Terminate components in reverse initialization order
263        let init_order = self.get_init_order();
264        self.terminate_inner(init_order).await;
265
266        // Event bus shuts down last
267        self.unlocked_inner.event_bus.shutdown().await;
268
269        // Remoave all registered component log facilities from VeilidLayerFilter for this log key
270        if let Err(e) = VeilidLayerFilter::terminate_veilid_component_log_facilities(self.log_key())
271        {
272            eprintln!("Error terminating log facilities: {}", e);
273        }
274
275        *_init_guard = false;
276    }
277
278    async fn pre_terminate_inner(
279        &self,
280        pre_initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>,
281    ) {
282        for component in pre_initialized.iter().rev() {
283            component.pre_terminate().await;
284        }
285    }
286    async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
287        for component in initialized.iter().rev() {
288            let refs = Arc::strong_count(component);
289            if refs > 2 {
290                veilid_log!(self warn
291                    "Terminating component '{}' while still referenced ({} extra references)",
292                    component.name(),
293                    refs - 2
294                );
295            }
296            component.terminate().await;
297        }
298    }
299
300    fn get_init_order(&self) -> Vec<Arc<dyn VeilidComponent + Send + Sync>> {
301        let inner = self.unlocked_inner.inner.lock();
302        inner
303            .init_order
304            .iter()
305            .map(|id| inner.type_map.get(id).unwrap_or_log().clone())
306            .collect::<Vec<_>>()
307    }
308
309    //////////////////////////////////////////////////////////////
310
311    pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
312        &self,
313    ) -> Option<VeilidComponentGuard<'a, T>> {
314        let inner = self.unlocked_inner.inner.lock();
315        let component_type_id = core::any::TypeId::of::<T>();
316        let component_dyn = inner.type_map.get(&component_type_id)?.clone();
317        let component = component_dyn
318            .as_any_arc_send_sync()
319            .downcast::<T>()
320            .unwrap_or_log();
321        Some(VeilidComponentGuard {
322            component,
323            _phantom: core::marker::PhantomData {},
324        })
325    }
326}
327
328impl VeilidComponentRegistryAccessor for VeilidComponentRegistry {
329    fn registry(&self) -> VeilidComponentRegistry {
330        self.clone()
331    }
332}
333
334////////////////////////////////////////////////////////////////////
335
/// Implements `VeilidComponentRegistryAccessor` for a type that stores its
/// registry handle in a field named `registry`.
macro_rules! impl_veilid_component_accessors {
    ($struct_name:ty) => {
        impl VeilidComponentRegistryAccessor for $struct_name {
            fn registry(&self) -> VeilidComponentRegistry {
                Clone::clone(&self.registry)
            }
        }
    };
}
345
346pub(crate) use impl_veilid_component_accessors;
347
348/////////////////////////////////////////////////////////////////////
349
/// Implements `VeilidComponent` (and the registry accessors) for a type.
///
/// The type must provide `log_facilities_impl`, `init_async`,
/// `post_init_async`, `pre_terminate_async` and `terminate_async`; each trait
/// method forwards to the matching one. The component name is the type as it
/// was written in the macro invocation.
macro_rules! impl_veilid_component {
    ($component_name:ty) => {
        impl_veilid_component_accessors!($component_name);

        impl VeilidComponent for $component_name {
            fn name(&self) -> &'static str {
                stringify!($component_name)
            }

            fn log_facilities(&self) -> VeilidComponentLogFacilities {
                Self::log_facilities_impl(self)
            }

            // Each lifecycle hook boxes a future that awaits the component's
            // own async method when it is polled.

            fn init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
                Box::pin(async move { self.init_async().await })
            }

            fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
                Box::pin(async move { self.post_init_async().await })
            }

            fn pre_terminate(&self) -> PinBoxFuture<'_, ()> {
                Box::pin(async move { self.pre_terminate_async().await })
            }

            fn terminate(&self) -> PinBoxFuture<'_, ()> {
                Box::pin(async move { self.terminate_async().await })
            }
        }
    };
}
381
382pub(crate) use impl_veilid_component;
383
384/////////////////////////////////////////////////////////////////////
385
// Utility macro for setting up a background TickTask whose routine is a
// synchronous method.
// Should be called during new/construction of a component with background tasks
// and before any post-init 'tick' operations are started.
//
// The routine does not capture the component itself: each run looks the
// component up in the registry by type, then calls the routine method with the
// first closure argument passed through and the other two wrapped in `Timestamp`.
macro_rules! impl_setup_task {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let task_registry = $this.registry();
        $this.$task_name.set_routine(move |stop, last_ts, cur_ts| {
            let task_registry = task_registry.clone();
            Box::pin(async move {
                let component = task_registry.lookup::<$this_type>().unwrap_or_log();
                component.$task_routine(stop, Timestamp::new(last_ts), Timestamp::new(cur_ts))
            })
        });
    }};
}
401
402pub(crate) use impl_setup_task;
403
// Same as `impl_setup_task`, but for a routine method that returns a future:
// the component is looked up in the registry by type on each run and the
// routine's future is awaited inside the task.
macro_rules! impl_setup_task_async {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let task_registry = $this.registry();
        $this.$task_name.set_routine(move |stop, last_ts, cur_ts| {
            let task_registry = task_registry.clone();
            Box::pin(async move {
                let component = task_registry.lookup::<$this_type>().unwrap_or_log();
                component
                    .$task_routine(stop, Timestamp::new(last_ts), Timestamp::new(cur_ts))
                    .await
            })
        });
    }};
}
417
418pub(crate) use impl_setup_task_async;
419
// Variant of `impl_setup_task_async` that does not go through the registry:
// the routine closure owns a clone of `$this` and clones it again for every
// run before awaiting the routine method on that clone.
macro_rules! impl_setup_task_async_clone {
    ($this:expr, $task_name:ident, $task_routine:ident ) => {{
        let owner = $this.clone();
        $this.$task_name.set_routine(move |stop, last_ts, cur_ts| {
            let owner = owner.clone();
            Box::pin(async move {
                owner
                    .$task_routine(stop, Timestamp::new(last_ts), Timestamp::new(cur_ts))
                    .await
            })
        });
    }};
}
432
433pub(crate) use impl_setup_task_async_clone;
434
// Utility macro for setting up an event bus handler that is a synchronous method.
// Should be called after init, during post-init or later.
// Subscription should be unsubscribed before termination.
//
// The handler does not capture the component itself: for each event the
// component is looked up in the registry by type and the handler method is
// called with the event. The macro evaluates to the result of `subscribe`.
macro_rules! impl_subscribe_event_bus {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let handler_registry = $this.registry();
        $this.event_bus().subscribe(move |event| {
            let handler_registry = handler_registry.clone();
            Box::pin(async move {
                let component = handler_registry.lookup::<$this_type>().unwrap_or_log();
                component.$event_handler(event);
            })
        })
    }};
}
450
451pub(crate) use impl_subscribe_event_bus;
452
// Same as `impl_subscribe_event_bus`, but for a handler method that returns a
// future: the component is looked up in the registry by type for each event
// and the handler's future is awaited. Evaluates to the result of `subscribe`.
macro_rules! impl_subscribe_event_bus_async {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let handler_registry = $this.registry();
        $this.event_bus().subscribe(move |event| {
            let handler_registry = handler_registry.clone();
            Box::pin(async move {
                let component = handler_registry.lookup::<$this_type>().unwrap_or_log();
                component.$event_handler(event).await;
            })
        })
    }};
}
465
466pub(crate) use impl_subscribe_event_bus_async;