// reduct_rs/record/query.rs

// Copyright 2023-2025 ReductStore
// This Source Code Form is subject to the terms of the Mozilla Public
//    License, v. 2.0. If a copy of the MPL was not distributed with this
//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5
6use crate::http_client::{map_error, HttpClient};
7use crate::record::{from_system_time, Record};
8use crate::RecordStream;
9use async_channel::{unbounded, Receiver};
10use async_stream::stream;
11use bytes::Bytes;
12use bytes::BytesMut;
13use futures::Stream;
14use futures_util::{pin_mut, StreamExt};
15use reduct_base::batch::{parse_batched_header, sort_headers_by_time, RecordHeader};
16use reduct_base::error::ErrorCode::Unknown;
17use reduct_base::error::{ErrorCode, IntEnum, ReductError};
18use reduct_base::msg::entry_api::{QueryEntry, QueryInfo, QueryType, RemoveQueryInfo};
19use reqwest::header::{HeaderMap, HeaderValue};
20use reqwest::Method;
21use serde_json::Value;
22use std::sync::Arc;
23use std::time::{Duration, SystemTime};
24
/// Builder for a query request.
pub struct QueryBuilder {
    // Accumulated query parameters sent to the server.
    query: QueryEntry,

    // Target bucket name.
    bucket: String,
    // Target entry name inside the bucket.
    entry: String,
    // Shared HTTP client used to send the query.
    client: Arc<HttpClient>,
}
33
34impl QueryBuilder {
35    pub(crate) fn new(bucket: String, entry: String, client: Arc<HttpClient>) -> Self {
36        Self {
37            query: QueryEntry::default(),
38            bucket,
39            entry,
40            client,
41        }
42    }
43
44    /// Set the start time of the query.
45    pub fn start(mut self, time: SystemTime) -> Self {
46        self.query.start = Some(from_system_time(time));
47        self
48    }
49
50    /// Set the start time of the query as a unix timestamp in microseconds.
51    pub fn start_us(mut self, time_us: u64) -> Self {
52        self.query.start = Some(time_us);
53        self
54    }
55
56    /// Set the end time of the query.
57    pub fn stop(mut self, time: SystemTime) -> Self {
58        self.query.stop = Some(from_system_time(time));
59        self
60    }
61
62    /// Set the end time of the query as a unix timestamp in microseconds.
63    pub fn stop_us(mut self, time_us: u64) -> Self {
64        self.query.stop = Some(time_us);
65        self
66    }
67
68    /// Set the condition for the query.
69    pub fn when(mut self, condition: Value) -> Self {
70        self.query.when = Some(condition);
71        self
72    }
73
74    /// Set the query to be strict.
75    /// If the query is strict, the query will return an error if any of the conditions are invalid.
76    /// default: false
77    pub fn strict(mut self, strict: bool) -> Self {
78        self.query.strict = Some(strict);
79        self
80    }
81
82    /// Set extension parameters for the query.
83    /// This is a JSON object that will be passed to extensions on the server side.
84    pub fn ext(mut self, ext: Value) -> Self {
85        self.query.ext = Some(ext);
86        self
87    }
88
89    /// Set S, to return a record every S seconds.
90    /// default: return all records
91    #[deprecated(
92        since = "1.15.0",
93        note = "Use `$each_t` operator in `when` condition. It will be removed in v1.18.0."
94    )]
95    pub fn each_s(mut self, each_s: f64) -> Self {
96        self.query.each_s = Some(each_s);
97        self
98    }
99
100    /// Set N, to return every N records.
101    /// default: return all records
102    #[deprecated(
103        since = "1.15.0",
104        note = "Use `$each_n` operator in `when` condition. It will be removed in v1.18.0."
105    )]
106    pub fn each_n(mut self, each_n: u64) -> Self {
107        self.query.each_n = Some(each_n);
108        self
109    }
110
111    /// Set a limit for the query.
112    /// default: unlimited
113    #[deprecated(
114        since = "1.15.0",
115        note = "Use `$limit` operator in `when` condition. It will be removed in v1.18.0."
116    )]
117    pub fn limit(mut self, limit: u64) -> Self {
118        self.query.limit = Some(limit);
119        self
120    }
121
122    /// Set TTL for the query.
123    pub fn ttl(mut self, ttl: Duration) -> Self {
124        self.query.ttl = Some(ttl.as_secs());
125        self
126    }
127
128    /// Set the query to be continuous.
129    pub fn continuous(mut self) -> Self {
130        self.query.continuous = Some(true);
131        self
132    }
133
134    /// Set the query to head only.
135    /// default: false
136    pub fn head_only(mut self, head_only: bool) -> Self {
137        self.query.only_metadata = Some(head_only);
138        self
139    }
140
141    /// Send the query request.
142    pub async fn send(
143        mut self,
144    ) -> Result<impl Stream<Item = Result<Record, ReductError>>, ReductError> {
145        // use new POST API for new features
146        self.query.query_type = QueryType::Query;
147        let response = self
148            .client
149            .send_and_receive_json::<QueryEntry, QueryInfo>(
150                Method::POST,
151                &format!("/b/{}/{}/q", self.bucket, self.entry),
152                Some(self.query.clone()),
153            )
154            .await?;
155
156        let head_only = self.query.only_metadata.as_ref().unwrap_or(&false).clone();
157
158        Ok(stream! {
159            let mut last = false;
160            while !last {
161                let method = if head_only { Method::HEAD } else { Method::GET };
162                let request = self.client.request(method, &format!("/b/{}/{}/batch?q={}", self.bucket, self.entry, response.id));
163                let response = self.client.send_request(request).await?;
164
165                if response.status() == reqwest::StatusCode::NO_CONTENT {
166                    break;
167                }
168
169                let headers = response.headers().clone();
170
171
172                let (tx, rx) = unbounded();
173                tokio::spawn(async move {
174                    let mut stream = response.bytes_stream();
175                    while let Some(bytes) = stream.next().await {
176                        if let Err(_) = tx.send(bytes).await {
177                            break;
178                        }
179                    }
180                });
181
182                let stream = parse_batched_records(headers, rx, head_only).await?;
183                pin_mut!(stream);
184                while let Some(record) = stream.next().await {
185                    let record = record?;
186                    last = record.1;
187                    yield Ok(record.0);
188                }
189            }
190        })
191    }
192}
193
/// Builder for a remove query request.
pub struct RemoveQueryBuilder {
    // Accumulated query parameters sent to the server.
    query: QueryEntry,

