// reduct_rs/record/write_record.rs
use crate::http_client::HttpClient;
use crate::record::{from_system_time, Labels};
use bytes::Bytes;

use futures::TryStream;

use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
use reqwest::{Body, Method};

use reduct_base::error::{ErrorCode, ReductError};
use std::sync::Arc;
use std::time::SystemTime;

19pub struct WriteRecordBuilder {
21 bucket: String,
22 entry: String,
23 timestamp: Option<u64>,
24 labels: Labels,
25 content_type: String,
26 content_length: Option<u64>,
27 data: Option<Body>,
28 client: Arc<HttpClient>,
29}
30
31impl WriteRecordBuilder {
32 pub(crate) fn new(bucket: String, entry: String, client: Arc<HttpClient>) -> Self {
33 Self {
34 timestamp: None,
35 labels: Labels::new(),
36 content_type: "application/octet-stream".to_string(),
37 content_length: None,
38 data: None,
39 bucket,
40 entry,
41 client,
42 }
43 }
44
45 pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
47 self.timestamp = Some(from_system_time(timestamp));
48 self
49 }
50
51 pub fn timestamp_us(mut self, timestamp: u64) -> Self {
53 self.timestamp = Some(timestamp);
54 self
55 }
56
57 pub fn labels(mut self, labels: Labels) -> Self {
59 self.labels = labels;
60 self
61 }
62
63 pub fn add_label<Str1, Str2>(mut self, key: Str1, value: Str2) -> Self
65 where
66 Str1: ToString,
67 Str2: ToString,
68 {
69 self.labels.insert(key.to_string(), value.to_string());
70 self
71 }
72
73 pub fn content_type<Str>(mut self, content_type: Str) -> Self
75 where
76 Str: Into<String>,
77 {
78 self.content_type = content_type.into();
79 self
80 }
81
82 pub fn content_length(mut self, content_length: u64) -> Self {
85 self.content_length = Some(content_length);
86 self
87 }
88
89 pub fn data<D>(mut self, data: D) -> Self
95 where
96 D: Into<Body>,
97 {
98 self.data = Some(data.into());
99 self
100 }
101
102 pub fn stream<S>(mut self, stream: S) -> Self
105 where
106 S: TryStream + Send + Sync + 'static,
107 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
108 Bytes: From<S::Ok>,
109 {
110 self.data = Some(Body::wrap_stream(stream));
111 self
112 }
113
114 pub async fn send(self) -> Result<(), ReductError> {
116 let timestamp = self
117 .timestamp
118 .unwrap_or_else(|| from_system_time(SystemTime::now()));
119
120 let mut request = self.client.request(
121 Method::POST,
122 &format!("/b/{}/{}?ts={}", self.bucket, self.entry, timestamp),
123 );
124
125 for (key, value) in self.labels {
126 request = request.header(&format!("x-reduct-label-{}", key), value);
127 }
128
129 request = request.header(CONTENT_TYPE, self.content_type);
130 if let Some(content_length) = self.content_length {
131 request = request.header(CONTENT_LENGTH, content_length);
132 }
133
134 if let Some(data) = self.data {
135 request = request.body(data);
136 self.client.send_request(request).await?;
137 Ok(())
138 } else {
139 Err(ReductError::new(ErrorCode::BadRequest, "No data provided"))
140 }
141 }
142}