1use crate::http_client::{map_error, HttpClient};
7use crate::record::{from_system_time, Record};
8use crate::RecordStream;
9use async_channel::{unbounded, Receiver};
10use async_stream::stream;
11use bytes::Bytes;
12use bytes::BytesMut;
13use futures::Stream;
14use futures_util::{pin_mut, StreamExt};
15use reduct_base::batch::{parse_batched_header, sort_headers_by_time, RecordHeader};
16use reduct_base::error::ErrorCode::Unknown;
17use reduct_base::error::{ErrorCode, IntEnum, ReductError};
18use reduct_base::msg::entry_api::{QueryEntry, QueryInfo, QueryType, RemoveQueryInfo};
19use reqwest::header::{HeaderMap, HeaderValue};
20use reqwest::Method;
21use serde_json::Value;
22use std::sync::Arc;
23use std::time::{Duration, SystemTime};
24
25pub struct QueryBuilder {
27 query: QueryEntry,
28
29 bucket: String,
30 entry: String,
31 client: Arc<HttpClient>,
32}
33
34impl QueryBuilder {
35 pub(crate) fn new(bucket: String, entry: String, client: Arc<HttpClient>) -> Self {
36 Self {
37 query: QueryEntry::default(),
38 bucket,
39 entry,
40 client,
41 }
42 }
43
44 pub fn start(mut self, time: SystemTime) -> Self {
46 self.query.start = Some(from_system_time(time));
47 self
48 }
49
50 pub fn start_us(mut self, time_us: u64) -> Self {
52 self.query.start = Some(time_us);
53 self
54 }
55
56 pub fn stop(mut self, time: SystemTime) -> Self {
58 self.query.stop = Some(from_system_time(time));
59 self
60 }
61
62 pub fn stop_us(mut self, time_us: u64) -> Self {
64 self.query.stop = Some(time_us);
65 self
66 }
67
68 pub fn when(mut self, condition: Value) -> Self {
70 self.query.when = Some(condition);
71 self
72 }
73
74 pub fn strict(mut self, strict: bool) -> Self {
78 self.query.strict = Some(strict);
79 self
80 }
81
82 pub fn ext(mut self, ext: Value) -> Self {
85 self.query.ext = Some(ext);
86 self
87 }
88
89 #[deprecated(
92 since = "1.15.0",
93 note = "Use `$each_t` operator in `when` condition. It will be removed in v1.18.0."
94 )]
95 pub fn each_s(mut self, each_s: f64) -> Self {
96 self.query.each_s = Some(each_s);
97 self
98 }
99
100 #[deprecated(
103 since = "1.15.0",
104 note = "Use `$each_n` operator in `when` condition. It will be removed in v1.18.0."
105 )]
106 pub fn each_n(mut self, each_n: u64) -> Self {
107 self.query.each_n = Some(each_n);
108 self
109 }
110
111 #[deprecated(
114 since = "1.15.0",
115 note = "Use `$limit` operator in `when` condition. It will be removed in v1.18.0."
116 )]
117 pub fn limit(mut self, limit: u64) -> Self {
118 self.query.limit = Some(limit);
119 self
120 }
121
122 pub fn ttl(mut self, ttl: Duration) -> Self {
124 self.query.ttl = Some(ttl.as_secs());
125 self
126 }
127
128 pub fn continuous(mut self) -> Self {
130 self.query.continuous = Some(true);
131 self
132 }
133
134 pub fn head_only(mut self, head_only: bool) -> Self {
137 self.query.only_metadata = Some(head_only);
138 self
139 }
140
141 pub async fn send(
143 mut self,
144 ) -> Result<impl Stream<Item = Result<Record, ReductError>>, ReductError> {
145 self.query.query_type = QueryType::Query;
147 let response = self
148 .client
149 .send_and_receive_json::<QueryEntry, QueryInfo>(
150 Method::POST,
151 &format!("/b/{}/{}/q", self.bucket, self.entry),
152 Some(self.query.clone()),
153 )
154 .await?;
155
156 let head_only = self.query.only_metadata.as_ref().unwrap_or(&false).clone();
157
158 Ok(stream! {
159 let mut last = false;
160 while !last {
161 let method = if head_only { Method::HEAD } else { Method::GET };
162 let request = self.client.request(method, &format!("/b/{}/{}/batch?q={}", self.bucket, self.entry, response.id));
163 let response = self.client.send_request(request).await?;
164
165 if response.status() == reqwest::StatusCode::NO_CONTENT {
166 break;
167 }
168
169 let headers = response.headers().clone();
170
171
172 let (tx, rx) = unbounded();
173 tokio::spawn(async move {
174 let mut stream = response.bytes_stream();
175 while let Some(bytes) = stream.next().await {
176 if let Err(_) = tx.send(bytes).await {
177 break;
178 }
179 }
180 });
181
182 let stream = parse_batched_records(headers, rx, head_only).await?;
183 pin_mut!(stream);
184 while let Some(record) = stream.next().await {
185 let record = record?;
186 last = record.1;
187 yield Ok(record.0);
188 }
189 }
190 })
191 }
192}
193
194pub struct RemoveQueryBuilder {
198 query: QueryEntry,
199
200 bucket: String,
201 entry: String,
202 client: Arc<HttpClient>,
203}
204
205impl RemoveQueryBuilder {
206 pub(crate) fn new(bucket: String, entry: String, client: Arc<HttpClient>) -> Self {
207 Self {
208 query: QueryEntry::default(),
209 bucket,
210 entry,
211 client,
212 }
213 }
214
215 pub fn start(mut self, time: SystemTime) -> Self {
217 self.query.start = Some(from_system_time(time));
218 self
219 }
220
221 pub fn start_us(mut self, time_us: u64) -> Self {
223 self.query.start = Some(time_us);
224 self
225 }
226
227 pub fn stop(mut self, time: SystemTime) -> Self {
229 self.query.stop = Some(from_system_time(time));
230 self
231 }
232
233 pub fn stop_us(mut self, time_us: u64) -> Self {
235 self.query.stop = Some(time_us);
236 self
237 }
238
239 pub fn when(mut self, condition: Value) -> Self {
243 self.query.when = Some(condition);
244 self
245 }
246
247 pub fn strict(mut self, strict: bool) -> Self {
251 self.query.strict = Some(strict);
252 self
253 }
254
255 #[deprecated(
258 since = "1.15.0",
259 note = "Use `$each_t` operator in `when` condition. It will be removed in v1.18.0."
260 )]
261 pub fn each_s(mut self, each_s: f64) -> Self {
262 self.query.each_s = Some(each_s);
263 self
264 }
265
266 #[deprecated(
269 since = "1.15.0",
270 note = "Use `$each_n` operator in `when` condition. It will be removed in v1.18.0."
271 )]
272 pub fn each_n(mut self, each_n: u64) -> Self {
273 self.query.each_n = Some(each_n);
274 self
275 }
276
277 pub async fn send(mut self) -> Result<u64, ReductError> {
285 self.query.query_type = QueryType::Remove;
286 let response = self
287 .client
288 .send_and_receive_json::<QueryEntry, RemoveQueryInfo>(
289 Method::POST,
290 &format!("/b/{}/{}/q", self.bucket, self.entry),
291 Some(self.query.clone()),
292 )
293 .await?;
294
295 Ok(response.removed_records)
296 }
297}
298
299async fn parse_batched_records(
300 headers: HeaderMap,
301 rx: Receiver<Result<Bytes, reqwest::Error>>,
302 head_only: bool,
303) -> Result<impl Stream<Item = Result<(Record, bool), ReductError>>, ReductError> {
304 let sorted_records = sort_headers_by_time(&headers)?;
306
307 let records_total = sorted_records.iter().count();
308 let mut records_count = 0;
309
310 let unwrap_byte = |bytes: Result<Bytes, reqwest::Error>| match bytes {
311 Ok(b) => Ok(b),
312 Err(err) => {
313 if let Some(status) = err.status() {
314 Err(ReductError::new(
315 ErrorCode::from_int(status.as_u16() as i16).unwrap_or(Unknown),
316 &err.to_string(),
317 ))
318 } else {
319 Err(map_error(err))
320 }
321 }
322 };
323
324 Ok(stream! {
325 let mut rest_data = BytesMut::new();
326
327 for (timestamp, value) in sorted_records {
328 let RecordHeader{content_length, content_type, labels} = parse_batched_header(value.to_str().unwrap()).unwrap();
329 let last = headers.get("x-reduct-last") == Some(&HeaderValue::from_str("true").unwrap());
330
331 records_count += 1;
332
333
334 let data: Option<RecordStream> = if head_only {
335 None
336 } else if records_count == records_total {
337 let first_chunk: Bytes = rest_data.clone().into();
339 let rx = rx.clone();
340
341 Some(Box::pin(stream! {
342 yield Ok(first_chunk);
343 while let Ok(bytes) = rx.recv().await {
344 yield unwrap_byte(bytes);
345 }
346
347 }))
348 } else {
349 let mut data = rest_data.clone();
354 while let Ok(bytes) = rx.recv().await {
355 data.extend_from_slice(&unwrap_byte(bytes)?);
356 if data.len() >= content_length as usize {
357 break;
358 }
359 }
360
361 rest_data = data.split_off(content_length as usize);
362 data.truncate(content_length as usize);
363
364 Some(Box::pin(stream! {
365 yield Ok(data.into());
366 }))
367 };
368
369 yield Ok((Record {
370 timestamp,
371 labels,
372 content_type,
373 content_length,
374 data
375 }, last));
376
377 }
378 })
379}