tycho_network/util/
router.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use tycho_util::futures::BoxFutureOrNoop;
5use tycho_util::FastHashMap;
6
7use crate::types::{BoxService, Service, ServiceExt};
8
9pub trait Routable {
10    #[inline]
11    fn query_ids(&self) -> impl IntoIterator<Item = u32> {
12        std::iter::empty()
13    }
14
15    #[inline]
16    fn message_ids(&self) -> impl IntoIterator<Item = u32> {
17        std::iter::empty()
18    }
19
20    #[inline]
21    fn datagram_ids(&self) -> impl IntoIterator<Item = u32> {
22        std::iter::empty()
23    }
24}
25
26pub struct RouterBuilder<Request, Q> {
27    inner: Inner<Request, Q>,
28}
29
30impl<Request, Q> RouterBuilder<Request, Q> {
31    pub fn route<S>(mut self, service: S) -> Self
32    where
33        S: Service<Request, QueryResponse = Q> + Routable + Send + Sync + 'static,
34    {
35        let index = self.inner.services.len();
36        for id in service.query_ids() {
37            let prev = self.inner.query_handlers.insert(id, index);
38            assert!(prev.is_none(), "duplicate query id: {:08x}", id);
39        }
40        for id in service.message_ids() {
41            let prev = self.inner.message_handlers.insert(id, index);
42            assert!(prev.is_none(), "duplicate message id: {:08x}", id);
43        }
44        for id in service.datagram_ids() {
45            let prev = self.inner.datagram_handlers.insert(id, index);
46            assert!(prev.is_none(), "duplicate datagram id: {:08x}", id);
47        }
48
49        self.inner.services.push(service.boxed());
50        self
51    }
52
53    pub fn build(self) -> Router<Request, Q> {
54        Router {
55            inner: Arc::new(self.inner),
56        }
57    }
58}
59
60impl<Request, Q> Default for RouterBuilder<Request, Q> {
61    fn default() -> Self {
62        Self {
63            inner: Inner {
64                services: Vec::new(),
65                query_handlers: FastHashMap::default(),
66                message_handlers: FastHashMap::default(),
67                datagram_handlers: FastHashMap::default(),
68                _response: PhantomData,
69            },
70        }
71    }
72}
73
74pub struct Router<Request, Q> {
75    inner: Arc<Inner<Request, Q>>,
76}
77
78impl<Request, Q> Router<Request, Q> {
79    pub fn builder() -> RouterBuilder<Request, Q> {
80        RouterBuilder::default()
81    }
82}
83
84impl<Request, Q> Clone for Router<Request, Q> {
85    #[inline]
86    fn clone(&self) -> Self {
87        Self {
88            inner: self.inner.clone(),
89        }
90    }
91}
92
93impl<Request, Q> Service<Request> for Router<Request, Q>
94where
95    Request: Send + AsRef<[u8]> + 'static,
96    Q: Send + 'static,
97{
98    type QueryResponse = Q;
99    type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
100    type OnMessageFuture = BoxFutureOrNoop<()>;
101    type OnDatagramFuture = BoxFutureOrNoop<()>;
102
103    fn on_query(&self, req: Request) -> Self::OnQueryFuture {
104        match find_handler(&req, &self.inner.query_handlers, &self.inner.services) {
105            Some(service) => BoxFutureOrNoop::Boxed(service.on_query(req)),
106            None => BoxFutureOrNoop::Noop,
107        }
108    }
109
110    fn on_message(&self, req: Request) -> Self::OnMessageFuture {
111        match find_handler(&req, &self.inner.message_handlers, &self.inner.services) {
112            Some(service) => BoxFutureOrNoop::Boxed(service.on_message(req)),
113            None => BoxFutureOrNoop::Noop,
114        }
115    }
116
117    fn on_datagram(&self, req: Request) -> Self::OnDatagramFuture {
118        match find_handler(&req, &self.inner.datagram_handlers, &self.inner.services) {
119            Some(service) => BoxFutureOrNoop::Boxed(service.on_datagram(req)),
120            None => BoxFutureOrNoop::Noop,
121        }
122    }
123}
124
125fn find_handler<'a, T: AsRef<[u8]>, S>(
126    req: &T,
127    indices: &FastHashMap<u32, usize>,
128    handlers: &'a [S],
129) -> Option<&'a S> {
130    if let Some(id) = read_le_u32(req.as_ref()) {
131        if let Some(&index) = indices.get(&id) {
132            // NOTE: intentionally panics if index is out of bounds as it is
133            // an implementation error.
134            return Some(handlers.get(index).expect("index must be in bounds"));
135        }
136    }
137    None
138}
139
140struct Inner<Request, Q> {
141    services: Vec<BoxService<Request, Q>>,
142    query_handlers: FastHashMap<u32, usize>,
143    message_handlers: FastHashMap<u32, usize>,
144    datagram_handlers: FastHashMap<u32, usize>,
145    _response: PhantomData<Q>,
146}
147
148fn read_le_u32(buf: &[u8]) -> Option<u32> {
149    if buf.len() >= 4 {
150        let mut bytes = [0; 4];
151        bytes.copy_from_slice(&buf[..4]);
152        Some(u32::from_le_bytes(bytes))
153    } else {
154        None
155    }
156}