    // Target bucket name.
    bucket: String,
    // Target entry name inside the bucket.
    entry: String,
    // Shared HTTP client used to send the query.
    client: Arc<HttpClient>,
}
204
205impl RemoveQueryBuilder {
206    pub(crate) fn new(bucket: String, entry: String, client: Arc<HttpClient>) -> Self {
207        Self {
208            query: QueryEntry::default(),
209            bucket,
210            entry,
211            client,
212        }
213    }
214
215    /// Set the start time of the query.
216    pub fn start(mut self, time: SystemTime) -> Self {
217        self.query.start = Some(from_system_time(time));
218        self
219    }
220
221    /// Set the start time of the query as a unix timestamp in microseconds.
222    pub fn start_us(mut self, time_us: u64) -> Self {
223        self.query.start = Some(time_us);
224        self
225    }
226
227    /// Set the end time of the query.
228    pub fn stop(mut self, time: SystemTime) -> Self {
229        self.query.stop = Some(from_system_time(time));
230        self
231    }
232
233    /// Set the end time of the query as a unix timestamp in microseconds.
234    pub fn stop_us(mut self, time_us: u64) -> Self {
235        self.query.stop = Some(time_us);
236        self
237    }
238
239    /// Set the condition for the query.
240    /// This will remove all records that match the condition.
241    /// This is a destructive operation.
242    pub fn when(mut self, condition: Value) -> Self {
243        self.query.when = Some(condition);
244        self
245    }
246
247    /// Set the query to be strict.
248    /// If the query is strict, the query will return an error if any of the conditions are invalid.
249    /// default: false
250    pub fn strict(mut self, strict: bool) -> Self {
251        self.query.strict = Some(strict);
252        self
253    }
254
255    /// Set S, to return a record every S seconds.
256    /// default: return all records
257    #[deprecated(
258        since = "1.15.0",
259        note = "Use `$each_t` operator in `when` condition. It will be removed in v1.18.0."
260    )]
261    pub fn each_s(mut self, each_s: f64) -> Self {
262        self.query.each_s = Some(each_s);
263        self
264    }
265
266    /// Set N, to return every N records.
267    /// default: return all records
268    #[deprecated(
269        since = "1.15.0",
270        note = "Use `$each_n` operator in `when` condition. It will be removed in v1.18.0."
271    )]
272    pub fn each_n(mut self, each_n: u64) -> Self {
273        self.query.each_n = Some(each_n);
274        self
275    }
276
277    /// Send the remove query request.
278    /// This will remove all records that match the query.
279    /// This is a destructive operation.
280    ///
281    /// # Returns
282    ///
283    /// * `Result<u64, ReductError>` - The number of records removed.
284    pub async fn send(mut self) -> Result<u64, ReductError> {
285        self.query.query_type = QueryType::Remove;
286        let response = self
287            .client
288            .send_and_receive_json::<QueryEntry, RemoveQueryInfo>(
289                Method::POST,
290                &format!("/b/{}/{}/q", self.bucket, self.entry),
291                Some(self.query.clone()),
292            )
293            .await?;
294
295        Ok(response.removed_records)
296    }
297}
298
299async fn parse_batched_records(
300    headers: HeaderMap,
301    rx: Receiver<Result<Bytes, reqwest::Error>>,
302    head_only: bool,
303) -> Result<impl Stream<Item = Result<(Record, bool), ReductError>>, ReductError> {
304    //sort headers by names
305    let sorted_records = sort_headers_by_time(&headers)?;
306
307    let records_total = sorted_records.iter().count();
308    let mut records_count = 0;
309
310    let unwrap_byte = |bytes: Result<Bytes, reqwest::Error>| match bytes {
311        Ok(b) => Ok(b),
312        Err(err) => {
313            if let Some(status) = err.status() {
314                Err(ReductError::new(
315                    ErrorCode::from_int(status.as_u16() as i16).unwrap_or(Unknown),
316                    &err.to_string(),
317                ))
318            } else {
319                Err(map_error(err))
320            }
321        }
322    };
323
324    Ok(stream! {
325        let mut rest_data = BytesMut::new();
326
327        for (timestamp, value) in sorted_records {
328                let RecordHeader{content_length, content_type, labels} = parse_batched_header(value.to_str().unwrap()).unwrap();
329                let last =  headers.get("x-reduct-last") == Some(&HeaderValue::from_str("true").unwrap());
330
331                records_count += 1;
332
333
334                let data: Option<RecordStream> = if head_only {
335                    None
336                } else if records_count == records_total {
337                    // last record in batched records read in client code
338                    let first_chunk: Bytes = rest_data.clone().into();
339                    let rx = rx.clone();
340
341                    Some(Box::pin(stream! {
342                        yield Ok(first_chunk);
343                        while let Ok(bytes) = rx.recv().await {
344                            yield unwrap_byte(bytes);
345                        }
346
347                    }))
348                } else {
349                    // batched records must be read in order, so it is safe to read them here
350                    // instead of reading them in the use code with an async interator.
351                    // The batched records are small if they are not the last.
352                    // The last batched record is read in the async generator in chunks.
353                    let mut data = rest_data.clone();
354                    while let Ok(bytes) = rx.recv().await {
355                        data.extend_from_slice(&unwrap_byte(bytes)?);
356                        if data.len() >= content_length  as usize {
357                            break;
358                        }
359                    }
360
361                    rest_data = data.split_off(content_length as usize);
362                    data.truncate(content_length as usize);
363
364                    Some(Box::pin(stream! {
365                        yield Ok(data.into());
366                    }))
367                };
368
369                yield Ok((Record {
370                    timestamp,
371                    labels,
372                    content_type,
373                    content_length,
374                    data
375                }, last));
376
377        }
378    })
379}