iii_sdk/streams.rs
1//! Stream operations for atomic updates
2//!
3//! This module provides functionality for performing atomic updates on stream data.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use iii::{III, Streams, UpdateOp};
9//!
10//! let iii = III::new("ws://localhost:49134");
11//! iii.connect().await?;
12//!
13//! let streams = Streams::new(iii.clone());
14//!
15//! // Atomic update with multiple operations
16//! let result = streams.update(
17//! "my-stream::group-1::item-1",
18//! vec![
19//! UpdateOp::increment("counter", 1),
20//! UpdateOp::set("lastUpdated", serde_json::json!("2024-01-01")),
21//! ],
22//! ).await?;
23//!
24//! println!("Old value: {:?}", result.old_value);
25//! println!("New value: {:?}", result.new_value);
26//! ```
27
28use crate::{
29 error::IIIError,
30 iii::III,
31 types::{StreamUpdateInput, UpdateOp, UpdateResult},
32};
33
34/// Provides atomic stream update operations
35#[derive(Clone)]
36pub struct Streams {
37 iii: III,
38}
39
40impl Streams {
41 /// Create a new Streams instance with the given iii
42 pub fn new(iii: III) -> Self {
43 Self { iii }
44 }
45
46 /// Perform an atomic update on a stream key
47 ///
48 /// All operations are applied atomically - either all succeed or none are applied.
49 ///
50 /// # Arguments
51 ///
52 /// * `key` - The stream key to update (format: "stream_name::group_id::item_id")
53 /// * `ops` - List of operations to apply atomically
54 ///
55 /// # Returns
56 ///
57 /// Returns `UpdateResult` containing the old and new values
58 ///
59 /// # Example
60 ///
61 /// ```rust,ignore
62 /// let result = streams.update(
63 /// "orders::user-123::order-456",
64 /// vec![
65 /// UpdateOp::increment("total", 100),
66 /// UpdateOp::set("status", "processing".into()),
67 /// ],
68 /// ).await?;
69 /// ```
70 pub async fn update(
71 &self,
72 key: impl Into<String>,
73 ops: Vec<UpdateOp>,
74 ) -> Result<UpdateResult, IIIError> {
75 let input = StreamUpdateInput {
76 key: key.into(),
77 ops,
78 };
79
80 let result = self.iii.call("streams.update", input).await?;
81
82 serde_json::from_value(result).map_err(|e| IIIError::Serde(e.to_string()))
83 }
84
85 /// Atomically increment a numeric field
86 ///
87 /// Convenience method for a single increment operation.
88 ///
89 /// # Example
90 ///
91 /// ```rust,ignore
92 /// streams.increment("counters::daily::page-views", "count", 1).await?;
93 /// ```
94 pub async fn increment(
95 &self,
96 key: impl Into<String>,
97 field: impl Into<String>,
98 by: i64,
99 ) -> Result<UpdateResult, IIIError> {
100 self.update(key, vec![UpdateOp::increment(field.into(), by)])
101 .await
102 }
103
104 /// Atomically decrement a numeric field
105 ///
106 /// Convenience method for a single decrement operation.
107 pub async fn decrement(
108 &self,
109 key: impl Into<String>,
110 field: impl Into<String>,
111 by: i64,
112 ) -> Result<UpdateResult, IIIError> {
113 self.update(key, vec![UpdateOp::decrement(field.into(), by)])
114 .await
115 }
116
117 /// Atomically set a field value
118 ///
119 /// Convenience method for a single set operation.
120 ///
121 /// # Example
122 ///
123 /// ```rust,ignore
124 /// streams.set_field("users::active::user-1", "status", "online".into()).await?;
125 /// ```
126 pub async fn set_field(
127 &self,
128 key: impl Into<String>,
129 field: impl Into<String>,
130 value: impl Into<serde_json::Value>,
131 ) -> Result<UpdateResult, IIIError> {
132 self.update(key, vec![UpdateOp::set(field.into(), value.into())])
133 .await
134 }
135
136 /// Atomically remove a field
137 ///
138 /// Convenience method for a single remove operation.
139 pub async fn remove_field(
140 &self,
141 key: impl Into<String>,
142 field: impl Into<String>,
143 ) -> Result<UpdateResult, IIIError> {
144 self.update(key, vec![UpdateOp::remove(field.into())]).await
145 }
146
147 /// Atomically merge an object into the existing value
148 ///
149 /// Convenience method for a single merge operation.
150 ///
151 /// # Example
152 ///
153 /// ```rust,ignore
154 /// streams.merge(
155 /// "settings::user-1::preferences",
156 /// serde_json::json!({"theme": "dark", "language": "en"}),
157 /// ).await?;
158 /// ```
159 pub async fn merge(
160 &self,
161 key: impl Into<String>,
162 value: impl Into<serde_json::Value>,
163 ) -> Result<UpdateResult, IIIError> {
164 self.update(key, vec![UpdateOp::merge(value.into())]).await
165 }
166}
167
168/// Builder for creating multiple update operations
169#[derive(Debug, Clone, Default)]
170pub struct UpdateBuilder {
171 ops: Vec<UpdateOp>,
172}
173
174impl UpdateBuilder {
175 /// Create a new UpdateBuilder
176 pub fn new() -> Self {
177 Self::default()
178 }
179
180 /// Add a set operation
181 pub fn set(mut self, path: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
182 self.ops.push(UpdateOp::set(path.into(), value.into()));
183 self
184 }
185
186 /// Add an increment operation
187 pub fn increment(mut self, path: impl Into<String>, by: i64) -> Self {
188 self.ops.push(UpdateOp::increment(path.into(), by));
189 self
190 }
191
192 /// Add a decrement operation
193 pub fn decrement(mut self, path: impl Into<String>, by: i64) -> Self {
194 self.ops.push(UpdateOp::decrement(path.into(), by));
195 self
196 }
197
198 /// Add a remove operation
199 pub fn remove(mut self, path: impl Into<String>) -> Self {
200 self.ops.push(UpdateOp::remove(path.into()));
201 self
202 }
203
204 /// Add a merge operation
205 pub fn merge(mut self, value: impl Into<serde_json::Value>) -> Self {
206 self.ops.push(UpdateOp::merge(value.into()));
207 self
208 }
209
210 /// Build the list of operations
211 pub fn build(self) -> Vec<UpdateOp> {
212 self.ops
213 }
214}