1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::{Arc, RwLock};
4use crate::events::PublishFn;
5use anyhow::Result;
6
7use crate::hooks::{collect_method_hooks, HookFut};
8use crate::{
9 DogConfig, DogService, DogServiceRegistry, HookContext, HookResult, Next, ServiceHooks,
10 ServiceMethodKind, TenantContext,
11};
12
13use crate::events::{method_to_standard_event, DogEventHub, ServiceEventData, ServiceEventKind};
14
15struct DogAppInner<R, P>
16where
17 R: Send + 'static,
18 P: Send + Clone + 'static,
19{
20 registry: RwLock<DogServiceRegistry<R, P>>,
21 global_hooks: RwLock<ServiceHooks<R, P>>,
22 service_hooks: RwLock<HashMap<String, ServiceHooks<R, P>>>,
23 config: RwLock<DogConfig>,
24 any_services: RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>,
26 any_state: RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>,
28 events: RwLock<DogEventHub<R, P>>,
29}
30
31pub enum AppValue {
33 Str(String),
34 Any(Box<dyn Any + Send + Sync>),
35}
36
37pub trait IntoAppValue {
38 fn into_value(self) -> AppValue;
39}
40
41impl IntoAppValue for String {
42 fn into_value(self) -> AppValue {
43 AppValue::Str(self)
44 }
45}
46
47impl IntoAppValue for &str {
48 fn into_value(self) -> AppValue {
49 AppValue::Str(self.to_string())
50 }
51}
52
53impl<T> IntoAppValue for Arc<T>
54where
55 T: Any + Send + Sync + 'static,
56{
57 fn into_value(self) -> AppValue {
58 AppValue::Any(Box::new(self))
59 }
60}
61
62pub trait FromAppValue: Sized {
63 fn from_config(_s: &str) -> Option<Self> { None }
64 fn from_any(_b: &Box<dyn Any + Send + Sync>) -> Option<Self> { None }
65}
66
67impl FromAppValue for String {
68 fn from_config(s: &str) -> Option<Self> {
69 Some(s.to_string())
70 }
71}
72
73impl<T> FromAppValue for Arc<T>
74where
75 T: Any + Send + Sync + 'static,
76{
77 fn from_any(b: &Box<dyn Any + Send + Sync>) -> Option<Self> {
78 b.downcast_ref::<Arc<T>>().cloned()
79 }
80}
81
82pub struct DogApp<R, P = ()>
90where
91 R: Send + 'static,
92 P: Send + Clone + 'static,
93{
94 inner: Arc<DogAppInner<R, P>>,
95}
96
97type HooksForMethod<R, P> = (
98 Vec<Arc<dyn crate::DogAroundHook<R, P>>>,
99 Vec<Arc<dyn crate::DogBeforeHook<R, P>>>,
100 Vec<Arc<dyn crate::DogAfterHook<R, P>>>,
101 Vec<Arc<dyn crate::DogErrorHook<R, P>>>,
102);
103
104type ServiceCall<R, P> = Arc<
105 dyn for<'a> Fn(Arc<dyn DogService<R, P>>, &'a mut HookContext<R, P>) -> HookFut<'a>
106 + Send
107 + Sync,
108>;
109
110impl<R, P> Default for DogApp<R, P>
111where
112 R: Send + 'static,
113 P: Send + Clone + 'static,
114{
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl<R, P> Clone for DogApp<R, P>
121where
122 R: Send + 'static,
123 P: Send + Clone + 'static,
124{
125 fn clone(&self) -> Self {
126 Self {
127 inner: Arc::clone(&self.inner),
128 }
129 }
130}
131
132impl<R, P> DogApp<R, P>
133where
134 R: Send + 'static,
135 P: Send + Clone + 'static,
136{
137 pub fn new() -> Self {
138 Self {
139 inner: Arc::new(DogAppInner {
140 registry: RwLock::new(DogServiceRegistry::new()),
141 global_hooks: RwLock::new(ServiceHooks::new()),
142 service_hooks: RwLock::new(HashMap::new()),
143 config: RwLock::new(DogConfig::new()),
144 any_services: RwLock::new(HashMap::new()),
145 any_state: RwLock::new(HashMap::new()),
146 events: RwLock::new(DogEventHub::new()),
147 }),
148 }
149 }
150
151 pub fn register_service<S>(&self, name: S, service: Arc<dyn DogService<R, P>>)
152 where
153 S: Into<String>,
154 {
155 let name = name.into();
156
157 self.inner
159 .registry
160 .write()
161 .unwrap()
162 .register(name.clone(), service.clone());
163
164 self.inner
166 .any_services
167 .write()
168 .unwrap()
169 .insert(name, Box::new(service));
170 }
171
172 pub fn hooks<F>(&self, f: F)
174 where
175 F: FnOnce(&mut ServiceHooks<R, P>),
176 {
177 let mut g = self.inner.global_hooks.write().unwrap();
178 f(&mut g);
179 }
180
181 pub(crate) fn configure_service_hooks<F>(&self, service_name: &str, f: F)
183 where
184 F: FnOnce(&mut ServiceHooks<R, P>),
185 {
186 let mut map = self.inner.service_hooks.write().unwrap();
187 let hooks = map
188 .entry(service_name.to_string())
189 .or_default();
190 f(hooks);
191 }
192
193 pub fn service(&self, name: &str) -> Result<ServiceHandle<R, P>> {
195 let svc = self
196 .inner
197 .registry
198 .read()
199 .unwrap()
200 .get(name)
201 .ok_or_else(|| anyhow::anyhow!("DogService not found: {name}"))?
202 .clone();
203
204 Ok(ServiceHandle {
205 app: self.clone(),
206 name: name.to_string(),
207 service: svc,
208 })
209 }
210
211 pub fn set<K, V>(&self, key: K, value: V)
215 where
216 K: Into<String>,
217 V: IntoAppValue,
218 {
219 match value.into_value() {
220 AppValue::Str(s) => {
221 self.inner.config.write().unwrap().set(key, s);
222 }
223 AppValue::Any(b) => {
224 self.inner
225 .any_state
226 .write()
227 .unwrap()
228 .insert(key.into(), b);
229 }
230 }
231 }
232
233 pub fn get<T>(&self, key: &str) -> Option<T>
237 where
238 T: FromAppValue,
239 {
240 if let Some(s) = self.inner.config.read().unwrap().get(key) {
242 if let Some(v) = T::from_config(s) {
243 return Some(v);
244 }
245 }
246 self.inner
248 .any_state
249 .read()
250 .unwrap()
251 .get(key)
252 .and_then(|b| T::from_any(b))
253 }
254
255 pub fn config_snapshot(&self) -> crate::DogConfigSnapshot {
258 let cfg = self.inner.config.read().unwrap();
259 cfg.snapshot()
260 }
261
262}
263
264impl<R, P> DogApp<R, P>
265where
266 R: Send + 'static,
267 P: Send + Clone + 'static,
268{
269 pub fn on(
271 &self,
272 path: impl Into<String>,
273 event: ServiceEventKind,
274 listener: crate::events::EventListener<R, P>,
275 ) {
276 self.inner.events.write().unwrap().on_exact(path, event, listener);
277 }
278
279 pub fn on_str(
280 &self,
281 pattern: &str,
282 listener: crate::events::EventListener<R, P>,
283 ) -> anyhow::Result<()> {
284 let pat = crate::events::parse_event_pattern(pattern)?;
285 self.inner.events.write().unwrap().on_pattern(pat, listener);
286 Ok(())
287 }
288
289 pub async fn emit_custom(
290 &self,
291 path: &str,
292 event_name: impl Into<String>,
293 payload: Arc<dyn Any + Send + Sync>,
294 ctx: &HookContext<R, P>,
295 ) {
296 let event = ServiceEventKind::Custom(event_name.into());
297 let data = ServiceEventData::Custom(&payload);
298
299 let (listeners, once_ids) = {
300 let hub = self.inner.events.read().unwrap();
301 hub.snapshot_emit(path, &event, &data, ctx)
302 };
303
304 for f in &listeners {
305 let _ = f(&data, ctx).await;
306 }
307
308 {
309 let mut hub = self.inner.events.write().unwrap();
310 hub.finalize_once_removals(&once_ids);
311 }
312 }
313
314 pub fn publish(&self, f: PublishFn<R, P>) {
315 self.inner.events.write().unwrap().set_publish(f);
316 }
317
318 pub fn clear_publish(&self) {
319 self.inner.events.write().unwrap().clear_publish();
320 }
321}
322
323pub struct ServiceHandle<R, P>
324where
325 R: Send + 'static,
326 P: Send + Clone + 'static,
327{
328 app: DogApp<R, P>,
329 name: String,
330 service: Arc<dyn DogService<R, P>>,
331}
332
333impl<R, P> ServiceHandle<R, P>
334where
335 R: Send + 'static,
336 P: Send + Clone + 'static,
337{
338 pub fn hooks<F>(self, f: F) -> Self
339 where
340 F: FnOnce(&mut ServiceHooks<R, P>),
341 {
342 self.app.configure_service_hooks(&self.name, f);
343 self
344 }
345
346 pub fn inner(&self) -> &Arc<dyn DogService<R, P>> {
347 &self.service
348 }
349
350}
351
352impl<R, P> ServiceHandle<R, P>
353where
354 R: Send + 'static,
355 P: Send + Clone + 'static,
356{
357
358 pub fn on(
360 &self,
361 event: ServiceEventKind,
362 listener: crate::events::EventListener<R, P>,
363 ) {
364 self.app.on(self.name.clone(), event, listener);
365 }
366 pub fn on_str(
367 &self,
368 event: &str,
369 listener: crate::events::EventListener<R, P>,
370 ) -> anyhow::Result<()> {
371 let ev = if event.trim() == "*" {
373 crate::events::EventPat::Any
374 } else {
375 crate::events::EventPat::Exact(crate::events::parse_event_kind(event)?)
376 };
377
378 let pat = crate::events::ServiceEventPattern {
379 service: crate::events::ServiceNamePat::Exact(self.name.clone()),
380 event: ev,
381 };
382
383 self.app.inner.events.write().unwrap().on_pattern(pat, listener);
384 Ok(())
385 }
386}
387
388
389impl<R, P> ServiceHandle<R, P>
394where
395 R: Send + 'static,
396 P: Send + Clone + 'static,
397{
398 fn collect_hooks_for_method(
401 &self,
402 method: &ServiceMethodKind,
403 ) -> HooksForMethod<R, P> {
404 let g = self.app.inner.global_hooks.read().unwrap();
405 let map = self.app.inner.service_hooks.read().unwrap();
406 let s = map.get(&self.name);
407
408 let mut around = collect_method_hooks(&g.around_all, &g.around_by_method, method);
410 let mut before = collect_method_hooks(&g.before_all, &g.before_by_method, method);
411 let mut after = collect_method_hooks(&g.after_all, &g.after_by_method, method);
412 let mut error = collect_method_hooks(&g.error_all, &g.error_by_method, method);
413
414 if let Some(h) = s {
416 around.extend(collect_method_hooks(
417 &h.around_all,
418 &h.around_by_method,
419 method,
420 ));
421 before.extend(collect_method_hooks(
422 &h.before_all,
423 &h.before_by_method,
424 method,
425 ));
426 after.extend(collect_method_hooks(
427 &h.after_all,
428 &h.after_by_method,
429 method,
430 ));
431 error.extend(collect_method_hooks(
432 &h.error_all,
433 &h.error_by_method,
434 method,
435 ));
436 }
437
438 (around, before, after, error)
439 }
440
441 async fn run_pipeline(
444 &self,
445 method: ServiceMethodKind,
446 mut ctx: HookContext<R, P>,
447 service_call: ServiceCall<R, P>,
448 ) -> Result<HookContext<R, P>> {
449 let (around, before, after, error) = self.collect_hooks_for_method(&method);
450
451 let svc = self.service.clone();
452 let service_call_inner = service_call.clone();
453
454 let mut next: Next<R, P> = Next {
456 call: Box::new(move |ctx: &mut HookContext<R, P>| -> HookFut<'_> {
457 let before = before.clone();
458 let after = after.clone();
459 let svc = svc.clone();
460 let service_call = service_call_inner.clone();
461
462 Box::pin(async move {
463 for h in &before {
464 h.run(ctx).await?;
465 }
466
467 (service_call)(svc, ctx).await?;
469
470 for h in after.iter().rev() {
471 h.run(ctx).await?;
472 }
473
474 Ok(())
475 })
476 }),
477 };
478
479 for h in around.iter().rev() {
481 let hook = h.clone();
482 let prev = next;
483 next = Next {
484 call: Box::new(move |ctx: &mut HookContext<R, P>| -> HookFut<'_> {
485 let hook = hook.clone();
486 Box::pin(async move { hook.run(ctx, prev).await })
487 }),
488 };
489 }
490
491 let res = next.run(&mut ctx).await;
493
494 if let Err(e) = res {
496 ctx.error = Some(e);
497
498 for h in &error {
499 let _ = h.run(&mut ctx).await;
500 }
501
502 if let Some(err) = ctx.error.take() {
504 return Err(err);
505 }
506 }
507
508 if ctx.error.is_none() {
511 if let Some(event) = method_to_standard_event(&method) {
512 if let Some(result) = ctx.result.as_ref() {
513 let data = ServiceEventData::Standard(result);
514
515 let (listeners, once_ids) = {
516 let hub = self.app.inner.events.read().unwrap();
517 hub.snapshot_emit(&self.name, &event, &data, &ctx)
518 };
519
520 for f in &listeners {
521 let _ = f(&data, &ctx).await;
522 }
523
524 {
525 let mut hub = self.app.inner.events.write().unwrap();
526 hub.finalize_once_removals(&once_ids);
527 }
528 }
529 }
530 }
531
532
533 Ok(ctx)
534
535 }
536
537 pub async fn find(&self, tenant: TenantContext, params: P) -> Result<Vec<R>> {
542 let method = ServiceMethodKind::Find;
543
544 let services = ServiceCaller::new(self.app.clone());
545 let config = self.app.config_snapshot();
546 let ctx = HookContext::new(tenant, method.clone(), params, services, config);
547
548 let ctx = self
549 .run_pipeline(
550 method,
551 ctx,
552 Arc::new(|svc, ctx| {
553 Box::pin(async move {
554 let records = svc.find(&ctx.tenant, ctx.params.clone()).await?;
555 ctx.result = Some(HookResult::Many(records));
556 Ok(())
557 })
558 }),
559 )
560 .await?;
561
562 match ctx.result {
563 Some(HookResult::Many(v)) => Ok(v),
564 Some(HookResult::One(_)) => Err(anyhow::anyhow!(
565 "find() produced HookResult::One unexpectedly"
566 )),
567 None => Ok(vec![]),
568 }
569 }
570
571 pub async fn get(&self, tenant: TenantContext, id: &str, params: P) -> Result<R> {
572 let method = ServiceMethodKind::Get;
573
574 let services = ServiceCaller::new(self.app.clone());
575 let config = self.app.config_snapshot();
576 let ctx = HookContext::new(tenant, method.clone(), params, services, config);
577
578 let id = id.to_string();
579
580 let ctx = self
581 .run_pipeline(
582 method,
583 ctx,
584 Arc::new(move |svc, ctx| {
585 let id = id.clone();
586 Box::pin(async move {
587 let record = svc.get(&ctx.tenant, &id, ctx.params.clone()).await?;
588 ctx.result = Some(HookResult::One(record));
589 Ok(())
590 })
591 }),
592 )
593 .await?;
594
595 match ctx.result {
596 Some(HookResult::One(v)) => Ok(v),
597 Some(HookResult::Many(_)) => Err(anyhow::anyhow!(
598 "get() produced HookResult::Many unexpectedly"
599 )),
600 None => Err(anyhow::anyhow!("get() produced no result")),
601 }
602 }
603
604 pub async fn create(&self, tenant: TenantContext, data: R, params: P) -> Result<R> {
605 let method = ServiceMethodKind::Create;
606
607 let services = ServiceCaller::new(self.app.clone());
608 let config = self.app.config_snapshot();
609 let mut ctx = HookContext::new(tenant, method.clone(), params, services, config);
610 ctx.data = Some(data);
611
612 let ctx = self
613 .run_pipeline(
614 method,
615 ctx,
616 Arc::new(|svc, ctx| {
617 Box::pin(async move {
618 let data = ctx
619 .data
620 .take()
621 .ok_or_else(|| anyhow::anyhow!("create() requires ctx.data"))?;
622
623 let created = svc.create(&ctx.tenant, data, ctx.params.clone()).await?;
624 ctx.result = Some(HookResult::One(created));
625 Ok(())
626 })
627 }),
628 )
629 .await?;
630
631 match ctx.result {
632 Some(HookResult::One(v)) => Ok(v),
633 Some(HookResult::Many(_)) => Err(anyhow::anyhow!(
634 "create() produced HookResult::Many unexpectedly"
635 )),
636 None => Err(anyhow::anyhow!("create() produced no result")),
637 }
638 }
639
640 pub async fn patch(
641 &self,
642 tenant: TenantContext,
643 id: Option<&str>,
644 data: R,
645 params: P,
646 ) -> Result<R> {
647 let method = ServiceMethodKind::Patch;
648
649 let services = ServiceCaller::new(self.app.clone());
650 let config = self.app.config_snapshot();
651 let mut ctx = HookContext::new(tenant, method.clone(), params, services, config);
652 ctx.data = Some(data);
653
654 let id: Option<String> = id.map(|s| s.to_string());
655
656 let ctx = self
657 .run_pipeline(
658 method,
659 ctx,
660 Arc::new(move |svc, ctx| {
661 let id = id.clone();
662 Box::pin(async move {
663 let data = ctx
664 .data
665 .take()
666 .ok_or_else(|| anyhow::anyhow!("patch() requires ctx.data"))?;
667
668 let patched = svc
669 .patch(&ctx.tenant, id.as_deref(), data, ctx.params.clone())
670 .await?;
671
672 ctx.result = Some(HookResult::One(patched));
673 Ok(())
674 })
675 }),
676 )
677 .await?;
678
679 match ctx.result {
680 Some(HookResult::One(v)) => Ok(v),
681 Some(HookResult::Many(_)) => Err(anyhow::anyhow!(
682 "patch() produced HookResult::Many unexpectedly"
683 )),
684 None => Err(anyhow::anyhow!("patch() produced no result")),
685 }
686 }
687
688 pub async fn update(&self, tenant: TenantContext, id: &str, data: R, params: P) -> Result<R> {
689 let method = ServiceMethodKind::Update;
690
691 let services = ServiceCaller::new(self.app.clone());
692 let config = self.app.config_snapshot();
693 let mut ctx = HookContext::new(tenant, method.clone(), params, services, config);
694 ctx.data = Some(data);
695
696 let id = id.to_string();
697
698 let ctx = self
699 .run_pipeline(
700 method,
701 ctx,
702 Arc::new(move |svc, ctx| {
703 let id = id.clone();
704 Box::pin(async move {
705 let data = ctx
706 .data
707 .take()
708 .ok_or_else(|| anyhow::anyhow!("update() requires ctx.data"))?;
709
710 let updated = svc
711 .update(&ctx.tenant, &id, data, ctx.params.clone())
712 .await?;
713
714 ctx.result = Some(HookResult::One(updated));
715 Ok(())
716 })
717 }),
718 )
719 .await?;
720
721 match ctx.result {
722 Some(HookResult::One(v)) => Ok(v),
723 Some(HookResult::Many(_)) => Err(anyhow::anyhow!(
724 "update() produced HookResult::Many unexpectedly"
725 )),
726 None => Err(anyhow::anyhow!("update() produced no result")),
727 }
728 }
729
730 pub async fn remove(&self, tenant: TenantContext, id: Option<&str>, params: P) -> Result<R> {
731 let method = ServiceMethodKind::Remove;
732
733 let services = ServiceCaller::new(self.app.clone());
734 let config = self.app.config_snapshot();
735 let ctx = HookContext::new(tenant, method.clone(), params, services, config);
736
737 let id: Option<String> = id.map(|s| s.to_string());
738
739 let ctx = self
740 .run_pipeline(
741 method,
742 ctx,
743 Arc::new(move |svc, ctx| {
744 let id = id.clone();
745 Box::pin(async move {
746 let removed = svc
747 .remove(&ctx.tenant, id.as_deref(), ctx.params.clone())
748 .await?;
749
750 ctx.result = Some(HookResult::One(removed));
751 Ok(())
752 })
753 }),
754 )
755 .await?;
756
757 match ctx.result {
758 Some(HookResult::One(v)) => Ok(v),
759 Some(HookResult::Many(_)) => Err(anyhow::anyhow!(
760 "remove() produced HookResult::Many unexpectedly"
761 )),
762 None => Err(anyhow::anyhow!("remove() produced no result")),
763 }
764 }
765
766 pub async fn custom(&self, tenant: TenantContext, method: &'static str, data: Option<R>, params: P) -> Result<R> {
768 let method_kind = ServiceMethodKind::Custom(method);
769
770 let services = ServiceCaller::new(self.app.clone());
771 let config = self.app.config_snapshot();
772 let mut ctx = HookContext::new(tenant, method_kind.clone(), params, services, config);
773 ctx.data = data;
774
775 let method_name = method.to_string();
776
777 let ctx = self
778 .run_pipeline(
779 method_kind,
780 ctx,
781 Arc::new(move |svc, ctx| {
782 let method_name = method_name.clone();
783 Box::pin(async move {
784 let result = svc
785 .custom(&ctx.tenant, &method_name, ctx.data.take(), ctx.params.clone())
786 .await?;
787
788 ctx.result = Some(HookResult::One(result));
789 Ok(())
790 })
791 }),
792 )
793 .await?;
794
795 match ctx.result {
796 Some(HookResult::One(v)) => Ok(v),
797 Some(HookResult::Many(_)) => Err(anyhow::anyhow!(
798 "custom() produced HookResult::Many unexpectedly"
799 )),
800 None => Err(anyhow::anyhow!("custom() produced no result")),
801 }
802 }
803}
804
805
806pub struct ServiceCaller<R, P>
807where
808 R: Send + 'static,
809 P: Send + Clone + 'static,
810{
811 app: DogApp<R, P>,
812}
813
814
815impl<R, P> Clone for ServiceCaller<R, P>
816where
817 R: Send + 'static,
818 P: Send + Clone + 'static,
819{
820 fn clone(&self) -> Self {
821 Self {
822 app: self.app.clone(),
823 }
824 }
825}
826
827impl<R, P> ServiceCaller<R, P>
828where
829 R: Send + 'static,
830 P: Send + Clone + 'static,
831{
832 pub fn new(app: DogApp<R, P>) -> Self {
833 Self { app }
834 }
835
836 pub fn service<R2, P2>(&self, name: &str) -> Result<Arc<dyn DogService<R2, P2>>>
837 where
838 R2: Send + 'static,
839 P2: Send + 'static,
840 {
841 let map = self.app.inner.any_services.read().unwrap();
842
843 let any = map
845 .get(name)
846 .ok_or_else(|| anyhow::anyhow!("DogService not found: {name}"))?;
847
848 let stored = any
850 .as_ref()
851 .downcast_ref::<Arc<dyn DogService<R2, P2>>>()
852 .ok_or_else(|| {
853 anyhow::anyhow!(
854 "DogService type mismatch for '{name}'. \
855 You requested a different <R,P> than what was registered."
856 )
857 })?;
858
859 Ok(stored.clone())
860 }
861}