1use super::*;
2
// Invokes the project's log-facility macro with the name "registry".
// NOTE(review): presumably this is the facility that the `veilid_log!` call further down
// in this module is attributed to — confirm against the macro definition.
impl_veilid_log_facility!("registry");
4
/// Type-erasure helper: turns an `Arc` of some concrete type into an
/// `Arc<dyn Any + Send + Sync>` so that it can later be downcast back to a
/// concrete type with `Arc::downcast`.
pub(crate) trait AsAnyArcSendSync {
    /// Consumes the `Arc` and returns it as a type-erased `Arc`.
    fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync>;
}

/// Blanket implementation for every sized `Send + Sync + 'static` type.
impl<T: Send + Sync + 'static> AsAnyArcSendSync for T {
    fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync> {
        // Unsized coercion from `Arc<T>` to the trait-object `Arc`.
        let erased: Arc<dyn core::any::Any + Send + Sync> = self;
        erased
    }
}
14
/// Lifecycle interface implemented by every component stored in a
/// `VeilidComponentRegistry`.
///
/// The registry calls `init` and `post_init` on components in registration order,
/// and `pre_terminate` and `terminate` in the reverse of that order.
pub(crate) trait VeilidComponent:
    AsAnyArcSendSync + VeilidComponentRegistryAccessor + core::fmt::Debug
{
    /// Static name of the component; the registry uses it in its termination warning.
    fn name(&self) -> &'static str;
    /// Log facilities of this component; the registry collects them from every
    /// registered component at the start of its `init`.
    fn log_facilities(&self) -> VeilidComponentLogFacilities;
    /// First startup phase. An error aborts registry initialization.
    fn init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
    /// Second startup phase, run by the registry's `post_init`.
    fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
    /// First shutdown phase, run by the registry's `pre_terminate`.
    fn pre_terminate(&self) -> PinBoxFuture<'_, ()>;
    /// Final shutdown phase, run by the registry's `terminate`.
    fn terminate(&self) -> PinBoxFuture<'_, ()>;
}
25
26pub(crate) trait VeilidComponentRegistryAccessor {
27 fn registry(&self) -> VeilidComponentRegistry;
28
29 fn config(&self) -> Arc<VeilidConfig> {
30 self.registry().startup_options.config()
31 }
32 fn update_callback(&self) -> UpdateCallback {
33 self.registry().startup_options.update_callback()
34 }
35 fn event_bus(&self) -> EventBus {
36 self.registry().event_bus.clone()
37 }
38 fn log_key(&self) -> VeilidLogKey {
39 self.registry().log_key()
40 }
41}
42
/// Handle to a registered component, as returned by `VeilidComponentRegistry::lookup`.
///
/// The guard owns an `Arc` to the component, so the component stays alive for as
/// long as the guard does. The `'a` lifetime is carried only through `PhantomData`;
/// no reference with that lifetime is actually stored.
pub struct VeilidComponentGuard<'a, T: Send + Sync + 'static> {
    /// Shared pointer to the concrete component.
    component: Arc<T>,
    /// Carries the `'a` lifetime parameter without storing a borrow.
    _phantom: core::marker::PhantomData<&'a T>,
}

