reduct_rs/bucket/
write.rs

1// Copyright 2024 ReductStore
2// This Source Code Form is subject to the terms of the Mozilla Public
3//    License, v. 2.0. If a copy of the MPL was not distributed with this
4//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5
6use crate::record::write_batched_records::WriteBatchType;
7use crate::{Bucket, WriteBatchBuilder, WriteRecordBuilder};
8use std::sync::Arc;
9
10impl Bucket {
11    /// Create a record to write to the bucket.
12    ///
13    /// # Arguments
14    ///
15    /// * `entry` - The entry to write to.
16    ///
17    /// # Returns
18    ///
19    /// Returns a record builder.
20    ///
21    /// # Example
22    ///
23    /// ```no_run
24    /// use reduct_rs::{ReductClient, ReductError};
25    /// use std::time::SystemTime;
26    ///
27    /// #[tokio::main]
28    /// async fn main() -> Result<(), ReductError> {
29    ///    let client = ReductClient::builder()
30    ///         .url("https://play.reduct.store")
31    ///         .api_token("reductstore")
32    ///         .build();
33    ///     let bucket = client.get_bucket("demo").await?;
34    ///     let record = bucket.write_record("entry-1").timestamp_us(1000).data("Some data").send().await?;
35    ///     Ok(())
36    /// }
37    /// ```
38    pub fn write_record(&self, entry: &str) -> WriteRecordBuilder {
39        WriteRecordBuilder::new(
40            self.name.clone(),
41            entry.to_string(),
42            Arc::clone(&self.http_client),
43        )
44    }
45
46    /// Create a batch to write to the bucket.
47    ///
48    /// # Arguments
49    ///
50    /// * `entry` - The entry to write to.
51    ///
52    /// # Returns
53    ///
54    /// Returns a batch builder.
55    pub fn write_batch(&self, entry: &str) -> WriteBatchBuilder {
56        WriteBatchBuilder::new(
57            self.name.clone(),
58            entry.to_string(),
59            Arc::clone(&self.http_client),
60            WriteBatchType::Write,
61        )
62    }
63}
64
#[cfg(test)]
mod tests {
    //! Tests for `Bucket::write_record` and `Bucket::write_batch`.
    //!
    //! Every test receives its bucket from the shared async `bucket` fixture.

    use crate::bucket::tests::bucket;
    use crate::{Bucket, RecordBuilder};
    use bytes::Bytes;
    use reduct_base::error::ErrorCode;
    use rstest::rstest;

    /// A record written from an in-memory payload reads back with the same bytes.
    #[rstest]
    #[tokio::test]
    async fn test_bucket_write_record_data(#[future] bucket: Bucket) {
        let bucket = bucket.await;
        let payload = Bytes::from("Hey");

        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(payload.clone())
            .send()
            .await
            .unwrap();

        let stored = bucket
            .read_record("test")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();
        let received = stored.bytes().await.unwrap();

        assert_eq!(received, payload);
    }

    /// A record written from a chunked stream reads back as the concatenated chunks.
    #[rstest]
    #[tokio::test]
    async fn test_bucket_write_record_stream(#[future] bucket: Bucket) {
        let bucket = bucket.await;

        // "hello" + " " + "world" is 11 bytes, matching the declared content length.
        let parts: Vec<crate::client::Result<_>> = vec![Ok("hello"), Ok(" "), Ok("world")];
        let body = futures_util::stream::iter(parts);

        bucket
            .write_record("test")
            .timestamp_us(1000)
            .content_length(11)
            .stream(Box::pin(body))
            .send()
            .await
            .unwrap();

        let stored = bucket
            .read_record("test")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();
        let received = stored.bytes().await.unwrap();

        assert_eq!(received, Bytes::from("hello world"));
    }

    /// A batch holding one in-memory record and one streamed record is written
    /// without errors, and both records read back with their metadata and content.
    #[rstest]
    #[tokio::test]
    async fn test_batched_write(#[future] bucket: Bucket) {
        let bucket = bucket.await;

        let plain = RecordBuilder::new()
            .timestamp_us(1000)
            .data(Bytes::from("Hey,"))
            .add_label("test", "1")
            .content_type("text/plain")
            .build();

        let body = futures_util::stream::iter(vec![Ok(Bytes::from("World"))]);
        let streamed = RecordBuilder::new()
            .timestamp_us(2000)
            .stream(Box::pin(body))
            .add_label("test", "2")
            .content_type("text/plain")
            .content_length(5)
            .build();

        let error_map = bucket
            .write_batch("test")
            .add_record(plain)
            .add_record(streamed)
            .send()
            .await
            .unwrap();
        assert!(error_map.is_empty());

        // (timestamp, content length, value of the "test" label, payload)
        let expected = [(1000, 4, "1", "Hey,"), (2000, 5, "2", "World")];
        for (timestamp, length, label, payload) in expected {
            let stored = bucket
                .read_record("test")
                .timestamp_us(timestamp)
                .send()
                .await
                .unwrap();

            assert_eq!(stored.content_length(), length);
            assert_eq!(stored.content_type(), "text/plain");
            assert_eq!(stored.labels().get("test"), Some(&label.to_string()));
            assert_eq!(stored.bytes().await.unwrap(), Bytes::from(payload));
        }
    }

    /// A batch that reuses an already written timestamp reports a conflict for
    /// that record only, keyed by its timestamp in the returned error map.
    #[rstest]
    #[tokio::test]
    async fn test_batched_write_with_error(#[future] bucket: Bucket) {
        let bucket = bucket.await;

        // Occupy timestamp 1000 so the first record of the batch collides with it.
        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("xxx"))
            .send()
            .await
            .unwrap();

        let colliding = RecordBuilder::new()
            .timestamp_us(1000)
            .data(Bytes::from("Hey,"))
            .build();
        let fresh = RecordBuilder::new()
            .timestamp_us(2000)
            .data(Bytes::from("World"))
            .build();

        let error_map = bucket
            .write_batch("test")
            .add_record(colliding)
            .add_record(fresh)
            .send()
            .await
            .unwrap();

        assert_eq!(error_map.len(), 1);

        let conflict = error_map.get(&1000).unwrap();
        assert_eq!(conflict.status, ErrorCode::Conflict);
        assert_eq!(
            conflict.message,
            "A record with timestamp 1000 already exists"
        );
    }

    /// `record_count` and `size` follow the appended records and both drop to
    /// zero after `clear`.
    #[rstest]
    #[tokio::test]
    async fn test_batch_helper_methods(#[future] bucket: Bucket) {
        let bucket = bucket.await;
        let mut batch = bucket.write_batch("test");

        for (timestamp, payload) in [(1000, "Hey,"), (2000, "World")] {
            let record = RecordBuilder::new()
                .timestamp_us(timestamp)
                .data(Bytes::from(payload))
                .build();
            batch.append_record(record);
        }

        assert_eq!(batch.record_count(), 2);
        // The two payloads are 4 and 5 bytes long.
        assert_eq!(batch.size(), 9);

        batch.clear();
        assert_eq!(batch.record_count(), 0);
        assert_eq!(batch.size(), 0);
    }
}