1use crate::http_client::HttpClient;
7use crate::record::write_batched_records_v1::WriteBatchType;
8use crate::Record;
9use async_stream::stream;
10use futures_util::StreamExt;
11use reduct_base::batch::v2::{encode_entry_name, make_batched_header_name};
12use reduct_base::error::{ErrorCode, ReductError};
13use reqwest::header::{HeaderValue, CONTENT_LENGTH, CONTENT_TYPE};
14use reqwest::{Body, Method};
15use std::collections::{BTreeMap, HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19pub struct WriteRecordBatchBuilder {
21 bucket: String,
22 batch_type: WriteBatchType,
23 records: VecDeque<Record>,
24 client: Arc<HttpClient>,
25 last_access: SystemTime,
26}
27
28type FailedRecordMap = BTreeMap<(String, u64), ReductError>;
29
30impl WriteRecordBatchBuilder {
31 pub(crate) fn new(bucket: String, client: Arc<HttpClient>, batch_type: WriteBatchType) -> Self {
32 Self {
33 bucket,
34 batch_type,
35 records: VecDeque::new(),
36 client,
37 last_access: SystemTime::now(),
38 }
39 }
40
41 pub fn add_record(mut self, record: Record) -> Self {
51 self.records.push_back(record);
52 self.last_access = SystemTime::now();
53 self
54 }
55
56 pub fn append_record(&mut self, record: Record) {
62 self.records.push_back(record);
63 self.last_access = SystemTime::now();
64 }
65
66 pub fn add_records(mut self, records: Vec<Record>) -> Self {
76 self.records.extend(records);
77 self.last_access = SystemTime::now();
78 self
79 }
80
81 pub fn append_records(&mut self, records: Vec<Record>) {
88 self.records.extend(records);
89 self.last_access = SystemTime::now();
90 }
91
92 pub async fn send(mut self) -> Result<FailedRecordMap, ReductError> {
102 if let Some(version) = self.client.get_api_version().await {
103 if version.1 < 18 {
104 let message = match self.batch_type {
105 WriteBatchType::Write => {
106 "Multi-entry batch writes are not supported in API versions below v1.18"
107 }
108 WriteBatchType::Update => {
109 "Multi-entry batch updates are not supported in API versions below v1.18"
110 }
111 WriteBatchType::Remove => {
112 "Multi-entry batch remove is not supported in API versions below v1.18"
113 }
114 };
115 return Err(ReductError::new(ErrorCode::InvalidRequest, message));
116 }
117 }
118
119 if self.records.is_empty() {
120 return Err(ReductError::new(
121 ErrorCode::InvalidRequest,
122 "Batch must contain at least one record",
123 ));
124 }
125
126 let mut entries = Vec::new();
127 let mut entry_index = HashMap::new();
128 for record in &self.records {
129 if record.entry().is_empty() {
130 return Err(ReductError::new(
131 ErrorCode::InvalidRequest,
132 "Record entry name is required for multi-entry batch operations",
133 ));
134 }
135
136 if !entry_index.contains_key(record.entry()) {
137 let index = entries.len();
138 entries.push(record.entry().to_string());
139 entry_index.insert(record.entry().to_string(), index);
140 }
141 }
142
143 let mut records: Vec<Record> = self.records.drain(..).collect();
144 let start_ts = records
145 .iter()
146 .map(|record| record.timestamp_us())
147 .min()
148 .unwrap();
149
150 records.sort_by(|left, right| {
151 let left_idx = entry_index.get(left.entry()).unwrap();
152 let right_idx = entry_index.get(right.entry()).unwrap();
153 left_idx
154 .cmp(right_idx)
155 .then_with(|| left.timestamp_us().cmp(&right.timestamp_us()))
156 });
157
158 let mut request = match self.batch_type {
159 WriteBatchType::Write => self
160 .client
161 .request(Method::POST, &format!("/io/{}/write", self.bucket))
162 .header(
163 CONTENT_TYPE,
164 HeaderValue::from_static("application/octet-stream"),
165 )
166 .header(
167 CONTENT_LENGTH,
168 HeaderValue::from_str(
169 &records
170 .iter()
171 .map(|r| r.content_length())
172 .sum::<usize>()
173 .to_string(),
174 )
175 .unwrap(),
176 ),
177 WriteBatchType::Update => self
178 .client
179 .request(Method::PATCH, &format!("/io/{}/update", self.bucket))
180 .header(
181 CONTENT_TYPE,
182 HeaderValue::from_static("application/octet-stream"),
183 )
184 .header(CONTENT_LENGTH, HeaderValue::from_static("0")),
185 WriteBatchType::Remove => self
186 .client
187 .request(Method::DELETE, &format!("/io/{}/remove", self.bucket))
188 .header(CONTENT_LENGTH, HeaderValue::from_static("0")),
189 };
190
191 request = request
192 .header(
193 "x-reduct-start-ts",
194 HeaderValue::from_str(&start_ts.to_string()).unwrap(),
195 )
196 .header(
197 "x-reduct-entries",
198 HeaderValue::from_str(&encode_entries(&entries)).unwrap(),
199 );
200
201 for record in &records {
202 let idx = *entry_index.get(record.entry()).unwrap();
203 let delta = record.timestamp_us() - start_ts;
204 let value = match self.batch_type {
205 WriteBatchType::Write => make_record_header_value(record),
206 WriteBatchType::Update => make_update_header_value(record),
207 WriteBatchType::Remove => String::new(),
208 };
209 let header_value = if value.is_empty() {
210 HeaderValue::from_static("")
211 } else {
212 HeaderValue::from_str(&value).unwrap()
213 };
214 request = request.header(make_batched_header_name(idx, delta), header_value);
215 }
216
217 let response = match self.batch_type {
218 WriteBatchType::Write => {
219 let client = Arc::clone(&self.client);
220 let stream = stream! {
221 for record in records {
222 let mut stream = record.stream_bytes();
223 while let Some(bytes) = stream.next().await {
224 yield bytes;
225 }
226 }
227 };
228
229 client
230 .send_request(request.body(Body::wrap_stream(stream)))
231 .await?
232 }
233 WriteBatchType::Update | WriteBatchType::Remove => {
234 self.client.send_request(request).await?
235 }
236 };
237
238 let mut failed_records = FailedRecordMap::new();
239 response
240 .headers()
241 .iter()
242 .filter(|(key, _)| key.as_str().starts_with("x-reduct-error-"))
243 .for_each(|(key, value)| {
244 if let Some((entry_idx, delta)) = parse_error_key(key.as_str()) {
245 if let Some(entry) = entries.get(entry_idx) {
246 if let Some((status, message)) = value.to_str().unwrap().split_once(',') {
247 if let Ok(status) = status.parse::<i16>() {
248 if let Ok(code) = ErrorCode::try_from(status) {
249 failed_records.insert(
250 (entry.to_string(), start_ts + delta),
251 ReductError::new(code, message),
252 );
253 }
254 }
255 }
256 }
257 }
258 });
259
260 Ok(failed_records)
261 }
262
263 pub fn size(&self) -> usize {
265 self.records.iter().map(|r| r.content_length()).sum()
266 }
267
268 pub fn record_count(&self) -> usize {
270 self.records.len()
271 }
272
273 pub fn last_access(&self) -> SystemTime {
277 self.last_access
278 }
279
280 pub fn clear(&mut self) {
282 self.records.clear();
283 }
284}
285
286fn encode_entries(entries: &[String]) -> String {
287 entries
288 .iter()
289 .map(|entry| encode_entry_name(entry))
290 .collect::<Vec<_>>()
291 .join(",")
292}
293
294fn make_record_header_value(record: &Record) -> String {
295 let content_type = if record.content_type().is_empty() {
296 "application/octet-stream"
297 } else {
298 record.content_type()
299 };
300
301 let labels = record.labels();
302 if labels.is_empty() {
303 format!("{},{}", record.content_length(), content_type)
304 } else {
305 format!(
306 "{},{},{}",
307 record.content_length(),
308 content_type,
309 format_label_delta(labels)
310 )
311 }
312}
313
314fn make_update_header_value(record: &Record) -> String {
315 let labels = record.labels();
316 if labels.is_empty() {
317 "0,application/octet-stream".to_string()
318 } else {
319 format!("0,application/octet-stream,{}", format_label_delta(labels))
320 }
321}
322
323fn format_label_delta(labels: &crate::Labels) -> String {
324 let mut pairs: Vec<_> = labels.iter().collect();
325 pairs.sort_by(|(a, _), (b, _)| a.cmp(b));
326
327 pairs
328 .into_iter()
329 .map(|(key, value)| {
330 if value.contains(',') {
331 format!("{}=\"{}\"", key, value)
332 } else {
333 format!("{}={}", key, value)
334 }
335 })
336 .collect::<Vec<_>>()
337 .join(",")
338}
339
/// Splits an `x-reduct-error-<entry index>-<delta>` header name into its two
/// numeric parts.
///
/// Returns `None` in any of these cases: the prefix is missing, there is no `-`
/// after it, or either part fails to parse as an unsigned integer. The split
/// happens at the last `-`.
fn parse_error_key(key: &str) -> Option<(usize, u64)> {
    let rest = key.strip_prefix("x-reduct-error-")?;
    let split_at = rest.rfind('-')?;
    let (index_part, delta_part) = (&rest[..split_at], &rest[split_at + 1..]);
    match (index_part.parse::<usize>(), delta_part.parse::<u64>()) {
        (Ok(entry_idx), Ok(delta)) => Some((entry_idx, delta)),
        _ => None,
    }
}