Skip to main content

fory_core/resolver/
meta_resolver.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::buffer::{Reader, Writer};
19use crate::error::Error;
20use crate::meta::TypeMeta;
21use crate::resolver::type_resolver::{TypeInfo, NO_USER_TYPE_ID};
22use crate::TypeResolver;
23use std::collections::HashMap;
24use std::rc::Rc;
25
/// Writer-side state for the streaming type-meta protocol.
///
/// While serializing, each distinct Rust type is announced once with its
/// TypeMeta bytes and afterwards referred to by index. The marker written
/// in front of every type is a varuint32:
/// - `index << 1` (low bit 0): new type definition, TypeMeta bytes follow
/// - `(index << 1) | 1` (low bit 1): reference to a type written earlier
#[derive(Default)]
pub struct MetaWriterResolver {
    // Index assigned to every type already written; the next new type gets
    // the current map length as its index.
    type_id_index_map: HashMap<std::any::TypeId, usize>,
}
34
/// Upper bound on the number of parsed type definitions kept in the
/// reader-side cache, so that a stream carrying many distinct (possibly
/// malicious) type definitions cannot grow that cache without limit.
const MAX_PARSED_NUM_TYPE_DEFS: usize = 8 * 1024;
36
37#[allow(dead_code)]
38impl MetaWriterResolver {
39    /// Write type meta inline using streaming protocol.
40    /// Returns the index assigned to this type.
41    #[inline(always)]
42    pub fn write_type_meta(
43        &mut self,
44        writer: &mut Writer,
45        type_id: std::any::TypeId,
46        type_resolver: &TypeResolver,
47    ) -> Result<(), Error> {
48        match self.type_id_index_map.get(&type_id) {
49            Some(&index) => {
50                // Reference to previously written type: (index << 1) | 1, LSB=1
51                writer.write_var_uint32(((index as u32) << 1) | 1);
52            }
53            None => {
54                // New type: index << 1, LSB=0, followed by TypeMeta bytes inline
55                let index = self.type_id_index_map.len();
56                writer.write_var_uint32((index as u32) << 1);
57                self.type_id_index_map.insert(type_id, index);
58                // Write TypeMeta bytes inline
59                let type_def = type_resolver.get_type_info(&type_id)?.get_type_def();
60                writer.write_bytes(&type_def);
61            }
62        }
63        Ok(())
64    }
65
66    #[inline(always)]
67    pub fn reset(&mut self) {
68        self.type_id_index_map.clear();
69    }
70}
71
72/// Streaming meta reader that reads TypeMeta inline during deserialization.
73/// Uses the streaming protocol:
74/// - (index << 1) | 0 for new type definition (followed by TypeMeta bytes)
75/// - (index << 1) | 1 for reference to previously read type
76#[derive(Default)]
77pub struct MetaReaderResolver {
78    pub reading_type_infos: Vec<Rc<TypeInfo>>,
79    parsed_type_infos: HashMap<i64, Rc<TypeInfo>>,
80    last_meta_header: i64,
81    last_type_info: Option<Rc<TypeInfo>>,
82}
83
84impl MetaReaderResolver {
85    #[inline(always)]
86    pub fn get(&self, index: usize) -> Option<&Rc<TypeInfo>> {
87        self.reading_type_infos.get(index)
88    }
89
90    /// Read type meta inline using streaming protocol.
91    /// Returns the TypeInfo for this type.
92    #[inline(always)]
93    pub fn read_type_meta(
94        &mut self,
95        reader: &mut Reader,
96        type_resolver: &TypeResolver,
97    ) -> Result<Rc<TypeInfo>, Error> {
98        let index_marker = reader.read_varuint32()?;
99        let is_ref = (index_marker & 1) == 1;
100        let index = (index_marker >> 1) as usize;
101
102        if is_ref {
103            // Reference to previously read type
104            self.reading_type_infos.get(index).cloned().ok_or_else(|| {
105                Error::type_error(format!("TypeInfo not found for type index: {}", index))
106            })
107        } else {
108            // New type - read TypeMeta inline
109            let meta_header = reader.read_i64()?;
110            if let Some(type_info) = self
111                .last_type_info
112                .as_ref()
113                .filter(|_| self.last_meta_header == meta_header)
114            {
115                self.reading_type_infos.push(type_info.clone());
116                TypeMeta::skip_bytes(reader, meta_header)?;
117                return Ok(type_info.clone());
118            }
119            if let Some(type_info) = self.parsed_type_infos.get(&meta_header) {
120                self.last_meta_header = meta_header;
121                self.last_type_info = Some(type_info.clone());
122                self.reading_type_infos.push(type_info.clone());
123                TypeMeta::skip_bytes(reader, meta_header)?;
124                Ok(type_info.clone())
125            } else {
126                let type_meta = Rc::new(TypeMeta::from_bytes_with_header(
127                    reader,
128                    type_resolver,
129                    meta_header,
130                )?);
131
132                // Try to find local type info
133                let namespace = &type_meta.get_namespace().original;
134                let type_name = &type_meta.get_type_name().original;
135                let register_by_name = !namespace.is_empty() || !type_name.is_empty();
136                let type_info = if register_by_name {
137                    // Registered by name (namespace can be empty)
138                    if let Some(local_type_info) =
139                        type_resolver.get_type_info_by_name(namespace, type_name)
140                    {
141                        // Use local harness with remote metadata
142                        Rc::new(TypeInfo::from_remote_meta(
143                            type_meta.clone(),
144                            Some(local_type_info.get_harness()),
145                            Some(local_type_info.get_type_id() as u32),
146                            Some(local_type_info.get_user_type_id()),
147                        ))
148                    } else {
149                        // No local type found, use stub harness
150                        Rc::new(TypeInfo::from_remote_meta(
151                            type_meta.clone(),
152                            None,
153                            None,
154                            None,
155                        ))
156                    }
157                } else {
158                    // Registered by ID
159                    let type_id = type_meta.get_type_id();
160                    let user_type_id = type_meta.get_user_type_id();
161                    if user_type_id != NO_USER_TYPE_ID {
162                        if let Some(local_type_info) =
163                            type_resolver.get_user_type_info_by_id(user_type_id)
164                        {
165                            // Use local harness with remote metadata
166                            Rc::new(TypeInfo::from_remote_meta(
167                                type_meta.clone(),
168                                Some(local_type_info.get_harness()),
169                                Some(local_type_info.get_type_id() as u32),
170                                Some(local_type_info.get_user_type_id()),
171                            ))
172                        } else {
173                            // No local type found, use stub harness
174                            Rc::new(TypeInfo::from_remote_meta(
175                                type_meta.clone(),
176                                None,
177                                None,
178                                None,
179                            ))
180                        }
181                    } else if let Some(local_type_info) = type_resolver.get_type_info_by_id(type_id)
182                    {
183                        // Use local harness with remote metadata
184                        Rc::new(TypeInfo::from_remote_meta(
185                            type_meta.clone(),
186                            Some(local_type_info.get_harness()),
187                            Some(local_type_info.get_type_id() as u32),
188                            Some(local_type_info.get_user_type_id()),
189                        ))
190                    } else {
191                        // No local type found, use stub harness
192                        Rc::new(TypeInfo::from_remote_meta(
193                            type_meta.clone(),
194                            None,
195                            None,
196                            None,
197                        ))
198                    }
199                };
200
201                if self.parsed_type_infos.len() < MAX_PARSED_NUM_TYPE_DEFS {
202                    // avoid malicious type defs to OOM parsed_type_infos
203                    self.parsed_type_infos
204                        .insert(meta_header, type_info.clone());
205                }
206                self.last_meta_header = meta_header;
207                self.last_type_info = Some(type_info.clone());
208                self.reading_type_infos.push(type_info.clone());
209                Ok(type_info)
210            }
211        }
212    }
213
214    #[inline(always)]
215    pub fn reset(&mut self) {
216        self.reading_type_infos.clear();
217    }
218}