Skip to main content

iii_sdk/
types.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::future::BoxFuture;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::{
9    channels::{ChannelReader, ChannelWriter, StreamChannelRef},
10    error::IIIError,
11    protocol::{RegisterFunctionMessage, RegisterTriggerTypeMessage},
12    triggers::TriggerHandler,
13};
14
15pub type RemoteFunctionHandler =
16    Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, IIIError>> + Send + Sync>;
17
// ============================================================================
// Stream Update Types
// ============================================================================
21
22/// Represents a path to a field in a JSON object
23#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
24pub struct FieldPath(pub String);
25
26impl FieldPath {
27    pub fn new(path: impl Into<String>) -> Self {
28        Self(path.into())
29    }
30
31    pub fn root() -> Self {
32        Self(String::new())
33    }
34}
35
36impl From<&str> for FieldPath {
37    fn from(value: &str) -> Self {
38        Self(value.to_string())
39    }
40}
41
42impl From<String> for FieldPath {
43    fn from(value: String) -> Self {
44        Self(value)
45    }
46}
47
48/// Operations that can be performed atomically on a stream value
49#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
50#[serde(tag = "type", rename_all = "lowercase")]
51pub enum UpdateOp {
52    /// Set a value at path (overwrite)
53    Set {
54        path: FieldPath,
55        value: Option<Value>,
56    },
57
58    /// Merge object into existing value (object-only)
59    Merge {
60        path: Option<FieldPath>,
61        value: Value,
62    },
63
64    /// Increment numeric value
65    Increment { path: FieldPath, by: i64 },
66
67    /// Decrement numeric value
68    Decrement { path: FieldPath, by: i64 },
69
70    /// Remove a field
71    Remove { path: FieldPath },
72}
73
74impl UpdateOp {
75    /// Create a Set operation
76    pub fn set(path: impl Into<FieldPath>, value: impl Into<Option<Value>>) -> Self {
77        Self::Set {
78            path: path.into(),
79            value: value.into(),
80        }
81    }
82
83    /// Create an Increment operation
84    pub fn increment(path: impl Into<FieldPath>, by: i64) -> Self {
85        Self::Increment {
86            path: path.into(),
87            by,
88        }
89    }
90
91    /// Create a Decrement operation
92    pub fn decrement(path: impl Into<FieldPath>, by: i64) -> Self {
93        Self::Decrement {
94            path: path.into(),
95            by,
96        }
97    }
98
99    /// Create a Remove operation
100    pub fn remove(path: impl Into<FieldPath>) -> Self {
101        Self::Remove { path: path.into() }
102    }
103
104    /// Create a Merge operation at root level
105    pub fn merge(value: impl Into<Value>) -> Self {
106        Self::Merge {
107            path: None,
108            value: value.into(),
109        }
110    }
111
112    /// Create a Merge operation at a specific path
113    pub fn merge_at(path: impl Into<FieldPath>, value: impl Into<Value>) -> Self {
114        Self::Merge {
115            path: Some(path.into()),
116            value: value.into(),
117        }
118    }
119}
120
121/// Result of an atomic update operation
122#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
123pub struct UpdateResult {
124    /// The value before the update (None if key didn't exist)
125    pub old_value: Option<Value>,
126    /// The value after the update
127    pub new_value: Value,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
131pub struct SetResult {
132    /// The value before the update (None if key didn't exist)
133    pub old_value: Option<Value>,
134    /// The value after the update
135    pub new_value: Value,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
139pub struct DeleteResult {
140    /// The value before the update (None if key didn't exist)
141    pub old_value: Option<Value>,
142}
143
144/// Input for the stream update function
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct StreamUpdateInput {
147    pub stream_name: String,
148    pub group_id: String,
149    pub item_id: String,
150    pub ops: Vec<UpdateOp>,
151}
152
153#[derive(Clone)]
154pub struct RemoteFunctionData {
155    pub message: RegisterFunctionMessage,
156    pub handler: Option<RemoteFunctionHandler>,
157}
158
159#[derive(Clone)]
160pub struct RemoteTriggerTypeData {
161    pub message: RegisterTriggerTypeMessage,
162    pub handler: Arc<dyn TriggerHandler>,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct ApiRequest<T = Value> {
167    #[serde(default)]
168    pub query_params: HashMap<String, String>,
169    #[serde(default)]
170    pub path_params: HashMap<String, String>,
171    #[serde(default)]
172    pub headers: HashMap<String, String>,
173    #[serde(default)]
174    pub path: String,
175    #[serde(default)]
176    pub method: String,
177    #[serde(default)]
178    pub body: T,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct ApiResponse<T = Value> {
183    pub status_code: u16,
184    #[serde(default)]
185    pub headers: HashMap<String, String>,
186    pub body: T,
187}
188
189/// A streaming channel pair for worker-to-worker data transfer.
190pub struct Channel {
191    pub writer: ChannelWriter,
192    pub reader: ChannelReader,
193    pub writer_ref: StreamChannelRef,
194    pub reader_ref: StreamChannelRef,
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty JSON object must deserialize, with every field falling back
    /// to its default value.
    #[test]
    fn api_request_defaults_when_missing_fields() {
        let request: ApiRequest =
            serde_json::from_str("{}").expect("empty object should deserialize via serde defaults");

        assert!(request.query_params.is_empty());
        assert!(request.path_params.is_empty());
        assert!(request.headers.is_empty());
        assert_eq!(request.path, "");
        assert_eq!(request.method, "");
        assert!(request.body.is_null());
    }
}
212}