/// Lets a guard be used wherever a `&T` is expected.
impl<T> core::ops::Deref for VeilidComponentGuard<'_, T>
where
    T: Send + Sync + 'static,
{
    type Target = T;

    fn deref(&self) -> &Self::Target {
        let component: &T = &self.component;
        component
    }
}
58
/// Mutable registry state, kept behind the `Mutex` in `VeilidComponentRegistry`.
#[derive(Debug)]
struct VeilidComponentRegistryInner {
    /// Registered components, keyed by the `TypeId` of their concrete type.
    type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
    /// Component type ids in registration order; this is the order used for
    /// initialization (and, reversed, for termination).
    init_order: Vec<core::any::TypeId>,
    /// Set to `true` by `enable_mock`; only present in test / `test-util` builds.
    /// NOTE(review): no reader of this flag is visible in this part of the file —
    /// confirm where it is consumed.
    #[cfg(any(test, feature = "test-util"))]
    mock: bool,
}
66
/// Registry of components, looked up by concrete type, that also drives their
/// startup and shutdown lifecycle.
///
/// `inner` and `init_lock` are `Arc`s, so clones of a registry share the same
/// component map and the same lifecycle lock.
#[derive(Clone, Debug)]
pub(crate) struct VeilidComponentRegistry {
    /// Component map and registration order.
    inner: Arc<Mutex<VeilidComponentRegistryInner>>,
    /// Startup options the registry was created with (source of config and update callback).
    startup_options: VeilidStartupOptions,
    /// Namespace taken from the configuration at construction time.
    namespace: &'static str,
    /// Program name taken from the configuration at construction time.
    program_name: &'static str,
    /// Log key built from `program_name` and `namespace` at construction time.
    log_key: &'static str,
    /// Event bus started during `init` and shut down during `terminate`.
    event_bus: EventBus,
    /// Lifecycle lock. The `bool` is `true` between a successful `init` and the end
    /// of `terminate`; lifecycle methods `try_lock` it so they never overlap.
    init_lock: Arc<AsyncMutex<bool>>,
}
77
78impl VeilidComponentRegistry {
79 pub fn new(startup_options: VeilidStartupOptions) -> Self {
80 let namespace = startup_options.config().namespace.to_static_str();
81 let program_name = startup_options.config().program_name.to_static_str();
82
83 let log_key = VeilidLayerFilter::make_veilid_log_key(program_name, namespace);
84
85 Self {
86 inner: Arc::new(Mutex::new(VeilidComponentRegistryInner {
87 type_map: HashMap::new(),
88 init_order: Vec::new(),
89 #[cfg(any(test, feature = "test-util"))]
90 mock: false,
91 })),
92 startup_options,
93 namespace,
94 program_name,
95 log_key,
96 event_bus: EventBus::new(),
97 init_lock: Arc::new(AsyncMutex::new(false)),
98 }
99 }
100
101 #[cfg(any(test, feature = "test-util"))]
102 pub fn enable_mock(&self) {
103 let mut inner = self.inner.lock();
104 inner.mock = true;
105 }
106 #[expect(dead_code)]
113 pub fn namespace(&self) -> &'static str {
114 self.namespace
115 }
116
    /// Returns the program name captured from the configuration when the registry
    /// was created.
    #[allow(dead_code)]
    pub fn program_name(&self) -> &'static str {
        self.program_name
    }
121
    /// Returns the log key that was built from the program name and namespace in `new`.
    pub fn log_key(&self) -> VeilidLogKey {
        self.log_key
    }
