Skip to main content

iii_sdk/
stream.rs

1//! Stream operations for atomic updates
2//!
3//! This module provides functionality for performing atomic updates on stream data.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use iii::{III, Streams, UpdateOp};
9//!
10//! let iii = III::new("ws://localhost:49134");
11//! iii.connect().await?;
12//!
13//! let streams = Streams::new(iii.clone());
14//!
15//! // Atomic update with multiple operations
16//! let result = streams.update(
17//!     "my-stream::group-1::item-1",
18//!     vec![
19//!         UpdateOp::increment("counter", 1),
20//!         UpdateOp::set("lastUpdated", serde_json::json!("2024-01-01")),
21//!     ],
22//! ).await?;
23//!
24//! println!("Old value: {:?}", result.old_value);
25//! println!("New value: {:?}", result.new_value);
26//! ```
27
28use crate::{
29    error::IIIError,
30    iii::III,
31    types::{StreamUpdateInput, UpdateOp, UpdateResult},
32};
33
34/// Provides atomic stream update operations
35#[derive(Clone)]
36pub struct Streams {
37    iii: III,
38}
39
40impl Streams {
41    /// Create a new Streams instance with the given iii
42    pub fn new(iii: III) -> Self {
43        Self { iii }
44    }
45
46    /// Perform an atomic update on a stream key
47    ///
48    /// All operations are applied atomically - either all succeed or none are applied.
49    ///
50    /// # Arguments
51    ///
52    /// * `key` - The stream key to update (format: "stream_name::group_id::item_id")
53    /// * `ops` - List of operations to apply atomically
54    ///
55    /// # Returns
56    ///
57    /// Returns `UpdateResult` containing the old and new values
58    ///
59    /// # Example
60    ///
61    /// ```rust,ignore
62    /// let result = streams.update(
63    ///     "orders::user-123::order-456",
64    ///     vec![
65    ///         UpdateOp::increment("total", 100),
66    ///         UpdateOp::set("status", "processing".into()),
67    ///     ],
68    /// ).await?;
69    /// ```
70    pub async fn update(
71        &self,
72        key: impl Into<String>,
73        ops: Vec<UpdateOp>,
74    ) -> Result<UpdateResult, IIIError> {
75        let input = StreamUpdateInput {
76            key: key.into(),
77            ops,
78        };
79
80        let result = self.iii.call("stream.update", input).await?;
81
82        serde_json::from_value(result).map_err(|e| IIIError::Serde(e.to_string()))
83    }
84
85    /// Atomically increment a numeric field
86    ///
87    /// Convenience method for a single increment operation.
88    ///
89    /// # Example
90    ///
91    /// ```rust,ignore
92    /// streams.increment("counters::daily::page-views", "count", 1).await?;
93    /// ```
94    pub async fn increment(
95        &self,
96        key: impl Into<String>,
97        field: impl Into<String>,
98        by: i64,
99    ) -> Result<UpdateResult, IIIError> {
100        self.update(key, vec![UpdateOp::increment(field.into(), by)])
101            .await
102    }
103
104    /// Atomically decrement a numeric field
105    ///
106    /// Convenience method for a single decrement operation.
107    pub async fn decrement(
108        &self,
109        key: impl Into<String>,
110        field: impl Into<String>,
111        by: i64,
112    ) -> Result<UpdateResult, IIIError> {
113        self.update(key, vec![UpdateOp::decrement(field.into(), by)])
114            .await
115    }
116
117    /// Atomically set a field value
118    ///
119    /// Convenience method for a single set operation.
120    ///
121    /// # Example
122    ///
123    /// ```rust,ignore
124    /// streams.set_field("users::active::user-1", "status", "online".into()).await?;
125    /// ```
126    pub async fn set_field(
127        &self,
128        key: impl Into<String>,
129        field: impl Into<String>,
130        value: impl Into<serde_json::Value>,
131    ) -> Result<UpdateResult, IIIError> {
132        self.update(key, vec![UpdateOp::set(field.into(), value.into())])
133            .await
134    }
135
136    /// Atomically remove a field
137    ///
138    /// Convenience method for a single remove operation.
139    pub async fn remove_field(
140        &self,
141        key: impl Into<String>,
142        field: impl Into<String>,
143    ) -> Result<UpdateResult, IIIError> {
144        self.update(key, vec![UpdateOp::remove(field.into())]).await
145    }
146
147    /// Atomically merge an object into the existing value
148    ///
149    /// Convenience method for a single merge operation.
150    ///
151    /// # Example
152    ///
153    /// ```rust,ignore
154    /// streams.merge(
155    ///     "settings::user-1::preferences",
156    ///     serde_json::json!({"theme": "dark", "language": "en"}),
157    /// ).await?;
158    /// ```
159    pub async fn merge(
160        &self,
161        key: impl Into<String>,
162        value: impl Into<serde_json::Value>,
163    ) -> Result<UpdateResult, IIIError> {
164        self.update(key, vec![UpdateOp::merge(value.into())]).await
165    }
166}
167
168/// Builder for creating multiple update operations
169#[derive(Debug, Clone, Default)]
170pub struct UpdateBuilder {
171    ops: Vec<UpdateOp>,
172}
173
174impl UpdateBuilder {
175    /// Create a new UpdateBuilder
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    /// Add a set operation
181    pub fn set(mut self, path: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
182        self.ops.push(UpdateOp::set(path.into(), value.into()));
183        self
184    }
185
186    /// Add an increment operation
187    pub fn increment(mut self, path: impl Into<String>, by: i64) -> Self {
188        self.ops.push(UpdateOp::increment(path.into(), by));
189        self
190    }
191
192    /// Add a decrement operation
193    pub fn decrement(mut self, path: impl Into<String>, by: i64) -> Self {
194        self.ops.push(UpdateOp::decrement(path.into(), by));
195        self
196    }
197
198    /// Add a remove operation
199    pub fn remove(mut self, path: impl Into<String>) -> Self {
200        self.ops.push(UpdateOp::remove(path.into()));
201        self
202    }
203
204    /// Add a merge operation
205    pub fn merge(mut self, value: impl Into<serde_json::Value>) -> Self {
206        self.ops.push(UpdateOp::merge(value.into()));
207        self
208    }
209
210    /// Build the list of operations
211    pub fn build(self) -> Vec<UpdateOp> {
212        self.ops
213    }
214}