1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::future::BoxFuture;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::{
9 channels::{ChannelReader, ChannelWriter, StreamChannelRef},
10 error::IIIError,
11 protocol::{RegisterFunctionMessage, RegisterTriggerTypeMessage},
12 triggers::TriggerHandler,
13};
14
15pub type RemoteFunctionHandler =
16 Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, IIIError>> + Send + Sync>;
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
38#[serde(untagged)]
43pub enum MergePath {
44 Single(String),
45 Segments(Vec<String>),
46}
47
48impl From<&str> for MergePath {
49 fn from(value: &str) -> Self {
50 Self::Single(value.to_string())
51 }
52}
53
54impl From<String> for MergePath {
55 fn from(value: String) -> Self {
56 Self::Single(value)
57 }
58}
59
60impl From<Vec<String>> for MergePath {
61 fn from(value: Vec<String>) -> Self {
62 Self::Segments(value)
63 }
64}
65
66impl From<Vec<&str>> for MergePath {
67 fn from(value: Vec<&str>) -> Self {
68 Self::Segments(value.into_iter().map(String::from).collect())
69 }
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
74#[serde(tag = "type", rename_all = "lowercase")]
75pub enum UpdateOp {
76 Set { path: String, value: Option<Value> },
78
79 Merge {
83 #[serde(default, skip_serializing_if = "Option::is_none")]
84 path: Option<MergePath>,
85 value: Value,
86 },
87
88 Increment { path: String, by: i64 },
90
91 Decrement { path: String, by: i64 },
93
94 Append {
99 #[serde(default, skip_serializing_if = "Option::is_none")]
100 path: Option<MergePath>,
101 value: Value,
102 },
103
104 Remove { path: String },
106}
107
108impl UpdateOp {
109 pub fn set(path: impl Into<String>, value: impl Into<Option<Value>>) -> Self {
111 Self::Set {
112 path: path.into(),
113 value: value.into(),
114 }
115 }
116
117 pub fn increment(path: impl Into<String>, by: i64) -> Self {
119 Self::Increment {
120 path: path.into(),
121 by,
122 }
123 }
124
125 pub fn decrement(path: impl Into<String>, by: i64) -> Self {
127 Self::Decrement {
128 path: path.into(),
129 by,
130 }
131 }
132
133 pub fn append(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
137 Self::Append {
138 path: Some(path.into()),
139 value: value.into(),
140 }
141 }
142
143 pub fn append_root(value: impl Into<Value>) -> Self {
145 Self::Append {
146 path: None,
147 value: value.into(),
148 }
149 }
150
151 pub fn append_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
154 where
155 I: IntoIterator<Item = S>,
156 S: Into<String>,
157 {
158 Self::Append {
159 path: Some(MergePath::Segments(
160 segments.into_iter().map(Into::into).collect(),
161 )),
162 value: value.into(),
163 }
164 }
165
166 pub fn remove(path: impl Into<String>) -> Self {
168 Self::Remove { path: path.into() }
169 }
170
171 pub fn merge(value: impl Into<Value>) -> Self {
173 Self::Merge {
174 path: None,
175 value: value.into(),
176 }
177 }
178
179 pub fn merge_at(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
183 Self::Merge {
184 path: Some(path.into()),
185 value: value.into(),
186 }
187 }
188
189 pub fn merge_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
192 where
193 I: IntoIterator<Item = S>,
194 S: Into<String>,
195 {
196 Self::Merge {
197 path: Some(MergePath::Segments(
198 segments.into_iter().map(Into::into).collect(),
199 )),
200 value: value.into(),
201 }
202 }
203}
204
205#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
207pub struct UpdateOpError {
208 pub op_index: usize,
210 pub code: String,
212 pub message: String,
214 #[serde(default, skip_serializing_if = "Option::is_none")]
216 pub doc_url: Option<String>,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
221pub struct UpdateResult {
222 pub old_value: Option<Value>,
224 pub new_value: Value,
226 #[serde(default, skip_serializing_if = "Vec::is_empty")]
230 pub errors: Vec<UpdateOpError>,
231}
232
233#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
234pub struct SetResult {
235 pub old_value: Option<Value>,
237 pub new_value: Value,
239}
240
241#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
242pub struct DeleteResult {
243 pub old_value: Option<Value>,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct StreamGetInput {
254 pub stream_name: String,
255 pub group_id: String,
256 pub item_id: String,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct StreamSetInput {
262 pub stream_name: String,
263 pub group_id: String,
264 pub item_id: String,
265 pub data: Value,
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct StreamDeleteInput {
271 pub stream_name: String,
272 pub group_id: String,
273 pub item_id: String,
274}
275
276#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct StreamListInput {
279 pub stream_name: String,
280 pub group_id: String,
281}
282
283#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct StreamListGroupsInput {
286 pub stream_name: String,
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct StreamUpdateInput {
292 pub stream_name: String,
293 pub group_id: String,
294 pub item_id: String,
295 pub ops: Vec<UpdateOp>,
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
304pub struct StreamAuthInput {
305 pub headers: HashMap<String, String>,
306 pub path: String,
307 pub query_params: HashMap<String, Vec<String>>,
308 pub addr: String,
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize)]
313pub struct StreamAuthResult {
314 pub context: Option<Value>,
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct StreamJoinResult {
320 pub unauthorized: bool,
321}
322
323#[derive(Clone)]
324pub struct RemoteFunctionData {
325 pub message: RegisterFunctionMessage,
326 pub handler: Option<RemoteFunctionHandler>,
327}
328
329#[derive(Clone)]
330pub struct RemoteTriggerTypeData {
331 pub message: RegisterTriggerTypeMessage,
332 pub handler: Arc<dyn TriggerHandler>,
333}
334
335#[derive(Debug, Clone, Serialize, Deserialize)]
336pub struct ApiRequest<T = Value> {
337 #[serde(default)]
338 pub query_params: HashMap<String, String>,
339 #[serde(default)]
340 pub path_params: HashMap<String, String>,
341 #[serde(default)]
342 pub headers: HashMap<String, String>,
343 #[serde(default)]
344 pub path: String,
345 #[serde(default)]
346 pub method: String,
347 #[serde(default)]
348 pub body: T,
349}
350
351#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct ApiResponse<T = Value> {
353 pub status_code: u16,
354 #[serde(default)]
355 pub headers: HashMap<String, String>,
356 pub body: T,
357}
358
359pub struct Channel {
361 pub writer: ChannelWriter,
362 pub reader: ChannelReader,
363 pub writer_ref: StreamChannelRef,
364 pub reader_ref: StreamChannelRef,
365}
366
367#[cfg(test)]
368mod tests {
369 use super::*;
370
371 #[test]
372 fn api_request_defaults_when_missing_fields() {
373 let request: ApiRequest = serde_json::from_str("{}").unwrap();
374
375 assert!(request.query_params.is_empty());
376 assert!(request.path_params.is_empty());
377 assert!(request.headers.is_empty());
378 assert_eq!(request.path, "");
379 assert_eq!(request.method, "");
380 assert!(request.body.is_null());
381 }
382
383 #[test]
384 fn update_append_serializes_as_tagged_operation() {
385 let op = UpdateOp::append("chunks", serde_json::json!({"text": "hello"}));
386 let encoded = serde_json::to_value(&op).unwrap();
387
388 assert_eq!(
389 encoded,
390 serde_json::json!({
391 "type": "append",
392 "path": "chunks",
393 "value": {"text": "hello"},
394 })
395 );
396
397 let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
398 match decoded {
399 UpdateOp::Append {
400 path: Some(MergePath::Single(s)),
401 value,
402 } => {
403 assert_eq!(s, "chunks");
404 assert_eq!(value, serde_json::json!({"text": "hello"}));
405 }
406 other => panic!("expected single-string append, got {other:?}"),
407 }
408 }
409
410 #[test]
411 fn append_with_segments_path_round_trips_as_array() {
412 let op = UpdateOp::append_at_path(["entityId", "buffer"], serde_json::json!("chunk"));
413 let encoded = serde_json::to_value(&op).unwrap();
414
415 assert_eq!(
416 encoded,
417 serde_json::json!({
418 "type": "append",
419 "path": ["entityId", "buffer"],
420 "value": "chunk",
421 })
422 );
423
424 let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
425 match decoded {
426 UpdateOp::Append {
427 path: Some(MergePath::Segments(segs)),
428 value,
429 } => {
430 assert_eq!(segs, vec!["entityId", "buffer"]);
431 assert_eq!(value, serde_json::json!("chunk"));
432 }
433 other => panic!("expected segments append, got {other:?}"),
434 }
435 }
436
437 #[test]
438 fn append_with_root_path_round_trips() {
439 let op = UpdateOp::append_root(serde_json::json!("first"));
440 let encoded = serde_json::to_value(&op).unwrap();
441
442 assert_eq!(
446 encoded,
447 serde_json::json!({
448 "type": "append",
449 "value": "first",
450 })
451 );
452
453 let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
454 match decoded {
455 UpdateOp::Append { path: None, value } => {
456 assert_eq!(value, serde_json::json!("first"));
457 }
458 other => panic!("expected root append, got {other:?}"),
459 }
460 }
461
462 #[test]
463 fn append_path_omitted_deserializes_as_none() {
464 for raw in [
468 r#"{"type":"append","value":"x"}"#,
469 r#"{"type":"append","path":null,"value":"x"}"#,
470 ] {
471 let op: UpdateOp = serde_json::from_str(raw).unwrap_or_else(|e| {
472 panic!("expected to parse {raw:?} as UpdateOp::Append, got {e}")
473 });
474 match op {
475 UpdateOp::Append { path: None, value } => {
476 assert_eq!(value, serde_json::json!("x"));
477 }
478 other => panic!("expected root append for {raw:?}, got {other:?}"),
479 }
480 }
481 }
482
483 #[test]
484 fn merge_path_single_variant_deserializes_string_first() {
485 let single: MergePath = serde_json::from_str(r#""foo""#).unwrap();
490 assert_eq!(single, MergePath::Single("foo".to_string()));
491
492 let segments: MergePath = serde_json::from_str(r#"["a","b"]"#).unwrap();
493 assert_eq!(
494 segments,
495 MergePath::Segments(vec!["a".to_string(), "b".to_string()])
496 );
497 }
498
499 #[test]
500 fn merge_with_string_path_round_trips_to_single_variant() {
501 let op = UpdateOp::merge_at("session-abc", serde_json::json!({"author": "alice"}));
504 let encoded = serde_json::to_value(&op).unwrap();
505
506 assert_eq!(
507 encoded,
508 serde_json::json!({
509 "type": "merge",
510 "path": "session-abc",
511 "value": {"author": "alice"},
512 })
513 );
514
515 let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
516 match decoded {
517 UpdateOp::Merge {
518 path: Some(MergePath::Single(s)),
519 value,
520 } => {
521 assert_eq!(s, "session-abc");
522 assert_eq!(value, serde_json::json!({"author": "alice"}));
523 }
524 other => panic!("expected single-string merge, got {other:?}"),
525 }
526 }
527
528 #[test]
529 fn merge_with_segments_path_round_trips_as_array() {
530 let op = UpdateOp::merge_at_path(["sessions", "abc"], serde_json::json!({"ts": "chunk"}));
531 let encoded = serde_json::to_value(&op).unwrap();
532
533 assert_eq!(
534 encoded,
535 serde_json::json!({
536 "type": "merge",
537 "path": ["sessions", "abc"],
538 "value": {"ts": "chunk"},
539 })
540 );
541
542 let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
543 match decoded {
544 UpdateOp::Merge {
545 path: Some(MergePath::Segments(segs)),
546 value,
547 } => {
548 assert_eq!(segs, vec!["sessions", "abc"]);
549 assert_eq!(value, serde_json::json!({"ts": "chunk"}));
550 }
551 other => panic!("expected segments merge, got {other:?}"),
552 }
553 }
554
555 #[test]
556 fn merge_without_path_round_trips() {
557 let op = UpdateOp::merge(serde_json::json!({"x": 1}));
558 let encoded = serde_json::to_value(&op).unwrap();
559
560 assert_eq!(
565 encoded,
566 serde_json::json!({
567 "type": "merge",
568 "value": {"x": 1},
569 })
570 );
571
572 let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
573 match decoded {
574 UpdateOp::Merge { path: None, value } => {
575 assert_eq!(value, serde_json::json!({"x": 1}));
576 }
577 other => panic!("expected root merge, got {other:?}"),
578 }
579 }
580
581 #[test]
582 fn update_result_with_errors_serializes_field() {
583 let result = UpdateResult {
584 old_value: None,
585 new_value: serde_json::json!({"a": 1}),
586 errors: vec![UpdateOpError {
587 op_index: 0,
588 code: "merge.path.too_deep".to_string(),
589 message: "Path depth 33 exceeds maximum of 32".to_string(),
590 doc_url: Some("https://iii.dev/docs/workers/iii-state#merge-bounds".to_string()),
591 }],
592 };
593 let encoded = serde_json::to_value(&result).unwrap();
594 assert_eq!(encoded["errors"][0]["code"], "merge.path.too_deep");
595 }
596
597 #[test]
598 fn update_result_without_errors_omits_field_from_json() {
599 let result = UpdateResult {
600 old_value: None,
601 new_value: serde_json::json!({"a": 1}),
602 errors: vec![],
603 };
604 let encoded = serde_json::to_value(&result).unwrap();
605 assert!(
606 encoded.get("errors").is_none(),
607 "errors field should be omitted when empty for backward compat"
608 );
609 }
610}