1use super::*;
2
3impl_veilid_log_facility!("registry");
4
5pub(crate) trait AsAnyArcSendSync {
6 fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync>;
7}
8
9impl<T: Send + Sync + 'static> AsAnyArcSendSync for T {
10 fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync> {
11 self
12 }
13}
14
15pub(crate) trait VeilidComponent:
16 AsAnyArcSendSync + VeilidComponentRegistryAccessor + core::fmt::Debug
17{
18 fn name(&self) -> &'static str;
19 fn log_facilities(&self) -> VeilidComponentLogFacilities;
20 fn init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
21 fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>>;
22 fn pre_terminate(&self) -> PinBoxFuture<'_, ()>;
23 fn terminate(&self) -> PinBoxFuture<'_, ()>;
24}
25
26pub(crate) trait VeilidComponentRegistryAccessor {
27 fn registry(&self) -> VeilidComponentRegistry;
28
29 fn config(&self) -> Arc<VeilidConfig> {
30 self.registry().unlocked_inner.startup_options.config()
31 }
32 fn update_callback(&self) -> UpdateCallback {
33 self.registry()
34 .unlocked_inner
35 .startup_options
36 .update_callback()
37 }
38 fn event_bus(&self) -> EventBus {
39 self.registry().event_bus()
40 }
41 fn log_key(&self) -> VeilidLogKey {
42 self.registry().log_key()
43 }
44}
45
46pub struct VeilidComponentGuard<'a, T: Send + Sync + 'static> {
47 component: Arc<T>,
48 _phantom: core::marker::PhantomData<&'a T>,
49}
50
51impl<T> core::ops::Deref for VeilidComponentGuard<'_, T>
52where
53 T: Send + Sync + 'static,
54{
55 type Target = T;
56
57 fn deref(&self) -> &Self::Target {
58 &self.component
59 }
60}
61
62#[derive(Debug)]
63struct VeilidComponentRegistryInner {
64 type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
65 init_order: Vec<core::any::TypeId>,
66 #[cfg(any(test, feature = "test-util"))]
67 mock: bool,
68}
69
70#[derive(Debug)]
71struct VeilidComponentRegistryUnlockedInner {
72 inner: Mutex<VeilidComponentRegistryInner>,
73 startup_options: VeilidStartupOptions,
74 namespace: &'static str,
75 program_name: &'static str,
76 log_key: &'static str,
77 event_bus: EventBus,
78 init_lock: AsyncMutex<bool>,
79}
80
81#[derive(Clone, Debug)]
82pub(crate) struct VeilidComponentRegistry {
83 unlocked_inner: Arc<VeilidComponentRegistryUnlockedInner>,
84}
85
86impl VeilidComponentRegistry {
87 pub fn new(startup_options: VeilidStartupOptions) -> Self {
88 let namespace = startup_options.config().namespace.to_static_str();
89 let program_name = startup_options.config().program_name.to_static_str();
90
91 let log_key = VeilidLayerFilter::make_veilid_log_key(program_name, namespace);
92
93 Self {
94 unlocked_inner: Arc::new(VeilidComponentRegistryUnlockedInner {
95 inner: Mutex::new(VeilidComponentRegistryInner {
96 type_map: HashMap::new(),
97 init_order: Vec::new(),
98 #[cfg(any(test, feature = "test-util"))]
99 mock: false,
100 }),
101 startup_options,
102 namespace,
103 program_name,
104 log_key,
105 event_bus: EventBus::new(),
106 init_lock: AsyncMutex::new(false),
107 }),
108 }
109 }
110
111 #[cfg(any(test, feature = "test-util"))]
112 pub fn enable_mock(&self) {
113 let mut inner = self.unlocked_inner.inner.lock();
114 inner.mock = true;
115 }
116 #[expect(dead_code)]
123 pub fn namespace(&self) -> &'static str {
124 self.unlocked_inner.namespace
125 }
126
127 #[allow(dead_code)]
128 pub fn program_name(&self) -> &'static str {
129 self.unlocked_inner.program_name
130 }
131
132 pub fn log_key(&self) -> VeilidLogKey {
133 self.unlocked_inner.log_key
134 }
135
136 pub fn event_bus(&self) -> EventBus {
137 self.unlocked_inner.event_bus.clone()
138 }
139
140 pub fn register<
141 T: VeilidComponent + Send + Sync + 'static,
142 F: FnOnce(VeilidComponentRegistry) -> T,
143 >(
144 &self,
145 component_constructor: F,
146 ) {
147 let component = Arc::new(component_constructor(self.clone()));
148 let component_type_id = core::any::TypeId::of::<T>();
149
150 let mut inner = self.unlocked_inner.inner.lock();
152 assert!(
153 inner
154 .type_map
155 .insert(component_type_id, component)
156 .is_none(),
157 "should not register same component twice"
158 );
159 inner.init_order.push(component_type_id);
160 }
161
162 pub fn register_with_context<
163 C,
164 T: VeilidComponent + Send + Sync + 'static,
165 F: FnOnce(VeilidComponentRegistry, C) -> T,
166 >(
167 &self,
168 component_constructor: F,
169 context: C,
170 ) {
171 let component = Arc::new(component_constructor(self.clone(), context));
172 let component_type_id = core::any::TypeId::of::<T>();
173
174 let mut inner = self.unlocked_inner.inner.lock();
176 assert!(
177 inner
178 .type_map
179 .insert(component_type_id, component)
180 .is_none(),
181 "should not register same component twice"
182 );
183 inner.init_order.push(component_type_id);
184 }
185
186 pub async fn init(&self) -> EyreResult<()> {
187 let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
188 bail!("init should only happen one at a time");
189 };
190 if *_init_guard {
191 bail!("already initialized");
192 }
193
194 VeilidLayerFilter::init_veilid_component_log_facilities(
195 self.log_key(),
196 self.get_init_order()
197 .into_iter()
198 .map(|x| x.log_facilities())
199 .collect(),
200 )?;
201
202 self.unlocked_inner.event_bus.startup()?;
204
205 let init_order = self.get_init_order();
207 let mut initialized = vec![];
208 for component in init_order {
209 if let Err(e) = component.init().await {
210 veilid_log!(self error "Error initializing component '{}': {}", component.name(), e);
211 self.terminate_inner(initialized).await;
212 self.unlocked_inner.event_bus.shutdown().await;
213 return Err(e);
214 }
215 initialized.push(component);
216 }
217
218 *_init_guard = true;
219 Ok(())
220 }
221
222 pub async fn post_init(&self) -> EyreResult<()> {
223 let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
224 bail!("init should only happen one at a time");
225 };
226 if !*_init_guard {
227 bail!("not initialized");
228 }
229
230 let init_order = self.get_init_order();
231 let mut post_initialized = vec![];
232 for component in init_order {
233 if let Err(e) = component.post_init().await {
234 self.pre_terminate_inner(post_initialized).await;
235 return Err(e);
236 }
237 post_initialized.push(component)
238 }
239 Ok(())
240 }
241
242 pub async fn pre_terminate(&self) {
243 let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
244 panic!("terminate should only happen one at a time");
245 };
246 if !*_init_guard {
247 panic!("not initialized");
248 }
249
250 let init_order = self.get_init_order();
251 self.pre_terminate_inner(init_order).await;
252 }
253
254 pub async fn terminate(&self) {
255 let Some(mut _init_guard) = self.unlocked_inner.init_lock.try_lock() else {
256 panic!("terminate should only happen one at a time");
257 };
258 if !*_init_guard {
259 panic!("not initialized");
260 }
261
262 let init_order = self.get_init_order();
264 self.terminate_inner(init_order).await;
265
266 self.unlocked_inner.event_bus.shutdown().await;
268
269 if let Err(e) = VeilidLayerFilter::terminate_veilid_component_log_facilities(self.log_key())
271 {
272 eprintln!("Error terminating log facilities: {}", e);
273 }
274
275 *_init_guard = false;
276 }
277
278 async fn pre_terminate_inner(
279 &self,
280 pre_initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>,
281 ) {
282 for component in pre_initialized.iter().rev() {
283 component.pre_terminate().await;
284 }
285 }
286 async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
287 for component in initialized.iter().rev() {
288 let refs = Arc::strong_count(component);
289 if refs > 2 {
290 veilid_log!(self warn
291 "Terminating component '{}' while still referenced ({} extra references)",
292 component.name(),
293 refs - 2
294 );
295 }
296 component.terminate().await;
297 }
298 }
299
300 fn get_init_order(&self) -> Vec<Arc<dyn VeilidComponent + Send + Sync>> {
301 let inner = self.unlocked_inner.inner.lock();
302 inner
303 .init_order
304 .iter()
305 .map(|id| inner.type_map.get(id).unwrap_or_log().clone())
306 .collect::<Vec<_>>()
307 }
308
309 pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
312 &self,
313 ) -> Option<VeilidComponentGuard<'a, T>> {
314 let inner = self.unlocked_inner.inner.lock();
315 let component_type_id = core::any::TypeId::of::<T>();
316 let component_dyn = inner.type_map.get(&component_type_id)?.clone();
317 let component = component_dyn
318 .as_any_arc_send_sync()
319 .downcast::<T>()
320 .unwrap_or_log();
321 Some(VeilidComponentGuard {
322 component,
323 _phantom: core::marker::PhantomData {},
324 })
325 }
326}
327
328impl VeilidComponentRegistryAccessor for VeilidComponentRegistry {
329 fn registry(&self) -> VeilidComponentRegistry {
330 self.clone()
331 }
332}
333
334macro_rules! impl_veilid_component_accessors {
337 ($struct_name:ty) => {
338 impl VeilidComponentRegistryAccessor for $struct_name {
339 fn registry(&self) -> VeilidComponentRegistry {
340 self.registry.clone()
341 }
342 }
343 };
344}
345
346pub(crate) use impl_veilid_component_accessors;
347
348macro_rules! impl_veilid_component {
351 ($component_name:ty) => {
352 impl_veilid_component_accessors!($component_name);
353
354 impl VeilidComponent for $component_name {
355 fn name(&self) -> &'static str {
356 stringify!($component_name)
357 }
358
359 fn log_facilities(&self) -> VeilidComponentLogFacilities {
360 <$component_name>::log_facilities_impl(self)
361 }
362
363 fn init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
364 Box::pin(async { self.init_async().await })
365 }
366
367 fn post_init(&self) -> PinBoxFuture<'_, EyreResult<()>> {
368 Box::pin(async { self.post_init_async().await })
369 }
370
371 fn pre_terminate(&self) -> PinBoxFuture<'_, ()> {
372 Box::pin(async { self.pre_terminate_async().await })
373 }
374
375 fn terminate(&self) -> PinBoxFuture<'_, ()> {
376 Box::pin(async { self.terminate_async().await })
377 }
378 }
379 };
380}
381
382pub(crate) use impl_veilid_component;
383
384macro_rules! impl_setup_task {
390 ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
391 let registry = $this.registry();
392 $this.$task_name.set_routine(move |s, l, t| {
393 let registry = registry.clone();
394 Box::pin(async move {
395 let this = registry.lookup::<$this_type>().unwrap_or_log();
396 this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
397 })
398 });
399 }};
400}
401
402pub(crate) use impl_setup_task;
403
404macro_rules! impl_setup_task_async {
405 ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
406 let registry = $this.registry();
407 $this.$task_name.set_routine(move |s, l, t| {
408 let registry = registry.clone();
409 Box::pin(async move {
410 let this = registry.lookup::<$this_type>().unwrap_or_log();
411 this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
412 .await
413 })
414 });
415 }};
416}
417
418pub(crate) use impl_setup_task_async;
419
420macro_rules! impl_setup_task_async_clone {
421 ($this:expr, $task_name:ident, $task_routine:ident ) => {{
422 let this = $this.clone();
423 $this.$task_name.set_routine(move |s, l, t| {
424 let this = this.clone();
425 Box::pin(async move {
426 this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
427 .await
428 })
429 });
430 }};
431}
432
433pub(crate) use impl_setup_task_async_clone;
434
435macro_rules! impl_subscribe_event_bus {
439 ($this:expr, $this_type:ty, $event_handler:ident ) => {{
440 let registry = $this.registry();
441 $this.event_bus().subscribe(move |evt| {
442 let registry = registry.clone();
443 Box::pin(async move {
444 let this = registry.lookup::<$this_type>().unwrap_or_log();
445 this.$event_handler(evt);
446 })
447 })
448 }};
449}
450
451pub(crate) use impl_subscribe_event_bus;
452
453macro_rules! impl_subscribe_event_bus_async {
454 ($this:expr, $this_type:ty, $event_handler:ident ) => {{
455 let registry = $this.registry();
456 $this.event_bus().subscribe(move |evt| {
457 let registry = registry.clone();
458 Box::pin(async move {
459 let this = registry.lookup::<$this_type>().unwrap_or_log();
460 this.$event_handler(evt).await;
461 })
462 })
463 }};
464}
465
466pub(crate) use impl_subscribe_event_bus_async;