couchbase_core/httpx/
raw_json_row_streamer.rs1use crate::httpx::decoder::{Decoder, Token};
20use crate::httpx::error::Error;
21use crate::httpx::error::Result as HttpxResult;
22use futures::{stream, FutureExt, Stream, TryStreamExt};
23use serde_json::Value;
24use std::cmp::{PartialEq, PartialOrd};
25use std::collections::HashMap;
26
/// Position of the streamer within the JSON response object.
///
/// The variants are declared in the order in which a response is consumed, so
/// the derived ordering (and the explicit discriminants) follow parsing progress.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
enum RowStreamState {
    /// Nothing has been read from the underlying decoder yet.
    Start = 0,
    /// The rows array has been opened and holds at least one unread row.
    Rows = 1,
    /// The rows array has been closed; further attributes may still follow.
    PostRows = 2,
    /// No further items will be produced by the streamer.
    End = 3,
}
34
35pub struct RawJsonRowStreamer {
36 stream: Decoder,
37 rows_attrib: String,
38 attribs: HashMap<String, Value>,
39 state: RowStreamState,
40}
41
/// A single item produced by [`RawJsonRowStreamer::next`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RawJsonRowItem {
    /// Raw bytes of one element of the rows array, exactly as decoded from the stream.
    Row(Vec<u8>),
    /// JSON serialization of the attributes collected from the response object;
    /// produced once the closing `}` of the object has been read.
    Metadata(Vec<u8>),
}
46
47impl RawJsonRowStreamer {
48 pub fn new(stream: Decoder, rows_attrib: impl Into<String>) -> Self {
49 Self {
50 stream,
51 rows_attrib: rows_attrib.into(),
52 attribs: HashMap::new(),
53 state: RowStreamState::Start,
54 }
55 }
56
57 async fn begin(&mut self) -> HttpxResult<()> {
58 if self.state != RowStreamState::Start {
59 return Err(Error::new_message_error(
60 "unexpected parsing state during begin",
61 ));
62 }
63
64 let first = self.stream.token().await?;
65
66 if first != Token::Delim('{') {
67 return Err(Error::new_message_error(
68 "expected an opening brace for the result",
69 ));
70 }
71
72 loop {
73 if !self.stream.more().await {
74 self.state = RowStreamState::End;
75 break;
76 }
77
78 let token = self.stream.token().await?;
79 let key = match token {
80 Token::String(s) => s,
81 _ => {
82 return Err(Error::new_message_error(
83 "expected a string key for the result",
84 ));
85 }
86 };
87
88 if key == self.rows_attrib.as_str() {
89 let token = self.stream.token().await?;
90 match token {
91 Token::Delim('[') => {
92 self.state = RowStreamState::Rows;
93 }
94 Token::Value(v) => {
95 if &v == b"null" {
96 continue;
97 }
98
99 return Err(Error::new_message_error(
100 "expected an opening bracket for the rows",
101 ));
102 }
103 _ => {
104 return Err(Error::new_message_error(
105 "expected an opening bracket for the rows",
106 ));
107 }
108 }
109
110 if self.stream.more().await {
111 self.state = RowStreamState::Rows;
112 break;
113 }
114
115 let token = match self.stream.token().await {
117 Ok(t) => t,
118 Err(e) => return Err(e),
119 };
120
121 match token {
122 Token::Delim(']') => {}
123 _ => {
124 return Err(Error::new_message_error(
125 "expected closing ] for the result",
126 ));
127 }
128 }
129
130 self.state = RowStreamState::PostRows;
131 continue;
132 }
133
134 let value = self.stream.decode().await?;
135 let value = serde_json::from_slice(&value)
136 .map_err(|e| Error::new_message_error(format!("failed to parse value: {e}")))?;
137
138 self.attribs.insert(key, value);
139 }
140
141 Ok(())
142 }
143
144 pub async fn has_more_rows(&mut self) -> bool {
145 if self.state != RowStreamState::Rows {
146 return false;
147 }
148
149 self.stream.more().await
150 }
151
152 pub async fn read_prelude(&mut self) -> HttpxResult<Vec<u8>> {
153 self.begin().await?;
154 serde_json::to_vec(&self.attribs)
155 .map_err(|e| Error::new_message_error(format!("failed to read prelude: {e}")))
156 }
157
158 pub fn epilog(&mut self) -> HttpxResult<Vec<u8>> {
159 serde_json::to_vec(&self.attribs)
160 .map_err(|e| Error::new_message_error(format!("failed to read epilogue: {e}")))
161 }
162
163 pub async fn next(&mut self) -> Option<HttpxResult<RawJsonRowItem>> {
164 if self.state == RowStreamState::End {
165 return None;
166 }
167
168 loop {
169 if self.state == RowStreamState::PostRows {
170 let token = match self.stream.token().await {
171 Ok(t) => t,
172 Err(e) => return Some(Err(e)),
173 };
174
175 let key = match token {
176 Token::String(s) => s,
177 Token::Delim('}') => {
178 self.state = RowStreamState::End;
179
180 let metadata = match serde_json::to_vec(&self.attribs).map_err(|e| {
181 Error::new_message_error(format!("failed to encode metadata: {e}"))
182 }) {
183 Ok(m) => m,
184 Err(e) => return Some(Err(e)),
185 };
186
187 return Some(Ok(RawJsonRowItem::Metadata(metadata)));
188 }
189 _ => {
190 return Some(Err(Error::new_message_error(
191 "expected a string key for the result",
192 )));
193 }
194 };
195
196 let value = match self.stream.decode().await {
197 Ok(v) => v,
198 Err(e) => return Some(Err(e)),
199 };
200
201 let value = match serde_json::from_slice::<Value>(&value) {
202 Ok(v) => v,
203 Err(e) => {
204 return Some(Err(Error::new_message_error(format!(
205 "failed to parse value: {e}"
206 ))))
207 }
208 };
209
210 self.attribs.insert(key, value);
211 continue;
212 }
213
214 let row = match self.stream.decode().await {
215 Ok(v) => v,
216 Err(e) => return Some(Err(e)),
217 };
218
219 if !self.stream.more().await {
220 let token = match self.stream.token().await {
221 Ok(t) => t,
222 Err(e) => return Some(Err(e)),
223 };
224
225 match token {
226 Token::Delim(']') => {}
227 _ => {
228 return Some(Err(Error::new_message_error(
229 "expected closing ] for the result",
230 )));
231 }
232 }
233
234 self.state = RowStreamState::PostRows;
235 }
236
237 return Some(Ok(RawJsonRowItem::Row(row)));
238 }
239 }
240
241 pub fn into_stream(self) -> impl Stream<Item = HttpxResult<RawJsonRowItem>> {
242 stream::unfold(self, |mut stream| async move {
243 stream.next().await.map(|row| (row, stream))
244 })
245 }
246}