Skip to main content

iii_sdk/
types.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::future::BoxFuture;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::{
9    channels::{ChannelReader, ChannelWriter, StreamChannelRef},
10    error::IIIError,
11    protocol::{RegisterFunctionMessage, RegisterTriggerTypeMessage},
12    triggers::TriggerHandler,
13};
14
15pub type RemoteFunctionHandler =
16    Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, IIIError>> + Send + Sync>;
17
18// ============================================================================
19// Stream Update Types
20// ============================================================================
21
22/// Path target for a [`UpdateOp::Merge`] operation. Accepts either a
23/// single string (legacy / first-level field) or an array of literal
24/// segments (nested path).
25///
26/// Path normalization rules applied by the engine:
27/// - absent / `Single("")` / `Segments(vec![])` → root merge
28/// - `Single("foo")` is equivalent to `Segments(vec!["foo".into()])`
29/// - `Segments(["a", "b", "c"])` walks three literal keys, never
30///   interpreting dots specially. `Segments(vec!["a.b".into()])` is a
31///   single literal key named `"a.b"`.
32///
33/// **Variant ordering is load-bearing.** `#[serde(untagged)]` tries
34/// variants in declaration order — `Single` MUST come before
35/// `Segments` so a JSON string deserializes into `Single` rather than
36/// failing the array match first.
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
38// VARIANT-ORDER-LOAD-BEARING: `Single` MUST precede `Segments` for serde
39// untagged deserialization to route bare strings to `MergePath::Single`.
40// Reordering breaks wire compatibility — string payloads would deserialize
41// as one-element `Segments`. Locked by `merge_path_single_variant_deserializes_string_first`.
42#[serde(untagged)]
43pub enum MergePath {
44    Single(String),
45    Segments(Vec<String>),
46}
47
48impl From<&str> for MergePath {
49    fn from(value: &str) -> Self {
50        Self::Single(value.to_string())
51    }
52}
53
54impl From<String> for MergePath {
55    fn from(value: String) -> Self {
56        Self::Single(value)
57    }
58}
59
60impl From<Vec<String>> for MergePath {
61    fn from(value: Vec<String>) -> Self {
62        Self::Segments(value)
63    }
64}
65
66impl From<Vec<&str>> for MergePath {
67    fn from(value: Vec<&str>) -> Self {
68        Self::Segments(value.into_iter().map(String::from).collect())
69    }
70}
71
72/// Operations that can be performed atomically on a stream value
73#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
74#[serde(tag = "type", rename_all = "lowercase")]
75pub enum UpdateOp {
76    /// Set a value at path (overwrite)
77    Set { path: String, value: Option<Value> },
78
79    /// Merge object into existing value (object-only). Path may be
80    /// omitted (root merge), a single first-level key, or an array of
81    /// literal segments for nested merge. See [`MergePath`].
82    Merge {
83        #[serde(default, skip_serializing_if = "Option::is_none")]
84        path: Option<MergePath>,
85        value: Value,
86    },
87
88    /// Increment numeric value
89    Increment { path: String, by: i64 },
90
91    /// Decrement numeric value
92    Decrement { path: String, by: i64 },
93
94    /// Append an element to an array or concatenate a string at the
95    /// optional path. Path may be omitted (root append), a single
96    /// first-level key, or an array of literal segments for nested
97    /// append. See [`MergePath`] for the variant shape.
98    Append {
99        #[serde(default, skip_serializing_if = "Option::is_none")]
100        path: Option<MergePath>,
101        value: Value,
102    },
103
104    /// Remove a field
105    Remove { path: String },
106}
107
108impl UpdateOp {
109    /// Create a Set operation
110    pub fn set(path: impl Into<String>, value: impl Into<Option<Value>>) -> Self {
111        Self::Set {
112            path: path.into(),
113            value: value.into(),
114        }
115    }
116
117    /// Create an Increment operation
118    pub fn increment(path: impl Into<String>, by: i64) -> Self {
119        Self::Increment {
120            path: path.into(),
121            by,
122        }
123    }
124
125    /// Create a Decrement operation
126    pub fn decrement(path: impl Into<String>, by: i64) -> Self {
127        Self::Decrement {
128            path: path.into(),
129            by,
130        }
131    }
132
133    /// Create an Append operation at a specific path. Accepts a single
134    /// first-level key (`"foo"`) or any type that converts into
135    /// [`MergePath`] (e.g. `Vec<String>` for nested paths).
136    pub fn append(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
137        Self::Append {
138            path: Some(path.into()),
139            value: value.into(),
140        }
141    }
142
143    /// Create an Append operation at the root level (no path).
144    pub fn append_root(value: impl Into<Value>) -> Self {
145        Self::Append {
146            path: None,
147            value: value.into(),
148        }
149    }
150
151    /// Create an Append operation at a nested path of literal segments.
152    /// Convenience wrapper for `append(vec!["a", "b"], v)`.
153    pub fn append_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
154    where
155        I: IntoIterator<Item = S>,
156        S: Into<String>,
157    {
158        Self::Append {
159            path: Some(MergePath::Segments(
160                segments.into_iter().map(Into::into).collect(),
161            )),
162            value: value.into(),
163        }
164    }
165
166    /// Create a Remove operation
167    pub fn remove(path: impl Into<String>) -> Self {
168        Self::Remove { path: path.into() }
169    }
170
171    /// Create a Merge operation at root level
172    pub fn merge(value: impl Into<Value>) -> Self {
173        Self::Merge {
174            path: None,
175            value: value.into(),
176        }
177    }
178
179    /// Create a Merge operation at a specific path. Accepts a single
180    /// first-level key (`"foo"`) or any type that converts into
181    /// [`MergePath`] (e.g. `Vec<String>` for nested paths).
182    pub fn merge_at(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
183        Self::Merge {
184            path: Some(path.into()),
185            value: value.into(),
186        }
187    }
188
189    /// Create a Merge operation at a nested path of literal segments.
190    /// Convenience wrapper for `merge_at(vec!["a", "b"], v)`.
191    pub fn merge_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
192    where
193        I: IntoIterator<Item = S>,
194        S: Into<String>,
195    {
196        Self::Merge {
197            path: Some(MergePath::Segments(
198                segments.into_iter().map(Into::into).collect(),
199            )),
200            value: value.into(),
201        }
202    }
203}
204
205/// Per-op error reported by an atomic update operation.
206#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
207pub struct UpdateOpError {
208    /// Index of the offending op within the original `ops` array.
209    pub op_index: usize,
210    /// Stable error code, e.g. `"merge.path.too_deep"`.
211    pub code: String,
212    /// Human-readable description with concrete numbers when applicable.
213    pub message: String,
214    /// Optional documentation URL for this error class.
215    #[serde(default, skip_serializing_if = "Option::is_none")]
216    pub doc_url: Option<String>,
217}
218
219/// Result of an atomic update operation
220#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
221pub struct UpdateResult {
222    /// The value before the update (None if key didn't exist)
223    pub old_value: Option<Value>,
224    /// The value after the update
225    pub new_value: Value,
226    /// Errors encountered while applying ops. Successfully applied ops
227    /// are still reflected in `new_value`. Field is omitted from JSON
228    /// when empty for backward compatibility.
229    #[serde(default, skip_serializing_if = "Vec::is_empty")]
230    pub errors: Vec<UpdateOpError>,
231}
232
233#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
234pub struct SetResult {
235    /// The value before the update (None if key didn't exist)
236    pub old_value: Option<Value>,
237    /// The value after the update
238    pub new_value: Value,
239}
240
241#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
242pub struct DeleteResult {
243    /// The value before the update (None if key didn't exist)
244    pub old_value: Option<Value>,
245}
246
247// ============================================================================
248// Stream Input Types
249// ============================================================================
250
251/// Input for retrieving a single stream item.
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct StreamGetInput {
254    pub stream_name: String,
255    pub group_id: String,
256    pub item_id: String,
257}
258
259/// Input for setting a stream item.
260#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct StreamSetInput {
262    pub stream_name: String,
263    pub group_id: String,
264    pub item_id: String,
265    pub data: Value,
266}
267
268/// Input for deleting a stream item.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct StreamDeleteInput {
271    pub stream_name: String,
272    pub group_id: String,
273    pub item_id: String,
274}
275
276/// Input for listing all items in a stream group.
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct StreamListInput {
279    pub stream_name: String,
280    pub group_id: String,
281}
282
283/// Input for listing all groups in a stream.
284#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct StreamListGroupsInput {
286    pub stream_name: String,
287}
288
289/// Input for atomically updating a stream item.
290#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct StreamUpdateInput {
292    pub stream_name: String,
293    pub group_id: String,
294    pub item_id: String,
295    pub ops: Vec<UpdateOp>,
296}
297
298// ============================================================================
299// Stream Auth Types
300// ============================================================================
301
302/// Input for stream authentication.
303#[derive(Debug, Clone, Serialize, Deserialize)]
304pub struct StreamAuthInput {
305    pub headers: HashMap<String, String>,
306    pub path: String,
307    pub query_params: HashMap<String, Vec<String>>,
308    pub addr: String,
309}
310
311/// Result of stream authentication.
312#[derive(Debug, Clone, Serialize, Deserialize)]
313pub struct StreamAuthResult {
314    pub context: Option<Value>,
315}
316
317/// Result of a stream join request.
318#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct StreamJoinResult {
320    pub unauthorized: bool,
321}
322
323#[derive(Clone)]
324pub struct RemoteFunctionData {
325    pub message: RegisterFunctionMessage,
326    pub handler: Option<RemoteFunctionHandler>,
327}
328
329#[derive(Clone)]
330pub struct RemoteTriggerTypeData {
331    pub message: RegisterTriggerTypeMessage,
332    pub handler: Arc<dyn TriggerHandler>,
333}
334
335#[derive(Debug, Clone, Serialize, Deserialize)]
336pub struct ApiRequest<T = Value> {
337    #[serde(default)]
338    pub query_params: HashMap<String, String>,
339    #[serde(default)]
340    pub path_params: HashMap<String, String>,
341    #[serde(default)]
342    pub headers: HashMap<String, String>,
343    #[serde(default)]
344    pub path: String,
345    #[serde(default)]
346    pub method: String,
347    #[serde(default)]
348    pub body: T,
349}
350
351#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct ApiResponse<T = Value> {
353    pub status_code: u16,
354    #[serde(default)]
355    pub headers: HashMap<String, String>,
356    pub body: T,
357}
358
359/// A streaming channel pair for worker-to-worker data transfer.
360pub struct Channel {
361    pub writer: ChannelWriter,
362    pub reader: ChannelReader,
363    pub writer_ref: StreamChannelRef,
364    pub reader_ref: StreamChannelRef,
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    #[test]
372    fn api_request_defaults_when_missing_fields() {
373        let request: ApiRequest = serde_json::from_str("{}").unwrap();
374
375        assert!(request.query_params.is_empty());
376        assert!(request.path_params.is_empty());
377        assert!(request.headers.is_empty());
378        assert_eq!(request.path, "");
379        assert_eq!(request.method, "");
380        assert!(request.body.is_null());
381    }
382
383    #[test]
384    fn update_append_serializes_as_tagged_operation() {
385        let op = UpdateOp::append("chunks", serde_json::json!({"text": "hello"}));
386        let encoded = serde_json::to_value(&op).unwrap();
387
388        assert_eq!(
389            encoded,
390            serde_json::json!({
391                "type": "append",
392                "path": "chunks",
393                "value": {"text": "hello"},
394            })
395        );
396
397        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
398        match decoded {
399            UpdateOp::Append {
400                path: Some(MergePath::Single(s)),
401                value,
402            } => {
403                assert_eq!(s, "chunks");
404                assert_eq!(value, serde_json::json!({"text": "hello"}));
405            }
406            other => panic!("expected single-string append, got {other:?}"),
407        }
408    }
409
410    #[test]
411    fn append_with_segments_path_round_trips_as_array() {
412        let op = UpdateOp::append_at_path(["entityId", "buffer"], serde_json::json!("chunk"));
413        let encoded = serde_json::to_value(&op).unwrap();
414
415        assert_eq!(
416            encoded,
417            serde_json::json!({
418                "type": "append",
419                "path": ["entityId", "buffer"],
420                "value": "chunk",
421            })
422        );
423
424        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
425        match decoded {
426            UpdateOp::Append {
427                path: Some(MergePath::Segments(segs)),
428                value,
429            } => {
430                assert_eq!(segs, vec!["entityId", "buffer"]);
431                assert_eq!(value, serde_json::json!("chunk"));
432            }
433            other => panic!("expected segments append, got {other:?}"),
434        }
435    }
436
437    #[test]
438    fn append_with_root_path_round_trips() {
439        let op = UpdateOp::append_root(serde_json::json!("first"));
440        let encoded = serde_json::to_value(&op).unwrap();
441
442        // `path` is None, so it is omitted entirely from the JSON
443        // wire format (no explicit `null`). Cross-SDK consumers (Node
444        // / Python / browser) decode the `path?` field as absent.
445        assert_eq!(
446            encoded,
447            serde_json::json!({
448                "type": "append",
449                "value": "first",
450            })
451        );
452
453        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
454        match decoded {
455            UpdateOp::Append { path: None, value } => {
456                assert_eq!(value, serde_json::json!("first"));
457            }
458            other => panic!("expected root append, got {other:?}"),
459        }
460    }
461
462    #[test]
463    fn append_path_omitted_deserializes_as_none() {
464        // Wire payloads that pre-date this change may omit `path` entirely
465        // (effectively root append). Guard against that breaking. Also
466        // covers explicit `null` and missing-field deserialization paths.
467        for raw in [
468            r#"{"type":"append","value":"x"}"#,
469            r#"{"type":"append","path":null,"value":"x"}"#,
470        ] {
471            let op: UpdateOp = serde_json::from_str(raw).unwrap_or_else(|e| {
472                panic!("expected to parse {raw:?} as UpdateOp::Append, got {e}")
473            });
474            match op {
475                UpdateOp::Append { path: None, value } => {
476                    assert_eq!(value, serde_json::json!("x"));
477                }
478                other => panic!("expected root append for {raw:?}, got {other:?}"),
479            }
480        }
481    }
482
483    #[test]
484    fn merge_path_single_variant_deserializes_string_first() {
485        // Regression for VARIANT-ORDER-LOAD-BEARING: bare JSON strings
486        // must deserialize to `MergePath::Single`, not a one-element
487        // `Segments`. If the variant order in `MergePath` is ever
488        // reordered (alphabetized, auto-formatted) this test fails.
489        let single: MergePath = serde_json::from_str(r#""foo""#).unwrap();
490        assert_eq!(single, MergePath::Single("foo".to_string()));
491
492        let segments: MergePath = serde_json::from_str(r#"["a","b"]"#).unwrap();
493        assert_eq!(
494            segments,
495            MergePath::Segments(vec!["a".to_string(), "b".to_string()])
496        );
497    }
498
499    #[test]
500    fn merge_with_string_path_round_trips_to_single_variant() {
501        // Regression: Single must come before Segments in the
502        // untagged enum or this test fails.
503        let op = UpdateOp::merge_at("session-abc", serde_json::json!({"author": "alice"}));
504        let encoded = serde_json::to_value(&op).unwrap();
505
506        assert_eq!(
507            encoded,
508            serde_json::json!({
509                "type": "merge",
510                "path": "session-abc",
511                "value": {"author": "alice"},
512            })
513        );
514
515        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
516        match decoded {
517            UpdateOp::Merge {
518                path: Some(MergePath::Single(s)),
519                value,
520            } => {
521                assert_eq!(s, "session-abc");
522                assert_eq!(value, serde_json::json!({"author": "alice"}));
523            }
524            other => panic!("expected single-string merge, got {other:?}"),
525        }
526    }
527
528    #[test]
529    fn merge_with_segments_path_round_trips_as_array() {
530        let op = UpdateOp::merge_at_path(["sessions", "abc"], serde_json::json!({"ts": "chunk"}));
531        let encoded = serde_json::to_value(&op).unwrap();
532
533        assert_eq!(
534            encoded,
535            serde_json::json!({
536                "type": "merge",
537                "path": ["sessions", "abc"],
538                "value": {"ts": "chunk"},
539            })
540        );
541
542        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
543        match decoded {
544            UpdateOp::Merge {
545                path: Some(MergePath::Segments(segs)),
546                value,
547            } => {
548                assert_eq!(segs, vec!["sessions", "abc"]);
549                assert_eq!(value, serde_json::json!({"ts": "chunk"}));
550            }
551            other => panic!("expected segments merge, got {other:?}"),
552        }
553    }
554
555    #[test]
556    fn merge_without_path_round_trips() {
557        let op = UpdateOp::merge(serde_json::json!({"x": 1}));
558        let encoded = serde_json::to_value(&op).unwrap();
559
560        // `path` is None, so it is omitted from the JSON wire format
561        // (no explicit `null`). Cross-SDK consumers decode `path?` as
562        // absent. `path: null` payloads still deserialize via the
563        // `#[serde(default)]` attribute.
564        assert_eq!(
565            encoded,
566            serde_json::json!({
567                "type": "merge",
568                "value": {"x": 1},
569            })
570        );
571
572        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
573        match decoded {
574            UpdateOp::Merge { path: None, value } => {
575                assert_eq!(value, serde_json::json!({"x": 1}));
576            }
577            other => panic!("expected root merge, got {other:?}"),
578        }
579    }
580
581    #[test]
582    fn update_result_with_errors_serializes_field() {
583        let result = UpdateResult {
584            old_value: None,
585            new_value: serde_json::json!({"a": 1}),
586            errors: vec![UpdateOpError {
587                op_index: 0,
588                code: "merge.path.too_deep".to_string(),
589                message: "Path depth 33 exceeds maximum of 32".to_string(),
590                doc_url: Some("https://iii.dev/docs/workers/iii-state#merge-bounds".to_string()),
591            }],
592        };
593        let encoded = serde_json::to_value(&result).unwrap();
594        assert_eq!(encoded["errors"][0]["code"], "merge.path.too_deep");
595    }
596
597    #[test]
598    fn update_result_without_errors_omits_field_from_json() {
599        let result = UpdateResult {
600            old_value: None,
601            new_value: serde_json::json!({"a": 1}),
602            errors: vec![],
603        };
604        let encoded = serde_json::to_value(&result).unwrap();
605        assert!(
606            encoded.get("errors").is_none(),
607            "errors field should be omitted when empty for backward compat"
608        );
609    }
610}