#![forbid(unsafe_code)]
pub use jmap_types::{
Argument, Id, Invocation, JmapError, JmapRequest, JmapResponse, ResultReference, State, UTCDate,
};
pub mod backend;
pub mod handlers;
pub(crate) mod helpers;
pub use backend::{
AddedItem, BackendChangesError, BackendSetError, ChangesResult, GetObject, JmapBackend,
JmapObject, QueryChangesResult, QueryObject, QueryResult, SetError, SetErrorType, SetObject,
};
pub use handlers::{handle_changes, handle_get, handle_query, handle_query_changes};
pub use helpers::{extract_account_id, not_found_json, now_utc_string, ser};
mod parse;
mod response;
pub use parse::{parse_request, resolve_args};
pub use response::{error_invocation, error_status, request_error, RequestError};
use std::{collections::HashMap, fmt, future::Future, pin::Pin, sync::Arc};
use serde_json::Value;
use tokio::task;
/// Boxed, pinned, `Send` future returned by [`JmapHandler::call`].
///
/// On success it yields the primary response arguments for the call together
/// with any additional invocations; on failure it yields a [`JmapError`].
pub type HandlerFuture =
Pin<Box<dyn Future<Output = Result<(Value, Vec<Invocation>), JmapError>> + Send>>;
/// Handler for JMAP method calls, generic over the caller-context type.
///
/// The `Send + Sync` supertraits let a handler be stored as
/// `Arc<dyn JmapHandler<_>>` and moved into a spawned task, which is how
/// [`Dispatcher`] uses it.
pub trait JmapHandler<CallerCtx>: Send + Sync {
/// Handles one method call and returns a future resolving to its outcome.
///
/// When invoked by [`Dispatcher::dispatch`], `method` and `call_id` are the
/// values from the request's method call, `args` are the call's arguments
/// after `resolve_args` has been applied to them, and `caller` is a clone
/// of the context passed to `dispatch`.
fn call(
&self,
method: String,
call_id: String,
args: Value,
caller: CallerCtx,
) -> HandlerFuture;
}
/// Routes the method calls of a JMAP request to registered [`JmapHandler`]s.
pub struct Dispatcher<CallerCtx> {
// Registered handlers, keyed by the method name they were registered under.
handlers: HashMap<String, Arc<dyn JmapHandler<CallerCtx>>>,
}
impl<CallerCtx: Clone + Send + 'static> Dispatcher<CallerCtx> {
pub fn new() -> Self {
Self {
handlers: HashMap::new(),
}
}
pub fn register(
&mut self,
method: impl Into<String>,
handler: Arc<dyn JmapHandler<CallerCtx>>,
) {
self.handlers.insert(method.into(), handler);
}
pub async fn dispatch(
&self,
request: JmapRequest,
caller: CallerCtx,
session_state: State,
) -> JmapResponse {
let mut method_responses: Vec<Invocation> = Vec::with_capacity(request.method_calls.len());
let client_sent_created_ids = request.created_ids.is_some();
let mut created_ids: HashMap<Id, Id> = request.created_ids.unwrap_or_default();
for (method, mut args, call_id) in request.method_calls {
if let Err(e) = resolve_args(&mut args, &method_responses) {
method_responses.push(error_invocation(&call_id, e));
continue;
}
let handler = match self.handlers.get(&method) {
Some(h) => Arc::clone(h),
None => {
method_responses.push(error_invocation(&call_id, JmapError::unknown_method()));
continue;
}
};
let caller_clone = caller.clone();
let method_clone = method.clone();
let call_id_clone = call_id.clone();
let result: Result<
Result<(Value, Vec<Invocation>), JmapError>,
tokio::task::JoinError,
> = task::spawn(async move {
handler
.call(method_clone, call_id_clone, args, caller_clone)
.await
})
.await;
match result {
Ok(Ok((primary_value, extra_invocations))) => {
if client_sent_created_ids {
if let Some(map) = primary_value.get("created").and_then(|v| v.as_object())
{
for (client_id, created_obj) in map {
if let Some(id_val) = created_obj.get("id").and_then(|v| v.as_str())
{
created_ids.insert(client_id.as_str().into(), id_val.into());
}
}
}
}
method_responses.push((method, primary_value, call_id));
method_responses.extend(extra_invocations);
}
Ok(Err(e)) => {
method_responses.push(error_invocation(&call_id, e));
}
Err(join_err) => {
let desc = if join_err.is_cancelled() {
"task cancelled"
} else {
"internal error"
};
method_responses.push(error_invocation(&call_id, JmapError::server_fail(desc)));
}
}
}
let created_ids = client_sent_created_ids.then_some(created_ids);
JmapResponse::new(method_responses, session_state, created_ids)
}
}
impl<CallerCtx: Clone + Send + 'static> Default for Dispatcher<CallerCtx> {
fn default() -> Self {
Self::new()
}
}
impl<CallerCtx> fmt::Debug for Dispatcher<CallerCtx> {
    /// Formats as `Dispatcher { methods: [...] }`, listing the registered
    /// method names in sorted order. The handlers themselves are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `HashMap` iteration order is unspecified; sorting keeps the output
        // stable across runs so logs and snapshots can be compared.
        let mut methods: Vec<&str> = self.handlers.keys().map(String::as_str).collect();
        methods.sort_unstable();
        f.debug_struct("Dispatcher")
            .field("methods", &methods)
            .finish()
    }
}
/// Signature of the closure stored in a [`ClosureHandler`].
///
/// `ClosureHandler` invokes it with a clone of its backend `Arc`, the call id
/// of the method call, and the call's arguments; it returns the future that
/// produces the call's outcome.
pub type BackendCallFn<B> =
dyn Fn(Arc<B>, String, serde_json::Value) -> HandlerFuture + Send + Sync + 'static;
/// A [`JmapHandler`] assembled from a shared backend and a closure.
pub struct ClosureHandler<B: Send + Sync + 'static> {
/// Shared backend; a clone of this `Arc` is handed to `call_fn` on every call.
pub backend: Arc<B>,
/// Closure run for each call with the backend, the call id and the arguments.
pub call_fn: Box<BackendCallFn<B>>,
}
impl<B, C> JmapHandler<C> for ClosureHandler<B>
where
    B: Send + Sync + 'static,
    C: Clone + Send + 'static,
{
    /// Forwards the call to `call_fn` with a fresh clone of the backend `Arc`.
    ///
    /// The method name and the caller context are ignored; only the call id
    /// and the arguments reach the closure.
    fn call(
        &self,
        _method: String,
        call_id: String,
        args: serde_json::Value,
        _caller: C,
    ) -> HandlerFuture {
        let backend = Arc::clone(&self.backend);
        (self.call_fn)(backend, call_id, args)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::{json, Value};
use std::sync::{Arc, Mutex};
// Exists only for its compile-time bound checks: the build fails if
// `Dispatcher` stops being `Send + Sync` for these caller-context types.
#[allow(dead_code)]
fn assert_dispatcher_send_sync() {
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    require_send_sync::<Dispatcher<String>>();
    require_send_sync::<Dispatcher<()>>();
}
/// Test handler that answers every call with a clone of the wrapped value and
/// no extra invocations, for any caller-context type.
struct EchoHandler(Value);
impl<C> JmapHandler<C> for EchoHandler
where
    C: Clone + Send + 'static,
{
    fn call(
        &self,
        _method: String,
        _call_id: String,
        _args: Value,
        _caller: C,
    ) -> HandlerFuture {
        let reply = self.0.clone();
        Box::pin(async move { Ok((reply, Vec::new())) })
    }
}
/// Test handler that fails every call with a clone of the wrapped error.
struct ErrorHandler(JmapError);
impl JmapHandler<String> for ErrorHandler {
    fn call(
        &self,
        _method: String,
        _call_id: String,
        _args: Value,
        _caller: String,
    ) -> HandlerFuture {
        let failure = self.0.clone();
        Box::pin(async move { Err(failure) })
    }
}
/// Test handler that stores the arguments it receives in the shared slot and
/// then succeeds with an empty object.
struct CaptureArgsHandler(Arc<Mutex<Option<Value>>>);
impl JmapHandler<String> for CaptureArgsHandler {
    fn call(
        &self,
        _method: String,
        _call_id: String,
        args: Value,
        _caller: String,
    ) -> HandlerFuture {
        let shared = Arc::clone(&self.0);
        Box::pin(async move {
            {
                // Scoped so the guard is released before the result is built.
                let mut guard = shared.lock().expect("test: mutex poisoned");
                *guard = Some(args);
            }
            Ok((json!({}), Vec::new()))
        })
    }
}
/// Test handler that stores the caller context it receives in the shared slot
/// and then succeeds with an empty object.
struct CaptureCallerHandler(Arc<Mutex<Option<String>>>);
impl JmapHandler<String> for CaptureCallerHandler {
    fn call(
        &self,
        _method: String,
        _call_id: String,
        _args: Value,
        caller: String,
    ) -> HandlerFuture {
        let shared = Arc::clone(&self.0);
        Box::pin(async move {
            {
                // Scoped so the guard is released before the result is built.
                let mut guard = shared.lock().expect("test: mutex poisoned");
                *guard = Some(caller);
            }
            Ok((json!({}), Vec::new()))
        })
    }
}
/// Test handler whose future panics as soon as it is polled.
struct PanicHandler;
impl JmapHandler<String> for PanicHandler {
fn call(
&self,
_method: String,
_call_id: String,
_args: Value,
_caller: String,
) -> HandlerFuture {
// The panic happens inside the returned future, not in `call` itself.
Box::pin(async move { panic!("deliberate test panic") })
}
}
/// Builds a request that uses the core capability, contains exactly one
/// method call, and passes `None` as the third constructor argument.
fn single_call(method: &str, args: Value, call_id: &str) -> JmapRequest {
    let capabilities = vec!["urn:ietf:params:jmap:core".into()];
    let calls = vec![(method.into(), args, call_id.into())];
    JmapRequest::new(capabilities, calls, None)
}
#[tokio::test]
async fn unknown_method_returns_error_invocation() {
let d: Dispatcher<String> = Dispatcher::new();
let req = single_call("Foo/get", json!({}), "c0");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 1);
let (_, args, call_id) = &resp.method_responses[0];
assert_eq!(call_id, "c0");
assert_eq!(args["type"], "unknownMethod");
}
#[tokio::test]
async fn known_method_success() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("Foo/get", Arc::new(EchoHandler(json!({"list": []}))));
let req = single_call("Foo/get", json!({}), "c1");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 1);
let (method, args, call_id) = &resp.method_responses[0];
assert_eq!(method, "Foo/get");
assert_eq!(call_id, "c1");
assert_eq!(args["list"], json!([]));
}
#[tokio::test]
async fn handler_returns_error() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("Foo/get", Arc::new(ErrorHandler(JmapError::not_found())));
let req = single_call("Foo/get", json!({}), "c2");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 1);
let (_, args, _) = &resp.method_responses[0];
assert_eq!(args["type"], "notFound");
}
#[tokio::test]
async fn session_state_echoed() {
let d: Dispatcher<String> = Dispatcher::new();
let req = JmapRequest::new(vec!["urn:ietf:params:jmap:core".into()], vec![], None);
let resp = d.dispatch(req, "alice".into(), "my-state-123".into()).await;
assert_eq!(resp.session_state.as_ref(), "my-state-123");
}
#[tokio::test]
async fn mixed_batch_all_responses_in_order() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("M/a", Arc::new(EchoHandler(json!({"ok": true}))));
let req = JmapRequest::new(
vec!["urn:ietf:params:jmap:core".into()],
vec![
("M/a".into(), json!({}), "c0".into()),
("M/b".into(), json!({}), "c1".into()),
("M/a".into(), json!({}), "c2".into()),
],
None,
);
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(
resp.method_responses.len(),
3,
"all three calls must produce a response"
);
assert_eq!(resp.method_responses[0].2, "c0");
assert!(
resp.method_responses[0].1.get("type").is_none(),
"c0 must not be an error"
);
assert_eq!(resp.method_responses[1].2, "c1");
assert_eq!(resp.method_responses[1].1["type"], "unknownMethod");
assert_eq!(resp.method_responses[2].2, "c2");
assert!(
resp.method_responses[2].1.get("type").is_none(),
"c2 must not be an error"
);
}
#[tokio::test]
async fn error_does_not_abort_subsequent_calls() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("M/ok", Arc::new(EchoHandler(json!({"ok": true}))));
d.register("M/err", Arc::new(ErrorHandler(JmapError::forbidden())));
let req = JmapRequest::new(
vec!["urn:ietf:params:jmap:core".into()],
vec![
("M/err".into(), json!({}), "c0".into()),
("M/ok".into(), json!({}), "c1".into()),
],
None,
);
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 2);
assert_eq!(resp.method_responses[0].1["type"], "forbidden");
assert!(
resp.method_responses[1].1.get("type").is_none(),
"second call must succeed"
);
}
#[tokio::test]
async fn panicking_handler_returns_server_fail() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("Panic/now", Arc::new(PanicHandler));
let req = single_call("Panic/now", json!({}), "c0");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 1);
let (_, args, _) = &resp.method_responses[0];
assert_eq!(
args["type"], "serverFail",
"panicking handler must produce serverFail"
);
}
#[tokio::test]
async fn panic_message_not_in_response() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("Panic/now", Arc::new(PanicHandler));
let req = single_call("Panic/now", json!({}), "c0");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
let (_, args, _) = &resp.method_responses[0];
if let Some(desc) = args["description"].as_str() {
assert!(
!desc.contains("deliberate test panic"),
"panic message must not leak into response description"
);
}
}
/// A `#`-prefixed result reference is replaced by the referenced value before
/// the handler receives its arguments.
#[tokio::test]
async fn result_reference_resolved_before_dispatch() {
    let seen_args = Arc::new(Mutex::new(None::<Value>));
    let mut dispatcher = Dispatcher::<String>::new();
    let producer = EchoHandler(json!({"list": [{"id": "item-1"}]}));
    dispatcher.register("Foo/get", Arc::new(producer));
    let consumer = CaptureArgsHandler(Arc::clone(&seen_args));
    dispatcher.register("Bar/query", Arc::new(consumer));

    // The second call points back at the first call's `/list/0/id`.
    let calls = vec![
        ("Foo/get".into(), json!({}), "c0".into()),
        (
            "Bar/query".into(),
            json!({"#ids": {"resultOf": "c0", "name": "Foo/get", "path": "/list/0/id"}}),
            "c1".into(),
        ),
    ];
    let request = JmapRequest::new(vec!["urn:ietf:params:jmap:core".into()], calls, None);
    let response = dispatcher
        .dispatch(request, "alice".into(), "s0".into())
        .await;

    assert_eq!(response.method_responses.len(), 2);
    assert!(
        response.method_responses[1].1.get("type").is_none(),
        "Bar/query must succeed after ResultReference resolution"
    );
    let received = seen_args
        .lock()
        .unwrap()
        .clone()
        .expect("CaptureArgsHandler was not called");
    assert_eq!(
        received["ids"],
        json!("item-1"),
        "resolved value must be the string item-1"
    );
    assert!(
        received.get("#ids").is_none(),
        "#ids key must have been replaced"
    );
}
#[tokio::test]
async fn result_reference_failure_stops_that_call() {
let d: Dispatcher<String> = Dispatcher::new();
let req = single_call(
"Foo/get",
json!({"#ids": {"resultOf": "nonexistent", "name": "Foo/get", "path": "/x"}}),
"c0",
);
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 1);
let (_, args, _) = &resp.method_responses[0];
assert!(
args.get("type").is_some(),
"failed ResultReference must produce an error invocation"
);
}
#[tokio::test]
async fn created_ids_accumulated_from_set_response() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register(
"Foo/set",
Arc::new(EchoHandler(
json!({"created": {"client-1": {"id": "server-abc"}}}),
)),
);
let req = JmapRequest::new(
vec!["urn:ietf:params:jmap:core".into()],
vec![("Foo/set".into(), json!({}), "c0".into())],
Some(std::collections::HashMap::new()),
);
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
let ids = resp
.created_ids
.as_ref()
.expect("created_ids must be Some when client sent createdIds");
assert_eq!(
ids.get(&Id::from("client-1")),
Some(&Id::from("server-abc")),
"client-1 must map to server-abc"
);
}
/// `created_ids` is left out of the response when the request did not carry a
/// `createdIds` map.
#[tokio::test]
async fn created_ids_absent_when_no_set() {
    let mut d: Dispatcher<String> = Dispatcher::new();
    d.register("Foo/get", Arc::new(EchoHandler(json!({"list": []}))));
    // `single_call` builds the request without a createdIds map.
    let req = single_call("Foo/get", json!({}), "c0");
    let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
    // The dispatcher decides on the presence of the client's map alone, so the
    // message names that rule rather than the absence of a /set call.
    assert!(
        resp.created_ids.is_none(),
        "created_ids must be None when the client did not send createdIds"
    );
}
#[tokio::test]
async fn created_ids_accumulated_across_multiple_set_calls() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register(
"A/set",
Arc::new(EchoHandler(json!({"created": {"cA": {"id": "sA"}}}))),
);
d.register(
"B/set",
Arc::new(EchoHandler(json!({"created": {"cB": {"id": "sB"}}}))),
);
let req = JmapRequest::new(
vec!["urn:ietf:params:jmap:core".into()],
vec![
("A/set".into(), json!({}), "c0".into()),
("B/set".into(), json!({}), "c1".into()),
],
Some(std::collections::HashMap::new()),
);
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
let ids = resp
.created_ids
.as_ref()
.expect("created_ids must be Some when client sent createdIds");
assert_eq!(
ids.get(&Id::from("cA")),
Some(&Id::from("sA")),
"cA must be present"
);
assert_eq!(
ids.get(&Id::from("cB")),
Some(&Id::from("sB")),
"cB must be present"
);
}
#[tokio::test]
async fn created_ids_merges_with_pre_populated_map() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register(
"Foo/set",
Arc::new(EchoHandler(
json!({"created": {"client-new": {"id": "server-new"}}}),
)),
);
let mut initial = std::collections::HashMap::new();
initial.insert(Id::from("client-old"), Id::from("server-old"));
let req = JmapRequest::new(
vec!["urn:ietf:params:jmap:core".into()],
vec![("Foo/set".into(), json!({}), "c0".into())],
Some(initial),
);
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
let ids = resp
.created_ids
.as_ref()
.expect("created_ids must be Some when client sent createdIds");
assert_eq!(
ids.get(&Id::from("client-old")),
Some(&Id::from("server-old")),
"pre-populated entry must be preserved"
);
assert_eq!(
ids.get(&Id::from("client-new")),
Some(&Id::from("server-new")),
"new /set entry must be merged in"
);
}
#[tokio::test]
async fn caller_ctx_passed_to_handler() {
let captured = Arc::new(Mutex::new(None::<String>));
let mut d: Dispatcher<String> = Dispatcher::new();
d.register(
"Foo/get",
Arc::new(CaptureCallerHandler(Arc::clone(&captured))),
);
let req = single_call("Foo/get", json!({}), "c0");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert!(
resp.method_responses[0].1.get("type").is_none(),
"must succeed"
);
let got = captured
.lock()
.unwrap()
.clone()
.expect("handler was not called");
assert_eq!(got, "alice", "caller must be passed through unchanged");
}
#[tokio::test]
async fn unit_caller_ctx_works() {
let mut d: Dispatcher<()> = Dispatcher::new();
d.register("Foo/get", Arc::new(EchoHandler(json!({"ok": true}))));
let req = single_call("Foo/get", json!({}), "c0");
let resp = d.dispatch(req, (), "s0".into()).await;
assert_eq!(resp.method_responses.len(), 1);
assert!(
resp.method_responses[0].1.get("type").is_none(),
"must succeed with () caller"
);
}
/// Test handler that returns a primary result plus one extra invocation
/// (`Extra/call` with call id `x0`).
struct ExtraInvocationHandler;
impl JmapHandler<String> for ExtraInvocationHandler {
    fn call(
        &self,
        _method: String,
        _call_id: String,
        _args: Value,
        _caller: String,
    ) -> HandlerFuture {
        Box::pin(async move {
            let follow_up: Invocation = (
                "Extra/call".to_owned(),
                json!({"type": "extra"}),
                "x0".to_owned(),
            );
            Ok((json!({"type": "primary"}), vec![follow_up]))
        })
    }
}
#[tokio::test]
async fn extra_invocations_appended_after_primary() {
let mut d: Dispatcher<String> = Dispatcher::new();
d.register("Sub/set", Arc::new(ExtraInvocationHandler));
let req = single_call("Sub/set", json!({}), "c0");
let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
assert_eq!(
resp.method_responses.len(),
2,
"primary + 1 extra = 2 total invocations"
);
assert_eq!(resp.method_responses[0].0, "Sub/set");
assert_eq!(resp.method_responses[0].2, "c0");
assert_eq!(resp.method_responses[0].1["type"], "primary");
assert_eq!(resp.method_responses[1].0, "Extra/call");
assert_eq!(resp.method_responses[1].2, "x0");
assert_eq!(resp.method_responses[1].1["type"], "extra");
}
}