Skip to main content

iii_sdk/
types.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::future::BoxFuture;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::{
9    channels::{ChannelReader, ChannelWriter, StreamChannelRef},
10    error::IIIError,
11    protocol::{RegisterFunctionMessage, RegisterTriggerTypeMessage},
12    triggers::TriggerHandler,
13};
14
15pub type RemoteFunctionHandler =
16    Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, IIIError>> + Send + Sync>;
17
18// ============================================================================
19// Stream Update Types
20// ============================================================================
21
22/// Represents a path to a field in a JSON object
23#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
24pub struct FieldPath(pub String);
25
26impl FieldPath {
27    pub fn new(path: impl Into<String>) -> Self {
28        Self(path.into())
29    }
30
31    pub fn root() -> Self {
32        Self(String::new())
33    }
34}
35
36impl From<&str> for FieldPath {
37    fn from(value: &str) -> Self {
38        Self(value.to_string())
39    }
40}
41
42impl From<String> for FieldPath {
43    fn from(value: String) -> Self {
44        Self(value)
45    }
46}
47
48/// Path target for a [`UpdateOp::Merge`] operation. Accepts either a
49/// single string (legacy / first-level field) or an array of literal
50/// segments (nested path).
51///
52/// Path normalization rules applied by the engine:
53/// - absent / `Single("")` / `Segments(vec![])` → root merge
54/// - `Single("foo")` is equivalent to `Segments(vec!["foo".into()])`
55/// - `Segments(["a", "b", "c"])` walks three literal keys, never
56///   interpreting dots specially. `Segments(vec!["a.b".into()])` is a
57///   single literal key named `"a.b"`.
58///
59/// **Variant ordering is load-bearing.** `#[serde(untagged)]` tries
60/// variants in declaration order — `Single` MUST come before
61/// `Segments` so a JSON string deserializes into `Single` rather than
62/// failing the array match first.
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
64// VARIANT-ORDER-LOAD-BEARING: `Single` MUST precede `Segments` for serde
65// untagged deserialization to route bare strings to `MergePath::Single`.
66// Reordering breaks wire compatibility — string payloads would deserialize
67// as one-element `Segments`. Locked by `merge_path_single_variant_deserializes_string_first`.
68#[serde(untagged)]
69pub enum MergePath {
70    Single(String),
71    Segments(Vec<String>),
72}
73
74impl From<&str> for MergePath {
75    fn from(value: &str) -> Self {
76        Self::Single(value.to_string())
77    }
78}
79
80impl From<String> for MergePath {
81    fn from(value: String) -> Self {
82        Self::Single(value)
83    }
84}
85
86impl From<Vec<String>> for MergePath {
87    fn from(value: Vec<String>) -> Self {
88        Self::Segments(value)
89    }
90}
91
92impl From<Vec<&str>> for MergePath {
93    fn from(value: Vec<&str>) -> Self {
94        Self::Segments(value.into_iter().map(String::from).collect())
95    }
96}
97
98// Compatibility shim for callers that constructed paths via `FieldPath`
99// before `Append.path` widened to `Option<MergePath>` in PR #1552-fix.
100// `impl From<FieldPath> for Option<MergePath>` would violate Rust orphan
101// rules (both `From` and `Option` are foreign); call sites needing the
102// `Option` wrapping use `.map(Into::into)` or `Some(fp.into())`.
103impl From<FieldPath> for MergePath {
104    fn from(value: FieldPath) -> Self {
105        Self::Single(value.0)
106    }
107}
108
109/// Operations that can be performed atomically on a stream value
110#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
111#[serde(tag = "type", rename_all = "lowercase")]
112pub enum UpdateOp {
113    /// Set a value at path (overwrite)
114    Set {
115        path: FieldPath,
116        value: Option<Value>,
117    },
118
119    /// Merge object into existing value (object-only). Path may be
120    /// omitted (root merge), a single first-level key, or an array of
121    /// literal segments for nested merge. See [`MergePath`].
122    Merge {
123        #[serde(default, skip_serializing_if = "Option::is_none")]
124        path: Option<MergePath>,
125        value: Value,
126    },
127
128    /// Increment numeric value
129    Increment { path: FieldPath, by: i64 },
130
131    /// Decrement numeric value
132    Decrement { path: FieldPath, by: i64 },
133
134    /// Append an element to an array or concatenate a string at the
135    /// optional path. Path may be omitted (root append), a single
136    /// first-level key, or an array of literal segments for nested
137    /// append. See [`MergePath`] for the variant shape.
138    Append {
139        #[serde(default, skip_serializing_if = "Option::is_none")]
140        path: Option<MergePath>,
141        value: Value,
142    },
143
144    /// Remove a field
145    Remove { path: FieldPath },
146}
147
148impl UpdateOp {
149    /// Create a Set operation
150    pub fn set(path: impl Into<FieldPath>, value: impl Into<Option<Value>>) -> Self {
151        Self::Set {
152            path: path.into(),
153            value: value.into(),
154        }
155    }
156
157    /// Create an Increment operation
158    pub fn increment(path: impl Into<FieldPath>, by: i64) -> Self {
159        Self::Increment {
160            path: path.into(),
161            by,
162        }
163    }
164
165    /// Create a Decrement operation
166    pub fn decrement(path: impl Into<FieldPath>, by: i64) -> Self {
167        Self::Decrement {
168            path: path.into(),
169            by,
170        }
171    }
172
173    /// Create an Append operation at a specific path. Accepts a single
174    /// first-level key (`"foo"`) or any type that converts into
175    /// [`MergePath`] (e.g. `Vec<String>` for nested paths).
176    pub fn append(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
177        Self::Append {
178            path: Some(path.into()),
179            value: value.into(),
180        }
181    }
182
183    /// Create an Append operation at the root level (no path).
184    pub fn append_root(value: impl Into<Value>) -> Self {
185        Self::Append {
186            path: None,
187            value: value.into(),
188        }
189    }
190
191    /// Create an Append operation at a nested path of literal segments.
192    /// Convenience wrapper for `append(vec!["a", "b"], v)`.
193    pub fn append_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
194    where
195        I: IntoIterator<Item = S>,
196        S: Into<String>,
197    {
198        Self::Append {
199            path: Some(MergePath::Segments(
200                segments.into_iter().map(Into::into).collect(),
201            )),
202            value: value.into(),
203        }
204    }
205
206    /// Create a Remove operation
207    pub fn remove(path: impl Into<FieldPath>) -> Self {
208        Self::Remove { path: path.into() }
209    }
210
211    /// Create a Merge operation at root level
212    pub fn merge(value: impl Into<Value>) -> Self {
213        Self::Merge {
214            path: None,
215            value: value.into(),
216        }
217    }
218
219    /// Create a Merge operation at a specific path. Accepts a single
220    /// first-level key (`"foo"`) or any type that converts into
221    /// [`MergePath`] (e.g. `Vec<String>` for nested paths).
222    pub fn merge_at(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
223        Self::Merge {
224            path: Some(path.into()),
225            value: value.into(),
226        }
227    }
228
229    /// Create a Merge operation at a nested path of literal segments.
230    /// Convenience wrapper for `merge_at(vec!["a", "b"], v)`.
231    pub fn merge_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
232    where
233        I: IntoIterator<Item = S>,
234        S: Into<String>,
235    {
236        Self::Merge {
237            path: Some(MergePath::Segments(
238                segments.into_iter().map(Into::into).collect(),
239            )),
240            value: value.into(),
241        }
242    }
243}
244
245/// Per-op error reported by an atomic update operation.
246#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
247pub struct UpdateOpError {
248    /// Index of the offending op within the original `ops` array.
249    pub op_index: usize,
250    /// Stable error code, e.g. `"merge.path.too_deep"`.
251    pub code: String,
252    /// Human-readable description with concrete numbers when applicable.
253    pub message: String,
254    /// Optional documentation URL for this error class.
255    #[serde(default, skip_serializing_if = "Option::is_none")]
256    pub doc_url: Option<String>,
257}
258
259/// Result of an atomic update operation
260#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
261pub struct UpdateResult {
262    /// The value before the update (None if key didn't exist)
263    pub old_value: Option<Value>,
264    /// The value after the update
265    pub new_value: Value,
266    /// Errors encountered while applying ops. Successfully applied ops
267    /// are still reflected in `new_value`. Field is omitted from JSON
268    /// when empty for backward compatibility.
269    #[serde(default, skip_serializing_if = "Vec::is_empty")]
270    pub errors: Vec<UpdateOpError>,
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
274pub struct SetResult {
275    /// The value before the update (None if key didn't exist)
276    pub old_value: Option<Value>,
277    /// The value after the update
278    pub new_value: Value,
279}
280
281#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
282pub struct DeleteResult {
283    /// The value before the update (None if key didn't exist)
284    pub old_value: Option<Value>,
285}
286
287// ============================================================================
288// Stream Input Types
289// ============================================================================
290
291/// Input for retrieving a single stream item.
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct StreamGetInput {
294    pub stream_name: String,
295    pub group_id: String,
296    pub item_id: String,
297}
298
299/// Input for setting a stream item.
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct StreamSetInput {
302    pub stream_name: String,
303    pub group_id: String,
304    pub item_id: String,
305    pub data: Value,
306}
307
308/// Input for deleting a stream item.
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct StreamDeleteInput {
311    pub stream_name: String,
312    pub group_id: String,
313    pub item_id: String,
314}
315
316/// Input for listing all items in a stream group.
317#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct StreamListInput {
319    pub stream_name: String,
320    pub group_id: String,
321}
322
323/// Input for listing all groups in a stream.
324#[derive(Debug, Clone, Serialize, Deserialize)]
325pub struct StreamListGroupsInput {
326    pub stream_name: String,
327}
328
329/// Input for atomically updating a stream item.
330#[derive(Debug, Clone, Serialize, Deserialize)]
331pub struct StreamUpdateInput {
332    pub stream_name: String,
333    pub group_id: String,
334    pub item_id: String,
335    pub ops: Vec<UpdateOp>,
336}
337
338// ============================================================================
339// Stream Auth Types
340// ============================================================================
341
342/// Input for stream authentication.
343#[derive(Debug, Clone, Serialize, Deserialize)]
344pub struct StreamAuthInput {
345    pub headers: HashMap<String, String>,
346    pub path: String,
347    pub query_params: HashMap<String, Vec<String>>,
348    pub addr: String,
349}
350
351/// Result of stream authentication.
352#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct StreamAuthResult {
354    pub context: Option<Value>,
355}
356
357/// Result of a stream join request.
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct StreamJoinResult {
360    pub unauthorized: bool,
361}
362
363#[derive(Clone)]
364pub struct RemoteFunctionData {
365    pub message: RegisterFunctionMessage,
366    pub handler: Option<RemoteFunctionHandler>,
367}
368
369#[derive(Clone)]
370pub struct RemoteTriggerTypeData {
371    pub message: RegisterTriggerTypeMessage,
372    pub handler: Arc<dyn TriggerHandler>,
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct ApiRequest<T = Value> {
377    #[serde(default)]
378    pub query_params: HashMap<String, String>,
379    #[serde(default)]
380    pub path_params: HashMap<String, String>,
381    #[serde(default)]
382    pub headers: HashMap<String, String>,
383    #[serde(default)]
384    pub path: String,
385    #[serde(default)]
386    pub method: String,
387    #[serde(default)]
388    pub body: T,
389}
390
391#[derive(Debug, Clone, Serialize, Deserialize)]
392pub struct ApiResponse<T = Value> {
393    pub status_code: u16,
394    #[serde(default)]
395    pub headers: HashMap<String, String>,
396    pub body: T,
397}
398
399/// A streaming channel pair for worker-to-worker data transfer.
400pub struct Channel {
401    pub writer: ChannelWriter,
402    pub reader: ChannelReader,
403    pub writer_ref: StreamChannelRef,
404    pub reader_ref: StreamChannelRef,
405}
406
407#[cfg(test)]
408mod tests {
409    use super::*;
410
411    #[test]
412    fn api_request_defaults_when_missing_fields() {
413        let request: ApiRequest = serde_json::from_str("{}").unwrap();
414
415        assert!(request.query_params.is_empty());
416        assert!(request.path_params.is_empty());
417        assert!(request.headers.is_empty());
418        assert_eq!(request.path, "");
419        assert_eq!(request.method, "");
420        assert!(request.body.is_null());
421    }
422
423    #[test]
424    fn update_append_serializes_as_tagged_operation() {
425        let op = UpdateOp::append("chunks", serde_json::json!({"text": "hello"}));
426        let encoded = serde_json::to_value(&op).unwrap();
427
428        assert_eq!(
429            encoded,
430            serde_json::json!({
431                "type": "append",
432                "path": "chunks",
433                "value": {"text": "hello"},
434            })
435        );
436
437        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
438        match decoded {
439            UpdateOp::Append {
440                path: Some(MergePath::Single(s)),
441                value,
442            } => {
443                assert_eq!(s, "chunks");
444                assert_eq!(value, serde_json::json!({"text": "hello"}));
445            }
446            other => panic!("expected single-string append, got {other:?}"),
447        }
448    }
449
450    #[test]
451    fn append_with_segments_path_round_trips_as_array() {
452        let op = UpdateOp::append_at_path(["entityId", "buffer"], serde_json::json!("chunk"));
453        let encoded = serde_json::to_value(&op).unwrap();
454
455        assert_eq!(
456            encoded,
457            serde_json::json!({
458                "type": "append",
459                "path": ["entityId", "buffer"],
460                "value": "chunk",
461            })
462        );
463
464        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
465        match decoded {
466            UpdateOp::Append {
467                path: Some(MergePath::Segments(segs)),
468                value,
469            } => {
470                assert_eq!(segs, vec!["entityId", "buffer"]);
471                assert_eq!(value, serde_json::json!("chunk"));
472            }
473            other => panic!("expected segments append, got {other:?}"),
474        }
475    }
476
477    #[test]
478    fn append_with_root_path_round_trips() {
479        let op = UpdateOp::append_root(serde_json::json!("first"));
480        let encoded = serde_json::to_value(&op).unwrap();
481
482        // `path` is None, so it is omitted entirely from the JSON
483        // wire format (no explicit `null`). Cross-SDK consumers (Node
484        // / Python / browser) decode the `path?` field as absent.
485        assert_eq!(
486            encoded,
487            serde_json::json!({
488                "type": "append",
489                "value": "first",
490            })
491        );
492
493        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
494        match decoded {
495            UpdateOp::Append { path: None, value } => {
496                assert_eq!(value, serde_json::json!("first"));
497            }
498            other => panic!("expected root append, got {other:?}"),
499        }
500    }
501
502    #[test]
503    fn append_path_omitted_deserializes_as_none() {
504        // Wire payloads that pre-date this change may omit `path` entirely
505        // (effectively root append). Guard against that breaking. Also
506        // covers explicit `null` and missing-field deserialization paths.
507        for raw in [
508            r#"{"type":"append","value":"x"}"#,
509            r#"{"type":"append","path":null,"value":"x"}"#,
510        ] {
511            let op: UpdateOp = serde_json::from_str(raw).unwrap_or_else(|e| {
512                panic!("expected to parse {raw:?} as UpdateOp::Append, got {e}")
513            });
514            match op {
515                UpdateOp::Append { path: None, value } => {
516                    assert_eq!(value, serde_json::json!("x"));
517                }
518                other => panic!("expected root append for {raw:?}, got {other:?}"),
519            }
520        }
521    }
522
523    #[test]
524    fn append_field_path_into_merge_path_compat() {
525        // Legacy callers constructed paths via `FieldPath`; the
526        // `From<FieldPath> for MergePath` shim keeps them compiling.
527        let fp = FieldPath::new("legacy");
528        let mp: MergePath = fp.into();
529        assert_eq!(mp, MergePath::Single("legacy".to_string()));
530    }
531
532    #[test]
533    fn merge_path_single_variant_deserializes_string_first() {
534        // Regression for VARIANT-ORDER-LOAD-BEARING: bare JSON strings
535        // must deserialize to `MergePath::Single`, not a one-element
536        // `Segments`. If the variant order in `MergePath` is ever
537        // reordered (alphabetized, auto-formatted) this test fails.
538        let single: MergePath = serde_json::from_str(r#""foo""#).unwrap();
539        assert_eq!(single, MergePath::Single("foo".to_string()));
540
541        let segments: MergePath = serde_json::from_str(r#"["a","b"]"#).unwrap();
542        assert_eq!(
543            segments,
544            MergePath::Segments(vec!["a".to_string(), "b".to_string()])
545        );
546    }
547
548    #[test]
549    fn merge_with_string_path_round_trips_to_single_variant() {
550        // Regression: Single must come before Segments in the
551        // untagged enum or this test fails.
552        let op = UpdateOp::merge_at("session-abc", serde_json::json!({"author": "alice"}));
553        let encoded = serde_json::to_value(&op).unwrap();
554
555        assert_eq!(
556            encoded,
557            serde_json::json!({
558                "type": "merge",
559                "path": "session-abc",
560                "value": {"author": "alice"},
561            })
562        );
563
564        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
565        match decoded {
566            UpdateOp::Merge {
567                path: Some(MergePath::Single(s)),
568                value,
569            } => {
570                assert_eq!(s, "session-abc");
571                assert_eq!(value, serde_json::json!({"author": "alice"}));
572            }
573            other => panic!("expected single-string merge, got {other:?}"),
574        }
575    }
576
577    #[test]
578    fn merge_with_segments_path_round_trips_as_array() {
579        let op = UpdateOp::merge_at_path(["sessions", "abc"], serde_json::json!({"ts": "chunk"}));
580        let encoded = serde_json::to_value(&op).unwrap();
581
582        assert_eq!(
583            encoded,
584            serde_json::json!({
585                "type": "merge",
586                "path": ["sessions", "abc"],
587                "value": {"ts": "chunk"},
588            })
589        );
590
591        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
592        match decoded {
593            UpdateOp::Merge {
594                path: Some(MergePath::Segments(segs)),
595                value,
596            } => {
597                assert_eq!(segs, vec!["sessions", "abc"]);
598                assert_eq!(value, serde_json::json!({"ts": "chunk"}));
599            }
600            other => panic!("expected segments merge, got {other:?}"),
601        }
602    }
603
604    #[test]
605    fn merge_without_path_round_trips() {
606        let op = UpdateOp::merge(serde_json::json!({"x": 1}));
607        let encoded = serde_json::to_value(&op).unwrap();
608
609        // `path` is None, so it is omitted from the JSON wire format
610        // (no explicit `null`). Cross-SDK consumers decode `path?` as
611        // absent. `path: null` payloads still deserialize via the
612        // `#[serde(default)]` attribute.
613        assert_eq!(
614            encoded,
615            serde_json::json!({
616                "type": "merge",
617                "value": {"x": 1},
618            })
619        );
620
621        let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
622        match decoded {
623            UpdateOp::Merge { path: None, value } => {
624                assert_eq!(value, serde_json::json!({"x": 1}));
625            }
626            other => panic!("expected root merge, got {other:?}"),
627        }
628    }
629
630    #[test]
631    fn update_result_with_errors_serializes_field() {
632        let result = UpdateResult {
633            old_value: None,
634            new_value: serde_json::json!({"a": 1}),
635            errors: vec![UpdateOpError {
636                op_index: 0,
637                code: "merge.path.too_deep".to_string(),
638                message: "Path depth 33 exceeds maximum of 32".to_string(),
639                doc_url: Some("https://iii.dev/docs/workers/iii-state#merge-bounds".to_string()),
640            }],
641        };
642        let encoded = serde_json::to_value(&result).unwrap();
643        assert_eq!(encoded["errors"][0]["code"], "merge.path.too_deep");
644    }
645
646    #[test]
647    fn update_result_without_errors_omits_field_from_json() {
648        let result = UpdateResult {
649            old_value: None,
650            new_value: serde_json::json!({"a": 1}),
651            errors: vec![],
652        };
653        let encoded = serde_json::to_value(&result).unwrap();
654        assert!(
655            encoded.get("errors").is_none(),
656            "errors field should be omitted when empty for backward compat"
657        );
658    }
659}