Skip to main content

fory_core/resolver/
meta_resolver.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::buffer::{Reader, Writer};
19use crate::error::Error;
20use crate::meta::TypeMeta;
21use crate::resolver::type_resolver::NO_USER_TYPE_ID;
22use crate::resolver::{TypeInfo, TypeResolver};
23use std::collections::HashMap;
24use std::rc::Rc;
25
/// Write-side state for the streaming type-meta protocol.
///
/// While serializing, every type is announced with a var-u32 marker:
/// - `index << 1` (low bit `0`): first occurrence; the TypeMeta bytes follow inline.
/// - `(index << 1) | 1` (low bit `1`): back-reference to a type already written.
#[derive(Default)]
pub struct MetaWriterResolver {
    /// Stream index assigned to each type written through the `TypeId`-keyed path.
    type_id_index_map: HashMap<std::any::TypeId, usize>,
    /// Stream index per generated struct type index (the vector position is the
    /// type index). Slots that have not been written hold `NO_WRITTEN_TYPE_INDEX`.
    type_index_index_map: Vec<usize>,
    /// Stream index that the next newly written type will receive.
    next_index: usize,
}
36
/// Upper bound on the number of parsed type definitions kept in the reader's
/// header-keyed cache. Definitions seen after the cap is reached are still parsed
/// and returned, they are just not cached.
const MAX_PARSED_NUM_TYPE_DEFS: usize = 8_192;
/// Sentinel stored in the writer's per-type-index table for a slot whose type has
/// not been written yet.
const NO_WRITTEN_TYPE_INDEX: usize = usize::MAX;
39
40#[allow(dead_code)]
41impl MetaWriterResolver {
42    /// Write type meta inline using streaming protocol.
43    /// Returns the index assigned to this type.
44    #[inline(always)]
45    pub fn write_type_meta(
46        &mut self,
47        writer: &mut Writer,
48        type_id: std::any::TypeId,
49        type_resolver: &TypeResolver,
50    ) -> Result<(), Error> {
51        match self.type_id_index_map.get(&type_id) {
52            Some(&index) => {
53                // Reference to previously written type: (index << 1) | 1, LSB=1
54                writer.write_var_u32(((index as u32) << 1) | 1);
55            }
56            None => {
57                // New type: index << 1, LSB=0, followed by TypeMeta bytes inline
58                let index = self.next_index;
59                self.next_index += 1;
60                writer.write_var_u32((index as u32) << 1);
61                self.type_id_index_map.insert(type_id, index);
62                // Write TypeMeta bytes inline
63                let type_def = type_resolver.get_type_info(&type_id)?.get_type_def();
64                writer.write_bytes(&type_def);
65            }
66        }
67        Ok(())
68    }
69
70    /// Write type meta by generated struct type index, avoiding Rust TypeId hash lookup.
71    #[inline(always)]
72    pub fn write_type_meta_fast(
73        &mut self,
74        writer: &mut Writer,
75        type_id: std::any::TypeId,
76        type_index: u32,
77        type_resolver: &TypeResolver,
78    ) -> Result<(), Error> {
79        let type_index = type_index as usize;
80        if let Some(&index) = self.type_index_index_map.get(type_index) {
81            if index != NO_WRITTEN_TYPE_INDEX {
82                writer.write_var_u32(((index as u32) << 1) | 1);
83                return Ok(());
84            }
85        }
86
87        let index = self.next_index;
88        self.next_index += 1;
89        writer.write_var_u32((index as u32) << 1);
90        if type_index >= self.type_index_index_map.len() {
91            self.type_index_index_map
92                .resize(type_index + 1, NO_WRITTEN_TYPE_INDEX);
93        }
94        self.type_index_index_map[type_index] = index;
95        let type_meta = type_resolver.get_type_meta_by_index_ref(&type_id, type_index as u32)?;
96        writer.write_bytes(type_meta.get_bytes());
97        Ok(())
98    }
99
100    #[inline(always)]
101    pub fn reset(&mut self) {
102        self.type_id_index_map.clear();
103        self.type_index_index_map.clear();
104        self.next_index = 0;
105    }
106}
107
108/// Streaming meta reader that reads TypeMeta inline during deserialization.
109/// Uses the streaming protocol:
110/// - (index << 1) | 0 for new type definition (followed by TypeMeta bytes)
111/// - (index << 1) | 1 for reference to previously read type
112#[derive(Default)]
113pub struct MetaReaderResolver {
114    pub reading_type_infos: Vec<Rc<TypeInfo>>,
115    parsed_type_infos: HashMap<i64, Rc<TypeInfo>>,
116    last_meta_header: i64,
117    last_type_info: Option<Rc<TypeInfo>>,
118}
119
120impl MetaReaderResolver {
121    #[inline(always)]
122    pub fn get(&self, index: usize) -> Option<&Rc<TypeInfo>> {
123        self.reading_type_infos.get(index)
124    }
125
126    /// Read type meta inline using streaming protocol.
127    /// Returns the TypeInfo for this type.
128    #[inline(always)]
129    pub fn read_type_meta(
130        &mut self,
131        reader: &mut Reader,
132        type_resolver: &TypeResolver,
133    ) -> Result<Rc<TypeInfo>, Error> {
134        let index_marker = reader.read_var_u32()?;
135        let is_ref = (index_marker & 1) == 1;
136        let index = (index_marker >> 1) as usize;
137
138        if is_ref {
139            // Reference to previously read type
140            self.reading_type_infos.get(index).cloned().ok_or_else(|| {
141                Error::type_error(format!("TypeInfo not found for type index: {}", index))
142            })
143        } else {
144            // New type - read TypeMeta inline
145            let meta_header = reader.read_i64()?;
146            if let Some(type_info) = self
147                .last_type_info
148                .as_ref()
149                .filter(|_| self.last_meta_header == meta_header)
150            {
151                // Header-cache hits intentionally skip without rehashing. Entries reach this cache
152                // only after a successful TypeMeta parse and 52-bit metadata-hash validation.
153                self.reading_type_infos.push(type_info.clone());
154                TypeMeta::skip_bytes_for_validated_header(reader, meta_header)?;
155                return Ok(type_info.clone());
156            }
157            if let Some(type_info) = self.parsed_type_infos.get(&meta_header) {
158                // Header-cache hits intentionally skip without rehashing. Entries reach this cache
159                // only after a successful TypeMeta parse and 52-bit metadata-hash validation.
160                self.last_meta_header = meta_header;
161                self.last_type_info = Some(type_info.clone());
162                self.reading_type_infos.push(type_info.clone());
163                TypeMeta::skip_bytes_for_validated_header(reader, meta_header)?;
164                Ok(type_info.clone())
165            } else {
166                let type_meta = Rc::new(TypeMeta::from_bytes_with_header(
167                    reader,
168                    type_resolver,
169                    meta_header,
170                )?);
171
172                // Try to find local type info
173                let namespace = &type_meta.get_namespace().original;
174                let type_name = &type_meta.get_type_name().original;
175                let register_by_name = !namespace.is_empty() || !type_name.is_empty();
176                let type_info = if register_by_name {
177                    // Registered by name (namespace can be empty)
178                    if let Some(local_type_info) =
179                        type_resolver.get_type_info_by_name(namespace, type_name)
180                    {
181                        // Exact schemas can reuse the local TypeInfo; changed
182                        // schemas keep the remote metadata with the local harness.
183                        if type_meta.get_hash() == local_type_info.get_type_meta_ref().get_hash() {
184                            local_type_info
185                        } else {
186                            Rc::new(TypeInfo::from_remote_meta(
187                                type_meta.clone(),
188                                Some(local_type_info.get_harness()),
189                                Some(local_type_info.get_type_id() as u32),
190                                Some(local_type_info.get_user_type_id()),
191                            ))
192                        }
193                    } else {
194                        // No local type found, use stub harness
195                        Rc::new(TypeInfo::from_remote_meta(
196                            type_meta.clone(),
197                            None,
198                            None,
199                            None,
200                        ))
201                    }
202                } else {
203                    // Registered by ID
204                    let type_id = type_meta.get_type_id();
205                    let user_type_id = type_meta.get_user_type_id();
206                    if user_type_id != NO_USER_TYPE_ID {
207                        if let Some(local_type_info) =
208                            type_resolver.get_user_type_info_by_id(user_type_id)
209                        {
210                            // Exact schemas can reuse the local TypeInfo; changed
211                            // schemas keep the remote metadata with the local harness.
212                            if type_meta.get_hash()
213                                == local_type_info.get_type_meta_ref().get_hash()
214                            {
215                                local_type_info
216                            } else {
217                                Rc::new(TypeInfo::from_remote_meta(
218                                    type_meta.clone(),
219                                    Some(local_type_info.get_harness()),
220                                    Some(local_type_info.get_type_id() as u32),
221                                    Some(local_type_info.get_user_type_id()),
222                                ))
223                            }
224                        } else {
225                            // No local type found, use stub harness
226                            Rc::new(TypeInfo::from_remote_meta(
227                                type_meta.clone(),
228                                None,
229                                None,
230                                None,
231                            ))
232                        }
233                    } else if let Some(local_type_info) = type_resolver.get_type_info_by_id(type_id)
234                    {
235                        // Exact schemas can reuse the local TypeInfo; changed
236                        // schemas keep the remote metadata with the local harness.
237                        if type_meta.get_hash() == local_type_info.get_type_meta_ref().get_hash() {
238                            local_type_info
239                        } else {
240                            Rc::new(TypeInfo::from_remote_meta(
241                                type_meta.clone(),
242                                Some(local_type_info.get_harness()),
243                                Some(local_type_info.get_type_id() as u32),
244                                Some(local_type_info.get_user_type_id()),
245                            ))
246                        }
247                    } else {
248                        // No local type found, use stub harness
249                        Rc::new(TypeInfo::from_remote_meta(
250                            type_meta.clone(),
251                            None,
252                            None,
253                            None,
254                        ))
255                    }
256                };
257
258                if self.parsed_type_infos.len() < MAX_PARSED_NUM_TYPE_DEFS {
259                    // avoid malicious type defs to OOM parsed_type_infos
260                    self.parsed_type_infos
261                        .insert(meta_header, type_info.clone());
262                    self.last_meta_header = meta_header;
263                    self.last_type_info = Some(type_info.clone());
264                }
265                self.reading_type_infos.push(type_info.clone());
266                Ok(type_info)
267            }
268        }
269    }
270
271    #[inline(always)]
272    pub fn reset(&mut self) {
273        self.reading_type_infos.clear();
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use crate::meta::MetaString;
    use crate::TypeId;

    /// Once the header-keyed cache holds `MAX_PARSED_NUM_TYPE_DEFS` entries, a newly
    /// parsed definition is still returned to the caller but is published to neither
    /// the map nor the single-entry cache.
    #[test]
    fn parsed_type_info_cache_does_not_publish_after_limit() {
        // Serialized definition of an ID-registered struct with user type id 9001.
        let type_def = TypeMeta::new(
            TypeId::STRUCT as u32,
            9001,
            MetaString::get_empty().clone(),
            MetaString::get_empty().clone(),
            false,
            vec![],
        )
        .unwrap()
        .get_bytes()
        .to_vec();
        let meta_header = Reader::new(&type_def).read_i64().unwrap();

        // Fill the cache to its cap with placeholder entries, skipping the header
        // under test so the read below is guaranteed to miss.
        let placeholder = Rc::new(TypeInfo::from_remote_meta(
            Rc::new(TypeMeta::empty().unwrap()),
            None,
            None,
            None,
        ));
        let mut resolver = MetaReaderResolver::default();
        resolver.parsed_type_infos.extend(
            (0i64..)
                .filter(|candidate| *candidate != meta_header)
                .take(MAX_PARSED_NUM_TYPE_DEFS)
                .map(|candidate| (candidate, placeholder.clone())),
        );

        // Marker 0 = "new definition at index 0", followed by the definition bytes.
        let mut bytes = vec![];
        let mut writer = Writer::from_buffer(&mut bytes);
        writer.write_var_u32(0);
        writer.write_bytes(&type_def);

        let mut reader = Reader::new(&bytes);
        let current = resolver
            .read_type_meta(&mut reader, &TypeResolver::default())
            .unwrap();

        assert_eq!(current.get_user_type_id(), 9001);
        assert_eq!(resolver.parsed_type_infos.len(), MAX_PARSED_NUM_TYPE_DEFS);
        assert!(!resolver.parsed_type_infos.contains_key(&meta_header));
        assert!(resolver.last_type_info.is_none());
    }
}
330}