reduct_rs/record/
write_record.rs

1// Copyright 2023 ReductStore
2// This Source Code Form is subject to the terms of the Mozilla Public
3//    License, v. 2.0. If a copy of the MPL was not distributed with this
4//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5
6use crate::http_client::HttpClient;
7use crate::record::{from_system_time, Labels};
8use bytes::Bytes;
9
10use futures::TryStream;
11
12use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
13use reqwest::{Body, Method};
14
15use reduct_base::error::{ErrorCode, ReductError};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19/// Builder for a write record request.
20pub struct WriteRecordBuilder {
21    bucket: String,
22    entry: String,
23    timestamp: Option<u64>,
24    labels: Labels,
25    content_type: String,
26    content_length: Option<u64>,
27    data: Option<Body>,
28    client: Arc<HttpClient>,
29}
30
31impl WriteRecordBuilder {
32    pub(crate) fn new(bucket: String, entry: String, client: Arc<HttpClient>) -> Self {
33        Self {
34            timestamp: None,
35            labels: Labels::new(),
36            content_type: "application/octet-stream".to_string(),
37            content_length: None,
38            data: None,
39            bucket,
40            entry,
41            client,
42        }
43    }
44
45    /// Set the timestamp of the record to write.
46    pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
47        self.timestamp = Some(from_system_time(timestamp));
48        self
49    }
50
51    /// Set the timestamp of the record to write as a unix timestamp in microseconds.
52    pub fn timestamp_us(mut self, timestamp: u64) -> Self {
53        self.timestamp = Some(timestamp);
54        self
55    }
56
57    /// Set the labels of the record to write.
58    pub fn labels(mut self, labels: Labels) -> Self {
59        self.labels = labels;
60        self
61    }
62
63    /// Add a label to the record to write.
64    pub fn add_label<Str1, Str2>(mut self, key: Str1, value: Str2) -> Self
65    where
66        Str1: ToString,
67        Str2: ToString,
68    {
69        self.labels.insert(key.to_string(), value.to_string());
70        self
71    }
72
73    /// Set the content type of the record to write.
74    pub fn content_type<Str>(mut self, content_type: Str) -> Self
75    where
76        Str: Into<String>,
77    {
78        self.content_type = content_type.into();
79        self
80    }
81
82    /// Set the content length of the record to write
83    /// (only required if the data is a stream).
84    pub fn content_length(mut self, content_length: u64) -> Self {
85        self.content_length = Some(content_length);
86        self
87    }
88
89    /// Set the data of the record to write.
90    ///
91    /// # Arguments
92    ///
93    /// * `data` - The data to write.
94    pub fn data<D>(mut self, data: D) -> Self
95    where
96        D: Into<Body>,
97    {
98        self.data = Some(data.into());
99        self
100    }
101
102    /// Set the data of the record to write as a stream.
103    /// The content length must be set.
104    pub fn stream<S>(mut self, stream: S) -> Self
105    where
106        S: TryStream + Send + Sync + 'static,
107        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
108        Bytes: From<S::Ok>,
109    {
110        self.data = Some(Body::wrap_stream(stream));
111        self
112    }
113
114    /// Send the write record request.
115    pub async fn send(self) -> Result<(), ReductError> {
116        let timestamp = self
117            .timestamp
118            .unwrap_or_else(|| from_system_time(SystemTime::now()));
119
120        let mut request = self.client.request(
121            Method::POST,
122            &format!("/b/{}/{}?ts={}", self.bucket, self.entry, timestamp),
123        );
124
125        for (key, value) in self.labels {
126            request = request.header(&format!("x-reduct-label-{}", key), value);
127        }
128
129        request = request.header(CONTENT_TYPE, self.content_type);
130        if let Some(content_length) = self.content_length {
131            request = request.header(CONTENT_LENGTH, content_length);
132        }
133
134        if let Some(data) = self.data {
135            request = request.body(data);
136            self.client.send_request(request).await?;
137            Ok(())
138        } else {
139            Err(ReductError::new(ErrorCode::BadRequest, "No data provided"))
140        }
141    }
142}