1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::future::BoxFuture;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::{
9 channels::{ChannelReader, ChannelWriter, StreamChannelRef},
10 error::IIIError,
11 protocol::{RegisterFunctionMessage, RegisterTriggerTypeMessage},
12 triggers::TriggerHandler,
13};
14
15pub type RemoteFunctionHandler =
16 Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, IIIError>> + Send + Sync>;
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
24pub struct FieldPath(pub String);
25
26impl FieldPath {
27 pub fn new(path: impl Into<String>) -> Self {
28 Self(path.into())
29 }
30
31 pub fn root() -> Self {
32 Self(String::new())
33 }
34}
35
36impl From<&str> for FieldPath {
37 fn from(value: &str) -> Self {
38 Self(value.to_string())
39 }
40}
41
42impl From<String> for FieldPath {
43 fn from(value: String) -> Self {
44 Self(value)
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
64#[serde(untagged)]
69pub enum MergePath {
70 Single(String),
71 Segments(Vec<String>),
72}
73
74impl From<&str> for MergePath {
75 fn from(value: &str) -> Self {
76 Self::Single(value.to_string())
77 }
78}
79
80impl From<String> for MergePath {
81 fn from(value: String) -> Self {
82 Self::Single(value)
83 }
84}
85
86impl From<Vec<String>> for MergePath {
87 fn from(value: Vec<String>) -> Self {
88 Self::Segments(value)
89 }
90}
91
92impl From<Vec<&str>> for MergePath {
93 fn from(value: Vec<&str>) -> Self {
94 Self::Segments(value.into_iter().map(String::from).collect())
95 }
96}
97
98impl From<FieldPath> for MergePath {
104 fn from(value: FieldPath) -> Self {
105 Self::Single(value.0)
106 }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
111#[serde(tag = "type", rename_all = "lowercase")]
112pub enum UpdateOp {
113 Set {
115 path: FieldPath,
116 value: Option<Value>,
117 },
118
119 Merge {
123 #[serde(default, skip_serializing_if = "Option::is_none")]
124 path: Option<MergePath>,
125 value: Value,
126 },
127
128 Increment { path: FieldPath, by: i64 },
130
131 Decrement { path: FieldPath, by: i64 },
133
134 Append {
139 #[serde(default, skip_serializing_if = "Option::is_none")]
140 path: Option<MergePath>,
141 value: Value,
142 },
143
144 Remove { path: FieldPath },
146}
147
148impl UpdateOp {
149 pub fn set(path: impl Into<FieldPath>, value: impl Into<Option<Value>>) -> Self {
151 Self::Set {
152 path: path.into(),
153 value: value.into(),
154 }
155 }
156
157 pub fn increment(path: impl Into<FieldPath>, by: i64) -> Self {
159 Self::Increment {
160 path: path.into(),
161 by,
162 }
163 }
164
165 pub fn decrement(path: impl Into<FieldPath>, by: i64) -> Self {
167 Self::Decrement {
168 path: path.into(),
169 by,
170 }
171 }
172
173 pub fn append(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
177 Self::Append {
178 path: Some(path.into()),
179 value: value.into(),
180 }
181 }
182
183 pub fn append_root(value: impl Into<Value>) -> Self {
185 Self::Append {
186 path: None,
187 value: value.into(),
188 }
189 }
190
191 pub fn append_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
194 where
195 I: IntoIterator<Item = S>,
196 S: Into<String>,
197 {
198 Self::Append {
199 path: Some(MergePath::Segments(
200 segments.into_iter().map(Into::into).collect(),
201 )),
202 value: value.into(),
203 }
204 }
205
206 pub fn remove(path: impl Into<FieldPath>) -> Self {
208 Self::Remove { path: path.into() }
209 }
210
211 pub fn merge(value: impl Into<Value>) -> Self {
213 Self::Merge {
214 path: None,
215 value: value.into(),
216 }
217 }
218
219 pub fn merge_at(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
223 Self::Merge {
224 path: Some(path.into()),
225 value: value.into(),
226 }
227 }
228
229 pub fn merge_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
232 where
233 I: IntoIterator<Item = S>,
234 S: Into<String>,
235 {
236 Self::Merge {
237 path: Some(MergePath::Segments(
238 segments.into_iter().map(Into::into).collect(),
239 )),
240 value: value.into(),
241 }
242 }
243}
244
245#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
247pub struct UpdateOpError {
248 pub op_index: usize,
250 pub code: String,
252 pub message: String,
254 #[serde(default, skip_serializing_if = "Option::is_none")]
256 pub doc_url: Option<String>,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
261pub struct UpdateResult {
262 pub old_value: Option<Value>,
264 pub new_value: Value,
266 #[serde(default, skip_serializing_if = "Vec::is_empty")]
270 pub errors: Vec<UpdateOpError>,
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
274pub struct SetResult {
275 pub old_value: Option<Value>,
277 pub new_value: Value,
279}
280
281#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
282pub struct DeleteResult {
283 pub old_value: Option<Value>,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct StreamGetInput {
294 pub stream_name: String,
295 pub group_id: String,
296 pub item_id: String,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct StreamSetInput {
302 pub stream_name: String,
303 pub group_id: String,
304 pub item_id: String,
305 pub data: Value,
306}
307
308#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct StreamDeleteInput {
311 pub stream_name: String,
312 pub group_id: String,
313 pub item_id: String,
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct StreamListInput {
319 pub stream_name: String,
320 pub group_id: String,
321}
322
323#[derive(Debug, Clone, Serialize, Deserialize)]
325pub struct StreamListGroupsInput {
326 pub stream_name: String,
327}
328
329#[derive(Debug, Clone, Serialize, Deserialize)]
331pub struct StreamUpdateInput {
332 pub stream_name: String,
333 pub group_id: String,
334 pub item_id: String,
335 pub ops: Vec<UpdateOp>,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
344pub struct StreamAuthInput {
345 pub headers: HashMap<String, String>,
346 pub path: String,
347 pub query_params: HashMap<String, Vec<String>>,
348 pub addr: String,
349}
350
351#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct StreamAuthResult {
354 pub context: Option<Value>,
355}
356
357#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct StreamJoinResult {
360 pub unauthorized: bool,
361}
362
363#[derive(Clone)]
364pub struct RemoteFunctionData {
365 pub message: RegisterFunctionMessage,
366 pub handler: Option<RemoteFunctionHandler>,
367}
368
369#[derive(Clone)]
370pub struct RemoteTriggerTypeData {
371 pub message: RegisterTriggerTypeMessage,
372 pub handler: Arc<dyn TriggerHandler>,
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct ApiRequest<T = Value> {
377 #[serde(default)]
378 pub query_params: HashMap<String, String>,
379 #[serde(default)]
380 pub path_params: HashMap<String, String>,
381 #[serde(default)]
382 pub headers: HashMap<String, String>,
383 #[serde(default)]
384 pub path: String,
385 #[serde(default)]
386 pub method: String,
387 #[serde(default)]
388 pub body: T,
389}
390
391#[derive(Debug, Clone, Serialize, Deserialize)]
392pub struct ApiResponse<T = Value> {
393 pub status_code: u16,
394 #[serde(default)]
395 pub headers: HashMap<String, String>,
396 pub body: T,
397}
398
399pub struct Channel {
401 pub writer: ChannelWriter,
402 pub reader: ChannelReader,
403 pub writer_ref: StreamChannelRef,
404 pub reader_ref: StreamChannelRef,
405}
406
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Serializes `op`, asserts that the JSON equals `wire`, and returns the
    /// operation decoded back from that same JSON.
    fn round_trip(op: &UpdateOp, wire: serde_json::Value) -> UpdateOp {
        let encoded = serde_json::to_value(op).unwrap();
        assert_eq!(encoded, wire);
        serde_json::from_value(encoded).unwrap()
    }

    #[test]
    fn api_request_defaults_when_missing_fields() {
        let request: ApiRequest = serde_json::from_str("{}").unwrap();
        let ApiRequest {
            query_params,
            path_params,
            headers,
            path,
            method,
            body,
        } = request;

        assert!(query_params.is_empty());
        assert!(path_params.is_empty());
        assert!(headers.is_empty());
        assert_eq!(path, "");
        assert_eq!(method, "");
        assert!(body.is_null());
    }

    #[test]
    fn update_append_serializes_as_tagged_operation() {
        let op = UpdateOp::append("chunks", json!({"text": "hello"}));
        let wire = json!({
            "type": "append",
            "path": "chunks",
            "value": {"text": "hello"},
        });

        match round_trip(&op, wire) {
            UpdateOp::Append {
                path: Some(MergePath::Single(single)),
                value,
            } => {
                assert_eq!(single, "chunks");
                assert_eq!(value, json!({"text": "hello"}));
            }
            unexpected => panic!("expected single-string append, got {unexpected:?}"),
        }
    }

    #[test]
    fn append_with_segments_path_round_trips_as_array() {
        let op = UpdateOp::append_at_path(["entityId", "buffer"], json!("chunk"));
        let wire = json!({
            "type": "append",
            "path": ["entityId", "buffer"],
            "value": "chunk",
        });

        match round_trip(&op, wire) {
            UpdateOp::Append {
                path: Some(MergePath::Segments(segments)),
                value,
            } => {
                assert_eq!(segments, vec!["entityId", "buffer"]);
                assert_eq!(value, json!("chunk"));
            }
            unexpected => panic!("expected segments append, got {unexpected:?}"),
        }
    }

    #[test]
    fn append_with_root_path_round_trips() {
        let op = UpdateOp::append_root(json!("first"));
        // No `path` key is expected in the encoded form for a root append.
        let wire = json!({
            "type": "append",
            "value": "first",
        });

        match round_trip(&op, wire) {
            UpdateOp::Append { path: None, value } => {
                assert_eq!(value, json!("first"));
            }
            unexpected => panic!("expected root append, got {unexpected:?}"),
        }
    }

    #[test]
    fn append_path_omitted_deserializes_as_none() {
        // A missing `path` and an explicit `null` must both decode to `None`.
        let inputs = [
            r#"{"type":"append","value":"x"}"#,
            r#"{"type":"append","path":null,"value":"x"}"#,
        ];

        for raw in inputs {
            let parsed: UpdateOp = match serde_json::from_str(raw) {
                Ok(op) => op,
                Err(e) => panic!("expected to parse {raw:?} as UpdateOp::Append, got {e}"),
            };
            match parsed {
                UpdateOp::Append { path: None, value } => {
                    assert_eq!(value, json!("x"));
                }
                unexpected => panic!("expected root append for {raw:?}, got {unexpected:?}"),
            }
        }
    }

    #[test]
    fn append_field_path_into_merge_path_compat() {
        let converted = MergePath::from(FieldPath::new("legacy"));
        assert_eq!(converted, MergePath::Single("legacy".to_string()));
    }

    #[test]
    fn merge_path_single_variant_deserializes_string_first() {
        let from_string: MergePath = serde_json::from_str(r#""foo""#).unwrap();
        assert_eq!(from_string, MergePath::Single("foo".to_string()));

        let from_array: MergePath = serde_json::from_str(r#"["a","b"]"#).unwrap();
        let expected = MergePath::Segments(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(from_array, expected);
    }

    #[test]
    fn merge_with_string_path_round_trips_to_single_variant() {
        let op = UpdateOp::merge_at("session-abc", json!({"author": "alice"}));
        let wire = json!({
            "type": "merge",
            "path": "session-abc",
            "value": {"author": "alice"},
        });

        match round_trip(&op, wire) {
            UpdateOp::Merge {
                path: Some(MergePath::Single(single)),
                value,
            } => {
                assert_eq!(single, "session-abc");
                assert_eq!(value, json!({"author": "alice"}));
            }
            unexpected => panic!("expected single-string merge, got {unexpected:?}"),
        }
    }

    #[test]
    fn merge_with_segments_path_round_trips_as_array() {
        let op = UpdateOp::merge_at_path(["sessions", "abc"], json!({"ts": "chunk"}));
        let wire = json!({
            "type": "merge",
            "path": ["sessions", "abc"],
            "value": {"ts": "chunk"},
        });

        match round_trip(&op, wire) {
            UpdateOp::Merge {
                path: Some(MergePath::Segments(segments)),
                value,
            } => {
                assert_eq!(segments, vec!["sessions", "abc"]);
                assert_eq!(value, json!({"ts": "chunk"}));
            }
            unexpected => panic!("expected segments merge, got {unexpected:?}"),
        }
    }

    #[test]
    fn merge_without_path_round_trips() {
        let op = UpdateOp::merge(json!({"x": 1}));
        // No `path` key is expected in the encoded form for a root merge.
        let wire = json!({
            "type": "merge",
            "value": {"x": 1},
        });

        match round_trip(&op, wire) {
            UpdateOp::Merge { path: None, value } => {
                assert_eq!(value, json!({"x": 1}));
            }
            unexpected => panic!("expected root merge, got {unexpected:?}"),
        }
    }

    #[test]
    fn update_result_with_errors_serializes_field() {
        let error = UpdateOpError {
            op_index: 0,
            code: "merge.path.too_deep".to_string(),
            message: "Path depth 33 exceeds maximum of 32".to_string(),
            doc_url: Some("https://iii.dev/docs/workers/iii-state#merge-bounds".to_string()),
        };
        let result = UpdateResult {
            old_value: None,
            new_value: json!({"a": 1}),
            errors: vec![error],
        };

        let encoded = serde_json::to_value(&result).unwrap();
        assert_eq!(encoded["errors"][0]["code"], "merge.path.too_deep");
    }

    #[test]
    fn update_result_without_errors_omits_field_from_json() {
        let result = UpdateResult {
            old_value: None,
            new_value: json!({"a": 1}),
            errors: Vec::new(),
        };

        let encoded = serde_json::to_value(&result).unwrap();
        assert!(
            encoded.get("errors").is_none(),
            "errors field should be omitted when empty for backward compat"
        );
    }
}