reduct_rs/bucket/
write.rs1use crate::record::write_batched_records::WriteBatchType;
7use crate::{Bucket, WriteBatchBuilder, WriteRecordBuilder};
8use std::sync::Arc;
9
10impl Bucket {
11 pub fn write_record(&self, entry: &str) -> WriteRecordBuilder {
39 WriteRecordBuilder::new(
40 self.name.clone(),
41 entry.to_string(),
42 Arc::clone(&self.http_client),
43 )
44 }
45
46 pub fn write_batch(&self, entry: &str) -> WriteBatchBuilder {
56 WriteBatchBuilder::new(
57 self.name.clone(),
58 entry.to_string(),
59 Arc::clone(&self.http_client),
60 WriteBatchType::Write,
61 )
62 }
63}
64
#[cfg(test)]
mod tests {
    use crate::bucket::tests::bucket;
    use crate::{Bucket, RecordBuilder};
    use bytes::Bytes;
    use reduct_base::error::ErrorCode;
    use rstest::rstest;

    /// Writes a record from an in-memory payload and checks that reading the
    /// same timestamp returns the same bytes.
    #[rstest]
    #[tokio::test]
    async fn test_bucket_write_record_data(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("Hey"))
            .send()
            .await
            .unwrap();

        let stored = bucket.read_record("test").timestamp_us(1000).send().await.unwrap();
        let payload = stored.bytes().await.unwrap();

        assert_eq!(payload, Bytes::from("Hey"));
    }

    /// Writes a record from a three-chunk stream with an explicit content
    /// length and checks that the chunks read back concatenated.
    #[rstest]
    #[tokio::test]
    async fn test_bucket_write_record_stream(#[future] bucket: Bucket) {
        let parts: Vec<crate::client::Result<_>> = vec![Ok("hello"), Ok(" "), Ok("world")];
        let body = futures_util::stream::iter(parts);

        let bucket: Bucket = bucket.await;
        bucket
            .write_record("test")
            .timestamp_us(1000)
            .content_length(11)
            .stream(Box::pin(body))
            .send()
            .await
            .unwrap();

        let stored = bucket.read_record("test").timestamp_us(1000).send().await.unwrap();
        let payload = stored.bytes().await.unwrap();

        assert_eq!(payload, Bytes::from("hello world"));
    }

    /// Sends two records (one from bytes, one from a stream) in a single
    /// batch and checks length, content type, label and payload of each.
    #[rstest]
    #[tokio::test]
    async fn test_batched_write(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        let from_bytes = RecordBuilder::new()
            .timestamp_us(1000)
            .data(Bytes::from("Hey,"))
            .add_label("test", "1")
            .content_type("text/plain")
            .build();

        let body = futures_util::stream::iter(vec![Ok(Bytes::from("World"))]);
        let from_stream = RecordBuilder::new()
            .timestamp_us(2000)
            .stream(Box::pin(body))
            .add_label("test", "2")
            .content_type("text/plain")
            .content_length(5)
            .build();

        let failures = bucket
            .write_batch("test")
            .add_record(from_bytes)
            .add_record(from_stream)
            .send()
            .await
            .unwrap();
        assert!(failures.is_empty());

        // First record: the one built from an in-memory payload.
        let first = bucket.read_record("test").timestamp_us(1000).send().await.unwrap();
        assert_eq!(first.content_length(), 4);
        assert_eq!(first.content_type(), "text/plain");
        assert_eq!(first.labels().get("test").map(String::as_str), Some("1"));
        let first_payload = first.bytes().await.unwrap();
        assert_eq!(first_payload, Bytes::from("Hey,"));

        // Second record: the one built from a stream.
        let second = bucket.read_record("test").timestamp_us(2000).send().await.unwrap();
        assert_eq!(second.content_length(), 5);
        assert_eq!(second.content_type(), "text/plain");
        assert_eq!(second.labels().get("test").map(String::as_str), Some("2"));
        let second_payload = second.bytes().await.unwrap();
        assert_eq!(second_payload, Bytes::from("World"));
    }

    /// Pre-writes a record at timestamp 1000, then sends a batch that reuses
    /// that timestamp and expects exactly one conflict entry in the result.
    #[rstest]
    #[tokio::test]
    async fn test_batched_write_with_error(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        // Occupy timestamp 1000 so the batch below collides with it.
        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("xxx"))
            .send()
            .await
            .unwrap();

        let colliding = RecordBuilder::new()
            .timestamp_us(1000)
            .data(Bytes::from("Hey,"))
            .build();
        let fresh = RecordBuilder::new()
            .timestamp_us(2000)
            .data(Bytes::from("World"))
            .build();

        let failures = bucket
            .write_batch("test")
            .add_record(colliding)
            .add_record(fresh)
            .send()
            .await
            .unwrap();

        assert_eq!(failures.len(), 1);

        let conflict = failures.get(&1000).unwrap();
        assert_eq!(conflict.status, ErrorCode::Conflict);
        assert_eq!(
            conflict.message,
            "A record with timestamp 1000 already exists"
        );
    }

    /// Exercises `append_record`, `record_count`, `size` and `clear` on a
    /// batch builder without sending it.
    #[rstest]
    #[tokio::test]
    async fn test_batch_helper_methods(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;
        let mut batch = bucket.write_batch("test");

        for (timestamp, payload) in [(1000, "Hey,"), (2000, "World")] {
            let record = RecordBuilder::new()
                .timestamp_us(timestamp)
                .data(Bytes::from(payload))
                .build();
            batch.append_record(record);
        }

        assert_eq!(batch.record_count(), 2);
        assert_eq!(batch.size(), 9);

        batch.clear();

        assert_eq!(batch.record_count(), 0);
        assert_eq!(batch.size(), 0);
    }
}