// parquet_format_async_temp/thrift/protocol/stream.rs
use async_trait::async_trait;

use crate::thrift::{Error, ProtocolError, ProtocolErrorKind, Result};

use super::*;

24#[async_trait]
25pub trait TInputStreamProtocol: Send {
26 async fn read_message_begin(&mut self) -> Result<TMessageIdentifier>;
28 async fn read_message_end(&mut self) -> Result<()>;
30 async fn read_struct_begin(&mut self) -> Result<Option<TStructIdentifier>>;
32 async fn read_struct_end(&mut self) -> Result<()>;
34 async fn read_field_begin(&mut self) -> Result<TFieldIdentifier>;
36 async fn read_field_end(&mut self) -> Result<()>;
38 async fn read_bool(&mut self) -> Result<bool>;
40 async fn read_bytes(&mut self) -> Result<Vec<u8>>;
42 async fn read_i8(&mut self) -> Result<i8>;
44 async fn read_i16(&mut self) -> Result<i16>;
46 async fn read_i32(&mut self) -> Result<i32>;
48 async fn read_i64(&mut self) -> Result<i64>;
50 async fn read_double(&mut self) -> Result<f64>;
52 async fn read_string(&mut self) -> Result<String>;
54 async fn read_list_begin(&mut self) -> Result<TListIdentifier>;
56 async fn read_list_end(&mut self) -> Result<()>;
58 async fn read_set_begin(&mut self) -> Result<TSetIdentifier>;
60 async fn read_set_end(&mut self) -> Result<()>;
62 async fn read_map_begin(&mut self) -> Result<TMapIdentifier>;
64 async fn read_map_end(&mut self) -> Result<()>;
66
67 async fn skip(&mut self, field_type: TType) -> Result<()> {
70 self.skip_till_depth(field_type, MAXIMUM_SKIP_DEPTH).await
71 }
72
73 async fn skip_till_depth(&mut self, field_type: TType, depth: i8) -> Result<()> {
75 if depth == 0 {
76 return Err(Error::Protocol(ProtocolError {
77 kind: ProtocolErrorKind::DepthLimit,
78 message: format!("cannot parse past {:?}", field_type),
79 }));
80 }
81
82 match field_type {
83 TType::Bool => self.read_bool().await.map(|_| ()),
84 TType::I08 => self.read_i8().await.map(|_| ()),
85 TType::I16 => self.read_i16().await.map(|_| ()),
86 TType::I32 => self.read_i32().await.map(|_| ()),
87 TType::I64 => self.read_i64().await.map(|_| ()),
88 TType::Double => self.read_double().await.map(|_| ()),
89 TType::String => self.read_string().await.map(|_| ()),
90 TType::Struct => {
91 self.read_struct_begin().await?;
92 loop {
93 let field_ident = self.read_field_begin().await?;
94 if field_ident.field_type == TType::Stop {
95 break;
96 }
97 self.skip_till_depth(field_ident.field_type, depth - 1)
98 .await?;
99 }
100 self.read_struct_end().await
101 }
102 TType::List => {
103 let list_ident = self.read_list_begin().await?;
104 for _ in 0..list_ident.size {
105 self.skip_till_depth(list_ident.element_type, depth - 1)
106 .await?;
107 }
108 self.read_list_end().await
109 }
110 TType::Set => {
111 let set_ident = self.read_set_begin().await?;
112 for _ in 0..set_ident.size {
113 self.skip_till_depth(set_ident.element_type, depth - 1)
114 .await?;
115 }
116 self.read_set_end().await
117 }
118 TType::Map => {
119 let map_ident = self.read_map_begin().await?;
120 for _ in 0..map_ident.size {
121 let key_type = map_ident
122 .key_type
123 .expect("non-zero sized map should contain key type");
124 let val_type = map_ident
125 .value_type
126 .expect("non-zero sized map should contain value type");
127 self.skip_till_depth(key_type, depth - 1).await?;
128 self.skip_till_depth(val_type, depth - 1).await?;
129 }
130 self.read_map_end().await
131 }
132 u => Err(Error::Protocol(ProtocolError {
133 kind: ProtocolErrorKind::Unknown,
134 message: format!("cannot skip field type {:?}", &u),
135 })),
136 }
137 }
138
139 async fn read_byte(&mut self) -> Result<u8>;
146}
147
148#[async_trait]
149pub trait TOutputStreamProtocol: Send {
150 async fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> Result<usize>;
152 async fn write_message_end(&mut self) -> Result<usize>;
154 async fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> Result<usize>;
156 fn write_struct_end(&mut self) -> Result<usize>;
158 async fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> Result<usize>;
160 fn write_field_end(&mut self) -> Result<usize>;
162 async fn write_field_stop(&mut self) -> Result<usize>;
165 async fn write_bool(&mut self, b: bool) -> Result<usize>;
167 async fn write_bytes(&mut self, b: &[u8]) -> Result<usize>;
169 async fn write_i8(&mut self, i: i8) -> Result<usize>;
171 async fn write_i16(&mut self, i: i16) -> Result<usize>;
173 async fn write_i32(&mut self, i: i32) -> Result<usize>;
175 async fn write_i64(&mut self, i: i64) -> Result<usize>;
177 async fn write_double(&mut self, d: f64) -> Result<usize>;
179 async fn write_string(&mut self, s: &str) -> Result<usize>;
181 async fn write_list_begin(&mut self, identifier: &TListIdentifier) -> Result<usize>;
183 async fn write_list_end(&mut self) -> Result<usize>;
185 async fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> Result<usize>;
187 async fn write_set_end(&mut self) -> Result<usize>;
189 async fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> Result<usize>;
191 async fn write_map_end(&mut self) -> Result<usize>;
193 async fn flush(&mut self) -> Result<()>;
195
196 async fn write_byte(&mut self, b: u8) -> Result<usize>; }