iii_sdk/stream.rs
1//! Stream operations for atomic updates
2//!
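//! This module provides [`Streams`], which applies one or more [`UpdateOp`]s to a
//! single stream item in one atomic request, and [`UpdateBuilder`], a fluent helper
//! for assembling the list of operations.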
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use iii_sdk::{register_worker, InitOptions, Streams, UpdateOp};
9//!
10//! let iii = register_worker("ws://localhost:49134", InitOptions::default())?;
11//!
12//! let streams = Streams::new(iii.clone());
13//!
14//! // Atomic update with multiple operations
15//! let result = streams.update(
16//! "my-stream::group-1::item-1",
17//! vec![
18//! UpdateOp::increment("counter", 1),
19//! UpdateOp::set("lastUpdated", serde_json::json!("2024-01-01")),
20//! ],
21//! ).await?;
22//!
23//! println!("Old value: {:?}", result.old_value);
24//! println!("New value: {:?}", result.new_value);
25//! ```
26
27use crate::{
28 TriggerRequest,
29 error::IIIError,
30 iii::III,
31 types::{StreamUpdateInput, UpdateOp, UpdateResult},
32};
33
34/// Provides atomic stream update operations
35#[derive(Clone)]
36pub struct Streams {
37 iii: III,
38}
39
40impl Streams {
41 /// Create a new Streams instance with the given iii
42 pub fn new(iii: III) -> Self {
43 Self { iii }
44 }
45
46 /// Perform an atomic update on a stream key
47 ///
48 /// All operations are applied atomically - either all succeed or none are applied.
49 ///
50 /// # Arguments
51 ///
52 /// * `key` - The stream key to update (format: "stream_name::group_id::item_id")
53 /// * `ops` - List of operations to apply atomically
54 ///
55 /// # Returns
56 ///
57 /// Returns `UpdateResult` containing the old and new values
58 ///
59 /// # Example
60 ///
61 /// ```rust,ignore
62 /// let result = streams.update(
63 /// "orders::user-123::order-456",
64 /// vec![
65 /// UpdateOp::increment("total", 100),
66 /// UpdateOp::set("status", "processing".into()),
67 /// ],
68 /// ).await?;
69 /// ```
70 pub async fn update(
71 &self,
72 key: impl Into<String>,
73 ops: Vec<UpdateOp>,
74 ) -> Result<UpdateResult, IIIError> {
75 let key_str = key.into();
76 let parts: Vec<&str> = key_str.splitn(3, "::").collect();
77 let (stream_name, group_id, item_id) = match parts.len() {
78 3 => (
79 parts[0].to_string(),
80 parts[1].to_string(),
81 parts[2].to_string(),
82 ),
83 2 => (parts[0].to_string(), parts[1].to_string(), String::new()),
84 _ => (key_str, String::new(), String::new()),
85 };
86
87 let input = StreamUpdateInput {
88 stream_name,
89 group_id,
90 item_id,
91 ops,
92 };
93
94 let result = self
95 .iii
96 .trigger(TriggerRequest {
97 function_id: "stream::update".to_string(),
98 payload: serde_json::to_value(input).unwrap_or(serde_json::Value::Null),
99 action: None,
100 timeout_ms: None,
101 })
102 .await?;
103
104 serde_json::from_value(result).map_err(|e| IIIError::Serde(e.to_string()))
105 }
106
107 /// Atomically increment a numeric field
108 ///
109 /// Convenience method for a single increment operation.
110 ///
111 /// # Example
112 ///
113 /// ```rust,ignore
114 /// streams.increment("counters::daily::page-views", "count", 1).await?;
115 /// ```
116 pub async fn increment(
117 &self,
118 key: impl Into<String>,
119 field: impl Into<String>,
120 by: i64,
121 ) -> Result<UpdateResult, IIIError> {
122 self.update(key, vec![UpdateOp::increment(field.into(), by)])
123 .await
124 }
125
126 /// Atomically decrement a numeric field
127 ///
128 /// Convenience method for a single decrement operation.
129 pub async fn decrement(
130 &self,
131 key: impl Into<String>,
132 field: impl Into<String>,
133 by: i64,
134 ) -> Result<UpdateResult, IIIError> {
135 self.update(key, vec![UpdateOp::decrement(field.into(), by)])
136 .await
137 }
138
139 /// Atomically set a field value
140 ///
141 /// Convenience method for a single set operation.
142 ///
143 /// # Example
144 ///
145 /// ```rust,ignore
146 /// streams.set_field("users::active::user-1", "status", "online".into()).await?;
147 /// ```
148 pub async fn set_field(
149 &self,
150 key: impl Into<String>,
151 field: impl Into<String>,
152 value: impl Into<serde_json::Value>,
153 ) -> Result<UpdateResult, IIIError> {
154 self.update(key, vec![UpdateOp::set(field.into(), value.into())])
155 .await
156 }
157
158 /// Atomically remove a field
159 ///
160 /// Convenience method for a single remove operation.
161 pub async fn remove_field(
162 &self,
163 key: impl Into<String>,
164 field: impl Into<String>,
165 ) -> Result<UpdateResult, IIIError> {
166 self.update(key, vec![UpdateOp::remove(field.into())]).await
167 }
168
169 /// Atomically merge an object into the existing value
170 ///
171 /// Convenience method for a single merge operation.
172 ///
173 /// # Example
174 ///
175 /// ```rust,ignore
176 /// streams.merge(
177 /// "settings::user-1::preferences",
178 /// serde_json::json!({"theme": "dark", "language": "en"}),
179 /// ).await?;
180 /// ```
181 pub async fn merge(
182 &self,
183 key: impl Into<String>,
184 value: impl Into<serde_json::Value>,
185 ) -> Result<UpdateResult, IIIError> {
186 self.update(key, vec![UpdateOp::merge(value.into())]).await
187 }
188}
189
190/// Builder for creating multiple update operations
191#[derive(Debug, Clone, Default)]
192pub struct UpdateBuilder {
193 ops: Vec<UpdateOp>,
194}
195
196impl UpdateBuilder {
197 /// Create a new UpdateBuilder
198 pub fn new() -> Self {
199 Self::default()
200 }
201
202 /// Add a set operation
203 pub fn set(mut self, path: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
204 self.ops.push(UpdateOp::set(path.into(), value.into()));
205 self
206 }
207
208 /// Add an increment operation
209 pub fn increment(mut self, path: impl Into<String>, by: i64) -> Self {
210 self.ops.push(UpdateOp::increment(path.into(), by));
211 self
212 }
213
214 /// Add a decrement operation
215 pub fn decrement(mut self, path: impl Into<String>, by: i64) -> Self {
216 self.ops.push(UpdateOp::decrement(path.into(), by));
217 self
218 }
219
220 /// Add a remove operation
221 pub fn remove(mut self, path: impl Into<String>) -> Self {
222 self.ops.push(UpdateOp::remove(path.into()));
223 self
224 }
225
226 /// Add a merge operation
227 pub fn merge(mut self, value: impl Into<serde_json::Value>) -> Self {
228 self.ops.push(UpdateOp::merge(value.into()));
229 self
230 }
231
232 /// Build the list of operations
233 pub fn build(self) -> Vec<UpdateOp> {
234 self.ops
235 }
236}