Skip to main content

reduct_rs/record/
write_batched_records_v2.rs

1// Copyright 2025 ReductStore
2// This Source Code Form is subject to the terms of the Mozilla Public
3//    License, v. 2.0. If a copy of the MPL was not distributed with this
4//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5
6use crate::http_client::HttpClient;
7use crate::record::write_batched_records_v1::WriteBatchType;
8use crate::Record;
9use async_stream::stream;
10use futures_util::StreamExt;
11use reduct_base::batch::v2::{encode_entry_name, make_batched_header_name};
12use reduct_base::error::{ErrorCode, ReductError};
13use reqwest::header::{HeaderValue, CONTENT_LENGTH, CONTENT_TYPE};
14use reqwest::{Body, Method};
15use std::collections::{BTreeMap, HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19/// Builder for writing multiple records across entries in a single request.
20pub struct WriteRecordBatchBuilder {
21    bucket: String,
22    batch_type: WriteBatchType,
23    records: VecDeque<Record>,
24    client: Arc<HttpClient>,
25    last_access: SystemTime,
26}
27
28type FailedRecordMap = BTreeMap<(String, u64), ReductError>;
29
30impl WriteRecordBatchBuilder {
31    pub(crate) fn new(bucket: String, client: Arc<HttpClient>, batch_type: WriteBatchType) -> Self {
32        Self {
33            bucket,
34            batch_type,
35            records: VecDeque::new(),
36            client,
37            last_access: SystemTime::now(),
38        }
39    }
40
41    /// Add a record to the batch.
42    ///
43    /// # Arguments
44    ///
45    /// * `record` - The record to add to the batch.
46    ///
47    /// # Returns
48    ///
49    /// Returns the builder for chaining.
50    pub fn add_record(mut self, record: Record) -> Self {
51        self.records.push_back(record);
52        self.last_access = SystemTime::now();
53        self
54    }
55
56    /// Add record to the batch without chaining.
57    ///
58    /// # Arguments
59    ///
60    /// * `record` - The record to append to the batch.
61    pub fn append_record(&mut self, record: Record) {
62        self.records.push_back(record);
63        self.last_access = SystemTime::now();
64    }
65
66    /// Add records to the batch.
67    ///
68    /// # Arguments
69    ///
70    /// * `records` - The records to add to the batch.
71    ///
72    /// # Returns
73    ///
74    /// Returns the builder for chaining.
75    pub fn add_records(mut self, records: Vec<Record>) -> Self {
76        self.records.extend(records);
77        self.last_access = SystemTime::now();
78        self
79    }
80
81    /// Add records to the batch without chaining.
82    ///
83    /// # Arguments
84    ///
85    /// * `records` - The records to append to the batch.
86    ///
87    pub fn append_records(&mut self, records: Vec<Record>) {
88        self.records.extend(records);
89        self.last_access = SystemTime::now();
90    }
91
92    /// Build the request and send it to the server.
93    ///
94    /// # Returns
95    ///
96    /// Returns a map of (entry, timestamp) to errors for any records that failed to write.
97    ///
98    /// # Errors
99    ///
100    /// * `ReductError` - If the request was not successful.
101    pub async fn send(mut self) -> Result<FailedRecordMap, ReductError> {
102        if let Some(version) = self.client.get_api_version().await {
103            if version.1 < 18 {
104                let message = match self.batch_type {
105                    WriteBatchType::Write => {
106                        "Multi-entry batch writes are not supported in API versions below v1.18"
107                    }
108                    WriteBatchType::Update => {
109                        "Multi-entry batch updates are not supported in API versions below v1.18"
110                    }
111                    WriteBatchType::Remove => {
112                        "Multi-entry batch remove is not supported in API versions below v1.18"
113                    }
114                };
115                return Err(ReductError::new(ErrorCode::InvalidRequest, message));
116            }
117        }
118
119        if self.records.is_empty() {
120            return Err(ReductError::new(
121                ErrorCode::InvalidRequest,
122                "Batch must contain at least one record",
123            ));
124        }
125
126        let mut entries = Vec::new();
127        let mut entry_index = HashMap::new();
128        for record in &self.records {
129            if record.entry().is_empty() {
130                return Err(ReductError::new(
131                    ErrorCode::InvalidRequest,
132                    "Record entry name is required for multi-entry batch operations",
133                ));
134            }
135
136            if !entry_index.contains_key(record.entry()) {
137                let index = entries.len();
138                entries.push(record.entry().to_string());
139                entry_index.insert(record.entry().to_string(), index);
140            }
141        }
142
143        let mut records: Vec<Record> = self.records.drain(..).collect();
144        let start_ts = records
145            .iter()
146            .map(|record| record.timestamp_us())
147            .min()
148            .unwrap();
149
150        records.sort_by(|left, right| {
151            let left_idx = entry_index.get(left.entry()).unwrap();
152            let right_idx = entry_index.get(right.entry()).unwrap();
153            left_idx
154                .cmp(right_idx)
155                .then_with(|| left.timestamp_us().cmp(&right.timestamp_us()))
156        });
157
158        let mut request = match self.batch_type {
159            WriteBatchType::Write => self
160                .client
161                .request(Method::POST, &format!("/io/{}/write", self.bucket))
162                .header(
163                    CONTENT_TYPE,
164                    HeaderValue::from_static("application/octet-stream"),
165                )
166                .header(
167                    CONTENT_LENGTH,
168                    HeaderValue::from_str(
169                        &records
170                            .iter()
171                            .map(|r| r.content_length())
172                            .sum::<usize>()
173                            .to_string(),
174                    )
175                    .unwrap(),
176                ),
177            WriteBatchType::Update => self
178                .client
179                .request(Method::PATCH, &format!("/io/{}/update", self.bucket))
180                .header(
181                    CONTENT_TYPE,
182                    HeaderValue::from_static("application/octet-stream"),
183                )
184                .header(CONTENT_LENGTH, HeaderValue::from_static("0")),
185            WriteBatchType::Remove => self
186                .client
187                .request(Method::DELETE, &format!("/io/{}/remove", self.bucket))
188                .header(CONTENT_LENGTH, HeaderValue::from_static("0")),
189        };
190
191        request = request
192            .header(
193                "x-reduct-start-ts",
194                HeaderValue::from_str(&start_ts.to_string()).unwrap(),
195            )
196            .header(
197                "x-reduct-entries",
198                HeaderValue::from_str(&encode_entries(&entries)).unwrap(),
199            );
200
201        for record in &records {
202            let idx = *entry_index.get(record.entry()).unwrap();
203            let delta = record.timestamp_us() - start_ts;
204            let value = match self.batch_type {
205                WriteBatchType::Write => make_record_header_value(record),
206                WriteBatchType::Update => make_update_header_value(record),
207                WriteBatchType::Remove => String::new(),
208            };
209            let header_value = if value.is_empty() {
210                HeaderValue::from_static("")
211            } else {
212                HeaderValue::from_str(&value).unwrap()
213            };
214            request = request.header(make_batched_header_name(idx, delta), header_value);
215        }
216
217        let response = match self.batch_type {
218            WriteBatchType::Write => {
219                let client = Arc::clone(&self.client);
220                let stream = stream! {
221                    for record in records {
222                        let mut stream = record.stream_bytes();
223                        while let Some(bytes) = stream.next().await {
224                            yield bytes;
225                        }
226                    }
227                };
228
229                client
230                    .send_request(request.body(Body::wrap_stream(stream)))
231                    .await?
232            }
233            WriteBatchType::Update | WriteBatchType::Remove => {
234                self.client.send_request(request).await?
235            }
236        };
237
238        let mut failed_records = FailedRecordMap::new();
239        response
240            .headers()
241            .iter()
242            .filter(|(key, _)| key.as_str().starts_with("x-reduct-error-"))
243            .for_each(|(key, value)| {
244                if let Some((entry_idx, delta)) = parse_error_key(key.as_str()) {
245                    if let Some(entry) = entries.get(entry_idx) {
246                        if let Some((status, message)) = value.to_str().unwrap().split_once(',') {
247                            if let Ok(status) = status.parse::<i16>() {
248                                if let Ok(code) = ErrorCode::try_from(status) {
249                                    failed_records.insert(
250                                        (entry.to_string(), start_ts + delta),
251                                        ReductError::new(code, message),
252                                    );
253                                }
254                            }
255                        }
256                    }
257                }
258            });
259
260        Ok(failed_records)
261    }
262
263    /// Get the size of the batch in bytes.
264    pub fn size(&self) -> usize {
265        self.records.iter().map(|r| r.content_length()).sum()
266    }
267
268    /// Get the number of records in the batch.
269    pub fn record_count(&self) -> usize {
270        self.records.len()
271    }
272
273    /// Get the last time a record was added to the batch.
274    ///
275    /// Can be used for sending the batch after a certain period of time.
276    pub fn last_access(&self) -> SystemTime {
277        self.last_access
278    }
279
280    /// Clear the batch of records.
281    pub fn clear(&mut self) {
282        self.records.clear();
283    }
284}
285
286fn encode_entries(entries: &[String]) -> String {
287    entries
288        .iter()
289        .map(|entry| encode_entry_name(entry))
290        .collect::<Vec<_>>()
291        .join(",")
292}
293
294fn make_record_header_value(record: &Record) -> String {
295    let content_type = if record.content_type().is_empty() {
296        "application/octet-stream"
297    } else {
298        record.content_type()
299    };
300
301    let labels = record.labels();
302    if labels.is_empty() {
303        format!("{},{}", record.content_length(), content_type)
304    } else {
305        format!(
306            "{},{},{}",
307            record.content_length(),
308            content_type,
309            format_label_delta(labels)
310        )
311    }
312}
313
314fn make_update_header_value(record: &Record) -> String {
315    let labels = record.labels();
316    if labels.is_empty() {
317        "0,application/octet-stream".to_string()
318    } else {
319        format!("0,application/octet-stream,{}", format_label_delta(labels))
320    }
321}
322
323fn format_label_delta(labels: &crate::Labels) -> String {
324    let mut pairs: Vec<_> = labels.iter().collect();
325    pairs.sort_by(|(a, _), (b, _)| a.cmp(b));
326
327    pairs
328        .into_iter()
329        .map(|(key, value)| {
330            if value.contains(',') {
331                format!("{}=\"{}\"", key, value)
332            } else {
333                format!("{}={}", key, value)
334            }
335        })
336        .collect::<Vec<_>>()
337        .join(",")
338}
339
/// Parse an `x-reduct-error-<entry index>-<delta>` header name.
///
/// Returns `(entry index, timestamp delta)`, or `None` if the prefix is
/// missing, there is no `-` separator, or either part is not a number.
fn parse_error_key(key: &str) -> Option<(usize, u64)> {
    const PREFIX: &str = "x-reduct-error-";

    let rest = key.strip_prefix(PREFIX)?;
    let separator = rest.rfind('-')?;
    let index_part = &rest[..separator];
    let delta_part = &rest[separator + 1..];

    let entry_index = index_part.parse::<usize>().ok()?;
    let delta = delta_part.parse::<u64>().ok()?;
    Some((entry_index, delta))
}