Skip to main content

iii_sdk/
stream.rs

1//! Stream operations for atomic updates
2//!
3//! This module provides functionality for performing atomic updates on stream data.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use iii_sdk::{register_worker, InitOptions, Streams, UpdateOp};
9//!
10//! let iii = register_worker("ws://localhost:49134", InitOptions::default())?;
11//!
12//! let streams = Streams::new(iii.clone());
13//!
14//! // Atomic update with multiple operations
15//! let result = streams.update(
16//!     "my-stream::group-1::item-1",
17//!     vec![
18//!         UpdateOp::increment("counter", 1),
19//!         UpdateOp::set("lastUpdated", serde_json::json!("2024-01-01")),
20//!     ],
21//! ).await?;
22//!
23//! println!("Old value: {:?}", result.old_value);
24//! println!("New value: {:?}", result.new_value);
25//! ```
26
27use crate::{
28    TriggerRequest,
29    error::IIIError,
30    iii::III,
31    types::{StreamUpdateInput, UpdateOp, UpdateResult},
32};
33
34/// Provides atomic stream update operations
35#[derive(Clone)]
36pub struct Streams {
37    iii: III,
38}
39
40impl Streams {
41    /// Create a new Streams instance with the given iii
42    pub fn new(iii: III) -> Self {
43        Self { iii }
44    }
45
46    /// Perform an atomic update on a stream key
47    ///
48    /// All operations are applied atomically - either all succeed or none are applied.
49    ///
50    /// # Arguments
51    ///
52    /// * `key` - The stream key to update (format: "stream_name::group_id::item_id")
53    /// * `ops` - List of operations to apply atomically
54    ///
55    /// # Returns
56    ///
57    /// Returns `UpdateResult` containing the old and new values
58    ///
59    /// # Example
60    ///
61    /// ```rust,ignore
62    /// let result = streams.update(
63    ///     "orders::user-123::order-456",
64    ///     vec![
65    ///         UpdateOp::increment("total", 100),
66    ///         UpdateOp::set("status", "processing".into()),
67    ///     ],
68    /// ).await?;
69    /// ```
70    pub async fn update(
71        &self,
72        key: impl Into<String>,
73        ops: Vec<UpdateOp>,
74    ) -> Result<UpdateResult, IIIError> {
75        let key_str = key.into();
76        let parts: Vec<&str> = key_str.splitn(3, "::").collect();
77        let (stream_name, group_id, item_id) = match parts.len() {
78            3 => (
79                parts[0].to_string(),
80                parts[1].to_string(),
81                parts[2].to_string(),
82            ),
83            2 => (parts[0].to_string(), parts[1].to_string(), String::new()),
84            _ => (key_str, String::new(), String::new()),
85        };
86
87        let input = StreamUpdateInput {
88            stream_name,
89            group_id,
90            item_id,
91            ops,
92        };
93
94        let result = self
95            .iii
96            .trigger(TriggerRequest {
97                function_id: "stream::update".to_string(),
98                payload: serde_json::to_value(input).unwrap_or(serde_json::Value::Null),
99                action: None,
100                timeout_ms: None,
101            })
102            .await?;
103
104        serde_json::from_value(result).map_err(|e| IIIError::Serde(e.to_string()))
105    }
106
107    /// Atomically increment a numeric field
108    ///
109    /// Convenience method for a single increment operation.
110    ///
111    /// # Example
112    ///
113    /// ```rust,ignore
114    /// streams.increment("counters::daily::page-views", "count", 1).await?;
115    /// ```
116    pub async fn increment(
117        &self,
118        key: impl Into<String>,
119        field: impl Into<String>,
120        by: i64,
121    ) -> Result<UpdateResult, IIIError> {
122        self.update(key, vec![UpdateOp::increment(field.into(), by)])
123            .await
124    }
125
126    /// Atomically decrement a numeric field
127    ///
128    /// Convenience method for a single decrement operation.
129    pub async fn decrement(
130        &self,
131        key: impl Into<String>,
132        field: impl Into<String>,
133        by: i64,
134    ) -> Result<UpdateResult, IIIError> {
135        self.update(key, vec![UpdateOp::decrement(field.into(), by)])
136            .await
137    }
138
139    /// Atomically set a field value
140    ///
141    /// Convenience method for a single set operation.
142    ///
143    /// # Example
144    ///
145    /// ```rust,ignore
146    /// streams.set_field("users::active::user-1", "status", "online".into()).await?;
147    /// ```
148    pub async fn set_field(
149        &self,
150        key: impl Into<String>,
151        field: impl Into<String>,
152        value: impl Into<serde_json::Value>,
153    ) -> Result<UpdateResult, IIIError> {
154        self.update(key, vec![UpdateOp::set(field.into(), value.into())])
155            .await
156    }
157
158    /// Atomically remove a field
159    ///
160    /// Convenience method for a single remove operation.
161    pub async fn remove_field(
162        &self,
163        key: impl Into<String>,
164        field: impl Into<String>,
165    ) -> Result<UpdateResult, IIIError> {
166        self.update(key, vec![UpdateOp::remove(field.into())]).await
167    }
168
169    /// Atomically merge an object into the existing value
170    ///
171    /// Convenience method for a single merge operation.
172    ///
173    /// # Example
174    ///
175    /// ```rust,ignore
176    /// streams.merge(
177    ///     "settings::user-1::preferences",
178    ///     serde_json::json!({"theme": "dark", "language": "en"}),
179    /// ).await?;
180    /// ```
181    pub async fn merge(
182        &self,
183        key: impl Into<String>,
184        value: impl Into<serde_json::Value>,
185    ) -> Result<UpdateResult, IIIError> {
186        self.update(key, vec![UpdateOp::merge(value.into())]).await
187    }
188}
189
190/// Builder for creating multiple update operations
191#[derive(Debug, Clone, Default)]
192pub struct UpdateBuilder {
193    ops: Vec<UpdateOp>,
194}
195
196impl UpdateBuilder {
197    /// Create a new UpdateBuilder
198    pub fn new() -> Self {
199        Self::default()
200    }
201
202    /// Add a set operation
203    pub fn set(mut self, path: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
204        self.ops.push(UpdateOp::set(path.into(), value.into()));
205        self
206    }
207
208    /// Add an increment operation
209    pub fn increment(mut self, path: impl Into<String>, by: i64) -> Self {
210        self.ops.push(UpdateOp::increment(path.into(), by));
211        self
212    }
213
214    /// Add a decrement operation
215    pub fn decrement(mut self, path: impl Into<String>, by: i64) -> Self {
216        self.ops.push(UpdateOp::decrement(path.into(), by));
217        self
218    }
219
220    /// Add a remove operation
221    pub fn remove(mut self, path: impl Into<String>) -> Self {
222        self.ops.push(UpdateOp::remove(path.into()));
223        self
224    }
225
226    /// Add a merge operation
227    pub fn merge(mut self, value: impl Into<serde_json::Value>) -> Self {
228        self.ops.push(UpdateOp::merge(value.into()));
229        self
230    }
231
232    /// Build the list of operations
233    pub fn build(self) -> Vec<UpdateOp> {
234        self.ops
235    }
236}