Skip to main content

zerodds_discovery/type_lookup/
server.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! TypeLookup-Service Server-Side (XTypes 1.3 §7.6.3.3.4).
4//!
5//! Verarbeitet [`GetTypesRequest`] und [`GetTypeDependenciesRequest`]
6//! gegen eine lokale [`TypeRegistry`] und liefert Replies. Pagination
7//! der Dependency-Replies via [`ContinuationPoint`] (§7.6.3.3.3).
8//!
9//! Spec-Mapping (§7.6.3.3.4):
10//! - Operation `getTypes(type_ids)` → [`TypeLookupServer::handle_get_types`]
11//! - Operation `getTypeDependencies(type_ids, continuation_point)` →
12//!   [`TypeLookupServer::handle_get_type_dependencies`]
13//!
14//! Pagination-Konstanten:
15//! - [`TypeLookupServer::DEFAULT_DEPENDENCY_PAGE_SIZE`] = 100 dependencies
16//!   pro Reply. Konfigurierbar via [`TypeLookupServer::with_page_size`].
17
18use alloc::vec::Vec;
19
20use zerodds_types::resolve::TypeRegistry;
21use zerodds_types::type_information::TypeIdentifierWithSize;
22use zerodds_types::type_lookup::{
23    ContinuationPoint, GetTypeDependenciesReply, GetTypeDependenciesRequest, GetTypesReply,
24    GetTypesRequest, ReplyTypeObject,
25};
26use zerodds_types::type_object::TypeObject;
27use zerodds_types::{EquivalenceHash, TypeIdentifier};
28
/// Server side (responder) of the TypeLookup service.
///
/// Owns a [`TypeRegistry`] and answers RPC requests from its contents.
/// No state is kept across requests: every request carries its own
/// `ContinuationPoint`.
#[derive(Debug, Clone)]
pub struct TypeLookupServer {
    /// Registry holding the locally known TypeObjects.
    pub registry: TypeRegistry,
    /// Maximum number of dependencies per `GetTypeDependenciesReply`.
    page_size: usize,
}
41
42impl TypeLookupServer {
43    /// Standard-Pagegroesse fuer Dependency-Replies (§7.6.3.3.4 erlaubt
44    /// jede Wahl; 100 ist ein Kompromiss zwischen Roundtrips und
45    /// Reply-Groesse).
46    pub const DEFAULT_DEPENDENCY_PAGE_SIZE: usize = 100;
47
48    /// Konstruiert einen Server ueber einer leeren Registry.
49    #[must_use]
50    pub fn new() -> Self {
51        Self {
52            registry: TypeRegistry::new(),
53            page_size: Self::DEFAULT_DEPENDENCY_PAGE_SIZE,
54        }
55    }
56
57    /// Konstruiert einen Server ueber einer existierenden Registry.
58    #[must_use]
59    pub fn with_registry(registry: TypeRegistry) -> Self {
60        Self {
61            registry,
62            page_size: Self::DEFAULT_DEPENDENCY_PAGE_SIZE,
63        }
64    }
65
66    /// Setzt die Page-Size fuer Dependency-Pagination.
67    /// `page_size = 0` wird auf 1 gehoben (eine Iteration pro Reply).
68    #[must_use]
69    pub fn with_page_size(mut self, page_size: usize) -> Self {
70        self.page_size = page_size.max(1);
71        self
72    }
73
74    /// Aktuell konfigurierte Page-Size.
75    #[must_use]
76    pub fn page_size(&self) -> usize {
77        self.page_size
78    }
79
80    /// Beantwortet `getTypes(type_ids)`.
81    ///
82    /// Fuer jeden bekannten `EquivalenceHashMinimal/Complete` wird das
83    /// passende [`ReplyTypeObject`] eingefuegt. Unbekannte Hashes und
84    /// Nicht-Hash-Identifier (Primitives, Plain-Collections — die
85    /// brauchen kein TypeObject) werden uebersprungen.
86    #[must_use]
87    pub fn handle_get_types(&self, req: &GetTypesRequest) -> GetTypesReply {
88        let mut types: Vec<ReplyTypeObject> = Vec::with_capacity(req.type_ids.len());
89        for ti in &req.type_ids {
90            match ti {
91                TypeIdentifier::EquivalenceHashMinimal(h) => {
92                    if let Some(m) = self.registry.get_minimal(h) {
93                        types.push(ReplyTypeObject::Minimal(m.clone()));
94                    }
95                }
96                TypeIdentifier::EquivalenceHashComplete(h) => {
97                    if let Some(c) = self.registry.get_complete(h) {
98                        types.push(ReplyTypeObject::Complete(c.clone()));
99                    }
100                }
101                _ => {
102                    // Primitives / Plain-Collections — kein TypeObject.
103                }
104            }
105        }
106        GetTypesReply { types }
107    }
108
109    /// Beantwortet `getTypeDependencies(type_ids, continuation_point)`.
110    ///
111    /// Sammelt fuer alle angefragten Hashes die transitiv-bekannten
112    /// Dependencies. Wenn die Gesamtliste mehr als `page_size`
113    /// Eintraege enthaelt, wird sie geteilt: das Reply liefert die
114    /// ersten `page_size` und einen Continuation-Point, der den
115    /// Offset in der nachsten Iteration markiert.
116    ///
117    /// Continuation-Point-Encoding (§7.6.3.3.3): wir kodieren den
118    /// Offset als 4-byte LE in den ersten 4 Bytes (Rest = 0). Das
119    /// ist Implementation-Choice — der Spec sagt nur "opaque, max 32
120    /// bytes". Externe Peers sehen den CP als Black-Box und schicken
121    /// ihn unveraendert zurueck.
122    #[must_use]
123    pub fn handle_get_type_dependencies(
124        &self,
125        req: &GetTypeDependenciesRequest,
126    ) -> GetTypeDependenciesReply {
127        // Aggregierte sortierte Dependencies-Liste — deterministisch
128        // ueber alle Iterationen (sonst koennte der Client zwischen
129        // zwei Calls dieselbe Dependency doppelt oder garnicht sehen).
130        let all = self.collect_dependencies_sorted(&req.type_ids);
131
132        let offset = decode_continuation_offset(&req.continuation_point);
133        if offset >= all.len() {
134            // Keine weiteren Dependencies — leerer Reply mit leerem CP.
135            return GetTypeDependenciesReply {
136                dependent_typeids: Vec::new(),
137                continuation_point: ContinuationPoint::default(),
138            };
139        }
140
141        let end = (offset + self.page_size).min(all.len());
142        let page = all[offset..end].to_vec();
143        let continuation_point = if end < all.len() {
144            encode_continuation_offset(end)
145        } else {
146            ContinuationPoint::default()
147        };
148
149        GetTypeDependenciesReply {
150            dependent_typeids: page,
151            continuation_point,
152        }
153    }
154
155    /// Sortierte, deduplizierte Liste aller Dependencies.
156    fn collect_dependencies_sorted(
157        &self,
158        type_ids: &[TypeIdentifier],
159    ) -> Vec<TypeIdentifierWithSize> {
160        use alloc::collections::BTreeMap;
161        let mut map: BTreeMap<EquivalenceHash, u32> = BTreeMap::new();
162
163        for ti in type_ids {
164            let root_hash = match ti {
165                TypeIdentifier::EquivalenceHashMinimal(h)
166                | TypeIdentifier::EquivalenceHashComplete(h) => *h,
167                _ => continue,
168            };
169            for dep in self
170                .registry
171                .transitive_dependencies(&root_hash, MAX_TRANSITIVE_DEPS)
172            {
173                let size = self.estimate_size(&dep);
174                map.entry(dep).or_insert(size);
175            }
176        }
177
178        map.into_iter()
179            .map(|(h, size)| TypeIdentifierWithSize {
180                type_id: TypeIdentifier::EquivalenceHashMinimal(h),
181                typeobject_serialized_size: size,
182            })
183            .collect()
184    }
185
186    /// Bestimmt die serialized-Size eines registrierten Types.
187    /// Wenn nicht in der Registry: 0.
188    fn estimate_size(&self, hash: &EquivalenceHash) -> u32 {
189        if let Some(m) = self.registry.get_minimal(hash) {
190            TypeObject::Minimal(m.clone())
191                .to_bytes_le()
192                .map(|b| u32::try_from(b.len()).unwrap_or(0))
193                .unwrap_or(0)
194        } else if let Some(c) = self.registry.get_complete(hash) {
195            TypeObject::Complete(c.clone())
196                .to_bytes_le()
197                .map(|b| u32::try_from(b.len()).unwrap_or(0))
198                .unwrap_or(0)
199        } else {
200            0
201        }
202    }
203}
204
205impl Default for TypeLookupServer {
206    fn default() -> Self {
207        Self::new()
208    }
209}
210
/// DoS cap for transitive dependency resolution within a single server
/// call. Keeps a malicious peer from using a large type graph to inflate
/// a single reply without bound.
///
/// NOTE(review): the value is handed to
/// `TypeRegistry::transitive_dependencies` once per requested root hash,
/// so it presumably bounds each root's walk rather than the aggregated
/// result — confirm against the registry implementation.
const MAX_TRANSITIVE_DEPS: usize = 4_096;
215
216/// Kodiert einen Pagination-Offset als 8-byte-Continuation-Point.
217/// Format: `[offset_le_u64; 8 bytes]`. Rest der MAX_LEN bleibt
218/// unbenutzt.
219fn encode_continuation_offset(offset: usize) -> ContinuationPoint {
220    let off64 = u64::try_from(offset).unwrap_or(u64::MAX);
221    let bytes = off64.to_le_bytes();
222    ContinuationPoint(bytes.to_vec())
223}
224
225/// Dekodiert einen Pagination-Offset.
226/// Leere oder zu kurze CPs werden als Offset 0 interpretiert
227/// (= "erste Iteration").
228fn decode_continuation_offset(cp: &ContinuationPoint) -> usize {
229    if cp.0.len() < 8 {
230        return 0;
231    }
232    let mut buf = [0u8; 8];
233    buf.copy_from_slice(&cp.0[..8]);
234    usize::try_from(u64::from_le_bytes(buf)).unwrap_or(usize::MAX)
235}
236
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use zerodds_types::builder::TypeObjectBuilder;
    use zerodds_types::{MinimalTypeObject, PrimitiveKind};

    /// Minimal struct type called `name` with a single `Int64` member `a`.
    fn sample_struct(name: &str) -> MinimalTypeObject {
        let builder = TypeObjectBuilder::struct_type(name).member(
            "a",
            TypeIdentifier::Primitive(PrimitiveKind::Int64),
            |m| m,
        );
        MinimalTypeObject::Struct(builder.build_minimal())
    }

    /// `getTypeDependencies` request for a single minimal root hash.
    fn deps_request(
        root: EquivalenceHash,
        continuation_point: ContinuationPoint,
    ) -> GetTypeDependenciesRequest {
        GetTypeDependenciesRequest {
            type_ids: alloc::vec![TypeIdentifier::EquivalenceHashMinimal(root)],
            continuation_point,
        }
    }

    #[test]
    fn handle_get_types_unknown_returns_empty() {
        let unknown = TypeIdentifier::EquivalenceHashMinimal(EquivalenceHash([0xAA; 14]));
        let req = GetTypesRequest {
            type_ids: alloc::vec![unknown],
        };
        let reply = TypeLookupServer::new().handle_get_types(&req);
        assert!(reply.types.is_empty());
    }

    #[test]
    fn handle_get_types_skips_primitives() {
        let req = GetTypesRequest {
            type_ids: alloc::vec![TypeIdentifier::Primitive(PrimitiveKind::Int32)],
        };
        let reply = TypeLookupServer::new().handle_get_types(&req);
        assert!(reply.types.is_empty());
    }

    #[test]
    fn pagination_offset_encoding_roundtrip() {
        for offset in [123_456usize, 0] {
            let cp = encode_continuation_offset(offset);
            assert_eq!(decode_continuation_offset(&cp), offset);
        }
    }

    #[test]
    fn pagination_truncates_at_page_size() {
        let mut server = TypeLookupServer::new().with_page_size(3);

        // Root struct with five hash-typed members `m0`..`m4`; the
        // referenced hashes themselves are not inserted into the registry.
        let builder = (0..5u8).fold(TypeObjectBuilder::struct_type("::Root"), |acc, i| {
            let mut bytes = [0u8; 14];
            bytes[0] = i;
            let member_name = alloc::format!("m{i}");
            acc.member(
                member_name.as_str(),
                TypeIdentifier::EquivalenceHashMinimal(EquivalenceHash(bytes)),
                |m| m,
            )
        });
        let root = MinimalTypeObject::Struct(builder.build_minimal());
        let root_hash = zerodds_types::compute_minimal_hash(&root).unwrap();
        server.registry.insert_minimal(root_hash, root);

        // First page: 3 deps and a non-empty continuation point.
        let first = server
            .handle_get_type_dependencies(&deps_request(root_hash, ContinuationPoint::default()));
        assert_eq!(first.dependent_typeids.len(), 3);
        assert!(!first.continuation_point.0.is_empty());

        // Second page: the remaining 2 deps and an empty continuation point.
        let second = server
            .handle_get_type_dependencies(&deps_request(root_hash, first.continuation_point));
        assert_eq!(second.dependent_typeids.len(), 2);
        assert!(second.continuation_point.0.is_empty());
    }

    #[test]
    fn handle_get_type_dependencies_empty_when_no_deps() {
        let mut server = TypeLookupServer::new();
        let object = sample_struct("::Empty");
        let hash = zerodds_types::compute_minimal_hash(&object).unwrap();
        server.registry.insert_minimal(hash, object);

        let reply =
            server.handle_get_type_dependencies(&deps_request(hash, ContinuationPoint::default()));
        assert!(reply.dependent_typeids.is_empty());
        assert!(reply.continuation_point.0.is_empty());
    }

    #[test]
    fn page_size_zero_normalizes_to_one() {
        assert_eq!(TypeLookupServer::new().with_page_size(0).page_size(), 1);
    }
}