use std::{collections::HashMap, sync::Arc};
use futures_util::future::BoxFuture;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
channels::{ChannelReader, ChannelWriter, StreamChannelRef},
error::IIIError,
protocol::{RegisterFunctionMessage, RegisterTriggerTypeMessage},
triggers::TriggerHandler,
};
pub type RemoteFunctionHandler =
Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, IIIError>> + Send + Sync>;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
pub struct FieldPath(pub String);
impl FieldPath {
pub fn new(path: impl Into<String>) -> Self {
Self(path.into())
}
pub fn root() -> Self {
Self(String::new())
}
}
impl From<&str> for FieldPath {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
impl From<String> for FieldPath {
fn from(value: String) -> Self {
Self(value)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum MergePath {
Single(String),
Segments(Vec<String>),
}
impl From<&str> for MergePath {
fn from(value: &str) -> Self {
Self::Single(value.to_string())
}
}
impl From<String> for MergePath {
fn from(value: String) -> Self {
Self::Single(value)
}
}
impl From<Vec<String>> for MergePath {
fn from(value: Vec<String>) -> Self {
Self::Segments(value)
}
}
impl From<Vec<&str>> for MergePath {
fn from(value: Vec<&str>) -> Self {
Self::Segments(value.into_iter().map(String::from).collect())
}
}
impl From<FieldPath> for MergePath {
fn from(value: FieldPath) -> Self {
Self::Single(value.0)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum UpdateOp {
Set {
path: FieldPath,
value: Option<Value>,
},
Merge {
#[serde(default, skip_serializing_if = "Option::is_none")]
path: Option<MergePath>,
value: Value,
},
Increment { path: FieldPath, by: i64 },
Decrement { path: FieldPath, by: i64 },
Append {
#[serde(default, skip_serializing_if = "Option::is_none")]
path: Option<MergePath>,
value: Value,
},
Remove { path: FieldPath },
}
impl UpdateOp {
pub fn set(path: impl Into<FieldPath>, value: impl Into<Option<Value>>) -> Self {
Self::Set {
path: path.into(),
value: value.into(),
}
}
pub fn increment(path: impl Into<FieldPath>, by: i64) -> Self {
Self::Increment {
path: path.into(),
by,
}
}
pub fn decrement(path: impl Into<FieldPath>, by: i64) -> Self {
Self::Decrement {
path: path.into(),
by,
}
}
pub fn append(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
Self::Append {
path: Some(path.into()),
value: value.into(),
}
}
pub fn append_root(value: impl Into<Value>) -> Self {
Self::Append {
path: None,
value: value.into(),
}
}
pub fn append_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Self::Append {
path: Some(MergePath::Segments(
segments.into_iter().map(Into::into).collect(),
)),
value: value.into(),
}
}
pub fn remove(path: impl Into<FieldPath>) -> Self {
Self::Remove { path: path.into() }
}
pub fn merge(value: impl Into<Value>) -> Self {
Self::Merge {
path: None,
value: value.into(),
}
}
pub fn merge_at(path: impl Into<MergePath>, value: impl Into<Value>) -> Self {
Self::Merge {
path: Some(path.into()),
value: value.into(),
}
}
pub fn merge_at_path<I, S>(segments: I, value: impl Into<Value>) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Self::Merge {
path: Some(MergePath::Segments(
segments.into_iter().map(Into::into).collect(),
)),
value: value.into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct UpdateOpError {
pub op_index: usize,
pub code: String,
pub message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub doc_url: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct UpdateResult {
pub old_value: Option<Value>,
pub new_value: Value,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub errors: Vec<UpdateOpError>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SetResult {
pub old_value: Option<Value>,
pub new_value: Value,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct DeleteResult {
pub old_value: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamGetInput {
pub stream_name: String,
pub group_id: String,
pub item_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSetInput {
pub stream_name: String,
pub group_id: String,
pub item_id: String,
pub data: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamDeleteInput {
pub stream_name: String,
pub group_id: String,
pub item_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamListInput {
pub stream_name: String,
pub group_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamListGroupsInput {
pub stream_name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamUpdateInput {
pub stream_name: String,
pub group_id: String,
pub item_id: String,
pub ops: Vec<UpdateOp>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamAuthInput {
pub headers: HashMap<String, String>,
pub path: String,
pub query_params: HashMap<String, Vec<String>>,
pub addr: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamAuthResult {
pub context: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamJoinResult {
pub unauthorized: bool,
}
#[derive(Clone)]
pub struct RemoteFunctionData {
pub message: RegisterFunctionMessage,
pub handler: Option<RemoteFunctionHandler>,
}
#[derive(Clone)]
pub struct RemoteTriggerTypeData {
pub message: RegisterTriggerTypeMessage,
pub handler: Arc<dyn TriggerHandler>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiRequest<T = Value> {
#[serde(default)]
pub query_params: HashMap<String, String>,
#[serde(default)]
pub path_params: HashMap<String, String>,
#[serde(default)]
pub headers: HashMap<String, String>,
#[serde(default)]
pub path: String,
#[serde(default)]
pub method: String,
#[serde(default)]
pub body: T,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiResponse<T = Value> {
pub status_code: u16,
#[serde(default)]
pub headers: HashMap<String, String>,
pub body: T,
}
pub struct Channel {
pub writer: ChannelWriter,
pub reader: ChannelReader,
pub writer_ref: StreamChannelRef,
pub reader_ref: StreamChannelRef,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn api_request_defaults_when_missing_fields() {
let request: ApiRequest = serde_json::from_str("{}").unwrap();
assert!(request.query_params.is_empty());
assert!(request.path_params.is_empty());
assert!(request.headers.is_empty());
assert_eq!(request.path, "");
assert_eq!(request.method, "");
assert!(request.body.is_null());
}
#[test]
fn update_append_serializes_as_tagged_operation() {
let op = UpdateOp::append("chunks", serde_json::json!({"text": "hello"}));
let encoded = serde_json::to_value(&op).unwrap();
assert_eq!(
encoded,
serde_json::json!({
"type": "append",
"path": "chunks",
"value": {"text": "hello"},
})
);
let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
match decoded {
UpdateOp::Append {
path: Some(MergePath::Single(s)),
value,
} => {
assert_eq!(s, "chunks");
assert_eq!(value, serde_json::json!({"text": "hello"}));
}
other => panic!("expected single-string append, got {other:?}"),
}
}
#[test]
fn append_with_segments_path_round_trips_as_array() {
let op = UpdateOp::append_at_path(["entityId", "buffer"], serde_json::json!("chunk"));
let encoded = serde_json::to_value(&op).unwrap();
assert_eq!(
encoded,
serde_json::json!({
"type": "append",
"path": ["entityId", "buffer"],
"value": "chunk",
})
);
let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
match decoded {
UpdateOp::Append {
path: Some(MergePath::Segments(segs)),
value,
} => {
assert_eq!(segs, vec!["entityId", "buffer"]);
assert_eq!(value, serde_json::json!("chunk"));
}
other => panic!("expected segments append, got {other:?}"),
}
}
#[test]
fn append_with_root_path_round_trips() {
let op = UpdateOp::append_root(serde_json::json!("first"));
let encoded = serde_json::to_value(&op).unwrap();
assert_eq!(
encoded,
serde_json::json!({
"type": "append",
"value": "first",
})
);
let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
match decoded {
UpdateOp::Append { path: None, value } => {
assert_eq!(value, serde_json::json!("first"));
}
other => panic!("expected root append, got {other:?}"),
}
}
#[test]
fn append_path_omitted_deserializes_as_none() {
for raw in [
r#"{"type":"append","value":"x"}"#,
r#"{"type":"append","path":null,"value":"x"}"#,
] {
let op: UpdateOp = serde_json::from_str(raw).unwrap_or_else(|e| {
panic!("expected to parse {raw:?} as UpdateOp::Append, got {e}")
});
match op {
UpdateOp::Append { path: None, value } => {
assert_eq!(value, serde_json::json!("x"));
}
other => panic!("expected root append for {raw:?}, got {other:?}"),
}
}
}
#[test]
fn append_field_path_into_merge_path_compat() {
let fp = FieldPath::new("legacy");
let mp: MergePath = fp.into();
assert_eq!(mp, MergePath::Single("legacy".to_string()));
}
#[test]
fn merge_path_single_variant_deserializes_string_first() {
let single: MergePath = serde_json::from_str(r#""foo""#).unwrap();
assert_eq!(single, MergePath::Single("foo".to_string()));
let segments: MergePath = serde_json::from_str(r#"["a","b"]"#).unwrap();
assert_eq!(
segments,
MergePath::Segments(vec!["a".to_string(), "b".to_string()])
);
}
#[test]
fn merge_with_string_path_round_trips_to_single_variant() {
let op = UpdateOp::merge_at("session-abc", serde_json::json!({"author": "alice"}));
let encoded = serde_json::to_value(&op).unwrap();
assert_eq!(
encoded,
serde_json::json!({
"type": "merge",
"path": "session-abc",
"value": {"author": "alice"},
})
);
let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
match decoded {
UpdateOp::Merge {
path: Some(MergePath::Single(s)),
value,
} => {
assert_eq!(s, "session-abc");
assert_eq!(value, serde_json::json!({"author": "alice"}));
}
other => panic!("expected single-string merge, got {other:?}"),
}
}
#[test]
fn merge_with_segments_path_round_trips_as_array() {
let op = UpdateOp::merge_at_path(["sessions", "abc"], serde_json::json!({"ts": "chunk"}));
let encoded = serde_json::to_value(&op).unwrap();
assert_eq!(
encoded,
serde_json::json!({
"type": "merge",
"path": ["sessions", "abc"],
"value": {"ts": "chunk"},
})
);
let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
match decoded {
UpdateOp::Merge {
path: Some(MergePath::Segments(segs)),
value,
} => {
assert_eq!(segs, vec!["sessions", "abc"]);
assert_eq!(value, serde_json::json!({"ts": "chunk"}));
}
other => panic!("expected segments merge, got {other:?}"),
}
}
#[test]
fn merge_without_path_round_trips() {
let op = UpdateOp::merge(serde_json::json!({"x": 1}));
let encoded = serde_json::to_value(&op).unwrap();
assert_eq!(
encoded,
serde_json::json!({
"type": "merge",
"value": {"x": 1},
})
);
let decoded: UpdateOp = serde_json::from_value(encoded).unwrap();
match decoded {
UpdateOp::Merge { path: None, value } => {
assert_eq!(value, serde_json::json!({"x": 1}));
}
other => panic!("expected root merge, got {other:?}"),
}
}
#[test]
fn update_result_with_errors_serializes_field() {
let result = UpdateResult {
old_value: None,
new_value: serde_json::json!({"a": 1}),
errors: vec![UpdateOpError {
op_index: 0,
code: "merge.path.too_deep".to_string(),
message: "Path depth 33 exceeds maximum of 32".to_string(),
doc_url: Some("https://iii.dev/docs/workers/iii-state#merge-bounds".to_string()),
}],
};
let encoded = serde_json::to_value(&result).unwrap();
assert_eq!(encoded["errors"][0]["code"], "merge.path.too_deep");
}
#[test]
fn update_result_without_errors_omits_field_from_json() {
let result = UpdateResult {
old_value: None,
new_value: serde_json::json!({"a": 1}),
errors: vec![],
};
let encoded = serde_json::to_value(&result).unwrap();
assert!(
encoded.get("errors").is_none(),
"errors field should be omitted when empty for backward compat"
);
}
}