1#![forbid(unsafe_code)]
2
3use futures_core::Stream;
4use std::any::{Any, TypeId};
5use std::borrow::Cow;
6use std::collections::HashMap;
7use std::future::Future;
8use std::hash::{BuildHasherDefault, Hasher};
9use std::ops::DerefMut;
10use std::sync::Arc;
11
12#[derive(Clone, Debug)]
21pub struct Scope {
22 protocol: Cow<'static, str>,
23 scopes: HashMap<TypeId, Arc<dyn Any + Sync + Send>, BuildHasherDefault<TypeIdHasher>>,
24}
25
26impl Scope {
27 #[inline]
29 pub fn new(protocol: Cow<'static, str>) -> Self {
30 Self {
31 protocol,
32 scopes: Default::default(),
33 }
34 }
35
36 #[inline]
38 pub fn protocol(&self) -> &str {
39 &self.protocol
40 }
41
42 #[inline]
44 pub fn with_protocol(self, protocol: Cow<'static, str>) -> Self {
45 Self {
46 protocol,
47 scopes: self.scopes,
48 }
49 }
50
51 #[inline]
54 pub fn get<T: Any + Sync + Send>(&self) -> Option<Arc<T>> {
55 self.scopes
56 .get(&TypeId::of::<T>())?
57 .clone()
58 .downcast::<T>()
59 .ok()
60 }
61
62 #[inline]
64 pub fn get_ref<T: Any + Sync + Send>(&self) -> Option<&T> {
65 self.scopes.get(&TypeId::of::<T>())?.downcast_ref::<T>()
66 }
67
68 #[inline]
70 pub fn insert<T: Any + Sync + Send>(&mut self, scope: T) -> Option<Arc<T>> {
71 self.scopes
72 .insert(TypeId::of::<T>(), Arc::new(scope))
73 .map(|arc| arc.downcast::<T>().unwrap())
74 }
75
76 #[inline]
80 pub fn remove<T: Any + Sync + Send>(&mut self) -> Option<Arc<T>> {
81 self.scopes
82 .remove(&TypeId::of::<T>())
83 .map(|arc| arc.downcast::<T>().unwrap())
84 }
85
86 #[inline]
88 pub fn with_scope<T: Any + Sync + Send>(mut self, scope: T) -> Self {
89 let _ = self.insert(scope);
90 self
91 }
92}
93
94#[derive(Clone, Debug)]
99pub struct Event {
100 family: Cow<'static, str>,
101 event: Arc<dyn Any + Sync + Send>,
102}
103
104impl Event {
105 #[inline]
107 pub fn new<T: Any + Sync + Send>(family: Cow<'static, str>, event: T) -> Self {
108 Self {
109 family,
110 event: Arc::new(event),
111 }
112 }
113
114 #[inline]
116 pub fn family(&self) -> &str {
117 &self.family
118 }
119
120 #[inline]
122 pub fn get<T: Any + Sync + Send>(&self) -> Option<Arc<T>> {
123 self.event.clone().downcast::<T>().ok()
124 }
125
126 #[inline]
128 pub fn get_ref<T: Any + Sync + Send>(&self) -> Option<&T> {
129 self.event.downcast_ref::<T>()
130 }
131}
132
133pub trait Service<ServerStream: Stream<Item = Event>> {
139 type AppStream: Stream<Item = Event>;
140 type Error: std::error::Error;
141 type Future: Future<Output = Result<Self::AppStream, Self::Error>>;
142
143 fn call(&mut self, scope: Scope, server_events: ServerStream) -> Self::Future;
145}
146
147#[derive(Default)]
148struct TypeIdHasher {
149 value: u64,
150}
151
152impl Hasher for TypeIdHasher {
153 #[inline]
154 fn finish(&self) -> u64 {
155 self.value
156 }
157
158 #[inline]
159 fn write(&mut self, bytes: &[u8]) {
160 debug_assert_eq!(bytes.len(), 8);
161 let _ = bytes
162 .try_into()
163 .map(|array| self.value = u64::from_ne_bytes(array));
164 }
165}
166
167impl<X, S, SS> Service<SS> for X
168where
169 X: DerefMut<Target = S>,
170 S: Service<SS>,
171 SS: Stream<Item = Event>,
172{
173 type AppStream = S::AppStream;
174 type Error = S::Error;
175 type Future = S::Future;
176
177 fn call(&mut self, scope: Scope, server_events: SS) -> Self::Future {
178 (**self).call(scope, server_events)
179 }
180}