reduct_rs/bucket/
update.rs

// Copyright 2024 ReductStore
// This Source Code Form is subject to the terms of the Mozilla Public
//    License, v. 2.0. If a copy of the MPL was not distributed with this
//    file, You can obtain one at https://mozilla.org/MPL/2.0/.

6use crate::record::update_record::UpdateRecordBuilder;
7use crate::record::write_batched_records::WriteBatchType;
8use crate::{Bucket, WriteBatchBuilder};
9use std::sync::Arc;
10
11impl Bucket {
12    /// Update labels of a record in an entry.
13    ///
14    /// # Arguments
15    ///
16    /// * `entry` - The entry to update.
17    ///
18    /// # Returns
19    ///
20    /// Returns a builder to update the record and send the request.
21    pub fn update_record(&self, entry: &str) -> UpdateRecordBuilder {
22        UpdateRecordBuilder::new(
23            self.name.clone(),
24            entry.to_string(),
25            Arc::clone(&self.http_client),
26        )
27    }
28
29    /// Create a batch to update records in the bucket.
30    ///
31    /// You should use RecordBuilder to create the records to update.
32    /// Add labels to the record to update. Labels with an empty value will be removed.
33    ///
34    /// # Arguments
35    ///
36    /// * `entry` - The entry to update.
37    ///
38    /// # Returns
39    ///
40    /// Returns a batch builder.
41    ///
42    /// # Example
43    ///
44    /// ```no_run
45    /// use tokio;
46    /// use reduct_rs::{ReductClient, ReductError};
47    /// use reduct_rs::RecordBuilder;
48    /// #[tokio::main]
49    /// async fn main() -> Result<(), ReductError> {
50    ///     let client = ReductClient::builder()
51    ///        .url("https://play.reduct.store")
52    ///        .api_token("reductstore")
53    ///        .build();
54    ///     let bucket = client.get_bucket("datasets").await?;
55    ///     let batch = bucket.update_batch("cats");
56    ///     let record1 = RecordBuilder::new()
57    ///         .timestamp_us(1000)
58    ///         .add_label("test".to_string(), "2".to_string())
59    ///         .add_label("x".to_string(), "".to_string()) // Remove label x
60    ///         .build();
61    ///
62    ///     batch.add_record(record1).send().await?;
63    ///     Ok(())
64    /// }
65    pub fn update_batch(&self, entry: &str) -> WriteBatchBuilder {
66        WriteBatchBuilder::new(
67            self.name.clone(),
68            entry.to_string(),
69            Arc::clone(&self.http_client),
70            WriteBatchType::Update,
71        )
72    }
73}
74
#[cfg(test)]
mod tests {
    use crate::bucket::tests::bucket;
    use crate::{Bucket, RecordBuilder};
    use bytes::Bytes;
    use reduct_base::error::ErrorCode;
    use rstest::rstest;

    /// Updating a single record changes the `test` label and removes the `x` label.
    #[rstest]
    #[tokio::test]
    async fn test_update_record(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        // Seed a record carrying both labels that the update will touch.
        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("Hey"))
            .add_label("test", "1")
            .add_label("x", "y")
            .send()
            .await
            .unwrap();

        bucket
            .update_record("test")
            .timestamp_us(1000)
            .update_label("test", "2")
            .remove_label("x")
            .send()
            .await
            .unwrap();

        let record = bucket
            .read_record("test")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();

        assert_eq!(record.labels().get("test"), Some(&"2".to_string()));
        // The removal requested above must be verified too, as in the batched test.
        assert_eq!(record.labels().get("x"), None);
    }

    /// A batched update applies label changes per record and reports per-record errors.
    #[rstest]
    #[tokio::test]
    async fn test_update_record_batched(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        // Seed a record carrying both labels that the update will touch.
        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("Hey"))
            .add_label("test", "1")
            .add_label("x", "y")
            .send()
            .await
            .unwrap();

        let batch = bucket.update_batch("test");
        // An empty label value asks for the label to be removed.
        let record1 = RecordBuilder::new()
            .timestamp_us(1000)
            .add_label("test".to_string(), "2".to_string())
            .add_label("x".to_string(), "".to_string())
            .build();
        // This test never writes a record at timestamp 10000, so its update must fail.
        let record2 = RecordBuilder::new()
            .timestamp_us(10000)
            .add_label("test".to_string(), "3".to_string())
            .build();

        let error_map = batch
            .add_record(record1)
            .add_record(record2)
            .send()
            .await
            .unwrap();

        // Only the missing record is reported, keyed by its timestamp.
        assert_eq!(error_map.len(), 1);
        assert_eq!(error_map.get(&10000).unwrap().status, ErrorCode::NotFound);

        let record = bucket
            .read_record("test")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();

        assert_eq!(record.labels().get("test"), Some(&"2".to_string()));
        assert_eq!(record.labels().get("x"), None);
    }
}