parquet_format_async_temp/thrift/protocol/
stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use async_trait::async_trait;
19
20use crate::thrift::{Error, ProtocolError, ProtocolErrorKind, Result};
21
22use super::*;
23
24#[async_trait]
25pub trait TInputStreamProtocol: Send {
26    /// Read the beginning of a Thrift message.
27    async fn read_message_begin(&mut self) -> Result<TMessageIdentifier>;
28    /// Read the end of a Thrift message.
29    async fn read_message_end(&mut self) -> Result<()>;
30    /// Read the beginning of a Thrift struct.
31    async fn read_struct_begin(&mut self) -> Result<Option<TStructIdentifier>>;
32    /// Read the end of a Thrift struct.
33    async fn read_struct_end(&mut self) -> Result<()>;
34    /// Read the beginning of a Thrift struct field.
35    async fn read_field_begin(&mut self) -> Result<TFieldIdentifier>;
36    /// Read the end of a Thrift struct field.
37    async fn read_field_end(&mut self) -> Result<()>;
38    /// Read a bool.
39    async fn read_bool(&mut self) -> Result<bool>;
40    /// Read a fixed-length byte array.
41    async fn read_bytes(&mut self) -> Result<Vec<u8>>;
42    /// Read a word.
43    async fn read_i8(&mut self) -> Result<i8>;
44    /// Read a 16-bit signed integer.
45    async fn read_i16(&mut self) -> Result<i16>;
46    /// Read a 32-bit signed integer.
47    async fn read_i32(&mut self) -> Result<i32>;
48    /// Read a 64-bit signed integer.
49    async fn read_i64(&mut self) -> Result<i64>;
50    /// Read a 64-bit float.
51    async fn read_double(&mut self) -> Result<f64>;
52    /// Read a fixed-length string (not null terminated).
53    async fn read_string(&mut self) -> Result<String>;
54    /// Read the beginning of a list.
55    async fn read_list_begin(&mut self) -> Result<TListIdentifier>;
56    /// Read the end of a list.
57    async fn read_list_end(&mut self) -> Result<()>;
58    /// Read the beginning of a set.
59    async fn read_set_begin(&mut self) -> Result<TSetIdentifier>;
60    /// Read the end of a set.
61    async fn read_set_end(&mut self) -> Result<()>;
62    /// Read the beginning of a map.
63    async fn read_map_begin(&mut self) -> Result<TMapIdentifier>;
64    /// Read the end of a map.
65    async fn read_map_end(&mut self) -> Result<()>;
66
67    /// Skip a field with type `field_type` recursively until the default
68    /// maximum skip depth is reached.
69    async fn skip(&mut self, field_type: TType) -> Result<()> {
70        self.skip_till_depth(field_type, MAXIMUM_SKIP_DEPTH).await
71    }
72
73    /// Skip a field with type `field_type` recursively up to `depth` levels.
74    async fn skip_till_depth(&mut self, field_type: TType, depth: i8) -> Result<()> {
75        if depth == 0 {
76            return Err(Error::Protocol(ProtocolError {
77                kind: ProtocolErrorKind::DepthLimit,
78                message: format!("cannot parse past {:?}", field_type),
79            }));
80        }
81
82        match field_type {
83            TType::Bool => self.read_bool().await.map(|_| ()),
84            TType::I08 => self.read_i8().await.map(|_| ()),
85            TType::I16 => self.read_i16().await.map(|_| ()),
86            TType::I32 => self.read_i32().await.map(|_| ()),
87            TType::I64 => self.read_i64().await.map(|_| ()),
88            TType::Double => self.read_double().await.map(|_| ()),
89            TType::String => self.read_string().await.map(|_| ()),
90            TType::Struct => {
91                self.read_struct_begin().await?;
92                loop {
93                    let field_ident = self.read_field_begin().await?;
94                    if field_ident.field_type == TType::Stop {
95                        break;
96                    }
97                    self.skip_till_depth(field_ident.field_type, depth - 1)
98                        .await?;
99                }
100                self.read_struct_end().await
101            }
102            TType::List => {
103                let list_ident = self.read_list_begin().await?;
104                for _ in 0..list_ident.size {
105                    self.skip_till_depth(list_ident.element_type, depth - 1)
106                        .await?;
107                }
108                self.read_list_end().await
109            }
110            TType::Set => {
111                let set_ident = self.read_set_begin().await?;
112                for _ in 0..set_ident.size {
113                    self.skip_till_depth(set_ident.element_type, depth - 1)
114                        .await?;
115                }
116                self.read_set_end().await
117            }
118            TType::Map => {
119                let map_ident = self.read_map_begin().await?;
120                for _ in 0..map_ident.size {
121                    let key_type = map_ident
122                        .key_type
123                        .expect("non-zero sized map should contain key type");
124                    let val_type = map_ident
125                        .value_type
126                        .expect("non-zero sized map should contain value type");
127                    self.skip_till_depth(key_type, depth - 1).await?;
128                    self.skip_till_depth(val_type, depth - 1).await?;
129                }
130                self.read_map_end().await
131            }
132            u => Err(Error::Protocol(ProtocolError {
133                kind: ProtocolErrorKind::Unknown,
134                message: format!("cannot skip field type {:?}", &u),
135            })),
136        }
137    }
138
139    // utility (DO NOT USE IN GENERATED CODE!!!!)
140    //
141
142    /// Read an unsigned byte.
143    ///
144    /// This method should **never** be used in generated code.
145    async fn read_byte(&mut self) -> Result<u8>;
146}
147
148#[async_trait]
149pub trait TOutputStreamProtocol: Send {
150    /// Write the beginning of a Thrift message.
151    async fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> Result<usize>;
152    /// Write the end of a Thrift message.
153    async fn write_message_end(&mut self) -> Result<usize>;
154    /// Write the beginning of a Thrift struct.
155    async fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> Result<usize>;
156    /// Write the end of a Thrift struct.
157    fn write_struct_end(&mut self) -> Result<usize>;
158    /// Write the beginning of a Thrift field.
159    async fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> Result<usize>;
160    /// Write the end of a Thrift field.
161    fn write_field_end(&mut self) -> Result<usize>;
162    /// Write a STOP field indicating that all the fields in a struct have been
163    /// written.
164    async fn write_field_stop(&mut self) -> Result<usize>;
165    /// Write a bool.
166    async fn write_bool(&mut self, b: bool) -> Result<usize>;
167    /// Write a fixed-length byte array.
168    async fn write_bytes(&mut self, b: &[u8]) -> Result<usize>;
169    /// Write an 8-bit signed integer.
170    async fn write_i8(&mut self, i: i8) -> Result<usize>;
171    /// Write a 16-bit signed integer.
172    async fn write_i16(&mut self, i: i16) -> Result<usize>;
173    /// Write a 32-bit signed integer.
174    async fn write_i32(&mut self, i: i32) -> Result<usize>;
175    /// Write a 64-bit signed integer.
176    async fn write_i64(&mut self, i: i64) -> Result<usize>;
177    /// Write a 64-bit float.
178    async fn write_double(&mut self, d: f64) -> Result<usize>;
179    /// Write a fixed-length string.
180    async fn write_string(&mut self, s: &str) -> Result<usize>;
181    /// Write the beginning of a list.
182    async fn write_list_begin(&mut self, identifier: &TListIdentifier) -> Result<usize>;
183    /// Write the end of a list.
184    async fn write_list_end(&mut self) -> Result<usize>;
185    /// Write the beginning of a set.
186    async fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> Result<usize>;
187    /// Write the end of a set.
188    async fn write_set_end(&mut self) -> Result<usize>;
189    /// Write the beginning of a map.
190    async fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> Result<usize>;
191    /// Write the end of a map.
192    async fn write_map_end(&mut self) -> Result<usize>;
193    /// Flush buffered bytes to the underlying transport.
194    async fn flush(&mut self) -> Result<()>;
195
196    // utility (DO NOT USE IN GENERATED CODE!!!!)
197    //
198
199    /// Write an unsigned byte.
200    ///
201    /// This method should **never** be used in generated code.
202    async fn write_byte(&mut self, b: u8) -> Result<usize>; // FIXME: REMOVE
203}