125
126 pub fn register<
127 T: VeilidComponent + Send + Sync + 'static,
128 F: FnOnce(VeilidComponentRegistry) -> T,
129 >(
130 &self,
131 component_constructor: F,
132 ) {
133 let component = Arc::new(component_constructor(self.clone()));
134 let component_type_id = core::any::TypeId::of::<T>();
135
136 let mut inner = self.inner.lock();
138 assert!(
139 inner
140 .type_map
141 .insert(component_type_id, component)
142 .is_none(),
143 "should not register same component twice"
144 );
145 inner.init_order.push(component_type_id);
146 }
147
148 pub fn register_with_context<
149 C,
150 T: VeilidComponent + Send + Sync + 'static,
151 F: FnOnce(VeilidComponentRegistry, C) -> T,
152 >(
153 &self,
154 component_constructor: F,
155 context: C,
156 ) {
157 let component = Arc::new(component_constructor(self.clone(), context));
158 let component_type_id = core::any::TypeId::of::<T>();
159
160 let mut inner = self.inner.lock();
162 assert!(
163 inner
164 .type_map
165 .insert(component_type_id, component)
166 .is_none(),
167 "should not register same component twice"
168 );
169 inner.init_order.push(component_type_id);
170 }
171
172 pub async fn init(&self) -> EyreResult<()> {
173 let Some(mut _init_guard) = self.init_lock.try_lock() else {
174 bail!("init should only happen one at a time");
175 };
176 if *_init_guard {
177 bail!("already initialized");
178 }
179
180 VeilidLayerFilter::init_veilid_component_log_facilities(
181 self.log_key(),
182 self.get_init_order()
183 .into_iter()
184 .map(|x| x.log_facilities())
185 .collect(),
186 )?;
187
188 self.event_bus.startup()?;
190
191 let init_order = self.get_init_order();
193 let mut initialized = vec![];
194 for component in init_order {
195 if let Err(e) = component.init().await {
196 self.terminate_inner(initialized).await;
197 self.event_bus.shutdown().await;
198 return Err(e);
199 }
200 initialized.push(component);
201 }
202
203 *_init_guard = true;
204 Ok(())
205 }
206
207 pub async fn post_init(&self) -> EyreResult<()> {
208 let Some(mut _init_guard) = self.init_lock.try_lock() else {
209 bail!("init should only happen one at a time");
210 };
211 if !*_init_guard {
212 bail!("not initialized");
213 }
214
215 let init_order = self.get_init_order();
216 let mut post_initialized = vec![];
217 for component in init_order {
218 if let Err(e) = component.post_init().await {
219 self.pre_terminate_inner(post_initialized).await;
220 return Err(e);
221 }
222 post_initialized.push(component)
223 }
224 Ok(())
225 }
226
227 pub async fn pre_terminate(&self) {
228 let Some(mut _init_guard) = self.init_lock.try_lock() else {
229 panic!("terminate should only happen one at a time");
230 };
231 if !*_init_guard {
232 panic!("not initialized");
233 }
234
235 let init_order = self.get_init_order();
236 self.pre_terminate_inner(init_order).await;
237 }
238
239 pub async fn terminate(&self) {
240 let Some(mut _init_guard) = self.init_lock.try_lock() else {
241 panic!("terminate should only happen one at a time");
242 };
243 if !*_init_guard {
244 panic!("not initialized");
245 }
246
247 let init_order = self.get_init_order();
249 self.terminate_inner(init_order).await;
250
251 self.event_bus.shutdown().await;
253
254 if let Err(e) = VeilidLayerFilter::terminate_veilid_component_log_facilities(self.log_key())
256 {
257 eprintln!("Error terminating log facilities: {}", e);
258 }
259
260 *_init_guard = false;
261 }
262
263 async fn pre_terminate_inner(
264 &self,
265 pre_initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>,
266 ) {
267 for component in pre_initialized.iter().rev() {
268 component.pre_terminate().await;
269 }
270 }
271 async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
272 for component in initialized.iter().rev() {
273 let refs = Arc::strong_count(component);
274 if refs > 2 {
275 veilid_log!(self warn
276 "Terminating component '{}' while still referenced ({} extra references)",
277 component.name(),
278 refs - 2
279 );
280 }
281 component.terminate().await;
282 }
283 }
284
285 fn get_init_order(&self) -> Vec<Arc<dyn VeilidComponent + Send + Sync>> {
286 let inner = self.inner.lock();
287 inner
288 .init_order
289 .iter()
290 .map(|id| inner.type_map.get(id).unwrap_or_log().clone())
291 .collect::<Vec<_>>()
292 }
293
294 pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
297 &self,
298 ) -> Option<VeilidComponentGuard<'a, T>> {
299 let inner = self.inner.lock();
300 let component_type_id = core::any::TypeId::of::<T>();
301 let component_dyn = inner.type_map.get(&component_type_id)?.clone();
302 let component = component_dyn
303 .as_any_arc_send_sync()
304 .downcast::<T>()
305 .unwrap_or_log();
306 Some(VeilidComponentGuard {
307 component,
308 _phantom: core::marker::PhantomData {},
309 })
310 }
311}
312
/// The registry is its own accessor: `registry()` returns a clone of `self`, which
/// makes the accessor trait's default methods available on the registry too.
impl VeilidComponentRegistryAccessor for VeilidComponentRegistry {
    fn registry(&self) -> VeilidComponentRegistry {
        self.clone()
    }
}
318
/// Implements `VeilidComponentRegistryAccessor` for `$struct_name`.
///
/// The generated `registry()` clones `self.registry`, so the type must have a
/// field named `registry` holding a `VeilidComponentRegistry`.
macro_rules! impl_veilid_component_accessors {
    ($struct_name:ty) => {
        impl VeilidComponentRegistryAccessor for $struct_name {
            fn registry(&self) -> VeilidComponentRegistry {
                self.registry.clone()
            }
        }
    };
}
330
331pub(crate) use impl_veilid_component_accessors;
332
/// Implements `VeilidComponent` (and, via `impl_veilid_component_accessors!`,
/// `VeilidComponentRegistryAccessor`) for `$component_name`.
///
/// The type is expected to provide `log_facilities_impl`, `init_async`,
/// `post_init_async`, `pre_terminate_async` and `terminate_async`; each trait
/// method simply forwards to the matching one, boxing the async ones into a
/// pinned future.
macro_rules! impl_veilid_component {
    ($component_name:ty) => {
        impl_veilid_component_accessors!($component_name);

        impl VeilidComponent for $component_name {
            // The component name is the type as written at the macro call site.
            fn name(&self) -> &'static str {
                stringify!($component_name)
            }

            fn log_facilities(&self) -> VeilidComponentLogFacilities {
                <$component_name>::log_facilities_impl(self)
            }

            fn init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
                Box::pin(async { self.init_async().await })
            }

            fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
                Box::pin(async { self.post_init_async().await })
            }

            fn pre_terminate(&self) -> PinBoxFuture<'_, ()> {
                Box::pin(async { self.pre_terminate_async().await })
            }

            fn terminate(&self) -> PinBoxFuture<'_, ()> {
                Box::pin(async { self.terminate_async().await })
            }
        }
    };
}
366
367pub(crate) use impl_veilid_component;
368
/// Installs a routine on the task field `$this.$task_name` that calls the
/// synchronous method `$task_routine` on the component of type `$this_type`.
///
/// Arguments: `$this` is the component expression, `$this_type` its concrete type,
/// `$task_name` the task field, and `$task_routine` the method to call.
///
/// The closure captures only the registry, not `$this`; each time it runs it looks
/// the component up again by type. The second and third closure arguments are
/// wrapped in `Timestamp::new` before being passed on. The method's return value is
/// returned from the boxed future as-is (it is not awaited).
// NOTE(review): `unwrap_or_log` presumably logs and panics if the component is no
// longer registered — confirm against its definition.
macro_rules! impl_setup_task {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let registry = $this.registry();
        $this.$task_name.set_routine(move |s, l, t| {
            // Each invocation gets its own registry handle to move into the future.
            let registry = registry.clone();
            Box::pin(async move {
                let this = registry.lookup::<$this_type>().unwrap_or_log();
                this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
            })
        });
    }};
}
386
387pub(crate) use impl_setup_task;
388
/// Installs a routine on the task field `$this.$task_name` that calls the async
/// method `$task_routine` on the component of type `$this_type` and awaits it.
///
/// Arguments: `$this` is the component expression, `$this_type` its concrete type,
/// `$task_name` the task field, and `$task_routine` the async method to call.
///
/// The closure captures only the registry, not `$this`; each time it runs it looks
/// the component up again by type. The second and third closure arguments are
/// wrapped in `Timestamp::new` before being passed on.
// NOTE(review): `unwrap_or_log` presumably logs and panics if the component is no
// longer registered — confirm against its definition.
macro_rules! impl_setup_task_async {
    ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
        let registry = $this.registry();
        $this.$task_name.set_routine(move |s, l, t| {
            // Each invocation gets its own registry handle to move into the future.
            let registry = registry.clone();
            Box::pin(async move {
                let this = registry.lookup::<$this_type>().unwrap_or_log();
                this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
                    .await
            })
        });
    }};
}
402
403pub(crate) use impl_setup_task_async;
404
/// Subscribes the synchronous method `$event_handler` of the component of type
/// `$this_type` to the event bus obtained from `$this.event_bus()`.
///
/// The macro evaluates to whatever `subscribe` returns. The subscribed closure
/// captures only the registry, not `$this`; for each event it looks the component
/// up again by type and calls the handler with the event.
// NOTE(review): `unwrap_or_log` presumably logs and panics if the component is no
// longer registered — confirm against its definition.
macro_rules! impl_subscribe_event_bus {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let registry = $this.registry();
        $this.event_bus().subscribe(move |evt| {
            // Each event gets its own registry handle to move into the future.
            let registry = registry.clone();
            Box::pin(async move {
                let this = registry.lookup::<$this_type>().unwrap_or_log();
                this.$event_handler(evt);
            })
        })
    }};
}
420
421pub(crate) use impl_subscribe_event_bus;
422
/// Subscribes the async method `$event_handler` of the component of type
/// `$this_type` to the event bus obtained from `$this.event_bus()`.
///
/// The macro evaluates to whatever `subscribe` returns. The subscribed closure
/// captures only the registry, not `$this`; for each event it looks the component
/// up again by type, calls the handler with the event and awaits it.
// NOTE(review): `unwrap_or_log` presumably logs and panics if the component is no
// longer registered — confirm against its definition.
macro_rules! impl_subscribe_event_bus_async {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let registry = $this.registry();
        $this.event_bus().subscribe(move |evt| {
            // Each event gets its own registry handle to move into the future.
            let registry = registry.clone();
            Box::pin(async move {
                let this = registry.lookup::<$this_type>().unwrap_or_log();
                this.$event_handler(evt).await;
            })
        })
    }};
}
435
436pub(crate) use impl_subscribe_event_bus_async;