Skip to main content

fory_core/resolver/
meta_resolver.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::buffer::{Reader, Writer};
19use crate::error::Error;
20use crate::meta::TypeMeta;
21use crate::resolver::type_resolver::NO_USER_TYPE_ID;
22use crate::resolver::{TypeInfo, TypeResolver};
23use std::collections::HashMap;
24use std::rc::Rc;
25
/// Encoder side of the inline ("streaming") type-meta protocol: a type's TypeMeta is written
/// at its first use in a stream and referred to by index afterwards.
///
/// Marker layout (varuint32):
/// - `index << 1` (low bit 0): new type definition, immediately followed by its TypeMeta bytes;
/// - `(index << 1) | 1` (low bit 1): reference to a type already written in this stream.
#[derive(Default)]
pub struct MetaWriterResolver {
    /// Next stream index to hand out; shared by both lookup tables below so that indices
    /// are assigned sequentially regardless of which write path is used.
    next_index: usize,
    /// Stream index assigned to each Rust `TypeId` (used by `write_type_meta`).
    type_id_index_map: HashMap<std::any::TypeId, usize>,
    /// Stream index assigned to each generated struct type index (used by
    /// `write_type_meta_fast`); slots not yet written hold `NO_WRITTEN_TYPE_INDEX`.
    type_index_index_map: Vec<usize>,
}
36
/// Upper bound on the number of distinct TypeMeta headers cached by `MetaReaderResolver`,
/// so a stream full of distinct type definitions cannot grow that cache without limit.
const MAX_PARSED_NUM_TYPE_DEFS: usize = 8 * 1024;
/// Sentinel stored in `MetaWriterResolver::type_index_index_map` for slots whose type has
/// not been written yet in the current stream.
const NO_WRITTEN_TYPE_INDEX: usize = usize::MAX;
39
40#[allow(dead_code)]
41impl MetaWriterResolver {
42    /// Write type meta inline using streaming protocol.
43    /// Returns the index assigned to this type.
44    #[inline(always)]
45    pub fn write_type_meta(
46        &mut self,
47        writer: &mut Writer,
48        type_id: std::any::TypeId,
49        type_resolver: &TypeResolver,
50    ) -> Result<(), Error> {
51        match self.type_id_index_map.get(&type_id) {
52            Some(&index) => {
53                // Reference to previously written type: (index << 1) | 1, LSB=1
54                writer.write_var_u32(((index as u32) << 1) | 1);
55            }
56            None => {
57                // New type: index << 1, LSB=0, followed by TypeMeta bytes inline
58                let index = self.next_index;
59                self.next_index += 1;
60                writer.write_var_u32((index as u32) << 1);
61                self.type_id_index_map.insert(type_id, index);
62                // Write TypeMeta bytes inline
63                let type_def = type_resolver.get_type_info(&type_id)?.get_type_def();
64                writer.write_bytes(&type_def);
65            }
66        }
67        Ok(())
68    }
69
70    /// Write type meta by generated struct type index, avoiding Rust TypeId hash lookup.
71    #[inline(always)]
72    pub fn write_type_meta_fast(
73        &mut self,
74        writer: &mut Writer,
75        type_id: std::any::TypeId,
76        type_index: u32,
77        type_resolver: &TypeResolver,
78    ) -> Result<(), Error> {
79        let type_index = type_index as usize;
80        if let Some(&index) = self.type_index_index_map.get(type_index) {
81            if index != NO_WRITTEN_TYPE_INDEX {
82                writer.write_var_u32(((index as u32) << 1) | 1);
83                return Ok(());
84            }
85        }
86
87        let index = self.next_index;
88        self.next_index += 1;
89        writer.write_var_u32((index as u32) << 1);
90        if type_index >= self.type_index_index_map.len() {
91            self.type_index_index_map
92                .resize(type_index + 1, NO_WRITTEN_TYPE_INDEX);
93        }
94        self.type_index_index_map[type_index] = index;
95        let type_meta = type_resolver.get_type_meta_by_index_ref(&type_id, type_index as u32)?;
96        writer.write_bytes(type_meta.get_bytes());
97        Ok(())
98    }
99
100    #[inline(always)]
101    pub fn reset(&mut self) {
102        self.type_id_index_map.clear();
103        self.type_index_index_map.clear();
104        self.next_index = 0;
105    }
106}
107
108/// Streaming meta reader that reads TypeMeta inline during deserialization.
109/// Uses the streaming protocol:
110/// - (index << 1) | 0 for new type definition (followed by TypeMeta bytes)
111/// - (index << 1) | 1 for reference to previously read type
112#[derive(Default)]
113pub struct MetaReaderResolver {
114    pub reading_type_infos: Vec<Rc<TypeInfo>>,
115    parsed_type_infos: HashMap<i64, Rc<TypeInfo>>,
116    last_meta_header: i64,
117    last_type_info: Option<Rc<TypeInfo>>,
118}
119
120impl MetaReaderResolver {
121    #[inline(always)]
122    pub fn get(&self, index: usize) -> Option<&Rc<TypeInfo>> {
123        self.reading_type_infos.get(index)
124    }
125
126    /// Read type meta inline using streaming protocol.
127    /// Returns the TypeInfo for this type.
128    #[inline(always)]
129    pub fn read_type_meta(
130        &mut self,
131        reader: &mut Reader,
132        type_resolver: &TypeResolver,
133    ) -> Result<Rc<TypeInfo>, Error> {
134        let index_marker = reader.read_var_u32()?;
135        let is_ref = (index_marker & 1) == 1;
136        let index = (index_marker >> 1) as usize;
137
138        if is_ref {
139            // Reference to previously read type
140            self.reading_type_infos.get(index).cloned().ok_or_else(|| {
141                Error::type_error(format!("TypeInfo not found for type index: {}", index))
142            })
143        } else {
144            // New type - read TypeMeta inline
145            let meta_header = reader.read_i64()?;
146            if let Some(type_info) = self
147                .last_type_info
148                .as_ref()
149                .filter(|_| self.last_meta_header == meta_header)
150            {
151                // Header-cache hits intentionally skip without rehashing. Entries reach this cache
152                // only after a successful TypeMeta parse and 52-bit metadata-hash validation.
153                self.reading_type_infos.push(type_info.clone());
154                TypeMeta::skip_bytes_for_validated_header(reader, meta_header)?;
155                return Ok(type_info.clone());
156            }
157            if let Some(type_info) = self.parsed_type_infos.get(&meta_header) {
158                // Header-cache hits intentionally skip without rehashing. Entries reach this cache
159                // only after a successful TypeMeta parse and 52-bit metadata-hash validation.
160                self.last_meta_header = meta_header;
161                self.last_type_info = Some(type_info.clone());
162                self.reading_type_infos.push(type_info.clone());
163                TypeMeta::skip_bytes_for_validated_header(reader, meta_header)?;
164                Ok(type_info.clone())
165            } else {
166                let type_meta = Rc::new(TypeMeta::from_bytes_with_header(
167                    reader,
168                    type_resolver,
169                    meta_header,
170                )?);
171
172                // Try to find local type info
173                let namespace = &type_meta.get_namespace().original;
174                let type_name = &type_meta.get_type_name().original;
175                let register_by_name = !namespace.is_empty() || !type_name.is_empty();
176                let type_info = if register_by_name {
177                    // Registered by name (namespace can be empty)
178                    if let Some(local_type_info) =
179                        type_resolver.get_type_info_by_name(namespace, type_name)
180                    {
181                        // Use local harness with remote metadata
182                        Rc::new(TypeInfo::from_remote_meta(
183                            type_meta.clone(),
184                            Some(local_type_info.get_harness()),
185                            Some(local_type_info.get_type_id() as u32),
186                            Some(local_type_info.get_user_type_id()),
187                        ))
188                    } else {
189                        // No local type found, use stub harness
190                        Rc::new(TypeInfo::from_remote_meta(
191                            type_meta.clone(),
192                            None,
193                            None,
194                            None,
195                        ))
196                    }
197                } else {
198                    // Registered by ID
199                    let type_id = type_meta.get_type_id();
200                    let user_type_id = type_meta.get_user_type_id();
201                    if user_type_id != NO_USER_TYPE_ID {
202                        if let Some(local_type_info) =
203                            type_resolver.get_user_type_info_by_id(user_type_id)
204                        {
205                            // Use local harness with remote metadata
206                            Rc::new(TypeInfo::from_remote_meta(
207                                type_meta.clone(),
208                                Some(local_type_info.get_harness()),
209                                Some(local_type_info.get_type_id() as u32),
210                                Some(local_type_info.get_user_type_id()),
211                            ))
212                        } else {
213                            // No local type found, use stub harness
214                            Rc::new(TypeInfo::from_remote_meta(
215                                type_meta.clone(),
216                                None,
217                                None,
218                                None,
219                            ))
220                        }
221                    } else if let Some(local_type_info) = type_resolver.get_type_info_by_id(type_id)
222                    {
223                        // Use local harness with remote metadata
224                        Rc::new(TypeInfo::from_remote_meta(
225                            type_meta.clone(),
226                            Some(local_type_info.get_harness()),
227                            Some(local_type_info.get_type_id() as u32),
228                            Some(local_type_info.get_user_type_id()),
229                        ))
230                    } else {
231                        // No local type found, use stub harness
232                        Rc::new(TypeInfo::from_remote_meta(
233                            type_meta.clone(),
234                            None,
235                            None,
236                            None,
237                        ))
238                    }
239                };
240
241                if self.parsed_type_infos.len() < MAX_PARSED_NUM_TYPE_DEFS {
242                    // avoid malicious type defs to OOM parsed_type_infos
243                    self.parsed_type_infos
244                        .insert(meta_header, type_info.clone());
245                    self.last_meta_header = meta_header;
246                    self.last_type_info = Some(type_info.clone());
247                }
248                self.reading_type_infos.push(type_info.clone());
249                Ok(type_info)
250            }
251        }
252    }
253
254    #[inline(always)]
255    pub fn reset(&mut self) {
256        self.reading_type_infos.clear();
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use crate::meta::MetaString;
    use crate::TypeId;

    /// With `parsed_type_infos` already holding `MAX_PARSED_NUM_TYPE_DEFS` entries, reading a
    /// brand-new definition must still return its type info, but must publish it to neither
    /// the header map nor the most-recent-entry cache.
    #[test]
    fn parsed_type_info_cache_does_not_publish_after_limit() {
        // A struct TypeMeta identified by user type id 9001, with empty namespace and name.
        let empty_name = MetaString::get_empty();
        let meta = TypeMeta::new(
            TypeId::STRUCT as u32,
            9001,
            empty_name.clone(),
            empty_name.clone(),
            false,
            vec![],
        )
        .unwrap();
        let type_def = meta.get_bytes().to_vec();
        let meta_header = Reader::new(&type_def).read_i64().unwrap();

        // Fill the header cache up to its limit with dummy entries, avoiding the real header.
        let mut resolver = MetaReaderResolver::default();
        let filler = Rc::new(TypeInfo::from_remote_meta(
            Rc::new(TypeMeta::empty().unwrap()),
            None,
            None,
            None,
        ));
        for header in (0i64..)
            .filter(|&header| header != meta_header)
            .take(MAX_PARSED_NUM_TYPE_DEFS)
        {
            resolver.parsed_type_infos.insert(header, Rc::clone(&filler));
        }

        // Stream: marker 0 (new definition at index 0) followed by the TypeMeta bytes.
        let mut bytes = Vec::new();
        {
            let mut writer = Writer::from_buffer(&mut bytes);
            writer.write_var_u32(0);
            writer.write_bytes(&type_def);
        }

        let mut reader = Reader::new(&bytes);
        let current = resolver
            .read_type_meta(&mut reader, &TypeResolver::default())
            .unwrap();

        assert_eq!(current.get_user_type_id(), 9001);
        assert_eq!(resolver.parsed_type_infos.len(), MAX_PARSED_NUM_TYPE_DEFS);
        assert!(!resolver.parsed_type_infos.contains_key(&meta_header));
        assert!(resolver.last_type_info.is_none());
    }
}