Skip to main content

couchbase_core/httpx/
raw_json_row_streamer.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx::decoder::{Decoder, Token};
20use crate::httpx::error::Error;
21use crate::httpx::error::Result as HttpxResult;
22use futures::{stream, FutureExt, Stream, TryStreamExt};
23use serde_json::Value;
24use std::cmp::{PartialEq, PartialOrd};
25use std::collections::HashMap;
26
27#[derive(PartialEq, Eq, PartialOrd, Debug)]
28enum RowStreamState {
29    Start = 0,
30    Rows = 1,
31    PostRows = 2,
32    End = 3,
33}
34
35pub struct RawJsonRowStreamer {
36    stream: Decoder,
37    rows_attrib: String,
38    attribs: HashMap<String, Value>,
39    state: RowStreamState,
40}
41
42pub enum RawJsonRowItem {
43    Row(Vec<u8>),
44    Metadata(Vec<u8>),
45}
46
47impl RawJsonRowStreamer {
48    pub fn new(stream: Decoder, rows_attrib: impl Into<String>) -> Self {
49        Self {
50            stream,
51            rows_attrib: rows_attrib.into(),
52            attribs: HashMap::new(),
53            state: RowStreamState::Start,
54        }
55    }
56
57    async fn begin(&mut self) -> HttpxResult<()> {
58        if self.state != RowStreamState::Start {
59            return Err(Error::new_message_error(
60                "unexpected parsing state during begin",
61            ));
62        }
63
64        let first = self.stream.token().await?;
65
66        if first != Token::Delim('{') {
67            return Err(Error::new_message_error(
68                "expected an opening brace for the result",
69            ));
70        }
71
72        loop {
73            if !self.stream.more().await {
74                self.state = RowStreamState::End;
75                break;
76            }
77
78            let token = self.stream.token().await?;
79            let key = match token {
80                Token::String(s) => s,
81                _ => {
82                    return Err(Error::new_message_error(
83                        "expected a string key for the result",
84                    ));
85                }
86            };
87
88            if key == self.rows_attrib.as_str() {
89                let token = self.stream.token().await?;
90                match token {
91                    Token::Delim('[') => {
92                        self.state = RowStreamState::Rows;
93                    }
94                    Token::Value(v) => {
95                        if &v == b"null" {
96                            continue;
97                        }
98
99                        return Err(Error::new_message_error(
100                            "expected an opening bracket for the rows",
101                        ));
102                    }
103                    _ => {
104                        return Err(Error::new_message_error(
105                            "expected an opening bracket for the rows",
106                        ));
107                    }
108                }
109
110                if self.stream.more().await {
111                    self.state = RowStreamState::Rows;
112                    break;
113                }
114
115                // There are no rows so we can just read the remaining metadata now.
116                let token = match self.stream.token().await {
117                    Ok(t) => t,
118                    Err(e) => return Err(e),
119                };
120
121                match token {
122                    Token::Delim(']') => {}
123                    _ => {
124                        return Err(Error::new_message_error(
125                            "expected closing ] for the result",
126                        ));
127                    }
128                }
129
130                self.state = RowStreamState::PostRows;
131                continue;
132            }
133
134            let value = self.stream.decode().await?;
135            let value = serde_json::from_slice(&value)
136                .map_err(|e| Error::new_message_error(format!("failed to parse value: {e}")))?;
137
138            self.attribs.insert(key, value);
139        }
140
141        Ok(())
142    }
143
144    pub async fn has_more_rows(&mut self) -> bool {
145        if self.state != RowStreamState::Rows {
146            return false;
147        }
148
149        self.stream.more().await
150    }
151
152    pub async fn read_prelude(&mut self) -> HttpxResult<Vec<u8>> {
153        self.begin().await?;
154        serde_json::to_vec(&self.attribs)
155            .map_err(|e| Error::new_message_error(format!("failed to read prelude: {e}")))
156    }
157
158    pub fn epilog(&mut self) -> HttpxResult<Vec<u8>> {
159        serde_json::to_vec(&self.attribs)
160            .map_err(|e| Error::new_message_error(format!("failed to read epilogue: {e}")))
161    }
162
163    pub async fn next(&mut self) -> Option<HttpxResult<RawJsonRowItem>> {
164        if self.state == RowStreamState::End {
165            return None;
166        }
167
168        loop {
169            if self.state == RowStreamState::PostRows {
170                let token = match self.stream.token().await {
171                    Ok(t) => t,
172                    Err(e) => return Some(Err(e)),
173                };
174
175                let key = match token {
176                    Token::String(s) => s,
177                    Token::Delim('}') => {
178                        self.state = RowStreamState::End;
179
180                        let metadata = match serde_json::to_vec(&self.attribs).map_err(|e| {
181                            Error::new_message_error(format!("failed to encode metadata: {e}"))
182                        }) {
183                            Ok(m) => m,
184                            Err(e) => return Some(Err(e)),
185                        };
186
187                        return Some(Ok(RawJsonRowItem::Metadata(metadata)));
188                    }
189                    _ => {
190                        return Some(Err(Error::new_message_error(
191                            "expected a string key for the result",
192                        )));
193                    }
194                };
195
196                let value = match self.stream.decode().await {
197                    Ok(v) => v,
198                    Err(e) => return Some(Err(e)),
199                };
200
201                let value = match serde_json::from_slice::<Value>(&value) {
202                    Ok(v) => v,
203                    Err(e) => {
204                        return Some(Err(Error::new_message_error(format!(
205                            "failed to parse value: {e}"
206                        ))))
207                    }
208                };
209
210                self.attribs.insert(key, value);
211                continue;
212            }
213
214            let row = match self.stream.decode().await {
215                Ok(v) => v,
216                Err(e) => return Some(Err(e)),
217            };
218
219            if !self.stream.more().await {
220                let token = match self.stream.token().await {
221                    Ok(t) => t,
222                    Err(e) => return Some(Err(e)),
223                };
224
225                match token {
226                    Token::Delim(']') => {}
227                    _ => {
228                        return Some(Err(Error::new_message_error(
229                            "expected closing ] for the result",
230                        )));
231                    }
232                }
233
234                self.state = RowStreamState::PostRows;
235            }
236
237            return Some(Ok(RawJsonRowItem::Row(row)));
238        }
239    }
240
241    pub fn into_stream(self) -> impl Stream<Item = HttpxResult<RawJsonRowItem>> {
242        stream::unfold(self, |mut stream| async move {
243            stream.next().await.map(|row| (row, stream))
244        })
245    }
246}